Add daemon shutdown functionality for testing and administration

- Added Shutdown message type to the protocol
- Implemented server-side handling of shutdown requests
- Made socket server exit gracefully when shutdown is requested
- Added --shutdown flag to the client for requesting daemon termination
- Ensured proper cleanup of resources on shutdown
main
John Kenyon 2025-04-07 05:07:38 +00:00
parent 03eccc999f
commit 39d7302a4e
3 changed files with 129 additions and 37 deletions

View File

@ -33,6 +33,10 @@ struct Args {
#[clap(short, long)] #[clap(short, long)]
debug: bool, debug: bool,
/// Request to shutdown the daemon
#[clap(long)]
shutdown: bool,
/// Command to run /// Command to run
#[clap(trailing_var_arg = true)] #[clap(trailing_var_arg = true)]
command: Vec<String>, command: Vec<String>,

View File

@ -60,6 +60,10 @@ pub enum ClientMessage {
/// Session ID to get information for. /// Session ID to get information for.
session: String, session: String,
}, },
/// Request to shutdown the daemon.
#[serde(rename = "shutdown")]
Shutdown,
} }
/// Daemon-to-client response message. /// Daemon-to-client response message.

View File

@ -39,32 +39,68 @@ impl Server {
} }
// Create the listener // Create the listener
let listener = UnixListener::bind(&self.socket_path) let listener =
.context("Failed to bind Unix domain socket")?; UnixListener::bind(&self.socket_path).context("Failed to bind Unix domain socket")?;
info!("Listening on {:?}", self.socket_path); info!("Listening on {:?}", self.socket_path);
// Channel for shutdown signal
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<bool>(1);
// Accept connections // Accept connections
loop { loop {
match listener.accept().await { // Check for shutdown signal
Ok((stream, _addr)) => { if shutdown_rx.try_recv().is_ok() {
let session_manager = self.session_manager.clone(); info!("Received shutdown signal, stopping server");
tokio::spawn(async move { break;
if let Err(e) = handle_client(stream, session_manager).await { }
error!("Error handling client: {}", e);
tokio::select! {
// Accept new connections
conn = listener.accept() => {
match conn {
Ok((stream, _addr)) => {
let session_manager = self.session_manager.clone();
let shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
match handle_client(stream, session_manager).await {
Ok(true) => {
// Client requested shutdown
info!("Client requested shutdown");
shutdown_tx.send(true).await.ok();
}
Err(e) => {
error!("Error handling client: {}", e);
}
_ => {}
}
});
} }
}); Err(e) => {
} error!("Error accepting connection: {}", e);
Err(e) => { }
error!("Error accepting connection: {}", e); }
} }
// Wait for a small duration to prevent CPU spin
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {}
} }
} }
// Clean up the socket file before exiting
if self.socket_path.exists() {
match fs::remove_file(&self.socket_path) {
Ok(_) => debug!("Removed socket file on shutdown"),
Err(e) => error!("Failed to remove socket file on shutdown: {}", e),
}
}
Ok(())
} }
} }
/// Handle a client connection. /// Handle a client connection.
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<()> { /// Returns Ok(true) if the daemon should be shut down.
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<bool> {
let (reader, writer) = tokio::io::split(stream); let (reader, writer) = tokio::io::split(stream);
let mut reader = BufReader::new(reader); let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer); let mut writer = BufWriter::new(writer);
@ -83,6 +119,9 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
}); });
// Track if the message was a shutdown request
let mut should_shutdown = false;
// Process incoming messages // Process incoming messages
let mut buffer = String::new(); let mut buffer = String::new();
loop { loop {
@ -95,12 +134,22 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
match serde_json::from_str::<ClientMessage>(&buffer) { match serde_json::from_str::<ClientMessage>(&buffer) {
Ok(msg) => { Ok(msg) => {
debug!("Received message: {:?}", msg); debug!("Received message: {:?}", msg);
if let Err(e) = process_message(msg, &session_manager, tx.clone()).await {
error!("Error processing message: {}", e); match process_message(msg, &session_manager, tx.clone()).await {
let error_msg = DaemonMessage::Error { Ok(Some(true)) => {
message: e.to_string(), // Shutdown signal received
}; info!("Shutting down connection due to shutdown request");
tx.send(error_msg).await.ok(); should_shutdown = true;
break;
}
Ok(_) => {} // Continue processing
Err(e) => {
error!("Error processing message: {}", e);
let error_msg = DaemonMessage::Error {
message: e.to_string(),
};
tx.send(error_msg).await.ok();
}
} }
} }
Err(e) => { Err(e) => {
@ -122,7 +171,12 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
// Cancel the write task // Cancel the write task
write_task.abort(); write_task.abort();
Ok(()) // Return true if a shutdown was requested during processing
if should_shutdown {
Ok(true)
} else {
Ok(false)
}
} }
/// Process a client message. /// Process a client message.
@ -130,9 +184,14 @@ async fn process_message(
msg: ClientMessage, msg: ClientMessage,
session_manager: &SessionManager, session_manager: &SessionManager,
tx: mpsc::Sender<DaemonMessage>, tx: mpsc::Sender<DaemonMessage>,
) -> Result<()> { ) -> Result<Option<bool>> {
match msg { match msg {
ClientMessage::RunCommand { session, cmd, cwd, env } => { ClientMessage::RunCommand {
session,
cmd,
cwd,
env,
} => {
// Get or create the session // Get or create the session
let session_arc = session_manager.get_or_create_session(&session).await; let session_arc = session_manager.get_or_create_session(&session).await;
let mut session_guard = session_arc.lock().await; let mut session_guard = session_arc.lock().await;
@ -145,7 +204,7 @@ async fn process_message(
}) })
.await .await
.context("Failed to send error message")?; .context("Failed to send error message")?;
return Ok(()); return Ok(None);
} }
} }
@ -207,7 +266,9 @@ async fn process_message(
session, session,
code: exit_code, code: exit_code,
}; };
tx.send(exit_msg).await.context("Failed to send exit message")?; tx.send(exit_msg)
.await
.context("Failed to send exit message")?;
} }
ClientMessage::Attach { session } => { ClientMessage::Attach { session } => {
@ -226,7 +287,9 @@ async fn process_message(
let msg = DaemonMessage::Success { let msg = DaemonMessage::Success {
message: "Detached from session".to_string(), message: "Detached from session".to_string(),
}; };
tx.send(msg).await.context("Failed to send success message")?; tx.send(msg)
.await
.context("Failed to send success message")?;
} }
ClientMessage::ListSessions => { ClientMessage::ListSessions => {
@ -262,9 +325,14 @@ async fn process_message(
match session_guard.change_directory(&dir) { match session_guard.change_directory(&dir) {
Ok(_) => { Ok(_) => {
let msg = DaemonMessage::Success { let msg = DaemonMessage::Success {
message: format!("Changed directory to {}", session_guard.get_cwd().display()), message: format!(
"Changed directory to {}",
session_guard.get_cwd().display()
),
}; };
tx.send(msg).await.context("Failed to send success message")?; tx.send(msg)
.await
.context("Failed to send success message")?;
} }
Err(e) => { Err(e) => {
let msg = DaemonMessage::Error { let msg = DaemonMessage::Error {
@ -284,7 +352,7 @@ async fn process_message(
message: format!("Session not found: {}", session), message: format!("Session not found: {}", session),
}; };
tx.send(msg).await.context("Failed to send error message")?; tx.send(msg).await.context("Failed to send error message")?;
return Ok(()); return Ok(None);
} }
}; };
@ -293,23 +361,39 @@ async fn process_message(
let msg = DaemonMessage::SessionDetails { session: info }; let msg = DaemonMessage::SessionDetails { session: info };
tx.send(msg).await.context("Failed to send session info")?; tx.send(msg).await.context("Failed to send session info")?;
} }
ClientMessage::Shutdown => {
info!("Received shutdown request");
// Send success message to client before shutting down
tx.send(DaemonMessage::Success {
message: "Daemon shutting down".to_string(),
})
.await
.context("Failed to send shutdown acknowledgment")?;
// Return signal to break the main server loop
return Ok(Some(true));
}
} }
Ok(()) Ok(None)
} }
/// Send a message to the client. /// Send a message to the client.
async fn send_message<W: AsyncWriteExt + Unpin>( async fn send_message<W: AsyncWriteExt + Unpin>(writer: &mut W, msg: &DaemonMessage) -> Result<()> {
writer: &mut W,
msg: &DaemonMessage,
) -> Result<()> {
// Serialize the message to JSON // Serialize the message to JSON
let json = serde_json::to_string(msg).context("Failed to serialize message")?; let json = serde_json::to_string(msg).context("Failed to serialize message")?;
// Write the message followed by a newline // Write the message followed by a newline
writer.write_all(json.as_bytes()).await.context("Failed to write message")?; writer
writer.write_all(b"\n").await.context("Failed to write newline")?; .write_all(json.as_bytes())
.await
.context("Failed to write message")?;
writer
.write_all(b"\n")
.await
.context("Failed to write newline")?;
writer.flush().await.context("Failed to flush writer")?; writer.flush().await.context("Failed to flush writer")?;
Ok(()) Ok(())
} }