497 lines
20 KiB
Python
497 lines
20 KiB
Python
import logging
|
|
import os
|
|
from time import sleep
|
|
|
|
import allure
|
|
import pytest
|
|
from frostfs_testlib.analytics import test_case
|
|
from frostfs_testlib.hosting import Host
|
|
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
|
|
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
|
|
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
|
|
from frostfs_testlib.shell import CommandOptions, Shell
|
|
from frostfs_testlib.steps.cli.container import create_container
|
|
from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
|
|
from frostfs_testlib.steps.node_management import (
|
|
check_node_in_map,
|
|
check_node_not_in_map,
|
|
exclude_node_from_network_map,
|
|
include_node_to_network_map,
|
|
remove_nodes_from_map_morph,
|
|
wait_for_node_to_be_ready,
|
|
)
|
|
from frostfs_testlib.steps.s3 import s3_helper
|
|
from frostfs_testlib.storage.cluster import Cluster, StorageNode
|
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.utils import datetime_utils
|
|
from frostfs_testlib.utils.failover_utils import (
|
|
wait_all_storage_nodes_returned,
|
|
wait_object_replication,
|
|
)
|
|
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash
|
|
|
|
logger = logging.getLogger("NeoLogger")
|
|
stopped_nodes: list[StorageNode] = []
|
|
|
|
|
|
@allure.step("Return all stopped hosts")
|
|
@pytest.fixture(scope="function", autouse=True)
|
|
def after_run_return_all_stopped_hosts(client_shell: Shell, cluster: Cluster):
|
|
yield
|
|
return_stopped_hosts(client_shell, cluster)
|
|
|
|
|
|
def panic_reboot_host(host: Host) -> None:
|
|
shell = host.get_shell()
|
|
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
|
|
|
|
options = CommandOptions(close_stdin=True, timeout=1, check=False)
|
|
shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)
|
|
|
|
|
|
def return_stopped_hosts(shell: Shell, cluster: Cluster) -> None:
|
|
for node in list(stopped_nodes):
|
|
with allure.step(f"Start host {node}"):
|
|
node.host.start_host()
|
|
stopped_nodes.remove(node)
|
|
|
|
wait_all_storage_nodes_returned(shell, cluster)
|
|
|
|
|
|
@pytest.mark.failover
|
|
class TestFailoverStorage(ClusterTestBase):
|
|
@pytest.fixture(scope="function", autouse=True)
|
|
def start_stopped_services(self, cluster_state_controller: ClusterStateController):
|
|
yield
|
|
cluster_state_controller.start_stopped_storage_services()
|
|
|
|
@allure.title("Lose and return storage node's host")
|
|
@pytest.mark.parametrize("hard_reboot", [True, False])
|
|
@pytest.mark.failover_reboot
|
|
def test_lose_storage_node_host(
|
|
self, default_wallet, hard_reboot: bool, require_multiple_hosts, simple_object_size
|
|
):
|
|
wallet = default_wallet
|
|
placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
|
source_file_path = generate_file(simple_object_size)
|
|
cid = create_container(
|
|
wallet,
|
|
shell=self.shell,
|
|
endpoint=self.cluster.default_rpc_endpoint,
|
|
rule=placement_rule,
|
|
basic_acl=PUBLIC_ACL,
|
|
)
|
|
oid = put_object_to_random_node(
|
|
wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
|
|
)
|
|
nodes = wait_object_replication(
|
|
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
|
|
)
|
|
|
|
for node in nodes:
|
|
stopped_nodes.append(node)
|
|
|
|
with allure.step(f"Stop host {node}"):
|
|
node.host.stop_host("hard" if hard_reboot else "soft")
|
|
|
|
new_nodes = wait_object_replication(
|
|
cid,
|
|
oid,
|
|
2,
|
|
shell=self.shell,
|
|
nodes=list(set(self.cluster.storage_nodes) - {node}),
|
|
)
|
|
assert all(old_node not in new_nodes for old_node in nodes)
|
|
|
|
with allure.step("Check object data is not corrupted"):
|
|
got_file_path = get_object(
|
|
wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
|
|
)
|
|
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
|
|
|
|
with allure.step("Return all hosts"):
|
|
return_stopped_hosts(self.shell, self.cluster)
|
|
|
|
with allure.step("Check object data is not corrupted"):
|
|
new_nodes = wait_object_replication(
|
|
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
|
|
)
|
|
got_file_path = get_object(
|
|
wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
|
|
)
|
|
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
|
|
|
|
@allure.title("Panic storage node's host")
|
|
@pytest.mark.parametrize("sequence", [True, False])
|
|
@pytest.mark.failover_panic
|
|
def test_panic_storage_node_host(
|
|
self, default_wallet, require_multiple_hosts, sequence: bool, simple_object_size
|
|
):
|
|
wallet = default_wallet
|
|
placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
|
source_file_path = generate_file(simple_object_size)
|
|
cid = create_container(
|
|
wallet,
|
|
shell=self.shell,
|
|
endpoint=self.cluster.default_rpc_endpoint,
|
|
rule=placement_rule,
|
|
basic_acl=PUBLIC_ACL,
|
|
)
|
|
oid = put_object_to_random_node(
|
|
wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
|
|
)
|
|
|
|
nodes = wait_object_replication(
|
|
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
|
|
)
|
|
allure.attach(
|
|
"\n".join([str(node) for node in nodes]),
|
|
"Current nodes with object",
|
|
allure.attachment_type.TEXT,
|
|
)
|
|
|
|
new_nodes: list[StorageNode] = []
|
|
for node in nodes:
|
|
with allure.step(f"Hard reboot host {node} via magic SysRq option"):
|
|
panic_reboot_host(node.host)
|
|
if sequence:
|
|
try:
|
|
new_nodes = wait_object_replication(
|
|
cid,
|
|
oid,
|
|
2,
|
|
shell=self.shell,
|
|
nodes=list(set(self.cluster.storage_nodes) - {node}),
|
|
)
|
|
except AssertionError:
|
|
new_nodes = wait_object_replication(
|
|
cid,
|
|
oid,
|
|
2,
|
|
shell=self.shell,
|
|
nodes=self.cluster.storage_nodes,
|
|
)
|
|
|
|
allure.attach(
|
|
"\n".join([str(new_node) for new_node in new_nodes]),
|
|
f"Nodes with object after {node} fail",
|
|
allure.attachment_type.TEXT,
|
|
)
|
|
|
|
if not sequence:
|
|
new_nodes = wait_object_replication(
|
|
cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
|
|
)
|
|
allure.attach(
|
|
"\n".join([str(new_node) for new_node in new_nodes]),
|
|
"Nodes with object after nodes fail",
|
|
allure.attachment_type.TEXT,
|
|
)
|
|
|
|
got_file_path = get_object(
|
|
wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
|
|
)
|
|
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
|
|
|
|
|
|
def pytest_generate_tests(metafunc: pytest.Metafunc):
|
|
if "s3_client" in metafunc.fixturenames:
|
|
metafunc.parametrize("s3_client", [AwsCliClient, Boto3ClientWrapper], indirect=True)
|
|
|
|
|
|
@pytest.mark.failover
|
|
@pytest.mark.failover_empty_map
|
|
class TestEmptyMap(ClusterTestBase):
|
|
"""
|
|
A set of tests for makes map empty and verify that we can read objects after that
|
|
"""
|
|
|
|
@allure.step("Teardown after EmptyMap offline test")
|
|
@pytest.fixture()
|
|
def empty_map_offline_teardown(self):
|
|
yield
|
|
with allure.step("Return all storage nodes to network map"):
|
|
for node in list(stopped_nodes):
|
|
include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
|
|
stopped_nodes.remove(node)
|
|
|
|
@test_case.title("Test makes network map empty (offline all storage nodes)")
|
|
@test_case.priority(test_case.TestCasePriority.HIGH)
|
|
@test_case.suite_name("failovers")
|
|
@test_case.suite_section("test_failover_storage")
|
|
@pytest.mark.failover_empty_map_offlne
|
|
@allure.title("Test makes network map empty (offline all storage nodes)")
|
|
def test_offline_all_storage_nodes(
|
|
self,
|
|
s3_client: S3ClientWrapper,
|
|
bucket: str,
|
|
simple_object_size: int,
|
|
empty_map_offline_teardown,
|
|
):
|
|
"""
|
|
The test makes network map empty (set offline status on all storage nodes) then returns all nodes to map and checks that object can read through s3.
|
|
|
|
Steps:
|
|
1. Check that bucket is empty
|
|
2: PUT object into bucket
|
|
3: Check that object exists in bucket
|
|
4: Exclude all storage nodes from network map (set status OFFLINE)
|
|
5: Return all storage nodes to network map
|
|
6: Check that we can read object from #2
|
|
Args:
|
|
bucket: bucket which contains tested object
|
|
simple_object_size: size of object
|
|
"""
|
|
file_path = generate_file(simple_object_size)
|
|
file_name = s3_helper.object_key_from_file_path(file_path)
|
|
bucket_objects = [file_name]
|
|
|
|
objects_list = s3_client.list_objects(bucket)
|
|
assert not objects_list, f"Expected empty bucket, got {objects_list}"
|
|
|
|
with allure.step("Put object into bucket"):
|
|
s3_client.put_object(bucket, file_path)
|
|
|
|
with allure.step("Check that object exists in bucket"):
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)
|
|
|
|
storage_nodes = self.cluster.storage_nodes
|
|
with allure.step("Exclude all storage nodes from network map"):
|
|
for node in storage_nodes:
|
|
exclude_node_from_network_map(node, node, shell=self.shell, cluster=self.cluster)
|
|
stopped_nodes.append(node)
|
|
|
|
with allure.step("Return all storage nodes to network map"):
|
|
for node in storage_nodes:
|
|
include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
|
|
stopped_nodes.remove(node)
|
|
|
|
with allure.step("Check that we can read object"):
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)
|
|
|
|
@allure.step("Teardown after EmptyMap stop service test")
|
|
@pytest.fixture()
|
|
def empty_map_stop_service_teardown(self):
|
|
yield
|
|
with allure.step("Return all storage nodes to network map"):
|
|
for node in list(list(stopped_nodes)):
|
|
with allure.step(f"Start node {node}"):
|
|
node.start_service()
|
|
with allure.step(f"Waiting status ready for node {node}"):
|
|
wait_for_node_to_be_ready(node)
|
|
|
|
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
|
self.tick_epochs(1)
|
|
check_node_in_map(node, shell=self.shell, alive_node=node)
|
|
stopped_nodes.remove(node)
|
|
|
|
@test_case.title("Test makes network map empty (stop storage service on all nodes)")
|
|
@test_case.priority(test_case.TestCasePriority.HIGH)
|
|
@test_case.suite_name("failovers")
|
|
@test_case.suite_section("test_failover_storage")
|
|
@pytest.mark.failover_empty_map_stop_service
|
|
@allure.title("Test makes network map empty (stop storage service on all nodes)")
|
|
def test_stop_all_storage_nodes(
|
|
self,
|
|
s3_client: S3ClientWrapper,
|
|
bucket: str,
|
|
simple_object_size: int,
|
|
empty_map_stop_service_teardown,
|
|
):
|
|
"""
|
|
The test makes network map empty (stop storage service on all nodes
|
|
then use 'frostfs-adm morph delete-nodes' to delete nodes from map)
|
|
then start all services and checks that object can read through s3.
|
|
|
|
Steps:
|
|
1. Check that bucket is empty
|
|
2: PUT object into bucket
|
|
3: Check that object exists in bucket
|
|
4: Exclude all storage nodes from network map (stop storage service
|
|
and manual exclude from map)
|
|
5: Return all storage nodes to network map
|
|
6: Check that we can read object from #2
|
|
Args:
|
|
bucket: bucket which contains tested object
|
|
simple_object_size: size of object
|
|
"""
|
|
file_path = generate_file(simple_object_size)
|
|
file_name = s3_helper.object_key_from_file_path(file_path)
|
|
bucket_objects = [file_name]
|
|
|
|
objects_list = s3_client.list_objects(bucket)
|
|
assert not objects_list, f"Expected empty bucket, got {objects_list}"
|
|
|
|
with allure.step("Put object into bucket"):
|
|
s3_client.put_object(bucket, file_path)
|
|
|
|
with allure.step("Check that object exists in bucket"):
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)
|
|
|
|
with allure.step("Stop all storage nodes"):
|
|
for node in self.cluster.storage_nodes:
|
|
with allure.step(f"Stop storage service on node: {node}"):
|
|
node.stop_service()
|
|
stopped_nodes.append(node)
|
|
|
|
with allure.step("Remove all nodes from network map"):
|
|
remove_nodes_from_map_morph(
|
|
shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes
|
|
)
|
|
|
|
with allure.step("Return all storage nodes to network map"):
|
|
self.return_nodes_after_stop_with_check_empty_map(stopped_nodes)
|
|
|
|
with allure.step("Check that object exists in bucket"):
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)
|
|
|
|
@allure.step("Return all nodes to cluster with check empty map first")
|
|
def return_nodes_after_stop_with_check_empty_map(self, return_nodes=None) -> None:
|
|
first_node = True
|
|
for node in list(return_nodes):
|
|
with allure.step(f"Start node {node}"):
|
|
node.start_service()
|
|
with allure.step(f"Waiting status ready for node {node}"):
|
|
wait_for_node_to_be_ready(node)
|
|
|
|
with allure.step("Make sure that network map is empty"):
|
|
if first_node:
|
|
for check_node in list(return_nodes):
|
|
check_node_not_in_map(check_node, shell=self.shell, alive_node=node)
|
|
first_node = False
|
|
|
|
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
|
self.tick_epochs(1)
|
|
check_node_in_map(node, shell=self.shell, alive_node=node)
|
|
stopped_nodes.remove(node)
|
|
|
|
@allure.title("Test S3 Object loss from fstree/blobovnicza, versioning is enabled")
|
|
def test_s3_fstree_blobovnicza_loss_versioning_on(
|
|
self,
|
|
s3_client: S3ClientWrapper,
|
|
simple_object_size: int,
|
|
cluster_state_controller: ClusterStateController,
|
|
):
|
|
bucket = s3_client.create_bucket()
|
|
s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
|
|
|
|
file_path = generate_file(simple_object_size)
|
|
file_name = s3_helper.object_key_from_file_path(file_path)
|
|
|
|
object_versions = []
|
|
with allure.step("Put object into one bucket"):
|
|
put_object = s3_client.put_object(bucket, file_path)
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
|
|
object_versions.append(put_object)
|
|
|
|
with allure.step("Stop all storage nodes"):
|
|
for node in self.cluster.cluster_nodes:
|
|
with allure.step(f"Stop storage service on node: {node}"):
|
|
cluster_state_controller.stop_storage_service(node)
|
|
|
|
with allure.step("Delete blobovnicza and fstree from all nodes"):
|
|
for node in self.cluster.storage_nodes:
|
|
node.delete_blobovnicza()
|
|
node.delete_fstree()
|
|
|
|
with allure.step("Start all storage nodes"):
|
|
cluster_state_controller.start_stopped_storage_services()
|
|
|
|
# need to get Delete Marker first
|
|
with allure.step("Delete the object from the bucket"):
|
|
delete_object = s3_client.delete_object(bucket, file_name)
|
|
object_versions.append(delete_object["VersionId"])
|
|
|
|
# and now delete all versions of object (including Delete Markers)
|
|
with allure.step("Delete all versions of the object from the bucket"):
|
|
for version in object_versions:
|
|
delete_object = s3_client.delete_object(bucket, file_name, version_id=version)
|
|
|
|
with allure.step("Delete bucket"):
|
|
s3_client.delete_bucket(bucket)
|
|
|
|
@allure.title("Test S3 Object loss from fstree/blobovnicza, versioning is disabled")
|
|
def test_s3_fstree_blobovnicza_loss_versioning_off(
|
|
self,
|
|
s3_client: S3ClientWrapper,
|
|
simple_object_size: int,
|
|
cluster_state_controller: ClusterStateController,
|
|
):
|
|
bucket = s3_client.create_bucket()
|
|
|
|
file_path = generate_file(simple_object_size)
|
|
file_name = s3_helper.object_key_from_file_path(file_path)
|
|
|
|
with allure.step("Put object into one bucket"):
|
|
s3_client.put_object(bucket, file_path)
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
|
|
|
|
with allure.step("Stop all storage nodes"):
|
|
for node in self.cluster.cluster_nodes:
|
|
with allure.step(f"Stop storage service on node: {node}"):
|
|
cluster_state_controller.stop_storage_service(node)
|
|
|
|
with allure.step("Delete blobovnicza and fstree from all nodes"):
|
|
for node in self.cluster.storage_nodes:
|
|
node.delete_blobovnicza()
|
|
node.delete_fstree()
|
|
|
|
with allure.step("Start all storage nodes"):
|
|
cluster_state_controller.start_stopped_storage_services()
|
|
|
|
with allure.step("Delete the object from the bucket"):
|
|
s3_client.delete_object(bucket, file_name)
|
|
|
|
with allure.step("Delete bucket"):
|
|
s3_client.delete_bucket(bucket)
|
|
|
|
@pytest.mark.skip(reason="Need to increase cache lifetime")
|
|
@pytest.mark.parametrize(
|
|
# versioning should NOT be VersioningStatus.SUSPENDED, it needs to be undefined
|
|
"versioning_status",
|
|
[VersioningStatus.ENABLED, None],
|
|
)
|
|
@allure.title(
|
|
"After Pilorama.db loss on all nodes list objects should return nothing in second listing"
|
|
)
|
|
def test_s3_pilorama_loss(
|
|
self,
|
|
s3_client: S3ClientWrapper,
|
|
simple_object_size: int,
|
|
versioning_status: VersioningStatus,
|
|
cluster_state_controller: ClusterStateController,
|
|
):
|
|
bucket = s3_client.create_bucket()
|
|
if versioning_status:
|
|
s3_helper.set_bucket_versioning(s3_client, bucket, versioning_status)
|
|
|
|
file_path = generate_file(simple_object_size)
|
|
file_name = s3_helper.object_key_from_file_path(file_path)
|
|
|
|
with allure.step("Put object into one bucket"):
|
|
s3_client.put_object(bucket, file_path)
|
|
s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
|
|
|
|
with allure.step("Stop all storage nodes"):
|
|
for node in self.cluster.cluster_nodes:
|
|
with allure.step(f"Stop storage service on node: {node}"):
|
|
cluster_state_controller.stop_storage_service(node)
|
|
|
|
with allure.step("Delete pilorama.db from all nodes"):
|
|
for node in self.cluster.storage_nodes:
|
|
node.delete_pilorama()
|
|
|
|
with allure.step("Start all storage nodes"):
|
|
cluster_state_controller.start_stopped_storage_services()
|
|
|
|
with allure.step("Check list objects first time"):
|
|
objects_list = s3_client.list_objects(bucket)
|
|
assert objects_list, f"Expected not empty bucket"
|
|
|
|
with allure.step("Check list objects second time"):
|
|
objects_list = s3_client.list_objects(bucket)
|
|
assert not objects_list, f"Expected empty bucket, got {objects_list}"
|
|
|
|
with allure.step("Delete bucket"):
|
|
s3_client.delete_bucket(bucket)
|