mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-24 19:30:38 +00:00
iam: add account test for RolePolicy apis
adds test cases for the following iam actions: * PutRolePolicy * GetRolePolicy * DeleteRolePolicy * ListRolePolicies verified to pass against aws when an account root user's credentials are provided in the [iam] section of s3tests.conf Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
parent
d4ada317e1
commit
cefea0fd26
2 changed files with 121 additions and 0 deletions
|
@ -25,6 +25,7 @@ markers =
|
||||||
lifecycle_transition
|
lifecycle_transition
|
||||||
list_objects_v2
|
list_objects_v2
|
||||||
object_lock
|
object_lock
|
||||||
|
role_policy
|
||||||
session_policy
|
session_policy
|
||||||
s3select
|
s3select
|
||||||
s3website
|
s3website
|
||||||
|
|
|
@ -20,6 +20,7 @@ from . import (
|
||||||
get_iam_s3client,
|
get_iam_s3client,
|
||||||
get_alt_iam_client,
|
get_alt_iam_client,
|
||||||
get_alt_user_id,
|
get_alt_user_id,
|
||||||
|
get_sts_client,
|
||||||
)
|
)
|
||||||
from .utils import _get_status, _get_status_and_error_code
|
from .utils import _get_status, _get_status_and_error_code
|
||||||
|
|
||||||
|
@ -1718,3 +1719,122 @@ def test_account_role_update(iam_root):
|
||||||
assert arn == response['Role']['Arn']
|
assert arn == response['Role']['Arn']
|
||||||
assert desc == response['Role']['Description']
|
assert desc == response['Role']['Description']
|
||||||
assert 43200 == response['Role']['MaxSessionDuration']
|
assert 43200 == response['Role']['MaxSessionDuration']
|
||||||
|
|
||||||
|
|
||||||
|
role_policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 's3:*',
|
||||||
|
"Resource": "*"
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
|
||||||
|
# IAM RolePolicy apis
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_role
|
||||||
|
@pytest.mark.role_policy
|
||||||
|
def test_account_role_policy(iam_root):
|
||||||
|
path = get_iam_path_prefix()
|
||||||
|
role_name = make_iam_name('r')
|
||||||
|
policy_name = 'MyPolicy'
|
||||||
|
policy2_name = 'AnotherPolicy'
|
||||||
|
|
||||||
|
# Get/Put/Delete fail on nonexistent RoleName
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy)
|
||||||
|
|
||||||
|
iam_root.create_role(RoleName=role_name, Path=path, AssumeRolePolicyDocument=assume_role_policy)
|
||||||
|
|
||||||
|
# Get/Delete fail on nonexistent PolicyName
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy)
|
||||||
|
|
||||||
|
response = iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
assert role_name == response['RoleName']
|
||||||
|
assert policy_name == response['PolicyName']
|
||||||
|
assert role_policy == json.dumps(response['PolicyDocument'])
|
||||||
|
|
||||||
|
response = iam_root.list_role_policies(RoleName=role_name)
|
||||||
|
assert [policy_name] == response['PolicyNames']
|
||||||
|
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy2_name, PolicyDocument=role_policy)
|
||||||
|
|
||||||
|
response = iam_root.list_role_policies(RoleName=role_name)
|
||||||
|
assert [policy2_name, policy_name] == response['PolicyNames']
|
||||||
|
|
||||||
|
iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy2_name)
|
||||||
|
|
||||||
|
# Get/Delete fail after Delete
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
with pytest.raises(iam_root.exceptions.NoSuchEntityException):
|
||||||
|
iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
|
||||||
|
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_role
|
||||||
|
@pytest.mark.role_policy
|
||||||
|
def test_account_role_policy_allow(iam_root):
|
||||||
|
path = get_iam_path_prefix()
|
||||||
|
user_name = make_iam_name('MyUser')
|
||||||
|
role_name = make_iam_name('MyRole')
|
||||||
|
session_name = 'MySession'
|
||||||
|
|
||||||
|
user = iam_root.create_user(UserName=user_name, Path=path)['User']
|
||||||
|
user_arn = user['Arn']
|
||||||
|
|
||||||
|
trust_policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 'sts:AssumeRole',
|
||||||
|
'Principal': {'AWS': user_arn}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
# returns MalformedPolicyDocument until the user arn starts working
|
||||||
|
role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
|
||||||
|
RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
|
||||||
|
role_arn = role['Arn']
|
||||||
|
|
||||||
|
key = iam_root.create_access_key(UserName=user_name)['AccessKey']
|
||||||
|
sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
|
||||||
|
aws_secret_access_key=key['SecretAccessKey'])
|
||||||
|
|
||||||
|
# returns InvalidClientTokenId or AccessDenied until the access key starts working
|
||||||
|
response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
|
||||||
|
RoleArn=role_arn, RoleSessionName=session_name)
|
||||||
|
creds = response['Credentials']
|
||||||
|
|
||||||
|
s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
|
||||||
|
aws_secret_access_key = creds['SecretAccessKey'],
|
||||||
|
aws_session_token = creds['SessionToken'])
|
||||||
|
|
||||||
|
# expect AccessDenied because no identity policy allows s3 actions
|
||||||
|
e = assert_raises(ClientError, s3.list_buckets)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
policy_name = 'AllowListAllMyBuckets'
|
||||||
|
policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Action': 's3:ListAllMyBuckets',
|
||||||
|
'Resource': '*'
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
|
||||||
|
|
||||||
|
# the policy may take a bit to start working. retry until it returns
|
||||||
|
# something other than AccessDenied
|
||||||
|
retry_on('AccessDenied', 10, s3.list_buckets)
|
||||||
|
|
Loading…
Reference in a new issue