BucketPolicy: add tests for ConfirmRemoveSelfBucketAccess header

Refrence: https://github.com/ceph/ceph/pull/57629
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
This commit is contained in:
Seena Fallah 2024-05-22 17:57:25 +02:00
parent 78458f02d9
commit 99d56caad6
2 changed files with 106 additions and 3 deletions

View file

@ -804,3 +804,24 @@ def get_restore_debug_interval():
def get_read_through_days(): def get_read_through_days():
return config.read_through_restore_days return config.read_through_restore_days
def create_iam_user_s3client(client):
prefix = get_iam_path_prefix()
# generate random name
randname = ''.join(
random.choice(string.ascii_lowercase + string.digits)
for c in range(8)
)
name = make_iam_name(randname)
user = client.create_user(UserName=name, Path=prefix)
# create s3 access and secret keys
keys = client.create_access_key(UserName=user['User']['UserName'])
# create s3 client
return get_iam_s3client(
aws_access_key_id=keys['AccessKey']['AccessKeyId'],
aws_secret_access_key=keys['AccessKey']['SecretAccessKey'],
)

View file

@ -28,11 +28,8 @@ import dateutil.parser
import ssl import ssl
from collections import namedtuple from collections import namedtuple
from collections import defaultdict from collections import defaultdict
from io import StringIO
from io import BytesIO from io import BytesIO
from email.header import decode_header
from .utils import assert_raises from .utils import assert_raises
from .utils import generate_random from .utils import generate_random
from .utils import _get_status_and_error_code from .utils import _get_status_and_error_code
@ -40,6 +37,8 @@ from .utils import _get_status
from .policy import Policy, Statement, make_json_policy from .policy import Policy, Statement, make_json_policy
from .iam import iam_root
from . import ( from . import (
configfile, configfile,
setup_teardown, setup_teardown,
@ -68,6 +67,7 @@ from . import (
get_alt_user_id, get_alt_user_id,
get_alt_email, get_alt_email,
get_alt_client, get_alt_client,
get_iam_root_client,
get_tenant_client, get_tenant_client,
get_v2_tenant_client, get_v2_tenant_client,
get_tenant_iam_client, get_tenant_iam_client,
@ -90,6 +90,7 @@ from . import (
get_lc_debug_interval, get_lc_debug_interval,
get_restore_debug_interval, get_restore_debug_interval,
get_read_through_days, get_read_through_days,
create_iam_user_s3client,
) )
@ -10848,6 +10849,87 @@ def test_bucketv2_policy():
response = alt_client.list_objects_v2(Bucket=bucket_name) response = alt_client.list_objects_v2(Bucket=bucket_name)
assert len(response['Contents']) == 1 assert len(response['Contents']) == 1
@pytest.mark.bucket_policy
@pytest.mark.iam_account
@pytest.mark.iam_user
def test_bucket_policy_deny_self_denied_policy(iam_root):
root_client = get_iam_root_client(service_name="s3")
bucket_name = get_new_bucket(root_client)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": [
"s3:PutBucketPolicy",
"s3:GetBucketPolicy",
"s3:DeleteBucketPolicy",
],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
# non-root account should not be able to get, put or delete bucket policy
root_alt_client = create_iam_user_s3client(iam_root)
check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
# root account should be able to get, put or delete bucket policy
response = root_client.get_bucket_policy(Bucket=bucket_name)
assert response['Policy'] == policy_document
root_client.delete_bucket_policy(Bucket=bucket_name)
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
@pytest.mark.bucket_policy
@pytest.mark.iam_account
@pytest.mark.iam_user
def test_bucket_policy_deny_self_denied_policy_confirm_header(iam_root):
root_client = get_iam_root_client(service_name="s3")
bucket_name = get_new_bucket(root_client)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": [
"s3:PutBucketPolicy",
"s3:GetBucketPolicy",
"s3:DeleteBucketPolicy",
],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True)
# non-root account should not be able to get, put or delete bucket policy
root_alt_client = create_iam_user_s3client(iam_root)
check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
# root account should not be able to get, put or delete bucket policy
check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
@pytest.mark.bucket_policy @pytest.mark.bucket_policy
def test_bucket_policy_acl(): def test_bucket_policy_acl():
bucket_name = get_new_bucket() bucket_name = get_new_bucket()