From 722bf3e227f84f534fe7c1e5146df7aba305a146 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 13 Jan 2026 14:45:38 +0000 Subject: [PATCH 01/30] Add basic tracing middleware and global control --- src/workflows/services/common_service.py | 32 ++++++++++++++ .../transport/middleware/otel_tracing.py | 42 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/workflows/transport/middleware/otel_tracing.py diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index de2ef70..d79f7fb 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -12,6 +12,13 @@ import workflows import workflows.logging +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware +from opentelemetry.sdk.resources import Resource, SERVICE_NAME + class Status(enum.Enum): """ @@ -185,6 +192,31 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) + + # Configure OTELTracing + resource = Resource.create({ + SERVICE_NAME: self._service_name, + }) + + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + + # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector + otlp_exporter = OTLPSpanExporter( + endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", + timeout=10 + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) + + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) + self._transport.add_middleware(otel_middleware) + + self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) + metrics = self._environment.get("metrics") if metrics: import prometheus_client diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py new file mode 100644 index 0000000..af0a1a1 --- /dev/null +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -0,0 +1,42 @@ +from opentelemetry import trace +from workflows.transport.middleware import BaseTransportMiddleware +from collections.abc import Callable +import functools +from opentelemetry.propagate import inject + +class OTELTracingMiddleware(BaseTransportMiddleware): + def __init__(self, tracer: trace.Tracer, service_name: str): + """ + Initialize the OpenTelemetry Tracing Middleware. + + :param tracer: An OpenTelemetry tracer instance used to create spans. + """ + self.tracer = tracer + self.service_name = service_name + + + def send(self, call_next: Callable, destination, message, **kwargs): + """ + Middleware for tracing the `send` operation + + :param call_next: The next middleware or the original `send` method. + :param destination: The destination service to which the message is being sent. + :param message: The message being sent. + :param kwargs: Additional arguments for the `send` method. + """ + + # Start a new span for the `send` operation + with self.tracer.start_as_current_span("transport.send") as span: + # Attributes we're interested in + span.set_attribute("service_name", self.service_name) + span.set_attribute("destination", destination) + span.set_attribute("message", str(message)) + + # Inject trace context into message headers + headers = kwargs.setdefault("headers", {}) + inject(headers) + kwargs["headers"] = headers + + # Call the next middleware or the original `send` method + return call_next(destination, message, **kwargs) + From 52cb04d756370e00294ff3ad17f38020191b374a Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:05:03 +0000 Subject: [PATCH 02/30] Instrument on subscribe and add dcid to span attributes --- src/workflows/recipe/__init__.py | 34 ++++++++++++++ .../transport/middleware/otel_tracing.py | 46 ++++++++----------- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 0f1973f..5834bfe 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -3,6 +3,7 @@ import functools import logging from collections.abc import Callable +from opentelemetry import trace from typing import Any from workflows.recipe.recipe import Recipe @@ -69,6 +70,39 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) + print(rw) + logger.log(1, rw) + + # Extract and set DCID on the current span + span = trace.get_current_span() + dcid = None + + # Try multiple locations where DCID might be stored + top_level_params = {} + if isinstance(message, dict): + # Direct parameters (top-level or in recipe) + top_level_params = message.get("parameters", {}) + + # Payload parameters (most common location) + payload = message.get("payload", {}) + payload_params = {} + if isinstance(payload, dict): + payload_params = payload.get("parameters", {}) + + # Try all common locations + dcid = ( + top_level_params.get("ispyb_dcid") or + top_level_params.get("dcid") or + payload_params.get("ispyb_dcid") or + payload_params.get("dcid") or + payload.get("ispyb_dcid") or + payload.get("dcid") + ) + + if dcid: + span.set_attribute("dcid", dcid) + span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index af0a1a1..27e89db 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -2,7 +2,7 @@ from workflows.transport.middleware import BaseTransportMiddleware from collections.abc import Callable import functools -from opentelemetry.propagate import inject +from opentelemetry.propagate import inject, extract class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): @@ -14,29 +14,23 @@ def __init__(self, tracer: trace.Tracer, service_name: str): self.tracer = tracer self.service_name = service_name - - def send(self, call_next: Callable, destination, message, **kwargs): - """ - Middleware for tracing the `send` operation - - :param call_next: The next middleware or the original `send` method. - :param destination: The destination service to which the message is being sent. - :param message: The message being sent. - :param kwargs: Additional arguments for the `send` method. - """ - - # Start a new span for the `send` operation - with self.tracer.start_as_current_span("transport.send") as span: - # Attributes we're interested in - span.set_attribute("service_name", self.service_name) - span.set_attribute("destination", destination) - span.set_attribute("message", str(message)) + def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: + @functools.wraps(callback) + def wrapped_callback(header, message): + # Extract trace context from message headers + ctx = extract(header) if header else None - # Inject trace context into message headers - headers = kwargs.setdefault("headers", {}) - inject(headers) - kwargs["headers"] = headers - - # Call the next middleware or the original `send` method - return call_next(destination, message, **kwargs) - + # Start a new span with the extracted context + with self.tracer.start_as_current_span( + "transport.subscribe", + context=ctx + ) as span: + span.set_attribute("service_name", self.service_name) + span.set_attribute("channel", channel) + + + # Call the original callback + return callback(header, message) + + # Call the next middleware with the wrapped callback + return call_next(channel, wrapped_callback, **kwargs) \ No newline at end of file From cc9ee124f06ae3c98f556f9aafbbc35c81430f7c Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:10:45 +0000 Subject: [PATCH 03/30] Add spanid and traceid metadata to greylog --- src/workflows/recipe/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 5834bfe..abd5854 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -102,6 +102,20 @@ def unwrap_recipe(header, message): if dcid: span.set_attribute("dcid", dcid) span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + + # Extract span_id and trace_id for logging + span_context = span.get_span_context() + if span_context.is_valid: + span_id = format(span_context.span_id, '016x') + trace_id = format(span_context.trace_id, '032x') + + logger.info( + "Processing recipe message", + extra={ + "span_id": span_id, + "trace_id": trace_id, + } + ) if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): From f7cc6589b8af00e60544fae88e8919742b4e8499 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:29:04 +0000 Subject: [PATCH 04/30] Add recipe_id to spans --- src/workflows/recipe/__init__.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index abd5854..653ab61 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -70,13 +70,19 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) - print(rw) - logger.log(1, rw) + logger.debug("RecipeWrapper created: %s", rw) - # Extract and set DCID on the current span + # Extract and set DCID and recipe_id on the current span span = trace.get_current_span() dcid = None + recipe_id = None + # Extract recipe ID from environment + if isinstance(message, dict): + environment = message.get("environment", {}) + if isinstance(environment, dict): + recipe_id = environment.get("ID") + # Try multiple locations where DCID might be stored top_level_params = {} if isinstance(message, dict): @@ -103,18 +109,28 @@ def unwrap_recipe(header, message): span.set_attribute("dcid", dcid) span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + if recipe_id: + span.set_attribute("recipe_id", recipe_id) + span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) + # Extract span_id and trace_id for logging span_context = span.get_span_context() - if span_context.is_valid: + if span_context and span_context.is_valid: span_id = format(span_context.span_id, '016x') trace_id = format(span_context.trace_id, '032x') + log_extra = { + "span_id": span_id, + "trace_id": trace_id, + } + if dcid: + log_extra["dcid"] = dcid + if recipe_id: + log_extra["recipe_id"] = recipe_id + logger.info( "Processing recipe message", - extra={ - "span_id": span_id, - "trace_id": trace_id, - } + extra=log_extra ) if log_extender and rw.environment and rw.environment.get("ID"): From 8b2a2f1fa9d1a927a7b05d8ac986e1c71cba80a4 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:45:17 +0000 Subject: [PATCH 05/30] Add dev and prod dependencies --- pyproject.toml | 2 +- requirements_dev.txt | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8d8313a..ea974c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"] +dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" diff --git a/requirements_dev.txt b/requirements_dev.txt index 8207c45..78c1d02 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -7,3 +7,6 @@ pytest-mock==3.14.0 pytest-timeout==2.3.1 stomp-py==8.1.2 websocket-client==1.8.0 +opentelemetry-api==1.20.0 +opentelemetry-sdk==1.20.0 +opentelemetry-exporter-otlp-proto-http==1.20.0 \ No newline at end of file From 0686e2824e75565eeaaca782b3dbfdaad38246ac Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 15:55:01 +0000 Subject: [PATCH 06/30] Remove dcid extract from message and inject to span logic. Will be added to python-zocalo --- src/workflows/recipe/__init__.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 653ab61..629cd08 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -72,7 +72,7 @@ def unwrap_recipe(header, message): rw = RecipeWrapper(message=message, transport=transport_layer) logger.debug("RecipeWrapper created: %s", rw) - # Extract and set DCID and recipe_id on the current span + # Extract recipe_id on the current span span = trace.get_current_span() dcid = None recipe_id = None @@ -83,32 +83,6 @@ def unwrap_recipe(header, message): if isinstance(environment, dict): recipe_id = environment.get("ID") - # Try multiple locations where DCID might be stored - top_level_params = {} - if isinstance(message, dict): - # Direct parameters (top-level or in recipe) - top_level_params = message.get("parameters", {}) - - # Payload parameters (most common location) - payload = message.get("payload", {}) - payload_params = {} - if isinstance(payload, dict): - payload_params = payload.get("parameters", {}) - - # Try all common locations - dcid = ( - top_level_params.get("ispyb_dcid") or - top_level_params.get("dcid") or - payload_params.get("ispyb_dcid") or - payload_params.get("dcid") or - payload.get("ispyb_dcid") or - payload.get("dcid") - ) - - if dcid: - span.set_attribute("dcid", dcid) - span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) - if recipe_id: span.set_attribute("recipe_id", recipe_id) span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) @@ -129,6 +103,7 @@ def unwrap_recipe(header, message): log_extra["recipe_id"] = recipe_id logger.info( + "Processing recipe message", extra=log_extra ) From 2d9e21c21080af1fbab429c3e5618071ffa02c25 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:24:53 +0000 Subject: [PATCH 07/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 19 ++++++----- src/workflows/services/common_service.py | 32 +++++++++++-------- .../transport/middleware/otel_tracing.py | 23 +++++++------ 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 629cd08..ab366f1 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -3,9 +3,10 @@ import functools import logging from collections.abc import Callable -from opentelemetry import trace from typing import Any +from opentelemetry import trace + from workflows.recipe.recipe import Recipe from workflows.recipe.validate import validate_recipe from workflows.recipe.wrapper import RecipeWrapper @@ -82,16 +83,18 @@ def unwrap_recipe(header, message): environment = message.get("environment", {}) if isinstance(environment, dict): recipe_id = environment.get("ID") - + if recipe_id: span.set_attribute("recipe_id", recipe_id) - span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) + span.add_event( + "recipe.id_extracted", attributes={"recipe_id": recipe_id} + ) # Extract span_id and trace_id for logging span_context = span.get_span_context() if span_context and span_context.is_valid: - span_id = format(span_context.span_id, '016x') - trace_id = format(span_context.trace_id, '032x') + span_id = format(span_context.span_id, "016x") + trace_id = format(span_context.trace_id, "032x") log_extra = { "span_id": span_id, @@ -102,12 +105,8 @@ def unwrap_recipe(header, message): if recipe_id: log_extra["recipe_id"] = recipe_id - logger.info( + logger.info("Processing recipe message", extra=log_extra) - "Processing recipe message", - extra=log_extra - ) - if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index d79f7fb..5aa8ee6 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -9,15 +9,15 @@ import time from typing import Any -import workflows -import workflows.logging - from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +import workflows +import workflows.logging from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware -from opentelemetry.sdk.resources import Resource, SERVICE_NAME class Status(enum.Enum): @@ -192,11 +192,13 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - + # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) self.log.debug("Configuring OTELTracing") provider = TracerProvider(resource=resource) @@ -204,18 +206,22 @@ def start_transport(self): # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector otlp_exporter = OTLPSpanExporter( - endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", - timeout=10 + endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", timeout=10 ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) # Add OTELTracingMiddleware to the transport layer tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) self._transport.add_middleware(otel_middleware) - self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) + self.log.debug( + "OTELTracingMiddleware added to transport layer of %s", + self._service_name, + ) metrics = self._environment.get("metrics") if metrics: diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index 27e89db..453ff6c 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -1,8 +1,13 @@ +from __future__ import annotations + +import functools +from collections.abc import Callable + from opentelemetry import trace +from opentelemetry.propagate import extract + from workflows.transport.middleware import BaseTransportMiddleware -from collections.abc import Callable -import functools -from opentelemetry.propagate import inject, extract + class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): @@ -19,18 +24,16 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: def wrapped_callback(header, message): # Extract trace context from message headers ctx = extract(header) if header else None - + # Start a new span with the extracted context with self.tracer.start_as_current_span( - "transport.subscribe", - context=ctx + "transport.subscribe", context=ctx ) as span: span.set_attribute("service_name", self.service_name) span.set_attribute("channel", channel) - - + # Call the original callback return callback(header, message) - + # Call the next middleware with the wrapped callback - return call_next(channel, wrapped_callback, **kwargs) \ No newline at end of file + return call_next(channel, wrapped_callback, **kwargs) From 3a5283ae292a83d168aabf10a70a5dac47a45468 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:52:27 +0000 Subject: [PATCH 08/30] Use plugin configurations to configure connection to OTELCollector --- src/workflows/services/common_service.py | 32 ++++++++++++---------- src/workflows/util/zocalo/configuration.py | 23 ++++++++++++++++ 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index d79f7fb..fc640dd 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -193,20 +193,24 @@ def start_transport(self): self._transport_interceptor ) - # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) - - self.log.debug("Configuring OTELTracing") - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) - - # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector - otlp_exporter = OTLPSpanExporter( - endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", - timeout=10 - ) + # Configure OTELTracing if configuration is available + otel_config = OTEL.config if hasattr(OTEL, 'config') and OTEL.config else None + + if otel_config: + # Configure OTELTracing + resource = Resource.create({ + SERVICE_NAME: self._service_name, + }) + + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + + # Configure BatchProcessor and OTLPSpanExporter using config values + otlp_exporter = OTLPSpanExporter( + endpoint=otel_config["endpoint"], + timeout=otel_config.get("timeout", 10) + ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) diff --git a/src/workflows/util/zocalo/configuration.py b/src/workflows/util/zocalo/configuration.py index 08a600a..79ff0fe 100644 --- a/src/workflows/util/zocalo/configuration.py +++ b/src/workflows/util/zocalo/configuration.py @@ -7,6 +7,29 @@ from workflows.transport.pika_transport import PikaTransport from workflows.transport.stomp_transport import StompTransport +class OTEL: + """A Zocalo configuration plugin to pre-populate OTELTracing config defaults""" + + class Schema(PluginSchema): + host = fields.Str(required=True) + port = fields.Int(required=True) + endpoint = fields.Str(required=False) + timeout = fields.Int(required=False, load_default=10) + + # Store configuration for access by services + config = {} + + @staticmethod + def activate(configuration): + # Build the full endpoint URL if not provided + if "endpoint" not in configuration: + endpoint = f"https://{configuration['host']}:{configuration['port']}/v1/traces" + else: + endpoint = configuration["endpoint"] + + OTEL.config["endpoint"] = endpoint + OTEL.config["timeout"] = configuration.get("timeout", 10) + class Stomp: """A Zocalo configuration plugin to pre-populate StompTransport config defaults""" From 4b999f15c3bbe5d5755348961bca2f19f17b2ea6 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:58:49 +0000 Subject: [PATCH 09/30] Remove vestigial dcid handling and unnecessary debug statements --- src/workflows/recipe/__init__.py | 11 +---------- src/workflows/services/common_service.py | 1 - 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 629cd08..b7449bb 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -70,11 +70,9 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) - logger.debug("RecipeWrapper created: %s", rw) # Extract recipe_id on the current span span = trace.get_current_span() - dcid = None recipe_id = None # Extract recipe ID from environment @@ -97,16 +95,9 @@ def unwrap_recipe(header, message): "span_id": span_id, "trace_id": trace_id, } - if dcid: - log_extra["dcid"] = dcid + if recipe_id: log_extra["recipe_id"] = recipe_id - - logger.info( - - "Processing recipe message", - extra=log_extra - ) if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index fc640dd..bc48331 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -219,7 +219,6 @@ def start_transport(self): otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) self._transport.add_middleware(otel_middleware) - self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) metrics = self._environment.get("metrics") if metrics: From 4b86715ec6cec2f5ca80c5b5ceb9f1d70025eabc Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:59:43 +0000 Subject: [PATCH 10/30] remove unhelpful docstring --- src/workflows/transport/middleware/otel_tracing.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index 27e89db..ff3e849 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -6,11 +6,6 @@ class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): - """ - Initialize the OpenTelemetry Tracing Middleware. - - :param tracer: An OpenTelemetry tracer instance used to create spans. - """ self.tracer = tracer self.service_name = service_name From 3e0b9029db0243b425f9380203fad819746885a6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 17:08:34 +0000 Subject: [PATCH 11/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 2 +- src/workflows/services/common_service.py | 19 +++++++++++-------- src/workflows/util/zocalo/configuration.py | 7 +++++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 59afa5e..143d570 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -101,7 +101,7 @@ def unwrap_recipe(header, message): if recipe_id: log_extra["recipe_id"] = recipe_id - + if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 5258a30..ddf39e8 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -192,15 +192,19 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - + # Configure OTELTracing if configuration is available - otel_config = OTEL.config if hasattr(OTEL, 'config') and OTEL.config else None - + otel_config = ( + OTEL.config if hasattr(OTEL, "config") and OTEL.config else None + ) + if otel_config: # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) self.log.debug("Configuring OTELTracing") provider = TracerProvider(resource=resource) @@ -209,7 +213,7 @@ def start_transport(self): # Configure BatchProcessor and OTLPSpanExporter using config values otlp_exporter = OTLPSpanExporter( endpoint=otel_config["endpoint"], - timeout=otel_config.get("timeout", 10) + timeout=otel_config.get("timeout", 10), ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) @@ -221,7 +225,6 @@ def start_transport(self): ) self._transport.add_middleware(otel_middleware) - metrics = self._environment.get("metrics") if metrics: import prometheus_client diff --git a/src/workflows/util/zocalo/configuration.py b/src/workflows/util/zocalo/configuration.py index 79ff0fe..ca5d77f 100644 --- a/src/workflows/util/zocalo/configuration.py +++ b/src/workflows/util/zocalo/configuration.py @@ -7,6 +7,7 @@ from workflows.transport.pika_transport import PikaTransport from workflows.transport.stomp_transport import StompTransport + class OTEL: """A Zocalo configuration plugin to pre-populate OTELTracing config defaults""" @@ -23,10 +24,12 @@ class Schema(PluginSchema): def activate(configuration): # Build the full endpoint URL if not provided if "endpoint" not in configuration: - endpoint = f"https://{configuration['host']}:{configuration['port']}/v1/traces" + endpoint = ( + f"https://{configuration['host']}:{configuration['port']}/v1/traces" + ) else: endpoint = configuration["endpoint"] - + OTEL.config["endpoint"] = endpoint OTEL.config["timeout"] = configuration.get("timeout", 10) From d446e8099a4fb7e1a61a94207e56dfd20c7e5d84 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 17:16:49 +0000 Subject: [PATCH 12/30] imported OTEL config class to common_service --- src/workflows/services/common_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index ddf39e8..62dc653 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -18,6 +18,8 @@ import workflows import workflows.logging from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware +from workflows.util.zocalo.configuration import OTEL + class Status(enum.Enum): From 16b0e10143a11f52c7eeec36b84866da52556082 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 17:17:01 +0000 Subject: [PATCH 13/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/services/common_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 62dc653..717b448 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -21,7 +21,6 @@ from workflows.util.zocalo.configuration import OTEL - class Status(enum.Enum): """ Internal service status codes From 7ad857fb3a9c457cd0c09913394d714ab92ea3aa Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 27 Jan 2026 14:28:21 +0000 Subject: [PATCH 14/30] add marshmallow dependency --- pyproject.toml | 2 +- requirements_dev.txt | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ea974c1..9ed046d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] +dependencies = ["marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" diff --git a/requirements_dev.txt b/requirements_dev.txt index 78c1d02..bd711be 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -9,4 +9,5 @@ stomp-py==8.1.2 websocket-client==1.8.0 opentelemetry-api==1.20.0 opentelemetry-sdk==1.20.0 -opentelemetry-exporter-otlp-proto-http==1.20.0 \ No newline at end of file +opentelemetry-exporter-otlp-proto-http==1.20.0 +marshmallow \ No newline at end of file From 7aae664ce6f42f75f71b4ba79550938abfa48129 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 27 Jan 2026 14:45:17 +0000 Subject: [PATCH 15/30] add zocalo dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9ed046d..9848750 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] +dependencies = ["zocalo","marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" From 902a7dfdcb9f80af32b3c3d091a71fd9258dfcef Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 27 Jan 2026 15:10:15 +0000 Subject: [PATCH 16/30] Fix possibly unbound error --- src/workflows/services/common_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 717b448..c53b02a 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -216,8 +216,8 @@ def start_transport(self): endpoint=otel_config["endpoint"], timeout=otel_config.get("timeout", 10), ) - span_processor = BatchSpanProcessor(otlp_exporter) - provider.add_span_processor(span_processor) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) # Add OTELTracingMiddleware to the transport layer tracer = trace.get_tracer(__name__) From 9e5adb764550b8044e7196d65465edab7f1dc0c3 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 2 Feb 2026 13:22:07 +0000 Subject: [PATCH 17/30] Moved plugin functionality to python-workflows --- pyproject.toml | 1 + src/workflows/services/common_service.py | 66 +++++++++++++----------- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9848750..8a89a3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ OfflineTransport = "workflows.transport.offline_transport:OfflineTransport" pika = "workflows.util.zocalo.configuration:Pika" stomp = "workflows.util.zocalo.configuration:Stomp" transport = "workflows.util.zocalo.configuration:DefaultTransport" +opentelemetry = "workflows.util.zocalo.configuration:OTEL" [project.scripts] "workflows.validate_recipe" = "workflows.recipe.validate:main" diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index c53b02a..b13b692 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -18,7 +18,6 @@ import workflows import workflows.logging from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware -from workflows.util.zocalo.configuration import OTEL class Status(enum.Enum): @@ -193,38 +192,47 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - - # Configure OTELTracing if configuration is available - otel_config = ( - OTEL.config if hasattr(OTEL, "config") and OTEL.config else None - ) - - if otel_config: - # Configure OTELTracing - resource = Resource.create( - { - SERVICE_NAME: self._service_name, - } + try: + # Configure OTELTracing if configuration is available + otel_config = ( + self.config.opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None ) - self.log.debug("Configuring OTELTracing") - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) + if otel_config: + if "endpoint" not in otel_config: + self.log.warning("Missing required OTEL configuration field `endpoint`.") + + if "timeout" not in otel_config: + self.log.warning("Missing optional OTEL configuration field `timout`. Will default to 10 seconds. ") + + # Configure OTELTracing + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) - # Configure BatchProcessor and OTLPSpanExporter using config values - otlp_exporter = OTLPSpanExporter( - endpoint=otel_config["endpoint"], - timeout=otel_config.get("timeout", 10), - ) - span_processor = BatchSpanProcessor(otlp_exporter) - provider.add_span_processor(span_processor) + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) - # Add OTELTracingMiddleware to the transport layer - tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware( - tracer, service_name=self._service_name - ) - self._transport.add_middleware(otel_middleware) + # Configure BatchProcessor and OTLPSpanExporter using config values + otlp_exporter = OTLPSpanExporter( + endpoint=otel_config["endpoint"], + timeout=otel_config.get("timeout", 10), + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) + + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) + self._transport.add_middleware(otel_middleware) + except Exception as e: + # Continue without tracing if configuration fails + self.log.warning("Failed to configure OpenTelemetry tracing: %s", str(e)) metrics = self._environment.get("metrics") if metrics: From 1d7457ead8f1f6600783a1c994ade9e8dcd71ed1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 2 Feb 2026 13:22:46 +0000 Subject: [PATCH 18/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/services/common_service.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index b13b692..72c8903 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -195,15 +195,21 @@ def start_transport(self): try: # Configure OTELTracing if configuration is available otel_config = ( - self.config.opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None + self.config.opentelemetry + if self.config and hasattr(self.config, "opentelemetry") + else None ) if otel_config: if "endpoint" not in otel_config: - self.log.warning("Missing required OTEL configuration field `endpoint`.") - + self.log.warning( + "Missing required OTEL configuration field `endpoint`." + ) + if "timeout" not in otel_config: - self.log.warning("Missing optional OTEL configuration field `timout`. Will default to 10 seconds. ") + self.log.warning( + "Missing optional OTEL configuration field `timout`. Will default to 10 seconds. " + ) # Configure OTELTracing resource = Resource.create( @@ -232,7 +238,9 @@ def start_transport(self): self._transport.add_middleware(otel_middleware) except Exception as e: # Continue without tracing if configuration fails - self.log.warning("Failed to configure OpenTelemetry tracing: %s", str(e)) + self.log.warning( + "Failed to configure OpenTelemetry tracing: %s", str(e) + ) metrics = self._environment.get("metrics") if metrics: From ff3679c5b62d83d4b13b9bcdf1162e05895b6a1b Mon Sep 17 00:00:00 2001 From: David Igandan Date: Fri, 6 Feb 2026 11:06:10 +0000 Subject: [PATCH 19/30] Fixed typos, vestigial code and improper use of log_extender --- pyproject.toml | 2 +- src/workflows/recipe/__init__.py | 28 +++++------ src/workflows/services/common_service.py | 61 +++++++++++------------- 3 files changed, 39 insertions(+), 52 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8a89a3b..2f61f6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["zocalo","marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] +dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 143d570..5b8874e 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -72,40 +72,34 @@ def unwrap_recipe(header, message): if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) - # Extract recipe_id on the current span + # Extract recipe ID from environment and add to current span span = trace.get_current_span() - recipe_id = None - - # Extract recipe ID from environment - if isinstance(message, dict): - environment = message.get("environment", {}) - if isinstance(environment, dict): - recipe_id = environment.get("ID") + recipe_id = rw.environment.get("ID") if recipe_id: span.set_attribute("recipe_id", recipe_id) - span.add_event( - "recipe.id_extracted", attributes={"recipe_id": recipe_id} - ) # Extract span_id and trace_id for logging span_context = span.get_span_context() if span_context and span_context.is_valid: - span_id = format(span_context.span_id, "016x") - trace_id = format(span_context.trace_id, "032x") + span_id = span_context.span_id + trace_id = span_context.trace_id - log_extra = { + otel_logs = { "span_id": span_id, "trace_id": trace_id, } - + if recipe_id: - log_extra["recipe_id"] = recipe_id + otel_logs["recipe_id"] = recipe_id + else: + otel_logs = "No OTEL related logs available" if log_extender and rw.environment and rw.environment.get("ID"): - with log_extender("recipe_ID", rw.environment["ID"]): + with log_extender("recipe_ID", rw.environment["ID"]), log_extender("otel_logs", otel_logs): return callback(rw, header, message.get("payload")) return callback(rw, header, message.get("payload")) + if allow_non_recipe_messages: return callback(None, header, message) # self.log.warning('Discarding non-recipe message:\n' + \ diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index b13b692..ca4e0a5 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -195,44 +195,37 @@ def start_transport(self): try: # Configure OTELTracing if configuration is available otel_config = ( - self.config.opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None + self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None + ) + + if otel_config and "timeout" not in otel_config: + self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ") + + # Configure OTELTracing + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } ) - if otel_config: - if "endpoint" not in otel_config: - self.log.warning("Missing required OTEL configuration field `endpoint`.") - - if "timeout" not in otel_config: - self.log.warning("Missing optional OTEL configuration field `timout`. Will default to 10 seconds. ") - - # Configure OTELTracing - resource = Resource.create( - { - SERVICE_NAME: self._service_name, - } - ) - - self.log.debug("Configuring OTELTracing") - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) - # Configure BatchProcessor and OTLPSpanExporter using config values - otlp_exporter = OTLPSpanExporter( - endpoint=otel_config["endpoint"], - timeout=otel_config.get("timeout", 10), - ) - span_processor = BatchSpanProcessor(otlp_exporter) - provider.add_span_processor(span_processor) + # Configure BatchProcessor and OTLPSpanExporter using config values + otlp_exporter = OTLPSpanExporter( + endpoint=otel_config["endpoint"], + timeout=otel_config.get("timeout", 10), + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) - # Add OTELTracingMiddleware to the transport layer - tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware( - tracer, service_name=self._service_name - ) - self._transport.add_middleware(otel_middleware) - except Exception as e: - # Continue without tracing if configuration fails - self.log.warning("Failed to configure OpenTelemetry tracing: %s", str(e)) + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) + self._transport.add_middleware(otel_middleware) metrics = self._environment.get("metrics") if metrics: From e6be92552b0b9c5e8c1d0b8278c26a7ccd42da25 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Fri, 6 Feb 2026 11:13:31 +0000 Subject: [PATCH 20/30] Remove vestigial try block and fix runtime issue where None[] or None.get() was being called --- src/workflows/services/common_service.py | 62 ++++++++++++------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index ca4e0a5..6008b3c 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -192,40 +192,40 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - try: - # Configure OTELTracing if configuration is available - otel_config = ( - self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None - ) - - if otel_config and "timeout" not in otel_config: - self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ") - - # Configure OTELTracing - resource = Resource.create( - { - SERVICE_NAME: self._service_name, - } - ) + + # Configure OTELTracing if configuration is available + otel_config = ( + self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None + ) + + if otel_config and "timeout" not in otel_config: + self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ") + + # Configure OTELTracing + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) - self.log.debug("Configuring OTELTracing") - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) - # Configure BatchProcessor and OTLPSpanExporter using config values - otlp_exporter = OTLPSpanExporter( - endpoint=otel_config["endpoint"], - timeout=otel_config.get("timeout", 10), - ) - span_processor = BatchSpanProcessor(otlp_exporter) - provider.add_span_processor(span_processor) + # Configure BatchProcessor and OTLPSpanExporter using config values + otlp_exporter = OTLPSpanExporter( + endpoint=otel_config["endpoint"], + timeout=otel_config.get("timeout", 10), + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) - # Add OTELTracingMiddleware to the transport layer - tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware( - tracer, service_name=self._service_name - ) - self._transport.add_middleware(otel_middleware) + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) + self._transport.add_middleware(otel_middleware) metrics = self._environment.get("metrics") if metrics: From aebe1caebb4217735da56a25e9b02bb43691c2b8 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Fri, 6 Feb 2026 11:13:43 +0000 Subject: [PATCH 21/30] Remove vestigial try block and fix runtime issue where None[] or None.get() was being called --- src/workflows/services/common_service.py | 46 ++++++++++++------------ 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 6008b3c..9ec469d 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -200,32 +200,32 @@ def start_transport(self): if otel_config and "timeout" not in otel_config: self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ") + + # Configure OTELTracing + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) - # Configure OTELTracing - resource = Resource.create( - { - SERVICE_NAME: self._service_name, - } - ) - - self.log.debug("Configuring OTELTracing") - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) - # Configure BatchProcessor and OTLPSpanExporter using config values - otlp_exporter = OTLPSpanExporter( - endpoint=otel_config["endpoint"], - timeout=otel_config.get("timeout", 10), - ) - span_processor = BatchSpanProcessor(otlp_exporter) - provider.add_span_processor(span_processor) + # Configure BatchProcessor and OTLPSpanExporter using config values + otlp_exporter = OTLPSpanExporter( + endpoint=otel_config["endpoint"], + timeout=otel_config.get("timeout", 10), + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) - # Add OTELTracingMiddleware to the transport layer - tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware( - tracer, service_name=self._service_name - ) - self._transport.add_middleware(otel_middleware) + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) + self._transport.add_middleware(otel_middleware) metrics = self._environment.get("metrics") if metrics: From 77eb9e91cc30c8569230464207f82b2abbe33add Mon Sep 17 00:00:00 2001 From: David Igandan Date: Fri, 6 Feb 2026 14:21:56 +0000 Subject: [PATCH 22/30] Implement ExitStack() to manage multiple context managers and clean them up --- src/workflows/recipe/__init__.py | 23 ++++++++++++++-------- src/workflows/services/common_service.py | 7 +++++++ src/workflows/util/zocalo/configuration.py | 2 ++ 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 5b8874e..0e4e2ee 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -1,5 +1,5 @@ from __future__ import annotations - +from contextlib import ExitStack import functools import logging from collections.abc import Callable @@ -70,6 +70,7 @@ def unwrap_recipe(header, message): if mangle_for_receiving: message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: + otel_logs = None rw = RecipeWrapper(message=message, transport=transport_layer) # Extract recipe ID from environment and add to current span @@ -79,6 +80,8 @@ def unwrap_recipe(header, message): if recipe_id: span.set_attribute("recipe_id", recipe_id) + + # Extract span_id and trace_id for logging span_context = span.get_span_context() if span_context and span_context.is_valid: @@ -92,13 +95,17 @@ def unwrap_recipe(header, message): if recipe_id: otel_logs["recipe_id"] = recipe_id - else: - otel_logs = "No OTEL related logs available" - - if log_extender and rw.environment and rw.environment.get("ID"): - with log_extender("recipe_ID", rw.environment["ID"]), log_extender("otel_logs", otel_logs): - return callback(rw, header, message.get("payload")) - return callback(rw, header, message.get("payload")) + + with ExitStack() as stack: + # Configure the context depending on if service is emitting spans + if otel_logs and log_extender and rw.environment and rw.environment.get("ID"): + stack.enter_context(log_extender('recipe_ID', rw.environment.get("ID"))) + stack.enter_context(log_extender('otel_logs', otel_logs)) + elif log_extender and rw.environment and rw.environment.get("ID"): + stack.enter_context(log_extender('recipe_ID', rw.environment.get("ID"))) + + return callback(rw, header, message.get("payload")) + if allow_non_recipe_messages: return callback(None, header, message) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 9ec469d..aac8e15 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -197,6 +197,13 @@ def start_transport(self): otel_config = ( self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None ) + # debugging + with open("/scratch/logs.txt", 'w+') as file: + if otel_config: + import json + json.dump(otel_config, file, indent=4) + else: + file.write("otel config was not truthy") if otel_config and "timeout" not in otel_config: self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ") diff --git a/src/workflows/util/zocalo/configuration.py b/src/workflows/util/zocalo/configuration.py index ca5d77f..48e5296 100644 --- a/src/workflows/util/zocalo/configuration.py +++ b/src/workflows/util/zocalo/configuration.py @@ -33,6 +33,8 @@ def activate(configuration): OTEL.config["endpoint"] = endpoint OTEL.config["timeout"] = configuration.get("timeout", 10) + return OTEL.config + class Stomp: """A Zocalo configuration plugin to pre-populate StompTransport config defaults""" From 5f94077010ffa9d126a5adf17d72b4970bed3f74 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 17 Feb 2026 11:46:34 +0000 Subject: [PATCH 23/30] Fix broken tracing functionality --- src/workflows/services/common_service.py | 12 +- src/workflows/transport/common_transport.py | 1 - .../transport/middleware/__init__.py | 7 +- .../transport/middleware/otel_tracing.py | 213 +++++++++++++++++- 4 files changed, 212 insertions(+), 21 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index aac8e15..3d646f3 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -197,17 +197,7 @@ def start_transport(self): otel_config = ( self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None ) - # debugging - with open("/scratch/logs.txt", 'w+') as file: - if otel_config: - import json - json.dump(otel_config, file, indent=4) - else: - file.write("otel config was not truthy") - - if otel_config and "timeout" not in otel_config: - self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ") - + if otel_config: # Configure OTELTracing resource = Resource.create( { diff --git a/src/workflows/transport/common_transport.py b/src/workflows/transport/common_transport.py index 8e26460..435487f 100644 --- a/src/workflows/transport/common_transport.py +++ b/src/workflows/transport/common_transport.py @@ -10,7 +10,6 @@ MessageCallback = Callable[[Mapping[str, Any], Any], None] - class TemporarySubscription(NamedTuple): subscription_id: int queue_name: str diff --git a/src/workflows/transport/middleware/__init__.py b/src/workflows/transport/middleware/__init__.py index 1ace0ff..9357b18 100644 --- a/src/workflows/transport/middleware/__init__.py +++ b/src/workflows/transport/middleware/__init__.py @@ -233,6 +233,10 @@ def wrapped_callback(header, message): def wrap(f: Callable): + # debugging + if f.__name__ == "send": + print("we are wrapping send now") + @functools.wraps(f) def wrapper(self, *args, **kwargs): return functools.reduce( @@ -243,4 +247,5 @@ def wrapper(self, *args, **kwargs): lambda *args, **kwargs: f(self, *args, **kwargs), )(*args, **kwargs) - return wrapper + print(wrapper.__wrapped__) + return wrapper \ No newline at end of file diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index a3d791f..ddbad02 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -4,31 +4,228 @@ from collections.abc import Callable from opentelemetry import trace -from opentelemetry.propagate import extract +from opentelemetry.propagate import extract, inject +from opentelemetry.context import Context from workflows.transport.middleware import BaseTransportMiddleware +from workflows.transport.common_transport import TemporarySubscription, MessageCallback +import json - -class OTELTracingMiddleware(BaseTransportMiddleware): +class OTELTracingMiddleware: def __init__(self, tracer: trace.Tracer, service_name: str): self.tracer = tracer self.service_name = service_name - def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: + def send(self, call_next: Callable, destination: str, message: Any, **kwargs): + # Get current span context (may be None if this is the root span) + current_span = trace.get_current_span() + parent_context = trace.set_span_in_context(current_span) if current_span else None + + with self.tracer.start_as_current_span( + "transport.send", + context=parent_context, + ) as span: + span.set_attribute("service_name", self.service_name) + + span.set_attribute("message", json.dumps(message)) + span.set_attribute("destination", destination) + print("parent_context is...",parent_context) + + + # Inject the current trace context into the message headers + headers = kwargs.get("headers", {}) + if headers is None: + headers = {} + inject(headers) # This modifies headers in-place + kwargs["headers"] = headers + + return call_next(destination, message, **kwargs) + + def subscribe(self, call_next: Callable, channel: str, callback: Callable, **kwargs) -> int: @functools.wraps(callback) def wrapped_callback(header, message): # Extract trace context from message headers - ctx = extract(header) if header else None + ctx = extract(header) if header else Context() # Start a new span with the extracted context with self.tracer.start_as_current_span( - "transport.subscribe", context=ctx + "transport.subscribe", + context=ctx, + ) as span: + span.set_attribute("service_name", self.service_name) + + span.set_attribute("message", json.dumps(message)) + span.set_attribute("channel", channel) + + # Call the original callback - this will process the message + # and potentially call send() which will pick up this context + return callback(header, message) + + return call_next(channel, wrapped_callback, **kwargs) + + def subscribe_broadcast(self, call_next: Callable, channel: str, callback: Callable, **kwargs) -> int: + @functools.wraps(callback) + def wrapped_callback(header, message): + # Extract trace context from message headers + ctx = extract(header) if header else Context() + + # # Start a new span with the extracted context + with self.tracer.start_as_current_span( + "transport.subscribe_broadcast", + context=ctx, ) as span: span.set_attribute("service_name", self.service_name) + + span.set_attribute("message", json.dumps(message)) span.set_attribute("channel", channel) - # Call the original callback return callback(header, message) - # Call the next middleware with the wrapped callback return call_next(channel, wrapped_callback, **kwargs) + + def subscribe_temporary( + self, + call_next: Callable, + channel_hint: str | None, + callback: MessageCallback, + **kwargs, + ) -> TemporarySubscription: + @functools.wraps(callback) + def wrapped_callback(header, message): + # Extract trace context from message headers + ctx = extract(header) if header else Context() + + # Start a new span with the extracted context + with self.tracer.start_as_current_span( + "transport.subscribe_temporary", + context=ctx, + ) as span: + span.set_attribute("service_name", self.service_name) + + span.set_attribute("message", json.dumps(message)) + if channel_hint: + span.set_attribute("channel_hint", channel_hint) + + return callback(header, message) + + return call_next(channel_hint, wrapped_callback, **kwargs) + + def unsubscribe( + self, + call_next: Callable, + subscription: int, + drop_callback_reference=False, + **kwargs, + ): + # Get current span context + current_span = trace.get_current_span() + current_context = trace.set_span_in_context(current_span) if current_span else Context() + + with self.tracer.start_as_current_span( + "transport.unsubscribe", + context=current_context, + ) as span: + span.set_attribute("service_name", self.service_name) + span.set_attribute("subscription_id", subscription) + + call_next( + subscription, drop_callback_reference=drop_callback_reference, **kwargs + ) + + def ack( + self, + call_next: Callable, + message, + subscription_id: int | None = None, + **kwargs, + ): + # Get current span context + current_span = trace.get_current_span() + current_context = trace.set_span_in_context(current_span) if current_span else Context() + + with self.tracer.start_as_current_span( + "transport.ack", + context=current_context, + ) as span: + span.set_attribute("service_name", self.service_name) + span.set_attribute("message", json.dumps(message)) + if subscription_id: + span.set_attribute("subscription_id", subscription_id) + + call_next(message, subscription_id=subscription_id, **kwargs) + + def nack( + self, + call_next: Callable, + message, + subscription_id: int | None = None, + **kwargs, + ): + # Get current span context + current_span = trace.get_current_span() + current_context = trace.set_span_in_context(current_span) if current_span else Context() + + with self.tracer.start_as_current_span( + "transport.nack", + context=current_context, + ) as span: + span.set_attribute("service_name", self.service_name) + + span.set_attribute("message", json.dumps(message)) + if subscription_id: + span.set_attribute("subscription_id", subscription_id) + + call_next(message, subscription_id=subscription_id, **kwargs) + + def transaction_begin( + self, call_next: Callable, subscription_id: int | None = None, **kwargs + ) -> int: + """Start a new transaction span""" + # Get current span context (may be None if this is the root span) + current_span = trace.get_current_span() + current_context = trace.set_span_in_context(current_span) if current_span else Context() + + with self.tracer.start_as_current_span( + "transaction.begin", + context=current_context, + ) as span: + span.set_attribute("service_name", self.service_name) + + if subscription_id: + span.set_attribute("subscription_id", subscription_id) + + return call_next(subscription_id=subscription_id, **kwargs) + + def transaction_abort(self, call_next: Callable, transaction_id: int | None = None, **kwargs): + """Abort a transaction span""" + # Get current span context + current_span = trace.get_current_span() + current_context = trace.set_span_in_context(current_span) if current_span else Context() + + with self.tracer.start_as_current_span( + "transaction.abort", + context=current_context, + ) as span: + span.set_attribute("service_name", self.service_name) + + if transaction_id: + span.set_attribute("transaction_id", transaction_id) + + call_next(transaction_id=transaction_id, **kwargs) + + def transaction_commit(self, call_next: Callable, transaction_id: int | None = None, **kwargs): + """Commit a transaction span""" + # Get current span context + current_span = trace.get_current_span() + current_context = trace.set_span_in_context(current_span) if current_span else Context() + + with self.tracer.start_as_current_span( + "transaction.commit", + context=current_context, + ) as span: + span.set_attribute("service_name", self.service_name) + if transaction_id: + span.set_attribute("transaction_id", transaction_id) + + call_next(transaction_id=transaction_id, **kwargs) + From 33fbce2b3c92aaaaf5af6d85aa4fedd9b29577e8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 17 Feb 2026 11:51:58 +0000 Subject: [PATCH 24/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 29 ++++++---- src/workflows/services/common_service.py | 6 +- src/workflows/transport/common_transport.py | 1 + .../transport/middleware/__init__.py | 4 +- .../transport/middleware/otel_tracing.py | 58 +++++++++++++------ 5 files changed, 64 insertions(+), 34 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 0e4e2ee..cb58ecc 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -1,8 +1,9 @@ from __future__ import annotations -from contextlib import ExitStack + import functools import logging from collections.abc import Callable +from contextlib import ExitStack from typing import Any from opentelemetry import trace @@ -80,8 +81,6 @@ def unwrap_recipe(header, message): if recipe_id: span.set_attribute("recipe_id", recipe_id) - - # Extract span_id and trace_id for logging span_context = span.get_span_context() if span_context and span_context.is_valid: @@ -92,21 +91,29 @@ def unwrap_recipe(header, message): "span_id": span_id, "trace_id": trace_id, } - + if recipe_id: otel_logs["recipe_id"] = recipe_id - + with ExitStack() as stack: # Configure the context depending on if service is emitting spans - if otel_logs and log_extender and rw.environment and rw.environment.get("ID"): - stack.enter_context(log_extender('recipe_ID', rw.environment.get("ID"))) - stack.enter_context(log_extender('otel_logs', otel_logs)) + if ( + otel_logs + and log_extender + and rw.environment + and rw.environment.get("ID") + ): + stack.enter_context( + log_extender("recipe_ID", rw.environment.get("ID")) + ) + stack.enter_context(log_extender("otel_logs", otel_logs)) elif log_extender and rw.environment and rw.environment.get("ID"): - stack.enter_context(log_extender('recipe_ID', rw.environment.get("ID"))) + stack.enter_context( + log_extender("recipe_ID", rw.environment.get("ID")) + ) return callback(rw, header, message.get("payload")) - - + if allow_non_recipe_messages: return callback(None, header, message) # self.log.warning('Discarding non-recipe message:\n' + \ diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 3d646f3..1f09306 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -192,10 +192,12 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - + # Configure OTELTracing if configuration is available otel_config = ( - self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None + self.config._opentelemetry + if self.config and hasattr(self.config, "opentelemetry") + else None ) if otel_config: # Configure OTELTracing diff --git a/src/workflows/transport/common_transport.py b/src/workflows/transport/common_transport.py index 435487f..8e26460 100644 --- a/src/workflows/transport/common_transport.py +++ b/src/workflows/transport/common_transport.py @@ -10,6 +10,7 @@ MessageCallback = Callable[[Mapping[str, Any], Any], None] + class TemporarySubscription(NamedTuple): subscription_id: int queue_name: str diff --git a/src/workflows/transport/middleware/__init__.py b/src/workflows/transport/middleware/__init__.py index 9357b18..aeb60fb 100644 --- a/src/workflows/transport/middleware/__init__.py +++ b/src/workflows/transport/middleware/__init__.py @@ -236,7 +236,7 @@ def wrap(f: Callable): # debugging if f.__name__ == "send": print("we are wrapping send now") - + @functools.wraps(f) def wrapper(self, *args, **kwargs): return functools.reduce( @@ -248,4 +248,4 @@ def wrapper(self, *args, **kwargs): )(*args, **kwargs) print(wrapper.__wrapped__) - return wrapper \ No newline at end of file + return wrapper diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index ddbad02..7fb11b0 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -1,15 +1,15 @@ from __future__ import annotations import functools +import json from collections.abc import Callable from opentelemetry import trace -from opentelemetry.propagate import extract, inject from opentelemetry.context import Context +from opentelemetry.propagate import extract, inject + +from workflows.transport.common_transport import MessageCallback, TemporarySubscription -from workflows.transport.middleware import BaseTransportMiddleware -from workflows.transport.common_transport import TemporarySubscription, MessageCallback -import json class OTELTracingMiddleware: def __init__(self, tracer: trace.Tracer, service_name: str): @@ -19,7 +19,9 @@ def __init__(self, tracer: trace.Tracer, service_name: str): def send(self, call_next: Callable, destination: str, message: Any, **kwargs): # Get current span context (may be None if this is the root span) current_span = trace.get_current_span() - parent_context = trace.set_span_in_context(current_span) if current_span else None + parent_context = ( + trace.set_span_in_context(current_span) if current_span else None + ) with self.tracer.start_as_current_span( "transport.send", @@ -29,8 +31,7 @@ def send(self, call_next: Callable, destination: str, message: Any, **kwargs): span.set_attribute("message", json.dumps(message)) span.set_attribute("destination", destination) - print("parent_context is...",parent_context) - + print("parent_context is...", parent_context) # Inject the current trace context into the message headers headers = kwargs.get("headers", {}) @@ -41,7 +42,9 @@ def send(self, call_next: Callable, destination: str, message: Any, **kwargs): return call_next(destination, message, **kwargs) - def subscribe(self, call_next: Callable, channel: str, callback: Callable, **kwargs) -> int: + def subscribe( + self, call_next: Callable, channel: str, callback: Callable, **kwargs + ) -> int: @functools.wraps(callback) def wrapped_callback(header, message): # Extract trace context from message headers @@ -63,13 +66,15 @@ def wrapped_callback(header, message): return call_next(channel, wrapped_callback, **kwargs) - def subscribe_broadcast(self, call_next: Callable, channel: str, callback: Callable, **kwargs) -> int: + def subscribe_broadcast( + self, call_next: Callable, channel: str, callback: Callable, **kwargs + ) -> int: @functools.wraps(callback) def wrapped_callback(header, message): # Extract trace context from message headers ctx = extract(header) if header else Context() - # # Start a new span with the extracted context + # # Start a new span with the extracted context with self.tracer.start_as_current_span( "transport.subscribe_broadcast", context=ctx, @@ -119,7 +124,9 @@ def unsubscribe( ): # Get current span context current_span = trace.get_current_span() - current_context = trace.set_span_in_context(current_span) if current_span else Context() + current_context = ( + trace.set_span_in_context(current_span) if current_span else Context() + ) with self.tracer.start_as_current_span( "transport.unsubscribe", @@ -141,7 +148,9 @@ def ack( ): # Get current span context current_span = trace.get_current_span() - current_context = trace.set_span_in_context(current_span) if current_span else Context() + current_context = ( + trace.set_span_in_context(current_span) if current_span else Context() + ) with self.tracer.start_as_current_span( "transport.ack", @@ -163,7 +172,9 @@ def nack( ): # Get current span context current_span = trace.get_current_span() - current_context = trace.set_span_in_context(current_span) if current_span else Context() + current_context = ( + trace.set_span_in_context(current_span) if current_span else Context() + ) with self.tracer.start_as_current_span( "transport.nack", @@ -183,7 +194,9 @@ def transaction_begin( """Start a new transaction span""" # Get current span context (may be None if this is the root span) current_span = trace.get_current_span() - current_context = trace.set_span_in_context(current_span) if current_span else Context() + current_context = ( + trace.set_span_in_context(current_span) if current_span else Context() + ) with self.tracer.start_as_current_span( "transaction.begin", @@ -196,11 +209,15 @@ def transaction_begin( return call_next(subscription_id=subscription_id, **kwargs) - def transaction_abort(self, call_next: Callable, transaction_id: int | None = None, **kwargs): + def transaction_abort( + self, call_next: Callable, transaction_id: int | None = None, **kwargs + ): """Abort a transaction span""" # Get current span context current_span = trace.get_current_span() - current_context = trace.set_span_in_context(current_span) if current_span else Context() + current_context = ( + trace.set_span_in_context(current_span) if current_span else Context() + ) with self.tracer.start_as_current_span( "transaction.abort", @@ -213,11 +230,15 @@ def transaction_abort(self, call_next: Callable, transaction_id: int | None = No call_next(transaction_id=transaction_id, **kwargs) - def transaction_commit(self, call_next: Callable, transaction_id: int | None = None, **kwargs): + def transaction_commit( + self, call_next: Callable, transaction_id: int | None = None, **kwargs + ): """Commit a transaction span""" # Get current span context current_span = trace.get_current_span() - current_context = trace.set_span_in_context(current_span) if current_span else Context() + current_context = ( + trace.set_span_in_context(current_span) if current_span else Context() + ) with self.tracer.start_as_current_span( "transaction.commit", @@ -228,4 +249,3 @@ def transaction_commit(self, call_next: Callable, transaction_id: int | None = N span.set_attribute("transaction_id", transaction_id) call_next(transaction_id=transaction_id, **kwargs) - From ed6bc93c3e096daffa154586f6d378e9a418912f Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 17 Feb 2026 12:43:19 +0000 Subject: [PATCH 25/30] Fix rw.environment.get('ID') bug --- src/workflows/recipe/__init__.py | 76 ++++++++++++++++---------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index cb58ecc..f299c4f 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -73,46 +73,46 @@ def unwrap_recipe(header, message): if header.get("workflows-recipe") in {True, "True", "true", 1}: otel_logs = None rw = RecipeWrapper(message=message, transport=transport_layer) - - # Extract recipe ID from environment and add to current span - span = trace.get_current_span() - recipe_id = rw.environment.get("ID") - - if recipe_id: - span.set_attribute("recipe_id", recipe_id) - - # Extract span_id and trace_id for logging - span_context = span.get_span_context() - if span_context and span_context.is_valid: - span_id = span_context.span_id - trace_id = span_context.trace_id - - otel_logs = { - "span_id": span_id, - "trace_id": trace_id, - } + if rw.environment.get("ID"): + # Extract recipe ID from environment and add to current span + span = trace.get_current_span() + recipe_id = rw.environment.get("ID") if recipe_id: - otel_logs["recipe_id"] = recipe_id - - with ExitStack() as stack: - # Configure the context depending on if service is emitting spans - if ( - otel_logs - and log_extender - and rw.environment - and rw.environment.get("ID") - ): - stack.enter_context( - log_extender("recipe_ID", rw.environment.get("ID")) - ) - stack.enter_context(log_extender("otel_logs", otel_logs)) - elif log_extender and rw.environment and rw.environment.get("ID"): - stack.enter_context( - log_extender("recipe_ID", rw.environment.get("ID")) - ) - - return callback(rw, header, message.get("payload")) + span.set_attribute("recipe_id", recipe_id) + + # Extract span_id and trace_id for logging + span_context = span.get_span_context() + if span_context and span_context.is_valid: + span_id = span_context.span_id + trace_id = span_context.trace_id + + otel_logs = { + "span_id": span_id, + "trace_id": trace_id, + } + + if recipe_id: + otel_logs["recipe_id"] = recipe_id + + with ExitStack() as stack: + # Configure the context depending on if service is emitting spans + if ( + otel_logs + and log_extender + and rw.environment + and rw.environment.get("ID") + ): + stack.enter_context( + log_extender("recipe_ID", rw.environment.get("ID")) + ) + stack.enter_context(log_extender("otel_logs", otel_logs)) + elif log_extender and rw.environment and rw.environment.get("ID"): + stack.enter_context( + log_extender("recipe_ID", rw.environment.get("ID")) + ) + + return callback(rw, header, message.get("payload")) if allow_non_recipe_messages: return callback(None, header, message) From 2369ba115f83a73d2db6fb20f6f8d44ce23206d7 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 17 Feb 2026 13:00:16 +0000 Subject: [PATCH 26/30] Ensure environment and environment.id exists --- src/workflows/recipe/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index f299c4f..0c115ee 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -73,7 +73,8 @@ def unwrap_recipe(header, message): if header.get("workflows-recipe") in {True, "True", "true", 1}: otel_logs = None rw = RecipeWrapper(message=message, transport=transport_layer) - if rw.environment.get("ID"): + + if hasattr(rw,"environment") and rw.environment.get("ID"): # Extract recipe ID from environment and add to current span span = trace.get_current_span() recipe_id = rw.environment.get("ID") From 554baa292003aa4bd4f76007fad2253e4e40e6b0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:00:26 +0000 Subject: [PATCH 27/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 0c115ee..ad8f9ed 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -73,8 +73,8 @@ def unwrap_recipe(header, message): if header.get("workflows-recipe") in {True, "True", "true", 1}: otel_logs = None rw = RecipeWrapper(message=message, transport=transport_layer) - - if hasattr(rw,"environment") and rw.environment.get("ID"): + + if hasattr(rw, "environment") and rw.environment.get("ID"): # Extract recipe ID from environment and add to current span span = trace.get_current_span() recipe_id = rw.environment.get("ID") From 7221ec44dbfcb07759733a9895b999cdd3c6cf6d Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 17 Feb 2026 13:10:34 +0000 Subject: [PATCH 28/30] Remove the need for enironment variable in mock --- src/workflows/recipe/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index ad8f9ed..3043333 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -74,7 +74,7 @@ def unwrap_recipe(header, message): otel_logs = None rw = RecipeWrapper(message=message, transport=transport_layer) - if hasattr(rw, "environment") and rw.environment.get("ID"): + if hasattr(rw,"environment") and rw.environment.get("ID"): # Extract recipe ID from environment and add to current span span = trace.get_current_span() recipe_id = rw.environment.get("ID") @@ -112,8 +112,10 @@ def unwrap_recipe(header, message): stack.enter_context( log_extender("recipe_ID", rw.environment.get("ID")) ) + + return callback(rw,header,message.get("payload")) - return callback(rw, header, message.get("payload")) + return callback(rw, header, message.get("payload")) if allow_non_recipe_messages: return callback(None, header, message) From b7e799924fd918e043a5571e7ee73806cc51fecb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:12:11 +0000 Subject: [PATCH 29/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 3043333..73bba19 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -74,7 +74,7 @@ def unwrap_recipe(header, message): otel_logs = None rw = RecipeWrapper(message=message, transport=transport_layer) - if hasattr(rw,"environment") and rw.environment.get("ID"): + if hasattr(rw, "environment") and rw.environment.get("ID"): # Extract recipe ID from environment and add to current span span = trace.get_current_span() recipe_id = rw.environment.get("ID") @@ -112,8 +112,8 @@ def unwrap_recipe(header, message): stack.enter_context( log_extender("recipe_ID", rw.environment.get("ID")) ) - - return callback(rw,header,message.get("payload")) + + return callback(rw, header, message.get("payload")) return callback(rw, header, message.get("payload")) From 2fd00fadf2eea4d917545c34acafb3534b9e0f40 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 17 Feb 2026 14:31:57 +0000 Subject: [PATCH 30/30] Fix ruff error --- src/workflows/transport/middleware/otel_tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index 7fb11b0..13cd82e 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -16,7 +16,7 @@ def __init__(self, tracer: trace.Tracer, service_name: str): self.tracer = tracer self.service_name = service_name - def send(self, call_next: Callable, destination: str, message: Any, **kwargs): + def send(self, call_next: Callable, destination: str, message, **kwargs): # Get current span context (may be None if this is the root span) current_span = trace.get_current_span() parent_context = (