diff --git a/pytest.ini b/pytest.ini index 4a94bc8..3ce92cf 100644 --- a/pytest.ini +++ b/pytest.ini @@ -25,6 +25,7 @@ markers = lifecycle_transition list_objects_v2 object_lock + role_policy session_policy s3select s3website diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 95fcb80..b06c5b8 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -20,6 +20,7 @@ from . import ( get_iam_s3client, get_alt_iam_client, get_alt_user_id, + get_sts_client, ) from .utils import _get_status, _get_status_and_error_code @@ -1718,3 +1719,122 @@ def test_account_role_update(iam_root): assert arn == response['Role']['Arn'] assert desc == response['Role']['Description'] assert 43200 == response['Role']['MaxSessionDuration'] + + +role_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + "Resource": "*" + }] + }) + +# IAM RolePolicy apis +@pytest.mark.iam_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy(iam_root): + path = get_iam_path_prefix() + role_name = make_iam_name('r') + policy_name = 'MyPolicy' + policy2_name = 'AnotherPolicy' + + # Get/Put/Delete fail on nonexistent RoleName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy) + + iam_root.create_role(RoleName=role_name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy) + + response = iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + assert role_name == response['RoleName'] + assert policy_name == response['PolicyName'] + assert role_policy == json.dumps(response['PolicyDocument']) + + response = iam_root.list_role_policies(RoleName=role_name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy2_name, PolicyDocument=role_policy) + + response = iam_root.list_role_policies(RoleName=role_name) + assert [policy2_name, policy_name] == response['PolicyNames'] + + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy2_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + +@pytest.mark.iam_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow(iam_root): + path = get_iam_path_prefix() + user_name = make_iam_name('MyUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + + user = iam_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + key = iam_root.create_access_key(UserName=user_name)['AccessKey'] + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_buckets) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListAllMyBuckets' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListAllMyBuckets', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_buckets)