xianyu-auto-reply/file_log_collector.py
2025-07-25 10:30:33 +08:00

241 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
基于文件监控的日志收集器
"""
import os
import re
import time
import threading
from collections import deque
from datetime import datetime
from typing import List, Dict, Optional
from pathlib import Path
class FileLogCollector:
"""基于文件监控的日志收集器"""
def __init__(self, max_logs: int = 2000):
self.max_logs = max_logs
self.logs = deque(maxlen=max_logs)
self.lock = threading.Lock()
# 日志文件路径
self.log_file = None
self.last_position = 0
# 启动文件监控
self.setup_file_monitoring()
def setup_file_monitoring(self):
"""设置文件监控"""
# 查找日志文件
possible_files = [
"xianyu.log",
"app.log",
"system.log",
"logs/xianyu.log",
"logs/app.log"
]
for file_path in possible_files:
if os.path.exists(file_path):
self.log_file = file_path
break
if not self.log_file:
# 如果没有找到现有文件,创建一个新的
self.log_file = "realtime.log"
# 设置loguru输出到文件
self.setup_loguru_file_output()
# 启动文件监控线程
self.monitor_thread = threading.Thread(target=self.monitor_file, daemon=True)
self.monitor_thread.start()
def setup_loguru_file_output(self):
"""设置loguru输出到文件"""
try:
from loguru import logger
# 添加文件输出
logger.add(
self.log_file,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} - {message}",
level="DEBUG",
rotation="10 MB",
retention="7 days",
enqueue=False, # 改为False避免队列延迟
buffering=1 # 行缓冲,立即写入
)
logger.info("文件日志收集器已启动")
except ImportError:
pass
def monitor_file(self):
"""监控日志文件变化"""
while True:
try:
if os.path.exists(self.log_file):
# 获取文件大小
file_size = os.path.getsize(self.log_file)
if file_size > self.last_position:
# 读取新增内容
with open(self.log_file, 'r', encoding='utf-8') as f:
f.seek(self.last_position)
new_lines = f.readlines()
self.last_position = f.tell()
# 解析新增的日志行
for line in new_lines:
self.parse_log_line(line.strip())
time.sleep(0.5) # 每0.5秒检查一次
except Exception as e:
time.sleep(1) # 出错时等待1秒
def parse_log_line(self, line: str):
"""解析日志行"""
if not line:
return
try:
# 解析loguru格式的日志
# 格式: 2025-07-23 15:46:03.430 | INFO | __main__:debug_collector:70 - 消息
pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \| (\w+) \| ([^:]+):([^:]+):(\d+) - (.*)'
match = re.match(pattern, line)
if match:
timestamp_str, level, source, function, line_num, message = match.groups()
# 转换时间格式
try:
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S.%f')
except:
timestamp = datetime.now()
log_entry = {
"timestamp": timestamp.isoformat(),
"level": level,
"source": source,
"function": function,
"line": int(line_num),
"message": message
}
with self.lock:
self.logs.append(log_entry)
except Exception as e:
# 如果解析失败,作为普通消息处理
log_entry = {
"timestamp": datetime.now().isoformat(),
"level": "INFO",
"source": "system",
"function": "unknown",
"line": 0,
"message": line
}
with self.lock:
self.logs.append(log_entry)
def get_logs(self, lines: int = 200, level_filter: str = None, source_filter: str = None) -> List[Dict]:
"""获取日志记录"""
with self.lock:
logs_list = list(self.logs)
# 应用过滤器
if level_filter:
logs_list = [log for log in logs_list if log['level'] == level_filter]
if source_filter:
logs_list = [log for log in logs_list if source_filter.lower() in log['source'].lower()]
# 返回最后N行
return logs_list[-lines:] if len(logs_list) > lines else logs_list
def clear_logs(self):
"""清空日志"""
with self.lock:
self.logs.clear()
def get_stats(self) -> Dict:
"""获取日志统计信息"""
with self.lock:
total_logs = len(self.logs)
# 统计各级别日志数量
level_counts = {}
source_counts = {}
for log in self.logs:
level = log['level']
source = log['source']
level_counts[level] = level_counts.get(level, 0) + 1
source_counts[source] = source_counts.get(source, 0) + 1
return {
"total_logs": total_logs,
"level_counts": level_counts,
"source_counts": source_counts,
"max_capacity": self.max_logs,
"log_file": self.log_file
}
# 全局文件日志收集器实例
_file_collector = None
_file_collector_lock = threading.Lock()
def get_file_log_collector() -> FileLogCollector:
"""获取全局文件日志收集器实例"""
global _file_collector
if _file_collector is None:
with _file_collector_lock:
if _file_collector is None:
_file_collector = FileLogCollector(max_logs=2000)
return _file_collector
def setup_file_logging():
"""设置文件日志系统"""
collector = get_file_log_collector()
return collector
if __name__ == "__main__":
# 测试文件日志收集器
collector = setup_file_logging()
# 生成一些测试日志
from loguru import logger
logger.info("文件日志收集器测试开始")
logger.debug("这是调试信息")
logger.warning("这是警告信息")
logger.error("这是错误信息")
logger.info("文件日志收集器测试结束")
# 等待文件写入和监控
time.sleep(2)
# 获取日志
logs = collector.get_logs(10)
print(f"收集到 {len(logs)} 条日志:")
for log in logs:
print(f" [{log['level']}] {log['source']}: {log['message']}")
# 获取统计信息
stats = collector.get_stats()
print(f"\n统计信息: {stats}")