tools/unitctl: Enable Multi Socket Support

This commit refactors the CLI code to accept
multiple instances of the control socket flag.
All subcommands except for edit and save now
support being run against multiple specified
instances of unitd.

* control_socket_addresses CLI field is now a vector
* centralize error related logic into the error module
* wait_for_socket now returns a vector of sockets. all
  sockets in vector are waited upon and validated
* extraneous code is removed
* applications, execute, import, listeners, and status
  commands all run against N control sockets now
* edit and save commands return error when run against
  a single control socket

Signed-off-by: Ava Hahn <a.hahn@f5.com>
This commit is contained in:
Ava Hahn 2024-07-03 12:44:56 -07:00 committed by Ava Hahn
parent b5fe3eaf1a
commit 706ea1a689
12 changed files with 281 additions and 201 deletions

View file

@ -111,6 +111,14 @@ $ unitctl app reload wasm
}
```
*Note:* Both of the above commands support operating on multiple instances
of Unit at once. To do this, pass multiple values for the `-s` flag as
shown below:
```
$ unitctl -s '127.0.0.1:8001' -s /run/nginx-unit.control.sock app list
```
### Lists active listeners from running Unit processes
```
unitctl listeners
@ -122,6 +130,13 @@ No socket path provided - attempting to detect from running instance
}
```
*Note:* This command supports operating on multiple instances of Unit at once.
To do this, pass multiple values for the `-s` flag as shown below:
```
$ unitctl -s '127.0.0.1:8001' -s /run/nginx-unit.control.sock listeners
```
### Get the current status of NGINX Unit processes
```
$ unitctl status -t yaml
@ -136,6 +151,13 @@ requests:
applications: {}
```
*Note:* This command supports operating on multiple instances of Unit at once.
To do this, pass multiple values for the `-s` flag as shown below:
```
$ unitctl -s '127.0.0.1:8001' -s /run/nginx-unit.control.sock status
```
### Send arbitrary configuration payloads to Unit
```
$ echo '{
@ -158,6 +180,13 @@ $ echo '{
}
```
*Note:* This command supports operating on multiple instances of Unit at once.
To do this, pass multiple values for the `-s` flag as shown below:
```
$ unitctl -s '127.0.0.1:8001' -s /run/nginx-unit.control.sock execute ...
```
### Edit current configuration in your favorite editor
```
$ unitctl edit
@ -168,6 +197,8 @@ $ unitctl edit
}
```
*Note:* This command does not support operating on multiple instances of Unit at once.
### Import configuration, certificates, and NJS modules from directory
```
$ unitctl import /opt/unit/config
@ -191,6 +222,8 @@ $ unitctl export -f - > config.tar
*Note:* The exported configuration omits certificates.
*Note:* This command does not support operating on multiple instances of Unit at once.
### Wait for socket to become available
```
$ unitctl --wait-timeout-seconds=3 --wait-max-tries=4 import /opt/unit/config`

View file

@ -1,36 +1,46 @@
use crate::unitctl::{ApplicationArgs, ApplicationCommands, UnitCtl};
use crate::{wait, UnitctlError};
use crate::{wait, UnitctlError, eprint_error};
use crate::requests::send_empty_body_deserialize_response;
use unit_client_rs::unit_client::UnitClient;
pub(crate) async fn cmd(cli: &UnitCtl, args: &ApplicationArgs) -> Result<(), UnitctlError> {
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
let clients: Vec<UnitClient> = wait::wait_for_sockets(cli)
.await?
.into_iter()
.map(|sock| UnitClient::new(sock))
.collect();
match &args.command {
ApplicationCommands::Reload { ref name } => client
.restart_application(name)
.await
.map_err(|e| UnitctlError::UnitClientError { source: *e })
.and_then(|r| args.output_format.write_to_stdout(&r)),
for client in clients {
let _ = match &args.command {
ApplicationCommands::Reload { ref name } => client
.restart_application(name)
.await
.map_err(|e| UnitctlError::UnitClientError { source: *e })
.and_then(|r| args.output_format.write_to_stdout(&r)),
/* we should be able to use this but the openapi generator library
* is fundamentally incorrect and provides a broken API for the
* applications endpoint.
ApplicationCommands::List {} => client
.applications()
.await
.map_err(|e| UnitctlError::UnitClientError { source: *e })
.and_then(|response| args.output_format.write_to_stdout(&response)),*/
/* we should be able to use this but the openapi generator library
* is fundamentally incorrect and provides a broken API for the
* applications endpoint.
ApplicationCommands::List {} => client
.applications()
.await
.map_err(|e| UnitctlError::UnitClientError { source: *e })
.and_then(|response| args.output_format.write_to_stdout(&response)),*/
ApplicationCommands::List {} => {
args.output_format.write_to_stdout(
&send_empty_body_deserialize_response(
&client,
"GET",
"/config/applications",
).await?
)
},
ApplicationCommands::List {} => {
args.output_format.write_to_stdout(
&send_empty_body_deserialize_response(
&client,
"GET",
"/config/applications",
).await?
)
},
}.map_err(|error| {
eprint_error(&error);
std::process::exit(error.exit_code());
});
}
Ok(())
}

View file

@ -1,6 +1,7 @@
use crate::inputfile::{InputFile, InputFormat};
use crate::requests::{send_and_validate_config_deserialize_response, send_empty_body_deserialize_response};
use crate::unitctl::UnitCtl;
use crate::unitctl_error::ControlSocketErrorKind;
use crate::{wait, OutputFormat, UnitctlError};
use std::path::{Path, PathBuf};
use unit_client_rs::unit_client::UnitClient;
@ -19,8 +20,16 @@ const EDITOR_KNOWN_LIST: [&str; 8] = [
];
pub(crate) async fn cmd(cli: &UnitCtl, output_format: OutputFormat) -> Result<(), UnitctlError> {
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
if cli.control_socket_addresses.is_some() &&
cli.control_socket_addresses.clone().unwrap().len() > 1 {
return Err(UnitctlError::ControlSocketError{
kind: ControlSocketErrorKind::General,
message: "too many control sockets. specify at most one.".to_string(),
});
}
let mut control_sockets = wait::wait_for_sockets(cli).await?;
let client = UnitClient::new(control_sockets.pop().unwrap());
// Get latest configuration
let current_config = send_empty_body_deserialize_response(&client, "GET", "/config").await?;

View file

@ -5,7 +5,7 @@ use crate::requests::{
};
use crate::unitctl::UnitCtl;
use crate::wait;
use crate::{OutputFormat, UnitctlError};
use crate::{OutputFormat, UnitctlError, eprint_error};
use unit_client_rs::unit_client::UnitClient;
pub(crate) async fn cmd(
@ -15,8 +15,11 @@ pub(crate) async fn cmd(
method: &str,
path: &str,
) -> Result<(), UnitctlError> {
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
let clients: Vec<_> = wait::wait_for_sockets(cli)
.await?
.into_iter()
.map(|sock| UnitClient::new(sock))
.collect();
let path_trimmed = path.trim();
let method_upper = method.to_uppercase();
@ -28,7 +31,21 @@ pub(crate) async fn cmd(
eprintln!("Cannot use GET method with input file - ignoring input file");
}
send_and_deserialize(client, method_upper, input_file_arg, path_trimmed, output_format).await
for client in clients {
let _ = send_and_deserialize(
client,
method_upper.clone(),
input_file_arg.clone(),
path_trimmed,
output_format
).await
.map_err(|e| {
eprint_error(&e);
std::process::exit(e.exit_code());
});
}
Ok(())
}
async fn send_and_deserialize(

View file

@ -50,8 +50,12 @@ pub async fn cmd(cli: &UnitCtl, directory: &PathBuf) -> Result<(), UnitctlError>
});
}
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
let clients: Vec<_> = wait::wait_for_sockets(cli)
.await?
.into_iter()
.map(|sock| UnitClient::new(sock))
.collect();
let mut results = vec![];
for i in WalkDir::new(directory)
.follow_links(true)
@ -60,7 +64,9 @@ pub async fn cmd(cli: &UnitCtl, directory: &PathBuf) -> Result<(), UnitctlError>
.filter_map(Result::ok)
.filter(|e| !e.path().is_dir())
{
results.push(process_entry(i, &client).await);
for client in &clients {
results.push(process_entry(i.clone(), client).await);
}
}
if results.iter().filter(|r| r.is_err()).count() == results.len() {

View file

@ -1,14 +1,23 @@
use crate::unitctl::UnitCtl;
use crate::wait;
use crate::{OutputFormat, UnitctlError};
use crate::{OutputFormat, UnitctlError, eprint_error};
use unit_client_rs::unit_client::UnitClient;
pub async fn cmd(cli: &UnitCtl, output_format: OutputFormat) -> Result<(), UnitctlError> {
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
client
.listeners()
.await
.map_err(|e| UnitctlError::UnitClientError { source: *e })
.and_then(|response| output_format.write_to_stdout(&response))
let socks = wait::wait_for_sockets(cli)
.await?;
let clients = socks.iter()
.map(|sock| UnitClient::new(sock.clone()));
for client in clients {
let _ = client.listeners()
.await
.map_err(|e| {
let err = UnitctlError::UnitClientError { source: *e };
eprint_error(&err);
std::process::exit(err.exit_code());
})
.and_then(|response| output_format.write_to_stdout(&response));
}
Ok(())
}

View file

@ -2,6 +2,7 @@ use crate::unitctl::UnitCtl;
use crate::wait;
use crate::UnitctlError;
use crate::requests::send_empty_body_deserialize_response;
use crate::unitctl_error::ControlSocketErrorKind;
use unit_client_rs::unit_client::UnitClient;
use tar::{Builder, Header};
use std::fs::File;
@ -12,13 +13,21 @@ pub async fn cmd(
cli: &UnitCtl,
filename: &String
) -> Result<(), UnitctlError> {
if cli.control_socket_addresses.is_some() &&
cli.control_socket_addresses.clone().unwrap().len() > 1 {
return Err(UnitctlError::ControlSocketError{
kind: ControlSocketErrorKind::General,
message: "too many control sockets. specify at most one.".to_string(),
});
}
let mut control_sockets = wait::wait_for_sockets(cli).await?;
let client = UnitClient::new(control_sockets.pop().unwrap());
if !filename.ends_with(".tar") {
eprintln!("Warning: writing uncompressed tarball to {}", filename);
}
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
let config_res = serde_json::to_string_pretty(
&send_empty_body_deserialize_response(&client, "GET", "/config").await?
);

View file

@ -1,14 +1,23 @@
use crate::unitctl::UnitCtl;
use crate::wait;
use crate::{OutputFormat, UnitctlError};
use crate::{OutputFormat, UnitctlError, eprint_error};
use unit_client_rs::unit_client::UnitClient;
pub async fn cmd(cli: &UnitCtl, output_format: OutputFormat) -> Result<(), UnitctlError> {
let control_socket = wait::wait_for_socket(cli).await?;
let client = UnitClient::new(control_socket);
client
.status()
.await
.map_err(|e| UnitctlError::UnitClientError { source: *e })
.and_then(|response| output_format.write_to_stdout(&response))
let socks = wait::wait_for_sockets(cli)
.await?;
let clients = socks.iter()
.map(|sock| UnitClient::new(sock.clone()));
for client in clients {
let _ = client.status()
.await
.map_err(|e| {
let err = UnitctlError::UnitClientError { source: *e };
eprint_error(&err);
std::process::exit(err.exit_code());
})
.and_then(|response| output_format.write_to_stdout(&response));
}
Ok(())
}

View file

@ -15,8 +15,8 @@ use crate::cmd::{
};
use crate::output_format::OutputFormat;
use crate::unitctl::{Commands, UnitCtl};
use crate::unitctl_error::UnitctlError;
use unit_client_rs::unit_client::{UnitClient, UnitClientError, UnitSerializableMap};
use crate::unitctl_error::{UnitctlError, eprint_error};
use unit_client_rs::unit_client::{UnitClient, UnitSerializableMap};
mod cmd;
mod inputfile;
@ -58,56 +58,3 @@ async fn main() -> Result<(), UnitctlError> {
std::process::exit(error.exit_code());
})
}
fn eprint_error(error: &UnitctlError) {
match error {
UnitctlError::NoUnitInstancesError => {
eprintln!("No running unit instances found");
}
UnitctlError::MultipleUnitInstancesError { ref suggestion } => {
eprintln!("{}", suggestion);
}
UnitctlError::NoSocketPathError => {
eprintln!("Unable to detect socket path from running instance");
}
UnitctlError::UnitClientError { source } => match source {
UnitClientError::SocketPermissionsError { .. } => {
eprintln!("{}", source);
eprintln!("Try running again with the same permissions as the unit control socket");
}
UnitClientError::OpenAPIError { source } => {
eprintln!("OpenAPI Error: {}", source);
}
_ => {
eprintln!("Unit client error: {}", source);
}
},
UnitctlError::SerializationError { message } => {
eprintln!("Serialization error: {}", message);
}
UnitctlError::DeserializationError { message } => {
eprintln!("Deserialization error: {}", message);
}
UnitctlError::IoError { ref source } => {
eprintln!("IO error: {}", source);
}
UnitctlError::PathNotFound { path } => {
eprintln!("Path not found: {}", path);
}
UnitctlError::EditorError { message } => {
eprintln!("Error opening editor: {}", message);
}
UnitctlError::CertificateError { message } => {
eprintln!("Certificate error: {}", message);
}
UnitctlError::NoInputFileError => {
eprintln!("No input file specified when required");
}
UnitctlError::UiServerError { ref message } => {
eprintln!("UI server error: {}", message);
}
_ => {
eprintln!("{}", error);
}
}
}

View file

@ -16,7 +16,7 @@ pub(crate) struct UnitCtl {
value_parser = parse_control_socket_address,
help = "Path (unix:/var/run/unit/control.sock), tcp address with port (127.0.0.1:80), or URL"
)]
pub(crate) control_socket_address: Option<ControlSocket>,
pub(crate) control_socket_addresses: Option<Vec<ControlSocket>>,
#[arg(
required = false,
default_missing_value = "1",

View file

@ -70,3 +70,56 @@ impl Termination for UnitctlError {
ExitCode::from(self.exit_code() as u8)
}
}
pub fn eprint_error(error: &UnitctlError) {
match error {
UnitctlError::NoUnitInstancesError => {
eprintln!("No running unit instances found");
}
UnitctlError::MultipleUnitInstancesError { ref suggestion } => {
eprintln!("{}", suggestion);
}
UnitctlError::NoSocketPathError => {
eprintln!("Unable to detect socket path from running instance");
}
UnitctlError::UnitClientError { source } => match source {
UnitClientError::SocketPermissionsError { .. } => {
eprintln!("{}", source);
eprintln!("Try running again with the same permissions as the unit control socket");
}
UnitClientError::OpenAPIError { source } => {
eprintln!("OpenAPI Error: {}", source);
}
_ => {
eprintln!("Unit client error: {}", source);
}
},
UnitctlError::SerializationError { message } => {
eprintln!("Serialization error: {}", message);
}
UnitctlError::DeserializationError { message } => {
eprintln!("Deserialization error: {}", message);
}
UnitctlError::IoError { ref source } => {
eprintln!("IO error: {}", source);
}
UnitctlError::PathNotFound { path } => {
eprintln!("Path not found: {}", path);
}
UnitctlError::EditorError { message } => {
eprintln!("Error opening editor: {}", message);
}
UnitctlError::CertificateError { message } => {
eprintln!("Certificate error: {}", message);
}
UnitctlError::NoInputFileError => {
eprintln!("No input file specified when required");
}
UnitctlError::UiServerError { ref message } => {
eprintln!("UI server error: {}", message);
}
_ => {
eprintln!("{}", error);
}
}
}

View file

@ -8,105 +8,83 @@ use unit_client_rs::unitd_instance::UnitdInstance;
/// Waits for a socket to become available. Availability is tested by attempting to access the
/// status endpoint via the control socket. When socket is available, ControlSocket instance
/// is returned.
pub async fn wait_for_socket(cli: &UnitCtl) -> Result<ControlSocket, UnitctlError> {
// Don't wait, if wait_time is not specified
if cli.wait_time_seconds.is_none() {
return cli.control_socket_address.instance_value_if_none().await.and_validate();
pub async fn wait_for_sockets(cli: &UnitCtl) -> Result<Vec<ControlSocket>, UnitctlError> {
let socks: Vec<ControlSocket>;
match &cli.control_socket_addresses {
None => {
socks = vec![find_socket_address_from_instance().await?];
},
Some(s) => socks = s.clone(),
}
let wait_time =
Duration::from_secs(cli.wait_time_seconds.expect("wait_time_option default was not applied") as u64);
let max_tries = cli.wait_max_tries.expect("max_tries_option default was not applied");
let mut attempt: u8 = 0;
let mut control_socket: ControlSocket;
while attempt < max_tries {
if attempt > 0 {
eprintln!(
"Waiting for {}s control socket to be available try {}/{}...",
wait_time.as_secs(),
attempt + 1,
max_tries
);
std::thread::sleep(wait_time);
let mut mapped = vec![];
for addr in socks {
if cli.wait_time_seconds.is_none() {
mapped.push(addr.to_owned().validate()?);
continue;
}
attempt += 1;
let wait_time =
Duration::from_secs(cli.wait_time_seconds.expect("wait_time_option default was not applied") as u64);
let max_tries = cli.wait_max_tries.expect("max_tries_option default was not applied");
let result = cli.control_socket_address.instance_value_if_none().await.and_validate();
let mut attempt = 0;
while attempt < max_tries {
if attempt > 0 {
eprintln!(
"Waiting for {}s control socket to be available try {}/{}...",
wait_time.as_secs(),
attempt + 1,
max_tries
);
std::thread::sleep(wait_time);
}
if let Err(error) = result {
if error.retryable() {
continue;
attempt += 1;
let res = addr.to_owned().validate();
if res.is_err() {
let err = res.map_err(|error| match error {
UnitClientError::UnixSocketNotFound { .. } => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::NotFound,
message: format!("{}", error),
},
UnitClientError::SocketPermissionsError { .. } => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::Permissions,
message: format!("{}", error),
},
UnitClientError::TcpSocketAddressUriError { .. }
| UnitClientError::TcpSocketAddressNoPortError { .. }
| UnitClientError::TcpSocketAddressParseError { .. } => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::Parse,
message: format!("{}", error),
},
_ => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::General,
message: format!("{}", error),
},
});
if err.as_ref().is_err_and(|e| e.retryable()) {
continue;
} else {
return Err(err.expect_err("impossible error condition"));
}
} else {
return Err(error);
let sock = res.unwrap();
if let Err(e) = UnitClient::new(sock.clone()).status().await {
eprintln!("Unable to access status endpoint: {}", *e);
continue;
}
mapped.push(sock);
}
}
control_socket = result.unwrap();
let client = UnitClient::new(control_socket.clone());
match client.status().await {
Ok(_) => {
return Ok(control_socket.to_owned());
}
Err(error) => {
eprintln!("Unable to access status endpoint: {}", *error);
continue;
}
if attempt >= max_tries {
return Err(UnitctlError::WaitTimeoutError);
}
}
if attempt >= max_tries {
Err(UnitctlError::WaitTimeoutError)
} else {
panic!("Unexpected state - this should never happen");
}
}
trait OptionControlSocket {
async fn instance_value_if_none(&self) -> Result<ControlSocket, UnitctlError>;
}
impl OptionControlSocket for Option<ControlSocket> {
async fn instance_value_if_none(&self) -> Result<ControlSocket, UnitctlError> {
if let Some(control_socket) = self {
Ok(control_socket.to_owned())
} else {
find_socket_address_from_instance().await
}
}
}
trait ResultControlSocket<T, E> {
fn and_validate(self) -> Result<ControlSocket, UnitctlError>;
}
impl ResultControlSocket<ControlSocket, UnitctlError> for Result<ControlSocket, UnitctlError> {
fn and_validate(self) -> Result<ControlSocket, UnitctlError> {
self.and_then(|control_socket| {
control_socket.validate().map_err(|error| match error {
UnitClientError::UnixSocketNotFound { .. } => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::NotFound,
message: format!("{}", error),
},
UnitClientError::SocketPermissionsError { .. } => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::Permissions,
message: format!("{}", error),
},
UnitClientError::TcpSocketAddressUriError { .. }
| UnitClientError::TcpSocketAddressNoPortError { .. }
| UnitClientError::TcpSocketAddressParseError { .. } => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::Parse,
message: format!("{}", error),
},
_ => UnitctlError::ControlSocketError {
kind: ControlSocketErrorKind::General,
message: format!("{}", error),
},
})
})
}
return Ok(mapped);
}
async fn find_socket_address_from_instance() -> Result<ControlSocket, UnitctlError> {
@ -114,7 +92,7 @@ async fn find_socket_address_from_instance() -> Result<ControlSocket, UnitctlErr
if instances.is_empty() {
return Err(UnitctlError::NoUnitInstancesError);
} else if instances.len() > 1 {
let suggestion: String = "Multiple unit instances found. Specify the socket address to the instance you wish \
let suggestion: String = "Multiple unit instances found. Specify the socket address(es) to the instance you wish \
to control using the `--control-socket-address` flag"
.to_string();
return Err(UnitctlError::MultipleUnitInstancesError { suggestion });
@ -131,14 +109,14 @@ async fn find_socket_address_from_instance() -> Result<ControlSocket, UnitctlErr
async fn wait_for_unavailable_unix_socket() {
let control_socket = ControlSocket::try_from("unix:/tmp/this_socket_does_not_exist.sock");
let cli = UnitCtl {
control_socket_address: Some(control_socket.unwrap()),
control_socket_addresses: Some(vec![control_socket.unwrap()]),
wait_time_seconds: Some(1u8),
wait_max_tries: Some(3u8),
command: crate::unitctl::Commands::Status {
output_format: crate::output_format::OutputFormat::JsonPretty,
},
};
let error = wait_for_socket(&cli)
let error = wait_for_sockets(&cli)
.await
.expect_err("Expected error, but no error received");
match error {
@ -151,7 +129,7 @@ async fn wait_for_unavailable_unix_socket() {
async fn wait_for_unavailable_tcp_socket() {
let control_socket = ControlSocket::try_from("http://127.0.0.1:9783456");
let cli = UnitCtl {
control_socket_address: Some(control_socket.unwrap()),
control_socket_addresses: Some(vec![control_socket.unwrap()]),
wait_time_seconds: Some(1u8),
wait_max_tries: Some(3u8),
command: crate::unitctl::Commands::Status {
@ -159,7 +137,7 @@ async fn wait_for_unavailable_tcp_socket() {
},
};
let error = wait_for_socket(&cli)
let error = wait_for_sockets(&cli)
.await
.expect_err("Expected error, but no error received");
match error {