feat(gateway): ninelives retry for hub observer #264
flyingrobots wants to merge 1 commit into main from
Conversation
Replaces the hand-rolled exponential backoff in the hub observer reconnect loop with a ninelives RetryPolicy (250ms→3s, full jitter, 10-attempt bursts with 10s cooldown). Extracts connection setup into hub_observer_try_connect, cleanly separating retry orchestration from connection logic.
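For orientation, a rough sketch of that burst-plus-cooldown shape, assembled from the excerpts quoted later in this review; the names and structure follow the PR, but the details here are illustrative, not the exact code:

```rust
// Outer reconnect loop: each retry_policy.execute() call is one burst of up to
// 10 attempts (250ms→3s backoff, full jitter); exhaustion triggers a 10s cooldown.
loop {
    let connect_result = retry_policy
        .execute(|| {
            let state = state.clone();
            let warps = warps.clone();
            async move {
                hub_observer_try_connect(&state, &warps)
                    .await
                    .map_err(ResilienceError::Inner)
            }
        })
        .await;

    match connect_result {
        Ok((mut reader, _writer)) => {
            // Drive `reader` in the read loop until the hub disconnects or errors.
        }
        Err(err) => {
            // Burst exhausted: log and cool down before starting the next burst.
            warn!(?err, "hub observer: retry burst exhausted");
            time::sleep(Duration::from_secs(10)).await;
        }
    }
}
```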
Summary by CodeRabbit
Walkthrough
Introduces a ninelives-based resilience policy for hub observer gateway reconnection with exponential backoff (250ms–3s), 10 burst attempts, 10s cooldown, and full jitter. Refactors connection logic into hub_observer_try_connect.
Changes
Sequence Diagram
sequenceDiagram
actor Gateway as Hub Observer Gateway
participant RetryPolicy as Retry Policy<br/>(ninelives)
participant Connector as hub_observer_try_connect()
participant Hub as Hub Socket
participant Metrics as Metrics/Logging
Gateway->>RetryPolicy: Start retry burst (max 10 attempts)
loop Per attempt with exponential backoff (250ms–3s, full jitter)
RetryPolicy->>Connector: Execute connection attempt
Connector->>Hub: Timeout-guarded Unix socket connect + handshake
alt Success
Hub-->>Connector: Connected
Connector->>Metrics: Increment connect-attempt counter
Connector-->>RetryPolicy: ✓ Connected
RetryPolicy->>Gateway: Return success, record timestamp
else Failure
Hub-->>Connector: Connection failed
Connector->>Metrics: Increment connect-attempt counter, log error
Connector-->>RetryPolicy: ✗ Error
RetryPolicy->>RetryPolicy: Apply backoff + jitter delay
end
end
alt Burst exhausted
RetryPolicy->>Metrics: Log burst exhaustion
RetryPolicy->>Gateway: Wait 10s cooldown
Gateway->>RetryPolicy: Restart new burst
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 3 passed
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c138d525fe
```rust
if let Err(err) = read_result {
    warn!(?err, "hub observer: read loop exited with error");
}

// ...

    time::sleep(backoff).await;
    backoff = (backoff * 2).min(Duration::from_secs(3));
}
```
Reintroduce cooldown after observer read-loop exits
When the read loop exits (either Ok(()) on EOF or an error), the code immediately re-enters the outer loop with no delay. If the hub accepts a connection but then promptly closes it (e.g., maintenance windows, load-shedding, or crash loops that still accept TCP/UDS), this results in a tight reconnect loop that bypasses the intended backoff/jitter and can hammer the hub/CPU. Consider applying a short delay or reusing the retry policy for post-disconnect reconnects so quick disconnects still get backoff.
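One possible shape for that fix, sketched under assumptions: the constant name and delay value below are illustrative, not from the PR, and routing post-disconnect reconnects back through the same retry policy would work as well.

```rust
// After the read loop returns, apply a short cooldown before reconnecting so an
// accept-then-close hub doesn't produce a tight reconnect loop.
if let Err(err) = read_result {
    warn!(?err, "hub observer: read loop exited with error");
}

// Hypothetical constant; the PR would pick an appropriate value.
const POST_DISCONNECT_COOLDOWN: Duration = Duration::from_secs(1);
time::sleep(POST_DISCONNECT_COOLDOWN).await;
// ...then fall through to the outer loop, which re-enters the retry burst.
```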
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/echo-session-ws-gateway/src/main.rs (1)
544-546: 🛠️ Refactor suggestion | 🟠 Major
Fire-and-forget spawn with no panic/error surface for the observer task.
The JoinHandle from tokio::spawn is silently discarded. If run_hub_observer panics (e.g., from the new .expect() calls on the retry policy, or any other bug), the gateway continues serving with a dead observer and zero indication in logs or metrics. This is a pre-existing pattern, but the new code adds panic paths that didn't exist before. Consider at least logging if the task exits:
🛡️ Suggested: surface observer task exits
```diff
 if !args.no_observer {
     let observer_state = state.clone();
     let warps = args.observe_warp.clone();
-    tokio::spawn(async move {
+    let handle = tokio::spawn(async move {
         run_hub_observer(observer_state, warps).await;
     });
+    tokio::spawn(async move {
+        if let Err(err) = handle.await {
+            error!(?err, "hub observer task terminated unexpectedly");
+        }
+    });
 }
```
```toml
echo-session-proto = { version = "0.1.0", path = "../echo-session-proto" }
futures-util = "0.3"
serde = { version = "1.0", features = ["derive"] }
ninelives = "0.2"
```
🧩 Analysis chain
🏁 Script executed:
```sh
cat -n crates/echo-session-ws-gateway/Cargo.toml
```
🌐 Web query:
ninelives crate rust crates.io 0.2
💡 Result:
ninelives (Rust) v0.2.0 is a resilience-patterns crate that provides Tower-native layers/policies for timeouts, retries (with backoff), circuit breakers, and bulkheads, with an algebraic composition system (operators like +, |, &) for combining policies. [1]
Add it to Cargo.toml: [2]

```toml
[dependencies]
ninelives = "0.2"
```
License: Apache-2.0. [1]
Repository: flyingrobots/ninelives on GitHub. [2]
Notable dependencies: tokio, tower, futures, tracing, rand, async-trait. [1]
Sources: [1] https://docs.rs/ninelives/latest/ninelives/ [2] https://github.com/flyingrobots/ninelives
Dependencies must maintain strict alphabetical order—ninelives is currently misplaced.
Line 23 places ninelives after serde (line 22), violating alphabetical ordering (n < s). This inconsistency degrades manifest readability and introduces cognitive friction when scanning dependencies. Move ninelives between futures-util and serde immediately.
🔧 Correct ordering

```diff
 futures-util = "0.3"
+ninelives = "0.2"
 serde = { version = "1.0", features = ["derive"] }
-ninelives = "0.2"
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "time"] }
```

Verification confirms ninelives v0.2.0 is a valid, published crate (Apache-2.0 licensed, Tower-native resilience library). The version and crate name are correct.
🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/Cargo.toml` at line 23, Move the dependency
entry for ninelives = "0.2" so the dependencies block is strictly alphabetical:
place ninelives between futures-util and serde; update the Cargo.toml
dependencies order so that entries read ... futures-util, ninelives, serde ...
to restore alphabetical ordering and manifest readability.
```rust
async fn hub_observer_try_connect(
    state: &Arc<AppState>,
    warps: &[u64],
) -> Result<
    (
        tokio::io::ReadHalf<UnixStream>,
        tokio::io::WriteHalf<UnixStream>,
    ),
    HubConnectError,
> {
    {
        let mut metrics = state.metrics.lock().await;
        metrics.observe_hub_observer_connect_attempt();
    }

    let socket_path = &state.unix_socket;

    let stream = time::timeout(Duration::from_secs(2), UnixStream::connect(socket_path))
        .await
        .map_err(|_| {
            warn!(path = %socket_path.display(), "hub observer: connect timeout");
            HubConnectError("connect timeout".into())
        })?
        .map_err(|err| {
            warn!(?err, path = %socket_path.display(), "hub observer: connect failed");
            HubConnectError(format!("connect: {err}"))
        })?;

    let (reader, mut writer) = tokio::io::split(stream);

    hub_observer_send_handshake(&mut writer)
        .await
        .map_err(|err| {
            warn!(?err, "hub observer: handshake failed");
            HubConnectError(format!("handshake: {err}"))
        })?;

    for warp_id in warps {
        hub_observer_send_subscribe(&mut writer, *warp_id)
            .await
            .map_err(|err| {
                warn!(?err, warp_id, "hub observer: subscribe failed");
                HubConnectError(format!("subscribe warp {warp_id}: {err}"))
            })?;
    }

    Ok((reader, writer))
}
```
No timeout guard on handshake/subscribe writes: a stuck peer hangs the attempt forever.
The UnixStream::connect is wrapped in a 2 s timeout (line 638), but hub_observer_send_handshake (line 651) and hub_observer_send_subscribe (line 659) are bare write_all calls with no timeout. If the hub accepts the connection but stalls (e.g., kernel buffer full, hung process), this attempt never returns and the retry policy never gets a chance to retry or back off, effectively an infinite hang inside the retry closure.
Wrap the entire attempt body in a single timeout, or at minimum wrap the write calls:
🛡️ Suggested: wrap the full attempt in a single deadline

```diff
 async fn hub_observer_try_connect(
     state: &Arc<AppState>,
     warps: &[u64],
 ) -> Result<
     (
         tokio::io::ReadHalf<UnixStream>,
         tokio::io::WriteHalf<UnixStream>,
     ),
     HubConnectError,
 > {
     {
         let mut metrics = state.metrics.lock().await;
         metrics.observe_hub_observer_connect_attempt();
     }

     let socket_path = &state.unix_socket;

-    let stream = time::timeout(Duration::from_secs(2), UnixStream::connect(socket_path))
+    let stream = time::timeout(Duration::from_secs(5), async {
+        let stream = UnixStream::connect(socket_path)
+            .await
+            .map_err(|err| {
+                warn!(?err, path = %socket_path.display(), "hub observer: connect failed");
+                HubConnectError(format!("connect: {err}"))
+            })?;
+
+        let (reader, mut writer) = tokio::io::split(stream);
+
+        hub_observer_send_handshake(&mut writer)
+            .await
+            .map_err(|err| {
+                warn!(?err, "hub observer: handshake failed");
+                HubConnectError(format!("handshake: {err}"))
+            })?;
+
+        for warp_id in warps {
+            hub_observer_send_subscribe(&mut writer, *warp_id)
+                .await
+                .map_err(|err| {
+                    warn!(?err, warp_id, "hub observer: subscribe failed");
+                    HubConnectError(format!("subscribe warp {warp_id}: {err}"))
+                })?;
+        }
+
+        Ok::<_, HubConnectError>((reader, writer))
+    })
         .await
         .map_err(|_| {
-            warn!(path = %socket_path.display(), "hub observer: connect timeout");
-            HubConnectError("connect timeout".into())
-        })?
-        .map_err(|err| {
-            warn!(?err, path = %socket_path.display(), "hub observer: connect failed");
-            HubConnectError(format!("connect: {err}"))
+            warn!(path = %socket_path.display(), "hub observer: connect+handshake timeout");
+            HubConnectError("connect+handshake timeout".into())
         })?;
-
-    let (reader, mut writer) = tokio::io::split(stream);
-
-    hub_observer_send_handshake(&mut writer)
-        .await
-        .map_err(|err| {
-            warn!(?err, "hub observer: handshake failed");
-            HubConnectError(format!("handshake: {err}"))
-        })?;
-
-    for warp_id in warps {
-        hub_observer_send_subscribe(&mut writer, *warp_id)
-            .await
-            .map_err(|err| {
-                warn!(?err, warp_id, "hub observer: subscribe failed");
-                HubConnectError(format!("subscribe warp {warp_id}: {err}"))
-            })?;
-    }
-
-    Ok((reader, writer))
+
+    stream
 }
```

🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 621 - 668, The
connect timeout currently only covers UnixStream::connect; to prevent a stalled
peer from hanging the retry, wrap the post-connect operations in a timeout (or
individually wrap hub_observer_send_handshake and each
hub_observer_send_subscribe) using time::timeout with an appropriate Duration
(e.g., 2s), converting timeout errors into HubConnectError and logging via warn!
like the connect branch; update hub_observer_try_connect so the call to
hub_observer_send_handshake and the loop over hub_observer_send_subscribe are
executed inside the timeout (or each call is wrapped) and their timeout/map_err
handling mirrors the existing error->HubConnectError mapping so the function
returns promptly on stalled writes.
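For the "wrap each write" alternative mentioned in this comment, a minimal sketch; the 2 s deadlines mirror the existing connect guard and are an assumption, not the PR's actual code:

```rust
// Guard the handshake and each subscribe with their own deadline so a stalled
// peer cannot hang the connection attempt indefinitely.
time::timeout(Duration::from_secs(2), hub_observer_send_handshake(&mut writer))
    .await
    .map_err(|_| {
        warn!("hub observer: handshake timeout");
        HubConnectError("handshake timeout".into())
    })?
    .map_err(|err| {
        warn!(?err, "hub observer: handshake failed");
        HubConnectError(format!("handshake: {err}"))
    })?;

for warp_id in warps {
    time::timeout(
        Duration::from_secs(2),
        hub_observer_send_subscribe(&mut writer, *warp_id),
    )
    .await
    .map_err(|_| {
        warn!(warp_id, "hub observer: subscribe timeout");
        HubConnectError(format!("subscribe warp {warp_id}: timeout"))
    })?
    .map_err(|err| {
        warn!(?err, warp_id, "hub observer: subscribe failed");
        HubConnectError(format!("subscribe warp {warp_id}: {err}"))
    })?;
}
```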
```rust
> {
    {
        let mut metrics = state.metrics.lock().await;
        metrics.observe_hub_observer_connect_attempt();
    }
```
connect_failures semantics silently changed: it now counts burst exhaustions, not individual failures.
Previously each failed connection attempt presumably incremented connect_failures. Now:
- connect_attempts increments per attempt (line 633, inside hub_observer_try_connect) → 10 per burst.
- connect_failures increments once per exhausted burst (line 708, via observe_hub_observer_error) → 1 per burst.
During a prolonged outage, a dashboard consumer will see connect_attempts: 100, connect_failures: 10 instead of the old 100:100. If any monitoring or alerting is keyed on connect_failures, this 10× drop will mask outage severity.
Either:
- Increment connect_failures per attempt inside hub_observer_try_connect (preserving the old semantics), or
- Document the new semantics explicitly in the metrics response/dashboard and add a separate burst_exhaustions counter.
Also applies to: 702-712
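A minimal sketch of the first option, counting each failed attempt via a wrapper; observe_hub_observer_connect_failure is a hypothetical name, and the real code could instead reuse whichever method increments connect_failures today:

```rust
// Illustrative: bump the per-attempt failure metric whenever a single
// connection attempt fails, restoring the old connect_failures semantics.
async fn hub_observer_try_connect_counted(
    state: &Arc<AppState>,
    warps: &[u64],
) -> Result<
    (
        tokio::io::ReadHalf<UnixStream>,
        tokio::io::WriteHalf<UnixStream>,
    ),
    HubConnectError,
> {
    let result = hub_observer_try_connect(state, warps).await;
    if result.is_err() {
        let mut metrics = state.metrics.lock().await;
        metrics.observe_hub_observer_connect_failure(); // hypothetical metrics hook
    }
    result
}
```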
🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 630 - 634, The
change altered connect_failures semantics so it now counts burst exhaustions
instead of per-attempt failures; update the code in hub_observer_try_connect to
restore the original per-attempt failure counting by calling the
metrics.increment/connect_failures (i.e. invoke the same metric update used by
observe_hub_observer_error) for each failed connection attempt where
metrics.observe_hub_observer_connect_attempt is already called, or alternatively
add a new metric named burst_exhaustions and keep connect_failures incremented
per attempt while documenting the new field in the metrics response/dashboard;
reference hub_observer_try_connect, observe_hub_observer_error,
metrics.observe_hub_observer_connect_attempt, and
connect_failures/burst_exhaustions when making the change.
```rust
// 10 attempts per burst × exponential backoff (250ms → 3s) with full jitter.
// On exhaustion the outer loop applies a 10s cooldown before the next burst.
let retry_policy = RetryPolicy::<HubConnectError>::builder()
    .max_attempts(10)
    .backoff(
        ExponentialBackoff::new(Duration::from_millis(250))
            .with_max(Duration::from_secs(3))
            .expect("valid backoff bounds"),
    )
    .with_jitter(Jitter::full())
    .build()
    .expect("valid retry config");
```
.expect() inside a fire-and-forget tokio::spawn — silent death if it ever trips.
Lines 682 and 686 use .expect() on the backoff/policy builders. These run inside run_hub_observer, which is spawned on line 544 with a discarded JoinHandle. If either expect panics (e.g., a future ninelives version tightens validation), the observer task silently dies — no tracing log, no metric update, no hub_observer.connected = false. The gateway keeps running, blissfully unaware the observer is gone.
Yes, the inputs are currently compile-time constants and "should never fail." But you went to the trouble of building a proper resilience layer; don't undermine it with a panic landmine on the bootstrap path.
Options:
- Propagate the error and log it before returning from run_hub_observer.
- At minimum, .await the JoinHandle or use a tokio::spawn error-logging wrapper (as you already do for other tasks with log_void_task_result).
🛡️ Option 1: Replace expects with match + early return

```diff
-        .backoff(
-            ExponentialBackoff::new(Duration::from_millis(250))
-                .with_max(Duration::from_secs(3))
-                .expect("valid backoff bounds"),
-        )
-        .with_jitter(Jitter::full())
-        .build()
-        .expect("valid retry config");
+        .backoff({
+            let backoff = match ExponentialBackoff::new(Duration::from_millis(250))
+                .with_max(Duration::from_secs(3))
+            {
+                Ok(b) => b,
+                Err(err) => {
+                    error!(?err, "hub observer: invalid backoff config — observer disabled");
+                    return;
+                }
+            };
+            backoff
+        })
+        .with_jitter(Jitter::full())
+        .build();
+
+    let retry_policy = match retry_policy {
+        Ok(p) => p,
+        Err(err) => {
+            error!(?err, "hub observer: invalid retry config — observer disabled");
+            return;
+        }
+    };
```
+ };🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 675 - 686, The two
.expect() calls used when constructing the ExponentialBackoff and RetryPolicy
inside run_hub_observer must be removed so the observer task cannot panic
silently; instead, handle construction failures by returning an Err or logging
the error and early-returning from run_hub_observer (and set the
hub_observer.connected flag to false if applicable). Replace the .expect()
usages on ExponentialBackoff::new(...).with_max(...) and on
RetryPolicy::builder().build() with proper error handling (match/if let or the ?
operator on a Result), log the concrete error via tracing/logger, and return so
the caller can observe the failure; alternatively spawn the task using your
existing log_void_task_result wrapper (or await the JoinHandle) so any
panic/error is surfaced. Ensure you reference the
RetryPolicy::<HubConnectError>::builder(),
ExponentialBackoff::new(...).with_max(...), and run_hub_observer symbols when
making the change.
```rust
let connect_result = retry_policy
    .execute(|| {
        let state = state.clone();
        let warps = warps.clone();
        async move {
            hub_observer_try_connect(&state, &warps)
                .await
                .map_err(ResilienceError::Inner)
        }
    })
    .await;
```
🧩 Analysis chain
🏁 Scripts executed:

```sh
# First, let's find and examine the main.rs file in question
fd "main.rs" crates/echo-session-ws-gateway/src --type f

# Search for ResilienceError definition
rg "enum ResilienceError|struct ResilienceError" --type rust

# Search for HubConnectError definition
rg "enum HubConnectError|struct HubConnectError" --type rust

# Look for ninelives usage and retry_policy definition
rg "retry_policy|ninelives" --type rust -A 3 -B 3

# Read the specific lines in context
sed -n '680,710p' crates/echo-session-ws-gateway/src/main.rs

# Check Cargo.toml for ninelives version
cat crates/echo-session-ws-gateway/Cargo.toml | grep -A 5 -B 5 ninelives

# Search for any use of ResilienceError variants in the codebase
rg "ResilienceError::" --type rust
```
🌐 Web query:
ninelives 0.2 ResilienceError variants Rust crate
💡 Result:
In ninelives v0.2.0, ResilienceError<E> has these variants:
- Timeout { elapsed: Duration, timeout: Duration }
- Bulkhead { in_flight: usize, max: usize }
- BulkheadClosed
- CircuitOpen { failure_count: usize, open_duration: Duration }
- RetryExhausted { attempts: usize, failures: Arc<Vec<E>> }
- Inner(E) (wraps the underlying service/operation error) [1]
🌐 Web query:
ninelives Rust crate latest version ResilienceError Fatal non-retriable
💡 Result:
- Latest ninelives crate version: 0.2.0 (docs.rs "latest" currently points to ninelives-0.2.0). [1]
- ResilienceError and "Fatal / non-retriable": ninelives::ResilienceError<E> does not have a Fatal variant. It's a unified wrapper with variants like Timeout, CircuitOpen, RetryExhausted, and Inner(E) for the underlying application error. [2]
- How to make an error non-retriable ("fatal"): classify it in your retry policy via RetryPolicyBuilder::should_retry, which is a "Predicate to decide if an Inner error is retryable." Return false for fatal/non-retriable errors so retries stop and the original error is surfaced (as ResilienceError::Inner(your_error)). [3][2]
Example (pattern):

```rust
use ninelives::RetryPolicyBuilder;

#[derive(Debug)]
enum AppError { Temporary, Fatal }

let policy = RetryPolicyBuilder::<AppError>::new()
    .should_retry(|e| matches!(e, AppError::Temporary))
    .build()?;
```

Sources: [1] [2] [3]
🏁 Scripts executed:

```sh
# Find hub_observer_try_connect definition
rg "fn hub_observer_try_connect" --type rust -A 30

# Search for all error handling in hub_observer_try_connect
rg "Err\(|map_err" crates/echo-session-ws-gateway/src/main.rs | head -20

# Get more lines of hub_observer_try_connect to see handshake and subscribe error paths
sed -n '600,650p' crates/echo-session-ws-gateway/src/main.rs

# Search for hub_observer_send_handshake and subscribe functions
rg "fn hub_observer_send_handshake|fn hub_observer_subscribe" --type rust -A 20
```
Call out the retry predicate mechanism — ResilienceError::Fatal doesn't exist in ninelives 0.2; use .should_retry() instead.
Line 697 wraps every HubConnectError as ResilienceError::Inner, treating all errors as retriable. You're right that handshake and subscribe failures are permanent (protocol mismatch, invalid warp ID) and will exhaust all 10 attempts pointlessly.
However, ninelives 0.2 has no ResilienceError::Fatal variant. The solution is to add a .should_retry() predicate on the RetryPolicyBuilder:
```rust
.should_retry(|err: &HubConnectError| {
    // Return false for permanent errors to fail fast
    // Return true for transient errors (timeout, connection refused)
})
```

Problem: HubConnectError(String) is untyped. You'd need to either:
- Parse the error message (brittle), or
- Refactor HubConnectError to an enum with variants distinguishing network failures (Timeout, ConnectionRefused) from protocol failures (HandshakeFailed, SubscribeFailed).
The second approach lets you return false from should_retry() for handshake/subscribe errors, failing fast instead of burning retries.
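A sketch of that refactor: the variant names and the assumption that should_retry receives a &HubConnectError come from the notes above, not from the PR or a verified ninelives API, so treat this as illustrative only.

```rust
// Hypothetical typed error; variant names are suggestions, not PR code.
#[derive(Debug)]
enum HubConnectError {
    Timeout,
    ConnectionRefused(String),
    HandshakeFailed(String),
    SubscribeFailed { warp_id: u64, reason: String },
}

// Added to the existing builder chain (backoff/jitter elided, same as in the PR).
let retry_policy = RetryPolicy::<HubConnectError>::builder()
    .max_attempts(10)
    .should_retry(|err: &HubConnectError| {
        // Retry transient network failures; fail fast on protocol errors.
        matches!(
            err,
            HubConnectError::Timeout | HubConnectError::ConnectionRefused(_)
        )
    })
    .build()
    .expect("valid retry config");
```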
🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` around lines 690 - 700, The retry
logic currently treats every HubConnectError as retriable by wrapping errors as
ResilienceError::Inner; update the RetryPolicyBuilder (the builder that produces
retry_policy used around hub_observer_try_connect) to include a
.should_retry(|err: &HubConnectError| -> bool) predicate that returns false for
permanent protocol errors and true for transient network errors, and change
HubConnectError from an opaque String to a typed enum (e.g., Timeout,
ConnectionRefused, HandshakeFailed, SubscribeFailed) so the predicate can make a
deterministic decision; locate hub_observer_try_connect and the
RetryPolicyBuilder instantiation, adjust hub_observer_try_connect to return the
new HubConnectError enum variants, and ensure the .should_retry closure inspects
those variants to fail fast on HandshakeFailed/SubscribeFailed while retrying on
Timeout/ConnectionRefused.
```rust
    })
    .await;

let (mut reader, _writer) = match connect_result {
```
🧹 Nitpick | 🔵 Trivial
_writer binding is intentionally kept alive — subtle but correct.
_writer (not bare _) keeps the WriteHalf alive through the read loop. With tokio::io::split's Arc-based internals, dropping it early via _ would technically be safe, but the explicit binding is a good defensive choice. A comment noting why the writer half is retained (even unused) would prevent a well-meaning future refactor from changing _writer to _.
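A possible form of that comment, purely illustrative:

```rust
// Keep `_writer` bound (not `_`) so the WriteHalf stays alive for the whole
// read loop; renaming it to `_` would drop the write half immediately.
let (mut reader, _writer) = match connect_result {
    // ...
};
```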
🤖 Prompt for AI Agents
In `@crates/echo-session-ws-gateway/src/main.rs` at line 702, The tuple binding
let (mut reader, _writer) = match connect_result { ... } intentionally retains
the WriteHalf as _writer so it stays alive for the duration of the read loop;
add a short inline comment next to that binding referencing reader and _writer
(or above the match) explaining that _writer is deliberately kept (not named
`_`) to keep the write half alive (due to tokio::io::split/Arc-based internals)
and to prevent accidental refactoring that would drop it early.
Summary
- Replaces the hand-rolled exponential backoff with ninelives::RetryPolicy.
- Extracts connection setup into hub_observer_try_connect, cleanly separating connection logic from retry orchestration.

Determinism
Verified safe: the gateway is pure transport (no warp-core dependency, no simulation feedback). Jitter randomizes when reconnects happen, not what data flows. Protocol integrity (canonical CBOR + BLAKE3 checksums) is enforced by echo-session-proto, untouched by this change.

Test plan
- cargo check -p echo-session-ws-gateway
- cargo test -p echo-session-ws-gateway: 12/12 pass