From 4b67d44dff53de4b70f14e8e115cf19128fb0258 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 24 Feb 2026 18:12:46 +0530 Subject: [PATCH] fix: raise TimeoutError on ZMQ retry exhaustion instead of returning None (#393) - recv_json_with_retry() now raises TimeoutError after 5 failed attempts instead of returning None - send_json_with_retry() now raises TimeoutError after 5 failed attempts instead of silently returning None - read() catches TimeoutError explicitly and returns (default_return_val, False) - write() catches TimeoutError explicitly and logs error without crashing - Patched time.sleep in retry tests to avoid ~2.5s real sleeping per test - Fixed ruff formatting in test files Closes #393 --- concore_base.py | 12 +++-- tests/test_concore.py | 95 +++++++++++++++++++++++++++++++++++++ tests/test_concoredocker.py | 58 ++++++++++++++++++++++ tests/test_read_status.py | 50 ++++++++++++------- 4 files changed, 195 insertions(+), 20 deletions(-) diff --git a/concore_base.py b/concore_base.py index 35f2c34..d11b40e 100644 --- a/concore_base.py +++ b/concore_base.py @@ -59,8 +59,7 @@ def send_json_with_retry(self, message): except zmq.Again: logger.warning(f"Send timeout (attempt {attempt + 1}/5)") time.sleep(0.5) - logger.error("Failed to send after retries.") - return + raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}") def recv_json_with_retry(self): """Receive JSON message with retries if timeout occurs.""" @@ -70,8 +69,7 @@ def recv_json_with_retry(self): except zmq.Again: logger.warning(f"Receive timeout (attempt {attempt + 1}/5)") time.sleep(0.5) - logger.error("Failed to receive after retries.") - return None + raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}") def init_zmq_port(mod, port_name, port_type, address, socket_type_str): @@ -282,6 +280,10 @@ def read(mod, port_identifier, name, initstr_val): return message[1:], True last_read_status = "SUCCESS" return message, True + except TimeoutError as e: + logger.error(f"ZMQ recv timeout on port {port_identifier} (name: {name}): {e}. Returning default.") + last_read_status = "TIMEOUT" + return default_return_val, False except zmq.error.ZMQError as e: logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.") last_read_status = "TIMEOUT" @@ -384,6 +386,8 @@ def write(mod, port_identifier, name, val, delta=0): # Mutation breaks cross-language determinism (see issue #385). else: zmq_p.send_json_with_retry(zmq_val) + except TimeoutError as e: + logger.error(f"ZMQ send timeout on port {port_identifier} (name: {name}): {e}") except zmq.error.ZMQError as e: logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") except Exception as e: diff --git a/tests/test_concore.py b/tests/test_concore.py index b1a980e..da0746e 100644 --- a/tests/test_concore.py +++ b/tests/test_concore.py @@ -1,6 +1,7 @@ import pytest import os import numpy as np +from unittest.mock import patch class TestSafeLiteralEval: @@ -450,3 +451,97 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir): "After 3 writes with delta=1 simtime must remain 0 " "(matching C++/MATLAB/Verilog); got %s" % concore.simtime ) + + +# =================================================================== +# ZMQ Retry Exhaustion Tests (Issue #393) +# =================================================================== + + +class TestZMQRetryExhaustion: + """Tests for issue #393 — TimeoutError on retry exhaustion.""" + + @pytest.fixture(autouse=True) + def reset_zmq_ports(self): + import concore + + original_ports = concore.zmq_ports.copy() + yield + concore.zmq_ports.clear() + concore.zmq_ports.update(original_ports) + + @pytest.fixture(autouse=True) + def reset_simtime(self): + import concore + + old_simtime = concore.simtime + yield + concore.simtime = old_simtime + + @patch("concore_base.time.sleep") + def test_recv_json_with_retry_raises_timeout_error(self, mock_sleep): + """recv_json_with_retry must raise TimeoutError after 5 failed attempts.""" + from concore import ZeroMQPort + from unittest.mock import MagicMock, patch + import zmq + + with patch.object(ZeroMQPort, "__init__", lambda self, *a, **kw: None): + port = ZeroMQPort.__new__(ZeroMQPort) + port.socket = MagicMock() + port.socket.recv_json.side_effect = zmq.Again() + port.address = "tcp://test:5555" + + with pytest.raises(TimeoutError, match="ZMQ recv failed after 5 retries"): + port.recv_json_with_retry() + + assert port.socket.recv_json.call_count == 5 + + @patch("concore_base.time.sleep") + def test_send_json_with_retry_raises_timeout_error(self, mock_sleep): + """send_json_with_retry must raise TimeoutError after 5 failed attempts.""" + from concore import ZeroMQPort + from unittest.mock import MagicMock, patch + import zmq + + with patch.object(ZeroMQPort, "__init__", lambda self, *a, **kw: None): + port = ZeroMQPort.__new__(ZeroMQPort) + port.socket = MagicMock() + port.socket.send_json.side_effect = zmq.Again() + port.address = "tcp://test:5555" + + with pytest.raises(TimeoutError, match="ZMQ send failed after 5 retries"): + port.send_json_with_retry({"test": "data"}) + + assert port.socket.send_json.call_count == 5 + + def test_read_returns_default_on_zmq_timeout(self): + """read() must return default_return_val when recv exhausts retries, not None.""" + import concore + + class MockZMQPort: + def recv_json_with_retry(self): + raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555") + + concore.zmq_ports["test_timeout_port"] = MockZMQPort() + concore.simtime = 0 + + result, ok = concore.read("test_timeout_port", "test_name", "[1.0, 2.0]") + + assert result == [1.0, 2.0], ( + "read() must return default_return_val on TimeoutError, got %s" % result + ) + assert ok is False + + def test_write_does_not_crash_on_zmq_send_timeout(self): + """write() must handle TimeoutError from send gracefully.""" + import concore + + class MockZMQPort: + def send_json_with_retry(self, message): + raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555") + + concore.zmq_ports["test_timeout_port"] = MockZMQPort() + concore.simtime = 0 + + # Should not raise — just log the error + concore.write("test_timeout_port", "test_name", [1.0, 2.0]) diff --git a/tests/test_concoredocker.py b/tests/test_concoredocker.py index 40d6808..4399c0e 100644 --- a/tests/test_concoredocker.py +++ b/tests/test_concoredocker.py @@ -247,3 +247,61 @@ def recv_json_with_retry(self): assert result == original assert ok is True + + +# =================================================================== +# ZMQ Retry Exhaustion Tests (Issue #393) +# =================================================================== + + +class TestZMQRetryExhaustion: + """Tests for issue #393 — TimeoutError on retry exhaustion via concoredocker.""" + + @pytest.fixture(autouse=True) + def reset_zmq_ports(self): + import concoredocker + + original_ports = concoredocker.zmq_ports.copy() + yield + concoredocker.zmq_ports.clear() + concoredocker.zmq_ports.update(original_ports) + + @pytest.fixture(autouse=True) + def reset_simtime(self): + import concoredocker + + old_simtime = concoredocker.simtime + yield + concoredocker.simtime = old_simtime + + def test_read_returns_default_on_zmq_timeout(self): + """read() must return default_return_val when recv exhausts retries, not None.""" + import concoredocker + + class MockZMQPort: + def recv_json_with_retry(self): + raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555") + + concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort() + concoredocker.simtime = 0 + + result, ok = concoredocker.read("test_timeout_port", "test_name", "[1.0, 2.0]") + + assert result == [1.0, 2.0], ( + "read() must return default_return_val on TimeoutError, got %s" % result + ) + assert ok is False + + def test_write_does_not_crash_on_zmq_send_timeout(self): + """write() must handle TimeoutError from send gracefully.""" + import concoredocker + + class MockZMQPort: + def send_json_with_retry(self, message): + raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555") + + concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort() + concoredocker.simtime = 0 + + # Should not raise — just log the error + concoredocker.write("test_timeout_port", "test_name", [1.0, 2.0]) diff --git a/tests/test_read_status.py b/tests/test_read_status.py index 54dc4b8..c2a6bb0 100644 --- a/tests/test_read_status.py +++ b/tests/test_read_status.py @@ -6,13 +6,13 @@ import os import pytest -import numpy as np # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- + class DummyZMQPort: """Minimal stand-in for ZeroMQPort used in ZMQ read tests.""" @@ -33,14 +33,16 @@ def recv_json_with_retry(self): # File-based read tests # --------------------------------------------------------------------------- + class TestReadFileSuccess: """read() on a valid file returns (data, True) with SUCCESS status.""" @pytest.fixture(autouse=True) def setup(self, temp_dir, monkeypatch): import concore + self.concore = concore - monkeypatch.setattr(concore, 'delay', 0) + monkeypatch.setattr(concore, "delay", 0) # Create ./in1/ym with valid data: [simtime, value] in_dir = os.path.join(temp_dir, "in1") @@ -48,7 +50,7 @@ def setup(self, temp_dir, monkeypatch): with open(os.path.join(in_dir, "ym"), "w") as f: f.write("[10, 3.14]") - monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in")) def test_returns_data_and_true(self): data, ok = self.concore.read(1, "ym", "[0, 0.0]") @@ -66,10 +68,11 @@ class TestReadFileMissing: @pytest.fixture(autouse=True) def setup(self, temp_dir, monkeypatch): import concore + self.concore = concore - monkeypatch.setattr(concore, 'delay', 0) + monkeypatch.setattr(concore, "delay", 0) # Point to a directory that does NOT have the file - monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in")) def test_returns_default_and_false(self): data, ok = self.concore.read(1, "nonexistent", "[0, 0.0]") @@ -86,15 +89,16 @@ class TestReadFileParseError: @pytest.fixture(autouse=True) def setup(self, temp_dir, monkeypatch): import concore + self.concore = concore - monkeypatch.setattr(concore, 'delay', 0) + monkeypatch.setattr(concore, "delay", 0) in_dir = os.path.join(temp_dir, "in1") os.makedirs(in_dir, exist_ok=True) with open(os.path.join(in_dir, "ym"), "w") as f: f.write("NOT_VALID_PYTHON{{{") - monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in")) def test_returns_default_and_false(self): data, ok = self.concore.read(1, "ym", "[0, 0.0]") @@ -111,16 +115,16 @@ class TestReadFileRetriesExceeded: @pytest.fixture(autouse=True) def setup(self, temp_dir, monkeypatch): import concore + self.concore = concore - monkeypatch.setattr(concore, 'delay', 0) + monkeypatch.setattr(concore, "delay", 0) # Create an empty file in_dir = os.path.join(temp_dir, "in1") os.makedirs(in_dir, exist_ok=True) - with open(os.path.join(in_dir, "ym"), "w") as f: - pass # empty + open(os.path.join(in_dir, "ym"), "w").close() # empty - monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in")) def test_returns_default_and_false(self): data, ok = self.concore.read(1, "ym", "[0, 0.0]") @@ -135,12 +139,14 @@ def test_last_read_status_is_retries_exceeded(self): # ZMQ read tests # --------------------------------------------------------------------------- + class TestReadZMQSuccess: """Successful ZMQ read returns (data, True).""" @pytest.fixture(autouse=True) def setup(self, monkeypatch): import concore + self.concore = concore self.original_ports = concore.zmq_ports.copy() yield @@ -164,6 +170,7 @@ class TestReadZMQTimeout: @pytest.fixture(autouse=True) def setup(self, monkeypatch): import concore + self.concore = concore self.original_ports = concore.zmq_ports.copy() yield @@ -185,6 +192,7 @@ class TestReadZMQError: @pytest.fixture(autouse=True) def setup(self, monkeypatch): import concore + self.concore = concore self.original_ports = concore.zmq_ports.copy() yield @@ -193,6 +201,7 @@ def setup(self, monkeypatch): def test_zmq_error_returns_default_and_false(self): import zmq + dummy = DummyZMQPort(raise_on_recv=zmq.error.ZMQError("test error")) self.concore.zmq_ports["test_port"] = dummy @@ -205,21 +214,23 @@ def test_zmq_error_returns_default_and_false(self): # Backward compatibility # --------------------------------------------------------------------------- + class TestReadBackwardCompatibility: """Legacy callers can use isinstance check on the result.""" @pytest.fixture(autouse=True) def setup(self, temp_dir, monkeypatch): import concore + self.concore = concore - monkeypatch.setattr(concore, 'delay', 0) + monkeypatch.setattr(concore, "delay", 0) in_dir = os.path.join(temp_dir, "in1") os.makedirs(in_dir, exist_ok=True) with open(os.path.join(in_dir, "ym"), "w") as f: f.write("[10, 42.0]") - monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in")) def test_legacy_unpack_pattern(self): """The recommended migration pattern works correctly.""" @@ -245,17 +256,24 @@ def test_tuple_unpack(self): # last_read_status exposed on module # --------------------------------------------------------------------------- + class TestLastReadStatusExposed: """concore.last_read_status is publicly accessible.""" def test_attribute_exists(self): import concore - assert hasattr(concore, 'last_read_status') + + assert hasattr(concore, "last_read_status") def test_initial_value_is_success(self): import concore + # Before any read, default is SUCCESS assert concore.last_read_status in ( - "SUCCESS", "FILE_NOT_FOUND", "TIMEOUT", - "PARSE_ERROR", "EMPTY_DATA", "RETRIES_EXCEEDED", + "SUCCESS", + "FILE_NOT_FOUND", + "TIMEOUT", + "PARSE_ERROR", + "EMPTY_DATA", + "RETRIES_EXCEEDED", )