feat: complete INSPECT/SEARCH support, fix PERFORM/EVAL coverage marking

- Add INSPECT (TALLYING/REPLACING/CONVERTING) with BEFORE/AFTER INITIAL
- Add SEARCH/SEARCH ALL with element-assignment path enumeration
- Fix _mark_perform compound condition marking via evaluate_tree
- Fix EVALUATE TRUE prior_false to collect all MC/DC false sets
- Add impossible path filtering (Pass A.5) with trace-to-root conflict detection
- Fix multi-line PERFORM VARYING parsing (VARYING/FROM/BY/UNTIL on separate lines)
- Remove dead code: agents.py LLM parser (replaced by rule-based _BrParser)
- 59 unit tests passing, 5 integration programs verified
This commit is contained in:
hangshuo652
2026-06-10 22:56:22 +08:00
parent 0730045e27
commit 7ac887c776
9 changed files with 509 additions and 1005 deletions
+180 -61
View File
@@ -1,20 +1,15 @@
"""设计层:路径枚举 + 值生成 + 约束应用"""
import os
import re
import logging
from . import agents, CONFIG
from .models import BrSeq, BrIf, BrEval, BrPerform, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo
from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, mcdc_sets, satisfying_value
from .core import trace_to_root, invert_through_chain, propagate_assignments, _basename
logger = logging.getLogger(__name__)
_STOP = ('__STOP__', '', None, True)
_MAX_PATHS = 5000
_FALLBACK_MAX_PATHS = 100
_ACTIVE_MAX_PATHS = _MAX_PATHS
_LLM_FAILED = False
_MAX_PATHS = 10000
def _filter_stop(cons):
@@ -22,46 +17,51 @@ def _filter_stop(cons):
def _cap_paths(paths):
if len(paths) > _ACTIVE_MAX_PATHS:
return paths[:_ACTIVE_MAX_PATHS]
if len(paths) > _MAX_PATHS:
return paths[:_MAX_PATHS]
return paths
def _cap_paths_fair(new_active, child_paths):
    """Truncate ``new_active`` to at most ``_MAX_PATHS`` entries, fairly.

    Two-phase truncation: first keep one child path per predecessor (while
    quota lasts), then fill whatever quota is left with the remaining
    combinations in their original order.

    Args:
        new_active: list of ``(constraints, assignments)`` pairs.
        child_paths: the child path list that was combined with the
            predecessors; only its length ``k`` is used.

    Returns:
        ``new_active`` itself when it already fits, otherwise a list of at
        most ``_MAX_PATHS`` entries.  Paths whose constraint list contains
        the ``_STOP`` sentinel (identity comparison) are placed first.

    NOTE(review): the phase-1 index arithmetic assumes the non-STOP entries
    are laid out predecessor-major with exactly ``k`` children each; once
    STOP paths are filtered out that layout may no longer hold exactly --
    confirm against the caller.
    """
    if len(new_active) <= _MAX_PATHS:
        return new_active
    k = len(child_paths)
    if k <= 1:
        return new_active[:_MAX_PATHS]

    # Split off STOP paths in a single pass: they do not take part in the
    # predecessor/child combination and are kept as they are.
    stop_paths, combined = [], []
    for entry in new_active:
        constraints = entry[0]
        if any(c is _STOP for c in constraints):
            stop_paths.append(entry)
        else:
            combined.append(entry)

    result = stop_paths
    budget = _MAX_PATHS - len(result)
    if budget <= 0:
        # STOP paths alone already reach the cap.
        return result[:_MAX_PATHS]

    n_pred = len(combined) // k
    if n_pred <= 1:
        result.extend(combined[:budget])
        return result

    # Phase 1: one child per predecessor, rotating the child index so that
    # different predecessors keep different children.
    picked = [p_idx * k + p_idx % k for p_idx in range(min(n_pred, budget))]
    result.extend(combined[idx] for idx in picked)
    budget -= len(picked)

    # Phase 2: spend the remaining quota on the combinations not yet taken.
    if budget > 0:
        taken = set(picked)
        leftovers = [entry for idx, entry in enumerate(combined) if idx not in taken]
        result.extend(leftovers[:budget])
    return result
# ── 路径枚举 ──
def _try_llm_enum_paths(node, fields):
    """Try to generate all paths through the LLM backend.

    Returns the value of ``agents.llm_generate_all_paths(node, fields)`` when
    it is not ``None``; returns ``None`` in every other case so the caller can
    fall back to the rule engine:

    * the module-level circuit breaker ``_LLM_FAILED`` is already set,
    * ``CONFIG["llm_generator"]`` is falsy (defaults to enabled),
    * the environment variable named by ``agents.DEEPSEEK_API_KEY_ENV`` is
      unset or empty,
    * the backend returned ``None`` or raised.

    Any exception raised by the backend call sets ``_LLM_FAILED`` so later
    calls skip the LLM entirely.
    """
    global _LLM_FAILED
    if _LLM_FAILED:
        logger.debug("断路器已跳,跳过 LLM")
        return None
    if not CONFIG.get("llm_generator", True):
        logger.debug("llm_generator 已关闭,降级规则引擎")
        return None
    if not os.environ.get(agents.DEEPSEEK_API_KEY_ENV):
        logger.warning("DEEPSEEK_API_KEY 未设置,降级规则引擎")
        return None
    try:
        result = agents.llm_generate_all_paths(node, fields)
        if result is not None:
            logger.info(f"LLM 路径生成成功,{len(result)}")
            return result
        logger.warning("LLM 返回空,降级规则引擎")
    except Exception as e:
        # Deliberate catch-all at the fallback boundary.  logger.exception
        # keeps the traceback, which is the only diagnostic left once the
        # breaker has tripped and the LLM is skipped for good.
        logger.exception(f"LLM API 调用异常: {e}")
        _LLM_FAILED = True
    return None
def enum_paths(node, fields):
global _ACTIVE_MAX_PATHS
# === LLM 优先(整体替换整个树的路径生成) ===
llm_result = _try_llm_enum_paths(node, fields)
if llm_result is not None:
_ACTIVE_MAX_PATHS = _MAX_PATHS
return llm_result
if _ACTIVE_MAX_PATHS == _MAX_PATHS:
logger.warning("降级到规则引擎(路径上限 5000 → 100)")
_ACTIVE_MAX_PATHS = _FALLBACK_MAX_PATHS
"""枚举路径,每条路径返回 (constraints, assignments).
返回 list[tuple[list[tuple], dict]].
"""
@@ -86,7 +86,7 @@ def enum_paths(node, fields):
merged.setdefault(k, []).extend(v if isinstance(v, list) else [v])
merged_cons = p_cons + list(cp_cons)
new_active.append((merged_cons, merged))
paths = _cap_paths(new_active)
paths = _cap_paths_fair(new_active, child_paths)
return paths
elif isinstance(node, BrIf):
@@ -170,49 +170,56 @@ def enum_paths(node, fields):
return paths
if node.subject == 'TRUE':
paths = []
prior_false = []
prior_false_sets = [] # list[list[Constraint]]
for value, seq in node.when_list:
cond = parse_compound_condition(value, fields)
if cond and isinstance(cond, CondLeaf) and is_field(cond.field, fields):
# Simple condition
sub = _cap_paths(enum_paths(seq, fields))
for sp_cons, sp_assign in (sub or [([], {})]):
constraints = list(prior_false)
constraints = [c for pf in prior_false_sets for c in pf]
constraints.append((cond.field, cond.op, cond.value, True))
paths.append((constraints + sp_cons, sp_assign))
prior_false.append((cond.field, cond.op, cond.value, False))
prior_false_sets.append([(cond.field, cond.op, cond.value, False)])
elif cond:
# Compound condition — use MC/DC for path generation
leaves = collect_leaves(cond)
if leaves and all(is_field(l.field, fields) for l in leaves):
sets = mcdc_sets(cond, fields)
if sets:
sub = _cap_paths(enum_paths(seq, fields))
false_set = None
new_false_sets = []
for cs, decision in sets:
if decision:
for sp_cons, sp_assign in (sub or [([], {})]):
paths.append((list(prior_false) + list(cs) + sp_cons, sp_assign))
elif false_set is None:
false_set = cs
if false_set is not None:
prior_false.extend(false_set)
else:
prior_false = []
if not prior_false_sets:
for sp_cons, sp_assign in (sub or [([], {})]):
paths.append((list(cs) + sp_cons, sp_assign))
else:
for pf_set in prior_false_sets:
for sp_cons, sp_assign in (sub or [([], {})]):
paths.append((list(pf_set) + list(cs) + sp_cons, sp_assign))
else:
new_false_sets.append(cs)
if not new_false_sets:
prior_false_sets = []
break
combined = []
for pf_set in prior_false_sets:
for nf_set in new_false_sets:
combined.append(list(pf_set) + list(nf_set))
prior_false_sets = combined
else:
prior_false = []
prior_false_sets = []
break
else:
prior_false = []
prior_false_sets = []
break
else:
prior_false = []
prior_false_sets = []
break
if node.has_other:
sub = _cap_paths(enum_paths(node.other_seq, fields))
for sp_cons, sp_assign in (sub or [([], {})]):
paths.append((list(prior_false) + sp_cons, sp_assign))
constraints = [c for pf in prior_false_sets for c in pf]
paths.append((constraints + sp_cons, sp_assign))
return paths
if not is_field(node.subject, fields):
return [([], {})]
@@ -228,6 +235,9 @@ def enum_paths(node, fields):
paths.append(([(node.subject, 'not_in', case_vals, True)] + sp_cons, sp_assign))
return paths
elif isinstance(node, BrSearch):
return _enum_search_paths(node, fields)
elif isinstance(node, BrPerform):
if node.perf_type in ('para', 'thru'):
if node.body_seq:
@@ -743,12 +753,90 @@ def apply_occurs_depending(rec, fields):
rec[name] = '0' * length
def _non_match_for(cond_leaf, fields):
if not fields or not cond_leaf:
return None
base = re.sub(r'\s*\(.*?\)\s*$', '', cond_leaf.field)
for f in fields:
if re.sub(r'\s*\(.*?\)\s*$', '', f['name']) == base:
pic = f.get('pic_info', {})
if pic.get('type') == 'numeric':
return '0'
return ' '
return None
def _enum_search_paths(node, fields):
    """Enumerate paths through a SEARCH node via element assignments.

    For the i-th WHEN clause whose condition tree is a ``CondLeaf``, the
    path's assignments set element ``i + 1`` of the condition field to the
    condition's value and elements ``1..i`` to a non-matching filler.  When
    ``node.has_at_end`` is set, one more group of paths is produced in which
    every element ``1..occurs_count`` holds the filler.

    Args:
        node: SEARCH node; reads ``when_list`` (pairs of condition text and
            body sequence), ``cond_trees``, ``table_name``, ``has_at_end``
            and ``at_end_seq``.
        fields: list of field dicts with a ``'name'`` key.

    Returns:
        list of ``(constraints, assignments)`` tuples, where ``assignments``
        maps a field name to a list of assignment dicts.
    """
    subscript = re.compile(r'\s*\(.*?\)\s*$')
    when_list = node.when_list or []
    cond_trees = node.cond_trees or []

    def _highest_index(prefix):
        # Largest j for which a field literally named "<prefix>(j)" exists.
        pattern = re.compile(rf'^{re.escape(prefix)}\((\d+)\)$')
        hits = (pattern.match(f['name']) for f in (fields or ()))
        return max((int(m.group(1)) for m in hits if m), default=0)

    def _literal(value):
        return [{'type': 'move_literal', 'literal': value}]

    def _merge(extra, sp_assign):
        # Copy each list so that extending it for one sub-path cannot leak
        # into the other sub-paths that start from the same ``extra``.
        merged = {key: list(ops) for key, ops in extra.items()}
        for key, ops in sp_assign.items():
            merged.setdefault(key, []).extend(ops if isinstance(ops, list) else [ops])
        return merged

    # Only a simple (leaf) first condition is used to infer the table field.
    first_tree = cond_trees[0] if (when_list and cond_trees) else None
    first_leaf = first_tree if isinstance(first_tree, CondLeaf) else None

    # Infer the OCCURS count from the condition field name, e.g. for
    # WS-CODE-VAL(WS-IDX) look for the largest j with a WS-CODE-VAL(j) field.
    # The count is only consumed by the AT END branch below, which itself
    # requires a leaf first condition.
    occurs_count = 1
    if first_leaf is not None:
        occurs_count = max(occurs_count, _highest_index(subscript.sub('', first_leaf.field)))
        if occurs_count <= 1 and node.table_name:
            # Fall back to subscripted fields named after the table itself.
            occurs_count = max(occurs_count, _highest_index(node.table_name))

    paths = []
    for i, (_cond_text, body_seq) in enumerate(when_list):
        cond_tree = cond_trees[i] if i < len(cond_trees) else None
        sub = _cap_paths(enum_paths(body_seq, fields)) or [([], {})]
        extra_assign = {}
        if isinstance(cond_tree, CondLeaf):
            base = subscript.sub('', cond_tree.field)
            # Element i+1 matches this WHEN; every earlier element must not.
            extra_assign[f'{base}({i + 1})'] = _literal(cond_tree.value)
            non_match = _non_match_for(cond_tree, fields) or ' '
            for j in range(i):
                extra_assign[f'{base}({j + 1})'] = _literal(non_match)
        for sp_cons, sp_assign in sub:
            paths.append((sp_cons, _merge(extra_assign, sp_assign)))

    if node.has_at_end:
        sub = _cap_paths(enum_paths(node.at_end_seq, fields)) or [([], {})]
        # AT END: no element may match, so fill the whole table with filler.
        extra_assign = {}
        if first_leaf is not None:
            non_match = _non_match_for(first_leaf, fields) or ' '
            base = subscript.sub('', first_leaf.field)
            for j in range(max(occurs_count, 1)):
                extra_assign[f'{base}({j + 1})'] = _literal(non_match)
        for sp_cons, sp_assign in sub:
            paths.append((sp_cons, _merge(extra_assign, sp_assign)))
    return paths
def generate_records(branch_paths_with_assigns, data_fields, base_assignments=None, file_sec=None):
"""生成测试数据记录。
branch_paths_with_assigns: list of (constraints, path_assignments).
base_assignments: 全局 assignments dict (用于 trace_to_root).
返回: (records, kept_path_cons) — kept_path_cons 是与 records 一一对应的约束。
"""
records = []
kept_path_cons = []
if branch_paths_with_assigns:
for seq, (path_cons, path_assign) in enumerate(branch_paths_with_assigns, start=1):
path_cons = _filter_stop(path_cons)
@@ -756,20 +844,51 @@ def generate_records(branch_paths_with_assigns, data_fields, base_assignments=No
# Pass A: 先传播赋值(MOVE/COMPUTE/READ INTO 等),模拟到决策点前的程序状态
if isinstance(path_assign, dict):
propagate_assignments(rec, path_assign, data_fields, file_sec=file_sec)
# Pass A.5: 检查约束是否经过链追溯到字面量截断(不可能路径)
skip_impossible = False
if base_assignments and isinstance(path_assign, dict):
for c in path_cons:
if len(c) == 4 and not skip_impossible:
field, op, val, want = c
root_var, chain = trace_to_root(field, base_assignments, data_fields, path_assign)
if root_var != field:
new_fn, new_op, new_val = invert_through_chain(root_var, chain, op, val)
if any(f['name'] == new_fn for f in data_fields):
asgn_val = path_assign.get(root_var)
if asgn_val is not None:
asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
if asgn_list and asgn_list[-1]['type'] == 'move_literal' and root_var in rec:
if not _check_constraint_satisfied(rec, root_var, new_op, new_val, want, data_fields):
skip_impossible = True
break
if skip_impossible:
continue
# Pass B: 约束覆盖(确保决策条件满足,覆盖 MOVE 带来的值)
for c in path_cons:
if len(c) == 4:
field, op, val, want = c
apply_constraint(rec, field, op, val, want, data_fields, base_assignments, path_assign)
# Pass B.5: 前向再传播变量间MOVE,保持约束修改后的链一致性
if isinstance(path_assign, dict):
forward = {}
for tgt, asgn_val in path_assign.items():
asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val]
filtered = [a for a in asgn_list if a['type'] == 'move' and a.get('source_vars')]
if filtered:
forward[tgt] = filtered
if forward:
propagate_assignments(rec, forward, data_fields, file_sec=file_sec)
# Pass C: 同步 REDEFINES(确保共享存储一致)
sync_redefined_fields(rec, data_fields)
# Pass D: OCCURS DEPENDING ON — 清零超范围的下标字段
apply_occurs_depending(rec, data_fields)
records.append(rec)
kept_path_cons.append(path_cons)
if not records:
rec = make_base_record(1, data_fields)
if base_assignments:
propagate_assignments(rec, base_assignments, data_fields, file_sec=file_sec)
records.append(rec)
return records
kept_path_cons.append([])
return records, kept_path_cons