"""??????? + COPYBOOK + DATA DIVISION?? + PIC""" import re from pathlib import Path from lark import Lark, Transformer, v_args from .models import FieldDef, PicInfo # 鈹€鈹€ Preprocessor 鈹€鈹€ def _is_fixed_format(source: str) -> bool: if re.search(r'>>SOURCE\s+FORMAT\s+IS\s+FREE', source, re.IGNORECASE): return False if re.search(r'>>SOURCE\s+FORMAT\s+IS\s+FIXED', source, re.IGNORECASE): return True lines = [l for l in source.splitlines() if l.strip()] fixed_hits = 0 free_hits = 0 for line in lines[:10]: if len(line) >= 72: free_hits += 1 elif len(line) >= 7 and line[6] in ('*', '/', '-', 'D'): fixed_hits += 1 return fixed_hits >= free_hits if (fixed_hits + free_hits) > 0 else True def preprocess(source: str) -> str: fixed = _is_fixed_format(source) lines = [] for raw_line in source.splitlines(): line = raw_line.rstrip() if not line: lines.append('') continue if fixed: if len(line) >= 7 and line[6] in ('*', '/'): continue if len(line) >= 7 and line[6] == '-': if lines: lines[-1] = lines[-1] + ' ' + line[7:].lstrip() continue if len(line) >= 7 and line[6].upper() == 'D': continue content = line[6:] if len(line) >= 7 else line else: comment_pos = line.find('*>') if comment_pos >= 0: line = line[:comment_pos] line = line.strip() if not line: continue content = line lines.append(re.sub(r'\s+FALSE\s+[^\s.]+', '', content.upper())) return '\n'.join(lines) def extract_data_division(source: str) -> str: m = re.search(r'DATA\s+DIVISION\s*\.', source) if not m: return '' start = m.end() end_m = re.search(r'PROCEDURE\s+DIVISION', source[start:]) if end_m: end = start + end_m.start() else: end = len(source) return source[start:end].strip() def extract_procedure_division(source: str) -> str: m = re.search(r'PROCEDURE\s+DIVISION', source) if not m: return '' return source[m.start():].strip() # 鈹€鈹€ COPYBOOK Resolution 鈹€鈹€ _COPYBOOK_EXTENSIONS = ['.cpy', '.cbl', '.cpb', ''] def resolve_copybooks(source: str, source_dir: str) -> str: """Find COPY statements and replace with copybook content.""" _RE_COPY = re.compile( r"^\s*COPY\s+(\w[\w-]*)(?:\s+REPLACING\s+(.+?))?\s*\.?\s*$", re.IGNORECASE ) _RE_PAIR = re.compile(r"==(.+?)==\s+BY\s+==(.+?)==", re.IGNORECASE) lines = source.split('\n') result = [] for line in lines: m = _RE_COPY.match(line) if m: name = m.group(1).upper() found = None for ext in _COPYBOOK_EXTENSIONS: p = Path(source_dir, name + ext) if p.exists(): found = p break if found: cb = found.read_text(encoding='utf-8') if m.group(2): pairs = _RE_PAIR.findall(m.group(2)) for old, new in pairs: cb = re.sub( re.escape(old.strip()), new.strip(), cb, flags=re.IGNORECASE ) result.append(f' * COPY {name}') result.append(cb) else: result.append(line) else: result.append(line) return '\n'.join(result) # 鈹€鈹€ Lark Grammar 鈹€鈹€ _GRAMMAR_CACHE = None def _get_grammar() -> str: global _GRAMMAR_CACHE if _GRAMMAR_CACHE is None: lark_path = Path(__file__).parent / 'grammar.lark' _GRAMMAR_CACHE = lark_path.read_text(encoding='utf-8') return _GRAMMAR_CACHE # 鈹€鈹€ Data Transformer 鈹€鈹€ @v_args(inline=True) class DataTransformer(Transformer): def __init__(self): super().__init__() self.fields = [] self._last_parent = None self._pending = [] def start(self, *items): for f in self._pending: f['section'] = f.get('section', 'WORKING-STORAGE') self.fields.append(f) self._pending = [] return self.fields def file_section(self, *args): for f in self._pending: f['section'] = 'FILE' self.fields.append(f) self._pending = [] return None def working_storage(self, *args): for f in self._pending: f['section'] = 'WORKING-STORAGE' self.fields.append(f) self._pending = [] return None def linkage(self, *args): for f in self._pending: f['section'] = 'LINKAGE' self.fields.append(f) self._pending = [] return None def data_item(self, level_num, name, *clauses): level = int(str(level_num)) name = str(name) is_filler = (name.upper() == 'FILLER') pic = None value = None values = None redefines = None usage = None occurs_count = 0 occurs_depending = None for c in clauses: if isinstance(c, dict): if 'pic' in c: pic = c['pic'] if 'value' in c: value = c['value'] if 'values' in c: values = c['values'] if 'redefines' in c: redefines = c['redefines'] if 'usage' in c: usage = c['usage'] if 'occurs' in c: occurs_count = c['occurs'] if 'depends' in c: occurs_depending = c['depends'] base = { 'level': level, 'name': name, 'pic': pic if pic else None, 'value': value, 'values': values, 'is_filler': is_filler, 'redefines': redefines, 'usage': usage, 'occurs': occurs_count, 'occurs_depending': occurs_depending, } if pic is not None: self._pending.append(base) self._last_parent = name elif level == 88 and value is not None: base.update({ 'pic': None, 'value': value.strip("'").strip('"'), 'values': [v.strip("'").strip('"') for v in values] if values else None, 'is_88': True, 'parent': self._last_parent or '', }) self._pending.append(base) else: # 组项目(无 PIC,有下级字段) self._pending.append(base) self._last_parent = name return None def clause(self, *args): # ?????????? dict??????? token result = {} for a in args: if isinstance(a, dict): result.update(a) elif isinstance(a, str) and a.upper() in ( 'COMP', 'COMP-3', 'COMP-5', 'BINARY', 'PACKED-DECIMAL', 'DISPLAY', ): result['usage'] = a.upper() return result if result else None def pic_clause(self, *args): return {'pic': str(args[-1])} def usage_clause(self, token): return {'usage': str(token)} def value_clause(self, *args): values = [] for a in args: if isinstance(a, str) and a.upper() in ('VALUE', 'IS'): continue val = str(a).strip("'").strip('"') values.append(val) return {'value': values[0], 'values': values} if values else {'value': None} def value_literal(self, token): return str(token) def occurs_clause(self, *args): result = {'occurs': int(args[0])} if len(args) >= 2: result['depends'] = str(args[1]) return result def redefines_clause(self, *args): return {'redefines': str(args[-1])} def level_num(self, token): return token def NAME(self, token): return str(token) def PICTURE_STRING(self, token): return str(token) def INT(self, token): return int(token) # 鈹€鈹€ PIC Parser 鈹€鈹€ def _expand_pic(s: str) -> str: result = '' i = 0 while i < len(s): if s[i] == '(': j = s.find(')', i) if j > i + 1: count = int(s[i + 1:j]) if result: result += result[-1] * (count - 1) i = j + 1 continue result += s[i] i += 1 return result def parse_pic(pic_str: str) -> PicInfo: info = PicInfo() s = pic_str.upper().strip() if not s: return info if s.startswith('S'): info.signed = True s = s[1:] expanded = _expand_pic(s) if expanded[0] == '9': info.type = 'numeric' if 'V' in expanded: parts = expanded.split('V') info.digits = parts[0].count('9') info.decimal = parts[1].count('9') else: info.digits = expanded.count('9') info.decimal = 0 elif expanded[0] == 'X': info.type = 'alphanumeric' info.length = len(expanded) elif expanded[0] == 'A': info.type = 'alphabetic' info.length = len(expanded) elif expanded[0] in ('Z', '*', '$', '+', '-'): info.type = 'numeric-edited' info.digits = expanded.count('9') if 'V' in expanded: info.decimal = expanded.split('V')[1].count('9') elif '.' in expanded: info.decimal = expanded.split('.')[1].count('9') info.length = len(expanded) elif expanded.endswith('CR') or expanded.endswith('DB'): info.type = 'numeric-edited' stripped = expanded[:-2] info.digits = stripped.count('9') if 'V' in stripped: info.decimal = stripped.split('V')[1].count('9') elif '.' in stripped: info.decimal = stripped.split('.')[1].count('9') info.length = len(expanded) else: info.type = 'alphanumeric' info.length = len(expanded) return info # 鈹€鈹€ DATA DIVISION 鍏ュ彛 鈹€鈹€ def parse_data_division(data_div_text: str) -> list[FieldDef]: """??DATA DIVISION???FieldDef????PIC???""" grammar = _get_grammar() parser = Lark(grammar, parser='earley', lexer='dynamic') tree = parser.parse(data_div_text) transformer = DataTransformer() raw = transformer.transform(tree) result = [] for r in raw: pic = r.get('pic', '') info = parse_pic(pic) if pic else None f = FieldDef( name=r['name'], level=r['level'], pic=pic, pic_info=info, is_filler=r.get('is_filler', False), occurs_count=r.get('occurs', 0), occurs_depending=r.get('occurs_depending'), redefines=r.get('redefines'), usage=r.get('usage'), value=r.get('value'), values=r.get('values'), is_88=r.get('is_88', False), parent=r.get('parent'), section=r.get('section'), ) result.append(f) return result # 鈹€鈹€ FILE-CONTROL / FILE SECTION / OPEN 瑙f瀽 鈹€鈹€ def parse_file_control(source: str) -> dict: """?? FILE-CONTROL??? {?????: ?????}""" m = re.search(r'FILE-CONTROL\.(.*?)(?=DATA\s+DIVISION|\Z)', source, re.DOTALL | re.IGNORECASE) if not m: return {} fc = m.group(1) result = {} for m in re.finditer( r'SELECT\s+(\w[\w-]*)\s+[^.]*?\bASSIGN\s+TO\s+(["\'])(.*?)\2', fc, re.IGNORECASE ): result[m.group(1).upper()] = m.group(3).upper() return result def parse_file_section(source: str) -> dict: """?? FILE SECTION??? {?????: [01?????...]}""" m = re.search(r'FILE\s+SECTION\.(.*?)(?=WORKING-STORAGE\s+SECTION|LINKAGE\s+SECTION|\Z)', source, re.DOTALL | re.IGNORECASE) if not m: return {} fs = m.group(1) result = {} # ? FD ?????? FD ? fd_blocks = re.split(r'\n\s*(?=FD\s+)', fs.strip()) for block in fd_blocks: m = re.match(r'FD\s+(\w[\w-]*)', block, re.IGNORECASE) if not m: continue name = m.group(1).upper() # ???????? 01 ???? recs = re.findall(r'^\s*0{0,1}1\s+(\w[\w-]*)', block, re.MULTILINE) result[name] = [r.upper() for r in recs] return result def scan_open_statements(source: str) -> dict: """?? OPEN ????? {?????: 'INPUT'|'OUTPUT'|'I-O'}""" dirs = {} for m in re.finditer( r'OPEN\s+((?:INPUT|OUTPUT|I-O)\s+[\w\s-]+' r'(?:\s+(?:INPUT|OUTPUT|I-O)\s+[\w\s-]+)*)', source, re.IGNORECASE ): full = m.group(1) for seg_m in re.finditer( r'(INPUT|OUTPUT|I-O)\s+([\w\s-]+)', full, re.IGNORECASE ): direction = seg_m.group(1).upper() for fname in re.findall(r'\w[\w-]*', seg_m.group(2)): dirs[fname.upper()] = direction return dirs