From 6a8be2a9a760be43c41a3c4080a788e4bff30d26 Mon Sep 17 00:00:00 2001 From: qiaoxinjiu Date: Thu, 4 Dec 2025 17:27:14 +0800 Subject: [PATCH] first commit --- zendao_tools/__init__.py | 216 ++++++++++++++++++++++++++++++ zendao_tools/handle_bug.py | 216 ++++++++++++++++++++++++++++++ zendao_tools/models.py | 0 zendao_tools/zendao_api.py | 237 +++++++++++++++++++++++++++++++++ zendao_tools/zendao_config.ini | 14 ++ 5 files changed, 683 insertions(+) create mode 100644 zendao_tools/__init__.py create mode 100644 zendao_tools/handle_bug.py create mode 100644 zendao_tools/models.py create mode 100644 zendao_tools/zendao_api.py create mode 100644 zendao_tools/zendao_config.ini diff --git a/zendao_tools/__init__.py b/zendao_tools/__init__.py new file mode 100644 index 0000000..7ca1960 --- /dev/null +++ b/zendao_tools/__init__.py @@ -0,0 +1,216 @@ +import requests +import hashlib +from bs4 import BeautifulSoup +from datetime import datetime + +# 最简版本 - 获取所有产品的Bug +def get_all_bugs(): + """获取所有产品的Bug""" + base_url = "http://39.170.26.156:8888" + username = "qiaoxinjiu" + password = "Qiao123456" + + print("获取禅道Bug列表") + print("=" * 50) + + # 登录 + session = requests.Session() + password_md5 = hashlib.md5(password.encode()).hexdigest() + + login_resp = session.post(f"{base_url}/api.php/v1/tokens", + json={"account": username, "password": password_md5}, + headers={'Content-Type': 'application/json'}) + + if login_resp.status_code not in [200, 201]: + print("登录失败") + return + + token = login_resp.json().get('token') + session.headers.update({'Token': token}) + + print("✅ 登录成功") + + # 获取所有产品 + products_resp = session.get(f"{base_url}/api.php/v1/products") + if products_resp.status_code != 200: + print("获取产品失败") + return + + products = products_resp.json().get('products', []) + print(f"📦 共 {len(products)} 个产品") + + all_bugs = [] + + # 遍历每个产品获取Bug + for product in products: + product_id = product.get('id') + product_name = product.get('name') + if product_id != 2: + continue + print(f"\n获取产品 '{product_name}' 的Bug...") + # 你的禅道实例没有开 REST bug 列表接口,只能爬取 HTML 页面 + bugs_url = f"{base_url}/bug-browse-{product_id}.html" + bugs_resp = session.get(bugs_url, params={'product': product_id, 'limit': 50}) + + if bugs_resp.status_code == 200: + try: + html = bugs_resp.text + soup = BeautifulSoup(html, "html.parser") + + # 找 bug 列表的主 table(尽量精确一点) + table = ( + soup.find("table", id="bugList") + or soup.find("table", class_="table") + or soup.find("table") + ) + if not table: + print(" ❌ 未找到 bug 列表 table") + # print(html[:500]) # 调试时可以打开 + return + + # 解析表头,建立「列名 -> 索引」映射 + header_tr = table.find("tr") + if not header_tr: + print(" ❌ 未找到表头行") + return + + header_map = {} + for idx, th in enumerate(header_tr.find_all(["th", "td"])): + text = th.get_text(strip=True) + if not text: + continue + if "ID" == text or text == "编号": + header_map["id"] = idx + elif "标题" in text: + header_map["title"] = idx + elif "状态" in text: + header_map["status"] = idx + elif "严重" in text: + header_map["severity"] = idx + elif "优先" in text or "优先级" in text: + header_map["pri"] = idx + elif "指派" in text: + header_map["assignedTo"] = idx + elif "创建" in text or "打开" in text: + # 如:创建日期 / 打开时间 + header_map["openedDate"] = idx + + bugs = [] + + # 遍历表体行 + for tr in table.find_all("tr")[1:]: + tds = tr.find_all("td") + if not tds: + continue + + # id + bug_id = tr.get("data-id") + if not bug_id and "id" in header_map and header_map["id"] < len(tds): + bug_id = tds[header_map["id"]].get_text(strip=True) + if not bug_id: + continue + + # 标题 + title = "" + if "title" in header_map and header_map["title"] < len(tds): + cell = tds[header_map["title"]] + link = cell.find("a") + title = (link or cell).get_text(strip=True) + + # 状态 + status = "" + if "status" in header_map and header_map["status"] < len(tds): + status = tds[header_map["status"]].get_text(strip=True) + + # 严重程度(有的版本用颜色块 + title 提示) + severity = "" + if "severity" in header_map and header_map["severity"] < len(tds): + sev_cell = tds[header_map["severity"]] + severity = sev_cell.get_text(strip=True) + if not severity: + sev_span = sev_cell.find("span") + if sev_span and sev_span.get("title"): + severity = sev_span.get("title").strip() + + # 优先级 + pri = "" + if "pri" in header_map and header_map["pri"] < len(tds): + pri = tds[header_map["pri"]].get_text(strip=True) + + # 指派给 + assigned_to = "" + if "assignedTo" in header_map and header_map["assignedTo"] < len(tds): + assigned_to = tds[header_map["assignedTo"]].get_text(strip=True) + + # 创建/打开日期 + opened_date = "" + if "openedDate" in header_map and header_map["openedDate"] < len(tds): + opened_date = tds[header_map["openedDate"]].get_text(strip=True) + + bug = { + "id": int(bug_id) if str(bug_id).isdigit() else bug_id, + "title": title, + "statusName": status, + "severity": severity, + "pri": pri, + "assignedToName": assigned_to, + "openedDate": opened_date, + "product_name": product_name, + } + bugs.append(bug) + + print(f" ✅ 解析出 {len(bugs)} 个Bug") + all_bugs.extend(bugs) + + except Exception as e: + print(f" ❌ 解析失败: {e}") + # 调试用:打印部分 HTML 看实际结构 + # print(html[:800]) + else: + print(f" ❌ 获取失败: {bugs_resp.status_code} - {bugs_resp.text[:100]}") + + # 显示所有Bug + if all_bugs: + # 计算统计数据 + today_str = datetime.today().strftime("%Y-%m-%d") + today_md_str = datetime.today().strftime("%m-%d") + total_bugs = len(all_bugs) + today_bugs = [ + b for b in all_bugs + if ( + today_str in str(b.get("openedDate", "")).strip() + or today_md_str in str(b.get("openedDate", "")).strip() + ) + ] + # 认为“未关闭”的状态:不在已关闭/已解决/已取消集合中 + closed_keywords = {"已关闭", "已解决", "已完成", "已取消", "closed", "resolved", "done", "cancelled"} + open_bugs = [ + b for b in all_bugs + if not any(kw in str(b.get("statusName", "")).lower() for kw in ["closed", "resolved", "done", "cancel"]) + and not any(kw in str(b.get("statusName", "")) for kw in ["已关闭", "已解决", "已完成", "已取消"]) + ] + + print(f"\n{'=' * 80}") + print(f"📊 Bug统计") + print(f"{'-' * 80}") + print(f" 今日新增 Bug 数: {len(today_bugs)}") + print(f" 未关闭 Bug 数: {len(open_bugs)}") + print(f" 总 Bug 数: {total_bugs}") + print(f"{'=' * 80}") + + # 按ID排序 + all_bugs.sort(key=lambda x: x.get('id', 0), reverse=True) + + for i, bug in enumerate(all_bugs[:30], 1): # 只显示前30个 + print( + f"{i}. [#{bug.get('id')}] {bug.get('title', '无标题')[:40]}{'...' if len(bug.get('title', '')) > 40 else ''}") + print(f" 产品: {bug.get('product_name')} | 状态: {bug.get('statusName', '未知')} | " + f"严重: {bug.get('severity', '未知')} | 指派: {bug.get('assignedToName', '未指派')} | " + f"创建时间: {bug.get('openedDate', '')}") + print(f"http://39.170.26.156:8888/bug-view-{bug.get('id')}.html") + + +if __name__ == "__main__": + + # 获取所有Bug版本 + get_all_bugs() \ No newline at end of file diff --git a/zendao_tools/handle_bug.py b/zendao_tools/handle_bug.py new file mode 100644 index 0000000..7ca1960 --- /dev/null +++ b/zendao_tools/handle_bug.py @@ -0,0 +1,216 @@ +import requests +import hashlib +from bs4 import BeautifulSoup +from datetime import datetime + +# 最简版本 - 获取所有产品的Bug +def get_all_bugs(): + """获取所有产品的Bug""" + base_url = "http://39.170.26.156:8888" + username = "qiaoxinjiu" + password = "Qiao123456" + + print("获取禅道Bug列表") + print("=" * 50) + + # 登录 + session = requests.Session() + password_md5 = hashlib.md5(password.encode()).hexdigest() + + login_resp = session.post(f"{base_url}/api.php/v1/tokens", + json={"account": username, "password": password_md5}, + headers={'Content-Type': 'application/json'}) + + if login_resp.status_code not in [200, 201]: + print("登录失败") + return + + token = login_resp.json().get('token') + session.headers.update({'Token': token}) + + print("✅ 登录成功") + + # 获取所有产品 + products_resp = session.get(f"{base_url}/api.php/v1/products") + if products_resp.status_code != 200: + print("获取产品失败") + return + + products = products_resp.json().get('products', []) + print(f"📦 共 {len(products)} 个产品") + + all_bugs = [] + + # 遍历每个产品获取Bug + for product in products: + product_id = product.get('id') + product_name = product.get('name') + if product_id != 2: + continue + print(f"\n获取产品 '{product_name}' 的Bug...") + # 你的禅道实例没有开 REST bug 列表接口,只能爬取 HTML 页面 + bugs_url = f"{base_url}/bug-browse-{product_id}.html" + bugs_resp = session.get(bugs_url, params={'product': product_id, 'limit': 50}) + + if bugs_resp.status_code == 200: + try: + html = bugs_resp.text + soup = BeautifulSoup(html, "html.parser") + + # 找 bug 列表的主 table(尽量精确一点) + table = ( + soup.find("table", id="bugList") + or soup.find("table", class_="table") + or soup.find("table") + ) + if not table: + print(" ❌ 未找到 bug 列表 table") + # print(html[:500]) # 调试时可以打开 + return + + # 解析表头,建立「列名 -> 索引」映射 + header_tr = table.find("tr") + if not header_tr: + print(" ❌ 未找到表头行") + return + + header_map = {} + for idx, th in enumerate(header_tr.find_all(["th", "td"])): + text = th.get_text(strip=True) + if not text: + continue + if "ID" == text or text == "编号": + header_map["id"] = idx + elif "标题" in text: + header_map["title"] = idx + elif "状态" in text: + header_map["status"] = idx + elif "严重" in text: + header_map["severity"] = idx + elif "优先" in text or "优先级" in text: + header_map["pri"] = idx + elif "指派" in text: + header_map["assignedTo"] = idx + elif "创建" in text or "打开" in text: + # 如:创建日期 / 打开时间 + header_map["openedDate"] = idx + + bugs = [] + + # 遍历表体行 + for tr in table.find_all("tr")[1:]: + tds = tr.find_all("td") + if not tds: + continue + + # id + bug_id = tr.get("data-id") + if not bug_id and "id" in header_map and header_map["id"] < len(tds): + bug_id = tds[header_map["id"]].get_text(strip=True) + if not bug_id: + continue + + # 标题 + title = "" + if "title" in header_map and header_map["title"] < len(tds): + cell = tds[header_map["title"]] + link = cell.find("a") + title = (link or cell).get_text(strip=True) + + # 状态 + status = "" + if "status" in header_map and header_map["status"] < len(tds): + status = tds[header_map["status"]].get_text(strip=True) + + # 严重程度(有的版本用颜色块 + title 提示) + severity = "" + if "severity" in header_map and header_map["severity"] < len(tds): + sev_cell = tds[header_map["severity"]] + severity = sev_cell.get_text(strip=True) + if not severity: + sev_span = sev_cell.find("span") + if sev_span and sev_span.get("title"): + severity = sev_span.get("title").strip() + + # 优先级 + pri = "" + if "pri" in header_map and header_map["pri"] < len(tds): + pri = tds[header_map["pri"]].get_text(strip=True) + + # 指派给 + assigned_to = "" + if "assignedTo" in header_map and header_map["assignedTo"] < len(tds): + assigned_to = tds[header_map["assignedTo"]].get_text(strip=True) + + # 创建/打开日期 + opened_date = "" + if "openedDate" in header_map and header_map["openedDate"] < len(tds): + opened_date = tds[header_map["openedDate"]].get_text(strip=True) + + bug = { + "id": int(bug_id) if str(bug_id).isdigit() else bug_id, + "title": title, + "statusName": status, + "severity": severity, + "pri": pri, + "assignedToName": assigned_to, + "openedDate": opened_date, + "product_name": product_name, + } + bugs.append(bug) + + print(f" ✅ 解析出 {len(bugs)} 个Bug") + all_bugs.extend(bugs) + + except Exception as e: + print(f" ❌ 解析失败: {e}") + # 调试用:打印部分 HTML 看实际结构 + # print(html[:800]) + else: + print(f" ❌ 获取失败: {bugs_resp.status_code} - {bugs_resp.text[:100]}") + + # 显示所有Bug + if all_bugs: + # 计算统计数据 + today_str = datetime.today().strftime("%Y-%m-%d") + today_md_str = datetime.today().strftime("%m-%d") + total_bugs = len(all_bugs) + today_bugs = [ + b for b in all_bugs + if ( + today_str in str(b.get("openedDate", "")).strip() + or today_md_str in str(b.get("openedDate", "")).strip() + ) + ] + # 认为“未关闭”的状态:不在已关闭/已解决/已取消集合中 + closed_keywords = {"已关闭", "已解决", "已完成", "已取消", "closed", "resolved", "done", "cancelled"} + open_bugs = [ + b for b in all_bugs + if not any(kw in str(b.get("statusName", "")).lower() for kw in ["closed", "resolved", "done", "cancel"]) + and not any(kw in str(b.get("statusName", "")) for kw in ["已关闭", "已解决", "已完成", "已取消"]) + ] + + print(f"\n{'=' * 80}") + print(f"📊 Bug统计") + print(f"{'-' * 80}") + print(f" 今日新增 Bug 数: {len(today_bugs)}") + print(f" 未关闭 Bug 数: {len(open_bugs)}") + print(f" 总 Bug 数: {total_bugs}") + print(f"{'=' * 80}") + + # 按ID排序 + all_bugs.sort(key=lambda x: x.get('id', 0), reverse=True) + + for i, bug in enumerate(all_bugs[:30], 1): # 只显示前30个 + print( + f"{i}. [#{bug.get('id')}] {bug.get('title', '无标题')[:40]}{'...' if len(bug.get('title', '')) > 40 else ''}") + print(f" 产品: {bug.get('product_name')} | 状态: {bug.get('statusName', '未知')} | " + f"严重: {bug.get('severity', '未知')} | 指派: {bug.get('assignedToName', '未指派')} | " + f"创建时间: {bug.get('openedDate', '')}") + print(f"http://39.170.26.156:8888/bug-view-{bug.get('id')}.html") + + +if __name__ == "__main__": + + # 获取所有Bug版本 + get_all_bugs() \ No newline at end of file diff --git a/zendao_tools/models.py b/zendao_tools/models.py new file mode 100644 index 0000000..e69de29 diff --git a/zendao_tools/zendao_api.py b/zendao_tools/zendao_api.py new file mode 100644 index 0000000..ece5daa --- /dev/null +++ b/zendao_tools/zendao_api.py @@ -0,0 +1,237 @@ +from __future__ import annotations + +""" +ZenDao bug list crawler. + +Design overview: +1. Read credentials + crawl settings from zendao_config.ini. +2. Use ZenDao REST API (`api.php/v1/accessTokens`) to obtain a short-lived token. +3. Pull paginated bug records under the configured product via `/api.php/v1/products/{product_id}/bugs`. +4. Export normalized bug data to CSV (default) or JSON, usable by other automation. +""" + +import argparse +import configparser +import csv +import hashlib +import json +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterable, List, Sequence + +import requests +from requests import Response, Session + + +BUG_EXPORT_FIELDS: Sequence[str] = ( + "id", + "title", + "status", + "severity", + "pri", + "openedBy", + "openedDate", + "assignedTo", + "resolvedBy", + "resolution", + "lastEditedDate", +) +DEFAULT_CONFIG_PATH = Path(__file__).with_name("zendao_config.ini") + + +class ZenDaoAPIError(RuntimeError): + """Raised when ZenDao returns an unexpected payload.""" + + +@dataclass +class ZenDaoSettings: + base_url: str + username: str + password: str + product_id: int + per_page: int = 50 + max_pages: int = 10 + verify_ssl: bool = False + timeout: int = 10 + auth_mode: str = "password" # password | token + api_token: str | None = None + auth_endpoint: str = "api.php/v1/tokens" + token_header: str = "Token" + password_hash: str = "md5" # md5 | plain + + @classmethod + def from_file(cls, path: Path, section: str = "zendao") -> "ZenDaoSettings": + parser = configparser.ConfigParser() + if not path.exists(): + raise FileNotFoundError(f"Config file not found: {path}") + + parser.read(path, encoding="utf-8") + if section not in parser: + raise KeyError(f"Section [{section}] not found in {path}") + + cfg = parser[section] + return cls( + base_url=cfg.get("base_url", "").rstrip("/"), + username=cfg.get("username", ""), + password=cfg.get("password", ""), + product_id=cfg.getint("product_id", fallback=0), + per_page=cfg.getint("per_page", fallback=50), + max_pages=cfg.getint("max_pages", fallback=10), + verify_ssl=cfg.getboolean("verify_ssl", fallback=False), + timeout=cfg.getint("timeout", fallback=10), + auth_mode=cfg.get("auth_mode", "password").lower(), + api_token=cfg.get("api_token"), + auth_endpoint=cfg.get("auth_endpoint", "api.php/v1/tokens"), + token_header=cfg.get("token_header", "Token"), + password_hash=cfg.get("password_hash", "md5").lower(), + ) + + def validate(self) -> None: + missing = [ + name + for name, value in ( + ("base_url", self.base_url), + ("username", self.username), + ("password", self.password), + ) + if not value + ] + if missing: + raise ValueError(f"Missing config values: {', '.join(missing)}") + if self.product_id <= 0: + raise ValueError("product_id must be > 0") + if self.auth_mode not in {"password", "token"}: + raise ValueError("auth_mode must be 'password' or 'token'") + if self.password_hash not in {"md5", "plain"}: + raise ValueError("password_hash must be 'md5' or 'plain'") + if self.auth_mode == "token" and not self.api_token: + raise ValueError("api_token required when auth_mode=token") + + +class ZenDaoClient: + def __init__(self, settings: ZenDaoSettings, session: Session | None = None) -> None: + self.settings = settings + self.session: Session = session or requests.Session() + self._token: str | None = None + + def authenticate(self) -> str: + if self.settings.auth_mode == "token": + if not self.settings.api_token: + raise ValueError("api_token missing in config") + self.session.headers.update({self.settings.token_header: self.settings.api_token}) + self._token = self.settings.api_token + return self._token + + payload = { + "account": self.settings.username, + "password": self._format_password(self.settings.password), + } + response = self._request("post", self.settings.auth_endpoint, json=payload) + data = response.json() + token = data.get("token") or data.get("accessToken") + if not token: + raise ZenDaoAPIError(f"Unexpected token response: {data}") + + self._token = token + self.session.headers.update({self.settings.token_header: token}) + return token + + def _format_password(self, password: str) -> str: + if self.settings.password_hash == "md5": + return hashlib.md5(password.encode()).hexdigest() + return password + + def fetch_bugs_page(self, page: int = 1) -> List[Dict]: + params = {"page": page, "limit": self.settings.per_page} + endpoint = f"api.php/v1/products/{self.settings.product_id}/bugs" + response = self._request("get", endpoint, params=params) + payload = response.json() + bugs = payload.get("bugs") or payload.get("data") + if bugs is None: + raise ZenDaoAPIError(f"Unexpected bug payload: {payload}") + return list(bugs) + + def fetch_all_bugs(self) -> List[Dict]: + all_bugs: List[Dict] = [] + for page in range(1, self.settings.max_pages + 1): + page_bugs = self.fetch_bugs_page(page) + if not page_bugs: + break + all_bugs.extend(page_bugs) + if len(page_bugs) < self.settings.per_page: + break + return all_bugs + + def _request(self, method: str, path: str, **kwargs) -> Response: + url = f"{self.settings.base_url}/{path.lstrip('/')}" + kwargs.setdefault("timeout", self.settings.timeout) + kwargs.setdefault("verify", self.settings.verify_ssl) + response = self.session.request(method=method, url=url, **kwargs) + response.raise_for_status() + return response + + +def normalize_bug(bug: Dict, fields: Sequence[str] = BUG_EXPORT_FIELDS) -> Dict[str, str]: + normalized = {} + for field in fields: + value = bug.get(field, "") + if isinstance(value, dict): + value = value.get("realname") or value.get("account") or value + normalized[field] = value if isinstance(value, str) else str(value) + return normalized + + +def write_json(path: Path, bugs: Iterable[Dict]) -> None: + path.write_text(json.dumps(list(bugs), ensure_ascii=False, indent=2), encoding="utf-8") + + +def write_csv(path: Path, bugs: Iterable[Dict], fields: Sequence[str]) -> None: + bugs = list(bugs) + if not bugs: + path.write_text("", encoding="utf-8") + return + + with path.open("w", encoding="utf-8", newline="") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=fields) + writer.writeheader() + for bug in bugs: + writer.writerow(normalize_bug(bug, fields)) + + +def run_crawler(config: Path, output: Path) -> Path: + settings = ZenDaoSettings.from_file(config) + settings.validate() + + client = ZenDaoClient(settings) + client.authenticate() + bugs = client.fetch_all_bugs() + + if output.suffix.lower() == ".json": + write_json(output, bugs) + else: + write_csv(output, bugs, BUG_EXPORT_FIELDS) + return output + + +def parse_args(argv: Sequence[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Fetch bug list from ZenDao.") + parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH, help="Path to zendao_config.ini") + parser.add_argument( + "--output", + type=Path, + default=Path("bugs.csv"), + help="Output file path (.csv or .json)", + ) + return parser.parse_args(argv) + + +def main(argv: Sequence[str] | None = None) -> None: + args = parse_args(argv or sys.argv[1:]) + output = run_crawler(args.config, args.output) + print(f"Bug list saved to: {output}") + + +if __name__ == "__main__": + main() + diff --git a/zendao_tools/zendao_config.ini b/zendao_tools/zendao_config.ini new file mode 100644 index 0000000..a7ea353 --- /dev/null +++ b/zendao_tools/zendao_config.ini @@ -0,0 +1,14 @@ +[zendao] +base_url = http://39.170.26.156:8888 +username = qiaoxinjiu +password = Qiao123456 +product_id = 1 +per_page = 50 +max_pages = 10 +verify_ssl = false +timeout = 10 +auth_mode = password +api_token = +auth_endpoint = api.php/v1/tokens +token_header = Token +password_hash = md5