[#293] Add CSC methods for changing the blockchain netmap and update CliWrapper

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
Authored by Dmitriy Zayakin on 2024-09-17 07:52:32 +03:00; committed by Dmitriy Zayakin
parent 1bee69042b
commit 0d750ed114
7 changed files with 63 additions and 57 deletions

View file

@@ -13,6 +13,7 @@ from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
 from frostfs_testlib.shell import Shell
 from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
 from frostfs_testlib.storage.cluster import Cluster, StorageNode
+from frostfs_testlib.testing.test_control import wait_for_success
 from frostfs_testlib.utils import datetime_utils

 logger = logging.getLogger("NeoLogger")
@@ -111,10 +112,7 @@ def get_netmap_snapshot(node: StorageNode, shell: Shell) -> str:
     storage_wallet_path = node.get_wallet_path()

     cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_file=storage_wallet_config)
-    return cli.netmap.snapshot(
-        rpc_endpoint=node.get_rpc_endpoint(),
-        wallet=storage_wallet_path,
-    ).stdout
+    return cli.netmap.snapshot(rpc_endpoint=node.get_rpc_endpoint(), wallet=storage_wallet_path).stdout


 @reporter.step("Get shard list for {node}")
@@ -202,12 +200,7 @@ def delete_node_data(node: StorageNode) -> None:


 @reporter.step("Exclude node {node_to_exclude} from network map")
-def exclude_node_from_network_map(
-    node_to_exclude: StorageNode,
-    alive_node: StorageNode,
-    shell: Shell,
-    cluster: Cluster,
-) -> None:
+def exclude_node_from_network_map(node_to_exclude: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None:
     node_netmap_key = node_to_exclude.get_wallet_public_key()

     storage_node_set_status(node_to_exclude, status="offline")
@@ -221,12 +214,7 @@ def exclude_node_from_network_map(


 @reporter.step("Include node {node_to_include} into network map")
-def include_node_to_network_map(
-    node_to_include: StorageNode,
-    alive_node: StorageNode,
-    shell: Shell,
-    cluster: Cluster,
-) -> None:
+def include_node_to_network_map(node_to_include: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None:
     storage_node_set_status(node_to_include, status="online")

     # Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch.
@@ -236,7 +224,7 @@ def include_node_to_network_map(
     tick_epoch(shell, cluster)
     time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)

-    check_node_in_map(node_to_include, shell, alive_node)
+    await_node_in_map(node_to_include, shell, alive_node)


 @reporter.step("Check node {node} in network map")
@@ -250,6 +238,11 @@ def check_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[Stor
     assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} to be in network map"


+@wait_for_success(300, 15, title="Await node {node} in network map")
+def await_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
+    check_node_in_map(node, shell, alive_node)
+
+
 @reporter.step("Check node {node} NOT in network map")
 def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
     alive_node = alive_node or node
@@ -276,12 +269,7 @@ def wait_for_node_to_be_ready(node: StorageNode) -> None:


 @reporter.step("Remove nodes from network map trough cli-adm morph command")
-def remove_nodes_from_map_morph(
-    shell: Shell,
-    cluster: Cluster,
-    remove_nodes: list[StorageNode],
-    alive_node: Optional[StorageNode] = None,
-):
+def remove_nodes_from_map_morph(shell: Shell, cluster: Cluster, remove_nodes: list[StorageNode], alive_node: Optional[StorageNode] = None):
     """
     Move node to the Offline state in the candidates list and tick an epoch to update the netmap
     using frostfs-adm
@@ -300,9 +288,5 @@ def remove_nodes_from_map_morph(
     if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH:
         # If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
-        frostfsadm = FrostfsAdm(
-            shell=remote_shell,
-            frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
-            config_file=FROSTFS_ADM_CONFIG_PATH,
-        )
+        frostfsadm = FrostfsAdm(shell=remote_shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
         frostfsadm.morph.remove_nodes(node_netmap_keys)
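
The key behavioural change in this file is that `include_node_to_network_map` now calls `await_node_in_map`, which retries `check_node_in_map` via `wait_for_success(300, 15, ...)` instead of asserting once. A minimal sketch of that retry pattern, assuming the decorator keeps re-invoking the wrapped callable until it stops raising (a simplified stand-in, not the library implementation):

```python
import time
from functools import wraps


def wait_for_success(max_wait_time: int, interval: int, title: str = ""):
    """Simplified stand-in: retry the wrapped callable until it succeeds or time runs out."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            deadline = time.monotonic() + max_wait_time
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if time.monotonic() >= deadline:
                        raise
                    time.sleep(interval)

        return wrapper

    return decorator


@wait_for_success(300, 15, title="Await node in network map")
def await_condition(snapshot_provider, netmap_key: str) -> None:
    # Retried every 15 s for up to 300 s until the key appears in the snapshot.
    assert netmap_key in snapshot_provider(), f"{netmap_key} not in netmap yet"
```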

View file

@@ -14,6 +14,7 @@ from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_E
 from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
 from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
 from frostfs_testlib.steps.network import IpHelper
+from frostfs_testlib.steps.node_management import include_node_to_network_map, remove_nodes_from_map_morph
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
 from frostfs_testlib.storage.controllers.disk_controller import DiskController
 from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
@@ -39,6 +40,7 @@ class ClusterStateController:
         self.stopped_nodes: list[ClusterNode] = []
         self.detached_disks: dict[str, DiskController] = {}
         self.dropped_traffic: list[ClusterNode] = []
+        self.excluded_from_netmap: list[StorageNode] = []
         self.stopped_services: set[NodeBase] = set()
         self.cluster = cluster
         self.healthcheck = healthcheck
@@ -307,23 +309,14 @@ class ClusterStateController:
         self.suspended_services = {}

     @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
-    def drop_traffic(
-        self,
-        node: ClusterNode,
-        wakeup_timeout: int,
-        name_interface: str,
-        block_nodes: list[ClusterNode] = None,
-    ) -> None:
+    def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
         list_ip = self._parse_interfaces(block_nodes, name_interface)
         IpHelper.drop_input_traffic_to_node(node, list_ip)
         time.sleep(wakeup_timeout)
         self.dropped_traffic.append(node)

     @reporter.step("Start traffic to {node}")
-    def restore_traffic(
-        self,
-        node: ClusterNode,
-    ) -> None:
+    def restore_traffic(self, node: ClusterNode) -> None:
         IpHelper.restore_input_traffic_to_node(node=node)
         index = self.dropped_traffic.index(node)
         self.dropped_traffic.pop(index)
@@ -410,9 +403,7 @@ class ClusterStateController:
     @reporter.step("Set MaintenanceModeAllowed - {status}")
     def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None:
         frostfs_adm = FrostfsAdm(
-            shell=cluster_node.host.get_shell(),
-            frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
-            config_file=FROSTFS_ADM_CONFIG_PATH,
+            shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH
         )
         frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}")
@@ -453,6 +444,25 @@ class ClusterStateController:
         else:
             assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"

+    def remove_node_from_netmap(self, removes_nodes: list[StorageNode]) -> None:
+        alive_storage = list(set(self.cluster.storage_nodes) - set(removes_nodes))[0]
+        remove_nodes_from_map_morph(self.shell, self.cluster, removes_nodes, alive_storage)
+        self.excluded_from_netmap.extend(removes_nodes)
+
+    def include_node_to_netmap(self, include_node: StorageNode, alive_node: StorageNode):
+        include_node_to_network_map(include_node, alive_node, self.shell, self.cluster)
+        self.excluded_from_netmap.pop(self.excluded_from_netmap.index(include_node))
+
+    def include_all_excluded_nodes(self):
+        if not self.excluded_from_netmap:
+            return
+        alive_node = list(set(self.cluster.storage_nodes) - set(self.excluded_from_netmap))[0]
+        if not alive_node:
+            return
+        for exclude_node in self.excluded_from_netmap.copy():
+            self.include_node_to_netmap(exclude_node, alive_node)
+
     def _get_cli(
         self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode
     ) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]:
@@ -469,11 +479,7 @@ class ClusterStateController:
         frostfs_adm = FrostfsAdm(shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
         frostfs_cli = FrostfsCli(local_shell, FROSTFS_CLI_EXEC, local_wallet.config_path)
-        frostfs_cli_remote = FrostfsCli(
-            shell=shell,
-            frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
-            config_file=wallet_config_path,
-        )
+        frostfs_cli_remote = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
         return frostfs_adm, frostfs_cli, frostfs_cli_remote

     def _enable_date_synchronizer(self, cluster_node: ClusterNode):
@@ -536,8 +542,5 @@ class ClusterStateController:
     @reporter.step("Get contract by domain - {domain_name}")
     def get_domain_contracts(self, cluster_node: ClusterNode, domain_name: str):
-        frostfs_adm = FrostfsAdm(
-            shell=cluster_node.host.get_shell(),
-            frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
-        )
+        frostfs_adm = FrostfsAdm(shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC)
         return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_http_endpoint(), domain_name).stdout
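
The controller now records excluded nodes in `excluded_from_netmap`, so a test can drop nodes from the netmap and restore them without its own bookkeeping. A hedged usage sketch, assuming a pytest-style test where `cluster` and `cluster_state_controller` are fixtures (the fixture names are assumptions, not part of this change):

```python
def test_netmap_exclusion_roundtrip(cluster, cluster_state_controller):
    node_to_remove = cluster.storage_nodes[0]

    # Removal goes through remove_nodes_from_map_morph and is tracked
    # in excluded_from_netmap.
    cluster_state_controller.remove_node_from_netmap([node_to_remove])

    # ... assertions against the reduced netmap ...

    # Teardown-friendly restore: every tracked node is re-included via
    # include_node_to_network_map, using any remaining node as the helper.
    cluster_state_controller.include_all_excluded_nodes()
```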

View file

@@ -90,3 +90,6 @@ class Chunk:
     def __str__(self) -> str:
         return self.object_id
+
+    def __repr__(self) -> str:
+        return self.object_id
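
Adding `__repr__` alongside `__str__` makes collections of chunks print as their object IDs rather than as default `<Chunk object at 0x...>` entries, which is what ends up in assertion messages and logs. A tiny illustration with a stripped-down stand-in class:

```python
class Chunk:
    """Stripped-down stand-in; the real dataclass carries more fields."""

    def __init__(self, object_id: str):
        self.object_id = object_id

    def __str__(self) -> str:
        return self.object_id

    def __repr__(self) -> str:
        return self.object_id


print([Chunk("chunk-oid-1"), Chunk("chunk-oid-2")])
# [chunk-oid-1, chunk-oid-2]  -- instead of [<Chunk object at 0x...>, ...]
```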

View file

@@ -8,6 +8,7 @@ from frostfs_testlib.storage.cluster import Cluster, ClusterNode
 from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher
 from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo
 from frostfs_testlib.storage.grpc_operations import interfaces
+from frostfs_testlib.testing.test_control import wait_for_success
 from frostfs_testlib.utils.cli_utils import parse_netmap_output
@@ -42,6 +43,7 @@ class ChunksOperations(interfaces.ChunksInterface):
             if cluster_node.host_ip == node_info.node:
                 return (cluster_node, node_info)

+    @wait_for_success(300, 5, fail_testcase=None)
     @reporter.step("Search shard with chunk {chunk}")
     def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str:
         oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}"
@@ -63,7 +65,7 @@ class ChunksOperations(interfaces.ChunksInterface):
         address: Optional[str] = None,
         bearer: Optional[str] = None,
         generate_key: Optional[bool] = None,
-        trace: bool = False,
+        trace: bool = True,
         root: bool = False,
         verify_presence_all: bool = False,
         json: bool = True,
@@ -86,7 +88,7 @@ class ChunksOperations(interfaces.ChunksInterface):
             xhdr=xhdr,
             timeout=timeout,
         )
-        return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0]
+        return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])

     @reporter.step("Get last parity chunk")
     def get_parity(
@@ -97,7 +99,7 @@ class ChunksOperations(interfaces.ChunksInterface):
         bearer: Optional[str] = None,
         generate_key: Optional[bool] = None,
         oid: Optional[str] = None,
-        trace: bool = False,
+        trace: bool = True,
         root: bool = False,
         verify_presence_all: bool = False,
         json: bool = True,
@@ -120,7 +122,7 @@ class ChunksOperations(interfaces.ChunksInterface):
             xhdr=xhdr,
             timeout=timeout,
         )
-        return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0]
+        return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[-1]

     @reporter.step("Get first data chunk")
     def get_first_data(
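
Besides flipping the `trace` default to `True` and adjusting which parsed entry each getter returns, `get_shard_chunk` is now wrapped in `wait_for_success(300, 5, fail_testcase=None)`, so the shard lookup is polled rather than attempted once. The lookup itself keys off the first four characters of the chunk's object ID, as the `oid_path` line above shows; a small illustration with a placeholder ID:

```python
# Example only: how the oid_path prefix used by get_shard_chunk is built.
object_id = "7kFNzLeVqPQn"  # placeholder object ID
oid_path = f"{object_id[0]}/{object_id[1]}/{object_id[2]}/{object_id[3]}"
print(oid_path)  # 7/k/F/N
```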

View file

@@ -8,7 +8,7 @@ from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
 from frostfs_testlib.plugins import load_plugin
 from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
 from frostfs_testlib.s3.interfaces import BucketContainerResolver
-from frostfs_testlib.storage.cluster import ClusterNode
+from frostfs_testlib.storage.cluster import Cluster, ClusterNode
 from frostfs_testlib.storage.grpc_operations import interfaces
 from frostfs_testlib.utils import json_utils
@@ -266,6 +266,7 @@ class ContainerOperations(interfaces.ContainerInterface):
         self,
         endpoint: str,
         cid: str,
+        cluster: Cluster,
         address: Optional[str] = None,
         ttl: Optional[int] = None,
         from_file: Optional[str] = None,
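
With the new `cluster` argument, `nodes()` can resolve the CLI output straight to cluster nodes instead of bare strings. A hedged call sketch (the `grpc_client.container` attribute path and surrounding variables are assumptions for illustration):

```python
container_nodes = grpc_client.container.nodes(
    endpoint=alive_node.get_rpc_endpoint(),
    cid=cid,
    cluster=cluster,  # new required argument
)
for cluster_node in container_nodes:
    print(cluster_node.host_ip)
```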

View file

@@ -509,6 +509,7 @@ class ObjectOperations(interfaces.ObjectInterface):
         cid: str,
         endpoint: str,
         bearer: str = "",
+        oid: Optional[str] = None,
         filters: Optional[dict] = None,
         expected_objects_list: Optional[list] = None,
         xhdr: Optional[dict] = None,
@@ -516,6 +517,9 @@ class ObjectOperations(interfaces.ObjectInterface):
         phy: bool = False,
         root: bool = False,
         timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
+        address: Optional[str] = None,
+        generate_key: Optional[bool] = None,
+        ttl: Optional[int] = None,
     ) -> list:
         """
         SEARCH an Object.
@@ -541,11 +545,15 @@ class ObjectOperations(interfaces.ObjectInterface):
                 rpc_endpoint=endpoint,
                 cid=cid,
                 bearer=bearer,
+                oid=oid,
                 xhdr=xhdr,
                 filters=[f"{filter_key} EQ {filter_val}" for filter_key, filter_val in filters.items()] if filters else None,
                 session=session,
                 phy=phy,
                 root=root,
+                address=address,
+                generate_key=generate_key,
+                ttl=ttl,
                 timeout=timeout,
             )
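
The extra `oid`, `address`, `generate_key`, and `ttl` parameters are passed straight through to the underlying CLI wrapper call, so callers can narrow a search without building the invocation themselves. A hedged call sketch (variable names are examples only):

```python
found_objects = grpc_client.object.search(
    cid=cid,
    endpoint=endpoint,
    oid=oid,            # new: restrict the search to one object ID
    ttl=2,              # new: request TTL
    generate_key=True,  # new: use an autogenerated key
    filters={"FileName": "cat.jpg"},
)
```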

View file

@@ -235,6 +235,7 @@ class ObjectInterface(ABC):
         cid: str,
         endpoint: str,
         bearer: str = "",
+        oid: Optional[str] = None,
         filters: Optional[dict] = None,
         expected_objects_list: Optional[list] = None,
         xhdr: Optional[dict] = None,
@@ -242,6 +243,9 @@ class ObjectInterface(ABC):
         phy: bool = False,
         root: bool = False,
         timeout: Optional[str] = None,
+        address: Optional[str] = None,
+        generate_key: Optional[bool] = None,
+        ttl: Optional[int] = None,
     ) -> List:
         pass
@@ -368,6 +372,7 @@ class ContainerInterface(ABC):
         self,
         endpoint: str,
         cid: str,
+        cluster: Cluster,
         address: Optional[str] = None,
         ttl: Optional[int] = None,
         from_file: Optional[str] = None,
@@ -376,7 +381,7 @@ class ContainerInterface(ABC):
         xhdr: Optional[dict] = None,
         generate_key: Optional[bool] = None,
         timeout: Optional[str] = None,
-    ) -> List[str]:
+    ) -> List[ClusterNode]:
         """Show the nodes participating in the container in the current epoch."""
         raise NotImplementedError("No implemethed method nodes")
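
Because the abstract `nodes()` contract now takes a `Cluster` and returns `List[ClusterNode]`, any alternative implementation has to map the CLI output back onto the cluster topology. A hedged sketch of that mapping, assuming a hypothetical `_resolve_container_hosts` helper that returns host addresses (the `**kwargs` shorthand stands in for the remaining optional parameters):

```python
from typing import Any, List

from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces


class CustomContainerOperations(interfaces.ContainerInterface):
    def nodes(self, endpoint: str, cid: str, cluster: Cluster, **kwargs: Any) -> List[ClusterNode]:
        # Hypothetical helper returning the host addresses reported by the CLI.
        raw_hosts = self._resolve_container_hosts(endpoint, cid)
        # Map the reported addresses back onto the known cluster topology.
        return [node for node in cluster.cluster_nodes if node.host_ip in raw_hosts]
```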