feat: Phase 1 - orchestrator quality gate loop + hina/gate + main CLI args

This commit is contained in:
hangshuo652
2026-06-18 16:02:38 +08:00
parent 097530b036
commit c021dfe01e
3 changed files with 115 additions and 2 deletions
+62
View File
@@ -0,0 +1,62 @@
"""
质量门禁 — 执行前检查测试数据是否满足覆盖率和边界要求。
Phase 1 可用: 决策点覆盖、段落覆盖
Phase 2 启用: HINA 必须项、字段覆盖
"""
def check(
complete_tests: list,
hina_result: dict,
coverage: dict,
decision_threshold: float = 0.90,
paragraph_threshold: float = 1.0,
) -> dict:
"""质量门禁检查。
Args:
complete_tests: 完整的测试数据集
hina_result: HINA 分类结果
coverage: check_coverage() 输出的覆盖率数据
decision_threshold: 决策点覆盖率阈值
paragraph_threshold: 段落覆盖率阈值
Returns:
dict with: passed, score, issues
"""
issues = {}
branch_rate = coverage.get("branch_rate", 0.0)
if branch_rate < decision_threshold:
issues["decision_gaps"] = coverage.get("uncovered_decision_ids", [])
paragraph_rate = coverage.get("paragraph_rate", 0.0)
if paragraph_rate < paragraph_threshold:
issues.setdefault("paragraph_gaps", []).append(
f"段落覆盖率不足: {paragraph_rate:.0%}"
)
if not complete_tests:
issues["no_data"] = True
passed = len(issues) == 0
score = _compute_score(coverage, hina_result)
return {"passed": passed, "score": score, "issues": issues}
def _compute_score(coverage: dict, hina_result: dict) -> float:
"""质量评分公式(COBOL 版)。
评分 = 覆盖质量 × 0.6 + 边界质量 × 0.4
覆盖质量 = 段落覆盖率 × 0.5 + 分支覆盖率 × 0.5
边界质量 = HINA 必须项覆盖率(Phase 2 后启用,默认 1.0)
"""
paragraph_rate = coverage.get("paragraph_rate", 0.0)
branch_rate = coverage.get("branch_rate", 0.0)
coverage_quality = paragraph_rate * 0.5 + branch_rate * 0.5
boundary_quality = 1.0
return round(coverage_quality * 0.6 + boundary_quality * 0.4, 2)
+5
View File
@@ -15,6 +15,9 @@ def main():
p.add_argument("--verbose", action="store_true") p.add_argument("--verbose", action="store_true")
p.add_argument("--dry-run", action="store_true") p.add_argument("--dry-run", action="store_true")
p.add_argument("--output-dir", default="./reports") p.add_argument("--output-dir", default="./reports")
p.add_argument("--quality-gate-mode", choices=["warn", "off"], default="warn",
help="质量门禁模式: warn=记录警告, off=关闭")
p.add_argument("--gcov", action="store_true", help="启用 gcov 覆盖率采集")
args = p.parse_args() args = p.parse_args()
if args.dry_run: if args.dry_run:
@@ -35,6 +38,8 @@ def main():
c.runner_mode = args.runner c.runner_mode = args.runner
c.coverage_default = args.coverage c.coverage_default = args.coverage
c.tolerance = args.tolerance c.tolerance = args.tolerance
c.quality_gate_mode = args.quality_gate_mode
c.gcov_enabled = args.gcov
vr = run_pipeline(c, args.copybook, args.cobol_src, args.java_src, args.mapping) vr = run_pipeline(c, args.copybook, args.cobol_src, args.java_src, args.mapping)
t = vr.fields_matched + vr.fields_mismatched t = vr.fields_matched + vr.fields_mismatched
print(f"{vr.program}: {vr.status} ({vr.fields_matched}/{t}, {vr.duration_s:.0f}s)" if t else f"{vr.program}: {vr.status}") print(f"{vr.program}: {vr.status} ({vr.fields_matched}/{t}, {vr.duration_s:.0f}s)" if t else f"{vr.program}: {vr.status}")
+48 -2
View File
@@ -1,7 +1,7 @@
import shutil, time import shutil, time, logging
from pathlib import Path from pathlib import Path
from data.field_tree import FieldTree from data.field_tree import FieldTree
from data.test_case import TestSuite, SparkConfig from data.test_case import TestSuite, SparkConfig, TestCase
from data.diff_result import VerificationRun, FieldResult from data.diff_result import VerificationRun, FieldResult
from runners.runner import Runner from runners.runner import Runner
from runners.native_java_runner import NativeJavaRunner from runners.native_java_runner import NativeJavaRunner
@@ -18,6 +18,11 @@ from comparator.cobol_binary_reader import CobolBinaryReader
from report.generator import ReportGenerator from report.generator import ReportGenerator
from storage.bundle import TestDataBundle from storage.bundle import TestDataBundle
from config import Config from config import Config
from cobol_testgen import extract_structure, generate_data, incremental_supplement
from cobol_testgen.coverage import check_coverage
from hina.gate import check as gate_check
logger = logging.getLogger(__name__)
def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> VerificationRun: def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> VerificationRun:
@@ -40,6 +45,47 @@ def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) ->
if vr.llm_cost > cfg.max_llm_cost: if vr.llm_cost > cfg.max_llm_cost:
return _done(vr, t0, "BLOCKED", 3) return _done(vr, t0, "BLOCKED", 3)
# ── Phase 1: cobol_testgen 结构提取 + 路径覆盖 + 质量门禁 ──
try:
cobol_src_text = Path(cbl).read_text(encoding="utf-8")
structure = extract_structure(cobol_src_text)
base_records = generate_data(cobol_src_text, structure)
vr.debug["cobol_testgen_records"] = len(base_records)
vr.debug["total_branches"] = structure.get("total_branches", 0)
base_testcases = []
for i, rec in enumerate(base_records):
base_testcases.append(TestCase(id=f"CTG-{i+1:04d}", fields=dict(rec)))
cov = check_coverage(structure, base_records)
for attempt in range(cfg.max_quality_retries):
gate_result = gate_check(
base_testcases, {},
cov,
decision_threshold=cfg.quality_gate_decision_threshold,
paragraph_threshold=cfg.quality_gate_paragraph_threshold,
)
if gate_result.get("passed"):
break
gaps = gate_result.get("issues", {}).get("decision_gaps", [])
if gaps and structure.get("branch_tree_obj"):
delta = incremental_supplement(structure["branch_tree_obj"], gaps)
base_records.extend(delta)
cov = check_coverage(structure, base_records)
else:
break
vr.paragraph_rate = cov.get("paragraph_rate", 0.0)
vr.branch_rate = cov.get("branch_rate", 0.0)
vr.decision_rate = cov.get("decision_rate", 0.0)
if cfg.quality_gate_mode != "off" and not gate_result.get("passed", True):
vr.quality_warn = f"质量门禁未完全通过 (尝试 {attempt+1} 次)"
vr.debug["quality_issues"] = gate_result.get("issues", {})
except Exception as e:
vr.debug["cobol_testgen_error"] = str(e)
logger.warning(f"[orchestrator] cobol_testgen 分析失败: {e}")
suite = Agent2Data(llm).design(tree, cfg.coverage_default, cfg.runner_mode == "spark") suite = Agent2Data(llm).design(tree, cfg.coverage_default, cfg.runner_mode == "spark")
vr.llm_cost += 0.002 vr.llm_cost += 0.002
vr.debug["test_cases"] = [{"id":tc.id,"fields":tc.fields,"targets":tc.coverage_targets} for tc in suite.test_cases] vr.debug["test_cases"] = [{"id":tc.id,"fields":tc.fields,"targets":tc.coverage_targets} for tc in suite.test_cases]