Module spin_sdk.wit.imports.mysql

Expand source code
from typing import TypeVar, Generic, Union, Optional, Protocol, Tuple, List, Any, Self
from enum import Flag, Enum, auto
from dataclasses import dataclass
from abc import abstractmethod
import weakref

from ..types import Result, Ok, Err, Some
from ..imports import rdbms_types

class Connection:
    """
    Handle for a connection to a MySQL database.

    Apart from `__enter__`, every method declared here is a stub whose body
    raises `NotImplementedError`.
    """

    @classmethod
    def open(cls, address: str) -> Self:
        """
        Connect to the MySQL instance located at `address`.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
        """
        Run a query (for example a `select`) against the database.

        `statement` is the SQL text and `params` the values supplied with it.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> None:
        """
        Run a command (insert, update, delete) against the database.

        `statement` is the SQL text and `params` the values supplied with it.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter a `with` block; hands back this same object."""
        return self

    def __exit__(self, *args):
        """
        Release this resource when the `with` block ends.

        `*args` receives whatever the `with` statement passes on exit.
        """
        raise NotImplementedError()

Classes

class Connection

A connection to a MySQL database.

Expand source code
class Connection:
    """
    Handle for a connection to a MySQL database.

    Apart from `__enter__`, every method declared here is a stub whose body
    raises `NotImplementedError`.
    """

    @classmethod
    def open(cls, address: str) -> Self:
        """
        Connect to the MySQL instance located at `address`.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
        """
        Run a query (for example a `select`) against the database.

        `statement` is the SQL text and `params` the values supplied with it.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> None:
        """
        Run a command (insert, update, delete) against the database.

        `statement` is the SQL text and `params` the values supplied with it.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter a `with` block; hands back this same object."""
        return self

    def __exit__(self, *args):
        """
        Release this resource when the `with` block ends.

        `*args` receives whatever the `with` statement passes on exit.
        """
        raise NotImplementedError()

Static methods

def open(address: str) ‑> Self

Open a connection to the MySQL instance at address.

Raises: Err(Error)

Expand source code
@classmethod
def open(cls, address: str) -> Self:
    """
    Open a connection to the MySQL instance at `address`.
    
    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError

Methods

def execute(self, statement: str, params: List[Union[ParameterValue_Boolean, ParameterValue_Int8, ParameterValue_Int16, ParameterValue_Int32, ParameterValue_Int64, ParameterValue_Uint8, ParameterValue_Uint16, ParameterValue_Uint32, ParameterValue_Uint64, ParameterValue_Floating32, ParameterValue_Floating64, ParameterValue_Str, ParameterValue_Binary, ParameterValue_DbNull]]) ‑> None

Execute a command against the database: insert, update, delete.

Raises: Err(Error)

Expand source code
def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> None:
    """
    Run a command (insert, update, delete) against the database.

    `statement` is the SQL text and `params` the values supplied with it.
    This declaration is a stub: its body unconditionally raises
    `NotImplementedError`.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError()
def query(self, statement: str, params: List[Union[ParameterValue_Boolean, ParameterValue_Int8, ParameterValue_Int16, ParameterValue_Int32, ParameterValue_Int64, ParameterValue_Uint8, ParameterValue_Uint16, ParameterValue_Uint32, ParameterValue_Uint64, ParameterValue_Floating32, ParameterValue_Floating64, ParameterValue_Str, ParameterValue_Binary, ParameterValue_DbNull]]) ‑> RowSet

Query the database: select.

Raises: Err(Error)

Expand source code
def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
    """
    Run a query (for example a `select`) against the database.

    `statement` is the SQL text and `params` the values supplied with it.
    This declaration is a stub: its body unconditionally raises
    `NotImplementedError`.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError()