From 5ec844417aa2e8c7f73b6a6ff356d23feaacd9eb Mon Sep 17 00:00:00 2001
From: "a.berezin" <a.berezin@yadro.com>
Date: Thu, 27 Jun 2024 12:31:02 +0300
Subject: [PATCH] [#259] Optimize failover tests by paralleling long steps

Signed-off-by: a.berezin <a.berezin@yadro.com>
---
 .../failovers/test_failover_server.py         | 158 ++++++++----------
 1 file changed, 67 insertions(+), 91 deletions(-)

diff --git a/pytest_tests/testsuites/failovers/test_failover_server.py b/pytest_tests/testsuites/failovers/test_failover_server.py
index 39f30d5e..6b03fca3 100644
--- a/pytest_tests/testsuites/failovers/test_failover_server.py
+++ b/pytest_tests/testsuites/failovers/test_failover_server.py
@@ -1,5 +1,5 @@
+import itertools
 import logging
-import os.path
 import random
 
 import allure
@@ -17,7 +17,6 @@ from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.testing.parallel import parallel
 from frostfs_testlib.testing.test_control import wait_for_success
-from frostfs_testlib.utils.failover_utils import wait_object_replication
 from frostfs_testlib.utils.file_utils import get_file_hash
 from pytest import FixtureRequest
 
@@ -35,7 +34,7 @@ class TestFailoverServer(ClusterTestBase):
     def wait_node_in_map(self, *args, **kwargs):
         check_node_in_map(*args, **kwargs)
 
-    @reporter.step("Create {count_containers} containers and {count_files} objects")
+    @allure.title("[Test] Create containers")
     @pytest.fixture
     def containers(
         self,
@@ -45,22 +44,24 @@ class TestFailoverServer(ClusterTestBase):
 
         placement_rule = "REP 2 CBF 2 SELECT 2 FROM *"
 
-        containers = []
+        containers_count = request.param
+        results = parallel(
+            [create_container for _ in range(containers_count)],
+            wallet=default_wallet,
+            shell=self.shell,
+            endpoint=self.cluster.default_rpc_endpoint,
+            rule=placement_rule,
+            basic_acl=PUBLIC_ACL,
+        )
 
-        for _ in range(request.param):
-            cont_id = create_container(
-                default_wallet,
-                shell=self.shell,
-                endpoint=self.cluster.default_rpc_endpoint,
-                rule=placement_rule,
-                basic_acl=PUBLIC_ACL,
-            )
-            storage_cont_info = StorageContainerInfo(cont_id, default_wallet)
-            containers.append(StorageContainer(storage_cont_info, self.shell, self.cluster))
+        containers = [
+            StorageContainer(StorageContainerInfo(result.result(), default_wallet), self.shell, self.cluster)
+            for result in results
+        ]
 
         return containers
 
-    @reporter.step("Creation container")
+    @allure.title("[Test] Create container")
     @pytest.fixture()
     def container(self, default_wallet: WalletInfo) -> StorageContainer:
         select = len(self.cluster.cluster_nodes)
@@ -75,7 +76,7 @@ class TestFailoverServer(ClusterTestBase):
         storage_cont_info = StorageContainerInfo(cont_id, default_wallet)
         return StorageContainer(storage_cont_info, self.shell, self.cluster)
 
-    @reporter.step("Create object and delete after test")
+    @allure.title("[Class] Create objects")
     @pytest.fixture(scope="class")
     def storage_objects(
         self,
@@ -83,69 +84,48 @@ class TestFailoverServer(ClusterTestBase):
         containers: list[StorageContainer],
         simple_object_size: ObjectSize,
         complex_object_size: ObjectSize,
-    ) -> StorageObjectInfo:
-        count_object = request.param
-        object_sizes = [simple_object_size, complex_object_size]
-        object_list: list[StorageObjectInfo] = []
-        for cont in containers:
-            for _ in range(count_object):
-                object_list.append(cont.generate_object(size=random.choice(object_sizes).value))
-
-        for storage_object in object_list:
-            os.remove(storage_object.file_path)
-
-        yield object_list
-
-    @reporter.step("Upload object with nodes and compare")
-    def get_corrupted_objects_list(
-        self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
     ) -> list[StorageObjectInfo]:
-        corrupted_objects = []
-        errors_get = []
-        for node in nodes:
-            for storage_object in storage_objects:
-                try:
-                    got_file_path = get_object(
-                        storage_object.wallet,
-                        storage_object.cid,
-                        storage_object.oid,
-                        endpoint=node.get_rpc_endpoint(),
-                        shell=self.shell,
-                        timeout="60s",
-                    )
-                    if storage_object.file_hash != get_file_hash(got_file_path):
-                        corrupted_objects.append(storage_object)
-                    os.remove(got_file_path)
-                except RuntimeError:
-                    errors_get.append(storage_object.oid)
+        object_count = request.param
+        sizes_samples = [simple_object_size, complex_object_size]
+        samples_count = len(sizes_samples)
+        assert object_count >= samples_count, f"Object count is too low, must be >= {samples_count}"
 
-        assert len(errors_get) == 0, f"Get failed - {errors_get}"
-        return corrupted_objects
+        sizes_weights = [2, 1]
+        sizes = sizes_samples + random.choices(sizes_samples, weights=sizes_weights, k=object_count - samples_count)
 
-    def check_objects_replication(
-        self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode]
-    ) -> None:
-        for storage_object in storage_objects:
-            wait_object_replication(
-                storage_object.cid,
-                storage_object.oid,
-                2,
-                shell=self.shell,
-                nodes=storage_nodes,
-                sleep_interval=45,
-                attempts=60,
-            )
+        results = parallel(
+            [container.generate_object for _ in sizes for container in containers],
+            size=itertools.cycle([size.value for size in sizes]),
+        )
 
+        return [result.result() for result in results]
+
+    @allure.title("[Test] Create objects and get nodes with object")
     @pytest.fixture()
     def object_and_nodes(
         self, simple_object_size: ObjectSize, container: StorageContainer
     ) -> tuple[StorageObjectInfo, list[ClusterNode]]:
         object_info = container.generate_object(simple_object_size.value)
-        object_nodes = get_object_nodes(
-            cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
-        )
+        object_nodes = get_object_nodes(self.cluster, object_info.cid, object_info.oid, self.cluster.cluster_nodes[0])
         return object_info, object_nodes
 
+    def _verify_object(self, storage_object: StorageObjectInfo, node: StorageNode):
+        with reporter.step(f"Verify object {storage_object.oid} from node {node}"):
+            file_path = get_object(
+                storage_object.wallet,
+                storage_object.cid,
+                storage_object.oid,
+                endpoint=node.get_rpc_endpoint(),
+                shell=self.shell,
+                timeout="60s",
+            )
+
+            assert storage_object.file_hash == get_file_hash(file_path)
+
+    @reporter.step("Verify objects")
+    def verify_objects(self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]) -> None:
+        parallel(self._verify_object, storage_objects * len(nodes), node=itertools.cycle(nodes))
+
     @allure.title("Full shutdown node")
     @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
     def test_complete_node_shutdown(
@@ -154,22 +134,21 @@ class TestFailoverServer(ClusterTestBase):
         node_under_test: ClusterNode,
         cluster_state_controller: ClusterStateController,
     ):
-        with reporter.step(f"Remove {node_under_test} from the list of nodes"):
+        with reporter.step("Remove one node from the list of nodes"):
             alive_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
 
         storage_nodes = [cluster.storage_node for cluster in alive_nodes]
 
-        with reporter.step("Tick epoch and wait for 2 blocks"):
-            self.tick_epochs(1, storage_nodes[0], wait_block=2)
+        with reporter.step("Tick 2 epochs and wait for 2 blocks"):
+            self.tick_epochs(2, storage_nodes[0], wait_block=2)
 
         with reporter.step(f"Stop node"):
-            cluster_state_controller.stop_node_host(node=node_under_test, mode="hard")
+            cluster_state_controller.stop_node_host(node_under_test, "hard")
 
         with reporter.step("Verify that there are no corrupted objects"):
-            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
-            assert not corrupted_objects_list
+            self.verify_objects(storage_nodes, storage_objects)
 
-        with reporter.step(f"check {node_under_test.storage_node} in map"):
+        with reporter.step("Check node still in map"):
             self.wait_node_in_map(node_under_test.storage_node, self.shell, alive_node=storage_nodes[0])
 
         count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4
@@ -177,12 +156,11 @@ class TestFailoverServer(ClusterTestBase):
         with reporter.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
             self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)
 
-        with reporter.step(f"Check {node_under_test} in not map"):
+        with reporter.step(f"Check node is not in map after {count_tick_epoch} epochs"):
             self.wait_node_not_in_map(node_under_test.storage_node, self.shell, alive_node=storage_nodes[0])
 
-        with reporter.step(f"Verify that there are no corrupted objects after {count_tick_epoch} epoch"):
-            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
-            assert not corrupted_objects_list
+        with reporter.step(f"Verify that there are no corrupted objects after {count_tick_epoch} epochs"):
+            self.verify_objects(storage_nodes, storage_objects)
 
     @allure.title("Temporarily disable a node")
     @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
@@ -192,27 +170,26 @@ class TestFailoverServer(ClusterTestBase):
         node_under_test: ClusterNode,
         cluster_state_controller: ClusterStateController,
     ):
-        with reporter.step(f"Remove {node_under_test} from the list of nodes"):
+        with reporter.step("Remove one node from the list"):
             storage_nodes = list(set(self.cluster.storage_nodes) - {node_under_test.storage_node})
 
-        with reporter.step("Tick epoch and wait for 2 blocks"):
-            self.tick_epochs(1, storage_nodes[0], wait_block=2)
+        with reporter.step("Tick 2 epochs and wait for 2 blocks"):
+            self.tick_epochs(2, storage_nodes[0], wait_block=2)
 
         with reporter.step(f"Stop node"):
             cluster_state_controller.stop_node_host(node_under_test, "hard")
 
         with reporter.step("Verify that there are no corrupted objects"):
-            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
-            assert not corrupted_objects_list
+            self.verify_objects(storage_nodes, storage_objects)
 
-        with reporter.step(f"Check {node_under_test} in map"):
+        with reporter.step("Check node still in map"):
             self.wait_node_in_map(node_under_test.storage_node, self.shell, alive_node=storage_nodes[0])
 
-        cluster_state_controller.start_node_host(node_under_test)
+        with reporter.step("Start node"):
+            cluster_state_controller.start_node_host(node_under_test)
 
         with reporter.step("Verify that there are no corrupted objects"):
-            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
-            assert not corrupted_objects_list
+            self.verify_objects(storage_nodes, storage_objects)
 
     @allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
     def test_not_enough_nodes_in_container_rep_3(
@@ -237,7 +214,7 @@ class TestFailoverServer(ClusterTestBase):
         with reporter.step(f"Get object from node with object"):
             get_object(default_wallet, object_info.cid, object_info.oid, self.shell, endpoint_with_object)
 
-        with reporter.step(f"Put operation to node with object, expect error"):
+        with reporter.step("[Negative] Put operation to node with object"):
             with pytest.raises(RuntimeError):
                 put_object(default_wallet, simple_file, object_info.cid, self.shell, endpoint_with_object)
 
@@ -277,8 +254,7 @@ class TestFailoverServer(ClusterTestBase):
             oid_2 = put_object(default_wallet, simple_file, cid, self.shell, alive_endpoint_with_object)
 
         with reporter.step("Get object from alive node with object"):
-            get_file = get_object(default_wallet, cid, oid_2, self.shell, alive_endpoint_with_object)
-            os.remove(get_file)
+            get_object(default_wallet, cid, oid_2, self.shell, alive_endpoint_with_object)
 
         with reporter.step("Create container on alive node"):
             create_container(