From e19b758d37a72736194c0a9e88c315e74edaf621 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Tue, 10 Mar 2026 13:38:04 +1100 Subject: [PATCH 1/7] feat(proxy): add connection timeout with default 120s and proper error reporting - Add configurable connection_timeout (default 120s, 0 to disable) to DatabaseConfig - Fix ConnectionTimeout error message to display milliseconds (was showing seconds) - Map ConnectionTimeout to PostgreSQL error code 57P05 instead of generic 58000 - Send ErrorResponse to client on timeout via channel writer - Add pull_policy: never to docker-compose proxy services to ensure local builds - Add connect_timeout to integration test client config - Add unit tests for config defaults, error message format, and error code mapping - Add integration tests for connection isolation under load - Document local build requirement in DEVELOPMENT.md --- DEVELOPMENT.md | 5 + mise.toml | 3 + .../src/common.rs | 3 +- .../src/connection_resilience.rs | 181 ++++++++++++++++++ .../cipherstash-proxy-integration/src/lib.rs | 1 + .../cipherstash-proxy/src/config/database.rs | 39 +++- packages/cipherstash-proxy/src/error.rs | 10 +- .../src/postgresql/error_handler.rs | 53 +++++ .../src/postgresql/handler.rs | 14 +- tests/docker-compose.yml | 7 + 10 files changed, 312 insertions(+), 4 deletions(-) create mode 100644 packages/cipherstash-proxy-integration/src/connection_resilience.rs diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 5733b1e7..7e9a17a4 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -321,6 +321,9 @@ mise run proxy:down Running Proxy in a container cross-compiles a binary for Linux and the current architecture (`amd64`, `arm64`), then copies the binary into the container. We cross-compile binary outside the container because it's generally faster, due to packages already being cached, and slower network and disk IO in Docker. 
+> [!IMPORTANT] +> **Proxy must always be built from source for testing.** The `proxy:up` task builds a binary from source (`build:binary`), packages it into a Docker image tagged `cipherstash/proxy:latest` (`build:docker`), then starts it via `docker compose up`. The `tests/docker-compose.yml` file uses `pull_policy: never` on the proxy services to ensure Docker never pulls the released image from Docker Hub. If you see an error like `pull access denied` or `image not found`, run `mise run build` first to build the local image. + ### Building Build a binary and Docker image: @@ -460,6 +463,8 @@ This project uses `docker compose` to manage containers and networking. The configuration for those containers is in `tests/docker-compose.yml`. +The proxy services in `tests/docker-compose.yml` use `pull_policy: never` to ensure Docker never pulls the released `cipherstash/proxy:latest` image from Docker Hub. The image must be built locally from source via `mise run proxy:up` (or `mise run build`). This guarantees integration tests always run against the current source code. + The integration tests use the `proxy:up` and `proxy:down` commands documented above to run containers in different configurations. #### Configuration: configuring PostgreSQL containers in integration tests diff --git a/mise.toml b/mise.toml index bd730982..516dc382 100644 --- a/mise.toml +++ b/mise.toml @@ -663,6 +663,9 @@ cp -v {{config_root}}/target/{{ target }}/release/cipherstash-proxy {{config_roo [tasks."build:docker"] depends = ["eql:download"] +# Tags the image as cipherstash/proxy:latest locally. +# tests/docker-compose.yml uses pull_policy: never to ensure this local image +# is always used instead of the released image on Docker Hub. 
description = "Build a Docker image for cipherstash-proxy" run = """ {% set default_platform = "linux/" ~ arch() | replace(from="x86_64", to="amd64") %} diff --git a/packages/cipherstash-proxy-integration/src/common.rs b/packages/cipherstash-proxy-integration/src/common.rs index 31e274bf..66c1edb4 100644 --- a/packages/cipherstash-proxy-integration/src/common.rs +++ b/packages/cipherstash-proxy-integration/src/common.rs @@ -128,7 +128,8 @@ pub fn connection_config(port: u16) -> tokio_postgres::Config { .port(port) .user(&username) .password(&password) - .dbname(&name); + .dbname(&name) + .connect_timeout(std::time::Duration::from_secs(10)); db_config } diff --git a/packages/cipherstash-proxy-integration/src/connection_resilience.rs b/packages/cipherstash-proxy-integration/src/connection_resilience.rs new file mode 100644 index 00000000..12a281f7 --- /dev/null +++ b/packages/cipherstash-proxy-integration/src/connection_resilience.rs @@ -0,0 +1,181 @@ +/// Tests that validate proxy connection isolation under load. +/// +/// These tests verify that: +/// - Slow queries on one connection don't block other connections +/// - The proxy accepts new connections after client disconnect +/// - Concurrent connections under load remain responsive +/// - Blocked backend connections don't affect other proxy connections +#[cfg(test)] +mod tests { + use crate::common::{connect_with_tls, PROXY, PG_PORT}; + use std::sync::Arc; + use std::time::Instant; + use tokio::sync::Notify; + use tokio::task::JoinSet; + use tokio::time::{timeout, Duration}; + + /// A slow query on one connection does not block other connections through the proxy. 
+ #[tokio::test] + async fn slow_query_does_not_block_other_connections() { + let result = timeout(Duration::from_secs(30), async { + let client_a = connect_with_tls(PROXY).await; + let client_b = connect_with_tls(PROXY).await; + + // Connection A: run a slow query + let a_handle = tokio::spawn(async move { + client_a + .simple_query("SELECT pg_sleep(5)") + .await + .unwrap(); + }); + + // Brief pause to ensure A's query is in flight + tokio::time::sleep(Duration::from_millis(200)).await; + + // Connection B: run a fast query, should complete promptly + let start = Instant::now(); + let rows = client_b.simple_query("SELECT 1").await.unwrap(); + let elapsed = start.elapsed(); + + assert!(!rows.is_empty(), "Expected result from SELECT 1"); + assert!( + elapsed < Duration::from_secs(2), + "Fast query took {elapsed:?}, expected < 2s — proxy may be blocking" + ); + + a_handle.await.unwrap(); + }) + .await; + + result.expect("Test timed out after 30s"); + } + + /// Proxy accepts new connections after a client disconnects. + #[tokio::test] + async fn proxy_accepts_new_connections_after_client_disconnect() { + let result = timeout(Duration::from_secs(10), async { + // First connection: query, then drop + { + let client = connect_with_tls(PROXY).await; + let rows = client.simple_query("SELECT 1").await.unwrap(); + assert!(!rows.is_empty()); + } + // Client dropped here + + // Brief pause + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second connection: should work fine + let client = connect_with_tls(PROXY).await; + let rows = client.simple_query("SELECT 1").await.unwrap(); + assert!(!rows.is_empty()); + }) + .await; + + result.expect("Test timed out after 10s"); + } + + /// Concurrent slow and fast connections: fast queries complete promptly under slow load. 
+ #[tokio::test] + async fn concurrent_connections_under_slow_load() { + let result = timeout(Duration::from_secs(30), async { + let mut join_set = JoinSet::new(); + + // 5 slow connections + for _ in 0..5 { + join_set.spawn(async { + let client = connect_with_tls(PROXY).await; + client + .simple_query("SELECT pg_sleep(3)") + .await + .unwrap(); + }); + } + + // Brief pause to let slow queries start + tokio::time::sleep(Duration::from_millis(300)).await; + + // 5 fast connections, each should complete promptly + for _ in 0..5 { + join_set.spawn(async { + let start = Instant::now(); + let client = connect_with_tls(PROXY).await; + let rows = client.simple_query("SELECT 1").await.unwrap(); + let elapsed = start.elapsed(); + + assert!(!rows.is_empty()); + assert!( + elapsed < Duration::from_secs(5), + "Fast query took {elapsed:?} under slow load, expected < 5s" + ); + }); + } + + while let Some(result) = join_set.join_next().await { + result.unwrap(); + } + }) + .await; + + result.expect("Test timed out after 30s"); + } + + /// An advisory-lock-blocked connection through the proxy does not block other proxy connections. 
+ #[tokio::test] + async fn advisory_lock_blocked_connection_does_not_block_proxy() { + let result = timeout(Duration::from_secs(30), async { + // Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference) + let client_a = connect_with_tls(PG_PORT).await; + client_a + .simple_query("SELECT pg_advisory_lock(12345)") + .await + .unwrap(); + + let a_ready = Arc::new(Notify::new()); + let a_ready_tx = a_ready.clone(); + + // Connection B: through proxy, attempt to acquire the same lock (will block) + let b_handle = tokio::spawn(async move { + let client_b = connect_with_tls(PROXY).await; + a_ready_tx.notify_one(); + // This will block until A releases the lock + client_b + .simple_query("SELECT pg_advisory_lock(12345)") + .await + .unwrap(); + // Release after acquiring + client_b + .simple_query("SELECT pg_advisory_unlock(12345)") + .await + .unwrap(); + }); + + // Wait for B to be connected and attempting the lock + a_ready.notified().await; + tokio::time::sleep(Duration::from_millis(500)).await; + + // Connection C: through proxy, should complete immediately despite B being blocked + let start = Instant::now(); + let client_c = connect_with_tls(PROXY).await; + let rows = client_c.simple_query("SELECT 1").await.unwrap(); + let elapsed = start.elapsed(); + + assert!(!rows.is_empty()); + assert!( + elapsed < Duration::from_secs(2), + "Connection C took {elapsed:?}, expected < 2s — blocked connection may be affecting proxy" + ); + + // Release the lock so B can complete + client_a + .simple_query("SELECT pg_advisory_unlock(12345)") + .await + .unwrap(); + + b_handle.await.unwrap(); + }) + .await; + + result.expect("Test timed out after 30s"); + } +} diff --git a/packages/cipherstash-proxy-integration/src/lib.rs b/packages/cipherstash-proxy-integration/src/lib.rs index e88d7cb8..ebbd20a4 100644 --- a/packages/cipherstash-proxy-integration/src/lib.rs +++ b/packages/cipherstash-proxy-integration/src/lib.rs @@ -1,4 +1,5 @@ mod common; +mod 
connection_resilience; mod decrypt; mod diagnostics; mod disable_mapping; diff --git a/packages/cipherstash-proxy/src/config/database.rs b/packages/cipherstash-proxy/src/config/database.rs index 649fa0a5..588ed5cb 100644 --- a/packages/cipherstash-proxy/src/config/database.rs +++ b/packages/cipherstash-proxy/src/config/database.rs @@ -75,8 +75,14 @@ impl DatabaseConfig { self.password.to_owned().risky_unwrap() } + const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 120_000; + pub fn connection_timeout(&self) -> Option { - self.connection_timeout.map(Duration::from_millis) + match self.connection_timeout { + Some(0) => None, + Some(ms) => Some(Duration::from_millis(ms)), + None => Some(Duration::from_millis(Self::DEFAULT_CONNECTION_TIMEOUT_MS)), + } } pub fn server_name(&self) -> Result, Error> { @@ -104,6 +110,37 @@ impl DatabaseConfig { } } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn connection_timeout_defaults_to_120_seconds() { + let config = DatabaseConfig::for_testing(); + assert_eq!( + config.connection_timeout(), + Some(Duration::from_secs(120)) + ); + } + + #[test] + fn connection_timeout_zero_disables_timeout() { + let mut config = DatabaseConfig::for_testing(); + config.connection_timeout = Some(0); + assert_eq!(config.connection_timeout(), None); + } + + #[test] + fn connection_timeout_custom_value_in_millis() { + let mut config = DatabaseConfig::for_testing(); + config.connection_timeout = Some(5000); + assert_eq!( + config.connection_timeout(), + Some(Duration::from_millis(5000)) + ); + } +} + /// /// Password is NEVER EVER displayed /// diff --git a/packages/cipherstash-proxy/src/error.rs b/packages/cipherstash-proxy/src/error.rs index afd37b69..b87c1a85 100644 --- a/packages/cipherstash-proxy/src/error.rs +++ b/packages/cipherstash-proxy/src/error.rs @@ -27,7 +27,7 @@ pub enum Error { #[error("Connection closed by client")] ConnectionClosed, - #[error("Connection timed out after {} ms", duration.as_secs())] + #[error("Connection timed out 
after {} ms", duration.as_millis())] ConnectionTimeout { duration: Duration }, #[error("Error creating connection")] @@ -522,4 +522,12 @@ mod tests { assert_eq!(format!("Statement encountered an internal error. This may be a bug in the statement mapping module of CipherStash Proxy. Please visit {ERROR_DOC_BASE_URL}#mapping-internal-error for more information."), message); } + + #[test] + fn connection_timeout_message_shows_millis() { + let error = Error::ConnectionTimeout { + duration: Duration::from_millis(5000), + }; + assert_eq!(error.to_string(), "Connection timed out after 5000 ms"); + } } diff --git a/packages/cipherstash-proxy/src/postgresql/error_handler.rs b/packages/cipherstash-proxy/src/postgresql/error_handler.rs index 53a87375..644c327d 100644 --- a/packages/cipherstash-proxy/src/postgresql/error_handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/error_handler.rs @@ -53,6 +53,7 @@ pub trait PostgreSqlErrorHandler { Error::Encrypt(EncryptError::UnknownKeysetIdentifier { .. }) => { ErrorResponse::system_error(err.to_string()) } + Error::ConnectionTimeout { .. } => ErrorResponse::connection_timeout(), _ => ErrorResponse::system_error(err.to_string()), } } @@ -67,3 +68,55 @@ pub trait PostgreSqlErrorHandler { /// * `error_response` - The ErrorResponse to send to the client fn send_error_response(&mut self, err: Error) -> Result<(), Error>; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::postgresql::messages::error_response::{ + ErrorResponseCode, CODE_IDLE_SESSION_TIMEOUT, CODE_SYSTEM_ERROR, + }; + use std::time::Duration; + + /// Minimal implementation of PostgreSqlErrorHandler for testing the default method. 
+ struct TestHandler; + + impl PostgreSqlErrorHandler for TestHandler { + fn client_sender(&mut self) -> &mut Sender { + unimplemented!("not needed for error_to_response tests") + } + + fn client_id(&self) -> i32 { + 0 + } + + fn send_error_response(&mut self, _err: Error) -> Result<(), Error> { + unimplemented!("not needed for error_to_response tests") + } + } + + fn error_code(response: &ErrorResponse) -> Option<&str> { + response + .fields + .iter() + .find(|f| f.code == ErrorResponseCode::Code) + .map(|f| f.value.as_str()) + } + + #[test] + fn connection_timeout_maps_to_57p05() { + let handler = TestHandler; + let err = Error::ConnectionTimeout { + duration: Duration::from_millis(5000), + }; + let response = handler.error_to_response(err); + assert_eq!(error_code(&response), Some(CODE_IDLE_SESSION_TIMEOUT)); + } + + #[test] + fn unknown_error_maps_to_system_error() { + let handler = TestHandler; + let err = Error::Unknown; + let response = handler.error_to_response(err); + assert_eq!(error_code(&response), Some(CODE_SYSTEM_ERROR)); + } +} diff --git a/packages/cipherstash-proxy/src/postgresql/handler.rs b/packages/cipherstash-proxy/src/postgresql/handler.rs index b13734b3..e979605c 100644 --- a/packages/cipherstash-proxy/src/postgresql/handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/handler.rs @@ -232,6 +232,7 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R } } + let timeout_sender = channel_writer.sender(); tokio::spawn(channel_writer.receive()); let client_to_server = async { @@ -258,7 +259,18 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R Ok::<(), Error>(()) }; - tokio::try_join!(client_to_server, server_to_client)?; + let result = tokio::try_join!(client_to_server, server_to_client); + + if let Err(Error::ConnectionTimeout { .. 
}) = &result { + let error_response = ErrorResponse::connection_timeout(); + if let Ok(bytes) = BytesMut::try_from(error_response) { + let _ = timeout_sender.send(bytes); + } + // Brief yield to allow ChannelWriter to flush + tokio::task::yield_now().await; + } + + result?; Ok(()) } diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 9f02b650..d7d0ef3c 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -48,6 +48,11 @@ services: proxy: image: cipherstash/proxy:latest + # Never pull from Docker Hub. The proxy image must be built from source + # via `mise run proxy:up` (which runs build:binary + build:docker). + # This ensures integration tests always run against the current source, + # not the released version on Docker Hub. + pull_policy: never container_name: proxy ports: - 6432:6432 @@ -90,6 +95,8 @@ services: proxy-tls: image: cipherstash/proxy:latest + # See pull_policy comment on the proxy service above. + pull_policy: never container_name: proxy-tls ports: - 6432:6432 From 806eb5936804386efb35deb8c73c32c65b002b5e Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Tue, 10 Mar 2026 13:57:57 +1100 Subject: [PATCH 2/7] fix(proxy): address code review feedback for connection timeout - Pass actual timeout duration in ErrorResponse message to client - Document rationale for using 57P05 over 08006 error code - Document yield_now() as best-effort flush with known limitation - Document advisory lock test race condition timing margin - Replace magic number 12345 with named ADVISORY_LOCK_ID constant --- .../src/connection_resilience.rs | 21 +++++++++++++++---- .../src/postgresql/error_handler.rs | 16 +++++++++++++- .../src/postgresql/handler.rs | 9 +++++--- .../src/postgresql/messages/error_response.rs | 11 ++++++++-- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/packages/cipherstash-proxy-integration/src/connection_resilience.rs b/packages/cipherstash-proxy-integration/src/connection_resilience.rs index 
12a281f7..305d7e9a 100644 --- a/packages/cipherstash-proxy-integration/src/connection_resilience.rs +++ b/packages/cipherstash-proxy-integration/src/connection_resilience.rs @@ -14,6 +14,10 @@ mod tests { use tokio::task::JoinSet; use tokio::time::{timeout, Duration}; + /// Advisory lock ID used in isolation tests. Arbitrary value — just needs to be + /// unique across concurrently running test suites against the same database. + const ADVISORY_LOCK_ID: i64 = 99_001; + /// A slow query on one connection does not block other connections through the proxy. #[tokio::test] async fn slow_query_does_not_block_other_connections() { @@ -121,18 +125,27 @@ mod tests { } /// An advisory-lock-blocked connection through the proxy does not block other proxy connections. + /// + /// Note: Connection B notifies readiness before `pg_advisory_lock` reaches PostgreSQL. + /// The 500ms sleep provides a generous margin for the lock attempt to reach PG, but is + /// not strictly guaranteed. In practice this has not caused flakiness. 
#[tokio::test] async fn advisory_lock_blocked_connection_does_not_block_proxy() { + let lock_query = format!("SELECT pg_advisory_lock({ADVISORY_LOCK_ID})"); + let unlock_query = format!("SELECT pg_advisory_unlock({ADVISORY_LOCK_ID})"); + let result = timeout(Duration::from_secs(30), async { // Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference) let client_a = connect_with_tls(PG_PORT).await; client_a - .simple_query("SELECT pg_advisory_lock(12345)") + .simple_query(&lock_query) .await .unwrap(); let a_ready = Arc::new(Notify::new()); let a_ready_tx = a_ready.clone(); + let b_lock_query = lock_query.clone(); + let b_unlock_query = unlock_query.clone(); // Connection B: through proxy, attempt to acquire the same lock (will block) let b_handle = tokio::spawn(async move { @@ -140,12 +153,12 @@ mod tests { a_ready_tx.notify_one(); // This will block until A releases the lock client_b - .simple_query("SELECT pg_advisory_lock(12345)") + .simple_query(&b_lock_query) .await .unwrap(); // Release after acquiring client_b - .simple_query("SELECT pg_advisory_unlock(12345)") + .simple_query(&b_unlock_query) .await .unwrap(); }); @@ -168,7 +181,7 @@ mod tests { // Release the lock so B can complete client_a - .simple_query("SELECT pg_advisory_unlock(12345)") + .simple_query(&unlock_query) .await .unwrap(); diff --git a/packages/cipherstash-proxy/src/postgresql/error_handler.rs b/packages/cipherstash-proxy/src/postgresql/error_handler.rs index 644c327d..cd57aa9e 100644 --- a/packages/cipherstash-proxy/src/postgresql/error_handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/error_handler.rs @@ -53,7 +53,9 @@ pub trait PostgreSqlErrorHandler { Error::Encrypt(EncryptError::UnknownKeysetIdentifier { .. }) => { ErrorResponse::system_error(err.to_string()) } - Error::ConnectionTimeout { .. } => ErrorResponse::connection_timeout(), + Error::ConnectionTimeout { .. 
} => { + ErrorResponse::connection_timeout(err.to_string()) + } _ => ErrorResponse::system_error(err.to_string()), } } @@ -102,6 +104,14 @@ mod tests { .map(|f| f.value.as_str()) } + fn error_message(response: &ErrorResponse) -> Option<&str> { + response + .fields + .iter() + .find(|f| f.code == ErrorResponseCode::Message) + .map(|f| f.value.as_str()) + } + #[test] fn connection_timeout_maps_to_57p05() { let handler = TestHandler; @@ -110,6 +120,10 @@ mod tests { }; let response = handler.error_to_response(err); assert_eq!(error_code(&response), Some(CODE_IDLE_SESSION_TIMEOUT)); + assert_eq!( + error_message(&response), + Some("Connection timed out after 5000 ms") + ); } #[test] diff --git a/packages/cipherstash-proxy/src/postgresql/handler.rs b/packages/cipherstash-proxy/src/postgresql/handler.rs index e979605c..2a3302fd 100644 --- a/packages/cipherstash-proxy/src/postgresql/handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/handler.rs @@ -261,12 +261,15 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R let result = tokio::try_join!(client_to_server, server_to_client); - if let Err(Error::ConnectionTimeout { .. }) = &result { - let error_response = ErrorResponse::connection_timeout(); + if let Err(ref err @ Error::ConnectionTimeout { .. }) = &result { + let error_response = ErrorResponse::connection_timeout(err.to_string()); if let Ok(bytes) = BytesMut::try_from(error_response) { let _ = timeout_sender.send(bytes); } - // Brief yield to allow ChannelWriter to flush + // Best-effort yield to allow ChannelWriter to flush the error response + // before the connection tears down. Not guaranteed — if the runtime doesn't + // schedule the writer task before teardown, the client may see a connection + // reset instead of the ErrorResponse. 
tokio::task::yield_now().await; } diff --git a/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs b/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs index 204521f4..7013fe1f 100644 --- a/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs +++ b/packages/cipherstash-proxy/src/postgresql/messages/error_response.rs @@ -60,7 +60,14 @@ pub enum ErrorResponseCode { } impl ErrorResponse { - pub fn connection_timeout() -> Self { + /// Create a FATAL error response for connection timeout. + /// + /// Uses PostgreSQL error code 57P05 (idle_session_timeout). While this code + /// is technically for idle session timeouts, it is the closest match for a + /// proxy-enforced connection timeout. The alternative 08006 (connection_failure) + /// implies a network-level failure, which is misleading — the proxy is + /// deliberately terminating a connection that exceeded its time limit. + pub fn connection_timeout(message: String) -> Self { Self { fields: vec![ Field { @@ -77,7 +84,7 @@ impl ErrorResponse { }, Field { code: ErrorResponseCode::Message, - value: "Connection timeout".to_string(), + value: message, }, ], } From eec033fad3b83c62911fa0475fb3674e88b3e050 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Fri, 6 Mar 2026 10:31:49 +1100 Subject: [PATCH 3/7] fix: bump metrics crate to 0.24.3 to fix Rust compiler error metrics 0.24.1 has a lifetime issue (rust-lang/rust#141402) that causes compilation failures on recent Rust versions. 
--- Cargo.lock | 4 ++-- .../src/connection_resilience.rs | 12 +++--------- packages/cipherstash-proxy/Cargo.toml | 2 +- packages/cipherstash-proxy/src/config/database.rs | 5 +---- .../src/postgresql/error_handler.rs | 4 +--- 5 files changed, 8 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc7fe6f9..f21074bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2510,9 +2510,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "metrics" -version = "0.24.1" +version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a7deb012b3b2767169ff203fadb4c6b0b82b947512e5eb9e0b78c2e186ad9e3" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" dependencies = [ "ahash 0.8.11", "portable-atomic", diff --git a/packages/cipherstash-proxy-integration/src/connection_resilience.rs b/packages/cipherstash-proxy-integration/src/connection_resilience.rs index 305d7e9a..951a2f81 100644 --- a/packages/cipherstash-proxy-integration/src/connection_resilience.rs +++ b/packages/cipherstash-proxy-integration/src/connection_resilience.rs @@ -7,7 +7,7 @@ /// - Blocked backend connections don't affect other proxy connections #[cfg(test)] mod tests { - use crate::common::{connect_with_tls, PROXY, PG_PORT}; + use crate::common::{connect_with_tls, PG_PORT, PROXY}; use std::sync::Arc; use std::time::Instant; use tokio::sync::Notify; @@ -27,10 +27,7 @@ mod tests { // Connection A: run a slow query let a_handle = tokio::spawn(async move { - client_a - .simple_query("SELECT pg_sleep(5)") - .await - .unwrap(); + client_a.simple_query("SELECT pg_sleep(5)").await.unwrap(); }); // Brief pause to ensure A's query is in flight @@ -89,10 +86,7 @@ mod tests { for _ in 0..5 { join_set.spawn(async { let client = connect_with_tls(PROXY).await; - client - .simple_query("SELECT pg_sleep(3)") - .await - .unwrap(); + client.simple_query("SELECT pg_sleep(3)").await.unwrap(); }); } diff --git 
a/packages/cipherstash-proxy/Cargo.toml b/packages/cipherstash-proxy/Cargo.toml index f71a3ce2..f161b1d3 100644 --- a/packages/cipherstash-proxy/Cargo.toml +++ b/packages/cipherstash-proxy/Cargo.toml @@ -24,7 +24,7 @@ eql-mapper = { path = "../eql-mapper" } exitcode = "1.1.2" hex = "0.4.3" md-5 = "0.10.6" -metrics = "0.24.1" +metrics = "0.24.3" metrics-exporter-prometheus = "0.17" moka = { version = "0.12", features = ["future"] } oid-registry = "0.8" diff --git a/packages/cipherstash-proxy/src/config/database.rs b/packages/cipherstash-proxy/src/config/database.rs index 588ed5cb..b8f71cf5 100644 --- a/packages/cipherstash-proxy/src/config/database.rs +++ b/packages/cipherstash-proxy/src/config/database.rs @@ -117,10 +117,7 @@ mod tests { #[test] fn connection_timeout_defaults_to_120_seconds() { let config = DatabaseConfig::for_testing(); - assert_eq!( - config.connection_timeout(), - Some(Duration::from_secs(120)) - ); + assert_eq!(config.connection_timeout(), Some(Duration::from_secs(120))); } #[test] diff --git a/packages/cipherstash-proxy/src/postgresql/error_handler.rs b/packages/cipherstash-proxy/src/postgresql/error_handler.rs index cd57aa9e..4de24e03 100644 --- a/packages/cipherstash-proxy/src/postgresql/error_handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/error_handler.rs @@ -53,9 +53,7 @@ pub trait PostgreSqlErrorHandler { Error::Encrypt(EncryptError::UnknownKeysetIdentifier { .. }) => { ErrorResponse::system_error(err.to_string()) } - Error::ConnectionTimeout { .. } => { - ErrorResponse::connection_timeout(err.to_string()) - } + Error::ConnectionTimeout { .. 
} => ErrorResponse::connection_timeout(err.to_string()), _ => ErrorResponse::system_error(err.to_string()), } } From 34841a7a45248903d3f89b1a29cd09ef821fcbe7 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Tue, 10 Mar 2026 14:24:30 +1100 Subject: [PATCH 4/7] fix: move Display impl before test module to fix clippy error --- .../cipherstash-proxy/src/config/database.rs | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/cipherstash-proxy/src/config/database.rs b/packages/cipherstash-proxy/src/config/database.rs index b8f71cf5..ade20f82 100644 --- a/packages/cipherstash-proxy/src/config/database.rs +++ b/packages/cipherstash-proxy/src/config/database.rs @@ -110,6 +110,19 @@ impl DatabaseConfig { } } +/// +/// Password is NEVER EVER displayed +/// +impl Display for DatabaseConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}@{}:{}/{}", + self.username, self.host, self.port, self.name, + ) + } +} + #[cfg(test)] mod tests { use super::*; @@ -137,16 +150,3 @@ mod tests { ); } } - -/// -/// Password is NEVER EVER displayed -/// -impl Display for DatabaseConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}@{}:{}/{}", - self.username, self.host, self.port, self.name, - ) - } -} From 5e936ad423728572911053939751528274d52da5 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Tue, 10 Mar 2026 15:17:29 +1100 Subject: [PATCH 5/7] fix(proxy): address PR review feedback for connection error handling Replace Notify + fixed sleep in advisory lock test with pg_locks polling for deterministic synchronization. Send ErrorResponse to clients for pre-split connection timeouts instead of silent disconnect. 
--- .../src/connection_resilience.rs | 31 ++++++++++++------- .../src/postgresql/handler.rs | 27 ++++++++++++++-- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/packages/cipherstash-proxy-integration/src/connection_resilience.rs b/packages/cipherstash-proxy-integration/src/connection_resilience.rs index 951a2f81..1227d7c7 100644 --- a/packages/cipherstash-proxy-integration/src/connection_resilience.rs +++ b/packages/cipherstash-proxy-integration/src/connection_resilience.rs @@ -8,11 +8,10 @@ #[cfg(test)] mod tests { use crate::common::{connect_with_tls, PG_PORT, PROXY}; - use std::sync::Arc; use std::time::Instant; - use tokio::sync::Notify; use tokio::task::JoinSet; use tokio::time::{timeout, Duration}; + use tokio_postgres::SimpleQueryMessage; /// Advisory lock ID used in isolation tests. Arbitrary value — just needs to be /// unique across concurrently running test suites against the same database. @@ -120,9 +119,8 @@ mod tests { /// An advisory-lock-blocked connection through the proxy does not block other proxy connections. /// - /// Note: Connection B notifies readiness before `pg_advisory_lock` reaches PostgreSQL. - /// The 500ms sleep provides a generous margin for the lock attempt to reach PG, but is - /// not strictly guaranteed. In practice this has not caused flakiness. + /// Uses pg_locks polling to deterministically wait for client_b to be blocked on the + /// advisory lock, rather than relying on a fixed sleep. 
#[tokio::test] async fn advisory_lock_blocked_connection_does_not_block_proxy() { let lock_query = format!("SELECT pg_advisory_lock({ADVISORY_LOCK_ID})"); @@ -136,15 +134,12 @@ mod tests { .await .unwrap(); - let a_ready = Arc::new(Notify::new()); - let a_ready_tx = a_ready.clone(); let b_lock_query = lock_query.clone(); let b_unlock_query = unlock_query.clone(); // Connection B: through proxy, attempt to acquire the same lock (will block) let b_handle = tokio::spawn(async move { let client_b = connect_with_tls(PROXY).await; - a_ready_tx.notify_one(); // This will block until A releases the lock client_b .simple_query(&b_lock_query) @@ -157,9 +152,23 @@ mod tests { .unwrap(); }); - // Wait for B to be connected and attempting the lock - a_ready.notified().await; - tokio::time::sleep(Duration::from_millis(500)).await; + // Poll pg_locks until client_b is observed waiting for the advisory lock + let poll_query = format!( + "SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND NOT granted AND classid = 0 AND objid = {ADVISORY_LOCK_ID}" + ); + let deadline = Instant::now() + Duration::from_secs(10); + loop { + let result = client_a.simple_query(&poll_query).await.unwrap(); + let has_waiting = result.iter().any(|m| matches!(m, SimpleQueryMessage::Row(_))); + if has_waiting { + break; + } + assert!( + Instant::now() < deadline, + "Timed out waiting for client_b to be blocked on advisory lock" + ); + tokio::time::sleep(Duration::from_millis(50)).await; + } // Connection C: through proxy, should complete immediately despite B being blocked let start = Instant::now(); diff --git a/packages/cipherstash-proxy/src/postgresql/handler.rs b/packages/cipherstash-proxy/src/postgresql/handler.rs index 2a3302fd..c8c0322a 100644 --- a/packages/cipherstash-proxy/src/postgresql/handler.rs +++ b/packages/cipherstash-proxy/src/postgresql/handler.rs @@ -67,7 +67,14 @@ pub async fn handler(client_stream: AsyncStream, context: Context) -> R loop { let startup_message = - 
startup::read_message(&mut client_stream, context.connection_timeout()).await?; + match startup::read_message(&mut client_stream, context.connection_timeout()).await { + Ok(msg) => msg, + Err(err @ Error::ConnectionTimeout { .. }) => { + send_timeout_error(&mut client_stream, &err).await; + return Err(err); + } + Err(err) => return Err(err), + }; match &startup_message.code { StartupCode::SSLRequest => { @@ -119,7 +126,14 @@ let connection_timeout = context.connection_timeout(); let (_code, bytes) = - protocol::read_message(&mut client_stream, client_id, connection_timeout).await?; + match protocol::read_message(&mut client_stream, client_id, connection_timeout).await { + Ok(result) => result, + Err(err @ Error::ConnectionTimeout { .. }) => { + send_timeout_error(&mut client_stream, &err).await; + return Err(err); + } + Err(err) => return Err(err), + }; let password_message = PasswordMessage::try_from(&bytes)?; @@ -353,3 +367,12 @@ Err(ProtocolError::AuthenticationFailed.into()) } } + +/// Best-effort send of a connection timeout ErrorResponse directly to a client stream. +/// Used for pre-split timeout sites where no ChannelWriter exists yet. +async fn send_timeout_error<S: AsyncWrite + Unpin>(stream: &mut S, err: &Error) { + let error_response = ErrorResponse::connection_timeout(err.to_string()); + if let Ok(bytes) = BytesMut::try_from(error_response) { + let _ = stream.write_all(&bytes).await; + } +} From fc8544d8591840795b9ddc1e63ab1e378c214af4 Mon Sep 17 00:00:00 2001 From: Toby Hede Date: Tue, 10 Mar 2026 17:36:09 +1100 Subject: [PATCH 6/7] fix(test): use proxy's configured database port in advisory lock test client_a was hardcoded to PG_PORT (5532) while the proxy may connect to a different PostgreSQL instance (e.g. postgres-tls:5617 in TLS CI).
This meant the advisory lock was held on a different server than client_b was targeting, so the lock never blocked and the test was flaky in CI. Use get_database_port() to ensure client_a connects to the same PostgreSQL instance the proxy uses. --- packages/cipherstash-proxy-integration/src/common.rs | 2 +- .../src/connection_resilience.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/cipherstash-proxy-integration/src/common.rs b/packages/cipherstash-proxy-integration/src/common.rs index 66c1edb4..6d3e7092 100644 --- a/packages/cipherstash-proxy-integration/src/common.rs +++ b/packages/cipherstash-proxy-integration/src/common.rs @@ -213,7 +213,7 @@ where } /// Get database port from environment or use default. -fn get_database_port() -> u16 { +pub fn get_database_port() -> u16 { std::env::var("CS_DATABASE__PORT") .ok() .and_then(|s| s.parse().ok()) diff --git a/packages/cipherstash-proxy-integration/src/connection_resilience.rs b/packages/cipherstash-proxy-integration/src/connection_resilience.rs index 1227d7c7..0e8e9c18 100644 --- a/packages/cipherstash-proxy-integration/src/connection_resilience.rs +++ b/packages/cipherstash-proxy-integration/src/connection_resilience.rs @@ -7,7 +7,7 @@ /// - Blocked backend connections don't affect other proxy connections #[cfg(test)] mod tests { - use crate::common::{connect_with_tls, PG_PORT, PROXY}; + use crate::common::{connect_with_tls, get_database_port, PROXY}; use std::time::Instant; use tokio::task::JoinSet; use tokio::time::{timeout, Duration}; @@ -128,7 +128,8 @@ mod tests { let result = timeout(Duration::from_secs(30), async { // Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference) - let client_a = connect_with_tls(PG_PORT).await; + let pg_port = get_database_port(); + let client_a = connect_with_tls(pg_port).await; client_a .simple_query(&lock_query) .await From 7e8331d967e375f3fd2d3f9abbe0fd150ac9f72b Mon Sep 17 00:00:00 2001 From: Toby Hede 
Date: Thu, 12 Mar 2026 10:08:30 +1100 Subject: [PATCH 7/7] docs: add macOS file descriptor limit note to prerequisites --- CLAUDE.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 58ead9cd..309cec56 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -56,6 +56,12 @@ mise run postgres:up --extra-args "--detach --wait" mise run postgres:setup # Install EQL and schema ``` +> **macOS Note:** If you hit file descriptor limits during development (e.g. "Too many open files"), you may need to increase the limit: +> ```bash +> ulimit -n 10240 +> ``` +> To make this persistent, add it to your shell profile (e.g. `~/.zshrc`). + ### Core Development Workflow ```bash # Build and run Proxy as a process (development)