Module spin_sdk.wit.imports.postgres

Expand source code
from typing import TypeVar, Generic, Union, Optional, Union, Protocol, Tuple, List, Any, Self
from enum import Flag, Enum, auto
from dataclasses import dataclass
from abc import abstractmethod
import weakref

from ..types import Result, Ok, Err, Some
from ..imports import rdbms_types

class Connection:
    """
    A connection to a postgres database.
    """
    
    @classmethod
    def open(cls, address: str) -> Self:
        """
        Open a connection to the Postgres instance at `address`.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError

    def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
        """
        Query the database.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError

    def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> int:
        """
        Execute command to the database.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError

    def __enter__(self):
        """Returns self"""
        return self
                                                                    
    def __exit__(self, *args):
        """
        Release this resource.
        """
        raise NotImplementedError

Classes

class Connection

A connection to a postgres database.

Expand source code
class Connection:
    """
    A connection to a postgres database.
    """
    
    @classmethod
    def open(cls, address: str) -> Self:
        """
        Open a connection to the Postgres instance at `address`.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError

    def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
        """
        Query the database.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError

    def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> int:
        """
        Execute command to the database.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
        """
        raise NotImplementedError

    def __enter__(self):
        """Returns self"""
        return self
                                                                    
    def __exit__(self, *args):
        """
        Release this resource.
        """
        raise NotImplementedError

Static methods

def open(address: str) ‑> Self

Open a connection to the Postgres instance at address.

Raises: Err(Error)

Expand source code
@classmethod
def open(cls, address: str) -> Self:
    """
    Open a connection to the Postgres instance at `address`.
    
    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`
    """
    raise NotImplementedError

Methods

def execute(self, statement: str, params: List[Union[ParameterValueBoolean, ParameterValueInt8, ParameterValueInt16, ParameterValueInt32, ParameterValueInt64, ParameterValueUint8, ParameterValueUint16, ParameterValueUint32, ParameterValueUint64, ParameterValueFloating32, ParameterValueFloating64, ParameterValueStr, ParameterValueBinary, ParameterValueDbNull]]) ‑> int

Execute a command against the database.

Raises: Err(Error)

Expand source code
def execute(self, statement: str, params: List[rdbms_types.ParameterValue]) -> int:
    """
    Run the command `statement` against the database, passing `params`
    as its parameter values.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`

    The body shown here is a stub and only raises `NotImplementedError`.
    """
    raise NotImplementedError()
def query(self, statement: str, params: List[Union[ParameterValueBoolean, ParameterValueInt8, ParameterValueInt16, ParameterValueInt32, ParameterValueInt64, ParameterValueUint8, ParameterValueUint16, ParameterValueUint32, ParameterValueUint64, ParameterValueFloating32, ParameterValueFloating64, ParameterValueStr, ParameterValueBinary, ParameterValueDbNull]]) ‑> RowSet

Query the database.

Raises: Err(Error)

Expand source code
def query(self, statement: str, params: List[rdbms_types.ParameterValue]) -> rdbms_types.RowSet:
    """
    Run the query `statement` against the database, passing `params`
    as its parameter values, and return the resulting row set.

    Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.rdbms_types.Error)`

    The body shown here is a stub and only raises `NotImplementedError`.
    """
    raise NotImplementedError()