iam: test bucket policy principal for iam user with path

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2024-01-01 19:11:24 -05:00
parent db76dfe791
commit 0377466704

View file

@ -1,5 +1,6 @@
import json
import datetime
import time
from botocore.exceptions import ClientError
import pytest
@ -1276,3 +1277,59 @@ def test_account_user_access_key_list(iam_root):
keys = sorted([id1, id2])
assert keys == sorted(user_list_key_ids(iam_root, UserName=name))
assert keys == sorted(user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1}))
def retry_on(code, tries, func, *args, **kwargs):
for i in range(tries):
try:
return func(*args, **kwargs)
except ClientError as e:
err = e.response['Error']['Code']
if i + 1 < tries and err in code:
print(f'Got {err}, retrying in {i}s..')
time.sleep(i)
continue
raise
@pytest.mark.iam_account
@pytest.mark.iam_user
def test_account_user_bucket_policy_allow(iam_root):
path = get_iam_path_prefix()
name = make_iam_name('name')
response = iam_root.create_user(UserName=name, Path=path)
user_arn = response['User']['Arn']
assert user_arn.startswith('arn:aws:iam:')
assert user_arn.endswith(f':user{path}{name}')
key = iam_root.create_access_key(UserName=name)['AccessKey']
client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
aws_secret_access_key=key['SecretAccessKey'])
# create a bucket with the root user
roots3 = get_iam_root_client(service_name='s3')
bucket = get_new_bucket(roots3)
try:
# the access key may take a bit to start working. retry until it returns
# something other than InvalidAccessKeyId
e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_objects, Bucket=bucket)
# expect AccessDenied because no identity policy allows s3 actions
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a bucket policy that allows s3:ListBucket for the iam user's arn
policy = json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'AWS': user_arn},
'Action': 's3:ListBucket',
'Resource': f'arn:aws:s3:::{bucket}'
}]
})
roots3.put_bucket_policy(Bucket=bucket, Policy=policy)
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)