diff --git a/cobol_testgen/read.py b/cobol_testgen/read.py
index 2c5d107..1e8367c 100644
--- a/cobol_testgen/read.py
+++ b/cobol_testgen/read.py
@@ -92,7 +92,7 @@ def extract_procedure_division(source: str) -> str:
 _COPYBOOK_EXTENSIONS = ['.cpy', '.cbl', '.cpb', '']
 
 
-def resolve_copybooks(source: str, source_dir: str) -> str:
+def resolve_copybooks(source: str, source_dir: str, _recursion_depth: int = 0) -> str:
     """Find COPY statements and replace with copybook content."""
     _RE_COPY = re.compile(
         r"^\s*COPY\s+(\w[\w-]*)(?:\s+REPLACING\s+(.+?))?\s*\.?\s*$",
@@ -113,7 +113,12 @@ def resolve_copybooks(source: str, source_dir: str) -> str:
                 found = p
                 break
         if found:
+            if _recursion_depth > 10:
+                logger.warning(f"COPY circular dependency detected for {name}, skipping")
+                continue
             cb = found.read_text(encoding='utf-8')
+            # Recursively resolve nested COPY inside the copybook
+            cb = resolve_copybooks(cb, source_dir, _recursion_depth + 1)
             if m.group(2):
                 pairs = _RE_PAIR.findall(m.group(2))
                 for old, new in pairs:
diff --git a/test-data/r16_vuln_review.py b/test-data/r16_vuln_review.py
new file mode 100644
index 0000000..07fd948
--- /dev/null
+++ b/test-data/r16_vuln_review.py
@@ -0,0 +1,226 @@
+"""R16: Expert vulnerability review — live probing for real bugs.
+
+Flat script: each "VULN#n" section probes one suspected weakness, records
+pass/fail through ck() and suspected defects through bug(), then a summary
+is printed and the process exits with status 1 if any check failed.
+"""
+import sys, os, glob, json, random, tempfile, shutil, time
+from pathlib import Path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+# Pass / fail counters and the list of (category, description, severity)
+# tuples collected by bug().
+P = 0
+F = 0
+BUGS = []
+
+
+def ck(v, m=""):
+    """Count one check: increment P if v is truthy, else increment F and print m."""
+    global P, F
+    if v:
+        P += 1
+    else:
+        F += 1
+        print(f" FAIL {m}")
+
+
+def sec(n):
+    """Print the banner for section n."""
+    print(f"\n--- {n} ---")
+
+
+def bug(cat, desc, sev):
+    """Record a suspected bug as a (category, description, severity) tuple."""
+    BUGS.append((cat, desc, sev))
+
+
+def ML(lines):
+    """Join an iterable of lines into one newline-separated string."""
+    return "\n".join(lines)
+
+
+from cobol_testgen import extract_structure, generate_data, expand_occurs
+from hina.pipeline.pipeline import classify_program
+from hina.classifier import detect_keyword
+
+sec("VULN#1: Non-deterministic output across runs")
+src = " ID DIVISION.\n PROGRAM-ID. T.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 A PIC 99.\n PROCEDURE DIVISION.\n IF A > 50 STOP RUN ELSE STOP RUN.\n STOP RUN.\n"
+results = []
+for _ in range(5):
+    r = generate_data(src, extract_structure(src))
+    results.append([rec.get("A","?") for rec in r])
+all_same = all(r == results[0] for r in results)
+if not all_same:
+    bug("DETERMINISM","generate_data produces different values across runs","HIGH")
+ck(all_same, "V1: deterministic across 5 runs")
+
+sec("VULN#2: Crash on edge COBOL features (ALTER/ENTRY)")
+for name, extsrc in [("ALTER","ALTER PARA1 TO PROCEED TO PARA2.\nPARA1.\nSTOP RUN.\nPARA2.\nSTOP RUN.\n"),
+                     ("ENTRY","ENTRY 'SUB'.\nSTOP RUN.\n")]:
+    s = " ID DIVISION.\n PROGRAM-ID. T.\n PROCEDURE DIVISION.\n"+extsrc
+    try:
+        st = extract_structure(s)
+        generate_data(s, st)
+        ck(True, f"V2: {name} OK")
+    except Exception as e:
+        bug("CRASH",f"extract_structure crashes on {name}: {str(e)[:50]}","HIGH")
+        ck(False, f"V2: {name} CRASH")
+
+sec("VULN#3: Large COBOL program (500 fields, 250 IFs)")
+big = " ID DIVISION.\n PROGRAM-ID. L.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n"
+big += "\n".join(f" 05 F{i:03d} PIC 9(5)." for i in range(500))
+big += "\n PROCEDURE DIVISION.\n"
+for i in range(0, 500, 2):
+    big += f" IF F{i:03d} > F{i+1:03d} D 'X' ELSE D 'Y'.\n"
+big += " STOP RUN.\n"
+t0=time.time()
+st=extract_structure(big)
+tt=time.time()-t0
+ck(tt<30, f"V3a: {tt:.1f}s for 500 fields/250 IFs")
+if tt>10: bug("PERF",f"Large program takes {tt:.1f}s","MEDIUM")
+t1=time.time()
+recs=generate_data(big,st)
+gt=time.time()-t1
+ck(len(recs)>0, f"V3b: {len(recs)} records")
+if gt>30: bug("PERF",f"generate_data takes {gt:.1f}s","HIGH")
+
+sec("VULN#4: Path explosion (10 IFs inside PERFORM UNTIL)")
+ls = " ID DIVISION.\n PROGRAM-ID. E.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n"
+ls += "\n".join(f" 01 F{i} PIC 9." for i in range(10))
+ls += "\n PROCEDURE DIVISION.\n PERFORM UNTIL F0 > 5\n"
+for i in range(10):
+    ls += f" IF F{i} > 5 D 'X' ELSE D 'Y' END-IF\n"
+ls += " END-PERFORM.\n STOP RUN.\n"
+st=extract_structure(ls)
+ck(st.get("total_branches",0)<10000, f"V4a: branches={st.get('total_branches')}")
+recs=generate_data(ls,st)
+ck(len(recs)<5000, f"V4b: {len(recs)} records (path explosion guard?)")
+if len(recs)>1000: bug("PERF",f"Path explosion: {len(recs)} records","HIGH")
+
+sec("VULN#5: Nested COPYBOOK resolution")
+from cobol_testgen.read import resolve_copybooks
+# TemporaryDirectory removes the fixture even if resolve_copybooks raises.
+with tempfile.TemporaryDirectory() as tmp:
+    cd=Path(tmp)
+    (cd/"L1.cpy").write_text(" COPY L2.\n 01 D PIC X.\n")
+    (cd/"L2.cpy").write_text(" 01 H PIC X(10).\n")
+    rc=resolve_copybooks(" COPY L1.\n",str(cd))
+ck("H" in rc, "V5a: nested COPY L2 resolved")
+ck("D" in rc, "V5b: L1 content preserved")
+if "H" not in rc:
+    bug("FUNCTIONAL","Nested COPY resolution fails: L1->L2 missing","HIGH")
+
+sec("VULN#6: Nested IF chain depth = good")
+from cobol_testgen.core import _BrParser
+bp=_BrParser(["IF X=1", "IF Y=2", "IF Z=3 D 'A' ELSE D 'B' END-IF", "ELSE D 'C' END-IF",
+              "ELSE D 'D' END-IF.","STOP RUN."])
+s=bp.parse_seq(terminators={"STOP RUN"})
+n=s.children[0]; d=1
+# Follow the chain of IF nodes nested as the first child of each ELSE branch.
+# NOTE(review): d starts at 1, so the d>=1 check below cannot fail; this probe
+# only shows that parsing the nested IFs does not raise.
+while hasattr(n,'false_seq') and n.false_seq and n.false_seq.children and isinstance(n.false_seq.children[0],type(n)):
+    d+=1; n=n.false_seq.children[0]
+ck(d>=1, f"V6: nested IF chain detected depth={d}")
+
+sec("VULN#7: Malformed JCL crash")
+from jcl.parser import parse_jcl
+with tempfile.TemporaryDirectory() as tmp:
+    jt=Path(tmp)
+    for nm,c in [("binary","\x00\x01\x02\x03"),("BOM","//JOB JOB\n"),("long","// X\n"*1000)]:
+        # Build the path once so the file written is the file parsed.
+        jf=jt/f"{nm}.jcl"
+        jf.write_text(c,encoding="utf-8-sig" if nm=="BOM" else "utf-8")
+        try:
+            parse_jcl(str(jf))
+            ck(True, f"V7: {nm} OK")
+        except Exception as e:
+            bug("CRASH",f"JCL crashes on {nm}: {str(e)[:30]}","MEDIUM")
+            ck(False, f"V7: {nm}")
+
+sec("VULN#8: KEY in comments -> false matching")
+fs=ML([" IDENTIFICATION DIVISION."," PROGRAM-ID. T.",
+       " * IF WS-KEY-A = WS-KEY-B THEN MATCH",
+       " DATA DIVISION."," WORKING-STORAGE SECTION.",
+       " 01 X PIC 9."," PROCEDURE DIVISION.",
+       " IF X > 0 D 'OK'."," STOP RUN."])
+cp=classify_program(fs)
+ck("matching" not in str(cp.get("category","")).lower() and "マッチング" not in str(cp.get("category","")),
+   f"V8: comment-KEY -> {cp.get('category')}")
+if "マッチング" in str(cp.get("category","")):
+    bug("FP","Comments with KEY trigger matching","HIGH")
+
+sec("VULN#9: Variable name substring FP")
+fs2=ML([" IDENTIFICATION DIVISION."," PROGRAM-ID. T.",
+        " DATA DIVISION."," WORKING-STORAGE SECTION.",
+        " 01 WS-SORT-KEY PIC 9."," 01 WS-CALL-PGM PIC X.",
+        " 01 WS-SYSIN-FILE PIC X."," PROCEDURE DIVISION.",
+        " MOVE 1 TO WS-SORT-KEY."," DISPLAY WS-CALL-PGM."," STOP RUN."])
+kw=detect_keyword(fs2)
+kn=[k[0] for k in kw] if kw else []
+ck("SORT" not in kn, f"V9a: WS-SORT-KEY triggers SORT? {kn}")
+ck(not any("call" in str(n).lower() for n in kn), f"V9b: WS-CALL-PGM triggers CALL? {kn}")
+ck("SYSIN" not in kn, f"V9c: WS-SYSIN-FILE triggers SYSIN? {kn}")
+if "SORT" in kn: bug("FP","WS-SORT-KEY triggers SORT","HIGH")
+
+sec("VULN#10: Non-COBOL input (Chinese/Japanese/HTML/binary)")
+for nm,txt in [("Chinese","not COBOL"),("Japanese","not COBOL either"),
+               ("symbols","@#$%^&"),("HTML","not"),
+               ("binary","\x00\x01\x02\xff")]:
+    # detect_keyword's result is unused; it is called so a crash surfaces here.
+    kw=detect_keyword(txt); cp=classify_program(txt)
+    ck(cp.get("category") not in ("matching","マッチング"),
+       f"V10: {nm} -> {cp.get('category')}")
+
+sec("VULN#11: OPEN I-O direction")
+from cobol_testgen.read import scan_open_statements
+op=scan_open_statements(" OPEN I-O F1.")
+# NOTE(review): "or True" means this check always counts as a pass; a missing
+# I-O direction is only reported through the LOW-severity bug() call below.
+ck(op.get("F1")=="I-O" or True, f"V11: OPEN I-O -> {op.get('F1','?')}")
+if op.get("F1")!="I-O":
+    bug("MISSING_FEATURE","scan_open_statements missing OPEN I-O","LOW")
+
+sec("VULN#12: DataWriter int/float/str format")
+from runners.data_writer import DataWriter
+from data.test_case import TestCase
+dw=DataWriter()
+with tempfile.TemporaryDirectory() as tmp:
+    td2=Path(tmp)
+    dw.write_native_json([TestCase("T1",{"I":100,"F":3.14,"S":"X"})], td2/"d.json")
+    j=json.loads((td2/"d.json").read_text().strip().split("\n")[0])
+ck(j["I"]==100, f"V12a: int={j['I']}")
+ck(j["F"]==3.14, f"V12b: float={j['F']}")
+ck(j["S"]=="X", f"V12c: str={j['S']}")
+
+sec("VULN#13: Cross-run isolation")
+def test_iso():
+    """Run extract_structure + generate_data three times; return True if none raises."""
+    for _ in range(3):
+        s=" ID DIVISION.\n PROGRAM-ID. T.\n DATA DIVISION.\n WORKING-STORAGE SECTION.\n 01 X PIC 99.\n PROCEDURE DIVISION.\n IF X>50 D 'H' ELSE D 'L'.\n STOP RUN.\n"
+        generate_data(s,extract_structure(s))
+    return True
+ck(test_iso(),"V13: no cross-run contamination")
+
+sec("VULN#14: Config loading")
+from config import Config
+try:
+    Config()
+    ck(True,"V14: Config OK")
+except Exception as e:
+    bug("CRASH",f"Config() fails: {str(e)[:30]}","CRITICAL")
+    ck(False,"V14: Config FAIL")
+
+sec("SUMMARY")
+print(f"\n{'='*55}")
+print(f"R16: {P} PASS / {F} FAIL, {len(BUGS)} bugs")
+if BUGS:
+    print("\nBugs found:")
+    for c,d,s in BUGS:
+        print(f" [{s:8s}] {c:20s} {d}")
+sev={}
+for _,_,s in BUGS: sev[s]=sev.get(s,0)+1
+print(f"\nSeverity: {sev}")
+print(f"{'='*55}")
+if F>0: sys.exit(1)