import os import random import string import uuid from datetime import datetime, timedelta from typing import Literal import allure import pytest from frostfs_testlib import reporter from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_PASS from frostfs_testlib.resources.error_patterns import S3_BUCKET_DOES_NOT_ALLOW_ACL, S3_MALFORMED_XML_REQUEST from frostfs_testlib.resources.s3_acl_grants import PRIVATE_GRANTS from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper, VersioningStatus from frostfs_testlib.steps.s3 import s3_helper from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.testing.test_control import expect_not_raises from frostfs_testlib.utils import wallet_utils from frostfs_testlib.utils.file_utils import TestFile, concat_files, generate_file, generate_file_with_content, get_file_hash @pytest.mark.nightly @pytest.mark.s3_gate @pytest.mark.s3_gate_object class TestS3GateObject: @pytest.fixture def second_wallet_public_key(self): second_wallet = os.path.join(os.getcwd(), ASSETS_DIR, f"{str(uuid.uuid4())}.json") wallet_utils.init_wallet(second_wallet, DEFAULT_WALLET_PASS) public_key = wallet_utils.get_wallet_public_key(second_wallet, DEFAULT_WALLET_PASS) yield public_key @allure.title("Object API (obj_size={object_size}, s3_client={s3_client})") @pytest.mark.parametrize( "object_size", ["simple", "complex"], indirect=True, ) def test_s3_api_object( self, s3_client: S3ClientWrapper, object_size: ObjectSize, bucket: str, ): """ Test base S3 Object API (Put/Head/List) for simple and complex objects. """ with reporter.step("Prepare object to upload"): test_file = generate_file(object_size.value) file_name = s3_helper.object_key_from_file_path(test_file) with reporter.step("Put object to bucket"): s3_client.put_object(bucket, test_file) with reporter.step("Head object from bucket"): s3_client.head_object(bucket, file_name) with reporter.step("Verify object in list"): bucket_objects = s3_client.list_objects(bucket) assert file_name in bucket_objects, f"Expected file {file_name} in objects list {bucket_objects}" with reporter.step("Check object's attributes"): for attrs in (["ETag"], ["ObjectSize", "StorageClass"]): s3_client.get_object_attributes(bucket, file_name, attrs) @allure.title("Copy object (s3_client={s3_client})") def test_s3_copy_object( self, s3_client: S3ClientWrapper, two_buckets: list[str], simple_object_size: ObjectSize, ): file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) bucket_1_objects = [file_name] bucket_1, bucket_2 = two_buckets with reporter.step("Put object into one bucket"): s3_client.put_object(bucket_1, file_path) with reporter.step("Copy one object into the same bucket"): copy_obj_path = s3_client.copy_object(bucket_1, file_name) bucket_1_objects.append(copy_obj_path) s3_helper.check_objects_in_bucket(s3_client, bucket_1, bucket_1_objects) objects_list = s3_client.list_objects(bucket_2) assert not objects_list, f"Expected empty bucket, got {objects_list}" with reporter.step("Copy object from first bucket into second"): copy_obj_path_b2 = s3_client.copy_object(bucket_1, file_name, bucket=bucket_2) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=bucket_1_objects) s3_helper.check_objects_in_bucket(s3_client, bucket_2, expected_objects=[copy_obj_path_b2]) with reporter.step("Check copied object has the same content"): got_copied_file_b2 = s3_client.get_object(bucket_2, copy_obj_path_b2) assert get_file_hash(file_path) == get_file_hash(got_copied_file_b2), "Hashes must be the same" with reporter.step("Delete one object from first bucket"): s3_client.delete_object(bucket_1, file_name) bucket_1_objects.remove(file_name) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=bucket_1_objects) s3_helper.check_objects_in_bucket(s3_client, bucket_2, expected_objects=[copy_obj_path_b2]) with reporter.step("Copy one object into the same bucket"): with pytest.raises(Exception): s3_client.copy_object(bucket_1, file_name) @allure.title("Copy version of object (s3_client={s3_client})") def test_s3_copy_version_object( self, s3_client: S3ClientWrapper, two_buckets: list[str], simple_object_size: ObjectSize, ): version_1_content = "Version 1" file_name_simple = generate_file_with_content(simple_object_size.value, content=version_1_content) obj_key = os.path.basename(file_name_simple) bucket_1, bucket_2 = two_buckets s3_helper.set_bucket_versioning(s3_client, bucket_1, VersioningStatus.ENABLED) with reporter.step("Put object into bucket"): s3_client.put_object(bucket_1, file_name_simple) bucket_1_objects = [obj_key] s3_helper.check_objects_in_bucket(s3_client, bucket_1, [obj_key]) with reporter.step("Copy one object into the same bucket"): copy_obj_path = s3_client.copy_object(bucket_1, obj_key) bucket_1_objects.append(copy_obj_path) s3_helper.check_objects_in_bucket(s3_client, bucket_1, bucket_1_objects) s3_helper.set_bucket_versioning(s3_client, bucket_2, VersioningStatus.ENABLED) with reporter.step("Copy object from first bucket into second"): copy_obj_path_b2 = s3_client.copy_object(bucket_1, obj_key, bucket=bucket_2) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=bucket_1_objects) s3_helper.check_objects_in_bucket(s3_client, bucket_2, expected_objects=[copy_obj_path_b2]) with reporter.step("Delete one object from first bucket and check object in bucket"): s3_client.delete_object(bucket_1, obj_key) bucket_1_objects.remove(obj_key) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=bucket_1_objects) with reporter.step("Copy one object into the same bucket"): with pytest.raises(Exception): s3_client.copy_object(bucket_1, obj_key) @allure.title("Copy with acl (s3_client={s3_client})") def test_s3_copy_acl(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): file_path = generate_file_with_content(simple_object_size.value) file_name = os.path.basename(file_path) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put object into bucket"): s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name]) with reporter.step("[NEGATIVE] Copy object with public-read-write ACL"): with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL): copy_path = s3_client.copy_object(bucket, file_name, acl="public-read-write") with reporter.step("Copy object with private ACL"): copy_path = s3_client.copy_object(bucket, file_name, acl="private") object_grants = s3_client.get_object_acl(bucket, copy_path) s3_helper.verify_acl_permissions(object_grants, PRIVATE_GRANTS) @allure.title("Copy object with metadata (s3_client={s3_client})") def test_s3_copy_metadate(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) bucket_1_objects = [file_name] s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put object into bucket"): s3_client.put_object(bucket, file_path, metadata=object_metadata) bucket_1_objects = [file_name] s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_1_objects) with reporter.step("Copy one object"): copy_obj_path = s3_client.copy_object(bucket, file_name) bucket_1_objects.append(copy_obj_path) s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_1_objects) obj_head = s3_client.head_object(bucket, copy_obj_path) assert obj_head.get("Metadata") == object_metadata, f"Metadata must be {object_metadata}" with reporter.step("Copy one object with metadata"): copy_obj_path = s3_client.copy_object(bucket, file_name, metadata_directive="COPY") bucket_1_objects.append(copy_obj_path) obj_head = s3_client.head_object(bucket, copy_obj_path) assert obj_head.get("Metadata") == object_metadata, f"Metadata must be {object_metadata}" with reporter.step("Copy one object with new metadata"): object_metadata_1 = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} copy_obj_path = s3_client.copy_object( bucket, file_name, metadata_directive="REPLACE", metadata=object_metadata_1, ) bucket_1_objects.append(copy_obj_path) obj_head = s3_client.head_object(bucket, copy_obj_path) assert obj_head.get("Metadata") == object_metadata_1, f"Metadata must be {object_metadata_1}" @allure.title("Copy object with tagging (s3_client={s3_client})") def test_s3_copy_tagging(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): object_tagging = [(f"{uuid.uuid4()}", f"{uuid.uuid4()}")] file_path = generate_file(simple_object_size.value) file_name_simple = s3_helper.object_key_from_file_path(file_path) bucket_1_objects = [file_name_simple] s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): s3_client.put_object(bucket, file_path) s3_client.put_object_tagging(bucket, file_name_simple, tags=object_tagging) bucket_1_objects = [file_name_simple] s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_1_objects) with reporter.step("Copy one object without tag"): copy_obj_path = s3_client.copy_object(bucket, file_name_simple) got_tags = s3_client.get_object_tagging(bucket, copy_obj_path) assert got_tags, f"Expected tags, got {got_tags}" expected_tags = [{"Key": key, "Value": value} for key, value in object_tagging] for tag in expected_tags: assert tag in got_tags, f"Expected tag {tag} in {got_tags}" with reporter.step("Copy one object with tag"): copy_obj_path_1 = s3_client.copy_object(bucket, file_name_simple, tagging_directive="COPY") got_tags = s3_client.get_object_tagging(bucket, copy_obj_path_1) assert got_tags, f"Expected tags, got {got_tags}" expected_tags = [{"Key": key, "Value": value} for key, value in object_tagging] for tag in expected_tags: assert tag in got_tags, f"Expected tag {tag} in {got_tags}" with reporter.step("Copy one object with new tag"): tag_key = "tag1" tag_value = uuid.uuid4() new_tag = f"{tag_key}={tag_value}" copy_obj_path = s3_client.copy_object( bucket, file_name_simple, tagging_directive="REPLACE", tagging=new_tag, ) got_tags = s3_client.get_object_tagging(bucket, copy_obj_path) assert got_tags, f"Expected tags, got {got_tags}" expected_tags = [{"Key": tag_key, "Value": str(tag_value)}] for tag in expected_tags: assert tag in got_tags, f"Expected tag {tag} in {got_tags}" @allure.title("Delete version of object (s3_client={s3_client})") def test_s3_delete_versioning( self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize, complex_object_size: ObjectSize, ): version_1_content = "Version 1" version_2_content = "Version 2" file_name_simple = generate_file_with_content(simple_object_size.value, content=version_1_content) obj_key = os.path.basename(file_name_simple) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): version_id_1 = s3_client.put_object(bucket, file_name_simple) file_name_1 = generate_file_with_content(simple_object_size.value, file_name_simple, version_2_content) version_id_2 = s3_client.put_object(bucket, file_name_1) with reporter.step("Check bucket shows all versions"): versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key} assert obj_versions == { version_id_1, version_id_2, }, f"Object should have versions: {version_id_1, version_id_2}" with reporter.step("Delete 1 version of object"): delete_obj = s3_client.delete_object(bucket, obj_key, version_id=version_id_1) versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key} assert obj_versions == {version_id_2}, f"Object should have versions: {version_id_2}" assert "DeleteMarker" not in delete_obj.keys(), "Delete markers should not be created" with reporter.step("Delete second version of object"): delete_obj = s3_client.delete_object(bucket, obj_key, version_id_2) versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key} assert not obj_versions, "Expected object not found" assert "DeleteMarker" not in delete_obj.keys(), "Delete markers should not be created" with reporter.step("Put new object into bucket"): file_name_complex = generate_file(complex_object_size.value) obj_key = os.path.basename(file_name_complex) s3_client.put_object(bucket, file_name_complex) with reporter.step("Delete last object"): delete_obj = s3_client.delete_object(bucket, obj_key) versions = s3_client.list_objects_versions(bucket, True) assert versions.get("DeleteMarkers", None), "Expected delete Marker" assert "DeleteMarker" in delete_obj.keys(), "Expected delete Marker" @allure.title("Bulk delete version of object (s3_client={s3_client})") def test_s3_bulk_delete_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): version_1_content = "Version 1" version_2_content = "Version 2" version_3_content = "Version 3" version_4_content = "Version 4" file_name_1 = generate_file_with_content(simple_object_size.value, content=version_1_content) obj_key = os.path.basename(file_name_1) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): version_id_1 = s3_client.put_object(bucket, file_name_1) file_name_2 = generate_file_with_content(simple_object_size.value, file_name_1, version_2_content) version_id_2 = s3_client.put_object(bucket, file_name_2) file_name_3 = generate_file_with_content(simple_object_size.value, file_name_1, version_3_content) version_id_3 = s3_client.put_object(bucket, file_name_3) file_name_4 = generate_file_with_content(simple_object_size.value, file_name_1, version_4_content) version_id_4 = s3_client.put_object(bucket, file_name_4) version_ids = {version_id_1, version_id_2, version_id_3, version_id_4} with reporter.step("Check bucket shows all versions"): versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key} assert obj_versions == version_ids, f"Object should have versions: {version_ids}" with reporter.step("Delete two objects from bucket one by one"): version_to_delete_b1 = random.sample([version_id_1, version_id_2, version_id_3, version_id_4], k=2) version_to_save = list(set(version_ids) - set(version_to_delete_b1)) for ver in version_to_delete_b1: s3_client.delete_object(bucket, obj_key, ver) with reporter.step("Check bucket shows all versions"): versions = s3_client.list_objects_versions(bucket) obj_versions = [version.get("VersionId") for version in versions if version.get("Key") == obj_key] assert obj_versions.sort() == version_to_save.sort(), f"Object should have versions: {version_to_save}" @allure.title("Get versions of object (s3_client={s3_client})") def test_s3_get_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): version_1_content = "Version 1" version_2_content = "Version 2" file_name_simple = generate_file_with_content(simple_object_size.value, content=version_1_content) obj_key = os.path.basename(file_name_simple) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): version_id_1 = s3_client.put_object(bucket, file_name_simple) file_name_1 = generate_file_with_content(simple_object_size.value, file_path=file_name_simple, content=version_2_content) version_id_2 = s3_client.put_object(bucket, file_name_1) with reporter.step("Get first version of object"): object_1 = s3_client.get_object(bucket, obj_key, version_id_1, full_output=True) assert object_1.get("VersionId") == version_id_1, f"Get object with version {version_id_1}" with reporter.step("Get second version of object"): object_2 = s3_client.get_object(bucket, obj_key, version_id_2, full_output=True) assert object_2.get("VersionId") == version_id_2, f"Get object with version {version_id_2}" with reporter.step("Get object"): object_3 = s3_client.get_object(bucket, obj_key, full_output=True) assert object_3.get("VersionId") == version_id_2, f"Get object with version {version_id_2}" @allure.title("Get range (s3_client={s3_client})") def test_s3_get_range( self, s3_client: S3ClientWrapper, bucket: str, complex_object_size: ObjectSize, simple_object_size: ObjectSize, ): file_path = generate_file(complex_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) file_hash = get_file_hash(file_path) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): version_id_1 = s3_client.put_object(bucket, file_path) file_name_1 = generate_file_with_content(simple_object_size.value, file_path=file_path) version_id_2 = s3_client.put_object(bucket, file_name_1) with reporter.step("Get first version of object"): object_1_part_1 = s3_client.get_object( bucket, file_name, version_id_1, object_range=[0, int(complex_object_size.value / 3)], ) object_1_part_2 = s3_client.get_object( bucket, file_name, version_id_1, object_range=[ int(complex_object_size.value / 3) + 1, 2 * int(complex_object_size.value / 3), ], ) object_1_part_3 = s3_client.get_object( bucket, file_name, version_id_1, object_range=[ 2 * int(complex_object_size.value / 3) + 1, complex_object_size.value, ], ) con_file = concat_files([object_1_part_1, object_1_part_2, object_1_part_3]) assert get_file_hash(con_file) == file_hash, "Hashes must be the same" with reporter.step("Get second version of object"): object_2_part_1 = s3_client.get_object( bucket, file_name, version_id_2, object_range=[0, int(simple_object_size.value / 3)], ) object_2_part_2 = s3_client.get_object( bucket, file_name, version_id_2, object_range=[ int(simple_object_size.value / 3) + 1, 2 * int(simple_object_size.value / 3), ], ) object_2_part_3 = s3_client.get_object( bucket, file_name, version_id_2, object_range=[2 * int(simple_object_size.value / 3) + 1, simple_object_size.value], ) con_file_1 = concat_files([object_2_part_1, object_2_part_2, object_2_part_3]) assert get_file_hash(con_file_1) == get_file_hash(file_name_1), "Hashes must be the same" with reporter.step("Get object"): object_3_part_1 = s3_client.get_object(bucket, file_name, object_range=[0, int(simple_object_size.value / 3)]) object_3_part_2 = s3_client.get_object( bucket, file_name, object_range=[ int(simple_object_size.value / 3) + 1, 2 * int(simple_object_size.value / 3), ], ) object_3_part_3 = s3_client.get_object( bucket, file_name, object_range=[2 * int(simple_object_size.value / 3) + 1, simple_object_size.value], ) con_file = concat_files([object_3_part_1, object_3_part_2, object_3_part_3]) assert get_file_hash(con_file) == get_file_hash(file_name_1), "Hashes must be the same" def copy_extend_list(self, original_list: list[str], n: int) -> list[str]: """Extend the list with own elements up to n elements""" multiplier = n // len(original_list) result_list = original_list.copy() result_list = result_list * multiplier for i in range(n - len(result_list)): result_list.append(result_list[i]) return result_list @allure.title("Bulk deletion is limited to 1000 objects (s3_client={s3_client})") def test_s3_bulk_deletion_limit( self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize, ): objects_in_bucket = [] objects_count = 3 with reporter.step(f"Put {objects_count} into bucket"): for _ in range(objects_count): file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) objects_in_bucket.append(file_name) s3_client.put_object(bucket, file_path) # Extend deletion list to 1001 elements with same keys for test speed objects_to_delete = self.copy_extend_list(objects_in_bucket, 1001) with reporter.step("Send delete request with 1001 objects and expect error"): with pytest.raises(Exception, match=S3_MALFORMED_XML_REQUEST): s3_client.delete_objects(bucket, objects_to_delete) with reporter.step("Send delete request with 1000 objects without error"): with expect_not_raises(): s3_client.delete_objects(bucket, objects_to_delete[:1000]) @allure.title("Object head is unloaded with the correct version (s3_client={s3_client})") @pytest.mark.smoke def test_s3_head_object( self, s3_client: S3ClientWrapper, bucket: str, complex_object_size: ObjectSize, simple_object_size: ObjectSize, ): object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} file_path = generate_file(complex_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): version_id_1 = s3_client.put_object(bucket, file_path, metadata=object_metadata) file_name_1 = generate_file_with_content(simple_object_size.value, file_path=file_path) version_id_2 = s3_client.put_object(bucket, file_name_1) with reporter.step("Get head of first version of object"): response = s3_client.head_object(bucket, file_name) assert "LastModified" in response, "Expected LastModified field" assert "ETag" in response, "Expected ETag field" assert response.get("Metadata") == {}, "Expected Metadata empty" assert response.get("VersionId") == version_id_2, f"Expected VersionId is {version_id_2}" assert response.get("ContentLength") != 0, "Expected ContentLength is not zero" with reporter.step("Get head ob first version of object"): response = s3_client.head_object(bucket, file_name, version_id=version_id_1) assert "LastModified" in response, "Expected LastModified field" assert "ETag" in response, "Expected ETag field" assert response.get("Metadata") == object_metadata, f"Expected Metadata is {object_metadata}" assert response.get("VersionId") == version_id_1, f"Expected VersionId is {version_id_1}" assert response.get("ContentLength") != 0, "Expected ContentLength is not zero" @allure.title("List of objects with version (method_version={list_type}, s3_client={s3_client})") @pytest.mark.parametrize("list_type", ["v1", "v2"]) def test_s3_list_object( self, s3_client: S3ClientWrapper, list_type: str, bucket: str, complex_object_size: ObjectSize, ): file_path_1 = generate_file(complex_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path_1) file_path_2 = generate_file(complex_object_size.value) file_name_2 = s3_helper.object_key_from_file_path(file_path_2) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): s3_client.put_object(bucket, file_path_1) s3_client.put_object(bucket, file_path_2) with reporter.step("Get list of object"): if list_type == "v1": list_obj = s3_client.list_objects(bucket) elif list_type == "v2": list_obj = s3_client.list_objects_v2(bucket) assert len(list_obj) == 2, "bucket should have 2 objects" assert list_obj.sort() == [file_name, file_name_2].sort(), f"bucket should have object key {file_name, file_name_2}" with reporter.step("Delete object"): delete_obj = s3_client.delete_object(bucket, file_name) if list_type == "v1": list_obj_1 = s3_client.list_objects(bucket, full_output=True) elif list_type == "v2": list_obj_1 = s3_client.list_objects_v2(bucket, full_output=True) contents = list_obj_1.get("Contents", []) assert len(contents) == 1, "bucket should have only 1 object" assert contents[0].get("Key") == file_name_2, f"bucket should have object key {file_name_2}" assert "DeleteMarker" in delete_obj.keys(), "Expected delete Marker" @allure.title("Put object (s3_client={s3_client})") def test_s3_put_object( self, s3_client: S3ClientWrapper, bucket: str, complex_object_size: ObjectSize, simple_object_size: ObjectSize, ): file_path_1 = generate_file(complex_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path_1) object_1_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} tag_key_1 = "tag1" tag_value_1 = uuid.uuid4() tag_1 = f"{tag_key_1}={tag_value_1}" object_2_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} tag_key_2 = "tag2" tag_value_2 = uuid.uuid4() tag_2 = f"{tag_key_2}={tag_value_2}" s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.SUSPENDED) with reporter.step("Put first object into bucket"): s3_client.put_object(bucket, file_path_1, metadata=object_1_metadata, tagging=tag_1) obj_head = s3_client.head_object(bucket, file_name) assert obj_head.get("Metadata") == object_1_metadata, "Metadata must be the same" got_tags = s3_client.get_object_tagging(bucket, file_name) assert got_tags, f"Expected tags, got {got_tags}" assert got_tags == [{"Key": tag_key_1, "Value": str(tag_value_1)}], "Tags must be the same" with reporter.step("Rewrite file into bucket"): file_path_2 = generate_file_with_content(simple_object_size.value, file_path=file_path_1) s3_client.put_object(bucket, file_path_2, metadata=object_2_metadata, tagging=tag_2) obj_head = s3_client.head_object(bucket, file_name) assert obj_head.get("Metadata") == object_2_metadata, "Metadata must be the same" got_tags_1 = s3_client.get_object_tagging(bucket, file_name) assert got_tags_1, f"Expected tags, got {got_tags_1}" assert got_tags_1 == [{"Key": tag_key_2, "Value": str(tag_value_2)}], "Tags must be the same" s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) file_path_3 = generate_file(complex_object_size.value) file_hash = get_file_hash(file_path_3) file_name_3 = s3_helper.object_key_from_file_path(file_path_3) object_3_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} tag_key_3 = "tag3" tag_value_3 = uuid.uuid4() tag_3 = f"{tag_key_3}={tag_value_3}" with reporter.step("Put third object into bucket"): version_id_1 = s3_client.put_object(bucket, file_path_3, metadata=object_3_metadata, tagging=tag_3) obj_head_3 = s3_client.head_object(bucket, file_name_3) assert obj_head_3.get("Metadata") == object_3_metadata, "Matadata must be the same" got_tags_3 = s3_client.get_object_tagging(bucket, file_name_3) assert got_tags_3, f"Expected tags, got {got_tags_3}" assert got_tags_3 == [{"Key": tag_key_3, "Value": str(tag_value_3)}], "Tags must be the same" with reporter.step("Put new version of file into bucket"): file_path_4 = generate_file_with_content(simple_object_size.value, file_path=file_path_3) version_id_2 = s3_client.put_object(bucket, file_path_4) versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == file_name_3} assert obj_versions == { version_id_1, version_id_2, }, f"Object should have versions: {version_id_1, version_id_2}" got_tags_4 = s3_client.get_object_tagging(bucket, file_name_3) assert not got_tags_4, "No tags expected" with reporter.step("Get object"): object_3 = s3_client.get_object(bucket, file_name_3, full_output=True) assert object_3.get("VersionId") == version_id_2, f"get object with version {version_id_2}" object_3 = s3_client.get_object(bucket, file_name_3) assert get_file_hash(file_path_4) == get_file_hash(object_3), "Hashes must be the same" with reporter.step("Get first version of object"): object_4 = s3_client.get_object(bucket, file_name_3, version_id_1, full_output=True) assert object_4.get("VersionId") == version_id_1, f"get object with version {version_id_1}" object_4 = s3_client.get_object(bucket, file_name_3, version_id_1) assert file_hash == get_file_hash(object_4), "Hashes must be the same" obj_head_3 = s3_client.head_object(bucket, file_name_3, version_id_1) assert obj_head_3.get("Metadata") == object_3_metadata, "Metadata must be the same" got_tags_3 = s3_client.get_object_tagging(bucket, file_name_3, version_id_1) assert got_tags_3, f"Expected tags, got {got_tags_3}" assert got_tags_3 == [{"Key": tag_key_3, "Value": str(tag_value_3)}], "Tags must be the same" @allure.title("Put object with ACL (versioning={bucket_versioning}, s3_client={s3_client})") @pytest.mark.parametrize("bucket_versioning", ["ENABLED", "SUSPENDED"]) def test_s3_put_object_acl( self, s3_client: S3ClientWrapper, bucket_versioning: Literal["ENABLED", "SUSPENDED"], bucket: str, complex_object_size: ObjectSize, simple_object_size: ObjectSize, second_wallet_public_key: str, ): file_path = generate_file(complex_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus[bucket_versioning]) with reporter.step("Put object with acl private"): s3_client.put_object(bucket, file_path, acl="private") object_grants = s3_client.get_object_acl(bucket, file_name) s3_helper.verify_acl_permissions(object_grants, PRIVATE_GRANTS) object = s3_client.get_object(bucket, file_name) assert get_file_hash(file_path) == get_file_hash(object), "Hashes must be the same" with reporter.step("[NEGATIVE] Put object with acl public-read"): generate_file_with_content(simple_object_size.value, file_path) with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL): s3_client.put_object(bucket, file_path, acl="public-read") with reporter.step("[NEGATIVE] Put object with acl public-read-write"): generate_file_with_content(simple_object_size.value, file_path) with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL): s3_client.put_object(bucket, file_path, acl="public-read-write") with reporter.step("[NEGATIVE] Put object with --grant-full-control id=mycanonicaluserid"): with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL): s3_client.put_object(bucket, file_path, grant_full_control=f"id={second_wallet_public_key}") with reporter.step("[NEGATIVE] Put object with --grant-read uri=http://acs.amazonaws.com/groups/global/AllUsers"): with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL): s3_client.put_object(bucket, file_path, grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers") @allure.title("Put object with lock-mode (s3_client={s3_client})") def test_s3_put_object_lock_mode( self, s3_client: S3ClientWrapper, complex_object_size: ObjectSize, simple_object_size: ObjectSize, ): file_path_1 = generate_file(complex_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path_1) bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put object with lock-mode GOVERNANCE lock-retain-until-date +1day, lock-legal-hold-status"): date_obj = datetime.utcnow() + timedelta(days=1) s3_client.put_object( bucket, file_path_1, object_lock_mode="GOVERNANCE", object_lock_retain_until_date=date_obj.strftime("%Y-%m-%dT%H:%M:%S"), object_lock_legal_hold_status="OFF", ) s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "GOVERNANCE", date_obj, "OFF") with reporter.step("Put new version of object with [--object-lock-mode COMPLIANCE] и [--object-lock-retain-until-date +3days]"): date_obj = datetime.utcnow() + timedelta(days=2) generate_file_with_content(simple_object_size.value, file_path=file_path_1) s3_client.put_object( bucket, file_path_1, object_lock_mode="COMPLIANCE", object_lock_retain_until_date=date_obj, ) s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "COMPLIANCE", date_obj, "OFF") with reporter.step("Put new version of object with [--object-lock-mode COMPLIANCE] и [--object-lock-retain-until-date +2days]"): date_obj = datetime.utcnow() + timedelta(days=3) generate_file_with_content(simple_object_size.value, file_path=file_path_1) s3_client.put_object( bucket, file_path_1, object_lock_mode="COMPLIANCE", object_lock_retain_until_date=date_obj, object_lock_legal_hold_status="ON", ) s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "COMPLIANCE", date_obj, "ON") with reporter.step("Put object with lock-mode"): with pytest.raises( Exception, match=r".*must both be supplied*", ): # x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied s3_client.put_object(bucket, file_path_1, object_lock_mode="COMPLIANCE") with reporter.step("Put object with lock-mode and past date"): date_obj = datetime.utcnow() - timedelta(days=3) with pytest.raises( Exception, match=r".*until date must be in the future*", ): # The retain until date must be in the future s3_client.put_object( bucket, file_path_1, object_lock_mode="COMPLIANCE", object_lock_retain_until_date=date_obj, ) @allure.title("Delete object & delete objects (s3_client={s3_client})") def test_s3_api_delete( self, s3_client: S3ClientWrapper, two_buckets: list[str], simple_object_size: ObjectSize, complex_object_size: ObjectSize, ): """ Check delete_object and delete_objects S3 API operation. From first bucket some objects deleted one by one. From second bucket some objects deleted all at once. """ max_obj_count = 20 max_delete_objects = 17 put_objects = [] file_paths = [] obj_sizes = [simple_object_size, complex_object_size] bucket_1, bucket_2 = two_buckets with reporter.step(f"Generate {max_obj_count} files"): for _ in range(max_obj_count): test_file = generate_file(random.choice(obj_sizes).value) file_paths.append(test_file) put_objects.append(s3_helper.object_key_from_file_path(test_file.path)) for i, bucket in enumerate([bucket_1, bucket_2], 1): with reporter.step(f"Put {max_obj_count} objects into bucket_{i}"): for file_path in file_paths: s3_client.put_object(bucket, file_path) with reporter.step(f"Check all objects put in bucket_{i} successfully"): bucket_objects = s3_client.list_objects_v2(bucket) assert set(put_objects) == set(bucket_objects), f"Expected all objects {put_objects} in objects list {bucket_objects}" with reporter.step("Delete some objects from bucket_1 one by one"): objects_to_delete_b1 = random.sample(put_objects, k=max_delete_objects) for obj in objects_to_delete_b1: s3_client.delete_object(bucket_1, obj) with reporter.step("Check deleted objects are not visible in bucket bucket_1"): bucket_objects = s3_client.list_objects_v2(bucket_1) assert set(put_objects).difference(set(objects_to_delete_b1)) == set( bucket_objects ), f"Expected all objects {put_objects} in objects list {bucket_objects}" for object_key in objects_to_delete_b1: with pytest.raises(Exception, match="The specified key does not exist"): s3_client.get_object(bucket_1, object_key) with reporter.step("Delete some objects from bucket_2 at once"): objects_to_delete_b2 = random.sample(put_objects, k=max_delete_objects) s3_client.delete_objects(bucket_2, objects_to_delete_b2) with reporter.step("Check deleted objects are not visible in bucket bucket_2"): objects_list = s3_client.list_objects_v2(bucket_2) assert set(put_objects).difference(set(objects_to_delete_b2)) == set( objects_list ), f"Expected all objects {put_objects} in objects list {bucket_objects}" for object_key in objects_to_delete_b2: with pytest.raises(Exception, match="The specified key does not exist"): s3_client.get_object(bucket_2, object_key) @allure.title("Sync directory (sync_type={sync_type}, s3_client={s3_client})") @pytest.mark.parametrize("s3_client", [AwsCliClient], indirect=True) @pytest.mark.parametrize("sync_type", ["sync", "cp"]) def test_s3_sync_dir( self, s3_client: S3ClientWrapper, sync_type: Literal["sync", "cp"], bucket: str, simple_object_size: ObjectSize, ): test_file_1 = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_1")) test_file_2 = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_2")) object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"} key_to_path = {"test_file_1": test_file_1.path, "test_file_2": test_file_2.path} generate_file_with_content(simple_object_size.value, test_file_1) generate_file_with_content(simple_object_size.value, test_file_2) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) if sync_type == "sync": s3_client.sync(bucket, os.path.dirname(test_file_1), metadata=object_metadata) elif sync_type == "cp": s3_client.cp(bucket, os.path.dirname(test_file_1), metadata=object_metadata) with reporter.step("Check objects are synced"): objects = s3_client.list_objects(bucket) assert set(key_to_path.keys()) == set(objects), f"Expected all abjects saved. Got {objects}" with reporter.step("Check these are the same objects"): for obj_key in objects: got_object = s3_client.get_object(bucket, obj_key) assert get_file_hash(got_object) == get_file_hash(key_to_path.get(obj_key)), "Expected hashes are the same" obj_head = s3_client.head_object(bucket, obj_key) assert obj_head.get("Metadata") == object_metadata, f"Metadata of object is {object_metadata}" object_grants = s3_client.get_object_acl(bucket, obj_key) s3_helper.verify_acl_permissions(object_grants, PRIVATE_GRANTS) @allure.title("Put 10 nested level object (s3_client={s3_client})") def test_s3_put_10_folder( self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize, ): key_characters_sample = string.ascii_letters + string.digits + "._-" with reporter.step("Put object"): test_file = generate_file(simple_object_size.value) obj_key = "/" + "/".join(["".join(random.choices(key_characters_sample, k=5)) for _ in range(10)]) + "/test_file_1" s3_client.put_object(bucket, test_file, obj_key) with reporter.step("Check object can be downloaded"): s3_client.get_object(bucket, obj_key) with reporter.step("Check object listing"): s3_helper.check_objects_in_bucket(s3_client, bucket, [obj_key]) @allure.title("Delete non-existing object from empty bucket (s3_client={s3_client})") def test_s3_delete_non_existing_object(self, s3_client: S3ClientWrapper, bucket: str): s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) obj_key = "fake_object_key" with reporter.step("Delete non-existing object"): delete_obj = s3_client.delete_object(bucket, obj_key) assert "DeleteMarker" not in delete_obj.keys(), "Delete markers should not be created" objects_list = s3_client.list_objects_versions(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" @allure.title("Delete the same object twice (s3_client={s3_client})") def test_s3_delete_twice(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) with reporter.step("Put object into one bucket"): s3_client.put_object(bucket, file_path) with reporter.step("Delete the object from the bucket"): delete_object = s3_client.delete_object(bucket, file_name) versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == file_name} assert obj_versions, f"Object versions were not found {versions}" assert "DeleteMarker" in delete_object.keys(), "Delete markers not found" with reporter.step("Delete the object from the bucket again"): delete_object_2nd_attempt = s3_client.delete_object(bucket, file_name) versions_2nd_attempt = s3_client.list_objects_versions(bucket) assert delete_object.keys() == delete_object_2nd_attempt.keys(), "Delete markers are not the same" # check that nothing was changed # checking here not VersionId only, but all data (for example LastModified) assert versions == versions_2nd_attempt, "Versions are not the same"