Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

46 lines
1.2 KiB
Python

import requests
def fetch_kibana_logs(query, time_range, index_pattern, timeout=30):
    """POST a log-search payload to Kibana and return the decoded JSON reply.

    Args:
        query: Search keyword, sent as the ``query`` field of the payload.
        time_range: Time-window expression, sent as ``timeRange``.
        index_pattern: Index pattern, sent as ``index``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body on success, or ``None`` when the request
        fails, the server answers with an HTTP error status, or the body
        is not valid JSON. An error message is printed in the ``None`` case.
    """
    import os  # local import: only needed for the credential override below

    # Replace with the actual Kibana URL.
    kibana_url = "https://logstashlog-kibana.qc.huohua.cn/login"
    # NOTE(review): an earlier revision also built f"{kibana_url}/app/kibana"
    # as a "search endpoint" but never used it; the request has always been
    # sent to kibana_url itself. Confirm which endpoint is actually intended.
    headers = {
        "Content-Type": "application/json",
    }
    # NOTE(security): credentials should not live in source control. They can
    # be supplied through the environment; the literals remain only as a
    # backward-compatible fallback and should be rotated and removed.
    user = os.environ.get("KIBANA_USER", "elastic")
    pwd = os.environ.get("KIBANA_PASSWORD", "s3dr40O,&j")
    payload = {
        "query": query,
        "timeRange": time_range,
        "index": index_pattern,
    }
    try:
        response = requests.post(
            kibana_url,
            headers=headers,
            auth=(user, pwd),
            json=payload,
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching Kibana logs: {str(e)}")
        return None
    except ValueError as e:
        # Depending on the installed requests version, a non-JSON body may
        # surface as a plain ValueError rather than a RequestException.
        print(f"Error fetching Kibana logs: {str(e)}")
        return None
query = "error"  # search keyword
time_range = "now-1d/d"  # logs from the past day
index_pattern = "my-logs-*"  # matches indices whose names start with "my-logs-"
logs = fetch_kibana_logs(query, time_range, index_pattern)
# fetch_kibana_logs signals failure with None. Compare against None explicitly
# so that a successful but empty result is not reported as a failure.
if logs is not None:
    # Print each entry of the decoded response.
    for log in logs:
        print(log)
else:
    # The fetch failed; the function has already printed the error details.
    print("Failed to fetch Kibana logs.")