百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Python实用案例之实时日志关键词监控系统

ztj100 2025-03-14 22:34 21 浏览 0 评论

实时日志关键词监控系统代码如下:

"""

实时日志关键词监控系统

功能:持续跟踪日志文件,识别预设关键词模式并触发报警

特性:

- 支持多关键词分级报警

- 处理日志轮转(rotation)问题

- 上下文异常捕获

- 频率抑制机制

版本:v2.3

"""

import time

import re

from pathlib import Path

from collections import deque

import hashlib # 用于去重校验

class LogMonitor:

def __init__(self, log_path, patterns, context_lines=5):

"""

初始化日志监视器

:param log_path: 日志文件路径

:param patterns: 监控模式字典

示例:{'ERROR': r'\b(error|exception)\b', 'WARN': r'warning'}

:param context_lines: 异常上下文捕获行数

"""

self.log_path = Path(log_path)

self.patterns = {k: re.compile(v, re.IGNORECASE) for k, v in patterns.items()}

self.context_lines = context_lines

self.position = 0 # 记录已读取的文件位置

self.history = deque(maxlen=1000) # 保存最近日志用于上下文

self.alert_cache = set() # 报警去重缓存

self._validate_path()

def _validate_path(self):

"""路径合法性验证"""

if not self.log_path.exists():

raise FileNotFoundError(f"日志文件 {self.log_path} 不存在")

if not self.log_path.is_file():

raise IsADirectoryError(f"{self.log_path} 是目录而非文件")

def _handle_log_rotation(self):

"""

检测并处理日志轮转情况

原理:当文件inode或大小小于上次记录时,判定为发生轮转

"""

current_inode = self.log_path.stat().st_ino

current_size = self.log_path.stat().st_size

if current_size < self.position or current_inode != self._last_inode:

print(f"检测到日志轮转,重置读取位置 (原inode:{self._last_inode} 新inode:{current_inode})")

self.position = 0

self.history.clear()

self._last_inode = current_inode

def tail_log(self):

"""读取新增日志内容"""

self._handle_log_rotation() # 每次读取前检查轮转

with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:

f.seek(self.position)

new_lines = f.readlines()

self.position = f.tell() # 更新读取位置

# 处理Windows换行符差异

if new_lines and '\r' in new_lines[0]:

new_lines = [line.replace('\r', '') for line in new_lines]

return new_lines

def analyze_lines(self, lines):

"""

分析日志行并生成报警

返回结构:{告警级别: [匹配行详情]}

"""

alerts = {}

for line in lines:

line = line.strip()

if not line:

continue

self.history.append(line) # 更新上下文缓存

# 生成当前行指纹用于去重

line_hash = hashlib.md5(line.encode()).hexdigest()

if line_hash in self.alert_cache:

continue

# 多模式匹配

for level, pattern in self.patterns.items():

if pattern.search(line):

context = self._get_context(line)

alerts.setdefault(level, []).append({

'raw': line,

'context': context,

'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')

})

self.alert_cache.add(line_hash)

break # 匹配到高级别后停止检查低级别

return alerts

def _get_context(self, current_line):

"""

获取异常上下文

返回当前行附近的日志作为上下文

"""

context = []

window_size = self.context_lines // 2

# 向前追溯

for line in reversed(self.history):

if line == current_line:

break

context.insert(0, line)

if len(context) >= window_size:

break

# 添加当前行

context.append(f">>> {current_line} <<<")

# 向后查找(由于实时监控,新日志可能还未到达)

return context[:self.context_lines]

def continuous_monitor(self, interval=10, burst_threshold=5):

"""

持续监控入口

:param interval: 检查间隔(秒)

:param burst_threshold: 突发报警阈值(1分钟内同类型报警次数)

"""

burst_counter = {} # 突发计数器 {'ERROR': 5}

while True:

try:

new_lines = self.tail_log()

if new_lines:

alerts = self.analyze_lines(new_lines)

# 处理分级报警

for level, items in alerts.items():

# 突发检测

burst_counter[level] = burst_counter.get(level, 0) + len(items)

if burst_counter[level] > burst_threshold:

self.send_alert(

f"[突发报警] {level}级别报警在1分钟内达到{burst_counter[level]}次",

level='CRITICAL'

)

burst_counter[level] = 0 # 重置计数器

# 发送单个报警

for item in items:

self.send_alert(

f"[{level}] 检测到异常\n"

f"时间: {item['timestamp']}\n"

f"内容: {item['raw']}\n"

f"上下文:\n" + "\n".join(item['context']),

level=level

)

# 清理过期突发计数

for k in list(burst_counter.keys()):

burst_counter[k] = max(0, burst_counter[k] - len(new_lines))

time.sleep(interval)

except KeyboardInterrupt:

print("监控终止")

break

except Exception as e:

self.send_alert(f"监控器自身异常: {str(e)}", level='CRITICAL')

time.sleep(60) # 避免频繁报错

def send_alert(self, message, level='ERROR'):

"""发送报警信息(示例:打印到控制台)"""

# 实际应替换为邮件/钉钉/企业微信等发送逻辑

print(f"\n{'!' * 20} [{level}级别报警] {'!' * 20}\n{message}\n{'!' * 50}\n")

if __name__ == '__main__':

# 示例配置

monitor = LogMonitor(

log_path="/var/log/syslog",

patterns={

'CRITICAL': r'(oom|out of memory|segmentation fault)',

'ERROR': r'(error|exception)',

'WARN': r'(warn|timeout)',

'AUTH': r'(authentication failed|invalid user)'

},

context_lines=7

)

monitor.continuous_monitor(interval=15)

代码解析

  1. 日志轮转处理机制

"""

日志轮转检测原理:

1. 记录文件的inode编号和大小

2. 当检测到当前inode变化 或 文件大小小于上次读取位置时:

- 判定发生日志轮转

- 重置读取位置

- 清空上下文缓存

教学重点:

- 理解inode在文件系统中的作用

- 掌握日志轮转的常见处理方式

- 处理多日志文件场景(如access.log -> access.log.1)

"""

  1. 上下文捕获逻辑

"""

上下文获取策略:

1. 维护固定长度的历史队列(deque)

2. 当发现异常行时:

- 向前追溯获取上文

- 包含当前异常行

- 后续新日志自动作为下文

优势:

- 无需重复读取文件

- 实时更新上下文内容

生产建议:

- 根据日志密度调整上下文行数

- 添加时间范围限制(如最多30秒内的日志)

"""

  1. 报警频率控制

"""

防骚扰机制:

1. MD5去重:相同内容不重复报警

2. 突发检测:1分钟内同类型报警超过阈值时合并通知

3. 计数器衰减:每次检查后减少计数

扩展思路:

- 添加静默期设置(如已报错的服务30分钟内不再提醒)

- 实现报警升级策略(多次相同报警→更高优先级)

"""

  1. 编码处理技巧

"""

with open(..., errors='ignore') as f:

# 处理非UTF8字符问题

日志编码处理:

- 使用errors='ignore'跳过非法字符

- 检测文件编码(使用chardet库)

- 转换到统一编码(如UTF-8)

"""

运维实践指南

  1. 生产环境增强建议

# 在analyze_lines()中添加:

# 1. 业务白名单过滤

if any(ignore in line for ignore in ['expected error', 'test environment']):

continue

# 2. 动态模式加载

def reload_patterns(self):

"""运行时重载匹配规则"""

with open('patterns.yaml') as f:

self.patterns = yaml.safe_load(f)

  1. 性能优化技巧

"""

1. 使用更高效的数据结构:

- 将self.history改为双向链表实现

- 使用Bloom Filter替代MD5去重

2. IO优化:

- 使用inotify机制(需安装pyinotify)替代轮询

- 增大读取缓冲区

"""

  1. 监控器自保护

# 在continuous_monitor()中添加:

# 1. 资源限制

self.process = psutil.Process()

if self.process.memory_percent() > 30:

self.send_alert("监控器内存占用过高!", level='CRITICAL')

# 2. 看门狗机制

with open('/tmp/monitor_heartbeat', 'w') as f:

f.write(time.strftime('%Y-%m-%d %H:%M:%S'))

  1. 日志样例测试

# 生成测试日志

for i in {1..100}; do

echo "[$(date)] This is a test error message $i" >> test.log

sleep 0.1

done

扩展功能建议

  1. 多日志源支持

def add_log_source(self, new_path):

"""动态添加监控日志"""

self.additional_sources.append(LogMonitor(new_path, self.patterns))

  1. 智能模式学习

from sklearn.feature_extraction.text import TfidfVectorizer

def learn_anomaly_patterns(self):

"""基于历史日志训练异常检测模型"""

vectorizer = TfidfVectorizer()

X = vectorizer.fit_transform(self.history)

# 训练异常检测模型...

  1. 与工单系统集成

def create_ticket(self, alert):

"""自动创建运维工单"""

jira.create_issue(

project='OPS',

summary=f"[自动工单] {alert['level']}级别日志报警",

description=alert['context']

)

相关推荐

sharding-jdbc实现`分库分表`与`读写分离`

一、前言本文将基于以下环境整合...

三分钟了解mysql中主键、外键、非空、唯一、默认约束是什么

在数据库中,数据表是数据库中最重要、最基本的操作对象,是数据存储的基本单位。数据表被定义为列的集合,数据在表中是按照行和列的格式来存储的。每一行代表一条唯一的记录,每一列代表记录中的一个域。...

MySQL8行级锁_mysql如何加行级锁

MySQL8行级锁版本:8.0.34基本概念...

mysql使用小技巧_mysql使用入门

1、MySQL中有许多很实用的函数,好好利用它们可以省去很多时间:group_concat()将取到的值用逗号连接,可以这么用:selectgroup_concat(distinctid)fr...

MySQL/MariaDB中如何支持全部的Unicode?

永远不要在MySQL中使用utf8,并且始终使用utf8mb4。utf8mb4介绍MySQL/MariaDB中,utf8字符集并不是对Unicode的真正实现,即不是真正的UTF-8编码,因...

聊聊 MySQL Server 可执行注释,你懂了吗?

前言MySQLServer当前支持如下3种注释风格:...

MySQL系列-源码编译安装(v5.7.34)

一、系统环境要求...

MySQL的锁就锁住我啦!与腾讯大佬的技术交谈,是我小看它了

对酒当歌,人生几何!朝朝暮暮,唯有己脱。苦苦寻觅找工作之间,殊不知今日之事乃我心之痛,难道是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一...

MySQL字符问题_mysql中字符串的位置

中文写入乱码问题:我输入的中文编码是urf8的,建的库是urf8的,但是插入mysql总是乱码,一堆"???????????????????????"我用的是ibatis,终于找到原因了,我是这么解决...

深圳尚学堂:mysql基本sql语句大全(三)

数据开发-经典1.按姓氏笔画排序:Select*FromTableNameOrderByCustomerNameCollateChinese_PRC_Stroke_ci_as//从少...

MySQL进行行级锁的?一会next-key锁,一会间隙锁,一会记录锁?

大家好,是不是很多人都对MySQL加行级锁的规则搞的迷迷糊糊,一会是next-key锁,一会是间隙锁,一会又是记录锁。坦白说,确实还挺复杂的,但是好在我找点了点规律,也知道如何如何用命令分析加...

一文讲清怎么利用Python Django实现Excel数据表的导入导出功能

摘要:Python作为一门简单易学且功能强大的编程语言,广受程序员、数据分析师和AI工程师的青睐。本文系统讲解了如何使用Python的Django框架结合openpyxl库实现Excel...

用DataX实现两个MySQL实例间的数据同步

DataXDataX使用Java实现。如果可以实现数据库实例之间准实时的...

MySQL数据库知识_mysql数据库基础知识

MySQL是一种关系型数据库管理系统;那废话不多说,直接上自己以前学习整理文档:查看数据库命令:(1).查看存储过程状态:showprocedurestatus;(2).显示系统变量:show...

如何为MySQL中的JSON字段设置索引

背景MySQL在2015年中发布的5.7.8版本中首次引入了JSON数据类型。自此,它成了一种逃离严格列定义的方式,可以存储各种形状和大小的JSON文档,例如审计日志、配置信息、第三方数据包、用户自定...

取消回复欢迎 发表评论: