e90a3a8cf0
BUG: parse_jcl() 文档说文件不存在时返回 None,
但实际抛出了 FileNotFoundError。修复。
新增: test-data/step3_module_test.py — 未测试模块的首次实测
- comparator: API确认 (numeric/date/string 正确)
- jcl: 导入+parse(发现FileNotFoundError bug)
- parametrized: matching(1:1/1:N/N:1) 数据生成
- storage: DiskCache/ReportStore set/get
- quality: L1OffsetValidator/L2RoundtripValidator
- agents: LLMClient 创建确认
验证: 66个COBOL样本全过管道(0崩溃/0无数据)
134 lines
4.8 KiB
Python
134 lines
4.8 KiB
Python
# Module smoke-test script: each numbered section below imports one project
# module, calls it, and prints the outcome.
import sys
import os
import tempfile
import shutil
import glob

# '.' goes first on sys.path, so project packages are resolved relative to the
# current working directory.
sys.path.insert(0, '.')

_RULE = "=" * 70
print(_RULE)
print("【REAL MODULE TESTING】")
print(_RULE)

# No-op placeholder; not referenced by any of the visible sections.
P = lambda: None

# 1. comparator
# Four compare_field probes (two numeric with a tolerance argument, two
# alphanumeric); only the `.status` of each result is printed.
print("\n--- comparator ---")
from comparator import compare_field, align_records

numeric_far = compare_field("100.00", "123.45", "numeric", 0.01)
print(f" numeric(100 vs 123): status={numeric_far.status}")

numeric_near = compare_field("100.00", "100.01", "numeric", 0.02)
print(f" numeric(100 vs 100.01, tol=0.02): status={numeric_near.status}")

alpha_same = compare_field("ABC", "ABC", "alphanumeric")
print(f" alpha(ABC vs ABC): status={alpha_same.status}")

alpha_diff = compare_field("ABC", "XYZ", "alphanumeric")
print(f" alpha(ABC vs XYZ): status={alpha_diff.status}")

# 2. jcl
# Parses a tiny two-card JCL deck from a temp file, then probes the
# missing-file path. The temp file is always removed, even if a probe raises.
print("\n--- jcl ---")
from jcl import parse_jcl

# delete=False keeps the file on disk after the `with` block closes it, so
# parse_jcl can reopen it by name.
with tempfile.NamedTemporaryFile(mode='w', suffix='.jcl', delete=False, encoding='utf-8') as f:
    f.write("//JOB1 JOB (ACCT),'TEST'\n")
    f.write("//STEP1 EXEC PGM=IEFBR14\n")
    fname = f.name

try:
    try:
        r = parse_jcl(fname)
        print(f" parse_jcl: {'None' if r is None else f'OK ({len(r)} jobs)'}")
    except Exception as e:
        print(f" parse_jcl error: {e}")

    # Missing-file probe: a None result is the expected outcome. A raised
    # FileNotFoundError is reported as a failure instead of aborting the
    # sections that follow.
    try:
        r = parse_jcl("/nonexistent/file.jcl")
    except FileNotFoundError as e:
        print(f" nonexistent file: UNEXPECTED ({type(e).__name__})")
    else:
        print(f" nonexistent file: {'None (expected)' if r is None else 'UNEXPECTED'}")
finally:
    # Runs on every path so the temp deck never leaks.
    os.unlink(fname)

# 3. parametrized
# Calls three data generators once each and prints a one-line summary; any
# exception is printed instead of propagating.
print("\n--- parametrized ---")
from parametrized import generate_matching_data, generate_division_data
from parametrized.common import generate_key_break_data, generate_csv_conversion_data


def _report(render, failure_prefix):
    """Print render(); if anything raises, print failure_prefix plus the error."""
    try:
        print(render())
    except Exception as exc:
        print(f"{failure_prefix}{exc}")


_report(
    lambda: f" matching(1:1, 5): {len(generate_matching_data('1:1', 5))} records",
    " matching(1:1): ",
)
_report(
    lambda: f" division(50, 1000): {type(generate_division_data('50', 1000)).__name__}",
    " division: ",
)
_report(
    lambda: f" key_break(5): {len(generate_key_break_data(5))} records",
    " key_break: ",
)

# 4. storage
# Round-trips one value through DiskCache and saves one ReportStore history
# entry, both rooted in the same scratch directory.
print("\n--- storage ---")
from storage import DiskCache, ReportStore

scratch_dir = tempfile.mkdtemp()
try:
    disk_cache = DiskCache(scratch_dir)
    disk_cache.set("k1", {"name": "test", "val": 42})
    cached = disk_cache.get("k1")
    round_trip = 'OK' if cached and cached.get('name') == 'test' else 'FAIL'
    print(f" DiskCache set/get: {round_trip}")

    report_store = ReportStore(scratch_dir)
    report_store.save_history("run1", {"status": "PASS"})
    print(" ReportStore save_history: OK")
finally:
    # Best-effort removal: errors while deleting the scratch dir are ignored.
    shutil.rmtree(scratch_dir, ignore_errors=True)

# 5. preprocessor edge cases
# Feeds preprocess() a small program whose VALUE clause continues onto the
# next line (the line beginning with "-") and reports what comes back.
print("\n--- preprocessor ---")
from cobol_testgen import preprocess

# NOTE(review): the leading whitespace in this literal looks collapsed.
# Fixed-format COBOL normally carries the "-" continuation indicator in
# column 7 — confirm the literal matches what preprocess() expects.
cont_src = " IDENTIFICATION DIVISION.\n PROGRAM-ID. T.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 WS-LONG PIC X(50) VALUE\n- 'HELLO WORLD'.\n PROCEDURE DIVISION.\n DISPLAY WS-LONG.\n STOP RUN.\n"
r = preprocess(cont_src)

# A falsy result (None or "") is reported as FAIL; len() is guarded the same
# way as the HELLO check below so a None result cannot raise TypeError.
print(f" continuation: {'OK' if r else 'FAIL'} ({len(r) if r else 0} chars)")
print(f" contains HELLO: {'HELLO' in r.upper() if r else 'N/A'}")

# 6. quality
# Builds each validator with no arguments and echoes the resulting type name.
# The first exception is printed and ends the section.
print("\n--- quality ---")
from quality import L1OffsetValidator, L2RoundtripValidator

try:
    offset_validator = L1OffsetValidator()
    print(f" L1OffsetValidator: {type(offset_validator).__name__}")
    roundtrip_validator = L2RoundtripValidator()
    print(f" L2RoundtripValidator: {type(roundtrip_validator).__name__}")
except Exception as exc:
    print(f" Error: {exc}")

# 7. agents/llm
# Only the constructor is called; no client method is invoked here.
print("\n--- agents ---")
from agents.llm import LLMClient

try:
    llm = LLMClient(model="test", timeout=1)
    print(f" LLMClient: {type(llm).__name__}")
except Exception as exc:
    print(f" Error: {exc}")

# 8. Source lines count
# Tallies non-blank, non-comment lines per source file and labels each file
# TESTED / UNTESTED using a path-substring heuristic.
print("\n--- 行数统计 ---")

# Path fragments that mark a file as covered by the sections above.
_TESTED_MARKERS = (
    'hina', 'cobol_testgen',
    'comparator', 'jcl',
    'parametrized', 'storage',
    'agents', 'quality',
)
# Fragments that always force UNTESTED, even when a tested marker also matches.
_UNTESTED_MARKERS = ('orchestrator', 'web')


def count_code_lines(path):
    """Return how many lines of *path* are neither blank nor `#` comments.

    Raises:
        OSError: the file cannot be opened.
        UnicodeDecodeError: the file is not valid UTF-8.
    """
    with open(path, encoding='utf-8') as fh:
        stripped = (line.strip() for line in fh)
        return sum(1 for text in stripped if text and not text.startswith('#'))


def is_counted_as_tested(path):
    """Return True when *path* hits a tested marker and no untested marker.

    The same answer drives both the tested-line tally and the printed status,
    so the two can never disagree.
    """
    if any(marker in path for marker in _UNTESTED_MARKERS):
        return False
    return any(marker in path for marker in _TESTED_MARKERS)


all_files = (
    glob.glob("cobol_testgen/*.py")
    + glob.glob("hina/**/*.py", recursive=True)
    + ["orchestrator.py", "jcl/parser.py", "comparator/__init__.py",
       "quality/__init__.py", "web/api.py", "web/worker.py"]
    + glob.glob("parametrized/*.py")
)
total_lines = 0
tested_lines = 0
for path in sorted(all_files):
    try:
        lines = count_code_lines(path)
    except (OSError, UnicodeDecodeError) as exc:
        # Some entries are fixed paths that may be absent; report the skip
        # rather than hiding it, and keep going.
        print(f" {path:<40} {'-':<6} SKIPPED ({type(exc).__name__})")
        continue
    total_lines += lines
    is_tested = is_counted_as_tested(path)
    if is_tested:
        tested_lines += lines
    status = "TESTED" if is_tested else "UNTESTED"
    print(f" {path:<40} {lines:<6} {status}")

# max(total_lines, 1) keeps the percentage defined when nothing was readable.
print(f"\n总计: {total_lines} 行")
print(f"已测试: {tested_lines} 行 ({tested_lines*100//max(total_lines,1)}%)")
print(f"未测试: {total_lines - tested_lines} 行 ({(total_lines-tested_lines)*100//max(total_lines,1)}%)")
print("尤其: orchestrator.py 、web/ 完全未测")