use anyhow::{anyhow, Result};
use bytes::Bytes;
use cap_primitives::net::Pool;
use cap_std::ipnet::IpNet;
use std::{
io::{Read, Write},
mem,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use system_interface::io::ReadReady;
use tokio::io::{AsyncRead, AsyncWrite};
use wasi_common_preview1 as wasi_preview1;
use wasmtime_wasi as wasmtime_wasi_preview1;
use wasmtime_wasi::preview2::{
self as wasi_preview2, HostInputStream, HostOutputStream, StdinStream, StdoutStream,
StreamError, StreamResult, Subscribe,
};
use wasmtime_wasi_http::types::WasiHttpCtx;
use crate::{
async_trait,
host_component::{HostComponents, HostComponentsData},
io::OutputBuffer,
limits::StoreLimitsAsync,
preview1, Data,
};
#[cfg(doc)]
use crate::EngineBuilder;
pub enum Wasi {
Preview1(wasi_preview1::WasiCtx),
Preview2 {
wasi_ctx: wasi_preview2::WasiCtx,
wasi_http_ctx: WasiHttpCtx,
},
}
#[allow(missing_docs)]
pub enum WasiVersion {
Preview1,
Preview2,
}
pub struct Store<T> {
inner: wasmtime::Store<Data<T>>,
epoch_tick_interval: Duration,
}
impl<T> Store<T> {
pub fn host_components_data(&mut self) -> &mut HostComponentsData {
&mut self.inner.data_mut().host_components_data
}
pub fn set_deadline(&mut self, deadline: Instant) {
let now = Instant::now();
let duration = deadline - now;
let ticks = if duration.is_zero() {
tracing::warn!("Execution deadline set in past: {deadline:?} < {now:?}");
0
} else {
let ticks = duration.as_micros() / self.epoch_tick_interval.as_micros();
let ticks = ticks.min(u64::MAX as u128) as u64;
ticks + 1 };
self.inner.set_epoch_deadline(ticks);
}
}
impl<T> AsRef<wasmtime::Store<Data<T>>> for Store<T> {
fn as_ref(&self) -> &wasmtime::Store<Data<T>> {
&self.inner
}
}
impl<T> AsMut<wasmtime::Store<Data<T>>> for Store<T> {
fn as_mut(&mut self) -> &mut wasmtime::Store<Data<T>> {
&mut self.inner
}
}
impl<T> wasmtime::AsContext for Store<T> {
type Data = Data<T>;
fn as_context(&self) -> wasmtime::StoreContext<'_, Self::Data> {
self.inner.as_context()
}
}
impl<T> wasmtime::AsContextMut for Store<T> {
fn as_context_mut(&mut self) -> wasmtime::StoreContextMut<'_, Self::Data> {
self.inner.as_context_mut()
}
}
pub struct StoreBuilder {
engine: wasmtime::Engine,
epoch_tick_interval: Duration,
wasi: std::result::Result<WasiCtxBuilder, String>,
host_components_data: HostComponentsData,
store_limits: StoreLimitsAsync,
net_pool: Pool,
}
impl StoreBuilder {
pub(crate) fn new(
engine: wasmtime::Engine,
epoch_tick_interval: Duration,
host_components: &HostComponents,
wasi: WasiVersion,
) -> Self {
Self {
engine,
epoch_tick_interval,
wasi: Ok(wasi.into()),
host_components_data: host_components.new_data(),
store_limits: StoreLimitsAsync::default(),
net_pool: Pool::default(),
}
}
pub fn max_memory_size(&mut self, max_memory_size: usize) {
self.store_limits = StoreLimitsAsync::new(Some(max_memory_size), None);
}
pub fn inherit_stdin(&mut self) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stdin(Box::new(wasmtime_wasi_preview1::stdio::stdin()))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.inherit_stdin();
}
});
}
pub fn insert_ip_net_port_range(
&mut self,
ip_net: IpNet,
ports_start: u16,
ports_end: Option<u16>,
) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(_) => {
panic!("Enabling network only allowed in preview2")
}
WasiCtxBuilder::Preview2(_) => {}
});
self.net_pool.insert_ip_net_port_range(
ip_net,
ports_start,
ports_end,
cap_primitives::ambient_authority(),
);
}
pub fn inherit_limited_network(&mut self) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(_) => {
panic!("Enabling network only allowed in preview2")
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.inherit_network();
}
});
}
pub fn stdin_pipe(
&mut self,
r: impl AsyncRead + Read + ReadReady + Send + Sync + Unpin + 'static,
) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stdin(Box::new(wasi_preview1::pipe::ReadPipe::new(r)))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.stdin(PipeStdinStream::new(r));
}
})
}
pub fn inherit_stdout(&mut self) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stdout(Box::new(wasmtime_wasi_preview1::stdio::stdout()))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.inherit_stdout();
}
});
}
pub fn stdout(&mut self, w: Box<dyn wasi_preview1::WasiFile>) -> Result<()> {
self.try_with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stdout(w);
Ok(())
}
WasiCtxBuilder::Preview2(_) => Err(anyhow!(
"`Store::stdout` only supported with WASI Preview 1"
)),
})
}
pub fn stdout_pipe(&mut self, w: impl AsyncWrite + Write + Send + Sync + Unpin + 'static) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stdout(Box::new(wasi_preview1::pipe::WritePipe::new(w)))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.stdout(PipeStdoutStream::new(w));
}
})
}
pub fn stdout_buffered(&mut self) -> Result<OutputBuffer> {
let buffer = OutputBuffer::default();
self.try_with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(_) => Err(anyhow!(
"`Store::stdout_buffered` only supported with WASI Preview 2"
)),
WasiCtxBuilder::Preview2(ctx) => {
ctx.stdout(BufferStdoutStream(buffer.clone()));
Ok(())
}
})?;
Ok(buffer)
}
pub fn inherit_stderr(&mut self) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stderr(Box::new(wasmtime_wasi_preview1::stdio::stderr()))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.inherit_stderr();
}
});
}
pub fn stderr_pipe(&mut self, w: impl AsyncWrite + Write + Send + Sync + Unpin + 'static) {
self.with_wasi(|wasi| match wasi {
WasiCtxBuilder::Preview1(ctx) => {
ctx.set_stderr(Box::new(wasi_preview1::pipe::WritePipe::new(w)))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.stderr(PipeStdoutStream::new(w));
}
})
}
pub fn args<'b>(&mut self, args: impl IntoIterator<Item = &'b str>) -> Result<()> {
self.try_with_wasi(|wasi| {
for arg in args {
match wasi {
WasiCtxBuilder::Preview1(ctx) => ctx.push_arg(arg)?,
WasiCtxBuilder::Preview2(ctx) => {
ctx.arg(arg);
}
}
}
Ok(())
})
}
pub fn env(
&mut self,
vars: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<str>)>,
) -> Result<()> {
self.try_with_wasi(|wasi| {
for (k, v) in vars {
match wasi {
WasiCtxBuilder::Preview1(ctx) => ctx.push_env(k.as_ref(), v.as_ref())?,
WasiCtxBuilder::Preview2(ctx) => {
ctx.env(k, v);
}
}
}
Ok(())
})
}
pub fn read_only_preopened_dir(
&mut self,
host_path: impl AsRef<Path>,
guest_path: PathBuf,
) -> Result<()> {
self.preopened_dir_impl(host_path, guest_path, false)
}
pub fn read_write_preopened_dir(
&mut self,
host_path: impl AsRef<Path>,
guest_path: PathBuf,
) -> Result<()> {
self.preopened_dir_impl(host_path, guest_path, true)
}
fn preopened_dir_impl(
&mut self,
host_path: impl AsRef<Path>,
guest_path: PathBuf,
writable: bool,
) -> Result<()> {
let cap_std_dir =
cap_std::fs::Dir::open_ambient_dir(host_path.as_ref(), cap_std::ambient_authority())?;
let path = guest_path
.to_str()
.ok_or_else(|| anyhow!("non-utf8 path: {}", guest_path.display()))?;
self.try_with_wasi(|wasi| {
match wasi {
WasiCtxBuilder::Preview1(ctx) => {
let mut dir =
Box::new(wasmtime_wasi_preview1::dir::Dir::from_cap_std(cap_std_dir)) as _;
if !writable {
dir = Box::new(preview1::ReadOnlyDir(dir));
}
ctx.push_preopened_dir(dir, path)?;
}
WasiCtxBuilder::Preview2(ctx) => {
let dir_perms = if writable {
wasi_preview2::DirPerms::all()
} else {
wasi_preview2::DirPerms::READ
};
let file_perms = wasi_preview2::FilePerms::all();
ctx.preopened_dir(cap_std_dir, dir_perms, file_perms, path);
}
}
Ok(())
})
}
pub fn host_components_data(&mut self) -> &mut HostComponentsData {
&mut self.host_components_data
}
pub fn build_with_data<T>(mut self, inner_data: T) -> Result<Store<T>> {
let net_pool = mem::take(&mut self.net_pool);
self.with_wasi(move |wasi| match wasi {
WasiCtxBuilder::Preview1(_) => {}
WasiCtxBuilder::Preview2(ctx) => {
ctx.socket_addr_check(move |addr, _| net_pool.check_addr(addr).is_ok());
}
});
let wasi = self.wasi.map_err(anyhow::Error::msg)?.build();
let mut inner = wasmtime::Store::new(
&self.engine,
Data {
inner: inner_data,
wasi,
host_components_data: self.host_components_data,
store_limits: self.store_limits,
table: wasi_preview2::ResourceTable::new(),
},
);
inner.limiter_async(move |data| &mut data.store_limits);
inner.set_epoch_deadline(u64::MAX / 2);
Ok(Store {
inner,
epoch_tick_interval: self.epoch_tick_interval,
})
}
pub fn build<T: Default>(self) -> Result<Store<T>> {
self.build_with_data(T::default())
}
fn with_wasi(&mut self, f: impl FnOnce(&mut WasiCtxBuilder)) {
let _ = self.try_with_wasi(|wasi| {
f(wasi);
Ok(())
});
}
fn try_with_wasi(&mut self, f: impl FnOnce(&mut WasiCtxBuilder) -> Result<()>) -> Result<()> {
let wasi = self
.wasi
.as_mut()
.map_err(|err| anyhow!("StoreBuilder already failed: {}", err))?;
match f(wasi) {
Ok(()) => Ok(()),
Err(err) => {
self.wasi = Err(err.to_string());
Err(err)
}
}
}
}
struct PipeStdinStream<T> {
buffer: Vec<u8>,
inner: Arc<Mutex<T>>,
}
impl<T> PipeStdinStream<T> {
fn new(inner: T) -> Self {
Self {
buffer: vec![0_u8; 64 * 1024],
inner: Arc::new(Mutex::new(inner)),
}
}
}
impl<T> Clone for PipeStdinStream<T> {
fn clone(&self) -> Self {
Self {
buffer: vec![0_u8; 64 * 1024],
inner: self.inner.clone(),
}
}
}
impl<T: Read + Send + Sync + 'static> HostInputStream for PipeStdinStream<T> {
fn read(&mut self, size: usize) -> StreamResult<Bytes> {
let size = size.min(self.buffer.len());
let count = self
.inner
.lock()
.unwrap()
.read(&mut self.buffer[..size])
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
Ok(Bytes::copy_from_slice(&self.buffer[..count]))
}
}
#[async_trait]
impl<T: Read + Send + Sync + 'static> Subscribe for PipeStdinStream<T> {
async fn ready(&mut self) {}
}
impl<T: Read + Send + Sync + 'static> StdinStream for PipeStdinStream<T> {
fn stream(&self) -> Box<dyn HostInputStream> {
Box::new(self.clone())
}
fn isatty(&self) -> bool {
false
}
}
struct PipeStdoutStream<T>(Arc<Mutex<T>>);
impl<T> Clone for PipeStdoutStream<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> PipeStdoutStream<T> {
fn new(inner: T) -> Self {
Self(Arc::new(Mutex::new(inner)))
}
}
impl<T: Write + Send + Sync + 'static> HostOutputStream for PipeStdoutStream<T> {
fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
self.0
.lock()
.unwrap()
.write_all(&bytes)
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}
fn flush(&mut self) -> Result<(), StreamError> {
self.0
.lock()
.unwrap()
.flush()
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}
fn check_write(&mut self) -> Result<usize, StreamError> {
Ok(1024 * 1024)
}
}
impl<T: Write + Send + Sync + 'static> StdoutStream for PipeStdoutStream<T> {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(self.clone())
}
fn isatty(&self) -> bool {
false
}
}
#[async_trait]
impl<T: Write + Send + Sync + 'static> Subscribe for PipeStdoutStream<T> {
async fn ready(&mut self) {}
}
struct BufferStdoutStream(OutputBuffer);
impl StdoutStream for BufferStdoutStream {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(self.0.writer())
}
fn isatty(&self) -> bool {
false
}
}
#[allow(clippy::large_enum_variant)]
enum WasiCtxBuilder {
Preview1(wasi_preview1::WasiCtx),
Preview2(wasi_preview2::WasiCtxBuilder),
}
impl From<WasiVersion> for WasiCtxBuilder {
fn from(value: WasiVersion) -> Self {
match value {
WasiVersion::Preview1 => {
Self::Preview1(wasmtime_wasi_preview1::WasiCtxBuilder::new().build())
}
WasiVersion::Preview2 => Self::Preview2(wasi_preview2::WasiCtxBuilder::new()),
}
}
}
impl WasiCtxBuilder {
fn build(self) -> Wasi {
match self {
WasiCtxBuilder::Preview1(ctx) => Wasi::Preview1(ctx),
WasiCtxBuilder::Preview2(mut b) => Wasi::Preview2 {
wasi_ctx: b.build(),
wasi_http_ctx: WasiHttpCtx,
},
}
}
}