Files
cobol-java-v3/tests/agents/test_llm_deep.py
T
hangshuo652 bc1d56d1a4 feat: Phase 2 complete — 13 Phases of COBOL type classification and test benchmark
P0.6: gcov infrastructure
P1: extract_structure output expansion (11 new feature fields)
P2: Confusion group rule engine (8 pairs + contradiction + backtrack)
P3: 4-factor confidence calculation + quality gate update
P4: 33+2 COBOL program type test samples (22 files, 7 categories)
P5: parametrized/ test data generation engine
P6: japanese_data.py lookup tables
P7-10: Type-specific test suites (~159 parametrized tests)
P11: Full classification pipeline (classify_program) + orchestrator integration
P12: Documentation (module-interfaces, test-plan v3.0, coverage-matrix)

Architecture decisions:
- classification_pipeline/ merged to hina/pipeline/
- parametrized/ as independent module
- japanese_data.py as root-level file
- hina/__all__ only exports classify_program()

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-19 23:51:55 +08:00

266 lines
11 KiB
Python

"""LLMClient deep resilience testing — HTTP status codes, cache failures, concurrency, retries."""
import sys, os, json, time, threading, tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import httpx
import pytest
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from agents.llm import LLMClient
def _llm_client(cache_dir=None):
    """Build an ``LLMClient`` configured for tests.

    Args:
        cache_dir: Directory passed to ``LLMClient`` as its cache location.
            When ``None``, a fresh temporary directory is created and is
            scheduled for removal when the interpreter exits.

    Returns:
        An ``LLMClient`` constructed with ``model="test"``.
    """
    if cache_dir is None:
        # Function-scope imports keep this helper self-contained.
        import atexit
        import shutil

        cache_dir = tempfile.mkdtemp()
        # mkdtemp() never deletes what it creates; without this hook every
        # call made without an explicit cache_dir leaks a directory.
        atexit.register(shutil.rmtree, cache_dir, ignore_errors=True)
    return LLMClient(model="test", cache_dir=cache_dir)
def _mock_response(content="resp"):
m = MagicMock()
m.json.return_value = {"choices": [{"message": {"content": content}}]}
m.raise_for_status.return_value = None
return m
def _make_http_error(status_code, message=None):
    """Create an ``httpx.HTTPStatusError`` carrying *status_code*.

    Args:
        status_code: HTTP status set on the attached response.
        message: Error text; a falsy value falls back to
            ``"<status_code> error"``.

    Returns:
        The exception instance (not raised), suitable for use as a mock
        ``side_effect``.
    """
    text = message if message else f"{status_code} error"
    req = httpx.Request("POST", "http://localhost/chat/completions")
    resp = httpx.Response(status_code=status_code, request=req)
    return httpx.HTTPStatusError(text, request=req, response=resp)
# ══════════════════════════════════════════════════════════════════════
# HTTP Status Code Handling
# ══════════════════════════════════════════════════════════════════════
def test_401_unauthorized():
    """A 401 response surfaces as HTTPStatusError carrying status 401."""
    failing = _mock_response()
    failing.raise_for_status.side_effect = _make_http_error(401, "Unauthorized")
    client = _llm_client()
    prompt = [{"role": "user", "content": "hi"}]

    with patch("httpx.post", return_value=failing), pytest.raises(httpx.HTTPStatusError) as raised:
        client.call(prompt, retries=0)

    assert raised.value.response.status_code == 401
def test_429_rate_limit():
    """A persistent 429 still surfaces as HTTPStatusError when retries=1."""
    failing = _mock_response()
    failing.raise_for_status.side_effect = _make_http_error(429, "Too Many Requests")
    client = _llm_client()
    prompt = [{"role": "user", "content": "hi"}]

    # Every attempt gets the same failing response, so the error must escape.
    with patch("httpx.post", return_value=failing), pytest.raises(httpx.HTTPStatusError) as raised:
        client.call(prompt, retries=1)

    assert raised.value.response.status_code == 429
def test_503_service_unavailable():
    """A 503 response surfaces as HTTPStatusError carrying status 503."""
    failing = _mock_response()
    failing.raise_for_status.side_effect = _make_http_error(503, "Service Unavailable")
    client = _llm_client()
    prompt = [{"role": "user", "content": "hi"}]

    with patch("httpx.post", return_value=failing), pytest.raises(httpx.HTTPStatusError) as raised:
        client.call(prompt, retries=0)

    assert raised.value.response.status_code == 503
def test_network_timeout():
    """A timeout raised by the patched HTTP call propagates out of call()."""
    timeout_exc = httpx.TimeoutException("Connection timed out")
    client = _llm_client()
    prompt = [{"role": "user", "content": "hi"}]

    with patch("httpx.post", side_effect=timeout_exc), pytest.raises(httpx.TimeoutException):
        client.call(prompt, retries=0)
# ══════════════════════════════════════════════════════════════════════
# Cache Behaviors
# ══════════════════════════════════════════════════════════════════════
def test_cache_disk_full_falls_through():
    """First cache write raises OSError; call() with retries=1 still returns the value."""
    with tempfile.TemporaryDirectory() as cache_root:
        client = _llm_client(cache_root)
        real_set = client._set
        writes = 0

        def set_failing_once(k, v):
            # Simulate a full disk on the first write only, then delegate
            # to the genuine cache writer.
            nonlocal writes
            writes += 1
            if writes == 1:
                raise OSError("No space left on device")
            real_set(k, v)

        prompt = [{"role": "user", "content": "hi"}]
        with patch("httpx.post", return_value=_mock_response("hello")):
            with patch.object(client, "_set", side_effect=set_failing_once):
                outcome = client.call(prompt, retries=1)

        assert outcome == "hello"
        # One failed write followed by one successful write.
        assert writes == 2
def test_cache_corrupted_file():
    """An unparseable cache file is expected to act as a miss, so the API is called once."""
    with tempfile.TemporaryDirectory() as cache_root:
        client = _llm_client(cache_root)
        prompt = [{"role": "user", "content": "corrupt-test"}]

        # Plant garbage at <cache_root>/<key>.json, the location this test
        # expects the client to read its cache entry from.
        corrupted = Path(cache_root) / f"{client._key(prompt)}.json"
        corrupted.write_text("not valid json{{{")

        with patch("httpx.post", return_value=_mock_response("from-api")) as post:
            answer = client.call(prompt, retries=0)

        assert answer == "from-api"
        post.assert_called_once()
def test_multiple_cache_files():
    """Two distinct prompts yield two cache files, named by key, each holding a "response"."""
    with tempfile.TemporaryDirectory() as cache_root:
        client = _llm_client(cache_root)
        first = [{"role": "user", "content": "alpha"}]
        second = [{"role": "user", "content": "beta"}]
        replies = [_mock_response("resp-a"), _mock_response("resp-b")]

        with patch("httpx.post", side_effect=replies):
            for prompt in (first, second):
                client.call(prompt, retries=0)

        entries = list(Path(cache_root).iterdir())
        assert len(entries) == 2

        stems = {entry.stem for entry in entries}
        assert client._key(first) in stems
        assert client._key(second) in stems

        # Every cache file must parse as JSON and expose a "response" key.
        for entry in entries:
            parsed = json.loads(entry.read_text())
            assert "response" in parsed
def test_empty_cache_dir_on_init():
    """Constructing LLMClient creates a missing nested cache dir and tolerates an existing one."""
    with tempfile.TemporaryDirectory() as tmp:
        cache_sub = Path(tmp) / "nested" / "cache"
        assert not cache_sub.exists()

        # Only the constructor's side effect is under test, so the
        # instances are deliberately not bound to (unused) local names.
        LLMClient(model="test", cache_dir=str(cache_sub))
        assert cache_sub.is_dir()

        # A second construction over the now-existing directory must not
        # raise and must leave the directory in place.
        LLMClient(model="test", cache_dir=str(cache_sub))
        assert cache_sub.is_dir()
# ══════════════════════════════════════════════════════════════════════
# Concurrency
# ══════════════════════════════════════════════════════════════════════
def test_concurrent_same_message():
    """Two threads calling call() with the same messages both get the same result.

    The worker threads are daemonic and joined with a timeout, so a hung
    ``call()`` fails this test instead of blocking the whole test run.
    """
    with tempfile.TemporaryDirectory() as tmp:
        client = _llm_client(tmp)
        messages = [{"role": "user", "content": "concurrent"}]
        call_count_lock = threading.Lock()
        api_call_count = [0]

        def api_side(*a, **kw):
            with call_count_lock:
                api_call_count[0] += 1
            time.sleep(0.05)  # small delay so the two calls overlap
            return _mock_response("shared-result")

        results = [None, None]
        errors = [None, None]
        barrier = threading.Barrier(2, timeout=5)

        def _call(idx):
            try:
                barrier.wait()  # release both threads at the same moment
                results[idx] = client.call(messages, retries=0)
            except Exception as e:
                # Captured here so the failure is reported by the main thread.
                errors[idx] = e

        with patch("httpx.post", side_effect=api_side):
            workers = [
                threading.Thread(target=_call, args=(idx,), daemon=True)
                for idx in range(2)
            ]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join(timeout=10)
            # join(timeout=...) returns silently on expiry, so check liveness.
            assert not any(w.is_alive() for w in workers), "worker thread did not finish"

        assert errors[0] is None, f"Thread 0 error: {errors[0]}"
        assert errors[1] is None, f"Thread 1 error: {errors[1]}"
        assert results[0] == "shared-result"
        assert results[1] == "shared-result"
        # Whether the second thread sees a cache miss depends on scheduling:
        # usually both threads miss and hit the API, but one may finish and
        # populate the cache first. Result equality is the key assertion, so
        # accept either count rather than pinning a timing-dependent value.
        assert 1 <= api_call_count[0] <= 2
# ══════════════════════════════════════════════════════════════════════
# Retry Behavior
# ══════════════════════════════════════════════════════════════════════
def test_retry_3_two_fail_then_success():
    """With retries=3, two failing attempts followed by a success return the third result."""
    with tempfile.TemporaryDirectory() as cache_root:
        client = _llm_client(cache_root)
        attempts = 0

        def fail_twice_then_ok(*args, **kwargs):
            nonlocal attempts
            attempts += 1
            if attempts > 2:
                return _mock_response("ok-on-3rd")
            raise Exception(f"fail #{attempts}")

        with patch("httpx.post", side_effect=fail_twice_then_ok):
            outcome = client.call([{"role": "user", "content": "x"}], retries=3)

        assert outcome == "ok-on-3rd"
        # No further attempts are made once one succeeds.
        assert attempts == 3
def test_retries_0_immediate_failure():
    """With retries=0, an error from the HTTP call propagates unchanged."""
    boom = ValueError("api exploded")
    client = _llm_client()
    prompt = [{"role": "user", "content": "x"}]

    with patch("httpx.post", side_effect=boom), pytest.raises(ValueError, match="api exploded"):
        client.call(prompt, retries=0)
def test_cache_hit_then_eviction_then_retry():
    """Cached answer is served; after the file is removed, a failed call is retried successfully."""
    with tempfile.TemporaryDirectory() as cache_root:
        client = _llm_client(cache_root)
        prompt = [{"role": "user", "content": "evict-and-retry"}]
        entry = Path(cache_root) / f"{client._key(prompt)}.json"

        # Phase 1: seed the cache file by hand and confirm it is served
        # without touching the HTTP layer.
        entry.write_text(json.dumps({"response": "cached"}))
        with patch("httpx.post") as post:
            cached_answer = client.call(prompt, retries=0)
        assert cached_answer == "cached"
        post.assert_not_called()

        # Phase 2: delete the entry so the next call has to go to the API.
        entry.unlink()
        attempts = 0

        def fail_first_only(*args, **kwargs):
            nonlocal attempts
            attempts += 1
            if attempts == 1:
                raise Exception("first fail after eviction")
            return _mock_response("after-eviction-ok")

        with patch("httpx.post", side_effect=fail_first_only):
            fresh_answer = client.call(prompt, retries=1)
        assert fresh_answer == "after-eviction-ok"