Module spin_sdk.http.poll_loop
Defines a custom `asyncio` event loop backed by `wasi:io/poll#poll`.
This also includes helper classes and functions for working with `wasi:http`.
As of WASI Preview 2, there is not yet a standard for first-class, composable asynchronous functions and streams. We expect that little or none of this boilerplate will be needed once those features arrive in Preview 3.
Expand source code
"""Defines a custom `asyncio` event loop backed by `wasi:io/poll#poll`.
This also includes helper classes and functions for working with `wasi:http`.
As of WASI Preview 2, there is not yet a standard for first-class, composable
asynchronous functions and streams. We expect that little or none of this
boilerplate will be needed once those features arrive in Preview 3.
"""
import asyncio
import socket
import subprocess
from spin_sdk.wit.types import Ok, Err
from spin_sdk.wit.imports import types, streams, poll, outgoing_handler
from spin_sdk.wit.imports.types import IncomingBody, OutgoingBody, OutgoingRequest, IncomingResponse
from spin_sdk.wit.imports.streams import StreamErrorClosed, InputStream
from spin_sdk.wit.imports.poll import Pollable
from typing import Optional, cast
# Maximum number of bytes requested from an input stream in a single `read`
# call (16 KiB); used as the chunk size by `Stream.next`.
READ_SIZE: int = 16 * 1024
async def send(request: OutgoingRequest) -> IncomingResponse:
    """Dispatch `request` and asynchronously wait for its response.

    The request is handed to `outgoing_handler.handle`; while the resulting
    future has no value yet, the coroutine suspends on the future's pollable
    via `register`.

    Returns the `IncomingResponse` carried by the nested `Ok` values.
    Raises the outer or inner non-`Ok` value when the request failed.
    """
    pending = outgoing_handler.handle(request, None)

    # Keep waiting on the pollable until the future yields a value.
    while True:
        outcome = pending.get()
        if outcome is not None:
            break
        await register(cast(PollLoop, asyncio.get_event_loop()), pending.subscribe())

    # The future resource is released before the outcome is inspected.
    pending.__exit__()

    if not isinstance(outcome, Ok):
        raise outcome

    inner = outcome.value
    if not isinstance(inner, Ok):
        raise inner

    return inner.value
class Stream:
    """Asynchronous chunk reader over a `wasi:http/types#incoming-body`."""

    def __init__(self, body: IncomingBody):
        # Both attributes are reset to `None` once the stream reports closure.
        self.body: Optional[IncomingBody] = body
        self.stream: Optional[InputStream] = body.stream()

    async def next(self) -> Optional[bytes]:
        """Return the next chunk of the body, waiting for it if necessary.

        At most `READ_SIZE` bytes are requested per read. `None` is returned
        once the end of the stream has been reached (and on every later call).
        Any `Err` other than a closed-stream error is re-raised.
        """
        while True:
            try:
                source = self.stream
                if source is None:
                    return None

                data = source.read(READ_SIZE)
                if len(data) != 0:
                    return data

                # Nothing available yet: suspend until the stream's pollable
                # is reported ready, then try reading again.
                await register(cast(PollLoop, asyncio.get_event_loop()), source.subscribe())
            except Err as error:
                if not isinstance(error.value, StreamErrorClosed):
                    raise error

                # End of stream: release the stream first, then finish the
                # body. The next loop iteration returns `None`.
                if self.stream is not None:
                    self.stream.__exit__()
                    self.stream = None
                if self.body is not None:
                    IncomingBody.finish(self.body)
                    self.body = None
class Sink:
    """Writer abstraction over `wasi:http/types#outgoing-body`."""

    def __init__(self, body: OutgoingBody):
        # Both attributes are reset to `None` by `close`.
        self.body = body
        self.stream = body.write()

    async def send(self, chunk: bytes):
        """Write the specified bytes to the sink, then flush.

        This may need to yield according to the backpressure requirements of
        the sink: whenever `check_write` reports no capacity, the coroutine
        suspends on the stream's pollable until it is reported ready.
        """
        offset = 0
        flushing = False
        while True:
            count = self.stream.check_write()
            if count == 0:
                # No write capacity right now; wait for the stream.
                await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
            elif offset == len(chunk):
                # Everything has been handed to the stream. Request a flush
                # once; the next time capacity is reported we are done.
                if flushing:
                    return
                self.stream.flush()
                flushing = True
            else:
                # Write as much of the remainder as the stream accepts.
                count = min(count, len(chunk) - offset)
                self.stream.write(chunk[offset:offset + count])
                offset += count

    def close(self):
        """Close the stream, indicating no further data will be written.

        Calling this more than once is harmless: resources that were already
        released are skipped instead of raising `AttributeError` on `None`.
        """
        # The stream is released before the body is finished, preserving the
        # original teardown order.
        if self.stream is not None:
            self.stream.__exit__()
            self.stream = None
        if self.body is not None:
            OutgoingBody.finish(self.body, None)
            self.body = None
class PollLoop(asyncio.AbstractEventLoop):
    """Custom `asyncio` event loop backed by `wasi:io/poll#poll`.

    Only the parts of the event-loop interface needed to drive coroutines
    that wait on WASI pollables are implemented (`run_until_complete`,
    `call_soon`, `create_task`, `create_future` and a few status helpers).
    Every other method raises `NotImplementedError`.
    """

    def __init__(self):
        # `(pollable, future)` pairs added by `register`; the future is
        # resolved once `poll.poll` reports the pollable as ready.
        self.wakers = []
        self.running = False
        # Callbacks scheduled through `call_soon`, in FIFO order.
        self.handles = []
        # Last exception reported through `call_exception_handler`.
        self.exception = None

    def get_debug(self):
        return False

    def run_until_complete(self, future):
        """Run the loop until `future` is done and return its result.

        `future` may be a coroutine or a future; it is wrapped with
        `asyncio.ensure_future`. Each iteration runs every callback that is
        currently scheduled, and only polls the registered pollables when no
        callback is left to run, so runnable work is never parked behind an
        I/O wait.

        Raises:
            RuntimeError: nothing is runnable and nothing is being waited on
                while `future` is still pending (the loop cannot progress).
            Exception: whatever was reported via `call_exception_handler`,
                or the exception stored in `future`.
        """
        future = asyncio.ensure_future(future, loop=self)

        self.running = True
        asyncio.events._set_running_loop(self)
        while self.running and not future.done():
            if not self.handles and not self.wakers:
                raise RuntimeError(
                    "PollLoop cannot make progress: no scheduled callbacks "
                    "and no registered pollables while the future is pending"
                )

            # Run the callbacks scheduled so far. Callbacks scheduled while
            # these run land in the fresh list and are handled next iteration.
            scheduled, self.handles = self.handles, []
            for handle in scheduled:
                if not handle._cancelled:
                    handle._run()

            # NOTE(review): `poll.poll` presumably blocks until at least one
            # pollable is ready (per wasi:io/poll), so it is only called when
            # no callback is waiting to run.
            if self.wakers and not self.handles:
                pollables = [pollable for pollable, _ in self.wakers]
                ready_indices = set(poll.poll(pollables))

                still_waiting = []
                for index, (pollable, waker) in enumerate(self.wakers):
                    if index in ready_indices:
                        pollable.__exit__()
                        # A cancelled waker is already done; resolving it
                        # again would raise InvalidStateError.
                        if not waker.done():
                            waker.set_result(None)
                    else:
                        still_waiting.append((pollable, waker))

                self.wakers = still_waiting

            if self.exception is not None:
                # Clear the stored exception so it is raised only once.
                exception, self.exception = self.exception, None
                raise exception

        return future.result()

    def is_running(self):
        return self.running

    def is_closed(self):
        # NOTE(review): this reports "closed" whenever the loop is not
        # running, including before the first run. Kept as-is.
        return not self.running

    def stop(self):
        self.running = False

    def close(self):
        self.running = False

    def shutdown_asyncgens(self):
        pass

    def call_exception_handler(self, context):
        # Remember the exception; `run_until_complete` re-raises it.
        self.exception = context.get('exception', None)

    def call_soon(self, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        self.handles.append(handle)
        return handle

    def create_task(self, coroutine, **kwargs):
        # Extra keywords (e.g. `name`, `context`) are forwarded so that
        # `asyncio.create_task` works with this loop.
        return asyncio.Task(coroutine, loop=self, **kwargs)

    def create_future(self):
        return asyncio.Future(loop=self)

    # The remaining methods should be irrelevant for our purposes and thus unimplemented

    def run_forever(self):
        raise NotImplementedError

    async def shutdown_default_executor(self):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        raise NotImplementedError

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        raise NotImplementedError

    async def connect_read_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
async def register(loop: PollLoop, pollable: Pollable):
    """Suspend the calling coroutine until `loop` resolves the waker for `pollable`.

    A fresh future is paired with `pollable` and appended to `loop.wakers`;
    `PollLoop.run_until_complete` resolves that future once `poll.poll`
    reports the pollable as ready.
    """
    wakeup = loop.create_future()
    entry = (pollable, wakeup)
    loop.wakers.append(entry)
    await wakeup
Functions
async def register(loop: PollLoop, pollable: Pollable)
-
Expand source code
async def register(loop: PollLoop, pollable: Pollable): waker = loop.create_future() loop.wakers.append((pollable, waker)) await waker
async def send(request: OutgoingRequest) ‑> IncomingResponse
-
Send the specified request and wait asynchronously for the response.
Expand source code
async def send(request: OutgoingRequest) -> IncomingResponse: """Send the specified request and wait asynchronously for the response.""" future = outgoing_handler.handle(request, None) while True: response = future.get() if response is None: await register(cast(PollLoop, asyncio.get_event_loop()), future.subscribe()) else: future.__exit__() if isinstance(response, Ok): if isinstance(response.value, Ok): return response.value.value else: raise response.value else: raise response
Classes
class PollLoop
-
Custom `asyncio` event loop backed by `wasi:io/poll#poll`.
Expand source code
class PollLoop(asyncio.AbstractEventLoop): """Custom `asyncio` event loop backed by `wasi:io/poll#poll`.""" def __init__(self): self.wakers = [] self.running = False self.handles = [] self.exception = None def get_debug(self): return False def run_until_complete(self, future): future = asyncio.ensure_future(future, loop=self) self.running = True asyncio.events._set_running_loop(self) while self.running and not future.done(): handle = self.handles[0] self.handles = self.handles[1:] if not handle._cancelled: handle._run() if self.wakers: [pollables, wakers] = list(map(list, zip(*self.wakers))) new_wakers = [] ready = [False] * len(pollables) for index in poll.poll(pollables): ready[index] = True for (ready, pollable), waker in zip(zip(ready, pollables), wakers): if ready: pollable.__exit__() waker.set_result(None) else: new_wakers.append((pollable, waker)) self.wakers = new_wakers if self.exception is not None: raise self.exception future.result() def is_running(self): return self.running def is_closed(self): return not self.running def stop(self): self.running = False def close(self): self.running = False def shutdown_asyncgens(self): pass def call_exception_handler(self, context): self.exception = context.get('exception', None) def call_soon(self, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) self.handles.append(handle) return handle def create_task(self, coroutine): return asyncio.Task(coroutine, loop=self) def create_future(self): return asyncio.Future(loop=self) # The remaining methods should be irrelevant for our purposes and thus unimplemented def run_forever(self): raise NotImplementedError async def shutdown_default_executor(self): raise NotImplementedError def _timer_handle_cancelled(self, handle): raise NotImplementedError def call_later(self, delay, callback, *args, context=None): raise NotImplementedError def call_at(self, when, callback, *args, context=None): raise NotImplementedError def time(self): raise 
NotImplementedError def call_soon_threadsafe(self, callback, *args, context=None): raise NotImplementedError def run_in_executor(self, executor, func, *args): raise NotImplementedError def set_default_executor(self, executor): raise NotImplementedError async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): raise NotImplementedError async def getnameinfo(self, sockaddr, flags=0): raise NotImplementedError async def create_connection( self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None): raise NotImplementedError async def create_server( self, protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True): raise NotImplementedError async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True): raise NotImplementedError async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None): raise NotImplementedError async def create_unix_connection( self, protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None): raise NotImplementedError async def create_unix_server( self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True): raise NotImplementedError async def connect_accepted_socket( self, protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None): raise NotImplementedError async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, 
flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None): raise NotImplementedError async def connect_read_pipe(self, protocol_factory, pipe): raise NotImplementedError async def connect_write_pipe(self, protocol_factory, pipe): raise NotImplementedError async def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs): raise NotImplementedError async def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs): raise NotImplementedError def add_reader(self, fd, callback, *args): raise NotImplementedError def remove_reader(self, fd): raise NotImplementedError def add_writer(self, fd, callback, *args): raise NotImplementedError def remove_writer(self, fd): raise NotImplementedError async def sock_recv(self, sock, nbytes): raise NotImplementedError async def sock_recv_into(self, sock, buf): raise NotImplementedError async def sock_recvfrom(self, sock, bufsize): raise NotImplementedError async def sock_recvfrom_into(self, sock, buf, nbytes=0): raise NotImplementedError async def sock_sendall(self, sock, data): raise NotImplementedError async def sock_sendto(self, sock, data, address): raise NotImplementedError async def sock_connect(self, sock, address): raise NotImplementedError async def sock_accept(self, sock): raise NotImplementedError async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None): raise NotImplementedError def add_signal_handler(self, sig, callback, *args): raise NotImplementedError def remove_signal_handler(self, sig): raise NotImplementedError def set_task_factory(self, factory): raise NotImplementedError def get_task_factory(self): raise NotImplementedError def get_exception_handler(self): raise NotImplementedError def set_exception_handler(self, handler): raise NotImplementedError def default_exception_handler(self, context): raise NotImplementedError def 
set_debug(self, enabled): raise NotImplementedError
Ancestors
- asyncio.events.AbstractEventLoop
Methods
def add_reader(self, fd, callback, *args)
-
Expand source code
def add_reader(self, fd, callback, *args): raise NotImplementedError
def add_signal_handler(self, sig, callback, *args)
-
Expand source code
def add_signal_handler(self, sig, callback, *args): raise NotImplementedError
def add_writer(self, fd, callback, *args)
-
Expand source code
def add_writer(self, fd, callback, *args): raise NotImplementedError
def call_at(self, when, callback, *args, context=None)
-
Expand source code
def call_at(self, when, callback, *args, context=None): raise NotImplementedError
def call_exception_handler(self, context)
-
Expand source code
def call_exception_handler(self, context): self.exception = context.get('exception', None)
def call_later(self, delay, callback, *args, context=None)
-
Expand source code
def call_later(self, delay, callback, *args, context=None): raise NotImplementedError
def call_soon(self, callback, *args, context=None)
-
Expand source code
def call_soon(self, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) self.handles.append(handle) return handle
def call_soon_threadsafe(self, callback, *args, context=None)
-
Expand source code
def call_soon_threadsafe(self, callback, *args, context=None): raise NotImplementedError
def close(self)
-
Close the loop.
The loop should not be running.
This is idempotent and irreversible.
No other methods should be called after this one.
Expand source code
def close(self): self.running = False
async def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
-
Handle an accepted connection.
This is used by servers that accept connections outside of asyncio, but use asyncio to handle connections.
This method is a coroutine. When completed, the coroutine returns a (transport, protocol) pair.
Expand source code
async def connect_accepted_socket( self, protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None): raise NotImplementedError
async def connect_read_pipe(self, protocol_factory, pipe)
-
Register read pipe in event loop. Set the pipe to non-blocking mode.
protocol_factory should instantiate object with Protocol interface. pipe is a file-like object. Return pair (transport, protocol), where transport supports the ReadTransport interface.
Expand source code
async def connect_read_pipe(self, protocol_factory, pipe): raise NotImplementedError
async def connect_write_pipe(self, protocol_factory, pipe)
-
Register write pipe in event loop.
protocol_factory should instantiate object with BaseProtocol interface. Pipe is file-like object already switched to nonblocking. Return pair (transport, protocol), where transport support WriteTransport interface.
Expand source code
async def connect_write_pipe(self, protocol_factory, pipe): raise NotImplementedError
async def create_connection(self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)
-
Expand source code
async def create_connection( self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None): raise NotImplementedError
async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)
-
A coroutine which creates a datagram endpoint.
This method will try to establish the endpoint in the background. When successful, the coroutine returns a (transport, protocol) pair.
protocol_factory must be a callable returning a protocol instance.
socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on host (or family if specified), socket type SOCK_DGRAM.
reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified it will automatically be set to True on UNIX.
reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows and some UNIX's. If the `socket.SO_REUSEPORT` constant is not defined then this capability is unsupported.
allow_broadcast tells the kernel to allow this endpoint to send messages to the broadcast address.
sock can optionally be specified in order to use a preexisting socket object.
Expand source code
async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None): raise NotImplementedError
def create_future(self)
-
Expand source code
def create_future(self): return asyncio.Future(loop=self)
async def create_server(self, protocol_factory, host=None, port=None, *, family=0, flags=1, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
-
A coroutine which creates a TCP server bound to host and port.
The return value is a Server object which can be used to stop the service.
If host is an empty string or None all interfaces are assumed and a list of multiple sockets will be returned (most likely one for IPv4 and another one for IPv6). The host parameter can also be a sequence (e.g. list) of hosts to bind to.
family can be set to either AF_INET or AF_INET6 to force the socket to use IPv4 or IPv6. If not set it will be determined from host (defaults to AF_UNSPEC).
flags is a bitmask for getaddrinfo().
sock can optionally be specified in order to use a preexisting socket object.
backlog is the maximum number of queued connections passed to listen() (defaults to 100).
ssl can be set to an SSLContext to enable SSL over the accepted connections.
reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified will automatically be set to True on UNIX.
reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows.
ssl_handshake_timeout is the time in seconds that an SSL server will wait for completion of the SSL handshake before aborting the connection. Default is 60s.
ssl_shutdown_timeout is the time in seconds that an SSL server will wait for completion of the SSL shutdown procedure before aborting the connection. Default is 30s.
start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.
Expand source code
async def create_server( self, protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True): raise NotImplementedError
def create_task(self, coroutine)
-
Expand source code
def create_task(self, coroutine): return asyncio.Task(coroutine, loop=self)
async def create_unix_connection(self, protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
-
Expand source code
async def create_unix_connection( self, protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None): raise NotImplementedError
async def create_unix_server(self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
-
A coroutine which creates a UNIX Domain Socket server.
The return value is a Server object, which can be used to stop the service.
path is a str, representing a file system path to bind the server socket to.
sock can optionally be specified in order to use a preexisting socket object.
backlog is the maximum number of queued connections passed to listen() (defaults to 100).
ssl can be set to an SSLContext to enable SSL over the accepted connections.
ssl_handshake_timeout is the time in seconds that an SSL server will wait for the SSL handshake to complete (defaults to 60s).
ssl_shutdown_timeout is the time in seconds that an SSL server will wait for the SSL shutdown to finish (defaults to 30s).
start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.
Expand source code
async def create_unix_server( self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True): raise NotImplementedError
def default_exception_handler(self, context)
-
Expand source code
def default_exception_handler(self, context): raise NotImplementedError
def get_debug(self)
-
Expand source code
def get_debug(self): return False
def get_exception_handler(self)
-
Expand source code
def get_exception_handler(self): raise NotImplementedError
def get_task_factory(self)
-
Expand source code
def get_task_factory(self): raise NotImplementedError
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0)
-
Expand source code
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): raise NotImplementedError
async def getnameinfo(self, sockaddr, flags=0)
-
Expand source code
async def getnameinfo(self, sockaddr, flags=0): raise NotImplementedError
def is_closed(self)
-
Returns True if the event loop was closed.
Expand source code
def is_closed(self): return not self.running
def is_running(self)
-
Return whether the event loop is currently running.
Expand source code
def is_running(self): return self.running
def remove_reader(self, fd)
-
Expand source code
def remove_reader(self, fd): raise NotImplementedError
def remove_signal_handler(self, sig)
-
Expand source code
def remove_signal_handler(self, sig): raise NotImplementedError
def remove_writer(self, fd)
-
Expand source code
def remove_writer(self, fd): raise NotImplementedError
def run_forever(self)
-
Run the event loop until stop() is called.
Expand source code
def run_forever(self): raise NotImplementedError
def run_in_executor(self, executor, func, *args)
-
Expand source code
def run_in_executor(self, executor, func, *args): raise NotImplementedError
def run_until_complete(self, future)
-
Run the event loop until a Future is done.
Return the Future's result, or raise its exception.
Expand source code
def run_until_complete(self, future): future = asyncio.ensure_future(future, loop=self) self.running = True asyncio.events._set_running_loop(self) while self.running and not future.done(): handle = self.handles[0] self.handles = self.handles[1:] if not handle._cancelled: handle._run() if self.wakers: [pollables, wakers] = list(map(list, zip(*self.wakers))) new_wakers = [] ready = [False] * len(pollables) for index in poll.poll(pollables): ready[index] = True for (ready, pollable), waker in zip(zip(ready, pollables), wakers): if ready: pollable.__exit__() waker.set_result(None) else: new_wakers.append((pollable, waker)) self.wakers = new_wakers if self.exception is not None: raise self.exception future.result()
async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True)
-
Send a file through a transport.
Return an amount of sent bytes.
Expand source code
async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True): raise NotImplementedError
def set_debug(self, enabled)
-
Expand source code
def set_debug(self, enabled): raise NotImplementedError
def set_default_executor(self, executor)
-
Expand source code
def set_default_executor(self, executor): raise NotImplementedError
def set_exception_handler(self, handler)
-
Expand source code
def set_exception_handler(self, handler): raise NotImplementedError
def set_task_factory(self, factory)
-
Expand source code
def set_task_factory(self, factory): raise NotImplementedError
def shutdown_asyncgens(self)
-
Shutdown all active asynchronous generators.
Expand source code
def shutdown_asyncgens(self): pass
async def shutdown_default_executor(self)
-
Schedule the shutdown of the default executor.
Expand source code
async def shutdown_default_executor(self): raise NotImplementedError
async def sock_accept(self, sock)
-
Expand source code
async def sock_accept(self, sock): raise NotImplementedError
async def sock_connect(self, sock, address)
-
Expand source code
async def sock_connect(self, sock, address): raise NotImplementedError
async def sock_recv(self, sock, nbytes)
-
Expand source code
async def sock_recv(self, sock, nbytes): raise NotImplementedError
async def sock_recv_into(self, sock, buf)
-
Expand source code
async def sock_recv_into(self, sock, buf): raise NotImplementedError
async def sock_recvfrom(self, sock, bufsize)
-
Expand source code
async def sock_recvfrom(self, sock, bufsize): raise NotImplementedError
async def sock_recvfrom_into(self, sock, buf, nbytes=0)
-
Expand source code
async def sock_recvfrom_into(self, sock, buf, nbytes=0): raise NotImplementedError
async def sock_sendall(self, sock, data)
-
Expand source code
async def sock_sendall(self, sock, data): raise NotImplementedError
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None)
-
Expand source code
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None): raise NotImplementedError
async def sock_sendto(self, sock, data, address)
-
Expand source code
async def sock_sendto(self, sock, data, address): raise NotImplementedError
async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
-
Upgrade a transport to TLS.
Return a new transport that protocol should start using immediately.
Expand source code
async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None): raise NotImplementedError
def stop(self)
-
Stop the event loop as soon as reasonable.
Exactly how soon that is may depend on the implementation, but no more I/O callbacks should be scheduled.
Expand source code
def stop(self): self.running = False
async def subprocess_exec(self, protocol_factory, *args, stdin=-1, stdout=-1, stderr=-1, **kwargs)
-
Expand source code
async def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs): raise NotImplementedError
async def subprocess_shell(self, protocol_factory, cmd, *, stdin=-1, stdout=-1, stderr=-1, **kwargs)
-
Expand source code
async def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs): raise NotImplementedError
def time(self)
-
Expand source code
def time(self): raise NotImplementedError
class Sink (body: OutgoingBody)
-
Writer abstraction over `wasi:http/types#outgoing-body`.
Expand source code
class Sink: """Writer abstraction over `wasi-http/types#outgoing-body`.""" def __init__(self, body: OutgoingBody): self.body = body self.stream = body.write() async def send(self, chunk: bytes): """Write the specified bytes to the sink. This may need to yield according to the backpressure requirements of the sink. """ offset = 0 flushing = False while True: count = self.stream.check_write() if count == 0: await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe()) elif offset == len(chunk): if flushing: return else: self.stream.flush() flushing = True else: count = min(count, len(chunk) - offset) self.stream.write(chunk[offset:offset+count]) offset += count def close(self): """Close the stream, indicating no further data will be written.""" self.stream.__exit__() self.stream = None OutgoingBody.finish(self.body, None) self.body = None
Methods
def close(self)
-
Close the stream, indicating no further data will be written.
Expand source code
def close(self): """Close the stream, indicating no further data will be written.""" self.stream.__exit__() self.stream = None OutgoingBody.finish(self.body, None) self.body = None
async def send(self, chunk: bytes)
-
Write the specified bytes to the sink.
This may need to yield according to the backpressure requirements of the sink.
Expand source code
async def send(self, chunk: bytes): """Write the specified bytes to the sink. This may need to yield according to the backpressure requirements of the sink. """ offset = 0 flushing = False while True: count = self.stream.check_write() if count == 0: await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe()) elif offset == len(chunk): if flushing: return else: self.stream.flush() flushing = True else: count = min(count, len(chunk) - offset) self.stream.write(chunk[offset:offset+count]) offset += count
class Stream (body: IncomingBody)
-
Reader abstraction over `wasi:http/types#incoming-body`.
Expand source code
class Stream: """Reader abstraction over `wasi:http/types#incoming-body`.""" def __init__(self, body: IncomingBody): self.body: Optional[IncomingBody] = body self.stream: Optional[InputStream] = body.stream() async def next(self) -> Optional[bytes]: """Wait for the next chunk of data to arrive on the stream. This will return `None` when the end of the stream has been reached. """ while True: try: if self.stream is None: return None else: buffer = self.stream.read(READ_SIZE) if len(buffer) == 0: await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe()) else: return buffer except Err as e: if isinstance(e.value, StreamErrorClosed): if self.stream is not None: self.stream.__exit__() self.stream = None if self.body is not None: IncomingBody.finish(self.body) self.body = None else: raise e
Methods
async def next(self) ‑> Optional[bytes]
-
Wait for the next chunk of data to arrive on the stream.
This will return `None` when the end of the stream has been reached.
Expand source code
async def next(self) -> Optional[bytes]: """Wait for the next chunk of data to arrive on the stream. This will return `None` when the end of the stream has been reached. """ while True: try: if self.stream is None: return None else: buffer = self.stream.read(READ_SIZE) if len(buffer) == 0: await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe()) else: return buffer except Err as e: if isinstance(e.value, StreamErrorClosed): if self.stream is not None: self.stream.__exit__() self.stream = None if self.body is not None: IncomingBody.finish(self.body) self.body = None else: raise e