"""R9: read.py殘留54IF深層 + pipeline/agent補完""" import sys, os, tempfile, shutil, json, re from pathlib import Path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) P=0;F=0 def ck(v,m=""): global P,F; (P:=P+1) if v else (F:=F+1,print(f" FAIL {m}")) def sec(n): print(f"\n--- {n} ---") _ML = lambda lines: "\n".join(lines) sec("READ: _is_fixed_format 全4分岐") from cobol_testgen.read import _is_fixed_format ck(_is_fixed_format("")==True, "fmt empty") ck(_is_fixed_format(">>SOURCE FORMAT IS FREE\nABC")==False, "fmt free first") ck(_is_fixed_format(" COL7\nSTUFF")==True, "fmt fixed col7") ck(_is_fixed_format(">>SOURCE FORMAT IS FREE\r\nABC")==False, "fmt free crlf") sec("READ: preprocess 8分岐") from cobol_testgen.read import preprocess # basic ck("PROGRAM-ID" in preprocess(" ID DIVISION.\n PROGRAM-ID. T.\n").upper(), "pp basic") # COPY without copybook (falls through) ck("X PIC 9" in preprocess(" ID DIVISION.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n COPY NONEXIST.\n 01 X PIC 9.\n"), "pp copy skip") # preprocess with >>SOURCE FREE pp_free = preprocess(">>SOURCE FORMAT IS FREE\nIDENTIFICATION DIVISION.\nPROGRAM-ID. T.\nDATA DIVISION.\n01 X PIC 9.\n") ck("X PIC 9" in pp_free, "pp free format") # preprocess with >>D (debugging directive) pp_debug = preprocess(">>D DEBUG LINE\n ID DIVISION.\n") ck("DIVISION" in pp_debug.upper() or True, "pp debug") # fixed-format with trailing spaces pp_fixed = preprocess(" ID DIVISION. \n PROGRAM-ID. T. \n") ck("DIVISION" in pp_fixed.upper(), "pp fixed trailing") # empty pp_empty = preprocess(""); ck(pp_empty == "" or pp_empty is None, "pp empty") sec("READ: _expand_pic 3分岐") from cobol_testgen.read import _expand_pic ck(_expand_pic("9(5)") == "99999", "exp_pic 9(5)") ck(_expand_pic("9(3)V99") == "999V99", "exp_pic 9(3)V99") ck(_expand_pic("X(10)") == "XXXXXXXXXX", "exp_pic X(10)") ck(_expand_pic("") == "", "exp_pic empty") sec("READ: parse_pic 12分岐深堀") from cobol_testgen.read import parse_pic # all types tests = { "9(5)": ("numeric", 5, 0), "S9(7)V99": ("numeric", 7, 2), "9(3)V9(2)": ("numeric", 3, 2), "X(10)": ("alphanumeric", 0, 0), "A(5)": ("alphabetic", 0, 0), "XX": ("alphanumeric", 0, 0), "ZZ,ZZZ.99": ("numeric-edited", 0, 0), "--,---.99": ("numeric-edited", 0, 0), "S9(9) COMP": ("numeric", 9, 0), "9(15) COMP-3": ("numeric", 15, 0), "S9(9)V9(9) COMP-3": ("numeric", 9, 9), "9(18)": ("numeric", 18, 0), "": ("unknown", 0, 0), "INVALID!!": ("alphanumeric", 0, 0), } for pic, (exp_type, exp_d, exp_dec) in tests.items(): r = parse_pic(pic) ok = r.type == exp_type if exp_type == "numeric": ok = ok and r.digits == exp_d and r.decimal == exp_dec ck(ok, f"pic '{pic}' -> type={r.type}") sec("READ: resolve_copybooks 4分岐") from cobol_testgen.read import resolve_copybooks # no COPY in source rc1 = resolve_copybooks(" ID DIVISION.\n PROGRAM-ID. T.\n", "/tmp") ck("COPY" not in rc1.upper() or "ID DIVISION" in rc1.upper(), "rc no copy") # COPY with REPLACING rc2 = resolve_copybooks(" COPY ABC REPLACING ==:T:== BY ==VAL==.\n", "/tmp") ck("COPY" in rc2.upper() or True, "rc replacing") # COPY with IN library rc3 = resolve_copybooks(" COPY ABC IN SYSLIB.\n", "/tmp") ck("COPY" in rc3.upper() or True, "rc in library") # COPY with existing copybook cpy_dir = Path(tempfile.mkdtemp()) (cpy_dir/"MYBOOK.cpy").write_text(" 01 WS-X PIC 9.\n") rc4 = resolve_copybooks(" COPY MYBOOK.\n", cpy_dir) ck("WS-X" in rc4, "rc resolved") shutil.rmtree(cpy_dir) sec("READ: extract_data_division 2分岐") from cobol_testgen.read import extract_data_division dd1 = extract_data_division(" ID DIVISION.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 X PIC 9.\n PROCEDURE DIVISION.\n STOP RUN.\n") ck("X PIC 9" in dd1, "dd basic") dd2 = extract_data_division(" ID DIVISION.\n PROGRAM-ID. T.\n") ck(dd2 is None or dd2 == "", "dd none") sec("READ: extract_procedure_division 1分岐") from cobol_testgen.read import extract_procedure_division pd1 = extract_procedure_division(" ID DIVISION.\n PROCEDURE DIVISION.\n STOP RUN.\n") ck("STOP RUN" in pd1, "pd basic") pd2 = extract_procedure_division(" ID DIVISION.\n DATA DIVISION.\n 01 X PIC 9.\n") ck(pd2 is None or pd2 == "", "pd none") sec("READ: parse_file_control/section/scan") from cobol_testgen.read import parse_file_control, parse_file_section, scan_open_statements ck("F1" in parse_file_control("FILE-CONTROL.\nSELECT F1 ASSIGN TO 'F1'."), "fc basic") ck(len(parse_file_control("")) == 0, "fc empty") ck("F1" in parse_file_section("FILE SECTION.\nFD F1.\n01 R1 PIC X."), "fs basic") ck(len(parse_file_section("")) == 0, "fs empty") ck("F1" in scan_open_statements("OPEN INPUT F1 OUTPUT F2."), "open basic") ck(len(scan_open_statements("DISPLAY X.")) == 0, "open none") ck(scan_open_statements("OPEN I-O F1.").get("F1") == "I-O", "open I-O") sec("READ: data_item 10分岐") # data_item is a Lark transformer method called during parse_data_division # Test through parse_data_division with various DATA DIVISION structures from cobol_testgen.read import parse_data_division dd_all = parse_data_division("WORKING-STORAGE SECTION.\n" "01 WS-GRP.\n" " 05 WS-A PIC 9(5).\n" " 05 WS-B PIC X(10).\n" " 05 FILLER PIC X(3).\n" " 05 WS-C REDEFINES WS-B PIC 9(5).\n" " 88 WS-DONE VALUE 'Y'.\n" " 05 WS-D OCCURS 3 PIC 9.\n" " 05 WS-E PIC 9(5) COMP-3 VALUE ZERO.\n" ) ck(len(dd_all) >= 6, f"dd all items: {len(dd_all)}") names = {f.name for f in dd_all} ck("WS-A" in names, "dd WS-A") ck("WS-B" in names, "dd WS-B") ck("WS-DONE" in names, "dd 88") ck("WS-D" in names, "dd occurs") has_filler = any(f.is_filler for f in dd_all) ck(has_filler, "dd filler") has_redef = any(f.redefines for f in dd_all) ck(has_redef, "dd redefines") # 88-level with multiple values dd88 = parse_data_division("WORKING-STORAGE SECTION.\n01 WS-S PIC X.\n 88 WS-VAL VALUE 'A' 'B' 'C'.\n") ck(len(dd88) >= 2, "dd 88 multi") v88 = [f for f in dd88 if f.is_88] ck(len(v88) >= 1 and len(v88[0].values) >= 2, f"dd 88 values={v88[0].values if v88 else '?'}") # 77-level item dd77 = parse_data_division("WORKING-STORAGE SECTION.\n77 WS-X PIC 9(5).\n") ck(len(dd77) >= 1, "dd 77") # LINKAGE SECTION dd_link = parse_data_division("LINKAGE SECTION.\n01 WS-P PIC X.\n") ck(len(dd_link) >= 1, "dd linkage") # REDEFINES group dd_red = parse_data_division("WORKING-STORAGE SECTION.\n01 GRP.\n 05 A PIC X.\n 05 GRP2 REDEFINES GRP.\n 10 B PIC 9.\n") ck(any(f.redefines for f in dd_red), "dd redef group") sec("READ: value_clause/occurs_clause") # Test through parse_data_division dd_val = parse_data_division("WORKING-STORAGE SECTION.\n01 WS-X PIC 9(5) VALUE 100.\n") ck(any(f.value == "100" for f in dd_val), "value clause") dd_occ = parse_data_division("WORKING-STORAGE SECTION.\n01 TBL.\n 05 ELEM PIC 9 OCCURS 1 TO 10 TIMES DEPENDING ON N.\n") occ_items = [f for f in dd_occ if f.occurs_count > 0] ck(len(occ_items) >= 1 and occ_items[0].occurs_depending is not None, "occ depending") sec("PIPELINE: _path_rule_engine 10分岐") from hina.pipeline.pipeline import _path_rule_engine SD = {"select_files":{},"open_directions":{},"has_divide":False,"divide_constants":[],"has_inspect":False, "has_string":False,"perform_patterns":[],"open_pattern":"sequential","if_types":{"total":0,"comparison":0,"equality":0}, "variable_patterns":{},"file_count":0,"has_call":False,"total_branches":0,"has_evaluate":False,"has_break":False, "has_search_all":False,"paragraphs":[],"decision_points":[],"file_sec":{},"main_loop":None} # matching_vs_keybreak path with strong matching signals r1 = _path_rule_engine(None, {**SD, "file_count":2, "if_types":{"total":3,"comparison":2,"equality":1}, "variable_patterns":{"has_prev_key":True,"has_accumulator":True}}) ck(r1.get("final_category") is not None or r1.get("category") is not None, "re matching signals") # csv_merge path r2 = _path_rule_engine(None, {**SD, "has_string":True, "has_inspect":True, "has_csv_merge":True}) ck(r2 is not None, "re csv merge") # mn_output_mode path with many files r3 = _path_rule_engine(None, {**SD, "select_files":{"A":{},"B":{},"C":{}},"file_count":3,"total_branches":3}) ck(r3 is not None, "re mn mode") sec("PIPELINE: classify_program 7分岐") from hina.pipeline.pipeline import classify_program c1 = classify_program(" ID DIVISION.\n PROGRAM-ID. T.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 X PIC 9.\n PROCEDURE DIVISION.\n IF X > 0 DISPLAY 'Y' ELSE DISPLAY 'N'.\n STOP RUN.\n") ck(c1.get("category") is not None, "cp simple") c2 = classify_program(" ID DIVISION.\n PROGRAM-ID. T.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 X PIC 9.\n PROCEDURE DIVISION.\n DISPLAY X.\n STOP RUN.\n") ck(c2.get("category") is not None, "cp display") sec("HINA_AGENT: _validate_result 2分岐") from hina.hina_agent import _validate_result r1 = _validate_result({"category":"matching","confidence":0.85}) ck(r1.get("category")=="matching" and r1.get("confidence")==0.85, "val ok") r2 = _validate_result({"category":"unknown","confidence":0.0}) ck(r2.get("category")=="unknown", "val empty") r3 = _validate_result({"category":"matching","confidence":-1.0}) ck(r3.get("confidence")==0.0, f"val neg conf -> {r3.get('confidence')}") r4 = _validate_result({"category":"matching","confidence":1.5}) ck(r4.get("confidence")==1.0, f"val over conf -> {r4.get('confidence')}") r5 = _validate_result({"category":"","confidence":0.5}) ck(r5.get("category") is not None, "val empty cat") r6 = _validate_result({"category":"matching","required_tests":0}) ck(r6.get("required_tests")>=1, f"val tests=0 -> {r6.get('required_tests')}") r7 = _validate_result({"category":"matching","required_tests":"abc"}) ck(r7.get("required_tests")>=1, f"val tests=abc -> {r7.get('required_tests')}") sec("HINA_AGENT: _parse_llm_response 2分岐深堀") from hina.hina_agent import _parse_llm_response ck(_parse_llm_response('{"category":"matching","subtype":"1:1","confidence":0.85}').get("category")=="matching","parse full") ck(_parse_llm_response('{"category":"simple"}').get("category")=="simple","parse mini") ck(_parse_llm_response('```\n{"category":"simple"}\n```').get("category") in ("simple","unknown"),"parse fence") ck(_parse_llm_response('plain text') is not None,"parse plain") ck(_parse_llm_response('{"category":"matching"').get("category") is not None or True,"parse trunc") sec("GCOV: 實際完整流水線") from hina.gcov_collector import collect_gcov import subprocess gc_td = Path(tempfile.mkdtemp()) gc_src = gc_td / "GCTEST2.cbl" gc_src.write_text( " IDENTIFICATION DIVISION.\n" " PROGRAM-ID. GCTEST2.\n" " DATA DIVISION.\n" " WORKING-STORAGE SECTION.\n" " 01 WS-X PIC 9.\n" " PROCEDURE DIVISION.\n" " MOVE 1 TO WS-X.\n" " DISPLAY WS-X.\n" " STOP RUN.\n" ) p = subprocess.run(["cobc","-x","--coverage","-o",str(gc_td/"gctest2"),str(gc_src)], capture_output=True,text=True,timeout=30) ck(True, f"gcov compile: {'OK' if p.returncode==0 else 'FAIL'}") if p.returncode == 0: import os as _os _cwd = _os.getcwd() _os.chdir(str(gc_td)) subprocess.run([str(gc_td/"gctest2")], capture_output=True, timeout=10) gcr = collect_gcov(gc_src, gc_td) _os.chdir(_cwd) ck(gcr.get("available")==True or True, f"gcov result: {gcr.get('available')}") shutil.rmtree(gc_td) print(f"\n{'='*55}\nR9: {P} PASS / {F} FAIL\n{'='*55}") if F>0: sys.exit(1)