import logging import os from random import choice, choices import allure import pytest from frostfs_testlib import reporter from frostfs_testlib.resources.common import ASSETS_DIR from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper, VersioningStatus from frostfs_testlib.shell import Shell from frostfs_testlib.steps.epoch import tick_epoch from frostfs_testlib.steps.s3 import s3_helper from frostfs_testlib.storage.cluster import Cluster from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.utils.file_utils import ( TestFile, generate_file, generate_file_with_content, get_file_content, get_file_hash, split_file, ) logger = logging.getLogger("NeoLogger") @allure.link("https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw#frostfs-s3-gw", name="frostfs-s3-gateway") @pytest.mark.sanity @pytest.mark.s3_gate @pytest.mark.s3_gate_base class TestS3Gate: @allure.title("Bucket API (s3_client={s3_client})") def test_s3_buckets( self, s3_client: S3ClientWrapper, client_shell: Shell, cluster: Cluster, simple_object_size: ObjectSize, ): """ Test base S3 Bucket API (Create/List/Head/Delete). """ file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) with reporter.step("Create buckets"): bucket_1 = s3_client.create_bucket(object_lock_enabled_for_bucket=True) s3_helper.set_bucket_versioning(s3_client, bucket_1, VersioningStatus.ENABLED) bucket_2 = s3_client.create_bucket() with reporter.step("Check buckets are presented in the system"): buckets = s3_client.list_buckets() assert bucket_1 in buckets, f"Expected bucket {bucket_1} is in the list" assert bucket_2 in buckets, f"Expected bucket {bucket_2} is in the list" with reporter.step("Bucket must be empty"): for bucket in (bucket_1, bucket_2): objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" with reporter.step("Check buckets are visible with S3 head command"): s3_client.head_bucket(bucket_1) s3_client.head_bucket(bucket_2) with reporter.step("Check we can put/list object with S3 commands"): version_id = s3_client.put_object(bucket_1, file_path) s3_client.head_object(bucket_1, file_name) bucket_objects = s3_client.list_objects(bucket_1) assert file_name in bucket_objects, f"Expected file {file_name} in objects list {bucket_objects}" with reporter.step("Try to delete not empty bucket and get error"): with pytest.raises(Exception, match=r".*The bucket you tried to delete is not empty.*"): s3_client.delete_bucket(bucket_1) s3_client.head_bucket(bucket_1) with reporter.step(f"Delete empty bucket {bucket_2}"): s3_client.delete_bucket(bucket_2) tick_epoch(client_shell, cluster) with reporter.step(f"Check bucket {bucket_2} deleted"): with pytest.raises(Exception, match=r".*Not Found.*"): s3_client.head_bucket(bucket_2) buckets = s3_client.list_buckets() assert bucket_1 in buckets, f"Expected bucket {bucket_1} is in the list" assert bucket_2 not in buckets, f"Expected bucket {bucket_2} is not in the list" with reporter.step(f"Delete object from {bucket_1}"): s3_client.delete_object(bucket_1, file_name, version_id) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=[]) with reporter.step(f"Delete bucket {bucket_1}"): s3_client.delete_bucket(bucket_1) tick_epoch(client_shell, cluster) with reporter.step(f"Check bucket {bucket_1} deleted"): with pytest.raises(Exception, match=r".*Not Found.*"): s3_client.head_bucket(bucket_1) @allure.title("Object API (obj_size={object_size}, s3_client={s3_client})") @pytest.mark.parametrize( "object_size", ["simple", "complex"], indirect=True, ) def test_s3_api_object( self, s3_client: S3ClientWrapper, object_size: ObjectSize, two_buckets: tuple[str, str], ): """ Test base S3 Object API (Put/Head/List) for simple and complex objects. """ file_path = generate_file(object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) bucket_1, bucket_2 = two_buckets for bucket in (bucket_1, bucket_2): with reporter.step("Bucket must be empty"): objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" s3_client.put_object(bucket, file_path) s3_client.head_object(bucket, file_name) bucket_objects = s3_client.list_objects(bucket) assert file_name in bucket_objects, f"Expected file {file_name} in objects list {bucket_objects}" with reporter.step("Check object's attributes"): for attrs in (["ETag"], ["ObjectSize", "StorageClass"]): s3_client.get_object_attributes(bucket, file_name, attrs) @allure.title("Sync directory (s3_client={s3_client})") @pytest.mark.parametrize("s3_client", [AwsCliClient], indirect=True) def test_s3_sync_dir(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): """ Test checks sync directory with AWS CLI utility. """ test_file_1 = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_1")) test_file_2 = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_2")) key_to_path = {"test_file_1": test_file_1.path, "test_file_2": test_file_2.path} generate_file_with_content(simple_object_size.value, test_file_1) generate_file_with_content(simple_object_size.value, test_file_2) s3_client.sync(bucket, os.path.dirname(test_file_1)) with reporter.step("Check objects are synced"): objects = s3_client.list_objects(bucket) with reporter.step("Check these are the same objects"): assert set(key_to_path.keys()) == set(objects), f"Expected all objects saved. Got {objects}" for obj_key in objects: got_object = s3_client.get_object(bucket, obj_key) assert get_file_hash(got_object) == get_file_hash( key_to_path.get(obj_key) ), "Expected hashes are the same" @allure.title("Object versioning (s3_client={s3_client})") def test_s3_api_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): """ Test checks basic versioning functionality for S3 bucket. """ version_1_content = "Version 1" version_2_content = "Version 2" file_name_simple = generate_file_with_content(simple_object_size.value, content=version_1_content) obj_key = os.path.basename(file_name_simple) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with reporter.step("Put several versions of object into bucket"): version_id_1 = s3_client.put_object(bucket, file_name_simple) generate_file_with_content(simple_object_size.value, file_path=file_name_simple, content=version_2_content) version_id_2 = s3_client.put_object(bucket, file_name_simple) with reporter.step("Check bucket shows all versions"): versions = s3_client.list_objects_versions(bucket) obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key} assert obj_versions == { version_id_1, version_id_2, }, f"Expected object has versions: {version_id_1, version_id_2}" with reporter.step("Show information about particular version"): for version_id in (version_id_1, version_id_2): response = s3_client.head_object(bucket, obj_key, version_id=version_id) assert "LastModified" in response, "Expected LastModified field" assert "ETag" in response, "Expected ETag field" assert response.get("VersionId") == version_id, f"Expected VersionId is {version_id}" assert response.get("ContentLength") != 0, "Expected ContentLength is not zero" with reporter.step("Check object's attributes"): for version_id in (version_id_1, version_id_2): got_attrs = s3_client.get_object_attributes(bucket, obj_key, ["ETag"], version_id=version_id) if got_attrs: assert got_attrs.get("VersionId") == version_id, f"Expected VersionId is {version_id}" with reporter.step("Delete object and check it was deleted"): response = s3_client.delete_object(bucket, obj_key) version_id_delete = response.get("VersionId") with pytest.raises(Exception, match=r".*Not Found.*"): s3_client.head_object(bucket, obj_key) with reporter.step("Get content for all versions and check it is correct"): for version, content in ( (version_id_2, version_2_content), (version_id_1, version_1_content), ): file_name = s3_client.get_object(bucket, obj_key, version_id=version) got_content = get_file_content(file_name) assert got_content == content, f"Expected object content is\n{content}\nGot\n{got_content}" with reporter.step("Restore previous object version"): s3_client.delete_object(bucket, obj_key, version_id=version_id_delete) file_name = s3_client.get_object(bucket, obj_key) got_content = get_file_content(file_name) assert ( got_content == version_2_content ), f"Expected object content is\n{version_2_content}\nGot\n{got_content}" @pytest.mark.s3_gate_multipart @allure.title("Object Multipart API (s3_client={s3_client})") def test_s3_api_multipart(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): """ Test checks S3 Multipart API (Create multipart upload/Abort multipart upload/List multipart upload/ Upload part/List parts/Complete multipart upload). """ parts_count = 3 file_name_large = generate_file(simple_object_size.value * 1024 * 6 * parts_count) # 5Mb - min part object_key = s3_helper.object_key_from_file_path(file_name_large) part_files = split_file(file_name_large, parts_count) parts = [] uploads = s3_client.list_multipart_uploads(bucket) assert not uploads, f"Expected there is no uploads in bucket {bucket}" with reporter.step("Create and abort multipart upload"): upload_id = s3_client.create_multipart_upload(bucket, object_key) uploads = s3_client.list_multipart_uploads(bucket) assert uploads, f"Expected there one upload in bucket {bucket}" assert uploads[0].get("Key") == object_key, f"Expected correct key {object_key} in upload {uploads}" assert uploads[0].get("UploadId") == upload_id, f"Expected correct UploadId {upload_id} in upload {uploads}" s3_client.abort_multipart_upload(bucket, object_key, upload_id) uploads = s3_client.list_multipart_uploads(bucket) assert not uploads, f"Expected there is no uploads in bucket {bucket}" with reporter.step("Create new multipart upload and upload several parts"): upload_id = s3_client.create_multipart_upload(bucket, object_key) for part_id, file_path in enumerate(part_files, start=1): etag = s3_client.upload_part(bucket, object_key, upload_id, part_id, file_path) parts.append((part_id, etag)) with reporter.step("Check all parts are visible in bucket"): got_parts = s3_client.list_parts(bucket, object_key, upload_id) assert len(got_parts) == len(part_files), f"Expected {parts_count} parts, got\n{got_parts}" s3_client.complete_multipart_upload(bucket, object_key, upload_id, parts) uploads = s3_client.list_multipart_uploads(bucket) assert not uploads, f"Expected there is no uploads in bucket {bucket}" with reporter.step("Check we can get whole object from bucket"): got_object = s3_client.get_object(bucket, object_key) assert get_file_hash(got_object) == get_file_hash(file_name_large) self.check_object_attributes(s3_client, bucket, object_key, parts_count) @allure.title("Bucket tagging API (s3_client={s3_client})") def test_s3_api_bucket_tagging(self, s3_client: S3ClientWrapper, bucket: str): """ Test checks S3 Bucket tagging API (Put tag/Get tag). """ key_value_pair = [("some-key", "some-value"), ("some-key-2", "some-value-2")] s3_client.put_bucket_tagging(bucket, key_value_pair) s3_helper.check_tags_by_bucket(s3_client, bucket, key_value_pair) s3_client.delete_bucket_tagging(bucket) s3_helper.check_tags_by_bucket(s3_client, bucket, []) @allure.title("Object tagging API (s3_client={s3_client})") def test_s3_api_object_tagging(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize): """ Test checks S3 Object tagging API (Put tag/Get tag/Update tag). """ key_value_pair_bucket = [("some-key", "some-value"), ("some-key-2", "some-value-2")] key_value_pair_obj = [ ("some-key-obj", "some-value-obj"), ("some-key--obj2", "some-value--obj2"), ] key_value_pair_obj_new = [("some-key-obj-new", "some-value-obj-new")] file_name_simple = generate_file(simple_object_size.value) obj_key = s3_helper.object_key_from_file_path(file_name_simple) s3_client.put_bucket_tagging(bucket, key_value_pair_bucket) s3_client.put_object(bucket, file_name_simple) for tags in (key_value_pair_obj, key_value_pair_obj_new): s3_client.put_object_tagging(bucket, obj_key, tags) s3_helper.check_tags_by_object( s3_client, bucket, obj_key, tags, ) s3_client.delete_object_tagging(bucket, obj_key) s3_helper.check_tags_by_object(s3_client, bucket, obj_key, []) @allure.title("Delete object & delete objects (s3_client={s3_client})") def test_s3_api_delete( self, s3_client: S3ClientWrapper, two_buckets: tuple[str, str], simple_object_size: ObjectSize, complex_object_size: ObjectSize, ): """ Check delete_object and delete_objects S3 API operation. From first bucket some objects deleted one by one. From second bucket some objects deleted all at once. """ max_obj_count = 20 max_delete_objects = 17 put_objects = [] file_paths = [] obj_sizes = [simple_object_size, complex_object_size] bucket_1, bucket_2 = two_buckets with reporter.step(f"Generate {max_obj_count} files"): for _ in range(max_obj_count): file_paths.append(generate_file(choice(obj_sizes).value)) for bucket in (bucket_1, bucket_2): with reporter.step(f"Bucket {bucket} must be empty as it just created"): objects_list = s3_client.list_objects_v2(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" for file_path in file_paths: s3_client.put_object(bucket, file_path) put_objects.append(s3_helper.object_key_from_file_path(file_path)) with reporter.step(f"Check all objects put in bucket {bucket} successfully"): bucket_objects = s3_client.list_objects_v2(bucket) assert set(put_objects) == set( bucket_objects ), f"Expected all objects {put_objects} in objects list {bucket_objects}" with reporter.step("Delete some objects from bucket_1 one by one"): objects_to_delete_b1 = choices(put_objects, k=max_delete_objects) for obj in objects_to_delete_b1: s3_client.delete_object(bucket_1, obj) with reporter.step("Check deleted objects are not visible in bucket bucket_1"): bucket_objects = s3_client.list_objects_v2(bucket_1) assert set(put_objects).difference(set(objects_to_delete_b1)) == set( bucket_objects ), f"Expected all objects {put_objects} in objects list {bucket_objects}" for object_key in objects_to_delete_b1: with pytest.raises(Exception, match="The specified key does not exist"): s3_client.get_object(bucket_1, object_key) with reporter.step("Delete some objects from bucket_2 at once"): objects_to_delete_b2 = choices(put_objects, k=max_delete_objects) s3_client.delete_objects(bucket_2, objects_to_delete_b2) with reporter.step("Check deleted objects are not visible in bucket bucket_2"): objects_list = s3_client.list_objects_v2(bucket_2) assert set(put_objects).difference(set(objects_to_delete_b2)) == set( objects_list ), f"Expected all objects {put_objects} in objects list {bucket_objects}" for object_key in objects_to_delete_b2: with pytest.raises(Exception, match="The specified key does not exist"): s3_client.get_object(bucket_2, object_key) @allure.title("Copy object to the same bucket (s3_client={s3_client})") def test_s3_copy_same_bucket( self, s3_client: S3ClientWrapper, bucket: str, complex_object_size: ObjectSize, simple_object_size: ObjectSize, ): """ Test object can be copied to the same bucket. #TODO: delete after test_s3_copy_object will be merge """ file_path_simple = generate_file(simple_object_size.value) file_path_large = generate_file(complex_object_size.value) file_name_simple = s3_helper.object_key_from_file_path(file_path_simple) file_name_large = s3_helper.object_key_from_file_path(file_path_large) bucket_objects = [file_name_simple, file_name_large] with reporter.step("Bucket must be empty"): objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" with reporter.step("Put objects into bucket"): for file_path in (file_path_simple, file_path_large): s3_client.put_object(bucket, file_path) with reporter.step("Copy one object into the same bucket"): copy_obj_path = s3_client.copy_object(bucket, file_name_simple) bucket_objects.append(copy_obj_path) s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects) with reporter.step("Check copied object has the same content"): got_copied_file = s3_client.get_object(bucket, copy_obj_path) assert get_file_hash(file_path_simple) == get_file_hash(got_copied_file), "Hashes must be the same" with reporter.step("Delete one object from bucket"): s3_client.delete_object(bucket, file_name_simple) bucket_objects.remove(file_name_simple) s3_helper.check_objects_in_bucket( s3_client, bucket, expected_objects=bucket_objects, unexpected_objects=[file_name_simple], ) @allure.title("Copy object to another bucket (s3_client={s3_client})") def test_s3_copy_to_another_bucket( self, s3_client: S3ClientWrapper, two_buckets: tuple[str, str], complex_object_size: ObjectSize, simple_object_size: ObjectSize, ): """ Test object can be copied to another bucket. #TODO: delete after test_s3_copy_object will be merge """ file_path_simple = generate_file(simple_object_size.value) file_path_large = generate_file(complex_object_size.value) file_name_simple = s3_helper.object_key_from_file_path(file_path_simple) file_name_large = s3_helper.object_key_from_file_path(file_path_large) bucket_1_objects = [file_name_simple, file_name_large] bucket_1, bucket_2 = two_buckets with reporter.step("Buckets must be empty"): for bucket in (bucket_1, bucket_2): objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" with reporter.step("Put objects into one bucket"): for file_path in (file_path_simple, file_path_large): s3_client.put_object(bucket_1, file_path) with reporter.step("Copy object from first bucket into second"): copy_obj_path_b2 = s3_client.copy_object(bucket_1, file_name_large, bucket=bucket_2) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=bucket_1_objects) s3_helper.check_objects_in_bucket(s3_client, bucket_2, expected_objects=[copy_obj_path_b2]) with reporter.step("Check copied object has the same content"): got_copied_file_b2 = s3_client.get_object(bucket_2, copy_obj_path_b2) assert get_file_hash(file_path_large) == get_file_hash(got_copied_file_b2), "Hashes must be the same" with reporter.step("Delete one object from first bucket"): s3_client.delete_object(bucket_1, file_name_simple) bucket_1_objects.remove(file_name_simple) s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=bucket_1_objects) s3_helper.check_objects_in_bucket(s3_client, bucket_2, expected_objects=[copy_obj_path_b2]) with reporter.step("Delete one object from second bucket and check it is empty"): s3_client.delete_object(bucket_2, copy_obj_path_b2) s3_helper.check_objects_in_bucket(s3_client, bucket_2, expected_objects=[]) def check_object_attributes(self, s3_client: S3ClientWrapper, bucket: str, object_key: str, parts_count: int): if not isinstance(s3_client, AwsCliClient): logger.warning("Attributes check is not supported for boto3 implementation") return with reporter.step("Check object's attributes"): obj_parts = s3_client.get_object_attributes(bucket, object_key, ["ObjectParts"], full_output=False) assert obj_parts.get("TotalPartsCount") == parts_count, f"Expected TotalPartsCount is {parts_count}" assert len(obj_parts.get("Parts")) == parts_count, f"Expected Parts cunt is {parts_count}" with reporter.step("Check object's attribute max-parts"): max_parts = 2 obj_parts = s3_client.get_object_attributes( bucket, object_key, ["ObjectParts"], max_parts=max_parts, full_output=False, ) assert obj_parts.get("TotalPartsCount") == parts_count, f"Expected TotalPartsCount is {parts_count}" assert obj_parts.get("MaxParts") == max_parts, f"Expected MaxParts is {parts_count}" assert len(obj_parts.get("Parts")) == max_parts, f"Expected Parts count is {parts_count}" with reporter.step("Check object's attribute part-number-marker"): part_number_marker = 3 obj_parts = s3_client.get_object_attributes( bucket, object_key, ["ObjectParts"], part_number=part_number_marker, full_output=False, ) assert obj_parts.get("TotalPartsCount") == parts_count, f"Expected TotalPartsCount is {parts_count}" assert ( obj_parts.get("PartNumberMarker") == part_number_marker ), f"Expected PartNumberMarker is {part_number_marker}" assert len(obj_parts.get("Parts")) == 1, f"Expected Parts count is {parts_count}"