frostfs-testcases/pytest_tests/testsuites/failovers/test_failover_server.py

340 lines
14 KiB
Python
Raw Permalink Normal View History

import logging
import os.path
import random
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
from frostfs_testlib.steps.node_management import check_node_in_map, check_node_not_in_map
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import get_file_hash
from pytest import FixtureRequest
logger = logging.getLogger("NeoLogger")
@pytest.mark.failover
@pytest.mark.failover_server
class TestFailoverServer(ClusterTestBase):
@wait_for_success(max_wait_time=120, interval=1)
def wait_node_not_in_map(self, *args, **kwargs):
check_node_not_in_map(*args, **kwargs)
@wait_for_success(max_wait_time=120, interval=1)
def wait_node_in_map(self, *args, **kwargs):
check_node_in_map(*args, **kwargs)
@reporter.step("Create {count_containers} containers and {count_files} objects")
@pytest.fixture
def containers(
self,
request: FixtureRequest,
default_wallet: str,
) -> list[StorageContainer]:
placement_rule = "REP 2 CBF 2 SELECT 2 FROM *"
containers = []
for _ in range(request.param):
cont_id = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
wallet = WalletInfo(path=default_wallet)
storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
containers.append(
StorageContainer(storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster)
)
return containers
@reporter.step("Creation container")
@pytest.fixture()
def container(self, default_wallet: str) -> StorageContainer:
select = len(self.cluster.cluster_nodes)
placement_rule = f"REP {select - 1} CBF 1 SELECT {select} FROM *"
cont_id = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
wallet = WalletInfo(path=default_wallet)
storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
return StorageContainer(storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster)
@reporter.step("Create object and delete after test")
@pytest.fixture(scope="class")
def storage_objects(
self,
request: FixtureRequest,
containers: list[StorageContainer],
simple_object_size: ObjectSize,
complex_object_size: ObjectSize,
) -> StorageObjectInfo:
count_object = request.param
object_sizes = [simple_object_size, complex_object_size]
object_list: list[StorageObjectInfo] = []
for cont in containers:
for _ in range(count_object):
object_list.append(cont.generate_object(size=random.choice(object_sizes).value))
for storage_object in object_list:
os.remove(storage_object.file_path)
yield object_list
@reporter.step("Select random node to stop and start it after test")
@pytest.fixture
def node_to_stop(
self, node_under_test: ClusterNode, cluster_state_controller: ClusterStateController
) -> ClusterNode:
yield node_under_test
with reporter.step(f"start {node_under_test.storage_node}"):
cluster_state_controller.start_stopped_hosts()
@reporter.step("Upload object with nodes and compare")
def get_corrupted_objects_list(
self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
) -> list[StorageObjectInfo]:
corrupted_objects = []
errors_get = []
for node in nodes:
for storage_object in storage_objects:
try:
got_file_path = get_object(
storage_object.wallet_file_path,
storage_object.cid,
storage_object.oid,
endpoint=node.get_rpc_endpoint(),
shell=self.shell,
timeout="60s",
)
if storage_object.file_hash != get_file_hash(got_file_path):
corrupted_objects.append(storage_object)
os.remove(got_file_path)
except RuntimeError:
errors_get.append(storage_object.oid)
assert len(errors_get) == 0, f"Get failed - {errors_get}"
return corrupted_objects
def check_objects_replication(
self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode]
) -> None:
for storage_object in storage_objects:
wait_object_replication(
storage_object.cid,
storage_object.oid,
2,
shell=self.shell,
nodes=storage_nodes,
sleep_interval=45,
attempts=60,
)
@pytest.fixture()
def object_and_nodes(
self, simple_object_size: ObjectSize, container: StorageContainer, default_wallet: str
) -> tuple[StorageObjectInfo, list[ClusterNode]]:
object_info = container.generate_object(simple_object_size.value)
object_nodes = get_object_nodes(
cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
)
return object_info, object_nodes
@pytest.fixture()
def up_stop_nodes(self, cluster_state_controller: ClusterStateController):
yield
cluster_state_controller.start_stopped_hosts()
@allure.title("Full shutdown node")
@pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
def test_complete_node_shutdown(
self,
storage_objects: list[StorageObjectInfo],
node_to_stop: ClusterNode,
cluster_state_controller: ClusterStateController,
):
with reporter.step(f"Remove {node_to_stop} from the list of nodes"):
alive_nodes = list(set(self.cluster.cluster_nodes) - {node_to_stop})
storage_nodes = [cluster.storage_node for cluster in alive_nodes]
with reporter.step("Tick epoch and wait for 2 blocks"):
self.tick_epochs(1, storage_nodes[0], wait_block=2)
with reporter.step(f"Stop node"):
cluster_state_controller.stop_node_host(node=node_to_stop, mode="hard")
with reporter.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
with reporter.step(f"check {node_to_stop.storage_node} in map"):
self.wait_node_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])
count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4
with reporter.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)
with reporter.step(f"Check {node_to_stop} in not map"):
self.wait_node_not_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])
with reporter.step(f"Verify that there are no corrupted objects after {count_tick_epoch} epoch"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
@allure.title("Temporarily disable a node")
@pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
def test_temporarily_disable_a_node(
self,
storage_objects: list[StorageObjectInfo],
node_to_stop: ClusterNode,
cluster_state_controller: ClusterStateController,
):
with reporter.step(f"Remove {node_to_stop} from the list of nodes"):
storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node})
with reporter.step("Tick epoch and wait for 2 blocks"):
self.tick_epochs(1, storage_nodes[0], wait_block=2)
with reporter.step(f"Stop node"):
cluster_state_controller.stop_node_host(node=node_to_stop, mode="hard")
with reporter.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
with reporter.step(f"Check {node_to_stop} in map"):
self.wait_node_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])
cluster_state_controller.start_node_host(node_to_stop)
with reporter.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
@allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
def test_not_enough_nodes_in_container_rep_3(
self,
container: list[StorageContainer],
object_and_nodes: tuple[StorageObjectInfo, list[ClusterNode]],
default_wallet: str,
cluster_state_controller: ClusterStateController,
simple_file: str,
up_stop_nodes: None,
):
object_info, object_nodes = object_and_nodes
node_not_object = list(set(self.cluster.cluster_nodes) - set(object_nodes))[0]
with reporter.step("Stop all nodes with object, except 1"):
for cluster_node in object_nodes[1:]:
cluster_state_controller.stop_node_host(node=cluster_node, mode="hard")
with reporter.step("Get object"):
with reporter.step(f"Get operation to {node_not_object} where it does not exist, expect success"):
get_object(
wallet=default_wallet,
cid=object_info.cid,
oid=object_info.oid,
shell=self.shell,
endpoint=node_not_object.storage_node.get_rpc_endpoint(),
)
with reporter.step(f"Get operation to {object_nodes[0]} with object, expect success"):
get_object(
wallet=default_wallet,
cid=object_info.cid,
oid=object_info.oid,
shell=self.shell,
endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
)
with reporter.step(f"Put operation to node with object, expect error"):
with pytest.raises(RuntimeError):
put_object(
wallet=default_wallet,
path=simple_file,
cid=object_info.cid,
shell=self.shell,
endpoint=node_not_object.storage_node.get_rpc_endpoint(),
)
@allure.title("Not enough nodes in the container with policy - 'REP 2 CBF 2 SELECT 4 FROM *'")
def test_not_enough_nodes_in_container_rep_2(
self,
default_wallet: str,
cluster_state_controller: ClusterStateController,
simple_file: str,
up_stop_nodes: None,
):
with reporter.step("Creating a container with a full network map"):
select = len(self.cluster.cluster_nodes)
placement_rule = f"REP {select - 2} IN X CBF 2 SELECT {select} FROM * AS X"
cid_1 = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
with reporter.step("Put object"):
oid = put_object(
wallet=default_wallet,
path=simple_file,
cid=cid_1,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with reporter.step("Search nodes with object"):
object_nodes = get_object_nodes(
cluster=self.cluster, cid=cid_1, oid=oid, alive_node=self.cluster.cluster_nodes[0]
)
with reporter.step("Turn off random node with object"):
cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")
with reporter.step("Checking PUT operation"):
oid_2 = put_object(
wallet=default_wallet,
path=simple_file,
cid=cid_1,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with reporter.step("Checking GET operation"):
get_file = get_object(
wallet=default_wallet,
cid=cid_1,
oid=oid_2,
shell=self.shell,
endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
)
os.remove(get_file)
with reporter.step("Checking creating container"):
_ = create_container(
default_wallet,
shell=self.shell,
endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)