From c021dfe01e19852d8bff04627538e84bea48b2dd Mon Sep 17 00:00:00 2001 From: hangshuo652 Date: Thu, 18 Jun 2026 16:02:38 +0800 Subject: [PATCH] feat: Phase 1 - orchestrator quality gate loop + hina/gate + main CLI args --- hina/gate.py | 62 +++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 5 ++++ orchestrator.py | 50 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 hina/gate.py diff --git a/hina/gate.py b/hina/gate.py new file mode 100644 index 0000000..a230fcb --- /dev/null +++ b/hina/gate.py @@ -0,0 +1,62 @@ +""" +质量门禁 — 执行前检查测试数据是否满足覆盖率和边界要求。 + +Phase 1 可用: 决策点覆盖、段落覆盖 +Phase 2 启用: HINA 必须项、字段覆盖 +""" + + +def check( + complete_tests: list, + hina_result: dict, + coverage: dict, + decision_threshold: float = 0.90, + paragraph_threshold: float = 1.0, +) -> dict: + """质量门禁检查。 + + Args: + complete_tests: 完整的测试数据集 + hina_result: HINA 分类结果 + coverage: check_coverage() 输出的覆盖率数据 + decision_threshold: 决策点覆盖率阈值 + paragraph_threshold: 段落覆盖率阈值 + + Returns: + dict with: passed, score, issues + """ + issues = {} + + branch_rate = coverage.get("branch_rate", 0.0) + if branch_rate < decision_threshold: + issues["decision_gaps"] = coverage.get("uncovered_decision_ids", []) + + paragraph_rate = coverage.get("paragraph_rate", 0.0) + if paragraph_rate < paragraph_threshold: + issues.setdefault("paragraph_gaps", []).append( + f"段落覆盖率不足: {paragraph_rate:.0%}" + ) + + if not complete_tests: + issues["no_data"] = True + + passed = len(issues) == 0 + score = _compute_score(coverage, hina_result) + + return {"passed": passed, "score": score, "issues": issues} + + +def _compute_score(coverage: dict, hina_result: dict) -> float: + """质量评分公式(COBOL 版)。 + + 评分 = 覆盖质量 × 0.6 + 边界质量 × 0.4 + 覆盖质量 = 段落覆盖率 × 0.5 + 分支覆盖率 × 0.5 + 边界质量 = HINA 必须项覆盖率(Phase 2 后启用,默认 1.0) + """ + paragraph_rate = coverage.get("paragraph_rate", 0.0) + branch_rate = coverage.get("branch_rate", 0.0) + + coverage_quality = paragraph_rate * 0.5 + branch_rate * 0.5 + boundary_quality = 1.0 + + return round(coverage_quality * 0.6 + boundary_quality * 0.4, 2) diff --git a/main.py b/main.py index 5c7fe07..5487c99 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,9 @@ def main(): p.add_argument("--verbose", action="store_true") p.add_argument("--dry-run", action="store_true") p.add_argument("--output-dir", default="./reports") + p.add_argument("--quality-gate-mode", choices=["warn", "off"], default="warn", + help="质量门禁模式: warn=记录警告, off=关闭") + p.add_argument("--gcov", action="store_true", help="启用 gcov 覆盖率采集") args = p.parse_args() if args.dry_run: @@ -35,6 +38,8 @@ def main(): c.runner_mode = args.runner c.coverage_default = args.coverage c.tolerance = args.tolerance + c.quality_gate_mode = args.quality_gate_mode + c.gcov_enabled = args.gcov vr = run_pipeline(c, args.copybook, args.cobol_src, args.java_src, args.mapping) t = vr.fields_matched + vr.fields_mismatched print(f"{vr.program}: {vr.status} ({vr.fields_matched}/{t}, {vr.duration_s:.0f}s)" if t else f"{vr.program}: {vr.status}") diff --git a/orchestrator.py b/orchestrator.py index c0201dd..6e93708 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -1,7 +1,7 @@ -import shutil, time +import shutil, time, logging from pathlib import Path from data.field_tree import FieldTree -from data.test_case import TestSuite, SparkConfig +from data.test_case import TestSuite, SparkConfig, TestCase from data.diff_result import VerificationRun, FieldResult from runners.runner import Runner from runners.native_java_runner import NativeJavaRunner @@ -18,6 +18,11 @@ from comparator.cobol_binary_reader import CobolBinaryReader from report.generator import ReportGenerator from storage.bundle import TestDataBundle from config import Config +from cobol_testgen import extract_structure, generate_data, incremental_supplement +from cobol_testgen.coverage import check_coverage +from hina.gate import check as gate_check + +logger = logging.getLogger(__name__) def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> VerificationRun: @@ -40,6 +45,47 @@ def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> if vr.llm_cost > cfg.max_llm_cost: return _done(vr, t0, "BLOCKED", 3) + # ── Phase 1: cobol_testgen 结构提取 + 路径覆盖 + 质量门禁 ── + try: + cobol_src_text = Path(cbl).read_text(encoding="utf-8") + structure = extract_structure(cobol_src_text) + base_records = generate_data(cobol_src_text, structure) + vr.debug["cobol_testgen_records"] = len(base_records) + vr.debug["total_branches"] = structure.get("total_branches", 0) + + base_testcases = [] + for i, rec in enumerate(base_records): + base_testcases.append(TestCase(id=f"CTG-{i+1:04d}", fields=dict(rec))) + + cov = check_coverage(structure, base_records) + for attempt in range(cfg.max_quality_retries): + gate_result = gate_check( + base_testcases, {}, + cov, + decision_threshold=cfg.quality_gate_decision_threshold, + paragraph_threshold=cfg.quality_gate_paragraph_threshold, + ) + if gate_result.get("passed"): + break + gaps = gate_result.get("issues", {}).get("decision_gaps", []) + if gaps and structure.get("branch_tree_obj"): + delta = incremental_supplement(structure["branch_tree_obj"], gaps) + base_records.extend(delta) + cov = check_coverage(structure, base_records) + else: + break + + vr.paragraph_rate = cov.get("paragraph_rate", 0.0) + vr.branch_rate = cov.get("branch_rate", 0.0) + vr.decision_rate = cov.get("decision_rate", 0.0) + + if cfg.quality_gate_mode != "off" and not gate_result.get("passed", True): + vr.quality_warn = f"质量门禁未完全通过 (尝试 {attempt+1} 次)" + vr.debug["quality_issues"] = gate_result.get("issues", {}) + except Exception as e: + vr.debug["cobol_testgen_error"] = str(e) + logger.warning(f"[orchestrator] cobol_testgen 分析失败: {e}") + suite = Agent2Data(llm).design(tree, cfg.coverage_default, cfg.runner_mode == "spark") vr.llm_cost += 0.002 vr.debug["test_cases"] = [{"id":tc.id,"fields":tc.fields,"targets":tc.coverage_targets} for tc in suite.test_cases]