Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

248 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
from redis import ConnectionPool, StrictRedis
import time
from base_framework.public_tools.read_config import ReadConfig
from base_framework.base_config.current_pth import env_choose_path
class RedisApi:
    """
    | Purpose: | Keyword-style helpers for a redis server: connect, poll for keys, read, set, delete |
    | Author:  | qiaoxinjiu |
    | Updated: | 2021-09-03 |

    Every ``kw_*`` helper except ``kw_conn_redis_service`` opens its own
    connection pool and disconnects it before returning.

    The target server is resolved on every call (see ``_resolve_endpoint``);
    built-in defaults are never written back to ``self.host``/``self.pwd``, so
    ``is_as`` is honoured on each call. Assigning ``self.host`` (and
    ``self.pwd``) on the instance is treated as an explicit, instance-wide
    override of the built-in defaults.
    """

    # NOTE(review): these credentials are hard-coded in source control; they
    # should be moved to configuration or a secret store.
    # (host, password) used when is_as is true.
    _AS_ENDPOINT = ('redis-qa2.redis.rds.aliyuncs.com', 'AcUVeRb8lN')
    # (host, password) per ``current_business`` value, used when is_as is false.
    _BUSINESS_ENDPOINTS = {
        'hh': ('redis.qa.huohua.cn', 'AcUVeRb8lN'),
        'hhi': ('redis.qa.visparklearning.com', 'hxTjlWBYdK6UpAGF'),
    }

    def __init__(self):
        self.redis_con = None  # StrictRedis client, set by kw_conn_redis_service
        self.pool = None  # ConnectionPool backing redis_con
        self.host = None  # optional instance-wide host override
        self.pwd = ""  # password used together with the host override
        self.port = 6379
        self.evn_cfg = ReadConfig(env_choose_path)
        self.current_business = self.evn_cfg.get_value(sections="run_evn_name", options="current_business")

    def _resolve_endpoint(self, is_as, host=None, pwd=None):
        """Return the ``(host, password)`` pair to connect to, without mutating the instance.

        Precedence: per-call ``host`` and ``pwd`` (both must be given), then an
        explicitly assigned ``self.host``/``self.pwd``, then the built-in
        default selected by ``is_as`` and ``self.current_business``.

        Raises RuntimeError when ``is_as`` is false and ``current_business``
        has no built-in default, instead of connecting with ``host=None``.
        """
        if host and pwd:
            return host, pwd
        if self.host:
            return self.host, self.pwd
        if is_as:
            return self._AS_ENDPOINT
        try:
            return self._BUSINESS_ENDPOINTS[self.current_business]
        except KeyError:
            raise RuntimeError('Failed to connect redis service. error: no default redis host '
                               'for current_business=%r' % (self.current_business,))

    def _connect(self, redis_dbname, is_as=True, host=None, pwd=None):
        """Create ``self.pool`` and ``self.redis_con`` for the resolved endpoint."""
        target_host, target_pwd = self._resolve_endpoint(is_as, host, pwd)
        try:
            self.pool = ConnectionPool(host=target_host, port=self.port,
                                       db=redis_dbname, password=target_pwd, decode_responses=True)
            self.redis_con = StrictRedis(connection_pool=self.pool)
        except RuntimeError as e:
            raise RuntimeError('Failed to connect redis service. error: %s' % e)

    def _wait_for_key(self, key, timeout, present=True):
        """Poll once per second until the existence of ``key`` equals ``present``.

        The key is always checked at least once (even for ``timeout <= 0``) and
        once more after the final sleep. Returns True as soon as the condition
        holds, False when ``timeout`` seconds have been used up.
        """
        wanted = bool(present)
        remaining = timeout
        while True:
            if bool(self.redis_con.exists(key)) == wanted:
                return True
            if remaining <= 0:
                return False
            time.sleep(1)
            remaining -= 1

    def kw_conn_redis_service(self, redis_dbname, is_as=True):
        """
        | Purpose: | Open a connection pool and client for a redis database |
        | Args:    |
        |          | redis_dbname | redis database number |
        |          | is_as | whether this is the ALLSchool business; selects the default server |
        | Returns: | None |
        | Author:  | huaxuemin | Updated 2021-09-03 |
        Note: initialises ``self.pool`` and ``self.redis_con``. The caller is
        responsible for calling ``kw_disconnect_redis`` afterwards.
        """
        self._connect(redis_dbname, is_as=is_as)

    def kw_get_redis_message(self, redis_dbname, redis_key, timeout=5, is_as=True):
        """
        | Purpose: | Wait for redis_key to exist and return its value |
        | Args:    | redis_dbname | redis database number |
        |          | redis_key | key to read |
        |          | timeout | seconds to keep polling, default 5 |
        |          | is_as | whether this is the ALLSchool business |
        | Returns: | the value stored under redis_key |
        | Raises:  | RuntimeError when the key does not appear within timeout |
        | Author:  | huaxuemin | Updated 2021-09-03 |
        """
        self.kw_conn_redis_service(redis_dbname, is_as=is_as)
        try:
            if self._wait_for_key(redis_key, timeout, present=True):
                return self.redis_con.get(redis_key)
            raise RuntimeError('not have redis_key, please check')
        except RuntimeError as e:
            raise RuntimeError('get redis value. error: %s' % e)
        finally:
            self.kw_disconnect_redis()

    def kw_verify_redis_have_key(self, redis_dbname, key, timeout=5, is_as=True):
        """
        | Purpose: | Verify that key exists in the cache |
        | Args:    | redis_dbname | redis database number |
        |          | key | key to look for |
        |          | timeout | seconds to keep polling, default 5 |
        |          | is_as | whether this is the ALLSchool business |
        | Returns: | True when the key exists |
        | Raises:  | RuntimeError when the key does not appear within timeout |
        | Author:  | huaxuemin | Updated 2021-09-03 |
        """
        self.kw_conn_redis_service(redis_dbname, is_as=is_as)
        try:
            if self._wait_for_key(key, timeout, present=True):
                return True
            raise RuntimeError('not have this key: %s' % key)
        except RuntimeError as e:
            raise RuntimeError('verify_redis_have_key. error: %s' % e)
        finally:
            self.kw_disconnect_redis()

    def kw_verify_redis_not_have_key(self, redis_dbname, key, timeout=5, is_as=True):
        """
        | Purpose: | Verify that key does not exist in the cache |
        | Args:    | redis_dbname | redis database number |
        |          | key | key to look for |
        |          | timeout | seconds to keep polling, default 5 |
        |          | is_as | whether this is the ALLSchool business |
        | Returns: | True when the key is absent |
        | Raises:  | RuntimeError when the key is still present after timeout |
        | Author:  | huaxuemin | Updated 2021-09-03 |
        """
        self.kw_conn_redis_service(redis_dbname, is_as=is_as)
        try:
            if self._wait_for_key(key, timeout, present=False):
                return True
            raise RuntimeError('have this key: %s' % key)
        except RuntimeError as e:
            raise RuntimeError('verify_redis_not_have_key. error: %s' % e)
        finally:
            self.kw_disconnect_redis()

    def kw_disconnect_redis(self):
        """
        | Purpose: | Disconnect from the redis service |
        | Args:    | none |
        | Returns: | None |
        | Author:  | huaxuemin | Updated 2021-09-03 |
        Note: closes the client and disconnects the connection pool. Safe to
        call when no connection has been opened yet.
        """
        try:
            if self.redis_con is not None:
                self.redis_con.close()
            if self.pool is not None:
                self.pool.disconnect()
        except RuntimeError as e:
            raise RuntimeError('disconnect redis service failed. error: %s' % e)

    def kw_del_key_by_key(self, redis_dbname, redis_key, is_as=True, is_check=True, host=None, pwd=None):
        """
        | Purpose: | Delete redis_key from redis |
        | Args:    | redis_dbname | redis database number, required |
        |          | redis_key | key to delete, required |
        |          | is_as | whether this is the ALLSchool business |
        |          | is_check | report (print) a failure when nothing was deleted |
        |          | host | redis host for this call only; optional, needs pwd as well |
        |          | pwd | redis password for this call only; optional, needs host as well |
        | Returns: | number of keys deleted (0 when nothing was deleted or an error occurred) |
        | Author:  | huaxuemin | Updated 2021-09-03 |
        Note: deletion is best-effort. Failures are printed, not raised, which
        preserves the behaviour callers have always seen. ``host``/``pwd``
        apply to this call only and are not stored on the instance.
        """
        self._connect(redis_dbname, is_as=is_as, host=host, pwd=pwd)
        deleted = 0
        try:
            deleted = self.redis_con.delete(redis_key)
            if not deleted and is_check:
                print('del key %s failed. error: %s' % (redis_key, deleted))
        except Exception as e:
            print(e)
        finally:
            self.kw_disconnect_redis()
        return deleted

    def kw_get_key_by_key_pre(self, redis_dbname, key_pre, is_as=True):
        """
        | Purpose: | Find a key by prefix |
        | Args:    | redis_dbname | redis database number |
        |          | key_pre | key prefix; "*" is appended to build the match pattern |
        |          | is_as | whether this is the ALLSchool business |
        | Returns: | the first matching key, or "" when nothing matches |
        | Author:  | huaxuemin | Updated 2021-10-07 |
        """
        self.kw_conn_redis_service(redis_dbname, is_as=is_as)
        try:
            matches = self.redis_con.keys(key_pre + "*")
            return next(iter(matches), "")
        finally:
            # Disconnect on every path; returning a hit used to skip this.
            self.kw_disconnect_redis()

    def kw_set_redis_by_db_key_value(self, db_num, key, value, is_as=False):
        """
        | Purpose: | Set key to value in the given redis database |
        | Args:    | db_num | redis database number |
        |          | key | key to write |
        |          | value | value to store |
        |          | is_as | whether this is the ALLSchool business (default False here) |
        | Returns: | None |
        | Raises:  | ValueError when the key does not exist after the write |
        """
        self.kw_conn_redis_service(db_num, is_as=is_as)
        try:
            self.redis_con.set(key, value)
            if not self.redis_con.exists(key):
                raise ValueError("设置redis失败。")
        finally:
            self.kw_disconnect_redis()

    def kw_get_zset_message(self, redis_dbname, redis_key, timeout=5, is_as=True):
        """
        | Purpose: | Wait for the sorted set redis_key to exist and return its members |
        | Args:    | redis_dbname | redis database number |
        |          | redis_key | key of the sorted set |
        |          | timeout | seconds to keep polling, default 5 |
        |          | is_as | whether this is the ALLSchool business |
        | Returns: | all members of the sorted set (ZRANGE 0 -1), or [] when the key never appears |
        | Author:  | huaxuemin | Updated 2022-07-27 |
        """
        self.kw_conn_redis_service(redis_dbname, is_as=is_as)
        try:
            if self._wait_for_key(redis_key, timeout, present=True):
                return self.redis_con.zrange(redis_key, 0, -1)
            print('not have redis_key: {}, please check'.format(redis_key))
            return []
        except RuntimeError as e:
            print('get redis value. error: %s' % e)
            return []
        finally:
            self.kw_disconnect_redis()
if __name__ == '__main__':
    # Ad-hoc manual check: delete one key from database 9 on an explicitly
    # named QA server, then print whatever the call returned.
    # NOTE(review): the password below is hard-coded in source control.
    api = RedisApi()
    delete_args = dict(
        host='rediscourse.qa.huohua.cn',
        pwd='Bkl6LvqfzFCzYPAh',
        redis_dbname='9',
        redis_key="public-holiday-teacher-v1:1_82908",
        is_as=False,
    )
    outcome = api.kw_del_key_by_key(**delete_args)
    print(outcome)
    # Other manual invocations that have been used from here:
    # api.kw_get_redis_message(9, "peppa:ticket:call:pcsm:13708231975", is_as=False)
    # api.kw_conn_redis_service(0)
    # api.kw_del_key_by_key(9, "REVISIT_TASK:REVISITING_IDS", is_as=False)
    # print(api.kw_get_key_by_key_pre(0, "HULK-ORG-API:RESET-TOKEN:huaxuemin@huohua.cn#"))