PublicAccessBlock: test access deny via bucket policy

Make sure 403 is returned when access is denied via s3:GetBucketPublicAccessBlock action on GetBucketPublicAccessBlock

Refs: https://github.com/ceph/ceph/pull/55652
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
(cherry picked from commit 3af42312bf)
This commit is contained in:
Seena Fallah 2024-02-19 22:43:43 +01:00 committed by Casey Bodley
parent a0d38e7d35
commit 509acecc67
2 changed files with 30 additions and 2 deletions

View file

@ -37,10 +37,10 @@ class Policy(object):
return json.dumps(policy_dict)
def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
def make_json_policy(action, resource, principal={"AWS": "*"}, effect="Allow", conditions=None):
"""
Helper function to make single statement policies
"""
s = Statement(action, resource, principal, condition=conditions)
s = Statement(action, resource, principal, effect=effect, condition=conditions)
p = Policy()
return p.add_statement(s).to_json()

View file

@ -12607,6 +12607,34 @@ def test_get_undefined_public_block():
assert response_code == 'NoSuchPublicAccessBlockConfiguration'
def test_get_public_block_deny_bucket_policy():
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
# make sure we can get the public access block
resp = client.get_public_access_block(Bucket=bucket_name)
assert resp['PublicAccessBlockConfiguration']['BlockPublicAcls'] == access_conf['BlockPublicAcls']
assert resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'] == access_conf['BlockPublicPolicy']
assert resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'] == access_conf['IgnorePublicAcls']
assert resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'] == access_conf['RestrictPublicBuckets']
# make bucket policy to deny access
resource = _make_arn_resource(bucket_name)
policy_document = make_json_policy("s3:GetBucketPublicAccessBlock",
resource, effect="Deny")
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
# check if the access is denied
e = assert_raises(ClientError, client.get_public_access_block, Bucket=bucket_name)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
def test_put_public_block():
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
bucket_name = get_new_bucket()