7fb9304212
- cond.py: SQLCODE/SQLSTATE handling, alphanumeric >/< boundary fix - output.py: termination tracking, db_input support, _is_field_assigned filter - coverage.py: mark_from_gcov, THRU support, KeyError protection - gcov.py: new file (dependency for coverage.py) - grammar.lark: multi-segment PIC support - read.py: SQL INCLUDE resolution, DECLARE TABLE parsing, * comment fix - core.py: SQL parsing, blocked_names, keyword list - design.py: multi-sentinel, THRU ranges, PERFORM VARYING last iteration - __init__.py: local main() + v3 API functions, guarded imports All 6 ZAN programs verified passing through v3 pipeline
1307 lines
57 KiB
Python
1307 lines
57 KiB
Python
"""设计层:路径枚举 + 值生成 + 约束应用"""
|
||
|
||
import re
|
||
import logging
|
||
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo
|
||
from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, mcdc_sets, satisfying_value
|
||
from .core import trace_to_root, invert_through_chain, propagate_assignments, _basename
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Sentinel pseudo-constraints. They share the 4-tuple shape of real constraints
# so they can travel inside a path's constraint list, but they are recognised
# by identity (`is`) in `_is_sentinel`, never by value.
# Emitted by enum_paths for an ExitNode whose exit_type is 'PERFORM'.
_STOP_EXIT_PERFORM = ('__STOP_EXIT_PERFORM__', '', None, True)
# Emitted by enum_paths for every other ExitNode and for GoTo nodes.
_STOP_SENTINEL = ('__STOP__', '', None, True)
# Emitted by enum_paths for a CallNode whose program is in _ABEND_PROGRAMS.
_ABEND_SENTINEL = ('__ABEND__', '', None, True)
_SENTINELS_ALL = {_STOP_EXIT_PERFORM, _STOP_SENTINEL, _ABEND_SENTINEL}
# Upper-cased program names whose CALL marks a path as abnormally terminated.
_ABEND_PROGRAMS = {'ABENDPGM'}
|
||
|
||
def extend_abend_programs(names: list[str]):
    """Register extra program names whose CALL is treated as an abend.

    Each name is upper-cased before being added to ``_ABEND_PROGRAMS``.
    """
    for program in names:
        _ABEND_PROGRAMS.add(program.upper())
|
||
# Upper bound on the number of paths kept by _cap_paths / _cap_paths_fair.
_MAX_PATHS = 10000
|
||
|
||
|
||
def _is_sentinel(c):
    """Return True when ``c`` is (by identity) one of the three sentinel markers."""
    markers = (_STOP_EXIT_PERFORM, _STOP_SENTINEL, _ABEND_SENTINEL)
    return any(c is marker for marker in markers)
|
||
|
||
|
||
def _hashable_cons(cons):
|
||
"""将约束列表转为可哈希形式(列表值转tuple)用于签名去重。"""
|
||
result = []
|
||
for c in cons:
|
||
if len(c) == 4:
|
||
field, op, val, want = c
|
||
if isinstance(val, list):
|
||
val = tuple(val)
|
||
result.append((field, op, val, want))
|
||
else:
|
||
result.append(c)
|
||
return result
|
||
|
||
|
||
def _filter_stop(cons):
    """Legacy helper: drop every sentinel marker from ``cons``.

    Kept for older test code.
    """
    kept = []
    for entry in cons:
        if _is_sentinel(entry):
            continue
        kept.append(entry)
    return kept
|
||
|
||
|
||
def get_term_type(cons):
    """Split sentinels out of ``cons`` and report how the path terminates.

    Returns ``(filtered_cons, term_type)`` where ``term_type`` is ``'abend'``
    if the abend sentinel is present and ``'normal'`` otherwise. All sentinel
    markers are removed from ``filtered_cons``.
    """
    kept = []
    saw_abend = False
    for entry in cons:
        if entry is _ABEND_SENTINEL:
            saw_abend = True
            continue
        if not _is_sentinel(entry):
            kept.append(entry)
    return kept, ('abend' if saw_abend else 'normal')
|
||
|
||
|
||
def _cap_paths(paths):
    """Return ``paths`` unchanged, or its first ``_MAX_PATHS`` entries if longer."""
    return paths[:_MAX_PATHS] if len(paths) > _MAX_PATHS else paths
|
||
|
||
|
||
def _cap_paths_fair(new_active, child_paths):
    """Two-phase fair truncation of ``new_active`` to at most ``_MAX_PATHS``.

    Args:
        new_active: list of ``(constraints, assignments)`` paths produced by
            combining predecessor paths with ``child_paths``.
        child_paths: the child paths used for the combination; only its
            length ``k`` is used, as the assumed group size per predecessor.

    Returns:
        ``new_active`` itself when it already fits; otherwise a new list of
        at most ``_MAX_PATHS`` entries made of:

        1. every path that already carries a sentinel (these did not take
           part in the combination and are kept as-is),
        2. one combined path per predecessor, rotating the child index so
           different children are represented (phase 1),
        3. remaining combined paths in order until the quota is used up
           (phase 2).

    Fix over the previous version: sentinel-terminated paths were separated
    out but never added to the result, so they were silently dropped
    whenever the cap triggered.
    """
    if len(new_active) <= _MAX_PATHS:
        return new_active
    k = len(child_paths)
    if k <= 1:
        return new_active[:_MAX_PATHS]

    # Separate sentinel paths (kept directly) from combined paths.
    stop_paths, combined = [], []
    for entry in new_active:
        bucket = stop_paths if any(_is_sentinel(c) for c in entry[0]) else combined
        bucket.append(entry)

    result = stop_paths[:_MAX_PATHS]
    budget = _MAX_PATHS - len(result)
    if budget <= 0:
        return result

    n_pred = len(combined) // k
    if n_pred <= 1:
        result.extend(combined[:budget])
        return result

    # Phase 1: keep at least one child path per predecessor, rotating the
    # child index so the survivors are spread over different children.
    selected = set()
    for p_idx in range(min(n_pred, budget)):
        idx = p_idx * k + p_idx % k
        selected.add(idx)
        result.append(combined[idx])

    # Phase 2: fill whatever quota is left with the remaining combinations.
    budget = _MAX_PATHS - len(result)
    for idx, entry in enumerate(combined):
        if budget <= 0:
            break
        if idx not in selected:
            result.append(entry)
            budget -= 1
    return result
|
||
|
||
|
||
# ── 路径枚举 ──
|
||
|
||
def enum_paths(node, fields):
    """Enumerate paths through ``node``; each path is ``(constraints, assignments)``.

    Dispatches on the node type (Assign, BrSeq, BrIf, BrEval, BrSearch,
    BrPerform, CallNode, ExitNode, GoTo) and recurses into child sequences.
    Constraints are 4-tuples ``(field, op, value, want_true)``; sentinel
    markers may also appear in the constraint list to signal termination.

    Returns list[tuple[list[tuple], dict]].
    """
    # A plain assignment contributes no constraint, only target -> [source info].
    if isinstance(node, Assign):
        return [([], {node.target: [node.source_info]})]

    if isinstance(node, BrSeq):
        if not node.children:
            return [([], {})]
        paths = [([], {})]
        for child in node.children:
            child_paths = _cap_paths(enum_paths(child, fields))
            if not child_paths:
                continue
            new_active = []
            # Signatures of merged constraint lists already emitted for this child.
            covered_sigs = set()
            for p_cons, p_assign in paths:
                # A path that already hit a sentinel is carried forward untouched.
                if any(_is_sentinel(c) for c in p_cons):
                    new_active.append((p_cons, p_assign))
                    continue
                for cp_cons, cp_assign in child_paths:
                    merged_cons = p_cons + list(cp_cons)
                    sig = frozenset(_hashable_cons(merged_cons))
                    if sig not in covered_sigs:
                        covered_sigs.add(sig)
                        # Merge assignment dicts, concatenating per-target lists.
                        merged = {}
                        for d in (p_assign, cp_assign):
                            for k, v in d.items():
                                merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                        new_active.append((merged_cons, merged))
            if not new_active:
                # Nothing survived: keep the first non-sentinel predecessor path.
                for pc, pa in paths:
                    if not any(_is_sentinel(c) for c in pc):
                        new_active.append((pc, dict(pa)))
                        break
            paths = new_active
        return paths

    elif isinstance(node, BrIf):
        parsed = parse_single_condition(node.condition, fields)
        # Simple "field op value" condition: one true branch, one false branch.
        if parsed and is_field(parsed[0], fields):
            field, op, val = parsed
            paths = []
            true_sub = _cap_paths(enum_paths(node.true_seq, fields))
            for sp_cons, sp_assign in (true_sub or [([], {})]):
                paths.append(([(field, op, val, True)] + sp_cons, sp_assign))
            false_sub = _cap_paths(enum_paths(node.false_seq, fields))
            for fp_cons, fp_assign in (false_sub or [([], {})]):
                paths.append(([(field, op, val, False)] + fp_cons, fp_assign))
            return paths
        # CondNot wrapping a single leaf (e.g., IF NOT WS-AMOUNT > 1000)
        if node.cond_tree and isinstance(node.cond_tree, CondNot):
            child = node.cond_tree.child
            if isinstance(child, CondLeaf) and is_field(child.field, fields):
                paths = []
                # The NOT inverts the leaf: true branch wants the leaf false.
                true_sub = _cap_paths(enum_paths(node.true_seq, fields))
                for sp_cons, sp_assign in (true_sub or [([], {})]):
                    paths.append(([(child.field, child.op, child.value, False)] + sp_cons, sp_assign))
                false_sub = _cap_paths(enum_paths(node.false_seq, fields))
                for fp_cons, fp_assign in (false_sub or [([], {})]):
                    paths.append(([(child.field, child.op, child.value, True)] + fp_cons, fp_assign))
                return paths
        if node.cond_tree:
            leaves = collect_leaves(node.cond_tree)
            if leaves and all(is_field(l.field, fields) for l in leaves):
                sets = mcdc_sets(node.cond_tree, fields)
                if sets:
                    paths = []
                    # Each set carries its constraints and the decision outcome.
                    for constraints, decision in sets:
                        body = _cap_paths(enum_paths(
                            node.true_seq if decision else node.false_seq, fields
                        ))
                        for sp_cons, sp_assign in (body or [([], {})]):
                            paths.append((constraints + sp_cons, sp_assign))
                    return paths
                # CondLeaf fallback: a single leaf (including condition trees
                # produced by resolving an 88-level name); MC/DC does not apply.
                # NOTE(review): the nesting level of this block was reconstructed
                # from a copy that lost its indentation; confirm against the repo.
                if len(leaves) == 1:
                    leaf = leaves[0]
                    paths = []
                    true_sub = _cap_paths(enum_paths(node.true_seq, fields))
                    for sp_cons, sp_assign in (true_sub or [([], {})]):
                        paths.append(([(leaf.field, leaf.op, leaf.value, True)] + sp_cons, sp_assign))
                    false_sub = _cap_paths(enum_paths(node.false_seq, fields))
                    for fp_cons, fp_assign in (false_sub or [([], {})]):
                        paths.append(([(leaf.field, leaf.op, leaf.value, False)] + fp_cons, fp_assign))
                    return paths
        # Fallback: parsed condition but non-field (e.g. arithmetic expr)
        if parsed:
            field, op, val = parsed
            paths = []
            true_sub = enum_paths(node.true_seq, fields)
            for sp_cons, sp_assign in (true_sub or [([], {})]):
                paths.append(([(field, op, val, True)] + sp_cons, sp_assign))
            false_sub = enum_paths(node.false_seq, fields)
            for fp_cons, fp_assign in (false_sub or [([], {})]):
                paths.append(([(field, op, val, False)] + fp_cons, fp_assign))
            return paths
        return [([], {})]

    elif isinstance(node, BrEval):
        # Multi-subject EVALUATE: one '=' constraint per subject per WHEN.
        if node.subjects:
            paths = []
            # Negations of all earlier WHEN values, prepended to later branches.
            prior_false_cons = []
            for values, seq in node.when_list:
                sub = _cap_paths(enum_paths(seq, fields))
                for sp_cons, sp_assign in (sub or [([], {})]):
                    when_cons = [(node.subjects[i], '=', values[i], True)
                                 for i in range(len(node.subjects))]
                    constraints = list(prior_false_cons) + when_cons + sp_cons
                    paths.append((constraints, sp_assign))
                for i in range(len(node.subjects)):
                    prior_false_cons.append((node.subjects[i], '=', values[i], False))
            if node.has_other:
                sub = _cap_paths(enum_paths(node.other_seq, fields))
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append((list(prior_false_cons) + sp_cons, sp_assign))
            return paths
        # EVALUATE TRUE: each WHEN value is itself a condition to parse.
        if node.subject == 'TRUE':
            paths = []
            prior_false_sets = []  # list[list[Constraint]]
            for value, seq in node.when_list:
                cond = parse_compound_condition(value, fields)
                if cond and isinstance(cond, CondLeaf) and is_field(cond.field, fields):
                    sub = _cap_paths(enum_paths(seq, fields))
                    for sp_cons, sp_assign in (sub or [([], {})]):
                        constraints = [c for pf in prior_false_sets for c in pf]
                        constraints.append((cond.field, cond.op, cond.value, True))
                        paths.append((constraints + sp_cons, sp_assign))
                    prior_false_sets.append([(cond.field, cond.op, cond.value, False)])
                elif cond and isinstance(cond, CondNot) and isinstance(cond.child, CondLeaf) and is_field(cond.child.field, fields):
                    # NOT leaf: the WHEN fires when the leaf is false.
                    leaf = cond.child
                    sub = _cap_paths(enum_paths(seq, fields))
                    for sp_cons, sp_assign in (sub or [([], {})]):
                        constraints = [c for pf in prior_false_sets for c in pf]
                        constraints.append((leaf.field, leaf.op, leaf.value, False))
                        paths.append((constraints + sp_cons, sp_assign))
                    prior_false_sets.append([(leaf.field, leaf.op, leaf.value, True)])
                elif cond:
                    leaves = collect_leaves(cond)
                    if leaves and all(is_field(l.field, fields) for l in leaves):
                        sets = mcdc_sets(cond, fields)
                        if sets:
                            sub = _cap_paths(enum_paths(seq, fields))
                            # Constraint sets that make this WHEN evaluate false.
                            new_false_sets = []
                            for cs, decision in sets:
                                if decision:
                                    if not prior_false_sets:
                                        for sp_cons, sp_assign in (sub or [([], {})]):
                                            paths.append((list(cs) + sp_cons, sp_assign))
                                    else:
                                        for pf_set in prior_false_sets:
                                            for sp_cons, sp_assign in (sub or [([], {})]):
                                                paths.append((list(pf_set) + list(cs) + sp_cons, sp_assign))
                                else:
                                    new_false_sets.append(cs)
                            if not new_false_sets:
                                prior_false_sets = []
                                break
                            # Cross product of earlier false sets with the new ones.
                            combined = []
                            for pf_set in prior_false_sets:
                                for nf_set in new_false_sets:
                                    combined.append(list(pf_set) + list(nf_set))
                            prior_false_sets = combined
                        else:
                            prior_false_sets = []
                            break
                    else:
                        prior_false_sets = []
                        break
                else:
                    # Unparseable WHEN: stop processing further WHEN clauses.
                    prior_false_sets = []
                    break
            if node.has_other:
                sub = _cap_paths(enum_paths(node.other_seq, fields))
                for sp_cons, sp_assign in (sub or [([], {})]):
                    constraints = [c for pf in prior_false_sets for c in pf]
                    paths.append((constraints + sp_cons, sp_assign))
            return paths
        # Single-subject EVALUATE: subject must be a known field.
        if not is_field(node.subject, fields):
            return [([], {})]
        paths = []
        for value, seq in node.when_list:
            sub = _cap_paths(enum_paths(seq, fields))
            # "<digits> THRU <digits>" range value.
            thru_m = re.match(r'^(\d+)\s+THRU\s+(\d+)$', str(value), re.IGNORECASE)
            if thru_m and not node.subjects:
                low, high = thru_m.group(1), thru_m.group(2)
                for sp_cons, sp_assign in (sub or [([], {})]):
                    # Two paths with the same pair of bounds in opposite order.
                    paths.append(([(node.subject, '>=', low, True), (node.subject, '<=', high, True)] + sp_cons, sp_assign))
                    paths.append(([(node.subject, '<=', high, True), (node.subject, '>=', low, True)] + sp_cons, sp_assign))
            else:
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append(([(node.subject, '=', value, True)] + sp_cons, sp_assign))
        if node.has_other:
            sub = _cap_paths(enum_paths(node.other_seq, fields))
            thru_found = False
            for v, _ in node.when_list:
                thru_m = re.match(r'^(\d+)\s+THRU\s+(\d+)$', str(v), re.IGNORECASE)
                if thru_m and not node.subjects:
                    thru_found = True
                    low_int, high_int = int(thru_m.group(1)), int(thru_m.group(2))
                    for sp_cons, sp_assign in (sub or [([], {})]):
                        # OTHER path just below the range (clamped at 0).
                        a_low = dict(sp_assign)
                        a_low[node.subject] = [{'type': 'move_literal', 'literal': str(max(0, low_int - 1))}]
                        low_cons = [(node.subject, 'not_in', [thru_m.group(1), thru_m.group(2)], True)]
                        paths.append((low_cons + sp_cons, a_low))
                        # OTHER path just above the range.
                        a_high = dict(sp_assign)
                        a_high[node.subject] = [{'type': 'move_literal', 'literal': str(high_int + 1)}]
                        high_cons = [(node.subject, 'not_in', [thru_m.group(1), thru_m.group(2)], True)]
                        paths.append((high_cons + sp_cons, a_high))
            if not thru_found:
                case_vals = [v for v, _ in node.when_list]
                for sp_cons, sp_assign in (sub or [([], {})]):
                    paths.append(([(node.subject, 'not_in', case_vals, True)] + sp_cons, sp_assign))
        return paths

    elif isinstance(node, BrSearch):
        # _enum_search_paths is defined outside this block.
        return _enum_search_paths(node, fields)

    elif isinstance(node, BrPerform):
        if node.perf_type in ('para', 'thru'):
            if node.body_seq:
                paths = enum_paths(node.body_seq, fields)
                # EXIT PERFORM only applies inside the PERFORM body; strip it so
                # it does not affect later BrSeq combination.
                paths = [([c for c in cons if c is not _STOP_EXIT_PERFORM], a) for cons, a in paths]
                return paths
            return [([], {})]
        elif node.perf_type in ('until', 'para_until', 'varying', 'para_varying'):
            # Try a single condition first (existing logic).
            parsed = parse_single_condition(node.condition, fields)
            if parsed and is_field(parsed[0], fields):
                field, op, val = parsed
                paths = []
                # Loop body runs while the UNTIL condition is false.
                false_sub = _cap_paths(enum_paths(node.body_seq, fields))
                false_sub = [([c for c in cons if c is not _STOP_EXIT_PERFORM], a) for cons, a in false_sub]
                for sp_cons, sp_assign in (false_sub or [([], {})]):
                    # Snapshot of the body's assignments before the FROM merge.
                    body_assign = dict(sp_assign)
                    # PERFORM VARYING: add the FROM value as a MOVE assignment
                    # on the enter-loop path.
                    if node.varying_from and node.varying_var:
                        is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False
                        from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from}
                        from_assign = {node.varying_var: [from_asgn]}
                        merged = {}
                        for d in (from_assign, sp_assign):
                            for k, v in d.items():
                                merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                        sp_assign = merged
                    paths.append(([(field, op, val, False)] + sp_cons, sp_assign))
                    # PERFORM VARYING: last-iteration path (index = MAX).
                    if node.varying_from and node.varying_var and op in ('>', '>=', '<', '<=', '='):
                        try:
                            if op == '>':
                                max_val = int(val)
                            elif op == '>=':
                                max_val = int(val) - 1
                            elif op == '<':
                                max_val = int(val)
                            elif op == '<=':
                                max_val = int(val) + 1
                            elif op == '=':
                                # Direction depends on the sign of the BY step.
                                by_str = str(node.varying_by or '1')
                                if by_str.lstrip('-').isdigit() and int(by_str) < 0:
                                    max_val = int(val) + 1
                                else:
                                    max_val = int(val) - 1
                            from_val = int(node.varying_from)
                            by_str = str(node.varying_by or '1')
                            # Only emit when the last value is reachable from FROM.
                            if by_str.lstrip('-').isdigit() and int(by_str) < 0:
                                ok = max_val <= from_val
                            else:
                                ok = max_val >= from_val
                            if ok:
                                max_asgn = {'type': 'move_literal', 'literal': str(max_val)}
                                max_assign = {node.varying_var: [max_asgn]}
                                merged_max = {}
                                for d in (max_assign, body_assign):
                                    for k, v in d.items():
                                        merged_max.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                                the_cons = [(field, op, val, False)]
                                paths.append((the_cons + sp_cons, merged_max))
                        except (ValueError, TypeError):
                            # Non-integer bound or FROM value: skip this extra path.
                            pass
                # Loop skipped: UNTIL condition already true.
                paths.append(([(field, op, val, True)], {}))
                return paths
            # Try a compound condition (AND/OR).
            cond_tree = parse_compound_condition(node.condition, fields)
            if cond_tree:
                leaves = collect_leaves(cond_tree)
                if leaves and all(is_field(l.field, fields) for l in leaves):
                    sets = mcdc_sets(cond_tree, fields)
                    if sets:
                        paths = []
                        false_sub = _cap_paths(enum_paths(node.body_seq, fields))
                        false_sub = [([c for c in cons if c is not _STOP_EXIT_PERFORM], a) for cons, a in false_sub]
                        for sp_cons, sp_assign in (false_sub or [([], {})]):
                            # PERFORM VARYING: add the FROM value as a MOVE
                            # assignment on the enter-loop path.
                            if node.varying_from and node.varying_var:
                                is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False
                                from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from}
                                from_assign = {node.varying_var: [from_asgn]}
                                merged = {}
                                for d in (from_assign, sp_assign):
                                    for k, v in d.items():
                                        merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
                                sp_assign = merged
                            # Body paths pair with the sets whose decision is false.
                            for constraints, decision in sets:
                                if not decision:
                                    paths.append((list(constraints) + sp_cons, sp_assign))
                        # Skip-loop paths: sets whose decision is true.
                        for constraints, decision in sets:
                            if decision:
                                paths.append((list(constraints), {}))
                        if paths:
                            return paths
        return [([], {})]

    elif isinstance(node, CallNode):
        if node.program_name in _ABEND_PROGRAMS:
            return [([_ABEND_SENTINEL], {})]
        return [([], {})]

    elif isinstance(node, ExitNode):
        if node.exit_type == 'PERFORM':
            return [([_STOP_EXIT_PERFORM], {})]
        return [([_STOP_SENTINEL], {})]

    elif isinstance(node, GoTo):
        # Paths through the GO TO target body, each prefixed with the stop sentinel.
        paths = enum_paths(node.body_seq, fields)
        return [([_STOP_SENTINEL] + c, a) for c, a in paths]

    # Any other node type contributes a single empty path.
    return [([], {})]
|
||
|
||
|
||
# ── 值生成 ──
|
||
|
||
def seq_numeric(seq_num: int, total_digits: int) -> str:
    """Return ``seq_num`` wrapped into ``total_digits`` digits, zero-padded.

    The number is taken modulo ``10 ** total_digits``; a result of zero is
    replaced by the largest representable value (all nines).
    """
    modulus = 10 ** total_digits
    wrapped = seq_num % modulus or modulus - 1
    return str(wrapped).zfill(total_digits)
|
||
|
||
|
||
def seq_alpha(seq_num: int, length: int) -> str:
    """Return ``length`` copies of the upper-case letter for ``seq_num``.

    Sequence number 1 maps to ``A``, 26 to ``Z``, 27 wraps back to ``A``.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    return alphabet[(seq_num - 1) % 26] * length
|
||
|
||
|
||
def seq_date(seq_num: int) -> str:
    """Return the date ``seq_num - 1`` days after 2000-01-01 as ``YYYYMMDD``."""
    from datetime import date, timedelta

    offset = timedelta(days=seq_num - 1)
    return (date(2000, 1, 1) + offset).strftime('%Y%m%d')
|
||
|
||
|
||
def _is_date_field(name: str) -> bool:
|
||
patterns = [r'DATE', r'YYMMDD', r'YYYYMM']
|
||
for p in patterns:
|
||
if re.search(p, name.upper()):
|
||
return True
|
||
return False
|
||
|
||
|
||
_SPECIAL_VALUES = {
|
||
'ZERO': '0', 'ZEROS': '0', 'ZEROES': '0',
|
||
'SPACE': ' ', 'SPACES': ' ',
|
||
'HIGH-VALUE': '\xff', 'HIGH-VALUES': '\xff',
|
||
'LOW-VALUE': '\x00', 'LOW-VALUES': '\x00',
|
||
'QUOTE': "'", 'QUOTES': "'",
|
||
'ALL': '',
|
||
}
|
||
|
||
|
||
def _apply_value(field: dict, rec: dict) -> bool:
|
||
"""尝试应用 VALUE 子句的初始值。返回 True 表示已处理。"""
|
||
raw = field.get('value')
|
||
if raw is None:
|
||
return False
|
||
val = str(raw).strip("'\"").strip()
|
||
name = field['name']
|
||
pi = field.get('pic_info', {})
|
||
|
||
# 处理 COBOL 特殊值
|
||
if val.upper() in _SPECIAL_VALUES:
|
||
val = _SPECIAL_VALUES[val.upper()]
|
||
|
||
ftype = pi.get('type', 'unknown')
|
||
if ftype == 'numeric':
|
||
digits = pi.get('digits', 0) + pi.get('decimal', 0)
|
||
if digits:
|
||
rec[name] = val.zfill(digits)
|
||
else:
|
||
rec[name] = val
|
||
else:
|
||
length = pi.get('length', 0) or 1
|
||
rec[name] = val.ljust(length)[:length]
|
||
return True
|
||
|
||
|
||
def _children_of(group_name: str, fields: list) -> list:
|
||
"""返回组项目 group_name 在 fields 中的直属子字段列表(按声明顺序)。
|
||
终止条件:遇到同/更高级别(sibling/组边界)或 77 级(独立字段)。
|
||
"""
|
||
result = []
|
||
group_level = None
|
||
found = False
|
||
for f in fields:
|
||
if not found and f['name'] == group_name:
|
||
group_level = f['level']
|
||
found = True
|
||
continue
|
||
if found:
|
||
if f['level'] <= group_level or f['level'] == 77:
|
||
break
|
||
# 88-level 是条件名,不计为子字段
|
||
if f.get('is_88'):
|
||
continue
|
||
result.append(f)
|
||
return result
|
||
|
||
|
||
def _make_numeric_value(idx: int, record_num: int, total_digits: int) -> str:
|
||
max_val = 10 ** total_digits
|
||
for step in (100, 10, 1):
|
||
val = idx * step + record_num
|
||
if val < max_val:
|
||
return str(val).zfill(total_digits)
|
||
return str(record_num % max_val).zfill(total_digits)
|
||
|
||
|
||
def _make_alpha_value(idx: int, record_num: int, length: int) -> str:
|
||
if length == 1:
|
||
ch = chr(65 + (idx + record_num - 2) % 26)
|
||
return ch
|
||
letter = chr(65 + (idx - 1) % 26)
|
||
return letter + str(record_num).zfill(length - 1)
|
||
|
||
|
||
def make_base_record(seq_num: int, fields: list) -> dict:
    """Build a baseline record dict (field name -> string value) for ``seq_num``.

    Walks ``fields`` in declaration order: 88-level entries are skipped,
    FILLERs are filled with ``'x'``, VALUE clauses take priority, and the
    remaining elementary items get generated values based on their PIC type.
    REDEFINES entries are resolved in two follow-up passes.
    """
    rec = {}
    redefines_map = {}    # scalar REDEFINES: parent_name -> [child_names]
    group_redefines = []  # group REDEFINES: [(redef_name, target_name)]
    filler_key_counter = 0
    numeric_idx = 0
    alpha_idx = 0
    record_num = seq_num

    for f in fields:
        name = f['name']

        if f.get('is_88'):
            continue

        if f.get('redefines'):
            parent = f['redefines']
            if f.get('pic'):
                # Scalar REDEFINES (has a PIC, e.g. WS-AMOUNT-DISP REDEFINES
                # WS-AMOUNT PIC X(9)); value is copied from the parent in pass 2a.
                redefines_map.setdefault(parent, []).append(name)
                continue
            else:
                # Group REDEFINES (no PIC, e.g. CUST-ADDR2 REDEFINES CUST-ADDR).
                group_redefines.append((name, parent))
                # No `continue` here: the group itself has no PIC and is
                # skipped by the group-item check below, while its children
                # go through the loop as ordinary fields.

        if f.get('is_filler'):
            # A second FILLER with an already-used key gets a numbered key
            # (the first rename yields FILLER_2).
            if name in rec:
                filler_key_counter += 1
                name = f'FILLER_{filler_key_counter + 1}'
            rec[name] = 'x' * (f.get('pic_info', {}).get('length', 0) or 1)
            continue

        # Pass 0: a VALUE clause's initial value takes priority.
        if _apply_value(f, rec):
            continue

        # Group items (no PIC) are skipped.
        if not f.get('pic'):
            continue

        pi = f.get('pic_info', {})
        ftype = pi.get('type', 'unknown')
        digits = pi.get('digits', 0)
        decimal = pi.get('decimal', 0)
        length = pi.get('length', 0)

        if ftype == 'numeric':
            if _is_date_field(name):
                rec[name] = seq_date(record_num)
            else:
                numeric_idx += 1
                rec[name] = _make_numeric_value(numeric_idx, record_num, digits + decimal)
        elif ftype in ('alphanumeric', 'alphabetic'):
            alpha_idx += 1
            rec[name] = _make_alpha_value(alpha_idx, record_num, length or 1)
        elif ftype == 'numeric-edited':
            numeric_idx += 1
            raw = _make_numeric_value(numeric_idx, record_num, digits + decimal)
            rec[name] = raw.rjust(length)
        else:
            # Unknown PIC type: fall back to an 8-character alpha value.
            alpha_idx += 1
            rec[name] = _make_alpha_value(alpha_idx, record_num, 8)

    # Pass 2a: scalar REDEFINES copy the parent's value.
    for parent_name, child_names in redefines_map.items():
        if parent_name in rec:
            for child_name in child_names:
                rec[child_name] = rec[parent_name]

    # Pass 2b: group REDEFINES copy child values from the target by position.
    for redef_name, target_name in group_redefines:
        redef_kids = _children_of(redef_name, fields)
        tgt_kids = _children_of(target_name, fields)
        tgt_idx = 0
        for i, rk in enumerate(redef_kids):
            if tgt_idx >= len(tgt_kids):
                break
            if i == len(redef_kids) - 1 and len(redef_kids) < len(tgt_kids):
                # Last REDEFINES child and the target has more children:
                # concatenate all remaining target values.
                parts = [rec.get(tk['name'], '') for tk in tgt_kids[tgt_idx:]]
                rec[rk['name']] = ''.join(parts)
            elif i == len(redef_kids) - 1 and len(redef_kids) > len(tgt_kids):
                # REDEFINES has more children: the last one takes the last
                # target value.
                # NOTE(review): tgt_idx always equals i here, so the `break`
                # above fires before this branch can be reached.
                rec[rk['name']] = rec.get(tgt_kids[-1]['name'], '')
            else:
                rec[rk['name']] = rec.get(tgt_kids[tgt_idx]['name'], '')
            tgt_idx += 1

    return rec
|
||
|
||
|
||
# ── 约束应用 ──
|
||
|
||
def _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields):
|
||
"""检查 field_name 当前值是否满足该约束。满足返回 True。"""
|
||
for f in fields:
|
||
if f['name'] == field_name:
|
||
pi = f.get('pic_info', {})
|
||
ftype = pi.get('type', 'unknown')
|
||
val = rec.get(field_name)
|
||
if val is None:
|
||
return False
|
||
if operator == 'not_in':
|
||
cases = value if isinstance(value, list) else []
|
||
return str(val) not in cases
|
||
if ftype == 'numeric':
|
||
try:
|
||
num_val = int(float(str(val)))
|
||
num_target = int(float(str(value)))
|
||
except (ValueError, TypeError):
|
||
return False
|
||
if operator in ('>=', '>', '<', '<=', '=', '<>'):
|
||
if operator == '>=': ok = num_val >= num_target
|
||
elif operator == '>': ok = num_val > num_target
|
||
elif operator == '<': ok = num_val < num_target
|
||
elif operator == '<=': ok = num_val <= num_target
|
||
elif operator == '=': ok = num_val == num_target
|
||
elif operator == '<>': ok = num_val != num_target
|
||
return ok == want_true
|
||
return True
|
||
else:
|
||
s_val = str(val).strip().upper()
|
||
s_target = str(value).strip().upper()
|
||
eq = s_val == s_target
|
||
if operator == '=':
|
||
return eq == want_true
|
||
elif operator == '<>':
|
||
return (not eq) == want_true
|
||
elif operator in ('>', '<', '>=', '<='):
|
||
if operator == '>':
|
||
ok = s_val > s_target
|
||
elif operator == '<':
|
||
ok = s_val < s_target
|
||
elif operator == '>=':
|
||
ok = s_val >= s_target
|
||
elif operator == '<=':
|
||
ok = s_val <= s_target
|
||
return ok == want_true
|
||
return True
|
||
return False
|
||
|
||
|
||
# Operator groups read by _apply_arith_constraint to choose a steering
# direction for the left-hand fields of an arithmetic condition:
#   left_big_ops   -> left side steered big when the condition should hold
#   left_small_ops -> left side steered small when the condition should hold
_ARITH_BOUNDS = {
    'left_big_ops': {'>', '>=', '<>'},
    'left_small_ops': {'<', '<='},
}
|
||
|
||
def _arith_pic_info(field_name, fields):
|
||
for f in fields:
|
||
if f['name'] == field_name.upper():
|
||
return f.get('pic_info', {})
|
||
return {}
|
||
|
||
def _arith_numeric_pick(field_name, want_big, fields):
    """Pick a large or small value for a numeric field and return it as a string.

    Returns None when the field is not numeric. The large value is 70% of
    the field's maximum (truncated); the small value is 1 in the lowest
    digit position. The result is the zero-padded integer part followed,
    when the field has decimals, by the zero-padded decimal part with no
    separator.
    """
    info = _arith_pic_info(field_name, fields)
    if info.get('type') != 'numeric':
        return None

    int_digits = info.get('digits', 0)
    frac_digits = info.get('decimal', 0)
    ceiling = 10 ** (int_digits + frac_digits) - 1
    chosen = int(ceiling * 0.7) if want_big else 1

    scale = 10 ** frac_digits
    text = str(chosen // scale).zfill(int_digits)
    if frac_digits != 0:
        text += str(chosen % scale).zfill(frac_digits)
    return text
|
||
|
||
def _apply_arith_constraint(rec, field_name, operator, value, want_true, fields):
    """Steer field values for a condition whose left side is an arithmetic expression.

    Example, ``A + B > C`` with ``want_true=True``:
      - left-hand fields (A, B) are set big, the right-hand field (C) small.
    Example, ``A + B <= C`` with ``want_true=True``:
      - left-hand fields are set small, the right-hand field big.

    This is heuristic steering, not an exact solver: the goal is to make the
    branch reachable, not to hit precise boundary values.
    """
    def _is_declared(candidate):
        return any(f['name'] == candidate for f in fields)

    # 1. Field names (upper case) that appear in the left-hand expression.
    tokens = re.findall(r'\b[A-Z][A-Z0-9-]*(?:\([^)]*\))?\b', field_name.upper())
    operands = [tok for tok in tokens if _is_declared(tok)]

    # 2. The right-hand side counts only if it is itself a declared field.
    rhs_field = value if _is_declared(value) else None

    if not operands:
        logger.debug(f"算术表达式无法提取字段: {field_name}")
        return

    # 3. Direction: should the left side be big when the condition must hold?
    if operator in _ARITH_BOUNDS['left_small_ops']:
        left_big = not want_true
    else:
        left_big = want_true

    # 4./5. Left-hand fields first, then the right-hand field in the
    # opposite direction.
    targets = [(name, left_big) for name in operands]
    if rhs_field:
        targets.append((rhs_field, not left_big))
    for name, go_big in targets:
        pick = _arith_numeric_pick(name, go_big, fields)
        if pick is not None:
            rec[name] = pick
|
||
|
||
|
||
def _inc_str(s, length):
|
||
s = str(s).strip()
|
||
try:
|
||
r = str(int(s) + 1).zfill(length)
|
||
return r if len(r) <= length else '9' * length
|
||
except ValueError:
|
||
c = list(str(s).ljust(length)[:length])
|
||
for i in range(len(c) - 1, -1, -1):
|
||
if c[i] not in ' 9Zz\xff':
|
||
c[i] = chr(ord(c[i]) + 1)
|
||
break
|
||
if c[i] == ' ':
|
||
c[i] = '0'
|
||
break
|
||
if c[i] == '9':
|
||
c[i] = '0'
|
||
elif c[i] == 'Z':
|
||
c[i] = 'A'
|
||
elif c[i] == 'z':
|
||
c[i] = 'a'
|
||
return ''.join(c)
|
||
|
||
|
||
def _dec_str(s, length):
|
||
s = str(s).strip()
|
||
try:
|
||
n = max(0, int(s) - 1)
|
||
return str(n).zfill(length)
|
||
except ValueError:
|
||
c = list(str(s).ljust(length)[:length])
|
||
for i in range(len(c) - 1, -1, -1):
|
||
if c[i] not in ' 0Aa\x00':
|
||
c[i] = chr(ord(c[i]) - 1)
|
||
break
|
||
if c[i] == ' ':
|
||
break
|
||
if c[i] == '0':
|
||
c[i] = '9'
|
||
elif c[i] == 'A':
|
||
c[i] = ' '
|
||
elif c[i] == 'a':
|
||
c[i] = ' '
|
||
return ''.join(c)
|
||
|
||
|
||
def _reconcile_unstring_fields(rec, left_field, operator, right_field, want_true,
                               fields, left_chain, assignments, path_assign):
    """Make a field-to-field comparison hold by rewriting the right side's root.

    The right-hand field is traced to its root variable. If that root is in
    ``rec``, both assignment chains contain only ``move`` / ``unstring_split``
    entries, and the left field has a non-blank value, the root is set to
    the left value, its increment, or its decrement, so that
    ``left <operator> right`` evaluates to ``want_true``. In every other case
    the function logs at debug level and leaves ``rec`` untouched.
    """
    right_root, right_chain = trace_to_root(right_field, assignments, fields, path_assign)
    if right_root not in rec:
        logger.debug(f"字段间比较协调:右侧根 {right_root} 不在 rec,跳过")
        return

    for _, asgn in (left_chain or []) + (right_chain or []):
        if asgn.get('type') not in ('move', 'unstring_split'):
            logger.debug(f"字段间比较协调:链含非 MOVE 类型 {asgn.get('type')},跳过")
            return

    left_val = str(rec.get(left_field, ''))
    if not left_val.strip():
        logger.debug(f"字段间比较协调:左侧 {left_field} 无值,跳过")
        return

    # Width of the root field; fall back to the left value's length.
    length = next(
        (f.get('pic_info', {}).get('length', 0) for f in fields if f['name'] == right_root),
        0,
    )
    if length == 0:
        length = len(left_val)

    # (operator, action when want_true, action when not want_true)
    plan = (
        ('>=', 'same', 'inc'),
        ('<=', 'same', 'dec'),
        ('>', 'dec', 'same'),
        ('<', 'inc', 'same'),
        ('=', 'same', 'inc'),
        ('<>', 'inc', 'same'),
    )
    action = next(
        (if_true if want_true else if_false
         for symbol, if_true, if_false in plan if operator == symbol),
        None,
    )
    if action is None:
        return

    if action == 'same':
        right_val = left_val
    elif action == 'inc':
        right_val = _inc_str(left_val, length)
    else:
        right_val = _dec_str(left_val, length)

    rec[right_root] = right_val[:length] if right_val else right_val
    logger.debug(f"字段间比较协调:{left_field}={left_val} {operator} {right_field} -> {right_root}={rec[right_root]} (want={want_true})")
|
||
|
||
|
||
def apply_constraint(rec, field_name, operator, value, want_true, fields, assignments=None, path_assign=None):
    """Mutate *rec* so that ``field_name operator value`` evaluates to *want_true*.

    Processing order:

    1. Normalise *field_name* (blanks around ``(``, ``)`` and ``,`` removed).
    2. ``NAME(VAR)``: when ``VAR`` has an integer value in *rec* and the
       resolved ``NAME(n)`` is a known field, re-apply to that field.
    3. An unsubscripted name that has subscripted variants is applied to
       every variant.
    4. FILLER fields are ignored; a REDEFINES field forwards the constraint
       to the field it redefines.
    5. With *assignments*, the field is traced to its root and the
       constraint is inverted through the chain (kept only when the result
       names a known field).
    6. If *value* names a field: its VALUE literal is substituted when
       present, otherwise the comparison is handed to
       ``_reconcile_unstring_fields`` / ``_apply_arith_constraint`` or skipped.
    7. If the record already satisfies the constraint it is left alone,
       except that an all-zero numeric value is replaced for "not greater"
       / "less than" style constraints.
    8. ``not_in`` picks the first candidate outside the excluded cases; every
       other operator delegates to ``satisfying_value``.

    Args:
        rec: record dict, modified in place.
        field_name: constrained field (may carry a subscript).
        operator: comparison token, e.g. ``=``, ``<>``, ``>``, ``not_in``.
        value: literal, field name, or (for ``not_in``) a collection of cases.
        want_true: whether the comparison should hold.
        fields: list of field-definition dicts.
        assignments: optional global assignment map used for root tracing.
        path_assign: optional per-path assignment map passed to the tracer.
    """
    # Normalise: "WS-CELL ( 1, 1 )" -> "WS-CELL(1,1)".
    field_name = re.sub(r'\s*([(),])\s*', r'\1', field_name)

    # Variable subscript: WS-FIXED-VALUE(WS-IDX) -> WS-FIXED-VALUE(1).
    vm = re.match(r'^(\w[\w-]*)\((\w[\w-]*)\)$', field_name)
    if vm:
        base_var, subscript_var = vm.groups()
        if subscript_var in rec:
            # The try deliberately spans the recursive call, as before, so a
            # conversion error anywhere below falls through to the generic path.
            try:
                resolved_name = f'{base_var}({int(rec[subscript_var])})'
                if any(f['name'] == resolved_name for f in fields):
                    apply_constraint(rec, resolved_name, operator, value, want_true,
                                     fields, assignments, path_assign)
                    return
            except (ValueError, TypeError):
                pass

    # Subscript propagation: an unsubscripted constraint applies to every
    # subscripted variant of the field.
    base = _basename(field_name)
    subscripted = [f for f in fields if f['name'] != base and _basename(f['name']) == base]
    if subscripted and field_name == base:
        for sf in subscripted:
            apply_constraint(rec, sf['name'], operator, value, want_true,
                             fields, assignments, path_assign)
        return

    # FILLER is never constrained; REDEFINES shares storage with its parent,
    # so the constraint is redirected there.
    field_def = next((f for f in fields if f['name'] == field_name), None)
    if field_def is not None:
        if field_def.get('is_filler'):
            return
        if field_def.get('redefines'):
            parent_name = field_def['redefines']
            logger.debug(f"REDEFINES 约束重定向: {field_name} → {parent_name}")
            apply_constraint(rec, parent_name, operator, value, want_true,
                             fields, assignments, path_assign)
            return

    chain = None
    if assignments:
        root_var, chain = trace_to_root(field_name, assignments, fields, path_assign)
        if root_var != field_name:
            new_field_name, new_op, new_val = invert_through_chain(root_var, chain, operator, value)
            if any(f['name'] == new_field_name for f in fields):
                field_name, operator, value = new_field_name, new_op, new_val

    # Field-to-field comparison: resolve or hand off before the satisfied check.
    if any(f['name'] == value for f in fields):
        resolved_literal = next(
            (str(f['value']).strip("'").strip('"')
             for f in fields
             if f['name'] == value and f.get('value') is not None),
            None,
        )
        if resolved_literal is not None:
            value = resolved_literal
        elif chain is not None and assignments:
            _reconcile_unstring_fields(rec, field_name, operator, value, want_true,
                                       fields, chain, assignments, path_assign)
            return
        # NOTE(review): this pattern also matches the hyphen inside ordinary
        # COBOL names, not only arithmetic operators — confirm that is intended.
        elif re.search(r'[+\-*/]', field_name):
            _apply_arith_constraint(rec, field_name, operator, value, want_true, fields)
            return
        else:
            logger.debug(f"字段间比较约束跳过:{field_name} {operator} {value}")
            return

    # Keep a value that already satisfies the constraint (preserves earlier
    # constraints), but replace an all-zero numeric for upper-bound style
    # constraints so a boundary value is used instead.
    if _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields):
        cur = str(rec.get(field_name, '')).strip('0')
        upper_bounded = (
            (operator in ('>', '>=') and not want_true)
            or (operator in ('<', '<=') and want_true)
        )
        if cur in ('', '.') and upper_bounded:
            for f in fields:
                if f['name'] == field_name:
                    pi = f.get('pic_info', {})
                    if pi.get('type') == 'numeric':
                        rec[field_name] = satisfying_value(pi, operator, value, want_true)
                        return
        return

    if operator == 'not_in':
        # Accept any plain collection: constraint values may have been turned
        # into tuples for hashing, and those must not be treated as "no cases".
        cases = value if isinstance(value, (list, tuple, set, frozenset)) else []
        for f in fields:
            if f['name'] != field_name:
                continue
            pi = f.get('pic_info', {})
            if pi.get('type', 'unknown') in ('alphanumeric', 'alphabetic'):
                for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
                    if c not in cases:
                        rec[field_name] = c.ljust(pi.get('length', 1), c)
                        return
            else:
                for n in range(1, 100):
                    if str(n) not in cases:
                        rec[field_name] = str(n).zfill(pi.get('digits', 0) + pi.get('decimal', 0))
                        return
        return

    for f in fields:
        if f['name'] == field_name:
            rec[field_name] = satisfying_value(f.get('pic_info', {}), operator, value, want_true)
            return
|
||
|
||
|
||
# ── 记录生成入口 ──
|
||
|
||
def sync_redefined_fields(rec, fields):
    """Copy values from redefined fields onto the fields that REDEFINE them.

    Level-88 and FILLER entries are ignored.

    * A redefining field with a PIC receives the redefined field's value
      verbatim, provided the redefined field has an entry in *rec*.
    * A redefining group (no PIC) is matched child-by-child against the
      redefined group's children as returned by ``_children_of``.  When the
      redefining group has fewer children, its last child receives the
      concatenation of all remaining target children.  When it has more,
      the surplus children are left untouched.
    """
    elementary = {}  # redefined name -> names of PIC-bearing redefiners
    grouped = []     # (redefining group name, redefined name)
    for spec in fields:
        target = spec.get('redefines')
        if spec.get('is_88') or spec.get('is_filler') or not target:
            continue
        if spec.get('pic'):
            elementary.setdefault(target, []).append(spec['name'])
        else:
            grouped.append((spec['name'], target))

    for parent, aliases in elementary.items():
        if parent not in rec:
            continue
        for alias in aliases:
            rec[alias] = rec[parent]

    for redef_name, target_name in grouped:
        redef_kids = _children_of(redef_name, fields)
        tgt_kids = _children_of(target_name, fields)
        last_pos = len(redef_kids) - 1
        absorbs_rest = len(redef_kids) < len(tgt_kids)
        # zip stops at the shorter list, which is exactly where the pairwise
        # walk ends when the redefining group has surplus children.
        for pos, (rk, tk) in enumerate(zip(redef_kids, tgt_kids)):
            if pos == last_pos and absorbs_rest:
                rec[rk['name']] = ''.join(
                    rec.get(rest['name'], '') for rest in tgt_kids[pos:]
                )
            else:
                rec[rk['name']] = rec.get(tk['name'], '')
|
||
|
||
|
||
def apply_occurs_depending(rec, fields):
    """Enforce OCCURS DEPENDING ON counters on the subscripted fields of *rec*.

    Phase 1: for every counter named by an ``occurs_depending`` entry, find
    the highest subscript among its ``NAME(n)`` fields.  A counter whose
    current value parses to zero is set to that maximum, zero-padded to the
    counter's digit count (only when the counter is a known field with a
    positive digit count).

    Phase 2: every ``NAME(n)`` whose subscript exceeds the counter value is
    blanked: zeros for numeric fields, spaces for alphanumeric/alphabetic
    fields, zeros of ``length`` (at least one) for anything else.

    Counter values are parsed the same way in both phases: anything that
    cannot be read as a number (blank, missing, garbage) counts as zero and
    a decimal string such as ``'2.0'`` is truncated to its integer part.

    Args:
        rec: record dict, modified in place.
        fields: list of field-definition dicts.
    """
    subscript_re = re.compile(r'\((\d+)\)$')

    def counter_value(raw):
        # Tolerant parse shared by both phases; previously phase 2 used a bare
        # int() and raised on values that phase 1 accepted.
        try:
            return int(float(str(raw)))
        except (ValueError, TypeError, OverflowError):
            return 0

    # Phase 1: raise zero-valued DEPENDING ON counters to the highest subscript.
    dep_max = {}
    for f in fields:
        dep_var = f.get('occurs_depending')
        if not dep_var:
            continue
        m = subscript_re.search(f['name'])
        if m:
            sub = int(m.group(1))
            if sub > dep_max.get(dep_var, 0):
                dep_max[dep_var] = sub

    for dep_var, max_sub in dep_max.items():
        if counter_value(rec.get(dep_var, '0')) != 0:
            continue
        for f in fields:
            if f['name'] == dep_var:
                pi = f.get('pic_info', {})
                digits = pi.get('digits', 0) + pi.get('decimal', 0)
                if digits > 0:
                    rec[dep_var] = str(max_sub).zfill(digits)
                break

    # Phase 2: clear subscripted fields beyond the counter.
    for f in fields:
        dep_var = f.get('occurs_depending')
        if not dep_var:
            continue
        name = f['name']
        m = subscript_re.search(name)
        if not m:
            continue
        if int(m.group(1)) <= counter_value(rec.get(dep_var, 0)):
            continue
        pi = f.get('pic_info', {})
        ftype = pi.get('type', 'unknown')
        length = pi.get('length', 0) or 1
        if ftype == 'numeric':
            rec[name] = '0' * (pi.get('digits', 0) + pi.get('decimal', 0))
        elif ftype in ('alphanumeric', 'alphabetic'):
            rec[name] = ' ' * length
        else:
            rec[name] = '0' * length
|
||
|
||
|
||
def _non_match_for(cond_leaf, fields):
|
||
if not fields or not cond_leaf:
|
||
return None
|
||
base = re.sub(r'\s*\(.*?\)\s*$', '', cond_leaf.field)
|
||
for f in fields:
|
||
if re.sub(r'\s*\(.*?\)\s*$', '', f['name']) == base:
|
||
pic = f.get('pic_info', {})
|
||
if pic.get('type') == 'numeric':
|
||
return '0'
|
||
return ' '
|
||
return None
|
||
|
||
|
||
def _enum_search_paths(node, fields):
    """Enumerate the paths through a SEARCH node.

    One group of paths is produced per WHEN clause (each sub-path of the
    WHEN body, prefixed with the constraint that table entry ``i + 1``
    matches), plus one group for AT END when present.

    For a WHEN whose condition is a single ``CondLeaf``:

    * entry ``i + 1`` of the searched field is assigned the matching value
      (a ``move`` when the value names a field, else a ``move_literal``);
    * entries ``1 .. i`` are assigned a non-matching literal so the search
      does not stop earlier.

    For AT END, every entry up to the detected OCCURS count is assigned the
    non-matching literal of the first WHEN condition.

    Returns:
        list of ``(constraints, assignments)`` tuples.
    """
    subscript_tail = r'\s*\(.*?\)\s*$'
    cond_trees = node.cond_trees or []

    def max_subscript(prefix):
        # Highest n among fields named exactly "<prefix>(n)"; 0 when none exist.
        pattern = re.compile(rf'^{re.escape(prefix)}\((\d+)\)$')
        hits = (pattern.match(f['name']) for f in fields)
        return max((int(m.group(1)) for m in hits if m), default=0)

    def merged_with(seed, path_assign):
        # Copy every seeded list: a plain dict() copy would share the lists, so
        # extending one path's assignments would leak into sibling paths and
        # into paths that were already emitted.
        merged = {key: list(steps) for key, steps in seed.items()}
        for key, steps in path_assign.items():
            merged.setdefault(key, []).extend(steps if isinstance(steps, list) else [steps])
        return merged

    # First WHEN condition, when it is a simple leaf; it drives AT END.
    first_tree = cond_trees[0] if cond_trees else None
    first_leaf = None
    if node.when_list and first_tree and isinstance(first_tree, CondLeaf):
        first_leaf = first_tree

    # Infer the OCCURS count from the condition field, e.g. WS-CODE-VAL(WS-IDX)
    # -> largest j for which WS-CODE-VAL(j) exists; fall back to the table name.
    occurs_count = 1
    if first_leaf is not None:
        first_base = re.sub(subscript_tail, '', first_leaf.field)
        occurs_count = max(occurs_count, max_subscript(first_base))
        if occurs_count <= 1 and node.table_name:
            occurs_count = max(occurs_count, max_subscript(node.table_name))

    paths = []
    for i, (_, body_seq) in enumerate(node.when_list):
        cond_tree = cond_trees[i] if i < len(cond_trees) else None
        is_leaf = bool(cond_tree) and isinstance(cond_tree, CondLeaf)
        sub = _cap_paths(enum_paths(body_seq, fields)) or [([], {})]

        extra_assign = {}
        head_cons = []
        if is_leaf:
            base = re.sub(subscript_tail, '', cond_tree.field)
            matching_val = cond_tree.value
            elem_key = f'{base}({i + 1})'
            if any(f['name'] == matching_val for f in fields):
                extra_assign[elem_key] = [{'type': 'move', 'source_vars': [matching_val]}]
            else:
                extra_assign[elem_key] = [{'type': 'move_literal', 'literal': matching_val}]
            non_match = _non_match_for(cond_tree, fields) or ' '
            for j in range(i):
                extra_assign[f'{base}({j + 1})'] = [{'type': 'move_literal', 'literal': non_match}]
            head_cons = [(elem_key, cond_tree.op, matching_val, True)]

        for sp_cons, sp_assign in sub:
            cons = head_cons + sp_cons if is_leaf else sp_cons
            paths.append((cons, merged_with(extra_assign, sp_assign)))

    if node.has_at_end:
        sub = _cap_paths(enum_paths(node.at_end_seq, fields)) or [([], {})]
        at_end_assign = {}
        if first_leaf is not None:
            non_match = _non_match_for(first_leaf, fields) or ' '
            base = re.sub(subscript_tail, '', first_leaf.field)
            for j in range(max(occurs_count, 1)):
                at_end_assign[f'{base}({j + 1})'] = [{'type': 'move_literal', 'literal': non_match}]
        for sp_cons, sp_assign in sub:
            paths.append((sp_cons, merged_with(at_end_assign, sp_assign)))

    return paths
|
||
|
||
|
||
# Numeric comparisons used when checking a literal MOVE against a constraint.
_NUMERIC_COMPARATORS = {
    '>': lambda a, b: a > b,
    '<': lambda a, b: a < b,
    '=': lambda a, b: a == b,
    '<>': lambda a, b: a != b,
    '>=': lambda a, b: a >= b,
    '<=': lambda a, b: a <= b,
}


def _ends_with_literal_move(asgn_val):
    """Return True when the last assignment in *asgn_val* is a ``move_literal``."""
    if asgn_val is None:
        return False
    steps = asgn_val if isinstance(asgn_val, list) else [asgn_val]
    return bool(steps) and steps[-1]['type'] == 'move_literal'


def _path_contradicts_literal(rec, path_cons, path_assign, base_assignments, data_fields):
    """Return True when a constraint conflicts with a literal MOVE on the path.

    A path is considered impossible when a constrained field (or the root
    it traces back to) was last assigned a literal on this path and the
    propagated value in *rec* does not satisfy the constraint.
    """
    for c in path_cons:
        if len(c) != 4:
            continue
        field, op, val, want = c
        root_var, chain = trace_to_root(field, base_assignments, data_fields, path_assign)
        if root_var != field:
            new_fn, new_op, new_val = invert_through_chain(root_var, chain, op, val)
            if not any(f['name'] == new_fn for f in data_fields):
                continue
            if not _ends_with_literal_move(path_assign.get(root_var)) or root_var not in rec:
                continue
            if not _check_constraint_satisfied(rec, root_var, new_op, new_val, want, data_fields):
                return True
        elif field in rec:
            if not _ends_with_literal_move(path_assign.get(field)):
                continue
            cur_val = str(rec.get(field, ''))
            if cur_val == '':
                continue
            pi = next((f.get('pic_info', {}) for f in data_fields if f['name'] == field), {})
            if pi.get('type') != 'numeric':
                continue
            # NOTE(review): both sides are truncated to integers before the
            # comparison, as in the original — confirm this is intended for
            # fields with decimals.
            try:
                compare = _NUMERIC_COMPARATORS.get(op)
                nv = int(float(cur_val))
                tv = int(float(str(val)))
            except (ValueError, TypeError, OverflowError):
                continue
            if compare is not None and not (compare(nv, tv) == want):
                return True
    return False


def _select_assignments(path_assign, keep):
    """Return ``{target: [assignments kept by *keep*]}``, omitting empty targets."""
    selected = {}
    for tgt, asgn_val in path_assign.items():
        steps = asgn_val if isinstance(asgn_val, list) else [asgn_val]
        kept = [a for a in steps if keep(a)]
        if kept:
            selected[tgt] = kept
    return selected


def _clip_to_pic(rec, data_fields):
    """Trim over-long values to their PIC size.

    Numeric values keep the right-most digits; alphanumeric/alphabetic
    values keep the left-most characters.  Level-88 and FILLER entries are
    left alone.
    """
    for f in data_fields:
        name = f['name']
        if name not in rec or f.get('is_88') or f.get('is_filler'):
            continue
        pi = f.get('pic_info', {})
        ftype = pi.get('type', 'unknown')
        val = str(rec[name])
        if ftype == 'numeric':
            total = pi.get('digits', 0) + pi.get('decimal', 0)
            if total > 0 and len(val) > total:
                rec[name] = val[-total:].zfill(total)
        elif ftype in ('alphanumeric', 'alphabetic'):
            length = pi.get('length', 0)
            if length > 0 and len(val) > length:
                rec[name] = val[:length]


def generate_records(path_infos, data_fields, base_assignments=None, file_sec=None):
    """Generate one test-data record per feasible path.

    Args:
        path_infos: list of ``(constraints, path_assignments)`` or
            ``(constraints, path_assignments, term_type)`` tuples.  Two-element
            entries get the term type ``'normal'``; both shapes may be mixed.
        data_fields: list of field-definition dicts.
        base_assignments: global assignments dict (used for root tracing).
        file_sec: forwarded to ``propagate_assignments``.

    Returns:
        ``(records, kept_path_cons, term_types)`` — three parallel lists.
        Paths found to be impossible are dropped from all three.  When no
        record is produced at all, a single base record is returned with an
        empty constraint list and the term type ``'normal'``.
    """
    records = []
    kept_path_cons = []
    term_types = []

    for seq, info in enumerate(path_infos or [], start=1):
        # Normalise each entry on its own so legacy 2-tuples and 3-tuples can
        # be mixed in one list.
        path_cons, path_assign = info[0], info[1]
        term_type = info[2] if len(info) > 2 else 'normal'

        path_cons = _filter_stop(path_cons)
        rec = make_base_record(seq, data_fields)
        has_assign = isinstance(path_assign, dict)

        # Pass A: propagate assignments (MOVE / COMPUTE / READ INTO ...) to
        # model the program state just before the decision point.
        if has_assign:
            propagate_assignments(rec, path_assign, data_fields, file_sec=file_sec)

        # Pass A.5: drop paths whose constraint contradicts a literal MOVE.
        if base_assignments and has_assign and _path_contradicts_literal(
                rec, path_cons, path_assign, base_assignments, data_fields):
            continue

        # Pass B: apply the constraints, overriding values left by MOVEs.
        for c in path_cons:
            if len(c) == 4:
                field, op, val, want = c
                apply_constraint(rec, field, op, val, want, data_fields, base_assignments, path_assign)

        if has_assign:
            # Pass B.5: re-run field-to-field MOVEs so chains stay consistent
            # with the values the constraints just changed.
            forward = _select_assignments(
                path_assign, lambda a: a['type'] == 'move' and a.get('source_vars'))
            if forward:
                propagate_assignments(rec, forward, data_fields, file_sec=file_sec)

            # Pass B.75: recompute COMPUTE targets whose sources were changed.
            compute_only = _select_assignments(path_assign, lambda a: a['type'] == 'compute')
            if compute_only:
                propagate_assignments(rec, compute_only, data_fields, file_sec=file_sec)

        # Pass B.8: UNSTRING source reconstruction (targets -> source).
        if base_assignments:
            _reconstruct_unstring_sources(rec, base_assignments, data_fields)

        # Pass C: synchronise REDEFINES so shared storage is consistent.
        sync_redefined_fields(rec, data_fields)

        # Pass D: OCCURS DEPENDING ON — clear out-of-range subscripted fields.
        apply_occurs_depending(rec, data_fields)

        # Pass E: PIC length limits.
        _clip_to_pic(rec, data_fields)

        # Track which fields were explicitly assigned on this path.
        rec['_assigned_fields'] = set(path_assign.keys()) if has_assign else set()

        records.append(rec)
        kept_path_cons.append(path_cons)
        term_types.append(term_type)

    if not records:
        rec = make_base_record(1, data_fields)
        if base_assignments:
            propagate_assignments(rec, base_assignments, data_fields, file_sec=file_sec)
            _reconstruct_unstring_sources(rec, base_assignments, data_fields)
        rec['_assigned_fields'] = set()
        records.append(rec)
        kept_path_cons.append([])
        term_types.append('normal')

    return records, kept_path_cons, term_types
|
||
|
||
|
||
def _reconstruct_unstring_sources(rec, base_assignments, data_fields):
|
||
"""Build UNSTRING source field value from comma-separated target values.
|
||
After constraints determine target field values, construct the source
|
||
string so the COBOL UNSTRING can correctly parse it.
|
||
"""
|
||
groups = {}
|
||
for tgt, asgn_list in base_assignments.items():
|
||
for asgn in asgn_list:
|
||
if asgn.get('type') == 'unstring_split' and asgn.get('source_vars'):
|
||
src = asgn['source_vars'][0]
|
||
idx = asgn.get('index', 0)
|
||
groups.setdefault(src, []).append((idx, tgt))
|
||
|
||
for src_var, targets in groups.items():
|
||
targets.sort(key=lambda x: x[0])
|
||
# Resolve group→child name if source not directly in rec
|
||
resolved_src = src_var
|
||
if resolved_src not in rec:
|
||
grp_level = None
|
||
found = False
|
||
for f in data_fields:
|
||
if not found and f['name'] == resolved_src:
|
||
grp_level = f.get('level', 0)
|
||
found = True
|
||
continue
|
||
if found:
|
||
if f.get('level', 0) <= grp_level or f.get('level') == 77:
|
||
break
|
||
if f.get('pic'):
|
||
resolved_src = f['name']
|
||
break
|
||
if resolved_src not in rec:
|
||
continue
|
||
csv_parts = []
|
||
for idx, tgt in targets:
|
||
val = rec.get(tgt, '')
|
||
csv_parts.append(val if val is not None else '')
|
||
csv_value = ','.join(csv_parts)
|
||
src_len = 0
|
||
for f in data_fields:
|
||
if f['name'] == resolved_src:
|
||
pi = f.get('pic_info', {})
|
||
if pi:
|
||
src_len = pi.get('length', 0)
|
||
break
|
||
if src_len > 0:
|
||
csv_value = csv_value.ljust(src_len)[:src_len]
|
||
rec[resolved_src] = csv_value
|
||
# Also sync to child fields (group→elementary) for FD output consistency
|
||
if resolved_src == src_var:
|
||
grp_level = None
|
||
found = False
|
||
for f in data_fields:
|
||
if not found and f['name'] == resolved_src:
|
||
grp_level = f.get('level', 0)
|
||
found = True
|
||
continue
|
||
if found:
|
||
if f.get('level', 0) <= grp_level or f.get('level') == 77:
|
||
break
|
||
if f.get('pic'):
|
||
rec[f['name']] = csv_value
|
||
break
|