Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
if: env.skip_backend_tests == 'false'
run: |
cd src/ContentProcessor
python -m pytest -vv --cov=. --cov-report=xml --cov-report=term-missing --cov-fail-under=80
python -m pytest -vv --cov=src --cov-report=xml --cov-report=term-missing --cov-fail-under=80

- name: Skip Backend Tests
if: env.skip_backend_tests == 'true'
Expand Down
10 changes: 10 additions & 0 deletions src/ContentProcessor/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,15 @@ addopts = "--maxfail=1"
testpaths = ["tests"]
pythonpath = ["src"]

[tool.coverage.run]
source = ["src"]
omit = ["src/tests/*", "**/test_*.py", "**/*_test.py"]

[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if __name__ == .__main__.:",
]

[tool.poetry.scripts]
start-app = "src.main:main"
99 changes: 99 additions & 0 deletions src/ContentProcessor/src/tests/models/test_content_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Tests for ContentProcess model."""

from unittest.mock import patch, MagicMock
from libs.models.content_process import ContentProcess, Step_Outputs


class TestStepOutputs:
"""Tests for Step_Outputs class."""

def test_step_outputs_creation(self):
"""Test creating Step_Outputs."""
step = Step_Outputs(
step_name="extract",
processed_time="2026-01-01T00:00:00Z",
step_result={"extracted": "data"}
)
assert step.step_name == "extract"
assert step.step_result == {"extracted": "data"}


class TestContentProcess:
"""Tests for ContentProcess class."""

def test_content_process_creation(self):
"""Test creating ContentProcess."""
process = ContentProcess(
process_id="test-123",
status="processing"
)
assert process.process_id == "test-123"
assert process.status == "processing"

@patch("libs.models.content_process.CosmosMongDBHelper")
def test_update_process_status_to_cosmos_existing(self, mock_cosmos):
"""Test updating existing process status in Cosmos."""
mock_instance = MagicMock()
mock_instance.find_document.return_value = [{"process_id": "test-123"}]
mock_cosmos.return_value = mock_instance

process = ContentProcess(process_id="test-123", status="completed")
process.update_process_status_to_cosmos(
"connection_string",
"database",
"collection"
)

mock_instance.find_document.assert_called_once()
mock_instance.update_document.assert_called_once()

@patch("libs.models.content_process.CosmosMongDBHelper")
def test_update_process_status_to_cosmos_new(self, mock_cosmos):
"""Test inserting new process status in Cosmos."""
mock_instance = MagicMock()
mock_instance.find_document.return_value = []
mock_cosmos.return_value = mock_instance

process = ContentProcess(process_id="test-123", status="processing")
process.update_process_status_to_cosmos(
"connection_string",
"database",
"collection"
)

mock_instance.find_document.assert_called_once()
mock_instance.insert_document.assert_called_once()

@patch("libs.models.content_process.CosmosMongDBHelper")
def test_update_status_to_cosmos_existing(self, mock_cosmos):
"""Test updating existing status in Cosmos."""
mock_instance = MagicMock()
mock_instance.find_document.return_value = [{"process_id": "test-123"}]
mock_cosmos.return_value = mock_instance

process = ContentProcess(process_id="test-123", status="completed")
process.update_status_to_cosmos(
"connection_string",
"database",
"collection"
)

mock_instance.find_document.assert_called_once()
mock_instance.update_document.assert_called_once()

@patch("libs.models.content_process.CosmosMongDBHelper")
def test_update_status_to_cosmos_new(self, mock_cosmos):
"""Test inserting new status in Cosmos."""
mock_instance = MagicMock()
mock_instance.find_document.return_value = []
mock_cosmos.return_value = mock_instance

process = ContentProcess(process_id="test-123", status="processing")
process.update_status_to_cosmos(
"connection_string",
"database",
"collection"
)

mock_instance.find_document.assert_called_once()
mock_instance.insert_document.assert_called_once()
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import pytest
from unittest.mock import Mock
from unittest.mock import Mock, patch, MagicMock
from libs.pipeline.entities.pipeline_step_result import StepResult
from libs.pipeline.entities.pipeline_status import PipelineStatus
from libs.pipeline.entities.pipeline_data import DataPipeline
from libs.pipeline.entities.pipeline_file import ArtifactType


def test_update_step():
Expand Down Expand Up @@ -47,57 +49,6 @@ def test_get_previous_step_result():
assert result is None


# def test_save_to_persistent_storage(mocker):
# # Mock the StorageBlobHelper.upload_text method
# mock_upload_text = mocker.patch(
# "libs.azure_helper.storage_blob.StorageBlobHelper.upload_text"
# )

# # Mock the StorageBlobHelper constructor to return a mock instance
# mock_storage_blob_helper = mocker.patch(
# "libs.azure_helper.storage_blob.StorageBlobHelper", autospec=True
# )
# mock_storage_blob_helper_instance = mock_storage_blob_helper.return_value

# # Mock the create_container method on the container_client
# mock_container_client = Mock()
# mock_container_client.create_container = Mock()
# mock_storage_blob_helper_instance._invalidate_container = Mock()
# mock_storage_blob_helper_instance._invalidate_container.return_value = (
# mock_container_client
# )

# # Create a PipelineStatus object with a process_id
# pipeline_status = PipelineStatus(process_id="123")

# # Mock the update_step method using pytest-mock
# mock_update_step = mocker.patch.object(
# PipelineStatus, "update_step", return_value=None
# )

# # Mock the model_dump_json method using pytest-mock
# mock_model_dump_json = mocker.patch.object(
# PipelineStatus, "model_dump_json", return_value='{"key": "value"}'
# )

# account_url = "https://example.com"
# container_name = "container"

# # Call the save_to_persistent_storage method
# pipeline_status.save_to_persistent_storage(account_url, container_name)

# # Assert that update_step was called once
# mock_update_step.assert_called_once()

# # Assert that model_dump_json was called once
# mock_model_dump_json.assert_called_once()

# # Assert that upload_text was called with the correct arguments
# mock_upload_text.assert_called_once_with(
# container_name="123", blob_name="process-status.json", text='{"key": "value"}'
# )


def test_save_to_persistent_storage_no_process_id():
pipeline_status = PipelineStatus()
with pytest.raises(ValueError, match="Process ID is required to save the result."):
Expand All @@ -115,3 +66,91 @@ def test_move_to_next_step():
assert pipeline_status.completed_steps == ["step1", "step2"]
assert pipeline_status.remaining_steps == []
assert pipeline_status.completed is True


# DataPipeline Tests
class TestDataPipeline:
"""Tests for DataPipeline class."""

def test_get_object_valid_json(self):
"""Test parsing valid JSON string to DataPipeline."""
json_str = '{"process_id": "test-123", "PipelineStatus": {"Completed": false}, "Files": []}'
result = DataPipeline.get_object(json_str)
assert result.process_id == "test-123"
assert result.pipeline_status is not None

def test_get_object_invalid_json(self):
"""Test that invalid JSON raises ValueError."""
with pytest.raises(ValueError, match="Failed to parse"):
DataPipeline.get_object("invalid json {")

def test_add_file(self):
"""Test adding a file to the pipeline."""
pipeline_status = PipelineStatus(process_id="test-123", active_step="step1")
data_pipeline = DataPipeline(process_id="test-123", pipeline_status=pipeline_status)

file = data_pipeline.add_file("document.pdf", ArtifactType.SourceContent)

assert len(data_pipeline.files) == 1
assert file.name == "document.pdf"
assert file.artifact_type == ArtifactType.SourceContent
assert file.processed_by == "step1"

def test_get_step_result(self):
"""Test getting step result from DataPipeline."""
pipeline_status = PipelineStatus(process_id="test-123")
step_result = StepResult(step_name="extract", result={"data": "value"})
pipeline_status.process_results.append(step_result)

data_pipeline = DataPipeline(process_id="test-123", pipeline_status=pipeline_status)

result = data_pipeline.get_step_result("extract")
assert result == step_result

def test_get_previous_step_result(self):
"""Test getting previous step result from DataPipeline."""
pipeline_status = PipelineStatus(process_id="test-123", completed_steps=["step1"])
step_result = StepResult(step_name="step1", result={"data": "value"})
pipeline_status.process_results.append(step_result)

data_pipeline = DataPipeline(process_id="test-123", pipeline_status=pipeline_status)

result = data_pipeline.get_previous_step_result("step2")
assert result == step_result

def test_get_source_files(self):
"""Test getting source files from pipeline."""
pipeline_status = PipelineStatus(process_id="test-123", active_step="step1")
data_pipeline = DataPipeline(process_id="test-123", pipeline_status=pipeline_status)

# Add source file
data_pipeline.add_file("source.pdf", ArtifactType.SourceContent)
# Add extracted file
data_pipeline.add_file("output.json", ArtifactType.ExtractedContent)

source_files = data_pipeline.get_source_files()

assert len(source_files) == 1
assert source_files[0].name == "source.pdf"

def test_save_to_database_not_implemented(self):
"""Test that save_to_database raises NotImplementedError."""
pipeline_status = PipelineStatus(process_id="test-123")
data_pipeline = DataPipeline(process_id="test-123", pipeline_status=pipeline_status)

with pytest.raises(NotImplementedError):
data_pipeline.save_to_database()

@patch("libs.pipeline.entities.pipeline_data.StorageBlobHelper")
def test_save_to_persistent_storage(self, mock_storage_helper):
"""Test saving pipeline to persistent storage."""
mock_instance = MagicMock()
mock_storage_helper.return_value = mock_instance

pipeline_status = PipelineStatus(process_id="test-123")
data_pipeline = DataPipeline(process_id="test-123", pipeline_status=pipeline_status)

data_pipeline.save_to_persistent_storage("https://storage.blob.core.windows.net", "container")

mock_storage_helper.assert_called_once()
mock_instance.upload_text.assert_called_once()
Loading