Add new test #154

Merged
d.zayakin merged 1 commit from d.zayakin/frostfs-testcases:new-restart-nodes-test into master 2023-11-24 10:54:13 +00:00
6 changed files with 833 additions and 677 deletions

View file

@ -53,7 +53,8 @@ def pytest_collection_modifyitems(items: list[pytest.Item]):
def priority(item: pytest.Item) -> int: def priority(item: pytest.Item) -> int:
is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0 is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0 is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
return is_node_mgmt_test + is_logs_check_test is_system_time_test = 10 if item.get_closest_marker("system_time") else 0
return is_node_mgmt_test + is_logs_check_test + is_system_time_test
items.sort(key=lambda item: priority(item)) items.sort(key=lambda item: priority(item))

View file

@ -1,3 +1,4 @@
import os
import random import random
from datetime import datetime from datetime import datetime
@ -5,6 +6,8 @@ import allure
import pytest import pytest
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers import ShardsWatcher from frostfs_testlib.storage.controllers import ShardsWatcher
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.utils.file_utils import generate_file
@pytest.fixture() @pytest.fixture()
@ -27,3 +30,11 @@ def shards_watcher(node_under_test: ClusterNode) -> ShardsWatcher:
def test_start_time() -> datetime: def test_start_time() -> datetime:
start_time = datetime.utcnow() start_time = datetime.utcnow()
return start_time return start_time
@pytest.fixture()
@allure.title("Generate simple size file")
def simple_file(simple_object_size: ObjectSize) -> str:
path_file = generate_file(size=simple_object_size.value)
yield path_file
os.remove(path_file)

View file

@ -1,12 +1,14 @@
import logging import logging
import os.path import os.path
import random import random
import time
import allure import allure
import pytest import pytest
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
from frostfs_testlib.steps.cli.object import get_object from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
from frostfs_testlib.steps.node_management import check_node_in_map, check_node_not_in_map from frostfs_testlib.steps.node_management import check_node_in_map, check_node_not_in_map
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController from frostfs_testlib.storage.controllers import ClusterStateController
@ -15,6 +17,7 @@ from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjec
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import get_file_hash from frostfs_testlib.utils.file_utils import get_file_hash
from pytest import FixtureRequest from pytest import FixtureRequest
@ -61,6 +64,22 @@ class TestFailoverServer(ClusterTestBase):
return containers return containers
@allure.step("Creation container")
@pytest.fixture()
def container(self, default_wallet: str) -> StorageContainer:
select = len(self.cluster.cluster_nodes)
placement_rule = f"REP {select - 1} CBF 1 SELECT {select} FROM *"
cont_id = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
wallet = WalletInfo(path=default_wallet)
storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
return StorageContainer(storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster)
@allure.step("Create object and delete after test") @allure.step("Create object and delete after test")
@pytest.fixture(scope="class") @pytest.fixture(scope="class")
def storage_objects( def storage_objects(
@ -96,20 +115,25 @@ class TestFailoverServer(ClusterTestBase):
self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo] self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
) -> list[StorageObjectInfo]: ) -> list[StorageObjectInfo]:
corrupted_objects = [] corrupted_objects = []
errors_get = []
for node in nodes: for node in nodes:
for storage_object in storage_objects: for storage_object in storage_objects:
got_file_path = get_object( try:
storage_object.wallet_file_path, got_file_path = get_object(
storage_object.cid, storage_object.wallet_file_path,
storage_object.oid, storage_object.cid,
endpoint=node.get_rpc_endpoint(), storage_object.oid,
shell=self.shell, endpoint=node.get_rpc_endpoint(),
timeout="60s", shell=self.shell,
) timeout="60s",
if storage_object.file_hash != get_file_hash(got_file_path): )
corrupted_objects.append(storage_object) if storage_object.file_hash != get_file_hash(got_file_path):
os.remove(got_file_path) corrupted_objects.append(storage_object)
os.remove(got_file_path)
except RuntimeError:
errors_get.append(storage_object.oid)
assert len(errors_get) == 0, f"Get failed - {errors_get}"
return corrupted_objects return corrupted_objects
def check_objects_replication( def check_objects_replication(
@ -126,6 +150,26 @@ class TestFailoverServer(ClusterTestBase):
attempts=60, attempts=60,
) )
@pytest.fixture()
def object_and_nodes(
self, simple_object_size: ObjectSize, container: StorageContainer, default_wallet: str
) -> tuple[StorageObjectInfo, list[ClusterNode]]:
object_info = container.generate_object(simple_object_size.value)
object_nodes = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=object_info.cid,
oid=object_info.oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
return object_info, object_nodes
@pytest.fixture()
def up_stop_nodes(self, cluster_state_controller: ClusterStateController):
yield
cluster_state_controller.start_stopped_hosts()
@allure.title("Full shutdown node") @allure.title("Full shutdown node")
@pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True) @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
def test_complete_node_shutdown( def test_complete_node_shutdown(
@ -157,7 +201,7 @@ class TestFailoverServer(ClusterTestBase):
count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4 count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4
with allure.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"): with allure.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
self.tick_epochs(count_tick_epoch, wait_block=2) self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)
with allure.step(f"Check {node_to_stop} in not map"): with allure.step(f"Check {node_to_stop} in not map"):
self.wait_node_not_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]) self.wait_node_not_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])
@ -195,3 +239,111 @@ class TestFailoverServer(ClusterTestBase):
with allure.step("Verify that there are no corrupted objects"): with allure.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list assert not corrupted_objects_list
@allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
def test_not_enough_nodes_in_container_rep_3(
self,
container: list[StorageContainer],
object_and_nodes: tuple[StorageObjectInfo, list[ClusterNode]],
default_wallet: str,
cluster_state_controller: ClusterStateController,
simple_file: str,
up_stop_nodes: None,
):
object_info, object_nodes = object_and_nodes
node_not_object = list(set(self.cluster.cluster_nodes) - set(object_nodes))[0]
with allure.step("Stop all nodes with object, except 1"):
for cluster_node in object_nodes[1:]:
cluster_state_controller.stop_node_host(node=cluster_node, mode="hard")
with allure.step("Get object"):
with allure.step(f"Get operation to {node_not_object} where it does not exist, expect success"):
get_object(
wallet=default_wallet,
cid=object_info.cid,
oid=object_info.oid,
shell=self.shell,
endpoint=node_not_object.storage_node.get_rpc_endpoint(),
)
with allure.step(f"Get operation to {object_nodes[0]} with object, expect success"):
get_object(
wallet=default_wallet,
cid=object_info.cid,
oid=object_info.oid,
shell=self.shell,
endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
)
with allure.step(f"Put operation to node with object, expect error"):
with pytest.raises(RuntimeError):
put_object(
wallet=default_wallet,
path=simple_file,
cid=object_info.cid,
shell=self.shell,
endpoint=node_not_object.storage_node.get_rpc_endpoint(),
)
@allure.title("Not enough nodes in the container with policy - 'REP 2 CBF 2 SELECT 4 FROM *'")
def test_not_enough_nodes_in_container_rep_2(
self,
default_wallet: str,
cluster_state_controller: ClusterStateController,
simple_file: str,
up_stop_nodes: None,
):
with allure.step("Creating a container with a full network map"):
select = len(self.cluster.cluster_nodes)
placement_rule = f"REP {select - 2} IN X CBF 2 SELECT {select} FROM * AS X"
cid_1 = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
with allure.step("Put object"):
oid = put_object(
wallet=default_wallet,
path=simple_file,
cid=cid_1,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step("Search nodes with object"):
object_nodes = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=cid_1,
oid=oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step("Turn off random node with object"):
cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")
with allure.step("Checking PUT operation"):
oid_2 = put_object(
wallet=default_wallet,
path=simple_file,
cid=cid_1,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step("Checking GET operation"):
get_file = get_object(
wallet=default_wallet,
cid=cid_1,
oid=oid_2,
shell=self.shell,
endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
)
os.remove(get_file)
with allure.step("Checking creating container"):
_ = create_container(
default_wallet,
shell=self.shell,
endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)

View file

@ -1,30 +1,15 @@
import logging import logging
import os
import random import random
from datetime import datetime from datetime import datetime
from time import sleep from time import sleep
import allure import allure
import pytest import pytest
from frostfs_testlib.cli import FrostfsCli from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.cli.container import ( from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
StorageContainer, from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
StorageContainerInfo,
create_container,
search_nodes_with_container,
)
from frostfs_testlib.steps.cli.object import (
delete_object,
get_object,
put_object,
put_object_to_random_node,
search_object,
)
from frostfs_testlib.steps.node_management import ( from frostfs_testlib.steps.node_management import (
check_node_in_map, check_node_in_map,
check_node_not_in_map, check_node_not_in_map,
@ -37,7 +22,7 @@ from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode, StorageObjectInfo from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import expect_not_raises from frostfs_testlib.testing.test_control import expect_not_raises
@ -727,325 +712,3 @@ class TestStorageDataLoss(ClusterTestBase):
with allure.step("Delete bucket"): with allure.step("Delete bucket"):
s3_client.delete_bucket(bucket) s3_client.delete_bucket(bucket)
@pytest.mark.maintenance
class TestMaintenanceMode(ClusterTestBase):
change_node: ClusterNode = None
@pytest.fixture()
@allure.title("Init Frostfs CLI remote")
def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
host = node_under_test.host
service_config = host.get_service_config(node_under_test.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
return cli
@pytest.fixture()
@allure.title("Init Frostfs CLI remote")
def frostfs_cli(self) -> FrostfsCli:
cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
return cli
@pytest.fixture()
def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: str):
yield
cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online")
self.tick_epoch(wait_block=2)
def basic_operations(self, wallet, cid, oid, shell, endpoint, matchs, object_size):
file_path = generate_file(object_size)
default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint}
operations = {
get_object: {"oid": oid},
search_object: {},
delete_object: {"oid": oid},
put_object: {"path": file_path},
}
for index, operation, kw in enumerate(operations.items()):
with allure.step(f"Run {operation.__name__} object, waiting response - {matchs[index]}"):
default_kw.update(kw)
if operation == search_object and "Found" in matchs[index]:
operation(**default_kw)
continue
with pytest.raises(RuntimeError, match=matchs[index]):
operation(**default_kw)
os.remove(file_path)
@allure.title("Test of basic node operations in maintenance mode")
def test_maintenance_mode(
self,
default_wallet: str,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
restore_online_mode_node: None,
):
with allure.step("Create container and create\put object"):
cid = create_container(
wallet=default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule="REP 1 CBF 1",
)
node = search_nodes_with_container(
wallet=default_wallet,
cid=cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster,
)
self.change_node = node[0]
file_path = generate_file(simple_object_size.value)
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=self.change_node.storage_node.get_rpc_endpoint(),
)
with allure.step("Enable MaintenanceModeAllowed:"):
cluster_state_controller.set_mode_node(
cluster_node=self.change_node, wallet=default_wallet, status="maintenance"
)
other_nodes = list(set(self.cluster.cluster_nodes) - set(node))
node_and_match = {
self.change_node: ["node is under maintenance"] * 4,
other_nodes[0]: [
"object not found",
"Found 0 objects",
"object not found",
"node is under maintenance",
],
}
with allure.step("Run basic operations"):
for cluster_node, matchs in node_and_match.items():
self.basic_operations(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=cluster_node.storage_node.get_rpc_endpoint(),
matchs=matchs,
object_size=simple_object_size.value,
)
@pytest.mark.sanity
@allure.title("MAINTENANCE and OFFLINE mode transitions")
def test_mode_transitions(
self,
cluster_state_controller: ClusterStateController,
node_under_test: ClusterNode,
default_wallet: str,
frostfs_cli: FrostfsCli,
restore_online_mode_node: None,
):
self.change_node = node_under_test
cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to offline"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test,
wallet=default_wallet,
status=ModeNode.OFFLINE.value,
)
with allure.step("Tick epoch to update the network map"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with ((allure.step("Check node mode = offline, after update the network map"))):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
netmap = NetmapParser.snapshot_all_nodes(netmap)
assert node_under_test.host_ip not in [
netmap_node.node for netmap_node in netmap
], f"Node {node_under_test.host_ip} not in state offline"
with allure.step("Restart storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Tick epoch after restart storage service and set mode to offline"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with (allure.step("Check node mode = online, after restart storage service")):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.ONLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to maintenance"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
)
with allure.step("Restart storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Tick epoch after restart storage service"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Check node mode = maintenance, after restart storage service and tick epoch"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.MAINTENANCE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to offline"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.OFFLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}"
)
with allure.step("Stop storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Start storage service"):
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Check node mode = offline, after tick epoch and start storage service"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.OFFLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Check node mode = online, after start storage service and tick epoch"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.ONLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to maintenance"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
)
with allure.step("Stop storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Start storage service"):
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Check node mode = maintenance"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert (
node_state == ModeNode.MAINTENANCE.value.upper()
), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}"
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Check node mode = maintenance"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.MAINTENANCE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}"
)
@allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
def test_maintenance_globally_forbidden(
self,
cluster_state_controller: ClusterStateController,
node_under_test: ClusterNode,
frostfs_cli_remote: FrostfsCli,
frostfs_cli: FrostfsCli,
default_wallet: str,
restore_online_mode_node: None,
):
self.change_node = node_under_test
control_endpoint = node_under_test.service(StorageNode).get_control_endpoint()
with allure.step("Set MaintenanceModeAllowed = false"):
cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test)
with allure.step("Set status node - maintenance"):
with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
with allure.step("Set MaintenanceModeAllowed = true"):
cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test)
with allure.step("Set status node - maintenance"):
output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
assert "update request successfully sent" in output.stdout, f"Response = {output}"
with allure.step("Tick epoch"):
self.tick_epoch(wait_block=2)
with allure.step("Check state node = maintenance "):
netmap_node = NetmapParser.snapshot_one_node(
frostfs_cli.netmap.snapshot(
rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout,
node_under_test,
)
assert (
netmap_node.node_status == ModeNode.MAINTENANCE.value.upper()
), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}"
with allure.step("Set status node - online "):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online")
with allure.step("Tick epoch"):
self.tick_epoch()
with allure.step("Check state node: online"):
netmap_node = NetmapParser.snapshot_one_node(
frostfs_cli.netmap.snapshot(
rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout,
node_under_test,
)
assert (
netmap_node.node_status == ModeNode.ONLINE.value.upper()
), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}"

View file

@ -0,0 +1,651 @@
import logging
import os
import random
from time import sleep
from typing import Optional, Tuple
import allure
import pytest
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import (
delete_object,
get_object,
get_object_from_random_node,
head_object,
put_object,
put_object_to_random_node,
search_object,
)
from frostfs_testlib.steps.node_management import (
check_node_in_map,
delete_node_data,
drop_object,
exclude_node_from_network_map,
get_locode_from_random_node,
include_node_to_network_map,
node_shard_list,
node_shard_set_mode,
storage_node_set_status,
wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils, string_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file
from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes
logger = logging.getLogger("NeoLogger")
check_nodes: list[StorageNode] = []
@allure.title("Add one node to cluster")
@pytest.mark.add_nodes
@pytest.mark.node_mgmt
class TestNodeManagement(ClusterTestBase):
@pytest.fixture
@allure.title("Create container and pick the node with data")
def create_container_and_pick_node(
self, default_wallet: str, simple_object_size: ObjectSize
) -> Tuple[str, StorageNode]:
file_path = generate_file(simple_object_size.value)
placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
endpoint = self.cluster.default_rpc_endpoint
cid = create_container(
default_wallet,
shell=self.shell,
endpoint=endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
assert len(nodes) == 1
node = nodes[0]
yield cid, node
shards = node_shard_list(node)
assert shards
for shard in shards:
node_shard_set_mode(node, shard, "read-write")
node_shard_list(node)
@allure.step("Tick epoch with retries")
def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: int = None):
for attempt in range(attempts):
try:
self.tick_epoch(wait_block=wait_block)
except RuntimeError:
sleep(timeout)
if attempt >= attempts - 1:
raise
continue
return
@pytest.fixture
def after_run_start_all_nodes(self):
yield
self.return_nodes()
@pytest.fixture
def return_nodes_after_test_run(self):
yield
self.return_nodes()
@allure.step("Return node to cluster")
def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
for node in list(check_nodes):
with allure.step(f"Start node {node}"):
node.start_service()
with allure.step(f"Waiting status ready for node {node}"):
wait_for_node_to_be_ready(node)
# We need to wait for node to establish notifications from morph-chain
# Otherwise it will hang up when we will try to set status
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
with allure.step(f"Move node {node} to online state"):
storage_node_set_status(node, status="online", retries=2)
check_nodes.remove(node)
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
self.tick_epoch_with_retries(3, wait_block=2)
check_node_in_map(node, shell=self.shell, alive_node=alive_node)
@allure.title("Add one node to cluster")
@pytest.mark.add_nodes
def test_add_nodes(
self,
default_wallet: str,
simple_object_size: ObjectSize,
return_nodes_after_test_run,
):
"""
This test remove one node from frostfs_testlib.storage.cluster then add it back. Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
"""
wallet = default_wallet
placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
source_file_path = generate_file(simple_object_size.value)
storage_nodes = self.cluster.storage_nodes
random_node = random.choice(storage_nodes[1:])
alive_node = random.choice(
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
)
check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)
# Add node to recovery list before messing with it
check_nodes.append(random_node)
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
delete_node_data(random_node)
cid = create_container(
wallet,
rule=placement_rule_3,
basic_acl=PUBLIC_ACL,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
oid = put_object(
wallet,
source_file_path,
cid,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
self.return_nodes(alive_node)
with allure.step("Check data could be replicated to new node"):
random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
# Add node to recovery list before messing with it
check_nodes.append(random_node)
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
wait_object_replication(
cid,
oid,
3,
shell=self.shell,
nodes=list(set(storage_nodes) - {random_node}),
)
include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
with allure.step("Check container could be created with new node"):
cid = create_container(
wallet,
rule=placement_rule_4,
basic_acl=PUBLIC_ACL,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
oid = put_object(
wallet,
source_file_path,
cid,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)
@pytest.mark.node_mgmt
@allure.title("Drop object using control command")
def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
"""
Test checks object could be dropped using `frostfs-cli control drop-objects` command.
"""
wallet = default_wallet
endpoint = self.cluster.default_rpc_endpoint
file_path_simple = generate_file(simple_object_size.value)
file_path_complex = generate_file(complex_object_size.value)
locode = get_locode_from_random_node(self.cluster)
rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)
for oid in (oid_simple, oid_complex):
get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
random_node = random.choice(nodes_with_object)
for oid in (oid_simple, oid_complex):
with allure.step(f"Drop object {oid}"):
get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
drop_object(random_node, cid, oid)
self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)
@pytest.mark.node_mgmt
@pytest.mark.skip(reason="Need to clarify scenario")
@allure.title("Control Operations with storage nodes")
def test_shards(
self,
default_wallet,
create_container_and_pick_node,
simple_object_size: ObjectSize,
):
wallet = default_wallet
file_path = generate_file(simple_object_size.value)
cid, node = create_container_and_pick_node
original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
# for mode in ('read-only', 'degraded'):
for mode in ("degraded",):
shards = node_shard_list(node)
assert shards
for shard in shards:
node_shard_set_mode(node, shard, mode)
shards = node_shard_list(node)
assert shards
with pytest.raises(RuntimeError):
put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
with pytest.raises(RuntimeError):
delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)
get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)
for shard in shards:
node_shard_set_mode(node, shard, "read-write")
shards = node_shard_list(node)
assert shards
oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
@pytest.mark.node_mgmt
@allure.title("Put object with stopped node")
def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
wallet = default_wallet
placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
source_file_path = generate_file(simple_object_size.value)
storage_nodes = self.cluster.storage_nodes
random_node = random.choice(storage_nodes[1:])
alive_node = random.choice(
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
)
cid = create_container(
wallet,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
shell=self.shell,
endpoint=random_node.get_rpc_endpoint(),
)
with allure.step("Stop the random node"):
check_nodes.append(random_node)
random_node.stop_service()
with allure.step("Try to put an object and expect success"):
put_object(
wallet,
source_file_path,
cid,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
self.return_nodes(alive_node)
@allure.step("Wait for object to be dropped")
def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
for _ in range(3):
try:
checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
wait_for_gc_pass_on_storage_nodes()
except Exception as err:
if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
return
raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}')
raise AssertionError(f"Object {oid} was not dropped from node")
@pytest.mark.maintenance
class TestMaintenanceMode(ClusterTestBase):
change_node: ClusterNode = None
@pytest.fixture()
@allure.title("Init Frostfs CLI remote")
def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
host = node_under_test.host
service_config = host.get_service_config(node_under_test.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
return cli
@pytest.fixture()
@allure.title("Init Frostfs CLI remote")
def frostfs_cli(self) -> FrostfsCli:
cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
return cli
@pytest.fixture()
def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: str):
yield
cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online")
self.tick_epoch(wait_block=2)
def basic_operations(self, wallet, cid, oid, shell, endpoint, matchs, object_size):
file_path = generate_file(object_size)
default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint}
operations = {
get_object: {"oid": oid},
search_object: {},
delete_object: {"oid": oid},
put_object: {"path": file_path},
}
for index, operation, kw in enumerate(operations.items()):
with allure.step(f"Run {operation.__name__} object, waiting response - {matchs[index]}"):
default_kw.update(kw)
if operation == search_object and "Found" in matchs[index]:
operation(**default_kw)
continue
with pytest.raises(RuntimeError, match=matchs[index]):
operation(**default_kw)
os.remove(file_path)
@allure.title("Test of basic node operations in maintenance mode")
def test_maintenance_mode(
self,
default_wallet: str,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
restore_online_mode_node: None,
):
with allure.step("Create container and create\put object"):
cid = create_container(
wallet=default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule="REP 1 CBF 1",
)
node = search_nodes_with_container(
wallet=default_wallet,
cid=cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster,
)
self.change_node = node[0]
file_path = generate_file(simple_object_size.value)
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=self.change_node.storage_node.get_rpc_endpoint(),
)
with allure.step("Enable MaintenanceModeAllowed:"):
cluster_state_controller.set_mode_node(
cluster_node=self.change_node, wallet=default_wallet, status="maintenance"
)
other_nodes = list(set(self.cluster.cluster_nodes) - set(node))
node_and_match = {
self.change_node: ["node is under maintenance"] * 4,
other_nodes[0]: [
"object not found",
"Found 0 objects",
"object not found",
"node is under maintenance",
],
}
with allure.step("Run basic operations"):
for cluster_node, matchs in node_and_match.items():
self.basic_operations(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=cluster_node.storage_node.get_rpc_endpoint(),
matchs=matchs,
object_size=simple_object_size.value,
)
@pytest.mark.sanity
@allure.title("MAINTENANCE and OFFLINE mode transitions")
def test_mode_transitions(
self,
cluster_state_controller: ClusterStateController,
node_under_test: ClusterNode,
default_wallet: str,
frostfs_cli: FrostfsCli,
restore_online_mode_node: None,
):
self.change_node = node_under_test
cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to offline"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test,
wallet=default_wallet,
status=ModeNode.OFFLINE.value,
)
with allure.step("Tick epoch to update the network map"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with ((allure.step("Check node mode = offline, after update the network map"))):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
netmap = NetmapParser.snapshot_all_nodes(netmap)
assert node_under_test.host_ip not in [
netmap_node.node for netmap_node in netmap
], f"Node {node_under_test.host_ip} not in state offline"
with allure.step("Restart storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Tick epoch after restart storage service and set mode to offline"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with (allure.step("Check node mode = online, after restart storage service")):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.ONLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to maintenance"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
)
with allure.step("Restart storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Tick epoch after restart storage service"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Check node mode = maintenance, after restart storage service and tick epoch"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.MAINTENANCE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to offline"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.OFFLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}"
)
with allure.step("Stop storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Start storage service"):
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Check node mode = offline, after tick epoch and start storage service"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.OFFLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Check node mode = online, after start storage service and tick epoch"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.ONLINE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}"
)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Set node mode to maintenance"):
cluster_state_controller.set_mode_node(
cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
)
with allure.step("Stop storage service"):
cluster_state_controller.stop_storage_service(node_under_test)
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Start storage service"):
cluster_state_controller.start_storage_service(node_under_test)
with allure.step("Check node mode = maintenance"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert (
node_state == ModeNode.MAINTENANCE.value.upper()
), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}"
with allure.step("Tick epoch"):
self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
with allure.step("Check node mode = maintenance"):
netmap = frostfs_cli.netmap.snapshot(
rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout
node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
assert node_state == ModeNode.MAINTENANCE.value.upper(), (
f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}"
)
@allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
def test_maintenance_globally_forbidden(
self,
cluster_state_controller: ClusterStateController,
node_under_test: ClusterNode,
frostfs_cli_remote: FrostfsCli,
frostfs_cli: FrostfsCli,
default_wallet: str,
restore_online_mode_node: None,
):
self.change_node = node_under_test
control_endpoint = node_under_test.service(StorageNode).get_control_endpoint()
with allure.step("Set MaintenanceModeAllowed = false"):
cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test)
with allure.step("Set status node - maintenance"):
with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
with allure.step("Set MaintenanceModeAllowed = true"):
cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test)
with allure.step("Set status node - maintenance"):
output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
assert "update request successfully sent" in output.stdout, f"Response = {output}"
with allure.step("Tick epoch"):
self.tick_epoch(wait_block=2)
with allure.step("Check state node = maintenance "):
netmap_node = NetmapParser.snapshot_one_node(
frostfs_cli.netmap.snapshot(
rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout,
node_under_test,
)
assert (
netmap_node.node_status == ModeNode.MAINTENANCE.value.upper()
), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}"
with allure.step("Set status node - online "):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online")
with allure.step("Tick epoch"):
self.tick_epoch()
with allure.step("Check state node: online"):
netmap_node = NetmapParser.snapshot_one_node(
frostfs_cli.netmap.snapshot(
rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
).stdout,
node_under_test,
)
assert (
netmap_node.node_status == ModeNode.ONLINE.value.upper()
), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}"

View file

@ -1,322 +0,0 @@
import logging
import random
from time import sleep
from typing import Optional, Tuple
import allure
import pytest
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import (
delete_object,
get_object,
get_object_from_random_node,
head_object,
put_object,
put_object_to_random_node,
)
from frostfs_testlib.steps.node_management import (
check_node_in_map,
delete_node_data,
drop_object,
exclude_node_from_network_map,
get_locode_from_random_node,
include_node_to_network_map,
node_shard_list,
node_shard_set_mode,
storage_node_set_status,
wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils, string_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file
from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes
logger = logging.getLogger("NeoLogger")
check_nodes: list[StorageNode] = []
@allure.title("Add one node to cluster")
@pytest.mark.add_nodes
@pytest.mark.node_mgmt
class TestNodeManagement(ClusterTestBase):
@pytest.fixture
@allure.title("Create container and pick the node with data")
def create_container_and_pick_node(
self, default_wallet: str, simple_object_size: ObjectSize
) -> Tuple[str, StorageNode]:
file_path = generate_file(simple_object_size.value)
placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
endpoint = self.cluster.default_rpc_endpoint
cid = create_container(
default_wallet,
shell=self.shell,
endpoint=endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
assert len(nodes) == 1
node = nodes[0]
yield cid, node
shards = node_shard_list(node)
assert shards
for shard in shards:
node_shard_set_mode(node, shard, "read-write")
node_shard_list(node)
@allure.step("Tick epoch with retries")
def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: int = None):
for attempt in range(attempts):
try:
self.tick_epoch(wait_block=wait_block)
except RuntimeError:
sleep(timeout)
if attempt >= attempts - 1:
raise
continue
return
@pytest.fixture
def after_run_start_all_nodes(self):
yield
self.return_nodes()
@pytest.fixture
def return_nodes_after_test_run(self):
yield
self.return_nodes()
@allure.step("Return node to cluster")
def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
for node in list(check_nodes):
with allure.step(f"Start node {node}"):
node.start_service()
with allure.step(f"Waiting status ready for node {node}"):
wait_for_node_to_be_ready(node)
# We need to wait for node to establish notifications from morph-chain
# Otherwise it will hang up when we will try to set status
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
with allure.step(f"Move node {node} to online state"):
storage_node_set_status(node, status="online", retries=2)
check_nodes.remove(node)
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
self.tick_epoch_with_retries(3, wait_block=2)
check_node_in_map(node, shell=self.shell, alive_node=alive_node)
@allure.title("Add one node to cluster")
@pytest.mark.add_nodes
def test_add_nodes(
self,
default_wallet: str,
simple_object_size: ObjectSize,
return_nodes_after_test_run,
):
"""
This test remove one node from frostfs_testlib.storage.cluster then add it back. Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
"""
wallet = default_wallet
placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
source_file_path = generate_file(simple_object_size.value)
storage_nodes = self.cluster.storage_nodes
random_node = random.choice(storage_nodes[1:])
alive_node = random.choice(
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
)
check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)
# Add node to recovery list before messing with it
check_nodes.append(random_node)
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
delete_node_data(random_node)
cid = create_container(
wallet,
rule=placement_rule_3,
basic_acl=PUBLIC_ACL,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
oid = put_object(
wallet,
source_file_path,
cid,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
self.return_nodes(alive_node)
with allure.step("Check data could be replicated to new node"):
random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
# Add node to recovery list before messing with it
check_nodes.append(random_node)
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
wait_object_replication(
cid,
oid,
3,
shell=self.shell,
nodes=list(set(storage_nodes) - {random_node}),
)
include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
with allure.step("Check container could be created with new node"):
cid = create_container(
wallet,
rule=placement_rule_4,
basic_acl=PUBLIC_ACL,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
oid = put_object(
wallet,
source_file_path,
cid,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)
@pytest.mark.node_mgmt
@allure.title("Drop object using control command")
def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
"""
Test checks object could be dropped using `frostfs-cli control drop-objects` command.
"""
wallet = default_wallet
endpoint = self.cluster.default_rpc_endpoint
file_path_simple = generate_file(simple_object_size.value)
file_path_complex = generate_file(complex_object_size.value)
locode = get_locode_from_random_node(self.cluster)
rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)
for oid in (oid_simple, oid_complex):
get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
random_node = random.choice(nodes_with_object)
for oid in (oid_simple, oid_complex):
with allure.step(f"Drop object {oid}"):
get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
drop_object(random_node, cid, oid)
self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)
@pytest.mark.node_mgmt
@pytest.mark.skip(reason="Need to clarify scenario")
@allure.title("Control Operations with storage nodes")
def test_shards(
self,
default_wallet,
create_container_and_pick_node,
simple_object_size: ObjectSize,
):
wallet = default_wallet
file_path = generate_file(simple_object_size.value)
cid, node = create_container_and_pick_node
original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
# for mode in ('read-only', 'degraded'):
for mode in ("degraded",):
shards = node_shard_list(node)
assert shards
for shard in shards:
node_shard_set_mode(node, shard, mode)
shards = node_shard_list(node)
assert shards
with pytest.raises(RuntimeError):
put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
with pytest.raises(RuntimeError):
delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)
get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)
for shard in shards:
node_shard_set_mode(node, shard, "read-write")
shards = node_shard_list(node)
assert shards
oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
@pytest.mark.node_mgmt
@allure.title("Put object with stopped node")
def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
wallet = default_wallet
placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
source_file_path = generate_file(simple_object_size.value)
storage_nodes = self.cluster.storage_nodes
random_node = random.choice(storage_nodes[1:])
alive_node = random.choice(
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
)
cid = create_container(
wallet,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
shell=self.shell,
endpoint=random_node.get_rpc_endpoint(),
)
with allure.step("Stop the random node"):
check_nodes.append(random_node)
random_node.stop_service()
with allure.step("Try to put an object and expect success"):
put_object(
wallet,
source_file_path,
cid,
shell=self.shell,
endpoint=alive_node.get_rpc_endpoint(),
)
self.return_nodes(alive_node)
@allure.step("Wait for object to be dropped")
def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
for _ in range(3):
try:
checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
wait_for_gc_pass_on_storage_nodes()
except Exception as err:
if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
return
raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}')
raise AssertionError(f"Object {oid} was not dropped from node")