"""Phase 7: 分割系测试 — 基于 parametrized.generate_division_data。 测试覆盖: - 50% / 25% / 100% 分割 - 余数处理(奇偶 / 不可整除) - 边界条件(单条记录 / 大量记录) """ from __future__ import annotations import pytest from parametrized import generate_division_data class TestDivisionFifty: """50% 对半分割 → 2 个文件""" def test_50_even_split(self): result = generate_division_data(50, 100) assert len(result) == 2 assert len(result[0]) == 50 assert len(result[1]) == 50 assert sum(len(f) for f in result) == 100 def test_50_odd_remainder(self): """奇数条记录: 最后一条应归属第 2 个文件""" result = generate_division_data(50, 5) assert len(result) == 2 assert len(result[0]) + len(result[1]) == 5 def test_50_single_record(self): result = generate_division_data(50, 1) assert len(result) == 2 assert len(result[0]) == 0 assert len(result[1]) == 1 def test_50_content_check(self): result = generate_division_data(50, 10) for file_no, records in enumerate(result, 1): for rec in records: assert rec["FILE_NO"] == file_no assert rec["KEY"].startswith("DIV") assert "SEQ" in rec assert "DATA" in rec class TestDivisionTwentyFive: """25% 四等分分割 → 4 个文件""" def test_25_even_split(self): result = generate_division_data(25, 100) assert len(result) == 4 # 100/4 = 25 各 for records in result: assert len(records) == 25 def test_25_remainder(self): """不可被 4 整除时,最后文件拿到剩余条数""" result = generate_division_data(25, 10) assert len(result) == 4 total = sum(len(f) for f in result) assert total == 10 # 前 3 个文件各 2 条(floor(10*0.25)=2)→ 第 4 个文件得 4 条 assert len(result[0]) == 2 assert len(result[1]) == 2 assert len(result[2]) == 2 assert len(result[3]) == 4 def test_25_single_record(self): result = generate_division_data(25, 1) assert len(result) == 4 assert len(result[0]) == 0 assert len(result[1]) == 0 assert len(result[2]) == 0 assert len(result[3]) == 1 def test_25_content_check(self): result = generate_division_data(25, 40) for file_no, records in enumerate(result, 1): for rec in records: assert rec["FILE_NO"] == file_no class TestDivisionOneHundred: """100% 全量(不分)→ 1 个文件""" def test_100_all_in_one(self): result = generate_division_data(100, 50) assert len(result) == 1 assert len(result[0]) == 50 def test_100_single_record(self): result = generate_division_data(100, 1) assert len(result) == 1 assert len(result[0]) == 1 assert result[0][0]["FILE_NO"] == 1 def test_100_large_count(self): result = generate_division_data(100, 10000) assert len(result) == 1 assert len(result[0]) == 10000 assert result[0][0]["SEQ"] == 1 assert result[0][-1]["SEQ"] == 10000 class TestDivisionEdgeCases: """边界与异常""" def test_invalid_division_type(self): with pytest.raises(ValueError, match="division_type"): generate_division_data(99, 50) def test_invalid_record_count(self): with pytest.raises(ValueError, match="record_count"): generate_division_data(50, 0) def test_sequence_global(self): """验证 SEQ 全局递增,不重复""" result = generate_division_data(25, 30) all_seq = [] for records in result: for rec in records: all_seq.append(rec["SEQ"]) assert all_seq == sorted(all_seq) assert len(set(all_seq)) == len(all_seq)