Files
zzyy-cs/zendao_tools/zendao_api.py
2025-12-04 17:27:14 +08:00

238 lines
7.9 KiB
Python

from __future__ import annotations
"""
ZenDao bug list crawler.
Design overview:
1. Read credentials + crawl settings from zendao_config.ini.
2. Use ZenDao REST API (`api.php/v1/tokens` by default, configurable via `auth_endpoint`) to obtain a token, or send a pre-issued `api_token` when `auth_mode = token`.
3. Pull paginated bug records under the configured product via `/api.php/v1/products/{product_id}/bugs`.
4. Export normalized bug data to CSV (default) or JSON, usable by other automation.
"""
import argparse
import configparser
import csv
import hashlib
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
import requests
from requests import Response, Session
# Bug attributes written for each record; the order is the CSV column order.
BUG_EXPORT_FIELDS: Sequence[str] = (
    "id",
    "title",
    "status",
    "severity",
    "pri",
    "openedBy",
    "openedDate",
    "assignedTo",
    "resolvedBy",
    "resolution",
    "lastEditedDate",
)

# The default config file sits in the same directory as this module.
DEFAULT_CONFIG_PATH = Path(__file__).parent / "zendao_config.ini"
class ZenDaoAPIError(RuntimeError):
    """Signal that a ZenDao response did not have the expected shape."""
@dataclass
class ZenDaoSettings:
base_url: str
username: str
password: str
product_id: int
per_page: int = 50
max_pages: int = 10
verify_ssl: bool = False
timeout: int = 10
auth_mode: str = "password" # password | token
api_token: str | None = None
auth_endpoint: str = "api.php/v1/tokens"
token_header: str = "Token"
password_hash: str = "md5" # md5 | plain
@classmethod
def from_file(cls, path: Path, section: str = "zendao") -> "ZenDaoSettings":
parser = configparser.ConfigParser()
if not path.exists():
raise FileNotFoundError(f"Config file not found: {path}")
parser.read(path, encoding="utf-8")
if section not in parser:
raise KeyError(f"Section [{section}] not found in {path}")
cfg = parser[section]
return cls(
base_url=cfg.get("base_url", "").rstrip("/"),
username=cfg.get("username", ""),
password=cfg.get("password", ""),
product_id=cfg.getint("product_id", fallback=0),
per_page=cfg.getint("per_page", fallback=50),
max_pages=cfg.getint("max_pages", fallback=10),
verify_ssl=cfg.getboolean("verify_ssl", fallback=False),
timeout=cfg.getint("timeout", fallback=10),
auth_mode=cfg.get("auth_mode", "password").lower(),
api_token=cfg.get("api_token"),
auth_endpoint=cfg.get("auth_endpoint", "api.php/v1/tokens"),
token_header=cfg.get("token_header", "Token"),
password_hash=cfg.get("password_hash", "md5").lower(),
)
def validate(self) -> None:
missing = [
name
for name, value in (
("base_url", self.base_url),
("username", self.username),
("password", self.password),
)
if not value
]
if missing:
raise ValueError(f"Missing config values: {', '.join(missing)}")
if self.product_id <= 0:
raise ValueError("product_id must be > 0")
if self.auth_mode not in {"password", "token"}:
raise ValueError("auth_mode must be 'password' or 'token'")
if self.password_hash not in {"md5", "plain"}:
raise ValueError("password_hash must be 'md5' or 'plain'")
if self.auth_mode == "token" and not self.api_token:
raise ValueError("api_token required when auth_mode=token")
class ZenDaoClient:
    """Small HTTP client for the ZenDao endpoints used by the crawler.

    All requests go through :meth:`_request`, which prefixes the configured
    base URL and applies the configured timeout and TLS verification flag.
    """

    def __init__(self, settings: ZenDaoSettings, session: Session | None = None) -> None:
        """Store *settings* and use *session*, or a new ``requests.Session``."""
        self.settings = settings
        self.session: Session = session if session is not None else requests.Session()
        self._token: str | None = None

    def authenticate(self) -> str:
        """Install the auth header on the session and return the token.

        In ``token`` mode the configured ``api_token`` is used directly.
        Otherwise the account and (optionally hashed) password are posted
        to ``auth_endpoint`` and the token is read from the JSON reply.

        Raises:
            ValueError: If ``token`` mode is selected without an ``api_token``.
            ZenDaoAPIError: If the reply does not contain a token.
        """
        if self.settings.auth_mode == "token":
            if not self.settings.api_token:
                raise ValueError("api_token missing in config")
            self.session.headers.update({self.settings.token_header: self.settings.api_token})
            self._token = self.settings.api_token
            return self._token
        payload = {
            "account": self.settings.username,
            "password": self._format_password(self.settings.password),
        }
        response = self._request("post", self.settings.auth_endpoint, json=payload)
        data = response.json()
        # A non-object reply cannot carry a token; report it like a missing one.
        token = (data.get("token") or data.get("accessToken")) if isinstance(data, dict) else None
        if not token:
            raise ZenDaoAPIError(f"Unexpected token response: {data}")
        self._token = token
        self.session.headers.update({self.settings.token_header: token})
        return token

    def _format_password(self, password: str) -> str:
        """Return *password* as an MD5 hex digest, or unchanged in ``plain`` mode."""
        if self.settings.password_hash == "md5":
            return hashlib.md5(password.encode()).hexdigest()
        return password

    def fetch_bugs_page(self, page: int = 1) -> List[Dict]:
        """Fetch one page of bugs for the configured product.

        The list is read from the ``bugs`` key of the reply, falling back to
        ``data``. An empty list is a valid result and is returned as such.

        Raises:
            ZenDaoAPIError: If the reply carries neither key.
        """
        params = {"page": page, "limit": self.settings.per_page}
        endpoint = f"api.php/v1/products/{self.settings.product_id}/bugs"
        response = self._request("get", endpoint, params=params)
        payload = response.json()
        bugs = None
        if isinstance(payload, dict):
            # Test for None rather than truthiness so an empty page ("bugs": [])
            # is not mistaken for a malformed reply.
            for key in ("bugs", "data"):
                if payload.get(key) is not None:
                    bugs = payload[key]
                    break
        if bugs is None:
            raise ZenDaoAPIError(f"Unexpected bug payload: {payload}")
        return list(bugs)

    def fetch_all_bugs(self) -> List[Dict]:
        """Collect bugs page by page, up to ``max_pages`` pages.

        Paging stops early on an empty page or on a page shorter than
        ``per_page``.
        """
        all_bugs: List[Dict] = []
        for page in range(1, self.settings.max_pages + 1):
            page_bugs = self.fetch_bugs_page(page)
            if not page_bugs:
                break
            all_bugs.extend(page_bugs)
            if len(page_bugs) < self.settings.per_page:
                break
        return all_bugs

    def _request(self, method: str, path: str, **kwargs) -> Response:
        """Send a request to *path* under the base URL and return the response.

        ``timeout`` and ``verify`` default to the configured values unless
        the caller supplies them. HTTP error statuses raise through
        ``Response.raise_for_status``.
        """
        url = f"{self.settings.base_url}/{path.lstrip('/')}"
        kwargs.setdefault("timeout", self.settings.timeout)
        kwargs.setdefault("verify", self.settings.verify_ssl)
        response = self.session.request(method=method, url=url, **kwargs)
        response.raise_for_status()
        return response
def normalize_bug(bug: Dict, fields: Sequence[str] = BUG_EXPORT_FIELDS) -> Dict[str, str]:
    """Flatten one bug record into a ``{field: text}`` mapping.

    Args:
        bug: Raw bug record.
        fields: Names of the fields to keep, in output order.

    Returns:
        A dict with exactly the requested fields. Missing and ``None``
        values become ``""``; dict values are replaced by their
        ``realname`` or ``account`` entry when one is set; anything else
        that is not already a string is converted with ``str``.
    """
    normalized: Dict[str, str] = {}
    for field in fields:
        value = bug.get(field)
        if isinstance(value, dict):
            value = value.get("realname") or value.get("account") or value
        if value is None:
            # An unset (null) field is an empty cell, not the text "None".
            normalized[field] = ""
        else:
            normalized[field] = value if isinstance(value, str) else str(value)
    return normalized
def write_json(path: Path, bugs: Iterable[Dict]) -> None:
    """Write *bugs* to *path* as an indented UTF-8 JSON array.

    Non-ASCII characters are written as-is rather than escaped.
    """
    records = list(bugs)
    document = json.dumps(records, ensure_ascii=False, indent=2)
    path.write_text(document, encoding="utf-8")
def write_csv(path: Path, bugs: Iterable[Dict], fields: Sequence[str]) -> None:
    """Write *bugs* to *path* as UTF-8 CSV with *fields* as the columns.

    Each record is passed through ``normalize_bug`` before it is written.
    When there are no records the file is created empty, without a header
    row.
    """
    records = list(bugs)
    if records:
        with path.open("w", encoding="utf-8", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=fields)
            writer.writeheader()
            # The generator keeps normalization interleaved with writing.
            writer.writerows(normalize_bug(record, fields) for record in records)
    else:
        path.write_text("", encoding="utf-8")
def run_crawler(config: Path, output: Path) -> Path:
    """Load settings, download the bug list and write it to *output*.

    Args:
        config: Path of the INI file holding the crawler settings.
        output: Destination file. A ``.json`` suffix (case-insensitive)
            selects JSON; any other suffix selects CSV.

    Returns:
        *output*, unchanged.
    """
    settings = ZenDaoSettings.from_file(config)
    settings.validate()
    client = ZenDaoClient(settings)
    try:
        client.authenticate()
        bugs = client.fetch_all_bugs()
    finally:
        # Release the HTTP session's connections whether or not the crawl succeeded.
        client.session.close()
    if output.suffix.lower() == ".json":
        write_json(output, bugs)
    else:
        write_csv(output, bugs, BUG_EXPORT_FIELDS)
    return output
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the command-line options ``--config`` and ``--output``.

    Both options are converted to ``Path``. ``--config`` defaults to
    ``DEFAULT_CONFIG_PATH`` and ``--output`` to ``bugs.csv``.
    """
    cli = argparse.ArgumentParser(description="Fetch bug list from ZenDao.")
    option_table = (
        (
            "--config",
            {"type": Path, "default": DEFAULT_CONFIG_PATH, "help": "Path to zendao_config.ini"},
        ),
        (
            "--output",
            {"type": Path, "default": Path("bugs.csv"), "help": "Output file path (.csv or .json)"},
        ),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli.parse_args(argv)
def main(argv: Sequence[str] | None = None) -> None:
    """Run the crawler from the command line.

    Args:
        argv: Argument list to parse. ``None`` means the process arguments
            (``sys.argv[1:]``); an explicit empty list means "no
            arguments" and selects the defaults.
    """
    # Compare against None: `argv or ...` would replace an explicit empty
    # list with the process arguments.
    args = parse_args(sys.argv[1:] if argv is None else argv)
    output = run_crawler(args.config, args.output)
    print(f"Bug list saved to: {output}")


if __name__ == "__main__":
    main()