Files
cursor_ai/modules/logger.py

347 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Единый логгер для проекта SSH Client (PRD v1.8).
Поддержка файла, консоли и Telegram через TelegramHandler.
"""
import logging
import os
import queue
import sys
import threading
import time
from typing import Any, Dict, List, Optional, Union
# Имя корневого логгера проекта
ROOT_LOGGER_NAME = "ssh_client"
# Общий формат логов
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# Кастомный уровень STATUS (между INFO и WARNING)
STATUS_LEVEL = 25
logging.addLevelName(STATUS_LEVEL, "STATUS")
def status(self, message: str, *args, **kwargs) -> None:
if self.isEnabledFor(STATUS_LEVEL):
self._log(STATUS_LEVEL, message, args, **kwargs)
logging.Logger.status = status # type: ignore
_initialized = False
class TelegramHandler(logging.Handler):
"""
Отправляет логи в Telegram через очередь (асинхронно).
Retry при ошибках, разбивка длинных сообщений, /start для верификации.
"""
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__()
self._config = config
self._suppress_levels: List[str] = []
batch = config.get("batch_notifications", {})
if batch.get("enabled"):
self._suppress_levels = [str(x).upper() for x in batch.get("suppress_individual_logs", [])]
self._queue: queue.Queue = queue.Queue(maxsize=1000)
self._bot_token = config.get("bot_token", "")
self._chat_id = str(config.get("chat_id", ""))
self._allowed_users: List[Any] = config.get("allowed_users", [])
self._allowed_levels: List[str] = [str(x).upper() for x in config.get("allowed_levels", ["ERROR"])]
retry = config.get("retry", {})
self._retry_attempts = retry.get("attempts", 3)
self._retry_delay = retry.get("delay", 5)
opts = config.get("options", {})
self._max_length = opts.get("max_message_length", 4096)
self._parse_mode = opts.get("parse_mode", "HTML")
self._disable_notification = opts.get("disable_notification", False)
self._enable_commands = config.get("enable_commands", False)
self._stop = threading.Event()
self._worker = threading.Thread(target=self._process_queue, daemon=True)
self._worker.start()
if self._enable_commands:
self._poll_thread = threading.Thread(target=self._poll_commands, daemon=True)
self._poll_thread.start()
def _level_allowed(self, record: logging.LogRecord) -> bool:
levelname = getattr(record, "levelname", record.levelname)
return str(levelname).upper() in self._allowed_levels
def emit(self, record: logging.LogRecord) -> None:
if not self._bot_token or not self._chat_id:
return
levelname = str(getattr(record, "levelname", record.levelname)).upper()
if self._suppress_levels and levelname in self._suppress_levels:
return
if not self._level_allowed(record):
return
try:
msg = self.format(record)
if self._parse_mode == "HTML":
msg = msg.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
self._queue.put_nowait(msg)
except queue.Full:
sys.stderr.write(f"[TelegramHandler] Очередь переполнена, сообщение пропущено\n")
except Exception:
self.handleError(record)
def _process_queue(self) -> None:
while not self._stop.is_set():
try:
msg = self._queue.get(timeout=0.5)
if msg:
self._send_with_retry(msg)
except queue.Empty:
continue
except Exception as e:
sys.stderr.write(f"[TelegramHandler] Ошибка отправки: {e}\n")
def _split_message(self, text: str) -> List[str]:
if len(text) <= self._max_length:
return [text]
parts = []
while text:
chunk = text[: self._max_length]
idx = chunk.rfind("\n")
if idx > self._max_length // 2:
chunk, text = chunk[: idx + 1], text[idx + 1 :]
else:
text = text[self._max_length :]
parts.append(chunk)
return parts
def _send_with_retry(self, text: str) -> None:
parts = self._split_message(text)
for part in parts:
for attempt in range(1, self._retry_attempts + 1):
try:
self._send_to_telegram(part)
break
except Exception as e:
if attempt == self._retry_attempts:
sys.stderr.write(f"[TelegramHandler] Не удалось отправить после {attempt} попыток: {e}\n")
return
time.sleep(self._retry_delay)
def _send_to_telegram(self, text: str) -> None:
import requests
url = f"https://api.telegram.org/bot{self._bot_token}/sendMessage"
data = {
"chat_id": self._chat_id,
"text": text,
"parse_mode": self._parse_mode,
"disable_notification": self._disable_notification,
}
resp = requests.post(url, data=data, timeout=10)
resp.raise_for_status()
j = resp.json()
if not j.get("ok"):
raise RuntimeError(j.get("description", "Unknown Telegram API error"))
def _poll_commands(self) -> None:
import requests
url = f"https://api.telegram.org/bot{self._bot_token}/getUpdates"
offset = 0
while not self._stop.is_set():
try:
r = requests.get(url, params={"offset": offset, "timeout": 30}, timeout=35)
r.raise_for_status()
data = r.json()
if not data.get("ok"):
time.sleep(5)
continue
for upd in data.get("result", []):
offset = upd["update_id"] + 1
msg = upd.get("message", {})
text = msg.get("text", "").strip()
if text != "/start":
continue
user = msg.get("from", {})
user_id = user.get("id")
username = user.get("username", "")
chat_id = msg.get("chat", {}).get("id")
if self._authenticate_user(user_id, username):
reply = "✅ Доступ разрешён. Вы будете получать уведомления."
else:
reply = "❌ Доступ запрещён."
self._send_reply(chat_id, reply)
except Exception as e:
sys.stderr.write(f"[TelegramHandler] Polling error: {e}\n")
time.sleep(5)
def _send_reply(self, chat_id: Any, text: str) -> None:
import requests
url = f"https://api.telegram.org/bot{self._bot_token}/sendMessage"
requests.post(url, data={"chat_id": chat_id, "text": text}, timeout=10)
def _authenticate_user(self, user_id: Optional[int], username: str) -> bool:
for u in self._allowed_users:
if isinstance(u, int) and u == user_id:
return True
if isinstance(u, str) and (u == username or str(u) == str(user_id)):
return True
return False
def flush(self, timeout: float = 15.0) -> None:
"""Дождаться отправки всех сообщений из очереди перед выходом программы."""
deadline = time.monotonic() + timeout
while not self._queue.empty() and time.monotonic() < deadline:
time.sleep(0.2)
self._stop.set()
self._worker.join(timeout=5.0)
def close(self) -> None:
self._stop.set()
super().close()
def flush_telegram_handlers(timeout: float = 15.0) -> None:
"""Ожидание отправки всех сообщений в Telegram перед выходом (вызывать в конце main)."""
root = logging.getLogger(ROOT_LOGGER_NAME)
for h in root.handlers:
if isinstance(h, TelegramHandler):
h.flush(timeout=timeout)
def send_batch_notification(telegram_config: Optional[Dict[str, Any]], text: str) -> None:
"""
Отправить структурированное batch-сообщение напрямую в Telegram (PRD v1.9).
Не идёт через очередь, отправка синхронная с retry.
"""
if not telegram_config or not telegram_config.get("enabled"):
return
token = telegram_config.get("bot_token")
chat_id = telegram_config.get("chat_id")
if not token or not chat_id:
return
retry_cfg = telegram_config.get("retry", {})
attempts = retry_cfg.get("attempts", 3)
delay = retry_cfg.get("delay", 5)
opts = telegram_config.get("options", {})
parse_mode = opts.get("parse_mode", "HTML")
max_len = opts.get("max_message_length", 4096)
if parse_mode == "HTML":
text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
parts = [text[i : i + max_len] for i in range(0, len(text), max_len)] if len(text) > max_len else [text]
import requests
url = f"https://api.telegram.org/bot{token}/sendMessage"
for part in parts:
for attempt in range(1, attempts + 1):
try:
resp = requests.post(
url,
data={"chat_id": chat_id, "text": part, "parse_mode": parse_mode},
timeout=10,
)
resp.raise_for_status()
if not resp.json().get("ok"):
raise RuntimeError(resp.json().get("description", "Unknown"))
break
except Exception as e:
if attempt == attempts:
sys.stderr.write(f"[send_batch_notification] Не удалось отправить: {e}\n")
return
time.sleep(delay)
def _parse_level(level: str) -> int:
m = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"STATUS": STATUS_LEVEL,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
return m.get(str(level).upper(), logging.INFO)
def setup_root_logger(
level: Union[int, str] = logging.INFO,
log_file: Optional[str] = None,
script_dir: Optional[str] = None,
telegram_config: Optional[Dict[str, Any]] = None,
) -> logging.Logger:
"""
Настроить корневой логгер проекта.
Args:
level: Уровень логирования (int или "INFO", "DEBUG" и т.д.)
log_file: Путь к файлу логов (опционально)
script_dir: Директория скрипта для fallback (опционально)
telegram_config: Настройки Telegram (если None — Telegram отключён)
Returns:
Настроенный логгер
"""
global _initialized
logger = logging.getLogger(ROOT_LOGGER_NAME)
formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
if isinstance(level, str):
level = _parse_level(level)
if not _initialized:
logger.setLevel(level)
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
_initialized = True
if log_file:
has_file = any(
isinstance(h, logging.FileHandler)
and getattr(h, "baseFilename", "").endswith(os.path.basename(log_file))
for h in logger.handlers
)
if not has_file:
try:
log_path = log_file
if script_dir and not os.path.isabs(log_path):
log_path = os.path.normpath(os.path.join(script_dir, log_file))
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setFormatter(formatter)
logger.addHandler(fh)
except OSError:
if script_dir:
fallback = os.path.join(script_dir, "logs", "zfs_backup.log")
try:
os.makedirs(os.path.dirname(fallback), exist_ok=True)
fh = logging.FileHandler(fallback, encoding="utf-8")
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.warning("Не удалось писать в %s, используется %s", log_file, fallback)
except OSError:
pass
if telegram_config and telegram_config.get("enabled"):
if not any(isinstance(h, TelegramHandler) for h in logger.handlers):
try:
th = TelegramHandler(telegram_config)
th.setLevel(_parse_level(telegram_config.get("log_level", "INFO")))
th.setFormatter(formatter)
logger.addHandler(th)
except Exception as e:
sys.stderr.write(f"[logger] Не удалось инициализировать TelegramHandler: {e}\n")
return logger
def get_logger(name: str) -> logging.Logger:
"""
Получить логгер для модуля.
"""
root = logging.getLogger(ROOT_LOGGER_NAME)
if not root.handlers:
setup_root_logger()
if name.startswith("ssh_client"):
return logging.getLogger(name)
return logging.getLogger(f"{ROOT_LOGGER_NAME}.{name}")