百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Python实用案例之实时日志关键词监控系统

ztj100 2025-03-14 22:34 13 浏览 0 评论

实时日志关键词监控系统代码如下:

"""

实时日志关键词监控系统

功能:持续跟踪日志文件,识别预设关键词模式并触发报警

特性:

- 支持多关键词分级报警

- 处理日志轮转(rotation)问题

- 上下文异常捕获

- 频率抑制机制

版本:v2.3

"""

import time

import re

from pathlib import Path

from collections import deque

import hashlib # 用于去重校验

class LogMonitor:

def __init__(self, log_path, patterns, context_lines=5):

"""

初始化日志监视器

:param log_path: 日志文件路径

:param patterns: 监控模式字典

示例:{'ERROR': r'\b(error|exception)\b', 'WARN': r'warning'}

:param context_lines: 异常上下文捕获行数

"""

self.log_path = Path(log_path)

self.patterns = {k: re.compile(v, re.IGNORECASE) for k, v in patterns.items()}

self.context_lines = context_lines

self.position = 0 # 记录已读取的文件位置

self.history = deque(maxlen=1000) # 保存最近日志用于上下文

self.alert_cache = set() # 报警去重缓存

self._validate_path()

def _validate_path(self):

"""路径合法性验证"""

if not self.log_path.exists():

raise FileNotFoundError(f"日志文件 {self.log_path} 不存在")

if not self.log_path.is_file():

raise IsADirectoryError(f"{self.log_path} 是目录而非文件")

def _handle_log_rotation(self):

"""

检测并处理日志轮转情况

原理:当文件inode或大小小于上次记录时,判定为发生轮转

"""

current_inode = self.log_path.stat().st_ino

current_size = self.log_path.stat().st_size

if current_size < self.position or current_inode != self._last_inode:

print(f"检测到日志轮转,重置读取位置 (原inode:{self._last_inode} 新inode:{current_inode})")

self.position = 0

self.history.clear()

self._last_inode = current_inode

def tail_log(self):

"""读取新增日志内容"""

self._handle_log_rotation() # 每次读取前检查轮转

with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:

f.seek(self.position)

new_lines = f.readlines()

self.position = f.tell() # 更新读取位置

# 处理Windows换行符差异

if new_lines and '\r' in new_lines[0]:

new_lines = [line.replace('\r', '') for line in new_lines]

return new_lines

def analyze_lines(self, lines):

"""

分析日志行并生成报警

返回结构:{告警级别: [匹配行详情]}

"""

alerts = {}

for line in lines:

line = line.strip()

if not line:

continue

self.history.append(line) # 更新上下文缓存

# 生成当前行指纹用于去重

line_hash = hashlib.md5(line.encode()).hexdigest()

if line_hash in self.alert_cache:

continue

# 多模式匹配

for level, pattern in self.patterns.items():

if pattern.search(line):

context = self._get_context(line)

alerts.setdefault(level, []).append({

'raw': line,

'context': context,

'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')

})

self.alert_cache.add(line_hash)

break # 匹配到高级别后停止检查低级别

return alerts

def _get_context(self, current_line):

"""

获取异常上下文

返回当前行附近的日志作为上下文

"""

context = []

window_size = self.context_lines // 2

# 向前追溯

for line in reversed(self.history):

if line == current_line:

break

context.insert(0, line)

if len(context) >= window_size:

break

# 添加当前行

context.append(f">>> {current_line} <<<")

# 向后查找(由于实时监控,新日志可能还未到达)

return context[:self.context_lines]

def continuous_monitor(self, interval=10, burst_threshold=5):

"""

持续监控入口

:param interval: 检查间隔(秒)

:param burst_threshold: 突发报警阈值(1分钟内同类型报警次数)

"""

burst_counter = {} # 突发计数器 {'ERROR': 5}

while True:

try:

new_lines = self.tail_log()

if new_lines:

alerts = self.analyze_lines(new_lines)

# 处理分级报警

for level, items in alerts.items():

# 突发检测

burst_counter[level] = burst_counter.get(level, 0) + len(items)

if burst_counter[level] > burst_threshold:

self.send_alert(

f"[突发报警] {level}级别报警在1分钟内达到{burst_counter[level]}次",

level='CRITICAL'

)

burst_counter[level] = 0 # 重置计数器

# 发送单个报警

for item in items:

self.send_alert(

f"[{level}] 检测到异常\n"

f"时间: {item['timestamp']}\n"

f"内容: {item['raw']}\n"

f"上下文:\n" + "\n".join(item['context']),

level=level

)

# 清理过期突发计数

for k in list(burst_counter.keys()):

burst_counter[k] = max(0, burst_counter[k] - len(new_lines))

time.sleep(interval)

except KeyboardInterrupt:

print("监控终止")

break

except Exception as e:

self.send_alert(f"监控器自身异常: {str(e)}", level='CRITICAL')

time.sleep(60) # 避免频繁报错

def send_alert(self, message, level='ERROR'):

"""发送报警信息(示例:打印到控制台)"""

# 实际应替换为邮件/钉钉/企业微信等发送逻辑

print(f"\n{'!' * 20} [{level}级别报警] {'!' * 20}\n{message}\n{'!' * 50}\n")

if __name__ == '__main__':

# 示例配置

monitor = LogMonitor(

log_path="/var/log/syslog",

patterns={

'CRITICAL': r'(oom|out of memory|segmentation fault)',

'ERROR': r'(error|exception)',

'WARN': r'(warn|timeout)',

'AUTH': r'(authentication failed|invalid user)'

},

context_lines=7

)

monitor.continuous_monitor(interval=15)

代码解析

  1. 日志轮转处理机制

"""

日志轮转检测原理:

1. 记录文件的inode编号和大小

2. 当检测到当前inode变化 或 文件大小小于上次读取位置时:

- 判定发生日志轮转

- 重置读取位置

- 清空上下文缓存

教学重点:

- 理解inode在文件系统中的作用

- 掌握日志轮转的常见处理方式

- 处理多日志文件场景(如access.log -> access.log.1)

"""

  1. 上下文捕获逻辑

"""

上下文获取策略:

1. 维护固定长度的历史队列(deque)

2. 当发现异常行时:

- 向前追溯获取上文

- 包含当前异常行

- 后续新日志自动作为下文

优势:

- 无需重复读取文件

- 实时更新上下文内容

生产建议:

- 根据日志密度调整上下文行数

- 添加时间范围限制(如最多30秒内的日志)

"""

  1. 报警频率控制

"""

防骚扰机制:

1. MD5去重:相同内容不重复报警

2. 突发检测:1分钟内同类型报警超过阈值时合并通知

3. 计数器衰减:每次检查后减少计数

扩展思路:

- 添加静默期设置(如已报错的服务30分钟内不再提醒)

- 实现报警升级策略(多次相同报警→更高优先级)

"""

  1. 编码处理技巧

"""

with open(..., errors='ignore') as f:

# 处理非UTF8字符问题

日志编码处理:

- 使用errors='ignore'跳过非法字符

- 检测文件编码(使用chardet库)

- 转换到统一编码(如UTF-8)

"""

运维实践指南

  1. 生产环境增强建议

# 在analyze_lines()中添加:

# 1. 业务白名单过滤

if any(ignore in line for ignore in ['expected error', 'test environment']):

continue

# 2. 动态模式加载

def reload_patterns(self):

"""运行时重载匹配规则"""

with open('patterns.yaml') as f:

self.patterns = yaml.safe_load(f)

  1. 性能优化技巧

"""

1. 使用更高效的数据结构:

- 将self.history改为双向链表实现

- 使用Bloom Filter替代MD5去重

2. IO优化:

- 使用inotify机制(需安装pyinotify)替代轮询

- 增大读取缓冲区

"""

  1. 监控器自保护

# 在continuous_monitor()中添加:

# 1. 资源限制

self.process = psutil.Process()

if self.process.memory_percent() > 30:

self.send_alert("监控器内存占用过高!", level='CRITICAL')

# 2. 看门狗机制

with open('/tmp/monitor_heartbeat', 'w') as f:

f.write(time.strftime('%Y-%m-%d %H:%M:%S'))

  1. 日志样例测试

# 生成测试日志

for i in {1..100}; do

echo "[$(date)] This is a test error message $i" >> test.log

sleep 0.1

done

扩展功能建议

  1. 多日志源支持

def add_log_source(self, new_path):

"""动态添加监控日志"""

self.additional_sources.append(LogMonitor(new_path, self.patterns))

  1. 智能模式学习

from sklearn.feature_extraction.text import TfidfVectorizer

def learn_anomaly_patterns(self):

"""基于历史日志训练异常检测模型"""

vectorizer = TfidfVectorizer()

X = vectorizer.fit_transform(self.history)

# 训练异常检测模型...

  1. 与工单系统集成

def create_ticket(self, alert):

"""自动创建运维工单"""

jira.create_issue(

project='OPS',

summary=f"[自动工单] {alert['level']}级别日志报警",

description=alert['context']

)

相关推荐

30天学会Python编程:16. Python常用标准库使用教程

16.1collections模块16.1.1高级数据结构16.1.2示例...

强烈推荐!Python 这个宝藏库 re 正则匹配

Python的re模块(RegularExpression正则表达式)提供各种正则表达式的匹配操作。...

Python爬虫中正则表达式的用法,只讲如何应用,不讲原理

Python爬虫:正则的用法(非原理)。大家好,这节课给大家讲正则的实际用法,不讲原理,通俗易懂的讲如何用正则抓取内容。·导入re库,这里是需要从html这段字符串中提取出中间的那几个文字。实例一个对...

Python数据分析实战-正则提取文本的URL网址和邮箱(源码和效果)

实现功能:Python数据分析实战-利用正则表达式提取文本中的URL网址和邮箱...

python爬虫教程之爬取当当网 Top 500 本五星好评书籍

我们使用requests和re来写一个爬虫作为一个爱看书的你(说的跟真的似的)怎么能发现好书呢?所以我们爬取当当网的前500本好五星评书籍怎么样?ok接下来就是学习python的正确姿...

深入理解re模块:Python中的正则表达式神器解析

在Python中,"re"是一个强大的模块,用于处理正则表达式(regularexpressions)。正则表达式是一种强大的文本模式匹配工具,用于在字符串中查找、替换或提取特定模式...

如何使用正则表达式和 Python 匹配不以模式开头的字符串

需要在Python中使用正则表达式来匹配不以给定模式开头的字符串吗?如果是这样,你可以使用下面的语法来查找所有的字符串,除了那些不以https开始的字符串。r"^(?!https).*&...

先Mark后用!8分钟读懂 Python 性能优化

从本文总结了Python开发时,遇到的性能优化问题的定位和解决。概述:性能优化的原则——优化需要优化的部分。性能优化的一般步骤:首先,让你的程序跑起来结果一切正常。然后,运行这个结果正常的代码,看看它...

Python“三步”即可爬取,毋庸置疑

声明:本实例仅供学习,切忌遵守robots协议,请不要使用多线程等方式频繁访问网站。#第一步导入模块importreimportrequests#第二步获取你想爬取的网页地址,发送请求,获取网页内...

简单学Python——re库(正则表达式)2(split、findall、和sub)

1、split():分割字符串,返回列表语法:re.split('分隔符','目标字符串')例如:importrere.split(',','...

Lavazza拉瓦萨再度牵手上海大师赛

阅读此文前,麻烦您点击一下“关注”,方便您进行讨论和分享。Lavazza拉瓦萨再度牵手上海大师赛标题:2024上海大师赛:网球与咖啡的浪漫邂逅在2024年的上海劳力士大师赛上,拉瓦萨咖啡再次成为官...

ArkUI-X构建Android平台AAR及使用

本教程主要讲述如何利用ArkUI-XSDK完成AndroidAAR开发,实现基于ArkTS的声明式开发范式在android平台显示。包括:1.跨平台Library工程开发介绍...

Deepseek写歌详细教程(怎样用deepseek写歌功能)

以下为结合DeepSeek及相关工具实现AI写歌的详细教程,涵盖作词、作曲、演唱全流程:一、核心流程三步法1.AI生成歌词-打开DeepSeek(网页/APP/API),使用结构化提示词生成歌词:...

“AI说唱解说影视”走红,“零基础入行”靠谱吗?本报记者实测

“手里翻找冻鱼,精心的布局;老漠却不言语,脸上带笑意……”《狂飙》剧情被写成歌词,再配上“科目三”背景音乐的演唱,这段1分钟30秒的视频受到了无数网友的点赞。最近一段时间随着AI技术的发展,说唱解说影...

AI音乐制作神器揭秘!3款工具让你秒变高手

在音乐创作的领域里,每个人都有一颗想要成为大师的心。但是面对复杂的乐理知识和繁复的制作过程,许多人的热情被一点点消磨。...

取消回复欢迎 发表评论: