Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 42 additions & 76 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,99 +11,65 @@ on:
- ".github/workflows/tests.yml"

jobs:
test-worker:
test-datashare-python:
runs-on: ubuntu-latest
env:
DATASHARE_VERSION: "20.12.0"
DATASHARE_PACKAGE: "datashare-20.12.0.deb"
PYTHON_VERSION: "3.13"
ASTRAL_VERSION: "0.10.3"
BLACK_VERSION: "~= 24.2.0"
TEMPORAL_VERSION: "1.5.1"
PYTHONUNBUFFERED: 1
services:
elasticsearch:
image: elasticsearch:7.17.28
env:
http.host: "0.0.0.0"
transport.host: "0.0.0.0"
cluster.name: datashare
discovery.type: single-node
discovery.zen.minimum_master_nodes: 1
xpack.license.self_generated.type: basic
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE
options: >-
--health-cmd "curl --silent --fail elasticsearch:9200/_cluster/health || exit 1"
--health-interval 3s
--health-timeout 1s
--health-retries 10
--health-start-period 5s
ports:
- "9200:9200"
postgres:
image: postgres:15
env:
POSTGRES_USER: datashare
POSTGRES_PASSWORD: datashare
POSTGRES_DB: datashare
# This is needed by the healthcheck command
# @see https://stackoverflow.com/a/60194261
PGUSER: datashare
options: >-
--health-cmd "pg_isready -U datashare -d datashare"
--health-interval 3s
--health-timeout 1s
--health-retries 10
--health-start-period 5s
ports:
- "5432:5432"
redis:
image: redis:8.6-rc1-trixie
options: >-
--health-cmd "redis-cli --raw incr ping"
--health-interval 3s
--health-timeout 1s
--health-retries 10
--health-start-period 5s
ports:
- "6379:6379"
steps:
- uses: actions/checkout@v4
- name: Setup Python project
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Start Temporal
- name: Cache Docker images
uses: ScribeMD/docker-cache@0.5.0
with:
key: docker-${{ runner.os }}-${{ hashFiles('docker-compose.yml') }}
- name: Start test services
run: docker compose up -d datashare temporal-post-init elasticsearch
- name: Test Temporal setup
run: |
docker run -d --rm -p 7233:7233 -p 8233:8233 temporalio/temporal:${{ env.TEMPORAL_VERSION }} server start-dev --ip 0.0.0.0
# - name: Download and install datashare
# run: |
# wget "https://github.com/ICIJ/datashare-installer/releases/download/${{ env.DATASHARE_VERSION }}/${{ env.DATASHARE_PACKAGE }}"
# sudo apt update && sudo apt install -y ./${{ env.DATASHARE_PACKAGE }}
# - name: Run Datashare in background
# run: ./scripts/run_datashare.sh &
curl "https://temporal.download/cli/archive/latest?platform=linux&arch=amd64" --output temporal.tar.gz
tar xzvf temporal.tar.gz
sudo mv temporal /usr/local/bin
temporal operator namespace describe -n datashare-default --address localhost:7233
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.ASTRAL_VERSION }}
python-version: ${{ env.PYTHON_VERSION }}
enable-cache: true
activate-environment: true
- name: Install ffmpeg
run: sudo apt update && sudo apt install -y ffmpeg
- name: Install worker deps
run: |
make install-deps
make create-dirs
- name: Run workers in background
working-directory: datashare-python
- name: Run tests
run: |
./scripts/worker_entrypoint.sh localhost 7233 &
cd datashare-python
uv sync --frozen --all-extras
uv run --frozen python -m pytest -vvv --cache-clear --show-capture=all -r A

test-worker-template:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python project
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Cache Docker images
uses: ScribeMD/docker-cache@0.5.0
with:
key: docker-${{ runner.os }}-${{ hashFiles('docker-compose.yml') }}
- name: Start test services
run: docker compose up -d datashare temporal-post-init elasticsearch
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.ASTRAL_VERSION }}
python-version: ${{ env.PYTHON_VERSION }}
enable-cache: true
working-directory: worker-template
- name: Run tests
run: |
uv pip install pytest-timeout
uv run --frozen pytest --timeout=120 -vvv --cache-clear --show-capture=all -r A **/tests/
cd worker-template
uv sync --frozen --all-extras
uv run --frozen python -m pytest --timeout=180 -vvv --cache-clear --show-capture=all -r A

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down
12 changes: 3 additions & 9 deletions .testcontainers/temporal/dynamic.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
limit.maxIDLength:
- value: 255
constraints: { }
system.namespaceCacheRefreshInterval:
- value: 0.1s
constraints: { }
system.forceSearchAttributesCacheRefreshOnRead:
- value: true
constraints: { }
system:
namespaceCacheRefreshInterval: 0.1s
forceSearchAttributesCacheRefreshOnRead: true
1 change: 1 addition & 0 deletions .testcontainers/temporal/post-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ create_custom_search_attributes_namespace() {
}

create_temporal_namespace
sleep 5
create_custom_search_attributes_namespace
11 changes: 7 additions & 4 deletions asr-worker/asr_worker/activities.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import torchaudio
from caul.model_handlers.helpers import ParakeetModelHandlerResult
from caul.configs.parakeet import ParakeetConfig
from caul.model_handlers.helpers import ParakeetModelHandlerResult
from caul.tasks.preprocessing.helpers import PreprocessedInput
from temporalio import activity

Expand All @@ -16,7 +16,7 @@ def __init__(self):
# load models
self.asr_handler.startup()

@activity.defn
@activity.defn(name="asr.transcription.preprocess")
async def preprocess(self, inputs: list[str]) -> list[list[PreprocessedInput]]:
"""Preprocess transcription inputs

Expand All @@ -25,7 +25,7 @@ async def preprocess(self, inputs: list[str]) -> list[list[PreprocessedInput]]:
"""
return self.asr_handler.preprocessor.process(inputs)

@activity.defn
@activity.defn(name="asr.transcription.infer")
async def infer(
self, inputs: list[PreprocessedInput]
) -> list[ParakeetModelHandlerResult]:
Expand All @@ -44,7 +44,7 @@ async def infer(

return self.asr_handler.inference_handler.process(inputs)

@activity.defn
@activity.defn(name="asr.transcription.postprocess")
async def postprocess(
self, inputs: list[ParakeetModelHandlerResult]
) -> list[ParakeetModelHandlerResult]:
Expand All @@ -54,3 +54,6 @@ async def postprocess(
:return: list of parakeet inference handler results
"""
return self.asr_handler.postprocessor.process(inputs)


REGISTRY = [ASRActivities.preprocess, ASRActivities.infer, ASRActivities.postprocess]
10 changes: 6 additions & 4 deletions asr-worker/asr_worker/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
from more_itertools import flatten
from temporalio import workflow

from asr_worker.models import ASRResponse, ASRInputs
from asr_worker.constants import _TEN_MINUTES, RESPONSE_SUCCESS, RESPONSE_ERROR
from asr_worker.constants import _TEN_MINUTES, RESPONSE_ERROR, RESPONSE_SUCCESS
from asr_worker.models import ASRInputs, ASRResponse

with workflow.unsafe.imports_passed_through():
from asr_worker.activities import ASRActivities


# TODO: Figure out which modules are violating sandbox restrictions
# and grant a limited passthrough
@workflow.defn(sandboxed=False)
@workflow.defn(name="asr.transcription", sandboxed=False)
class ASRWorkflow:
"""ASR workflow definition"""

Expand All @@ -28,7 +28,6 @@ async def run(self, inputs: ASRInputs) -> ASRResponse:
:param inputs: ASRInputs
:return: ASRResponse
"""

try:
# Preprocessing
preprocessed_batches = await gather(
Expand Down Expand Up @@ -99,3 +98,6 @@ async def run(self, inputs: ASRInputs) -> ASRResponse:
except ValueError as e:
workflow.logger.exception(e)
return ASRResponse(status=RESPONSE_ERROR, error=str(e))


WORKFLOWS = [ASRWorkflow]
9 changes: 9 additions & 0 deletions asr-worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ dependencies = [
"ml-dtypes>=0.5",
"kaldialign>=0.9.3",
]
[project.entry-points."datashare.workflows"]
workflows = "asr_worker.workflows:REGISTRY"

[project.entry-points."datashare.activities"]
activities = "asr_worker.activities:REGISTRY"

[[tool.uv.index]]
name = "pytorch-cpu"
Expand Down Expand Up @@ -69,3 +74,7 @@ markers = [
"integration",
"pull",
]
log_cli = 1
log_cli_level = "DEBUG"
log_file_format = "[%(levelname)s][%(asctime)s.%(msecs)03d][%(name)s]: %(message)s"
log_file_date_format = "%Y-%m-%d %H:%M:%S"
22 changes: 19 additions & 3 deletions datashare-python/datashare_python/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import importlib.metadata
import os
from typing import Annotated

import typer
from icij_common.logging_utils import setup_loggers

import datashare_python
from datashare_python.cli.local import local_app
from datashare_python.cli.tasks import task_app
from datashare_python.cli.task import task_app
from datashare_python.cli.utils import AsyncTyper
from datashare_python.cli.worker import worker_app

cli_app = AsyncTyper(context_settings={"help_option_names": ["-h", "--help"]})
cli_app.add_typer(task_app)
cli_app.add_typer(local_app)
cli_app.add_typer(worker_app)


def version_callback(value: bool) -> None: # noqa: FBT001
Expand All @@ -20,12 +24,24 @@ def version_callback(value: bool) -> None: # noqa: FBT001
raise typer.Exit()


def pretty_exc_callback(value: bool) -> None: # noqa: FBT001
if not value:
os.environ["TYPER_STANDARD_TRACEBACK"] = "1"


@cli_app.callback()
def main(
version: Annotated[
version: Annotated[ # noqa: ARG001
bool | None,
typer.Option("--version", callback=version_callback, is_eager=True),
] = None,
*,
pretty_exceptions: Annotated[ # noqa: ARG001
bool,
typer.Option(
"--pretty-exceptions", callback=pretty_exc_callback, is_eager=True
),
] = False,
) -> None:
"""Datashare Python CLI."""
pass
setup_loggers(["__main__", datashare_python.__name__])
25 changes: 0 additions & 25 deletions datashare-python/datashare_python/cli/local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from typing import Annotated

import typer
Expand All @@ -7,38 +6,14 @@
from datashare_python.constants import DEFAULT_NAMESPACE, DEFAULT_TEMPORAL_ADDRESS
from datashare_python.local_client import LocalClient

_START_WORKERS_HELP = "starts local temporal workers"
_REGISTER_NAMESPACE_HELP = "register namespace"
_WORKERS_HELP = "list of worker paths"
_TEMPORAL_URL_HELP = "address for temporal server"
_NAMESPACE_HELP = "namespace name"
_LOCAL = "local"

local_app = AsyncTyper(name=_LOCAL)


@local_app.async_command(help=_START_WORKERS_HELP)
async def start_workers(
worker_paths: Annotated[
str, typer.Option("--worker-paths", "-wp", help=_WORKERS_HELP)
] = None,
temporal_address: Annotated[
str, typer.Option("--temporal-address", "-a", help=_TEMPORAL_URL_HELP)
] = DEFAULT_TEMPORAL_ADDRESS,
) -> None:
match worker_paths:
case str():
worker_paths = json.loads(worker_paths)
case None:
worker_paths = dict()
case _:
raise TypeError(f"Invalid worker paths: {worker_paths}")

client = LocalClient()

await client.start_workers(temporal_address, worker_paths)


@local_app.async_command(help=_REGISTER_NAMESPACE_HELP)
async def register_namespace(
namespace: Annotated[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@


@task_app.async_command(help=_START_HELP)
async def start_task(
async def start(
name: Annotated[str, typer.Argument(help=_NAME_HELP)],
args: Annotated[TaskArgs, typer.Argument(help=_ARGS_HELP)] = None,
group: Annotated[
Expand Down Expand Up @@ -84,7 +84,7 @@ async def watch(
client = DatashareTaskClient(ds_address, api_key=ds_api_key)
async with client:
task = await client.get_task(task_id)
if task.state is READY_STATES:
if task.state in READY_STATES:
await _handle_ready(task, client, already_done=True)
await _handle_alive(task, client, polling_interval_s)
print(task_id)
Expand Down
Loading
Loading