Module spin_sdk.http.poll_loop

Defines a custom asyncio event loop backed by wasi:io/poll#poll.

This also includes helper classes and functions for working with wasi:http.

As of WASI Preview 2, there is not yet a standard for first-class, composable asynchronous functions and streams. We expect that little or none of this boilerplate will be needed once those features arrive in Preview 3.

Expand source code
"""Defines a custom `asyncio` event loop backed by `wasi:io/poll#poll`.

This also includes helper classes and functions for working with `wasi:http`.

As of WASI Preview 2, there is not yet a standard for first-class, composable
asynchronous functions and streams.  We expect that little or none of this
boilerplate will be needed once those features arrive in Preview 3.
"""

import asyncio
import socket
import subprocess

from spin_sdk.wit.types import Ok, Err
from spin_sdk.wit.imports import types, streams, poll, outgoing_handler
from spin_sdk.wit.imports.types import IncomingBody, OutgoingBody, OutgoingRequest, IncomingResponse
from spin_sdk.wit.imports.streams import StreamError_Closed, InputStream
from spin_sdk.wit.imports.poll import Pollable
from typing import Optional, cast

# Maximum number of bytes to read at a time
READ_SIZE: int = 16 * 1024

async def send(request: OutgoingRequest) -> IncomingResponse:
    """Send the specified request and wait asynchronously for the response.

    The request is handed to `outgoing_handler.handle`, and the returned
    future is queried until it yields a value.  While no value is available
    the coroutine suspends on the future's pollable via `register`.

    Returns the inner value when both the outer and the inner results are
    `Ok`; otherwise the first non-`Ok` result encountered is raised.
    """
    future = outgoing_handler.handle(request, None)

    while True:
        response = future.get()
        if response is None:
            # Not ready yet: park until the loop reports the pollable as
            # ready, then query the future again.
            await register(cast(PollLoop, asyncio.get_event_loop()), future.subscribe())
            continue

        # Only inspect the result once one has actually been produced.
        if not isinstance(response, Ok):
            raise response
        if not isinstance(response.value, Ok):
            raise response.value
        return response.value.value

class Stream:
    """Reader abstraction over `wasi:http/types#incoming-body`."""

    def __init__(self, body: IncomingBody):
        # Both handles are reset to `None` once the stream reports closure.
        self.body: Optional[IncomingBody] = body
        self.stream: Optional[InputStream] = body.stream()

    async def next(self) -> Optional[bytes]:
        """Wait for the next chunk of data to arrive on the stream.

        At most `READ_SIZE` bytes are requested per read.  An empty read means
        no data is available yet, so the coroutine suspends on the stream's
        pollable and retries.

        This will return `None` when the end of the stream has been reached.
        Any `Err` other than `StreamError_Closed` is re-raised.
        """
        while True:
            try:
                if self.stream is None:
                    return None
                buffer = self.stream.read(READ_SIZE)
                if len(buffer) == 0:
                    await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
                else:
                    return buffer
            except Err as e:
                if isinstance(e.value, StreamError_Closed):
                    # End of stream: forget both handles so the next loop
                    # iteration (and every later call) returns `None`.
                    # NOTE(review): the handles may also need an explicit
                    # release/finish call here -- confirm against the bindings.
                    if self.stream is not None:
                        self.stream = None
                    if self.body is not None:
                        self.body = None
                else:
                    raise e

class Sink:
    """Writer abstraction over `wasi-http/types#outgoing-body`."""

    def __init__(self, body: OutgoingBody):
        self.body = body
        self.stream = body.write()

    async def send(self, chunk: bytes):
        """Write the specified bytes to the sink.

        This may need to yield according to the backpressure requirements of the sink.
        The coroutine returns only after the whole chunk has been written and a
        flush has been requested and accepted.
        """
        offset = 0
        flushing = False
        while True:
            count = self.stream.check_write()
            if count == 0:
                # No write budget right now: wait until the stream is ready.
                await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
            elif offset == len(chunk):
                # Everything is written; flush once, then wait for the stream
                # to accept writes again before reporting completion.
                if flushing:
                    return
                else:
                    self.stream.flush()
                    flushing = True
            else:
                count = min(count, len(chunk) - offset)
                self.stream.write(chunk[offset:offset + count])
                offset += count

    def close(self):
        """Close the stream, indicating no further data will be written."""
        # Let go of the output stream before finishing the body it belongs to.
        # NOTE(review): the stream handle may also need an explicit release
        # call before `finish` -- confirm against the bindings.
        self.stream = None
        OutgoingBody.finish(self.body, None)
        self.body = None
class PollLoop(asyncio.AbstractEventLoop):
    """Custom `asyncio` event loop backed by `wasi:io/poll#poll`.

    Callbacks scheduled with `call_soon` are queued in `handles` and run in
    FIFO order by `run_until_complete`.  Pending I/O waits live in `wakers` as
    `(pollable, future)` pairs; a pair's future is resolved once `poll.poll`
    reports its pollable as ready.
    """

    def __init__(self):
        self.wakers = []        # (pollable, future) pairs awaiting readiness
        self.running = False    # set by run_until_complete, cleared by stop/close
        self.handles = []       # FIFO queue of asyncio.Handle callbacks
        self.exception = None   # last exception passed to call_exception_handler

    def get_debug(self):
        """Return `False`; this loop never reports debug mode."""
        return False

    def run_until_complete(self, future):
        """Run the loop until `future` is done and return its result.

        Each iteration runs one queued handle (unless it was cancelled), then,
        if any wakers are registered, blocks in `poll.poll` and resolves the
        wakers whose pollables are ready.  An exception recorded through
        `call_exception_handler` is re-raised; a failed future raises from
        `future.result()`.
        """
        future = asyncio.ensure_future(future, loop=self)

        self.running = True
        # Advertise this loop as the running one so that coroutines calling
        # `asyncio.get_event_loop()` get this instance back.
        previous_loop = asyncio.events._get_running_loop()
        asyncio.events._set_running_loop(self)
        try:
            while self.running and not future.done():
                if self.handles:
                    handle = self.handles[0]
                    self.handles = self.handles[1:]
                    if not handle._cancelled:
                        handle._run()
                elif not self.wakers:
                    # Nothing to run and nothing to wait for: the future can
                    # never complete, so fail loudly instead of spinning.
                    raise RuntimeError("PollLoop has nothing left to run or poll, but the future is not done")

                if self.wakers:
                    [pollables, wakers] = list(map(list, zip(*self.wakers)))

                    new_wakers = []
                    ready = [False] * len(pollables)
                    for index in poll.poll(pollables):
                        ready[index] = True

                    for is_ready, pollable, waker in zip(ready, pollables, wakers):
                        if is_ready:
                            # NOTE(review): the pollable handle may need an
                            # explicit release here -- confirm against the bindings.
                            if not waker.done():
                                waker.set_result(None)
                        else:
                            new_wakers.append((pollable, waker))

                    self.wakers = new_wakers

                if self.exception is not None:
                    raise self.exception
        finally:
            asyncio.events._set_running_loop(previous_loop)

        return future.result()

    def is_running(self):
        """Return the current value of `self.running`."""
        return self.running

    def is_closed(self):
        """Return `True` whenever `self.running` is false."""
        return not self.running

    def stop(self):
        """Clear `self.running` so `run_until_complete` exits its loop."""
        self.running = False

    def close(self):
        """Clear `self.running`."""
        self.running = False

    def shutdown_asyncgens(self):
        """Do nothing; asynchronous generators are not tracked by this loop."""
        pass

    def call_exception_handler(self, context):
        """Record `context['exception']` (or `None`) for `run_until_complete` to raise."""
        self.exception = context.get('exception', None)

    def call_soon(self, callback, *args, context=None):
        """Queue `callback(*args)` to run on a later loop iteration and return its handle."""
        handle = asyncio.Handle(callback, args, self, context)
        self.handles.append(handle)
        return handle

    def create_task(self, coroutine):
        """Wrap `coroutine` in an `asyncio.Task` bound to this loop."""
        return asyncio.Task(coroutine, loop=self)

    def create_future(self):
        """Return a new `asyncio.Future` bound to this loop."""
        return asyncio.Future(loop=self)

    # The remaining methods should be irrelevant for our purposes and thus unimplemented

    def run_forever(self):
        raise NotImplementedError

    async def shutdown_default_executor(self):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        raise NotImplementedError

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        raise NotImplementedError

    async def connect_read_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError

async def register(loop: PollLoop, pollable: Pollable):
    """Suspend the caller until `loop` resolves the waker paired with `pollable`.

    A fresh future is created on `loop`, queued in `loop.wakers` together
    with `pollable`, and then awaited.
    """
    pending = loop.create_future()
    entry = (pollable, pending)
    loop.wakers.append(entry)
    await pending


async def register(loop: PollLoop, pollable: Pollable)
Expand source code
async def register(loop: PollLoop, pollable: Pollable):
    """Suspend the caller until `loop` resolves the waker paired with `pollable`.

    A fresh future is created on `loop`, queued in `loop.wakers` together
    with `pollable`, and then awaited.
    """
    pending = loop.create_future()
    entry = (pollable, pending)
    loop.wakers.append(entry)
    await pending
async def send(request: OutgoingRequest) ‑> IncomingResponse

Send the specified request and wait asynchronously for the response.

Expand source code
async def send(request: OutgoingRequest) -> IncomingResponse:
    """Send the specified request and wait asynchronously for the response.

    The request is handed to `outgoing_handler.handle`, and the returned
    future is queried until it yields a value.  While no value is available
    the coroutine suspends on the future's pollable via `register`.

    Returns the inner value when both the outer and the inner results are
    `Ok`; otherwise the first non-`Ok` result encountered is raised.
    """
    future = outgoing_handler.handle(request, None)

    while True:
        response = future.get()
        if response is None:
            # Not ready yet: park until the loop reports the pollable as
            # ready, then query the future again.
            await register(cast(PollLoop, asyncio.get_event_loop()), future.subscribe())
            continue

        # Only inspect the result once one has actually been produced.
        if not isinstance(response, Ok):
            raise response
        if not isinstance(response.value, Ok):
            raise response.value
        return response.value.value


class PollLoop

Custom asyncio event loop backed by wasi:io/poll#poll.

Expand source code
class PollLoop(asyncio.AbstractEventLoop):
    """Custom `asyncio` event loop backed by `wasi:io/poll#poll`.

    Callbacks scheduled with `call_soon` are queued in `handles` and run in
    FIFO order by `run_until_complete`.  Pending I/O waits live in `wakers` as
    `(pollable, future)` pairs; a pair's future is resolved once `poll.poll`
    reports its pollable as ready.
    """

    def __init__(self):
        self.wakers = []        # (pollable, future) pairs awaiting readiness
        self.running = False    # set by run_until_complete, cleared by stop/close
        self.handles = []       # FIFO queue of asyncio.Handle callbacks
        self.exception = None   # last exception passed to call_exception_handler

    def get_debug(self):
        """Return `False`; this loop never reports debug mode."""
        return False

    def run_until_complete(self, future):
        """Run the loop until `future` is done and return its result.

        Each iteration runs one queued handle (unless it was cancelled), then,
        if any wakers are registered, blocks in `poll.poll` and resolves the
        wakers whose pollables are ready.  An exception recorded through
        `call_exception_handler` is re-raised; a failed future raises from
        `future.result()`.
        """
        future = asyncio.ensure_future(future, loop=self)

        self.running = True
        # Advertise this loop as the running one so that coroutines calling
        # `asyncio.get_event_loop()` get this instance back.
        previous_loop = asyncio.events._get_running_loop()
        asyncio.events._set_running_loop(self)
        try:
            while self.running and not future.done():
                if self.handles:
                    handle = self.handles[0]
                    self.handles = self.handles[1:]
                    if not handle._cancelled:
                        handle._run()
                elif not self.wakers:
                    # Nothing to run and nothing to wait for: the future can
                    # never complete, so fail loudly instead of spinning.
                    raise RuntimeError("PollLoop has nothing left to run or poll, but the future is not done")

                if self.wakers:
                    [pollables, wakers] = list(map(list, zip(*self.wakers)))

                    new_wakers = []
                    ready = [False] * len(pollables)
                    for index in poll.poll(pollables):
                        ready[index] = True

                    for is_ready, pollable, waker in zip(ready, pollables, wakers):
                        if is_ready:
                            # NOTE(review): the pollable handle may need an
                            # explicit release here -- confirm against the bindings.
                            if not waker.done():
                                waker.set_result(None)
                        else:
                            new_wakers.append((pollable, waker))

                    self.wakers = new_wakers

                if self.exception is not None:
                    raise self.exception
        finally:
            asyncio.events._set_running_loop(previous_loop)

        return future.result()

    def is_running(self):
        """Return the current value of `self.running`."""
        return self.running

    def is_closed(self):
        """Return `True` whenever `self.running` is false."""
        return not self.running

    def stop(self):
        """Clear `self.running` so `run_until_complete` exits its loop."""
        self.running = False

    def close(self):
        """Clear `self.running`."""
        self.running = False

    def shutdown_asyncgens(self):
        """Do nothing; asynchronous generators are not tracked by this loop."""
        pass

    def call_exception_handler(self, context):
        """Record `context['exception']` (or `None`) for `run_until_complete` to raise."""
        self.exception = context.get('exception', None)

    def call_soon(self, callback, *args, context=None):
        """Queue `callback(*args)` to run on a later loop iteration and return its handle."""
        handle = asyncio.Handle(callback, args, self, context)
        self.handles.append(handle)
        return handle

    def create_task(self, coroutine):
        """Wrap `coroutine` in an `asyncio.Task` bound to this loop."""
        return asyncio.Task(coroutine, loop=self)

    def create_future(self):
        """Return a new `asyncio.Future` bound to this loop."""
        return asyncio.Future(loop=self)

    # The remaining methods should be irrelevant for our purposes and thus unimplemented

    def run_forever(self):
        raise NotImplementedError

    async def shutdown_default_executor(self):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        raise NotImplementedError

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        raise NotImplementedError

    async def connect_read_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError




def add_reader(self, fd, callback, *args)
Expand source code
def add_reader(self, fd, callback, *args):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def add_signal_handler(self, sig, callback, *args)
Expand source code
def add_signal_handler(self, sig, callback, *args):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def add_writer(self, fd, callback, *args)
Expand source code
def add_writer(self, fd, callback, *args):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def call_at(self, when, callback, *args, context=None)
Expand source code
def call_at(self, when, callback, *args, context=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def call_exception_handler(self, context)
Expand source code
def call_exception_handler(self, context):
    """Store `context['exception']` (or `None` if absent) in `self.exception`."""
    self.exception = context.get('exception', None)
def call_later(self, delay, callback, *args, context=None)
Expand source code
def call_later(self, delay, callback, *args, context=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def call_soon(self, callback, *args, context=None)
Expand source code
def call_soon(self, callback, *args, context=None):
    """Queue `callback(*args)` on `self.handles` and return its `asyncio.Handle`.

    The handle must be appended to the queue; otherwise the loop never sees
    the callback and nothing scheduled through it would ever run.
    """
    handle = asyncio.Handle(callback, args, self, context)
    self.handles.append(handle)
    return handle
def call_soon_threadsafe(self, callback, *args, context=None)
Expand source code
def call_soon_threadsafe(self, callback, *args, context=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def close(self)

Close the loop.

The loop should not be running.

This is idempotent and irreversible.

No other methods should be called after this one.

Expand source code
def close(self):
    """Mark the loop as not running by clearing `self.running`."""
    self.running = False
async def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Handle an accepted connection.

This is used by servers that accept connections outside of asyncio, but use asyncio to handle connections.

This method is a coroutine. When completed, the coroutine returns a (transport, protocol) pair.

Expand source code
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def connect_read_pipe(self, protocol_factory, pipe)

Register read pipe in event loop. Set the pipe to non-blocking mode.

protocol_factory should instantiate object with Protocol interface. pipe is a file-like object. Return pair (transport, protocol), where transport supports the ReadTransport interface.

Expand source code
async def connect_read_pipe(self, protocol_factory, pipe):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def connect_write_pipe(self, protocol_factory, pipe)

Register write pipe in event loop.

protocol_factory should instantiate object with BaseProtocol interface. Pipe is file-like object already switched to nonblocking. Return pair (transport, protocol), where transport support WriteTransport interface.

Expand source code
async def connect_write_pipe(self, protocol_factory, pipe):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def create_connection(self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)
Expand source code
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0, proto=0,
        flags=0, sock=None, local_addr=None,
        server_hostname=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        happy_eyeballs_delay=None, interleave=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)

A coroutine which creates a datagram endpoint.

This method will try to establish the endpoint in the background. When successful, the coroutine returns a (transport, protocol) pair.

protocol_factory must be a callable returning a protocol instance.

socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on host (or family if specified), socket type SOCK_DGRAM.

reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified it will automatically be set to True on UNIX.

reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows and some UNIX's. If the :py:data:~socket.SO_REUSEPORT constant is not defined then this capability is unsupported.

allow_broadcast tells the kernel to allow this endpoint to send messages to the broadcast address.

sock can optionally be specified in order to use a preexisting socket object.

Expand source code
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=None, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def create_future(self)
Expand source code
def create_future(self):
    """Return a new `asyncio.Future` bound to this loop."""
    return asyncio.Future(loop=self)
async def create_server(self, protocol_factory, host=None, port=None, *, family=0, flags=1, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

A coroutine which creates a TCP server bound to host and port.

The return value is a Server object which can be used to stop the service.

If host is an empty string or None all interfaces are assumed and a list of multiple sockets will be returned (most likely one for IPv4 and another one for IPv6). The host parameter can also be a sequence (e.g. list) of hosts to bind to.

family can be set to either AF_INET or AF_INET6 to force the socket to use IPv4 or IPv6. If not set it will be determined from host (defaults to AF_UNSPEC).

flags is a bitmask for getaddrinfo().

sock can optionally be specified in order to use a preexisting socket object.

backlog is the maximum number of queued connections passed to listen() (defaults to 100).

ssl can be set to an SSLContext to enable SSL over the accepted connections.

reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified will automatically be set to True on UNIX.

reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows.

ssl_handshake_timeout is the time in seconds that an SSL server will wait for completion of the SSL handshake before aborting the connection. Default is 60s.

ssl_shutdown_timeout is the time in seconds that an SSL server will wait for completion of the SSL shutdown procedure before aborting the connection. Default is 30s.

start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.

Expand source code
async def create_server(
        self, protocol_factory, host=None, port=None,
        *, family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE, sock=None, backlog=100,
        ssl=None, reuse_address=None, reuse_port=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        start_serving=True):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def create_task(self, coroutine)
Expand source code
def create_task(self, coroutine):
    """Wrap `coroutine` in an `asyncio.Task` bound to this loop and return it."""
    return asyncio.Task(coroutine, loop=self)
async def create_unix_connection(self, protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
Expand source code
async def create_unix_connection(
        self, protocol_factory, path=None, *,
        ssl=None, sock=None,
        server_hostname=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def create_unix_server(self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

A coroutine which creates a UNIX Domain Socket server.

The return value is a Server object, which can be used to stop the service.

path is a str, representing a file system path to bind the server socket to.

sock can optionally be specified in order to use a preexisting socket object.

backlog is the maximum number of queued connections passed to listen() (defaults to 100).

ssl can be set to an SSLContext to enable SSL over the accepted connections.

ssl_handshake_timeout is the time in seconds that an SSL server will wait for the SSL handshake to complete (defaults to 60s).

ssl_shutdown_timeout is the time in seconds that an SSL server will wait for the SSL shutdown to finish (defaults to 30s).

start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.

Expand source code
async def create_unix_server(
        self, protocol_factory, path=None, *,
        sock=None, backlog=100, ssl=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        start_serving=True):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def default_exception_handler(self, context)
Expand source code
def default_exception_handler(self, context):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def get_debug(self)
Expand source code
def get_debug(self):
    """Return `False`; this loop never reports debug mode."""
    return False
def get_exception_handler(self)
Expand source code
def get_exception_handler(self):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def get_task_factory(self)
Expand source code
def get_task_factory(self):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0)
Expand source code
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def getnameinfo(self, sockaddr, flags=0)
Expand source code
async def getnameinfo(self, sockaddr, flags=0):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def is_closed(self)

Returns True if the event loop was closed.

Expand source code
def is_closed(self):
    """Return `True` whenever `self.running` is false."""
    return not self.running
def is_running(self)

Return whether the event loop is currently running.

Expand source code
def is_running(self):
    """Return the current value of `self.running`."""
    return self.running
def remove_reader(self, fd)
Expand source code
def remove_reader(self, fd):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def remove_signal_handler(self, sig)
Expand source code
def remove_signal_handler(self, sig):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def remove_writer(self, fd)
Expand source code
def remove_writer(self, fd):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def run_forever(self)

Run the event loop until stop() is called.

Expand source code
def run_forever(self):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def run_in_executor(self, executor, func, *args)
Expand source code
def run_in_executor(self, executor, func, *args):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def run_until_complete(self, future)

Run the event loop until a Future is done.

Return the Future's result, or raise its exception.

Expand source code
def run_until_complete(self, future):
    """Run the loop until `future` is done and return its result.

    Each iteration runs one queued handle (unless it was cancelled), then,
    if any wakers are registered, blocks in `poll.poll` and resolves the
    wakers whose pollables are ready.  An exception recorded in
    `self.exception` is re-raised; a failed future raises from
    `future.result()`.
    """
    future = asyncio.ensure_future(future, loop=self)

    self.running = True
    # Advertise this loop as the running one so that coroutines calling
    # `asyncio.get_event_loop()` get this instance back.
    previous_loop = asyncio.events._get_running_loop()
    asyncio.events._set_running_loop(self)
    try:
        while self.running and not future.done():
            if self.handles:
                handle = self.handles[0]
                self.handles = self.handles[1:]
                if not handle._cancelled:
                    handle._run()
            elif not self.wakers:
                # Nothing to run and nothing to wait for: the future can
                # never complete, so fail loudly instead of spinning.
                raise RuntimeError("PollLoop has nothing left to run or poll, but the future is not done")

            if self.wakers:
                [pollables, wakers] = list(map(list, zip(*self.wakers)))

                new_wakers = []
                ready = [False] * len(pollables)
                for index in poll.poll(pollables):
                    ready[index] = True

                for is_ready, pollable, waker in zip(ready, pollables, wakers):
                    if is_ready:
                        # NOTE(review): the pollable handle may need an
                        # explicit release here -- confirm against the bindings.
                        if not waker.done():
                            waker.set_result(None)
                    else:
                        new_wakers.append((pollable, waker))

                self.wakers = new_wakers

            if self.exception is not None:
                raise self.exception
    finally:
        asyncio.events._set_running_loop(previous_loop)

    return future.result()
async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True)

Send a file through a transport.

Return an amount of sent bytes.

Expand source code
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def set_debug(self, enabled)
Expand source code
def set_debug(self, enabled):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def set_default_executor(self, executor)
Expand source code
def set_default_executor(self, executor):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def set_exception_handler(self, handler)
Expand source code
def set_exception_handler(self, handler):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def set_task_factory(self, factory)
Expand source code
def set_task_factory(self, factory):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
def shutdown_asyncgens(self)

Shutdown all active asynchronous generators.

Expand source code
def shutdown_asyncgens(self):
    """Do nothing; asynchronous generators are not tracked by this loop."""
    pass
async def shutdown_default_executor(self)

Schedule the shutdown of the default executor.

Expand source code
async def shutdown_default_executor(self):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def sock_accept(self, sock)
Expand source code
async def sock_accept(self, sock):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def sock_connect(self, sock, address)
Expand source code
async def sock_connect(self, sock, address):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def sock_recv(self, sock, nbytes)
Expand source code
async def sock_recv(self, sock, nbytes):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def sock_recv_into(self, sock, buf)
Expand source code
async def sock_recv_into(self, sock, buf):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def sock_recvfrom(self, sock, bufsize)
Expand source code
async def sock_recvfrom(self, sock, bufsize):
    """Not implemented by this event loop; always raises `NotImplementedError`."""
    raise NotImplementedError
async def sock_recvfrom_into(self, sock, buf, nbytes=0)
Expand source code
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
    """Not implemented by this event loop.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError()
async def sock_sendall(self, sock, data)
Expand source code
async def sock_sendall(self, sock, data):
    """Not implemented by this event loop.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError()
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None)
Expand source code
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None):
    """Not implemented by this event loop.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError()
async def sock_sendto(self, sock, data, address)
Expand source code
async def sock_sendto(self, sock, data, address):
    """Not implemented by this event loop.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError()
async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Upgrade a transport to TLS.

Return a new transport that protocol should start using immediately.

Expand source code
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None,
                    ssl_shutdown_timeout=None):
    """Upgrade a transport to TLS (not implemented by this event loop).

    The keyword-only parameters and their defaults follow the signature shown
    in this method's documentation heading; the listing itself had lost
    everything after `*,`, leaving an unterminated `def`.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError
def stop(self)

Stop the event loop as soon as reasonable.

Exactly how soon that is may depend on the implementation, but no more I/O callbacks should be scheduled.

Expand source code
def stop(self):
    """Stop the event loop as soon as reasonable.

    This only clears the `running` flag on the loop; it returns `None`.
    """
    # NOTE(review): presumably the run loop checks this flag -- confirm.
    self.running = False
async def subprocess_exec(self, protocol_factory, *args, stdin=-1, stdout=-1, stderr=-1, **kwargs)
Expand source code
async def subprocess_exec(self, protocol_factory, *args,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          **kwargs):
    """Not implemented by this event loop.

    The stdio keyword defaults are `subprocess.PIPE`, which is the `-1` shown
    in this method's documentation heading; the listing itself had lost the
    rest of the signature after `*args,`.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError
async def subprocess_shell(self, protocol_factory, cmd, *, stdin=-1, stdout=-1, stderr=-1, **kwargs)
Expand source code
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           **kwargs):
    """Not implemented by this event loop.

    The stdio keyword defaults are `subprocess.PIPE`, which is the `-1` shown
    in this method's documentation heading; the listing itself had lost the
    rest of the signature after `cmd, *,`.

    Always raises `NotImplementedError` once the coroutine is run.
    """
    raise NotImplementedError
def time(self)
Expand source code
def time(self):
    """Not implemented by this event loop.

    Always raises `NotImplementedError`.
    """
    raise NotImplementedError()
class Sink (body: OutgoingBody)

Writer abstraction over wasi-http/types#outgoing-body.

Expand source code
class Sink:
    """Writer abstraction over `wasi-http/types#outgoing-body`.

    NOTE(review): this listing had lost every reference to the `stream`
    attribute plus several bare `else:`/`return` lines.  The stream calls
    below (`check_write`, `subscribe`, `flush`, `write`) follow the
    `wasi:io/streams` output-stream interface -- confirm against upstream.
    """

    def __init__(self, body: OutgoingBody):
        # Keep the body itself so `close` can finish it, and write through
        # the output stream obtained from it.
        self.body = body
        self.stream = body.write()

    async def send(self, chunk: bytes):
        """Write the specified bytes to the sink.

        This may need to yield according to the backpressure requirements of the sink.
        """
        offset = 0
        flushing = False
        while True:
            # Number of bytes the stream will currently accept.
            count = self.stream.check_write()
            if count == 0:
                # No capacity right now: wait until the stream is ready again.
                await register(cast(PollLoop, asyncio.get_event_loop()),
                               self.stream.subscribe())
            elif offset == len(chunk):
                if flushing:
                    # The flush was requested and the stream is writable
                    # again, so everything has been handed off.
                    return
                self.stream.flush()
                flushing = True
            else:
                # Write at most what the stream accepts and what is left.
                count = min(count, len(chunk) - offset)
                self.stream.write(chunk[offset:offset + count])
                offset += count

    def close(self):
        """Close the stream, indicating no further data will be written."""
        # Drop the stream reference before finishing the body.
        self.stream = None
        OutgoingBody.finish(self.body, None)
        self.body = None


def close(self)

Close the stream, indicating no further data will be written.

Expand source code
def close(self):
    """Close the stream, indicating no further data will be written."""
    # NOTE(review): the listing read `"""...""" = None`, i.e. the assignment
    # target was lost; restored as the `stream` attribute -- confirm upstream.
    self.stream = None
    OutgoingBody.finish(self.body, None)
    self.body = None
async def send(self, chunk: bytes)

Write the specified bytes to the sink.

This may need to yield according to the backpressure requirements of the sink.

Expand source code
async def send(self, chunk: bytes):
    """Write the specified bytes to the sink.

    This may need to yield according to the backpressure requirements of the sink.

    NOTE(review): the listing had lost the `self.stream` calls and the bare
    `else:`/`return` lines.  The calls below (`check_write`, `subscribe`,
    `flush`, `write`) follow the `wasi:io/streams` output-stream interface --
    confirm against upstream.
    """
    offset = 0
    flushing = False
    while True:
        # Number of bytes the stream will currently accept.
        count = self.stream.check_write()
        if count == 0:
            # No capacity right now: wait until the stream is ready again.
            await register(cast(PollLoop, asyncio.get_event_loop()),
                           self.stream.subscribe())
        elif offset == len(chunk):
            if flushing:
                # The flush was requested and the stream is writable again,
                # so everything has been handed off.
                return
            self.stream.flush()
            flushing = True
        else:
            # Write at most what the stream accepts and what is left.
            count = min(count, len(chunk) - offset)
            self.stream.write(chunk[offset:offset + count])
            offset += count
class Stream (body: IncomingBody)

Reader abstraction over wasi:http/types#incoming-body.

Expand source code
class Stream:
    """Reader abstraction over `wasi:http/types#incoming-body`.

    NOTE(review): this listing had lost every reference to the `stream`
    attribute plus the bare `try:`/`else:` lines.  The stream calls below
    (`body.stream()`, `read`, `subscribe`) follow the `wasi:http/types` and
    `wasi:io/streams` interfaces -- confirm against upstream.
    """

    def __init__(self, body: IncomingBody):
        # Both attributes become `None` once the end of the stream is reached.
        self.body: Optional[IncomingBody] = body
        self.stream: Optional[InputStream] = body.stream()

    async def next(self) -> Optional[bytes]:
        """Wait for the next chunk of data to arrive on the stream.

        This will return `None` when the end of the stream has been reached.
        """
        while True:
            try:
                if self.stream is None:
                    return None
                buffer = self.stream.read(READ_SIZE)
                if len(buffer) == 0:
                    # Nothing available yet: wait until the stream is ready.
                    await register(cast(PollLoop, asyncio.get_event_loop()),
                                   self.stream.subscribe())
                else:
                    return buffer
            except Err as e:
                if isinstance(e.value, StreamError_Closed):
                    # End of stream: clear both handles so the next loop
                    # iteration returns `None`.
                    # NOTE(review): the listing may also have dropped explicit
                    # resource-release calls here (for the stream and/or the
                    # incoming body) -- confirm against upstream.
                    if self.stream is not None:
                        self.stream = None
                    if self.body is not None:
                        self.body = None
                else:
                    raise e


async def next(self) ‑> Optional[bytes]

Wait for the next chunk of data to arrive on the stream.

This will return None when the end of the stream has been reached.

Expand source code
async def next(self) -> Optional[bytes]:
    """Wait for the next chunk of data to arrive on the stream.

    This will return `None` when the end of the stream has been reached.

    NOTE(review): the listing had lost the `self.stream` references and the
    bare `try:`/`else:` lines.  The calls below (`read`, `subscribe`) follow
    the `wasi:io/streams` input-stream interface -- confirm against upstream.
    """
    while True:
        try:
            if self.stream is None:
                return None
            buffer = self.stream.read(READ_SIZE)
            if len(buffer) == 0:
                # Nothing available yet: wait until the stream is ready.
                await register(cast(PollLoop, asyncio.get_event_loop()),
                               self.stream.subscribe())
            else:
                return buffer
        except Err as e:
            if isinstance(e.value, StreamError_Closed):
                # End of stream: clear both handles so the next loop
                # iteration returns `None`.
                # NOTE(review): the listing may also have dropped explicit
                # resource-release calls here (for the stream and/or the
                # incoming body) -- confirm against upstream.
                if self.stream is not None:
                    self.stream = None
                if self.body is not None:
                    self.body = None
            else:
                raise e