diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 5fd5891..08f884b 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -804,3 +804,24 @@ def get_restore_debug_interval(): def get_read_through_days(): return config.read_through_restore_days + +def create_iam_user_s3client(client): + prefix = get_iam_path_prefix() + + # generate random name + randname = ''.join( + random.choice(string.ascii_lowercase + string.digits) + for c in range(8) + ) + name = make_iam_name(randname) + + user = client.create_user(UserName=name, Path=prefix) + + # create s3 access and secret keys + keys = client.create_access_key(UserName=user['User']['UserName']) + + # create s3 client + return get_iam_s3client( + aws_access_key_id=keys['AccessKey']['AccessKeyId'], + aws_secret_access_key=keys['AccessKey']['SecretAccessKey'], + ) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 2e86d44..b6e3c5c 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -28,11 +28,8 @@ import dateutil.parser import ssl from collections import namedtuple from collections import defaultdict -from io import StringIO from io import BytesIO -from email.header import decode_header - from .utils import assert_raises from .utils import generate_random from .utils import _get_status_and_error_code @@ -40,6 +37,8 @@ from .utils import _get_status from .policy import Policy, Statement, make_json_policy +from .iam import iam_root + from . import ( configfile, setup_teardown, @@ -68,6 +67,7 @@ from . import ( get_alt_user_id, get_alt_email, get_alt_client, + get_iam_root_client, get_tenant_client, get_v2_tenant_client, get_tenant_iam_client, @@ -90,6 +90,7 @@ from . import ( get_lc_debug_interval, get_restore_debug_interval, get_read_through_days, + create_iam_user_s3client, ) @@ -10848,6 +10849,87 @@ def test_bucketv2_policy(): response = alt_client.list_objects_v2(Bucket=bucket_name) assert len(response['Contents']) == 1 +@pytest.mark.bucket_policy +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_bucket_policy_deny_self_denied_policy(iam_root): + root_client = get_iam_root_client(service_name="s3") + bucket_name = get_new_bucket(root_client) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Deny", + "Principal": "*", + "Action": [ + "s3:PutBucketPolicy", + "s3:GetBucketPolicy", + "s3:DeleteBucketPolicy", + ], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + + root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + + # non-root account should not be able to get, put or delete bucket policy + root_alt_client = create_iam_user_s3client(iam_root) + check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + + # root account should be able to get, put or delete bucket policy + response = root_client.get_bucket_policy(Bucket=bucket_name) + assert response['Policy'] == policy_document + root_client.delete_bucket_policy(Bucket=bucket_name) + root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + +@pytest.mark.bucket_policy +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_bucket_policy_deny_self_denied_policy_confirm_header(iam_root): + root_client = get_iam_root_client(service_name="s3") + bucket_name = get_new_bucket(root_client) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Deny", + "Principal": "*", + "Action": [ + "s3:PutBucketPolicy", + "s3:GetBucketPolicy", + "s3:DeleteBucketPolicy", + ], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + + root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True) + + # non-root account should not be able to get, put or delete bucket policy + root_alt_client = create_iam_user_s3client(iam_root) + check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + + # root account should not be able to get, put or delete bucket policy + check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name) + check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name) + check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + @pytest.mark.bucket_policy def test_bucket_policy_acl(): bucket_name = get_new_bucket()