7ac887c776
- Add INSPECT (TALLYING/REPLACING/CONVERTING) with BEFORE/AFTER INITIAL - Add SEARCH/SEARCH ALL with element-assignment path enumeration - Fix _mark_perform compound condition marking via evaluate_tree - Fix EVALUATE TRUE prior_false to collect all MC/DC false sets - Add impossible path filtering (Pass A.5) with trace-to-root conflict detection - Fix multi-line PERFORM VARYING parsing (VARYING/FROM/BY/UNTIL on separate lines) - Remove dead code: agents.py LLM parser (replaced by rule-based _BrParser) - 59 unit tests passing, 5 integration programs verified
895 lines
38 KiB
Python
895 lines
38 KiB
Python
"""设计层:路径枚举 + 值生成 + 约束应用"""
|
||
|
||
import re
|
||
import logging
|
||
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo
|
||
from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, mcdc_sets, satisfying_value
|
||
from .core import trace_to_root, invert_through_chain, propagate_assignments, _basename
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
_STOP = ('__STOP__', '', None, True)
|
||
_MAX_PATHS = 10000
|
||
|
||
|
||
def _filter_stop(cons):
|
||
return [c for c in cons if c is not _STOP]
|
||
|
||
|
||
def _cap_paths(paths):
|
||
if len(paths) > _MAX_PATHS:
|
||
return paths[:_MAX_PATHS]
|
||
return paths
|
||
|
||
|
||
def _cap_paths_fair(new_active, child_paths):
|
||
"""两阶段公平截断:每个前置路径至少保留一条子路径,再填充剩余配额。"""
|
||
if len(new_active) <= _MAX_PATHS:
|
||
return new_active
|
||
k = len(child_paths)
|
||
if k <= 1:
|
||
return new_active[:_MAX_PATHS]
|
||
# 分离 STOP 路径(不参与组合,直接保留)
|
||
stop_paths = [(p, a) for p, a in new_active if any(c is _STOP for c in p)]
|
||
combined = [(p, a) for p, a in new_active if not any(c is _STOP for c in p)]
|
||
n_pred = len(combined) // k
|
||
result = list(stop_paths)
|
||
if n_pred <= 1:
|
||
result.extend(combined[:_MAX_PATHS - len(result)])
|
||
return result[:_MAX_PATHS]
|
||
remaining_quota = _MAX_PATHS - len(result)
|
||
# Phase 1: 每个前置至少保留一条子路径(轮询分配不同子路径索引)
|
||
quota = min(n_pred, remaining_quota)
|
||
selected = set()
|
||
for p_idx in range(quota):
|
||
c_idx = p_idx % k
|
||
idx = p_idx * k + c_idx
|
||
selected.add(idx)
|
||
result.append(combined[idx])
|
||
if len(result) >= _MAX_PATHS:
|
||
return result[:_MAX_PATHS]
|
||
# Phase 2: 用剩余配额填充其余组合
|
||
remaining = _MAX_PATHS - len(result)
|
||
for idx in range(len(combined)):
|
||
if idx not in selected:
|
||
result.append(combined[idx])
|
||
remaining -= 1
|
||
if remaining <= 0:
|
||
break
|
||
return result[:_MAX_PATHS]
|
||
|
||
|
||
# ── 路径枚举 ──
|
||
|
||
def enum_paths(node, fields):
|
||
"""枚举路径,每条路径返回 (constraints, assignments).
|
||
返回 list[tuple[list[tuple], dict]].
|
||
"""
|
||
if isinstance(node, Assign):
|
||
return [([], {node.target: [node.source_info]})]
|
||
|
||
if isinstance(node, BrSeq):
|
||
if not node.children:
|
||
return [([], {})]
|
||
paths = [([], {})]
|
||
for child in node.children:
|
||
child_paths = _cap_paths(enum_paths(child, fields))
|
||
new_active = []
|
||
for p_cons, p_assign in paths:
|
||
if any(c is _STOP for c in p_cons):
|
||
new_active.append((p_cons, p_assign))
|
||
continue
|
||
for cp_cons, cp_assign in child_paths:
|
||
merged = {}
|
||
for d in (p_assign, cp_assign):
|
||
for k, v in d.items():
|
||
merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
|
||
merged_cons = p_cons + list(cp_cons)
|
||
new_active.append((merged_cons, merged))
|
||
paths = _cap_paths_fair(new_active, child_paths)
|
||
return paths
|
||
|
||
elif isinstance(node, BrIf):
|
||
parsed = parse_single_condition(node.condition, fields)
|
||
if parsed and is_field(parsed[0], fields):
|
||
field, op, val = parsed
|
||
paths = []
|
||
true_sub = _cap_paths(enum_paths(node.true_seq, fields))
|
||
for sp_cons, sp_assign in (true_sub or [([], {})]):
|
||
paths.append(([(field, op, val, True)] + sp_cons, sp_assign))
|
||
false_sub = _cap_paths(enum_paths(node.false_seq, fields))
|
||
for fp_cons, fp_assign in (false_sub or [([], {})]):
|
||
paths.append(([(field, op, val, False)] + fp_cons, fp_assign))
|
||
return paths
|
||
# CondNot wrapping a single leaf (e.g., IF NOT WS-AMOUNT > 1000)
|
||
if node.cond_tree and isinstance(node.cond_tree, CondNot):
|
||
child = node.cond_tree.child
|
||
if isinstance(child, CondLeaf) and is_field(child.field, fields):
|
||
paths = []
|
||
true_sub = _cap_paths(enum_paths(node.true_seq, fields))
|
||
for sp_cons, sp_assign in (true_sub or [([], {})]):
|
||
paths.append(([(child.field, child.op, child.value, False)] + sp_cons, sp_assign))
|
||
false_sub = _cap_paths(enum_paths(node.false_seq, fields))
|
||
for fp_cons, fp_assign in (false_sub or [([], {})]):
|
||
paths.append(([(child.field, child.op, child.value, True)] + fp_cons, fp_assign))
|
||
return paths
|
||
if node.cond_tree:
|
||
leaves = collect_leaves(node.cond_tree)
|
||
if leaves and all(is_field(l.field, fields) for l in leaves):
|
||
sets = mcdc_sets(node.cond_tree, fields)
|
||
if sets:
|
||
paths = []
|
||
for constraints, decision in sets:
|
||
body = _cap_paths(enum_paths(
|
||
node.true_seq if decision else node.false_seq, fields
|
||
))
|
||
for sp_cons, sp_assign in (body or [([], {})]):
|
||
paths.append((constraints + sp_cons, sp_assign))
|
||
return paths
|
||
# CondLeaf fallback: 单 leaf(含 88-level 解析后的条件树)MC/DC 不适用
|
||
if len(leaves) == 1:
|
||
leaf = leaves[0]
|
||
paths = []
|
||
true_sub = _cap_paths(enum_paths(node.true_seq, fields))
|
||
for sp_cons, sp_assign in (true_sub or [([], {})]):
|
||
paths.append(([(leaf.field, leaf.op, leaf.value, True)] + sp_cons, sp_assign))
|
||
false_sub = _cap_paths(enum_paths(node.false_seq, fields))
|
||
for fp_cons, fp_assign in (false_sub or [([], {})]):
|
||
paths.append(([(leaf.field, leaf.op, leaf.value, False)] + fp_cons, fp_assign))
|
||
return paths
|
||
# Fallback: parsed condition but non-field (e.g. arithmetic expr)
|
||
if parsed:
|
||
field, op, val = parsed
|
||
paths = []
|
||
true_sub = enum_paths(node.true_seq, fields)
|
||
for sp_cons, sp_assign in (true_sub or [([], {})]):
|
||
paths.append(([(field, op, val, True)] + sp_cons, sp_assign))
|
||
false_sub = enum_paths(node.false_seq, fields)
|
||
for fp_cons, fp_assign in (false_sub or [([], {})]):
|
||
paths.append(([(field, op, val, False)] + fp_cons, fp_assign))
|
||
return paths
|
||
return [([], {})]
|
||
|
||
elif isinstance(node, BrEval):
|
||
if node.subjects:
|
||
paths = []
|
||
prior_false_cons = []
|
||
for values, seq in node.when_list:
|
||
sub = _cap_paths(enum_paths(seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
when_cons = [(node.subjects[i], '=', values[i], True)
|
||
for i in range(len(node.subjects))]
|
||
constraints = list(prior_false_cons) + when_cons + sp_cons
|
||
paths.append((constraints, sp_assign))
|
||
for i in range(len(node.subjects)):
|
||
prior_false_cons.append((node.subjects[i], '=', values[i], False))
|
||
if node.has_other:
|
||
sub = _cap_paths(enum_paths(node.other_seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
paths.append((list(prior_false_cons) + sp_cons, sp_assign))
|
||
return paths
|
||
if node.subject == 'TRUE':
|
||
paths = []
|
||
prior_false_sets = [] # list[list[Constraint]]
|
||
for value, seq in node.when_list:
|
||
cond = parse_compound_condition(value, fields)
|
||
if cond and isinstance(cond, CondLeaf) and is_field(cond.field, fields):
|
||
sub = _cap_paths(enum_paths(seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
constraints = [c for pf in prior_false_sets for c in pf]
|
||
constraints.append((cond.field, cond.op, cond.value, True))
|
||
paths.append((constraints + sp_cons, sp_assign))
|
||
prior_false_sets.append([(cond.field, cond.op, cond.value, False)])
|
||
elif cond:
|
||
leaves = collect_leaves(cond)
|
||
if leaves and all(is_field(l.field, fields) for l in leaves):
|
||
sets = mcdc_sets(cond, fields)
|
||
if sets:
|
||
sub = _cap_paths(enum_paths(seq, fields))
|
||
new_false_sets = []
|
||
for cs, decision in sets:
|
||
if decision:
|
||
if not prior_false_sets:
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
paths.append((list(cs) + sp_cons, sp_assign))
|
||
else:
|
||
for pf_set in prior_false_sets:
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
paths.append((list(pf_set) + list(cs) + sp_cons, sp_assign))
|
||
else:
|
||
new_false_sets.append(cs)
|
||
if not new_false_sets:
|
||
prior_false_sets = []
|
||
break
|
||
combined = []
|
||
for pf_set in prior_false_sets:
|
||
for nf_set in new_false_sets:
|
||
combined.append(list(pf_set) + list(nf_set))
|
||
prior_false_sets = combined
|
||
else:
|
||
prior_false_sets = []
|
||
break
|
||
else:
|
||
prior_false_sets = []
|
||
break
|
||
else:
|
||
prior_false_sets = []
|
||
break
|
||
if node.has_other:
|
||
sub = _cap_paths(enum_paths(node.other_seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
constraints = [c for pf in prior_false_sets for c in pf]
|
||
paths.append((constraints + sp_cons, sp_assign))
|
||
return paths
|
||
if not is_field(node.subject, fields):
|
||
return [([], {})]
|
||
paths = []
|
||
for value, seq in node.when_list:
|
||
sub = _cap_paths(enum_paths(seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
paths.append(([(node.subject, '=', value, True)] + sp_cons, sp_assign))
|
||
if node.has_other:
|
||
case_vals = [v for v, _ in node.when_list]
|
||
sub = _cap_paths(enum_paths(node.other_seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
paths.append(([(node.subject, 'not_in', case_vals, True)] + sp_cons, sp_assign))
|
||
return paths
|
||
|
||
elif isinstance(node, BrSearch):
|
||
return _enum_search_paths(node, fields)
|
||
|
||
elif isinstance(node, BrPerform):
|
||
if node.perf_type in ('para', 'thru'):
|
||
if node.body_seq:
|
||
return enum_paths(node.body_seq, fields)
|
||
return [([], {})]
|
||
elif node.perf_type in ('until', 'para_until', 'varying', 'para_varying'):
|
||
# 尝试单条件(现有逻辑)
|
||
parsed = parse_single_condition(node.condition, fields)
|
||
if parsed and is_field(parsed[0], fields):
|
||
field, op, val = parsed
|
||
paths = []
|
||
false_sub = _cap_paths(enum_paths(node.body_seq, fields))
|
||
for sp_cons, sp_assign in (false_sub or [([], {})]):
|
||
# PERFORM VARYING: 将 FROM 值作为 MOVE 赋值加入 Enter 路径
|
||
if node.varying_from and node.varying_var:
|
||
is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False
|
||
from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from}
|
||
from_assign = {node.varying_var: [from_asgn]}
|
||
merged = {}
|
||
for d in (from_assign, sp_assign):
|
||
for k, v in d.items():
|
||
merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
|
||
sp_assign = merged
|
||
paths.append(([(field, op, val, False)] + sp_cons, sp_assign))
|
||
paths.append(([(field, op, val, True)], {}))
|
||
return paths
|
||
# 尝试复合条件(AND/OR)
|
||
cond_tree = parse_compound_condition(node.condition, fields)
|
||
if cond_tree:
|
||
leaves = collect_leaves(cond_tree)
|
||
if leaves and all(is_field(l.field, fields) for l in leaves):
|
||
sets = mcdc_sets(cond_tree, fields)
|
||
if sets:
|
||
paths = []
|
||
false_sub = _cap_paths(enum_paths(node.body_seq, fields))
|
||
for sp_cons, sp_assign in (false_sub or [([], {})]):
|
||
# PERFORM VARYING: 将 FROM 值作为 MOVE 赋值加入 Enter 路径
|
||
if node.varying_from and node.varying_var:
|
||
is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False
|
||
from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from}
|
||
from_assign = {node.varying_var: [from_asgn]}
|
||
merged = {}
|
||
for d in (from_assign, sp_assign):
|
||
for k, v in d.items():
|
||
merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
|
||
sp_assign = merged
|
||
for constraints, decision in sets:
|
||
if not decision:
|
||
paths.append((list(constraints) + sp_cons, sp_assign))
|
||
for constraints, decision in sets:
|
||
if decision:
|
||
paths.append((list(constraints), {}))
|
||
if paths:
|
||
return paths
|
||
return [([], {})]
|
||
|
||
elif isinstance(node, CallNode):
|
||
return [([], {})]
|
||
|
||
elif isinstance(node, ExitNode):
|
||
return [([_STOP], {})]
|
||
|
||
elif isinstance(node, GoTo):
|
||
paths = enum_paths(node.body_seq, fields)
|
||
return [([_STOP] + c, a) for c, a in paths]
|
||
|
||
return [([], {})]
|
||
|
||
|
||
# ── 值生成 ──
|
||
|
||
def seq_numeric(seq_num: int, total_digits: int) -> str:
|
||
val = seq_num % (10 ** total_digits)
|
||
if val == 0:
|
||
val = 10 ** total_digits - 1
|
||
return str(val).zfill(total_digits)
|
||
|
||
|
||
def seq_alpha(seq_num: int, length: int) -> str:
|
||
letter = chr(65 + (seq_num - 1) % 26)
|
||
return letter * length
|
||
|
||
|
||
def seq_date(seq_num: int) -> str:
|
||
from datetime import datetime, timedelta
|
||
base = datetime(2000, 1, 1)
|
||
d = base + timedelta(days=seq_num - 1)
|
||
return d.strftime('%Y%m%d')
|
||
|
||
|
||
def _is_date_field(name: str) -> bool:
|
||
patterns = [r'DATE', r'YYMMDD', r'YYYYMM', r'YEAR', r'MONTH', r'DAY']
|
||
for p in patterns:
|
||
if re.search(p, name.upper()):
|
||
return True
|
||
return False
|
||
|
||
|
||
_SPECIAL_VALUES = {
|
||
'ZERO': '0', 'ZEROS': '0', 'ZEROES': '0',
|
||
'SPACE': ' ', 'SPACES': ' ',
|
||
'HIGH-VALUE': '\xff', 'HIGH-VALUES': '\xff',
|
||
'LOW-VALUE': '\x00', 'LOW-VALUES': '\x00',
|
||
'QUOTE': "'", 'QUOTES': "'",
|
||
'ALL': '',
|
||
}
|
||
|
||
|
||
def _apply_value(field: dict, rec: dict) -> bool:
|
||
"""尝试应用 VALUE 子句的初始值。返回 True 表示已处理。"""
|
||
raw = field.get('value')
|
||
if raw is None:
|
||
return False
|
||
val = str(raw).strip("'\"").strip()
|
||
name = field['name']
|
||
pi = field.get('pic_info', {})
|
||
|
||
# 处理 COBOL 特殊值
|
||
if val.upper() in _SPECIAL_VALUES:
|
||
val = _SPECIAL_VALUES[val.upper()]
|
||
|
||
ftype = pi.get('type', 'unknown')
|
||
if ftype == 'numeric':
|
||
digits = pi.get('digits', 0) + pi.get('decimal', 0)
|
||
if digits:
|
||
rec[name] = val.zfill(digits)
|
||
else:
|
||
rec[name] = val
|
||
else:
|
||
length = pi.get('length', 0) or 1
|
||
rec[name] = val.ljust(length)[:length]
|
||
return True
|
||
|
||
|
||
def _children_of(group_name: str, fields: list) -> list:
|
||
"""返回组项目 group_name 在 fields 中的直属子字段列表(按声明顺序)。
|
||
终止条件:遇到同/更高级别(sibling/组边界)或 77 级(独立字段)。
|
||
"""
|
||
result = []
|
||
group_level = None
|
||
found = False
|
||
for f in fields:
|
||
if not found and f['name'] == group_name:
|
||
group_level = f['level']
|
||
found = True
|
||
continue
|
||
if found:
|
||
if f['level'] <= group_level or f['level'] == 77:
|
||
break
|
||
# 88-level 是条件名,不计为子字段
|
||
if f.get('is_88'):
|
||
continue
|
||
result.append(f)
|
||
return result
|
||
|
||
|
||
def _make_numeric_value(idx: int, record_num: int, total_digits: int) -> str:
|
||
for step in (100, 10, 1):
|
||
val = idx * step + record_num
|
||
if val < 10 ** total_digits:
|
||
return str(val).zfill(total_digits)
|
||
return str(record_num).zfill(total_digits)
|
||
|
||
|
||
def _make_alpha_value(idx: int, record_num: int, length: int) -> str:
|
||
if length == 1:
|
||
ch = chr(65 + (idx + record_num - 2) % 26)
|
||
return ch
|
||
letter = chr(65 + (idx - 1) % 26)
|
||
return letter + str(record_num).zfill(length - 1)
|
||
|
||
|
||
def make_base_record(seq_num: int, fields: list) -> dict:
|
||
rec = {}
|
||
redefines_map = {} # 标量 REDEFINES: parent_name → [child_names]
|
||
group_redefines = [] # 组 REDEFINES: [(redef_name, target_name)]
|
||
filler_key_counter = 0
|
||
numeric_idx = 0
|
||
alpha_idx = 0
|
||
record_num = seq_num
|
||
|
||
for f in fields:
|
||
name = f['name']
|
||
|
||
if f.get('is_88'):
|
||
continue
|
||
|
||
if f.get('redefines'):
|
||
parent = f['redefines']
|
||
if f.get('pic'):
|
||
# 标量 REDEFINES(有 PIC,如 WS-AMOUNT-DISP REDEFINES WS-AMOUNT PIC X(9))
|
||
redefines_map.setdefault(parent, []).append(name)
|
||
continue
|
||
else:
|
||
# 组 REDEFINES(无 PIC,如 CUST-ADDR2 REDEFINES CUST-ADDR)
|
||
group_redefines.append((name, parent))
|
||
# 不 continue — 组本身无 PIC 会在下方"组项目跳过"处理
|
||
# 其子字段作为独立字段正常走循环
|
||
|
||
if f.get('is_filler'):
|
||
if name in rec:
|
||
filler_key_counter += 1
|
||
name = f'FILLER_{filler_key_counter + 1}'
|
||
rec[name] = 'x' * (f.get('pic_info', {}).get('length', 0) or 1)
|
||
continue
|
||
|
||
# Pass 0: VALUE 子句初始值优先
|
||
if _apply_value(f, rec):
|
||
continue
|
||
|
||
# 组项目(无 PIC)跳过
|
||
if not f.get('pic'):
|
||
continue
|
||
|
||
pi = f.get('pic_info', {})
|
||
ftype = pi.get('type', 'unknown')
|
||
digits = pi.get('digits', 0)
|
||
decimal = pi.get('decimal', 0)
|
||
length = pi.get('length', 0)
|
||
|
||
if ftype == 'numeric':
|
||
if _is_date_field(name):
|
||
rec[name] = seq_date(record_num)
|
||
else:
|
||
numeric_idx += 1
|
||
rec[name] = _make_numeric_value(numeric_idx, record_num, digits + decimal)
|
||
elif ftype in ('alphanumeric', 'alphabetic'):
|
||
alpha_idx += 1
|
||
rec[name] = _make_alpha_value(alpha_idx, record_num, length or 1)
|
||
elif ftype == 'numeric-edited':
|
||
numeric_idx += 1
|
||
raw = _make_numeric_value(numeric_idx, record_num, digits + decimal)
|
||
rec[name] = raw.rjust(length)
|
||
else:
|
||
alpha_idx += 1
|
||
rec[name] = _make_alpha_value(alpha_idx, record_num, 8)
|
||
|
||
# Pass 2a: 标量 REDEFINES 复制
|
||
for parent_name, child_names in redefines_map.items():
|
||
if parent_name in rec:
|
||
for child_name in child_names:
|
||
rec[child_name] = rec[parent_name]
|
||
|
||
# Pass 2b: 组 REDEFINES 按位置递归复制子字段
|
||
for redef_name, target_name in group_redefines:
|
||
redef_kids = _children_of(redef_name, fields)
|
||
tgt_kids = _children_of(target_name, fields)
|
||
tgt_idx = 0
|
||
for i, rk in enumerate(redef_kids):
|
||
if tgt_idx >= len(tgt_kids):
|
||
break
|
||
if i == len(redef_kids) - 1 and len(redef_kids) < len(tgt_kids):
|
||
# 最后一个 REDEFINES 子字段,且目标更多 → 拼接剩余所有目标值
|
||
parts = [rec.get(tk['name'], '') for tk in tgt_kids[tgt_idx:]]
|
||
rec[rk['name']] = ''.join(parts)
|
||
elif i == len(redef_kids) - 1 and len(redef_kids) > len(tgt_kids):
|
||
# REDEFINES 子字段更多 → 最后一个 REDEFINES 子字段取最后目标值
|
||
rec[rk['name']] = rec.get(tgt_kids[-1]['name'], '')
|
||
else:
|
||
rec[rk['name']] = rec.get(tgt_kids[tgt_idx]['name'], '')
|
||
tgt_idx += 1
|
||
|
||
return rec
|
||
|
||
|
||
# ── 约束应用 ──
|
||
|
||
def _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields):
|
||
"""检查 field_name 当前值是否满足该约束。满足返回 True。"""
|
||
for f in fields:
|
||
if f['name'] == field_name:
|
||
pi = f.get('pic_info', {})
|
||
ftype = pi.get('type', 'unknown')
|
||
val = rec.get(field_name)
|
||
if val is None:
|
||
return False
|
||
if operator == 'not_in':
|
||
cases = value if isinstance(value, list) else []
|
||
return str(val) not in cases
|
||
if ftype == 'numeric':
|
||
try:
|
||
num_val = int(float(str(val)))
|
||
num_target = int(float(str(value)))
|
||
except (ValueError, TypeError):
|
||
return False
|
||
if operator in ('>=', '>', '<', '<=', '=', '<>'):
|
||
if operator == '>=': ok = num_val >= num_target
|
||
elif operator == '>': ok = num_val > num_target
|
||
elif operator == '<': ok = num_val < num_target
|
||
elif operator == '<=': ok = num_val <= num_target
|
||
elif operator == '=': ok = num_val == num_target
|
||
elif operator == '<>': ok = num_val != num_target
|
||
return ok == want_true
|
||
return True
|
||
else:
|
||
s_val = str(val).strip().upper()
|
||
s_target = str(value).strip().upper()
|
||
eq = s_val == s_target
|
||
if operator == '=':
|
||
return eq == want_true
|
||
elif operator == '<>':
|
||
return (not eq) == want_true
|
||
return True
|
||
return False
|
||
|
||
|
||
_ARITH_BOUNDS = {
|
||
'left_big_ops': {'>', '>=', '<>'},
|
||
'left_small_ops': {'<', '<='},
|
||
}
|
||
|
||
def _arith_pic_info(field_name, fields):
|
||
for f in fields:
|
||
if f['name'] == field_name.upper():
|
||
return f.get('pic_info', {})
|
||
return {}
|
||
|
||
def _arith_numeric_pick(field_name, want_big, fields):
|
||
"""为字段选一个大值或小值,返回字符串。"""
|
||
pi = _arith_pic_info(field_name, fields)
|
||
if pi.get('type') != 'numeric':
|
||
return None
|
||
digits = pi.get('digits', 0)
|
||
decimal = pi.get('decimal', 0)
|
||
total = digits + decimal
|
||
max_val = 10 ** total - 1
|
||
if want_big:
|
||
pick = int(max_val * 0.7)
|
||
else:
|
||
pick = 1
|
||
int_part = str(pick // (10 ** decimal)).zfill(digits)
|
||
dec_part = str(pick % (10 ** decimal)).zfill(decimal)
|
||
if decimal == 0:
|
||
return int_part
|
||
return int_part + dec_part
|
||
|
||
def _apply_arith_constraint(rec, field_name, operator, value, want_true, fields):
|
||
"""对算术表达式条件进行字段值 steering。
|
||
|
||
例如 A + B > C (want_true=True):
|
||
- 左值字段(A, B)设大 → 右值字段(C)设小
|
||
例如 A + B <= C (want_true=True):
|
||
- 左值字段设小 → 右值字段设大
|
||
|
||
这是启发式 steering,不是精确求解。
|
||
主要目标是保证分支可达,不保证边界值精确。
|
||
"""
|
||
# 1. 提取左值表达式中的所有字段名(大写)
|
||
tokens = re.findall(r'\b[A-Z][A-Z0-9-]*(?:\([^)]*\))?\b', field_name.upper())
|
||
left_fields = [t for t in tokens if any(f['name'] == t for f in fields)]
|
||
|
||
# 2. 右值是否也为字段
|
||
right_field = value if any(f['name'] == value for f in fields) else None
|
||
|
||
if not left_fields:
|
||
logger.debug(f"算术表达式无法提取字段: {field_name}")
|
||
return
|
||
|
||
# 3. 确定方向:want_true 时左值应大还是小
|
||
if operator in _ARITH_BOUNDS['left_big_ops']:
|
||
left_big = want_true
|
||
elif operator in _ARITH_BOUNDS['left_small_ops']:
|
||
left_big = not want_true
|
||
else:
|
||
left_big = want_true
|
||
|
||
# 4. 设置左值字段
|
||
for lf in left_fields:
|
||
pick = _arith_numeric_pick(lf, left_big, fields)
|
||
if pick is not None:
|
||
rec[lf] = pick
|
||
|
||
# 5. 设置右值字段(如果有)
|
||
if right_field:
|
||
pick = _arith_numeric_pick(right_field, not left_big, fields)
|
||
if pick is not None:
|
||
rec[right_field] = pick
|
||
|
||
|
||
def apply_constraint(rec, field_name, operator, value, want_true, fields, assignments=None, path_assign=None):
|
||
# 标准化字段名:去除括号内空格(WS-CELL ( 1, 1 ) → WS-CELL(1,1))
|
||
field_name = re.sub(r'\s*([(),])\s*', r'\1', field_name)
|
||
# 变量下标解析:WS-FIXED-VALUE(WS-IDX) → WS-FIXED-VALUE(1)
|
||
vm = re.match(r'^(\w[\w-]*)\((\w[\w-]*)\)$', field_name)
|
||
if vm:
|
||
base_var, subscript_var = vm.groups()
|
||
if subscript_var in rec:
|
||
try:
|
||
resolved_name = f'{base_var}({int(rec[subscript_var])})'
|
||
if any(f['name'] == resolved_name for f in fields):
|
||
apply_constraint(rec, resolved_name, operator, value, want_true, fields, assignments, path_assign)
|
||
return
|
||
except (ValueError, TypeError):
|
||
pass
|
||
# 下标传播:无下标约束 → 应用到所有下标变体
|
||
base = _basename(field_name)
|
||
subscripted = [f for f in fields if f['name'] != base and _basename(f['name']) == base]
|
||
if subscripted and field_name == base:
|
||
for sf in subscripted:
|
||
apply_constraint(rec, sf['name'], operator, value, want_true, fields, assignments, path_assign)
|
||
return
|
||
|
||
# REDEFINES 字段的约束重定向到父字段(共享存储)
|
||
for f in fields:
|
||
if f['name'] == field_name:
|
||
if f.get('is_filler'):
|
||
return
|
||
if f.get('redefines'):
|
||
parent_name = f['redefines']
|
||
logger.debug(f"REDEFINES 约束重定向: {field_name} → {parent_name}")
|
||
apply_constraint(rec, parent_name, operator, value, want_true, fields, assignments, path_assign)
|
||
return
|
||
break
|
||
if assignments:
|
||
root_var, chain = trace_to_root(field_name, assignments, fields, path_assign)
|
||
if root_var != field_name:
|
||
new_field_name, new_op, new_val = invert_through_chain(root_var, chain, operator, value)
|
||
if any(f['name'] == new_field_name for f in fields):
|
||
field_name, operator, value = new_field_name, new_op, new_val
|
||
|
||
# 如果当前值已满足该约束,跳过覆盖(保持先前约束的一致性)
|
||
if _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields):
|
||
return
|
||
|
||
if operator == 'not_in':
|
||
for f in fields:
|
||
if f['name'] == field_name:
|
||
pi = f.get('pic_info', {})
|
||
cases = value if isinstance(value, list) else []
|
||
ftype = pi.get('type', 'unknown')
|
||
if ftype in ('alphanumeric', 'alphabetic'):
|
||
for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
|
||
if c not in cases:
|
||
rec[field_name] = c.ljust(pi.get('length', 1), c)
|
||
return
|
||
else:
|
||
for n in range(1, 100):
|
||
if str(n) not in cases:
|
||
rec[field_name] = str(n).zfill(pi.get('digits', 0) + pi.get('decimal', 0))
|
||
return
|
||
return
|
||
# 字段间比较(值侧也是字段名)
|
||
if any(f['name'] == value for f in fields):
|
||
if re.search(r'[+\-*/]', field_name):
|
||
_apply_arith_constraint(rec, field_name, operator, value, want_true, fields)
|
||
else:
|
||
logger.debug(f"字段间比较约束跳过:{field_name} {operator} {value}")
|
||
return
|
||
for f in fields:
|
||
if f['name'] == field_name:
|
||
pi = f.get('pic_info', {})
|
||
val = satisfying_value(pi, operator, value, want_true)
|
||
rec[field_name] = val
|
||
return
|
||
|
||
|
||
# ── 记录生成入口 ──
|
||
|
||
def sync_redefined_fields(rec, fields):
|
||
"""赋值/约束后同步 REDEFINES 字段:父字段的值拷贝到所有 REDEFINES 子字段。"""
|
||
redefines_map = {}
|
||
group_redefines = []
|
||
for f in fields:
|
||
if f.get('is_88') or f.get('is_filler'):
|
||
continue
|
||
if f.get('redefines') and f.get('pic'):
|
||
redefines_map.setdefault(f['redefines'], []).append(f['name'])
|
||
elif f.get('redefines') and not f.get('pic'):
|
||
group_redefines.append((f['name'], f['redefines']))
|
||
for parent_name, child_names in redefines_map.items():
|
||
if parent_name in rec:
|
||
for child_name in child_names:
|
||
rec[child_name] = rec[parent_name]
|
||
for redef_name, target_name in group_redefines:
|
||
redef_kids = _children_of(redef_name, fields)
|
||
tgt_kids = _children_of(target_name, fields)
|
||
tgt_idx = 0
|
||
for i, rk in enumerate(redef_kids):
|
||
if tgt_idx >= len(tgt_kids):
|
||
break
|
||
if i == len(redef_kids) - 1 and len(redef_kids) < len(tgt_kids):
|
||
parts = [rec.get(tk['name'], '') for tk in tgt_kids[tgt_idx:]]
|
||
rec[rk['name']] = ''.join(parts)
|
||
elif i == len(redef_kids) - 1 and len(redef_kids) > len(tgt_kids):
|
||
rec[rk['name']] = rec.get(tgt_kids[-1]['name'], '')
|
||
else:
|
||
rec[rk['name']] = rec.get(tgt_kids[tgt_idx]['name'], '')
|
||
tgt_idx += 1
|
||
|
||
|
||
def apply_occurs_depending(rec, fields):
|
||
"""根据 OCCURS DEPENDING ON 变量的当前值,清零超范围的下标字段。"""
|
||
for f in fields:
|
||
dep_var = f.get('occurs_depending')
|
||
if not dep_var:
|
||
continue
|
||
name = f['name']
|
||
m = re.search(r'\((\d+)\)$', name)
|
||
if not m:
|
||
continue
|
||
sub = int(m.group(1))
|
||
max_val = int(rec.get(dep_var, 0))
|
||
if sub <= max_val:
|
||
continue
|
||
pi = f.get('pic_info', {})
|
||
ftype = pi.get('type', 'unknown')
|
||
length = pi.get('length', 0) or 1
|
||
if ftype == 'numeric':
|
||
rec[name] = '0' * (pi.get('digits', 0) + pi.get('decimal', 0))
|
||
elif ftype in ('alphanumeric', 'alphabetic'):
|
||
rec[name] = ' ' * length
|
||
else:
|
||
rec[name] = '0' * length
|
||
|
||
|
||
def _non_match_for(cond_leaf, fields):
|
||
if not fields or not cond_leaf:
|
||
return None
|
||
base = re.sub(r'\s*\(.*?\)\s*$', '', cond_leaf.field)
|
||
for f in fields:
|
||
if re.sub(r'\s*\(.*?\)\s*$', '', f['name']) == base:
|
||
pic = f.get('pic_info', {})
|
||
if pic.get('type') == 'numeric':
|
||
return '0'
|
||
return ' '
|
||
return None
|
||
|
||
|
||
def _enum_search_paths(node, fields):
|
||
# 从条件字段名推断 OCCURS 数;如 WS-CODE-VAL(WS-IDX) → 查 WS-CODE-VAL(j) 最大 j
|
||
occurs_count = 1
|
||
if node.when_list and node.cond_trees and node.cond_trees[0]:
|
||
ct = node.cond_trees[0]
|
||
if isinstance(ct, CondLeaf):
|
||
base = re.sub(r'\s*\(.*?\)\s*$', '', ct.field)
|
||
for f in fields:
|
||
m = re.match(rf'^{re.escape(base)}\((\d+)\)$', f['name'])
|
||
if m:
|
||
occurs_count = max(occurs_count, int(m.group(1)))
|
||
if occurs_count <= 1:
|
||
# 再查父组名下各字段的后缀
|
||
parent = node.table_name
|
||
for f in fields:
|
||
m = re.match(rf'^{re.escape(parent)}\((\d+)\)$', f['name'])
|
||
if m:
|
||
occurs_count = max(occurs_count, int(m.group(1)))
|
||
|
||
paths = []
|
||
for i, (cond_text, body_seq) in enumerate(node.when_list):
|
||
cond_tree = node.cond_trees[i] if i < len(node.cond_trees) else None
|
||
sub = _cap_paths(enum_paths(body_seq, fields))
|
||
if not sub:
|
||
sub = [([], {})]
|
||
|
||
extra_assign = {}
|
||
if cond_tree and isinstance(cond_tree, CondLeaf):
|
||
base = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field)
|
||
matching_val = cond_tree.value
|
||
elem_key = f'{base}({i + 1})'
|
||
extra_assign[elem_key] = [{'type': 'move_literal', 'literal': matching_val}]
|
||
non_match = _non_match_for(cond_tree, fields) or ' '
|
||
for j in range(i):
|
||
prev_key = f'{base}({j + 1})'
|
||
extra_assign[prev_key] = [{'type': 'move_literal', 'literal': non_match}]
|
||
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
merged_assign = dict(extra_assign)
|
||
for k, v in sp_assign.items():
|
||
merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v])
|
||
paths.append((sp_cons, merged_assign))
|
||
|
||
if node.has_at_end:
|
||
sub = _cap_paths(enum_paths(node.at_end_seq, fields))
|
||
for sp_cons, sp_assign in (sub or [([], {})]):
|
||
extra_assign = {}
|
||
non_match = ' '
|
||
if node.when_list:
|
||
ct = node.cond_trees[0]
|
||
if ct and isinstance(ct, CondLeaf):
|
||
non_match = _non_match_for(ct, fields) or ' '
|
||
base = re.sub(r'\s*\(.*?\)\s*$', '', ct.field)
|
||
for j in range(max(occurs_count, 1)):
|
||
extra_assign[f'{base}({j + 1})'] = [{'type': 'move_literal', 'literal': non_match}]
|
||
merged_assign = dict(extra_assign)
|
||
for k, v in sp_assign.items():
|
||
merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v])
|
||
paths.append((sp_cons, merged_assign))
|
||
|
||
return paths
|
||
|
||
|
||
def generate_records(branch_paths_with_assigns, data_fields, base_assignments=None, file_sec=None):
|
||
"""生成测试数据记录。
|
||
branch_paths_with_assigns: list of (constraints, path_assignments).
|
||
base_assignments: 全局 assignments dict (用于 trace_to_root).
|
||
返回: (records, kept_path_cons) — kept_path_cons 是与 records 一一对应的约束。
|
||
"""
|
||
records = []
|
||
kept_path_cons = []
|
||
if branch_paths_with_assigns:
|
||
for seq, (path_cons, path_assign) in enumerate(branch_paths_with_assigns, start=1):
|
||
path_cons = _filter_stop(path_cons)
|
||
rec = make_base_record(seq, data_fields)
|
||
# Pass A: 先传播赋值(MOVE/COMPUTE/READ INTO 等),模拟到决策点前的程序状态
|
||
if isinstance(path_assign, dict):
|
||
propagate_assignments(rec, path_assign, data_fields, file_sec=file_sec)
|
||
# Pass A.5: 检查约束是否经过链追溯到字面量截断(不可能路径)
|
||
skip_impossible = False
|
||
if base_assignments and isinstance(path_assign, dict):
|
||
for c in path_cons:
|
||
if len(c) == 4 and not skip_impossible:
|
||
field, op, val, want = c
|
||
root_var, chain = trace_to_root(field, base_assignments, data_fields, path_assign)
|
||
if root_var != field:
|
||
new_fn, new_op, new_val = invert_through_chain(root_var, chain, op, val)
|
||
if any(f['name'] == new_fn for f in data_fields):
|
||
asgn_val = path_assign.get(root_var)
|
||
if asgn_val is not None:
|
||
asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
|
||
if asgn_list and asgn_list[-1]['type'] == 'move_literal' and root_var in rec:
|
||
if not _check_constraint_satisfied(rec, root_var, new_op, new_val, want, data_fields):
|
||
skip_impossible = True
|
||
break
|
||
if skip_impossible:
|
||
continue
|
||
# Pass B: 约束覆盖(确保决策条件满足,覆盖 MOVE 带来的值)
|
||
for c in path_cons:
|
||
if len(c) == 4:
|
||
field, op, val, want = c
|
||
apply_constraint(rec, field, op, val, want, data_fields, base_assignments, path_assign)
|
||
# Pass B.5: 前向再传播变量间MOVE,保持约束修改后的链一致性
|
||
if isinstance(path_assign, dict):
|
||
forward = {}
|
||
for tgt, asgn_val in path_assign.items():
|
||
asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
|
||
filtered = [a for a in asgn_list if a['type'] == 'move' and a.get('source_vars')]
|
||
if filtered:
|
||
forward[tgt] = filtered
|
||
if forward:
|
||
propagate_assignments(rec, forward, data_fields, file_sec=file_sec)
|
||
# Pass C: 同步 REDEFINES(确保共享存储一致)
|
||
sync_redefined_fields(rec, data_fields)
|
||
# Pass D: OCCURS DEPENDING ON — 清零超范围的下标字段
|
||
apply_occurs_depending(rec, data_fields)
|
||
|
||
records.append(rec)
|
||
kept_path_cons.append(path_cons)
|
||
if not records:
|
||
rec = make_base_record(1, data_fields)
|
||
if base_assignments:
|
||
propagate_assignments(rec, base_assignments, data_fields, file_sec=file_sec)
|
||
records.append(rec)
|
||
kept_path_cons.append([])
|
||
return records, kept_path_cons
|