BucketPolicy: add tests for ConfirmRemoveSelfBucketAccess header

Refrence: https://github.com/ceph/ceph/pull/57629
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
This commit is contained in:
Seena Fallah 2024-05-22 17:57:25 +02:00
parent 00b9a2a291
commit 06b2250150
3 changed files with 102 additions and 0 deletions

View file

@ -142,6 +142,13 @@ secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
user_id = RGW11111111111111111
email = account1@ceph.com
# iam account non-root user in the same account [iam root]
[iam root alt]
access_key = CCCCCCCCCCCCCCCCCCcc
secret_key = cccccccccccccccccccccccccccccccccccccccc
user_id = testacct1user
email = account1@ceph.com
# iam account root user in a different account than [iam root]
[iam alt root]
access_key = BBBBBBBBBBBBBBBBBBbb

View file

@ -271,6 +271,11 @@ def configure():
config.iam_root_user_id = cfg.get('iam root',"user_id")
config.iam_root_email = cfg.get('iam root',"email")
config.iam_root_alt_access_key = cfg.get('iam root alt',"access_key")
config.iam_root_alt_secret_key = cfg.get('iam root alt',"secret_key")
config.iam_root_alt_user_id = cfg.get('iam root alt',"user_id")
config.iam_root_alt_email = cfg.get('iam root alt',"email")
config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key")
config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key")
config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id")
@ -461,6 +466,17 @@ def get_iam_root_client(**kwargs):
verify=config.default_ssl_verify,
**kwargs)
def get_iam_root_alt_client(**kwargs):
kwargs.setdefault('service_name', 's3')
kwargs.setdefault('aws_access_key_id', config.iam_root_alt_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_root_alt_secret_key)
return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
def get_iam_alt_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key)

View file

@ -66,6 +66,8 @@ from . import (
get_alt_user_id,
get_alt_email,
get_alt_client,
get_iam_root_client,
get_iam_root_alt_client,
get_tenant_client,
get_tenant_iam_client,
get_tenant_user_id,
@ -10403,6 +10405,83 @@ def test_bucketv2_policy():
response = alt_client.list_objects_v2(Bucket=bucket_name)
assert len(response['Contents']) == 1
@pytest.mark.bucket_policy
def test_bucket_policy_deny_self_denied_policy():
root_client = get_iam_root_client(service_name="s3")
bucket_name = get_new_bucket(root_client)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": [
"s3:PutBucketPolicy",
"s3:GetBucketPolicy",
"s3:DeleteBucketPolicy",
],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
# non-root account should not be able to get, put or delete bucket policy
root_alt_client = get_iam_root_alt_client()
check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
# root account should be able to get, put or delete bucket policy
response = root_client.get_bucket_policy(Bucket=bucket_name)
assert response['Policy'] == policy_document
root_client.delete_bucket_policy(Bucket=bucket_name)
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
@pytest.mark.bucket_policy
def test_bucket_policy_deny_self_denied_policy_confirm_header():
root_client = get_iam_root_client(service_name="s3")
bucket_name = get_new_bucket(root_client)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": [
"s3:PutBucketPolicy",
"s3:GetBucketPolicy",
"s3:DeleteBucketPolicy",
],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True)
# non-root account should not be able to get, put or delete bucket policy
root_alt_client = get_iam_root_alt_client()
check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
# root account should not be able to get, put or delete bucket policy
check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
@pytest.mark.bucket_policy
def test_bucket_policy_acl():
bucket_name = get_new_bucket()