347 lines
14 KiB
Python
347 lines
14 KiB
Python
#!/usr/bin/python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Единый логгер для проекта SSH Client (PRD v1.8).
|
||
Поддержка файла, консоли и Telegram через TelegramHandler.
|
||
"""
|
||
import logging
|
||
import os
|
||
import queue
|
||
import sys
|
||
import threading
|
||
import time
|
||
from typing import Any, Dict, List, Optional, Union
|
||
|
||
# Имя корневого логгера проекта
|
||
ROOT_LOGGER_NAME = "ssh_client"
|
||
|
||
# Общий формат логов
|
||
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||
|
||
# Кастомный уровень STATUS (между INFO и WARNING)
|
||
STATUS_LEVEL = 25
|
||
logging.addLevelName(STATUS_LEVEL, "STATUS")
|
||
|
||
|
||
def status(self, message: str, *args, **kwargs) -> None:
|
||
if self.isEnabledFor(STATUS_LEVEL):
|
||
self._log(STATUS_LEVEL, message, args, **kwargs)
|
||
|
||
|
||
logging.Logger.status = status # type: ignore
|
||
|
||
_initialized = False
|
||
|
||
|
||
class TelegramHandler(logging.Handler):
|
||
"""
|
||
Отправляет логи в Telegram через очередь (асинхронно).
|
||
Retry при ошибках, разбивка длинных сообщений, /start для верификации.
|
||
"""
|
||
|
||
def __init__(self, config: Dict[str, Any]) -> None:
|
||
super().__init__()
|
||
self._config = config
|
||
self._suppress_levels: List[str] = []
|
||
batch = config.get("batch_notifications", {})
|
||
if batch.get("enabled"):
|
||
self._suppress_levels = [str(x).upper() for x in batch.get("suppress_individual_logs", [])]
|
||
self._queue: queue.Queue = queue.Queue(maxsize=1000)
|
||
self._bot_token = config.get("bot_token", "")
|
||
self._chat_id = str(config.get("chat_id", ""))
|
||
self._allowed_users: List[Any] = config.get("allowed_users", [])
|
||
self._allowed_levels: List[str] = [str(x).upper() for x in config.get("allowed_levels", ["ERROR"])]
|
||
retry = config.get("retry", {})
|
||
self._retry_attempts = retry.get("attempts", 3)
|
||
self._retry_delay = retry.get("delay", 5)
|
||
opts = config.get("options", {})
|
||
self._max_length = opts.get("max_message_length", 4096)
|
||
self._parse_mode = opts.get("parse_mode", "HTML")
|
||
self._disable_notification = opts.get("disable_notification", False)
|
||
self._enable_commands = config.get("enable_commands", False)
|
||
self._stop = threading.Event()
|
||
self._worker = threading.Thread(target=self._process_queue, daemon=True)
|
||
self._worker.start()
|
||
if self._enable_commands:
|
||
self._poll_thread = threading.Thread(target=self._poll_commands, daemon=True)
|
||
self._poll_thread.start()
|
||
|
||
def _level_allowed(self, record: logging.LogRecord) -> bool:
|
||
levelname = getattr(record, "levelname", record.levelname)
|
||
return str(levelname).upper() in self._allowed_levels
|
||
|
||
def emit(self, record: logging.LogRecord) -> None:
|
||
if not self._bot_token or not self._chat_id:
|
||
return
|
||
levelname = str(getattr(record, "levelname", record.levelname)).upper()
|
||
if self._suppress_levels and levelname in self._suppress_levels:
|
||
return
|
||
if not self._level_allowed(record):
|
||
return
|
||
try:
|
||
msg = self.format(record)
|
||
if self._parse_mode == "HTML":
|
||
msg = msg.replace("&", "&").replace("<", "<").replace(">", ">")
|
||
self._queue.put_nowait(msg)
|
||
except queue.Full:
|
||
sys.stderr.write(f"[TelegramHandler] Очередь переполнена, сообщение пропущено\n")
|
||
except Exception:
|
||
self.handleError(record)
|
||
|
||
def _process_queue(self) -> None:
|
||
while not self._stop.is_set():
|
||
try:
|
||
msg = self._queue.get(timeout=0.5)
|
||
if msg:
|
||
self._send_with_retry(msg)
|
||
except queue.Empty:
|
||
continue
|
||
except Exception as e:
|
||
sys.stderr.write(f"[TelegramHandler] Ошибка отправки: {e}\n")
|
||
|
||
def _split_message(self, text: str) -> List[str]:
|
||
if len(text) <= self._max_length:
|
||
return [text]
|
||
parts = []
|
||
while text:
|
||
chunk = text[: self._max_length]
|
||
idx = chunk.rfind("\n")
|
||
if idx > self._max_length // 2:
|
||
chunk, text = chunk[: idx + 1], text[idx + 1 :]
|
||
else:
|
||
text = text[self._max_length :]
|
||
parts.append(chunk)
|
||
return parts
|
||
|
||
def _send_with_retry(self, text: str) -> None:
|
||
parts = self._split_message(text)
|
||
for part in parts:
|
||
for attempt in range(1, self._retry_attempts + 1):
|
||
try:
|
||
self._send_to_telegram(part)
|
||
break
|
||
except Exception as e:
|
||
if attempt == self._retry_attempts:
|
||
sys.stderr.write(f"[TelegramHandler] Не удалось отправить после {attempt} попыток: {e}\n")
|
||
return
|
||
time.sleep(self._retry_delay)
|
||
|
||
def _send_to_telegram(self, text: str) -> None:
|
||
import requests
|
||
|
||
url = f"https://api.telegram.org/bot{self._bot_token}/sendMessage"
|
||
data = {
|
||
"chat_id": self._chat_id,
|
||
"text": text,
|
||
"parse_mode": self._parse_mode,
|
||
"disable_notification": self._disable_notification,
|
||
}
|
||
resp = requests.post(url, data=data, timeout=10)
|
||
resp.raise_for_status()
|
||
j = resp.json()
|
||
if not j.get("ok"):
|
||
raise RuntimeError(j.get("description", "Unknown Telegram API error"))
|
||
|
||
def _poll_commands(self) -> None:
|
||
import requests
|
||
|
||
url = f"https://api.telegram.org/bot{self._bot_token}/getUpdates"
|
||
offset = 0
|
||
while not self._stop.is_set():
|
||
try:
|
||
r = requests.get(url, params={"offset": offset, "timeout": 30}, timeout=35)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
if not data.get("ok"):
|
||
time.sleep(5)
|
||
continue
|
||
for upd in data.get("result", []):
|
||
offset = upd["update_id"] + 1
|
||
msg = upd.get("message", {})
|
||
text = msg.get("text", "").strip()
|
||
if text != "/start":
|
||
continue
|
||
user = msg.get("from", {})
|
||
user_id = user.get("id")
|
||
username = user.get("username", "")
|
||
chat_id = msg.get("chat", {}).get("id")
|
||
if self._authenticate_user(user_id, username):
|
||
reply = "✅ Доступ разрешён. Вы будете получать уведомления."
|
||
else:
|
||
reply = "❌ Доступ запрещён."
|
||
self._send_reply(chat_id, reply)
|
||
except Exception as e:
|
||
sys.stderr.write(f"[TelegramHandler] Polling error: {e}\n")
|
||
time.sleep(5)
|
||
|
||
def _send_reply(self, chat_id: Any, text: str) -> None:
|
||
import requests
|
||
|
||
url = f"https://api.telegram.org/bot{self._bot_token}/sendMessage"
|
||
requests.post(url, data={"chat_id": chat_id, "text": text}, timeout=10)
|
||
|
||
def _authenticate_user(self, user_id: Optional[int], username: str) -> bool:
|
||
for u in self._allowed_users:
|
||
if isinstance(u, int) and u == user_id:
|
||
return True
|
||
if isinstance(u, str) and (u == username or str(u) == str(user_id)):
|
||
return True
|
||
return False
|
||
|
||
def flush(self, timeout: float = 15.0) -> None:
|
||
"""Дождаться отправки всех сообщений из очереди перед выходом программы."""
|
||
deadline = time.monotonic() + timeout
|
||
while not self._queue.empty() and time.monotonic() < deadline:
|
||
time.sleep(0.2)
|
||
self._stop.set()
|
||
self._worker.join(timeout=5.0)
|
||
|
||
def close(self) -> None:
|
||
self._stop.set()
|
||
super().close()
|
||
|
||
|
||
def flush_telegram_handlers(timeout: float = 15.0) -> None:
|
||
"""Ожидание отправки всех сообщений в Telegram перед выходом (вызывать в конце main)."""
|
||
root = logging.getLogger(ROOT_LOGGER_NAME)
|
||
for h in root.handlers:
|
||
if isinstance(h, TelegramHandler):
|
||
h.flush(timeout=timeout)
|
||
|
||
|
||
def send_batch_notification(telegram_config: Optional[Dict[str, Any]], text: str) -> None:
|
||
"""
|
||
Отправить структурированное batch-сообщение напрямую в Telegram (PRD v1.9).
|
||
Не идёт через очередь, отправка синхронная с retry.
|
||
"""
|
||
if not telegram_config or not telegram_config.get("enabled"):
|
||
return
|
||
token = telegram_config.get("bot_token")
|
||
chat_id = telegram_config.get("chat_id")
|
||
if not token or not chat_id:
|
||
return
|
||
retry_cfg = telegram_config.get("retry", {})
|
||
attempts = retry_cfg.get("attempts", 3)
|
||
delay = retry_cfg.get("delay", 5)
|
||
opts = telegram_config.get("options", {})
|
||
parse_mode = opts.get("parse_mode", "HTML")
|
||
max_len = opts.get("max_message_length", 4096)
|
||
if parse_mode == "HTML":
|
||
text = text.replace("&", "&").replace("<", "<").replace(">", ">")
|
||
parts = [text[i : i + max_len] for i in range(0, len(text), max_len)] if len(text) > max_len else [text]
|
||
import requests
|
||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||
for part in parts:
|
||
for attempt in range(1, attempts + 1):
|
||
try:
|
||
resp = requests.post(
|
||
url,
|
||
data={"chat_id": chat_id, "text": part, "parse_mode": parse_mode},
|
||
timeout=10,
|
||
)
|
||
resp.raise_for_status()
|
||
if not resp.json().get("ok"):
|
||
raise RuntimeError(resp.json().get("description", "Unknown"))
|
||
break
|
||
except Exception as e:
|
||
if attempt == attempts:
|
||
sys.stderr.write(f"[send_batch_notification] Не удалось отправить: {e}\n")
|
||
return
|
||
time.sleep(delay)
|
||
|
||
|
||
def _parse_level(level: str) -> int:
|
||
m = {
|
||
"DEBUG": logging.DEBUG,
|
||
"INFO": logging.INFO,
|
||
"STATUS": STATUS_LEVEL,
|
||
"WARNING": logging.WARNING,
|
||
"ERROR": logging.ERROR,
|
||
"CRITICAL": logging.CRITICAL,
|
||
}
|
||
return m.get(str(level).upper(), logging.INFO)
|
||
|
||
|
||
def setup_root_logger(
|
||
level: Union[int, str] = logging.INFO,
|
||
log_file: Optional[str] = None,
|
||
script_dir: Optional[str] = None,
|
||
telegram_config: Optional[Dict[str, Any]] = None,
|
||
) -> logging.Logger:
|
||
"""
|
||
Настроить корневой логгер проекта.
|
||
|
||
Args:
|
||
level: Уровень логирования (int или "INFO", "DEBUG" и т.д.)
|
||
log_file: Путь к файлу логов (опционально)
|
||
script_dir: Директория скрипта для fallback (опционально)
|
||
telegram_config: Настройки Telegram (если None — Telegram отключён)
|
||
|
||
Returns:
|
||
Настроенный логгер
|
||
"""
|
||
global _initialized
|
||
logger = logging.getLogger(ROOT_LOGGER_NAME)
|
||
formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
|
||
|
||
if isinstance(level, str):
|
||
level = _parse_level(level)
|
||
|
||
if not _initialized:
|
||
logger.setLevel(level)
|
||
ch = logging.StreamHandler()
|
||
ch.setFormatter(formatter)
|
||
logger.addHandler(ch)
|
||
_initialized = True
|
||
|
||
if log_file:
|
||
has_file = any(
|
||
isinstance(h, logging.FileHandler)
|
||
and getattr(h, "baseFilename", "").endswith(os.path.basename(log_file))
|
||
for h in logger.handlers
|
||
)
|
||
if not has_file:
|
||
try:
|
||
log_path = log_file
|
||
if script_dir and not os.path.isabs(log_path):
|
||
log_path = os.path.normpath(os.path.join(script_dir, log_file))
|
||
fh = logging.FileHandler(log_path, encoding="utf-8")
|
||
fh.setFormatter(formatter)
|
||
logger.addHandler(fh)
|
||
except OSError:
|
||
if script_dir:
|
||
fallback = os.path.join(script_dir, "logs", "zfs_backup.log")
|
||
try:
|
||
os.makedirs(os.path.dirname(fallback), exist_ok=True)
|
||
fh = logging.FileHandler(fallback, encoding="utf-8")
|
||
fh.setFormatter(formatter)
|
||
logger.addHandler(fh)
|
||
logger.warning("Не удалось писать в %s, используется %s", log_file, fallback)
|
||
except OSError:
|
||
pass
|
||
|
||
if telegram_config and telegram_config.get("enabled"):
|
||
if not any(isinstance(h, TelegramHandler) for h in logger.handlers):
|
||
try:
|
||
th = TelegramHandler(telegram_config)
|
||
th.setLevel(_parse_level(telegram_config.get("log_level", "INFO")))
|
||
th.setFormatter(formatter)
|
||
logger.addHandler(th)
|
||
except Exception as e:
|
||
sys.stderr.write(f"[logger] Не удалось инициализировать TelegramHandler: {e}\n")
|
||
|
||
return logger
|
||
|
||
|
||
def get_logger(name: str) -> logging.Logger:
|
||
"""
|
||
Получить логгер для модуля.
|
||
"""
|
||
root = logging.getLogger(ROOT_LOGGER_NAME)
|
||
if not root.handlers:
|
||
setup_root_logger()
|
||
if name.startswith("ssh_client"):
|
||
return logging.getLogger(name)
|
||
return logging.getLogger(f"{ROOT_LOGGER_NAME}.{name}")
|