frostfs-testcases/pytest_tests/testsuites/services/s3_gate/test_s3_object.py
a.berezin e65d19f056
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
[#258] Speed up tests by removing cleanup and per test healthcheck
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-06-25 02:27:54 +03:00

936 lines
47 KiB
Python

import os
import random
import string
import uuid
from datetime import datetime, timedelta
from typing import Literal
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_PASS
from frostfs_testlib.resources.error_patterns import S3_BUCKET_DOES_NOT_ALLOW_ACL, S3_MALFORMED_XML_REQUEST
from frostfs_testlib.resources.s3_acl_grants import PRIVATE_GRANTS
from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.test_control import expect_not_raises
from frostfs_testlib.utils import wallet_utils
from frostfs_testlib.utils.file_utils import (
TestFile,
concat_files,
generate_file,
generate_file_with_content,
get_file_hash,
)
@pytest.mark.s3_gate
@pytest.mark.s3_gate_object
class TestS3GateObject:
@pytest.fixture
def second_wallet_public_key(self):
    """Create a fresh wallet file under ASSETS_DIR and yield its public key."""
    wallet_file_name = f"{uuid.uuid4()}.json"
    wallet_path = os.path.join(os.getcwd(), ASSETS_DIR, wallet_file_name)
    wallet_utils.init_wallet(wallet_path, DEFAULT_WALLET_PASS)
    # No teardown follows the yield: the wallet file is left in place.
    yield wallet_utils.get_wallet_public_key(wallet_path, DEFAULT_WALLET_PASS)
@allure.title("Object API (obj_size={object_size}, s3_client={s3_client})")
@pytest.mark.parametrize("object_size", ["simple", "complex"], indirect=True)
def test_s3_api_object(self, s3_client: S3ClientWrapper, object_size: ObjectSize, bucket: str):
    """
    Exercise the basic S3 object calls (put, head, list, get-object-attributes)
    for each parametrized object size.
    """
    with reporter.step("Prepare object to upload"):
        local_file = generate_file(object_size.value)
        object_key = s3_helper.object_key_from_file_path(local_file)

    with reporter.step("Put object to bucket"):
        s3_client.put_object(bucket, local_file)

    with reporter.step("Head object from bucket"):
        s3_client.head_object(bucket, object_key)

    with reporter.step("Verify object in list"):
        listed_keys = s3_client.list_objects(bucket)
        assert object_key in listed_keys, f"Expected file {object_key} in objects list {listed_keys}"

    with reporter.step("Check object's attributes"):
        # Request the attributes in two separate calls: one single-attribute, one multi-attribute.
        attribute_sets = (["ETag"], ["ObjectSize", "StorageClass"])
        for requested_attributes in attribute_sets:
            s3_client.get_object_attributes(bucket, object_key, requested_attributes)
@allure.title("Copy object (s3_client={s3_client})")
def test_s3_copy_object(self, s3_client: S3ClientWrapper, two_buckets: list[str], simple_object_size: ObjectSize):
    """
    Copy an object inside its bucket and into a second bucket, compare content hashes,
    then delete the source key and expect a further copy of it to raise.
    """
    source_file = generate_file(simple_object_size.value)
    source_key = s3_helper.object_key_from_file_path(source_file)
    expected_in_first = [source_key]
    first_bucket, second_bucket = two_buckets

    with reporter.step("Put object into one bucket"):
        s3_client.put_object(first_bucket, source_file)

    with reporter.step("Copy one object into the same bucket"):
        same_bucket_copy = s3_client.copy_object(first_bucket, source_key)
        expected_in_first.append(same_bucket_copy)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_in_first)
        # The second bucket has not been written to yet, so its listing must be empty.
        second_bucket_listing = s3_client.list_objects(second_bucket)
        assert not second_bucket_listing, f"Expected empty bucket, got {second_bucket_listing}"

    with reporter.step("Copy object from first bucket into second"):
        cross_bucket_copy = s3_client.copy_object(first_bucket, source_key, bucket=second_bucket)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=expected_in_first)
        s3_helper.check_objects_in_bucket(s3_client, second_bucket, expected_objects=[cross_bucket_copy])

    with reporter.step("Check copied object has the same content"):
        downloaded_copy = s3_client.get_object(second_bucket, cross_bucket_copy)
        assert get_file_hash(source_file) == get_file_hash(downloaded_copy), "Hashes must be the same"

    with reporter.step("Delete one object from first bucket"):
        s3_client.delete_object(first_bucket, source_key)
        expected_in_first.remove(source_key)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=expected_in_first)
        s3_helper.check_objects_in_bucket(s3_client, second_bucket, expected_objects=[cross_bucket_copy])

    with reporter.step("Copy one object into the same bucket"):
        # The source key was deleted in the previous step, so the copy is expected to fail.
        with pytest.raises(Exception):
            s3_client.copy_object(first_bucket, source_key)
@allure.title("Copy version of object (s3_client={s3_client})")
def test_s3_copy_version_object(
    self, s3_client: S3ClientWrapper, two_buckets: list[str], simple_object_size: ObjectSize
):
    """
    Repeat the copy scenario (same bucket, other bucket, copy after delete)
    with versioning enabled on both buckets.
    """
    source_file = generate_file_with_content(simple_object_size.value, content="Version 1")
    source_key = os.path.basename(source_file)
    first_bucket, second_bucket = two_buckets
    s3_helper.set_bucket_versioning(s3_client, first_bucket, VersioningStatus.ENABLED)

    with reporter.step("Put object into bucket"):
        s3_client.put_object(first_bucket, source_file)
        expected_in_first = [source_key]
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, [source_key])

    with reporter.step("Copy one object into the same bucket"):
        same_bucket_copy = s3_client.copy_object(first_bucket, source_key)
        expected_in_first.append(same_bucket_copy)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_in_first)

    s3_helper.set_bucket_versioning(s3_client, second_bucket, VersioningStatus.ENABLED)

    with reporter.step("Copy object from first bucket into second"):
        cross_bucket_copy = s3_client.copy_object(first_bucket, source_key, bucket=second_bucket)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=expected_in_first)
        s3_helper.check_objects_in_bucket(s3_client, second_bucket, expected_objects=[cross_bucket_copy])

    with reporter.step("Delete one object from first bucket and check object in bucket"):
        s3_client.delete_object(first_bucket, source_key)
        expected_in_first.remove(source_key)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=expected_in_first)

    with reporter.step("Copy one object into the same bucket"):
        # The key was deleted without a version id in the previous step; copying it must raise.
        with pytest.raises(Exception):
            s3_client.copy_object(first_bucket, source_key)
@allure.title("Copy with acl (s3_client={s3_client})")
def test_s3_copy_acl(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Check ACL handling of copy_object: a public-read-write copy must be rejected with
    S3_BUCKET_DOES_NOT_ALLOW_ACL, while a private copy succeeds and carries PRIVATE_GRANTS.
    """
    file_path = generate_file_with_content(simple_object_size.value)
    file_name = os.path.basename(file_path)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put object into bucket"):
        s3_client.put_object(bucket, file_path)
        s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name])

    with reporter.step("[NEGATIVE] Copy object with public-read-write ACL"):
        with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL):
            # The call is expected to raise, so its result is deliberately not bound to a name.
            s3_client.copy_object(bucket, file_name, acl="public-read-write")

    with reporter.step("Copy object with private ACL"):
        copy_path = s3_client.copy_object(bucket, file_name, acl="private")
        object_grants = s3_client.get_object_acl(bucket, copy_path)
        s3_helper.verify_acl_permissions(object_grants, PRIVATE_GRANTS)
@allure.title("Copy object with metadata (s3_client={s3_client})")
def test_s3_copy_metadate(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Check metadata on copies: a plain copy and a metadata_directive="COPY" copy must carry
    the source metadata, a metadata_directive="REPLACE" copy must carry the newly supplied metadata.
    """

    def assert_metadata(object_key, expected_metadata):
        # Head the given key and compare its user metadata with the expected mapping.
        head = s3_client.head_object(bucket, object_key)
        assert head.get("Metadata") == expected_metadata, f"Metadata must be {expected_metadata}"

    source_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}
    source_file = generate_file(simple_object_size.value)
    source_key = s3_helper.object_key_from_file_path(source_file)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put object into bucket"):
        s3_client.put_object(bucket, source_file, metadata=source_metadata)
        keys_in_bucket = [source_key]
        s3_helper.check_objects_in_bucket(s3_client, bucket, keys_in_bucket)

    with reporter.step("Copy one object"):
        copied_key = s3_client.copy_object(bucket, source_key)
        keys_in_bucket.append(copied_key)
        s3_helper.check_objects_in_bucket(s3_client, bucket, keys_in_bucket)
        assert_metadata(copied_key, source_metadata)

    with reporter.step("Copy one object with metadata"):
        copied_key = s3_client.copy_object(bucket, source_key, metadata_directive="COPY")
        keys_in_bucket.append(copied_key)
        assert_metadata(copied_key, source_metadata)

    with reporter.step("Copy one object with new metadata"):
        replacement_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}
        copied_key = s3_client.copy_object(
            bucket, source_key, metadata_directive="REPLACE", metadata=replacement_metadata
        )
        keys_in_bucket.append(copied_key)
        assert_metadata(copied_key, replacement_metadata)
@allure.title("Copy object with tagging (s3_client={s3_client})")
def test_s3_copy_tagging(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Check tags on copies: a plain copy and a tagging_directive="COPY" copy must carry
    the source tags, a tagging_directive="REPLACE" copy must carry the newly supplied tag.
    """

    def assert_tags_present(object_key, expected_tags):
        # Fetch the tag set of the key and require every expected tag to be in it.
        got_tags = s3_client.get_object_tagging(bucket, object_key)
        assert got_tags, f"Expected tags, got {got_tags}"
        for tag in expected_tags:
            assert tag in got_tags, f"Expected tag {tag} in {got_tags}"

    source_tagging = [(f"{uuid.uuid4()}", f"{uuid.uuid4()}")]
    source_file = generate_file(simple_object_size.value)
    source_key = s3_helper.object_key_from_file_path(source_file)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        s3_client.put_object(bucket, source_file)
        s3_client.put_object_tagging(bucket, source_key, tags=source_tagging)
        s3_helper.check_objects_in_bucket(s3_client, bucket, [source_key])

    with reporter.step("Copy one object without tag"):
        plain_copy_key = s3_client.copy_object(bucket, source_key)
        assert_tags_present(plain_copy_key, [{"Key": key, "Value": value} for key, value in source_tagging])

    with reporter.step("Copy one object with tag"):
        directive_copy_key = s3_client.copy_object(bucket, source_key, tagging_directive="COPY")
        assert_tags_present(directive_copy_key, [{"Key": key, "Value": value} for key, value in source_tagging])

    with reporter.step("Copy one object with new tag"):
        tag_key = "tag1"
        tag_value = uuid.uuid4()
        replaced_copy_key = s3_client.copy_object(
            bucket,
            source_key,
            tagging_directive="REPLACE",
            tagging=f"{tag_key}={tag_value}",
        )
        assert_tags_present(replaced_copy_key, [{"Key": tag_key, "Value": str(tag_value)}])
@allure.title("Delete version of object (s3_client={s3_client})")
def test_s3_delete_versioning(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    simple_object_size: ObjectSize,
    complex_object_size: ObjectSize,
):
    """
    Delete object versions by version id (no delete marker expected in the response),
    then delete a key without a version id (delete marker expected).
    """

    def listed_version_ids(key):
        # Version ids that the bucket's version listing currently reports for the given key.
        return {entry.get("VersionId") for entry in s3_client.list_objects_versions(bucket) if entry.get("Key") == key}

    file_name_simple = generate_file_with_content(simple_object_size.value, content="Version 1")
    obj_key = os.path.basename(file_name_simple)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, file_name_simple)
        # Rewrite the same local path so the second put creates a new version of the same key.
        rewritten_file = generate_file_with_content(simple_object_size.value, file_name_simple, "Version 2")
        version_id_2 = s3_client.put_object(bucket, rewritten_file)

    with reporter.step("Check bucket shows all versions"):
        both_versions = {version_id_1, version_id_2}
        assert listed_version_ids(obj_key) == both_versions, f"Object should have versions: {version_id_1, version_id_2}"

    with reporter.step("Delete 1 version of object"):
        delete_response = s3_client.delete_object(bucket, obj_key, version_id=version_id_1)
        assert listed_version_ids(obj_key) == {version_id_2}, f"Object should have versions: {version_id_2}"
        assert "DeleteMarker" not in delete_response.keys(), "Delete markers should not be created"

    with reporter.step("Delete second version of object"):
        delete_response = s3_client.delete_object(bucket, obj_key, version_id_2)
        assert not listed_version_ids(obj_key), "Expected object not found"
        assert "DeleteMarker" not in delete_response.keys(), "Delete markers should not be created"

    with reporter.step("Put new object into bucket"):
        file_name_complex = generate_file(complex_object_size.value)
        obj_key = os.path.basename(file_name_complex)
        s3_client.put_object(bucket, file_name_complex)

    with reporter.step("Delete last object"):
        delete_response = s3_client.delete_object(bucket, obj_key)
        full_listing = s3_client.list_objects_versions(bucket, True)
        assert full_listing.get("DeleteMarkers", None), "Expected delete Marker"
        assert "DeleteMarker" in delete_response.keys(), "Expected delete Marker"
@allure.title("Bulk delete version of object (s3_client={s3_client})")
def test_s3_bulk_delete_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Put four versions of one key, delete two randomly chosen versions one by one,
    and check that exactly the other two versions remain listed.
    """
    file_name_1 = generate_file_with_content(simple_object_size.value, content="Version 1")
    obj_key = os.path.basename(file_name_1)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        put_version_ids = [s3_client.put_object(bucket, file_name_1)]
        for content in ("Version 2", "Version 3", "Version 4"):
            # Rewrite the same local path so every put lands on the same key as a new version.
            rewritten_file = generate_file_with_content(simple_object_size.value, file_name_1, content)
            put_version_ids.append(s3_client.put_object(bucket, rewritten_file))
        version_ids = set(put_version_ids)

    with reporter.step("Check bucket shows all versions"):
        versions = s3_client.list_objects_versions(bucket)
        obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key}
        assert obj_versions == version_ids, f"Object should have versions: {version_ids}"

    with reporter.step("Delete two objects from bucket one by one"):
        version_to_delete_b1 = random.sample(put_version_ids, k=2)
        version_to_save = list(version_ids - set(version_to_delete_b1))
        for ver in version_to_delete_b1:
            s3_client.delete_object(bucket, obj_key, ver)

    with reporter.step("Check bucket shows all versions"):
        versions = s3_client.list_objects_versions(bucket)
        obj_versions = [version.get("VersionId") for version in versions if version.get("Key") == obj_key]
        # list.sort() sorts in place and returns None, so comparing two .sort() results is
        # always None == None; sorted() returns the ordered lists that actually need comparing.
        assert sorted(obj_versions) == sorted(version_to_save), f"Object should have versions: {version_to_save}"
@allure.title("Get versions of object (s3_client={s3_client})")
def test_s3_get_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Put two versions of one key and check the VersionId returned by get_object
    for each explicit version id and for a request without a version id.
    """

    def returned_version_id(*version):
        # *version is empty (no version argument) or a single version id passed positionally.
        return s3_client.get_object(bucket, obj_key, *version, full_output=True).get("VersionId")

    first_file = generate_file_with_content(simple_object_size.value, content="Version 1")
    obj_key = os.path.basename(first_file)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, first_file)
        second_file = generate_file_with_content(simple_object_size.value, file_path=first_file, content="Version 2")
        version_id_2 = s3_client.put_object(bucket, second_file)

    with reporter.step("Get first version of object"):
        assert returned_version_id(version_id_1) == version_id_1, f"Get object with version {version_id_1}"

    with reporter.step("Get second version of object"):
        assert returned_version_id(version_id_2) == version_id_2, f"Get object with version {version_id_2}"

    with reporter.step("Get object"):
        # Without a version id the most recently put version is expected.
        assert returned_version_id() == version_id_2, f"Get object with version {version_id_2}"
@allure.title("Get range (s3_client={s3_client})")
def test_s3_get_range(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """
    Download each version of an object as three consecutive byte ranges, concatenate
    the parts and compare the hash of the result with the hash of the uploaded file.
    """

    def split_into_thirds(total_size):
        # Three consecutive ranges: [0, t], [t + 1, 2t], [2t + 1, total_size] with t = int(total_size / 3).
        third = int(total_size / 3)
        return [0, third], [third + 1, 2 * third], [2 * third + 1, total_size]

    def download_in_thirds(total_size, *version):
        # *version is empty (no version argument) or a single version id passed positionally.
        parts = [
            s3_client.get_object(bucket, file_name, *version, object_range=byte_range)
            for byte_range in split_into_thirds(total_size)
        ]
        return concat_files(parts)

    file_path = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path)
    first_version_hash = get_file_hash(file_path)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, file_path)
        # The second version is written to the same local path, hence the hash captured above.
        second_version_file = generate_file_with_content(simple_object_size.value, file_path=file_path)
        version_id_2 = s3_client.put_object(bucket, second_version_file)

    with reporter.step("Get first version of object"):
        joined = download_in_thirds(complex_object_size.value, version_id_1)
        assert get_file_hash(joined) == first_version_hash, "Hashes must be the same"

    with reporter.step("Get second version of object"):
        joined = download_in_thirds(simple_object_size.value, version_id_2)
        assert get_file_hash(joined) == get_file_hash(second_version_file), "Hashes must be the same"

    with reporter.step("Get object"):
        joined = download_in_thirds(simple_object_size.value)
        assert get_file_hash(joined) == get_file_hash(second_version_file), "Hashes must be the same"
def copy_extend_list(self, original_list: list[str], n: int) -> list[str]:
    """
    Build a new list of n elements by repeating original_list cyclically.

    The input list is not modified. When n is smaller than the input length the
    first n elements are returned. An empty input or a non-positive n yields an
    empty list.

    Args:
        original_list: elements to repeat.
        n: requested length of the result.

    Returns:
        A new list holding the elements of original_list repeated in order.
    """
    from itertools import cycle, islice

    # max() keeps the earlier result of [] for negative n (islice rejects negative stops).
    return list(islice(cycle(original_list), max(n, 0)))
@allure.title("Bulk deletion is limited to 1000 objects (s3_client={s3_client})")
def test_s3_bulk_deletion_limit(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Check the bulk delete limit: a delete_objects request with 1001 keys must fail with
    S3_MALFORMED_XML_REQUEST, a request with 1000 keys must not raise.
    """
    objects_count = 3
    uploaded_keys = []

    with reporter.step(f"Put {objects_count} into bucket"):
        for _ in range(objects_count):
            local_file = generate_file(simple_object_size.value)
            uploaded_keys.append(s3_helper.object_key_from_file_path(local_file))
            s3_client.put_object(bucket, local_file)

    # Only a few real objects are uploaded; their keys are repeated to reach 1001
    # entries so the request size can be tested quickly.
    keys_to_delete = self.copy_extend_list(uploaded_keys, 1001)

    with reporter.step("Send delete request with 1001 objects and expect error"):
        with pytest.raises(Exception, match=S3_MALFORMED_XML_REQUEST):
            s3_client.delete_objects(bucket, keys_to_delete)

    with reporter.step("Send delete request with 1000 objects without error"):
        with expect_not_raises():
            s3_client.delete_objects(bucket, keys_to_delete[:1000])
@allure.title("Object head is unloaded with the correct version (s3_client={s3_client})")
@pytest.mark.smoke
def test_s3_head_object(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """
    Put two versions of one key (only the first with metadata) and check that head_object
    returns the second version without a version id and the first version when asked for it.
    """
    object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}
    file_path = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, file_path, metadata=object_metadata)
        file_name_1 = generate_file_with_content(simple_object_size.value, file_path=file_path)
        version_id_2 = s3_client.put_object(bucket, file_name_1)

    # Step title corrected: no version id is passed here and version_id_2 is asserted,
    # so this heads the latest version, not the first one.
    with reporter.step("Get head of latest version of object"):
        response = s3_client.head_object(bucket, file_name)
        assert "LastModified" in response, "Expected LastModified field"
        assert "ETag" in response, "Expected ETag field"
        # The second version was put without metadata.
        assert response.get("Metadata") == {}, "Expected Metadata empty"
        assert response.get("VersionId") == version_id_2, f"Expected VersionId is {version_id_2}"
        assert response.get("ContentLength") != 0, "Expected ContentLength is not zero"

    with reporter.step("Get head of first version of object"):
        response = s3_client.head_object(bucket, file_name, version_id=version_id_1)
        assert "LastModified" in response, "Expected LastModified field"
        assert "ETag" in response, "Expected ETag field"
        assert response.get("Metadata") == object_metadata, f"Expected Metadata is {object_metadata}"
        assert response.get("VersionId") == version_id_1, f"Expected VersionId is {version_id_1}"
        assert response.get("ContentLength") != 0, "Expected ContentLength is not zero"
@allure.title("List of objects with version (method_version={list_type}, s3_client={s3_client})")
@pytest.mark.parametrize("list_type", ["v1", "v2"])
def test_s3_list_object(
    self,
    s3_client: S3ClientWrapper,
    list_type: str,
    bucket: str,
    complex_object_size: ObjectSize,
):
    """
    Put two objects into a versioned bucket, list them with list_objects ("v1") or
    list_objects_v2 ("v2"), then delete one key and check only the other is still listed.
    """
    file_path_1 = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path_1)
    file_path_2 = generate_file(complex_object_size.value)
    file_name_2 = s3_helper.object_key_from_file_path(file_path_2)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    # Pick the listing call once; an unexpected list_type fails here with a KeyError
    # instead of leaving the result variable unbound further down.
    list_objects = {"v1": s3_client.list_objects, "v2": s3_client.list_objects_v2}[list_type]

    with reporter.step("Put several versions of object into bucket"):
        s3_client.put_object(bucket, file_path_1)
        s3_client.put_object(bucket, file_path_2)

    with reporter.step("Get list of object"):
        list_obj = list_objects(bucket)
        assert len(list_obj) == 2, "bucket should have 2 objects"
        # list.sort() returns None, so comparing two .sort() results always passes;
        # sorted() compares the actual key lists regardless of listing order.
        assert sorted(list_obj) == sorted(
            [file_name, file_name_2]
        ), f"bucket should have object key {file_name, file_name_2}"

    with reporter.step("Delete object"):
        delete_obj = s3_client.delete_object(bucket, file_name)
        list_obj_1 = list_objects(bucket, full_output=True)
        contents = list_obj_1.get("Contents", [])
        assert len(contents) == 1, "bucket should have only 1 object"
        assert contents[0].get("Key") == file_name_2, f"bucket should have object key {file_name_2}"
        assert "DeleteMarker" in delete_obj.keys(), "Expected delete Marker"
@allure.title("Put object (s3_client={s3_client})")
def test_s3_put_object(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """
    Check metadata and tags of put objects: first overwriting a key while versioning
    is suspended, then putting two versions of another key with versioning enabled.
    """

    def random_metadata():
        # Single-entry metadata mapping with a random key and a random value.
        return {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}

    def random_tag(tag_key):
        # Returns the "key=value" string for the put call and the tag list expected back.
        tag_value = uuid.uuid4()
        return f"{tag_key}={tag_value}", [{"Key": tag_key, "Value": str(tag_value)}]

    file_path_1 = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path_1)
    object_1_metadata = random_metadata()
    tag_1, expected_tags_1 = random_tag("tag1")
    object_2_metadata = random_metadata()
    tag_2, expected_tags_2 = random_tag("tag2")
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.SUSPENDED)

    with reporter.step("Put first object into bucket"):
        s3_client.put_object(bucket, file_path_1, metadata=object_1_metadata, tagging=tag_1)
        head = s3_client.head_object(bucket, file_name)
        assert head.get("Metadata") == object_1_metadata, "Metadata must be the same"
        got_tags = s3_client.get_object_tagging(bucket, file_name)
        assert got_tags, f"Expected tags, got {got_tags}"
        assert got_tags == expected_tags_1, "Tags must be the same"

    with reporter.step("Rewrite file into bucket"):
        file_path_2 = generate_file_with_content(simple_object_size.value, file_path=file_path_1)
        s3_client.put_object(bucket, file_path_2, metadata=object_2_metadata, tagging=tag_2)
        head = s3_client.head_object(bucket, file_name)
        assert head.get("Metadata") == object_2_metadata, "Metadata must be the same"
        got_tags_1 = s3_client.get_object_tagging(bucket, file_name)
        assert got_tags_1, f"Expected tags, got {got_tags_1}"
        assert got_tags_1 == expected_tags_2, "Tags must be the same"

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
    file_path_3 = generate_file(complex_object_size.value)
    # Hash taken now: the same local path is rewritten below for the next version.
    first_version_hash = get_file_hash(file_path_3)
    file_name_3 = s3_helper.object_key_from_file_path(file_path_3)
    object_3_metadata = random_metadata()
    tag_3, expected_tags_3 = random_tag("tag3")

    with reporter.step("Put third object into bucket"):
        version_id_1 = s3_client.put_object(bucket, file_path_3, metadata=object_3_metadata, tagging=tag_3)
        head_3 = s3_client.head_object(bucket, file_name_3)
        assert head_3.get("Metadata") == object_3_metadata, "Matadata must be the same"
        got_tags_3 = s3_client.get_object_tagging(bucket, file_name_3)
        assert got_tags_3, f"Expected tags, got {got_tags_3}"
        assert got_tags_3 == expected_tags_3, "Tags must be the same"

    with reporter.step("Put new version of file into bucket"):
        file_path_4 = generate_file_with_content(simple_object_size.value, file_path=file_path_3)
        version_id_2 = s3_client.put_object(bucket, file_path_4)
        listed_versions = s3_client.list_objects_versions(bucket)
        obj_versions = {entry.get("VersionId") for entry in listed_versions if entry.get("Key") == file_name_3}
        both_versions = {version_id_1, version_id_2}
        assert obj_versions == both_versions, f"Object should have versions: {version_id_1, version_id_2}"
        # The new version was put without tagging.
        got_tags_4 = s3_client.get_object_tagging(bucket, file_name_3)
        assert not got_tags_4, "No tags expected"

    with reporter.step("Get object"):
        latest_info = s3_client.get_object(bucket, file_name_3, full_output=True)
        assert latest_info.get("VersionId") == version_id_2, f"get object with version {version_id_2}"
        latest_file = s3_client.get_object(bucket, file_name_3)
        assert get_file_hash(file_path_4) == get_file_hash(latest_file), "Hashes must be the same"

    with reporter.step("Get first version of object"):
        first_info = s3_client.get_object(bucket, file_name_3, version_id_1, full_output=True)
        assert first_info.get("VersionId") == version_id_1, f"get object with version {version_id_1}"
        first_file = s3_client.get_object(bucket, file_name_3, version_id_1)
        assert first_version_hash == get_file_hash(first_file), "Hashes must be the same"
        head_3 = s3_client.head_object(bucket, file_name_3, version_id_1)
        assert head_3.get("Metadata") == object_3_metadata, "Metadata must be the same"
        got_tags_3 = s3_client.get_object_tagging(bucket, file_name_3, version_id_1)
        assert got_tags_3, f"Expected tags, got {got_tags_3}"
        assert got_tags_3 == expected_tags_3, "Tags must be the same"
@allure.title("Put object with ACL (versioning={bucket_versioning}, s3_client={s3_client})")
@pytest.mark.parametrize("bucket_versioning", ["ENABLED", "SUSPENDED"])
def test_s3_put_object_acl(
    self,
    s3_client: S3ClientWrapper,
    bucket_versioning: Literal["ENABLED", "SUSPENDED"],
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
    second_wallet_public_key: str,
):
    """
    Check ACL handling of put_object: a private put succeeds with PRIVATE_GRANTS, while
    public canned ACLs and explicit grants must be rejected with S3_BUCKET_DOES_NOT_ALLOW_ACL.
    """
    file_path = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus[bucket_versioning])

    with reporter.step("Put object with acl private"):
        s3_client.put_object(bucket, file_path, acl="private")
        object_grants = s3_client.get_object_acl(bucket, file_name)
        s3_helper.verify_acl_permissions(object_grants, PRIVATE_GRANTS)
        # Named downloaded_file rather than `object` so the builtin is not shadowed.
        downloaded_file = s3_client.get_object(bucket, file_name)
        assert get_file_hash(file_path) == get_file_hash(downloaded_file), "Hashes must be the same"

    with reporter.step("[NEGATIVE] Put object with acl public-read"):
        # Rewrite the local file in place before the attempted put.
        generate_file_with_content(simple_object_size.value, file_path)
        with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL):
            s3_client.put_object(bucket, file_path, acl="public-read")

    with reporter.step("[NEGATIVE] Put object with acl public-read-write"):
        generate_file_with_content(simple_object_size.value, file_path)
        with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL):
            s3_client.put_object(bucket, file_path, acl="public-read-write")

    with reporter.step("[NEGATIVE] Put object with --grant-full-control id=mycanonicaluserid"):
        with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL):
            s3_client.put_object(bucket, file_path, grant_full_control=f"id={second_wallet_public_key}")

    with reporter.step(
        "[NEGATIVE] Put object with --grant-read uri=http://acs.amazonaws.com/groups/global/AllUsers"
    ):
        with pytest.raises(Exception, match=S3_BUCKET_DOES_NOT_ALLOW_ACL):
            s3_client.put_object(
                bucket, file_path, grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers"
            )
@allure.title("Put object with lock-mode (s3_client={s3_client})")
def test_s3_put_object_lock_mode(
    self,
    s3_client: S3ClientWrapper,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """
    Put versions of one key with different object-lock settings into a bucket created with
    object lock enabled, check the resulting lock state, then check two rejected requests:
    lock mode without a retain-until date, and a retain-until date in the past.
    """
    file_path_1 = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path_1)
    bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put object with lock-mode GOVERNANCE lock-retain-until-date +1day, lock-legal-hold-status"):
        # NOTE(review): datetime.utcnow() returns a naive datetime and is deprecated since
        # Python 3.12; kept because the helper's expected datetime type is not visible here.
        date_obj = datetime.utcnow() + timedelta(days=1)
        s3_client.put_object(
            bucket,
            file_path_1,
            object_lock_mode="GOVERNANCE",
            object_lock_retain_until_date=date_obj.strftime("%Y-%m-%dT%H:%M:%S"),
            object_lock_legal_hold_status="OFF",
        )
        s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "GOVERNANCE", date_obj, "OFF")

    # Step title corrected to the retain period actually used below (+2 days).
    with reporter.step(
        "Put new version of object with [--object-lock-mode COMPLIANCE] and [--object-lock-retain-until-date +2days]"
    ):
        date_obj = datetime.utcnow() + timedelta(days=2)
        generate_file_with_content(simple_object_size.value, file_path=file_path_1)
        s3_client.put_object(
            bucket,
            file_path_1,
            object_lock_mode="COMPLIANCE",
            object_lock_retain_until_date=date_obj,
        )
        s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "COMPLIANCE", date_obj, "OFF")

    # Step title corrected to the retain period actually used below (+3 days).
    with reporter.step(
        "Put new version of object with [--object-lock-mode COMPLIANCE] and [--object-lock-retain-until-date +3days]"
    ):
        date_obj = datetime.utcnow() + timedelta(days=3)
        generate_file_with_content(simple_object_size.value, file_path=file_path_1)
        s3_client.put_object(
            bucket,
            file_path_1,
            object_lock_mode="COMPLIANCE",
            object_lock_retain_until_date=date_obj,
            object_lock_legal_hold_status="ON",
        )
        s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "COMPLIANCE", date_obj, "ON")

    with reporter.step("Put object with lock-mode"):
        # Expected error: x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must
        # both be supplied. pytest applies `match` with re.search, so no leading ".*" is needed;
        # the former trailing "*" only made the last letter optional.
        with pytest.raises(Exception, match=r"must both be supplied"):
            s3_client.put_object(bucket, file_path_1, object_lock_mode="COMPLIANCE")

    with reporter.step("Put object with lock-mode and past date"):
        date_obj = datetime.utcnow() - timedelta(days=3)
        # Expected error: The retain until date must be in the future
        with pytest.raises(Exception, match=r"until date must be in the future"):
            s3_client.put_object(
                bucket,
                file_path_1,
                object_lock_mode="COMPLIANCE",
                object_lock_retain_until_date=date_obj,
            )
@allure.title("Delete object & delete objects (s3_client={s3_client})")
def test_s3_api_delete(
    self,
    s3_client: S3ClientWrapper,
    two_buckets: list[str],
    simple_object_size: ObjectSize,
    complex_object_size: ObjectSize,
):
    """
    Check delete_object and delete_objects S3 API operations. From the first bucket some objects are deleted
    one by one. From the second bucket some objects are deleted all at once.

    The same generated files are put into both buckets. After each deletion the test checks that the bucket
    listing equals the put keys minus the deleted keys, and that getting any deleted key raises an error
    matching "The specified key does not exist".
    """
    max_obj_count = 20
    max_delete_objects = 17
    obj_sizes = [simple_object_size, complex_object_size]
    bucket_1, bucket_2 = two_buckets

    with reporter.step(f"Generate {max_obj_count} files"):
        # Each file gets a randomly chosen size (simple or complex).
        file_paths = [generate_file(random.choice(obj_sizes).value) for _ in range(max_obj_count)]
        put_objects = [s3_helper.object_key_from_file_path(test_file.path) for test_file in file_paths]

    for i, bucket in enumerate([bucket_1, bucket_2], 1):
        with reporter.step(f"Put {max_obj_count} objects into bucket_{i}"):
            for file_path in file_paths:
                s3_client.put_object(bucket, file_path)

        with reporter.step(f"Check all objects put in bucket_{i} successfully"):
            bucket_objects = s3_client.list_objects_v2(bucket)
            assert set(put_objects) == set(
                bucket_objects
            ), f"Expected all objects {put_objects} in objects list {bucket_objects}"

    with reporter.step("Delete some objects from bucket_1 one by one"):
        objects_to_delete_b1 = random.sample(put_objects, k=max_delete_objects)
        for obj in objects_to_delete_b1:
            s3_client.delete_object(bucket_1, obj)

    with reporter.step("Check deleted objects are not visible in bucket bucket_1"):
        bucket_objects = s3_client.list_objects_v2(bucket_1)
        remaining_b1 = set(put_objects) - set(objects_to_delete_b1)
        # Report the keys that were actually expected to remain, not the full put list.
        assert remaining_b1 == set(
            bucket_objects
        ), f"Expected only objects {sorted(remaining_b1)} in objects list {bucket_objects}"
        for object_key in objects_to_delete_b1:
            with pytest.raises(Exception, match="The specified key does not exist"):
                s3_client.get_object(bucket_1, object_key)

    with reporter.step("Delete some objects from bucket_2 at once"):
        objects_to_delete_b2 = random.sample(put_objects, k=max_delete_objects)
        s3_client.delete_objects(bucket_2, objects_to_delete_b2)

    with reporter.step("Check deleted objects are not visible in bucket bucket_2"):
        objects_list = s3_client.list_objects_v2(bucket_2)
        remaining_b2 = set(put_objects) - set(objects_to_delete_b2)
        # The message must show the listing of bucket_2 (objects_list), not the stale bucket_1 listing.
        assert remaining_b2 == set(
            objects_list
        ), f"Expected only objects {sorted(remaining_b2)} in objects list {objects_list}"
        for object_key in objects_to_delete_b2:
            with pytest.raises(Exception, match="The specified key does not exist"):
                s3_client.get_object(bucket_2, object_key)
@allure.title("Sync directory (sync_type={sync_type}, s3_client={s3_client})")
@pytest.mark.parametrize("s3_client", [AwsCliClient], indirect=True)
@pytest.mark.parametrize("sync_type", ["sync", "cp"])
def test_s3_sync_dir(
    self,
    s3_client: S3ClientWrapper,
    sync_type: Literal["sync", "cp"],
    bucket: str,
    simple_object_size: ObjectSize,
):
    """
    Upload a local directory with two generated files using either `sync` or `cp` of the S3 client.

    After the upload the test checks that the bucket lists exactly the two file names as keys, that each
    downloaded object has the same hash as its local file, that the head response carries the metadata
    passed on upload, and that the object ACL passes verification against PRIVATE_GRANTS.
    """
    test_file_1 = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_1"))
    test_file_2 = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_2"))
    object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}
    key_to_path = {"test_file_1": test_file_1.path, "test_file_2": test_file_2.path}
    # Both files live in the same directory, which is the unit passed to sync/cp.
    sync_dir = os.path.dirname(test_file_1)

    generate_file_with_content(simple_object_size.value, test_file_1)
    generate_file_with_content(simple_object_size.value, test_file_2)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    if sync_type == "sync":
        s3_client.sync(bucket, sync_dir, metadata=object_metadata)
    elif sync_type == "cp":
        s3_client.cp(bucket, sync_dir, metadata=object_metadata)

    with reporter.step("Check objects are synced"):
        objects = s3_client.list_objects(bucket)
        assert set(key_to_path.keys()) == set(objects), f"Expected all objects saved. Got {objects}"

    with reporter.step("Check these are the same objects"):
        for obj_key in objects:
            got_object = s3_client.get_object(bucket, obj_key)
            # The previous step guarantees every listed key is present in key_to_path.
            assert get_file_hash(got_object) == get_file_hash(
                key_to_path[obj_key]
            ), "Expected hashes are the same"

            obj_head = s3_client.head_object(bucket, obj_key)
            actual_metadata = obj_head.get("Metadata")
            assert (
                actual_metadata == object_metadata
            ), f"Expected metadata {object_metadata}, got {actual_metadata}"

            object_grants = s3_client.get_object_acl(bucket, obj_key)
            s3_helper.verify_acl_permissions(object_grants, PRIVATE_GRANTS)
@allure.title("Put 10 nested level object (s3_client={s3_client})")
def test_s3_put_10_folder(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    temp_directory,
    simple_object_size: ObjectSize,
):
    """
    Put an object whose local file is located under 10 nested, randomly named directories.

    The object key is derived from the local file path by s3_helper.object_key_from_file_path and the bucket
    content is then checked with s3_helper.check_objects_in_bucket.
    """
    nesting_depth = 10

    # Every directory name is three random ASCII letters.
    folder_names = []
    for _ in range(nesting_depth):
        folder_names.append("".join(random.choices(string.ascii_letters, k=3)))
    nested_dirs = "/".join(folder_names)

    local_file = TestFile(os.path.join(temp_directory, nested_dirs, "test_file_1"))
    generate_file_with_content(simple_object_size.value, file_path=local_file)
    object_key = s3_helper.object_key_from_file_path(local_file)

    with reporter.step("Put object"):
        s3_client.put_object(bucket, local_file)
        expected_keys = [object_key]
        s3_helper.check_objects_in_bucket(s3_client, bucket, expected_keys)
@allure.title("Delete non-existing object from empty bucket (s3_client={s3_client})")
def test_s3_delete_non_existing_object(self, s3_client: S3ClientWrapper, bucket: str):
    """
    Delete a key that was never put into a bucket with versioning enabled.

    The delete response must not contain a "DeleteMarker" entry and the versions listing of the bucket
    must be empty afterwards.
    """
    missing_key = "fake_object_key"
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Delete non-existing object"):
        delete_response = s3_client.delete_object(bucket, missing_key)
        marker_reported = "DeleteMarker" in delete_response.keys()
        assert not marker_reported, "Delete markers should not be created"

        versions_after_delete = s3_client.list_objects_versions(bucket)
        assert not versions_after_delete, f"Expected empty bucket, got {versions_after_delete}"
@allure.title("Delete the same object twice (s3_client={s3_client})")
def test_s3_delete_twice(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Delete the same object twice from a bucket with versioning enabled.

    The first delete must report a "DeleteMarker" entry and leave versions for the key in the listing.
    The second delete must return a response with the same set of keys and must leave the versions listing
    unchanged.
    """
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    local_file = generate_file(simple_object_size.value)
    object_key = s3_helper.object_key_from_file_path(local_file)

    with reporter.step("Put object into one bucket"):
        s3_client.put_object(bucket, local_file)

    with reporter.step("Delete the object from the bucket"):
        first_delete = s3_client.delete_object(bucket, object_key)
        versions_after_first = s3_client.list_objects_versions(bucket)

        # Collect version ids that belong to the deleted key only.
        key_version_ids = set()
        for entry in versions_after_first:
            if entry.get("Key") == object_key:
                key_version_ids.add(entry.get("VersionId"))

        assert key_version_ids, f"Object versions were not found {versions_after_first}"
        assert "DeleteMarker" in first_delete.keys(), "Delete markers not found"

    with reporter.step("Delete the object from the bucket again"):
        second_delete = s3_client.delete_object(bucket, object_key)
        versions_after_second = s3_client.list_objects_versions(bucket)

        assert first_delete.keys() == second_delete.keys(), "Delete markers are not the same"
        # Nothing may change between the two deletes: the whole listing is compared,
        # not only VersionId (for example LastModified is compared as well).
        assert versions_after_first == versions_after_second, "Versions are not the same"