frostfs-testlib/src/frostfs_testlib/hosting/interfaces.py
Dmitriy Zayakin f1264bd473 [#143] Change network utils
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-12-13 13:15:37 +00:00

354 lines
10 KiB
Python

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.testing.readable import HumanReadableEnum
from frostfs_testlib.testing.test_control import retry
class HostStatus(HumanReadableEnum):
ONLINE = "Online"
OFFLINE = "Offline"
UNKNOWN = "Unknown"
class DiskInfo(dict):
"""Dict wrapper for disk_info for disk management commands."""
class Host(ABC):
"""Interface of a host machine where frostFS services are running.
Allows to manage the machine and frostFS services that are hosted on it.
"""
def __init__(self, config: HostConfig) -> None:
self._config = config
self._service_config_by_name = {service_config.name: service_config for service_config in config.services}
self._cli_config_by_name = {cli_config.name: cli_config for cli_config in config.clis}
@property
def config(self) -> HostConfig:
"""Returns config of the host.
Returns:
Config of this host.
"""
return self._config
def get_service_config(self, service_name: str) -> ServiceConfig:
"""Returns config of service with specified name.
The service must be hosted on this host.
Args:
service_name: Name of the service.
Returns:
Config of the service.
"""
service_config = self._service_config_by_name.get(service_name)
if service_config is None:
raise ValueError(f"Unknown service name: '{service_name}'")
return service_config
def get_cli_config(self, cli_name: str) -> CLIConfig:
"""Returns config of CLI tool with specified name.
The CLI must be located on this host.
Args:
cli_name: Name of the CLI tool.
Returns:
Config of the CLI tool.
"""
cli_config = self._cli_config_by_name.get(cli_name)
if cli_config is None:
raise ValueError(f"Unknown CLI name: '{cli_name}'")
return cli_config
@abstractmethod
def get_shell(self, sudo: bool = True) -> Shell:
"""Returns shell to this host.
Args:
sudo: if True, run all commands in shell with elevated rights
Returns:
Shell that executes commands on this host.
"""
@abstractmethod
def start_host(self) -> None:
"""Starts the host machine."""
@abstractmethod
def get_host_status(self) -> HostStatus:
"""Check host status."""
@abstractmethod
def stop_host(self, mode: str) -> None:
"""Stops the host machine.
Args:
mode: Specifies mode how host should be stopped. Mode might be host-specific.
"""
@abstractmethod
def start_service(self, service_name: str) -> None:
"""Starts the service with specified name and waits until it starts.
The service must be hosted on this host.
Args:
service_name: Name of the service to start.
"""
@abstractmethod
def stop_service(self, service_name: str) -> None:
"""Stops the service with specified name and waits until it stops.
The service must be hosted on this host.
Args:
service_name: Name of the service to stop.
"""
@abstractmethod
def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it.
The service must be hosted on this host.
Args:
service_name: Name of the service to mask.
"""
@abstractmethod
def unmask_service(self, service_name: str) -> None:
"""Allow the service to start by any activity by unmasking it.
The service must be hosted on this host.
Args:
service_name: Name of the service to unmask.
"""
@abstractmethod
def restart_service(self, service_name: str) -> None:
"""Restarts the service with specified name and waits until it starts.
The service must be hosted on this host.
Args:
service_name: Name of the service to restart.
"""
@abstractmethod
def get_data_directory(self, service_name: str) -> str:
"""
Getting path to data directory on node for further usage
(example: list databases pilorama.db)
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def wait_success_suspend_process(self, process_name: str) -> None:
"""Search for a service ID by its name and stop the process
Args:
process_name: Name
"""
@abstractmethod
def wait_success_resume_process(self, process_name: str) -> None:
"""Search for a service by its ID and start the process
Args:
process_name: Name
"""
@abstractmethod
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
"""Erases all data of the storage node with specified name.
Args:
service_name: Name of storage node service.
cache_only: To delete cache only.
"""
@abstractmethod
def delete_fstree(self, service_name: str) -> None:
"""
Deletes all fstrees in the node.
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def delete_metabase(self, service_name: str) -> None:
"""
Deletes all metabase*.db in the node.
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def delete_write_cache(self, service_name: str) -> None:
"""
Deletes all write_cache in the node.
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def delete_blobovnicza(self, service_name: str) -> None:
"""
Deletes all blobovniczas in the node.
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def delete_pilorama(self, service_name: str) -> None:
"""
Deletes all pilorama.db files in the node.
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def detach_disk(self, device: str) -> DiskInfo:
"""Detaches disk device to simulate disk offline/failover scenario.
Args:
device: Device name to detach.
Returns:
internal service disk info related to host plugin (i.e. volume id for cloud devices),
which may be used to identify or re-attach existing volume back.
"""
@abstractmethod
def attach_disk(self, device: str, disk_info: DiskInfo) -> None:
"""Attaches disk device back.
Args:
device: Device name to attach.
service_info: any info required for host plugin to identify/attach disk.
"""
@abstractmethod
def is_disk_attached(self, device: str, disk_info: DiskInfo) -> bool:
"""Checks if disk device is attached.
Args:
device: Device name to check.
service_info: any info required for host plugin to identify disk.
Returns:
True if attached.
False if detached.
"""
@abstractmethod
def dump_logs(
self,
directory_path: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
filter_regex: Optional[str] = None,
) -> None:
"""Dumps logs of all services on the host to specified directory.
Args:
directory_path: Path to the directory where logs should be stored.
since: If set, limits the time from which logs should be collected. Must be in UTC.
until: If set, limits the time until which logs should be collected. Must be in UTC.
filter_regex: regex to filter output
"""
@abstractmethod
def get_filtered_logs(
self,
filter_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
) -> str:
"""Get logs from host filtered by regex.
Args:
filter_regex: regex filter for logs.
since: If set, limits the time from which logs should be collected. Must be in UTC.
until: If set, limits the time until which logs should be collected. Must be in UTC.
unit: required unit.
Returns:
Found entries as str if any found.
Empty string otherwise.
"""
@abstractmethod
def is_message_in_logs(
self,
message_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> bool:
"""Checks logs on host for specified message regex.
Args:
message_regex: message to find.
since: If set, limits the time from which logs should be collected. Must be in UTC.
until: If set, limits the time until which logs should be collected. Must be in UTC.
Returns:
True if message found in logs in the given time frame.
False otherwise.
"""
@abstractmethod
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
"""
Waites for service to be in specified state.
Args:
systemd_service_name: Service to wait state of.
expected_state: State to wait for
timeout: Seconds to wait
"""
def down_interface(self, interface: str) -> None:
shell = self.get_shell()
shell.exec(f"ip link set {interface} down")
def up_interface(self, interface: str) -> None:
shell = self.get_shell()
shell.exec(f"ip link set {interface} up")
def check_state(self, interface: str) -> str:
shell = self.get_shell()
return shell.exec(f"ip link show {interface} | sed -z 's/.*state \(.*\) mode .*/\\1/'").stdout.strip()
@retry(max_attempts=5, sleep_interval=5, expected_result="UP")
def check_state_up(self, interface: str) -> str:
return self.check_state(interface=interface)
@retry(max_attempts=5, sleep_interval=5, expected_result="DOWN")
def check_state_down(self, interface: str) -> str:
return self.check_state(interface=interface)