//! Cypraea Client (cypraea) //! //! This is the client component of the Cypraea shell. It connects to the daemon, //! sends commands, and displays output to the user. use anyhow::{Context, Result}; use clap::Parser; use cypraea_common::paths; use cypraea_common::protocol::{ClientMessage, DaemonMessage}; use rustyline::error::ReadlineError; use rustyline::DefaultEditor; //use serde_json::Deserializer; use std::io::{self, Write}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; use tokio::sync::mpsc; use tracing::{debug, error, info /*,warn*/}; mod display; #[derive(Parser, Debug)] #[clap(version, about, long_about = None)] struct Args { /// Path to the Unix domain socket #[clap(short, long)] socket: Option, /// Session to connect to #[clap(short, long, default_value = "default")] session: String, /// Enable debug logging #[clap(short, long)] debug: bool, /// Command to run #[clap(trailing_var_arg = true)] command: Vec, } #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); // Setup logging let log_level = if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }; tracing_subscriber::fmt() .with_max_level(log_level) .init(); // Get socket path let socket_path = match args.socket { Some(path) => path, None => paths::socket_path() .context("Failed to get socket path")? .to_string_lossy() .to_string(), }; // Connect to the daemon let stream = UnixStream::connect(&socket_path) .await .context("Failed to connect to daemon")?; info!("Connected to daemon at {}", socket_path); // Split the stream into reader and writer let (reader, writer) = tokio::io::split(stream); let mut reader = BufReader::new(reader); let mut writer = tokio::io::BufWriter::new(writer); // Create a channel for message handling let (tx, mut rx) = mpsc::channel::(100); // Spawn a task to read messages from the daemon let read_task = tokio::spawn(async move { let mut buffer = String::new(); loop { // Read a line from the stream buffer.clear(); match reader.read_line(&mut buffer).await { Ok(0) => break, // End of stream Ok(_) => { // Parse the message match serde_json::from_str::(&buffer) { Ok(msg) => { debug!("Received message: {:?}", msg); if tx.send(msg).await.is_err() { break; } } Err(e) => { error!("Error parsing message: {}", e); // Continue trying to parse other messages } } } Err(e) => { error!("Error reading from daemon: {}", e); break; } } } }); // Send an attach message to the daemon let attach_msg = ClientMessage::Attach { session: args.session.clone(), }; send_message(&mut writer, &attach_msg) .await .context("Failed to send attach message")?; // If a command was provided, run it and exit if !args.command.is_empty() { let cmd = args.command.join(" "); let run_msg = ClientMessage::RunCommand { session: args.session.clone(), cmd, cwd: None, env: None, }; send_message(&mut writer, &run_msg) .await .context("Failed to send run message")?; // Process messages until we get an exit code while let Some(msg) = rx.recv().await { match msg { DaemonMessage::Stdout { data, .. } => { print!("{}", data); io::stdout().flush()?; } DaemonMessage::Stderr { data, .. } => { eprint!("{}", data); io::stderr().flush()?; } DaemonMessage::Exit { code, .. } => { std::process::exit(code); } DaemonMessage::Error { message } => { eprintln!("Error: {}", message); std::process::exit(1); } _ => {} } } } else { // Start an interactive REPL run_repl(args.session, writer, rx).await? } // Cancel the read task read_task.abort(); Ok(()) } /// Send a message to the daemon. async fn send_message( writer: &mut W, msg: &ClientMessage, ) -> Result<()> { // Serialize the message to JSON let json = serde_json::to_string(msg).context("Failed to serialize message")?; // Write the message followed by a newline writer.write_all(json.as_bytes()).await.context("Failed to write message")?; writer.write_all(b"\n").await.context("Failed to write newline")?; writer.flush().await.context("Failed to flush writer")?; Ok(()) } /// Run the interactive REPL. async fn run_repl( session: String, mut writer: tokio::io::BufWriter>, mut rx: mpsc::Receiver, ) -> Result<()> { let mut rl = DefaultEditor::new().context("Failed to create line editor")?; // Send a message to get session info let info_msg = ClientMessage::SessionInfo { session: session.clone(), }; send_message(&mut writer, &info_msg).await.context("Failed to send session info message")?; // Wait for the session info response let mut session_info = None; if let Some(msg) = rx.recv().await { if let DaemonMessage::SessionDetails { session: info } = msg { session_info = Some(info); } } // Main REPL loop let mut command_in_progress = false; loop { // Only prompt for input if no command is in progress if !command_in_progress { // Get current directory from session info let cwd = match &session_info { Some(info) => info.cwd.clone(), None => "?".to_string(), }; // Create the prompt let prompt = format!("[{}] {} $ ", session, cwd); // Read a line of input match rl.readline(&prompt) { Ok(line) => { // Skip empty lines if line.trim().is_empty() { continue; } // Add to history rl.add_history_entry(&line)?; // Special commands if line.trim() == "exit" { break; } // Send the command to the daemon let run_msg = ClientMessage::RunCommand { session: session.clone(), cmd: line, cwd: None, env: None, }; send_message(&mut writer, &run_msg).await .context("Failed to send run message")?; command_in_progress = true; } Err(ReadlineError::Interrupted) => { println!("^C"); continue; } Err(ReadlineError::Eof) => { println!("^D"); break; } Err(err) => { error!("Error reading line: {}", err); break; } } } else { // Process messages from the daemon match rx.recv().await { Some(DaemonMessage::Stdout { data, .. }) => { print!("{}", data); io::stdout().flush()?; } Some(DaemonMessage::Stderr { data, .. }) => { eprint!("{}", data); io::stderr().flush()?; } Some(DaemonMessage::Exit { .. }) => { command_in_progress = false; // Refresh session info let info_msg = ClientMessage::SessionInfo { session: session.clone(), }; send_message(&mut writer, &info_msg).await .context("Failed to send session info message")?; } Some(DaemonMessage::SessionDetails { session: info }) => { session_info = Some(info); } Some(DaemonMessage::Error { message }) => { eprintln!("Error: {}", message); command_in_progress = false; } Some(_) => {} None => break, } } } // Send a detach message to the daemon let detach_msg = ClientMessage::Detach { session: session.clone(), }; send_message(&mut writer, &detach_msg).await.context("Failed to send detach message")?; Ok(()) }