e2a8d53e60
## 修复内容 ### C1: _mark_eval 反向操作符 (coverage.py) - EVALUATE 约束匹配支持 操作符 - WHEN OTHER 的自动检测(全部 WHEN 被否定时) ### C2: _mark_perform 反向操作符 (coverage.py) - PERFORM 同 _mark_if 的反向操作符匹配 - PERFORM UNTIL 条件截断后桥接器通过 branch_names 识别类型 ### H1: parse_single_condition 传递 fields (coverage.py) - collect_decision_points 调用时传 fields 参数 - NOT 前缀条件解析 (NOT WS-X > 50 → WS-X <= 50) ### H4: generate_data 输入约束 (__init__.py) - 文档注明接收原始源码,非预处理后文本 ### M1: not_map break (cond.py) - NOT 操作符映射循环添加 break ## 覆盖测试结果 - IF: 100% (T/F) - NOT IF: 100% (NOT_TRUE/NOT_FALSE) - PERFORM UNTIL: 100% (ENTER/SKIP) - EVALUATE: 100% (4 WHENs) - Nested IF: 100% (4 branches) - S15 回归: 17/17 PASS Co-Authored-By: Claude <noreply@anthropic.com>
591 lines
22 KiB
Python
591 lines
22 KiB
Python
"""PROCEDURE DIVISION parser — line-based control flow extraction
|
|
|
|
MIT license (as project)
|
|
|
|
Two-tier approach:
|
|
Tier 1: Line-oriented state machine → extract nesting structure (IF/ELSE/END-IF,
|
|
EVALUATE/WHEN/END-EVALUATE, PERFORM/END-PERFORM, READ/AT END/END-READ, etc.)
|
|
Tier 2: Rule-based condition parser → extract branch conditions from each decision point
|
|
|
|
Fallback: LLM structural output for programs Tier 1+2 cannot handle.
|
|
"""
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
# ── Model ──
|
|
|
|
class BranchNode:
    """Node in the branch tree — maps directly to existing BrBranchNode format.

    Attributes:
        kind: Node category, e.g. "IF", "EVALUATE", "PERFORM", "AT_END", "AND".
        branch_names: Labels of the branches this node contributes.
        children: Nested nodes, in source order.
        condition_text: Condition text attached to the node ("" when none).
        source_line: Line index recorded by whoever created the node.
    """

    def __init__(self, kind: str, branch_names: list[str] = None,
                 children: list = None, condition_text: str = "",
                 source_line: int = 0):
        self.kind = kind
        # A falsy argument (None or an empty list) is replaced by a fresh
        # list, so two nodes never share a default container.
        self.branch_names = branch_names if branch_names else []
        self.children = children if children else []
        self.condition_text = condition_text
        self.source_line = source_line

    def __repr__(self):
        return "BranchNode({}, br={})".format(self.kind, self.branch_names)
|
|
|
|
|
|
# ── Tier 1: Line-based state machine ──
|
|
|
|
# Alternatives of the control-keyword pattern, in matching order (regex
# alternation is ordered, so the sequence is significant).
_CONTROL_KW_ALTERNATIVES = (
    'IF', 'ELSE', 'END-IF', 'EVALUATE', 'WHEN', r'OTHER\b', 'END-EVALUATE',
    'PERFORM', 'END-PERFORM', r'READ\b', r'WRITE\b',
    r'AT\s+END', r'NOT\s+AT\s+END', 'END-READ', 'END-WRITE',
    r'INVALID\s+KEY', r'NOT\s+INVALID\s+KEY',
    r'SORT\b', r'MERGE\b', r'CALL\b', 'END-CALL',
    'GOBACK', 'EXIT', r'STOP\s+RUN', r'GO\s+TO', 'CONTINUE',
)

# Matches a line that starts with any control-flow keyword; group 1 is the keyword.
_CONTROL_KW = re.compile(
    r'^\s*(' + '|'.join(_CONTROL_KW_ALTERNATIVES) + ')',
    re.IGNORECASE,
)

# "<name> SECTION" header; group 1 is the section name.
_PARAGRAPH_RE = re.compile(r'^\s*([A-Z][A-Z0-9-]*)\s+SECTION\b', re.IGNORECASE)
# "<name>." alone at the start of a line; group 1 is the candidate paragraph name.
_PARAGRAPH_SIMPLE_RE = re.compile(r'^\s*([A-Z][A-Z0-9-]*)\s*\.(\s|$)', re.IGNORECASE)


def _statement_re(head: str) -> re.Pattern:
    """Compile ``^\\s*<head>\\b\\s*(.*)``: a leading keyword, then the rest of the line."""
    return re.compile(rf'^\s*{head}\b\s*(.*)', re.IGNORECASE)


# Statement openers. The last group of each pattern is the text after the keyword.
_IF_COND_RE = _statement_re('IF')
_ELSE_IF_RE = _statement_re(r'ELSE\s+IF')
_EVAL_RE = _statement_re('EVALUATE')
_WHEN_RE = _statement_re('WHEN')
_PERFORM_RE = _statement_re('PERFORM')
_READ_RE = _statement_re('READ')
_WRITE_RE = _statement_re('WRITE')
# Group 1 is the verb (SORT or MERGE), group 2 the remainder.
_SORT_RE = _statement_re('(SORT|MERGE)')
_CALL_RE = _statement_re('CALL')
|
|
|
|
|
|
def _clean_line(line: str) -> str:
    """Strip an inline ``*>`` comment, trim whitespace and upper-case the line.

    The comment marker is only honoured outside quoted literals, so a literal
    such as ``"A *> B"`` no longer truncates the statement.  Literal content
    itself is kept (and upper-cased along with the rest of the line).

    Args:
        line: One raw source line.

    Returns:
        The cleaned, upper-cased line ("" when nothing but a comment or
        whitespace remains).
    """
    open_quote = None  # the quote character of the literal we are inside, if any
    for pos, ch in enumerate(line):
        if open_quote is not None:
            # A doubled quote ("" or '') closes and immediately reopens the
            # literal, which the toggle handles naturally.
            if ch == open_quote:
                open_quote = None
        elif ch in ('"', "'"):
            open_quote = ch
        elif ch == '*' and line.startswith('*>', pos):
            line = line[:pos]
            break
    return line.strip().upper()
|
|
|
|
|
|
# Words that may appear alone before a period but are never paragraph names.
# Built once at import time instead of on every call.
# NOTE(review): 'MAIN' and 'MB-PROCESS' are not COBOL reserved words; they are
# kept from the original list — presumably project-specific, confirm.
_NON_PARAGRAPH_WORDS = frozenset({
    'IF', 'ELSE', 'END', 'END-IF', 'END-EVALUATE', 'END-PERFORM',
    'END-READ', 'END-WRITE', 'END-CALL',
    'READ', 'WRITE', 'SORT', 'MERGE',
    'CALL', 'PERFORM', 'EVALUATE', 'WHEN', 'OTHER',
    'MOVE', 'ADD', 'SUBTRACT', 'MULTIPLY', 'DIVIDE',
    'COMPUTE', 'STRING', 'UNSTRING', 'INSPECT',
    'INITIALIZE', 'DISPLAY', 'OPEN', 'CLOSE',
    'STOP', 'GOBACK', 'EXIT', 'CONTINUE',
    'VARYING', 'UNTIL', 'FROM', 'BY', 'THRU',
    'ASCENDING', 'DESCENDING', 'USING', 'GIVING',
    'MAIN', 'MB-PROCESS',
    # Remaining explicit scope terminators: "END-COMPUTE." and friends are
    # statement ends, not paragraph headers.
    'END-ACCEPT', 'END-ADD', 'END-COMPUTE', 'END-DELETE', 'END-DISPLAY',
    'END-DIVIDE', 'END-EXEC', 'END-MULTIPLY', 'END-RETURN', 'END-REWRITE',
    'END-SEARCH', 'END-START', 'END-STRING', 'END-SUBTRACT', 'END-UNSTRING',
})


def _detect_paragraph(line: str) -> str | None:
    """Detect a SECTION header or a paragraph header at the start of a line.

    Args:
        line: A source line (normally already cleaned and upper-cased).

    Returns:
        The section or paragraph name exactly as written, or None when the
        line is not a header.
    """
    section = _PARAGRAPH_RE.match(line)
    if section:
        return section.group(1)

    # Simple paragraph: a single name followed by a period.
    simple = _PARAGRAPH_SIMPLE_RE.match(line)
    if simple is None:
        return None
    name = simple.group(1)
    # The patterns match case-insensitively, so the exclusion test must too.
    if name.upper() in _NON_PARAGRAPH_WORDS:
        return None
    return name
|
|
|
|
|
|
def extract_branch_tree(source: str, data_fields: list = None) -> tuple[Any, list]:
    """Parse PROCEDURE DIVISION → branch tree + assignments.

    The source is processed one line at a time.  Everything before the
    ``PROCEDURE DIVISION`` header is skipped.  Each remaining line is cleaned,
    checked for a paragraph/section header and otherwise handed to
    ``_process_statement``, which attaches nodes under the innermost open
    scope and maintains the scope stack.

    A separator period ends the current sentence and therefore every scope
    that is still open (IF without END-IF, READ without END-READ, CALL
    without END-CALL, ...), so the stack is reset to the root whenever a
    statement line ends with ``.`` or a new paragraph starts.

    Args:
        source: COBOL program text.
        data_fields: Accepted for interface compatibility; not used here.

    Returns:
        (root_node, assignments_list) — same format as build_branch_tree.
        Line numbers stored in nodes and assignments are 0-based indices
        into ``source.split('\\n')``.
    """
    root = BranchNode("PROGRAM", branch_names=["__start__"])
    stack = [root]  # stack[-1] is the innermost open scope; root is never popped
    assignments = []
    in_proc_div = False

    for line_no, raw in enumerate(source.split('\n')):
        line = _clean_line(raw)
        if not line:
            continue

        # Detect PROCEDURE DIVISION header
        if re.match(r'PROCEDURE\s+DIVISION', line, re.IGNORECASE):
            in_proc_div = True
            continue
        if not in_proc_div:
            continue

        # Paragraph detection
        para = _detect_paragraph(line)
        if para:
            # A header can only appear between sentences: nothing stays open.
            del stack[1:]
            _add_or_merge(BranchNode("PARAGRAPH", branch_names=[para]), root)
            continue

        _process_statement(line, line_no, stack, assignments)

        # The period of a COPY/REPLACE directive belongs to the directive and
        # does not terminate the surrounding sentence.
        if line.endswith('.') and not re.match(r'(COPY|REPLACE)\b', line, re.IGNORECASE):
            del stack[1:]

    return root, assignments


def _process_statement(line: str, line_no: int, stack: list, assignments: list) -> None:
    """Attach the node for one cleaned statement line and update the scope stack.

    Handlers are tried in a fixed order and the first match wins.  Lines that
    match no control-flow handler are scanned for assignments.
    """
    # IF
    if m := _IF_COND_RE.match(line):
        node = _make_if_node(m.group(1).strip(), line_no)
        stack[-1].children.append(node)
        # Split on the condition the node actually kept (already truncated at
        # the first statement verb).  Passing the whole remainder of the line
        # would leave nothing to split and never detect a one-line IF.
        then_body, else_body = _split_one_line_if(line, node.condition_text)
        # Only self-contained when the IF also ends on this line; otherwise
        # later lines (ELSE / END-IF) still belong to it.
        ends_here = line.endswith('.') or re.search(r'\bEND-IF\b', line, re.IGNORECASE)
        if (then_body or else_body) and ends_here:
            # Single-line IF: create THEN and ELSE children inline
            node.children = [BranchNode("THEN", branch_names=["TRUE"])]
            if then_body:
                _parse_inline_assignments(then_body, assignments, line_no)
            if else_body:
                _parse_inline_assignments(else_body, assignments, line_no)
                node.children.append(BranchNode("ELSE", branch_names=["FALSE"]))
        else:
            # Multi-line IF — push to stack
            stack.append(node)
        return

    # ELSE IF — the chained IF becomes a sibling of the IF it follows
    if m := _ELSE_IF_RE.match(line):
        _close_open_if(stack, line)
        if len(stack) >= 2 and stack[-1].kind == "IF":
            stack.pop()
        elif len(stack) >= 2 and stack[-1].kind == "THEN":
            stack.pop()
            if stack and stack[-1].kind == "IF":
                stack.pop()
        elif len(stack) >= 3 and stack[-2].kind == "IF":
            stack.pop()
            stack.pop()
        node = _make_if_node(m.group(1).strip(), line_no)
        node.branch_names = ["ELSE_IF_TRUE", "ELSE_IF_FALSE"]
        stack[-1].children.append(node)
        stack.append(node)
        return

    # ELSE
    if (re.match(r'^\s*ELSE\b', line, re.IGNORECASE)
            and not re.match(r'^\s*ELSE\s+IF', line, re.IGNORECASE)):
        # Close THEN, open ELSE
        _close_open_if(stack, line)
        else_node = BranchNode("ELSE", branch_names=["FALSE", "FALLTHROUGH"])
        stack[-1].children.append(else_node)
        stack.append(else_node)
        return

    # END-IF
    if re.match(r'^\s*END-IF', line, re.IGNORECASE):
        _close_to_kind(stack, "IF", line)
        return

    # EVALUATE
    if m := _EVAL_RE.match(line):
        node = BranchNode("EVALUATE", branch_names=[f"EVAL({m.group(1).strip()})"])
        stack[-1].children.append(node)
        stack.append(node)
        return

    # WHEN — also covers WHEN OTHER, which is labelled "WHEN(OTHER)".
    if m := _WHEN_RE.match(line):
        # Close only WHEN scopes; preserve EVALUATE parent
        while len(stack) > 1 and stack[-1].kind == "WHEN":
            stack.pop()
        cond = m.group(1).strip().rstrip('.')
        when_node = BranchNode("WHEN", branch_names=[f"WHEN({cond})"])
        stack[-1].children.append(when_node)
        stack.append(when_node)
        return

    # END-EVALUATE
    if re.match(r'^\s*END-EVALUATE', line, re.IGNORECASE):
        _close_to_kind(stack, "EVALUATE", line)
        return

    # PERFORM
    if m := _PERFORM_RE.match(line):
        rest = m.group(1).strip()
        node = _make_perform_node(rest, line_no)
        stack[-1].children.append(node)
        # Only an in-line PERFORM opens a scope, and only when its
        # END-PERFORM is not already on this line.
        if node.kind == "PERFORM" and not re.search(r'\bEND-PERFORM\b', rest, re.IGNORECASE):
            stack.append(node)
        return

    # END-PERFORM
    if re.match(r'^\s*END-PERFORM', line, re.IGNORECASE):
        _close_to_kind(stack, "PERFORM", line)
        return

    # READ
    if m := _READ_RE.match(line):
        node = BranchNode("READ", branch_names=[f"READ({m.group(1).strip()})"])
        stack[-1].children.append(node)
        stack.append(node)
        return

    # AT END
    if re.match(r'AT\s+END', line, re.IGNORECASE):
        at_end = BranchNode("AT_END", branch_names=["AT_END", "NOT_AT_END"])
        stack[-1].children.append(at_end)
        stack.append(at_end)
        return

    # NOT AT END
    if re.match(r'NOT\s+AT\s+END', line, re.IGNORECASE):
        # Pop AT_END, add NOT_AT_END sibling
        _close_to_kind(stack, "AT_END", line)
        not_at_end = BranchNode("NOT_AT_END", branch_names=["NOT_AT_END"])
        stack[-1].children.append(not_at_end)
        stack.append(not_at_end)
        return

    # END-READ
    if re.match(r'^\s*END-READ', line, re.IGNORECASE):
        _close_to_kind(stack, "READ", line)
        return

    # SORT / MERGE — both verbs match _SORT_RE and produce a "SORT" node.
    if m := _SORT_RE.match(line):
        rest = (m.group(1) + ' ' + m.group(2)).strip()
        node = BranchNode("SORT")
        # Prefer the USING file names as labels; fall back to the statement text.
        names = re.findall(r'USING\s+(\w[\w-]*)', rest, re.IGNORECASE)
        node.branch_names = names or [rest[:30]]
        stack[-1].children.append(node)
        # SORT can have INPUT PROCEDURE / OUTPUT PROCEDURE blocks
        if re.search(r'INPUT\s+PROCEDURE|OUTPUT\s+PROCEDURE', rest, re.IGNORECASE):
            stack.append(node)
        return

    # CALL
    if m := _CALL_RE.match(line):
        rest = m.group(1).strip()
        node = BranchNode("CALL", branch_names=[f"CALL({rest[:30]})"])
        stack[-1].children.append(node)
        stack.append(node)
        return

    # ON EXCEPTION
    if re.match(r'ON\s+EXCEPTION', line, re.IGNORECASE):
        exc_node = BranchNode("ON_EXCEPTION", branch_names=["EXCEPTION", "NO_EXCEPTION"])
        stack[-1].children.append(exc_node)
        stack.append(exc_node)
        return

    # NOT ON EXCEPTION
    if re.match(r'NOT\s+ON\s+EXCEPTION', line, re.IGNORECASE):
        _close_to_kind(stack, "ON_EXCEPTION", line)
        noexc = BranchNode("NOT_ON_EXCEPTION", branch_names=["NO_EXCEPTION"])
        stack[-1].children.append(noexc)
        stack.append(noexc)
        return

    # END-CALL
    if re.match(r'^\s*END-CALL', line, re.IGNORECASE):
        _close_to_kind(stack, "CALL", line)
        return

    # STOP RUN / GOBACK / EXIT PROGRAM — terminate scope
    if re.match(r'STOP\s+RUN|GOBACK|EXIT\s+PROGRAM|EXIT\s+SECTION|EXIT\s+PARAGRAPH',
                line, re.IGNORECASE):
        stack[-1].children.append(BranchNode("EXIT", branch_names=["EXIT"]))
        return

    # GO TO
    if m := re.match(r'GO\s+TO\b', line, re.IGNORECASE):
        stack[-1].children.append(_make_goto_node(line, m.end()))
        return

    # CONTINUE — no-op, skip
    if re.match(r'CONTINUE', line, re.IGNORECASE):
        return

    # Detect simple assignments (MOVE / ADD / SET)
    _detect_assignments(line, assignments, line_no)


def _make_goto_node(line: str, verb_end: int) -> BranchNode:
    """Build the node for a GO TO statement; ``verb_end`` is where "GO TO" ends.

    ``GO TO p1 p2 ... DEPENDING ON x`` yields a GO_TO_DEPENDING node whose
    branches are the listed targets (at most 10); any other GO TO yields a
    single-branch GO_TO node.
    """
    # Slice after the matched verb rather than a fixed 5 characters, so extra
    # whitespace inside "GO  TO" does not corrupt the target.
    rest = line[verb_end:].strip()
    parts = re.split(r'(?<![\w-])DEPENDING(?![\w-])', rest, maxsplit=1, flags=re.IGNORECASE)
    if len(parts) == 2:
        # Standard COBOL lists the targets before DEPENDING ON.
        targets = re.findall(r'[A-Z0-9][A-Z0-9-]*', parts[0], re.IGNORECASE)
        if not targets:
            # Non-standard "GO TO DEPENDING ON ..." ordering: take the names
            # that follow, as the previous implementation did.
            tail = re.sub(r'^\s*ON\b', '', parts[1], flags=re.IGNORECASE)
            targets = re.findall(r'\b[A-Z][A-Z0-9-]*\b', tail)
        return BranchNode("GO_TO_DEPENDING", branch_names=targets[:10] or ["GOTO"])
    return BranchNode("GO_TO", branch_names=[rest[:20] or "GOTO"])
|
|
|
|
|
|
# ── Helper functions ──
|
|
|
|
def _add_or_merge(node: BranchNode, root: BranchNode):
    """Append a paragraph node to ``root`` unless a paragraph is already last.

    When the last child of ``root`` is itself a PARAGRAPH node, the new node
    is discarded and ``root`` is left untouched.
    NOTE(review): the names of the two paragraphs are not compared, so any
    run of consecutive paragraph headers collapses into the first one —
    confirm this is intended.
    """
    siblings = root.children
    follows_paragraph = bool(siblings) and siblings[-1].kind == "PARAGRAPH"
    if not follows_paragraph:
        siblings.append(node)
|
|
|
|
|
|
# First statement verb (or THEN/ELSE) that follows an IF condition on the same
# line.  The verb must be a whole whitespace-delimited word, so hyphenated data
# names such as WS-MOVE-FLAG or SET-DONE are not mistaken for verbs.
_IF_BODY_START_RE = re.compile(
    r'\s(?:DISPLAY|MOVE|ADD|SUBTRACT|MULTIPLY|DIVIDE|COMPUTE|'
    r'STRING|UNSTRING|SET|INSPECT|INITIALIZE|CONTINUE|'
    r'PERFORM|CALL|EXIT|GOBACK|STOP|THEN|ELSE|'
    r'READ|WRITE|DELETE|REWRITE|ACCEPT|OPEN|CLOSE)(?=\s|$)',
    re.IGNORECASE,
)


def _make_if_node(cond_text: str, line_no: int) -> BranchNode:
    """Create an IF node with branch names derived from its condition.

    The condition is cut at the earliest statement verb on the line (one-line
    IF), including a verb that ends the text such as a trailing ``THEN``.

    Branch naming:
        * AND without OR  -> ``AND_PART(0..n)``, two plus one per AND.
        * leading NOT     -> ``NOT_TRUE`` / ``NOT_FALSE`` (when no OR and the
          AND rule above did not apply).
        * everything else -> ``TRUE`` / ``FALSE``.

    Args:
        cond_text: Text following the IF keyword.
        line_no: Line index stored on the node as ``source_line``.

    Returns:
        A BranchNode of kind "IF" carrying the truncated condition text.
    """
    base_cond = cond_text.rstrip('.').strip()

    # Truncate at the EARLIEST verb by position; scanning verbs in list order
    # could cut at a later verb and leave statement text in the condition.
    body = _IF_BODY_START_RE.search(base_cond)
    if body:
        base_cond = base_cond[:body.start()].strip()

    # Parse condition for branch count
    contains_and = bool(re.search(r'\bAND\b', base_cond, re.IGNORECASE))
    not_segments = base_cond.split('NOT')
    if 'NOT' in base_cond.upper() and len(not_segments) > 1:
        # With a NOT present, AND only counts when the text right after the
        # first NOT contains no AND of its own.
        # NOTE(review): split() is case-sensitive and also matches "NOT"
        # inside identifiers; behaviour kept as-is.
        has_and = contains_and and not re.search(r'\bAND\b', not_segments[1], re.IGNORECASE)
    else:
        has_and = contains_and
    has_or = bool(re.search(r'\bOR\b', base_cond, re.IGNORECASE))

    if has_and and not has_or:
        # AND implies: each term evaluated independently
        and_count = len(re.findall(r'\bAND\b', base_cond, re.IGNORECASE))
        names = [f"AND_PART({i})" for i in range(2 + and_count)]
    elif not has_or and base_cond.upper().startswith('NOT'):
        names = ["NOT_TRUE", "NOT_FALSE"]
    else:
        names = ["TRUE", "FALSE"]

    return BranchNode("IF", branch_names=names,
                      condition_text=base_cond, source_line=line_no)
|
|
|
|
|
|
# First statement verb of an in-line PERFORM body written on the same line as
# the PERFORM header.  Whole whitespace-delimited words only, so hyphenated
# names such as PARA-EXIT are left alone.
_PERFORM_BODY_START_RE = re.compile(
    r'\s(?:DISPLAY|MOVE|ADD|SUBTRACT|MULTIPLY|DIVIDE|COMPUTE|'
    r'STRING|UNSTRING|SET|INSPECT|INITIALIZE|CONTINUE|'
    r'PERFORM|CALL|EXIT|GOBACK|STOP|'
    r'READ|WRITE|DELETE|REWRITE|ACCEPT|OPEN|CLOSE)(?=\s|$)',
    re.IGNORECASE,
)


def _make_perform_node(rest: str, line_no: int) -> BranchNode:
    """Create the node for a PERFORM statement.

    Args:
        rest: Text following the PERFORM keyword.
        line_no: Line index stored on the node as ``source_line``.

    Returns:
        * kind "PERFORM" with ENTER/SKIP for ``UNTIL cond``;
        * kind "PERFORM" with VARY_ENTER/VARY_EXIT for ``VARYING ...``;
        * kind "PERFORM" with TIMES_ENTER/TIMES_EXIT for in-line ``n TIMES``;
        * kind "PERFORM_CALL" named after the target paragraph otherwise.
    """
    upper = rest.upper()

    # Cut the header at the EARLIEST body verb (one-line PERFORM: UNTIL cond BODY).
    body = _PERFORM_BODY_START_RE.search(rest)
    cond_text = rest[:body.start()].strip() if body else rest

    # Require a whole keyword so a paragraph called UNTIL-DONE is still a call.
    if re.match(r'UNTIL(?![\w-])', upper):
        return BranchNode("PERFORM", branch_names=["ENTER", "SKIP"],
                          condition_text=cond_text[5:].strip(), source_line=line_no)
    if re.match(r'VARYING(?![\w-])', upper):
        return BranchNode("PERFORM", branch_names=["VARY_ENTER", "VARY_EXIT"],
                          condition_text=cond_text, source_line=line_no)
    # In-line "PERFORM <count> TIMES": TIMES is the second word.  With a
    # paragraph name in front ("PERFORM P 5 TIMES") it is an out-of-line call.
    if re.match(r'\S+\s+TIMES(?![\w-])', upper):
        return BranchNode("PERFORM", branch_names=["TIMES_ENTER", "TIMES_EXIT"],
                          condition_text=cond_text, source_line=line_no)

    # Simple PERFORM paragraph-name — just a call, no branch.  A sentence
    # period glued to the name is not part of it.
    words = upper.split()
    para_name = (words[0].rstrip('.') or "?") if words else "?"
    return BranchNode("PERFORM_CALL", branch_names=[para_name],
                      source_line=line_no)
|
|
|
|
|
|
def _split_one_line_if(line: str, cond: str) -> tuple[str | None, str | None]:
|
|
"""Check for single-line IF with THEN/ELSE on same line.
|
|
Returns (then_body, else_body).
|
|
"""
|
|
# Full line already upper-cased
|
|
rest = line[line.upper().index('IF') + 2:].strip()
|
|
# Remove condition from rest
|
|
cond_upper = cond.upper().rstrip('.')
|
|
rest = rest[len(cond_upper):].strip()
|
|
if not rest:
|
|
return None, None
|
|
if rest.startswith('.'):
|
|
return None, None
|
|
|
|
# Check for ELSE in rest
|
|
else_idx = -1
|
|
# Find ELSE but not ELSE IF
|
|
for m in re.finditer(r'\bELSE\b', rest, re.IGNORECASE):
|
|
# Check it's not ELSE IF
|
|
after_else = rest[m.end():].strip()
|
|
if not after_else.upper().startswith('IF'):
|
|
else_idx = m.start()
|
|
break
|
|
|
|
if else_idx >= 0:
|
|
then_body = rest[:else_idx].strip()
|
|
else_body = rest[else_idx + 4:].strip().rstrip('.')
|
|
return then_body, else_body
|
|
else:
|
|
# Remove trailing DOT
|
|
then_body = rest.rstrip('.').strip()
|
|
return then_body if then_body else None, None
|
|
|
|
|
|
def _close_open_if(stack: list, current_line: str):
    """Pop the THEN or ELSE scope sitting on top of the stack, if any.

    Any other top node — including an IF itself — is left in place, and the
    bottom (root) entry is never popped.  ``current_line`` is not used.
    """
    if len(stack) < 2:
        return
    if stack[-1].kind in ("THEN", "ELSE"):
        stack.pop()
|
|
|
|
|
|
def _close_to_kind(stack: list, kind: str, current_line: str):
    """Close the innermost open scope of the given kind.

    The nearest node of ``kind`` (searching from the top of the stack) is
    removed together with everything opened after it.  When no such node is
    open the stack is left untouched: an unmatched terminator must not close
    unrelated enclosing scopes.  The bottom (root) entry is never removed.

    Args:
        stack: Scope stack, innermost scope last; modified in place.
        kind: Node kind to close, e.g. "IF" or "READ".
        current_line: Not used.
    """
    for depth in range(len(stack) - 1, 0, -1):
        if stack[depth].kind == kind:
            del stack[depth:]
            return
|
|
|
|
|
|
def _close_to_kind_unless(stack: list, kinds: set, current_line: str):
    """Pop scopes until the top of the stack has a kind contained in ``kinds``.

    At most 50 entries are popped and the bottom entry is never removed.
    ``current_line`` is not used.

    Returns:
        The node left on top of the stack when its kind is in ``kinds``,
        otherwise None (also for an empty stack).
    """
    for _ in range(50):
        if len(stack) <= 1 or stack[-1].kind in kinds:
            break
        stack.pop()

    if not stack:
        return None
    top = stack[-1]
    return top if top.kind in kinds else None
|
|
|
|
|
|
def _parse_inline_assignments(text: str, assignments: list, line_no: int):
    """Record every ``MOVE src TO tgt`` found in inline THEN/ELSE text.

    One ``{"type": "MOVE", "src", "tgt", "line"}`` dict is appended to
    ``assignments`` per match, in order of appearance.
    """
    pairs = re.findall(r'MOVE\s+(\S+)\s+TO\s+(\S[\w-]*)', text, re.IGNORECASE)
    assignments.extend(
        {"type": "MOVE", "src": source, "tgt": target, "line": line_no}
        for source, target in pairs
    )
|
|
|
|
|
|
def _detect_assignments(line: str, assignments: list, line_no: int):
    """Detect MOVE / ADD / SET ... TO TRUE assignments on a statement line.

    Matches are appended to ``assignments`` as dicts, all MOVEs first, then
    ADDs, then SETs:

        {"type": "MOVE" | "ADD", "src": ..., "tgt": ..., "line": line_no}
        {"type": "SET_TRUE", "tgt": ..., "line": line_no}

    The source operand may be a quoted literal (spaces allowed), a number
    with a decimal point, or any other whitespace-free token, in line with
    what ``_parse_inline_assignments`` accepts for inline MOVEs.

    Args:
        line: Statement text to scan.
        assignments: Output list, extended in place.
        line_no: Line index stored with every match.
    """
    operand = r'''('[^']*'|"[^"]*"|\S+)'''
    target = r'(\S[\w-]*)'
    # The verb must start a word, so data names such as WS-MOVE are not verbs.
    verb_start = r'(?<![\w-])'

    # MOVE a TO b / ADD a TO b
    for verb in ("MOVE", "ADD"):
        pattern = rf'{verb_start}{verb}\s+{operand}\s+TO\s+{target}'
        for src, tgt in re.findall(pattern, line, re.IGNORECASE):
            assignments.append({"type": verb, "src": src, "tgt": tgt, "line": line_no})

    # SET to TRUE (88-level condition)
    set_pattern = rf'{verb_start}SET\s+{target}\s+TO\s+TRUE'
    for tgt in re.findall(set_pattern, line, re.IGNORECASE):
        assignments.append({"type": "SET_TRUE", "tgt": tgt, "line": line_no})
|
|
|
|
|
|
# ── Tree statistics ──
|
|
|
|
def count_branching_nodes(node: BranchNode) -> int:
    """Count decision points: nodes in the subtree with two or more branch names.

    The node itself is included in the count.
    """
    own = 1 if len(node.branch_names) >= 2 else 0
    return own + sum(count_branching_nodes(child) for child in node.children)
|
|
|
|
|
|
def collect_decision_points(node: BranchNode) -> list:
    """Flatten the tree under ``node`` into a list of decision points.

    The list is filled by ``_walk_points``, starting at depth 0 for ``node``.
    """
    collected: list = []
    _walk_points(node, collected, depth=0)
    return collected
|
|
|
|
|
|
def _walk_points(node: BranchNode, points: list, depth: int):
    """Append a record for every decision point under ``node`` to ``points``.

    Nodes are visited in pre-order (a node before its children, children in
    list order).  A node counts as a decision point when it has at least two
    branch names; its record holds kind, branches, condition, line and depth,
    where ``depth`` is the value passed in plus the nesting level below
    ``node``.
    """
    # Explicit stack instead of recursion; children are pushed in reverse so
    # they are popped, and therefore visited, in their original order.
    pending = [(node, depth)]
    while pending:
        current, level = pending.pop()
        if len(current.branch_names) >= 2:
            points.append({
                "kind": current.kind,
                "branches": current.branch_names,
                "condition": current.condition_text,
                "line": current.source_line,
                "depth": level,
            })
        pending.extend((child, level + 1) for child in reversed(current.children))
|