# -*- coding: utf-8 -*-
"""Extract API request cases from the HubOps Markdown interface document."""
import ast
import json
import os
import re

PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
DEFAULT_HUBOPS_PATH = os.path.join(PROJECT_ROOT, "HubOps.md")

# Matches, in priority order, a quoted string literal or a JS-style comment.
# Strings are matched first so that "//" inside a value (e.g. a URL) is never
# mistaken for the start of a line comment.
_BODY_TOKEN_RE = re.compile(
    r'"(?:\\.|[^"\\\n])*"'
    r"|'(?:\\.|[^'\\\n])*'"
    r"|//[^\n]*"
    r"|/\*.*?\*/",
    re.S,
)

# Errors json.loads / ast.literal_eval raise for text they cannot parse.
_BODY_PARSE_ERRORS = (ValueError, SyntaxError, TypeError, MemoryError, RecursionError)


class HubOpsParser(object):
    """Parse request cases out of a Markdown API document.

    A case starts at a line equal to ``**接口URL**``; the sections that
    describe it (method, content type, headers, query, body) are looked up
    relative to that line.
    """

    def __init__(self, file_path=None):
        """Remember the document path.

        Args:
            file_path: Path of the Markdown file. When falsy, the
                ``JOYHUB_DOC_PATH`` environment variable is used, falling
                back to ``DEFAULT_HUBOPS_PATH``.
        """
        self.file_path = file_path or os.getenv("JOYHUB_DOC_PATH", DEFAULT_HUBOPS_PATH)

    def parse(self):
        """Read the document and return the list of parsed cases.

        Returns:
            A list of dicts as built by ``_parse_case``, each extended with a
            sequential ``case_id`` (``joyhub_0001``, ...). Cases without a URL
            are dropped and do not consume an id.

        Raises:
            OSError: If the document cannot be opened.
        """
        with open(self.file_path, "r", encoding="utf-8") as file:
            lines = file.read().splitlines()

        cases = []
        headings = []
        for index, line in enumerate(lines):
            heading = self._parse_heading(line)
            if heading:
                level, title = heading
                # Keep only ancestors of the new heading, then push it.
                headings = [item for item in headings if item[0] < level]
                headings.append((level, title))
                continue
            if line.strip() != "**接口URL**":
                continue
            case = self._parse_case(lines, index, headings)
            if case.get("url"):
                case["case_id"] = "joyhub_{:04d}".format(len(cases) + 1)
                cases.append(case)
        return cases

    def _parse_case(self, lines, url_index, headings):
        """Build the case dict for the URL marker at ``lines[url_index]``.

        Args:
            lines: All document lines.
            url_index: Index of the ``**接口URL**`` line.
            headings: Current heading stack as ``(level, title)`` tuples.

        Returns:
            Dict with keys ``name``, ``method``, ``url``, ``content_type``,
            ``headers``, ``query``, ``body`` and ``raw_body``.
        """
        body_text = self._find_code_block(lines, url_index, "**请求Body参数**")
        method = self._find_section_quote(lines, url_index, "**请求方式**") or "POST"
        content_type = self._find_section_quote(lines, url_index, "**Content-Type**") or "json"
        return {
            "name": self._case_title(headings, url_index),
            "method": method.upper(),
            "url": self._next_quote_value(lines, url_index),
            "content_type": content_type,
            "headers": self._find_headers(lines, url_index),
            "query": self._find_param_table(lines, url_index, "**请求Query参数**"),
            "body": self._parse_body(body_text),
            "raw_body": body_text,
        }

    @staticmethod
    def _parse_heading(line):
        """Return ``(level, title)`` for a level 2-6 Markdown heading, else None."""
        match = re.match(r"^(#{2,6})\s+(.+?)\s*$", line)
        if not match:
            return None
        return len(match.group(1)), match.group(2).strip()

    @staticmethod
    def _case_title(headings, url_index):
        """Join the last three heading titles, or fall back to a numbered name."""
        if headings:
            return " / ".join(title for _, title in headings[-3:])
        return "HubOps接口{}".format(url_index + 1)

    @staticmethod
    def _next_quote_value(lines, start):
        """Return the first non-empty ``>`` quote within 7 lines after ``start``.

        Quotes whose text is ``暂无参数`` are skipped. Returns ``""`` when no
        usable quote is found.
        """
        for index in range(start + 1, min(start + 8, len(lines))):
            line = lines[index].strip()
            if line.startswith(">"):
                value = line[1:].strip()
                if value and value != "暂无参数":
                    return value
        return ""

    @staticmethod
    def _section_end(lines, start):
        """Return the index of the next ``## ``/``### `` heading or URL marker.

        Scanning begins after ``start``; ``len(lines)`` is returned when no
        such line exists.
        """
        for index in range(start + 1, len(lines)):
            line = lines[index].strip()
            if line.startswith(("## ", "### ")) or line == "**接口URL**":
                return index
        return len(lines)

    @staticmethod
    def _is_section_boundary(row):
        """Tell whether a stripped line opens another section (``*`` or ``#``)."""
        return row.startswith(("*", "#"))

    def _find_section_quote(self, lines, url_index, section_name):
        """Return the quoted value of ``section_name`` for the case at ``url_index``.

        The section is searched first between the URL marker and the end of
        the case. Only if it is absent there is the text before the marker
        searched, nearest line first and at most 80 lines back, stopping at
        the closest heading or URL marker so that a section belonging to an
        earlier case is not picked up across that boundary.

        Returns:
            The quote text (possibly ``""``), or None when the section is
            not found.
        """
        end = min(self._section_end(lines, url_index), len(lines))
        for index in range(url_index, end):
            if lines[index].strip() == section_name:
                return self._next_quote_value(lines, index)

        lowest = max(0, url_index - 80)
        for index in range(url_index - 1, lowest - 1, -1):
            stripped = lines[index].strip()
            if stripped == section_name:
                return self._next_quote_value(lines, index)
            if stripped == "**接口URL**" or self._parse_heading(lines[index]):
                break
        return None

    def _find_code_block(self, lines, url_index, section_name):
        """Return the text of the fenced code block under ``section_name``.

        The fence must open before the next section marker; otherwise the
        section is treated as having no code block, so a fence that belongs
        to a later section is never returned. Returns ``""`` when nothing is
        found or the fence is not closed within the case.
        """
        end = self._section_end(lines, url_index)
        for index in range(url_index, end):
            if lines[index].strip() != section_name:
                continue
            block = None  # None until the opening fence has been seen
            for cursor in range(index + 1, end):
                stripped = lines[cursor].strip()
                if block is None:
                    if stripped.startswith("```"):
                        block = []
                    elif self._is_section_boundary(stripped):
                        break
                    continue
                if stripped.startswith("```"):
                    return "\n".join(block).strip()
                block.append(lines[cursor])
        return ""

    def _table_rows(self, lines, start, end):
        """Yield ``(name, value)`` for data rows of the first table in range.

        Scanning stops at the next section marker or at the first non-table
        line after the table has started. Separator rows, the ``参数名``
        header row and ``暂无参数`` rows are skipped, as are rows with fewer
        than two columns or an empty first column.
        """
        table_started = False
        for row_index in range(start, end):
            row = lines[row_index].strip()
            if self._is_section_boundary(row):
                break
            if not row.startswith("|"):
                if table_started:
                    break
                continue
            table_started = True
            if "---" in row or "参数名" in row or "暂无参数" in row:
                continue
            columns = [column.strip() for column in row.strip("|").split("|")]
            if len(columns) >= 2 and columns[0]:
                yield columns[0], columns[1]

    def _find_param_table(self, lines, url_index, section_name):
        """Return ``{name: coerced value}`` from the table under ``section_name``.

        Only the table that belongs to the section is read; an empty dict is
        returned when the section is missing or has no table.
        """
        end = self._section_end(lines, url_index)
        for index in range(url_index, end):
            if lines[index].strip() != section_name:
                continue
            return {
                name: self._coerce_value(value)
                for name, value in self._table_rows(lines, index + 1, end)
            }
        return {}

    def _find_headers(self, lines, url_index):
        """Return request headers from the ``**请求Header参数**`` table.

        Values are kept as raw strings and the ``Authorization`` row is
        excluded.
        """
        end = self._section_end(lines, url_index)
        for index in range(url_index, end):
            if lines[index].strip() != "**请求Header参数**":
                continue
            return {
                name: value
                for name, value in self._table_rows(lines, index + 1, end)
                if name != "Authorization"
            }
        return {}

    @classmethod
    def _parse_body(cls, body_text):
        """Convert the raw body text into a dict or list.

        The text is cleaned with ``_clean_body`` and then tried as JSON and,
        failing that, as a Python literal. Anything that is empty, equals
        ``暂无数据``, cannot be parsed, or parses to a non-container yields
        ``{}``.
        """
        if not body_text or body_text == "暂无数据":
            return {}
        text = cls._clean_body(body_text)
        for loader in (json.loads, ast.literal_eval):
            try:
                value = loader(text)
            except _BODY_PARSE_ERRORS:
                # Best effort: fall through to the next loader.
                continue
            return value if isinstance(value, (dict, list)) else {}
        return {}

    @staticmethod
    def _clean_body(text):
        """Strip comments, ``{{token}}`` placeholders and trailing commas.

        ``//`` and ``/* */`` comments are removed only outside quoted string
        literals, so values such as ``"http://host/path"`` stay intact.
        """
        def keep_strings(match):
            token = match.group(0)
            return token if token[0] in "\"'" else ""

        text = _BODY_TOKEN_RE.sub(keep_strings, text)
        text = text.replace("{{token}}", "")
        text = re.sub(r",\s*([}\]])", r"\1", text)
        return text.strip()

    @staticmethod
    def _coerce_value(value):
        """Turn a table cell into ``""``, an int, a bool, or the original string."""
        if value in ("-", "暂无参数", ""):
            return ""
        # isdecimal() rather than isdigit(): the latter accepts characters
        # such as superscript digits that int() rejects.
        if value.isdecimal():
            return int(value)
        lowered = value.lower()
        if lowered in ("true", "false"):
            return lowered == "true"
        return value


def load_hubops_cases():
    """Parse the default HubOps document and return its cases."""
    return HubOpsParser().parse()