"""S25: 每程序独立详细报告 — 分类、分支覆盖、决策点明细""" import sys, os, re, time, json sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) ROOT_BENCH = "D:/cobol-java/cobol-test-programs/" COPYBOOKS_BENCH = os.path.join(ROOT_BENCH, "common", "copybooks") ROOT_TNA = "D:/cobol-java/cobol-tna-system/" COPYBOOKS_TNA = os.path.join(ROOT_TNA, "cpy") from cobol_testgen import extract_structure, generate_data from cobol_testgen.read import preprocess, resolve_copybooks, \ extract_data_division, extract_procedure_division, parse_data_division from cobol_testgen.design_mcdc import enum_paths from cobol_testgen.pipeline_bridge import build_branch_tree_fallback from cobol_testgen.flatfile import analyze_fd_layout from cobol_testgen.cond import parse_single_condition CLASS_MAP = { "01-matching-1-1": ("Matching", "1:1照合", "电信计费"), "02-matching-1-N": ("Matching", "1:N照合", "电信计费"), "03-matching-N-1": ("Matching", "N:1照合", "电信计费"), "04-edit-getput": ("Edit/Output", "请求书编辑", "电信计费"), "05-branch-if": ("ControlFlow", "IF判定", "电信计费"), "06-branch-evaluate": ("ControlFlow", "EVALUATE多分岐", "电信计费"), "07-keybreak-summary": ("KeyBreak", "キーブレイク集計", "电信计费"), "08-keybreak-aggregate": ("KeyBreak", "キーブレイク集計2", "电信计费"), "09-db-update": ("DB/SQL", "DB更新", "电信计费"), "10-divide-50": ("Division", "50件分割", "电信计费"), "11-divide-25": ("Division", "25件分割", "电信计费"), "12-divide-100": ("Division", "100件分割", "电信计费"), "13-validation-nodup": ("Validation", "重複無チェック", "电信计费"), "14-online-cics": ("CICS/Online", "CICSオンライン", "电信计费"), "15-csv-fb-nolf": ("CSV", "CSV→FB改行無", "电信计费"), "16-matching-2stage-1-1": ("Matching", "2段階1:1照合", "电信计费"), "17-matching-2stage-N-1": ("Matching", "2段階N:1照合", "电信计费"), "18-matching-MN-to-M": ("Matching", "MN→M照合", "电信计费"), "19-matching-MN-to-N": ("Matching", "MN→N照合", "电信计费"), "20-matching-MN-to-MxN": ("Matching", "MN→MxN照合", "电信计费"), "21-csv-fb-lf": ("CSV", "CSV→FB改行有", "电信计费"), "22-matching-2stage-MN": ("Matching", "2段階MN照合", "电信计费"), "23-select-condition": ("DB/SQL", "条件抽出", "电信计费"), "24-table-search": ("Table/Search", "内部表検索", "电信计费"), "25-subprogram": ("Subprogram", "CALLサブプログラム", "电信计费"), "26-db-search": ("DB/SQL", "DB検索", "电信计费"), "27-validation-halfwidth": ("Validation", "半角チェック", "电信计费"), "28-sysin": ("ControlFlow", "SYSINパラメータ", "电信计费"), "29-ascii-ebcdic": ("Encoding", "ASCII/EBCDIC変換", "电信计费"), "30-keybreak-other": ("KeyBreak", "キーブレイク別", "电信计费"), "31-validation-withdup": ("Validation", "重複有チェック", "电信计费"), "32-mix-1N-samekeybreak": ("Matching", "混合1N同KEY", "电信计费"), "33-mix-1N-diffkeybreak": ("Matching", "混合1N別KEY", "电信计费"), "34-sort": ("Sort/Merge", "SORT処理", "电信计费"), "35-merge": ("Sort/Merge", "MERGE処理", "电信计费"), "36-billing-calc": ("Division", "料金計算", "电信计费"), "pipeline": ("Pipeline", "パイプラインドライバ", "电信计费"), "ZAN01CHK": ("Matching", "残業申請振分", "勤怠管理"), "ZAN02CHK": ("Validation", "重複チェック", "勤怠管理"), "ZAN03CHK": ("Matching", "残業申請照合", "勤怠管理"), "ZAN04MAT": ("Matching", "残業実績照合", "勤怠管理"), "ZAN05CAL": ("Division", "残業計算", "勤怠管理"), "ZAN06UPD": ("DB/SQL", "DB更新処理", "勤怠管理"), } def find_main(d): cbls = [f for f in os.listdir(d) if f.endswith(".cbl")] ws = [f for f in cbls if re.match(r"main-\d{2}-", f, re.IGNORECASE)] if ws: return max(ws, key=lambda f: os.path.getsize(os.path.join(d, f))) return max(cbls, key=lambda f: os.path.getsize(os.path.join(d, f))) if cbls else None def analyze_one(name, fpath, source_dir, copybook_dirs): data = {"name": name, "branches": 0, "covered": 0, "dpoints": 0, "records": 0, "flat_files": 0, "lines": 0, "code_lines": 0, "error": "", "time_ms": 0, "parsed_ratio": 0, "dp_detail": [], "fd_layouts": {}, "prog_type": "", "prog_subtype": "", "domain": ""} cls = CLASS_MAP.get(name, ("?", "?", "?")) data["prog_type"], data["prog_subtype"], data["domain"] = cls try: src = open(fpath, encoding="utf-8-sig").read() data["lines"] = len(src.split("\n")) data["code_lines"] = sum(1 for l in src.split("\n") if l.strip() and not l.strip().startswith("*")) t0 = time.time() st = extract_structure(src) data["branches"] = st.get("total_branches", 0) data["dpoints"] = len(st.get("decision_points", [])) # Generate data with copybook-aware preprocessing recs = generate_data(src, st, copybook_dirs=copybook_dirs) data["records"] = len(recs) cov = st.get("coverage", {}) data["covered"] = cov.get("covered", 0) data["cov_total"] = cov.get("total", 0) data["cov_pct"] = cov.get("pct", 0) data["dp_detail"] = cov.get("decision_points", []) # FD layouts pp_resolved = preprocess(resolve_copybooks(src, source_dir, extra_search_paths=copybook_dirs)) layouts = analyze_fd_layout(pp_resolved) data["flat_files"] = len(layouts) fd_info = {} for lname, layout in layouts.items(): for rec in layout.get("records", []): fields = rec.get("fields", []) fd_info[lname] = { "direction": layout["direction"], "record_name": rec["record_name"], "record_length": rec["record_length"], "field_count": len(fields), } data["fd_layouts"] = fd_info # Parsed condition ratio dd = extract_data_division(pp_str) fields = parse_data_division(dd) if dd else [] fdict = [{"name": f.name} for f in fields] proc = extract_procedure_division(pp_str) tree, ass = build_branch_tree_fallback(proc, fdict) parsed_count = 0 total_if = 0 def count_parsed(nd): nonlocal parsed_count, total_if from cobol_testgen.models import BrIf, BrSeq, BrEval, BrPerform if isinstance(nd, BrIf): total_if += 1 if getattr(nd, 'condition', '') and \ parse_single_condition(nd.condition, fdict) is not None: parsed_count += 1 if hasattr(nd, 'children'): for c in nd.children: count_parsed(c) if isinstance(nd, BrSeq): for c in nd.children: count_parsed(c) if isinstance(nd, BrEval): for _, s in nd.when_list: count_parsed(s) count_parsed(nd.other_seq) if isinstance(nd, BrPerform): count_parsed(nd.body_seq) count_parsed(tree) data["parsed_ratio"] = parsed_count / max(total_if, 1) * 100 data["time_ms"] = int((time.time() - t0) * 1000) except Exception as e: data["error"] = str(e)[:80] return data # ── Collect all results ── all_results = [] prog_list = [] for d in sorted(os.listdir(ROOT_BENCH)): dp = os.path.join(ROOT_BENCH, d) if not os.path.isdir(dp) or d in ("common","docs","cross-cutting"): continue fn = find_main(dp) if not fn: continue r = analyze_one(d, os.path.join(dp, fn), dp, [COPYBOOKS_BENCH]) all_results.append(r) prog_list.append(r["name"]) for f in ["ZAN01CHK","ZAN02CHK","ZAN03CHK","ZAN04MAT","ZAN05CAL","ZAN06UPD"]: fpath = os.path.join(ROOT_TNA, "src", f + ".cbl") if not os.path.exists(fpath): continue r = analyze_one(f, fpath, os.path.join(ROOT_TNA, "src"), [COPYBOOKS_TNA]) all_results.append(r) prog_list.append(r["name"]) # ── Per-program detail ── for r in all_results: print("=" * 90) print("PROGRAM: %s" % r["name"]) print("=" * 90) print(" Classification: %s / %s" % (r["prog_type"], r["prog_subtype"])) print(" Domain: %s" % r["domain"]) print(" Source lines: %d (non-comment: %d)" % (r["lines"], r["code_lines"])) print() if r.get("error"): print(" ERROR: %s" % r["error"]) print() continue # Branch coverage summary print(" ┌─ BRANCH COVERAGE ─────────────────────────────┐") total = r["branches"] covered = r["covered"] pct = r["cov_pct"] # Visual bar bar_len = 30 filled = int(bar_len * pct / 100) bar = "█" * filled + "░" * (bar_len - filled) print(" │ %s %5.1f%% │" % (bar, pct)) print(" │ Covered: %d / %d branches (%d decision pts) │" % (covered, total, r["dpoints"])) print(" └────────────────────────────────────────────────┘") # Condition parsing print(" ┌─ CONDITION PARSING ───────────────────────────┐") print(" │ Parsed: %5.1f%% of IF conditions │" % r["parsed_ratio"]) unparsed_pct = max(0, 100 - r["parsed_ratio"]) if unparsed_pct > 20: print(" │ ⚠ %d%% unparsed — synthetic coverage applied │" % int(unparsed_pct)) else: print(" │ ✅ %d%% conditions parsed directly │" % int(r["parsed_ratio"])) print(" └────────────────────────────────────────────────┘") # Decision point detail dp_detail = r.get("dp_detail", []) if dp_detail: print(" ┌─ DECISION POINT DETAIL ──────────────────────┐") # Count by kind from collections import Counter kind_count = Counter(dp.get("kind", "?") for dp in dp_detail) for k, c in sorted(kind_count.items()): covered_k = sum(1 for dp in dp_detail if dp.get("kind") == k and dp.get("covered", 0) >= dp.get("branches", 1)) print(" │ %-12s: %d DPs (%d/%d fully covered) │" % (k, c, covered_k, c)) print(" │ │") # Show first few uncovered uncovered = [dp for dp in dp_detail if dp.get("covered", 0) < dp.get("branches", 1)] if uncovered: print(" │ Uncovered DPs (%d):" % len(uncovered)) for dp in uncovered[:6]: br = dp.get("branches", 0) cov = dp.get("covered", 0) lbl = dp.get("label", "?")[:45] print(" │ %s %d/%d — %s" % ( "⚠" if cov == 0 else "◐", cov, br, lbl)) if len(uncovered) > 6: print(" │ ... and %d more" % (len(uncovered) - 6)) else: print(" │ ✅ All DPs fully covered!") print(" └────────────────────────────────────────────────┘") # FD layouts fd_layouts = r.get("fd_layouts", {}) if fd_layouts: print(" ┌─ FILE DESCRIPTIONS ──────────────────────────┐") for lname, info in sorted(fd_layouts.items()): print(" │ %-14s %-4s %sB %d fields │" % ( lname[:14], info["direction"], info["record_length"], info["field_count"])) print(" └────────────────────────────────────────────────┘") # Generated test data print(" ┌─ TEST DATA ───────────────────────────────────┐") print(" │ Records: %d (%d paths generated) │" % (r["records"], r["branches"])) print(" │ Flat file layouts: %d │" % r["flat_files"]) print(" │ Time: %.2fs │" % (r["time_ms"] / 1000)) print(" └────────────────────────────────────────────────┘") print() # ── Summary table ── print("=" * 140) print("PROGRAM LIST — SUMMARY TABLE") print("=" * 140) print(f"{'#':>2} {'Program':<26} {'Type':<14} {'Br':>4} {'Cov':>4} {'C%':>5} {'DPs':>4} {'Recs':>4} {'FDs':>4} {'Lines':>6} {'Par%':>5} {'Time':>6}") print("-" * 140) for i, r in enumerate(all_results, 1): print(f"{i:>2} {r['name']:<26} {r['prog_type']:<14} {r['branches']:>4} {r['covered']:>4} {r['cov_pct']:>4.0f}% {r['dpoints']:>4} {r['records']:>4} {r['flat_files']:>4} {r['code_lines']:>6} {r['parsed_ratio']:>4.0f}% {r['time_ms']/1000:>5.2f}s") print("-" * 140) # Totals total_br = sum(r["branches"] for r in all_results) total_cov = sum(r["covered"] for r in all_results) total_recs = sum(r["records"] for r in all_results) total_flats = sum(r["flat_files"] for r in all_results) total_lines = sum(r["code_lines"] for r in all_results) total_time = sum(r["time_ms"] for r in all_results) print(f"{'TOTAL':>30} {total_br:>4} {total_cov:>4} {total_cov/max(total_br,1)*100:>4.0f}% {total_recs:>4} {total_flats:>4} {total_lines:>6} {total_time/1000:>5.1f}s") print() # Distribution histogram of coverage rates print("=" * 60) print("COVERAGE DISTRIBUTION") print("=" * 60) buckets = [(100, "100%"), (95, "95-99%"), (80, "80-94%"), (60, "60-79%"), (40, "40-59%"), (0, "0-39%")] for threshold, label in buckets: if threshold == 100: count = sum(1 for r in all_results if r["cov_pct"] >= 100) else: upper = 100 if buckets.index((threshold, label)) == 0 else \ buckets[buckets.index((threshold, label)) - 1][0] count = sum(1 for r in all_results if threshold <= r["cov_pct"] < upper) bar = "█" * count + "░" * (max(0, 10 - count)) print(" %s: %2d programs %s" % (label, count, bar)) # Domain breakdown print() print("=" * 60) print("BY DOMAIN") print("=" * 60) from collections import defaultdict domains = defaultdict(lambda: {"count": 0, "branches": 0, "covered": 0, "lines": 0}) for r in all_results: d = r.get("domain", "?") domains[d]["count"] += 1 domains[d]["branches"] += r["branches"] domains[d]["covered"] += r["covered"] domains[d]["lines"] += r["code_lines"] for d, data in sorted(domains.items()): print(" %-12s %2d programs %4d/%4d branches %5.1f%% %5d lines" % ( d, data["count"], data["covered"], data["branches"], data["covered"]/max(data["branches"],1)*100, data["lines"])) print() print("=" * 60) print("REPORT GENERATED: S25 per-program report") print("=" * 60)