import logging
import os
import random
import re
import time
from time import sleep
from typing import Optional

import allure
import pytest
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import AwsCliClient
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.cli.container import (
    create_container,
    list_objects,
    search_nodes_with_container,
)
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object_to_random_node
from frostfs_testlib.steps.s3.s3_helper import set_bucket_versioning
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import (
    wait_all_storage_nodes_returned,
    wait_object_replication,
)
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash

logger = logging.getLogger("NeoLogger")

STORAGE_NODE_COMMUNICATION_PORT = "8080"
STORAGE_NODE_COMMUNICATION_PORT_TLS = "8082"
PORTS_TO_BLOCK = [STORAGE_NODE_COMMUNICATION_PORT, STORAGE_NODE_COMMUNICATION_PORT_TLS]

# Cluster nodes whose ports are currently blocked by a test. Shared at module level so
# the autouse ``restore_network`` fixture can unblock them even when a test fails midway.
blocked_nodes: list[ClusterNode] = []


def pytest_generate_tests(metafunc):
    """Parametrize every test that requests ``s3_client`` with the AWS CLI client.

    Both ``s3_client`` and ``auth_container_placement_policy`` are passed with
    ``indirect=True``, i.e. the values are handed to the fixtures of the same names.
    """
    if "s3_client" in metafunc.fixturenames:
        metafunc.parametrize(
            "s3_client, auth_container_placement_policy",
            [
                (
                    AwsCliClient,
                    "REP $ALPHABET_NODE_COUNT$ SELECT 4 FROM ALPHA FILTER 'role' EQ 'alphabet' AS ALPHA",
                )
            ],
            ids=["aws"],
            indirect=True,
        )


@pytest.mark.failover
@pytest.mark.failover_network
class TestFailoverNetwork(ClusterTestBase):
    """Failover tests that block storage node ports and verify object replication."""

    @pytest.fixture(autouse=True)
    @allure.title("Restore network")
    def restore_network(self, cluster_state_controller: ClusterStateController):
        """Unblock every node still listed in ``blocked_nodes`` after each test."""
        yield

        with allure.step(f"Count blocked nodes {len(blocked_nodes)}"):
            # Remember whether anything was blocked before the list is drained below.
            not_empty = bool(blocked_nodes)
            # Iterate over a copy because entries are removed from the list inside the loop.
            for node in list(blocked_nodes):
                with allure.step(f"Restore network for {node}"):
                    cluster_state_controller.restore_traffic(mode="ports", node=node)
                blocked_nodes.remove(node)

        if not_empty:
            wait_all_storage_nodes_returned(self.shell, self.cluster)

    def _cluster_node_for(self, storage_node) -> ClusterNode:
        """Return the single cluster node whose ``storage_node`` equals ``storage_node``.

        Fails with an explicit assertion when there is no match or more than one,
        instead of the opaque ``TypeError`` that ``list.append(*matches)`` would raise.
        """
        matches = [
            cluster_node
            for cluster_node in self.cluster.cluster_nodes
            if cluster_node.storage_node == storage_node
        ]
        assert (
            len(matches) == 1
        ), f"Expected exactly one cluster node for {storage_node}, found {len(matches)}"
        return matches[0]

    @allure.title("Block Storage node traffic")
    def test_block_storage_node_traffic(
        self,
        default_wallet: str,
        require_multiple_hosts,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """
        Block storage nodes traffic using iptables and wait for replication for objects.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        wakeup_node_timeout = 10  # timeout to let nodes detect that traffic has blocked
        nodes_to_block_count = 2

        source_file_path = generate_file(simple_object_size.value)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )

        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )

        logger.info(f"Nodes are {nodes}")
        nodes_to_block = nodes
        if nodes_to_block_count > len(nodes):
            # TODO: the intent of this logic is not clear, need to revisit
            # NOTE: random.choices samples with replacement, so a node may be picked twice.
            nodes_to_block = random.choices(nodes, k=nodes_to_block_count)

        excluded_nodes = []
        for node in nodes_to_block:
            with allure.step(f"Block incoming traffic at node {node} on port {PORTS_TO_BLOCK}"):
                # Register the node before dropping traffic so the restore_network
                # fixture unblocks it even if a later step of this test fails.
                blocked_nodes.append(self._cluster_node_for(node))
                excluded_nodes.append(node)
                cluster_state_controller.drop_traffic(
                    mode="ports",
                    node=node,
                    wakeup_timeout=wakeup_node_timeout,
                    ports=PORTS_TO_BLOCK,
                )

            with allure.step(f"Check object is not stored on node {node}"):
                new_nodes = wait_object_replication(
                    cid,
                    oid,
                    2,
                    shell=self.shell,
                    nodes=list(set(self.cluster.storage_nodes) - set(excluded_nodes)),
                )
                assert node not in new_nodes

            with allure.step("Check object data is not corrupted"):
                got_file_path = get_object(
                    wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
                )
                assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

        for node in nodes_to_block:
            with allure.step(f"Unblock incoming traffic at host {node} on port {PORTS_TO_BLOCK}"):
                cluster_state_controller.restore_traffic(mode="ports", node=node)
                blocked_nodes.remove(self._cluster_node_for(node))
                sleep(wakeup_node_timeout)

        with allure.step("Check object data is not corrupted"):
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )

            got_file_path = get_object(
                wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)


@pytest.mark.failover
@pytest.mark.failover_split_brain
class TestFailoverSplitBrain(ClusterTestBase):
    """Split-brain tests: partition the cluster, write to both halves, then heal it."""

    @pytest.fixture(autouse=True)
    def restore_traffic_system(self, cluster_state_controller: ClusterStateController) -> None:
        """Restore all traffic after each test, whatever its outcome."""
        yield
        cluster_state_controller.restore_all_traffic()

    def search_container_bucket(self, bucket) -> Optional[str]:
        """Return the container ID reported for ``bucket``, or ``None`` if absent.

        Sends a HEAD request via ``curl`` to the host part of the default RPC endpoint
        and reads the first ``x-container-id`` header from the output.
        """
        output = self.shell.exec(
            f"curl -k --head https://{self.cluster.default_rpc_endpoint.split(':')[0]}/{bucket}"
        ).stdout.strip()
        pattern = r"x-container-id: (.*)"
        matches = re.findall(pattern, output)
        if matches:
            return matches[0]

        # Log the miss and return None explicitly rather than relying on the
        # return value of logger.info().
        logger.info(f"Key {pattern} no search")
        return None

    def split_nodes(self, nodes_list: list[ClusterNode]) -> tuple[list[ClusterNode], ...]:
        """Split the cluster into ``len(nodes_list)`` segments.

        Segment ``i`` receives ``nodes_list[i]`` plus every ``len(nodes_list)``-th of the
        remaining cluster nodes, starting at offset ``i``. The remaining nodes come from
        a set difference, so their order is not guaranteed.
        """
        count = len(nodes_list)
        free_nodes = list(set(self.cluster.cluster_nodes) - set(nodes_list))
        return tuple(nodes_list[i::count] + free_nodes[i::count] for i in range(count))

    def _find_bucket_container(self, bucket: str, wallet) -> Optional[str]:
        """Look up the container ID behind ``bucket`` and assert it is placed on 4 nodes."""
        with allure.step(f"Search ID container for bucket - {bucket}"):
            bucket_cid = self.search_container_bucket(bucket)

        with allure.step(f"Check, container - {bucket_cid}, which is located on 4 nodes"):
            container_nodes = search_nodes_with_container(
                wallet=wallet,
                cid=bucket_cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=self.cluster,
            )
            assert len(container_nodes) == 4, f"Expected 4 nodes, actually - {len(container_nodes)}"

        return bucket_cid

    def _find_object_nodes(self, bucket_cid, wallet):
        """Return the nodes that hold the first object listed in container ``bucket_cid``."""
        with allure.step("Find the ID of the placed object"):
            oid = list_objects(
                wallet=wallet,
                shell=self.shell,
                container_id=bucket_cid,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        with allure.step(f"Find the nodes on which the object lies - {oid}"):
            objects_nodes = get_object_nodes(
                cluster=self.cluster,
                wallet=wallet,
                cid=bucket_cid,
                oid=oid[0],
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        return objects_nodes

    def _heal_split(
        self,
        cluster_state_controller: ClusterStateController,
        segment_one: list[ClusterNode],
        segment_two: list[ClusterNode],
    ) -> None:
        """Restore traffic on ``segment_one`` nodes and check pings in both directions."""
        with allure.step("Turn off split"):
            for node in segment_one:
                with allure.step(f"Turn off to split - {node}"):
                    cluster_state_controller.restore_traffic(mode="nodes", node=node)
                with allure.step(f"Checking ping from {node} to {segment_two}, expected True"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_two, 0
                    ), "Expected True"
            for node in segment_two:
                with allure.step(f"Checking ping from {node} to {segment_one}, expected True"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_one, 0
                    ), "Expected True"

    def _tick_epoch_and_wait(self) -> None:
        """Tick one epoch on the first storage node, then sleep for two morph block times."""
        with allure.step("Tick 1 epoch and wait 2 block"):
            self.tick_epochs(1, self.cluster.storage_nodes[0])
            time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)

    @allure.title(
        "Replication tree after split brain, versioning bucket "
        "(placement_policy={auth_container_placement_policy}, s3_client={s3_client})",
    )
    def test_versioning_bucket_after_split_brain(
        self,
        cluster_state_controller: ClusterStateController,
        bucket: str,
        s3_client: S3ClientWrapper,
        default_wallet,
        simple_object_size: ObjectSize,
    ):
        """Check that a versioned bucket keeps all 3 versions after a split brain is healed.

        One version is uploaded before the split and one more through each node holding
        the object during the split; the last uploaded version must be the latest one.
        """
        object_version = []

        bucket_cid = self._find_bucket_container(bucket, default_wallet)

        with allure.step(f"Enable versioning in a bucket - {bucket}"):
            set_bucket_versioning(
                s3_client=s3_client, bucket=bucket, status=VersioningStatus.ENABLED
            )

        with allure.step("Check that versioning is enabled in the bucket"):
            assert s3_client.get_bucket_versioning_status(bucket) == "Enabled"

        with allure.step(f"Put an object in a bucket - {bucket}"):
            file_one = generate_file(simple_object_size.value)
            object_version.append(
                s3_client.put_object(
                    bucket=bucket,
                    filepath=file_one,
                )
            )
            os.remove(file_one)

        with allure.step("Check that the object is placed in the bucket and it is alone there"):
            list_objects_versions = s3_client.list_objects_versions(bucket=bucket)
            assert (
                len(list_objects_versions) == 1
            ), f"Expected 1, actual {len(list_objects_versions)}"

        objects_nodes = self._find_object_nodes(bucket_cid, default_wallet)

        with allure.step("Split cluster"):
            # The unpacking requires the object to be stored on exactly two nodes.
            segment_one, segment_two = self.split_nodes(objects_nodes)

        with allure.step(f"Enable splitting on nodes - {segment_one}"):
            # NOTE(review): the step titles say "expected False" while the assertion
            # message says "Expected True"; messages kept as in the original - confirm.
            for node in segment_one:
                cluster_state_controller.drop_traffic(
                    mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two
                )
                with allure.step(f"Checking ping from {node} to {segment_two}, expected False"):
                    assert (
                        cluster_state_controller.ping_traffic(node, segment_two, 1) is True
                    ), "Expected True"
            for node in segment_two:
                # Step title now names segment_one, which is what is actually pinged here.
                with allure.step(f"Checking ping from {node} to {segment_one}, expected False"):
                    assert (
                        cluster_state_controller.ping_traffic(node, segment_one, 1) is True
                    ), "Expected True"

        with allure.step("Put the 2nd version of the same object on both halves of the cluster"):
            for node in objects_nodes:
                file = generate_file(simple_object_size.value)
                with allure.step(f"Change s3 endpoint to - {node.s3_gate.get_endpoint()}"):
                    s3_client.set_endpoint(node.s3_gate.get_endpoint())
                with allure.step(f"Put object in bucket - {bucket}"):
                    object_version.append(
                        s3_client.put_object(
                            bucket=bucket,
                            filepath=file,
                        )
                    )
                    os.remove(file)
                # Wait one morph block after each upload (presumably so the versions
                # get distinct modification times - TODO confirm).
                time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))

        with allure.step(
            f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}"
        ):
            s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint)

        self._heal_split(cluster_state_controller, segment_one, segment_two)
        self._tick_epoch_and_wait()

        with allure.step(
            "Check that the nodes of both halves of the cluster contain all 3 downloaded versions"
        ):
            bucket_versions = s3_client.list_objects_versions(bucket=bucket)
            bucket_versions = sorted(bucket_versions, key=lambda x: x["LastModified"])
            assert len(bucket_versions) == 3, f"Expected 3, actually {len(bucket_versions)}"

        with allure.step("Check that the latest version is the one that was uploaded last in time"):
            assert (
                object_version[-1] == bucket_versions[-1]["VersionId"]
            ), f"{object_version[-1]} != {bucket_versions[-1]['VersionId']}"

    @allure.title(
        "Replication tree after split brain, no version bucket "
        "(placement_policy={auth_container_placement_policy}, s3_client={s3_client})"
    )
    def test_no_version_bucket_after_split_brain(
        self,
        cluster_state_controller: ClusterStateController,
        bucket: str,
        s3_client: S3ClientWrapper,
        default_wallet,
        simple_object_size: ObjectSize,
    ):
        """Check that 3 entries are listed in the bucket after a split brain is healed.

        Versioning is not enabled by this test. One object is uploaded before the split
        and one more through each node holding the object during the split.
        """
        bucket_cid = self._find_bucket_container(bucket, default_wallet)

        with allure.step(f"Put an object in a bucket - {bucket}"):
            file_one = generate_file(simple_object_size.value)
            s3_client.put_object(
                bucket=bucket,
                filepath=file_one,
            )
            os.remove(file_one)

        with allure.step("Check that the object is placed in the bucket and it is alone there"):
            list_objects_versions = s3_client.list_objects_versions(bucket=bucket)
            assert (
                len(list_objects_versions) == 1
            ), f"Expected 1, actual {len(list_objects_versions)}"

        objects_nodes = self._find_object_nodes(bucket_cid, default_wallet)

        with allure.step("Split cluster"):
            # The unpacking requires the object to be stored on exactly two nodes.
            segment_one, segment_two = self.split_nodes(objects_nodes)

        with allure.step(f"Enable splitting on nodes - {segment_one}"):
            for node in segment_one:
                cluster_state_controller.drop_traffic(
                    mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two
                )
                with allure.step(f"Checking ping from {node} to {segment_two}, expected False"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_two, 1
                    ), "Expected False"
            for node in segment_two:
                # Step title now names segment_one, which is what is actually pinged here.
                with allure.step(f"Checking ping from {node} to {segment_one}, expected False"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_one, 1
                    ), "Expected False"

        with allure.step("Put the 2nd version of the same object on both halves of the cluster"):
            for node in objects_nodes:
                file = generate_file(simple_object_size.value)
                with allure.step(f"Change s3 endpoint to - {node.s3_gate.get_endpoint()}"):
                    s3_client.set_endpoint(node.s3_gate.get_endpoint())
                with allure.step(f"Put object in bucket - {bucket}"):
                    s3_client.put_object(
                        bucket=bucket,
                        filepath=file,
                    )
                    os.remove(file)
                # Wait one morph block after each upload (presumably so the uploads
                # get distinct modification times - TODO confirm).
                time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))

        with allure.step(
            f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}"
        ):
            s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint)

        self._heal_split(cluster_state_controller, segment_one, segment_two)
        self._tick_epoch_and_wait()

        with allure.step(
            "Check that the nodes of both halves of the cluster contain all 3 downloaded versions"
        ):
            objects_bucket = s3_client.list_objects_versions(bucket=bucket)
            assert len(objects_bucket) == 3, f"Expected 3, actually {len(objects_bucket)}"