"""Phase 7: 匹配系测试 — 基于 parametrized 生成匹配数据。 测试覆盖: - 1:1 / 1:N / N:1 基本匹配(含内容校验) - 不平衡场景(主 > 从 / 从 > 主) - gcov 验证入口(需要 cobc 环境) """ from __future__ import annotations import pytest from parametrized import generate_matching_data, generate_keybreak_data # ============================================================ # 1:1 匹配 # ============================================================ class TestMatchingOneToOne: """1:1 — 主件每条在从件最多命中一条""" def test_1to1_equal_counts_all_matched(self): main, sub = generate_matching_data("1:1", 10, 10, 1.0) assert len(main) == 10 assert len(sub) == 10 main_keys = {r["KEY"] for r in main} sub_keys = {r["KEY"] for r in sub} assert main_keys == sub_keys, "全部匹配时主从 KEY 集合应一致" def test_1to1_equal_counts_partial_50(self): main, sub = generate_matching_data("1:1", 10, 10, 0.5) assert len(main) == 10 assert len(sub) == 10 matched = sum(1 for r in sub if r["KEY"].startswith("MAIN")) assert matched == 5, "50% 匹配应有 5 条从件命中" def test_1to1_unbalanced_main_more(self): main, sub = generate_matching_data("1:1", 20, 5, 1.0) assert len(main) == 20 assert len(sub) == 5 sub_keys = {r["KEY"] for r in sub} matched = sum(1 for r in main if r["KEY"] in sub_keys) assert matched == 5, "主件多于从件时最多只能匹配从件数" def test_1to1_unbalanced_sub_more(self): main, sub = generate_matching_data("1:1", 5, 20, 1.0) assert len(main) == 5 assert len(sub) == 20 matched = sum(1 for r in sub if r["KEY"].startswith("MAIN")) assert matched == 5, "从件多于主件时最多只能匹配主件数" def test_1to1_no_match(self): main, sub = generate_matching_data("1:1", 10, 10, 0.0) main_keys = {r["KEY"] for r in main} sub_keys = {r["KEY"] for r in sub} assert main_keys.isdisjoint(sub_keys), "ratio=0 时主从 KEY 应无交集" def test_1to1_ratio_boundary(self): """边界: match_ratio=0.0 和 1.0""" main0, sub0 = generate_matching_data("1:1", 5, 5, 0.0) main1, sub1 = generate_matching_data("1:1", 5, 5, 1.0) m0 = {r["KEY"] for r in main0} s0 = {r["KEY"] for r in sub0} assert m0.isdisjoint(s0) m1 = {r["KEY"] for r in main1} s1 = {r["KEY"] for r in sub1} assert m1 == s1 def test_1to1_content_integrity(self): """验证每条记录包含正确的字段结构""" main, sub = generate_matching_data("1:1", 5, 5, 1.0) for rec in main: assert "KEY" in rec assert "DATA" in rec assert "SEQ" in rec for rec in sub: assert "KEY" in rec assert "DATA" in rec assert "SEQ" in rec # ============================================================ # 1:N 匹配 # ============================================================ class TestMatchingOneToMany: """1:N — 主件每条在从件可能命中多条""" def test_1toN_one_main_many_sub(self): main, sub = generate_matching_data("1:N", 1, 10, 1.0) assert len(main) == 1 assert len(sub) == 10 assert main[0]["KEY"] == "MAIN-0000" assert all(r["KEY"] == "MAIN-0000" for r in sub), "全部从件应匹配同一主件" def test_1toN_mixed_unmatched(self): main, sub = generate_matching_data("1:N", 5, 10, 0.6) assert len(main) == 5 assert len(sub) == 10 matched = [r for r in sub if r["KEY"].startswith("MAIN")] unmatched = [r for r in sub if r["KEY"].startswith("UNMATCHED")] assert len(matched) > 0 assert len(unmatched) > 0 def test_1toN_all_main_unmatched(self): main, sub = generate_matching_data("1:N", 5, 10, 0.0) assert all(r["KEY"].startswith("UNMATCHED") for r in sub) # ============================================================ # N:1 匹配 # ============================================================ class TestMatchingManyToOne: """N:1 — 从件每条在主件可能命中多条""" def test_Nto1_many_main_one_sub(self): main, sub = generate_matching_data("N:1", 10, 1, 1.0) assert len(main) == 10 assert len(sub) == 1 sub_key = sub[0]["KEY"] assert sub_key.startswith("MAIN") matched = sum(1 for r in main if r["KEY"] == sub_key) assert matched >= 1 def test_Nto1_unbalanced(self): main, sub = generate_matching_data("N:1", 100, 20, 0.5) assert len(main) == 100 assert len(sub) == 20 matched = sum(1 for r in sub if r["KEY"].startswith("MAIN")) assert matched <= 20 def test_Nto1_all_unmatched(self): main, sub = generate_matching_data("N:1", 10, 5, 0.0) sub_keys = {r["KEY"] for r in sub} assert all(r["KEY"] not in sub_keys for r in main) # ============================================================ # KEY 切中断 # ============================================================ class TestKeybreak: """KEY 值变化触发中断 / AT END / BREAK""" def test_keybreak_three_groups(self): data = generate_keybreak_data(3, 2) assert len(data) == 6 keys = [r["KEY"] for r in data] assert keys == ["KEY-A", "KEY-A", "KEY-B", "KEY-B", "KEY-C", "KEY-C"] def test_keybreak_many_groups(self): data = generate_keybreak_data(10, 1) assert len(data) == 10 assert len({r["KEY"] for r in data}) == 10 def test_keybreak_field_accumulate(self): data = generate_keybreak_data(3, 2, "accumulate") assert data[0]["FIELD"] == 101 assert data[1]["FIELD"] == 102 assert data[2]["FIELD"] == 201 assert data[5]["FIELD"] == 302 def test_keybreak_field_aggregate(self): data = generate_keybreak_data(3, 3, "aggregate") assert all(r["FIELD"] == 100 for r in data[0:3]) assert all(r["FIELD"] == 200 for r in data[3:6]) assert all(r["FIELD"] == 300 for r in data[6:9]) def test_keybreak_field_mark(self): data = generate_keybreak_data(4, 1, "mark") assert [r["FIELD"] for r in data] == ["MARK-A", "MARK-B", "MARK-C", "MARK-D"] # ============================================================ # gcov 验证(可选,需要 cobc) # ============================================================ class TestGcovVerification: """gcov 验证 — 需要 cobc 编译器""" @pytest.mark.skip(reason="需要 cobc 编译器才能运行真实的 gcov 验证") def test_gcov_with_cobc(self): """基于真实 COBOL 编译的 gcov 覆盖验证""" pytest.skip("COBOL 编译器 (cobc) 不可用 — 跳过 gcov 验证") def test_gcov_coverage_data_structure(self): """验证 gcov 所需的数据结构完整性(不依赖 cobc)""" from parametrized.common import generate_minimal_records fields = [ {"name": "KEY", "type": "string", "length": 10}, {"name": "AMOUNT", "type": "numeric"}, ] records = generate_minimal_records(fields) assert len(records) == 1 assert "KEY" in records[0] assert "AMOUNT" in records[0] assert records[0]["AMOUNT"] == 0