diff --git a/pytest_tests/testsuites/replication/test_ec_replication.py b/pytest_tests/testsuites/replication/test_ec_replication.py
index 0877da59..c445b728 100644
--- a/pytest_tests/testsuites/replication/test_ec_replication.py
+++ b/pytest_tests/testsuites/replication/test_ec_replication.py
@@ -20,8 +20,9 @@ from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.testing.test_control import wait_for_success
 from frostfs_testlib.utils import datetime_utils
-from frostfs_testlib.utils.file_utils import generate_file, get_file_hash
+from frostfs_testlib.utils.file_utils import TestFile, generate_file, get_file_hash
 
+from ...helpers.container_request import PUBLIC_WITH_POLICY, requires_container
 from ...resources.common import S3_POLICY_FILE_LOCATION
@@ -142,39 +143,36 @@ class TestECReplication(ClusterTestBase):
         assert len(chunks_nodes) == count
 
     @allure.title("Create container with EC policy (size={object_size})")
-    def test_create_container_with_ec_policy(self, object_size: ObjectSize, rep_count: int, grpc_client: GrpcClientWrapper) -> None:
-        test_file = generate_file(object_size.value)
-
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
+    def test_create_container_with_ec_policy(
+        self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, test_file: TestFile
+    ) -> None:
         with reporter.step("Put object in container."):
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check replication chunks."):
-            assert self.check_replication(rep_count, grpc_client, cid, oid)
+            assert self.check_replication(rep_count, grpc_client, container, oid)
 
     @allure.title("Lose node with chunk data")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 3.1"))
     def test_lose_node_with_data_chunk(
         self,
         grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
+        container: str,
         disable_policer: None,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)
-
         with reporter.step("Put object in container."):
             test_file = generate_file(simple_object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check chunk replication on 4 nodes."):
-            assert self.check_replication(4, grpc_client, cid, oid)
+            assert self.check_replication(4, grpc_client, container, oid)
 
         with reporter.step("Search node data chunk"):
-            chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
+            chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
 
         with reporter.step("Stop node with data chunk."):
@@ -182,33 +180,32 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Get object"):
             node = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})[0]
-            grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())
+            grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint())
 
         with reporter.step("Start stopped node, and check replication chunks."):
             cluster_state_controller.start_node_host(chunk_node[0])
-            self.wait_replication(4, grpc_client, cid, oid)
+            self.wait_replication(4, grpc_client, container, oid)
 
     @allure.title("Lose node with chunk parity")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 3.1"))
     def test_lose_node_with_parity_chunk(
         self,
         grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
+        container: str,
         disable_policer: None,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)
-
         with reporter.step("Put object in container."):
             test_file = generate_file(simple_object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check chunk replication on 4 nodes."):
-            assert self.check_replication(4, grpc_client, cid, oid)
+            assert self.check_replication(4, grpc_client, container, oid)
 
         with reporter.step("Search node with parity chunk"):
-            chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
+            chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
 
         with reporter.step("Stop node parity chunk."):
@@ -216,35 +213,34 @@
 
         with reporter.step("Get object, expect success."):
             node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0]
-            grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())
+            grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint())
 
         with reporter.step("Start stoped node, and check replication chunks."):
             cluster_state_controller.start_node_host(chunk_node)
-            self.wait_replication(4, grpc_client, cid, oid)
+            self.wait_replication(4, grpc_client, container, oid)
 
     @allure.title("Lose nodes with chunk data and parity")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 3.1"))
     def test_lose_nodes_data_chunk_and_parity(
         self,
         grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
+        container: str,
         disable_policer: None,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)
-
         with reporter.step("Put object in container."):
             test_file = generate_file(simple_object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check count chunks, expect 4."):
-            assert self.check_replication(4, grpc_client, cid, oid)
+            assert self.check_replication(4, grpc_client, container, oid)
 
         with reporter.step("Search node data chunk and node parity chunk"):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
+            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
             data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
-            parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
+            parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
             parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
 
         with reporter.step("Stop node with data chunk."):
@@ -252,11 +248,11 @@
 
         with reporter.step("Get object"):
             node = list(set(self.cluster.cluster_nodes) - {data_chunk_node, parity_chunk_node})[0]
-            grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())
+            grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint())
 
         with reporter.step("Start stopped host and check chunks."):
             cluster_state_controller.start_node_host(data_chunk_node)
-            self.wait_replication(4, grpc_client, cid, oid)
+            self.wait_replication(4, grpc_client, container, oid)
 
         with reporter.step("Stop node with parity chunk and one all node."):
             cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@@ -264,35 +260,34 @@
 
         with reporter.step("Get object, expect error."):
             with pytest.raises(RuntimeError):
-                grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())
+                grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint())
 
         with reporter.step("Start stopped nodes and check replication chunk."):
             cluster_state_controller.start_stopped_hosts()
-            self.wait_replication(4, grpc_client, cid, oid)
+            self.wait_replication(4, grpc_client, container, oid)
 
     @allure.title("Policer work with chunk")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
     def test_work_policer_with_nodes(
         self,
         simple_object_size: ObjectSize,
         grpc_client: GrpcClientWrapper,
         cluster_state_controller: ClusterStateController,
+        container: str,
         include_excluded_nodes: None,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
         with reporter.step("Put object on container."):
             test_file = generate_file(simple_object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check count chunks nodes on 3."):
-            assert self.check_replication(3, grpc_client, cid, oid)
+            assert self.check_replication(3, grpc_client, container, oid)
 
         with reporter.step("Search node with chunk."):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
+            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
             node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
-            first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
 
         with reporter.step("Remove chunk node from network map"):
             cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node])
@@ -305,10 +300,10 @@
             node = grpc_client.object.chunks.search_node_without_chunks(
                 first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint()
             )[0]
-            self.wait_replication(3, grpc_client, cid, oid)
+            self.wait_replication(3, grpc_client, container, oid)
 
         with reporter.step("Get new chunks"):
-            second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), cid, oid)
+            second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
 
         with reporter.step("Check that oid no change."):
             assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
@@ -316,11 +311,11 @@
 
         with reporter.step("Include node in netmap"):
             cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node)
-            self.wait_sync_count_chunks_nodes(grpc_client, cid, oid, 3)
+            self.wait_sync_count_chunks_nodes(grpc_client, container, oid, 3)
 
     @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
     def test_create_container_with_difference_count_nodes(
-        self, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
+        self, frostfs_cli: FrostfsCli, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
     ) -> None:
         with reporter.step("Create container."):
             expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
@@ -328,6 +323,14 @@
                 expected_chunks *= 4
             cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=ec_policy, await_mode=True)
 
+        with reporter.step("Apply Ape rule for container"):
+            frostfs_cli.ape_manager.add(
+                self.cluster.default_rpc_endpoint, chain_id="allowAll", target_name=cid, target_type="container", rule="allow Object.* *"
+            )
+
+        with reporter.step("Wait for one block"):
+            self.wait_for_blocks()
+
         with reporter.step("Put object in container."):
             test_file = generate_file(object_size.value)
             oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
@@ -341,64 +344,60 @@
             assert get_file_hash(test_file) == get_file_hash(file_with_node)
 
     @allure.title("Request PUT with copies_number flag")
-    def test_put_object_with_copies_number(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
+    def test_put_object_with_copies_number(self, container: str, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
         with reporter.step("Put object in container with copies number = 1"):
             test_file = generate_file(simple_object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint, copies_number=1)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1)
 
         with reporter.step("Check that count chunks > 1."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             assert len(chunks) > 1
 
     @allure.title("Request PUT and 1 node off")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 3.1"))
     def test_put_object_with_off_cnr_node(
-        self, grpc_client: GrpcClientWrapper, cluster_state_controller: ClusterStateController, simple_object_size: ObjectSize
+        self,
+        container: str,
+        grpc_client: GrpcClientWrapper,
+        cluster_state_controller: ClusterStateController,
+        simple_object_size: ObjectSize,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)
-
         with reporter.step("Stop one node in container nodes"):
             cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard")
 
         with reporter.step("Put object in container, expect success for EC container."):
             test_file = generate_file(simple_object_size.value)
-            grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint, copies_number=1)
+            grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1)
 
     @allure.title("Request PUT (size={object_size})")
-    def test_put_object_with_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
+    def test_put_object_with_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get chunks object."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
 
         with reporter.step("Check header chunks object"):
             for chunk in chunks:
                 chunk_head = grpc_client.object.head(
-                    cid, chunk.object_id, self.cluster.default_rpc_endpoint, is_raw=True, json_output=False
+                    container, chunk.object_id, self.cluster.default_rpc_endpoint, is_raw=True, json_output=False
                 ).stdout
                 assert "EC header:" in chunk_head
 
     @allure.title("Request GET (size={object_size})")
-    def test_get_object_in_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1 CBF 1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
+    def test_get_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
             hash_origin_file = get_file_hash(test_file)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get id all chunks."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
 
         with reporter.step("Search chunk node and not chunks node."):
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0]
@@ -407,88 +406,80 @@
             ]
 
         with reporter.step("GET request with chunk node, expect success"):
-            file_one = grpc_client.object.get(cid, oid, chunk_node.storage_node.get_rpc_endpoint())
+            file_one = grpc_client.object.get(container, oid, chunk_node.storage_node.get_rpc_endpoint())
             hash_file_one = get_file_hash(file_one)
             assert hash_file_one == hash_origin_file
 
         with reporter.step("Get request with not chunk node"):
-            file_two = grpc_client.object.get(cid, oid, not_chunk_node.storage_node.get_rpc_endpoint())
+            file_two = grpc_client.object.get(container, oid, not_chunk_node.storage_node.get_rpc_endpoint())
             hash_file_two = get_file_hash(file_two)
             assert hash_file_two == hash_file_one == hash_origin_file
 
     @allure.title("Request SEARCH with flags 'root' (size={object_size})")
-    def test_search_object_in_ec_cnr_root_flags(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
+    def test_search_object_in_ec_cnr_root_flags(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Search operation with --root flags"):
-            search_output = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint, root=True)
+            search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, root=True)
             assert search_output[0] == oid
 
     @allure.title("Request SEARCH check valid chunk id (size={object_size})")
-    def test_search_object_in_ec_cnr_chunk_id(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
+    def test_search_object_in_ec_cnr_chunk_id(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Search operation object"):
-            search_output = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint)
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint)
+            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             for chunk in chunks:
                 assert chunk.object_id in search_output
 
     @allure.title("Request SEARCH check no chunk index info (size={object_size})")
-    def test_search_object_in_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
    def test_search_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Search operation all chunk"):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             for chunk in chunks:
-                chunk_search = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
+                chunk_search = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
                 assert "index" not in chunk_search
 
     @allure.title("Request DELETE (size={object_size})")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
     def test_delete_object_in_ec_cnr(
-        self, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
+        self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
         with reporter.step("Put object in container."):
             test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check object chunks nodes."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             replication_count = 3 if object_size.name == "simple" else 3 * 4
             assert len(chunks) == replication_count
 
         with reporter.step("Delete object"):
-            grpc_client.object.delete(cid, oid, self.cluster.default_rpc_endpoint)
+            grpc_client.object.delete(container, oid, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check that delete all chunks."):
             for chunk in chunks:
                 with pytest.raises(RuntimeError, match="object already removed"):
-                    grpc_client.object.head(cid, chunk.object_id, self.cluster.default_rpc_endpoint)
+                    grpc_client.object.head(container, chunk.object_id, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Put second object."):
-            oid_second = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid_second = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check second object chunks nodes."):
-            chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid_second)
+            chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
             assert len(chunks_second_object) == replication_count
 
         with reporter.step("Stop nodes with chunk."):
@@ -497,48 +488,47 @@
 
         with reporter.step("Delete second object"):
             cluster_nodes = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})
-            grpc_client.object.delete(cid, oid_second, cluster_nodes[0].storage_node.get_rpc_endpoint())
+            grpc_client.object.delete(container, oid_second, cluster_nodes[0].storage_node.get_rpc_endpoint())
 
         with reporter.step("Check that delete all chunk second object."):
             for chunk in chunks_second_object:
                 with pytest.raises(RuntimeError, match="object already removed|object not found"):
-                    grpc_client.object.head(cid, chunk.object_id, cluster_nodes[0].storage_node.get_rpc_endpoint())
+                    grpc_client.object.head(container, chunk.object_id, cluster_nodes[0].storage_node.get_rpc_endpoint())
 
     @allure.title("Request LOCK (size={object_size})")
     @pytest.mark.failover
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
     def test_lock_object_in_ec_cnr(
         self,
+        container: str,
+        test_file: TestFile,
         grpc_client: GrpcClientWrapper,
         frostfs_cli: FrostfsCli,
         object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
         include_excluded_nodes: None,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)
-
         with reporter.step("Put object in container."):
-            test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check object chunks nodes."):
-            chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             replication_count = 3 if object_size.name == "simple" else 3 * 4
             assert len(chunks_object) == replication_count
 
         with reporter.step("Put LOCK in object."):
             # TODO Rework for the grpc_client when the netmap methods are implemented
             epoch = frostfs_cli.netmap.epoch(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout.strip()
-            grpc_client.object.lock(cid, oid, self.cluster.default_rpc_endpoint, expire_at=(int(epoch) + 5))
+            grpc_client.object.lock(container, oid, self.cluster.default_rpc_endpoint, expire_at=(int(epoch) + 5))
 
         with reporter.step("Check don`t delete chunk"):
             for chunk in chunks_object:
                 with pytest.raises(RuntimeError, match="Lock EC chunk failed"):
-                    grpc_client.object.delete(cid, chunk.object_id, self.cluster.default_rpc_endpoint)
+                    grpc_client.object.delete(container, chunk.object_id, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check enable LOCK object"):
             with pytest.raises(RuntimeError, match="object is locked"):
-                grpc_client.object.delete(cid, oid, self.cluster.default_rpc_endpoint)
+                grpc_client.object.delete(container, oid, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Remove node in netmap."):
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_object[0])[0]
@@ -548,11 +538,11 @@
         with reporter.step("Check don`t delete chunk."):
             for chunk in chunks_object:
                 with pytest.raises(RuntimeError, match="Lock EC chunk failed|object not found"):
-                    grpc_client.object.delete(cid, chunk.object_id, alive_node.storage_node.get_rpc_endpoint())
+                    grpc_client.object.delete(container, chunk.object_id, alive_node.storage_node.get_rpc_endpoint())
 
         with reporter.step("Check enable LOCK object"):
             with pytest.raises(RuntimeError, match="object is locked"):
-                grpc_client.object.delete(cid, oid, alive_node.storage_node.get_rpc_endpoint())
+                grpc_client.object.delete(container, oid, alive_node.storage_node.get_rpc_endpoint())
 
         with reporter.step("Include node in netmap"):
             cluster_state_controller.include_node_to_netmap(chunk_node.storage_node, alive_node.storage_node)
@@ -639,8 +629,10 @@
 
     @allure.title("Evacuation shard with chunk (type={type})")
     @pytest.mark.parametrize("type, get_chunk", [("data", get_data_chunk_object), ("parity", get_parity_chunk_object)])
+    @requires_container(PUBLIC_WITH_POLICY("EC 1.1 CBF 1"))
     def test_evacuation_data_shard(
         self,
+        container: str,
         restore_nodes_shards_mode: None,
         frostfs_cli: FrostfsCli,
         grpc_client: GrpcClientWrapper,
@@ -648,15 +640,12 @@
         type: str,
         get_chunk,
     ) -> None:
-        with reporter.step("Create container."):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 1.1 CBF 1", await_mode=True)
-
         with reporter.step("Put object in container."):
             test_file = generate_file(max_object_size - 1000)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get object chunks."):
-            chunk = get_chunk(self, frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
+            chunk = get_chunk(self, frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
             frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path())
 
@@ -669,7 +658,7 @@
             frostfs_node_cli.shards.evacuation_start(chunk_node[0].storage_node.get_control_endpoint(), shard_id, await_mode=True)
 
         with reporter.step("Get object after evacuation shard"):
-            grpc_client.object.get(cid, oid, self.cluster.default_rpc_endpoint)
+            grpc_client.object.get(container, oid, self.cluster.default_rpc_endpoint)
 
     @allure.title("[NEGATIVE] Don`t create more 1 EC policy")
     def test_more_one_ec_policy(self, grpc_client: GrpcClientWrapper) -> None:
@@ -698,6 +687,7 @@
     @pytest.mark.parametrize("s3_policy, s3_client", [(S3_POLICY_FILE_LOCATION, AwsCliClient)], indirect=True)
     def test_count_chunks_bucket_with_ec_location(
         self,
+        test_file: TestFile,
         s3_client: S3ClientWrapper,
         bucket_container_resolver: BucketContainerResolver,
         grpc_client: GrpcClientWrapper,
@@ -712,7 +702,6 @@
             assert bucket_status == VersioningStatus.ENABLED.value
 
         with reporter.step("Put object in bucket"):
-            test_file = generate_file(object_size.value)
             bucket_object = s3_client.put_object(bucket, test_file)
 
         with reporter.step("Watch replication count chunks"):
@@ -722,23 +711,20 @@
             assert len(chunks) == expect_chunks
 
     @allure.title("Replication chunk after drop (size={object_size})")
-    def test_drop_chunk_and_replication(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize, rep_count: int) -> None:
-        with reporter.step("Create container"):
-            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1 CBF 1", await_mode=True)
-
+    @requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
+    def test_drop_chunk_and_replication(self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, rep_count: int) -> None:
         with reporter.step("Put object"):
-            test_file = generate_file(object_size.value)
-            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
+            oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get all chunks"):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
+            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
 
         with reporter.step("Search chunk node"):
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
             shell_chunk_node = chunk_node[0].host.get_shell()
 
         with reporter.step("Get replication count"):
-            assert self.check_replication(rep_count, grpc_client, cid, oid)
+            assert self.check_replication(rep_count, grpc_client, container, oid)
 
         with reporter.step("Delete chunk"):
             frostfs_node_cli = FrostfsCli(
@@ -746,7 +732,7 @@
                 frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
                 config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(),
             )
-            frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{cid}/{data_chunk.object_id}")
+            frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{container}/{data_chunk.object_id}")
 
         with reporter.step("Wait replication count after drop one chunk"):
-            self.wait_replication(rep_count, grpc_client, cid, oid)
+            self.wait_replication(rep_count, grpc_client, container, oid)