"""通用测试数据生成工具函数模块。 """ from __future__ import annotations import pathlib import re from typing import Any def generate_zero_byte_file(path: str) -> None: """生成一个 0 字节的空文件。 自动创建父目录(如果不存在)。 参数 ---------- path : str 待创建的空文件路径。 """ p = pathlib.Path(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_bytes(b"") def generate_minimal_records(fields: list[dict[str, Any]]) -> list[dict[str, Any]]: """为给定的字段定义生成 1 条正常记录(最小数据量)。 参数 ---------- fields : list[dict] 字段定义列表,每个字典可包含以下键(均为可选): - "name" : str 字段名,默认为 "FIELD_{i}" - "type" : str 类型: "numeric" / "string" / "date",默认 "string" - "length" : int 长度,字符串用,默认 10 - "default" : Any 默认值,优先使用 返回 ------- list[dict] 包含一条记录的列表,记录中每个字段的值为类型合理的默认值。 """ if not fields: return [{}] record: dict[str, Any] = {} for i, f in enumerate(fields): name = f.get("name", f"FIELD_{i}") if "default" in f: record[name] = f["default"] else: typ = f.get("type", "string") if typ == "numeric": record[name] = 0 elif typ == "date": record[name] = "0001-01-01" else: # string length = f.get("length", 10) record[name] = "A" * length return [record] def _parse_pic(pic: str) -> dict[str, Any]: """解析 COBOL PIC 子句,返回类型、位数、小数位等信息。 支持的格式: - S9(7)V99 signed, 7 整数位, 2 小数位 - 9(4) 无符号, 4 整数位 - S9(3) signed, 3 整数位 - 9(4)V9(2) 无符号, 4 整数位, 2 小数位 - X(10) 字符串, 10 字符 - 9(7)V99 无符号, 7 整数位, 2 小数位 参数 ---------- pic : str COBOL PIC 字符串,如 "S9(7)V99"。 返回 ------- dict 包含 type, digits, decimal, signed, total_digits 等信息的字典。 """ pic = pic.strip().upper() result: dict[str, Any] = { "type": "unknown", "digits": 0, "decimal": 0, "signed": False, "total_digits": 0, } # 字符串类型 if pic.startswith("X") or pic.startswith("A"): result["type"] = "string" m = re.match(r'[XA]\((\d+)\)', pic) if m: result["digits"] = int(m.group(1)) else: result["digits"] = 1 return result # 数值类型 if "9" in pic or pic.startswith("S"): result["type"] = "numeric" signed_match = re.match(r'S(.*)', pic) if signed_match: result["signed"] = True pic_body = signed_match.group(1) else: result["signed"] = False pic_body = pic # 解析整数和小数部分 # 9(7)V99 或 9(7)V9(2) v_match = re.match(r'9\((\d+)\)V9\((\d+)\)', pic_body) if v_match: result["digits"] = int(v_match.group(1)) result["decimal"] = int(v_match.group(2)) else: # 尝试 9(4) 或 9(7)V99 m2 = re.match(r'9\((\d+)\)', pic_body) if m2: result["digits"] = int(m2.group(1)) rest = pic_body[m2.end():] if rest.startswith("V"): dec_str = rest[1:] dm = re.match(r'9\((\d+)\)', dec_str) if dm: result["decimal"] = int(dm.group(1)) elif re.match(r'9+', dec_str): result["decimal"] = len(dec_str) # 处理简写: 9(7)V99 elif rest.startswith("V"): dec_part = rest[1:] dm = re.match(r'9\((\d+)\)', dec_part) if dm: result["decimal"] = int(dm.group(1)) elif re.match(r'9+', dec_part): result["decimal"] = len(dec_part) result["total_digits"] = result["digits"] + result["decimal"] return result return result def generate_boundary_values(pic: str) -> dict[str, Any]: """从 PIC 子句解析出最大值、最小值和溢出值。 参数 ---------- pic : str COBOL PIC 字符串,如 "S9(7)V99"。 返回 ------- dict { "max": 类型最大值, "min": 类型最小值, "overflow": 溢出值(超出最大位数的值), "zero": 0, "pic_info": 解析出的 PIC 信息, } """ info = _parse_pic(pic) if info["type"] == "string": length = info["digits"] return { "max": "X" * length, "min": "" if length == 0 else "A" + " " * (length - 1), "overflow": "X" * (length + 1) if length > 0 else "X", "zero": "" if length == 0 else " " * length, "pic_info": info, } if info["type"] == "numeric": digits = info["digits"] decimal = info["decimal"] signed = info["signed"] factor = 10 ** decimal total_digits = info.get("total_digits", digits + decimal) max_val = (10 ** total_digits - 1) / factor overflow_val = (10 ** (total_digits + 1)) / factor if signed: min_val = -max_val else: min_val = 0 return { "max": max_val, "min": min_val, "overflow": overflow_val, "zero": 0.0 if decimal > 0 else 0, "pic_info": info, } return { "max": None, "min": None, "overflow": None, "zero": None, "pic_info": info, } def generate_sorted_records( record_count: int = 10, key_field: str = "KEY", ) -> list[dict[str, Any]]: """按 KEY 升序生成记录。 参数 ---------- record_count : int 生成记录数,默认 10。 key_field : str 键字段名,默认 "KEY"。 返回 ------- list[dict] 已按 key_field 排序的记录列表。 """ if record_count < 1: raise ValueError(f"record_count 必须 >= 1,收到 {record_count}") records: list[dict[str, Any]] = [] for i in range(record_count): records.append({ key_field: f"KEY-{i:04d}", "DATA": f"sorted_data_{i}", "SEQ": i + 1, }) return records def generate_duplicate_keys( records: list[dict[str, Any]], key_field: str = "KEY", ) -> list[dict[str, Any]]: """在已有记录基础上,为每条记录追加一条或多条同键值记录。 适用于测试重复键处理逻辑(如 SORT MERGE / 去重检查)。 参数 ---------- records : list[dict] 原始记录列表。 key_field : str 作为重复键的字段名,默认 "KEY"。 返回 ------- list[dict] 追加了重复键记录的完整列表。 """ if not records: return [] duplicates: list[dict[str, Any]] = [] for rec in records: dup = dict(rec) dup[key_field] = rec[key_field] dup["DATA"] = rec.get("DATA", "") + "_DUP" dup["SEQ"] = rec.get("SEQ", 0) + 10000 duplicates.append(dup) return records + duplicates