"""核心层:PROCEDURE DIVISION解析 + 数据流追踪""" import re import logging from datetime import datetime from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, BrSeq, CondLeaf, CondNot, ParseError, Assign, CallNode, ExitNode, GoTo from .cond import parse_compound_condition, parse_single_condition, collect_leaves logger = logging.getLogger(__name__) _COBOL_SCOPE_ENDERS = { 'END-IF', 'END-EVALUATE', 'END-PERFORM', 'END-EXEC', 'END-CALL', 'END-READ', 'END-WRITE', 'END-DELETE', 'END-REWRITE', 'END-START', 'END-SEARCH', 'ELSE', 'WHEN', 'OTHER', } def scan_paragraphs(raw_lines): paragraphs = {} i = 0 while i < len(raw_lines): line = raw_lines[i].strip() m = re.match(r'^([A-Z0-9][A-Z0-9-]*)\.\s*$', line) sec_m = re.match(r'^([A-Z][A-Z0-9-]*)\s+SECTION\.?\s*$', line, re.IGNORECASE) if m and m.group(1) not in _COBOL_SCOPE_ENDERS: name = m.group(1) elif sec_m: name = sec_m.group(1).upper() else: i += 1 continue start = i + 1 j = i + 1 while j < len(raw_lines): nline = raw_lines[j].strip() nm = re.match(r'^([A-Z0-9][A-Z0-9-]*)\.\s*$', nline) if nm and nm.group(1) not in _COBOL_SCOPE_ENDERS: break if re.match(r'^[A-Z][A-Z0-9-]*\s+SECTION\.\s*$', nline, re.IGNORECASE): break j += 1 paragraphs[name] = (start, j - 1) i = j return paragraphs def build_branch_tree(proc_text, fields=None): raw_lines = proc_text.split('\n') paragraphs = scan_paragraphs(raw_lines) first_para_name = None first_para_idx = None for i, line in enumerate(raw_lines): clean = line.strip() m = re.match(r'^([A-Z0-9][A-Z0-9-]*)\.\s*$', clean) if m and m.group(1) in paragraphs: first_para_name = m.group(1) first_para_idx = i break if first_para_name: before = raw_lines[:first_para_idx] has_code = any( l.strip() and 'PROCEDURE DIVISION' not in l for l in before ) if has_code: main_raw = raw_lines[:first_para_idx] else: p_start, p_end = paragraphs[first_para_name] main_raw = raw_lines[p_start:p_end + 1] else: main_raw = raw_lines filtered = [l for l in main_raw if l.strip()] assignments = {} parser = _BrParser(filtered, paragraphs, raw_lines, assignments, fields) tree = parser.parse_seq(terminators={'GOBACK', 'STOP RUN', 'EXIT PROGRAM'}) return tree, assignments # ── 定数 ── _FIGURATIVE_CONSTANTS = frozenset({ 'ZERO', 'ZEROS', 'ZEROES', 'SPACE', 'SPACES', 'HIGH-VALUE', 'HIGH-VALUES', 'LOW-VALUE', 'LOW-VALUES', }) # ── _BrParser ── class _BrParser: def __init__(self, lines, paragraphs=None, raw_lines=None, assignments=None, fields=None, goto_depth=0): self.lines = lines self.pos = 0 self.paragraphs = paragraphs or {} self.raw_lines = raw_lines or lines # assignments is a dict[str, list[dict]] — append, never overwrite self.assignments = assignments if assignments is not None else {} self.fields = fields self._goto_depth = goto_depth def peek(self): if self.pos < len(self.lines): return self.lines[self.pos].strip() return '' def clean(self): return self.peek().rstrip('.').strip() def advance(self): self.pos += 1 def parse_seq(self, end_tokens=None, end_check=None, terminators=None): if end_tokens is None: end_tokens = [] seq = BrSeq() while self.pos < len(self.lines): line = self.clean() if self._is_end(line, end_tokens, end_check): return seq if terminators and line in terminators: self.advance() return seq m_goto = re.match(r'^GO\s+TO\s+(\w[\w-]*)\s*$', line) if m_goto: goto_node = self._parse_goto(m_goto.group(1)) if goto_node: seq.add(goto_node) while self.pos < len(self.lines): cl = self.clean() if self._is_end(cl, end_tokens, end_check): break if cl in _COBOL_SCOPE_ENDERS: break self.advance() return seq m_exit = re.match(r'^EXIT\s+(PARAGRAPH|PERFORM|SECTION)\s*$', line) if m_exit: self.advance() seq.add(ExitNode(m_exit.group(1))) while self.pos < len(self.lines): cl = self.clean() if self._is_end(cl, end_tokens, end_check): break if cl in _COBOL_SCOPE_ENDERS: break self.advance() return seq m = re.match(r'^IF\s+(.+?)(?:THEN)?\s*$', line) if m: seq.add(self._parse_if()) continue m = re.match(r'^EVALUATE\s+(.+?)\s*$', line) if m: seq.add(self._parse_evaluate()) continue m = re.match(r'^PERFORM\s+', line) if m: perf_node = self._parse_perform() if perf_node: seq.add(perf_node) continue m_search = re.match(r'^SEARCH\b(?:\s+(ALL))?\s+(\w[\w-]*)(?:\s+VARYING\s+(\w[\w-]*))?', line, re.IGNORECASE) if m_search: seq.add(self._parse_search(m_search)) continue m = re.match(r'^INITIALIZE\s+', line) if m: init_seq = self._parse_initialize() if init_seq: seq.add(init_seq) continue m_str = re.match(r'^STRING\s+', line) if m_str: str_seq = self._parse_string() if str_seq: seq.add(str_seq) continue m_unstr = re.match(r'^UNSTRING\s+', line) if m_unstr: unstr_seq = self._parse_unstring() if unstr_seq: seq.add(unstr_seq) continue m = re.match(r'^CALL\s+', line) if m: seq.add(self._parse_call()) continue m = re.match( r'^ACCEPT\s+(\w[\w-]*)(?:\s+FROM\s+(DATE|TIME|DAY|DAY-OF-WEEK|YEAR|YYYYMMDD|HHMMSS))?\s*$', line, re.IGNORECASE ) if m: tgt = m.group(1).strip().upper() from_type = (m.group(2) or 'USER').upper() info = {'type': 'accept', 'from': from_type} self.assignments.setdefault(tgt, []).append(info) seq.add(Assign(tgt, info)) self.advance() continue m = re.match(r'^READ\s+(\w[\w-]*)\s+INTO\s+(\w[\w-]*)\s*$', line, re.IGNORECASE) if m: tgt = m.group(2).strip().upper() info = {'type': 'read_into', 'file': m.group(1).strip().upper(), 'source_vars': []} self.assignments.setdefault(tgt, []).append(info) seq.add(Assign(tgt, info)) self.advance() # 跳过 READ 语句剩余行(AT END / NOT AT END / END-READ) while self.pos < len(self.lines): cl = self.clean() if cl in ('END-READ', 'END-READ.'): self.advance() break self.advance() continue m_set_false = re.match(r'^SET\s+(\w[\w-]*)\s+TO\s+FALSE\s*$', line, re.IGNORECASE) if m_set_false: seq.add(self._parse_set_false(m_set_false.group(1))) continue m = re.match(r'^(?:WRITE|REWRITE)\s+(\w[\w-]*)(?:\s+FROM\s+(\w[\w-]*))?\s*$', line, re.IGNORECASE) if m: rec_name = m.group(1).strip().upper() if m.group(2): tgt = m.group(2).strip().upper() info = {'type': 'write_from', 'file': rec_name, 'source_vars': [tgt]} self.assignments.setdefault(tgt, []).append(info) seq.add(Assign(tgt, info)) else: seq.add(Assign(rec_name, {'type': 'write_bare', 'file': rec_name})) self.advance() continue m_set = re.match(r'^SET\s+(\w[\w-]*)\s+TO\s+TRUE\s*$', line, re.IGNORECASE) if m_set: seq.add(self._parse_set_true(m_set.group(1))) continue m_insp = re.match(r'^INSPECT\s+', line, re.IGNORECASE) if m_insp: info = self._parse_inspect(line) if info: tgt = info.get('tgt', '') self.assignments.setdefault(tgt, []).append(info) seq.add(Assign(tgt, info)) self.advance() continue assign_node = self._record_assignment(line) if assign_node: seq.add(assign_node) self.advance() return seq def _is_end(self, line, end_tokens, end_check): if end_check and end_check(line): return True for tok in end_tokens: if line == tok or line.startswith(tok + ' '): return True return False # ── INSPECT ── _PIC_FIG_CONV = {'ZERO': '0', 'ZEROS': '0', 'ZEROES': '0', 'SPACE': ' ', 'SPACES': ' '} @staticmethod def _expand_figurative(val): if val.upper() in _BrParser._PIC_FIG_CONV: return _BrParser._PIC_FIG_CONV[val.upper()] return val def _parse_inspect_phrase(self, phrase): m = re.match( r'TALLYING\s+(\w[\w-]*)\s+FOR\s+' r'(LEADING|TRAILING|CHARACTERS)' r'(?:\s+([\'"])(.*?)\3)?' r'(?:\s+(BEFORE|AFTER)\s+INITIAL\s+([\'"])(.*?)\6)?\s*$', phrase, re.IGNORECASE ) if m: return ('tally', { 'count_var': m.group(1).upper(), 'kind': m.group(2).upper(), 'char': self._expand_figurative(m.group(4) or ''), 'before_after': (m.group(5) or '').upper(), 'delimiter': self._expand_figurative(m.group(7) or ''), }) m = re.match( r'REPLACING\s+' r'(ALL|LEADING|FIRST|CHARACTERS)\s+' r'([\'"])(.*?)\2\s+BY\s+' r'([\'"])(.*?)\4' r'(?:\s+(BEFORE|AFTER)\s+INITIAL\s+([\'"])(.*?)\7)?\s*$', phrase, re.IGNORECASE ) if m: return ('replace', { 'kind': m.group(1).upper(), 'src': self._expand_figurative(m.group(3)), 'dst': self._expand_figurative(m.group(5)), 'before_after': (m.group(6) or '').upper(), 'delimiter': self._expand_figurative(m.group(8) or ''), }) m = re.match( r'CONVERTING\s+([\'"])(.*?)\1\s+TO\s+([\'"])(.*?)\3\s*$', phrase, re.IGNORECASE ) if m: return ('convert', { 'from_chars': self._expand_figurative(m.group(2)), 'to_chars': self._expand_figurative(m.group(4)), }) return None def _parse_inspect(self, line): m = re.match(r'^INSPECT\s+(\w[\w-]*)\s+(.+)$', line, re.IGNORECASE) if not m: return None tgt = m.group(1).upper() rest = m.group(2).strip() phrases = re.split(r'\s+(?=(?:TALLYING|REPLACING|CONVERTING)\b)', rest, flags=re.IGNORECASE) sub_ops = [] for phrase in phrases: sub = self._parse_inspect_phrase(phrase.strip()) if sub: sub_ops.append(sub) if not sub_ops: return None return { 'type': 'inspect', 'tgt': tgt, 'source_vars': [tgt], 'sub_ops': sub_ops, } def _record_assignment(self, line): if self.assignments is None: return None # MOVE m = re.match(r'^MOVE\s+(.+?)\s+TO\s+(.+?)\s*$', line) if m: raw_src = m.group(1).strip() tgt = m.group(2).strip() # 保留下标:WS-CODE-VAL(1) → key='WS-CODE-VAL(1)' m_tgt = re.match(r'^([A-Z][A-Z0-9-]*)(?:\s*\(([^)]*)\))?\s*$', tgt, re.IGNORECASE) if not m_tgt: return None tgt_base = m_tgt.group(1).upper() if m_tgt.group(2): subscript = re.sub(r'\s*', '', m_tgt.group(2)) tgt_key = f"{tgt_base}({subscript})" else: tgt_key = tgt_base src_clean = raw_src.strip("'").strip('"') is_field_name = self.fields and any(f['name'] == src_clean for f in self.fields) if is_field_name: info = {'type': 'move', 'source_vars': [src_clean]} else: info = {'type': 'move_literal', 'literal': src_clean} self.assignments.setdefault(tgt_key, []).append(info) return Assign(tgt_key, info) # COMPUTE m = re.match(r'^COMPUTE\s+(.+?)(?:\s+ROUNDED)?\s*=\s*(.*)$', line) if m: tgt_raw = m.group(1).strip() expr = m.group(2).strip() m_tgt = re.match(r'^([A-Z][A-Z0-9-]*)(?:\s*\(([^)]*)\))?\s*$', tgt_raw, re.IGNORECASE) tgt_key = tgt_raw if m_tgt: tgt_base = m_tgt.group(1).upper() if m_tgt.group(2): subscript = re.sub(r'\s*', '', m_tgt.group(2)) tgt_key = f"{tgt_base}({subscript})" else: tgt_key = tgt_base if not expr: peek_pos = self.pos + 1 if peek_pos < len(self.lines): nxt = self.lines[peek_pos].strip().rstrip('.').strip() if nxt and not re.match(r'^(PERFORM|END-|IF|ELSE|EVALUATE|WHEN|OTHER|MOVE|COMPUTE|ADD|SUBTRACT|MULTIPLY|DIVIDE|STRING|UNSTRING|READ|WRITE|INITIALIZE|ACCEPT|CALL|GO\s*TO|GOBACK|STOP|EXIT)', nxt, re.IGNORECASE): expr = nxt if expr: info = self._parse_compute_expr(tgt_key, expr) self.assignments.setdefault(tgt_key, []).append(info) return Assign(tgt_key, info) # ADD x TO y → y = y + x (支持变量和常量源) m = re.match(r'^ADD\s+(\w[\w-]*)\s+TO\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line) if m: src = m.group(1).strip() tgt = m.group(2).strip() is_field = self.fields and any(f['name'] == src for f in self.fields) if is_field: info = {'type': 'compute', 'source_vars': [tgt, src], 'op': '+', 'const': None, 'expr': f'{tgt} + {src}'} else: try: const = float(src) info = {'type': 'compute', 'source_vars': [tgt], 'op': '+', 'const': const, 'expr': f'{tgt} + {const}'} except ValueError: return None self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # ADD x TO y GIVING z → z = y + x m = re.match(r'^ADD\s+(.+?)\s+TO\s+(\w[\w-]*)\s+GIVING\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line, re.IGNORECASE) if m: raw_a = m.group(1).strip() src_b = m.group(2).strip() tgt = m.group(3).strip() is_field_a = self.fields and any(f['name'] == raw_a for f in self.fields) if is_field_a: info = {'type': 'compute', 'source_vars': [src_b, raw_a], 'op': '+', 'const': None, 'expr': f'{src_b} + {raw_a}'} else: try: const = float(raw_a) info = {'type': 'compute', 'source_vars': [src_b], 'op': '+', 'const': const, 'expr': f'{src_b} + {const}'} except ValueError: return None self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # ADD a[, b[, c...]] GIVING z → z = a + b + c + ... m = re.match(r'^ADD\s+(.+?)\s+GIVING\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line, re.IGNORECASE) if m: raw_parts = re.findall(r'[A-Z][A-Z0-9-]*|\d+(?:\.\d+)?', m.group(1).upper()) fields_only = [] const_sum = 0.0 for p in raw_parts: if self.fields and any(f['name'] == p for f in self.fields): fields_only.append(p) else: try: const_sum += float(p) except ValueError: pass tgt = m.group(2).strip() if not fields_only: info = {'type': 'move_literal', 'literal': str(int(const_sum)) if const_sum == int(const_sum) else str(const_sum)} else: info = {'type': 'compute', 'source_vars': fields_only, 'op': '+', 'const': const_sum if const_sum != 0 else None, 'expr': '+'.join(fields_only) + (f' + {const_sum}' if const_sum else '')} self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # SUBTRACT x FROM y → y = y - x m = re.match(r'^SUBTRACT\s+([\d.]+)\s+FROM\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line) if m: const = float(m.group(1)) tgt = m.group(2).strip() info = {'type': 'compute', 'source_vars': [tgt], 'op': '-', 'const': const, 'expr': f'{tgt} - {const}'} self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # SUBTRACT a FROM b GIVING z → z = b - a m = re.match(r'^SUBTRACT\s+([\d.\w-]*)\s+FROM\s+(\w[\w-]*)\s+GIVING\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line, re.IGNORECASE) if m: raw_a = m.group(1).strip() src_b = m.group(2).strip() tgt = m.group(3).strip() is_field_a = self.fields and any(f['name'] == raw_a for f in self.fields) if is_field_a: info = {'type': 'compute', 'source_vars': [src_b, raw_a], 'op': '-', 'const': None, 'expr': f'{src_b} - {raw_a}'} else: try: const = float(raw_a) info = {'type': 'compute', 'source_vars': [src_b], 'op': '-', 'const': const, 'expr': f'{src_b} - {const}'} except ValueError: return None self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # MULTIPLY x BY y → y = y * x m = re.match(r'^MULTIPLY\s+([\d.]+)\s+BY\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line) if m: const = float(m.group(1)) tgt = m.group(2).strip() info = {'type': 'compute', 'source_vars': [tgt], 'op': '*', 'const': const, 'expr': f'{tgt} * {const}'} self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # MULTIPLY a BY b GIVING z → z = a * b m = re.match(r'^MULTIPLY\s+(\w[\w-]*)\s+BY\s+(\w[\w-]*)\s+GIVING\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line, re.IGNORECASE) if m: src_a = m.group(1).strip() src_b = m.group(2).strip() tgt = m.group(3).strip() is_field_a = self.fields and any(f['name'] == src_a for f in self.fields) if is_field_a: info = {'type': 'compute', 'source_vars': [src_a, src_b], 'op': '*', 'const': None, 'expr': f'{src_a} * {src_b}'} else: try: const = float(src_a) info = {'type': 'compute', 'source_vars': [src_b], 'op': '*', 'const': const, 'expr': f'{const} * {src_b}'} except ValueError: return None self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # DIVIDE x INTO y → y = y / x m = re.match(r'^DIVIDE\s+([\d.]+)\s+INTO\s+(\w[\w-]*?)(?:\s+ROUNDED)?\s*$', line) if m: const = float(m.group(1)) tgt = m.group(2).strip() info = {'type': 'compute', 'source_vars': [tgt], 'op': '/', 'const': const, 'expr': f'{tgt} / {const}'} self.assignments.setdefault(tgt, []).append(info) return Assign(tgt, info) # DIVIDE a INTO b GIVING z → z = b / a # Optional REMAINDER r → r = b - (b / a) * a m = re.match(r'^DIVIDE\s+(.+?)\s+INTO\s+(\w[\w-]*)\s+GIVING\s+(\w[\w-]*?)(?:\s+ROUNDED)?(?:\s+REMAINDER\s+(\w[\w-]*))?\s*$', line, re.IGNORECASE) if m: raw_a = m.group(1).strip() src_b = m.group(2).strip() tgt = m.group(3).strip() rem_tgt = m.group(4).strip().upper() if m.group(4) else None is_field_a = self.fields and any(f['name'] == raw_a for f in self.fields) if is_field_a: info = {'type': 'compute', 'source_vars': [src_b, raw_a], 'op': '/', 'const': None, 'expr': f'{src_b} / {raw_a}'} rem_info = {'type': 'compute', 'source_vars': [src_b, raw_a], 'op': 'rem', 'const': None, 'expr': f'REM({src_b} / {raw_a})'} else: try: const = float(raw_a) info = {'type': 'compute', 'source_vars': [src_b], 'op': '/', 'const': const, 'expr': f'{src_b} / {const}'} rem_info = {'type': 'compute', 'source_vars': [src_b], 'op': 'rem', 'const': const, 'expr': f'REM({src_b} / {const})'} except ValueError: return None self.assignments.setdefault(tgt, []).append(info) seq = BrSeq() seq.add(Assign(tgt, info)) if rem_tgt: self.assignments.setdefault(rem_tgt, []).append(rem_info) seq.add(Assign(rem_tgt, rem_info)) return seq # DIVIDE a BY b GIVING z → z = a / b # Optional REMAINDER r → r = a - (a / b) * b m = re.match(r'^DIVIDE\s+(\w[\w-]*)\s+BY\s+(\w[\w-]*)\s+GIVING\s+(\w[\w-]*?)(?:\s+ROUNDED)?(?:\s+REMAINDER\s+(\w[\w-]*))?\s*$', line, re.IGNORECASE) if m: src_a = m.group(1).strip() src_b = m.group(2).strip() tgt = m.group(3).strip() rem_tgt = m.group(4).strip().upper() if m.group(4) else None info = {'type': 'compute', 'source_vars': [src_a, src_b], 'op': '/', 'const': None, 'expr': f'{src_a} / {src_b}'} rem_info = {'type': 'compute', 'source_vars': [src_a, src_b], 'op': 'rem', 'const': None, 'expr': f'REM({src_a} / {src_b})'} self.assignments.setdefault(tgt, []).append(info) seq = BrSeq() seq.add(Assign(tgt, info)) if rem_tgt: self.assignments.setdefault(rem_tgt, []).append(rem_info) seq.add(Assign(rem_tgt, rem_info)) return seq return None def _parse_compute_expr(self, target, expr): # const OP var m = re.match(r'^\s*([\d.]+)\s*([+\-*/])\s*(\w[\w-]*)\s*$', expr) if m: const, op, var = float(m.group(1)), m.group(2), m.group(3) return {'type': 'compute', 'source_vars': [var], 'op': op, 'const': const, 'expr': expr} # var OP const m = re.match(r'^\s*(\w[\w-]*)\s*([+\-*/])\s*([\d.]+)\s*$', expr) if m: var, op, const = m.group(1), m.group(2), float(m.group(3)) return {'type': 'compute', 'source_vars': [var], 'op': op, 'const': const, 'expr': expr} # var OP var m = re.match(r'^\s*(\w[\w-]*)\s*([+\-*/])\s*(\w[\w-]*)\s*$', expr) if m: var1, op, var2 = m.group(1), m.group(2), m.group(3) return {'type': 'compute', 'source_vars': [var1, var2], 'op': op, 'expr': expr} # complex expression — extract variable names only vars_in = re.findall(r'[A-Z][A-Z0-9-]*', expr.upper()) return {'type': 'compute', 'source_vars': list(set(vars_in)), 'op': None, 'const': None, 'expr': expr} # ── SEARCH / SEARCH ALL ── def _parse_search(self, m): is_all = bool(m.group(1)) table = m.group(2).upper() varying = m.group(3).upper() if m.group(3) else None node = BrSearch(table, is_all=is_all, varying=varying) self.advance() while self.pos < len(self.lines): line = self.clean() if line in ('END-SEARCH', 'END-SEARCH.'): self.advance() return node m_at = re.match(r'^AT\s+END(.+)?$', line, re.IGNORECASE) if m_at: self.advance() rest = m_at.group(1) if rest and rest.strip(): self.lines.insert(self.pos, rest.strip()) node.at_end_seq = self.parse_seq( end_check=lambda l: re.match(r'^WHEN\b', l) or l in ('END-SEARCH',) ) node.has_at_end = True continue m_when = re.match(r'^WHEN\s+(.+?)\s*$', line, re.IGNORECASE) if m_when: cond_upper = m_when.group(1).strip() self.advance() cond_tree = parse_compound_condition(cond_upper, self.fields) body_seq = self.parse_seq( end_check=lambda l: re.match(r'^(WHEN|AT\s+END)\b', l) or l in ('END-SEARCH',) ) node.when_list.append((cond_upper, body_seq)) node.cond_trees.append(cond_tree) continue self.advance() return node def _parse_if(self): line = self.clean() m = re.match(r'^IF\s+(.+?)(?:THEN)?\s*$', line) cond_text = m.group(1).strip() self.advance() # Join continuation lines (multi-line IF conditions) while self.pos < len(self.lines): peek = self.clean() if re.match(r'^(THEN|ELSE|END-IF|MOVE|IF|PERFORM|EVALUATE|COMPUTE|CALL|STRING|UNSTRING|INITIALIZE|ADD|SUBTRACT|MULTIPLY|DIVIDE|GO\b|EXIT\b)', peek, re.IGNORECASE): break if peek.endswith('.'): cond_text += ' ' + peek.rstrip('.') self.advance() break cond_text += ' ' + peek self.advance() # Consume optional THEN on its own line if self.pos < len(self.lines): peek = self.clean() if peek == 'THEN': self.advance() node = BrIf(cond_text) node.cond_tree = parse_compound_condition(node.condition, self.fields) node.true_seq = self.parse_seq(['ELSE', 'END-IF']) if self.clean() == 'ELSE': self.advance() node.false_seq = self.parse_seq(['END-IF']) if self.clean() == 'END-IF': self.advance() return node def _parse_evaluate(self): line = self.clean() m = re.match(r'^EVALUATE\s+(.+?)\s*$', line) raw_subject = m.group(1).strip() node = BrEval(raw_subject) if ' ALSO ' in raw_subject: node.subjects = [s.strip() for s in re.split(r'\s+ALSO\s+', raw_subject)] self.advance() while self.pos < len(self.lines): line = self.clean() if line == 'END-EVALUATE': self.advance() return node m = re.match(r'^WHEN\s+(.+?)\s*$', line) if m: raw_val = m.group(1).strip().strip("'").strip('"') self.advance() # Capture multi-line WHEN conditions (AND/OR continuation) while self.pos < len(self.lines): peek = self.clean() if re.match(r'^(?:AND|OR)\b', peek, re.IGNORECASE): raw_val += ' ' + peek self.advance() else: break if raw_val == 'OTHER': node.other_seq = self.parse_seq(end_check=lambda l: l == 'END-EVALUATE') node.has_other = True else: case_seq = self.parse_seq(end_check=lambda l: l.startswith('WHEN') or l == 'END-EVALUATE') if node.subjects: vals = [v.strip().strip("'").strip('"') for v in re.split(r'\s+ALSO\s+', raw_val)] node.when_list.append((vals, case_seq)) else: node.when_list.append((raw_val, case_seq)) continue self.advance() return node def _parse_perform(self): line = self.clean() m = re.match(r'^PERFORM\s+UNTIL\s+(.+?)\s*$', line) if m: node = BrPerform('until', condition=m.group(1).strip()) self.advance() node.body_seq = self.parse_seq(end_check=lambda l: l == 'END-PERFORM') if self.clean() == 'END-PERFORM': self.advance() return node m = re.match(r'^PERFORM\s+(\w[\w-]*)\s+UNTIL\s+(.+?)\s*$', line) if m: target = m.group(1).strip() node = BrPerform('para_until', target=target, condition=m.group(2).strip()) self.advance() self._inline_perform(node, target) return node m = re.match(r'^PERFORM\s+(\d+)\s+TIMES\s*$', line) if m: node = BrPerform('times', times=int(m.group(1))) self.advance() return node m = re.match(r'^PERFORM\s+(\w[\w-]*)\s+THRU\s+(\w[\w-]*)\s*$', line) if m: node = BrPerform('thru', target=m.group(1).strip(), thru=m.group(2).strip()) self.advance() self._inline_perform(node, node.target, node.thru) return node m = re.match(r'^PERFORM\s+VARYING\s+(\w[\w-]*)\s+FROM\s+(\S+)\s+BY\s+(\S+)(?:\s+UNTIL\s+(.+))?\s*$', line) if m: varying_var = m.group(1).strip() from_val = m.group(2).strip() by_val = m.group(3).strip() condition = m.group(4).strip() if m.group(4) else None if not condition: save_pos = self.pos self.advance() while self.pos < len(self.lines): nxt = self.clean() cm = re.match(r'^UNTIL\s+(.+)$', nxt) if cm: condition = cm.group(1).strip() self.advance() break fm = re.match(r'^FROM\s+(\S+)\s+BY\s+(\S+)$', nxt) if fm: from_val = fm.group(1).strip() by_val = fm.group(2).strip() self.advance() continue self.pos = save_pos break if condition: node = BrPerform('varying', condition=condition, varying_var=varying_var, varying_from=from_val, varying_by=by_val) # condition from regex (single-line) → advance past PERFORM line # condition from while-loop (multi-line) → already advanced past FROM/BY/UNTIL if m.group(4): self.advance() node.body_seq = self.parse_seq(end_check=lambda l: l == 'END-PERFORM') if self.clean() == 'END-PERFORM': self.advance() return node self.pos = save_pos # PERFORM VARYING var — FROM/BY/UNTIL all on subsequent lines m = re.match(r'^PERFORM\s+VARYING\s+(\w[\w-]*)\s*$', line) if m: varying_var = m.group(1).strip() save_pos = self.pos self.advance() from_val = by_val = condition = None while self.pos < len(self.lines): nxt = self.clean() fm = re.match(r'^FROM\s+(\S+)\s+BY\s+(\S+)$', nxt) if fm: from_val, by_val = fm.group(1).strip(), fm.group(2).strip() self.advance() continue um = re.match(r'^UNTIL\s+(.+)$', nxt) if um: condition = um.group(1).strip() self.advance() break break if from_val and by_val and condition: node = BrPerform('varying', condition=condition, varying_var=varying_var, varying_from=from_val, varying_by=by_val) node.body_seq = self.parse_seq(end_check=lambda l: l == 'END-PERFORM') if self.clean() == 'END-PERFORM': self.advance() return node self.pos = save_pos m = re.match(r'^PERFORM\s+(\w[\w-]*)\s+VARYING\s+(\w[\w-]*)\s+FROM\s+(\S+)\s+BY\s+(\S+)(?:\s+UNTIL\s+(.+))?\s*$', line) if m: target = m.group(1).strip() varying_var = m.group(2).strip() from_val = m.group(3).strip() by_val = m.group(4).strip() condition = m.group(5).strip() if m.group(5) else None if not condition: save_pos = self.pos self.advance() while self.pos < len(self.lines): nxt = self.clean() cm = re.match(r'^UNTIL\s+(.+)$', nxt) if cm: condition = cm.group(1).strip() self.advance() break self.pos = save_pos break if condition: node = BrPerform('para_varying', target=target, condition=condition, varying_var=varying_var, varying_from=from_val, varying_by=by_val) self.advance() self._inline_perform(node, node.target) return node self.pos = save_pos m = re.match(r'^PERFORM\s+(\w[\w-]*)\s*$', line) if m: target = m.group(1).strip() node = BrPerform('para', target=target) self.advance() self._inline_perform(node, target) return node self.advance() return None def _inline_perform(self, node, target, thru=None): if thru: if target in self.paragraphs and thru in self.paragraphs: start = self.paragraphs[target][0] end = self.paragraphs[thru][1] all_lines = [] for name, (s, e) in self.paragraphs.items(): if s >= start and e <= end: all_lines.extend(self.raw_lines[s:e + 1]) sub = _BrParser( [l for l in all_lines if l.strip()], self.paragraphs, self.raw_lines, self.assignments, self.fields ) node.body_seq = sub.parse_seq() elif target in self.paragraphs: start, end = self.paragraphs[target] para_lines = self.raw_lines[start:end + 1] sub = _BrParser( [l for l in para_lines if l.strip()], self.paragraphs, self.raw_lines, self.assignments, self.fields ) node.body_seq = sub.parse_seq() def _parse_initialize(self): line = self.clean() m = re.match(r'^INITIALIZE\s+(.+?)\s*$', line) if not m: self.advance() return None rest = m.group(1).strip() # Split off REPLACING clause parts = re.split(r'\s+REPLACING\s+', rest, maxsplit=1, flags=re.IGNORECASE) target_str = parts[0].strip() targets = re.findall(r'[A-Z][A-Z0-9-]*', target_str) # Parse REPLACING: (NUMERIC|ALPHANUMERIC|ALPHABETIC) DATA BY literal replacing = {} if len(parts) > 1: pairs = re.findall( r'(NUMERIC|ALPHANUMERIC-EDITED|NUMERIC-EDITED|ALPHANUMERIC|ALPHABETIC)\s+DATA\s+BY\s+(\S+)', parts[1], re.IGNORECASE ) for ptype, literal in pairs: replacing[ptype.upper()] = literal.strip("'").strip('"') seq = BrSeq() for tgt in targets: info = {'type': 'initialize'} if replacing: info['replacing'] = replacing self.assignments.setdefault(tgt, []).append(info) seq.add(Assign(tgt, info)) self.advance() return seq def _parse_string(self): parts = [self.clean()] self.advance() while self.pos < len(self.lines): cl = self.clean() if cl == 'END-STRING': self.advance() break parts.append(cl) self.advance() full = ' '.join(parts) m = re.match(r'^STRING\s+(.+)\s+INTO\s+(\w[\w-]*)\s*$', full, re.IGNORECASE | re.DOTALL) if not m: return None source_part = m.group(1).strip() target = m.group(2).strip() source_vars = re.findall(r'[A-Z][A-Z0-9-]*', source_part) info = {'type': 'string_concat', 'source_vars': source_vars} self.assignments.setdefault(target, []).append(info) seq = BrSeq() seq.add(Assign(target, info)) return seq def _parse_unstring(self): parts = [self.clean()] self.advance() while self.pos < len(self.lines): cl = self.clean() if cl == 'END-UNSTRING': self.advance() break parts.append(cl) self.advance() full = ' '.join(parts) m = re.match(r'^UNSTRING\s+(.+?)\s+INTO\s+(.+?)\s*$', full, re.IGNORECASE | re.DOTALL) if not m: return None source_part = m.group(1).strip() targets_part = m.group(2).strip() source_vars = re.findall(r'[A-Z][A-Z0-9-]*', source_part) targets = re.findall(r'[A-Z][A-Z0-9-]*', targets_part) source_var = source_vars[0] if source_vars else '' seq = BrSeq() for tgt in targets: info = {'type': 'unstring_split', 'source_vars': [source_var], 'index': targets.index(tgt)} self.assignments.setdefault(tgt, []).append(info) seq.add(Assign(tgt, info)) return seq def _parse_call(self): line = self.clean() m = re.match(r'^CALL\s+(\S+?)(?:\s+USING\s+(.+))?\s*$', line) if not m: self.advance() return BrSeq() prog = m.group(1).strip("'\"").upper() params = [] if m.group(2): rest = m.group(2) # 逐 segment 解析: BY mechanism names... current = "reference" # COBOL 默认 BY REFERENCE for seg in re.split(r'\s+(?=BY\s+(?:REFERENCE|CONTENT|VALUE)\s+)', rest, flags=re.IGNORECASE): seg = seg.strip() m_mech = re.match( r'BY\s+(REFERENCE|CONTENT|VALUE)\s+(.*)', seg, re.IGNORECASE ) if m_mech: current = m_mech.group(1).lower() names_text = m_mech.group(2) else: names_text = seg for nm in re.findall(r'\w[\w-]*', names_text): params.append({"name": nm.upper(), "mechanism": current}) node = CallNode(prog, using_params=params) self.advance() return node def _parse_goto(self, target): node = GoTo(target) if self._goto_depth < 10 and target in self.paragraphs: start, end = self.paragraphs[target] para_lines = self.raw_lines[start:end + 1] sub = _BrParser( [l for l in para_lines if l.strip()], self.paragraphs, self.raw_lines, self.assignments, self.fields, goto_depth=self._goto_depth + 1 ) node.body_seq = sub.parse_seq() self.advance() return node def _parse_set_true(self, name): name = name.upper() parent = None value = None if self.fields: for f in self.fields: if f.get('is_88') and f['name'] == name: parent = f.get('parent', '') value = f.get('value', '') break info = {'type': 'set_true', '88_name': name, 'value': value} tgt = parent or name if parent: self.assignments.setdefault(tgt, []).append(info) self.advance() return Assign(tgt, info) def _parse_set_false(self, name): name = name.upper() parent = None value = None if self.fields: for f in self.fields: if f.get('is_88') and f['name'] == name: parent = f.get('parent', '') value = f.get('value', '') break # FALSE 值 = 88-level VALUE 的反值 if value: false_val = 'N' if value == 'Y' else ('Y' if value == 'N' else ' ') else: false_val = 'N' info = {'type': 'move_literal', 'literal': false_val} tgt = parent or name self.assignments.setdefault(tgt, []).append(info) self.advance() return Assign(tgt, info) # ── 工具函数 ── def _basename(name: str) -> str: """去除下标后缀,如 WS-TABLE(1) → WS-TABLE""" return re.sub(r'\s*\(.*?\)\s*$', '', name).strip() def _init_child_names(group_name: str, fields: list) -> list: """递归收集 group 下所有非 88 级子字段的扁平名列表""" result = [] grp_level = None found = False for f in fields: if not found and f['name'] == group_name: grp_level = f.get('level', 0) found = True continue if found: if f.get('level', 0) <= grp_level or f.get('level') == 77: break if f.get('is_88') or f.get('redefines'): continue if not f.get('pic_info') or f['pic_info'].get('type') == 'unknown': result.extend(_init_child_names(f['name'], fields)) else: result.append(f['name']) return result # ── 数据流追踪 ── def trace_to_root(field_name, assignments, fields, path_assign=None): seen = set() var = field_name chain = [] while var in assignments and var not in seen: seen.add(var) if path_assign and var in path_assign: asgn_list = path_assign[var] if isinstance(asgn_list, list): asgn = asgn_list[-1] for a in reversed(asgn_list): sv = a.get('source_vars', []) if len(sv) == 1 and sv[0] == var: continue asgn = a break else: asgn = asgn_list else: asgn_list = assignments[var] asgn = asgn_list[-1] if isinstance(asgn_list, list): for a in reversed(asgn_list): sv = a.get('source_vars', []) if len(sv) == 1 and sv[0] == var: continue asgn = a break chain.append((var, asgn)) if not asgn.get('source_vars'): break sv = asgn['source_vars'] if len(sv) == 1: next_var = sv[0] if next_var == var: break var = next_var if next_var not in assignments: break elif len(sv) >= 2 and asgn.get('op') == '+': # 多源加法:取第一个源变量继续追溯 var = sv[0] else: break return var, chain def invert_through_chain(root_var, chain, operator, value): op = operator try: val = float(value) except (ValueError, TypeError): return root_var, op, value for var, asgn in reversed(chain): if asgn['type'] == 'move': continue sv = asgn.get('source_vars', []) if asgn['type'] == 'compute' and asgn['op'] is not None: if len(sv) == 1: c = asgn['const'] inv = {'+': '-', '-': '+', '*': '/', '/': '*'}[asgn['op']] if inv == '/': val = val / c if c != 0 else val elif inv == '*': val = val * c elif inv == '-': val = val - c elif inv == '+': val = val + c elif len(sv) >= 2 and asgn['op'] == '+': # 多源加法:追溯第一个源变量,值不变(忽略其他源) pass if val == int(val): return root_var, op, str(int(val)) return root_var, op, str(val) FIGURATIVE_NUMERIC = { 'ZERO': 0.0, 'ZEROS': 0.0, 'ZEROES': 0.0, 'SPACE': 0.0, 'SPACES': 0.0, 'HIGH-VALUE': None, 'HIGH-VALUES': None, 'LOW-VALUE': 0.0, 'LOW-VALUES': 0.0, } FIGURATIVE_ALPHA = { 'SPACE': ' ', 'SPACES': ' ', 'HIGH-VALUE': chr(255), 'HIGH-VALUES': chr(255), 'LOW-VALUE': chr(0), 'LOW-VALUES': chr(0), } def _resolve_subscript(key, rec): """将变量下标解析为具体值:WS-FIXED-KEY(WS-IDX) → WS-FIXED-KEY(1) if WS-IDX=1 in rec""" m = re.match(r'^(\w[\w-]*)\((\w[\w-]*)\)$', key) if m: base, var = m.groups() if var in rec: try: return f'{base}({int(rec[var])})' except (ValueError, TypeError): pass return key def _apply_before_after(val, before_after, delimiter): if not delimiter: return val if before_after == 'BEFORE': idx = val.find(delimiter) return val[:idx] if idx >= 0 else val if before_after == 'AFTER': idx = val.find(delimiter) return val[idx + len(delimiter):] if idx >= 0 else '' return val def propagate_assignments(rec, assignments, fields, file_sec=None): def raw_to_float(val, pi): if pi.get('type') == 'numeric': digits = pi.get('digits', 0) decimal = pi.get('decimal', 0) total = digits + decimal s = str(val) neg = s.startswith('-') if neg: s = s[1:] s = s.zfill(total) int_part = s[:digits] if digits else '0' dec_part = s[digits:] if decimal > 0 else '0' result = float(int(int_part or '0') + int(dec_part or '0') / (10 ** decimal)) return -result if neg else result try: return float(val) except (ValueError, TypeError): return 0.0 def float_to_raw(val, pi): if pi.get('type') == 'numeric': digits = pi.get('digits', 0) decimal = pi.get('decimal', 0) signed = pi.get('signed', False) scaled = int(round(val * (10 ** decimal))) if not signed and scaled < 0: scaled = 0 capped = abs(scaled) % (10 ** (digits + decimal)) int_part = str(capped // (10 ** decimal)).zfill(digits) dec_part = str(capped % (10 ** decimal)).zfill(decimal) result = int_part + (dec_part if decimal > 0 else '') if signed and scaled < 0: result = '-' + result return result return str(val) def literal_to_raw(literal, pi): ftype = pi.get('type', 'unknown') if ftype == 'numeric': key = literal.upper() if key in FIGURATIVE_NUMERIC: v = FIGURATIVE_NUMERIC[key] if v is None: digits = pi.get('digits', 0) decimal = pi.get('decimal', 0) v = 10 ** (digits + decimal) - 1 return float_to_raw(v, pi) try: return float_to_raw(float(literal), pi) except ValueError: return float_to_raw(0.0, pi) if ftype in ('alphanumeric', 'alphabetic'): key = literal.upper() if key in FIGURATIVE_ALPHA: ch = FIGURATIVE_ALPHA[key] return ch[0].ljust(pi.get('length', 1), ch[0]) return literal.ljust(pi.get('length', len(literal)))[:pi.get('length', len(literal))] return literal pi_map = {f['name']: f.get('pic_info', {}) for f in fields} if file_sec is None: file_sec = {} # Flatten: {tgt: [info1, info2]} → [(tgt, info1), (tgt, info2)] flat_list = [] for tgt, asgn_val in assignments.items(): if isinstance(asgn_val, list): for asgn in asgn_val: flat_list.append((tgt, asgn)) elif isinstance(asgn_val, dict): flat_list.append((tgt, asgn_val)) _MAX_CONVERGE = 20 # 识别有"锚定赋值"(非自引用赋值,如 MOVE literal 或不同字段的 MOVE) 的 target _anchored = set() for tgt, asgn in flat_list: if asgn.get('type') != 'compute': _anchored.add(tgt) else: sv = asgn.get('source_vars', []) if not (len(sv) == 1 and sv[0] == tgt) and not (len(sv) >= 2 and tgt == sv[0]): _anchored.add(tgt) for _converge_iter in range(_MAX_CONVERGE): _old = dict(rec) # Pass 1: variable-to-variable MOVE for tgt, asgn in flat_list: if asgn['type'] == 'move' and asgn['source_vars']: src = asgn['source_vars'][0] resolved_tgt = _resolve_subscript(tgt, rec) resolved_src = _resolve_subscript(src, rec) if resolved_src in rec: rec[resolved_tgt] = rec[resolved_src] # Pass 2: literal MOVE for tgt, asgn in flat_list: if asgn['type'] == 'move_literal': resolved_tgt = _resolve_subscript(tgt, rec) pi = pi_map.get(resolved_tgt, {}) rec[resolved_tgt] = literal_to_raw(asgn['literal'], pi) # Pass 3: INITIALIZE for tgt, asgn in flat_list: if asgn['type'] == 'initialize': resolved_tgt = _resolve_subscript(tgt, rec) pi = pi_map.get(resolved_tgt, {}) ftype = pi.get('type', 'unknown') replacing = asgn.get('replacing', {}) if replacing: mapped = replacing.get(ftype.upper(), None) if mapped: rec[resolved_tgt] = literal_to_raw(mapped, pi) else: if ftype == 'numeric': rec[resolved_tgt] = float_to_raw(0.0, pi) else: rec[resolved_tgt] = literal_to_raw('SPACE', pi) else: if ftype == 'numeric': rec[resolved_tgt] = float_to_raw(0.0, pi) else: rec[resolved_tgt] = literal_to_raw('SPACE', pi) # Pass 3.5: READ INTO for tgt, asgn in flat_list: if asgn['type'] == 'read_into': fname = asgn.get('file', '') if fname in file_sec: fd_children = _init_child_names(file_sec[fname][0], fields) ws_children = _init_child_names(tgt, fields) for ws_c in ws_children: fd_candidate = ws_c if ws_c.startswith('WS-'): fd_candidate = ws_c[3:] if fd_candidate in rec: rec[ws_c] = rec[fd_candidate] else: idx = ws_children.index(ws_c) if idx < len(fd_children) and fd_children[idx] in rec: rec[ws_c] = rec[fd_children[idx]] rec[tgt] = ''.join(str(rec.get(c, '')) for c in ws_children) # Pass 4: COMPUTE for tgt, asgn in flat_list: if asgn['type'] == 'compute' and asgn['source_vars'] and asgn['op'] is not None: resolved_tgt = _resolve_subscript(tgt, rec) pi_tgt = pi_map.get(resolved_tgt, {}) if len(asgn['source_vars']) == 1: src = asgn['source_vars'][0] resolved_src = _resolve_subscript(src, rec) # 无锚定的自引用 COMPUTE(如 ADD 1 TO X):只在第 0 轮应用一次 if resolved_tgt == resolved_src and tgt not in _anchored and _converge_iter > 0: continue if resolved_src in rec: sv = raw_to_float(rec[resolved_src], pi_map.get(resolved_src, {})) c = asgn.get('const', 0) if asgn['op'] == 'rem': quotient = int(sv / c) if c != 0 else 0 result = sv - quotient * c else: result = {'+': sv + c, '-': sv - c, '*': sv * c, '/': sv / c if c != 0 else sv}[asgn['op']] rec[resolved_tgt] = float_to_raw(result, pi_tgt) elif len(asgn['source_vars']) == 2: v1, v2 = asgn['source_vars'] resolved_v1 = _resolve_subscript(v1, rec) resolved_v2 = _resolve_subscript(v2, rec) # 无锚定的自引用 COMPUTE(如 ADD X TO Y 且 Y 无前置 MOVE) if resolved_tgt == resolved_v1 and tgt not in _anchored and _converge_iter > 0: continue if resolved_v1 in rec and resolved_v2 in rec: sv1 = raw_to_float(rec[resolved_v1], pi_map.get(resolved_v1, {})) sv2 = raw_to_float(rec[resolved_v2], pi_map.get(resolved_v2, {})) if asgn['op'] == 'rem': quotient = int(sv1 / sv2) if sv2 != 0 else 0 result = sv1 - quotient * sv2 else: result = {'+': sv1 + sv2, '-': sv1 - sv2, '*': sv1 * sv2, '/': sv1 / sv2 if sv2 != 0 else sv1}[asgn['op']] rec[resolved_tgt] = float_to_raw(result, pi_tgt) elif len(asgn['source_vars']) >= 3 and asgn['op'] == '+': total = 0 all_found = True for v in asgn['source_vars']: resolved_v = _resolve_subscript(v, rec) if resolved_v in rec: total += raw_to_float(rec[resolved_v], pi_map.get(resolved_v, {})) else: all_found = False break if all_found: rec[resolved_tgt] = float_to_raw(total, pi_tgt) # Pass 4.5: INSPECT for tgt, asgn in flat_list: if asgn['type'] != 'inspect': continue resolved_tgt = _resolve_subscript(tgt, rec) if resolved_tgt not in rec: continue src_val = str(rec[resolved_tgt]) for op_type, params in asgn.get('sub_ops', []): if op_type == 'tally': cv = params['count_var'].upper() cv_pi = pi_map.get(cv, {}) effective = _apply_before_after(src_val, params.get('before_after'), params.get('delimiter')) cnt = 0 if params['kind'] == 'LEADING': cnt = len(effective) - len(effective.lstrip(params['char'])) elif params['kind'] == 'TRAILING': cnt = len(effective) - len(effective.rstrip(params['char'])) else: cnt = len(effective) if cv_pi.get('type') == 'numeric': rec[cv] = float_to_raw(float(cnt), cv_pi) elif op_type == 'replace': effective = _apply_before_after(src_val, params.get('before_after'), params.get('delimiter')) if params['kind'] == 'ALL': new_val = effective.replace(params['src'], params['dst']) elif params['kind'] == 'LEADING': new_val = effective while new_val.startswith(params['src']): new_val = new_val[len(params['src']):] new_val = effective.replace(params['src'], params['dst'], 1) elif params['kind'] == 'FIRST': new_val = effective.replace(params['src'], params['dst'], 1) else: new_val = params['dst'] * len(effective) rec[resolved_tgt] = new_val elif op_type == 'convert': effective = _apply_before_after(src_val, params.get('before_after'), params.get('delimiter')) table = str.maketrans(params['from_chars'], params['to_chars']) rec[resolved_tgt] = effective.translate(table) # Pass 5: STRING / UNSTRING for tgt, asgn in flat_list: if asgn['type'] == 'string_concat': resolved_tgt = _resolve_subscript(tgt, rec) pi = pi_map.get(resolved_tgt, {}) parts = [] for v in asgn.get('source_vars', []): resolved_v = _resolve_subscript(v, rec) if resolved_v in rec: parts.append(str(rec[resolved_v])) val = ''.join(parts) if pi.get('type') in ('alphanumeric', 'alphabetic'): val = val.ljust(pi.get('length', len(val)))[:pi.get('length', len(val))] rec[resolved_tgt] = val elif asgn['type'] == 'unstring_split': resolved_tgt = _resolve_subscript(tgt, rec) pi = pi_map.get(resolved_tgt, {}) src_var = asgn.get('source_vars', [None])[0] resolved_src = _resolve_subscript(src_var, rec) if src_var else None idx = asgn.get('index', 0) if resolved_src and resolved_src in rec: src_val = str(rec[resolved_src]) ftype = pi.get('type', 'unknown') if idx == 0: val = src_val else: val = ' ' if ftype in ('alphanumeric', 'alphabetic') else '0' if ftype in ('alphanumeric', 'alphabetic'): val = val.ljust(pi.get('length', len(val)))[:pi.get('length', len(val))] rec[resolved_tgt] = val # Pass 6: READ INTO / WRITE FROM for tgt, asgn in flat_list: if asgn['type'] == 'read_into': fname = asgn.get('file', '') if fname in file_sec: children = _init_child_names(file_sec[fname][0], fields) rec[tgt] = ''.join(str(rec.get(c, '')) for c in children) elif asgn['type'] == 'write_from': buf = tgt rec_name = asgn.get('file', '') children = _init_child_names(rec_name, fields) if children: src = str(rec.get(buf, '')) pos = 0 for c in children: pi = pi_map.get(c, {}) length = pi.get('digits', 0) + pi.get('decimal', 0) or pi.get('length', 0) if length > 0: chunk = src[pos:pos + length] if not chunk: chunk = '0' if pi.get('type') == 'numeric' else ' ' rec[c] = chunk.ljust(length) pos += length # Pass 7: ACCEPT for tgt, asgn in flat_list: if asgn['type'] == 'accept': resolved_tgt = _resolve_subscript(tgt, rec) pi = pi_map.get(resolved_tgt, {}) ftype = pi.get('type', 'unknown') total = pi.get('digits', 0) + pi.get('decimal', 0) length = pi.get('length', 0) from_type = asgn.get('from', 'USER') val = None if from_type == 'DATE': val = '20260603' elif from_type == 'TIME': val = '120000' elif from_type == 'DAY': val = '2026154' elif from_type == 'DAY-OF-WEEK': val = '3' elif from_type == 'YEAR': val = '2026' if val is not None: if ftype == 'numeric': rec[resolved_tgt] = val.zfill(total) else: rec[resolved_tgt] = val.ljust(length)[:length] if length else val # Pass 8: SET var TO TRUE (88-level) for tgt, asgn in flat_list: if asgn['type'] == 'set_true': resolved_tgt = _resolve_subscript(tgt, rec) val = asgn.get('value', '1') pi = pi_map.get(resolved_tgt, {}) ftype = pi.get('type', 'unknown') if ftype in ('alphanumeric', 'alphabetic'): length = pi.get('length', len(str(val))) rec[resolved_tgt] = str(val)[0].ljust(length)[:length] else: total = pi.get('digits', 0) + pi.get('decimal', 0) rec[resolved_tgt] = str(val).zfill(max(total, 1)) if rec == _old: break else: logger.warning(f"propagate_assignments 未收敛({_MAX_CONVERGE} 次迭代后仍有变化)") def classify_field_roles(tree, assignments, fields, source=None, proc_text=None): """分析分支树和赋值记录,分类各字段的入出力角色。 优先级:FD/OPEN 方向 > 静态分析 返回 {字段名: 'input'|'output'|'inout'|'unused'}. """ # Phase 0: FD/OPEN 方向解析 fd_roles = {} if source and proc_text: from .read import parse_file_control, parse_file_section, scan_open_statements file_ctl = parse_file_control(source) file_sec = parse_file_section(source) open_dir = scan_open_statements(proc_text) for iname, direction in open_dir.items(): if iname in file_sec: for rec_name in file_sec[iname]: if direction == 'INPUT': fd_roles[rec_name] = 'input' elif direction == 'OUTPUT': fd_roles[rec_name] = 'output' elif direction == 'I-O': fd_roles[rec_name] = 'inout' # 传播到子字段 for rec_name, role in list(fd_roles.items()): for child in _init_child_names(rec_name, fields): fd_roles[child] = role counts = {f['name']: {'read': 0, 'write': 0} for f in fields} def _walk(node): if isinstance(node, BrIf): if node.cond_tree: for leaf in collect_leaves(node.cond_tree): name = _basename(leaf.field) if name in counts: counts[name]['read'] += 1 _walk(node.true_seq) _walk(node.false_seq) elif isinstance(node, BrEval): name = _basename(node.subject) if name in counts: counts[name]['read'] += 1 for _, seq in node.when_list: _walk(seq) _walk(node.other_seq) elif isinstance(node, BrPerform): if node.condition: parsed = parse_single_condition(node.condition) if parsed: name = _basename(parsed[0]) if name in counts: counts[name]['read'] += 1 if node.varying_var: name = _basename(node.varying_var) if name in counts: counts[name]['write'] += 1 _walk(node.body_seq) elif isinstance(node, CallNode): for p in node.using_params: name = _basename(p.get("name", "")) mechanism = p.get("mechanism", "reference") if name in counts: counts[name]["read"] += 1 if mechanism.lower() == "reference": counts[name]["write"] += 1 elif isinstance(node, Assign): tgt_base = _basename(node.target) atype = node.source_info.get('type') if atype == 'read_into': if tgt_base in counts: counts[tgt_base]['write'] += 1 elif atype == 'write_from': if tgt_base in counts: counts[tgt_base]['read'] += 1 elif atype == 'set_true': if tgt_base in counts: counts[tgt_base]['write'] += 1 else: if tgt_base in counts: counts[tgt_base]['write'] += 1 for v in node.source_info.get('source_vars', []): v_base = _basename(v) if v_base in counts: counts[v_base]['read'] += 1 if atype == 'initialize' and tgt_base in counts: for child in _init_child_names(tgt_base, fields): if child in counts: counts[child]['write'] += 1 elif isinstance(node, BrSeq): for c in node.children: _walk(c) _walk(tree) # Phase extra: ACCEPT / DISPLAY (proc_text 扫描) if proc_text: for m in re.finditer(r'ACCEPT\s+(\w[\w-]*)', proc_text): name = _basename(m.group(1).upper()) if name in counts: counts[name]['write'] += 1 for m in re.finditer(r'DISPLAY\s+(\w[\w-]*)', proc_text): name = _basename(m.group(1).upper()) if name in counts: counts[name]['read'] += 1 # LINKAGE 字段默认 input(未使用时不改变) for f in fields: if f.get('section') == 'LINKAGE': name = f['name'] if name in counts and counts[name]['read'] == 0 and counts[name]['write'] == 0: counts[name]['read'] = 1 result = {} for name, c in counts.items(): if name in fd_roles: result[name] = fd_roles[name] continue if c['read'] > 0 and c['write'] > 0: result[name] = 'inout' elif c['write'] > 0: result[name] = 'output' elif c['read'] > 0: result[name] = 'input' else: result[name] = 'unused' # 确保 FD 记录字段也出现(即使不在 fields 中—应不会) for name, role in fd_roles.items(): if name not in result: result[name] = role return result