From 40e8a50ab492344f1f7a39f8f782bb21139c99ff Mon Sep 17 00:00:00 2001 From: hangshuo652 Date: Thu, 2 Jul 2026 21:26:51 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20gcov=20Windows=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E5=8C=96=20+=20runner=E5=90=88=E5=B9=B6=20+=20=E5=86=B3?= =?UTF-8?q?=E7=AD=96=E8=A6=86=E7=9B=96=E7=8E=87=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - gcov从WSL 15.2改为Windows MinGW 9.2.0,消除.gcno格式不匹配 - coverage.py去除leaf_stats复位bug(决策覆盖率恢复) - runner.py合并V3编译+SOURCE分组执行+出力保存 - __init__.py gcov条件/outdir解析修复 --- cobol_testgen/__init__.py | 7 +- cobol_testgen/core.py | 17 +- cobol_testgen/coverage.py | 4 +- cobol_testgen/gcov.py | 30 ++- cobol_testgen/runner.py | 504 ++++++++++++++++++++++++++++++++++++++ runners/cobol_runner.py | 94 +++++++ 6 files changed, 635 insertions(+), 21 deletions(-) create mode 100644 cobol_testgen/runner.py diff --git a/cobol_testgen/__init__.py b/cobol_testgen/__init__.py index 1c1c783..5c1d725 100644 --- a/cobol_testgen/__init__.py +++ b/cobol_testgen/__init__.py @@ -288,6 +288,9 @@ def main(): if '--gcov' in args: gcov_mode = True args.remove('--gcov') + if not _HAVE_RUNNER: + logger.warning("--gcov: runner.py not found. Compile/run will be skipped. " + "Use --gcov without runner only generates test data + static coverage.") i = 0 while i < len(args): if args[i] == '--temp-dir': @@ -309,7 +312,7 @@ def main(): outdir = None for a in args: p = Path(a) - if p.is_dir(): + if p.is_dir() or (not p.suffix and p.parent.exists()): outdir = p elif p.suffix.upper() in ('.CBL', '.COB', '.CPY'): cobol_files.append(p) @@ -553,7 +556,7 @@ def main(): data_fields=fields_dict, select_info=select_info) gcov_data = None - if gcov_mode and proc_div and _HAVE_GCOV: + if gcov_mode and proc_div and _HAVE_GCOV and _HAVE_RUNNER: _temp = temp_dir or str(outdir / '.gcov_cache') source_dir = str(filepath.parent) expected_records: list[dict] = [{}] * len(records) diff --git a/cobol_testgen/core.py b/cobol_testgen/core.py index a5c40bb..e3e7e9f 100644 --- a/cobol_testgen/core.py +++ b/cobol_testgen/core.py @@ -1402,15 +1402,22 @@ def propagate_assignments(rec, assignments, fields, file_sec=None): if pi.get('type') == 'numeric': digits = pi.get('digits', 0) decimal = pi.get('decimal', 0) - total = digits + decimal s = str(val) neg = s.startswith('-') if neg: s = s[1:] - s = s.zfill(total) - int_part = s[:digits] if digits else '0' - dec_part = s[digits:] if decimal > 0 else '0' - result = float(int(int_part or '0') + int(dec_part or '0') / (10 ** decimal)) + # Handle explicit decimal point (e.g. "008." or "12.34") + if '.' in s: + parts = s.split('.') + int_part = parts[0] + dec_part = parts[1] if len(parts) > 1 else '' + else: + total = digits + decimal + s = s.zfill(total) + int_part = s[:digits] if digits else '0' + dec_part = s[digits:] if decimal > 0 else '' + dec_val = int(dec_part.ljust(decimal, '0')[:decimal] or '0') if decimal > 0 else 0 + result = float(int(int_part or '0') + dec_val / (10 ** decimal)) return -result if neg else result try: return float(val) diff --git a/cobol_testgen/coverage.py b/cobol_testgen/coverage.py index 7acd2a6..a9601f1 100644 --- a/cobol_testgen/coverage.py +++ b/cobol_testgen/coverage.py @@ -1199,9 +1199,7 @@ def run_coverage(branch_tree, branch_paths_with_assigns, fields, if gcov_data: mark_from_gcov(decision_points, gcov_data, branch_tree) - for ls in leaf_stats: - ls.covered_true = False - ls.covered_false = False + # leaf_stats 保留静态分析结果(gcov 无 -b 时不提供叶条件级别的分支数据) _source_note = '' if gcov_data: diff --git a/cobol_testgen/gcov.py b/cobol_testgen/gcov.py index 41572bf..28dbb6c 100644 --- a/cobol_testgen/gcov.py +++ b/cobol_testgen/gcov.py @@ -18,7 +18,7 @@ def parse_cbl_gcov(gcov_path: str) -> dict[int, int]: -: 17: 源码行 → 不可执行(注释/声明行,跳过) """ counts = {} - with open(gcov_path, encoding='utf-8') as f: + with open(gcov_path, encoding='utf-8', errors='replace') as f: for line in f: m = re.match(r'^\s*(#####|\d+\*?|-):\s*(\d+):', line) if not m: @@ -35,7 +35,10 @@ def parse_cbl_gcov(gcov_path: str) -> dict[int, int]: def run_gcov(program_name: str, work_dir: str) -> dict[int, int]: - """在 work_dir 中通过 WSL 执行 gcov 并解析 COBOL 行计数。 + """在 work_dir 中执行 gcov 并解析 COBOL 行计数。 + + 使用 Windows 本地的 gcov(与 GnuCOBOL 内置 MinGW 同版本)。 + 不要通过 WSL 调用 gcov,否则 .gcno/.gcda 版本不匹配。 Args: program_name: 程序名(不含扩展名),如 "ALLCMDS" @@ -44,21 +47,26 @@ def run_gcov(program_name: str, work_dir: str) -> dict[int, int]: Returns: {COBOL行号: 执行次数} 字典。失败时返回空 dict。 """ - wsl_work = _wsl_path(work_dir) - cmd = ['wsl', 'sh', '-c', f'cd {wsl_work} && gcov {program_name}.c'] - result = subprocess.run( - cmd, - capture_output=True, text=True, - encoding='utf-8', errors='replace', - timeout=30, - ) + try: + result = subprocess.run( + ['gcov', f'{program_name}.cbl'], + capture_output=True, text=True, + encoding='utf-8', errors='replace', + timeout=30, + cwd=work_dir, + ) + except FileNotFoundError: + logger.warning("gcov 命令未找到(未安装 MinGW gcov 或不在 PATH 中)") + return {} if result.returncode != 0: logger.warning(f"gcov 失败 (exit={result.returncode}): {result.stderr.strip()}") return {} cbl_gcov = Path(work_dir) / f'{program_name}.cbl.gcov' if not cbl_gcov.exists(): - logger.warning(f"gcov 输出不存在: {cbl_gcov}") + cbl_gcov = Path(work_dir) / f'{program_name}.gcov' + if not cbl_gcov.exists(): + logger.warning(f"gcov 输出不存在 (tried .cbl.gcov / .gcov)") return {} gcov_data = parse_cbl_gcov(str(cbl_gcov)) diff --git a/cobol_testgen/runner.py b/cobol_testgen/runner.py new file mode 100644 index 0000000..39a89c4 --- /dev/null +++ b/cobol_testgen/runner.py @@ -0,0 +1,504 @@ +"""非DB COBOL程序的编译·执行·验证层(被 __init__.py 的 --gcov/--run 调用) + +V3 编译方式(Windows 本地 cobc)+ SOURCE 分组执行+字段对比+出力保存。 +""" + +import logging +import os +import re +import shutil +import subprocess +from dataclasses import dataclass, field +from pathlib import Path + +from . import file_io + +logger = logging.getLogger(__name__) + + +# ── 数据模型 ── + + +@dataclass +class GroupInfo: + """一组执行用例(SOURCE 版)""" + name: str + records: list[dict] + expected_outputs: list[dict] + expected_returncode: int = 0 + fd_field_dicts: dict = field(default_factory=dict) + open_dir: dict = field(default_factory=dict) + select_info: dict = field(default_factory=dict) + overlap_mask: list[bool] = field(default_factory=list) + multi_write_fds: set = field(default_factory=set) + + +@dataclass +class CompareDetail: + """单字段对比结果(兼容 __init__.py 的 DetailItem 接口)""" + field: str + expected: str + actual: str + match: bool + + +@dataclass +class GroupResult: + """单组执行+对比结果""" + name: str + returncode: int + passed: bool + details: list[CompareDetail] = field(default_factory=list) + error_message: str = '' + + +# ── 路径解析(V3)── + + +def _resolve_sub_dir(source_dir: str) -> str: + p = Path(source_dir).resolve() + for d in [p.parent / 'sub', p / 'sub']: + if d.is_dir(): + return str(d) + return str(p.parent / 'sub') + + +def _resolve_cpy_dir(source_dir: str) -> str: + p = Path(source_dir).resolve() + for d in [p.parent / 'cpy', p / 'cpy']: + if d.is_dir(): + return str(d) + return str(p.parent / 'cpy') + + +def _input_assign_names(select_info: dict, open_dir: dict, + fd_fields: dict) -> list[str]: + names = [] + seen = set() + for fd_name in fd_fields: + direction = (open_dir or {}).get(fd_name, '') + if direction not in ('INPUT', 'I-O'): + continue + assign = select_info.get(fd_name, {}).get('assign', '') + if assign and assign not in seen: + seen.add(assign) + names.append(assign) + return names + + +def _output_assign_names(select_info: dict, open_dir: dict, + fd_fields: dict) -> list[str]: + names = [] + seen = set() + for fd_name in fd_fields: + direction = (open_dir or {}).get(fd_name, '') + if direction not in ('OUTPUT', 'I-O'): + continue + assign = select_info.get(fd_name, {}).get('assign', '') + if assign and assign not in seen: + seen.add(assign) + names.append(assign) + return names + + +# ── SUB 模块编译(V3)── + + +def compile_sub_modules(sub_dir: str, work_dir: str, + cpy_dir: str | None = None) -> list[str]: + sub_path = Path(sub_dir) + work_path = Path(work_dir) + work_path.mkdir(parents=True, exist_ok=True) + o_files = [] + + if not sub_path.is_dir(): + logger.warning(f" SUB目录不存在: {sub_dir}") + return o_files + + for cbl in sorted(sub_path.glob('*.cbl')): + o_path = work_path / f'{cbl.stem}.o' + gcno_path = work_path / f'{cbl.stem}.gcno' + if o_path.exists() and gcno_path.exists(): + o_files.append(str(o_path)) + continue + + cmd = ['cobc', '-c', '-g', '--coverage', '-o', str(o_path)] + if cpy_dir: + cmd.extend(['-I', cpy_dir]) + cmd.append(str(cbl)) + + logger.info(f" SUB: {cbl.name}") + orig = os.getcwd() + try: + os.chdir(str(work_path)) + r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + finally: + os.chdir(orig) + if r.returncode != 0: + logger.warning(f" SUB编译失败 {cbl.name}: {r.stderr.strip()[:200]}") + continue + o_files.append(str(o_path)) + + return o_files + + +def compile_program(program_name: str, source_dir: str, work_dir: str, + sub_objects: list[str], + cpy_dir: str | None = None) -> str: + work_path = Path(work_dir) + work_path.mkdir(parents=True, exist_ok=True) + + src = Path(source_dir) / f'{program_name}.cbl' + if not src.exists(): + raise FileNotFoundError(f"源文件不存在: {src}") + + exe = work_path / f'{program_name}.exe' + + cmd = ['cobc', '-x', '-g', '--coverage', '-o', str(exe)] + if cpy_dir: + cmd.extend(['-I', cpy_dir]) + cmd.append(str(src)) + cmd.extend(sub_objects) + + logger.info(f" LINK: {program_name}.exe") + orig = os.getcwd() + try: + os.chdir(str(work_path)) + r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + finally: + os.chdir(orig) + if r.returncode != 0: + raise RuntimeError(f"编译失败 {program_name}: {r.stderr.strip()[:500]}") + return str(exe) + + +# ── 目录管理(SOURCE)── + + +def _clean_gcda(temp_dir: str): + for f in Path(temp_dir).glob('*.gcda'): + try: + f.unlink() + except OSError: + pass + + +# ── 工具函数(SOURCE)── + + +def _build_fd_field_dicts(fd_fields: dict, fields_dict: list) -> dict: + name_map = {f['name']: f for f in fields_dict} + result = {} + for fd_name, names in fd_fields.items(): + result[fd_name] = [] + for n in names: + if n in name_map: + pi = name_map[n].get('pic_info', {}) + if pi.get('type') == 'unknown': + continue + result[fd_name].append(name_map[n]) + return result + + +def _name_to_field(fields_dict: list[dict]) -> dict: + return {f['name']: f for f in fields_dict + if not f.get('is_88') and not f.get('is_filler')} + + +# ── 读取出力(SOURCE)── + + +def read_outputs(fd_field_dicts: dict, open_dir: dict, + select_info: dict, temp_dir: str) -> dict[str, list[dict]]: + result = {} + for fd_name, fds in fd_field_dicts.items(): + direction = open_dir.get(fd_name, '') + if direction not in ('OUTPUT', 'I-O'): + continue + sel = select_info.get(fd_name, {}) + if isinstance(sel, dict): + assign = sel.get('assign', fd_name) + org = sel.get('organization', 'SEQUENTIAL') + rec_mode = sel.get('recording_mode', 'F') + else: + assign = sel + org = 'SEQUENTIAL' + rec_mode = 'F' + outpath = os.path.join(temp_dir, assign) + if not os.path.exists(outpath): + logger.warning(f" 出力文件不存在: {outpath}") + continue + line_seq = (org == 'LINE SEQUENTIAL') + try: + records = file_io.read_output_file( + outpath, fds, line_sequential=line_seq, recording_mode=rec_mode + ) + result[fd_name] = records + except Exception as e: + logger.warning(f" 读取出力文件失败 {outpath}: {e}") + continue + return result + + +# ── 对比(SOURCE)── + + +def compare_outputs(actual: list[dict], expected: list[dict], + fd_fields: list[dict], + subset_match: bool = False) -> tuple[bool, list[CompareDetail]]: + all_pass = True + details = [] + + if subset_match: + for exp in expected: + for fd in fd_fields: + fname = fd['name'] + expected_val = exp.get(fname, '') + if not expected_val: + continue + found = any( + act.get(fname, '') == expected_val + for act in actual + ) + if not found: + all_pass = False + details.append(CompareDetail( + field=fname, + expected=expected_val, + actual='(not found in actual)', + match=False, + )) + return all_pass, details + + if len(actual) != len(expected): + logger.warning(f" compare_outputs: actual={len(actual)} vs expected={len(expected)}") + for i, (act, exp) in enumerate(zip(actual, expected)): + for fd in fd_fields: + fname = fd['name'] + actual_val = act.get(fname, '') + expected_val = exp.get(fname, '') + match = (actual_val == expected_val) + if not match: + all_pass = False + details.append(CompareDetail( + field=f'{fname}[{i}]', + expected=expected_val, + actual=actual_val, + match=match, + )) + + if len(actual) != len(expected): + all_pass = False + details.append(CompareDetail( + field='record_count', + expected=str(len(expected)), + actual=str(len(actual)), + match=False, + )) + + return all_pass, details + + +# ── 单组执行(SOURCE 改 - Native)── + + +def run_group(group: GroupInfo, exe_path: str, temp_dir: str) -> GroupResult: + logger.info(f" 执行组: {group.name} ({len(group.records)} 条记录)") + + exe = Path(exe_path).resolve() + work_dir = Path(temp_dir).resolve() + + orig = os.getcwd() + try: + os.chdir(str(work_dir)) + result = subprocess.run( + [str(exe)], capture_output=True, text=True, + encoding='utf-8', errors='replace', timeout=60, + ) + except subprocess.TimeoutExpired: + return GroupResult(name=group.name, returncode=-1, passed=False, + error_message='timeout') + finally: + os.chdir(orig) + + rc = result.returncode + all_pass = (rc == group.expected_returncode) + all_details = [] + + if rc != group.expected_returncode: + all_details.append(CompareDetail( + field='returncode', + expected=str(group.expected_returncode), + actual=str(rc), + match=False, + )) + + return GroupResult( + name=group.name, + returncode=rc, + passed=all_pass, + details=all_details, + ) + + +# ── 主编排 ── + + +def run_all(program_name: str, outdir: str, temp_dir: str, + fields_dict: list[dict], fd_fields: dict, + select_info: dict, open_dir: dict, + term_types: list[str], records: list[dict], + expected_records: list[dict] | None = None, + source_dir: str | None = None, + path_infos: list | None = None, + multi_write_fds: set | None = None + ) -> list[GroupResult]: + """完整编排:编译 → 准备目录 → 逐组执行 → 出力保存。""" + source_dir = source_dir or str(Path(outdir).parent) + work_dir = Path(temp_dir) + work_dir.mkdir(parents=True, exist_ok=True) + expected = expected_records or [] + path_infos = path_infos or [] + multi_write_fds = multi_write_fds or set() + + fd_field_dicts = _build_fd_field_dicts(fd_fields, fields_dict) + + # ── 1. SUB 编译(V3)── + sub_dir = _resolve_sub_dir(source_dir) + cpy_dir = _resolve_cpy_dir(source_dir) + sub_o = compile_sub_modules(sub_dir, str(work_dir), cpy_dir) + + # ── 2. 主程序编译(V3)── + exe_path = compile_program( + program_name, source_dir, str(work_dir), sub_o, cpy_dir + ) + + # ── 3. 入力ファイル配置(V3)── + input_dir = Path(outdir) / 'input' + assign_names = _input_assign_names(select_info, open_dir, fd_fields) + if input_dir.is_dir(): + for assign in assign_names: + src = input_dir / assign + if src.exists(): + shutil.copy2(str(src), str(work_dir / assign)) + logger.info(f" INPUT: {assign} ({src.stat().st_size} bytes)") + + # ── 4. 清理旧 gcda + 执行 ── + _clean_gcda(str(work_dir)) + + def _is_output_fd(fd_name: str) -> bool: + dir_val = open_dir.get(fd_name, '') + return dir_val in ('OUTPUT', 'I-O') + + # output_input_files 只写入非 abend 记录,同步过滤 expected + filtered_expected = [] + for i, rec in enumerate(expected): + term = term_types[i] if i < len(term_types) else 'normal' + if term != 'abend': + filtered_expected.append(rec) + expected = filtered_expected + + group = GroupInfo( + name=program_name, + records=records, + expected_outputs=expected, + expected_returncode=0, + fd_field_dicts=fd_field_dicts, + open_dir=open_dir, + select_info=select_info, + multi_write_fds=multi_write_fds, + ) + + r = run_group(group, exe_path, str(work_dir)) + results = [r] + + # 出力拷贝到 output_dir + output_dir = Path(outdir) / 'output' + output_dir.mkdir(parents=True, exist_ok=True) + for fd_name in fd_field_dicts: + if not _is_output_fd(fd_name): + continue + sel = select_info.get(fd_name, {}) + assign = sel.get('assign', fd_name) if isinstance(sel, dict) else fd_name + src = os.path.join(str(work_dir), assign) + dst = output_dir / assign + if os.path.exists(src): + shutil.copy2(src, str(dst)) + + status = '✓' if r.passed else '✗' + logger.info(f" 组 '{group.name}': returncode={r.returncode}, " + f"{len(r.details)} fields, {status}") + logger.info(f" EXIT={r.returncode}, 出力パス={work_dir}") + return results + + +# ── run_and_compare(被 --run 调用,SOURCE 兼容)── + + +def run_and_compare(program_name: str, outdir: str, + fields_dict: list[dict], fd_fields: dict, + select_info: dict, open_dir: dict, + term_types: list[str], records: list[dict]) -> dict: + """旧版接口兼容包装。返回 {normal_pass, normal_count, ...}。""" + fd_field_dicts = _build_fd_field_dicts(fd_fields, fields_dict) + + normal_recs = [] + abend_recs = [] + for i, term in enumerate(term_types): + if term == 'abend' and i < len(records): + abend_recs.append(records[i]) + elif i < len(records): + normal_recs.append(records[i]) + + result = { + 'normal_pass': False, 'normal_count': 0, + 'abend_pass': 0, 'abend_total': len(abend_recs), + 'output_summary': {}, + } + + temp_dir = os.path.join(outdir, '.run_cache') + source_dir = os.path.join(outdir, '..', 'input') + + work_dir = Path(temp_dir) + work_dir.mkdir(parents=True, exist_ok=True) + _clean_gcda(temp_dir) + + sub_dir = _resolve_sub_dir(source_dir) + cpy_dir = _resolve_cpy_dir(source_dir) + sub_o = compile_sub_modules(sub_dir, temp_dir, cpy_dir) + exe_path = compile_program(program_name, source_dir, temp_dir, sub_o, cpy_dir) + + if normal_recs: + group = GroupInfo( + name='normal', records=normal_recs, expected_outputs=[], + fd_field_dicts=fd_field_dicts, open_dir=open_dir, + select_info=select_info, + ) + r = run_group(group, exe_path, temp_dir) + result['normal_pass'] = (r.returncode == 0) + result['normal_returncode'] = r.returncode + for fd_name in fd_field_dicts: + sel = select_info.get(fd_name, {}) + assign = sel.get('assign', fd_name) if isinstance(sel, dict) else fd_name + outpath = os.path.join(temp_dir, assign) + fds = fd_field_dicts.get(fd_name, []) + org = sel.get('organization', 'SEQUENTIAL') if isinstance(sel, dict) else 'SEQUENTIAL' + line_seq = (org == 'LINE SEQUENTIAL') + try: + recs = file_io.read_output_file(outpath, fds, line_sequential=line_seq) + result['output_summary'][fd_name] = len(recs) + except Exception: + result['output_summary'][fd_name] = -1 + + for i, rec in enumerate(abend_recs): + group = GroupInfo( + name=f'abend_{i+1}', records=[rec], expected_outputs=[], + expected_returncode=1, + fd_field_dicts=fd_field_dicts, open_dir=open_dir, + select_info=select_info, + ) + r = run_group(group, exe_path, temp_dir) + if r.returncode != 0: + result['abend_pass'] += 1 + + return result diff --git a/runners/cobol_runner.py b/runners/cobol_runner.py index 3e736e6..14a69b1 100644 --- a/runners/cobol_runner.py +++ b/runners/cobol_runner.py @@ -1,10 +1,20 @@ +import os import subprocess from pathlib import Path from runners.runner import BuildResult, RunResult class CobolRunner: + """COBOL 程序编译·执行器。 + + 非DB程序(KIN系)使用 ``compile_with_links()`` + ``run_file_based()``。 + DB程序/旧式调用保持 ``compile()`` + ``run()`` 不变。 + """ + + # ── 旧式(orchestrator.py 互換)── + def compile(self, src: str, dialect="ibm", gcov: bool = False) -> BuildResult: + """旧式编译(-std=ibm-strict)。orchestrator.py 用,不修改。""" stem = Path(src).stem out = str(Path(src).parent / stem) cmd = ["cobc", "-x", f"-std={dialect}-strict", "-o", out, src] @@ -14,8 +24,92 @@ class CobolRunner: return BuildResult(success=p.returncode == 0, artifact_path=out, log=p.stdout + p.stderr) def run(self, binary: str, input_path: str, output_path: str) -> RunResult: + """旧式执行(stdin→stdout)。orchestrator.py 用。""" with open(input_path, "rb") as f: data = f.read() p = subprocess.run([binary], input=data, capture_output=True, timeout=30) Path(output_path).write_bytes(p.stdout) return RunResult(success=p.returncode == 0) + + # ── 新式(非DB KIN系向け)── + + def compile_with_links(self, src: str, work_dir: str, + copybook_dirs: list[str] | None = None, + sub_objects: list[str] | None = None, + gcov: bool = False) -> BuildResult: + """编译主程序 + 链接 SUB.o,不使用 -std=ibm-strict。 + + Args: + src: COBOL 源文件路径。 + work_dir: 编译工作目录(.gcno 产出位置,必须提前存在)。 + copybook_dirs: COPYBOOK 搜索路径列表(映射为 -I 参数)。 + sub_objects: SUB*.o 文件路径列表(静态链接)。 + gcov: 是否启用 ``--coverage``。 + + Returns: + BuildResult,artifact_path 指向 .exe 的绝对路径。 + """ + work_path = Path(work_dir).resolve() + work_path.mkdir(parents=True, exist_ok=True) + + stem = Path(src).stem + exe_path = work_path / f'{stem}.exe' + + cmd = ["cobc", "-x", "-g"] + if gcov: + cmd.append("--coverage") + for cpy in (copybook_dirs or []): + cmd.extend(["-I", str(cpy)]) + cmd.extend(["-o", str(exe_path), str(src)]) + cmd.extend(str(o) for o in (sub_objects or [])) + + orig = os.getcwd() + try: + os.chdir(str(work_path)) + p = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + finally: + os.chdir(orig) + + return BuildResult( + success=p.returncode == 0, + artifact_path=str(exe_path), + log=(p.stdout + p.stderr)[:2000], + ) + + def run_file_based(self, binary: str, run_dir: str, + input_files: dict[str, str] | None = None, + timeout: int = 60) -> RunResult: + """基于文件的执行。将输入文件复制到 run_dir 后启动程序。 + + Args: + binary: 可执行文件路径。 + run_dir: 执行目录(CWD,输入文件在此,输出文件也在此)。 + input_files: {assign_name: source_path} 的映射。 + 如 ``{"KIN04R01": "/path/to/input/KIN04R01"}``。 + timeout: 超时秒数。 + + Returns: + RunResult。 + """ + run_path = Path(run_dir).resolve() + run_path.mkdir(parents=True, exist_ok=True) + + # 入力ファイル配置 + for assign_name, src_path in (input_files or {}).items(): + src = Path(src_path) + if src.exists(): + dst = run_path / assign_name + import shutil + shutil.copy2(str(src), str(dst)) + + orig = os.getcwd() + try: + os.chdir(str(run_path)) + p = subprocess.run([str(binary)], capture_output=True, text=True, timeout=timeout) + finally: + os.chdir(orig) + + return RunResult( + success=p.returncode == 0, + log=(p.stdout + p.stderr)[:2000], + )