227 lines
9.2 KiB
Python
227 lines
9.2 KiB
Python
"""
|
||
Golden tests — 对接真实 COBOL 信用卡月结系统
|
||
数据源: D:\cobol-java\jcl-cobol-git\
|
||
验证目标: 28 transactions → 20 valid + 8 rejected → 6 cards → ¥48,250.20 total
|
||
"""
|
||
import sys, os
|
||
sys.path.insert(0, r"D:\cobol-java\v3-gstack-code-gen")
|
||
|
||
from pathlib import Path
|
||
from data.field_tree import Field, FieldTree
|
||
from comparator.cobol_binary_reader import CobolBinaryReader
|
||
from comparator.normalizer import Normalizer
|
||
from comparator.field_compare import compare_field
|
||
from comparator.aligner import align_records
|
||
from preprocessor import CopybookPreprocessor
|
||
|
||
GOLDEN = Path(r"D:\cobol-java\jcl-cobol-git")
|
||
|
||
|
||
# ── Test 1: TXCPY COPYBOOK 结构验证 ──
|
||
def test_txcpy_field_count():
|
||
"""TXCPY COPYBOOK defines 6 top-level fields."""
|
||
txcpy = FieldTree(fields=[
|
||
Field(name="TX-CARD-NO", level=5, pic="9(16)", usage="DISPLAY", offset=0, length=16),
|
||
Field(name="TX-DATE", level=5, pic="9(8)", usage="DISPLAY", offset=16, length=8),
|
||
Field(name="TX-TYPE", level=5, pic="X", usage="DISPLAY", offset=24, length=1),
|
||
Field(name="TX-AMOUNT", level=5, pic="S9(9)V99", usage="DISPLAY", offset=25, length=11),
|
||
Field(name="TX-CURRENCY", level=5, pic="X(3)", usage="DISPLAY", offset=36, length=3),
|
||
Field(name="TX-MERCHANT", level=5, pic="X(20)", usage="DISPLAY", offset=39, length=20),
|
||
], copybook_name="TXCPY")
|
||
flat = txcpy.flatten()
|
||
assert len(flat) == 6, f"Expected 6 fields, got {len(flat)}"
|
||
assert flat["TX-CARD-NO"].length == 16
|
||
assert flat["TX-AMOUNT"].length == 11
|
||
|
||
|
||
# ── Test 2: 二进制文件读取 ──
|
||
def test_read_transactions():
|
||
"""读取交易数据,验证第一条记录结构正确。"""
|
||
txcpy = FieldTree(fields=[
|
||
Field(name="TX-CARD-NO", level=5, pic="9(16)", usage="DISPLAY", offset=0, length=16),
|
||
Field(name="TX-DATE", level=5, pic="9(8)", usage="DISPLAY", offset=16, length=8),
|
||
Field(name="TX-TYPE", level=5, pic="X", usage="DISPLAY", offset=24, length=1),
|
||
Field(name="TX-AMOUNT", level=5, pic="S9(9)V99", usage="DISPLAY", offset=25, length=12),
|
||
Field(name="TX-CURRENCY", level=5, pic="X(3)", usage="DISPLAY", offset=37, length=3),
|
||
Field(name="TX-MERCHANT", level=5, pic="X(20)", usage="DISPLAY", offset=40, length=20),
|
||
], copybook_name="TXCPY")
|
||
|
||
reader = CobolBinaryReader()
|
||
records = reader.read(str(GOLDEN / "data/input/transactions.dat"), txcpy)
|
||
|
||
# 文件 2044 bytes, record size ≈ 73, 大约 28 条记录
|
||
assert len(records) >= 28, f"Expected >=28, got {len(records)}"
|
||
|
||
r0 = records[0]
|
||
assert r0["TX-CARD-NO"] == "6222021234567800"
|
||
assert r0["TX-DATE"] == "20260501"
|
||
assert r0["TX-TYPE"] == "P"
|
||
|
||
|
||
# ── Test 3: COMP-3 RATE 解析 ──
|
||
def test_comp3_rate():
|
||
"""验证利率表 COMP-3 编码。"""
|
||
n = Normalizer()
|
||
data = (GOLDEN / "data/input/rate.dat").read_bytes()
|
||
|
||
assert len(data) == 24, "2 records × 12 bytes"
|
||
|
||
# Record 1: Cash rate
|
||
assert n.normalize_comp3(data[1:4]) == "5" # 0.0005
|
||
assert chr(data[0]) == "C"
|
||
|
||
# Record 2: Overdue rate
|
||
assert n.normalize_comp3(data[13:16]) == "500" # 0.0500
|
||
assert chr(data[12]) == "O"
|
||
|
||
|
||
# ── Test 4: 管道输出一致性 ──
|
||
def test_pipeline_counts():
|
||
"""验证 28→20+8→6 的管道计数。"""
|
||
validated = len((GOLDEN / "data/work/validated_tx.dat").read_text().splitlines())
|
||
rejected = len((GOLDEN / "data/output/rejected_tx.dat").read_text().splitlines())
|
||
|
||
assert validated == 20, f"Expected 20 valid, got {validated}"
|
||
assert rejected == 8, f"Expected 8 rejected, got {rejected}"
|
||
|
||
|
||
def test_error_report_coverage():
|
||
"""验证全部 7 条校验规则被触发。"""
|
||
errors = (GOLDEN / "data/output/error_report.dat").read_text()
|
||
|
||
expected_errors = ["INVALID-CARD", "FROZEN-CARD", "INVALID-MERCHANT",
|
||
"INVALID-AMOUNT", "INVALID-REFUND", "OUT-OF-MONTH", "MEMBER-NOT-FOUND"]
|
||
for e in expected_errors:
|
||
assert e in errors, f"Missing error: {e}"
|
||
|
||
|
||
def test_grand_total():
|
||
"""验证全局合计金额。"""
|
||
summary = (GOLDEN / "data/output/summary_report.dat").read_text()
|
||
assert "AMOUNT: 48250.20" in summary
|
||
assert "INTEREST: 300.00" in summary
|
||
assert "FEE: 800.00" in summary
|
||
assert "CARDS:00006" in summary
|
||
|
||
|
||
# ── Test 5: COPY REPLACING 展开 ──
|
||
def test_copy_replacing_datesub():
|
||
"""验证 DATESUB COPYBOOK 的 REPLACING 展开。"""
|
||
pp = CopybookPreprocessor(paths=[str(GOLDEN / "copybooks")])
|
||
source = " COPY DATESUB REPLACING ==:TAG:== BY ==WS-RUN==."
|
||
result = pp.expand(source)
|
||
assert "WS-RUN" in result or "DATESUB" in result
|
||
|
||
|
||
# ── Test 6: 比对引擎集成 ──
|
||
def test_compare_pipeline_output():
|
||
"""验证比对引擎可以处理同源数据的 self-compare(应全部 PASS)。"""
|
||
reader = CobolBinaryReader()
|
||
txcpy = FieldTree(fields=[
|
||
Field(name="TX-CARD-NO", level=5, pic="9(16)", usage="DISPLAY", offset=0, length=16),
|
||
Field(name="TX-DATE", level=5, pic="9(8)", usage="DISPLAY", offset=16, length=8),
|
||
Field(name="TX-TYPE", level=5, pic="X", usage="DISPLAY", offset=24, length=1),
|
||
Field(name="TX-AMOUNT", level=5, pic="S9(9)V99", usage="DISPLAY", offset=25, length=11),
|
||
Field(name="TX-CURRENCY", level=5, pic="X(3)", usage="DISPLAY", offset=36, length=3),
|
||
Field(name="TX-MERCHANT", level=5, pic="X(20)", usage="DISPLAY", offset=39, length=20),
|
||
], copybook_name="TXCPY")
|
||
|
||
records = reader.read(str(GOLDEN / "data/input/transactions.dat"), txcpy)
|
||
|
||
# Self-compare: 同源数据应对齐后全 PASS
|
||
aligned = align_records(records, records, key_field="TX-CARD-NO")
|
||
assert len(aligned) == len(records), f"Alignment lost records: {len(aligned)} vs {len(records)}"
|
||
|
||
# 每个字段 self-compare 应 PASS
|
||
for c, j, _ in aligned:
|
||
for key in ["TX-TYPE", "TX-CURRENCY", "TX-MERCHANT"]:
|
||
fr = compare_field(key, str(c.get(key, "")), str(j.get(key, "")), "string")
|
||
assert fr.status == "PASS", f"Self-compare failed: {key}"
|
||
|
||
|
||
# ── JCL Tests ──
|
||
|
||
def test_jcl_parse():
|
||
"""验证 JCL 解析器正确解析 CREDIT25.jcl"""
|
||
import sys
|
||
sys.path.insert(0, str(GOLDEN.parent / "v3-gstack-code-gen"))
|
||
from jcl.parser import parse_jcl
|
||
|
||
jcl_path = str(GOLDEN / "jcl" / "CREDIT25.jcl")
|
||
job = parse_jcl(jcl_path)
|
||
|
||
assert job is not None, "JCL parse returned None"
|
||
assert job.job_name == "CREDIT25", f"Expected CREDIT25, got {job.job_name}"
|
||
assert len(job.steps) == 4, f"Expected 4 steps, got {len(job.steps)}"
|
||
|
||
# Step 1: SORT
|
||
assert job.steps[0].step_name == "STEP1"
|
||
assert job.steps[0].program == "SORT"
|
||
assert len(job.steps[0].dd_entries) == 3 # SORTIN, SORTOUT, SYSIN
|
||
assert any(dd.dd_name.upper() == "SORTIN" for dd in job.steps[0].dd_entries)
|
||
assert any(dd.dd_name.upper() == "SORTOUT" for dd in job.steps[0].dd_entries)
|
||
|
||
# Step 2: CRDVAL with COND
|
||
assert job.steps[1].step_name == "STEP2"
|
||
assert job.steps[1].program == "CRDVAL"
|
||
assert job.steps[1].cond is not None
|
||
assert job.steps[1].cond.code == 0
|
||
assert job.steps[1].cond.operator == "NE"
|
||
assert len(job.steps[1].dd_entries) == 6 # TRANSIN, MEMBER, VALIDOUT, REJECT, REPORTERR, SYSOUT
|
||
|
||
# Step 3: CRDCALC with COND
|
||
assert job.steps[2].step_name == "STEP3"
|
||
assert job.steps[2].program == "CRDCALC"
|
||
assert job.steps[2].cond.code == 0
|
||
|
||
# Step 4: CRDRPT with COND
|
||
assert job.steps[3].step_name == "STEP4"
|
||
assert job.steps[3].program == "CRDRPT"
|
||
assert job.steps[3].cond.code == 0
|
||
|
||
|
||
def test_jcl_dd_mapping():
|
||
"""验证 JCL DD 语句正确提取"""
|
||
import sys
|
||
sys.path.insert(0, str(GOLDEN.parent / "v3-gstack-code-gen"))
|
||
from jcl.parser import parse_jcl
|
||
|
||
job = parse_jcl(str(GOLDEN / "jcl" / "CREDIT25.jcl"))
|
||
|
||
# Check DD names for CRDVAL step
|
||
crdval_dds = {dd.dd_name.upper() for dd in job.steps[1].dd_entries}
|
||
assert crdval_dds >= {"TRANSIN", "MEMBER", "VALIDOUT", "REJECT", "REPORTERR"}, \
|
||
f"Missing DDs: {crdval_dds}"
|
||
|
||
# Check DD names for CRDCALC step
|
||
crdcalc_dds = {dd.dd_name.upper() for dd in job.steps[2].dd_entries}
|
||
assert crdcalc_dds >= {"VALIDIN", "RATE", "CALCOUT"}, \
|
||
f"Missing DDs: {crdcalc_dds}"
|
||
|
||
# Check DD names for CRDRPT step
|
||
crdrpt_dds = {dd.dd_name.upper() for dd in job.steps[3].dd_entries}
|
||
assert crdrpt_dds >= {"BILLING", "STMT", "SUMMARY"}, \
|
||
f"Missing DDs: {crdrpt_dds}"
|
||
|
||
|
||
def test_jcl_job_info():
|
||
"""验证 JCL Job 基本信息"""
|
||
import sys
|
||
sys.path.insert(0, str(GOLDEN.parent / "v3-gstack-code-gen"))
|
||
from jcl.parser import parse_jcl
|
||
|
||
job = parse_jcl(str(GOLDEN / "jcl" / "CREDIT25.jcl"))
|
||
|
||
assert job.job_name == "CREDIT25"
|
||
assert len(job.steps) == 4
|
||
|
||
programs = [s.program for s in job.steps]
|
||
assert programs == ["SORT", "CRDVAL", "CRDCALC", "CRDRPT"], \
|
||
f"Unexpected program order: {programs}"
|
||
|
||
# Verify all COND are (0,NE) — run step if previous step succeeded
|
||
for i in [1, 2, 3]:
|
||
assert job.steps[i].cond is not None, f"Step {i+1} missing COND"
|
||
assert job.steps[i].cond.code == 0
|
||
assert job.steps[i].cond.operator == "NE"
|