#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Модуль для работы с PostgreSQL через SSH """ import logging from datetime import datetime from typing import Optional, List, Dict, Union logger = logging.getLogger(__name__) class PostgreSQLOperations: """ Класс для операций с PostgreSQL через SSH """ def __init__(self, ssh_client): """ Инициализация модуля PostgreSQL Args: ssh_client: Экземпляр SSHBase для выполнения команд """ self.ssh = ssh_client def bases_list(self, srv_pgsql: str) -> List[str]: """ Получение списка баз данных PostgreSQL Args: srv_pgsql: Сервер PostgreSQL Returns: list: Список имен баз данных """ try: logger.info(f"Получение списка баз данных PostgreSQL с сервера {srv_pgsql}") std, err = self.ssh.cmd(f'psql -h {srv_pgsql} -U postgres -t -A -q -c "select datname from pg_database"') if err and err.strip(): logger.error(f"Ошибка получения списка баз: {err}") raise Exception(f"Ошибка получения списка баз: {err}") bases = [b.strip() for b in std.split('\n') if b.strip()] # Удаляем системные базы for sys_base in ['postgres', 'template0', 'template1']: if sys_base in bases: bases.remove(sys_base) logger.info(f"Найдено {len(bases)} баз данных") return bases except Exception as e: logger.error(f"Ошибка при получении списка баз данных: {e}") raise Exception(f"Ошибка при получении списка баз данных: {e}") def bases_size(self, srv_pgsql: str) -> List[List[Union[str, List[str]]]]: """ Получение размеров всех баз данных PostgreSQL Args: srv_pgsql: Сервер PostgreSQL Returns: list: Список списков [имя_базы, размер, ошибки] """ if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip(): raise ValueError("srv_pgsql должен быть непустой строкой") try: logger.info(f"Получение размеров баз данных PostgreSQL с сервера {srv_pgsql}") base_size = [] for base in self.bases_list(srv_pgsql): std, err = self.ssh.cmd(f'psql -h {srv_pgsql} -U postgres -c "SELECT pg_size_pretty( pg_database_size( \'{base}\' ) );"') base_size.append([base, std.split('\n')[2:-3], err.split('\n')]) logger.info(f"Получены размеры для {len(base_size)} баз данных") return base_size except Exception as e: logger.error(f"Ошибка при получении размеров баз данных: {e}") raise Exception(f"Ошибка при получении размеров баз данных: {e}") def bases_size_print(self, srv_pgsql: str) -> None: """ Вывод размеров баз данных PostgreSQL в консоль Args: srv_pgsql: Сервер PostgreSQL """ for base_info in self.bases_size(srv_pgsql): print(f"{base_info[0].ljust(20)} | {''.join(base_info[1])}") def bases_list_print(self, srv_pgsql: str) -> None: """ Вывод списка баз данных PostgreSQL в консоль Args: srv_pgsql: Сервер PostgreSQL """ for base in self.bases_list(srv_pgsql): print(base) def bases_backup(self, srv_pgsql: str, base_name: Optional[str] = None) -> List[Dict[str, Union[str, bool]]]: """ Создание бэкапа базы данных PostgreSQL Args: srv_pgsql: Сервер PostgreSQL base_name: Имя базы данных (если None, бэкапятся все базы) Returns: list: Список словарей с результатами бэкапа, каждый содержит: - 'base': имя базы данных - 'stdout': стандартный вывод - 'stderr': вывод ошибок - 'success': True/False в зависимости от успешности операции Raises: ValueError: При невалидных входных данных Exception: При ошибке создания бэкапа """ # Валидация входных данных if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip(): raise ValueError("srv_pgsql должен быть непустой строкой") if base_name is not None and (not isinstance(base_name, str) or not base_name.strip()): raise ValueError("base_name должен быть None или непустой строкой") try: date = datetime.now().strftime("%d.%m.%Y") backup_dir = f'/backup/pgsql/{srv_pgsql}/{date}-extra/' logger.info(f"Создание бэкапа базы данных {base_name or 'всех баз'} с сервера {srv_pgsql}") std, err = self.ssh.cmd(f'mkdir -p {backup_dir}') if err and err.strip(): logger.error(f"Ошибка создания директории: {err}") raise Exception(f"Ошибка создания директории: {err}") results = [] if base_name: logger.info(f"Создание бэкапа базы {base_name}") std, err = self.ssh.cmd(f'pg_dump -h {srv_pgsql} -U postgres -Fd -w {base_name} -j 10 -f {backup_dir}{base_name}.dump') success = not (err and err.strip()) results.append({ 'base': base_name, 'stdout': std, 'stderr': err, 'success': success }) if not success: logger.error(f"Ошибка бэкапа базы {base_name}: {err}") raise Exception(f"Ошибка бэкапа базы {base_name}: {err}") logger.info(f"Бэкап базы {base_name} создан успешно") else: bases = self.bases_list(srv_pgsql) logger.info(f"Создание бэкапа {len(bases)} баз данных") for base in bases: logger.info(f"Создание бэкапа базы {base}") std, err = self.ssh.cmd(f'pg_dump -h {srv_pgsql} -U postgres -Fd -w {base} -j 10 -f {backup_dir}{base}.dump') success = not (err and err.strip()) results.append({ 'base': base, 'stdout': std, 'stderr': err, 'success': success }) if success: logger.info(f"Бэкап базы {base} создан успешно") else: logger.warning(f"Ошибка при создании бэкапа базы {base}: {err}") return results except Exception as e: logger.error(f"Ошибка при создании бэкапа: {e}") raise Exception(f"Ошибка при создании бэкапа: {e}") def bases_backup_all(self, srv_pgsql: str) -> str: """ Создание бэкапа всех баз данных PostgreSQL Args: srv_pgsql: Сервер PostgreSQL Returns: str: Результат последней операции бэкапа """ try: date = datetime.now().strftime("%d.%m.%Y") backup_dir = f'/backup/pgsql/{srv_pgsql}-{date}/' logger.info(f"Создание бэкапа всех баз данных с сервера {srv_pgsql}") std, err = self.ssh.cmd(f'mkdir -p {backup_dir}') if err and err.strip(): logger.error(f"Ошибка создания директории: {err}") raise Exception(f"Ошибка создания директории: {err}") bases = self.bases_list(srv_pgsql) logger.info(f"Создание бэкапа {len(bases)} баз данных") for base in bases: logger.info(f"Создание бэкапа базы {base}") std, err = self.ssh.cmd(f'pg_dump -h {srv_pgsql} -U postgres -Fd -w {base} -j 10 -f {backup_dir}{base}.dump') if err and err.strip(): logger.error(f"Ошибка бэкапа базы {base}: {err}") raise Exception(f"Ошибка бэкапа базы {base}: {err}") logger.info(f"Бэкап базы {base} создан успешно") logger.info(f"Бэкап всех баз данных завершен") return std except Exception as e: logger.error(f"Ошибка при создании бэкапа всех баз: {e}") raise Exception(f"Ошибка при создании бэкапа всех баз: {e}") def base_create(self, srv_pgsql: str, base_name: str) -> str: """ Создание базы данных PostgreSQL Args: srv_pgsql: Сервер PostgreSQL base_name: Имя базы данных Returns: str: Результат выполнения команды """ if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip(): raise ValueError("srv_pgsql должен быть непустой строкой") if not base_name or not isinstance(base_name, str) or not base_name.strip(): raise ValueError("base_name должен быть непустой строкой") try: logger.info(f"Создание базы данных {base_name} на сервере {srv_pgsql}") std, err = self.ssh.cmd(f'createdb -h {srv_pgsql} -U postgres -w {base_name}') if err and err.strip(): logger.error(f"Ошибка создания базы данных: {err}") raise Exception(f"Ошибка создания базы данных: {err}") logger.info(f"База данных {base_name} создана успешно") return std except Exception as e: logger.error(f"Ошибка при создании базы данных: {e}") raise Exception(f"Ошибка при создании базы данных: {e}") def base_drop(self, srv_pgsql: str, base_name: str) -> str: """ Удаление базы данных PostgreSQL Args: srv_pgsql: Сервер PostgreSQL base_name: Имя базы данных Returns: str: Результат выполнения команды """ if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip(): raise ValueError("srv_pgsql должен быть непустой строкой") if not base_name or not isinstance(base_name, str) or not base_name.strip(): raise ValueError("base_name должен быть непустой строкой") try: logger.info(f"Удаление базы данных {base_name} с сервера {srv_pgsql}") std, err = self.ssh.cmd(f'dropdb -h {srv_pgsql} -U postgres -w {base_name} -f') if err and err.strip(): logger.error(f"Ошибка удаления базы данных: {err}") raise Exception(f"Ошибка удаления базы данных: {err}") logger.info(f"База данных {base_name} удалена успешно") return std except Exception as e: logger.error(f"Ошибка при удалении базы данных: {e}") raise Exception(f"Ошибка при удалении базы данных: {e}") def base_restore(self, arhive_srv_pgsql: str, restore_srv_pgsql: str, backup_date: str, arhive_base_name: str, restore_base_name: str, extra: bool = False) -> str: """ Восстановление базы данных PostgreSQL из бэкапа Args: arhive_srv_pgsql: Сервер с архивом restore_srv_pgsql: Сервер для восстановления backup_date: Дата бэкапа arhive_base_name: Имя базы в архиве restore_base_name: Имя базы для восстановления extra: Флаг экстра-бэкапа Returns: str: Результат выполнения команды """ # Валидация входных данных for param_name, param_value in [ ('arhive_srv_pgsql', arhive_srv_pgsql), ('restore_srv_pgsql', restore_srv_pgsql), ('backup_date', backup_date), ('arhive_base_name', arhive_base_name), ('restore_base_name', restore_base_name) ]: if not param_value or not isinstance(param_value, str) or not param_value.strip(): raise ValueError(f"{param_name} должен быть непустой строкой") try: if extra: backup_path = f'/backup/pgsql/{arhive_srv_pgsql}/{backup_date}-extra/{arhive_base_name}.dump' else: backup_path = f'/backup/pgsql/{arhive_srv_pgsql}-{backup_date}/{arhive_base_name}.dump' logger.info(f"Восстановление базы данных {restore_base_name} из {backup_path}") std, err = self.ssh.cmd( f'pg_restore -h {restore_srv_pgsql} -U postgres -w -Fd -j10 -d {restore_base_name} {backup_path}' ) if err and err.strip(): logger.error(f"Ошибка восстановления базы данных: {err}") raise Exception(f"Ошибка восстановления базы данных: {err}") logger.info(f"База данных {restore_base_name} восстановлена успешно") return std except Exception as e: logger.error(f"Ошибка при восстановлении базы данных: {e}") raise Exception(f"Ошибка при восстановлении базы данных: {e}") def file_list(self, path: str, days: Union[int, str]) -> str: """ Получение списка директорий старше указанного количества дней Args: path: Путь для поиска days: Количество дней (строка или число) Returns: str: Список директорий """ if not path or not isinstance(path, str) or not path.strip(): raise ValueError("path должен быть непустой строкой") try: days_str = str(days) logger.info(f"Поиск директорий старше {days} дней в {path}") std, err = self.ssh.cmd(f'/usr/bin/find {path} -type d -maxdepth 1 -mtime +{days_str} -print') if err and err.strip(): logger.error(f"Ошибка поиска файлов: {err}") raise Exception(f"Ошибка поиска файлов: {err}") return std except Exception as e: logger.error(f"Ошибка при получении списка файлов: {e}") raise Exception(f"Ошибка при получении списка файлов: {e}") def delete_old_backups(self, path: str, days: Union[int, str]) -> None: """ Удаление старых бэкапов Args: path: Путь к директории с бэкапами days: Количество дней (строка или число) """ if not path or not isinstance(path, str) or not path.strip(): raise ValueError("path должен быть непустой строкой") try: days_str = str(days) logger.info(f"Удаление бэкапов старше {days} дней из {path}") dirs, err = self.ssh.cmd(f'/usr/bin/find {path} -type d -maxdepth 1 -mtime +{days_str} -print') if err and err.strip(): logger.error(f"Ошибка поиска директорий: {err}") raise Exception(f"Ошибка поиска директорий: {err}") dirs_list = [d.strip() for d in dirs.split('\n') if d.strip()] logger.info(f"Найдено {len(dirs_list)} директорий для удаления") for d in dirs_list: logger.info(f"Удаление директории {d}") std, err = self.ssh.cmd(f'rm -r {d}') if err and err.strip(): logger.error(f"Ошибка удаления директории {d}: {err}") raise Exception(f"Ошибка удаления директории {d}: {err}") logger.info(f"Удаление завершено") except Exception as e: logger.error(f"Ошибка при удалении старых бэкапов: {e}") raise Exception(f"Ошибка при удалении старых бэкапов: {e}")