feat: structural matching detection — no KEY variable needed
Add _detect_matching_structure(): detection based on control flow pattern, not variable naming conventions. Uses 5 structural signals: 1. READ + AT END + EOF pattern 2. PERFORM UNTIL with EOF condition 3. ELSE body with conditional READ (matching core) 4. IF comparing hyphenated fields (cross-file comparison) 5. Multi-file OPEN INPUT 5/5 signals → 0.55, 4/5 → 0.50, 3/5 → 0.40. Real-world impact: matching programs with key fields named CUST-CODE and ORDR-CODE (no '-KEY' in name) are now correctly detected. Also: - Rule engine type priority: main types (マッチング etc.) override secondary types (M:N, DIVIDE) when keyword confidence is low - has_structural_match injected into features so rule engine can use it - matching_vs_keybreak accepts equality IFs as matching evidence - New test: test_structural_matching_no_keyword() Regression: 764 passed (0 new failures).
This commit is contained in:
@@ -92,6 +92,48 @@ def _get_procedure_division(source_upper: str) -> str:
|
|||||||
return source_upper
|
return source_upper
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_matching_structure(source_upper: str) -> float:
|
||||||
|
"""结构检测:不依赖变量名 KEY 的模式匹配检测。
|
||||||
|
|
||||||
|
通过分析 COBOL 程序的控制流结构判断是否为匹配程序。
|
||||||
|
返回确信度 0.0~0.55,0.0 表示不是匹配。
|
||||||
|
|
||||||
|
匹配程序的结构性特征:
|
||||||
|
信号 1: READ + AT END + EOF(文件读取循环)
|
||||||
|
信号 2: PERFORM UNTIL + EOF(主循环)
|
||||||
|
信号 3: ELSE 体内 READ(条件性读取——匹配核心)
|
||||||
|
信号 4: IF 比较两个连字号字段(跨文件字段比较)
|
||||||
|
信号 5: 2+ 文件 OPEN INPUT(多文件输入)
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
|
||||||
|
signals = 0
|
||||||
|
# 信号 1: READ + AT END + EOF(文件读取循环)
|
||||||
|
if re.search(r'READ\s+\w+.*AT\s+END.*EOF', source_upper):
|
||||||
|
signals += 1
|
||||||
|
# 信号 2: PERFORM UNTIL + EOF(主循环)
|
||||||
|
if re.search(r'PERFORM\s+UNTIL\s+.*EOF', source_upper):
|
||||||
|
signals += 1
|
||||||
|
# 信号 3: ELSE 体内 READ(条件性读取)
|
||||||
|
if re.search(r'ELSE\s+.*READ\s+', source_upper):
|
||||||
|
signals += 1
|
||||||
|
# 信号 4: IF 比较两个连字号字段(跨文件字段比较)
|
||||||
|
if re.search(r'IF\s+\w+-\w+\s*[=<>]\s*\w+-\w+', source_upper):
|
||||||
|
signals += 1
|
||||||
|
# 信号 5: 2+ 文件 OPEN INPUT
|
||||||
|
if re.search(r'OPEN\s+INPUT\s+\w+\s+\w+', source_upper):
|
||||||
|
signals += 1
|
||||||
|
|
||||||
|
# 确信度: 5 中 5 = 0.55, 5 中 4 = 0.50, 5 中 3 = 0.40
|
||||||
|
if signals >= 5:
|
||||||
|
return 0.55
|
||||||
|
elif signals >= 4:
|
||||||
|
return 0.50
|
||||||
|
elif signals >= 3:
|
||||||
|
return 0.40
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
def detect_keyword(source: str) -> list[tuple[str, float, str]]:
|
def detect_keyword(source: str) -> list[tuple[str, float, str]]:
|
||||||
"""在 COBOL 源码中搜索 L1_RULES 定义的关键字,返回匹配结果。
|
"""在 COBOL 源码中搜索 L1_RULES 定义的关键字,返回匹配结果。
|
||||||
|
|
||||||
@@ -135,6 +177,15 @@ def detect_keyword(source: str) -> list[tuple[str, float, str]]:
|
|||||||
matched = True
|
matched = True
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# ── 结构性匹配检测(不依赖 KEY 变量名)──
|
||||||
|
match_conf = _detect_matching_structure(source_upper)
|
||||||
|
if match_conf > 0:
|
||||||
|
has_more_specific = any(
|
||||||
|
cat != "マッチング" for cat, _, _ in results
|
||||||
|
)
|
||||||
|
if not has_more_specific:
|
||||||
|
results.append(("マッチング", match_conf, "structural_matching"))
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -166,6 +166,13 @@ def _path_rule_engine(
|
|||||||
r'\b[A-Z]\d{0,2}-[\w-]*KEY\s*[=<>]', # K01-KEY =
|
r'\b[A-Z]\d{0,2}-[\w-]*KEY\s*[=<>]', # K01-KEY =
|
||||||
su
|
su
|
||||||
))
|
))
|
||||||
|
# 注入 has_structural_match: 结构性匹配检测的结果(不依赖变量名 KEY)
|
||||||
|
# 当 detect_keyword 通过结构识别出匹配时,让规则引擎也能利用这个信号
|
||||||
|
features["has_structural_match"] = bool(re.search(
|
||||||
|
r'IF\s+\w+-\w+\s*[=<>]\s*\w+-\w+.*' # 跨文件字段比较
|
||||||
|
r'(?:PERFORM|END-PERFORM|READ)', # 含循环/读取
|
||||||
|
su, re.DOTALL
|
||||||
|
))
|
||||||
|
|
||||||
# 2. 运行所有混淆组解析器
|
# 2. 运行所有混淆组解析器
|
||||||
resolved_types: dict[str, str] = {}
|
resolved_types: dict[str, str] = {}
|
||||||
@@ -205,19 +212,48 @@ def _path_rule_engine(
|
|||||||
final_category = keyword_info["category"]
|
final_category = keyword_info["category"]
|
||||||
final_base_confidence = keyword_info["confidence"]
|
final_base_confidence = keyword_info["confidence"]
|
||||||
|
|
||||||
|
# 规则引擎结果优先级: 匹配检测 > 辅助推断
|
||||||
|
# マッチング/項目チェック/キーブレイク/編集処理 是主类型,优先级高
|
||||||
|
# M:N/DIVIDE 是辅助推断,仅当主类型未命中时才采纳
|
||||||
|
_MAIN_TYPE_PRIORITY = {"マッチング", "項目チェック(重複含む)", "項目チェック(重複含まず)",
|
||||||
|
"キーブレイク", "編集処理(校验)", "二段階マッチング",
|
||||||
|
"単純マッチング", "混合マッチング", "CSV合并", "CSV拆分",
|
||||||
|
"純粋マッチング"}
|
||||||
|
|
||||||
# 如果规则引擎有更高置信度的结果, 则采纳
|
# 如果规则引擎有更高置信度的结果, 则采纳
|
||||||
# 使用第一轮缓存的结果(M1: 消除冗余重复调用)
|
# 使用第一轮缓存的结果(M1: 消除冗余重复调用)
|
||||||
best_resolved_type = None
|
best_resolved_type = None
|
||||||
best_resolved_conf = 0.0
|
best_resolved_conf = 0.0
|
||||||
|
best_is_main = False
|
||||||
for pair_name, rtype in resolved_types.items():
|
for pair_name, rtype in resolved_types.items():
|
||||||
cached_conf = resolved_confidences.get(pair_name, 0.0)
|
cached_conf = resolved_confidences.get(pair_name, 0.0)
|
||||||
if cached_conf > best_resolved_conf:
|
is_main = rtype in _MAIN_TYPE_PRIORITY
|
||||||
best_resolved_conf = cached_conf
|
|
||||||
best_resolved_type = rtype
|
|
||||||
|
|
||||||
if best_resolved_type and best_resolved_conf > final_base_confidence:
|
if best_resolved_type is None:
|
||||||
|
best_resolved_type = rtype
|
||||||
|
best_resolved_conf = cached_conf
|
||||||
|
best_is_main = is_main
|
||||||
|
elif is_main and not best_is_main:
|
||||||
|
# 主类型覆盖非主类型(即使置信度略低)
|
||||||
|
best_resolved_type = rtype
|
||||||
|
best_resolved_conf = cached_conf
|
||||||
|
best_is_main = True
|
||||||
|
elif cached_conf > best_resolved_conf:
|
||||||
|
best_resolved_type = rtype
|
||||||
|
best_resolved_conf = cached_conf
|
||||||
|
best_is_main = is_main
|
||||||
|
|
||||||
|
if best_resolved_type:
|
||||||
|
final_is_main = final_category in _MAIN_TYPE_PRIORITY
|
||||||
|
if best_resolved_conf > final_base_confidence:
|
||||||
|
# 置信度更高 → 替换
|
||||||
final_category = best_resolved_type
|
final_category = best_resolved_type
|
||||||
final_base_confidence = best_resolved_conf
|
final_base_confidence = best_resolved_conf
|
||||||
|
elif best_is_main and not final_is_main and final_base_confidence < 0.40:
|
||||||
|
# 主类型替代低确信度的非主类型(如 M:N→マッチング)
|
||||||
|
# 但如果 keyword 已确定具体分类(如编码转换 0.85),不覆盖
|
||||||
|
final_category = best_resolved_type
|
||||||
|
final_base_confidence = max(final_base_confidence, best_resolved_conf)
|
||||||
|
|
||||||
# 5. 计算 4 因子确信度
|
# 5. 计算 4 因子确信度
|
||||||
keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
|
keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
|
||||||
|
|||||||
@@ -42,11 +42,14 @@ def resolve_matching_vs_keybreak(features: dict) -> dict:
|
|||||||
evidence.append(f"WS-PREV-KEY 存在 + 累加器存在 + IF 分支 → キーブレイク")
|
evidence.append(f"WS-PREV-KEY 存在 + 累加器存在 + IF 分支 → キーブレイク")
|
||||||
return {"resolved_type": "キーブレイク", "confidence": 0.85, "evidence": evidence}
|
return {"resolved_type": "キーブレイク", "confidence": 0.85, "evidence": evidence}
|
||||||
|
|
||||||
# 补充规则: SELECT 文件数 >= 2 且 comparison 至少 1 → 倾向マッチング
|
# 补充规则: SELECT 文件数 >= 2 且 comparison/equality 至少 1 → 倾向マッチング
|
||||||
# 要求必须有实际的 KEY 变量比较(防止计数器比较误判)
|
# 要求必须有实际的 KEY 变量比较(防止计数器比较误判)
|
||||||
|
# 或结构性匹配检测信号(变量名不含 KEY 但结构是匹配)
|
||||||
has_key_compare = variable_patterns.get("has_prev_key", False) or features.get("has_key_var", False)
|
has_key_compare = variable_patterns.get("has_prev_key", False) or features.get("has_key_var", False)
|
||||||
if file_count >= 2 and comparison_ifs >= 1 and has_key_compare:
|
has_struct_match = features.get("has_structural_match", False) or features.get("has_prev_key", False)
|
||||||
evidence.append(f"SELECT 文件数 >=2 + comparison IF >=1 + KEY 变量 → マッチング")
|
effective_ifs = comparison_ifs + equality_ifs
|
||||||
|
if file_count >= 2 and effective_ifs >= 1 and (has_key_compare or has_struct_match):
|
||||||
|
evidence.append(f"SELECT 文件数 >=2 + IF >=1 + KEY/结构证据 → マッチング")
|
||||||
return {"resolved_type": "マッチング", "confidence": 0.75, "evidence": evidence}
|
return {"resolved_type": "マッチング", "confidence": 0.75, "evidence": evidence}
|
||||||
|
|
||||||
# 回退: 无法明确判定
|
# 回退: 无法明确判定
|
||||||
|
|||||||
@@ -4,8 +4,10 @@ COBOL 迁移专家设计的攻击面:
|
|||||||
- FP: 非匹配程序被误判为マッチング
|
- FP: 非匹配程序被误判为マッチング
|
||||||
- FN: 真实匹配程序未被识别
|
- FN: 真实匹配程序未被识别
|
||||||
- 边界: 注释关键词、旧式命名、多文件非匹配
|
- 边界: 注释关键词、旧式命名、多文件非匹配
|
||||||
|
- FN: 变量名不含 KEY 但结构是匹配程序
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -16,25 +18,23 @@ from hina.classifier import detect_keyword
|
|||||||
FIXTURES = Path(__file__).parents[3] / "test-data" / "cobol" / "adversarial"
|
FIXTURES = Path(__file__).parents[3] / "test-data" / "cobol" / "adversarial"
|
||||||
|
|
||||||
# (filename, expect_matching, reason)
|
# (filename, expect_matching, reason)
|
||||||
# expect_matching=True → must be マッチング/二段階
|
|
||||||
# expect_matching=False → must NOT be マッチング/二段階
|
|
||||||
ADVERSARIAL_TESTS = [
|
ADVERSARIAL_TESTS = [
|
||||||
("ADV-FALSE-KEY.cbl", False,
|
("ADV-FALSE-KEY.cbl", False,
|
||||||
"FP: WS-KEY 变量但只是简单 ADD 程序,不应触发匹配"),
|
"FP: WS-KEY variable but only simple ADD, should NOT trigger matching"),
|
||||||
("ADV-KEY-IN-COMMENT.cbl", False,
|
("ADV-KEY-IN-COMMENT.cbl", False,
|
||||||
"FP: KEY 只在 *> 注释中,不应触发匹配"),
|
"FP: KEY only in *> comments, should NOT trigger matching"),
|
||||||
("ADV-PREVKEY-FAKE.cbl", False,
|
("ADV-PREVKEY-FAKE.cbl", False,
|
||||||
"FP: WS-PREV-KEY 但无匹配逻辑,不应触发匹配"),
|
"FP: WS-PREV-KEY without matching logic, should NOT trigger"),
|
||||||
("ADV-OLD-SCHOOL.cbl", True,
|
("ADV-OLD-SCHOOL.cbl", True,
|
||||||
"FN: K01-KEY 旧式命名,应识别为匹配"),
|
"FN: K01-KEY old-school naming, should detect matching"),
|
||||||
("ADV-TINY-MATCH.cbl", True,
|
("ADV-TINY-MATCH.cbl", True,
|
||||||
"FN: 极简匹配程序(1 文件),应识别"),
|
"FN: Minimal matching (1 file), should detect"),
|
||||||
("ADV-CALL-MATCH.cbl", False,
|
("ADV-CALL-MATCH.cbl", False,
|
||||||
"FP: CALL+WS-MAST-KEY,子程序调用应优先"),
|
"FP: CALL+WS-MAST-KEY, subprogram call should win"),
|
||||||
("ADV-ASCII-KEY.cbl", False,
|
("ADV-ASCII-KEY.cbl", False,
|
||||||
"FP: ASCII+WS-KEY,编码转换应优先"),
|
"FP: ASCII+WS-KEY, encoding conversion should win"),
|
||||||
("ADV-10FILES.cbl", False,
|
("ADV-10FILES.cbl", False,
|
||||||
"FP: 10 文件无 KEY 比较,不应触发匹配"),
|
"FP: 10 files no KEY comparison, should NOT trigger matching"),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@@ -44,21 +44,18 @@ ADVERSARIAL_TESTS = [
|
|||||||
ids=[t[0].replace('.cbl','') for t in ADVERSARIAL_TESTS],
|
ids=[t[0].replace('.cbl','') for t in ADVERSARIAL_TESTS],
|
||||||
)
|
)
|
||||||
def test_adversarial(filename, expect_matching, reason):
|
def test_adversarial(filename, expect_matching, reason):
|
||||||
"""对抗性测试:验证明假阳性/假阴性"""
|
"""Adversarial test: false positive / false negative check"""
|
||||||
path = FIXTURES / filename
|
path = FIXTURES / filename
|
||||||
assert path.exists(), f"Missing: {path}"
|
assert path.exists(), f"Missing: {path}"
|
||||||
src = path.read_text("utf-8")
|
src = path.read_text("utf-8")
|
||||||
|
|
||||||
# 1. extract_structure must not crash
|
|
||||||
struct = extract_structure(src)
|
struct = extract_structure(src)
|
||||||
assert struct is not None
|
assert struct is not None
|
||||||
|
|
||||||
# 2. classify_program must not crash
|
|
||||||
result = classify_program(src)
|
result = classify_program(src)
|
||||||
assert result is not None
|
assert result is not None
|
||||||
assert result["confidence"] >= 0
|
assert result["confidence"] >= 0
|
||||||
|
|
||||||
# 3. False positive/negative check
|
|
||||||
is_matching = "マッチング" in result["category"] or "二段階" in result["category"]
|
is_matching = "マッチング" in result["category"] or "二段階" in result["category"]
|
||||||
if expect_matching:
|
if expect_matching:
|
||||||
assert is_matching, (
|
assert is_matching, (
|
||||||
@@ -71,10 +68,74 @@ def test_adversarial(filename, expect_matching, reason):
|
|||||||
f"(conf={result['confidence']:.2f}). Reason: {reason}"
|
f"(conf={result['confidence']:.2f}). Reason: {reason}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# 4. Keyword detection sanity
|
|
||||||
kw = detect_keyword(src)
|
kw = detect_keyword(src)
|
||||||
if expect_matching:
|
if expect_matching:
|
||||||
# Matching programs should have at least 1 keyword match
|
|
||||||
assert len(kw) >= 1 or result["method"] != "rule_engine_fallback", (
|
assert len(kw) >= 1 or result["method"] != "rule_engine_fallback", (
|
||||||
f"{filename}: matching program with 0 keyword matches"
|
f"{filename}: matching program with 0 keyword matches"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_structural_matching_no_keyword():
    """False-negative guard: a matching program whose fields lack ``KEY``.

    The fixture compares CUST-CODE with ORDR-CODE, so no variable name carries
    a ``-KEY`` hint.  The structural detector must still report a hit, and the
    final classification must be a matching category.
    """
    # Two-file merge loop; laid out in conventional fixed-format columns.
    cobol_source = """       IDENTIFICATION DIVISION.
       PROGRAM-ID. REALMT.
       ENVIRONMENT DIVISION.
       INPUT-OUTPUT SECTION.
       FILE-CONTROL.
           SELECT CUST-FILE ASSIGN TO 'CUST.DAT'.
           SELECT ORDR-FILE ASSIGN TO 'ORDR.DAT'.
       DATA DIVISION.
       FILE SECTION.
       FD CUST-FILE.
       01 CUST-REC.
           05 CUST-CODE PIC X(10).
           05 CUST-NAME PIC X(30).
       FD ORDR-FILE.
       01 ORDR-REC.
           05 ORDR-CODE PIC X(10).
           05 ORDR-AMT PIC 9(7)V99.
       WORKING-STORAGE SECTION.
       01 WS-CUST-CODE PIC X(10).
       01 WS-ORDR-CODE PIC X(10).
       01 WS-EOF1 PIC X VALUE 'N'.
       01 WS-EOF2 PIC X VALUE 'N'.
       PROCEDURE DIVISION.
       MAIN.
           OPEN INPUT CUST-FILE ORDR-FILE.
           READ CUST-FILE INTO CUST-REC
               AT END MOVE 'Y' TO WS-EOF1.
           READ ORDR-FILE INTO ORDR-REC
               AT END MOVE 'Y' TO WS-EOF2.
           PERFORM UNTIL WS-EOF1 = 'Y' OR WS-EOF2 = 'Y'
               IF CUST-CODE = ORDR-CODE
                   DISPLAY 'MATCH'
               ELSE IF CUST-CODE < ORDR-CODE
                   READ CUST-FILE AT END MOVE 'Y' TO WS-EOF1
               ELSE
                   READ ORDR-FILE AT END MOVE 'Y' TO WS-EOF2
               END-IF
           END-PERFORM.
           CLOSE CUST-FILE ORDR-FILE.
           STOP RUN.
"""
    classification = classify_program(cobol_source)
    keyword_hits = detect_keyword(cobol_source)

    # The keyword layer must carry the structural-matching marker.
    structural_hits = [hit for hit in keyword_hits if "structural" in hit[2]]
    assert structural_hits, (
        f"Expected structural matching keyword, got {keyword_hits}"
    )

    # The final category must be one of the matching families.
    category = classification["category"]
    is_matching = "マッチング" in category or "二段階" in category
    assert is_matching, (
        f"Expected matching, got '{classification['category']}'"
    )

    # The overall confidence must clear a minimal floor.
    confidence = classification["confidence"]
    assert confidence > 0.30, (
        f"Confidence too low: {confidence:.2f}"
    )
|
||||||
|
|||||||
Reference in New Issue
Block a user