# frostfs-testcases/pytest_tests/testsuites/management/test_node_management.py
#
# (Repository-viewer header left over from extraction: 535 lines, 23 KiB, Python.)

import logging
import random
from time import sleep
from typing import Callable, Optional, Tuple
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import (
delete_object,
get_object,
get_object_from_random_node,
head_object,
put_object,
put_object_to_random_node,
search_object,
)
from frostfs_testlib.steps.node_management import (
check_node_in_map,
delete_node_data,
drop_object,
exclude_node_from_network_map,
get_locode_from_random_node,
include_node_to_network_map,
node_shard_list,
node_shard_set_mode,
storage_node_set_status,
wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import string_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file
from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes
# Module-wide logger; the name "NeoLogger" is a fixed string rather than __name__.
logger = logging.getLogger("NeoLogger")
# Shared recovery list: tests append a storage node here *before* excluding or
# stopping it, and TestNodeManagement.return_nodes() starts each listed node,
# sets it back online and removes it from the list.
check_nodes: list[StorageNode] = []
@pytest.mark.node_mgmt
@pytest.mark.failover
@pytest.mark.order(10)
class TestNodeManagement(ClusterTestBase):
    """Tests that exclude, stop, and re-add storage nodes and drop objects via control commands.

    Nodes disturbed by a test are recorded in the module-level ``check_nodes`` list so that
    ``return_nodes`` can bring them back even when the test fails midway.
    """

    @pytest.fixture
    @allure.title("Create container and pick the node with data")
    def create_container_and_pick_node(self, default_wallet: WalletInfo, simple_object_size: ObjectSize) -> Tuple[str, StorageNode]:
        """Create a single-replica container, store one object and yield ``(cid, node)``.

        ``node`` is the only storage node reported as holding the object. This is a
        generator fixture: after the test finishes, every shard of that node is switched
        back to ``read-write`` mode.
        """
        file_path = generate_file(simple_object_size.value)
        placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
        endpoint = self.cluster.default_rpc_endpoint

        cid = create_container(
            default_wallet,
            shell=self.shell,
            endpoint=endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)

        # The placement rule keeps exactly one replica, so exactly one node must hold the object.
        nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert len(nodes) == 1
        node = nodes[0]

        yield cid, node

        # Teardown: undo any shard mode change made by the test.
        shards = node_shard_list(node)
        assert shards

        for shard in shards:
            node_shard_set_mode(node, shard, "read-write")

        node_shard_list(node)

    @reporter.step("Tick epoch with retries")
    def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: Optional[int] = None):
        """Call ``self.tick_epoch`` until it succeeds, retrying on ``RuntimeError``.

        Args:
            attempts: maximum number of ``tick_epoch`` calls.
            timeout: seconds to sleep between a failed attempt and the next one.
            wait_block: forwarded to ``tick_epoch`` as ``wait_block``.

        Raises:
            RuntimeError: re-raised from the last attempt when all attempts fail.
        """
        for attempt in range(attempts):
            try:
                self.tick_epoch(wait_block=wait_block)
            except RuntimeError:
                # Out of attempts: fail immediately instead of sleeping before the re-raise.
                if attempt >= attempts - 1:
                    raise
                sleep(timeout)
                continue
            return

    @pytest.fixture
    def return_nodes_after_test_run(self):
        """Run the test, then bring back every node still listed in ``check_nodes``."""
        yield
        self.return_nodes()

    @reporter.step("Return node to cluster")
    def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
        """Start every node in ``check_nodes``, set it online and verify it is in the network map.

        Args:
            alive_node: passed through to ``check_node_in_map`` as ``alive_node``.
        """
        # Iterate over a copy because nodes are removed from check_nodes inside the loop.
        for node in list(check_nodes):
            with reporter.step(f"Start node {node}"):
                node.start_service()
            with reporter.step(f"Waiting status ready for node {node}"):
                wait_for_node_to_be_ready(node)

            # We need to wait for node to establish notifications from morph-chain
            # Otherwise it will hang up when we will try to set status
            self.wait_for_blocks()

            with reporter.step(f"Move node {node} to online state"):
                storage_node_set_status(node, status="online", retries=2)

            check_nodes.remove(node)
            self.wait_for_blocks()
            self.tick_epoch_with_retries(3, wait_block=2)
            check_node_in_map(node, shell=self.shell, alive_node=alive_node)

    @allure.title("Add one node to cluster")
    def test_add_nodes(
        self,
        default_wallet: WalletInfo,
        simple_object_size: ObjectSize,
        return_nodes_after_test_run,
    ):
        """
        This test removes one node from the cluster and then adds it back.
        Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
        """
        wallet = default_wallet
        placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
        placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)

        storage_nodes = self.cluster.storage_nodes
        # The first storage node is never picked as the node to exclude.
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice([storage_node for storage_node in storage_nodes if storage_node.id != random_node.id])

        check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)

        # Add node to recovery list before messing with it
        check_nodes.append(random_node)
        exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
        delete_node_data(random_node)

        cid = create_container(
            wallet,
            rule=placement_rule_3,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=alive_node.get_rpc_endpoint(),
        )
        oid = put_object(
            wallet,
            source_file_path,
            cid,
            shell=self.shell,
            endpoint=alive_node.get_rpc_endpoint(),
        )
        wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)

        self.return_nodes(alive_node)

        with reporter.step("Check data could be replicated to new node"):
            random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
            # Add node to recovery list before messing with it
            check_nodes.append(random_node)
            exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)

            # With this node excluded, three copies are expected among the remaining nodes.
            wait_object_replication(
                cid,
                oid,
                3,
                shell=self.shell,
                nodes=list(set(storage_nodes) - {random_node}),
            )
            include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
            wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)

        with reporter.step("Check container could be created with new node"):
            cid = create_container(
                wallet,
                rule=placement_rule_4,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
            oid = put_object(
                wallet,
                source_file_path,
                cid,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
            wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)

    @allure.title("Drop object using control command")
    def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
        """
        Test checks object could be dropped using `frostfs-cli control drop-objects` command.
        """
        wallet = default_wallet
        endpoint = self.cluster.default_rpc_endpoint
        file_path_simple = generate_file(simple_object_size.value)
        file_path_complex = generate_file(complex_object_size.value)

        # Single replica, filtered by the UN-LOCODE obtained from a random node.
        locode = get_locode_from_random_node(self.cluster)
        rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
        cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
        oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
        oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)

        # Both objects must be readable before anything is dropped.
        for oid in (oid_simple, oid_complex):
            get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
            head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)

        # The node is looked up for the simple object only and reused for the complex one.
        nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
        random_node = random.choice(nodes_with_object)

        for oid in (oid_simple, oid_complex):
            with reporter.step(f"Drop object {oid}"):
                get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
                head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                drop_object(random_node, cid, oid)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)

    @pytest.mark.skip(reason="Need to clarify scenario")
    @allure.title("Control Operations with storage nodes")
    def test_shards(
        self,
        default_wallet,
        create_container_and_pick_node,
        simple_object_size: ObjectSize,
    ):
        """Switch the shards of the node holding the data to a restricted mode and check object operations.

        In the restricted mode put and delete are expected to raise ``RuntimeError`` while get
        still works; after switching back to ``read-write`` put and delete must succeed.
        Currently skipped (see the ``skip`` marker).
        """
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)

        cid, node = create_container_and_pick_node
        original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

        # NOTE: 'read-only' used to be iterated here as well; only 'degraded' is exercised now.
        for mode in ("degraded",):
            shards = node_shard_list(node)
            assert shards

            for shard in shards:
                node_shard_set_mode(node, shard, mode)

            shards = node_shard_list(node)
            assert shards

            with pytest.raises(RuntimeError):
                put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

            with pytest.raises(RuntimeError):
                delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)

            get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)

            for shard in shards:
                node_shard_set_mode(node, shard, "read-write")

            shards = node_shard_list(node)
            assert shards

            oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
            delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

    @allure.title("Put object with stopped node")
    def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
        """Stop one storage node and check that an object can still be put via another node."""
        wallet = default_wallet
        placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)
        storage_nodes = self.cluster.storage_nodes
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice([storage_node for storage_node in storage_nodes if storage_node.id != random_node.id])

        # The container is created through the node that is about to be stopped.
        cid = create_container(
            wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=random_node.get_rpc_endpoint(),
        )
        with reporter.step("Stop the random node"):
            # Register for recovery first so the node is restarted even if the test fails.
            check_nodes.append(random_node)
            random_node.stop_service()

        with reporter.step("Try to put an object and expect success"):
            put_object(
                wallet,
                source_file_path,
                cid,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )

        self.return_nodes(alive_node)

    @reporter.step("Wait for object to be dropped")
    def wait_for_obj_dropped(self, wallet: WalletInfo, cid: str, oid: str, endpoint: str, checker: Callable) -> None:
        """Poll ``checker`` until it fails with an "object not found" error.

        ``checker`` is called as ``checker(wallet, cid, oid, shell=..., endpoint=...)`` up to
        three times, waiting for a GC pass on the storage nodes after each successful call.

        Raises:
            AssertionError: if ``checker`` raises an error that does not match
                ``OBJECT_NOT_FOUND``, or if it still succeeds after all attempts.
        """
        for _ in range(3):
            try:
                checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                wait_for_gc_pass_on_storage_nodes()
            except Exception as err:
                if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
                    return
                # Keep the original exception attached as the cause for easier debugging.
                raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}') from err

        raise AssertionError(f"Object {oid} was not dropped from node")
@pytest.mark.maintenance
@pytest.mark.failover
@pytest.mark.order(9)
class TestMaintenanceMode(ClusterTestBase):
    """Tests for the MAINTENANCE node status: object operations, status transitions and the global switch."""

    @pytest.fixture()
    @allure.title("Init Frostfs CLI remote")
    def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
        """Return a ``FrostfsCli`` that runs in the shell of the node's host with the storage node wallet.

        A wallet config file is written to ``/tmp`` on that host from the ``wallet_path`` and
        ``wallet_password`` attributes of the storage service config.
        """
        host = node_under_test.host
        service_config = host.get_service_config(node_under_test.storage_node.name)
        wallet_path = service_config.attributes["wallet_path"]
        wallet_password = service_config.attributes["wallet_password"]

        shell = host.get_shell()
        wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
        wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
        # NOTE(review): the config is passed through single-quoted `echo`; a wallet path or
        # password containing a single quote would presumably break this command - confirm.
        shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")

        cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
        return cli

    @pytest.fixture()
    def restore_node_status(self, cluster_state_controller: ClusterStateController, default_wallet: WalletInfo):
        """Yield a list that tests fill with nodes; on teardown each listed node is set to ONLINE."""
        nodes_to_restore = []

        yield nodes_to_restore

        for node_to_restore in nodes_to_restore:
            cluster_state_controller.set_node_status(node_to_restore, default_wallet, NodeStatus.ONLINE)

    def check_node_status(
        self, expected_status: NodeStatus, node_under_test: ClusterNode, frostfs_cli: FrostfsCli, rpc_endpoint: str
    ) -> None:
        """Assert that the netmap snapshot taken via ``rpc_endpoint`` shows the node in ``expected_status``.

        The node is located in the snapshot by its host IP. When OFFLINE is expected and the
        node has no entry in the parsed snapshot, only the absence of its IP from the raw
        netmap output is asserted.
        """
        netmap = frostfs_cli.netmap.snapshot(rpc_endpoint).stdout
        all_snapshots = NetmapParser.snapshot_all_nodes(netmap)
        node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.host_ip == snapshot.node]
        if expected_status == NodeStatus.OFFLINE and not node_snapshot:
            assert node_under_test.host_ip not in netmap, f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
            return

        assert node_snapshot, f"{node_under_test} status should be {expected_status}, but was not in netmap. See netmap:\n{netmap}"
        node_snapshot = node_snapshot[0]
        assert (
            expected_status == node_snapshot.node_status
        ), f"{node_under_test} status should be {expected_status}, but was {node_snapshot.node_status}. See netmap:\n{netmap}"

    @allure.title("Test of basic node operations in maintenance mode")
    def test_maintenance_mode(
        self,
        default_wallet: WalletInfo,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        restore_node_status: list[ClusterNode],
    ):
        """Put an object on a node, move the node to maintenance and check object operations.

        Operations sent to the node under maintenance must fail with the maintenance error.
        Through another node, get and delete must fail with "object not found", search must
        succeed, and put must fail with the maintenance error.
        """
        # The backslash is escaped so the title keeps its literal "\" without an invalid-escape warning.
        with reporter.step("Create container and create\\put object"):
            cid = create_container(
                wallet=default_wallet,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                rule="REP 1 CBF 1",
            )
            nodes_with_container = search_nodes_with_container(
                wallet=default_wallet,
                cid=cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=self.cluster,
            )
            node_under_test = nodes_with_container[0]
            endpoint = node_under_test.storage_node.get_rpc_endpoint()

            file_path = generate_file(simple_object_size.value)
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                endpoint=endpoint,
            )

        with reporter.step("Set node status to 'maintenance'"):
            # Register for restoration first so teardown sets the node ONLINE even on failure.
            restore_node_status.append(node_under_test)
            cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)

        node_under_maintenance_error = "node is under maintenance"
        with reporter.step("Run basic operations with node in maintenance"):
            with pytest.raises(RuntimeError, match=node_under_maintenance_error):
                get_object(default_wallet, cid, oid, self.shell, endpoint)

            with pytest.raises(RuntimeError, match=node_under_maintenance_error):
                search_object(default_wallet, cid, self.shell, endpoint)

            with pytest.raises(RuntimeError, match=node_under_maintenance_error):
                delete_object(default_wallet, cid, oid, self.shell, endpoint)

            with pytest.raises(RuntimeError, match=node_under_maintenance_error):
                put_object(default_wallet, file_path, cid, self.shell, endpoint)

        with reporter.step("Run basic operations with node not in maintenance"):
            other_nodes = list(set(self.cluster.cluster_nodes) - set(nodes_with_container))
            endpoint = other_nodes[0].storage_node.get_rpc_endpoint()

            with pytest.raises(RuntimeError, match=OBJECT_NOT_FOUND):
                get_object(default_wallet, cid, oid, self.shell, endpoint)

            search_object(default_wallet, cid, self.shell, endpoint)

            with pytest.raises(RuntimeError, match=OBJECT_NOT_FOUND):
                delete_object(default_wallet, cid, oid, self.shell, endpoint)

            with pytest.raises(RuntimeError, match=node_under_maintenance_error):
                put_object(default_wallet, file_path, cid, self.shell, endpoint)

    @pytest.mark.sanity
    @allure.title("MAINTENANCE and OFFLINE mode transitions")
    def test_mode_transitions(
        self,
        cluster_state_controller: ClusterStateController,
        node_under_test: ClusterNode,
        default_wallet: WalletInfo,
        frostfs_cli: FrostfsCli,
        restore_node_status: list[ClusterNode],
    ):
        """Check node status in the netmap after OFFLINE/MAINTENANCE changes combined with service restarts.

        Expected outcomes asserted below: a node set OFFLINE is ONLINE again after its storage
        service is (re)started and epochs are ticked; a node set to MAINTENANCE is still in
        MAINTENANCE after a restart or a stop/start cycle.
        """
        restore_node_status.append(node_under_test)

        # All netmap snapshots and epoch ticks go through a node other than the one under test.
        alive_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
        alive_storage_node = alive_nodes[0].storage_node
        alive_rpc_endpoint = alive_storage_node.get_rpc_endpoint()

        with reporter.step("Set node status to 'offline'"):
            cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.OFFLINE)

        with reporter.step("Check node status is 'offline' after update the network map"):
            self.check_node_status(NodeStatus.OFFLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)

        with reporter.step("Restart storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Tick 2 epochs"):
            self.tick_epochs(2, alive_storage_node, 2)

        with reporter.step("Check node status is 'online' after storage service restart"):
            self.check_node_status(NodeStatus.ONLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)

        with reporter.step("Set node status to 'maintenance'"):
            cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)

        with reporter.step("Restart storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Tick 2 epochs"):
            self.tick_epochs(2, alive_storage_node, 2)

        with reporter.step("Check node staus is 'maintenance' after storage service restart"):
            self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)

        with reporter.step("Set node status to 'offline'"):
            cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.OFFLINE)

        with reporter.step("Stop storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)

        with reporter.step("Tick 2 epochs"):
            self.tick_epochs(2, alive_storage_node, 2)

        with reporter.step("Start storage service"):
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Tick 2 epochs"):
            self.tick_epochs(2, alive_storage_node, 2)

        with reporter.step("Check node status is 'online' after storage service start"):
            self.check_node_status(NodeStatus.ONLINE, node_under_test, frostfs_cli, alive_rpc_endpoint)

        with reporter.step("Set node status to 'maintenance'"):
            cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)

        with reporter.step("Stop storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)

        with reporter.step("Tick 2 epochs"):
            self.tick_epochs(2, alive_storage_node, 2)

        with reporter.step("Start storage service"):
            cluster_state_controller.start_storage_service(node_under_test)

        # Status is checked both right after the start and again after two more epochs.
        with reporter.step("Check node status is 'maintenance'"):
            self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)

        with reporter.step("Tick 2 epochs"):
            self.tick_epochs(2, alive_storage_node, 2)

        with reporter.step("Check node status is 'maintenance'"):
            self.check_node_status(NodeStatus.MAINTENANCE, node_under_test, frostfs_cli, alive_rpc_endpoint)

    @allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
    def test_maintenance_globally_forbidden(
        self,
        cluster_state_controller: ClusterStateController,
        node_under_test: ClusterNode,
        frostfs_cli_remote: FrostfsCli,
        default_wallet: WalletInfo,
        restore_node_status: list[ClusterNode],
    ):
        """Check that setting 'maintenance' fails while MaintenanceModeAllowed is false and works once it is true."""
        restore_node_status.append(node_under_test)
        control_endpoint = node_under_test.service(StorageNode).get_control_endpoint()

        with reporter.step("Set MaintenanceModeAllowed = false"):
            cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test)

        with reporter.step("Set node status to 'maintenance'"):
            with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"):
                frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")

        with reporter.step("Set MaintenanceModeAllowed = true"):
            cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test)

        with reporter.step("Set node status to 'maintenance'"):
            cluster_state_controller.set_node_status(node_under_test, default_wallet, NodeStatus.MAINTENANCE)