"""
Coverage-constraint tests: every test records the line numbers it executes.

Failure condition: test blocks whose coverage falls short are flagged.
"""
import ast
import collections
import glob
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

PASS = 0
FAIL = 0
# Executed line numbers, keyed by the normalised path from _path_key().
COVERED_LINES = collections.defaultdict(set)
TOTAL_EXEC_LINES = {}
TOTAL_BRANCHES = {}

# AST node types whose starting line counts as an "executable line".
_EXECUTABLE_NODES = (
    ast.If, ast.Return, ast.Raise, ast.Try, ast.For, ast.While,
    ast.Assign, ast.AugAssign, ast.Expr, ast.FunctionDef, ast.With,
    ast.Assert,
)

# A file is traced when its path contains any of these substrings.
_TRACED_PATH_TAGS = (
    'hina', 'cobol_testgen', 'comparator',
    'parametrized', 'jcl', 'orchestrator',
    'quality', 'storage', 'config',
    'japanese_data', 'coverage', 'report',
    'runners', 'agents', 'data',
)


def _path_key(path):
    """Return *path* as a normalised path relative to the current directory.

    Both the scanner and the tracer key their results with this function.
    Without it the scanner would use the glob's relative spelling (possibly
    with a leading "./") while the tracer would use ``co_filename`` as-is,
    and the two dictionaries could never be joined.
    """
    try:
        relative = os.path.relpath(path)
    except ValueError:
        # Raised for an empty path, or on Windows when *path* lives on a
        # different drive than the current directory; keep the raw value.
        relative = path
    return os.path.normcase(os.path.normpath(relative))


# ── Tool: scan every executable line ──
def scan_executable_lines(module_dir):
    """Collect executable and branch line numbers for the files in a directory.

    Args:
        module_dir: Directory searched recursively for ``*.py`` files.

    Returns:
        ``{path_key: (executable_lines, if_lines)}`` where both values are
        sets of line numbers and ``path_key`` comes from ``_path_key``.
        Files whose base name contains "test", files under ``__pycache__``,
        and files that cannot be read or parsed are left out.
    """
    result = {}
    for f in sorted(glob.glob(f"{module_dir}/**/*.py", recursive=True)):
        if "__pycache__" in f or "test" in os.path.basename(f):
            continue
        try:
            with open(f, encoding='utf-8-sig') as fh:
                tree = ast.parse(fh.read())
        except (OSError, SyntaxError, ValueError):
            # Unreadable, undecodable or syntactically invalid: best-effort skip.
            continue
        exec_lines = set()
        br_lines = set()
        for node in ast.walk(tree):
            if not hasattr(node, 'lineno'):
                continue
            if isinstance(node, _EXECUTABLE_NODES):
                exec_lines.add(node.lineno)
            if isinstance(node, ast.If):
                br_lines.add(node.lineno)
        result[_path_key(f)] = (exec_lines, br_lines)
    return result


# ── Tracer: record every executed line ──
_tracer_active = False
# co_filename -> path key, or None when the file is not of interest.
_trace_key_cache = {}


def start_trace():
    """Enable line recording and install the trace function."""
    global _tracer_active
    _tracer_active = True
    sys.settrace(_trace_lines)


def _trace_lines(frame, event, arg):
    """Trace callback: record 'line' events of matching files in COVERED_LINES.

    Always returns itself so that new frames keep receiving line events.
    """
    if _tracer_active and event == 'line':
        fname = frame.f_code.co_filename
        try:
            key = _trace_key_cache[fname]
        except KeyError:
            # Resolve the key once per file; this callback runs for every line.
            wanted = any(tag in fname for tag in _TRACED_PATH_TAGS)
            key = _trace_key_cache[fname] = _path_key(fname) if wanted else None
        if key is not None:
            COVERED_LINES[key].add(frame.f_lineno)
    return _trace_lines


def stop_trace():
    """Disable line recording and remove the trace function."""
    global _tracer_active
    _tracer_active = False
    sys.settrace(None)


def check(name, cond, msg=""):
    """Count one assertion; print a failure line when *cond* is falsy."""
    global PASS, FAIL
    if cond:
        PASS += 1
    else:
        FAIL += 1
        print(f" ❌ [{name}] {msg}")


def section(name):
    """Print a banner announcing the test section *name*."""
    print(f"\n{'='*60}\n{name}\n{'='*60}")
# ════════════════════════════════════════════════════════════════
# PHASE 1: Scan the code base to establish the baseline
# ════════════════════════════════════════════════════════════════


def _canonical_path(path):
    """Return an absolute, case-normalised form of *path* for comparisons."""
    return os.path.normcase(os.path.abspath(path))


def _is_test_path(path):
    """Tell whether *path* is a test file or lives in a test/cache directory.

    The check is made per path component. A plain substring test on the whole
    path would also reject every file of the ``cobol_testgen`` package.
    """
    *dirs, filename = os.path.normpath(path).split(os.sep)
    if 'test' in filename or '__pycache__' in dirs:
        return True
    return any(d.startswith('test') or d.endswith(('test', 'tests')) for d in dirs)


print("正在扫描代码库...")
modules_to_scan = ['hina', 'cobol_testgen', 'comparator', 'jcl', 'parametrized',
                   'orchestrator', 'quality', 'storage', 'agents', 'config',
                   'coverage', 'data', 'report', 'runners', '.']
all_exec = {}
# Canonical paths already recorded: the trailing '.' entry rescans every
# package above, and must not count the same file twice.
seen_files = set()
for mod in modules_to_scan:
    try:
        scanned = scan_executable_lines(mod)
    except Exception as exc:
        # Best effort: report the problem and keep scanning the other modules.
        print(f"  ⚠ 扫描失败 [{mod}]: {exc}")
        continue
    for path, line_sets in scanned.items():
        canonical = _canonical_path(path)
        if canonical in seen_files or _is_test_path(path):
            continue
        seen_files.add(canonical)
        all_exec[os.path.normpath(path)] = line_sets

total_exec = sum(len(v[0]) for v in all_exec.values())
total_branches = sum(len(v[1]) for v in all_exec.values())
for f, (exec_set, br_set) in sorted(all_exec.items()):
    TOTAL_EXEC_LINES[f] = exec_set
    TOTAL_BRANCHES[f] = br_set

print(f"扫描完成: {len(all_exec)} 文件, {total_exec} 可执行行, {total_branches} IF分支")
print("覆盖测量开始...\n")

# ════════════════════════════════════════════════════════════════
# PHASE 2: Run the tests module by module
# ════════════════════════════════════════════════════════════════

# 1. japanese_data — 14 IF
section("japanese_data.py")
import japanese_data as jp
import random
random.seed(42)

start_trace()
jp.generate_fullwidth_text({"pic_info": {"length": 10}})
jp.generate_fullwidth_text({"pic_info": {"length": 0}})
jp.generate_halfwidth_katakana({"pic_info": {"length": 8}})
jp.generate_sjis_5c_problem({"pic_info": {"length": 6}})
jp.generate_sjis_7c_problem({"pic_info": {"length": 5}})
jp.generate_wareki_date("R")
jp.generate_wareki_date("X")
jp.generate_wareki_boundary("平成")
jp.generate_wareki_boundary("存在しない")
jp.generate_encoding_test_data()
jp.generate_encoding_test_data_bytes(text="テスト")
jp.generate_encoding_test_data_bytes()
jp.select_data_type({"pic_info": {"type": "national"}})
jp.select_data_type({"pic_info": {"type": "numeric"}})
jp.select_data_type({"pic_info": {"type": "numeric_edited"}})
jp.select_data_type({"pic_info": {"type": "numeric_float"}})
jp.select_data_type({"pic_info": {"type": "unknown", "usage": "COMP-3"}})
jp.select_data_type({"pic_info": {"type": "alphanumeric"}})
jp.select_data_type({"pic_info": {"type": "alphabetic"}})
jp.select_data_type({"pic_info": {"type": "unknown", "usage": ""}})
stop_trace()

# 2. hina/classifier — 28 IF
section("hina/classifier.py")
from hina.classifier import detect_keyword, L1_RULES, _strip_cobol_comments, _matches_key_comparison, _detect_matching_structure

start_trace()
# Positive examples for the L1 rules (the original note says 14 rules;
# 11 samples are listed here).
test_srcs = {
    "DB操作": " EXEC SQL SELECT * FROM T END-EXEC.\n",
    "子程序调用": " CALL \"SUB\" USING WS-P.\n LINKAGE SECTION.\n",
    "IS INITIAL": " PROGRAM-ID. MYPROG IS INITIAL.\n",
    "SYSIN": " ACCEPT WS-D FROM SYSIN.\n",
    "编码转换": " ALPHABETIC.\n",
    "online": " DFHCOMMAREA.\n",
    "SORT": " SORT SF ON ASCENDING KEY SK.\n",
    "MERGE": " MERGE MF ON ASCENDING KEY MK.\n",
    "编辑输出": " WRITE OUT AFTER ADVANCING 1.\n",
    "文件编成": " ORGANIZATION IS INDEXED.\n",
    "替代索引": " ALTERNATE RECORD KEY IS AK.\n",
}
for src in test_srcs.values():
    detect_keyword(src)
# False-positive probes
detect_keyword("01 WS-CALL-COUNT PIC 9(5).\n")
detect_keyword("01 WS-MAP-FIELD PIC X(10).\n")
detect_keyword("01 SYSIN PIC X(80).\n")
detect_keyword("DISPLAY \"EXEC SQL SELECT *\"\n")
# Matching keyword
detect_keyword("IF WS-KEY-A = WS-KEY-B\n")
# Structural detection
_detect_matching_structure("READ F1 AT END MOVE 'Y' TO WS-E.\n".upper())
_detect_matching_structure("READ F2.\n".upper())
_detect_matching_structure("PERFORM UNTIL WS-E = 'Y'\n".upper())
_detect_matching_structure("ELSE READ F1\n".upper())
_detect_matching_structure("IF WS-KEY-A = WS-KEY-B\n".upper())
_detect_matching_structure("OPEN INPUT F1 F2.\n".upper())
# Comment stripping
_strip_cobol_comments(" MOVE 1 TO X. *> COMMENT\n")
_strip_cobol_comments(" * LINE COMMENT\n DISPLAY 'OK'.\n")
# Key-comparison detection
_matches_key_comparison("IF WS-KEY-A = WS-KEY-B")
_matches_key_comparison("IF WS-KEY = SPACES")
stop_trace()

# 3. hina/confidence — 13 IF
section("hina/confidence.py")
from hina.confidence import compute_confidence_v2

start_trace()
compute_confidence_v2({"base_confidence": 0.95, "match_count": 3}, {"structure_match_score": 5})
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3})
compute_confidence_v2({"base_confidence": 0.85, "match_count": 1}, {"structure_match_score": 4})
compute_confidence_v2({"base_confidence": 0.50, "match_count": 0}, {"structure_match_score": 0})
compute_confidence_v2({"base_confidence": 0.65, "match_count": 1}, {"structure_match_score": 5}, consensus_category="X")
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[])
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[{"resolved": True}])
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[{"resolved": False}])
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[{"resolved": False},{"resolved": False}])
stop_trace()

# 4. hina/confusion_groups — 19 IF
section("hina/rule_engine/confusion_groups.py")
from hina.rule_engine.confusion_groups import (resolve_matching_vs_keybreak, resolve_dedup_vs_nodedup,
    resolve_validation_vs_keybreak, resolve_csv_merge_vs_split, resolve_simple_vs_two_stage,
    resolve_pure_vs_mixed, resolve_division_50_25_100, resolve_mn_output_mode)

start_trace()
# Each resolver is called once per feature dict listed next to it.
for fn, fts in [
    (resolve_matching_vs_keybreak, [
        {"file_count":2,"if_types":{"total":2,"comparison":2,"equality":0},"select_files":{"A":{},"B":{}},"variable_patterns":{}},
        {"file_count":2,"if_types":{"total":1,"comparison":0,"equality":1},"select_files":{"A":{},"B":{}},"variable_patterns":{"has_prev_key":True,"has_accumulator":True}},
        {"file_count":0,"if_types":{"total":0},"select_files":{},"variable_patterns":{}},
    ]),
    (resolve_dedup_vs_nodedup, [
        {"variable_patterns":{"has_prev_key":True}},
        {"variable_patterns":{"has_prev_key":False}},
    ]),
    (resolve_validation_vs_keybreak, [
        {"variable_patterns":{"has_error_flag":True,"has_counter":False}},
        {"variable_patterns":{"has_error_flag":False,"has_counter":True}},
        {"variable_patterns":{"has_error_flag":False,"has_counter":False}},
    ]),
    (resolve_csv_merge_vs_split, [
        {"has_csv_merge":True},{"has_csv_split":True},{"has_string":True},{"has_inspect":True},{"has_string":False,"has_inspect":False},
    ]),
    (resolve_simple_vs_two_stage, [
        {"open_pattern":"open-close-open","file_count":2,"if_types":{"total":2}},
        {"open_pattern":"sequential","file_count":2,"if_types":{"total":2},"variable_patterns":{},"has_key_var":True},
        {"open_pattern":"sequential","file_count":0,"if_types":{"total":0},"variable_patterns":{}},
    ]),
    (resolve_pure_vs_mixed, [
        {"variable_patterns":{"has_switch":True,"has_counter":True},"if_types":{"total":3}},
        {"variable_patterns":{"has_switch":False},"if_types":{"total":1}},
    ]),
    (resolve_division_50_25_100, [
        {"divide_constants":"invalid"},{"divide_constants":[50]},{"divide_constants":[999]},
    ]),
    (resolve_mn_output_mode, [
        {"select_files":{"A":{},"B":{},"C":{}},"total_branches":3,"file_count":3},
        {"select_files":{"A":{},"B":{},"C":{},"D":{}},"total_branches":4,"file_count":4},
        {"select_files":{"A":{},"B":{}},"file_count":1,"total_branches":1},
    ]),
]:
    for ft in fts:
        fn(ft)
stop_trace()

# ════════════════════════════════════════════════════════════════
# PHASE 3: Report coverage
# ════════════════════════════════════════════════════════════════
print(f"\n{'='*60}")
print(f"测试结果: {PASS} PASS / {FAIL} FAIL")
print(f"{'='*60}")

# The tracer and the scanner may spell the same file differently (absolute
# vs. relative, leading "./"), so join them on the canonical path.
covered_by_file = collections.defaultdict(set)
for traced_name, traced_lines in COVERED_LINES.items():
    covered_by_file[_canonical_path(traced_name)] |= traced_lines

# Per-file coverage, largest files first
total_exec_covered = 0
total_branch_covered = 0
print(f"\n{'文件':<50} {'执行行':<8} {'总执行行':<10} {'覆盖率':<8}")
print("-" * 76)
for f in sorted(TOTAL_EXEC_LINES, key=lambda x: -len(TOTAL_EXEC_LINES[x])):
    if _is_test_path(f):
        continue
    exec_set = TOTAL_EXEC_LINES[f]
    br_set = TOTAL_BRANCHES.get(f, set())
    covered = covered_by_file.get(_canonical_path(f), set())
    exec_covered = len(exec_set & covered)
    br_covered = len(br_set & covered)
    total_exec_covered += exec_covered
    total_branch_covered += br_covered
    # A file without executable lines counts as fully covered.
    pct = exec_covered * 100 // len(exec_set) if exec_set else 100
    short = f.replace("\\", "/")
    if len(short) > 49:
        short = "..." + short[-46:]
    bar = "█" * (pct // 10) + "░" * (10 - pct // 10)
    print(f"{short:<50} {exec_covered:<8} {len(exec_set):<10} {pct:<7}% {bar}")

# max(..., 1) guards against division by zero when nothing was scanned.
overall = total_exec_covered * 100 // max(total_exec, 1)
branch_overall = total_branch_covered * 100 // max(total_branches, 1)

print(f"\n{'='*60}")
print("覆盖率报告")
print(f"{'='*60}")
print(f"总执行行: {total_exec}")
print(f"已覆盖行: {total_exec_covered}")
print(f"行覆盖率: {overall}%")
print(f"总IF分支: {total_branches}")
print(f"已覆盖分支: {total_branch_covered}")
print(f"分支覆盖率: {branch_overall}%")
print(f"{'='*60}")

if FAIL > 0:
    sys.exit(1)