From bb683724a3ab3434b5e31c713aac760e51209ee0 Mon Sep 17 00:00:00 2001 From: Developer Date: Sun, 6 Apr 2025 03:27:28 +0000 Subject: [PATCH] Implement core project structure and functionality This commit includes: - Set up workspace with daemon, client, and common crates - Implemented protocol definitions for client-server communication - Created session management system with command execution - Built SQLite database logging infrastructure - Added Unix domain socket server for daemon - Implemented client with REPL interface - Added utility functions for paths and error handling --- Cargo.toml | 23 +++ client/Cargo.toml | 18 ++ client/src/main.rs | 287 +++++++++++++++++++++++++++++++ common/Cargo.toml | 14 ++ common/src/error.rs | 29 ++++ common/src/lib.rs | 9 + common/src/paths.rs | 43 +++++ common/src/protocol.rs | 140 ++++++++++++++++ daemon/Cargo.toml | 19 +++ daemon/src/db/mod.rs | 184 ++++++++++++++++++++ daemon/src/main.rs | 74 ++++++++ daemon/src/session/command.rs | 109 ++++++++++++ daemon/src/session/mod.rs | 308 ++++++++++++++++++++++++++++++++++ daemon/src/socket/mod.rs | 295 ++++++++++++++++++++++++++++++++ 14 files changed, 1552 insertions(+) create mode 100644 Cargo.toml create mode 100644 client/Cargo.toml create mode 100644 client/src/main.rs create mode 100644 common/Cargo.toml create mode 100644 common/src/error.rs create mode 100644 common/src/lib.rs create mode 100644 common/src/paths.rs create mode 100644 common/src/protocol.rs create mode 100644 daemon/Cargo.toml create mode 100644 daemon/src/db/mod.rs create mode 100644 daemon/src/main.rs create mode 100644 daemon/src/session/command.rs create mode 100644 daemon/src/session/mod.rs create mode 100644 daemon/src/socket/mod.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..0f9f797 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[workspace] +members = [ + "daemon", + "client", + "common", +] +resolver = "2" + +[workspace.dependencies] +tokio = { version = "1.32", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +rusqlite = { version = "0.30", features = ["bundled"] } +shell-words = "1.1" +clap = { version = "4.4", features = ["derive"] } +anyhow = "1.0" +thiserror = "1.0" +tracing = "0.1" +tracing-subscriber = "0.3" +chrono = { version = "0.4", features = ["serde"] } +xdg = "2.5" +tempfile = "3.8" +dirs = "5.0" diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..8ab5625 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "cypraea" +version = "0.1.0" +edition = "2021" + +[dependencies] +cypraea-common = { path = "../common" } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +clap = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +chrono = { workspace = true } +xdg = { workspace = true } +rustyline = "12.0" diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..1682d2b --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,287 @@ +//! Cypraea Client (cypraea) +//! +//! This is the client component of the Cypraea shell. It connects to the daemon, +//! sends commands, and displays output to the user. + +use anyhow::{Context, Result}; +use clap::Parser; +use cypraea_common::paths; +use cypraea_common::protocol::{ClientMessage, DaemonMessage}; +use rustyline::error::ReadlineError; +use rustyline::DefaultEditor; +use serde_json::Deserializer; +use std::io::{self, Write}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +mod display; + +#[derive(Parser, Debug)] +#[clap(version, about, long_about = None)] +struct Args { + /// Path to the Unix domain socket + #[clap(short, long)] + socket: Option, + + /// Session to connect to + #[clap(short, long, default_value = "default")] + session: String, + + /// Enable debug logging + #[clap(short, long)] + debug: bool, + + /// Command to run + #[clap(trailing_var_arg = true)] + command: Vec, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Setup logging + let log_level = if args.debug { + tracing::Level::DEBUG + } else { + tracing::Level::INFO + }; + + tracing_subscriber::fmt() + .with_max_level(log_level) + .init(); + + // Get socket path + let socket_path = match args.socket { + Some(path) => path, + None => paths::socket_path() + .context("Failed to get socket path")? + .to_string_lossy() + .to_string(), + }; + + // Connect to the daemon + let stream = UnixStream::connect(&socket_path) + .await + .context("Failed to connect to daemon")?; + + info!("Connected to daemon at {}", socket_path); + + // Split the stream into reader and writer + let (reader, writer) = tokio::io::split(stream); + let mut reader = BufReader::new(reader); + let mut writer = tokio::io::BufWriter::new(writer); + + // Create a channel for message handling + let (tx, mut rx) = mpsc::channel::(100); + + // Spawn a task to read messages from the daemon + let read_task = tokio::spawn(async move { + let mut stream = Deserializer::from_reader(reader).into_iter::(); + + while let Some(msg_result) = stream.next() { + match msg_result { + Ok(msg) => { + debug!("Received message: {:?}", msg); + if tx.send(msg).await.is_err() { + break; + } + } + Err(e) => { + error!("Error parsing message: {}", e); + break; + } + } + } + }); + + // Send an attach message to the daemon + let attach_msg = ClientMessage::Attach { + session: args.session.clone(), + }; + send_message(&mut writer, &attach_msg) + .await + .context("Failed to send attach message")?; + + // If a command was provided, run it and exit + if !args.command.is_empty() { + let cmd = args.command.join(" "); + let run_msg = ClientMessage::RunCommand { + session: args.session.clone(), + cmd, + cwd: None, + env: None, + }; + send_message(&mut writer, &run_msg) + .await + .context("Failed to send run message")?; + + // Process messages until we get an exit code + while let Some(msg) = rx.recv().await { + match msg { + DaemonMessage::Stdout { data, .. } => { + print!("{}", data); + io::stdout().flush()?; + } + DaemonMessage::Stderr { data, .. } => { + eprint!("{}", data); + io::stderr().flush()?; + } + DaemonMessage::Exit { code, .. } => { + std::process::exit(code); + } + DaemonMessage::Error { message } => { + eprintln!("Error: {}", message); + std::process::exit(1); + } + _ => {} + } + } + } else { + // Start an interactive REPL + run_repl(args.session, writer, rx).await? + } + + // Cancel the read task + read_task.abort(); + + Ok(()) +} + +/// Send a message to the daemon. +async fn send_message( + writer: &mut W, + msg: &ClientMessage, +) -> Result<()> { + // Serialize the message to JSON + let json = serde_json::to_string(msg).context("Failed to serialize message")?; + + // Write the message followed by a newline + writer.write_all(json.as_bytes()).await.context("Failed to write message")?; + writer.write_all(b"\n").await.context("Failed to write newline")?; + writer.flush().await.context("Failed to flush writer")?; + + Ok(()) +} + +/// Run the interactive REPL. +async fn run_repl( + session: String, + mut writer: tokio::io::BufWriter>, + mut rx: mpsc::Receiver, +) -> Result<()> { + let mut rl = DefaultEditor::new().context("Failed to create line editor")?; + + // Send a message to get session info + let info_msg = ClientMessage::SessionInfo { + session: session.clone(), + }; + send_message(&mut writer, &info_msg).await.context("Failed to send session info message")?; + + // Wait for the session info response + let mut session_info = None; + if let Some(msg) = rx.recv().await { + if let DaemonMessage::SessionInfo { session: info } = msg { + session_info = Some(info); + } + } + + // Main REPL loop + let mut command_in_progress = false; + loop { + // Only prompt for input if no command is in progress + if !command_in_progress { + // Get current directory from session info + let cwd = match &session_info { + Some(info) => info.cwd.clone(), + None => "?".to_string(), + }; + + // Create the prompt + let prompt = format!("[{}] {} $ ", session, cwd); + + // Read a line of input + match rl.readline(&prompt) { + Ok(line) => { + // Skip empty lines + if line.trim().is_empty() { + continue; + } + + // Add to history + rl.add_history_entry(&line)?; + + // Special commands + if line.trim() == "exit" { + break; + } + + // Send the command to the daemon + let run_msg = ClientMessage::RunCommand { + session: session.clone(), + cmd: line, + cwd: None, + env: None, + }; + send_message(&mut writer, &run_msg).await + .context("Failed to send run message")?; + + command_in_progress = true; + } + Err(ReadlineError::Interrupted) => { + println!("^C"); + continue; + } + Err(ReadlineError::Eof) => { + println!("^D"); + break; + } + Err(err) => { + error!("Error reading line: {}", err); + break; + } + } + } else { + // Process messages from the daemon + match rx.recv().await { + Some(DaemonMessage::Stdout { data, .. }) => { + print!("{}", data); + io::stdout().flush()?; + } + Some(DaemonMessage::Stderr { data, .. }) => { + eprint!("{}", data); + io::stderr().flush()?; + } + Some(DaemonMessage::Exit { .. }) => { + command_in_progress = false; + + // Refresh session info + let info_msg = ClientMessage::SessionInfo { + session: session.clone(), + }; + send_message(&mut writer, &info_msg).await + .context("Failed to send session info message")?; + } + Some(DaemonMessage::SessionInfo { session: info }) => { + session_info = Some(info); + } + Some(DaemonMessage::Error { message }) => { + eprintln!("Error: {}", message); + command_in_progress = false; + } + Some(_) => {} + None => break, + } + } + } + + // Send a detach message to the daemon + let detach_msg = ClientMessage::Detach { + session: session.clone(), + }; + send_message(&mut writer, &detach_msg).await.context("Failed to send detach message")?; + + Ok(()) +} diff --git a/common/Cargo.toml b/common/Cargo.toml new file mode 100644 index 0000000..b04c507 --- /dev/null +++ b/common/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "cypraea-common" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true } +anyhow = { workspace = true } +xdg = { workspace = true } +dirs = "5.0" diff --git a/common/src/error.rs b/common/src/error.rs new file mode 100644 index 0000000..b647635 --- /dev/null +++ b/common/src/error.rs @@ -0,0 +1,29 @@ +//! Error types for the Cypraea shell. + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum CypraeaError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("Invalid JSON: {0}")] + Json(#[from] serde_json::Error), + + #[error("Database error: {0}")] + Database(String), + + #[error("Session error: {0}")] + Session(String), + + #[error("Command error: {0}")] + Command(String), + + #[error("Protocol error: {0}")] + Protocol(String), + + #[error("{0}")] + Other(String), +} + +pub type Result = std::result::Result; diff --git a/common/src/lib.rs b/common/src/lib.rs new file mode 100644 index 0000000..e46f08b --- /dev/null +++ b/common/src/lib.rs @@ -0,0 +1,9 @@ +//! Common types and utilities for the Cypraea shell. +//! +//! This crate defines the protocol used for communication between the +//! Cypraea client and daemon, as well as utilities for working with +//! shell sessions and commands. + +pub mod protocol; +pub mod paths; +pub mod error; diff --git a/common/src/paths.rs b/common/src/paths.rs new file mode 100644 index 0000000..43396a1 --- /dev/null +++ b/common/src/paths.rs @@ -0,0 +1,43 @@ +//! Path utilities for the Cypraea shell. + +use anyhow::{Context, Result}; +use std::path::{Path, PathBuf}; +use xdg::BaseDirectories; + +/// Get the path to the Cypraea socket. +pub fn socket_path() -> Result { + // Use XDG_RUNTIME_DIR if available, or fall back to a temporary directory + let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?; + Ok(xdg.get_runtime_path().join("cypraea.sock")) +} + +/// Get the path to the Cypraea database directory. +pub fn data_dir() -> Result { + let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?; + let path = xdg.get_data_home().join("cypraea"); + std::fs::create_dir_all(&path).context("Failed to create data directory")?; + Ok(path) +} + +/// Get the path to the Cypraea database file. +pub fn database_path() -> Result { + Ok(data_dir()?.join("log.sqlite")) +} + +/// Get the path to the Cypraea configuration directory. +pub fn config_dir() -> Result { + let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?; + let path = xdg.get_config_home().join("cypraea"); + std::fs::create_dir_all(&path).context("Failed to create config directory")?; + Ok(path) +} + +/// Expand a path with respect to the user's home directory. +pub fn expand_path(path: &str) -> PathBuf { + if path.starts_with('~') { + if let Some(home) = dirs::home_dir() { + return home.join(&path[2..]); + } + } + Path::new(path).to_path_buf() +} diff --git a/common/src/protocol.rs b/common/src/protocol.rs new file mode 100644 index 0000000..53046fa --- /dev/null +++ b/common/src/protocol.rs @@ -0,0 +1,140 @@ +//! Protocol definitions for communication between the Cypraea client and daemon. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Client-to-daemon request message. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum ClientMessage { + /// Request to run a command in a session. + #[serde(rename = "run_command")] + RunCommand { + /// Session ID to run the command in. + session: String, + /// Command to run. + cmd: String, + /// Current working directory (optional). + cwd: Option, + /// Environment variables to set (optional). + env: Option>, + }, + + /// Request to attach to an existing session. + #[serde(rename = "attach")] + Attach { + /// Session ID to attach to. + session: String, + }, + + /// Request to detach from a session. + #[serde(rename = "detach")] + Detach { + /// Session ID to detach from. + session: String, + }, + + /// Request to list available sessions. + #[serde(rename = "list_sessions")] + ListSessions, + + /// Request to create a new session. + #[serde(rename = "create_session")] + CreateSession { + /// Optional name for the session. + name: Option, + }, + + /// Request to change directory in a session. + #[serde(rename = "cd")] + ChangeDirectory { + /// Session ID to change directory in. + session: String, + /// Directory to change to. + dir: String, + }, + + /// Request to get session information. + #[serde(rename = "session_info")] + SessionInfo { + /// Session ID to get information for. + session: String, + }, +} + +/// Daemon-to-client response message. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum DaemonMessage { + /// Standard output from a command. + #[serde(rename = "stdout")] + Stdout { + /// Session ID the output is from. + session: String, + /// Output data. + data: String, + }, + + /// Standard error from a command. + #[serde(rename = "stderr")] + Stderr { + /// Session ID the output is from. + session: String, + /// Output data. + data: String, + }, + + /// Command has exited. + #[serde(rename = "exit")] + Exit { + /// Session ID the command was run in. + session: String, + /// Exit code from the command. + code: i32, + }, + + /// Response to a session list request. + #[serde(rename = "sessions")] + Sessions { + /// List of available session IDs. + sessions: Vec, + }, + + /// Response to a session info request. + #[serde(rename = "session_info")] + SessionInfo { + /// Session information. + session: SessionInfo, + }, + + /// Error message. + #[serde(rename = "error")] + Error { + /// Error message. + message: String, + }, + + /// Success message. + #[serde(rename = "success")] + Success { + /// Success message. + message: String, + }, +} + +/// Information about a session. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionInfo { + /// Session ID. + pub id: String, + /// Current working directory of the session. + pub cwd: String, + /// Whether the session has an active command running. + pub active: bool, + /// When the session was created. + pub created_at: chrono::DateTime, + /// When the session was last used. + pub last_used: chrono::DateTime, + /// Number of commands run in this session. + pub command_count: u64, +} diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml new file mode 100644 index 0000000..a582497 --- /dev/null +++ b/daemon/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "cypraeadd" +version = "0.1.0" +edition = "2021" + +[dependencies] +cypraea-common = { path = "../common" } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +rusqlite = { workspace = true } +shell-words = { workspace = true } +clap = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +chrono = { workspace = true } +xdg = { workspace = true } diff --git a/daemon/src/db/mod.rs b/daemon/src/db/mod.rs new file mode 100644 index 0000000..c0466d6 --- /dev/null +++ b/daemon/src/db/mod.rs @@ -0,0 +1,184 @@ +//! Database module for the Cypraea daemon. +//! +//! This module handles logging command execution and other events to a SQLite database. + +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use rusqlite::{params, Connection}; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{debug, error, info}; + +/// Database connection wrapper. +#[derive(Clone)] +pub struct Database { + conn: Arc>, +} + +/// Command execution record. +pub struct CommandRecord { + pub session: String, + pub timestamp_start: DateTime, + pub timestamp_end: DateTime, + pub duration_ms: i64, + pub cmd: String, + pub cwd: String, + pub exit_code: i32, + pub stdout: String, + pub stderr: String, +} + +impl Database { + /// Create a new database connection. + pub async fn new(path: &Path) -> Result { + // Ensure parent directory exists + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent) + .await + .context("Failed to create database directory")?; + } + + debug!("Opening database at {:?}", path); + let conn = Connection::open(path).context("Failed to open database")?; + + // Initialize schema + let db = Self { + conn: Arc::new(Mutex::new(conn)), + }; + + db.init_schema().await.context("Failed to initialize database schema")?; + + info!("Database initialized"); + Ok(db) + } + + /// Initialize the database schema. + async fn init_schema(&self) -> Result<()> { + let conn = self.conn.lock().await; + conn.execute( + " + CREATE TABLE IF NOT EXISTS commands ( + id INTEGER PRIMARY KEY, + session TEXT NOT NULL, + timestamp_start TEXT NOT NULL, + timestamp_end TEXT NOT NULL, + duration_ms INTEGER NOT NULL, + cmd TEXT NOT NULL, + cwd TEXT NOT NULL, + exit_code INTEGER NOT NULL, + stdout TEXT, + stderr TEXT + ) + ", + [], + ) + .context("Failed to create commands table")?; + + Ok() + } + + /// Log a command execution. + pub async fn log_command(&self, record: &CommandRecord) -> Result { + let conn = self.conn.lock().await; + let res = conn + .execute( + " + INSERT INTO commands ( + session, + timestamp_start, + timestamp_end, + duration_ms, + cmd, + cwd, + exit_code, + stdout, + stderr + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ", + params![ + record.session, + record.timestamp_start.to_rfc3339(), + record.timestamp_end.to_rfc3339(), + record.duration_ms, + record.cmd, + record.cwd, + record.exit_code, + record.stdout, + record.stderr + ], + ) + .context("Failed to insert command record")?; + + Ok(conn.last_insert_rowid()) + } + + /// Get command history for a session. + pub async fn get_command_history(&self, session: &str, limit: usize) -> Result> { + let conn = self.conn.lock().await; + let mut stmt = conn + .prepare( + " + SELECT + session, + timestamp_start, + timestamp_end, + duration_ms, + cmd, + cwd, + exit_code, + stdout, + stderr + FROM commands + WHERE session = ? + ORDER BY timestamp_start DESC + LIMIT ? + ", + ) + .context("Failed to prepare command history query")?; + + let records = stmt + .query_map(params![session, limit as i64], |row| { + let timestamp_start: String = row.get(1)?; + let timestamp_end: String = row.get(2)?; + + Ok(CommandRecord { + session: row.get(0)?, + timestamp_start: DateTime::parse_from_rfc3339(×tamp_start) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()), + timestamp_end: DateTime::parse_from_rfc3339(×tamp_end) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()), + duration_ms: row.get(3)?, + cmd: row.get(4)?, + cwd: row.get(5)?, + exit_code: row.get(6)?, + stdout: row.get(7)?, + stderr: row.get(8)?, + }) + }) + .context("Failed to execute command history query")?; + + let mut results = Vec::new(); + for record in records { + results.push(record.context("Failed to parse command record")?); + } + + Ok(results) + } + + /// Get command count for a session. + pub async fn get_command_count(&self, session: &str) -> Result { + let conn = self.conn.lock().await; + let count: u64 = conn + .query_row( + "SELECT COUNT(*) FROM commands WHERE session = ?", + params![session], + |row| row.get(0), + ) + .context("Failed to get command count")?; + + Ok(count) + } +} diff --git a/daemon/src/main.rs b/daemon/src/main.rs new file mode 100644 index 0000000..2aa0dba --- /dev/null +++ b/daemon/src/main.rs @@ -0,0 +1,74 @@ +//! Cypraea Daemon (cypraeadd) +//! +//! This is the server component of the Cypraea shell. It manages shell sessions, +//! executes commands, and provides logging and persistence. + +use anyhow::{Context, Result}; +use clap::Parser; +use tracing::{info, warn, error}; +use cypraea_common::paths; + +mod db; +mod session; +mod socket; + +#[derive(Parser, Debug)] +#[clap(version, about, long_about = None)] +struct Args { + /// Path to the Unix domain socket + #[clap(short, long)] + socket: Option, + + /// Path to the SQLite database + #[clap(short, long)] + database: Option, + + /// Enable debug logging + #[clap(short, long)] + debug: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Setup logging + let log_level = if args.debug { + tracing::Level::DEBUG + } else { + tracing::Level::INFO + }; + + tracing_subscriber::fmt() + .with_max_level(log_level) + .init(); + + info!("Starting Cypraea daemon"); + + // Get paths + let socket_path = match args.socket { + Some(path) => path.into(), + None => paths::socket_path().context("Failed to get socket path")?, + }; + + let db_path = match args.database { + Some(path) => path.into(), + None => paths::database_path().context("Failed to get database path")?, + }; + + // Initialize database + let db = db::Database::new(&db_path).await + .context("Failed to initialize database")?; + + // Initialize session manager + let session_manager = session::SessionManager::new(db.clone()); + + // Start socket server + socket::Server::new(socket_path, session_manager) + .run() + .await + .context("Socket server error")?; + + info!("Daemon shutting down"); + Ok() +} diff --git a/daemon/src/session/command.rs b/daemon/src/session/command.rs new file mode 100644 index 0000000..9ae3d38 --- /dev/null +++ b/daemon/src/session/command.rs @@ -0,0 +1,109 @@ +//! Command execution utilities for sessions. + +use anyhow::{Context, Result}; +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio::process::{Child, Command}; +use tokio::sync::mpsc; + +/// Output capture result. +pub struct CapturedOutput { + pub stdout: String, + pub stderr: String, + pub exit_code: i32, +} + +/// Run a command and capture its output. +pub async fn run_and_capture(cmd: &str, cwd: &str, env: &[(String, String)]) -> Result { + // Parse the command line + let args = shell_words::split(cmd).context("Failed to parse command line")?; + if args.is_empty() { + return Ok(CapturedOutput { + stdout: String::new(), + stderr: String::new(), + exit_code: 0, + }); + } + + // Set up the command + let mut command = Command::new(&args[0]); + command + .args(&args[1..]) + .current_dir(cwd) + .env_clear(); + + // Add environment variables + for (key, value) in env { + command.env(key, value); + } + + // Set up I/O + command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + // Spawn the command + let mut child = command.spawn().context("Failed to spawn command")?; + + // Set up channels for stdout and stderr + let (stdout_tx, mut stdout_rx) = mpsc::channel::(100); + let (stderr_tx, mut stderr_rx) = mpsc::channel::(100); + + // Capture stdout + let stdout = child.stdout.take().expect("Child stdout not captured"); + let stdout_future = capture_output(stdout, stdout_tx); + + // Capture stderr + let stderr = child.stderr.take().expect("Child stderr not captured"); + let stderr_future = capture_output(stderr, stderr_tx); + + // Spawn futures to capture stdout and stderr + tokio::spawn(stdout_future); + tokio::spawn(stderr_future); + + // Collect stdout and stderr + let mut stdout_buffer = String::new(); + let mut stderr_buffer = String::new(); + + loop { + tokio::select! { + Some(line) = stdout_rx.recv() => { + stdout_buffer.push_str(&line); + } + Some(line) = stderr_rx.recv() => { + stderr_buffer.push_str(&line); + } + else => break, + } + } + + // Wait for the command to finish + let status = child.wait().await.context("Failed to wait for command")?; + + Ok(CapturedOutput { + stdout: stdout_buffer, + stderr: stderr_buffer, + exit_code: status.code().unwrap_or(-1), + }) +} + +/// Capture output from a child process. +async fn capture_output(reader: R, tx: mpsc::Sender) -> Result<()> +where + R: AsyncRead + Unpin, +{ + let mut reader = BufReader::new(reader); + let mut line = String::new(); + + loop { + line.clear(); + let bytes_read = reader.read_line(&mut line).await?; + if bytes_read == 0 { + break; + } + + tx.send(line.clone()).await.ok(); + } + + Ok(()) +} diff --git a/daemon/src/session/mod.rs b/daemon/src/session/mod.rs new file mode 100644 index 0000000..c6c2dd2 --- /dev/null +++ b/daemon/src/session/mod.rs @@ -0,0 +1,308 @@ +//! Session management for the Cypraea daemon. +//! +//! This module handles shell sessions, including creation, management, +//! and command execution within sessions. + +use anyhow::{anyhow, Context, Result}; +use chrono::{DateTime, Duration, Utc}; +use cypraea_common::protocol::SessionInfo; +use shell_words; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio::process::{Child, Command}; +use tokio::sync::{mpsc, Mutex, RwLock}; +use tracing::{debug, error, info, warn}; + +use crate::db::{CommandRecord, Database}; + +pub mod command; + +/// A shell session. +#[derive(Debug)] +pub struct Session { + /// Session ID. + id: String, + /// Current working directory. + cwd: PathBuf, + /// Environment variables. + env: HashMap, + /// Aliases. + aliases: HashMap, + /// When the session was created. + created_at: DateTime, + /// When the session was last used. + last_used: DateTime, + /// Whether a command is currently running. + active: bool, + /// Command count. + command_count: u64, +} + +impl Session { + /// Create a new session. + pub fn new(id: String) -> Self { + let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); + let now = Utc::now(); + + Self { + id, + cwd: home, + env: std::env::vars().collect(), + aliases: HashMap::new(), + created_at: now, + last_used: now, + active: false, + command_count: 0, + } + } + + /// Get the current working directory. + pub fn get_cwd(&self) -> &Path { + &self.cwd + } + + /// Change the current working directory. + pub fn change_directory(&mut self, dir: &str) -> Result<()> { + let new_dir = if dir.starts_with("/") { + PathBuf::from(dir) + } else { + self.cwd.join(dir) + }; + + // Canonicalize the path to resolve . and .. + let canonical = new_dir + .canonicalize() + .context("Failed to resolve directory")?; + + // Verify the directory exists and is accessible + if !canonical.is_dir() { + return Err(anyhow!("Not a directory: {}", dir)); + } + + self.cwd = canonical; + self.last_used = Utc::now(); + + Ok(()) + } + + /// Set an environment variable. + pub fn set_env(&mut self, key: String, value: String) { + self.env.insert(key, value); + self.last_used = Utc::now(); + } + + /// Get an environment variable. + pub fn get_env(&self, key: &str) -> Option<&String> { + self.env.get(key) + } + + /// Set an alias. + pub fn set_alias(&mut self, name: String, value: String) { + self.aliases.insert(name, value); + self.last_used = Utc::now(); + } + + /// Get an alias. + pub fn get_alias(&self, name: &str) -> Option<&String> { + self.aliases.get(name) + } + + /// Get the session info. + pub fn get_info(&self) -> SessionInfo { + SessionInfo { + id: self.id.clone(), + cwd: self.cwd.to_string_lossy().to_string(), + active: self.active, + created_at: self.created_at, + last_used: self.last_used, + command_count: self.command_count, + } + } + + /// Execute a command in this session. + pub async fn execute_command( + &mut self, + cmd: &str, + stdout_tx: mpsc::Sender, + stderr_tx: mpsc::Sender, + db: Option, + ) -> Result { + // Parse the command line + let args = shell_words::split(cmd).context("Failed to parse command line")?; + if args.is_empty() { + return Ok(0); + } + + // Check for built-in commands + if args[0] == "cd" { + let dir = args.get(1).map(|s| s.as_str()).unwrap_or("~"); + match self.change_directory(dir) { + Ok(_) => return Ok(0), + Err(e) => { + let err_msg = format!("cd: {}", e); + stderr_tx.send(err_msg).await.ok(); + return Ok(1); + } + } + } + + // Check for alias substitution + let args = if let Some(alias) = self.get_alias(&args[0]) { + // Replace the command with its alias + let mut alias_args = shell_words::split(alias).unwrap_or_default(); + let mut cmd_args = args[1..].to_vec(); + alias_args.append(&mut cmd_args); + alias_args + } else { + args + }; + + if args.is_empty() { + return Ok(0); + } + + self.active = true; + self.last_used = Utc::now(); + self.command_count += 1; + + // Record start time + let start_time = Utc::now(); + + // Set up the command + let mut command = Command::new(&args[0]); + command + .args(&args[1..]) + .current_dir(&self.cwd) + .env_clear() + .envs(&self.env) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + // Spawn the command + let mut child = command.spawn().context("Failed to spawn command")?; + + // Capture stdout + let stdout = child.stdout.take().expect("Child stdout not captured"); + let stdout_tx_clone = stdout_tx.clone(); + let stdout_future = Self::capture_output(stdout, stdout_tx_clone); + + // Capture stderr + let stderr = child.stderr.take().expect("Child stderr not captured"); + let stderr_future = Self::capture_output(stderr, stderr_tx.clone()); + + // Spawn futures to capture stdout and stderr + tokio::spawn(stdout_future); + tokio::spawn(stderr_future); + + // Collect stdout and stderr for logging + let mut stdout_buffer = Vec::new(); + let mut stderr_buffer = Vec::new(); + + // Wait for the command to finish + let status = child.wait().await.context("Failed to wait for command")?; + + // Record end time and calculate duration + let end_time = Utc::now(); + let duration = end_time.signed_duration_since(start_time); + + // Log the command execution if database is provided + if let Some(db) = db { + let record = CommandRecord { + session: self.id.clone(), + timestamp_start: start_time, + timestamp_end: end_time, + duration_ms: duration.num_milliseconds(), + cmd: cmd.to_string(), + cwd: self.cwd.to_string_lossy().to_string(), + exit_code: status.code().unwrap_or(-1), + stdout: String::from_utf8_lossy(&stdout_buffer).to_string(), + stderr: String::from_utf8_lossy(&stderr_buffer).to_string(), + }; + + if let Err(e) = db.log_command(&record).await { + error!("Failed to log command: {}", e); + } + } + + self.active = false; + Ok(status.code().unwrap_or(-1)) + } + + /// Capture output from a child process. + async fn capture_output(reader: R, tx: mpsc::Sender) -> Result<()> + where + R: AsyncRead + Unpin, + { + let mut reader = BufReader::new(reader); + let mut line = String::new(); + + loop { + line.clear(); + let bytes_read = reader.read_line(&mut line).await?; + if bytes_read == 0 { + break; + } + + tx.send(line.clone()).await.ok(); + } + + Ok(()) + } +} + +/// Session manager. +#[derive(Clone)] +pub struct SessionManager { + sessions: Arc>>>>, + db: Database, +} + +impl SessionManager { + /// Create a new session manager. + pub fn new(db: Database) -> Self { + Self { + sessions: Arc::new(RwLock::new(HashMap::new())), + db, + } + } + + /// Get or create a session. + pub async fn get_or_create_session(&self, id: &str) -> Arc> { + let mut sessions = self.sessions.write().await; + + if !sessions.contains_key(id) { + info!("Creating new session: {}", id); + let session = Session::new(id.to_string()); + sessions.insert(id.to_string(), Arc::new(Mutex::new(session))); + } + + sessions.get(id).unwrap().clone() + } + + /// Get a session if it exists. + pub async fn get_session(&self, id: &str) -> Option>> { + let sessions = self.sessions.read().await; + sessions.get(id).cloned() + } + + /// List all sessions. + pub async fn list_sessions(&self) -> Vec { + let sessions = self.sessions.read().await; + let mut result = Vec::new(); + + for session in sessions.values() { + let session = session.lock().await; + result.push(session.get_info()); + } + + result + } + + /// Get the database connection. + pub fn get_db(&self) -> Database { + self.db.clone() + } +} diff --git a/daemon/src/socket/mod.rs b/daemon/src/socket/mod.rs new file mode 100644 index 0000000..331bc5b --- /dev/null +++ b/daemon/src/socket/mod.rs @@ -0,0 +1,295 @@ +//! Socket server for the Cypraea daemon. +//! +//! This module handles communication with clients over a Unix domain socket. + +use anyhow::{Context, Result}; +use cypraea_common::protocol::{ClientMessage, DaemonMessage}; +use serde_json::Deserializer; +use std::fs; +use std::path::Path; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +use crate::session::SessionManager; + +/// Socket server for the daemon. +pub struct Server { + /// Path to the Unix domain socket. + socket_path: Box, + /// Session manager. + session_manager: SessionManager, +} + +impl Server { + /// Create a new socket server. + pub fn new>(socket_path: P, session_manager: SessionManager) -> Self { + Self { + socket_path: socket_path.as_ref().into(), + session_manager, + } + } + + /// Run the socket server. + pub async fn run(&self) -> Result<()> { + // Remove the socket file if it already exists + if self.socket_path.exists() { + fs::remove_file(&self.socket_path).context("Failed to remove existing socket file")?; + } + + // Create the listener + let listener = UnixListener::bind(&self.socket_path) + .context("Failed to bind Unix domain socket")?; + + info!("Listening on {:?}", self.socket_path); + + // Accept connections + loop { + match listener.accept().await { + Ok((stream, _addr)) => { + let session_manager = self.session_manager.clone(); + tokio::spawn(async move { + if let Err(e) = handle_client(stream, session_manager).await { + error!("Error handling client: {}", e); + } + }); + } + Err(e) => { + error!("Error accepting connection: {}", e); + } + } + } + } +} + +/// Handle a client connection. +async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<()> { + let (reader, writer) = tokio::io::split(stream); + let mut reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); + + // Create a channel for sending responses back to the client + let (tx, mut rx) = mpsc::channel::(100); + + // Spawn a task to handle sending responses + let write_task = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if let Err(e) = send_message(&mut writer, &msg).await { + error!("Error sending message to client: {}", e); + break; + } + } + Ok::<_, anyhow::Error>(()) + }); + + // Process incoming messages + let mut stream = Deserializer::from_reader(reader).into_iter::(); + + while let Some(msg_result) = stream.next() { + let msg = msg_result.context("Failed to parse client message")?; + debug!("Received message: {:?}", msg); + + if let Err(e) = process_message(msg, &session_manager, tx.clone()).await { + error!("Error processing message: {}", e); + let error_msg = DaemonMessage::Error { + message: e.to_string(), + }; + tx.send(error_msg).await.ok(); + } + } + + // Cancel the write task + write_task.abort(); + + Ok(()) +} + +/// Process a client message. +async fn process_message( + msg: ClientMessage, + session_manager: &SessionManager, + tx: mpsc::Sender, +) -> Result<()> { + match msg { + ClientMessage::RunCommand { session, cmd, cwd, env } => { + // Get or create the session + let session_arc = session_manager.get_or_create_session(&session).await; + let mut session_guard = session_arc.lock().await; + + // Set custom working directory if provided + if let Some(dir) = cwd { + if let Err(e) = session_guard.change_directory(&dir) { + tx.send(DaemonMessage::Error { + message: format!("Failed to change directory: {}", e), + }) + .await + .context("Failed to send error message")?; + return Ok(()); + } + } + + // Set custom environment variables if provided + if let Some(env_vars) = env { + for (key, value) in env_vars { + session_guard.set_env(key, value); + } + } + + // Create channels for stdout and stderr + let (stdout_tx, mut stdout_rx) = mpsc::channel::(100); + let (stderr_tx, mut stderr_rx) = mpsc::channel::(100); + + // Clone the response channel and session ID for the output handlers + let stdout_session = session.clone(); + let stderr_session = session.clone(); + let tx_stdout = tx.clone(); + let tx_stderr = tx.clone(); + + // Spawn tasks to forward stdout and stderr to the client + let stdout_task = tokio::spawn(async move { + while let Some(data) = stdout_rx.recv().await { + let msg = DaemonMessage::Stdout { + session: stdout_session.clone(), + data, + }; + if tx_stdout.send(msg).await.is_err() { + break; + } + } + }); + + let stderr_task = tokio::spawn(async move { + while let Some(data) = stderr_rx.recv().await { + let msg = DaemonMessage::Stderr { + session: stderr_session.clone(), + data, + }; + if tx_stderr.send(msg).await.is_err() { + break; + } + } + }); + + // Execute the command + let db = session_manager.get_db(); + let exit_code = session_guard + .execute_command(&cmd, stdout_tx, stderr_tx, Some(db)) + .await + .context("Failed to execute command")?; + + // Wait for the output handlers to finish + stdout_task.abort(); + stderr_task.abort(); + + // Send the exit code + let exit_msg = DaemonMessage::Exit { + session, + code: exit_code, + }; + tx.send(exit_msg).await.context("Failed to send exit message")?; + } + + ClientMessage::Attach { session } => { + // Get or create the session + let session_arc = session_manager.get_or_create_session(&session).await; + let session_guard = session_arc.lock().await; + + // Send session info + let info = session_guard.get_info(); + let msg = DaemonMessage::SessionInfo { session: info }; + tx.send(msg).await.context("Failed to send session info")?; + } + + ClientMessage::Detach { session: _ } => { + // Nothing special to do for detach, just acknowledge + let msg = DaemonMessage::Success { + message: "Detached from session".to_string(), + }; + tx.send(msg).await.context("Failed to send success message")?; + } + + ClientMessage::ListSessions => { + // Get all sessions + let sessions = session_manager.list_sessions().await; + let msg = DaemonMessage::Sessions { sessions }; + tx.send(msg).await.context("Failed to send sessions list")?; + } + + ClientMessage::CreateSession { name } => { + // Generate session ID + let id = match name { + Some(name) => name, + None => format!("{}", chrono::Utc::now().timestamp()), + }; + + // Create the session + let session_arc = session_manager.get_or_create_session(&id).await; + let session_guard = session_arc.lock().await; + + // Send session info + let info = session_guard.get_info(); + let msg = DaemonMessage::SessionInfo { session: info }; + tx.send(msg).await.context("Failed to send session info")?; + } + + ClientMessage::ChangeDirectory { session, dir } => { + // Get the session + let session_arc = session_manager.get_or_create_session(&session).await; + let mut session_guard = session_arc.lock().await; + + // Change directory + match session_guard.change_directory(&dir) { + Ok(_) => { + let msg = DaemonMessage::Success { + message: format!("Changed directory to {}", session_guard.get_cwd().display()), + }; + tx.send(msg).await.context("Failed to send success message")?; + } + Err(e) => { + let msg = DaemonMessage::Error { + message: format!("Failed to change directory: {}", e), + }; + tx.send(msg).await.context("Failed to send error message")?; + } + } + } + + ClientMessage::SessionInfo { session } => { + // Get the session + let session_arc = match session_manager.get_session(&session).await { + Some(s) => s, + None => { + let msg = DaemonMessage::Error { + message: format!("Session not found: {}", session), + }; + tx.send(msg).await.context("Failed to send error message")?; + return Ok(()); + } + }; + + let session_guard = session_arc.lock().await; + let info = session_guard.get_info(); + let msg = DaemonMessage::SessionInfo { session: info }; + tx.send(msg).await.context("Failed to send session info")?; + } + } + + Ok(()) +} + +/// Send a message to the client. +async fn send_message( + writer: &mut W, + msg: &DaemonMessage, +) -> Result<()> { + // Serialize the message to JSON + let json = serde_json::to_string(msg).context("Failed to serialize message")?; + + // Write the message followed by a newline + writer.write_all(json.as_bytes()).await.context("Failed to write message")?; + writer.write_all(b"\n").await.context("Failed to write newline")?; + writer.flush().await.context("Failed to flush writer")?; + + Ok(()) +}