Files
cobol-java-v3/test-data/s25_per_program_report.py
NB-076 50995d3335 chore: SETUP.md + 测试报告脚本 + 文档更新
- SETUP.md: 完整环境搭建指南(同事用)
- SETUP_QUICK.md: 快速搭环境(4步)
- s22~s26: TNA端到端、覆盖率报告、回归检查
- procedure_grammar.lark: 实验性Lark语法

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-25 08:50:17 +08:00

306 lines
15 KiB
Python

"""S25: 每程序独立详细报告 — 分类、分支覆盖、决策点明细"""
import sys, os, re, time, json
# Put this file's parent directory at the front of the import path
# (presumably so the cobol_testgen package imported below resolves — confirm layout).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
# Root of the benchmark suite; its sub-directories are scanned for programs below.
ROOT_BENCH = "D:/cobol-java/cobol-test-programs/"
# Copybook search path handed to analyze_one() for benchmark programs.
COPYBOOKS_BENCH = os.path.join(ROOT_BENCH, "common", "copybooks")
# Root of the TNA system; its sources are read from the "src" sub-directory below.
ROOT_TNA = "D:/cobol-java/cobol-tna-system/"
# Copybook search path handed to analyze_one() for TNA programs.
COPYBOOKS_TNA = os.path.join(ROOT_TNA, "cpy")
from cobol_testgen import extract_structure, generate_data
from cobol_testgen.read import preprocess, resolve_copybooks, \
extract_data_division, extract_procedure_division, parse_data_division
from cobol_testgen.design_mcdc import enum_paths
from cobol_testgen.pipeline_bridge import build_branch_tree_fallback
from cobol_testgen.flatfile import analyze_fd_layout
from cobol_testgen.cond import parse_single_condition
# Classification table: program key -> (prog_type, prog_subtype, domain).
# analyze_one() looks programs up here by name and falls back to
# ("?", "?", "?") for any key that is not listed.
CLASS_MAP = {
# Keys below are matched against sub-directory names found under ROOT_BENCH.
"01-matching-1-1": ("Matching", "1:1照合", "电信计费"),
"02-matching-1-N": ("Matching", "1:N照合", "电信计费"),
"03-matching-N-1": ("Matching", "N:1照合", "电信计费"),
"04-edit-getput": ("Edit/Output", "请求书编辑", "电信计费"),
"05-branch-if": ("ControlFlow", "IF判定", "电信计费"),
"06-branch-evaluate": ("ControlFlow", "EVALUATE多分岐", "电信计费"),
"07-keybreak-summary": ("KeyBreak", "キーブレイク集計", "电信计费"),
"08-keybreak-aggregate": ("KeyBreak", "キーブレイク集計2", "电信计费"),
"09-db-update": ("DB/SQL", "DB更新", "电信计费"),
"10-divide-50": ("Division", "50件分割", "电信计费"),
"11-divide-25": ("Division", "25件分割", "电信计费"),
"12-divide-100": ("Division", "100件分割", "电信计费"),
"13-validation-nodup": ("Validation", "重複無チェック", "电信计费"),
"14-online-cics": ("CICS/Online", "CICSオンライン", "电信计费"),
"15-csv-fb-nolf": ("CSV", "CSV→FB改行無", "电信计费"),
"16-matching-2stage-1-1": ("Matching", "2段階1:1照合", "电信计费"),
"17-matching-2stage-N-1": ("Matching", "2段階N:1照合", "电信计费"),
"18-matching-MN-to-M": ("Matching", "MN→M照合", "电信计费"),
"19-matching-MN-to-N": ("Matching", "MN→N照合", "电信计费"),
"20-matching-MN-to-MxN": ("Matching", "MN→MxN照合", "电信计费"),
"21-csv-fb-lf": ("CSV", "CSV→FB改行有", "电信计费"),
"22-matching-2stage-MN": ("Matching", "2段階MN照合", "电信计费"),
"23-select-condition": ("DB/SQL", "条件抽出", "电信计费"),
"24-table-search": ("Table/Search", "内部表検索", "电信计费"),
"25-subprogram": ("Subprogram", "CALLサブプログラム", "电信计费"),
"26-db-search": ("DB/SQL", "DB検索", "电信计费"),
"27-validation-halfwidth": ("Validation", "半角チェック", "电信计费"),
"28-sysin": ("ControlFlow", "SYSINパラメータ", "电信计费"),
"29-ascii-ebcdic": ("Encoding", "ASCII/EBCDIC変換", "电信计费"),
"30-keybreak-other": ("KeyBreak", "キーブレイク別", "电信计费"),
"31-validation-withdup": ("Validation", "重複有チェック", "电信计费"),
"32-mix-1N-samekeybreak": ("Matching", "混合1N同KEY", "电信计费"),
"33-mix-1N-diffkeybreak": ("Matching", "混合1N別KEY", "电信计费"),
"34-sort": ("Sort/Merge", "SORT処理", "电信计费"),
"35-merge": ("Sort/Merge", "MERGE処理", "电信计费"),
"36-billing-calc": ("Division", "料金計算", "电信计费"),
"pipeline": ("Pipeline", "パイプラインドライバ", "电信计费"),
# Keys below are the program names loaded from ROOT_TNA/src/<name>.cbl.
"ZAN01CHK": ("Matching", "残業申請振分", "勤怠管理"),
"ZAN02CHK": ("Validation", "重複チェック", "勤怠管理"),
"ZAN03CHK": ("Matching", "残業申請照合", "勤怠管理"),
"ZAN04MAT": ("Matching", "残業実績照合", "勤怠管理"),
"ZAN05CAL": ("Division", "残業計算", "勤怠管理"),
"ZAN06UPD": ("DB/SQL", "DB更新処理", "勤怠管理"),
}
def find_main(d):
    """Pick the main COBOL source file inside directory *d*.

    Only names ending in ".cbl" are considered. Files whose name starts with
    "main-NN-" (two digits, case-insensitive) take priority over the rest;
    within the chosen group the largest file on disk wins.

    Returns the bare file name, or None when *d* holds no ".cbl" file.
    """
    def size_on_disk(fname):
        return os.path.getsize(os.path.join(d, fname))

    candidates = [entry for entry in os.listdir(d) if entry.endswith(".cbl")]
    if not candidates:
        return None
    preferred = [entry for entry in candidates
                 if re.match(r"main-\d{2}-", entry, re.IGNORECASE)]
    # Fall back to every .cbl file when nothing follows the main-NN- pattern.
    return max(preferred or candidates, key=size_on_disk)
def analyze_one(name, fpath, source_dir, copybook_dirs):
    """Analyze one COBOL program and collect the metrics used by the report.

    Args:
        name: Program key; also used to look up the classification in CLASS_MAP.
        fpath: Path of the COBOL source file (read as UTF-8, BOM tolerated).
        source_dir: Directory handed to resolve_copybooks() for this source.
        copybook_dirs: Extra copybook search paths, forwarded to
            generate_data() and resolve_copybooks().

    Returns:
        A dict with the keys name, branches, covered, dpoints, records,
        flat_files, lines, code_lines, error, time_ms, parsed_ratio,
        dp_detail, fd_layouts, prog_type, prog_subtype, domain, cov_total
        and cov_pct. Every key is always present. If any step raises, the
        exception is not propagated: "error" receives the first 80 characters
        of its message and the metrics gathered so far are returned.
    """
    prog_type, prog_subtype, domain = CLASS_MAP.get(name, ("?", "?", "?"))
    data = {
        "name": name, "branches": 0, "covered": 0, "dpoints": 0, "records": 0,
        "flat_files": 0, "lines": 0, "code_lines": 0, "error": "",
        "time_ms": 0, "parsed_ratio": 0, "dp_detail": [], "fd_layouts": {},
        "prog_type": prog_type, "prog_subtype": prog_subtype, "domain": domain,
        # Seeded up front so report code can read them even when analysis
        # fails before the coverage figures are available.
        "cov_total": 0, "cov_pct": 0,
    }
    try:
        with open(fpath, encoding="utf-8-sig") as fh:
            src = fh.read()
        src_lines = src.split("\n")
        data["lines"] = len(src_lines)
        # "Code" lines: non-blank lines whose first non-space character is not "*".
        data["code_lines"] = sum(
            1 for line in src_lines
            if line.strip() and not line.strip().startswith("*"))

        t0 = time.perf_counter()
        st = extract_structure(src)
        data["branches"] = st.get("total_branches", 0)
        data["dpoints"] = len(st.get("decision_points", []))

        # Generate data with copybook-aware preprocessing
        recs = generate_data(src, st, copybook_dirs=copybook_dirs)
        data["records"] = len(recs)
        # Coverage is read only after generate_data() has run
        # (presumably that call fills st["coverage"] — confirm in cobol_testgen).
        cov = st.get("coverage", {})
        data["covered"] = cov.get("covered", 0)
        data["cov_total"] = cov.get("total", 0)
        data["cov_pct"] = cov.get("pct", 0)
        data["dp_detail"] = cov.get("decision_points", [])

        # FD layouts
        pp_resolved = preprocess(
            resolve_copybooks(src, source_dir, extra_search_paths=copybook_dirs))
        layouts = analyze_fd_layout(pp_resolved)
        data["flat_files"] = len(layouts)
        fd_info = {}
        for lname, layout in layouts.items():
            # One entry per layout name: with several records the last one wins.
            for rec in layout.get("records", []):
                fd_info[lname] = {
                    "direction": layout["direction"],
                    "record_name": rec["record_name"],
                    "record_length": rec["record_length"],
                    "field_count": len(rec.get("fields", [])),
                }
        data["fd_layouts"] = fd_info

        # Parsed condition ratio.
        # The original referenced an undefined name (pp_str) here, which made
        # every analysis end in a swallowed NameError; the preprocessed,
        # copybook-resolved text is the only preprocessed source in scope.
        dd = extract_data_division(pp_resolved)
        fields = parse_data_division(dd) if dd else []
        fdict = [{"name": f.name} for f in fields]
        proc = extract_procedure_division(pp_resolved)
        tree, _assigns = build_branch_tree_fallback(proc, fdict)

        # Imported once here instead of on every recursive call.
        from cobol_testgen.models import BrIf, BrSeq, BrEval, BrPerform

        parsed_count = 0
        total_if = 0

        def count_parsed(nd):
            """Walk the branch tree, counting IF nodes and the parseable ones."""
            nonlocal parsed_count, total_if
            if isinstance(nd, BrIf):
                total_if += 1
                cond = getattr(nd, 'condition', '')
                if cond and parse_single_condition(cond, fdict) is not None:
                    parsed_count += 1
                # NOTE(review): the scraped source lost its indentation; this
                # children walk is assumed to belong to the BrIf branch, since
                # BrSeq children are already visited below — confirm.
                if hasattr(nd, 'children'):
                    for c in nd.children:
                        count_parsed(c)
            if isinstance(nd, BrSeq):
                for c in nd.children:
                    count_parsed(c)
            if isinstance(nd, BrEval):
                for _, s in nd.when_list:
                    count_parsed(s)
                count_parsed(nd.other_seq)
            if isinstance(nd, BrPerform):
                count_parsed(nd.body_seq)

        count_parsed(tree)
        # max(..., 1) keeps the ratio at 0 instead of dividing by zero when
        # the program contains no IF node.
        data["parsed_ratio"] = parsed_count / max(total_if, 1) * 100
        data["time_ms"] = int((time.perf_counter() - t0) * 1000)
    except Exception as e:
        # Deliberate best-effort: one failing program must not abort the
        # whole report, so the failure is recorded and shown in its section.
        data["error"] = str(e)[:80]
    return data
# ── Collect all results ──
# all_results holds one analyze_one() dict per program, in processing order;
# prog_list mirrors it with just the program names.
all_results = []
prog_list = []

# Benchmark suite: every sub-directory of ROOT_BENCH except the shared ones
# is treated as one program, analysed through the file chosen by find_main().
for entry in sorted(os.listdir(ROOT_BENCH)):
    bench_dir = os.path.join(ROOT_BENCH, entry)
    if entry in ("common", "docs", "cross-cutting") or not os.path.isdir(bench_dir):
        continue
    main_file = find_main(bench_dir)
    if not main_file:
        continue
    result = analyze_one(entry, os.path.join(bench_dir, main_file),
                         bench_dir, [COPYBOOKS_BENCH])
    all_results.append(result)
    prog_list.append(result["name"])

# TNA system: a fixed list of programs; sources that are missing are skipped.
tna_src_dir = os.path.join(ROOT_TNA, "src")
for prog in ["ZAN01CHK", "ZAN02CHK", "ZAN03CHK", "ZAN04MAT", "ZAN05CAL", "ZAN06UPD"]:
    cbl_path = os.path.join(tna_src_dir, prog + ".cbl")
    if os.path.exists(cbl_path):
        result = analyze_one(prog, cbl_path, tna_src_dir, [COPYBOOKS_TNA])
        all_results.append(result)
        prog_list.append(result["name"])
# ── Per-program detail ──
# Prints one framed section per analysed program: classification, branch
# coverage bar, condition-parsing ratio, decision-point breakdown, file
# descriptions and generated test data. Programs whose analysis failed only
# get their header and the recorded error message.
from collections import Counter  # imported once, not on every loop iteration

bar_len = 30  # width of the coverage bar, in characters
for r in all_results:
    print("=" * 90)
    print("PROGRAM: %s" % r["name"])
    print("=" * 90)
    print(" Classification: %s / %s" % (r["prog_type"], r["prog_subtype"]))
    print(" Domain: %s" % r["domain"])
    print(" Source lines: %d (non-comment: %d)" % (r["lines"], r["code_lines"]))
    print()
    if r.get("error"):
        print(" ERROR: %s" % r["error"])
        print()
        continue

    # Branch coverage summary
    print(" ┌─ BRANCH COVERAGE ─────────────────────────────┐")
    total = r["branches"]
    covered = r["covered"]
    pct = r["cov_pct"]
    # Visual bar. The glyphs were empty strings before, so nothing was drawn;
    # the fill is clamped so an out-of-range percentage cannot distort it.
    filled = max(0, min(bar_len, int(bar_len * pct / 100)))
    bar = "█" * filled + "░" * (bar_len - filled)
    print(" │ %s %5.1f%%" % (bar, pct))
    print(" │ Covered: %d / %d branches (%d decision pts) │" % (covered, total, r["dpoints"]))
    print(" └────────────────────────────────────────────────┘")

    # Condition parsing
    print(" ┌─ CONDITION PARSING ───────────────────────────┐")
    print(" │ Parsed: %5.1f%% of IF conditions │" % r["parsed_ratio"])
    unparsed_pct = max(0, 100 - r["parsed_ratio"])
    if unparsed_pct > 20:
        print(" │ ⚠ %d%% unparsed — synthetic coverage applied │" % int(unparsed_pct))
    else:
        print(" │ ✅ %d%% conditions parsed directly │" % int(r["parsed_ratio"]))
    print(" └────────────────────────────────────────────────┘")

    # Decision point detail
    dp_detail = r.get("dp_detail", [])
    if dp_detail:
        print(" ┌─ DECISION POINT DETAIL ──────────────────────┐")
        # Count by kind in a single pass. Both counters use the same "?"
        # default, so decision points without a kind are tallied consistently
        # (previously their fully-covered count was always 0).
        kind_count = Counter()
        kind_full = Counter()
        for dp in dp_detail:
            kind = dp.get("kind", "?")
            kind_count[kind] += 1
            if dp.get("covered", 0) >= dp.get("branches", 1):
                kind_full[kind] += 1
        for k, c in sorted(kind_count.items()):
            print(" │ %-12s: %d DPs (%d/%d fully covered) │" % (k, c, kind_full[k], c))
        print(" │ │")
        # Show first few uncovered
        uncovered = [dp for dp in dp_detail
                     if dp.get("covered", 0) < dp.get("branches", 1)]
        if uncovered:
            print(" │ Uncovered DPs (%d):" % len(uncovered))
            for dp in uncovered[:6]:
                br = dp.get("branches", 0)
                cov = dp.get("covered", 0)
                lbl = str(dp.get("label") or "?")[:45]
                # Marker distinguishes "never hit" from "partially covered";
                # both alternatives were empty strings before.
                marker = "❌" if cov == 0 else "⚠"
                print(" │ %s %d/%d %s" % (marker, cov, br, lbl))
            if len(uncovered) > 6:
                print(" │ ... and %d more" % (len(uncovered) - 6))
        else:
            print(" │ ✅ All DPs fully covered!")
        print(" └────────────────────────────────────────────────┘")

    # FD layouts
    fd_layouts = r.get("fd_layouts", {})
    if fd_layouts:
        print(" ┌─ FILE DESCRIPTIONS ──────────────────────────┐")
        for lname, info in sorted(fd_layouts.items()):
            print(" │ %-14s %-4s %sB %d fields │" % (
                lname[:14], info["direction"],
                info["record_length"], info["field_count"]))
        print(" └────────────────────────────────────────────────┘")

    # Generated test data
    print(" ┌─ TEST DATA ───────────────────────────────────┐")
    print(" │ Records: %d (%d paths generated) │" % (r["records"], r["branches"]))
    print(" │ Flat file layouts: %d" % r["flat_files"])
    print(" │ Time: %.2fs │" % (r["time_ms"] / 1000))
    print(" └────────────────────────────────────────────────┘")
    print()
# ── Summary table ──
# One row per program plus a TOTAL row. Coverage is read with .get() because
# a result whose analysis failed early may carry no "cov_pct" entry.
print("=" * 140)
print("PROGRAM LIST — SUMMARY TABLE")
print("=" * 140)
print(f"{'#':>2} {'Program':<26} {'Type':<14} {'Br':>4} {'Cov':>4} {'C%':>5} {'DPs':>4} {'Recs':>4} {'FDs':>4} {'Lines':>6} {'Par%':>5} {'Time':>6}")
print("-" * 140)
for i, r in enumerate(all_results, 1):
    cov_pct = r.get("cov_pct", 0)
    print(f"{i:>2} {r['name']:<26} {r['prog_type']:<14} {r['branches']:>4} {r['covered']:>4} {cov_pct:>4.0f}% {r['dpoints']:>4} {r['records']:>4} {r['flat_files']:>4} {r['code_lines']:>6} {r['parsed_ratio']:>4.0f}% {r['time_ms']/1000:>5.2f}s")
print("-" * 140)
# Totals
total_br = sum(r["branches"] for r in all_results)
total_cov = sum(r["covered"] for r in all_results)
total_dps = sum(r["dpoints"] for r in all_results)
total_recs = sum(r["records"] for r in all_results)
total_flats = sum(r["flat_files"] for r in all_results)
total_lines = sum(r["code_lines"] for r in all_results)
total_time = sum(r["time_ms"] for r in all_results)
total_pct = total_cov / max(total_br, 1) * 100  # max() guards an empty run
# The label spans the '#', 'Program' and 'Type' columns (2 + 1 + 26 + 1 + 14
# = 44 characters) and every data column is present, so each total lines up
# under its header; 'Par%' has no meaningful sum and is left blank.
print(f"{'TOTAL':>44} {total_br:>4} {total_cov:>4} {total_pct:>4.0f}% {total_dps:>4} {total_recs:>4} {total_flats:>4} {total_lines:>6} {'':>5} {total_time/1000:>5.1f}s")
print()
# Distribution histogram of coverage rates
print("=" * 60)
print("COVERAGE DISTRIBUTION")
print("=" * 60)
# (lower bound, label) pairs in descending order. Each bucket covers
# [lower bound, previous lower bound); the first bucket has no upper limit.
buckets = [(100, "100%"), (95, "95-99%"), (80, "80-94%"), (60, "60-79%"), (40, "40-59%"), (0, "0-39%")]
upper = None
for threshold, label in buckets:
    # .get(): a result whose analysis failed early may carry no "cov_pct".
    if upper is None:
        count = sum(1 for r in all_results if r.get("cov_pct", 0) >= threshold)
    else:
        count = sum(1 for r in all_results
                    if threshold <= r.get("cov_pct", 0) < upper)
    upper = threshold
    # Ten-cell bar: one filled cell per program, padded with empty cells.
    # Both glyphs were empty strings before, so no bar was ever drawn.
    bar = "█" * count + "░" * max(0, 10 - count)
    print(" %s: %2d programs %s" % (label, count, bar))
# Domain breakdown
# Aggregates program count, branches, covered branches and code lines per
# domain, prints one line per domain in key order, then the report footer.
print()
print("=" * 60)
print("BY DOMAIN")
print("=" * 60)
from collections import defaultdict

domains = defaultdict(lambda: {"count": 0, "branches": 0, "covered": 0, "lines": 0})
for result in all_results:
    bucket = domains[result.get("domain", "?")]
    bucket["count"] += 1
    bucket["branches"] += result["branches"]
    bucket["covered"] += result["covered"]
    bucket["lines"] += result["code_lines"]

for domain_name in sorted(domains):
    stats = domains[domain_name]
    # max(..., 1) avoids a division by zero for a domain without branches.
    ratio = stats["covered"] / max(stats["branches"], 1) * 100
    print(" %-12s %2d programs %4d/%4d branches %5.1f%% %5d lines" % (
        domain_name, stats["count"], stats["covered"], stats["branches"],
        ratio, stats["lines"]))

print()
print("=" * 60)
print("REPORT GENERATED: S25 per-program report")
print("=" * 60)