249 lines
9.6 KiB
Python
249 lines
9.6 KiB
Python
#!/usr/bin/python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Операционный модуль ZFS Backup.
|
||
Реализует логику снапшотов, репликации и очистки для удалённых ZFS-пулов.
|
||
Используется точкой входа zfs_backup.py (cron/CLI).
|
||
"""
|
||
import logging
|
||
import re
|
||
from datetime import datetime, timedelta
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
# Результат batch-операции: (успешно, список ошибок)
|
||
# ошибка = (server/dataset_key, сообщение) — например "gwo2.mps.cln.su/containers/smb2"
|
||
|
||
from .logger import get_logger
|
||
from .protocols import SSHProtocol
|
||
from .ssh_base import SSHBase
|
||
|
||
logger = get_logger("zfs_backup")
|
||
|
||
MAX_RETRIES = 3
|
||
SNAPSHOT_DATE_FMT = "%d-%m-%Y" # dd-mm-yyyy
|
||
|
||
|
||
def _execute(ssh: SSHProtocol, command: str, log: logging.Logger) -> Tuple[str, str]:
|
||
"""Выполнить команду по SSH. При ошибке — исключение."""
|
||
stdout, stderr = ssh.cmd(command)
|
||
if stderr and stderr.strip():
|
||
raise RuntimeError(f"Команда завершилась с ошибкой: {stderr.strip()}")
|
||
return stdout, stderr
|
||
|
||
|
||
def _create_snapshot(
|
||
ssh: SSHProtocol,
|
||
full_dataset: str,
|
||
date_str: str,
|
||
log: logging.Logger,
|
||
) -> bool:
|
||
"""Создать ZFS-снапшот. При 'dataset already exists' — успех (skip)."""
|
||
cmd = f"/usr/sbin/zfs snapshot {full_dataset}@{date_str}"
|
||
stdout, stderr = ssh.cmd(cmd, suppress_warnings=True)
|
||
stderr_lower = (stderr or "").lower()
|
||
|
||
if "dataset already exists" in stderr_lower:
|
||
pool_name = full_dataset.split("/")[0]
|
||
snapshot_name = "/".join(full_dataset.split("/")[1:])
|
||
log.info(
|
||
"В пуле %s снимок %s@%s уже существует, действие пропущено",
|
||
pool_name,
|
||
snapshot_name,
|
||
date_str,
|
||
)
|
||
return True
|
||
|
||
if stderr and stderr.strip():
|
||
log.error("❌ Создание снимка failed: %s", stderr.strip())
|
||
return False
|
||
|
||
log.info("✅ Снимок %s@%s создан", full_dataset, date_str)
|
||
return True
|
||
|
||
|
||
def _replicate_snapshot(
|
||
ssh: SSHProtocol,
|
||
source_dataset: str,
|
||
target_dataset: str,
|
||
date_str: str,
|
||
prev_date: Optional[str],
|
||
log: logging.Logger,
|
||
) -> bool:
|
||
"""Репликация снапшота. При 'not an earlier snapshot' — успех (skip)."""
|
||
if prev_date:
|
||
send_cmd = f"/usr/sbin/zfs send -i @{prev_date} {source_dataset}@{date_str}"
|
||
else:
|
||
send_cmd = f"/usr/sbin/zfs send {source_dataset}@{date_str}"
|
||
full_cmd = f"{send_cmd} | /usr/sbin/zfs recv -F {target_dataset}"
|
||
stdout, stderr = ssh.cmd(full_cmd, suppress_warnings=True)
|
||
|
||
if stderr and "not an earlier snapshot from the same fs" in stderr:
|
||
pool_name = target_dataset.split("/")[0]
|
||
dataset_name = "/".join(target_dataset.split("/")[1:])
|
||
log.info("В пуле %s снимок %s@%s уже существует", pool_name, dataset_name, date_str)
|
||
return True
|
||
|
||
if stderr and stderr.strip():
|
||
log.error("❌ Репликация failed: %s", stderr.strip())
|
||
return False
|
||
|
||
log.info("✅ Репликация %s → %s", source_dataset, target_dataset)
|
||
return True
|
||
|
||
|
||
def _get_previous_snapshot_date(ssh: SSHProtocol, target_dataset: str) -> Optional[str]:
|
||
"""Дата последнего снапшота target_dataset (dd-mm-yyyy) или None."""
|
||
cmd = f"/usr/sbin/zfs list -t snapshot -H -o name {target_dataset} 2>/dev/null || true"
|
||
stdout, _ = ssh.cmd(cmd)
|
||
lines = [s.strip() for s in stdout.splitlines() if s.strip()]
|
||
pattern = re.compile(r"@(\d{2}-\d{2}-\d{4})$")
|
||
dates: List[str] = []
|
||
for line in lines:
|
||
m = pattern.search(line)
|
||
if m:
|
||
dates.append(m.group(1))
|
||
if not dates:
|
||
return None
|
||
dates.sort(key=lambda s: datetime.strptime(s, "%d-%m-%Y"))
|
||
return dates[-1]
|
||
|
||
|
||
def _run_with_retry(
    log: logging.Logger,
    server_name: str,
    operation_name: str,
    func,
    *args,
    **kwargs,
) -> Any:
    """Run *func* with up to MAX_RETRIES attempts.

    Args:
        log: logger for per-attempt warnings.
        server_name: server label used in log messages.
        operation_name: human-readable operation label for log messages.
        func: callable to invoke; retried on any exception.
        *args, **kwargs: forwarded to *func*.

    Returns:
        Whatever *func* returns on the first successful attempt.

    Raises:
        Exception: re-raises func's last exception after MAX_RETRIES failures.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            log.warning("❌ %s: failed %s (retry %s/%s): %s", server_name, operation_name, attempt, MAX_RETRIES, e)
            # On the final attempt, re-raise the original exception to the
            # caller. (The old trailing `raise last_exc` after the loop was
            # unreachable dead code — this branch always fires first.)
            if attempt == MAX_RETRIES:
                raise
||
|
||
|
||
def backup_server(
    server: Dict[str, Any],
    ssh_defaults: Dict[str, Any],
    log: logging.Logger,
    collect_errors: bool = False,
) -> Tuple[int, List[Tuple[str, str]]]:
    """Back up one server: snapshots, send/recv replication, retention cleanup.

    Config v1.1: ``server['pools']`` is a list of pools, each with
    ``source_pool``, ``datasets`` and ``target_pool``.

    Args:
        server: server config; must contain 'name' and 'pools', may contain
            'retention_days' (default 30).
        ssh_defaults: SSH connection defaults (port, username, pkey_file,
            host_keys).
        log: logger for progress and error messages.
        collect_errors: when True, per-dataset failures are accumulated and
            the remaining datasets are still processed; when False the first
            failure aborts the whole server.

    Returns:
        ``(success_count, errors)`` where each error is a tuple of
        ("server/dataset", message), e.g. "gwo2.mps.cln.su/containers/smb2".
    """
    name = server["name"]
    pools = server["pools"]
    retention_days = int(server.get("retention_days", 30))
    success_count = 0
    errors: List[Tuple[str, str]] = []

    port = ssh_defaults.get("port", 22)
    username = ssh_defaults.get("username", "root")
    pkey_file = ssh_defaults.get("pkey_file", "/root/.ssh/id_rsa")
    host_keys = ssh_defaults.get("host_keys", "~/.ssh/known_hosts")

    date_str = datetime.now().strftime(SNAPSHOT_DATE_FMT)
    log.info("Сервер %s: дата снапшотов %s", name, date_str)

    ssh = SSHBase(hostname=name, port=port, username=username, pkey_file=pkey_file, host_keys=host_keys)
    try:
        ssh.connect()
    except Exception as e:
        # Connection failed: report every configured dataset as failed.
        for pool_config in pools:
            for dataset in pool_config["datasets"]:
                errors.append((f"{name}/{dataset}", str(e)))
        return 0, errors

    try:
        # Phase 1: create today's snapshot for every dataset.
        for pool_config in pools:
            source_pool = pool_config["source_pool"]
            for dataset in pool_config["datasets"]:
                full_dataset = f"{source_pool}/{dataset}"
                key = f"{name}/{dataset}"

                # Default args freeze the loop variables (late-binding closure).
                def do_snapshot(fd=full_dataset, d=date_str):
                    if not _create_snapshot(ssh, fd, d, log):
                        raise RuntimeError(f"Snapshot {fd}@{d} failed")

                try:
                    _run_with_retry(log, name, f"snapshot {full_dataset}", do_snapshot)
                except Exception as e:
                    errors.append((key, str(e)))
                    if not collect_errors:
                        raise
                    continue

        # Phase 2: replicate each snapshot to the target pool.
        for pool_config in pools:
            source_pool = pool_config["source_pool"]
            target_pool = pool_config["target_pool"]
            for dataset in pool_config["datasets"]:
                full_dataset = f"{source_pool}/{dataset}"
                target_dataset = f"{target_pool}/{dataset}"
                key = f"{name}/{dataset}"

                def do_send_recv(fd=full_dataset, td=target_dataset, d=date_str):
                    prev_date = _get_previous_snapshot_date(ssh, td)
                    if prev_date:
                        log.info("Инкрементальная передача %s (от %s)", fd, prev_date)
                    else:
                        log.info("Полная передача %s", fd)
                    if not _replicate_snapshot(ssh, fd, td, d, prev_date, log):
                        raise RuntimeError(f"Replicate {fd}@{d} → {td} failed")

                try:
                    _run_with_retry(log, name, f"send/recv {full_dataset}", do_send_recv)
                    success_count += 1
                except Exception as e:
                    errors.append((key, str(e)))
                    if not collect_errors:
                        raise
                    continue

        # Phase 3: retention cleanup on both source and target datasets.
        cutoff = datetime.now() - timedelta(days=retention_days)
        # Hoisted: was re-compiled inside the per-dataset loop.
        snapshot_re = re.compile(r"@(\d{2}-\d{2}-\d{4})$")
        for pool_config in pools:
            source_pool = pool_config["source_pool"]
            target_pool = pool_config["target_pool"]
            for dataset in pool_config["datasets"]:
                for pool in (source_pool, target_pool):
                    full_ds = f"{pool}/{dataset}"
                    cmd = f"/usr/sbin/zfs list -t snapshot -H -o name {full_ds} 2>/dev/null || true"
                    stdout, _ = ssh.cmd(cmd)
                    for line in stdout.splitlines():
                        line = line.strip()
                        if not line:
                            continue
                        m = snapshot_re.search(line)
                        if not m:
                            continue
                        try:
                            snap_date = datetime.strptime(m.group(1), SNAPSHOT_DATE_FMT)
                        except ValueError:
                            # Matched the pattern but is not a real date — skip.
                            continue
                        if snap_date < cutoff:
                            try:
                                _execute(ssh, f"/usr/sbin/zfs destroy {line}", log)
                                log.info("Удалён старый снапшот: %s", line)
                            except RuntimeError as e:
                                # Bug fix: a failed destroy used to propagate and
                                # abort the whole server backup (even with
                                # collect_errors=True). Cleanup is best-effort.
                                log.error("❌ Удаление снапшота %s failed: %s", line, e)

        if not collect_errors:
            # Legacy contract: without error collection, any failure raised
            # above, so reaching this point means every dataset succeeded.
            success_count = sum(len(pc["datasets"]) for pc in pools)
        return success_count, errors

    finally:
        # Always release the SSH connection, whatever happened above.
        ssh.close()