diff --git a/config/config_log.yaml b/config/config_log.yaml index c3ddd84..531cf67 100644 --- a/config/config_log.yaml +++ b/config/config_log.yaml @@ -1,4 +1,4 @@ -# Конфигурация логирования (PRD v1.8) +# Конфигурация логирования (PRD v1.8, v1.9) # Используется: zfs_backup.py, modules/logger.py # Основные настройки @@ -30,3 +30,19 @@ telegram: max_message_length: 4096 enable_commands: true + + # PRD v1.9: структурированные batch-уведомления + batch_notifications: + enabled: true + operations: + - "zfs_backup" + + suppress_individual_logs: + - "INFO" + - "STATUS" + + format: + show_timestamps: true + show_server_names: true + show_datasets: true + max_items_in_list: 20 diff --git a/config/zfs_backup.yaml b/config/zfs_backup.yaml index 6beefeb..8ab57e8 100644 --- a/config/zfs_backup.yaml +++ b/config/zfs_backup.yaml @@ -21,3 +21,14 @@ servers: target_pool: fast-backup snapshot_name: dd-mm-yyyy retention_days: 90 + - name: gwo.gmt.cln.su + pools: + - source_pool: zp0 + datasets: + - containers/smb + - containers/dc0 +# - containers/dns +# - containers/dns2 + target_pool: backup + snapshot_name: dd-mm-yyyy + retention_days: 90 diff --git a/modules/logger.py b/modules/logger.py index 6bbf2b5..8c3d4fd 100644 --- a/modules/logger.py +++ b/modules/logger.py @@ -43,6 +43,10 @@ class TelegramHandler(logging.Handler): def __init__(self, config: Dict[str, Any]) -> None: super().__init__() self._config = config + self._suppress_levels: List[str] = [] + batch = config.get("batch_notifications", {}) + if batch.get("enabled"): + self._suppress_levels = [str(x).upper() for x in batch.get("suppress_individual_logs", [])] self._queue: queue.Queue = queue.Queue(maxsize=1000) self._bot_token = config.get("bot_token", "") self._chat_id = str(config.get("chat_id", "")) @@ -70,11 +74,13 @@ class TelegramHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: if not self._bot_token or not self._chat_id: return + levelname = str(getattr(record, "levelname", 
record.levelname)).upper() + if self._suppress_levels and levelname in self._suppress_levels: + return if not self._level_allowed(record): return try: msg = self.format(record) - # Экранируем HTML при parse_mode=HTML if self._parse_mode == "HTML": msg = msg.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") self._queue.put_nowait(msg) @@ -183,11 +189,68 @@ class TelegramHandler(logging.Handler): return True return False + def flush(self, timeout: float = 15.0) -> None: + """Дождаться отправки всех сообщений из очереди перед выходом программы.""" + deadline = time.monotonic() + timeout + while not self._queue.empty() and time.monotonic() < deadline: + time.sleep(0.2) + self._stop.set() + self._worker.join(timeout=5.0) + def close(self) -> None: self._stop.set() super().close() +def flush_telegram_handlers(timeout: float = 15.0) -> None: + """Ожидание отправки всех сообщений в Telegram перед выходом (вызывать в конце main).""" + root = logging.getLogger(ROOT_LOGGER_NAME) + for h in root.handlers: + if isinstance(h, TelegramHandler): + h.flush(timeout=timeout) + + +def send_batch_notification(telegram_config: Optional[Dict[str, Any]], text: str) -> None: + """ + Отправить структурированное batch-сообщение напрямую в Telegram (PRD v1.9). + Не идёт через очередь, отправка синхронная с retry. 
+ """ + if not telegram_config or not telegram_config.get("enabled"): + return + token = telegram_config.get("bot_token") + chat_id = telegram_config.get("chat_id") + if not token or not chat_id: + return + retry_cfg = telegram_config.get("retry", {}) + attempts = retry_cfg.get("attempts", 3) + delay = retry_cfg.get("delay", 5) + opts = telegram_config.get("options", {}) + parse_mode = opts.get("parse_mode", "HTML") + max_len = opts.get("max_message_length", 4096) + if parse_mode == "HTML": + text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") + parts = [text[i : i + max_len] for i in range(0, len(text), max_len)] if len(text) > max_len else [text] + import requests + url = f"https://api.telegram.org/bot{token}/sendMessage" + for part in parts: + for attempt in range(1, attempts + 1): + try: + resp = requests.post( + url, + data={"chat_id": chat_id, "text": part, "parse_mode": parse_mode}, + timeout=10, + ) + resp.raise_for_status() + if not resp.json().get("ok"): + raise RuntimeError(resp.json().get("description", "Unknown")) + break + except Exception as e: + if attempt == attempts: + sys.stderr.write(f"[send_batch_notification] Не удалось отправить: {e}\n") + return + time.sleep(delay) + + def _parse_level(level: str) -> int: m = { "DEBUG": logging.DEBUG, diff --git a/modules/zfs_backup_ops.py b/modules/zfs_backup_ops.py index 490d9b9..cc77246 100644 --- a/modules/zfs_backup_ops.py +++ b/modules/zfs_backup_ops.py @@ -10,6 +10,9 @@ import re from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple +# Результат batch-операции: (успешно, список ошибок) +# ошибка = (server/dataset_key, сообщение) — например "gwo2.mps.cln.su/containers/smb2" + from .logger import get_logger from .protocols import SSHProtocol from .ssh_base import SSHBase @@ -130,14 +133,18 @@ def backup_server( server: Dict[str, Any], ssh_defaults: Dict[str, Any], log: logging.Logger, -) -> bool: + collect_errors: bool = False, +) -> Tuple[int, 
List[Tuple[str, str]]]: """ Выполнить бэкап для одного сервера: снапшоты, send/recv, очистка. Конфиг v1.1: server['pools'] — список пулов с source_pool, datasets, target_pool. + Возвращает (успешно, список_ошибок), где ошибка = (ключ "server/dataset", сообщение). """ name = server["name"] pools = server["pools"] retention_days = int(server.get("retention_days", 30)) + success_count = 0 + errors: List[Tuple[str, str]] = [] port = ssh_defaults.get("port", 22) username = ssh_defaults.get("username", "root") @@ -148,27 +155,35 @@ def backup_server( log.info("Сервер %s: дата снапшотов %s", name, date_str) ssh = SSHBase(hostname=name, port=port, username=username, pkey_file=pkey_file, host_keys=host_keys) - ssh.connect() - - pool_counts: Dict[str, int] = {} - total_datasets = 0 + try: + ssh.connect() + except Exception as e: + for pool_config in pools: + for dataset in pool_config["datasets"]: + errors.append((f"{name}/{dataset}", str(e))) + return 0, errors try: for pool_config in pools: source_pool = pool_config["source_pool"] datasets = pool_config["datasets"] target_pool = pool_config["target_pool"] - pool_counts[source_pool] = len(datasets) - total_datasets += len(datasets) for dataset in datasets: full_dataset = f"{source_pool}/{dataset}" + key = f"{name}/{dataset}" def do_snapshot(fd=full_dataset, d=date_str): if not _create_snapshot(ssh, fd, d, log): raise RuntimeError(f"Snapshot {fd}@{d} failed") - _run_with_retry(log, name, f"snapshot {full_dataset}", do_snapshot) + try: + _run_with_retry(log, name, f"snapshot {full_dataset}", do_snapshot) + except Exception as e: + errors.append((key, str(e))) + if not collect_errors: + raise + continue for pool_config in pools: source_pool = pool_config["source_pool"] @@ -178,6 +193,7 @@ def backup_server( for dataset in datasets: full_dataset = f"{source_pool}/{dataset}" target_dataset = f"{target_pool}/{dataset}" + key = f"{name}/{dataset}" def do_send_recv(fd=full_dataset, td=target_dataset, d=date_str): prev_date = 
_get_previous_snapshot_date(ssh, td) @@ -188,14 +204,20 @@ def backup_server( if not _replicate_snapshot(ssh, fd, td, d, prev_date, log): raise RuntimeError(f"Replicate {fd}@{d} → {td} failed") - _run_with_retry(log, name, f"send/recv {full_dataset}", do_send_recv) + try: + _run_with_retry(log, name, f"send/recv {full_dataset}", do_send_recv) + success_count += 1 + except Exception as e: + errors.append((key, str(e))) + if not collect_errors: + raise + continue cutoff = datetime.now() - timedelta(days=retention_days) for pool_config in pools: source_pool = pool_config["source_pool"] datasets = pool_config["datasets"] target_pool = pool_config["target_pool"] - for dataset in datasets: for pool, ds in [(source_pool, dataset), (target_pool, dataset)]: full_ds = f"{pool}/{ds}" @@ -218,9 +240,9 @@ def backup_server( except ValueError: continue - parts = ", ".join(f"{p}:{c}" for p, c in sorted(pool_counts.items())) - log.info("✅ %s: %s datasets backed up (%s)", name, total_datasets, parts) - return True + if not collect_errors: + success_count = sum(len(pc["datasets"]) for pc in pools) + return success_count, errors finally: ssh.close() diff --git a/zfs_backup.py b/zfs_backup.py index 9163542..09db561 100755 --- a/zfs_backup.py +++ b/zfs_backup.py @@ -1,17 +1,15 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- """ -Точка входа ZFS Backup — cron/CLI (PRD v1.8). +Точка входа ZFS Backup — cron/CLI (PRD v1.8, v1.9). Конфигурация: config/config_log.yaml (логирование), config/zfs_backup.yaml (бэкап). - -Запуск: python zfs_backup.py - или: zfs-backup (после pip install -e .) 
""" import argparse import logging import os import sys -from typing import Any, Dict, Optional +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple import yaml @@ -19,7 +17,7 @@ SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) if SCRIPT_DIR not in sys.path: sys.path.insert(0, SCRIPT_DIR) -from modules.logger import setup_root_logger, get_logger +from modules.logger import setup_root_logger, get_logger, flush_telegram_handlers, send_batch_notification from modules.zfs_backup_ops import backup_server, MAX_RETRIES DEFAULT_LOG_FILE = "/var/log/zfs_backup.log" @@ -53,6 +51,87 @@ def setup_logging( return get_logger("zfs_backup") +def _build_server_datasets(server: Dict[str, Any]) -> List[Tuple[str, str]]: + """Список наборов данных (source_path, target_path) для одного сервера. PRD v1.9.2.""" + configs = [] + for pool in server["pools"]: + for ds in pool["datasets"]: + src = f"{pool['source_pool']}/{ds}" + tgt = f"{pool['target_pool']}/{ds}" + configs.append((src, tgt)) + return configs + + +def _format_server_start(server_name: str, configs: List[Tuple[str, str]], max_items: int = 20) -> str: + """Формирование уведомления о старте бэкапов для одного сервера. PRD v1.9.2.""" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + lines = [ + f"🚀 Запуск бэкапов на сервере - {server_name}", + f"⏰ Время: {ts}", + f"📊 Всего наборов данных: {len(configs)}", + "📋 Наборы данных:", + ] + items = [f" • {src} -> {tgt}" for src, tgt in configs] + if len(items) > max_items: + items = items[:max_items] + [f" ... и ещё {len(items) - max_items}"] + lines.extend(items) + lines.append("Статус: в процессе...") + return "\n".join(lines) + + +def _error_key_to_display(server: Dict[str, Any], key: str) -> str: + """Преобразует ключ ошибки server/dataset в source_pool/dataset для отображения. 
PRD v1.9.3.""" + parts = key.split("/", 1) + if len(parts) != 2: + return key + _server_name, dataset = parts + for pool in server["pools"]: + if dataset in pool.get("datasets", []): + return f"{pool['source_pool']}/{dataset}" + return key + + +def _format_server_finish( + server_name: str, + total: int, + success: int, + errors: List[Tuple[str, str]], + server: Dict[str, Any], + max_items: int = 20, +) -> str: + """Формирование уведомления о завершении бэкапов для одного сервера. PRD v1.9.3.""" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + err_count = len(errors) + if err_count == 0: + header = f"🎉 Завершены бэкапы на сервере - {server_name}" + status = "Статус: ✅ ВСЕ УСПЕШНО" + elif err_count < total: + header = f"⚠️ Завершены бэкапы на сервере - {server_name}" + status = "Статус: ⚠️ ТРЕБУЕТ ВНИМАНИЯ" + else: + header = f"🔥 Завершены бэкапы на сервере - {server_name}" + status = "Статус: 🔥 ТРЕБУЕТСЯ ВМЕШАТЕЛЬСТВО" + + lines = [ + header, + f"⏰ Время: {ts}", + "📊 Итоги:", + f"├─ 📈 Всего: {total}", + f"├─ ✅ Успешно: {success}", + f"└─ ❌ Ошибки: {err_count}", + ] + if errors: + lines.append("❌ Список ошибок:") + err_items = [ + f" • {_error_key_to_display(server, k)}: {msg}" for k, msg in errors + ] + if len(err_items) > max_items: + err_items = err_items[:max_items] + [f" ... 
и ещё {len(err_items) - max_items}"] + lines.extend(err_items) + lines.append(status) + return "\n".join(lines) + + def load_config(config_path: str) -> Dict[str, Any]: """Загрузка конфигурации из YAML (формат v1.1: servers[].pools[]).""" with open(config_path, "r", encoding="utf-8") as f: @@ -105,10 +184,39 @@ def main() -> int: ssh_defaults = config.get("ssh_defaults") or {} servers = config["servers"] + log_cfg = load_log_config(args.log_config) + telegram_cfg = log_cfg.get("telegram") + batch_cfg = (telegram_cfg or {}).get("batch_notifications", {}) + batch_enabled = ( + batch_cfg.get("enabled") + and "zfs_backup" in batch_cfg.get("operations", []) + ) + fmt = batch_cfg.get("format", {}) + max_items = fmt.get("max_items_in_list", 20) + + all_errors: List[Tuple[str, str]] = [] for server in servers: + server_configs = _build_server_datasets(server) + server_total = len(server_configs) + + if batch_enabled and server_total > 0: + send_batch_notification( + telegram_cfg, + _format_server_start(server["name"], server_configs, max_items), + ) try: - backup_server(server, ssh_defaults, log) + success, errors = backup_server( + server, ssh_defaults, log, collect_errors=batch_enabled + ) + all_errors.extend(errors) + if batch_enabled and server_total > 0: + send_batch_notification( + telegram_cfg, + _format_server_finish( + server["name"], server_total, success, errors, server, max_items + ), + ) except Exception as e: log.exception( "❌ %s: бэкап завершился ошибкой после %s попыток: %s", @@ -116,9 +224,26 @@ def main() -> int: MAX_RETRIES, e, ) - return 1 + if batch_enabled: + server_errors = [ + (f"{server['name']}/{ds}", str(e)) + for pool in server["pools"] + for ds in pool["datasets"] + ] + all_errors.extend(server_errors) + if server_total > 0: + send_batch_notification( + telegram_cfg, + _format_server_finish( + server["name"], server_total, 0, server_errors, server, max_items + ), + ) + else: + flush_telegram_handlers() + return 1 - return 0 + 
flush_telegram_handlers() + return 0 if not all_errors else 1 if __name__ == "__main__":