Initial commit: COBOL+JCL credit card billing system with COMP-3, OCCURS, COPY REPLACING, INSPECT, and JCL runner

This commit is contained in:
hsyx3952501
2026-05-25 12:27:00 +08:00
commit 8e551c35d9
17 changed files with 2080 additions and 0 deletions
+240
View File
@@ -0,0 +1,240 @@
"""
JCL Executor - executes parsed JCL steps.
Phase 1: sequential execution, COND on return codes,
DD mapping to files, SORT mapped to external sort utility.
"""
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Optional
from parser import Job, JobStep, DDEntry, CondParam, COND_OPS
COB_MAIN_DIR = r"D:\360安全浏览器下载\GC32-BDB-SP1-rename-7z-to-exe"
class Executor:
def __init__(self, root_dir: str, cobol_dir: str, copybook_dir: str):
self.root_dir = Path(root_dir).resolve()
self.cobol_dir = Path(cobol_dir).resolve()
self.copybook_dir = Path(copybook_dir).resolve()
self.bin_dir = self.root_dir / "bin"
self.bin_dir.mkdir(exist_ok=True)
self.last_rc: int = 0
self.step_rcs: dict[str, int] = {}
def _cobol_env(self) -> dict[str, str]:
"""Build environment dict for GnuCOBOL execution."""
env = os.environ.copy()
env["COB_MAIN_DIR"] = COB_MAIN_DIR
env["COB_CONFIG_DIR"] = os.path.join(COB_MAIN_DIR, "config")
env["COB_LIBRARY_PATH"] = os.path.join(COB_MAIN_DIR, "lib", "gnucobol")
env["COBCPY"] = str(self.copybook_dir)
cobbin = os.path.join(COB_MAIN_DIR, "bin")
env["PATH"] = cobbin + os.pathsep + env.get("PATH", "")
return env
def run(self, job: Job):
print(f"\n{'='*60}")
print(f"JOB: {job.job_name}")
print(f"{'='*60}")
for i, step in enumerate(job.steps):
self._execute_step(step, i)
print(f"\n{'='*60}")
print(f"JOB {job.job_name} COMPLETED")
print(f"STEPS: {len(job.steps)}, FINAL RC: {self.last_rc}")
print(f"{'='*60}")
return self.last_rc
def _execute_step(self, step: JobStep, idx: int):
print(f"\n--- STEP {idx+1}: {step.step_name} (PGM={step.program}) ---")
# COND check
if step.cond and not self._check_cond(step.cond):
print(f" COND: SKIPPED ({step.cond})")
return
program_upper = step.program.upper()
if program_upper == "SORT":
rc = self._run_sort(step)
else:
rc = self._run_cobol(step)
self.last_rc = rc
self.step_rcs[step.step_name] = rc
print(f" RC: {rc}")
def _check_cond(self, cond: CondParam) -> bool:
"""Return True if step should run, False if skipped."""
if cond.step_name:
target_rc = self.step_rcs.get(cond.step_name, 0)
else:
# Check all previous steps
for rc in self.step_rcs.values():
if COND_OPS.get(cond.operator, lambda x, y: False)(rc, cond.code):
return False
return True
if COND_OPS.get(cond.operator, lambda x, y: False)(target_rc, cond.code):
return False
return True
def _run_cobol(self, step: JobStep) -> int:
"""Compile (if needed) and execute a COBOL program."""
cbl_path = self.cobol_dir / f"{step.program}.cbl"
exe_path = self.bin_dir / f"{step.program}.exe"
# Compile if source newer than binary
if not exe_path.exists() or (
cbl_path.exists()
and os.path.getmtime(cbl_path) > os.path.getmtime(exe_path)
):
print(f" COMPILE: {cbl_path.name}")
result = subprocess.run(
["cobc", "-std=ibm", "-x", str(cbl_path), "-o", str(exe_path)],
capture_output=True, text=True, env=self._cobol_env(),
)
if result.returncode != 0:
print(f" COMPILE ERROR (RC={result.returncode}):")
print(result.stderr[:500])
return result.returncode
# Map DD entries to file paths
stdin_file = None
stdout_file = None
env_map = {}
for dd in step.dd_entries:
dd_name_upper = dd.dd_name.upper()
if dd_name_upper == "SYSOUT":
if dd.sysout == "*":
stdout_file = (
self.root_dir / "data" / "output" /
f"{step.step_name.lower()}_sysout.txt"
)
continue
if dd_name_upper == "SYSIN" and dd.inline_data:
# Write inline data to temp file
tmp = tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".sysin", dir=self.root_dir / "data" / "work"
)
for line in dd.inline_data:
tmp.write(line + "\n")
tmp.close()
stdin_file = tmp.name
env_map[dd_name_upper] = stdin_file
continue
if dd.dsn:
# Map DSN to file path
file_path = self._resolve_dsn(dd.dsn, dd.disp)
env_map[dd_name_upper] = str(file_path)
# Execute COBOL program
print(f" EXECUTE: {exe_path.name}")
env = self._cobol_env()
env.update(env_map)
stdin = None
if stdin_file:
stdin = open(stdin_file, "r")
stdout = None
if stdout_file:
stdout = open(stdout_file, "w")
try:
result = subprocess.run(
[str(exe_path)],
stdin=stdin,
stdout=stdout,
stderr=subprocess.PIPE,
text=True,
env=env,
cwd=str(self.root_dir),
)
if result.stderr:
print(f" STDERR: {result.stderr[:200]}")
return result.returncode
finally:
if stdin:
stdin.close()
if stdout:
stdout.close()
def _run_sort(self, step: JobStep) -> int:
"""Execute SORT step (maps to GNU sort or PowerShell Sort-Object)."""
sortin = None
sortout = None
sort_fields = None
for dd in step.dd_entries:
dd_name_upper = dd.dd_name.upper()
if dd_name_upper == "SORTIN" and dd.dsn:
sortin = self._resolve_dsn(dd.dsn, dd.disp)
elif dd_name_upper == "SORTOUT" and dd.dsn:
sortout = self._resolve_dsn(dd.dsn, dd.disp)
elif dd_name_upper == "SYSIN" and dd.inline_data:
sort_text = " ".join(dd.inline_data).upper()
# Parse SORT FIELDS=(start,len,order,...)
match = re.search(
r"FIELDS=\s*\(([^)]+)\)", sort_text
)
if match:
sort_fields = match.group(1)
if not sortin or not sortout:
print(" ERROR: SORT requires SORTIN and SORTOUT DD")
return 12
if sort_fields:
# Parse sort fields for PowerShell Sort-Object
fields = sort_fields.split(",")
# fields: start,len,type,order,start2,len2,type2,order2
pscmd = f"Get-Content '{sortin}' | Sort-Object"
i = 0
first = True
while i + 3 < len(fields):
start = int(fields[i].strip()) - 1 # 0-based
length = int(fields[i + 1].strip())
order = fields[i + 3].strip() # skip type field (CH, PD, etc.)
ascending = order.upper() != "D"
if not first:
pscmd += ","
pscmd += (
f" {{$_.Substring({start},{length})}}"
f"{'' if ascending else ' -Descending'}"
)
first = False
i += 4
pscmd += f" | Set-Content '{sortout}' -Encoding Ascii"
else:
pscmd = f"Get-Content '{sortin}' | Sort-Object | Set-Content '{sortout}' -Encoding Ascii"
print(f" SORT CMD: {pscmd[:100]}...")
result = subprocess.run(
["powershell", "-NoProfile", "-Command", pscmd],
capture_output=True, text=True,
)
if result.returncode != 0:
print(f" SORT ERROR: {result.stderr[:200]}")
return result.returncode
def _resolve_dsn(self, dsn: str, disp: Optional[str] = None) -> Path:
"""Map z/OS DSN to Windows file path."""
# Handle GDG notation (simplified)
dsn = re.sub(r"\(\+?\d+\)", "", dsn).strip(".")
# If it's a z/OS DSN (no slashes, has dots as qualifiers), convert dots
if "/" not in dsn and "\\" not in dsn:
dsn = dsn.replace(".", "/")
path = (self.root_dir / dsn.lstrip("/")).resolve()
return path
+82
View File
@@ -0,0 +1,82 @@
"""
JCL Runner - Main entry point.
Usage: python main.py <jcl_file>
"""
import sys
import argparse
from pathlib import Path
from parser import parse_jcl
from executor import Executor
def main():
parser = argparse.ArgumentParser(
description="JCL Runner - Execute JCL scripts on Windows"
)
parser.add_argument("jcl_file", help="Path to JCL script")
parser.add_argument(
"--root",
default=".",
help="System root directory (default: current dir)",
)
parser.add_argument(
"--cobol-dir",
default="cobol",
help="COBOL source directory (relative to root)",
)
parser.add_argument(
"--copybook-dir",
default="copybooks",
help="COPYBOOK directory (relative to root)",
)
args = parser.parse_args()
root = Path(args.root).resolve()
cobol_dir = root / args.cobol_dir
copybook_dir = root / args.copybook_dir
# Validate paths
if not root.exists():
print(f"ERROR: Root directory not found: {root}")
sys.exit(1)
if not cobol_dir.exists():
print(f"ERROR: COBOL directory not found: {cobol_dir}")
sys.exit(1)
if not copybook_dir.exists():
print(f"ERROR: COPYBOOK directory not found: {copybook_dir}")
sys.exit(1)
# Parse JCL
jcl_path = Path(args.jcl_file)
if not jcl_path.exists():
print(f"ERROR: JCL file not found: {jcl_path}")
sys.exit(1)
print(f"Parsing JCL: {jcl_path}")
job = parse_jcl(str(jcl_path))
if not job:
print("ERROR: Failed to parse JCL (no JOB statement found)")
sys.exit(1)
print(f"Job: {job.job_name}, Steps: {len(job.steps)}")
for i, step in enumerate(job.steps):
cond_str = f" COND={step.cond}" if step.cond else ""
print(f" {i+1}. {step.step_name}: EXEC PGM={step.program}{cond_str}")
# Execute
executor = Executor(
root_dir=str(root),
cobol_dir=str(cobol_dir),
copybook_dir=str(copybook_dir),
)
rc = executor.run(job)
print(f"\nExit code: {rc}")
sys.exit(rc)
if __name__ == "__main__":
main()
+190
View File
@@ -0,0 +1,190 @@
"""
JCL Parser - parses JCL scripts into structured JobStep objects.
Phase 1: supports JOB, EXEC PGM=, DD, SYSOUT, SYSIN inline data,
COND=(code,op), * comments.
Phase 2+: PROC, GDG, COND with step names, EVEN/ONLY.
"""
import re
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class DDEntry:
dd_name: str
dsn: Optional[str] = None
disp: Optional[str] = None
sysout: Optional[str] = None
inline_data: list[str] = field(default_factory=list)
unit: Optional[str] = None
space: Optional[str] = None
@dataclass
class CondParam:
code: int
operator: str # EQ, NE, GT, GE, LT, LE
step_name: Optional[str] = None # None means "any previous step"
@dataclass
class JobStep:
step_name: str
program: str
dd_entries: list[DDEntry] = field(default_factory=list)
cond: Optional[CondParam] = None
parm: Optional[str] = None
@dataclass
class Job:
job_name: str
steps: list[JobStep] = field(default_factory=list)
# COND operator mapping
COND_OPS = {
"EQ": lambda rc, code: rc == code,
"NE": lambda rc, code: rc != code,
"GT": lambda rc, code: rc > code,
"GE": lambda rc, code: rc >= code,
"LT": lambda rc, code: rc < code,
"LE": lambda rc, code: rc <= code,
}
def parse_jcl(filepath: str) -> Job:
"""Parse a JCL file into a Job object."""
with open(filepath, "r", encoding="utf-8") as f:
raw_lines = f.readlines()
# Continuation handling: lines ending with ',' continue on next line
lines = _merge_continuations(raw_lines)
job = None
current_step: Optional[JobStep] = None
current_dd: Optional[DDEntry] = None
in_sysin = False
sysin_lines: list[str] = []
for line in lines:
stripped = line.strip()
# Skip comments
if stripped.startswith("//*"):
continue
if not stripped:
continue
# Handle SYSIN inline data (lines after //SYSIN DD * until /*)
if in_sysin:
if stripped == "/*":
if current_dd:
current_dd.inline_data = sysin_lines
sysin_lines = []
in_sysin = False
current_dd = None
else:
sysin_lines.append(stripped)
continue
# Must start with //
if not stripped.startswith("//"):
continue
content = stripped[2:].strip()
# JOB statement: //jobname JOB ...
if re.search(r"\bJOB\b", content, re.IGNORECASE):
parts = stripped[2:].split(None, 2)
job_name = parts[0]
job = Job(job_name=job_name)
continue
# EXEC statement
match = re.match(r"(\w+)\s+EXEC\s+(?:PGM=)?(\w+)", content, re.IGNORECASE)
if match:
step_name = match.group(1)
program = match.group(2)
# Parse COND parameter
cond = None
cond_match = re.search(
r"COND=\s*\(\s*(\d+)\s*,\s*(\w+)", content, re.IGNORECASE
)
if cond_match:
code = int(cond_match.group(1))
op = cond_match.group(2).upper()
cond = CondParam(code=code, operator=op)
# Parse PARM parameter
parm = None
parm_match = re.search(r"PARM=\s*'([^']*)'", content, re.IGNORECASE)
if parm_match:
parm = parm_match.group(1)
current_step = JobStep(
step_name=step_name,
program=program,
cond=cond,
parm=parm,
)
if job:
job.steps.append(current_step)
continue
# DD statement
dd_match = re.match(r"(\w+)\s+DD\s*(.*)", content, re.IGNORECASE)
if dd_match and current_step is not None:
dd_name = dd_match.group(1)
dd_params = dd_match.group(2)
dd = DDEntry(dd_name=dd_name)
# Parse DSN
dsn_match = re.search(r"DSN=\s*([^\s,]+)", dd_params, re.IGNORECASE)
if dsn_match:
dd.dsn = dsn_match.group(1)
# Parse DISP
disp_match = re.search(
r"DISP=\s*\(?([^,\s)]+)(?:,([^,\s)]+))?(?:,([^,\s)]+))?\)?",
dd_params, re.IGNORECASE,
)
if disp_match:
dd.disp = disp_match.group(1)
# Parse SYSOUT
sysout_match = re.search(r"SYSOUT=\s*(\*|\w+)", dd_params, re.IGNORECASE)
if sysout_match:
dd.sysout = sysout_match.group(1)
# Check for SYSIN inline data
if dd_name.upper() == "SYSIN" and "*" in dd_params:
in_sysin = True
current_step.dd_entries.append(dd)
current_dd = dd
continue
return job
def _merge_continuations(lines: list[str]) -> list[str]:
"""Merge JCL continuation lines (lines ending with ',')."""
merged = []
buffer = ""
for line in lines:
stripped = line.rstrip("\n\r")
if buffer:
buffer += stripped
else:
buffer = stripped
# Check if line ends with continuation
if stripped.rstrip().endswith(",") and not stripped.strip().startswith("//*"):
continue
merged.append(buffer)
buffer = ""
if buffer:
merged.append(buffer)
return merged