diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..149383f --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,18 @@ +# cobol-java-v3 + +## 工作目录 +C:\Users\marye\Desktop\2026技术大赛\cobol-java-v3 + +## 我的模块 +cobol_testgen/ + +## 远程仓库 +https://gittea.dev/hangshuo652/cobol-java-v3 + +## 工作流程 +```powershell +cd "C:\Users\marye\Desktop\2026技术大赛\cobol-java-v3" +git add cobol_testgen/ +git commit -m "描述修改" +git push +``` diff --git a/cobol_testgen/__init__.py b/cobol_testgen/__init__.py index 90b13b3..791e429 100644 --- a/cobol_testgen/__init__.py +++ b/cobol_testgen/__init__.py @@ -7,10 +7,7 @@ from pathlib import Path # ── 配置(必须放在本地模块导入之前,避免循环导入) ── -CONFIG = { - "proc_parser": "rule", # "rule" | "ai" - "llm_generator": False, # True=启用LLM路径生成; False=纯规则引擎 -} +CONFIG = {} from .read import preprocess, extract_data_division, extract_procedure_division from .read import resolve_copybooks, parse_data_division, parse_file_section, scan_open_statements @@ -18,7 +15,6 @@ from .core import build_branch_tree, classify_field_roles, _init_child_names from .cond import parse_single_condition, is_field from .design import enum_paths, generate_records, _filter_stop from .output import output_json, output_input_files -from . import agents from .coverage import run_coverage, generate_coverage_index logger = logging.getLogger(__name__) @@ -233,14 +229,7 @@ def main(): assignments = {} if proc_div: - if CONFIG["proc_parser"] == "ai": - try: - result = agents.parse_proc_division_ai(proc_div, fields_dict) - branch_tree, assignments = result - except NotImplementedError: - branch_tree, assignments = build_branch_tree(proc_div, fields_dict) - else: - branch_tree, assignments = build_branch_tree(proc_div, fields_dict) + branch_tree, assignments = build_branch_tree(proc_div, fields_dict) roles = classify_field_roles(branch_tree, assignments, fields_dict, source=preprocessed, proc_text=proc_div) @@ -281,14 +270,14 @@ def main(): cov_result = run_coverage(branch_tree, branch_paths_with_assigns, fields_dict, source, cov_prefix, index_relpath=index_relpath) - records = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec) + records, kept_path_cons = generate_records(branch_paths_with_assigns, fields_dict, assignments, file_sec=file_sec) # 输出 JSON(完整文件) outpath = outdir / (filepath.stem + '.json') output_json(records, outpath, roles, fd_fields=fd_fields, field_to_fd=field_to_fd, open_dir=open_dir, - path_cons_list=[c for c, a in branch_paths_with_assigns]) + path_cons_list=kept_path_cons) # 输出入力 JSON(按 FD 拆分) output_input_files(records, outdir, filepath.stem, roles, diff --git a/cobol_testgen/agents.py b/cobol_testgen/agents.py deleted file mode 100644 index dc95016..0000000 --- a/cobol_testgen/agents.py +++ /dev/null @@ -1,308 +0,0 @@ -"""AI智能体接口 — 基于DeepSeek的PROCEDURE DIVISION解析""" - -import json -import os -import re -from pathlib import Path - -from .models import BrSeq, BrIf, BrEval, BrPerform, Assign, CallNode - - -DEEPSEEK_API_KEY_ENV = "DEEPSEEK_API_KEY" -DEEPSEEK_BASE_URL = "https://api.deepseek.com/v1" -DEEPSEEK_MODEL = "deepseek-chat" -PROMPT_FILE = Path(__file__).parent / "prompts" / "parse_proc_division.txt" - - -def parse_proc_division_ai(proc_text: str, fields: list = None, spec_doc: str = ""): - """AI版PROCEDURE DIVISION解析:调用DeepSeek API,返回(branch_tree, assignments).""" - api_key = os.environ.get(DEEPSEEK_API_KEY_ENV) - if not api_key: - raise NotImplementedError( - f"AI agent requires {DEEPSEEK_API_KEY_ENV} environment variable" - ) - - prompt = _build_prompt(proc_text, fields) - response_text = _call_llm(prompt, api_key) - data = _extract_json(response_text) - if not data: - raise NotImplementedError("AI returned no parsable JSON") - - branch_tree = _json_to_tree(data.get("tree", {})) - assignments = data.get("assignments", {}) - return branch_tree, assignments - - -def _build_prompt(proc_text: str, fields: list = None) -> list[dict]: - system = PROMPT_FILE.read_text(encoding="utf-8") - - fields_json = json.dumps(fields, ensure_ascii=False, indent=2) if fields else "[]" - - user = f"""## PROCEDURE DIVISION 源码 - -``` -{proc_text} -``` - -## DATA DIVISION 字段列表 - -```json -{fields_json} -``` -""" - - return [ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ] - - -def _call_llm(messages: list[dict], api_key: str) -> str: - try: - from openai import OpenAI - except ImportError: - raise NotImplementedError( - "openai package not installed. Run: pip install openai" - ) - - client = OpenAI(api_key=api_key, base_url=DEEPSEEK_BASE_URL) - response = client.chat.completions.create( - model=DEEPSEEK_MODEL, - messages=messages, - temperature=0.1, - max_tokens=8192, - ) - return response.choices[0].message.content or "" - - -def _extract_json(text: str) -> dict | None: - stripped = text.strip() - # Try extracting from markdown code block first - m = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", stripped, re.DOTALL) - if m: - stripped = m.group(1).strip() - try: - return json.loads(stripped) - except json.JSONDecodeError: - return None - - -def _json_to_tree(data: dict): - node_type = data.get("type", "seq") - - if node_type == "seq": - node = BrSeq() - for child_data in data.get("children", []): - child = _json_to_tree(child_data) - if child is not None: - node.add(child) - return node - - if node_type == "if": - node = BrIf(data.get("condition", "")) - node.true_seq = _json_to_tree(data.get("true_seq", {"type": "seq", "children": []})) - node.false_seq = _json_to_tree(data.get("false_seq", {"type": "seq", "children": []})) - return node - - if node_type == "eval": - node = BrEval(data.get("subject", "")) - for w in data.get("when_list", []): - node.when_list.append((w.get("value", ""), _json_to_tree(w.get("seq", {"type": "seq", "children": []})))) - node.other_seq = _json_to_tree(data.get("other_seq", {"type": "seq", "children": []})) - node.has_other = data.get("has_other", False) - return node - - if node_type == "perform": - perf_type = data.get("perf_type", "para") - kw = {"perf_type": perf_type} - for k in ("condition", "target", "thru", "times", - "varying_var", "varying_from", "varying_by"): - if k in data: - kw[k] = data[k] - node = BrPerform(**kw) - if "body_seq" in data: - node.body_seq = _json_to_tree(data["body_seq"]) - return node - - if node_type == "assign": - return Assign( - target=data.get("target", ""), - source_info=data.get("source_info", {}), - ) - - if node_type == "call": - return CallNode( - program_name=data.get("program_name", ""), - using_params=data.get("using_params", []), - ) - - return None - - -# ── LLM 路径生成 ── - - -def llm_generate_all_paths(tree_root, fields) -> list | None: - """为整个控制流树生成 MC/DC 路径。返回 [(constraints, assignments), ...] 或 None。""" - api_key = os.environ.get(DEEPSEEK_API_KEY_ENV) - if not api_key: - return None - - tree_json = _serialize_tree_for_llm(tree_root) - if tree_json is None: - return None - - level88_map = _extract_88_mapping(fields) - messages = _build_path_prompt(tree_json, fields, level88_map) - - try: - response = _call_llm(messages, api_key) - data = _extract_json(response) - if data and "paths" in data: - return _parse_llm_paths(data["paths"]) - except Exception: - pass - return None - - -def _serialize_tree_for_llm(node): - if node is None: - return None - from .models import BrSeq, BrIf, BrEval, BrPerform, Assign, CallNode, ExitNode, GoTo - - if isinstance(node, BrSeq): - children = [] - for child in node.children: - s = _serialize_tree_for_llm(child) - if s is not None: - children.append(s) - return {"type": "seq", "children": children} if children else None - - if isinstance(node, BrIf): - return { - "type": "if", - "condition": node.condition, - "true_seq": _serialize_tree_for_llm(node.true_seq) or {"type": "seq", "children": []}, - "false_seq": _serialize_tree_for_llm(node.false_seq) or {"type": "seq", "children": []}, - } - - if isinstance(node, BrEval): - when_list = [] - for val, seq in node.when_list: - s = _serialize_tree_for_llm(seq) - when_list.append({"value": val, "seq": s or {"type": "seq", "children": []}}) - return { - "type": "eval", - "subject": node.subject, - "when_list": when_list, - "other_seq": _serialize_tree_for_llm(node.other_seq) or {"type": "seq", "children": []}, - "has_other": node.has_other, - } - - if isinstance(node, BrPerform): - result = {"type": "perform", "perf_type": node.perf_type} - for attr in ("condition", "target", "thru", "times", - "varying_var", "varying_from", "varying_by"): - val = getattr(node, attr, None) - if val is not None: - result[attr] = val - if node.body_seq: - bs = _serialize_tree_for_llm(node.body_seq) - if bs: - result["body_seq"] = bs - return result - - # Assign / CallNode / ExitNode / GoTo — 不影响路径生成,可省略 - return None - - -def _extract_88_mapping(fields): - mapping = {} - for f in fields: - if f.get('is_88'): - mapping[f['name']] = { - "parent": f['parent'], - "value": f['value'], - "pic_info": f.get('pic_info', {}), - } - return mapping - - -def _build_path_prompt(tree_json, fields, level88_map): - system = ("你是 COBOL 测试路径生成专家。" - "请为给定的控制流树生成满足 MC/DC 覆盖的测试路径集。" - "只输出 JSON,不要多余文字。") - - reduced_fields = [] - for f in fields: - entry = {"name": f["name"], "pic": f.get("pic", "")} - pi = f.get("pic_info", {}) - if pi: - entry["pic_info"] = { - "type": pi.get("type"), "digits": pi.get("digits"), - "decimal": pi.get("decimal"), "length": pi.get("length"), - } - if f.get("is_88"): - entry["is_88"] = True - entry["value"] = f.get("value") - entry["parent"] = f.get("parent") - reduced_fields.append(entry) - - user = ( - "## 控制流树(JSON)\n\n" - f"```json\n{json.dumps(tree_json, ensure_ascii=False, indent=2)}\n```\n\n" - "## 字段定义\n\n" - f"```json\n{json.dumps(reduced_fields, ensure_ascii=False, indent=2)}\n```\n\n" - "## 要求\n" - "1. 每个 IF/EVALUATE/PERFORM UNTIL 的每个分支至少被覆盖一次\n" - "2. 复合条件(AND/OR/NOT)需要满足 MC/DC:每个叶条件的独立影响对\n" - "3. 路径数尽量少(最小集优先)\n" - "4. 88-level 条件名要展开为实际字段比较(如 CUST-VIP → WS-CUST-LEVEL='V')\n" - "5. 同一路径中的约束不能自相矛盾(同一字段不能同时等于 'A' 和等于 'B')\n" - "6. 数值边界值合理(>5000 → 5001, <100 → 99)\n" - "7. AND 优先级高于 OR\n\n" - "## 输出格式\n\n" - "```json\n" - "{\n" - ' "paths": [\n' - " {\n" - ' "constraints": [\n' - ' {"field": "WS-AMOUNT", "op": ">", "value": "5000", "want_true": true}\n' - " ],\n" - ' "assignments": {}\n' - " }\n" - " ]\n" - "}\n" - "```" - ) - - return [ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ] - - -def _parse_llm_paths(paths_data): - result = [] - for p in paths_data: - constraints = [] - for c in p.get("constraints", []): - constraints.append((c["field"], c["op"], str(c["value"]), c["want_true"])) - assignments = p.get("assignments", {}) - result.append((constraints, assignments)) - return result - - -def resolve_constraints_ai(paths, fields=None, assignments=None): - """AI版约束推理(未来实现)""" - raise NotImplementedError("AI agent not yet implemented") - - -def enhance_metadata_ai(records, fields=None, spec_doc: str = ""): - """AI版测试用例元数据生成(未来实现)""" - raise NotImplementedError("AI agent not yet implemented") - - -def analyze_spec_ai(spec_doc: str = ""): - """AI版式样书解析(未来实现)""" - raise NotImplementedError("AI agent not yet implemented") diff --git a/cobol_testgen/core.py b/cobol_testgen/core.py index 43dc9a8..5f6a0f3 100644 --- a/cobol_testgen/core.py +++ b/cobol_testgen/core.py @@ -3,7 +3,7 @@ import re import logging from datetime import datetime -from .models import BrSeq, BrIf, BrEval, BrPerform, BrSeq, CondLeaf, CondNot, ParseError, Assign, CallNode, ExitNode, GoTo +from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, BrSeq, CondLeaf, CondNot, ParseError, Assign, CallNode, ExitNode, GoTo from .cond import parse_compound_condition, parse_single_condition, collect_leaves logger = logging.getLogger(__name__) @@ -12,6 +12,7 @@ logger = logging.getLogger(__name__) _COBOL_SCOPE_ENDERS = { 'END-IF', 'END-EVALUATE', 'END-PERFORM', 'END-EXEC', 'END-CALL', 'END-READ', 'END-WRITE', 'END-DELETE', 'END-REWRITE', 'END-START', + 'END-SEARCH', 'ELSE', 'WHEN', 'OTHER', } @@ -22,22 +23,26 @@ def scan_paragraphs(raw_lines): while i < len(raw_lines): line = raw_lines[i].strip() m = re.match(r'^([A-Z0-9][A-Z0-9-]*)\.\s*$', line) + sec_m = re.match(r'^([A-Z][A-Z0-9-]*)\s+SECTION\.?\s*$', line, re.IGNORECASE) if m and m.group(1) not in _COBOL_SCOPE_ENDERS: name = m.group(1) - start = i + 1 - j = i + 1 - while j < len(raw_lines): - nline = raw_lines[j].strip() - nm = re.match(r'^([A-Z0-9][A-Z0-9-]*)\.\s*$', nline) - if nm and nm.group(1) not in _COBOL_SCOPE_ENDERS: - break - if re.match(r'^[A-Z][A-Z0-9-]*\s+SECTION\.\s*$', nline, re.IGNORECASE): - break - j += 1 - paragraphs[name] = (start, j - 1) - i = j + elif sec_m: + name = sec_m.group(1).upper() else: i += 1 + continue + start = i + 1 + j = i + 1 + while j < len(raw_lines): + nline = raw_lines[j].strip() + nm = re.match(r'^([A-Z0-9][A-Z0-9-]*)\.\s*$', nline) + if nm and nm.group(1) not in _COBOL_SCOPE_ENDERS: + break + if re.match(r'^[A-Z][A-Z0-9-]*\s+SECTION\.\s*$', nline, re.IGNORECASE): + break + j += 1 + paragraphs[name] = (start, j - 1) + i = j return paragraphs @@ -160,6 +165,10 @@ class _BrParser: if perf_node: seq.add(perf_node) continue + m_search = re.match(r'^SEARCH\b(?:\s+(ALL))?\s+(\w[\w-]*)(?:\s+VARYING\s+(\w[\w-]*))?', line, re.IGNORECASE) + if m_search: + seq.add(self._parse_search(m_search)) + continue m = re.match(r'^INITIALIZE\s+', line) if m: init_seq = self._parse_initialize() @@ -229,6 +238,15 @@ class _BrParser: if m_set: seq.add(self._parse_set_true(m_set.group(1))) continue + m_insp = re.match(r'^INSPECT\s+', line, re.IGNORECASE) + if m_insp: + info = self._parse_inspect(line) + if info: + tgt = info.get('tgt', '') + self.assignments.setdefault(tgt, []).append(info) + seq.add(Assign(tgt, info)) + self.advance() + continue assign_node = self._record_assignment(line) if assign_node: seq.add(assign_node) @@ -243,6 +261,81 @@ class _BrParser: return True return False + # ── INSPECT ── + + _PIC_FIG_CONV = {'ZERO': '0', 'ZEROS': '0', 'ZEROES': '0', + 'SPACE': ' ', 'SPACES': ' '} + + @staticmethod + def _expand_figurative(val): + if val.upper() in _BrParser._PIC_FIG_CONV: + return _BrParser._PIC_FIG_CONV[val.upper()] + return val + + def _parse_inspect_phrase(self, phrase): + m = re.match( + r'TALLYING\s+(\w[\w-]*)\s+FOR\s+' + r'(LEADING|TRAILING|CHARACTERS)' + r'(?:\s+([\'"])(.*?)\3)?' + r'(?:\s+(BEFORE|AFTER)\s+INITIAL\s+([\'"])(.*?)\6)?\s*$', + phrase, re.IGNORECASE + ) + if m: + return ('tally', { + 'count_var': m.group(1).upper(), + 'kind': m.group(2).upper(), + 'char': self._expand_figurative(m.group(4) or ''), + 'before_after': (m.group(5) or '').upper(), + 'delimiter': self._expand_figurative(m.group(7) or ''), + }) + m = re.match( + r'REPLACING\s+' + r'(ALL|LEADING|FIRST|CHARACTERS)\s+' + r'([\'"])(.*?)\2\s+BY\s+' + r'([\'"])(.*?)\4' + r'(?:\s+(BEFORE|AFTER)\s+INITIAL\s+([\'"])(.*?)\7)?\s*$', + phrase, re.IGNORECASE + ) + if m: + return ('replace', { + 'kind': m.group(1).upper(), + 'src': self._expand_figurative(m.group(3)), + 'dst': self._expand_figurative(m.group(5)), + 'before_after': (m.group(6) or '').upper(), + 'delimiter': self._expand_figurative(m.group(8) or ''), + }) + m = re.match( + r'CONVERTING\s+([\'"])(.*?)\1\s+TO\s+([\'"])(.*?)\3\s*$', + phrase, re.IGNORECASE + ) + if m: + return ('convert', { + 'from_chars': self._expand_figurative(m.group(2)), + 'to_chars': self._expand_figurative(m.group(4)), + }) + return None + + def _parse_inspect(self, line): + m = re.match(r'^INSPECT\s+(\w[\w-]*)\s+(.+)$', line, re.IGNORECASE) + if not m: + return None + tgt = m.group(1).upper() + rest = m.group(2).strip() + phrases = re.split(r'\s+(?=(?:TALLYING|REPLACING|CONVERTING)\b)', rest, flags=re.IGNORECASE) + sub_ops = [] + for phrase in phrases: + sub = self._parse_inspect_phrase(phrase.strip()) + if sub: + sub_ops.append(sub) + if not sub_ops: + return None + return { + 'type': 'inspect', + 'tgt': tgt, + 'source_vars': [tgt], + 'sub_ops': sub_ops, + } + def _record_assignment(self, line): if self.assignments is None: return None @@ -503,6 +596,44 @@ class _BrParser: vars_in = re.findall(r'[A-Z][A-Z0-9-]*', expr.upper()) return {'type': 'compute', 'source_vars': list(set(vars_in)), 'op': None, 'const': None, 'expr': expr} + # ── SEARCH / SEARCH ALL ── + + def _parse_search(self, m): + is_all = bool(m.group(1)) + table = m.group(2).upper() + varying = m.group(3).upper() if m.group(3) else None + node = BrSearch(table, is_all=is_all, varying=varying) + self.advance() + while self.pos < len(self.lines): + line = self.clean() + if line in ('END-SEARCH', 'END-SEARCH.'): + self.advance() + return node + m_at = re.match(r'^AT\s+END(.+)?$', line, re.IGNORECASE) + if m_at: + self.advance() + rest = m_at.group(1) + if rest and rest.strip(): + self.lines.insert(self.pos, rest.strip()) + node.at_end_seq = self.parse_seq( + end_check=lambda l: re.match(r'^WHEN\b', l) or l in ('END-SEARCH',) + ) + node.has_at_end = True + continue + m_when = re.match(r'^WHEN\s+(.+?)\s*$', line, re.IGNORECASE) + if m_when: + cond_upper = m_when.group(1).strip() + self.advance() + cond_tree = parse_compound_condition(cond_upper, self.fields) + body_seq = self.parse_seq( + end_check=lambda l: re.match(r'^(WHEN|AT\s+END)\b', l) or l in ('END-SEARCH',) + ) + node.when_list.append((cond_upper, body_seq)) + node.cond_trees.append(cond_tree) + continue + self.advance() + return node + def _parse_if(self): line = self.clean() m = re.match(r'^IF\s+(.+?)(?:THEN)?\s*$', line) @@ -1039,6 +1170,18 @@ def _resolve_subscript(key, rec): return key +def _apply_before_after(val, before_after, delimiter): + if not delimiter: + return val + if before_after == 'BEFORE': + idx = val.find(delimiter) + return val[:idx] if idx >= 0 else val + if before_after == 'AFTER': + idx = val.find(delimiter) + return val[idx + len(delimiter):] if idx >= 0 else '' + return val + + def propagate_assignments(rec, assignments, fields, file_sec=None): def raw_to_float(val, pi): if pi.get('type') == 'numeric': @@ -1233,6 +1376,47 @@ def propagate_assignments(rec, assignments, fields, file_sec=None): if all_found: rec[resolved_tgt] = float_to_raw(total, pi_tgt) + # Pass 4.5: INSPECT + for tgt, asgn in flat_list: + if asgn['type'] != 'inspect': + continue + resolved_tgt = _resolve_subscript(tgt, rec) + if resolved_tgt not in rec: + continue + src_val = str(rec[resolved_tgt]) + for op_type, params in asgn.get('sub_ops', []): + if op_type == 'tally': + cv = params['count_var'].upper() + cv_pi = pi_map.get(cv, {}) + effective = _apply_before_after(src_val, params.get('before_after'), params.get('delimiter')) + cnt = 0 + if params['kind'] == 'LEADING': + cnt = len(effective) - len(effective.lstrip(params['char'])) + elif params['kind'] == 'TRAILING': + cnt = len(effective) - len(effective.rstrip(params['char'])) + else: + cnt = len(effective) + if cv_pi.get('type') == 'numeric': + rec[cv] = float_to_raw(float(cnt), cv_pi) + elif op_type == 'replace': + effective = _apply_before_after(src_val, params.get('before_after'), params.get('delimiter')) + if params['kind'] == 'ALL': + new_val = effective.replace(params['src'], params['dst']) + elif params['kind'] == 'LEADING': + new_val = effective + while new_val.startswith(params['src']): + new_val = new_val[len(params['src']):] + new_val = effective.replace(params['src'], params['dst'], 1) + elif params['kind'] == 'FIRST': + new_val = effective.replace(params['src'], params['dst'], 1) + else: + new_val = params['dst'] * len(effective) + rec[resolved_tgt] = new_val + elif op_type == 'convert': + effective = _apply_before_after(src_val, params.get('before_after'), params.get('delimiter')) + table = str.maketrans(params['from_chars'], params['to_chars']) + rec[resolved_tgt] = effective.translate(table) + # Pass 5: STRING / UNSTRING for tgt, asgn in flat_list: if asgn['type'] == 'string_concat': diff --git a/cobol_testgen/coverage.py b/cobol_testgen/coverage.py index efb91b7..768d7fc 100644 --- a/cobol_testgen/coverage.py +++ b/cobol_testgen/coverage.py @@ -6,7 +6,7 @@ from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) -from .models import BrSeq, BrIf, BrEval, BrPerform, CondLeaf +from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, CondLeaf from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, evaluate_tree @@ -83,6 +83,26 @@ def collect_decision_points(node, fields, counter=None): p, l = _walk_collect(node.other_seq, fields, counter) points.extend(p); all_leaves.extend(l) + elif isinstance(node, BrSearch): + counter[0] += 1 + branch_names = [] + for cond_text, seq in node.when_list: + branch_names.append(f'WHEN {cond_text[:40]}') + if node.has_at_end: + branch_names.append('AT END') + dp = DecisionPoint(id=counter[0], kind='SEARCH', + label=node.table_name, branch_names=branch_names) + dp.when_list = node.when_list + dp.cond_trees = node.cond_trees + dp.has_other = node.has_at_end + points.append(dp) + for cond_text, seq in node.when_list: + p, l = _walk_collect(seq, fields, counter) + points.extend(p); all_leaves.extend(l) + if node.has_at_end: + p, l = _walk_collect(node.at_end_seq, fields, counter) + points.extend(p); all_leaves.extend(l) + elif isinstance(node, BrPerform): if node.perf_type in ('until', 'para_until', 'varying', 'para_varying'): counter[0] += 1 @@ -92,6 +112,13 @@ def collect_decision_points(node, fields, counter=None): simple = parse_single_condition(node.condition) if node.condition else None if simple and is_field(simple[0], fields): dp.parsed = simple + elif node.condition: + cond_tree = parse_compound_condition(node.condition, fields) + if cond_tree: + leaves = collect_leaves(cond_tree) + if leaves: + dp.cond_tree = cond_tree + dp.cond_leaves = list(leaves) points.append(dp) p, l = _walk_collect(node.body_seq, fields, counter) points.extend(p); all_leaves.extend(l) @@ -116,9 +143,11 @@ def mark_coverage(decision_points, leaf_stats, branch_paths, fields): if dp.kind == 'IF': _mark_if(dp, cons) elif dp.kind == 'EVALUATE': - _mark_eval(dp, cons) + _mark_eval(dp, cons, fields) elif dp.kind == 'PERFORM': _mark_perform(dp, cons) + elif dp.kind == 'SEARCH': + _mark_search(dp, cons, fields) for leaf in leaf_stats: for c in cons: if _match_leaf(c, leaf): @@ -128,7 +157,7 @@ def mark_coverage(decision_points, leaf_stats, branch_paths, fields): leaf.covered_false = True for dp in decision_points: - _infer_implied(dp) + dp.implied_branches = set(dp.active_branches) def _match_constraint(c, parsed): @@ -180,18 +209,20 @@ def _mark_if(dp, cons): dp.active_branches.add('T' if c[3] else 'F') -def _mark_eval(dp, cons): +def _mark_eval(dp, cons, fields=None): if dp.label == 'TRUE': + matched = False for when_val, _ in dp.when_list: - parsed = parse_single_condition(when_val) + parsed = parse_single_condition(when_val, fields) if parsed: for c in cons: - if _match_constraint(c, parsed): + if _match_constraint(c, parsed) and c[3]: name = f"WHEN {when_val}" if name in dp.branch_names: dp.active_branches.add(name) + matched = True else: - cond_tree = parse_compound_condition(when_val) + cond_tree = parse_compound_condition(when_val, fields) if cond_tree and not isinstance(cond_tree, CondLeaf): leaves = list(collect_leaves(cond_tree)) assignment = {} @@ -205,6 +236,15 @@ def _mark_eval(dp, cons): name = f"WHEN {when_val}" if name in dp.branch_names: dp.active_branches.add(name) + matched = True + if not matched and 'OTHER' in dp.branch_names: + when_fields = set() + for when_val, _ in dp.when_list: + for c in cons: + if c[0] in when_val: + when_fields.add(c[0]) + if when_fields: + dp.active_branches.add('OTHER') return for c in cons: if c[0] == dp.label and c[1] == '=': @@ -215,6 +255,44 @@ def _mark_eval(dp, cons): dp.active_branches.add('OTHER') +def _mark_search(dp, cons, fields=None): + branch_masks = [False] * len(dp.branch_names) + for i, (cond_text, body_seq) in enumerate(dp.when_list): + cond_tree = dp.cond_trees[i] if i < len(dp.cond_trees) else None + if not cond_tree: + continue + if isinstance(cond_tree, CondLeaf): + for c in cons: + if len(c) == 4: + base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0]) + base_cond = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field) + if base_c == base_cond and c[1] == cond_tree.op \ + and str(c[2]) == str(cond_tree.value) and c[3]: + branch_masks[i] = True + break + else: + leaves = list(collect_leaves(cond_tree)) + assignment = {} + for leaf in leaves: + for c in cons: + if len(c) == 4: + base_c = re.sub(r'\s*\(.*?\)\s*$', '', c[0]) + base_l = re.sub(r'\s*\(.*?\)\s*$', '', leaf.field) + if base_c == base_l and c[1] == leaf.op and str(c[2]) == str(leaf.value): + assignment[leaf] = c[3] + break + if len(assignment) == len(leaves): + if evaluate_tree(cond_tree, assignment): + branch_masks[i] = True + if dp.has_other: + at_end_idx = len(dp.branch_names) - 1 + if not any(branch_masks[:at_end_idx]): + branch_masks[at_end_idx] = True + for i, m in enumerate(branch_masks): + if m: + dp.active_branches.add(dp.branch_names[i]) + + def _mark_perform(dp, cons): simple = getattr(dp, 'parsed', None) if simple: @@ -224,6 +302,18 @@ def _mark_perform(dp, cons): dp.active_branches.add('Skip') else: dp.active_branches.add('Enter') + elif dp.cond_tree and dp.cond_leaves: + assignment = {} + for leaf in dp.cond_leaves: + for c in cons: + if _match_leaf(c, leaf): + assignment[leaf] = c[3] + break + if len(assignment) == len(dp.cond_leaves): + if evaluate_tree(dp.cond_tree, assignment): + dp.active_branches.add('Skip') + else: + dp.active_branches.add('Enter') else: for c in cons: if c[0] == dp.label or any(c[0] == f for f in _get_fields_in_cond(dp.label)): @@ -237,10 +327,6 @@ def _get_fields_in_cond(cond_text): return re.findall(r'[A-Z][A-Z0-9-]*', cond_text.upper()) -def _infer_implied(dp): - dp.implied_branches.update(dp.active_branches) - - # ── 行号定位(基于原始源文本)── def locate_decision_lines(decision_points, raw_source): diff --git a/cobol_testgen/design.py b/cobol_testgen/design.py index c0d4542..046e67f 100644 --- a/cobol_testgen/design.py +++ b/cobol_testgen/design.py @@ -1,20 +1,15 @@ """设计层:路径枚举 + 值生成 + 约束应用""" -import os import re import logging -from . import agents, CONFIG -from .models import BrSeq, BrIf, BrEval, BrPerform, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo +from .models import BrSeq, BrIf, BrEval, BrPerform, BrSearch, Assign, CallNode, CondNot, CondLeaf, ExitNode, GoTo from .cond import parse_single_condition, parse_compound_condition, is_field, collect_leaves, mcdc_sets, satisfying_value from .core import trace_to_root, invert_through_chain, propagate_assignments, _basename logger = logging.getLogger(__name__) _STOP = ('__STOP__', '', None, True) -_MAX_PATHS = 5000 -_FALLBACK_MAX_PATHS = 100 -_ACTIVE_MAX_PATHS = _MAX_PATHS -_LLM_FAILED = False +_MAX_PATHS = 10000 def _filter_stop(cons): @@ -22,46 +17,51 @@ def _filter_stop(cons): def _cap_paths(paths): - if len(paths) > _ACTIVE_MAX_PATHS: - return paths[:_ACTIVE_MAX_PATHS] + if len(paths) > _MAX_PATHS: + return paths[:_MAX_PATHS] return paths +def _cap_paths_fair(new_active, child_paths): + """两阶段公平截断:每个前置路径至少保留一条子路径,再填充剩余配额。""" + if len(new_active) <= _MAX_PATHS: + return new_active + k = len(child_paths) + if k <= 1: + return new_active[:_MAX_PATHS] + # 分离 STOP 路径(不参与组合,直接保留) + stop_paths = [(p, a) for p, a in new_active if any(c is _STOP for c in p)] + combined = [(p, a) for p, a in new_active if not any(c is _STOP for c in p)] + n_pred = len(combined) // k + result = list(stop_paths) + if n_pred <= 1: + result.extend(combined[:_MAX_PATHS - len(result)]) + return result[:_MAX_PATHS] + remaining_quota = _MAX_PATHS - len(result) + # Phase 1: 每个前置至少保留一条子路径(轮询分配不同子路径索引) + quota = min(n_pred, remaining_quota) + selected = set() + for p_idx in range(quota): + c_idx = p_idx % k + idx = p_idx * k + c_idx + selected.add(idx) + result.append(combined[idx]) + if len(result) >= _MAX_PATHS: + return result[:_MAX_PATHS] + # Phase 2: 用剩余配额填充其余组合 + remaining = _MAX_PATHS - len(result) + for idx in range(len(combined)): + if idx not in selected: + result.append(combined[idx]) + remaining -= 1 + if remaining <= 0: + break + return result[:_MAX_PATHS] + + # ── 路径枚举 ── -def _try_llm_enum_paths(node, fields): - global _LLM_FAILED - if _LLM_FAILED: - logger.debug("断路器已跳,跳过 LLM") - return None - if not CONFIG.get("llm_generator", True): - logger.debug("llm_generator 已关闭,降级规则引擎") - return None - if not os.environ.get(agents.DEEPSEEK_API_KEY_ENV): - logger.warning("DEEPSEEK_API_KEY 未设置,降级规则引擎") - return None - try: - result = agents.llm_generate_all_paths(node, fields) - if result is not None: - logger.info(f"LLM 路径生成成功,{len(result)} 条") - return result - logger.warning("LLM 返回空,降级规则引擎") - except Exception as e: - logger.error(f"LLM API 调用异常: {e}") - _LLM_FAILED = True - return None - - def enum_paths(node, fields): - global _ACTIVE_MAX_PATHS - # === LLM 优先(整体替换整个树的路径生成) === - llm_result = _try_llm_enum_paths(node, fields) - if llm_result is not None: - _ACTIVE_MAX_PATHS = _MAX_PATHS - return llm_result - if _ACTIVE_MAX_PATHS == _MAX_PATHS: - logger.warning("降级到规则引擎(路径上限 5000 → 100)") - _ACTIVE_MAX_PATHS = _FALLBACK_MAX_PATHS """枚举路径,每条路径返回 (constraints, assignments). 返回 list[tuple[list[tuple], dict]]. """ @@ -86,7 +86,7 @@ def enum_paths(node, fields): merged.setdefault(k, []).extend(v if isinstance(v, list) else [v]) merged_cons = p_cons + list(cp_cons) new_active.append((merged_cons, merged)) - paths = _cap_paths(new_active) + paths = _cap_paths_fair(new_active, child_paths) return paths elif isinstance(node, BrIf): @@ -170,49 +170,56 @@ def enum_paths(node, fields): return paths if node.subject == 'TRUE': paths = [] - prior_false = [] + prior_false_sets = [] # list[list[Constraint]] for value, seq in node.when_list: cond = parse_compound_condition(value, fields) if cond and isinstance(cond, CondLeaf) and is_field(cond.field, fields): - # Simple condition sub = _cap_paths(enum_paths(seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): - constraints = list(prior_false) + constraints = [c for pf in prior_false_sets for c in pf] constraints.append((cond.field, cond.op, cond.value, True)) paths.append((constraints + sp_cons, sp_assign)) - prior_false.append((cond.field, cond.op, cond.value, False)) + prior_false_sets.append([(cond.field, cond.op, cond.value, False)]) elif cond: - # Compound condition — use MC/DC for path generation leaves = collect_leaves(cond) if leaves and all(is_field(l.field, fields) for l in leaves): sets = mcdc_sets(cond, fields) if sets: sub = _cap_paths(enum_paths(seq, fields)) - false_set = None + new_false_sets = [] for cs, decision in sets: if decision: - for sp_cons, sp_assign in (sub or [([], {})]): - paths.append((list(prior_false) + list(cs) + sp_cons, sp_assign)) - elif false_set is None: - false_set = cs - if false_set is not None: - prior_false.extend(false_set) - else: - prior_false = [] + if not prior_false_sets: + for sp_cons, sp_assign in (sub or [([], {})]): + paths.append((list(cs) + sp_cons, sp_assign)) + else: + for pf_set in prior_false_sets: + for sp_cons, sp_assign in (sub or [([], {})]): + paths.append((list(pf_set) + list(cs) + sp_cons, sp_assign)) + else: + new_false_sets.append(cs) + if not new_false_sets: + prior_false_sets = [] break + combined = [] + for pf_set in prior_false_sets: + for nf_set in new_false_sets: + combined.append(list(pf_set) + list(nf_set)) + prior_false_sets = combined else: - prior_false = [] + prior_false_sets = [] break else: - prior_false = [] + prior_false_sets = [] break else: - prior_false = [] + prior_false_sets = [] break if node.has_other: sub = _cap_paths(enum_paths(node.other_seq, fields)) for sp_cons, sp_assign in (sub or [([], {})]): - paths.append((list(prior_false) + sp_cons, sp_assign)) + constraints = [c for pf in prior_false_sets for c in pf] + paths.append((constraints + sp_cons, sp_assign)) return paths if not is_field(node.subject, fields): return [([], {})] @@ -228,6 +235,9 @@ def enum_paths(node, fields): paths.append(([(node.subject, 'not_in', case_vals, True)] + sp_cons, sp_assign)) return paths + elif isinstance(node, BrSearch): + return _enum_search_paths(node, fields) + elif isinstance(node, BrPerform): if node.perf_type in ('para', 'thru'): if node.body_seq: @@ -743,12 +753,90 @@ def apply_occurs_depending(rec, fields): rec[name] = '0' * length +def _non_match_for(cond_leaf, fields): + if not fields or not cond_leaf: + return None + base = re.sub(r'\s*\(.*?\)\s*$', '', cond_leaf.field) + for f in fields: + if re.sub(r'\s*\(.*?\)\s*$', '', f['name']) == base: + pic = f.get('pic_info', {}) + if pic.get('type') == 'numeric': + return '0' + return ' ' + return None + + +def _enum_search_paths(node, fields): + # 从条件字段名推断 OCCURS 数;如 WS-CODE-VAL(WS-IDX) → 查 WS-CODE-VAL(j) 最大 j + occurs_count = 1 + if node.when_list and node.cond_trees and node.cond_trees[0]: + ct = node.cond_trees[0] + if isinstance(ct, CondLeaf): + base = re.sub(r'\s*\(.*?\)\s*$', '', ct.field) + for f in fields: + m = re.match(rf'^{re.escape(base)}\((\d+)\)$', f['name']) + if m: + occurs_count = max(occurs_count, int(m.group(1))) + if occurs_count <= 1: + # 再查父组名下各字段的后缀 + parent = node.table_name + for f in fields: + m = re.match(rf'^{re.escape(parent)}\((\d+)\)$', f['name']) + if m: + occurs_count = max(occurs_count, int(m.group(1))) + + paths = [] + for i, (cond_text, body_seq) in enumerate(node.when_list): + cond_tree = node.cond_trees[i] if i < len(node.cond_trees) else None + sub = _cap_paths(enum_paths(body_seq, fields)) + if not sub: + sub = [([], {})] + + extra_assign = {} + if cond_tree and isinstance(cond_tree, CondLeaf): + base = re.sub(r'\s*\(.*?\)\s*$', '', cond_tree.field) + matching_val = cond_tree.value + elem_key = f'{base}({i + 1})' + extra_assign[elem_key] = [{'type': 'move_literal', 'literal': matching_val}] + non_match = _non_match_for(cond_tree, fields) or ' ' + for j in range(i): + prev_key = f'{base}({j + 1})' + extra_assign[prev_key] = [{'type': 'move_literal', 'literal': non_match}] + + for sp_cons, sp_assign in (sub or [([], {})]): + merged_assign = dict(extra_assign) + for k, v in sp_assign.items(): + merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v]) + paths.append((sp_cons, merged_assign)) + + if node.has_at_end: + sub = _cap_paths(enum_paths(node.at_end_seq, fields)) + for sp_cons, sp_assign in (sub or [([], {})]): + extra_assign = {} + non_match = ' ' + if node.when_list: + ct = node.cond_trees[0] + if ct and isinstance(ct, CondLeaf): + non_match = _non_match_for(ct, fields) or ' ' + base = re.sub(r'\s*\(.*?\)\s*$', '', ct.field) + for j in range(max(occurs_count, 1)): + extra_assign[f'{base}({j + 1})'] = [{'type': 'move_literal', 'literal': non_match}] + merged_assign = dict(extra_assign) + for k, v in sp_assign.items(): + merged_assign.setdefault(k, []).extend(v if isinstance(v, list) else [v]) + paths.append((sp_cons, merged_assign)) + + return paths + + def generate_records(branch_paths_with_assigns, data_fields, base_assignments=None, file_sec=None): """生成测试数据记录。 branch_paths_with_assigns: list of (constraints, path_assignments). base_assignments: 全局 assignments dict (用于 trace_to_root). + 返回: (records, kept_path_cons) — kept_path_cons 是与 records 一一对应的约束。 """ records = [] + kept_path_cons = [] if branch_paths_with_assigns: for seq, (path_cons, path_assign) in enumerate(branch_paths_with_assigns, start=1): path_cons = _filter_stop(path_cons) @@ -756,20 +844,51 @@ def generate_records(branch_paths_with_assigns, data_fields, base_assignments=No # Pass A: 先传播赋值(MOVE/COMPUTE/READ INTO 等),模拟到决策点前的程序状态 if isinstance(path_assign, dict): propagate_assignments(rec, path_assign, data_fields, file_sec=file_sec) + # Pass A.5: 检查约束是否经过链追溯到字面量截断(不可能路径) + skip_impossible = False + if base_assignments and isinstance(path_assign, dict): + for c in path_cons: + if len(c) == 4 and not skip_impossible: + field, op, val, want = c + root_var, chain = trace_to_root(field, base_assignments, data_fields, path_assign) + if root_var != field: + new_fn, new_op, new_val = invert_through_chain(root_var, chain, op, val) + if any(f['name'] == new_fn for f in data_fields): + asgn_val = path_assign.get(root_var) + if asgn_val is not None: + asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val] + if asgn_list and asgn_list[-1]['type'] == 'move_literal' and root_var in rec: + if not _check_constraint_satisfied(rec, root_var, new_op, new_val, want, data_fields): + skip_impossible = True + break + if skip_impossible: + continue # Pass B: 约束覆盖(确保决策条件满足,覆盖 MOVE 带来的值) for c in path_cons: if len(c) == 4: field, op, val, want = c apply_constraint(rec, field, op, val, want, data_fields, base_assignments, path_assign) + # Pass B.5: 前向再传播变量间MOVE,保持约束修改后的链一致性 + if isinstance(path_assign, dict): + forward = {} + for tgt, asgn_val in path_assign.items(): + asgn_list = asgn_val if isinstance(asgn_val, list) else [asgn_val] + filtered = [a for a in asgn_list if a['type'] == 'move' and a.get('source_vars')] + if filtered: + forward[tgt] = filtered + if forward: + propagate_assignments(rec, forward, data_fields, file_sec=file_sec) # Pass C: 同步 REDEFINES(确保共享存储一致) sync_redefined_fields(rec, data_fields) # Pass D: OCCURS DEPENDING ON — 清零超范围的下标字段 apply_occurs_depending(rec, data_fields) records.append(rec) + kept_path_cons.append(path_cons) if not records: rec = make_base_record(1, data_fields) if base_assignments: propagate_assignments(rec, base_assignments, data_fields, file_sec=file_sec) records.append(rec) - return records + kept_path_cons.append([]) + return records, kept_path_cons diff --git a/cobol_testgen/models.py b/cobol_testgen/models.py index 4a06fc8..716d098 100644 --- a/cobol_testgen/models.py +++ b/cobol_testgen/models.py @@ -115,6 +115,18 @@ class CondOr: self.right = right +class BrSearch: + """SEARCH / SEARCH ALL 表查找""" + def __init__(self, table_name, is_all=False, varying=None): + self.table_name = table_name + self.is_all = is_all + self.varying = varying.upper() if varying else None + self.at_end_seq = BrSeq() + self.when_list = [] # [(condition_text, BrSeq)] + self.cond_trees = [] # [cond_tree, ...] + self.has_at_end = False + + class GoTo: """GO TO 节点:无条件跳转到指定段落""" def __init__(self, target: str, body_seq: 'BrSeq' = None): diff --git a/cobol_testgen/prompts/parse_proc_division.txt b/cobol_testgen/prompts/parse_proc_division.txt deleted file mode 100644 index 4062656..0000000 --- a/cobol_testgen/prompts/parse_proc_division.txt +++ /dev/null @@ -1,596 +0,0 @@ -你是一个 COBOL 自动化测试数据生成器的核心解析模块。你的任务是将预处理的 COBOL PROCEDURE DIVISION 源码转换为结构化的 JSON 树,用于后续的路径枚举和测试数据生成。 - -## 输入格式 - -你会收到两样东西: -1. **PROCEDURE DIVISION 源码文本** — 已预处理(大写、无注释、缩进规整) -2. **DATA DIVISION 字段列表** — JSON 数组,每个字段包括 name/level/pic/pic_info 等 - -## 输出格式 - -输出一个 JSON 对象,包含两个顶级键: - -### 1. `assignments` (对象) -记录了 PROCEDURE DIVISION 中每个赋值语句的来源信息。键是目标字段名,值是一个对象,类型如下: - -- **move**: 变数对变数 MOVE (e.g., `MOVE WS-A TO WS-B`) - ```json - {"type": "move", "source_vars": ["WS-A"]} - ``` -- **move_literal**: 字面量/定数 MOVE (e.g., `MOVE 'HELLO' TO WS-B`, `MOVE ZERO TO WS-B`) - ```json - {"type": "move_literal", "literal": "HELLO"} - ``` -- **compute**: COMPUTE/ADD/SUBTRACT/MULTIPLY/DIVIDE - - 二元运算 (var OP const / const OP var): - ```json - {"type": "compute", "source_vars": ["WS-A"], "op": "+", "const": 5, "expr": "WS-A + 5"} - ``` - - 变数间运算 (var OP var): - ```json - {"type": "compute", "source_vars": ["WS-A", "WS-B"], "op": "+", "expr": "WS-A + WS-B"} - ``` - - 复杂表达式 (无法解析): - ```json - {"type": "compute", "source_vars": ["WS-A", "WS-B"], "op": null, "const": null, "expr": "WS-A * (WS-B + 1)"} - ``` - -### 2. `tree` (对象) -一个递归的 JSON 树,表示 PROCEDURE DIVISION 的代码结构。不要包含注释、段落标签(仅作为 PERFORM 目标引用)。 - -#### 节点类型 - -**seq**: 顺序序列(子节点列表) -```json -{"type": "seq", "children": [子节点...]} -``` - -**assign**: 赋值语句(MOVE / COMPUTE / ADD / SUBTRACT / MULTIPLY / DIVIDE) -```json -{"type": "assign", "target": "WS-STATUS", "source_info": {"type": "move_literal", "literal": "H"}} -``` -source_info 必须与 assignments 中对应条目一致。 - -**if**: 条件分支 -```json -{ - "type": "if", - "condition": "WS-AMOUNT > 1000", - "true_seq": {"type": "seq", "children": [...]}, - "false_seq": {"type": "seq", "children": [...]} -} -``` -- 如果无 ELSE,false_seq 应为 `{"type": "seq", "children": []}` -- condition 保持原始文本(不加解析) - -**eval**: EVALUATE 多路分支 -```json -{ - "type": "eval", - "subject": "WS-TYPE", - "when_list": [ - {"value": "A", "seq": {"type": "seq", "children": [...]}}, - {"value": "B", "seq": {"type": "seq", "children": [...]}} - ], - "other_seq": {"type": "seq", "children": [...]}, - "has_other": true -} -``` -- WHEN OTHER 时 has_other=true -- 无 WHEN OTHER 时 has_other=false, other_seq 为空 seq - -**call**: CALL 子程序调用 -```json -{"type": "call", "program_name": "SUBPGM", "using_params": [ - {"name": "WS-AMOUNT", "mechanism": "reference"}, - {"name": "WS-RESULT", "mechanism": "reference"} -]} -``` -- CALL 是顺序执行语句(不产生分支),作为 seq 的子节点放在相应位置 -- USING 参数按 COBOL 源码顺序列出 -- mechanism 取值: - - `"reference"`: BY REFERENCE(默认)— 子程序可能修改该变量 - - `"content"`: BY CONTENT — 传副本,调用方变量不会被修改 - - `"value"`: BY VALUE — 传值(仅数值/指针) - - 无 BY 子句时默认为 `"reference"` -- 字面量参数(如 `BY VALUE 100`)不包含字段名,只在 mechanism 为 `"value"` 时保留 - -**perform**: PERFORM 语句 -```json -// 段落调用: -{"type": "perform", "perf_type": "para", "target": "1000-INIT"} - -// PERFORM THRU: -{"type": "perform", "perf_type": "thru", "target": "1000-INIT", "thru": "2000-END"} - -// 内联 PERFORM UNTIL: -{"type": "perform", "perf_type": "until", "condition": "WS-COUNT > 3", - "body_seq": {"type": "seq", "children": [...]}} - -// PERFORM VARYING: -{"type": "perform", "perf_type": "varying", "condition": "WS-I > 10", - "varying_var": "WS-I", "varying_from": "1", "varying_by": "1", - "body_seq": {"type": "seq", "children": [...]}} - -// PERFORM 段落 + UNTIL: -{"type": "perform", "perf_type": "para_until", "target": "2000-HIGH", "condition": "WS-COUNT > 100"} -``` - -### 定数 (Figurative Constants) 处理规则 - -以下定数在 MOVE 时直接用作字面量(保留原值): - -| 定数 | 规则 | -|------|------| -| ZERO / ZEROS / ZEROES | `literal: "0"` | -| SPACE / SPACES | `literal: " "` | -| HIGH-VALUE / HIGH-VALUES | `literal: "HIGH-VALUE"` | -| LOW-VALUE / LOW-VALUES | `literal: "LOW-VALUE"` | -| QUOTE / QUOTES | `literal: "'"` | -| ALL literal | `literal: literal值` | - -## COBOL 语法处理规则 - -### 1. IF 语句 -``` -IF condition - statements... -[ELSE - statements...] -END-IF. -``` -- condition 可以是简单条件、复合条件(AND/OR)、带 NOT 前置 -- true_seq 为 condition 为真时执行的分支,false_seq 为条件为假时的分支 -- IF 可以和 ELSE IF 嵌套,此时结构化为嵌套 if 的 false_seq - -### 2. EVALUATE 语句 -``` -EVALUATE subject - WHEN value1 - statements... - WHEN value2 - statements... - WHEN OTHER - statements... -END-EVALUATE. -``` -- subject 是单个字段 -- value 是具体值或 OTHER -- 每个 WHEN 的 seq 是该分支下的语句序列 -- WHEN 内的 GO TO / STOP RUN 不影响结构 - -### 3. PERFORM 语句 - -多种形态: - -**段落调用**: -``` -PERFORM 1000-INIT -``` - -**段落范围**: -``` -PERFORM 1000-INIT THRU 2000-END -``` - -**内联 UNTIL**: -``` -PERFORM UNTIL condition - statements... -END-PERFORM -``` - -**VARYING**: -``` -PERFORM VARYING WS-I FROM 1 BY 1 UNTIL WS-I > 10 - statements... -END-PERFORM -``` - -**段落 + UNTIL**: -``` -PERFORM 2000-HIGH UNTIL WS-COUNT > 100 -``` - -### 4. 段落 (Paragraphs) - -PROCEDURE DIVISION 中的段落以标签名(后跟句点)开始、以下一个段落标签或文件末尾结束。 - -``` -PARA-NAME. - statement - statement - . -NEXT-PARA. - statement -``` - -段落标签会被 PERFORM 引用。如果代码不在任何 PERFORM 中执行(顶级流程),段落按顺序依次执行,遇到 STOP RUN / GOBACK 结束。 - -在树结构中: -- 顶级流程入口(PROCEDURE DIVISION 后的第一个段落)作为树的根 seq -- 后续每个段落对应一个独立的 seq,只有在被 PERFORM 调用时才执行 -- 段落标签本身不是节点,只作为 PERFORM 的目标引用 - -### 5. CALL 语句 - -CALL 调用子程序,参数通过 USING 传递。 - -``` -CALL 'SUBPGM' USING WS-A WS-B WS-C -CALL 'SUBPGM' USING BY REFERENCE WS-A BY CONTENT WS-B BY VALUE 100 -``` - -- CALL 是顺序执行,不产生分支 -- USING 参数按 COBOL 源码顺序列出 -- 缺省传递机制时默认为 BY REFERENCE -- 字段名参数保持原样,字面量/数值参数如 `BY VALUE 100` 不放入 using_params(因为无字段名) -- CALL 后继续执行下一条语句 - -### 6. 赋值语句 - -| COBOL | JSON 类型 | 示例 source_info | -|-------|-----------|-----------------| -| MOVE 'HELLO' TO WS-A | move_literal | `{"type":"move_literal","literal":"HELLO"}` | -| MOVE WS-B TO WS-A | move | `{"type":"move","source_vars":["WS-B"]}` | -| MOVE ZERO TO WS-A | move_literal | `{"type":"move_literal","literal":"0"}` | -| MOVE SPACE TO WS-A | move_literal | `{"type":"move_literal","literal":" "}` | -| MOVE HIGH-VALUE TO WS-A | move_literal | `{"type":"move_literal","literal":"HIGH-VALUE"}` | -| COMPUTE WS-A = WS-B + 1 | compute (const OP var) | `{"type":"compute","source_vars":["WS-B"],"op":"+","const":1,"expr":"WS-B + 1"}` | -| COMPUTE WS-A = 2 * WS-B | compute (const OP var) | 同上,op="*" | -| COMPUTE WS-A = WS-B + WS-C | compute (var OP var) | `{"type":"compute","source_vars":["WS-B","WS-C"],"op":"+","expr":"WS-B + WS-C"}` | -| COMPUTE WS-A = (WS-B + 1) * WS-C | compute (复杂) | `{"type":"compute","source_vars":["WS-B","WS-C"],"op":null,"const":null,"expr":"(WS-B + 1) * WS-C"}` | -| ADD 5 TO WS-A | compute (const) | `{"type":"compute","source_vars":["WS-A"],"op":"+","const":5,"expr":"WS-A + 5"}` | -| SUBTRACT 3 FROM WS-A | compute (const) | `{"type":"compute","source_vars":["WS-A"],"op":"-","const":3,"expr":"WS-A - 3"}` | -| MULTIPLY 2 BY WS-A | compute (const) | `{"type":"compute","source_vars":["WS-A"],"op":"*","const":2,"expr":"WS-A * 2"}` | -| DIVIDE 4 INTO WS-A | compute (const) | `{"type":"compute","source_vars":["WS-A"],"op":"/","const":4,"expr":"WS-A / 4"}` | - -### 7. 控制流结束 - -| 语句 | 含义 | -|------|------| -| STOP RUN | 程序结束,不执行后续代码 | -| GOBACK | 返回调用者(类似 STOP RUN) | -| EXIT PROGRAM | 返回调用者 | - -这些语句不是树节点,但标记了当前段落/分支的结束。 - -### 8. 88-level 条件名 - -``` -05 CALL-TYPE PIC X(1). - 88 CALL-LOCAL VALUE 'L'. - 88 CALL-DOMESTIC VALUE 'D'. -``` - -在条件中如 `IF CALL-LOCAL`,等价于 `IF CALL-TYPE = 'L'`。条件名可替换为父字段 + 值。 - -## 输出规则总结 - -1. **assignments**: 包含所有出现的赋值语句,**不区分分支**(全局收集) -2. **tree**: 只包含结构化的 if/eval/perform/assign 节点,**不包含段落标签** -3. 注释行(* 在第7列)已被预处理移除 -4. 每个 assign 节点必须与 assignments 中的条目一一对应 -5. condition 保持原始文本,不要解析或转换 -6. 88-level 条件在 tree.condition 中直接替换为父字段条件(如 `IF CALL-TYPE = 'L'`) -7. 赋值中的字段名、字面量保持原始值,多单词字段用连字符(如 WS-AMOUNT) - -## Few-Shot 示例 - -### 示例 1:简单 IF/ELSE -**输入:** -``` -PROCEDURE DIVISION. - IF WS-AMOUNT > 1000 - MOVE 'H' TO WS-STATUS - ELSE - MOVE 'L' TO WS-STATUS - END-IF. - STOP RUN. -``` - -**输出:** -```json -{ - "assignments": { - "WS-STATUS": {"type": "move_literal", "literal": "H"}, - "WS-STATUS": {"type": "move_literal", "literal": "L"} - }, - "tree": { - "type": "seq", - "children": [ - { - "type": "if", - "condition": "WS-AMOUNT > 1000", - "true_seq": { - "type": "seq", - "children": [ - {"type": "assign", "target": "WS-STATUS", "source_info": {"type": "move_literal", "literal": "H"}} - ] - }, - "false_seq": { - "type": "seq", - "children": [ - {"type": "assign", "target": "WS-STATUS", "source_info": {"type": "move_literal", "literal": "L"}} - ] - } - } - ] - } -} -``` - -### 示例 2:EVALUATE -**输入:** -``` -PROCEDURE DIVISION. - EVALUATE WS-TYPE - WHEN 'A' - MOVE 'TYPE-A' TO WS-MEMO - WHEN 'B' - MOVE 'TYPE-B' TO WS-MEMO - WHEN OTHER - MOVE 'OTHER' TO WS-MEMO - END-EVALUATE. - STOP RUN. -``` - -**输出:** -```json -{ - "assignments": { - "WS-MEMO": {"type": "move_literal", "literal": "TYPE-A"}, - "WS-MEMO": {"type": "move_literal", "literal": "TYPE-B"}, - "WS-MEMO": {"type": "move_literal", "literal": "OTHER"} - }, - "tree": { - "type": "seq", - "children": [ - { - "type": "eval", - "subject": "WS-TYPE", - "when_list": [ - {"value": "A", "seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-MEMO", "source_info": {"type": "move_literal", "literal": "TYPE-A"}} - ]}}, - {"value": "B", "seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-MEMO", "source_info": {"type": "move_literal", "literal": "TYPE-B"}} - ]}} - ], - "other_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-MEMO", "source_info": {"type": "move_literal", "literal": "OTHER"}} - ]}, - "has_other": true - } - ] - } -} -``` - -### 示例 3:嵌套 IF + PERFORM 段落 -**输入:** -``` -PROCEDURE DIVISION. - IF WS-AMOUNT > 5000 - PERFORM 2000-HIGH - ELSE - PERFORM 3000-LOW - END-IF. - STOP RUN. -2000-HIGH. - MOVE 'H' TO WS-STATUS. -3000-LOW. - MOVE 'L' TO WS-STATUS. -``` - -**输出:** -```json -{ - "assignments": { - "WS-STATUS": {"type": "move_literal", "literal": "H"}, - "WS-STATUS": {"type": "move_literal", "literal": "L"} - }, - "tree": { - "type": "seq", - "children": [ - { - "type": "if", - "condition": "WS-AMOUNT > 5000", - "true_seq": {"type": "seq", "children": [ - {"type": "perform", "perf_type": "para", "target": "2000-HIGH"} - ]}, - "false_seq": {"type": "seq", "children": [ - {"type": "perform", "perf_type": "para", "target": "3000-LOW"} - ]} - } - ] - } -} -``` - -### 示例 4:内联 PERFORM UNTIL -**输入:** -``` -PROCEDURE DIVISION. - MOVE 1 TO WS-COUNT. - PERFORM UNTIL WS-COUNT > 10 - ADD 1 TO WS-COUNT - END-PERFORM. - STOP RUN. -``` - -**输出:** -```json -{ - "assignments": { - "WS-COUNT": {"type": "move_literal", "literal": "1"}, - "WS-COUNT": {"type": "compute", "source_vars": ["WS-COUNT"], "op": "+", "const": 1, "expr": "WS-COUNT + 1"} - }, - "tree": { - "type": "seq", - "children": [ - {"type": "assign", "target": "WS-COUNT", "source_info": {"type": "move_literal", "literal": "1"}}, - { - "type": "perform", - "perf_type": "until", - "condition": "WS-COUNT > 10", - "body_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-COUNT", "source_info": {"type": "compute", "source_vars": ["WS-COUNT"], "op": "+", "const": 1, "expr": "WS-COUNT + 1"}} - ]} - } - ] - } -} -``` - -### 示例 5:PERFORM VARYING + 复合条件 -**输入:** -``` -PROCEDURE DIVISION. - MOVE 0 TO WS-TOTAL-CHARGE. - PERFORM VARYING WS-COUNT FROM 1 BY 1 UNTIL WS-COUNT > 3 - IF CALL-HOUR >= 08 AND CALL-HOUR < 22 - MOVE 'Y' TO WS-PEAK-FLAG - ELSE - MOVE 'N' TO WS-PEAK-FLAG - END-IF - END-PERFORM. - STOP RUN. -``` - -**输出:** -```json -{ - "assignments": { - "WS-TOTAL-CHARGE": {"type": "move_literal", "literal": "0"}, - "WS-PEAK-FLAG": {"type": "move_literal", "literal": "Y"}, - "WS-PEAK-FLAG": {"type": "move_literal", "literal": "N"} - }, - "tree": { - "type": "seq", - "children": [ - {"type": "assign", "target": "WS-TOTAL-CHARGE", "source_info": {"type": "move_literal", "literal": "0"}}, - { - "type": "perform", - "perf_type": "varying", - "condition": "WS-COUNT > 3", - "varying_var": "WS-COUNT", - "varying_from": "1", - "varying_by": "1", - "body_seq": {"type": "seq", "children": [ - { - "type": "if", - "condition": "CALL-HOUR >= 08 AND CALL-HOUR < 22", - "true_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-PEAK-FLAG", "source_info": {"type": "move_literal", "literal": "Y"}} - ]}, - "false_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-PEAK-FLAG", "source_info": {"type": "move_literal", "literal": "N"}} - ]} - } - ]} - } - ] - } -} -``` - -### 示例 6:88-level 条件名 -**输入:** -``` -PROCEDURE DIVISION. - IF CALL-LOCAL - MOVE 'L' TO WS-TYPE - END-IF. - STOP RUN. -``` -(DATA: 88 CALL-LOCAL VALUE 'L', parent field CALL-TYPE PIC X(1)) - -**输出:** -```json -{ - "assignments": { - "WS-TYPE": {"type": "move_literal", "literal": "L"} - }, - "tree": { - "type": "seq", - "children": [ - { - "type": "if", - "condition": "CALL-TYPE = 'L'", - "true_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-TYPE", "source_info": {"type": "move_literal", "literal": "L"}} - ]}, - "false_seq": {"type": "seq", "children": []} - } - ] - } -} -``` - -### 示例 7:CALL 子程序调用 -**输入:** -``` -PROCEDURE DIVISION. - MOVE 0 TO WS-RESULT. - IF WS-AMOUNT > 1000 - MOVE 'H' TO WS-STATUS - CALL 'CALCSUB' USING WS-AMOUNT WS-TYPE WS-RESULT - ELSE - MOVE 'L' TO WS-STATUS - CALL 'CALCSUB' USING WS-AMOUNT WS-TYPE WS-RESULT - END-IF. - STOP RUN. -``` - -**输出:** -```json -{ - "assignments": { - "WS-RESULT": {"type": "move_literal", "literal": "0"}, - "WS-STATUS": {"type": "move_literal", "literal": "H"}, - "WS-STATUS": {"type": "move_literal", "literal": "L"} - }, - "tree": { - "type": "seq", - "children": [ - {"type": "assign", "target": "WS-RESULT", "source_info": {"type": "move_literal", "literal": "0"}}, - { - "type": "if", - "condition": "WS-AMOUNT > 1000", - "true_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-STATUS", "source_info": {"type": "move_literal", "literal": "H"}}, - {"type": "call", "program_name": "CALCSUB", "using_params": [ - {"name": "WS-AMOUNT", "mechanism": "reference"}, - {"name": "WS-TYPE", "mechanism": "reference"}, - {"name": "WS-RESULT", "mechanism": "reference"} - ]} - ]}, - "false_seq": {"type": "seq", "children": [ - {"type": "assign", "target": "WS-STATUS", "source_info": {"type": "move_literal", "literal": "L"}}, - {"type": "call", "program_name": "CALCSUB", "using_params": [ - {"name": "WS-AMOUNT", "mechanism": "reference"}, - {"name": "WS-TYPE", "mechanism": "reference"}, - {"name": "WS-RESULT", "mechanism": "reference"} - ]} - ]} - } - ] - } -} -``` - -## 错误处理 - -- 无法识别的语句:跳过该行(不影响整体结构) -- 不完整的语句(如 IF 无 END-IF):尝试合理推断嵌套关系 -- 嵌套段落引用(PERFORM A THRU B):使用 perf_type "thru" -- 字段名与 88-level 名冲突:以字段定义为准 - -## 输出要求 - -- 只输出一个 JSON 对象(无多余文本、无 markdown 标记) -- JSON 必须合法(双引号、正确逗号、无尾逗号) -- assignments 中**每个赋值只记录一次**(不区分分支) -- tree 必须完整包含所有可达代码路径 -- 字段名、字面量保持原始值(不转换大小写,不移动) diff --git a/cobol_testgen/read.py b/cobol_testgen/read.py index d0f4ef2..52add45 100644 --- a/cobol_testgen/read.py +++ b/cobol_testgen/read.py @@ -52,7 +52,7 @@ def preprocess(source: str) -> str: if not line: continue content = line - lines.append(content.upper()) + lines.append(re.sub(r'\s+FALSE\s+[^\s.]+', '', content.upper())) return '\n'.join(lines)