e2a8d53e60
## 修复内容 ### C1: _mark_eval 反向操作符 (coverage.py) - EVALUATE 约束匹配支持 操作符 - WHEN OTHER 的自动检测(全部 WHEN 被否定时) ### C2: _mark_perform 反向操作符 (coverage.py) - PERFORM 同 _mark_if 的反向操作符匹配 - PERFORM UNTIL 条件截断后桥接器通过 branch_names 识别类型 ### H1: parse_single_condition 传递 fields (coverage.py) - collect_decision_points 调用时传 fields 参数 - NOT 前缀条件解析 (NOT WS-X > 50 → WS-X <= 50) ### H4: generate_data 输入约束 (__init__.py) - 文档注明接收原始源码,非预处理后文本 ### M1: not_map break (cond.py) - NOT 操作符映射循环添加 break ## 覆盖测试结果 - IF: 100% (T/F) - NOT IF: 100% (NOT_TRUE/NOT_FALSE) - PERFORM UNTIL: 100% (ENTER/SKIP) - EVALUATE: 100% (4 WHENs) - Nested IF: 100% (4 branches) - S15 回归: 17/17 PASS Co-Authored-By: Claude <noreply@anthropic.com>
173 lines
5.7 KiB
Python
173 lines
5.7 KiB
Python
"""Bridge: procedure_parser -> BrSeq/BrIf/BrEval tree pipeline integration.
|
|
|
|
Primary: new procedure_parser (fast, deterministic, no path explosion).
|
|
Fallback: old BrParser (timeout-guarded for programs new parser can't handle).
|
|
"""
|
|
|
|
from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, GoTo
|
|
from .procedure_parser import extract_branch_tree as new_parse, BranchNode
|
|
|
|
|
|
def build_branch_tree_fallback(proc_text, fields=None):
    """Build a branch tree with the new parser, falling back to the old one.

    Args:
        proc_text: Procedure text handed unchanged to whichever parser runs.
        fields: Optional field information, forwarded unchanged to the parsers.

    Returns:
        A ``(tree, assigns)`` pair. If the new parser and the model conversion
        succeed, their result is returned (with an empty assigns dict when only
        the assigns conversion failed). Otherwise the old parser's result is
        returned if it finished without error within 3 seconds and produced a
        non-``None`` tree. Failing both, ``(BrSeq(), {})`` is returned.
    """
    # Imported up front, exactly like the original: an import failure here
    # propagates to the caller even when the new parser would have succeeded.
    from .core import build_branch_tree as old_build

    # --- Stage 1: new parser -------------------------------------------
    primary_tree = None
    primary_assigns = {}
    try:
        parsed_root, raw_assigns = new_parse(proc_text, fields)
        primary_tree = _convert_to_model(parsed_root)
        primary_assigns = _assigns_list_to_dict(raw_assigns)
    except Exception:
        # Deliberate best-effort: any failure just leaves whatever was
        # assigned so far and lets the checks below decide what to return.
        pass

    # The new parser's result is used directly whenever a tree was produced;
    # no size cap is applied to it.
    if primary_tree is not None:
        return primary_tree, primary_assigns

    # --- Stage 2: old parser, guarded by a 3 second timeout ------------
    legacy_tree = None
    legacy_assigns = {}
    try:
        import threading

        # Shared slot the worker thread reports into.
        outcome = {"value": None, "error": None, "finished": False}

        def _run_legacy():
            try:
                outcome["value"] = old_build(proc_text, fields)
            except Exception as exc:
                outcome["error"] = exc
            outcome["finished"] = True

        worker = threading.Thread(target=_run_legacy, daemon=True)
        worker.start()
        # On timeout the worker is not cancelled; it keeps running as a
        # daemon thread and its eventual result is simply ignored.
        worker.join(3.0)

        usable = outcome["finished"] and not outcome["error"] and outcome["value"]
        if usable:
            legacy_tree, legacy_assigns = outcome["value"]
    except Exception:
        # Best-effort fallback: fall through to the empty result below.
        pass

    if legacy_tree is None:
        return BrSeq(), {}
    return legacy_tree, legacy_assigns
|
|
|
|
|
|
def _convert_to_model(root: BranchNode) -> BrSeq:
    """Convert a parser root node into a top-level ``BrSeq``.

    The root itself contributes no model node; each of its direct children is
    converted in order via ``_convert_node`` into one fresh sequence.

    Args:
        root: Root ``BranchNode`` returned by the new parser.

    Returns:
        The newly created ``BrSeq`` holding the converted children.
    """
    model = BrSeq()
    for child in root.children:
        _convert_node(child, model)
    return model
|
|
|
|
|
|
def _unwrap_label(label: str, tag: str) -> str:
    """Return the text inside ``TAG(...)``, or *label* unchanged if not wrapped."""
    opener = tag + "("
    if label.startswith(opener) and label.endswith(")"):
        return label[len(opener):-1]
    return label


def _convert_node(node: BranchNode, parent: BrSeq):
    """Convert one parser node (and its subtree) into model nodes under *parent*.

    Dispatches on ``node.kind``:

    * ``PARAGRAPH``/``SECTION``/``PERFORM_CALL``/``GO_TO``/``EXIT``/``CALL``/
      ``READ`` and any unrecognised kind: no model node is created; children
      are converted directly into *parent*.
    * ``IF``: a ``BrIf``; ``ELSE`` children fill ``false_seq``, ``THEN``
      children and any other child fill ``true_seq``.
    * ``EVALUATE``: a ``BrEval``; each ``WHEN`` child becomes a ``when_list``
      entry, or fills ``other_seq`` for ``WHEN OTHER``. Non-``WHEN`` children
      are ignored.
    * ``PERFORM``: a ``BrPerform`` whose mode is derived from ``branch_names``.
    * ``AT_END``: a ``BrIf("AT END")`` whose ``true_seq`` holds the children.
    * ``NOT_AT_END``: children go to the ``false_seq`` of the closest
      preceding ``BrIf`` in *parent*.
    * ``SORT``/``MERGE``/``WHEN`` (a ``WHEN`` outside ``EVALUATE``): a
      ``BrPerform("sort", ...)`` leaf; children are not converted.
    * ``GO_TO_DEPENDING``: a ``GoTo("DEPENDING")`` leaf.

    Args:
        node: Parser node to convert.
        parent: Sequence that receives the resulting model node(s).

    Returns:
        None. The result is appended to *parent* (or, for ``NOT_AT_END``,
        to an existing child of *parent*).
    """
    kind = node.kind

    # Structural / pass-through kinds: flatten children into the current
    # sequence without creating a model node of their own.
    if kind in ("PARAGRAPH", "SECTION", "PERFORM_CALL", "GO_TO", "EXIT",
                "CALL", "READ"):
        for child in node.children:
            _convert_node(child, parent)
        return

    if kind == "IF":
        # NOTE(review): when condition_text is empty and branch_names is None
        # this join raises TypeError (other branches guard against None);
        # behaviour kept as-is because callers may rely on the failure.
        branch = BrIf(node.condition_text or " ".join(node.branch_names))
        for child in node.children:
            if child.kind == "ELSE":
                for stmt in child.children:
                    _convert_node(stmt, branch.false_seq)
            elif child.kind == "THEN":
                for stmt in child.children:
                    _convert_node(stmt, branch.true_seq)
            else:
                # Statements not wrapped in THEN belong to the true branch.
                _convert_node(child, branch.true_seq)
        parent.add(branch)
        return

    if kind == "EVALUATE":
        subject = _unwrap_label((node.branch_names or [""])[0], "EVAL")
        branch = BrEval(subject)
        for child in node.children:
            if child.kind != "WHEN":
                continue
            label = _unwrap_label((child.branch_names or [""])[0], "WHEN")
            # Keep only the first whitespace-separated token; the label may
            # carry trailing body text after the condition.
            tokens = label.split()
            if tokens:
                label = tokens[0]

            is_other = label.upper() == "OTHER"
            if is_other:
                branch.has_other = True
                target = branch.other_seq
            else:
                target = BrSeq()

            # Convert the WHEN body exactly once, straight into its
            # destination. Converting into a scratch sequence first and then
            # again for OTHER would double the work per nesting level.
            for stmt in child.children:
                _convert_node(stmt, target)

            if not is_other:
                branch.when_list.append((label, target))
        parent.add(branch)
        return

    if kind == "PERFORM":
        condition = node.condition_text or ""
        names = [b.upper() for b in node.branch_names] if node.branch_names else []
        # Order matters: VARY wins, then SKIP/ENTER branch names mark an
        # UNTIL loop, then TIMES; anything else defaults to "until".
        if any("VARY" in name for name in names):
            mode = "varying"
        elif any("SKIP" in name or "ENTER" in name for name in names):
            mode = "until"
        elif any("TIMES" in name for name in names):
            mode = "times"
        else:
            mode = "until"
        branch = BrPerform(mode, condition=condition)
        for child in node.children:
            _convert_node(child, branch.body_seq)
        parent.add(branch)
        return

    if kind == "AT_END":
        branch = BrIf("AT END")
        for child in node.children:
            _convert_node(child, branch.true_seq)
        parent.add(branch)
        return

    if kind == "NOT_AT_END":
        # Attach to the closest preceding BrIf in this sequence.
        # NOTE(review): this matches any BrIf, not only an "AT END" one, and
        # the body is dropped when no BrIf precedes it; confirm the parser
        # always emits AT_END first.
        for existing in reversed(parent.children):
            if isinstance(existing, BrIf):
                for child in node.children:
                    _convert_node(child, existing.false_seq)
                break
        return

    if kind in ("SORT", "MERGE", "WHEN"):
        label = " ".join(node.branch_names) if node.branch_names else kind
        parent.add(BrPerform("sort", condition=label))
        return

    if kind == "GO_TO_DEPENDING":
        parent.add(GoTo("DEPENDING"))
        return

    # Unknown kind: treat as a transparent container.
    for child in node.children:
        _convert_node(child, parent)
|
|
|
|
|
|
def _count_br_nodes(node) -> int:
    """Count the branching nodes in a model subtree.

    A node counts as one if it is a ``BrIf``, ``BrEval``, ``BrPerform`` or
    ``BrSearch``. The count then recurses into whatever sub-sequences the node
    exposes for each type it is an instance of. Any other object counts zero.

    Args:
        node: Model node or sequence to inspect.

    Returns:
        Number of branching nodes in *node* and everything beneath it.
    """
    total = 1 if isinstance(node, (BrIf, BrEval, BrPerform, BrSearch)) else 0

    # Each type check is deliberately independent (not elif): a node that is
    # an instance of several of these classes contributes through every
    # matching branch.
    if isinstance(node, BrSeq):
        total += sum(_count_br_nodes(child) for child in node.children)

    if isinstance(node, BrIf):
        total += _count_br_nodes(node.true_seq)
        total += _count_br_nodes(node.false_seq)

    if isinstance(node, BrEval):
        total += sum(_count_br_nodes(body) for _, body in node.when_list)
        total += _count_br_nodes(node.other_seq)

    if isinstance(node, BrPerform):
        total += _count_br_nodes(node.body_seq)

    if isinstance(node, BrSearch):
        total += _count_br_nodes(node.at_end_seq)
        total += sum(_count_br_nodes(body) for _, body in node.when_list)

    return total
|
|
|
|
|
|
def _assigns_list_to_dict(assigns_list: list) -> dict:
|
|
result = {}
|
|
for a in assigns_list:
|
|
tgt = a.get("tgt", "")
|
|
src = a.get("src") or a.get("source_vars")
|
|
if tgt and src:
|
|
result[tgt] = [a]
|
|
return result
|