diff --git a/cobol_testgen/__init__.py b/cobol_testgen/__init__.py new file mode 100644 index 0000000..d54ca42 --- /dev/null +++ b/cobol_testgen/__init__.py @@ -0,0 +1,512 @@ +"""COBOL Test Data Generator — 模块化版入口""" + +import sys +import re +import logging +from datetime import datetime +from pathlib import Path + +# ── 配置(必须放在本地模块导入之前,避免循环导入) ── + +CONFIG = {} + +from .read import preprocess, extract_data_division, extract_procedure_division +from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements +from .core import build_branch_tree, classify_field_roles, _init_child_names +from .cond import parse_single_condition, is_field +from .design import enum_paths, generate_records, _filter_stop +from .output import output_json, output_input_files +from .coverage import run_coverage, generate_coverage_index + +logger = logging.getLogger(__name__) + + +# ── OCCURS 展开 ── + + +def _add_subscript(name, occ): + """追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)""" + if name.endswith(')'): + return name[:-1] + f',{occ})' + return name + f'({occ})' + + +def expand_occurs(fields): + """展开 OCCURS 字段为下标副本。递归处理嵌套 OCCURS。""" + result = [] + i = 0 + while i < len(fields): + f = fields[i] + if f.get('occurs', 0) > 0 and not f.get('is_88'): + children = [] + j = i + 1 + while j < len(fields): + child = fields[j] + if child.get('is_88'): + children.append(child) + j += 1 + continue + if child['level'] <= f['level'] or child.get('level') == 77: + break + children.append(child) + j += 1 + + if children: + group = dict(f) + group['occurs'] = 0 + result.append(group) + for occ in range(1, f['occurs'] + 1): + for child in children: + copy = dict(child) + if child.get('occurs', 0) == 0: + copy['occurs'] = 0 + copy['occurs_depending'] = f.get('occurs_depending') + if child.get('is_88'): + parent = child.get('parent') or f['name'] + copy['parent'] = _add_subscript(parent, occ) + copy['name'] = _add_subscript(child['name'], occ) + else: + 
copy['name'] = _add_subscript(child['name'], occ) + result.append(copy) + else: + for occ in range(1, f['occurs'] + 1): + copy = dict(f) + copy['name'] = _add_subscript(f['name'], occ) + copy['occurs'] = 0 + copy['occurs_depending'] = f.get('occurs_depending') + result.append(copy) + + i = j + else: + result.append(f) + i += 1 + + if any(f.get('occurs', 0) > 0 for f in result): + return expand_occurs(result) + return result + + +# ── 入口 ── + +def main(): + if len(sys.argv) < 2: + print("用法: python -m cobol_testgen [cobol文件2 ...] [输出目录]") + sys.exit(1) + + args = sys.argv[1:] + + # 分离 cobol 文件与输出目录 + cobol_files = [] + outdir = None + for a in args: + p = Path(a) + if p.is_dir(): + outdir = p + elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'): + cobol_files.append(p) + else: + print(f"警告:跳过未知参数 {a}") + if not cobol_files: + print("错误:未找到任何 COBOL 文件") + sys.exit(1) + if outdir is None: + outdir = cobol_files[0].parent + + # 配置全局 Logger + outdir.mkdir(parents=True, exist_ok=True) + log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log" + fh = logging.FileHandler(log_path, encoding="utf-8", mode="w") + fh.setLevel(logging.DEBUG) + fh.setFormatter(logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s" + )) + sh = logging.StreamHandler() + sh.setLevel(logging.INFO) + sh.setFormatter(logging.Formatter("%(message)s")) + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.addHandler(fh) + root_logger.addHandler(sh) + + programs = [] + + for filepath in cobol_files: + if not filepath.exists(): + logger.error(f"错误:文件不存在 {filepath}") + continue + + source = filepath.read_text(encoding='utf-8') + source = resolve_copybooks(source, str(filepath.parent)) + preprocessed = preprocess(source) + file_sec = parse_file_section(preprocessed) + + # DATA DIVISION解析 + data_div = extract_data_division(preprocessed) + if not data_div: + logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。") + continue + + data_fields = 
parse_data_division(data_div) + if not data_fields: + logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。") + continue + + # FieldDef → dict + fields_dict = [] + parent_pic = {} + filler_counter = 0 + for f in data_fields: + pi = f.pic_info + name = f.name + if name == 'FILLER': + filler_counter += 1 + if filler_counter > 1: + name = f'FILLER_{filler_counter}' + entry = { + 'name': name, + 'level': f.level, + 'pic': f.pic, + 'pic_info': { + 'type': pi.type if pi else 'unknown', + 'digits': pi.digits if pi else 0, + 'decimal': pi.decimal if pi else 0, + 'length': pi.length if pi else 0, + 'signed': pi.signed if pi else False, + }, + 'value': f.value, + 'values': f.values, + 'section': f.section, + 'is_filler': f.is_filler, + 'redefines': f.redefines, + 'usage': f.usage, + 'occurs': f.occurs_count, + 'occurs_depending': f.occurs_depending, + } + if f.is_88: + entry['is_88'] = True + entry['parent'] = f.parent + # Copy parent's pic_info for value generation + if f.parent and f.parent in parent_pic: + entry['pic_info'] = dict(parent_pic[f.parent]) + else: + parent_pic[name] = entry['pic_info'] + fields_dict.append(entry) + + fields_dict = expand_occurs(fields_dict) + + # Build FD→children 和 field→FD 映射 + fd_fields = {} + field_to_fd = {} + if file_sec: + for fd_name, rec_names in file_sec.items(): + fds = [] + seen = set() + for rec in rec_names: + if rec not in seen: + fds.append(rec) + seen.add(rec) + for child in _init_child_names(rec, fields_dict): + if child not in seen: + fds.append(child) + seen.add(child) + fd_fields[fd_name] = fds + for child in fds: + field_to_fd[child] = fd_name + + logger.info(f"\n========== {filepath.name} ==========") + logger.info(f"\n字段列表:") + logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}") + logger.info("-" * 65) + for f in fields_dict: + pi = f['pic_info'] + t = pi.get('type', '?') + l = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0) + pic_display = str(f.get('pic', '')) if f.get('pic') else 
('88-level' if f.get('is_88') else '') + logger.info(f"{f['level']:<6} {f['name']:<25} {pic_display:<15} {t:<12} {l:<5}") + + # PROCEDURE DIVISION解析 + proc_div = extract_procedure_division(preprocessed) + branch_paths = [] + assignments = {} + + if proc_div: + branch_tree, assignments = build_branch_tree(proc_div, fields_dict) + + roles = classify_field_roles(branch_tree, assignments, fields_dict, + source=preprocessed, proc_text=proc_div) + logger.info(f"\n字段角色(输入/输出/出入/未用):") + for f in fields_dict: + if f.get('is_88'): + continue + logger.info(f" {f['name']:<30} {roles.get(f['name'], '?')}") + + branch_paths_with_assigns = enum_paths(branch_tree, fields_dict) + branch_paths_with_assigns = [ + (_filter_stop(c), a) for c, a in branch_paths_with_assigns + ] + + # OPEN 方向解析 + open_dir = scan_open_statements(proc_div) if proc_div else {} + + if proc_div: + logger.info(f"\n分支路径数:{len(branch_paths_with_assigns)}") + for i, (path_cons, _path_assign) in enumerate(branch_paths_with_assigns): + descs = [] + for c in path_cons: + if len(c) == 4: + field, op, val, want = c + if op == 'not_in': + descs.append(f"{field} not in {val}") + else: + descs.append(f"{field} {op} {val} ({'T' if want else 'F'})") + logger.debug(f" 路径 {i + 1}: {', '.join(descs)}") + else: + logger.warning("\n没有找到 PROCEDURE DIVISION。") + branch_paths_with_assigns = [([], {})] + roles = {f['name']: 'unused' for f in fields_dict} + + # 覆盖率报告(传入原始源文本用于行号定位) + cov_prefix = str(outdir / filepath.stem) + index_relpath = 'coverage/index.html' + cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict, + source, cov_prefix, index_relpath=index_relpath) + + records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec) + + # 输出 JSON(完整文件) + outpath = outdir / (filepath.stem + '.json') + output_json(records, outpath, roles, + fd_fields=fd_fields, field_to_fd=field_to_fd, + open_dir=open_dir, + path_cons_list=kept_path_cons) + + # 输出入力 JSON(按 
def extract_structure(cobol_source: str) -> dict:
    """Statically analyse COBOL source text and return a structural summary.

    No test data is generated. The source is preprocessed, its DATA DIVISION
    fields are converted to dicts and OCCURS-expanded, a branch tree is built
    from the PROCEDURE DIVISION (when present), and the tree is walked to
    count IF / EVALUATE decision points.

    Returns:
        dict with the keys ``paragraphs``, ``decision_points``,
        ``branch_tree``, ``file_count``, ``open_directions``,
        ``has_search_all``, ``has_evaluate``, ``has_call``, ``has_break``,
        ``total_branches``, ``total_paragraphs`` and ``branch_tree_obj``
        (the same object as ``branch_tree``).
    """
    preprocessed = preprocess(cobol_source)
    data_div = extract_data_division(preprocessed)
    data_fields = parse_data_division(data_div) if data_div else []

    def _to_entry(position, fd):
        # FILLER items are renamed after their position in the parsed list.
        info = fd.pic_info
        entry = {
            'name': fd.name if fd.name != 'FILLER' else f'FILLER_{position + 1}',
            'level': fd.level,
            'pic': fd.pic,
            'pic_info': {
                'type': info.type if info else 'unknown',
                'digits': info.digits if info else 0,
                'decimal': info.decimal if info else 0,
                'length': info.length if info else 0,
                'signed': info.signed if info else False,
            },
            'section': fd.section,
            'occurs': fd.occurs_count,
            'occurs_depending': fd.occurs_depending,
            'redefines': fd.redefines,
            'usage': fd.usage,
        }
        if fd.is_88:
            entry['is_88'] = True
            entry['parent'] = fd.parent
            entry['value'] = fd.value
            entry['values'] = fd.values
        return entry

    fields_dict = expand_occurs(
        [_to_entry(position, fd) for position, fd in enumerate(data_fields)]
    )

    proc_div = extract_procedure_division(preprocessed)
    branch_tree = None
    if proc_div:
        branch_tree, _assignments = build_branch_tree(proc_div, fields_dict)

    file_sec = parse_file_section(preprocessed)
    open_dir = scan_open_statements(proc_div) if proc_div else {}

    from .models import BrIf, BrEval, BrSeq

    decision_points = []

    def _visit(node):
        # Pre-order walk. Only IF and EVALUATE nodes become decision points;
        # sequences are descended into and every other node type is ignored.
        # Ids are 1-based in visiting order.
        if isinstance(node, BrIf):
            decision_points.append({
                "id": len(decision_points) + 1, "kind": "IF",
                "label": str(node.condition)[:80], "branches": 2,
            })
            _visit(node.true_seq)
            _visit(node.false_seq)
        elif isinstance(node, BrEval):
            arms = len(node.when_list) + (1 if node.has_other else 0)
            decision_points.append({
                "id": len(decision_points) + 1, "kind": "EVALUATE",
                "label": str(node.subject)[:80], "branches": arms,
            })
            for _, seq in node.when_list:
                _visit(seq)
            _visit(node.other_seq)
        elif isinstance(node, BrSeq):
            for child in node.children:
                _visit(child)

    if branch_tree:
        _visit(branch_tree)
    total_branches = sum(dp["branches"] for dp in decision_points)

    # A paragraph header is a line holding a single name followed by a period.
    paragraph_re = re.compile(r'^\s*([A-Z0-9][A-Z0-9-]*)\.\s*$')
    paragraphs = set()
    for raw_line in (proc_div.split('\n') if proc_div else []):
        hit = paragraph_re.match(raw_line.strip())
        if hit:
            paragraphs.add(hit.group(1))

    labels = [str(dp.get('label', '')) for dp in decision_points]
    return {
        "paragraphs": sorted(paragraphs),
        "decision_points": decision_points,
        "branch_tree": branch_tree,
        "file_count": len(file_sec) if file_sec else 0,
        "open_directions": open_dir,
        "has_search_all": any('SEARCH' in label for label in labels),
        "has_evaluate": any(dp['kind'] == 'EVALUATE' for dp in decision_points),
        "has_call": 'CALL' in cobol_source.upper(),
        "has_break": any('KEY' in label.upper() for label in labels),
        "total_branches": total_branches,
        "total_paragraphs": len(paragraphs),
        "branch_tree_obj": branch_tree,
    }
@dataclass
class LeafStat:
    """Coverage state of one leaf condition (``field op value``)."""

    field: str
    op: str
    value: str
    covered_true: bool = False   # some path constrains this leaf to be true
    covered_false: bool = False  # some path constrains this leaf to be false


@dataclass
class DecisionPoint:
    """One decision point found in the branch tree, with its coverage state.

    ``parsed``, ``cond_trees`` and ``has_other`` used to be attached ad hoc by
    the collector. They are declared here with defaults so that every
    instance exposes them, whatever its kind and however it was built.
    """

    id: int
    kind: str  # "IF" | "EVALUATE" | "PERFORM" | "SEARCH"
    label: str
    branch_names: list[str]
    covered_branches: set = field(default_factory=set)
    active_branches: set = field(default_factory=set)
    implied_branches: set = field(default_factory=set)
    leaves: list[LeafStat] = field(default_factory=list)
    source_line: int = 0  # 1-based line in the raw source; 0 = not located
    when_list: list = field(default_factory=list)
    cond_tree: object = None
    cond_leaves: list = field(default_factory=list)
    # Simple (field, op, value) form of the condition, when one was parsed.
    parsed: object = None
    # SEARCH only: condition trees index-aligned with ``when_list``.
    cond_trees: list = field(default_factory=list)
    # SEARCH only: True when the last branch name is the AT END branch.
    has_other: bool = False
def mark_coverage(decision_points, leaf_stats, branch_paths, fields):
    """Mark which branches and leaf conditions the enumerated paths exercise.

    Args:
        decision_points: decision points whose ``active_branches`` are updated.
        leaf_stats: leaf statistics whose ``covered_true`` / ``covered_false``
            flags are updated.
        branch_paths: iterable of ``(constraints, assignments)`` pairs; only
            the constraints are used.
        fields: field dicts, forwarded to the EVALUATE and SEARCH markers.

    After all paths are processed, each decision point's ``implied_branches``
    is set to a copy of its ``active_branches``.
    """
    # Lambdas keep the per-kind call signatures in one place.
    markers = {
        'IF': lambda point, cons: _mark_if(point, cons),
        'EVALUATE': lambda point, cons: _mark_eval(point, cons, fields),
        'PERFORM': lambda point, cons: _mark_perform(point, cons),
        'SEARCH': lambda point, cons: _mark_search(point, cons, fields),
    }

    for constraints, _assign in branch_paths:
        for point in decision_points:
            marker = markers.get(point.kind)
            if marker is not None:
                marker(point, constraints)

        for leaf in leaf_stats:
            for c in constraints:
                if not _match_leaf(c, leaf):
                    continue
                # c[3] is the truth value the path wants for this leaf.
                if c[3]:
                    leaf.covered_true = True
                else:
                    leaf.covered_false = True

    for point in decision_points:
        point.implied_branches = set(point.active_branches)
+ and str(c[2]) == str(parsed[2])) + + +def _match_leaf(c, leaf): + if len(c) != 4: + return False + return (c[0] == leaf.field and c[1] == leaf.op + and str(c[2]) == str(leaf.value)) + + +def _mark_if(dp, cons): + simple = getattr(dp, 'parsed', None) + if simple: + for c in cons: + if _match_constraint(c, simple): + if c[3]: + dp.active_branches.add('T') + else: + dp.active_branches.add('F') + elif dp.cond_tree and dp.cond_leaves: + assignment = {} + for leaf in dp.cond_leaves: + for c in cons: + if _match_leaf(c, leaf): + assignment[leaf] = c[3] + break + if len(assignment) == len(dp.cond_leaves): + if evaluate_tree(dp.cond_tree, assignment): + dp.active_branches.add('T') + else: + dp.active_branches.add('F') + else: + matched = 0 + for leaf in dp.leaves: + for c in cons: + if _match_leaf(c, leaf): + matched += 1 + break + if matched <= 1: + for c in cons: + for leaf in dp.leaves: + if _match_leaf(c, leaf): + dp.active_branches.add('T' if c[3] else 'F') + + +def _mark_eval(dp, cons, fields=None): + if dp.label == 'TRUE': + matched = False + for when_val, _ in dp.when_list: + parsed = parse_single_condition(when_val, fields) + if parsed: + for c in cons: + if _match_constraint(c, parsed) and c[3]: + name = f"WHEN {when_val}" + if name in dp.branch_names: + dp.active_branches.add(name) + matched = True + else: + cond_tree = parse_compound_condition(when_val, fields) + if cond_tree and not isinstance(cond_tree, CondLeaf): + leaves = list(collect_leaves(cond_tree)) + assignment = {} + for leaf in leaves: + for c in cons: + if _match_leaf(c, leaf): + assignment[leaf] = c[3] + break + if len(assignment) == len(leaves): + if evaluate_tree(cond_tree, assignment): + name = f"WHEN {when_val}" + if name in dp.branch_names: + dp.active_branches.add(name) + matched = True + if not matched and 'OTHER' in dp.branch_names: + when_fields = set() + for when_val, _ in dp.when_list: + for c in cons: + if c[0] in when_val: + when_fields.add(c[0]) + if when_fields: + 
dp.active_branches.add('OTHER') + return + for c in cons: + if c[0] == dp.label and c[1] == '=': + name = f"WHEN {c[2]}" + if name in dp.branch_names: + dp.active_branches.add(name) + elif c[0] == dp.label and c[1] == 'not_in': + dp.active_branches.add('OTHER') + + +def _mark_search(dp, cons, fields=None): + branch_masks = [False] * len(dp.branch_names) + for i, (cond_text, body_seq) in enumerate(dp.when_list): + cond_tree = dp.cond_trees[i] if i < len(dp.cond_trees) else None + if not cond_tree: + continue + if isinstance(cond_tree, CondLeaf): + for c in cons: + if len(c) == 4: + base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0]) + base_cond = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field) + if base_c == base_cond and c[1] == cond_tree.op \ + and str(c[2]) == str(cond_tree.value) and c[3]: + branch_masks[i] = True + break + else: + leaves = list(collect_leaves(cond_tree)) + assignment = {} + for leaf in leaves: + for c in cons: + if len(c) == 4: + base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0]) + base_l = re.sub(r'\s*\(.*?\)\s*$', '', leaf.field) + if base_c == base_l and c[1] == leaf.op and str(c[2]) == str(leaf.value): + assignment[leaf] = c[3] + break + if len(assignment) == len(leaves): + if evaluate_tree(cond_tree, assignment): + branch_masks[i] = True + if dp.has_other: + at_end_idx = len(dp.branch_names) - 1 + if not any(branch_masks[:at_end_idx]): + branch_masks[at_end_idx] = True + for i, m in enumerate(branch_masks): + if m: + dp.active_branches.add(dp.branch_names[i]) + + +def _mark_perform(dp, cons): + simple = getattr(dp, 'parsed', None) + if simple: + for c in cons: + if _match_constraint(c, simple): + if c[3]: + dp.active_branches.add('Skip') + else: + dp.active_branches.add('Enter') + elif dp.cond_tree and dp.cond_leaves: + assignment = {} + for leaf in dp.cond_leaves: + for c in cons: + if _match_leaf(c, leaf): + assignment[leaf] = c[3] + break + if len(assignment) == len(dp.cond_leaves): + if evaluate_tree(dp.cond_tree, assignment): + 
dp.active_branches.add('Skip') + else: + dp.active_branches.add('Enter') + else: + for c in cons: + if c[0] == dp.label or any(c[0] == f for f in _get_fields_in_cond(dp.label)): + if c[3]: + dp.active_branches.add('Skip') + else: + dp.active_branches.add('Enter') + + +def _get_fields_in_cond(cond_text): + return re.findall(r'[A-Z][A-Z0-9-]*', cond_text.upper()) + + +# ── 行号定位(基于原始源文本)── + +def locate_decision_lines(decision_points, raw_source): + """在原始源文本中搜索每个决策点的近似行号""" + lines = raw_source.upper().splitlines() + for dp in decision_points: + patterns = _build_search_patterns(dp) + for i, line in enumerate(lines): + for pat in patterns: + if re.search(pat, line): + dp.source_line = i + 1 + break + if dp.source_line: + break + + +def _normalize(text): + """标准化条件文本用于比较:去多余空白、标准化引号""" + t = re.sub(r'\s+', ' ', text).strip() + t = t.replace('"', "'") + return t + + +def _build_search_patterns(dp): + texts = [] + if dp.kind == 'IF': + texts.append((r'\bIF\b', dp.label)) + elif dp.kind == 'EVALUATE': + texts.append((r'\bEVALUATE\b', dp.label)) + elif dp.kind == 'PERFORM': + texts.append((r'\bUNTIL\b', dp.condition if hasattr(dp, 'condition') else dp.label + if dp.label else '')) + else: + return [r'$^'] # 永不匹配 + + patterns = [] + for keyword, condition in texts: + if not condition: + continue + norm_cond = _normalize(condition) + # 转义正则特殊字符,但保留空格(替换为\s+) + esc = re.escape(norm_cond) + esc = esc.replace(r'\ ', r'\s+') + esc = esc.replace(r'\'', r"['\"]") + patterns.append(keyword + r'\s+' + esc) + if not patterns: + return [r'$^'] + return patterns + + +# ── HTML 报告(详情页)── + +_DETAIL_HTML = ''' + + + + +{title} + + + + +
+ ← 覆盖率总览 + | +

{title}

+
+ +
+ +
+

📈 覆盖率概要

+
+
+
{dec_frac}
+
决策覆盖率
+
+
+
{cond_frac}
+
条件覆盖率
+
+
+
{dp_count_text}
+
决策点
+
+
+
+
+
+
{dec_pct_text}
+
+ 已覆盖 + 未覆盖 + 推断覆盖 +
+
+ + {decision_table} + + {leaf_table} + + {source_section} + +
+ +''' + + +def generate_html_report(decision_points, leaf_stats, source_lines, outpath, + filename='', index_relpath=None, covered_lines=None): + title = f"覆盖率报告 — {filename}" if filename else "覆盖率报告" + + total_branches = sum(len(dp.branch_names) for dp in decision_points) + covered_branches = sum(len(dp.active_branches) for dp in decision_points) + implied_branches = sum(len(dp.implied_branches) for dp in decision_points) + if covered_lines: + # 无分支程序:隐式 100% + total_branches = max(total_branches, 1) + covered_branches = max(covered_branches, 1) + + total_leaves = len(leaf_stats) * 2 + covered_leaves = (sum(1 for l in leaf_stats if l.covered_true) + + sum(1 for l in leaf_stats if l.covered_false)) + + # 计算数值 + is_implicit = bool(covered_lines) # 无分支程序,隐式 100% + dec_pct_val = (covered_branches / total_branches * 100) if total_branches else 0 + dec_pct_text = "100% ✓" if is_implicit else (f"{dec_pct_val:.1f}%" if total_branches else "无") + dec_frac = "全部覆盖" if is_implicit else (f"{covered_branches}/{total_branches}" if total_branches else "—") + cond_frac = f"{covered_leaves}/{total_leaves}" if total_leaves else "—" + implied_text = f'(+{implied_branches - covered_branches} 推断)' if implied_branches > covered_branches else '' + + # 颜色 + if is_implicit or not total_branches or dec_pct_val >= 100: + dec_val_cls = 'val-green' + bar_cls = '' + elif dec_pct_val >= 80: + dec_val_cls = 'val-amber' + bar_cls = ' amber' + else: + dec_val_cls = 'val-red' + bar_cls = ' red' + + if not total_leaves or covered_leaves == total_leaves: + cond_val_cls = 'val-green' + elif covered_leaves / total_leaves >= 0.8: + cond_val_cls = 'val-amber' + else: + cond_val_cls = 'val-red' + + # 决策点表格 + if decision_points: + dp_rows = [] + for dp in decision_points: + ln = str(dp.source_line) if dp.source_line else '?' 
+ branch_cells = [] + for bn in dp.branch_names: + if bn in dp.active_branches: + branch_cells.append(f'{bn} ✓') + elif bn in dp.implied_branches: + branch_cells.append(f'{bn} ○') + else: + branch_cells.append(f'{bn} ✗') + dp_rows.append(f'#{dp.id}{dp.kind}{ln}' + f'{dp.label}' + f'{" ".join(branch_cells)}') + + decision_table = f'''
+

📜 决策点

+ + + {"".join(dp_rows)} +
#类型行号条件分支
+
''' + else: + decision_table = '' + + # 叶条件表格 + if leaf_stats: + leaf_rows = [] + for leaf in leaf_stats: + t = '' if leaf.covered_true else '' + f = '' if leaf.covered_false else '' + leaf_rows.append(f'{leaf.field}{leaf.op}' + f'{leaf.value}{t}{f}') + + leaf_table = f'''
+

🔢 条件覆盖明细(叶条件)

+ + + {"".join(leaf_rows)} +
字段运算符
+
''' + else: + leaf_table = '' + + # 源码标注 + if source_lines: + line_cov = {} + for dp in decision_points: + if dp.source_line: + if dp.source_line not in line_cov: + line_cov[dp.source_line] = [] + has_missed = any(bn not in dp.active_branches for bn in dp.branch_names) + has_active = any(bn in dp.active_branches for bn in dp.branch_names) + if has_active and not has_missed: + line_cov[dp.source_line].append('hl-green') + elif has_active: + line_cov[dp.source_line].append('hl-red') + else: + line_cov[dp.source_line].append('hl-amber') + + # 无分支程序:所有 PD 行标记为已覆盖 + if covered_lines: + for ln in covered_lines: + line_cov.setdefault(ln, []).append('hl-green') + + src_lines = [] + for i, line in enumerate(source_lines, 1): + cls_list = line_cov.get(i, []) + hl = ' ' + ' '.join(cls_list) if cls_list else '' + src_lines.append(f'
' + f'{i}' + f'{line}
') + + source_section = f'''
+

📖 源码标注

+ {"".join(src_lines)} +
''' + else: + source_section = '' + + html = _DETAIL_HTML.format( + title=title, + index_relpath=index_relpath or '#', + dec_frac=dec_frac, + dec_pct_text=dec_pct_text, + dec_val_cls=dec_val_cls, + cond_frac=cond_frac, + cond_val_cls=cond_val_cls, + bar_cls=bar_cls, + bar_pct=str(int(dec_pct_val)), + decision_table=decision_table, + leaf_table=leaf_table, + source_section=source_section, + dp_count_text=('—' if is_implicit else str(len(decision_points))), + ) + + outpath = Path(outpath) + outpath.parent.mkdir(parents=True, exist_ok=True) + outpath.write_text(html, encoding='utf-8') + + +# ── 总括索引页 ── + +_INDEX_HTML = ''' + + + + +覆盖率总览 + + + + +
+

📊 覆盖率总览报告

+ {timestamp} +
+ +
+ +
+
+
{agg_dec_num}
+
决策覆盖率
+
+
+
{agg_cond_num}
+
条件覆盖率
+
+
+
{prog_count}
+
已分析程序
+
+
+
{uncovered_count}
+
未完全覆盖程序
+
+
+ +
+
+ {dec_ring_svg} +
决策覆盖率
+
+
+ {cond_ring_svg} +
条件覆盖率
+
+
+ +
+ 已覆盖 + 未覆盖 + 推断覆盖 +
+ +
+ +
+ + +
+
+ +
+ + + + + + + + + + + +{rows} + +
程序 决策分支 条件覆盖 覆盖率 状态
+
+ +
+ + + + +''' + + +def _ring_svg(pct, color_stops): + """生成 SVG 圆环 HTML。pct: 0-100 浮点数。""" + r = 54 + circ = 2 * 3.14159265 * r + offset = circ * (1 - pct / 100) if pct > 0 else circ + if pct >= 80: + stroke = '#00c853' + elif pct >= 50: + stroke = '#ff8f00' + else: + stroke = '#ff1744' + return ( + f'' + f'' + f'' + f'' + f'{pct:.0f}%' + f'覆盖率' + f'' + ) + + +def generate_coverage_index(programs, outdir): + """生成覆盖率总括索引页。""" + from datetime import datetime + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') + + agg_total = sum(p['total_branches'] for p in programs) + agg_covered = sum(p['covered_branches'] for p in programs) + agg_implied = sum(p['implied_branches'] for p in programs) + agg_ctotal = sum(p['total_conditions'] for p in programs) + agg_ccovered = sum(p['covered_conditions'] for p in programs) + + agg_dec_pct = (agg_covered / agg_total * 100) if agg_total else 0 + agg_cond_pct = (agg_ccovered / agg_ctotal * 100) if agg_ctotal else 0 + uncovered_count = sum(1 for p in programs if p['total_branches'] and + p['covered_branches'] < p['total_branches']) + + dec_num_cls = 'num-green' if agg_dec_pct == 100 else ('num-amber' if agg_dec_pct >= 80 else 'num-red') + cond_num_cls = 'num-green' if agg_cond_pct == 100 else ('num-amber' if agg_cond_pct >= 80 else 'num-red') + uncovered_num_cls = 'num-green' if uncovered_count == 0 else 'num-red' + + def sort_key(p): + if p['total_branches']: + return -p['covered_branches'] / p['total_branches'] + return -1.0 + sorted_programs = sorted(programs, key=sort_key) + + rows = [] + for p in sorted_programs: + name = p['name'] + href = p['detail_relpath'] + tb = p['total_branches'] + cb = p['covered_branches'] + ib = p['implied_branches'] + tc = p['total_conditions'] + cc = p['covered_conditions'] + imp = p.get('implicit_100', False) + + pct_dec = (cb / tb * 100) if tb else 0 + pct_text = "全部覆盖" if imp else (f"{pct_dec:.1f}%" if tb else "—") + implied_text = f'(+{ib - cb} 推断)' if ib > cb else '' + branch_text = "—" if imp 
else f"{cb}/{tb}" + cond_text = f"{cc}/{tc}" if tc else "—" + bar_pct = int(pct_dec) + + # 进度条颜色 + if imp or pct_dec >= 100: + bar_cls = '' + elif pct_dec >= 80: + bar_cls = ' amber' + else: + bar_cls = ' red' + + # 状态徽标 + if tb == 0 or (cb == tb and not (ib > cb)): + badge = '✓ 完全' + elif cb == tb and ib > cb: + badge = '○ 推断' + elif pct_dec >= 80: + badge = '⚠ 不足' + else: + badge = '✗ 欠缺' + + # 条件覆盖数字颜色 + if tc: + cond_pct = cc / tc * 100 + cond_color = 'num-green' if cond_pct == 100 else ('num-amber' if cond_pct >= 80 else 'num-red') + cond_display = f'{cond_text}' + else: + cond_display = '' + + row_class = 'row-imperfect' if cb < tb else '' + rows.append(f''' + {name} + {branch_text} {implied_text} + {cond_display} + +
+
+
+ {pct_text} +
+
+ {pct_text} +
+ + {badge} +''') + + dec_ring_svg = _ring_svg(agg_dec_pct, '') + cond_ring_svg = _ring_svg(agg_cond_pct, '') + + html = _INDEX_HTML.format( + timestamp=timestamp, + agg_dec_num=f"{agg_covered}/{agg_total}", + dec_num_cls=dec_num_cls, + agg_cond_num=f"{agg_ccovered}/{agg_ctotal}" if agg_ctotal else "无数据", + cond_num_cls=cond_num_cls, + prog_count=str(len(programs)), + uncovered_num_cls=uncovered_num_cls, + uncovered_count=str(uncovered_count), + dec_ring_svg=dec_ring_svg, + cond_ring_svg=cond_ring_svg, + rows='\n'.join(rows), + ) + + outpath = Path(outdir) / 'coverage' / 'index.html' + outpath.parent.mkdir(parents=True, exist_ok=True) + outpath.write_text(html, encoding='utf-8') + + +# ── PROCEDURE DIVISION 行范围定位(用于无分支程序标记)── + +def _find_proc_range(raw_source: str): + """返回 PROCEDURE DIVISION 的行范围 (start_line, end_line) 1-indexed,或 None。""" + lines = raw_source.splitlines() + proc_start = None + for i, line in enumerate(lines): + if re.search(r'PROCEDURE\s+DIVISION', line.upper()): + proc_start = i + 1 + break + if proc_start is None: + return None + # 找下一个 DIVISION 作为结束边界(或文件尾) + for i in range(proc_start, len(lines)): + if re.search(r'(IDENTIFICATION|DATA|ENVIRONMENT)\s+DIVISION', lines[i].upper()): + return (proc_start, i) # 不包含下一个 DIVISION + return (proc_start, len(lines) + 1) + + +# ── 接入入口 ── + +def run_coverage(branch_tree, branch_paths_with_assigns, fields, + raw_source, output_prefix, index_relpath=None): + """完整覆盖率流程:收集 → 标记 → 定位 → 输出。 + + Returns: + dict: 汇总数据,用于总括页聚合 + """ + decision_points, leaf_stats = collect_decision_points(branch_tree, fields) + + mark_coverage(decision_points, leaf_stats, branch_paths_with_assigns, fields) + + if raw_source: + locate_decision_lines(decision_points, raw_source) + + total = sum(len(dp.branch_names) for dp in decision_points) + covered = sum(len(dp.active_branches) for dp in decision_points) + implied = sum(len(dp.implied_branches) for dp in decision_points) + leaf_covered = (sum(1 for l in leaf_stats if 
l.covered_true) + + sum(1 for l in leaf_stats if l.covered_false)) + leaf_total = len(leaf_stats) * 2 + + # 无决策点但有路径 → PROCEDURE DIVISION 全部覆盖 + covered_lines = set() + if total == 0 and branch_paths_with_assigns and raw_source: + proc_range = _find_proc_range(raw_source) + if proc_range: + covered_lines.update(range(proc_range[0], proc_range[1])) + total = 1 + covered = 1 + + if output_prefix: + generate_html_report(decision_points, leaf_stats, + raw_source.splitlines() if raw_source else [], + f"{output_prefix}_coverage.html", + Path(output_prefix).stem, + index_relpath=index_relpath, + covered_lines=covered_lines) + + # 控制台摘要 + if total or leaf_total: + logger.info(f"\n=== 分支覆盖率 ===") + if covered_lines and not decision_points: + logger.info(" 程序无分支结构,全部代码已覆盖") + for dp in decision_points: + branches = [] + for bn in dp.branch_names: + if bn in dp.active_branches: + branches.append(f'{bn} [x]') + elif bn in dp.implied_branches: + branches.append(f'{bn} [o]') + else: + branches.append(f'{bn} [ ]') + ln = f":{dp.source_line}" if dp.source_line else "" + logger.info(f" #{dp.id} [{dp.kind}] {dp.label}{ln}") + logger.info(f" {' | '.join(branches)}") + + if total: + pct = covered / total * 100 + logger.info(f"\n 决策覆盖率:{covered}/{total}({pct:.1f}%)") + if leaf_total: + pct = leaf_covered / leaf_total * 100 + logger.info(f" 条件覆盖率:{leaf_covered}/{leaf_total}({pct:.1f}%)") + + if output_prefix: + logger.info(f"\n 覆盖率报告:{output_prefix}_coverage.html") + + implicit_100 = bool(covered_lines) + return { + 'name': Path(output_prefix).stem if output_prefix else '', + 'detail_relpath': ('../' + Path(output_prefix).stem + '_coverage.html' + if output_prefix else ''), + 'total_branches': total, + 'covered_branches': covered, + 'implied_branches': implied, + 'implicit_100': implicit_100, + 'total_conditions': leaf_total, + 'covered_conditions': leaf_covered, + '_decision_points': decision_points, + '_leaf_stats': leaf_stats, + } + + +def check_coverage(structure: dict, 
test_records: list[dict]) -> dict: + """报告 COBOL 源码的静态分支结构信息。 + + 注意: 静态分析无法精确判断每条测试数据运行时覆盖了哪些分支。 + 精确的路径追踪依赖 gcov(Phase 3)。此处仅报告总分支数和记录生成情况。 + + Returns: + dict with: paragraph_rate, branch_rate, decision_rate, total_branches, + total_paragraphs, records_count, note + """ + total_paragraphs = structure.get("total_paragraphs", 0) + total_branches = structure.get("total_branches", 0) + decision_points = structure.get("decision_points", []) + has_data = len(test_records) > 0 + + paragraph_rate = 1.0 if (total_paragraphs > 0 and has_data) else 0.0 + + return { + "paragraph_rate": paragraph_rate, + "branch_rate": 0.0, + "decision_rate": 0.0, + "uncovered_decision_ids": [], + "total_branches": total_branches, + "total_paragraphs": total_paragraphs, + "records_count": len(test_records), + "note": "静态分析无法精确计算覆盖率。精确数据通过 gcov 获取(Phase 3)。", + } diff --git a/config/__init__.py b/config/__init__.py index d943037..dfb4eec 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -20,6 +20,11 @@ class Config: num_records: int = 1000 branch_pass: float = 0.80 max_llm_cost: float = 0.50 + quality_gate_mode: str = "warn" + quality_gate_decision_threshold: float = 0.90 + quality_gate_paragraph_threshold: float = 1.0 + gcov_enabled: bool = False + max_quality_retries: int = 4 @classmethod def from_toml(cls, path="aurak.toml"): diff --git a/data/diff_result.py b/data/diff_result.py index c35b16d..86e755a 100644 --- a/data/diff_result.py +++ b/data/diff_result.py @@ -28,6 +28,15 @@ class VerificationRun: field_results: list[FieldResult] = field(default_factory=list) runner: str = "native" branch_rate: float = 0.0 + paragraph_rate: float = 0.0 # 段落覆盖率 + decision_rate: float = 0.0 # 决策点覆盖率 + hina_type: str = "" # HINA 类型 + hina_confidence: float = 0.0 # HINA 确信度 + quality_score: float = 0.0 # 质量评分 + quality_warn: str = "" # 质量警告信息 + heal_retry: int = 0 # 自愈重试次数 + simple_retry: int = 0 # 朴素重试次数 + total_retry: int = 0 # 总重试次数 llm_cost: float = 0.0 report_path: str = "" debug: dict = 
field(default_factory=dict) diff --git a/hina/__init__.py b/hina/__init__.py new file mode 100644 index 0000000..d16e4a1 --- /dev/null +++ b/hina/__init__.py @@ -0,0 +1 @@ +# HINA 程序分类与质量门禁包 diff --git a/hina/retry.py b/hina/retry.py new file mode 100644 index 0000000..a8a7fd6 --- /dev/null +++ b/hina/retry.py @@ -0,0 +1,82 @@ +""" +分层重试 — 部署在 orchestrator 调用者层(main.py / worker.py)。 +""" +import logging +import os +from typing import Callable +from data.diff_result import VerificationRun + +logger = logging.getLogger(__name__) + +HEALING_FIXES = { + "compile_error": { + "detect": lambda log: "not found" in (log or "").lower(), + "fix": lambda: _try_set_env( + "COB_LIBRARY_PATH", + "D:\\360安全浏览器下载\\GC32-BDB-SP1-rename-7z-to-exe\\lib\\gnucobol", + ), + }, + "s0c7": { + "detect": lambda log: "S0C7" in (log or ""), + "fix": lambda: logger.warning("[Retry] S0C7 需要人工修正测试数据中的数值字段"), + }, +} + + +def _try_set_env(key: str, value: str) -> None: + """尝试设置环境变量(如果当前未设置)""" + if not os.environ.get(key): + os.environ[key] = value + logger.info(f"[Retry] 已设置环境变量 {key}={value}") + else: + logger.info(f"[Retry] {key} 已存在,跳过") + + +class RetryHandler: + def __init__(self, max_heal: int = 2, max_simple: int = 3): + self.max_heal = max_heal + self.max_simple = max_simple + self.heal_count = 0 + self.simple_count = 0 + self.history: list[VerificationRun] = [] + + def run(self, pipeline_fn: Callable[[], VerificationRun]) -> VerificationRun: + while (self.heal_count + self.simple_count) < (self.max_heal + self.max_simple): + vr = pipeline_fn() + self.history.append(vr) + + if vr.status in ("PASS", "QUALITY_WARN"): + vr.heal_retry = self.heal_count + vr.simple_retry = self.simple_count + vr.total_retry = self.heal_count + self.simple_count + return vr + + if vr.status in ("BLOCKED", "ERROR") and self.heal_count < self.max_heal: + build_log = vr.debug.get("cobol_build", {}).get("log", "") + healed = False + for name, fix_def in HEALING_FIXES.items(): + if fix_def["detect"](build_log): + 
fix_def["fix"]() + self.heal_count += 1 + healed = True + logger.info( + f"[Retry] 自愈修复应用: {name} " + f"(heal_retry={self.heal_count})" + ) + break + if healed: + continue + + self.simple_count += 1 + logger.info(f"[Retry] 朴素重试 (simple_retry={self.simple_count})") + + logger.error("[Retry] 重试次数超过上限,标记 FATAL") + vr = self.history[-1] if self.history else VerificationRun( + status="FATAL", exit_code=4 + ) + vr.status = "FATAL" + vr.exit_code = 4 + vr.heal_retry = self.heal_count + vr.simple_retry = self.simple_count + vr.total_retry = self.heal_count + self.simple_count + return vr