Files

190 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
import ast
import json
import os
import re
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
DEFAULT_HUBOPS_PATH = os.path.join(PROJECT_ROOT, "HubOps.md")
class HubOpsParser(object):
def __init__(self, file_path=None):
self.file_path = file_path or os.getenv("JOYHUB_DOC_PATH", DEFAULT_HUBOPS_PATH)
def parse(self):
with open(self.file_path, "r", encoding="utf-8") as file:
lines = file.read().splitlines()
cases = []
headings = []
for index, line in enumerate(lines):
heading = self._parse_heading(line)
if heading:
level, title = heading
headings = [item for item in headings if item[0] < level]
headings.append((level, title))
continue
if line.strip() != "**接口URL**":
continue
case = self._parse_case(lines, index, headings)
if case.get("url"):
case["case_id"] = "joyhub_{:04d}".format(len(cases) + 1)
cases.append(case)
return cases
def _parse_case(self, lines, url_index, headings):
title = self._case_title(headings, url_index)
url = self._next_quote_value(lines, url_index)
method = self._find_section_quote(lines, url_index, "**请求方式**") or "POST"
content_type = self._find_section_quote(lines, url_index, "**Content-Type**") or "json"
body_text = self._find_code_block(lines, url_index, "**请求Body参数**")
query = self._find_param_table(lines, url_index, "**请求Query参数**")
headers = self._find_headers(lines, url_index)
body = self._parse_body(body_text)
return {
"name": title,
"method": method.upper(),
"url": url,
"content_type": content_type,
"headers": headers,
"query": query,
"body": body,
"raw_body": body_text,
}
@staticmethod
def _parse_heading(line):
match = re.match(r"^(#{2,6})\s+(.+?)\s*$", line)
if not match:
return None
return len(match.group(1)), match.group(2).strip()
@staticmethod
def _case_title(headings, url_index):
if headings:
return " / ".join(title for _, title in headings[-3:])
return "HubOps接口{}".format(url_index + 1)
@staticmethod
def _next_quote_value(lines, start):
for index in range(start + 1, min(start + 8, len(lines))):
line = lines[index].strip()
if line.startswith(">"):
value = line[1:].strip()
if value and value != "暂无参数":
return value
return ""
@staticmethod
def _section_end(lines, start):
for index in range(start + 1, len(lines)):
line = lines[index].strip()
if line.startswith("## ") or line.startswith("### ") or line == "**接口URL**":
return index
return len(lines)
def _find_section_quote(self, lines, url_index, section_name):
start = max(0, url_index - 80)
end = self._section_end(lines, url_index)
for index in range(start, min(end, len(lines))):
if lines[index].strip() == section_name:
return self._next_quote_value(lines, index)
return None
def _find_code_block(self, lines, url_index, section_name):
end = self._section_end(lines, url_index)
for index in range(url_index, end):
if lines[index].strip() != section_name:
continue
for code_start in range(index + 1, end):
if lines[code_start].strip().startswith("```"):
block = []
for code_end in range(code_start + 1, end):
if lines[code_end].strip().startswith("```"):
return "\n".join(block).strip()
block.append(lines[code_end])
return ""
def _find_param_table(self, lines, url_index, section_name):
end = self._section_end(lines, url_index)
for index in range(url_index, end):
if lines[index].strip() != section_name:
continue
params = {}
for row_index in range(index + 1, end):
row = lines[row_index].strip()
if not row.startswith("|"):
if params:
break
continue
if "---" in row or "参数名" in row or "暂无参数" in row:
continue
columns = [column.strip() for column in row.strip("|").split("|")]
if len(columns) >= 2 and columns[0]:
params[columns[0]] = self._coerce_value(columns[1])
return params
return {}
def _find_headers(self, lines, url_index):
headers = {}
end = self._section_end(lines, url_index)
for index in range(url_index, end):
if lines[index].strip() != "**请求Header参数**":
continue
table_started = False
for row_index in range(index + 1, end):
row = lines[row_index].strip()
if row.startswith("**") or row.startswith("#") or row.startswith("*"):
break
if not row.startswith("|"):
if table_started:
break
continue
table_started = True
if "---" in row or "参数名" in row or "暂无参数" in row:
continue
columns = [column.strip() for column in row.strip("|").split("|")]
if len(columns) >= 2 and columns[0] and columns[0] != "Authorization":
headers[columns[0]] = columns[1]
return headers
return headers
@classmethod
def _parse_body(cls, body_text):
if not body_text or body_text == "暂无数据":
return {}
text = cls._clean_body(body_text)
for loader in (json.loads, ast.literal_eval):
try:
value = loader(text)
return value if isinstance(value, (dict, list)) else {}
except Exception:
pass
return {}
@staticmethod
def _clean_body(text):
text = re.sub(r"//.*", "", text)
text = re.sub(r"/\*.*?\*/", "", text, flags=re.S)
text = text.replace("{{token}}", "")
text = re.sub(r",\s*([}\]])", r"\1", text)
return text.strip()
@staticmethod
def _coerce_value(value):
if value in ("-", "暂无参数", ""):
return ""
if value.isdigit():
return int(value)
if value.lower() in ("true", "false"):
return value.lower() == "true"
return value
def load_hubops_cases():
return HubOpsParser().parse()