diff --git a/pyproject.toml b/pyproject.toml index b0baa53..58f47e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "requests", "pydantic>=2,<3", "workflows>=3.0", + "opentelemetry-api==1.20" ] [dependency-groups] diff --git a/src/zocalo/service/dispatcher.py b/src/zocalo/service/dispatcher.py index d6a30ee..bbc3c32 100644 --- a/src/zocalo/service/dispatcher.py +++ b/src/zocalo/service/dispatcher.py @@ -11,9 +11,23 @@ from importlib.metadata import entry_points import workflows.recipe +from opentelemetry import trace from workflows.services.common_service import CommonService +# Helper method to get dcid. Used for injecting it into current span +def _extract_dcid(params: dict) -> int | None: + if not isinstance(params, dict): + return None + + if dcid := params.get("ispyb_dcid"): + return dcid + if dcid := params.get("dcid"): + return dcid + + return None + + class Dispatcher(CommonService): """ Single point of contact service that takes in job meta-information @@ -205,6 +219,18 @@ def process(self, rw, header, message): recipe_id = parameters.get("guid") or str(uuid.uuid4()) parameters["guid"] = recipe_id + # Extract DCID and set on trace span if OpenTelemetry is available + if trace is not None: + try: + span = trace.get_current_span() + if span and span.is_recording(): + dcid = _extract_dcid(parameters) + if dcid: + span.set_attribute("dcid", dcid) + self.log.debug(f"Set DCID {dcid} on trace span") + except Exception as e: + self.log.warning(f"Failed to set DCID on trace span: {e}") + if rw: # If we received a recipe wrapper then we already have a recipe_ID # attached to logs. Make a note of the downstream recipe ID so that