Files
cobol-java-v3/tests/test_worker_deep.py
T
hangshuo652 bc1d56d1a4 feat: Phase 2 complete — 13 Phases of COBOL type classification and test benchmark
P0.6: gcov infrastructure
P1: extract_structure output expansion (11 new feature fields)
P2: Confusion group rule engine (8 pairs + contradiction + backtrack)
P3: 4-factor confidence calculation + quality gate update
P4: 33+2 COBOL program type test samples (22 files, 7 categories)
P5: parametrized/ test data generation engine
P6: japanese_data.py lookup tables
P7-10: Type-specific test suites (~159 parametrized tests)
P11: Full classification pipeline (classify_program) + orchestrator integration
P12: Documentation (module-interfaces, test-plan v3.0, coverage-matrix)

Architecture decisions:
- classification_pipeline/ merged to hina/pipeline/
- parametrized/ as independent module
- japanese_data.py as root-level file
- hina/__all__ only exports classify_program()

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-19 23:51:55 +08:00

308 lines
12 KiB
Python

"""Deep Web Worker state machine and concurrency testing.
Covers advanced state machine transitions, partial-write recovery,
exception truncation, empty-directory resilience, and concurrency
hazards (file deletion during processing).
"""
import sys, os, json, tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from web.worker import main as worker_main
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _write_task(tasks_dir, task_id, status="queued", runner="native"):
"""Write a standard task JSON file into *tasks_dir*."""
data = {
"id": task_id,
"status": status,
"runner": runner,
"copybook": f"/tmp/{task_id}/copybook.cpy",
"cobol_src": f"/tmp/{task_id}/program.cbl",
"java_src": f"/tmp/{task_id}/java",
"mapping": f"/tmp/{task_id}/mapping.yaml",
}
(tasks_dir / f"{task_id}.json").write_text(json.dumps(data), encoding="utf-8")
def _mock_vr(**overrides):
"""Build a standard MagicMock shaped like a VerificationRun."""
defaults = dict(
program="T",
status="PASS",
fields_matched=5,
fields_mismatched=0,
duration_s=0.5,
runner="native",
field_results=[],
debug={},
report_path=None,
)
defaults.update(overrides)
return MagicMock(**defaults)
# ---------------------------------------------------------------------------
# DEEP-01: Task state machine -- strict transitions
# ---------------------------------------------------------------------------
def test_deep_queued_to_done():
    """Happy path: a queued task ends up 'done' with the run's result stored."""
    fake_run = _mock_vr(program="T1", status="PASS", fields_matched=10)
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        _write_task(tasks_dir, "t001")
        task_file = tasks_dir / "t001.json"

        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("config.Config") as cfg_cls, \
                patch("orchestrator.run_pipeline", return_value=fake_run), \
                patch("web.worker.time") as fake_time:
            cfg_cls.return_value = MagicMock()
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            fake_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        task = json.loads(task_file.read_text())
        assert task["status"] == "done"
        stored = task["result"]
        assert stored["program"] == "T1"
        assert stored["status"] == "PASS"
        assert stored["matched"] == 10
        assert "fields" in task
        assert "debug" in task
def test_deep_queued_to_error():
    """A queued task is marked 'error' when the pipeline call raises."""
    boom = Exception("pipeline crashed")
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        _write_task(tasks_dir, "e001")
        task_file = tasks_dir / "e001.json"

        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("config.Config") as cfg_cls, \
                patch("orchestrator.run_pipeline", side_effect=boom), \
                patch("web.worker.time") as fake_time:
            cfg_cls.return_value = MagicMock()
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            fake_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        task = json.loads(task_file.read_text())
        assert task["status"] == "error"
        # The exception text is expected to be stored under "result".
        assert "pipeline crashed" in task["result"]
def test_deep_running_skipped():
    """A task already in 'running' state is never re-processed.

    The pipeline entry point is patched so that a regression in the skip
    logic cannot launch the real pipeline, and so the test can assert
    directly that no pipeline call happened instead of inferring it from the
    final status alone.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        _write_task(tasks_dir, "r001", status="running")
        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("orchestrator.run_pipeline") as mock_run, \
                patch("web.worker.time") as mock_time:
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            mock_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass
        # Read with the same encoding _write_task() used to write the file.
        data = json.loads((tasks_dir / "r001.json").read_text(encoding="utf-8"))
        assert data["status"] == "running"  # untouched
        # _write_task() never emits a "result" key, so its presence would
        # mean the worker rewrote the file.
        assert "result" not in data
        mock_run.assert_not_called()
def test_deep_done_skipped():
    """A task already in 'done' state is never re-processed.

    Checking only ``status == "done"`` cannot tell a skipped task from one
    that was wrongly re-run to completion, because both end in 'done'. The
    pipeline entry point is therefore patched and asserted uncalled, and the
    file is checked for the absence of a "result" key.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        _write_task(tasks_dir, "d001", status="done")
        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("orchestrator.run_pipeline") as mock_run, \
                patch("web.worker.time") as mock_time:
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            mock_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass
        # Read with the same encoding _write_task() used to write the file.
        data = json.loads((tasks_dir / "d001.json").read_text(encoding="utf-8"))
        assert data["status"] == "done"  # untouched
        # _write_task() never emits a "result" key, so its presence would
        # mean the worker rewrote the file.
        assert "result" not in data
        mock_run.assert_not_called()
# ---------------------------------------------------------------------------
# DEEP-02: Mixed states in a single polling iteration
# ---------------------------------------------------------------------------
def test_deep_mixed_states_only_queued_processed():
    """Only 'queued' tasks are processed when 'running'+'done' also present.

    The pipeline is mocked to succeed, so a wrongly re-processed 'done' task
    would end up 'done' again. Status checks alone would miss that, so the
    test also asserts a single pipeline call and that the skipped task files
    gained no "result" key.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        _write_task(tasks_dir, "z_done", status="done")
        _write_task(tasks_dir, "q_queued", status="queued")
        _write_task(tasks_dir, "m_running", status="running")
        vr = _mock_vr()
        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("config.Config") as mock_cfg, \
                patch("orchestrator.run_pipeline", return_value=vr) as mock_run, \
                patch("web.worker.time") as mock_time:
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            mock_time.sleep.side_effect = KeyboardInterrupt
            mock_cfg.return_value = MagicMock()
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        def _load(task_id):
            # Read with the same encoding _write_task() used.
            path = tasks_dir / f"{task_id}.json"
            return json.loads(path.read_text(encoding="utf-8"))

        # queued -> done
        q = _load("q_queued")
        assert q["status"] == "done"
        # running unchanged (still "running", no result written)
        m = _load("m_running")
        assert m["status"] == "running"
        assert "result" not in m
        # done unchanged (still "done", no result written)
        z = _load("z_done")
        assert z["status"] == "done"
        assert "result" not in z
        # Exactly one task was eligible, so exactly one pipeline run.
        assert mock_run.call_count == 1
# ---------------------------------------------------------------------------
# DEEP-03: Partial-write recovery (missing required key)
# ---------------------------------------------------------------------------
def test_deep_partial_write_missing_copybook():
    """A well-formed task lacking 'copybook' is marked error, pipeline unused."""
    # Syntactically valid task payload, deliberately without "copybook".
    incomplete = {
        "id": "partial1",
        "status": "queued",
        "runner": "native",
        "cobol_src": "/tmp/x/program.cbl",
        "java_src": "/tmp/x/java",
        "mapping": "/tmp/x/mapping.yaml",
    }
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        task_file = tasks_dir / "partial1.json"
        task_file.write_text(json.dumps(incomplete), encoding="utf-8")

        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("config.Config") as cfg_cls, \
                patch("orchestrator.run_pipeline") as pipeline, \
                patch("web.worker.time") as fake_time:
            cfg_cls.return_value = MagicMock()
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            fake_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        outcome = json.loads(task_file.read_text())
        assert outcome["status"] == "error"
        # The stored error text is expected to name the missing key.
        assert "copybook" in outcome["result"]
        # The failure is expected while the call's arguments are being
        # built, i.e. before run_pipeline() itself is entered.
        pipeline.assert_not_called()
# ---------------------------------------------------------------------------
# DEEP-04: Pipeline exception message truncation to 500 characters
# ---------------------------------------------------------------------------
def test_deep_exception_truncation():
    """An exception message over 500 characters is stored cut to 500."""
    oversized = "X" * 1000
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        _write_task(tasks_dir, "trunc001")
        task_file = tasks_dir / "trunc001.json"

        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("config.Config") as cfg_cls, \
                patch("orchestrator.run_pipeline",
                      side_effect=Exception(oversized)), \
                patch("web.worker.time") as fake_time:
            cfg_cls.return_value = MagicMock()
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            fake_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        task = json.loads(task_file.read_text())
        assert task["status"] == "error"
        stored_msg = task["result"]
        assert len(stored_msg) == 500
        assert stored_msg == "X" * 500
# ---------------------------------------------------------------------------
# DEEP-05: Empty tasks directory over multiple loop iterations
# ---------------------------------------------------------------------------
def test_deep_empty_dir_multiple_loops():
    """The worker survives two polling passes over an empty tasks directory."""
    with tempfile.TemporaryDirectory() as tmp:
        empty_dir = Path(tmp)
        with patch("web.worker.TASKS_DIR", empty_dir), \
                patch("web.worker.time") as fake_time:
            # sleep() returns normally once, then raises to end the run.
            fake_time.sleep.side_effect = [None, KeyboardInterrupt]
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        sleep_mock = fake_time.sleep
        # One sleep() per pass, so two calls means two passes were made.
        assert sleep_mock.call_count == 2
        # Every recorded call must be sleep(2) with no keyword arguments.
        for recorded in sleep_mock.call_args_list:
            assert recorded == ((2,),)
# ---------------------------------------------------------------------------
# DEEP-06: File deleted between read and write (FileNotFoundError)
# ---------------------------------------------------------------------------
def test_deep_file_deleted_during_write():
    """A FileNotFoundError from write_text() must not crash the worker."""
    real_write_text = Path.write_text
    writes = 0

    def _failing_write(self, *args, **kwargs):
        # Let exactly one write through, then behave as if the file vanished.
        nonlocal writes
        writes += 1
        if writes != 1:
            raise FileNotFoundError(f"No such file: {self}")
        # First call: expected to be the "running" status update.
        return real_write_text(self, *args, **kwargs)

    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        # Written before Path.write_text is patched, so this goes through
        # the real implementation and does not count as a write.
        _write_task(tasks_dir, "t001")
        fake_run = _mock_vr()

        with patch("web.worker.TASKS_DIR", tasks_dir), \
                patch("config.Config") as cfg_cls, \
                patch("orchestrator.run_pipeline", return_value=fake_run), \
                patch("web.worker.time") as fake_time, \
                patch.object(Path, "write_text", _failing_write):
            cfg_cls.return_value = MagicMock()
            # Raising from the patched sleep() is how the test gets back
            # out of worker_main().
            fake_time.sleep.side_effect = KeyboardInterrupt
            try:
                worker_main()
            except KeyboardInterrupt:
                pass

        # Only the first ("running") write reached disk; the later
        # "done" / "error" writes raised without taking the worker down.
        task = json.loads((tasks_dir / "t001.json").read_text())
        assert task["status"] == "running"
        assert writes >= 2