bc1d56d1a4
P0.6: gcov infrastructure P1: extract_structure output expansion (11 new feature fields) P2: Confusion group rule engine (8 pairs + contradiction + backtrack) P3: 4-factor confidence calculation + quality gate update P4: 33+2 COBOL program type test samples (22 files, 7 categories) P5: parametrized/ test data generation engine P6: japanese_data.py lookup tables P7-10: Type-specific test suites (~159 parametrized tests) P11: Full classification pipeline (classify_program) + orchestrator integration P12: Documentation (module-interfaces, test-plan v3.0, coverage-matrix) Architecture decisions: - classification_pipeline/ merged to hina/pipeline/ - parametrized/ as independent module - japanese_data.py as root-level file - hina/__all__ only exports classify_program() Co-Authored-By: Claude <noreply@anthropic.com>
97 lines
3.5 KiB
Python
97 lines
3.5 KiB
Python
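"""Backtracking mechanism: multi-round resolution that re-extracts features when needed to clear contradictions.

BacktrackResolver encapsulates the core multi-round logic:

1. Detect contradictions in the current features.
2. Call resolve_contradiction for each contradictory pair.
3. If contradictions still exist, re-extract the features and evaluate again.
4. Degrade once max_rounds rounds are used up or the 30 s timeout is exceeded.
"""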
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Any, Callable
|
|
|
|
from .contradiction import detect_contradictions, resolve_contradiction
|
|
|
|
|
|
class BacktrackResolver:
    """Multi-round backtracking resolver.

    Each round detects contradictions in the current features, resolves every
    one of them through ``resolve_contradiction``, then re-runs the structure
    extractor and merges its output before the next round starts.

    Args:
        structure_extractor: Callable that takes the COBOL source string and
            returns a features dict.
        max_rounds: Maximum number of detect/resolve rounds. Defaults to 3.
        timeout: Time budget in seconds for a single ``resolve`` call.
            Defaults to 30. The budget is checked at the start of each round
            only, so a round that is already running is never interrupted.
    """

    # Status keys owned by resolve(); a re-extraction must never overwrite them.
    _STATE_KEYS = (
        "backtrack_rounds",
        "backtrack_timeout",
        "backtrack_resolved",
        "backtrack_degraded",
        "resolved_types",
    )

    def __init__(
        self,
        structure_extractor: Callable[[str], dict[str, Any]],
        *,
        max_rounds: int = 3,
        timeout: float = 30.0,
    ) -> None:
        self.extract = structure_extractor
        self.max_rounds = max_rounds
        self.timeout = timeout

    def _needs_backtrack(self, contradictions: list[dict]) -> bool:
        """Return True when the features have to be re-extracted.

        Any detected contradiction (a non-empty list) triggers a backtrack.
        """
        return len(contradictions) > 0

    def resolve(self, cobol_source: str, initial_features: dict) -> dict[str, Any]:
        """Run the multi-round resolution, degrading on timeout or exhaustion.

        Args:
            cobol_source: COBOL program source code.
            initial_features: Features extracted initially. The mapping is
                copied and never modified.

        Returns:
            The final features dict. It always carries ``backtrack_rounds``,
            ``backtrack_timeout``, ``backtrack_resolved``,
            ``backtrack_degraded`` and ``backtrack_elapsed`` (seconds, rounded
            to 3 decimals). ``resolved_types`` is added once a contradiction
            has been resolved, and ``backtrack_extract_error`` is set to True
            when re-extraction failed.
        """
        # Monotonic clock: a wall-clock adjustment must not fake or hide a timeout.
        start = time.monotonic()

        features: dict[str, Any] = dict(initial_features)
        # dict() above is a shallow copy; un-alias the one nested container this
        # method writes into so the caller's mapping stays untouched.
        prior_resolved = features.get("resolved_types")
        if isinstance(prior_resolved, dict):
            features["resolved_types"] = dict(prior_resolved)

        # Status describes THIS call only: drop anything left over from a
        # previous run that may be present in initial_features.
        features["backtrack_rounds"] = 0
        features["backtrack_timeout"] = False
        features["backtrack_resolved"] = False
        features["backtrack_degraded"] = False
        features.pop("backtrack_extract_error", None)

        for round_num in range(1, self.max_rounds + 1):
            # Timeout check.
            if time.monotonic() - start > self.timeout:
                features["backtrack_timeout"] = True
                break

            # Detect contradictions.
            contradictions = detect_contradictions(features)
            if not contradictions:
                # Nothing left to resolve: done.
                features["backtrack_resolved"] = True
                break

            # Resolve every contradiction and record the outcome.
            for c in contradictions:
                resolution = resolve_contradiction(features, c)
                resolved_types = features.setdefault("resolved_types", {})
                resolved_types[f"resolved_{c['name']}"] = resolution

            features["backtrack_rounds"] = round_num

            if self._needs_backtrack(contradictions):
                # Re-extract the features. The extractor is an arbitrary
                # callable, so any failure (including a non-mapping return
                # value) is deliberately recorded as a flag, not propagated.
                try:
                    new_features = self.extract(cobol_source)
                    # Merge the fresh features while keeping the backtrack
                    # status and the contradictions resolved so far.
                    preserved = {
                        k: features[k] for k in self._STATE_KEYS if k in features
                    }
                    features.update(new_features)
                    features.update(preserved)
                except Exception:
                    features["backtrack_extract_error"] = True
                    break
        else:
            # Loop ended without a break: max_rounds exhausted, mark as degraded.
            features["backtrack_degraded"] = True

        features["backtrack_elapsed"] = round(time.monotonic() - start, 3)
        return features
|