Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/stack-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
timeout-minutes: 5
run: |
cd backend
uv run pytest tests/unit -v -rs \
uv run --no-sync pytest tests/unit -v -rs \
--durations=0 \
--cov=app \
--cov-report=xml --cov-report=term
Expand Down
30 changes: 14 additions & 16 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from app.services.runtime_settings import RuntimeSettingsLoader
from app.services.saga import SagaOrchestrator, SagaService
from app.services.saved_script_service import SavedScriptService
from app.services.sse import SSERedisBus, SSEService
from app.services.sse import SSEService
from app.services.user_settings_service import UserSettingsService
from app.settings import Settings

Expand Down Expand Up @@ -131,6 +131,7 @@ async def get_redis_client(
decode_responses=settings.REDIS_DECODE_RESPONSES,
socket_connect_timeout=5,
socket_timeout=5,
socket_keepalive=True,
)
# Test connection
await client.ping() # type: ignore[misc] # redis-py returns Awaitable[bool] | bool
Expand Down Expand Up @@ -351,22 +352,19 @@ class SSEProvider(Provider):
scope = Scope.APP

@provide
def get_sse_redis_bus(
def get_sse_service(
self,
redis_client: redis.Redis,
execution_repository: ExecutionRepository,
logger: structlog.stdlib.BoundLogger,
connection_metrics: ConnectionMetrics,
) -> SSERedisBus:
return SSERedisBus(redis_client, logger, connection_metrics)

@provide
def get_sse_service(
self,
bus: SSERedisBus,
execution_repository: ExecutionRepository,
logger: structlog.stdlib.BoundLogger,
) -> SSEService:
return SSEService(bus=bus, execution_repository=execution_repository, logger=logger)
return SSEService(
redis_client=redis_client,
execution_repository=execution_repository,
logger=logger,
connection_metrics=connection_metrics,
)


class AuthProvider(Provider):
Expand Down Expand Up @@ -483,15 +481,15 @@ def get_notification_service(
self,
notification_repository: NotificationRepository,
kafka_event_service: KafkaEventService,
sse_redis_bus: SSERedisBus,
sse_service: SSEService,
settings: Settings,
logger: structlog.stdlib.BoundLogger,
notification_metrics: NotificationMetrics,
) -> NotificationService:
return NotificationService(
notification_repository=notification_repository,
event_service=kafka_event_service,
sse_bus=sse_redis_bus,
sse_service=sse_service,
settings=settings,
logger=logger,
notification_metrics=notification_metrics,
Expand Down Expand Up @@ -731,12 +729,12 @@ def get_event_replay_service(
kafka_producer: UnifiedProducer,
replay_metrics: ReplayMetrics,
logger: structlog.stdlib.BoundLogger,
sse_bus: SSERedisBus,
sse_service: SSEService,
) -> EventReplayService:
return EventReplayService(
repository=replay_repository,
producer=kafka_producer,
replay_metrics=replay_metrics,
logger=logger,
sse_bus=sse_bus,
sse_service=sse_service,
)
4 changes: 0 additions & 4 deletions backend/app/db/docs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from app.db.docs.saved_script import SavedScriptDocument
from app.db.docs.user import UserDocument
from app.db.docs.user_settings import (
EditorSettings,
NotificationSettings,
UserSettingsDocument,
UserSettingsSnapshotDocument,
)
Expand Down Expand Up @@ -60,8 +58,6 @@
# User Settings
"UserSettingsDocument",
"UserSettingsSnapshotDocument",
"NotificationSettings",
"EditorSettings",
# Saga
"SagaDocument",
# DLQ
Expand Down
5 changes: 1 addition & 4 deletions backend/app/db/docs/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@


class SagaDocument(Document):
"""Domain model for saga stored in database.

Copied from Saga/SagaInstance dataclass.
"""
"""Domain model for saga stored in database."""

saga_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
saga_name: Indexed(str) # type: ignore[valid-type]
Expand Down
31 changes: 4 additions & 27 deletions backend/app/db/docs/user_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@
from typing import Any

from beanie import Document, Indexed
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field

from app.domain.enums import NotificationChannel, Theme


class NotificationSettings(BaseModel):
"""User notification preferences (embedded document).

Copied from user_settings.py NotificationSettings.
"""
"""User notification preferences (embedded document)."""

model_config = ConfigDict(from_attributes=True)

Expand All @@ -23,10 +20,7 @@ class NotificationSettings(BaseModel):


class EditorSettings(BaseModel):
"""Code editor preferences (embedded document).

Copied from user_settings.py EditorSettings.
"""
"""Code editor preferences (embedded document)."""

model_config = ConfigDict(from_attributes=True)

Expand All @@ -36,26 +30,9 @@ class EditorSettings(BaseModel):
word_wrap: bool = True
show_line_numbers: bool = True

@field_validator("font_size")
@classmethod
def validate_font_size(cls, v: int) -> int:
if v < 8 or v > 32:
raise ValueError("Font size must be between 8 and 32")
return v

@field_validator("tab_size")
@classmethod
def validate_tab_size(cls, v: int) -> int:
if v not in (2, 4, 8):
raise ValueError("Tab size must be 2, 4, or 8")
return v


class UserSettingsDocument(Document):
"""Complete user settings model.

Copied from UserSettings schema.
"""
"""Complete user settings model."""

user_id: Indexed(str, unique=True) # type: ignore[valid-type]
theme: Theme = Theme.AUTO
Expand Down
28 changes: 12 additions & 16 deletions backend/app/domain/events/typed.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ class BaseEvent(BaseModel):
metadata: EventMetadata


# --- Execution Events ---
# --- Execution Spec (shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---


class ExecutionRequestedEvent(BaseEvent):
event_type: Literal[EventType.EXECUTION_REQUESTED] = EventType.EXECUTION_REQUESTED
class ExecutionSpec(BaseModel):
"""Shared execution specification fields (mixin for ExecutionRequestedEvent and CreatePodCommandEvent)."""

execution_id: str
script: str
language: str
Expand All @@ -78,6 +79,13 @@ class ExecutionRequestedEvent(BaseEvent):
priority: QueuePriority = QueuePriority.NORMAL


# --- Execution Events ---


class ExecutionRequestedEvent(BaseEvent, ExecutionSpec):
event_type: Literal[EventType.EXECUTION_REQUESTED] = EventType.EXECUTION_REQUESTED


class ExecutionAcceptedEvent(BaseEvent):
event_type: Literal[EventType.EXECUTION_ACCEPTED] = EventType.EXECUTION_ACCEPTED
execution_id: str
Expand Down Expand Up @@ -413,22 +421,10 @@ class SagaCompensatedEvent(BaseEvent):
# --- Saga Command Events ---


class CreatePodCommandEvent(BaseEvent):
class CreatePodCommandEvent(BaseEvent, ExecutionSpec):
event_type: Literal[EventType.CREATE_POD_COMMAND] = EventType.CREATE_POD_COMMAND
saga_id: str
execution_id: str
script: str
language: str
language_version: str
runtime_image: str
runtime_command: list[str] = Field(default_factory=list)
runtime_filename: str
timeout_seconds: int
cpu_limit: str
memory_limit: str
cpu_request: str
memory_request: str
priority: QueuePriority = QueuePriority.NORMAL


class DeletePodCommandEvent(BaseEvent):
Expand Down
2 changes: 0 additions & 2 deletions backend/app/domain/saga/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
SagaConfig,
SagaContextData,
SagaFilter,
SagaInstance,
SagaListResult,
SagaQuery,
)
Expand All @@ -25,7 +24,6 @@
"SagaCancellationResult",
"SagaConfig",
"SagaContextData",
"SagaInstance",
"SagaFilter",
"SagaListResult",
"SagaQuery",
Expand Down
19 changes: 0 additions & 19 deletions backend/app/domain/saga/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,6 @@ class SagaConfig:
publish_commands: bool = True


@dataclass
class SagaInstance:
"""Runtime instance of a saga execution (domain)."""

saga_name: str
execution_id: str
state: SagaState = SagaState.CREATED
saga_id: str = field(default_factory=lambda: str(uuid4()))
current_step: str | None = None
completed_steps: list[str] = field(default_factory=list)
compensated_steps: list[str] = field(default_factory=list)
context_data: SagaContextData = field(default_factory=SagaContextData)
error_message: str | None = None
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
completed_at: datetime | None = None
retry_count: int = 0


@dataclass
class SagaCancellationResult:
"""Domain result for saga cancellation operations."""
Expand Down
6 changes: 3 additions & 3 deletions backend/app/events/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from app.services.notification_service import NotificationService
from app.services.result_processor import ResultProcessor
from app.services.saga import SagaOrchestrator
from app.services.sse import SSERedisBus
from app.services.sse import SSEService
from app.settings import Settings

_sse_field_names: frozenset[str] = frozenset(f.name for f in dataclasses.fields(SSEExecutionEventData))
Expand Down Expand Up @@ -261,14 +261,14 @@ def register_sse_subscriber(broker: KafkaBroker, settings: Settings) -> None:
)
async def on_sse_event(
body: DomainEvent,
sse_bus: FromDishka[SSERedisBus],
sse_service: FromDishka[SSEService],
) -> None:
execution_id = getattr(body, "execution_id", None)
if execution_id:
sse_data = SSEExecutionEventData(**{
k: v for k, v in body.model_dump().items() if k in _sse_field_names
})
await sse_bus.publish_event(execution_id, sse_data)
await sse_service.publish_event(execution_id, sse_data)


def register_notification_subscriber(broker: KafkaBroker) -> None:
Expand Down
16 changes: 1 addition & 15 deletions backend/app/schemas_pydantic/user_settings.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime, timezone
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field

from app.domain.enums import EventType, NotificationChannel, Theme

Expand Down Expand Up @@ -29,20 +29,6 @@ class EditorSettings(BaseModel):
word_wrap: bool = True
show_line_numbers: bool = True

@field_validator("font_size")
@classmethod
def validate_font_size(cls, v: int) -> int:
if v < 8 or v > 32:
raise ValueError("Font size must be between 8 and 32")
return v

@field_validator("tab_size")
@classmethod
def validate_tab_size(cls, v: int) -> int:
if v not in (2, 4, 8):
raise ValueError("Tab size must be 2, 4, or 8")
return v


class UserSettings(BaseModel):
"""Complete user settings model"""
Expand Down
Loading
Loading