Files
cobol-java-v3/cobol_testgen/flatfile.py
T
NB-076 097f5449da fix: 溢出截断 + flatfile字段路由 + 多E2E验证
1. _make_numeric_value截断保护
   PIC 9(3)字段值超过999时截断(之前不截断)
2. flatfile.py字段路由
   write_all_files按FD分配字段值到对应的文件
3. 端到端运行验证:
   01-matching-1-1: PASS (8匹配/9不匹配)
   03-matching-N-1: PASS (COPYBOOK正常解析)
   10-divide-50: 程序自身OPEN逻辑问题
   34-sort-anomaly: PARTIAL(异常测试用例部分通过)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-22 13:59:54 +08:00

137 lines
5.0 KiB
Python

"""Flat file I/O — write fixed-length records from COBOL FD definitions"""
import re, struct
from pathlib import Path
from typing import Any
def _collect_record_fields(all_fields, rec_name: str) -> tuple[list[dict], int]:
    """Return the named fields under record ``rec_name`` and the record length.

    Walks ``all_fields`` in order, starting after the entry whose name equals
    ``rec_name`` and stopping at the first entry whose level is not deeper
    than the record's own level.
    """
    children = []
    offset = 0
    rec_level = None
    inside = False
    for f in all_fields:
        if f.name == rec_name:
            inside = True
            rec_level = f.level
            continue
        if not inside:
            continue
        if f.level is not None and f.level <= rec_level:
            # Sibling or parent level reached: the record is complete.
            break
        if f.is_88:
            # Condition names (level 88) occupy no storage.
            continue
        pi = f.pic_info
        if pi:
            length = (pi.digits + pi.decimal) if pi.type == "numeric" else (pi.length or 0)
        else:
            # No PICTURE information available: contributes no bytes here.
            length = 0
        if not f.is_filler:
            children.append({
                "name": f.name, "pic": str(f.pic or ""),
                "type": pi.type if pi else "unknown",
                "length": length, "offset": offset,
            })
        # FILLER items are not addressable by name but still occupy bytes in
        # the record, so they must advance the offset of the fields after them.
        offset += length
    return children, offset


def analyze_fd_layout(source_text: str) -> dict[str, dict]:
    """From preprocessed COBOL source, extract FD file layouts.

    Args:
        source_text: COBOL program text handed to the ``.read`` parsers.

    Returns:
        A dict keyed by each FD's ``assign_to`` entry from the file-control
        data (falling back to the FD name). Each value holds ``fd_name``,
        ``records`` (a list of ``{"record_name", "fields", "record_length"}``
        dicts) and ``direction`` (taken from the scanned OPEN statements,
        ``"INPUT"`` when the FD is not listed). Every field dict carries
        ``name``, ``pic``, ``type``, ``length`` and ``offset``. An empty dict
        is returned for empty source text.

    NOTE(review): lengths are derived only from ``pic_info`` (digits plus
    decimals for numerics, ``length`` otherwise); whether the parser exposes
    USAGE, OCCURS or REDEFINES is not visible here, so they are not applied.
    """
    from .read import parse_file_control, parse_file_section, parse_data_division, extract_data_division, scan_open_statements

    if not source_text:
        # Without source there is no FILE SECTION, hence no layouts.
        return {}

    fc = parse_file_control(source_text)
    fs = parse_file_section(source_text)
    ops = scan_open_statements(source_text)
    dd = extract_data_division(source_text)
    all_fields = parse_data_division(dd) if dd else []

    layouts = {}
    for fd_name, rec_names in fs.items():
        records = []
        for rec_name in rec_names:
            children, record_length = _collect_record_fields(all_fields, rec_name)
            records.append({"record_name": rec_name, "fields": children, "record_length": record_length})
        assign_to = fc.get(fd_name, {}).get("assign_to", fd_name)
        layouts[assign_to] = {
            "fd_name": fd_name, "records": records,
            "direction": ops.get(fd_name, "INPUT"),
        }
    return layouts
def select_records_for_file(records: list[dict], layout: dict) -> list[dict]:
    """Project each record onto the field names declared by ``layout``.

    Args:
        records: Rows mapping field name to value.
        layout: A file layout dict whose ``"records"`` entries list their
            ``"fields"`` (each field dict has a ``"name"``).

    Returns:
        The rows reduced to the layout's field names, with rows that end up
        empty dropped. The original ``records`` list is returned unchanged
        when the layout is missing or has no records, or when no row keeps
        any key.
    """
    layout_records = layout.get("records") if layout else None
    if not layout_records:
        return records

    wanted = {spec["name"] for entry in layout_records for spec in entry["fields"]}

    projected = []
    for record in records:
        subset = {key: record[key] for key in record if key in wanted}
        if subset:
            projected.append(subset)
    # Fall back to the unfiltered rows when nothing matched this layout.
    return projected or records
def _format_value(value: Any, field: dict) -> bytes:
"""Format a value for COBOL fixed-length storage."""
ftype = field["type"]
length = field["length"]
val = str(value) if value is not None else ""
if ftype == "numeric":
try:
num = int(float(val)) if val else 0
except (ValueError, TypeError):
num = 0
num = abs(num)
# Truncate to fit PIC digits
max_val = 10 ** length - 1
if num > max_val:
num = max_val
s = str(num).zfill(length)
if len(s) > length:
s = s[-length:]
return s.encode("ascii")
else:
s = val.ljust(length)[:length]
return s.encode("ascii", errors="replace")
def write_flat_file(records: list[dict], layout: dict, outpath: Path, field_filter: set = None):
    """Write ``records`` to ``outpath`` as fixed-length binary records.

    Only the first record description in ``layout["records"]`` is used. Each
    row becomes one zero-initialised buffer of ``record_length`` bytes into
    which every selected field is formatted at its offset; fields missing
    from a row are formatted from an empty string.

    Args:
        records: Rows mapping field name to value.
        layout: File layout dict with a ``"records"`` list.
        outpath: Destination file, opened in binary write mode.
        field_filter: Optional set of field names; when non-empty, only
            those fields are written.

    Returns:
        None. Nothing is written (and no file is created) when the layout is
        missing, has no records, or its record length is 0.
    """
    if not (layout and layout.get("records")):
        return

    first_record = layout["records"][0]
    record_size = first_record["record_length"]
    if record_size == 0:
        return

    columns = first_record["fields"]
    if field_filter:
        columns = [col for col in columns if col["name"] in field_filter]

    with open(outpath, "wb") as out:
        for row in records:
            line = bytearray(record_size)
            for col in columns:
                start = col["offset"]
                data = _format_value(row.get(col["name"], ""), col)
                # Never write past the end of the record buffer.
                stop = min(start + len(data), record_size)
                line[start:stop] = data[:stop - start]
            out.write(line)
def write_all_files(records: list[dict], source_text: str, outdir: Path, prefix: str = ""):
    """Analyze the source and write one flat file per non-OUTPUT file layout.

    A layout is written only if at least one row carries a truthy value for
    one of that layout's field names. The complete ``records`` list is then
    handed to ``write_flat_file``, so every row produces a record in the
    file, including rows with no truthy value for this layout.

    Args:
        records: Rows mapping field name to value.
        source_text: COBOL source passed to ``analyze_fd_layout``.
        outdir: Directory the files are written into.
        prefix: Text prepended to each layout's file name.

    Returns:
        A list of ``(filename, outpath, populated_rows)`` tuples, where
        ``populated_rows`` counts the rows having at least one truthy value
        among the layout's fields.
    """
    written = []
    for filename, layout in analyze_fd_layout(source_text).items():
        if layout["direction"] == "OUTPUT":
            continue

        layout_names = {spec["name"] for entry in layout["records"] for spec in entry["fields"]}
        projected = [
            {key: value for key, value in row.items() if key in layout_names}
            for row in records
        ]
        # Rows that hold at least one truthy value for this file's fields.
        populated_rows = sum(1 for row in projected if any(row.values()))
        if populated_rows == 0:
            continue

        outpath = outdir / (prefix + filename)
        write_flat_file(records, layout, outpath)
        written.append((filename, outpath, populated_rows))
    return written