bc1d56d1a4
P0.6: gcov infrastructure P1: extract_structure output expansion (11 new feature fields) P2: Confusion group rule engine (8 pairs + contradiction + backtrack) P3: 4-factor confidence calculation + quality gate update P4: 33+2 COBOL program type test samples (22 files, 7 categories) P5: parametrized/ test data generation engine P6: japanese_data.py lookup tables P7-10: Type-specific test suites (~159 parametrized tests) P11: Full classification pipeline (classify_program) + orchestrator integration P12: Documentation (module-interfaces, test-plan v3.0, coverage-matrix) Architecture decisions: - classification_pipeline/ merged to hina/pipeline/ - parametrized/ as independent module - japanese_data.py as root-level file - hina/__all__ only exports classify_program() Co-Authored-By: Claude <noreply@anthropic.com>
266 lines
11 KiB
Python
266 lines
11 KiB
Python
"""LLMClient deep resilience testing — HTTP status codes, cache failures, concurrency, retries."""
|
|
import atexit
import json
import os
import shutil
import sys
import tempfile
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock, patch

import httpx
import pytest
|
|
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
|
|
from agents.llm import LLMClient
|
|
|
|
|
|
def _llm_client(cache_dir=None):
    """Build an ``LLMClient`` for tests, backed by an on-disk cache directory.

    Args:
        cache_dir: Directory handed to ``LLMClient`` as its cache location.
            When ``None``, a fresh temporary directory is created for it.

    Returns:
        An ``LLMClient`` constructed with ``model="test"``.
    """
    if cache_dir is None:
        cache_dir = tempfile.mkdtemp()
        # mkdtemp() never cleans up after itself; remove the throwaway
        # directory at interpreter exit so repeated test runs do not pile up
        # directories in the system temp location.
        atexit.register(shutil.rmtree, cache_dir, ignore_errors=True)
    return LLMClient(model="test", cache_dir=cache_dir)
|
|
|
|
|
|
def _mock_response(content="resp"):
|
|
m = MagicMock()
|
|
m.json.return_value = {"choices": [{"message": {"content": content}}]}
|
|
m.raise_for_status.return_value = None
|
|
return m
|
|
|
|
|
|
def _make_http_error(status_code, message=None):
    """Create the ``httpx.HTTPStatusError`` a failing ``raise_for_status`` can raise.

    The error wraps a POST request to a local chat-completions URL and a
    response with *status_code*. A falsy *message* falls back to
    ``"<status_code> error"``.
    """
    text = message if message else f"{status_code} error"
    req = httpx.Request("POST", "http://localhost/chat/completions")
    resp = httpx.Response(status_code=status_code, request=req)
    return httpx.HTTPStatusError(text, request=req, response=resp)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
# HTTP Status Code Handling
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
|
|
def test_401_unauthorized():
    """401 Unauthorized -> HTTPStatusError propagates carrying status 401."""
    failing = _mock_response()
    failing.raise_for_status.side_effect = _make_http_error(401, "Unauthorized")
    llm = _llm_client()
    post_patch = patch("httpx.post", return_value=failing)
    with post_patch, pytest.raises(httpx.HTTPStatusError) as caught:
        llm.call([{"role": "user", "content": "hi"}], retries=0)
    # Inspect the captured exception only after the raising block has exited.
    assert caught.value.response.status_code == 401
|
|
|
|
|
|
def test_429_rate_limit():
    """429 Rate Limit -> HTTPStatusError still propagates when called with retries=1."""
    failing = _mock_response()
    failing.raise_for_status.side_effect = _make_http_error(429, "Too Many Requests")
    llm = _llm_client()
    post_patch = patch("httpx.post", return_value=failing)
    with post_patch, pytest.raises(httpx.HTTPStatusError) as caught:
        llm.call([{"role": "user", "content": "hi"}], retries=1)
    # Inspect the captured exception only after the raising block has exited.
    assert caught.value.response.status_code == 429
|
|
|
|
|
|
def test_503_service_unavailable():
    """503 Service Unavailable -> HTTPStatusError propagates carrying status 503."""
    failing = _mock_response()
    failing.raise_for_status.side_effect = _make_http_error(503, "Service Unavailable")
    llm = _llm_client()
    post_patch = patch("httpx.post", return_value=failing)
    with post_patch, pytest.raises(httpx.HTTPStatusError) as caught:
        llm.call([{"role": "user", "content": "hi"}], retries=0)
    # Inspect the captured exception only after the raising block has exited.
    assert caught.value.response.status_code == 503
|
|
|
|
|
|
def test_network_timeout():
    """A transport-level httpx.TimeoutException propagates to the caller."""
    llm = _llm_client()
    timeout = httpx.TimeoutException("Connection timed out")
    post_patch = patch("httpx.post", side_effect=timeout)
    with post_patch, pytest.raises(httpx.TimeoutException):
        llm.call([{"role": "user", "content": "hi"}], retries=0)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
# Cache Behaviors
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
|
|
def test_cache_disk_full_falls_through():
    """Cache disk full (_set raises OSError once) -> call() retries and still returns.

    The first ``_set`` raises "No space left on device"; the second attempt
    delegates to the client's real ``_set``.
    """
    with tempfile.TemporaryDirectory() as cache_root:
        llm = _llm_client(cache_root)
        real_set = llm._set
        attempts = 0

        def set_failing_once(k, v):
            nonlocal attempts
            attempts += 1
            if attempts == 1:
                raise OSError("No space left on device")
            real_set(k, v)

        post_patch = patch("httpx.post", return_value=_mock_response("hello"))
        set_patch = patch.object(llm, "_set", side_effect=set_failing_once)
        with post_patch, set_patch:
            answer = llm.call([{"role": "user", "content": "hi"}], retries=1)

        assert answer == "hello"
        # One failed cache write plus one successful write.
        assert attempts == 2
|
|
|
|
|
|
def test_cache_corrupted_file():
    """An unparseable cache .json is treated as a miss and the API is called."""
    with tempfile.TemporaryDirectory() as cache_root:
        llm = _llm_client(cache_root)
        prompt = [{"role": "user", "content": "corrupt-test"}]
        # Plant garbage exactly where this prompt's cache entry would live.
        entry = Path(cache_root, f"{llm._key(prompt)}.json")
        entry.write_text("not valid json{{{")
        with patch("httpx.post", return_value=_mock_response("from-api")) as fake_post:
            answer = llm.call(prompt, retries=0)
        assert answer == "from-api"
        fake_post.assert_called_once()
|
|
|
|
|
|
def test_multiple_cache_files():
    """Distinct prompts are cached in separate files named after their keys."""
    with tempfile.TemporaryDirectory() as cache_root:
        llm = _llm_client(cache_root)
        prompt_a = [{"role": "user", "content": "alpha"}]
        prompt_b = [{"role": "user", "content": "beta"}]
        replies = [_mock_response("resp-a"), _mock_response("resp-b")]
        with patch("httpx.post", side_effect=replies):
            for prompt in (prompt_a, prompt_b):
                llm.call(prompt, retries=0)
        files = list(Path(cache_root).iterdir())
        assert len(files) == 2
        stems = {f.stem for f in files}
        assert {llm._key(prompt_a), llm._key(prompt_b)} <= stems
        # Every cache file must be valid JSON exposing a "response" field.
        for f in files:
            assert "response" in json.loads(f.read_text())
|
|
|
|
|
|
def test_empty_cache_dir_on_init():
    """Init with a missing nested cache dir creates it; re-init over it works.

    The client instances themselves are not needed, so they are not bound to
    names; only the filesystem effect of construction is checked.
    """
    with tempfile.TemporaryDirectory() as tmp:
        cache_sub = Path(tmp) / "nested" / "cache"
        assert not cache_sub.exists()

        # First construction must create the whole nested path.
        LLMClient(model="test", cache_dir=str(cache_sub))
        assert cache_sub.exists()
        assert cache_sub.is_dir()

        # Constructing again over the now-existing directory must not raise
        # and must leave the directory in place.
        LLMClient(model="test", cache_dir=str(cache_sub))
        assert cache_sub.is_dir()
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
# Concurrency
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
|
|
def test_concurrent_same_message():
    """Two threads calling call() with the same message both get the same result."""
    with tempfile.TemporaryDirectory() as cache_root:
        llm = _llm_client(cache_root)
        prompt = [{"role": "user", "content": "concurrent"}]

        counter_guard = threading.Lock()
        api_calls = 0

        def fake_post(*args, **kwargs):
            nonlocal api_calls
            with counter_guard:
                api_calls += 1
            time.sleep(0.05)  # small delay so the two calls overlap
            return _mock_response("shared-result")

        outcomes = [None, None]
        failures = [None, None]
        start_gate = threading.Barrier(2, timeout=5)

        def worker(slot):
            # Exceptions are ferried back to the main thread via `failures`,
            # since an exception raised in a worker would otherwise be lost.
            try:
                start_gate.wait()  # release both threads at the same moment
                outcomes[slot] = llm.call(prompt, retries=0)
            except Exception as exc:
                failures[slot] = exc

        with patch("httpx.post", side_effect=fake_post):
            workers = [threading.Thread(target=worker, args=(slot,)) for slot in (0, 1)]
            for w in workers:
                w.start()
            for w in workers:
                w.join()

        assert failures[0] is None, f"Thread 0 error: {failures[0]}"
        assert failures[1] is None, f"Thread 1 error: {failures[1]}"
        assert outcomes[0] == "shared-result"
        assert outcomes[1] == "shared-result"
        # With the barrier both threads race through the cache lookup before
        # either writes, so both make an API call. Correctness (same result)
        # is the key assertion.
        assert api_calls == 2
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
# Retry Behavior
|
|
# ══════════════════════════════════════════════════════════════════════
|
|
|
|
def test_retry_3_two_fail_then_success():
    """retries=3, first 2 attempts fail, 3rd succeeds -> result comes from the 3rd."""
    # An iterable side_effect raises exception members and returns the rest,
    # one per call, in order.
    scripted = [
        Exception("fail #1"),
        Exception("fail #2"),
        _mock_response("ok-on-3rd"),
    ]
    with tempfile.TemporaryDirectory() as cache_root:
        llm = _llm_client(cache_root)
        with patch("httpx.post", side_effect=scripted) as fake_post:
            answer = llm.call([{"role": "user", "content": "x"}], retries=3)
        assert answer == "ok-on-3rd"
        # Exactly three attempts: two failures, then the success.
        assert fake_post.call_count == 3
|
|
|
|
|
|
def test_retries_0_immediate_failure():
    """retries=0 and the first API call fails -> the exception is raised immediately."""
    llm = _llm_client()
    post_patch = patch("httpx.post", side_effect=ValueError("api exploded"))
    with post_patch, pytest.raises(ValueError, match="api exploded"):
        llm.call([{"role": "user", "content": "x"}], retries=0)
|
|
|
|
|
|
def test_cache_hit_then_eviction_then_retry():
    """Cache hit -> eviction -> cache miss -> first API attempt fails -> retry succeeds."""
    with tempfile.TemporaryDirectory() as cache_root:
        llm = _llm_client(cache_root)
        prompt = [{"role": "user", "content": "evict-and-retry"}]
        entry = Path(cache_root) / f"{llm._key(prompt)}.json"

        # Seed the cache by hand so the first call never reaches the API.
        entry.write_text(json.dumps({"response": "cached"}))
        with patch("httpx.post") as fake_post:
            first = llm.call(prompt, retries=0)
        assert first == "cached"
        fake_post.assert_not_called()

        # Evict the entry so the next call is a cache miss.
        entry.unlink()

        attempts = 0

        def fail_once_then_reply(*args, **kwargs):
            nonlocal attempts
            attempts += 1
            if attempts == 1:
                raise Exception("first fail after eviction")
            return _mock_response("after-eviction-ok")

        with patch("httpx.post", side_effect=fail_once_then_reply):
            second = llm.call(prompt, retries=1)
        assert second == "after-eviction-ok"
|