0e7472598d
1. 跨文件KEY约束(修复) 匹配型程序的M-KEY与D-KEY值不同导致匹配0条。 修复: generate_data后处理检测IF KEY比较, 前半记录对齐KEY值(8条匹配), 后半保持差异(9条不匹配). 实际cobc运行验证: MATCHED=8, PASS. 2. extract_structure PERFORM分支统计(修复) _walk函数未添加BrPerform决策点, total_branches缺失. 修复: 为PERFORM UNTIL/VARYING决策点添加2分支(Enter/Skip). 之前total_branches=0, 现在=2. 3. flatfile.py(新增) COBOL固定长度平面文件写入器. - analyze_fd_layout(): 从COBOL源码自动解析文件布局 - write_flat_file(): 生成COBOL可直接读取的二进制格式 Co-Authored-By: Claude <noreply@anthropic.com>
146 lines
5.2 KiB
Python
146 lines
5.2 KiB
Python
"""Flat file I/O — write fixed-length records from COBOL FD definitions"""
|
|
import struct, re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
def _pic_width(pic_info) -> int:
    """Return the layout width of a parsed PICTURE clause.

    Numeric pictures count ``digits + decimal`` positions; any other picture
    uses its ``length`` (0 when unset). A missing picture has width 0.
    """
    if not pic_info:
        return 0
    if pic_info.type == "numeric":
        return pic_info.digits + pic_info.decimal
    return pic_info.length or 0


def _collect_record(all_fields, rec_name: str) -> dict:
    """Build the layout entry of one record from the flat list of data items.

    Items following the one named ``rec_name`` are taken as its subordinates
    until an item with a level number less than or equal to the record's own
    level is reached.
    """
    fields = []
    offset = 0
    rec_level = None
    inside = False
    for item in all_fields:
        if item.name == rec_name:
            inside = True
            rec_level = item.level
            continue
        if not inside:
            continue
        if item.level is not None and item.level <= rec_level:
            break  # next sibling/parent item: the record is complete
        if item.is_88:
            continue  # condition names describe values, they occupy no storage
        pic = item.pic_info
        width = _pic_width(pic)
        if not item.is_filler:
            fields.append({
                "name": item.name, "pic": str(item.pic or ""),
                "type": pic.type if pic else "unknown",
                "length": width, "offset": offset,
            })
        # FILLER is not addressable by name but still takes up space in the
        # record, so it must advance the offset of everything that follows.
        offset += width
    return {"record_name": rec_name, "fields": fields, "record_length": offset}


def analyze_fd_layout(source_text: str) -> dict[str, dict]:
    """From preprocessed COBOL source, extract FD file layouts.

    Args:
        source_text: COBOL source text. Empty or ``None`` yields ``{}``.

    Returns: {assign_to_filename: {
        "fd_name": str,
        "records": [{record_name, record_length, fields: [{name, offset, length, type, pic}]}],
        "direction": "INPUT"|"OUTPUT"|"I-O"
    }}

    The key falls back to the FD name when FILE-CONTROL gives no
    ``assign_to`` entry, and ``direction`` defaults to ``"INPUT"`` when no
    OPEN statement was found for the FD. FILLER items are not listed in
    ``fields`` but are counted in the offsets and in ``record_length``.
    """
    if not source_text:
        return {}

    from .read import (
        parse_file_control,
        parse_file_section,
        parse_data_division,
        extract_data_division,
        scan_open_statements,
    )

    fc = parse_file_control(source_text)
    fs = parse_file_section(source_text)
    ops = scan_open_statements(source_text)
    dd = extract_data_division(source_text)
    all_fields = parse_data_division(dd) if dd else []

    layouts = {}
    for fd_name, rec_names in fs.items():
        # Every record of an FD starts again at offset 0.
        records = [_collect_record(all_fields, rec_name) for rec_name in rec_names]
        assign_to = fc.get(fd_name, {}).get("assign_to", fd_name)
        layouts[assign_to] = {
            "fd_name": fd_name, "records": records,
            "direction": ops.get(fd_name, "INPUT"),
        }
    return layouts
|
|
|
|
|
|
def _format_value(value: Any, field: dict) -> bytes:
    """Format a value for COBOL fixed-length storage.

    Args:
        value: the value to store; ``None`` is treated as an empty string.
        field: field description providing ``"type"`` and ``"length"``.

    Returns:
        Exactly ``length`` bytes. ``"numeric"`` fields are rendered as
        right-justified, zero-padded ASCII digits (high-order digits are
        dropped on overflow); every other type is left-justified,
        space-padded and truncated, with non-ASCII characters replaced
        by ``?``.

    NOTE(review): the sign and any fractional part of a numeric value are
    discarded here, as before — confirm that callers pre-scale values for
    fields with implied decimals.
    """
    ftype = field["type"]
    length = max(field["length"], 0)
    val = str(value) if value is not None else ""

    if ftype != "numeric":
        # Alphanumeric: left-justified, space-padded
        return val.ljust(length)[:length].encode("ascii", errors="replace")

    # COBOL numeric DISPLAY format: right-justified, zero-padded
    try:
        # Parse integers directly: a detour through float would corrupt
        # values with more than ~15 significant digits.
        num = int(val) if val else 0
    except (ValueError, TypeError):
        try:
            num = int(float(val))
        except (ValueError, TypeError, OverflowError):
            # Unparseable text, NaN or infinity all fall back to zero.
            num = 0
    digits = str(abs(num)).zfill(length)
    # Slice from an explicit start index: digits[-length:] would return the
    # whole string when length is 0.
    return digits[len(digits) - length:].encode("ascii")
|
|
|
|
|
|
def _format_comp3(value: Any, field: dict) -> bytes:
    """Format as COMP-3 (packed decimal).

    Packed decimal stores one digit per half byte with the sign in the
    rightmost half byte (0xC for positive/zero, 0xD for negative), so a
    field of ``n`` digits always occupies ``n // 2 + 1`` bytes.

    Args:
        value: number or numeric string; falsy values are stored as zero.
            A decimal point is dropped (the digits are kept as they are).
        field: field description; ``field["length"]`` is taken as the
            number of digits of the item.

    Returns:
        The packed bytes, zero-padded on the left. High-order digits that do
        not fit the declared digit count are dropped; input that is not a
        plain decimal number is stored as positive zero.
    """
    digits = max(field["length"], 0)
    nbytes = digits // 2 + 1

    text = str(value).strip() if value else "0"
    negative = text.startswith("-")
    if text[:1] in ("-", "+"):
        text = text[1:]
    text = text.replace(".", "")

    # Only plain ASCII digits may be packed; hex letters or other text would
    # otherwise end up as invalid digit nibbles.
    if not (text.isascii() and text.isdigit()):
        text, negative = "0", False

    if len(text) > digits:
        text = text[len(text) - digits:]
    if not text.strip("0"):
        negative = False  # never emit a negative zero

    # An odd number of digit nibbles plus the sign nibble fills whole bytes.
    nibbles = text.zfill(2 * nbytes - 1) + ("D" if negative else "C")
    return bytes.fromhex(nibbles)
|
|
|
|
|
|
def write_flat_file(records: list[dict], layout: dict, outpath: Path):
    """Write generated records as a COBOL-compatible fixed-length flat file.

    Nothing is written (and no file is created) when the layout has no
    record format or the record length is not positive. Records are written
    back to back without separators, each exactly ``record_length`` bytes.

    Args:
        records: list of dicts with field_name: value; a missing field is
            formatted from an empty string
        layout: file layout from analyze_fd_layout()
        outpath: output file path
    """
    if not layout or not layout.get("records"):
        return

    rec = layout["records"][0]  # Use first record format
    rec_len = rec["record_length"]
    if rec_len <= 0:
        return

    fields = rec["fields"]
    # Bytes that no field covers (e.g. gaps between fields) are written as
    # spaces instead of NUL bytes.
    blank = b" " * rec_len

    with open(outpath, "wb") as out:
        for row in records:
            buf = bytearray(blank)
            for field in fields:
                start = field["offset"]
                stop = min(start + field["length"], rec_len)
                if start < 0 or stop <= start:
                    # Empty slot or a slot outside the record: skip it, a
                    # slice assignment there would change the record size.
                    continue
                data = _format_value(row.get(field["name"], ""), field)
                width = stop - start
                # Force the data to the exact slot width so the assignment
                # can neither grow the buffer nor spill into the next field.
                buf[start:stop] = data[:width].ljust(width)
            out.write(buf)
|
|
|
|
|
|
def write_all_files(records: list[dict], source_text: str, outdir: Path, prefix: str = "") -> list:
    """Analyze source, write flat files for all FDs that have data.

    Args:
        records: rows to write; the same rows are written to every file
        source_text: COBOL source passed to analyze_fd_layout()
        outdir: directory that receives the files
        prefix: string prepended to each file name

    Returns:
        A list of ``(filename, outpath)`` tuples, one per file actually
        written. FDs opened for OUTPUT and FDs without a usable record
        layout are left out.
    """
    outdir = Path(outdir)
    written = []
    for filename, layout in analyze_fd_layout(source_text).items():
        if layout["direction"] == "OUTPUT":
            continue  # Don't write output files (COBOL will create them)
        formats = layout.get("records") or []
        if not formats or formats[0]["record_length"] <= 0:
            # write_flat_file() creates no file for such a layout, so it must
            # not be reported as written either.
            continue
        outpath = outdir / (prefix + filename)
        write_flat_file(records, layout, outpath)
        written.append((filename, outpath))
    return written
|