#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Единый логгер для проекта SSH Client (PRD v1.8). Поддержка файла, консоли и Telegram через TelegramHandler. """ import logging import os import queue import sys import threading import time from typing import Any, Dict, List, Optional, Union # Имя корневого логгера проекта ROOT_LOGGER_NAME = "ssh_client" # Общий формат логов LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" # Кастомный уровень STATUS (между INFO и WARNING) STATUS_LEVEL = 25 logging.addLevelName(STATUS_LEVEL, "STATUS") def status(self, message: str, *args, **kwargs) -> None: if self.isEnabledFor(STATUS_LEVEL): self._log(STATUS_LEVEL, message, args, **kwargs) logging.Logger.status = status # type: ignore _initialized = False class TelegramHandler(logging.Handler): """ Отправляет логи в Telegram через очередь (асинхронно). Retry при ошибках, разбивка длинных сообщений, /start для верификации. """ def __init__(self, config: Dict[str, Any]) -> None: super().__init__() self._config = config self._queue: queue.Queue = queue.Queue(maxsize=1000) self._bot_token = config.get("bot_token", "") self._chat_id = str(config.get("chat_id", "")) self._allowed_users: List[Any] = config.get("allowed_users", []) self._allowed_levels: List[str] = [str(x).upper() for x in config.get("allowed_levels", ["ERROR"])] retry = config.get("retry", {}) self._retry_attempts = retry.get("attempts", 3) self._retry_delay = retry.get("delay", 5) opts = config.get("options", {}) self._max_length = opts.get("max_message_length", 4096) self._parse_mode = opts.get("parse_mode", "HTML") self._disable_notification = opts.get("disable_notification", False) self._enable_commands = config.get("enable_commands", False) self._stop = threading.Event() self._worker = threading.Thread(target=self._process_queue, daemon=True) self._worker.start() if self._enable_commands: self._poll_thread = threading.Thread(target=self._poll_commands, daemon=True) self._poll_thread.start() def _level_allowed(self, record: logging.LogRecord) -> bool: levelname = getattr(record, "levelname", record.levelname) return str(levelname).upper() in self._allowed_levels def emit(self, record: logging.LogRecord) -> None: if not self._bot_token or not self._chat_id: return if not self._level_allowed(record): return try: msg = self.format(record) # Экранируем HTML при parse_mode=HTML if self._parse_mode == "HTML": msg = msg.replace("&", "&").replace("<", "<").replace(">", ">") self._queue.put_nowait(msg) except queue.Full: sys.stderr.write(f"[TelegramHandler] Очередь переполнена, сообщение пропущено\n") except Exception: self.handleError(record) def _process_queue(self) -> None: while not self._stop.is_set(): try: msg = self._queue.get(timeout=0.5) if msg: self._send_with_retry(msg) except queue.Empty: continue except Exception as e: sys.stderr.write(f"[TelegramHandler] Ошибка отправки: {e}\n") def _split_message(self, text: str) -> List[str]: if len(text) <= self._max_length: return [text] parts = [] while text: chunk = text[: self._max_length] idx = chunk.rfind("\n") if idx > self._max_length // 2: chunk, text = chunk[: idx + 1], text[idx + 1 :] else: text = text[self._max_length :] parts.append(chunk) return parts def _send_with_retry(self, text: str) -> None: parts = self._split_message(text) for part in parts: for attempt in range(1, self._retry_attempts + 1): try: self._send_to_telegram(part) break except Exception as e: if attempt == self._retry_attempts: sys.stderr.write(f"[TelegramHandler] Не удалось отправить после {attempt} попыток: {e}\n") return time.sleep(self._retry_delay) def _send_to_telegram(self, text: str) -> None: import requests url = f"https://api.telegram.org/bot{self._bot_token}/sendMessage" data = { "chat_id": self._chat_id, "text": text, "parse_mode": self._parse_mode, "disable_notification": self._disable_notification, } resp = requests.post(url, data=data, timeout=10) resp.raise_for_status() j = resp.json() if not j.get("ok"): raise RuntimeError(j.get("description", "Unknown Telegram API error")) def _poll_commands(self) -> None: import requests url = f"https://api.telegram.org/bot{self._bot_token}/getUpdates" offset = 0 while not self._stop.is_set(): try: r = requests.get(url, params={"offset": offset, "timeout": 30}, timeout=35) r.raise_for_status() data = r.json() if not data.get("ok"): time.sleep(5) continue for upd in data.get("result", []): offset = upd["update_id"] + 1 msg = upd.get("message", {}) text = msg.get("text", "").strip() if text != "/start": continue user = msg.get("from", {}) user_id = user.get("id") username = user.get("username", "") chat_id = msg.get("chat", {}).get("id") if self._authenticate_user(user_id, username): reply = "✅ Доступ разрешён. Вы будете получать уведомления." else: reply = "❌ Доступ запрещён." self._send_reply(chat_id, reply) except Exception as e: sys.stderr.write(f"[TelegramHandler] Polling error: {e}\n") time.sleep(5) def _send_reply(self, chat_id: Any, text: str) -> None: import requests url = f"https://api.telegram.org/bot{self._bot_token}/sendMessage" requests.post(url, data={"chat_id": chat_id, "text": text}, timeout=10) def _authenticate_user(self, user_id: Optional[int], username: str) -> bool: for u in self._allowed_users: if isinstance(u, int) and u == user_id: return True if isinstance(u, str) and (u == username or str(u) == str(user_id)): return True return False def close(self) -> None: self._stop.set() super().close() def _parse_level(level: str) -> int: m = { "DEBUG": logging.DEBUG, "INFO": logging.INFO, "STATUS": STATUS_LEVEL, "WARNING": logging.WARNING, "ERROR": logging.ERROR, "CRITICAL": logging.CRITICAL, } return m.get(str(level).upper(), logging.INFO) def setup_root_logger( level: Union[int, str] = logging.INFO, log_file: Optional[str] = None, script_dir: Optional[str] = None, telegram_config: Optional[Dict[str, Any]] = None, ) -> logging.Logger: """ Настроить корневой логгер проекта. Args: level: Уровень логирования (int или "INFO", "DEBUG" и т.д.) log_file: Путь к файлу логов (опционально) script_dir: Директория скрипта для fallback (опционально) telegram_config: Настройки Telegram (если None — Telegram отключён) Returns: Настроенный логгер """ global _initialized logger = logging.getLogger(ROOT_LOGGER_NAME) formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT) if isinstance(level, str): level = _parse_level(level) if not _initialized: logger.setLevel(level) ch = logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) _initialized = True if log_file: has_file = any( isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", "").endswith(os.path.basename(log_file)) for h in logger.handlers ) if not has_file: try: log_path = log_file if script_dir and not os.path.isabs(log_path): log_path = os.path.normpath(os.path.join(script_dir, log_file)) fh = logging.FileHandler(log_path, encoding="utf-8") fh.setFormatter(formatter) logger.addHandler(fh) except OSError: if script_dir: fallback = os.path.join(script_dir, "logs", "zfs_backup.log") try: os.makedirs(os.path.dirname(fallback), exist_ok=True) fh = logging.FileHandler(fallback, encoding="utf-8") fh.setFormatter(formatter) logger.addHandler(fh) logger.warning("Не удалось писать в %s, используется %s", log_file, fallback) except OSError: pass if telegram_config and telegram_config.get("enabled"): if not any(isinstance(h, TelegramHandler) for h in logger.handlers): try: th = TelegramHandler(telegram_config) th.setLevel(_parse_level(telegram_config.get("log_level", "INFO"))) th.setFormatter(formatter) logger.addHandler(th) except Exception as e: sys.stderr.write(f"[logger] Не удалось инициализировать TelegramHandler: {e}\n") return logger def get_logger(name: str) -> logging.Logger: """ Получить логгер для модуля. """ root = logging.getLogger(ROOT_LOGGER_NAME) if not root.handlers: setup_root_logger() if name.startswith("ssh_client"): return logging.getLogger(name) return logging.getLogger(f"{ROOT_LOGGER_NAME}.{name}")