Skip to content

Instantly share code, notes, and snippets.

@l04m33
Last active April 10, 2017 14:45
Show Gist options
  • Save l04m33/1aa059b1a85c73bc7222 to your computer and use it in GitHub Desktop.
Save l04m33/1aa059b1a85c73bc7222 to your computer and use it in GitHub Desktop.

Revisions

  1. l04m33 revised this gist Sep 24, 2014. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion gistfile1.txt
    @@ -1 +0,0 @@
    test
  2. l04m33 revised this gist Sep 24, 2014. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions gistfile1.txt
    @@ -0,0 +1 @@
    test
  3. l04m33 revised this gist Aug 17, 2014. 1 changed file with 87 additions and 11 deletions.
    98 changes: 87 additions & 11 deletions async_file_wrapper.py
    @@ -1,5 +1,5 @@
    class AsyncFileWrapper(object):
    DEFAULT_BLOCK_SIZE = 4096
    DEFAULT_BLOCK_SIZE = 8192

    def __init__(self, loop=None, filename=None,
    fileobj=None, mode='rb'):
    @@ -13,15 +13,37 @@ def __init__(self, loop=None, filename=None,
    fileobj = open(filename, mode=mode)
    elif 'b' not in fileobj.mode:
    raise RuntimeError('Only binary mode is supported')

    fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
    if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
    if filename is not None:
    fileobj.close()
    errcode = ctypes.get_errno()
    raise OSError((errcode, errno.errorcode[errcode]))

    self.fileobj = fileobj

    if loop is None:
    loop = asyncio.get_event_loop()
    self.loop = loop
    self.rbuffer = bytearray()

    def seek(self, offset, whence=None):
        """Move the position of the wrapped file object.

        The call is forwarded synchronously to ``self.fileobj.seek``.
        ``whence`` is only passed along when the caller supplied one, so
        the wrapped object's own default applies otherwise.

        Returns whatever ``self.fileobj.seek`` returns.
        """
        seek_args = (offset,) if whence is None else (offset, whence)
        return self.fileobj.seek(*seek_args)

    def read_ready(self, future, n, total):
    res = self.fileobj.read1(n)
    try:
    res = self.fileobj.read(n)
    except Exception as exc:
    future.set_exception(exc)
    return

    if res is None: # Blocked
    self.read_handle = self.loop.call_soon(self.read_ready, future, n, total)
    return

    if not res: # EOF
    future.set_result(bytes(self.rbuffer))
    @@ -31,13 +53,14 @@ def read_ready(self, future, n, total):

    if total > 0:
    more_to_go = total - len(self.rbuffer)
    if more_to_go <= 0: # enough
    if more_to_go <= 0: # enough
    res, self.rbuffer = self.rbuffer[:n], self.rbuffer[n:]
    future.set_result(bytes(res))
    else:
    self.loop.call_soon(self.read_ready, future, more_to_go, total)
    else: # < 0
    self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)
    more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
    self.read_handle = self.loop.call_soon(self.read_ready, future, more_to_go, total)
    else: # total < 0
    self.read_handle = self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)

    @asyncio.coroutine
    def read(self, n=-1):
    @@ -47,15 +70,68 @@ def read(self, n=-1):
    future.set_result(b'')
    return future
    elif n < 0:
    self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, n)
    self.rbuffer.clear()
    self.read_handle = self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, n)
    else:
    self.loop.call_soon(self.read_ready, future, n, n)
    self.rbuffer.clear()
    read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
    self.read_handle = self.loop.call_soon(self.read_ready, future, read_block_size, n)

    return future

    def write_ready(self, future, data, written):
        """Write as much of ``data`` as possible and reschedule the rest.

        Scheduled on ``self.loop`` by ``write()`` and by itself until the
        whole payload has been handed to ``self.fileobj``.

        future  -- resolved with the total number of bytes written, or
                   with the exception raised by the underlying write.
        data    -- the bytes still waiting to be written.
        written -- number of bytes written by earlier invocations.
        """
        try:
            res = self.fileobj.write(data)
        except BlockingIOError as exc:
            # A buffered writer may have accepted part of the payload
            # before blocking; retrying all of ``data`` would write those
            # bytes twice. The attribute is absent when not provided.
            res = getattr(exc, 'characters_written', 0)
        except Exception as exc:
            future.set_exception(exc)
            return

        if res is None:
            # Raw non-blocking streams return None when nothing could be
            # written; treat that as zero progress and retry.
            res = 0

        written += res
        if res < len(data):
            self.write_handle = self.loop.call_soon(
                self.write_ready, future, data[res:], written)
        else:
            future.set_result(written)

    @asyncio.coroutine
    def write(self, data):
    # XXX: big data?
    return self.fileobj.write(data)
    future = asyncio.Future(loop=self.loop)

    if len(data) > 0:
    self.write_handle = self.loop.call_soon(self.write_ready, future, data, 0)
    else:
    future.set_result(0)

    return future

    @asyncio.coroutine
    def copy_to(self, dest, copy_len=-1):
        """Copy data from this wrapper into ``dest`` block by block.

        dest     -- object with a ``write(data)`` method; if that call
                    returns a Future or coroutine it is waited on before
                    the next block is read.
        copy_len -- number of bytes to copy; a negative value copies
                    until ``self.read`` returns no data.

        Returns the number of bytes read from this wrapper and passed to
        ``dest.write``.
        """
        total_copied = 0
        remaining = copy_len

        while remaining != 0:
            # Never request more than one block, nor more than is left.
            chunk_size = self.DEFAULT_BLOCK_SIZE
            if remaining >= 0:
                chunk_size = min(remaining, chunk_size)

            chunk = yield from self.read(chunk_size)
            if not chunk:
                # The source is exhausted.
                break

            pending = dest.write(chunk)
            if isinstance(pending, asyncio.Future) \
                    or asyncio.iscoroutine(pending):
                yield from pending

            total_copied += len(chunk)
            if remaining > 0:
                remaining -= len(chunk)

        return total_copied

    def close(self):
    self.fileobj
    self.fileobj.close()
    if hasattr(self, 'read_handle'):
    self.read_handle.cancel()
    if hasattr(self, 'write_handle'):
    self.write_handle.cancel()
  4. l04m33 created this gist Aug 17, 2014.
    61 changes: 61 additions & 0 deletions async_file_wrapper.py
    @@ -0,0 +1,61 @@
    class AsyncFileWrapper(object):
        """Expose reads on a binary file object as asyncio futures.

        Reads are performed in event-loop callbacks, one ``read1`` call
        per callback, and collected in ``self.rbuffer`` until the request
        is satisfied. Only one read may be in flight at a time because
        all reads share that buffer.
        """

        # Chunk size requested per callback when reading until EOF.
        DEFAULT_BLOCK_SIZE = 4096

        def __init__(self, loop=None, filename=None,
                     fileobj=None, mode='rb'):
            """Wrap ``fileobj``, or open ``filename`` with ``mode``.

            Exactly one of ``filename`` and ``fileobj`` must be given.
            ``loop`` defaults to ``asyncio.get_event_loop()``.

            Raises RuntimeError when both or neither source is given, or
            when the mode is not a binary mode.
            """
            if (filename is None) == (fileobj is None):
                raise RuntimeError('Confilicting arguments')

            if filename is not None:
                if 'b' not in mode:
                    raise RuntimeError('Only binary mode is supported')
                fileobj = open(filename, mode=mode)
            elif 'b' not in fileobj.mode:
                raise RuntimeError('Only binary mode is supported')
            self.fileobj = fileobj

            if loop is None:
                loop = asyncio.get_event_loop()
            self.loop = loop
            self.rbuffer = bytearray()

        def _finish_read(self, future, limit=None):
            """Resolve ``future`` with the buffered bytes and reset the buffer."""
            if limit is None:
                payload = bytes(self.rbuffer)
            else:
                payload = bytes(self.rbuffer[:limit])
            # Reset so the next read never sees data from this one.
            self.rbuffer.clear()
            future.set_result(payload)

        def read_ready(self, future, n, total):
            """Read one chunk of at most ``n`` bytes and continue or finish.

            future -- resolved with the collected bytes, or with the
                      exception raised by the underlying read.
            n      -- size of the chunk to request in this callback.
            total  -- size of the whole request; a value <= 0 means
                      "read until EOF".
            """
            if future.done():
                # The caller cancelled the request; nothing left to do.
                return

            try:
                res = self.fileobj.read1(n)
            except Exception as exc:
                # Without this the future would stay pending forever.
                future.set_exception(exc)
                return

            if not res:  # EOF
                self._finish_read(future)
                return

            self.rbuffer.extend(res)

            if total > 0:
                more_to_go = total - len(self.rbuffer)
                if more_to_go <= 0:  # enough
                    # Slice by ``total``: ``n`` is only the size of the
                    # last chunk and would truncate multi-chunk reads.
                    self._finish_read(future, total)
                else:
                    self.loop.call_soon(
                        self.read_ready, future, more_to_go, total)
            else:  # read until EOF
                self.loop.call_soon(
                    self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)

        def read(self, n=-1):
            """Start reading ``n`` bytes, or everything up to EOF if n < 0.

            Returns an ``asyncio.Future`` that resolves to ``bytes``; the
            result is shorter than ``n`` only when EOF is reached first.
            The future can be used with ``yield from`` or ``await``, so
            no coroutine decorator is needed (``asyncio.coroutine`` was
            removed in Python 3.11).
            """
            future = asyncio.Future(loop=self.loop)

            if n == 0:
                future.set_result(b'')
                return future

            self.rbuffer.clear()
            first_chunk = self.DEFAULT_BLOCK_SIZE if n < 0 else n
            self.loop.call_soon(self.read_ready, future, first_chunk, n)
            return future

        def write(self, data):
            """Write ``data`` synchronously; return the wrapped write's result."""
            # XXX: big data?
            return self.fileobj.write(data)

        def close(self):
            """Close the wrapped file object."""
            self.fileobj.close()