spin_factor_key_value/
lib.rsmod host;
pub mod runtime_config;
mod util;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use anyhow::ensure;
use spin_factors::{
ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
};
use spin_locked_app::MetadataKey;
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use runtime_config::RuntimeConfig;
use spin_core::async_trait;
pub use util::{CachingStoreManager, DelegatingStoreManager};
#[derive(Default)]
pub struct KeyValueFactor {
_priv: (),
}
impl KeyValueFactor {
pub fn new() -> Self {
Self { _priv: () }
}
}
impl Factor for KeyValueFactor {
type RuntimeConfig = RuntimeConfig;
type AppState = AppState;
type InstanceBuilder = InstanceBuilder;
fn init<T: Send + 'static>(&mut self, mut ctx: InitContext<T, Self>) -> anyhow::Result<()> {
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
Ok(())
}
fn configure_app<T: RuntimeFactors>(
&self,
mut ctx: ConfigureAppContext<T, Self>,
) -> anyhow::Result<Self::AppState> {
let store_managers = ctx.take_runtime_config().unwrap_or_default();
let delegating_manager = DelegatingStoreManager::new(store_managers);
let caching_manager = CachingStoreManager::new(delegating_manager);
let store_manager = Arc::new(caching_manager);
let mut component_allowed_stores = HashMap::new();
for component in ctx.app().components() {
let component_id = component.id().to_string();
let key_value_stores = component
.get_metadata(KEY_VALUE_STORES_KEY)?
.unwrap_or_default()
.into_iter()
.collect::<HashSet<_>>();
for label in &key_value_stores {
ensure!(
store_manager.is_defined(label),
"unknown key_value_stores label {label:?} for component {component_id:?}"
);
}
component_allowed_stores.insert(component_id, key_value_stores);
}
Ok(AppState {
store_manager,
component_allowed_stores,
})
}
fn prepare<T: RuntimeFactors>(
&self,
ctx: PrepareContext<T, Self>,
) -> anyhow::Result<InstanceBuilder> {
let app_state = ctx.app_state();
let allowed_stores = app_state
.component_allowed_stores
.get(ctx.app_component().id())
.expect("component should be in component_stores")
.clone();
Ok(InstanceBuilder {
store_manager: app_state.store_manager.clone(),
allowed_stores,
})
}
}
type AppStoreManager = CachingStoreManager<DelegatingStoreManager>;
pub struct AppState {
store_manager: Arc<AppStoreManager>,
component_allowed_stores: HashMap<String, HashSet<String>>,
}
impl AppState {
pub fn store_summary(&self, label: &str) -> Option<String> {
self.store_manager.summary(label)
}
pub fn store_is_used(&self, label: &str) -> bool {
self.component_allowed_stores
.values()
.any(|stores| stores.contains(label))
}
pub async fn get_store(&self, label: &str) -> Option<Arc<dyn Store>> {
self.store_manager.get(label).await.ok()
}
}
#[derive(Debug, thiserror::Error)]
pub enum SwapError {
#[error("{0}")]
CasFailed(String),
#[error("{0}")]
Other(String),
}
#[async_trait]
pub trait Cas: Sync + Send {
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
async fn bucket_rep(&self) -> u32;
async fn key(&self) -> String;
}
pub struct InstanceBuilder {
store_manager: Arc<AppStoreManager>,
allowed_stores: HashSet<String>,
}
impl FactorInstanceBuilder for InstanceBuilder {
type InstanceState = KeyValueDispatch;
fn build(self) -> anyhow::Result<Self::InstanceState> {
let Self {
store_manager,
allowed_stores,
} = self;
Ok(KeyValueDispatch::new_with_capacity(
allowed_stores,
store_manager,
u32::MAX,
))
}
}