frostfs-testcases/pytest_tests/testsuites/failovers/test_failover_network.py

428 lines
18 KiB
Python
Raw Normal View History

import logging
import os
import random
import re
import time
from time import sleep
from typing import List, Tuple
import allure
import pytest
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import AwsCliClient
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.cli.container import (
create_container,
list_objects,
search_nodes_with_container,
)
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object_to_random_node
from frostfs_testlib.steps.s3.s3_helper import set_bucket_versioning
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
wait_object_replication,
)
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash
# Logger shared by all failover tests in this module.
logger = logging.getLogger("NeoLogger")

# Storage node ports whose incoming traffic TestFailoverNetwork blocks.
STORAGE_NODE_COMMUNICATION_PORT = "8080"
STORAGE_NODE_COMMUNICATION_PORT_TLS = "8082"
PORTS_TO_BLOCK = [
    STORAGE_NODE_COMMUNICATION_PORT,
    STORAGE_NODE_COMMUNICATION_PORT_TLS,
]

# Cluster nodes whose port traffic is currently dropped by a test; the autouse
# `restore_network` fixture of TestFailoverNetwork restores every node still
# listed here once the test finishes.
blocked_nodes: list[ClusterNode] = []
def pytest_generate_tests(metafunc):
    """Indirectly parametrize the ``s3_client`` fixture with the AWS CLI client.

    Tests that do not request ``s3_client`` are left untouched.
    """
    if "s3_client" not in metafunc.fixturenames:
        return
    metafunc.parametrize("s3_client", [AwsCliClient], ids=["aws"], indirect=True)
@pytest.mark.failover
@pytest.mark.failover_network
class TestFailoverNetwork(ClusterTestBase):
    """Block storage-node ports with iptables and check that objects stay available."""

    @pytest.fixture(autouse=True)
    @allure.title("Restore network")
    def restore_network(self, cluster_state_controller: ClusterStateController):
        """After each test, restore port traffic on every node left in ``blocked_nodes``.

        If at least one node had to be restored, wait until all storage nodes are back.
        """
        yield
        with allure.step(f"Count blocked nodes {len(blocked_nodes)}"):
            not_empty = bool(blocked_nodes)
            # Iterate over a copy: nodes are removed from the shared list while restoring.
            for node in list(blocked_nodes):
                with allure.step(f"Restore network at host for {node.label}"):
                    cluster_state_controller.restore_traffic(mode="ports", node=node)
                    blocked_nodes.remove(node)
            if not_empty:
                wait_all_storage_nodes_returned(self.shell, self.cluster)

    def _get_cluster_node(self, storage_node) -> ClusterNode:
        """Return the cluster node whose ``storage_node`` equals *storage_node*.

        Raises:
            AssertionError: if no cluster node wraps the given storage node.
        """
        for cluster_node in self.cluster.cluster_nodes:
            if cluster_node.storage_node == storage_node:
                return cluster_node
        raise AssertionError(f"No cluster node found for storage node {storage_node}")

    @allure.title("Block Storage node traffic")
    def test_block_storage_node_traffic(
        self,
        default_wallet: str,
        require_multiple_hosts,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """
        Block storage nodes traffic using iptables and wait for replication for objects.

        Up to ``nodes_to_block_count`` distinct nodes holding the object are blocked one by
        one; after each block the object must still have 2 copies on the remaining nodes and
        be readable intact. Finally traffic is restored and the object is checked again.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        wakeup_node_timeout = 10  # timeout to let nodes detect that traffic has blocked
        nodes_to_block_count = 2

        source_file_path = generate_file(simple_object_size.value)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )
        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        logger.info(f"Nodes are {nodes}")

        # Block at most `nodes_to_block_count` distinct holders of the object. If the
        # object landed on more nodes than that, pick a random subset without repeats;
        # blocking every holder would leave too few nodes to keep 2 copies.
        nodes_to_block = nodes
        if len(nodes) > nodes_to_block_count:
            nodes_to_block = random.sample(nodes, k=nodes_to_block_count)

        excluded_nodes = []
        for node in nodes_to_block:
            with allure.step(f"Block incoming traffic at node {node} on port {PORTS_TO_BLOCK}"):
                cluster_node = self._get_cluster_node(node)
                # Register before dropping traffic so the autouse fixture restores the
                # node even if the test fails from here on.
                blocked_nodes.append(cluster_node)
                excluded_nodes.append(node)
                cluster_state_controller.drop_traffic(
                    mode="ports",
                    node=cluster_node,
                    wakeup_timeout=wakeup_node_timeout,
                    ports=PORTS_TO_BLOCK,
                )

            with allure.step(f"Check object is not stored on node {node}"):
                new_nodes = wait_object_replication(
                    cid,
                    oid,
                    2,
                    shell=self.shell,
                    nodes=list(set(self.cluster.storage_nodes) - set(excluded_nodes)),
                )
                assert node not in new_nodes

            with allure.step("Check object data is not corrupted"):
                got_file_path = get_object(
                    wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
                )
                assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

        for node in nodes_to_block:
            with allure.step(f"Unblock incoming traffic at host {node} on port {PORTS_TO_BLOCK}"):
                cluster_node = self._get_cluster_node(node)
                cluster_state_controller.restore_traffic(mode="ports", node=cluster_node)
                blocked_nodes.remove(cluster_node)
        sleep(wakeup_node_timeout)

        with allure.step("Check object data is not corrupted"):
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            got_file_path = get_object(
                wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
@pytest.mark.failover
@pytest.mark.failover_split_brain
class TestFailoverSplitBrain(ClusterTestBase):
    """Split the cluster into isolated halves, write on both, heal, and check the result."""

    @pytest.fixture(autouse=True)
    def restore_traffic_system(self, cluster_state_controller: ClusterStateController) -> None:
        """After each test, restore all traffic dropped through the cluster state controller."""
        yield
        cluster_state_controller.restore_all_traffic()

    def search_container_bucket(self, bucket):
        """Return the container ID behind *bucket*, or ``None`` if it is not reported.

        The ID is read from the ``x-container-id`` header of a ``curl --head`` request
        sent to the host part of the cluster's default RPC endpoint.
        """
        output = self.shell.exec(
            f"curl -k --head https://{self.cluster.default_rpc_endpoint.split(':')[0]}/{bucket}"
        ).stdout.strip()
        # HTTP header names are case-insensitive and header lines end with CRLF, so match
        # the name case-insensitively and capture only the non-blank value; `(.*)` would
        # keep the trailing "\r" as part of the container ID.
        match = re.search(r"x-container-id:\s*(\S+)", output, flags=re.IGNORECASE)
        if match:
            return match.group(1)
        logger.info(f"Header x-container-id not found for bucket {bucket}")
        return None

    def split_nodes(self, nodes_list: list[ClusterNode]) -> tuple[list[ClusterNode], ...]:
        """Split the cluster into ``len(nodes_list)`` segments.

        Segment ``i`` contains ``nodes_list[i]`` plus every ``len(nodes_list)``-th node,
        starting at offset ``i``, of the cluster nodes that are not in *nodes_list*.
        """
        count = len(nodes_list)
        free_nodes = list(set(self.cluster.cluster_nodes) - set(nodes_list))
        return tuple(nodes_list[i::count] + free_nodes[i::count] for i in range(count))

    def _get_bucket_cid(self, bucket: str, wallet) -> str:
        """Resolve the container of *bucket* and check that it is placed on 4 nodes."""
        with allure.step(f"Search ID container for bucket - {bucket}"):
            bucket_cid = self.search_container_bucket(bucket)
            assert bucket_cid, f"Container ID for bucket {bucket} was not found"
        with allure.step(f"Check, container - {bucket_cid}, which is located on 4 nodes"):
            container_nodes = search_nodes_with_container(
                wallet=wallet,
                cid=bucket_cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=self.cluster,
            )
            assert len(container_nodes) == 4, f"Expected 4 nodes, actually - {len(container_nodes)}"
        return bucket_cid

    def _put_first_object(
        self, s3_client: S3ClientWrapper, bucket: str, object_size: ObjectSize
    ):
        """Upload one generated file to *bucket* and check it is the only version there.

        Returns whatever ``s3_client.put_object`` returned (the versioning test treats it
        as the version ID).
        """
        with allure.step(f"Put an object in a bucket - {bucket}"):
            file_one = generate_file(object_size.value)
            try:
                put_result = s3_client.put_object(bucket=bucket, filepath=file_one)
            finally:
                os.remove(file_one)
        with allure.step("Check that the object is placed in the bucket and it is alone there"):
            list_objects_versions = s3_client.list_objects_versions(bucket=bucket)
            assert (
                len(list_objects_versions) == 1
            ), f"Expected 1, actual {len(list_objects_versions)}"
        return put_result

    def _find_object_nodes(self, bucket_cid: str, wallet):
        """Return the nodes holding the first object listed in container *bucket_cid*."""
        with allure.step("Find the ID of the placed object"):
            oid = list_objects(
                wallet=wallet,
                shell=self.shell,
                container_id=bucket_cid,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            assert oid, f"No objects found in container {bucket_cid}"
        with allure.step(f"Find the nodes on which the object lies - {oid}"):
            return get_object_nodes(
                cluster=self.cluster,
                wallet=wallet,
                cid=bucket_cid,
                oid=oid[0],
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

    def _split_cluster(
        self, cluster_state_controller: ClusterStateController, objects_nodes
    ) -> tuple[list[ClusterNode], list[ClusterNode]]:
        """Isolate two halves of the cluster (one object holder in each) and verify it.

        Returns the two segments built by ``split_nodes``.
        """
        with allure.step("Split cluster"):
            segments = self.split_nodes(objects_nodes)
            # The scenario isolates exactly two halves from each other.
            assert (
                len(segments) == 2
            ), f"Expected the object on 2 nodes, actually - {len(objects_nodes)}"
            segment_one, segment_two = segments
        # NOTE(review): expect_result 1 / 0 presumably means "ping must fail" / "ping must
        # succeed" for every target node — confirm against ClusterStateController.
        with allure.step(f"Enable splitting on nodes - {segment_one}"):
            for node in segment_one:
                cluster_state_controller.drop_traffic(
                    mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two
                )
                with allure.step(f"Checking ping from {node} to {segment_two}, expected unreachable"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_two, 1
                    ), f"Node {node} can still reach {segment_two}"
            for node in segment_two:
                with allure.step(f"Checking ping from {node} to {segment_one}, expected unreachable"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_one, 1
                    ), f"Node {node} can still reach {segment_one}"
        return segment_one, segment_two

    def _put_object_via_each_node(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        nodes,
        object_size: ObjectSize,
    ) -> list:
        """Upload a fresh file to *bucket* through the S3 gateway of every node in *nodes*.

        Waits one morph block after each upload. The client is switched back to the
        default S3 endpoint afterwards, even when an upload fails.
        Returns the ``put_object`` results in upload order.
        """
        put_results = []
        try:
            for node in nodes:
                endpoint = node.s3_gate.get_endpoint()
                with allure.step(f"Change s3 endpoint to - {endpoint}"):
                    s3_client.set_endpoint(endpoint)
                with allure.step(f"Put object in bucket - {bucket}"):
                    file_path = generate_file(object_size.value)
                    try:
                        put_results.append(s3_client.put_object(bucket=bucket, filepath=file_path))
                    finally:
                        os.remove(file_path)
                    time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
        finally:
            with allure.step(
                f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}"
            ):
                s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint)
        return put_results

    def _heal_cluster(
        self,
        cluster_state_controller: ClusterStateController,
        segment_one: list[ClusterNode],
        segment_two: list[ClusterNode],
    ) -> None:
        """Restore traffic between the segments, verify it, then tick an epoch and wait."""
        with allure.step("Turn off split"):
            for node in segment_one:
                with allure.step(f"Turn off to split - {node}"):
                    cluster_state_controller.restore_traffic(mode="nodes", node=node)
                with allure.step(f"Checking ping from {node} to {segment_two}, expected True"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_two, 0
                    ), f"Node {node} cannot reach {segment_two}"
            for node in segment_two:
                with allure.step(f"Checking ping from {node} to {segment_one}, expected True"):
                    assert cluster_state_controller.ping_traffic(
                        node, segment_one, 0
                    ), f"Node {node} cannot reach {segment_one}"
        with allure.step("Tick 1 epoch and wait 2 block"):
            self.tick_epochs(1, self.cluster.storage_nodes[0])
            time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)

    @allure.title("Replication tree after split brain, versioning bucket")
    def test_versioning_bucket_after_split_brain(
        self,
        cluster_state_controller: ClusterStateController,
        bucket: str,
        s3_client: S3ClientWrapper,
        default_wallet,
        simple_object_size: ObjectSize,
    ):
        """Versions written on both isolated halves must all survive the heal, newest last."""
        bucket_cid = self._get_bucket_cid(bucket, default_wallet)
        with allure.step(f"Enable versioning in a bucket - {bucket}"):
            set_bucket_versioning(
                s3_client=s3_client, bucket=bucket, status=VersioningStatus.ENABLED
            )
        with allure.step("Check that versioning is enabled in the bucket"):
            assert s3_client.get_bucket_versioning_status(bucket) == "Enabled"

        object_version = [self._put_first_object(s3_client, bucket, simple_object_size)]
        objects_nodes = self._find_object_nodes(bucket_cid, default_wallet)
        segment_one, segment_two = self._split_cluster(cluster_state_controller, objects_nodes)
        with allure.step("Put the 2nd version of the same object on both halves of the cluster"):
            object_version.extend(
                self._put_object_via_each_node(
                    s3_client, bucket, objects_nodes, simple_object_size
                )
            )
        self._heal_cluster(cluster_state_controller, segment_one, segment_two)

        expected_count = len(object_version)
        with allure.step(
            f"Check that the nodes of both halves of the cluster contain all {expected_count} downloaded versions"
        ):
            bucket_versions = s3_client.list_objects_versions(bucket=bucket)
            bucket_versions = sorted(bucket_versions, key=lambda x: x["LastModified"])
            assert (
                len(bucket_versions) == expected_count
            ), f"Expected {expected_count}, actually {len(bucket_versions)}"
        with allure.step("Check that the latest version is the one that was uploaded last in time"):
            assert object_version[-1] == bucket_versions[-1]["VersionId"], (
                f"{object_version[-1]} " f"!= {bucket_versions[-1]['VersionId']}"
            )

    @allure.title("Replication tree after split brain, no version bucket")
    def test_no_version_bucket_after_split_brain(
        self,
        cluster_state_controller: ClusterStateController,
        bucket: str,
        s3_client: S3ClientWrapper,
        default_wallet,
        simple_object_size: ObjectSize,
    ):
        """Objects written on both isolated halves must all be listed after the heal."""
        bucket_cid = self._get_bucket_cid(bucket, default_wallet)
        self._put_first_object(s3_client, bucket, simple_object_size)
        objects_nodes = self._find_object_nodes(bucket_cid, default_wallet)
        segment_one, segment_two = self._split_cluster(cluster_state_controller, objects_nodes)
        with allure.step("Put the 2nd version of the same object on both halves of the cluster"):
            self._put_object_via_each_node(s3_client, bucket, objects_nodes, simple_object_size)
        self._heal_cluster(cluster_state_controller, segment_one, segment_two)

        # One initial upload plus one upload per object holder.
        expected_count = 1 + len(objects_nodes)
        with allure.step(
            f"Check that the nodes of both halves of the cluster contain all {expected_count} downloaded versions"
        ):
            objects_bucket = s3_client.list_objects_versions(bucket=bucket)
            assert (
                len(objects_bucket) == expected_count
            ), f"Expected {expected_count}, actually {len(objects_bucket)}"