a.berezin
cc440f9c12
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
Signed-off-by: a.berezin <a.berezin@yadro.com>
141 lines
6.7 KiB
Python
141 lines
6.7 KiB
Python
from datetime import datetime, timedelta, timezone

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.utils.file_utils import generate_file
|
|
|
|
|
|
@pytest.mark.s3_gate
@pytest.mark.s3_gate_bucket
class TestS3GateBucket:
    """S3 gate tests covering bucket create/list/head/delete and object lock on put."""

    @allure.title("Bucket API (s3_client={s3_client})")
    def test_s3_buckets(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
    ) -> None:
        """
        Test base S3 Bucket API (Create/List/Head/Delete).

        Creates two buckets (the first with object lock and versioning enabled),
        checks that both are listed, empty and reachable via head, puts one
        object into the first bucket, and then verifies that a non-empty bucket
        cannot be deleted while an empty one can.
        """

        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with reporter.step("Create buckets"):
            bucket_1 = s3_client.create_bucket(object_lock_enabled_for_bucket=True)
            s3_helper.set_bucket_versioning(s3_client, bucket_1, VersioningStatus.ENABLED)
            bucket_2 = s3_client.create_bucket()

        with reporter.step("Check buckets are presented in the system"):
            buckets = s3_client.list_buckets()
            assert bucket_1 in buckets, f"Expected bucket {bucket_1} is in the list"
            assert bucket_2 in buckets, f"Expected bucket {bucket_2} is in the list"

        with reporter.step("Bucket must be empty"):
            for bucket in (bucket_1, bucket_2):
                with reporter.step("Verify default list command"):
                    objects_list = s3_client.list_objects(bucket)
                    assert not objects_list, f"Expected empty bucket, got {objects_list}"

                with reporter.step("Verify V2 list command"):
                    objects_list = s3_client.list_objects_v2(bucket)
                    assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with reporter.step("Check buckets are visible with S3 head command"):
            s3_client.head_bucket(bucket_1)
            s3_client.head_bucket(bucket_2)

        with reporter.step("Check we can put/list object with S3 commands"):
            # The returned version id is kept so the exact version can be deleted later.
            version_id = s3_client.put_object(bucket_1, file_path)
            s3_client.head_object(bucket_1, file_name)

            bucket_objects = s3_client.list_objects(bucket_1)
            assert file_name in bucket_objects, f"Expected file {file_name} in objects list {bucket_objects}"

        with reporter.step("Try to delete not empty bucket and get error"):
            with pytest.raises(Exception, match=r".*The bucket you tried to delete is not empty.*"):
                s3_client.delete_bucket(bucket_1)

            # The failed delete must leave the bucket reachable.
            s3_client.head_bucket(bucket_1)

        with reporter.step("Delete empty bucket_2"):
            s3_client.delete_bucket(bucket_2)

        with reporter.step("Check bucket_2 is deleted"):
            with pytest.raises(Exception, match=r".*Not Found.*"):
                s3_client.head_bucket(bucket_2)

            buckets = s3_client.list_buckets()
            assert bucket_1 in buckets, f"Expected bucket {bucket_1} is in the list"
            assert bucket_2 not in buckets, f"Expected bucket {bucket_2} is not in the list"

        with reporter.step("Delete object from bucket_1"):
            s3_client.delete_object(bucket_1, file_name, version_id)
            s3_helper.check_objects_in_bucket(s3_client, bucket_1, expected_objects=[])

        with reporter.step("Delete bucket_1"):
            s3_client.delete_bucket(bucket_1)

        with reporter.step("Check bucket_1 deleted"):
            with pytest.raises(Exception, match=r".*Not Found.*"):
                s3_client.head_bucket(bucket_1)

    @allure.title("Create bucket with object lock (s3_client={s3_client})")
    def test_s3_bucket_object_lock(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize) -> None:
        """
        Test putting a locked object into buckets created with and without object lock.

        A put with lock parameters is expected to fail for a bucket created with
        object lock disabled, and to succeed (with the lock settings then checked
        via ``s3_helper.assert_object_lock_mode``) for a bucket created with it enabled.
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with reporter.step("Create bucket with --no-object-lock-enabled-for-bucket"):
            bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=False)
            # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
            # "now" and drop tzinfo to keep the same naive UTC value as before.
            date_obj = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(days=1)
            with pytest.raises(Exception, match=r".*Object Lock configuration does not exist for this bucket.*"):
                # An error occurred (ObjectLockConfigurationNotFoundError) when calling the PutObject operation (reached max retries: 0):
                # Object Lock configuration does not exist for this bucket
                s3_client.put_object(
                    bucket,
                    file_path,
                    object_lock_mode="COMPLIANCE",
                    object_lock_retain_until_date=date_obj.strftime("%Y-%m-%dT%H:%M:%S"),
                )
        with reporter.step("Create bucket with --object-lock-enabled-for-bucket"):
            bucket_1 = s3_client.create_bucket(object_lock_enabled_for_bucket=True)
            # Same naive-UTC construction as above (replacement for datetime.utcnow()).
            date_obj_1 = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(days=1)
            s3_client.put_object(
                bucket_1,
                file_path,
                object_lock_mode="COMPLIANCE",
                object_lock_retain_until_date=date_obj_1.strftime("%Y-%m-%dT%H:%M:%S"),
                object_lock_legal_hold_status="ON",
            )
            s3_helper.assert_object_lock_mode(s3_client, bucket_1, file_name, "COMPLIANCE", date_obj_1, "ON")

    @allure.title("Delete bucket (s3_client={s3_client})")
    def test_s3_delete_bucket(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize) -> None:
        """
        Test that a bucket can only be deleted once it is empty.

        Puts two objects into a new bucket, checks that deleting the bucket
        fails, removes both objects, deletes the bucket and confirms that a
        subsequent head request fails.
        """
        file_path_1 = generate_file(simple_object_size.value)
        file_name_1 = s3_helper.object_key_from_file_path(file_path_1)
        file_path_2 = generate_file(simple_object_size.value)
        file_name_2 = s3_helper.object_key_from_file_path(file_path_2)
        bucket = s3_client.create_bucket()

        with reporter.step("Put two objects into bucket"):
            s3_client.put_object(bucket, file_path_1)
            s3_client.put_object(bucket, file_path_2)
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name_1, file_name_2])

        with reporter.step("Try to delete not empty bucket and get error"):
            with pytest.raises(Exception, match=r".*The bucket you tried to delete is not empty.*"):
                s3_client.delete_bucket(bucket)

        with reporter.step("Delete object in bucket"):
            s3_client.delete_object(bucket, file_name_1)
            s3_client.delete_object(bucket, file_name_2)
            s3_helper.check_objects_in_bucket(s3_client, bucket, [])

        with reporter.step("Delete empty bucket"):
            s3_client.delete_bucket(bucket)
            with pytest.raises(Exception, match=r".*Not Found.*"):
                s3_client.head_bucket(bucket)
|