"""覆盖率统计:决策点收集 + 路径标记 + HTML报告"""
import re
import logging
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, CondLeaf
from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, evaluate_tree
# ── 数据模型 ──
@dataclass
class LeafStat:
    """Coverage record for one leaf condition of the form ``field op value``."""
    field: str
    op: str
    value: str
    # Set by mark_coverage when a matching path constraint carries a truthy outcome.
    covered_true: bool = False
    # Set by mark_coverage when a matching path constraint carries a falsy outcome.
    covered_false: bool = False


@dataclass
class DecisionPoint:
    """One branching construct plus the coverage state recorded for it.

    ``parsed``, ``cond_trees`` and ``has_other`` used to exist only on the
    instances that ``collect_decision_points`` happened to attach them to.
    They are declared here with neutral defaults so that every instance
    exposes them and readers such as ``_mark_search`` cannot hit an
    ``AttributeError`` on a decision point built some other way.
    """
    id: int
    kind: str  # "IF" | "EVALUATE" | "SEARCH" | "PERFORM"
    label: str
    branch_names: list[str]
    covered_branches: set = field(default_factory=set)
    # Branch names observed on at least one analysed path.
    active_branches: set = field(default_factory=set)
    # Copy of active_branches taken at the end of mark_coverage.
    implied_branches: set = field(default_factory=set)
    leaves: list[LeafStat] = field(default_factory=list)
    # 1-based line number in the raw source; 0 means "not located".
    source_line: int = 0
    when_list: list = field(default_factory=list)
    cond_tree: object = None
    cond_leaves: list = field(default_factory=list)
    # Result of parse_single_condition for simple IF / PERFORM conditions.
    parsed: object = None
    # One condition tree per SEARCH ... WHEN entry (parallel to when_list).
    cond_trees: list = field(default_factory=list)
    # True when a SEARCH has an AT END branch (last entry of branch_names).
    has_other: bool = False
# ── 决策点收集 ──
def collect_decision_points(node, fields, counter=None):
    """Recursively gather decision points and leaf statistics from a branch tree.

    Args:
        node: Branch-tree node. ``BrIf``, ``BrEval``, ``BrSearch``,
            ``BrPerform`` and ``BrSeq`` are handled; anything else
            contributes nothing.
        fields: Forwarded to ``is_field`` and ``parse_compound_condition``.
        counter: One-element list holding the last id handed out. It is
            shared across the recursion so ids are unique and follow
            pre-order. A fresh ``[0]`` is used when omitted.

    Returns:
        ``(points, all_leaves)``: the ``DecisionPoint`` objects found at or
        below ``node`` and the ``LeafStat`` objects created for compound IF
        conditions.
    """
    if counter is None:
        counter = [0]
    points = []
    all_leaves = []

    def next_id():
        # Ids are allocated before descending, giving pre-order numbering.
        counter[0] += 1
        return counter[0]

    def descend(subtree):
        sub_points, sub_leaves = _walk_collect(subtree, fields, counter)
        points.extend(sub_points)
        all_leaves.extend(sub_leaves)

    if isinstance(node, BrIf):
        dp = DecisionPoint(id=next_id(), kind='IF', label=node.condition,
                           branch_names=['T', 'F'])
        simple = parse_single_condition(node.condition)
        # Both of the first two arms store the parsed condition; the field
        # check does not alter the outcome.
        if simple and is_field(simple[0], fields):
            dp.parsed = simple
        elif simple:
            dp.parsed = simple
        elif node.cond_tree:
            tree_leaves = collect_leaves(node.cond_tree)
            if tree_leaves:
                dp.cond_tree = node.cond_tree
                dp.cond_leaves = list(tree_leaves)
                stats = [LeafStat(field=leaf.field, op=leaf.op, value=leaf.value)
                         for leaf in tree_leaves]
                dp.leaves.extend(stats)
                all_leaves.extend(stats)
        points.append(dp)
        descend(node.true_seq)
        descend(node.false_seq)
    elif isinstance(node, BrEval):
        dp_id = next_id()
        names = [f"WHEN {value}" for value, _ in node.when_list]
        if node.has_other:
            names.append("OTHER")
        points.append(DecisionPoint(id=dp_id, kind='EVALUATE', label=node.subject,
                                    branch_names=names, when_list=node.when_list))
        for _, seq in node.when_list:
            descend(seq)
        descend(node.other_seq)
    elif isinstance(node, BrSearch):
        dp_id = next_id()
        names = [f'WHEN {cond_text[:40]}' for cond_text, _ in node.when_list]
        if node.has_at_end:
            names.append('AT END')
        dp = DecisionPoint(id=dp_id, kind='SEARCH',
                           label=node.table_name, branch_names=names)
        dp.when_list = node.when_list
        dp.cond_trees = node.cond_trees
        dp.has_other = node.has_at_end
        points.append(dp)
        for _, seq in node.when_list:
            descend(seq)
        if node.has_at_end:
            descend(node.at_end_seq)
    elif isinstance(node, BrPerform):
        # Only conditional loops are decision points; the body is walked
        # for every PERFORM.
        if node.perf_type in ('until', 'para_until', 'varying', 'para_varying'):
            dp = DecisionPoint(id=next_id(), kind='PERFORM',
                               label=node.condition or '',
                               branch_names=['Enter', 'Skip'])
            simple = parse_single_condition(node.condition) if node.condition else None
            if simple and is_field(simple[0], fields):
                dp.parsed = simple
            elif node.condition:
                tree = parse_compound_condition(node.condition, fields)
                if tree:
                    tree_leaves = collect_leaves(tree)
                    if tree_leaves:
                        dp.cond_tree = tree
                        dp.cond_leaves = list(tree_leaves)
            points.append(dp)
        descend(node.body_seq)
    elif isinstance(node, BrSeq):
        for child in node.children:
            descend(child)
    return points, all_leaves
def _walk_collect(node, fields, counter):
    """Collect below ``node`` by delegating to ``collect_decision_points``.

    Returns the same ``(points, leaves)`` pair as that function.
    """
    collected = collect_decision_points(node, fields, counter)
    return collected
# ── 覆盖率标记 ──
def mark_coverage(decision_points, leaf_stats, branch_paths, fields):
    """Record which branches and leaf outcomes the given paths exercise.

    Args:
        decision_points: Objects with ``kind``, ``active_branches`` and
            ``implied_branches``; updated in place.
        leaf_stats: Leaf records whose ``covered_true`` / ``covered_false``
            flags are raised in place.
        branch_paths: Iterable of ``(constraints, assignment)`` pairs; only
            the constraints are used here.
        fields: Forwarded to the EVALUATE and SEARCH markers.

    Returns:
        None.
    """
    # Lambdas keep the marker lookup lazy, exactly like the if/elif chain did.
    markers = {
        'IF': lambda point, path: _mark_if(point, path),
        'EVALUATE': lambda point, path: _mark_eval(point, path, fields),
        'PERFORM': lambda point, path: _mark_perform(point, path),
        'SEARCH': lambda point, path: _mark_search(point, path, fields),
    }
    for cons, _assign in branch_paths:
        for dp in decision_points:
            marker = markers.get(dp.kind)
            if marker is not None:
                marker(dp, cons)
        for leaf in leaf_stats:
            for c in cons:
                if not _match_leaf(c, leaf):
                    continue
                # The fourth constraint element is the outcome on this path.
                if c[3]:
                    leaf.covered_true = True
                else:
                    leaf.covered_false = True
    for dp in decision_points:
        dp.implied_branches = set(dp.active_branches)
def _match_constraint(c, parsed):
if len(c) != 4:
return False
return (c[0] == parsed[0] and c[1] == parsed[1]
and str(c[2]) == str(parsed[2]))
def _match_leaf(c, leaf):
if len(c) != 4:
return False
return (c[0] == leaf.field and c[1] == leaf.op
and str(c[2]) == str(leaf.value))
def _mark_if(dp, cons):
simple = getattr(dp, 'parsed', None)
if simple:
for c in cons:
if _match_constraint(c, simple):
if c[3]:
dp.active_branches.add('T')
else:
dp.active_branches.add('F')
elif dp.cond_tree and dp.cond_leaves:
assignment = {}
for leaf in dp.cond_leaves:
for c in cons:
if _match_leaf(c, leaf):
assignment[leaf] = c[3]
break
if len(assignment) == len(dp.cond_leaves):
if evaluate_tree(dp.cond_tree, assignment):
dp.active_branches.add('T')
else:
dp.active_branches.add('F')
else:
matched = 0
for leaf in dp.leaves:
for c in cons:
if _match_leaf(c, leaf):
matched += 1
break
if matched <= 1:
for c in cons:
for leaf in dp.leaves:
if _match_leaf(c, leaf):
dp.active_branches.add('T' if c[3] else 'F')
def _mark_eval(dp, cons, fields=None):
    """Add the EVALUATE branches exercised by one path to ``dp.active_branches``.

    Two modes, chosen by ``dp.label``:

    * ``'TRUE'`` (EVALUATE TRUE): each WHEN text is itself a condition. A
      WHEN is activated when a truthy constraint matches its single parsed
      condition, or when its compound condition tree evaluates truthy with
      every leaf assigned from the constraints. If no WHEN was activated and
      an 'OTHER' branch exists, 'OTHER' is activated provided at least one
      constraint's field name occurs inside some WHEN text.
    * any other label: constraints on the subject field select branches,
      ``'='`` activating ``WHEN <value>`` and ``'not_in'`` activating 'OTHER'.

    Args:
        dp: Decision point with ``label``, ``when_list``, ``branch_names``
            and ``active_branches``.
        cons: Constraints of one path; indexed as ``c[0]`` field, ``c[1]``
            operator, ``c[2]`` value, ``c[3]`` outcome.
        fields: Forwarded to the condition parsers.
    """
    if dp.label == 'TRUE':
        matched = False
        for when_val, _ in dp.when_list:
            parsed = parse_single_condition(when_val, fields)
            if parsed:
                for c in cons:
                    # Only a constraint that holds (truthy outcome) selects the WHEN.
                    if _match_constraint(c, parsed) and c[3]:
                        name = f"WHEN {when_val}"
                        if name in dp.branch_names:
                            dp.active_branches.add(name)
                            matched = True
            else:
                cond_tree = parse_compound_condition(when_val, fields)
                # A bare leaf is skipped here; only real compound trees are evaluated.
                if cond_tree and not isinstance(cond_tree, CondLeaf):
                    leaves = list(collect_leaves(cond_tree))
                    assignment = {}
                    for leaf in leaves:
                        for c in cons:
                            if _match_leaf(c, leaf):
                                # First matching constraint decides the leaf.
                                assignment[leaf] = c[3]
                                break
                    # Evaluate only when every leaf received a value.
                    if len(assignment) == len(leaves):
                        if evaluate_tree(cond_tree, assignment):
                            name = f"WHEN {when_val}"
                            if name in dp.branch_names:
                                dp.active_branches.add(name)
                                matched = True
        if not matched and 'OTHER' in dp.branch_names:
            when_fields = set()
            for when_val, _ in dp.when_list:
                for c in cons:
                    # Substring test: the constraint's field appears in the WHEN text.
                    if c[0] in when_val:
                        when_fields.add(c[0])
            # OTHER is credited only if the path constrains a field the WHENs mention.
            if when_fields:
                dp.active_branches.add('OTHER')
        return
    for c in cons:
        if c[0] == dp.label and c[1] == '=':
            name = f"WHEN {c[2]}"
            if name in dp.branch_names:
                dp.active_branches.add(name)
        elif c[0] == dp.label and c[1] == 'not_in':
            dp.active_branches.add('OTHER')
def _mark_search(dp, cons, fields=None):
branch_masks = [False] * len(dp.branch_names)
for i, (cond_text, body_seq) in enumerate(dp.when_list):
cond_tree = dp.cond_trees[i] if i < len(dp.cond_trees) else None
if not cond_tree:
continue
if isinstance(cond_tree, CondLeaf):
for c in cons:
if len(c) == 4:
base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0])
base_cond = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field)
if base_c == base_cond and c[1] == cond_tree.op \
and str(c[2]) == str(cond_tree.value) and c[3]:
branch_masks[i] = True
break
else:
leaves = list(collect_leaves(cond_tree))
assignment = {}
for leaf in leaves:
for c in cons:
if len(c) == 4:
base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0])
base_l = re.sub(r'\s*\(.*?\)\s*$', '', leaf.field)
if base_c == base_l and c[1] == leaf.op and str(c[2]) == str(leaf.value):
assignment[leaf] = c[3]
break
if len(assignment) == len(leaves):
if evaluate_tree(cond_tree, assignment):
branch_masks[i] = True
if dp.has_other:
at_end_idx = len(dp.branch_names) - 1
if not any(branch_masks[:at_end_idx]):
branch_masks[at_end_idx] = True
for i, m in enumerate(branch_masks):
if m:
dp.active_branches.add(dp.branch_names[i])
def _mark_perform(dp, cons):
simple = getattr(dp, 'parsed', None)
if simple:
for c in cons:
if _match_constraint(c, simple):
if c[3]:
dp.active_branches.add('Skip')
else:
dp.active_branches.add('Enter')
elif dp.cond_tree and dp.cond_leaves:
assignment = {}
for leaf in dp.cond_leaves:
for c in cons:
if _match_leaf(c, leaf):
assignment[leaf] = c[3]
break
if len(assignment) == len(dp.cond_leaves):
if evaluate_tree(dp.cond_tree, assignment):
dp.active_branches.add('Skip')
else:
dp.active_branches.add('Enter')
else:
for c in cons:
if c[0] == dp.label or any(c[0] == f for f in _get_fields_in_cond(dp.label)):
if c[3]:
dp.active_branches.add('Skip')
else:
dp.active_branches.add('Enter')
def _get_fields_in_cond(cond_text):
return re.findall(r'[A-Z][A-Z0-9-]*', cond_text.upper())
# ── 行号定位(基于原始源文本)──
def locate_decision_lines(decision_points, raw_source):
    """Find an approximate source line for every decision point.

    Each decision point's patterns are tried against the source lines in
    order; the 1-based number of the first matching line is stored in
    ``dp.source_line``. Decision points with no match keep their current
    value.

    Matching is case-insensitive: the source is upper-cased, and the
    patterns are additionally compiled with ``re.IGNORECASE`` so that
    condition text containing lower-case characters can still be found.

    Args:
        decision_points: Objects with ``kind``, ``label`` and ``source_line``.
        raw_source: Full original source text.
    """
    lines = raw_source.upper().splitlines()
    for dp in decision_points:
        # Compile once per decision point instead of once per line.
        compiled = [re.compile(pat, re.IGNORECASE)
                    for pat in _build_search_patterns(dp)]
        for number, line in enumerate(lines, start=1):
            if any(rx.search(line) for rx in compiled):
                dp.source_line = number
            # Also stops early for a decision point that was already located.
            if dp.source_line:
                break


def _normalize(text):
    """Normalise condition text: collapse whitespace, unify quotes to ``'``."""
    t = re.sub(r'\s+', ' ', text).strip()
    t = t.replace('"', "'")
    return t


# Empty negative lookahead: fails at every position, including on blank lines
# (the former ``$^`` matched empty lines).
_NEVER_MATCH = r'(?!)'


def _condition_to_regex(norm_cond):
    """Turn normalised condition text into a regex fragment.

    Spaces become ``\\s+`` and single quotes accept either quote style;
    everything else is matched literally.
    """
    parts = []
    for ch in norm_cond:
        if ch == ' ':
            parts.append(r'\s+')
        elif ch == "'":
            # re.escape stopped escaping quotes in Python 3.7, so the quote is
            # handled here rather than by post-processing escaped text.
            parts.append("['\"]")
        else:
            parts.append(re.escape(ch))
    return ''.join(parts)


def _build_search_patterns(dp):
    """Build the regex patterns used to locate ``dp`` in the source.

    IF, EVALUATE and PERFORM decision points yield
    ``<keyword>\\s+<condition>``, with UNTIL as the PERFORM keyword. Any
    other kind, or an empty condition, yields a single pattern that can
    never match.

    Returns:
        A non-empty list of regex pattern strings.
    """
    if dp.kind == 'IF':
        keyword, condition = r'\bIF\b', dp.label
    elif dp.kind == 'EVALUATE':
        keyword, condition = r'\bEVALUATE\b', dp.label
    elif dp.kind == 'PERFORM':
        keyword = r'\bUNTIL\b'
        if hasattr(dp, 'condition'):
            condition = dp.condition
        else:
            condition = dp.label if dp.label else ''
    else:
        return [_NEVER_MATCH]
    if not condition:
        return [_NEVER_MATCH]
    return [keyword + r'\s+' + _condition_to_regex(_normalize(condition))]
# ── HTML 报告(详情页)──
# Module-level text constant for the detail page; as written here the literal
# holds only a newline.
# NOTE(review): presumably meant to carry the HTML template used by
# generate_html_report - confirm against the full file.
_DETAIL_HTML = '''
'''
def generate_html_report(decision_points, leaf_stats, source_lines, outpath,
filename='', index_relpath=None, covered_lines=None):
title = f"覆盖率报告 — {filename}" if filename else "覆盖率报告"
total_branches = sum(len(dp.branch_names) for dp in decision_points)
covered_branches = sum(len(dp.active_branches) for dp in decision_points)
implied_branches = sum(len(dp.implied_branches) for dp in decision_points)
if covered_lines:
# 无分支程序:隐式 100%
total_branches = max(total_branches, 1)
covered_branches = max(covered_branches, 1)
total_leaves = len(leaf_stats) * 2
covered_leaves = (sum(1 for l in leaf_stats if l.covered_true) +
sum(1 for l in leaf_stats if l.covered_false))
# 计算数值
is_implicit = bool(covered_lines) # 无分支程序,隐式 100%
dec_pct_val = (covered_branches / total_branches * 100) if total_branches else 0
dec_pct_text = "100% ✓" if is_implicit else (f"{dec_pct_val:.1f}%" if total_branches else "无")
dec_frac = "全部覆盖" if is_implicit else (f"{covered_branches}/{total_branches}" if total_branches else "—")
cond_frac = f"{covered_leaves}/{total_leaves}" if total_leaves else "—"
implied_text = f'(+{implied_branches - covered_branches} 推断)' if implied_branches > covered_branches else ''
# 颜色
if is_implicit or not total_branches or dec_pct_val >= 100:
dec_val_cls = 'val-green'
bar_cls = ''
elif dec_pct_val >= 80:
dec_val_cls = 'val-amber'
bar_cls = ' amber'
else:
dec_val_cls = 'val-red'
bar_cls = ' red'
if not total_leaves or covered_leaves == total_leaves:
cond_val_cls = 'val-green'
elif covered_leaves / total_leaves >= 0.8:
cond_val_cls = 'val-amber'
else:
cond_val_cls = 'val-red'
# 决策点表格
if decision_points:
dp_rows = []
for dp in decision_points:
ln = str(dp.source_line) if dp.source_line else '?'
branch_cells = []
for bn in dp.branch_names:
if bn in dp.active_branches:
branch_cells.append(f'{bn} ✓')
elif bn in dp.implied_branches:
branch_cells.append(f'{bn} ○')
else:
branch_cells.append(f'{bn} ✗')
dp_rows.append(f'
#{dp.id}
{dp.kind}
{ln}
'
f'
{dp.label}
'
f'
{" ".join(branch_cells)}
')
decision_table = f'''
📜 决策点
#
类型
行号
条件
分支
{"".join(dp_rows)}
'''
else:
decision_table = ''
# 叶条件表格
if leaf_stats:
leaf_rows = []
for leaf in leaf_stats:
t = '✓' if leaf.covered_true else '✗'
f = '✓' if leaf.covered_false else '✗'
leaf_rows.append(f'
{leaf.field}
{leaf.op}
'
f'
{leaf.value}
{t}
{f}
')
leaf_table = f'''
🔢 条件覆盖明细(叶条件)
字段
运算符
值
真
假
{"".join(leaf_rows)}
'''
else:
leaf_table = ''
# 源码标注
if source_lines:
line_cov = {}
for dp in decision_points:
if dp.source_line:
if dp.source_line not in line_cov:
line_cov[dp.source_line] = []
has_missed = any(bn not in dp.active_branches for bn in dp.branch_names)
has_active = any(bn in dp.active_branches for bn in dp.branch_names)
if has_active and not has_missed:
line_cov[dp.source_line].append('hl-green')
elif has_active:
line_cov[dp.source_line].append('hl-red')
else:
line_cov[dp.source_line].append('hl-amber')
# 无分支程序:所有 PD 行标记为已覆盖
if covered_lines:
for ln in covered_lines:
line_cov.setdefault(ln, []).append('hl-green')
src_lines = []
for i, line in enumerate(source_lines, 1):
cls_list = line_cov.get(i, [])
hl = ' ' + ' '.join(cls_list) if cls_list else ''
src_lines.append(f'