# coding: utf-8
"""Assemble ``KwLibrary`` from classes discovered on disk.

At import time this module walks three source directories, imports every
module in which a top-level ``class`` statement is found, orders the
collected classes so that subclasses precede their parents, and creates
``BaseLibrary`` with all of them as base classes. ``KwLibrary`` derives from
``BaseLibrary`` and runs the eligible ``__init__`` methods when instantiated.
"""
import importlib
import os
import re
import sys
import traceback

from base_framework.base_config.current_pth import env_choose_path
from base_framework.public_tools.read_config import ReadConfig
from base_framework.public_tools.log import get_logger

evn_cfg = ReadConfig(env_choose_path)
# Name of the team directory whose "library" folder is scanned below.
team = evn_cfg.get_value(sections="run_evn_name", options="current_team")

__all__ = ['KwLibrary']

# Inheritance levels filled by sort_class(): one list of classes per level,
# base-most level first.
cls_list_all = list()
# NOTE(review): the next two lists are never populated by the code in this
# file; they are kept because other modules may still reference them.
cls_list_father = list()
cls_list_children = list()
# Every class object collected by class_import(), in discovery order.
t_c_list = list()
obj_log = get_logger()

# Matches a class statement at the very start of a line and captures its name.
_CLASS_DEF_RE = re.compile(r"class\s(.*?)[\(:]")
# Class names that are never imported.
_SKIPPED_CLASS_NAMES = ("Runner",)
# Directory names (compared lower-cased) at which a dotted module path starts.
_PACKAGE_ROOTS = ("base_framework", "library")


def is_import(root):
    """Return ``False`` when *root* contains a black-listed path fragment.

    NOTE: class_import() does not currently call this filter.
    """
    black_list = [os.sep.join(['platform', 'tools']), 'scene_server']
    return not any(black_item in root for black_item in black_list)


def _dotted_module_name(root, file):
    """Build the dotted module name for *file* located in directory *root*.

    The name starts at the first path component equal (case-insensitively)
    to one of ``_PACKAGE_ROOTS``. An empty string is returned when no such
    component exists; importing it then fails and is reported by the caller.
    """
    parts = root.split(os.sep)
    parts.append(os.path.splitext(file)[0])
    for index, part in enumerate(parts):
        if part.lower() in _PACKAGE_ROOTS:
            return '.'.join(parts[index:])
    return ''


def class_import(path_list):
    """Import every class defined under a directory and collect it.

    The directory is ``<parent of the current working directory>/<path_list
    joined by os.sep>``. It is appended to ``sys.path`` and walked
    recursively. Each ``.py`` file whose name does not start with ``__`` is
    scanned line by line for top-level ``class`` statements; for every match
    (except the names in ``_SKIPPED_CLASS_NAMES``) the module is imported and
    the class object is appended to the module-level ``t_c_list``.

    Import or attribute-lookup failures are printed and skipped so that one
    broken file does not stop the scan.

    :param path_list: path components below the parent of the current
        working directory.
    """
    root_path = os.path.abspath(os.path.join(os.getcwd(), "../"))
    search_dir = root_path + os.sep + os.sep.join(path_list)
    sys.path.append(search_dir)
    for root, _dirs, files in os.walk(search_dir):
        for file in files:
            if os.path.splitext(file)[1] != '.py' or file.startswith('__'):
                continue
            with open(os.sep.join([root, file]), encoding="utf-8") as f:
                for line in f:
                    cls_match = _CLASS_DEF_RE.match(line)
                    if not cls_match:
                        continue
                    cls_name = cls_match.group(1)
                    if cls_name in _SKIPPED_CLASS_NAMES:
                        continue
                    try:
                        module = importlib.import_module(
                            _dotted_module_name(root, file))
                        t_c_list.append(getattr(module, cls_name))
                    except Exception:
                        # Best effort: report and carry on. KeyboardInterrupt
                        # and SystemExit are deliberately not swallowed.
                        traceback.print_exc()
                        print("ERROR import", file, cls_name)


def sort_class(class_list):
    """Group classes into inheritance levels appended to ``cls_list_all``.

    The first level holds the classes whose first base is ``object``; each
    following level holds the classes whose first base belongs to the level
    before it. Classes placed in a level are removed from *class_list* (the
    list is mutated in place). When no remaining class fits the next level,
    all remaining classes are stored together as one final level and
    *class_list* itself is left untouched from that point on.

    Only ``__bases__[0]`` is inspected; additional bases are ignored.

    :param class_list: list of class objects to sort.
    """
    while len(class_list) > 0:
        if not cls_list_all:
            # First pass: classes deriving directly from object.
            level = [cls for cls in class_list
                     if cls.__bases__[0] is object]
        else:
            # Later passes: direct children of the previously found level.
            previous_level = cls_list_all[-1]
            level = [cls for cls in class_list
                     if cls.__bases__[0] in previous_level]
        if level:
            cls_list_all.append(level)
            for item in level:
                class_list.remove(item)
        else:
            # Nothing links to the previous level: keep the rest as one
            # level. Store a copy so the level does not alias the caller's
            # list.
            cls_list_all.append(list(class_list))
            break


def _takes_only_self(init):
    """Return ``True`` when *init* declares ``self`` as its only named parameter.

    Uses the code object's argument counts, so local variables inside the
    function do not affect the answer. Raises ``AttributeError`` when *init*
    has no ``__code__`` (i.e. it is not a plain Python function).
    """
    code = init.__code__
    return code.co_argcount == 1 and code.co_kwonlyargcount == 0


class_import(["base_framework", "public_tools"])
class_import(["base_framework", "public_business"])
class_import([team, "library"])

sort_class(t_c_list)

# Flatten the levels deepest-first so subclasses come before their parents
# in the bases tuple.
obj_cls_tuple = tuple(
    cls for level in reversed(cls_list_all) for cls in level)
print(obj_cls_tuple)
BaseLibrary = type('BaseLibrary', obj_cls_tuple, {})


class KwLibrary(BaseLibrary):
    """Aggregate class deriving from every discovered class via ``BaseLibrary``."""

    def __init__(self):
        """Run the ``__init__`` of each base class that can be called bare.

        A base is initialised only when it defines ``__init__`` itself and
        either its first base is not ``object`` or its ``__init__`` takes no
        argument besides ``self``. Exceptions raised while doing so are
        printed for bases deriving directly from ``object`` and ignored for
        the others. The number of initialised bases is printed at the end.
        """
        class_len = 0
        for base in BaseLibrary.__bases__:
            if base is object:
                # Happens when no class was discovered: BaseLibrary then
                # derives from object only, and object has no bases to check.
                continue
            try:
                if '__init__' in base.__dict__:
                    if (base.__bases__[0] is not object
                            or _takes_only_self(base.__init__)):
                        base.__init__(self)
                        print(base)
                        class_len += 1
            except Exception as e:
                if base.__bases__[0] is object:
                    print("INIT", e, base, base.__dict__)
        print("import files num: {}".format(class_len))