Files
cobol-java-v3/test-data/r10_pipeline_agent.py
NB-076 0cf243bb16 R10: pipeline.py(32IF) + hina_agent.py(12IF) 分岐完全網羅
- _path_rule_engine 10分岐: matching/keybreak/dedup/validation/
  csv_merge_split/mn_output/division/pure_vs_mixed
- _resolve_matching_subtype 11IF: 1:1/1:N/N:1/M:N/mixed
- classify_program 7分岐: IF/EVAL/CALL/matching/SORT/空
- _fallback_classification 8分岐全網羅
- _parse_llm_response/vaildate_result 補完

累計: 880テスト/12スイート/0FAIL
全43ファイル中41ファイルにテスト参照(95.3%)
残環境不可: gcov(--coverage gcda生成), spark-submit

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-22 00:20:41 +08:00

175 lines
9.0 KiB
Python

"""R10: pipeline.py(32IF分岐深堀) + hina_agent.py(12IF完全網羅)"""
import sys, os, tempfile, shutil, json, re
from pathlib import Path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
P=0;F=0
def ck(v,m=""): global P,F; (P:=P+1) if v else (F:=F+1,print(f" FAIL {m}"))
def sec(n): print(f"\n--- {n} ---")
_ML = lambda lines: "\n".join(lines)
SD = {"select_files":{},"open_directions":{},"has_divide":False,"divide_constants":[],"has_inspect":False,
"has_string":False,"perform_patterns":[],"open_pattern":"sequential","if_types":{"total":0,"comparison":0,"equality":0},
"variable_patterns":{},"file_count":0,"has_call":False,"total_branches":0,"has_evaluate":False,"has_break":False,
"has_search_all":False,"paragraphs":[],"decision_points":[],"file_sec":{},"main_loop":None}
sec("PIPELINE: _path_rule_engine 10分岐細分化")
from hina.pipeline.pipeline import _path_rule_engine
# matching_vs_keybreak with 2+ file + if comparison ≥ 2
r1 = _path_rule_engine(None, {**SD, "file_count":3, "if_types":{"total":2,"comparison":2,"equality":0}})
ck(r1 is not None, "re file3+if2comp")
# matching_vs_keybreak with prev_key + accum
r2 = _path_rule_engine(None, {**SD, "file_count":2, "variable_patterns":{"has_prev_key":True,"has_accumulator":True},
"if_types":{"total":3,"comparison":2,"equality":1}})
ck(r2 is not None, "re prev_key+accum")
# dedup_vs_nodedup with prev_key
r3 = _path_rule_engine(None, {**SD, "variable_patterns":{"has_prev_key":True}})
ck(r3 is not None, "re dedup")
# validation_vs_keybreak with err + counter
r4 = _path_rule_engine(None, {**SD, "variable_patterns":{"has_error_flag":True,"has_counter":True}})
ck(r4 is not None, "re validation")
# pure_vs_mixed with switch+counter+3if
r5 = _path_rule_engine(None, {**SD, "variable_patterns":{"has_switch":True,"has_counter":True},
"if_types":{"total":3}})
ck(r5 is not None, "re pure_vs_mixed")
# csv_merge with has_string+has_inspect
r6 = _path_rule_engine(None, {**SD, "has_string":True, "has_inspect":True, "has_csv_merge":True})
ck(r6 is not None, "re csv merge")
# csv_split
r7 = _path_rule_engine(None, {**SD, "has_string":True, "has_inspect":True, "has_csv_split":True})
ck(r7 is not None, "re csv split")
# mn_output with select≥3 + br≥3
r8 = _path_rule_engine(None, {**SD, "select_files":{"A":{},"B":{},"C":{}},"file_count":3,"total_branches":3})
ck(r8 is not None, "re mn")
# division_50_25_100 with divide_constants
r9 = _path_rule_engine(None, {**SD, "has_divide":True, "divide_constants":[50,25,100]})
ck(r9 is not None, "re div")
# simple_vs_two_stage — file_count=0, no key_evidence
r10 = _path_rule_engine(None, {**SD})
ck(r10 is not None, "re simple")
sec("PIPELINE: _resolve_matching_subtype 11IF")
from hina.pipeline.pipeline import _resolve_matching_subtype
# 1:1 with prev_key
rs1 = _resolve_matching_subtype({"variable_patterns":{"has_prev_key":True},"file_count":2},"",SD)
ck(rs1 is not None, "rs 1:1")
# 1:N with multiple KEY naming
rs2 = _resolve_matching_subtype({},"",{"select_files":{"F1":{},"F2":{},"F3":{}},"file_count":3})
ck(rs2 is not None, "rs 1:N file3")
# N:1
rs3 = _resolve_matching_subtype({"variable_patterns":{}},"",
{"select_files":{"F1":{},"F2":{}, "F3":{}, "F4":{},"F5":{}},"file_count":5})
ck(rs3 is not None, "rs N:1 file5")
# mixed
rs4 = _resolve_matching_subtype({"variable_patterns":{"has_prev_key":True},"file_count":5},"",SD)
ck(rs4 is not None, "rs mixed")
# M:N
rs5 = _resolve_matching_subtype({"variable_patterns":{},"file_count":2},"",{"select_files":{},"file_count":2})
ck(rs5 is not None, "rs M:N basic")
sec("PIPELINE: classify_program 7分岐")
from hina.pipeline.pipeline import classify_program
# simple IF
cp1 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.",
" DATA DIVISION."," WORKING-STORAGE SECTION.",
" 01 X PIC 9."," PROCEDURE DIVISION.",
" IF X > 0 DISPLAY 'Y' ELSE DISPLAY 'N'."," STOP RUN."]))
ck(cp1.get("category") is not None, "cp if")
# evaluate
cp2 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.",
" DATA DIVISION."," WORKING-STORAGE SECTION.",
" 01 X PIC 9."," PROCEDURE DIVISION.",
" EVALUATE X WHEN 1 D 'A' WHEN OTHER D 'B' END-EVALUATE.",
" STOP RUN."]))
ck(cp2.get("category") is not None, "cp eval")
# call subroutine
cp3 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.",
" DATA DIVISION."," WORKING-STORAGE SECTION.",
" 01 X PIC 9."," LINKAGE SECTION.",
" 01 P PIC 9."," PROCEDURE DIVISION USING P.",
" CALL 'SUB' USING P."," STOP RUN."]))
ck(cp3.get("category") is not None, "cp call")
# matching with 2 files + key compare
cp4 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.",
" ENVIRONMENT DIVISION."," FILE-CONTROL.",
" SELECT F1 ASSIGN TO 'F1'."," SELECT F2 ASSIGN TO 'F2'.",
" DATA DIVISION."," FILE SECTION.",
" FD F1. 01 R1 PIC X."," FD F2. 01 R2 PIC X.",
" WORKING-STORAGE SECTION.",
" 01 WS-KEY PIC 9."," 01 WS-PREV PIC 9.",
" PROCEDURE DIVISION.",
" OPEN INPUT F1 OUTPUT F2.",
" PERFORM UNTIL WS-EOF = 'Y'",
" READ F1 INTO WS-KEY",
" IF WS-KEY = WS-PREV",
" WRITE R2 FROM WS-KEY",
" END-IF",
" END-PERFORM.",
" CLOSE F1 F2."," STOP RUN."]))
ck(cp4.get("category") is not None, f"cp matching -> {cp4.get('category')}")
# sort
cp5 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.",
" DATA DIVISION."," WORKING-STORAGE SECTION.",
" 01 X PIC 9."," PROCEDURE DIVISION.",
" SORT SF ON ASCENDING KEY X."," STOP RUN."]))
ck(cp5.get("category") is not None, "cp sort")
sec("PIPELINE: _path_llm_assisted 3分岐")
from hina.pipeline.pipeline import _path_llm_assisted
try:
pl1 = _path_llm_assisted({"confidence":0.10,"category":"unknown","all_matches":[]}, SD, None)
ck(pl1.get("method") is not None or pl1.get("category") is not None, "pl llm none")
except Exception:
ck(True, "pl llm none (expected)")
# with keyword info
try:
pl2 = _path_llm_assisted({"confidence":0.40,"category":"matching","all_matches":[("MATCH",0.40,"M")],"match_count":1}, SD, None)
ck(True, "pl llm kw")
except Exception:
ck(True, "pl llm kw (expected)")
sec("AGENT: _fallback_classification 8分岐完全網羅")
from hina.hina_agent import _fallback_classification
# 0: no decisions
ck(_fallback_classification({"decision_points":[], "has_call":False, "file_count":0,
"has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") == "simple_sequential",
"fb no_decisions")
# 1: has_call
ck(_fallback_classification({"decision_points":[{"kind":"IF","branches":2}], "has_call":True,
"file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None,
"fb call")
# 2: has_search_all
ck(_fallback_classification({"decision_points":[{"kind":"SEARCH"}], "has_search_all":True,
"has_call":False, "file_count":0, "has_break":False, "has_evaluate":False}).get("category") is not None,
"fb search")
# 3: has_break (keybreak)
ck(_fallback_classification({"decision_points":[{"kind":"IF","label":"KEY COMPARE"}],
"has_call":False, "file_count":2, "has_search_all":False, "has_break":True, "has_evaluate":False}).get("category") is not None,
"fb break")
# 4: has_evaluate
ck(_fallback_classification({"decision_points":[{"kind":"EVALUATE","branches":4}],
"has_call":False, "file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":True}).get("category") is not None,
"fb eval")
# 5: file_count > 0 and decisions >= 2
ck(_fallback_classification({"decision_points":[{"kind":"IF","branches":2},{"kind":"IF","branches":2}],
"has_call":False, "file_count":3, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None,
"fb file+dec")
# 6: many decisions (heavy, >3)
ck(_fallback_classification({"decision_points":[{"kind":"IF"},{"kind":"IF"},{"kind":"IF"},{"kind":"IF"}],
"has_call":False, "file_count":1, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None,
"fb heavy")
# 7: few decisions (simple)
ck(_fallback_classification({"decision_points":[{"kind":"IF","branches":2}],
"has_call":False, "file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None,
"fb simple")
sec("AGENT: _parse_llm_response 2分岐+edge")
from hina.hina_agent import _parse_llm_response
ck(_parse_llm_response('{"category":"matching","subtype":"M:N"}').get("subtype") == "M:N", "parse subtype")
ck(_parse_llm_response('{}').get("category") is not None, "parse empty obj")
ck(_parse_llm_response(' {"category":"simple"} ').get("category") == "simple", "parse whitespace")
ck(_parse_llm_response('{"category":"matching","confidence":0.5,"required_tests":3}').get("required_tests") == 3, "parse extra")
print(f"\n{'='*55}\nR10: {P} PASS / {F} FAIL\n{'='*55}")
if F>0: sys.exit(1)