#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Загрузка конфигурации из YAML. Файлы: config/migration.yaml, config/zfs_backup.yaml """ import os from typing import Any, Dict, Optional import yaml CONFIG_DIR = os.path.dirname(os.path.abspath(__file__)) DEFAULT_MIGRATION_CONFIG = os.path.join(CONFIG_DIR, "migration.yaml") DEFAULT_ZFS_CONFIG = os.path.join(CONFIG_DIR, "zfs_backup.yaml") def _load_yaml(path: str) -> Dict[str, Any]: """Загрузить YAML-файл в словарь.""" with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) def get_config(config_path: Optional[str] = None) -> Dict[str, Any]: """ Загрузка конфигурации миграции 1С (SSH, PostgreSQL, 1C, migration). Args: config_path: Путь к migration.yaml. По умолчанию — config/migration.yaml. Returns: dict: Секции ssh, postgresql, c1, migration, logging. """ path = config_path or os.environ.get("MIGRATION_CONFIG", DEFAULT_MIGRATION_CONFIG) if not os.path.isabs(path): # Относительно корня проекта (родитель config/) root = os.path.dirname(CONFIG_DIR) path = os.path.join(root, path) data = _load_yaml(path) config = { "ssh": data.get("ssh", {}), "postgresql": data.get("postgresql", {}), "c1": data.get("c1", {}), "migration": dict(data.get("migration", {})), "logging": data.get("logging", {"level": "INFO"}), } mig = config["migration"] archive = mig.get("archive_bases_name", []) if mig.get("restore_bases_name") is None: mig["restore_bases_name"] = list(archive) if mig.get("bases") is None: mig["bases"] = list(archive) def _norm_on_off(v: Any) -> str: if v in (True, "true", "on", "1"): return "on" if v in (False, "false", "off", "0"): return "off" raise ValueError(f"Ожидается on/off, получено: {v!r}") scheduled = _norm_on_off(mig.get("scheduled_jobs_deny", "off")) sessions = _norm_on_off(mig.get("sessions_deny", "off")) mig["scheduled_jobs_deny"] = scheduled mig["sessions_deny"] = sessions return config def load_zfs_config(config_path: Optional[str] = None) -> Dict[str, Any]: """ Загрузка конфигурации ZFS Backup. Args: config_path: Путь к zfs_backup.yaml. По умолчанию — config/zfs_backup.yaml. """ path = config_path or os.environ.get("ZFS_CONFIG", DEFAULT_ZFS_CONFIG) if not os.path.isabs(path): root = os.path.dirname(CONFIG_DIR) path = os.path.join(root, path) return _load_yaml(path)