import yaml
from frostfs_testlib.blockchain import RPCClient
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.node_base import NodeBase
from frostfs_testlib.storage.dataclasses.shard import Shard


def _is_health_metric_reported(node: NodeBase, port: int, health_metric: str) -> bool:
    """
    Check whether the service behind ``node`` reports ``health_metric``.

    Runs ``curl`` against ``localhost:<port>`` in the node host's shell, keeps
    only the lines that mention the metric and drops the first two of them.

    Args:
        node: Node whose host shell is used to run the command.
        port: Local port that is queried with curl.
        health_metric: Name of the metric to look for.

    Returns:
        True if the metric name is still present in the remaining output.
    """
    # NOTE(review): `sed 1,2d` presumably strips the "# HELP" / "# TYPE" comment
    # lines that precede the sample line in the metrics output - confirm.
    output = node.host.get_shell().exec(f"curl -s localhost:{port} | grep {health_metric} | sed 1,2d").stdout
    return health_metric in output


class InnerRing(NodeBase):
    """
    Class represents inner ring node in a cluster

    Inner ring node is not always the same as physical host (or physical node, if you will):
        It can be service running in a container or on physical host
    For testing perspective, it's not relevant how it is actually running,
        since frostfs network will still treat it as "node"
    """

    def service_healthcheck(self) -> bool:
        """Return True if the inner ring health metric is reported on port 6662."""
        return _is_health_metric_reported(self, 6662, "frostfs_ir_ir_health")

    def get_netmap_cleaner_threshold(self) -> str:
        """
        Read ``netmap_cleaner.threshold`` from the node's remote config file.

        The file is read with ``cat`` through the host shell and parsed as YAML.

        Raises:
            KeyError: If ``netmap_cleaner`` or ``threshold`` is missing from the config.
        """
        config_file = self.get_remote_config_path()
        contents = self.host.get_shell().exec(f"cat {config_file}").stdout
        config = yaml.safe_load(contents)
        # NOTE(review): the value is returned exactly as parsed by YAML, so it may
        # not be a str despite the annotation - confirm against callers.
        return config["netmap_cleaner"]["threshold"]


class S3Gate(NodeBase):
    """
    Class represents S3 gateway in a cluster
    """

    def get_endpoint(self) -> str:
        """Return the first data endpoint of the gateway."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)

    def get_all_endpoints(self) -> list[str]:
        """Return both data endpoints of the gateway (ENDPOINT_DATA_0 first, then ENDPOINT_DATA_1)."""
        return [
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0),
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1),
        ]

    def service_healthcheck(self) -> bool:
        """Return True if the S3 gateway health metric is reported on port 8086."""
        return _is_health_metric_reported(self, 8086, "frostfs_s3_gw_state_health")

    @property
    def label(self) -> str:
        """Human-readable label: node name followed by its endpoint."""
        return f"{self.name}: {self.get_endpoint()}"


class HTTPGate(NodeBase):
    """
    Class represents HTTP gateway in a cluster
    """

    def get_endpoint(self) -> str:
        """Return the first data endpoint of the gateway."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)

    def service_healthcheck(self) -> bool:
        """Return True if the HTTP gateway health metric is reported on port 5662."""
        return _is_health_metric_reported(self, 5662, "frostfs_http_gw_state_health")

    @property
    def label(self) -> str:
        """Human-readable label: node name followed by its endpoint."""
        return f"{self.name}: {self.get_endpoint()}"


class MorphChain(NodeBase):
    """
    Class represents side-chain aka morph-chain consensus node in a cluster

    Consensus node is not always the same as physical host (or physical node, if you will):
        It can be service running in a container or on physical host
    For testing perspective, it's not relevant how it is actually running,
        since frostfs network will still treat it as "node"
    """

    # RPC client bound to this node's endpoint; assigned in construct().
    rpc_client: RPCClient

    def construct(self) -> None:
        """Create the RPC client for this node's endpoint."""
        self.rpc_client = RPCClient(self.get_endpoint())

    def get_endpoint(self) -> str:
        """Return the internal endpoint of the node."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_INTERNAL)

    def service_healthcheck(self) -> bool:
        """Always report the service as healthy (no real check is performed yet)."""
        # TODO Rework in 1.3 Release when metrics for each service will be available
        return True

    @property
    def label(self) -> str:
        """Human-readable label: node name followed by its endpoint."""
        return f"{self.name}: {self.get_endpoint()}"

    def get_http_endpoint(self) -> str:
        """Return the value of the ``http_endpoint`` attribute."""
        return self._get_attribute("http_endpoint")


class StorageNode(NodeBase):
    """
    Class represents storage node in a storage cluster

    Storage node is not always the same as physical host (or physical node, if you will):
        It can be service running in a container or on physical host
    For testing perspective, it's not relevant how it is actually running,
        since frostfs network will still treat it as "node"
    """

    def get_rpc_endpoint(self) -> str:
        """Return the first data endpoint of the node."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)

    def get_all_rpc_endpoint(self) -> list[str]:
        """Return both data endpoints of the node (ENDPOINT_DATA_0 first, then ENDPOINT_DATA_1)."""
        return [
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0),
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1),
        ]

    def service_healthcheck(self) -> bool:
        """Return True if the storage node health metric is reported on port 6672."""
        return _is_health_metric_reported(self, 6672, "frostfs_node_state_health")

    # TODO: Deprecated. Use new approach with config
    def get_shard_config_path(self) -> str:
        """Return the shard config path attribute of the node."""
        return self._get_attribute(ConfigAttributes.SHARD_CONFIG_PATH)

    # TODO: Deprecated. Use new approach with config
    def get_shards_config(self) -> tuple[str, dict]:
        """Return the result of ``get_config`` for the shard config path."""
        return self.get_config(self.get_shard_config_path())

    def get_shards(self) -> list[Shard]:
        """
        Build Shard objects from the ``storage:shard`` section of the node config.

        The ``default`` entry, if present, is skipped.

        Raises:
            RuntimeError: If the section is missing or empty.
        """
        shards = self.config.get("storage:shard")

        if not shards:
            raise RuntimeError(f"Cannot get shards information for {self.name} on {self.host.config.address}")

        # Filter instead of popping "default": a getter must not modify the
        # mapping handed out by the node config.
        return [Shard.from_object(shard) for shard_name, shard in shards.items() if shard_name != "default"]

    def get_control_endpoint(self) -> str:
        """Return the control endpoint attribute of the node."""
        return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)

    def get_un_locode(self) -> str:
        """Return the UN/LOCODE attribute of the node."""
        return self._get_attribute(ConfigAttributes.UN_LOCODE)

    def get_data_directory(self) -> str:
        """Return the data directory the host reports for this node."""
        return self.host.get_data_directory(self.name)

    def delete_blobovnicza(self) -> None:
        """Ask the host to delete the blobovnicza of this node."""
        self.host.delete_blobovnicza(self.name)

    def delete_fstree(self) -> None:
        """Ask the host to delete the fstree of this node."""
        self.host.delete_fstree(self.name)

    def delete_file(self, file_path: str) -> None:
        """Ask the host to delete ``file_path``."""
        self.host.delete_file(file_path)

    def is_file_exist(self, file_path: str) -> bool:
        """Return the host's answer on whether ``file_path`` exists."""
        return self.host.is_file_exist(file_path)

    def delete_metabase(self) -> None:
        """Ask the host to delete the metabase of this node."""
        self.host.delete_metabase(self.name)

    def delete_write_cache(self) -> None:
        """Ask the host to delete the write cache of this node."""
        self.host.delete_write_cache(self.name)

    @property
    def label(self) -> str:
        """Human-readable label: node name followed by its RPC endpoint."""
        return f"{self.name}: {self.get_rpc_endpoint()}"