""" HINA 混淆组判定 — 基于 LLM 的 COBOL 程序结构分类。 根据 extract_structure() 输出的结构特征,调用 LLM 将程序归类到 混淆组(confusion group),并返回分类结果和策略参数。 """ import json import logging logger = logging.getLogger(__name__) CONFUSION_PROMPT = """你是一个 COBOL 程序混淆组分类专家。请根据以下程序结构特征,将其归类到合适的混淆组中。 程序结构特征: - 段落数: {paragraph_count} - 决策点总数: {decision_count} - IF 语句数: {if_count} - EVALUATE 语句数: {evaluate_count} - 关联文件数: {file_count} - OPEN 方向: {open_directions} - SEARCH ALL: {has_search_all} - CALL 语句: {has_call} - KEY BREAK 关键词: {has_break} - 总分支数: {total_branches} 混淆组定义: 1. simple_sequential — 极少决策点(<=2),无 EVALUATE/SEARCH ALL/CALL,直接顺序执行 2. condition_heavy — IF 语句占比高(>60% 的决策点),嵌套深,逻辑复杂 3. evaluate_driven — EVALUATE 主导,多分支选择结构 4. data_file_centric — 文件操作密集(>=2 文件),OPEN 方向多样(I-O/OUTPUT/INPUT) 5. search_intensive — 包含 SEARCH ALL,表/数组查找为主 6. call_based — 包含 CALL 语句,模块间调用为主 7. mixed_complex — 同时具备多种复杂特征(决策点多且文件多且含 CALL/SEARCH 等) 请按 JSON 格式输出分类结果,不要包含其他文字: ```json {{ "category": "<混淆组类别>", "subtype": "<子类别,如 nested_if / flat_evaluate / multi_file 等>", "confidence": <0~1 置信度>, "features": {{ "paragraph_count": {paragraph_count}, "decision_count": {decision_count}, "if_count": {if_count}, "evaluate_count": {evaluate_count}, "file_count": {file_count}, "has_search_all": {has_search_all}, "has_call": {has_call}, "has_break": {has_break}, "total_branches": {total_branches} }}, "required_tests": <建议测试用例数,整数>, "strategy_params": {{ "max_nesting_depth": <最大嵌套深度建议>, "coverage_target": "branch" 或 "path", "file_isolation": true 或 false, "supplement_strategy": "incremental" 或 "full" 或 "skip" }} }} ```""" def classify_with_llm(structure: dict, llm) -> dict: """调用 LLM 对程序结构进行混淆组分类。 根据 extract_structure() 返回的结构字典,构造 CONFUSION_PROMPT 并调用 LLM 进行分类。结果包含 category、subtype、confidence、 features、required_tests、strategy_params。 Args: structure: extract_structure() 返回的字典,包含 paragraphs、 decision_points、file_count、open_directions、 has_search_all、has_evaluate、has_call、has_break、 total_branches、total_paragraphs 等字段。 llm: LLMClient 实例,call 方法签名为 llm.call([{"role":"system","content":"..."}, {"role":"user","content":prompt}]) -> str Returns: dict: { "category": str, "subtype": str, "confidence": float, "features": dict, "required_tests": int, "strategy_params": dict } """ decision_points = structure.get("decision_points", []) if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF") evaluate_count = sum(1 for dp in decision_points if dp.get("kind") == "EVALUATE") paragraph_count = structure.get("total_paragraphs", len(structure.get("paragraphs", []))) open_dirs = structure.get("open_directions", {}) has_search_all = str(structure.get("has_search_all", False)).lower() has_call = str(structure.get("has_call", False)).lower() has_break = str(structure.get("has_break", False)).lower() prompt = CONFUSION_PROMPT.format( paragraph_count=paragraph_count, decision_count=len(decision_points), if_count=if_count, evaluate_count=evaluate_count, file_count=structure.get("file_count", 0), open_directions=json.dumps(open_dirs, ensure_ascii=False), has_search_all=has_search_all, has_call=has_call, has_break=has_break, total_branches=structure.get("total_branches", 0), ) messages = [ {"role": "system", "content": "你是一个 COBOL 程序混淆组分类专家。只输出 JSON,不要输出解释。"}, {"role": "user", "content": prompt}, ] try: raw = llm.call(messages) result = _parse_llm_response(raw) logger.info( "HINA classification: %s/%s (confidence=%.2f, tests=%s)", result.get("category", "?"), result.get("subtype", "?"), result.get("confidence", 0.0), result.get("required_tests", "?"), ) return result except Exception as e: logger.warning("HINA LLM classification failed: %s", e) return _fallback_classification(structure) def _parse_llm_response(raw: str) -> dict: """从 LLM 响应中提取 JSON 并解析。 处理 JSON 可能被 ```json ... ``` 包裹的情况。 """ text = raw.strip() # 尝试提取 ```json ... ``` 代码块 if "```json" in text: start = text.index("```json") + 7 end = text.index("```", start) if "```" in text[start:] else len(text) text = text[start:end].strip() elif "```" in text: # 尝试 ``` ... ``` (无 json 标注) start = text.index("```") + 3 end = text.index("```", start) if "```" in text[start:] else len(text) text = text[start:end].strip() parsed = json.loads(text) return _validate_result(parsed) def _validate_result(parsed: dict) -> dict: """验证并规范化 LLM 返回的分类结果。""" defaults = { "category": "unknown", "subtype": "", "confidence": 0.0, "features": {}, "required_tests": 1, "strategy_params": { "max_nesting_depth": 1, "coverage_target": "branch", "file_isolation": False, "supplement_strategy": "full", }, } result = {} for key, default_value in defaults.items(): value = parsed.get(key, default_value) if key == "confidence": try: value = float(value) value = max(0.0, min(1.0, value)) except (ValueError, TypeError): value = 0.0 elif key == "required_tests": try: value = int(value) value = max(1, value) except (ValueError, TypeError): value = 1 result[key] = value return result def _fallback_classification(structure: dict) -> dict: """当 LLM 调用失败时,基于规则的兜底分类。""" decision_points = structure.get("decision_points", []) if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF") evaluate_count = sum(1 for dp in decision_points if dp.get("kind") == "EVALUATE") total_decisions = len(decision_points) file_count = structure.get("file_count", 0) has_search_all = structure.get("has_search_all", False) has_call = structure.get("has_call", False) has_break = structure.get("has_break", False) # 规则优先级:从高到低 if total_decisions == 0: category, subtype = "simple_sequential", "no_branch" required_tests = 1 strategy = {"max_nesting_depth": 0, "coverage_target": "branch", "file_isolation": False, "supplement_strategy": "skip"} elif has_search_all: category, subtype = "search_intensive", "table_lookup" required_tests = max(total_decisions, 3) strategy = {"max_nesting_depth": 3, "coverage_target": "path", "file_isolation": True, "supplement_strategy": "incremental"} elif has_call: category, subtype = "call_based", "external_call" required_tests = max(total_decisions, 3) strategy = {"max_nesting_depth": 2, "coverage_target": "branch", "file_isolation": False, "supplement_strategy": "full"} elif evaluate_count > if_count and evaluate_count >= 2: category, subtype = "evaluate_driven", "multi_way" required_tests = total_decisions + 1 strategy = {"max_nesting_depth": evaluate_count, "coverage_target": "path", "file_isolation": False, "supplement_strategy": "full"} elif file_count >= 2: category, subtype = "data_file_centric", "multi_file" required_tests = max(total_decisions, file_count * 2) strategy = {"max_nesting_depth": 2, "coverage_target": "branch", "file_isolation": True, "supplement_strategy": "incremental"} elif if_count >= 5 or total_decisions >= 8: category, subtype = "condition_heavy", "nested_if" required_tests = total_decisions + 2 strategy = {"max_nesting_depth": 4, "coverage_target": "path", "file_isolation": False, "supplement_strategy": "incremental"} elif if_count >= 2: category, subtype = "condition_heavy", "simple_if" required_tests = total_decisions + 1 strategy = {"max_nesting_depth": 2, "coverage_target": "branch", "file_isolation": False, "supplement_strategy": "incremental"} else: category, subtype = "simple_sequential", "minimal" required_tests = 1 strategy = {"max_nesting_depth": 0, "coverage_target": "branch", "file_isolation": False, "supplement_strategy": "skip"} # 检查是否应升级为 mixed_complex complexity_flags = sum([ has_search_all, has_call, has_break, file_count >= 2, if_count >= 5, evaluate_count >= 3, ]) if complexity_flags >= 3: category, subtype = "mixed_complex", f"{subtype}_plus" required_tests = max(required_tests, 10) strategy["max_nesting_depth"] = max(strategy.get("max_nesting_depth", 2), 5) strategy["coverage_target"] = "path" strategy["supplement_strategy"] = "full" return { "category": category, "subtype": subtype, "confidence": 0.6, "features": { "paragraph_count": structure.get("total_paragraphs", len(structure.get("paragraphs", []))), "decision_count": total_decisions, "if_count": if_count, "evaluate_count": evaluate_count, "file_count": file_count, "has_search_all": has_search_all, "has_call": has_call, "has_break": has_break, "total_branches": structure.get("total_branches", 0), }, "required_tests": required_tests, "strategy_params": strategy, }