# encoding: UTF-8
"""Utilities for running shell commands, freeing a TCP port and launching ./5011.sh."""
import os
import platform
import subprocess
import time
from subprocess import PIPE, Popen


class OSType:
    """Coarse operating-system classification used to choose shell commands."""

    WIN, LINUX, UNKNOWN = range(3)

    def __init__(self):
        pass

    @staticmethod
    def get_type():
        """Return OSType.WIN, OSType.LINUX or OSType.UNKNOWN for the current host."""
        system_name = platform.system().lower()
        if system_name == 'windows':
            return OSType.WIN
        if system_name == 'linux':
            return OSType.LINUX
        return OSType.UNKNOWN


class tool(object):
    """Find and kill whatever holds ``env_port``, then start ``./5011.sh``."""

    def __init__(self):
        self.env_port = 5011
        # PID found by check_port(); read by _kills_pid().
        self.pid = None

    @staticmethod
    def _to_text(data):
        """Return *data* as text: ``None`` -> ``""``, bytes are decoded leniently."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            # Only ASCII digits/columns are parsed, so undecodable bytes are harmless.
            return data.decode('utf-8', 'replace')
        return data

    @staticmethod
    def _parse_lsof_pid(output):
        """Return the PID (as a string) from ``lsof -i`` output, or None.

        The first line is the column header; the PID is the second column of
        the first data row whose second column is numeric.
        """
        for line in tool._to_text(output).splitlines()[1:]:
            fields = line.split()
            if len(fields) > 1 and fields[1].isdigit():
                return fields[1]
        return None

    @staticmethod
    def _parse_netstat_pid(output, port=None):
        """Return the PID (as a string) from ``netstat -ano`` output, or None.

        The PID is the last column. When *port* is given, only rows whose
        local address (second column) ends with ``:<port>`` are considered, so
        rows that merely contain the digits (e.g. port 50110) are ignored.
        The last matching row wins.
        """
        suffix = None if port is None else ":{}".format(port)
        found = None
        for line in tool._to_text(output).splitlines():
            fields = line.split()
            if len(fields) < 2 or not fields[-1].isdigit():
                continue
            if suffix is not None and not fields[1].endswith(suffix):
                continue
            found = fields[-1]
        return found

    def run_process(self, cmd_str, out_p=False, err_to_out=False):
        """Run *cmd_str* through the shell and wait for it to finish.

        :param cmd_str: command line (text; bytes are also accepted).
        :param out_p: when true, capture stdout and return it; otherwise
            stdout is discarded.
        :param err_to_out: when true, stderr is merged into stdout.
        :return: ``(returncode, stdout_bytes)`` if *out_p* else
            ``(returncode, None)``.
        :raises RuntimeError: on an OS other than Windows or Linux.
        """
        os_type = OSType.get_type()
        if os_type == OSType.LINUX:
            if not isinstance(cmd_str, bytes):
                cmd_str = cmd_str.encode('utf-8')
        elif os_type != OSType.WIN:
            raise RuntimeError("your os is not support.")

        close_fds = os_type != OSType.WIN
        stderr = subprocess.STDOUT if err_to_out else None

        if out_p:
            p = subprocess.Popen(cmd_str, shell=True, close_fds=close_fds,
                                 stdout=PIPE, stderr=stderr)
            # communicate() drains the pipe while waiting; wait() followed by
            # read() can deadlock once the child fills the pipe buffer.
            output, _ = p.communicate()
            return p.returncode, output

        sink = self.get_devnull()
        try:
            p = subprocess.Popen(cmd_str, shell=True, close_fds=close_fds,
                                 stdout=sink, stderr=stderr)
            p.communicate()
        finally:
            # subprocess.DEVNULL is a plain int; only the fallback file needs closing.
            if hasattr(sink, 'close'):
                sink.close()
        return p.returncode, None

    def adb_cmd(self, cmd):
        """Run *cmd* through the shell and return ``(stdout_text, stderr_bytes)``."""
        process = Popen(cmd, shell=True, stderr=PIPE, stdout=PIPE)
        (stdout, stdrr) = process.communicate()
        # The output may contain Chinese text, so decode it as GBK; bytes that
        # are not valid GBK are replaced instead of raising.
        stdout = stdout.decode('gbk', 'replace')
        return stdout, stdrr

    def get_devnull(self):
        """Return a stdout target that discards output."""
        try:
            return subprocess.DEVNULL
        except AttributeError:
            # Python 2.x or older
            return open(os.devnull, 'r+')

    def _kills_pid(self):
        """Force-kill the process whose id is stored in ``self.pid``.

        :raises RuntimeError: on an unsupported OS, or when the kill command
            exits with a non-zero status.
        """
        os_type = OSType.get_type()
        if os_type == OSType.WIN:
            kill_pid_cmd = "taskkill /f /pid {}".format(self.pid)
        elif os_type == OSType.LINUX:
            kill_pid_cmd = "kill -9 {}".format(self.pid)
        else:
            raise RuntimeError("your os is not support.")

        # Capture stdout and stderr so a failure message carries the tool's output.
        res_code, res_context = self.run_process(kill_pid_cmd, out_p=True, err_to_out=True)
        if res_code:
            raise RuntimeError("kill pid: {} failed. \nerror: {}".format(
                self.pid, self._to_text(res_context).strip()))

    def check_port(self):
        """Kill the process reported for ``env_port``, if one is found.

        :raises RuntimeError: on an unsupported OS, or when the kill fails.
        """
        os_type = OSType.get_type()
        if os_type == OSType.WIN:
            find_pid_win_cmd = 'netstat -ano | findstr {} | findstr LISTENING'.format(self.env_port)
            print(find_pid_win_cmd)
            res_code, res_context = self.run_process(find_pid_win_cmd, out_p=True)
            pid = self._parse_netstat_pid(res_context, self.env_port) if res_code == 0 else None
        elif os_type == OSType.LINUX:
            find_pid_linux_cmd = "lsof -i:{}".format(self.env_port)
            res_code, res_context = self.run_process(find_pid_linux_cmd, out_p=True)
            # Extract the pid from the first data row.
            pid = self._parse_lsof_pid(res_context) if res_code == 0 else None
        else:
            raise RuntimeError("your os is not support.")

        if pid:
            self.pid = pid
            self._kills_pid()

    def run_manage(self):
        """Run the launcher and look for a process on ``env_port`` (up to 3 tries)."""
        count = 3
        while count > 0:
            self.run_manages()
            find_pid_linux_cmd = "lsof -i:{}".format(self.env_port)
            res_code, res_context = self.run_process(find_pid_linux_cmd, out_p=True)
            print(res_code, "---res_code---", res_context, "---res_context---")
            if not res_context:
                # NOTE(review): when lsof prints nothing the loop stops without
                # retrying -- confirm this is the intended behaviour.
                break
            time.sleep(2)
            pid = self._parse_lsof_pid(res_context)
            print(pid, "pid####")
            count -= 1
            if pid:
                break

    def run_manages(self):
        """Run the ``./5011.sh`` launcher script via the shell."""
        sh_lod = "./5011.sh"
        self.run_process(sh_lod)


if __name__ == '__main__':
    test = tool()
    test.check_port()
    time.sleep(3)
    test.run_manage()