"""
ZenDao bug list crawler.

Design overview:
1. Read credentials + crawl settings from zendao_config.ini.
2. Use the ZenDao REST API (`api.php/v1/tokens` by default, configurable via
   `auth_endpoint`) to obtain a token, or reuse a pre-issued `api_token`.
3. Pull paginated bug records under the configured product via
   `/api.php/v1/products/{product_id}/bugs`.
4. Export bug data to CSV (normalized columns, default) or JSON (raw records),
   usable by other automation.
"""

from __future__ import annotations

import argparse
import configparser
import csv
import hashlib
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Sequence

import requests
from requests import Response, Session

# Columns written to the CSV export, in output order.
BUG_EXPORT_FIELDS: Sequence[str] = (
    "id",
    "title",
    "status",
    "severity",
    "pri",
    "openedBy",
    "openedDate",
    "assignedTo",
    "resolvedBy",
    "resolution",
    "lastEditedDate",
)

# By default the config file is expected next to this script.
DEFAULT_CONFIG_PATH = Path(__file__).with_name("zendao_config.ini")


class ZenDaoAPIError(RuntimeError):
    """Raised when ZenDao returns an unexpected payload."""


@dataclass
class ZenDaoSettings:
    """Connection and crawl settings loaded from an INI file section."""

    base_url: str
    username: str
    password: str
    product_id: int
    per_page: int = 50
    max_pages: int = 10
    # NOTE(review): TLS verification is off by default; set verify_ssl=true
    # in the config for servers with a valid certificate.
    verify_ssl: bool = False
    timeout: int = 10
    auth_mode: str = "password"  # password | token
    api_token: str | None = None
    auth_endpoint: str = "api.php/v1/tokens"
    token_header: str = "Token"
    password_hash: str = "md5"  # md5 | plain

    @classmethod
    def from_file(cls, path: Path, section: str = "zendao") -> "ZenDaoSettings":
        """Build settings from ``section`` of the INI file at ``path``.

        Missing options fall back to the same defaults as the dataclass
        fields; string options that are absent become empty strings so that
        :meth:`validate` can report them.

        Raises:
            FileNotFoundError: if ``path`` does not exist.
            KeyError: if ``section`` is not present in the file.
        """
        parser = configparser.ConfigParser()
        if not path.exists():
            raise FileNotFoundError(f"Config file not found: {path}")
        parser.read(path, encoding="utf-8")
        if section not in parser:
            raise KeyError(f"Section [{section}] not found in {path}")
        cfg = parser[section]
        return cls(
            base_url=cfg.get("base_url", "").rstrip("/"),
            username=cfg.get("username", ""),
            password=cfg.get("password", ""),
            product_id=cfg.getint("product_id", fallback=0),
            per_page=cfg.getint("per_page", fallback=50),
            max_pages=cfg.getint("max_pages", fallback=10),
            verify_ssl=cfg.getboolean("verify_ssl", fallback=False),
            timeout=cfg.getint("timeout", fallback=10),
            auth_mode=cfg.get("auth_mode", "password").lower(),
            api_token=cfg.get("api_token"),
            auth_endpoint=cfg.get("auth_endpoint", "api.php/v1/tokens"),
            token_header=cfg.get("token_header", "Token"),
            password_hash=cfg.get("password_hash", "md5").lower(),
        )

    def validate(self) -> None:
        """Check that the settings are complete and internally consistent.

        Raises:
            ValueError: on the first problem found (missing values, a
                non-positive ``product_id``/``per_page``/``max_pages``, an
                unknown ``auth_mode``/``password_hash``, or a missing
                ``api_token`` in token mode).
        """
        required = [("base_url", self.base_url)]
        # Username/password are only used to request a token; a pre-issued
        # api_token makes them unnecessary.
        if self.auth_mode != "token":
            required.append(("username", self.username))
            required.append(("password", self.password))
        missing = [name for name, value in required if not value]
        if missing:
            raise ValueError(f"Missing config values: {', '.join(missing)}")
        if self.product_id <= 0:
            raise ValueError("product_id must be > 0")
        if self.auth_mode not in {"password", "token"}:
            raise ValueError("auth_mode must be 'password' or 'token'")
        if self.password_hash not in {"md5", "plain"}:
            raise ValueError("password_hash must be 'md5' or 'plain'")
        if self.auth_mode == "token" and not self.api_token:
            raise ValueError("api_token required when auth_mode=token")
        # A non-positive page size or page budget would silently yield an
        # empty export, so reject it up front.
        if self.per_page <= 0:
            raise ValueError("per_page must be > 0")
        if self.max_pages <= 0:
            raise ValueError("max_pages must be > 0")


class ZenDaoClient:
    """Thin HTTP client for token authentication and bug listing."""

    def __init__(self, settings: ZenDaoSettings, session: Session | None = None) -> None:
        """Store ``settings`` and use ``session`` (or a new one) for requests."""
        self.settings = settings
        self.session: Session = session if session is not None else requests.Session()
        self._token: str | None = None

    def authenticate(self) -> str:
        """Obtain a token and install it as a default session header.

        In token mode the configured ``api_token`` is used directly; otherwise
        the account/password pair is posted to ``auth_endpoint``.

        Returns:
            The token now attached to the session.

        Raises:
            ValueError: if token mode is selected without an ``api_token``.
            ZenDaoAPIError: if the auth response carries no token.
            requests.HTTPError: if the server answers with an error status.
        """
        if self.settings.auth_mode == "token":
            if not self.settings.api_token:
                raise ValueError("api_token missing in config")
            self.session.headers.update({self.settings.token_header: self.settings.api_token})
            self._token = self.settings.api_token
            return self._token

        payload = {
            "account": self.settings.username,
            "password": self._format_password(self.settings.password),
        }
        response = self._request("post", self.settings.auth_endpoint, json=payload)
        data = response.json()
        # Guard against a JSON array/scalar body, which has no .get().
        if not isinstance(data, dict):
            raise ZenDaoAPIError(f"Unexpected token response: {data}")
        token = data.get("token") or data.get("accessToken")
        if not token:
            raise ZenDaoAPIError(f"Unexpected token response: {data}")
        self._token = token
        self.session.headers.update({self.settings.token_header: token})
        return token

    def _format_password(self, password: str) -> str:
        """Return the password in the wire format chosen by ``password_hash``."""
        if self.settings.password_hash == "md5":
            # NOTE(review): presumably the server expects an MD5 hex digest of
            # the password here - confirm against the deployment.
            return hashlib.md5(password.encode()).hexdigest()
        return password

    def fetch_bugs_page(self, page: int = 1) -> List[Dict]:
        """Fetch one page of bugs for the configured product.

        The records are read from the ``bugs`` key of the response, falling
        back to ``data``. An empty list is a valid (empty) page.

        Raises:
            ZenDaoAPIError: if the response is not a JSON object or contains
                neither key.
            requests.HTTPError: if the server answers with an error status.
        """
        params = {"page": page, "limit": self.settings.per_page}
        endpoint = f"api.php/v1/products/{self.settings.product_id}/bugs"
        response = self._request("get", endpoint, params=params)
        payload = response.json()
        if not isinstance(payload, dict):
            raise ZenDaoAPIError(f"Unexpected bug payload: {payload}")
        bugs = payload.get("bugs")
        if not bugs:
            # Prefer "data" when "bugs" is absent or empty, but keep an empty
            # "bugs" list as a legitimate empty page instead of an error.
            fallback = payload.get("data")
            if fallback is not None:
                bugs = fallback
        if bugs is None:
            raise ZenDaoAPIError(f"Unexpected bug payload: {payload}")
        return list(bugs)

    def fetch_all_bugs(self) -> List[Dict]:
        """Collect bugs page by page, up to ``max_pages`` pages.

        Stops early on an empty page or on a page shorter than ``per_page``,
        which is taken to be the last one.
        """
        all_bugs: List[Dict] = []
        for page in range(1, self.settings.max_pages + 1):
            page_bugs = self.fetch_bugs_page(page)
            if not page_bugs:
                break
            all_bugs.extend(page_bugs)
            if len(page_bugs) < self.settings.per_page:
                break
        return all_bugs

    def _request(self, method: str, path: str, **kwargs: Any) -> Response:
        """Send a request to ``base_url``/``path`` and raise on HTTP errors.

        ``timeout`` and ``verify`` default to the configured values unless the
        caller passes them explicitly.
        """
        url = f"{self.settings.base_url}/{path.lstrip('/')}"
        kwargs.setdefault("timeout", self.settings.timeout)
        kwargs.setdefault("verify", self.settings.verify_ssl)
        response = self.session.request(method=method, url=url, **kwargs)
        response.raise_for_status()
        return response


def normalize_bug(bug: Dict, fields: Sequence[str] = BUG_EXPORT_FIELDS) -> Dict[str, str]:
    """Project ``bug`` onto ``fields`` with every value rendered as a string.

    Missing or ``None`` values become ``""``. Dict values (e.g. user objects)
    are reduced to their ``realname`` or ``account`` entry when one is set.
    """
    normalized: Dict[str, str] = {}
    for field in fields:
        value = bug.get(field, "")
        if isinstance(value, dict):
            value = value.get("realname") or value.get("account") or value
        if value is None:
            # Avoid exporting the literal text "None" for null fields.
            value = ""
        normalized[field] = value if isinstance(value, str) else str(value)
    return normalized


def write_json(path: Path, bugs: Iterable[Dict]) -> None:
    """Write the raw bug records to ``path`` as indented UTF-8 JSON."""
    path.write_text(json.dumps(list(bugs), ensure_ascii=False, indent=2), encoding="utf-8")


def write_csv(path: Path, bugs: Iterable[Dict], fields: Sequence[str]) -> None:
    """Write normalized bug rows to ``path`` as UTF-8 CSV with a header row.

    When there are no bugs, an empty file (without header) is written.
    """
    rows = list(bugs)
    if not rows:
        path.write_text("", encoding="utf-8")
        return
    with path.open("w", encoding="utf-8", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fields)
        writer.writeheader()
        for bug in rows:
            writer.writerow(normalize_bug(bug, fields))


def run_crawler(config: Path, output: Path) -> Path:
    """Load settings, fetch all bugs and export them to ``output``.

    The format is chosen by the output suffix: ``.json`` writes JSON, anything
    else writes CSV.

    Returns:
        The ``output`` path that was written.
    """
    settings = ZenDaoSettings.from_file(config)
    settings.validate()
    # Close the HTTP session (and its pooled connections) when done.
    with requests.Session() as session:
        client = ZenDaoClient(settings, session=session)
        client.authenticate()
        bugs = client.fetch_all_bugs()
    if output.suffix.lower() == ".json":
        write_json(output, bugs)
    else:
        write_csv(output, bugs, BUG_EXPORT_FIELDS)
    return output


def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the command-line arguments ``--config`` and ``--output``."""
    parser = argparse.ArgumentParser(description="Fetch bug list from ZenDao.")
    parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH, help="Path to zendao_config.ini")
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("bugs.csv"),
        help="Output file path (.csv or .json)",
    )
    return parser.parse_args(argv)


def main(argv: Sequence[str] | None = None) -> None:
    """Command-line entry point; ``argv`` defaults to ``sys.argv[1:]``."""
    # Only fall back to the process arguments when none were passed at all,
    # so an explicit empty list means "use the defaults".
    args = parse_args(sys.argv[1:] if argv is None else argv)
    output = run_crawler(args.config, args.output)
    print(f"Bug list saved to: {output}")


if __name__ == "__main__":
    main()