近期POS服务无法确定每个环节的耗时,而环境又不允许搭建ELK,所以用Python的生成器(yield)写了一个日志分析脚本,具体代码如下:
#!/usr/local/python37/bin/python3.7
# By: swimming
# Date:2020/08/01
# 分析pos-gateway 日志
import os
import sys
import time
from datetime import datetime, timedelta
# Dictionary that creates nested dictionaries on demand for missing keys.
class multidict(dict):
    """A ``dict`` whose missing keys are filled with a new nested ``multidict``.

    Reading ``d[key]`` for an absent key stores an empty ``multidict`` under
    that key and returns it, so ``d["a"]["b"] = 1`` works on an empty
    instance. ``in`` and ``get()`` never create entries.
    """

    def __missing__(self, key):
        """Create, store and return an empty child mapping for *key*.

        ``dict.__getitem__`` invokes this hook on subclasses when the key is
        absent, so no ``__getitem__`` override is needed.
        """
        value = self[key] = type(self)()
        return value
def r_line(filepath):
    """Yield every line of *filepath* split on single spaces.

    The file is consumed lazily, one line at a time, so the whole log is
    never held in memory. The trailing newline stays attached to the last
    token of each line.

    Args:
        filepath: Path of the text file to read.

    Yields:
        list of str: The tokens of one line, as returned by ``str.split(" ")``.
    """
    # errors="replace" keeps a single undecodable byte from aborting the read.
    with open(filepath, "r", errors="replace") as f:
        for raw_line in f:
            yield raw_line.split(" ")
def _parse_timestamp(day, clock):
    """Combine a date string and a time-of-day string into a naive datetime."""
    stamp = "{} {}".format(day, clock)
    # "%f" accepts one to six fractional digits and scales them correctly,
    # so ".5" means 500 ms rather than 5 ms.
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in stamp else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(stamp, fmt)


def diffTime(day, s1, s2):
    """Return the time elapsed from *s1* to *s2* in whole milliseconds.

    Both times are interpreted on the same calendar *day*; the result is
    negative when *s2* is earlier than *s1*. The calculation uses naive
    datetimes, so it does not depend on the local timezone.

    Args:
        day: Date in ``YYYY-MM-DD`` form.
        s1: Start time, ``HH:MM:SS`` with an optional ``.fraction`` suffix.
        s2: End time, in the same form as *s1*.

    Returns:
        int: ``s2 - s1`` in milliseconds.

    Raises:
        ValueError: If either timestamp does not match the expected format.
    """
    elapsed = _parse_timestamp(day, s2) - _parse_timestamp(day, s1)
    # Dividing two timedeltas with // yields an exact integer.
    return elapsed // timedelta(milliseconds=1)
# Maps the final token of a log line to the record key that stores its time.
_EVENT_KEYS = {
    "REGISTERED": "reg",
    "ACTIVE": "act",
    "FLUSH": "flush",
    "CLOSE": "close",
    "COMPLETE": "compl",
    "INACTIVE": "unact",
    "UNREGISTERED": "unreg",
}

# One report entry: sizes, channel id, every recorded time, then durations.
_REPORT_FORMAT = (
    "-- REC:{} -- SEND:{}\n{} reg: {} act: {} r: {} w: {} flush: {} close: {} "
    "com: {} inact: {} unreg: {} used: {} r_used: {}"
)


def _event_fields(tokens):
    """Return the record fields implied by one tokenized line (may be empty)."""
    stamp = str(tokens[1])
    last = tokens[-1].replace("\n", "")
    if last in _EVENT_KEYS:
        return {_EVENT_KEYS[last]: stamp}
    if last.endswith("B"):
        # "READ: 120B" / "WRITE: 134B": the last token is the payload size.
        if tokens[-2] == "READ:":
            return {"rec": stamp, "rec_size": last}
        if tokens[-2] == "WRITE:":
            return {"send": stamp, "send_size": last}
    return {}


def _collect_channels(rows):
    """Group the events of all tokenized log lines by channel id.

    Token 12 of a line must start with ``0x`` to be treated as a channel id;
    token 1 is stored as the event time and token 0 is kept as the date that
    is later handed to ``diffTime``.
    """
    channels = multidict()
    for tokens in rows:
        # Lines too short to carry a channel id are skipped.
        if len(tokens) <= 12 or not tokens[12].startswith("0x"):
            continue
        fields = _event_fields(tokens)
        if not fields:
            # Unknown event: do not create an entry for this channel.
            continue
        record = channels[tokens[12].replace(",", "")]
        record.update(fields)
        # Keep the date of the first event seen for the channel, so the
        # report does not depend on whatever line happens to end the file.
        record.setdefault("day", tokens[0])
    return channels


def _print_report(channels):
    """Print one entry per channel that was registered and flushed, then totals.

    Exits with status 1 as soon as the durations of such a channel cannot be
    computed (for example because it has no UNREGISTERED or READ time).
    """
    probes = 0
    for channel_id, record in channels.items():
        if record["flush"] and record["reg"]:
            # Two durations: the whole connection, and the wait for the read.
            try:
                used_time = diffTime(record["day"], record["reg"], record["unreg"])
                read_time = diffTime(record["day"], record["reg"], record["rec"])
                print("REC: {}".format(record["rec"]))
            except Exception:
                print("ERROR ID: %s" % channel_id)
                sys.exit(1)
            print(
                _REPORT_FORMAT.format(
                    record["rec_size"],
                    record["send_size"],
                    channel_id,
                    record["reg"],
                    record["act"],
                    record["rec"],
                    record["send"],
                    record["flush"],
                    record["close"],
                    record["compl"],
                    record["unact"],
                    record["unreg"],
                    used_time,
                    read_time,
                )
            )
        elif not record["flush"]:
            # Channels that never flushed anything are counted separately.
            probes += 1
    # NOTE(review): the first number is the count of all channel ids seen,
    # including the ones counted in the second number.
    print("有效处理数据: {} 探测数据: {}".format(len(channels), probes))


def main(argv):
    """Analyze the log file named by ``argv[1]`` and return the exit status."""
    if len(argv) != 2:
        script = argv[0] if argv else "script"
        print("Usage: {} <log file path>".format(script), file=sys.stderr)
        return 2
    _print_report(_collect_channels(r_line(argv[1])))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
此脚本用到了yield生成器和多层字典自动创建的功能。使用方法:脚本名 + 日志文件路径。
swimming
No Leanote account ? Sign up now.