Compare commits

..

No commits in common. "16720f5ad22e4c6b811db11ff320d75dd0cc800e" and "cc76c5b5bd05562c575067ee07f98c0905fd6719" have entirely different histories.

4 changed files with 54 additions and 214 deletions

View File

@ -3,7 +3,7 @@
//! This is the client component of the Cypraea shell. It connects to the daemon,
//! sends commands, and displays output to the user.
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use clap::Parser;
use cypraea_common::paths;
use cypraea_common::protocol::{ClientMessage, DaemonMessage};
@ -22,7 +22,7 @@ mod display;
#[clap(version, about, long_about = None)]
struct Args {
/// Path to the Unix domain socket
#[clap(short = 'S', long)]
#[clap(short, long)]
socket: Option<String>,
/// Session to connect to
@ -33,10 +33,6 @@ struct Args {
#[clap(short, long)]
debug: bool,
/// Request to shutdown the daemon
#[clap(long)]
shutdown: bool,
/// Command to run
#[clap(trailing_var_arg = true)]
command: Vec<String>,
@ -66,72 +62,12 @@ async fn main() -> Result<()> {
.to_string(),
};
// Handle shutdown request if specified
if args.shutdown {
// First try to connect to the daemon
match UnixStream::connect(&socket_path).await {
Ok(stream) => {
info!("Connected to daemon at {}", socket_path);
// Connect to the daemon
let stream = UnixStream::connect(&socket_path)
.await
.context("Failed to connect to daemon")?;
// Send the shutdown message
let mut writer = tokio::io::BufWriter::new(stream);
let shutdown_msg = ClientMessage::Shutdown;
send_message(&mut writer, &shutdown_msg).await
.context("Failed to send shutdown message")?;
println!("Daemon shutdown requested");
return Ok(());
},
Err(_) => {
println!("No daemon running at {}", socket_path);
return Ok(());
}
}
}
// Try to connect to the daemon
let stream = match UnixStream::connect(&socket_path).await {
Ok(stream) => {
info!("Connected to daemon at {}", socket_path);
stream
},
Err(_e) => {
// Daemon not running, try to start it
info!("Daemon not running, attempting to start it");
// Find the daemon binary (should be in the same directory as the client)
let current_exe = std::env::current_exe()
.context("Failed to get current executable path")?;
let daemon_dir = current_exe.parent()
.context("Failed to get parent directory of current executable")?;
let daemon_path = daemon_dir.join("cypraeadd");
if !daemon_path.exists() {
return Err(anyhow!("Daemon binary not found at {}", daemon_path.display()));
}
// Start the daemon process
let _daemon_process = tokio::process::Command::new(&daemon_path)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.context("Failed to start daemon process")?;
// Give the daemon a moment to start up
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Try to connect again
match UnixStream::connect(&socket_path).await {
Ok(stream) => {
info!("Successfully connected to newly started daemon");
stream
},
Err(e) => {
return Err(anyhow!("Failed to connect to daemon after starting it: {}", e));
}
}
}
};
info!("Connected to daemon at {}", socket_path);
// Split the stream into reader and writer
let (reader, writer) = tokio::io::split(stream);

View File

@ -6,17 +6,9 @@ use xdg::BaseDirectories;
/// Get the path to the Cypraea socket.
pub fn socket_path() -> Result<PathBuf> {
// Try to use XDG_RUNTIME_DIR if available
if let Ok(xdg) = BaseDirectories::new() {
if let Ok(path) = xdg.place_runtime_file("cypraea.sock") {
return Ok(path);
}
}
// Fallback to a temporary directory if XDG_RUNTIME_DIR is not set
let tmp_dir = std::env::temp_dir();
let socket_path = tmp_dir.join("cypraea.sock");
Ok(socket_path)
// Use XDG_RUNTIME_DIR if available, or fall back to a temporary directory
let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?;
Ok(xdg.place_runtime_file("cypraea.sock")?)
}
/// Get the path to the Cypraea database directory.

View File

@ -60,10 +60,6 @@ pub enum ClientMessage {
/// Session ID to get information for.
session: String,
},
/// Request to shutdown the daemon.
#[serde(rename = "shutdown")]
Shutdown,
}
/// Daemon-to-client response message.

View File

@ -39,68 +39,32 @@ impl Server {
}
// Create the listener
let listener =
UnixListener::bind(&self.socket_path).context("Failed to bind Unix domain socket")?;
let listener = UnixListener::bind(&self.socket_path)
.context("Failed to bind Unix domain socket")?;
info!("Listening on {:?}", self.socket_path);
// Channel for shutdown signal
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<bool>(1);
// Accept connections
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
info!("Received shutdown signal, stopping server");
break;
}
tokio::select! {
// Accept new connections
conn = listener.accept() => {
match conn {
Ok((stream, _addr)) => {
let session_manager = self.session_manager.clone();
let shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
match handle_client(stream, session_manager).await {
Ok(true) => {
// Client requested shutdown
info!("Client requested shutdown");
shutdown_tx.send(true).await.ok();
}
Err(e) => {
error!("Error handling client: {}", e);
}
_ => {}
}
});
match listener.accept().await {
Ok((stream, _addr)) => {
let session_manager = self.session_manager.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(stream, session_manager).await {
error!("Error handling client: {}", e);
}
Err(e) => {
error!("Error accepting connection: {}", e);
}
}
});
}
Err(e) => {
error!("Error accepting connection: {}", e);
}
// Wait for a small duration to prevent CPU spin
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {}
}
}
// Clean up the socket file before exiting
if self.socket_path.exists() {
match fs::remove_file(&self.socket_path) {
Ok(_) => debug!("Removed socket file on shutdown"),
Err(e) => error!("Failed to remove socket file on shutdown: {}", e),
}
}
Ok(())
}
}
/// Handle a client connection.
/// Returns Ok(true) if the daemon should be shut down.
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<bool> {
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<()> {
let (reader, writer) = tokio::io::split(stream);
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
@ -119,9 +83,6 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
Ok::<_, anyhow::Error>(())
});
// Track if the message was a shutdown request
let mut should_shutdown = false;
// Process incoming messages
let mut buffer = String::new();
loop {
@ -134,22 +95,12 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
match serde_json::from_str::<ClientMessage>(&buffer) {
Ok(msg) => {
debug!("Received message: {:?}", msg);
match process_message(msg, &session_manager, tx.clone()).await {
Ok(Some(true)) => {
// Shutdown signal received
info!("Shutting down connection due to shutdown request");
should_shutdown = true;
break;
}
Ok(_) => {} // Continue processing
Err(e) => {
error!("Error processing message: {}", e);
let error_msg = DaemonMessage::Error {
message: e.to_string(),
};
tx.send(error_msg).await.ok();
}
if let Err(e) = process_message(msg, &session_manager, tx.clone()).await {
error!("Error processing message: {}", e);
let error_msg = DaemonMessage::Error {
message: e.to_string(),
};
tx.send(error_msg).await.ok();
}
}
Err(e) => {
@ -171,12 +122,7 @@ async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> R
// Cancel the write task
write_task.abort();
// Return true if a shutdown was requested during processing
if should_shutdown {
Ok(true)
} else {
Ok(false)
}
Ok(())
}
/// Process a client message.
@ -184,14 +130,9 @@ async fn process_message(
msg: ClientMessage,
session_manager: &SessionManager,
tx: mpsc::Sender<DaemonMessage>,
) -> Result<Option<bool>> {
) -> Result<()> {
match msg {
ClientMessage::RunCommand {
session,
cmd,
cwd,
env,
} => {
ClientMessage::RunCommand { session, cmd, cwd, env } => {
// Get or create the session
let session_arc = session_manager.get_or_create_session(&session).await;
let mut session_guard = session_arc.lock().await;
@ -204,7 +145,7 @@ async fn process_message(
})
.await
.context("Failed to send error message")?;
return Ok(None);
return Ok(());
}
}
@ -266,9 +207,7 @@ async fn process_message(
session,
code: exit_code,
};
tx.send(exit_msg)
.await
.context("Failed to send exit message")?;
tx.send(exit_msg).await.context("Failed to send exit message")?;
}
ClientMessage::Attach { session } => {
@ -287,9 +226,7 @@ async fn process_message(
let msg = DaemonMessage::Success {
message: "Detached from session".to_string(),
};
tx.send(msg)
.await
.context("Failed to send success message")?;
tx.send(msg).await.context("Failed to send success message")?;
}
ClientMessage::ListSessions => {
@ -325,14 +262,9 @@ async fn process_message(
match session_guard.change_directory(&dir) {
Ok(_) => {
let msg = DaemonMessage::Success {
message: format!(
"Changed directory to {}",
session_guard.get_cwd().display()
),
message: format!("Changed directory to {}", session_guard.get_cwd().display()),
};
tx.send(msg)
.await
.context("Failed to send success message")?;
tx.send(msg).await.context("Failed to send success message")?;
}
Err(e) => {
let msg = DaemonMessage::Error {
@ -352,7 +284,7 @@ async fn process_message(
message: format!("Session not found: {}", session),
};
tx.send(msg).await.context("Failed to send error message")?;
return Ok(None);
return Ok(());
}
};
@ -361,38 +293,22 @@ async fn process_message(
let msg = DaemonMessage::SessionDetails { session: info };
tx.send(msg).await.context("Failed to send session info")?;
}
ClientMessage::Shutdown => {
info!("Received shutdown request");
// Send success message to client before shutting down
tx.send(DaemonMessage::Success {
message: "Daemon shutting down".to_string(),
})
.await
.context("Failed to send shutdown acknowledgment")?;
// Return signal to break the main server loop
return Ok(Some(true));
}
}
Ok(None)
Ok(())
}
/// Send a message to the client.
async fn send_message<W: AsyncWriteExt + Unpin>(writer: &mut W, msg: &DaemonMessage) -> Result<()> {
async fn send_message<W: AsyncWriteExt + Unpin>(
writer: &mut W,
msg: &DaemonMessage,
) -> Result<()> {
// Serialize the message to JSON
let json = serde_json::to_string(msg).context("Failed to serialize message")?;
// Write the message followed by a newline
writer
.write_all(json.as_bytes())
.await
.context("Failed to write message")?;
writer
.write_all(b"\n")
.await
.context("Failed to write newline")?;
writer.write_all(json.as_bytes()).await.context("Failed to write message")?;
writer.write_all(b"\n").await.context("Failed to write newline")?;
writer.flush().await.context("Failed to flush writer")?;
Ok(())