diff --git a/pytest_tests/testsuites/object/test_object_api.py b/pytest_tests/testsuites/object/test_object_api.py index 700cdb4..e410f91 100755 --- a/pytest_tests/testsuites/object/test_object_api.py +++ b/pytest_tests/testsuites/object/test_object_api.py @@ -12,7 +12,7 @@ from frostfs_testlib.resources.error_patterns import ( INVALID_RANGE_ZERO_LENGTH, OUT_OF_RANGE, ) -from frostfs_testlib.shell import Shell +from frostfs_testlib.shell import Shell, CommandResult from frostfs_testlib.steps.cli.container import create_container from frostfs_testlib.steps.cli.object import ( get_object_from_random_node, @@ -21,6 +21,7 @@ from frostfs_testlib.steps.cli.object import ( head_object, put_object_to_random_node, search_object, + delete_object, ) from frostfs_testlib.steps.complex_object_actions import get_complex_object_split_ranges from frostfs_testlib.steps.storage_object import delete_objects @@ -133,6 +134,9 @@ def storage_objects( @pytest.mark.sanity @pytest.mark.grpc_api class TestObjectApi(ClusterTestBase): + + PLACEMENT_RULE_1 = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" + @allure.title("Storage policy by native API (obj_size={object_size})") def test_object_storage_policies( self, @@ -207,6 +211,26 @@ class TestObjectApi(ClusterTestBase): ) self.check_header_is_presented(head_info, storage_object_2.attributes) + def test_object_head_raw(self, default_wallet: str, object_size: ObjectSize): + """ + Execute command object head --raw and check object deletion + """ + + wallet = default_wallet + cid = create_container(wallet, self.shell, self.cluster.default_rpc_endpoint, self.PLACEMENT_RULE_1) + + file_path = generate_file(object_size.value) + oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) + + delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + + result = head_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint, is_raw=True) + + with reporter.step("Execute command object head --raw"): + result = head_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint, is_raw=True) + if type(result) == CommandResult: + assert result.return_code != 0 + @allure.title("Search objects by native API (obj_size={object_size})") def test_search_object_api(self, storage_objects: list[StorageObjectInfo]): """