Files
cursor_ai/modules/postgresql.py

369 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Модуль для работы с PostgreSQL через SSH
"""
import logging
from datetime import datetime
from typing import Optional, List, Dict, Union
logger = logging.getLogger(__name__)
class PostgreSQLOperations:
"""
Класс для операций с PostgreSQL через SSH
"""
def __init__(self, ssh_client):
"""
Инициализация модуля PostgreSQL
Args:
ssh_client: Экземпляр SSHBase для выполнения команд
"""
self.ssh = ssh_client
def bases_list(self, srv_pgsql: str) -> List[str]:
"""
Получение списка баз данных PostgreSQL
Args:
srv_pgsql: Сервер PostgreSQL
Returns:
list: Список имен баз данных
"""
try:
logger.info(f"Получение списка баз данных PostgreSQL с сервера {srv_pgsql}")
std, err = self.ssh.cmd(f'psql -h {srv_pgsql} -U postgres -t -A -q -c "select datname from pg_database"')
if err and err.strip():
logger.error(f"Ошибка получения списка баз: {err}")
raise Exception(f"Ошибка получения списка баз: {err}")
bases = [b.strip() for b in std.split('\n') if b.strip()]
# Удаляем системные базы
for sys_base in ['postgres', 'template0', 'template1']:
if sys_base in bases:
bases.remove(sys_base)
logger.info(f"Найдено {len(bases)} баз данных")
return bases
except Exception as e:
logger.error(f"Ошибка при получении списка баз данных: {e}")
raise Exception(f"Ошибка при получении списка баз данных: {e}")
def bases_size(self, srv_pgsql: str) -> List[List[Union[str, List[str]]]]:
"""
Получение размеров всех баз данных PostgreSQL
Args:
srv_pgsql: Сервер PostgreSQL
Returns:
list: Список списков [имя_базы, размер, ошибки]
"""
if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip():
raise ValueError("srv_pgsql должен быть непустой строкой")
try:
logger.info(f"Получение размеров баз данных PostgreSQL с сервера {srv_pgsql}")
base_size = []
for base in self.bases_list(srv_pgsql):
std, err = self.ssh.cmd(f'psql -h {srv_pgsql} -U postgres -c "SELECT pg_size_pretty( pg_database_size( \'{base}\' ) );"')
base_size.append([base, std.split('\n')[2:-3], err.split('\n')])
logger.info(f"Получены размеры для {len(base_size)} баз данных")
return base_size
except Exception as e:
logger.error(f"Ошибка при получении размеров баз данных: {e}")
raise Exception(f"Ошибка при получении размеров баз данных: {e}")
def bases_size_print(self, srv_pgsql: str) -> None:
"""
Вывод размеров баз данных PostgreSQL в консоль
Args:
srv_pgsql: Сервер PostgreSQL
"""
for base_info in self.bases_size(srv_pgsql):
print(f"{base_info[0].ljust(20)} | {''.join(base_info[1])}")
def bases_list_print(self, srv_pgsql: str) -> None:
"""
Вывод списка баз данных PostgreSQL в консоль
Args:
srv_pgsql: Сервер PostgreSQL
"""
for base in self.bases_list(srv_pgsql):
print(base)
def bases_backup(self, srv_pgsql: str, base_name: Optional[str] = None) -> List[Dict[str, Union[str, bool]]]:
"""
Создание бэкапа базы данных PostgreSQL
Args:
srv_pgsql: Сервер PostgreSQL
base_name: Имя базы данных (если None, бэкапятся все базы)
Returns:
list: Список словарей с результатами бэкапа, каждый содержит:
- 'base': имя базы данных
- 'stdout': стандартный вывод
- 'stderr': вывод ошибок
- 'success': True/False в зависимости от успешности операции
Raises:
ValueError: При невалидных входных данных
Exception: При ошибке создания бэкапа
"""
# Валидация входных данных
if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip():
raise ValueError("srv_pgsql должен быть непустой строкой")
if base_name is not None and (not isinstance(base_name, str) or not base_name.strip()):
raise ValueError("base_name должен быть None или непустой строкой")
try:
date = datetime.now().strftime("%d.%m.%Y")
backup_dir = f'/backup/pgsql/{srv_pgsql}/{date}-extra/'
logger.info(f"Создание бэкапа базы данных {base_name or 'всех баз'} с сервера {srv_pgsql}")
std, err = self.ssh.cmd(f'mkdir -p {backup_dir}')
if err and err.strip():
logger.error(f"Ошибка создания директории: {err}")
raise Exception(f"Ошибка создания директории: {err}")
results = []
if base_name:
logger.info(f"Создание бэкапа базы {base_name}")
std, err = self.ssh.cmd(f'pg_dump -h {srv_pgsql} -U postgres -Fd -w {base_name} -j 10 -f {backup_dir}{base_name}.dump')
success = not (err and err.strip())
results.append({
'base': base_name,
'stdout': std,
'stderr': err,
'success': success
})
if not success:
logger.error(f"Ошибка бэкапа базы {base_name}: {err}")
raise Exception(f"Ошибка бэкапа базы {base_name}: {err}")
logger.info(f"Бэкап базы {base_name} создан успешно")
else:
bases = self.bases_list(srv_pgsql)
logger.info(f"Создание бэкапа {len(bases)} баз данных")
for base in bases:
logger.info(f"Создание бэкапа базы {base}")
std, err = self.ssh.cmd(f'pg_dump -h {srv_pgsql} -U postgres -Fd -w {base} -j 10 -f {backup_dir}{base}.dump')
success = not (err and err.strip())
results.append({
'base': base,
'stdout': std,
'stderr': err,
'success': success
})
if success:
logger.info(f"Бэкап базы {base} создан успешно")
else:
logger.warning(f"Ошибка при создании бэкапа базы {base}: {err}")
return results
except Exception as e:
logger.error(f"Ошибка при создании бэкапа: {e}")
raise Exception(f"Ошибка при создании бэкапа: {e}")
def bases_backup_all(self, srv_pgsql: str) -> str:
"""
Создание бэкапа всех баз данных PostgreSQL
Args:
srv_pgsql: Сервер PostgreSQL
Returns:
str: Результат последней операции бэкапа
"""
try:
date = datetime.now().strftime("%d.%m.%Y")
backup_dir = f'/backup/pgsql/{srv_pgsql}-{date}/'
logger.info(f"Создание бэкапа всех баз данных с сервера {srv_pgsql}")
std, err = self.ssh.cmd(f'mkdir -p {backup_dir}')
if err and err.strip():
logger.error(f"Ошибка создания директории: {err}")
raise Exception(f"Ошибка создания директории: {err}")
bases = self.bases_list(srv_pgsql)
logger.info(f"Создание бэкапа {len(bases)} баз данных")
for base in bases:
logger.info(f"Создание бэкапа базы {base}")
std, err = self.ssh.cmd(f'pg_dump -h {srv_pgsql} -U postgres -Fd -w {base} -j 10 -f {backup_dir}{base}.dump')
if err and err.strip():
logger.error(f"Ошибка бэкапа базы {base}: {err}")
raise Exception(f"Ошибка бэкапа базы {base}: {err}")
logger.info(f"Бэкап базы {base} создан успешно")
logger.info(f"Бэкап всех баз данных завершен")
return std
except Exception as e:
logger.error(f"Ошибка при создании бэкапа всех баз: {e}")
raise Exception(f"Ошибка при создании бэкапа всех баз: {e}")
def base_create(self, srv_pgsql: str, base_name: str) -> str:
"""
Создание базы данных PostgreSQL
Args:
srv_pgsql: Сервер PostgreSQL
base_name: Имя базы данных
Returns:
str: Результат выполнения команды
"""
if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip():
raise ValueError("srv_pgsql должен быть непустой строкой")
if not base_name or not isinstance(base_name, str) or not base_name.strip():
raise ValueError("base_name должен быть непустой строкой")
try:
logger.info(f"Создание базы данных {base_name} на сервере {srv_pgsql}")
std, err = self.ssh.cmd(f'createdb -h {srv_pgsql} -U postgres -w {base_name}')
if err and err.strip():
logger.error(f"Ошибка создания базы данных: {err}")
raise Exception(f"Ошибка создания базы данных: {err}")
logger.info(f"База данных {base_name} создана успешно")
return std
except Exception as e:
logger.error(f"Ошибка при создании базы данных: {e}")
raise Exception(f"Ошибка при создании базы данных: {e}")
def base_drop(self, srv_pgsql: str, base_name: str) -> str:
"""
Удаление базы данных PostgreSQL
Args:
srv_pgsql: Сервер PostgreSQL
base_name: Имя базы данных
Returns:
str: Результат выполнения команды
"""
if not srv_pgsql or not isinstance(srv_pgsql, str) or not srv_pgsql.strip():
raise ValueError("srv_pgsql должен быть непустой строкой")
if not base_name or not isinstance(base_name, str) or not base_name.strip():
raise ValueError("base_name должен быть непустой строкой")
try:
logger.info(f"Удаление базы данных {base_name} с сервера {srv_pgsql}")
std, err = self.ssh.cmd(f'dropdb -h {srv_pgsql} -U postgres -w {base_name} -f')
if err and err.strip():
logger.error(f"Ошибка удаления базы данных: {err}")
raise Exception(f"Ошибка удаления базы данных: {err}")
logger.info(f"База данных {base_name} удалена успешно")
return std
except Exception as e:
logger.error(f"Ошибка при удалении базы данных: {e}")
raise Exception(f"Ошибка при удалении базы данных: {e}")
def base_restore(self, arhive_srv_pgsql: str, restore_srv_pgsql: str, backup_date: str,
arhive_base_name: str, restore_base_name: str, extra: bool = False) -> str:
"""
Восстановление базы данных PostgreSQL из бэкапа
Args:
arhive_srv_pgsql: Сервер с архивом
restore_srv_pgsql: Сервер для восстановления
backup_date: Дата бэкапа
arhive_base_name: Имя базы в архиве
restore_base_name: Имя базы для восстановления
extra: Флаг экстра-бэкапа
Returns:
str: Результат выполнения команды
"""
# Валидация входных данных
for param_name, param_value in [
('arhive_srv_pgsql', arhive_srv_pgsql),
('restore_srv_pgsql', restore_srv_pgsql),
('backup_date', backup_date),
('arhive_base_name', arhive_base_name),
('restore_base_name', restore_base_name)
]:
if not param_value or not isinstance(param_value, str) or not param_value.strip():
raise ValueError(f"{param_name} должен быть непустой строкой")
try:
if extra:
backup_path = f'/backup/pgsql/{arhive_srv_pgsql}/{backup_date}-extra/{arhive_base_name}.dump'
else:
backup_path = f'/backup/pgsql/{arhive_srv_pgsql}-{backup_date}/{arhive_base_name}.dump'
logger.info(f"Восстановление базы данных {restore_base_name} из {backup_path}")
std, err = self.ssh.cmd(
f'pg_restore -h {restore_srv_pgsql} -U postgres -w -Fd -j10 -d {restore_base_name} {backup_path}'
)
if err and err.strip():
logger.error(f"Ошибка восстановления базы данных: {err}")
raise Exception(f"Ошибка восстановления базы данных: {err}")
logger.info(f"База данных {restore_base_name} восстановлена успешно")
return std
except Exception as e:
logger.error(f"Ошибка при восстановлении базы данных: {e}")
raise Exception(f"Ошибка при восстановлении базы данных: {e}")
def file_list(self, path: str, days: Union[int, str]) -> str:
"""
Получение списка директорий старше указанного количества дней
Args:
path: Путь для поиска
days: Количество дней (строка или число)
Returns:
str: Список директорий
"""
if not path or not isinstance(path, str) or not path.strip():
raise ValueError("path должен быть непустой строкой")
try:
days_str = str(days)
logger.info(f"Поиск директорий старше {days} дней в {path}")
std, err = self.ssh.cmd(f'/usr/bin/find {path} -type d -maxdepth 1 -mtime +{days_str} -print')
if err and err.strip():
logger.error(f"Ошибка поиска файлов: {err}")
raise Exception(f"Ошибка поиска файлов: {err}")
return std
except Exception as e:
logger.error(f"Ошибка при получении списка файлов: {e}")
raise Exception(f"Ошибка при получении списка файлов: {e}")
def delete_old_backups(self, path: str, days: Union[int, str]) -> None:
"""
Удаление старых бэкапов
Args:
path: Путь к директории с бэкапами
days: Количество дней (строка или число)
"""
if not path or not isinstance(path, str) or not path.strip():
raise ValueError("path должен быть непустой строкой")
try:
days_str = str(days)
logger.info(f"Удаление бэкапов старше {days} дней из {path}")
dirs, err = self.ssh.cmd(f'/usr/bin/find {path} -type d -maxdepth 1 -mtime +{days_str} -print')
if err and err.strip():
logger.error(f"Ошибка поиска директорий: {err}")
raise Exception(f"Ошибка поиска директорий: {err}")
dirs_list = [d.strip() for d in dirs.split('\n') if d.strip()]
logger.info(f"Найдено {len(dirs_list)} директорий для удаления")
for d in dirs_list:
logger.info(f"Удаление директории {d}")
std, err = self.ssh.cmd(f'rm -r {d}')
if err and err.strip():
logger.error(f"Ошибка удаления директории {d}: {err}")
raise Exception(f"Ошибка удаления директории {d}: {err}")
logger.info(f"Удаление завершено")
except Exception as e:
logger.error(f"Ошибка при удалении старых бэкапов: {e}")
raise Exception(f"Ошибка при удалении старых бэкапов: {e}")