use chrono::Local; use rand::Rng; use spamassassin_milter::{Config, ConfigBuilder}; use std::{ io::{self, ErrorKind}, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, time::Duration, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, ToSocketAddrs}, sync::oneshot, task::JoinHandle, time, }; pub const LOCALHOST: (Ipv4Addr, u16) = (Ipv4Addr::LOCALHOST, 0); /// Configures the builder for integration testing with `spamc`. Most /// importantly, this isolates `spamc` from any configuration file /// `/etc/spamassassin/spamc.conf` present on the host, as this configuration is /// read by default and may break the integration tests. pub fn configure_spamc(builder: ConfigBuilder) -> ConfigBuilder { // Note: Must use `-F` instead of `--config` due to a bug in `spamc`. // `--no-safe-fallback` prevents connection attempts from failing silently, // and `--log-to-stderr` avoids polluting syslog with test output. builder.spamc_args(["-F", "/dev/null", "--no-safe-fallback", "--log-to-stderr"]) } pub const SPAMD_PORT: u16 = 3783; // mock port pub type HamOrSpam = Result; /// Spawns a mock `spamd` server that echoes what it is sent after applying /// transformation `f` to the message content and mock-classifying it as ham or /// spam. pub async fn spawn_mock_spamd_server(port: u16, f: F) -> io::Result>> where F: Fn(String) -> HamOrSpam + Send + 'static, { let socket_addr = (Ipv6Addr::LOCALHOST, port); let listener = TcpListener::bind(socket_addr).await?; Ok(tokio::spawn(async move { // This server expects and handles only a single connection, so that we // can `join` this task in the tests and detect errors and panics. let (mut stream, _) = time::timeout(Duration::from_secs(10), listener.accept()) .await .map_err(|e| io::Error::new(ErrorKind::Other, e))??; let mut buf = Vec::new(); stream.read_to_end(&mut buf).await?; let msg = process_message(buf, &f); stream.write_all(msg.as_bytes()).await?; Ok(()) })) } // The SpamAssassin client/server protocol is here reverse-engineered in a very // rudimentary fashion: Both client and server send a protocol header containing // a content length indication and terminated with "\r\n\r\n". The payload is // the email message itself with CRLF line endings. const SPAMD_PROTOCOL_OK: &str = "SPAMD/1.1 0 EX_OK"; fn process_message(buf: Vec, f: &F) -> String where F: Fn(String) -> HamOrSpam + Send + 'static, { let mut msg = String::from_utf8(buf).unwrap(); // Crude handling of the `spamc` client protocol: strip off everything // before and including the first "\r\n\r\n". let i = msg.find("\r\n\r\n").expect("spamc protocol header missing"); msg.drain(..i + 4); match f(msg) { // Again very basic handling of the `spamd` server protocol: add a // forged protocol header terminated with "\r\n\r\n". (This is currently // not used in tests.) Ok(ham) => format!( "{}\r\nContent-length: {}\r\nSpam: False ; 4.0 / 5.0\r\n\r\n{}", SPAMD_PROTOCOL_OK, ham.len(), ham ), Err(spam) => format!( "{}\r\nContent-length: {}\r\nSpam: True ; 6.0 / 5.0\r\n\r\n{}", SPAMD_PROTOCOL_OK, spam.len(), spam ), } } pub fn rand_msg_id() -> String { let n = rand::thread_rng().gen_range(1..=999999); format!("<{n:06}@gluet.ch>") } pub fn current_date() -> String { Local::now().to_rfc2822() } pub struct SpamAssassinMilter { milter_handle: JoinHandle>, shutdown: oneshot::Sender<()>, addr: SocketAddr, } impl SpamAssassinMilter { pub async fn spawn(addr: impl ToSocketAddrs, config: Config) -> io::Result { let listener = TcpListener::bind(addr).await?; let addr = listener.local_addr()?; let (shutdown_tx, shutdown_rx) = oneshot::channel(); let milter = tokio::spawn(spamassassin_milter::run(listener, config, shutdown_rx)); Ok(Self { milter_handle: milter, shutdown: shutdown_tx, addr, }) } pub fn addr(&self) -> SocketAddr { self.addr } pub async fn shutdown(self) -> io::Result<()> { let _ = self.shutdown.send(()); self.milter_handle.await? } }