diff --git a/cobol_testgen/design.py b/cobol_testgen/design.py index 5f1ac15..a346b36 100644 --- a/cobol_testgen/design.py +++ b/cobol_testgen/design.py @@ -401,10 +401,12 @@ def _children_of(group_name: str, fields: list) -> list: def _make_numeric_value(idx: int, record_num: int, total_digits: int) -> str: + max_val = 10 ** total_digits - 1 for step in (100, 10, 1): val = idx * step + record_num if val < 10 ** total_digits: - return str(val).zfill(total_digits) + return str(min(val, max_val)).zfill(total_digits) + return str(min(record_num, max_val)).zfill(total_digits) return str(record_num).zfill(total_digits) diff --git a/cobol_testgen/flatfile.py b/cobol_testgen/flatfile.py index 725ba00..dad42fe 100644 --- a/cobol_testgen/flatfile.py +++ b/cobol_testgen/flatfile.py @@ -1,17 +1,10 @@ """Flat file I/O — write fixed-length records from COBOL FD definitions""" -import struct, re +import re, struct from pathlib import Path from typing import Any def analyze_fd_layout(source_text: str) -> dict[str, dict]: - """From preprocessed COBOL source, extract FD file layouts. - - Returns: {assign_to_filename: { - "fd_name": str, - "records": [{record_name, record_length, fields: [{name, offset, length, type, pic}]}], - "direction": "INPUT"|"OUTPUT"|"I-O" - }} - """ + """From preprocessed COBOL source, extract FD file layouts.""" from .read import parse_file_control, parse_file_section, parse_data_division, extract_data_division, scan_open_statements fc = parse_file_control(source_text) if source_text else {} @@ -49,7 +42,6 @@ def analyze_fd_layout(source_text: str) -> dict[str, dict]: "type": ftype, "length": length, "offset": offset, }) offset += length - records.append({"record_name": rec_name, "fields": children, "record_length": offset}) assign_to = fc.get(fd_name, {}).get("assign_to", fd_name) @@ -60,6 +52,22 @@ def analyze_fd_layout(source_text: str) -> dict[str, dict]: return layouts +def select_records_for_file(records: list[dict], layout: dict) -> list[dict]: + """Extract and route only the fields belonging to this file layout.""" + if not layout or not layout.get("records"): + return records + field_names = set() + for rec in layout["records"]: + for f in rec["fields"]: + field_names.add(f["name"]) + result = [] + for rec in records: + row = {k: v for k, v in rec.items() if k in field_names} + if row: + result.append(row) + return result if result else records + + def _format_value(value: Any, field: dict) -> bytes: """Format a value for COBOL fixed-length storage.""" ftype = field["type"] @@ -67,64 +75,41 @@ def _format_value(value: Any, field: dict) -> bytes: val = str(value) if value is not None else "" if ftype == "numeric": - # COBOL numeric DISPLAY format: right-justified, zero-padded try: num = int(float(val)) if val else 0 except (ValueError, TypeError): num = 0 num = abs(num) + # Truncate to fit PIC digits + max_val = 10 ** length - 1 + if num > max_val: + num = max_val s = str(num).zfill(length) if len(s) > length: s = s[-length:] return s.encode("ascii") else: - # Alphanumeric: left-justified, space-padded s = val.ljust(length)[:length] return s.encode("ascii", errors="replace") -def _format_comp3(value: Any, field: dict) -> bytes: - """Format as COMP-3 (packed decimal).""" - length = field["length"] - val = str(value) if value else "0" - try: - num_str = val.replace(".", "").lstrip("0") or "0" - if num_str.startswith("-"): - sign_nibble = 0x0D - num_str = num_str[1:] - else: - sign_nibble = 0x0C - if len(num_str) % 2 == 1: - num_str = "0" + num_str - result = [] - for i in range(0, len(num_str), 2): - result.append(int(num_str[i:i+2], 16)) - result[-1] = (result[-1] & 0xF0) | sign_nibble - return bytes(result) - except (ValueError, TypeError): - return bytes([0x00] * (length // 2 + 1 if length % 2 else length // 2)) - - -def write_flat_file(records: list[dict], layout: dict, outpath: Path): - """Write generated records as a COBOL-compatible fixed-length flat file. - - Args: - records: list of dicts with field_name: value - layout: file layout from analyze_fd_layout() - outpath: output file path - """ +def write_flat_file(records: list[dict], layout: dict, outpath: Path, field_filter: set = None): + """Write records as a COBOL-compatible fixed-length flat file.""" if not layout or not layout.get("records"): return - - rec = layout["records"][0] # Use first record format + rec = layout["records"][0] rec_len = rec["record_length"] if rec_len == 0: return + rec_fields = rec["fields"] + if field_filter: + rec_fields = [f for f in rec_fields if f["name"] in field_filter] + with open(outpath, "wb") as f: for row in records: buf = bytearray(rec_len) - for field in rec["fields"]: + for field in rec_fields: val = row.get(field["name"], "") formatted = _format_value(val, field) end = min(field["offset"] + len(formatted), rec_len) @@ -133,13 +118,19 @@ def write_flat_file(records: list[dict], layout: dict, outpath: Path): def write_all_files(records: list[dict], source_text: str, outdir: Path, prefix: str = ""): - """Analyze source, write flat files for all FDs that have data.""" + """Analyze source, write flat files for all INPUT FDs.""" layouts = analyze_fd_layout(source_text) written = [] for filename, layout in layouts.items(): if layout["direction"] == "OUTPUT": - continue # Don't write output files (COBOL will create them) - outpath = outdir / (prefix + filename) - write_flat_file(records, layout, outpath) - written.append((filename, outpath)) + continue + fnames = set() + for rec in layout["records"]: + for f in rec["fields"]: + fnames.add(f["name"]) + filtered = [{k: v for k, v in r.items() if k in fnames} for r in records] + if filtered and any(v for row in filtered for v in row.values()): + outpath = outdir / (prefix + filename) + write_flat_file(records, layout, outpath) + written.append((filename, outpath, len([r for r in filtered if any(v for v in r.values())]))) return written