diff --git a/hina/pipeline/pipeline.py b/hina/pipeline/pipeline.py index 6a622d5..7e614da 100644 --- a/hina/pipeline/pipeline.py +++ b/hina/pipeline/pipeline.py @@ -332,6 +332,70 @@ def _path_llm_assisted( } + + +_MATCHING_SUBTYPE_AGENT_PROMPT = """你是一个 COBOL 迁移专家。请分析以下程序的键匹配模式,判断其匹配子类型。 + +结构特征: +- 文件数: {file_count} +- 决策点: {decision_count} +- IF 语句: {if_count} +- 总分支: {total_branches} +- 变量模式: {variable_patterns} + +源码中的关键变量: +{key_vars} + +可选的匹配子类型(单选): +1. "1:1" — 1 个主文件对 1 个事务文件,一一对应 +2. "1:N" — 1 个主文件对 N 个事务文件 +3. "N:1" — N 个业务记录聚合成 1 个输出 +4. "M:N→M" — M:N 组合后按主键输出(输出 M 条) +5. "M:N→N" — M:N 组合后按事务键输出(输出 N 条) + +请输出 JSON,不要添加其他文字: +""" + + +def _llm_subtype_inference(structure: dict, cobol_source: str, llm: Any) -> str | None: + """调用 LLM 推理匹配子类型。""" + import re + from hina.hina_agent import _parse_llm_response + + src_upper = cobol_source.upper() + key_vars = sorted(set(re.findall(r'WS-[\w-]*KEY[A-Z0-9-]*', src_upper))) + decision_points = structure.get("decision_points", []) + if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF") + + prompt = _MATCHING_SUBTYPE_AGENT_PROMPT.format( + file_count=structure.get("file_count", 0), + decision_count=len(decision_points), + if_count=if_count, + total_branches=structure.get("total_branches", 0), + variable_patterns=str(structure.get("variable_patterns", {})), + key_vars=", ".join(key_vars) if key_vars else "(无 KEY 变量)", + ) + + messages = [ + {"role": "system", "content": "你是一个 COBOL 匹配程序专家。只输出 JSON。"}, + {"role": "user", "content": prompt}, + ] + + try: + raw = llm.call(messages) + parsed = _parse_llm_response(raw) + subtype = parsed.get("subtype", "") + confidence = parsed.get("confidence", 0.0) + valid = {"1:1", "1:N", "N:1", "M:N→M", "M:N→N"} + if subtype in valid and confidence >= 0.4: + logger.info("[pipeline] LLM 子类型推理: %s (conf=%.2f, reason=%s)", + subtype, confidence, parsed.get("reason", "")) + return subtype + except Exception as e: + logger.debug("[pipeline] LLM 子类型推理失败: %s", e) + + return None + # ── 主入口 ──────────────────────────────────────────────────────────────────── # ── 匹配子类型解析 ────────────────────────────────────────────────────────── @@ -346,16 +410,20 @@ def _resolve_matching_subtype( result: dict, cobol_source: str, structure: dict, + llm: Any = None, ) -> dict: """匹配程序的子类型区分后处理。 - 在 classify_program 判定为 マッチング 后,进一步区分子类型: - - 1:1 マッチング / 1:N / N:1 / M:N / M:N→M 等 + 使用分层策略: + 1. 静态规则处理确定性高的(M:N→MxN、1:N、混合、二段階) + 2. LLM agent 推理模棱两可的(N:1 vs 1:1、M:N→M vs M:N→N) + 3. 无 LLM 时回退保守默认值 Args: result: classify_program 的返回结果。 cobol_source: 原始 COBOL 源码。 structure: extract_structure 的返回结构。 + llm: 可选的 LLM 客户端实例。 Returns: 更新后的 result,增加 "subtype" 字段。 @@ -392,12 +460,27 @@ def _resolve_matching_subtype( result["subtype"] = "1:N" return result - # 4. 多个键名 → 多文件匹配 (M:N 模式) + # ── 第 2 层: 静态规则+LLM 辅助 ── + # 多个键变量 + 多文件 → 可能是 M:N→M 或 M:N→N,需要 LLM 分辨 + needs_llm = ( + len(key_vars) >= 3 or + (len(key_vars) >= 2 and structure.get("file_count", 0) >= 2 + and not has_master) + ) + + if needs_llm and llm is not None: + llm_subtype = _llm_subtype_inference(structure, cobol_source, llm) + if llm_subtype: + result["subtype"] = llm_subtype + return result + + # ── 第 3 层: 回退 ── + # 多个键变量 → M:N(保守) if len(key_vars) >= 3 and structure.get("file_count", 0) >= 2: result["subtype"] = "M:N" return result - # 5. 对称键名 → 默认为 1:1 + # 对称键名 → 默认为 1:1 result["subtype"] = "1:1" return result @@ -505,5 +588,5 @@ def classify_program(cobol_source: str, llm: Any = None) -> dict: result["method"] = "rule_engine_fallback" # ── 第 4 步: 匹配子类型区分(仅对匹配/键中断程序)── - result = _resolve_matching_subtype(result, cobol_source, structure) + result = _resolve_matching_subtype(result, cobol_source, structure, llm=llm) return result