Wasm-wc: Run src/lib.rs through rustfmt

Run from the repository root like

  $ rustfmt --edition 2021 src/wasm-wasi-component/src/lib.rs

Also manually fix up some overly long comments.

Signed-off-by: Andrew Clayton <a.clayton@nginx.com>
This commit is contained in:
Andrew Clayton 2024-02-06 16:33:59 +00:00
parent a9345dd46e
commit 79c8177247

View file

@ -9,7 +9,9 @@ use std::sync::OnceLock;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use wasmtime::component::{Component, InstancePre, Linker}; use wasmtime::component::{Component, InstancePre, Linker};
use wasmtime::{Config, Engine, Store}; use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::preview2::{DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView}; use wasmtime_wasi::preview2::{
DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView,
};
use wasmtime_wasi::{ambient_authority, Dir}; use wasmtime_wasi::{ambient_authority, Dir};
use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
@ -70,11 +72,17 @@ unsafe extern "C" fn setup(
ptr::null_mut(), ptr::null_mut(),
); );
for i in 0..bindings::nxt_conf_object_members_count(dirs_ptr) { for i in 0..bindings::nxt_conf_object_members_count(dirs_ptr) {
let value = bindings::nxt_conf_get_array_element(dirs_ptr, i.try_into().unwrap()); let value = bindings::nxt_conf_get_array_element(
dirs_ptr,
i.try_into().unwrap(),
);
let mut s = bindings::nxt_string(""); let mut s = bindings::nxt_string("");
bindings::nxt_conf_get_string(value, &mut s); bindings::nxt_conf_get_string(value, &mut s);
dirs.push( dirs.push(
std::str::from_utf8(std::slice::from_raw_parts(s.start, s.length))?.to_string(), std::str::from_utf8(std::slice::from_raw_parts(
s.start, s.length,
))?
.to_string(),
); );
} }
} }
@ -94,13 +102,15 @@ unsafe extern "C" fn start(
) -> bindings::nxt_int_t { ) -> bindings::nxt_int_t {
handle_result(task, || { handle_result(task, || {
let config = GLOBAL_CONFIG.get().unwrap(); let config = GLOBAL_CONFIG.get().unwrap();
let state = GlobalState::new(&config).context("failed to create initial state")?; let state = GlobalState::new(&config)
.context("failed to create initial state")?;
let res = GLOBAL_STATE.set(state); let res = GLOBAL_STATE.set(state);
assert!(res.is_ok()); assert!(res.is_ok());
let conf = (*data).app; let conf = (*data).app;
let mut wasm_init = MaybeUninit::uninit(); let mut wasm_init = MaybeUninit::uninit();
let ret = bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf); let ret =
bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf);
if ret != bindings::NXT_OK as bindings::nxt_int_t { if ret != bindings::NXT_OK as bindings::nxt_int_t {
bail!("nxt_unit_default_init() failed"); bail!("nxt_unit_default_init() failed");
} }
@ -144,7 +154,9 @@ unsafe fn handle_result(
} }
} }
unsafe extern "C" fn request_handler(info: *mut bindings::nxt_unit_request_info_t) { unsafe extern "C" fn request_handler(
info: *mut bindings::nxt_unit_request_info_t,
) {
// Enqueue this request to get processed by the Tokio event loop, and // Enqueue this request to get processed by the Tokio event loop, and
// otherwise immediately return. // otherwise immediately return.
let state = GLOBAL_STATE.get().unwrap(); let state = GLOBAL_STATE.get().unwrap();
@ -235,8 +247,15 @@ impl GlobalState {
cx.inherit_stderr(); cx.inherit_stderr();
for dir in self.global_config.dirs.iter() { for dir in self.global_config.dirs.iter() {
let fd = Dir::open_ambient_dir(dir, ambient_authority()) let fd = Dir::open_ambient_dir(dir, ambient_authority())
.with_context(|| format!("failed to open directory '{dir}'"))?; .with_context(|| {
cx.preopened_dir(fd, DirPerms::all(), FilePerms::all(), dir); format!("failed to open directory '{dir}'")
})?;
cx.preopened_dir(
fd,
DirPerms::all(),
FilePerms::all(),
dir,
);
} }
cx.build() cx.build()
}, },
@ -262,10 +281,12 @@ impl GlobalState {
// generate headers, write those below, and then compute the body // generate headers, write those below, and then compute the body
// afterwards. // afterwards.
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let (proxy, _) = let (proxy, _) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre(
wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &self.component) &mut store,
.await &self.component,
.context("failed to instantiate")?; )
.await
.context("failed to instantiate")?;
let req = store.data_mut().new_incoming_request(request)?; let req = store.data_mut().new_incoming_request(request)?;
let out = store.data_mut().new_response_outparam(sender)?; let out = store.data_mut().new_response_outparam(sender)?;
proxy proxy
@ -308,7 +329,10 @@ impl GlobalState {
Ok(()) Ok(())
} }
fn to_request_builder(&self, info: &NxtRequestInfo) -> Result<http::request::Builder> { fn to_request_builder(
&self,
info: &NxtRequestInfo,
) -> Result<http::request::Builder> {
let mut request = http::Request::builder(); let mut request = http::Request::builder();
request = request.method(info.method()); request = request.method(info.method());
@ -338,12 +362,16 @@ impl GlobalState {
Ok(request) Ok(request)
} }
fn to_request_body(&self, info: &mut NxtRequestInfo) -> BoxBody<Bytes, anyhow::Error> { fn to_request_body(
// TODO: should convert the body into a form of `Stream` to become an async &self,
// stream of frames. The return value can represent that here but for now info: &mut NxtRequestInfo,
// this slurps up the entire body into memory and puts it all in a single ) -> BoxBody<Bytes, anyhow::Error> {
// `BytesMut` which is then converted to `Bytes`. // TODO: should convert the body into a form of `Stream` to become an
let mut body = BytesMut::with_capacity(info.content_length().try_into().unwrap()); // async stream of frames. The return value can represent that here
// but for now this slurps up the entire body into memory and puts it
// all in a single `BytesMut` which is then converted to `Bytes`.
let mut body =
BytesMut::with_capacity(info.content_length().try_into().unwrap());
// TODO: can this perform a partial read? // TODO: can this perform a partial read?
// TODO: how to make this async at the nxt level? // TODO: how to make this async at the nxt level?
@ -352,7 +380,11 @@ impl GlobalState {
Full::new(body.freeze()).map_err(|e| match e {}).boxed() Full::new(body.freeze()).map_err(|e| match e {}).boxed()
} }
fn send_response<T>(&self, info: &mut NxtRequestInfo, response: http::Response<T>) -> T { fn send_response<T>(
&self,
info: &mut NxtRequestInfo,
response: http::Response<T>,
) -> T {
info.init_response( info.init_response(
response.status().as_u16(), response.status().as_u16(),
response.headers().len().try_into().unwrap(), response.headers().len().try_into().unwrap(),
@ -378,9 +410,9 @@ impl GlobalState {
mut body: BoxBody<Bytes, anyhow::Error>, mut body: BoxBody<Bytes, anyhow::Error>,
) -> Result<()> { ) -> Result<()> {
loop { loop {
// Acquire the next frame, and because nothing is actually async at the // Acquire the next frame, and because nothing is actually async
// moment this should never block meaning that the `Pending` case // at the moment this should never block meaning that the
// should not happen. // `Pending` case should not happen.
let frame = match body.frame().await { let frame = match body.frame().await {
Some(Ok(frame)) => frame, Some(Ok(frame)) => frame,
Some(Err(e)) => break Err(e), Some(Err(e)) => break Err(e),
@ -451,8 +483,10 @@ impl NxtRequestInfo {
let raw = (*self.info).request; let raw = (*self.info).request;
(0..(*raw).fields_count).map(move |i| { (0..(*raw).fields_count).map(move |i| {
let field = (*raw).fields.as_ptr().add(i as usize); let field = (*raw).fields.as_ptr().add(i as usize);
let name = self.get_str(&(*field).name, (*field).name_length.into()); let name =
let value = self.get_str(&(*field).value, (*field).value_length.into()); self.get_str(&(*field).name, (*field).name_length.into());
let value =
self.get_str(&(*field).value, (*field).value_length.into());
(name, value) (name, value)
}) })
} }
@ -461,8 +495,11 @@ impl NxtRequestInfo {
fn request_read(&mut self, dst: &mut BytesMut) { fn request_read(&mut self, dst: &mut BytesMut) {
unsafe { unsafe {
let rest = dst.spare_capacity_mut(); let rest = dst.spare_capacity_mut();
let amt = let amt = bindings::nxt_unit_request_read(
bindings::nxt_unit_request_read(self.info, rest.as_mut_ptr().cast(), rest.len()); self.info,
rest.as_mut_ptr().cast(),
rest.len(),
);
// TODO: handle failure when `amt` is negative // TODO: handle failure when `amt` is negative
let amt: usize = amt.try_into().unwrap(); let amt: usize = amt.try_into().unwrap();
dst.set_len(dst.len() + amt); dst.set_len(dst.len() + amt);
@ -471,14 +508,23 @@ impl NxtRequestInfo {
fn response_write(&mut self, data: &[u8]) { fn response_write(&mut self, data: &[u8]) {
unsafe { unsafe {
let rc = bindings::nxt_unit_response_write(self.info, data.as_ptr().cast(), data.len()); let rc = bindings::nxt_unit_response_write(
self.info,
data.as_ptr().cast(),
data.len(),
);
assert_eq!(rc, 0); assert_eq!(rc, 0);
} }
} }
fn init_response(&mut self, status: u16, headers: u32, headers_size: u32) { fn init_response(&mut self, status: u16, headers: u32, headers_size: u32) {
unsafe { unsafe {
let rc = bindings::nxt_unit_response_init(self.info, status, headers, headers_size); let rc = bindings::nxt_unit_response_init(
self.info,
status,
headers,
headers_size,
);
assert_eq!(rc, 0); assert_eq!(rc, 0);
} }
} }
@ -505,11 +551,18 @@ impl NxtRequestInfo {
fn request_done(self) { fn request_done(self) {
unsafe { unsafe {
bindings::nxt_unit_request_done(self.info, bindings::NXT_UNIT_OK as i32); bindings::nxt_unit_request_done(
self.info,
bindings::NXT_UNIT_OK as i32,
);
} }
} }
unsafe fn get_str(&self, ptr: &bindings::nxt_unit_sptr_t, len: u32) -> &str { unsafe fn get_str(
&self,
ptr: &bindings::nxt_unit_sptr_t,
len: u32,
) -> &str {
let ptr = bindings::nxt_unit_sptr_get(ptr); let ptr = bindings::nxt_unit_sptr_get(ptr);
let slice = std::slice::from_raw_parts(ptr, len.try_into().unwrap()); let slice = std::slice::from_raw_parts(ptr, len.try_into().unwrap());
std::str::from_utf8(slice).unwrap() std::str::from_utf8(slice).unwrap()