Update usage of CLI for node management #209

Merged
3 changed files with 150 additions and 194 deletions

View file

@ -10,7 +10,7 @@ from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
@ -36,10 +36,10 @@ from frostfs_testlib.steps.node_management import (
wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils, string_utils
@ -333,8 +333,6 @@ class TestNodeManagement(ClusterTestBase):
@pytest.mark.maintenance
class TestMaintenanceMode(ClusterTestBase):
change_node: ClusterNode = None
@pytest.fixture()
@allure.title("Init Frostfs CLI remote")
def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
@ -352,34 +350,42 @@ class TestMaintenanceMode(ClusterTestBase):
@pytest.fixture()
@allure.title("Init Frostfs CLI remote")
def frostfs_cli(self) -> FrostfsCli:
cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
def frostfs_cli(self, default_wallet: WalletInfo) -> FrostfsCli:
cli = FrostfsCli(
shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=default_wallet.config_path
)
return cli
@pytest.fixture()
def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: WalletInfo):
yield
cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online")
def restore_node_status(self, cluster_state_controller: ClusterStateController, default_wallet: WalletInfo):
nodes_to_restore = []
yield nodes_to_restore
for node_to_restore in nodes_to_restore:
cluster_state_controller.set_node_status(node_to_restore, default_wallet, NodeStatus.ONLINE)
self.tick_epoch(wait_block=2)
def basic_operations(self, wallet, cid, oid, shell, endpoint, matchs, object_size):
file_path = generate_file(object_size)
default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint}
operations = {
get_object: {"oid": oid},
search_object: {},
delete_object: {"oid": oid},
put_object: {"path": file_path},
}
for index, operation, kw in enumerate(operations.items()):
with reporter.step(f"Run {operation.__name__} object, waiting response - {matchs[index]}"):
default_kw.update(kw)
if operation == search_object and "Found" in matchs[index]:
operation(**default_kw)
continue
with pytest.raises(RuntimeError, match=matchs[index]):
operation(**default_kw)
os.remove(file_path)
def check_node_status(
self, expected_status: NodeStatus, node_under_test: ClusterNode, frostfs_cli: FrostfsCli, rpc_endpoint: str
):
netmap = frostfs_cli.netmap.snapshot(rpc_endpoint).stdout
all_snapshots = NetmapParser.snapshot_all_nodes(netmap)
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.host_ip == snapshot.node]
if expected_status == NodeStatus.OFFLINE and not node_snapshot:
assert (
node_under_test.host_ip not in netmap
), f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
return
assert (
node_snapshot
), f"{node_under_test} status should be {expected_status}, but was not in netmap. See netmap:\n{netmap}"
node_snapshot = node_snapshot[0]
assert (
expected_status == node_snapshot.node_status
), f"{node_under_test} status should be {expected_status}, but was {node_snapshot.node_status}. See netmap:\n{netmap}"
@allure.title("Test of basic node operations in maintenance mode")
def test_maintenance_mode(
@ -387,7 +393,7 @@ class TestMaintenanceMode(ClusterTestBase):
default_wallet: WalletInfo,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
restore_online_mode_node: None,
restore_node_status: list[ClusterNode],
):
with reporter.step("Create container and create\put object"):
cid = create_container(
@ -396,48 +402,59 @@ class TestMaintenanceMode(ClusterTestBase):
endpoint=self.cluster.default_rpc_endpoint,
rule="REP 1 CBF 1",
)
node = search_nodes_with_container(
nodes_with_container = search_nodes_with_container(
wallet=default_wallet,
cid=cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster,
)
self.change_node = node[0]
node_under_test = nodes_with_container[0]
endpoint = node_under_test.storage_node.get_rpc_endpoint()
restore_node_status.append(node_under_test)
file_path = generate_file(simple_object_size.value)
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=self.change_node.storage_node.get_rpc_endpoint(),
)
with reporter.step("Enable MaintenanceModeAllowed:"):
cluster_state_controller.set_mode_node(
cluster_node=self.change_node, wallet=default_wallet, status="maintenance"
endpoint=endpoint,
)
with reporter.step("Set node status to 'maintenance'"):
cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)
other_nodes = list(set(self.cluster.cluster_nodes) - set(node))
node_and_match = {
self.change_node: ["node is under maintenance"] * 4,
other_nodes[0]: [
"object not found",
"Found 0 objects",
"object not found",
"node is under maintenance",
],
}
with reporter.step("Run basic operations"):
for cluster_node, matchs in node_and_match.items():
self.basic_operations(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=cluster_node.storage_node.get_rpc_endpoint(),
matchs=matchs,
object_size=simple_object_size.value,
)
node_under_maintenance_error = "node is under maintenance"
with reporter.step("Run basic operations with node in maintenance"):
with pytest.raises(RuntimeError, match=node_under_maintenance_error):
get_object(default_wallet, cid, oid, self.shell, endpoint)
with pytest.raises(RuntimeError, match=node_under_maintenance_error):
search_object(default_wallet, cid, self.shell, endpoint)
with pytest.raises(RuntimeError, match=node_under_maintenance_error):
delete_object(default_wallet, cid, oid, self.shell, endpoint)
with pytest.raises(RuntimeError, match=node_under_maintenance_error):
put_object(default_wallet, file_path, cid, self.shell, endpoint)
with reporter.step("Run basic operations with node not in maintenance"):
other_nodes = list(set(self.cluster.cluster_nodes) - set(nodes_with_container))
endpoint = other_nodes[0].storage_node.get_rpc_endpoint()
with pytest.raises(RuntimeError, match=OBJECT_NOT_FOUND):
get_object(default_wallet, cid, oid, self.shell, endpoint)
search_object(default_wallet, cid, self.shell, endpoint)
with pytest.raises(RuntimeError, match=OBJECT_NOT_FOUND):
delete_object(default_wallet, cid, oid, self.shell, endpoint)
with pytest.raises(RuntimeError, match=node_under_maintenance_error):
put_object(default_wallet, file_path, cid, self.shell, endpoint)
os.remove(file_path)
@pytest.mark.sanity
@allure.title("MAINTENANCE and OFFLINE mode transitions")
@ -447,147 +464,99 @@ class TestMaintenanceMode(ClusterTestBase):
node_under_test: ClusterNode,
default_wallet: WalletInfo,
frostfs_cli: FrostfsCli,
restore_online_mode_node: None,
restore_node_status: list[ClusterNode],
):
self.change_node = node_under_test
cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
restore_node_status.append(node_under_test)
alive_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
alive_storage_node = alive_nodes[0].storage_node
alive_rpc_endpoint = alive_storage_node.get_rpc_endpoint()
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Set node mode to offline"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test,
wallet=default_wallet,
status=ModeNode.OFFLINE.value,
)
with reporter.step("Set node status to 'offline'"):
cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.OFFLINE)
with reporter.step("Tick epoch to update the network map"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Check node mode = offline, after update the network map"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
netmap = NetmapParser.snapshot_all_nodes(netmap)
assert node_under_test.host_ip not in [
netmap_node.node for netmap_node in netmap
], f"Node {node_under_test.host_ip} not in state offline. netmap - {netmap}"
with reporter.step("Check node status is 'offline' after update the network map"):
self.check_node_status(NodeStatus.OFFLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)
with reporter.step("Restart storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
cluster_state_controller.start_storage_service(node_under_test)
with reporter.step("Tick epoch after restart storage service and set mode to offline"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with (reporter.step("Check node mode = online, after restart storage service")):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert (
node.node_status == ModeNode.ONLINE.value.upper()
), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Check node status is 'online' after storage service restart"):
self.check_node_status(NodeStatus.ONLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Set node status to 'maintenance'"):
cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)
with reporter.step("Set node mode to maintenance"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
)
with reporter.step("Restart storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
cluster_state_controller.start_storage_service(node_under_test)
with reporter.step("Tick epoch after restart storage service"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Check node mode = maintenance, after restart storage service and tick epoch"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert node == ModeNode.MAINTENANCE.value.upper(), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Check node staus is 'maintenance' after storage service restart"):
self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Set node mode to offline"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert (
node.node_status == ModeNode.OFFLINE.value.upper()
), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Set node status to 'offline'"):
cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.OFFLINE)
with reporter.step("Stop storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Start storage service"):
cluster_state_controller.start_storage_service(node_under_test)
with reporter.step("Check node mode = offline, after tick epoch and start storage service"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert node == ModeNode.OFFLINE.value.upper(), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Check node status is 'offline' after storage service start"):
self.check_node_status(NodeStatus.OFFLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Check node mode = online, after start storage service and tick epoch"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert (
node.node_status == ModeNode.ONLINE.value.upper()
), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Check node status is 'online' after storage service start"):
self.check_node_status(NodeStatus.ONLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Set node mode to maintenance"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
)
with reporter.step("Set node status to 'maintenance'"):
cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)
with reporter.step("Stop storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Start storage service"):
cluster_state_controller.start_storage_service(node_under_test)
with reporter.step("Check node mode = maintenance"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert (
node.node_status == ModeNode.MAINTENANCE.value.upper()
), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Check node status is 'maintenance'"):
self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)
with reporter.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with reporter.step("Tick 2 epochs"):
self.tick_epochs(2, alive_storage_node, 2)
with reporter.step("Check node mode = maintenance"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node = NetmapParser.snapshot_one_node(netmap, node_under_test)
assert (
node.node_status == ModeNode.MAINTENANCE.value.upper()
), f"{node_under_test} actual state in netmap - {netmap}"
with reporter.step("Check node status is 'maintenance'"):
self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)
@allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
def test_maintenance_globally_forbidden(
@ -596,53 +565,40 @@ class TestMaintenanceMode(ClusterTestBase):
node_under_test: ClusterNode,
frostfs_cli_remote: FrostfsCli,
frostfs_cli: FrostfsCli,
default_wallet: WalletInfo,
restore_online_mode_node: None,
restore_node_status: list[ClusterNode],
):
self.change_node = node_under_test
restore_node_status.append(node_under_test)
control_endpoint = node_under_test.service(StorageNode).get_control_endpoint()
with reporter.step("Set MaintenanceModeAllowed = false"):
cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test)
with reporter.step("Set status node - maintenance"):
with reporter.step("Set node status to 'maintenance'"):
with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
with reporter.step("Set MaintenanceModeAllowed = true"):
cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test)
with reporter.step("Set status node - maintenance"):
with reporter.step("Set node status to 'maintenance'"):
output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
assert "update request successfully sent" in output.stdout, f"Response = {output}"
with reporter.step("Tick epoch"):
self.tick_epoch(wait_block=2)
with reporter.step("Check state node = maintenance "):
netmap_node = NetmapParser.snapshot_one_node(
frostfs_cli.netmap.snapshot(
rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout,
node_under_test,
with reporter.step("Check node status is 'maintenance'"):
self.check_node_status(
NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, node_under_test.storage_node.get_rpc_endpoint()
)
assert (
netmap_node.node_status == ModeNode.MAINTENANCE.value.upper()
), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}"
with reporter.step("Set status node - online "):
with reporter.step("Set node status to 'online'"):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online")
with reporter.step("Tick epoch"):
self.tick_epoch()
with reporter.step("Check state node: online"):
netmap_node = NetmapParser.snapshot_one_node(
frostfs_cli.netmap.snapshot(
rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout,
node_under_test,
with reporter.step("Check node status is 'online'"):
self.check_node_status(
NodeStatus.ONLINE, node_under_test, frostfs_cli, node_under_test.storage_node.get_rpc_endpoint()
)
assert (
netmap_node.node_status == ModeNode.ONLINE.value.upper()
), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}"

View file

@ -98,18 +98,19 @@ class TestReplication(ClusterTestBase):
f"expected attribute value: {attribute_value}"
)
with reporter.step("Cleanup"):
delete_object(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=client_shell,
endpoint=cluster.default_rpc_endpoint,
)
# TODO: Research why this fails
# with reporter.step("Cleanup"):
# delete_object(
# wallet=default_wallet,
# cid=cid,
# oid=oid,
# shell=client_shell,
# endpoint=cluster.default_rpc_endpoint,
# )
delete_container(
wallet=default_wallet,
cid=cid,
shell=client_shell,
endpoint=cluster.default_rpc_endpoint,
)
# delete_container(
# wallet=default_wallet,
# cid=cid,
# shell=client_shell,
# endpoint=cluster.default_rpc_endpoint,
# )

View file

@ -5,7 +5,6 @@ import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
from frostfs_testlib.steps.cli.container import create_container, delete_container
from frostfs_testlib.steps.cli.object import delete_object, get_object, get_object_nodes, put_object
@ -83,7 +82,7 @@ class TestControlShard(ClusterTestBase):
cli_config = node.host.get_cli_config("frostfs-cli")
cli = FrostfsCli(node.host.get_shell(), cli_config.exec_path, DEFAULT_WALLET_CONFIG)
cli = FrostfsCli(node.host.get_shell(), cli_config.exec_path)
result = cli.shards.list(
endpoint=control_endpoint,
wallet=wallet_path,