Read spamc stdout in separate thread
This commit is contained in:
parent
792619e9bc
commit
e9118ec42e
3 changed files with 41 additions and 14 deletions
|
@ -2,6 +2,10 @@
|
|||
|
||||
## 0.1.5 (unreleased)
|
||||
|
||||
* Read output from `spamc` in a separate thread in order to avoid blocking
|
||||
when processing large messages in certain configurations.
|
||||
* Document requirement to keep `--max-message-size` setting in sync with
|
||||
`spamc`’s `--max-size` setting.
|
||||
* Properly specify minimal dependency versions in `Cargo.toml`.
|
||||
* Document minimum supported Rust version 1.42.0.
|
||||
|
||||
|
|
|
@ -83,11 +83,13 @@ Print usage information.
|
|||
Maximum message size in bytes to pass to
|
||||
.BR spamc .
|
||||
.I BYTES
|
||||
must be equal to or greater than the max size configured for
|
||||
.BR spamc ,
|
||||
in order to ensure that SpamAssassin does not process messages truncated to the
|
||||
size configured for
|
||||
.BR spamc .
|
||||
should be equal to the max size configured for
|
||||
.B spamc
|
||||
(option
|
||||
.BR \-\-max-size );
|
||||
these two settings are designed to be in sync.
|
||||
Messages with a size exceeding the maximum message size are not processed with
|
||||
SpamAssassin.
|
||||
Defaults to the
|
||||
.B spamc
|
||||
default,
|
||||
|
|
|
@ -6,9 +6,10 @@ use crate::{
|
|||
use milter::{ActionContext, SetErrorReply, Status};
|
||||
use std::{
|
||||
any::Any,
|
||||
io::Write,
|
||||
io::{self, Read, Write},
|
||||
os::unix::process::ExitStatusExt,
|
||||
process::{Child, Command, Stdio},
|
||||
thread::{self, JoinHandle},
|
||||
};
|
||||
|
||||
pub trait Process {
|
||||
|
@ -21,6 +22,7 @@ pub trait Process {
|
|||
pub struct Spamc {
|
||||
spamc_args: &'static [String],
|
||||
spamc: Option<Child>,
|
||||
stdout_reader: Option<JoinHandle<io::Result<Vec<u8>>>>,
|
||||
}
|
||||
|
||||
impl Spamc {
|
||||
|
@ -30,6 +32,7 @@ impl Spamc {
|
|||
Self {
|
||||
spamc_args,
|
||||
spamc: None,
|
||||
stdout_reader: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,13 +41,24 @@ impl Process for Spamc {
|
|||
fn connect(&mut self) -> Result<()> {
|
||||
// `Command::spawn` always succeeds when `spamc` can be invoked, even if
|
||||
// logically the command is invalid, eg if it uses non-existing options.
|
||||
let child = Command::new(Spamc::SPAMC_PROGRAM)
|
||||
let mut spamc = Command::new(Spamc::SPAMC_PROGRAM)
|
||||
.args(self.spamc_args)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
self.spamc = Some(child);
|
||||
let mut stdout = spamc.stdout.take().unwrap();
|
||||
|
||||
self.spamc = Some(spamc);
|
||||
|
||||
// When processing large messages, `spamc` may begin to write its
|
||||
// response to stdout while it is still receiving parts of the message
|
||||
// body. Avoid blocking by reading stdout in a separate thread.
|
||||
self.stdout_reader = Some(thread::spawn(move || {
|
||||
let mut output = Vec::new();
|
||||
stdout.read_to_end(&mut output)?;
|
||||
Ok(output)
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -56,17 +70,24 @@ impl Process for Spamc {
|
|||
}
|
||||
|
||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||
let spamc = self.spamc.take().expect("spamc process not started");
|
||||
let mut spamc = self.spamc.take().expect("spamc process not started");
|
||||
|
||||
let output = spamc.wait_with_output()?;
|
||||
let status = spamc.wait()?;
|
||||
|
||||
if output.status.success() {
|
||||
Ok(output.stdout)
|
||||
let stdout = self
|
||||
.stdout_reader
|
||||
.take()
|
||||
.expect("spamc stdout reader thread not available")
|
||||
.join()
|
||||
.expect("panic in spamc stdout reader thread")?;
|
||||
|
||||
if status.success() {
|
||||
Ok(stdout)
|
||||
} else {
|
||||
Err(match output.status.code() {
|
||||
Err(match status.code() {
|
||||
None => Error::Io(format!(
|
||||
"spamc terminated by signal {}",
|
||||
output.status.signal().unwrap()
|
||||
status.signal().unwrap()
|
||||
)),
|
||||
Some(code) => Error::Io(format!("spamc exited with status code {}", code)),
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue