import random
import re

import yaml
from yarl import URL

from frostfs_testlib.hosting import Host, Hosting
from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import (
    HTTPGate,
    InnerRing,
    MorphChain,
    S3Gate,
    StorageNode,
)
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry

reporter = get_reporter()


class ClusterNode:
    """
    Represents physical node where multiple different services may be located.

    Two cluster nodes are considered equal when their hosts share the same
    configured address; hashing follows the same rule.
    """

    class_registry: ServiceRegistry
    id: int
    host: Host

    def __init__(self, host: Host, id: int) -> None:
        self.host = host
        self.id = id
        self.class_registry = get_service_registry()

    @property
    def host_ip(self):
        """Address of the underlying host as stored in its config."""
        return self.host.config.address

    def __eq__(self, other: object) -> bool:
        # Let Python fall back to its default handling for foreign types
        # instead of failing with AttributeError on `other.host`.
        if not isinstance(other, ClusterNode):
            return NotImplemented
        return self.host.config.address == other.host.config.address

    def __hash__(self) -> int:
        # Hash the address *value* so that nodes which compare equal also hash
        # equal (required for correct behaviour in sets and dict keys).
        return hash(self.host.config.address)

    def __str__(self) -> str:
        return self.host.config.address

    def __repr__(self) -> str:
        return self.host.config.address

    # for backward compatibility and to not touch other codebase too much
    @property
    def storage_node(self) -> StorageNode:
        return self.service(StorageNode)

    # for backward compatibility and to not touch other codebase too much
    @property
    def ir_node(self) -> InnerRing:
        return self.service(InnerRing)

    # for backward compatibility and to not touch other codebase too much
    @property
    def morph_chain(self) -> MorphChain:
        return self.service(MorphChain)

    # for backward compatibility and to not touch other codebase too much
    @property
    def http_gate(self) -> HTTPGate:
        return self.service(HTTPGate)

    # for backward compatibility and to not touch other codebase too much
    @property
    def s3_gate(self) -> S3Gate:
        return self.service(S3Gate)

    def get_config(self, config_file_path: str) -> dict:
        """
        Read a YAML file from the host and return its parsed content.

        Args:
            config_file_path: path of the file on the host; it is read with `cat`.

        Returns:
            result of `yaml.safe_load` applied to the command's stdout.
        """
        shell = self.host.get_shell()

        result = shell.exec(f"cat {config_file_path}")
        config_text = result.stdout

        config = yaml.safe_load(config_text)
        return config

    def save_config(self, new_config: dict, config_file_path: str) -> None:
        """
        Dump `new_config` as YAML and write it to a file on the host.

        Args:
            new_config: data to serialize with `yaml.dump`.
            config_file_path: destination path on the host (written via `sudo tee`).
        """
        shell = self.host.get_shell()

        config_str = yaml.dump(new_config)
        # NOTE(review): the YAML text is embedded in single quotes without
        # escaping, so a config containing a "'" character will break this
        # command. Consider shlex.quote if such configs are expected.
        shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")

    def service(self, service_type: type[ServiceClass]) -> ServiceClass:
        """
        Get a service cluster node of specified type.

        Args:
            service_type: type of the service which should be returned,
            for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain and InnerRing.

        Returns:
            service of service_type class.
        """

        service_entry = self.class_registry.get_entry(service_type)
        service_name = service_entry["hosting_service_name"]

        # Service names on a host are "<hosting name><zero-padded node id>".
        pattern = f"{service_name}{self.id:02}"
        config = self.host.get_service_config(pattern)

        return service_type(
            self.id,
            config.name,
            self.host,
        )

    def get_list_of_services(self) -> list[str]:
        """Return the SERVICE_NAME attribute of every service configured on the host."""
        return [
            config.attributes[ConfigAttributes.SERVICE_NAME]
            for config in self.host.config.services
        ]

    def get_all_interfaces(self) -> dict[str, str]:
        """Return the interfaces mapping from the host config as is."""
        return self.host.config.interfaces

    def get_interface(self, interface: Interfaces) -> str:
        """Return the host config interface entry stored under `interface.value`."""
        return self.host.config.interfaces[interface.value]

    def get_data_interfaces(self) -> list[str]:
        """Return values of all interfaces whose name contains "data"."""
        return [
            ip_address
            for name_interface, ip_address in self.host.config.interfaces.items()
            if "data" in name_interface
        ]

    def get_data_interface(self, search_interface: str) -> list[str]:
        """
        Return values of interfaces named exactly `search_interface`.

        The result is a list; it is empty when no such interface exists.
        """
        return [
            ip_address
            for name_interface, ip_address in self.host.config.interfaces.items()
            if search_interface == name_interface
        ]

    def get_internal_interfaces(self) -> list[str]:
        """Return values of all interfaces whose name contains "internal"."""
        return [
            ip_address
            for name_interface, ip_address in self.host.config.interfaces.items()
            if "internal" in name_interface
        ]

    def get_internal_interface(self, search_internal: str) -> list[str]:
        """
        Return values of interfaces named exactly `search_internal`.

        The result is a list; it is empty when no such interface exists.
        """
        return [
            ip_address
            for name_interface, ip_address in self.host.config.interfaces.items()
            if search_internal == name_interface
        ]


class Cluster:
    """
    This class represents a Cluster object for the whole storage based on provided hosting
    """

    default_rpc_endpoint: str
    default_s3_gate_endpoint: str
    default_http_gate_endpoint: str
    default_http_hostname: str
    default_s3_hostname: str

    def __init__(self, hosting: Hosting) -> None:
        self._hosting = hosting

        self.class_registry = get_service_registry()
        # Defaults are taken from the first service of each kind found in hosting.
        self.default_rpc_endpoint = self.services(StorageNode)[0].get_rpc_endpoint()
        self.default_s3_gate_endpoint = self.services(S3Gate)[0].get_endpoint()
        self.default_http_gate_endpoint = self.services(HTTPGate)[0].get_endpoint()
        self.default_http_hostname = self.services(StorageNode)[0].get_http_hostname()
        self.default_s3_hostname = self.services(StorageNode)[0].get_s3_hostname()

    @property
    def hosts(self) -> list[Host]:
        """
        Returns list of Hosts
        """
        return self._hosting.hosts

    # for backward compatibility and to not touch other codebase too much
    @property
    def storage_nodes(self) -> list[StorageNode]:
        return self.services(StorageNode)

    # for backward compatibility and to not touch other codebase too much
    @property
    def ir_nodes(self) -> list[InnerRing]:
        return self.services(InnerRing)

    # for backward compatibility and to not touch other codebase too much
    @property
    def s3_gates(self) -> list[S3Gate]:
        return self.services(S3Gate)

    @property
    def http_gates(self) -> list[HTTPGate]:
        return self.services(HTTPGate)

    @property
    def morph_chain(self) -> list[MorphChain]:
        return self.services(MorphChain)

    def services(self, service_type: type[ServiceClass]) -> list[ServiceClass]:
        """
        Get all services in a cluster of specified type.

        Args:
            service_type: type of the services which should be returned,
            for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain and InnerRing.

        Returns:
            list of services of service_type class.
        """

        service = self.class_registry.get_entry(service_type)
        service_name = service["hosting_service_name"]
        cls: type[NodeBase] = service["cls"]

        pattern = rf"{service_name}\d*$"
        configs = self.hosting.find_service_configs(pattern)

        found_nodes = []
        for config in configs:
            # config.name is something like s3-gate01. Cut last digits to know service type
            name_without_digits = re.findall(r".*\D", config.name)
            # exclude unsupported services (and names consisting of digits only)
            if not name_without_digits or name_without_digits[0] != service_name:
                continue

            found_nodes.append(
                cls(
                    self._get_id(config.name),
                    config.name,
                    self.hosting.get_host_by_service(config.name),
                )
            )
        return found_nodes

    @property
    def cluster_nodes(self) -> list[ClusterNode]:
        """
        Returns list of Cluster Nodes
        """

        # Node ids are 1-based positions of the hosts in the hosting list.
        return [ClusterNode(host, node_id) for node_id, host in enumerate(self.hosts, start=1)]

    @property
    def hosting(self) -> Hosting:
        return self._hosting

    def _create_wallet_config(self, service: ServiceConfig) -> None:
        """Write a YAML file with the wallet password to the service's local wallet config path."""
        wallet_path = service.attributes[ConfigAttributes.LOCAL_WALLET_CONFIG]
        wallet_password = service.attributes[ConfigAttributes.WALLET_PASSWORD]
        with open(wallet_path, "w") as file:
            yaml.dump({"password": wallet_password}, file)

    def create_wallet_configs(self, hosting: Hosting) -> None:
        """Create wallet config files for every service that declares LOCAL_WALLET_CONFIG."""
        configs = hosting.find_service_configs(".*")
        for config in configs:
            if ConfigAttributes.LOCAL_WALLET_CONFIG in config.attributes:
                self._create_wallet_config(config)

    def is_local_devenv(self) -> bool:
        """Return True when hosting has a single docker-plugin host addressed as "localhost"."""
        hosts = self.hosting.hosts
        if len(hosts) != 1:
            return False

        host = hosts[0]
        return host.config.address == "localhost" and host.config.plugin_name == "docker"

    def _get_id(self, node_name) -> int:
        """
        Extract the numeric id from the trailing digits of a service name.

        Args:
            node_name: service name such as "s3-gate01".

        Returns:
            trailing digits converted to int (e.g. 1 for "s3-gate01").

        Raises:
            RuntimeError: if the name does not end with at least one digit.
        """
        # "\d+" (not "\d*") so that a name without trailing digits yields no
        # match and reaches the RuntimeError below instead of int("").
        matches = re.search(r"\d+$", node_name)
        if not matches:
            raise RuntimeError(f"Can't parse Id of the node {node_name}")
        return int(matches.group())

    def get_random_storage_rpc_endpoint(self) -> str:
        """Return the RPC endpoint of a randomly chosen storage node."""
        return random.choice(self.get_storage_rpc_endpoints())

    def get_storage_rpc_endpoints(self) -> list[str]:
        """Return RPC endpoints of all storage nodes."""
        nodes: list[StorageNode] = self.services(StorageNode)
        return [node.get_rpc_endpoint() for node in nodes]

    def get_morph_endpoints(self) -> list[str]:
        """Return endpoints of all morph chain services."""
        nodes: list[MorphChain] = self.services(MorphChain)
        return [node.get_endpoint() for node in nodes]

    def get_nodes_by_ip(self, ips: list[str]) -> list[ClusterNode]:
        """
        Return cluster nodes whose morph chain endpoint host is listed in `ips`.

        Args:
            ips: hosts to look for; compared against the host part of each
            node's morph chain endpoint URL.
        """
        cluster_nodes = [
            node
            for node in self.cluster_nodes
            if URL(node.morph_chain.get_endpoint()).host in ips
        ]
        with reporter.step(f"Return cluster nodes - {cluster_nodes}"):
            return cluster_nodes