近期POS服务因无法确定每个环节耗时,且环境不允许搭建ELK,所以使用Python里面的迭代yield写了一个日志分析功能,具体代码如下:
#!/usr/local/python37/bin/python3.7 # By: swimming # Date:2020/08/01 # 分析pos-gateway 日志 import time import sys import os # 重写getitem,可以自动创建没有key的字典 class multidict(dict): def __getitem__(self, item): try: return dict.__getitem__(self, item) except KeyError: value = self[item] = type(self)() return value # 2 missing方式 def __missing__(self, key): value = self[key] = type(self)() return value def r_line(filepath): with open(filepath, "r") as f: for item in f.readlines(): yield item.split(" ") def diffTime(day, s1, s2): FMT = "%Y-%m-%d %H:%M:%S" s1 = "{} {}".format(day, s1) s2 = "{} {}".format(day, s2) # print("S1:{}\nS2:{}\n\n".format(day, s1, s2)) s_time1 = time.mktime(time.strptime(s1.split(".")[0], FMT)) * 1000 + int( s1.split(".")[1] ) s_time2 = time.mktime(time.strptime(s2.split(".")[0], FMT)) * 1000 + int( s2.split(".")[1] ) return int(s_time2 - s_time1) if __name__ == "__main__": if len(sys.argv) == 2: # file = os.path.dirname(os.path.realpath(__file__)) +'/'+ sys.argv[1] # 全路径参数 file = sys.argv[1] dict_list = multidict() line_datas = r_line(file) for line in line_datas: try: if line[12].startswith("0x"): id_str = line[12].replace(",", "") end_str = line[-1].replace("\n", "") s_time = str(line[1]) # print("ID:{}----- {}".format(id_str, end_str)) # 转换时间 # time_str = "{} {}".format(str(line[0]), str(line[1])) # 判断执行状态 REGISTERED,ACTIVE,READ: 120B,WRITE: 134B,FLUSH,CLOSE,READ COMPLETE,INACTIVE,UNREGISTERED if end_str == "REGISTERED": dict_list[id_str]["reg"] = s_time # print("REG:{}".format(line[1])) if end_str == "ACTIVE": dict_list[id_str]["act"] = s_time if end_str.endswith("B"): if line[-2] == "READ:": dict_list[id_str]["rec"] = s_time dict_list[id_str]["rec_size"] = end_str if line[-2] == "WRITE:": dict_list[id_str]["send"] = s_time dict_list[id_str]["send_size"] = end_str if end_str == "FLUSH": dict_list[id_str]["flush"] = s_time if end_str == "CLOSE": dict_list[id_str]["close"] = s_time if end_str == "COMPLETE": dict_list[id_str]["compl"] = s_time if end_str == "INACTIVE": dict_list[id_str]["unact"] = s_time if end_str == "UNREGISTERED": dict_list[id_str]["unreg"] = s_time except IndexError: continue num = 0 for item in dict_list: # 时间计算部分 需要计算2个时间,全部耗时和接受耗时 if dict_list[item]["flush"] and dict_list[item]["reg"]: try: used_time = diffTime( line[0], dict_list[item]["reg"], dict_list[item]["unreg"] ) read_time = diffTime( line[0], dict_list[item]["reg"], dict_list[item]["rec"] ) print("REC: {}".format(dict_list[item]["rec"])) except Exception: print("ERROR ID: %s" % item) sys.exit(1) print( "-- REC:{} -- SEND:{}\n{} reg: {} act: {} r: {} w: {} flush: {} close: {} com: {} inact: {} unreg: {} used: {} r_used: {}".format( dict_list[item]["rec_size"], dict_list[item]["send_size"], item, dict_list[item]["reg"], dict_list[item]["act"], dict_list[item]["rec"], dict_list[item]["send"], dict_list[item]["flush"], dict_list[item]["close"], dict_list[item]["compl"], dict_list[item]["unact"], dict_list[item]["unreg"], used_time, read_time, ) ) elif not dict_list[item]["flush"]: num = num + 1 print("有效处理数据: {} 探测数据: {}".format(len(dict_list), num))
此脚本使用到了yield和多层字典自动创建的功能,使用方法 脚本 + 日志名。
No Leanote account ? Sign up now.