
Feat: partial refactoring, issues detected by pyscn #266

Open
HardMax71 wants to merge 7 commits into main from feat/pyscn

Conversation

HardMax71 (Owner) commented Mar 4, 2026


Summary by cubic

Standardizes worker startup with a shared run_worker and replaces the Redis SSE bus with a stream-backed SSEService for simpler, more reliable fan-out. Cleans up event models, services, tests, and frontend stores to reduce duplication and improve stability.

  • Refactors
    • SSE: removed SSERedisBus; SSEService now publishes/reads via Redis Streams; providers, handlers, event replay, and notifications updated; tests migrated; added Redis socket_keepalive.
    • Workers: added workers/bootstrap.py; all run_*.py use run_worker for Beanie/DI/broker; per-worker hooks retained.
    • Events: introduced ExecutionSpec mixin reused by ExecutionRequestedEvent and CreatePodCommandEvent.
    • User settings: moved editor validation into UserSettingsService._validate_editor_settings and enforced on update; simplified DB/schema models.
    • Services: extracted helpers in EventReplayService; refined NotificationService schedule/throttle flow.
    • Frontend/tests: enabled Vitest isolate: true; switched mocks to $-alias imports; admin stores accept optional autoRefresh; tests disable timers where needed.
    • Docs: updated architecture to cover event mixins, SSE design, and the shared worker bootstrap.
    • CI: backend unit tests run with uv --no-sync to speed up CI.
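The stream-backed fan-out described above can be illustrated with a small sketch. This is not the project's actual SSEService: the class shape, method names, and the in-memory xadd/xread stand-in used here in place of redis.asyncio are illustrative assumptions.

```python
import asyncio
from collections import defaultdict


class InMemoryStreams:
    """Hypothetical stand-in for Redis Streams (XADD/XREAD) so the sketch
    runs without a Redis server; a real service would use redis.asyncio."""

    def __init__(self):
        self._streams = defaultdict(list)  # stream key -> list of (id, fields)

    async def xadd(self, key, fields):
        entry_id = f"{len(self._streams[key]) + 1}-0"  # simplified entry ids
        self._streams[key].append((entry_id, dict(fields)))
        return entry_id

    async def xread(self, streams, count=10):
        # streams maps key -> last-seen id; "0" means read from the beginning
        out = []
        for key, last_id in streams.items():
            entries = [
                (eid, fields)
                for eid, fields in self._streams[key]
                if last_id == "0" or eid > last_id
            ]
            if entries:
                out.append((key, entries[:count]))
        return out


class SSEService:
    """Sketch of stream-backed SSE fan-out: publishing appends to a
    per-execution stream; subscribers poll from their last-seen entry id."""

    def __init__(self, redis):
        self._redis = redis

    async def publish(self, execution_id, event):
        return await self._redis.xadd(f"sse:{execution_id}", event)

    async def poll(self, execution_id, last_id="0"):
        result = await self._redis.xread({f"sse:{execution_id}": last_id})
        return result[0][1] if result else []
```

Because every event lands in a stream, a late subscriber can replay from id "0" instead of missing messages published before it connected, which is the main reliability gain over pub/sub.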

Written for commit d024b8a. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • New notification preferences: execution completed/failed, system updates, security alerts, and delivery channels.
  • Refactor

    • Unified worker bootstrap for consistent background worker lifecycle.
    • SSE moved to a stream-based service powering event/notification/replay flows.
  • User-facing behavior

    • Editor settings now validated on save (font/tab size limits).
    • Admin stores can opt out of automatic refresh.
  • Documentation

    • Updated architecture docs: worker bootstrap and event-system mixins.
  • Tests

    • Improved test isolation, DOM cleanup, and mocked module path aliases.

Copilot AI review requested due to automatic review settings March 4, 2026 23:35
coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1469cfa3-000b-4d8a-b524-eab963f1fe60

📥 Commits

Reviewing files that changed from the base of the PR and between 99f2c0f and d024b8a.

📒 Files selected for processing (1)
  • .github/workflows/stack-tests.yml

📝 Walkthrough

Walkthrough

Consolidates execution fields into an ExecutionSpec mixin; replaces the Redis pub/sub SSERedisBus with a Redis Streams-based SSEService and updates providers/tests/handlers; centralizes worker startup with a new run_worker bootstrap used by run_* entrypoints; removes SagaInstance and trims exported user-settings docs/exports; adds/relocates validators and schema fields.

Changes

Cohort / File(s) Summary
Worker bootstrap & entrypoints
backend/workers/bootstrap.py, backend/workers/run_dlq_processor.py, backend/workers/run_event_replay.py, backend/workers/run_k8s_worker.py, backend/workers/run_pod_monitor.py, backend/workers/run_result_processor.py, backend/workers/run_saga_orchestrator.py
Added run_worker() to centralize startup (settings, Beanie, DI, broker, FastStream); refactored run_* scripts to delegate lifecycle, register optional on_startup/on_shutdown hooks, and use container services.
SSE subsystem (implementation + tests)
backend/app/services/sse/redis_bus.py (deleted), backend/app/services/sse/sse_service.py, backend/app/services/sse/__init__.py, backend/app/core/providers.py, backend/app/events/handlers.py, backend/tests/unit/services/sse/*, backend/tests/e2e/*, backend/tests/e2e/conftest.py
Removed SSERedisBus, added SSEService using Redis Streams (publish/poll), updated providers and handlers to accept SSEService, replaced pub/sub listeners with stream polling, migrated tests to stream-based fakes/adapters.
Event model refactor & docs
backend/app/domain/events/typed.py, docs/architecture/event-system-design.md
Extracted shared execution fields into ExecutionSpec (Pydantic mixin); ExecutionRequestedEvent and CreatePodCommandEvent inherit it; docs updated to explain mixins and discriminator usage.
Event replay & notification services
backend/app/services/event_replay/replay_service.py, backend/app/services/notification_service.py
Reworked replay dispatch (batch loading, delay calc, reschedule, metrics), added scheduling helpers and per-event metrics; swapped SSERedisBus injection → SSEService and adjusted constructors/usages; added validation/throttling helpers in notification service.
SSE provider wiring & Redis client
backend/app/core/providers.py
Replaced get_sse_redis_bus with get_sse_service, propagated SSEService into notification/replay providers, and enabled socket_keepalive on Redis client creation.
User settings / schemas / exports
backend/app/db/docs/user_settings.py, backend/app/db/docs/__init__.py, backend/app/schemas_pydantic/user_settings.py, backend/app/services/user_settings_service.py
Expanded NotificationSettings (new flags + channels), removed EditorSettings/NotificationSettings from docs init exports, removed Pydantic field_validators and added runtime editor validation in service.
Saga API surface
backend/app/domain/saga/models.py, backend/app/domain/saga/__init__.py, backend/app/services/saga/__init__.py
Removed SagaInstance dataclass and removed it from package export surfaces.
Tests & test helpers
backend/tests/unit/services/sse/test_redis_bus.py (deleted), backend/tests/unit/services/sse/test_sse_publish.py, backend/tests/unit/services/sse/test_sse_service.py, backend/tests/e2e/*, backend/tests/e2e/conftest.py
Deleted bus-based unit tests; added stream-based unit tests and updated e2e tests/conftest to poll Redis Streams via adapters and new _read_stream helper; test doubles converted to xadd/xread semantics.
Frontend: test config, isolation, mocks, and stores auto-refresh
frontend/vitest.config.ts, frontend/vitest.setup.ts, many frontend/.../__tests__/*, frontend/src/lib/admin/stores/*.svelte.ts
Enabled vitest isolate, added DOM cleanup in setup, migrated many test mocks to path aliases, and made admin stores' auto-refresh configurable via constructor/options (factory signatures updated).
Plot helper & misc docs
backend/tests/load/plot_report.py, docs/architecture/lifecycle.md, docs/architecture/services-overview.md, docs/components/workers/index.md
Added _save_bar_chart helper; refactored plots to reuse it. Documentation updated to describe unified run_worker bootstrap and worker lifecycle examples.
Minor docstring / export trims and small edits
backend/app/db/docs/__init__.py, backend/app/db/docs/saga.py, backend/app/services/sse/__init__.py, backend/app/domain/saga/__init__.py, other small files
Removed specific imports/exports (EditorSettings, NotificationSettings, SagaInstance); shortened docstring in SagaDocument; removed SSERedisBus from all where applicable.
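The ExecutionSpec mixin described in the walkthrough can be sketched as follows. The real models are Pydantic; this sketch uses dataclasses so it stands alone, and the field names are assumptions rather than the project's actual schema.

```python
from dataclasses import dataclass, field


@dataclass
class ExecutionSpec:
    """Shared execution fields, factored out so both the request event and
    the pod command event carry the same spec without duplication."""

    execution_id: str
    script: str
    language: str = "python"


@dataclass
class ExecutionRequestedEvent(ExecutionSpec):
    # inherits all ExecutionSpec fields; adds only event-specific ones
    event_type: str = "execution_requested"


@dataclass
class CreatePodCommandEvent(ExecutionSpec):
    event_type: str = "create_pod_command"
    # default_factory makes the field omittable at construction (not None)
    runtime_command: list[str] = field(default_factory=list)
```

In the real Pydantic version the mixin inherits from BaseModel rather than BaseEvent, so it contributes only fields and never participates in the event discriminator.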

Sequence Diagram(s)

sequenceDiagram
    participant Main as Worker main()
    participant Runner as run_worker()
    participant Settings as Settings
    participant Beanie as Beanie (Mongo)
    participant Container as DI Container
    participant Broker as KafkaBroker
    participant Startup as on_startup hook
    participant FastStream as FastStream
    participant Shutdown as on_shutdown hook

    Main->>Runner: call run_worker(name, config_override, factory, register_handlers?, hooks?)
    Runner->>Settings: load configuration
    Runner->>Beanie: initialize Beanie / ALL_DOCUMENTS
    Runner->>Container: create DI container via factory
    Container->>Broker: resolve KafkaBroker
    Runner->>Runner: optionally call register_handlers(container, broker)
    Runner->>Startup: invoke on_startup(container, broker, logger)
    Startup->>Container: resolve services and schedule jobs
    Runner->>FastStream: configure & start FastStream with lifecycle hooks
    FastStream->>Shutdown: invoke on_shutdown() on termination
    Shutdown->>Container: shutdown schedulers and cleanup
    Runner->>Container: close container
    Runner->>Main: return / exit
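The lifecycle in the diagram can be sketched as a generic runner. The real dependencies (Beanie, Dishka, FastStream) are replaced here with stubs, and the parameter names and signatures are assumptions, not the project's actual bootstrap API.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_worker(
    name: str,
    container_factory: Callable[[], dict],
    register_handlers: Optional[Callable[[dict], None]] = None,
    on_startup: Optional[Callable[[dict], Awaitable[None]]] = None,
    on_shutdown: Optional[Callable[[], Awaitable[None]]] = None,
    run_app: Optional[Callable[[], Awaitable[None]]] = None,
) -> list[str]:
    """Centralized worker bootstrap; records lifecycle order so the
    sequence-diagram steps are visible. Stubs stand in for real init."""
    steps = []
    steps.append("load_settings")        # Settings load
    steps.append("init_beanie")          # Beanie / ALL_DOCUMENTS init
    container = container_factory()      # DI container via factory
    steps.append("create_container")
    if register_handlers:
        register_handlers(container)     # optional Kafka subscriber wiring
        steps.append("register_handlers")
    try:
        if on_startup:
            await on_startup(container)  # per-worker hook, e.g. APScheduler
            steps.append("on_startup")
        if run_app:
            await run_app()              # FastStream app would run here
        steps.append("run_app")
    finally:
        if on_shutdown:
            await on_shutdown()          # per-worker cleanup hook
            steps.append("on_shutdown")
        steps.append("close_container")  # container teardown always runs
    return steps
```

The try/finally mirrors the diagram's guarantee that the shutdown hook and container close run even if the app loop exits with an error.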

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Poem

🐇 I hopped through fields and bindings bright,

Mixins stitched execution neat and tight,
Workers gather, one neat start,
Streams hum softly where buses part,
Validators guard the garden at night ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 21.90%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Title check (❓ Inconclusive): The title 'Feat: partial refactoring, issues detected by pyscn' is vague and does not clearly convey the substantial changes made in this PR. Resolution: use a more descriptive title such as 'Refactor: replace SSE Redis bus with streams, standardize worker bootstrap'.

✅ Passed checks (1 passed)

  • Description Check (✅ Passed): Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



codecov-commenter commented Mar 4, 2026

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 85.62092% with 22 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
backend/app/services/notification_service.py 34.78% 15 Missing ⚠️
...ackend/app/services/event_replay/replay_service.py 80.00% 5 Missing ⚠️
backend/app/services/user_settings_service.py 77.77% 2 Missing ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Flag | Coverage | Δ
backend-e2e 81.34% <74.80%> (-1.87%) ⬇️
backend-unit 67.68% <61.06%> (-0.24%) ⬇️
frontend-unit 86.51% <100.00%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines | Coverage | Δ
backend/app/core/providers.py 94.94% <100.00%> (-0.06%) ⬇️
backend/app/db/docs/__init__.py 100.00% <ø> (ø)
backend/app/db/docs/saga.py 100.00% <ø> (ø)
backend/app/db/docs/user_settings.py 100.00% <100.00%> (+3.17%) ⬆️
backend/app/domain/events/typed.py 100.00% <100.00%> (ø)
backend/app/domain/saga/__init__.py 100.00% <ø> (ø)
backend/app/domain/saga/models.py 98.87% <ø> (-1.13%) ⬇️
backend/app/events/handlers.py 45.55% <100.00%> (-3.34%) ⬇️
backend/app/schemas_pydantic/user_settings.py 100.00% <100.00%> (+2.89%) ⬆️
backend/app/services/saga/__init__.py 100.00% <100.00%> (ø)
... and 8 more

... and 12 files with indirect coverage changes

Components | Coverage | Δ
Backend 88.74% <83.20%> (-1.33%) ⬇️
Frontend 86.51% <100.00%> (+0.02%) ⬆️

cubic-dev-ai bot left a comment

3 issues found across 23 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="backend/workers/bootstrap.py">

<violation number="1" location="backend/workers/bootstrap.py:54">
P2: AsyncMongoClient is created but never closed; add a shutdown hook to close the client so worker shutdowns release Mongo connections cleanly.</violation>
</file>

<file name="backend/app/schemas_pydantic/user_settings.py">

<violation number="1" location="backend/app/schemas_pydantic/user_settings.py:4">
P1: Validation constraints for editor settings were removed without replacement, allowing invalid `font_size`/`tab_size` values to be accepted.</violation>
</file>

<file name="backend/workers/run_saga_orchestrator.py">

<violation number="1" location="backend/workers/run_saga_orchestrator.py:11">
P2: Initialize AsyncIOScheduler inside the running event loop. Creating it at module import time happens before `asyncio.run()` builds the loop, which APScheduler documents can prevent the scheduler from running. Move scheduler creation into `_on_startup` (or pass the current loop) instead of a module-level singleton.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
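The two worker-lifecycle findings above (scheduler created at module import time, and an unguarded shutdown that can raise SchedulerNotRunningError) suggest a pattern like the following. APScheduler is replaced with a minimal fake so the sketch is self-contained; the class and method names are illustrative assumptions.

```python
import asyncio


class FakeScheduler:
    """Hypothetical minimal scheduler standing in for APScheduler's
    AsyncIOScheduler so the sketch runs without the dependency."""

    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def shutdown(self, wait=True):
        if not self.running:
            # mirrors APScheduler's SchedulerNotRunningError
            raise RuntimeError("scheduler is not running")
        self.running = False


class SchedulerLifecycle:
    """Create the scheduler inside on_startup (i.e. once the event loop
    exists) and guard shutdown so a failed startup cannot raise on exit."""

    def __init__(self, factory=FakeScheduler):
        self._factory = factory
        self._scheduler = None

    async def on_startup(self):
        # deferred creation: runs inside asyncio.run()'s loop, not at import
        self._scheduler = self._factory()
        self._scheduler.start()

    async def on_shutdown(self):
        # only shut down a scheduler that actually started
        if self._scheduler is not None and self._scheduler.running:
            self._scheduler.shutdown(wait=False)
```

With the real AsyncIOScheduler the same two moves apply: pass the factory to the worker's on_startup hook instead of instantiating at module scope, and check scheduler.running before calling shutdown.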

Copilot AI left a comment

Pull request overview

This PR introduces a shared worker bootstrap (run_worker) to centralize common initialization (settings/logging/Beanie/Dishka/FastStream), updates worker entrypoints and architecture docs accordingly, and includes a few related refactors (event field mixins, settings validation, plotting helper extraction).

Changes:

  • Added backend/workers/bootstrap.py and migrated all worker run_*.py entrypoints to use run_worker().
  • Refactored execution/pod command event models to reuse a shared ExecutionSpec mixin; updated docs to describe the pattern.
  • Adjusted validation/refactors in services and utilities (user settings editor validation, notification helper methods, event replay helpers, load-test plotting helper).

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
docs/components/workers/index.md Documents shared worker bootstrap location and lifecycle link.
docs/architecture/services-overview.md Updates deployed worker overview to reflect shared bootstrap.
docs/architecture/lifecycle.md Rewrites worker entrypoint/lifecycle guidance to match run_worker() pattern.
docs/architecture/event-system-design.md Documents shared Pydantic mixin approach for event fields.
backend/workers/run_saga_orchestrator.py Migrates saga orchestrator worker entrypoint to run_worker() + hooks.
backend/workers/run_result_processor.py Migrates result processor worker entrypoint to run_worker().
backend/workers/run_pod_monitor.py Migrates pod monitor worker entrypoint to run_worker() + APScheduler startup/shutdown hooks.
backend/workers/run_k8s_worker.py Migrates k8s worker entrypoint to run_worker() + startup hook.
backend/workers/run_event_replay.py Migrates event replay worker entrypoint to run_worker() + APScheduler hooks.
backend/workers/run_dlq_processor.py Migrates DLQ processor entrypoint to run_worker() + APScheduler hooks.
backend/workers/bootstrap.py New shared bootstrap that initializes logging, Beanie, Dishka+FastStream, and worker hooks.
backend/tests/load/plot_report.py Extracts shared bar chart saving logic to a helper.
backend/app/services/user_settings_service.py Adds service-layer validation for editor settings (font/tab sizes).
backend/app/services/saga/init.py Removes re-export of deleted SagaInstance.
backend/app/services/notification_service.py Extracts scheduled-time validation and throttle check into helpers.
backend/app/services/event_replay/replay_service.py Extracts helper methods to simplify replay dispatch control flow.
backend/app/schemas_pydantic/user_settings.py Removes Pydantic field validators for editor settings in API schemas.
backend/app/domain/saga/models.py Deletes unused SagaInstance dataclass.
backend/app/domain/saga/init.py Removes SagaInstance from exports.
backend/app/domain/events/typed.py Introduces ExecutionSpec mixin and applies it to relevant events.
backend/app/db/docs/user_settings.py Removes embedded-document validators and simplifies docstrings.
backend/app/db/docs/saga.py Simplifies SagaDocument docstring after SagaInstance removal.
backend/app/db/docs/init.py Stops exporting embedded settings models from docs package.


coderabbitai bot left a comment

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/domain/events/typed.py (1)

61-83: ⚠️ Potential issue | 🟡 Minor

Remove banner/separator comments from this module.

Line 61 and Line 82 add # --- ... --- separators, which violate the repo’s Python style rule.

Suggested cleanup
-# --- Execution Spec (shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---
-
 class ExecutionSpec(BaseModel):
     """Shared execution specification fields (mixin for ExecutionRequestedEvent and CreatePodCommandEvent)."""
@@
-# --- Execution Events ---
-
 class ExecutionRequestedEvent(BaseEvent, ExecutionSpec):

As per coding guidelines: Never use banner/separator comments (# ---, # ===, # ***) — use classes, blank lines, and docstrings to organize code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/domain/events/typed.py` around lines 61 - 83, Remove the
banner/separator comments surrounding the ExecutionSpec declaration: delete the
lines containing "# --- Execution Spec (shared fields between
ExecutionRequestedEvent and CreatePodCommandEvent) ---" and "# --- Execution
Events ---" and instead rely on the ExecutionSpec class docstring, a single
blank line separation, or nearby class definitions (e.g.,
ExecutionRequestedEvent, CreatePodCommandEvent) to structure the module; ensure
no other banner-style comments remain in typed.py and keep the ExecutionSpec
class and its fields unchanged.
🧹 Nitpick comments (3)
backend/app/schemas_pydantic/user_settings.py (1)

21-30: Consider the trade-offs of removing Pydantic validators.

Moving validation from Pydantic to the service layer (_validate_editor_settings in user_settings_service.py) is a valid architectural choice, but note:

  1. OpenAPI schema impact: The generated API docs will no longer show font_size (8-32) and tab_size (2, 4, 8) constraints, potentially affecting API consumers.
  2. Error response timing: Validation errors now occur deeper in the request lifecycle rather than at deserialization.

If this is intentional for centralized validation control, consider adding Field(ge=8, le=32) for font_size and Literal[2, 4, 8] for tab_size to preserve OpenAPI documentation while keeping service-level validation as the enforcement layer.

📝 Optional: Restore constraints for OpenAPI documentation
+from typing import Literal
+
 class EditorSettings(BaseModel):
     """Code editor preferences"""

     model_config = ConfigDict(from_attributes=True, json_schema_serialization_defaults_required=True)

-    font_size: int = 14
-    tab_size: int = 4
+    font_size: int = Field(default=14, ge=8, le=32)
+    tab_size: Literal[2, 4, 8] = 4
     use_tabs: bool = False
     word_wrap: bool = True
     show_line_numbers: bool = True
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/schemas_pydantic/user_settings.py` around lines 21 - 30, Add
explicit Pydantic constraints back to the EditorSettings model to preserve
OpenAPI docs: change the font_size and tab_size annotations in EditorSettings to
use Field(ge=8, le=32) for font_size and typing.Literal[2, 4, 8] for tab_size so
the schema reflects the allowed ranges/options, while still keeping the deeper
enforcement in _validate_editor_settings in user_settings_service.py; this keeps
documentation accurate without removing your centralized service-layer
validation.
backend/workers/bootstrap.py (1)

51-51: Use structured log kwargs instead of f-strings.

These logs should pass worker_name as a keyword field rather than interpolating into the message string.

Proposed fix
-    logger.info(f"Starting {worker_name}...")
+    logger.info("Starting worker", worker_name=worker_name)
...
-        logger.info(f"{worker_name} shutdown complete")
+        logger.info("Worker shutdown complete", worker_name=worker_name)
As per coding guidelines: "Use keyword arguments for all log statements — never interpolate user-controlled data into log messages."

Also applies to: 81-81

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/workers/bootstrap.py` at line 51, Replace the f-string interpolation
in the logger calls with structured keyword fields: instead of
logger.info(f"Starting {worker_name}...") call logger.info("Starting worker",
worker_name=worker_name). Do the same for the other f-string log at the other
logger.info occurrence (the second logger.info that interpolates worker_name):
pass worker_name as a keyword argument rather than embedding it in the message.
docs/architecture/event-system-design.md (1)

64-65: Clarify “optional” wording for runtime_command.

The current sentence can be read as allowing None, but the field is list[str] with a default factory. Consider saying it is “omittable at construction” rather than “optional”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/architecture/event-system-design.md` around lines 64 - 65, The phrase
saying `CreatePodCommandEvent` makes `runtime_command` "optional" is ambiguous;
update the wording to indicate the field is a list[str] with a default factory
and therefore "omittable at construction" (or "has a default empty list via
Field(default_factory=list)") rather than implying it can be None; reference
`CreatePodCommandEvent`, the `runtime_command` field, and
`Field(default_factory=list)` when rewriting the sentence alongside the existing
note that mixins inherit from `BaseModel` not `BaseEvent`.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/domain/events/typed.py`:
- Around line 424-427: The CreatePodCommandEvent currently sets runtime_command
with Field(default_factory=list), making it silently optional; change the
declaration in the CreatePodCommandEvent class to require runtime_command
(remove Field and default_factory so runtime_command: list[str] is mandatory) to
enforce the invariant that commands are provided (class name:
CreatePodCommandEvent, field: runtime_command). Ensure any callers (e.g., where
CreatePodCommandEvent is constructed in the saga) already pass runtime_command;
update tests/constructors if they relied on the empty-list default.

In `@backend/app/services/event_replay/replay_service.py`:
- Around line 198-200: ReplayConfig.speed_multiplier is unconstrained and can be
zero or negative causing ZeroDivisionError in the replay delay calculation;
update the ReplayConfig model (ReplayConfig.speed_multiplier) to enforce gt=0
(e.g., use Pydantic Field(..., gt=0)) in models.py and also add a runtime guard
in the replay delay logic (the condition that checks
session.config.speed_multiplier < 100) to require
session.config.speed_multiplier > 0 before dividing so the code that computes
time_diff / session.config.speed_multiplier cannot divide by zero or negative
values.

In `@backend/app/services/notification_service.py`:
- Line 182: create_notification is calling _check_throttle a second time for
system notifications already rate-checked in _create_system_for_user, causing
two entries in ThrottleCache.check_throttle; add a boolean parameter (e.g.,
throttle_already_checked or skip_internal_throttle, default False) to
create_notification and propagate it to avoid calling self._check_throttle when
True, then update _create_system_for_user to call create_notification(...,
throttle_already_checked=True) (or alternatively detect the "system" source and
skip the second _check_throttle), and ensure only ThrottleCache.check_throttle
is invoked once per notification path.
- Around line 144-149: The warning log in the throttling branch currently
interpolates user-controlled user_id into the message using f-strings; change
the self.logger.warning call in the block where `throttled` is True to use
structured keyword arguments instead (e.g., pass a template string without
interpolation and provide user_id, NOTIF_THROTTLE_MAX_PER_HOUR, and
NOTIF_THROTTLE_WINDOW_HOURS as keyword args), updating the call site that
references `self.logger.warning` and
`self.settings.NOTIF_THROTTLE_MAX_PER_HOUR`/`self.settings.NOTIF_THROTTLE_WINDOW_HOURS`
so sensitive data is not directly formatted into the message.

In `@backend/tests/load/plot_report.py`:
- Line 121: The return statement calling _save_bar_chart exceeds the 120-char
limit; break it across multiple lines so arguments and the long path expression
are wrapped (e.g., assign out_dir / "endpoint_latency.png" to a short variable
or use a parenthesized call with each argument on its own line) and then return
_save_bar_chart(labels, "Success Latency by Endpoint (Top N)", "Latency (ms)",
path_var, bars) so the line lengths stay under 120 characters and references to
_save_bar_chart, labels, bars, and out_dir remain clear.
- Around line 81-100: The helper function _save_bar_chart lacks a Google-style
docstring; add a docstring above the function using Google style that describes
the function purpose and documents Args (labels: List[str], title: str, ylabel:
str, out_path: Path, plot_bars: Callable[[Axes, range], None]), Returns (Path)
and any Raises (e.g., propagation of matplotlib/IO errors if applicable),
include brief descriptions and types for each parameter and the return value,
and keep it concise to match existing backend docstring conventions.
- Around line 6-7: Remove List, Dict, and Tuple from the typing import and
import only TYPE_CHECKING, Callable, TypedDict; then replace all usages of
typing collection aliases with PEP 585 built-ins: change List[...] to list[...],
Dict[...] to dict[...], and Tuple[...] to tuple[...] in the TypedDict
definitions (e.g., fields on EndpointData and any PlotReport TypedDict), in the
local variable annotations inside the functions referenced around lines 51–53,
the return type at line 74 and line 143, the variable annotation at line 75, and
the parameter annotation at line 82 so they use list[int], dict[str,
EndpointData], tuple[...], etc., matching the original types but using built-in
generics; keep Callable and TypedDict imports as needed.

In `@backend/workers/bootstrap.py`:
- Around line 54-80: The AsyncMongoClient instance named client is never closed;
wrap the lifecycle around app.run so client.close() is invoked on shutdown:
create the client via AsyncMongoClient(...) before init_beanie as now, but
ensure you register its close in the worker shutdown path (either add an async
lambda calling client.close() to shutdown_hooks or surround the app.run() call
with a try/finally that calls await client.close()), keeping init_beanie and
container setup intact; reference AsyncMongoClient/client, init_beanie,
shutdown_hooks (or finally after await app.run()), and container.close to locate
where to add the cleanup.

In `@backend/workers/run_pod_monitor.py`:
- Around line 41-48: The scheduler interval is set to 5s while the watch cycle
(watch_pod_events / _watch_cycle) can block up to watch_timeout_seconds (30s),
causing skipped runs due to max_instances=1; update the _scheduler.add_job call
(id "pod_monitor_watch", function _watch_cycle) to use an interval equal to the
configured watch_timeout_seconds (or otherwise align it with that config value)
so the schedule reflects the actual blocking duration, or alternatively increase
max_instances and handle concurrency—prefer changing seconds from 5 to the
watch_timeout_seconds config to keep behavior predictable.

In `@backend/workers/run_saga_orchestrator.py`:
- Around line 38-39: The shutdown handler _on_shutdown currently calls
_scheduler.shutdown(wait=False) unconditionally which can raise
SchedulerNotRunningError if _on_startup failed before _scheduler.start(); modify
_on_shutdown to check the scheduler's running property (e.g., if
_scheduler.running:) before calling _scheduler.shutdown(...) so shutdown is only
invoked when the scheduler is running, avoiding the exception during application
shutdown.

In `@docs/architecture/lifecycle.md`:
- Around line 100-104: Checklist items 3 and 4 repeat "Optionally" — replace the
two separate "Optionally" phrases with a single, concise sentence covering both
actions; for example, change "Optionally register Kafka subscriber handlers" and
"Optionally define `on_startup` / `on_shutdown` callbacks for APScheduler or
other setup" into a single line like "Register Kafka subscriber handlers and/or
define `on_startup` / `on_shutdown` callbacks for APScheduler or other setup as
needed" so the checklist reads smoother and avoids repetition.

---

Outside diff comments:
In `@backend/app/domain/events/typed.py`:
- Around line 61-83: Remove the banner/separator comments surrounding the
ExecutionSpec declaration: delete the lines containing "# --- Execution Spec
(shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---"
and "# --- Execution Events ---" and instead rely on the ExecutionSpec class
docstring, a single blank line separation, or nearby class definitions (e.g.,
ExecutionRequestedEvent, CreatePodCommandEvent) to structure the module; ensure
no other banner-style comments remain in typed.py and keep the ExecutionSpec
class and its fields unchanged.

---

Nitpick comments:
In `@backend/app/schemas_pydantic/user_settings.py`:
- Around line 21-30: Add explicit Pydantic constraints back to the
EditorSettings model to preserve OpenAPI docs: change the font_size and tab_size
annotations in EditorSettings to use Field(ge=8, le=32) for font_size and
typing.Literal[2, 4, 8] for tab_size so the schema reflects the allowed
ranges/options, while still keeping the deeper enforcement in
_validate_editor_settings in user_settings_service.py; this keeps documentation
accurate without removing your centralized service-layer validation.

In `@backend/workers/bootstrap.py`:
- Line 51: Replace the f-string interpolation in the logger calls with
structured keyword fields: instead of logger.info(f"Starting {worker_name}...")
call logger.info("Starting worker", worker_name=worker_name). Do the same for
the other f-string log at the other logger.info occurrence (the second
logger.info that interpolates worker_name): pass worker_name as a keyword
argument rather than embedding it in the message.

In `@docs/architecture/event-system-design.md`:
- Around line 64-65: The phrase saying `CreatePodCommandEvent` makes
`runtime_command` "optional" is ambiguous; update the wording to indicate the
field is a list[str] with a default factory and therefore "omittable at
construction" (or "has a default empty list via Field(default_factory=list)")
rather than implying it can be None; reference `CreatePodCommandEvent`, the
`runtime_command` field, and `Field(default_factory=list)` when rewriting the
sentence alongside the existing note that mixins inherit from `BaseModel` not
`BaseEvent`.
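The `default_factory` behavior being described can be demonstrated in isolation; the class below is reduced to the single field under discussion:

```python
from pydantic import BaseModel, Field

class CreatePodCommandEvent(BaseModel):  # reduced to the runtime_command field
    runtime_command: list[str] = Field(default_factory=list)

first = CreatePodCommandEvent()   # field omitted at construction
second = CreatePodCommandEvent()
first.runtime_command.append("python")

# Each instance gets its own fresh list; the field is never None.
print(first.runtime_command, second.runtime_command)
```

This is why "omittable at construction" is the accurate wording: the field always holds a `list[str]`, just possibly an empty one.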

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 60ea9b8f-1cfa-41e5-8e3d-2244b2b32807

📥 Commits

Reviewing files that changed from the base of the PR and between 666d9e1 and 26299a3.

📒 Files selected for processing (23)
  • backend/app/db/docs/__init__.py
  • backend/app/db/docs/saga.py
  • backend/app/db/docs/user_settings.py
  • backend/app/domain/events/typed.py
  • backend/app/domain/saga/__init__.py
  • backend/app/domain/saga/models.py
  • backend/app/schemas_pydantic/user_settings.py
  • backend/app/services/event_replay/replay_service.py
  • backend/app/services/notification_service.py
  • backend/app/services/saga/__init__.py
  • backend/app/services/user_settings_service.py
  • backend/tests/load/plot_report.py
  • backend/workers/bootstrap.py
  • backend/workers/run_dlq_processor.py
  • backend/workers/run_event_replay.py
  • backend/workers/run_k8s_worker.py
  • backend/workers/run_pod_monitor.py
  • backend/workers/run_result_processor.py
  • backend/workers/run_saga_orchestrator.py
  • docs/architecture/event-system-design.md
  • docs/architecture/lifecycle.md
  • docs/architecture/services-overview.md
  • docs/components/workers/index.md
💤 Files with no reviewable changes (3)
  • backend/app/domain/saga/models.py
  • backend/app/domain/saga/__init__.py
  • backend/app/db/docs/__init__.py

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
frontend/vitest.setup.ts (1)

101-106: Consider moving cleanup() to afterEach and adding vi.clearAllMocks().

Per project guidelines, cleanup() should be called in afterEach (not beforeEach) to immediately release DOM resources after each test. Additionally, vi.clearAllMocks() should be called in beforeEach to reset mock state between tests.

♻️ Suggested refactor
+import { vi, beforeEach, afterEach } from 'vitest';
-import { vi, beforeEach } from 'vitest';
 // Reset storage and DOM between every test
 beforeEach(() => {
   Object.keys(localStorageStore).forEach(key => delete localStorageStore[key]);
   Object.keys(sessionStorageStore).forEach(key => delete sessionStorageStore[key]);
-  cleanup();
+  vi.clearAllMocks();
+});
+
+afterEach(() => {
+  cleanup();
 });

Based on learnings: "Call cleanup() in Vitest afterEach, vi.clearAllMocks() in beforeEach".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/vitest.setup.ts` around lines 101 - 106, Move the DOM cleanup to an
afterEach and add mock clearing in beforeEach: keep the storage reset logic in
beforeEach (iterating localStorageStore and sessionStorageStore) but call
vi.clearAllMocks() at the start of that beforeEach; remove cleanup() from
beforeEach and call cleanup() inside an afterEach to release DOM resources
immediately after each test. Ensure you reference the existing
beforeEach/afterEach hooks and the cleanup(), vi.clearAllMocks(),
localStorageStore and sessionStorageStore identifiers when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@frontend/vitest.setup.ts`:
- Around line 101-106: Move the DOM cleanup to an afterEach and add mock
clearing in beforeEach: keep the storage reset logic in beforeEach (iterating
localStorageStore and sessionStorageStore) but call vi.clearAllMocks() at the
start of that beforeEach; remove cleanup() from beforeEach and call cleanup()
inside an afterEach to release DOM resources immediately after each test. Ensure
you reference the existing beforeEach/afterEach hooks and the cleanup(),
vi.clearAllMocks(), localStorageStore and sessionStorageStore identifiers when
making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e7a422e7-ef0a-4d97-ab59-bc427549dccd

📥 Commits

Reviewing files that changed from the base of the PR and between 26299a3 and bdc4e18.

📒 Files selected for processing (2)
  • frontend/vitest.config.ts
  • frontend/vitest.setup.ts

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
frontend/src/lib/__tests__/user-settings.test.ts (1)

6-33: ⚠️ Potential issue | 🟠 Major

Use vi.hoisted() to hoist mock declarations before vi.mock() calls.

The mock functions defined at lines 3–4, 13, 19, and 25 are declared outside vi.hoisted() and used immediately in vi.mock() callbacks starting at line 6. This violates the test standard requiring all mocks to be hoisted before any vi.mock() declarations.

Consolidate all mock definitions into a single vi.hoisted() block at the top of the file before any vi.mock() calls:

Suggested refactor
+const { mockGetUserSettings, mockUpdateUserSettings, mockSetUserSettings, mockSetTheme, mockAuthStore } =
+    vi.hoisted(() => ({
+        mockGetUserSettings: vi.fn(),
+        mockUpdateUserSettings: vi.fn(),
+        mockSetUserSettings: vi.fn(),
+        mockSetTheme: vi.fn(),
+        mockAuthStore: { isAuthenticated: true as boolean | null },
+    }));
+
-const mockGetUserSettings = vi.fn();
-const mockUpdateUserSettings = vi.fn();
-
 vi.mock('$lib/api', () => ({
     getUserSettingsApiV1UserSettingsGet: (...args: unknown[]) =>
         (mockGetUserSettings as (...a: unknown[]) => unknown)(...args),
     updateUserSettingsApiV1UserSettingsPut: (...args: unknown[]) =>
         (mockUpdateUserSettings as (...a: unknown[]) => unknown)(...args),
 }));
-
-const mockSetUserSettings = vi.fn();
 
 vi.mock('$stores/userSettings.svelte', () => ({
     setUserSettings: (settings: unknown) => mockSetUserSettings(settings),
 }));
-
-const mockSetTheme = vi.fn();
 
 vi.mock('$stores/theme.svelte', () => ({
     setTheme: (theme: string) => mockSetTheme(theme),
 }));
-
-const mockAuthStore = {
-    isAuthenticated: true as boolean | null,
-};
 
 vi.mock('$stores/auth.svelte', () => ({
     authStore: mockAuthStore,
 }));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/__tests__/user-settings.test.ts` around lines 6 - 33, Move
the standalone mock declarations (mockGetUserSettings, mockUpdateUserSettings,
mockSetUserSettings, mockSetTheme, and mockAuthStore) into a single vi.hoisted()
block placed before any vi.mock() calls so they are hoisted; then update the
vi.mock() callbacks (the ones creating getUserSettingsApiV1UserSettingsGet,
updateUserSettingsApiV1UserSettingsPut, setUserSettings, setTheme, and
authStore) to reference those hoisted mocks instead of locally-scoped vars.
Ensure vi.hoisted() returns the mock functions/objects and keep the vi.mock()
callbacks unchanged except for using the hoisted identifiers.
frontend/src/stores/__tests__/theme.test.ts (1)

4-12: ⚠️ Potential issue | 🟠 Major

Wrap mocks in vi.hoisted() before vi.mock() declarations.

Lines 4-12 require hoisting mocks following the established pattern used throughout the test suite. Refactor to:

Suggested structure
const mocks = vi.hoisted(() => ({
    saveUserSettings: vi.fn().mockResolvedValue(true),
    authStore: {
        isAuthenticated: false,
    },
}));

vi.mock('$lib/user-settings', () => ({
    saveUserSettings: mocks.saveUserSettings,
}));

vi.mock('$stores/auth.svelte', () => ({
    authStore: mocks.authStore,
}));

Per coding guidelines: "Frontend Vitest tests must hoist mocks with vi.hoisted() before vi.mock() declarations."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/stores/__tests__/theme.test.ts` around lines 4 - 12, Refactor
the top-level mocks so they are created with vi.hoisted() and then referenced in
the vi.mock() calls: create a hoisted mocks object (via vi.hoisted(() => ({
saveUserSettings: vi.fn().mockResolvedValue(true), authStore: { isAuthenticated:
false } }))) and update the existing vi.mock('$lib/user-settings', ...) to
return mocks.saveUserSettings and vi.mock('$stores/auth.svelte', ...) to return
mocks.authStore; this ensures saveUserSettings and authStore are hoisted before
the vi.mock() declarations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frontend/src/stores/__tests__/auth.test.ts`:
- Around line 8-26: The three top-level mock functions (mockLoginApi,
mockLogoutApi, mockGetProfileApi), mockClearUserSettings, and
mockLoadUserSettings must be created with vi.hoisted so they are defined before
the vi.mock factories; replace the existing const declarations with hoisted
destructuring assignments (e.g., const { mockLoginApi, mockLogoutApi,
mockGetProfileApi } = vi.hoisted(() => ({ mockLoginApi: vi.fn(), ... }))) and
keep the vi.mock('$lib/api'), vi.mock('$stores/userSettings.svelte') and
vi.mock('$lib/user-settings') factories using those hoisted symbols
(loginApiV1AuthLoginPost calling mockLoginApi, clearUserSettings calling
mockClearUserSettings, loadUserSettings calling mockLoadUserSettings) so the
mocks are evaluated in the required order.

---

Outside diff comments:
In `@frontend/src/lib/__tests__/user-settings.test.ts`:
- Around line 6-33: Move the standalone mock declarations (mockGetUserSettings,
mockUpdateUserSettings, mockSetUserSettings, mockSetTheme, and mockAuthStore)
into a single vi.hoisted() block placed before any vi.mock() calls so they are
hoisted; then update the vi.mock() callbacks (the ones creating
getUserSettingsApiV1UserSettingsGet, updateUserSettingsApiV1UserSettingsPut,
setUserSettings, setTheme, and authStore) to reference those hoisted mocks
instead of locally-scoped vars. Ensure vi.hoisted() returns the mock
functions/objects and keep the vi.mock() callbacks unchanged except for using
the hoisted identifiers.

In `@frontend/src/stores/__tests__/theme.test.ts`:
- Around line 4-12: Refactor the top-level mocks so they are created with
vi.hoisted() and then referenced in the vi.mock() calls: create a hoisted mocks
object (via vi.hoisted(() => ({ saveUserSettings:
vi.fn().mockResolvedValue(true), authStore: { isAuthenticated: false } }))) and
update the existing vi.mock('$lib/user-settings', ...) to return
mocks.saveUserSettings and vi.mock('$stores/auth.svelte', ...) to return
mocks.authStore; this ensures saveUserSettings and authStore are hoisted before
the vi.mock() declarations.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 233e5efe-2de8-4249-aab7-e273180357c8

📥 Commits

Reviewing files that changed from the base of the PR and between bdc4e18 and 22b09ba.

📒 Files selected for processing (11)
  • backend/workers/bootstrap.py
  • frontend/src/components/__tests__/Header.test.ts
  • frontend/src/components/__tests__/NotificationCenter.test.ts
  • frontend/src/lib/__tests__/user-settings.test.ts
  • frontend/src/routes/admin/__tests__/AdminEvents.test.ts
  • frontend/src/routes/admin/__tests__/AdminExecutions.test.ts
  • frontend/src/routes/admin/__tests__/AdminSagas.test.ts
  • frontend/src/routes/admin/__tests__/AdminSettings.test.ts
  • frontend/src/routes/admin/__tests__/AdminUsers.test.ts
  • frontend/src/stores/__tests__/auth.test.ts
  • frontend/src/stores/__tests__/theme.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/workers/bootstrap.py

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 15 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="backend/tests/e2e/conftest.py">

<violation number="1" location="backend/tests/e2e/conftest.py:101">
P1: `wait_for_notification` can return on stale stream entries, causing false-positive readiness in E2E tests. Track a stream cursor (or start position) so only new messages satisfy the wait.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

-            async for _message in pubsub.listen():
-                return  # first message = notification persisted
+        while True:
+            if await _read_stream(redis_client, key):
@cubic-dev-ai cubic-dev-ai bot Mar 5, 2026

P1: wait_for_notification can return on stale stream entries, causing false-positive readiness in E2E tests. Track a stream cursor (or start position) so only new messages satisfy the wait.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/e2e/conftest.py, line 101:

<comment>`wait_for_notification` can return on stale stream entries, causing false-positive readiness in E2E tests. Track a stream cursor (or start position) so only new messages satisfy the wait.</comment>

<file context>
@@ -76,19 +88,19 @@ async def wait_for_notification(
-            async for _message in pubsub.listen():
-                return  # first message = notification persisted
+        while True:
+            if await _read_stream(redis_client, key):
+                return
+            await asyncio.sleep(0.1)
</file context>
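One way to implement the suggested cursor fix, sketched against an in-memory stand-in for the Redis stream; real code would capture the newest stream ID up front and pass it as the cursor to `XREAD`:

```python
import asyncio

class FakeStream:
    """In-memory stand-in for a Redis stream (XADD / read entries after an ID)."""

    def __init__(self) -> None:
        self.entries: list[tuple[int, str]] = []

    def xadd(self, data: str) -> int:
        entry_id = len(self.entries) + 1
        self.entries.append((entry_id, data))
        return entry_id

    def read_after(self, last_id: int) -> list[tuple[int, str]]:
        return [e for e in self.entries if e[0] > last_id]

async def wait_for_notification(stream: FakeStream, timeout: float = 1.0) -> str:
    # Cursor fix: snapshot the newest ID first so stale entries never satisfy the wait.
    last_id = stream.entries[-1][0] if stream.entries else 0

    async def poll() -> str:
        while True:
            fresh = stream.read_after(last_id)
            if fresh:
                return fresh[0][1]
            await asyncio.sleep(0.01)

    return await asyncio.wait_for(poll(), timeout)

async def main() -> str:
    stream = FakeStream()
    stream.xadd("stale")  # already present before the wait begins
    waiter = asyncio.create_task(wait_for_notification(stream))
    await asyncio.sleep(0.05)
    stream.xadd("fresh")  # only this entry should end the wait
    return await waiter

result = asyncio.run(main())
print(result)
```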
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/event_replay/replay_service.py (1)

34-41: 🛠️ Refactor suggestion | 🟠 Major

Inject Settings in EventReplayService constructor to match service DI contract.

This constructor still omits settings, which diverges from the service injection rule used across backend services.

As per coding guidelines: "Services must be constructor-injected with: repo, producer, logger, metrics, settings — never instantiated directly".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/event_replay/replay_service.py` around lines 34 - 41,
The EventReplayService __init__ is missing the Settings injection required by
our DI contract; add a settings: Settings parameter to the constructor signature
for EventReplayService and store it on self (e.g., self.settings = settings),
updating any callers/instantiation sites to provide the Settings object;
reference the existing symbols ReplayRepository, UnifiedProducer, ReplayMetrics,
structlog.stdlib.BoundLogger, SSEService and ensure the constructor now accepts
and retains Settings alongside those dependencies.
♻️ Duplicate comments (3)
backend/app/services/notification_service.py (2)

145-149: ⚠️ Potential issue | 🟡 Minor

Use structured fields in throttle warning logs.

This warning still interpolates user_id directly into the message. Keep the message static and pass fields as kwargs.

🔧 Suggested fix
         if throttled:
             self.logger.warning(
-                f"Notification rate limit exceeded for user {user_id}. "
-                f"Max {self.settings.NOTIF_THROTTLE_MAX_PER_HOUR} "
-                f"per {self.settings.NOTIF_THROTTLE_WINDOW_HOURS} hour(s)"
+                "Notification rate limit exceeded",
+                user_id=user_id,
+                max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
+                window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
             )
Based on learnings: "Never log user-controlled data via string interpolation — always use structured keyword args".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/notification_service.py` around lines 145 - 149, The
warning in NotificationService uses string interpolation and logs
user-controlled user_id directly; change the self.logger.warning call in
notification_service.py (the block that references user_id and
self.settings.NOTIF_THROTTLE_MAX_PER_HOUR / NOTIF_THROTTLE_WINDOW_HOURS) to use
a static message string and pass dynamic values as structured kwargs (e.g.,
user_id, max_per_hour, window_hours) instead of embedding them into the message
so the logger records fields separately.

300-314: ⚠️ Potential issue | 🟠 Major

System notifications are still being throttled twice.

Line 301 checks throttle in _create_system_for_user, and create_notification checks again on Line 182. One notification can consume two slots and distort throttling outcomes.

🔧 Suggested fix
     async def create_notification(
         self,
         user_id: str,
         subject: str,
         body: str,
         tags: list[str],
         action_url: str,
         severity: NotificationSeverity = NotificationSeverity.MEDIUM,
         channel: NotificationChannel = NotificationChannel.IN_APP,
         scheduled_for: datetime | None = None,
         metadata: NotificationContext | None = None,
+        _skip_throttle: bool = False,
     ) -> DomainNotification:
@@
-        await self._check_throttle(user_id, severity, "general")
+        if not _skip_throttle:
+            await self._check_throttle(user_id, severity, "general")
@@
             await self.create_notification(
                 user_id=user_id,
                 subject=title,
                 body=str(base_context.get("message", "Alert")),
                 tags=tags,
                 action_url="/api/v1/notifications",
                 severity=cfg.severity,
                 channel=NotificationChannel.IN_APP,
                 metadata=base_context,
+                _skip_throttle=True,  # Already checked in this path
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/notification_service.py` around lines 300 - 314, The
code double-checks throttling in _create_system_for_user (calling
_check_throttle) and again inside create_notification, causing two throttle
slots to be consumed; fix by consolidating the check: either remove the explicit
_check_throttle call in _create_system_for_user, or (preferred) add a flag to
create_notification (e.g., skip_throttle or perform_throttle: bool = True) that
defaults to performing the throttle check, update create_notification to respect
this flag and skip its internal _check_throttle when skip_throttle=True, then
call create_notification from _create_system_for_user with the flag set so the
throttle is only evaluated once and NotificationThrottledError handling remains
correct.
backend/app/services/event_replay/replay_service.py (1)

195-200: ⚠️ Potential issue | 🔴 Critical

Guard speed multiplier before division in replay delay calculation.

time_diff / session.config.speed_multiplier can raise at runtime when multiplier is 0 (and negative values should also be rejected).

🔧 Suggested fix
     def _calculate_replay_delay(self, session: ReplaySessionState) -> float:
         """Calculate the delay before dispatching the next event based on speed multiplier."""
         next_event = self._peek_next_event(session.session_id)
-        if next_event and session.last_event_at and session.config.speed_multiplier < 100:
+        multiplier = session.config.speed_multiplier
+        if next_event and session.last_event_at and 0 < multiplier < 100:
             time_diff = (next_event.timestamp - session.last_event_at).total_seconds()
-            return max(time_diff / session.config.speed_multiplier, 0)
+            return max(time_diff / multiplier, 0)
         return 0.0

Verification: confirm ReplayConfig.speed_multiplier is constrained (gt=0) or that runtime guards exist before division.

#!/bin/bash
set -euo pipefail

# 1) Check ReplayConfig constraints for speed_multiplier
fd -t f "models.py" backend/app/domain --exec rg -n "class ReplayConfig|speed_multiplier|Field\\(|gt=|ge=" {} +

# 2) Check replay delay logic guard in service
rg -n -C3 "def _calculate_replay_delay|speed_multiplier" backend/app/services/event_replay/replay_service.py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/event_replay/replay_service.py` around lines 195 - 200,
The division by session.config.speed_multiplier in _calculate_replay_delay can
raise on zero/negative values; update the method (function
_calculate_replay_delay in replay_service.py) to explicitly guard
ReplaySessionState.config.speed_multiplier before dividing (e.g., if
speed_multiplier <= 0: log or raise a clear error and return 0 or fallback to a
safe default) so you never divide by zero; alternatively ensure
ReplayConfig.speed_multiplier has a gt=0 constraint, but if you keep runtime
protection add the check immediately after obtaining session.config and before
using it with _peek_next_event.
🧹 Nitpick comments (9)
frontend/src/lib/admin/stores/eventsStore.svelte.ts (1)

42-43: Replace inline options object types with a named interface.

Lines 42 and 240 use inline object-shape types; please switch to an interface and reuse it across constructor/factory.

As per coding guidelines: "Use interface for object shapes, type for unions/intersections/aliases in TypeScript".

Also applies to: 240-241

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/admin/stores/eventsStore.svelte.ts` around lines 42 - 43,
Define a named interface (e.g. EventsStoreOptions) for the options object
currently typed inline and replace the inline type in the constructor signature
(constructor({ autoRefresh = true }: { autoRefresh?: boolean } = {})) with that
interface; then reuse the same EventsStoreOptions interface for the
factory/creator function signature referenced around lines 240-241 so both
constructor and the factory accept the same typed options. Ensure the interface
is exported/visible where both constructor and the factory (e.g.,
createEventsStore or similar) can reference it.
frontend/src/lib/admin/stores/sagasStore.svelte.ts (1)

23-24: Use a shared options interface for autoRefresh.

Inline object-shape annotations in constructor/factory should be replaced with a named interface for consistency and guideline compliance.

As per coding guidelines: "Use interface for object shapes, type for unions/intersections/aliases in TypeScript".

Also applies to: 82-83

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/admin/stores/sagasStore.svelte.ts` around lines 23 - 24,
Define a named interface (e.g., SagasStoreOptions { autoRefresh?: boolean }) and
replace the inline object-shape annotation in the constructor signature
(currently written as constructor({ autoRefresh = true }: { autoRefresh?:
boolean } = {})) with that interface (constructor(options: SagasStoreOptions = {
autoRefresh: true }) or equivalent destructuring using SagasStoreOptions); apply
the same replacement to the other occurrence referenced (the factory/initializer
at lines 82-83) so both use the shared SagasStoreOptions interface for
consistency.
frontend/src/lib/admin/stores/executionsStore.svelte.ts (1)

26-27: Use a named interface for store options instead of inline object types.

This repeats the same object shape twice and violates the TS style rule for object shapes.

♻️ Suggested refactor
+interface ExecutionsStoreOptions {
+    autoRefresh?: boolean;
+}
+
 class ExecutionsStore {
@@
-    constructor({ autoRefresh = true }: { autoRefresh?: boolean } = {}) {
+    constructor({ autoRefresh = true }: ExecutionsStoreOptions = {}) {
@@
-export function createExecutionsStore(options?: { autoRefresh?: boolean }): ExecutionsStore {
+export function createExecutionsStore(options?: ExecutionsStoreOptions): ExecutionsStore {
     return new ExecutionsStore(options);
 }

As per coding guidelines: "Use interface for object shapes, type for unions/intersections/aliases in TypeScript".

Also applies to: 85-86

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/admin/stores/executionsStore.svelte.ts` around lines 26 -
27, Create a named interface for the store options (e.g.,
ExecutionsStoreOptions) and replace the inline object type occurrences with that
interface: update the constructor signature (the constructor({ autoRefresh =
true }: { autoRefresh?: boolean } = {})) to use the new interface and update the
other occurrence around the code referenced (lines 85-86) to accept/annotate the
same ExecutionsStoreOptions type; ensure the interface declares autoRefresh?:
boolean with the existing default behavior preserved.
backend/tests/unit/services/sse/test_sse_service.py (2)

156-180: Test imports private adapter from implementation module.

Line 162 uses _notif_adapter which is a module-private adapter. While this is acceptable in unit tests for verifying internal behavior, consider whether the test could use the public publish_notification method instead of directly calling xadd.

The current approach directly exercises the stream format, which is useful for unit testing but creates coupling to internal structure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/unit/services/sse/test_sse_service.py` around lines 156 - 180,
Test directly uses the private _notif_adapter and xadd to push a stream entry;
change the test to use the public publish_notification API instead so it
exercises the public contract rather than internal adapter details: create the
DomainNotificationSSEPayload (or equivalent public DTO) and call
publish_notification (or svc.publish_notification) to write to the stream, then
use create_notification_stream and _FakeRedis as before to assert the yielded
event; this removes coupling to _notif_adapter and xadd while keeping the same
assertions on create_notification_stream.

209-220: Late import inside test function.

Line 210 imports _replay_adapter inside the test function. For consistency with the file's other imports (lines 14-15), consider moving this import to the top of the file.

♻️ Move import to file level
 from app.services.sse import SSEService
-from app.services.sse.sse_service import _exec_adapter, _notif_adapter
+from app.services.sse.sse_service import _exec_adapter, _notif_adapter, _replay_adapter

Then remove line 210.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/unit/services/sse/test_sse_service.py` around lines 209 - 220,
The test has a late inline import of _replay_adapter inside the test function;
move the import for _replay_adapter to the module-level imports alongside the
other top imports and remove the inline import statement from the test body so
the test uses the module-level _replay_adapter import instead.
backend/tests/e2e/services/sse/test_partitioned_event_router.py (1)

49-51: Test accesses private _poll_stream method.

Similar to the notification test, this test directly calls svc._poll_stream() which is a private method. While acceptable for e2e tests that need to verify internal behavior, consider documenting this as intentional coupling for integration testing purposes.

The test otherwise correctly validates the publish-to-stream flow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/e2e/services/sse/test_partitioned_event_router.py` around lines
49 - 51, This test intentionally calls the private method svc._poll_stream(...)
to validate internal publish-to-stream behavior; update the test's docstring or
add a brief inline comment above the call to explain that accessing the private
method is deliberate for end-to-end/integration verification (i.e., intentional
coupling), referencing svc._poll_stream and the read-back lines so future
maintainers understand why the private API is used instead of a public wrapper.
backend/app/services/sse/sse_service.py (2)

72-80: Unbounded polling loop lacks cancellation-safe cleanup.

The _poll_stream method runs an infinite loop that only terminates when the caller stops iterating. This is correct for SSE streaming, but consider whether there should be any explicit cancellation handling or timeout mechanism to prevent resource leaks if consumers abandon the generator without proper cleanup.

The current pattern relies on the caller to properly close the async generator, which should work with async for and context managers, but worth noting in documentation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/sse/sse_service.py` around lines 72 - 80, The
_poll_stream async generator can leak if iteration is abandoned; update
_poll_stream to be cancellation-safe by wrapping the polling loop in a
try/finally (or try/except asyncio.CancelledError) and perform cleanup in the
finally block (e.g., release any connection/stream handles returned by
_read_after or notify any owner via an Event/close callback), and optionally
accept an optional cancellation token/timeout parameter to break the loop;
reference _poll_stream, _read_after, adapter, last_id, and _poll_interval when
making the changes so the generator reliably cleans up resources when the caller
stops iterating or the task is cancelled.
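The try/finally pattern the prompt describes can be sketched with a plain async generator; the `cleaned` list stands in for releasing real stream resources:

```python
import asyncio

cleaned: list[str] = []

async def poll_stream(key: str):
    """Yields stream items forever; cleanup is guaranteed by try/finally."""
    try:
        n = 0
        while True:
            n += 1
            yield f"{key}:{n}"
            await asyncio.sleep(0)
    finally:
        cleaned.append(key)  # runs on aclose(), GC, or task cancellation

async def main() -> list[str]:
    gen = poll_stream("sse:exec:42")
    items: list[str] = []
    async for item in gen:
        items.append(item)
        if len(items) == 2:
            break
    await gen.aclose()  # explicit close makes the finally block run deterministically
    return items

items = asyncio.run(main())
print(items, cleaned)
```

`asyncio.run` also closes abandoned async generators via `loop.shutdown_asyncgens()`, but an explicit `aclose()` (or wrapping the generator in `contextlib.aclosing`) makes the cleanup point deterministic.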

53-55: Consider adding error handling for Redis operations.

The _xpublish method makes two Redis calls (xadd and expire) without error handling. If Redis is unavailable or experiences issues, this will propagate exceptions to callers. Consider whether these should be wrapped in try/except with logging for resilience, or if propagating the exception is the intended behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/sse/sse_service.py` around lines 53 - 55, Wrap the two
Redis calls in _xpublish (self._redis.xadd and self._redis.expire) in a
try/except block to catch exceptions (e.g., Exception as e), log the failure
including the stream key, _MAXLEN and _STREAM_TTL context and the exception (use
the service logger, e.g., self._logger.error(...)), and then re-raise the
exception so callers can handle it (or, if you prefer resilience, return
gracefully after logging instead); update the _xpublish method to perform this
try/except around the xadd/expire sequence referencing _xpublish, _redis, xadd,
expire, _MAXLEN and _STREAM_TTL.
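A log-and-reraise version of the suggested wrapper, with a fake client standing in for Redis; the name `xpublish` mirrors the private `_xpublish` but is illustrative:

```python
class RedisDown(Exception):
    pass

class FakeRedis:
    """Stand-in client whose xadd always fails, to exercise the error path."""

    def xadd(self, key: str, fields: dict[str, str]) -> None:
        raise RedisDown("connection refused")

    def expire(self, key: str, ttl: int) -> None:
        pass

log_lines: list[str] = []

def xpublish(redis: FakeRedis, key: str, fields: dict[str, str], ttl: int = 3600) -> None:
    try:
        redis.xadd(key, fields)
        redis.expire(key, ttl)
    except Exception as exc:
        # Stand-in for structured logging: record context, then let callers decide.
        log_lines.append(f"stream publish failed key={key} error={exc}")
        raise

propagated = False
try:
    xpublish(FakeRedis(), "sse:notif:u1", {"data": "{}"})
except RedisDown:
    propagated = True

print(propagated, log_lines[0])
```

Re-raising keeps the caller in charge of resilience policy (retry, drop, or fail the request) while guaranteeing the failure is observable in logs either way.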
backend/tests/e2e/notifications/test_notification_sse.py (1)

36-38: Test couples to private SSE service internals.

The test directly accesses sse._poll_stream() (a private method with _ prefix) and imports _notif_adapter from the implementation module. This creates tight coupling to internal implementation details.

Consider using the public create_notification_stream() method instead, which is the intended public API for consuming notification streams. This would make the test more resilient to internal refactoring.

♻️ Suggested approach using public API
-    # Read back from stream
-    gen = sse._poll_stream(f"sse:notif:{user_id}", _notif_adapter)
-    msg: DomainNotificationSSEPayload = await asyncio.wait_for(gen.__anext__(), timeout=5.0)
+    # Read back from stream using public API
+    gen = sse.create_notification_stream(user_id=user_id)
+    raw = await asyncio.wait_for(gen.__anext__(), timeout=5.0)
+    # Parse the SSE payload
+    msg_data = json.loads(raw["data"])

This would require importing json and adjusting assertions to work with the dict/JSON structure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/e2e/notifications/test_notification_sse.py` around lines 36 -
38, The test currently couples to internals by calling sse._poll_stream and
importing _notif_adapter; replace that with the public
create_notification_stream API: call
create_notification_stream(f"sse:notif:{user_id}") to obtain the async
generator/stream, import/use the public adapter (or let
create_notification_stream provide the correct adapter), await the next event
from that generator, parse the payload as JSON/dict (import json if needed) and
adjust assertions to examine the resulting dict structure rather than
implementation-specific payload objects; update references to _poll_stream and
_notif_adapter to use create_notification_stream and the JSON/dict shape.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/tests/e2e/conftest.py`:
- Around line 40-43: Update the docstring to reflect actual behavior: instead of
saying it "Reads from '0-0' on each iteration," state that the stream read
starts from "0-0" on the first iteration and then advances using the last_id
variable after each message so subsequent reads continue from the most recent
event; reference the loop variable last_id in the docstring to make behavior
explicit.
- Around line 98-103: wait_for_notification currently reads the stream from the
start every loop and thus returns on stale entries; change it to be cursor-aware
like wait_for_sse_event by tracking a last_id cursor (initialize it to the
stream end or fetch the current last id once), pass that cursor into
_read_stream on each iteration so only entries with id > last_id are considered,
update last_id when new entries are returned and return only when a new entry is
observed; reference the wait_for_notification function and the helper
_read_stream to implement this cursor-based loop.

In `@frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts`:
- Around line 43-46: Update the afterEach teardown to call cleanup() for Vitest
hygiene: inside the existing afterEach that currently calls teardown?.() and
vi.clearAllTimers(), also invoke cleanup() (imported from the testing library)
to ensure DOM and test side-effects are fully reset; adjust the
teardown/afterEach block where teardown and vi.clearAllTimers() are referenced
so it calls cleanup() before/after teardown as appropriate.

---

Outside diff comments:
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 34-41: The EventReplayService __init__ is missing the Settings
injection required by our DI contract; add a settings: Settings parameter to the
constructor signature for EventReplayService and store it on self (e.g.,
self.settings = settings), updating any callers/instantiation sites to provide
the Settings object; reference the existing symbols ReplayRepository,
UnifiedProducer, ReplayMetrics, structlog.stdlib.BoundLogger, SSEService and
ensure the constructor now accepts and retains Settings alongside those
dependencies.

---

Duplicate comments:
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 195-200: The division by session.config.speed_multiplier in
_calculate_replay_delay can raise on zero/negative values; update the method
(function _calculate_replay_delay in replay_service.py) to explicitly guard
ReplaySessionState.config.speed_multiplier before dividing (e.g., if
speed_multiplier <= 0: log or raise a clear error and return 0 or fallback to a
safe default) so you never divide by zero; alternatively ensure
ReplayConfig.speed_multiplier has a gt=0 constraint, but if you keep runtime
protection add the check immediately after obtaining session.config and before
using it with _peek_next_event.
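The runtime guard described above can be sketched like this — the function name and fallback policy are illustrative assumptions, not the service's actual implementation:

```python
def calculate_replay_delay(original_gap_s: float, speed_multiplier: float) -> float:
    """Illustrative guard for the division in _calculate_replay_delay."""
    if speed_multiplier <= 0:
        # A non-positive multiplier would divide by zero (or flip the sign);
        # fall back to replaying at real time instead of raising mid-replay.
        return original_gap_s
    return original_gap_s / speed_multiplier


print(calculate_replay_delay(2.0, 4.0))   # 0.5 — 4x speed quarters the gap
print(calculate_replay_delay(2.0, 0.0))   # 2.0 — falls back to real time
```

Alternatively, a `gt=0` constraint on `ReplayConfig.speed_multiplier` rejects bad values at validation time; keeping the runtime check as well protects against state constructed outside the validated path.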

In `@backend/app/services/notification_service.py`:
- Around line 145-149: The warning in NotificationService uses string
interpolation and logs user-controlled user_id directly; change the
self.logger.warning call in notification_service.py (the block that references
user_id and self.settings.NOTIF_THROTTLE_MAX_PER_HOUR /
NOTIF_THROTTLE_WINDOW_HOURS) to use a static message string and pass dynamic
values as structured kwargs (e.g., user_id, max_per_hour, window_hours) instead
of embedding them into the message so the logger records fields separately.
- Around line 300-314: The code double-checks throttling in
_create_system_for_user (calling _check_throttle) and again inside
create_notification, causing two throttle slots to be consumed; fix by
consolidating the check: either remove the explicit _check_throttle call in
_create_system_for_user, or (preferred) add a flag to create_notification (e.g.,
skip_throttle or perform_throttle: bool = True) that defaults to performing the
throttle check, update create_notification to respect this flag and skip its
internal _check_throttle when skip_throttle=True, then call create_notification
from _create_system_for_user with the flag set so the throttle is only evaluated
once and NotificationThrottledError handling remains correct.
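The single-evaluation flow from the second bullet can be sketched as follows — a toy `NotificationService` whose counter stands in for real throttle slots; all names mirror the review but the bodies are illustrative, not the project's code:

```python
class NotificationThrottledError(Exception):
    pass


class NotificationService:
    """Toy model: each _check_throttle call consumes one throttle slot."""

    def __init__(self, max_per_hour: int = 2):
        self.max_per_hour = max_per_hour
        self.count = 0
        self.created: list[str] = []

    def _check_throttle(self, user_id: str) -> None:
        if self.count >= self.max_per_hour:
            raise NotificationThrottledError(user_id)
        self.count += 1

    def create_notification(self, user_id: str, *, skip_throttle: bool = False) -> None:
        if not skip_throttle:
            self._check_throttle(user_id)
        self.created.append(user_id)

    def _create_system_for_user(self, user_id: str) -> None:
        # Throttle is evaluated exactly once here...
        self._check_throttle(user_id)
        # ...so the inner call must not consume a second slot.
        self.create_notification(user_id, skip_throttle=True)


svc = NotificationService(max_per_hour=2)
svc._create_system_for_user("u1")
svc._create_system_for_user("u1")
print(svc.count, len(svc.created))  # 2 2 — one slot per notification
```

Without the flag, each notification would consume two slots, halving the effective `NOTIF_THROTTLE_MAX_PER_HOUR` budget.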

---

Nitpick comments:
In `@backend/app/services/sse/sse_service.py`:
- Around line 72-80: The _poll_stream async generator can leak if iteration is
abandoned; update _poll_stream to be cancellation-safe by wrapping the polling
loop in a try/finally (or try/except asyncio.CancelledError) and perform cleanup
in the finally block (e.g., release any connection/stream handles returned by
_read_after or notify any owner via an Event/close callback), and optionally
accept an optional cancellation token/timeout parameter to break the loop;
reference _poll_stream, _read_after, adapter, last_id, and _poll_interval when
making the changes so the generator reliably cleans up resources when the caller
stops iterating or the task is cancelled.
- Around line 53-55: Wrap the two Redis calls in _xpublish (self._redis.xadd and
self._redis.expire) in a try/except block to catch exceptions (e.g., Exception
as e), log the failure including the stream key, _MAXLEN and _STREAM_TTL context
and the exception (use the service logger, e.g., self._logger.error(...)), and
then re-raise the exception so callers can handle it (or, if you prefer
resilience, return gracefully after logging instead); update the _xpublish
method to perform this try/except around the xadd/expire sequence referencing
_xpublish, _redis, xadd, expire, _MAXLEN and _STREAM_TTL.

In `@backend/tests/e2e/notifications/test_notification_sse.py`:
- Around line 36-38: The test currently couples to internals by calling
sse._poll_stream and importing _notif_adapter; replace that with the public
create_notification_stream API: call
create_notification_stream(f"sse:notif:{user_id}") to obtain the async
generator/stream, import/use the public adapter (or let
create_notification_stream provide the correct adapter), await the next event
from that generator, parse the payload as JSON/dict (import json if needed) and
adjust assertions to examine the resulting dict structure rather than
implementation-specific payload objects; update references to _poll_stream and
_notif_adapter to use create_notification_stream and the JSON/dict shape.

In `@backend/tests/e2e/services/sse/test_partitioned_event_router.py`:
- Around line 49-51: This test intentionally calls the private method
svc._poll_stream(...) to validate internal publish-to-stream behavior; update
the test's docstring or add a brief inline comment above the call to explain
that accessing the private method is deliberate for end-to-end/integration
verification (i.e., intentional coupling), referencing svc._poll_stream and the
read-back lines so future maintainers understand why the private API is used
instead of a public wrapper.

In `@backend/tests/unit/services/sse/test_sse_service.py`:
- Around line 156-180: Test directly uses the private _notif_adapter and xadd to
push a stream entry; change the test to use the public publish_notification API
instead so it exercises the public contract rather than internal adapter
details: create the DomainNotificationSSEPayload (or equivalent public DTO) and
call publish_notification (or svc.publish_notification) to write to the stream,
then use create_notification_stream and _FakeRedis as before to assert the
yielded event; this removes coupling to _notif_adapter and xadd while keeping
the same assertions on create_notification_stream.
- Around line 209-220: The test has a late inline import of _replay_adapter
inside the test function; move the import for _replay_adapter to the
module-level imports alongside the other top imports and remove the inline
import statement from the test body so the test uses the module-level
_replay_adapter import instead.

In `@frontend/src/lib/admin/stores/eventsStore.svelte.ts`:
- Around line 42-43: Define a named interface (e.g. EventsStoreOptions) for the
options object currently typed inline and replace the inline type in the
constructor signature (constructor({ autoRefresh = true }: { autoRefresh?:
boolean } = {})) with that interface; then reuse the same EventsStoreOptions
interface for the factory/creator function signature referenced around lines
240-241 so both constructor and the factory accept the same typed options.
Ensure the interface is exported/visible where both constructor and the factory
(e.g., createEventsStore or similar) can reference it.

In `@frontend/src/lib/admin/stores/executionsStore.svelte.ts`:
- Around line 26-27: Create a named interface for the store options (e.g.,
ExecutionsStoreOptions) and replace the inline object type occurrences with that
interface: update the constructor signature (the constructor({ autoRefresh =
true }: { autoRefresh?: boolean } = {})) to use the new interface and update the
other occurrence around the code referenced (lines 85-86) to accept/annotate the
same ExecutionsStoreOptions type; ensure the interface declares autoRefresh?:
boolean with the existing default behavior preserved.

In `@frontend/src/lib/admin/stores/sagasStore.svelte.ts`:
- Around line 23-24: Define a named interface (e.g., SagasStoreOptions {
autoRefresh?: boolean }) and replace the inline object-shape annotation in the
constructor signature (currently written as constructor({ autoRefresh = true }:
{ autoRefresh?: boolean } = {})) with that interface (constructor(options:
SagasStoreOptions = { autoRefresh: true }) or equivalent destructuring using
SagasStoreOptions); apply the same replacement to the other occurrence
referenced (the factory/initializer at lines 82-83) so both use the shared
SagasStoreOptions interface for consistency.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 45975b93-5730-4a2e-8097-cbcf155bc2f5

📥 Commits

Reviewing files that changed from the base of the PR and between 22b09ba and 99f2c0f.

📒 Files selected for processing (21)
  • backend/app/core/providers.py
  • backend/app/events/handlers.py
  • backend/app/services/event_replay/replay_service.py
  • backend/app/services/notification_service.py
  • backend/app/services/sse/__init__.py
  • backend/app/services/sse/redis_bus.py
  • backend/app/services/sse/sse_service.py
  • backend/tests/e2e/conftest.py
  • backend/tests/e2e/core/test_dishka_lifespan.py
  • backend/tests/e2e/notifications/test_notification_sse.py
  • backend/tests/e2e/services/sse/test_partitioned_event_router.py
  • backend/tests/unit/services/sse/test_redis_bus.py
  • backend/tests/unit/services/sse/test_sse_publish.py
  • backend/tests/unit/services/sse/test_sse_service.py
  • docs/architecture/lifecycle.md
  • frontend/src/lib/admin/stores/__tests__/eventsStore.test.ts
  • frontend/src/lib/admin/stores/__tests__/executionsStore.test.ts
  • frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts
  • frontend/src/lib/admin/stores/eventsStore.svelte.ts
  • frontend/src/lib/admin/stores/executionsStore.svelte.ts
  • frontend/src/lib/admin/stores/sagasStore.svelte.ts
💤 Files with no reviewable changes (2)
  • backend/app/services/sse/redis_bus.py
  • backend/tests/unit/services/sse/test_redis_bus.py

Comment on lines +40 to 43
"""Poll execution's Redis Stream and await matching event.

The SSE bridge publishes all execution lifecycle events to
sse:exec:{execution_id}. Pure event-driven — no polling.
Reads from "0-0" on each iteration so late subscribers never miss events.
"""

⚠️ Potential issue | 🟡 Minor

Docstring behavior is stale vs implementation.

Line 42 says reads start from "0-0" on each iteration, but the loop advances last_id after each message. Please update the wording to match actual behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/e2e/conftest.py` around lines 40 - 43, Update the docstring to
reflect actual behavior: instead of saying it "Reads from '0-0' on each
iteration," state that the stream read starts from "0-0" on the first iteration
and then advances using the last_id variable after each message so subsequent
reads continue from the most recent event; reference the loop variable last_id
in the docstring to make behavior explicit.

⚠️ Potential issue | 🟡 Minor

Updated docstrings are not in Google style.

Please convert these docstrings to include structured sections (at least Args/Returns, and Raises where relevant).

As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections".

Also applies to: 91-97

Comment on lines +98 to +103
    key = f"sse:notif:{user_id}"
    async with asyncio.timeout(timeout):
-        async with redis_client.pubsub(ignore_subscribe_messages=True) as pubsub:
-            await pubsub.subscribe(channel)
-            async for _message in pubsub.listen():
-                return  # first message = notification persisted
+        while True:
+            if await _read_stream(redis_client, key):
+                return
+            await asyncio.sleep(0.1)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Examine the notification waiting logic
rg -n -C3 'async def wait_for_notification|_read_stream\(redis_client, key\)|create_execution_with_notification|wait_for_notification\(' backend/tests/e2e/conftest.py

Repository: HardMax71/Integr8sCode

Length of output: 925


🏁 Script executed:

#!/bin/bash
# Find and examine _read_stream function definition
rg -n -B2 -A10 'async def _read_stream' backend/tests/e2e/conftest.py

Repository: HardMax71/Integr8sCode

Length of output: 417


🏁 Script executed:

#!/bin/bash
# Check for stream cleanup/setup in the conftest file
rg -n 'asyncio.run\|redis_client.delete\|FLUSHDB\|xdel\|xtrim\|fixture\|setUp\|tearDown' backend/tests/e2e/conftest.py | head -30

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

#!/bin/bash
# Read the entire conftest.py to understand full context
wc -l backend/tests/e2e/conftest.py

Repository: HardMax71/Integr8sCode

Length of output: 99


🏁 Script executed:

#!/bin/bash
# Read the entire conftest.py file
cat -n backend/tests/e2e/conftest.py

Repository: HardMax71/Integr8sCode

Length of output: 8714


wait_for_notification uses stale stream entries instead of waiting for new ones.

The function reads from "0-0" (stream start) on every iteration, so any pre-existing entries in the stream satisfy the wait immediately. Compare with wait_for_sse_event (lines 33–54), which correctly tracks cursor position with last_id to avoid replay. This causes tests to pass incorrectly when the notification stream contains old entries.

Suggested fix (cursor-aware wait)
 async def wait_for_notification(
     redis_client: redis.Redis,
     user_id: str,
     *,
+    after: str = "0-0",
     timeout: float = 30.0,
 ) -> None:
     """Wait for a notification on the user's Redis Stream.
 
     The notification service publishes to sse:notif:{user_id} only after
     persisting to MongoDB, so receiving a message is a correct readiness
     signal — unlike RESULT_STORED which comes from an independent consumer
     group with no ordering guarantee.
     """
     key = f"sse:notif:{user_id}"
+    last_id = after
     async with asyncio.timeout(timeout):
         while True:
-            if await _read_stream(redis_client, key):
+            if await _read_stream(redis_client, key, last_id):
                 return
             await asyncio.sleep(0.1)
 async def create_execution_with_notification(
     client: AsyncClient,
     redis_client: redis.Redis,
     request: ExecutionRequest,
     *,
     timeout: float = 30.0,
 ) -> tuple[ExecutionResponse, NotificationResponse]:
     """Create execution, wait for notification delivery, return (execution, notification).
 
     Fetches user_id from /auth/me, waits on sse:notif:{user_id} (correct
     signal — notification service publishes after MongoDB persist), then
     queries the notification list.
     """
     me_resp = await client.get("/api/v1/auth/me")
     assert me_resp.status_code == 200
     user_id = me_resp.json()["user_id"]
 
+    key = f"sse:notif:{user_id}"
+    existing = await _read_stream(redis_client, key, "0-0")
+    after = existing[-1][0] if existing else "0-0"
+
     execution = await create_execution(client, request)
-    await wait_for_notification(redis_client, user_id, timeout=timeout)
+    await wait_for_notification(redis_client, user_id, after=after, timeout=timeout)
 
     resp = await client.get("/api/v1/notifications", params={"limit": 10})
     assert resp.status_code == 200
     result = NotificationListResponse.model_validate(resp.json())
     assert result.notifications, "No notification after SSE delivery"
     return execution, result.notifications[0]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/e2e/conftest.py` around lines 98 - 103, wait_for_notification
currently reads the stream from the start every loop and thus returns on stale
entries; change it to be cursor-aware like wait_for_sse_event by tracking a
last_id cursor (initialize it to the stream end or fetch the current last id
once), pass that cursor into _read_stream on each iteration so only entries with
id > last_id are considered, update last_id when new entries are returned and
return only when a new entry is observed; reference the wait_for_notification
function and the helper _read_stream to implement this cursor-based loop.

Comment on lines 43 to 46
afterEach(() => {
teardown?.();
vi.clearAllTimers();
});

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts | head -60

Repository: HardMax71/Integr8sCode

Length of output: 2342


🏁 Script executed:

# Check if cleanup is imported anywhere in the file
rg "cleanup" frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

# Check the beforeEach structure to see if vi.clearAllMocks() exists
rg "beforeEach|afterEach" -A 5 frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts

Repository: HardMax71/Integr8sCode

Length of output: 613


🏁 Script executed:

rg "render\(|@testing-library" frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

# Also check the full afterEach and surrounding context
wc -l frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts

Repository: HardMax71/Integr8sCode

Length of output: 128


🏁 Script executed:

# Get the complete file to understand test scope
cat frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts

Repository: HardMax71/Integr8sCode

Length of output: 8408


Add cleanup() in afterEach for Vitest test hygiene.

The afterEach hook is missing the cleanup() call required by the frontend test guideline. While this store test doesn't render components, cleanup() should still be called for consistency with the testing standards.

🧪 Suggested patch
+import { cleanup } from '@testing-library/svelte';
@@
     afterEach(() => {
+        cleanup();
         teardown?.();
         vi.clearAllTimers();
     });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts` around lines 43 -
46, Update the afterEach teardown to call cleanup() for Vitest hygiene: inside
the existing afterEach that currently calls teardown?.() and
vi.clearAllTimers(), also invoke cleanup() (imported from the testing library)
to ensure DOM and test side-effects are fully reset; adjust the
teardown/afterEach block where teardown and vi.clearAllTimers() are referenced
so it calls cleanup() before/after teardown as appropriate.

@sonarqubecloud
Copy link

sonarqubecloud bot commented Mar 5, 2026
