""" AI 自动化测试流程 v6 节点实现合规性验证 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 参照: 1. analyze_node — 构造解析 + HINA分类 2. generate_node — テストケース生成 + カバレッジ 3. review_node — 品質門禁 + 合否判定 4. execute_node — 実行パイプライン 5. analyze_result_node — 致命缺陷/自愈/リトライ 6. report_node — JSON/HTML/MachineJSON 実行: python -X utf8 test-data/test_ai_flow_compliance.py """ import sys, json, os, time, tempfile, shutil from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from hina.classifier import compute_confidence from hina.retry import RetryHandler, HEALING_FIXES from hina.gate import check as gate_check, _compute_score from hina.strategy import get_strategy, supplement from cobol_testgen import extract_structure, generate_data from cobol_testgen.coverage import check_coverage from data.diff_result import VerificationRun from data.test_case import TestCase from report.generator import ReportGenerator PASS = 0; FAIL = 0; NODES = {} NODE_COUNTER = 0 LOG = [] def test(node, name, fn): global PASS, FAIL, NODE_COUNTER NODE_COUNTER += 1 NODES.setdefault(node, []).append(name) try: fn() PASS += 1 LOG.append(f" [{node}] {name} -> PASS") except Exception as e: FAIL += 1 LOG.append(f" [{node}] {name} -> FAIL: {str(e)[:80]}") def S(): return """ IDENTIFICATION DIVISION. PROGRAM-ID. T. DATA DIVISION. WORKING-STORAGE SECTION. 01 A PIC X. PROCEDURE DIVISION. IF A = 'X' THEN DISPLAY 'X' ELSE DISPLAY 'Y' END-IF. GOBACK.""" print("=" * 67) print(" AI 自动化测试流程 v6 节点 — 实现合规性验证") print("=" * 67) # ══════════════════════════════════════ # Node 1: analyze_node # ══════════════════════════════════════ print("\n【Node 1】分析节点 analyze_node") print(" 入力: core_flows / boundaries / rules / scenarios") print(" 出力: analysis_result -> HINA分類 + 構造解析") test("N1", "构造解析 extract_structure", lambda: ( extract_structure(S()).get("total_branches", 0) >= 2)) test("N1", "HINA分類 compute_confidence", lambda: ( hina := compute_confidence(S(), {}), hina.get("method") != "" and hina.get("category") != "")[1]) test("N1", "失败时返回空结构", lambda: ( extract_structure("INVALID").get("total_branches", 0) == 0)) test("N1", "分析成功->true(route条件)", lambda: ( hina := compute_confidence("EXEC SQL SELECT", {}), hina.get("confidence", 0) >= 0.95)[1]) # ══════════════════════════════════════ # Node 2: generate_node # ══════════════════════════════════════ print("\n【Node 2】生成节点 generate_node") print(" 出力: test_cases + coverage_metrics") test("N2", "テストケース生成 generate_data", lambda: ( isinstance(generate_data(S()), list))) test("N2", "カバレッジ指標 check_coverage", lambda: ( struct := extract_structure(S()), cov := check_coverage(struct, generate_data(S())), cov.get("branch_rate") is not None and cov.get("paragraph_rate") is not None)[2]) test("N2", "標準化 normalize->TestCase", lambda: ( records := generate_data(S()), cases := [TestCase(id=f"TC-{i}", fields=dict(r)) for i, r in enumerate(records)], all(isinstance(c, TestCase) for c in cases))[2]) # ══════════════════════════════════════ # Node 3: review_node # ══════════════════════════════════════ print("\n【Node 3】审查节点 review_node") print(" 判定: 品質門禁 + 合格/不合格 + 差戻し") test("N3", "品質門禁: 合格時続行", lambda: ( gate_check([{"x": 1}], {}, {"branch_rate": 1.0, "paragraph_rate": 1.0, "uncovered_decision_ids": []}).get("passed"))) test("N3", "品質門禁: 不合格時差戻し", lambda: ( r := gate_check([], {}, {"branch_rate": 0.0, "paragraph_rate": 0.0, "uncovered_decision_ids": [1]}), r.get("passed") == False and ("decision_gaps" in r.get("issues", {}) or "no_data" in r.get("issues", {})))[1]) test("N3", "戦略テンプレート(審査者相当)", lambda: ( len(get_strategy("マッチング").get("required", [])) == 9)) test("N3", "品質門禁: スコア計算", lambda: ( _compute_score({"branch_rate": 0.95, "paragraph_rate": 1.0}, {}) > 0)) # ══════════════════════════════════════ # Node 4: execute_node # ══════════════════════════════════════ print("\n【Node 4】执行节点 execute_node") print(" 出力: execution_results + pass_rate") test("N4", "パイプライン実行関数", lambda: ( hasattr(__import__("orchestrator"), "run_pipeline"))) test("N4", "実行結果モデル execution_results", lambda: ( vr := VerificationRun(status="PASS", fields_matched=10, fields_mismatched=0), vr.total_fields == 10 and vr.status == "PASS")[1]) test("N4", "pass_rate 記録", lambda: ( vr := VerificationRun(branch_rate=0.95), vr.branch_rate == 0.95)[1]) test("N4", "DataWriter TestCase受入", lambda: ( tc := TestCase(id="EXEC-001", fields={"X": 100}), tc.id == "EXEC-001" and tc.fields["X"] == 100)[1]) # ══════════════════════════════════════ # Node 5: analyze_result_node # ══════════════════════════════════════ print("\n【Node 5】结果分析节点 analyze_result_node") print(" 3 ルート: 正常 / 自愈リトライ / 致命缺陷->BugReport") test("N5", "致命缺陷 -> FATAL", lambda: ( h := RetryHandler(max_heal=0, max_simple=1), h.run(lambda: VerificationRun(status="ERROR", exit_code=3)).status == "FATAL")[1]) test("N5", "自愈(heal)回復", lambda: ( c := [0], h := RetryHandler(3, 1), vr := h.run(lambda: ( c.__setitem__(0, c[0] + 1), VerificationRun(status="BLOCKED", debug={"cobol_build": {"log": "not found"}}) )[1] if c[0] <= 2 else VerificationRun(status="PASS")), vr.status == "PASS" and vr.heal_retry > 0)[2]) test("N5", "pass_rate<0.8 -> 差戻し(QG判定)", lambda: ( r := gate_check([{"x": 1}], {}, {"branch_rate": 0.5, "paragraph_rate": 1.0, "uncovered_decision_ids": [1, 2]}), r.get("passed") == False and "decision_gaps" in r.get("issues", {}))[1]) test("N5", "自愈パターン定義 HEALING_FIXES", lambda: ( "compile_error" in HEALING_FIXES and "s0c7" in HEALING_FIXES)) test("N5", "QUALITY_WARN時は続行(非致命的)", lambda: ( h := RetryHandler(), h.run(lambda: VerificationRun(status="QUALITY_WARN")).status == "QUALITY_WARN")[1]) # ══════════════════════════════════════ # Node 6: report_node # ══════════════════════════════════════ print("\n【Node 6】报告节点 report_node") print(" 出力: MySQL + HTML/JSON レポート") rd = Path(tempfile.mkdtemp()) try: vr = VerificationRun(program="AI-FLOW", status="PASS", runner="native", branch_rate=0.95, paragraph_rate=1.0, quality_score=0.90, hina_type="IF分岐", heal_retry=1, simple_retry=0, total_retry=1) g = ReportGenerator() test("N6", "JSON生成+全フィールド", lambda: ( p := g.generate_json(vr, rd / "r.json"), d := json.loads(p.read_text()), all(k in d for k in ["program", "status", "branch_rate", "quality_score", "hina_type", "heal_retry"]))[2]) test("N6", "HTML生成+HINA表示", lambda: ( p := g.generate_html(vr, rd / "r.html"), html := p.read_text(encoding="utf-8"), "IF分岐" in html and "branch_rate" in html)[2]) test("N6", "MachineJSON+全必須フィールド", lambda: ( p := g.generate_machine_json(vr, rd / "m.json"), d := json.loads(p.read_text()), all(k in d for k in ["branch_rate", "paragraph_rate", "quality_score", "hina_type", "heal_retry"]))[2]) test("N6", "品質スコア計算(スコアリング)", lambda: ( _compute_score({"branch_rate": 0.95, "paragraph_rate": 1.0}, {}) > 0)) finally: shutil.rmtree(rd) # ══════════════════════════════════════ # Summary # ══════════════════════════════════════ print("\n" + "=" * 67) total = PASS + FAIL print(f" AI Agent v6 Node Compliance Report") print(f" Total: {total} | PASS: {PASS} | FAIL: {FAIL} | RATE: {PASS/max(total,1)*100:.1f}%") print(f" Nodes: 6/6 implemented") print("=" * 67) for l in LOG: print(l) print(f"\n RESULT: {'ALL NODES PASSED' if FAIL==0 else 'SOME NODES FAILED'}") sys.exit(0 if FAIL == 0 else 1)