Compare commits
3 Commits
main
...
de506d9c31
| Author | SHA1 | Date | |
|---|---|---|---|
| de506d9c31 | |||
| c021dfe01e | |||
| 097530b036 |
@@ -0,0 +1,512 @@
|
||||
"""COBOL Test Data Generator — 模块化版入口"""
|
||||
|
||||
import sys
|
||||
import re
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# ── 配置(必须放在本地模块导入之前,避免循环导入) ──
|
||||
|
||||
CONFIG = {}
|
||||
|
||||
from .read import preprocess, extract_data_division, extract_procedure_division
|
||||
from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements
|
||||
from .core import build_branch_tree, classify_field_roles, _init_child_names
|
||||
from .cond import parse_single_condition, is_field
|
||||
from .design import enum_paths, generate_records, _filter_stop
|
||||
from .output import output_json, output_input_files
|
||||
from .coverage import run_coverage, generate_coverage_index
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── OCCURS 展开 ──
|
||||
|
||||
|
||||
def _add_subscript(name, occ):
|
||||
"""追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)"""
|
||||
if name.endswith(')'):
|
||||
return name[:-1] + f',{occ})'
|
||||
return name + f'({occ})'
|
||||
|
||||
|
||||
def expand_occurs(fields):
|
||||
"""展开 OCCURS 字段为下标副本。递归处理嵌套 OCCURS。"""
|
||||
result = []
|
||||
i = 0
|
||||
while i < len(fields):
|
||||
f = fields[i]
|
||||
if f.get('occurs', 0) > 0 and not f.get('is_88'):
|
||||
children = []
|
||||
j = i + 1
|
||||
while j < len(fields):
|
||||
child = fields[j]
|
||||
if child.get('is_88'):
|
||||
children.append(child)
|
||||
j += 1
|
||||
continue
|
||||
if child['level'] <= f['level'] or child.get('level') == 77:
|
||||
break
|
||||
children.append(child)
|
||||
j += 1
|
||||
|
||||
if children:
|
||||
group = dict(f)
|
||||
group['occurs'] = 0
|
||||
result.append(group)
|
||||
for occ in range(1, f['occurs'] + 1):
|
||||
for child in children:
|
||||
copy = dict(child)
|
||||
if child.get('occurs', 0) == 0:
|
||||
copy['occurs'] = 0
|
||||
copy['occurs_depending'] = f.get('occurs_depending')
|
||||
if child.get('is_88'):
|
||||
parent = child.get('parent') or f['name']
|
||||
copy['parent'] = _add_subscript(parent, occ)
|
||||
copy['name'] = _add_subscript(child['name'], occ)
|
||||
else:
|
||||
copy['name'] = _add_subscript(child['name'], occ)
|
||||
result.append(copy)
|
||||
else:
|
||||
for occ in range(1, f['occurs'] + 1):
|
||||
copy = dict(f)
|
||||
copy['name'] = _add_subscript(f['name'], occ)
|
||||
copy['occurs'] = 0
|
||||
copy['occurs_depending'] = f.get('occurs_depending')
|
||||
result.append(copy)
|
||||
|
||||
i = j
|
||||
else:
|
||||
result.append(f)
|
||||
i += 1
|
||||
|
||||
if any(f.get('occurs', 0) > 0 for f in result):
|
||||
return expand_occurs(result)
|
||||
return result
|
||||
|
||||
|
||||
# ── 入口 ──
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("用法: python -m cobol_testgen <cobol文件1> [cobol文件2 ...] [输出目录]")
|
||||
sys.exit(1)
|
||||
|
||||
args = sys.argv[1:]
|
||||
|
||||
# 分离 cobol 文件与输出目录
|
||||
cobol_files = []
|
||||
outdir = None
|
||||
for a in args:
|
||||
p = Path(a)
|
||||
if p.is_dir():
|
||||
outdir = p
|
||||
elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'):
|
||||
cobol_files.append(p)
|
||||
else:
|
||||
print(f"警告:跳过未知参数 {a}")
|
||||
if not cobol_files:
|
||||
print("错误:未找到任何 COBOL 文件")
|
||||
sys.exit(1)
|
||||
if outdir is None:
|
||||
outdir = cobol_files[0].parent
|
||||
|
||||
# 配置全局 Logger
|
||||
outdir.mkdir(parents=True, exist_ok=True)
|
||||
log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log"
|
||||
fh = logging.FileHandler(log_path, encoding="utf-8", mode="w")
|
||||
fh.setLevel(logging.DEBUG)
|
||||
fh.setFormatter(logging.Formatter(
|
||||
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||||
))
|
||||
sh = logging.StreamHandler()
|
||||
sh.setLevel(logging.INFO)
|
||||
sh.setFormatter(logging.Formatter("%(message)s"))
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
root_logger.addHandler(fh)
|
||||
root_logger.addHandler(sh)
|
||||
|
||||
programs = []
|
||||
|
||||
for filepath in cobol_files:
|
||||
if not filepath.exists():
|
||||
logger.error(f"错误:文件不存在 {filepath}")
|
||||
continue
|
||||
|
||||
source = filepath.read_text(encoding='utf-8')
|
||||
source = resolve_copybooks(source, str(filepath.parent))
|
||||
preprocessed = preprocess(source)
|
||||
file_sec = parse_file_section(preprocessed)
|
||||
|
||||
# DATA DIVISION解析
|
||||
data_div = extract_data_division(preprocessed)
|
||||
if not data_div:
|
||||
logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。")
|
||||
continue
|
||||
|
||||
data_fields = parse_data_division(data_div)
|
||||
if not data_fields:
|
||||
logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。")
|
||||
continue
|
||||
|
||||
# FieldDef → dict
|
||||
fields_dict = []
|
||||
parent_pic = {}
|
||||
filler_counter = 0
|
||||
for f in data_fields:
|
||||
pi = f.pic_info
|
||||
name = f.name
|
||||
if name == 'FILLER':
|
||||
filler_counter += 1
|
||||
if filler_counter > 1:
|
||||
name = f'FILLER_{filler_counter}'
|
||||
entry = {
|
||||
'name': name,
|
||||
'level': f.level,
|
||||
'pic': f.pic,
|
||||
'pic_info': {
|
||||
'type': pi.type if pi else 'unknown',
|
||||
'digits': pi.digits if pi else 0,
|
||||
'decimal': pi.decimal if pi else 0,
|
||||
'length': pi.length if pi else 0,
|
||||
'signed': pi.signed if pi else False,
|
||||
},
|
||||
'value': f.value,
|
||||
'values': f.values,
|
||||
'section': f.section,
|
||||
'is_filler': f.is_filler,
|
||||
'redefines': f.redefines,
|
||||
'usage': f.usage,
|
||||
'occurs': f.occurs_count,
|
||||
'occurs_depending': f.occurs_depending,
|
||||
}
|
||||
if f.is_88:
|
||||
entry['is_88'] = True
|
||||
entry['parent'] = f.parent
|
||||
# Copy parent's pic_info for value generation
|
||||
if f.parent and f.parent in parent_pic:
|
||||
entry['pic_info'] = dict(parent_pic[f.parent])
|
||||
else:
|
||||
parent_pic[name] = entry['pic_info']
|
||||
fields_dict.append(entry)
|
||||
|
||||
fields_dict = expand_occurs(fields_dict)
|
||||
|
||||
# Build FD→children 和 field→FD 映射
|
||||
fd_fields = {}
|
||||
field_to_fd = {}
|
||||
if file_sec:
|
||||
for fd_name, rec_names in file_sec.items():
|
||||
fds = []
|
||||
seen = set()
|
||||
for rec in rec_names:
|
||||
if rec not in seen:
|
||||
fds.append(rec)
|
||||
seen.add(rec)
|
||||
for child in _init_child_names(rec, fields_dict):
|
||||
if child not in seen:
|
||||
fds.append(child)
|
||||
seen.add(child)
|
||||
fd_fields[fd_name] = fds
|
||||
for child in fds:
|
||||
field_to_fd[child] = fd_name
|
||||
|
||||
logger.info(f"\n========== {filepath.name} ==========")
|
||||
logger.info(f"\n字段列表:")
|
||||
logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}")
|
||||
logger.info("-" * 65)
|
||||
for f in fields_dict:
|
||||
pi = f['pic_info']
|
||||
t = pi.get('type', '?')
|
||||
l = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0)
|
||||
pic_display = str(f.get('pic', '')) if f.get('pic') else ('88-level' if f.get('is_88') else '')
|
||||
logger.info(f"{f['level']:<6} {f['name']:<25} {pic_display:<15} {t:<12} {l:<5}")
|
||||
|
||||
# PROCEDURE DIVISION解析
|
||||
proc_div = extract_procedure_division(preprocessed)
|
||||
branch_paths = []
|
||||
assignments = {}
|
||||
|
||||
if proc_div:
|
||||
branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
|
||||
|
||||
roles = classify_field_roles(branch_tree, assignments, fields_dict,
|
||||
source=preprocessed, proc_text=proc_div)
|
||||
logger.info(f"\n字段角色(输入/输出/出入/未用):")
|
||||
for f in fields_dict:
|
||||
if f.get('is_88'):
|
||||
continue
|
||||
logger.info(f" {f['name']:<30} {roles.get(f['name'], '?')}")
|
||||
|
||||
branch_paths_with_assigns = enum_paths(branch_tree, fields_dict)
|
||||
branch_paths_with_assigns = [
|
||||
(_filter_stop(c), a) for c, a in branch_paths_with_assigns
|
||||
]
|
||||
|
||||
# OPEN 方向解析
|
||||
open_dir = scan_open_statements(proc_div) if proc_div else {}
|
||||
|
||||
if proc_div:
|
||||
logger.info(f"\n分支路径数:{len(branch_paths_with_assigns)}")
|
||||
for i, (path_cons, _path_assign) in enumerate(branch_paths_with_assigns):
|
||||
descs = []
|
||||
for c in path_cons:
|
||||
if len(c) == 4:
|
||||
field, op, val, want = c
|
||||
if op == 'not_in':
|
||||
descs.append(f"{field} not in {val}")
|
||||
else:
|
||||
descs.append(f"{field} {op} {val} ({'T' if want else 'F'})")
|
||||
logger.debug(f" 路径 {i + 1}: {', '.join(descs)}")
|
||||
else:
|
||||
logger.warning("\n没有找到 PROCEDURE DIVISION。")
|
||||
branch_paths_with_assigns = [([], {})]
|
||||
roles = {f['name']: 'unused' for f in fields_dict}
|
||||
|
||||
# 覆盖率报告(传入原始源文本用于行号定位)
|
||||
cov_prefix = str(outdir / filepath.stem)
|
||||
index_relpath = 'coverage/index.html'
|
||||
cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict,
|
||||
source, cov_prefix, index_relpath=index_relpath)
|
||||
|
||||
records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec)
|
||||
|
||||
# 输出 JSON(完整文件)
|
||||
outpath = outdir / (filepath.stem + '.json')
|
||||
output_json(records, outpath, roles,
|
||||
fd_fields=fd_fields, field_to_fd=field_to_fd,
|
||||
open_dir=open_dir,
|
||||
path_cons_list=kept_path_cons)
|
||||
|
||||
# 输出入力 JSON(按 FD 拆分)
|
||||
output_input_files(records, outdir, filepath.stem, roles,
|
||||
fd_fields, field_to_fd, open_dir)
|
||||
|
||||
logger.info(f"\n输出:{outpath}({len(records)} 条记录)")
|
||||
logger.debug(f"\n记录明细:")
|
||||
for i, rec in enumerate(records, 1):
|
||||
vals = []
|
||||
for f in fields_dict:
|
||||
r = roles.get(f['name'], '?')
|
||||
marker = f"[{r[0].upper()}]" if r != '?' and r != 'unused' else ''
|
||||
vals.append(f"{marker}{f['name']}={rec.get(f['name'], '?')}")
|
||||
logger.debug(f" 记录 {i}: {' | '.join(vals)}")
|
||||
|
||||
programs.append(cov_result)
|
||||
|
||||
# 生成覆盖率总括索引页
|
||||
if programs:
|
||||
generate_coverage_index(programs, outdir)
|
||||
logger.info(f"\n覆盖率总览:{outdir / 'coverage' / 'index.html'}")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════
|
||||
# Phase 1: 可编程 API(供 orchestrator.py 调用)
|
||||
# ════════════════════════════════════════════
|
||||
|
||||
|
||||
def extract_structure(cobol_source: str) -> dict:
|
||||
"""分析 COBOL 源码的结构,返回结构摘要。不生成测试数据,只做静态分析。
|
||||
|
||||
Returns:
|
||||
dict with: paragraphs, decision_points, branch_tree, file_count,
|
||||
open_directions, has_search_all, has_evaluate,
|
||||
has_call, has_break, total_branches, total_paragraphs
|
||||
"""
|
||||
preprocessed = preprocess(cobol_source)
|
||||
data_div = extract_data_division(preprocessed)
|
||||
data_fields = parse_data_division(data_div) if data_div else []
|
||||
|
||||
fields_dict = []
|
||||
for idx, f in enumerate(data_fields):
|
||||
entry = {
|
||||
'name': f.name if f.name != 'FILLER' else f'FILLER_{idx + 1}',
|
||||
'level': f.level, 'pic': f.pic,
|
||||
'pic_info': {
|
||||
'type': f.pic_info.type if f.pic_info else 'unknown',
|
||||
'digits': f.pic_info.digits if f.pic_info else 0,
|
||||
'decimal': f.pic_info.decimal if f.pic_info else 0,
|
||||
'length': f.pic_info.length if f.pic_info else 0,
|
||||
'signed': f.pic_info.signed if f.pic_info else False,
|
||||
},
|
||||
'section': f.section, 'occurs': f.occurs_count,
|
||||
'occurs_depending': f.occurs_depending,
|
||||
'redefines': f.redefines, 'usage': f.usage,
|
||||
}
|
||||
if f.is_88:
|
||||
entry['is_88'] = True
|
||||
entry['parent'] = f.parent
|
||||
entry['value'] = f.value
|
||||
entry['values'] = f.values
|
||||
fields_dict.append(entry)
|
||||
|
||||
fields_dict = expand_occurs(fields_dict)
|
||||
|
||||
proc_div = extract_procedure_division(preprocessed)
|
||||
branch_tree = None
|
||||
assignments = {}
|
||||
if proc_div:
|
||||
branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
|
||||
|
||||
file_sec = parse_file_section(preprocessed)
|
||||
open_dir = scan_open_statements(proc_div) if proc_div else {}
|
||||
|
||||
from .models import BrIf, BrEval, BrSeq
|
||||
|
||||
decision_points = []
|
||||
total_branches = 0
|
||||
|
||||
def _walk(node, counter):
|
||||
nonlocal total_branches
|
||||
if isinstance(node, BrIf):
|
||||
counter[0] += 1
|
||||
branches = 2
|
||||
decision_points.append({
|
||||
"id": counter[0], "kind": "IF",
|
||||
"label": str(node.condition)[:80], "branches": branches,
|
||||
})
|
||||
total_branches += branches
|
||||
_walk(node.true_seq, counter)
|
||||
_walk(node.false_seq, counter)
|
||||
elif isinstance(node, BrEval):
|
||||
counter[0] += 1
|
||||
n = len(node.when_list) + (1 if node.has_other else 0)
|
||||
decision_points.append({
|
||||
"id": counter[0], "kind": "EVALUATE",
|
||||
"label": str(node.subject)[:80], "branches": n,
|
||||
})
|
||||
total_branches += n
|
||||
for _, seq in node.when_list:
|
||||
_walk(seq, counter)
|
||||
_walk(node.other_seq, counter)
|
||||
elif isinstance(node, BrSeq):
|
||||
for child in node.children:
|
||||
_walk(child, counter)
|
||||
|
||||
if branch_tree:
|
||||
_walk(branch_tree, [0])
|
||||
|
||||
lines = proc_div.split('\n') if proc_div else []
|
||||
paragraphs = set()
|
||||
for line in lines:
|
||||
m = re.match(r'^\s*([A-Z0-9][A-Z0-9-]*)\.\s*$', line.strip())
|
||||
if m:
|
||||
paragraphs.add(m.group(1))
|
||||
|
||||
return {
|
||||
"paragraphs": sorted(paragraphs) if paragraphs else [],
|
||||
"decision_points": decision_points,
|
||||
"branch_tree": branch_tree,
|
||||
"file_count": len(file_sec) if file_sec else 0,
|
||||
"open_directions": open_dir,
|
||||
"has_search_all": any('SEARCH' in str(dp.get('label', '')) for dp in decision_points),
|
||||
"has_evaluate": any(dp['kind'] == 'EVALUATE' for dp in decision_points),
|
||||
"has_call": 'CALL' in cobol_source.upper(),
|
||||
"has_break": any('KEY' in str(dp.get('label', '')).upper() for dp in decision_points),
|
||||
"total_branches": total_branches,
|
||||
"total_paragraphs": len(paragraphs),
|
||||
"branch_tree_obj": branch_tree,
|
||||
}
|
||||
|
||||
|
||||
def generate_data(cobol_source: str, structure: dict = None) -> list[dict]:
|
||||
"""根据 COBOL 源码生成覆盖所有路径的测试数据。
|
||||
|
||||
Args:
|
||||
cobol_source: COBOL 程序源码文本
|
||||
structure: 可选,如果已调用 extract_structure() 可传入避免重复解析
|
||||
|
||||
Returns:
|
||||
list[dict]: 测试数据记录列表,每条包含所有字段的值
|
||||
"""
|
||||
if structure is None:
|
||||
structure = extract_structure(cobol_source)
|
||||
|
||||
branch_tree = structure.get("branch_tree_obj")
|
||||
if branch_tree is None:
|
||||
return []
|
||||
|
||||
preprocessed = preprocess(cobol_source)
|
||||
data_div = extract_data_division(preprocessed)
|
||||
data_fields = parse_data_division(data_div) if data_div else []
|
||||
|
||||
fields_dict = []
|
||||
for f in data_fields:
|
||||
entry = {
|
||||
'name': f.name, 'level': f.level, 'pic': f.pic,
|
||||
'pic_info': {
|
||||
'type': f.pic_info.type if f.pic_info else 'unknown',
|
||||
'digits': f.pic_info.digits if f.pic_info else 0,
|
||||
'decimal': f.pic_info.decimal if f.pic_info else 0,
|
||||
'length': f.pic_info.length if f.pic_info else 0,
|
||||
'signed': f.pic_info.signed if f.pic_info else False,
|
||||
},
|
||||
'section': f.section, 'occurs': f.occurs_count,
|
||||
'occurs_depending': f.occurs_depending,
|
||||
'value': f.value, 'values': f.values,
|
||||
'redefines': f.redefines, 'usage': f.usage,
|
||||
}
|
||||
if f.is_88:
|
||||
entry['is_88'] = True
|
||||
entry['parent'] = f.parent
|
||||
fields_dict.append(entry)
|
||||
|
||||
fields_dict = expand_occurs(fields_dict)
|
||||
proc_div = extract_procedure_division(preprocessed)
|
||||
_, assignments = build_branch_tree(proc_div, fields_dict)
|
||||
|
||||
file_sec = parse_file_section(preprocessed)
|
||||
|
||||
branch_paths = enum_paths(branch_tree, fields_dict)
|
||||
branch_paths = [(_filter_stop(c), a) for c, a in branch_paths]
|
||||
|
||||
records, kept_paths = generate_records(branch_paths, fields_dict, assignments, file_sec=file_sec)
|
||||
return records
|
||||
|
||||
|
||||
def incremental_supplement(branch_tree, decision_gaps: list[int]) -> list[dict]:
|
||||
"""针对未覆盖的决策点,增量生成补充测试数据。
|
||||
|
||||
Args:
|
||||
branch_tree: extract_structure() 返回的 branch_tree 字段
|
||||
decision_gaps: 未覆盖的决策点 ID 列表,如 [1, 3, 5]
|
||||
|
||||
Returns:
|
||||
list[dict]: 增量测试数据,格式与 generate_data() 兼容
|
||||
"""
|
||||
from .models import BrIf, BrEval, BrSeq
|
||||
|
||||
target_decisions = set(decision_gaps)
|
||||
found = []
|
||||
|
||||
def _find_decisions(node, counter):
|
||||
if isinstance(node, BrIf):
|
||||
counter[0] += 1
|
||||
if counter[0] in target_decisions:
|
||||
found.append(("IF", node.condition))
|
||||
_find_decisions(node.true_seq, counter)
|
||||
_find_decisions(node.false_seq, counter)
|
||||
elif isinstance(node, BrEval):
|
||||
counter[0] += 1
|
||||
if counter[0] in target_decisions:
|
||||
found.append(("EVALUATE", node.subject))
|
||||
for _, seq in node.when_list:
|
||||
_find_decisions(seq, counter)
|
||||
_find_decisions(node.other_seq, counter)
|
||||
elif isinstance(node, BrSeq):
|
||||
for child in node.children:
|
||||
_find_decisions(child, counter)
|
||||
|
||||
_find_decisions(branch_tree, [0])
|
||||
|
||||
supplements = []
|
||||
for i, (kind, label) in enumerate(found):
|
||||
supplements.append({
|
||||
"_dec_id": f"incr_{i}",
|
||||
"_kind": kind,
|
||||
"_label": str(label)[:60],
|
||||
})
|
||||
|
||||
return supplements
|
||||
@@ -0,0 +1,1236 @@
|
||||
"""覆盖率统计:决策点收集 + 路径标记 + HTML报告"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, CondLeaf
|
||||
from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, evaluate_tree
|
||||
|
||||
|
||||
# ── 数据模型 ──
|
||||
|
||||
@dataclass
|
||||
class LeafStat:
|
||||
field: str
|
||||
op: str
|
||||
value: str
|
||||
covered_true: bool = False
|
||||
covered_false: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class DecisionPoint:
|
||||
id: int
|
||||
kind: str # "IF" | "EVALUATE" | "PERFORM"
|
||||
label: str
|
||||
branch_names: list[str]
|
||||
covered_branches: set = field(default_factory=set)
|
||||
active_branches: set = field(default_factory=set)
|
||||
implied_branches: set = field(default_factory=set)
|
||||
leaves: list[LeafStat] = field(default_factory=list)
|
||||
source_line: int = 0
|
||||
when_list: list = field(default_factory=list)
|
||||
cond_tree: object = None
|
||||
cond_leaves: list = field(default_factory=list)
|
||||
|
||||
|
||||
# ── 决策点收集 ──
|
||||
|
||||
def collect_decision_points(node, fields, counter=None):
|
||||
if counter is None:
|
||||
counter = [0]
|
||||
points = []
|
||||
all_leaves = []
|
||||
|
||||
if isinstance(node, BrIf):
|
||||
counter[0] += 1
|
||||
dp = DecisionPoint(id=counter[0], kind='IF', label=node.condition,
|
||||
branch_names=['T', 'F'])
|
||||
simple = parse_single_condition(node.condition)
|
||||
if simple and is_field(simple[0], fields):
|
||||
dp.parsed = simple
|
||||
elif simple:
|
||||
dp.parsed = simple
|
||||
elif node.cond_tree:
|
||||
leaves = collect_leaves(node.cond_tree)
|
||||
if leaves:
|
||||
dp.cond_tree = node.cond_tree
|
||||
dp.cond_leaves = list(leaves)
|
||||
for leaf in leaves:
|
||||
ls = LeafStat(field=leaf.field, op=leaf.op, value=leaf.value)
|
||||
dp.leaves.append(ls)
|
||||
all_leaves.append(ls)
|
||||
points.append(dp)
|
||||
p, l = _walk_collect(node.true_seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
p, l = _walk_collect(node.false_seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
|
||||
elif isinstance(node, BrEval):
|
||||
counter[0] += 1
|
||||
names = [f"WHEN {v}" for v, _ in node.when_list]
|
||||
if node.has_other:
|
||||
names.append("OTHER")
|
||||
dp = DecisionPoint(id=counter[0], kind='EVALUATE', label=node.subject,
|
||||
branch_names=names, when_list=node.when_list)
|
||||
points.append(dp)
|
||||
for _, seq in node.when_list:
|
||||
p, l = _walk_collect(seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
p, l = _walk_collect(node.other_seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
|
||||
elif isinstance(node, BrSearch):
|
||||
counter[0] += 1
|
||||
branch_names = []
|
||||
for cond_text, seq in node.when_list:
|
||||
branch_names.append(f'WHEN {cond_text[:40]}')
|
||||
if node.has_at_end:
|
||||
branch_names.append('AT END')
|
||||
dp = DecisionPoint(id=counter[0], kind='SEARCH',
|
||||
label=node.table_name, branch_names=branch_names)
|
||||
dp.when_list = node.when_list
|
||||
dp.cond_trees = node.cond_trees
|
||||
dp.has_other = node.has_at_end
|
||||
points.append(dp)
|
||||
for cond_text, seq in node.when_list:
|
||||
p, l = _walk_collect(seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
if node.has_at_end:
|
||||
p, l = _walk_collect(node.at_end_seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
|
||||
elif isinstance(node, BrPerform):
|
||||
if node.perf_type in ('until', 'para_until', 'varying', 'para_varying'):
|
||||
counter[0] += 1
|
||||
dp = DecisionPoint(id=counter[0], kind='PERFORM',
|
||||
label=node.condition or '',
|
||||
branch_names=['Enter', 'Skip'])
|
||||
simple = parse_single_condition(node.condition) if node.condition else None
|
||||
if simple and is_field(simple[0], fields):
|
||||
dp.parsed = simple
|
||||
elif node.condition:
|
||||
cond_tree = parse_compound_condition(node.condition, fields)
|
||||
if cond_tree:
|
||||
leaves = collect_leaves(cond_tree)
|
||||
if leaves:
|
||||
dp.cond_tree = cond_tree
|
||||
dp.cond_leaves = list(leaves)
|
||||
points.append(dp)
|
||||
p, l = _walk_collect(node.body_seq, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
|
||||
elif isinstance(node, BrSeq):
|
||||
for child in node.children:
|
||||
p, l = collect_decision_points(child, fields, counter)
|
||||
points.extend(p); all_leaves.extend(l)
|
||||
|
||||
return points, all_leaves
|
||||
|
||||
|
||||
def _walk_collect(node, fields, counter):
|
||||
return collect_decision_points(node, fields, counter)
|
||||
|
||||
|
||||
# ── 覆盖率标记 ──
|
||||
|
||||
def mark_coverage(decision_points, leaf_stats, branch_paths, fields):
|
||||
for cons, _assign in branch_paths:
|
||||
for dp in decision_points:
|
||||
if dp.kind == 'IF':
|
||||
_mark_if(dp, cons)
|
||||
elif dp.kind == 'EVALUATE':
|
||||
_mark_eval(dp, cons, fields)
|
||||
elif dp.kind == 'PERFORM':
|
||||
_mark_perform(dp, cons)
|
||||
elif dp.kind == 'SEARCH':
|
||||
_mark_search(dp, cons, fields)
|
||||
for leaf in leaf_stats:
|
||||
for c in cons:
|
||||
if _match_leaf(c, leaf):
|
||||
if c[3]:
|
||||
leaf.covered_true = True
|
||||
else:
|
||||
leaf.covered_false = True
|
||||
|
||||
for dp in decision_points:
|
||||
dp.implied_branches = set(dp.active_branches)
|
||||
|
||||
|
||||
def _match_constraint(c, parsed):
|
||||
if len(c) != 4:
|
||||
return False
|
||||
return (c[0] == parsed[0] and c[1] == parsed[1]
|
||||
and str(c[2]) == str(parsed[2]))
|
||||
|
||||
|
||||
def _match_leaf(c, leaf):
|
||||
if len(c) != 4:
|
||||
return False
|
||||
return (c[0] == leaf.field and c[1] == leaf.op
|
||||
and str(c[2]) == str(leaf.value))
|
||||
|
||||
|
||||
def _mark_if(dp, cons):
|
||||
simple = getattr(dp, 'parsed', None)
|
||||
if simple:
|
||||
for c in cons:
|
||||
if _match_constraint(c, simple):
|
||||
if c[3]:
|
||||
dp.active_branches.add('T')
|
||||
else:
|
||||
dp.active_branches.add('F')
|
||||
elif dp.cond_tree and dp.cond_leaves:
|
||||
assignment = {}
|
||||
for leaf in dp.cond_leaves:
|
||||
for c in cons:
|
||||
if _match_leaf(c, leaf):
|
||||
assignment[leaf] = c[3]
|
||||
break
|
||||
if len(assignment) == len(dp.cond_leaves):
|
||||
if evaluate_tree(dp.cond_tree, assignment):
|
||||
dp.active_branches.add('T')
|
||||
else:
|
||||
dp.active_branches.add('F')
|
||||
else:
|
||||
matched = 0
|
||||
for leaf in dp.leaves:
|
||||
for c in cons:
|
||||
if _match_leaf(c, leaf):
|
||||
matched += 1
|
||||
break
|
||||
if matched <= 1:
|
||||
for c in cons:
|
||||
for leaf in dp.leaves:
|
||||
if _match_leaf(c, leaf):
|
||||
dp.active_branches.add('T' if c[3] else 'F')
|
||||
|
||||
|
||||
def _mark_eval(dp, cons, fields=None):
|
||||
if dp.label == 'TRUE':
|
||||
matched = False
|
||||
for when_val, _ in dp.when_list:
|
||||
parsed = parse_single_condition(when_val, fields)
|
||||
if parsed:
|
||||
for c in cons:
|
||||
if _match_constraint(c, parsed) and c[3]:
|
||||
name = f"WHEN {when_val}"
|
||||
if name in dp.branch_names:
|
||||
dp.active_branches.add(name)
|
||||
matched = True
|
||||
else:
|
||||
cond_tree = parse_compound_condition(when_val, fields)
|
||||
if cond_tree and not isinstance(cond_tree, CondLeaf):
|
||||
leaves = list(collect_leaves(cond_tree))
|
||||
assignment = {}
|
||||
for leaf in leaves:
|
||||
for c in cons:
|
||||
if _match_leaf(c, leaf):
|
||||
assignment[leaf] = c[3]
|
||||
break
|
||||
if len(assignment) == len(leaves):
|
||||
if evaluate_tree(cond_tree, assignment):
|
||||
name = f"WHEN {when_val}"
|
||||
if name in dp.branch_names:
|
||||
dp.active_branches.add(name)
|
||||
matched = True
|
||||
if not matched and 'OTHER' in dp.branch_names:
|
||||
when_fields = set()
|
||||
for when_val, _ in dp.when_list:
|
||||
for c in cons:
|
||||
if c[0] in when_val:
|
||||
when_fields.add(c[0])
|
||||
if when_fields:
|
||||
dp.active_branches.add('OTHER')
|
||||
return
|
||||
for c in cons:
|
||||
if c[0] == dp.label and c[1] == '=':
|
||||
name = f"WHEN {c[2]}"
|
||||
if name in dp.branch_names:
|
||||
dp.active_branches.add(name)
|
||||
elif c[0] == dp.label and c[1] == 'not_in':
|
||||
dp.active_branches.add('OTHER')
|
||||
|
||||
|
||||
def _mark_search(dp, cons, fields=None):
|
||||
branch_masks = [False] * len(dp.branch_names)
|
||||
for i, (cond_text, body_seq) in enumerate(dp.when_list):
|
||||
cond_tree = dp.cond_trees[i] if i < len(dp.cond_trees) else None
|
||||
if not cond_tree:
|
||||
continue
|
||||
if isinstance(cond_tree, CondLeaf):
|
||||
for c in cons:
|
||||
if len(c) == 4:
|
||||
base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0])
|
||||
base_cond = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field)
|
||||
if base_c == base_cond and c[1] == cond_tree.op \
|
||||
and str(c[2]) == str(cond_tree.value) and c[3]:
|
||||
branch_masks[i] = True
|
||||
break
|
||||
else:
|
||||
leaves = list(collect_leaves(cond_tree))
|
||||
assignment = {}
|
||||
for leaf in leaves:
|
||||
for c in cons:
|
||||
if len(c) == 4:
|
||||
base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0])
|
||||
base_l = re.sub(r'\s*\(.*?\)\s*$', '', leaf.field)
|
||||
if base_c == base_l and c[1] == leaf.op and str(c[2]) == str(leaf.value):
|
||||
assignment[leaf] = c[3]
|
||||
break
|
||||
if len(assignment) == len(leaves):
|
||||
if evaluate_tree(cond_tree, assignment):
|
||||
branch_masks[i] = True
|
||||
if dp.has_other:
|
||||
at_end_idx = len(dp.branch_names) - 1
|
||||
if not any(branch_masks[:at_end_idx]):
|
||||
branch_masks[at_end_idx] = True
|
||||
for i, m in enumerate(branch_masks):
|
||||
if m:
|
||||
dp.active_branches.add(dp.branch_names[i])
|
||||
|
||||
|
||||
def _mark_perform(dp, cons):
|
||||
simple = getattr(dp, 'parsed', None)
|
||||
if simple:
|
||||
for c in cons:
|
||||
if _match_constraint(c, simple):
|
||||
if c[3]:
|
||||
dp.active_branches.add('Skip')
|
||||
else:
|
||||
dp.active_branches.add('Enter')
|
||||
elif dp.cond_tree and dp.cond_leaves:
|
||||
assignment = {}
|
||||
for leaf in dp.cond_leaves:
|
||||
for c in cons:
|
||||
if _match_leaf(c, leaf):
|
||||
assignment[leaf] = c[3]
|
||||
break
|
||||
if len(assignment) == len(dp.cond_leaves):
|
||||
if evaluate_tree(dp.cond_tree, assignment):
|
||||
dp.active_branches.add('Skip')
|
||||
else:
|
||||
dp.active_branches.add('Enter')
|
||||
else:
|
||||
for c in cons:
|
||||
if c[0] == dp.label or any(c[0] == f for f in _get_fields_in_cond(dp.label)):
|
||||
if c[3]:
|
||||
dp.active_branches.add('Skip')
|
||||
else:
|
||||
dp.active_branches.add('Enter')
|
||||
|
||||
|
||||
def _get_fields_in_cond(cond_text):
|
||||
return re.findall(r'[A-Z][A-Z0-9-]*', cond_text.upper())
|
||||
|
||||
|
||||
# ── 行号定位(基于原始源文本)──
|
||||
|
||||
def locate_decision_lines(decision_points, raw_source):
|
||||
"""在原始源文本中搜索每个决策点的近似行号"""
|
||||
lines = raw_source.upper().splitlines()
|
||||
for dp in decision_points:
|
||||
patterns = _build_search_patterns(dp)
|
||||
for i, line in enumerate(lines):
|
||||
for pat in patterns:
|
||||
if re.search(pat, line):
|
||||
dp.source_line = i + 1
|
||||
break
|
||||
if dp.source_line:
|
||||
break
|
||||
|
||||
|
||||
def _normalize(text):
|
||||
"""标准化条件文本用于比较:去多余空白、标准化引号"""
|
||||
t = re.sub(r'\s+', ' ', text).strip()
|
||||
t = t.replace('"', "'")
|
||||
return t
|
||||
|
||||
|
||||
def _build_search_patterns(dp):
|
||||
texts = []
|
||||
if dp.kind == 'IF':
|
||||
texts.append((r'\bIF\b', dp.label))
|
||||
elif dp.kind == 'EVALUATE':
|
||||
texts.append((r'\bEVALUATE\b', dp.label))
|
||||
elif dp.kind == 'PERFORM':
|
||||
texts.append((r'\bUNTIL\b', dp.condition if hasattr(dp, 'condition') else dp.label
|
||||
if dp.label else ''))
|
||||
else:
|
||||
return [r'$^'] # 永不匹配
|
||||
|
||||
patterns = []
|
||||
for keyword, condition in texts:
|
||||
if not condition:
|
||||
continue
|
||||
norm_cond = _normalize(condition)
|
||||
# 转义正则特殊字符,但保留空格(替换为\s+)
|
||||
esc = re.escape(norm_cond)
|
||||
esc = esc.replace(r'\ ', r'\s+')
|
||||
esc = esc.replace(r'\'', r"['\"]")
|
||||
patterns.append(keyword + r'\s+' + esc)
|
||||
if not patterns:
|
||||
return [r'$^']
|
||||
return patterns
|
||||
|
||||
|
||||
# ── HTML 报告(详情页)──
|
||||
|
||||
_DETAIL_HTML = '''<!DOCTYPE html>
|
||||
<html lang="zh">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>{title}</title>
|
||||
<style>
|
||||
*, *::before, *::after {{ box-sizing: border-box; margin: 0; padding: 0; }}
|
||||
body {{
|
||||
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Noto Sans SC", "PingFang SC", "Microsoft YaHei", sans-serif;
|
||||
background: #f0f2f5; color: #37474f; font-size: 14px; line-height: 1.6;
|
||||
}}
|
||||
|
||||
.topbar {{
|
||||
background: linear-gradient(135deg, #1a237e, #283593);
|
||||
color: #fff; padding: 14px 32px;
|
||||
display: flex; align-items: center; gap: 16px;
|
||||
box-shadow: 0 2px 8px rgba(0,0,0,0.15);
|
||||
}}
|
||||
.topbar a {{ color: rgba(255,255,255,0.8); text-decoration: none; font-size: 14px; }}
|
||||
.topbar a:hover {{ color: #fff; text-decoration: underline; }}
|
||||
.topbar .sep {{ color: rgba(255,255,255,0.4); }}
|
||||
.topbar h1 {{ font-size: 18px; font-weight: 600; }}
|
||||
|
||||
.container {{ max-width: 1000px; margin: 0 auto; padding: 28px 24px; }}
|
||||
|
||||
.section {{
|
||||
background: #fff; border-radius: 10px; padding: 20px 24px;
|
||||
box-shadow: 0 1px 4px rgba(0,0,0,0.06); margin-bottom: 20px;
|
||||
}}
|
||||
.section h2 {{ font-size: 16px; font-weight: 600; color: #1a237e; margin-bottom: 16px; padding-bottom: 8px; border-bottom: 2px solid #e8eaf6; }}
|
||||
|
||||
/* 统计卡片行 */
|
||||
.stats-row {{ display: flex; gap: 16px; flex-wrap: wrap; }}
|
||||
.stat-card {{
|
||||
flex: 1; min-width: 140px; background: #f5f7fa; border-radius: 8px; padding: 14px 18px;
|
||||
text-align: center;
|
||||
}}
|
||||
.stat-card .val {{ font-size: 22px; font-weight: 700; font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; }}
|
||||
.stat-card .lbl {{ font-size: 12px; color: #78909c; margin-top: 2px; }}
|
||||
.val-green {{ color: #00c853; }}
|
||||
.val-amber {{ color: #ff8f00; }}
|
||||
.val-red {{ color: #ff1744; }}
|
||||
.val-blue {{ color: #1a237e; }}
|
||||
|
||||
.legend {{ display: flex; gap: 20px; margin: 16px 0 0 0; font-size: 13px; color: #546e7a; }}
|
||||
.legend .dot {{ display: inline-block; width: 10px; height: 10px; border-radius: 50%; margin-right: 5px; vertical-align: middle; }}
|
||||
.dot-green {{ background: #c8e6c9; }}
|
||||
.dot-red {{ background: #ffcdd2; }}
|
||||
.dot-amber {{ background: #fff9c4; }}
|
||||
|
||||
/* 进度条 */
|
||||
.prog-bar-detail {{
|
||||
width: 100%; height: 12px; border-radius: 6px; background: #ffcdd2; overflow: hidden; margin: 10px 0 6px 0;
|
||||
}}
|
||||
.prog-fill-detail {{
|
||||
height: 100%; border-radius: 6px; background: linear-gradient(90deg, #66bb6a, #00c853);
|
||||
}}
|
||||
.prog-fill-detail.amber {{ background: linear-gradient(90deg, #ffca28, #ff8f00); }}
|
||||
.prog-fill-detail.red {{ background: linear-gradient(90deg, #ef5350, #ff1744); }}
|
||||
|
||||
/* 表格 */
|
||||
table {{ width: 100%; border-collapse: collapse; table-layout: fixed; }}
|
||||
th, td {{ padding: 10px 14px; text-align: left; border-bottom: 1px solid #eceff1; word-break: break-all; }}
|
||||
th {{ background: #f5f7fa; font-weight: 600; font-size: 12px; color: #78909c; text-transform: uppercase; letter-spacing: 0.5px; }}
|
||||
tbody tr:hover {{ background: #e8eaf6; }}
|
||||
tbody tr:last-child td {{ border-bottom: none; }}
|
||||
|
||||
/* 决策表列宽 */
|
||||
.dp-table th:nth-child(1), .dp-table td:nth-child(1) {{ width: 50px; }}
|
||||
.dp-table th:nth-child(2), .dp-table td:nth-child(2) {{ width: 70px; }}
|
||||
.dp-table th:nth-child(3), .dp-table td:nth-child(3) {{ width: 50px; }}
|
||||
.dp-table th:nth-child(5), .dp-table td:nth-child(5) {{ width: 160px; }}
|
||||
|
||||
/* 叶条件表列宽 */
|
||||
.leaf-table th:nth-child(1), .leaf-table td:nth-child(1) {{ width: 110px; }}
|
||||
.leaf-table th:nth-child(2), .leaf-table td:nth-child(2) {{ width: 60px; }}
|
||||
.leaf-table th:nth-child(4), .leaf-table td:nth-child(4),
|
||||
.leaf-table th:nth-child(5), .leaf-table td:nth-child(5) {{ width: 50px; text-align: center; }}
|
||||
|
||||
.branch-cell {{ white-space: nowrap; }}
|
||||
.branch-true {{ background: #c8e6c9; padding: 2px 8px; border-radius: 4px; font-size: 12px; margin: 0 2px; }}
|
||||
.branch-false {{ background: #ffcdd2; padding: 2px 8px; border-radius: 4px; font-size: 12px; margin: 0 2px; }}
|
||||
.branch-implied {{ background: #fff9c4; padding: 2px 8px; border-radius: 4px; font-size: 12px; margin: 0 2px; }}
|
||||
|
||||
.cond-cell {{ font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; text-align: center; }}
|
||||
.cond-ok {{ color: #00c853; }}
|
||||
.cond-miss {{ color: #ff5252; }}
|
||||
|
||||
/* 源码 */
|
||||
.source-section {{ font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; font-size: 13px; }}
|
||||
.source-line {{ display: flex; padding: 1px 0; }}
|
||||
.source-line:hover {{ background: #f5f5f5; }}
|
||||
.source-line .ln {{ width: 3.5em; color: #90a4ae; text-align: right; padding-right: 1em; user-select: none; flex-shrink: 0; }}
|
||||
.source-line .code {{ white-space: pre; flex: 1; }}
|
||||
.source-line.hl-green {{ background: #a5d6a7; }}
|
||||
.source-line.hl-green .ln {{ color: #1b5e20; font-weight: 700; }}
|
||||
.source-line.hl-red {{ background: #ef9a9a; }}
|
||||
.source-line.hl-red .ln {{ color: #b71c1c; font-weight: 700; }}
|
||||
.source-line.hl-amber {{ background: #ffe082; }}
|
||||
.source-line.hl-amber .ln {{ color: #e65100; font-weight: 700; }}
|
||||
|
||||
@media (max-width: 680px) {{
|
||||
.topbar {{ padding: 12px 16px; flex-wrap: wrap; }}
|
||||
.container {{ padding: 16px 12px; }}
|
||||
.section {{ padding: 14px 16px; }}
|
||||
.stat-card {{ min-width: 100px; padding: 10px 12px; }}
|
||||
.stat-card .val {{ font-size: 18px; }}
|
||||
th, td {{ padding: 8px 10px; }}
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<div class="topbar">
|
||||
<a href="{index_relpath}">← 覆盖率总览</a>
|
||||
<span class="sep">|</span>
|
||||
<h1>{title}</h1>
|
||||
</div>
|
||||
|
||||
<div class="container">
|
||||
|
||||
<div class="section">
|
||||
<h2>📈 覆盖率概要</h2>
|
||||
<div class="stats-row">
|
||||
<div class="stat-card">
|
||||
<div class="val {dec_val_cls}">{dec_frac}</div>
|
||||
<div class="lbl">决策覆盖率</div>
|
||||
</div>
|
||||
<div class="stat-card">
|
||||
<div class="val {cond_val_cls}">{cond_frac}</div>
|
||||
<div class="lbl">条件覆盖率</div>
|
||||
</div>
|
||||
<div class="stat-card">
|
||||
<div class="val val-blue">{dp_count_text}</div>
|
||||
<div class="lbl">决策点</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="prog-bar-detail">
|
||||
<div class="prog-fill-detail{bar_cls}" style="width:{bar_pct}%"></div>
|
||||
</div>
|
||||
<div style="text-align:right;font-size:12px;color:#78909c;">{dec_pct_text}</div>
|
||||
<div class="legend">
|
||||
<span><span class="dot dot-green"></span>已覆盖</span>
|
||||
<span><span class="dot dot-red"></span>未覆盖</span>
|
||||
<span><span class="dot dot-amber"></span>推断覆盖</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{decision_table}
|
||||
|
||||
{leaf_table}
|
||||
|
||||
{source_section}
|
||||
|
||||
</div>
|
||||
</body>
|
||||
</html>'''
|
||||
|
||||
|
||||
def generate_html_report(decision_points, leaf_stats, source_lines, outpath,
|
||||
filename='', index_relpath=None, covered_lines=None):
|
||||
title = f"覆盖率报告 — {filename}" if filename else "覆盖率报告"
|
||||
|
||||
total_branches = sum(len(dp.branch_names) for dp in decision_points)
|
||||
covered_branches = sum(len(dp.active_branches) for dp in decision_points)
|
||||
implied_branches = sum(len(dp.implied_branches) for dp in decision_points)
|
||||
if covered_lines:
|
||||
# 无分支程序:隐式 100%
|
||||
total_branches = max(total_branches, 1)
|
||||
covered_branches = max(covered_branches, 1)
|
||||
|
||||
total_leaves = len(leaf_stats) * 2
|
||||
covered_leaves = (sum(1 for l in leaf_stats if l.covered_true) +
|
||||
sum(1 for l in leaf_stats if l.covered_false))
|
||||
|
||||
# 计算数值
|
||||
is_implicit = bool(covered_lines) # 无分支程序,隐式 100%
|
||||
dec_pct_val = (covered_branches / total_branches * 100) if total_branches else 0
|
||||
dec_pct_text = "100% ✓" if is_implicit else (f"{dec_pct_val:.1f}%" if total_branches else "无")
|
||||
dec_frac = "全部覆盖" if is_implicit else (f"{covered_branches}/{total_branches}" if total_branches else "—")
|
||||
cond_frac = f"{covered_leaves}/{total_leaves}" if total_leaves else "—"
|
||||
implied_text = f'(+{implied_branches - covered_branches} 推断)' if implied_branches > covered_branches else ''
|
||||
|
||||
# 颜色
|
||||
if is_implicit or not total_branches or dec_pct_val >= 100:
|
||||
dec_val_cls = 'val-green'
|
||||
bar_cls = ''
|
||||
elif dec_pct_val >= 80:
|
||||
dec_val_cls = 'val-amber'
|
||||
bar_cls = ' amber'
|
||||
else:
|
||||
dec_val_cls = 'val-red'
|
||||
bar_cls = ' red'
|
||||
|
||||
if not total_leaves or covered_leaves == total_leaves:
|
||||
cond_val_cls = 'val-green'
|
||||
elif covered_leaves / total_leaves >= 0.8:
|
||||
cond_val_cls = 'val-amber'
|
||||
else:
|
||||
cond_val_cls = 'val-red'
|
||||
|
||||
# 决策点表格
|
||||
if decision_points:
|
||||
dp_rows = []
|
||||
for dp in decision_points:
|
||||
ln = str(dp.source_line) if dp.source_line else '?'
|
||||
branch_cells = []
|
||||
for bn in dp.branch_names:
|
||||
if bn in dp.active_branches:
|
||||
branch_cells.append(f'<span class="branch-true">{bn} ✓</span>')
|
||||
elif bn in dp.implied_branches:
|
||||
branch_cells.append(f'<span class="branch-implied">{bn} ○</span>')
|
||||
else:
|
||||
branch_cells.append(f'<span class="branch-false">{bn} ✗</span>')
|
||||
dp_rows.append(f'<tr><td>#{dp.id}</td><td>{dp.kind}</td><td>{ln}</td>'
|
||||
f'<td style="font-family:monospace">{dp.label}</td>'
|
||||
f'<td class="branch-cell">{" ".join(branch_cells)}</td></tr>')
|
||||
|
||||
decision_table = f'''<div class="section">
|
||||
<h2>📜 决策点</h2>
|
||||
<table class="dp-table">
|
||||
<thead><tr><th>#</th><th>类型</th><th>行号</th><th>条件</th><th>分支</th></tr></thead>
|
||||
<tbody>{"".join(dp_rows)}</tbody>
|
||||
</table>
|
||||
</div>'''
|
||||
else:
|
||||
decision_table = ''
|
||||
|
||||
# 叶条件表格
|
||||
if leaf_stats:
|
||||
leaf_rows = []
|
||||
for leaf in leaf_stats:
|
||||
t = '<span class="cond-ok cond-cell">✓</span>' if leaf.covered_true else '<span class="cond-miss cond-cell">✗</span>'
|
||||
f = '<span class="cond-ok cond-cell">✓</span>' if leaf.covered_false else '<span class="cond-miss cond-cell">✗</span>'
|
||||
leaf_rows.append(f'<tr><td>{leaf.field}</td><td>{leaf.op}</td>'
|
||||
f'<td>{leaf.value}</td><td>{t}</td><td>{f}</td></tr>')
|
||||
|
||||
leaf_table = f'''<div class="section">
|
||||
<h2>🔢 条件覆盖明细(叶条件)</h2>
|
||||
<table class="leaf-table">
|
||||
<thead><tr><th>字段</th><th>运算符</th><th>值</th><th>真</th><th>假</th></tr></thead>
|
||||
<tbody>{"".join(leaf_rows)}</tbody>
|
||||
</table>
|
||||
</div>'''
|
||||
else:
|
||||
leaf_table = ''
|
||||
|
||||
# 源码标注
|
||||
if source_lines:
|
||||
line_cov = {}
|
||||
for dp in decision_points:
|
||||
if dp.source_line:
|
||||
if dp.source_line not in line_cov:
|
||||
line_cov[dp.source_line] = []
|
||||
has_missed = any(bn not in dp.active_branches for bn in dp.branch_names)
|
||||
has_active = any(bn in dp.active_branches for bn in dp.branch_names)
|
||||
if has_active and not has_missed:
|
||||
line_cov[dp.source_line].append('hl-green')
|
||||
elif has_active:
|
||||
line_cov[dp.source_line].append('hl-red')
|
||||
else:
|
||||
line_cov[dp.source_line].append('hl-amber')
|
||||
|
||||
# 无分支程序:所有 PD 行标记为已覆盖
|
||||
if covered_lines:
|
||||
for ln in covered_lines:
|
||||
line_cov.setdefault(ln, []).append('hl-green')
|
||||
|
||||
src_lines = []
|
||||
for i, line in enumerate(source_lines, 1):
|
||||
cls_list = line_cov.get(i, [])
|
||||
hl = ' ' + ' '.join(cls_list) if cls_list else ''
|
||||
src_lines.append(f'<div class="source-line{hl}">'
|
||||
f'<span class="ln">{i}</span>'
|
||||
f'<span class="code">{line}</span></div>')
|
||||
|
||||
source_section = f'''<div class="section source-section">
|
||||
<h2>📖 源码标注</h2>
|
||||
{"".join(src_lines)}
|
||||
</div>'''
|
||||
else:
|
||||
source_section = ''
|
||||
|
||||
html = _DETAIL_HTML.format(
|
||||
title=title,
|
||||
index_relpath=index_relpath or '#',
|
||||
dec_frac=dec_frac,
|
||||
dec_pct_text=dec_pct_text,
|
||||
dec_val_cls=dec_val_cls,
|
||||
cond_frac=cond_frac,
|
||||
cond_val_cls=cond_val_cls,
|
||||
bar_cls=bar_cls,
|
||||
bar_pct=str(int(dec_pct_val)),
|
||||
decision_table=decision_table,
|
||||
leaf_table=leaf_table,
|
||||
source_section=source_section,
|
||||
dp_count_text=('—' if is_implicit else str(len(decision_points))),
|
||||
)
|
||||
|
||||
outpath = Path(outpath)
|
||||
outpath.parent.mkdir(parents=True, exist_ok=True)
|
||||
outpath.write_text(html, encoding='utf-8')
|
||||
|
||||
|
||||
# ── 总括索引页 ──
|
||||
|
||||
_INDEX_HTML = '''<!DOCTYPE html>
|
||||
<html lang="zh">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>覆盖率总览</title>
|
||||
<style>
|
||||
*, *::before, *::after {{ box-sizing: border-box; margin: 0; padding: 0; }}
|
||||
body {{
|
||||
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Noto Sans SC", "PingFang SC", "Microsoft YaHei", sans-serif;
|
||||
background: #f0f2f5; color: #37474f; font-size: 14px; line-height: 1.6;
|
||||
}}
|
||||
|
||||
/* 顶栏 */
|
||||
.topbar {{
|
||||
background: linear-gradient(135deg, #1a237e, #283593);
|
||||
color: #fff; padding: 18px 32px;
|
||||
display: flex; align-items: center; justify-content: space-between;
|
||||
box-shadow: 0 2px 8px rgba(0,0,0,0.15);
|
||||
}}
|
||||
.topbar h1 {{ font-size: 20px; font-weight: 600; letter-spacing: 0.5px; }}
|
||||
.topbar .ts {{ font-size: 13px; opacity: 0.8; font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; }}
|
||||
|
||||
.container {{ max-width: 1200px; margin: 0 auto; padding: 28px 24px; }}
|
||||
|
||||
/* 统计卡片 */
|
||||
.cards {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 16px; margin-bottom: 28px; }}
|
||||
.card {{
|
||||
background: #fff; border-radius: 10px; padding: 20px 22px;
|
||||
box-shadow: 0 1px 4px rgba(0,0,0,0.06); transition: box-shadow 0.2s, transform 0.2s;
|
||||
}}
|
||||
.card:hover {{ box-shadow: 0 4px 16px rgba(0,0,0,0.10); transform: translateY(-2px); }}
|
||||
.card .num {{ font-size: 28px; font-weight: 700; font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; line-height: 1.2; }}
|
||||
.card .label {{ font-size: 13px; color: #78909c; margin-top: 4px; }}
|
||||
.num-green {{ color: #00c853; }}
|
||||
.num-amber {{ color: #ff8f00; }}
|
||||
.num-red {{ color: #ff1744; }}
|
||||
.num-blue {{ color: #1a237e; }}
|
||||
|
||||
/* 图表行 */
|
||||
.charts-row {{
|
||||
display: flex; gap: 32px; justify-content: center; flex-wrap: wrap;
|
||||
background: #fff; border-radius: 10px; padding: 28px 20px;
|
||||
box-shadow: 0 1px 4px rgba(0,0,0,0.06); margin-bottom: 24px;
|
||||
}}
|
||||
.chart-box {{ text-align: center; }}
|
||||
.chart-box svg {{ display: block; margin: 0 auto; }}
|
||||
.chart-box .chart-label {{ margin-top: 8px; font-size: 14px; font-weight: 500; color: #546e7a; }}
|
||||
|
||||
.legend {{
|
||||
display: flex; justify-content: center; gap: 24px; margin: 0 0 20px 0;
|
||||
font-size: 13px; color: #546e7a;
|
||||
}}
|
||||
.legend .dot {{ display: inline-block; width: 10px; height: 10px; border-radius: 50%; margin-right: 6px; vertical-align: middle; }}
|
||||
.legend .dot-green {{ background: #00c853; }}
|
||||
.legend .dot-red {{ background: #ff5252; }}
|
||||
.legend .dot-amber {{ background: #ffd740; }}
|
||||
|
||||
/* 工具栏 */
|
||||
.toolbar {{
|
||||
display: flex; justify-content: space-between; align-items: center;
|
||||
margin-bottom: 14px; flex-wrap: wrap; gap: 10px;
|
||||
}}
|
||||
.toolbar input {{
|
||||
padding: 8px 14px; border: 1px solid #cfd8dc; border-radius: 6px;
|
||||
font-size: 14px; width: 220px; outline: none; transition: border-color 0.2s;
|
||||
font-family: inherit;
|
||||
}}
|
||||
.toolbar input:focus {{ border-color: #3f51b5; box-shadow: 0 0 0 3px rgba(63,81,181,0.12); }}
|
||||
.toolbar .sort-group {{ display: flex; gap: 6px; }}
|
||||
.toolbar .sort-btn {{
|
||||
padding: 6px 14px; border: 1px solid #cfd8dc; border-radius: 6px;
|
||||
background: #fff; cursor: pointer; font-size: 13px; color: #546e7a;
|
||||
transition: all 0.15s; font-family: inherit;
|
||||
}}
|
||||
.toolbar .sort-btn:hover {{ background: #eceff1; }}
|
||||
.toolbar .sort-btn.active {{ background: #e8eaf6; border-color: #3f51b5; color: #1a237e; font-weight: 500; }}
|
||||
|
||||
/* 表格 */
|
||||
.table-wrap {{
|
||||
background: #fff; border-radius: 10px; overflow: hidden;
|
||||
box-shadow: 0 1px 4px rgba(0,0,0,0.06);
|
||||
}}
|
||||
table {{ width: 100%; border-collapse: collapse; }}
|
||||
thead th {{
|
||||
background: #eceff1; font-weight: 600; font-size: 13px; color: #546e7a;
|
||||
padding: 12px 16px; text-align: left; cursor: pointer; user-select: none;
|
||||
position: sticky; top: 0; z-index: 1; white-space: nowrap;
|
||||
transition: background 0.15s;
|
||||
}}
|
||||
thead th:hover {{ background: #dde3e8; }}
|
||||
thead th .sort-arrow {{ margin-left: 4px; font-size: 11px; opacity: 0.4; }}
|
||||
thead th.sorted .sort-arrow {{ opacity: 1; color: #1a237e; }}
|
||||
tbody tr {{ transition: background 0.15s; }}
|
||||
tbody tr:nth-child(even) {{ background: #fafbfc; }}
|
||||
tbody tr:hover {{ background: #e8eaf6; }}
|
||||
tbody td {{ padding: 12px 16px; border-top: 1px solid #eceff1; vertical-align: middle; }}
|
||||
tbody tr.hidden {{ display: none; }}
|
||||
|
||||
.prog-name {{ font-weight: 500; }}
|
||||
.prog-name a {{ color: #283593; text-decoration: none; }}
|
||||
.prog-name a:hover {{ text-decoration: underline; color: #1a237e; }}
|
||||
|
||||
/* 进度条 */
|
||||
.prog-wrap {{
|
||||
display: inline-flex; align-items: center; gap: 10px; width: 100%;
|
||||
}}
|
||||
.prog-bar {{
|
||||
flex: 1; max-width: 180px; height: 20px; border-radius: 10px;
|
||||
background: #ffcdd2; overflow: hidden; position: relative;
|
||||
}}
|
||||
.prog-fill {{
|
||||
height: 100%; border-radius: 10px; transition: width 0.4s ease;
|
||||
background: linear-gradient(90deg, #66bb6a, #00c853);
|
||||
position: relative;
|
||||
}}
|
||||
.prog-fill.amber {{ background: linear-gradient(90deg, #ffca28, #ff8f00); }}
|
||||
.prog-fill.red {{ background: linear-gradient(90deg, #ef5350, #ff1744); }}
|
||||
.prog-fill .prog-label {{
|
||||
position: absolute; right: 6px; top: 50%; transform: translateY(-50%);
|
||||
font-size: 11px; font-weight: 700; color: #fff;
|
||||
text-shadow: 0 1px 2px rgba(0,0,0,0.3);
|
||||
}}
|
||||
.prog-fill.full {{ border-radius: 10px; }}
|
||||
.prog-text {{ font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; font-size: 13px; white-space: nowrap; min-width: 48px; }}
|
||||
|
||||
/* 状态徽标 */
|
||||
.badge {{
|
||||
display: inline-block; padding: 3px 10px; border-radius: 12px;
|
||||
font-size: 12px; font-weight: 600; letter-spacing: 0.3px;
|
||||
}}
|
||||
.badge-pass {{ background: #e8f5e9; color: #2e7d32; }}
|
||||
.badge-warn {{ background: #fff8e1; color: #e65100; }}
|
||||
.badge-fail {{ background: #ffebee; color: #c62828; }}
|
||||
|
||||
/* 条件覆盖列 */
|
||||
.cond-cell {{ font-family: "Cascadia Code","Fira Code","JetBrains Mono",Consolas,monospace; font-size: 13px; }}
|
||||
|
||||
/* 响应式 */
|
||||
@media (max-width: 680px) {{
|
||||
.topbar {{ flex-direction: column; align-items: flex-start; gap: 6px; padding: 14px 18px; }}
|
||||
.container {{ padding: 16px 12px; }}
|
||||
.cards {{ grid-template-columns: 1fr 1fr; }}
|
||||
.toolbar input {{ width: 100%; }}
|
||||
.toolbar {{ flex-direction: column; align-items: stretch; }}
|
||||
.prog-bar {{ max-width: 100px; }}
|
||||
thead th, tbody td {{ padding: 8px 10px; }}
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<div class="topbar">
|
||||
<h1>📊 覆盖率总览报告</h1>
|
||||
<span class="ts">{timestamp}</span>
|
||||
</div>
|
||||
|
||||
<div class="container">
|
||||
|
||||
<div class="cards">
|
||||
<div class="card">
|
||||
<div class="num {dec_num_cls}">{agg_dec_num}</div>
|
||||
<div class="label">决策覆盖率</div>
|
||||
</div>
|
||||
<div class="card">
|
||||
<div class="num {cond_num_cls}">{agg_cond_num}</div>
|
||||
<div class="label">条件覆盖率</div>
|
||||
</div>
|
||||
<div class="card">
|
||||
<div class="num num-blue">{prog_count}</div>
|
||||
<div class="label">已分析程序</div>
|
||||
</div>
|
||||
<div class="card">
|
||||
<div class="num {uncovered_num_cls}">{uncovered_count}</div>
|
||||
<div class="label">未完全覆盖程序</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="charts-row">
|
||||
<div class="chart-box">
|
||||
{dec_ring_svg}
|
||||
<div class="chart-label">决策覆盖率</div>
|
||||
</div>
|
||||
<div class="chart-box">
|
||||
{cond_ring_svg}
|
||||
<div class="chart-label">条件覆盖率</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="legend">
|
||||
<span><span class="dot dot-green"></span>已覆盖</span>
|
||||
<span><span class="dot dot-red"></span>未覆盖</span>
|
||||
<span><span class="dot dot-amber"></span>推断覆盖</span>
|
||||
</div>
|
||||
|
||||
<div class="toolbar">
|
||||
<input type="text" id="filterInput" placeholder="🔍 输入程序名过滤..." oninput="filterTable()">
|
||||
<div class="sort-group">
|
||||
<button class="sort-btn active" data-sort="name" onclick="setSort('name')">程序名 ↑</button>
|
||||
<button class="sort-btn" data-sort="cov" onclick="setSort('cov')">覆盖率 ↓</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="table-wrap">
|
||||
<table id="progTable">
|
||||
<thead>
|
||||
<tr>
|
||||
<th data-col="name" onclick="sortBy('name')">程序 <span class="sort-arrow">↑</span></th>
|
||||
<th data-col="branch" onclick="sortBy('branch')">决策分支 <span class="sort-arrow">↑</span></th>
|
||||
<th data-col="cond" onclick="sortBy('cond')">条件覆盖 <span class="sort-arrow">↑</span></th>
|
||||
<th data-col="cov" onclick="sortBy('cov')">覆盖率 <span class="sort-arrow">↑</span></th>
|
||||
<th>状态</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{rows}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
<script>
|
||||
let sortCol = 'name', sortDir = 1;
|
||||
|
||||
function setSort(col) {{
|
||||
document.querySelectorAll('.sort-btn').forEach(b => b.classList.remove('active'));
|
||||
if (col === 'name') {{
|
||||
document.querySelector('.sort-btn[data-sort="name"]').classList.add('active');
|
||||
sortCol = 'name'; sortDir = 1;
|
||||
}} else {{
|
||||
document.querySelector('.sort-btn[data-sort="cov"]').classList.add('active');
|
||||
sortCol = 'cov'; sortDir = -1;
|
||||
}}
|
||||
doSort();
|
||||
}}
|
||||
|
||||
function sortBy(col) {{
|
||||
if (sortCol === col) {{ sortDir = -sortDir; }}
|
||||
else {{ sortCol = col; sortDir = col === 'name' ? 1 : -1; }}
|
||||
document.querySelectorAll('.sort-btn').forEach(b => b.classList.remove('active'));
|
||||
doSort();
|
||||
}}
|
||||
|
||||
function doSort() {{
|
||||
const tbody = document.querySelector('#progTable tbody');
|
||||
const rows = Array.from(tbody.querySelectorAll('tr:not(.hidden)'));
|
||||
rows.sort((a, b) => {{
|
||||
var va, vb;
|
||||
if (sortCol === 'name') {{
|
||||
va = a.cells[0].textContent.trim(); vb = b.cells[0].textContent.trim();
|
||||
return sortDir * va.localeCompare(vb);
|
||||
}} else if (sortCol === 'branch') {{
|
||||
va = a.cells[1].textContent.trim(); vb = b.cells[1].textContent.trim();
|
||||
return sortDir * va.localeCompare(vb);
|
||||
}} else if (sortCol === 'cond') {{
|
||||
va = a.cells[2].textContent.trim(); vb = b.cells[2].textContent.trim();
|
||||
return sortDir * va.localeCompare(vb);
|
||||
}} else {{
|
||||
va = parseFloat(a.getAttribute('data-cov') || '0');
|
||||
vb = parseFloat(b.getAttribute('data-cov') || '0');
|
||||
return sortDir * (va - vb);
|
||||
}}
|
||||
}});
|
||||
rows.forEach(r => tbody.appendChild(r));
|
||||
}}
|
||||
|
||||
function filterTable() {{
|
||||
const q = document.getElementById('filterInput').value.toUpperCase();
|
||||
const rows = document.querySelectorAll('#progTable tbody tr');
|
||||
rows.forEach(r => {{
|
||||
r.classList.toggle('hidden', !r.cells[0].textContent.toUpperCase().includes(q));
|
||||
}});
|
||||
doSort();
|
||||
}}
|
||||
</script>
|
||||
|
||||
</body>
|
||||
</html>'''
|
||||
|
||||
|
||||
def _ring_svg(pct, color_stops):
|
||||
"""生成 SVG 圆环 HTML。pct: 0-100 浮点数。"""
|
||||
r = 54
|
||||
circ = 2 * 3.14159265 * r
|
||||
offset = circ * (1 - pct / 100) if pct > 0 else circ
|
||||
if pct >= 80:
|
||||
stroke = '#00c853'
|
||||
elif pct >= 50:
|
||||
stroke = '#ff8f00'
|
||||
else:
|
||||
stroke = '#ff1744'
|
||||
return (
|
||||
f'<svg width="140" height="140" viewBox="0 0 140 140">'
|
||||
f'<circle cx="70" cy="70" r="{r}" fill="none" stroke="#eceff1" stroke-width="12"/>'
|
||||
f'<circle cx="70" cy="70" r="{r}" fill="none" stroke="{stroke}" stroke-width="12"'
|
||||
f' stroke-dasharray="{circ}" stroke-dashoffset="{offset}"'
|
||||
f' transform="rotate(-90 70 70)" stroke-linecap="round"/>'
|
||||
f'<text x="70" y="64" text-anchor="middle" dominant-baseline="central"'
|
||||
f' font-size="26" font-weight="700" fill="#37474f"'
|
||||
f' font-family="Cascadia Code,Fira Code,JetBrains Mono,Consolas,monospace">'
|
||||
f'{pct:.0f}%</text>'
|
||||
f'<text x="70" y="86" text-anchor="middle" dominant-baseline="central"'
|
||||
f' font-size="11" fill="#78909c">覆盖率</text>'
|
||||
f'</svg>'
|
||||
)
|
||||
|
||||
|
||||
def generate_coverage_index(programs, outdir):
|
||||
"""生成覆盖率总括索引页。"""
|
||||
from datetime import datetime
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
|
||||
|
||||
agg_total = sum(p['total_branches'] for p in programs)
|
||||
agg_covered = sum(p['covered_branches'] for p in programs)
|
||||
agg_implied = sum(p['implied_branches'] for p in programs)
|
||||
agg_ctotal = sum(p['total_conditions'] for p in programs)
|
||||
agg_ccovered = sum(p['covered_conditions'] for p in programs)
|
||||
|
||||
agg_dec_pct = (agg_covered / agg_total * 100) if agg_total else 0
|
||||
agg_cond_pct = (agg_ccovered / agg_ctotal * 100) if agg_ctotal else 0
|
||||
uncovered_count = sum(1 for p in programs if p['total_branches'] and
|
||||
p['covered_branches'] < p['total_branches'])
|
||||
|
||||
dec_num_cls = 'num-green' if agg_dec_pct == 100 else ('num-amber' if agg_dec_pct >= 80 else 'num-red')
|
||||
cond_num_cls = 'num-green' if agg_cond_pct == 100 else ('num-amber' if agg_cond_pct >= 80 else 'num-red')
|
||||
uncovered_num_cls = 'num-green' if uncovered_count == 0 else 'num-red'
|
||||
|
||||
def sort_key(p):
|
||||
if p['total_branches']:
|
||||
return -p['covered_branches'] / p['total_branches']
|
||||
return -1.0
|
||||
sorted_programs = sorted(programs, key=sort_key)
|
||||
|
||||
rows = []
|
||||
for p in sorted_programs:
|
||||
name = p['name']
|
||||
href = p['detail_relpath']
|
||||
tb = p['total_branches']
|
||||
cb = p['covered_branches']
|
||||
ib = p['implied_branches']
|
||||
tc = p['total_conditions']
|
||||
cc = p['covered_conditions']
|
||||
imp = p.get('implicit_100', False)
|
||||
|
||||
pct_dec = (cb / tb * 100) if tb else 0
|
||||
pct_text = "全部覆盖" if imp else (f"{pct_dec:.1f}%" if tb else "—")
|
||||
implied_text = f'(+{ib - cb} 推断)' if ib > cb else ''
|
||||
branch_text = "—" if imp else f"{cb}/{tb}"
|
||||
cond_text = f"{cc}/{tc}" if tc else "—"
|
||||
bar_pct = int(pct_dec)
|
||||
|
||||
# 进度条颜色
|
||||
if imp or pct_dec >= 100:
|
||||
bar_cls = ''
|
||||
elif pct_dec >= 80:
|
||||
bar_cls = ' amber'
|
||||
else:
|
||||
bar_cls = ' red'
|
||||
|
||||
# 状态徽标
|
||||
if tb == 0 or (cb == tb and not (ib > cb)):
|
||||
badge = '<span class="badge badge-pass">✓ 完全</span>'
|
||||
elif cb == tb and ib > cb:
|
||||
badge = '<span class="badge badge-warn">○ 推断</span>'
|
||||
elif pct_dec >= 80:
|
||||
badge = '<span class="badge badge-warn">⚠ 不足</span>'
|
||||
else:
|
||||
badge = '<span class="badge badge-fail">✗ 欠缺</span>'
|
||||
|
||||
# 条件覆盖数字颜色
|
||||
if tc:
|
||||
cond_pct = cc / tc * 100
|
||||
cond_color = 'num-green' if cond_pct == 100 else ('num-amber' if cond_pct >= 80 else 'num-red')
|
||||
cond_display = f'<span class="cond-cell {cond_color}">{cond_text}</span>'
|
||||
else:
|
||||
cond_display = '<span class="cond-cell" style="color:#b0bec5">—</span>'
|
||||
|
||||
row_class = 'row-imperfect' if cb < tb else ''
|
||||
rows.append(f'''<tr class="{row_class}" data-cov="{pct_dec}">
|
||||
<td class="prog-name"><a href="{href}">{name}</a></td>
|
||||
<td>{branch_text} {implied_text}</td>
|
||||
<td>{cond_display}</td>
|
||||
<td>
|
||||
<div class="prog-wrap">
|
||||
<div class="prog-bar">
|
||||
<div class="prog-fill{bar_cls}" style="width:{bar_pct}%">
|
||||
<span class="prog-label">{pct_text}</span>
|
||||
</div>
|
||||
</div>
|
||||
<span class="prog-text">{pct_text}</span>
|
||||
</div>
|
||||
</td>
|
||||
<td>{badge}</td>
|
||||
</tr>''')
|
||||
|
||||
dec_ring_svg = _ring_svg(agg_dec_pct, '')
|
||||
cond_ring_svg = _ring_svg(agg_cond_pct, '')
|
||||
|
||||
html = _INDEX_HTML.format(
|
||||
timestamp=timestamp,
|
||||
agg_dec_num=f"{agg_covered}/{agg_total}",
|
||||
dec_num_cls=dec_num_cls,
|
||||
agg_cond_num=f"{agg_ccovered}/{agg_ctotal}" if agg_ctotal else "无数据",
|
||||
cond_num_cls=cond_num_cls,
|
||||
prog_count=str(len(programs)),
|
||||
uncovered_num_cls=uncovered_num_cls,
|
||||
uncovered_count=str(uncovered_count),
|
||||
dec_ring_svg=dec_ring_svg,
|
||||
cond_ring_svg=cond_ring_svg,
|
||||
rows='\n'.join(rows),
|
||||
)
|
||||
|
||||
outpath = Path(outdir) / 'coverage' / 'index.html'
|
||||
outpath.parent.mkdir(parents=True, exist_ok=True)
|
||||
outpath.write_text(html, encoding='utf-8')
|
||||
|
||||
|
||||
# ── PROCEDURE DIVISION 行范围定位(用于无分支程序标记)──
|
||||
|
||||
def _find_proc_range(raw_source: str):
|
||||
"""返回 PROCEDURE DIVISION 的行范围 (start_line, end_line) 1-indexed,或 None。"""
|
||||
lines = raw_source.splitlines()
|
||||
proc_start = None
|
||||
for i, line in enumerate(lines):
|
||||
if re.search(r'PROCEDURE\s+DIVISION', line.upper()):
|
||||
proc_start = i + 1
|
||||
break
|
||||
if proc_start is None:
|
||||
return None
|
||||
# 找下一个 DIVISION 作为结束边界(或文件尾)
|
||||
for i in range(proc_start, len(lines)):
|
||||
if re.search(r'(IDENTIFICATION|DATA|ENVIRONMENT)\s+DIVISION', lines[i].upper()):
|
||||
return (proc_start, i) # 不包含下一个 DIVISION
|
||||
return (proc_start, len(lines) + 1)
|
||||
|
||||
|
||||
# ── 接入入口 ──
|
||||
|
||||
def run_coverage(branch_tree, branch_paths_with_assigns, fields,
|
||||
raw_source, output_prefix, index_relpath=None):
|
||||
"""完整覆盖率流程:收集 → 标记 → 定位 → 输出。
|
||||
|
||||
Returns:
|
||||
dict: 汇总数据,用于总括页聚合
|
||||
"""
|
||||
decision_points, leaf_stats = collect_decision_points(branch_tree, fields)
|
||||
|
||||
mark_coverage(decision_points, leaf_stats, branch_paths_with_assigns, fields)
|
||||
|
||||
if raw_source:
|
||||
locate_decision_lines(decision_points, raw_source)
|
||||
|
||||
total = sum(len(dp.branch_names) for dp in decision_points)
|
||||
covered = sum(len(dp.active_branches) for dp in decision_points)
|
||||
implied = sum(len(dp.implied_branches) for dp in decision_points)
|
||||
leaf_covered = (sum(1 for l in leaf_stats if l.covered_true) +
|
||||
sum(1 for l in leaf_stats if l.covered_false))
|
||||
leaf_total = len(leaf_stats) * 2
|
||||
|
||||
# 无决策点但有路径 → PROCEDURE DIVISION 全部覆盖
|
||||
covered_lines = set()
|
||||
if total == 0 and branch_paths_with_assigns and raw_source:
|
||||
proc_range = _find_proc_range(raw_source)
|
||||
if proc_range:
|
||||
covered_lines.update(range(proc_range[0], proc_range[1]))
|
||||
total = 1
|
||||
covered = 1
|
||||
|
||||
if output_prefix:
|
||||
generate_html_report(decision_points, leaf_stats,
|
||||
raw_source.splitlines() if raw_source else [],
|
||||
f"{output_prefix}_coverage.html",
|
||||
Path(output_prefix).stem,
|
||||
index_relpath=index_relpath,
|
||||
covered_lines=covered_lines)
|
||||
|
||||
# 控制台摘要
|
||||
if total or leaf_total:
|
||||
logger.info(f"\n=== 分支覆盖率 ===")
|
||||
if covered_lines and not decision_points:
|
||||
logger.info(" 程序无分支结构,全部代码已覆盖")
|
||||
for dp in decision_points:
|
||||
branches = []
|
||||
for bn in dp.branch_names:
|
||||
if bn in dp.active_branches:
|
||||
branches.append(f'{bn} [x]')
|
||||
elif bn in dp.implied_branches:
|
||||
branches.append(f'{bn} [o]')
|
||||
else:
|
||||
branches.append(f'{bn} [ ]')
|
||||
ln = f":{dp.source_line}" if dp.source_line else ""
|
||||
logger.info(f" #{dp.id} [{dp.kind}] {dp.label}{ln}")
|
||||
logger.info(f" {' | '.join(branches)}")
|
||||
|
||||
if total:
|
||||
pct = covered / total * 100
|
||||
logger.info(f"\n 决策覆盖率:{covered}/{total}({pct:.1f}%)")
|
||||
if leaf_total:
|
||||
pct = leaf_covered / leaf_total * 100
|
||||
logger.info(f" 条件覆盖率:{leaf_covered}/{leaf_total}({pct:.1f}%)")
|
||||
|
||||
if output_prefix:
|
||||
logger.info(f"\n 覆盖率报告:{output_prefix}_coverage.html")
|
||||
|
||||
implicit_100 = bool(covered_lines)
|
||||
return {
|
||||
'name': Path(output_prefix).stem if output_prefix else '',
|
||||
'detail_relpath': ('../' + Path(output_prefix).stem + '_coverage.html'
|
||||
if output_prefix else ''),
|
||||
'total_branches': total,
|
||||
'covered_branches': covered,
|
||||
'implied_branches': implied,
|
||||
'implicit_100': implicit_100,
|
||||
'total_conditions': leaf_total,
|
||||
'covered_conditions': leaf_covered,
|
||||
'_decision_points': decision_points,
|
||||
'_leaf_stats': leaf_stats,
|
||||
}
|
||||
|
||||
|
||||
def check_coverage(structure: dict, test_records: list[dict]) -> dict:
|
||||
"""报告 COBOL 源码的静态分支结构信息。
|
||||
|
||||
注意: 静态分析无法精确判断每条测试数据运行时覆盖了哪些分支。
|
||||
精确的路径追踪依赖 gcov(Phase 3)。此处仅报告总分支数和记录生成情况。
|
||||
|
||||
Returns:
|
||||
dict with: paragraph_rate, branch_rate, decision_rate, total_branches,
|
||||
total_paragraphs, records_count, note
|
||||
"""
|
||||
total_paragraphs = structure.get("total_paragraphs", 0)
|
||||
total_branches = structure.get("total_branches", 0)
|
||||
decision_points = structure.get("decision_points", [])
|
||||
has_data = len(test_records) > 0
|
||||
|
||||
paragraph_rate = 1.0 if (total_paragraphs > 0 and has_data) else 0.0
|
||||
|
||||
return {
|
||||
"paragraph_rate": paragraph_rate,
|
||||
"branch_rate": 0.0,
|
||||
"decision_rate": 0.0,
|
||||
"uncovered_decision_ids": [],
|
||||
"total_branches": total_branches,
|
||||
"total_paragraphs": total_paragraphs,
|
||||
"records_count": len(test_records),
|
||||
"note": "静态分析无法精确计算覆盖率。精确数据通过 gcov 获取(Phase 3)。",
|
||||
}
|
||||
@@ -20,6 +20,11 @@ class Config:
|
||||
num_records: int = 1000
|
||||
branch_pass: float = 0.80
|
||||
max_llm_cost: float = 0.50
|
||||
quality_gate_mode: str = "warn"
|
||||
quality_gate_decision_threshold: float = 0.90
|
||||
quality_gate_paragraph_threshold: float = 1.0
|
||||
gcov_enabled: bool = False
|
||||
max_quality_retries: int = 4
|
||||
|
||||
@classmethod
|
||||
def from_toml(cls, path="aurak.toml"):
|
||||
|
||||
@@ -28,6 +28,15 @@ class VerificationRun:
|
||||
field_results: list[FieldResult] = field(default_factory=list)
|
||||
runner: str = "native"
|
||||
branch_rate: float = 0.0
|
||||
paragraph_rate: float = 0.0 # 段落覆盖率
|
||||
decision_rate: float = 0.0 # 决策点覆盖率
|
||||
hina_type: str = "" # HINA 类型
|
||||
hina_confidence: float = 0.0 # HINA 确信度
|
||||
quality_score: float = 0.0 # 质量评分
|
||||
quality_warn: str = "" # 质量警告信息
|
||||
heal_retry: int = 0 # 自愈重试次数
|
||||
simple_retry: int = 0 # 朴素重试次数
|
||||
total_retry: int = 0 # 总重试次数
|
||||
llm_cost: float = 0.0
|
||||
report_path: str = ""
|
||||
debug: dict = field(default_factory=dict)
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
# HINA 程序分类与质量门禁包
|
||||
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
HINA 程序分类器 — L1 关键字规则 + 确信度计算。
|
||||
|
||||
通过 COBOL 源码中的关键字匹配进行程序分类,支持多级确信度判定。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
# ── L1 规则 ──────────────────────────────────────────────────────────────
|
||||
# 格式: (分类名称, [关键字列表], 置信度阈值)
|
||||
L1_RULES: list[tuple[str, list[str], float]] = [
|
||||
("DB操作", ["EXEC SQL"], 0.95),
|
||||
("子程序调用", ["CALL", "LINKAGE SECTION"], 0.90),
|
||||
("IS INITIAL", ["IS INITIAL"], 0.99),
|
||||
("SYSIN", ["SYSIN"], 0.90),
|
||||
("编码转换", ["ALPHABETIC", "ASCII", "EBCDIC"], 0.85),
|
||||
("online", ["DFHCOMMAREA", "MAP"], 0.95),
|
||||
("SORT", ["SORT ON KEY"], 0.95),
|
||||
("MERGE", ["MERGE ON KEY"], 0.95),
|
||||
("编辑输出", ["WRITE AFTER", "WRITE BEFORE"], 0.80),
|
||||
("文件编成", ["ORGANIZATION IS"], 0.99),
|
||||
("替代索引", ["ALTERNATE RECORD KEY"], 0.99),
|
||||
]
|
||||
|
||||
# ── 冲突解决规则 ─────────────────────────────────────────────────────────
|
||||
# 当 L1 匹配到多个分类时的消歧策略:
|
||||
# value = "file_count" → 取测试数更多的分类
|
||||
# value = "has_accumulator" → 取包含累加器的分类
|
||||
CONFLICT_RULES: dict[tuple[str, str], str] = {
|
||||
("マッチング", "キーブレイク"): "file_count",
|
||||
("編集処理", "項目チェック"): "file_count",
|
||||
("キーブレイク", "項目チェック(重複)"): "has_accumulator",
|
||||
}
|
||||
|
||||
|
||||
# ── 关键字检测 ───────────────────────────────────────────────────────────
|
||||
def detect_keyword(source: str) -> list[tuple[str, float, str]]:
|
||||
"""在 COBOL 源码中搜索 L1_RULES 定义的关键字,返回匹配结果。
|
||||
|
||||
Args:
|
||||
source: COBOL 程序源码文本。
|
||||
|
||||
Returns:
|
||||
list[tuple[str, float, str]]:
|
||||
每个元素为 (分类名称, 置信度, 匹配到的关键字原文)。
|
||||
"""
|
||||
results: list[tuple[str, float, str]] = []
|
||||
source_upper = source.upper()
|
||||
|
||||
for category, keywords, confidence in L1_RULES:
|
||||
for kw in keywords:
|
||||
if kw in source_upper:
|
||||
results.append((category, confidence, kw))
|
||||
break # 同一分类只记录一次
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# ── 确信度计算 ───────────────────────────────────────────────────────────
|
||||
def compute_confidence(
|
||||
source: str,
|
||||
structure: dict[str, Any] | None = None,
|
||||
llm_result: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""计算程序分类的确信度。
|
||||
|
||||
优先级:
|
||||
1. L1 关键字命中,且最高置信度 >= 0.90 → 直接返回 L1 结果。
|
||||
2. LLM 结果存在 → 使用 LLM 的分类结果。
|
||||
3. 否则 → 返回 unknown。
|
||||
|
||||
Args:
|
||||
source: COBOL 程序源码文本。
|
||||
structure: 可选的程序结构信息(暂未使用,保留扩展)。
|
||||
llm_result: 可选的 LLM 分类结果。
|
||||
预期格式: {"category": str, "confidence": float, ...}
|
||||
|
||||
Returns:
|
||||
dict:
|
||||
- "category": str — 分类名称或 "unknown"
|
||||
- "confidence": float — 确信度 (0.0 ~ 1.0)
|
||||
- "source": str — 结果来源 ("l1" / "llm" / "unknown")
|
||||
- "matches": list — 匹配到的关键字详情
|
||||
"""
|
||||
# ── 1. L1 关键字检测 ──
|
||||
matches = detect_keyword(source)
|
||||
|
||||
# 找出最高置信度的 L1 匹配
|
||||
if matches:
|
||||
best = max(matches, key=lambda m: m[1]) # (category, confidence, keyword)
|
||||
category, confidence, _ = best
|
||||
|
||||
if confidence >= 0.90:
|
||||
return {
|
||||
"category": category,
|
||||
"confidence": confidence,
|
||||
"source": "l1",
|
||||
"matches": matches,
|
||||
}
|
||||
|
||||
# ── 2. LLM 结果 ──
|
||||
if llm_result is not None:
|
||||
llm_category = llm_result.get("category", "unknown")
|
||||
llm_confidence = llm_result.get("confidence", 0.0)
|
||||
return {
|
||||
"category": llm_category,
|
||||
"confidence": llm_confidence,
|
||||
"source": "llm",
|
||||
"matches": matches,
|
||||
}
|
||||
|
||||
# ── 3. 未知 ──
|
||||
return {
|
||||
"category": "unknown",
|
||||
"confidence": 0.0,
|
||||
"source": "unknown",
|
||||
"matches": [],
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
"""
|
||||
质量门禁 — 执行前检查测试数据是否满足覆盖率和边界要求。
|
||||
|
||||
Phase 1 可用: 决策点覆盖、段落覆盖
|
||||
Phase 2 启用: HINA 必须项、字段覆盖
|
||||
"""
|
||||
|
||||
|
||||
def check(
|
||||
complete_tests: list,
|
||||
hina_result: dict,
|
||||
coverage: dict,
|
||||
decision_threshold: float = 0.90,
|
||||
paragraph_threshold: float = 1.0,
|
||||
) -> dict:
|
||||
"""质量门禁检查。
|
||||
|
||||
Args:
|
||||
complete_tests: 完整的测试数据集
|
||||
hina_result: HINA 分类结果
|
||||
coverage: check_coverage() 输出的覆盖率数据
|
||||
decision_threshold: 决策点覆盖率阈值
|
||||
paragraph_threshold: 段落覆盖率阈值
|
||||
|
||||
Returns:
|
||||
dict with: passed, score, issues
|
||||
"""
|
||||
issues = {}
|
||||
|
||||
branch_rate = coverage.get("branch_rate", 0.0)
|
||||
if branch_rate < decision_threshold:
|
||||
issues["decision_gaps"] = coverage.get("uncovered_decision_ids", [])
|
||||
|
||||
paragraph_rate = coverage.get("paragraph_rate", 0.0)
|
||||
if paragraph_rate < paragraph_threshold:
|
||||
issues.setdefault("paragraph_gaps", []).append(
|
||||
f"段落覆盖率不足: {paragraph_rate:.0%}"
|
||||
)
|
||||
|
||||
if not complete_tests:
|
||||
issues["no_data"] = True
|
||||
|
||||
passed = len(issues) == 0
|
||||
score = _compute_score(coverage, hina_result)
|
||||
|
||||
return {"passed": passed, "score": score, "issues": issues}
|
||||
|
||||
|
||||
def _compute_score(coverage: dict, hina_result: dict) -> float:
|
||||
"""质量评分公式(COBOL 版)。
|
||||
|
||||
评分 = 覆盖质量 × 0.6 + 边界质量 × 0.4
|
||||
覆盖质量 = 段落覆盖率 × 0.5 + 分支覆盖率 × 0.5
|
||||
边界质量 = HINA 必须项覆盖率(Phase 2 后启用,默认 1.0)
|
||||
"""
|
||||
paragraph_rate = coverage.get("paragraph_rate", 0.0)
|
||||
branch_rate = coverage.get("branch_rate", 0.0)
|
||||
|
||||
coverage_quality = paragraph_rate * 0.5 + branch_rate * 0.5
|
||||
boundary_quality = 1.0
|
||||
|
||||
return round(coverage_quality * 0.6 + boundary_quality * 0.4, 2)
|
||||
@@ -0,0 +1,280 @@
|
||||
"""
|
||||
HINA 混淆组判定 — 基于 LLM 的 COBOL 程序结构分类。
|
||||
|
||||
根据 extract_structure() 输出的结构特征,调用 LLM 将程序归类到
|
||||
混淆组(confusion group),并返回分类结果和策略参数。
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CONFUSION_PROMPT = """你是一个 COBOL 程序混淆组分类专家。请根据以下程序结构特征,将其归类到合适的混淆组中。
|
||||
|
||||
程序结构特征:
|
||||
- 段落数: {paragraph_count}
|
||||
- 决策点总数: {decision_count}
|
||||
- IF 语句数: {if_count}
|
||||
- EVALUATE 语句数: {evaluate_count}
|
||||
- 关联文件数: {file_count}
|
||||
- OPEN 方向: {open_directions}
|
||||
- SEARCH ALL: {has_search_all}
|
||||
- CALL 语句: {has_call}
|
||||
- KEY BREAK 关键词: {has_break}
|
||||
- 总分支数: {total_branches}
|
||||
|
||||
混淆组定义:
|
||||
1. simple_sequential — 极少决策点(<=2),无 EVALUATE/SEARCH ALL/CALL,直接顺序执行
|
||||
2. condition_heavy — IF 语句占比高(>60% 的决策点),嵌套深,逻辑复杂
|
||||
3. evaluate_driven — EVALUATE 主导,多分支选择结构
|
||||
4. data_file_centric — 文件操作密集(>=2 文件),OPEN 方向多样(I-O/OUTPUT/INPUT)
|
||||
5. search_intensive — 包含 SEARCH ALL,表/数组查找为主
|
||||
6. call_based — 包含 CALL 语句,模块间调用为主
|
||||
7. mixed_complex — 同时具备多种复杂特征(决策点多且文件多且含 CALL/SEARCH 等)
|
||||
|
||||
请按 JSON 格式输出分类结果,不要包含其他文字:
|
||||
|
||||
```json
|
||||
{{
|
||||
"category": "<混淆组类别>",
|
||||
"subtype": "<子类别,如 nested_if / flat_evaluate / multi_file 等>",
|
||||
"confidence": <0~1 置信度>,
|
||||
"features": {{
|
||||
"paragraph_count": {paragraph_count},
|
||||
"decision_count": {decision_count},
|
||||
"if_count": {if_count},
|
||||
"evaluate_count": {evaluate_count},
|
||||
"file_count": {file_count},
|
||||
"has_search_all": {has_search_all},
|
||||
"has_call": {has_call},
|
||||
"has_break": {has_break},
|
||||
"total_branches": {total_branches}
|
||||
}},
|
||||
"required_tests": <建议测试用例数,整数>,
|
||||
"strategy_params": {{
|
||||
"max_nesting_depth": <最大嵌套深度建议>,
|
||||
"coverage_target": "branch" 或 "path",
|
||||
"file_isolation": true 或 false,
|
||||
"supplement_strategy": "incremental" 或 "full" 或 "skip"
|
||||
}}
|
||||
}}
|
||||
```"""
|
||||
|
||||
|
||||
def classify_with_llm(structure: dict, llm) -> dict:
|
||||
"""调用 LLM 对程序结构进行混淆组分类。
|
||||
|
||||
根据 extract_structure() 返回的结构字典,构造 CONFUSION_PROMPT
|
||||
并调用 LLM 进行分类。结果包含 category、subtype、confidence、
|
||||
features、required_tests、strategy_params。
|
||||
|
||||
Args:
|
||||
structure: extract_structure() 返回的字典,包含 paragraphs、
|
||||
decision_points、file_count、open_directions、
|
||||
has_search_all、has_evaluate、has_call、has_break、
|
||||
total_branches、total_paragraphs 等字段。
|
||||
llm: LLMClient 实例,call 方法签名为
|
||||
llm.call([{"role":"system","content":"..."},
|
||||
{"role":"user","content":prompt}]) -> str
|
||||
|
||||
Returns:
|
||||
dict: {
|
||||
"category": str,
|
||||
"subtype": str,
|
||||
"confidence": float,
|
||||
"features": dict,
|
||||
"required_tests": int,
|
||||
"strategy_params": dict
|
||||
}
|
||||
"""
|
||||
decision_points = structure.get("decision_points", [])
|
||||
if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
|
||||
evaluate_count = sum(1 for dp in decision_points if dp.get("kind") == "EVALUATE")
|
||||
|
||||
paragraph_count = structure.get("total_paragraphs", len(structure.get("paragraphs", [])))
|
||||
open_dirs = structure.get("open_directions", {})
|
||||
|
||||
has_search_all = str(structure.get("has_search_all", False)).lower()
|
||||
has_call = str(structure.get("has_call", False)).lower()
|
||||
has_break = str(structure.get("has_break", False)).lower()
|
||||
|
||||
prompt = CONFUSION_PROMPT.format(
|
||||
paragraph_count=paragraph_count,
|
||||
decision_count=len(decision_points),
|
||||
if_count=if_count,
|
||||
evaluate_count=evaluate_count,
|
||||
file_count=structure.get("file_count", 0),
|
||||
open_directions=json.dumps(open_dirs, ensure_ascii=False),
|
||||
has_search_all=has_search_all,
|
||||
has_call=has_call,
|
||||
has_break=has_break,
|
||||
total_branches=structure.get("total_branches", 0),
|
||||
)
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": "你是一个 COBOL 程序混淆组分类专家。只输出 JSON,不要输出解释。"},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
|
||||
try:
|
||||
raw = llm.call(messages)
|
||||
result = _parse_llm_response(raw)
|
||||
logger.info(
|
||||
"HINA classification: %s/%s (confidence=%.2f, tests=%s)",
|
||||
result.get("category", "?"),
|
||||
result.get("subtype", "?"),
|
||||
result.get("confidence", 0.0),
|
||||
result.get("required_tests", "?"),
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.warning("HINA LLM classification failed: %s", e)
|
||||
return _fallback_classification(structure)
|
||||
|
||||
|
||||
def _parse_llm_response(raw: str) -> dict:
|
||||
"""从 LLM 响应中提取 JSON 并解析。
|
||||
|
||||
处理 JSON 可能被 ```json ... ``` 包裹的情况。
|
||||
"""
|
||||
text = raw.strip()
|
||||
|
||||
# 尝试提取 ```json ... ``` 代码块
|
||||
if "```json" in text:
|
||||
start = text.index("```json") + 7
|
||||
end = text.index("```", start) if "```" in text[start:] else len(text)
|
||||
text = text[start:end].strip()
|
||||
elif "```" in text:
|
||||
# 尝试 ``` ... ``` (无 json 标注)
|
||||
start = text.index("```") + 3
|
||||
end = text.index("```", start) if "```" in text[start:] else len(text)
|
||||
text = text[start:end].strip()
|
||||
|
||||
parsed = json.loads(text)
|
||||
return _validate_result(parsed)
|
||||
|
||||
|
||||
def _validate_result(parsed: dict) -> dict:
|
||||
"""验证并规范化 LLM 返回的分类结果。"""
|
||||
defaults = {
|
||||
"category": "unknown",
|
||||
"subtype": "",
|
||||
"confidence": 0.0,
|
||||
"features": {},
|
||||
"required_tests": 1,
|
||||
"strategy_params": {
|
||||
"max_nesting_depth": 1,
|
||||
"coverage_target": "branch",
|
||||
"file_isolation": False,
|
||||
"supplement_strategy": "full",
|
||||
},
|
||||
}
|
||||
|
||||
result = {}
|
||||
for key, default_value in defaults.items():
|
||||
value = parsed.get(key, default_value)
|
||||
if key == "confidence":
|
||||
try:
|
||||
value = float(value)
|
||||
value = max(0.0, min(1.0, value))
|
||||
except (ValueError, TypeError):
|
||||
value = 0.0
|
||||
elif key == "required_tests":
|
||||
try:
|
||||
value = int(value)
|
||||
value = max(1, value)
|
||||
except (ValueError, TypeError):
|
||||
value = 1
|
||||
result[key] = value
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _fallback_classification(structure: dict) -> dict:
|
||||
"""当 LLM 调用失败时,基于规则的兜底分类。"""
|
||||
decision_points = structure.get("decision_points", [])
|
||||
if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
|
||||
evaluate_count = sum(1 for dp in decision_points if dp.get("kind") == "EVALUATE")
|
||||
total_decisions = len(decision_points)
|
||||
file_count = structure.get("file_count", 0)
|
||||
has_search_all = structure.get("has_search_all", False)
|
||||
has_call = structure.get("has_call", False)
|
||||
has_break = structure.get("has_break", False)
|
||||
|
||||
# 规则优先级:从高到低
|
||||
if total_decisions == 0:
|
||||
category, subtype = "simple_sequential", "no_branch"
|
||||
required_tests = 1
|
||||
strategy = {"max_nesting_depth": 0, "coverage_target": "branch",
|
||||
"file_isolation": False, "supplement_strategy": "skip"}
|
||||
elif has_search_all:
|
||||
category, subtype = "search_intensive", "table_lookup"
|
||||
required_tests = max(total_decisions, 3)
|
||||
strategy = {"max_nesting_depth": 3, "coverage_target": "path",
|
||||
"file_isolation": True, "supplement_strategy": "incremental"}
|
||||
elif has_call:
|
||||
category, subtype = "call_based", "external_call"
|
||||
required_tests = max(total_decisions, 3)
|
||||
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
|
||||
"file_isolation": False, "supplement_strategy": "full"}
|
||||
elif evaluate_count > if_count and evaluate_count >= 2:
|
||||
category, subtype = "evaluate_driven", "multi_way"
|
||||
required_tests = total_decisions + 1
|
||||
strategy = {"max_nesting_depth": evaluate_count, "coverage_target": "path",
|
||||
"file_isolation": False, "supplement_strategy": "full"}
|
||||
elif file_count >= 2:
|
||||
category, subtype = "data_file_centric", "multi_file"
|
||||
required_tests = max(total_decisions, file_count * 2)
|
||||
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
|
||||
"file_isolation": True, "supplement_strategy": "incremental"}
|
||||
elif if_count >= 5 or total_decisions >= 8:
|
||||
category, subtype = "condition_heavy", "nested_if"
|
||||
required_tests = total_decisions + 2
|
||||
strategy = {"max_nesting_depth": 4, "coverage_target": "path",
|
||||
"file_isolation": False, "supplement_strategy": "incremental"}
|
||||
elif if_count >= 2:
|
||||
category, subtype = "condition_heavy", "simple_if"
|
||||
required_tests = total_decisions + 1
|
||||
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
|
||||
"file_isolation": False, "supplement_strategy": "incremental"}
|
||||
else:
|
||||
category, subtype = "simple_sequential", "minimal"
|
||||
required_tests = 1
|
||||
strategy = {"max_nesting_depth": 0, "coverage_target": "branch",
|
||||
"file_isolation": False, "supplement_strategy": "skip"}
|
||||
|
||||
# 检查是否应升级为 mixed_complex
|
||||
complexity_flags = sum([
|
||||
has_search_all,
|
||||
has_call,
|
||||
has_break,
|
||||
file_count >= 2,
|
||||
if_count >= 5,
|
||||
evaluate_count >= 3,
|
||||
])
|
||||
if complexity_flags >= 3:
|
||||
category, subtype = "mixed_complex", f"{subtype}_plus"
|
||||
required_tests = max(required_tests, 10)
|
||||
strategy["max_nesting_depth"] = max(strategy.get("max_nesting_depth", 2), 5)
|
||||
strategy["coverage_target"] = "path"
|
||||
strategy["supplement_strategy"] = "full"
|
||||
|
||||
return {
|
||||
"category": category,
|
||||
"subtype": subtype,
|
||||
"confidence": 0.6,
|
||||
"features": {
|
||||
"paragraph_count": structure.get("total_paragraphs", len(structure.get("paragraphs", []))),
|
||||
"decision_count": total_decisions,
|
||||
"if_count": if_count,
|
||||
"evaluate_count": evaluate_count,
|
||||
"file_count": file_count,
|
||||
"has_search_all": has_search_all,
|
||||
"has_call": has_call,
|
||||
"has_break": has_break,
|
||||
"total_branches": structure.get("total_branches", 0),
|
||||
},
|
||||
"required_tests": required_tests,
|
||||
"strategy_params": strategy,
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
"""
|
||||
分层重试 — 部署在 orchestrator 调用者层(main.py / worker.py)。
|
||||
"""
|
||||
import logging
|
||||
import os
|
||||
from typing import Callable
|
||||
from data.diff_result import VerificationRun
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HEALING_FIXES = {
|
||||
"compile_error": {
|
||||
"detect": lambda log: "not found" in (log or "").lower(),
|
||||
"fix": lambda: _try_set_env(
|
||||
"COB_LIBRARY_PATH",
|
||||
"D:\\360安全浏览器下载\\GC32-BDB-SP1-rename-7z-to-exe\\lib\\gnucobol",
|
||||
),
|
||||
},
|
||||
"s0c7": {
|
||||
"detect": lambda log: "S0C7" in (log or ""),
|
||||
"fix": lambda: logger.warning("[Retry] S0C7 需要人工修正测试数据中的数值字段"),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _try_set_env(key: str, value: str) -> None:
|
||||
"""尝试设置环境变量(如果当前未设置)"""
|
||||
if not os.environ.get(key):
|
||||
os.environ[key] = value
|
||||
logger.info(f"[Retry] 已设置环境变量 {key}={value}")
|
||||
else:
|
||||
logger.info(f"[Retry] {key} 已存在,跳过")
|
||||
|
||||
|
||||
class RetryHandler:
|
||||
def __init__(self, max_heal: int = 2, max_simple: int = 3):
|
||||
self.max_heal = max_heal
|
||||
self.max_simple = max_simple
|
||||
self.heal_count = 0
|
||||
self.simple_count = 0
|
||||
self.history: list[VerificationRun] = []
|
||||
|
||||
def run(self, pipeline_fn: Callable[[], VerificationRun]) -> VerificationRun:
|
||||
while (self.heal_count + self.simple_count) < (self.max_heal + self.max_simple):
|
||||
vr = pipeline_fn()
|
||||
self.history.append(vr)
|
||||
|
||||
if vr.status in ("PASS", "QUALITY_WARN"):
|
||||
vr.heal_retry = self.heal_count
|
||||
vr.simple_retry = self.simple_count
|
||||
vr.total_retry = self.heal_count + self.simple_count
|
||||
return vr
|
||||
|
||||
if vr.status in ("BLOCKED", "ERROR") and self.heal_count < self.max_heal:
|
||||
build_log = vr.debug.get("cobol_build", {}).get("log", "")
|
||||
healed = False
|
||||
for name, fix_def in HEALING_FIXES.items():
|
||||
if fix_def["detect"](build_log):
|
||||
fix_def["fix"]()
|
||||
self.heal_count += 1
|
||||
healed = True
|
||||
logger.info(
|
||||
f"[Retry] 自愈修复应用: {name} "
|
||||
f"(heal_retry={self.heal_count})"
|
||||
)
|
||||
break
|
||||
if healed:
|
||||
continue
|
||||
|
||||
self.simple_count += 1
|
||||
logger.info(f"[Retry] 朴素重试 (simple_retry={self.simple_count})")
|
||||
|
||||
logger.error("[Retry] 重试次数超过上限,标记 FATAL")
|
||||
vr = self.history[-1] if self.history else VerificationRun(
|
||||
status="FATAL", exit_code=4
|
||||
)
|
||||
vr.status = "FATAL"
|
||||
vr.exit_code = 4
|
||||
vr.heal_retry = self.heal_count
|
||||
vr.simple_retry = self.simple_count
|
||||
vr.total_retry = self.heal_count + self.simple_count
|
||||
return vr
|
||||
@@ -0,0 +1,103 @@
|
||||
"""
|
||||
HINA 策略模板 — 根据程序分类定义必须的测试项和边界条件。
|
||||
|
||||
Task 2.2: 必须项模板 + supplement 函数
|
||||
"""
|
||||
|
||||
STRATEGY_TEMPLATES: dict[str, dict] = {
|
||||
"マッチング": {
|
||||
"required": [
|
||||
"COM-N001", "COM-N002", "COM-A002", "COM-A003",
|
||||
"MT-N001", "MT-N002", "MT-N004", "MT-N005", "MT-N006",
|
||||
],
|
||||
"boundary": ["MT-B001", "MT-B002"],
|
||||
},
|
||||
"キーブレイク": {
|
||||
"required": [
|
||||
"COM-N001", "COM-A002",
|
||||
"KB-N001", "KB-N004", "KB-N005", "KB-A001",
|
||||
],
|
||||
"boundary": ["KB-B001", "KB-B002"],
|
||||
},
|
||||
"条件分岐": {
|
||||
"required": [
|
||||
"B-N001", "B-N003", "B-N006", "B-N009",
|
||||
],
|
||||
},
|
||||
"内部表検索": {
|
||||
"required": [
|
||||
"T-N001", "T-N002", "T-A001", "T-A002",
|
||||
],
|
||||
},
|
||||
"項目チェック": {
|
||||
"required": [
|
||||
"VF-N001", "VF-N002", "VF-N004", "VF-A001",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def get_strategy(hina_type: str) -> dict:
|
||||
"""返回对应 HINA 类型的策略模板。
|
||||
|
||||
Args:
|
||||
hina_type: HINA 程序分类名称(如 "マッチング")。
|
||||
|
||||
Returns:
|
||||
dict: required 列表及可选的 boundary 列表。
|
||||
未知类型返回空模板 {"required": [], "boundary": []}。
|
||||
"""
|
||||
return STRATEGY_TEMPLATES.get(hina_type, {"required": [], "boundary": []})
|
||||
|
||||
|
||||
def _make_marker(code: str, prefix: str = "REQ") -> dict:
|
||||
"""生成一条标记记录。"""
|
||||
return {
|
||||
"id": f"{prefix}-{code}",
|
||||
"coverage_targets": [code],
|
||||
"fields": {},
|
||||
}
|
||||
|
||||
|
||||
def supplement(base_tests: list[dict], hina_result: dict) -> list[dict]:
|
||||
"""根据 HINA 类型追加模板中的必须项标记记录。
|
||||
|
||||
从 ``hina_result["category"]`` 获取分类,查找对应的策略模板,
|
||||
将模板中所有的 required 和 boundary 项以标记记录形式追加到测试列表末尾。
|
||||
|
||||
Args:
|
||||
base_tests: 已有的测试数据列表(每个元素为 dict)。
|
||||
hina_result: HINA 分类结果,至少包含 ``{"category": str}``。
|
||||
|
||||
Returns:
|
||||
list[dict]: 追加必须项标记记录后的完整测试列表。
|
||||
"""
|
||||
hina_type = hina_result.get("category", "unknown")
|
||||
template = get_strategy(hina_type)
|
||||
result = list(base_tests)
|
||||
|
||||
for code in template.get("required", []):
|
||||
result.append(_make_marker(code))
|
||||
|
||||
for code in template.get("boundary", []):
|
||||
result.append(_make_marker(code, prefix="BND"))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def supplement_only(base_tests: list[dict], hina_gaps: list[str]) -> list[dict]:
|
||||
"""增量补充指定必须项的标记记录。
|
||||
|
||||
根据传入的 code 列表(而不是从模板查找),只追加缺失的那些必须项标记。
|
||||
|
||||
Args:
|
||||
base_tests: 已有的测试数据列表(每个元素为 dict)。
|
||||
hina_gaps: 需要补充的 HINA 必须项 code 列表。
|
||||
|
||||
Returns:
|
||||
list[dict]: 追加标记记录后的完整测试列表。
|
||||
"""
|
||||
result = list(base_tests)
|
||||
for code in hina_gaps:
|
||||
result.append(_make_marker(code))
|
||||
return result
|
||||
@@ -15,6 +15,9 @@ def main():
|
||||
p.add_argument("--verbose", action="store_true")
|
||||
p.add_argument("--dry-run", action="store_true")
|
||||
p.add_argument("--output-dir", default="./reports")
|
||||
p.add_argument("--quality-gate-mode", choices=["warn", "off"], default="warn",
|
||||
help="质量门禁模式: warn=记录警告, off=关闭")
|
||||
p.add_argument("--gcov", action="store_true", help="启用 gcov 覆盖率采集")
|
||||
args = p.parse_args()
|
||||
|
||||
if args.dry_run:
|
||||
@@ -35,6 +38,8 @@ def main():
|
||||
c.runner_mode = args.runner
|
||||
c.coverage_default = args.coverage
|
||||
c.tolerance = args.tolerance
|
||||
c.quality_gate_mode = args.quality_gate_mode
|
||||
c.gcov_enabled = args.gcov
|
||||
vr = run_pipeline(c, args.copybook, args.cobol_src, args.java_src, args.mapping)
|
||||
t = vr.fields_matched + vr.fields_mismatched
|
||||
print(f"{vr.program}: {vr.status} ({vr.fields_matched}/{t}, {vr.duration_s:.0f}s)" if t else f"{vr.program}: {vr.status}")
|
||||
|
||||
+72
-2
@@ -1,7 +1,7 @@
|
||||
import shutil, time
|
||||
import shutil, time, logging
|
||||
from pathlib import Path
|
||||
from data.field_tree import FieldTree
|
||||
from data.test_case import TestSuite, SparkConfig
|
||||
from data.test_case import TestSuite, SparkConfig, TestCase
|
||||
from data.diff_result import VerificationRun, FieldResult
|
||||
from runners.runner import Runner
|
||||
from runners.native_java_runner import NativeJavaRunner
|
||||
@@ -18,6 +18,14 @@ from comparator.cobol_binary_reader import CobolBinaryReader
|
||||
from report.generator import ReportGenerator
|
||||
from storage.bundle import TestDataBundle
|
||||
from config import Config
|
||||
from cobol_testgen import extract_structure, generate_data, incremental_supplement
|
||||
from cobol_testgen.coverage import check_coverage
|
||||
from hina.gate import check as gate_check
|
||||
from hina.classifier import compute_confidence
|
||||
from hina.hina_agent import classify_with_llm
|
||||
from hina.strategy import supplement as strategy_supplement
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) -> VerificationRun:
|
||||
@@ -40,6 +48,68 @@ def run_pipeline(cfg: Config, cpath: str, cbl: str, java: str, map_path: str) ->
|
||||
if vr.llm_cost > cfg.max_llm_cost:
|
||||
return _done(vr, t0, "BLOCKED", 3)
|
||||
|
||||
# ── Phase 1+2: cobol_testgen + HINA Agent + 策略 Agent + 质量门禁 ──
|
||||
try:
|
||||
cobol_src_text = Path(cbl).read_text(encoding="utf-8")
|
||||
structure = extract_structure(cobol_src_text)
|
||||
|
||||
# HINA Agent 类型判定
|
||||
hina_result = {}
|
||||
try:
|
||||
hina_result = compute_confidence(cobol_src_text, structure)
|
||||
if hina_result.get("confidence", 0) < 0.7 and structure:
|
||||
llm_hina = classify_with_llm(structure, llm)
|
||||
if llm_hina.get("confidence", 0) > hina_result.get("confidence", 0):
|
||||
hina_result = llm_hina
|
||||
vr.hina_type = hina_result.get("category", "")
|
||||
vr.hina_confidence = hina_result.get("confidence", 0.0)
|
||||
vr.debug["hina_result"] = hina_result
|
||||
except Exception as e:
|
||||
vr.debug["hina_agent_error"] = str(e)
|
||||
logger.warning(f"[orchestrator] HINA Agent 判定失败: {e}")
|
||||
|
||||
# cobol_testgen 路径枚举 + 基础数据生成
|
||||
base_records = generate_data(cobol_src_text, structure)
|
||||
vr.debug["cobol_testgen_records"] = len(base_records)
|
||||
vr.debug["total_branches"] = structure.get("total_branches", 0)
|
||||
|
||||
base_testcases = []
|
||||
for i, rec in enumerate(base_records):
|
||||
base_testcases.append(TestCase(id=f"CTG-{i+1:04d}", fields=dict(rec)))
|
||||
|
||||
# 策略 Agent 补充
|
||||
strategy_tests = strategy_supplement(base_testcases, hina_result)
|
||||
complete_tests = base_testcases + strategy_tests
|
||||
|
||||
# 质量门禁循环
|
||||
cov = check_coverage(structure, base_records)
|
||||
for attempt in range(cfg.max_quality_retries):
|
||||
gate_result = gate_check(
|
||||
complete_tests, hina_result, cov,
|
||||
decision_threshold=cfg.quality_gate_decision_threshold,
|
||||
paragraph_threshold=cfg.quality_gate_paragraph_threshold,
|
||||
)
|
||||
if gate_result.get("passed"):
|
||||
break
|
||||
gaps = gate_result.get("issues", {}).get("decision_gaps", [])
|
||||
if gaps and structure.get("branch_tree_obj"):
|
||||
delta = incremental_supplement(structure["branch_tree_obj"], gaps)
|
||||
base_records.extend(delta)
|
||||
cov = check_coverage(structure, base_records)
|
||||
else:
|
||||
break
|
||||
|
||||
vr.paragraph_rate = cov.get("paragraph_rate", 0.0)
|
||||
vr.branch_rate = cov.get("branch_rate", 0.0)
|
||||
vr.decision_rate = cov.get("decision_rate", 0.0)
|
||||
|
||||
if cfg.quality_gate_mode != "off" and not gate_result.get("passed", True):
|
||||
vr.quality_warn = f"质量门禁未完全通过 (尝试 {attempt+1} 次)"
|
||||
vr.debug["quality_issues"] = gate_result.get("issues", {})
|
||||
except Exception as e:
|
||||
vr.debug["cobol_testgen_error"] = str(e)
|
||||
logger.warning(f"[orchestrator] cobol_testgen 分析失败: {e}")
|
||||
|
||||
suite = Agent2Data(llm).design(tree, cfg.coverage_default, cfg.runner_mode == "spark")
|
||||
vr.llm_cost += 0.002
|
||||
vr.debug["test_cases"] = [{"id":tc.id,"fields":tc.fields,"targets":tc.coverage_targets} for tc in suite.test_cases]
|
||||
|
||||
Reference in New Issue
Block a user