Files
cobol-java-v3/hina/hina_agent.py
hangshuo652 63b5284715 fix: _parse_llm_response now handles empty/invalid JSON gracefully
test: add gap coverage tests (hina_agent/JCL/quality gate edge cases)
2026-06-18 17:31:16 +08:00

284 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HINA 混淆组判定 — 基于 LLM 的 COBOL 程序结构分类。
根据 extract_structure() 输出的结构特征,调用 LLM 将程序归类到
混淆组(confusion group),并返回分类结果和策略参数。
"""
import json
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
CONFUSION_PROMPT = """你是一个 COBOL 程序混淆组分类专家。请根据以下程序结构特征,将其归类到合适的混淆组中。
程序结构特征:
- 段落数: {paragraph_count}
- 决策点总数: {decision_count}
- IF 语句数: {if_count}
- EVALUATE 语句数: {evaluate_count}
- 关联文件数: {file_count}
- OPEN 方向: {open_directions}
- SEARCH ALL: {has_search_all}
- CALL 语句: {has_call}
- KEY BREAK 关键词: {has_break}
- 总分支数: {total_branches}
混淆组定义:
1. simple_sequential — 极少决策点(<=2),无 EVALUATE/SEARCH ALL/CALL,直接顺序执行
2. condition_heavy — IF 语句占比高(>60% 的决策点),嵌套深,逻辑复杂
3. evaluate_driven — EVALUATE 主导,多分支选择结构
4. data_file_centric — 文件操作密集(>=2 文件),OPEN 方向多样(I-O/OUTPUT/INPUT
5. search_intensive — 包含 SEARCH ALL,表/数组查找为主
6. call_based — 包含 CALL 语句,模块间调用为主
7. mixed_complex — 同时具备多种复杂特征(决策点多且文件多且含 CALL/SEARCH 等)
请按 JSON 格式输出分类结果,不要包含其他文字:
```json
{{
"category": "<混淆组类别>",
"subtype": "<子类别,如 nested_if / flat_evaluate / multi_file 等>",
"confidence": <0~1 置信度>,
"features": {{
"paragraph_count": {paragraph_count},
"decision_count": {decision_count},
"if_count": {if_count},
"evaluate_count": {evaluate_count},
"file_count": {file_count},
"has_search_all": {has_search_all},
"has_call": {has_call},
"has_break": {has_break},
"total_branches": {total_branches}
}},
"required_tests": <建议测试用例数,整数>,
"strategy_params": {{
"max_nesting_depth": <最大嵌套深度建议>,
"coverage_target": "branch""path",
"file_isolation": true 或 false,
"supplement_strategy": "incremental""full""skip"
}}
}}
```"""
def classify_with_llm(structure: dict, llm) -> dict:
    """Classify a COBOL program into a confusion group by asking an LLM.

    The structural features found in ``structure`` are substituted into
    ``CONFUSION_PROMPT`` and sent to the model; the reply is parsed and
    normalised by ``_parse_llm_response``.

    Any exception raised while deriving the features, building the prompt,
    calling the model or parsing its reply is logged as a warning and the
    rule-based ``_fallback_classification`` result is returned instead, so a
    malformed ``structure`` field (for example an ``open_directions`` value
    that cannot be serialised to JSON) no longer escapes as an error before
    the fallback has a chance to run.

    Args:
        structure: Dictionary of structural features. The keys read here are
            ``decision_points`` (a list of dicts with a ``kind`` entry),
            ``total_paragraphs`` (falling back to the length of
            ``paragraphs``), ``open_directions``, ``file_count``,
            ``has_search_all``, ``has_call``, ``has_break`` and
            ``total_branches``. Missing keys fall back to empty or zero
            values.
        llm: Client object exposing ``call(messages)``, which receives a list
            of ``{"role": ..., "content": ...}`` dicts and returns the reply
            text.

    Returns:
        dict: A classification with the keys ``category``, ``subtype``,
        ``confidence``, ``features``, ``required_tests`` and
        ``strategy_params``.
    """
    try:
        decision_points = structure.get("decision_points", [])
        if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
        evaluate_count = sum(
            1 for dp in decision_points if dp.get("kind") == "EVALUATE"
        )
        paragraph_count = structure.get(
            "total_paragraphs", len(structure.get("paragraphs", []))
        )
        open_dirs = structure.get("open_directions", {})

        # Lower-cased so Python booleans render as JSON-style true/false
        # inside the prompt's JSON skeleton.
        has_search_all = str(structure.get("has_search_all", False)).lower()
        has_call = str(structure.get("has_call", False)).lower()
        has_break = str(structure.get("has_break", False)).lower()

        prompt = CONFUSION_PROMPT.format(
            paragraph_count=paragraph_count,
            decision_count=len(decision_points),
            if_count=if_count,
            evaluate_count=evaluate_count,
            file_count=structure.get("file_count", 0),
            open_directions=json.dumps(open_dirs, ensure_ascii=False),
            has_search_all=has_search_all,
            has_call=has_call,
            has_break=has_break,
            total_branches=structure.get("total_branches", 0),
        )
        messages = [
            {"role": "system", "content": "你是一个 COBOL 程序混淆组分类专家。只输出 JSON,不要输出解释。"},
            {"role": "user", "content": prompt},
        ]
        raw = llm.call(messages)
        result = _parse_llm_response(raw)
    except Exception as e:
        # Deliberate catch-all boundary: whatever went wrong on the LLM path,
        # the caller still receives a usable rule-based classification.
        logger.warning("HINA LLM classification failed: %s", e)
        return _fallback_classification(structure)
    else:
        logger.info(
            "HINA classification: %s/%s (confidence=%.2f, tests=%s)",
            result.get("category", "?"),
            result.get("subtype", "?"),
            result.get("confidence", 0.0),
            result.get("required_tests", "?"),
        )
        return result
def _parse_llm_response(raw: str) -> dict:
    """Extract the JSON object from an LLM reply and normalise it.

    The reply is searched in this order:

    1. the contents of the first ```` ```json ```` fenced block, or failing
       that the first bare ```` ``` ```` fenced block (an unterminated fence
       runs to the end of the text); without any fence the whole reply is
       used;
    2. if that text is not a JSON object, the span from its first ``{`` to
       its last ``}``. This recovers objects surrounded by explanatory prose
       or preceded by a fence language tag other than lowercase ``json``.

    Text that cannot be parsed, and valid JSON that is not an object (a list,
    number, string or ``null``), both yield the default classification
    instead of raising.

    Args:
        raw: Reply text returned by the LLM.

    Returns:
        dict: The output of ``_validate_result`` for the parsed object, or
        for an empty dict when no JSON object could be recovered.
    """
    text = raw.strip()

    # Locate the opening fence, preferring one tagged as json.
    fence_open = text.find("```json")
    if fence_open != -1:
        body_start = fence_open + len("```json")
    else:
        fence_open = text.find("```")
        body_start = fence_open + len("```") if fence_open != -1 else -1

    if body_start != -1:
        fence_close = text.find("```", body_start)
        if fence_close == -1:
            fence_close = len(text)
        text = text[body_start:fence_close].strip()

    candidates = [text]
    first_brace = text.find("{")
    last_brace = text.rfind("}")
    spans_whole_text = first_brace == 0 and last_brace == len(text) - 1
    if 0 <= first_brace < last_brace and not spans_whole_text:
        candidates.append(text[first_brace:last_brace + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            continue
        if isinstance(parsed, dict):
            return _validate_result(parsed)

    return _validate_result({})
def _validate_result(parsed: dict) -> dict:
"""验证并规范化 LLM 返回的分类结果。"""
defaults = {
"category": "unknown",
"subtype": "",
"confidence": 0.0,
"features": {},
"required_tests": 1,
"strategy_params": {
"max_nesting_depth": 1,
"coverage_target": "branch",
"file_isolation": False,
"supplement_strategy": "full",
},
}
result = {}
for key, default_value in defaults.items():
value = parsed.get(key, default_value)
if key == "confidence":
try:
value = float(value)
value = max(0.0, min(1.0, value))
except (ValueError, TypeError):
value = 0.0
elif key == "required_tests":
try:
value = int(value)
value = max(1, value)
except (ValueError, TypeError):
value = 1
result[key] = value
return result
def _fallback_classification(structure: dict) -> dict:
"""当 LLM 调用失败时,基于规则的兜底分类。"""
decision_points = structure.get("decision_points", [])
if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
evaluate_count = sum(1 for dp in decision_points if dp.get("kind") == "EVALUATE")
total_decisions = len(decision_points)
file_count = structure.get("file_count", 0)
has_search_all = structure.get("has_search_all", False)
has_call = structure.get("has_call", False)
has_break = structure.get("has_break", False)
# 规则优先级:从高到低
if total_decisions == 0:
category, subtype = "simple_sequential", "no_branch"
required_tests = 1
strategy = {"max_nesting_depth": 0, "coverage_target": "branch",
"file_isolation": False, "supplement_strategy": "skip"}
elif has_search_all:
category, subtype = "search_intensive", "table_lookup"
required_tests = max(total_decisions, 3)
strategy = {"max_nesting_depth": 3, "coverage_target": "path",
"file_isolation": True, "supplement_strategy": "incremental"}
elif has_call:
category, subtype = "call_based", "external_call"
required_tests = max(total_decisions, 3)
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
"file_isolation": False, "supplement_strategy": "full"}
elif evaluate_count > if_count and evaluate_count >= 2:
category, subtype = "evaluate_driven", "multi_way"
required_tests = total_decisions + 1
strategy = {"max_nesting_depth": evaluate_count, "coverage_target": "path",
"file_isolation": False, "supplement_strategy": "full"}
elif file_count >= 2:
category, subtype = "data_file_centric", "multi_file"
required_tests = max(total_decisions, file_count * 2)
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
"file_isolation": True, "supplement_strategy": "incremental"}
elif if_count >= 5 or total_decisions >= 8:
category, subtype = "condition_heavy", "nested_if"
required_tests = total_decisions + 2
strategy = {"max_nesting_depth": 4, "coverage_target": "path",
"file_isolation": False, "supplement_strategy": "incremental"}
elif if_count >= 2:
category, subtype = "condition_heavy", "simple_if"
required_tests = total_decisions + 1
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
"file_isolation": False, "supplement_strategy": "incremental"}
else:
category, subtype = "simple_sequential", "minimal"
required_tests = 1
strategy = {"max_nesting_depth": 0, "coverage_target": "branch",
"file_isolation": False, "supplement_strategy": "skip"}
# 检查是否应升级为 mixed_complex
complexity_flags = sum([
has_search_all,
has_call,
has_break,
file_count >= 2,
if_count >= 5,
evaluate_count >= 3,
])
if complexity_flags >= 3:
category, subtype = "mixed_complex", f"{subtype}_plus"
required_tests = max(required_tests, 10)
strategy["max_nesting_depth"] = max(strategy.get("max_nesting_depth", 2), 5)
strategy["coverage_target"] = "path"
strategy["supplement_strategy"] = "full"
return {
"category": category,
"subtype": subtype,
"confidence": 0.6,
"features": {
"paragraph_count": structure.get("total_paragraphs", len(structure.get("paragraphs", []))),
"decision_count": total_decisions,
"if_count": if_count,
"evaluate_count": evaluate_count,
"file_count": file_count,
"has_search_all": has_search_all,
"has_call": has_call,
"has_break": has_break,
"total_branches": structure.get("total_branches", 0),
},
"required_tests": required_tests,
"strategy_params": strategy,
}