Files
cobol-java-v3/cobol_testgen/output.py
hangshuo652 7fb9304212 merge local cobol_testgen improvements into v3 shared modules
- cond.py: SQLCODE/SQLSTATE handling, alphanumeric >/< boundary fix
- output.py: termination tracking, db_input support, _is_field_assigned filter
- coverage.py: mark_from_gcov, THRU support, KeyError protection
- gcov.py: new file (dependency for coverage.py)
- grammar.lark: multi-segment PIC support
- read.py: SQL INCLUDE resolution, DECLARE TABLE parsing, * comment fix
- core.py: SQL parsing, blocked_names, keyword list
- design.py: multi-sentinel, THRU ranges, PERFORM VARYING last iteration
- __init__.py: local main() + v3 API functions, guarded imports

All 6 ZAN programs verified passing through v3 pipeline
2026-06-23 22:38:17 +08:00

162 lines
5.3 KiB
Python

"""输出层:JSON输出(按文件分组入出力 + 工作存储区分)"""
import json
from pathlib import Path
_INVERSE_OP = {'>': '<=', '<': '>=', '=': '<>', '>=': '<', '<=': '>'}
def _scenario_text(path_cons):
parts = []
for c in path_cons:
if len(c) != 4:
continue
field, op, val, want = c
if op == 'not_in':
desc = f"{field} not in {val}" if want else f"{field} in {val}"
elif not want:
desc = f"{field} {_INVERSE_OP.get(op, '?' + op)} {val}"
else:
desc = f"{field} {op} {val}"
parts.append(desc)
return ', '.join(parts)
def _write_json(entries, outpath):
if not entries:
return
outpath.parent.mkdir(parents=True, exist_ok=True)
with open(outpath, 'w', encoding='utf-8') as f:
json.dump(entries, f, ensure_ascii=False, indent=2)
def _is_field_assigned(fname, assigned_set, fields, fd_fields_lookup):
if not assigned_set:
return False
if fname in assigned_set:
return True
level_map = {}
name_order = []
for f in fields:
fn = f['name']
lv = f.get('level', 77)
level_map[fn] = lv
name_order.append((lv, fn))
flv = level_map.get(fname, 77)
ancestor = None
for lv, fn in name_order:
if fn == fname:
break
if lv < flv:
ancestor = fn
if ancestor and ancestor in assigned_set:
return True
return False
def output_json(records, outpath, roles=None, fd_fields=None, field_to_fd=None,
open_dir=None, term_types=None, db_input=None, data_fields=None):
outpath.parent.mkdir(parents=True, exist_ok=True)
if not roles:
out = []
for i, rec in enumerate(records):
entry = dict(rec)
entry['termination'] = (term_types or ['normal'] * len(records))[i]
out.append(entry)
obj = {'program': outpath.stem, 'records': out}
if db_input:
obj['db_input'] = db_input
with open(outpath, 'w', encoding='utf-8') as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
return
term_types = term_types or ['normal'] * len(records)
out = []
for i, rec in enumerate(records):
inp = {}
out_exp = {}
ws = {}
if fd_fields and field_to_fd:
for fd_name, fds_set in fd_fields.items():
direction = (open_dir or {}).get(fd_name, '')
inp_block = {}
out_block = {}
assigned_set = rec.get('_assigned_fields', set())
for fname in fds_set:
if fname not in rec:
continue
r = roles.get(fname, 'unused')
val = rec[fname]
if direction in ('INPUT', 'I-O') and r in ('input', 'inout'):
inp_block[fname] = val
if direction in ('OUTPUT', 'I-O') and r in ('output', 'inout'):
if _is_field_assigned(fname, assigned_set, data_fields or [], fd_fields):
out_block[fname] = val
if inp_block:
inp[fd_name] = inp_block
if out_block:
out_exp[fd_name] = out_block
for name, val in rec.items():
if not field_to_fd or name not in field_to_fd:
ws[name] = val
entry = {
'input': inp,
'expected_output': out_exp,
'working_storage': {k: v for k, v in ws.items() if k != '_assigned_fields'},
'termination': term_types[i] if i < len(term_types) else 'normal',
}
out.append(entry)
obj = {'program': outpath.stem, 'records': out}
if db_input:
obj['db_input'] = db_input
_write_json(obj, outpath)
def output_input_files(records, outdir, stem, roles, fd_fields, field_to_fd, open_dir,
term_types=None):
term_types = term_types or ['normal'] * len(records)
input_fds = {}
for fd_name, fds_set in fd_fields.items():
direction = (open_dir or {}).get(fd_name, '')
if direction not in ('INPUT', 'I-O'):
continue
has_input = any(roles.get(fname, 'unused') in ('input', 'inout') for fname in fds_set)
if not has_input:
continue
input_fds[fd_name] = fds_set
if not input_fds:
return
outdir.mkdir(parents=True, exist_ok=True)
for fd_name, fds_set in input_fds.items():
normals = []
abends = []
direction = (open_dir or {}).get(fd_name, '')
for i, rec in enumerate(records):
term = term_types[i] if i < len(term_types) else 'normal'
fd_rec = {}
for fname in fds_set:
r = roles.get(fname, 'unused')
if direction in ('INPUT', 'I-O') and r in ('input', 'inout'):
if fname in rec:
fd_rec[fname] = rec[fname]
if fd_rec:
if term == 'abend':
abends.append(fd_rec)
else:
normals.append(fd_rec)
if normals:
_write_json(normals, outdir / f'{stem}_{fd_name}.json')
if abends:
_write_json(abends, outdir / f'{stem}_abend_{fd_name}.json')