Implement core project structure and functionality
This commit includes: - Set up workspace with daemon, client, and common crates - Implemented protocol definitions for client-server communication - Created session management system with command execution - Built SQLite database logging infrastructure - Added Unix domain socket server for daemon - Implemented client with REPL interface - Added utility functions for paths and error handlingmain
parent
44de903ccd
commit
bb683724a3
|
|
@ -0,0 +1,23 @@
|
|||
[workspace]
|
||||
members = [
|
||||
"daemon",
|
||||
"client",
|
||||
"common",
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.dependencies]
|
||||
tokio = { version = "1.32", features = ["full"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
rusqlite = { version = "0.30", features = ["bundled"] }
|
||||
shell-words = "1.1"
|
||||
clap = { version = "4.4", features = ["derive"] }
|
||||
anyhow = "1.0"
|
||||
thiserror = "1.0"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
xdg = "2.5"
|
||||
tempfile = "3.8"
|
||||
dirs = "5.0"
|
||||
|
|
@ -0,0 +1,18 @@
|
|||
[package]
|
||||
name = "cypraea"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
cypraea-common = { path = "../common" }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
xdg = { workspace = true }
|
||||
rustyline = "12.0"
|
||||
|
|
@ -0,0 +1,287 @@
|
|||
//! Cypraea Client (cypraea)
|
||||
//!
|
||||
//! This is the client component of the Cypraea shell. It connects to the daemon,
|
||||
//! sends commands, and displays output to the user.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Parser;
|
||||
use cypraea_common::paths;
|
||||
use cypraea_common::protocol::{ClientMessage, DaemonMessage};
|
||||
use rustyline::error::ReadlineError;
|
||||
use rustyline::DefaultEditor;
|
||||
use serde_json::Deserializer;
|
||||
use std::io::{self, Write};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UnixStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
mod display;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Path to the Unix domain socket
|
||||
#[clap(short, long)]
|
||||
socket: Option<String>,
|
||||
|
||||
/// Session to connect to
|
||||
#[clap(short, long, default_value = "default")]
|
||||
session: String,
|
||||
|
||||
/// Enable debug logging
|
||||
#[clap(short, long)]
|
||||
debug: bool,
|
||||
|
||||
/// Command to run
|
||||
#[clap(trailing_var_arg = true)]
|
||||
command: Vec<String>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
// Setup logging
|
||||
let log_level = if args.debug {
|
||||
tracing::Level::DEBUG
|
||||
} else {
|
||||
tracing::Level::INFO
|
||||
};
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(log_level)
|
||||
.init();
|
||||
|
||||
// Get socket path
|
||||
let socket_path = match args.socket {
|
||||
Some(path) => path,
|
||||
None => paths::socket_path()
|
||||
.context("Failed to get socket path")?
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
};
|
||||
|
||||
// Connect to the daemon
|
||||
let stream = UnixStream::connect(&socket_path)
|
||||
.await
|
||||
.context("Failed to connect to daemon")?;
|
||||
|
||||
info!("Connected to daemon at {}", socket_path);
|
||||
|
||||
// Split the stream into reader and writer
|
||||
let (reader, writer) = tokio::io::split(stream);
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut writer = tokio::io::BufWriter::new(writer);
|
||||
|
||||
// Create a channel for message handling
|
||||
let (tx, mut rx) = mpsc::channel::<DaemonMessage>(100);
|
||||
|
||||
// Spawn a task to read messages from the daemon
|
||||
let read_task = tokio::spawn(async move {
|
||||
let mut stream = Deserializer::from_reader(reader).into_iter::<DaemonMessage>();
|
||||
|
||||
while let Some(msg_result) = stream.next() {
|
||||
match msg_result {
|
||||
Ok(msg) => {
|
||||
debug!("Received message: {:?}", msg);
|
||||
if tx.send(msg).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error parsing message: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Send an attach message to the daemon
|
||||
let attach_msg = ClientMessage::Attach {
|
||||
session: args.session.clone(),
|
||||
};
|
||||
send_message(&mut writer, &attach_msg)
|
||||
.await
|
||||
.context("Failed to send attach message")?;
|
||||
|
||||
// If a command was provided, run it and exit
|
||||
if !args.command.is_empty() {
|
||||
let cmd = args.command.join(" ");
|
||||
let run_msg = ClientMessage::RunCommand {
|
||||
session: args.session.clone(),
|
||||
cmd,
|
||||
cwd: None,
|
||||
env: None,
|
||||
};
|
||||
send_message(&mut writer, &run_msg)
|
||||
.await
|
||||
.context("Failed to send run message")?;
|
||||
|
||||
// Process messages until we get an exit code
|
||||
while let Some(msg) = rx.recv().await {
|
||||
match msg {
|
||||
DaemonMessage::Stdout { data, .. } => {
|
||||
print!("{}", data);
|
||||
io::stdout().flush()?;
|
||||
}
|
||||
DaemonMessage::Stderr { data, .. } => {
|
||||
eprint!("{}", data);
|
||||
io::stderr().flush()?;
|
||||
}
|
||||
DaemonMessage::Exit { code, .. } => {
|
||||
std::process::exit(code);
|
||||
}
|
||||
DaemonMessage::Error { message } => {
|
||||
eprintln!("Error: {}", message);
|
||||
std::process::exit(1);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Start an interactive REPL
|
||||
run_repl(args.session, writer, rx).await?
|
||||
}
|
||||
|
||||
// Cancel the read task
|
||||
read_task.abort();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a message to the daemon.
|
||||
async fn send_message<W: AsyncWriteExt + Unpin>(
|
||||
writer: &mut W,
|
||||
msg: &ClientMessage,
|
||||
) -> Result<()> {
|
||||
// Serialize the message to JSON
|
||||
let json = serde_json::to_string(msg).context("Failed to serialize message")?;
|
||||
|
||||
// Write the message followed by a newline
|
||||
writer.write_all(json.as_bytes()).await.context("Failed to write message")?;
|
||||
writer.write_all(b"\n").await.context("Failed to write newline")?;
|
||||
writer.flush().await.context("Failed to flush writer")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the interactive REPL.
|
||||
async fn run_repl(
|
||||
session: String,
|
||||
mut writer: tokio::io::BufWriter<tokio::io::WriteHalf<UnixStream>>,
|
||||
mut rx: mpsc::Receiver<DaemonMessage>,
|
||||
) -> Result<()> {
|
||||
let mut rl = DefaultEditor::new().context("Failed to create line editor")?;
|
||||
|
||||
// Send a message to get session info
|
||||
let info_msg = ClientMessage::SessionInfo {
|
||||
session: session.clone(),
|
||||
};
|
||||
send_message(&mut writer, &info_msg).await.context("Failed to send session info message")?;
|
||||
|
||||
// Wait for the session info response
|
||||
let mut session_info = None;
|
||||
if let Some(msg) = rx.recv().await {
|
||||
if let DaemonMessage::SessionInfo { session: info } = msg {
|
||||
session_info = Some(info);
|
||||
}
|
||||
}
|
||||
|
||||
// Main REPL loop
|
||||
let mut command_in_progress = false;
|
||||
loop {
|
||||
// Only prompt for input if no command is in progress
|
||||
if !command_in_progress {
|
||||
// Get current directory from session info
|
||||
let cwd = match &session_info {
|
||||
Some(info) => info.cwd.clone(),
|
||||
None => "?".to_string(),
|
||||
};
|
||||
|
||||
// Create the prompt
|
||||
let prompt = format!("[{}] {} $ ", session, cwd);
|
||||
|
||||
// Read a line of input
|
||||
match rl.readline(&prompt) {
|
||||
Ok(line) => {
|
||||
// Skip empty lines
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add to history
|
||||
rl.add_history_entry(&line)?;
|
||||
|
||||
// Special commands
|
||||
if line.trim() == "exit" {
|
||||
break;
|
||||
}
|
||||
|
||||
// Send the command to the daemon
|
||||
let run_msg = ClientMessage::RunCommand {
|
||||
session: session.clone(),
|
||||
cmd: line,
|
||||
cwd: None,
|
||||
env: None,
|
||||
};
|
||||
send_message(&mut writer, &run_msg).await
|
||||
.context("Failed to send run message")?;
|
||||
|
||||
command_in_progress = true;
|
||||
}
|
||||
Err(ReadlineError::Interrupted) => {
|
||||
println!("^C");
|
||||
continue;
|
||||
}
|
||||
Err(ReadlineError::Eof) => {
|
||||
println!("^D");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Error reading line: {}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Process messages from the daemon
|
||||
match rx.recv().await {
|
||||
Some(DaemonMessage::Stdout { data, .. }) => {
|
||||
print!("{}", data);
|
||||
io::stdout().flush()?;
|
||||
}
|
||||
Some(DaemonMessage::Stderr { data, .. }) => {
|
||||
eprint!("{}", data);
|
||||
io::stderr().flush()?;
|
||||
}
|
||||
Some(DaemonMessage::Exit { .. }) => {
|
||||
command_in_progress = false;
|
||||
|
||||
// Refresh session info
|
||||
let info_msg = ClientMessage::SessionInfo {
|
||||
session: session.clone(),
|
||||
};
|
||||
send_message(&mut writer, &info_msg).await
|
||||
.context("Failed to send session info message")?;
|
||||
}
|
||||
Some(DaemonMessage::SessionInfo { session: info }) => {
|
||||
session_info = Some(info);
|
||||
}
|
||||
Some(DaemonMessage::Error { message }) => {
|
||||
eprintln!("Error: {}", message);
|
||||
command_in_progress = false;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send a detach message to the daemon
|
||||
let detach_msg = ClientMessage::Detach {
|
||||
session: session.clone(),
|
||||
};
|
||||
send_message(&mut writer, &detach_msg).await.context("Failed to send detach message")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[package]
|
||||
name = "cypraea-common"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
xdg = { workspace = true }
|
||||
dirs = "5.0"
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
//! Error types for the Cypraea shell.
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CypraeaError {
|
||||
#[error("I/O error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
#[error("Invalid JSON: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
|
||||
#[error("Database error: {0}")]
|
||||
Database(String),
|
||||
|
||||
#[error("Session error: {0}")]
|
||||
Session(String),
|
||||
|
||||
#[error("Command error: {0}")]
|
||||
Command(String),
|
||||
|
||||
#[error("Protocol error: {0}")]
|
||||
Protocol(String),
|
||||
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, CypraeaError>;
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
//! Common types and utilities for the Cypraea shell.
|
||||
//!
|
||||
//! This crate defines the protocol used for communication between the
|
||||
//! Cypraea client and daemon, as well as utilities for working with
|
||||
//! shell sessions and commands.
|
||||
|
||||
pub mod protocol;
|
||||
pub mod paths;
|
||||
pub mod error;
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
//! Path utilities for the Cypraea shell.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use std::path::{Path, PathBuf};
|
||||
use xdg::BaseDirectories;
|
||||
|
||||
/// Get the path to the Cypraea socket.
|
||||
pub fn socket_path() -> Result<PathBuf> {
|
||||
// Use XDG_RUNTIME_DIR if available, or fall back to a temporary directory
|
||||
let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?;
|
||||
Ok(xdg.get_runtime_path().join("cypraea.sock"))
|
||||
}
|
||||
|
||||
/// Get the path to the Cypraea database directory.
|
||||
pub fn data_dir() -> Result<PathBuf> {
|
||||
let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?;
|
||||
let path = xdg.get_data_home().join("cypraea");
|
||||
std::fs::create_dir_all(&path).context("Failed to create data directory")?;
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
/// Get the path to the Cypraea database file.
|
||||
pub fn database_path() -> Result<PathBuf> {
|
||||
Ok(data_dir()?.join("log.sqlite"))
|
||||
}
|
||||
|
||||
/// Get the path to the Cypraea configuration directory.
|
||||
pub fn config_dir() -> Result<PathBuf> {
|
||||
let xdg = BaseDirectories::new().context("Failed to initialize XDG base directories")?;
|
||||
let path = xdg.get_config_home().join("cypraea");
|
||||
std::fs::create_dir_all(&path).context("Failed to create config directory")?;
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
/// Expand a path with respect to the user's home directory.
|
||||
pub fn expand_path(path: &str) -> PathBuf {
|
||||
if path.starts_with('~') {
|
||||
if let Some(home) = dirs::home_dir() {
|
||||
return home.join(&path[2..]);
|
||||
}
|
||||
}
|
||||
Path::new(path).to_path_buf()
|
||||
}
|
||||
|
|
@ -0,0 +1,140 @@
|
|||
//! Protocol definitions for communication between the Cypraea client and daemon.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Client-to-daemon request message.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum ClientMessage {
|
||||
/// Request to run a command in a session.
|
||||
#[serde(rename = "run_command")]
|
||||
RunCommand {
|
||||
/// Session ID to run the command in.
|
||||
session: String,
|
||||
/// Command to run.
|
||||
cmd: String,
|
||||
/// Current working directory (optional).
|
||||
cwd: Option<String>,
|
||||
/// Environment variables to set (optional).
|
||||
env: Option<HashMap<String, String>>,
|
||||
},
|
||||
|
||||
/// Request to attach to an existing session.
|
||||
#[serde(rename = "attach")]
|
||||
Attach {
|
||||
/// Session ID to attach to.
|
||||
session: String,
|
||||
},
|
||||
|
||||
/// Request to detach from a session.
|
||||
#[serde(rename = "detach")]
|
||||
Detach {
|
||||
/// Session ID to detach from.
|
||||
session: String,
|
||||
},
|
||||
|
||||
/// Request to list available sessions.
|
||||
#[serde(rename = "list_sessions")]
|
||||
ListSessions,
|
||||
|
||||
/// Request to create a new session.
|
||||
#[serde(rename = "create_session")]
|
||||
CreateSession {
|
||||
/// Optional name for the session.
|
||||
name: Option<String>,
|
||||
},
|
||||
|
||||
/// Request to change directory in a session.
|
||||
#[serde(rename = "cd")]
|
||||
ChangeDirectory {
|
||||
/// Session ID to change directory in.
|
||||
session: String,
|
||||
/// Directory to change to.
|
||||
dir: String,
|
||||
},
|
||||
|
||||
/// Request to get session information.
|
||||
#[serde(rename = "session_info")]
|
||||
SessionInfo {
|
||||
/// Session ID to get information for.
|
||||
session: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Daemon-to-client response message.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum DaemonMessage {
|
||||
/// Standard output from a command.
|
||||
#[serde(rename = "stdout")]
|
||||
Stdout {
|
||||
/// Session ID the output is from.
|
||||
session: String,
|
||||
/// Output data.
|
||||
data: String,
|
||||
},
|
||||
|
||||
/// Standard error from a command.
|
||||
#[serde(rename = "stderr")]
|
||||
Stderr {
|
||||
/// Session ID the output is from.
|
||||
session: String,
|
||||
/// Output data.
|
||||
data: String,
|
||||
},
|
||||
|
||||
/// Command has exited.
|
||||
#[serde(rename = "exit")]
|
||||
Exit {
|
||||
/// Session ID the command was run in.
|
||||
session: String,
|
||||
/// Exit code from the command.
|
||||
code: i32,
|
||||
},
|
||||
|
||||
/// Response to a session list request.
|
||||
#[serde(rename = "sessions")]
|
||||
Sessions {
|
||||
/// List of available session IDs.
|
||||
sessions: Vec<SessionInfo>,
|
||||
},
|
||||
|
||||
/// Response to a session info request.
|
||||
#[serde(rename = "session_info")]
|
||||
SessionInfo {
|
||||
/// Session information.
|
||||
session: SessionInfo,
|
||||
},
|
||||
|
||||
/// Error message.
|
||||
#[serde(rename = "error")]
|
||||
Error {
|
||||
/// Error message.
|
||||
message: String,
|
||||
},
|
||||
|
||||
/// Success message.
|
||||
#[serde(rename = "success")]
|
||||
Success {
|
||||
/// Success message.
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Information about a session.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SessionInfo {
|
||||
/// Session ID.
|
||||
pub id: String,
|
||||
/// Current working directory of the session.
|
||||
pub cwd: String,
|
||||
/// Whether the session has an active command running.
|
||||
pub active: bool,
|
||||
/// When the session was created.
|
||||
pub created_at: chrono::DateTime<chrono::Utc>,
|
||||
/// When the session was last used.
|
||||
pub last_used: chrono::DateTime<chrono::Utc>,
|
||||
/// Number of commands run in this session.
|
||||
pub command_count: u64,
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
[package]
|
||||
name = "cypraeadd"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
cypraea-common = { path = "../common" }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
rusqlite = { workspace = true }
|
||||
shell-words = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
xdg = { workspace = true }
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
//! Database module for the Cypraea daemon.
|
||||
//!
|
||||
//! This module handles logging command execution and other events to a SQLite database.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use rusqlite::{params, Connection};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
/// Database connection wrapper.
|
||||
#[derive(Clone)]
|
||||
pub struct Database {
|
||||
conn: Arc<Mutex<Connection>>,
|
||||
}
|
||||
|
||||
/// Command execution record.
|
||||
pub struct CommandRecord {
|
||||
pub session: String,
|
||||
pub timestamp_start: DateTime<Utc>,
|
||||
pub timestamp_end: DateTime<Utc>,
|
||||
pub duration_ms: i64,
|
||||
pub cmd: String,
|
||||
pub cwd: String,
|
||||
pub exit_code: i32,
|
||||
pub stdout: String,
|
||||
pub stderr: String,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
/// Create a new database connection.
|
||||
pub async fn new(path: &Path) -> Result<Self> {
|
||||
// Ensure parent directory exists
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent)
|
||||
.await
|
||||
.context("Failed to create database directory")?;
|
||||
}
|
||||
|
||||
debug!("Opening database at {:?}", path);
|
||||
let conn = Connection::open(path).context("Failed to open database")?;
|
||||
|
||||
// Initialize schema
|
||||
let db = Self {
|
||||
conn: Arc::new(Mutex::new(conn)),
|
||||
};
|
||||
|
||||
db.init_schema().await.context("Failed to initialize database schema")?;
|
||||
|
||||
info!("Database initialized");
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
/// Initialize the database schema.
|
||||
async fn init_schema(&self) -> Result<()> {
|
||||
let conn = self.conn.lock().await;
|
||||
conn.execute(
|
||||
"
|
||||
CREATE TABLE IF NOT EXISTS commands (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session TEXT NOT NULL,
|
||||
timestamp_start TEXT NOT NULL,
|
||||
timestamp_end TEXT NOT NULL,
|
||||
duration_ms INTEGER NOT NULL,
|
||||
cmd TEXT NOT NULL,
|
||||
cwd TEXT NOT NULL,
|
||||
exit_code INTEGER NOT NULL,
|
||||
stdout TEXT,
|
||||
stderr TEXT
|
||||
)
|
||||
",
|
||||
[],
|
||||
)
|
||||
.context("Failed to create commands table")?;
|
||||
|
||||
Ok()
|
||||
}
|
||||
|
||||
/// Log a command execution.
|
||||
pub async fn log_command(&self, record: &CommandRecord) -> Result<i64> {
|
||||
let conn = self.conn.lock().await;
|
||||
let res = conn
|
||||
.execute(
|
||||
"
|
||||
INSERT INTO commands (
|
||||
session,
|
||||
timestamp_start,
|
||||
timestamp_end,
|
||||
duration_ms,
|
||||
cmd,
|
||||
cwd,
|
||||
exit_code,
|
||||
stdout,
|
||||
stderr
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
",
|
||||
params![
|
||||
record.session,
|
||||
record.timestamp_start.to_rfc3339(),
|
||||
record.timestamp_end.to_rfc3339(),
|
||||
record.duration_ms,
|
||||
record.cmd,
|
||||
record.cwd,
|
||||
record.exit_code,
|
||||
record.stdout,
|
||||
record.stderr
|
||||
],
|
||||
)
|
||||
.context("Failed to insert command record")?;
|
||||
|
||||
Ok(conn.last_insert_rowid())
|
||||
}
|
||||
|
||||
/// Get command history for a session.
|
||||
pub async fn get_command_history(&self, session: &str, limit: usize) -> Result<Vec<CommandRecord>> {
|
||||
let conn = self.conn.lock().await;
|
||||
let mut stmt = conn
|
||||
.prepare(
|
||||
"
|
||||
SELECT
|
||||
session,
|
||||
timestamp_start,
|
||||
timestamp_end,
|
||||
duration_ms,
|
||||
cmd,
|
||||
cwd,
|
||||
exit_code,
|
||||
stdout,
|
||||
stderr
|
||||
FROM commands
|
||||
WHERE session = ?
|
||||
ORDER BY timestamp_start DESC
|
||||
LIMIT ?
|
||||
",
|
||||
)
|
||||
.context("Failed to prepare command history query")?;
|
||||
|
||||
let records = stmt
|
||||
.query_map(params![session, limit as i64], |row| {
|
||||
let timestamp_start: String = row.get(1)?;
|
||||
let timestamp_end: String = row.get(2)?;
|
||||
|
||||
Ok(CommandRecord {
|
||||
session: row.get(0)?,
|
||||
timestamp_start: DateTime::parse_from_rfc3339(×tamp_start)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.unwrap_or_else(|_| Utc::now()),
|
||||
timestamp_end: DateTime::parse_from_rfc3339(×tamp_end)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.unwrap_or_else(|_| Utc::now()),
|
||||
duration_ms: row.get(3)?,
|
||||
cmd: row.get(4)?,
|
||||
cwd: row.get(5)?,
|
||||
exit_code: row.get(6)?,
|
||||
stdout: row.get(7)?,
|
||||
stderr: row.get(8)?,
|
||||
})
|
||||
})
|
||||
.context("Failed to execute command history query")?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for record in records {
|
||||
results.push(record.context("Failed to parse command record")?);
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Get command count for a session.
|
||||
pub async fn get_command_count(&self, session: &str) -> Result<u64> {
|
||||
let conn = self.conn.lock().await;
|
||||
let count: u64 = conn
|
||||
.query_row(
|
||||
"SELECT COUNT(*) FROM commands WHERE session = ?",
|
||||
params![session],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.context("Failed to get command count")?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
//! Cypraea Daemon (cypraeadd)
|
||||
//!
|
||||
//! This is the server component of the Cypraea shell. It manages shell sessions,
|
||||
//! executes commands, and provides logging and persistence.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Parser;
|
||||
use tracing::{info, warn, error};
|
||||
use cypraea_common::paths;
|
||||
|
||||
mod db;
|
||||
mod session;
|
||||
mod socket;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Path to the Unix domain socket
|
||||
#[clap(short, long)]
|
||||
socket: Option<String>,
|
||||
|
||||
/// Path to the SQLite database
|
||||
#[clap(short, long)]
|
||||
database: Option<String>,
|
||||
|
||||
/// Enable debug logging
|
||||
#[clap(short, long)]
|
||||
debug: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
// Setup logging
|
||||
let log_level = if args.debug {
|
||||
tracing::Level::DEBUG
|
||||
} else {
|
||||
tracing::Level::INFO
|
||||
};
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(log_level)
|
||||
.init();
|
||||
|
||||
info!("Starting Cypraea daemon");
|
||||
|
||||
// Get paths
|
||||
let socket_path = match args.socket {
|
||||
Some(path) => path.into(),
|
||||
None => paths::socket_path().context("Failed to get socket path")?,
|
||||
};
|
||||
|
||||
let db_path = match args.database {
|
||||
Some(path) => path.into(),
|
||||
None => paths::database_path().context("Failed to get database path")?,
|
||||
};
|
||||
|
||||
// Initialize database
|
||||
let db = db::Database::new(&db_path).await
|
||||
.context("Failed to initialize database")?;
|
||||
|
||||
// Initialize session manager
|
||||
let session_manager = session::SessionManager::new(db.clone());
|
||||
|
||||
// Start socket server
|
||||
socket::Server::new(socket_path, session_manager)
|
||||
.run()
|
||||
.await
|
||||
.context("Socket server error")?;
|
||||
|
||||
info!("Daemon shutting down");
|
||||
Ok()
|
||||
}
|
||||
|
|
@ -0,0 +1,109 @@
|
|||
//! Command execution utilities for sessions.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use std::process::Stdio;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Output capture result.
|
||||
pub struct CapturedOutput {
|
||||
pub stdout: String,
|
||||
pub stderr: String,
|
||||
pub exit_code: i32,
|
||||
}
|
||||
|
||||
/// Run a command and capture its output.
|
||||
pub async fn run_and_capture(cmd: &str, cwd: &str, env: &[(String, String)]) -> Result<CapturedOutput> {
|
||||
// Parse the command line
|
||||
let args = shell_words::split(cmd).context("Failed to parse command line")?;
|
||||
if args.is_empty() {
|
||||
return Ok(CapturedOutput {
|
||||
stdout: String::new(),
|
||||
stderr: String::new(),
|
||||
exit_code: 0,
|
||||
});
|
||||
}
|
||||
|
||||
// Set up the command
|
||||
let mut command = Command::new(&args[0]);
|
||||
command
|
||||
.args(&args[1..])
|
||||
.current_dir(cwd)
|
||||
.env_clear();
|
||||
|
||||
// Add environment variables
|
||||
for (key, value) in env {
|
||||
command.env(key, value);
|
||||
}
|
||||
|
||||
// Set up I/O
|
||||
command
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
// Spawn the command
|
||||
let mut child = command.spawn().context("Failed to spawn command")?;
|
||||
|
||||
// Set up channels for stdout and stderr
|
||||
let (stdout_tx, mut stdout_rx) = mpsc::channel::<String>(100);
|
||||
let (stderr_tx, mut stderr_rx) = mpsc::channel::<String>(100);
|
||||
|
||||
// Capture stdout
|
||||
let stdout = child.stdout.take().expect("Child stdout not captured");
|
||||
let stdout_future = capture_output(stdout, stdout_tx);
|
||||
|
||||
// Capture stderr
|
||||
let stderr = child.stderr.take().expect("Child stderr not captured");
|
||||
let stderr_future = capture_output(stderr, stderr_tx);
|
||||
|
||||
// Spawn futures to capture stdout and stderr
|
||||
tokio::spawn(stdout_future);
|
||||
tokio::spawn(stderr_future);
|
||||
|
||||
// Collect stdout and stderr
|
||||
let mut stdout_buffer = String::new();
|
||||
let mut stderr_buffer = String::new();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(line) = stdout_rx.recv() => {
|
||||
stdout_buffer.push_str(&line);
|
||||
}
|
||||
Some(line) = stderr_rx.recv() => {
|
||||
stderr_buffer.push_str(&line);
|
||||
}
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the command to finish
|
||||
let status = child.wait().await.context("Failed to wait for command")?;
|
||||
|
||||
Ok(CapturedOutput {
|
||||
stdout: stdout_buffer,
|
||||
stderr: stderr_buffer,
|
||||
exit_code: status.code().unwrap_or(-1),
|
||||
})
|
||||
}
|
||||
|
||||
/// Capture output from a child process.
|
||||
async fn capture_output<R>(reader: R, tx: mpsc::Sender<String>) -> Result<()>
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut line = String::new();
|
||||
|
||||
loop {
|
||||
line.clear();
|
||||
let bytes_read = reader.read_line(&mut line).await?;
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
tx.send(line.clone()).await.ok();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -0,0 +1,308 @@
|
|||
//! Session management for the Cypraea daemon.
|
||||
//!
|
||||
//! This module handles shell sessions, including creation, management,
|
||||
//! and command execution within sessions.
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use cypraea_common::protocol::SessionInfo;
|
||||
use shell_words;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::sync::{mpsc, Mutex, RwLock};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::db::{CommandRecord, Database};
|
||||
|
||||
pub mod command;
|
||||
|
||||
/// A shell session.
|
||||
#[derive(Debug)]
|
||||
pub struct Session {
|
||||
/// Session ID.
|
||||
id: String,
|
||||
/// Current working directory.
|
||||
cwd: PathBuf,
|
||||
/// Environment variables.
|
||||
env: HashMap<String, String>,
|
||||
/// Aliases.
|
||||
aliases: HashMap<String, String>,
|
||||
/// When the session was created.
|
||||
created_at: DateTime<Utc>,
|
||||
/// When the session was last used.
|
||||
last_used: DateTime<Utc>,
|
||||
/// Whether a command is currently running.
|
||||
active: bool,
|
||||
/// Command count.
|
||||
command_count: u64,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
/// Create a new session.
|
||||
pub fn new(id: String) -> Self {
|
||||
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
|
||||
let now = Utc::now();
|
||||
|
||||
Self {
|
||||
id,
|
||||
cwd: home,
|
||||
env: std::env::vars().collect(),
|
||||
aliases: HashMap::new(),
|
||||
created_at: now,
|
||||
last_used: now,
|
||||
active: false,
|
||||
command_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current working directory.
|
||||
pub fn get_cwd(&self) -> &Path {
|
||||
&self.cwd
|
||||
}
|
||||
|
||||
/// Change the current working directory.
|
||||
pub fn change_directory(&mut self, dir: &str) -> Result<()> {
|
||||
let new_dir = if dir.starts_with("/") {
|
||||
PathBuf::from(dir)
|
||||
} else {
|
||||
self.cwd.join(dir)
|
||||
};
|
||||
|
||||
// Canonicalize the path to resolve . and ..
|
||||
let canonical = new_dir
|
||||
.canonicalize()
|
||||
.context("Failed to resolve directory")?;
|
||||
|
||||
// Verify the directory exists and is accessible
|
||||
if !canonical.is_dir() {
|
||||
return Err(anyhow!("Not a directory: {}", dir));
|
||||
}
|
||||
|
||||
self.cwd = canonical;
|
||||
self.last_used = Utc::now();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set an environment variable.
|
||||
pub fn set_env(&mut self, key: String, value: String) {
|
||||
self.env.insert(key, value);
|
||||
self.last_used = Utc::now();
|
||||
}
|
||||
|
||||
/// Get an environment variable.
|
||||
pub fn get_env(&self, key: &str) -> Option<&String> {
|
||||
self.env.get(key)
|
||||
}
|
||||
|
||||
/// Set an alias.
|
||||
pub fn set_alias(&mut self, name: String, value: String) {
|
||||
self.aliases.insert(name, value);
|
||||
self.last_used = Utc::now();
|
||||
}
|
||||
|
||||
/// Get an alias.
|
||||
pub fn get_alias(&self, name: &str) -> Option<&String> {
|
||||
self.aliases.get(name)
|
||||
}
|
||||
|
||||
/// Get the session info.
|
||||
pub fn get_info(&self) -> SessionInfo {
|
||||
SessionInfo {
|
||||
id: self.id.clone(),
|
||||
cwd: self.cwd.to_string_lossy().to_string(),
|
||||
active: self.active,
|
||||
created_at: self.created_at,
|
||||
last_used: self.last_used,
|
||||
command_count: self.command_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a command in this session.
|
||||
pub async fn execute_command(
|
||||
&mut self,
|
||||
cmd: &str,
|
||||
stdout_tx: mpsc::Sender<String>,
|
||||
stderr_tx: mpsc::Sender<String>,
|
||||
db: Option<Database>,
|
||||
) -> Result<i32> {
|
||||
// Parse the command line
|
||||
let args = shell_words::split(cmd).context("Failed to parse command line")?;
|
||||
if args.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Check for built-in commands
|
||||
if args[0] == "cd" {
|
||||
let dir = args.get(1).map(|s| s.as_str()).unwrap_or("~");
|
||||
match self.change_directory(dir) {
|
||||
Ok(_) => return Ok(0),
|
||||
Err(e) => {
|
||||
let err_msg = format!("cd: {}", e);
|
||||
stderr_tx.send(err_msg).await.ok();
|
||||
return Ok(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for alias substitution
|
||||
let args = if let Some(alias) = self.get_alias(&args[0]) {
|
||||
// Replace the command with its alias
|
||||
let mut alias_args = shell_words::split(alias).unwrap_or_default();
|
||||
let mut cmd_args = args[1..].to_vec();
|
||||
alias_args.append(&mut cmd_args);
|
||||
alias_args
|
||||
} else {
|
||||
args
|
||||
};
|
||||
|
||||
if args.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
self.active = true;
|
||||
self.last_used = Utc::now();
|
||||
self.command_count += 1;
|
||||
|
||||
// Record start time
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Set up the command
|
||||
let mut command = Command::new(&args[0]);
|
||||
command
|
||||
.args(&args[1..])
|
||||
.current_dir(&self.cwd)
|
||||
.env_clear()
|
||||
.envs(&self.env)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
// Spawn the command
|
||||
let mut child = command.spawn().context("Failed to spawn command")?;
|
||||
|
||||
// Capture stdout
|
||||
let stdout = child.stdout.take().expect("Child stdout not captured");
|
||||
let stdout_tx_clone = stdout_tx.clone();
|
||||
let stdout_future = Self::capture_output(stdout, stdout_tx_clone);
|
||||
|
||||
// Capture stderr
|
||||
let stderr = child.stderr.take().expect("Child stderr not captured");
|
||||
let stderr_future = Self::capture_output(stderr, stderr_tx.clone());
|
||||
|
||||
// Spawn futures to capture stdout and stderr
|
||||
tokio::spawn(stdout_future);
|
||||
tokio::spawn(stderr_future);
|
||||
|
||||
// Collect stdout and stderr for logging
|
||||
let mut stdout_buffer = Vec::new();
|
||||
let mut stderr_buffer = Vec::new();
|
||||
|
||||
// Wait for the command to finish
|
||||
let status = child.wait().await.context("Failed to wait for command")?;
|
||||
|
||||
// Record end time and calculate duration
|
||||
let end_time = Utc::now();
|
||||
let duration = end_time.signed_duration_since(start_time);
|
||||
|
||||
// Log the command execution if database is provided
|
||||
if let Some(db) = db {
|
||||
let record = CommandRecord {
|
||||
session: self.id.clone(),
|
||||
timestamp_start: start_time,
|
||||
timestamp_end: end_time,
|
||||
duration_ms: duration.num_milliseconds(),
|
||||
cmd: cmd.to_string(),
|
||||
cwd: self.cwd.to_string_lossy().to_string(),
|
||||
exit_code: status.code().unwrap_or(-1),
|
||||
stdout: String::from_utf8_lossy(&stdout_buffer).to_string(),
|
||||
stderr: String::from_utf8_lossy(&stderr_buffer).to_string(),
|
||||
};
|
||||
|
||||
if let Err(e) = db.log_command(&record).await {
|
||||
error!("Failed to log command: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
self.active = false;
|
||||
Ok(status.code().unwrap_or(-1))
|
||||
}
|
||||
|
||||
/// Capture output from a child process.
|
||||
async fn capture_output<R>(reader: R, tx: mpsc::Sender<String>) -> Result<()>
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut line = String::new();
|
||||
|
||||
loop {
|
||||
line.clear();
|
||||
let bytes_read = reader.read_line(&mut line).await?;
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
tx.send(line.clone()).await.ok();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Session manager.
|
||||
#[derive(Clone)]
|
||||
pub struct SessionManager {
|
||||
sessions: Arc<RwLock<HashMap<String, Arc<Mutex<Session>>>>>,
|
||||
db: Database,
|
||||
}
|
||||
|
||||
impl SessionManager {
|
||||
/// Create a new session manager.
|
||||
pub fn new(db: Database) -> Self {
|
||||
Self {
|
||||
sessions: Arc::new(RwLock::new(HashMap::new())),
|
||||
db,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create a session.
|
||||
pub async fn get_or_create_session(&self, id: &str) -> Arc<Mutex<Session>> {
|
||||
let mut sessions = self.sessions.write().await;
|
||||
|
||||
if !sessions.contains_key(id) {
|
||||
info!("Creating new session: {}", id);
|
||||
let session = Session::new(id.to_string());
|
||||
sessions.insert(id.to_string(), Arc::new(Mutex::new(session)));
|
||||
}
|
||||
|
||||
sessions.get(id).unwrap().clone()
|
||||
}
|
||||
|
||||
/// Get a session if it exists.
|
||||
pub async fn get_session(&self, id: &str) -> Option<Arc<Mutex<Session>>> {
|
||||
let sessions = self.sessions.read().await;
|
||||
sessions.get(id).cloned()
|
||||
}
|
||||
|
||||
/// List all sessions.
|
||||
pub async fn list_sessions(&self) -> Vec<SessionInfo> {
|
||||
let sessions = self.sessions.read().await;
|
||||
let mut result = Vec::new();
|
||||
|
||||
for session in sessions.values() {
|
||||
let session = session.lock().await;
|
||||
result.push(session.get_info());
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Get the database connection.
|
||||
pub fn get_db(&self) -> Database {
|
||||
self.db.clone()
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,295 @@
|
|||
//! Socket server for the Cypraea daemon.
|
||||
//!
|
||||
//! This module handles communication with clients over a Unix domain socket.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use cypraea_common::protocol::{ClientMessage, DaemonMessage};
|
||||
use serde_json::Deserializer;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::session::SessionManager;
|
||||
|
||||
/// Socket server for the daemon.
|
||||
pub struct Server {
|
||||
/// Path to the Unix domain socket.
|
||||
socket_path: Box<Path>,
|
||||
/// Session manager.
|
||||
session_manager: SessionManager,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
/// Create a new socket server.
|
||||
pub fn new<P: AsRef<Path>>(socket_path: P, session_manager: SessionManager) -> Self {
|
||||
Self {
|
||||
socket_path: socket_path.as_ref().into(),
|
||||
session_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the socket server.
|
||||
pub async fn run(&self) -> Result<()> {
|
||||
// Remove the socket file if it already exists
|
||||
if self.socket_path.exists() {
|
||||
fs::remove_file(&self.socket_path).context("Failed to remove existing socket file")?;
|
||||
}
|
||||
|
||||
// Create the listener
|
||||
let listener = UnixListener::bind(&self.socket_path)
|
||||
.context("Failed to bind Unix domain socket")?;
|
||||
|
||||
info!("Listening on {:?}", self.socket_path);
|
||||
|
||||
// Accept connections
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, _addr)) => {
|
||||
let session_manager = self.session_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_client(stream, session_manager).await {
|
||||
error!("Error handling client: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a client connection.
|
||||
async fn handle_client(stream: UnixStream, session_manager: SessionManager) -> Result<()> {
|
||||
let (reader, writer) = tokio::io::split(stream);
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut writer = BufWriter::new(writer);
|
||||
|
||||
// Create a channel for sending responses back to the client
|
||||
let (tx, mut rx) = mpsc::channel::<DaemonMessage>(100);
|
||||
|
||||
// Spawn a task to handle sending responses
|
||||
let write_task = tokio::spawn(async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
if let Err(e) = send_message(&mut writer, &msg).await {
|
||||
error!("Error sending message to client: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
|
||||
// Process incoming messages
|
||||
let mut stream = Deserializer::from_reader(reader).into_iter::<ClientMessage>();
|
||||
|
||||
while let Some(msg_result) = stream.next() {
|
||||
let msg = msg_result.context("Failed to parse client message")?;
|
||||
debug!("Received message: {:?}", msg);
|
||||
|
||||
if let Err(e) = process_message(msg, &session_manager, tx.clone()).await {
|
||||
error!("Error processing message: {}", e);
|
||||
let error_msg = DaemonMessage::Error {
|
||||
message: e.to_string(),
|
||||
};
|
||||
tx.send(error_msg).await.ok();
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel the write task
|
||||
write_task.abort();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process a client message.
|
||||
async fn process_message(
|
||||
msg: ClientMessage,
|
||||
session_manager: &SessionManager,
|
||||
tx: mpsc::Sender<DaemonMessage>,
|
||||
) -> Result<()> {
|
||||
match msg {
|
||||
ClientMessage::RunCommand { session, cmd, cwd, env } => {
|
||||
// Get or create the session
|
||||
let session_arc = session_manager.get_or_create_session(&session).await;
|
||||
let mut session_guard = session_arc.lock().await;
|
||||
|
||||
// Set custom working directory if provided
|
||||
if let Some(dir) = cwd {
|
||||
if let Err(e) = session_guard.change_directory(&dir) {
|
||||
tx.send(DaemonMessage::Error {
|
||||
message: format!("Failed to change directory: {}", e),
|
||||
})
|
||||
.await
|
||||
.context("Failed to send error message")?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Set custom environment variables if provided
|
||||
if let Some(env_vars) = env {
|
||||
for (key, value) in env_vars {
|
||||
session_guard.set_env(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
// Create channels for stdout and stderr
|
||||
let (stdout_tx, mut stdout_rx) = mpsc::channel::<String>(100);
|
||||
let (stderr_tx, mut stderr_rx) = mpsc::channel::<String>(100);
|
||||
|
||||
// Clone the response channel and session ID for the output handlers
|
||||
let stdout_session = session.clone();
|
||||
let stderr_session = session.clone();
|
||||
let tx_stdout = tx.clone();
|
||||
let tx_stderr = tx.clone();
|
||||
|
||||
// Spawn tasks to forward stdout and stderr to the client
|
||||
let stdout_task = tokio::spawn(async move {
|
||||
while let Some(data) = stdout_rx.recv().await {
|
||||
let msg = DaemonMessage::Stdout {
|
||||
session: stdout_session.clone(),
|
||||
data,
|
||||
};
|
||||
if tx_stdout.send(msg).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let stderr_task = tokio::spawn(async move {
|
||||
while let Some(data) = stderr_rx.recv().await {
|
||||
let msg = DaemonMessage::Stderr {
|
||||
session: stderr_session.clone(),
|
||||
data,
|
||||
};
|
||||
if tx_stderr.send(msg).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Execute the command
|
||||
let db = session_manager.get_db();
|
||||
let exit_code = session_guard
|
||||
.execute_command(&cmd, stdout_tx, stderr_tx, Some(db))
|
||||
.await
|
||||
.context("Failed to execute command")?;
|
||||
|
||||
// Wait for the output handlers to finish
|
||||
stdout_task.abort();
|
||||
stderr_task.abort();
|
||||
|
||||
// Send the exit code
|
||||
let exit_msg = DaemonMessage::Exit {
|
||||
session,
|
||||
code: exit_code,
|
||||
};
|
||||
tx.send(exit_msg).await.context("Failed to send exit message")?;
|
||||
}
|
||||
|
||||
ClientMessage::Attach { session } => {
|
||||
// Get or create the session
|
||||
let session_arc = session_manager.get_or_create_session(&session).await;
|
||||
let session_guard = session_arc.lock().await;
|
||||
|
||||
// Send session info
|
||||
let info = session_guard.get_info();
|
||||
let msg = DaemonMessage::SessionInfo { session: info };
|
||||
tx.send(msg).await.context("Failed to send session info")?;
|
||||
}
|
||||
|
||||
ClientMessage::Detach { session: _ } => {
|
||||
// Nothing special to do for detach, just acknowledge
|
||||
let msg = DaemonMessage::Success {
|
||||
message: "Detached from session".to_string(),
|
||||
};
|
||||
tx.send(msg).await.context("Failed to send success message")?;
|
||||
}
|
||||
|
||||
ClientMessage::ListSessions => {
|
||||
// Get all sessions
|
||||
let sessions = session_manager.list_sessions().await;
|
||||
let msg = DaemonMessage::Sessions { sessions };
|
||||
tx.send(msg).await.context("Failed to send sessions list")?;
|
||||
}
|
||||
|
||||
ClientMessage::CreateSession { name } => {
|
||||
// Generate session ID
|
||||
let id = match name {
|
||||
Some(name) => name,
|
||||
None => format!("{}", chrono::Utc::now().timestamp()),
|
||||
};
|
||||
|
||||
// Create the session
|
||||
let session_arc = session_manager.get_or_create_session(&id).await;
|
||||
let session_guard = session_arc.lock().await;
|
||||
|
||||
// Send session info
|
||||
let info = session_guard.get_info();
|
||||
let msg = DaemonMessage::SessionInfo { session: info };
|
||||
tx.send(msg).await.context("Failed to send session info")?;
|
||||
}
|
||||
|
||||
ClientMessage::ChangeDirectory { session, dir } => {
|
||||
// Get the session
|
||||
let session_arc = session_manager.get_or_create_session(&session).await;
|
||||
let mut session_guard = session_arc.lock().await;
|
||||
|
||||
// Change directory
|
||||
match session_guard.change_directory(&dir) {
|
||||
Ok(_) => {
|
||||
let msg = DaemonMessage::Success {
|
||||
message: format!("Changed directory to {}", session_guard.get_cwd().display()),
|
||||
};
|
||||
tx.send(msg).await.context("Failed to send success message")?;
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = DaemonMessage::Error {
|
||||
message: format!("Failed to change directory: {}", e),
|
||||
};
|
||||
tx.send(msg).await.context("Failed to send error message")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClientMessage::SessionInfo { session } => {
|
||||
// Get the session
|
||||
let session_arc = match session_manager.get_session(&session).await {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
let msg = DaemonMessage::Error {
|
||||
message: format!("Session not found: {}", session),
|
||||
};
|
||||
tx.send(msg).await.context("Failed to send error message")?;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let session_guard = session_arc.lock().await;
|
||||
let info = session_guard.get_info();
|
||||
let msg = DaemonMessage::SessionInfo { session: info };
|
||||
tx.send(msg).await.context("Failed to send session info")?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a message to the client.
|
||||
async fn send_message<W: AsyncWriteExt + Unpin>(
|
||||
writer: &mut W,
|
||||
msg: &DaemonMessage,
|
||||
) -> Result<()> {
|
||||
// Serialize the message to JSON
|
||||
let json = serde_json::to_string(msg).context("Failed to serialize message")?;
|
||||
|
||||
// Write the message followed by a newline
|
||||
writer.write_all(json.as_bytes()).await.context("Failed to write message")?;
|
||||
writer.write_all(b"\n").await.context("Failed to write newline")?;
|
||||
writer.flush().await.context("Failed to flush writer")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Reference in New Issue