Module spin_sdk.wit.imports.mqtt

Expand source code
from typing import TypeVar, Generic, Union, Optional, Protocol, Tuple, List, Any, Self
from enum import Flag, Enum, auto
from dataclasses import dataclass
from abc import abstractmethod
import weakref

from ..types import Result, Ok, Err, Some



@dataclass
class Error_InvalidAddress:
    """
    Payload-free member of the `Error` union.

    NOTE(review): going by the name, this presumably reports that the
    address passed to `Connection.open` was not valid -- confirm against
    the interface definition.
    """
    pass


@dataclass
class Error_TooManyConnections:
    """
    Payload-free member of the `Error` union.

    NOTE(review): going by the name, this presumably reports that a
    connection limit was reached -- confirm against the interface
    definition.
    """
    pass


@dataclass
class Error_ConnectionFailed:
    """
    Member of the `Error` union that carries a single string payload.

    NOTE(review): going by the name, this presumably reports that a
    connection attempt failed -- confirm against the interface definition.
    """
    # String payload; presumably a human-readable description -- TODO confirm.
    value: str


@dataclass
class Error_Other:
    """
    Member of the `Error` union that carries a single string payload.

    NOTE(review): going by the name, this is presumably the catch-all for
    failures not covered by the other members -- confirm against the
    interface definition.
    """
    # String payload; presumably a human-readable description -- TODO confirm.
    value: str


# Union of the four error dataclasses defined above. Two members
# (Error_InvalidAddress, Error_TooManyConnections) carry no payload; the
# other two (Error_ConnectionFailed, Error_Other) carry a string `value`.
Error = Union[Error_InvalidAddress, Error_TooManyConnections, Error_ConnectionFailed, Error_Other]
"""
Errors related to interacting with Mqtt
"""


class Qos(Enum):
    """
    QoS (quality of service) level for publishing MQTT messages.

    Passed as the `qos` argument of `Connection.publish`.
    """
    # NOTE(review): the integer values coincide with the MQTT QoS level
    # numbers 0-2; presumably intentional -- confirm before relying on it.
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2

class Connection:
    """
    Handle for a connection to an MQTT instance.

    Create one with `Connection.open` and send messages with `publish`.
    The class can be used in a `with` statement: `__enter__` returns the
    instance and `__exit__` is documented as releasing the resource.

    NOTE(review): every method body here except `__enter__` just raises
    `NotImplementedError`; presumably these are binding stubs whose real
    behaviour is supplied elsewhere -- confirm.
    """

    @classmethod
    def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self:
        """
        Open a connection to the MQTT instance at `address`.

        Args:
            address: Address of the MQTT instance to connect to.
            username: Presumably the user name to authenticate with -- TODO confirm.
            password: Presumably the password for `username` -- TODO confirm.
            keep_alive_interval_in_secs: Keep-alive interval in seconds,
                going by the name -- TODO confirm exact semantics.

        Returns:
            An instance of the class (annotated as `Self`).

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        # Placeholder body: as written, calling this always raises.
        raise NotImplementedError

    def publish(self, topic: str, payload: bytes, qos: Qos) -> None:
        """
        Publish an MQTT message to the specified `topic`.

        Args:
            topic: Topic to publish to.
            payload: Message body, as raw bytes.
            qos: `Qos` member to publish with.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        # Placeholder body: as written, calling this always raises.
        raise NotImplementedError

    def __enter__(self) -> Self:
        """Enter a `with` block; returns this same instance."""
        return self

    def __exit__(self, *args):
        """
        Release this resource.

        `*args` receives whatever the `with` statement passes on exit and
        is not inspected here.
        """
        # Placeholder body: as written, leaving a `with` block raises.
        raise NotImplementedError

Global variables

var Error

Errors related to interacting with MQTT.

Classes

class Connection
Expand source code
class Connection:
    """
    Handle for a connection to an MQTT instance.

    Create one with `Connection.open` and send messages with `publish`.
    The class can be used in a `with` statement: `__enter__` returns the
    instance and `__exit__` is documented as releasing the resource.

    NOTE(review): every method body here except `__enter__` just raises
    `NotImplementedError`; presumably these are binding stubs whose real
    behaviour is supplied elsewhere -- confirm.
    """

    @classmethod
    def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self:
        """
        Open a connection to the MQTT instance at `address`.

        Args:
            address: Address of the MQTT instance to connect to.
            username: Presumably the user name to authenticate with -- TODO confirm.
            password: Presumably the password for `username` -- TODO confirm.
            keep_alive_interval_in_secs: Keep-alive interval in seconds,
                going by the name -- TODO confirm exact semantics.

        Returns:
            An instance of the class (annotated as `Self`).

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        # Placeholder body: as written, calling this always raises.
        raise NotImplementedError

    def publish(self, topic: str, payload: bytes, qos: Qos) -> None:
        """
        Publish an MQTT message to the specified `topic`.

        Args:
            topic: Topic to publish to.
            payload: Message body, as raw bytes.
            qos: `Qos` member to publish with.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        # Placeholder body: as written, calling this always raises.
        raise NotImplementedError

    def __enter__(self) -> Self:
        """Enter a `with` block; returns this same instance."""
        return self

    def __exit__(self, *args):
        """
        Release this resource.

        `*args` receives whatever the `with` statement passes on exit and
        is not inspected here.
        """
        # Placeholder body: as written, leaving a `with` block raises.
        raise NotImplementedError

Static methods

def open(address: str, username: str, password: str, keep_alive_interval_in_secs: int) ‑> Self

Open a connection to the MQTT instance at `address`.

Raises: Err(Error)

Expand source code
@classmethod
def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self:
    """
    Open a connection to the MQTT instance at `address`.

    Args:
        address: Address of the MQTT instance to connect to.
        username: Presumably the user name to authenticate with -- TODO confirm.
        password: Presumably the password for `username` -- TODO confirm.
        keep_alive_interval_in_secs: Keep-alive interval in seconds,
            going by the name -- TODO confirm exact semantics.

    Returns:
        An instance of the class (annotated as `Self`).

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
    """
    # Placeholder body: as written, calling this always raises.
    raise NotImplementedError

Methods

def publish(self, topic: str, payload: bytes, qos: Qos) ‑> None

Publish an MQTT message to the specified `topic`.

Raises: Err(Error)

Expand source code
def publish(self, topic: str, payload: bytes, qos: Qos) -> None:
    """
    Publish an MQTT message to the specified `topic`.

    Args:
        topic: Topic to publish to.
        payload: Message body, as raw bytes.
        qos: `Qos` member to publish with.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
    """
    # Placeholder body: as written, calling this always raises.
    raise NotImplementedError
class Error_ConnectionFailed (value: str)

Error_ConnectionFailed(value: str)

Expand source code
@dataclass
class Error_ConnectionFailed:
    """
    Member of the `Error` union that carries a single string payload.

    NOTE(review): going by the name, this presumably reports that a
    connection attempt failed -- confirm against the interface definition.
    """
    # String payload; presumably a human-readable description -- TODO confirm.
    value: str

Class variables

var value : str
class Error_InvalidAddress

Error_InvalidAddress()

Expand source code
@dataclass
class Error_InvalidAddress:
    """
    Payload-free member of the `Error` union.

    NOTE(review): going by the name, this presumably reports that the
    address passed to `Connection.open` was not valid -- confirm against
    the interface definition.
    """
    pass
class Error_Other (value: str)

Error_Other(value: str)

Expand source code
@dataclass
class Error_Other:
    """
    Member of the `Error` union that carries a single string payload.

    NOTE(review): going by the name, this is presumably the catch-all for
    failures not covered by the other members -- confirm against the
    interface definition.
    """
    # String payload; presumably a human-readable description -- TODO confirm.
    value: str

Class variables

var value : str
class Error_TooManyConnections

Error_TooManyConnections()

Expand source code
@dataclass
class Error_TooManyConnections:
    """
    Payload-free member of the `Error` union.

    NOTE(review): going by the name, this presumably reports that a
    connection limit was reached -- confirm against the interface
    definition.
    """
    pass
class Qos (*args, **kwds)

QoS (quality of service) level for publishing MQTT messages.

Expand source code
class Qos(Enum):
    """
    QoS (quality of service) level for publishing MQTT messages.

    Passed as the `qos` argument of `Connection.publish`.
    """
    # NOTE(review): the integer values coincide with the MQTT QoS level
    # numbers 0-2; presumably intentional -- confirm before relying on it.
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2

Ancestors

  • enum.Enum

Class variables

var AT_LEAST_ONCE
var AT_MOST_ONCE
var EXACTLY_ONCE