Module spin_sdk.wit.imports.mqtt
Expand source code
from typing import TypeVar, Generic, Union, Optional, Protocol, Tuple, List, Any, Self
from enum import Flag, Enum, auto
from dataclasses import dataclass
from abc import abstractmethod
import weakref
from ..types import Result, Ok, Err, Some
@dataclass
class Error_InvalidAddress:
    """Payload-free case of the `Error` union; per its name, the address was invalid."""
@dataclass
class Error_TooManyConnections:
    """Payload-free case of the `Error` union; per its name, too many connections exist."""
@dataclass
class Error_ConnectionFailed:
    """Case of the `Error` union for a failed connection; carries one string."""

    # String payload of this case; presumably a description of the failure.
    value: str
@dataclass
class Error_Other:
    """Catch-all case of the `Error` union; carries one string."""

    # String payload of this case; presumably a description of the error.
    value: str
# Union of the four error cases defined in this module, one member per line.
Error = Union[
    Error_InvalidAddress,
    Error_TooManyConnections,
    Error_ConnectionFailed,
    Error_Other,
]
"""
Errors related to interacting with Mqtt
"""
class Qos(Enum):
    """
    Quality-of-service level passed when publishing an Mqtt message.
    """

    # Members are numbered 0 through 2 in declaration order.
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2
class Connection:
@classmethod
def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self:
"""
Open a connection to the Mqtt instance at `address`.
Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
"""
raise NotImplementedError
def publish(self, topic: str, payload: bytes, qos: Qos) -> None:
"""
Publish an Mqtt message to the specified `topic`.
Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
"""
raise NotImplementedError
def __enter__(self):
"""Returns self"""
return self
def __exit__(self, *args):
"""
Release this resource.
"""
raise NotImplementedError
Global variables
var Error
-
Errors related to interacting with Mqtt
Classes
class Connection
-
Expand source code
class Connection: @classmethod def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self: """ Open a connection to the Mqtt instance at `address`. Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)` """ raise NotImplementedError def publish(self, topic: str, payload: bytes, qos: Qos) -> None: """ Publish an Mqtt message to the specified `topic`. Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)` """ raise NotImplementedError def __enter__(self): """Returns self""" return self def __exit__(self, *args): """ Release this resource. """ raise NotImplementedError
Static methods
def open(address: str, username: str, password: str, keep_alive_interval_in_secs: int) ‑> Self
-
Expand source code
@classmethod def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self: """ Open a connection to the Mqtt instance at `address`. Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)` """ raise NotImplementedError
Methods
def publish(self, topic: str, payload: bytes, qos: Qos) ‑> None
-
Expand source code
def publish(self, topic: str, payload: bytes, qos: Qos) -> None: """ Publish an Mqtt message to the specified `topic`. Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)` """ raise NotImplementedError
class Error_ConnectionFailed (value: str)
-
Error_ConnectionFailed(value: str)
Expand source code
@dataclass
class Error_ConnectionFailed:
    """Case of the `Error` union for a failed connection; carries one string."""

    # String payload of this case; presumably a description of the failure.
    value: str
Class variables
var value : str
class Error_InvalidAddress
-
Error_InvalidAddress()
Expand source code
@dataclass
class Error_InvalidAddress:
    """Payload-free case of the `Error` union; per its name, the address was invalid."""
class Error_Other (value: str)
-
Error_Other(value: str)
Expand source code
@dataclass
class Error_Other:
    """Catch-all case of the `Error` union; carries one string."""

    # String payload of this case; presumably a description of the error.
    value: str
Class variables
var value : str
class Error_TooManyConnections
-
Error_TooManyConnections()
Expand source code
@dataclass
class Error_TooManyConnections:
    """Payload-free case of the `Error` union; per its name, too many connections exist."""
class Qos (*args, **kwds)
-
QoS for publishing Mqtt messages
Expand source code
class Qos(Enum):
    """
    Quality-of-service level passed when publishing an Mqtt message.
    """

    # Members are numbered 0 through 2 in declaration order.
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2
Ancestors
- enum.Enum
Class variables
var AT_LEAST_ONCE
var AT_MOST_ONCE
var EXACTLY_ONCE