"""设计层:路径枚举 + 值生成 + 约束应用""" import re import logging from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, mcdc_sets, satisfying_value from .core import trace_to_root, invert_through_chain, propagate_assignments, _basename logger = logging.getLogger(__name__) _STOP = ('__STOP__', '', None, True) _MAX_PATHS = 10000 def _filter_stop(cons): return [c for c in cons if c is not _STOP] def _cap_paths(paths): if len(paths) > _MAX_PATHS: return paths[:_MAX_PATHS] return paths def _cap_paths_fair(new_active, child_paths): """两阶段公平截断:每个前置路径至少保留一条子路径,再填充剩余配额。""" if len(new_active) <= _MAX_PATHS: return new_active k = len(child_paths) if k <= 1: return new_active[:_MAX_PATHS] # 分离 STOP 路径(不参与组合,直接保留) stop_paths = [(p, a) for p, a in new_active if any(c is _STOP for c in p)] combined = [(p, a) for p, a in new_active if not any(c is _STOP for c in p)] n_pred = len(combined) // k result = list(stop_paths) if n_pred <= 1: result.extend(combined[:_MAX_PATHS - len(result)]) return result[:_MAX_PATHS] remaining_quota = _MAX_PATHS - len(result) # Phase 1: 每个前置至少保留一条子路径(轮询分配不同子路径索引) quota = min(n_pred, remaining_quota) selected = set() for p_idx in range(quota): c_idx = p_idx % k idx = p_idx * k + c_idx selected.add(idx) result.append(combined[idx]) if len(result) >= _MAX_PATHS: return result[:_MAX_PATHS] # Phase 2: 用剩余配额填充其余组合 remaining = _MAX_PATHS - len(result) for idx in range(len(combined)): if idx not in selected: result.append(combined[idx]) remaining -= 1 if remaining <= 0: break return result[:_MAX_PATHS] # ── 路径枚举 ── def enum_paths(node, fields): """枚举路径,每条路径返回 (constraints, assignments). 返回 list[tuple[list[tuple], dict]]. """ if isinstance(node, Assign): return [([], {node.target: [node.source_info]})] if isinstance(node, BrSeq): if not node.children: return [([], {})] paths = [([], {})] for child in node.children: child_paths = _cap_paths(enum_paths(child, fields)) new_active = [] for p_cons, p_assign in paths: if any(c is _STOP for c in p_cons): new_active.append((p_cons, p_assign)) continue for cp_cons, cp_assign in child_paths: merged = {} for d in (p_assign, cp_assign): for k, v in d.items(): merged.setdefault(k, []).extend(v if isinstance(v, list) else [v]) merged_cons = p_cons + list(cp_cons) new_active.append((merged_cons, merged)) paths = _cap_paths_fair(new_active, child_paths) return paths elif isinstance(node, BrIf): parsed = parse_single_condition(node.condition, fields) if parsed and is_field(parsed[0], fields): field, op, val = parsed paths = [] true_sub = _cap_paths(enum_paths(node.true_seq, fields)) for sp_cons, sp_assign in (true_sub or [([], {})]): paths.append(([(field, op, val, True)] + sp_cons, sp_assign)) false_sub = _cap_paths(enum_paths(node.false_seq, fields)) for fp_cons, fp_assign in (false_sub or [([], {})]): paths.append(([(field, op, val, False)] + fp_cons, fp_assign)) return paths # CondNot wrapping a single leaf (e.g., IF NOT WS-AMOUNT > 1000) if node.cond_tree and isinstance(node.cond_tree, CondNot): child = node.cond_tree.child if isinstance(child, CondLeaf) and is_field(child.field, fields): paths = [] true_sub = _cap_paths(enum_paths(node.true_seq, fields)) for sp_cons, sp_assign in (true_sub or [([], {})]): paths.append(([(child.field, child.op, child.value, False)] + sp_cons, sp_assign)) false_sub = _cap_paths(enum_paths(node.false_seq, fields)) for fp_cons, fp_assign in (false_sub or [([], {})]): paths.append(([(child.field, child.op, child.value, True)] + fp_cons, fp_assign)) return paths if node.cond_tree: leaves = collect_leaves(node.cond_tree) if leaves and all(is_field(l.field, fields) for l in leaves): sets = mcdc_sets(node.cond_tree, fields) if sets: paths = [] for constraints, decision in sets: body = _cap_paths(enum_paths( node.true_seq if decision else node.false_seq, fields )) for sp_cons, sp_assign in (body or [([], {})]): paths.append((constraints + sp_cons, sp_assign)) return paths # CondLeaf fallback: 单 leaf(含 88-level 解析后的条件树)MC/DC 不适用 if len(leaves) == 1: leaf = leaves[0] paths = [] true_sub = _cap_paths(enum_paths(node.true_seq, fields)) for sp_cons, sp_assign in (true_sub or [([], {})]): paths.append(([(leaf.field, leaf.op, leaf.value, True)] + sp_cons, sp_assign)) false_sub = _cap_paths(enum_paths(node.false_seq, fields)) for fp_cons, fp_assign in (false_sub or [([], {})]): paths.append(([(leaf.field, leaf.op, leaf.value, False)] + fp_cons, fp_assign)) return paths # Fallback: parsed condition but non-field (e.g. arithmetic expr) if parsed: field, op, val = parsed paths = [] true_sub = enum_paths(node.true_seq, fields) for sp_cons, sp_assign in (true_sub or [([], {})]): paths.append(([(field, op, val, True)] + sp_cons, sp_assign)) false_sub = enum_paths(node.false_seq, fields) for fp_cons, fp_assign in (false_sub or [([], {})]): paths.append(([(field, op, val, False)] + fp_cons, fp_assign)) return paths return [([], {})] elif isinstance(node, BrEval): if node.subjects: paths = [] prior_false_cons = [] for values, seq in node.when_list: sub = _cap_paths(enum_paths(seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): when_cons = [(node.subjects[i], '=', values[i], True) for i in range(len(node.subjects))] constraints = list(prior_false_cons) + when_cons + sp_cons paths.append((constraints, sp_assign)) for i in range(len(node.subjects)): prior_false_cons.append((node.subjects[i], '=', values[i], False)) if node.has_other: sub = _cap_paths(enum_paths(node.other_seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): paths.append((list(prior_false_cons) + sp_cons, sp_assign)) return paths if node.subject == 'TRUE': paths = [] prior_false_sets = [] # list[list[Constraint]] for value, seq in node.when_list: cond = parse_compound_condition(value, fields) if cond and isinstance(cond, CondLeaf) and is_field(cond.field, fields): sub = _cap_paths(enum_paths(seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): constraints = [c for pf in prior_false_sets for c in pf] constraints.append((cond.field, cond.op, cond.value, True)) paths.append((constraints + sp_cons, sp_assign)) prior_false_sets.append([(cond.field, cond.op, cond.value, False)]) elif cond: leaves = collect_leaves(cond) if leaves and all(is_field(l.field, fields) for l in leaves): sets = mcdc_sets(cond, fields) if sets: sub = _cap_paths(enum_paths(seq, fields)) new_false_sets = [] for cs, decision in sets: if decision: if not prior_false_sets: for sp_cons, sp_assign in (sub or [([], {})]): paths.append((list(cs) + sp_cons, sp_assign)) else: for pf_set in prior_false_sets: for sp_cons, sp_assign in (sub or [([], {})]): paths.append((list(pf_set) + list(cs) + sp_cons, sp_assign)) else: new_false_sets.append(cs) if not new_false_sets: prior_false_sets = [] break combined = [] for pf_set in prior_false_sets: for nf_set in new_false_sets: combined.append(list(pf_set) + list(nf_set)) prior_false_sets = combined else: prior_false_sets = [] break else: prior_false_sets = [] break else: prior_false_sets = [] break if node.has_other: sub = _cap_paths(enum_paths(node.other_seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): constraints = [c for pf in prior_false_sets for c in pf] paths.append((constraints + sp_cons, sp_assign)) return paths if not is_field(node.subject, fields): return [([], {})] paths = [] for value, seq in node.when_list: sub = _cap_paths(enum_paths(seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): paths.append(([(node.subject, '=', value, True)] + sp_cons, sp_assign)) if node.has_other: case_vals = [v for v, _ in node.when_list] sub = _cap_paths(enum_paths(node.other_seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): paths.append(([(node.subject, 'not_in', case_vals, True)] + sp_cons, sp_assign)) return paths elif isinstance(node, BrSearch): return _enum_search_paths(node, fields) elif isinstance(node, BrPerform): if node.perf_type in ('para', 'thru'): if node.body_seq: return enum_paths(node.body_seq, fields) return [([], {})] elif node.perf_type in ('until', 'para_until', 'varying', 'para_varying'): # 尝试单条件(现有逻辑) parsed = parse_single_condition(node.condition, fields) if parsed and is_field(parsed[0], fields): field, op, val = parsed paths = [] false_sub = _cap_paths(enum_paths(node.body_seq, fields)) for sp_cons, sp_assign in (false_sub or [([], {})]): # PERFORM VARYING: 将 FROM 值作为 MOVE 赋值加入 Enter 路径 if node.varying_from and node.varying_var: is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from} from_assign = {node.varying_var: [from_asgn]} merged = {} for d in (from_assign, sp_assign): for k, v in d.items(): merged.setdefault(k, []).extend(v if isinstance(v, list) else [v]) sp_assign = merged paths.append(([(field, op, val, False)] + sp_cons, sp_assign)) paths.append(([(field, op, val, True)], {})) return paths # 尝试复合条件(AND/OR) cond_tree = parse_compound_condition(node.condition, fields) if cond_tree: leaves = collect_leaves(cond_tree) if leaves and all(is_field(l.field, fields) for l in leaves): sets = mcdc_sets(cond_tree, fields) if sets: paths = [] false_sub = _cap_paths(enum_paths(node.body_seq, fields)) for sp_cons, sp_assign in (false_sub or [([], {})]): # PERFORM VARYING: 将 FROM 值作为 MOVE 赋值加入 Enter 路径 if node.varying_from and node.varying_var: is_fld = any(f['name'] == node.varying_from for f in fields) if fields else False from_asgn = {'type': 'move', 'source_vars': [node.varying_from]} if is_fld else {'type': 'move_literal', 'literal': node.varying_from} from_assign = {node.varying_var: [from_asgn]} merged = {} for d in (from_assign, sp_assign): for k, v in d.items(): merged.setdefault(k, []).extend(v if isinstance(v, list) else [v]) sp_assign = merged for constraints, decision in sets: if not decision: paths.append((list(constraints) + sp_cons, sp_assign)) for constraints, decision in sets: if decision: paths.append((list(constraints), {})) if paths: return paths return [([], {})] elif isinstance(node, CallNode): return [([], {})] elif isinstance(node, ExitNode): return [([_STOP], {})] elif isinstance(node, GoTo): paths = enum_paths(node.body_seq, fields) return [([_STOP] + c, a) for c, a in paths] return [([], {})] # ── 值生成 ── def seq_numeric(seq_num: int, total_digits: int) -> str: val = seq_num % (10 ** total_digits) if val == 0: val = 10 ** total_digits - 1 return str(val).zfill(total_digits) def seq_alpha(seq_num: int, length: int) -> str: letter = chr(65 + (seq_num - 1) % 26) return letter * length def seq_date(seq_num: int) -> str: from datetime import datetime, timedelta base = datetime(2000, 1, 1) d = base + timedelta(days=seq_num - 1) return d.strftime('%Y%m%d') def _is_date_field(name: str) -> bool: patterns = [r'DATE', r'YYMMDD', r'YYYYMM', r'YEAR', r'MONTH', r'DAY'] for p in patterns: if re.search(p, name.upper()): return True return False _SPECIAL_VALUES = { 'ZERO': '0', 'ZEROS': '0', 'ZEROES': '0', 'SPACE': ' ', 'SPACES': ' ', 'HIGH-VALUE': '\xff', 'HIGH-VALUES': '\xff', 'LOW-VALUE': '\x00', 'LOW-VALUES': '\x00', 'QUOTE': "'", 'QUOTES': "'", 'ALL': '', } def _apply_value(field: dict, rec: dict) -> bool: """尝试应用 VALUE 子句的初始值。返回 True 表示已处理。""" raw = field.get('value') if raw is None: return False val = str(raw).strip("'\"").strip() name = field['name'] pi = field.get('pic_info', {}) # 处理 COBOL 特殊值 if val.upper() in _SPECIAL_VALUES: val = _SPECIAL_VALUES[val.upper()] ftype = pi.get('type', 'unknown') if ftype == 'numeric': digits = pi.get('digits', 0) + pi.get('decimal', 0) if digits: rec[name] = val.zfill(digits) else: rec[name] = val else: length = pi.get('length', 0) or 1 rec[name] = val.ljust(length)[:length] return True def _children_of(group_name: str, fields: list) -> list: """返回组项目 group_name 在 fields 中的直属子字段列表(按声明顺序)。 终止条件:遇到同/更高级别(sibling/组边界)或 77 级(独立字段)。 """ result = [] group_level = None found = False for f in fields: if not found and f['name'] == group_name: group_level = f['level'] found = True continue if found: if f['level'] <= group_level or f['level'] == 77: break # 88-level 是条件名,不计为子字段 if f.get('is_88'): continue result.append(f) return result def _make_numeric_value(idx: int, record_num: int, total_digits: int) -> str: for step in (100, 10, 1): val = idx * step + record_num if val < 10 ** total_digits: return str(val).zfill(total_digits) return str(record_num).zfill(total_digits) def _make_alpha_value(idx: int, record_num: int, length: int) -> str: if length == 1: ch = chr(65 + (idx + record_num - 2) % 26) return ch letter = chr(65 + (idx - 1) % 26) return letter + str(record_num).zfill(length - 1) def make_base_record(seq_num: int, fields: list) -> dict: rec = {} redefines_map = {} # 标量 REDEFINES: parent_name → [child_names] group_redefines = [] # 组 REDEFINES: [(redef_name, target_name)] filler_key_counter = 0 numeric_idx = 0 alpha_idx = 0 record_num = seq_num for f in fields: name = f['name'] if f.get('is_88'): continue if f.get('redefines'): parent = f['redefines'] if f.get('pic'): # 标量 REDEFINES(有 PIC,如 WS-AMOUNT-DISP REDEFINES WS-AMOUNT PIC X(9)) redefines_map.setdefault(parent, []).append(name) continue else: # 组 REDEFINES(无 PIC,如 CUST-ADDR2 REDEFINES CUST-ADDR) group_redefines.append((name, parent)) # 不 continue — 组本身无 PIC 会在下方"组项目跳过"处理 # 其子字段作为独立字段正常走循环 if f.get('is_filler'): if name in rec: filler_key_counter += 1 name = f'FILLER_{filler_key_counter + 1}' rec[name] = 'x' * (f.get('pic_info', {}).get('length', 0) or 1) continue # Pass 0: VALUE 子句初始值优先 if _apply_value(f, rec): continue # 组项目(无 PIC)跳过 if not f.get('pic'): continue pi = f.get('pic_info', {}) ftype = pi.get('type', 'unknown') digits = pi.get('digits', 0) decimal = pi.get('decimal', 0) length = pi.get('length', 0) if ftype == 'numeric': if _is_date_field(name): rec[name] = seq_date(record_num) else: numeric_idx += 1 rec[name] = _make_numeric_value(numeric_idx, record_num, digits + decimal) elif ftype in ('alphanumeric', 'alphabetic'): alpha_idx += 1 rec[name] = _make_alpha_value(alpha_idx, record_num, length or 1) elif ftype == 'numeric-edited': numeric_idx += 1 raw = _make_numeric_value(numeric_idx, record_num, digits + decimal) rec[name] = raw.rjust(length) else: alpha_idx += 1 rec[name] = _make_alpha_value(alpha_idx, record_num, 8) # Pass 2a: 标量 REDEFINES 复制 for parent_name, child_names in redefines_map.items(): if parent_name in rec: for child_name in child_names: rec[child_name] = rec[parent_name] # Pass 2b: 组 REDEFINES 按位置递归复制子字段 for redef_name, target_name in group_redefines: redef_kids = _children_of(redef_name, fields) tgt_kids = _children_of(target_name, fields) tgt_idx = 0 for i, rk in enumerate(redef_kids): if tgt_idx >= len(tgt_kids): break if i == len(redef_kids) - 1 and len(redef_kids) < len(tgt_kids): # 最后一个 REDEFINES 子字段,且目标更多 → 拼接剩余所有目标值 parts = [rec.get(tk['name'], '') for tk in tgt_kids[tgt_idx:]] rec[rk['name']] = ''.join(parts) elif i == len(redef_kids) - 1 and len(redef_kids) > len(tgt_kids): # REDEFINES 子字段更多 → 最后一个 REDEFINES 子字段取最后目标值 rec[rk['name']] = rec.get(tgt_kids[-1]['name'], '') else: rec[rk['name']] = rec.get(tgt_kids[tgt_idx]['name'], '') tgt_idx += 1 return rec # ── 约束应用 ── def _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields): """检查 field_name 当前值是否满足该约束。满足返回 True。""" for f in fields: if f['name'] == field_name: pi = f.get('pic_info', {}) ftype = pi.get('type', 'unknown') val = rec.get(field_name) if val is None: return False if operator == 'not_in': cases = value if isinstance(value, list) else [] return str(val) not in cases if ftype == 'numeric': try: num_val = int(float(str(val))) num_target = int(float(str(value))) except (ValueError, TypeError): return False if operator in ('>=', '>', '<', '<=', '=', '<>'): if operator == '>=': ok = num_val >= num_target elif operator == '>': ok = num_val > num_target elif operator == '<': ok = num_val < num_target elif operator == '<=': ok = num_val <= num_target elif operator == '=': ok = num_val == num_target elif operator == '<>': ok = num_val != num_target return ok == want_true return True else: s_val = str(val).strip().upper() s_target = str(value).strip().upper() eq = s_val == s_target if operator == '=': return eq == want_true elif operator == '<>': return (not eq) == want_true return True return False _ARITH_BOUNDS = { 'left_big_ops': {'>', '>=', '<>'}, 'left_small_ops': {'<', '<='}, } def _arith_pic_info(field_name, fields): for f in fields: if f['name'] == field_name.upper(): return f.get('pic_info', {}) return {} def _arith_numeric_pick(field_name, want_big, fields): """为字段选一个大值或小值,返回字符串。""" pi = _arith_pic_info(field_name, fields) if pi.get('type') != 'numeric': return None digits = pi.get('digits', 0) decimal = pi.get('decimal', 0) total = digits + decimal max_val = 10 ** total - 1 if want_big: pick = int(max_val * 0.7) else: pick = 1 int_part = str(pick // (10 ** decimal)).zfill(digits) dec_part = str(pick % (10 ** decimal)).zfill(decimal) if decimal == 0: return int_part return int_part + dec_part def _apply_arith_constraint(rec, field_name, operator, value, want_true, fields): """对算术表达式条件进行字段值 steering。 例如 A + B > C (want_true=True): - 左值字段(A, B)设大 → 右值字段(C)设小 例如 A + B <= C (want_true=True): - 左值字段设小 → 右值字段设大 这是启发式 steering,不是精确求解。 主要目标是保证分支可达,不保证边界值精确。 """ # 1. 提取左值表达式中的所有字段名(大写) tokens = re.findall(r'\b[A-Z][A-Z0-9-]*(?:\([^)]*\))?\b', field_name.upper()) left_fields = [t for t in tokens if any(f['name'] == t for f in fields)] # 2. 右值是否也为字段 right_field = value if any(f['name'] == value for f in fields) else None if not left_fields: logger.debug(f"算术表达式无法提取字段: {field_name}") return # 3. 确定方向:want_true 时左值应大还是小 if operator in _ARITH_BOUNDS['left_big_ops']: left_big = want_true elif operator in _ARITH_BOUNDS['left_small_ops']: left_big = not want_true else: left_big = want_true # 4. 设置左值字段 for lf in left_fields: pick = _arith_numeric_pick(lf, left_big, fields) if pick is not None: rec[lf] = pick # 5. 设置右值字段(如果有) if right_field: pick = _arith_numeric_pick(right_field, not left_big, fields) if pick is not None: rec[right_field] = pick def apply_constraint(rec, field_name, operator, value, want_true, fields, assignments=None, path_assign=None): # 标准化字段名:去除括号内空格(WS-CELL ( 1, 1 ) → WS-CELL(1,1)) field_name = re.sub(r'\s*([(),])\s*', r'\1', field_name) # 变量下标解析:WS-FIXED-VALUE(WS-IDX) → WS-FIXED-VALUE(1) vm = re.match(r'^(\w[\w-]*)\((\w[\w-]*)\)$', field_name) if vm: base_var, subscript_var = vm.groups() if subscript_var in rec: try: resolved_name = f'{base_var}({int(rec[subscript_var])})' if any(f['name'] == resolved_name for f in fields): apply_constraint(rec, resolved_name, operator, value, want_true, fields, assignments, path_assign) return except (ValueError, TypeError): pass # 下标传播:无下标约束 → 应用到所有下标变体 base = _basename(field_name) subscripted = [f for f in fields if f['name'] != base and _basename(f['name']) == base] if subscripted and field_name == base: for sf in subscripted: apply_constraint(rec, sf['name'], operator, value, want_true, fields, assignments, path_assign) return # REDEFINES 字段的约束重定向到父字段(共享存储) for f in fields: if f['name'] == field_name: if f.get('is_filler'): return if f.get('redefines'): parent_name = f['redefines'] logger.debug(f"REDEFINES 约束重定向: {field_name} → {parent_name}") apply_constraint(rec, parent_name, operator, value, want_true, fields, assignments, path_assign) return break if assignments: root_var, chain = trace_to_root(field_name, assignments, fields, path_assign) if root_var != field_name: new_field_name, new_op, new_val = invert_through_chain(root_var, chain, operator, value) if any(f['name'] == new_field_name for f in fields): field_name, operator, value = new_field_name, new_op, new_val # 如果当前值已满足该约束,跳过覆盖(保持先前约束的一致性) if _check_constraint_satisfied(rec, field_name, operator, value, want_true, fields): return if operator == 'not_in': for f in fields: if f['name'] == field_name: pi = f.get('pic_info', {}) cases = value if isinstance(value, list) else [] ftype = pi.get('type', 'unknown') if ftype in ('alphanumeric', 'alphabetic'): for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ': if c not in cases: rec[field_name] = c.ljust(pi.get('length', 1), c) return else: for n in range(1, 100): if str(n) not in cases: rec[field_name] = str(n).zfill(pi.get('digits', 0) + pi.get('decimal', 0)) return return # 字段间比较(值侧也是字段名) if any(f['name'] == value for f in fields): if re.search(r'[+\-*/]', field_name): _apply_arith_constraint(rec, field_name, operator, value, want_true, fields) else: logger.debug(f"字段间比较约束跳过:{field_name} {operator} {value}") return for f in fields: if f['name'] == field_name: pi = f.get('pic_info', {}) val = satisfying_value(pi, operator, value, want_true) rec[field_name] = val return # ── 记录生成入口 ── def sync_redefined_fields(rec, fields): """赋值/约束后同步 REDEFINES 字段:父字段的值拷贝到所有 REDEFINES 子字段。""" redefines_map = {} group_redefines = [] for f in fields: if f.get('is_88') or f.get('is_filler'): continue if f.get('redefines') and f.get('pic'): redefines_map.setdefault(f['redefines'], []).append(f['name']) elif f.get('redefines') and not f.get('pic'): group_redefines.append((f['name'], f['redefines'])) for parent_name, child_names in redefines_map.items(): if parent_name in rec: for child_name in child_names: rec[child_name] = rec[parent_name] for redef_name, target_name in group_redefines: redef_kids = _children_of(redef_name, fields) tgt_kids = _children_of(target_name, fields) tgt_idx = 0 for i, rk in enumerate(redef_kids): if tgt_idx >= len(tgt_kids): break if i == len(redef_kids) - 1 and len(redef_kids) < len(tgt_kids): parts = [rec.get(tk['name'], '') for tk in tgt_kids[tgt_idx:]] rec[rk['name']] = ''.join(parts) elif i == len(redef_kids) - 1 and len(redef_kids) > len(tgt_kids): rec[rk['name']] = rec.get(tgt_kids[-1]['name'], '') else: rec[rk['name']] = rec.get(tgt_kids[tgt_idx]['name'], '') tgt_idx += 1 def apply_occurs_depending(rec, fields): """根据 OCCURS DEPENDING ON 变量的当前值,清零超范围的下标字段。""" for f in fields: dep_var = f.get('occurs_depending') if not dep_var: continue name = f['name'] m = re.search(r'\((\d+)\)$', name) if not m: continue sub = int(m.group(1)) max_val = int(rec.get(dep_var, 0)) if sub <= max_val: continue pi = f.get('pic_info', {}) ftype = pi.get('type', 'unknown') length = pi.get('length', 0) or 1 if ftype == 'numeric': rec[name] = '0' * (pi.get('digits', 0) + pi.get('decimal', 0)) elif ftype in ('alphanumeric', 'alphabetic'): rec[name] = ' ' * length else: rec[name] = '0' * length def _non_match_for(cond_leaf, fields): if not fields or not cond_leaf: return None base = re.sub(r'\s*\(.*?\)\s*$', '', cond_leaf.field) for f in fields: if re.sub(r'\s*\(.*?\)\s*$', '', f['name']) == base: pic = f.get('pic_info', {}) if pic.get('type') == 'numeric': return '0' return ' ' return None def _enum_search_paths(node, fields): # 从条件字段名推断 OCCURS 数;如 WS-CODE-VAL(WS-IDX) → 查 WS-CODE-VAL(j) 最大 j occurs_count = 1 if node.when_list and node.cond_trees and node.cond_trees[0]: ct = node.cond_trees[0] if isinstance(ct, CondLeaf): base = re.sub(r'\s*\(.*?\)\s*$', '', ct.field) for f in fields: m = re.match(rf'^{re.escape(base)}\((\d+)\)$', f['name']) if m: occurs_count = max(occurs_count, int(m.group(1))) if occurs_count <= 1: # 再查父组名下各字段的后缀 parent = node.table_name for f in fields: m = re.match(rf'^{re.escape(parent)}\((\d+)\)$', f['name']) if m: occurs_count = max(occurs_count, int(m.group(1))) paths = [] for i, (cond_text, body_seq) in enumerate(node.when_list): cond_tree = node.cond_trees[i] if i < len(node.cond_trees) else None sub = _cap_paths(enum_paths(body_seq, fields)) if not sub: sub = [([], {})] extra_assign = {} if cond_tree and isinstance(cond_tree, CondLeaf): base = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field) matching_val = cond_tree.value elem_key = f'{base}({i + 1})' extra_assign[elem_key] = [{'type': 'move_literal', 'literal': matching_val}] non_match = _non_match_for(cond_tree, fields) or ' ' for j in range(i): prev_key = f'{base}({j + 1})' extra_assign[prev_key] = [{'type': 'move_literal', 'literal': non_match}] for sp_cons, sp_assign in (sub or [([], {})]): merged_assign = dict(extra_assign) for k, v in sp_assign.items(): merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v]) paths.append((sp_cons, merged_assign)) if node.has_at_end: sub = _cap_paths(enum_paths(node.at_end_seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): extra_assign = {} non_match = ' ' if node.when_list: ct = node.cond_trees[0] if ct and isinstance(ct, CondLeaf): non_match = _non_match_for(ct, fields) or ' ' base = re.sub(r'\s*\(.*?\)\s*$', '', ct.field) for j in range(max(occurs_count, 1)): extra_assign[f'{base}({j + 1})'] = [{'type': 'move_literal', 'literal': non_match}] merged_assign = dict(extra_assign) for k, v in sp_assign.items(): merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v]) paths.append((sp_cons, merged_assign)) return paths def generate_records(branch_paths_with_assigns, data_fields, base_assignments=None, file_sec=None): """生成测试数据记录。 branch_paths_with_assigns: list of (constraints, path_assignments). base_assignments: 全局 assignments dict (用于 trace_to_root). 返回: (records, kept_path_cons) — kept_path_cons 是与 records 一一对应的约束。 """ records = [] kept_path_cons = [] if branch_paths_with_assigns: for seq, (path_cons, path_assign) in enumerate(branch_paths_with_assigns, start=1): path_cons = _filter_stop(path_cons) rec = make_base_record(seq, data_fields) # Pass A: 先传播赋值(MOVE/COMPUTE/READ INTO 等),模拟到决策点前的程序状态 if isinstance(path_assign, dict): propagate_assignments(rec, path_assign, data_fields, file_sec=file_sec) # Pass A.5: 检查约束是否经过链追溯到字面量截断(不可能路径) skip_impossible = False if base_assignments and isinstance(path_assign, dict): for c in path_cons: if len(c) == 4 and not skip_impossible: field, op, val, want = c root_var, chain = trace_to_root(field, base_assignments, data_fields, path_assign) if root_var != field: new_fn, new_op, new_val = invert_through_chain(root_var, chain, op, val) if any(f['name'] == new_fn for f in data_fields): asgn_val = path_assign.get(root_var) if asgn_val is not None: asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val] if asgn_list and asgn_list[-1]['type'] == 'move_literal' and root_var in rec: if not _check_constraint_satisfied(rec, root_var, new_op, new_val, want, data_fields): skip_impossible = True break if skip_impossible: continue # Pass B: 约束覆盖(确保决策条件满足,覆盖 MOVE 带来的值) for c in path_cons: if len(c) == 4: field, op, val, want = c apply_constraint(rec, field, op, val, want, data_fields, base_assignments, path_assign) # Pass B.5: 前向再传播变量间MOVE,保持约束修改后的链一致性 if isinstance(path_assign, dict): forward = {} for tgt, asgn_val in path_assign.items(): asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val] filtered = [a for a in asgn_list if a['type'] == 'move' and a.get('source_vars')] if filtered: forward[tgt] = filtered if forward: propagate_assignments(rec, forward, data_fields, file_sec=file_sec) # Pass C: 同步 REDEFINES(确保共享存储一致) sync_redefined_fields(rec, data_fields) # Pass D: OCCURS DEPENDING ON — 清零超范围的下标字段 apply_occurs_depending(rec, data_fields) records.append(rec) kept_path_cons.append(path_cons) if not records: rec = make_base_record(1, data_fields) if base_assignments: propagate_assignments(rec, base_assignments, data_fields, file_sec=file_sec) records.append(rec) kept_path_cons.append([]) return records, kept_path_cons