"""COBOL Test Data Generator — 模块化版入口 from __future__ import annotations 公开 API: extract_structure() — 解析 COBOL 控制流 → dict generate_data() — 生成测试数据 → list[dict] incremental_supplement — 差分补充数据 → list[dict] check_coverage() — 覆盖率报告 → dict """ import sys import re import logging from datetime import datetime from pathlib import Path # ── 配置(必须放在本地模块导入之前,避免循环导入) ── CONFIG = {} from .read import preprocess, extract_data_division, extract_procedure_division from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements, parse_file_control from .core import build_branch_tree, classify_field_roles, _init_child_names from .cond import parse_single_condition, is_field, collect_leaves from .design import enum_paths, generate_records, _filter_stop from .output import output_json, output_input_files from .coverage import run_coverage, generate_coverage_index, check_coverage from japanese_data import generate_fullwidth_text, generate_halfwidth_katakana, generate_wareki_date logger = logging.getLogger(__name__) n__all__ = [ "extract_structure", "generate_data", "incremental_supplement", "check_coverage", "CONFIG", "generate_fullwidth_text", "generate_halfwidth_katakana", "generate_wareki_date", ] # ── OCCURS 展开 ── def _add_subscript(name, occ): """追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)""" if name.endswith(')'): return name[:-1] + f',{occ})' return name + f'({occ})' def expand_occurs(fields): """展开 OCCURS 字段为下标副本。递归处理嵌套 OCCURS。""" result = [] i = 0 while i < len(fields): f = fields[i] if f.get('occurs', 0) > 0 and not f.get('is_88'): children = [] j = i + 1 while j < len(fields): child = fields[j] if child.get('is_88'): children.append(child) j += 1 continue if child['level'] <= f['level'] or child.get('level') == 77: break children.append(child) j += 1 if children: group = dict(f) group['occurs'] = 0 result.append(group) for occ in range(1, f['occurs'] + 1): for child in children: copy = dict(child) if child.get('occurs', 0) == 0: copy['occurs'] = 0 copy['occurs_depending'] = f.get('occurs_depending') if child.get('is_88'): parent = child.get('parent') or f['name'] copy['parent'] = _add_subscript(parent, occ) copy['name'] = _add_subscript(child['name'], occ) else: copy['name'] = _add_subscript(child['name'], occ) result.append(copy) else: for occ in range(1, f['occurs'] + 1): copy = dict(f) copy['name'] = _add_subscript(f['name'], occ) copy['occurs'] = 0 copy['occurs_depending'] = f.get('occurs_depending') result.append(copy) i = j else: result.append(f) i += 1 if any(f.get('occurs', 0) > 0 for f in result): return expand_occurs(result) return result # ── 入口 ── def main(): if len(sys.argv) < 2: print("用法: python -m cobol_testgen [cobol文件2 ...] [输出目录]") sys.exit(1) args = sys.argv[1:] # 分离 cobol 文件与输出目录 cobol_files = [] outdir = None for a in args: p = Path(a) if p.is_dir(): outdir = p elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'): cobol_files.append(p) else: print(f"警告:跳过未知参数 {a}") if not cobol_files: print("错误:未找到任何 COBOL 文件") sys.exit(1) if outdir is None: outdir = cobol_files[0].parent # 配置全局 Logger outdir.mkdir(parents=True, exist_ok=True) log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log" fh = logging.FileHandler(log_path, encoding="utf-8", mode="w") fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s: %(message)s" )) sh = logging.StreamHandler() sh.setLevel(logging.INFO) sh.setFormatter(logging.Formatter("%(message)s")) root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG) root_logger.addHandler(fh) root_logger.addHandler(sh) programs = [] for filepath in cobol_files: if not filepath.exists(): logger.error(f"错误:文件不存在 {filepath}") continue source = filepath.read_text(encoding='utf-8') source = resolve_copybooks(source, str(filepath.parent)) preprocessed = preprocess(source) file_sec = parse_file_section(preprocessed) # DATA DIVISION解析 data_div = extract_data_division(preprocessed) if not data_div: logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。") continue data_fields = parse_data_division(data_div) if not data_fields: logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。") continue # FieldDef → dict fields_dict = [] parent_pic = {} filler_counter = 0 for f in data_fields: pi = f.pic_info name = f.name if name == 'FILLER': filler_counter += 1 if filler_counter > 1: name = f'FILLER_{filler_counter}' entry = { 'name': name, 'level': f.level, 'pic': f.pic, 'pic_info': { 'type': pi.type if pi else 'unknown', 'digits': pi.digits if pi else 0, 'decimal': pi.decimal if pi else 0, 'length': pi.length if pi else 0, 'signed': pi.signed if pi else False, }, 'value': f.value, 'values': f.values, 'section': f.section, 'is_filler': f.is_filler, 'redefines': f.redefines, 'usage': f.usage, 'occurs': f.occurs_count, 'occurs_depending': f.occurs_depending, } if f.is_88: entry['is_88'] = True entry['parent'] = f.parent # Copy parent's pic_info for value generation if f.parent and f.parent in parent_pic: entry['pic_info'] = dict(parent_pic[f.parent]) else: parent_pic[name] = entry['pic_info'] fields_dict.append(entry) fields_dict = expand_occurs(fields_dict) # Build FD→children 和 field→FD 映射 fd_fields = {} field_to_fd = {} if file_sec: for fd_name, rec_names in file_sec.items(): fds = [] seen = set() for rec in rec_names: if rec not in seen: fds.append(rec) seen.add(rec) for child in _init_child_names(rec, fields_dict): if child not in seen: fds.append(child) seen.add(child) fd_fields[fd_name] = fds for child in fds: field_to_fd[child] = fd_name logger.info(f"\n========== {filepath.name} ==========") logger.info(f"\n字段列表:") logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}") logger.info("-" * 65) for f in fields_dict: pi = f['pic_info'] t = pi.get('type', '?') l = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0) pic_display = str(f.get('pic', '')) if f.get('pic') else ('88-level' if f.get('is_88') else '') logger.info(f"{f['level']:<6} {f['name']:<25} {pic_display:<15} {t:<12} {l:<5}") # PROCEDURE DIVISION解析 proc_div = extract_procedure_division(preprocessed) branch_paths = [] assignments = {} if proc_div: branch_tree, assignments = build_branch_tree(proc_div, fields_dict) roles = classify_field_roles(branch_tree, assignments, fields_dict, source=preprocessed, proc_text=proc_div) logger.info(f"\n字段角色(输入/输出/出入/未用):") for f in fields_dict: if f.get('is_88'): continue logger.info(f" {f['name']:<30} {roles.get(f['name'], '?')}") branch_paths_with_assigns = enum_paths(branch_tree, fields_dict) branch_paths_with_assigns = [ (_filter_stop(c), a) for c, a in branch_paths_with_assigns ] # OPEN 方向解析 open_dir = scan_open_statements(proc_div) if proc_div else {} if proc_div: logger.info(f"\n分支路径数:{len(branch_paths_with_assigns)}") for i, (path_cons, _path_assign) in enumerate(branch_paths_with_assigns): descs = [] for c in path_cons: if len(c) == 4: field, op, val, want = c if op == 'not_in': descs.append(f"{field} not in {val}") else: descs.append(f"{field} {op} {val} ({'T' if want else 'F'})") logger.debug(f" 路径 {i + 1}: {', '.join(descs)}") else: logger.warning("\n没有找到 PROCEDURE DIVISION。") branch_paths_with_assigns = [([], {})] roles = {f['name']: 'unused' for f in fields_dict} # 覆盖率报告(传入原始源文本用于行号定位) cov_prefix = str(outdir / filepath.stem) index_relpath = 'coverage/index.html' cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict, source, cov_prefix, index_relpath=index_relpath) records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec) # 输出 JSON(完整文件) outpath = outdir / (filepath.stem + '.json') output_json(records, outpath, roles, fd_fields=fd_fields, field_to_fd=field_to_fd, open_dir=open_dir, path_cons_list=kept_path_cons) # 输出入力 JSON(按 FD 拆分) output_input_files(records, outdir, filepath.stem, roles, fd_fields, field_to_fd, open_dir) logger.info(f"\n输出:{outpath}({len(records)} 条记录)") logger.debug(f"\n记录明细:") for i, rec in enumerate(records, 1): vals = [] for f in fields_dict: r = roles.get(f['name'], '?') marker = f"[{r[0].upper()}]" if r != '?' and r != 'unused' else '' vals.append(f"{marker}{f['name']}={rec.get(f['name'], '?')}") logger.debug(f" 记录 {i}: {' | '.join(vals)}") programs.append(cov_result) # 生成覆盖率总括索引页 if programs: generate_coverage_index(programs, outdir) logger.info(f"\n覆盖率总览:{outdir / 'coverage' / 'index.html'}") # ════════════════════════════════════════════ # Phase 1: 可编程 API(供 orchestrator.py 调用) # ════════════════════════════════════════════ def extract_structure(cobol_source: str) -> dict: """分析 COBOL 源码的结构,返回结构摘要。不生成测试数据,只做静态分析。 Returns: dict with: paragraphs, decision_points, branch_tree, file_count, open_directions, has_search_all, has_evaluate, has_call, has_break, total_branches, total_paragraphs """ preprocessed = preprocess(cobol_source) data_div = extract_data_division(preprocessed) data_fields = parse_data_division(data_div) if data_div else [] fields_dict = [] for idx, f in enumerate(data_fields): entry = { 'name': f.name if f.name != 'FILLER' else f'FILLER_{idx + 1}', 'level': f.level, 'pic': f.pic, 'pic_info': { 'type': f.pic_info.type if f.pic_info else 'unknown', 'digits': f.pic_info.digits if f.pic_info else 0, 'decimal': f.pic_info.decimal if f.pic_info else 0, 'length': f.pic_info.length if f.pic_info else 0, 'signed': f.pic_info.signed if f.pic_info else False, }, 'section': f.section, 'occurs': f.occurs_count, 'occurs_depending': f.occurs_depending, 'redefines': f.redefines, 'usage': f.usage, } if f.is_88: entry['is_88'] = True entry['parent'] = f.parent entry['value'] = f.value entry['values'] = f.values fields_dict.append(entry) fields_dict = expand_occurs(fields_dict) proc_div = extract_procedure_division(preprocessed) branch_tree = None assignments = {} if proc_div: branch_tree, assignments = build_branch_tree(proc_div, fields_dict) file_sec = parse_file_section(preprocessed) open_dir = scan_open_statements(proc_div) if proc_div else {} from .models import BrIf, BrEval, BrSeq, BrPerform, Assign, CondAnd, CondOr decision_points = [] total_branches = 0 def _walk(node, counter): nonlocal total_branches if isinstance(node, BrIf): counter[0] += 1 branches = 2 decision_points.append({ "id": counter[0], "kind": "IF", "label": str(node.condition)[:80], "branches": branches, }) total_branches += branches _walk(node.true_seq, counter) _walk(node.false_seq, counter) elif isinstance(node, BrEval): counter[0] += 1 n = len(node.when_list) + (1 if node.has_other else 0) decision_points.append({ "id": counter[0], "kind": "EVALUATE", "label": str(node.subject)[:80], "branches": n, }) total_branches += n for _, seq in node.when_list: _walk(seq, counter) _walk(node.other_seq, counter) elif isinstance(node, BrSeq): for child in node.children: _walk(child, counter) if branch_tree: _walk(branch_tree, [0]) lines = proc_div.split('\n') if proc_div else [] paragraphs = set() for line in lines: m = re.match(r'^\s*([A-Z0-9][A-Z0-9-]*)\.\s*$', line.strip()) if m: paragraphs.add(m.group(1)) # ── 新增字段: select_files ── select_files = parse_file_control(preprocessed) # ── 新增字段: open_directions_detail (与 open_directions 一致) ── open_directions_detail = open_dir # ── 新增字段: has_divide / has_inspect / has_string ── has_divide = bool(re.search(r'\bDIVIDE\b', cobol_source.upper())) has_inspect = bool(re.search(r'\bINSPECT\b', cobol_source.upper())) has_string = bool(re.search(r'\bSTRING\b', cobol_source.upper())) # ── 新增字段: divide_constants ── divide_constants = [] if has_divide and proc_div: for dm in re.finditer(r'\bDIVIDE\s+([\d.]+)\b', proc_div, re.IGNORECASE): val = dm.group(1) try: divide_constants.append(float(val)) except ValueError: pass # ── 新增字段: perform_patterns ── perform_patterns = [] def _walk_performs(node): if isinstance(node, BrPerform): entry = { "type": node.perf_type, "target": node.target, "condition": node.condition, "times": node.times, "varying_var": node.varying_var, } perform_patterns.append(entry) _walk_performs(node.body_seq) elif isinstance(node, BrIf): _walk_performs(node.true_seq) _walk_performs(node.false_seq) elif isinstance(node, BrEval): for _, seq in node.when_list: _walk_performs(seq) _walk_performs(node.other_seq) elif isinstance(node, BrSeq): for c in node.children: _walk_performs(c) if branch_tree: _walk_performs(branch_tree) # ── 新增字段: main_loop ── main_loop = None def _find_main_loop(node, depth=0): nonlocal main_loop if main_loop is not None: return if isinstance(node, BrPerform): if _perform_has_read(node): main_loop = { "type": node.perf_type, "read_file": _perform_read_file(node), "has_at_end": False, } return _find_main_loop(node.body_seq, depth + 1) elif isinstance(node, BrIf): _find_main_loop(node.true_seq, depth + 1) _find_main_loop(node.false_seq, depth + 1) elif isinstance(node, BrEval): for _, seq in node.when_list: _find_main_loop(seq, depth + 1) _find_main_loop(node.other_seq, depth + 1) elif isinstance(node, BrSeq): for c in node.children: _find_main_loop(c, depth + 1) def _perform_has_read(perf_node): def _walk_seq(seq): if isinstance(seq, Assign): if seq.source_info.get('type') == 'read_into': return True elif isinstance(seq, BrSeq): for ch in seq.children: if _walk_seq(ch): return True return False return _walk_seq(perf_node.body_seq) def _perform_read_file(perf_node): def _walk_seq(seq): if isinstance(seq, Assign): if seq.source_info.get('type') == 'read_into': return seq.source_info.get('file', '') elif isinstance(seq, BrSeq): for ch in seq.children: result = _walk_seq(ch) if result: return result return None return _walk_seq(perf_node.body_seq) if branch_tree: _find_main_loop(branch_tree) # ── 新增字段: if_types ── if_types = {"total": 0, "comparison": 0, "equality": 0, "compound": 0, "nested_depth": 0} def _walk_if_types(node, depth=0): if isinstance(node, BrIf): if_types["total"] += 1 if_types["nested_depth"] = max(if_types["nested_depth"], depth) ct = node.cond_tree if ct: leaves = collect_leaves(ct) # Check compound: cond_tree is CondAnd or CondOr (not just CondLeaf) if isinstance(ct, (CondAnd, CondOr)): if_types["compound"] += 1 for leaf in leaves: if leaf.op in ('>', '<', '>=', '<='): if_types["comparison"] += 1 elif leaf.op in ('=', '<>'): if_types["equality"] += 1 _walk_if_types(node.true_seq, depth + 1) _walk_if_types(node.false_seq, depth + 1) elif isinstance(node, BrEval): for _, seq in node.when_list: _walk_if_types(seq, depth + 1) _walk_if_types(node.other_seq, depth + 1) elif isinstance(node, BrPerform): _walk_if_types(node.body_seq, depth + 1) elif isinstance(node, BrSeq): for c in node.children: _walk_if_types(c, depth + 1) if branch_tree: _walk_if_types(branch_tree) # ── 新增字段: variable_patterns ── variable_patterns = { "has_prev_key": False, "has_accumulator": False, "has_error_flag": False, "has_switch": False, "has_index": False, "has_save_area": False, "has_counter": False, "has_work": False, } for f in fields_dict: name = f.get('name', '') if re.search(r'\bWS-PREV[-_]', name, re.IGNORECASE): variable_patterns["has_prev_key"] = True if re.search(r'[-_]CNT\b', name, re.IGNORECASE) or re.search(r'[-_]ACCUM\b', name, re.IGNORECASE): variable_patterns["has_accumulator"] = True if re.search(r'[-_]ERR\b', name, re.IGNORECASE) or re.search(r'[-_]ERROR[-_]', name, re.IGNORECASE): variable_patterns["has_error_flag"] = True if re.search(r'[-_]SW\b', name, re.IGNORECASE) or re.search(r'[-_]FLAG\b', name, re.IGNORECASE): variable_patterns["has_switch"] = True if re.search(r'[-_]IDX\b', name, re.IGNORECASE) or re.search(r'[-_]INDX\b', name, re.IGNORECASE) or re.search(r'[-_]SUB\b', name, re.IGNORECASE): variable_patterns["has_index"] = True if re.search(r'[-_]SAVE[-_]', name, re.IGNORECASE) or re.search(r'[-_]HOLD[-_]', name, re.IGNORECASE): variable_patterns["has_save_area"] = True if re.search(r'[-_]CNT\b', name, re.IGNORECASE) or re.search(r'[-_]COUNT\b', name, re.IGNORECASE): variable_patterns["has_counter"] = True if name.startswith('WS-') and not re.search(r'(?:CNT|ERR|SW|IDX|INDX|SUB|SAVE|HOLD|PREV|ACCUM)', name, re.IGNORECASE): if re.search(r'[-_]W\b|[-_]WORK\b|[-_]WK\b|^WS-W[0O]\w', name, re.IGNORECASE): variable_patterns["has_work"] = True # ── 新增字段: open_pattern ── open_pattern = "sequential" if proc_div: proc_upper = proc_div.upper() open_positions = [m.start() for m in re.finditer(r'\bOPEN\b', proc_upper)] close_positions = [m.start() for m in re.finditer(r'\bCLOSE\b', proc_upper)] if open_positions and close_positions: # Check OPEN ... CLOSE ... OPEN sequence for i, opos in enumerate(open_positions): for cpos in close_positions: if cpos > opos: for opos2 in open_positions: if opos2 > cpos: open_pattern = "open-close-open" break if open_pattern == "open-close-open": break if open_pattern == "open-close-open": break return { "paragraphs": sorted(paragraphs) if paragraphs else [], "decision_points": decision_points, "branch_tree": branch_tree, "file_count": len(file_sec) if file_sec else 0, "open_directions": open_dir, "has_search_all": any('SEARCH' in str(dp.get('label', '')) for dp in decision_points), "has_evaluate": any(dp['kind'] == 'EVALUATE' for dp in decision_points), "has_call": 'CALL' in cobol_source.upper(), "has_break": any('KEY' in str(dp.get('label', '')).upper() for dp in decision_points), "total_branches": total_branches, "total_paragraphs": len(paragraphs), "branch_tree_obj": branch_tree, # ── 新增 8 类结构特征 ── "select_files": select_files, "open_directions_detail": open_directions_detail, "has_divide": has_divide, "divide_constants": divide_constants, "has_inspect": has_inspect, "has_string": has_string, "perform_patterns": perform_patterns, "main_loop": main_loop, "if_types": if_types, "variable_patterns": variable_patterns, "open_pattern": open_pattern, } def generate_data(cobol_source: str, structure: dict = None) -> list[dict]: """根据 COBOL 源码生成覆盖所有路径的测试数据。 Args: cobol_source: COBOL 程序源码文本 structure: 可选,如果已调用 extract_structure() 可传入避免重复解析 Returns: list[dict]: 测试数据记录列表,每条包含所有字段的值 """ if structure is None: structure = extract_structure(cobol_source) branch_tree = structure.get("branch_tree_obj") if branch_tree is None: return [] preprocessed = preprocess(cobol_source) data_div = extract_data_division(preprocessed) data_fields = parse_data_division(data_div) if data_div else [] fields_dict = [] for f in data_fields: entry = { 'name': f.name, 'level': f.level, 'pic': f.pic, 'pic_info': { 'type': f.pic_info.type if f.pic_info else 'unknown', 'digits': f.pic_info.digits if f.pic_info else 0, 'decimal': f.pic_info.decimal if f.pic_info else 0, 'length': f.pic_info.length if f.pic_info else 0, 'signed': f.pic_info.signed if f.pic_info else False, }, 'section': f.section, 'occurs': f.occurs_count, 'occurs_depending': f.occurs_depending, 'value': f.value, 'values': f.values, 'redefines': f.redefines, 'usage': f.usage, } if f.is_88: entry['is_88'] = True entry['parent'] = f.parent fields_dict.append(entry) fields_dict = expand_occurs(fields_dict) proc_div = extract_procedure_division(preprocessed) _, assignments = build_branch_tree(proc_div, fields_dict) file_sec = parse_file_section(preprocessed) branch_paths = enum_paths(branch_tree, fields_dict) branch_paths = [(_filter_stop(c), a) for c, a in branch_paths] records, kept_paths = generate_records(branch_paths, fields_dict, assignments, file_sec=file_sec) return records def incremental_supplement(branch_tree, decision_gaps: list[int]) -> list[dict]: """针对未覆盖的决策点,增量生成补充测试数据。 Args: branch_tree: extract_structure() 返回的 branch_tree 字段 decision_gaps: 未覆盖的决策点 ID 列表,如 [1, 3, 5] Returns: list[dict]: 增量测试数据,格式与 generate_data() 兼容 """ from .models import BrIf, BrEval, BrSeq target_decisions = set(decision_gaps) found = [] def _find_decisions(node, counter): if isinstance(node, BrIf): counter[0] += 1 if counter[0] in target_decisions: found.append(("IF", node.condition)) _find_decisions(node.true_seq, counter) _find_decisions(node.false_seq, counter) elif isinstance(node, BrEval): counter[0] += 1 if counter[0] in target_decisions: found.append(("EVALUATE", node.subject)) for _, seq in node.when_list: _find_decisions(seq, counter) _find_decisions(node.other_seq, counter) elif isinstance(node, BrSeq): for child in node.children: _find_decisions(child, counter) _find_decisions(branch_tree, [0]) supplements = [] for i, (kind, label) in enumerate(found): supplements.append({ "_dec_id": f"incr_{i}", "_kind": kind, "_label": str(label)[:60], }) return supplements