frostfs-testcases/pytest_tests/testsuites/services/s3_gate/test_s3_bucket.py
a.berezin 6d68d14461 [#246] Fix ACL and Policy tests
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-06-05 15:00:07 +03:00

88 lines
4.7 KiB
Python

from datetime import datetime, timedelta
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.s3_acl_grants import PRIVATE_GRANTS, PUBLIC_READ_GRANTS, PUBLIC_READ_WRITE_GRANTS
from frostfs_testlib.s3 import S3ClientWrapper
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.utils.file_utils import generate_file
@pytest.mark.s3_gate
@pytest.mark.s3_gate_bucket
class TestS3GateBucket:
@allure.title("Create Bucket with different ACL (s3_client={s3_client})")
def test_s3_create_bucket_with_ACL(self, s3_client: S3ClientWrapper):
with reporter.step("Create bucket with ACL private"):
bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True, acl="private")
bucket_grants = s3_client.get_bucket_acl(bucket)
s3_helper.verify_acl_permissions(bucket_grants, PRIVATE_GRANTS)
with reporter.step("Create bucket with ACL public-read"):
read_bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True, acl="public-read")
bucket_grants = s3_client.get_bucket_acl(read_bucket)
s3_helper.verify_acl_permissions(bucket_grants, PUBLIC_READ_GRANTS)
with reporter.step("Create bucket with ACL public-read-write"):
public_rw_bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True, acl="public-read-write")
bucket_grants = s3_client.get_bucket_acl(public_rw_bucket)
s3_helper.verify_acl_permissions(bucket_grants, PUBLIC_READ_WRITE_GRANTS)
@allure.title("Create bucket with object lock (s3_client={s3_client})")
def test_s3_bucket_object_lock(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize):
file_path = generate_file(simple_object_size.value)
file_name = s3_helper.object_key_from_file_path(file_path)
with reporter.step("Create bucket with --no-object-lock-enabled-for-bucket"):
bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=False)
date_obj = datetime.utcnow() + timedelta(days=1)
with pytest.raises(Exception, match=r".*Object Lock configuration does not exist for this bucket.*"):
# An error occurred (ObjectLockConfigurationNotFoundError) when calling the PutObject operation (reached max retries: 0):
# Object Lock configuration does not exist for this bucket
s3_client.put_object(
bucket,
file_path,
object_lock_mode="COMPLIANCE",
object_lock_retain_until_date=date_obj.strftime("%Y-%m-%dT%H:%M:%S"),
)
with reporter.step("Create bucket with --object-lock-enabled-for-bucket"):
bucket_1 = s3_client.create_bucket(object_lock_enabled_for_bucket=True)
date_obj_1 = datetime.utcnow() + timedelta(days=1)
s3_client.put_object(
bucket_1,
file_path,
object_lock_mode="COMPLIANCE",
object_lock_retain_until_date=date_obj_1.strftime("%Y-%m-%dT%H:%M:%S"),
object_lock_legal_hold_status="ON",
)
s3_helper.assert_object_lock_mode(s3_client, bucket_1, file_name, "COMPLIANCE", date_obj_1, "ON")
@allure.title("Delete bucket (s3_client={s3_client})")
def test_s3_delete_bucket(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize):
file_path_1 = generate_file(simple_object_size.value)
file_name_1 = s3_helper.object_key_from_file_path(file_path_1)
file_path_2 = generate_file(simple_object_size.value)
file_name_2 = s3_helper.object_key_from_file_path(file_path_2)
bucket = s3_client.create_bucket()
with reporter.step("Put two objects into bucket"):
s3_client.put_object(bucket, file_path_1)
s3_client.put_object(bucket, file_path_2)
s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name_1, file_name_2])
with reporter.step("Try to delete not empty bucket and get error"):
with pytest.raises(Exception, match=r".*The bucket you tried to delete is not empty.*"):
s3_client.delete_bucket(bucket)
with reporter.step("Delete object in bucket"):
s3_client.delete_object(bucket, file_name_1)
s3_client.delete_object(bucket, file_name_2)
s3_helper.check_objects_in_bucket(s3_client, bucket, [])
with reporter.step("Delete empty bucket"):
s3_client.delete_bucket(bucket)
with pytest.raises(Exception, match=r".*Not Found.*"):
s3_client.head_bucket(bucket)