cdba324b5a
对抗性全类型测试发现的缺陷和修复: 缺陷1: SORT/MERGE L1 关键词太严格(漏检) - 旧: 'SORT ON KEY' / 'MERGE ON KEY'(精确字符串) - COBOL 中的真实写法: SORT WORK-FILE ON ASCENDING KEY ... - 新: 正则 SORT(?:\s+\S+)?\s+ON\s+(?:ASCENDING|DESCENDING)?KEY 缺陷2: CSV 假阳性(STRING/INSPECT 非CSV也触发) - 旧: has_string=True -> CSV合并 - 新: 要求 has_csv_merge(STRING+逗号分隔) - 单纯字符串拼接不再触发 CSV 分类 缺陷3: ALTERNATE RECORD KEY 被 ORGANIZATION IS 覆盖 - 旧: 文件编成先于替代索引(同确信度先者胜) - 新: 替代索引放前面(更具体的分类优先) 回归: 767 passed(0 new failures)
671 lines
25 KiB
Python
671 lines
25 KiB
Python
"""
|
||
完整程序类型判定管道 — classify_program()
|
||
|
||
流程:
|
||
1. 并行: detect_keyword() + extract_structure()
|
||
2. keyword confidence >= 90% -> 直接输出
|
||
3. keyword 50-89% -> 规则引擎 + 确信度计算 + 矛盾回溯
|
||
4. keyword < 50% -> LLM 辅助 + 规则引擎验证
|
||
5. 输出最终 JSON
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from typing import Any
|
||
|
||
from hina.classifier import detect_keyword
|
||
from hina.confidence import compute_confidence_v2
|
||
from hina.rule_engine.confusion_groups import resolve_confusion_pair
|
||
from hina.rule_engine.contradiction import (
|
||
CONTRADICTION_PAIRS,
|
||
detect_contradictions,
|
||
resolve_contradiction,
|
||
)
|
||
from cobol_testgen import extract_structure
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 所有可尝试的混淆对名称
|
||
_PAIR_NAMES = [
|
||
"matching_vs_keybreak",
|
||
"dedup_vs_nodedup",
|
||
"validation_vs_keybreak",
|
||
"csv_merge_vs_split",
|
||
"simple_vs_two_stage",
|
||
"pure_vs_mixed",
|
||
"division_50_25_100",
|
||
"mn_output_mode",
|
||
]
|
||
|
||
|
||
# ── 内部工具 ──────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _get_best_keyword_match(matches: list) -> dict | None:
|
||
"""从 L1 关键字匹配结果中找出最佳匹配。
|
||
|
||
Args:
|
||
matches: detect_keyword() 返回的 list[tuple[str, float, str]]
|
||
|
||
Returns:
|
||
dict | None: {"category", "confidence", "keyword", "all_matches"}
|
||
"""
|
||
if not matches:
|
||
return None
|
||
best = max(matches, key=lambda m: m[1]) # (category, confidence, keyword)
|
||
return {
|
||
"category": best[0],
|
||
"confidence": best[1],
|
||
"keyword": best[2],
|
||
"all_matches": matches,
|
||
}
|
||
|
||
|
||
def _compute_structure_match_score(structure: dict) -> int:
|
||
"""计算结构匹配度评分 (0-5),供 compute_confidence_v2 使用。"""
|
||
return min(
|
||
5,
|
||
bool(structure.get("total_paragraphs", 0)) # 有段落
|
||
+ bool(structure.get("file_count", 0)) # 有文件
|
||
+ bool(len(structure.get("decision_points", []))) # 有决策点
|
||
+ bool(structure.get("if_types", {}).get("total", 0)) # 有 IF
|
||
+ bool(structure.get("branch_tree_obj") is not None), # 有分支树
|
||
)
|
||
|
||
|
||
def _build_structure_summary(structure: dict) -> dict:
|
||
"""从完整结构中提取调试摘要。"""
|
||
return {
|
||
"paragraph_count": structure.get("total_paragraphs", 0),
|
||
"file_count": structure.get("file_count", 0),
|
||
"decision_count": len(structure.get("decision_points", [])),
|
||
"has_call": structure.get("has_call", False),
|
||
"has_divide": structure.get("has_divide", False),
|
||
}
|
||
|
||
|
||
def _build_keyword_result_for_v2(keyword_info: dict | None) -> dict:
|
||
"""构建 compute_confidence_v2 所需的 keyword_result。"""
|
||
if keyword_info:
|
||
return {
|
||
"base_confidence": keyword_info["confidence"],
|
||
"match_count": len(keyword_info["all_matches"]),
|
||
"category": keyword_info.get("category"),
|
||
}
|
||
return {"base_confidence": 0.0, "match_count": 0, "category": None}
|
||
|
||
|
||
def _build_structure_features(structure: dict) -> dict:
|
||
"""构建 compute_confidence_v2 所需的 structure_features。"""
|
||
return {
|
||
"structure_match_score": _compute_structure_match_score(structure),
|
||
"total_paragraphs": structure.get("total_paragraphs", 0),
|
||
}
|
||
|
||
|
||
# ── 分路径逻辑 ────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _path_keyword_direct(
|
||
keyword_info: dict,
|
||
structure: dict,
|
||
) -> dict:
|
||
"""路径 A: keyword confidence >= 90%, 直接输出。
|
||
|
||
仍会计算 v2 确信度用于最终 validation,但结果来源标记为 "keyword"。
|
||
"""
|
||
keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
|
||
structure_features = _build_structure_features(structure)
|
||
|
||
v2_conf = compute_confidence_v2(
|
||
keyword_result=keyword_result_v2,
|
||
structure_features=structure_features,
|
||
contradictions=[],
|
||
resolution={"resolved_count": 0, "total_count": 0},
|
||
)
|
||
|
||
return {
|
||
"category": keyword_info["category"],
|
||
"confidence": v2_conf["confidence"],
|
||
"needs_review": v2_conf["needs_review"],
|
||
"method": "keyword",
|
||
"source": "l1",
|
||
"judgment": v2_conf["judgment"],
|
||
"matches": keyword_info["all_matches"],
|
||
"contradictions": [],
|
||
"v2_confidence": v2_conf,
|
||
"structure": _build_structure_summary(structure),
|
||
}
|
||
|
||
|
||
def _path_rule_engine(
|
||
keyword_info: dict | None,
|
||
structure: dict,
|
||
) -> dict:
|
||
"""路径 B: keyword 50-89%, 规则引擎 + 确信度计算 + 矛盾回溯。
|
||
|
||
流程:
|
||
1. 用 structure 特征构建 features dict
|
||
2. 遍历所有混淆组解析器, 收集 resolved_types
|
||
3. 检测矛盾并解决
|
||
4. 确定最终分类
|
||
5. 计算 4 因子确信度
|
||
"""
|
||
# 1. 结构特征直接作为 features
|
||
features = dict(structure)
|
||
|
||
# 注入 has_key_var: 源码中是否存在实际的 KEY 比较
|
||
# (避免 matching_vs_keybreak 规则被计数器比较误触发)
|
||
if features.get("source_upper"):
|
||
import re
|
||
su = features["source_upper"]
|
||
features["has_key_var"] = bool(re.search(
|
||
r'(?:WS-[\w-]*KEY[A-Z0-9-]*|WS[A-Z0-9]*KEY[A-Z0-9]*)\s*[=<>]|' # WS-KEY / WSKEY1
|
||
r'\b[A-Z]\d{0,2}-[\w-]*KEY\s*[=<>]', # K01-KEY =
|
||
su
|
||
))
|
||
# 注入 has_structural_match: 结构性匹配检测的结果(不依赖变量名 KEY)
|
||
# 当 detect_keyword 通过结构识别出匹配时,让规则引擎也能利用这个信号
|
||
features["has_structural_match"] = bool(re.search(
|
||
r'IF\s+\w+-\w+\s*[=<>]\s*\w+-\w+.*' # 跨文件字段比较
|
||
r'(?:PERFORM|END-PERFORM|READ)', # 含循环/读取
|
||
su, re.DOTALL
|
||
))
|
||
# 注入 CSV 信号:逗号分隔的字符串拼接/替换
|
||
features["has_csv_merge"] = bool(re.search(
|
||
r"STRING[\s\S]*?','[\s\S]*?INTO", # STRING ... ',' ... INTO
|
||
su
|
||
))
|
||
features["has_csv_split"] = bool(re.search(
|
||
r"INSPECT[\s\S]*?REPLACING[\s\S]*?,',", # INSPECT ... REPLACING ... ','
|
||
su
|
||
))
|
||
|
||
# 2. 运行所有混淆组解析器
|
||
resolved_types: dict[str, str] = {}
|
||
resolved_confidences: dict[str, float] = {}
|
||
for pair_name in _PAIR_NAMES:
|
||
try:
|
||
result = resolve_confusion_pair(features, pair_name)
|
||
if result["resolved_type"] != "unknown" and result["confidence"] > 0:
|
||
resolved_types[pair_name] = result["resolved_type"]
|
||
resolved_confidences[pair_name] = result["confidence"]
|
||
except Exception as e:
|
||
logger.debug("[pipeline] 混淆对 %s 解析异常: %s", pair_name, e)
|
||
|
||
features["resolved_types"] = resolved_types
|
||
|
||
# 3. 矛盾检测与解决
|
||
contradictions = detect_contradictions(features)
|
||
resolution_map: dict[str, Any] = {
|
||
"resolved_count": 0,
|
||
"total_count": len(contradictions),
|
||
}
|
||
for c in contradictions:
|
||
try:
|
||
winner = resolve_contradiction(features, c)
|
||
if winner:
|
||
resolution_map[c.get("name", "unknown")] = winner
|
||
resolution_map["resolved_count"] += 1
|
||
except Exception as e:
|
||
logger.debug("[pipeline] 矛盾解决异常: %s", e)
|
||
|
||
# 4. 确定最终分类与基础置信度
|
||
final_category = "unknown"
|
||
final_base_confidence = 0.0
|
||
|
||
# 优先采纳 keyword 判定
|
||
if keyword_info:
|
||
final_category = keyword_info["category"]
|
||
final_base_confidence = keyword_info["confidence"]
|
||
|
||
# 规则引擎结果优先级: 匹配检测 > 辅助推断
|
||
# マッチング/項目チェック/キーブレイク/編集処理 是主类型,优先级高
|
||
# M:N/DIVIDE 是辅助推断,仅当主类型未命中时才采纳
|
||
_MAIN_TYPE_PRIORITY = {"マッチング", "項目チェック(重複含む)", "項目チェック(重複含まず)",
|
||
"キーブレイク", "編集処理(校验)", "二段階マッチング",
|
||
"単純マッチング", "混合マッチング", "CSV合并", "CSV拆分",
|
||
"純粋マッチング"}
|
||
|
||
# 如果规则引擎有更高置信度的结果, 则采纳
|
||
# 使用第一轮缓存的结果(M1: 消除冗余重复调用)
|
||
best_resolved_type = None
|
||
best_resolved_conf = 0.0
|
||
best_is_main = False
|
||
for pair_name, rtype in resolved_types.items():
|
||
cached_conf = resolved_confidences.get(pair_name, 0.0)
|
||
is_main = rtype in _MAIN_TYPE_PRIORITY
|
||
|
||
if best_resolved_type is None:
|
||
best_resolved_type = rtype
|
||
best_resolved_conf = cached_conf
|
||
best_is_main = is_main
|
||
elif is_main and not best_is_main:
|
||
# 主类型覆盖非主类型(即使置信度略低)
|
||
best_resolved_type = rtype
|
||
best_resolved_conf = cached_conf
|
||
best_is_main = True
|
||
elif cached_conf > best_resolved_conf:
|
||
best_resolved_type = rtype
|
||
best_resolved_conf = cached_conf
|
||
best_is_main = is_main
|
||
|
||
if best_resolved_type:
|
||
final_is_main = final_category in _MAIN_TYPE_PRIORITY
|
||
if best_resolved_conf > final_base_confidence:
|
||
# 置信度更高 → 替换
|
||
final_category = best_resolved_type
|
||
final_base_confidence = best_resolved_conf
|
||
elif best_is_main and not final_is_main and final_base_confidence < 0.40:
|
||
# 主类型替代低确信度的非主类型(如 M:N→マッチング)
|
||
# 但如果 keyword 已确定具体分类(如编码转换 0.85),不覆盖
|
||
final_category = best_resolved_type
|
||
final_base_confidence = max(final_base_confidence, best_resolved_conf)
|
||
|
||
# 5. 计算 4 因子确信度
|
||
keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
|
||
keyword_result_v2["base_confidence"] = final_base_confidence
|
||
|
||
structure_features = _build_structure_features(structure)
|
||
|
||
# 共识检测: L1 关键字分类与规则引擎最终分类一致时给予奖励
|
||
kw_cat = keyword_info["category"] if keyword_info else None
|
||
consensus_cat = kw_cat if (kw_cat and kw_cat == final_category) else None
|
||
|
||
v2_confidence = compute_confidence_v2(
|
||
keyword_result=keyword_result_v2,
|
||
structure_features=structure_features,
|
||
contradictions=contradictions,
|
||
resolution=resolution_map,
|
||
consensus_category=consensus_cat,
|
||
)
|
||
|
||
# 6. 组装结果
|
||
return {
|
||
"category": final_category,
|
||
"confidence": v2_confidence["confidence"],
|
||
"needs_review": v2_confidence["needs_review"],
|
||
"method": "rule_engine",
|
||
"source": "pipeline",
|
||
"judgment": v2_confidence["judgment"],
|
||
"matches": keyword_info["all_matches"] if keyword_info else [],
|
||
"contradictions": contradictions,
|
||
"contradiction_resolution": resolution_map,
|
||
"resolved_types": resolved_types,
|
||
"v2_confidence": v2_confidence,
|
||
"structure": _build_structure_summary(structure),
|
||
}
|
||
|
||
|
||
def _path_llm_assisted(
|
||
keyword_info: dict | None,
|
||
structure: dict,
|
||
llm: Any,
|
||
) -> dict:
|
||
"""路径 C: keyword < 50%, LLM 辅助 + 规则引擎验证。
|
||
|
||
流程:
|
||
1. 调用 classify_with_llm 获取 LLM 分类
|
||
2. 规则引擎验证 LLM 结果
|
||
3. 矛盾检测
|
||
4. 确信度计算
|
||
"""
|
||
from hina.hina_agent import classify_with_llm
|
||
|
||
# 1. LLM 分类
|
||
llm_result = classify_with_llm(structure, llm)
|
||
llm_category = llm_result.get("category", "unknown")
|
||
llm_confidence = llm_result.get("confidence", 0.5)
|
||
|
||
# 2. 规则引擎验证 LLM 分类
|
||
features = dict(structure)
|
||
validated_category = llm_category
|
||
validated_confidence = llm_confidence
|
||
|
||
for pair_name in _PAIR_NAMES:
|
||
try:
|
||
pair_result = resolve_confusion_pair(features, pair_name)
|
||
if (pair_result["resolved_type"] != "unknown"
|
||
and pair_result["confidence"] > validated_confidence):
|
||
validated_category = pair_result["resolved_type"]
|
||
validated_confidence = pair_result["confidence"]
|
||
except Exception:
|
||
continue
|
||
|
||
# 3. 矛盾检测与解决 (M2: 消除硬编码 resolved_count=0)
|
||
resolved_types: dict[str, str] = {}
|
||
for pair_name in _PAIR_NAMES:
|
||
try:
|
||
rr = resolve_confusion_pair(features, pair_name)
|
||
if rr["resolved_type"] != "unknown":
|
||
resolved_types[pair_name] = rr["resolved_type"]
|
||
except Exception:
|
||
continue
|
||
|
||
features["resolved_types"] = resolved_types
|
||
contradictions = detect_contradictions(features)
|
||
|
||
resolution_map: dict[str, Any] = {
|
||
"resolved_count": 0,
|
||
"total_count": len(contradictions),
|
||
}
|
||
for c in contradictions:
|
||
try:
|
||
winner = resolve_contradiction(features, c)
|
||
if winner:
|
||
resolution_map[c.get("name", "unknown")] = winner
|
||
resolution_map["resolved_count"] += 1
|
||
except Exception as e:
|
||
logger.debug("[pipeline] Path C 矛盾解决异常: %s", e)
|
||
|
||
# 4. 确信度计算
|
||
keyword_result_v2 = _build_keyword_result_for_v2(keyword_info)
|
||
keyword_result_v2["base_confidence"] = validated_confidence
|
||
|
||
structure_features = _build_structure_features(structure)
|
||
|
||
v2_confidence = compute_confidence_v2(
|
||
keyword_result=keyword_result_v2,
|
||
structure_features=structure_features,
|
||
contradictions=contradictions,
|
||
resolution=resolution_map,
|
||
)
|
||
|
||
return {
|
||
"category": validated_category,
|
||
"confidence": v2_confidence["confidence"],
|
||
"needs_review": v2_confidence["needs_review"],
|
||
"method": "llm",
|
||
"source": "pipeline",
|
||
"judgment": v2_confidence["judgment"],
|
||
"matches": keyword_info["all_matches"] if keyword_info else [],
|
||
"contradictions": contradictions,
|
||
"llm_raw": llm_result,
|
||
"v2_confidence": v2_confidence,
|
||
"structure": _build_structure_summary(structure),
|
||
}
|
||
|
||
|
||
|
||
|
||
_MATCHING_SUBTYPE_AGENT_PROMPT = """你是一个 COBOL 迁移专家。请分析以下程序的键匹配模式,判断其匹配子类型。
|
||
|
||
结构特征:
|
||
- 文件数: {file_count}
|
||
- 决策点: {decision_count}
|
||
- IF 语句: {if_count}
|
||
- 总分支: {total_branches}
|
||
- 变量模式: {variable_patterns}
|
||
|
||
源码中的关键变量:
|
||
{key_vars}
|
||
|
||
可选的匹配子类型(单选):
|
||
1. "1:1" — 1 个主文件对 1 个事务文件,一一对应
|
||
2. "1:N" — 1 个主文件对 N 个事务文件
|
||
3. "N:1" — N 个业务记录聚合成 1 个输出
|
||
4. "M:N→M" — M:N 组合后按主键输出(输出 M 条)
|
||
5. "M:N→N" — M:N 组合后按事务键输出(输出 N 条)
|
||
|
||
请输出 JSON,不要添加其他文字:
|
||
"""
|
||
|
||
|
||
def _llm_subtype_inference(structure: dict, cobol_source: str, llm: Any) -> str | None:
|
||
"""调用 LLM 推理匹配子类型。"""
|
||
import re
|
||
from hina.hina_agent import _parse_llm_response
|
||
|
||
src_upper = cobol_source.upper()
|
||
key_vars = sorted(set(re.findall(r'WS-[\w-]*KEY[A-Z0-9-]*', src_upper)))
|
||
decision_points = structure.get("decision_points", [])
|
||
if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
|
||
|
||
prompt = _MATCHING_SUBTYPE_AGENT_PROMPT.format(
|
||
file_count=structure.get("file_count", 0),
|
||
decision_count=len(decision_points),
|
||
if_count=if_count,
|
||
total_branches=structure.get("total_branches", 0),
|
||
variable_patterns=str(structure.get("variable_patterns", {})),
|
||
key_vars=", ".join(key_vars) if key_vars else "(无 KEY 变量)",
|
||
)
|
||
|
||
messages = [
|
||
{"role": "system", "content": "你是一个 COBOL 匹配程序专家。只输出 JSON。"},
|
||
{"role": "user", "content": prompt},
|
||
]
|
||
|
||
try:
|
||
raw = llm.call(messages)
|
||
parsed = _parse_llm_response(raw)
|
||
subtype = parsed.get("subtype", "")
|
||
confidence = parsed.get("confidence", 0.0)
|
||
valid = {"1:1", "1:N", "N:1", "M:N→M", "M:N→N"}
|
||
if subtype in valid and confidence >= 0.4:
|
||
logger.info("[pipeline] LLM 子类型推理: %s (conf=%.2f, reason=%s)",
|
||
subtype, confidence, parsed.get("reason", ""))
|
||
return subtype
|
||
except Exception as e:
|
||
logger.debug("[pipeline] LLM 子类型推理失败: %s", e)
|
||
|
||
return None
|
||
|
||
# ── 主入口 ────────────────────────────────────────────────────────────────────
|
||
|
||
# ── 匹配子类型解析 ──────────────────────────────────────────────────────────
|
||
|
||
_MATCHING_SUBTYPE_RULES = [
|
||
# (match_fn, subtype)
|
||
# 按优先级从高到低排列
|
||
]
|
||
|
||
|
||
def _resolve_matching_subtype(
|
||
result: dict,
|
||
cobol_source: str,
|
||
structure: dict,
|
||
llm: Any = None,
|
||
) -> dict:
|
||
"""匹配程序的子类型区分后处理。
|
||
|
||
使用分层策略:
|
||
1. 静态规则处理确定性高的(M:N→MxN、1:N、混合、二段階)
|
||
2. LLM agent 推理模棱两可的(N:1 vs 1:1、M:N→M vs M:N→N)
|
||
3. 无 LLM 时回退保守默认值
|
||
|
||
Args:
|
||
result: classify_program 的返回结果。
|
||
cobol_source: 原始 COBOL 源码。
|
||
structure: extract_structure 的返回结构。
|
||
llm: 可选的 LLM 客户端实例。
|
||
|
||
Returns:
|
||
更新后的 result,增加 "subtype" 字段。
|
||
"""
|
||
category = result.get("category", "")
|
||
if "マッチング" not in category and "キーブレイク" not in category and "項目チェック" not in category:
|
||
return result # 非匹配/校验程序不做子类型区分
|
||
|
||
src_upper = cobol_source.upper()
|
||
import re
|
||
|
||
# 0. 二段階マッチング — 已在规则引擎中处理
|
||
if "二段階" in category:
|
||
result["subtype"] = "二段階"
|
||
return result
|
||
|
||
# 1. M:N→MxN 直積 — 特征: WRITE + WS-SAVE-KEY + 3 文件
|
||
if structure.get("file_count", 0) >= 3 and 'WS-SAVE' in src_upper:
|
||
result["subtype"] = "M:N→MxN"
|
||
return result
|
||
|
||
# 2. 混合匹配 (WS-PREV-KEY 存在) — 也覆盖 項目チェック 分类
|
||
if 'WS-PREV-KEY' in src_upper:
|
||
result["subtype"] = "混合"
|
||
return result
|
||
|
||
# 3. WS-ALT-KEY → 混合(异键)
|
||
if 'WS-ALT-KEY' in src_upper or 'ALTERNATE' in src_upper.upper():
|
||
result["subtype"] = "混合(异键)"
|
||
return result
|
||
|
||
# 4. 检查键变量命名模式
|
||
key_vars = set(re.findall(r'WS-[\w-]*KEY[A-Z0-9-]*', src_upper))
|
||
|
||
# 不对称键名 → 1:N 或 N:1 (WS-MAST-KEY + WS-TRAN-KEY)
|
||
has_master = any('MAST' in k for k in key_vars)
|
||
has_tran = any('TRAN' in k for k in key_vars)
|
||
if has_master and has_tran:
|
||
result["subtype"] = "1:N"
|
||
return result
|
||
|
||
# 5. 命名模式启发式: WS-KEY-M/WS-KEY-T → Master/Transaction → N:1
|
||
# WS-KEY-A/WS-KEY-B → 对称命名 → 1:1
|
||
# WS-KEY-M/WS-KEY-N → M:N 多文件
|
||
key_suffixes = [k.split('-')[-1] if '-' in k else '' for k in key_vars]
|
||
if 'M' in key_suffixes and 'T' in key_suffixes:
|
||
# WS-KEY-M + WS-KEY-T → Master/Transaction → N:1
|
||
result["subtype"] = "N:1"
|
||
return result
|
||
if 'M' in key_suffixes and 'N' in key_suffixes:
|
||
# WS-KEY-M + WS-KEY-N → M:N 多文件(无法区分 M:N→M 还是 M:N→N)
|
||
result["subtype"] = "M:N"
|
||
return result
|
||
|
||
# ── 第 2 层: LLM 辅助 ──
|
||
# 多个键变量 + 多文件 → 可能是 M:N→M 或 M:N→N,需要 LLM 分辨
|
||
needs_llm = (
|
||
len(key_vars) >= 3 or
|
||
(len(key_vars) >= 2 and structure.get("file_count", 0) >= 2
|
||
and not has_master)
|
||
)
|
||
|
||
if needs_llm and llm is not None:
|
||
llm_subtype = _llm_subtype_inference(structure, cobol_source, llm)
|
||
if llm_subtype:
|
||
result["subtype"] = llm_subtype
|
||
return result
|
||
|
||
# ── 第 3 层: 回退 ──
|
||
# 多个键变量 → M:N(保守)
|
||
if len(key_vars) >= 3 and structure.get("file_count", 0) >= 2:
|
||
result["subtype"] = "M:N"
|
||
return result
|
||
|
||
# 对称键名 → 默认为 1:1
|
||
result["subtype"] = "1:1"
|
||
return result
|
||
|
||
|
||
def classify_program(cobol_source: str, llm: Any = None) -> dict:
|
||
"""完整程序类型判定管道。
|
||
|
||
流程:
|
||
1. 并行: detect_keyword() + extract_structure()
|
||
2. keyword confidence >= 90% -> 直接输出
|
||
3. keyword 50-89% -> 规则引擎 + 确信度计算 + 矛盾回溯
|
||
4. keyword < 50% -> LLM 辅助 + 规则引擎验证
|
||
5. 输出最终 JSON
|
||
|
||
Args:
|
||
cobol_source: COBOL 程序源码文本。
|
||
llm: 可选的 LLM 客户端实例。
|
||
在 keyword confidence < 50% 路径中用于 LLM 辅助分类。
|
||
若为 None 且 keyword < 50%, 则使用规则引擎兜底。
|
||
|
||
Returns:
|
||
dict: {
|
||
"category": str, # 程序分类名称
|
||
"confidence": float, # 综合确信度 (0.0 ~ 1.0)
|
||
"needs_review": bool, # 是否需要人工审核
|
||
"method": str, # "keyword" | "rule_engine" | "llm"
|
||
"source": str, # 结果来源: "l1" / "pipeline"
|
||
"judgment": str, # auto / review / manual / impossible
|
||
"matches": list, # L1 关键字匹配详情
|
||
"contradictions": list, # 矛盾列表
|
||
"v2_confidence": dict, # 4 因子确信度详情
|
||
"structure": dict, # 结构特征摘要(调试用)
|
||
}
|
||
|
||
Raises:
|
||
ValueError: 如果 cobol_source 为空或无效。
|
||
"""
|
||
if not cobol_source or not cobol_source.strip():
|
||
return {
|
||
"category": "unknown",
|
||
"confidence": 0.0,
|
||
"needs_review": True,
|
||
"method": "none",
|
||
"source": "error",
|
||
"judgment": "impossible",
|
||
"matches": [],
|
||
"contradictions": [],
|
||
"v2_confidence": {},
|
||
"structure": {},
|
||
}
|
||
|
||
# ── 第 1 步: 并行执行 keyword 检测和结构提取 ──
|
||
keyword_matches: list = []
|
||
structure: dict = {}
|
||
|
||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||
future_keyword = executor.submit(detect_keyword, cobol_source)
|
||
future_structure = executor.submit(extract_structure, cobol_source)
|
||
|
||
for future in as_completed([future_keyword, future_structure]):
|
||
if future == future_keyword:
|
||
try:
|
||
keyword_matches = future.result()
|
||
except Exception as e:
|
||
logger.warning("[pipeline] detect_keyword 失败: %s", e)
|
||
elif future == future_structure:
|
||
try:
|
||
structure = future.result()
|
||
except Exception as e:
|
||
logger.warning("[pipeline] extract_structure 失败: %s", e)
|
||
|
||
# 注入源代码用于 features 中的上下文验证(如 has_key_var)
|
||
if structure:
|
||
structure["source_upper"] = cobol_source.upper()
|
||
|
||
# ── 第 2 步: 分析关键字结果, 确定路径 ──
|
||
keyword_info = _get_best_keyword_match(keyword_matches)
|
||
max_keyword_confidence = keyword_info["confidence"] if keyword_info else 0.0
|
||
|
||
logger.info(
|
||
"[pipeline] keyword matches=%d, max_confidence=%.2f, paragraphs=%d, files=%d",
|
||
len(keyword_matches),
|
||
max_keyword_confidence,
|
||
structure.get("total_paragraphs", 0),
|
||
structure.get("file_count", 0),
|
||
)
|
||
|
||
# ── 第 3 步: 根据确信度分路径 ──
|
||
|
||
# 路径 A: keyword >= 90% -> 直接输出
|
||
if max_keyword_confidence >= 0.90:
|
||
logger.info("[pipeline] 路径 A: keyword 高确信度 (%.2f)", max_keyword_confidence)
|
||
result = _path_keyword_direct(keyword_info, structure)
|
||
|
||
# 路径 B: keyword 50-89% -> 规则引擎
|
||
elif max_keyword_confidence >= 0.50:
|
||
logger.info("[pipeline] 路径 B: keyword 中确信度 (%.2f) -> 规则引擎", max_keyword_confidence)
|
||
result = _path_rule_engine(keyword_info, structure)
|
||
|
||
# 路径 C: keyword < 50% -> LLM 辅助
|
||
elif llm is not None:
|
||
logger.info("[pipeline] 路径 C: keyword 低确信度 (%.2f) -> LLM 辅助", max_keyword_confidence)
|
||
result = _path_llm_assisted(keyword_info, structure, llm)
|
||
|
||
# LLM 不可用: 使用规则引擎兜底
|
||
else:
|
||
logger.info("[pipeline] 路径 C(fallback): keyword 低确信度 (%.2f) -> 规则引擎兜底", max_keyword_confidence)
|
||
result = _path_rule_engine(keyword_info, structure)
|
||
result["method"] = "rule_engine_fallback"
|
||
|
||
# ── 第 4 步: 匹配子类型区分(仅对匹配/键中断程序)──
|
||
result = _resolve_matching_subtype(result, cobol_source, structure, llm=llm)
|
||
return result
|