# frostfs-testcases/pytest_tests/testsuites/services/s3_gate/test_s3_bucket.py

from datetime import datetime, timedelta
import allure
import pytest
# frostfs_testlib helpers
from frostfs_testlib.s3 import S3ClientWrapper
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.utils.file_utils import generate_file
@pytest.mark.s3_gate
@pytest.mark.s3_gate_bucket
class TestS3GateBucket:
    """Bucket-level S3 gate tests: creation with ACLs and grants, object lock, and deletion."""

    @allure.title("Create Bucket with different ACL (s3_client={s3_client})")
    def test_s3_create_bucket_with_ACL(self, s3_client: S3ClientWrapper):
        """Create one bucket per canned ACL and check the grantee reported by ``get_bucket_acl``.

        Args:
            s3_client: S3 client wrapper used to create buckets and read their ACLs.
        """
        # (allure step title, canned ACL passed to create_bucket, grantee expected by assert_s3_acl)
        acl_cases = (
            ("Create bucket with ACL private", "private", "CanonicalUser"),
            ("Create bucket with ACL = public-read", "public-read", "AllUsers"),
            ("Create bucket with ACL public-read-write", "public-read-write", "AllUsers"),
            ("Create bucket with ACL = authenticated-read", "authenticated-read", "AllUsers"),
        )

        for step_title, acl, permitted_users in acl_cases:
            with allure.step(step_title):
                bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True, acl=acl)
                bucket_acl = s3_client.get_bucket_acl(bucket)
                s3_helper.assert_s3_acl(acl_grants=bucket_acl, permitted_users=permitted_users)

    @allure.title("Create Bucket with different ACL by grant (s3_client={s3_client})")
    def test_s3_create_bucket_with_grands(self, s3_client: S3ClientWrapper):
        """Create buckets using explicit grant arguments and check that AllUsers is the grantee.

        Args:
            s3_client: S3 client wrapper used to create buckets and read their ACLs.
        """
        # Every case grants the same AllUsers group; only the kind of grant differs.
        all_users_uri = "uri=http://acs.amazonaws.com/groups/global/AllUsers"
        # (allure step title, name of the create_bucket keyword argument carrying the grant)
        grant_cases = (
            ("Create bucket with --grant-read", "grant_read"),
            ("Create bucket with --grant-write", "grant_write"),
            ("Create bucket with --grant-full-control", "grant_full_control"),
        )

        for step_title, grant_kwarg in grant_cases:
            with allure.step(step_title):
                bucket = s3_client.create_bucket(
                    object_lock_enabled_for_bucket=True,
                    **{grant_kwarg: all_users_uri},
                )
                bucket_acl = s3_client.get_bucket_acl(bucket)
                s3_helper.assert_s3_acl(acl_grants=bucket_acl, permitted_users="AllUsers")

    @allure.title("Create bucket with object lock (s3_client={s3_client})")
    def test_s3_bucket_object_lock(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize):
        """Check that object-lock arguments on put_object are accepted only on lock-enabled buckets.

        Args:
            s3_client: S3 client wrapper used to create buckets and put objects.
            simple_object_size: Size descriptor; its ``value`` is passed to ``generate_file``.
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. Switching to an aware
        # datetime should be checked against s3_helper.assert_object_lock_mode before changing it.
        with allure.step("Create bucket with --no-object-lock-enabled-for-bucket"):
            bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=False)
            date_obj = datetime.utcnow() + timedelta(days=1)
            with pytest.raises(Exception, match=r".*Object Lock configuration does not exist for this bucket.*"):
                # An error occurred (ObjectLockConfigurationNotFoundError) when calling the PutObject operation (reached max retries: 0):
                # Object Lock configuration does not exist for this bucket
                s3_client.put_object(
                    bucket,
                    file_path,
                    object_lock_mode="COMPLIANCE",
                    object_lock_retain_until_date=date_obj.strftime("%Y-%m-%dT%H:%M:%S"),
                )

        with allure.step("Create bucket with --object-lock-enabled-for-bucket"):
            bucket_1 = s3_client.create_bucket(object_lock_enabled_for_bucket=True)
            date_obj_1 = datetime.utcnow() + timedelta(days=1)
            s3_client.put_object(
                bucket_1,
                file_path,
                object_lock_mode="COMPLIANCE",
                object_lock_retain_until_date=date_obj_1.strftime("%Y-%m-%dT%H:%M:%S"),
                object_lock_legal_hold_status="ON",
            )
            s3_helper.assert_object_lock_mode(s3_client, bucket_1, file_name, "COMPLIANCE", date_obj_1, "ON")

    @allure.title("Delete bucket (s3_client={s3_client})")
    def test_s3_delete_bucket(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize):
        """Check that a non-empty bucket cannot be deleted and an emptied one can.

        Args:
            s3_client: S3 client wrapper used for bucket and object operations.
            simple_object_size: Size descriptor; its ``value`` is passed to ``generate_file``.
        """
        file_path_1 = generate_file(simple_object_size.value)
        file_name_1 = s3_helper.object_key_from_file_path(file_path_1)
        file_path_2 = generate_file(simple_object_size.value)
        file_name_2 = s3_helper.object_key_from_file_path(file_path_2)
        bucket = s3_client.create_bucket()

        with allure.step("Put two objects into bucket"):
            s3_client.put_object(bucket, file_path_1)
            s3_client.put_object(bucket, file_path_2)
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name_1, file_name_2])

        with allure.step("Try to delete not empty bucket and get error"):
            with pytest.raises(Exception, match=r".*The bucket you tried to delete is not empty.*"):
                s3_client.delete_bucket(bucket)

        with allure.step("Delete object in bucket"):
            s3_client.delete_object(bucket, file_name_1)
            s3_client.delete_object(bucket, file_name_2)
            s3_helper.check_objects_in_bucket(s3_client, bucket, [])

        with allure.step("Delete empty bucket"):
            s3_client.delete_bucket(bucket)
            # After deletion, head_bucket is expected to fail with a "Not Found" error.
            with pytest.raises(Exception, match=r".*Not Found.*"):
                s3_client.head_bucket(bucket)