# Source: frostfs-testcases/pytest_tests/testsuites/services/s3_gate/test_s3_locking.py
# (229 lines, 10 KiB, Python; repository-viewer navigation text removed)

import time
from datetime import datetime, timedelta
import allure
import pytest
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.utils.file_utils import generate_file, generate_file_with_content
def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Parametrize the ``s3_client`` fixture with both S3 client wrapper classes.

    Tests that do not request ``s3_client`` are left untouched. The classes are
    passed with ``indirect=True``, so they reach the fixture rather than the test.
    """
    if "s3_client" not in metafunc.fixturenames:
        return

    client_classes = [AwsCliClient, Boto3ClientWrapper]
    metafunc.parametrize("s3_client", client_classes, indirect=True)
@pytest.mark.sanity
@pytest.mark.s3_gate
@pytest.mark.s3_gate_locking
@pytest.mark.parametrize("version_id", [None, "second"])
class TestS3GateLocking:
    """Object-lock scenarios (retention period and legal hold) run through the S3 gate.

    Every test is parametrized by ``version_id``. ``None`` is passed to the client
    unchanged; the truthy marker ``"second"`` is replaced inside each test by the
    value returned from ``put_object``, so the requests target that exact version.
    """

    @staticmethod
    def _retention(mode: str, minutes: int) -> dict:
        """Build a retention configuration that expires ``minutes`` from now.

        Args:
            mode: Value for the ``"Mode"`` key, e.g. ``"COMPLIANCE"`` or ``"GOVERNANCE"``.
            minutes: Offset from the current time used for ``"RetainUntilDate"``.

        Returns:
            Dict with ``"Mode"`` and ``"RetainUntilDate"``; the date is the naive
            datetime produced by ``datetime.utcnow()`` plus the offset.
        """
        return {
            "Mode": mode,
            "RetainUntilDate": datetime.utcnow() + timedelta(minutes=minutes),
        }

    @allure.title(
        "{s3_client}: Retention period & legal lock on the object with version_id={version_id}"
    )
    def test_s3_object_locking(
        self, s3_client: S3ClientWrapper, version_id: str, simple_object_size: ObjectSize
    ):
        """Check deletion of an object that has COMPLIANCE retention and a legal hold.

        Two versions of the object are uploaded, retention and legal hold are set,
        and a versioned delete is expected to fail both before and after the
        retention deadline has passed. Without a version id the final delete is
        expected to succeed.

        Args:
            s3_client: S3 client wrapper under test.
            version_id: ``None`` or a truthy marker replaced by the second version id.
            simple_object_size: Size descriptor for the generated file.
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        retention_period = 2

        bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True)

        with allure.step("Put several versions of object into bucket"):
            s3_client.put_object(bucket, file_path)
            # Same path, new content: the second upload uses the same object key.
            file_name_1 = generate_file_with_content(simple_object_size.value, file_path=file_path)
            version_id_2 = s3_client.put_object(bucket, file_name_1)
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name])
            if version_id:
                version_id = version_id_2

        with allure.step(f"Put retention period {retention_period}min to object {file_name}"):
            retention = self._retention("COMPLIANCE", retention_period)
            date_obj = retention["RetainUntilDate"]
            s3_client.put_object_retention(bucket, file_name, retention, version_id)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, file_name, "COMPLIANCE", date_obj, "OFF"
            )

        with allure.step(f"Put legal hold to object {file_name}"):
            s3_client.put_object_legal_hold(bucket, file_name, "ON", version_id)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, file_name, "COMPLIANCE", date_obj, "ON"
            )

        with allure.step("Fail with deleting object with legal hold and retention period"):
            # Only the versioned delete is attempted here; nothing runs for version_id=None.
            if version_id:
                with pytest.raises(Exception):
                    # An error occurred (AccessDenied) when calling the DeleteObject
                    # operation (reached max retries: 0): Access Denied.
                    s3_client.delete_object(bucket, file_name, version_id)

        with allure.step("Check retention period is no longer set on the uploaded object"):
            # Wait one minute past the retention deadline before re-checking the lock.
            time.sleep((retention_period + 1) * 60)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, file_name, "COMPLIANCE", date_obj, "ON"
            )

        with allure.step("Fail with deleting object with legal hold and retention period"):
            if version_id:
                # The legal hold was never switched off, so the versioned delete
                # is still expected to be rejected.
                with pytest.raises(Exception):
                    # An error occurred (AccessDenied) when calling the DeleteObject
                    # operation (reached max retries: 0): Access Denied.
                    s3_client.delete_object(bucket, file_name, version_id)
            else:
                s3_client.delete_object(bucket, file_name, version_id)

    @allure.title(
        "{s3_client}: Impossible to change the retention mode COMPLIANCE with version_id={version_id}"
    )
    def test_s3_mode_compliance(
        self, s3_client: S3ClientWrapper, version_id: str, simple_object_size: ObjectSize
    ):
        """Check that a COMPLIANCE retention period cannot be replaced by a shorter one.

        Args:
            s3_client: S3 client wrapper under test.
            version_id: ``None`` or a truthy marker replaced by the uploaded version id.
            simple_object_size: Size descriptor for the generated file.
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        retention_period = 2
        retention_period_1 = 1

        bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True)

        with allure.step("Put object into bucket"):
            obj_version = s3_client.put_object(bucket, file_path)
            if version_id:
                version_id = obj_version
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name])

        with allure.step(f"Put retention period {retention_period}min to object {file_name}"):
            retention = self._retention("COMPLIANCE", retention_period)
            date_obj = retention["RetainUntilDate"]
            s3_client.put_object_retention(bucket, file_name, retention, version_id)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, file_name, "COMPLIANCE", date_obj, "OFF"
            )

        with allure.step(
            f"Try to change retention period {retention_period_1}min to object {file_name}"
        ):
            # The new deadline is earlier than the one already set; the request must fail.
            retention = self._retention("COMPLIANCE", retention_period_1)
            with pytest.raises(Exception):
                s3_client.put_object_retention(bucket, file_name, retention, version_id)

    @allure.title("{s3_client}: Change retention mode GOVERNANCE with version_id={version_id}")
    def test_s3_mode_governance(
        self, s3_client: S3ClientWrapper, version_id: str, simple_object_size: ObjectSize
    ):
        """Check how a GOVERNANCE retention period can and cannot be changed.

        A shorter period sent as a plain request is expected to fail; a longer
        period sent with the extra trailing flag is expected to be applied.

        Args:
            s3_client: S3 client wrapper under test.
            version_id: ``None`` or a truthy marker replaced by the uploaded version id.
            simple_object_size: Size descriptor for the generated file.
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        retention_period = 3
        retention_period_1 = 2
        retention_period_2 = 5

        bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True)

        with allure.step("Put object into bucket"):
            obj_version = s3_client.put_object(bucket, file_path)
            if version_id:
                version_id = obj_version
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name])

        with allure.step(f"Put retention period {retention_period}min to object {file_name}"):
            retention = self._retention("GOVERNANCE", retention_period)
            date_obj = retention["RetainUntilDate"]
            s3_client.put_object_retention(bucket, file_name, retention, version_id)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, file_name, "GOVERNANCE", date_obj, "OFF"
            )

        with allure.step(
            f"Try to change retention period {retention_period_1}min to object {file_name}"
        ):
            # The new deadline is earlier than the one already set; the request must fail.
            retention = self._retention("GOVERNANCE", retention_period_1)
            with pytest.raises(Exception):
                s3_client.put_object_retention(bucket, file_name, retention, version_id)

        with allure.step(f"Put new retention period {retention_period_2}min to object {file_name}"):
            retention = self._retention("GOVERNANCE", retention_period_2)
            date_obj = retention["RetainUntilDate"]
            # NOTE(review): the trailing True is presumably the bypass-governance flag;
            # confirm against S3ClientWrapper.put_object_retention.
            s3_client.put_object_retention(bucket, file_name, retention, version_id, True)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, file_name, "GOVERNANCE", date_obj, "OFF"
            )

    @allure.title("{s3_client}: Object Cannot Be Locked with version_id={version_id}")
    def test_s3_legal_hold(
        self, s3_client: S3ClientWrapper, version_id: str, simple_object_size: ObjectSize
    ):
        """Check that a legal hold is rejected in a bucket created without object lock.

        Args:
            s3_client: S3 client wrapper under test.
            version_id: ``None`` or a truthy marker replaced by the uploaded version id.
            simple_object_size: Size descriptor for the generated file.
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=False)

        with allure.step("Put object into bucket"):
            obj_version = s3_client.put_object(bucket, file_path)
            if version_id:
                version_id = obj_version
            s3_helper.check_objects_in_bucket(s3_client, bucket, [file_name])

        with allure.step(f"Put legal hold to object {file_name}"):
            with pytest.raises(Exception):
                s3_client.put_object_legal_hold(bucket, file_name, "ON", version_id)
@pytest.mark.s3_gate
class TestS3GateLockingBucket:
    """Bucket-level object lock configuration scenario run through the S3 gate."""

    @allure.title("{s3_client}: Bucket Lock")
    def test_s3_bucket_lock(self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize):
        """Set a default retention rule on a bucket and check it on a new object.

        The lock configuration is sent twice (first without, then with the
        ``"ObjectLockEnabled"`` key), read back and compared, and finally an
        uploaded object is checked for the COMPLIANCE lock mode.

        Args:
            s3_client: S3 client wrapper under test.
            simple_object_size: Size descriptor for the generated file.
        """
        source_path = generate_file(simple_object_size.value)
        object_key = s3_helper.object_key_from_file_path(source_path)
        # Kept as a separate name so the same nested dict can be amended later.
        default_retention = {"Mode": "COMPLIANCE", "Days": 1}
        lock_config = {"Rule": {"DefaultRetention": default_retention}}

        bucket = s3_client.create_bucket(object_lock_enabled_for_bucket=True)

        with allure.step("PutObjectLockConfiguration with ObjectLockEnabled=False"):
            s3_client.put_object_lock_configuration(bucket, lock_config)

        with allure.step("PutObjectLockConfiguration with ObjectLockEnabled=True"):
            lock_config["ObjectLockEnabled"] = "Enabled"
            s3_client.put_object_lock_configuration(bucket, lock_config)

        with allure.step("GetObjectLockConfiguration"):
            stored_config = s3_client.get_object_lock_configuration(bucket)
            # The expected value is extended with "Years": 0 before the comparison.
            default_retention["Years"] = 0
            assert stored_config == lock_config, f"Configurations must be equal {lock_config}"

        with allure.step("Put object into bucket"):
            s3_client.put_object(bucket, source_path)
            s3_helper.assert_object_lock_mode(
                s3_client, bucket, object_key, "COMPLIANCE", None, "OFF", 1
            )