# coding: utf-8 from redis import ConnectionPool, StrictRedis import time from base_framework.public_tools.read_config import ReadConfig from base_framework.base_config.current_pth import env_choose_path class RedisApi: """ | 功能说明: | 操作redis | | 作者信息: | 作者 qiaoxinjiu | | 修改时间: | 2021-09-03 | """ def __init__(self): self.redis_con = None self.pool = None self.host = None self.pwd = "" self.port = 6379 self.evn_cfg = ReadConfig(env_choose_path) self.current_business = self.evn_cfg.get_value(sections="run_evn_name", options="current_business") def kw_conn_redis_service(self, redis_dbname, is_as=True): """ | 功能说明: | 与redis server 建立连接 | | 输入参数: | | | redis_dbname | redis数据库名,默认为5 | | | redis_ip_addr | redis服务器地址 | | | password | redis密码 | | | redis_port | redis端口号,默认为6379 | | | is_as | 是否为ALLSchool业务 | | 返回参数: | 无 | | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | 说明:初始化 pool和redis_con """ if not self.host: # 如果没有指定redis域名 if is_as: self.host = 'redis-qa2.redis.rds.aliyuncs.com' self.pwd = 'AcUVeRb8lN' else: if self.current_business == "hh": self.host = 'redis.qa.huohua.cn' self.pwd = 'AcUVeRb8lN' elif self.current_business == "hhi": self.host = 'redis.qa.visparklearning.com' self.pwd = 'hxTjlWBYdK6UpAGF' try: self.pool = ConnectionPool(host=self.host, port=self.port, db=redis_dbname, password=self.pwd, decode_responses=True) self.redis_con = StrictRedis(connection_pool=self.pool) except RuntimeError as e: raise RuntimeError('Failed to connect redis service. error: %s' % e) def kw_get_redis_message(self, redis_dbname, redis_key, timeout=5, is_as=True): """ | 功能说明: | 获取redis_key对应的value | | 输入参数: | redis_key | | | is_as | 是否为ALLSchool业务 | | 返回参数: | redis_key对应的value | | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | """ self.kw_conn_redis_service(redis_dbname, is_as=is_as) try: while timeout > 0: if not self.redis_con.exists(redis_key): time.sleep(1) timeout -= 1 continue else: resp = self.redis_con.get(redis_key) return resp raise RuntimeError('not have redis_key, please check') except RuntimeError as e: raise RuntimeError('get redis value. error: %s' % e) finally: self.kw_disconnect_redis() def kw_verify_redis_have_key(self, redis_dbname, key, timeout=5, is_as=True): """ | 功能说明: | 验证缓存中存在KEY | | 输入参数: | key | key值 | | | timeout | 超时时间,默认为5s | | | is_as | 是否为ALLSchool业务 | | 返回参数: | True/False:存在返回true,否则失败 | | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | """ self.kw_conn_redis_service(redis_dbname, is_as=is_as) try: while timeout > 0: if self.redis_con.exists(key): return True else: time.sleep(1) timeout -= 1 continue raise RuntimeError('not have this key: %s' % key) except RuntimeError as e: raise RuntimeError('verify_redis_have_key. error: %s' % e) finally: self.kw_disconnect_redis() def kw_verify_redis_not_have_key(self, redis_dbname, key, timeout=5, is_as=True): """ | 功能说明: | 验证缓存中不存在KEY | | 输入参数: | key | key值 | | | timeout | 超时时间,默认为5s | | | is_as | 是否为ALLSchool业务 | | 返回参数: | True/False:不存在返回true,否则失败 | | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | """ self.kw_conn_redis_service(redis_dbname, is_as=is_as) try: while timeout > 0: if not self.redis_con.exists(key): return True else: time.sleep(1) timeout -= 1 continue raise RuntimeError('have this key: %s' % key) except RuntimeError as e: raise RuntimeError('verify_redis_not_have_key. error: %s' % e) finally: self.kw_disconnect_redis() def kw_disconnect_redis(self): """ | 功能说明: | 断开rebids服务 | | 输入参数: | 无 | | 返回参数: | 无 | | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | 备注: 断开连接池 """ try: self.redis_con.close() self.pool.disconnect() except RuntimeError as e: raise RuntimeError('disconnect redis service failed. error: %s' % e) def kw_del_key_by_key(self, redis_dbname, redis_key, is_as=True, is_check=True, host=None, pwd=None): """ | 功能说明: | 删掉redis中的 redis_key | | 输入参数: | host | redis域名,可以不传,按环境走默认配置 | | | pwd | redis密码,可以不传,按环境走默认配置 | | | redis_dbname | redis数据库编号,必传 | | | redis_key | 关键key,必传 | | | is_as | 是否为ALLSchool业务 | | 返回参数: | 无 | | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | """ if host and pwd: # 当指定了域名,则按入参查询 self.host = host self.pwd = pwd self.kw_conn_redis_service(redis_dbname, is_as=is_as) try: resp = self.redis_con.delete(redis_key) if not resp and is_check: raise RuntimeError('del key %s failed. error: %s' % (redis_key, resp)) except Exception as e: print(e) self.kw_disconnect_redis() # def kw_del_key_by_key_pre(self, redis_dbname, key_pre, is_as=True): # """ # | 功能说明: | 根据前缀删除| # | 输入参数: | key_pre | # | | is_as | 是否为ALLSchool业务 | # | 返回参数: | 无 | # | 作者信息: | 作者 huaxuemin | 修改时间 2021-09-03 | # """ # self.kw_conn_redis_service(redis_dbname, is_as=is_as) # try: # key_pre = key_pre + "*" # res = self.redis_con.scan(match=key_pre, count=9999999999) # if len(res[1]) > 0: # for key in res[1]: # resp = self.redis_con.delete(key) # if not resp: # raise RuntimeError('del key %s failed. error: %s' % (key, resp)) # except Exception as e: # print(e) # self.kw_disconnect_redis() def kw_get_key_by_key_pre(self, redis_dbname, key_pre, is_as=True): """ | 功能说明: | 根据前缀删除| | 输入参数: | key_pre | | | is_as | 是否为ALLSchool业务 | | 返回参数: | 无 | | 作者信息: | 作者 huaxuemin | 修改时间 2021-10-07 | """ self.kw_conn_redis_service(redis_dbname, is_as=is_as) key_pre = key_pre + "*" # for key in self.redis_con.scan_iter(match=key_pre): # self.redis_con.delete(key) for key in self.redis_con.keys(key_pre): return key self.kw_disconnect_redis() return "" def kw_set_redis_by_db_key_value(self, db_num, key, value, is_as=False): self.kw_conn_redis_service(db_num, is_as=is_as) self.redis_con.set(key, value) if not self.redis_con.exists(key): raise ValueError("设置redis失败。") def kw_get_zset_message(self, redis_dbname, redis_key, timeout=5, is_as=True): """ | 功能说明: | 获取redis_key 有序集合对应的value | | 输入参数: | redis_key | | | is_as | 是否为ALLSchool业务 | | 返回参数: | redis_key 有序集合对应的value | | 作者信息: | 作者 huaxuemin | 修改时间 2022-07-27 | """ self.kw_conn_redis_service(redis_dbname, is_as=is_as) try: while timeout > 0: if not self.redis_con.exists(redis_key): time.sleep(1) timeout -= 1 continue else: resp = self.redis_con.zrange(redis_key, 0, -1) return resp print('not have redis_key: {}, please check'.format(redis_key)) return [] except RuntimeError as e: print('get redis value. error: %s' % e) return [] finally: self.kw_disconnect_redis() if __name__ == '__main__': test = RedisApi() # res = test.kw_get_redis_message(9, "peppa:ticket:call:pcsm:13708231975", is_as=False) res = test.kw_del_key_by_key(host='rediscourse.qa.huohua.cn', pwd='Bkl6LvqfzFCzYPAh', redis_dbname='9', redis_key="public-holiday-teacher-v1:1_82908", is_as=False) print(res) # test.kw_conn_redis_service(0) # test.kw_del_key_by_key(9, "REVISIT_TASK:REVISITING_IDS", is_as=False) # print(test.kw_get_key_by_key_pre(0, "HULK-ORG-API:RESET-TOKEN:huaxuemin@huohua.cn#"))