"""COBOL Test Data Generator — 模块化版入口""" import sys import logging from datetime import datetime from pathlib import Path # ── 配置(必须放在本地模块导入之前,避免循环导入) ── CONFIG = {} from .read import preprocess, extract_data_division, extract_procedure_division from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements from .core import build_branch_tree, classify_field_roles, _init_child_names from .cond import parse_single_condition, is_field from .design import enum_paths, generate_records, _filter_stop from .output import output_json, output_input_files from .coverage import run_coverage, generate_coverage_index logger = logging.getLogger(__name__) # ── OCCURS 展开 ── def _add_subscript(name, occ): """追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)""" if name.endswith(')'): return name[:-1] + f',{occ})' return name + f'({occ})' def expand_occurs(fields): """展开 OCCURS 字段为下标副本。递归处理嵌套 OCCURS。""" result = [] i = 0 while i < len(fields): f = fields[i] if f.get('occurs', 0) > 0 and not f.get('is_88'): children = [] j = i + 1 while j < len(fields): child = fields[j] if child.get('is_88'): children.append(child) j += 1 continue if child['level'] <= f['level'] or child.get('level') == 77: break children.append(child) j += 1 if children: group = dict(f) group['occurs'] = 0 result.append(group) for occ in range(1, f['occurs'] + 1): for child in children: copy = dict(child) if child.get('occurs', 0) == 0: copy['occurs'] = 0 copy['occurs_depending'] = f.get('occurs_depending') if child.get('is_88'): parent = child.get('parent') or f['name'] copy['parent'] = _add_subscript(parent, occ) copy['name'] = _add_subscript(child['name'], occ) else: copy['name'] = _add_subscript(child['name'], occ) result.append(copy) else: for occ in range(1, f['occurs'] + 1): copy = dict(f) copy['name'] = _add_subscript(f['name'], occ) copy['occurs'] = 0 copy['occurs_depending'] = f.get('occurs_depending') result.append(copy) i = j else: result.append(f) i += 1 if any(f.get('occurs', 0) > 0 for f in result): return expand_occurs(result) return result # ── 入口 ── def main(): if len(sys.argv) < 2: print("用法: python -m cobol_testgen [cobol文件2 ...] [输出目录]") sys.exit(1) args = sys.argv[1:] # 分离 cobol 文件与输出目录 cobol_files = [] outdir = None for a in args: p = Path(a) if p.is_dir(): outdir = p elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'): cobol_files.append(p) else: print(f"警告:跳过未知参数 {a}") if not cobol_files: print("错误:未找到任何 COBOL 文件") sys.exit(1) if outdir is None: outdir = cobol_files[0].parent # 配置全局 Logger outdir.mkdir(parents=True, exist_ok=True) log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log" fh = logging.FileHandler(log_path, encoding="utf-8", mode="w") fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s: %(message)s" )) sh = logging.StreamHandler() sh.setLevel(logging.INFO) sh.setFormatter(logging.Formatter("%(message)s")) root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG) root_logger.addHandler(fh) root_logger.addHandler(sh) programs = [] for filepath in cobol_files: if not filepath.exists(): logger.error(f"错误:文件不存在 {filepath}") continue source = filepath.read_text(encoding='utf-8') source = resolve_copybooks(source, str(filepath.parent)) preprocessed = preprocess(source) file_sec = parse_file_section(preprocessed) # DATA DIVISION解析 data_div = extract_data_division(preprocessed) if not data_div: logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。") continue data_fields = parse_data_division(data_div) if not data_fields: logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。") continue # FieldDef → dict fields_dict = [] parent_pic = {} filler_counter = 0 for f in data_fields: pi = f.pic_info name = f.name if name == 'FILLER': filler_counter += 1 if filler_counter > 1: name = f'FILLER_{filler_counter}' entry = { 'name': name, 'level': f.level, 'pic': f.pic, 'pic_info': { 'type': pi.type if pi else 'unknown', 'digits': pi.digits if pi else 0, 'decimal': pi.decimal if pi else 0, 'length': pi.length if pi else 0, 'signed': pi.signed if pi else False, }, 'value': f.value, 'values': f.values, 'section': f.section, 'is_filler': f.is_filler, 'redefines': f.redefines, 'usage': f.usage, 'occurs': f.occurs_count, 'occurs_depending': f.occurs_depending, } if f.is_88: entry['is_88'] = True entry['parent'] = f.parent # Copy parent's pic_info for value generation if f.parent and f.parent in parent_pic: entry['pic_info'] = dict(parent_pic[f.parent]) else: parent_pic[name] = entry['pic_info'] fields_dict.append(entry) fields_dict = expand_occurs(fields_dict) # Build FD→children 和 field→FD 映射 fd_fields = {} field_to_fd = {} if file_sec: for fd_name, rec_names in file_sec.items(): fds = [] seen = set() for rec in rec_names: if rec not in seen: fds.append(rec) seen.add(rec) for child in _init_child_names(rec, fields_dict): if child not in seen: fds.append(child) seen.add(child) fd_fields[fd_name] = fds for child in fds: field_to_fd[child] = fd_name logger.info(f"\n========== {filepath.name} ==========") logger.info(f"\n字段列表:") logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}") logger.info("-" * 65) for f in fields_dict: pi = f['pic_info'] t = pi.get('type', '?') l = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0) pic_display = str(f.get('pic', '')) if f.get('pic') else ('88-level' if f.get('is_88') else '') logger.info(f"{f['level']:<6} {f['name']:<25} {pic_display:<15} {t:<12} {l:<5}") # PROCEDURE DIVISION解析 proc_div = extract_procedure_division(preprocessed) branch_paths = [] assignments = {} if proc_div: branch_tree, assignments = build_branch_tree(proc_div, fields_dict) roles = classify_field_roles(branch_tree, assignments, fields_dict, source=preprocessed, proc_text=proc_div) logger.info(f"\n字段角色(输入/输出/出入/未用):") for f in fields_dict: if f.get('is_88'): continue logger.info(f" {f['name']:<30} {roles.get(f['name'], '?')}") branch_paths_with_assigns = enum_paths(branch_tree, fields_dict) branch_paths_with_assigns = [ (_filter_stop(c), a) for c, a in branch_paths_with_assigns ] # OPEN 方向解析 open_dir = scan_open_statements(proc_div) if proc_div else {} if proc_div: logger.info(f"\n分支路径数:{len(branch_paths_with_assigns)}") for i, (path_cons, _path_assign) in enumerate(branch_paths_with_assigns): descs = [] for c in path_cons: if len(c) == 4: field, op, val, want = c if op == 'not_in': descs.append(f"{field} not in {val}") else: descs.append(f"{field} {op} {val} ({'T' if want else 'F'})") logger.debug(f" 路径 {i + 1}: {', '.join(descs)}") else: logger.warning("\n没有找到 PROCEDURE DIVISION。") branch_paths_with_assigns = [([], {})] roles = {f['name']: 'unused' for f in fields_dict} # 覆盖率报告(传入原始源文本用于行号定位) cov_prefix = str(outdir / filepath.stem) index_relpath = 'coverage/index.html' cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict, source, cov_prefix, index_relpath=index_relpath) records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec) # 输出 JSON(完整文件) outpath = outdir / (filepath.stem + '.json') output_json(records, outpath, roles, fd_fields=fd_fields, field_to_fd=field_to_fd, open_dir=open_dir, path_cons_list=kept_path_cons) # 输出入力 JSON(按 FD 拆分) output_input_files(records, outdir, filepath.stem, roles, fd_fields, field_to_fd, open_dir) logger.info(f"\n输出:{outpath}({len(records)} 条记录)") logger.debug(f"\n记录明细:") for i, rec in enumerate(records, 1): vals = [] for f in fields_dict: r = roles.get(f['name'], '?') marker = f"[{r[0].upper()}]" if r != '?' and r != 'unused' else '' vals.append(f"{marker}{f['name']}={rec.get(f['name'], '?')}") logger.debug(f" 记录 {i}: {' | '.join(vals)}") programs.append(cov_result) # 生成覆盖率总括索引页 if programs: generate_coverage_index(programs, outdir) logger.info(f"\n覆盖率总览:{outdir / 'coverage' / 'index.html'}")