mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-21 23:29:47 +00:00
iam: test cross-account policy with assumed role
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit a3a16eb66a
)
This commit is contained in:
parent
98fd3f55a0
commit
281ab8796f
1 changed files with 282 additions and 0 deletions
|
@ -2301,6 +2301,288 @@ def test_account_role_policy_allow(iam_root):
|
||||||
# something other than AccessDenied
|
# something other than AccessDenied
|
||||||
retry_on('AccessDenied', 10, s3.list_buckets)
|
retry_on('AccessDenied', 10, s3.list_buckets)
|
||||||
|
|
||||||
|
# alt account user assumes main account role to access main account bucket
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_cross_account
|
||||||
|
@pytest.mark.iam_role
|
||||||
|
@pytest.mark.role_policy
|
||||||
|
def test_same_account_role_policy_allow(iam_root, iam_alt_root):
|
||||||
|
path = get_iam_path_prefix()
|
||||||
|
user_name = make_iam_name('AltUser')
|
||||||
|
role_name = make_iam_name('MyRole')
|
||||||
|
session_name = 'MySession'
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
|
||||||
|
user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
|
||||||
|
user_arn = user['Arn']
|
||||||
|
key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
|
||||||
|
|
||||||
|
s3_main = get_iam_root_client(service_name='s3')
|
||||||
|
s3_main.create_bucket(Bucket=bucket_name)
|
||||||
|
|
||||||
|
trust_policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 'sts:AssumeRole',
|
||||||
|
'Principal': {'AWS': user_arn}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
# returns MalformedPolicyDocument until the user arn starts working
|
||||||
|
role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
|
||||||
|
RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
|
||||||
|
role_arn = role['Arn']
|
||||||
|
|
||||||
|
sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
|
||||||
|
aws_secret_access_key=key['SecretAccessKey'])
|
||||||
|
|
||||||
|
# returns InvalidClientTokenId or AccessDenied until the access key starts working
|
||||||
|
response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
|
||||||
|
RoleArn=role_arn, RoleSessionName=session_name)
|
||||||
|
creds = response['Credentials']
|
||||||
|
|
||||||
|
s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
|
||||||
|
aws_secret_access_key = creds['SecretAccessKey'],
|
||||||
|
aws_session_token = creds['SessionToken'])
|
||||||
|
|
||||||
|
# expect AccessDenied because no identity policy allows s3 actions
|
||||||
|
e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
policy_name = 'AllowListBucket'
|
||||||
|
policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 's3:ListBucket',
|
||||||
|
'Resource': '*'
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
|
||||||
|
|
||||||
|
# the policy may take a bit to start working. retry until it returns
|
||||||
|
# something other than AccessDenied
|
||||||
|
retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name)
|
||||||
|
|
||||||
|
# alt account user assumes main account role to access alt account bucket
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_cross_account
|
||||||
|
@pytest.mark.iam_role
|
||||||
|
@pytest.mark.role_policy
|
||||||
|
def test_cross_account_role_policy_allow(iam_root, iam_alt_root):
|
||||||
|
path = get_iam_path_prefix()
|
||||||
|
user_name = make_iam_name('AltUser')
|
||||||
|
role_name = make_iam_name('MyRole')
|
||||||
|
session_name = 'MySession'
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
|
||||||
|
user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
|
||||||
|
user_arn = user['Arn']
|
||||||
|
key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
|
||||||
|
|
||||||
|
s3_alt = get_iam_alt_root_client(service_name='s3')
|
||||||
|
s3_alt.create_bucket(Bucket=bucket_name)
|
||||||
|
|
||||||
|
trust_policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 'sts:AssumeRole',
|
||||||
|
'Principal': {'AWS': user_arn}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
# returns MalformedPolicyDocument until the user arn starts working
|
||||||
|
role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
|
||||||
|
RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
|
||||||
|
role_arn = role['Arn']
|
||||||
|
|
||||||
|
sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
|
||||||
|
aws_secret_access_key=key['SecretAccessKey'])
|
||||||
|
|
||||||
|
# returns InvalidClientTokenId or AccessDenied until the access key starts working
|
||||||
|
response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
|
||||||
|
RoleArn=role_arn, RoleSessionName=session_name)
|
||||||
|
creds = response['Credentials']
|
||||||
|
|
||||||
|
s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
|
||||||
|
aws_secret_access_key = creds['SecretAccessKey'],
|
||||||
|
aws_session_token = creds['SessionToken'])
|
||||||
|
|
||||||
|
# expect AccessDenied because no identity policy allows s3 actions
|
||||||
|
e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
policy_name = 'AllowListBucket'
|
||||||
|
policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 's3:ListBucket',
|
||||||
|
'Resource': '*'
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
|
||||||
|
|
||||||
|
# expect AccessDenied because no resource policy allows the main account
|
||||||
|
e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
# add a bucket policy that allows s3:ListBucket for the main account's arn
|
||||||
|
main_arn = iam_root.get_user()['User']['Arn']
|
||||||
|
s3_alt.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Principal': {'AWS': main_arn},
|
||||||
|
'Action': 's3:ListBucket',
|
||||||
|
'Resource': f'arn:aws:s3:::{bucket_name}'
|
||||||
|
}]
|
||||||
|
}))
|
||||||
|
|
||||||
|
# the policy may take a bit to start working. retry until it returns
|
||||||
|
# something other than AccessDenied
|
||||||
|
retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name)
|
||||||
|
|
||||||
|
# alt account user assumes main account role to create a bucket
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_cross_account
|
||||||
|
@pytest.mark.iam_role
|
||||||
|
@pytest.mark.role_policy
|
||||||
|
def test_account_role_policy_allow_create_bucket(iam_root, iam_alt_root):
|
||||||
|
path = get_iam_path_prefix()
|
||||||
|
user_name = make_iam_name('AltUser')
|
||||||
|
role_name = make_iam_name('MyRole')
|
||||||
|
session_name = 'MySession'
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
|
||||||
|
user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
|
||||||
|
user_arn = user['Arn']
|
||||||
|
key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
|
||||||
|
|
||||||
|
trust_policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 'sts:AssumeRole',
|
||||||
|
'Principal': {'AWS': user_arn}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
# returns MalformedPolicyDocument until the user arn starts working
|
||||||
|
role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
|
||||||
|
RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
|
||||||
|
role_arn = role['Arn']
|
||||||
|
|
||||||
|
sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
|
||||||
|
aws_secret_access_key=key['SecretAccessKey'])
|
||||||
|
|
||||||
|
# returns InvalidClientTokenId or AccessDenied until the access key starts working
|
||||||
|
response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
|
||||||
|
RoleArn=role_arn, RoleSessionName=session_name)
|
||||||
|
creds = response['Credentials']
|
||||||
|
|
||||||
|
s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
|
||||||
|
aws_secret_access_key = creds['SecretAccessKey'],
|
||||||
|
aws_session_token = creds['SessionToken'])
|
||||||
|
|
||||||
|
# expect AccessDenied because no identity policy allows s3 actions
|
||||||
|
e = assert_raises(ClientError, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private')
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
policy_name = 'AllowCreateBucket'
|
||||||
|
policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': ['s3:CreateBucket', 's3:PutBucketAcl'],
|
||||||
|
'Resource': '*'
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
|
||||||
|
|
||||||
|
# the policy may take a bit to start working. retry until it returns
|
||||||
|
# something other than AccessDenied
|
||||||
|
retry_on('AccessDenied', 10, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private')
|
||||||
|
|
||||||
|
# verify that the bucket is owned by the role's account
|
||||||
|
s3_main = get_iam_root_client(service_name='s3')
|
||||||
|
response = s3_main.get_bucket_acl(Bucket=bucket_name)
|
||||||
|
|
||||||
|
main_arn = iam_root.get_user()['User']['Arn']
|
||||||
|
account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
|
||||||
|
assert response['Owner']['ID'] == account_id
|
||||||
|
assert response['Grants'][0]['Grantee']['ID'] == account_id
|
||||||
|
|
||||||
|
# alt account user assumes main account role to read the role info
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_cross_account
|
||||||
|
@pytest.mark.iam_role
|
||||||
|
@pytest.mark.role_policy
|
||||||
|
def test_account_role_policy_allow_get_role(iam_root, iam_alt_root):
|
||||||
|
path = get_iam_path_prefix()
|
||||||
|
user_name = make_iam_name('AltUser')
|
||||||
|
role_name = make_iam_name('MyRole')
|
||||||
|
session_name = 'MySession'
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
|
||||||
|
user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
|
||||||
|
user_arn = user['Arn']
|
||||||
|
key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
|
||||||
|
|
||||||
|
trust_policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 'sts:AssumeRole',
|
||||||
|
'Principal': {'AWS': user_arn}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
# returns MalformedPolicyDocument until the user arn starts working
|
||||||
|
role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
|
||||||
|
RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
|
||||||
|
role_arn = role['Arn']
|
||||||
|
|
||||||
|
sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
|
||||||
|
aws_secret_access_key=key['SecretAccessKey'])
|
||||||
|
|
||||||
|
# returns InvalidClientTokenId or AccessDenied until the access key starts working
|
||||||
|
response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
|
||||||
|
RoleArn=role_arn, RoleSessionName=session_name)
|
||||||
|
creds = response['Credentials']
|
||||||
|
|
||||||
|
iam = get_iam_root_client(service_name='iam',
|
||||||
|
aws_access_key_id = creds['AccessKeyId'],
|
||||||
|
aws_secret_access_key = creds['SecretAccessKey'],
|
||||||
|
aws_session_token = creds['SessionToken'])
|
||||||
|
|
||||||
|
# expect AccessDenied because no identity policy allows iam actions
|
||||||
|
e = assert_raises(ClientError, iam.get_role, RoleName=role_name)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
policy_name = 'AllowGetRole'
|
||||||
|
policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 'iam:GetRole',
|
||||||
|
'Resource': '*'
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
|
||||||
|
|
||||||
|
# the policy may take a bit to start working. retry until it returns
|
||||||
|
# something other than AccessDenied
|
||||||
|
retry_on('AccessDenied', 10, iam.get_role, RoleName=role_name)
|
||||||
|
|
||||||
|
|
||||||
# IAM OpenIDConnectProvider apis
|
# IAM OpenIDConnectProvider apis
|
||||||
@pytest.mark.iam_account
|
@pytest.mark.iam_account
|
||||||
|
|
Loading…
Reference in a new issue