Merge pull request #551 from clwluvw/bucket-public-block

BucketPublicAccessBlock: compatibility issues
This commit is contained in:
Casey Bodley 2024-03-07 15:55:37 +00:00 committed by GitHub
commit 12abc78b9b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 42 additions and 8 deletions

View file

@ -37,10 +37,10 @@ class Policy(object):
return json.dumps(policy_dict) return json.dumps(policy_dict)
def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None): def make_json_policy(action, resource, principal={"AWS": "*"}, effect="Allow", conditions=None):
""" """
Helper function to make single statement policies Helper function to make single statement policies
""" """
s = Statement(action, resource, principal, condition=conditions) s = Statement(action, resource, principal, effect=effect, condition=conditions)
p = Policy() p = Policy()
return p.add_statement(s).to_json() return p.add_statement(s).to_json()

View file

@ -12590,16 +12590,50 @@ def test_get_nonpublicpolicy_deny_bucket_policy_status():
resp = client.get_bucket_policy_status(Bucket=bucket_name) resp = client.get_bucket_policy_status(Bucket=bucket_name)
assert resp['PolicyStatus']['IsPublic'] == True assert resp['PolicyStatus']['IsPublic'] == True
def test_get_default_public_block(): def test_get_undefined_public_block():
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
# delete the existing public access block configuration
# as AWS creates a default public access block configuration
resp = client.delete_public_access_block(Bucket=bucket_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 204
response_code = ""
try:
resp = client.get_public_access_block(Bucket=bucket_name) resp = client.get_public_access_block(Bucket=bucket_name)
assert resp['PublicAccessBlockConfiguration']['BlockPublicAcls'] == False except ClientError as e:
assert resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'] == False response_code = e.response['Error']['Code']
assert resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'] == False
assert resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'] == False assert response_code == 'NoSuchPublicAccessBlockConfiguration'
def test_get_public_block_deny_bucket_policy():
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
# make sure we can get the public access block
resp = client.get_public_access_block(Bucket=bucket_name)
assert resp['PublicAccessBlockConfiguration']['BlockPublicAcls'] == access_conf['BlockPublicAcls']
assert resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'] == access_conf['BlockPublicPolicy']
assert resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'] == access_conf['IgnorePublicAcls']
assert resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'] == access_conf['RestrictPublicBuckets']
# make bucket policy to deny access
resource = _make_arn_resource(bucket_name)
policy_document = make_json_policy("s3:GetBucketPublicAccessBlock",
resource, effect="Deny")
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
# check if the access is denied
e = assert_raises(ClientError, client.get_public_access_block, Bucket=bucket_name)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
def test_put_public_block(): def test_put_public_block():
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'})) #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))