diff --git a/hina/classifier.py b/hina/classifier.py index 9018af6..f41a98f 100644 --- a/hina/classifier.py +++ b/hina/classifier.py @@ -92,6 +92,48 @@ def _get_procedure_division(source_upper: str) -> str: return source_upper +def _detect_matching_structure(source_upper: str) -> float: + """结构检测:不依赖变量名 KEY 的模式匹配检测。 + + 通过分析 COBOL 程序的控制流结构判断是否为匹配程序。 + 返回确信度 0.0~0.55,0.0 表示不是匹配。 + + 匹配程序的结构性特征: + 信号 1: READ + AT END + EOF(文件读取循环) + 信号 2: PERFORM UNTIL + EOF(主循环) + 信号 3: ELSE 体内 READ(条件性读取——匹配核心) + 信号 4: IF 比较两个连字号字段(跨文件字段比较) + 信号 5: 2+ 文件 OPEN INPUT(多文件输入) + """ + import re + + signals = 0 + # 信号 1: READ + AT END + EOF(文件读取循环) + if re.search(r'READ\s+\w+.*AT\s+END.*EOF', source_upper): + signals += 1 + # 信号 2: PERFORM UNTIL + EOF(主循环) + if re.search(r'PERFORM\s+UNTIL\s+.*EOF', source_upper): + signals += 1 + # 信号 3: ELSE 体内 READ(条件性读取) + if re.search(r'ELSE\s+.*READ\s+', source_upper): + signals += 1 + # 信号 4: IF 比较两个连字号字段(跨文件字段比较) + if re.search(r'IF\s+\w+-\w+\s*[=<>]\s*\w+-\w+', source_upper): + signals += 1 + # 信号 5: 2+ 文件 OPEN INPUT + if re.search(r'OPEN\s+INPUT\s+\w+\s+\w+', source_upper): + signals += 1 + + # 确信度: 5 中 5 = 0.55, 5 中 4 = 0.50, 5 中 3 = 0.40 + if signals >= 5: + return 0.55 + elif signals >= 4: + return 0.50 + elif signals >= 3: + return 0.40 + return 0.0 + + def detect_keyword(source: str) -> list[tuple[str, float, str]]: """在 COBOL 源码中搜索 L1_RULES 定义的关键字,返回匹配结果。 @@ -135,6 +177,15 @@ def detect_keyword(source: str) -> list[tuple[str, float, str]]: matched = True break + # ── 结构性匹配检测(不依赖 KEY 变量名)── + match_conf = _detect_matching_structure(source_upper) + if match_conf > 0: + has_more_specific = any( + cat != "マッチング" for cat, _, _ in results + ) + if not has_more_specific: + results.append(("マッチング", match_conf, "structural_matching")) + return results diff --git a/hina/pipeline/pipeline.py b/hina/pipeline/pipeline.py index 2d1729d..0ed0e16 100644 --- a/hina/pipeline/pipeline.py +++ b/hina/pipeline/pipeline.py @@ -166,6 +166,13 @@ def _path_rule_engine( r'\b[A-Z]\d{0,2}-[\w-]*KEY\s*[=<>]', # K01-KEY = su )) + # 注入 has_structural_match: 结构性匹配检测的结果(不依赖变量名 KEY) + # 当 detect_keyword 通过结构识别出匹配时,让规则引擎也能利用这个信号 + features["has_structural_match"] = bool(re.search( + r'IF\s+\w+-\w+\s*[=<>]\s*\w+-\w+.*' # 跨文件字段比较 + r'(?:PERFORM|END-PERFORM|READ)', # 含循环/读取 + su, re.DOTALL + )) # 2. 运行所有混淆组解析器 resolved_types: dict[str, str] = {} @@ -205,19 +212,48 @@ def _path_rule_engine( final_category = keyword_info["category"] final_base_confidence = keyword_info["confidence"] + # 规则引擎结果优先级: 匹配检测 > 辅助推断 + # マッチング/項目チェック/キーブレイク/編集処理 是主类型,优先级高 + # M:N/DIVIDE 是辅助推断,仅当主类型未命中时才采纳 + _MAIN_TYPE_PRIORITY = {"マッチング", "項目チェック(重複含む)", "項目チェック(重複含まず)", + "キーブレイク", "編集処理(校验)", "二段階マッチング", + "単純マッチング", "混合マッチング", "CSV合并", "CSV拆分", + "純粋マッチング"} + # 如果规则引擎有更高置信度的结果, 则采纳 # 使用第一轮缓存的结果(M1: 消除冗余重复调用) best_resolved_type = None best_resolved_conf = 0.0 + best_is_main = False for pair_name, rtype in resolved_types.items(): cached_conf = resolved_confidences.get(pair_name, 0.0) - if cached_conf > best_resolved_conf: - best_resolved_conf = cached_conf - best_resolved_type = rtype + is_main = rtype in _MAIN_TYPE_PRIORITY - if best_resolved_type and best_resolved_conf > final_base_confidence: - final_category = best_resolved_type - final_base_confidence = best_resolved_conf + if best_resolved_type is None: + best_resolved_type = rtype + best_resolved_conf = cached_conf + best_is_main = is_main + elif is_main and not best_is_main: + # 主类型覆盖非主类型(即使置信度略低) + best_resolved_type = rtype + best_resolved_conf = cached_conf + best_is_main = True + elif cached_conf > best_resolved_conf: + best_resolved_type = rtype + best_resolved_conf = cached_conf + best_is_main = is_main + + if best_resolved_type: + final_is_main = final_category in _MAIN_TYPE_PRIORITY + if best_resolved_conf > final_base_confidence: + # 置信度更高 → 替换 + final_category = best_resolved_type + final_base_confidence = best_resolved_conf + elif best_is_main and not final_is_main and final_base_confidence < 0.40: + # 主类型替代低确信度的非主类型(如 M:N→マッチング) + # 但如果 keyword 已确定具体分类(如编码转换 0.85),不覆盖 + final_category = best_resolved_type + final_base_confidence = max(final_base_confidence, best_resolved_conf) # 5. 计算 4 因子确信度 keyword_result_v2 = _build_keyword_result_for_v2(keyword_info) diff --git a/hina/rule_engine/confusion_groups.py b/hina/rule_engine/confusion_groups.py index 68d8803..4f8a4f7 100644 --- a/hina/rule_engine/confusion_groups.py +++ b/hina/rule_engine/confusion_groups.py @@ -42,11 +42,14 @@ def resolve_matching_vs_keybreak(features: dict) -> dict: evidence.append(f"WS-PREV-KEY 存在 + 累加器存在 + IF 分支 → キーブレイク") return {"resolved_type": "キーブレイク", "confidence": 0.85, "evidence": evidence} - # 补充规则: SELECT 文件数 >= 2 且 comparison 至少 1 → 倾向マッチング + # 补充规则: SELECT 文件数 >= 2 且 comparison/eqlality 至少 1 → 倾向マッチング # 要求必须有实际的 KEY 变量比较(防止计数器比较误判) + # 或结构性匹配检测信号(变量名不含 KEY 但结构是匹配) has_key_compare = variable_patterns.get("has_prev_key", False) or features.get("has_key_var", False) - if file_count >= 2 and comparison_ifs >= 1 and has_key_compare: - evidence.append(f"SELECT 文件数 >=2 + comparison IF >=1 + KEY 变量 → マッチング") + has_struct_match = features.get("has_structural_match", False) or features.get("has_prev_key", False) + effective_ifs = comparison_ifs + equality_ifs + if file_count >= 2 and effective_ifs >= 1 and (has_key_compare or has_struct_match): + evidence.append(f"SELECT 文件数 >=2 + IF >=1 + KEY/结构证据 → マッチング") return {"resolved_type": "マッチング", "confidence": 0.75, "evidence": evidence} # 回退: 无法明确判定 diff --git a/tests/parametrized/test_statements/test_adversarial.py b/tests/parametrized/test_statements/test_adversarial.py index 38e38d9..1d28a21 100644 --- a/tests/parametrized/test_statements/test_adversarial.py +++ b/tests/parametrized/test_statements/test_adversarial.py @@ -4,8 +4,10 @@ COBOL 迁移专家设计的攻击面: - FP: 非匹配程序被误判为マッチング - FN: 真实匹配程序未被识别 - 边界: 注释关键词、旧式命名、多文件非匹配 +- FN: 变量名不含 KEY 但结构是匹配程序 """ +import re from pathlib import Path import pytest @@ -16,25 +18,23 @@ from hina.classifier import detect_keyword FIXTURES = Path(__file__).parents[3] / "test-data" / "cobol" / "adversarial" # (filename, expect_matching, reason) -# expect_matching=True → must be マッチング/二段階 -# expect_matching=False → must NOT be マッチング/二段階 ADVERSARIAL_TESTS = [ ("ADV-FALSE-KEY.cbl", False, - "FP: WS-KEY 变量但只是简单 ADD 程序,不应触发匹配"), + "FP: WS-KEY variable but only simple ADD, should NOT trigger matching"), ("ADV-KEY-IN-COMMENT.cbl", False, - "FP: KEY 只在 *> 注释中,不应触发匹配"), + "FP: KEY only in *> comments, should NOT trigger matching"), ("ADV-PREVKEY-FAKE.cbl", False, - "FP: WS-PREV-KEY 但无匹配逻辑,不应触发匹配"), + "FP: WS-PREV-KEY without matching logic, should NOT trigger"), ("ADV-OLD-SCHOOL.cbl", True, - "FN: K01-KEY 旧式命名,应识别为匹配"), + "FN: K01-KEY old-school naming, should detect matching"), ("ADV-TINY-MATCH.cbl", True, - "FN: 极简匹配程序(1 文件),应识别"), + "FN: Minimal matching (1 file), should detect"), ("ADV-CALL-MATCH.cbl", False, - "FP: CALL+WS-MAST-KEY,子程序调用应优先"), + "FP: CALL+WS-MAST-KEY, subprogram call should win"), ("ADV-ASCII-KEY.cbl", False, - "FP: ASCII+WS-KEY,编码转换应优先"), + "FP: ASCII+WS-KEY, encoding conversion should win"), ("ADV-10FILES.cbl", False, - "FP: 10 文件无 KEY 比较,不应触发匹配"), + "FP: 10 files no KEY comparison, should NOT trigger matching"), ] @@ -44,21 +44,18 @@ ADVERSARIAL_TESTS = [ ids=[t[0].replace('.cbl','') for t in ADVERSARIAL_TESTS], ) def test_adversarial(filename, expect_matching, reason): - """对抗性测试:验证明假阳性/假阴性""" + """Adversarial test: false positive / false negative check""" path = FIXTURES / filename assert path.exists(), f"Missing: {path}" src = path.read_text("utf-8") - # 1. extract_structure must not crash struct = extract_structure(src) assert struct is not None - # 2. classify_program must not crash result = classify_program(src) assert result is not None assert result["confidence"] >= 0 - # 3. False positive/negative check is_matching = "マッチング" in result["category"] or "二段階" in result["category"] if expect_matching: assert is_matching, ( @@ -71,10 +68,74 @@ def test_adversarial(filename, expect_matching, reason): f"(conf={result['confidence']:.2f}). Reason: {reason}" ) - # 4. Keyword detection sanity kw = detect_keyword(src) if expect_matching: - # Matching programs should have at least 1 keyword match assert len(kw) >= 1 or result["method"] != "rule_engine_fallback", ( f"{filename}: matching program with 0 keyword matches" ) + + +def test_structural_matching_no_keyword(): + """FN: Matching program without KEY in variable names (CUST-CODE vs ORDR-CODE) + + Real-world COBOL matching programs often use -CODE or -ID instead of -KEY. + Structural detection must catch these even without naming hints. + """ + src = """ IDENTIFICATION DIVISION. + PROGRAM-ID. REALMT. + ENVIRONMENT DIVISION. + INPUT-OUTPUT SECTION. + FILE-CONTROL. + SELECT CUST-FILE ASSIGN TO 'CUST.DAT'. + SELECT ORDR-FILE ASSIGN TO 'ORDR.DAT'. + DATA DIVISION. + FILE SECTION. + FD CUST-FILE. + 01 CUST-REC. + 05 CUST-CODE PIC X(10). + 05 CUST-NAME PIC X(30). + FD ORDR-FILE. + 01 ORDR-REC. + 05 ORDR-CODE PIC X(10). + 05 ORDR-AMT PIC 9(7)V99. + WORKING-STORAGE SECTION. + 01 WS-CUST-CODE PIC X(10). + 01 WS-ORDR-CODE PIC X(10). + 01 WS-EOF1 PIC X VALUE 'N'. + 01 WS-EOF2 PIC X VALUE 'N'. + PROCEDURE DIVISION. + MAIN. + OPEN INPUT CUST-FILE ORDR-FILE. + READ CUST-FILE INTO CUST-REC + AT END MOVE 'Y' TO WS-EOF1. + READ ORDR-FILE INTO ORDR-REC + AT END MOVE 'Y' TO WS-EOF2. + PERFORM UNTIL WS-EOF1 = 'Y' OR WS-EOF2 = 'Y' + IF CUST-CODE = ORDR-CODE + DISPLAY 'MATCH' + ELSE IF CUST-CODE < ORDR-CODE + READ CUST-FILE AT END MOVE 'Y' TO WS-EOF1 + ELSE + READ ORDR-FILE AT END MOVE 'Y' TO WS-EOF2 + END-IF + END-PERFORM. + CLOSE CUST-FILE ORDR-FILE. + STOP RUN. +""" + result = classify_program(src) + kw = detect_keyword(src) + + # Must have structural matching keyword + assert any("structural" in k[2] for k in kw), ( + f"Expected structural matching keyword, got {kw}" + ) + + # Must be classified as matching + assert "マッチング" in result["category"] or "二段階" in result["category"], ( + f"Expected matching, got '{result['category']}'" + ) + + # Confidence should be reasonable + assert result["confidence"] > 0.30, ( + f"Confidence too low: {result['confidence']:.2f}" + )