Files
cursor_ai/zfs_backup.py

251 lines
9.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Точка входа ZFS Backup — cron/CLI (PRD v1.8, v1.9).
Конфигурация: config/config_log.yaml (логирование), config/zfs_backup.yaml (бэкап).
"""
import argparse
import logging
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import yaml
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from modules.logger import setup_root_logger, get_logger, flush_telegram_handlers, send_batch_notification
from modules.zfs_backup_ops import backup_server, MAX_RETRIES
DEFAULT_LOG_FILE = "/var/log/zfs_backup.log"
CONFIG_LOG_PATH = os.path.join(SCRIPT_DIR, "config", "config_log.yaml")
def load_log_config(config_path: Optional[str] = None) -> Dict[str, Any]:
"""Загрузка config_log.yaml. При отсутствии — возвращает defaults."""
path = config_path or CONFIG_LOG_PATH
if not os.path.isfile(path):
return {"log_file": DEFAULT_LOG_FILE, "log_level": "INFO"}
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
def setup_logging(
log_file: Optional[str] = None,
log_config_path: Optional[str] = None,
) -> logging.Logger:
"""Настройка логирования из config_log.yaml (файл, консоль, Telegram)."""
cfg = load_log_config(log_config_path)
path = log_file or cfg.get("log_file", DEFAULT_LOG_FILE)
if not os.path.isabs(path):
path = os.path.normpath(os.path.join(SCRIPT_DIR, path))
setup_root_logger(
level=cfg.get("log_level", "INFO"),
log_file=path,
script_dir=SCRIPT_DIR,
telegram_config=cfg.get("telegram"),
)
return get_logger("zfs_backup")
def _build_server_datasets(server: Dict[str, Any]) -> List[Tuple[str, str]]:
"""Список наборов данных (source_path, target_path) для одного сервера. PRD v1.9.2."""
configs = []
for pool in server["pools"]:
for ds in pool["datasets"]:
src = f"{pool['source_pool']}/{ds}"
tgt = f"{pool['target_pool']}/{ds}"
configs.append((src, tgt))
return configs
def _format_server_start(server_name: str, configs: List[Tuple[str, str]], max_items: int = 20) -> str:
"""Формирование уведомления о старте бэкапов для одного сервера. PRD v1.9.2."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines = [
f"🚀 Запуск бэкапов на сервере - {server_name}",
f"⏰ Время: {ts}",
f"📊 Всего наборов данных: {len(configs)}",
"📋 Наборы данных:",
]
items = [f"{src} -> {tgt}" for src, tgt in configs]
if len(items) > max_items:
items = items[:max_items] + [f" ... и ещё {len(items) - max_items}"]
lines.extend(items)
lines.append("Статус: в процессе...")
return "\n".join(lines)
def _error_key_to_display(server: Dict[str, Any], key: str) -> str:
"""Преобразует ключ ошибки server/dataset в source_pool/dataset для отображения. PRD v1.9.3."""
parts = key.split("/", 1)
if len(parts) != 2:
return key
_server_name, dataset = parts
for pool in server["pools"]:
if dataset in pool.get("datasets", []):
return f"{pool['source_pool']}/{dataset}"
return key
def _format_server_finish(
server_name: str,
total: int,
success: int,
errors: List[Tuple[str, str]],
server: Dict[str, Any],
max_items: int = 20,
) -> str:
"""Формирование уведомления о завершении бэкапов для одного сервера. PRD v1.9.3."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
err_count = len(errors)
if err_count == 0:
header = f"🎉 Завершены бэкапы на сервере - {server_name}"
status = "Статус: ✅ ВСЕ УСПЕШНО"
elif err_count < total:
header = f"⚠️ Завершены бэкапы на сервере - {server_name}"
status = "Статус: ⚠️ ТРЕБУЕТ ВНИМАНИЯ"
else:
header = f"🔥 Завершены бэкапы на сервере - {server_name}"
status = "Статус: 🔥 ТРЕБУЕТСЯ ВМЕШАТЕЛЬСТВО"
lines = [
header,
f"⏰ Время: {ts}",
"📊 Итоги:",
f"├─ 📈 Всего: {total}",
f"├─ ✅ Успешно: {success}",
f"└─ ❌ Ошибки: {err_count}",
]
if errors:
lines.append("❌ Список ошибок:")
err_items = [
f"{_error_key_to_display(server, k)}: {msg}" for k, msg in errors
]
if len(err_items) > max_items:
err_items = err_items[:max_items] + [f" ... и ещё {len(err_items) - max_items}"]
lines.extend(err_items)
lines.append(status)
return "\n".join(lines)
def load_config(config_path: str) -> Dict[str, Any]:
"""Загрузка конфигурации из YAML (формат v1.1: servers[].pools[])."""
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
if not config or "servers" not in config:
raise ValueError("В config.yaml должен быть раздел 'servers'")
for i, srv in enumerate(config["servers"]):
if "pools" not in srv or not srv["pools"]:
raise ValueError(f"Сервер [{i}] (name={srv.get('name')}) должен содержать непустой раздел 'pools'")
for j, pool in enumerate(srv["pools"]):
for key in ("source_pool", "target_pool", "datasets"):
if key not in pool:
raise ValueError(f"Сервер [{i}] pool [{j}]: отсутствует поле '{key}'")
return config
def main() -> int:
parser = argparse.ArgumentParser(description="ZFS Backup — бэкап ZFS датасетов по config.yaml")
parser.add_argument(
"--config",
default="config/zfs_backup.yaml",
help="Путь к config (по умолчанию config/zfs_backup.yaml)",
)
parser.add_argument(
"--log-file",
default=None,
help="Путь к лог-файлу (переопределяет config/config_log.yaml)",
)
parser.add_argument(
"--log-config",
default=None,
help="Путь к config_log.yaml (по умолчанию config/config_log.yaml)",
)
args = parser.parse_args()
log = setup_logging(log_file=args.log_file, log_config_path=args.log_config)
config_path = args.config
if not os.path.isabs(config_path):
config_path = os.path.normpath(os.path.join(SCRIPT_DIR, config_path))
if not os.path.isfile(config_path):
log.error("Файл конфигурации не найден: %s", config_path)
return 1
try:
config = load_config(config_path)
except Exception as e:
log.exception("Ошибка загрузки config.yaml: %s", e)
return 1
ssh_defaults = config.get("ssh_defaults") or {}
servers = config["servers"]
log_cfg = load_log_config(args.log_config)
telegram_cfg = log_cfg.get("telegram")
batch_cfg = (telegram_cfg or {}).get("batch_notifications", {})
batch_enabled = (
batch_cfg.get("enabled")
and "zfs_backup" in batch_cfg.get("operations", [])
)
fmt = batch_cfg.get("format", {})
max_items = fmt.get("max_items_in_list", 20)
all_errors: List[Tuple[str, str]] = []
for server in servers:
server_configs = _build_server_datasets(server)
server_total = len(server_configs)
if batch_enabled and server_total > 0:
send_batch_notification(
telegram_cfg,
_format_server_start(server["name"], server_configs, max_items),
)
try:
success, errors = backup_server(
server, ssh_defaults, log, collect_errors=batch_enabled
)
all_errors.extend(errors)
if batch_enabled and server_total > 0:
send_batch_notification(
telegram_cfg,
_format_server_finish(
server["name"], server_total, success, errors, server, max_items
),
)
except Exception as e:
log.exception(
"%s: бэкап завершился ошибкой после %s попыток: %s",
server["name"],
MAX_RETRIES,
e,
)
if batch_enabled:
server_errors = [
(f"{server['name']}/{ds}", str(e))
for pool in server["pools"]
for ds in pool["datasets"]
]
all_errors.extend(server_errors)
if server_total > 0:
send_batch_notification(
telegram_cfg,
_format_server_finish(
server["name"], server_total, 0, server_errors, server, max_items
),
)
else:
flush_telegram_handlers()
return 1
flush_telegram_handlers()
return 0 if not all_errors else 1
if __name__ == "__main__":
sys.exit(main())