forked from TrueCloudLab/frostfs-testcases
150 lines
6.8 KiB
Python
150 lines
6.8 KiB
Python
import os
|
|
|
|
import allure
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
|
|
from frostfs_testlib.steps.cli.container import search_container_by_name
|
|
from frostfs_testlib.steps.s3 import s3_helper
|
|
from frostfs_testlib.steps.storage_policy import get_simple_object_copies
|
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.testing.test_control import expect_not_raises
|
|
from frostfs_testlib.utils.file_utils import generate_file
|
|
|
|
|
|
@pytest.mark.s3_gate
@pytest.mark.parametrize("s3_policy", ["pytest_tests/resources/files/policy.json"], indirect=True)
class TestS3GatePolicy(ClusterTestBase):
    """S3 gate tests for bucket location constraints, bucket policy and CORS.

    Every test runs with the ``s3_policy`` fixture parametrized (indirectly) with
    ``pytest_tests/resources/files/policy.json``.
    NOTE(review): that file presumably defines the "complex" and "rep-3" location
    constraints used below - confirm against the fixture.
    """

    def _find_container_id(self, bucket: str):
        """Look up the container that backs ``bucket``.

        Cluster nodes are asked one by one via ``search_container_by_name`` and the
        first truthy answer is returned. The test is failed explicitly when no node
        resolves the bucket (or the cluster has no nodes) instead of continuing
        with an empty or undefined container ID.
        """
        for cluster_node in self.cluster.cluster_nodes:
            cid = search_container_by_name(name=bucket, node=cluster_node)
            if cid:
                return cid
        pytest.fail(f"Container for bucket {bucket} was not found on any cluster node")

    @allure.title("Bucket creation with retention policy applied (s3_client={s3_client})")
    def test_s3_bucket_location(self, default_wallet: str, s3_client: S3ClientWrapper, simple_object_size: ObjectSize):
        """Create buckets with different location constraints and verify them.

        Checks that both buckets are listed and reachable, that the reported bucket
        location equals the requested constraint, and that an object put into each
        bucket is stored with the expected number of copies (1 for "complex",
        3 for "rep-3").
        """
        file_path_1 = generate_file(simple_object_size.value)
        file_name_1 = s3_helper.object_key_from_file_path(file_path_1)
        file_path_2 = generate_file(simple_object_size.value)
        file_name_2 = s3_helper.object_key_from_file_path(file_path_2)

        with reporter.step("Create two buckets with different bucket configuration"):
            bucket_1 = s3_client.create_bucket(location_constraint="complex")
            s3_helper.set_bucket_versioning(s3_client, bucket_1, VersioningStatus.ENABLED)
            bucket_2 = s3_client.create_bucket(location_constraint="rep-3")
            s3_helper.set_bucket_versioning(s3_client, bucket_2, VersioningStatus.ENABLED)
            list_buckets = s3_client.list_buckets()
            assert (
                bucket_1 in list_buckets and bucket_2 in list_buckets
            ), f"Expected two buckets {bucket_1, bucket_2}, got {list_buckets}"

        with reporter.step("Check head buckets"):
            with expect_not_raises():
                s3_client.head_bucket(bucket_1)
                s3_client.head_bucket(bucket_2)

        with reporter.step("Put objects into buckets"):
            # The value returned by put_object is passed as the object ID (oid)
            # to the copy-count check below.
            version_id_1 = s3_client.put_object(bucket_1, file_path_1)
            version_id_2 = s3_client.put_object(bucket_2, file_path_2)
            s3_helper.check_objects_in_bucket(s3_client, bucket_1, [file_name_1])
            s3_helper.check_objects_in_bucket(s3_client, bucket_2, [file_name_2])

        with reporter.step("Check bucket location"):
            bucket_loc_1 = s3_client.get_bucket_location(bucket_1)
            bucket_loc_2 = s3_client.get_bucket_location(bucket_2)
            assert bucket_loc_1 == "complex", f"Unexpected location of bucket {bucket_1}: {bucket_loc_1}"
            assert bucket_loc_2 == "rep-3", f"Unexpected location of bucket {bucket_2}: {bucket_loc_2}"

        with reporter.step("Check object policy"):
            cid_1 = self._find_container_id(bucket_1)
            copies_1 = get_simple_object_copies(
                wallet=default_wallet,
                cid=cid_1,
                oid=version_id_1,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
            )
            assert copies_1 == 1, f"Expected 1 copy of the object in bucket {bucket_1}, got {copies_1}"

            cid_2 = self._find_container_id(bucket_2)
            copies_2 = get_simple_object_copies(
                wallet=default_wallet,
                cid=cid_2,
                oid=version_id_2,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
            )
            assert copies_2 == 3, f"Expected 3 copies of the object in bucket {bucket_2}, got {copies_2}"

    @allure.title("Bucket with unexisting location constraint (s3_client={s3_client})")
    def test_s3_bucket_wrong_location(self, s3_client: S3ClientWrapper):
        """Creating a bucket with an unknown location constraint must raise."""
        with reporter.step("Create bucket with unenxisting location constraint policy"):
            # NOTE(review): Exception is very broad; narrow it once the concrete
            # error type raised by the client wrapper is confirmed.
            with pytest.raises(Exception):
                s3_client.create_bucket(location_constraint="UNEXISTING LOCATION CONSTRAINT")

    @allure.title("Bucket policy (s3_client={s3_client})")
    def test_s3_bucket_policy(self, s3_client: S3ClientWrapper):
        """Put a custom bucket policy and read the bucket policy before and after."""
        with reporter.step("Create bucket with default policy"):
            bucket = s3_client.create_bucket()
            s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

        with reporter.step("GetBucketPolicy"):
            s3_client.get_bucket_policy(bucket)

        with reporter.step("Put new policy"):
            # Policy document allowing s3:GetObject on every object of the bucket
            # for any principal.
            custom_policy = {
                "Version": "2008-10-17",
                "Id": "aaaa-bbbb-cccc-dddd",
                "Statement": [
                    {
                        "Sid": "AddPerm",
                        "Effect": "Allow",
                        "Principal": {"AWS": "*"},
                        "Action": ["s3:GetObject"],
                        "Resource": [f"arn:aws:s3:::{bucket}/*"],
                    }
                ],
            }

            s3_client.put_bucket_policy(bucket, custom_policy)
        with reporter.step("GetBucketPolicy"):
            policy_1 = s3_client.get_bucket_policy(bucket)
            # NOTE(review): the returned policy is only printed, never asserted.
            # Consider comparing it with custom_policy once the return format of
            # get_bucket_policy is confirmed.
            print(policy_1)

    @allure.title("Bucket CORS (s3_client={s3_client})")
    def test_s3_cors(self, s3_client: S3ClientWrapper):
        """Exercise the CORS lifecycle of a bucket: absent, put, read back, deleted.

        Reading the CORS configuration is expected to raise both before any
        configuration is put and after it has been deleted.
        """
        with reporter.step("Create bucket without cors"):
            bucket = s3_client.create_bucket()
            s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

        # No CORS configuration exists yet, so the read must fail.
        with pytest.raises(Exception):
            s3_client.get_bucket_cors(bucket)

        with reporter.step("Put bucket cors"):
            cors = {
                "CORSRules": [
                    {
                        "AllowedOrigins": ["http://www.example.com"],
                        "AllowedHeaders": ["*"],
                        "AllowedMethods": ["PUT", "POST", "DELETE"],
                        "MaxAgeSeconds": 3000,
                        "ExposeHeaders": ["x-amz-server-side-encryption"],
                    },
                    {
                        "AllowedOrigins": ["*"],
                        "AllowedHeaders": ["Authorization"],
                        "AllowedMethods": ["GET"],
                        "MaxAgeSeconds": 3000,
                    },
                ]
            }
            s3_client.put_bucket_cors(bucket, cors)
            bucket_cors = s3_client.get_bucket_cors(bucket)
            assert bucket_cors == cors.get("CORSRules"), f"Expected CORSRules must be {cors.get('CORSRules')}"

        with reporter.step("delete bucket cors"):
            s3_client.delete_bucket_cors(bucket)

        # The configuration was deleted, so the read must fail again.
        with pytest.raises(Exception):
            s3_client.get_bucket_cors(bucket)
|