63b5284715
test: add gap coverage tests (hina_agent/JCL/quality gate edge cases)
284 lines
11 KiB
Python
284 lines
11 KiB
Python
"""
|
||
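HINA confusion-group classification: LLM-based structural classification of COBOL programs.

Using the structural features produced by extract_structure(), call an LLM to assign the
program to a confusion group and return the classification result and strategy parameters.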
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
CONFUSION_PROMPT = """你是一个 COBOL 程序混淆组分类专家。请根据以下程序结构特征,将其归类到合适的混淆组中。
|
||
|
||
程序结构特征:
|
||
- 段落数: {paragraph_count}
|
||
- 决策点总数: {decision_count}
|
||
- IF 语句数: {if_count}
|
||
- EVALUATE 语句数: {evaluate_count}
|
||
- 关联文件数: {file_count}
|
||
- OPEN 方向: {open_directions}
|
||
- SEARCH ALL: {has_search_all}
|
||
- CALL 语句: {has_call}
|
||
- KEY BREAK 关键词: {has_break}
|
||
- 总分支数: {total_branches}
|
||
|
||
混淆组定义:
|
||
1. simple_sequential — 极少决策点(<=2),无 EVALUATE/SEARCH ALL/CALL,直接顺序执行
|
||
2. condition_heavy — IF 语句占比高(>60% 的决策点),嵌套深,逻辑复杂
|
||
3. evaluate_driven — EVALUATE 主导,多分支选择结构
|
||
4. data_file_centric — 文件操作密集(>=2 文件),OPEN 方向多样(I-O/OUTPUT/INPUT)
|
||
5. search_intensive — 包含 SEARCH ALL,表/数组查找为主
|
||
6. call_based — 包含 CALL 语句,模块间调用为主
|
||
7. mixed_complex — 同时具备多种复杂特征(决策点多且文件多且含 CALL/SEARCH 等)
|
||
|
||
请按 JSON 格式输出分类结果,不要包含其他文字:
|
||
|
||
```json
|
||
{{
|
||
"category": "<混淆组类别>",
|
||
"subtype": "<子类别,如 nested_if / flat_evaluate / multi_file 等>",
|
||
"confidence": <0~1 置信度>,
|
||
"features": {{
|
||
"paragraph_count": {paragraph_count},
|
||
"decision_count": {decision_count},
|
||
"if_count": {if_count},
|
||
"evaluate_count": {evaluate_count},
|
||
"file_count": {file_count},
|
||
"has_search_all": {has_search_all},
|
||
"has_call": {has_call},
|
||
"has_break": {has_break},
|
||
"total_branches": {total_branches}
|
||
}},
|
||
"required_tests": <建议测试用例数,整数>,
|
||
"strategy_params": {{
|
||
"max_nesting_depth": <最大嵌套深度建议>,
|
||
"coverage_target": "branch" 或 "path",
|
||
"file_isolation": true 或 false,
|
||
"supplement_strategy": "incremental" 或 "full" 或 "skip"
|
||
}}
|
||
}}
|
||
```"""
|
||
|
||
|
||
def classify_with_llm(structure: dict, llm) -> dict:
    """Classify a program structure into a confusion group by asking an LLM.

    The structural features are rendered into CONFUSION_PROMPT, sent to the
    LLM together with a system message, and the reply is parsed into a
    normalised result. If anything in the call/parse/log sequence raises,
    the rule-based fallback classification is returned instead.

    Args:
        structure: Structure dictionary (documented upstream as the output of
            extract_structure()). Keys read here: ``decision_points``,
            ``total_paragraphs``, ``paragraphs``, ``file_count``,
            ``open_directions``, ``has_search_all``, ``has_call``,
            ``has_break`` and ``total_branches``; all are optional.
        llm: Client object exposing ``call(messages)``, where ``messages`` is
            a list of ``{"role": ..., "content": ...}`` dicts and the return
            value is the raw response text.

    Returns:
        dict with the keys ``category``, ``subtype``, ``confidence``,
        ``features``, ``required_tests`` and ``strategy_params``.
    """
    points = structure.get("decision_points", [])
    kinds = [point.get("kind") for point in points]

    def flag_text(key):
        # Render the flag as lowercase text ("true"/"false" for booleans) so
        # it reads as a JSON literal inside the prompt template.
        return str(structure.get(key, False)).lower()

    template_values = {
        "paragraph_count": structure.get(
            "total_paragraphs", len(structure.get("paragraphs", []))
        ),
        "decision_count": len(points),
        "if_count": kinds.count("IF"),
        "evaluate_count": kinds.count("EVALUATE"),
        "file_count": structure.get("file_count", 0),
        "open_directions": json.dumps(
            structure.get("open_directions", {}), ensure_ascii=False
        ),
        "has_search_all": flag_text("has_search_all"),
        "has_call": flag_text("has_call"),
        "has_break": flag_text("has_break"),
        "total_branches": structure.get("total_branches", 0),
    }
    prompt = CONFUSION_PROMPT.format(**template_values)

    system_message = {
        "role": "system",
        "content": "你是一个 COBOL 程序混淆组分类专家。只输出 JSON,不要输出解释。",
    }
    user_message = {"role": "user", "content": prompt}

    try:
        result = _parse_llm_response(llm.call([system_message, user_message]))
        logger.info(
            "HINA classification: %s/%s (confidence=%.2f, tests=%s)",
            result.get("category", "?"),
            result.get("subtype", "?"),
            result.get("confidence", 0.0),
            result.get("required_tests", "?"),
        )
    except Exception as exc:
        # Any failure (transport, parsing, logging) degrades to the
        # rule-based classifier rather than propagating to the caller.
        logger.warning("HINA LLM classification failed: %s", exc)
        return _fallback_classification(structure)
    return result
|
||
|
||
|
||
def _parse_llm_response(raw: str) -> dict:
    """Extract the JSON object from an LLM reply and normalise it.

    Handles replies where the JSON is wrapped in a ```json ... ``` or plain
    ``` ... ``` fence (an unterminated fence runs to the end of the text).
    If the resulting text is not itself a JSON object, the span from the
    first ``{`` to the last ``}`` is tried as well, so that replies with
    explanatory prose around the object are still usable.

    Args:
        raw: Raw response text returned by the LLM.

    Returns:
        The normalised classification dict produced by ``_validate_result``.
        When no JSON object can be recovered (invalid JSON, or valid JSON
        that is not an object), the all-defaults result is returned.
    """
    text = raw.strip()

    # Unwrap the first fenced block, preferring an explicit ```json fence.
    fence = "```"
    for opener in ("```json", fence):
        opened_at = text.find(opener)
        if opened_at == -1:
            continue
        body_start = opened_at + len(opener)
        closed_at = text.find(fence, body_start)
        inner = text[body_start:] if closed_at == -1 else text[body_start:closed_at]
        text = inner.strip()
        break

    candidates = [text]
    # Second chance: the outermost {...} span, for replies such as
    # "Here is the result: {...}" or a fence tagged with something else.
    first_brace = text.find("{")
    last_brace = text.rfind("}")
    if 0 <= first_brace < last_brace:
        embedded = text[first_brace:last_brace + 1]
        if embedded != text:
            candidates.append(embedded)

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        # Only a JSON object carries the expected keys; lists, strings and
        # numbers are treated the same as unparseable text.
        if isinstance(parsed, dict):
            return _validate_result(parsed)

    return _validate_result({})
|
||
|
||
|
||
def _validate_result(parsed: dict) -> dict:
|
||
"""验证并规范化 LLM 返回的分类结果。"""
|
||
defaults = {
|
||
"category": "unknown",
|
||
"subtype": "",
|
||
"confidence": 0.0,
|
||
"features": {},
|
||
"required_tests": 1,
|
||
"strategy_params": {
|
||
"max_nesting_depth": 1,
|
||
"coverage_target": "branch",
|
||
"file_isolation": False,
|
||
"supplement_strategy": "full",
|
||
},
|
||
}
|
||
|
||
result = {}
|
||
for key, default_value in defaults.items():
|
||
value = parsed.get(key, default_value)
|
||
if key == "confidence":
|
||
try:
|
||
value = float(value)
|
||
value = max(0.0, min(1.0, value))
|
||
except (ValueError, TypeError):
|
||
value = 0.0
|
||
elif key == "required_tests":
|
||
try:
|
||
value = int(value)
|
||
value = max(1, value)
|
||
except (ValueError, TypeError):
|
||
value = 1
|
||
result[key] = value
|
||
|
||
return result
|
||
|
||
|
||
def _fallback_classification(structure: dict) -> dict:
|
||
"""当 LLM 调用失败时,基于规则的兜底分类。"""
|
||
decision_points = structure.get("decision_points", [])
|
||
if_count = sum(1 for dp in decision_points if dp.get("kind") == "IF")
|
||
evaluate_count = sum(1 for dp in decision_points if dp.get("kind") == "EVALUATE")
|
||
total_decisions = len(decision_points)
|
||
file_count = structure.get("file_count", 0)
|
||
has_search_all = structure.get("has_search_all", False)
|
||
has_call = structure.get("has_call", False)
|
||
has_break = structure.get("has_break", False)
|
||
|
||
# 规则优先级:从高到低
|
||
if total_decisions == 0:
|
||
category, subtype = "simple_sequential", "no_branch"
|
||
required_tests = 1
|
||
strategy = {"max_nesting_depth": 0, "coverage_target": "branch",
|
||
"file_isolation": False, "supplement_strategy": "skip"}
|
||
elif has_search_all:
|
||
category, subtype = "search_intensive", "table_lookup"
|
||
required_tests = max(total_decisions, 3)
|
||
strategy = {"max_nesting_depth": 3, "coverage_target": "path",
|
||
"file_isolation": True, "supplement_strategy": "incremental"}
|
||
elif has_call:
|
||
category, subtype = "call_based", "external_call"
|
||
required_tests = max(total_decisions, 3)
|
||
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
|
||
"file_isolation": False, "supplement_strategy": "full"}
|
||
elif evaluate_count > if_count and evaluate_count >= 2:
|
||
category, subtype = "evaluate_driven", "multi_way"
|
||
required_tests = total_decisions + 1
|
||
strategy = {"max_nesting_depth": evaluate_count, "coverage_target": "path",
|
||
"file_isolation": False, "supplement_strategy": "full"}
|
||
elif file_count >= 2:
|
||
category, subtype = "data_file_centric", "multi_file"
|
||
required_tests = max(total_decisions, file_count * 2)
|
||
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
|
||
"file_isolation": True, "supplement_strategy": "incremental"}
|
||
elif if_count >= 5 or total_decisions >= 8:
|
||
category, subtype = "condition_heavy", "nested_if"
|
||
required_tests = total_decisions + 2
|
||
strategy = {"max_nesting_depth": 4, "coverage_target": "path",
|
||
"file_isolation": False, "supplement_strategy": "incremental"}
|
||
elif if_count >= 2:
|
||
category, subtype = "condition_heavy", "simple_if"
|
||
required_tests = total_decisions + 1
|
||
strategy = {"max_nesting_depth": 2, "coverage_target": "branch",
|
||
"file_isolation": False, "supplement_strategy": "incremental"}
|
||
else:
|
||
category, subtype = "simple_sequential", "minimal"
|
||
required_tests = 1
|
||
strategy = {"max_nesting_depth": 0, "coverage_target": "branch",
|
||
"file_isolation": False, "supplement_strategy": "skip"}
|
||
|
||
# 检查是否应升级为 mixed_complex
|
||
complexity_flags = sum([
|
||
has_search_all,
|
||
has_call,
|
||
has_break,
|
||
file_count >= 2,
|
||
if_count >= 5,
|
||
evaluate_count >= 3,
|
||
])
|
||
if complexity_flags >= 3:
|
||
category, subtype = "mixed_complex", f"{subtype}_plus"
|
||
required_tests = max(required_tests, 10)
|
||
strategy["max_nesting_depth"] = max(strategy.get("max_nesting_depth", 2), 5)
|
||
strategy["coverage_target"] = "path"
|
||
strategy["supplement_strategy"] = "full"
|
||
|
||
return {
|
||
"category": category,
|
||
"subtype": subtype,
|
||
"confidence": 0.6,
|
||
"features": {
|
||
"paragraph_count": structure.get("total_paragraphs", len(structure.get("paragraphs", []))),
|
||
"decision_count": total_decisions,
|
||
"if_count": if_count,
|
||
"evaluate_count": evaluate_count,
|
||
"file_count": file_count,
|
||
"has_search_all": has_search_all,
|
||
"has_call": has_call,
|
||
"has_break": has_break,
|
||
"total_branches": structure.get("total_branches", 0),
|
||
},
|
||
"required_tests": required_tests,
|
||
"strategy_params": strategy,
|
||
}
|