
feat(gateway): ninelives retry for hub observer #264

Open

flyingrobots wants to merge 1 commit into main from ninelives

Conversation

@flyingrobots
Owner

Summary

  • Replaces hand-rolled exponential backoff in hub observer reconnect loop with ninelives::RetryPolicy
  • Adds full jitter to prevent synchronized retry storms across gateway instances
  • 10-attempt bursts with 10s cooldown between bursts (vs. infinite immediate retries before)
  • Extracts hub_observer_try_connect — cleanly separates connection logic from retry orchestration

Determinism

Verified safe: gateway is pure transport (no warp-core dependency, no simulation feedback). Jitter randomizes when reconnects happen, not what data flows. Protocol integrity (canonical CBOR + BLAKE3 checksums) is enforced by echo-session-proto, untouched by this change.

Test plan

  • cargo check -p echo-session-ws-gateway
  • cargo test -p echo-session-ws-gateway — 12/12 pass
  • Pre-push hook passes (fmt, clippy, tests, rustdoc, patterns, determinism)

Replaces the hand-rolled exponential backoff in the hub observer
reconnect loop with a ninelives RetryPolicy (250ms→3s, full jitter,
10-attempt bursts with 10s cooldown). Extracts connection setup into
hub_observer_try_connect, cleanly separating retry orchestration from
connection logic.
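
As a condensed sketch (pieced together from the diff hunks quoted in the bot reviews below; logging, metrics, and the surrounding reconnect loop are omitted), the new path looks roughly like this:

// Burst policy: 10 attempts, 250ms -> 3s exponential backoff, full jitter.
let retry_policy = RetryPolicy::<HubConnectError>::builder()
    .max_attempts(10)
    .backoff(
        ExponentialBackoff::new(Duration::from_millis(250))
            .with_max(Duration::from_secs(3))
            .expect("valid backoff bounds"),
    )
    .with_jitter(Jitter::full())
    .build()
    .expect("valid retry config");

// One burst: each attempt calls the extracted connection helper.
let connect_result = retry_policy
    .execute(|| {
        let state = state.clone();
        let warps = warps.clone();
        async move {
            hub_observer_try_connect(&state, &warps)
                .await
                .map_err(ResilienceError::Inner)
        }
    })
    .await;

// On burst exhaustion the outer loop sleeps 10s before starting the next burst.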
@coderabbitai
Contributor

coderabbitai bot commented Feb 7, 2026

Summary by CodeRabbit

  • Bug Fixes

    • Improved gateway connection resilience with enhanced retry logic featuring exponential backoff (250ms–3s) and full jitter
    • Prevents retry storms across gateway instances with configurable burst limits and cooldown periods
  • Documentation

    • Updated gateway documentation with resilience improvements

Walkthrough

Introduces ninelives-based resilience policy for hub observer gateway reconnection with exponential backoff (250ms–3s), 10 burst attempts, 10s cooldown, and full jitter. Refactors connection logic into hub_observer_try_connect helper. Replaces ad-hoc retry loop with structured retry policy framework.

Changes

Documentation & Dependency (CHANGELOG.md, README.md, Cargo.toml): Documents the new gateway resilience feature, adds the ninelives 0.2 dependency, and notes the hub observer's reconnect-with-retry-and-jitter capability.

Core Resilience Implementation (crates/echo-session-ws-gateway/src/main.rs): Introduces the HubConnectError type with Display/Error impls. Adds the hub_observer_try_connect() helper for isolated connection attempts. Replaces the manual retry loop with a RetryPolicy: 10 attempts per burst, exponential backoff (250ms–3s, full jitter), 10s cooldown after exhaustion. Integrates metrics and centralized logging. Phase 2 now records the connected timestamp and state updates.
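
For reference, the error type mentioned above wraps a plain message string; a minimal sketch, assuming the derives and Display wording (the PR's actual impls may differ):

// Sketch only: HubConnectError is shown in the PR as a String newtype.
#[derive(Debug)]
struct HubConnectError(String);

impl std::fmt::Display for HubConnectError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "hub connect error: {}", self.0)
    }
}

impl std::error::Error for HubConnectError {}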

Sequence Diagram

sequenceDiagram
    actor Gateway as Hub Observer Gateway
    participant RetryPolicy as Retry Policy<br/>(ninelives)
    participant Connector as hub_observer_try_connect()
    participant Hub as Hub Socket
    participant Metrics as Metrics/Logging

    Gateway->>RetryPolicy: Start retry burst (max 10 attempts)
    
    loop Per attempt with exponential backoff (250ms–3s, full jitter)
        RetryPolicy->>Connector: Execute connection attempt
        Connector->>Hub: Timeout-guarded Unix socket connect + handshake
        alt Success
            Hub-->>Connector: Connected
            Connector->>Metrics: Increment connect-attempt counter
            Connector-->>RetryPolicy: ✓ Connected
            RetryPolicy->>Gateway: Return success, record timestamp
        else Failure
            Hub-->>Connector: Connection failed
            Connector->>Metrics: Increment connect-attempt counter, log error
            Connector-->>RetryPolicy: ✗ Error
            RetryPolicy->>RetryPolicy: Apply backoff + jitter delay
        end
    end

    alt Burst exhausted
        RetryPolicy->>Metrics: Log burst exhaustion
        RetryPolicy->>Gateway: Wait 10s cooldown
        Gateway->>RetryPolicy: Restart new burst
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

Nine lives granted to the gateway's plight, 🐱
With jittered backoff, retries burn bright,
Bursts of ten, then cooldown's rest,
Hub resilience put to the test! ✨
No retry storms shall breach the gate—
Reconnection comes, no matter the fate.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Title check: ✅ Passed. The title accurately summarizes the main change: introducing a ninelives-based retry policy for the hub observer in the gateway component.
Description check: ✅ Passed. The description comprehensively details the changes, rationale, and test coverage; it is clearly related to the changeset, with specific technical justification.
Docstring coverage: ✅ Passed. Docstring coverage is 100.00%, above the required 80.00% threshold.



@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c138d525fe


Comment on lines 738 to 741
if let Err(err) = read_result {
    warn!(?err, "hub observer: read loop exited with error");
}

time::sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(3));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Reintroduce cooldown after observer read-loop exits

When the read loop exits (either Ok(()) on EOF or an error), the code immediately re-enters the outer loop with no delay. If the hub accepts a connection but then promptly closes it (e.g., maintenance windows, load-shedding, or crash loops that still accept TCP/UDS), this results in a tight reconnect loop that bypasses the intended backoff/jitter and can hammer the hub/CPU. Consider applying a short delay or reusing the retry policy for post-disconnect reconnects so quick disconnects still get backoff.
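
A minimal sketch of that suggestion, extending the snippet above (the one-second value is illustrative, not from the PR):

if let Err(err) = read_result {
    warn!(?err, "hub observer: read loop exited with error");
}

// Post-disconnect cooldown (illustrative duration): without a delay here, a hub
// that accepts a connection and then immediately closes it re-enters the
// connect burst at full speed, bypassing the intended backoff and jitter.
time::sleep(Duration::from_secs(1)).await;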


Contributor

@coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/echo-session-ws-gateway/src/main.rs (1)

544-546: 🛠️ Refactor suggestion | 🟠 Major

Fire-and-forget spawn with no panic/error surface for the observer task.

The JoinHandle from tokio::spawn is silently discarded. If run_hub_observer panics (e.g., from the new .expect() calls on the retry policy, or any other bug), the gateway continues serving with a dead observer and zero indication in logs or metrics. This is a pre-existing pattern, but the new code adds panic paths that didn't exist before.

Consider at least logging if the task exits:

🛡️ Suggested: surface observer task exits
     if !args.no_observer {
         let observer_state = state.clone();
         let warps = args.observe_warp.clone();
-        tokio::spawn(async move {
+        let handle = tokio::spawn(async move {
             run_hub_observer(observer_state, warps).await;
         });
+        tokio::spawn(async move {
+            if let Err(err) = handle.await {
+                error!(?err, "hub observer task terminated unexpectedly");
+            }
+        });
     }

crates/echo-session-ws-gateway/Cargo.toml

echo-session-proto = { version = "0.1.0", path = "../echo-session-proto" }
futures-util = "0.3"
serde = { version = "1.0", features = ["derive"] }
ninelives = "0.2"
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n crates/echo-session-ws-gateway/Cargo.toml

Repository: flyingrobots/echo

Length of output: 1327


🌐 Web query:

ninelives crate rust crates.io 0.2

💡 Result:

ninelives (Rust) v0.2.0 is a resilience-patterns crate that provides Tower-native layers/policies for timeouts, retries (with backoff), circuit breakers, and bulkheads, with an algebraic composition system (operators like +, |, &) for combining policies. [1]

Add it to Cargo.toml:

[dependencies]
ninelives = "0.2"

[2]

License: Apache-2.0. [1]
Repository: flyingrobots/ninelives on GitHub. [2]
Notable dependencies: tokio, tower, futures, tracing, rand, async-trait. [1]

Sources: [1] https://docs.rs/ninelives/latest/ninelives/ [2] https://github.com/flyingrobots/ninelives


Dependencies must maintain strict alphabetical order—ninelives is currently misplaced.

Line 23 places ninelives after serde (line 22), violating alphabetical ordering (n < s). This inconsistency degrades manifest readability and introduces cognitive friction when scanning dependencies. Move ninelives between futures-util and serde immediately.

🔧 Correct ordering
 futures-util = "0.3"
+ninelives = "0.2"
 serde = { version = "1.0", features = ["derive"] }
-ninelives = "0.2"
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "time"] }

Verification confirms ninelives v0.2.0 is a valid, published crate (Apache-2.0 licensed, Tower-native resilience library). The version and crate name are correct.

🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/Cargo.toml` at line 23, Move the dependency
entry for ninelives = "0.2" so the dependencies block is strictly alphabetical:
place ninelives between futures-util and serde; update the Cargo.toml
dependencies order so that entries read ... futures-util, ninelives, serde ...
to restore alphabetical ordering and manifest readability.

Comment on lines +621 to +668
async fn hub_observer_try_connect(
    state: &Arc<AppState>,
    warps: &[u64],
) -> Result<
    (
        tokio::io::ReadHalf<UnixStream>,
        tokio::io::WriteHalf<UnixStream>,
    ),
    HubConnectError,
> {
    {
        let mut metrics = state.metrics.lock().await;
        metrics.observe_hub_observer_connect_attempt();
    }

    let socket_path = &state.unix_socket;

    let stream = time::timeout(Duration::from_secs(2), UnixStream::connect(socket_path))
        .await
        .map_err(|_| {
            warn!(path = %socket_path.display(), "hub observer: connect timeout");
            HubConnectError("connect timeout".into())
        })?
        .map_err(|err| {
            warn!(?err, path = %socket_path.display(), "hub observer: connect failed");
            HubConnectError(format!("connect: {err}"))
        })?;

    let (reader, mut writer) = tokio::io::split(stream);

    hub_observer_send_handshake(&mut writer)
        .await
        .map_err(|err| {
            warn!(?err, "hub observer: handshake failed");
            HubConnectError(format!("handshake: {err}"))
        })?;

    for warp_id in warps {
        hub_observer_send_subscribe(&mut writer, *warp_id)
            .await
            .map_err(|err| {
                warn!(?err, warp_id, "hub observer: subscribe failed");
                HubConnectError(format!("subscribe warp {warp_id}: {err}"))
            })?;
    }

    Ok((reader, writer))
}
Contributor


⚠️ Potential issue | 🟠 Major

No timeout guard on handshake/subscribe writes — a stuck peer hangs the attempt forever.

The UnixStream::connect is wrapped in a 2s timeout (line 638), but hub_observer_send_handshake (line 651) and hub_observer_send_subscribe (line 659) are bare write_all calls with no timeout. If the hub accepts the Unix-socket connection but stalls (e.g., kernel buffer full, hung process), this attempt never returns, and the retry policy never gets a chance to retry or back off — effectively an infinite hang inside the retry closure.

Wrap the entire attempt body in a single timeout, or at minimum wrap the write calls:

🛡️ Suggested: wrap the full attempt in a single deadline
 async fn hub_observer_try_connect(
     state: &Arc<AppState>,
     warps: &[u64],
 ) -> Result<
     (
         tokio::io::ReadHalf<UnixStream>,
         tokio::io::WriteHalf<UnixStream>,
     ),
     HubConnectError,
 > {
     {
         let mut metrics = state.metrics.lock().await;
         metrics.observe_hub_observer_connect_attempt();
     }
 
     let socket_path = &state.unix_socket;
 
-    let stream = time::timeout(Duration::from_secs(2), UnixStream::connect(socket_path))
+    let stream = time::timeout(Duration::from_secs(5), async {
+        let stream = UnixStream::connect(socket_path)
+            .await
+            .map_err(|err| {
+                warn!(?err, path = %socket_path.display(), "hub observer: connect failed");
+                HubConnectError(format!("connect: {err}"))
+            })?;
+
+        let (reader, mut writer) = tokio::io::split(stream);
+
+        hub_observer_send_handshake(&mut writer)
+            .await
+            .map_err(|err| {
+                warn!(?err, "hub observer: handshake failed");
+                HubConnectError(format!("handshake: {err}"))
+            })?;
+
+        for warp_id in warps {
+            hub_observer_send_subscribe(&mut writer, *warp_id)
+                .await
+                .map_err(|err| {
+                    warn!(?err, warp_id, "hub observer: subscribe failed");
+                    HubConnectError(format!("subscribe warp {warp_id}: {err}"))
+                })?;
+        }
+
+        Ok::<_, HubConnectError>((reader, writer))
+    })
         .await
         .map_err(|_| {
-            warn!(path = %socket_path.display(), "hub observer: connect timeout");
-            HubConnectError("connect timeout".into())
-        })?
-        .map_err(|err| {
-            warn!(?err, path = %socket_path.display(), "hub observer: connect failed");
-            HubConnectError(format!("connect: {err}"))
+            warn!(path = %socket_path.display(), "hub observer: connect+handshake timeout");
+            HubConnectError("connect+handshake timeout".into())
         })?;
-
-    let (reader, mut writer) = tokio::io::split(stream);
-
-    hub_observer_send_handshake(&mut writer)
-        .await
-        .map_err(|err| {
-            warn!(?err, "hub observer: handshake failed");
-            HubConnectError(format!("handshake: {err}"))
-        })?;
-
-    for warp_id in warps {
-        hub_observer_send_subscribe(&mut writer, *warp_id)
-            .await
-            .map_err(|err| {
-                warn!(?err, warp_id, "hub observer: subscribe failed");
-                HubConnectError(format!("subscribe warp {warp_id}: {err}"))
-            })?;
-    }
-
-    Ok((reader, writer))
+
+    stream
 }
🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 621 - 668, The
connect timeout currently only covers UnixStream::connect; to prevent a stalled
peer from hanging the retry, wrap the post-connect operations in a timeout (or
individually wrap hub_observer_send_handshake and each
hub_observer_send_subscribe) using time::timeout with an appropriate Duration
(e.g., 2s), converting timeout errors into HubConnectError and logging via warn!
like the connect branch; update hub_observer_try_connect so the call to
hub_observer_send_handshake and the loop over hub_observer_send_subscribe are
executed inside the timeout (or each call is wrapped) and their timeout/map_err
handling mirrors the existing error->HubConnectError mapping so the function
returns promptly on stalled writes.

Comment on lines +630 to +634
> {
    {
        let mut metrics = state.metrics.lock().await;
        metrics.observe_hub_observer_connect_attempt();
    }
Contributor


⚠️ Potential issue | 🟠 Major

connect_failures semantics silently changed — now counts burst exhaustions, not individual failures.

Previously each failed connection attempt presumably incremented connect_failures. Now:

  • connect_attempts increments per-attempt (line 633, inside hub_observer_try_connect) → 10 per burst.
  • connect_failures increments once per exhausted burst (line 708, via observe_hub_observer_error) → 1 per burst.

During a prolonged outage, a dashboard consumer will see connect_attempts: 100, connect_failures: 10 instead of the old 100:100. If any monitoring or alerting is keyed on connect_failures, this 10× drop will mask outage severity.

Either:

  1. Increment connect_failures per attempt inside hub_observer_try_connect (preserving old semantics), or
  2. Document the new semantics explicitly in the metrics response / dashboard and add a separate burst_exhaustions counter.

Also applies to: 702-712
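
A sketch of option 1, assuming observe_hub_observer_error takes no arguments and wrapping the existing helper in a hypothetical hub_observer_try_connect_counted (names and shape are illustrative, not the file's actual layout):

// Count every failed attempt, preserving the pre-PR connect_failures semantics.
// hub_observer_try_connect and the metrics methods are from this PR; the
// wrapper function itself is hypothetical.
async fn hub_observer_try_connect_counted(
    state: &Arc<AppState>,
    warps: &[u64],
) -> Result<
    (
        tokio::io::ReadHalf<UnixStream>,
        tokio::io::WriteHalf<UnixStream>,
    ),
    HubConnectError,
> {
    let result = hub_observer_try_connect(state, warps).await;
    if result.is_err() {
        // One connect_failures increment per failed attempt, as before the PR.
        let mut metrics = state.metrics.lock().await;
        metrics.observe_hub_observer_error();
    }
    result
}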

🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 630 - 634, The
change altered connect_failures semantics so it now counts burst exhaustions
instead of per-attempt failures; update the code in hub_observer_try_connect to
restore the original per-attempt failure counting by calling the
metrics.increment/connect_failures (i.e. invoke the same metric update used by
observe_hub_observer_error) for each failed connection attempt where
metrics.observe_hub_observer_connect_attempt is already called, or alternatively
add a new metric named burst_exhaustions and keep connect_failures incremented
per attempt while documenting the new field in the metrics response/dashboard;
reference hub_observer_try_connect, observe_hub_observer_error,
metrics.observe_hub_observer_connect_attempt, and
connect_failures/burst_exhaustions when making the change.

Comment on lines +675 to +686
    // 10 attempts per burst × exponential backoff (250ms → 3s) with full jitter.
    // On exhaustion the outer loop applies a 10s cooldown before the next burst.
    let retry_policy = RetryPolicy::<HubConnectError>::builder()
        .max_attempts(10)
        .backoff(
            ExponentialBackoff::new(Duration::from_millis(250))
                .with_max(Duration::from_secs(3))
                .expect("valid backoff bounds"),
        )
        .with_jitter(Jitter::full())
        .build()
        .expect("valid retry config");
Contributor


⚠️ Potential issue | 🟡 Minor

.expect() inside a fire-and-forget tokio::spawn — silent death if it ever trips.

Lines 682 and 686 use .expect() on the backoff/policy builders. These run inside run_hub_observer, which is spawned on line 544 with a discarded JoinHandle. If either expect panics (e.g., a future ninelives version tightens validation), the observer task silently dies — no tracing log, no metric update, no hub_observer.connected = false. The gateway keeps running, blissfully unaware the observer is gone.

Yes, the inputs are currently compile-time constants and "should never fail." But you went to the trouble of building a proper resilience layer; don't undermine it with a panic landmine on the bootstrap path.

Options:

  1. Propagate the error and log it before returning from run_hub_observer.
  2. At minimum, .await the JoinHandle or use tokio::spawn + error logging wrapper (as you already do for other tasks with log_void_task_result).
🛡️ Option 1: Replace expects with match + early return
-        .backoff(
-            ExponentialBackoff::new(Duration::from_millis(250))
-                .with_max(Duration::from_secs(3))
-                .expect("valid backoff bounds"),
-        )
-        .with_jitter(Jitter::full())
-        .build()
-        .expect("valid retry config");
+        .backoff({
+            let backoff = match ExponentialBackoff::new(Duration::from_millis(250))
+                .with_max(Duration::from_secs(3))
+            {
+                Ok(b) => b,
+                Err(err) => {
+                    error!(?err, "hub observer: invalid backoff config — observer disabled");
+                    return;
+                }
+            };
+            backoff
+        })
+        .with_jitter(Jitter::full())
+        .build();
+
+    let retry_policy = match retry_policy {
+        Ok(p) => p,
+        Err(err) => {
+            error!(?err, "hub observer: invalid retry config — observer disabled");
+            return;
+        }
+    };
🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 675 - 686, The two
.expect() calls used when constructing the ExponentialBackoff and RetryPolicy
inside run_hub_observer must be removed so the observer task cannot panic
silently; instead, handle construction failures by returning an Err or logging
the error and early-returning from run_hub_observer (and set the
hub_observer.connected flag to false if applicable). Replace the .expect()
usages on ExponentialBackoff::new(...).with_max(...) and on
RetryPolicy::builder().build() with proper error handling (match/if let or the ?
operator on a Result), log the concrete error via tracing/logger, and return so
the caller can observe the failure; alternatively spawn the task using your
existing log_void_task_result wrapper (or await the JoinHandle) so any
panic/error is surfaced. Ensure you reference the
RetryPolicy::<HubConnectError>::builder(),
ExponentialBackoff::new(...).with_max(...), and run_hub_observer symbols when
making the change.

Comment on lines +690 to +700
    let connect_result = retry_policy
        .execute(|| {
            let state = state.clone();
            let warps = warps.clone();
            async move {
                hub_observer_try_connect(&state, &warps)
                    .await
                    .map_err(ResilienceError::Inner)
            }
        })
        .await;
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's find and examine the main.rs file in question
fd "main.rs" crates/echo-session-ws-gateway/src --type f

Repository: flyingrobots/echo

Length of output: 103


🏁 Script executed:

# Search for ResilienceError definition
rg "enum ResilienceError|struct ResilienceError" --type rust

Repository: flyingrobots/echo

Length of output: 43


🏁 Script executed:

# Search for HubConnectError definition
rg "enum HubConnectError|struct HubConnectError" --type rust

Repository: flyingrobots/echo

Length of output: 135


🏁 Script executed:

# Look for ninelives usage and retry_policy definition
rg "retry_policy|ninelives" --type rust -A 3 -B 3

Repository: flyingrobots/echo

Length of output: 2343


🏁 Script executed:

# Read the specific lines in context
sed -n '680,710p' crates/echo-session-ws-gateway/src/main.rs

Repository: flyingrobots/echo

Length of output: 1240


🏁 Script executed:

# Check Cargo.toml for ninelives version
cat crates/echo-session-ws-gateway/Cargo.toml | grep -A 5 -B 5 ninelives

Repository: flyingrobots/echo

Length of output: 529


🏁 Script executed:

# Search for any use of ResilienceError variants in the codebase
rg "ResilienceError::" --type rust

Repository: flyingrobots/echo

Length of output: 160


🌐 Web query:

ninelives 0.2 ResilienceError variants Rust crate

💡 Result:

In ninelives v0.2.0, ResilienceError<E> has these variants:

  • Timeout { elapsed: Duration, timeout: Duration }
  • Bulkhead { in_flight: usize, max: usize }
  • BulkheadClosed
  • CircuitOpen { failure_count: usize, open_duration: Duration }
  • RetryExhausted { attempts: usize, failures: Arc<Vec<E>> }
  • Inner(E) (wraps the underlying service/operation error) [1]

🌐 Web query:

ninelives Rust crate latest version ResilienceError Fatal non-retriable

💡 Result:

  • Latest ninelives crate version: 0.2.0 (docs.rs “latest” currently points to ninelives-0.2.0). [1]

  • ResilienceError and “Fatal / non-retriable”: ninelives::ResilienceError<E> does not have a Fatal variant. It’s a unified wrapper with variants like Timeout, CircuitOpen, RetryExhausted, and Inner(E) for the underlying application error. [2]

  • How to make an error non-retriable (“fatal”): classify it in your retry policy via RetryPolicyBuilder::should_retry, which is “Predicate to decide if an Inner error is retryable.” Return false for fatal/non-retriable errors so retries stop and the original error is surfaced (as ResilienceError::Inner(your_error)). [3][2]

Example (pattern):

use ninelives::RetryPolicyBuilder;

#[derive(Debug)]
enum AppError { Temporary, Fatal }

let policy = RetryPolicyBuilder::<AppError>::new()
  .should_retry(|e| matches!(e, AppError::Temporary))
  .build()?;

Sources: [1] [2] [3]


🏁 Script executed:

# Find hub_observer_try_connect definition
rg "fn hub_observer_try_connect" --type rust -A 30

Repository: flyingrobots/echo

Length of output: 2335


🏁 Script executed:

# Search for all error handling in hub_observer_try_connect
rg "Err\(|map_err" crates/echo-session-ws-gateway/src/main.rs | head -20

Repository: flyingrobots/echo

Length of output: 832


🏁 Script executed:

# Get more lines of hub_observer_try_connect to see handshake and subscribe error paths
sed -n '600,650p' crates/echo-session-ws-gateway/src/main.rs

Repository: flyingrobots/echo

Length of output: 1714


🏁 Script executed:

# Search for hub_observer_send_handshake and subscribe functions
rg "fn hub_observer_send_handshake|fn hub_observer_subscribe" --type rust -A 20

Repository: flyingrobots/echo

Length of output: 1746


Call out the retry predicate mechanism — ResilienceError::Fatal doesn't exist in ninelives 0.2; use .should_retry() instead.

Line 697 wraps every HubConnectError as ResilienceError::Inner, treating all errors as retriable. You're right that handshake and subscribe failures are permanent (protocol mismatch, invalid warp ID) and will exhaust all 10 attempts pointlessly.

However, ninelives 0.2 has no ResilienceError::Fatal variant. The solution is to add a .should_retry() predicate on the RetryPolicyBuilder:

.should_retry(|err: &HubConnectError| {
    // Return false for permanent errors to fail fast
    // Return true for transient errors (timeout, connection refused)
})

Problem: HubConnectError(String) is untyped. You'd need to either:

  1. Parse the error message (brittle), or
  2. Refactor HubConnectError to an enum with variants distinguishing network failures (Timeout, ConnectionRefused) from protocol failures (HandshakeFailed, SubscribeFailed)

The second approach lets you return false from should_retry() for handshake/subscribe errors, failing fast instead of burning retries.
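
A sketch of the second approach, assuming should_retry is available on the builder returned by RetryPolicy::<E>::builder(), as the RetryPolicyBuilder docs quoted above describe (variant names are illustrative):

// Typed connect error so the retry predicate can distinguish transient from
// permanent failures. Variant names are illustrative.
#[derive(Debug)]
enum HubConnectError {
    ConnectTimeout,                              // transient
    Connect(std::io::Error),                     // transient (e.g. ECONNREFUSED)
    Handshake(String),                           // permanent protocol error
    Subscribe { warp_id: u64, reason: String },  // permanent protocol error
}

// In run_hub_observer: fail fast on protocol errors, retry network ones.
// Backoff/jitter configuration as already shown in this PR, omitted here.
let retry_policy = RetryPolicy::<HubConnectError>::builder()
    .max_attempts(10)
    .should_retry(|err| {
        matches!(
            err,
            HubConnectError::ConnectTimeout | HubConnectError::Connect(_)
        )
    })
    .build()
    .expect("valid retry config");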

🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 690 - 700, The retry
logic currently treats every HubConnectError as retriable by wrapping errors as
ResilienceError::Inner; update the RetryPolicyBuilder (the builder that produces
retry_policy used around hub_observer_try_connect) to include a
.should_retry(|err: &HubConnectError| -> bool) predicate that returns false for
permanent protocol errors and true for transient network errors, and change
HubConnectError from an opaque String to a typed enum (e.g., Timeout,
ConnectionRefused, HandshakeFailed, SubscribeFailed) so the predicate can make a
deterministic decision; locate hub_observer_try_connect and the
RetryPolicyBuilder instantiation, adjust hub_observer_try_connect to return the
new HubConnectError enum variants, and ensure the .should_retry closure inspects
those variants to fail fast on HandshakeFailed/SubscribeFailed while retrying on
Timeout/ConnectionRefused.

Comment on line 702

        })
        .await;

    let (mut reader, _writer) = match connect_result {
Contributor


🧹 Nitpick | 🔵 Trivial

_writer binding is intentionally kept alive — subtle but correct.

_writer (not bare _) keeps the WriteHalf alive through the read loop. With tokio::io::split's Arc-based internals, dropping it early via _ would technically be safe, but the explicit binding is a good defensive choice. A comment noting why the writer half is retained (even unused) would prevent a well-meaning future refactor from changing _writer to _.
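
One possible wording for that comment (illustrative):

// Keep the write half bound as _writer (not `_`) so it stays alive for the
// whole read loop; binding it to `_` would drop it immediately.
let (mut reader, _writer) = match connect_result {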

🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` at line 702, The tuple binding
let (mut reader, _writer) = match connect_result { ... } intentionally retains
the WriteHalf as _writer so it stays alive for the duration of the read loop;
add a short inline comment next to that binding referencing reader and _writer
(or above the match) explaining that _writer is deliberately kept (not named
`_`) to keep the write half alive (due to tokio::io::split/Arc-based internals)
and to prevent accidental refactoring that would drop it early.
