import logging
import random
import re
import shlex
import time
from dataclasses import dataclass
from time import sleep
from typing import Optional

from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.utils import datetime_utils

logger = logging.getLogger("NeoLogger")


@dataclass
class HealthStatus:
    """Network and health status values parsed from healthcheck output."""

    network_status: Optional[str] = None
    health_status: Optional[str] = None

    @staticmethod
    def from_stdout(output: str) -> "HealthStatus":
        """
        Build HealthStatus from the text output of the healthcheck command.

        For every line containing "Network status" / "Health status" the text after
        the last ":" is taken (stripped). If a marker occurs several times, the last
        occurrence wins; a marker that never occurs leaves its field as None.

        Args:
            output: stdout of the healthcheck command.

        Returns:
            parsed HealthStatus object.
        """
        network, health = None, None
        for line in output.split("\n"):
            if "Network status" in line:
                network = line.split(":")[-1].strip()
            if "Health status" in line:
                health = line.split(":")[-1].strip()
        return HealthStatus(network, health)


def _get_control_cli(node: StorageNode) -> tuple[FrostfsCli, str]:
    """
    Prepare frostfs-cli on the node's host for control requests.

    Writes a wallet config file (wallet path + password taken from the service
    config attributes) to /tmp on the host and builds a FrostfsCli that uses it.

    Args:
        node: storage node which control service should be addressed.

    Returns:
        tuple of (cli wrapper, control endpoint of the node).
    """
    host = node.host
    service_config = host.get_service_config(node.name)
    wallet_path = service_config.attributes["wallet_path"]
    wallet_password = service_config.attributes["wallet_password"]
    control_endpoint = service_config.attributes["control_endpoint"]

    shell = host.get_shell()
    wallet_config_path = f"/tmp/{node.name}-config.yaml"
    wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
    # Quote both parts so that a "'" in the password or path cannot break the shell command
    shell.exec(f"echo {shlex.quote(wallet_config)} > {shlex.quote(wallet_config_path)}")

    cli_config = host.get_cli_config("frostfs-cli")
    cli = FrostfsCli(shell, cli_config.exec_path, wallet_config_path)
    return cli, control_endpoint


@reporter.step("Get Locode from random storage node")
def get_locode_from_random_node(cluster: Cluster) -> str:
    """
    The function returns UN/LOCODE of a randomly chosen storage node.

    Args:
        cluster: cluster to pick the storage node from.

    Returns:
        locode of the chosen node.
    """
    node = random.choice(cluster.services(StorageNode))
    locode = node.get_un_locode()
    logger.info(f"Chosen '{locode}' locode from node {node}")
    return locode


@reporter.step("Healthcheck for storage node {node}")
def storage_node_healthcheck(node: StorageNode) -> HealthStatus:
    """
    The function returns storage node's health status.

    Args:
        node: storage node for which health status should be retrieved.

    Returns:
        health status as HealthStatus object.
    """
    cli, control_endpoint = _get_control_cli(node)
    result = cli.control.healthcheck(control_endpoint)
    return HealthStatus.from_stdout(result.stdout)


@reporter.step("Set status for {node}")
def storage_node_set_status(node: StorageNode, status: str, retries: int = 0) -> None:
    """
    The function sets particular status for given node.

    Args:
        node: node for which status should be set.
        status: online or offline.
        retries (optional, int): number of retry attempts if it didn't work from the first time

    Raises:
        Exception: the error of the last attempt is re-raised unchanged once all attempts failed.
    """
    cli, control_endpoint = _get_control_cli(node)

    # One original attempt plus the requested number of retries (negative values mean no retries)
    max_retries = max(retries, 0)
    for attempt in range(max_retries + 1):
        try:
            cli.control.set_status(control_endpoint, status)
            return
        except Exception as err:
            if attempt >= max_retries:
                raise
            logger.warning(f"Set status '{status}' for {node} failed and will be retried:\n{err}")


@reporter.step("Get netmap snapshot")
def get_netmap_snapshot(node: StorageNode, shell: Shell) -> str:
    """
    The function returns string representation of netmap snapshot.

    Args:
        node: node from which netmap snapshot should be requested.
        shell: shell to run frostfs-cli with.

    Returns:
        string representation of netmap
    """
    storage_wallet_config = node.get_wallet_config_path()
    storage_wallet_path = node.get_wallet_path()

    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_file=storage_wallet_config)
    return cli.netmap.snapshot(
        rpc_endpoint=node.get_rpc_endpoint(),
        wallet=storage_wallet_path,
    ).stdout


@reporter.step("Get shard list for {node}")
def node_shard_list(node: StorageNode, json: Optional[bool] = None) -> list[str]:
    """
    The function returns list of shards for specified storage node.

    Args:
        node: node for which shards should be returned.
        json: passed to the CLI as json_mode.

    Returns:
        list of shards.
    """
    cli, control_endpoint = _get_control_cli(node)
    result = cli.shards.list(endpoint=control_endpoint, json_mode=json)
    # NOTE(review): the pattern matches "Shard <id>:" lines; confirm it is also valid for JSON output
    return re.findall(r"Shard (.*):", result.stdout)


@reporter.step("Shard set for {node}")
def node_shard_set_mode(node: StorageNode, shard: list[str], mode: str) -> None:
    """
    The function sets mode for specified shard.

    Args:
        node: node on which shard mode should be set.
        shard: shard identifier(s), passed to the CLI as id.
        mode: mode to set.
    """
    cli, control_endpoint = _get_control_cli(node)
    cli.shards.set_mode(endpoint=control_endpoint, mode=mode, id=shard)


@reporter.step("Drop object from {node}")
def drop_object(node: StorageNode, cid: str, oid: str) -> None:
    """
    The function drops object from specified node.

    Args:
        node: node from which object should be dropped.
        cid: container ID of the object.
        oid: object ID.
    """
    cli, control_endpoint = _get_control_cli(node)
    objects = f"{cid}/{oid}"
    cli.control.drop_objects(control_endpoint, objects)


@reporter.step("Delete data from host for node {node}")
def delete_node_data(node: StorageNode) -> None:
    """
    The function stops the node service, deletes its data on the host and waits for one morph block time.

    Args:
        node: node which data should be deleted.
    """
    node.stop_service()
    node.host.delete_storage_node_data(node.name)
    time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))


@reporter.step("Exclude node {node_to_exclude} from network map")
def exclude_node_from_network_map(
    node_to_exclude: StorageNode,
    alive_node: StorageNode,
    shell: Shell,
    cluster: Cluster,
) -> None:
    """
    The function sets node offline, ticks an epoch and asserts the node key is absent in the netmap snapshot.

    Args:
        node_to_exclude: node which should leave the network map.
        alive_node: node from which the netmap snapshot is requested.
        shell: shell used for epoch tick and snapshot request.
        cluster: cluster instance under test.
    """
    node_netmap_key = node_to_exclude.get_wallet_public_key()

    storage_node_set_status(node_to_exclude, status="offline")

    time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
    tick_epoch(shell, cluster)
    wait_for_epochs_align(shell, cluster)

    snapshot = get_netmap_snapshot(node=alive_node, shell=shell)
    assert node_netmap_key not in snapshot, f"Expected node with key {node_netmap_key} to be absent in network map"


@reporter.step("Include node {node_to_include} into network map")
def include_node_to_network_map(
    node_to_include: StorageNode,
    alive_node: StorageNode,
    shell: Shell,
    cluster: Cluster,
) -> None:
    """
    The function sets node online, ticks an epoch and checks that the node is present in the network map.

    Args:
        node_to_include: node which should join the network map.
        alive_node: node from which the netmap snapshot is requested.
        shell: shell used for epoch tick and snapshot request.
        cluster: cluster instance under test.
    """
    storage_node_set_status(node_to_include, status="online")

    # Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch.
    # First sleep can be omitted after https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/60 complete.

    time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
    tick_epoch(shell, cluster)
    time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)

    check_node_in_map(node_to_include, shell, alive_node)


@reporter.step("Check node {node} in network map")
def check_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
    """
    The function asserts that the node's public key is present in the netmap snapshot.

    Args:
        node: node which key is looked up.
        shell: shell used for snapshot request.
        alive_node: node from which the snapshot is requested (the checked node itself by default).
    """
    alive_node = alive_node or node

    node_netmap_key = node.get_wallet_public_key()
    logger.info(f"Node ({node.label}) netmap key: {node_netmap_key}")

    snapshot = get_netmap_snapshot(alive_node, shell)
    assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} to be in network map"


@reporter.step("Check node {node} NOT in network map")
def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
    """
    The function asserts that the node's public key is absent in the netmap snapshot.

    Args:
        node: node which key is looked up.
        shell: shell used for snapshot request.
        alive_node: node from which the snapshot is requested (the checked node itself by default).
    """
    alive_node = alive_node or node

    node_netmap_key = node.get_wallet_public_key()
    logger.info(f"Node ({node.label}) netmap key: {node_netmap_key}")

    snapshot = get_netmap_snapshot(alive_node, shell)
    assert node_netmap_key not in snapshot, f"Expected node with key {node_netmap_key} to be NOT in network map"


@reporter.step("Wait for node {node} is ready")
def wait_for_node_to_be_ready(node: StorageNode) -> None:
    """
    The function polls node healthcheck until health status becomes READY.

    Healthcheck errors are logged and treated as "not ready yet".

    Args:
        node: node to wait for.

    Raises:
        AssertionError: if the node is not READY after all attempts.
    """
    timeout, attempts = 60, 15
    for _ in range(attempts):
        try:
            health_check = storage_node_healthcheck(node)
            if health_check.health_status == "READY":
                return
        except Exception as err:
            logger.warning(f"Node {node} is not ready:\n{err}")
        sleep(timeout)
    raise AssertionError(f"Node {node} hasn't gone to the READY state after {timeout * attempts} seconds")


@reporter.step("Remove nodes from network map trough cli-adm morph command")
def remove_nodes_from_map_morph(
    shell: Shell,
    cluster: Cluster,
    remove_nodes: list[StorageNode],
    alive_node: Optional[StorageNode] = None,
) -> None:
    """
    Move node to the Offline state in the candidates list and tick an epoch to update the netmap
    using frostfs-adm

    Nothing is executed if FROSTFS_ADM_EXEC or FROSTFS_ADM_CONFIG_PATH is not set.

    Args:
        shell: local shell to make queries about current epoch. Remote shell will be used to tick new one
        cluster: cluster instance under test
        alive_node: node on which host frostfs-adm is executed (first node of remove_nodes by default)
        remove_nodes: list of nodes which would be removed from map
    """
    alive_node = alive_node if alive_node else remove_nodes[0]
    remote_shell = alive_node.host.get_shell()

    node_netmap_keys = list(map(StorageNode.get_wallet_public_key, remove_nodes))
    logger.info(f"Nodes netmap keys are: {' '.join(node_netmap_keys)}")

    if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH:
        # If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
        frostfsadm = FrostfsAdm(
            shell=remote_shell,
            frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
            config_file=FROSTFS_ADM_CONFIG_PATH,
        )
        frostfsadm.morph.remove_nodes(node_netmap_keys)