33762ca959
COBOL migration expert adversarial testing found 4 real defects:
FIX 1: Comment-stripping in detect_keyword() (FP-2)
- Remove *> inline comments and * comment lines before keyword matching
- Prevents 「マッチング」 from triggering on WS-KEY in comments
FIX 2: KEY comparison context validation (FP-1, FP-6)
- Add _matches_key_comparison() — requires WS-KEY variable to appear
NEAR an actual comparison operator (= < >), not just as PIC/VALUE decl
- Same check in _path_rule_engine features via has_key_var injection
- Fix regex bug: [=<>\s] vs [=<>] — \s matched whitespace after PIC decl
FIX 3: Old-school naming support (FN-1)
- Add L1 keyword r'[A-Z]\d{0,2}-\w*KEY' with 0.55 confidence
- Matches K01-KEY, KS-KEY etc. (non-WS- prefix naming convention)
FIX 4: mn_output_mode over-matching (FP-6)
- Require IF branches + KEY evidence before returning M:N for file>=3
- matching_vs_keybreak rule 3 now requires has_key_var
New tests: test_adversarial.py — 8 parametrized adversarial tests
Regression: 755 passed (0 new failures)
245 lines
11 KiB
Python
245 lines
11 KiB
Python
"""混淆组判定规则引擎 — 8 个混淆对的化解函数。
|
|
|
|
每个函数接收 features dict,返回:
|
|
{
|
|
"resolved_type": str,
|
|
"confidence": float,
|
|
"evidence": list[str],
|
|
}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
def resolve_matching_vs_keybreak(features: dict) -> dict:
|
|
"""区分「マッチング」与「キーブレイク」。
|
|
|
|
规则:
|
|
- IF 三路分支 (comparison ≥ 2) + SELECT 文件数 ≥ 2 → マッチング
|
|
- IF 双路分支 (equality 为主) + WS-PREV-KEY 存在 + 累加器存在 → キーブレイク
|
|
"""
|
|
if_types = features.get("if_types", {})
|
|
total_ifs = if_types.get("total", 0)
|
|
comparison_ifs = if_types.get("comparison", 0)
|
|
equality_ifs = if_types.get("equality", 0)
|
|
|
|
select_files = features.get("select_files", {})
|
|
file_count = len(select_files) if isinstance(select_files, dict) else features.get("file_count", 0)
|
|
|
|
variable_patterns = features.get("variable_patterns", {})
|
|
has_prev_key = variable_patterns.get("has_prev_key", False)
|
|
has_accumulator = variable_patterns.get("has_accumulator", False)
|
|
|
|
evidence: list[str] = []
|
|
|
|
# 规则 1: 三路分支 + 多文件 → マッチング
|
|
if comparison_ifs >= 2 and file_count >= 2:
|
|
evidence.append(f"三路 IF 分支 (comparison={comparison_ifs}) + SELECT 文件数 >=2 ({file_count}) → マッチング")
|
|
return {"resolved_type": "マッチング", "confidence": 0.90, "evidence": evidence}
|
|
|
|
# 规则 2: 双路 + WS-PREV-KEY + 累加器 → キーブレイク
|
|
if total_ifs >= 1 and has_prev_key and has_accumulator:
|
|
evidence.append(f"WS-PREV-KEY 存在 + 累加器存在 + IF 分支 → キーブレイク")
|
|
return {"resolved_type": "キーブレイク", "confidence": 0.85, "evidence": evidence}
|
|
|
|
# 补充规则: SELECT 文件数 >= 2 且 comparison 至少 1 → 倾向マッチング
|
|
# 要求必须有实际的 KEY 变量比较(防止计数器比较误判)
|
|
has_key_compare = variable_patterns.get("has_prev_key", False) or features.get("has_key_var", False)
|
|
if file_count >= 2 and comparison_ifs >= 1 and has_key_compare:
|
|
evidence.append(f"SELECT 文件数 >=2 + comparison IF >=1 + KEY 变量 → マッチング")
|
|
return {"resolved_type": "マッチング", "confidence": 0.75, "evidence": evidence}
|
|
|
|
# 回退: 无法明确判定
|
|
evidence.append(f"特征不足: total_ifs={total_ifs}, comparison={comparison_ifs}, "
|
|
f"file_count={file_count}, has_prev_key={has_prev_key}, "
|
|
f"has_accumulator={has_accumulator}")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
|
|
def resolve_dedup_vs_nodedup(features: dict) -> dict:
|
|
"""区分「項目チェック(重複含む)」与「項目チェック(重複含まず)」。
|
|
|
|
规则:
|
|
- WS-PREV-KEY 存在 → 含重复
|
|
- 无 WS-PREV-KEY → 不含重复
|
|
"""
|
|
variable_patterns = features.get("variable_patterns", {})
|
|
has_prev_key = variable_patterns.get("has_prev_key", False)
|
|
evidence: list[str] = []
|
|
|
|
if has_prev_key:
|
|
evidence.append("WS-PREV-KEY 存在 → 含重复")
|
|
return {"resolved_type": "項目チェック(重複含む)", "confidence": 0.90, "evidence": evidence}
|
|
else:
|
|
evidence.append("未检测到 WS-PREV-KEY → 可能不含重复(置信度低:缺少 WS-PREV-KEY 不代表一定是项目检查)")
|
|
return {"resolved_type": "項目チェック(重複含まず)", "confidence": 0.50, "evidence": evidence}
|
|
|
|
|
|
def resolve_validation_vs_keybreak(features: dict) -> dict:
|
|
"""区分「編集処理(校验)」与「キーブレイク」。
|
|
|
|
规则:
|
|
- WS-ERR* 相关字段存在 → 校验 (validation)
|
|
- WS-*CNT 累加计数器存在 → キーブレイク (key break)
|
|
"""
|
|
variable_patterns = features.get("variable_patterns", {})
|
|
has_error_flag = variable_patterns.get("has_error_flag", False)
|
|
has_counter = variable_patterns.get("has_counter", False)
|
|
evidence: list[str] = []
|
|
|
|
if has_error_flag:
|
|
evidence.append("WS-ERR* 错误字段存在 → 校验")
|
|
return {"resolved_type": "編集処理(校验)", "confidence": 0.85, "evidence": evidence}
|
|
|
|
if has_counter:
|
|
evidence.append("WS-*CNT 计数器存在 → 可能キーブレイク(置信度低:计数器是通用模式,非决定性证据)")
|
|
return {"resolved_type": "キーブレイク", "confidence": 0.55, "evidence": evidence}
|
|
|
|
evidence.append("既无错误字段也无计数器,无法判定")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
|
|
def resolve_csv_merge_vs_split(features: dict) -> dict:
|
|
"""区分 CSV 合并与拆分。
|
|
|
|
规则:
|
|
- STRING 语句存在 → 无换行 (合并, merge)
|
|
- INSPECT REPLACING 存在 → 有换行 (拆分, split)
|
|
"""
|
|
has_string = features.get("has_string", False)
|
|
has_inspect = features.get("has_inspect", False)
|
|
evidence: list[str] = []
|
|
|
|
if has_string:
|
|
evidence.append("STRING 语句存在 → CSV 合并 (无换行)")
|
|
return {"resolved_type": "CSV合并", "confidence": 0.85, "evidence": evidence}
|
|
|
|
if has_inspect:
|
|
evidence.append("INSPECT REPLACING 存在 → CSV 拆分 (有换行)")
|
|
return {"resolved_type": "CSV拆分", "confidence": 0.85, "evidence": evidence}
|
|
|
|
evidence.append("既无 STRING 也无 INSPECT REPLACING")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
|
|
def resolve_simple_vs_two_stage(features: dict) -> dict:
|
|
"""区分「単純マッチング」与「二段階マッチング」。
|
|
|
|
规则:
|
|
- OPEN → CLOSE → 再 OPEN 模式 → 二级匹配
|
|
- 其他顺序 → 简单匹配
|
|
"""
|
|
open_pattern = features.get("open_pattern", "")
|
|
evidence: list[str] = []
|
|
|
|
if open_pattern == "open-close-open":
|
|
evidence.append("OPEN→CLOSE→再OPEN 模式 → 二级匹配")
|
|
return {"resolved_type": "二段階マッチング", "confidence": 0.90, "evidence": evidence}
|
|
else:
|
|
evidence.append(f"OPEN 模式为 '{open_pattern}' → 默认为単純マッチング(非決定的: 无 OPEN-CLOSE-OPEN 模式不代表一定是匹配程序)")
|
|
return {"resolved_type": "単純マッチング", "confidence": 0.50, "evidence": evidence}
|
|
|
|
|
|
def resolve_pure_vs_mixed(features: dict) -> dict:
|
|
"""区分「純粋マッチング」与「混合マッチング」。
|
|
|
|
规则:
|
|
- variable_patterns 中 has_switch 且 has_counter → 混合(隐含额外键比较)
|
|
- 有 PERFORM 且 多文件 → 可能混合
|
|
- 否则 → 纯粹匹配(低确信度,因无法静态确定有无额外键比较)
|
|
"""
|
|
variable_patterns = features.get("variable_patterns", {})
|
|
if_types = features.get("if_types", {})
|
|
evidence: list[str] = []
|
|
|
|
has_switch = variable_patterns.get("has_switch", False)
|
|
has_counter = variable_patterns.get("has_counter", False)
|
|
if_count = if_types.get("total", 0)
|
|
|
|
if has_switch and has_counter and if_count >= 3:
|
|
evidence.append("多个变量模式和 IF 分支 → 可能混合匹配")
|
|
return {"resolved_type": "混合マッチング", "confidence": 0.70, "evidence": evidence}
|
|
|
|
evidence.append("无明确混合特征 → 纯粹匹配(需数据验证)")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
|
|
def resolve_division_50_25_100(features: dict) -> dict:
|
|
"""区分 DIVIDE 被除数常量 50/25/100。
|
|
|
|
从 features["divide_constants"] 列表中匹配已知常量。
|
|
"""
|
|
divide_constants = features.get("divide_constants", [])
|
|
evidence: list[str] = []
|
|
|
|
if not isinstance(divide_constants, (list, tuple)):
|
|
evidence.append("divide_constants 格式无效")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
for c in divide_constants:
|
|
if c in (50, 25, 100):
|
|
evidence.append(f"DIVIDE 被除数 = {c}")
|
|
return {"resolved_type": f"DIVIDE_{c}", "confidence": 0.95, "evidence": evidence}
|
|
|
|
evidence.append(f"未匹配已知常量 (50/25/100),当前值: {divide_constants}")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
|
|
def resolve_mn_output_mode(features: dict) -> dict:
|
|
"""判断 M:N 输出模式。
|
|
|
|
规则:
|
|
- 根据文件或记录数判断 M:N 关系
|
|
- 返回 unknown 注明需数据验证
|
|
"""
|
|
select_files = features.get("select_files", {})
|
|
file_count = len(select_files) if isinstance(select_files, dict) else features.get("file_count", 0)
|
|
evidence: list[str] = []
|
|
|
|
# 尝试判断 M:N(从现有特征推断)
|
|
select_count = len(select_files)
|
|
total_branches = features.get("total_branches", 0)
|
|
if select_count >= 2 and total_branches >= 3:
|
|
evidence.append(f"SELECT={select_count}, 分支={total_branches} → 可能 M:N")
|
|
return {"resolved_type": "M:N", "confidence": 0.65, "evidence": evidence}
|
|
|
|
if file_count >= 3:
|
|
# 需要至少有 IF 分支和 KEY 变量的证据,否则单纯文件多不是匹配程序
|
|
vp = features.get("variable_patterns", {})
|
|
total_ifs = features.get("if_types", {}).get("total", 0)
|
|
has_key_evidence = vp.get("has_prev_key", False) or vp.get("has_accumulator", False)
|
|
if total_ifs >= 1 and has_key_evidence:
|
|
evidence.append(f"文件数 {file_count} >= 3, IF 分支 {total_ifs}, KEY 证据 → 可能 M:N")
|
|
return {"resolved_type": "M:N", "confidence": 0.60, "evidence": evidence}
|
|
evidence.append(f"文件数 {file_count} 但无 IF+KEY 证据 → 不是 M:N 匹配")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
evidence.append("需数据验证确定 M:N 输出模式")
|
|
return {"resolved_type": "unknown", "confidence": 0.0, "evidence": evidence}
|
|
|
|
|
|
# ── 调度表 ──────────────────────────────────────────────────────────────────
|
|
|
|
_RESOLVER_MAP = {
|
|
"matching_vs_keybreak": resolve_matching_vs_keybreak,
|
|
"dedup_vs_nodedup": resolve_dedup_vs_nodedup,
|
|
"validation_vs_keybreak": resolve_validation_vs_keybreak,
|
|
"csv_merge_vs_split": resolve_csv_merge_vs_split,
|
|
"simple_vs_two_stage": resolve_simple_vs_two_stage,
|
|
"pure_vs_mixed": resolve_pure_vs_mixed,
|
|
"division_50_25_100": resolve_division_50_25_100,
|
|
"mn_output_mode": resolve_mn_output_mode,
|
|
}
|
|
|
|
|
|
def resolve_confusion_pair(features: dict, pair_name: str) -> dict:
|
|
"""Dispatch to the correct function by pair_name."""
|
|
resolver = _RESOLVER_MAP.get(pair_name)
|
|
if resolver is None:
|
|
return {
|
|
"resolved_type": "unknown",
|
|
"confidence": 0.0,
|
|
"evidence": [f"未知混淆对名称: {pair_name}"],
|
|
}
|
|
return resolver(features)
|