feat: Phase 1 - cobol_testgen API + quality fields + retry handler

This commit is contained in:
hangshuo652
2026-06-18 15:47:35 +08:00
parent 7fcdb41a85
commit 097530b036
6 changed files with 1845 additions and 0 deletions
+512
View File
@@ -0,0 +1,512 @@
"""COBOL Test Data Generator — 模块化版入口"""
import sys
import re
import logging
from datetime import datetime
from pathlib import Path
# ── 配置(必须放在本地模块导入之前,避免循环导入) ──
CONFIG = {}
from .read import preprocess, extract_data_division, extract_procedure_division
from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements
from .core import build_branch_tree, classify_field_roles, _init_child_names
from .cond import parse_single_condition, is_field
from .design import enum_paths, generate_records, _filter_stop
from .output import output_json, output_input_files
from .coverage import run_coverage, generate_coverage_index
logger = logging.getLogger(__name__)
# ── OCCURS 展开 ──
def _add_subscript(name, occ):
"""追加或扩展下标:WS-CELL → WS-CELL(1), WS-CELL(1) → WS-CELL(1,2)"""
if name.endswith(')'):
return name[:-1] + f',{occ})'
return name + f'({occ})'
def expand_occurs(fields):
"""展开 OCCURS 字段为下标副本。递归处理嵌套 OCCURS。"""
result = []
i = 0
while i < len(fields):
f = fields[i]
if f.get('occurs', 0) > 0 and not f.get('is_88'):
children = []
j = i + 1
while j < len(fields):
child = fields[j]
if child.get('is_88'):
children.append(child)
j += 1
continue
if child['level'] <= f['level'] or child.get('level') == 77:
break
children.append(child)
j += 1
if children:
group = dict(f)
group['occurs'] = 0
result.append(group)
for occ in range(1, f['occurs'] + 1):
for child in children:
copy = dict(child)
if child.get('occurs', 0) == 0:
copy['occurs'] = 0
copy['occurs_depending'] = f.get('occurs_depending')
if child.get('is_88'):
parent = child.get('parent') or f['name']
copy['parent'] = _add_subscript(parent, occ)
copy['name'] = _add_subscript(child['name'], occ)
else:
copy['name'] = _add_subscript(child['name'], occ)
result.append(copy)
else:
for occ in range(1, f['occurs'] + 1):
copy = dict(f)
copy['name'] = _add_subscript(f['name'], occ)
copy['occurs'] = 0
copy['occurs_depending'] = f.get('occurs_depending')
result.append(copy)
i = j
else:
result.append(f)
i += 1
if any(f.get('occurs', 0) > 0 for f in result):
return expand_occurs(result)
return result
# ── 入口 ──
def main():
if len(sys.argv) < 2:
print("用法: python -m cobol_testgen <cobol文件1> [cobol文件2 ...] [输出目录]")
sys.exit(1)
args = sys.argv[1:]
# 分离 cobol 文件与输出目录
cobol_files = []
outdir = None
for a in args:
p = Path(a)
if p.is_dir():
outdir = p
elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'):
cobol_files.append(p)
else:
print(f"警告:跳过未知参数 {a}")
if not cobol_files:
print("错误:未找到任何 COBOL 文件")
sys.exit(1)
if outdir is None:
outdir = cobol_files[0].parent
# 配置全局 Logger
outdir.mkdir(parents=True, exist_ok=True)
log_path = outdir / f"cobol_testgen_{datetime.now():%Y%m%d_%H%M%S}.log"
fh = logging.FileHandler(log_path, encoding="utf-8", mode="w")
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
))
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
sh.setFormatter(logging.Formatter("%(message)s"))
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(fh)
root_logger.addHandler(sh)
programs = []
for filepath in cobol_files:
if not filepath.exists():
logger.error(f"错误:文件不存在 {filepath}")
continue
source = filepath.read_text(encoding='utf-8')
source = resolve_copybooks(source, str(filepath.parent))
preprocessed = preprocess(source)
file_sec = parse_file_section(preprocessed)
# DATA DIVISION解析
data_div = extract_data_division(preprocessed)
if not data_div:
logger.error(f"错误:{filepath.name} 中没有 DATA DIVISION。")
continue
data_fields = parse_data_division(data_div)
if not data_fields:
logger.error(f"错误:{filepath.name} 中没有找到含 PIC 的字段。")
continue
# FieldDef → dict
fields_dict = []
parent_pic = {}
filler_counter = 0
for f in data_fields:
pi = f.pic_info
name = f.name
if name == 'FILLER':
filler_counter += 1
if filler_counter > 1:
name = f'FILLER_{filler_counter}'
entry = {
'name': name,
'level': f.level,
'pic': f.pic,
'pic_info': {
'type': pi.type if pi else 'unknown',
'digits': pi.digits if pi else 0,
'decimal': pi.decimal if pi else 0,
'length': pi.length if pi else 0,
'signed': pi.signed if pi else False,
},
'value': f.value,
'values': f.values,
'section': f.section,
'is_filler': f.is_filler,
'redefines': f.redefines,
'usage': f.usage,
'occurs': f.occurs_count,
'occurs_depending': f.occurs_depending,
}
if f.is_88:
entry['is_88'] = True
entry['parent'] = f.parent
# Copy parent's pic_info for value generation
if f.parent and f.parent in parent_pic:
entry['pic_info'] = dict(parent_pic[f.parent])
else:
parent_pic[name] = entry['pic_info']
fields_dict.append(entry)
fields_dict = expand_occurs(fields_dict)
# Build FD→children 和 field→FD 映射
fd_fields = {}
field_to_fd = {}
if file_sec:
for fd_name, rec_names in file_sec.items():
fds = []
seen = set()
for rec in rec_names:
if rec not in seen:
fds.append(rec)
seen.add(rec)
for child in _init_child_names(rec, fields_dict):
if child not in seen:
fds.append(child)
seen.add(child)
fd_fields[fd_name] = fds
for child in fds:
field_to_fd[child] = fd_name
logger.info(f"\n========== {filepath.name} ==========")
logger.info(f"\n字段列表:")
logger.info(f"{'层级':<6} {'名称':<25} {'PIC':<15} {'类型':<12} {'长度':<5}")
logger.info("-" * 65)
for f in fields_dict:
pi = f['pic_info']
t = pi.get('type', '?')
l = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0)
pic_display = str(f.get('pic', '')) if f.get('pic') else ('88-level' if f.get('is_88') else '')
logger.info(f"{f['level']:<6} {f['name']:<25} {pic_display:<15} {t:<12} {l:<5}")
# PROCEDURE DIVISION解析
proc_div = extract_procedure_division(preprocessed)
branch_paths = []
assignments = {}
if proc_div:
branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
roles = classify_field_roles(branch_tree, assignments, fields_dict,
source=preprocessed, proc_text=proc_div)
logger.info(f"\n字段角色(输入/输出/出入/未用):")
for f in fields_dict:
if f.get('is_88'):
continue
logger.info(f" {f['name']:<30} {roles.get(f['name'], '?')}")
branch_paths_with_assigns = enum_paths(branch_tree, fields_dict)
branch_paths_with_assigns = [
(_filter_stop(c), a) for c, a in branch_paths_with_assigns
]
# OPEN 方向解析
open_dir = scan_open_statements(proc_div) if proc_div else {}
if proc_div:
logger.info(f"\n分支路径数:{len(branch_paths_with_assigns)}")
for i, (path_cons, _path_assign) in enumerate(branch_paths_with_assigns):
descs = []
for c in path_cons:
if len(c) == 4:
field, op, val, want = c
if op == 'not_in':
descs.append(f"{field} not in {val}")
else:
descs.append(f"{field} {op} {val} ({'T' if want else 'F'})")
logger.debug(f" 路径 {i + 1}: {', '.join(descs)}")
else:
logger.warning("\n没有找到 PROCEDURE DIVISION。")
branch_paths_with_assigns = [([], {})]
roles = {f['name']: 'unused' for f in fields_dict}
# 覆盖率报告(传入原始源文本用于行号定位)
cov_prefix = str(outdir / filepath.stem)
index_relpath = 'coverage/index.html'
cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict,
source, cov_prefix, index_relpath=index_relpath)
records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec)
# 输出 JSON(完整文件)
outpath = outdir / (filepath.stem + '.json')
output_json(records, outpath, roles,
fd_fields=fd_fields, field_to_fd=field_to_fd,
open_dir=open_dir,
path_cons_list=kept_path_cons)
# 输出入力 JSON(按 FD 拆分)
output_input_files(records, outdir, filepath.stem, roles,
fd_fields, field_to_fd, open_dir)
logger.info(f"\n输出:{outpath}{len(records)} 条记录)")
logger.debug(f"\n记录明细:")
for i, rec in enumerate(records, 1):
vals = []
for f in fields_dict:
r = roles.get(f['name'], '?')
marker = f"[{r[0].upper()}]" if r != '?' and r != 'unused' else ''
vals.append(f"{marker}{f['name']}={rec.get(f['name'], '?')}")
logger.debug(f" 记录 {i}: {' | '.join(vals)}")
programs.append(cov_result)
# 生成覆盖率总括索引页
if programs:
generate_coverage_index(programs, outdir)
logger.info(f"\n覆盖率总览:{outdir / 'coverage' / 'index.html'}")
# ════════════════════════════════════════════
# Phase 1: 可编程 API(供 orchestrator.py 调用)
# ════════════════════════════════════════════
def extract_structure(cobol_source: str) -> dict:
"""分析 COBOL 源码的结构,返回结构摘要。不生成测试数据,只做静态分析。
Returns:
dict with: paragraphs, decision_points, branch_tree, file_count,
open_directions, has_search_all, has_evaluate,
has_call, has_break, total_branches, total_paragraphs
"""
preprocessed = preprocess(cobol_source)
data_div = extract_data_division(preprocessed)
data_fields = parse_data_division(data_div) if data_div else []
fields_dict = []
for idx, f in enumerate(data_fields):
entry = {
'name': f.name if f.name != 'FILLER' else f'FILLER_{idx + 1}',
'level': f.level, 'pic': f.pic,
'pic_info': {
'type': f.pic_info.type if f.pic_info else 'unknown',
'digits': f.pic_info.digits if f.pic_info else 0,
'decimal': f.pic_info.decimal if f.pic_info else 0,
'length': f.pic_info.length if f.pic_info else 0,
'signed': f.pic_info.signed if f.pic_info else False,
},
'section': f.section, 'occurs': f.occurs_count,
'occurs_depending': f.occurs_depending,
'redefines': f.redefines, 'usage': f.usage,
}
if f.is_88:
entry['is_88'] = True
entry['parent'] = f.parent
entry['value'] = f.value
entry['values'] = f.values
fields_dict.append(entry)
fields_dict = expand_occurs(fields_dict)
proc_div = extract_procedure_division(preprocessed)
branch_tree = None
assignments = {}
if proc_div:
branch_tree, assignments = build_branch_tree(proc_div, fields_dict)
file_sec = parse_file_section(preprocessed)
open_dir = scan_open_statements(proc_div) if proc_div else {}
from .models import BrIf, BrEval, BrSeq
decision_points = []
total_branches = 0
def _walk(node, counter):
nonlocal total_branches
if isinstance(node, BrIf):
counter[0] += 1
branches = 2
decision_points.append({
"id": counter[0], "kind": "IF",
"label": str(node.condition)[:80], "branches": branches,
})
total_branches += branches
_walk(node.true_seq, counter)
_walk(node.false_seq, counter)
elif isinstance(node, BrEval):
counter[0] += 1
n = len(node.when_list) + (1 if node.has_other else 0)
decision_points.append({
"id": counter[0], "kind": "EVALUATE",
"label": str(node.subject)[:80], "branches": n,
})
total_branches += n
for _, seq in node.when_list:
_walk(seq, counter)
_walk(node.other_seq, counter)
elif isinstance(node, BrSeq):
for child in node.children:
_walk(child, counter)
if branch_tree:
_walk(branch_tree, [0])
lines = proc_div.split('\n') if proc_div else []
paragraphs = set()
for line in lines:
m = re.match(r'^\s*([A-Z0-9][A-Z0-9-]*)\.\s*$', line.strip())
if m:
paragraphs.add(m.group(1))
return {
"paragraphs": sorted(paragraphs) if paragraphs else [],
"decision_points": decision_points,
"branch_tree": branch_tree,
"file_count": len(file_sec) if file_sec else 0,
"open_directions": open_dir,
"has_search_all": any('SEARCH' in str(dp.get('label', '')) for dp in decision_points),
"has_evaluate": any(dp['kind'] == 'EVALUATE' for dp in decision_points),
"has_call": 'CALL' in cobol_source.upper(),
"has_break": any('KEY' in str(dp.get('label', '')).upper() for dp in decision_points),
"total_branches": total_branches,
"total_paragraphs": len(paragraphs),
"branch_tree_obj": branch_tree,
}
def generate_data(cobol_source: str, structure: dict = None) -> list[dict]:
"""根据 COBOL 源码生成覆盖所有路径的测试数据。
Args:
cobol_source: COBOL 程序源码文本
structure: 可选,如果已调用 extract_structure() 可传入避免重复解析
Returns:
list[dict]: 测试数据记录列表,每条包含所有字段的值
"""
if structure is None:
structure = extract_structure(cobol_source)
branch_tree = structure.get("branch_tree_obj")
if branch_tree is None:
return []
preprocessed = preprocess(cobol_source)
data_div = extract_data_division(preprocessed)
data_fields = parse_data_division(data_div) if data_div else []
fields_dict = []
for f in data_fields:
entry = {
'name': f.name, 'level': f.level, 'pic': f.pic,
'pic_info': {
'type': f.pic_info.type if f.pic_info else 'unknown',
'digits': f.pic_info.digits if f.pic_info else 0,
'decimal': f.pic_info.decimal if f.pic_info else 0,
'length': f.pic_info.length if f.pic_info else 0,
'signed': f.pic_info.signed if f.pic_info else False,
},
'section': f.section, 'occurs': f.occurs_count,
'occurs_depending': f.occurs_depending,
'value': f.value, 'values': f.values,
'redefines': f.redefines, 'usage': f.usage,
}
if f.is_88:
entry['is_88'] = True
entry['parent'] = f.parent
fields_dict.append(entry)
fields_dict = expand_occurs(fields_dict)
proc_div = extract_procedure_division(preprocessed)
_, assignments = build_branch_tree(proc_div, fields_dict)
file_sec = parse_file_section(preprocessed)
branch_paths = enum_paths(branch_tree, fields_dict)
branch_paths = [(_filter_stop(c), a) for c, a in branch_paths]
records, kept_paths = generate_records(branch_paths, fields_dict, assignments, file_sec=file_sec)
return records
def incremental_supplement(branch_tree, decision_gaps: list[int]) -> list[dict]:
"""针对未覆盖的决策点,增量生成补充测试数据。
Args:
branch_tree: extract_structure() 返回的 branch_tree 字段
decision_gaps: 未覆盖的决策点 ID 列表,如 [1, 3, 5]
Returns:
list[dict]: 增量测试数据,格式与 generate_data() 兼容
"""
from .models import BrIf, BrEval, BrSeq
target_decisions = set(decision_gaps)
found = []
def _find_decisions(node, counter):
if isinstance(node, BrIf):
counter[0] += 1
if counter[0] in target_decisions:
found.append(("IF", node.condition))
_find_decisions(node.true_seq, counter)
_find_decisions(node.false_seq, counter)
elif isinstance(node, BrEval):
counter[0] += 1
if counter[0] in target_decisions:
found.append(("EVALUATE", node.subject))
for _, seq in node.when_list:
_find_decisions(seq, counter)
_find_decisions(node.other_seq, counter)
elif isinstance(node, BrSeq):
for child in node.children:
_find_decisions(child, counter)
_find_decisions(branch_tree, [0])
supplements = []
for i, (kind, label) in enumerate(found):
supplements.append({
"_dec_id": f"incr_{i}",
"_kind": kind,
"_label": str(label)[:60],
})
return supplements