Merge pull request #343 from theanalyst/ceph-master-public-buckets-qa

Ceph master public buckets backport

Reviewed-By: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Abhishek L 2020-03-30 15:25:57 +02:00 committed by GitHub
commit 9092d1ac61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 317 additions and 0 deletions

View file

@ -293,6 +293,18 @@ def get_bad_auth_client(aws_access_key_id='badauth'):
config=Config(signature_version='s3v4'))
return client
def get_svc_client(client_config=None, svc='s3'):
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name=svc,
aws_access_key_id=config.main_access_key,
aws_secret_access_key=config.main_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
config=client_config)
return client
bucket_counter = itertools.count(1)
def get_new_bucket_name():

View file

@ -69,6 +69,7 @@ from . import (
get_objects_list,
get_main_kms_keyid,
get_secondary_kms_keyid,
get_svc_client,
nuke_prefixed_buckets,
)
@ -12396,3 +12397,307 @@ def test_user_policy():
PolicyName='AllAccessPolicy',
UserName=get_tenant_user_id(),
)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket policy status on a new bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_bucket_policy_status():
bucket_name = get_new_bucket()
client = get_client()
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],False)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket policy status on a public acl bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_public_acl_bucket_policy_status():
bucket_name = get_new_bucket()
client = get_client()
client = get_client()
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],True)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket policy status on a authenticated acl bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_authpublic_acl_bucket_policy_status():
bucket_name = get_new_bucket()
client = get_client()
client = get_client()
client.put_bucket_acl(Bucket=bucket_name, ACL='authenticated-read')
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],True)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket policy status on a public policy bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_publicpolicy_acl_bucket_policy_status():
bucket_name = get_new_bucket()
client = get_client()
client = get_client()
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],False)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],True)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket policy status on a public policy bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_nonpublicpolicy_acl_bucket_policy_status():
bucket_name = get_new_bucket()
client = get_client()
client = get_client()
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],False)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
],
"Condition": {
"IpAddress":
{"aws:SourceIp": "10.0.0.0/32"}
}
}]
})
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],False)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket policy status on a public policy bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_nonpublicpolicy_deny_bucket_policy_status():
bucket_name = get_new_bucket()
client = get_client()
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],False)
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"NotPrincipal": {"AWS": "arn:aws:iam::s3tenant1:root"},
"Action": "s3:ListBucket",
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
],
}]
})
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
resp = client.get_bucket_policy_status(Bucket=bucket_name)
eq(resp['PolicyStatus']['IsPublic'],True)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get public access block on a bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_get_default_public_block():
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
bucket_name = get_new_bucket()
client = get_client()
resp = client.get_public_access_block(Bucket=bucket_name)
eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], False)
eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], False)
eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], False)
eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], False)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='get public access block on a bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_put_public_block():
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
resp = client.get_public_access_block(Bucket=bucket_name)
eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], access_conf['IgnorePublicAcls'])
eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], access_conf['RestrictPublicBuckets'])
@attr(resource='bucket')
@attr(method='put')
@attr(operation='get public access block on a bucket')
@attr(assertion='succeeds')
@attr('policy_status')
def test_block_public_put_bucket_acls():
#client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': True,
'IgnorePublicAcls': False,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
resp = client.get_public_access_block(Bucket=bucket_name)
eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read-write')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='authenticated-read')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='block public acls on canned acls')
@attr(assertion='succeeds')
@attr('policy_status')
def test_block_public_object_canned_acls():
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': True,
'IgnorePublicAcls': False,
'BlockPublicPolicy': False,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
# resp = client.get_public_access_block(Bucket=bucket_name)
# eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
# eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
#FIXME: use empty body until #42208
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo1', Body='', ACL='public-read')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo2', Body='', ACL='public-read')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo3', Body='', ACL='authenticated-read')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='block public acls on canned acls')
@attr(assertion='succeeds')
@attr('policy_status')
def test_block_public_policy():
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': False,
'IgnorePublicAcls': False,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
policy_document = make_json_policy("s3:GetObject",
resource)
check_access_denied(client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='ignore public acls on canned acls')
@attr(assertion='succeeds')
@attr('policy_status')
def test_ignore_public_acls():
bucket_name = get_new_bucket()
client = get_client()
alt_client = get_alt_client()
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
# Public bucket should be accessible
alt_client.list_objects(Bucket=bucket_name)
client.put_object(Bucket=bucket_name,Key='key1',Body='abcde',ACL='public-read')
resp=alt_client.get_object(Bucket=bucket_name, Key='key1')
eq(_get_body(resp), 'abcde')
access_conf = {'BlockPublicAcls': False,
'IgnorePublicAcls': True,
'BlockPublicPolicy': False,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
# IgnorePublicACLs is true, so regardless this should behave as a private bucket
check_access_denied(alt_client.list_objects, Bucket=bucket_name)
check_access_denied(alt_client.get_object, Bucket=bucket_name, Key='key1')