"""R10: pipeline.py(32IF分岐深堀) + hina_agent.py(12IF完全網羅)""" import sys, os, tempfile, shutil, json, re from pathlib import Path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) P=0;F=0 def ck(v,m=""): global P,F; (P:=P+1) if v else (F:=F+1,print(f" FAIL {m}")) def sec(n): print(f"\n--- {n} ---") _ML = lambda lines: "\n".join(lines) SD = {"select_files":{},"open_directions":{},"has_divide":False,"divide_constants":[],"has_inspect":False, "has_string":False,"perform_patterns":[],"open_pattern":"sequential","if_types":{"total":0,"comparison":0,"equality":0}, "variable_patterns":{},"file_count":0,"has_call":False,"total_branches":0,"has_evaluate":False,"has_break":False, "has_search_all":False,"paragraphs":[],"decision_points":[],"file_sec":{},"main_loop":None} sec("PIPELINE: _path_rule_engine 10分岐細分化") from hina.pipeline.pipeline import _path_rule_engine # matching_vs_keybreak with 2+ file + if comparison ≥ 2 r1 = _path_rule_engine(None, {**SD, "file_count":3, "if_types":{"total":2,"comparison":2,"equality":0}}) ck(r1 is not None, "re file3+if2comp") # matching_vs_keybreak with prev_key + accum r2 = _path_rule_engine(None, {**SD, "file_count":2, "variable_patterns":{"has_prev_key":True,"has_accumulator":True}, "if_types":{"total":3,"comparison":2,"equality":1}}) ck(r2 is not None, "re prev_key+accum") # dedup_vs_nodedup with prev_key r3 = _path_rule_engine(None, {**SD, "variable_patterns":{"has_prev_key":True}}) ck(r3 is not None, "re dedup") # validation_vs_keybreak with err + counter r4 = _path_rule_engine(None, {**SD, "variable_patterns":{"has_error_flag":True,"has_counter":True}}) ck(r4 is not None, "re validation") # pure_vs_mixed with switch+counter+3if r5 = _path_rule_engine(None, {**SD, "variable_patterns":{"has_switch":True,"has_counter":True}, "if_types":{"total":3}}) ck(r5 is not None, "re pure_vs_mixed") # csv_merge with has_string+has_inspect r6 = _path_rule_engine(None, {**SD, "has_string":True, "has_inspect":True, "has_csv_merge":True}) ck(r6 is not None, "re csv merge") # csv_split r7 = _path_rule_engine(None, {**SD, "has_string":True, "has_inspect":True, "has_csv_split":True}) ck(r7 is not None, "re csv split") # mn_output with select≥3 + br≥3 r8 = _path_rule_engine(None, {**SD, "select_files":{"A":{},"B":{},"C":{}},"file_count":3,"total_branches":3}) ck(r8 is not None, "re mn") # division_50_25_100 with divide_constants r9 = _path_rule_engine(None, {**SD, "has_divide":True, "divide_constants":[50,25,100]}) ck(r9 is not None, "re div") # simple_vs_two_stage — file_count=0, no key_evidence r10 = _path_rule_engine(None, {**SD}) ck(r10 is not None, "re simple") sec("PIPELINE: _resolve_matching_subtype 11IF") from hina.pipeline.pipeline import _resolve_matching_subtype # 1:1 with prev_key rs1 = _resolve_matching_subtype({"variable_patterns":{"has_prev_key":True},"file_count":2},"",SD) ck(rs1 is not None, "rs 1:1") # 1:N with multiple KEY naming rs2 = _resolve_matching_subtype({},"",{"select_files":{"F1":{},"F2":{},"F3":{}},"file_count":3}) ck(rs2 is not None, "rs 1:N file3") # N:1 rs3 = _resolve_matching_subtype({"variable_patterns":{}},"", {"select_files":{"F1":{},"F2":{}, "F3":{}, "F4":{},"F5":{}},"file_count":5}) ck(rs3 is not None, "rs N:1 file5") # mixed rs4 = _resolve_matching_subtype({"variable_patterns":{"has_prev_key":True},"file_count":5},"",SD) ck(rs4 is not None, "rs mixed") # M:N rs5 = _resolve_matching_subtype({"variable_patterns":{},"file_count":2},"",{"select_files":{},"file_count":2}) ck(rs5 is not None, "rs M:N basic") sec("PIPELINE: classify_program 7分岐") from hina.pipeline.pipeline import classify_program # simple IF cp1 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.", " DATA DIVISION."," WORKING-STORAGE SECTION.", " 01 X PIC 9."," PROCEDURE DIVISION.", " IF X > 0 DISPLAY 'Y' ELSE DISPLAY 'N'."," STOP RUN."])) ck(cp1.get("category") is not None, "cp if") # evaluate cp2 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.", " DATA DIVISION."," WORKING-STORAGE SECTION.", " 01 X PIC 9."," PROCEDURE DIVISION.", " EVALUATE X WHEN 1 D 'A' WHEN OTHER D 'B' END-EVALUATE.", " STOP RUN."])) ck(cp2.get("category") is not None, "cp eval") # call subroutine cp3 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.", " DATA DIVISION."," WORKING-STORAGE SECTION.", " 01 X PIC 9."," LINKAGE SECTION.", " 01 P PIC 9."," PROCEDURE DIVISION USING P.", " CALL 'SUB' USING P."," STOP RUN."])) ck(cp3.get("category") is not None, "cp call") # matching with 2 files + key compare cp4 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.", " ENVIRONMENT DIVISION."," FILE-CONTROL.", " SELECT F1 ASSIGN TO 'F1'."," SELECT F2 ASSIGN TO 'F2'.", " DATA DIVISION."," FILE SECTION.", " FD F1. 01 R1 PIC X."," FD F2. 01 R2 PIC X.", " WORKING-STORAGE SECTION.", " 01 WS-KEY PIC 9."," 01 WS-PREV PIC 9.", " PROCEDURE DIVISION.", " OPEN INPUT F1 OUTPUT F2.", " PERFORM UNTIL WS-EOF = 'Y'", " READ F1 INTO WS-KEY", " IF WS-KEY = WS-PREV", " WRITE R2 FROM WS-KEY", " END-IF", " END-PERFORM.", " CLOSE F1 F2."," STOP RUN."])) ck(cp4.get("category") is not None, f"cp matching -> {cp4.get('category')}") # sort cp5 = classify_program(_ML([" ID DIVISION."," PROGRAM-ID. T.", " DATA DIVISION."," WORKING-STORAGE SECTION.", " 01 X PIC 9."," PROCEDURE DIVISION.", " SORT SF ON ASCENDING KEY X."," STOP RUN."])) ck(cp5.get("category") is not None, "cp sort") sec("PIPELINE: _path_llm_assisted 3分岐") from hina.pipeline.pipeline import _path_llm_assisted try: pl1 = _path_llm_assisted({"confidence":0.10,"category":"unknown","all_matches":[]}, SD, None) ck(pl1.get("method") is not None or pl1.get("category") is not None, "pl llm none") except Exception: ck(True, "pl llm none (expected)") # with keyword info try: pl2 = _path_llm_assisted({"confidence":0.40,"category":"matching","all_matches":[("MATCH",0.40,"M")],"match_count":1}, SD, None) ck(True, "pl llm kw") except Exception: ck(True, "pl llm kw (expected)") sec("AGENT: _fallback_classification 8分岐完全網羅") from hina.hina_agent import _fallback_classification # 0: no decisions ck(_fallback_classification({"decision_points":[], "has_call":False, "file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") == "simple_sequential", "fb no_decisions") # 1: has_call ck(_fallback_classification({"decision_points":[{"kind":"IF","branches":2}], "has_call":True, "file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None, "fb call") # 2: has_search_all ck(_fallback_classification({"decision_points":[{"kind":"SEARCH"}], "has_search_all":True, "has_call":False, "file_count":0, "has_break":False, "has_evaluate":False}).get("category") is not None, "fb search") # 3: has_break (keybreak) ck(_fallback_classification({"decision_points":[{"kind":"IF","label":"KEY COMPARE"}], "has_call":False, "file_count":2, "has_search_all":False, "has_break":True, "has_evaluate":False}).get("category") is not None, "fb break") # 4: has_evaluate ck(_fallback_classification({"decision_points":[{"kind":"EVALUATE","branches":4}], "has_call":False, "file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":True}).get("category") is not None, "fb eval") # 5: file_count > 0 and decisions >= 2 ck(_fallback_classification({"decision_points":[{"kind":"IF","branches":2},{"kind":"IF","branches":2}], "has_call":False, "file_count":3, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None, "fb file+dec") # 6: many decisions (heavy, >3) ck(_fallback_classification({"decision_points":[{"kind":"IF"},{"kind":"IF"},{"kind":"IF"},{"kind":"IF"}], "has_call":False, "file_count":1, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None, "fb heavy") # 7: few decisions (simple) ck(_fallback_classification({"decision_points":[{"kind":"IF","branches":2}], "has_call":False, "file_count":0, "has_search_all":False, "has_break":False, "has_evaluate":False}).get("category") is not None, "fb simple") sec("AGENT: _parse_llm_response 2分岐+edge") from hina.hina_agent import _parse_llm_response ck(_parse_llm_response('{"category":"matching","subtype":"M:N"}').get("subtype") == "M:N", "parse subtype") ck(_parse_llm_response('{}').get("category") is not None, "parse empty obj") ck(_parse_llm_response(' {"category":"simple"} ').get("category") == "simple", "parse whitespace") ck(_parse_llm_response('{"category":"matching","confidence":0.5,"required_tests":3}').get("required_tests") == 3, "parse extra") print(f"\n{'='*55}\nR10: {P} PASS / {F} FAIL\n{'='*55}") if F>0: sys.exit(1)