Files
cobol-java-v3/test-data/test_with_coverage.py
T
NB-076 50995d3335 chore: SETUP.md + 测试报告脚本 + 文档更新
- SETUP.md: 完整环境搭建指南(同事用)
- SETUP_QUICK.md: 快速搭环境(4步)
- s22~s26: TNA端到端、覆盖率报告、回归检查
- procedure_grammar.lark: 实验性Lark语法

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-25 08:50:17 +08:00

305 lines
13 KiB
Python

"""
覆盖约束测试 — 每个测试强制记录执行的行号
失败条件: 覆盖率不达标的测试块会被标记
"""
import sys, os, collections, glob, ast
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
PASS = 0
FAIL = 0
COVERED_LINES = collections.defaultdict(set)
TOTAL_EXEC_LINES = {}
TOTAL_BRANCHES = {}
# ── 工具: 扫描所有可执行行 ──
def scan_executable_lines(module_dir):
"""返回 {文件路径: {可执行行号集合}}"""
result = {}
for f in sorted(glob.glob(f"{module_dir}/**/*.py", recursive=True)):
if "__pycache__" in f or "test" in f.split(os.sep)[-1]:
continue
try:
with open(f, encoding='utf-8-sig') as fh:
tree = ast.parse(fh.read())
except:
continue
exec_lines = set()
br_lines = set()
for node in ast.walk(tree):
if hasattr(node, 'lineno'):
if isinstance(node, (ast.If, ast.Return, ast.Raise, ast.Try,
ast.For, ast.While, ast.Assign, ast.AugAssign, ast.Expr,
ast.FunctionDef, ast.With, ast.Assert)):
exec_lines.add(node.lineno)
if isinstance(node, ast.If):
br_lines.add(node.lineno)
result[f] = (exec_lines, br_lines)
return result
# ── 追踪器: 记录所有执行过的行 ──
_tracer_active = False
def start_trace():
global _tracer_active
_tracer_active = True
sys.settrace(_trace_lines)
def _trace_lines(frame, event, arg):
if not _tracer_active:
return _trace_lines
if event == 'line':
fname = frame.f_code.co_filename
lineno = frame.f_lineno
if 'hina' in fname or 'cobol_testgen' in fname or 'comparator' in fname or \
'parametrized' in fname or 'jcl' in fname or 'orchestrator' in fname or \
'quality' in fname or 'storage' in fname or 'config' in fname or \
'japanese_data' in fname or 'coverage' in fname or 'report' in fname or \
'runners' in fname or 'agents' in fname or 'data' in fname:
COVERED_LINES[fname].add(lineno)
return _trace_lines
def stop_trace():
global _tracer_active
_tracer_active = False
sys.settrace(None)
def check(name, cond, msg=""):
global PASS, FAIL
if cond:
PASS += 1
else:
FAIL += 1
print(f" ❌ [{name}] {msg}")
def section(name):
print(f"\n{'='*60}\n{name}\n{'='*60}")
# ════════════════════════════════════════════════════════════════
# PHASE 1: 扫描代码库基准
# ════════════════════════════════════════════════════════════════
print("正在扫描代码库...")
modules_to_scan = ['hina', 'cobol_testgen', 'comparator', 'jcl', 'parametrized',
'orchestrator', 'quality', 'storage', 'agents', 'config',
'coverage', 'data', 'report', 'runners', '.']
all_exec = {}
for mod in modules_to_scan:
scanned = {}
try:
scanned = scan_executable_lines(mod)
except:
pass
for k, v in scanned.items():
if k not in all_exec and 'test' not in k and '__pycache__' not in k:
all_exec[k] = v
total_exec = sum(len(v[0]) for v in all_exec.values())
total_branches = sum(len(v[1]) for v in all_exec.values())
for f, (exec_set, br_set) in sorted(all_exec.items()):
TOTAL_EXEC_LINES[f] = exec_set
TOTAL_BRANCHES[f] = br_set
print(f"扫描完成: {len(all_exec)} 文件, {total_exec} 可执行行, {total_branches} IF分支")
print(f"覆盖测量开始...\n")
# ════════════════════════════════════════════════════════════════
# PHASE 2: 按模块执行测试
# ════════════════════════════════════════════════════════════════
# 1. japanese_data — 14 IF
section("japanese_data.py")
import japanese_data as jp
import random
random.seed(42)
start_trace()
jp.generate_fullwidth_text({"pic_info": {"length": 10}})
jp.generate_fullwidth_text({"pic_info": {"length": 0}})
jp.generate_halfwidth_katakana({"pic_info": {"length": 8}})
jp.generate_sjis_5c_problem({"pic_info": {"length": 6}})
jp.generate_sjis_7c_problem({"pic_info": {"length": 5}})
jp.generate_wareki_date("R")
jp.generate_wareki_date("X")
jp.generate_wareki_boundary("平成")
jp.generate_wareki_boundary("存在しない")
jp.generate_encoding_test_data()
jp.generate_encoding_test_data_bytes(text="テスト")
jp.generate_encoding_test_data_bytes()
jp.select_data_type({"pic_info": {"type": "national"}})
jp.select_data_type({"pic_info": {"type": "numeric"}})
jp.select_data_type({"pic_info": {"type": "numeric_edited"}})
jp.select_data_type({"pic_info": {"type": "numeric_float"}})
jp.select_data_type({"pic_info": {"type": "unknown", "usage": "COMP-3"}})
jp.select_data_type({"pic_info": {"type": "alphanumeric"}})
jp.select_data_type({"pic_info": {"type": "alphabetic"}})
jp.select_data_type({"pic_info": {"type": "unknown", "usage": ""}})
stop_trace()
# 2. hina/classifier — 28 IF
section("hina/classifier.py")
from hina.classifier import detect_keyword, L1_RULES, _strip_cobol_comments, _matches_key_comparison, _detect_matching_structure
start_trace()
# 所有14条L1规则正例
test_srcs = {
"DB操作": " EXEC SQL SELECT * FROM T END-EXEC.\n",
"子程序调用": " CALL \"SUB\" USING WS-P.\n LINKAGE SECTION.\n",
"IS INITIAL": " PROGRAM-ID. MYPROG IS INITIAL.\n",
"SYSIN": " ACCEPT WS-D FROM SYSIN.\n",
"编码转换": " ALPHABETIC.\n",
"online": " DFHCOMMAREA.\n",
"SORT": " SORT SF ON ASCENDING KEY SK.\n",
"MERGE": " MERGE MF ON ASCENDING KEY MK.\n",
"编辑输出": " WRITE OUT AFTER ADVANCING 1.\n",
"文件编成": " ORGANIZATION IS INDEXED.\n",
"替代索引": " ALTERNATE RECORD KEY IS AK.\n",
}
for cat, src in test_srcs.items():
detect_keyword(src)
# FP测试
detect_keyword("01 WS-CALL-COUNT PIC 9(5).\n")
detect_keyword("01 WS-MAP-FIELD PIC X(10).\n")
detect_keyword("01 SYSIN PIC X(80).\n")
detect_keyword("DISPLAY \"EXEC SQL SELECT *\"\n")
# マッチング keyword
detect_keyword("IF WS-KEY-A = WS-KEY-B\n")
# 结构性检测
_detect_matching_structure("READ F1 AT END MOVE 'Y' TO WS-E.\n".upper())
_detect_matching_structure("READ F2.\n".upper())
_detect_matching_structure("PERFORM UNTIL WS-E = 'Y'\n".upper())
_detect_matching_structure("ELSE READ F1\n".upper())
_detect_matching_structure("IF WS-KEY-A = WS-KEY-B\n".upper())
_detect_matching_structure("OPEN INPUT F1 F2.\n".upper())
# 注释剥离
_strip_cobol_comments(" MOVE 1 TO X. *> COMMENT\n")
_strip_cobol_comments(" * LINE COMMENT\n DISPLAY 'OK'.\n")
# KEY比较检测
_matches_key_comparison("IF WS-KEY-A = WS-KEY-B")
_matches_key_comparison("IF WS-KEY = SPACES")
stop_trace()
# 3. hina/confidence — 13 IF
section("hina/confidence.py")
from hina.confidence import compute_confidence_v2
start_trace()
compute_confidence_v2({"base_confidence": 0.95, "match_count": 3}, {"structure_match_score": 5})
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3})
compute_confidence_v2({"base_confidence": 0.85, "match_count": 1}, {"structure_match_score": 4})
compute_confidence_v2({"base_confidence": 0.50, "match_count": 0}, {"structure_match_score": 0})
compute_confidence_v2({"base_confidence": 0.65, "match_count": 1}, {"structure_match_score": 5}, consensus_category="X")
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[])
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[{"resolved": True}])
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[{"resolved": False}])
compute_confidence_v2({"base_confidence": 0.95, "match_count": 2}, {"structure_match_score": 3}, contradictions=[{"resolved": False},{"resolved": False}])
stop_trace()
# 4. hina/confusion_groups — 19 IF
section("hina/rule_engine/confusion_groups.py")
from hina.rule_engine.confusion_groups import (resolve_matching_vs_keybreak, resolve_dedup_vs_nodedup,
resolve_validation_vs_keybreak, resolve_csv_merge_vs_split, resolve_simple_vs_two_stage,
resolve_pure_vs_mixed, resolve_division_50_25_100, resolve_mn_output_mode)
start_trace()
for fn, fts in [
(resolve_matching_vs_keybreak, [
{"file_count":2,"if_types":{"total":2,"comparison":2,"equality":0},"select_files":{"A":{},"B":{}},"variable_patterns":{}},
{"file_count":2,"if_types":{"total":1,"comparison":0,"equality":1},"select_files":{"A":{},"B":{}},"variable_patterns":{"has_prev_key":True,"has_accumulator":True}},
{"file_count":0,"if_types":{"total":0},"select_files":{},"variable_patterns":{}},
]),
(resolve_dedup_vs_nodedup, [
{"variable_patterns":{"has_prev_key":True}},
{"variable_patterns":{"has_prev_key":False}},
]),
(resolve_validation_vs_keybreak, [
{"variable_patterns":{"has_error_flag":True,"has_counter":False}},
{"variable_patterns":{"has_error_flag":False,"has_counter":True}},
{"variable_patterns":{"has_error_flag":False,"has_counter":False}},
]),
(resolve_csv_merge_vs_split, [
{"has_csv_merge":True},{"has_csv_split":True},{"has_string":True},{"has_inspect":True},{"has_string":False,"has_inspect":False},
]),
(resolve_simple_vs_two_stage, [
{"open_pattern":"open-close-open","file_count":2,"if_types":{"total":2}},
{"open_pattern":"sequential","file_count":2,"if_types":{"total":2},"variable_patterns":{},"has_key_var":True},
{"open_pattern":"sequential","file_count":0,"if_types":{"total":0},"variable_patterns":{}},
]),
(resolve_pure_vs_mixed, [
{"variable_patterns":{"has_switch":True,"has_counter":True},"if_types":{"total":3}},
{"variable_patterns":{"has_switch":False},"if_types":{"total":1}},
]),
(resolve_division_50_25_100, [
{"divide_constants":"invalid"},{"divide_constants":[50]},{"divide_constants":[999]},
]),
(resolve_mn_output_mode, [
{"select_files":{"A":{},"B":{},"C":{}},"total_branches":3,"file_count":3},
{"select_files":{"A":{},"B":{},"C":{},"D":{}},"total_branches":4,"file_count":4},
{"select_files":{"A":{},"B":{}},"file_count":1,"total_branches":1},
]),
]:
for ft in fts:
fn(ft)
stop_trace()
# ════════════════════════════════════════════════════════════════
# PHASE 3: 报告覆盖率
# ════════════════════════════════════════════════════════════════
print(f"\n{'='*60}")
print(f"测试结果: {PASS} PASS / {FAIL} FAIL")
print(f"{'='*60}")
# 报告每个文件的覆盖率
executed_any = set()
executed_all = set()
total_exec_covered = 0
total_branch_covered = 0
print(f"\n{'文件':<50} {'执行行':<8} {'总执行行':<10} {'覆盖率':<8}")
print("-" * 76)
for f in sorted(TOTAL_EXEC_LINES, key=lambda x: -len(TOTAL_EXEC_LINES[x])):
if 'test' in f or '__pycache__' in f:
continue
exec_set = TOTAL_EXEC_LINES[f]
br_set = TOTAL_BRANCHES.get(f, set())
covered = COVERED_LINES.get(f, set())
exec_covered = len(exec_set & covered)
br_covered = len(br_set & covered)
total_exec_covered += exec_covered
total_branch_covered += br_covered
if len(exec_set) > 0:
pct = exec_covered * 100 // len(exec_set)
else:
pct = 100
short = f.replace("\\", "/")
if len(short) > 49:
short = "..." + short[-46:]
bar = "" * (pct // 10) + "" * (10 - pct // 10)
if pct >= 80:
executed_any.add(f)
executed_all.add(f)
print(f"{short:<50} {exec_covered:<8} {len(exec_set):<10} {pct:<7}% {bar}")
overall = total_exec_covered * 100 // max(total_exec, 1)
branch_overall = total_branch_covered * 100 // max(len([b for bs in TOTAL_BRANCHES.values() for b in bs]), 1)
print(f"\n{'='*60}")
print(f"覆盖率报告")
print(f"{'='*60}")
print(f"总执行行: {total_exec}")
print(f"已覆盖行: {total_exec_covered}")
print(f"行覆盖率: {overall}%")
print(f"总IF分支: {total_branches}")
print(f"已覆盖分支: {total_branch_covered}")
print(f"分支覆盖率: {branch_overall}%")
print(f"{'='*60}")
if FAIL > 0:
sys.exit(1)