diff --git a/CLAUDE.md b/CLAUDE.md index 58ead9cd..309cec56 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -56,6 +56,12 @@ mise run postgres:up --extra-args "--detach --wait" mise run postgres:setup # Install EQL and schema ``` +> **macOS Note:** If you hit file descriptor limits during development (e.g. "Too many open files"), you may need to increase the limit: +> ```bash +> ulimit -n 10240 +> ``` +> To make this persistent, add it to your shell profile (e.g. `~/.zshrc`). + ### Core Development Workflow ```bash # Build and run Proxy as a process (development) diff --git a/Cargo.lock b/Cargo.lock index bc7fe6f9..f21074bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2510,9 +2510,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "metrics" -version = "0.24.1" +version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a7deb012b3b2767169ff203fadb4c6b0b82b947512e5eb9e0b78c2e186ad9e3" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" dependencies = [ "ahash 0.8.11", "portable-atomic", diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 5733b1e7..7e9a17a4 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -321,6 +321,9 @@ mise run proxy:down Running Proxy in a container cross-compiles a binary for Linux and the current architecture (`amd64`, `arm64`), then copies the binary into the container. We cross-compile binary outside the container because it's generally faster, due to packages already being cached, and slower network and disk IO in Docker. +> [!IMPORTANT] +> **Proxy must always be built from source for testing.** The `proxy:up` task builds a binary from source (`build:binary`), packages it into a Docker image tagged `cipherstash/proxy:latest` (`build:docker`), then starts it via `docker compose up`. The `tests/docker-compose.yml` file uses `pull_policy: never` on the proxy services to ensure Docker never pulls the released image from Docker Hub. If you see an error like `pull access denied` or `image not found`, run `mise run build` first to build the local image. + ### Building Build a binary and Docker image: @@ -460,6 +463,8 @@ This project uses `docker compose` to manage containers and networking. The configuration for those containers is in `tests/docker-compose.yml`. +The proxy services in `tests/docker-compose.yml` use `pull_policy: never` to ensure Docker never pulls the released `cipherstash/proxy:latest` image from Docker Hub. The image must be built locally from source via `mise run proxy:up` (or `mise run build`). This guarantees integration tests always run against the current source code. + The integration tests use the `proxy:up` and `proxy:down` commands documented above to run containers in different configurations. #### Configuration: configuring PostgreSQL containers in integration tests diff --git a/mise.toml b/mise.toml index bd730982..516dc382 100644 --- a/mise.toml +++ b/mise.toml @@ -663,6 +663,9 @@ cp -v {{config_root}}/target/{{ target }}/release/cipherstash-proxy {{config_roo [tasks."build:docker"] depends = ["eql:download"] +# Tags the image as cipherstash/proxy:latest locally. +# tests/docker-compose.yml uses pull_policy: never to ensure this local image +# is always used instead of the released image on Docker Hub. description = "Build a Docker image for cipherstash-proxy" run = """ {% set default_platform = "linux/" ~ arch() | replace(from="x86_64", to="amd64") %} diff --git a/packages/cipherstash-proxy-integration/src/common.rs b/packages/cipherstash-proxy-integration/src/common.rs index 31e274bf..6d3e7092 100644 --- a/packages/cipherstash-proxy-integration/src/common.rs +++ b/packages/cipherstash-proxy-integration/src/common.rs @@ -128,7 +128,8 @@ pub fn connection_config(port: u16) -> tokio_postgres::Config { .port(port) .user(&username) .password(&password) - .dbname(&name); + .dbname(&name) + .connect_timeout(std::time::Duration::from_secs(10)); db_config } @@ -212,7 +213,7 @@ where } /// Get database port from environment or use default. -fn get_database_port() -> u16 { +pub fn get_database_port() -> u16 { std::env::var("CS_DATABASE__PORT") .ok() .and_then(|s| s.parse().ok()) diff --git a/packages/cipherstash-proxy-integration/src/connection_resilience.rs b/packages/cipherstash-proxy-integration/src/connection_resilience.rs new file mode 100644 index 00000000..0e8e9c18 --- /dev/null +++ b/packages/cipherstash-proxy-integration/src/connection_resilience.rs @@ -0,0 +1,198 @@ +/// Tests that validate proxy connection isolation under load. +/// +/// These tests verify that: +/// - Slow queries on one connection don't block other connections +/// - The proxy accepts new connections after client disconnect +/// - Concurrent connections under load remain responsive +/// - Blocked backend connections don't affect other proxy connections +#[cfg(test)] +mod tests { + use crate::common::{connect_with_tls, get_database_port, PROXY}; + use std::time::Instant; + use tokio::task::JoinSet; + use tokio::time::{timeout, Duration}; + use tokio_postgres::SimpleQueryMessage; + + /// Advisory lock ID used in isolation tests. Arbitrary value — just needs to be + /// unique across concurrently running test suites against the same database. + const ADVISORY_LOCK_ID: i64 = 99_001; + + /// A slow query on one connection does not block other connections through the proxy. + #[tokio::test] + async fn slow_query_does_not_block_other_connections() { + let result = timeout(Duration::from_secs(30), async { + let client_a = connect_with_tls(PROXY).await; + let client_b = connect_with_tls(PROXY).await; + + // Connection A: run a slow query + let a_handle = tokio::spawn(async move { + client_a.simple_query("SELECT pg_sleep(5)").await.unwrap(); + }); + + // Brief pause to ensure A's query is in flight + tokio::time::sleep(Duration::from_millis(200)).await; + + // Connection B: run a fast query, should complete promptly + let start = Instant::now(); + let rows = client_b.simple_query("SELECT 1").await.unwrap(); + let elapsed = start.elapsed(); + + assert!(!rows.is_empty(), "Expected result from SELECT 1"); + assert!( + elapsed < Duration::from_secs(2), + "Fast query took {elapsed:?}, expected < 2s — proxy may be blocking" + ); + + a_handle.await.unwrap(); + }) + .await; + + result.expect("Test timed out after 30s"); + } + + /// Proxy accepts new connections after a client disconnects. + #[tokio::test] + async fn proxy_accepts_new_connections_after_client_disconnect() { + let result = timeout(Duration::from_secs(10), async { + // First connection: query, then drop + { + let client = connect_with_tls(PROXY).await; + let rows = client.simple_query("SELECT 1").await.unwrap(); + assert!(!rows.is_empty()); + } + // Client dropped here + + // Brief pause + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second connection: should work fine + let client = connect_with_tls(PROXY).await; + let rows = client.simple_query("SELECT 1").await.unwrap(); + assert!(!rows.is_empty()); + }) + .await; + + result.expect("Test timed out after 10s"); + } + + /// Concurrent slow and fast connections: fast queries complete promptly under slow load. + #[tokio::test] + async fn concurrent_connections_under_slow_load() { + let result = timeout(Duration::from_secs(30), async { + let mut join_set = JoinSet::new(); + + // 5 slow connections + for _ in 0..5 { + join_set.spawn(async { + let client = connect_with_tls(PROXY).await; + client.simple_query("SELECT pg_sleep(3)").await.unwrap(); + }); + } + + // Brief pause to let slow queries start + tokio::time::sleep(Duration::from_millis(300)).await; + + // 5 fast connections, each should complete promptly + for _ in 0..5 { + join_set.spawn(async { + let start = Instant::now(); + let client = connect_with_tls(PROXY).await; + let rows = client.simple_query("SELECT 1").await.unwrap(); + let elapsed = start.elapsed(); + + assert!(!rows.is_empty()); + assert!( + elapsed < Duration::from_secs(5), + "Fast query took {elapsed:?} under slow load, expected < 5s" + ); + }); + } + + while let Some(result) = join_set.join_next().await { + result.unwrap(); + } + }) + .await; + + result.expect("Test timed out after 30s"); + } + + /// An advisory-lock-blocked connection through the proxy does not block other proxy connections. + /// + /// Uses pg_locks polling to deterministically wait for client_b to be blocked on the + /// advisory lock, rather than relying on a fixed sleep. + #[tokio::test] + async fn advisory_lock_blocked_connection_does_not_block_proxy() { + let lock_query = format!("SELECT pg_advisory_lock({ADVISORY_LOCK_ID})"); + let unlock_query = format!("SELECT pg_advisory_unlock({ADVISORY_LOCK_ID})"); + + let result = timeout(Duration::from_secs(30), async { + // Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference) + let pg_port = get_database_port(); + let client_a = connect_with_tls(pg_port).await; + client_a + .simple_query(&lock_query) + .await + .unwrap(); + + let b_lock_query = lock_query.clone(); + let b_unlock_query = unlock_query.clone(); + + // Connection B: through proxy, attempt to acquire the same lock (will block) + let b_handle = tokio::spawn(async move { + let client_b = connect_with_tls(PROXY).await; + // This will block until A releases the lock + client_b + .simple_query(&b_lock_query) + .await + .unwrap(); + // Release after acquiring + client_b + .simple_query(&b_unlock_query) + .await + .unwrap(); + }); + + // Poll pg_locks until client_b is observed waiting for the advisory lock + let poll_query = format!( + "SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND NOT granted AND classid = 0 AND objid = {ADVISORY_LOCK_ID}" + ); + let deadline = Instant::now() + Duration::from_secs(10); + loop { + let result = client_a.simple_query(&poll_query).await.unwrap(); + let has_waiting = result.iter().any(|m| matches!(m, SimpleQueryMessage::Row(_))); + if has_waiting { + break; + } + assert!( + Instant::now() < deadline, + "Timed out waiting for client_b to be blocked on advisory lock" + ); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Connection C: through proxy, should complete immediately despite B being blocked + let start = Instant::now(); + let client_c = connect_with_tls(PROXY).await; + let rows = client_c.simple_query("SELECT 1").await.unwrap(); + let elapsed = start.elapsed(); + + assert!(!rows.is_empty()); + assert!( + elapsed < Duration::from_secs(2), + "Connection C took {elapsed:?}, expected < 2s — blocked connection may be affecting proxy" + ); + + // Release the lock so B can complete + client_a + .simple_query(&unlock_query) + .await + .unwrap(); + + b_handle.await.unwrap(); + }) + .await; + + result.expect("Test timed out after 30s"); + } +} diff --git a/packages/cipherstash-proxy-integration/src/lib.rs b/packages/cipherstash-proxy-integration/src/lib.rs index e88d7cb8..ebbd20a4 100644 --- a/packages/cipherstash-proxy-integration/src/lib.rs +++ b/packages/cipherstash-proxy-integration/src/lib.rs @@ -1,4 +1,5 @@ mod common; +mod connection_resilience; mod decrypt; mod diagnostics; mod disable_mapping; diff --git a/packages/cipherstash-proxy/Cargo.toml b/packages/cipherstash-proxy/Cargo.toml index f71a3ce2..f161b1d3 100644 --- a/packages/cipherstash-proxy/Cargo.toml +++ b/packages/cipherstash-proxy/Cargo.toml @@ -24,7 +24,7 @@ eql-mapper = { path = "../eql-mapper" } exitcode = "1.1.2" hex = "0.4.3" md-5 = "0.10.6" -metrics = "0.24.1" +metrics = "0.24.3" metrics-exporter-prometheus = "0.17" moka = { version = "0.12", features = ["future"] } oid-registry = "0.8" diff --git a/packages/cipherstash-proxy/src/config/database.rs b/packages/cipherstash-proxy/src/config/database.rs index 649fa0a5..ade20f82 100644 --- a/packages/cipherstash-proxy/src/config/database.rs +++ b/packages/cipherstash-proxy/src/config/database.rs @@ -75,8 +75,14 @@ impl DatabaseConfig { self.password.to_owned().risky_unwrap() } + const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 120_000; + pub fn connection_timeout(&self) -> Option { - self.connection_timeout.map(Duration::from_millis) + match self.connection_timeout { + Some(0) => None, + Some(ms) => Some(Duration::from_millis(ms)), + None => Some(Duration::from_millis(Self::DEFAULT_CONNECTION_TIMEOUT_MS)), + } } pub fn server_name(&self) -> Result, Error> { @@ -116,3 +122,31 @@ impl Display for DatabaseConfig { ) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn connection_timeout_defaults_to_120_seconds() { + let config = DatabaseConfig::for_testing(); + assert_eq!(config.connection_timeout(), Some(Duration::from_secs(120))); + } + + #[test] + fn connection_timeout_zero_disables_timeout() { + let mut config = DatabaseConfig::for_testing(); + config.connection_timeout = Some(0); + assert_eq!(config.connection_timeout(), None); + } + + #[test] + fn connection_timeout_custom_value_in_millis() { + let mut config = DatabaseConfig::for_testing(); + config.connection_timeout = Some(5000); + assert_eq!( + config.connection_timeout(), + Some(Duration::from_millis(5000)) + ); + } +} diff --git a/packages/cipherstash-proxy/src/error.rs b/packages/cipherstash-proxy/src/error.rs index afd37b69..b87c1a85 100644 --- a/packages/cipherstash-proxy/src/error.rs +++ b/packages/cipherstash-proxy/src/error.rs @@ -27,7 +27,7 @@ pub enum Error { #[error("Connection closed by client")] ConnectionClosed, - #[error("Connection timed out after {} ms", duration.as_secs())] + #[error("Connection timed out after {} ms", duration.as_millis())] ConnectionTimeout { duration: Duration }, #[error("Error creating connection")] @@ -522,4 +522,12 @@ mod tests { assert_eq!(format!("Statement encountered an internal error. This may be a bug in the statement mapping module of CipherStash Proxy. Please visit {ERROR_DOC_BASE_URL}#mapping-internal-error for more information."), message); } + + #[test] + fn connection_timeout_message_shows_millis() { + let error = Error::ConnectionTimeout { + duration: Duration::from_millis(5000), + }; + assert_eq!(error.to_string(), "Connection timed out after 5000 ms"); + } } diff --git a/packages/cipherstash-proxy/src/postgresql/error_handler.rs b/packages/cipherstash-proxy/src/postgresql/error_handler.rs index 53a87375..4de24e03 100644 --- a/packages/cipherstash-proxy/src/postgresql/error_handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/error_handler.rs @@ -53,6 +53,7 @@ pub trait PostgreSqlErrorHandler { Error::Encrypt(EncryptError::UnknownKeysetIdentifier { .. }) => { ErrorResponse::system_error(err.to_string()) } + Error::ConnectionTimeout { .. } => ErrorResponse::connection_timeout(err.to_string()), _ => ErrorResponse::system_error(err.to_string()), } } @@ -67,3 +68,67 @@ pub trait PostgreSqlErrorHandler { /// * `error_response` - The ErrorResponse to send to the client fn send_error_response(&mut self, err: Error) -> Result<(), Error>; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::postgresql::messages::error_response::{ + ErrorResponseCode, CODE_IDLE_SESSION_TIMEOUT, CODE_SYSTEM_ERROR, + }; + use std::time::Duration; + + /// Minimal implementation of PostgreSqlErrorHandler for testing the default method. + struct TestHandler; + + impl PostgreSqlErrorHandler for TestHandler { + fn client_sender(&mut self) -> &mut Sender { + unimplemented!("not needed for error_to_response tests") + } + + fn client_id(&self) -> i32 { + 0 + } + + fn send_error_response(&mut self, _err: Error) -> Result<(), Error> { + unimplemented!("not needed for error_to_response tests") + } + } + + fn error_code(response: &ErrorResponse) -> Option<&str> { + response + .fields + .iter() + .find(|f| f.code == ErrorResponseCode::Code) + .map(|f| f.value.as_str()) + } + + fn error_message(response: &ErrorResponse) -> Option<&str> { + response + .fields + .iter() + .find(|f| f.code == ErrorResponseCode::Message) + .map(|f| f.value.as_str()) + } + + #[test] + fn connection_timeout_maps_to_57p05() { + let handler = TestHandler; + let err = Error::ConnectionTimeout { + duration: Duration::from_millis(5000), + }; + let response = handler.error_to_response(err); + assert_eq!(error_code(&response), Some(CODE_IDLE_SESSION_TIMEOUT)); + assert_eq!( + error_message(&response), + Some("Connection timed out after 5000 ms") + ); + } + + #[test] + fn unknown_error_maps_to_system_error() { + let handler = TestHandler; + let err = Error::Unknown; + let response = handler.error_to_response(err); + assert_eq!(error_code(&response), Some(CODE_SYSTEM_ERROR)); + } +} diff --git a/packages/cipherstash-proxy/src/postgresql/handler.rs b/packages/cipherstash-proxy/src/postgresql/handler.rs index b13734b3..c8c0322a 100644 --- a/packages/cipherstash-proxy/src/postgresql/handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/handler.rs @@ -67,7 +67,14 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R loop { let startup_message = - startup::read_message(&mut client_stream, context.connection_timeout()).await?; + match startup::read_message(&mut client_stream, context.connection_timeout()).await { + Ok(msg) => msg, + Err(err @ Error::ConnectionTimeout { .. }) => { + send_timeout_error(&mut client_stream, &err).await; + return Err(err); + } + Err(err) => return Err(err), + }; match &startup_message.code { StartupCode::SSLRequest => { @@ -119,7 +126,14 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R let connection_timeout = context.connection_timeout(); let (_code, bytes) = - protocol::read_message(&mut client_stream, client_id, connection_timeout).await?; + match protocol::read_message(&mut client_stream, client_id, connection_timeout).await { + Ok(result) => result, + Err(err @ Error::ConnectionTimeout { .. }) => { + send_timeout_error(&mut client_stream, &err).await; + return Err(err); + } + Err(err) => return Err(err), + }; let password_message = PasswordMessage::try_from(&bytes)?; @@ -232,6 +246,7 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R } } + let timeout_sender = channel_writer.sender(); tokio::spawn(channel_writer.receive()); let client_to_server = async { @@ -258,7 +273,21 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R Ok::<(), Error>(()) }; - tokio::try_join!(client_to_server, server_to_client)?; + let result = tokio::try_join!(client_to_server, server_to_client); + + if let Err(ref err @ Error::ConnectionTimeout { .. }) = &result { + let error_response = ErrorResponse::connection_timeout(err.to_string()); + if let Ok(bytes) = BytesMut::try_from(error_response) { + let _ = timeout_sender.send(bytes); + } + // Best-effort yield to allow ChannelWriter to flush the error response + // before the connection tears down. Not guaranteed — if the runtime doesn't + // schedule the writer task before teardown, the client may see a connection + // reset instead of the ErrorResponse. + tokio::task::yield_now().await; + } + + result?; Ok(()) } @@ -338,3 +367,12 @@ async fn scram_sha_256_plus_handler( Err(ProtocolError::AuthenticationFailed.into()) } } + +/// Best-effort send of a connection timeout ErrorResponse directly to a client stream. +/// Used for pre-split timeout sites where no ChannelWriter exists yet. +async fn send_timeout_error(stream: &mut S, err: &Error) { + let error_response = ErrorResponse::connection_timeout(err.to_string()); + if let Ok(bytes) = BytesMut::try_from(error_response) { + let _ = stream.write_all(&bytes).await; + } +} diff --git a/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs b/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs index 204521f4..7013fe1f 100644 --- a/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs +++ b/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs @@ -60,7 +60,14 @@ pub enum ErrorResponseCode { } impl ErrorResponse { - pub fn connection_timeout() -> Self { + /// Create a FATAL error response for connection timeout. + /// + /// Uses PostgreSQL error code 57P05 (idle_session_timeout). While this code + /// is technically for idle session timeouts, it is the closest match for a + /// proxy-enforced connection timeout. The alternative 08006 (connection_failure) + /// implies a network-level failure, which is misleading — the proxy is + /// deliberately terminating a connection that exceeded its time limit. + pub fn connection_timeout(message: String) -> Self { Self { fields: vec![ Field { @@ -77,7 +84,7 @@ impl ErrorResponse { }, Field { code: ErrorResponseCode::Message, - value: "Connection timeout".to_string(), + value: message, }, ], } diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 9f02b650..d7d0ef3c 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -48,6 +48,11 @@ services: proxy: image: cipherstash/proxy:latest + # Never pull from Docker Hub. The proxy image must be built from source + # via `mise run proxy:up` (which runs build:binary + build:docker). + # This ensures integration tests always run against the current source, + # not the released version on Docker Hub. + pull_policy: never container_name: proxy ports: - 6432:6432 @@ -90,6 +95,8 @@ services: proxy-tls: image: cipherstash/proxy:latest + # See pull_policy comment on the proxy service above. + pull_policy: never container_name: proxy-tls ports: - 6432:6432