Files
cobol-java-v3/tests/test_golden.py
T

227 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
r"""
Golden tests — run against the real COBOL credit-card monthly settlement system.
Data source: D:\cobol-java\jcl-cobol-git\
Verification target: 28 transactions → 20 valid + 8 rejected → 6 cards → ¥48,250.20 total
"""
import sys, os
sys.path.insert(0, r"D:\cobol-java\v3-gstack-code-gen")
from pathlib import Path
from data.field_tree import Field, FieldTree
from comparator.cobol_binary_reader import CobolBinaryReader
from comparator.normalizer import Normalizer
from comparator.field_compare import compare_field
from comparator.aligner import align_records
from preprocessor import CopybookPreprocessor
GOLDEN = Path(r"D:\cobol-java\jcl-cobol-git")
# ── Test 1: TXCPY copybook structure validation ──
def test_txcpy_field_count():
    """Verify that the hand-built TXCPY layout flattens to six fields.

    Also spot-checks the declared lengths of TX-CARD-NO and TX-AMOUNT.
    """
    # One row per level-05 item: (name, PIC clause, offset, length).
    layout = (
        ("TX-CARD-NO", "9(16)", 0, 16),
        ("TX-DATE", "9(8)", 16, 8),
        ("TX-TYPE", "X", 24, 1),
        ("TX-AMOUNT", "S9(9)V99", 25, 11),
        ("TX-CURRENCY", "X(3)", 36, 3),
        ("TX-MERCHANT", "X(20)", 39, 20),
    )
    tree = FieldTree(
        fields=[
            Field(name=name, level=5, pic=pic, usage="DISPLAY",
                  offset=offset, length=length)
            for name, pic, offset, length in layout
        ],
        copybook_name="TXCPY",
    )

    by_name = tree.flatten()
    field_count = len(by_name)
    assert field_count == 6, f"Expected 6 fields, got {field_count}"
    assert by_name["TX-CARD-NO"].length == 16
    assert by_name["TX-AMOUNT"].length == 11
# ── Test 2: binary file reading ──
def test_read_transactions():
    """Read the transaction data and check the first record's leading fields.

    Asserts that at least 28 records are returned and that TX-CARD-NO,
    TX-DATE and TX-TYPE of the first record hold the expected values.
    """
    # One row per level-05 item: (name, PIC clause, offset, length).
    # NOTE(review): TX-AMOUNT is declared 12 bytes wide here (shifting the
    # following offsets to 37/40), while the other TXCPY layouts in this file
    # declare it 11 bytes wide (offsets 36/39) — confirm which one matches
    # the data file.
    layout = (
        ("TX-CARD-NO", "9(16)", 0, 16),
        ("TX-DATE", "9(8)", 16, 8),
        ("TX-TYPE", "X", 24, 1),
        ("TX-AMOUNT", "S9(9)V99", 25, 12),
        ("TX-CURRENCY", "X(3)", 37, 3),
        ("TX-MERCHANT", "X(20)", 40, 20),
    )
    tree = FieldTree(
        fields=[
            Field(name=name, level=5, pic=pic, usage="DISPLAY",
                  offset=offset, length=length)
            for name, pic, offset, length in layout
        ],
        copybook_name="TXCPY",
    )

    tx_path = GOLDEN / "data/input/transactions.dat"
    records = CobolBinaryReader().read(str(tx_path), tree)

    # Original author's note: file is 2044 bytes, record size ≈ 73,
    # so roughly 28 records.
    record_count = len(records)
    assert record_count >= 28, f"Expected >=28, got {record_count}"

    first = records[0]
    assert first["TX-CARD-NO"] == "6222021234567800"
    assert first["TX-DATE"] == "20260501"
    assert first["TX-TYPE"] == "P"
# ── Test 3: COMP-3 rate parsing ──
def test_comp3_rate():
    """Check the COMP-3 encoding of the rate table in ``rate.dat``.

    The file must be exactly 24 bytes. For each of the two 12-byte records,
    the three bytes following the first byte are passed to
    ``Normalizer.normalize_comp3`` and the first byte is compared against
    the expected one-character code.
    """
    normalizer = Normalizer()
    raw = (GOLDEN / "data/input/rate.dat").read_bytes()
    assert len(raw) == 24, "2 records × 12 bytes"

    # (record start, expected normalized rate, expected code character)
    expectations = (
        (0, "5", "C"),     # Record 1: cash rate (annotated as 0.0005)
        (12, "500", "O"),  # Record 2: overdue rate (annotated as 0.0500)
    )
    for start, expected_rate, expected_code in expectations:
        packed = raw[start + 1:start + 4]
        assert normalizer.normalize_comp3(packed) == expected_rate
        assert chr(raw[start]) == expected_code
# ── Test 4: pipeline output consistency ──
def test_pipeline_counts():
    """Check that the pipeline wrote 20 validated and 8 rejected lines."""
    validated_path = GOLDEN / "data/work/validated_tx.dat"
    rejected_path = GOLDEN / "data/output/rejected_tx.dat"

    # Both files are read before any assertion runs.
    validated_lines = validated_path.read_text().splitlines()
    rejected_lines = rejected_path.read_text().splitlines()
    validated = len(validated_lines)
    rejected = len(rejected_lines)

    assert validated == 20, f"Expected 20 valid, got {validated}"
    assert rejected == 8, f"Expected 8 rejected, got {rejected}"
def test_error_report_coverage():
    """Check that all seven validation error codes occur in the error report."""
    report = (GOLDEN / "data/output/error_report.dat").read_text()
    expected_codes = (
        "INVALID-CARD",
        "FROZEN-CARD",
        "INVALID-MERCHANT",
        "INVALID-AMOUNT",
        "INVALID-REFUND",
        "OUT-OF-MONTH",
        "MEMBER-NOT-FOUND",
    )
    # Report the first code (in declaration order) that is absent, if any.
    first_missing = next(
        (code for code in expected_codes if code not in report), None
    )
    assert first_missing is None, f"Missing error: {first_missing}"
def test_grand_total():
    """Check the grand-total figures printed in the summary report."""
    summary_text = (GOLDEN / "data/output/summary_report.dat").read_text()
    # Each snippet must appear verbatim somewhere in the report text.
    expected_snippets = (
        "AMOUNT: 48250.20",
        "INTEREST: 300.00",
        "FEE: 800.00",
        "CARDS:00006",
    )
    for snippet in expected_snippets:
        assert snippet in summary_text
# ── Test 5: COPY REPLACING expansion ──
def test_copy_replacing_datesub():
    """Check that ``COPY DATESUB ... REPLACING`` is actually expanded.

    The COPY statement itself already contains both ``DATESUB`` and
    ``WS-RUN``, so searching the raw output for either of them would pass
    even if ``expand`` returned its input untouched. This test therefore
    requires the output to differ from the input and looks for the
    replacement text outside of any verbatim echo of the statement.
    """
    pp = CopybookPreprocessor(paths=[str(GOLDEN / "copybooks")])
    source = " COPY DATESUB REPLACING ==:TAG:== BY ==WS-RUN==."
    result = pp.expand(source)

    assert result != source, "COPY DATESUB was not expanded"

    # Remove a verbatim echo of the statement, in case the preprocessor keeps
    # it in its output, so that only text pulled from the copybook is checked.
    expanded_body = result.replace(source.strip(), "")
    assert "WS-RUN" in expanded_body, (
        "REPLACING ==:TAG:== BY ==WS-RUN== was not applied"
    )
# ── Test 6: comparison engine integration ──
def test_compare_pipeline_output():
    """Self-compare the transaction file; every checked field must PASS.

    The records are aligned against themselves on TX-CARD-NO, and the
    TX-TYPE, TX-CURRENCY and TX-MERCHANT values of each aligned pair are
    compared as strings.
    """
    # One row per level-05 item: (name, PIC clause, offset, length).
    layout = (
        ("TX-CARD-NO", "9(16)", 0, 16),
        ("TX-DATE", "9(8)", 16, 8),
        ("TX-TYPE", "X", 24, 1),
        ("TX-AMOUNT", "S9(9)V99", 25, 11),
        ("TX-CURRENCY", "X(3)", 36, 3),
        ("TX-MERCHANT", "X(20)", 39, 20),
    )
    reader = CobolBinaryReader()
    tree = FieldTree(
        fields=[
            Field(name=name, level=5, pic=pic, usage="DISPLAY",
                  offset=offset, length=length)
            for name, pic, offset, length in layout
        ],
        copybook_name="TXCPY",
    )
    records = reader.read(str(GOLDEN / "data/input/transactions.dat"), tree)

    # Self-compare: aligning a data set with itself must not drop records.
    aligned = align_records(records, records, key_field="TX-CARD-NO")
    assert len(aligned) == len(records), f"Alignment lost records: {len(aligned)} vs {len(records)}"

    # Every compared field of every aligned pair must report PASS.
    compared_keys = ("TX-TYPE", "TX-CURRENCY", "TX-MERCHANT")
    for left, right, _ in aligned:
        for key in compared_keys:
            left_value = str(left.get(key, ""))
            right_value = str(right.get(key, ""))
            outcome = compare_field(key, left_value, right_value, "string")
            assert outcome.status == "PASS", f"Self-compare failed: {key}"
# ── JCL Tests ──
def test_jcl_parse():
    """Check that the JCL parser reads CREDIT25.jcl correctly.

    Verifies the job name, the four steps with their step names and
    programs, the DD entry counts of steps 1 and 2, and the COND settings
    of steps 2-4.
    """
    import sys
    # The parser package is imported from the sibling code-gen directory.
    sys.path.insert(0, str(GOLDEN.parent / "v3-gstack-code-gen"))
    from jcl.parser import parse_jcl

    jcl_path = str(GOLDEN / "jcl" / "CREDIT25.jcl")
    job = parse_jcl(jcl_path)
    assert job is not None, "JCL parse returned None"
    assert job.job_name == "CREDIT25", f"Expected CREDIT25, got {job.job_name}"
    assert len(job.steps) == 4, f"Expected 4 steps, got {len(job.steps)}"

    # Step 1: SORT
    assert job.steps[0].step_name == "STEP1"
    assert job.steps[0].program == "SORT"
    assert len(job.steps[0].dd_entries) == 3  # SORTIN, SORTOUT, SYSIN
    assert any(dd.dd_name.upper() == "SORTIN" for dd in job.steps[0].dd_entries)
    assert any(dd.dd_name.upper() == "SORTOUT" for dd in job.steps[0].dd_entries)

    # Step 2: CRDVAL with COND
    assert job.steps[1].step_name == "STEP2"
    assert job.steps[1].program == "CRDVAL"
    assert job.steps[1].cond is not None
    assert job.steps[1].cond.code == 0
    assert job.steps[1].cond.operator == "NE"
    assert len(job.steps[1].dd_entries) == 6  # TRANSIN, MEMBER, VALIDOUT, REJECT, REPORTERR, SYSOUT

    # Step 3: CRDCALC with COND
    assert job.steps[2].step_name == "STEP3"
    assert job.steps[2].program == "CRDCALC"
    # Guard first so a missing COND fails as an assertion, not AttributeError.
    assert job.steps[2].cond is not None, "Step 3 missing COND"
    assert job.steps[2].cond.code == 0

    # Step 4: CRDRPT with COND
    assert job.steps[3].step_name == "STEP4"
    assert job.steps[3].program == "CRDRPT"
    assert job.steps[3].cond is not None, "Step 4 missing COND"
    assert job.steps[3].cond.code == 0
def test_jcl_dd_mapping():
    """Check that the DD statements of steps 2-4 of CREDIT25.jcl are extracted.

    For each step, the upper-cased DD names must include the required set.
    """
    import sys
    sys.path.insert(0, str(GOLDEN.parent / "v3-gstack-code-gen"))
    from jcl.parser import parse_jcl

    job = parse_jcl(str(GOLDEN / "jcl" / "CREDIT25.jcl"))

    # (step index, DD names that must be present) for CRDVAL, CRDCALC, CRDRPT.
    required_by_step = (
        (1, {"TRANSIN", "MEMBER", "VALIDOUT", "REJECT", "REPORTERR"}),
        (2, {"VALIDIN", "RATE", "CALCOUT"}),
        (3, {"BILLING", "STMT", "SUMMARY"}),
    )
    for step_index, required in required_by_step:
        found = {dd.dd_name.upper() for dd in job.steps[step_index].dd_entries}
        # The failure message lists the DD names that were actually found.
        assert found >= required, f"Missing DDs: {found}"
def test_jcl_job_info():
    """Check the basic job information parsed from CREDIT25.jcl.

    Covers the job name, the number and order of step programs, and that
    steps 2-4 each carry a COND with code 0 and operator "NE".
    """
    import sys
    sys.path.insert(0, str(GOLDEN.parent / "v3-gstack-code-gen"))
    from jcl.parser import parse_jcl

    job = parse_jcl(str(GOLDEN / "jcl" / "CREDIT25.jcl"))
    assert job.job_name == "CREDIT25"
    assert len(job.steps) == 4

    programs = [step.program for step in job.steps]
    assert programs == ["SORT", "CRDVAL", "CRDCALC", "CRDRPT"], \
        f"Unexpected program order: {programs}"

    # Original author's note: all COND are (0,NE) — run the step if the
    # previous step succeeded.
    for index in range(1, 4):
        cond = job.steps[index].cond
        assert cond is not None, f"Step {index + 1} missing COND"
        assert cond.code == 0
        assert cond.operator == "NE"