diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 5d7e3c1..9549276 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -293,6 +293,18 @@ def get_bad_auth_client(aws_access_key_id='badauth'): config=Config(signature_version='s3v4')) return client +def get_svc_client(client_config=None, svc='s3'): + if client_config == None: + client_config = Config(signature_version='s3v4') + + client = boto3.client(service_name=svc, + aws_access_key_id=config.main_access_key, + aws_secret_access_key=config.main_secret_key, + endpoint_url=config.default_endpoint, + use_ssl=config.default_is_secure, + config=client_config) + return client + bucket_counter = itertools.count(1) def get_new_bucket_name(): diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index c3d2045..aaf885b 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -69,6 +69,7 @@ from . import ( get_objects_list, get_main_kms_keyid, get_secondary_kms_keyid, + get_svc_client, nuke_prefixed_buckets, ) @@ -12396,3 +12397,307 @@ def test_user_policy(): PolicyName='AllAccessPolicy', UserName=get_tenant_user_id(), ) + + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get bucket policy status on a new bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_bucket_policy_status(): + bucket_name = get_new_bucket() + client = get_client() + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],False) + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get bucket policy status on a public acl bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_public_acl_bucket_policy_status(): + bucket_name = get_new_bucket() + client = get_client() + client = get_client() + client.put_bucket_acl(Bucket=bucket_name, ACL='public-read') + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],True) + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get bucket policy status on a authenticated acl bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_authpublic_acl_bucket_policy_status(): + bucket_name = get_new_bucket() + client = get_client() + client = get_client() + client.put_bucket_acl(Bucket=bucket_name, ACL='authenticated-read') + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],True) + + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get bucket policy status on a public policy bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_publicpolicy_acl_bucket_policy_status(): + bucket_name = get_new_bucket() + client = get_client() + client = get_client() + + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],False) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": "s3:ListBucket", + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + + client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],True) + + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get bucket policy status on a public policy bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_nonpublicpolicy_acl_bucket_policy_status(): + bucket_name = get_new_bucket() + client = get_client() + client = get_client() + + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],False) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": "s3:ListBucket", + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ], + "Condition": { + "IpAddress": + {"aws:SourceIp": "10.0.0.0/32"} + } + }] + }) + + client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],False) + + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get bucket policy status on a public policy bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_nonpublicpolicy_deny_bucket_policy_status(): + bucket_name = get_new_bucket() + client = get_client() + + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],False) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "NotPrincipal": {"AWS": "arn:aws:iam::s3tenant1:root"}, + "Action": "s3:ListBucket", + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ], + }] + }) + + client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + resp = client.get_bucket_policy_status(Bucket=bucket_name) + eq(resp['PolicyStatus']['IsPublic'],True) + +@attr(resource='bucket') +@attr(method='get') +@attr(operation='get public access block on a bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_get_default_public_block(): + #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'})) + bucket_name = get_new_bucket() + client = get_client() + + resp = client.get_public_access_block(Bucket=bucket_name) + eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], False) + eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], False) + eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], False) + eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], False) + +@attr(resource='bucket') +@attr(method='put') +@attr(operation='get public access block on a bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_put_public_block(): + #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'})) + bucket_name = get_new_bucket() + client = get_client() + + access_conf = {'BlockPublicAcls': True, + 'IgnorePublicAcls': True, + 'BlockPublicPolicy': True, + 'RestrictPublicBuckets': False} + + client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf) + + resp = client.get_public_access_block(Bucket=bucket_name) + eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls']) + eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy']) + eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], access_conf['IgnorePublicAcls']) + eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], access_conf['RestrictPublicBuckets']) + + +@attr(resource='bucket') +@attr(method='put') +@attr(operation='get public access block on a bucket') +@attr(assertion='succeeds') +@attr('policy_status') +def test_block_public_put_bucket_acls(): + #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'})) + bucket_name = get_new_bucket() + client = get_client() + + access_conf = {'BlockPublicAcls': True, + 'IgnorePublicAcls': False, + 'BlockPublicPolicy': True, + 'RestrictPublicBuckets': False} + + client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf) + + resp = client.get_public_access_block(Bucket=bucket_name) + eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls']) + eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy']) + + e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + + e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read-write') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + + e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='authenticated-read') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + + +@attr(resource='bucket') +@attr(method='put') +@attr(operation='block public acls on canned acls') +@attr(assertion='succeeds') +@attr('policy_status') +def test_block_public_object_canned_acls(): + bucket_name = get_new_bucket() + client = get_client() + + access_conf = {'BlockPublicAcls': True, + 'IgnorePublicAcls': False, + 'BlockPublicPolicy': False, + 'RestrictPublicBuckets': False} + + client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf) + + # resp = client.get_public_access_block(Bucket=bucket_name) + # eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls']) + # eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy']) + + #FIXME: use empty body until #42208 + e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo1', Body='', ACL='public-read') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + + e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo2', Body='', ACL='public-read') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + + e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo3', Body='', ACL='authenticated-read') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + + +@attr(resource='bucket') +@attr(method='put') +@attr(operation='block public acls on canned acls') +@attr(assertion='succeeds') +@attr('policy_status') +def test_block_public_policy(): + bucket_name = get_new_bucket() + client = get_client() + + access_conf = {'BlockPublicAcls': False, + 'IgnorePublicAcls': False, + 'BlockPublicPolicy': True, + 'RestrictPublicBuckets': False} + + client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf) + resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) + policy_document = make_json_policy("s3:GetObject", + resource) + + check_access_denied(client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + + +@attr(resource='bucket') +@attr(method='put') +@attr(operation='ignore public acls on canned acls') +@attr(assertion='succeeds') +@attr('policy_status') +def test_ignore_public_acls(): + bucket_name = get_new_bucket() + client = get_client() + alt_client = get_alt_client() + + client.put_bucket_acl(Bucket=bucket_name, ACL='public-read') + # Public bucket should be accessible + alt_client.list_objects(Bucket=bucket_name) + + client.put_object(Bucket=bucket_name,Key='key1',Body='abcde',ACL='public-read') + resp=alt_client.get_object(Bucket=bucket_name, Key='key1') + eq(_get_body(resp), 'abcde') + + access_conf = {'BlockPublicAcls': False, + 'IgnorePublicAcls': True, + 'BlockPublicPolicy': False, + 'RestrictPublicBuckets': False} + + client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf) + resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) + + client.put_bucket_acl(Bucket=bucket_name, ACL='public-read') + # IgnorePublicACLs is true, so regardless this should behave as a private bucket + check_access_denied(alt_client.list_objects, Bucket=bucket_name) + check_access_denied(alt_client.get_object, Bucket=bucket_name, Key='key1')