238 lines
7.9 KiB
Python
238 lines
7.9 KiB
Python
from __future__ import annotations
|
|
|
|
"""
|
|
ZenDao bug list crawler.
|
|
|
|
Design overview:
|
|
1. Read credentials + crawl settings from zendao_config.ini.
|
|
2. Use ZenDao REST API (`api.php/v1/accessTokens`) to obtain a short-lived token.
|
|
3. Pull paginated bug records under the configured product via `/api.php/v1/products/{product_id}/bugs`.
|
|
4. Export normalized bug data to CSV (default) or JSON, usable by other automation.
|
|
"""
|
|
|
|
import argparse
|
|
import configparser
|
|
import csv
|
|
import hashlib
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Sequence
|
|
|
|
import requests
|
|
from requests import Response, Session
|
|
|
|
|
|
BUG_EXPORT_FIELDS: Sequence[str] = (
|
|
"id",
|
|
"title",
|
|
"status",
|
|
"severity",
|
|
"pri",
|
|
"openedBy",
|
|
"openedDate",
|
|
"assignedTo",
|
|
"resolvedBy",
|
|
"resolution",
|
|
"lastEditedDate",
|
|
)
|
|
DEFAULT_CONFIG_PATH = Path(__file__).with_name("zendao_config.ini")
|
|
|
|
|
|
class ZenDaoAPIError(RuntimeError):
|
|
"""Raised when ZenDao returns an unexpected payload."""
|
|
|
|
|
|
@dataclass
|
|
class ZenDaoSettings:
|
|
base_url: str
|
|
username: str
|
|
password: str
|
|
product_id: int
|
|
per_page: int = 50
|
|
max_pages: int = 10
|
|
verify_ssl: bool = False
|
|
timeout: int = 10
|
|
auth_mode: str = "password" # password | token
|
|
api_token: str | None = None
|
|
auth_endpoint: str = "api.php/v1/tokens"
|
|
token_header: str = "Token"
|
|
password_hash: str = "md5" # md5 | plain
|
|
|
|
@classmethod
|
|
def from_file(cls, path: Path, section: str = "zendao") -> "ZenDaoSettings":
|
|
parser = configparser.ConfigParser()
|
|
if not path.exists():
|
|
raise FileNotFoundError(f"Config file not found: {path}")
|
|
|
|
parser.read(path, encoding="utf-8")
|
|
if section not in parser:
|
|
raise KeyError(f"Section [{section}] not found in {path}")
|
|
|
|
cfg = parser[section]
|
|
return cls(
|
|
base_url=cfg.get("base_url", "").rstrip("/"),
|
|
username=cfg.get("username", ""),
|
|
password=cfg.get("password", ""),
|
|
product_id=cfg.getint("product_id", fallback=0),
|
|
per_page=cfg.getint("per_page", fallback=50),
|
|
max_pages=cfg.getint("max_pages", fallback=10),
|
|
verify_ssl=cfg.getboolean("verify_ssl", fallback=False),
|
|
timeout=cfg.getint("timeout", fallback=10),
|
|
auth_mode=cfg.get("auth_mode", "password").lower(),
|
|
api_token=cfg.get("api_token"),
|
|
auth_endpoint=cfg.get("auth_endpoint", "api.php/v1/tokens"),
|
|
token_header=cfg.get("token_header", "Token"),
|
|
password_hash=cfg.get("password_hash", "md5").lower(),
|
|
)
|
|
|
|
def validate(self) -> None:
|
|
missing = [
|
|
name
|
|
for name, value in (
|
|
("base_url", self.base_url),
|
|
("username", self.username),
|
|
("password", self.password),
|
|
)
|
|
if not value
|
|
]
|
|
if missing:
|
|
raise ValueError(f"Missing config values: {', '.join(missing)}")
|
|
if self.product_id <= 0:
|
|
raise ValueError("product_id must be > 0")
|
|
if self.auth_mode not in {"password", "token"}:
|
|
raise ValueError("auth_mode must be 'password' or 'token'")
|
|
if self.password_hash not in {"md5", "plain"}:
|
|
raise ValueError("password_hash must be 'md5' or 'plain'")
|
|
if self.auth_mode == "token" and not self.api_token:
|
|
raise ValueError("api_token required when auth_mode=token")
|
|
|
|
|
|
class ZenDaoClient:
|
|
def __init__(self, settings: ZenDaoSettings, session: Session | None = None) -> None:
|
|
self.settings = settings
|
|
self.session: Session = session or requests.Session()
|
|
self._token: str | None = None
|
|
|
|
def authenticate(self) -> str:
|
|
if self.settings.auth_mode == "token":
|
|
if not self.settings.api_token:
|
|
raise ValueError("api_token missing in config")
|
|
self.session.headers.update({self.settings.token_header: self.settings.api_token})
|
|
self._token = self.settings.api_token
|
|
return self._token
|
|
|
|
payload = {
|
|
"account": self.settings.username,
|
|
"password": self._format_password(self.settings.password),
|
|
}
|
|
response = self._request("post", self.settings.auth_endpoint, json=payload)
|
|
data = response.json()
|
|
token = data.get("token") or data.get("accessToken")
|
|
if not token:
|
|
raise ZenDaoAPIError(f"Unexpected token response: {data}")
|
|
|
|
self._token = token
|
|
self.session.headers.update({self.settings.token_header: token})
|
|
return token
|
|
|
|
def _format_password(self, password: str) -> str:
|
|
if self.settings.password_hash == "md5":
|
|
return hashlib.md5(password.encode()).hexdigest()
|
|
return password
|
|
|
|
def fetch_bugs_page(self, page: int = 1) -> List[Dict]:
|
|
params = {"page": page, "limit": self.settings.per_page}
|
|
endpoint = f"api.php/v1/products/{self.settings.product_id}/bugs"
|
|
response = self._request("get", endpoint, params=params)
|
|
payload = response.json()
|
|
bugs = payload.get("bugs") or payload.get("data")
|
|
if bugs is None:
|
|
raise ZenDaoAPIError(f"Unexpected bug payload: {payload}")
|
|
return list(bugs)
|
|
|
|
def fetch_all_bugs(self) -> List[Dict]:
|
|
all_bugs: List[Dict] = []
|
|
for page in range(1, self.settings.max_pages + 1):
|
|
page_bugs = self.fetch_bugs_page(page)
|
|
if not page_bugs:
|
|
break
|
|
all_bugs.extend(page_bugs)
|
|
if len(page_bugs) < self.settings.per_page:
|
|
break
|
|
return all_bugs
|
|
|
|
def _request(self, method: str, path: str, **kwargs) -> Response:
|
|
url = f"{self.settings.base_url}/{path.lstrip('/')}"
|
|
kwargs.setdefault("timeout", self.settings.timeout)
|
|
kwargs.setdefault("verify", self.settings.verify_ssl)
|
|
response = self.session.request(method=method, url=url, **kwargs)
|
|
response.raise_for_status()
|
|
return response
|
|
|
|
|
|
def normalize_bug(bug: Dict, fields: Sequence[str] = BUG_EXPORT_FIELDS) -> Dict[str, str]:
|
|
normalized = {}
|
|
for field in fields:
|
|
value = bug.get(field, "")
|
|
if isinstance(value, dict):
|
|
value = value.get("realname") or value.get("account") or value
|
|
normalized[field] = value if isinstance(value, str) else str(value)
|
|
return normalized
|
|
|
|
|
|
def write_json(path: Path, bugs: Iterable[Dict]) -> None:
|
|
path.write_text(json.dumps(list(bugs), ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
def write_csv(path: Path, bugs: Iterable[Dict], fields: Sequence[str]) -> None:
|
|
bugs = list(bugs)
|
|
if not bugs:
|
|
path.write_text("", encoding="utf-8")
|
|
return
|
|
|
|
with path.open("w", encoding="utf-8", newline="") as csv_file:
|
|
writer = csv.DictWriter(csv_file, fieldnames=fields)
|
|
writer.writeheader()
|
|
for bug in bugs:
|
|
writer.writerow(normalize_bug(bug, fields))
|
|
|
|
|
|
def run_crawler(config: Path, output: Path) -> Path:
|
|
settings = ZenDaoSettings.from_file(config)
|
|
settings.validate()
|
|
|
|
client = ZenDaoClient(settings)
|
|
client.authenticate()
|
|
bugs = client.fetch_all_bugs()
|
|
|
|
if output.suffix.lower() == ".json":
|
|
write_json(output, bugs)
|
|
else:
|
|
write_csv(output, bugs, BUG_EXPORT_FIELDS)
|
|
return output
|
|
|
|
|
|
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Fetch bug list from ZenDao.")
|
|
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH, help="Path to zendao_config.ini")
|
|
parser.add_argument(
|
|
"--output",
|
|
type=Path,
|
|
default=Path("bugs.csv"),
|
|
help="Output file path (.csv or .json)",
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: Sequence[str] | None = None) -> None:
|
|
args = parse_args(argv or sys.argv[1:])
|
|
output = run_crawler(args.config, args.output)
|
|
print(f"Bug list saved to: {output}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|