|
|
|
@ -69,6 +69,7 @@ from . import (
|
|
|
|
|
get_objects_list,
|
|
|
|
|
get_main_kms_keyid,
|
|
|
|
|
get_secondary_kms_keyid,
|
|
|
|
|
get_svc_client,
|
|
|
|
|
nuke_prefixed_buckets,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -12396,3 +12397,307 @@ def test_user_policy():
|
|
|
|
|
PolicyName='AllAccessPolicy',
|
|
|
|
|
UserName=get_tenant_user_id(),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get bucket policy status on a new bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_bucket_policy_status():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],False)
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get bucket policy status on a public acl bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_public_acl_bucket_policy_status():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
client = get_client()
|
|
|
|
|
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],True)
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get bucket policy status on a authenticated acl bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_authpublic_acl_bucket_policy_status():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
client = get_client()
|
|
|
|
|
client.put_bucket_acl(Bucket=bucket_name, ACL='authenticated-read')
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get bucket policy status on a public policy bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_publicpolicy_acl_bucket_policy_status():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],False)
|
|
|
|
|
|
|
|
|
|
resource1 = "arn:aws:s3:::" + bucket_name
|
|
|
|
|
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
|
|
|
|
|
policy_document = json.dumps(
|
|
|
|
|
{
|
|
|
|
|
"Version": "2012-10-17",
|
|
|
|
|
"Statement": [{
|
|
|
|
|
"Effect": "Allow",
|
|
|
|
|
"Principal": {"AWS": "*"},
|
|
|
|
|
"Action": "s3:ListBucket",
|
|
|
|
|
"Resource": [
|
|
|
|
|
"{}".format(resource1),
|
|
|
|
|
"{}".format(resource2)
|
|
|
|
|
]
|
|
|
|
|
}]
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get bucket policy status on a public policy bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_nonpublicpolicy_acl_bucket_policy_status():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],False)
|
|
|
|
|
|
|
|
|
|
resource1 = "arn:aws:s3:::" + bucket_name
|
|
|
|
|
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
|
|
|
|
|
policy_document = json.dumps(
|
|
|
|
|
{
|
|
|
|
|
"Version": "2012-10-17",
|
|
|
|
|
"Statement": [{
|
|
|
|
|
"Effect": "Allow",
|
|
|
|
|
"Principal": {"AWS": "*"},
|
|
|
|
|
"Action": "s3:ListBucket",
|
|
|
|
|
"Resource": [
|
|
|
|
|
"{}".format(resource1),
|
|
|
|
|
"{}".format(resource2)
|
|
|
|
|
],
|
|
|
|
|
"Condition": {
|
|
|
|
|
"IpAddress":
|
|
|
|
|
{"aws:SourceIp": "10.0.0.0/32"}
|
|
|
|
|
}
|
|
|
|
|
}]
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get bucket policy status on a public policy bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_nonpublicpolicy_deny_bucket_policy_status():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],False)
|
|
|
|
|
|
|
|
|
|
resource1 = "arn:aws:s3:::" + bucket_name
|
|
|
|
|
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
|
|
|
|
|
policy_document = json.dumps(
|
|
|
|
|
{
|
|
|
|
|
"Version": "2012-10-17",
|
|
|
|
|
"Statement": [{
|
|
|
|
|
"Effect": "Allow",
|
|
|
|
|
"NotPrincipal": {"AWS": "arn:aws:iam::s3tenant1:root"},
|
|
|
|
|
"Action": "s3:ListBucket",
|
|
|
|
|
"Resource": [
|
|
|
|
|
"{}".format(resource1),
|
|
|
|
|
"{}".format(resource2)
|
|
|
|
|
],
|
|
|
|
|
}]
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
|
|
|
|
|
resp = client.get_bucket_policy_status(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PolicyStatus']['IsPublic'],True)
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='get')
|
|
|
|
|
@attr(operation='get public access block on a bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_get_default_public_block():
|
|
|
|
|
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
resp = client.get_public_access_block(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], False)
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], False)
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], False)
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], False)
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='put')
|
|
|
|
|
@attr(operation='get public access block on a bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_put_public_block():
|
|
|
|
|
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
access_conf = {'BlockPublicAcls': True,
|
|
|
|
|
'IgnorePublicAcls': True,
|
|
|
|
|
'BlockPublicPolicy': True,
|
|
|
|
|
'RestrictPublicBuckets': False}
|
|
|
|
|
|
|
|
|
|
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
|
|
|
|
|
|
|
|
|
|
resp = client.get_public_access_block(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], access_conf['IgnorePublicAcls'])
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], access_conf['RestrictPublicBuckets'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='put')
|
|
|
|
|
@attr(operation='get public access block on a bucket')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_block_public_put_bucket_acls():
|
|
|
|
|
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
access_conf = {'BlockPublicAcls': True,
|
|
|
|
|
'IgnorePublicAcls': False,
|
|
|
|
|
'BlockPublicPolicy': True,
|
|
|
|
|
'RestrictPublicBuckets': False}
|
|
|
|
|
|
|
|
|
|
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
|
|
|
|
|
|
|
|
|
|
resp = client.get_public_access_block(Bucket=bucket_name)
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
|
|
|
|
|
eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
|
|
|
|
|
|
|
|
|
|
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read')
|
|
|
|
|
status, error_code = _get_status_and_error_code(e.response)
|
|
|
|
|
eq(status, 403)
|
|
|
|
|
|
|
|
|
|
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read-write')
|
|
|
|
|
status, error_code = _get_status_and_error_code(e.response)
|
|
|
|
|
eq(status, 403)
|
|
|
|
|
|
|
|
|
|
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='authenticated-read')
|
|
|
|
|
status, error_code = _get_status_and_error_code(e.response)
|
|
|
|
|
eq(status, 403)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='put')
|
|
|
|
|
@attr(operation='block public acls on canned acls')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_block_public_object_canned_acls():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
access_conf = {'BlockPublicAcls': True,
|
|
|
|
|
'IgnorePublicAcls': False,
|
|
|
|
|
'BlockPublicPolicy': False,
|
|
|
|
|
'RestrictPublicBuckets': False}
|
|
|
|
|
|
|
|
|
|
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
|
|
|
|
|
|
|
|
|
|
# resp = client.get_public_access_block(Bucket=bucket_name)
|
|
|
|
|
# eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
|
|
|
|
|
# eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
|
|
|
|
|
|
|
|
|
|
#FIXME: use empty body until #42208
|
|
|
|
|
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo1', Body='', ACL='public-read')
|
|
|
|
|
status, error_code = _get_status_and_error_code(e.response)
|
|
|
|
|
eq(status, 403)
|
|
|
|
|
|
|
|
|
|
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo2', Body='', ACL='public-read')
|
|
|
|
|
status, error_code = _get_status_and_error_code(e.response)
|
|
|
|
|
eq(status, 403)
|
|
|
|
|
|
|
|
|
|
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo3', Body='', ACL='authenticated-read')
|
|
|
|
|
status, error_code = _get_status_and_error_code(e.response)
|
|
|
|
|
eq(status, 403)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='put')
|
|
|
|
|
@attr(operation='block public acls on canned acls')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_block_public_policy():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
|
|
|
|
|
access_conf = {'BlockPublicAcls': False,
|
|
|
|
|
'IgnorePublicAcls': False,
|
|
|
|
|
'BlockPublicPolicy': True,
|
|
|
|
|
'RestrictPublicBuckets': False}
|
|
|
|
|
|
|
|
|
|
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
|
|
|
|
|
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
|
|
|
|
|
policy_document = make_json_policy("s3:GetObject",
|
|
|
|
|
resource)
|
|
|
|
|
|
|
|
|
|
check_access_denied(client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr(resource='bucket')
|
|
|
|
|
@attr(method='put')
|
|
|
|
|
@attr(operation='ignore public acls on canned acls')
|
|
|
|
|
@attr(assertion='succeeds')
|
|
|
|
|
@attr('policy_status')
|
|
|
|
|
def test_ignore_public_acls():
|
|
|
|
|
bucket_name = get_new_bucket()
|
|
|
|
|
client = get_client()
|
|
|
|
|
alt_client = get_alt_client()
|
|
|
|
|
|
|
|
|
|
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
|
|
|
|
|
# Public bucket should be accessible
|
|
|
|
|
alt_client.list_objects(Bucket=bucket_name)
|
|
|
|
|
|
|
|
|
|
client.put_object(Bucket=bucket_name,Key='key1',Body='abcde',ACL='public-read')
|
|
|
|
|
resp=alt_client.get_object(Bucket=bucket_name, Key='key1')
|
|
|
|
|
eq(_get_body(resp), 'abcde')
|
|
|
|
|
|
|
|
|
|
access_conf = {'BlockPublicAcls': False,
|
|
|
|
|
'IgnorePublicAcls': True,
|
|
|
|
|
'BlockPublicPolicy': False,
|
|
|
|
|
'RestrictPublicBuckets': False}
|
|
|
|
|
|
|
|
|
|
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
|
|
|
|
|
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
|
|
|
|
|
|
|
|
|
|
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
|
|
|
|
|
# IgnorePublicACLs is true, so regardless this should behave as a private bucket
|
|
|
|
|
check_access_denied(alt_client.list_objects, Bucket=bucket_name)
|
|
|
|
|
check_access_denied(alt_client.get_object, Bucket=bucket_name, Key='key1')
|
|
|
|
|