frostfs-testcases/pytest_tests/testsuites/services/s3_gate/test_s3_object.py
Andrey Berezin 873d6e3d14 [#161] Improve logging
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-11-29 16:34:59 +03:00

894 lines
46 KiB
Python

import os
import string
import uuid
from datetime import datetime, timedelta
from random import choices, sample
from typing import Literal
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_PASS
from frostfs_testlib.resources.error_patterns import S3_MALFORMED_XML_REQUEST
from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.test_control import expect_not_raises
from frostfs_testlib.utils import wallet_utils
from frostfs_testlib.utils.file_utils import concat_files, generate_file, generate_file_with_content, get_file_hash
@pytest.mark.s3_gate
@pytest.mark.s3_gate_object
class TestS3GateObject:
@pytest.fixture
def second_wallet_public_key(self):
    """Initialise a new wallet file and yield its public key.

    The wallet path is built from the current working directory, ASSETS_DIR
    and a random ``<uuid>.json`` file name.
    """
    wallet_file_name = f"{uuid.uuid4()}.json"
    wallet_path = os.path.join(os.getcwd(), ASSETS_DIR, wallet_file_name)
    wallet_utils.init_wallet(wallet_path, DEFAULT_WALLET_PASS)
    yield wallet_utils.get_wallet_public_key(wallet_path, DEFAULT_WALLET_PASS)
@allure.title("Copy object (s3_client={s3_client})")
def test_s3_copy_object(
    self,
    s3_client: S3ClientWrapper,
    two_buckets: tuple[str, str],
    simple_object_size: ObjectSize,
):
    """Copy an object inside one bucket and from one bucket into another.

    Also checks that the cross-bucket copy has the same hash as the source
    file and that copying the key after it was deleted raises.
    """
    source_path = generate_file(simple_object_size.value)
    source_key = s3_helper.object_key_from_file_path(source_path)
    first_bucket, second_bucket = two_buckets
    # Keys expected in the first bucket; kept in sync with every mutation below.
    first_bucket_keys = [source_key]

    listing = s3_client.list_objects(first_bucket)
    assert not listing, f"Expected empty bucket, got {listing}"

    with reporter.step("Put object into one bucket"):
        s3_client.put_object(first_bucket, source_path)

    with reporter.step("Copy one object into the same bucket"):
        same_bucket_copy = s3_client.copy_object(first_bucket, source_key)
        first_bucket_keys.append(same_bucket_copy)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, first_bucket_keys)

    listing = s3_client.list_objects(second_bucket)
    assert not listing, f"Expected empty bucket, got {listing}"

    with reporter.step("Copy object from first bucket into second"):
        cross_bucket_copy = s3_client.copy_object(first_bucket, source_key, bucket=second_bucket)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=first_bucket_keys)
        s3_helper.check_objects_in_bucket(s3_client, second_bucket, expected_objects=[cross_bucket_copy])

    with reporter.step("Check copied object has the same content"):
        downloaded_copy = s3_client.get_object(second_bucket, cross_bucket_copy)
        source_hash = get_file_hash(source_path)
        copy_hash = get_file_hash(downloaded_copy)
        assert source_hash == copy_hash, "Hashes must be the same"

    with reporter.step("Delete one object from first bucket"):
        s3_client.delete_object(first_bucket, source_key)
        first_bucket_keys.remove(source_key)

    s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=first_bucket_keys)
    s3_helper.check_objects_in_bucket(s3_client, second_bucket, expected_objects=[cross_bucket_copy])

    with reporter.step("Copy one object into the same bucket"):
        # The source key was deleted above, so the copy is expected to fail.
        with pytest.raises(Exception):
            s3_client.copy_object(first_bucket, source_key)
@allure.title("Copy version of object (s3_client={s3_client})")
def test_s3_copy_version_object(
    self,
    s3_client: S3ClientWrapper,
    two_buckets: tuple[str, str],
    simple_object_size: ObjectSize,
):
    """Copy an object within and between buckets that have versioning enabled.

    Finishes by deleting the source key and expecting a further copy of it
    to raise.
    """
    source_path = generate_file_with_content(simple_object_size.value, content="Version 1")
    source_key = os.path.basename(source_path)
    first_bucket, second_bucket = two_buckets

    s3_helper.set_bucket_versioning(s3_client, first_bucket, VersioningStatus.ENABLED)

    with reporter.step("Put object into bucket"):
        s3_client.put_object(first_bucket, source_path)
        # Keys expected in the first bucket from here on.
        first_bucket_keys = [source_key]
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, [source_key])

    with reporter.step("Copy one object into the same bucket"):
        same_bucket_copy = s3_client.copy_object(first_bucket, source_key)
        first_bucket_keys.append(same_bucket_copy)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, first_bucket_keys)

    s3_helper.set_bucket_versioning(s3_client, second_bucket, VersioningStatus.ENABLED)

    with reporter.step("Copy object from first bucket into second"):
        cross_bucket_copy = s3_client.copy_object(first_bucket, source_key, bucket=second_bucket)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=first_bucket_keys)
        s3_helper.check_objects_in_bucket(s3_client, second_bucket, expected_objects=[cross_bucket_copy])

    with reporter.step("Delete one object from first bucket and check object in bucket"):
        s3_client.delete_object(first_bucket, source_key)
        first_bucket_keys.remove(source_key)
        s3_helper.check_objects_in_bucket(s3_client, first_bucket, expected_objects=first_bucket_keys)

    with reporter.step("Copy one object into the same bucket"):
        # The source key was deleted in the previous step.
        with pytest.raises(Exception):
            s3_client.copy_object(first_bucket, source_key)
@allure.title("Copy with acl (s3_client={s3_client})")
def test_s3_copy_acl(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """Copy an object with ``acl="public-read-write"`` and check the copy's ACL grants."""
    source_path = generate_file_with_content(simple_object_size.value, content="Version 1")
    source_key = os.path.basename(source_path)

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        s3_client.put_object(bucket, source_path)
        s3_helper.check_objects_in_bucket(s3_client, bucket, [source_key])

    with reporter.step("Copy object and check acl attribute"):
        copied_key = s3_client.copy_object(bucket, source_key, acl="public-read-write")
        copied_acl = s3_client.get_object_acl(bucket, copied_key)
        s3_helper.assert_s3_acl(acl_grants=copied_acl, permitted_users="CanonicalUser")
@allure.title("Copy object with metadata (s3_client={s3_client})")
def test_s3_copy_metadate(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """Copy an object with no directive, with COPY, and with REPLACE metadata.

    The first two copies must report the source metadata from ``head_object``;
    the REPLACE copy must report the newly supplied metadata.
    """
    source_metadata = {str(uuid.uuid4()): str(uuid.uuid4())}
    source_path = generate_file(simple_object_size.value)
    source_key = s3_helper.object_key_from_file_path(source_path)
    # Keys expected in the bucket; extended after every copy.
    expected_keys = [source_key]

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put object into bucket"):
        s3_client.put_object(bucket, source_path, metadata=source_metadata)
        s3_helper.check_objects_in_bucket(s3_client, bucket, expected_keys)

    with reporter.step("Copy one object"):
        copied_key = s3_client.copy_object(bucket, source_key)
        expected_keys.append(copied_key)
        s3_helper.check_objects_in_bucket(s3_client, bucket, expected_keys)
        head = s3_client.head_object(bucket, copied_key)
        assert head.get("Metadata") == source_metadata, f"Metadata must be {source_metadata}"

    with reporter.step("Copy one object with metadata"):
        copied_key = s3_client.copy_object(bucket, source_key, metadata_directive="COPY")
        expected_keys.append(copied_key)
        head = s3_client.head_object(bucket, copied_key)
        assert head.get("Metadata") == source_metadata, f"Metadata must be {source_metadata}"

    with reporter.step("Copy one object with new metadata"):
        replacement_metadata = {str(uuid.uuid4()): str(uuid.uuid4())}
        copied_key = s3_client.copy_object(
            bucket,
            source_key,
            metadata_directive="REPLACE",
            metadata=replacement_metadata,
        )
        expected_keys.append(copied_key)
        head = s3_client.head_object(bucket, copied_key)
        assert head.get("Metadata") == replacement_metadata, f"Metadata must be {replacement_metadata}"
@allure.title("Copy object with tagging (s3_client={s3_client})")
def test_s3_copy_tagging(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """Copy a tagged object with no directive, with COPY, and with REPLACE tagging.

    The first two copies must carry the source tag; the REPLACE copy must
    carry the newly supplied tag.
    """
    source_tags = [(str(uuid.uuid4()), str(uuid.uuid4()))]
    source_path = generate_file(simple_object_size.value)
    source_key = s3_helper.object_key_from_file_path(source_path)
    # Tag dicts in the form returned by get_object_tagging for the source tags.
    source_tag_dicts = [{"Key": key, "Value": value} for key, value in source_tags]

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        s3_client.put_object(bucket, source_path)
        s3_client.put_object_tagging(bucket, source_key, tags=source_tags)
        expected_keys = [source_key]
        s3_helper.check_objects_in_bucket(s3_client, bucket, expected_keys)

    with reporter.step("Copy one object without tag"):
        plain_copy_key = s3_client.copy_object(bucket, source_key)
        got_tags = s3_client.get_object_tagging(bucket, plain_copy_key)
        assert got_tags, f"Expected tags, got {got_tags}"
        for tag in source_tag_dicts:
            assert tag in got_tags, f"Expected tag {tag} in {got_tags}"

    with reporter.step("Copy one object with tag"):
        tagged_copy_key = s3_client.copy_object(bucket, source_key, tagging_directive="COPY")
        got_tags = s3_client.get_object_tagging(bucket, tagged_copy_key)
        assert got_tags, f"Expected tags, got {got_tags}"
        for tag in source_tag_dicts:
            assert tag in got_tags, f"Expected tag {tag} in {got_tags}"

    with reporter.step("Copy one object with new tag"):
        tag_key = "tag1"
        tag_value = uuid.uuid4()
        replaced_copy_key = s3_client.copy_object(
            bucket,
            source_key,
            tagging_directive="REPLACE",
            tagging=f"{tag_key}={tag_value}",
        )
        got_tags = s3_client.get_object_tagging(bucket, replaced_copy_key)
        assert got_tags, f"Expected tags, got {got_tags}"
        for tag in [{"Key": tag_key, "Value": str(tag_value)}]:
            assert tag in got_tags, f"Expected tag {tag} in {got_tags}"
@allure.title("Delete version of object (s3_client={s3_client})")
def test_s3_delete_versioning(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    simple_object_size: ObjectSize,
    complex_object_size: ObjectSize,
):
    """Delete object versions one by one, then delete an object without a version id.

    Deleting by version id must not report a delete marker; deleting without
    a version id must.
    """

    def listed_version_ids(key: str) -> set:
        # VersionIds currently listed in the bucket for the given key.
        listed = s3_client.list_objects_versions(bucket)
        return {entry.get("VersionId") for entry in listed if entry.get("Key") == key}

    first_path = generate_file_with_content(simple_object_size.value, content="Version 1")
    obj_key = os.path.basename(first_path)

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, first_path)
        # Rewrite the same file so the second put creates a new version of the key.
        second_path = generate_file_with_content(
            simple_object_size.value, file_path=first_path, content="Version 2"
        )
        version_id_2 = s3_client.put_object(bucket, second_path)

    with reporter.step("Check bucket shows all versions"):
        obj_versions = listed_version_ids(obj_key)
        assert obj_versions == {
            version_id_1,
            version_id_2,
        }, f"Object should have versions: {version_id_1, version_id_2}"

    with reporter.step("Delete 1 version of object"):
        delete_result = s3_client.delete_object(bucket, obj_key, version_id=version_id_1)
        obj_versions = listed_version_ids(obj_key)
        assert obj_versions == {version_id_2}, f"Object should have versions: {version_id_2}"
        assert "DeleteMarker" not in delete_result.keys(), "Delete markers should not be created"

    with reporter.step("Delete second version of object"):
        delete_result = s3_client.delete_object(bucket, obj_key, version_id=version_id_2)
        obj_versions = listed_version_ids(obj_key)
        assert not obj_versions, "Expected object not found"
        assert "DeleteMarker" not in delete_result.keys(), "Delete markers should not be created"

    with reporter.step("Put new object into bucket"):
        new_path = generate_file(complex_object_size.value)
        obj_key = os.path.basename(new_path)
        s3_client.put_object(bucket, new_path)

    with reporter.step("Delete last object"):
        delete_result = s3_client.delete_object(bucket, obj_key)
        full_listing = s3_client.list_objects_versions(bucket, True)
        assert full_listing.get("DeleteMarkers"), "Expected delete Marker"
        assert "DeleteMarker" in delete_result.keys(), "Expected delete Marker"
@allure.title("Bulk delete version of object (s3_client={s3_client})")
def test_s3_bulk_delete_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """Put four versions of one key, delete two of them, and check the remaining versions.

    Two randomly chosen version ids are deleted one by one; the version
    listing must then contain exactly the two ids that were not deleted.
    """
    first_path = generate_file_with_content(simple_object_size.value, content="Version 1")
    obj_key = os.path.basename(first_path)

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        uploaded_version_ids = [s3_client.put_object(bucket, first_path)]
        # Rewrite the same file for each further version so the key stays the same.
        for content in ("Version 2", "Version 3", "Version 4"):
            rewritten_path = generate_file_with_content(
                simple_object_size.value, file_path=first_path, content=content
            )
            uploaded_version_ids.append(s3_client.put_object(bucket, rewritten_path))
        version_ids = set(uploaded_version_ids)

    with reporter.step("Check bucket shows all versions"):
        versions = s3_client.list_objects_versions(bucket)
        obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == obj_key}
        assert obj_versions == version_ids, f"Object should have versions: {version_ids}"

    with reporter.step("Delete two objects from bucket one by one"):
        version_to_delete_b1 = sample(uploaded_version_ids, k=2)
        version_to_save = list(version_ids - set(version_to_delete_b1))
        for ver in version_to_delete_b1:
            s3_client.delete_object(bucket, obj_key, ver)

    with reporter.step("Check bucket shows all versions"):
        versions = s3_client.list_objects_versions(bucket)
        obj_versions = [version.get("VersionId") for version in versions if version.get("Key") == obj_key]
        # list.sort() returns None, so comparing two .sort() results is always
        # True; sorted() returns the ordered lists and makes the check real.
        assert sorted(obj_versions) == sorted(version_to_save), f"Object should have versions: {version_to_save}"
@allure.title("Get versions of object (s3_client={s3_client})")
def test_s3_get_versioning(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """Put two versions of a key and check which VersionId ``get_object`` reports.

    Explicit version ids must be echoed back; a get without a version id
    must report the second (most recently put) version.
    """
    first_path = generate_file_with_content(simple_object_size.value, content="Version 1")
    obj_key = os.path.basename(first_path)

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, first_path)
        # Rewrite the same file so the second put creates a new version of the key.
        second_path = generate_file_with_content(
            simple_object_size.value, file_path=first_path, content="Version 2"
        )
        version_id_2 = s3_client.put_object(bucket, second_path)

    with reporter.step("Get first version of object"):
        first_response = s3_client.get_object(bucket, obj_key, version_id_1, full_output=True)
        assert first_response.get("VersionId") == version_id_1, f"Get object with version {version_id_1}"

    with reporter.step("Get second version of object"):
        second_response = s3_client.get_object(bucket, obj_key, version_id_2, full_output=True)
        assert second_response.get("VersionId") == version_id_2, f"Get object with version {version_id_2}"

    with reporter.step("Get object"):
        latest_response = s3_client.get_object(bucket, obj_key, full_output=True)
        assert latest_response.get("VersionId") == version_id_2, f"Get object with version {version_id_2}"
@allure.title("Get range (s3_client={s3_client})")
def test_s3_get_range(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """Download an object in three ranges and compare the joined parts with the source hash.

    Done for the first version, for the second version, and for a get
    without a version id (compared against the second version's file).
    """

    def three_ranges(total_size):
        # The three [start, end] pairs requested for an object of the given size.
        one_third = int(total_size / 3)
        return [
            [0, one_third],
            [one_third + 1, 2 * one_third],
            [2 * one_third + 1, total_size],
        ]

    source_path = generate_file(complex_object_size.value)
    object_key = s3_helper.object_key_from_file_path(source_path)
    # Hash is taken now because the file is rewritten below for the second version.
    first_version_hash = get_file_hash(source_path)

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, source_path)
        second_version_path = generate_file_with_content(simple_object_size.value, file_path=source_path)
        version_id_2 = s3_client.put_object(bucket, second_version_path)

    with reporter.step("Get first version of object"):
        first_version_parts = [
            s3_client.get_object(bucket, object_key, version_id_1, object_range=byte_range)
            for byte_range in three_ranges(complex_object_size.value)
        ]
        joined_first = concat_files(first_version_parts)
        assert get_file_hash(joined_first) == first_version_hash, "Hashes must be the same"

    with reporter.step("Get second version of object"):
        second_version_parts = [
            s3_client.get_object(bucket, object_key, version_id_2, object_range=byte_range)
            for byte_range in three_ranges(simple_object_size.value)
        ]
        joined_second = concat_files(second_version_parts)
        assert get_file_hash(joined_second) == get_file_hash(second_version_path), "Hashes must be the same"

    with reporter.step("Get object"):
        latest_parts = [
            s3_client.get_object(bucket, object_key, object_range=byte_range)
            for byte_range in three_ranges(simple_object_size.value)
        ]
        joined_latest = concat_files(latest_parts)
        assert get_file_hash(joined_latest) == get_file_hash(second_version_path), "Hashes must be the same"
def copy_extend_list(self, original_list: list[str], n: int) -> list[str]:
    """Return a new list of ``n`` elements built by cycling through ``original_list``.

    The input list is not modified. When ``n`` is smaller than the length of
    ``original_list`` the first ``n`` elements are returned; when ``n`` is
    zero or negative the result is empty.

    Raises:
        ValueError: if ``original_list`` is empty and ``n`` is positive,
            because there is nothing to repeat.
    """
    if n <= 0:
        return []
    if not original_list:
        raise ValueError("Cannot extend an empty list")
    # Whole repetitions plus a leading slice covers both n >= len and n < len.
    full_repeats, remainder = divmod(n, len(original_list))
    return original_list * full_repeats + original_list[:remainder]
@allure.title("Bulk deletion is limited to 1000 objects (s3_client={s3_client})")
def test_s3_bulk_deletion_limit(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    simple_object_size: ObjectSize,
):
    """Check that a delete request with 1001 keys is rejected and one with 1000 keys is not."""
    objects_count = 3
    uploaded_keys = []

    with reporter.step(f"Put {objects_count} into bucket"):
        for _ in range(objects_count):
            local_path = generate_file(simple_object_size.value)
            uploaded_keys.append(s3_helper.object_key_from_file_path(local_path))
            s3_client.put_object(bucket, local_path)

    # Only a few real objects are uploaded; their keys are repeated to reach
    # 1001 entries so the test stays fast.
    oversized_key_list = self.copy_extend_list(uploaded_keys, 1001)

    with reporter.step("Send delete request with 1001 objects and expect error"):
        with pytest.raises(Exception, match=S3_MALFORMED_XML_REQUEST):
            s3_client.delete_objects(bucket, oversized_key_list)

    with reporter.step("Send delete request with 1000 objects without error"):
        with expect_not_raises():
            s3_client.delete_objects(bucket, oversized_key_list[:1000])
@allure.title("Object head is unloaded with the correct version (s3_client={s3_client})")
@pytest.mark.smoke
def test_s3_head_object(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """Check ``head_object`` output for the latest and for an explicit older version.

    The first version is put with metadata and the second without, so the
    head without a version id must report the second version with empty
    metadata, and the head of the first version must report its metadata.
    """
    object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}
    file_path = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path)

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        version_id_1 = s3_client.put_object(bucket, file_path, metadata=object_metadata)
        # Rewrite the same file so the second put creates a new version of the key.
        file_name_1 = generate_file_with_content(simple_object_size.value, file_path=file_path)
        version_id_2 = s3_client.put_object(bucket, file_name_1)

    # No version id is passed here, so this step inspects the latest version;
    # the step title previously said "first version".
    with reporter.step("Get head of latest version of object"):
        response = s3_client.head_object(bucket, file_name)
        assert "LastModified" in response, "Expected LastModified field"
        assert "ETag" in response, "Expected ETag field"
        assert response.get("Metadata") == {}, "Expected Metadata empty"
        assert response.get("VersionId") == version_id_2, f"Expected VersionId is {version_id_2}"
        assert response.get("ContentLength") != 0, "Expected ContentLength is not zero"

    with reporter.step("Get head of first version of object"):
        response = s3_client.head_object(bucket, file_name, version_id=version_id_1)
        assert "LastModified" in response, "Expected LastModified field"
        assert "ETag" in response, "Expected ETag field"
        assert response.get("Metadata") == object_metadata, f"Expected Metadata is {object_metadata}"
        assert response.get("VersionId") == version_id_1, f"Expected VersionId is {version_id_1}"
        assert response.get("ContentLength") != 0, "Expected ContentLength is not zero"
@allure.title("List of objects with version (method_version={list_type}, s3_client={s3_client})")
@pytest.mark.parametrize("list_type", ["v1", "v2"])
def test_s3_list_object(
    self,
    s3_client: S3ClientWrapper,
    list_type: str,
    bucket: str,
    complex_object_size: ObjectSize,
):
    """List objects with the v1 or v2 listing call before and after deleting one of two objects.

    Raises:
        ValueError: if ``list_type`` is neither ``"v1"`` nor ``"v2"``.
    """
    file_path_1 = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path_1)
    file_path_2 = generate_file(complex_object_size.value)
    file_name_2 = s3_helper.object_key_from_file_path(file_path_2)

    # Pick the listing call once; an unknown list_type fails with a clear
    # error instead of an unbound local further down.
    if list_type == "v1":
        list_objects = s3_client.list_objects
    elif list_type == "v2":
        list_objects = s3_client.list_objects_v2
    else:
        raise ValueError(f"Unsupported list_type: {list_type}")

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    with reporter.step("Put several versions of object into bucket"):
        s3_client.put_object(bucket, file_path_1)
        s3_client.put_object(bucket, file_path_2)

    with reporter.step("Get list of object"):
        list_obj = list_objects(bucket)
        assert len(list_obj) == 2, "bucket should have 2 objects"
        # list.sort() returns None, so comparing two .sort() results is always
        # True; sorted() compares the actual keys.
        assert sorted(list_obj) == sorted(
            [file_name, file_name_2]
        ), f"bucket should have object key {file_name, file_name_2}"

    with reporter.step("Delete object"):
        delete_obj = s3_client.delete_object(bucket, file_name)
        list_obj_1 = list_objects(bucket, full_output=True)
        contents = list_obj_1.get("Contents", [])
        assert len(contents) == 1, "bucket should have only 1 object"
        assert contents[0].get("Key") == file_name_2, f"bucket should have object key {file_name_2}"
        assert "DeleteMarker" in delete_obj.keys(), "Expected delete Marker"
@allure.title("Put object (s3_client={s3_client})")
def test_s3_put_object(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """Put objects with metadata and tags under suspended and then enabled versioning.

    Checks what ``head_object``, ``get_object_tagging``, ``get_object`` and
    ``list_objects_versions`` report after each put.
    """
    suspended_path = generate_file(complex_object_size.value)
    suspended_key = s3_helper.object_key_from_file_path(suspended_path)

    first_metadata = {str(uuid.uuid4()): str(uuid.uuid4())}
    first_tag_key = "tag1"
    first_tag_value = uuid.uuid4()
    first_tagging = f"{first_tag_key}={first_tag_value}"

    second_metadata = {str(uuid.uuid4()): str(uuid.uuid4())}
    second_tag_key = "tag2"
    second_tag_value = uuid.uuid4()
    second_tagging = f"{second_tag_key}={second_tag_value}"

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.SUSPENDED)

    with reporter.step("Put first object into bucket"):
        s3_client.put_object(bucket, suspended_path, metadata=first_metadata, tagging=first_tagging)
        head = s3_client.head_object(bucket, suspended_key)
        assert head.get("Metadata") == first_metadata, "Metadata must be the same"
        tags = s3_client.get_object_tagging(bucket, suspended_key)
        assert tags, f"Expected tags, got {tags}"
        assert tags == [{"Key": first_tag_key, "Value": str(first_tag_value)}], "Tags must be the same"

    with reporter.step("Rewrite file into bucket"):
        # Same file path, new content: the put overwrites the same key.
        rewritten_path = generate_file_with_content(simple_object_size.value, file_path=suspended_path)
        s3_client.put_object(bucket, rewritten_path, metadata=second_metadata, tagging=second_tagging)
        head = s3_client.head_object(bucket, suspended_key)
        assert head.get("Metadata") == second_metadata, "Metadata must be the same"
        tags = s3_client.get_object_tagging(bucket, suspended_key)
        assert tags, f"Expected tags, got {tags}"
        assert tags == [{"Key": second_tag_key, "Value": str(second_tag_value)}], "Tags must be the same"

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    versioned_path = generate_file(complex_object_size.value)
    # Hash is taken now because the file is rewritten below for the next version.
    first_version_hash = get_file_hash(versioned_path)
    versioned_key = s3_helper.object_key_from_file_path(versioned_path)
    third_metadata = {str(uuid.uuid4()): str(uuid.uuid4())}
    third_tag_key = "tag3"
    third_tag_value = uuid.uuid4()
    third_tagging = f"{third_tag_key}={third_tag_value}"
    third_expected_tags = [{"Key": third_tag_key, "Value": str(third_tag_value)}]

    with reporter.step("Put third object into bucket"):
        version_id_1 = s3_client.put_object(bucket, versioned_path, metadata=third_metadata, tagging=third_tagging)
        head = s3_client.head_object(bucket, versioned_key)
        assert head.get("Metadata") == third_metadata, "Matadata must be the same"
        tags = s3_client.get_object_tagging(bucket, versioned_key)
        assert tags, f"Expected tags, got {tags}"
        assert tags == third_expected_tags, "Tags must be the same"

    with reporter.step("Put new version of file into bucket"):
        second_version_path = generate_file_with_content(simple_object_size.value, file_path=versioned_path)
        version_id_2 = s3_client.put_object(bucket, second_version_path)
        listed = s3_client.list_objects_versions(bucket)
        listed_ids = {entry.get("VersionId") for entry in listed if entry.get("Key") == versioned_key}
        assert listed_ids == {
            version_id_1,
            version_id_2,
        }, f"Object should have versions: {version_id_1, version_id_2}"
        # The second version was put without tagging.
        latest_tags = s3_client.get_object_tagging(bucket, versioned_key)
        assert not latest_tags, "No tags expected"

    with reporter.step("Get object"):
        latest_response = s3_client.get_object(bucket, versioned_key, full_output=True)
        assert latest_response.get("VersionId") == version_id_2, f"get object with version {version_id_2}"
        latest_file = s3_client.get_object(bucket, versioned_key)
        assert get_file_hash(second_version_path) == get_file_hash(latest_file), "Hashes must be the same"

    with reporter.step("Get first version of object"):
        first_response = s3_client.get_object(bucket, versioned_key, version_id_1, full_output=True)
        assert first_response.get("VersionId") == version_id_1, f"get object with version {version_id_1}"
        first_file = s3_client.get_object(bucket, versioned_key, version_id_1)
        assert first_version_hash == get_file_hash(first_file), "Hashes must be the same"
        head = s3_client.head_object(bucket, versioned_key, version_id_1)
        assert head.get("Metadata") == third_metadata, "Metadata must be the same"
        tags = s3_client.get_object_tagging(bucket, versioned_key, version_id_1)
        assert tags, f"Expected tags, got {tags}"
        assert tags == third_expected_tags, "Tags must be the same"
@allure.title("Put object with ACL (versioning={bucket_versioning}, s3_client={s3_client})")
@pytest.mark.parametrize("bucket_versioning", ["ENABLED", "SUSPENDED"])
def test_s3_put_object_acl(
    self,
    s3_client: S3ClientWrapper,
    bucket_versioning: Literal["ENABLED", "SUSPENDED"],
    bucket: str,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
    second_wallet_public_key: str,
):
    """Put objects with canned ACLs and explicit grants and check ACL and content.

    After every put the object ACL is checked with ``s3_helper.assert_s3_acl``
    and the downloaded object is compared by hash with the uploaded file.
    """
    canned_acl_path = generate_file(complex_object_size.value)
    canned_acl_key = s3_helper.object_key_from_file_path(canned_acl_path)

    if bucket_versioning == "ENABLED":
        status = VersioningStatus.ENABLED
    elif bucket_versioning == "SUSPENDED":
        status = VersioningStatus.SUSPENDED
    s3_helper.set_bucket_versioning(s3_client, bucket, status)

    with reporter.step("Put object with acl private"):
        s3_client.put_object(bucket, canned_acl_path, acl="private")
        acl_grants = s3_client.get_object_acl(bucket, canned_acl_key)
        s3_helper.assert_s3_acl(acl_grants=acl_grants, permitted_users="CanonicalUser")
        downloaded = s3_client.get_object(bucket, canned_acl_key)
        assert get_file_hash(canned_acl_path) == get_file_hash(downloaded), "Hashes must be the same"

    # The remaining canned ACLs follow the same scenario: rewrite the file,
    # put it with the ACL, expect "AllUsers" grants and an unchanged hash.
    rewrite_cases = (
        ("Put object with acl public-read", "public-read"),
        ("Put object with acl public-read-write", "public-read-write"),
        ("Put object with acl authenticated-read", "authenticated-read"),
    )
    for step_title, canned_acl in rewrite_cases:
        with reporter.step(step_title):
            rewritten_path = generate_file_with_content(simple_object_size.value, file_path=canned_acl_path)
            s3_client.put_object(bucket, rewritten_path, acl=canned_acl)
            acl_grants = s3_client.get_object_acl(bucket, canned_acl_key)
            s3_helper.assert_s3_acl(acl_grants=acl_grants, permitted_users="AllUsers")
            downloaded = s3_client.get_object(bucket, canned_acl_key)
            assert get_file_hash(rewritten_path) == get_file_hash(downloaded), "Hashes must be the same"

    grant_path = generate_file(complex_object_size.value)
    grant_key = s3_helper.object_key_from_file_path(grant_path)

    with reporter.step("Put object with --grant-full-control id=mycanonicaluserid"):
        # Rewrites the file at grant_path in place before uploading it.
        generate_file_with_content(simple_object_size.value, file_path=grant_path)
        s3_client.put_object(
            bucket,
            grant_path,
            grant_full_control=f"id={second_wallet_public_key}",
        )
        acl_grants = s3_client.get_object_acl(bucket, grant_key)
        s3_helper.assert_s3_acl(acl_grants=acl_grants, permitted_users="CanonicalUser")
        downloaded = s3_client.get_object(bucket, grant_key)
        assert get_file_hash(grant_path) == get_file_hash(downloaded), "Hashes must be the same"

    with reporter.step("Put object with --grant-read uri=http://acs.amazonaws.com/groups/global/AllUsers"):
        generate_file_with_content(simple_object_size.value, file_path=grant_path)
        s3_client.put_object(
            bucket,
            grant_path,
            grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers",
        )
        acl_grants = s3_client.get_object_acl(bucket, grant_key)
        s3_helper.assert_s3_acl(acl_grants=acl_grants, permitted_users="AllUsers")
        downloaded = s3_client.get_object(bucket, grant_key)
        assert get_file_hash(grant_path) == get_file_hash(downloaded), "Hashes must be the same"
@allure.title("Put object with lock-mode (s3_client={s3_client})")
def test_s3_put_object_lock_mode(
    self,
    s3_client: S3ClientWrapper,
    complex_object_size: ObjectSize,
    simple_object_size: ObjectSize,
):
    """
    Check which object-lock arguments put_object accepts and which it rejects.

    A bucket is created with object lock enabled and versioning switched on. The same
    local file is then uploaded several times with different lock modes, retain-until
    dates and legal-hold statuses; after every successful upload the lock state is
    checked with s3_helper.assert_object_lock_mode. Finally two requests are expected
    to raise: a lock mode without a retain-until date, and a retain-until date in the
    past.
    """
    file_path_1 = generate_file(complex_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path_1)
    bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is kept here
    # because the naive datetime is also handed to assert_object_lock_mode, whose
    # comparison rules are not visible from this test - confirm before switching to
    # an aware datetime.
    with reporter.step("Put object with lock-mode GOVERNANCE lock-retain-until-date +1day, lock-legal-hold-status"):
        date_obj = datetime.utcnow() + timedelta(days=1)
        s3_client.put_object(
            bucket,
            file_path_1,
            object_lock_mode="GOVERNANCE",
            object_lock_retain_until_date=date_obj.strftime("%Y-%m-%dT%H:%M:%S"),
            object_lock_legal_hold_status="OFF",
        )
        s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "GOVERNANCE", date_obj, "OFF")

    # The step titles below state the same offset that is actually passed to timedelta.
    with reporter.step(
        "Put new version of object with [--object-lock-mode COMPLIANCE] и [--object-lock-retain-until-date +2days]"
    ):
        date_obj = datetime.utcnow() + timedelta(days=2)
        generate_file_with_content(simple_object_size.value, file_path=file_path_1)
        s3_client.put_object(
            bucket,
            file_path_1,
            object_lock_mode="COMPLIANCE",
            object_lock_retain_until_date=date_obj,
        )
        s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "COMPLIANCE", date_obj, "OFF")

    with reporter.step(
        "Put new version of object with [--object-lock-mode COMPLIANCE] и [--object-lock-retain-until-date +3days]"
    ):
        date_obj = datetime.utcnow() + timedelta(days=3)
        generate_file_with_content(simple_object_size.value, file_path=file_path_1)
        s3_client.put_object(
            bucket,
            file_path_1,
            object_lock_mode="COMPLIANCE",
            object_lock_retain_until_date=date_obj,
            object_lock_legal_hold_status="ON",
        )
        s3_helper.assert_object_lock_mode(s3_client, bucket, file_name, "COMPLIANCE", date_obj, "ON")

    # pytest.raises(match=...) searches the message with re.search, so a plain
    # substring pattern is enough; the former trailing "*" only made the last
    # letter optional.
    with reporter.step("Put object with lock-mode"):
        with pytest.raises(
            Exception,
            match=r"must both be supplied",
        ):
            # x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied
            s3_client.put_object(bucket, file_path_1, object_lock_mode="COMPLIANCE")

    with reporter.step("Put object with lock-mode and past date"):
        date_obj = datetime.utcnow() - timedelta(days=3)
        with pytest.raises(
            Exception,
            match=r"until date must be in the future",
        ):
            # The retain until date must be in the future
            s3_client.put_object(
                bucket,
                file_path_1,
                object_lock_mode="COMPLIANCE",
                object_lock_retain_until_date=date_obj,
            )
@allure.title("Sync directory (sync_type={sync_type}, s3_client={s3_client})")
@pytest.mark.parametrize("s3_client", [AwsCliClient], indirect=True)
@pytest.mark.parametrize("sync_type", ["sync", "cp"])
def test_s3_sync_dir(
    self,
    s3_client: S3ClientWrapper,
    sync_type: Literal["sync", "cp"],
    bucket: str,
    simple_object_size: ObjectSize,
):
    """
    Upload a local directory with s3_client.sync or s3_client.cp and verify the result.

    Two files are generated under ASSETS_DIR/test_sync and uploaded together with a
    random metadata pair. The test then checks that exactly the two expected keys are
    listed, that every downloaded object hashes the same as its local source, and that
    head_object returns the metadata that was sent.
    """
    file_path_1 = os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_1")
    file_path_2 = os.path.join(os.getcwd(), ASSETS_DIR, "test_sync", "test_file_2")
    object_metadata = {f"{uuid.uuid4()}": f"{uuid.uuid4()}"}
    key_to_path = {"test_file_1": file_path_1, "test_file_2": file_path_2}

    generate_file_with_content(simple_object_size.value, file_path=file_path_1)
    generate_file_with_content(simple_object_size.value, file_path=file_path_2)
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

    # TODO: return ACL, when https://github.com/nspcc-dev/neofs-s3-gw/issues/685 will be closed
    if sync_type == "sync":
        s3_client.sync(
            bucket=bucket,
            dir_path=os.path.dirname(file_path_1),
            # acl="public-read-write",
            metadata=object_metadata,
        )
    elif sync_type == "cp":
        s3_client.cp(
            bucket=bucket,
            dir_path=os.path.dirname(file_path_1),
            # acl="public-read-write",
            metadata=object_metadata,
        )

    with reporter.step("Check objects are synced"):
        objects = s3_client.list_objects(bucket)
        assert set(key_to_path.keys()) == set(objects), f"Expected all objects saved. Got {objects}"

    with reporter.step("Check these are the same objects"):
        for obj_key in objects:
            got_object = s3_client.get_object(bucket, obj_key)
            assert get_file_hash(got_object) == get_file_hash(
                key_to_path.get(obj_key)
            ), "Expected hashes are the same"

            obj_head = s3_client.head_object(bucket, obj_key)
            actual_metadata = obj_head.get("Metadata")
            # Show both sides on failure; the expected value alone does not help diagnosis.
            assert (
                actual_metadata == object_metadata
            ), f"Expected metadata {object_metadata}, got {actual_metadata}"
            # Uncomment after https://github.com/nspcc-dev/neofs-s3-gw/issues/685 is solved
            # obj_acl = s3_client.get_object_acl(bucket, obj_key)
            # s3_helper.assert_s3_acl(acl_grants = obj_acl, permitted_users = "AllUsers")
@allure.title("Put 10 nested level object (s3_client={s3_client})")
def test_s3_put_10_folder(
    self,
    s3_client: S3ClientWrapper,
    bucket: str,
    temp_directory,
    simple_object_size: ObjectSize,
):
    """
    Upload a file that lives ten randomly named directories deep.

    The bucket must list no objects before the upload; afterwards the expected key is
    passed to s3_helper.check_objects_in_bucket.
    """
    # Ten random three-letter directory names, joined into one relative path.
    dir_names = []
    for _ in range(10):
        dir_names.append("".join(choices(string.ascii_letters, k=3)))
    nested_dirs = "/".join(dir_names)

    local_file = os.path.join(temp_directory, nested_dirs, "test_file_1")
    generate_file_with_content(simple_object_size.value, file_path=local_file)
    object_key = s3_helper.object_key_from_file_path(local_file)

    listed_before = s3_client.list_objects(bucket)
    assert not listed_before, f"Expected empty bucket, got {listed_before}"

    with reporter.step("Put object"):
        s3_client.put_object(bucket, local_file)
        expected_keys = [object_key]
        s3_helper.check_objects_in_bucket(s3_client, bucket, expected_keys)
@allure.title("Delete non-existing object from empty bucket (s3_client={s3_client})")
def test_s3_delete_non_existing_object(self, s3_client: S3ClientWrapper, bucket: str):
    """
    Delete a key that was never uploaded, after enabling versioning on the bucket.

    The delete response must not contain "DeleteMarker", and the version listing must
    be empty both before and after the call.
    """
    missing_key = "fake_object_key"

    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
    versions_before = s3_client.list_objects_versions(bucket)

    with reporter.step("Check that bucket is empty"):
        assert not versions_before, f"Expected empty bucket, got {versions_before}"

    with reporter.step("Delete non-existing object"):
        delete_response = s3_client.delete_object(bucket, missing_key)

        # Neither objects nor delete markers may exist in the bucket afterwards.
        marker_created = "DeleteMarker" in delete_response.keys()
        assert not marker_created, "Delete markers should not be created"

        versions_after = s3_client.list_objects_versions(bucket)
        assert not versions_after, f"Expected empty bucket, got {versions_after}"
@allure.title("Delete the same object twice (s3_client={s3_client})")
def test_s3_delete_twice(self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize):
    """
    Delete the same object twice in a bucket with versioning enabled.

    After the first delete the version listing must contain entries for the key and
    the delete response must contain "DeleteMarker". The second delete must return a
    response with the same set of keys and leave the version listing unchanged.
    """
    s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)
    objects_list = s3_client.list_objects(bucket)
    with reporter.step("Check that bucket is empty"):
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

    file_path = generate_file(simple_object_size.value)
    file_name = s3_helper.object_key_from_file_path(file_path)

    with reporter.step("Put object into one bucket"):
        s3_client.put_object(bucket, file_path)

    with reporter.step("Delete the object from the bucket"):
        delete_object = s3_client.delete_object(bucket, file_name)
        versions = s3_client.list_objects_versions(bucket)

        obj_versions = {version.get("VersionId") for version in versions if version.get("Key") == file_name}
        # Report the listing that was actually searched; objects_list is the
        # pre-upload listing and is always empty at this point.
        assert obj_versions, f"Object versions were not found {versions}"
        assert "DeleteMarker" in delete_object, "Delete markers not found"

    with reporter.step("Delete the object from the bucket again"):
        delete_object_2nd_attempt = s3_client.delete_object(bucket, file_name)
        versions_2nd_attempt = s3_client.list_objects_versions(bucket)

        assert delete_object.keys() == delete_object_2nd_attempt.keys(), "Delete markers are not the same"
        # check that nothing was changed
        # checking here not VersionId only, but all data (for example LastModified)
        assert versions == versions_2nd_attempt, "Versions are not the same"