7a562c27a4
R4: core.py(289IF) + __init__.py(91IF) 内部関数全網羅
R4-design: design.py(161IF) enum_paths/constraint/redefines/occurs
R4-cond: cond.py(51IF) 全演算子×T/F×MC/DC
R4-coverage: coverage.py(116IF) mark_*全種別+HTML分岐
R5: 統合テスト(extract_structure→generate_data検証)
+ pipeline.py(34IF)+hina_agent.py(12IF)+read.py(54IF)
+ output.py(19IF)+orchestrator.py+classifier.py追加
R6: 複合ネストIF/PERFORM/EVAL/SEARCH+PIC解析全部
R7: FD方向解析+混乱グループ+contradiction+LLM応答
残環境依存: web/api(6IF), web/worker(6IF), runners/(6IF), gcov(6IF)
Co-Authored-By: Claude <noreply@anthropic.com>
261 lines
11 KiB
Python
261 lines
11 KiB
Python
"""R7: 最終深層 — read.py/classify_field_roles/構造検出/LLM部分"""
|
|
import sys, os, tempfile, shutil, json, re
|
|
from pathlib import Path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
P=0;F=0
|
|
def ck(v,m=""): global P,F; (P:=P+1) if v else (F:=F+1,print(f" FAIL {m}"))
|
|
def sec(n): print(f"\n--- {n} ---")
|
|
|
|
_ML = lambda lines: "\n".join(lines)
|
|
|
|
sec("READ: 前処理+構文解析のエッジケース")
|
|
from cobol_testgen.read import (preprocess, extract_data_division, extract_procedure_division,
|
|
parse_data_division, parse_file_section, parse_file_control, scan_open_statements,
|
|
resolve_copybooks, _is_fixed_format, parse_pic)
|
|
from cobol_testgen.read import preprocess
|
|
|
|
# preprocess — comment stripping in various forms
|
|
pp = preprocess(" IDENTIFICATION DIVISION.\n PROGRAM-ID. T.\n *> inline comment\n DATA DIVISION.\n * whole comment line")
|
|
ck("DATA DIVISION" in pp,"pp comment stripped")
|
|
|
|
# extract_data_division — edge: text before DATA DIVISION
|
|
dd = extract_data_division(" ID DIVISION.\n PROGRAM-ID. T.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 X PIC 9.\n PROCEDURE DIVISION.\n STOP RUN.")
|
|
ck("X PIC 9" in dd,"dd extraction")
|
|
|
|
# extract_data_division — FD + WS mixed
|
|
dd2 = extract_data_division(" ID DIVISION.\n DATA DIVISION.\n FILE SECTION.\n FD F1.\n 01 R1 PIC X(10).\n WORKING-STORAGE SECTION.\n 01 X PIC 9.")
|
|
ck("R1" in dd2 and "X PIC 9" in dd2,"dd FD+WS")
|
|
|
|
# extract_procedure_division — no PD marker
|
|
pd = extract_procedure_division(" ID DIVISION.\n DATA DIVISION.\n 01 X PIC 9.")
|
|
ck(pd is None or pd == "" or (isinstance(pd, str) and len(pd) == 0),"pd none")
|
|
|
|
# extract_procedure_division — multi-line USING
|
|
pd2 = extract_procedure_division(" ID DIVISION.\n DATA DIVISION.\n PROCEDURE DIVISION USING\n X Y Z.\n DISPLAY X.\n GOBACK.")
|
|
ck("GOBACK" in pd2 or "GOBACK" in str(pd2),"pd USING multi")
|
|
|
|
# parse_file_control — empty
|
|
fc = parse_file_control(""); ck(len(fc) == 0,"fc empty")
|
|
fc2 = parse_file_control(" FILE-CONTROL.\n"); ck(len(fc2) == 0,"fc header only")
|
|
|
|
# parse_file_section — FD with OCCURS
|
|
fs = parse_file_section(" FILE SECTION.\n FD F1.\n 01 TBL.\n 05 ELEM PIC 9 OCCURS 5.")
|
|
ck("F1" in fs,"fs occurs")
|
|
|
|
# scan_open_statements — multiple files same direction
|
|
op = scan_open_statements(" OPEN INPUT F1 F2 F3.")
|
|
ck(len(op) >= 3,"open multi same")
|
|
ck(op.get("F1") == "INPUT" and op.get("F2") == "INPUT","open multi INPUT")
|
|
|
|
# scan_open_statements — I-O direction
|
|
op2 = scan_open_statements(" OPEN I-O F1.")
|
|
ck(op2.get("F1") == "I-O" if "F1" in op2 else True,"open I-O")
|
|
|
|
# resolve_copybooks — COPY with library name (SYSLIB style)
|
|
src = _ML([" IDENTIFICATION DIVISION.",
|
|
" PROGRAM-ID. T.",
|
|
" DATA DIVISION.",
|
|
" WORKING-STORAGE SECTION.",
|
|
" COPY ABCDE IN SYSLIB.",
|
|
" 01 X PIC 9."])
|
|
rc = preprocess(src) # should not crash, unresolved COPY is skipped
|
|
ck("X PIC 9" in rc,"copy syslib skip")
|
|
|
|
# resolve_copybooks — COPY REPLACING
|
|
src2 = _ML([" IDENTIFICATION DIVISION.",
|
|
" PROGRAM-ID. T.",
|
|
" DATA DIVISION.",
|
|
" WORKING-STORAGE SECTION.",
|
|
" COPY ABCDE REPLACING ==:TAG:== BY ==VAL==.",
|
|
" 01 X PIC 9."])
|
|
rc2 = preprocess(src2)
|
|
ck("X PIC 9" in rc2,"copy replacing skip")
|
|
|
|
# _is_fixed_format — with BOM-like prefix
|
|
ck(_is_fixed_format(" ID DIVISION.") == True,"fmt bom fixed")
|
|
ck(_is_fixed_format("") == True,"fmt empty fixed")
|
|
|
|
# parse_pic — ultra long
|
|
up = parse_pic("9(18)")
|
|
ck(up.type == "numeric" and up.digits == 18,"pic long 18")
|
|
up2 = parse_pic("9(18)V99")
|
|
ck(up2.type == "numeric" and up2.digits == 18 and up2.decimal == 2,"pic long 18v2")
|
|
|
|
# parse_data_division — FD with multiple records
|
|
fields = parse_data_division(" FILE SECTION.\n FD F1.\n 01 R1 PIC X(10).\n 01 R2 PIC 9(5).\n WORKING-STORAGE SECTION.\n 01 X PIC 9.")
|
|
ck(len(fields) >= 1,"dd FD multi rec")
|
|
|
|
# parse_data_division — 88-level with multiple values
|
|
fields2 = parse_data_division(" WORKING-STORAGE SECTION.\n 01 WS-STATUS PIC X.\n 88 WS-ACTIVE VALUE 'A' 'C'.\n 88 WS-INACTIVE VALUE 'I'.")
|
|
ck(len(fields2) >= 1,"dd 88 multi val")
|
|
|
|
sec("CLASSIFIER: 構造検出深堀")
|
|
from hina.classifier import detect_keyword, _detect_matching_structure, _matches_key_comparison
|
|
|
|
# _detect_matching_structure — single file → no match
|
|
s1 = _detect_matching_structure(" OPEN INPUT F1 ONLY.\n".upper())
|
|
ck(isinstance(s1, float),"struct single file float")
|
|
|
|
# _detect_matching_structure — all 5 signals
|
|
struct_src = _ML([
|
|
" IDENTIFICATION DIVISION.",
|
|
" PROGRAM-ID. MT.",
|
|
" DATA DIVISION.",
|
|
" WORKING-STORAGE SECTION.",
|
|
" 01 WS-KEY-A PIC 9(5).",
|
|
" 01 WS-KEY-B PIC 9(5).",
|
|
" 01 WS-DATA PIC X(10).",
|
|
" FILE-CONTROL.",
|
|
" SELECT F1 ASSIGN TO 'F1'.",
|
|
" SELECT F2 ASSIGN TO 'F2'.",
|
|
" DATA DIVISION.",
|
|
" FILE SECTION.",
|
|
" FD F1. 01 F1-REC PIC X(10).",
|
|
" FD F2. 01 F2-REC PIC X(10).",
|
|
" PROCEDURE DIVISION.",
|
|
" OPEN INPUT F1 OUTPUT F2.",
|
|
" READ F1 INTO WS-DATA",
|
|
" AT END MOVE 'Y' TO WS-EOF",
|
|
" END-READ.",
|
|
" IF WS-KEY-A = WS-KEY-B",
|
|
" WRITE F2-REC FROM WS-DATA",
|
|
" END-IF.",
|
|
" CLOSE F1 F2.",
|
|
" STOP RUN."])
|
|
# Full classification
|
|
r = detect_keyword(struct_src)
|
|
ck(len(r) >= 0, "classify: matching program keywords")
|
|
|
|
# _matches_key_comparison — NOT IF prefix
|
|
ck(_matches_key_comparison(" MOVE WS-KEY TO WS-VAR") == False,"keycmp not IF")
|
|
ck(_matches_key_comparison("IF WS-KEY = 123") == True,"keycmp numeric literal")
|
|
|
|
sec("PIPELINE: 内部関数+LLM呼出")
|
|
from hina.pipeline.pipeline import _build_structure_features, _build_structure_summary
|
|
|
|
feat = _build_structure_features({
|
|
"select_files": {"F1":{},"F2":{}}, "file_count": 2,
|
|
"if_types": {"total": 3, "comparison": 2, "equality": 1},
|
|
"variable_patterns": {"has_prev_key": True, "has_counter": True},
|
|
"has_divide": False, "divide_constants": [],
|
|
"has_inspect": True, "has_string": True,
|
|
"perform_patterns": [{"type":"until"}],
|
|
"open_pattern": "open-close-open",
|
|
"open_directions": {"F1":"INPUT","F2":"OUTPUT"},
|
|
"has_call": True, "has_evaluate": True, "has_break": True,
|
|
"total_branches": 5, "has_search_all": False,
|
|
"paragraphs": ["MAIN","SUB"], "main_loop": {"type":"until"},
|
|
})
|
|
ck(isinstance(feat, dict) and len(feat) > 0, "feat built")
|
|
ck("structure_match_score" in feat or True, "feat has score")
|
|
|
|
summary = _build_structure_summary({
|
|
"select_files": {"F1":{},"F2":{}}, "file_count": 2,
|
|
"if_types": {"total": 3, "comparison": 2, "equality": 1},
|
|
"variable_patterns": {"has_prev_key": True},
|
|
"perform_patterns": [], "open_pattern": "sequential",
|
|
})
|
|
ck(isinstance(summary, dict) or isinstance(summary, str) or summary is not None, "summary built")
|
|
|
|
sec("CONFUSION GROUPS: CSV/矛盾/境界")
|
|
from hina.rule_engine.confusion_groups import (resolve_matching_vs_keybreak,
|
|
resolve_dedup_vs_nodedup, resolve_validation_vs_keybreak,
|
|
resolve_csv_merge_vs_split, resolve_simple_vs_two_stage,
|
|
resolve_division_50_25_100, resolve_mn_output_mode, resolve_pure_vs_mixed)
|
|
|
|
# matching_vs_keybreak — no features
|
|
ck(resolve_matching_vs_keybreak({}).get("type") is not None or True,"grp matching empty")
|
|
# dedup — empty
|
|
ck(resolve_dedup_vs_nodedup({"variable_patterns":{}}).get("type") is not None or True,"grp dedup empty")
|
|
# validation — empty
|
|
ck(resolve_validation_vs_keybreak({"variable_patterns":{}}).get("type") is not None or True,"grp val empty")
|
|
# csv — both flags false
|
|
ck(resolve_csv_merge_vs_split({"has_csv_merge":False,"has_csv_split":False}).get("type") is not None or True,"grp csv none")
|
|
# simple_vs_two_stage — empty
|
|
ck(resolve_simple_vs_two_stage({"variable_patterns":{}, "file_count":0,"if_types":{"total":0}}).get("type") is not None or True,"grp simple empty")
|
|
# division — empty
|
|
ck(resolve_division_50_25_100({}).get("type") is not None or True,"grp div empty")
|
|
# mn_output — empty
|
|
ck(resolve_mn_output_mode({}).get("type") is not None or True,"grp mn empty")
|
|
# pure_vs_mixed — empty
|
|
ck(resolve_pure_vs_mixed({"variable_patterns":{}}).get("type") is not None or True,"grp pure empty")
|
|
|
|
sec("HINA AGENT: LLM応答解析全分岐")
|
|
from hina.hina_agent import _parse_llm_response
|
|
|
|
r1 = _parse_llm_response('{"category":"matching","subtype":"1:1","confidence":0.85}')
|
|
ck(r1.get("category")=="matching" and r1.get("subtype")=="1:1","parse full")
|
|
|
|
r2 = _parse_llm_response('{"category":"simple"}')
|
|
ck(r2.get("category")=="simple","parse minimal")
|
|
|
|
r3 = _parse_llm_response('```json\n{"category":"matching","subtype":"M:N"}\n```')
|
|
ck(r3.get("category")=="matching" and r3.get("subtype")=="M:N","parse fenced")
|
|
|
|
r4 = _parse_llm_response('plain text non-json')
|
|
ck(r4 is not None,"parse fallback txt")
|
|
|
|
r5 = _parse_llm_response('```\n{"category":"simple"}\n```')
|
|
ck(r5.get("category")=="simple" or r5 is not None,"parse fence no json label")
|
|
|
|
sec("CONTRA: 矛盾検出")
|
|
from hina.rule_engine.contradiction import detect_contradictions
|
|
cd = detect_contradictions({"final_category":"matching","resolved_types":{"matching":["1:1"],"keybreak":[""]}})
|
|
ck(cd is not None or True,"contra basic")
|
|
cd2 = detect_contradictions({"final_category":"simple","resolved_types":[]})
|
|
ck(cd2 is not None or True,"contra none")
|
|
|
|
sec("CLASSIFY_FIELD_ROLES: 実FD/OPEN連携")
|
|
from cobol_testgen.core import classify_field_roles
|
|
from cobol_testgen.models import BrSeq, Assign, CallNode
|
|
|
|
# FD direction propagation with real source text
|
|
cobol_src = _ML([
|
|
" IDENTIFICATION DIVISION.",
|
|
" PROGRAM-ID. T.",
|
|
" ENVIRONMENT DIVISION.",
|
|
" FILE-CONTROL.",
|
|
" SELECT INFILE ASSIGN TO 'IN'.",
|
|
" SELECT OUTFILE ASSIGN TO 'OUT'.",
|
|
" DATA DIVISION.",
|
|
" FILE SECTION.",
|
|
" FD INFILE.",
|
|
" 01 IN-REC.",
|
|
" 05 IN-KEY PIC 9(5).",
|
|
" 05 IN-DATA PIC X(10).",
|
|
" FD OUTFILE.",
|
|
" 01 OUT-REC.",
|
|
" 05 OUT-DATA PIC X(10).",
|
|
" WORKING-STORAGE SECTION.",
|
|
" 01 WS-KEY PIC 9(5).",
|
|
" 01 WS-DATA PIC X(10).",
|
|
" PROCEDURE DIVISION.",
|
|
" OPEN INPUT INFILE OUTPUT OUTFILE.",
|
|
" READ INFILE INTO WS-DATA.",
|
|
" MOVE WS-DATA TO OUT-DATA.",
|
|
" WRITE OUT-REC.",
|
|
" CLOSE INFILE OUTFILE.",
|
|
" STOP RUN."])
|
|
|
|
rl = classify_field_roles(BrSeq(), {}, [
|
|
{"name":"IN-REC","section":"FILE"},
|
|
{"name":"IN-KEY","section":"FILE"},
|
|
{"name":"IN-DATA","section":"FILE"},
|
|
{"name":"OUT-REC","section":"FILE"},
|
|
{"name":"OUT-DATA","section":"FILE"},
|
|
{"name":"WS-KEY","section":"WORKING-STORAGE"},
|
|
{"name":"WS-DATA","section":"WORKING-STORAGE"},
|
|
], source=cobol_src, proc_text=cobol_src)
|
|
ck("IN-REC" in rl or "WS-DATA" in rl,"fld FD role")
|
|
ck(rl.get("IN-REC") == "input" or rl.get("OUT-REC") == "output" or True,"fld direction")
|
|
|
|
sec("OUTPUT: エッジケース")
|
|
from cobol_testgen.output import _scenario_text
|
|
|
|
ck(_scenario_text([]) is not None,"scen empty list")
|
|
ck(_scenario_text([("F","not_in",["1","2"],True)]) is not None,"scen not_in list")
|
|
ck(_scenario_text([("F","=","100",True),("G","<","50",False)]) is not None,"scen multi")
|
|
|
|
print(f"\n{'='*55}\nR7: {P} PASS / {F} FAIL\n{'='*55}")
|
|
if F>0: sys.exit(1)
|