mirror of
https://github.com/zhinianboke/xianyu-auto-reply.git
synced 2025-08-01 12:07:36 +08:00
241 lines
7.5 KiB
Python
241 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
基于文件监控的日志收集器
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import time
|
||
import threading
|
||
from collections import deque
|
||
from datetime import datetime
|
||
from typing import List, Dict, Optional
|
||
from pathlib import Path
|
||
|
||
class FileLogCollector:
|
||
"""基于文件监控的日志收集器"""
|
||
|
||
def __init__(self, max_logs: int = 2000):
|
||
self.max_logs = max_logs
|
||
self.logs = deque(maxlen=max_logs)
|
||
self.lock = threading.Lock()
|
||
|
||
# 日志文件路径
|
||
self.log_file = None
|
||
self.last_position = 0
|
||
|
||
# 启动文件监控
|
||
self.setup_file_monitoring()
|
||
|
||
def setup_file_monitoring(self):
|
||
"""设置文件监控"""
|
||
# 查找日志文件
|
||
possible_files = [
|
||
"xianyu.log",
|
||
"app.log",
|
||
"system.log",
|
||
"logs/xianyu.log",
|
||
"logs/app.log"
|
||
]
|
||
|
||
for file_path in possible_files:
|
||
if os.path.exists(file_path):
|
||
self.log_file = file_path
|
||
break
|
||
|
||
if not self.log_file:
|
||
# 如果没有找到现有文件,创建一个新的
|
||
self.log_file = "realtime.log"
|
||
|
||
# 设置loguru输出到文件
|
||
self.setup_loguru_file_output()
|
||
|
||
# 启动文件监控线程
|
||
self.monitor_thread = threading.Thread(target=self.monitor_file, daemon=True)
|
||
self.monitor_thread.start()
|
||
|
||
def setup_loguru_file_output(self):
|
||
"""设置loguru输出到文件"""
|
||
try:
|
||
from loguru import logger
|
||
|
||
# 添加文件输出
|
||
logger.add(
|
||
self.log_file,
|
||
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} - {message}",
|
||
level="DEBUG",
|
||
rotation="10 MB",
|
||
retention="7 days",
|
||
enqueue=False, # 改为False,避免队列延迟
|
||
buffering=1 # 行缓冲,立即写入
|
||
)
|
||
|
||
logger.info("文件日志收集器已启动")
|
||
|
||
except ImportError:
|
||
pass
|
||
|
||
def monitor_file(self):
|
||
"""监控日志文件变化"""
|
||
while True:
|
||
try:
|
||
if os.path.exists(self.log_file):
|
||
# 获取文件大小
|
||
file_size = os.path.getsize(self.log_file)
|
||
|
||
if file_size > self.last_position:
|
||
# 读取新增内容
|
||
with open(self.log_file, 'r', encoding='utf-8') as f:
|
||
f.seek(self.last_position)
|
||
new_lines = f.readlines()
|
||
self.last_position = f.tell()
|
||
|
||
# 解析新增的日志行
|
||
for line in new_lines:
|
||
self.parse_log_line(line.strip())
|
||
|
||
time.sleep(0.5) # 每0.5秒检查一次
|
||
|
||
except Exception as e:
|
||
time.sleep(1) # 出错时等待1秒
|
||
|
||
def parse_log_line(self, line: str):
|
||
"""解析日志行"""
|
||
if not line:
|
||
return
|
||
|
||
try:
|
||
# 解析loguru格式的日志
|
||
# 格式: 2025-07-23 15:46:03.430 | INFO | __main__:debug_collector:70 - 消息
|
||
pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \| (\w+) \| ([^:]+):([^:]+):(\d+) - (.*)'
|
||
match = re.match(pattern, line)
|
||
|
||
if match:
|
||
timestamp_str, level, source, function, line_num, message = match.groups()
|
||
|
||
# 转换时间格式
|
||
try:
|
||
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S.%f')
|
||
except:
|
||
timestamp = datetime.now()
|
||
|
||
log_entry = {
|
||
"timestamp": timestamp.isoformat(),
|
||
"level": level,
|
||
"source": source,
|
||
"function": function,
|
||
"line": int(line_num),
|
||
"message": message
|
||
}
|
||
|
||
with self.lock:
|
||
self.logs.append(log_entry)
|
||
|
||
except Exception as e:
|
||
# 如果解析失败,作为普通消息处理
|
||
log_entry = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"level": "INFO",
|
||
"source": "system",
|
||
"function": "unknown",
|
||
"line": 0,
|
||
"message": line
|
||
}
|
||
|
||
with self.lock:
|
||
self.logs.append(log_entry)
|
||
|
||
def get_logs(self, lines: int = 200, level_filter: str = None, source_filter: str = None) -> List[Dict]:
|
||
"""获取日志记录"""
|
||
with self.lock:
|
||
logs_list = list(self.logs)
|
||
|
||
# 应用过滤器
|
||
if level_filter:
|
||
logs_list = [log for log in logs_list if log['level'] == level_filter]
|
||
|
||
if source_filter:
|
||
logs_list = [log for log in logs_list if source_filter.lower() in log['source'].lower()]
|
||
|
||
# 返回最后N行
|
||
return logs_list[-lines:] if len(logs_list) > lines else logs_list
|
||
|
||
def clear_logs(self):
|
||
"""清空日志"""
|
||
with self.lock:
|
||
self.logs.clear()
|
||
|
||
def get_stats(self) -> Dict:
|
||
"""获取日志统计信息"""
|
||
with self.lock:
|
||
total_logs = len(self.logs)
|
||
|
||
# 统计各级别日志数量
|
||
level_counts = {}
|
||
source_counts = {}
|
||
|
||
for log in self.logs:
|
||
level = log['level']
|
||
source = log['source']
|
||
|
||
level_counts[level] = level_counts.get(level, 0) + 1
|
||
source_counts[source] = source_counts.get(source, 0) + 1
|
||
|
||
return {
|
||
"total_logs": total_logs,
|
||
"level_counts": level_counts,
|
||
"source_counts": source_counts,
|
||
"max_capacity": self.max_logs,
|
||
"log_file": self.log_file
|
||
}
|
||
|
||
|
||
# 全局文件日志收集器实例
|
||
_file_collector = None
|
||
_file_collector_lock = threading.Lock()
|
||
|
||
|
||
def get_file_log_collector() -> FileLogCollector:
|
||
"""获取全局文件日志收集器实例"""
|
||
global _file_collector
|
||
|
||
if _file_collector is None:
|
||
with _file_collector_lock:
|
||
if _file_collector is None:
|
||
_file_collector = FileLogCollector(max_logs=2000)
|
||
|
||
return _file_collector
|
||
|
||
|
||
def setup_file_logging():
|
||
"""设置文件日志系统"""
|
||
collector = get_file_log_collector()
|
||
return collector
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试文件日志收集器
|
||
collector = setup_file_logging()
|
||
|
||
# 生成一些测试日志
|
||
from loguru import logger
|
||
|
||
logger.info("文件日志收集器测试开始")
|
||
logger.debug("这是调试信息")
|
||
logger.warning("这是警告信息")
|
||
logger.error("这是错误信息")
|
||
logger.info("文件日志收集器测试结束")
|
||
|
||
# 等待文件写入和监控
|
||
time.sleep(2)
|
||
|
||
# 获取日志
|
||
logs = collector.get_logs(10)
|
||
print(f"收集到 {len(logs)} 条日志:")
|
||
for log in logs:
|
||
print(f" [{log['level']}] {log['source']}: {log['message']}")
|
||
|
||
# 获取统计信息
|
||
stats = collector.get_stats()
|
||
print(f"\n统计信息: {stats}")
|