Python 日志分析一例

近期POS服务因无法确定每个环节耗时,且环境不允许搭建ELK,所以使用Python里面的迭代yield写了一个日志分析功能,具体代码如下:

#!/usr/local/python37/bin/python3.7
# By: swimming
# Date:2020/08/01
# 分析pos-gateway 日志

import time
import sys
import os

# 重写getitem,可以自动创建没有key的字典
class multidict(dict):
    def __getitem__(self, item):
        try:
            return dict.__getitem__(self, item)
        except KeyError:
            value = self[item] = type(self)()
            return value

    # 2 missing方式
    def __missing__(self, key):
        value = self[key] = type(self)()
        return value


def r_line(filepath):
    with open(filepath, "r") as f:
        for item in f.readlines():
            yield item.split(" ")


def diffTime(day, s1, s2):
    FMT = "%Y-%m-%d %H:%M:%S"
    s1 = "{} {}".format(day, s1)
    s2 = "{} {}".format(day, s2)

    # print("S1:{}\nS2:{}\n\n".format(day, s1, s2))
    s_time1 = time.mktime(time.strptime(s1.split(".")[0], FMT)) * 1000 + int(
        s1.split(".")[1]
    )
    s_time2 = time.mktime(time.strptime(s2.split(".")[0], FMT)) * 1000 + int(
        s2.split(".")[1]
    )
    return int(s_time2 - s_time1)


if __name__ == "__main__":
    if len(sys.argv) == 2:
        # file = os.path.dirname(os.path.realpath(__file__)) +'/'+ sys.argv[1]
        # 全路径参数

        file = sys.argv[1]
        dict_list = multidict()
        line_datas = r_line(file)

        for line in line_datas:
            try:
                if line[12].startswith("0x"):
                    id_str = line[12].replace(",", "")
                    end_str = line[-1].replace("\n", "")
                    s_time = str(line[1])
                    # print("ID:{}----- {}".format(id_str, end_str))
                    # 转换时间
                    # time_str = "{} {}".format(str(line[0]), str(line[1]))
                    # 判断执行状态  REGISTERED,ACTIVE,READ: 120B,WRITE: 134B,FLUSH,CLOSE,READ COMPLETE,INACTIVE,UNREGISTERED
                    if end_str == "REGISTERED":
                        dict_list[id_str]["reg"] = s_time
                        # print("REG:{}".format(line[1]))
                    if end_str == "ACTIVE":
                        dict_list[id_str]["act"] = s_time
                    if end_str.endswith("B"):
                        if line[-2] == "READ:":
                            dict_list[id_str]["rec"] = s_time
                            dict_list[id_str]["rec_size"] = end_str
                        if line[-2] == "WRITE:":
                            dict_list[id_str]["send"] = s_time
                            dict_list[id_str]["send_size"] = end_str
                    if end_str == "FLUSH":
                        dict_list[id_str]["flush"] = s_time
                    if end_str == "CLOSE":
                        dict_list[id_str]["close"] = s_time
                    if end_str == "COMPLETE":
                        dict_list[id_str]["compl"] = s_time
                    if end_str == "INACTIVE":
                        dict_list[id_str]["unact"] = s_time
                    if end_str == "UNREGISTERED":
                        dict_list[id_str]["unreg"] = s_time

            except IndexError:
                continue
        num = 0
        for item in dict_list:
            # 时间计算部分 需要计算2个时间,全部耗时和接受耗时
            if dict_list[item]["flush"] and dict_list[item]["reg"]:
                try:
                    used_time = diffTime(
                        line[0], dict_list[item]["reg"], dict_list[item]["unreg"]
                    )
                    read_time = diffTime(
                        line[0], dict_list[item]["reg"], dict_list[item]["rec"]
                    )
                    print("REC: {}".format(dict_list[item]["rec"]))
                except Exception:
                    print("ERROR ID: %s" % item)
                    sys.exit(1)
                print(
                    "-- REC:{} -- SEND:{}\n{} reg: {} act: {} r: {} w: {} flush: {} close: {} com: {} inact: {} unreg: {} used: {} r_used: {}".format(
                        dict_list[item]["rec_size"],
                        dict_list[item]["send_size"],
                        item,
                        dict_list[item]["reg"],
                        dict_list[item]["act"],
                        dict_list[item]["rec"],
                        dict_list[item]["send"],
                        dict_list[item]["flush"],
                        dict_list[item]["close"],
                        dict_list[item]["compl"],
                        dict_list[item]["unact"],
                        dict_list[item]["unreg"],
                        used_time,
                        read_time,
                    )
                )
            elif not dict_list[item]["flush"]:
                num = num + 1
    print("有效处理数据: {}   探测数据: {}".format(len(dict_list), num))

此脚本使用到了yield和多层字典自动创建的功能,使用方法 脚本 + 日志名。


觉得不错,点个赞?
Sign in to leave a comment.
No Leanote account ? Sign up now.
0 条评论
文章目录