fix: gcov Windows本地化 + runner合并 + 决策覆盖率修复

- gcov从WSL 15.2改为Windows MinGW 9.2.0,消除.gcno格式不匹配
- coverage.py去除leaf_stats复位bug(决策覆盖率恢复)
- runner.py合并V3编译+SOURCE分组执行+出力保存
- __init__.py gcov条件/outdir解析修复
This commit is contained in:
hangshuo652
2026-07-02 21:26:51 +08:00
parent b3d1643220
commit 40e8a50ab4
6 changed files with 635 additions and 21 deletions
+5 -2
View File
@@ -288,6 +288,9 @@ def main():
if '--gcov' in args:
gcov_mode = True
args.remove('--gcov')
if not _HAVE_RUNNER:
logger.warning("--gcov: runner.py not found. Compile/run will be skipped. "
"Use --gcov without runner only generates test data + static coverage.")
i = 0
while i < len(args):
if args[i] == '--temp-dir':
@@ -309,7 +312,7 @@ def main():
outdir = None
for a in args:
p = Path(a)
if p.is_dir():
if p.is_dir() or (not p.suffix and p.parent.exists()):
outdir = p
elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'):
cobol_files.append(p)
@@ -553,7 +556,7 @@ def main():
data_fields=fields_dict, select_info=select_info)
gcov_data = None
if gcov_mode and proc_div and _HAVE_GCOV:
if gcov_mode and proc_div and _HAVE_GCOV and _HAVE_RUNNER:
_temp = temp_dir or str(outdir / '.gcov_cache')
source_dir = str(filepath.parent)
expected_records: list[dict] = [{}] * len(records)
+10 -3
View File
@@ -1402,15 +1402,22 @@ def propagate_assignments(rec, assignments, fields, file_sec=None):
if pi.get('type') == 'numeric':
digits = pi.get('digits', 0)
decimal = pi.get('decimal', 0)
total = digits + decimal
s = str(val)
neg = s.startswith('-')
if neg:
s = s[1:]
# Handle explicit decimal point (e.g. "008." or "12.34")
if '.' in s:
parts = s.split('.')
int_part = parts[0]
dec_part = parts[1] if len(parts) > 1 else ''
else:
total = digits + decimal
s = s.zfill(total)
int_part = s[:digits] if digits else '0'
dec_part = s[digits:] if decimal > 0 else '0'
result = float(int(int_part or '0') + int(dec_part or '0') / (10 ** decimal))
dec_part = s[digits:] if decimal > 0 else ''
dec_val = int(dec_part.ljust(decimal, '0')[:decimal] or '0') if decimal > 0 else 0
result = float(int(int_part or '0') + dec_val / (10 ** decimal))
return -result if neg else result
try:
return float(val)
+1 -3
View File
@@ -1199,9 +1199,7 @@ def run_coverage(branch_tree, branch_paths_with_assigns, fields,
if gcov_data:
mark_from_gcov(decision_points, gcov_data, branch_tree)
for ls in leaf_stats:
ls.covered_true = False
ls.covered_false = False
# leaf_stats 保留静态分析结果(gcov 无 -b 时不提供叶条件级别的分支数据)
_source_note = ''
if gcov_data:
+14 -6
View File
@@ -18,7 +18,7 @@ def parse_cbl_gcov(gcov_path: str) -> dict[int, int]:
-: 17: 源码行 → 不可执行(注释/声明行,跳过)
"""
counts = {}
with open(gcov_path, encoding='utf-8') as f:
with open(gcov_path, encoding='utf-8', errors='replace') as f:
for line in f:
m = re.match(r'^\s*(#####|\d+\*?|-):\s*(\d+):', line)
if not m:
@@ -35,7 +35,10 @@ def parse_cbl_gcov(gcov_path: str) -> dict[int, int]:
def run_gcov(program_name: str, work_dir: str) -> dict[int, int]:
"""在 work_dir 中通过 WSL 执行 gcov 并解析 COBOL 行计数。
"""在 work_dir 中执行 gcov 并解析 COBOL 行计数。
使用 Windows 本地的 gcov(与 GnuCOBOL 内置 MinGW 同版本)。
不要通过 WSL 调用 gcov,否则 .gcno/.gcda 版本不匹配。
Args:
program_name: 程序名(不含扩展名),如 "ALLCMDS"
@@ -44,21 +47,26 @@ def run_gcov(program_name: str, work_dir: str) -> dict[int, int]:
Returns:
{COBOL行号: 执行次数} 字典。失败时返回空 dict。
"""
wsl_work = _wsl_path(work_dir)
cmd = ['wsl', 'sh', '-c', f'cd {wsl_work} && gcov {program_name}.c']
try:
result = subprocess.run(
cmd,
['gcov', f'{program_name}.cbl'],
capture_output=True, text=True,
encoding='utf-8', errors='replace',
timeout=30,
cwd=work_dir,
)
except FileNotFoundError:
logger.warning("gcov 命令未找到(未安装 MinGW gcov 或不在 PATH 中)")
return {}
if result.returncode != 0:
logger.warning(f"gcov 失败 (exit={result.returncode}): {result.stderr.strip()}")
return {}
cbl_gcov = Path(work_dir) / f'{program_name}.cbl.gcov'
if not cbl_gcov.exists():
logger.warning(f"gcov 输出不存在: {cbl_gcov}")
cbl_gcov = Path(work_dir) / f'{program_name}.gcov'
if not cbl_gcov.exists():
logger.warning(f"gcov 输出不存在 (tried .cbl.gcov / .gcov)")
return {}
gcov_data = parse_cbl_gcov(str(cbl_gcov))
+504
View File
@@ -0,0 +1,504 @@
"""非DB COBOL程序的编译·执行·验证层(被 __init__.py 的 --gcov/--run 调用)
V3 编译方式(Windows 本地 cobc+ SOURCE 分组执行+字段对比+出力保存。
"""
import logging
import os
import re
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from . import file_io
logger = logging.getLogger(__name__)
# ── 数据模型 ──
@dataclass
class GroupInfo:
"""一组执行用例(SOURCE 版)"""
name: str
records: list[dict]
expected_outputs: list[dict]
expected_returncode: int = 0
fd_field_dicts: dict = field(default_factory=dict)
open_dir: dict = field(default_factory=dict)
select_info: dict = field(default_factory=dict)
overlap_mask: list[bool] = field(default_factory=list)
multi_write_fds: set = field(default_factory=set)
@dataclass
class CompareDetail:
"""单字段对比结果(兼容 __init__.py 的 DetailItem 接口)"""
field: str
expected: str
actual: str
match: bool
@dataclass
class GroupResult:
"""单组执行+对比结果"""
name: str
returncode: int
passed: bool
details: list[CompareDetail] = field(default_factory=list)
error_message: str = ''
# ── 路径解析(V3)──
def _resolve_sub_dir(source_dir: str) -> str:
p = Path(source_dir).resolve()
for d in [p.parent / 'sub', p / 'sub']:
if d.is_dir():
return str(d)
return str(p.parent / 'sub')
def _resolve_cpy_dir(source_dir: str) -> str:
p = Path(source_dir).resolve()
for d in [p.parent / 'cpy', p / 'cpy']:
if d.is_dir():
return str(d)
return str(p.parent / 'cpy')
def _input_assign_names(select_info: dict, open_dir: dict,
fd_fields: dict) -> list[str]:
names = []
seen = set()
for fd_name in fd_fields:
direction = (open_dir or {}).get(fd_name, '')
if direction not in ('INPUT', 'I-O'):
continue
assign = select_info.get(fd_name, {}).get('assign', '')
if assign and assign not in seen:
seen.add(assign)
names.append(assign)
return names
def _output_assign_names(select_info: dict, open_dir: dict,
fd_fields: dict) -> list[str]:
names = []
seen = set()
for fd_name in fd_fields:
direction = (open_dir or {}).get(fd_name, '')
if direction not in ('OUTPUT', 'I-O'):
continue
assign = select_info.get(fd_name, {}).get('assign', '')
if assign and assign not in seen:
seen.add(assign)
names.append(assign)
return names
# ── SUB 模块编译(V3)──
def compile_sub_modules(sub_dir: str, work_dir: str,
cpy_dir: str | None = None) -> list[str]:
sub_path = Path(sub_dir)
work_path = Path(work_dir)
work_path.mkdir(parents=True, exist_ok=True)
o_files = []
if not sub_path.is_dir():
logger.warning(f" SUB目录不存在: {sub_dir}")
return o_files
for cbl in sorted(sub_path.glob('*.cbl')):
o_path = work_path / f'{cbl.stem}.o'
gcno_path = work_path / f'{cbl.stem}.gcno'
if o_path.exists() and gcno_path.exists():
o_files.append(str(o_path))
continue
cmd = ['cobc', '-c', '-g', '--coverage', '-o', str(o_path)]
if cpy_dir:
cmd.extend(['-I', cpy_dir])
cmd.append(str(cbl))
logger.info(f" SUB: {cbl.name}")
orig = os.getcwd()
try:
os.chdir(str(work_path))
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
finally:
os.chdir(orig)
if r.returncode != 0:
logger.warning(f" SUB编译失败 {cbl.name}: {r.stderr.strip()[:200]}")
continue
o_files.append(str(o_path))
return o_files
def compile_program(program_name: str, source_dir: str, work_dir: str,
sub_objects: list[str],
cpy_dir: str | None = None) -> str:
work_path = Path(work_dir)
work_path.mkdir(parents=True, exist_ok=True)
src = Path(source_dir) / f'{program_name}.cbl'
if not src.exists():
raise FileNotFoundError(f"源文件不存在: {src}")
exe = work_path / f'{program_name}.exe'
cmd = ['cobc', '-x', '-g', '--coverage', '-o', str(exe)]
if cpy_dir:
cmd.extend(['-I', cpy_dir])
cmd.append(str(src))
cmd.extend(sub_objects)
logger.info(f" LINK: {program_name}.exe")
orig = os.getcwd()
try:
os.chdir(str(work_path))
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
finally:
os.chdir(orig)
if r.returncode != 0:
raise RuntimeError(f"编译失败 {program_name}: {r.stderr.strip()[:500]}")
return str(exe)
# ── 目录管理(SOURCE)──
def _clean_gcda(temp_dir: str):
for f in Path(temp_dir).glob('*.gcda'):
try:
f.unlink()
except OSError:
pass
# ── 工具函数(SOURCE)──
def _build_fd_field_dicts(fd_fields: dict, fields_dict: list) -> dict:
name_map = {f['name']: f for f in fields_dict}
result = {}
for fd_name, names in fd_fields.items():
result[fd_name] = []
for n in names:
if n in name_map:
pi = name_map[n].get('pic_info', {})
if pi.get('type') == 'unknown':
continue
result[fd_name].append(name_map[n])
return result
def _name_to_field(fields_dict: list[dict]) -> dict:
return {f['name']: f for f in fields_dict
if not f.get('is_88') and not f.get('is_filler')}
# ── 读取出力(SOURCE)──
def read_outputs(fd_field_dicts: dict, open_dir: dict,
select_info: dict, temp_dir: str) -> dict[str, list[dict]]:
result = {}
for fd_name, fds in fd_field_dicts.items():
direction = open_dir.get(fd_name, '')
if direction not in ('OUTPUT', 'I-O'):
continue
sel = select_info.get(fd_name, {})
if isinstance(sel, dict):
assign = sel.get('assign', fd_name)
org = sel.get('organization', 'SEQUENTIAL')
rec_mode = sel.get('recording_mode', 'F')
else:
assign = sel
org = 'SEQUENTIAL'
rec_mode = 'F'
outpath = os.path.join(temp_dir, assign)
if not os.path.exists(outpath):
logger.warning(f" 出力文件不存在: {outpath}")
continue
line_seq = (org == 'LINE SEQUENTIAL')
try:
records = file_io.read_output_file(
outpath, fds, line_sequential=line_seq, recording_mode=rec_mode
)
result[fd_name] = records
except Exception as e:
logger.warning(f" 读取出力文件失败 {outpath}: {e}")
continue
return result
# ── 对比(SOURCE)──
def compare_outputs(actual: list[dict], expected: list[dict],
fd_fields: list[dict],
subset_match: bool = False) -> tuple[bool, list[CompareDetail]]:
all_pass = True
details = []
if subset_match:
for exp in expected:
for fd in fd_fields:
fname = fd['name']
expected_val = exp.get(fname, '')
if not expected_val:
continue
found = any(
act.get(fname, '') == expected_val
for act in actual
)
if not found:
all_pass = False
details.append(CompareDetail(
field=fname,
expected=expected_val,
actual='(not found in actual)',
match=False,
))
return all_pass, details
if len(actual) != len(expected):
logger.warning(f" compare_outputs: actual={len(actual)} vs expected={len(expected)}")
for i, (act, exp) in enumerate(zip(actual, expected)):
for fd in fd_fields:
fname = fd['name']
actual_val = act.get(fname, '')
expected_val = exp.get(fname, '')
match = (actual_val == expected_val)
if not match:
all_pass = False
details.append(CompareDetail(
field=f'{fname}[{i}]',
expected=expected_val,
actual=actual_val,
match=match,
))
if len(actual) != len(expected):
all_pass = False
details.append(CompareDetail(
field='record_count',
expected=str(len(expected)),
actual=str(len(actual)),
match=False,
))
return all_pass, details
# ── 单组执行(SOURCE 改 - Native)──
def run_group(group: GroupInfo, exe_path: str, temp_dir: str) -> GroupResult:
logger.info(f" 执行组: {group.name} ({len(group.records)} 条记录)")
exe = Path(exe_path).resolve()
work_dir = Path(temp_dir).resolve()
orig = os.getcwd()
try:
os.chdir(str(work_dir))
result = subprocess.run(
[str(exe)], capture_output=True, text=True,
encoding='utf-8', errors='replace', timeout=60,
)
except subprocess.TimeoutExpired:
return GroupResult(name=group.name, returncode=-1, passed=False,
error_message='timeout')
finally:
os.chdir(orig)
rc = result.returncode
all_pass = (rc == group.expected_returncode)
all_details = []
if rc != group.expected_returncode:
all_details.append(CompareDetail(
field='returncode',
expected=str(group.expected_returncode),
actual=str(rc),
match=False,
))
return GroupResult(
name=group.name,
returncode=rc,
passed=all_pass,
details=all_details,
)
# ── 主编排 ──
def run_all(program_name: str, outdir: str, temp_dir: str,
fields_dict: list[dict], fd_fields: dict,
select_info: dict, open_dir: dict,
term_types: list[str], records: list[dict],
expected_records: list[dict] | None = None,
source_dir: str | None = None,
path_infos: list | None = None,
multi_write_fds: set | None = None
) -> list[GroupResult]:
"""完整编排:编译 → 准备目录 → 逐组执行 → 出力保存。"""
source_dir = source_dir or str(Path(outdir).parent)
work_dir = Path(temp_dir)
work_dir.mkdir(parents=True, exist_ok=True)
expected = expected_records or []
path_infos = path_infos or []
multi_write_fds = multi_write_fds or set()
fd_field_dicts = _build_fd_field_dicts(fd_fields, fields_dict)
# ── 1. SUB 编译(V3)──
sub_dir = _resolve_sub_dir(source_dir)
cpy_dir = _resolve_cpy_dir(source_dir)
sub_o = compile_sub_modules(sub_dir, str(work_dir), cpy_dir)
# ── 2. 主程序编译(V3)──
exe_path = compile_program(
program_name, source_dir, str(work_dir), sub_o, cpy_dir
)
# ── 3. 入力ファイル配置(V3)──
input_dir = Path(outdir) / 'input'
assign_names = _input_assign_names(select_info, open_dir, fd_fields)
if input_dir.is_dir():
for assign in assign_names:
src = input_dir / assign
if src.exists():
shutil.copy2(str(src), str(work_dir / assign))
logger.info(f" INPUT: {assign} ({src.stat().st_size} bytes)")
# ── 4. 清理旧 gcda + 执行 ──
_clean_gcda(str(work_dir))
def _is_output_fd(fd_name: str) -> bool:
dir_val = open_dir.get(fd_name, '')
return dir_val in ('OUTPUT', 'I-O')
# output_input_files 只写入非 abend 记录,同步过滤 expected
filtered_expected = []
for i, rec in enumerate(expected):
term = term_types[i] if i < len(term_types) else 'normal'
if term != 'abend':
filtered_expected.append(rec)
expected = filtered_expected
group = GroupInfo(
name=program_name,
records=records,
expected_outputs=expected,
expected_returncode=0,
fd_field_dicts=fd_field_dicts,
open_dir=open_dir,
select_info=select_info,
multi_write_fds=multi_write_fds,
)
r = run_group(group, exe_path, str(work_dir))
results = [r]
# 出力拷贝到 output_dir
output_dir = Path(outdir) / 'output'
output_dir.mkdir(parents=True, exist_ok=True)
for fd_name in fd_field_dicts:
if not _is_output_fd(fd_name):
continue
sel = select_info.get(fd_name, {})
assign = sel.get('assign', fd_name) if isinstance(sel, dict) else fd_name
src = os.path.join(str(work_dir), assign)
dst = output_dir / assign
if os.path.exists(src):
shutil.copy2(src, str(dst))
status = '' if r.passed else ''
logger.info(f"'{group.name}': returncode={r.returncode}, "
f"{len(r.details)} fields, {status}")
logger.info(f" EXIT={r.returncode}, 出力パス={work_dir}")
return results
# ── run_and_compare(被 --run 调用,SOURCE 兼容)──
def run_and_compare(program_name: str, outdir: str,
fields_dict: list[dict], fd_fields: dict,
select_info: dict, open_dir: dict,
term_types: list[str], records: list[dict]) -> dict:
"""旧版接口兼容包装。返回 {normal_pass, normal_count, ...}。"""
fd_field_dicts = _build_fd_field_dicts(fd_fields, fields_dict)
normal_recs = []
abend_recs = []
for i, term in enumerate(term_types):
if term == 'abend' and i < len(records):
abend_recs.append(records[i])
elif i < len(records):
normal_recs.append(records[i])
result = {
'normal_pass': False, 'normal_count': 0,
'abend_pass': 0, 'abend_total': len(abend_recs),
'output_summary': {},
}
temp_dir = os.path.join(outdir, '.run_cache')
source_dir = os.path.join(outdir, '..', 'input')
work_dir = Path(temp_dir)
work_dir.mkdir(parents=True, exist_ok=True)
_clean_gcda(temp_dir)
sub_dir = _resolve_sub_dir(source_dir)
cpy_dir = _resolve_cpy_dir(source_dir)
sub_o = compile_sub_modules(sub_dir, temp_dir, cpy_dir)
exe_path = compile_program(program_name, source_dir, temp_dir, sub_o, cpy_dir)
if normal_recs:
group = GroupInfo(
name='normal', records=normal_recs, expected_outputs=[],
fd_field_dicts=fd_field_dicts, open_dir=open_dir,
select_info=select_info,
)
r = run_group(group, exe_path, temp_dir)
result['normal_pass'] = (r.returncode == 0)
result['normal_returncode'] = r.returncode
for fd_name in fd_field_dicts:
sel = select_info.get(fd_name, {})
assign = sel.get('assign', fd_name) if isinstance(sel, dict) else fd_name
outpath = os.path.join(temp_dir, assign)
fds = fd_field_dicts.get(fd_name, [])
org = sel.get('organization', 'SEQUENTIAL') if isinstance(sel, dict) else 'SEQUENTIAL'
line_seq = (org == 'LINE SEQUENTIAL')
try:
recs = file_io.read_output_file(outpath, fds, line_sequential=line_seq)
result['output_summary'][fd_name] = len(recs)
except Exception:
result['output_summary'][fd_name] = -1
for i, rec in enumerate(abend_recs):
group = GroupInfo(
name=f'abend_{i+1}', records=[rec], expected_outputs=[],
expected_returncode=1,
fd_field_dicts=fd_field_dicts, open_dir=open_dir,
select_info=select_info,
)
r = run_group(group, exe_path, temp_dir)
if r.returncode != 0:
result['abend_pass'] += 1
return result
+94
View File
@@ -1,10 +1,20 @@
import os
import subprocess
from pathlib import Path
from runners.runner import BuildResult, RunResult
class CobolRunner:
"""COBOL 程序编译·执行器。
非DB程序(KIN系)使用 ``compile_with_links()`` + ``run_file_based()``。
DB程序/旧式调用保持 ``compile()`` + ``run()`` 不变。
"""
# ── 旧式(orchestrator.py 互換)──
def compile(self, src: str, dialect="ibm", gcov: bool = False) -> BuildResult:
"""旧式编译(-std=ibm-strict)。orchestrator.py 用,不修改。"""
stem = Path(src).stem
out = str(Path(src).parent / stem)
cmd = ["cobc", "-x", f"-std={dialect}-strict", "-o", out, src]
@@ -14,8 +24,92 @@ class CobolRunner:
return BuildResult(success=p.returncode == 0, artifact_path=out, log=p.stdout + p.stderr)
def run(self, binary: str, input_path: str, output_path: str) -> RunResult:
"""旧式执行(stdin→stdout)。orchestrator.py 用。"""
with open(input_path, "rb") as f:
data = f.read()
p = subprocess.run([binary], input=data, capture_output=True, timeout=30)
Path(output_path).write_bytes(p.stdout)
return RunResult(success=p.returncode == 0)
# ── 新式(非DB KIN系向け)──
def compile_with_links(self, src: str, work_dir: str,
copybook_dirs: list[str] | None = None,
sub_objects: list[str] | None = None,
gcov: bool = False) -> BuildResult:
"""编译主程序 + 链接 SUB.o,不使用 -std=ibm-strict。
Args:
src: COBOL 源文件路径。
work_dir: 编译工作目录(.gcno 产出位置,必须提前存在)。
copybook_dirs: COPYBOOK 搜索路径列表(映射为 -I 参数)。
sub_objects: SUB*.o 文件路径列表(静态链接)。
gcov: 是否启用 ``--coverage``。
Returns:
BuildResultartifact_path 指向 .exe 的绝对路径。
"""
work_path = Path(work_dir).resolve()
work_path.mkdir(parents=True, exist_ok=True)
stem = Path(src).stem
exe_path = work_path / f'{stem}.exe'
cmd = ["cobc", "-x", "-g"]
if gcov:
cmd.append("--coverage")
for cpy in (copybook_dirs or []):
cmd.extend(["-I", str(cpy)])
cmd.extend(["-o", str(exe_path), str(src)])
cmd.extend(str(o) for o in (sub_objects or []))
orig = os.getcwd()
try:
os.chdir(str(work_path))
p = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
finally:
os.chdir(orig)
return BuildResult(
success=p.returncode == 0,
artifact_path=str(exe_path),
log=(p.stdout + p.stderr)[:2000],
)
def run_file_based(self, binary: str, run_dir: str,
input_files: dict[str, str] | None = None,
timeout: int = 60) -> RunResult:
"""基于文件的执行。将输入文件复制到 run_dir 后启动程序。
Args:
binary: 可执行文件路径。
run_dir: 执行目录(CWD,输入文件在此,输出文件也在此)。
input_files: {assign_name: source_path} 的映射。
如 ``{"KIN04R01": "/path/to/input/KIN04R01"}``。
timeout: 超时秒数。
Returns:
RunResult。
"""
run_path = Path(run_dir).resolve()
run_path.mkdir(parents=True, exist_ok=True)
# 入力ファイル配置
for assign_name, src_path in (input_files or {}).items():
src = Path(src_path)
if src.exists():
dst = run_path / assign_name
import shutil
shutil.copy2(str(src), str(dst))
orig = os.getcwd()
try:
os.chdir(str(run_path))
p = subprocess.run([str(binary)], capture_output=True, text=True, timeout=timeout)
finally:
os.chdir(orig)
return RunResult(
success=p.returncode == 0,
log=(p.stdout + p.stderr)[:2000],
)