diff --git a/pytest_tests/helpers/iptables_helper.py b/pytest_tests/helpers/iptables_helper.py deleted file mode 100644 index f5aa5043..00000000 --- a/pytest_tests/helpers/iptables_helper.py +++ /dev/null @@ -1,13 +0,0 @@ -from frostfs_testlib.shell import Shell - - -class IpTablesHelper: - @staticmethod - def drop_input_traffic_to_port(shell: Shell, ports: list[str]) -> None: - for port in ports: - shell.exec(f"sudo iptables -A INPUT -p tcp --dport {port} -j DROP") - - @staticmethod - def restore_input_traffic_to_port(shell: Shell, ports: list[str]) -> None: - for port in ports: - shell.exec(f"sudo iptables -D INPUT -p tcp --dport {port} -j DROP") diff --git a/pytest_tests/testsuites/failovers/test_failover_network.py b/pytest_tests/testsuites/failovers/test_failover_network.py index 0fe96999..7ebfcf04 100644 --- a/pytest_tests/testsuites/failovers/test_failover_network.py +++ b/pytest_tests/testsuites/failovers/test_failover_network.py @@ -1,28 +1,45 @@ import logging +import os import random +import re +import time from time import sleep +from typing import List, Tuple import allure import pytest +from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL -from frostfs_testlib.steps.cli.container import create_container -from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node -from frostfs_testlib.storage.cluster import StorageNode +from frostfs_testlib.s3 import AwsCliClient +from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus +from frostfs_testlib.steps.cli.container import ( + create_container, + list_objects, + search_nodes_with_container, +) +from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object_to_random_node +from frostfs_testlib.steps.s3.s3_helper import set_bucket_versioning +from frostfs_testlib.storage.cluster import ClusterNode +from frostfs_testlib.storage.controllers import ClusterStateController from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_object_replication, ) from frostfs_testlib.utils.file_utils import generate_file, get_file_hash -from pytest_tests.helpers.iptables_helper import IpTablesHelper - logger = logging.getLogger("NeoLogger") STORAGE_NODE_COMMUNICATION_PORT = "8080" STORAGE_NODE_COMMUNICATION_PORT_TLS = "8082" PORTS_TO_BLOCK = [STORAGE_NODE_COMMUNICATION_PORT, STORAGE_NODE_COMMUNICATION_PORT_TLS] -blocked_nodes: list[StorageNode] = [] +blocked_nodes: list[ClusterNode] = [] + + +def pytest_generate_tests(metafunc): + if "s3_client" in metafunc.fixturenames: + metafunc.parametrize("s3_client", [AwsCliClient], ids=["aws"], indirect=True) @pytest.mark.failover @@ -30,13 +47,13 @@ blocked_nodes: list[StorageNode] = [] class TestFailoverNetwork(ClusterTestBase): @pytest.fixture(autouse=True) @allure.title("Restore network") - def restore_network(self): + def restore_network(self, cluster_state_controller: ClusterStateController): yield with allure.step(f"Count blocked nodes {len(blocked_nodes)}"): not_empty = len(blocked_nodes) != 0 for node in list(blocked_nodes): with allure.step(f"Restore network at host for {node.label}"): - IpTablesHelper.restore_input_traffic_to_port(node.host.get_shell(), PORTS_TO_BLOCK) + cluster_state_controller.restore_traffic(mode="ports", node=node) blocked_nodes.remove(node) if not_empty: wait_all_storage_nodes_returned(self.shell, self.cluster) @@ -47,6 +64,7 @@ class TestFailoverNetwork(ClusterTestBase): default_wallet: str, require_multiple_hosts, simple_object_size: ObjectSize, + cluster_state_controller: ClusterStateController, ): """ Block storage nodes traffic using iptables and wait for replication for objects. @@ -81,10 +99,19 @@ class TestFailoverNetwork(ClusterTestBase): excluded_nodes = [] for node in nodes_to_block: with allure.step(f"Block incoming traffic at node {node} on port {PORTS_TO_BLOCK}"): - blocked_nodes.append(node) + block_node = [ + cluster_node + for cluster_node in self.cluster.cluster_nodes + if cluster_node.storage_node == node + ] + blocked_nodes.append(*block_node) excluded_nodes.append(node) - IpTablesHelper.drop_input_traffic_to_port(node.host.get_shell(), PORTS_TO_BLOCK) - sleep(wakeup_node_timeout) + cluster_state_controller.drop_traffic( + mode="ports", + node=node, + wakeup_timeout=wakeup_node_timeout, + ports=PORTS_TO_BLOCK, + ) with allure.step(f"Check object is not stored on node {node}"): new_nodes = wait_object_replication( @@ -104,8 +131,13 @@ class TestFailoverNetwork(ClusterTestBase): for node in nodes_to_block: with allure.step(f"Unblock incoming traffic at host {node} on port {PORTS_TO_BLOCK}"): - IpTablesHelper.restore_input_traffic_to_port(node.host.get_shell(), PORTS_TO_BLOCK) - blocked_nodes.remove(node) + cluster_state_controller.restore_traffic(mode="ports", node=node) + block_node = [ + cluster_node + for cluster_node in self.cluster.cluster_nodes + if cluster_node.storage_node == node + ] + blocked_nodes.remove(*block_node) sleep(wakeup_node_timeout) with allure.step("Check object data is not corrupted"): @@ -117,3 +149,279 @@ class TestFailoverNetwork(ClusterTestBase): wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint() ) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) + + +@pytest.mark.failover +@pytest.mark.failover_split_brain +class TestFailoverSplitBrain(ClusterTestBase): + @pytest.fixture(autouse=True) + def restore_traffic_system(self, cluster_state_controller: ClusterStateController) -> None: + yield + cluster_state_controller.restore_all_traffic() + + def search_container_bucket(self, bucket): + output = self.shell.exec( + f"curl -k --head https://{self.cluster.default_rpc_endpoint.split(':')[0]}/{bucket}" + ).stdout.strip() + pattern = r"x-container-id: (.*)" + matches = re.findall(pattern, output) + if matches: + return matches[0] + else: + return logger.info(f"Key {pattern} no search") + + def split_nodes(self, nodes_list: list[ClusterNode]) -> tuple[list[ClusterNode], ...]: + count = len(nodes_list) + splitted = [] + free_nodes = list(set(self.cluster.cluster_nodes) - set(nodes_list)) + for i in range(count): + splitted.append(nodes_list[i::count] + free_nodes[i::count]) + return tuple(s for s in splitted) + + @allure.title("Replication tree after split brain, versioning bucket") + def test_versioning_bucket_after_split_brain( + self, + cluster_state_controller: ClusterStateController, + bucket: str, + s3_client: S3ClientWrapper, + default_wallet, + simple_object_size: ObjectSize, + ): + object_version = [] + + with allure.step(f"Search ID container for bucket - {bucket}"): + bucket_cid = self.search_container_bucket(bucket) + + with allure.step(f"Check, container - {bucket_cid}, which is located on 4 nodes"): + container_nodes = search_nodes_with_container( + wallet=default_wallet, + cid=bucket_cid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + cluster=self.cluster, + ) + + assert len(container_nodes) == 4, f"Expected 4 nodes, actually - {len(container_nodes)}" + + with allure.step(f"Enable versioning in a bucket - {bucket}"): + set_bucket_versioning( + s3_client=s3_client, bucket=bucket, status=VersioningStatus.ENABLED + ) + + with allure.step("Check that versioning is enabled in the bucket"): + assert s3_client.get_bucket_versioning_status(bucket) == "Enabled" + + with allure.step(f"Put an object in a bucket - {bucket}"): + file_one = generate_file(simple_object_size.value) + object_version.append( + s3_client.put_object( + bucket=bucket, + filepath=file_one, + ) + ) + os.remove(file_one) + + with allure.step("Check that the object is placed in the bucket and it is alone there"): + list_objects_versions = s3_client.list_objects_versions(bucket=bucket) + assert ( + len(list_objects_versions) == 1 + ), f"Expected 1, actual {len(list_objects_versions)}" + + with allure.step("Find the ID of the placed object"): + oid = list_objects( + wallet=default_wallet, + shell=self.shell, + container_id=bucket_cid, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step(f"Find the nodes on which the object lies - {oid}"): + objects_nodes = get_object_nodes( + cluster=self.cluster, + wallet=default_wallet, + cid=bucket_cid, + oid=oid[0], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step("Split cluster"): + segment_one, segment_two = self.split_nodes(objects_nodes) + + with allure.step(f"Enable splitting on nodes - {segment_one}"): + for node in segment_one: + cluster_state_controller.drop_traffic( + mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two + ) + with allure.step(f"Checking ping from {node} to {segment_two}, expected False"): + assert ( + cluster_state_controller.ping_traffic(node, segment_two, 1) is True + ), "Expected True" + for node in segment_two: + with allure.step(f"Checking ping from {node} to {segment_two}, expected False"): + assert ( + cluster_state_controller.ping_traffic(node, segment_one, 1) is True + ), "Expected True" + + with allure.step("Put the 2nd version of the same object on both halves of the cluster"): + for node in objects_nodes: + file = generate_file(simple_object_size.value) + with allure.step(f"Change s3 endpoint to - {node.s3_gate.get_endpoint()}"): + s3_client.set_endpoint(node.s3_gate.get_endpoint()) + with allure.step(f"Put object in bucket - {bucket}"): + object_version.append( + s3_client.put_object( + bucket=bucket, + filepath=file, + ) + ) + os.remove(file) + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + with allure.step( + f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}" + ): + s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint) + + with allure.step("Turn off split"): + for node in segment_one: + with allure.step(f"Turn off to split - {node}"): + cluster_state_controller.restore_traffic(mode="nodes", node=node) + with allure.step(f"Checking ping from {node} to {segment_two}, expected True"): + assert cluster_state_controller.ping_traffic( + node, segment_two, 0 + ), "Expected True" + for node in segment_two: + with allure.step(f"Checking ping from {node} to {segment_one}, expected True"): + assert cluster_state_controller.ping_traffic( + node, segment_one, 0 + ), "Expected True" + + with allure.step("Tick 1 epoch and wait 2 block"): + self.tick_epochs(1, self.cluster.storage_nodes[0]) + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) + + with allure.step( + "Check that the nodes of both halves of the cluster contain all 3 downloaded versions" + ): + bucket_versions = s3_client.list_objects_versions(bucket=bucket) + bucket_versions = sorted(bucket_versions, key=lambda x: x["LastModified"]) + assert len(bucket_versions) == 3, f"Expected 3, actually {len(bucket_versions)}" + + with allure.step("Check that the latest version is the one that was uploaded last in time"): + assert object_version[-1] == bucket_versions[-1]["VersionId"], ( + f"{object_version[-1]} " f"!= {bucket_versions[-1]['VersionId']}" + ) + + @allure.title("Replication tree after split brain, no version bucket") + def test_no_version_bucket_after_split_brain( + self, + cluster_state_controller: ClusterStateController, + bucket: str, + s3_client: S3ClientWrapper, + default_wallet, + simple_object_size: ObjectSize, + ): + with allure.step(f"Search ID container for bucket - {bucket}"): + bucket_cid = self.search_container_bucket(bucket) + + with allure.step(f"Check, container - {bucket_cid}, which is located on 4 nodes"): + container_nodes = search_nodes_with_container( + wallet=default_wallet, + cid=bucket_cid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + cluster=self.cluster, + ) + + assert len(container_nodes) == 4, f"Expected 4 nodes, actually - {len(container_nodes)}" + + with allure.step(f"Put an object in a bucket - {bucket}"): + file_one = generate_file(simple_object_size.value) + s3_client.put_object( + bucket=bucket, + filepath=file_one, + ) + os.remove(file_one) + + with allure.step("Check that the object is placed in the bucket and it is alone there"): + list_objects_versions = s3_client.list_objects_versions(bucket=bucket) + assert ( + len(list_objects_versions) == 1 + ), f"Expected 1, actual {len(list_objects_versions)}" + + with allure.step("Find the ID of the placed object"): + oid = list_objects( + wallet=default_wallet, + shell=self.shell, + container_id=bucket_cid, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step(f"Find the nodes on which the object lies - {oid}"): + objects_nodes = get_object_nodes( + cluster=self.cluster, + wallet=default_wallet, + cid=bucket_cid, + oid=oid[0], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step("Split cluster"): + segment_one, segment_two = self.split_nodes(objects_nodes) + + with allure.step(f"Enable splitting on nodes - {segment_one}"): + for node in segment_one: + cluster_state_controller.drop_traffic( + mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two + ) + with allure.step(f"Checking ping from {node} to {segment_two}, expected False"): + assert cluster_state_controller.ping_traffic( + node, segment_two, 1 + ), "Expected False" + for node in segment_two: + with allure.step(f"Checking ping from {node} to {segment_two}, expected False"): + assert cluster_state_controller.ping_traffic( + node, segment_one, 1 + ), "Expected False" + + with allure.step("Put the 2nd version of the same object on both halves of the cluster"): + for node in objects_nodes: + file = generate_file(simple_object_size.value) + with allure.step(f"Change s3 endpoint to - {node.s3_gate.get_endpoint()}"): + s3_client.set_endpoint(node.s3_gate.get_endpoint()) + with allure.step(f"Put object in bucket - {bucket}"): + s3_client.put_object( + bucket=bucket, + filepath=file, + ) + os.remove(file) + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + with allure.step( + f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}" + ): + s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint) + + with allure.step("Turn off split"): + for node in segment_one: + with allure.step(f"Turn off to split - {node}"): + cluster_state_controller.restore_traffic(mode="nodes", node=node) + with allure.step(f"Checking ping from {node} to {segment_two}, expected True"): + assert cluster_state_controller.ping_traffic( + node, segment_two, 0 + ), "Expected True" + for node in segment_two: + with allure.step(f"Checking ping from {node} to {segment_one}, expected True"): + assert cluster_state_controller.ping_traffic( + node, segment_one, 0 + ), "Expected True" + + with allure.step("Tick 1 epoch and wait 2 block"): + self.tick_epochs(1, self.cluster.storage_nodes[0]) + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) + + with allure.step( + "Check that the nodes of both halves of the cluster contain all 3 downloaded versions" + ): + objects_bucket = s3_client.list_objects_versions(bucket=bucket) + assert len(objects_bucket) == 3, f"Expected 3, actually {len(objects_bucket)}"