diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 4919798..1e09a5e 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -16,6 +16,7 @@ from . import ( make_iam_name, get_iam_path_prefix, get_new_bucket, + get_new_bucket_name, get_iam_s3client, get_alt_iam_client, get_alt_user_id, @@ -878,12 +879,25 @@ def nuke_user_keys(client, name): except: pass +def nuke_user_policies(client, name): + p = client.get_paginator('list_user_policies') + for response in p.paginate(UserName=name): + for policy in response['PolicyNames']: + try: + client.delete_user_policy(UserName=name, PolicyName=policy) + except: + pass + def nuke_user(client, name): # delete access keys, user policies, etc try: nuke_user_keys(client, name) except: pass + try: + nuke_user_policies(client, name) + except: + pass client.delete_user(UserName=name) def nuke_users(client, **kwargs): @@ -1333,3 +1347,98 @@ def test_account_user_bucket_policy_allow(iam_root): retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket) finally: roots3.delete_bucket(Bucket=bucket) + + +# IAM UserPolicy apis +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy_name = 'List' + bucket_name = get_new_bucket_name() + policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Deny', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + + # Get/Put/Delete fail on nonexistent UserName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1) + + iam_root.create_user(UserName=name, Path=path) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1) + + response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + assert policy1 == json.dumps(response['PolicyDocument']) + response = iam_root.list_user_policies(UserName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy2) + + response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + assert policy2 == json.dumps(response['PolicyDocument']) + response = iam_root.list_user_policies(UserName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + response = iam_root.list_user_policies(UserName=name) + assert [] == response['PolicyNames'] + +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy_allow(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + bucket_name = get_new_bucket_name() + iam_root.create_user(UserName=name, Path=path) + + key = iam_root.create_access_key(UserName=name)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + }) + policy_name = 'AllowStar' + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets)