"""Read COBOL sources: preprocessing, COPYBOOK resolution, DATA DIVISION and PIC parsing."""

import re
from pathlib import Path
from lark import Lark, Transformer, v_args

from .models import FieldDef, PicInfo


# ── Preprocessor ──

# Explicit reference-format directives; the words FORMAT and IS are optional.
_RE_FORMAT_FREE = re.compile(
    r'>>SOURCE\s+(?:FORMAT\s+)?(?:IS\s+)?FREE\b', re.IGNORECASE
)
_RE_FORMAT_FIXED = re.compile(
    r'>>SOURCE\s+(?:FORMAT\s+)?(?:IS\s+)?FIXED\b', re.IGNORECASE
)

# Column-7 characters that count as evidence of fixed format.
_FIXED_INDICATORS = ('*', '/', '-', 'D')

# " FALSE <token>" phrase removed from every preprocessed line.
_RE_FALSE_PHRASE = re.compile(r'\s+FALSE\s+[^\s.]+')


def _is_fixed_format(source: str) -> bool:
    """Guess whether *source* is fixed-format (column based) COBOL.

    An explicit ``>>SOURCE ... FREE`` directive anywhere in the text wins,
    then an explicit ``>>SOURCE ... FIXED`` directive.  Without a directive
    the first ten non-blank lines are sampled: a line of 72 or more
    characters counts as a free-format hit, otherwise a line whose column 7
    holds ``*``, ``/``, ``-`` or ``D`` counts as a fixed-format hit.  Ties,
    and samples with no evidence at all, resolve to fixed format.
    """
    if _RE_FORMAT_FREE.search(source):
        return False
    if _RE_FORMAT_FIXED.search(source):
        return True

    sample = [ln for ln in source.splitlines() if ln.strip()][:10]
    fixed_hits = 0
    free_hits = 0
    for line in sample:
        # NOTE(review): long lines are treated as free-format evidence, so a
        # fixed-format file carrying sequence numbers in columns 73-80 may be
        # misdetected -- confirm this is intended.
        if len(line) >= 72:
            free_hits += 1
        elif len(line) >= 7 and line[6] in _FIXED_INDICATORS:
            fixed_hits += 1
    return fixed_hits >= free_hits


def preprocess(source: str) -> str:
    """Normalise COBOL source text for the later regex and grammar passes.

    Fixed format: comment lines (``*`` or ``/`` in column 7) and debug lines
    (``D`` in column 7) are dropped, continuation lines (``-`` in column 7)
    are appended to the previous kept line separated by one space, and
    columns 1-6 are cut from every other line.  Blank lines are kept as
    empty strings.

    Free format: everything from ``*>`` to the end of the line is removed,
    the line is stripped, and lines that end up empty are dropped.

    Every kept line is upper-cased and has `` FALSE <token>`` phrases
    removed.

    Returns:
        The processed lines joined with ``\\n``.
    """
    fixed = _is_fixed_format(source)
    lines = []
    for raw_line in source.splitlines():
        line = raw_line.rstrip()
        if not line:
            lines.append('')
            continue

        if fixed:
            indicator = line[6] if len(line) >= 7 else ''
            if indicator in ('*', '/'):
                continue
            if indicator == '-':
                # A continuation with nothing before it is silently dropped.
                if lines:
                    lines[-1] = lines[-1] + ' ' + line[7:].lstrip()
                continue
            if indicator.upper() == 'D':
                continue
            content = line[6:] if indicator else line
        else:
            comment_pos = line.find('*>')
            if comment_pos >= 0:
                line = line[:comment_pos]
            line = line.strip()
            if not line:
                continue
            content = line

        # presumably strips the FALSE phrase of level-88 VALUE clauses, which
        # the grammar is not expected to accept -- TODO confirm.
        lines.append(_RE_FALSE_PHRASE.sub('', content.upper()))
    return '\n'.join(lines)


def extract_data_division(source: str) -> str:
    """Return the text between ``DATA DIVISION.`` and ``PROCEDURE DIVISION``.

    Matching is case-insensitive.  The result is stripped; it is ``''`` when
    there is no DATA DIVISION header, and runs to the end of *source* when
    no PROCEDURE DIVISION follows.
    """
    m = re.search(r'DATA\s+DIVISION\s*\.', source, re.IGNORECASE)
    if not m:
        return ''
    start = m.end()
    end_m = re.search(r'PROCEDURE\s+DIVISION', source[start:], re.IGNORECASE)
    end = start + end_m.start() if end_m else len(source)
    return source[start:end].strip()


def extract_procedure_division(source: str) -> str:
    """Return *source* from ``PROCEDURE DIVISION`` (header included) to the end.

    Matching is case-insensitive.  Returns ``''`` when the header is absent.
    """
    m = re.search(r'PROCEDURE\s+DIVISION', source, re.IGNORECASE)
    if not m:
        return ''
    return source[m.start():].strip()


# ── COPYBOOK Resolution ──

_COPYBOOK_EXTENSIONS = ['.cpy', '.cbl', '.cpb', '']

# A whole-line COPY statement with an optional REPLACING phrase.
_RE_COPY = re.compile(
    r"^\s*COPY\s+(\w[\w-]*)(?:\s+REPLACING\s+(.+?))?\s*\.?\s*$",
    re.IGNORECASE
)
# One "==old== BY ==new==" pseudo-text pair inside a REPLACING phrase.
_RE_PAIR = re.compile(r"==(.+?)==\s+BY\s+==(.+?)==", re.IGNORECASE)


def _find_copybook(source_dir: str, name: str):
    """Return the first existing copybook file for *name*, or ``None``.

    The upper-cased name is tried first, then the name as written, then the
    lower-cased name (for case-sensitive file systems), each combined with
    every entry of ``_COPYBOOK_EXTENSIONS`` in order.  Directories are not
    accepted as copybooks.
    """
    for stem in dict.fromkeys((name.upper(), name, name.lower())):
        for ext in _COPYBOOK_EXTENSIONS:
            candidate = Path(source_dir, stem + ext)
            if candidate.is_file():
                return candidate
    return None


def resolve_copybooks(source: str, source_dir: str) -> str:
    """Find COPY statements and replace them with copybook content.

    Only COPY statements that occupy a whole line are recognised.  A
    statement whose copybook is not found in *source_dir* is left untouched.
    A resolved statement is replaced by a marker comment line followed by
    the copybook text (read as UTF-8), after applying every
    ``==old== BY ==new==`` pair of the REPLACING phrase as a literal,
    case-insensitive substitution.  COPY statements inside the inserted
    copybook text are not expanded.

    Args:
        source: COBOL source text.
        source_dir: Directory searched for copybook files.

    Returns:
        The source text with resolvable COPY statements expanded.
    """
    result = []
    for line in source.split('\n'):
        m = _RE_COPY.match(line)
        found = _find_copybook(source_dir, m.group(1)) if m else None
        if found is None:
            result.append(line)
            continue

        name = m.group(1).upper()
        cb = found.read_text(encoding='utf-8')
        if m.group(2):
            for old, new in _RE_PAIR.findall(m.group(2)):
                target = old.strip()
                if not target:
                    # An empty pattern would match between every character.
                    continue
                replacement = new.strip()
                # A callable keeps the replacement literal; a plain string
                # would have backslashes and group references interpreted.
                cb = re.sub(
                    re.escape(target),
                    lambda _m, _text=replacement: _text,
                    cb, flags=re.IGNORECASE
                )
        # The asterisk sits in column 7 so that fixed-format preprocessing
        # drops the marker as a comment line.
        # NOTE(review): the padding width was not recoverable from the
        # reviewed text -- confirm against the original file.
        result.append(f'      * COPY {name}')
        result.append(cb)
    return '\n'.join(result)


# ── Lark Grammar ──

_GRAMMAR_CACHE = None
_PARSER_CACHE = None


def _get_grammar() -> str:
    """Return the text of ``grammar.lark`` next to this module, read once."""
    global _GRAMMAR_CACHE
    if _GRAMMAR_CACHE is None:
        lark_path = Path(__file__).parent / 'grammar.lark'
        _GRAMMAR_CACHE = lark_path.read_text(encoding='utf-8')
    return _GRAMMAR_CACHE


def _get_parser() -> Lark:
    """Return the DATA DIVISION parser, built on first use and then reused."""
    global _PARSER_CACHE
    if _PARSER_CACHE is None:
        _PARSER_CACHE = Lark(_get_grammar(), parser='earley', lexer='dynamic')
    return _PARSER_CACHE


# ── Data Transformer ──

_USAGE_KEYWORDS = (
    'COMP', 'COMP-3', 'COMP-5', 'BINARY', 'PACKED-DECIMAL', 'DISPLAY',
)


@v_args(inline=True)
class DataTransformer(Transformer):
    """Turn a DATA DIVISION parse tree into a flat list of field dicts.

    The method names are presumably the rule and terminal names of
    ``grammar.lark`` (not visible from this module).  Data items collect in
    ``_pending`` and are moved to ``fields`` -- tagged with a section name --
    when the enclosing section rule, or ``start``, is processed.
    """

    def __init__(self):
        super().__init__()
        self.fields = []           # finished field dicts, in source order
        self._last_parent = None   # name of the latest non-88 item seen
        self._pending = []         # items not yet assigned to a section

    def _flush_pending(self, section, overwrite=True):
        """Move pending items to ``fields``, tagging them with *section*.

        With ``overwrite=False`` an item that already carries a ``section``
        keeps it.
        """
        for field in self._pending:
            if overwrite:
                field['section'] = section
            else:
                field.setdefault('section', section)
        self.fields.extend(self._pending)
        self._pending = []

    def start(self, *items):
        # Items outside any section rule default to WORKING-STORAGE.
        self._flush_pending('WORKING-STORAGE', overwrite=False)
        return self.fields

    def file_section(self, *args):
        self._flush_pending('FILE')
        return None

    def working_storage(self, *args):
        self._flush_pending('WORKING-STORAGE')
        return None

    def linkage(self, *args):
        self._flush_pending('LINKAGE')
        return None

    def data_item(self, level_num, name, *clauses):
        """Record one data description entry as a dict in ``_pending``.

        Clause dicts are merged in order, so a later clause overrides an
        earlier one that sets the same key.  A level-88 entry without PIC
        but with a value becomes a condition name attached to the most
        recent non-88 item; every other entry becomes the new parent for
        following level-88 entries.
        """
        level = int(str(level_num))
        name = str(name)

        merged = {}
        for clause in clauses:
            if isinstance(clause, dict):
                merged.update(clause)

        pic = merged.get('pic')
        value = merged.get('value')
        values = merged.get('values')

        base = {
            'level': level,
            'name': name,
            'pic': pic if pic else None,
            'value': value,
            'values': values,
            'is_filler': name.upper() == 'FILLER',
            'redefines': merged.get('redefines'),
            'usage': merged.get('usage'),
            'occurs': merged.get('occurs', 0),
            'occurs_depending': merged.get('depends'),
        }

        if pic is None and level == 88 and value is not None:
            base.update({
                'pic': None,
                'value': value.strip("'").strip('"'),
                'values': (
                    [v.strip("'").strip('"') for v in values]
                    if values else None
                ),
                'is_88': True,
                'parent': self._last_parent or '',
            })
            self._pending.append(base)
        else:
            # Elementary item (has PIC) or group item (no PIC, has
            # subordinate fields).
            self._pending.append(base)
            self._last_parent = name
        return None

    def clause(self, *args):
        # Merge child clause dicts; a bare usage keyword becomes 'usage'.
        result = {}
        for a in args:
            if isinstance(a, dict):
                result.update(a)
            elif isinstance(a, str) and a.upper() in _USAGE_KEYWORDS:
                result['usage'] = a.upper()
        return result if result else None

    def pic_clause(self, *args):
        return {'pic': str(args[-1])}

    def usage_clause(self, token):
        return {'usage': str(token)}

    def value_clause(self, *args):
        # Keep the literals only: keywords are skipped, quotes are removed.
        values = []
        for a in args:
            if isinstance(a, str) and a.upper() in (
                'VALUE', 'VALUES', 'IS', 'ARE',
            ):
                continue
            values.append(str(a).strip("'").strip('"'))
        if not values:
            return {'value': None}
        return {'value': values[0], 'values': values}

    def value_literal(self, *args):
        if args:
            return str(args[-1])
        return ''

    def occurs_clause(self, *args):
        result = {'occurs': int(args[0])}
        if len(args) >= 2:
            result['depends'] = str(args[1])
        return result

    def redefines_clause(self, *args):
        return {'redefines': str(args[-1])}

    def level_num(self, token):
        return token

    def NAME(self, token):
        return str(token)

    def PICTURE_STRING(self, token):
        return str(token)

    def INT(self, token):
        return int(token)


# ── PIC Parser ──

def _expand_pic(s: str) -> str:
    """Expand repetition counts in a picture string: ``X(3)`` -> ``XXX``.

    A ``(n)`` group repeats the character before it so that it occurs *n*
    times in total.  A group with nothing before it is dropped.  An empty
    ``()`` or an unclosed ``(`` is copied through literally.

    Raises:
        ValueError: If the text between the parentheses is not an integer.
    """
    result = ''
    i = 0
    while i < len(s):
        if s[i] == '(':
            j = s.find(')', i)
            if j > i + 1:
                count = int(s[i + 1:j])
                if result:
                    result += result[-1] * (count - 1)
                i = j + 1
                continue
        result += s[i]
        i += 1
    return result


def _edited_decimals(picture: str):
    """Count the ``9`` positions after ``V``, else after ``.``.

    Returns ``None`` when *picture* contains neither symbol.
    """
    for separator in ('V', '.'):
        if separator in picture:
            return picture.split(separator)[1].count('9')
    return None


def parse_pic(pic_str: str) -> PicInfo:
    """Classify a PICTURE string and fill a ``PicInfo`` with its shape.

    A leading ``S`` sets ``signed``.  The expanded picture is then classified
    by its first character: ``9``, ``V`` or ``P`` is ``numeric`` (``digits``
    and ``decimal`` are the ``9`` counts before and after ``V``), ``X`` is
    ``alphanumeric``, ``A`` is ``alphabetic``, and ``Z * $ + -`` or a
    trailing ``CR``/``DB`` is ``numeric-edited``.  Anything else is treated
    as ``alphanumeric``.  An empty picture, or one that is empty after the
    sign, returns the ``PicInfo`` without a type assigned here.
    """
    info = PicInfo()
    s = pic_str.upper().strip()
    if not s:
        return info
    if s.startswith('S'):
        info.signed = True
        s = s[1:]
    expanded = _expand_pic(s)
    if not expanded:
        # A picture consisting only of 'S' has nothing left to classify.
        return info

    first = expanded[0]
    if first in ('9', 'V', 'P'):
        # V (implied decimal point) and P (scaling position) may lead a
        # numeric picture, e.g. V99.
        info.type = 'numeric'
        if 'V' in expanded:
            parts = expanded.split('V')
            info.digits = parts[0].count('9')
            info.decimal = parts[1].count('9')
        else:
            info.digits = expanded.count('9')
            info.decimal = 0
    elif first == 'X':
        info.type = 'alphanumeric'
        info.length = len(expanded)
    elif first == 'A':
        info.type = 'alphabetic'
        info.length = len(expanded)
    elif first in ('Z', '*', '$', '+', '-'):
        info.type = 'numeric-edited'
        info.digits = expanded.count('9')
        decimals = _edited_decimals(expanded)
        if decimals is not None:
            info.decimal = decimals
        info.length = len(expanded)
    elif expanded.endswith(('CR', 'DB')):
        info.type = 'numeric-edited'
        stripped = expanded[:-2]
        info.digits = stripped.count('9')
        decimals = _edited_decimals(stripped)
        if decimals is not None:
            info.decimal = decimals
        info.length = len(expanded)
    else:
        info.type = 'alphanumeric'
        info.length = len(expanded)
    return info


# ── DATA DIVISION entry point ──

def parse_data_division(data_div_text: str) -> list[FieldDef]:
    """Parse DATA DIVISION text into ``FieldDef`` objects with PIC details.

    The text is parsed with the cached Lark parser and walked by a fresh
    ``DataTransformer``; each resulting dict becomes one ``FieldDef``, with
    ``pic_info`` filled by ``parse_pic`` when the item has a PIC string.
    """
    tree = _get_parser().parse(data_div_text)
    raw = DataTransformer().transform(tree)

    result = []
    for r in raw:
        pic = r.get('pic', '')
        info = parse_pic(pic) if pic else None
        result.append(FieldDef(
            name=r['name'],
            level=r['level'],
            pic=pic,
            pic_info=info,
            is_filler=r.get('is_filler', False),
            occurs_count=r.get('occurs', 0),
            occurs_depending=r.get('occurs_depending'),
            redefines=r.get('redefines'),
            usage=r.get('usage'),
            value=r.get('value'),
            values=r.get('values'),
            is_88=r.get('is_88', False),
            parent=r.get('parent'),
            section=r.get('section'),
        ))
    return result


# ── FILE-CONTROL / FILE SECTION / OPEN parsing ──

_RE_FILE_CONTROL = re.compile(
    r'FILE-CONTROL\.(.*?)(?=DATA\s+DIVISION|\Z)', re.DOTALL | re.IGNORECASE
)
_RE_SELECT = re.compile(
    r'SELECT\s+(\w[\w-]*)\s+[^.]*?\bASSIGN\s+TO\s+(["\'])(.*?)\2',
    re.IGNORECASE
)

# FILE SECTION body, up to the next DATA DIVISION section or the PROCEDURE
# DIVISION header.
_RE_FILE_SECTION = re.compile(
    r'FILE\s+SECTION\.(.*?)(?='
    r'(?:WORKING-STORAGE|LOCAL-STORAGE|LINKAGE|REPORT|SCREEN|COMMUNICATION)'
    r'\s+SECTION|PROCEDURE\s+DIVISION|\Z)',
    re.DOTALL | re.IGNORECASE
)
_RE_FD_SPLIT = re.compile(r'\n\s*(?=(?:FD|SD)\s+)', re.IGNORECASE)
_RE_FD_NAME = re.compile(r'(?:FD|SD)\s+(\w[\w-]*)', re.IGNORECASE)
_RE_RECORD_01 = re.compile(r'^\s*0{0,1}1\s+(\w[\w-]*)', re.MULTILINE)

_OPEN_MODES = frozenset({'INPUT', 'OUTPUT', 'I-O', 'EXTEND'})
# Words that may appear inside an OPEN statement without naming a file.
_OPEN_NOISE = frozenset({
    'WITH', 'NO', 'REWIND', 'LOCK', 'REVERSED', 'SHARING', 'ALL', 'OTHER',
    'ONLY',
})
# Words that begin another statement and so end a period-less OPEN.
_STATEMENT_VERBS = frozenset({
    'ACCEPT', 'ADD', 'ALTER', 'CALL', 'CANCEL', 'CLOSE', 'COMPUTE',
    'CONTINUE', 'DELETE', 'DISPLAY', 'DIVIDE', 'ELSE', 'EVALUATE', 'EXIT',
    'GO', 'GOBACK', 'IF', 'INITIALIZE', 'INSPECT', 'MERGE', 'MOVE',
    'MULTIPLY', 'NEXT', 'OPEN', 'PERFORM', 'READ', 'RELEASE', 'RETURN',
    'REWRITE', 'SEARCH', 'SET', 'SORT', 'START', 'STOP', 'STRING',
    'SUBTRACT', 'UNSTRING', 'WHEN', 'WRITE',
})
_RE_OPEN = re.compile(
    r'(?<![\w-])OPEN\s+(?=(?:INPUT|OUTPUT|I-O|EXTEND)(?![\w-]))',
    re.IGNORECASE
)
_RE_OPEN_OPERANDS = re.compile(r'[\w\s-]+')
_RE_WORD = re.compile(r'\w[\w-]*')


def parse_file_control(source: str) -> dict:
    """Parse FILE-CONTROL into ``{SELECT name: ASSIGN TO literal}``.

    Only SELECT entries whose ASSIGN TO target is a quoted literal are
    reported.  Both keys and values are upper-cased.  Returns ``{}`` when
    there is no FILE-CONTROL paragraph.
    """
    section = _RE_FILE_CONTROL.search(source)
    if not section:
        return {}
    return {
        entry.group(1).upper(): entry.group(3).upper()
        for entry in _RE_SELECT.finditer(section.group(1))
    }


def parse_file_section(source: str) -> dict:
    """Parse FILE SECTION into ``{file name: [level-01 record names]}``.

    The section body is split into one block per ``FD`` or ``SD`` entry;
    each block contributes its file name and the names of the level ``01``
    (or ``1``) entries inside it.  All names are upper-cased.  Returns
    ``{}`` when there is no FILE SECTION.
    """
    section = _RE_FILE_SECTION.search(source)
    if not section:
        return {}
    result = {}
    for block in _RE_FD_SPLIT.split(section.group(1).strip()):
        header = _RE_FD_NAME.match(block)
        if not header:
            continue
        records = _RE_RECORD_01.findall(block)
        result[header.group(1).upper()] = [rec.upper() for rec in records]
    return result


def scan_open_statements(source: str) -> dict:
    """Scan OPEN statements into ``{file name: open mode}``.

    The mode is ``'INPUT'``, ``'OUTPUT'``, ``'I-O'`` or ``'EXTEND'``; each
    file name takes the mode keyword that most recently precedes it, so
    ``OPEN INPUT A OUTPUT B`` yields ``{'A': 'INPUT', 'B': 'OUTPUT'}``.  A
    statement ends at the first character that cannot belong to a word
    (normally the period) or at the first word that begins another
    statement.  Names are upper-cased; a file opened more than once keeps
    the mode of its last OPEN.
    """
    dirs = {}
    for opening in _RE_OPEN.finditer(source):
        # Match operands separately so that a later OPEN inside the same
        # period-less run is still found by the outer search.
        operands = _RE_OPEN_OPERANDS.match(source, opening.end())
        if operands is None:
            continue
        mode = None
        for word in _RE_WORD.findall(operands.group()):
            token = word.upper()
            if token in _OPEN_MODES:
                mode = token
            elif token in _STATEMENT_VERBS or token.startswith('END-'):
                break
            elif token not in _OPEN_NOISE:
                dirs[token] = mode
    return dirs