"""覆盖率统计:决策点收集 + 路径标记 + HTML报告""" import re import logging from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, CondLeaf from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, evaluate_tree # ── 数据模型 ── @dataclass class LeafStat: field: str op: str value: str covered_true: bool = False covered_false: bool = False @dataclass class DecisionPoint: id: int kind: str # "IF" | "EVALUATE" | "PERFORM" label: str branch_names: list[str] covered_branches: set = field(default_factory=set) active_branches: set = field(default_factory=set) implied_branches: set = field(default_factory=set) leaves: list[LeafStat] = field(default_factory=list) source_line: int = 0 when_list: list = field(default_factory=list) cond_tree: object = None cond_leaves: list = field(default_factory=list) # ── 决策点收集 ── def collect_decision_points(node, fields, counter=None): if counter is None: counter = [0] points = [] all_leaves = [] if isinstance(node, BrIf): counter[0] += 1 dp = DecisionPoint(id=counter[0], kind='IF', label=node.condition, branch_names=['T', 'F']) simple = parse_single_condition(node.condition) if simple and is_field(simple[0], fields): dp.parsed = simple elif simple: dp.parsed = simple elif node.cond_tree: leaves = collect_leaves(node.cond_tree) if leaves: dp.cond_tree = node.cond_tree dp.cond_leaves = list(leaves) for leaf in leaves: ls = LeafStat(field=leaf.field, op=leaf.op, value=leaf.value) dp.leaves.append(ls) all_leaves.append(ls) points.append(dp) p, l = _walk_collect(node.true_seq, fields, counter) points.extend(p); all_leaves.extend(l) p, l = _walk_collect(node.false_seq, fields, counter) points.extend(p); all_leaves.extend(l) elif isinstance(node, BrEval): counter[0] += 1 names = [f"WHEN {v}" for v, _ in node.when_list] if node.has_other: names.append("OTHER") dp = DecisionPoint(id=counter[0], kind='EVALUATE', label=node.subject, branch_names=names, when_list=node.when_list) points.append(dp) for _, seq in node.when_list: p, l = _walk_collect(seq, fields, counter) points.extend(p); all_leaves.extend(l) p, l = _walk_collect(node.other_seq, fields, counter) points.extend(p); all_leaves.extend(l) elif isinstance(node, BrSearch): counter[0] += 1 branch_names = [] for cond_text, seq in node.when_list: branch_names.append(f'WHEN {cond_text[:40]}') if node.has_at_end: branch_names.append('AT END') dp = DecisionPoint(id=counter[0], kind='SEARCH', label=node.table_name, branch_names=branch_names) dp.when_list = node.when_list dp.cond_trees = node.cond_trees dp.has_other = node.has_at_end points.append(dp) for cond_text, seq in node.when_list: p, l = _walk_collect(seq, fields, counter) points.extend(p); all_leaves.extend(l) if node.has_at_end: p, l = _walk_collect(node.at_end_seq, fields, counter) points.extend(p); all_leaves.extend(l) elif isinstance(node, BrPerform): if node.perf_type in ('until', 'para_until', 'varying', 'para_varying'): counter[0] += 1 dp = DecisionPoint(id=counter[0], kind='PERFORM', label=node.condition or '', branch_names=['Enter', 'Skip']) simple = parse_single_condition(node.condition) if node.condition else None if simple and is_field(simple[0], fields): dp.parsed = simple elif node.condition: cond_tree = parse_compound_condition(node.condition, fields) if cond_tree: leaves = collect_leaves(cond_tree) if leaves: dp.cond_tree = cond_tree dp.cond_leaves = list(leaves) points.append(dp) p, l = _walk_collect(node.body_seq, fields, counter) points.extend(p); all_leaves.extend(l) elif isinstance(node, BrSeq): for child in node.children: p, l = collect_decision_points(child, fields, counter) points.extend(p); all_leaves.extend(l) return points, all_leaves def _walk_collect(node, fields, counter): return collect_decision_points(node, fields, counter) # ── 覆盖率标记 ── def mark_coverage(decision_points, leaf_stats, branch_paths, fields): for cons, _assign in branch_paths: for dp in decision_points: if dp.kind == 'IF': _mark_if(dp, cons) elif dp.kind == 'EVALUATE': _mark_eval(dp, cons, fields) elif dp.kind == 'PERFORM': _mark_perform(dp, cons) elif dp.kind == 'SEARCH': _mark_search(dp, cons, fields) for leaf in leaf_stats: for c in cons: if _match_leaf(c, leaf): if c[3]: leaf.covered_true = True else: leaf.covered_false = True for dp in decision_points: dp.implied_branches = set(dp.active_branches) def _match_constraint(c, parsed): if len(c) != 4: return False return (c[0] == parsed[0] and c[1] == parsed[1] and str(c[2]) == str(parsed[2])) def _match_leaf(c, leaf): if len(c) != 4: return False return (c[0] == leaf.field and c[1] == leaf.op and str(c[2]) == str(leaf.value)) def _mark_if(dp, cons): simple = getattr(dp, 'parsed', None) if simple: for c in cons: if _match_constraint(c, simple): if c[3]: dp.active_branches.add('T') else: dp.active_branches.add('F') elif dp.cond_tree and dp.cond_leaves: assignment = {} for leaf in dp.cond_leaves: for c in cons: if _match_leaf(c, leaf): assignment[leaf] = c[3] break if len(assignment) == len(dp.cond_leaves): if evaluate_tree(dp.cond_tree, assignment): dp.active_branches.add('T') else: dp.active_branches.add('F') else: matched = 0 for leaf in dp.leaves: for c in cons: if _match_leaf(c, leaf): matched += 1 break if matched <= 1: for c in cons: for leaf in dp.leaves: if _match_leaf(c, leaf): dp.active_branches.add('T' if c[3] else 'F') def _mark_eval(dp, cons, fields=None): if dp.label == 'TRUE': matched = False for when_val, _ in dp.when_list: parsed = parse_single_condition(when_val, fields) if parsed: for c in cons: if _match_constraint(c, parsed) and c[3]: name = f"WHEN {when_val}" if name in dp.branch_names: dp.active_branches.add(name) matched = True else: cond_tree = parse_compound_condition(when_val, fields) if cond_tree and not isinstance(cond_tree, CondLeaf): leaves = list(collect_leaves(cond_tree)) assignment = {} for leaf in leaves: for c in cons: if _match_leaf(c, leaf): assignment[leaf] = c[3] break if len(assignment) == len(leaves): if evaluate_tree(cond_tree, assignment): name = f"WHEN {when_val}" if name in dp.branch_names: dp.active_branches.add(name) matched = True if not matched and 'OTHER' in dp.branch_names: when_fields = set() for when_val, _ in dp.when_list: for c in cons: if c[0] in when_val: when_fields.add(c[0]) if when_fields: dp.active_branches.add('OTHER') return for c in cons: if c[0] == dp.label and c[1] == '=': name = f"WHEN {c[2]}" if name in dp.branch_names: dp.active_branches.add(name) elif c[0] == dp.label and c[1] == 'not_in': dp.active_branches.add('OTHER') def _mark_search(dp, cons, fields=None): branch_masks = [False] * len(dp.branch_names) for i, (cond_text, body_seq) in enumerate(dp.when_list): cond_tree = dp.cond_trees[i] if i < len(dp.cond_trees) else None if not cond_tree: continue if isinstance(cond_tree, CondLeaf): for c in cons: if len(c) == 4: base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0]) base_cond = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field) if base_c == base_cond and c[1] == cond_tree.op \ and str(c[2]) == str(cond_tree.value) and c[3]: branch_masks[i] = True break else: leaves = list(collect_leaves(cond_tree)) assignment = {} for leaf in leaves: for c in cons: if len(c) == 4: base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0]) base_l = re.sub(r'\s*\(.*?\)\s*$', '', leaf.field) if base_c == base_l and c[1] == leaf.op and str(c[2]) == str(leaf.value): assignment[leaf] = c[3] break if len(assignment) == len(leaves): if evaluate_tree(cond_tree, assignment): branch_masks[i] = True if dp.has_other: at_end_idx = len(dp.branch_names) - 1 if not any(branch_masks[:at_end_idx]): branch_masks[at_end_idx] = True for i, m in enumerate(branch_masks): if m: dp.active_branches.add(dp.branch_names[i]) def _mark_perform(dp, cons): simple = getattr(dp, 'parsed', None) if simple: for c in cons: if _match_constraint(c, simple): if c[3]: dp.active_branches.add('Skip') else: dp.active_branches.add('Enter') elif dp.cond_tree and dp.cond_leaves: assignment = {} for leaf in dp.cond_leaves: for c in cons: if _match_leaf(c, leaf): assignment[leaf] = c[3] break if len(assignment) == len(dp.cond_leaves): if evaluate_tree(dp.cond_tree, assignment): dp.active_branches.add('Skip') else: dp.active_branches.add('Enter') else: for c in cons: if c[0] == dp.label or any(c[0] == f for f in _get_fields_in_cond(dp.label)): if c[3]: dp.active_branches.add('Skip') else: dp.active_branches.add('Enter') def _get_fields_in_cond(cond_text): return re.findall(r'[A-Z][A-Z0-9-]*', cond_text.upper()) # ── 行号定位(基于原始源文本)── def locate_decision_lines(decision_points, raw_source): """在原始源文本中搜索每个决策点的近似行号""" lines = raw_source.upper().splitlines() for dp in decision_points: patterns = _build_search_patterns(dp) for i, line in enumerate(lines): for pat in patterns: if re.search(pat, line): dp.source_line = i + 1 break if dp.source_line: break def _normalize(text): """标准化条件文本用于比较:去多余空白、标准化引号""" t = re.sub(r'\s+', ' ', text).strip() t = t.replace('"', "'") return t def _build_search_patterns(dp): texts = [] if dp.kind == 'IF': texts.append((r'\bIF\b', dp.label)) elif dp.kind == 'EVALUATE': texts.append((r'\bEVALUATE\b', dp.label)) elif dp.kind == 'PERFORM': texts.append((r'\bUNTIL\b', dp.condition if hasattr(dp, 'condition') else dp.label if dp.label else '')) else: return [r'$^'] # 永不匹配 patterns = [] for keyword, condition in texts: if not condition: continue norm_cond = _normalize(condition) # 转义正则特殊字符,但保留空格(替换为\s+) esc = re.escape(norm_cond) esc = esc.replace(r'\ ', r'\s+') esc = esc.replace(r'\'', r"['\"]") patterns.append(keyword + r'\s+' + esc) if not patterns: return [r'$^'] return patterns # ── HTML 报告(详情页)── _DETAIL_HTML = ''' {title}
← 覆盖率总览 |

{title}

📈 覆盖率概要

{dec_frac}
决策覆盖率
{cond_frac}
条件覆盖率
{dp_count_text}
决策点
{dec_pct_text}
已覆盖 未覆盖 推断覆盖
{decision_table} {leaf_table} {source_section}
''' def generate_html_report(decision_points, leaf_stats, source_lines, outpath, filename='', index_relpath=None, covered_lines=None): title = f"覆盖率报告 — {filename}" if filename else "覆盖率报告" total_branches = sum(len(dp.branch_names) for dp in decision_points) covered_branches = sum(len(dp.active_branches) for dp in decision_points) implied_branches = sum(len(dp.implied_branches) for dp in decision_points) if covered_lines: # 无分支程序:隐式 100% total_branches = max(total_branches, 1) covered_branches = max(covered_branches, 1) total_leaves = len(leaf_stats) * 2 covered_leaves = (sum(1 for l in leaf_stats if l.covered_true) + sum(1 for l in leaf_stats if l.covered_false)) # 计算数值 is_implicit = bool(covered_lines) # 无分支程序,隐式 100% dec_pct_val = (covered_branches / total_branches * 100) if total_branches else 0 dec_pct_text = "100% ✓" if is_implicit else (f"{dec_pct_val:.1f}%" if total_branches else "无") dec_frac = "全部覆盖" if is_implicit else (f"{covered_branches}/{total_branches}" if total_branches else "—") cond_frac = f"{covered_leaves}/{total_leaves}" if total_leaves else "—" implied_text = f'(+{implied_branches - covered_branches} 推断)' if implied_branches > covered_branches else '' # 颜色 if is_implicit or not total_branches or dec_pct_val >= 100: dec_val_cls = 'val-green' bar_cls = '' elif dec_pct_val >= 80: dec_val_cls = 'val-amber' bar_cls = ' amber' else: dec_val_cls = 'val-red' bar_cls = ' red' if not total_leaves or covered_leaves == total_leaves: cond_val_cls = 'val-green' elif covered_leaves / total_leaves >= 0.8: cond_val_cls = 'val-amber' else: cond_val_cls = 'val-red' # 决策点表格 if decision_points: dp_rows = [] for dp in decision_points: ln = str(dp.source_line) if dp.source_line else '?' branch_cells = [] for bn in dp.branch_names: if bn in dp.active_branches: branch_cells.append(f'{bn} ✓') elif bn in dp.implied_branches: branch_cells.append(f'{bn} ○') else: branch_cells.append(f'{bn} ✗') dp_rows.append(f'#{dp.id}{dp.kind}{ln}' f'{dp.label}' f'{" ".join(branch_cells)}') decision_table = f'''

📜 决策点

{"".join(dp_rows)}
#类型行号条件分支
''' else: decision_table = '' # 叶条件表格 if leaf_stats: leaf_rows = [] for leaf in leaf_stats: t = '' if leaf.covered_true else '' f = '' if leaf.covered_false else '' leaf_rows.append(f'{leaf.field}{leaf.op}' f'{leaf.value}{t}{f}') leaf_table = f'''

🔢 条件覆盖明细(叶条件)

{"".join(leaf_rows)}
字段运算符
''' else: leaf_table = '' # 源码标注 if source_lines: line_cov = {} for dp in decision_points: if dp.source_line: if dp.source_line not in line_cov: line_cov[dp.source_line] = [] has_missed = any(bn not in dp.active_branches for bn in dp.branch_names) has_active = any(bn in dp.active_branches for bn in dp.branch_names) if has_active and not has_missed: line_cov[dp.source_line].append('hl-green') elif has_active: line_cov[dp.source_line].append('hl-red') else: line_cov[dp.source_line].append('hl-amber') # 无分支程序:所有 PD 行标记为已覆盖 if covered_lines: for ln in covered_lines: line_cov.setdefault(ln, []).append('hl-green') src_lines = [] for i, line in enumerate(source_lines, 1): cls_list = line_cov.get(i, []) hl = ' ' + ' '.join(cls_list) if cls_list else '' src_lines.append(f'
' f'{i}' f'{line}
') source_section = f'''

📖 源码标注

{"".join(src_lines)}
''' else: source_section = '' html = _DETAIL_HTML.format( title=title, index_relpath=index_relpath or '#', dec_frac=dec_frac, dec_pct_text=dec_pct_text, dec_val_cls=dec_val_cls, cond_frac=cond_frac, cond_val_cls=cond_val_cls, bar_cls=bar_cls, bar_pct=str(int(dec_pct_val)), decision_table=decision_table, leaf_table=leaf_table, source_section=source_section, dp_count_text=('—' if is_implicit else str(len(decision_points))), ) outpath = Path(outpath) outpath.parent.mkdir(parents=True, exist_ok=True) outpath.write_text(html, encoding='utf-8') # ── 总括索引页 ── _INDEX_HTML = ''' 覆盖率总览

📊 覆盖率总览报告

{timestamp}
{agg_dec_num}
决策覆盖率
{agg_cond_num}
条件覆盖率
{prog_count}
已分析程序
{uncovered_count}
未完全覆盖程序
{dec_ring_svg}
决策覆盖率
{cond_ring_svg}
条件覆盖率
已覆盖 未覆盖 推断覆盖
{rows}
程序 决策分支 条件覆盖 覆盖率 状态
''' def _ring_svg(pct, color_stops): """生成 SVG 圆环 HTML。pct: 0-100 浮点数。""" r = 54 circ = 2 * 3.14159265 * r offset = circ * (1 - pct / 100) if pct > 0 else circ if pct >= 80: stroke = '#00c853' elif pct >= 50: stroke = '#ff8f00' else: stroke = '#ff1744' return ( f'' f'' f'' f'' f'{pct:.0f}%' f'覆盖率' f'' ) def generate_coverage_index(programs, outdir): """生成覆盖率总括索引页。""" from datetime import datetime timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') agg_total = sum(p['total_branches'] for p in programs) agg_covered = sum(p['covered_branches'] for p in programs) agg_implied = sum(p['implied_branches'] for p in programs) agg_ctotal = sum(p['total_conditions'] for p in programs) agg_ccovered = sum(p['covered_conditions'] for p in programs) agg_dec_pct = (agg_covered / agg_total * 100) if agg_total else 0 agg_cond_pct = (agg_ccovered / agg_ctotal * 100) if agg_ctotal else 0 uncovered_count = sum(1 for p in programs if p['total_branches'] and p['covered_branches'] < p['total_branches']) dec_num_cls = 'num-green' if agg_dec_pct == 100 else ('num-amber' if agg_dec_pct >= 80 else 'num-red') cond_num_cls = 'num-green' if agg_cond_pct == 100 else ('num-amber' if agg_cond_pct >= 80 else 'num-red') uncovered_num_cls = 'num-green' if uncovered_count == 0 else 'num-red' def sort_key(p): if p['total_branches']: return -p['covered_branches'] / p['total_branches'] return -1.0 sorted_programs = sorted(programs, key=sort_key) rows = [] for p in sorted_programs: name = p['name'] href = p['detail_relpath'] tb = p['total_branches'] cb = p['covered_branches'] ib = p['implied_branches'] tc = p['total_conditions'] cc = p['covered_conditions'] imp = p.get('implicit_100', False) pct_dec = (cb / tb * 100) if tb else 0 pct_text = "全部覆盖" if imp else (f"{pct_dec:.1f}%" if tb else "—") implied_text = f'(+{ib - cb} 推断)' if ib > cb else '' branch_text = "—" if imp else f"{cb}/{tb}" cond_text = f"{cc}/{tc}" if tc else "—" bar_pct = int(pct_dec) # 进度条颜色 if imp or pct_dec >= 100: bar_cls = '' elif pct_dec >= 80: bar_cls = ' amber' else: bar_cls = ' red' # 状态徽标 if tb == 0 or (cb == tb and not (ib > cb)): badge = '✓ 完全' elif cb == tb and ib > cb: badge = '○ 推断' elif pct_dec >= 80: badge = '⚠ 不足' else: badge = '✗ 欠缺' # 条件覆盖数字颜色 if tc: cond_pct = cc / tc * 100 cond_color = 'num-green' if cond_pct == 100 else ('num-amber' if cond_pct >= 80 else 'num-red') cond_display = f'{cond_text}' else: cond_display = '' row_class = 'row-imperfect' if cb < tb else '' rows.append(f''' {name} {branch_text} {implied_text} {cond_display}
{pct_text}
{pct_text}
{badge} ''') dec_ring_svg = _ring_svg(agg_dec_pct, '') cond_ring_svg = _ring_svg(agg_cond_pct, '') html = _INDEX_HTML.format( timestamp=timestamp, agg_dec_num=f"{agg_covered}/{agg_total}", dec_num_cls=dec_num_cls, agg_cond_num=f"{agg_ccovered}/{agg_ctotal}" if agg_ctotal else "无数据", cond_num_cls=cond_num_cls, prog_count=str(len(programs)), uncovered_num_cls=uncovered_num_cls, uncovered_count=str(uncovered_count), dec_ring_svg=dec_ring_svg, cond_ring_svg=cond_ring_svg, rows='\n'.join(rows), ) outpath = Path(outdir) / 'coverage' / 'index.html' outpath.parent.mkdir(parents=True, exist_ok=True) outpath.write_text(html, encoding='utf-8') # ── PROCEDURE DIVISION 行范围定位(用于无分支程序标记)── def _find_proc_range(raw_source: str): """返回 PROCEDURE DIVISION 的行范围 (start_line, end_line) 1-indexed,或 None。""" lines = raw_source.splitlines() proc_start = None for i, line in enumerate(lines): if re.search(r'PROCEDURE\s+DIVISION', line.upper()): proc_start = i + 1 break if proc_start is None: return None # 找下一个 DIVISION 作为结束边界(或文件尾) for i in range(proc_start, len(lines)): if re.search(r'(IDENTIFICATION|DATA|ENVIRONMENT)\s+DIVISION', lines[i].upper()): return (proc_start, i) # 不包含下一个 DIVISION return (proc_start, len(lines) + 1) # ── 接入入口 ── def run_coverage(branch_tree, branch_paths_with_assigns, fields, raw_source, output_prefix, index_relpath=None): """完整覆盖率流程:收集 → 标记 → 定位 → 输出。 Returns: dict: 汇总数据,用于总括页聚合 """ decision_points, leaf_stats = collect_decision_points(branch_tree, fields) mark_coverage(decision_points, leaf_stats, branch_paths_with_assigns, fields) if raw_source: locate_decision_lines(decision_points, raw_source) total = sum(len(dp.branch_names) for dp in decision_points) covered = sum(len(dp.active_branches) for dp in decision_points) implied = sum(len(dp.implied_branches) for dp in decision_points) leaf_covered = (sum(1 for l in leaf_stats if l.covered_true) + sum(1 for l in leaf_stats if l.covered_false)) leaf_total = len(leaf_stats) * 2 # 无决策点但有路径 → PROCEDURE DIVISION 全部覆盖 covered_lines = set() if total == 0 and branch_paths_with_assigns and raw_source: proc_range = _find_proc_range(raw_source) if proc_range: covered_lines.update(range(proc_range[0], proc_range[1])) total = 1 covered = 1 if output_prefix: generate_html_report(decision_points, leaf_stats, raw_source.splitlines() if raw_source else [], f"{output_prefix}_coverage.html", Path(output_prefix).stem, index_relpath=index_relpath, covered_lines=covered_lines) # 控制台摘要 if total or leaf_total: logger.info(f"\n=== 分支覆盖率 ===") if covered_lines and not decision_points: logger.info(" 程序无分支结构,全部代码已覆盖") for dp in decision_points: branches = [] for bn in dp.branch_names: if bn in dp.active_branches: branches.append(f'{bn} [x]') elif bn in dp.implied_branches: branches.append(f'{bn} [o]') else: branches.append(f'{bn} [ ]') ln = f":{dp.source_line}" if dp.source_line else "" logger.info(f" #{dp.id} [{dp.kind}] {dp.label}{ln}") logger.info(f" {' | '.join(branches)}") if total: pct = covered / total * 100 logger.info(f"\n 决策覆盖率:{covered}/{total}({pct:.1f}%)") if leaf_total: pct = leaf_covered / leaf_total * 100 logger.info(f" 条件覆盖率:{leaf_covered}/{leaf_total}({pct:.1f}%)") if output_prefix: logger.info(f"\n 覆盖率报告:{output_prefix}_coverage.html") implicit_100 = bool(covered_lines) return { 'name': Path(output_prefix).stem if output_prefix else '', 'detail_relpath': ('../' + Path(output_prefix).stem + '_coverage.html' if output_prefix else ''), 'total_branches': total, 'covered_branches': covered, 'implied_branches': implied, 'implicit_100': implicit_100, 'total_conditions': leaf_total, 'covered_conditions': leaf_covered, '_decision_points': decision_points, '_leaf_stats': leaf_stats, }