"""Worker process — polls task queue, executes run_pipeline().""" import json, sys, time, shutil from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) TASKS_DIR = Path("tasks") def main(): print("Worker started. Watching tasks/ ...") while True: for tf in sorted(TASKS_DIR.glob("*.json")): try: data = json.loads(tf.read_text()) if data.get("status") != "queued": continue data["status"] = "running" tf.write_text(json.dumps(data)) from config import Config from orchestrator import run_pipeline cfg = Config() cfg.runner_mode = data.get("runner", "native") if cfg.runner_mode == "spark" and not shutil.which("spark-submit"): data["status"] = "blocked" data["result"] = "spark-submit not installed" tf.write_text(json.dumps(data)) continue vr = run_pipeline(cfg, data["copybook"], data["cobol_src"], data["java_src"], data["mapping"]) data["status"] = "done" data["result"] = { "program": vr.program, "status": vr.status, "matched": vr.fields_matched, "mismatched": vr.fields_mismatched, "duration": vr.duration_s, "runner": vr.runner, } tf.write_text(json.dumps(data)) except Exception as e: data = json.loads(tf.read_text()) if tf.exists() else {} data["status"] = "error" data["result"] = str(e)[:500] tf.write_text(json.dumps(data)) time.sleep(2) if __name__ == "__main__": main()