7ac887c776
- Add INSPECT (TALLYING/REPLACING/CONVERTING) with BEFORE/AFTER INITIAL - Add SEARCH/SEARCH ALL with element-assignment path enumeration - Fix _mark_perform compound condition marking via evaluate_tree - Fix EVALUATE TRUE prior_false to collect all MC/DC false sets - Add impossible path filtering (Pass A.5) with trace-to-root conflict detection - Fix multi-line PERFORM VARYING parsing (VARYING/FROM/BY/UNTIL on separate lines) - Remove dead code: agents.py LLM parser (replaced by rule-based _BrParser) - 59 unit tests passing, 5 integration programs verified
302 lines
11 KiB
Python
302 lines
11 KiB
Python
"""COBOL Test Data Generator — 模块化版入口"""
|
||
|
||
import sys
|
||
import logging
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
# ── Configuration (must be defined before the local-module imports to avoid circular imports) ──
|
||
|
||
# Module-level configuration mapping; empty at import time.
# NOTE(review): presumably read or filled in by the sibling modules imported below — confirm.
CONFIG = {}
|
||
|
||
from .read import preprocess, extract_data_division, extract_procedure_division
|
||
from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements
|
||
from .core import build_branch_tree, classify_field_roles, _init_child_names
|
||
from .cond import parse_single_condition, is_field
|
||
from .design import enum_paths, generate_records, _filter_stop
|
||
from .output import output_json, output_input_files
|
||
from .coverage import run_coverage, generate_coverage_index
|
||
|
||
# Logger named after this module; its records reach the handlers that are
# attached to the root logger when main() runs.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ── OCCURS 展开 ──
|
||
|
||
|
||
def _add_subscript(name, occ):
|
||
"""追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)"""
|
||
if name.endswith(')'):
|
||
return name[:-1] + f',{occ})'
|
||
return name + f'({occ})'
|
||
|
||
|
||
def _occurrence_copy(entry, occ, depending, table_name):
    """Return a copy of ``entry`` renamed for occurrence ``occ`` of its table.

    Args:
        entry: Field dict subordinate to the table being expanded.
        occ: 1-based occurrence number.
        depending: ``occurs_depending`` value of the enclosing table; it is
            written to the copy unconditionally.
        table_name: Name of the enclosing table, used as the parent of a
            level-88 entry that does not carry a ``parent`` of its own.
    """
    clone = dict(entry)
    if not (entry.get('occurs') or 0):
        clone['occurs'] = 0
    # A nested table keeps its own repeat count so a later pass expands it.
    clone['occurs_depending'] = depending
    if entry.get('is_88'):
        owner = entry.get('parent') or table_name
        clone['parent'] = _add_subscript(owner, occ)
    clone['name'] = _add_subscript(entry['name'], occ)
    return clone


def expand_occurs(fields):
    """Expand OCCURS fields into one subscripted copy per occurrence.

    ``fields`` is a flat, declaration-ordered list of field dicts (keys read
    here: ``name``, ``level``, ``occurs``, ``occurs_depending``, ``is_88``,
    ``parent``).  For every non-88 entry with ``occurs > 0``:

    * the entries that follow it with a higher level number (level-88 entries
      always included, level 77 never) are treated as its subordinates;
    * if it has at least one non-88 subordinate it is a group: one copy of the
      group itself is emitted with ``occurs`` reset to 0, followed by a
      subscripted copy of every subordinate for each occurrence;
    * otherwise it is an elementary table: for each occurrence a subscripted
      copy of the item is emitted, immediately followed by subscripted copies
      of its level-88 entries, so that every ``parent`` of a condition name
      refers to an entry that exists in the result.

    Nested tables are handled by re-running the expansion until no expandable
    entry is left.

    Args:
        fields: List of field dicts.  The dicts are not modified; entries that
            need no expansion are placed in the result as the same objects.

    Returns:
        A new list of field dicts with all OCCURS tables expanded.
    """
    result = []
    i = 0
    while i < len(fields):
        field = fields[i]
        # ``or 0`` tolerates an explicit ``None`` repeat count.
        times = field.get('occurs') or 0
        if times <= 0 or field.get('is_88'):
            result.append(field)
            i += 1
            continue

        # Gather everything subordinate to this table.
        members = []
        j = i + 1
        while j < len(fields):
            candidate = fields[j]
            if not candidate.get('is_88') and (
                    candidate['level'] <= field['level']
                    or candidate['level'] == 77):
                break
            members.append(candidate)
            j += 1

        depending = field.get('occurs_depending')
        if any(not m.get('is_88') for m in members):
            # Group table: keep one non-repeating header, repeat the members.
            header = dict(field)
            header['occurs'] = 0
            result.append(header)
            for occ in range(1, times + 1):
                result.extend(
                    _occurrence_copy(m, occ, depending, field['name'])
                    for m in members
                )
        else:
            # Elementary table (possibly with condition names): repeat the
            # item itself and keep each occurrence's 88-levels right after it.
            for occ in range(1, times + 1):
                clone = dict(field)
                clone['name'] = _add_subscript(field['name'], occ)
                clone['occurs'] = 0
                clone['occurs_depending'] = depending
                result.append(clone)
                result.extend(
                    _occurrence_copy(m, occ, depending, field['name'])
                    for m in members
                )
        i = j

    # Copies of nested tables still carry their own repeat count; expand them
    # in another pass.  88-level entries are never expanded, so they must not
    # trigger another pass.
    if any((f.get('occurs') or 0) > 0 and not f.get('is_88') for f in result):
        return expand_occurs(result)
    return result
|
||
|
||
|
||
# ── 入口 ──
|
||
|
||
def _split_cli_args(args):
    """Split command-line arguments into COBOL source paths and an output dir.

    An argument naming an existing directory becomes the output directory (the
    last one wins); an argument with a ``.cbl`` / ``.cob`` / ``.cpy`` suffix
    (case-insensitive) is a COBOL source; anything else is reported on stdout
    and skipped.

    Returns:
        ``(cobol_files, outdir)`` where ``outdir`` is ``None`` if no directory
        argument was given.
    """
    cobol_files = []
    outdir = None
    for arg in args:
        path = Path(arg)
        if path.is_dir():
            outdir = path
        elif path.suffix.upper() in ('.CBL', '.COB', '.CPY'):
            cobol_files.append(path)
        else:
            print(f"警告:跳过未知参数 {arg}")
    return cobol_files, outdir


def _configure_logging(outdir):
    """Attach a DEBUG file handler and an INFO console handler to the root logger.

    The log file is created in ``outdir`` with a timestamped name.
    """
    log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log"
    file_handler = logging.FileHandler(log_path, encoding="utf-8", mode="w")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    ))
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter("%(message)s"))
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(file_handler)
    root_logger.addHandler(console_handler)


def _field_defs_to_dicts(data_fields):
    """Convert parsed field definitions into the plain dicts used downstream.

    The second and later ``FILLER`` items are renamed ``FILLER_<n>``.  A
    level-88 entry receives a copy of the ``pic_info`` of its parent when the
    parent has already been seen.
    """
    entries = []
    parent_pic = {}
    filler_counter = 0
    for fdef in data_fields:
        pic = fdef.pic_info
        name = fdef.name
        if name == 'FILLER':
            filler_counter += 1
            if filler_counter > 1:
                name = f'FILLER_{filler_counter}'
        entry = {
            'name': name,
            'level': fdef.level,
            'pic': fdef.pic,
            'pic_info': {
                'type': pic.type if pic else 'unknown',
                'digits': pic.digits if pic else 0,
                'decimal': pic.decimal if pic else 0,
                'length': pic.length if pic else 0,
                'signed': pic.signed if pic else False,
            },
            'value': fdef.value,
            'values': fdef.values,
            'section': fdef.section,
            'is_filler': fdef.is_filler,
            'redefines': fdef.redefines,
            'usage': fdef.usage,
            'occurs': fdef.occurs_count,
            'occurs_depending': fdef.occurs_depending,
        }
        if fdef.is_88:
            entry['is_88'] = True
            entry['parent'] = fdef.parent
            # Condition names have no PIC of their own; borrow the parent's
            # so values can be generated for them.
            if fdef.parent and fdef.parent in parent_pic:
                entry['pic_info'] = dict(parent_pic[fdef.parent])
        else:
            parent_pic[name] = entry['pic_info']
        entries.append(entry)
    return entries


def _map_fd_fields(file_sec, fields_dict):
    """Build the FD -> field-names and field-name -> FD mappings.

    For each FD the list holds its record names and the names returned by
    ``_init_child_names`` for each record, de-duplicated in first-seen order.
    """
    fd_fields = {}
    field_to_fd = {}
    for fd_name, rec_names in (file_sec or {}).items():
        members = []
        seen = set()
        for rec in rec_names:
            for name in (rec, *_init_child_names(rec, fields_dict)):
                if name not in seen:
                    seen.add(name)
                    members.append(name)
        fd_fields[fd_name] = members
        for name in members:
            field_to_fd[name] = fd_name
    return fd_fields, field_to_fd


def _log_field_table(fields_dict):
    """Log one aligned table row per field (level, name, PIC, type, length)."""
    logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}")
    logger.info("-" * 65)
    for entry in fields_dict:
        pic = entry['pic_info']
        kind = pic.get('type', '?')
        # digits + decimal when non-zero, otherwise the raw length.
        width = pic.get('digits', 0) + pic.get('decimal', 0) or pic.get('length', 0)
        if entry.get('pic'):
            pic_display = str(entry.get('pic', ''))
        else:
            pic_display = '88-level' if entry.get('is_88') else ''
        logger.info(f"{entry['level']:<6} {entry['name']:<25} {pic_display:<15} {kind:<12} {width:<5}")


def _process_program(filepath, outdir):
    """Run the whole pipeline for one COBOL source file.

    Parses the data and procedure divisions, enumerates branch paths, writes
    the coverage report and the JSON outputs into ``outdir``.

    Returns:
        A list containing the ``run_coverage`` result for this program, or an
        empty list when the file was skipped or has no PROCEDURE DIVISION.
    """
    if not filepath.exists():
        logger.error(f"错误:文件不存在 {filepath}")
        return []

    # An unreadable or non-UTF-8 file is reported like the other per-file
    # errors instead of aborting the remaining files.
    try:
        source = filepath.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as exc:
        logger.error(f"错误:无法读取 {filepath}:{exc}")
        return []

    source = resolve_copybooks(source, str(filepath.parent))
    preprocessed = preprocess(source)
    file_sec = parse_file_section(preprocessed)

    # DATA DIVISION
    data_div = extract_data_division(preprocessed)
    if not data_div:
        logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。")
        return []

    data_fields = parse_data_division(data_div)
    if not data_fields:
        logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。")
        return []

    fields_dict = expand_occurs(_field_defs_to_dicts(data_fields))
    fd_fields, field_to_fd = _map_fd_fields(file_sec, fields_dict)

    logger.info(f"\n========== {filepath.name} ==========")
    logger.info("\n字段列表:")
    _log_field_table(fields_dict)

    # PROCEDURE DIVISION
    proc_div = extract_procedure_division(preprocessed)
    branch_tree = None
    assignments = {}

    if proc_div:
        branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
        roles = classify_field_roles(branch_tree, assignments, fields_dict,
                                     source=preprocessed, proc_text=proc_div)
        logger.info("\n字段角色(输入/输出/出入/未用):")
        for entry in fields_dict:
            if not entry.get('is_88'):
                logger.info(f" {entry['name']:<30} {roles.get(entry['name'], '?')}")

        paths = [
            (_filter_stop(cons), assigns)
            for cons, assigns in enum_paths(branch_tree, fields_dict)
        ]
        # Direction of each file as opened in the procedure code.
        open_dir = scan_open_statements(proc_div)

        logger.info(f"\n分支路径数:{len(paths)}")
        for idx, (path_cons, _assigns) in enumerate(paths, 1):
            descs = []
            for con in path_cons:
                if len(con) != 4:
                    continue
                field, op, val, want = con
                if op == 'not_in':
                    descs.append(f"{field} not in {val}")
                else:
                    descs.append(f"{field} {op} {val} ({'T' if want else 'F'})")
            logger.debug(f" 路径 {idx}: {', '.join(descs)}")
    else:
        logger.warning("\n没有找到 PROCEDURE DIVISION。")
        # A single unconstrained path so one record is still generated.
        paths = [([], {})]
        roles = {entry['name']: 'unused' for entry in fields_dict}
        open_dir = {}

    # Coverage report (the copybook-resolved source text is passed for line
    # lookup).  Without a PROCEDURE DIVISION there is no branch tree to report
    # on, so the report is skipped rather than referencing an unbound name.
    coverage = []
    if proc_div:
        cov_prefix = str(outdir / filepath.stem)
        coverage.append(
            run_coverage(branch_tree, paths, fields_dict,
                         source, cov_prefix, index_relpath='coverage/index.html')
        )

    records, kept_path_cons = generate_records(
        paths, fields_dict, assignments, file_sec=file_sec)

    # Full JSON output.
    outpath = outdir / (filepath.stem + '.json')
    output_json(records, outpath, roles,
                fd_fields=fd_fields, field_to_fd=field_to_fd,
                open_dir=open_dir,
                path_cons_list=kept_path_cons)

    # Input JSON files, split per FD.
    output_input_files(records, outdir, filepath.stem, roles,
                       fd_fields, field_to_fd, open_dir)

    logger.info(f"\n输出:{outpath}({len(records)} 条记录)")
    logger.debug("\n记录明细:")
    for idx, rec in enumerate(records, 1):
        vals = []
        for entry in fields_dict:
            role = roles.get(entry['name'], '?')
            marker = f"[{role[0].upper()}]" if role not in ('?', 'unused') else ''
            vals.append(f"{marker}{entry['name']}={rec.get(entry['name'], '?')}")
        logger.debug(f" 记录 {idx}: {' | '.join(vals)}")

    return coverage


def main():
    """Command-line entry point.

    Reads ``sys.argv``: any number of COBOL source files plus an optional
    existing output directory (default: the directory of the first source).
    Exits with status 1 when no arguments or no COBOL files are given.
    Otherwise configures logging, processes each file in turn and, if at
    least one coverage result was produced, writes the coverage index page.
    """
    if len(sys.argv) < 2:
        print("用法: python -m cobol_testgen <cobol文件1> [cobol文件2 ...] [输出目录]")
        sys.exit(1)

    cobol_files, outdir = _split_cli_args(sys.argv[1:])
    if not cobol_files:
        print("错误:未找到任何 COBOL 文件")
        sys.exit(1)
    if outdir is None:
        outdir = cobol_files[0].parent

    # The log file lives in the output directory, so create it first.
    outdir.mkdir(parents=True, exist_ok=True)
    _configure_logging(outdir)

    programs = []
    for filepath in cobol_files:
        programs.extend(_process_program(filepath, outdir))

    # Overall coverage index page.
    if programs:
        generate_coverage_index(programs, outdir)
        logger.info(f"\n覆盖率总览:{outdir / 'coverage' / 'index.html'}")
|