From e9118ec42e175c5e43dc701e31aff2f58e4e20a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20B=C3=BCrgin?= Date: Mon, 8 Mar 2021 10:01:25 +0100 Subject: [PATCH] Read spamc stdout in separate thread --- CHANGELOG.md | 4 ++++ spamassassin-milter.8 | 12 +++++++----- src/client.rs | 39 ++++++++++++++++++++++++++++++--------- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba2d5e8..394257c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.1.5 (unreleased) +* Read output from `spamc` in a separate thread in order to avoid blocking + when processing large messages in certain configurations. +* Document requirement to keep `--max-message-size` setting in sync with + `spamc`’s `--max-size` setting. * Properly specify minimal dependency versions in `Cargo.toml`. * Document minimum supported Rust version 1.42.0. diff --git a/spamassassin-milter.8 b/spamassassin-milter.8 index c9d49df..665f9e4 100644 --- a/spamassassin-milter.8 +++ b/spamassassin-milter.8 @@ -83,11 +83,13 @@ Print usage information. Maximum message size in bytes to pass to .BR spamc . .I BYTES -must be equal to or greater than the max size configured for -.BR spamc , -in order to ensure that SpamAssassin does not process messages truncated to the -size configured for -.BR spamc . +should be equal to the max size configured for +.B spamc +(option +.BR \-\-max-size ); +these two settings are designed to be in sync. +Messages with a size exceeding the maximum message size are not processed with +SpamAssassin. 
Defaults to the .B spamc default, diff --git a/src/client.rs b/src/client.rs index 72a975b..a143af4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,9 +6,10 @@ use crate::{ use milter::{ActionContext, SetErrorReply, Status}; use std::{ any::Any, - io::Write, + io::{self, Read, Write}, os::unix::process::ExitStatusExt, process::{Child, Command, Stdio}, + thread::{self, JoinHandle}, }; pub trait Process { @@ -21,6 +22,7 @@ pub trait Process { pub struct Spamc { spamc_args: &'static [String], spamc: Option<Child>, + stdout_reader: Option<JoinHandle<io::Result<Vec<u8>>>>, } impl Spamc { @@ -30,6 +32,7 @@ impl Spamc { Self { spamc_args, spamc: None, + stdout_reader: None, } } } @@ -38,13 +41,24 @@ impl Process for Spamc { fn connect(&mut self) -> Result<()> { // `Command::spawn` always succeeds when `spamc` can be invoked, even if // logically the command is invalid, eg if it uses non-existing options. - let child = Command::new(Spamc::SPAMC_PROGRAM) + let mut spamc = Command::new(Spamc::SPAMC_PROGRAM) .args(self.spamc_args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn()?; - self.spamc = Some(child); + let mut stdout = spamc.stdout.take().unwrap(); + + self.spamc = Some(spamc); + + // When processing large messages, `spamc` may begin to write its + // response to stdout while it is still receiving parts of the message + // body. Avoid blocking by reading stdout in a separate thread. 
+ self.stdout_reader = Some(thread::spawn(move || { + let mut output = Vec::new(); + stdout.read_to_end(&mut output)?; + Ok(output) + })); Ok(()) } @@ -56,17 +70,24 @@ impl Process for Spamc { } fn finish(&mut self) -> Result<Vec<u8>> { - let spamc = self.spamc.take().expect("spamc process not started"); + let mut spamc = self.spamc.take().expect("spamc process not started"); - let output = spamc.wait_with_output()?; + let status = spamc.wait()?; - if output.status.success() { - Ok(output.stdout) + let stdout = self + .stdout_reader + .take() + .expect("spamc stdout reader thread not available") + .join() + .expect("panic in spamc stdout reader thread")?; + + if status.success() { + Ok(stdout) } else { - Err(match output.status.code() { + Err(match status.code() { None => Error::Io(format!( "spamc terminated by signal {}", - output.status.signal().unwrap() + status.signal().unwrap() )), Some(code) => Error::Io(format!("spamc exited with status code {}", code)), })