Module spin_sdk.wit.imports.mysql
Expand source code
from typing import TypeVar, Generic, Union, Optional, Union, Protocol, Tuple, List, Any, Self
from enum import Flag, Enum, auto
from dataclasses import dataclass
from abc import abstractmethod
import weakref
from ..types import Result, Ok, Err, Some
from ..imports import rdbms_types
class Connection:
    """
    Handle for a connection to a MySQL database.

    A handle is obtained through `Connection.open` and supports the
    context-manager protocol (`with Connection.open(...) as conn:`).
    As written in this module, every operation except `__enter__` is a
    stub that raises `NotImplementedError`.
    """

    @classmethod
    def open(cls, address: str) -> Self:
        """
        Connect to the MySQL instance located at `address`.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
        """
        Run a query `statement` (for example a SELECT) with the supplied
        `params` and return the resulting row set.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> None:
        """
        Run a command `statement` (insert, update, delete) with the
        supplied `params`; nothing is returned.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the `with` block, handing back this same connection."""
        return self

    def __exit__(self, *args):
        """
        Release this resource when the `with` block ends.

        The exception details passed by the `with` statement arrive in
        `args` and are not inspected here.
        """
        raise NotImplementedError()
Classes
class Connection
-
A connection to a MySQL database.
Expand source code
class Connection:
    """
    Handle for a connection to a MySQL database.

    A handle is obtained through `Connection.open` and supports the
    context-manager protocol (`with Connection.open(...) as conn:`).
    As written here, every operation except `__enter__` is a stub that
    raises `NotImplementedError`.
    """

    @classmethod
    def open(cls, address: str) -> Self:
        """
        Connect to the MySQL instance located at `address`.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
        """
        Run a query `statement` (for example a SELECT) with the supplied
        `params` and return the resulting row set.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> None:
        """
        Run a command `statement` (insert, update, delete) with the
        supplied `params`; nothing is returned.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the `with` block, handing back this same connection."""
        return self

    def __exit__(self, *args):
        """
        Release this resource when the `with` block ends.

        The exception details passed by the `with` statement arrive in
        `args` and are not inspected here.
        """
        raise NotImplementedError()
Static methods
def open(address: str) ‑> Self
-
Expand source code
@classmethod
def open(cls, address: str) -> Self:
    """
    Connect to the MySQL instance located at `address`.

    As written here this is a stub: the body only raises
    `NotImplementedError`.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError()
Methods
def execute(self, statement: str, params: List[Union[ParameterValueBoolean, ParameterValueInt8, ParameterValueInt16, ParameterValueInt32, ParameterValueInt64, ParameterValueUint8, ParameterValueUint16, ParameterValueUint32, ParameterValueUint64, ParameterValueFloating32, ParameterValueFloating64, ParameterValueStr, ParameterValueBinary, ParameterValueDbNull]]) ‑> None
-
Expand source code
def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> None:
    """
    Run a command `statement` (insert, update, delete) with the
    supplied `params`; nothing is returned.

    As written here this is a stub: the body only raises
    `NotImplementedError`.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError()
def query(self, statement: str, params: List[Union[ParameterValueBoolean, ParameterValueInt8, ParameterValueInt16, ParameterValueInt32, ParameterValueInt64, ParameterValueUint8, ParameterValueUint16, ParameterValueUint32, ParameterValueUint64, ParameterValueFloating32, ParameterValueFloating64, ParameterValueStr, ParameterValueBinary, ParameterValueDbNull]]) ‑> RowSet
-
Expand source code
def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
    """
    Run a query `statement` (for example a SELECT) with the supplied
    `params` and return the resulting row set.

    As written here this is a stub: the body only raises
    `NotImplementedError`.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError()