bc1d56d1a4
P0.6: gcov infrastructure P1: extract_structure output expansion (11 new feature fields) P2: Confusion group rule engine (8 pairs + contradiction + backtrack) P3: 4-factor confidence calculation + quality gate update P4: 33+2 COBOL program type test samples (22 files, 7 categories) P5: parametrized/ test data generation engine P6: japanese_data.py lookup tables P7-10: Type-specific test suites (~159 parametrized tests) P11: Full classification pipeline (classify_program) + orchestrator integration P12: Documentation (module-interfaces, test-plan v3.0, coverage-matrix) Architecture decisions: - classification_pipeline/ merged to hina/pipeline/ - parametrized/ as independent module - japanese_data.py as root-level file - hina/__all__ only exports classify_program() Co-Authored-By: Claude <noreply@anthropic.com>
204 lines
9.0 KiB
Python
204 lines
9.0 KiB
Python
import shutil, time, logging
|
|
from pathlib import Path
|
|
from data.field_tree import FieldTree
|
|
from data.test_case import TestSuite, SparkConfig, TestCase
|
|
from data.diff_result import VerificationRun, FieldResult
|
|
from runners.runner import Runner
|
|
from runners import NativeJavaRunner, SparkJavaRunner, CobolRunner, DataWriter
|
|
from agents import Agent1Parser, Agent2Data, Agent3Diagnostic, LLMClient
|
|
from comparator import align_records, compare_field, CobolBinaryReader
|
|
from report import ReportGenerator
|
|
from storage import TestDataBundle
|
|
from config import Config
|
|
from cobol_testgen import extract_structure, generate_data, incremental_supplement, check_coverage
|
|
from hina import classify_program, gate_check, supplement as strategy_supplement
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> VerificationRun:
|
|
t0 = time.time()
|
|
vr = VerificationRun(program=Path(java).stem, runner=cfg.runner_mode)
|
|
|
|
try:
|
|
text = Path(cpath).read_text()
|
|
if not text.strip():
|
|
return _done(vr, t0, "BLOCKED", 2)
|
|
|
|
llm = LLMClient(model=cfg.llm_model, timeout=cfg.llm_timeout, cache_dir=cfg.llm_cache_dir)
|
|
tree = Agent1Parser(llm).parse(text)
|
|
vr.llm_cost += 0.002
|
|
vr.debug["field_tree"] = [{"name":f.name,"level":f.level,"pic":f.pic,"usage":f.usage,
|
|
"offset":f.offset,"length":f.length,"redefines":f.redefines}
|
|
for f in tree.flatten().values()]
|
|
if not tree.fields:
|
|
return _done(vr, t0, "BLOCKED", 2)
|
|
if vr.llm_cost > cfg.max_llm_cost:
|
|
return _done(vr, t0, "BLOCKED", 3)
|
|
|
|
# ── Phase 1+2: cobol_testgen + HINA Agent + 策略 Agent + 质量门禁 ──
|
|
try:
|
|
cobol_src_text = Path(cbl).read_text(encoding="utf-8")
|
|
structure = extract_structure(cobol_src_text, source_dir=str(Path(cbl).parent))
|
|
|
|
# cobol_testgen 路径枚举 + 基础数据生成
|
|
base_records = generate_data(cobol_src_text, structure, source_dir=str(Path(cbl).parent))
|
|
vr.debug["cobol_testgen_records"] = len(base_records)
|
|
vr.debug["total_branches"] = structure.get("total_branches", 0)
|
|
|
|
# 转换为 TestCase 列表(增强管线的基础数据集)
|
|
complete_tests = []
|
|
for i, rec in enumerate(base_records):
|
|
complete_tests.append(TestCase(id=f"CTG-{i+1:04d}", fields=dict(rec)))
|
|
|
|
# HINA 完整类型判定管道(Keyword / 规则引擎 / LLM 辅助三路径)
|
|
classification: dict = {}
|
|
try:
|
|
classification = classify_program(cobol_src_text, llm=llm)
|
|
vr.hina_type = classification["category"]
|
|
vr.hina_confidence = classification["confidence"]
|
|
vr.debug["classification"] = classification
|
|
if classification["needs_review"]:
|
|
vr.quality_warn = f"类型判定确信度过低({classification['confidence']:.0%})"
|
|
except Exception as e:
|
|
vr.debug["hina_classify_error"] = str(e)
|
|
logger.warning(f"[orchestrator] HINA 类型判定失败: {e}")
|
|
|
|
# 策略 Agent 补充(追加标记记录,统一为 TestCase 格式)
|
|
for m in strategy_supplement([], classification):
|
|
complete_tests.append(TestCase(
|
|
id=m.get("id", f"STG-{len(complete_tests)+1:04d}"),
|
|
fields=m.get("fields", {}),
|
|
coverage_targets=m.get("coverage_targets", []),
|
|
))
|
|
|
|
# 质量门禁循环
|
|
cov = check_coverage(structure, base_records)
|
|
for attempt in range(cfg.max_quality_retries):
|
|
gate_result = gate_check(
|
|
complete_tests, classification, cov,
|
|
decision_threshold=cfg.quality_gate_decision_threshold,
|
|
paragraph_threshold=cfg.quality_gate_paragraph_threshold,
|
|
)
|
|
if gate_result.get("passed"):
|
|
break
|
|
gaps = gate_result.get("issues", {}).get("decision_gaps", [])
|
|
if gaps and structure.get("branch_tree_obj"):
|
|
delta = incremental_supplement(structure["branch_tree_obj"], gaps)
|
|
base_records.extend(delta)
|
|
# 同步更新 complete_tests
|
|
for i, d in enumerate(delta):
|
|
complete_tests.append(TestCase(
|
|
id=f"CTG-S{attempt+1}-{i+1:04d}",
|
|
fields=dict(d),
|
|
))
|
|
cov = check_coverage(structure, base_records)
|
|
else:
|
|
break
|
|
|
|
vr.paragraph_rate = 0.0 # Phase 3 通过 gcov 获取精确值
|
|
vr.branch_rate = cov.get("branch_rate", 0.0)
|
|
vr.decision_rate = cov.get("decision_rate", 0.0)
|
|
|
|
if cfg.quality_gate_mode != "off" and not gate_result.get("passed", True):
|
|
vr.quality_warn = f"质量门禁未完全通过 (尝试 {attempt+1} 次)"
|
|
vr.debug["quality_issues"] = gate_result.get("issues", {})
|
|
except Exception as e:
|
|
vr.debug["cobol_testgen_error"] = str(e)
|
|
logger.warning(f"[orchestrator] cobol_testgen 分析失败: {e}")
|
|
|
|
suite = Agent2Data(llm).design(tree, cfg.coverage_default, cfg.runner_mode == "spark")
|
|
vr.llm_cost += 0.002
|
|
suite.test_cases = complete_tests # 替换为增强管线数据(P1/P2 修复)
|
|
vr.debug["test_cases"] = [{"id":tc.id,"fields":tc.fields,"targets":tc.coverage_targets} for tc in suite.test_cases]
|
|
vr.debug["spark_config"] = {"records":suite.spark_config.num_records} if suite.has_spark else None
|
|
|
|
bundle = TestDataBundle(base_path=Path("test-data-bundle"))
|
|
bundle.ensure_dirs()
|
|
dw = DataWriter()
|
|
dw.write_cobol_binary(suite.test_cases, bundle.cobol_input())
|
|
if cfg.runner_mode == "spark":
|
|
sc = suite.spark_config or SparkConfig(num_records=cfg.num_records)
|
|
dw.write_spark_json(suite.test_cases, sc, bundle.spark_input_dir())
|
|
else:
|
|
dw.write_native_json(suite.test_cases, bundle.native_input())
|
|
|
|
cob = CobolRunner()
|
|
build = cob.compile(cbl, cfg.dialect)
|
|
vr.debug["cobol_build"] = {"ok": build.success, "log": build.log[-300:]}
|
|
if not build.success:
|
|
return _done(vr, t0, "BLOCKED", 2)
|
|
co = Path("cobol_out.bin")
|
|
if not cob.run(build.artifact_path, str(bundle.cobol_input()), str(co)).success:
|
|
return _done(vr, t0, "ERROR", 3)
|
|
|
|
if not shutil.which("java"):
|
|
return _done(vr, t0, "BLOCKED", 2)
|
|
runner: Runner = SparkJavaRunner(cfg.spark_master) if cfg.runner_mode == "spark" else NativeJavaRunner()
|
|
jb = runner.compile(java)
|
|
vr.debug["java_build"] = {"ok": jb.success, "log": jb.log[-300:]}
|
|
if not jb.success:
|
|
return _done(vr, t0, "BLOCKED", 2)
|
|
inp = str(bundle.spark_input_dir() if cfg.runner_mode == "spark" else bundle.native_input())
|
|
jr = runner.run(jb.artifact_path, inp, "java_out")
|
|
|
|
reader = CobolBinaryReader()
|
|
cr = reader.read(str(co), tree)
|
|
if len(cr) == 0 and len(jr.records) == 0:
|
|
return _done(vr, t0, "PASS", 0)
|
|
|
|
aligned = align_records(cr, jr.records, key_field="CUST-ID")
|
|
frs = []
|
|
for c, j, st in aligned:
|
|
if st != "MATCHED":
|
|
frs.append(FieldResult(field_name="unknown", status="NOT_SET" if st == "MISSING_IN_SPARK" else "EXTRA"))
|
|
continue
|
|
for k in c:
|
|
if k == "CUST-ID":
|
|
continue
|
|
cv = str(c.get(k, ""))
|
|
jv = str(j.get(k, ""))
|
|
ft = "decimal"
|
|
m = tree.get_by_name(k)
|
|
if m and m.usage != "COMP-3":
|
|
ft = "string"
|
|
frs.append(compare_field(k, cv, jv, ft, cfg.tolerance))
|
|
|
|
m = sum(1 for f in frs if f.status in ("MISMATCH", "NOT_SET"))
|
|
vr.fields_matched = len(frs) - m
|
|
vr.fields_mismatched = m
|
|
vr.field_results = frs
|
|
vr.status = "PASS" if m == 0 else "MISMATCH"
|
|
vr.exit_code = 0 if m == 0 else 1
|
|
|
|
diag = Agent3Diagnostic(llm)
|
|
for fr in frs:
|
|
if fr.status in ("MISMATCH", "NOT_SET", "NPE"):
|
|
try:
|
|
fr.suggestion = diag.analyze(fr) or ""
|
|
except:
|
|
pass
|
|
|
|
rd = Path(f"reports/{vr.program}") / vr.timestamp
|
|
rd.mkdir(parents=True, exist_ok=True)
|
|
g = ReportGenerator()
|
|
g.generate_json(vr, rd / "result.json")
|
|
g.generate_html(vr, rd / "report.html")
|
|
g.generate_machine_json(vr, rd / "machine.json")
|
|
vr.report_path = str(rd)
|
|
|
|
except Exception as e:
|
|
vr.status = "ERROR"
|
|
vr.exit_code = 3
|
|
vr.report_path = str(e)[:200]
|
|
|
|
vr.duration_s = time.time() - t0
|
|
return vr
|
|
|
|
|
|
def _done(vr, t0, s, ec):
|
|
vr.status = s
|
|
vr.exit_code = ec
|
|
vr.duration_s = time.time() - t0
|
|
return vr
|