Files
cobol-java-v3/hina/pipeline/pipeline.py
NB-076 bb4a7a2346 fix: classification修复+grammar增强+75/75回归确认
分类修复:
- FILE-CONTROL关键词(0.99)错误覆盖匹配检测信号
- 添加匹配型规则引擎更优优先级,确保匹配检测结果优先
- has_matching_kw特征注入,使IF-less匹配程序也能识别

Grammar增强:
- LEVEL扩展到/[0-9]+/覆盖所有COBOL层级号
- HEX_STRING添加支持X'...'十六进制字面量
- VALUE子句逗号预处理剥离(88-level多值)
- COPY正则支持引号包覆的名称

结果: 内部75/75, 外部基准54/58(93%)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-22 13:18:07 +08:00

699 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
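Full program-type classification pipeline — classify_program()

Flow:
1. In parallel: detect_keyword() + extract_structure()
2. keyword confidence >= 90% -> output directly (unless a keyword/structure
   conflict is detected, in which case the rule engine path is used)
3. keyword 50-89% -> rule engine + confidence computation + contradiction resolution
4. keyword < 50% -> LLM-assisted classification + rule-engine validation
   (rule engine only when no LLM client is supplied)
5. Matching-subtype post-processing, then the final result dict is returned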
"""
from __future__ import annotations
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from hina.classifier import detect_keyword
from hina.confidence import compute_confidence_v2
from hina.rule_engine.confusion_groups import resolve_confusion_pair
from hina.rule_engine.contradiction import (
CONTRADICTION_PAIRS,
detect_contradictions,
resolve_contradiction,
)
from cobol_testgen import extract_structure
logger = logging.getLogger(__name__)
# Names of every confusion pair the pipeline attempts to resolve; each name is
# passed to resolve_confusion_pair() in the order listed here.
_PAIR_NAMES = [
    "matching_vs_keybreak",
    "dedup_vs_nodedup",
    "validation_vs_keybreak",
    "csv_merge_vs_split",
    "simple_vs_two_stage",
    "pure_vs_mixed",
    "division_50_25_100",
    "mn_output_mode",
]
# ── 内部工具 ──────────────────────────────────────────────────────────────────
def _get_best_keyword_match(matches: list) -> dict | None:
"""从 L1 关键字匹配结果中找出最佳匹配。
Args:
matches: detect_keyword() 返回的 list[tuple[str, float, str]]
Returns:
dict | None: {"category", "confidence", "keyword", "all_matches"}
"""
if not matches:
return None
best = max(matches, key=lambda m: m[1]) # (category, confidence, keyword)
return {
"category": best[0],
"confidence": best[1],
"keyword": best[2],
"all_matches": matches,
}
def _compute_structure_match_score(structure: dict) -> int:
"""计算结构匹配度评分 (0-5),供 compute_confidence_v2 使用。"""
return min(
5,
bool(structure.get("total_paragraphs", 0)) # 有段落
+ bool(structure.get("file_count", 0)) # 有文件
+ bool(len(structure.get("decision_points", []))) # 有决策点
+ bool(structure.get("if_types", {}).get("total", 0)) # 有 IF
+ bool(structure.get("branch_tree_obj") is not None), # 有分支树
)
def _build_structure_summary(structure: dict) -> dict:
"""从完整结构中提取调试摘要。"""
return {
"paragraph_count": structure.get("total_paragraphs", 0),
"file_count": structure.get("file_count", 0),
"decision_count": len(structure.get("decision_points", [])),
"has_call": structure.get("has_call", False),
"has_divide": structure.get("has_divide", False),
}
def _build_keyword_result_for_v2(keyword_info: dict | None) -> dict:
"""构建 compute_confidence_v2 所需的 keyword_result。"""
if keyword_info:
return {
"base_confidence": keyword_info["confidence"],
"match_count": len(keyword_info["all_matches"]),
"category": keyword_info.get("category"),
}
return {"base_confidence": 0.0, "match_count": 0, "category": None}
def _build_structure_features(structure: dict) -> dict:
    """Build the ``structure_features`` argument expected by compute_confidence_v2.

    Returns a dict carrying the 0-5 structure match score and the paragraph
    count (0 when the structure does not report one).
    """
    match_score = _compute_structure_match_score(structure)
    paragraph_total = structure.get("total_paragraphs", 0)
    return {
        "structure_match_score": match_score,
        "total_paragraphs": paragraph_total,
    }
# ── 分路径逻辑 ────────────────────────────────────────────────────────────────
def _path_keyword_direct(
    keyword_info: dict,
    structure: dict,
) -> dict:
    """Path A: keyword confidence >= 90%, output the keyword category directly.

    The v2 confidence is still computed (with no contradictions) and supplies
    the reported confidence / review flags, but the category is the keyword
    category and the result is tagged ``method="keyword"``, ``source="l1"``.

    Args:
        keyword_info: Best keyword match; must contain ``"category"``,
            ``"confidence"`` and ``"all_matches"``.
        structure: Structure dict produced by ``extract_structure``.

    Returns:
        The classification result dict for this path.
    """
    scored = compute_confidence_v2(
        keyword_result=_build_keyword_result_for_v2(keyword_info),
        structure_features=_build_structure_features(structure),
        contradictions=[],
        resolution={"resolved_count": 0, "total_count": 0},
    )

    outcome = {
        "category": keyword_info["category"],
        "confidence": scored["confidence"],
        "needs_review": scored["needs_review"],
        "method": "keyword",
        "source": "l1",
        "judgment": scored["judgment"],
        "matches": keyword_info["all_matches"],
        "contradictions": [],
        "v2_confidence": scored,
    }
    outcome["structure"] = _build_structure_summary(structure)
    return outcome
# Confusion pairs whose resolvers decide between matching-related types. A main
# type produced by one of these always replaces the current candidate.
_MATCHING_RELATED_PAIRS = ("matching_vs_keybreak", "simple_vs_two_stage", "pure_vs_mixed")

# Primary program types. Anything else resolved by the rule engine is treated
# as an auxiliary inference that is adopted only when no primary type was hit.
_RULE_ENGINE_MAIN_TYPES = frozenset({
    "マッチング", "項目チェック(重複含む)", "項目チェック(重複含まず)",
    "キーブレイク", "編集処理(校验)", "二段階マッチング",
    "単純マッチング", "混合マッチング", "CSV合并", "CSV拆分",
    "純粋マッチング",
})


def _inject_source_features(features: dict) -> None:
    """Add regex-derived boolean signals to ``features`` in place.

    Nothing is added when ``features["source_upper"]`` is missing or empty, so
    the regex module and the source text are never touched without a source.
    """
    import re

    su = features.get("source_upper")
    if not su:
        return

    # has_key_var: the source contains a real KEY comparison (keeps the
    # matching_vs_keybreak rule from being triggered by counter comparisons).
    features["has_key_var"] = bool(re.search(
        r'(?:WS-[\w-]*KEY[A-Z0-9-]*|WS[A-Z0-9]*KEY[A-Z0-9]*)\s*[=<>]|'  # WS-KEY / WSKEY1
        r'\b[A-Z]\d{0,2}-[\w-]*KEY\s*[=<>]',  # K01-KEY =
        su,
    ))
    # has_structural_match: an IF comparing two hyphenated names that is
    # followed somewhere by PERFORM / END-PERFORM / READ (does not rely on
    # variables being named KEY).
    features["has_structural_match"] = bool(re.search(
        r'IF\s+\w+-\w+\s*[=<>]\s*\w+-\w+.*'
        r'(?:PERFORM|END-PERFORM|READ)',
        su, re.DOTALL,
    ))
    # has_cross_file_cmp: IF comparing two identifiers of any naming, e.g.
    # "IF K1 = K2". The right-hand side must start with a letter, which
    # excludes numeric and quoted literals such as "IF WS-COUNT > 0".
    features["has_cross_file_cmp"] = bool(re.search(
        r'IF\s+\w[\w-]*\s*[=<>]\s+[A-Za-z][\w-]*',
        su,
    ))
    # CSV signals: comma literal used in STRING ... INTO / INSPECT ... REPLACING.
    features["has_csv_merge"] = bool(re.search(
        r"STRING[\s\S]*?','[\s\S]*?INTO",
        su,
    ))
    features["has_csv_split"] = bool(re.search(
        r"INSPECT[\s\S]*?REPLACING[\s\S]*?','",
        su,
    ))
    # has_matching_kw: any identifier containing KEY is compared.
    features["has_matching_kw"] = bool(re.search(
        r'[\w-]*KEY[\w-]*\s*[=<>]', su
    ))


def _select_best_resolved(resolved_types: dict, resolved_confidences: dict) -> tuple:
    """Pick the rule-engine candidate as ``(type, confidence, is_main)``.

    Candidates are visited in ``resolved_types`` order. A candidate replaces
    the current best when any of these holds:
      * there is no best yet;
      * it is a main type from a matching-related pair;
      * it is a main type and the current best is not;
      * it has higher confidence and its pair priority is not lower.
    Returns ``(None, 0.0, False)`` when ``resolved_types`` is empty.
    """
    best_type = None
    best_conf = 0.0
    best_is_main = False
    best_priority = 0
    for pair_name, rtype in resolved_types.items():
        is_matching_pair = pair_name in _MATCHING_RELATED_PAIRS
        pair_priority = 2 if is_matching_pair else 1
        conf = resolved_confidences.get(pair_name, 0.0)
        is_main = rtype in _RULE_ENGINE_MAIN_TYPES
        replace = (
            best_type is None
            or (is_matching_pair and is_main)
            or (is_main and not best_is_main)
            or (conf > best_conf and pair_priority >= best_priority)
        )
        if replace:
            # Every replacement refreshes the full candidate state, including
            # the priority, so later ">= best_priority" checks never compare
            # against a stale value.
            best_type = rtype
            best_conf = conf
            best_is_main = is_main
            best_priority = pair_priority
    return best_type, best_conf, best_is_main


def _path_rule_engine(
    keyword_info: dict | None,
    structure: dict,
) -> dict:
    """Path B: rule engine + confidence computation + contradiction resolution.

    Used for keyword confidence 50-89%, for high-confidence keywords that
    conflict with the structure, and as the fallback when no LLM is available.

    Steps:
      1. Copy ``structure`` into a features dict and inject source-derived
         signals (only when ``source_upper`` is present).
      2. Run every confusion-pair resolver and keep the resolved types and
         their confidences.
      3. Detect contradictions and try to resolve each one.
      4. Decide the final category / base confidence from the keyword result
         and the best rule-engine candidate.
      5. Compute the v2 confidence and assemble the result.

    Args:
        keyword_info: Best keyword match, or ``None`` when nothing matched.
        structure: Structure dict produced by ``extract_structure``.

    Returns:
        The classification result dict (``method="rule_engine"``).
    """
    # 1. Structure features are used directly, plus source-derived signals.
    features = dict(structure)
    _inject_source_features(features)

    # 2. Run all confusion-pair resolvers (best effort: a failing resolver is
    #    logged and skipped).
    resolved_types: dict[str, str] = {}
    resolved_confidences: dict[str, float] = {}
    for pair_name in _PAIR_NAMES:
        try:
            result = resolve_confusion_pair(features, pair_name)
            if result["resolved_type"] != "unknown" and result["confidence"] > 0:
                resolved_types[pair_name] = result["resolved_type"]
                resolved_confidences[pair_name] = result["confidence"]
        except Exception as e:
            logger.debug("[pipeline] 混淆对 %s 解析异常: %s", pair_name, e)
    features["resolved_types"] = resolved_types

    # 3. Contradiction detection and resolution.
    contradictions = detect_contradictions(features)
    resolution_map: dict[str, Any] = {
        "resolved_count": 0,
        "total_count": len(contradictions),
    }
    for c in contradictions:
        try:
            winner = resolve_contradiction(features, c)
            if winner:
                resolution_map[c.get("name", "unknown")] = winner
                resolution_map["resolved_count"] += 1
        except Exception as e:
            logger.debug("[pipeline] 矛盾解决异常: %s", e)

    # 4. Final category and base confidence: start from the keyword verdict.
    final_category = "unknown"
    final_base_confidence = 0.0
    if keyword_info:
        final_category = keyword_info["category"]
        final_base_confidence = keyword_info["confidence"]

    best_type, best_conf, best_is_main = _select_best_resolved(
        resolved_types, resolved_confidences
    )
    if best_type:
        final_is_main = final_category in _RULE_ENGINE_MAIN_TYPES
        if best_conf > final_base_confidence:
            # Higher confidence -> the rule-engine result replaces the keyword.
            final_category = best_type
            final_base_confidence = best_conf
        elif best_is_main and not final_is_main:
            # A rule-engine main type overrides a non-main keyword category;
            # the keyword confidence is halved unless the rule engine's is higher.
            final_category = best_type
            final_base_confidence = max(final_base_confidence * 0.5, best_conf)

    # 5. v2 confidence, computed on the final base confidence.
    keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
    keyword_result_v2["base_confidence"] = final_base_confidence
    structure_features = _build_structure_features(structure)
    # Consensus: passed only when the keyword category equals the final one.
    kw_cat = keyword_info["category"] if keyword_info else None
    consensus_cat = kw_cat if (kw_cat and kw_cat == final_category) else None
    v2_confidence = compute_confidence_v2(
        keyword_result=keyword_result_v2,
        structure_features=structure_features,
        contradictions=contradictions,
        resolution=resolution_map,
        consensus_category=consensus_cat,
    )

    # 6. Assemble the result.
    return {
        "category": final_category,
        "confidence": v2_confidence["confidence"],
        "needs_review": v2_confidence["needs_review"],
        "method": "rule_engine",
        "source": "pipeline",
        "judgment": v2_confidence["judgment"],
        "matches": keyword_info["all_matches"] if keyword_info else [],
        "contradictions": contradictions,
        "contradiction_resolution": resolution_map,
        "resolved_types": resolved_types,
        "v2_confidence": v2_confidence,
        "structure": _build_structure_summary(structure),
    }
def _path_llm_assisted(
    keyword_info: dict | None,
    structure: dict,
    llm: Any,
) -> dict:
    """Path C: keyword confidence < 50%, LLM classification validated by rules.

    Steps:
      1. Ask ``classify_with_llm`` for a category and confidence.
      2. Run every confusion-pair resolver once: a resolved type with higher
         confidence than the current verdict replaces it, and every resolved
         type is recorded for contradiction detection.
      3. Detect contradictions and try to resolve each one.
      4. Compute the v2 confidence on the validated confidence.

    Args:
        keyword_info: Best keyword match, or ``None`` when nothing matched.
        structure: Structure dict produced by ``extract_structure``.
        llm: LLM client handed to ``classify_with_llm``.

    Returns:
        The classification result dict (``method="llm"``), including the raw
        LLM answer under ``"llm_raw"``.
    """
    from hina.hina_agent import classify_with_llm

    # 1. LLM classification.
    llm_result = classify_with_llm(structure, llm)
    llm_category = llm_result.get("category", "unknown")
    llm_confidence = llm_result.get("confidence", 0.5)

    # 2. Validate the LLM verdict with the rule engine. Each resolver is called
    #    once; its result feeds both the validation and the resolved_types map.
    features = dict(structure)
    validated_category = llm_category
    validated_confidence = llm_confidence
    resolved_types: dict[str, str] = {}
    for pair_name in _PAIR_NAMES:
        try:
            pair_result = resolve_confusion_pair(features, pair_name)
            resolved_type = pair_result["resolved_type"]
            if resolved_type == "unknown":
                continue
            resolved_types[pair_name] = resolved_type
            if pair_result["confidence"] > validated_confidence:
                validated_category = resolved_type
                validated_confidence = pair_result["confidence"]
        except Exception as e:
            # Best effort: a failing resolver must not abort the classification.
            logger.debug("[pipeline] Path C 混淆对 %s 解析异常: %s", pair_name, e)
    features["resolved_types"] = resolved_types

    # 3. Contradiction detection and resolution.
    contradictions = detect_contradictions(features)
    resolution_map: dict[str, Any] = {
        "resolved_count": 0,
        "total_count": len(contradictions),
    }
    for c in contradictions:
        try:
            winner = resolve_contradiction(features, c)
            if winner:
                resolution_map[c.get("name", "unknown")] = winner
                resolution_map["resolved_count"] += 1
        except Exception as e:
            logger.debug("[pipeline] Path C 矛盾解决异常: %s", e)

    # 4. v2 confidence, computed on the validated confidence.
    keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
    keyword_result_v2["base_confidence"] = validated_confidence
    structure_features = _build_structure_features(structure)
    v2_confidence = compute_confidence_v2(
        keyword_result=keyword_result_v2,
        structure_features=structure_features,
        contradictions=contradictions,
        resolution=resolution_map,
    )

    return {
        "category": validated_category,
        "confidence": v2_confidence["confidence"],
        "needs_review": v2_confidence["needs_review"],
        "method": "llm",
        "source": "pipeline",
        "judgment": v2_confidence["judgment"],
        "matches": keyword_info["all_matches"] if keyword_info else [],
        "contradictions": contradictions,
        "llm_raw": llm_result,
        "v2_confidence": v2_confidence,
        "structure": _build_structure_summary(structure),
    }
_MATCHING_SUBTYPE_AGENT_PROMPT = """你是一个 COBOL 迁移专家。请分析以下程序的键匹配模式,判断其匹配子类型。
结构特征:
- 文件数: {file_count}
- 决策点: {decision_count}
- IF 语句: {if_count}
- 总分支: {total_branches}
- 变量模式: {variable_patterns}
源码中的关键变量:
{key_vars}
可选的匹配子类型(单选):
1. "1:1" — 1 个主文件对 1 个事务文件,一一对应
2. "1:N" — 1 个主文件对 N 个事务文件
3. "N:1" — N 个业务记录聚合成 1 个输出
4. "M:N→M" — M:N 组合后按主键输出(输出 M 条)
5. "M:N→N" — M:N 组合后按事务键输出(输出 N 条)
请输出 JSON,不要添加其他文字:
"""
def _llm_subtype_inference(structure: dict, cobol_source: str, llm: Any) -> str | None:
    """Ask the LLM to infer the matching subtype.

    Args:
        structure: Structure dict; ``file_count``, ``decision_points``,
            ``total_branches`` and ``variable_patterns`` are put in the prompt.
        cobol_source: COBOL source text, scanned for ``WS-...KEY...`` names.
        llm: Client exposing ``call(messages)``.

    Returns:
        One of ``"1:1"``, ``"1:N"``, ``"N:1"``, ``"M:N→M"``, ``"M:N→N"`` when
        the parsed answer names a valid subtype with confidence >= 0.4;
        otherwise ``None`` (including when the call or parsing fails).
    """
    import re
    from hina.hina_agent import _parse_llm_response

    src_upper = cobol_source.upper()
    key_vars = sorted(set(re.findall(r'WS-[\w-]*KEY[A-Z0-9-]*', src_upper)))
    decision_points = structure.get("decision_points", [])
    if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
    prompt = _MATCHING_SUBTYPE_AGENT_PROMPT.format(
        file_count=structure.get("file_count", 0),
        decision_count=len(decision_points),
        if_count=if_count,
        total_branches=structure.get("total_branches", 0),
        variable_patterns=str(structure.get("variable_patterns", {})),
        key_vars=", ".join(key_vars) if key_vars else "(无 KEY 变量)",
    )
    messages = [
        {"role": "system", "content": "你是一个 COBOL 匹配程序专家。只输出 JSON。"},
        {"role": "user", "content": prompt},
    ]
    valid = {"1:1", "1:N", "N:1", "M:N→M", "M:N→N"}
    try:
        raw = llm.call(messages)
        parsed = _parse_llm_response(raw)
        subtype = parsed.get("subtype", "")
        if isinstance(subtype, str):
            # Tolerate stray whitespace around an otherwise valid answer.
            subtype = subtype.strip()
        # JSON produced by an LLM may carry the number as a string ("0.8");
        # coerce it so a valid answer is not dropped by a str/float comparison.
        confidence = float(parsed.get("confidence", 0.0))
        if subtype in valid and confidence >= 0.4:
            logger.info("[pipeline] LLM 子类型推理: %s (conf=%.2f, reason=%s)",
                        subtype, confidence, parsed.get("reason", ""))
            return subtype
    except Exception as e:
        # Best effort: any failure means "no LLM opinion", never a crash.
        logger.debug("[pipeline] LLM 子类型推理失败: %s", e)
    return None
# ── Main entry point (classify_program is defined after the subtype helpers below) ──
# ── Matching subtype resolution ──────────────────────────────────────────────
# Placeholder table of static subtype rules; it is currently empty and is not
# read by any code visible in this file.
_MATCHING_SUBTYPE_RULES = [
    # (match_fn, subtype)
    # ordered from highest to lowest priority
]
def _resolve_matching_subtype(
result: dict,
cobol_source: str,
structure: dict,
llm: Any = None,
) -> dict:
"""匹配程序的子类型区分后处理。
使用分层策略:
1. 静态规则处理确定性高的(M:N→MxN、1:N、混合、二段階)
2. LLM agent 推理模棱两可的(N:1 vs 1:1、M:N→M vs M:N→N
3. 无 LLM 时回退保守默认值
Args:
result: classify_program 的返回结果。
cobol_source: 原始 COBOL 源码。
structure: extract_structure 的返回结构。
llm: 可选的 LLM 客户端实例。
Returns:
更新后的 result,增加 "subtype" 字段。
"""
category = result.get("category", "")
if "マッチング" not in category and "キーブレイク" not in category and "項目チェック" not in category:
return result # 非匹配/校验程序不做子类型区分
src_upper = cobol_source.upper()
import re
# 0. 二段階マッチング — 已在规则引擎中处理
if "二段階" in category:
result["subtype"] = "二段階"
return result
# 1. M:N→MxN 直積 — 特征: WRITE + WS-SAVE-KEY + 3 文件
if structure.get("file_count", 0) >= 3 and 'WS-SAVE' in src_upper:
result["subtype"] = "M:N→MxN"
return result
# 2. 混合匹配 (WS-PREV-KEY 存在) — 也覆盖 項目チェック 分类
if 'WS-PREV-KEY' in src_upper:
result["subtype"] = "混合"
return result
# 3. WS-ALT-KEY → 混合(异键)
if 'WS-ALT-KEY' in src_upper or 'ALTERNATE' in src_upper.upper():
result["subtype"] = "混合(异键)"
return result
# 4. 检查键变量命名模式
key_vars = set(re.findall(r'WS-[\w-]*KEY[A-Z0-9-]*', src_upper))
# 不对称键名 → 1:N 或 N:1 (WS-MAST-KEY + WS-TRAN-KEY)
has_master = any('MAST' in k for k in key_vars)
has_tran = any('TRAN' in k for k in key_vars)
if has_master and has_tran:
result["subtype"] = "1:N"
return result
# 5. 命名模式启发式: WS-KEY-M/WS-KEY-T → Master/Transaction → N:1
# WS-KEY-A/WS-KEY-B → 对称命名 → 1:1
# WS-KEY-M/WS-KEY-N → M:N 多文件
key_suffixes = [k.split('-')[-1] if '-' in k else '' for k in key_vars]
if 'M' in key_suffixes and 'T' in key_suffixes:
# WS-KEY-M + WS-KEY-T → Master/Transaction → N:1
result["subtype"] = "N:1"
return result
if 'M' in key_suffixes and 'N' in key_suffixes:
# WS-KEY-M + WS-KEY-N → M:N 多文件(无法区分 M:N→M 还是 M:N→N)
result["subtype"] = "M:N"
return result
# ── 第 2 层: LLM 辅助 ──
# 多个键变量 + 多文件 → 可能是 M:N→M 或 M:N→N,需要 LLM 分辨
needs_llm = (
len(key_vars) >= 3 or
(len(key_vars) >= 2 and structure.get("file_count", 0) >= 2
and not has_master)
)
if needs_llm and llm is not None:
llm_subtype = _llm_subtype_inference(structure, cobol_source, llm)
if llm_subtype:
result["subtype"] = llm_subtype
return result
# ── 第 3 层: 回退 ──
# 多个键变量 → M:N(保守)
if len(key_vars) >= 3 and structure.get("file_count", 0) >= 2:
result["subtype"] = "M:N"
return result
# 对称键名 → 默认为 1:1
result["subtype"] = "1:1"
return result
def classify_program(cobol_source: str, llm: Any = None) -> dict:
    """Run the full program-type classification pipeline.

    Flow:
      1. Run detect_keyword() and extract_structure() in parallel.
      2. keyword confidence >= 90% -> Path A (direct output), unless a
         keyword/structure conflict is detected, which forces Path B.
      3. keyword confidence >= 50% -> Path B (rule engine + confidence
         computation + contradiction resolution).
      4. keyword confidence < 50% -> Path C (LLM-assisted) when ``llm`` is
         given, otherwise the rule engine is used as a fallback.
      5. Matching-subtype post-processing, then the result is returned.

    Args:
        cobol_source: COBOL program source text.
        llm: Optional LLM client instance, used on the < 50% path and for
            subtype inference.

    Returns:
        dict with at least:
            "category": str        program category name
            "confidence": float    overall confidence (0.0 ~ 1.0)
            "needs_review": bool   whether manual review is needed
            "method": str          "keyword" | "rule_engine" |
                                   "rule_engine_fallback" | "llm" | "none"
            "source": str          "l1" | "pipeline" | "error"
            "judgment": str        auto / review / manual / impossible
            "matches": list        L1 keyword match details
            "contradictions": list detected contradictions
            "v2_confidence": dict  confidence factor details
            "structure": dict      structure summary (for debugging)
        A "subtype" key is added for matching / key-break / item-check
        categories; individual paths add further keys.

        Empty or whitespace-only input does not raise: an "unknown" result
        with ``method="none"``, ``source="error"`` and
        ``judgment="impossible"`` is returned instead.
    """
    if not cobol_source or not cobol_source.strip():
        return {
            "category": "unknown",
            "confidence": 0.0,
            "needs_review": True,
            "method": "none",
            "source": "error",
            "judgment": "impossible",
            "matches": [],
            "contradictions": [],
            "v2_confidence": {},
            "structure": {},
        }

    # ── Step 1: keyword detection and structure extraction in parallel ──
    keyword_matches: list = []
    structure: dict = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_keyword = executor.submit(detect_keyword, cobol_source)
        future_structure = executor.submit(extract_structure, cobol_source)
        # Each task fails independently; a failure leaves its empty default.
        # "or" also normalizes a None return so later len()/.get() calls work.
        try:
            keyword_matches = future_keyword.result() or []
        except Exception as e:
            logger.warning("[pipeline] detect_keyword 失败: %s", e)
        try:
            structure = future_structure.result() or {}
        except Exception as e:
            logger.warning("[pipeline] extract_structure 失败: %s", e)

    # Expose the upper-cased source to the rule engine's context checks.
    if structure:
        structure["source_upper"] = cobol_source.upper()

    # ── Step 2: analyse the keyword result ──
    keyword_info = _get_best_keyword_match(keyword_matches)
    max_keyword_confidence = keyword_info["confidence"] if keyword_info else 0.0
    logger.info(
        "[pipeline] keyword matches=%d, max_confidence=%.2f, paragraphs=%d, files=%d",
        len(keyword_matches),
        max_keyword_confidence,
        structure.get("total_paragraphs", 0),
        structure.get("file_count", 0),
    )

    # ── Step 3: choose the path by confidence ──
    # Conflict check: the top keyword is >= 90% and not a matching category,
    # yet a matching keyword also fired and there are at least two files.
    needs_rule_engine = False
    if keyword_info and max_keyword_confidence >= 0.90 and len(keyword_matches) >= 2:
        fc = structure.get("file_count", 0)
        has_matching_kw = any("マッチング" in str(m[0]) for m in keyword_matches)
        top_cat = keyword_info.get("category", "")
        if has_matching_kw and fc >= 2 and top_cat not in ("マッチング", "二段階マッチング"):
            needs_rule_engine = True
            logger.info("[pipeline] 关键字/结构冲突: %s(%.2f) + 匹配关键词 -> 路径B", top_cat, max_keyword_confidence)

    if max_keyword_confidence >= 0.90 and not needs_rule_engine:
        # Path A: high keyword confidence without conflict -> direct output.
        logger.info("[pipeline] 路径 A: keyword 高确信度 (%.2f)", max_keyword_confidence)
        result = _path_keyword_direct(keyword_info, structure)
    elif max_keyword_confidence >= 0.50:
        # Path B: medium confidence (or a conflict) -> rule engine.
        logger.info("[pipeline] 路径 B: keyword 中确信度 (%.2f) -> 规则引擎", max_keyword_confidence)
        result = _path_rule_engine(keyword_info, structure)
    elif llm is not None:
        # Path C: low confidence -> LLM-assisted classification.
        logger.info("[pipeline] 路径 C: keyword 低确信度 (%.2f) -> LLM 辅助", max_keyword_confidence)
        result = _path_llm_assisted(keyword_info, structure, llm)
    else:
        # No LLM available: fall back to the rule engine.
        logger.info("[pipeline] 路径 C(fallback): keyword 低确信度 (%.2f) -> 规则引擎兜底", max_keyword_confidence)
        result = _path_rule_engine(keyword_info, structure)
        result["method"] = "rule_engine_fallback"

    # ── Step 4: matching-subtype post-processing ──
    result = _resolve_matching_subtype(result, cobol_source, structure, llm=llm)
    return result