Доведен до ума и стабилизирован модуль ZFS

This commit is contained in:
2026-02-24 14:35:26 +03:00
parent f227824070
commit 347d7388b2
5 changed files with 261 additions and 24 deletions

View File

@@ -1,4 +1,4 @@
# Конфигурация логирования (PRD v1.8) # Конфигурация логирования (PRD v1.8, v1.9)
# Используется: zfs_backup.py, modules/logger.py # Используется: zfs_backup.py, modules/logger.py
# Основные настройки # Основные настройки
@@ -30,3 +30,19 @@ telegram:
max_message_length: 4096 max_message_length: 4096
enable_commands: true enable_commands: true
# PRD v1.9: структурированные batch-уведомления
batch_notifications:
enabled: true
operations:
- "zfs_backup"
suppress_individual_logs:
- "INFO"
- "STATUS"
format:
show_timestamps: true
show_server_names: true
show_datasets: true
max_items_in_list: 20

View File

@@ -21,3 +21,14 @@ servers:
target_pool: fast-backup target_pool: fast-backup
snapshot_name: dd-mm-yyyy snapshot_name: dd-mm-yyyy
retention_days: 90 retention_days: 90
- name: gwo.gmt.cln.su
pools:
- source_pool: zp0
datasets:
- containers/smb
- containers/dc0
# - containers/dns
# - containers/dns2
target_pool: backup
snapshot_name: dd-mm-yyyy
retention_days: 90

View File

@@ -43,6 +43,10 @@ class TelegramHandler(logging.Handler):
def __init__(self, config: Dict[str, Any]) -> None: def __init__(self, config: Dict[str, Any]) -> None:
super().__init__() super().__init__()
self._config = config self._config = config
self._suppress_levels: List[str] = []
batch = config.get("batch_notifications", {})
if batch.get("enabled"):
self._suppress_levels = [str(x).upper() for x in batch.get("suppress_individual_logs", [])]
self._queue: queue.Queue = queue.Queue(maxsize=1000) self._queue: queue.Queue = queue.Queue(maxsize=1000)
self._bot_token = config.get("bot_token", "") self._bot_token = config.get("bot_token", "")
self._chat_id = str(config.get("chat_id", "")) self._chat_id = str(config.get("chat_id", ""))
@@ -70,11 +74,13 @@ class TelegramHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None: def emit(self, record: logging.LogRecord) -> None:
if not self._bot_token or not self._chat_id: if not self._bot_token or not self._chat_id:
return return
levelname = str(getattr(record, "levelname", record.levelname)).upper()
if self._suppress_levels and levelname in self._suppress_levels:
return
if not self._level_allowed(record): if not self._level_allowed(record):
return return
try: try:
msg = self.format(record) msg = self.format(record)
# Экранируем HTML при parse_mode=HTML
if self._parse_mode == "HTML": if self._parse_mode == "HTML":
msg = msg.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") msg = msg.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
self._queue.put_nowait(msg) self._queue.put_nowait(msg)
@@ -183,11 +189,68 @@ class TelegramHandler(logging.Handler):
return True return True
return False return False
def flush(self, timeout: float = 15.0) -> None:
    """Wait for all queued messages to be sent before program exit.

    Polls the internal send queue until it is empty or *timeout* seconds
    elapse, then signals the background worker to stop and joins it.

    :param timeout: maximum seconds to wait for the queue to drain.

    NOTE(review): an empty queue only means the worker has *dequeued*
    every message; the last one may still be in flight when we stop the
    worker — confirm whether a grace period is needed after draining.
    NOTE(review): this also shuts the worker down, so the handler is
    unusable after flush(); intended to be called once, right before
    process exit (see flush_telegram_handlers).
    """
    deadline = time.monotonic() + timeout
    while not self._queue.empty() and time.monotonic() < deadline:
        time.sleep(0.2)  # cheap polling; queue has no "drained" event
    self._stop.set()
    self._worker.join(timeout=5.0)
def close(self) -> None: def close(self) -> None:
self._stop.set() self._stop.set()
super().close() super().close()
def flush_telegram_handlers(timeout: float = 15.0) -> None:
    """Drain every attached TelegramHandler before exit (call at end of main).

    :param timeout: per-handler maximum seconds to wait for delivery.
    """
    attached = logging.getLogger(ROOT_LOGGER_NAME).handlers
    for handler in (h for h in attached if isinstance(h, TelegramHandler)):
        handler.flush(timeout=timeout)
def send_batch_notification(telegram_config: Optional[Dict[str, Any]], text: str) -> None:
    """
    Send a structured batch message directly to Telegram (PRD v1.9).

    Bypasses the handler queue: delivery is synchronous with a simple
    retry loop. Messages longer than the configured limit are split into
    consecutive parts. On a permanent failure the error is written to
    stderr and the remaining parts are skipped (best-effort delivery).

    :param telegram_config: the ``telegram`` section of the log config;
        ``None`` or ``enabled: false`` silently disables sending.
    :param text: message body; HTML-escaped when ``parse_mode`` is HTML.
    """
    if not telegram_config or not telegram_config.get("enabled"):
        return
    token = telegram_config.get("bot_token")
    chat_id = telegram_config.get("chat_id")
    # Guard empty text too: Telegram rejects empty-message requests.
    if not token or not chat_id or not text:
        return
    retry_cfg = telegram_config.get("retry", {})
    # YAML may yield these values as strings — coerce defensively.
    attempts = max(1, int(retry_cfg.get("attempts", 3)))
    delay = float(retry_cfg.get("delay", 5))
    opts = telegram_config.get("options", {})
    parse_mode = opts.get("parse_mode", "HTML")
    max_len = int(opts.get("max_message_length", 4096))
    if parse_mode == "HTML":
        # Telegram's HTML mode requires markup characters to be escaped.
        text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
    # NOTE(review): chunking after escaping can split an entity such as
    # "&amp;" across parts, which the HTML parser rejects — confirm
    # whether splitting on line boundaries is required in practice.
    parts = [text[i : i + max_len] for i in range(0, len(text), max_len)]
    # Local import keeps the hard dependency on `requests` confined to
    # actual sending; the early returns above never need it.
    import requests

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    for part in parts:
        for attempt in range(1, attempts + 1):
            try:
                resp = requests.post(
                    url,
                    data={"chat_id": chat_id, "text": part, "parse_mode": parse_mode},
                    timeout=10,
                )
                resp.raise_for_status()
                payload = resp.json()  # parse once; may raise on a non-JSON body
                if not payload.get("ok"):
                    raise RuntimeError(payload.get("description", "Unknown"))
                break
            except Exception as e:
                if attempt == attempts:
                    sys.stderr.write(f"[send_batch_notification] Не удалось отправить: {e}\n")
                    return  # abort remaining parts: delivery order matters
                time.sleep(delay)
def _parse_level(level: str) -> int: def _parse_level(level: str) -> int:
m = { m = {
"DEBUG": logging.DEBUG, "DEBUG": logging.DEBUG,

View File

@@ -10,6 +10,9 @@ import re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
# Результат batch-операции: (успешно, список ошибок)
# ошибка = (server/dataset_key, сообщение) — например "gwo2.mps.cln.su/containers/smb2"
from .logger import get_logger from .logger import get_logger
from .protocols import SSHProtocol from .protocols import SSHProtocol
from .ssh_base import SSHBase from .ssh_base import SSHBase
@@ -130,14 +133,18 @@ def backup_server(
server: Dict[str, Any], server: Dict[str, Any],
ssh_defaults: Dict[str, Any], ssh_defaults: Dict[str, Any],
log: logging.Logger, log: logging.Logger,
) -> bool: collect_errors: bool = False,
) -> Tuple[int, List[Tuple[str, str]]]:
""" """
Выполнить бэкап для одного сервера: снапшоты, send/recv, очистка. Выполнить бэкап для одного сервера: снапшоты, send/recv, очистка.
Конфиг v1.1: server['pools'] — список пулов с source_pool, datasets, target_pool. Конфиг v1.1: server['pools'] — список пулов с source_pool, datasets, target_pool.
Возвращает (успешно, список_ошибок), где ошибка = (ключ "server/dataset", сообщение).
""" """
name = server["name"] name = server["name"]
pools = server["pools"] pools = server["pools"]
retention_days = int(server.get("retention_days", 30)) retention_days = int(server.get("retention_days", 30))
success_count = 0
errors: List[Tuple[str, str]] = []
port = ssh_defaults.get("port", 22) port = ssh_defaults.get("port", 22)
username = ssh_defaults.get("username", "root") username = ssh_defaults.get("username", "root")
@@ -148,27 +155,35 @@ def backup_server(
log.info("Сервер %s: дата снапшотов %s", name, date_str) log.info("Сервер %s: дата снапшотов %s", name, date_str)
ssh = SSHBase(hostname=name, port=port, username=username, pkey_file=pkey_file, host_keys=host_keys) ssh = SSHBase(hostname=name, port=port, username=username, pkey_file=pkey_file, host_keys=host_keys)
ssh.connect() try:
ssh.connect()
pool_counts: Dict[str, int] = {} except Exception as e:
total_datasets = 0 for pool_config in pools:
for dataset in pool_config["datasets"]:
errors.append((f"{name}/{dataset}", str(e)))
return 0, errors
try: try:
for pool_config in pools: for pool_config in pools:
source_pool = pool_config["source_pool"] source_pool = pool_config["source_pool"]
datasets = pool_config["datasets"] datasets = pool_config["datasets"]
target_pool = pool_config["target_pool"] target_pool = pool_config["target_pool"]
pool_counts[source_pool] = len(datasets)
total_datasets += len(datasets)
for dataset in datasets: for dataset in datasets:
full_dataset = f"{source_pool}/{dataset}" full_dataset = f"{source_pool}/{dataset}"
key = f"{name}/{dataset}"
def do_snapshot(fd=full_dataset, d=date_str): def do_snapshot(fd=full_dataset, d=date_str):
if not _create_snapshot(ssh, fd, d, log): if not _create_snapshot(ssh, fd, d, log):
raise RuntimeError(f"Snapshot {fd}@{d} failed") raise RuntimeError(f"Snapshot {fd}@{d} failed")
_run_with_retry(log, name, f"snapshot {full_dataset}", do_snapshot) try:
_run_with_retry(log, name, f"snapshot {full_dataset}", do_snapshot)
except Exception as e:
errors.append((key, str(e)))
if not collect_errors:
raise
continue
for pool_config in pools: for pool_config in pools:
source_pool = pool_config["source_pool"] source_pool = pool_config["source_pool"]
@@ -178,6 +193,7 @@ def backup_server(
for dataset in datasets: for dataset in datasets:
full_dataset = f"{source_pool}/{dataset}" full_dataset = f"{source_pool}/{dataset}"
target_dataset = f"{target_pool}/{dataset}" target_dataset = f"{target_pool}/{dataset}"
key = f"{name}/{dataset}"
def do_send_recv(fd=full_dataset, td=target_dataset, d=date_str): def do_send_recv(fd=full_dataset, td=target_dataset, d=date_str):
prev_date = _get_previous_snapshot_date(ssh, td) prev_date = _get_previous_snapshot_date(ssh, td)
@@ -188,14 +204,20 @@ def backup_server(
if not _replicate_snapshot(ssh, fd, td, d, prev_date, log): if not _replicate_snapshot(ssh, fd, td, d, prev_date, log):
raise RuntimeError(f"Replicate {fd}@{d}{td} failed") raise RuntimeError(f"Replicate {fd}@{d}{td} failed")
_run_with_retry(log, name, f"send/recv {full_dataset}", do_send_recv) try:
_run_with_retry(log, name, f"send/recv {full_dataset}", do_send_recv)
success_count += 1
except Exception as e:
errors.append((key, str(e)))
if not collect_errors:
raise
continue
cutoff = datetime.now() - timedelta(days=retention_days) cutoff = datetime.now() - timedelta(days=retention_days)
for pool_config in pools: for pool_config in pools:
source_pool = pool_config["source_pool"] source_pool = pool_config["source_pool"]
datasets = pool_config["datasets"] datasets = pool_config["datasets"]
target_pool = pool_config["target_pool"] target_pool = pool_config["target_pool"]
for dataset in datasets: for dataset in datasets:
for pool, ds in [(source_pool, dataset), (target_pool, dataset)]: for pool, ds in [(source_pool, dataset), (target_pool, dataset)]:
full_ds = f"{pool}/{ds}" full_ds = f"{pool}/{ds}"
@@ -218,9 +240,9 @@ def backup_server(
except ValueError: except ValueError:
continue continue
parts = ", ".join(f"{p}:{c}" for p, c in sorted(pool_counts.items())) if not collect_errors:
log.info("%s: %s datasets backed up (%s)", name, total_datasets, parts) success_count = sum(len(pc["datasets"]) for pc in pools)
return True return success_count, errors
finally: finally:
ssh.close() ssh.close()

View File

@@ -1,17 +1,15 @@
#!/usr/bin/python3 #!/usr/bin/python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Точка входа ZFS Backup — cron/CLI (PRD v1.8). Точка входа ZFS Backup — cron/CLI (PRD v1.8, v1.9).
Конфигурация: config/config_log.yaml (логирование), config/zfs_backup.yaml (бэкап). Конфигурация: config/config_log.yaml (логирование), config/zfs_backup.yaml (бэкап).
Запуск: python zfs_backup.py
или: zfs-backup (после pip install -e .)
""" """
import argparse import argparse
import logging import logging
import os import os
import sys import sys
from typing import Any, Dict, Optional from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import yaml import yaml
@@ -19,7 +17,7 @@ SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path: if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR) sys.path.insert(0, SCRIPT_DIR)
from modules.logger import setup_root_logger, get_logger from modules.logger import setup_root_logger, get_logger, flush_telegram_handlers, send_batch_notification
from modules.zfs_backup_ops import backup_server, MAX_RETRIES from modules.zfs_backup_ops import backup_server, MAX_RETRIES
DEFAULT_LOG_FILE = "/var/log/zfs_backup.log" DEFAULT_LOG_FILE = "/var/log/zfs_backup.log"
@@ -53,6 +51,87 @@ def setup_logging(
return get_logger("zfs_backup") return get_logger("zfs_backup")
def _build_server_datasets(server: Dict[str, Any]) -> List[Tuple[str, str]]:
"""Список наборов данных (source_path, target_path) для одного сервера. PRD v1.9.2."""
configs = []
for pool in server["pools"]:
for ds in pool["datasets"]:
src = f"{pool['source_pool']}/{ds}"
tgt = f"{pool['target_pool']}/{ds}"
configs.append((src, tgt))
return configs
def _format_server_start(server_name: str, configs: List[Tuple[str, str]], max_items: int = 20) -> str:
"""Формирование уведомления о старте бэкапов для одного сервера. PRD v1.9.2."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines = [
f"🚀 Запуск бэкапов на сервере - {server_name}",
f"⏰ Время: {ts}",
f"📊 Всего наборов данных: {len(configs)}",
"📋 Наборы данных:",
]
items = [f"{src} -> {tgt}" for src, tgt in configs]
if len(items) > max_items:
items = items[:max_items] + [f" ... и ещё {len(items) - max_items}"]
lines.extend(items)
lines.append("Статус: в процессе...")
return "\n".join(lines)
def _error_key_to_display(server: Dict[str, Any], key: str) -> str:
"""Преобразует ключ ошибки server/dataset в source_pool/dataset для отображения. PRD v1.9.3."""
parts = key.split("/", 1)
if len(parts) != 2:
return key
_server_name, dataset = parts
for pool in server["pools"]:
if dataset in pool.get("datasets", []):
return f"{pool['source_pool']}/{dataset}"
return key
def _format_server_finish(
server_name: str,
total: int,
success: int,
errors: List[Tuple[str, str]],
server: Dict[str, Any],
max_items: int = 20,
) -> str:
"""Формирование уведомления о завершении бэкапов для одного сервера. PRD v1.9.3."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
err_count = len(errors)
if err_count == 0:
header = f"🎉 Завершены бэкапы на сервере - {server_name}"
status = "Статус: ✅ ВСЕ УСПЕШНО"
elif err_count < total:
header = f"⚠️ Завершены бэкапы на сервере - {server_name}"
status = "Статус: ⚠️ ТРЕБУЕТ ВНИМАНИЯ"
else:
header = f"🔥 Завершены бэкапы на сервере - {server_name}"
status = "Статус: 🔥 ТРЕБУЕТСЯ ВМЕШАТЕЛЬСТВО"
lines = [
header,
f"⏰ Время: {ts}",
"📊 Итоги:",
f"├─ 📈 Всего: {total}",
f"├─ ✅ Успешно: {success}",
f"└─ ❌ Ошибки: {err_count}",
]
if errors:
lines.append("❌ Список ошибок:")
err_items = [
f"{_error_key_to_display(server, k)}: {msg}" for k, msg in errors
]
if len(err_items) > max_items:
err_items = err_items[:max_items] + [f" ... и ещё {len(err_items) - max_items}"]
lines.extend(err_items)
lines.append(status)
return "\n".join(lines)
def load_config(config_path: str) -> Dict[str, Any]: def load_config(config_path: str) -> Dict[str, Any]:
"""Загрузка конфигурации из YAML (формат v1.1: servers[].pools[]).""" """Загрузка конфигурации из YAML (формат v1.1: servers[].pools[])."""
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
@@ -105,10 +184,39 @@ def main() -> int:
ssh_defaults = config.get("ssh_defaults") or {} ssh_defaults = config.get("ssh_defaults") or {}
servers = config["servers"] servers = config["servers"]
log_cfg = load_log_config(args.log_config)
telegram_cfg = log_cfg.get("telegram")
batch_cfg = (telegram_cfg or {}).get("batch_notifications", {})
batch_enabled = (
batch_cfg.get("enabled")
and "zfs_backup" in batch_cfg.get("operations", [])
)
fmt = batch_cfg.get("format", {})
max_items = fmt.get("max_items_in_list", 20)
all_errors: List[Tuple[str, str]] = []
for server in servers: for server in servers:
server_configs = _build_server_datasets(server)
server_total = len(server_configs)
if batch_enabled and server_total > 0:
send_batch_notification(
telegram_cfg,
_format_server_start(server["name"], server_configs, max_items),
)
try: try:
backup_server(server, ssh_defaults, log) success, errors = backup_server(
server, ssh_defaults, log, collect_errors=batch_enabled
)
all_errors.extend(errors)
if batch_enabled and server_total > 0:
send_batch_notification(
telegram_cfg,
_format_server_finish(
server["name"], server_total, success, errors, server, max_items
),
)
except Exception as e: except Exception as e:
log.exception( log.exception(
"%s: бэкап завершился ошибкой после %s попыток: %s", "%s: бэкап завершился ошибкой после %s попыток: %s",
@@ -116,9 +224,26 @@ def main() -> int:
MAX_RETRIES, MAX_RETRIES,
e, e,
) )
return 1 if batch_enabled:
server_errors = [
(f"{server['name']}/{ds}", str(e))
for pool in server["pools"]
for ds in pool["datasets"]
]
all_errors.extend(server_errors)
if server_total > 0:
send_batch_notification(
telegram_cfg,
_format_server_finish(
server["name"], server_total, 0, server_errors, server, max_items
),
)
else:
flush_telegram_handlers()
return 1
return 0 flush_telegram_handlers()
return 0 if not all_errors else 1
if __name__ == "__main__": if __name__ == "__main__":