iam: test cross-account permissions

test the [iam alt root] user's access to buckets owned by [iam root]
using various policy principals and acl grantees

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit ba292fbf59)
This commit is contained in:
Casey Bodley 2024-01-30 18:17:17 -05:00
parent 93d9143791
commit 8182d10048
2 changed files with 380 additions and 0 deletions

View file

@ -17,6 +17,7 @@ markers =
fails_on_s3 fails_on_s3
fails_with_subdomain fails_with_subdomain
iam_account iam_account
iam_cross_account
iam_role iam_role
iam_tenant iam_tenant
iam_user iam_user

View file

@ -13,6 +13,11 @@ from . import (
get_alt_client, get_alt_client,
get_iam_client, get_iam_client,
get_iam_root_client, get_iam_root_client,
get_iam_root_user_id,
get_iam_root_email,
get_iam_alt_root_client,
get_iam_alt_root_user_id,
get_iam_alt_root_email,
make_iam_name, make_iam_name,
get_iam_path_prefix, get_iam_path_prefix,
get_new_bucket, get_new_bucket,
@ -1908,3 +1913,377 @@ def test_account_role_policy_allow(iam_root):
# the policy may take a bit to start working. retry until it returns # the policy may take a bit to start working. retry until it returns
# something other than AccessDenied # something other than AccessDenied
retry_on('AccessDenied', 10, s3.list_buckets) retry_on('AccessDenied', 10, s3.list_buckets)
# fixture for iam alt account root user
@pytest.fixture
def iam_alt_root(configfile):
client = get_iam_alt_root_client()
try:
arn = client.get_user()['User']['Arn']
if not arn.endswith(':root'):
pytest.skip('[iam alt root] user does not have :root arn')
except ClientError as e:
pytest.skip('[iam alt root] user does not belong to an account')
yield client
nuke_users(client, PathPrefix=get_iam_path_prefix())
nuke_roles(client, PathPrefix=get_iam_path_prefix())
# test cross-account access, adding user policy before the bucket policy
def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn):
# add a user policy that allows s3 actions
alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': 's3:*',
'Resource': '*'
}]
}))
key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
aws_secret_access_key=key['SecretAccessKey'])
# create a bucket with the root user
bucket = get_new_bucket(roots3)
try:
# the access key may take a bit to start working. retry until it returns
# something other than InvalidAccessKeyId
e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a bucket policy that allows s3:ListBucket for the iam user's arn
roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'AWS': alt_arn},
'Action': 's3:ListBucket',
'Resource': f'arn:aws:s3:::{bucket}'
}]
}))
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
# test cross-account access, adding bucket policy before the user policy
def _test_cross_account_bucket_user_policy(roots3, alt_root, alt_name, alt_arn):
key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
aws_secret_access_key=key['SecretAccessKey'])
# create a bucket with the root user
bucket = get_new_bucket(roots3)
try:
# add a bucket policy that allows s3:ListBucket for the iam user's arn
roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'AWS': alt_arn},
'Action': 's3:ListBucket',
'Resource': f'arn:aws:s3:::{bucket}'
}]
}))
# the access key may take a bit to start working. retry until it returns
# something other than InvalidAccessKeyId
e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a user policy that allows s3 actions
alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': 's3:*',
'Resource': '*'
}]
}))
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_bucket_user_policy_allow_user_arn(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
user_arn = response['User']['Arn']
_test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, user_arn)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_user_bucket_policy_allow_user_arn(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
user_arn = response['User']['Arn']
_test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, user_arn)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_user_bucket_policy_allow_account_arn(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
user_arn = response['User']['Arn']
account_arn = user_arn.replace(f':user{path}{user_name}', ':root')
_test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_arn)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_bucket_user_policy_allow_account_arn(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
user_arn = response['User']['Arn']
account_arn = user_arn.replace(f':user{path}{user_name}', ':root')
_test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_arn)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_user_bucket_policy_allow_account_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
user_arn = response['User']['Arn']
account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
_test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_id)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_bucket_user_policy_allow_account_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
user_arn = response['User']['Arn']
account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
_test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_id)
# test cross-account access, adding user policy before the bucket acl
def _test_cross_account_user_policy_bucket_acl(roots3, alt_root, alt_name, grantee):
# add a user policy that allows s3 actions
alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': 's3:*',
'Resource': '*'
}]
}))
key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
aws_secret_access_key=key['SecretAccessKey'])
# create a bucket with the root user
bucket = get_new_bucket(roots3)
try:
# the access key may take a bit to start working. retry until it returns
# something other than InvalidAccessKeyId
e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a bucket acl that grants READ access
roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
# test cross-account access, adding bucket acl before the user policy
def _test_cross_account_bucket_acl_user_policy(roots3, alt_root, alt_name, grantee):
key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
aws_secret_access_key=key['SecretAccessKey'])
# create a bucket with the root user
bucket = get_new_bucket(roots3)
try:
# add a bucket acl that grants READ access
roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
# the access key may take a bit to start working. retry until it returns
# something other than InvalidAccessKeyId
e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a user policy that allows s3 actions
alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': 's3:*',
'Resource': '*'
}]
}))
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
@pytest.mark.fails_on_aws # can't grant to individual users
def test_cross_account_bucket_acl_user_policy_grant_user_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
grantee = 'id=' + response['User']['UserId']
_test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
@pytest.mark.fails_on_aws # can't grant to individual users
def test_cross_account_user_policy_bucket_acl_grant_user_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
response = iam_alt_root.create_user(UserName=user_name, Path=path)
grantee = 'id=' + response['User']['UserId']
_test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_bucket_acl_user_policy_grant_canonical_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
iam_alt_root.create_user(UserName=user_name, Path=path)
grantee = 'id=' + get_iam_alt_root_user_id()
_test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_user_policy_bucket_acl_grant_canonical_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
iam_alt_root.create_user(UserName=user_name, Path=path)
grantee = 'id=' + get_iam_alt_root_user_id()
_test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_bucket_acl_user_policy_grant_account_email(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
iam_alt_root.create_user(UserName=user_name, Path=path)
grantee = 'emailAddress=' + get_iam_alt_root_email()
_test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_user_policy_bucket_acl_grant_account_email(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
iam_alt_root.create_user(UserName=user_name, Path=path)
grantee = 'emailAddress=' + get_iam_alt_root_email()
_test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
# test root cross-account access with bucket policy
def _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn):
# create a bucket with the root user
bucket = get_new_bucket(roots3)
try:
e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a bucket policy that allows s3:ListBucket for the iam user's arn
roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'AWS': alt_arn},
'Action': 's3:ListBucket',
'Resource': f'arn:aws:s3:::{bucket}'
}]
}))
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_root_bucket_policy_allow_account_arn(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
alts3 = get_iam_alt_root_client(service_name='s3')
alt_arn = iam_alt_root.get_user()['User']['Arn']
_test_cross_account_root_bucket_policy(roots3, alts3, alt_arn)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_root_bucket_policy_allow_account_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
alts3 = get_iam_alt_root_client(service_name='s3')
alt_arn = iam_alt_root.get_user()['User']['Arn']
account_id = alt_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
_test_cross_account_root_bucket_policy(roots3, alts3, account_id)
# test root cross-account access with bucket acls
def _test_cross_account_root_bucket_acl(roots3, alts3, grantee):
# create a bucket with the root user
bucket = get_new_bucket(roots3)
try:
e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add a bucket acl that grants READ
roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
# verify that the iam user can eventually access it
retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_root_bucket_acl_grant_canonical_id(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
alts3 = get_iam_alt_root_client(service_name='s3')
grantee = 'id=' + get_iam_alt_root_user_id()
_test_cross_account_root_bucket_acl(roots3, alts3, grantee)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
def test_cross_account_root_bucket_acl_grant_account_email(iam_root, iam_alt_root):
roots3 = get_iam_root_client(service_name='s3')
alts3 = get_iam_alt_root_client(service_name='s3')
grantee = 'emailAddress=' + get_iam_alt_root_email()
_test_cross_account_root_bucket_acl(roots3, alts3, grantee)