Delete split-brain test #99

Merged
d.zayakin merged 1 commit from d.zayakin/frostfs-testcases:delete-split-brain into master 2023-10-04 08:27:54 +00:00

View file

@ -1,28 +1,16 @@
import logging
import os
import random
import re
import time
from time import sleep
import allure
import pytest
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import AwsCliClient
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.cli.container import (
create_container,
list_objects,
search_nodes_with_container,
)
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object_to_random_node
from frostfs_testlib.steps.s3.s3_helper import set_bucket_versioning
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
wait_object_replication,
@ -36,21 +24,6 @@ PORTS_TO_BLOCK = [STORAGE_NODE_COMMUNICATION_PORT, STORAGE_NODE_COMMUNICATION_PO
blocked_nodes: list[ClusterNode] = []
def pytest_generate_tests(metafunc):
if "s3_client" in metafunc.fixturenames:
metafunc.parametrize(
"s3_client, auth_container_placement_policy",
[
(
AwsCliClient,
"REP $ALPHABET_NODE_COUNT$ SELECT 4 FROM ALPHA FILTER 'role' EQ 'alphabet' AS ALPHA",
)
],
ids=["aws"],
indirect=True,
)
@pytest.mark.failover
@pytest.mark.failover_network
class TestFailoverNetwork(ClusterTestBase):
@ -158,283 +131,3 @@ class TestFailoverNetwork(ClusterTestBase):
wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
)
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
@pytest.mark.failover
@pytest.mark.failover_split_brain
class TestFailoverSplitBrain(ClusterTestBase):
@pytest.fixture(autouse=True)
def restore_traffic_system(self, cluster_state_controller: ClusterStateController) -> None:
yield
cluster_state_controller.restore_all_traffic()
def search_container_bucket(self, bucket):
output = self.shell.exec(
f"curl -k --head https://{self.cluster.default_rpc_endpoint.split(':')[0]}/{bucket}"
).stdout.strip()
pattern = r"x-container-id: (.*)"
matches = re.findall(pattern, output)
if matches:
return matches[0]
else:
return logger.info(f"Key {pattern} no search")
def split_nodes(self, nodes_list: list[ClusterNode]) -> tuple[list[ClusterNode], ...]:
count = len(nodes_list)
splitted = []
free_nodes = list(set(self.cluster.cluster_nodes) - set(nodes_list))
for i in range(count):
splitted.append(nodes_list[i::count] + free_nodes[i::count])
return tuple(s for s in splitted)
@allure.title(
"Replication tree after split brain, versioning bucket (placement_policy={auth_container_placement_policy}, s3_client={s3_client})",
)
def test_versioning_bucket_after_split_brain(
self,
cluster_state_controller: ClusterStateController,
bucket: str,
s3_client: S3ClientWrapper,
default_wallet,
simple_object_size: ObjectSize,
):
object_version = []
with allure.step(f"Search ID container for bucket - {bucket}"):
bucket_cid = self.search_container_bucket(bucket)
with allure.step(f"Check, container - {bucket_cid}, which is located on 4 nodes"):
container_nodes = search_nodes_with_container(
wallet=default_wallet,
cid=bucket_cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster,
)
assert len(container_nodes) == 4, f"Expected 4 nodes, actually - {len(container_nodes)}"
with allure.step(f"Enable versioning in a bucket - {bucket}"):
set_bucket_versioning(
s3_client=s3_client, bucket=bucket, status=VersioningStatus.ENABLED
)
with allure.step("Check that versioning is enabled in the bucket"):
assert s3_client.get_bucket_versioning_status(bucket) == "Enabled"
with allure.step(f"Put an object in a bucket - {bucket}"):
file_one = generate_file(simple_object_size.value)
object_version.append(
s3_client.put_object(
bucket=bucket,
filepath=file_one,
)
)
os.remove(file_one)
with allure.step("Check that the object is placed in the bucket and it is alone there"):
list_objects_versions = s3_client.list_objects_versions(bucket=bucket)
assert (
len(list_objects_versions) == 1
), f"Expected 1, actual {len(list_objects_versions)}"
with allure.step("Find the ID of the placed object"):
oid = list_objects(
wallet=default_wallet,
shell=self.shell,
container_id=bucket_cid,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step(f"Find the nodes on which the object lies - {oid}"):
objects_nodes = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=bucket_cid,
oid=oid[0],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step("Split cluster"):
segment_one, segment_two = self.split_nodes(objects_nodes)
with allure.step(f"Enable splitting on nodes - {segment_one}"):
for node in segment_one:
cluster_state_controller.drop_traffic(
mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two
)
with allure.step(f"Checking ping from {node} to {segment_two}, expected False"):
assert (
cluster_state_controller.ping_traffic(node, segment_two, 1) is True
), "Expected True"
for node in segment_two:
with allure.step(f"Checking ping from {node} to {segment_two}, expected False"):
assert (
cluster_state_controller.ping_traffic(node, segment_one, 1) is True
), "Expected True"
with allure.step("Put the 2nd version of the same object on both halves of the cluster"):
for node in objects_nodes:
file = generate_file(simple_object_size.value)
with allure.step(f"Change s3 endpoint to - {node.s3_gate.get_endpoint()}"):
s3_client.set_endpoint(node.s3_gate.get_endpoint())
with allure.step(f"Put object in bucket - {bucket}"):
object_version.append(
s3_client.put_object(
bucket=bucket,
filepath=file,
)
)
os.remove(file)
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
with allure.step(
f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}"
):
s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint)
with allure.step("Turn off split"):
for node in segment_one:
with allure.step(f"Turn off to split - {node}"):
cluster_state_controller.restore_traffic(mode="nodes", node=node)
with allure.step(f"Checking ping from {node} to {segment_two}, expected True"):
assert cluster_state_controller.ping_traffic(
node, segment_two, 0
), "Expected True"
for node in segment_two:
with allure.step(f"Checking ping from {node} to {segment_one}, expected True"):
assert cluster_state_controller.ping_traffic(
node, segment_one, 0
), "Expected True"
with allure.step("Tick 1 epoch and wait 2 block"):
self.tick_epochs(1, self.cluster.storage_nodes[0])
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
with allure.step(
"Check that the nodes of both halves of the cluster contain all 3 downloaded versions"
):
bucket_versions = s3_client.list_objects_versions(bucket=bucket)
bucket_versions = sorted(bucket_versions, key=lambda x: x["LastModified"])
assert len(bucket_versions) == 3, f"Expected 3, actually {len(bucket_versions)}"
with allure.step("Check that the latest version is the one that was uploaded last in time"):
assert object_version[-1] == bucket_versions[-1]["VersionId"], (
f"{object_version[-1]} " f"!= {bucket_versions[-1]['VersionId']}"
)
@allure.title(
"Replication tree after split brain, no version bucket (placement_policy={auth_container_placement_policy}, s3_client={s3_client})"
)
def test_no_version_bucket_after_split_brain(
self,
cluster_state_controller: ClusterStateController,
bucket: str,
s3_client: S3ClientWrapper,
default_wallet,
simple_object_size: ObjectSize,
):
with allure.step(f"Search ID container for bucket - {bucket}"):
bucket_cid = self.search_container_bucket(bucket)
with allure.step(f"Check, container - {bucket_cid}, which is located on 4 nodes"):
container_nodes = search_nodes_with_container(
wallet=default_wallet,
cid=bucket_cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster,
)
assert len(container_nodes) == 4, f"Expected 4 nodes, actually - {len(container_nodes)}"
with allure.step(f"Put an object in a bucket - {bucket}"):
file_one = generate_file(simple_object_size.value)
s3_client.put_object(
bucket=bucket,
filepath=file_one,
)
os.remove(file_one)
with allure.step("Check that the object is placed in the bucket and it is alone there"):
list_objects_versions = s3_client.list_objects_versions(bucket=bucket)
assert (
len(list_objects_versions) == 1
), f"Expected 1, actual {len(list_objects_versions)}"
with allure.step("Find the ID of the placed object"):
oid = list_objects(
wallet=default_wallet,
shell=self.shell,
container_id=bucket_cid,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step(f"Find the nodes on which the object lies - {oid}"):
objects_nodes = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=bucket_cid,
oid=oid[0],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with allure.step("Split cluster"):
segment_one, segment_two = self.split_nodes(objects_nodes)
with allure.step(f"Enable splitting on nodes - {segment_one}"):
for node in segment_one:
cluster_state_controller.drop_traffic(
mode="nodes", node=node, wakeup_timeout=60, block_nodes=segment_two
)
with allure.step(f"Checking ping from {node} to {segment_two}, expected False"):
assert cluster_state_controller.ping_traffic(
node, segment_two, 1
), "Expected False"
for node in segment_two:
with allure.step(f"Checking ping from {node} to {segment_two}, expected False"):
assert cluster_state_controller.ping_traffic(
node, segment_one, 1
), "Expected False"
with allure.step("Put the 2nd version of the same object on both halves of the cluster"):
for node in objects_nodes:
file = generate_file(simple_object_size.value)
with allure.step(f"Change s3 endpoint to - {node.s3_gate.get_endpoint()}"):
s3_client.set_endpoint(node.s3_gate.get_endpoint())
with allure.step(f"Put object in bucket - {bucket}"):
s3_client.put_object(
bucket=bucket,
filepath=file,
)
os.remove(file)
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
with allure.step(
f"Change s3 endpoint to default - {self.cluster.default_s3_gate_endpoint}"
):
s3_client.set_endpoint(self.cluster.default_s3_gate_endpoint)
with allure.step("Turn off split"):
for node in segment_one:
with allure.step(f"Turn off to split - {node}"):
cluster_state_controller.restore_traffic(mode="nodes", node=node)
with allure.step(f"Checking ping from {node} to {segment_two}, expected True"):
assert cluster_state_controller.ping_traffic(
node, segment_two, 0
), "Expected True"
for node in segment_two:
with allure.step(f"Checking ping from {node} to {segment_one}, expected True"):
assert cluster_state_controller.ping_traffic(
node, segment_one, 0
), "Expected True"
with allure.step("Tick 1 epoch and wait 2 block"):
self.tick_epochs(1, self.cluster.storage_nodes[0])
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
with allure.step(
"Check that the nodes of both halves of the cluster contain all 3 downloaded versions"
):
objects_bucket = s3_client.list_objects_versions(bucket=bucket)
assert len(objects_bucket) == 3, f"Expected 3, actually {len(objects_bucket)}"