import re
from io import StringIO

import yaml

from frostfs_testlib.blockchain import RPCClient
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.node_base import NodeBase
from frostfs_testlib.storage.dataclasses.shard import Shard


def _health_metric_reported(node: NodeBase, port: int, metric: str) -> bool:
    """
    Check whether a service exposes the given health metric on a local port.

    Runs ``curl`` through the node's host shell against ``localhost:<port>``, keeps only
    the lines mentioning ``metric`` and drops the first two of them (``sed 1,2d``).
    Returns True when the metric name is still present in what remains. The metric
    value itself is not inspected.
    """
    command = f"curl -s localhost:{port} | grep {metric} | sed 1,2d"
    output = node.host.get_shell().exec(command).stdout
    return metric in output


class InnerRing(NodeBase):
    """
    Class represents inner ring node in a cluster

    Inner ring node is not always the same as physical host (or physical node, if you will):
        It can be service running in a container or on physical host
    From a testing perspective, it's not relevant how it is actually running,
        since frostfs network will still treat it as "node"
    """

    def service_healthcheck(self) -> bool:
        """Return True when the inner ring health metric is reported on port 6662."""
        return _health_metric_reported(self, 6662, "frostfs_ir_ir_health")

    def get_netmap_cleaner_threshold(self) -> str:
        """
        Read the remote config file and return its ``netmap_cleaner.threshold`` value.

        The file is fetched with ``cat`` through the host shell and parsed as YAML.
        Raises KeyError when either key is missing from the config.
        """
        config_file = self.get_remote_config_path()
        contents = self.host.get_shell().exec(f"cat {config_file}").stdout
        config = yaml.safe_load(contents)
        return config["netmap_cleaner"]["threshold"]


class S3Gate(NodeBase):
    """
    Class represents S3 gateway in a cluster
    """

    def get_endpoint(self) -> str:
        """Return the first data endpoint of the gateway."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)

    def get_all_endpoints(self) -> list[str]:
        """Return both data endpoints (``ENDPOINT_DATA_0`` and ``ENDPOINT_DATA_1``)."""
        return [
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0),
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1),
        ]

    def service_healthcheck(self) -> bool:
        """Return True when the S3 gateway health metric is reported on port 8086."""
        return _health_metric_reported(self, 8086, "frostfs_s3_gw_state_health")

    @property
    def label(self) -> str:
        """Node name followed by its endpoint."""
        return f"{self.name}: {self.get_endpoint()}"


class HTTPGate(NodeBase):
    """
    Class represents HTTP gateway in a cluster
    """

    def get_endpoint(self) -> str:
        """Return the first data endpoint of the gateway."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)

    def service_healthcheck(self) -> bool:
        """Return True when the HTTP gateway health metric is reported on port 5662."""
        return _health_metric_reported(self, 5662, "frostfs_http_gw_state_health")

    @property
    def label(self) -> str:
        """Node name followed by its endpoint."""
        return f"{self.name}: {self.get_endpoint()}"


class MorphChain(NodeBase):
    """
    Class represents side-chain aka morph-chain consensus node in a cluster

    Consensus node is not always the same as physical host (or physical node, if you will):
        It can be service running in a container or on physical host
    From a testing perspective, it's not relevant how it is actually running,
        since frostfs network will still treat it as "node"
    """

    # RPC client bound to this node's internal endpoint; assigned in construct().
    rpc_client: RPCClient

    def construct(self) -> None:
        """Create the RPC client for this node's internal endpoint."""
        self.rpc_client = RPCClient(self.get_endpoint())

    def get_endpoint(self) -> str:
        """Return the internal endpoint of the node."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_INTERNAL)

    def service_healthcheck(self) -> bool:
        """Always report the service as healthy (no metric is checked yet)."""
        # TODO Rework in 1.3 Release when metrics for each service will be available
        return True

    @property
    def label(self) -> str:
        """Node name followed by its endpoint."""
        return f"{self.name}: {self.get_endpoint()}"

    def get_http_endpoint(self) -> str:
        """Return the value of the ``http_endpoint`` attribute."""
        return self._get_attribute("http_endpoint")


class StorageNode(NodeBase):
    """
    Class represents storage node in a storage cluster

    Storage node is not always the same as physical host (or physical node, if you will):
        It can be service running in a container or on physical host
    From a testing perspective, it's not relevant how it is actually running,
        since frostfs network will still treat it as "node"
    """

    def get_rpc_endpoint(self) -> str:
        """Return the first data endpoint of the node."""
        return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)

    def get_all_rpc_endpoint(self) -> list[str]:
        """Return both data endpoints (``ENDPOINT_DATA_0`` and ``ENDPOINT_DATA_1``)."""
        return [
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0),
            self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1),
        ]

    def service_healthcheck(self) -> bool:
        """Return True when the storage node health metric is reported on port 6672."""
        return _health_metric_reported(self, 6672, "frostfs_node_state_health")

    def get_shard_config_path(self) -> str:
        """Return the path of the shard config file."""
        return self._get_attribute(ConfigAttributes.SHARD_CONFIG_PATH)

    def get_shards_config(self) -> tuple[str, dict]:
        """Return the result of ``get_config`` for the shard config path."""
        return self.get_config(self.get_shard_config_path())

    def get_shards(self) -> list[Shard]:
        """
        Build a Shard for every entry under ``storage.shard`` in the parsed config.

        The ``default`` entry, if present, is skipped.
        """
        _, config = self.get_shards_config()
        shards = config["storage"]["shard"]
        # A config without a "default" section is tolerated instead of raising KeyError.
        shards.pop("default", None)
        return [Shard.from_object(shard) for shard in shards.values()]

    def get_shards_from_env(self) -> list[Shard]:
        """
        Build shards from the raw text of the shard config file.

        The number of shards is the count of distinct ``SHARD_PREFIX<digits>`` matches
        found in the text; one Shard is built for each index in that range.
        """
        # NOTE(review): ConfigObj and SHARD_PREFIX are not brought into scope by this
        # module's imports, so this method raises NameError until they are imported.
        # Their origin is not visible from this file - confirm and add the imports.

        # Both ConfigObj(StringIO(...)) and re.findall() need the config as text, so the
        # file is read once through the shell instead of reusing the parsed dict/tuple
        # returned by get_shards_config().
        raw_config = self.host.get_shell().exec(f"cat {self.get_shard_config_path()}").stdout
        config_obj = ConfigObj(StringIO(raw_config))

        # Raw f-string: "\d" must reach the regex engine verbatim.
        pattern = rf"{SHARD_PREFIX}\d*"
        num_shards = len(set(re.findall(pattern, raw_config)))

        return [Shard.from_config_object(config_obj, shard_id) for shard_id in range(num_shards)]

    def get_control_endpoint(self) -> str:
        """Return the control endpoint of the node."""
        return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)

    def get_un_locode(self) -> str:
        """Return the value of the ``UN_LOCODE`` attribute."""
        return self._get_attribute(ConfigAttributes.UN_LOCODE)

    def get_data_directory(self) -> str:
        """Return the data directory the host reports for this node."""
        return self.host.get_data_directory(self.name)

    def get_storage_config(self) -> str:
        """Return the storage config the host reports for this node."""
        return self.host.get_storage_config(self.name)

    def get_http_hostname(self) -> str:
        """Return the value of the ``HTTP_HOSTNAME`` attribute."""
        return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)

    def get_s3_hostname(self) -> str:
        """Return the value of the ``S3_HOSTNAME`` attribute."""
        return self._get_attribute(ConfigAttributes.S3_HOSTNAME)

    def delete_blobovnicza(self) -> None:
        """Ask the host to delete this node's blobovnicza."""
        self.host.delete_blobovnicza(self.name)

    def delete_fstree(self) -> None:
        """Ask the host to delete this node's fstree."""
        self.host.delete_fstree(self.name)

    def delete_file(self, file_path: str) -> None:
        """Ask the host to delete the file at ``file_path``."""
        self.host.delete_file(file_path)

    def is_file_exist(self, file_path: str) -> bool:
        """Return the host's answer on whether ``file_path`` exists."""
        return self.host.is_file_exist(file_path)

    def delete_metabase(self) -> None:
        """Ask the host to delete this node's metabase."""
        self.host.delete_metabase(self.name)

    def delete_write_cache(self) -> None:
        """Ask the host to delete this node's write cache."""
        self.host.delete_write_cache(self.name)

    @property
    def label(self) -> str:
        """Node name followed by its RPC endpoint."""
        return f"{self.name}: {self.get_rpc_endpoint()}"