import logging
import shutil
import time
from pathlib import Path

from data.field_tree import FieldTree
from data.test_case import TestSuite, SparkConfig, TestCase
from data.diff_result import VerificationRun, FieldResult
from runners.runner import Runner
from runners import NativeJavaRunner, SparkJavaRunner, CobolRunner, DataWriter
from agents import Agent1Parser, Agent2Data, Agent3Diagnostic, LLMClient
from comparator import align_records, compare_field, CobolBinaryReader
from report import ReportGenerator
from storage import TestDataBundle
from config import Config
from cobol_testgen import extract_structure, generate_data, incremental_supplement, check_coverage
from hina import classify_program, gate_check, supplement as strategy_supplement

logger = logging.getLogger(__name__)

# Flat cost added to ``vr.llm_cost`` for each agent call (Agent1 parse, Agent2 design).
_LLM_CALL_COST = 0.002
# Record key used to align COBOL and Java output records; it is not compared itself.
_KEY_FIELD = "CUST-ID"
# Number of trailing build-log characters kept in ``vr.debug``.
_BUILD_LOG_TAIL = 300
# Field statuses that count against the PASS/MISMATCH verdict.
_FAILING_STATUSES = ("MISMATCH", "NOT_SET")
# Field statuses for which Agent3 is asked for a fix suggestion.
_DIAGNOSED_STATUSES = ("MISMATCH", "NOT_SET", "NPE")


def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> VerificationRun:
    """Verify a Java translation of a COBOL program against the COBOL original.

    Steps: parse the record layout at *cpath* into a FieldTree (Agent1), build
    test data (cobol_testgen + HINA classification + quality gate, falling back
    to Agent2's design when that yields nothing), write the input data, compile
    and run both programs, align output records on ``CUST-ID``, compare them
    field by field, attach Agent3 suggestions, and write JSON/HTML reports.

    Args:
        cfg: Pipeline configuration.
        cpath: Path to the layout text parsed by ``Agent1Parser`` (presumably a
            copybook -- confirm with callers).
        cbl: Path to the COBOL program source.
        java: Path to the Java source; its stem names the run.
        map_path: Currently unused by this function; kept for interface
            compatibility.

    Returns:
        The populated VerificationRun. ``status``/``exit_code`` is one of
        BLOCKED/2 (empty or unparseable layout, build failure, no ``java`` on
        PATH), BLOCKED/3 (LLM cost budget exceeded), ERROR/3 (COBOL run failed
        or an unexpected exception; its message is stored in ``report_path``),
        PASS/0 or MISMATCH/1.
    """
    t0 = time.time()
    vr = VerificationRun(program=Path(java).stem, runner=cfg.runner_mode)
    try:
        text = Path(cpath).read_text()
        if not text.strip():
            return _done(vr, t0, "BLOCKED", 2)

        llm = LLMClient(model=cfg.llm_model, timeout=cfg.llm_timeout, cache_dir=cfg.llm_cache_dir)
        tree = Agent1Parser(llm).parse(text)
        vr.llm_cost += _LLM_CALL_COST
        vr.debug["field_tree"] = [
            {
                "name": f.name,
                "level": f.level,
                "pic": f.pic,
                "usage": f.usage,
                "offset": f.offset,
                "length": f.length,
                "redefines": f.redefines,
            }
            for f in tree.flatten().values()
        ]
        if not tree.fields:
            return _done(vr, t0, "BLOCKED", 2)
        if vr.llm_cost > cfg.max_llm_cost:
            return _done(vr, t0, "BLOCKED", 3)

        spark_mode = cfg.runner_mode == "spark"

        # Phase 1+2: cobol_testgen + HINA agent + strategy agent + quality gate.
        # Never raises; returns whatever cases were built (possibly none).
        enhanced_tests = _generate_enhanced_tests(cfg, cbl, llm, vr)

        suite = Agent2Data(llm).design(tree, cfg.coverage_default, spark_mode)
        vr.llm_cost += _LLM_CALL_COST
        if enhanced_tests:
            # Replace Agent2's cases with the enhanced-pipeline data set.
            suite.test_cases = enhanced_tests
            vr.debug["test_case_source"] = "enhanced"
        else:
            # The enhanced pipeline failed or produced nothing: keep Agent2's
            # design instead of running with an empty (or undefined) data set.
            vr.debug["test_case_source"] = "agent2"
            logger.warning("[orchestrator] enhanced pipeline produced no test cases; using Agent2 design")
        vr.debug["test_cases"] = [
            {"id": tc.id, "fields": tc.fields, "targets": tc.coverage_targets}
            for tc in suite.test_cases
        ]
        vr.debug["spark_config"] = {"records": suite.spark_config.num_records} if suite.has_spark else None

        bundle = TestDataBundle(base_path=Path("test-data-bundle"))
        bundle.ensure_dirs()
        writer = DataWriter()
        writer.write_cobol_binary(suite.test_cases, bundle.cobol_input())
        if spark_mode:
            spark_cfg = suite.spark_config or SparkConfig(num_records=cfg.num_records)
            writer.write_spark_json(suite.test_cases, spark_cfg, bundle.spark_input_dir())
        else:
            writer.write_native_json(suite.test_cases, bundle.native_input())

        cob = CobolRunner()
        build = cob.compile(cbl, cfg.dialect)
        vr.debug["cobol_build"] = {"ok": build.success, "log": build.log[-_BUILD_LOG_TAIL:]}
        if not build.success:
            return _done(vr, t0, "BLOCKED", 2)
        cobol_out = Path("cobol_out.bin")
        if not cob.run(build.artifact_path, str(bundle.cobol_input()), str(cobol_out)).success:
            return _done(vr, t0, "ERROR", 3)

        if not shutil.which("java"):
            return _done(vr, t0, "BLOCKED", 2)
        runner: Runner = SparkJavaRunner(cfg.spark_master) if spark_mode else NativeJavaRunner()
        java_build = runner.compile(java)
        vr.debug["java_build"] = {"ok": java_build.success, "log": java_build.log[-_BUILD_LOG_TAIL:]}
        if not java_build.success:
            return _done(vr, t0, "BLOCKED", 2)
        java_input = str(bundle.spark_input_dir() if spark_mode else bundle.native_input())
        java_result = runner.run(java_build.artifact_path, java_input, "java_out")

        cobol_records = CobolBinaryReader().read(str(cobol_out), tree)
        if len(cobol_records) == 0 and len(java_result.records) == 0:
            return _done(vr, t0, "PASS", 0)

        frs = _compare_records(cobol_records, java_result.records, tree, cfg.tolerance)
        n_failed = sum(1 for fr in frs if fr.status in _FAILING_STATUSES)
        vr.fields_matched = len(frs) - n_failed
        vr.fields_mismatched = n_failed
        vr.field_results = frs
        vr.status = "PASS" if n_failed == 0 else "MISMATCH"
        vr.exit_code = 0 if n_failed == 0 else 1

        _attach_suggestions(llm, frs)
        vr.report_path = str(_write_reports(vr))
    except Exception as e:
        # Top-level boundary: convert any failure into an ERROR result, but log
        # the traceback so it is not lost.
        logger.exception("[orchestrator] pipeline failed: %s", e)
        vr.status = "ERROR"
        vr.exit_code = 3
        vr.report_path = str(e)[:200]
    vr.duration_s = time.time() - t0
    return vr


def _classify_program(cobol_src_text: str, llm: LLMClient, vr: VerificationRun) -> dict:
    """Run HINA type classification and record the results on *vr*; never raises.

    Returns the classification dict. It is ``{}`` if ``classify_program``
    itself raised; if it returned but a later key lookup failed, the returned
    dict is still passed through.
    """
    classification: dict = {}
    try:
        classification = classify_program(cobol_src_text, llm=llm)
        vr.hina_type = classification["category"]
        vr.hina_confidence = classification["confidence"]
        vr.debug["classification"] = classification
        if classification["needs_review"]:
            vr.quality_warn = f"类型判定确信度过低({classification['confidence']:.0%})"
    except Exception as e:
        vr.debug["hina_classify_error"] = str(e)
        logger.warning(f"[orchestrator] HINA 类型判定失败: {e}")
    return classification


def _generate_enhanced_tests(cfg: Config, cbl: str, llm: LLMClient, vr: VerificationRun) -> list:
    """Build TestCases from cobol_testgen path data, strategy records and quality-gate top-ups.

    Records coverage rates, gate warnings and diagnostics on *vr*. Never
    raises: on failure the error goes to ``vr.debug["cobol_testgen_error"]``
    and the cases built so far are returned (possibly an empty list).
    """
    complete_tests: list = []
    try:
        cobol_src_text = Path(cbl).read_text(encoding="utf-8")
        source_dir = str(Path(cbl).parent)
        structure = extract_structure(cobol_src_text, source_dir=source_dir)
        # Path enumeration + base data generation.
        base_records = list(generate_data(cobol_src_text, structure, source_dir=source_dir))
        vr.debug["cobol_testgen_records"] = len(base_records)
        vr.debug["total_branches"] = structure.get("total_branches", 0)
        for idx, rec in enumerate(base_records, start=1):
            complete_tests.append(TestCase(id=f"CTG-{idx:04d}", fields=dict(rec)))

        classification = _classify_program(cobol_src_text, llm, vr)

        # Strategy agent: append its marker records as TestCases. The default
        # id uses the list length at the time each record is appended.
        for extra in strategy_supplement([], classification):
            complete_tests.append(TestCase(
                id=extra.get("id", f"STG-{len(complete_tests) + 1:04d}"),
                fields=extra.get("fields", {}),
                coverage_targets=extra.get("coverage_targets", []),
            ))

        # Quality-gate loop. Initialized up front so a retry budget of 0 does
        # not leave these names unbound; no gate run counts as "passed".
        cov = check_coverage(structure, base_records)
        gate_result: dict = {}
        attempts = 0
        branch_tree = structure.get("branch_tree_obj")
        for attempt in range(cfg.max_quality_retries):
            attempts = attempt + 1
            gate_result = gate_check(
                complete_tests, classification, cov,
                decision_threshold=cfg.quality_gate_decision_threshold,
                paragraph_threshold=cfg.quality_gate_paragraph_threshold,
            )
            if gate_result.get("passed"):
                break
            gaps = gate_result.get("issues", {}).get("decision_gaps", [])
            if not (gaps and branch_tree):
                break  # nothing we can supplement automatically
            # Materialize once: the delta is used for both base_records and TestCases.
            delta = list(incremental_supplement(branch_tree, gaps))
            base_records.extend(delta)
            for idx, rec in enumerate(delta, start=1):
                complete_tests.append(TestCase(id=f"CTG-S{attempts}-{idx:04d}", fields=dict(rec)))
            cov = check_coverage(structure, base_records)

        vr.paragraph_rate = 0.0  # Phase 3 obtains the precise value via gcov
        vr.branch_rate = cov.get("branch_rate", 0.0)
        vr.decision_rate = cov.get("decision_rate", 0.0)
        if cfg.quality_gate_mode != "off" and not gate_result.get("passed", True):
            # NOTE: overwrites any classification warning set above.
            vr.quality_warn = f"质量门禁未完全通过 (尝试 {attempts} 次)"
            vr.debug["quality_issues"] = gate_result.get("issues", {})
    except Exception as e:
        vr.debug["cobol_testgen_error"] = str(e)
        logger.warning(f"[orchestrator] cobol_testgen 分析失败: {e}")
    return complete_tests


def _compare_records(cobol_records, java_records, tree: FieldTree, tolerance) -> list:
    """Align COBOL and Java records on the key field and compare every non-key field.

    Unmatched records yield one FieldResult named ``"unknown"``: NOT_SET when
    the aligner reports MISSING_IN_SPARK, otherwise EXTRA.
    """
    results = []
    for cobol_rec, java_rec, state in align_records(cobol_records, java_records, key_field=_KEY_FIELD):
        if state != "MATCHED":
            results.append(FieldResult(
                field_name="unknown",
                status="NOT_SET" if state == "MISSING_IN_SPARK" else "EXTRA",
            ))
            continue
        for name in cobol_rec:
            if name == _KEY_FIELD:
                continue
            cobol_val = str(cobol_rec.get(name, ""))
            java_val = str(java_rec.get(name, ""))
            # Fields unknown to the tree, and COMP-3 fields, compare as decimal;
            # every other known usage compares as string.
            meta = tree.get_by_name(name)
            field_type = "string" if meta and meta.usage != "COMP-3" else "decimal"
            results.append(compare_field(name, cobol_val, java_val, field_type, tolerance))
    return results


def _attach_suggestions(llm: LLMClient, frs: list) -> None:
    """Best-effort: ask Agent3 for a suggestion on each failing field result."""
    diag = Agent3Diagnostic(llm)
    for fr in frs:
        if fr.status not in _DIAGNOSED_STATUSES:
            continue
        try:
            fr.suggestion = diag.analyze(fr) or ""
        except Exception as exc:  # diagnostics must never fail the run
            logger.warning("[orchestrator] diagnostic failed for %s: %s", fr.field_name, exc)


def _write_reports(vr: VerificationRun) -> Path:
    """Write result/report/machine outputs under reports/<program>/<timestamp>/ and return that dir."""
    report_dir = Path(f"reports/{vr.program}") / vr.timestamp
    report_dir.mkdir(parents=True, exist_ok=True)
    gen = ReportGenerator()
    gen.generate_json(vr, report_dir / "result.json")
    gen.generate_html(vr, report_dir / "report.html")
    gen.generate_machine_json(vr, report_dir / "machine.json")
    return report_dir


def _done(vr: VerificationRun, t0: float, s: str, ec: int) -> VerificationRun:
    """Finish *vr* early with status *s* and exit code *ec*, stamping the time elapsed since *t0*."""
    vr.status = s
    vr.exit_code = ec
    vr.duration_s = time.time() - t0
    return vr