v3: gstack-code-gen 生成

This commit is contained in:
hangshuo652
2026-05-24 12:36:44 +08:00
commit 818e81269c
50 changed files with 1343 additions and 0 deletions
View File
+30
View File
@@ -0,0 +1,30 @@
def align_records(cobol_records: list[dict], java_records: list[dict],
key_field: str = "CUST-ID") -> list[tuple]:
if not cobol_records and not java_records:
return []
def _by(records, kf):
d = {}
for r in records:
key = str(r.get(kf, "__NONE__"))
d.setdefault(key, []).append(r)
return d
c_by = _by(cobol_records, key_field)
j_by = _by(java_records, key_field)
pairs = []
all_keys = set(c_by) | set(j_by)
for k in sorted(all_keys):
c_items = c_by.get(k, [])
j_items = j_by.get(k, [])
for i in range(max(len(c_items), len(j_items))):
c = c_items[i] if i < len(c_items) else None
j = j_items[i] if i < len(j_items) else None
if c and j:
pairs.append((c, j, "MATCHED"))
elif c:
pairs.append((c, None, "MISSING_IN_SPARK"))
else:
pairs.append((None, j, "EXTRA_IN_SPARK"))
return pairs
+43
View File
@@ -0,0 +1,43 @@
import struct
from pathlib import Path
from data.field_tree import FieldTree
class CobolBinaryReader:
def read(self, path: str, tree: FieldTree) -> list[dict]:
d = Path(path).read_bytes()
rs = self._record_size(tree)
if rs == 0 or len(d) == 0:
return []
return [self._parse(d[o:o + rs], tree) for o in range(0, len(d), rs) if len(d[o:o + rs]) >= rs]
def _record_size(self, tree):
return max((f.offset + f.length for f in tree.fields), default=0)
def _parse(self, r, tree):
out = {}
for n, f in tree.flatten().items():
if f.length == 0 or f.offset + f.length > len(r):
continue
raw = r[f.offset:f.offset + f.length]
if f.usage == "COMP-3":
out[n] = self._comp3(raw, f.signed, f.decimal)
elif f.usage in ("COMP", "COMP-5"):
out[n] = int.from_bytes(raw, "big", signed=f.signed)
else:
out[n] = raw.decode("ascii", errors="replace").strip()
return out
def _comp3(self, raw, signed, dec):
if not raw:
return "0"
nib = []
for b in raw:
nib.append((b >> 4) & 0xF)
nib.append(b & 0xF)
s = nib.pop()
v = sum(n * (10 ** (len(nib) - i)) for i, n in zip(range(len(nib)), nib))
if signed and s in (0xD, 0xB):
v = -v
d = 10 ** dec
return f"{float(v) / d:.{dec}f}" if dec else str(v)
+64
View File
@@ -0,0 +1,64 @@
from data.diff_result import FieldResult
from decimal import Decimal, InvalidOperation
DEFAULT_TOLERANCE = 0.01
def compare_field(name: str, c: str, j: str, field_type: str = "decimal",
tolerance: float = DEFAULT_TOLERANCE) -> FieldResult:
fr = FieldResult(field_name=name, cobol_value=c, java_value=j)
if field_type in ("decimal", "numeric"):
return _numeric(fr, c, j, tolerance)
if field_type == "date":
return _date(fr, c, j)
if field_type == "string":
return _string(fr, c, j)
fr.status = "PASS" if c == j else "MISMATCH"
return fr
def _numeric(fr, c, j, tol):
cv = _num(c)
jv = _num(j)
if cv is None or jv is None:
fr.status = "NOT_SET" if cv is None and jv is None else (
"MISMATCH" if jv is None else "NOT_SET")
return fr
if cv == jv:
fr.status = "PASS"
return fr
diff = abs(float(cv - jv))
if diff <= tol:
fr.status = "TOLERATED"
fr.tolerance_applied = tol
else:
fr.status = "MISMATCH"
return fr
def _date(fr, c, j):
def _norm(v):
v = v.strip()
if len(v) == 8 and v.isdigit():
return f"{v[:4]}-{v[4:6]}-{v[6:8]}"
return v
fr.status = "PASS" if _norm(c) == _norm(j) else "MISMATCH"
return fr
def _string(fr, c, j):
fr.status = "PASS" if (c or "").strip() == (j or "").strip() else "MISMATCH"
return fr
def _num(v):
if v is None or v == "None":
return None
s = str(v).replace("\x00", "").strip()
if s == "":
return Decimal("0")
try:
return Decimal(s)
except InvalidOperation:
return None
+72
View File
@@ -0,0 +1,72 @@
from dataclasses import dataclass
EBCDIC_037 = {
0x40: ' ', 0x4B: '.', 0x4C: '<', 0x4D: '(', 0x4E: '+', 0x5A: '!', 0x5B: '$',
0x5C: '*', 0x5D: ')', 0x5E: ';', 0x60: '-', 0x61: '/', 0x6B: ',', 0x6C: '%',
0x6D: '_', 0x6E: '>', 0x6F: '?', 0x7A: ':', 0x7B: '#', 0x7C: '@', 0x7D: "'",
0x7E: '=', 0x7F: '"',
0x81: 'a', 0x82: 'b', 0x83: 'c', 0x84: 'd', 0x85: 'e', 0x86: 'f', 0x87: 'g',
0x88: 'h', 0x89: 'i', 0x91: 'j', 0x92: 'k', 0x93: 'l', 0x94: 'm', 0x95: 'n',
0x96: 'o', 0x97: 'p', 0x98: 'q', 0x99: 'r', 0xA2: 's', 0xA3: 't', 0xA4: 'u',
0xA5: 'v', 0xA6: 'w', 0xA7: 'x', 0xA8: 'y', 0xA9: 'z',
0xC1: 'A', 0xC2: 'B', 0xC3: 'C', 0xC4: 'D', 0xC5: 'E', 0xC6: 'F', 0xC7: 'G',
0xC8: 'H', 0xC9: 'I', 0xD1: 'J', 0xD2: 'K', 0xD3: 'L', 0xD4: 'M', 0xD5: 'N',
0xD6: 'O', 0xD7: 'P', 0xD8: 'Q', 0xD9: 'R', 0xE2: 'S', 0xE3: 'T', 0xE4: 'U',
0xE5: 'V', 0xE6: 'W', 0xE7: 'X', 0xE8: 'Y', 0xE9: 'Z',
0xF0: '0', 0xF1: '1', 0xF2: '2', 0xF3: '3', 0xF4: '4', 0xF5: '5',
0xF6: '6', 0xF7: '7', 0xF8: '8', 0xF9: '9',
}
@dataclass
class CobolIRField:
raw_hex: str; decoded_value: str; encoding: str
field_type: str; length: int; scale: int; signed: bool
@dataclass
class JavaIRField:
raw_value: str; decoded_value: str; field_type: str; nullable: bool
@dataclass
class IRRecord:
field_name: str
cobol: CobolIRField | None = None
java: JavaIRField | None = None
class Normalizer:
def normalize_encoding(self, raw: bytes, encoding: str) -> str:
if encoding == "EBCDIC":
return "".join(EBCDIC_037.get(b, chr(b) if 32 <= b < 127 else "?") for b in raw)
return raw.decode("ascii", errors="replace")
def normalize_comp3(self, raw: bytes) -> str:
if not raw:
return "0"
nibbles = []
for b in raw:
nibbles.append((b >> 4) & 0x0F)
nibbles.append(b & 0x0F)
sign = nibbles.pop()
v = 0
for n in nibbles:
v = v * 10 + (n if n <= 9 else 0)
if sign in (0x0D, 0x0B):
v = -v
return str(v)
def normalize_date(self, s: str) -> str:
s = s.strip()
if len(s) == 8 and s.isdigit():
return f"{s[:4]}-{s[4:6]}-{s[6:8]}"
return s
def to_ir_record(self, name, hex_, val, enc, ft, length=0, scale=0, signed=False):
return IRRecord(name, CobolIRField(hex_, val, enc, ft, length, scale, signed))
def to_null_ir(self, name, side="java"):
if side == "java":
return IRRecord(name, java=JavaIRField("", "", "null", True))
return IRRecord(name, java=JavaIRField("", "", "null", True))
+31
View File
@@ -0,0 +1,31 @@
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
@dataclass
class RoundingResult:
mode: str = "EXACT"; confidence: float = 1.0; suggestion: str = ""
def detect_rounding(c: str, j: str) -> RoundingResult:
cv = _d(c)
jv = _d(j)
if cv is None or jv is None:
return RoundingResult(mode="UNKNOWN", confidence=0, suggestion="parse error")
if cv == jv:
return RoundingResult()
diff = abs(float(cv - jv))
mag = max(abs(float(cv)), abs(float(jv)), 1)
rel = diff / mag
if diff < 2:
return RoundingResult("TRUNCATE", 0.6, f"Likely TRUNCATE, diff={diff}")
if diff < 100:
return RoundingResult("ROUNDING", 0.4, f"Possible rounding, diff={diff}")
return RoundingResult("SIGNIFICANT", 0.9, f"Significant diff={diff}")
def _d(v):
try:
return Decimal(str(v).strip())
except:
return None