50995d3335
- SETUP.md: 完整环境搭建指南(同事用) - SETUP_QUICK.md: 快速搭环境(4步) - s22~s26: TNA端到端、覆盖率报告、回归检查 - procedure_grammar.lark: 实验性Lark语法 Co-Authored-By: Claude <noreply@anthropic.com>
151 lines
5.9 KiB
Python
151 lines
5.9 KiB
Python
"""S22: TNA attendance management system — end-to-end test of all programs.

Pipeline: parse → generate_data → flatfile → compile → run → verify
"""
|
|
import sys, os, re, subprocess, time
|
|
# Put this script's parent directory first on the import path
# (presumably so the cobol_testgen imports further down resolve — confirm).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
# Module-level tallies of passed (P) and failed (F) checks; ck() updates them
# and the final report prints them.
P = 0
F = 0
|
|
def ck(v, m=""):
    """Record the outcome of one check in the module-level counters.

    Args:
        v: Result of the check; any truthy value counts as a pass.
        m: Text printed after ``FAIL`` when the check does not pass.

    Returns:
        None. A pass increments the global ``P``; a failure increments the
        global ``F`` and prints one ``FAIL`` line.
    """
    global P, F
    if v:
        P += 1
    else:
        F += 1
        print(f" FAIL {m}")
|
|
def sec(n):
    """Print a section banner: a blank line, then ``n`` between two 60-char ``=`` rules."""
    rule = "=" * 60
    print(f"\n{rule}\n{n}\n{rule}")
|
|
|
|
# Project root and the directories under it that this script reads and writes.
ROOT = "D:/cobol-java/cobol-tna-system/"
COPYBOOKS = os.path.join(ROOT, "cpy")
BINDIR = os.path.join(ROOT, "bin")

# Compiler executable invoked in the compile phase (looked up on PATH).
COBC = "cobc"

# Set env to find subprogram DLLs
os.environ["COB_LIBRARY_PATH"] = BINDIR
|
|
|
|
from cobol_testgen import extract_structure, generate_data
|
|
from cobol_testgen.read import preprocess, resolve_copybooks
|
|
from cobol_testgen.flatfile import analyze_fd_layout, write_all_files, write_flat_file
|
|
|
|
# (program id, description) pairs; every phase iterates them in this order.
# The program id is also the stem of the .cbl source and of the built .exe.
progs = [
    ("ZAN01CHK", "残業申請振分処理"),
    ("ZAN02CHK", "重複チェック処理"),
    ("ZAN03CHK", "残業申請照合処理"),
    ("ZAN04MAT", "残業実績照合処理"),
    ("ZAN05CAL", "残業計算処理"),
    ("ZAN06UPD", "DB更新処理"),
]
|
|
|
|
sec("PHASE 1: Parse → Generate → Flat files")

# Tallies for this phase; the summary at the end of the script prints them.
parse_ok = 0
gen_ok = 0
flat_ok = 0
records_total = 0

# prog_id -> facts gathered for that program. A program whose processing
# raised gets {"error": msg}; a program whose source is missing gets no entry.
results = {}

# Source directory, handed to resolve_copybooks() and write_all_files().
# Defined once here (it does not vary per program) and kept under the name
# `dp` because later code in this script reads it.
dp = os.path.join(ROOT, "src")

for prog_id, desc in progs:
    fpath = os.path.join(ROOT, "src", f"{prog_id}.cbl")
    if not os.path.exists(fpath):
        print(f" {prog_id}: NOT FOUND")
        continue
    try:
        # utf-8-sig drops a leading BOM if the source file has one.
        with open(fpath, encoding="utf-8-sig") as fh:
            src = fh.read()

        st = extract_structure(src)
        branches = st.get("total_branches", 0)
        parse_ok += 1

        pp = resolve_copybooks(src, dp, extra_search_paths=[COPYBOOKS])
        pp = preprocess(pp)
        recs = generate_data(pp, st)
        gen_ok += 1
        records_total += len(recs)

        # Flat files are only written when analyze_fd_layout() reports layouts.
        layouts = analyze_fd_layout(pp)
        flats = write_all_files(recs, pp, dp) if layouts else []
        flat_ok += len(flats)

        results[prog_id] = {"branches": branches, "recs": len(recs), "fds": len(layouts), "flats": len(flats)}
        print(f" {prog_id:<10} br={branches:>2} recs={len(recs):>3} fds={len(layouts)} flats={len(flats)} {desc}")
    except Exception as e:
        # Deliberately broad: one failing program must not stop the others.
        # The shortened, single-line message is kept for the summary.
        msg = str(e)[:80].replace("\n"," ")
        print(f" {prog_id:<10} FAIL: {msg}")
        results[prog_id] = {"error": msg}

# Every program must parse; at most one may fail data generation.
ck(parse_ok == len(progs), f"Parse: {parse_ok}/{len(progs)}")
ck(gen_ok >= len(progs) - 1, f"Generate: {gen_ok}/{len(progs)}")
|
|
|
|
sec("PHASE 2: Compile with GnuCOBOL")

compile_ok = 0
compile_fail = 0

# Computed here rather than borrowed from a variable left over by an
# earlier loop; used as an include path and as the compiler's working dir.
src_dir = os.path.join(ROOT, "src")

for prog_id, _desc in progs:
    info = results.get(prog_id)
    # No entry (source not found) or an "error" entry from the previous
    # phase both count as compile failures.
    if info is None or "error" in info:
        compile_fail += 1
        continue

    fpath = os.path.join(src_dir, f"{prog_id}.cbl")
    exe = os.path.join(BINDIR, f"{prog_id}.exe")
    os.makedirs(BINDIR, exist_ok=True)

    # Check if program uses EXEC SQL — these need special handling
    with open(fpath, encoding="utf-8-sig") as fh:
        has_sql = "EXEC SQL" in fh.read()
    if has_sql:
        print(f" {prog_id:<10} SKIP (EXEC SQL)")
        compile_ok += 1  # Not a failure
        continue

    # -x: build an executable; -Wall: enable warnings; -I: copybook search dirs.
    cmd = [COBC, "-x", "-Wall", fpath, "-o", exe, "-I", COPYBOOKS, "-I", src_dir]
    try:
        r = subprocess.run(cmd, capture_output=True, timeout=30, cwd=src_dir)
    except subprocess.TimeoutExpired:
        compile_fail += 1
        info["compile"] = "timeout"
        print(f" {prog_id:<10} TIMEOUT")
        continue
    except OSError as e:
        # The compiler could not be started at all (e.g. not installed or
        # working directory missing): record a failure instead of aborting.
        msg = str(e).replace("\n", " ")
        compile_fail += 1
        info["compile"] = "fail"
        info["compile_err"] = msg[:120]
        print(f" {prog_id:<10} FAIL: {msg[:80]}")
        continue

    # Keep only the head of the compiler output; it is used for diagnostics.
    out = r.stdout.decode("utf-8","replace")[:200] if r.stdout else ""
    err = r.stderr.decode("utf-8","replace")[:200] if r.stderr else ""
    if r.returncode == 0:
        compile_ok += 1
        sz = os.path.getsize(exe) if os.path.exists(exe) else 0
        info["compile"] = "ok"
        info["exe_size"] = sz
        print(f" {prog_id:<10} OK {sz:>6}B")
    else:
        compile_fail += 1
        info["compile"] = "fail"
        info["compile_err"] = (err or out or "")[:120]
        print(f" {prog_id:<10} FAIL: {(err or out)[:80]}")

# Up to two compile failures are tolerated.
ck(compile_fail < 3, f"Compile: {compile_fail} failures")
|
|
|
|
sec("PHASE 3: Run")

run_ok = 0
run_fail = 0

for prog_id, _desc in progs:
    info = results.get(prog_id, {})
    # Only programs the compile phase marked "ok" are executed
    # (skipped, failed and timed-out ones have no "ok" marker).
    if info.get("compile") != "ok":
        continue
    exe = os.path.join(BINDIR, f"{prog_id}.exe")
    if not os.path.exists(exe):
        continue

    try:
        # Run the binary directly, without a shell: the argv is already a
        # list, and the timeout then applies to the program itself rather
        # than to a shell wrapper around it.
        r = subprocess.run([exe], capture_output=True, timeout=10, cwd=BINDIR)
    except subprocess.TimeoutExpired:
        run_fail += 1
        info["run"] = "timeout"
        print(f" {prog_id:<10} TIMEOUT")
        continue
    except OSError as e:
        # The binary exists but could not be launched; count it as a run
        # failure so the remaining programs are still exercised.
        msg = str(e).replace("\n", " ")
        run_fail += 1
        info["run"] = f"fail({type(e).__name__})"
        print(f" {prog_id:<10} FAIL {msg[:60]}")
        continue

    run_out = r.stdout.decode("utf-8","replace") if r.stdout else ""
    if r.returncode == 0:
        run_ok += 1
        info["run"] = "ok"
        print(f" {prog_id:<10} OK stdout={len(run_out)} chars")
    else:
        run_fail += 1
        info["run"] = f"fail({r.returncode})"
        run_err = (r.stderr.decode("utf-8","replace") if r.stderr else "")[:100]
        print(f" {prog_id:<10} FAIL rc={r.returncode} {run_err[:60]}")
|
|
|
|
sec("SUMMARY")

# Overall tallies collected by the three phases.
print(f" Programs: {len(progs)}")
print(f" Parse OK: {parse_ok}")
print(f" Generate OK: {gen_ok} ({records_total} records)")
print(f" Flat files: {flat_ok}")
print(f" Compile OK: {compile_ok}")
print(f" Run OK: {run_ok}")
print(f" Run FAIL: {run_fail}")
print()

# One line per program. Programs without an entry in `results` fall back to
# the defaults below (0 for counts, "-" for phases that never ran).
for prog_id, desc in progs:
    r = results.get(prog_id, {})
    if "error" in r:
        print(f" {prog_id:<10} FAIL: {r['error'][:60]}")
    else:
        br = r.get("branches", 0)
        recs = r.get("recs", 0)
        comp = r.get("compile", "-")
        run_st = r.get("run", "-")
        sz = r.get("exe_size", 0)
        flats = r.get("flats", 0)
        print(f" {prog_id:<10} br={br:>2} recs={recs:>3} flats={flats} compile={comp:<5} run={run_st:<10} size={sz}B")

print(f"\n{'='*55}")
print(f"S22: {P} PASS / {F} FAIL")
print(f"{'='*55}")

# A non-zero exit status signals that at least one ck() check failed.
if F > 0:
    sys.exit(1)
|