Compare commits
4 Commits
cc76c5b5bd
...
16720f5ad2
| Author | SHA1 | Date |
|---|---|---|
|
|
16720f5ad2 | |
|
|
39d7302a4e | |
|
|
03eccc999f | |
|
|
e3fce1c0c7 |
|
|
@ -3,7 +3,7 @@
|
||||||
//! This is the client component of the Cypraea shell. It connects to the daemon,
|
//! This is the client component of the Cypraea shell. It connects to the daemon,
|
||||||
//! sends commands, and displays output to the user.
|
//! sends commands, and displays output to the user.
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use cypraea_common::paths;
|
use cypraea_common::paths;
|
||||||
use cypraea_common::protocol::{ClientMessage, DaemonMessage};
|
use cypraea_common::protocol::{ClientMessage, DaemonMessage};
|
||||||
|
|
@ -22,7 +22,7 @@ mod display;
|
||||||
#[clap(version, about, long_about = None)]
|
#[clap(version, about, long_about = None)]
|
||||||
struct Args {
|
struct Args {
|
||||||
/// Path to the Unix domain socket
|
/// Path to the Unix domain socket
|
||||||
#[clap(short, long)]
|
#[clap(short = 'S', long)]
|
||||||
socket: Option<String>,
|
socket: Option<String>,
|
||||||
|
|
||||||
/// Session to connect to
|
/// Session to connect to
|
||||||
|
|
@ -33,6 +33,10 @@ struct Args {
|
||||||
#[clap(short, long)]
|
#[clap(short, long)]
|
||||||
debug: bool,
|
debug: bool,
|
||||||
|
|
||||||
|
/// Request to shutdown the daemon
|
||||||
|
#[clap(long)]
|
||||||
|
shutdown: bool,
|
||||||
|
|
||||||
/// Command to run
|
/// Command to run
|
||||||
#[clap(trailing_var_arg = true)]
|
#[clap(trailing_var_arg = true)]
|
||||||
command: Vec<String>,
|
command: Vec<String>,
|
||||||
|
|
@ -62,13 +66,73 @@ async fn main() -> Result<()> {
|
||||||
.to_string(),
|
.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Connect to the daemon
|
// Handle shutdown request if specified
|
||||||
let stream = UnixStream::connect(&socket_path)
|
if args.shutdown {
|
||||||
.await
|
// First try to connect to the daemon
|
||||||
.context("Failed to connect to daemon")?;
|
match UnixStream::connect(&socket_path).await {
|
||||||
|
Ok(stream) => {
|
||||||
info!("Connected to daemon at {}", socket_path);
|
info!("Connected to daemon at {}", socket_path);
|
||||||
|
|
||||||
|
// Send the shutdown message
|
||||||
|
let mut writer = tokio::io::BufWriter::new(stream);
|
||||||
|
let shutdown_msg = ClientMessage::Shutdown;
|
||||||
|
send_message(&mut writer, &shutdown_msg).await
|
||||||
|
.context("Failed to send shutdown message")?;
|
||||||
|
|
||||||
|
println!("Daemon shutdown requested");
|
||||||
|
return Ok(());
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
println!("No daemon running at {}", socket_path);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to connect to the daemon
|
||||||
|
let stream = match UnixStream::connect(&socket_path).await {
|
||||||
|
Ok(stream) => {
|
||||||
|
info!("Connected to daemon at {}", socket_path);
|
||||||
|
stream
|
||||||
|
},
|
||||||
|
Err(_e) => {
|
||||||
|
// Daemon not running, try to start it
|
||||||
|
info!("Daemon not running, attempting to start it");
|
||||||
|
|
||||||
|
// Find the daemon binary (should be in the same directory as the client)
|
||||||
|
let current_exe = std::env::current_exe()
|
||||||
|
.context("Failed to get current executable path")?;
|
||||||
|
let daemon_dir = current_exe.parent()
|
||||||
|
.context("Failed to get parent directory of current executable")?;
|
||||||
|
let daemon_path = daemon_dir.join("cypraeadd");
|
||||||
|
|
||||||
|
if !daemon_path.exists() {
|
||||||
|
return Err(anyhow!("Daemon binary not found at {}", daemon_path.display()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the daemon process
|
||||||
|
let _daemon_process = tokio::process::Command::new(&daemon_path)
|
||||||
|
.stdout(std::process::Stdio::null())
|
||||||
|
.stderr(std::process::Stdio::null())
|
||||||
|
.spawn()
|
||||||
|
.context("Failed to start daemon process")?;
|
||||||
|
|
||||||
|
// Give the daemon a moment to start up
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||||
|
|
||||||
|
// Try to connect again
|
||||||
|
match UnixStream::connect(&socket_path).await {
|
||||||
|
Ok(stream) => {
|
||||||
|
info!("Successfully connected to newly started daemon");
|
||||||
|
stream
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
return Err(anyhow!("Failed to connect to daemon after starting it: {}", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Split the stream into reader and writer
|
// Split the stream into reader and writer
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
let mut reader = BufReader::new(reader);
|
let mut reader = BufReader::new(reader);
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,17 @@ use xdg::BaseDirectories;
|
||||||
|
|
||||||
/// Get the path to the Cypraea socket.
|
/// Get the path to the Cypraea socket.
|
||||||
pub fn socket_path() -> Result<PathBuf> {
|
pub fn socket_path() -> Result<PathBuf> {
|
||||||
// Use XDG_RUNTIME_DIR if available, or fall back to a temporary directory
|
// Try to use XDG_RUNTIME_DIR if available
|
||||||
let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?;
|
if let Ok(xdg) = BaseDirectories::new() {
|
||||||
Ok(xdg.place_runtime_file("cypraea.sock")?)
|
if let Ok(path) = xdg.place_runtime_file("cypraea.sock") {
|
||||||
|
return Ok(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback to a temporary directory if XDG_RUNTIME_DIR is not set
|
||||||
|
let tmp_dir = std::env::temp_dir();
|
||||||
|
let socket_path = tmp_dir.join("cypraea.sock");
|
||||||
|
Ok(socket_path)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the path to the Cypraea database directory.
|
/// Get the path to the Cypraea database directory.
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,10 @@ pub enum ClientMessage {
|
||||||
/// Session ID to get information for.
|
/// Session ID to get information for.
|
||||||
session: String,
|
session: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Request to shutdown the daemon.
|
||||||
|
#[serde(rename = "shutdown")]
|
||||||
|
Shutdown,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Daemon-to-client response message.
|
/// Daemon-to-client response message.
|
||||||
|
|
|
||||||
|
|
@ -39,20 +39,41 @@ impl Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the listener
|
// Create the listener
|
||||||
let listener = UnixListener::bind(&self.socket_path)
|
let listener =
|
||||||
.context("Failed to bind Unix domain socket")?;
|
UnixListener::bind(&self.socket_path).context("Failed to bind Unix domain socket")?;
|
||||||
|
|
||||||
info!("Listening on {:?}", self.socket_path);
|
info!("Listening on {:?}", self.socket_path);
|
||||||
|
|
||||||
|
// Channel for shutdown signal
|
||||||
|
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<bool>(1);
|
||||||
|
|
||||||
// Accept connections
|
// Accept connections
|
||||||
loop {
|
loop {
|
||||||
match listener.accept().await {
|
// Check for shutdown signal
|
||||||
|
if shutdown_rx.try_recv().is_ok() {
|
||||||
|
info!("Received shutdown signal, stopping server");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
// Accept new connections
|
||||||
|
conn = listener.accept() => {
|
||||||
|
match conn {
|
||||||
Ok((stream, _addr)) => {
|
Ok((stream, _addr)) => {
|
||||||
let session_manager = self.session_manager.clone();
|
let session_manager = self.session_manager.clone();
|
||||||
|
let shutdown_tx = shutdown_tx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_client(stream, session_manager).await {
|
match handle_client(stream, session_manager).await {
|
||||||
|
Ok(true) => {
|
||||||
|
// Client requested shutdown
|
||||||
|
info!("Client requested shutdown");
|
||||||
|
shutdown_tx.send(true).await.ok();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
error!("Error handling client: {}", e);
|
error!("Error handling client: {}", e);
|
||||||
}
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -60,11 +81,26 @@ impl Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Wait for a small duration to prevent CPU spin
|
||||||
|
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up the socket file before exiting
|
||||||
|
if self.socket_path.exists() {
|
||||||
|
match fs::remove_file(&self.socket_path) {
|
||||||
|
Ok(_) => debug!("Removed socket file on shutdown"),
|
||||||
|
Err(e) => error!("Failed to remove socket file on shutdown: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a client connection.
|
/// Handle a client connection.
|
||||||
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<()> {
|
/// Returns Ok(true) if the daemon should be shut down.
|
||||||
|
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<bool> {
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
let mut reader = BufReader::new(reader);
|
let mut reader = BufReader::new(reader);
|
||||||
let mut writer = BufWriter::new(writer);
|
let mut writer = BufWriter::new(writer);
|
||||||
|
|
@ -83,6 +119,9 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Track if the message was a shutdown request
|
||||||
|
let mut should_shutdown = false;
|
||||||
|
|
||||||
// Process incoming messages
|
// Process incoming messages
|
||||||
let mut buffer = String::new();
|
let mut buffer = String::new();
|
||||||
loop {
|
loop {
|
||||||
|
|
@ -95,7 +134,16 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
|
||||||
match serde_json::from_str::<ClientMessage>(&buffer) {
|
match serde_json::from_str::<ClientMessage>(&buffer) {
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
debug!("Received message: {:?}", msg);
|
debug!("Received message: {:?}", msg);
|
||||||
if let Err(e) = process_message(msg, &session_manager, tx.clone()).await {
|
|
||||||
|
match process_message(msg, &session_manager, tx.clone()).await {
|
||||||
|
Ok(Some(true)) => {
|
||||||
|
// Shutdown signal received
|
||||||
|
info!("Shutting down connection due to shutdown request");
|
||||||
|
should_shutdown = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(_) => {} // Continue processing
|
||||||
|
Err(e) => {
|
||||||
error!("Error processing message: {}", e);
|
error!("Error processing message: {}", e);
|
||||||
let error_msg = DaemonMessage::Error {
|
let error_msg = DaemonMessage::Error {
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
|
|
@ -103,6 +151,7 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
|
||||||
tx.send(error_msg).await.ok();
|
tx.send(error_msg).await.ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to parse client message: {}", e);
|
error!("Failed to parse client message: {}", e);
|
||||||
let error_msg = DaemonMessage::Error {
|
let error_msg = DaemonMessage::Error {
|
||||||
|
|
@ -122,7 +171,12 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
|
||||||
// Cancel the write task
|
// Cancel the write task
|
||||||
write_task.abort();
|
write_task.abort();
|
||||||
|
|
||||||
Ok(())
|
// Return true if a shutdown was requested during processing
|
||||||
|
if should_shutdown {
|
||||||
|
Ok(true)
|
||||||
|
} else {
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a client message.
|
/// Process a client message.
|
||||||
|
|
@ -130,9 +184,14 @@ async fn process_message(
|
||||||
msg: ClientMessage,
|
msg: ClientMessage,
|
||||||
session_manager: &SessionManager,
|
session_manager: &SessionManager,
|
||||||
tx: mpsc::Sender<DaemonMessage>,
|
tx: mpsc::Sender<DaemonMessage>,
|
||||||
) -> Result<()> {
|
) -> Result<Option<bool>> {
|
||||||
match msg {
|
match msg {
|
||||||
ClientMessage::RunCommand { session, cmd, cwd, env } => {
|
ClientMessage::RunCommand {
|
||||||
|
session,
|
||||||
|
cmd,
|
||||||
|
cwd,
|
||||||
|
env,
|
||||||
|
} => {
|
||||||
// Get or create the session
|
// Get or create the session
|
||||||
let session_arc = session_manager.get_or_create_session(&session).await;
|
let session_arc = session_manager.get_or_create_session(&session).await;
|
||||||
let mut session_guard = session_arc.lock().await;
|
let mut session_guard = session_arc.lock().await;
|
||||||
|
|
@ -145,7 +204,7 @@ async fn process_message(
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.context("Failed to send error message")?;
|
.context("Failed to send error message")?;
|
||||||
return Ok(());
|
return Ok(None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -207,7 +266,9 @@ async fn process_message(
|
||||||
session,
|
session,
|
||||||
code: exit_code,
|
code: exit_code,
|
||||||
};
|
};
|
||||||
tx.send(exit_msg).await.context("Failed to send exit message")?;
|
tx.send(exit_msg)
|
||||||
|
.await
|
||||||
|
.context("Failed to send exit message")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientMessage::Attach { session } => {
|
ClientMessage::Attach { session } => {
|
||||||
|
|
@ -226,7 +287,9 @@ async fn process_message(
|
||||||
let msg = DaemonMessage::Success {
|
let msg = DaemonMessage::Success {
|
||||||
message: "Detached from session".to_string(),
|
message: "Detached from session".to_string(),
|
||||||
};
|
};
|
||||||
tx.send(msg).await.context("Failed to send success message")?;
|
tx.send(msg)
|
||||||
|
.await
|
||||||
|
.context("Failed to send success message")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientMessage::ListSessions => {
|
ClientMessage::ListSessions => {
|
||||||
|
|
@ -262,9 +325,14 @@ async fn process_message(
|
||||||
match session_guard.change_directory(&dir) {
|
match session_guard.change_directory(&dir) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let msg = DaemonMessage::Success {
|
let msg = DaemonMessage::Success {
|
||||||
message: format!("Changed directory to {}", session_guard.get_cwd().display()),
|
message: format!(
|
||||||
|
"Changed directory to {}",
|
||||||
|
session_guard.get_cwd().display()
|
||||||
|
),
|
||||||
};
|
};
|
||||||
tx.send(msg).await.context("Failed to send success message")?;
|
tx.send(msg)
|
||||||
|
.await
|
||||||
|
.context("Failed to send success message")?;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let msg = DaemonMessage::Error {
|
let msg = DaemonMessage::Error {
|
||||||
|
|
@ -284,7 +352,7 @@ async fn process_message(
|
||||||
message: format!("Session not found: {}", session),
|
message: format!("Session not found: {}", session),
|
||||||
};
|
};
|
||||||
tx.send(msg).await.context("Failed to send error message")?;
|
tx.send(msg).await.context("Failed to send error message")?;
|
||||||
return Ok(());
|
return Ok(None);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -293,22 +361,38 @@ async fn process_message(
|
||||||
let msg = DaemonMessage::SessionDetails { session: info };
|
let msg = DaemonMessage::SessionDetails { session: info };
|
||||||
tx.send(msg).await.context("Failed to send session info")?;
|
tx.send(msg).await.context("Failed to send session info")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ClientMessage::Shutdown => {
|
||||||
|
info!("Received shutdown request");
|
||||||
|
// Send success message to client before shutting down
|
||||||
|
tx.send(DaemonMessage::Success {
|
||||||
|
message: "Daemon shutting down".to_string(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.context("Failed to send shutdown acknowledgment")?;
|
||||||
|
|
||||||
|
// Return signal to break the main server loop
|
||||||
|
return Ok(Some(true));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a message to the client.
|
/// Send a message to the client.
|
||||||
async fn send_message<W: AsyncWriteExt + Unpin>(
|
async fn send_message<W: AsyncWriteExt + Unpin>(writer: &mut W, msg: &DaemonMessage) -> Result<()> {
|
||||||
writer: &mut W,
|
|
||||||
msg: &DaemonMessage,
|
|
||||||
) -> Result<()> {
|
|
||||||
// Serialize the message to JSON
|
// Serialize the message to JSON
|
||||||
let json = serde_json::to_string(msg).context("Failed to serialize message")?;
|
let json = serde_json::to_string(msg).context("Failed to serialize message")?;
|
||||||
|
|
||||||
// Write the message followed by a newline
|
// Write the message followed by a newline
|
||||||
writer.write_all(json.as_bytes()).await.context("Failed to write message")?;
|
writer
|
||||||
writer.write_all(b"\n").await.context("Failed to write newline")?;
|
.write_all(json.as_bytes())
|
||||||
|
.await
|
||||||
|
.context("Failed to write message")?;
|
||||||
|
writer
|
||||||
|
.write_all(b"\n")
|
||||||
|
.await
|
||||||
|
.context("Failed to write newline")?;
|
||||||
writer.flush().await.context("Failed to flush writer")?;
|
writer.flush().await.context("Failed to flush writer")?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue