7ac887c776
- Add INSPECT (TALLYING/REPLACING/CONVERTING) with BEFORE/AFTER INITIAL - Add SEARCH/SEARCH ALL with element-assignment path enumeration - Fix _mark_perform compound condition marking via evaluate_tree - Fix EVALUATE TRUE prior_false to collect all MC/DC false sets - Add impossible path filtering (Pass A.5) with trace-to-root conflict detection - Fix multi-line PERFORM VARYING parsing (VARYING/FROM/BY/UNTIL on separate lines) - Remove dead code: agents.py LLM parser (replaced by rule-based _BrParser) - 59 unit tests passing, 5 integration programs verified
440 lines
13 KiB
Python
440 lines
13 KiB
Python
"""??????? + COPYBOOK + DATA DIVISION?? + PIC"""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from lark import Lark, Transformer, v_args
|
|
|
|
from .models import FieldDef, PicInfo
|
|
|
|
|
|
# -- Preprocessor --
|
|
|
|
|
|
def _is_fixed_format(source: str) -> bool:
    """Guess whether COBOL *source* uses fixed (column-based) reference format.

    An explicit ``>>SOURCE`` directive decides the matter; a FREE directive
    anywhere in the text takes precedence over a FIXED one.  The words
    ``FORMAT`` and ``IS`` are optional in the directive, so
    ``>>SOURCE FORMAT IS FREE``, ``>>SOURCE FORMAT FREE`` and
    ``>>SOURCE FREE`` are all recognised.

    Without a directive, the first ten non-blank lines are inspected: a line
    of 72 or more characters counts as a hint for free format, otherwise a
    ``*``, ``/``, ``-`` or ``D`` in column 7 counts as a hint for fixed
    format.  Ties, and sources with no hints at all, are treated as fixed.

    Args:
        source: Raw program text.

    Returns:
        True for fixed format, False for free format.
    """
    directive = r'>>SOURCE\s+(?:FORMAT\s+)?(?:IS\s+)?'
    if re.search(directive + 'FREE', source, re.IGNORECASE):
        return False
    if re.search(directive + 'FIXED', source, re.IGNORECASE):
        return True

    non_blank = [text for text in source.splitlines() if text.strip()]
    fixed_hits = 0
    free_hits = 0
    for text in non_blank[:10]:
        if len(text) >= 72:
            free_hits += 1
        elif len(text) >= 7 and text[6] in ('*', '/', '-', 'D'):
            fixed_hits += 1

    if fixed_hits + free_hits == 0:
        # No evidence either way: fixed format is the default.
        return True
    return fixed_hits >= free_hits
|
|
|
|
|
|
def preprocess(source: str) -> str:
    """Normalise COBOL source text into upper-cased, comment-free lines.

    Blank lines are kept as empty strings.  In fixed format (as decided by
    ``_is_fixed_format``) the indicator in column 7 drives the handling:
    ``*`` and ``/`` lines are dropped, ``-`` lines are appended (after a
    single space) to the previously emitted line, ``D``/``d`` lines are
    dropped, and for every other line of at least seven characters the
    first six columns are cut off.  In free format everything from ``*>``
    onwards is removed, the line is stripped, and lines that end up empty
    are dropped.

    Every emitted line is upper-cased, and any ``FALSE <token>`` fragment
    preceded by whitespace is deleted from it.
    NOTE(review): presumably this targets the FALSE phrase of level-88
    VALUE clauses - confirm against the grammar.

    Args:
        source: Raw program text.

    Returns:
        The processed lines joined with ``\\n``.
    """
    is_fixed = _is_fixed_format(source)
    out = []
    for raw in source.splitlines():
        text = raw.rstrip()
        if not text:
            out.append('')
            continue

        if is_fixed:
            # Column 7 (index 6) holds the indicator, when the line has one.
            indicator = text[6] if len(text) >= 7 else None
            if indicator in ('*', '/'):
                continue
            if indicator == '-':
                # Continuation: glue onto the previous output line, if any.
                if out:
                    out[-1] = out[-1] + ' ' + text[7:].lstrip()
                continue
            if indicator is not None and indicator.upper() == 'D':
                continue
            content = text if indicator is None else text[6:]
        else:
            marker = text.find('*>')
            if marker >= 0:
                text = text[:marker]
            content = text.strip()
            if not content:
                continue

        out.append(re.sub(r'\s+FALSE\s+[^\s.]+', '', content.upper()))
    return '\n'.join(out)
|
|
|
|
|
|
def extract_data_division(source: str) -> str:
    """Return the text between ``DATA DIVISION.`` and ``PROCEDURE DIVISION``.

    Matching is case-sensitive.  If no ``PROCEDURE DIVISION`` header follows,
    everything after the ``DATA DIVISION.`` header is returned.

    Args:
        source: Program text to search.

    Returns:
        The stripped division body, or ``''`` when there is no
        ``DATA DIVISION.`` header.
    """
    header = re.search(r'DATA\s+DIVISION\s*\.', source)
    if header is None:
        return ''
    body = source[header.end():]
    stop = re.search(r'PROCEDURE\s+DIVISION', body)
    if stop is not None:
        body = body[:stop.start()]
    return body.strip()
|
|
|
|
|
|
def extract_procedure_division(source: str) -> str:
    """Return *source* from the ``PROCEDURE DIVISION`` header to the end.

    Matching is case-sensitive and the header itself is part of the result.

    Args:
        source: Program text to search.

    Returns:
        The stripped text starting at the header, or ``''`` if the header
        is not present.
    """
    header = re.search(r'PROCEDURE\s+DIVISION', source)
    return source[header.start():].strip() if header else ''
|
|
|
|
|
|
# -- COPYBOOK Resolution --
|
|
|
|
# File-name suffixes tried, in order, when looking for a copybook on disk.
_COPYBOOK_EXTENSIONS = ['.cpy', '.cbl', '.cpb', '']


def resolve_copybooks(source: str, source_dir: str) -> str:
    """Replace single-line COPY statements with the copybook's content.

    A line of the form ``COPY name [REPLACING ==a== BY ==b== ...] [.]`` is
    replaced by a marker line followed by the text of the copybook file.
    The file is looked up in *source_dir* using each suffix in
    ``_COPYBOOK_EXTENSIONS``; the upper-cased name is tried first, then the
    name as written and its lower-cased form, so copybooks are also found on
    case-sensitive file systems.  Only regular files are accepted.

    REPLACING pairs are applied case-insensitively and the replacement text
    is inserted literally (backslashes in it have no special meaning).

    COPY lines whose copybook cannot be found are left unchanged.  COPY
    statements inside an inlined copybook are not resolved.

    Args:
        source: Program text, one statement line per ``\\n``-separated line.
        source_dir: Directory searched for copybook files.

    Returns:
        The text with resolvable COPY statements expanded.
    """
    copy_re = re.compile(
        r"^\s*COPY\s+(\w[\w-]*)(?:\s+REPLACING\s+(.+?))?\s*\.?\s*$",
        re.IGNORECASE
    )
    pair_re = re.compile(r"==(.+?)==\s+BY\s+==(.+?)==", re.IGNORECASE)

    result = []
    for line in source.split('\n'):
        m = copy_re.match(line)
        if not m:
            result.append(line)
            continue

        written = m.group(1)
        name = written.upper()
        # dict.fromkeys de-duplicates while keeping the lookup order.
        stems = dict.fromkeys((name, written, written.lower()))
        candidates = (
            Path(source_dir, stem + ext)
            for stem in stems
            for ext in _COPYBOOK_EXTENSIONS
        )
        # is_file() rather than exists(): with the empty suffix a directory
        # of the same name would otherwise be picked and fail on read.
        found = next((p for p in candidates if p.is_file()), None)
        if found is None:
            result.append(line)
            continue

        cb = found.read_text(encoding='utf-8')
        if m.group(2):
            for old, new in pair_re.findall(m.group(2)):
                old = old.strip()
                if not old:
                    # An empty pattern would match between every character.
                    continue
                # A callable replacement keeps the new text literal; a plain
                # string would be parsed as a template (\1, \g<..>, ...).
                cb = re.sub(
                    re.escape(old), lambda _m, text=new.strip(): text,
                    cb, flags=re.IGNORECASE
                )
        result.append(f' * COPY {name}')
        result.append(cb)
    return '\n'.join(result)
|
|
|
|
|
|
# -- Lark Grammar --
|
|
|
|
# Text of grammar.lark, filled in on the first call to _get_grammar().
_GRAMMAR_CACHE = None


def _get_grammar() -> str:
    """Return the contents of ``grammar.lark`` located next to this module.

    The file is read (as UTF-8) on the first call only; the text is kept in
    the module-level ``_GRAMMAR_CACHE`` and returned from there afterwards.
    """
    global _GRAMMAR_CACHE
    if _GRAMMAR_CACHE is not None:
        return _GRAMMAR_CACHE
    grammar_file = Path(__file__).with_name('grammar.lark')
    _GRAMMAR_CACHE = grammar_file.read_text(encoding='utf-8')
    return _GRAMMAR_CACHE
|
|
|
|
|
|
# -- Data Transformer --
|
|
|
|
@v_args(inline=True)
class DataTransformer(Transformer):
    """Lark transformer that turns data-description entries into dicts.

    ``data_item`` queues one dict per entry in ``_pending``; the section
    callbacks (``file_section``, ``working_storage``, ``linkage``) stamp the
    queued dicts with their section name and move them to ``fields``.
    ``start`` moves whatever is still queued (defaulting the section to
    ``'WORKING-STORAGE'``) and returns ``fields`` as the transform result.
    """

    # Bare tokens that ``clause`` interprets as a USAGE value.
    _USAGE_KEYWORDS = (
        'COMP', 'COMP-3', 'COMP-5', 'BINARY', 'PACKED-DECIMAL', 'DISPLAY',
    )

    def __init__(self):
        super().__init__()
        self.fields = []          # finished field dicts, in emission order
        self._last_parent = None  # name of the latest non-condition entry
        self._pending = []        # entries not yet assigned to a section

    def _flush_pending(self, section, overwrite=True):
        """Tag queued entries with *section* and move them to ``fields``.

        With ``overwrite=False`` an entry that already has a ``'section'``
        key keeps its value.
        """
        for entry in self._pending:
            if overwrite:
                entry['section'] = section
            else:
                entry['section'] = entry.get('section', section)
            self.fields.append(entry)
        self._pending = []

    def start(self, *items):
        self._flush_pending('WORKING-STORAGE', overwrite=False)
        return self.fields

    def file_section(self, *args):
        self._flush_pending('FILE')
        return None

    def working_storage(self, *args):
        self._flush_pending('WORKING-STORAGE')
        return None

    def linkage(self, *args):
        self._flush_pending('LINKAGE')
        return None

    def data_item(self, level_num, name, *clauses):
        """Queue a dict describing one data-description entry."""
        level = int(str(level_num))
        name = str(name)

        # Later clauses win when the same key appears more than once.
        merged = {}
        for part in clauses:
            if isinstance(part, dict):
                merged.update(part)

        pic = merged.get('pic')
        value = merged.get('value')
        values = merged.get('values')

        entry = {
            'level': level,
            'name': name,
            'pic': pic if pic else None,
            'value': value,
            'values': values,
            'is_filler': name.upper() == 'FILLER',
            'redefines': merged.get('redefines'),
            'usage': merged.get('usage'),
            'occurs': merged.get('occurs', 0),
            'occurs_depending': merged.get('depends'),
        }

        is_condition = pic is None and level == 88 and value is not None
        if is_condition:
            # Level-88 condition name: attach it to the latest parent entry.
            entry.update({
                'pic': None,
                'value': value.strip("'").strip('"'),
                'values': [v.strip("'").strip('"') for v in values] if values else None,
                'is_88': True,
                'parent': self._last_parent or '',
            })
        else:
            # Elementary item (has PIC) or group item (no PIC, has
            # subordinate fields): becomes the parent of following 88s.
            self._last_parent = name
        self._pending.append(entry)
        return None

    def clause(self, *args):
        # Sub-clauses arrive as dicts and are merged; a bare usage keyword
        # arrives as a token and is recorded under 'usage'.
        merged = {}
        for part in args:
            if isinstance(part, dict):
                merged.update(part)
            elif isinstance(part, str) and part.upper() in self._USAGE_KEYWORDS:
                merged['usage'] = part.upper()
        return merged or None

    def pic_clause(self, *args):
        return {'pic': str(args[-1])}

    def usage_clause(self, token):
        return {'usage': str(token)}

    def value_clause(self, *args):
        # Drop the VALUE / IS keywords and unquote the remaining literals.
        literals = [
            str(a).strip("'").strip('"')
            for a in args
            if not (isinstance(a, str) and a.upper() in ('VALUE', 'IS'))
        ]
        if not literals:
            return {'value': None}
        return {'value': literals[0], 'values': literals}

    def value_literal(self, token):
        return str(token)

    def occurs_clause(self, *args):
        parsed = {'occurs': int(args[0])}
        if len(args) > 1:
            parsed['depends'] = str(args[1])
        return parsed

    def redefines_clause(self, *args):
        return {'redefines': str(args[-1])}

    def level_num(self, token):
        return token

    def NAME(self, token):
        return str(token)

    def PICTURE_STRING(self, token):
        return str(token)

    def INT(self, token):
        return int(token)
|
|
|
|
|
|
# -- PIC Parser --
|
|
|
|
def _expand_pic(s: str) -> str:
    """Expand repeat counts in a picture string, e.g. ``9(3)`` -> ``999``.

    A ``(n)`` group repeats the character emitted just before it so that it
    occurs ``n`` times in total; a group with nothing before it is dropped.
    An empty ``()`` or an unclosed ``(`` is copied through unchanged.

    Raises:
        ValueError: If the text between the parentheses is not an integer.
    """
    chars = []
    pos = 0
    end = len(s)
    while pos < end:
        ch = s[pos]
        close = s.find(')', pos) if ch == '(' else -1
        if close > pos + 1:
            repeat = int(s[pos + 1:close])
            if chars:
                # The character itself is already present once.
                chars.extend(chars[-1] * (repeat - 1))
            pos = close + 1
        else:
            chars.append(ch)
            pos += 1
    return ''.join(chars)
|
|
|
|
|
|
def parse_pic(pic_str: str) -> PicInfo:
    """Classify a PICTURE string and fill a ``PicInfo`` with its properties.

    The string is upper-cased and stripped, a leading ``S`` sets ``signed``,
    and repeat counts are expanded with ``_expand_pic``.  The first
    remaining character selects the category:

    * ``9`` or ``V`` - ``'numeric'``; ``digits`` counts the ``9`` positions
      before the ``V`` (all of them when there is no ``V``) and ``decimal``
      those after it.  A leading ``V`` (e.g. ``V99``) is a pure fraction.
    * ``X`` - ``'alphanumeric'`` and ``A`` - ``'alphabetic'``; ``length`` is
      the expanded length.
    * ``Z``, ``*``, ``$``, ``+``, ``-``, or a trailing ``CR``/``DB`` -
      ``'numeric-edited'``; ``digits`` counts every ``9``, ``decimal`` the
      ``9`` positions after ``V`` or ``.``, ``length`` the expanded length.
    * anything else - ``'alphanumeric'`` with the expanded length.

    Args:
        pic_str: Picture character string, e.g. ``'S9(5)V99'``.

    Returns:
        A ``PicInfo``; left at its defaults for an empty string, and with
        only ``signed`` set for a bare ``S``.
    """
    info = PicInfo()
    s = pic_str.upper().strip()
    if not s:
        return info
    if s.startswith('S'):
        info.signed = True
        s = s[1:]
    expanded = _expand_pic(s)
    if not expanded:
        # Nothing left after the sign: avoid indexing an empty string.
        return info

    lead = expanded[0]
    if lead in ('9', 'V'):
        info.type = 'numeric'
        if 'V' in expanded:
            parts = expanded.split('V')
            info.digits = parts[0].count('9')
            info.decimal = parts[1].count('9')
        else:
            info.digits = expanded.count('9')
            info.decimal = 0
    elif lead == 'X':
        info.type = 'alphanumeric'
        info.length = len(expanded)
    elif lead == 'A':
        info.type = 'alphabetic'
        info.length = len(expanded)
    elif lead in ('Z', '*', '$', '+', '-'):
        info.type = 'numeric-edited'
        info.digits = expanded.count('9')
        if 'V' in expanded:
            info.decimal = expanded.split('V')[1].count('9')
        elif '.' in expanded:
            info.decimal = expanded.split('.')[1].count('9')
        info.length = len(expanded)
    elif expanded.endswith(('CR', 'DB')):
        info.type = 'numeric-edited'
        # The two-character sign suffix is not part of the digit positions.
        stripped = expanded[:-2]
        info.digits = stripped.count('9')
        if 'V' in stripped:
            info.decimal = stripped.split('V')[1].count('9')
        elif '.' in stripped:
            info.decimal = stripped.split('.')[1].count('9')
        info.length = len(expanded)
    else:
        info.type = 'alphanumeric'
        info.length = len(expanded)
    return info
|
|
|
|
|
|
# -- DATA DIVISION entry point --
|
|
|
|
def parse_data_division(data_div_text: str) -> list[FieldDef]:
    """Parse DATA DIVISION text into ``FieldDef`` objects.

    The text is parsed with an Earley Lark parser built from the grammar
    returned by ``_get_grammar`` and reduced by ``DataTransformer``.  Each
    resulting dict becomes one ``FieldDef``; entries that carry a picture
    string also get the ``PicInfo`` produced by ``parse_pic``.

    Args:
        data_div_text: Body of the DATA DIVISION.

    Returns:
        One ``FieldDef`` per entry, in the order the transformer emitted
        them.
    """
    tree = Lark(_get_grammar(), parser='earley', lexer='dynamic').parse(data_div_text)
    entries = DataTransformer().transform(tree)

    def build(entry) -> FieldDef:
        pic = entry.get('pic', '')
        return FieldDef(
            name=entry['name'],
            level=entry['level'],
            pic=pic,
            pic_info=parse_pic(pic) if pic else None,
            is_filler=entry.get('is_filler', False),
            occurs_count=entry.get('occurs', 0),
            occurs_depending=entry.get('occurs_depending'),
            redefines=entry.get('redefines'),
            usage=entry.get('usage'),
            value=entry.get('value'),
            values=entry.get('values'),
            is_88=entry.get('is_88', False),
            parent=entry.get('parent'),
            section=entry.get('section'),
        )

    return [build(entry) for entry in entries]
|
|
|
|
|
|
# -- FILE-CONTROL / FILE SECTION / OPEN parsing --
|
|
|
|
|
|
def parse_file_control(source: str) -> dict:
    """Parse FILE-CONTROL and map each SELECTed file to its assigned literal.

    Only the text between ``FILE-CONTROL.`` and ``DATA DIVISION`` (or the end
    of *source*) is searched.  An entry is recorded for every
    ``SELECT [OPTIONAL] file-name ... ASSIGN [TO] "literal"`` whose target is
    a quoted literal; assignments to unquoted names are ignored.  Matching
    is case-insensitive.

    Args:
        source: Program text.

    Returns:
        ``{FILE-NAME: LITERAL}`` with both sides upper-cased; empty when
        there is no FILE-CONTROL paragraph.
    """
    paragraph = re.search(
        r'FILE-CONTROL\.(.*?)(?=DATA\s+DIVISION|\Z)',
        source, re.DOTALL | re.IGNORECASE,
    )
    if paragraph is None:
        return {}

    # OPTIONAL is a keyword of the SELECT clause, not the file name, and the
    # word TO after ASSIGN may be omitted.
    select_re = re.compile(
        r'SELECT\s+(?:OPTIONAL\s+)?(\w[\w-]*)\s+[^.]*?'
        r'\bASSIGN\s+(?:TO\s+)?(["\'])(.*?)\2',
        re.IGNORECASE,
    )
    return {
        entry.group(1).upper(): entry.group(3).upper()
        for entry in select_re.finditer(paragraph.group(1))
    }
|
|
|
|
|
|
def parse_file_section(source: str) -> dict:
    """Parse the FILE SECTION and map each FD file to its level-01 records.

    The section runs from ``FILE SECTION.`` to the next section header
    (WORKING-STORAGE, LOCAL-STORAGE, LINKAGE, REPORT, SCREEN or
    COMMUNICATION), to ``PROCEDURE DIVISION``, or to the end of *source*.
    It is split at every line that starts an ``FD`` or ``SD`` entry, so the
    records of a sort file are not attributed to the preceding FD; only
    ``FD`` entries are reported.  Matching is case-insensitive.

    Args:
        source: Program text.

    Returns:
        ``{FILE-NAME: [RECORD-NAME, ...]}`` with all names upper-cased;
        empty when there is no FILE SECTION.
    """
    section = re.search(
        r'FILE\s+SECTION\.(.*?)'
        r'(?=(?:WORKING-STORAGE|LOCAL-STORAGE|LINKAGE|REPORT|SCREEN|COMMUNICATION)'
        r'\s+SECTION|PROCEDURE\s+DIVISION|\Z)',
        source, re.DOTALL | re.IGNORECASE,
    )
    if section is None:
        return {}

    result = {}
    # flags must be passed by keyword: the third positional argument of
    # re.split is maxsplit.
    entries = re.split(
        r'\n\s*(?=(?:FD|SD)\s+)', section.group(1).strip(),
        flags=re.IGNORECASE,
    )
    for entry in entries:
        header = re.match(r'FD\s+(\w[\w-]*)', entry, re.IGNORECASE)
        if header is None:
            continue
        # Record descriptions are the lines starting with level 01 (or 1).
        records = re.findall(r'^\s*0?1\s+(\w[\w-]*)', entry, re.MULTILINE)
        result[header.group(1).upper()] = [rec.upper() for rec in records]
    return result
|
|
|
|
|
|
def scan_open_statements(source: str) -> dict:
    """Scan OPEN statements and map each opened file to its open mode.

    Every ``OPEN`` followed by a mode keyword is examined.  The words after
    it are read one by one: a mode keyword (``INPUT``, ``OUTPUT``, ``I-O``,
    ``EXTEND``) switches the current mode, the option words ``WITH``,
    ``NO``, ``REWIND``, ``REVERSED`` and ``LOCK`` are skipped, and every
    other word is recorded as a file opened in the current mode.  Reading
    stops at the first character that cannot be part of a word list (such
    as the terminating period) or at a word that starts another statement,
    so an OPEN without a period does not swallow the statements after it.

    Args:
        source: Program text; matching is case-insensitive.

    Returns:
        ``{FILE-NAME: mode}`` with upper-cased names and mode being one of
        ``'INPUT'``, ``'OUTPUT'``, ``'I-O'`` or ``'EXTEND'``.  A file opened
        more than once keeps the mode of its last OPEN.
    """
    modes = ('INPUT', 'OUTPUT', 'I-O', 'EXTEND')
    option_words = frozenset(('WITH', 'NO', 'REWIND', 'REVERSED', 'LOCK'))
    # Reserved words that begin or delimit another statement; none of them
    # can be a file name.
    statement_words = frozenset((
        'ACCEPT', 'ADD', 'ALTER', 'CALL', 'CANCEL', 'CLOSE', 'COMPUTE',
        'CONTINUE', 'DELETE', 'DISPLAY', 'DIVIDE', 'ELSE', 'EVALUATE',
        'EXEC', 'EXIT', 'GO', 'GOBACK', 'IF', 'INITIALIZE', 'INSPECT',
        'MERGE', 'MOVE', 'MULTIPLY', 'NEXT', 'OPEN', 'PERFORM', 'READ',
        'RELEASE', 'RETURN', 'REWRITE', 'SEARCH', 'SET', 'SORT', 'START',
        'STOP', 'STRING', 'SUBTRACT', 'UNSTRING', 'WHEN', 'WRITE',
        'END-IF', 'END-PERFORM', 'END-EVALUATE', 'END-READ', 'END-WRITE',
        'END-SEARCH',
    ))
    # The lookbehind keeps names such as WS-OPEN from matching; the
    # lookahead leaves the mode keyword to the word scan below.
    open_re = re.compile(
        r'(?<![\w-])OPEN\s+(?=(?:INPUT|OUTPUT|I-O|EXTEND)\s)', re.IGNORECASE)
    operand_re = re.compile(r'[\w\s-]+')
    word_re = re.compile(r'\w[\w-]*')

    dirs = {}
    for opened in open_re.finditer(source):
        operands = operand_re.match(source, opened.end())
        if operands is None:
            continue
        direction = None
        for token in word_re.findall(operands.group(0)):
            word = token.upper()
            if word in modes:
                direction = word
            elif word in statement_words:
                break
            elif word not in option_words:
                dirs[word] = direction
    return dirs
|