Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/app/api/routes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ async def get_execution_events(
limit: Annotated[int, Query(ge=1, le=1000)] = 100,
) -> list[DomainEvent]:
"""Get all events for an execution."""
events = await event_service.get_events_by_aggregate(
aggregate_id=execution.execution_id, event_types=event_types, limit=limit
events = await event_service.get_events_by_execution_id(
execution_id=execution.execution_id, event_types=event_types, limit=limit
)
return events

Expand Down
11 changes: 11 additions & 0 deletions backend/app/db/repositories/event_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ async def get_events_by_aggregate(
)
return [DomainEventAdapter.validate_python(d) for d in docs]

async def get_events_by_execution_id(
    self, execution_id: str, event_types: list[EventType] | None = None, limit: int = 100
) -> list[DomainEvent]:
    """Return up to ``limit`` events stored for one execution, sorted by ascending timestamp.

    Documents are matched on ``EventDocument.execution_id``. When ``event_types``
    is non-empty, the match is additionally restricted to those event types;
    ``None`` or an empty list applies no type filter.
    """
    filters: list[BaseFindOperator] = [Eq(EventDocument.execution_id, execution_id)]
    if event_types:
        filters.append(In(EventDocument.event_type, event_types))

    query = EventDocument.find(*filters).sort([("timestamp", SortDirection.ASCENDING)]).limit(limit)
    documents = await query.to_list()
    # Each raw document is validated into its concrete domain event type.
    return [DomainEventAdapter.validate_python(document) for document in documents]

async def get_execution_events(
self,
execution_id: str,
Expand Down
12 changes: 12 additions & 0 deletions backend/app/services/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,15 @@ async def get_events_by_aggregate(
event_types=event_types,
limit=limit,
)

async def get_events_by_execution_id(
    self,
    execution_id: str,
    event_types: list[EventType] | None = None,
    limit: int = 100,
) -> list[DomainEvent]:
    """Fetch the events for one execution by delegating to the repository.

    All three arguments are forwarded unchanged, as keyword arguments, to
    ``self.repository.get_events_by_execution_id``.
    """
    lookup = self.repository.get_events_by_execution_id
    return await lookup(execution_id=execution_id, event_types=event_types, limit=limit)
9 changes: 4 additions & 5 deletions backend/app/services/k8s_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ async def handle_delete_pod_command(self, command: DeletePodCommandEvent) -> Non
self.logger.info(f"Successfully deleted pod {pod_name} (ConfigMap will be GC'd by K8s)")

except ApiException as e:
if e.status == 404:
self.logger.warning(f"Pod for execution {execution_id} not found (may have already been deleted)")
else:
self.logger.error(f"Failed to delete pod for execution {execution_id}: {e}")
self.logger.warning("Failed to delete pod", execution_id=execution_id, status=e.status, reason=e.reason)

async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> None:
"""Create pod for execution"""
Expand Down Expand Up @@ -226,12 +223,13 @@ async def _set_configmap_owner(
f"Set ownerReference on ConfigMap {config_map.metadata.name} -> Pod {owner_pod.metadata.name}"
)
except ApiException as e:
self.logger.warning(f"Failed to set ownerReference on ConfigMap: {e.reason}")
self.logger.warning("Failed to set ownerReference on ConfigMap", reason=e.reason)

async def _publish_pod_created(self, command: CreatePodCommandEvent, pod: k8s_client.V1Pod) -> None:
"""Publish pod created event"""
event = PodCreatedEvent(
execution_id=command.execution_id,
aggregate_id=command.aggregate_id,
pod_name=pod.metadata.name,
namespace=pod.metadata.namespace,
metadata=command.metadata,
Expand All @@ -242,6 +240,7 @@ async def _publish_pod_creation_failed(self, command: CreatePodCommandEvent, err
"""Publish pod creation failed event"""
event = ExecutionFailedEvent(
execution_id=command.execution_id,
aggregate_id=command.aggregate_id,
error_type=ExecutionErrorType.SYSTEM_ERROR,
exit_code=-1,
stderr=f"Failed to create pod: {error}",
Expand Down
3 changes: 2 additions & 1 deletion backend/app/services/pod_monitor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from app.services.pod_monitor.config import PodMonitorConfig
from app.services.pod_monitor.event_mapper import PodContext, PodEventMapper, WatchEventType
from app.services.pod_monitor.event_mapper import PodContext, PodEventMapper, PodMonitorEvent, WatchEventType
from app.services.pod_monitor.monitor import ErrorType, PodMonitor

__all__ = [
Expand All @@ -8,5 +8,6 @@
"PodEventMapper",
"PodMonitor",
"PodMonitorConfig",
"PodMonitorEvent",
"WatchEventType",
]
16 changes: 11 additions & 5 deletions backend/app/services/pod_monitor/event_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from app.domain.enums import ExecutionErrorType
from app.domain.events import (
ContainerStatusInfo,
DomainEvent,
EventMetadata,
ExecutionCompletedEvent,
ExecutionFailedEvent,
Expand All @@ -22,8 +21,12 @@

# Python 3.12 type aliases
type PodPhase = str
type EventList = list[DomainEvent]
type AsyncMapper = Callable[["PodContext"], Awaitable[DomainEvent | None]]
type PodMonitorEvent = (
PodScheduledEvent | PodRunningEvent | PodTerminatedEvent
| ExecutionCompletedEvent | ExecutionFailedEvent | ExecutionTimeoutEvent
)
type EventList = list[PodMonitorEvent]
type AsyncMapper = Callable[["PodContext"], Awaitable[PodMonitorEvent | None]]


class WatchEventType(StringEnum):
Expand Down Expand Up @@ -114,7 +117,7 @@ async def map_pod_event(self, pod: k8s_client.V1Pod, event_type: WatchEventType)
)

# Collect events from mappers
events: list[DomainEvent] = []
events: list[PodMonitorEvent] = []

# Check for timeout first - if pod timed out, only return timeout event
if timeout_event := await self._check_timeout(ctx):
Expand Down Expand Up @@ -218,6 +221,7 @@ async def _map_scheduled(self, ctx: PodContext) -> PodScheduledEvent | None:

evt = PodScheduledEvent(
execution_id=ctx.execution_id,
aggregate_id=ctx.execution_id,
pod_name=ctx.pod.metadata.name,
node_name=ctx.pod.spec.node_name or "pending",
metadata=ctx.metadata,
Expand All @@ -243,6 +247,7 @@ async def _map_running(self, ctx: PodContext) -> PodRunningEvent | None:

evt = PodRunningEvent(
execution_id=ctx.execution_id,
aggregate_id=ctx.execution_id,
pod_name=ctx.pod.metadata.name,
container_statuses=container_statuses,
metadata=ctx.metadata,
Expand Down Expand Up @@ -273,7 +278,7 @@ async def _map_completed(self, ctx: PodContext) -> ExecutionCompletedEvent | Non
self.logger.info(f"POD-EVENT: mapped completed exec={ctx.execution_id} exit_code={logs.exit_code}")
return evt

async def _map_failed_or_completed(self, ctx: PodContext) -> DomainEvent | None:
async def _map_failed_or_completed(self, ctx: PodContext) -> PodMonitorEvent | None:
"""Map failed pod to either timeout, completed, or failed"""
if ctx.pod.status and ctx.pod.status.reason == "DeadlineExceeded":
if timeout_event := await self._check_timeout(ctx):
Expand Down Expand Up @@ -318,6 +323,7 @@ async def _map_terminated(self, ctx: PodContext) -> PodTerminatedEvent | None:
terminated = container.state.terminated
evt = PodTerminatedEvent(
execution_id=ctx.execution_id,
aggregate_id=ctx.execution_id,
pod_name=ctx.pod.metadata.name,
exit_code=terminated.exit_code,
reason=terminated.reason or "Terminated",
Expand Down
122 changes: 44 additions & 78 deletions backend/app/services/pod_monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,16 @@
from app.core.metrics import KubernetesMetrics
from app.core.utils import StringEnum
from app.domain.enums import EventType
from app.domain.events import DomainEvent
from app.services.kafka_event_service import KafkaEventService
from app.services.pod_monitor.config import PodMonitorConfig
from app.services.pod_monitor.event_mapper import PodEventMapper, WatchEventType
from app.services.pod_monitor.event_mapper import PodEventMapper, PodMonitorEvent, WatchEventType

_TERMINAL_EVENT_TYPES: frozenset[str] = frozenset({
EventType.EXECUTION_COMPLETED,
EventType.EXECUTION_FAILED,
EventType.EXECUTION_TIMEOUT,
})

# Type aliases
type ResourceVersion = str
type KubeEvent = dict[str, Any]


class ErrorType(StringEnum):
"""Error types for metrics."""

Expand All @@ -42,7 +36,6 @@ class PodEvent:

event_type: WatchEventType
pod: k8s_client.V1Pod
resource_version: ResourceVersion | None


class PodMonitor:
Expand Down Expand Up @@ -79,12 +72,12 @@ def __init__(
self._kafka_event_service = kafka_event_service

# Watch cursor — set from LIST on first run or after 410 Gone
self._last_resource_version: ResourceVersion | None = None
self._last_resource_version: str | None = None

# Metrics
self._metrics = kubernetes_metrics

self.logger.info(f"PodMonitor initialized for namespace {config.namespace}")
self.logger.info("PodMonitor initialized", namespace=config.namespace)

async def watch_pod_events(self) -> None:
"""Run a single bounded K8s watch stream using list-then-watch.
Expand All @@ -100,8 +93,9 @@ async def watch_pod_events(self) -> None:
await self._list_and_process_existing_pods()

self.logger.info(
f"Starting pod watch from rv={self._last_resource_version}, "
f"selector: {self.config.label_selector}"
"Starting pod watch",
rv=self._last_resource_version,
selector=self.config.label_selector,
)

kwargs: dict[str, Any] = {
Expand All @@ -114,12 +108,14 @@ async def watch_pod_events(self) -> None:
if self.config.field_selector:
kwargs["field_selector"] = self.config.field_selector

async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs):
await self._process_raw_event(event)

# Store resource version from watch for next iteration
if self._watch.resource_version:
self._last_resource_version = self._watch.resource_version
try:
async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs):
if event["type"] not in WatchEventType:
continue
await self._process_pod_event(PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"]))
finally:
if self._watch.resource_version:
self._last_resource_version = self._watch.resource_version

async def _list_and_process_existing_pods(self) -> None:
"""LIST all matching pods to bootstrap state and get a resource_version cursor."""
Expand All @@ -133,82 +129,55 @@ async def _list_and_process_existing_pods(self) -> None:
pod_list = await self._v1.list_namespaced_pod(**kwargs)

for pod in pod_list.items:
event = PodEvent(
event_type=WatchEventType.ADDED,
pod=pod,
resource_version=pod.metadata.resource_version if pod.metadata else None,
)
event = PodEvent(event_type=WatchEventType.ADDED, pod=pod)
await self._process_pod_event(event)

# Use the list's resource_version as the authoritative cursor
self._last_resource_version = pod_list.metadata.resource_version

self.logger.info(
f"Listed {len(pod_list.items)} existing pods, rv={self._last_resource_version}"
"Listed existing pods",
count=len(pod_list.items),
rv=self._last_resource_version,
)

async def _process_raw_event(self, raw_event: KubeEvent) -> None:
    """Wrap a raw watch event in a ``PodEvent`` and hand it to ``_process_pod_event``.

    A ``KeyError`` or ``ValueError`` raised anywhere in that sequence (building
    the ``PodEvent`` or processing it) is logged as an invalid event format and
    recorded as a ``PROCESSING_ERROR`` watch error instead of propagating.
    """
    try:
        # Keep the lookup order: "type" is read and converted before "object".
        watch_type = WatchEventType(raw_event["type"].upper())
        pod = raw_event["object"]
        version = pod.metadata.resource_version if pod.metadata else None

        pod_event = PodEvent(event_type=watch_type, pod=pod, resource_version=version)
        await self._process_pod_event(pod_event)
    except (KeyError, ValueError) as e:
        self.logger.error(f"Invalid event format: {e}")
        self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)

async def _process_pod_event(self, event: PodEvent) -> None:
"""Process a pod event."""
start_time = time.time()

try:
# Update resource version for crash recovery
if event.resource_version:
self._last_resource_version = event.resource_version

# Skip ignored phases
pod_phase = event.pod.status.phase if event.pod.status else None
if pod_phase in self.config.ignored_pod_phases:
return

pod_name = event.pod.metadata.name
# Skip ignored phases
pod_phase = event.pod.status.phase if event.pod.status else None
if pod_phase in self.config.ignored_pod_phases:
return

# Map to application events
try:
app_events = await self._event_mapper.map_pod_event(event.pod, event.event_type)

for app_event in app_events:
await self._publish_event(app_event, event.pod)

if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events):
await self._delete_pod(event.pod)

if app_events:
self.logger.info(
f"Processed {event.event_type} event for pod {pod_name} "
f"(phase: {pod_phase or 'Unknown'}), "
f"published {len(app_events)} events"
)

except Exception:
self.logger.error("Error processing pod event", exc_info=True)
self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)
return
finally:
duration = time.time() - start_time
self._metrics.record_pod_monitor_event_processing_duration(duration, event.event_type)

except Exception as e:
self.logger.error(f"Error processing pod event: {e}", exc_info=True)
self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)
if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events):
await self._delete_pod(event.pod)

if app_events:
pod_name = event.pod.metadata.name if event.pod.metadata else "unknown"
self.logger.info(
"Processed pod event",
event_type=event.event_type,
pod_name=pod_name,
phase=pod_phase or "Unknown",
published=len(app_events),
)

async def _publish_event(self, event: DomainEvent, pod: k8s_client.V1Pod) -> None:
async def _publish_event(self, event: PodMonitorEvent, pod: k8s_client.V1Pod) -> None:
"""Publish event to Kafka and store in events collection."""
execution_id = getattr(event, "execution_id", None) or event.aggregate_id
key = str(execution_id or (pod.metadata.name if pod.metadata else "unknown"))

await self._kafka_event_service.publish_event(event=event, key=key)
await self._kafka_event_service.publish_event(event=event, key=event.execution_id)

phase = pod.status.phase if pod.status else "Unknown"
self._metrics.record_pod_monitor_event_published(event.event_type, phase)
Expand All @@ -224,9 +193,6 @@ async def _delete_pod(self, pod: k8s_client.V1Pod) -> None:
await self._v1.delete_namespaced_pod(
name=pod_name, namespace=pod.metadata.namespace, grace_period_seconds=0,
)
self.logger.info(f"Deleted completed pod {pod_name}")
self.logger.info("Deleted completed pod", pod_name=pod_name)
except ApiException as e:
if e.status == 404:
self.logger.debug(f"Pod {pod_name} already deleted")
else:
self.logger.warning(f"Failed to delete pod {pod_name}: {e.reason}")
self.logger.warning("Failed to delete pod", pod_name=pod_name, status=e.status, reason=e.reason)
Loading
Loading