#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Точка входа ZFS Backup — cron/CLI (PRD v1.8, v1.9). Конфигурация: config/config_log.yaml (логирование), config/zfs_backup.yaml (бэкап). """ import argparse import logging import os import sys from datetime import datetime from typing import Any, Dict, List, Optional, Tuple import yaml SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) if SCRIPT_DIR not in sys.path: sys.path.insert(0, SCRIPT_DIR) from modules.logger import setup_root_logger, get_logger, flush_telegram_handlers, send_batch_notification from modules.zfs_backup_ops import backup_server, MAX_RETRIES DEFAULT_LOG_FILE = "/var/log/zfs_backup.log" CONFIG_LOG_PATH = os.path.join(SCRIPT_DIR, "config", "config_log.yaml") def load_log_config(config_path: Optional[str] = None) -> Dict[str, Any]: """Загрузка config_log.yaml. При отсутствии — возвращает defaults.""" path = config_path or CONFIG_LOG_PATH if not os.path.isfile(path): return {"log_file": DEFAULT_LOG_FILE, "log_level": "INFO"} with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} def setup_logging( log_file: Optional[str] = None, log_config_path: Optional[str] = None, ) -> logging.Logger: """Настройка логирования из config_log.yaml (файл, консоль, Telegram).""" cfg = load_log_config(log_config_path) path = log_file or cfg.get("log_file", DEFAULT_LOG_FILE) if not os.path.isabs(path): path = os.path.normpath(os.path.join(SCRIPT_DIR, path)) setup_root_logger( level=cfg.get("log_level", "INFO"), log_file=path, script_dir=SCRIPT_DIR, telegram_config=cfg.get("telegram"), ) return get_logger("zfs_backup") def _build_server_datasets(server: Dict[str, Any]) -> List[Tuple[str, str]]: """Список наборов данных (source_path, target_path) для одного сервера. PRD v1.9.2.""" configs = [] for pool in server["pools"]: for ds in pool["datasets"]: src = f"{pool['source_pool']}/{ds}" tgt = f"{pool['target_pool']}/{ds}" configs.append((src, tgt)) return configs def _format_server_start(server_name: str, configs: List[Tuple[str, str]], max_items: int = 20) -> str: """Формирование уведомления о старте бэкапов для одного сервера. PRD v1.9.2.""" ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") lines = [ f"🚀 Запуск бэкапов на сервере - {server_name}", f"⏰ Время: {ts}", f"📊 Всего наборов данных: {len(configs)}", "📋 Наборы данных:", ] items = [f" • {src} -> {tgt}" for src, tgt in configs] if len(items) > max_items: items = items[:max_items] + [f" ... и ещё {len(items) - max_items}"] lines.extend(items) lines.append("Статус: в процессе...") return "\n".join(lines) def _error_key_to_display(server: Dict[str, Any], key: str) -> str: """Преобразует ключ ошибки server/dataset в source_pool/dataset для отображения. PRD v1.9.3.""" parts = key.split("/", 1) if len(parts) != 2: return key _server_name, dataset = parts for pool in server["pools"]: if dataset in pool.get("datasets", []): return f"{pool['source_pool']}/{dataset}" return key def _format_server_finish( server_name: str, total: int, success: int, errors: List[Tuple[str, str]], server: Dict[str, Any], max_items: int = 20, ) -> str: """Формирование уведомления о завершении бэкапов для одного сервера. PRD v1.9.3.""" ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") err_count = len(errors) if err_count == 0: header = f"🎉 Завершены бэкапы на сервере - {server_name}" status = "Статус: ✅ ВСЕ УСПЕШНО" elif err_count < total: header = f"⚠️ Завершены бэкапы на сервере - {server_name}" status = "Статус: ⚠️ ТРЕБУЕТ ВНИМАНИЯ" else: header = f"🔥 Завершены бэкапы на сервере - {server_name}" status = "Статус: 🔥 ТРЕБУЕТСЯ ВМЕШАТЕЛЬСТВО" lines = [ header, f"⏰ Время: {ts}", "📊 Итоги:", f"├─ 📈 Всего: {total}", f"├─ ✅ Успешно: {success}", f"└─ ❌ Ошибки: {err_count}", ] if errors: lines.append("❌ Список ошибок:") err_items = [ f" • {_error_key_to_display(server, k)}: {msg}" for k, msg in errors ] if len(err_items) > max_items: err_items = err_items[:max_items] + [f" ... и ещё {len(err_items) - max_items}"] lines.extend(err_items) lines.append(status) return "\n".join(lines) def load_config(config_path: str) -> Dict[str, Any]: """Загрузка конфигурации из YAML (формат v1.1: servers[].pools[]).""" with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) if not config or "servers" not in config: raise ValueError("В config.yaml должен быть раздел 'servers'") for i, srv in enumerate(config["servers"]): if "pools" not in srv or not srv["pools"]: raise ValueError(f"Сервер [{i}] (name={srv.get('name')}) должен содержать непустой раздел 'pools'") for j, pool in enumerate(srv["pools"]): for key in ("source_pool", "target_pool", "datasets"): if key not in pool: raise ValueError(f"Сервер [{i}] pool [{j}]: отсутствует поле '{key}'") return config def main() -> int: parser = argparse.ArgumentParser(description="ZFS Backup — бэкап ZFS датасетов по config.yaml") parser.add_argument( "--config", default="config/zfs_backup.yaml", help="Путь к config (по умолчанию config/zfs_backup.yaml)", ) parser.add_argument( "--log-file", default=None, help="Путь к лог-файлу (переопределяет config/config_log.yaml)", ) parser.add_argument( "--log-config", default=None, help="Путь к config_log.yaml (по умолчанию config/config_log.yaml)", ) args = parser.parse_args() log = setup_logging(log_file=args.log_file, log_config_path=args.log_config) config_path = args.config if not os.path.isabs(config_path): config_path = os.path.normpath(os.path.join(SCRIPT_DIR, config_path)) if not os.path.isfile(config_path): log.error("Файл конфигурации не найден: %s", config_path) return 1 try: config = load_config(config_path) except Exception as e: log.exception("Ошибка загрузки config.yaml: %s", e) return 1 ssh_defaults = config.get("ssh_defaults") or {} servers = config["servers"] log_cfg = load_log_config(args.log_config) telegram_cfg = log_cfg.get("telegram") batch_cfg = (telegram_cfg or {}).get("batch_notifications", {}) batch_enabled = ( batch_cfg.get("enabled") and "zfs_backup" in batch_cfg.get("operations", []) ) fmt = batch_cfg.get("format", {}) max_items = fmt.get("max_items_in_list", 20) all_errors: List[Tuple[str, str]] = [] for server in servers: server_configs = _build_server_datasets(server) server_total = len(server_configs) if batch_enabled and server_total > 0: send_batch_notification( telegram_cfg, _format_server_start(server["name"], server_configs, max_items), ) try: success, errors = backup_server( server, ssh_defaults, log, collect_errors=batch_enabled ) all_errors.extend(errors) if batch_enabled and server_total > 0: send_batch_notification( telegram_cfg, _format_server_finish( server["name"], server_total, success, errors, server, max_items ), ) except Exception as e: log.exception( "❌ %s: бэкап завершился ошибкой после %s попыток: %s", server["name"], MAX_RETRIES, e, ) if batch_enabled: server_errors = [ (f"{server['name']}/{ds}", str(e)) for pool in server["pools"] for ds in pool["datasets"] ] all_errors.extend(server_errors) if server_total > 0: send_batch_notification( telegram_cfg, _format_server_finish( server["name"], server_total, 0, server_errors, server, max_items ), ) else: flush_telegram_handlers() return 1 flush_telegram_handlers() return 0 if not all_errors else 1 if __name__ == "__main__": sys.exit(main())