Files
cobol-java-v3/cobol_testgen/__init__.py

513 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""COBOL Test Data Generator — 模块化版入口"""
import sys
import re
import logging
from datetime import datetime
from pathlib import Path
# ── 配置(必须放在本地模块导入之前,避免循环导入) ──
CONFIG = {}
from .read import preprocess, extract_data_division, extract_procedure_division
from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements
from .core import build_branch_tree, classify_field_roles, _init_child_names
from .cond import parse_single_condition, is_field
from .design import enum_paths, generate_records, _filter_stop
from .output import output_json, output_input_files
from .coverage import run_coverage, generate_coverage_index
logger = logging.getLogger(__name__)
# ── OCCURS 展开 ──
def _add_subscript(name, occ):
"""追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)"""
if name.endswith(')'):
return name[:-1] + f',{occ})'
return name + f'({occ})'
def expand_occurs(fields):
"""展开 OCCURS 字段为下标副本。递归处理嵌套 OCCURS。"""
result = []
i = 0
while i < len(fields):
f = fields[i]
if f.get('occurs', 0) > 0 and not f.get('is_88'):
children = []
j = i + 1
while j < len(fields):
child = fields[j]
if child.get('is_88'):
children.append(child)
j += 1
continue
if child['level'] <= f['level'] or child.get('level') == 77:
break
children.append(child)
j += 1
if children:
group = dict(f)
group['occurs'] = 0
result.append(group)
for occ in range(1, f['occurs'] + 1):
for child in children:
copy = dict(child)
if child.get('occurs', 0) == 0:
copy['occurs'] = 0
copy['occurs_depending'] = f.get('occurs_depending')
if child.get('is_88'):
parent = child.get('parent') or f['name']
copy['parent'] = _add_subscript(parent, occ)
copy['name'] = _add_subscript(child['name'], occ)
else:
copy['name'] = _add_subscript(child['name'], occ)
result.append(copy)
else:
for occ in range(1, f['occurs'] + 1):
copy = dict(f)
copy['name'] = _add_subscript(f['name'], occ)
copy['occurs'] = 0
copy['occurs_depending'] = f.get('occurs_depending')
result.append(copy)
i = j
else:
result.append(f)
i += 1
if any(f.get('occurs', 0) > 0 for f in result):
return expand_occurs(result)
return result
# ── 入口 ──
def main():
if len(sys.argv) < 2:
print("用法: python -m cobol_testgen <cobol文件1> [cobol文件2 ...] [输出目录]")
sys.exit(1)
args = sys.argv[1:]
# 分离 cobol 文件与输出目录
cobol_files = []
outdir = None
for a in args:
p = Path(a)
if p.is_dir():
outdir = p
elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'):
cobol_files.append(p)
else:
print(f"警告:跳过未知参数 {a}")
if not cobol_files:
print("错误:未找到任何 COBOL 文件")
sys.exit(1)
if outdir is None:
outdir = cobol_files[0].parent
# 配置全局 Logger
outdir.mkdir(parents=True, exist_ok=True)
log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log"
fh = logging.FileHandler(log_path, encoding="utf-8", mode="w")
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
))
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
sh.setFormatter(logging.Formatter("%(message)s"))
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(fh)
root_logger.addHandler(sh)
programs = []
for filepath in cobol_files:
if not filepath.exists():
logger.error(f"错误:文件不存在 {filepath}")
continue
source = filepath.read_text(encoding='utf-8')
source = resolve_copybooks(source, str(filepath.parent))
preprocessed = preprocess(source)
file_sec = parse_file_section(preprocessed)
# DATA DIVISION解析
data_div = extract_data_division(preprocessed)
if not data_div:
logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。")
continue
data_fields = parse_data_division(data_div)
if not data_fields:
logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。")
continue
# FieldDef → dict
fields_dict = []
parent_pic = {}
filler_counter = 0
for f in data_fields:
pi = f.pic_info
name = f.name
if name == 'FILLER':
filler_counter += 1
if filler_counter > 1:
name = f'FILLER_{filler_counter}'
entry = {
'name': name,
'level': f.level,
'pic': f.pic,
'pic_info': {
'type': pi.type if pi else 'unknown',
'digits': pi.digits if pi else 0,
'decimal': pi.decimal if pi else 0,
'length': pi.length if pi else 0,
'signed': pi.signed if pi else False,
},
'value': f.value,
'values': f.values,
'section': f.section,
'is_filler': f.is_filler,
'redefines': f.redefines,
'usage': f.usage,
'occurs': f.occurs_count,
'occurs_depending': f.occurs_depending,
}
if f.is_88:
entry['is_88'] = True
entry['parent'] = f.parent
# Copy parent's pic_info for value generation
if f.parent and f.parent in parent_pic:
entry['pic_info'] = dict(parent_pic[f.parent])
else:
parent_pic[name] = entry['pic_info']
fields_dict.append(entry)
fields_dict = expand_occurs(fields_dict)
# Build FD→children 和 field→FD 映射
fd_fields = {}
field_to_fd = {}
if file_sec:
for fd_name, rec_names in file_sec.items():
fds = []
seen = set()
for rec in rec_names:
if rec not in seen:
fds.append(rec)
seen.add(rec)
for child in _init_child_names(rec, fields_dict):
if child not in seen:
fds.append(child)
seen.add(child)
fd_fields[fd_name] = fds
for child in fds:
field_to_fd[child] = fd_name
logger.info(f"\n========== {filepath.name} ==========")
logger.info(f"\n字段列表:")
logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}")
logger.info("-" * 65)
for f in fields_dict:
pi = f['pic_info']
t = pi.get('type', '?')
l = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0)
pic_display = str(f.get('pic', '')) if f.get('pic') else ('88-level' if f.get('is_88') else '')
logger.info(f"{f['level']:<6} {f['name']:<25} {pic_display:<15} {t:<12} {l:<5}")
# PROCEDURE DIVISION解析
proc_div = extract_procedure_division(preprocessed)
branch_paths = []
assignments = {}
if proc_div:
branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
roles = classify_field_roles(branch_tree, assignments, fields_dict,
source=preprocessed, proc_text=proc_div)
logger.info(f"\n字段角色(输入/输出/出入/未用):")
for f in fields_dict:
if f.get('is_88'):
continue
logger.info(f" {f['name']:<30} {roles.get(f['name'], '?')}")
branch_paths_with_assigns = enum_paths(branch_tree, fields_dict)
branch_paths_with_assigns = [
(_filter_stop(c), a) for c, a in branch_paths_with_assigns
]
# OPEN 方向解析
open_dir = scan_open_statements(proc_div) if proc_div else {}
if proc_div:
logger.info(f"\n分支路径数:{len(branch_paths_with_assigns)}")
for i, (path_cons, _path_assign) in enumerate(branch_paths_with_assigns):
descs = []
for c in path_cons:
if len(c) == 4:
field, op, val, want = c
if op == 'not_in':
descs.append(f"{field} not in {val}")
else:
descs.append(f"{field} {op} {val} ({'T' if want else 'F'})")
logger.debug(f" 路径 {i + 1}: {', '.join(descs)}")
else:
logger.warning("\n没有找到 PROCEDURE DIVISION。")
branch_paths_with_assigns = [([], {})]
roles = {f['name']: 'unused' for f in fields_dict}
# 覆盖率报告(传入原始源文本用于行号定位)
cov_prefix = str(outdir / filepath.stem)
index_relpath = 'coverage/index.html'
cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict,
source, cov_prefix, index_relpath=index_relpath)
records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec)
# 输出 JSON(完整文件)
outpath = outdir / (filepath.stem + '.json')
output_json(records, outpath, roles,
fd_fields=fd_fields, field_to_fd=field_to_fd,
open_dir=open_dir,
path_cons_list=kept_path_cons)
# 输出入力 JSON(按 FD 拆分)
output_input_files(records, outdir, filepath.stem, roles,
fd_fields, field_to_fd, open_dir)
logger.info(f"\n输出:{outpath}{len(records)} 条记录)")
logger.debug(f"\n记录明细:")
for i, rec in enumerate(records, 1):
vals = []
for f in fields_dict:
r = roles.get(f['name'], '?')
marker = f"[{r[0].upper()}]" if r != '?' and r != 'unused' else ''
vals.append(f"{marker}{f['name']}={rec.get(f['name'], '?')}")
logger.debug(f" 记录 {i}: {' | '.join(vals)}")
programs.append(cov_result)
# 生成覆盖率总括索引页
if programs:
generate_coverage_index(programs, outdir)
logger.info(f"\n覆盖率总览:{outdir / 'coverage' / 'index.html'}")
# ════════════════════════════════════════════
# Phase 1: 可编程 API(供 orchestrator.py 调用)
# ════════════════════════════════════════════
def extract_structure(cobol_source: str) -> dict:
"""分析 COBOL 源码的结构,返回结构摘要。不生成测试数据,只做静态分析。
Returns:
dict with: paragraphs, decision_points, branch_tree, file_count,
open_directions, has_search_all, has_evaluate,
has_call, has_break, total_branches, total_paragraphs
"""
preprocessed = preprocess(cobol_source)
data_div = extract_data_division(preprocessed)
data_fields = parse_data_division(data_div) if data_div else []
fields_dict = []
for idx, f in enumerate(data_fields):
entry = {
'name': f.name if f.name != 'FILLER' else f'FILLER_{idx + 1}',
'level': f.level, 'pic': f.pic,
'pic_info': {
'type': f.pic_info.type if f.pic_info else 'unknown',
'digits': f.pic_info.digits if f.pic_info else 0,
'decimal': f.pic_info.decimal if f.pic_info else 0,
'length': f.pic_info.length if f.pic_info else 0,
'signed': f.pic_info.signed if f.pic_info else False,
},
'section': f.section, 'occurs': f.occurs_count,
'occurs_depending': f.occurs_depending,
'redefines': f.redefines, 'usage': f.usage,
}
if f.is_88:
entry['is_88'] = True
entry['parent'] = f.parent
entry['value'] = f.value
entry['values'] = f.values
fields_dict.append(entry)
fields_dict = expand_occurs(fields_dict)
proc_div = extract_procedure_division(preprocessed)
branch_tree = None
assignments = {}
if proc_div:
branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
file_sec = parse_file_section(preprocessed)
open_dir = scan_open_statements(proc_div) if proc_div else {}
from .models import BrIf, BrEval, BrSeq
decision_points = []
total_branches = 0
def _walk(node, counter):
nonlocal total_branches
if isinstance(node, BrIf):
counter[0] += 1
branches = 2
decision_points.append({
"id": counter[0], "kind": "IF",
"label": str(node.condition)[:80], "branches": branches,
})
total_branches += branches
_walk(node.true_seq, counter)
_walk(node.false_seq, counter)
elif isinstance(node, BrEval):
counter[0] += 1
n = len(node.when_list) + (1 if node.has_other else 0)
decision_points.append({
"id": counter[0], "kind": "EVALUATE",
"label": str(node.subject)[:80], "branches": n,
})
total_branches += n
for _, seq in node.when_list:
_walk(seq, counter)
_walk(node.other_seq, counter)
elif isinstance(node, BrSeq):
for child in node.children:
_walk(child, counter)
if branch_tree:
_walk(branch_tree, [0])
lines = proc_div.split('\n') if proc_div else []
paragraphs = set()
for line in lines:
m = re.match(r'^\s*([A-Z0-9][A-Z0-9-]*)\.\s*$', line.strip())
if m:
paragraphs.add(m.group(1))
return {
"paragraphs": sorted(paragraphs) if paragraphs else [],
"decision_points": decision_points,
"branch_tree": branch_tree,
"file_count": len(file_sec) if file_sec else 0,
"open_directions": open_dir,
"has_search_all": any('SEARCH' in str(dp.get('label', '')) for dp in decision_points),
"has_evaluate": any(dp['kind'] == 'EVALUATE' for dp in decision_points),
"has_call": 'CALL' in cobol_source.upper(),
"has_break": any('KEY' in str(dp.get('label', '')).upper() for dp in decision_points),
"total_branches": total_branches,
"total_paragraphs": len(paragraphs),
"branch_tree_obj": branch_tree,
}
def generate_data(cobol_source: str, structure: dict = None) -> list[dict]:
"""根据 COBOL 源码生成覆盖所有路径的测试数据。
Args:
cobol_source: COBOL 程序源码文本
structure: 可选,如果已调用 extract_structure() 可传入避免重复解析
Returns:
list[dict]: 测试数据记录列表,每条包含所有字段的值
"""
if structure is None:
structure = extract_structure(cobol_source)
branch_tree = structure.get("branch_tree_obj")
if branch_tree is None:
return []
preprocessed = preprocess(cobol_source)
data_div = extract_data_division(preprocessed)
data_fields = parse_data_division(data_div) if data_div else []
fields_dict = []
for f in data_fields:
entry = {
'name': f.name, 'level': f.level, 'pic': f.pic,
'pic_info': {
'type': f.pic_info.type if f.pic_info else 'unknown',
'digits': f.pic_info.digits if f.pic_info else 0,
'decimal': f.pic_info.decimal if f.pic_info else 0,
'length': f.pic_info.length if f.pic_info else 0,
'signed': f.pic_info.signed if f.pic_info else False,
},
'section': f.section, 'occurs': f.occurs_count,
'occurs_depending': f.occurs_depending,
'value': f.value, 'values': f.values,
'redefines': f.redefines, 'usage': f.usage,
}
if f.is_88:
entry['is_88'] = True
entry['parent'] = f.parent
fields_dict.append(entry)
fields_dict = expand_occurs(fields_dict)
proc_div = extract_procedure_division(preprocessed)
_, assignments = build_branch_tree(proc_div, fields_dict)
file_sec = parse_file_section(preprocessed)
branch_paths = enum_paths(branch_tree, fields_dict)
branch_paths = [(_filter_stop(c), a) for c, a in branch_paths]
records, kept_paths = generate_records(branch_paths, fields_dict, assignments, file_sec=file_sec)
return records
def incremental_supplement(branch_tree, decision_gaps: list[int]) -> list[dict]:
"""针对未覆盖的决策点,增量生成补充测试数据。
Args:
branch_tree: extract_structure() 返回的 branch_tree 字段
decision_gaps: 未覆盖的决策点 ID 列表,如 [1, 3, 5]
Returns:
list[dict]: 增量测试数据,格式与 generate_data() 兼容
"""
from .models import BrIf, BrEval, BrSeq
target_decisions = set(decision_gaps)
found = []
def _find_decisions(node, counter):
if isinstance(node, BrIf):
counter[0] += 1
if counter[0] in target_decisions:
found.append(("IF", node.condition))
_find_decisions(node.true_seq, counter)
_find_decisions(node.false_seq, counter)
elif isinstance(node, BrEval):
counter[0] += 1
if counter[0] in target_decisions:
found.append(("EVALUATE", node.subject))
for _, seq in node.when_list:
_find_decisions(seq, counter)
_find_decisions(node.other_seq, counter)
elif isinstance(node, BrSeq):
for child in node.children:
_find_decisions(child, counter)
_find_decisions(branch_tree, [0])
supplements = []
for i, (kind, label) in enumerate(found):
supplements.append({
"_dec_id": f"incr_{i}",
"_kind": kind,
"_label": str(label)[:60],
})
return supplements