From 0d750ed114653c05f810d35b0ab05d1104af40c2 Mon Sep 17 00:00:00 2001 From: Dmitriy Zayakin Date: Tue, 17 Sep 2024 07:52:32 +0300 Subject: [PATCH] [#293] Add in CSC methods change blockchain netmap and update CliWrapper Signed-off-by: Dmitriy Zayakin --- src/frostfs_testlib/steps/node_management.py | 40 +++++---------- .../controllers/cluster_state_controller.py | 49 ++++++++++--------- .../dataclasses/storage_object_info.py | 3 ++ .../grpc_operations/implementations/chunks.py | 10 ++-- .../implementations/container.py | 3 +- .../grpc_operations/implementations/object.py | 8 +++ .../storage/grpc_operations/interfaces.py | 7 ++- 7 files changed, 63 insertions(+), 57 deletions(-) diff --git a/src/frostfs_testlib/steps/node_management.py b/src/frostfs_testlib/steps/node_management.py index ece674b..42b1fc5 100644 --- a/src/frostfs_testlib/steps/node_management.py +++ b/src/frostfs_testlib/steps/node_management.py @@ -13,6 +13,7 @@ from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.shell import Shell from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align from frostfs_testlib.storage.cluster import Cluster, StorageNode +from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils import datetime_utils logger = logging.getLogger("NeoLogger") @@ -111,10 +112,7 @@ def get_netmap_snapshot(node: StorageNode, shell: Shell) -> str: storage_wallet_path = node.get_wallet_path() cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_file=storage_wallet_config) - return cli.netmap.snapshot( - rpc_endpoint=node.get_rpc_endpoint(), - wallet=storage_wallet_path, - ).stdout + return cli.netmap.snapshot(rpc_endpoint=node.get_rpc_endpoint(), wallet=storage_wallet_path).stdout @reporter.step("Get shard list for {node}") @@ -202,12 +200,7 @@ def delete_node_data(node: StorageNode) -> None: @reporter.step("Exclude node {node_to_exclude} from network map") -def exclude_node_from_network_map( - node_to_exclude: StorageNode, - alive_node: StorageNode, - shell: Shell, - cluster: Cluster, -) -> None: +def exclude_node_from_network_map(node_to_exclude: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None: node_netmap_key = node_to_exclude.get_wallet_public_key() storage_node_set_status(node_to_exclude, status="offline") @@ -221,12 +214,7 @@ def exclude_node_from_network_map( @reporter.step("Include node {node_to_include} into network map") -def include_node_to_network_map( - node_to_include: StorageNode, - alive_node: StorageNode, - shell: Shell, - cluster: Cluster, -) -> None: +def include_node_to_network_map(node_to_include: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None: storage_node_set_status(node_to_include, status="online") # Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch. @@ -236,7 +224,7 @@ def include_node_to_network_map( tick_epoch(shell, cluster) time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) - check_node_in_map(node_to_include, shell, alive_node) + await_node_in_map(node_to_include, shell, alive_node) @reporter.step("Check node {node} in network map") @@ -250,6 +238,11 @@ def check_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[Stor assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} to be in network map" +@wait_for_success(300, 15, title="Await node {node} in network map") +def await_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None: + check_node_in_map(node, shell, alive_node) + + @reporter.step("Check node {node} NOT in network map") def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None: alive_node = alive_node or node @@ -276,12 +269,7 @@ def wait_for_node_to_be_ready(node: StorageNode) -> None: @reporter.step("Remove nodes from network map trough cli-adm morph command") -def remove_nodes_from_map_morph( - shell: Shell, - cluster: Cluster, - remove_nodes: list[StorageNode], - alive_node: Optional[StorageNode] = None, -): +def remove_nodes_from_map_morph(shell: Shell, cluster: Cluster, remove_nodes: list[StorageNode], alive_node: Optional[StorageNode] = None): """ Move node to the Offline state in the candidates list and tick an epoch to update the netmap using frostfs-adm @@ -300,9 +288,5 @@ def remove_nodes_from_map_morph( if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH: # If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests) - frostfsadm = FrostfsAdm( - shell=remote_shell, - frostfs_adm_exec_path=FROSTFS_ADM_EXEC, - config_file=FROSTFS_ADM_CONFIG_PATH, - ) + frostfsadm = FrostfsAdm(shell=remote_shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH) frostfsadm.morph.remove_nodes(node_netmap_keys) diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 7f93e40..53098b1 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -14,6 +14,7 @@ from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_E from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.steps.network import IpHelper +from frostfs_testlib.steps.node_management import include_node_to_network_map, remove_nodes_from_map_morph from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass @@ -39,6 +40,7 @@ class ClusterStateController: self.stopped_nodes: list[ClusterNode] = [] self.detached_disks: dict[str, DiskController] = {} self.dropped_traffic: list[ClusterNode] = [] + self.excluded_from_netmap: list[StorageNode] = [] self.stopped_services: set[NodeBase] = set() self.cluster = cluster self.healthcheck = healthcheck @@ -307,23 +309,14 @@ class ClusterStateController: self.suspended_services = {} @reporter.step("Drop traffic to {node}, nodes - {block_nodes}") - def drop_traffic( - self, - node: ClusterNode, - wakeup_timeout: int, - name_interface: str, - block_nodes: list[ClusterNode] = None, - ) -> None: + def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None: list_ip = self._parse_interfaces(block_nodes, name_interface) IpHelper.drop_input_traffic_to_node(node, list_ip) time.sleep(wakeup_timeout) self.dropped_traffic.append(node) @reporter.step("Start traffic to {node}") - def restore_traffic( - self, - node: ClusterNode, - ) -> None: + def restore_traffic(self, node: ClusterNode) -> None: IpHelper.restore_input_traffic_to_node(node=node) index = self.dropped_traffic.index(node) self.dropped_traffic.pop(index) @@ -410,9 +403,7 @@ class ClusterStateController: @reporter.step("Set MaintenanceModeAllowed - {status}") def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None: frostfs_adm = FrostfsAdm( - shell=cluster_node.host.get_shell(), - frostfs_adm_exec_path=FROSTFS_ADM_EXEC, - config_file=FROSTFS_ADM_CONFIG_PATH, + shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH ) frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}") @@ -453,6 +444,25 @@ class ClusterStateController: else: assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'" + def remove_node_from_netmap(self, removes_nodes: list[StorageNode]) -> None: + alive_storage = list(set(self.cluster.storage_nodes) - set(removes_nodes))[0] + remove_nodes_from_map_morph(self.shell, self.cluster, removes_nodes, alive_storage) + self.excluded_from_netmap.extend(removes_nodes) + + def include_node_to_netmap(self, include_node: StorageNode, alive_node: StorageNode): + include_node_to_network_map(include_node, alive_node, self.shell, self.cluster) + self.excluded_from_netmap.pop(self.excluded_from_netmap.index(include_node)) + + def include_all_excluded_nodes(self): + if not self.excluded_from_netmap: + return + alive_node = list(set(self.cluster.storage_nodes) - set(self.excluded_from_netmap))[0] + if not alive_node: + return + + for exclude_node in self.excluded_from_netmap.copy(): + self.include_node_to_netmap(exclude_node, alive_node) + def _get_cli( self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode ) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]: @@ -469,11 +479,7 @@ class ClusterStateController: frostfs_adm = FrostfsAdm(shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH) frostfs_cli = FrostfsCli(local_shell, FROSTFS_CLI_EXEC, local_wallet.config_path) - frostfs_cli_remote = FrostfsCli( - shell=shell, - frostfs_cli_exec_path=FROSTFS_CLI_EXEC, - config_file=wallet_config_path, - ) + frostfs_cli_remote = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path) return frostfs_adm, frostfs_cli, frostfs_cli_remote def _enable_date_synchronizer(self, cluster_node: ClusterNode): @@ -536,8 +542,5 @@ class ClusterStateController: @reporter.step("Get contract by domain - {domain_name}") def get_domain_contracts(self, cluster_node: ClusterNode, domain_name: str): - frostfs_adm = FrostfsAdm( - shell=cluster_node.host.get_shell(), - frostfs_adm_exec_path=FROSTFS_ADM_EXEC, - ) + frostfs_adm = FrostfsAdm(shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC) return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_http_endpoint(), domain_name).stdout diff --git a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py index d192de5..55a8388 100644 --- a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py +++ b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py @@ -90,3 +90,6 @@ class Chunk: def __str__(self) -> str: return self.object_id + + def __repr__(self) -> str: + return self.object_id diff --git a/src/frostfs_testlib/storage/grpc_operations/implementations/chunks.py b/src/frostfs_testlib/storage/grpc_operations/implementations/chunks.py index d1bba9f..7f3161c 100644 --- a/src/frostfs_testlib/storage/grpc_operations/implementations/chunks.py +++ b/src/frostfs_testlib/storage/grpc_operations/implementations/chunks.py @@ -8,6 +8,7 @@ from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo from frostfs_testlib.storage.grpc_operations import interfaces +from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils.cli_utils import parse_netmap_output @@ -42,6 +43,7 @@ class ChunksOperations(interfaces.ChunksInterface): if cluster_node.host_ip == node_info.node: return (cluster_node, node_info) + @wait_for_success(300, 5, fail_testcase=None) @reporter.step("Search shard with chunk {chunk}") def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str: oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}" @@ -63,7 +65,7 @@ class ChunksOperations(interfaces.ChunksInterface): address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, - trace: bool = False, + trace: bool = True, root: bool = False, verify_presence_all: bool = False, json: bool = True, @@ -86,7 +88,7 @@ class ChunksOperations(interfaces.ChunksInterface): xhdr=xhdr, timeout=timeout, ) - return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0] + return self._parse_object_nodes(object_nodes.stdout.split("\n")[0]) @reporter.step("Get last parity chunk") def get_parity( @@ -97,7 +99,7 @@ class ChunksOperations(interfaces.ChunksInterface): bearer: Optional[str] = None, generate_key: Optional[bool] = None, oid: Optional[str] = None, - trace: bool = False, + trace: bool = True, root: bool = False, verify_presence_all: bool = False, json: bool = True, @@ -120,7 +122,7 @@ class ChunksOperations(interfaces.ChunksInterface): xhdr=xhdr, timeout=timeout, ) - return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0] + return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[-1] @reporter.step("Get first data chunk") def get_first_data( diff --git a/src/frostfs_testlib/storage/grpc_operations/implementations/container.py b/src/frostfs_testlib/storage/grpc_operations/implementations/container.py index c8360ea..7a637d7 100644 --- a/src/frostfs_testlib/storage/grpc_operations/implementations/container.py +++ b/src/frostfs_testlib/storage/grpc_operations/implementations/container.py @@ -8,7 +8,7 @@ from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.plugins import load_plugin from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.s3.interfaces import BucketContainerResolver -from frostfs_testlib.storage.cluster import ClusterNode +from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.utils import json_utils @@ -266,6 +266,7 @@ class ContainerOperations(interfaces.ContainerInterface): self, endpoint: str, cid: str, + cluster: Cluster, address: Optional[str] = None, ttl: Optional[int] = None, from_file: Optional[str] = None, diff --git a/src/frostfs_testlib/storage/grpc_operations/implementations/object.py b/src/frostfs_testlib/storage/grpc_operations/implementations/object.py index 63a2922..0e14aec 100644 --- a/src/frostfs_testlib/storage/grpc_operations/implementations/object.py +++ b/src/frostfs_testlib/storage/grpc_operations/implementations/object.py @@ -509,6 +509,7 @@ class ObjectOperations(interfaces.ObjectInterface): cid: str, endpoint: str, bearer: str = "", + oid: Optional[str] = None, filters: Optional[dict] = None, expected_objects_list: Optional[list] = None, xhdr: Optional[dict] = None, @@ -516,6 +517,9 @@ class ObjectOperations(interfaces.ObjectInterface): phy: bool = False, root: bool = False, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, + address: Optional[str] = None, + generate_key: Optional[bool] = None, + ttl: Optional[int] = None, ) -> list: """ SEARCH an Object. @@ -541,11 +545,15 @@ class ObjectOperations(interfaces.ObjectInterface): rpc_endpoint=endpoint, cid=cid, bearer=bearer, + oid=oid, xhdr=xhdr, filters=[f"{filter_key} EQ {filter_val}" for filter_key, filter_val in filters.items()] if filters else None, session=session, phy=phy, root=root, + address=address, + generate_key=generate_key, + ttl=ttl, timeout=timeout, ) diff --git a/src/frostfs_testlib/storage/grpc_operations/interfaces.py b/src/frostfs_testlib/storage/grpc_operations/interfaces.py index 1947435..c293c2d 100644 --- a/src/frostfs_testlib/storage/grpc_operations/interfaces.py +++ b/src/frostfs_testlib/storage/grpc_operations/interfaces.py @@ -235,6 +235,7 @@ class ObjectInterface(ABC): cid: str, endpoint: str, bearer: str = "", + oid: Optional[str] = None, filters: Optional[dict] = None, expected_objects_list: Optional[list] = None, xhdr: Optional[dict] = None, @@ -242,6 +243,9 @@ class ObjectInterface(ABC): phy: bool = False, root: bool = False, timeout: Optional[str] = None, + address: Optional[str] = None, + generate_key: Optional[bool] = None, + ttl: Optional[int] = None, ) -> List: pass @@ -368,6 +372,7 @@ class ContainerInterface(ABC): self, endpoint: str, cid: str, + cluster: Cluster, address: Optional[str] = None, ttl: Optional[int] = None, from_file: Optional[str] = None, @@ -376,7 +381,7 @@ class ContainerInterface(ABC): xhdr: Optional[dict] = None, generate_key: Optional[bool] = None, timeout: Optional[str] = None, - ) -> List[str]: + ) -> List[ClusterNode]: """Show the nodes participating in the container in the current epoch.""" raise NotImplementedError("No implemethed method nodes")