Files
cobol-java-v3/cobol_testgen/design.py
hangshuo652 7fb9304212 merge local cobol_testgen improvements into v3 shared modules
- cond.py: SQLCODE/SQLSTATE handling, alphanumeric >/< boundary fix
- output.py: termination tracking, db_input support, _is_field_assigned filter
- coverage.py: mark_from_gcov, THRU support, KeyError protection
- gcov.py: new file (dependency for coverage.py)
- grammar.lark: multi-segment PIC support
- read.py: SQL INCLUDE resolution, DECLARE TABLE parsing, * comment fix
- core.py: SQL parsing, blocked_names, keyword list
- design.py: multi-sentinel, THRU ranges, PERFORM VARYING last iteration
- __init__.py: local main() + v3 API functions, guarded imports

All 6 ZAN programs verified passing through v3 pipeline
2026-06-23 22:38:17 +08:00

1307 lines
57 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""设计层:路径枚举 + 值生成 + 约束应用"""
import re
import logging
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo
from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, mcdc_sets, satisfying_value
from .core import trace_to_root, invert_through_chain, propagate_assignments, _basename
logger = logging.getLogger(__name__)
# Sentinel constraint tuples. enum_paths places them inside a path's
# constraint list to mark how the path terminates. _is_sentinel compares them
# by identity, so always reference these module-level objects directly.
_STOP_EXIT_PERFORM = ('__STOP_EXIT_PERFORM__', '', None, True)  # ExitNode with exit_type 'PERFORM'
_STOP_SENTINEL = ('__STOP__', '', None, True)  # any other ExitNode, and GoTo
_ABEND_SENTINEL = ('__ABEND__', '', None, True)  # CALL of a program listed in _ABEND_PROGRAMS
_SENTINELS_ALL = {_STOP_EXIT_PERFORM, _STOP_SENTINEL, _ABEND_SENTINEL}
# Program names whose CALL is treated as an abnormal end (see extend_abend_programs).
_ABEND_PROGRAMS = {'ABENDPGM'}
def extend_abend_programs(names: list[str]):
    """Register extra program names whose CALL counts as an ABEND.

    Every name is upper-cased and added to the module-level
    ``_ABEND_PROGRAMS`` set, which is mutated in place.
    """
    for program in names:
        _ABEND_PROGRAMS.add(program.upper())
# Upper bound on the number of paths kept by _cap_paths / _cap_paths_fair.
_MAX_PATHS = 10000
def _is_sentinel(c):
    """Return True when ``c`` is one of the module's termination sentinels.

    The comparison is by identity, not equality: a tuple that merely equals a
    sentinel is not treated as one.
    """
    for marker in (_STOP_EXIT_PERFORM, _STOP_SENTINEL, _ABEND_SENTINEL):
        if c is marker:
            return True
    return False
def _hashable_cons(cons):
"""将约束列表转为可哈希形式(列表值转tuple)用于签名去重。"""
result = []
for c in cons:
if len(c) == 4:
field, op, val, want = c
if isinstance(val, list):
val = tuple(val)
result.append((field, op, val, want))
else:
result.append(c)
return result
def _filter_stop(cons):
"""Legacy: strip all sentinel markers. 供旧测试代码使用。"""
return [c for c in cons if not _is_sentinel(c)]
def get_term_type(cons):
    """Split a constraint list into real constraints and a termination type.

    Returns ``(filtered_cons, term_type)``. ``term_type`` is ``'abend'`` when
    the ABEND sentinel is present and ``'normal'`` otherwise. All sentinel
    entries are removed from ``filtered_cons``.
    """
    term_type = 'normal'
    kept = []
    for entry in cons:
        if entry is _ABEND_SENTINEL:
            term_type = 'abend'
            continue
        if not _is_sentinel(entry):
            kept.append(entry)
    return kept, term_type
def _cap_paths(paths):
    """Limit ``paths`` to at most ``_MAX_PATHS`` entries.

    The same list object is returned when it is already within the limit;
    otherwise a new list holding the first ``_MAX_PATHS`` entries is returned.
    """
    within_limit = len(paths) <= _MAX_PATHS
    return paths if within_limit else paths[:_MAX_PATHS]
def _cap_paths_fair(new_active, child_paths):
"""两阶段公平截断:每个前置路径至少保留一条子路径,再填充剩余配额。"""
if len(new_active) <= _MAX_PATHS:
return new_active
k = len(child_paths)
if k <= 1:
return new_active[:_MAX_PATHS]
# 分离 sentinel 路径(不参与组合,直接保留)
stop_paths = [(p, a) for p, a in new_active if any(_is_sentinel(c) for c in p)]
combined = [(p, a) for p, a in new_active if not any(_is_sentinel(c) for c in p)]
n_pred = len(combined) // k
result = []
if n_pred <= 1:
result.extend(combined[:_MAX_PATHS - len(result)])
return result[:_MAX_PATHS]
remaining_quota = _MAX_PATHS - len(result)
# Phase 1: 每个前置至少保留一条子路径(轮询分配不同子路径索引)
quota = min(n_pred, remaining_quota)
selected = set()
for p_idx in range(quota):
c_idx = p_idx % k
idx = p_idx * k + c_idx
selected.add(idx)
result.append(combined[idx])
if len(result) >= _MAX_PATHS:
return result[:_MAX_PATHS]
# Phase 2: 用剩余配额填充其余组合
remaining = _MAX_PATHS - len(result)
for idx in range(len(combined)):
if idx not in selected:
result.append(combined[idx])
remaining -= 1
if remaining <= 0:
break
return result[:_MAX_PATHS]
# ── 路径枚举 ──
def enum_paths(node, fields):
    """Enumerate execution paths through ``node``.

    Each path is a ``(constraints, assignments)`` pair. ``constraints`` is a
    list of ``(field, op, value, want_true)`` tuples and may also contain the
    module's sentinel tuples. ``assignments`` is a dict mapping a target name
    to a list of assignment descriptors.

    Returns ``list[tuple[list[tuple], dict]]``. Node kinds that are not
    handled yield a single empty path ``[([], {})]``.
    """
    if isinstance(node, Assign):
        # One path, no constraints, recording the assignment itself.
        return [([], {node.target: [node.source_info]})]
    if isinstance(node, BrSeq):
        if not node.children:
            return [([], {})]
        paths = [([], {})]
        for child in node.children:
            child_paths = _cap_paths(enum_paths(child, fields))
            if not child_paths:
                continue
            new_active = []
            covered_sigs = set()
            for p_cons, p_assign in paths:
                if any(_is_sentinel(c) for c in p_cons):
                    # Already terminated: carry forward without combining.
                    new_active.append((p_cons, p_assign))
                    continue
                for cp_cons, cp_assign in child_paths:
                    merged_cons = p_cons + list(cp_cons)
                    # De-duplicate combinations by their (order-insensitive) constraint set.
                    sig = frozenset(_hashable_cons(merged_cons))
                    if sig not in covered_sigs:
                        covered_sigs.add(sig)
                        merged = {}
                        for d in (p_assign, cp_assign):
                            for k, v in d.items():
                                merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                        new_active.append((merged_cons, merged))
            if not new_active:
                # Nothing survived: keep the first non-terminated predecessor.
                for pc, pa in paths:
                    if not any(_is_sentinel(c) for c in pc):
                        new_active.append((pc, dict(pa)))
                        break
            paths = new_active
        return paths
    elif isinstance(node, BrIf):
        parsed = parse_single_condition(node.condition, fields)
        if parsed and is_field(parsed[0], fields):
            # Simple "field op value" condition: one true and one false family of paths.
            field, op, val = parsed
            paths = []
            true_sub = _cap_paths(enum_paths(node.true_seq, fields))
            for sp_cons, sp_assign in (true_sub or [([], {})]):
                paths.append(([(field, op, val, True)] + sp_cons, sp_assign))
            false_sub = _cap_paths(enum_paths(node.false_seq, fields))
            for fp_cons, fp_assign in (false_sub or [([], {})]):
                paths.append(([(field, op, val, False)] + fp_cons, fp_assign))
            return paths
        # CondNot wrapping a single leaf (e.g., IF NOT WS-AMOUNT > 1000)
        if node.cond_tree and isinstance(node.cond_tree, CondNot):
            child = node.cond_tree.child
            if isinstance(child, CondLeaf) and is_field(child.field, fields):
                paths = []
                # The true branch of IF NOT <leaf> requires the leaf to be false, and vice versa.
                true_sub = _cap_paths(enum_paths(node.true_seq, fields))
                for sp_cons, sp_assign in (true_sub or [([], {})]):
                    paths.append(([(child.field, child.op, child.value, False)] + sp_cons, sp_assign))
                false_sub = _cap_paths(enum_paths(node.false_seq, fields))
                for fp_cons, fp_assign in (false_sub or [([], {})]):
                    paths.append(([(child.field, child.op, child.value, True)] + fp_cons, fp_assign))
                return paths
        if node.cond_tree:
            leaves = collect_leaves(node.cond_tree)
            if leaves and all(is_field(l.field, fields) for l in leaves):
                sets = mcdc_sets(node.cond_tree, fields)
                if sets:
                    paths = []
                    for constraints, decision in sets:
                        # Each constraint set drives the branch selected by its decision.
                        body = _cap_paths(enum_paths(
                            node.true_seq if decision else node.false_seq, fields
                        ))
                        for sp_cons, sp_assign in (body or [([], {})]):
                            paths.append((constraints + sp_cons, sp_assign))
                    return paths
                # CondLeaf fallback: a single leaf (including a condition tree
                # produced by 88-level resolution), where MC/DC does not apply.
                # NOTE(review): this listing had no indentation; nesting this
                # fallback under the all-leaves-are-fields check is inferred.
                if len(leaves) == 1:
                    leaf = leaves[0]
                    paths = []
                    true_sub = _cap_paths(enum_paths(node.true_seq, fields))
                    for sp_cons, sp_assign in (true_sub or [([], {})]):
                        paths.append(([(leaf.field, leaf.op, leaf.value, True)] + sp_cons, sp_assign))
                    false_sub = _cap_paths(enum_paths(node.false_seq, fields))
                    for fp_cons, fp_assign in (false_sub or [([], {})]):
                        paths.append(([(leaf.field, leaf.op, leaf.value, False)] + fp_cons, fp_assign))
                    return paths
        # Fallback: parsed condition but non-field (e.g. arithmetic expr)
        if parsed:
            field, op, val = parsed
            paths = []
            # Unlike the branches above, sub-paths are not capped here.
            true_sub = enum_paths(node.true_seq, fields)
            for sp_cons, sp_assign in (true_sub or [([], {})]):
                paths.append(([(field, op, val, True)] + sp_cons, sp_assign))
            false_sub = enum_paths(node.false_seq, fields)
            for fp_cons, fp_assign in (false_sub or [([], {})]):
                paths.append(([(field, op, val, False)] + fp_cons, fp_assign))
            return paths
        return [([], {})]
    elif isinstance(node, BrEval):
        if node.subjects:
            # Multi-subject EVALUATE: each WHEN supplies one value per subject.
            paths = []
            prior_false_cons = []
            for values, seq in node.when_list:
                sub = _cap_paths(enum_paths(seq, fields))
                for sp_cons, sp_assign in (sub or [([], {})]):
                    when_cons = [(node.subjects[i], '=', values[i], True)
                                 for i in range(len(node.subjects))]
                    constraints = list(prior_false_cons) + when_cons + sp_cons
                    paths.append((constraints, sp_assign))
                # Later WHENs (and OTHER) inherit the negation of this WHEN's values.
                for i in range(len(node.subjects)):
                    prior_false_cons.append((node.subjects[i], '=', values[i], False))
            if node.has_other:
                sub = _cap_paths(enum_paths(node.other_seq, fields))
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append((list(prior_false_cons) + sp_cons, sp_assign))
            return paths
        if node.subject == 'TRUE':
            # EVALUATE TRUE: every WHEN is a condition of its own.
            paths = []
            prior_false_sets = []  # list[list[Constraint]]
            for value, seq in node.when_list:
                cond = parse_compound_condition(value, fields)
                if cond and isinstance(cond, CondLeaf) and is_field(cond.field, fields):
                    sub = _cap_paths(enum_paths(seq, fields))
                    for sp_cons, sp_assign in (sub or [([], {})]):
                        constraints = [c for pf in prior_false_sets for c in pf]
                        constraints.append((cond.field, cond.op, cond.value, True))
                        paths.append((constraints + sp_cons, sp_assign))
                    prior_false_sets.append([(cond.field, cond.op, cond.value, False)])
                elif cond and isinstance(cond, CondNot) and isinstance(cond.child, CondLeaf) and is_field(cond.child.field, fields):
                    leaf = cond.child
                    sub = _cap_paths(enum_paths(seq, fields))
                    for sp_cons, sp_assign in (sub or [([], {})]):
                        constraints = [c for pf in prior_false_sets for c in pf]
                        constraints.append((leaf.field, leaf.op, leaf.value, False))
                        paths.append((constraints + sp_cons, sp_assign))
                    prior_false_sets.append([(leaf.field, leaf.op, leaf.value, True)])
                elif cond:
                    leaves = collect_leaves(cond)
                    if leaves and all(is_field(l.field, fields) for l in leaves):
                        sets = mcdc_sets(cond, fields)
                        if sets:
                            sub = _cap_paths(enum_paths(seq, fields))
                            new_false_sets = []
                            for cs, decision in sets:
                                if decision:
                                    if not prior_false_sets:
                                        for sp_cons, sp_assign in (sub or [([], {})]):
                                            paths.append((list(cs) + sp_cons, sp_assign))
                                    else:
                                        for pf_set in prior_false_sets:
                                            for sp_cons, sp_assign in (sub or [([], {})]):
                                                paths.append((list(pf_set) + list(cs) + sp_cons, sp_assign))
                                else:
                                    new_false_sets.append(cs)
                            if not new_false_sets:
                                prior_false_sets = []
                                break
                            # NOTE(review): if prior_false_sets is empty here (compound
                            # condition in the first WHEN), this product is empty and the
                            # new false sets are not carried forward - confirm intent.
                            combined = []
                            for pf_set in prior_false_sets:
                                for nf_set in new_false_sets:
                                    combined.append(list(pf_set) + list(nf_set))
                            prior_false_sets = combined
                        else:
                            prior_false_sets = []
                            break
                    else:
                        prior_false_sets = []
                        break
                else:
                    # Condition could not be parsed: stop processing further WHENs.
                    prior_false_sets = []
                    break
            if node.has_other:
                sub = _cap_paths(enum_paths(node.other_seq, fields))
                for sp_cons, sp_assign in (sub or [([], {})]):
                    constraints = [c for pf in prior_false_sets for c in pf]
                    paths.append((constraints + sp_cons, sp_assign))
            return paths
        if not is_field(node.subject, fields):
            return [([], {})]
        # Single-subject EVALUATE on a known field.
        paths = []
        for value, seq in node.when_list:
            sub = _cap_paths(enum_paths(seq, fields))
            thru_m = re.match(r'^(\d+)\s+THRU\s+(\d+)$', str(value), re.IGNORECASE)
            if thru_m and not node.subjects:
                # "low THRU high": two paths carrying the same bounds in both orders.
                low, high = thru_m.group(1), thru_m.group(2)
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append(([(node.subject, '>=', low, True), (node.subject, '<=', high, True)] + sp_cons, sp_assign))
                    paths.append(([(node.subject, '<=', high, True), (node.subject, '>=', low, True)] + sp_cons, sp_assign))
            else:
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append(([(node.subject, '=', value, True)] + sp_cons, sp_assign))
        if node.has_other:
            sub = _cap_paths(enum_paths(node.other_seq, fields))
            thru_found = False
            for v, _ in node.when_list:
                thru_m = re.match(r'^(\d+)\s+THRU\s+(\d+)$', str(v), re.IGNORECASE)
                if thru_m and not node.subjects:
                    thru_found = True
                    low_int, high_int = int(thru_m.group(1)), int(thru_m.group(2))
                    for sp_cons, sp_assign in (sub or [([], {})]):
                        # OTHER below the range: subject forced to low - 1 (not below 0).
                        a_low = dict(sp_assign)
                        a_low[node.subject] = [{'type': 'move_literal', 'literal': str(max(0, low_int - 1))}]
                        low_cons = [(node.subject, 'not_in', [thru_m.group(1), thru_m.group(2)], True)]
                        paths.append((low_cons + sp_cons, a_low))
                        # OTHER above the range: subject forced to high + 1.
                        a_high = dict(sp_assign)
                        a_high[node.subject] = [{'type': 'move_literal', 'literal': str(high_int + 1)}]
                        high_cons = [(node.subject, 'not_in', [thru_m.group(1), thru_m.group(2)], True)]
                        paths.append((high_cons + sp_cons, a_high))
            if not thru_found:
                case_vals = [v for v, _ in node.when_list]
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append(([(node.subject, 'not_in', case_vals, True)] + sp_cons, sp_assign))
        return paths
    elif isinstance(node, BrSearch):
        return _enum_search_paths(node, fields)
    elif isinstance(node, BrPerform):
        if node.perf_type in ('para', 'thru'):
            if node.body_seq:
                paths = enum_paths(node.body_seq, fields)
                # EXIT PERFORM is only meaningful inside the PERFORM body; strip it
                # so it does not affect later BrSeq combination.
                paths = [([c for c in cons if c is not _STOP_EXIT_PERFORM], a) for cons, a in paths]
                return paths
            return [([], {})]
        elif node.perf_type in ('until', 'para_until', 'varying', 'para_varying'):
            # Try a single condition first.
            parsed = parse_single_condition(node.condition, fields)
            if parsed and is_field(parsed[0], fields):
                field, op, val = parsed
                paths = []
                false_sub = _cap_paths(enum_paths(node.body_seq, fields))
                false_sub = [([c for c in cons if c is not _STOP_EXIT_PERFORM], a) for cons, a in false_sub]
                for sp_cons, sp_assign in (false_sub or [([], {})]):
                    # Snapshot of the body assignments before the FROM assignment is merged in.
                    body_assign = dict(sp_assign)
                    # PERFORM VARYING: add the FROM value as a MOVE assignment on the enter path.
                    if node.varying_from and node.varying_var:
                        is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False
                        from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from}
                        from_assign = {node.varying_var: [from_asgn]}
                        merged = {}
                        for d in (from_assign, sp_assign):
                            for k, v in d.items():
                                merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                        sp_assign = merged
                    # UNTIL condition false -> the loop body is entered.
                    paths.append(([(field, op, val, False)] + sp_cons, sp_assign))
                    # PERFORM VARYING: last-iteration path (loop variable at its final value).
                    if node.varying_from and node.varying_var and op in ('>', '>=', '<', '<=', '='):
                        try:
                            if op == '>':
                                max_val = int(val)
                            elif op == '>=':
                                max_val = int(val) - 1
                            elif op == '<':
                                max_val = int(val)
                            elif op == '<=':
                                max_val = int(val) + 1
                            elif op == '=':
                                by_str = str(node.varying_by or '1')
                                if by_str.lstrip('-').isdigit() and int(by_str) < 0:
                                    max_val = int(val) + 1
                                else:
                                    max_val = int(val) - 1
                            from_val = int(node.varying_from)
                            by_str = str(node.varying_by or '1')
                            # Only emit the path when the final value is reachable from FROM
                            # in the direction of the BY step.
                            if by_str.lstrip('-').isdigit() and int(by_str) < 0:
                                ok = max_val <= from_val
                            else:
                                ok = max_val >= from_val
                            if ok:
                                max_asgn = {'type': 'move_literal', 'literal': str(max_val)}
                                max_assign = {node.varying_var: [max_asgn]}
                                merged_max = {}
                                for d in (max_assign, body_assign):
                                    for k, v in d.items():
                                        merged_max.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                                the_cons = [(field, op, val, False)]
                                paths.append((the_cons + sp_cons, merged_max))
                        except (ValueError, TypeError):
                            # Non-integer bound or FROM value: skip the last-iteration path.
                            pass
                # UNTIL condition already true -> the body is skipped.
                paths.append(([(field, op, val, True)], {}))
                return paths
            # Try a compound condition (AND/OR).
            cond_tree = parse_compound_condition(node.condition, fields)
            if cond_tree:
                leaves = collect_leaves(cond_tree)
                if leaves and all(is_field(l.field, fields) for l in leaves):
                    sets = mcdc_sets(cond_tree, fields)
                    if sets:
                        paths = []
                        false_sub = _cap_paths(enum_paths(node.body_seq, fields))
                        false_sub = [([c for c in cons if c is not _STOP_EXIT_PERFORM], a) for cons, a in false_sub]
                        for sp_cons, sp_assign in (false_sub or [([], {})]):
                            # PERFORM VARYING: add the FROM value as a MOVE assignment on the enter path.
                            if node.varying_from and node.varying_var:
                                is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False
                                from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from}
                                from_assign = {node.varying_var: [from_asgn]}
                                merged = {}
                                for d in (from_assign, sp_assign):
                                    for k, v in d.items():
                                        merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                                sp_assign = merged
                            # Decision false -> enter the body.
                            for constraints, decision in sets:
                                if not decision:
                                    paths.append((list(constraints) + sp_cons, sp_assign))
                        # Decision true -> skip the body.
                        for constraints, decision in sets:
                            if decision:
                                paths.append((list(constraints), {}))
                        if paths:
                            return paths
            return [([], {})]
    elif isinstance(node, CallNode):
        # program_name is compared as-is against the (upper-cased) registry.
        if node.program_name in _ABEND_PROGRAMS:
            return [([_ABEND_SENTINEL], {})]
        return [([], {})]
    elif isinstance(node, ExitNode):
        if node.exit_type == 'PERFORM':
            return [([_STOP_EXIT_PERFORM], {})]
        return [([_STOP_SENTINEL], {})]
    elif isinstance(node, GoTo):
        # Paths of the GO TO body, each marked as terminating.
        paths = enum_paths(node.body_seq, fields)
        return [([_STOP_SENTINEL] + c, a) for c, a in paths]
    return [([], {})]
# ── 值生成 ──
def seq_numeric(seq_num: int, total_digits: int) -> str:
    """Return ``seq_num`` wrapped into ``total_digits`` digits, zero-padded.

    A wrapped value of zero is replaced by the largest value that fits
    (``10 ** total_digits - 1``).
    """
    modulus = 10 ** total_digits
    wrapped = (seq_num % modulus) or (modulus - 1)
    return str(wrapped).zfill(total_digits)
def seq_alpha(seq_num: int, length: int) -> str:
    """Return ``length`` copies of one upper-case letter chosen by ``seq_num``.

    Sequence number 1 maps to ``'A'``; the letters repeat every 26 numbers.
    """
    offset = (seq_num - 1) % 26
    return "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[offset] * length
def seq_date(seq_num: int) -> str:
    """Return a ``YYYYMMDD`` date string; sequence number 1 is 2000-01-01."""
    from datetime import datetime, timedelta
    shifted = datetime(2000, 1, 1) + timedelta(days=seq_num - 1)
    return f"{shifted:%Y%m%d}"
def _is_date_field(name: str) -> bool:
patterns = [r'DATE', r'YYMMDD', r'YYYYMM']
for p in patterns:
if re.search(p, name.upper()):
return True
return False
_SPECIAL_VALUES = {
'ZERO': '0', 'ZEROS': '0', 'ZEROES': '0',
'SPACE': ' ', 'SPACES': ' ',
'HIGH-VALUE': '\xff', 'HIGH-VALUES': '\xff',
'LOW-VALUE': '\x00', 'LOW-VALUES': '\x00',
'QUOTE': "'", 'QUOTES': "'",
'ALL': '',
}
def _apply_value(field: dict, rec: dict) -> bool:
"""尝试应用 VALUE 子句的初始值。返回 True 表示已处理。"""
raw = field.get('value')
if raw is None:
return False
val = str(raw).strip("'\"").strip()
name = field['name']
pi = field.get('pic_info', {})
# 处理 COBOL 特殊值
if val.upper() in _SPECIAL_VALUES:
val = _SPECIAL_VALUES[val.upper()]
ftype = pi.get('type', 'unknown')
if ftype == 'numeric':
digits = pi.get('digits', 0) + pi.get('decimal', 0)
if digits:
rec[name] = val.zfill(digits)
else:
rec[name] = val
else:
length = pi.get('length', 0) or 1
rec[name] = val.ljust(length)[:length]
return True
def _children_of(group_name: str, fields: list) -> list:
"""返回组项目 group_name 在 fields 中的直属子字段列表(按声明顺序)。
终止条件:遇到同/更高级别(sibling/组边界)或 77 级(独立字段)。
"""
result = []
group_level = None
found = False
for f in fields:
if not found and f['name'] == group_name:
group_level = f['level']
found = True
continue
if found:
if f['level'] <= group_level or f['level'] == 77:
break
# 88-level 是条件名,不计为子字段
if f.get('is_88'):
continue
result.append(f)
return result
def _make_numeric_value(idx: int, record_num: int, total_digits: int) -> str:
max_val = 10 ** total_digits
for step in (100, 10, 1):
val = idx * step + record_num
if val < max_val:
return str(val).zfill(total_digits)
return str(record_num % max_val).zfill(total_digits)
def _make_alpha_value(idx: int, record_num: int, length: int) -> str:
if length == 1:
ch = chr(65 + (idx + record_num - 2) % 26)
return ch
letter = chr(65 + (idx - 1) % 26)
return letter + str(record_num).zfill(length - 1)
def make_base_record(seq_num: int, fields: list) -> dict:
    """Build a baseline record dict for record number ``seq_num``.

    Walks ``fields`` in declaration order and generates one value per
    elementary field, keyed by field name:

    * 88-level entries are skipped.
    * Scalar REDEFINES (with a PIC) are deferred and later copied from their
      parent; group REDEFINES (no PIC) are recorded for a positional copy.
    * FILLER fields get a run of ``'x'``.
    * A VALUE clause, when present, takes priority (see ``_apply_value``).
    * Remaining fields get a generated value chosen by their ``pic_info`` type.

    Returns the populated record dict.
    """
    rec = {}
    redefines_map = {}  # scalar REDEFINES: parent_name -> [child_names]
    group_redefines = []  # group REDEFINES: [(redef_name, target_name)]
    filler_key_counter = 0
    numeric_idx = 0
    alpha_idx = 0
    record_num = seq_num
    for f in fields:
        name = f['name']
        if f.get('is_88'):
            continue
        if f.get('redefines'):
            parent = f['redefines']
            if f.get('pic'):
                # Scalar REDEFINES (has a PIC, e.g. WS-AMOUNT-DISP REDEFINES WS-AMOUNT PIC X(9)).
                redefines_map.setdefault(parent, []).append(name)
                continue
            else:
                # Group REDEFINES (no PIC, e.g. CUST-ADDR2 REDEFINES CUST-ADDR).
                group_redefines.append((name, parent))
                # No continue here: the group itself has no PIC and is skipped
                # by the "group item" check below, while its children go
                # through this loop as ordinary fields.
        if f.get('is_filler'):
            # A repeated FILLER name gets a numbered key so it does not overwrite the earlier one.
            if name in rec:
                filler_key_counter += 1
                name = f'FILLER_{filler_key_counter + 1}'
            rec[name] = 'x' * (f.get('pic_info', {}).get('length', 0) or 1)
            continue
        # Pass 0: an initial value from the VALUE clause takes priority.
        if _apply_value(f, rec):
            continue
        # Group items (no PIC) are skipped.
        if not f.get('pic'):
            continue
        pi = f.get('pic_info', {})
        ftype = pi.get('type', 'unknown')
        digits = pi.get('digits', 0)
        decimal = pi.get('decimal', 0)
        length = pi.get('length', 0)
        if ftype == 'numeric':
            if _is_date_field(name):
                rec[name] = seq_date(record_num)
            else:
                numeric_idx += 1
                rec[name] = _make_numeric_value(numeric_idx, record_num, digits + decimal)
        elif ftype in ('alphanumeric', 'alphabetic'):
            alpha_idx += 1
            rec[name] = _make_alpha_value(alpha_idx, record_num, length or 1)
        elif ftype == 'numeric-edited':
            numeric_idx += 1
            raw = _make_numeric_value(numeric_idx, record_num, digits + decimal)
            rec[name] = raw.rjust(length)
        else:
            # Unknown type: fall back to an 8-character alphanumeric value.
            alpha_idx += 1
            rec[name] = _make_alpha_value(alpha_idx, record_num, 8)
    # Pass 2a: copy the parent's value into each scalar REDEFINES.
    for parent_name, child_names in redefines_map.items():
        if parent_name in rec:
            for child_name in child_names:
                rec[child_name] = rec[parent_name]
    # Pass 2b: group REDEFINES - copy child values by position.
    for redef_name, target_name in group_redefines:
        redef_kids = _children_of(redef_name, fields)
        tgt_kids = _children_of(target_name, fields)
        tgt_idx = 0
        for i, rk in enumerate(redef_kids):
            if tgt_idx >= len(tgt_kids):
                break
            if i == len(redef_kids) - 1 and len(redef_kids) < len(tgt_kids):
                # Last redefining child and the target has more children:
                # concatenate all remaining target values.
                parts = [rec.get(tk['name'], '') for tk in tgt_kids[tgt_idx:]]
                rec[rk['name']] = ''.join(parts)
            elif i == len(redef_kids) - 1 and len(redef_kids) > len(tgt_kids):
                # More redefining children than targets: the last one takes the last target value.
                # NOTE(review): this branch looks unreachable - the bounds
                # check above breaks out before the last index is reached.
                rec[rk['name']] = rec.get(tgt_kids[-1]['name'], '')
            else:
                rec[rk['name']] = rec.get(tgt_kids[tgt_idx]['name'], '')
                tgt_idx += 1
    return rec
# ── 约束应用 ──
def _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields):
"""检查 field_name 当前值是否满足该约束。满足返回 True。"""
for f in fields:
if f['name'] == field_name:
pi = f.get('pic_info', {})
ftype = pi.get('type', 'unknown')
val = rec.get(field_name)
if val is None:
return False
if operator == 'not_in':
cases = value if isinstance(value, list) else []
return str(val) not in cases
if ftype == 'numeric':
try:
num_val = int(float(str(val)))
num_target = int(float(str(value)))
except (ValueError, TypeError):
return False
if operator in ('>=', '>', '<', '<=', '=', '<>'):
if operator == '>=': ok = num_val >= num_target
elif operator == '>': ok = num_val > num_target
elif operator == '<': ok = num_val < num_target
elif operator == '<=': ok = num_val <= num_target
elif operator == '=': ok = num_val == num_target
elif operator == '<>': ok = num_val != num_target
return ok == want_true
return True
else:
s_val = str(val).strip().upper()
s_target = str(value).strip().upper()
eq = s_val == s_target
if operator == '=':
return eq == want_true
elif operator == '<>':
return (not eq) == want_true
elif operator in ('>', '<', '>=', '<='):
if operator == '>':
ok = s_val > s_target
elif operator == '<':
ok = s_val < s_target
elif operator == '>=':
ok = s_val >= s_target
elif operator == '<=':
ok = s_val <= s_target
return ok == want_true
return True
return False
# Operator groups used by _apply_arith_constraint to pick a steering direction:
# for 'left_big_ops' the left-hand fields are set large when the condition
# should hold, for 'left_small_ops' they are set small.
_ARITH_BOUNDS = {
    'left_big_ops': {'>', '>=', '<>'},
    'left_small_ops': {'<', '<='},
}
def _arith_pic_info(field_name, fields):
for f in fields:
if f['name'] == field_name.upper():
return f.get('pic_info', {})
return {}
def _arith_numeric_pick(field_name, want_big, fields):
    """Pick a large or a small value for a numeric field, as a digit string.

    Returns None when the field is not numeric. A "big" pick is 70% of the
    largest value that fits in ``digits + decimal`` digits; a "small" pick is
    1. The result is the zero-padded integer part followed by the zero-padded
    decimal part, with no decimal point.
    """
    pi = _arith_pic_info(field_name, fields)
    if pi.get('type') != 'numeric':
        return None
    digits = pi.get('digits', 0)
    decimal = pi.get('decimal', 0)
    ceiling = 10 ** (digits + decimal) - 1
    pick = int(ceiling * 0.7) if want_big else 1
    scale = 10 ** decimal
    int_part = str(pick // scale).zfill(digits)
    if decimal == 0:
        return int_part
    return int_part + str(pick % scale).zfill(decimal)
def _apply_arith_constraint(rec, field_name, operator, value, want_true, fields):
    """Steer field values so an arithmetic-expression condition goes the wanted way.

    For ``A + B > C`` with ``want_true=True`` the left-hand fields (A, B) are
    set large and the right-hand field (C) small. For ``A + B <= C`` the
    left-hand fields are set small and the right-hand field large.

    This is heuristic steering, not an exact solve: the aim is to make the
    branch reachable, not to hit boundary values precisely. ``rec`` is updated
    in place and nothing is returned.
    """
    # 1. Field names (upper-cased) appearing in the left-hand expression.
    expr_tokens = re.findall(r'\b[A-Z][A-Z0-9-]*(?:\([^)]*\))?\b', field_name.upper())
    left_fields = [tok for tok in expr_tokens if any(f['name'] == tok for f in fields)]
    # 2. The right-hand side only counts when it is itself a field.
    right_field = None
    if any(f['name'] == value for f in fields):
        right_field = value
    if not left_fields:
        logger.debug(f"算术表达式无法提取字段: {field_name}")
        return
    # 3. Direction: only the "left small" operators invert want_true.
    if operator in _ARITH_BOUNDS['left_small_ops']:
        left_big = not want_true
    else:
        left_big = want_true
    # 4./5. Left-hand fields first, then the right-hand field the opposite way.
    targets = [(name, left_big) for name in left_fields]
    if right_field:
        targets.append((right_field, not left_big))
    for name, big in targets:
        pick = _arith_numeric_pick(name, big, fields)
        if pick is not None:
            rec[name] = pick
def _inc_str(s, length):
s = str(s).strip()
try:
r = str(int(s) + 1).zfill(length)
return r if len(r) <= length else '9' * length
except ValueError:
c = list(str(s).ljust(length)[:length])
for i in range(len(c) - 1, -1, -1):
if c[i] not in ' 9Zz\xff':
c[i] = chr(ord(c[i]) + 1)
break
if c[i] == ' ':
c[i] = '0'
break
if c[i] == '9':
c[i] = '0'
elif c[i] == 'Z':
c[i] = 'A'
elif c[i] == 'z':
c[i] = 'a'
return ''.join(c)
def _dec_str(s, length):
s = str(s).strip()
try:
n = max(0, int(s) - 1)
return str(n).zfill(length)
except ValueError:
c = list(str(s).ljust(length)[:length])
for i in range(len(c) - 1, -1, -1):
if c[i] not in ' 0Aa\x00':
c[i] = chr(ord(c[i]) - 1)
break
if c[i] == ' ':
break
if c[i] == '0':
c[i] = '9'
elif c[i] == 'A':
c[i] = ' '
elif c[i] == 'a':
c[i] = ' '
return ''.join(c)
def _reconcile_unstring_fields(rec, left_field, operator, right_field, want_true,
                               fields, left_chain, assignments, path_assign):
    """Adjust the right-hand root field so a field-to-field comparison goes the wanted way.

    The right-hand field is traced to its root via ``trace_to_root``. The
    root's value in ``rec`` is then derived from the left field's current
    value: equal to it, or one step above/below it (``_inc_str`` /
    ``_dec_str``), depending on ``operator`` and ``want_true``.

    Nothing is changed when the root is not in ``rec``, when either
    assignment chain contains a step other than ``move`` / ``unstring_split``,
    when the left field has no value, or when the operator is unknown.
    """
    right_root, right_chain = trace_to_root(right_field, assignments, fields, path_assign)
    if right_root not in rec:
        logger.debug(f"字段间比较协调:右侧根 {right_root} 不在 rec,跳过")
        return
    for _, asgn in (left_chain or []) + (right_chain or []):
        if asgn.get('type') in ('move', 'unstring_split'):
            continue
        logger.debug(f"字段间比较协调:链含非 MOVE 类型 {asgn.get('type')},跳过")
        return
    left_val = str(rec.get(left_field, ''))
    if not left_val.strip():
        logger.debug(f"字段间比较协调:左侧 {left_field} 无值,跳过")
        return
    # Width of the root field; fall back to the left value's width.
    length = next(
        (f.get('pic_info', {}).get('length', 0) for f in fields if f['name'] == right_root),
        0,
    )
    if length == 0:
        length = len(left_val)
    # operator -> (step when want_true, step otherwise); None keeps the left value.
    adjust = {
        '>=': (None, _inc_str),
        '<=': (None, _dec_str),
        '>': (_dec_str, None),
        '<': (_inc_str, None),
        '=': (None, _inc_str),
        '<>': (_inc_str, None),
    }
    if operator not in adjust:
        return
    when_true, when_false = adjust[operator]
    step = when_true if want_true else when_false
    right_val = left_val if step is None else step(left_val, length)
    rec[right_root] = right_val[:length] if right_val else right_val
    logger.debug(f"字段间比较协调:{left_field}={left_val} {operator} {right_field} -> {right_root}={rec[right_root]} (want={want_true})")
def apply_constraint(rec, field_name, operator, value, want_true, fields, assignments=None, path_assign=None):
    """Mutate ``rec`` so that ``field_name operator value`` evaluates to ``want_true``.

    Processing order:

    1. Normalise the field name and resolve a variable subscript via ``rec``.
    2. Fan an unsubscripted constraint out to every subscripted variant.
    3. Ignore FILLER fields; redirect REDEFINES fields to their parent.
    4. With ``assignments``, trace the field to its root and invert the
       constraint through the assignment chain.
    5. Handle field-to-field comparisons (literal VALUE, chain reconciliation,
       arithmetic steering, or skip).
    6. Leave the record alone when the constraint already holds, except for
       an all-zero value on certain range operators.
    7. Otherwise pick a value: the first unused candidate for ``not_in``, or
       ``satisfying_value`` for everything else.

    Returns None; ``rec`` is updated in place.
    """
    # Normalise the field name: drop spaces around parentheses/commas
    # (WS-CELL ( 1, 1 ) → WS-CELL(1,1)).
    field_name = re.sub(r'\s*([(),])\s*', r'\1', field_name)
    # Variable subscript resolution: WS-FIXED-VALUE(WS-IDX) → WS-FIXED-VALUE(1)
    vm = re.match(r'^(\w[\w-]*)\((\w[\w-]*)\)$', field_name)
    if vm:
        base_var, subscript_var = vm.groups()
        if subscript_var in rec:
            try:
                resolved_name = f'{base_var}({int(rec[subscript_var])})'
                if any(f['name'] == resolved_name for f in fields):
                    apply_constraint(rec, resolved_name, operator, value, want_true, fields, assignments, path_assign)
                    return
            except (ValueError, TypeError):
                # Subscript value is not an integer: fall through and treat the name as-is.
                pass
    # Subscript propagation: an unsubscripted constraint applies to every subscripted variant.
    base = _basename(field_name)
    subscripted = [f for f in fields if f['name'] != base and _basename(f['name']) == base]
    if subscripted and field_name == base:
        for sf in subscripted:
            apply_constraint(rec, sf['name'], operator, value, want_true, fields, assignments, path_assign)
        return
    # Constraints on a REDEFINES field are redirected to the parent (shared storage).
    for f in fields:
        if f['name'] == field_name:
            if f.get('is_filler'):
                return
            if f.get('redefines'):
                parent_name = f['redefines']
                logger.debug(f"REDEFINES 约束重定向: {field_name}{parent_name}")
                apply_constraint(rec, parent_name, operator, value, want_true, fields, assignments, path_assign)
                return
            break
    chain = None
    if assignments:
        root_var, chain = trace_to_root(field_name, assignments, fields, path_assign)
        if root_var != field_name:
            new_field_name, new_op, new_val = invert_through_chain(root_var, chain, operator, value)
            # Only retarget when the inverted constraint lands on a known field.
            if any(f['name'] == new_field_name for f in fields):
                field_name, operator, value = new_field_name, new_op, new_val
    # Field-to-field comparison: resolve/handle before the satisfied check.
    if any(f['name'] == value for f in fields):
        resolved_literal = None
        for f in fields:
            if f['name'] == value and f.get('value') is not None:
                resolved_literal = str(f['value']).strip("'").strip('"')
                break
        if resolved_literal is not None:
            # The right-hand field has a VALUE clause: compare against that literal.
            value = resolved_literal
        elif chain is not None and assignments:
            _reconcile_unstring_fields(rec, field_name, operator, value, want_true,
                                       fields, chain, assignments, path_assign)
            return
        # NOTE(review): the character class below also matches the hyphen in
        # ordinary names such as WS-AMOUNT, so this branch is presumably taken
        # for plain hyphenated fields too - confirm that is intended.
        elif re.search(r'[+\-*/]', field_name):
            _apply_arith_constraint(rec, field_name, operator, value, want_true, fields)
            return
        else:
            logger.debug(f"字段间比较约束跳过:{field_name} {operator} {value}")
            return
    # If the current value already satisfies the constraint, do not overwrite
    # it (keeps earlier constraints consistent). An all-zero value is still
    # replaced by a boundary value for "not >" / "not >=" / "<" / "<=".
    if _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields):
        cur = str(rec.get(field_name, '')).strip('0')
        if (cur == '' or cur == '.') and (
            (operator in ('>', '>=') and not want_true) or
            (operator in ('<', '<=') and want_true)
        ):
            for f in fields:
                if f['name'] == field_name:
                    pi = f.get('pic_info', {})
                    if pi.get('type') == 'numeric':
                        val = satisfying_value(pi, operator, value, want_true)
                        rec[field_name] = val
                    return
        return
    if operator == 'not_in':
        for f in fields:
            if f['name'] == field_name:
                pi = f.get('pic_info', {})
                cases = value if isinstance(value, list) else []
                ftype = pi.get('type', 'unknown')
                if ftype in ('alphanumeric', 'alphabetic'):
                    # First letter not used as a case value, repeated to the field length.
                    for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
                        if c not in cases:
                            rec[field_name] = c.ljust(pi.get('length', 1), c)
                            return
                else:
                    # First number in 1..99 whose unpadded text is not a case value.
                    for n in range(1, 100):
                        if str(n) not in cases:
                            rec[field_name] = str(n).zfill(pi.get('digits', 0) + pi.get('decimal', 0))
                            return
        return
    # General case: delegate value selection to satisfying_value.
    for f in fields:
        if f['name'] == field_name:
            pi = f.get('pic_info', {})
            val = satisfying_value(pi, operator, value, want_true)
            rec[field_name] = val
            return
# ── 记录生成入口 ──
def sync_redefined_fields(rec, fields):
    """Re-synchronise REDEFINES fields after assignments or constraints.

    Scalar REDEFINES (fields with a PIC) receive a copy of their parent's
    value when the parent is present in ``rec``. Group REDEFINES (no PIC)
    copy child values from the target group by position; when the redefining
    group has fewer children, its last child receives the concatenation of
    the remaining target values. 88-level and FILLER entries are ignored.
    ``rec`` is updated in place.
    """
    scalar_children = {}
    group_pairs = []
    for f in fields:
        if f.get('is_88') or f.get('is_filler'):
            continue
        parent = f.get('redefines')
        if not parent:
            continue
        if f.get('pic'):
            scalar_children.setdefault(f['redefines'], []).append(f['name'])
        else:
            group_pairs.append((f['name'], f['redefines']))

    for parent_name, child_names in scalar_children.items():
        if parent_name not in rec:
            continue
        for child_name in child_names:
            rec[child_name] = rec[parent_name]

    for redef_name, target_name in group_pairs:
        redef_kids = _children_of(redef_name, fields)
        tgt_kids = _children_of(target_name, fields)
        last_redef = len(redef_kids) - 1
        # Positions advance in lockstep, so only the overlapping prefix is
        # ever visited. A redefining group with more children than its target
        # never reaches its last child.
        for pos in range(min(len(redef_kids), len(tgt_kids))):
            kid_name = redef_kids[pos]['name']
            if pos == last_redef and len(redef_kids) < len(tgt_kids):
                rec[kid_name] = ''.join([rec.get(tk['name'], '') for tk in tgt_kids[pos:]])
            else:
                rec[kid_name] = rec.get(tgt_kids[pos]['name'], '')
def apply_occurs_depending(rec, fields):
    """Blank out table elements beyond their OCCURS DEPENDING ON count.

    Args:
        rec: record dict, updated in place.
        fields: field descriptors; subscripted elements carry
            ``occurs_depending`` (the controlling variable's name) and a
            trailing ``(n)`` in their name.

    Phase 1: a controlling variable whose current value is zero or
    unparseable is set to the largest subscript seen for it, provided the
    variable is a known field with a non-zero digit count.
    Phase 2: every element whose subscript exceeds the controlling value is
    reset: zeros for numeric, spaces for alphanumeric/alphabetic, zeros of
    ``length`` otherwise.
    """
    def _as_count(raw):
        # Fix: one tolerant parser for both phases. Phase 2 used a bare int(),
        # which raises on values such as '2.0' or blanks that phase 1 accepts.
        try:
            return int(float(str(raw)))
        except (ValueError, TypeError, OverflowError):
            return 0

    subscript_re = re.compile(r'\((\d+)\)$')

    # Phase 1: find the largest subscript per controlling variable.
    dep_max = {}
    for f in fields:
        dep_var = f.get('occurs_depending')
        if not dep_var:
            continue
        m = subscript_re.search(f['name'])
        if m:
            sub = int(m.group(1))
            if sub > dep_max.get(dep_var, 0):
                dep_max[dep_var] = sub
    for dep_var, max_sub in dep_max.items():
        if _as_count(rec.get(dep_var, '0')) != 0:
            continue
        for f in fields:
            if f['name'] == dep_var:
                pi = f.get('pic_info', {})
                digits = pi.get('digits', 0) + pi.get('decimal', 0)
                if digits > 0:
                    rec[dep_var] = str(max_sub).zfill(digits)
                break

    # Phase 2: reset elements whose subscript is out of range.
    for f in fields:
        dep_var = f.get('occurs_depending')
        if not dep_var:
            continue
        name = f['name']
        m = subscript_re.search(name)
        if not m:
            continue
        if int(m.group(1)) <= _as_count(rec.get(dep_var, 0)):
            continue
        pi = f.get('pic_info', {})
        ftype = pi.get('type', 'unknown')
        length = pi.get('length', 0) or 1
        if ftype == 'numeric':
            rec[name] = '0' * (pi.get('digits', 0) + pi.get('decimal', 0))
        elif ftype in ('alphanumeric', 'alphabetic'):
            rec[name] = ' ' * length
        else:
            rec[name] = '0' * length
def _non_match_for(cond_leaf, fields):
if not fields or not cond_leaf:
return None
base = re.sub(r'\s*\(.*?\)\s*$', '', cond_leaf.field)
for f in fields:
if re.sub(r'\s*\(.*?\)\s*$', '', f['name']) == base:
pic = f.get('pic_info', {})
if pic.get('type') == 'numeric':
return '0'
return ' '
return None
def _enum_search_paths(node, fields):
    """Enumerate ``(constraints, assignments)`` paths through a SEARCH-style node.

    For the i-th WHEN branch whose condition tree is a single ``CondLeaf``:
    the matching value is assigned to table element ``base(i+1)``, a
    non-matching filler is assigned to ``base(1)..base(i)``, and the
    constraint ``(element, op, value, True)`` is prepended to each sub-path
    of the branch body. Branches with any other condition shape contribute
    their body sub-paths unchanged.

    When ``node.has_at_end`` is set, additional paths are produced from
    ``node.at_end_seq`` in which every element ``base(1)..base(occurs_count)``
    holds the non-matching filler (only when the first condition is a
    ``CondLeaf``).

    Args:
        node: object exposing ``when_list`` (pairs of condition text and
            body), ``cond_trees``, ``table_name``, ``has_at_end`` and
            ``at_end_seq`` — presumably a ``BrSearch``; confirm with callers.
        fields: list of field dicts with at least a ``'name'`` key.

    Returns:
        list of ``(constraints, assignments)`` tuples, where ``assignments``
        maps a field name to a list of assignment dicts.
    """
    subscript_tail = r'\s*\(.*?\)\s*$'
    # Guarded once so the AT END branch cannot hit IndexError when
    # cond_trees is empty while when_list is not.
    first_tree = node.cond_trees[0] if node.cond_trees else None
    first_is_leaf = bool(first_tree) and isinstance(first_tree, CondLeaf)

    # Infer the OCCURS count from the condition field name, e.g. for
    # WS-CODE-VAL(WS-IDX) take the largest j among fields WS-CODE-VAL(j).
    occurs_count = 1
    if node.when_list and first_is_leaf:
        base = re.sub(subscript_tail, '', first_tree.field)
        for f in fields:
            m = re.match(rf'^{re.escape(base)}\((\d+)\)$', f['name'])
            if m:
                occurs_count = max(occurs_count, int(m.group(1)))
    if occurs_count <= 1:
        # Fall back to the subscript suffixes found on the table name itself.
        parent = node.table_name
        for f in fields:
            m = re.match(rf'^{re.escape(parent)}\((\d+)\)$', f['name'])
            if m:
                occurs_count = max(occurs_count, int(m.group(1)))

    paths = []
    for i, (_cond_text, body_seq) in enumerate(node.when_list):
        cond_tree = node.cond_trees[i] if i < len(node.cond_trees) else None
        sub = _cap_paths(enum_paths(body_seq, fields)) or [([], {})]
        leaf_cond = bool(cond_tree) and isinstance(cond_tree, CondLeaf)
        extra_assign = {}
        if leaf_cond:
            base = re.sub(subscript_tail, '', cond_tree.field)
            matching_val = cond_tree.value
            elem_key = f'{base}({i + 1})'
            # The compared value is either another field (MOVE) or a literal.
            if any(f['name'] == matching_val for f in fields):
                extra_assign[elem_key] = [{'type': 'move', 'source_vars': [matching_val]}]
            else:
                extra_assign[elem_key] = [{'type': 'move_literal', 'literal': matching_val}]
            non_match = _non_match_for(cond_tree, fields) or ' '
            # Earlier elements must not match, so the search reaches element i+1.
            for j in range(i):
                prev_key = f'{base}({j + 1})'
                extra_assign[prev_key] = [{'type': 'move_literal', 'literal': non_match}]
        for sp_cons, sp_assign in sub:
            # Copy each list: extending a list shared with extra_assign would
            # leak one sub-path's assignments into its sibling paths.
            merged_assign = {k: list(v) for k, v in extra_assign.items()}
            for k, v in sp_assign.items():
                merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v])
            if leaf_cond:
                paths.append(([(elem_key, cond_tree.op, matching_val, True)] + sp_cons, merged_assign))
            else:
                paths.append((sp_cons, merged_assign))

    if node.has_at_end:
        sub = _cap_paths(enum_paths(node.at_end_seq, fields))
        non_match = ' '
        miss_keys = []
        if node.when_list and first_is_leaf:
            non_match = _non_match_for(first_tree, fields) or ' '
            base = re.sub(subscript_tail, '', first_tree.field)
            miss_keys = [f'{base}({j + 1})' for j in range(max(occurs_count, 1))]
        for sp_cons, sp_assign in (sub or [([], {})]):
            # Fresh lists per path so that paths never share mutable state.
            merged_assign = {
                key: [{'type': 'move_literal', 'literal': non_match}]
                for key in miss_keys
            }
            for k, v in sp_assign.items():
                merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v])
            paths.append((sp_cons, merged_assign))
    return paths
def generate_records(path_infos, data_fields, base_assignments=None, file_sec=None):
    """Generate test-data records, one per feasible path.

    Args:
        path_infos: list of ``(constraints, path_assignments)`` or
            ``(constraints, path_assignments, term_type)`` tuples.
        data_fields: list of field dicts (``'name'``, ``'pic_info'``, ...).
        base_assignments: global assignments dict (used for trace_to_root
            and for UNSTRING source reconstruction).
        file_sec: forwarded unchanged to ``propagate_assignments``.

    Returns:
        ``(records, kept_path_cons, term_types)`` — three parallel lists.
        Paths judged impossible in Pass A.5 are dropped from all three. If no
        record survives, a single base record with term type ``'normal'`` is
        returned.
    """
    # Transparently accept the legacy 2-tuple format (only the first entry is inspected).
    if path_infos and len(path_infos[0]) == 2:
        path_infos = [(c, a, 'normal') for c, a in path_infos]
    records = []
    kept_path_cons = []
    term_types = []
    if path_infos:
        for seq, (path_cons, path_assign, term_type) in enumerate(path_infos, start=1):
            path_cons = _filter_stop(path_cons)
            rec = make_base_record(seq, data_fields)
            # Pass A: propagate assignments first (MOVE/COMPUTE/READ INTO, ...) to
            # simulate the program state just before the decision point.
            if isinstance(path_assign, dict):
                propagate_assignments(rec, path_assign, data_fields, file_sec=file_sec)
            # Pass A.5: check whether a constraint traces back through the chain to a
            # literal assignment that contradicts it (an impossible path).
            skip_impossible = False
            if base_assignments and isinstance(path_assign, dict):
                for c in path_cons:
                    if len(c) == 4 and not skip_impossible:
                        field, op, val, want = c
                        root_var, chain = trace_to_root(field, base_assignments, data_fields, path_assign)
                        if root_var != field:
                            # The constrained field derives from another variable:
                            # re-express the constraint on the root variable.
                            new_fn, new_op, new_val = invert_through_chain(root_var, chain, op, val)
                            if any(f['name'] == new_fn for f in data_fields):
                                asgn_val = path_assign.get(root_var)
                                if asgn_val is not None:
                                    asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
                                    # Only the last assignment counts, and only when it is a literal.
                                    if asgn_list and asgn_list[-1]['type'] == 'move_literal' and root_var in rec:
                                        if not _check_constraint_satisfied(rec, root_var, new_op, new_val, want, data_fields):
                                            skip_impossible = True
                                            break
                        elif field in rec:
                            # The field is its own root: compare the literal it was
                            # last assigned against the constraint directly.
                            asgn_val = path_assign.get(field)
                            if asgn_val is not None:
                                asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
                                if asgn_list and asgn_list[-1]['type'] == 'move_literal':
                                    cur_val = str(rec.get(field, ''))
                                    if cur_val != '':
                                        pi = next((f.get('pic_info', {}) for f in data_fields if f['name'] == field), {})
                                        if pi.get('type') == 'numeric':
                                            try:
                                                nv = int(float(cur_val))
                                                tv = int(float(str(val)))
                                                ops = {'>': lambda a,b: a > b, '<': lambda a,b: a < b, '=': lambda a,b: a == b, '<>': lambda a,b: a != b, '>=': lambda a,b: a >= b, '<=': lambda a,b: a <= b}
                                                if op in ops:
                                                    satisfied = ops[op](nv, tv) == want
                                                    if not satisfied:
                                                        skip_impossible = True
                                                        break
                                            except (ValueError, TypeError):
                                                # Non-numeric text: feasibility cannot be decided, keep the path.
                                                pass
            if skip_impossible:
                continue
            # Pass B: apply constraints (make the decision conditions hold, overriding
            # values that the MOVEs produced).
            for c in path_cons:
                if len(c) == 4:
                    field, op, val, want = c
                    apply_constraint(rec, field, op, val, want, data_fields, base_assignments, path_assign)
            # Pass B.5: re-propagate variable-to-variable MOVEs forward so that chains
            # stay consistent after the constraints changed values.
            if isinstance(path_assign, dict):
                forward = {}
                for tgt, asgn_val in path_assign.items():
                    asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
                    filtered = [a for a in asgn_list if a['type'] == 'move' and a.get('source_vars')]
                    if filtered:
                        forward[tgt] = filtered
                if forward:
                    propagate_assignments(rec, forward, data_fields, file_sec=file_sec)
            # Pass B.75: recompute COMPUTE targets (constraints may have changed the
            # values of their source fields).
            if isinstance(path_assign, dict):
                compute_only = {}
                for tgt, asgn_val in path_assign.items():
                    asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
                    filtered = [a for a in asgn_list if a['type'] == 'compute']
                    if filtered:
                        compute_only[tgt] = filtered
                if compute_only:
                    propagate_assignments(rec, compute_only, data_fields, file_sec=file_sec)
            # Pass B.8: UNSTRING source reconstruction (targets → source)
            if base_assignments:
                _reconstruct_unstring_sources(rec, base_assignments, data_fields)
            # Pass C: synchronise REDEFINES (keep shared storage consistent).
            sync_redefined_fields(rec, data_fields)
            # Pass D: OCCURS DEPENDING ON — reset out-of-range subscripted fields.
            apply_occurs_depending(rec, data_fields)
            # Pass E: PIC length limits — mimic COBOL truncation semantics.
            for f in data_fields:
                name = f['name']
                if name in rec and not f.get('is_88') and not f.get('is_filler'):
                    pi = f.get('pic_info', {})
                    ftype = pi.get('type', 'unknown')
                    val = str(rec[name])
                    if ftype == 'numeric':
                        total = pi.get('digits', 0) + pi.get('decimal', 0)
                        # Numeric overflow keeps the rightmost digits.
                        if total > 0 and len(val) > total:
                            rec[name] = val[-total:].zfill(total)
                    elif ftype in ('alphanumeric', 'alphabetic'):
                        length = pi.get('length', 0)
                        # Text overflow keeps the leftmost characters.
                        if length > 0 and len(val) > length:
                            rec[name] = val[:length]
            records.append(rec)
            kept_path_cons.append(path_cons)
            term_types.append(term_type)
            # Track which fields were explicitly assigned in this path
            # (rec is already in `records`; the same dict object is updated).
            if isinstance(path_assign, dict):
                rec['_assigned_fields'] = set(path_assign.keys())
            else:
                rec['_assigned_fields'] = set()
    if not records:
        # Fallback: no path produced a record, so emit one base record driven
        # by the global assignments only.
        rec = make_base_record(1, data_fields)
        if base_assignments:
            propagate_assignments(rec, base_assignments, data_fields, file_sec=file_sec)
        if base_assignments:
            _reconstruct_unstring_sources(rec, base_assignments, data_fields)
        rec['_assigned_fields'] = set()
        records.append(rec)
        kept_path_cons.append([])
        term_types.append('normal')
    return records, kept_path_cons, term_types
def _reconstruct_unstring_sources(rec, base_assignments, data_fields):
"""Build UNSTRING source field value from comma-separated target values.
After constraints determine target field values, construct the source
string so the COBOL UNSTRING can correctly parse it.
"""
groups = {}
for tgt, asgn_list in base_assignments.items():
for asgn in asgn_list:
if asgn.get('type') == 'unstring_split' and asgn.get('source_vars'):
src = asgn['source_vars'][0]
idx = asgn.get('index', 0)
groups.setdefault(src, []).append((idx, tgt))
for src_var, targets in groups.items():
targets.sort(key=lambda x: x[0])
# Resolve group→child name if source not directly in rec
resolved_src = src_var
if resolved_src not in rec:
grp_level = None
found = False
for f in data_fields:
if not found and f['name'] == resolved_src:
grp_level = f.get('level', 0)
found = True
continue
if found:
if f.get('level', 0) <= grp_level or f.get('level') == 77:
break
if f.get('pic'):
resolved_src = f['name']
break
if resolved_src not in rec:
continue
csv_parts = []
for idx, tgt in targets:
val = rec.get(tgt, '')
csv_parts.append(val if val is not None else '')
csv_value = ','.join(csv_parts)
src_len = 0
for f in data_fields:
if f['name'] == resolved_src:
pi = f.get('pic_info', {})
if pi:
src_len = pi.get('length', 0)
break
if src_len > 0:
csv_value = csv_value.ljust(src_len)[:src_len]
rec[resolved_src] = csv_value
# Also sync to child fields (group→elementary) for FD output consistency
if resolved_src == src_var:
grp_level = None
found = False
for f in data_fields:
if not found and f['name'] == resolved_src:
grp_level = f.get('level', 0)
found = True
continue
if found:
if f.get('level', 0) <= grp_level or f.get('level') == 77:
break
if f.get('pic'):
rec[f['name']] = csv_value
break