From 0377466704fba6847e109b1ac035e48284293561 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 1 Jan 2024 19:11:24 -0500 Subject: [PATCH] iam: test bucket policy principal for iam user with path Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index be0a6bb..4919798 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -1,5 +1,6 @@ import json import datetime +import time from botocore.exceptions import ClientError import pytest @@ -1276,3 +1277,59 @@ def test_account_user_access_key_list(iam_root): keys = sorted([id1, id2]) assert keys == sorted(user_list_key_ids(iam_root, UserName=name)) assert keys == sorted(user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1})) + +def retry_on(code, tries, func, *args, **kwargs): + for i in range(tries): + try: + return func(*args, **kwargs) + except ClientError as e: + err = e.response['Error']['Code'] + if i + 1 < tries and err in code: + print(f'Got {err}, retrying in {i}s..') + time.sleep(i) + continue + raise + + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_bucket_policy_allow(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + response = iam_root.create_user(UserName=name, Path=path) + user_arn = response['User']['Arn'] + assert user_arn.startswith('arn:aws:iam:') + assert user_arn.endswith(f':user{path}{name}') + + key = iam_root.create_access_key(UserName=name)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + roots3 = get_iam_root_client(service_name='s3') + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_objects, Bucket=bucket) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': user_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + }) + roots3.put_bucket_policy(Bucket=bucket, Policy=policy) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket)