百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Python实用案例之实时日志关键词监控系统

ztj100 2025-03-14 22:34 16 浏览 0 评论

实时日志关键词监控系统代码如下:

"""

实时日志关键词监控系统

功能:持续跟踪日志文件,识别预设关键词模式并触发报警

特性:

- 支持多关键词分级报警

- 处理日志轮转(rotation)问题

- 上下文异常捕获

- 频率抑制机制

版本:v2.3

"""

import time

import re

from pathlib import Path

from collections import deque

import hashlib # 用于去重校验

class LogMonitor:

def __init__(self, log_path, patterns, context_lines=5):

"""

初始化日志监视器

:param log_path: 日志文件路径

:param patterns: 监控模式字典

示例:{'ERROR': r'\b(error|exception)\b', 'WARN': r'warning'}

:param context_lines: 异常上下文捕获行数

"""

self.log_path = Path(log_path)

self.patterns = {k: re.compile(v, re.IGNORECASE) for k, v in patterns.items()}

self.context_lines = context_lines

self.position = 0 # 记录已读取的文件位置

self.history = deque(maxlen=1000) # 保存最近日志用于上下文

self.alert_cache = set() # 报警去重缓存

self._validate_path()

def _validate_path(self):

"""路径合法性验证"""

if not self.log_path.exists():

raise FileNotFoundError(f"日志文件 {self.log_path} 不存在")

if not self.log_path.is_file():

raise IsADirectoryError(f"{self.log_path} 是目录而非文件")

def _handle_log_rotation(self):

"""

检测并处理日志轮转情况

原理:当文件inode或大小小于上次记录时,判定为发生轮转

"""

current_inode = self.log_path.stat().st_ino

current_size = self.log_path.stat().st_size

if current_size < self.position or current_inode != self._last_inode:

print(f"检测到日志轮转,重置读取位置 (原inode:{self._last_inode} 新inode:{current_inode})")

self.position = 0

self.history.clear()

self._last_inode = current_inode

def tail_log(self):

"""读取新增日志内容"""

self._handle_log_rotation() # 每次读取前检查轮转

with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:

f.seek(self.position)

new_lines = f.readlines()

self.position = f.tell() # 更新读取位置

# 处理Windows换行符差异

if new_lines and '\r' in new_lines[0]:

new_lines = [line.replace('\r', '') for line in new_lines]

return new_lines

def analyze_lines(self, lines):

"""

分析日志行并生成报警

返回结构:{告警级别: [匹配行详情]}

"""

alerts = {}

for line in lines:

line = line.strip()

if not line:

continue

self.history.append(line) # 更新上下文缓存

# 生成当前行指纹用于去重

line_hash = hashlib.md5(line.encode()).hexdigest()

if line_hash in self.alert_cache:

continue

# 多模式匹配

for level, pattern in self.patterns.items():

if pattern.search(line):

context = self._get_context(line)

alerts.setdefault(level, []).append({

'raw': line,

'context': context,

'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')

})

self.alert_cache.add(line_hash)

break # 匹配到高级别后停止检查低级别

return alerts

def _get_context(self, current_line):

"""

获取异常上下文

返回当前行附近的日志作为上下文

"""

context = []

window_size = self.context_lines // 2

# 向前追溯

for line in reversed(self.history):

if line == current_line:

break

context.insert(0, line)

if len(context) >= window_size:

break

# 添加当前行

context.append(f">>> {current_line} <<<")

# 向后查找(由于实时监控,新日志可能还未到达)

return context[:self.context_lines]

def continuous_monitor(self, interval=10, burst_threshold=5):

"""

持续监控入口

:param interval: 检查间隔(秒)

:param burst_threshold: 突发报警阈值(1分钟内同类型报警次数)

"""

burst_counter = {} # 突发计数器 {'ERROR': 5}

while True:

try:

new_lines = self.tail_log()

if new_lines:

alerts = self.analyze_lines(new_lines)

# 处理分级报警

for level, items in alerts.items():

# 突发检测

burst_counter[level] = burst_counter.get(level, 0) + len(items)

if burst_counter[level] > burst_threshold:

self.send_alert(

f"[突发报警] {level}级别报警在1分钟内达到{burst_counter[level]}次",

level='CRITICAL'

)

burst_counter[level] = 0 # 重置计数器

# 发送单个报警

for item in items:

self.send_alert(

f"[{level}] 检测到异常\n"

f"时间: {item['timestamp']}\n"

f"内容: {item['raw']}\n"

f"上下文:\n" + "\n".join(item['context']),

level=level

)

# 清理过期突发计数

for k in list(burst_counter.keys()):

burst_counter[k] = max(0, burst_counter[k] - len(new_lines))

time.sleep(interval)

except KeyboardInterrupt:

print("监控终止")

break

except Exception as e:

self.send_alert(f"监控器自身异常: {str(e)}", level='CRITICAL')

time.sleep(60) # 避免频繁报错

def send_alert(self, message, level='ERROR'):

"""发送报警信息(示例:打印到控制台)"""

# 实际应替换为邮件/钉钉/企业微信等发送逻辑

print(f"\n{'!' * 20} [{level}级别报警] {'!' * 20}\n{message}\n{'!' * 50}\n")

if __name__ == '__main__':

# 示例配置

monitor = LogMonitor(

log_path="/var/log/syslog",

patterns={

'CRITICAL': r'(oom|out of memory|segmentation fault)',

'ERROR': r'(error|exception)',

'WARN': r'(warn|timeout)',

'AUTH': r'(authentication failed|invalid user)'

},

context_lines=7

)

monitor.continuous_monitor(interval=15)

代码解析

  1. 日志轮转处理机制

"""

日志轮转检测原理:

1. 记录文件的inode编号和大小

2. 当检测到当前inode变化 或 文件大小小于上次读取位置时:

- 判定发生日志轮转

- 重置读取位置

- 清空上下文缓存

教学重点:

- 理解inode在文件系统中的作用

- 掌握日志轮转的常见处理方式

- 处理多日志文件场景(如access.log -> access.log.1)

"""

  1. 上下文捕获逻辑

"""

上下文获取策略:

1. 维护固定长度的历史队列(deque)

2. 当发现异常行时:

- 向前追溯获取上文

- 包含当前异常行

- 后续新日志自动作为下文

优势:

- 无需重复读取文件

- 实时更新上下文内容

生产建议:

- 根据日志密度调整上下文行数

- 添加时间范围限制(如最多30秒内的日志)

"""

  1. 报警频率控制

"""

防骚扰机制:

1. MD5去重:相同内容不重复报警

2. 突发检测:1分钟内同类型报警超过阈值时合并通知

3. 计数器衰减:每次检查后减少计数

扩展思路:

- 添加静默期设置(如已报错的服务30分钟内不再提醒)

- 实现报警升级策略(多次相同报警→更高优先级)

"""

  1. 编码处理技巧

"""

with open(..., errors='ignore') as f:

# 处理非UTF8字符问题

日志编码处理:

- 使用errors='ignore'跳过非法字符

- 检测文件编码(使用chardet库)

- 转换到统一编码(如UTF-8)

"""

运维实践指南

  1. 生产环境增强建议

# 在analyze_lines()中添加:

# 1. 业务白名单过滤

if any(ignore in line for ignore in ['expected error', 'test environment']):

continue

# 2. 动态模式加载

def reload_patterns(self):

"""运行时重载匹配规则"""

with open('patterns.yaml') as f:

self.patterns = yaml.safe_load(f)

  1. 性能优化技巧

"""

1. 使用更高效的数据结构:

- 将self.history改为双向链表实现

- 使用Bloom Filter替代MD5去重

2. IO优化:

- 使用inotify机制(需安装pyinotify)替代轮询

- 增大读取缓冲区

"""

  1. 监控器自保护

# 在continuous_monitor()中添加:

# 1. 资源限制

self.process = psutil.Process()

if self.process.memory_percent() > 30:

self.send_alert("监控器内存占用过高!", level='CRITICAL')

# 2. 看门狗机制

with open('/tmp/monitor_heartbeat', 'w') as f:

f.write(time.strftime('%Y-%m-%d %H:%M:%S'))

  1. 日志样例测试

# 生成测试日志

for i in {1..100}; do

echo "[$(date)] This is a test error message $i" >> test.log

sleep 0.1

done

扩展功能建议

  1. 多日志源支持

def add_log_source(self, new_path):

"""动态添加监控日志"""

self.additional_sources.append(LogMonitor(new_path, self.patterns))

  1. 智能模式学习

from sklearn.feature_extraction.text import TfidfVectorizer

def learn_anomaly_patterns(self):

"""基于历史日志训练异常检测模型"""

vectorizer = TfidfVectorizer()

X = vectorizer.fit_transform(self.history)

# 训练异常检测模型...

  1. 与工单系统集成

def create_ticket(self, alert):

"""自动创建运维工单"""

jira.create_issue(

project='OPS',

summary=f"[自动工单] {alert['level']}级别日志报警",

description=alert['context']

)

相关推荐

前端案例·程序员的浪漫:流星雨背景

如果文章对你有收获,还请不要吝啬【点赞收藏评论】三连哦,你的鼓励将是我成长助力之一!谢谢!(1)方式1:简单版本【1】先看实现效果...

UI样式iPod classic的HTML本地音乐播放器框架

PS:音量可以鼠标点击按住在音量图标边的轮盘上下拖拽滑动音量大小中心按钮可以更改播放器为白色...

JavaScript 强制回流问题及优化方案

JavaScript代码在运行过程中可能会强制触发浏览器的回流(Reflow)...

Ai 编辑器 Cursor 零基础教程:推箱子小游戏实战演练

最近Ai火的同时,Ai编辑器Cursor同样火了一把。今天我们就白漂一下Cursor,使用免费版本搞一个零基础教程...

19年前司机被沉尸水库!凶手落网,竟已是身家千万的大老板

]|\[sS])*"|'(?:[^\']|\[sS])*'|[^)}]+)s*)/g,l=window.testenv_reshost||window.__moon_host||"res.wx.qq...

全民健身网络热度调查“居家健身”成为第一网络热词

]|\[sS])*"|'(?:[^\']|\[sS])*'|[^)}]+)s*)/g,l=window.testenv_reshost||window.__moon_host||"res.wx.qq...

取代JavaScript库的10个现代Web API及详细实施代码

为什么浏览器内置的API你还在用某个臃肿的Javascript库呢?用内置的API有什么好处呢?Web平台经历了巨大演进,引入了强大的原生API,不再需要臃肿的JavaScript库。现代浏览器现已支...

前端文件下载的N种姿势:从简单到高级

文件下载是web开发里一个非常常见的功能,无论是下载用户生成的数据、图片、文档还是应用程序包。前端开发者有多种方式来实现这一需求,每种方式都有其适用场景和优缺点。介绍下几种比较常用的文件下载方法。...

JavaScript 性能优化方法(js前端性能优化)

JavaScript性能优化方法减少DOM操作频繁的DOM操作会导致浏览器重绘和回流,影响性能。使用文档片段(DocumentFragment)或虚拟DOM技术减少直接操作。...

DOM节点的创建、插入、删除、查找、替换

在前端开发中,js与html联系最紧密的莫过于对DOM的操作了,本文为大家分享一些DOM节点的基本操作。一、创建DOM节点使用的命令是varoDiv=document.createElement...

前端里的拖拖拽拽(拖拽式前端框架)

最近在项目中使用了react-dnd,一个基于HTML5的拖拽库,“拖拽能力”丰富了前端的交互方式,基于拖拽能力,会扩展各种各样的拖拽反馈效果,因此有必要学习了解,最好的学习方式就是实操!...

大模型实战:Flask+H5三件套实现大模型基础聊天界面

本文使用Flask和H5三件套(HTML+JS+CSS)实现大模型聊天应用的基本方式话不多说,先贴上实现效果:流式输出:思考输出:聊天界面模型设置:模型设置会话切换:前言大模型的聊天应用从功能...

SSE前端(sse前端数据)

<!DOCTYPEhtml><htmllang="zh-CN"><head>...

课堂点名总尴尬?试试 DeepSeek,或能实现点名自由!(附教程)

2025年2月26日"你有没有经历过这样的场景?老师拿着花名册扫视全班:'今天我们来点名...'那一刻心跳加速,默念:'别点我!'但现在,我要...

我会在每个项目中复制这10个JS代码片段

你是否也有这种感觉:在搭建新项目时,你会想:"这个函数我是不是已经写过了...在某个地方?"是的——我也是。所以在开发了数十个React、Node和全栈应用后,我不再重复造轮子。我创建...

取消回复欢迎 发表评论: