82 lines
2.7 KiB
Python
82 lines
2.7 KiB
Python
#!/usr/bin/python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Загрузка конфигурации из YAML.
|
||
Файлы: config/migration.yaml, config/zfs_backup.yaml
|
||
"""
|
||
import os
|
||
from typing import Any, Dict, Optional
|
||
|
||
import yaml
|
||
|
||
CONFIG_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
DEFAULT_MIGRATION_CONFIG = os.path.join(CONFIG_DIR, "migration.yaml")
|
||
DEFAULT_ZFS_CONFIG = os.path.join(CONFIG_DIR, "zfs_backup.yaml")
|
||
|
||
|
||
def _load_yaml(path: str) -> Dict[str, Any]:
|
||
"""Загрузить YAML-файл в словарь."""
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
return yaml.safe_load(f)
|
||
|
||
|
||
def get_config(config_path: Optional[str] = None) -> Dict[str, Any]:
|
||
"""
|
||
Загрузка конфигурации миграции 1С (SSH, PostgreSQL, 1C, migration).
|
||
|
||
Args:
|
||
config_path: Путь к migration.yaml. По умолчанию — config/migration.yaml.
|
||
|
||
Returns:
|
||
dict: Секции ssh, postgresql, c1, migration, logging.
|
||
"""
|
||
path = config_path or os.environ.get("MIGRATION_CONFIG", DEFAULT_MIGRATION_CONFIG)
|
||
if not os.path.isabs(path):
|
||
# Относительно корня проекта (родитель config/)
|
||
root = os.path.dirname(CONFIG_DIR)
|
||
path = os.path.join(root, path)
|
||
data = _load_yaml(path)
|
||
|
||
config = {
|
||
"ssh": data.get("ssh", {}),
|
||
"postgresql": data.get("postgresql", {}),
|
||
"c1": data.get("c1", {}),
|
||
"migration": dict(data.get("migration", {})),
|
||
"logging": data.get("logging", {"level": "INFO"}),
|
||
}
|
||
|
||
mig = config["migration"]
|
||
archive = mig.get("archive_bases_name", [])
|
||
if mig.get("restore_bases_name") is None:
|
||
mig["restore_bases_name"] = list(archive)
|
||
if mig.get("bases") is None:
|
||
mig["bases"] = list(archive)
|
||
|
||
def _norm_on_off(v: Any) -> str:
|
||
if v in (True, "true", "on", "1"):
|
||
return "on"
|
||
if v in (False, "false", "off", "0"):
|
||
return "off"
|
||
raise ValueError(f"Ожидается on/off, получено: {v!r}")
|
||
|
||
scheduled = _norm_on_off(mig.get("scheduled_jobs_deny", "off"))
|
||
sessions = _norm_on_off(mig.get("sessions_deny", "off"))
|
||
mig["scheduled_jobs_deny"] = scheduled
|
||
mig["sessions_deny"] = sessions
|
||
|
||
return config
|
||
|
||
|
||
def load_zfs_config(config_path: Optional[str] = None) -> Dict[str, Any]:
|
||
"""
|
||
Загрузка конфигурации ZFS Backup.
|
||
|
||
Args:
|
||
config_path: Путь к zfs_backup.yaml. По умолчанию — config/zfs_backup.yaml.
|
||
"""
|
||
path = config_path or os.environ.get("ZFS_CONFIG", DEFAULT_ZFS_CONFIG)
|
||
if not os.path.isabs(path):
|
||
root = os.path.dirname(CONFIG_DIR)
|
||
path = os.path.join(root, path)
|
||
return _load_yaml(path)
|