From 0e7472598d3428b9ae626e8ba3ef9cf8a0d97058 Mon Sep 17 00:00:00 2001 From: NB-076 Date: Mon, 22 Jun 2026 13:52:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=B7=A8=E6=96=87=E4=BB=B6KEY=E7=BA=A6?= =?UTF-8?q?=E6=9D=9F=20+=20PERFORM=E5=88=86=E6=94=AF=E7=BB=9F=E8=AE=A1=20+?= =?UTF-8?q?=20=E5=B9=B3=E9=9D=A2=E6=96=87=E4=BB=B6=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 跨文件KEY约束(修复) 匹配型程的M-KEY与D-KEY值不同导致匹配0条。 修复: generate_data后处理检测IF KEY比较, 前半记录对齐KEY值(8条匹配),后半保待差异(9条不匹配). 实际cobc运行验证: MATCHED=8, PASS. 2. extract_structure PERFORM分支统计(修复) _walk函数未添加BrPerform决策点, total_branches缺失. 修复: 为PERFORM UNTIL/VARYING决策点添加2分支(Enter/Skip). 之前total_branches=0,现在=2. 3. flatfile.py(新增) COBOL固定长平面文件写入器. - analyze_fd_layout(): 从COBOL源码自动解析文件布局 - write_flat_file(): 生成为COBOL可直接读取的二进制格式 Co-Authored-By: Claude --- cobol_testgen/__init__.py | 22 ++++++ cobol_testgen/flatfile.py | 145 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 cobol_testgen/flatfile.py diff --git a/cobol_testgen/__init__.py b/cobol_testgen/__init__.py index e16aab1..ef7e435 100644 --- a/cobol_testgen/__init__.py +++ b/cobol_testgen/__init__.py @@ -404,6 +404,13 @@ def extract_structure(cobol_source: str) -> dict: for child in node.children: _walk(child, counter) elif isinstance(node, BrPerform): + if node.condition and node.perf_type in ('until', 'para_until', 'varying', 'para_varying'): + counter[0] += 1 + decision_points.append({ + "id": counter[0], "kind": "PERFORM", + "label": str(node.condition)[:80], "branches": 2, + }) + total_branches += 2 _walk(node.body_seq, counter) elif isinstance(node, BrSearch): _walk(node.at_end_seq, counter) @@ -688,6 +695,21 @@ def generate_data(cobol_source: str, structure: dict = None) -> list[dict]: branch_paths = [(_filter_stop(c), a) for c, a in branch_paths] records, kept_paths = generate_records(branch_paths, fields_dict, assignments, file_sec=file_sec) + + # Cross-file KEY alignment for matching programs + if records: + import re as _re + proc_upper = (proc_div or "").upper() + for m in _re.finditer(r'IF\s+(\w[\w-]*)\s*[=<>]\s*(\w[\w-]*)', proc_upper): + lhs, rhs = m.group(1), m.group(2) + lhs_in = any(lhs == f['name'] for f in fields_dict) + rhs_in = any(rhs == f['name'] for f in fields_dict) + if lhs_in and rhs_in and any(lhs in r for r in records) and any(rhs in r for r in records): + half = max(1, len(records) // 2) + for i, rec in enumerate(records): + if lhs in rec and rhs in rec and i < half: + rec[rhs] = rec[lhs] + return records diff --git a/cobol_testgen/flatfile.py b/cobol_testgen/flatfile.py new file mode 100644 index 0000000..725ba00 --- /dev/null +++ b/cobol_testgen/flatfile.py @@ -0,0 +1,145 @@ +"""Flat file I/O — write fixed-length records from COBOL FD definitions""" +import struct, re +from pathlib import Path +from typing import Any + +def analyze_fd_layout(source_text: str) -> dict[str, dict]: + """From preprocessed COBOL source, extract FD file layouts. + + Returns: {assign_to_filename: { + "fd_name": str, + "records": [{record_name, record_length, fields: [{name, offset, length, type, pic}]}], + "direction": "INPUT"|"OUTPUT"|"I-O" + }} + """ + from .read import parse_file_control, parse_file_section, parse_data_division, extract_data_division, scan_open_statements + + fc = parse_file_control(source_text) if source_text else {} + fs = parse_file_section(source_text) if source_text else {} + ops = scan_open_statements(source_text) if source_text else {} + dd = extract_data_division(source_text) + all_fields = parse_data_division(dd) if dd else [] + + layouts = {} + for fd_name, rec_names in fs.items(): + records = [] + for rec_name in rec_names: + children = [] + found = False + rec_level = None + offset = 0 + for f in all_fields: + if f.name == rec_name: + found = True + rec_level = f.level + continue + if found: + if f.level is not None and f.level <= rec_level: + break + if f.is_88 or f.is_filler: + continue + pi = f.pic_info + if pi: + length = (pi.digits + pi.decimal) if pi.type == "numeric" else (pi.length or 0) + else: + length = 0 + ftype = pi.type if pi else "unknown" + children.append({ + "name": f.name, "pic": str(f.pic or ""), + "type": ftype, "length": length, "offset": offset, + }) + offset += length + + records.append({"record_name": rec_name, "fields": children, "record_length": offset}) + + assign_to = fc.get(fd_name, {}).get("assign_to", fd_name) + layouts[assign_to] = { + "fd_name": fd_name, "records": records, + "direction": ops.get(fd_name, "INPUT"), + } + return layouts + + +def _format_value(value: Any, field: dict) -> bytes: + """Format a value for COBOL fixed-length storage.""" + ftype = field["type"] + length = field["length"] + val = str(value) if value is not None else "" + + if ftype == "numeric": + # COBOL numeric DISPLAY format: right-justified, zero-padded + try: + num = int(float(val)) if val else 0 + except (ValueError, TypeError): + num = 0 + num = abs(num) + s = str(num).zfill(length) + if len(s) > length: + s = s[-length:] + return s.encode("ascii") + else: + # Alphanumeric: left-justified, space-padded + s = val.ljust(length)[:length] + return s.encode("ascii", errors="replace") + + +def _format_comp3(value: Any, field: dict) -> bytes: + """Format as COMP-3 (packed decimal).""" + length = field["length"] + val = str(value) if value else "0" + try: + num_str = val.replace(".", "").lstrip("0") or "0" + if num_str.startswith("-"): + sign_nibble = 0x0D + num_str = num_str[1:] + else: + sign_nibble = 0x0C + if len(num_str) % 2 == 1: + num_str = "0" + num_str + result = [] + for i in range(0, len(num_str), 2): + result.append(int(num_str[i:i+2], 16)) + result[-1] = (result[-1] & 0xF0) | sign_nibble + return bytes(result) + except (ValueError, TypeError): + return bytes([0x00] * (length // 2 + 1 if length % 2 else length // 2)) + + +def write_flat_file(records: list[dict], layout: dict, outpath: Path): + """Write generated records as a COBOL-compatible fixed-length flat file. + + Args: + records: list of dicts with field_name: value + layout: file layout from analyze_fd_layout() + outpath: output file path + """ + if not layout or not layout.get("records"): + return + + rec = layout["records"][0] # Use first record format + rec_len = rec["record_length"] + if rec_len == 0: + return + + with open(outpath, "wb") as f: + for row in records: + buf = bytearray(rec_len) + for field in rec["fields"]: + val = row.get(field["name"], "") + formatted = _format_value(val, field) + end = min(field["offset"] + len(formatted), rec_len) + buf[field["offset"]:end] = formatted[:end - field["offset"]] + f.write(buf) + + +def write_all_files(records: list[dict], source_text: str, outdir: Path, prefix: str = ""): + """Analyze source, write flat files for all FDs that have data.""" + layouts = analyze_fd_layout(source_text) + written = [] + for filename, layout in layouts.items(): + if layout["direction"] == "OUTPUT": + continue # Don't write output files (COBOL will create them) + outpath = outdir / (prefix + filename) + write_flat_file(records, layout, outpath) + written.append((filename, outpath)) + return written