frostfs-testcases/pytest_tests/testsuites/failovers/test_failover_storage.py
anikeev-yadro eb464f422c Add tests with empty map
Signed-off-by: anikeev-yadro <a.anikeev@yadro.com>
2023-03-17 17:15:01 +03:00

377 lines
15 KiB
Python

import os
import logging
from time import sleep
import allure
import pytest
from frostfs_testlib.analytics import test_case
from frostfs_testlib.hosting import Host
from frostfs_testlib.resources.common import PUBLIC_ACL
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.utils import datetime_utils
from pytest_tests.resources.common import FROSTFS_CONTRACT_CACHE_TIMEOUT, MORPH_BLOCK_TIME
from pytest_tests.helpers.cluster import Cluster, StorageNode
from pytest_tests.helpers.container import create_container
from pytest_tests.helpers.failover_utils import (
wait_all_storage_nodes_returned,
wait_object_replication,
)
from pytest_tests.helpers.file_helper import generate_file, get_file_hash
from pytest_tests.helpers.frostfs_verbs import get_object, put_object_to_random_node
from pytest_tests.steps.cluster_test_base import ClusterTestBase
from pytest_tests.helpers.node_management import (
check_node_in_map,
check_node_not_in_map,
exclude_node_from_network_map,
include_node_to_network_map,
stop_random_storage_nodes,
wait_for_node_to_be_ready,
remove_nodes_from_map_morph
)
from pytest_tests.helpers.s3_helper import (
check_objects_in_bucket
)
from pytest_tests.steps import s3_gate_object
from pytest_tests.steps.s3_gate_base import TestS3GateBase
from pytest_tests.helpers.aws_cli_client import AwsCliClient
from pytest_tests.helpers.file_helper import (
generate_file,
get_file_hash,
)
from pytest_tests.helpers.node_management import (
check_node_in_map,
exclude_node_from_network_map,
include_node_to_network_map,
)
logger = logging.getLogger("NeoLogger")
stopped_nodes: list[StorageNode] = []
@allure.step("Return all stopped hosts")
@pytest.fixture(scope="function", autouse=True)
def after_run_return_all_stopped_hosts(cluster: Cluster):
yield
return_stopped_hosts(cluster)
def panic_reboot_host(host: Host) -> None:
shell = host.get_shell()
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
options = CommandOptions(close_stdin=True, timeout=1, check=False)
shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)
def return_stopped_hosts(cluster: Cluster) -> None:
for node in list(stopped_nodes):
with allure.step(f"Start host {node}"):
node.host.start_host()
stopped_nodes.remove(node)
wait_all_storage_nodes_returned(cluster)
@pytest.mark.failover
class TestFailoverStorage(ClusterTestBase):
@allure.title("Lose and return storage node's host")
@pytest.mark.parametrize("hard_reboot", [True, False])
@pytest.mark.failover_reboot
def test_lose_storage_node_host(
self, default_wallet, hard_reboot: bool, require_multiple_hosts, simple_object_size
):
wallet = default_wallet
placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
source_file_path = generate_file(simple_object_size)
cid = create_container(
wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
oid = put_object_to_random_node(
wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
)
nodes = wait_object_replication(
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
)
for node in nodes:
stopped_nodes.append(node)
with allure.step(f"Stop host {node}"):
node.host.stop_host("hard" if hard_reboot else "soft")
new_nodes = wait_object_replication(
cid,
oid,
2,
shell=self.shell,
nodes=list(set(self.cluster.storage_nodes) - {node}),
)
assert all(old_node not in new_nodes for old_node in nodes)
with allure.step("Check object data is not corrupted"):
got_file_path = get_object(
wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
)
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
with allure.step("Return all hosts"):
return_stopped_hosts(self.cluster)
with allure.step("Check object data is not corrupted"):
new_nodes = wait_object_replication(
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
)
got_file_path = get_object(
wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
)
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
@allure.title("Panic storage node's host")
@pytest.mark.parametrize("sequence", [True, False])
@pytest.mark.failover_panic
def test_panic_storage_node_host(
self, default_wallet, require_multiple_hosts, sequence: bool, simple_object_size
):
wallet = default_wallet
placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
source_file_path = generate_file(simple_object_size)
cid = create_container(
wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
oid = put_object_to_random_node(
wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
)
nodes = wait_object_replication(
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
)
allure.attach(
"\n".join([str(node) for node in nodes]),
"Current nodes with object",
allure.attachment_type.TEXT,
)
new_nodes: list[StorageNode] = []
for node in nodes:
with allure.step(f"Hard reboot host {node} via magic SysRq option"):
panic_reboot_host(node.host)
if sequence:
try:
new_nodes = wait_object_replication(
cid,
oid,
2,
shell=self.shell,
nodes=list(set(self.cluster.storage_nodes) - {node}),
)
except AssertionError:
new_nodes = wait_object_replication(
cid,
oid,
2,
shell=self.shell,
nodes=self.cluster.storage_nodes,
)
allure.attach(
"\n".join([str(new_node) for new_node in new_nodes]),
f"Nodes with object after {node} fail",
allure.attachment_type.TEXT,
)
if not sequence:
new_nodes = wait_object_replication(
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
)
allure.attach(
"\n".join([str(new_node) for new_node in new_nodes]),
"Nodes with object after nodes fail",
allure.attachment_type.TEXT,
)
got_file_path = get_object(
wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
)
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
def pytest_generate_tests(metafunc):
if "s3_client" in metafunc.fixturenames:
metafunc.parametrize("s3_client", ["aws cli", "boto3"], indirect=True)
@pytest.mark.failover
@pytest.mark.failover_empty_map
class TestEmptyMap(TestS3GateBase):
"""
A set of tests for makes map empty and verify that we can read objects after that
"""
@allure.step("Teardown after EmptyMap offline test")
@pytest.fixture()
def empty_map_offline_teardown(self):
yield
with allure.step("Return all storage nodes to network map"):
for node in list(stopped_nodes):
include_node_to_network_map(
node, node, shell=self.shell, cluster=self.cluster
)
stopped_nodes.remove(node)
@staticmethod
def object_key_from_file_path(full_path: str) -> str:
return os.path.basename(full_path)
@test_case.title("Test makes network map empty (offline all storage nodes)")
@test_case.priority(test_case.TestCasePriority.HIGH)
@test_case.suite_name("failovers")
@test_case.suite_section("test_failover_storage")
@pytest.mark.failover_empty_map_offlne
@allure.title("Test makes network map empty (offline all storage nodes)")
def test_offline_all_storage_nodes(self, bucket, simple_object_size, empty_map_offline_teardown):
"""
The test makes network map empty (set offline status on all storage nodes) then returns all nodes to map and checks that object can read through s3.
Steps:
1. Check that bucket is empty
2: PUT object into bucket
3: Check that object exists in bucket
4: Exclude all storage nodes from network map (set status OFFLINE)
5: Return all storage nodes to network map
6: Check that we can read object from #2
Args:
bucket: bucket which contains tested object
simple_object_size: size of object
"""
file_path = generate_file(simple_object_size)
file_name = self.object_key_from_file_path(file_path)
bucket_objects = [file_name]
objects_list = s3_gate_object.list_objects_s3(self.s3_client, bucket)
assert not objects_list, f"Expected empty bucket, got {objects_list}"
with allure.step("Put object into bucket"):
s3_gate_object.put_object_s3(self.s3_client, bucket, file_path)
with allure.step("Check that object exists in bucket"):
check_objects_in_bucket(self.s3_client, bucket, bucket_objects)
storage_nodes = self.cluster.storage_nodes
with allure.step("Exclude all storage nodes from network map"):
for node in storage_nodes:
exclude_node_from_network_map(
node, node, shell=self.shell, cluster=self.cluster
)
stopped_nodes.append(node)
with allure.step("Return all storage nodes to network map"):
for node in storage_nodes:
include_node_to_network_map(
node, node, shell=self.shell, cluster=self.cluster
)
stopped_nodes.remove(node)
with allure.step("Check that we can read object"):
check_objects_in_bucket(self.s3_client, bucket, bucket_objects)
objects_list = s3_gate_object.list_objects_s3(self.s3_client, bucket)
assert not objects_list, f"Expected empty bucket, got {objects_list}"
@allure.step("Teardown after EmptyMap stop service test")
@pytest.fixture()
def empty_map_stop_service_teardown(self):
yield
with allure.step("Return all storage nodes to network map"):
for node in list(list(stopped_nodes)):
with allure.step(f"Start node {node}"):
node.start_service()
with allure.step(f"Waiting status ready for node {node}"):
wait_for_node_to_be_ready(node)
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
self.tick_epochs(1)
check_node_in_map(node, shell=self.shell, alive_node=node)
stopped_nodes.remove(node)
@test_case.title("Test makes network map empty (stop storage service on all nodes)")
@test_case.priority(test_case.TestCasePriority.HIGH)
@test_case.suite_name("failovers")
@test_case.suite_section("test_failover_storage")
@pytest.mark.failover_empty_map_stop_service
@allure.title("Test makes network map empty (stop storage service on all nodes)")
def test_stop_all_storage_nodes(self, bucket, simple_object_size, empty_map_stop_service_teardown):
"""
The test makes network map empty (stop storage service on all nodes
then use 'frostfs-adm morph delete-nodes' to delete nodes from map)
then start all services and checks that object can read through s3.
Steps:
1. Check that bucket is empty
2: PUT object into bucket
3: Check that object exists in bucket
4: Exclude all storage nodes from network map (stop storage service
and manual exclude from map)
5: Return all storage nodes to network map
6: Check that we can read object from #2
Args:
bucket: bucket which contains tested object
simple_object_size: size of object
"""
file_path = generate_file(simple_object_size)
file_name = self.object_key_from_file_path(file_path)
bucket_objects = [file_name]
objects_list = s3_gate_object.list_objects_s3(self.s3_client, bucket)
assert not objects_list, f"Expected empty bucket, got {objects_list}"
with allure.step("Put object into bucket"):
s3_gate_object.put_object_s3(self.s3_client, bucket, file_path)
with allure.step("Check that object exists in bucket"):
check_objects_in_bucket(self.s3_client, bucket, bucket_objects)
with allure.step("Stop all storage nodes"):
for node in self.cluster.storage_nodes:
with allure.step(f"Stop storage service on node: {node}"):
node.stop_service()
stopped_nodes.append(node)
with allure.step(f"Remove all nodes from network map"):
remove_nodes_from_map_morph(shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes)
with allure.step("Return all storage nodes to network map"):
self.return_nodes_after_stop_with_check_empty_map(stopped_nodes)
with allure.step("Check that object exists in bucket"):
check_objects_in_bucket(self.s3_client, bucket, bucket_objects)
@allure.step("Return all nodes to cluster with check empty map first")
def return_nodes_after_stop_with_check_empty_map(self, return_nodes = None) -> None:
first_node = True
for node in list(return_nodes):
with allure.step(f"Start node {node}"):
node.start_service()
with allure.step(f"Waiting status ready for node {node}"):
wait_for_node_to_be_ready(node)
with allure.step(f"We need to make sure that network map is empty"):
if first_node:
for check_node in list(return_nodes):
check_node_not_in_map(check_node, shell=self.shell, alive_node=node)
first_node = False
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
self.tick_epochs(1)
check_node_in_map(node, shell=self.shell, alive_node=node)
stopped_nodes.remove(node)