diff --git a/backend/app/api/routes/execution.py b/backend/app/api/routes/execution.py index 03a17f50..af8736fa 100644 --- a/backend/app/api/routes/execution.py +++ b/backend/app/api/routes/execution.py @@ -152,8 +152,8 @@ async def get_execution_events( limit: Annotated[int, Query(ge=1, le=1000)] = 100, ) -> list[DomainEvent]: """Get all events for an execution.""" - events = await event_service.get_events_by_aggregate( - aggregate_id=execution.execution_id, event_types=event_types, limit=limit + events = await event_service.get_events_by_execution_id( + execution_id=execution.execution_id, event_types=event_types, limit=limit ) return events diff --git a/backend/app/db/repositories/event_repository.py b/backend/app/db/repositories/event_repository.py index f9b9d54a..9a510e77 100644 --- a/backend/app/db/repositories/event_repository.py +++ b/backend/app/db/repositories/event_repository.py @@ -83,6 +83,17 @@ async def get_events_by_aggregate( ) return [DomainEventAdapter.validate_python(d) for d in docs] + async def get_events_by_execution_id( + self, execution_id: str, event_types: list[EventType] | None = None, limit: int = 100 + ) -> list[DomainEvent]: + conditions: list[BaseFindOperator] = [Eq(EventDocument.execution_id, execution_id)] + if event_types: + conditions.append(In(EventDocument.event_type, event_types)) + docs = ( + await EventDocument.find(*conditions).sort([("timestamp", SortDirection.ASCENDING)]).limit(limit).to_list() + ) + return [DomainEventAdapter.validate_python(d) for d in docs] + async def get_execution_events( self, execution_id: str, diff --git a/backend/app/services/event_service.py b/backend/app/services/event_service.py index 9a27a9b7..e99fd8af 100644 --- a/backend/app/services/event_service.py +++ b/backend/app/services/event_service.py @@ -120,3 +120,15 @@ async def get_events_by_aggregate( event_types=event_types, limit=limit, ) + + async def get_events_by_execution_id( + self, + execution_id: str, + event_types: list[EventType] | None = None, + limit: int = 100, + ) -> list[DomainEvent]: + return await self.repository.get_events_by_execution_id( + execution_id=execution_id, + event_types=event_types, + limit=limit, + ) diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py index 8b56ab56..c414a5cf 100644 --- a/backend/app/services/k8s_worker/worker.py +++ b/backend/app/services/k8s_worker/worker.py @@ -96,10 +96,7 @@ async def handle_delete_pod_command(self, command: DeletePodCommandEvent) -> Non self.logger.info(f"Successfully deleted pod {pod_name} (ConfigMap will be GC'd by K8s)") except ApiException as e: - if e.status == 404: - self.logger.warning(f"Pod for execution {execution_id} not found (may have already been deleted)") - else: - self.logger.error(f"Failed to delete pod for execution {execution_id}: {e}") + self.logger.warning("Failed to delete pod", execution_id=execution_id, status=e.status, reason=e.reason) async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> None: """Create pod for execution""" @@ -226,12 +223,13 @@ async def _set_configmap_owner( f"Set ownerReference on ConfigMap {config_map.metadata.name} -> Pod {owner_pod.metadata.name}" ) except ApiException as e: - self.logger.warning(f"Failed to set ownerReference on ConfigMap: {e.reason}") + self.logger.warning("Failed to set ownerReference on ConfigMap", reason=e.reason) async def _publish_pod_created(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None: """Publish pod created event""" event = PodCreatedEvent( execution_id=command.execution_id, + aggregate_id=command.aggregate_id, pod_name=pod.metadata.name, namespace=pod.metadata.namespace, metadata=command.metadata, @@ -242,6 +240,7 @@ async def _publish_pod_creation_failed(self, command: CreatePodCommandEvent, err """Publish pod creation failed event""" event = ExecutionFailedEvent( execution_id=command.execution_id, + aggregate_id=command.aggregate_id, error_type=ExecutionErrorType.SYSTEM_ERROR, exit_code=-1, stderr=f"Failed to create pod: {error}", diff --git a/backend/app/services/pod_monitor/__init__.py b/backend/app/services/pod_monitor/__init__.py index 3887442e..bf662cff 100644 --- a/backend/app/services/pod_monitor/__init__.py +++ b/backend/app/services/pod_monitor/__init__.py @@ -1,5 +1,5 @@ from app.services.pod_monitor.config import PodMonitorConfig -from app.services.pod_monitor.event_mapper import PodContext, PodEventMapper, WatchEventType +from app.services.pod_monitor.event_mapper import PodContext, PodEventMapper, PodMonitorEvent, WatchEventType from app.services.pod_monitor.monitor import ErrorType, PodMonitor __all__ = [ @@ -8,5 +8,6 @@ "PodEventMapper", "PodMonitor", "PodMonitorConfig", + "PodMonitorEvent", "WatchEventType", ] diff --git a/backend/app/services/pod_monitor/event_mapper.py b/backend/app/services/pod_monitor/event_mapper.py index 74b98039..107eaafb 100644 --- a/backend/app/services/pod_monitor/event_mapper.py +++ b/backend/app/services/pod_monitor/event_mapper.py @@ -9,7 +9,6 @@ from app.domain.enums import ExecutionErrorType from app.domain.events import ( ContainerStatusInfo, - DomainEvent, EventMetadata, ExecutionCompletedEvent, ExecutionFailedEvent, @@ -22,8 +21,12 @@ # Python 3.12 type aliases type PodPhase = str -type EventList = list[DomainEvent] -type AsyncMapper = Callable[["PodContext"], Awaitable[DomainEvent | None]] +type PodMonitorEvent = ( + PodScheduledEvent | PodRunningEvent | PodTerminatedEvent + | ExecutionCompletedEvent | ExecutionFailedEvent | ExecutionTimeoutEvent +) +type EventList = list[PodMonitorEvent] +type AsyncMapper = Callable[["PodContext"], Awaitable[PodMonitorEvent | None]] class WatchEventType(StringEnum): @@ -114,7 +117,7 @@ async def map_pod_event(self, pod: k8s_client.V1Pod, event_type: WatchEventType) ) # Collect events from mappers - events: list[DomainEvent] = [] + events: list[PodMonitorEvent] = [] # Check for timeout first - if pod timed out, only return timeout event if timeout_event := await self._check_timeout(ctx): @@ -218,6 +221,7 @@ async def _map_scheduled(self, ctx: PodContext) -> PodScheduledEvent | None: evt = PodScheduledEvent( execution_id=ctx.execution_id, + aggregate_id=ctx.execution_id, pod_name=ctx.pod.metadata.name, node_name=ctx.pod.spec.node_name or "pending", metadata=ctx.metadata, @@ -243,6 +247,7 @@ async def _map_running(self, ctx: PodContext) -> PodRunningEvent | None: evt = PodRunningEvent( execution_id=ctx.execution_id, + aggregate_id=ctx.execution_id, pod_name=ctx.pod.metadata.name, container_statuses=container_statuses, metadata=ctx.metadata, @@ -273,7 +278,7 @@ async def _map_completed(self, ctx: PodContext) -> ExecutionCompletedEvent | Non self.logger.info(f"POD-EVENT: mapped completed exec={ctx.execution_id} exit_code={logs.exit_code}") return evt - async def _map_failed_or_completed(self, ctx: PodContext) -> DomainEvent | None: + async def _map_failed_or_completed(self, ctx: PodContext) -> PodMonitorEvent | None: """Map failed pod to either timeout, completed, or failed""" if ctx.pod.status and ctx.pod.status.reason == "DeadlineExceeded": if timeout_event := await self._check_timeout(ctx): @@ -318,6 +323,7 @@ async def _map_terminated(self, ctx: PodContext) -> PodTerminatedEvent | None: terminated = container.state.terminated evt = PodTerminatedEvent( execution_id=ctx.execution_id, + aggregate_id=ctx.execution_id, pod_name=ctx.pod.metadata.name, exit_code=terminated.exit_code, reason=terminated.reason or "Terminated", diff --git a/backend/app/services/pod_monitor/monitor.py b/backend/app/services/pod_monitor/monitor.py index 93881d7b..746800d6 100644 --- a/backend/app/services/pod_monitor/monitor.py +++ b/backend/app/services/pod_monitor/monitor.py @@ -11,10 +11,9 @@ from app.core.metrics import KubernetesMetrics from app.core.utils import StringEnum from app.domain.enums import EventType -from app.domain.events import DomainEvent from app.services.kafka_event_service import KafkaEventService from app.services.pod_monitor.config import PodMonitorConfig -from app.services.pod_monitor.event_mapper import PodEventMapper, WatchEventType +from app.services.pod_monitor.event_mapper import PodEventMapper, PodMonitorEvent, WatchEventType _TERMINAL_EVENT_TYPES: frozenset[str] = frozenset({ EventType.EXECUTION_COMPLETED, @@ -22,11 +21,6 @@ EventType.EXECUTION_TIMEOUT, }) -# Type aliases -type ResourceVersion = str -type KubeEvent = dict[str, Any] - - class ErrorType(StringEnum): """Error types for metrics.""" @@ -42,7 +36,6 @@ class PodEvent: event_type: WatchEventType pod: k8s_client.V1Pod - resource_version: ResourceVersion | None class PodMonitor: @@ -79,12 +72,12 @@ def __init__( self._kafka_event_service = kafka_event_service # Watch cursor — set from LIST on first run or after 410 Gone - self._last_resource_version: ResourceVersion | None = None + self._last_resource_version: str | None = None # Metrics self._metrics = kubernetes_metrics - self.logger.info(f"PodMonitor initialized for namespace {config.namespace}") + self.logger.info("PodMonitor initialized", namespace=config.namespace) async def watch_pod_events(self) -> None: """Run a single bounded K8s watch stream using list-then-watch. @@ -100,8 +93,9 @@ async def watch_pod_events(self) -> None: await self._list_and_process_existing_pods() self.logger.info( - f"Starting pod watch from rv={self._last_resource_version}, " - f"selector: {self.config.label_selector}" + "Starting pod watch", + rv=self._last_resource_version, + selector=self.config.label_selector, ) kwargs: dict[str, Any] = { @@ -114,12 +108,14 @@ async def watch_pod_events(self) -> None: if self.config.field_selector: kwargs["field_selector"] = self.config.field_selector - async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs): - await self._process_raw_event(event) - - # Store resource version from watch for next iteration - if self._watch.resource_version: - self._last_resource_version = self._watch.resource_version + try: + async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs): + if event["type"] not in WatchEventType: + continue + await self._process_pod_event(PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"])) + finally: + if self._watch.resource_version: + self._last_resource_version = self._watch.resource_version async def _list_and_process_existing_pods(self) -> None: """LIST all matching pods to bootstrap state and get a resource_version cursor.""" @@ -133,82 +129,55 @@ async def _list_and_process_existing_pods(self) -> None: pod_list = await self._v1.list_namespaced_pod(**kwargs) for pod in pod_list.items: - event = PodEvent( - event_type=WatchEventType.ADDED, - pod=pod, - resource_version=pod.metadata.resource_version if pod.metadata else None, - ) + event = PodEvent(event_type=WatchEventType.ADDED, pod=pod) await self._process_pod_event(event) # Use the list's resource_version as the authoritative cursor self._last_resource_version = pod_list.metadata.resource_version self.logger.info( - f"Listed {len(pod_list.items)} existing pods, rv={self._last_resource_version}" + "Listed existing pods", + count=len(pod_list.items), + rv=self._last_resource_version, ) - async def _process_raw_event(self, raw_event: KubeEvent) -> None: - """Process a raw Kubernetes watch event.""" - try: - event = PodEvent( - event_type=WatchEventType(raw_event["type"].upper()), - pod=raw_event["object"], - resource_version=( - raw_event["object"].metadata.resource_version if raw_event["object"].metadata else None - ), - ) - - await self._process_pod_event(event) - - except (KeyError, ValueError) as e: - self.logger.error(f"Invalid event format: {e}") - self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) - async def _process_pod_event(self, event: PodEvent) -> None: """Process a pod event.""" start_time = time.time() - try: - # Update resource version for crash recovery - if event.resource_version: - self._last_resource_version = event.resource_version - - # Skip ignored phases - pod_phase = event.pod.status.phase if event.pod.status else None - if pod_phase in self.config.ignored_pod_phases: - return - - pod_name = event.pod.metadata.name + # Skip ignored phases + pod_phase = event.pod.status.phase if event.pod.status else None + if pod_phase in self.config.ignored_pod_phases: + return - # Map to application events + try: app_events = await self._event_mapper.map_pod_event(event.pod, event.event_type) - for app_event in app_events: await self._publish_event(app_event, event.pod) - - if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events): - await self._delete_pod(event.pod) - - if app_events: - self.logger.info( - f"Processed {event.event_type} event for pod {pod_name} " - f"(phase: {pod_phase or 'Unknown'}), " - f"published {len(app_events)} events" - ) - + except Exception: + self.logger.error("Error processing pod event", exc_info=True) + self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) + return + finally: duration = time.time() - start_time self._metrics.record_pod_monitor_event_processing_duration(duration, event.event_type) - except Exception as e: - self.logger.error(f"Error processing pod event: {e}", exc_info=True) - self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) + if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events): + await self._delete_pod(event.pod) + + if app_events: + pod_name = event.pod.metadata.name if event.pod.metadata else "unknown" + self.logger.info( + "Processed pod event", + event_type=event.event_type, + pod_name=pod_name, + phase=pod_phase or "Unknown", + published=len(app_events), + ) - async def _publish_event(self, event: DomainEvent, pod: k8s_client.V1Pod) -> None: + async def _publish_event(self, event: PodMonitorEvent, pod: k8s_client.V1Pod) -> None: """Publish event to Kafka and store in events collection.""" - execution_id = getattr(event, "execution_id", None) or event.aggregate_id - key = str(execution_id or (pod.metadata.name if pod.metadata else "unknown")) - - await self._kafka_event_service.publish_event(event=event, key=key) + await self._kafka_event_service.publish_event(event=event, key=event.execution_id) phase = pod.status.phase if pod.status else "Unknown" self._metrics.record_pod_monitor_event_published(event.event_type, phase) @@ -224,9 +193,6 @@ async def _delete_pod(self, pod: k8s_client.V1Pod) -> None: await self._v1.delete_namespaced_pod( name=pod_name, namespace=pod.metadata.namespace, grace_period_seconds=0, ) - self.logger.info(f"Deleted completed pod {pod_name}") + self.logger.info("Deleted completed pod", pod_name=pod_name) except ApiException as e: - if e.status == 404: - self.logger.debug(f"Pod {pod_name} already deleted") - else: - self.logger.warning(f"Failed to delete pod {pod_name}: {e.reason}") + self.logger.warning("Failed to delete pod", pod_name=pod_name, status=e.status, reason=e.reason) diff --git a/backend/app/services/result_processor/processor.py b/backend/app/services/result_processor/processor.py index 8a7371e3..59edb36d 100644 --- a/backend/app/services/result_processor/processor.py +++ b/backend/app/services/result_processor/processor.py @@ -76,10 +76,10 @@ async def handle_execution_completed(self, event: DomainEvent) -> None: meta = event.metadata try: await self._execution_repo.write_terminal_result(result) - await self._publish_result_stored(result, meta.user_id) + await self._publish_result_stored(result, meta.user_id, event.aggregate_id) except Exception as e: self.logger.error(f"Failed to handle ExecutionCompletedEvent: {e}", exc_info=True) - await self._publish_result_failed(event.execution_id, str(e), meta.user_id) + await self._publish_result_failed(event.execution_id, str(e), meta.user_id, event.aggregate_id) async def handle_execution_failed(self, event: DomainEvent) -> None: """Handle execution failed event.""" @@ -108,10 +108,10 @@ async def handle_execution_failed(self, event: DomainEvent) -> None: meta = event.metadata try: await self._execution_repo.write_terminal_result(result) - await self._publish_result_stored(result, meta.user_id) + await self._publish_result_stored(result, meta.user_id, event.aggregate_id) except Exception as e: self.logger.error(f"Failed to handle ExecutionFailedEvent: {e}", exc_info=True) - await self._publish_result_failed(event.execution_id, str(e), meta.user_id) + await self._publish_result_failed(event.execution_id, str(e), meta.user_id, event.aggregate_id) async def handle_execution_timeout(self, event: DomainEvent) -> None: """Handle execution timeout event.""" @@ -141,16 +141,19 @@ async def handle_execution_timeout(self, event: DomainEvent) -> None: meta = event.metadata try: await self._execution_repo.write_terminal_result(result) - await self._publish_result_stored(result, meta.user_id) + await self._publish_result_stored(result, meta.user_id, event.aggregate_id) except Exception as e: self.logger.error(f"Failed to handle ExecutionTimeoutEvent: {e}", exc_info=True) - await self._publish_result_failed(event.execution_id, str(e), meta.user_id) + await self._publish_result_failed(event.execution_id, str(e), meta.user_id, event.aggregate_id) - async def _publish_result_stored(self, result: ExecutionResultDomain, user_id: str) -> None: + async def _publish_result_stored( + self, result: ExecutionResultDomain, user_id: str, aggregate_id: str | None + ) -> None: """Publish result stored event.""" size_bytes = len(result.stdout) + len(result.stderr) event = ResultStoredEvent( execution_id=result.execution_id, + aggregate_id=aggregate_id, storage_path=result.execution_id, size_bytes=size_bytes, storage_type=StorageType.DATABASE, @@ -163,11 +166,12 @@ async def _publish_result_stored(self, result: ExecutionResultDomain, user_id: s await self._producer.produce(event_to_produce=event, key=result.execution_id) async def _publish_result_failed( - self, execution_id: str, error_message: str, user_id: str, + self, execution_id: str, error_message: str, user_id: str, aggregate_id: str | None ) -> None: """Publish result processing failed event.""" event = ResultFailedEvent( execution_id=execution_id, + aggregate_id=aggregate_id, metadata=EventMetadata( service_name="result-processor", service_version="1.0.0", diff --git a/backend/app/services/saga/execution_saga.py b/backend/app/services/saga/execution_saga.py index d1009f6e..455d4cd3 100644 --- a/backend/app/services/saga/execution_saga.py +++ b/backend/app/services/saga/execution_saga.py @@ -101,6 +101,7 @@ async def execute(self, context: SagaContext, event: ExecutionRequestedEvent) -> create_pod_cmd = CreatePodCommandEvent( saga_id=context.saga_id, execution_id=execution_id, + aggregate_id=event.aggregate_id, script=event.script, language=event.language, language_version=event.language_version, @@ -168,6 +169,7 @@ async def compensate(self, context: SagaContext) -> bool: delete_pod_cmd = DeletePodCommandEvent( saga_id=context.saga_id, execution_id=execution_id, + aggregate_id=context.execution_id, reason="Saga compensation due to failure", metadata=EventMetadata( service_name="saga-orchestrator", diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py index 6cda5730..06d2e416 100644 --- a/backend/app/services/saga/saga_orchestrator.py +++ b/backend/app/services/saga/saga_orchestrator.py @@ -396,6 +396,7 @@ async def _publish_saga_started_event( saga_id=instance.saga_id, saga_name=instance.saga_name, execution_id=instance.execution_id, + aggregate_id=trigger_event.aggregate_id, initial_event_id=trigger_event.event_id, metadata=EventMetadata( service_name="saga-orchestrator", @@ -422,6 +423,7 @@ async def _publish_saga_cancelled_event(self, saga_instance: Saga) -> None: saga_id=saga_instance.saga_id, saga_name=saga_instance.saga_name, execution_id=saga_instance.execution_id, + aggregate_id=saga_instance.execution_id, reason=saga_instance.error_message or "User requested cancellation", completed_steps=saga_instance.completed_steps, compensated_steps=saga_instance.compensated_steps, diff --git a/backend/tests/e2e/test_execution_routes.py b/backend/tests/e2e/test_execution_routes.py index 81e7f7a2..e653c95c 100644 --- a/backend/tests/e2e/test_execution_routes.py +++ b/backend/tests/e2e/test_execution_routes.py @@ -10,7 +10,7 @@ import pytest import redis.asyncio as redis from app.domain.enums import EventType, ExecutionStatus -from app.domain.events import ExecutionDomainEvent +from app.domain.events import DomainEvent from app.schemas_pydantic.execution import ( CancelExecutionRequest, CancelResponse, @@ -30,7 +30,7 @@ pytestmark = [pytest.mark.e2e, pytest.mark.k8s] # TypeAdapter for parsing list of execution events from API response -ExecutionEventsAdapter = TypeAdapter(list[ExecutionDomainEvent]) +ExecutionEventsAdapter = TypeAdapter(list[DomainEvent]) # Initial states when execution is created INITIAL_STATES = { diff --git a/backend/tests/unit/services/pod_monitor/test_config_and_init.py b/backend/tests/unit/services/pod_monitor/test_config_and_init.py index de5f2719..87451689 100644 --- a/backend/tests/unit/services/pod_monitor/test_config_and_init.py +++ b/backend/tests/unit/services/pod_monitor/test_config_and_init.py @@ -20,5 +20,6 @@ def test_package_exports() -> None: "PodEventMapper", "PodMonitor", "PodMonitorConfig", + "PodMonitorEvent", "WatchEventType", } diff --git a/backend/tests/unit/services/pod_monitor/test_monitor.py b/backend/tests/unit/services/pod_monitor/test_monitor.py index 583fb8a5..c5f5c0b6 100644 --- a/backend/tests/unit/services/pod_monitor/test_monitor.py +++ b/backend/tests/unit/services/pod_monitor/test_monitor.py @@ -9,7 +9,6 @@ DomainEvent, EventMetadata, ExecutionCompletedEvent, - ExecutionStartedEvent, ResourceUsageDomain, ) from app.events.core import UnifiedProducer @@ -226,47 +225,6 @@ async def test_watch_resets_after_410( assert pm._last_resource_version == "rv11" -@pytest.mark.asyncio -async def test_process_raw_event_invalid( - event_metrics: EventMetrics, kubernetes_metrics: KubernetesMetrics, -) -> None: - cfg = PodMonitorConfig() - pm = make_pod_monitor(event_metrics, kubernetes_metrics, config=cfg) - - # Should not raise - invalid events are caught and logged - await pm._process_raw_event({}) - - -@pytest.mark.asyncio -async def test_process_raw_event_with_metadata( - event_metrics: EventMetrics, kubernetes_metrics: KubernetesMetrics, -) -> None: - cfg = PodMonitorConfig() - pm = make_pod_monitor(event_metrics, kubernetes_metrics, config=cfg) - - processed: list[PodEvent] = [] - - async def mock_process(event: PodEvent) -> None: - processed.append(event) - - pm._process_pod_event = mock_process # type: ignore[method-assign] - - raw_event = { - "type": "ADDED", - "object": types.SimpleNamespace(metadata=types.SimpleNamespace(resource_version="v1")), - } - - await pm._process_raw_event(raw_event) - assert len(processed) == 1 - assert processed[0].resource_version == "v1" - - raw_event_no_meta = {"type": "MODIFIED", "object": types.SimpleNamespace(metadata=None)} - - await pm._process_raw_event(raw_event_no_meta) - assert len(processed) == 2 - assert processed[1].resource_version is None - - @pytest.mark.asyncio async def test_process_pod_event_full_flow( event_metrics: EventMetrics, kubernetes_metrics: KubernetesMetrics, @@ -277,7 +235,7 @@ async def test_process_pod_event_full_flow( class MockMapper: async def map_pod_event(self, pod: Any, event_type: WatchEventType) -> list[Any]: # noqa: ARG002 class Event: - event_type = types.SimpleNamespace(value="test_event") + event_type = "pod.running" aggregate_id = "agg1" return [Event()] @@ -297,26 +255,21 @@ async def mock_publish(event: Any, pod: Any) -> None: # noqa: ARG001 event = PodEvent( event_type=WatchEventType.ADDED, pod=make_pod(name="test-pod", phase="Running"), - resource_version="v1", ) await pm._process_pod_event(event) - assert pm._last_resource_version == "v1" assert len(published) == 1 event_del = PodEvent( event_type=WatchEventType.DELETED, pod=make_pod(name="test-pod", phase="Succeeded"), - resource_version="v2", ) await pm._process_pod_event(event_del) - assert pm._last_resource_version == "v2" event_ignored = PodEvent( event_type=WatchEventType.ADDED, pod=make_pod(name="ignored-pod", phase="Unknown"), - resource_version="v3", ) published.clear() @@ -342,7 +295,6 @@ def clear_cache(self) -> None: event = PodEvent( event_type=WatchEventType.ADDED, pod=make_pod(name="fail-pod", phase="Pending"), - resource_version=None, ) # Should not raise - errors are caught and logged @@ -394,9 +346,10 @@ async def produce( pm = make_pod_monitor(event_metrics, kubernetes_metrics, config=cfg, kafka_service=failing_service) - event = ExecutionStartedEvent( + event = ExecutionCompletedEvent( execution_id="exec1", - pod_name="test-pod", + exit_code=0, + resource_usage=ResourceUsageDomain(), metadata=EventMetadata(service_name="test", service_version="1.0"), ) diff --git a/backend/workers/run_pod_monitor.py b/backend/workers/run_pod_monitor.py index 93ed54ff..c89d3f54 100644 --- a/backend/workers/run_pod_monitor.py +++ b/backend/workers/run_pod_monitor.py @@ -50,20 +50,23 @@ async def init_monitor() -> None: kubernetes_metrics = await container.get(KubernetesMetrics) async def _watch_cycle() -> None: + error_type: ErrorType | None = None try: await monitor.watch_pod_events() except ApiException as e: if e.status == 410: logger.warning("Resource version expired, resetting watch cursor") monitor._last_resource_version = None - kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED) + error_type = ErrorType.RESOURCE_VERSION_EXPIRED else: - logger.error(f"API error in watch: {e}") - kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.API_ERROR) - kubernetes_metrics.increment_pod_monitor_watch_reconnects() - except Exception as e: - logger.error(f"Unexpected error in watch: {e}", exc_info=True) - kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.UNEXPECTED) + logger.error("API error in watch", status=e.status, reason=e.reason) + error_type = ErrorType.API_ERROR + except Exception: + logger.error("Unexpected error in watch", exc_info=True) + error_type = ErrorType.UNEXPECTED + + if error_type is not None: + kubernetes_metrics.record_pod_monitor_watch_error(error_type) kubernetes_metrics.increment_pod_monitor_watch_reconnects() scheduler.add_job(