Files
effekt-interface/app/api/controller/projectHookController.py
2026-05-07 19:21:19 +08:00

226 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# encoding: UTF-8
import time
import hmac
import hashlib
import base64
import requests
from .baseCrudController import BaseCrudController
from ..model.projectHookModel import ProjectHook
from ..service.projectHookService import ProjectHookService
class ProjectHookController(BaseCrudController):
    """Handlers for managing project webhook configs and sending messages.

    Three hook types are supported, identified by the integer ``hook_type``
    column: 1 = Feishu, 2 = DingTalk, 3 = WeCom.

    Every handler reads its input from ``self.req_data`` through
    ``self._get`` and persists through ``self.session``; both come from
    outside this class (presumably ``BaseCrudController`` -- confirm there).
    ``self._get`` is called with several alternative key names, presumably
    returning the first one present in the request.
    """

    # Display names for the numeric ``hook_type`` values.
    _HOOK_TYPE_NAMES = {1: '飞书', 2: '钉钉', 3: '企微'}

    # Columns that must be stored as integers on create and update.
    _INT_COLUMNS = ('hook_type', 'enabled')

    # Seconds to wait for a webhook endpoint to answer.
    _REQUEST_TIMEOUT = 10

    def _hook_to_dict(self, item):
        """Return ``item.to_dict()`` extended with a ``hook_type_name`` entry.

        Unknown hook types get an empty display name.
        """
        data = item.to_dict()
        data['hook_type_name'] = self._HOOK_TYPE_NAMES.get(item.hook_type, '')
        return data

    def hook_list(self):
        """List hook configs, optionally filtered by project and hook type.

        Request keys: ``projectId``/``project_id``, ``hookType``/``hook_type``,
        ``pageNo``/``page`` (default 1), ``pageSize``/``size`` (default 20).

        Returns:
            dict: ``{'list': [...], 'total': total}`` where each list entry is
            the hook's dict form plus ``hook_type_name``.
        """
        filters = []
        project_id = self._get(self.req_data, 'projectId', 'project_id')
        hook_type = self._get(self.req_data, 'hookType', 'hook_type')
        if project_id:
            filters.append(ProjectHook.project_id == int(project_id))
        # Only None and '' disable this filter, so a hook_type of 0 is still
        # applied (unlike project_id, which is skipped when falsy).
        if hook_type not in (None, ''):
            filters.append(ProjectHook.hook_type == int(hook_type))

        items, total = ProjectHookService.list_by_filters(
            self.session, ProjectHook, filters,
            self._get(self.req_data, 'pageNo', 'page', default=1),
            self._get(self.req_data, 'pageSize', 'size', default=20),
            ProjectHook.created_time,  # presumably the ordering column
        )
        return {
            'list': [self._hook_to_dict(item) for item in items],
            'total': total,
        }

    def hook_detail(self):
        """Fetch a single hook config by ``hookId``/``id``.

        Returns:
            tuple: ``(hook_dict, '')`` on success, or ``({}, message)`` when
            the id is missing or no record is found.
        """
        hook_id = self._get(self.req_data, 'hookId', 'id')
        if not hook_id:
            return {}, 'hookId 为必传参数'
        item = ProjectHookService.get_by_id(self.session, ProjectHook, hook_id)
        if not item:
            return {}, '未查询到对应Hook配置'
        return self._hook_to_dict(item), ''

    def hook_create(self):
        """Create a hook config for a project.

        Required request keys: ``projectId``, ``hookType``, ``webhookUrl``
        (snake_case aliases are accepted). Optional: ``secret``, ``enabled``
        (default 1), ``description``, ``config`` (default ``{}``).

        Returns:
            ``(0, message)`` when validation fails, otherwise whatever
            ``ProjectHookService.create`` returns.
        """
        project_id = self._get(self.req_data, 'projectId', 'project_id')
        hook_type = self._get(self.req_data, 'hookType', 'hook_type')
        webhook_url = self._get(self.req_data, 'webhookUrl', 'webhook_url')
        if not project_id:
            return 0, 'projectId 为必传参数'
        if not hook_type:
            return 0, 'hookType 为必传参数'
        if not webhook_url:
            return 0, 'webhookUrl 为必传参数'

        # Report non-numeric input as a validation error instead of letting
        # int() raise out of the handler.
        try:
            hook_type = int(hook_type)
        except (TypeError, ValueError):
            return 0, 'hookType 必须为整数'
        try:
            enabled = int(self._get(self.req_data, 'enabled', default=1))
        except (TypeError, ValueError):
            return 0, 'enabled 必须为整数'

        add_info = {
            'project_id': project_id,
            'hook_type': hook_type,
            'webhook_url': webhook_url,
            'secret': self._get(self.req_data, 'secret'),
            'enabled': enabled,
            'description': self._get(self.req_data, 'description'),
            'config': self._get(self.req_data, 'config', default={}),
            'is_delete': 0,
        }
        return ProjectHookService.create(self.session, ProjectHook, add_info)

    def hook_update(self):
        """Update the supplied fields of an existing hook config.

        Only fields whose request value is not ``None`` are written, so a
        field can be set to an empty string but cannot be reset to ``None``.
        ``hook_type`` and ``enabled`` are coerced to ``int`` so that updates
        store the same types as ``hook_create``.

        Returns:
            ``(0, message)`` when validation fails, otherwise whatever
            ``ProjectHookService.update_by_id`` returns.
        """
        hook_id = self._get(self.req_data, 'hookId', 'id')
        if not hook_id:
            return 0, 'hookId 为必传参数'

        # (accepted request keys, target column)
        field_mapping = [
            (('hookType', 'hook_type'), 'hook_type'),
            (('webhookUrl', 'webhook_url'), 'webhook_url'),
            (('secret',), 'secret'),
            (('enabled',), 'enabled'),
            (('description',), 'description'),
            (('config',), 'config'),
        ]
        update_info = {}
        for req_keys, column_key in field_mapping:
            value = self._get(self.req_data, *req_keys)
            if value is None:
                continue
            if column_key in self._INT_COLUMNS:
                try:
                    value = int(value)
                except (TypeError, ValueError):
                    return 0, f'{req_keys[0]} 必须为整数'
            update_info[column_key] = value
        return ProjectHookService.update_by_id(self.session, ProjectHook, hook_id, update_info)

    def hook_delete(self):
        """Delete a hook config by ``hookId``/``id``.

        Returns:
            ``(0, message)`` when the id is missing, otherwise whatever
            ``ProjectHookService.delete_by_id`` returns.
        """
        hook_id = self._get(self.req_data, 'hookId', 'id')
        if not hook_id:
            return 0, 'hookId 为必传参数'
        return ProjectHookService.delete_by_id(self.session, ProjectHook, hook_id)

    def hook_send(self):
        """Send a text message through one hook or all hooks of a project.

        Required request keys: ``projectId``, ``title``, ``content``.
        Optional: ``hookId``/``id`` (send through exactly this hook),
        ``hookType`` (passed to the per-project lookup), and
        ``real_name``/``realName`` (prefixed to the content as ``@name ``).

        Returns:
            tuple: ``(0, message)`` on validation or lookup failure, otherwise
            ``(status, results)`` where ``status`` is 1 only if every hook
            succeeded and ``results`` holds one dict per hook with the keys
            ``hook_id``, ``hook_type``, ``success`` and ``error``.
        """
        project_id = self._get(self.req_data, 'projectId', 'project_id')
        title = self._get(self.req_data, 'title')
        content = self._get(self.req_data, 'content')
        hook_type = self._get(self.req_data, 'hookType', 'hook_type')
        hook_id = self._get(self.req_data, 'hookId', 'id')
        real_name = self._get(self.req_data, 'real_name', 'realName')
        if not project_id:
            return 0, 'projectId 为必传参数'
        if not title:
            return 0, 'title 为必传参数'
        if not content:
            return 0, 'content 为必传参数'

        at_prefix = f'@{real_name} ' if real_name else ''
        final_content = f'{at_prefix}{content}'

        if hook_id:
            # NOTE(review): the hook is looked up by id only; nothing checks
            # that it belongs to ``project_id``. Confirm whether sending
            # through another project's hook should be rejected here.
            hook = ProjectHookService.get_by_id(self.session, ProjectHook, hook_id)
            if not hook or hook.is_delete == 1 or hook.enabled != 1:
                return 0, '未找到对应的Hook或Hook未启用'
            hooks = [hook]
        else:
            hooks = ProjectHookService.get_hooks_by_project(self.session, project_id, hook_type)
            if not hooks:
                return 0, '未配置对应的Hook'

        senders = {
            1: self._send_feishu_message,
            2: self._send_dingtalk_message,
            3: self._send_wecom_message,
        }
        results = []
        for hook in hooks:
            sender = senders.get(hook.hook_type)
            if sender is None:
                success, err_msg = False, '未知Hook类型'
            else:
                success, err_msg = sender(hook.webhook_url, hook.secret, title, final_content)
            results.append({
                'hook_id': hook.id,
                'hook_type': hook.hook_type,
                'success': success,
                'error': err_msg,
            })

        all_success = all(r['success'] for r in results)
        return 1 if all_success else 0, results

    def _post_webhook(self, url, payload, code_key, msg_key, params=None):
        """POST ``payload`` as JSON and interpret the platform's reply.

        Args:
            url: Webhook endpoint.
            payload: JSON-serialisable request body.
            code_key: Response field that equals 0 on success.
            msg_key: Response field holding the error text on failure.
            params: Optional extra query parameters; ``requests`` URL-encodes
                them and appends them to any query already present in ``url``.

        Returns:
            tuple: ``(True, '')`` on success, ``(False, error_text)`` otherwise.
        """
        # Deliberately broad: sending is best-effort, and a network error or
        # malformed reply from one hook must not abort the caller's loop over
        # the remaining hooks. The error text is handed back instead.
        try:
            response = requests.post(
                url, json=payload, params=params, timeout=self._REQUEST_TIMEOUT
            )
            result = response.json()
            if result.get(code_key) == 0:
                return True, ''
            return False, result.get(msg_key, '发送失败')
        except Exception as e:
            return False, str(e)

    def _send_feishu_message(self, webhook_url, secret, title, content):
        """Send a text message to a Feishu custom bot.

        When ``secret`` is set the request is signed the way Feishu expects:
        the HMAC-SHA256 *key* is ``"{timestamp}\\n{secret}"``, the message is
        empty, and the Base64 digest travels in the JSON body together with
        the timestamp (in seconds) -- not in the query string.

        Returns:
            tuple: ``(success, error_text)``.
        """
        payload = {
            'msg_type': 'text',
            'content': {
                'text': f'{title}\n\n{content}',
            },
        }
        if secret:
            timestamp = str(int(time.time()))
            string_to_sign = f'{timestamp}\n{secret}'
            digest = hmac.new(
                string_to_sign.encode('utf-8'), digestmod=hashlib.sha256
            ).digest()
            payload['timestamp'] = timestamp
            payload['sign'] = base64.b64encode(digest).decode('utf-8')
        return self._post_webhook(webhook_url, payload, 'code', 'msg')

    def _send_dingtalk_message(self, webhook_url, secret, title, content):
        """Send a text message to a DingTalk custom robot.

        When ``secret`` is set, ``"{timestamp}\\n{secret}"`` (timestamp in
        milliseconds) is signed with HMAC-SHA256 keyed by the secret. The
        Base64 signature is passed via ``params`` so it is URL-encoded;
        a raw ``+``, ``/`` or ``=`` in the query string breaks verification.

        Returns:
            tuple: ``(success, error_text)``.
        """
        params = None
        if secret:
            timestamp = str(int(time.time() * 1000))
            string_to_sign = f'{timestamp}\n{secret}'
            digest = hmac.new(
                secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256
            ).digest()
            params = {
                'timestamp': timestamp,
                'sign': base64.b64encode(digest).decode('utf-8'),
            }
        payload = {
            'msgtype': 'text',
            'text': {
                'content': f'{title}\n\n{content}',
            },
        }
        return self._post_webhook(webhook_url, payload, 'errcode', 'errmsg', params=params)

    def _send_wecom_message(self, webhook_url, secret, title, content):
        """Send a text message to a WeCom group robot.

        ``secret`` is accepted so all senders share one signature, but it is
        not used: the request is sent unsigned.

        Returns:
            tuple: ``(success, error_text)``.
        """
        payload = {
            'msgtype': 'text',
            'text': {
                'content': f'{title}\n\n{content}',
            },
        }
        return self._post_webhook(webhook_url, payload, 'errcode', 'errmsg')