import shlex
from abc import abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, TypedDict, TypeVar

import yaml
from dateutil import parser

from frostfs_testlib import reporter
from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.hosting.interfaces import Host
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.testing.readable import HumanReadableABC
from frostfs_testlib.utils import wallet_utils


@dataclass
class NodeBase(HumanReadableABC):
    """
    Represents a node of some underlying service
    """

    id: str
    name: str
    host: Host
    # Not assigned by __init__; NOTE(review): presumably set by subclasses
    # (e.g. in construct()) before get_process_name() is called - confirm.
    _process_name: str

    def __init__(self, id, name, host) -> None:
        """Store the node identity and host, then run the construct() hook."""
        self.id = id
        self.name = name
        self.host = host
        self.construct()

    def construct(self):
        """Hook invoked at the end of __init__; does nothing in the base class."""
        pass

    def __eq__(self, other):
        """Nodes are equal when their names are equal."""
        # Comparing with an object that has no name (None, str, ...) used to
        # raise AttributeError; let Python fall back to its default instead.
        if not hasattr(other, "name"):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        """Hash by name so that nodes which compare equal also hash equal."""
        # id(self.name) depended on the identity of the string object, so two
        # equal nodes could land in different set/dict buckets.
        return hash(self.name)

    def __str__(self):
        return self.label

    def __repr__(self) -> str:
        return self.label

    @property
    def label(self) -> str:
        """Human readable label of the node (its name)."""
        return self.name

    def get_service_systemctl_name(self) -> str:
        """Return the SERVICE_NAME attribute from the service config."""
        return self._get_attribute(ConfigAttributes.SERVICE_NAME)

    def get_process_name(self) -> str:
        """Return the process name stored in _process_name."""
        return self._process_name

    def start_service(self):
        """Unmask and then start this node's service on its host."""
        with reporter.step(f"Unmask {self.name} service on {self.host.config.address}"):
            self.host.unmask_service(self.name)

        with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
            self.host.start_service(self.name)

    @abstractmethod
    def service_healthcheck(self) -> bool:
        """Service healthcheck."""

    # TODO: Migrate to sub-class Metrics (not yet exists :))
    def get_metric(self, metric: str) -> CommandResult:
        """
        Fetch the metrics endpoint with curl on the host and keep only the lines
        that start with the given metric name.

        Returns the raw result of the shell command.
        """
        shell = self.host.get_shell()
        result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'")
        return result

    def get_metrics_endpoint(self) -> str:
        """Return the ENDPOINT_PROMETHEUS attribute from the service config."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)

    def stop_service(self, mask: bool = True):
        """
        Stop this node's service on its host.

        Args:
            mask: when True (default), the service is masked before it is stopped.
        """
        if mask:
            with reporter.step(f"Mask {self.name} service on {self.host.config.address}"):
                self.host.mask_service(self.name)

        with reporter.step(f"Stop {self.name} service on {self.host.config.address}"):
            self.host.stop_service(self.name)

    def restart_service(self):
        """Restart this node's service on its host."""
        with reporter.step(f"Restart {self.name} service on {self.host.config.address}"):
            self.host.restart_service(self.name)

    def get_wallet_password(self) -> str:
        """Return the WALLET_PASSWORD attribute from the service config."""
        return self._get_attribute(ConfigAttributes.WALLET_PASSWORD)

    def get_wallet_path(self) -> str:
        """Return LOCAL_WALLET_PATH, falling back to WALLET_PATH when it is not configured."""
        return self._get_attribute(
            ConfigAttributes.LOCAL_WALLET_PATH,
            ConfigAttributes.WALLET_PATH,
        )

    def get_remote_wallet_path(self) -> str:
        """
        Returns node wallet file path located on remote host
        """
        return self._get_attribute(
            ConfigAttributes.WALLET_PATH,
        )

    def get_remote_config_path(self) -> str:
        """
        Returns node config file path located on remote host
        """
        return self._get_attribute(
            ConfigAttributes.CONFIG_PATH,
        )

    def get_wallet_config_path(self) -> str:
        """Return LOCAL_WALLET_CONFIG, falling back to WALLET_CONFIG when it is not configured."""
        return self._get_attribute(
            ConfigAttributes.LOCAL_WALLET_CONFIG,
            ConfigAttributes.WALLET_CONFIG,
        )

    @property
    def config_dir(self) -> str:
        """CONFIG_DIR attribute from the service config."""
        return self._get_attribute(ConfigAttributes.CONFIG_DIR)

    @property
    def main_config_path(self) -> str:
        """CONFIG_PATH attribute from the service config."""
        return self._get_attribute(ConfigAttributes.CONFIG_PATH)

    # TODO: Deprecated
    def get_config(self, config_file_path: Optional[str] = None) -> tuple[str, dict]:
        """
        Read a YAML config file from the host via `cat` and parse it.

        Args:
            config_file_path: file to read; defaults to the CONFIG_PATH attribute.

        Returns:
            Tuple of (path that was read, parsed YAML content).
        """
        if config_file_path is None:
            config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)

        shell = self.host.get_shell()

        result = shell.exec(f"cat {config_file_path}")
        config_text = result.stdout

        config = yaml.safe_load(config_text)
        return config_file_path, config

    # TODO: Deprecated
    def save_config(self, new_config: dict, config_file_path: Optional[str] = None) -> None:
        """
        Dump `new_config` as YAML and write it to a file on the host via `sudo tee`.

        Args:
            new_config: configuration to serialize.
            config_file_path: file to overwrite; defaults to the CONFIG_PATH attribute.
        """
        if config_file_path is None:
            config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)

        shell = self.host.get_shell()

        config_str = yaml.dump(new_config)
        # shlex.quote yields the same '...' wrapping as before for ordinary dumps,
        # but also escapes single quotes that would otherwise break the command.
        shell.exec(f"echo {shlex.quote(config_str)} | sudo tee {config_file_path}")

    def get_wallet_public_key(self):
        """Return the public key of the node wallet (see get_wallet_path / get_wallet_password)."""
        storage_wallet_path = self.get_wallet_path()
        storage_wallet_pass = self.get_wallet_password()
        return wallet_utils.get_wallet_public_key(storage_wallet_path, storage_wallet_pass)

    def _get_attribute(self, attribute_name: str, default_attribute_name: Optional[str] = None) -> str:
        """
        Look up an attribute of this node's service config.

        Args:
            attribute_name: primary attribute to read.
            default_attribute_name: attribute to read instead when the primary one
                is not present in the config.

        Raises:
            RuntimeError: the primary attribute is missing and no fallback was given.
        """
        config = self.host.get_service_config(self.name)

        if attribute_name not in config.attributes:
            if default_attribute_name is None:
                raise RuntimeError(
                    f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
                )

            return config.attributes[default_attribute_name]

        return config.attributes[attribute_name]

    def _get_service_config(self) -> ServiceConfig:
        """Return the host's service config for this node."""
        return self.host.get_service_config(self.name)

    def get_service_uptime(self, service: str) -> timedelta:
        """
        Return how long the given systemd service has been active.

        The start time is taken from `systemctl show --property ActiveEnterTimestamp`
        on the host and subtracted from the current UTC time.
        """
        result = self.host.get_shell().exec(
            f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
        )
        # NOTE(review): the subtraction below needs a timezone-aware start time;
        # presumably the timestamp always carries a zone dateutil recognizes - confirm.
        start_time = parser.parse(result.stdout.strip())
        current_time = datetime.now(tz=timezone.utc)
        active_time = current_time - start_time
        return active_time


# Type variable constrained to NodeBase and its subclasses.
ServiceClass = TypeVar("ServiceClass", bound=NodeBase)


class NodeClassDict(TypedDict):
    """Typed dict pairing a hosting service name with a NodeBase class."""

    hosting_service_name: str
    cls: type[NodeBase]