diff --git a/pytest.ini b/pytest.ini index 0e3bcba..73d1563 100644 --- a/pytest.ini +++ b/pytest.ini @@ -16,21 +16,28 @@ markers = fails_on_rgw fails_on_s3 fails_with_subdomain + group + group_policy + iam_account + iam_cross_account + iam_role + iam_tenant + iam_user lifecycle lifecycle_expiration lifecycle_transition list_objects_v2 object_lock + role_policy session_policy s3select s3website s3website_routing_rules s3website_redirect_location - 3website + sns sse_s3 storage_class tagging - test_of_iam test_of_sts token_claims_trust_policy_test token_principal_tag_role_policy_test diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 9593fc1..c0dc89a 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -19,6 +19,14 @@ ssl_verify = False ## the prefix to 30 characters long, and avoid collisions bucket prefix = yournamehere-{random}- +# all the iam account resources (users, roles, etc) created +# will start with this name prefix +iam name prefix = s3-tests- + +# all the iam account resources (users, roles, etc) created +# will start with this path prefix +iam path prefix = /s3-tests/ + [s3 main] # main display_name set in vstart.sh display_name = M. Tester @@ -127,6 +135,20 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn #display_name from vstart.sh display_name = youruseridhere +# iam account root user for iam_account tests +[iam root] +access_key = AAAAAAAAAAAAAAAAAAaa +secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +user_id = RGW11111111111111111 +email = account1@ceph.com + +# iam account root user in a different account than [iam root] +[iam alt root] +access_key = BBBBBBBBBBBBBBBBBBbb +secret_key = bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb +user_id = RGW22222222222222222 +email = account2@ceph.com + #following section needs to be added when you want to run Assume Role With Webidentity test [webidentity] #used for assume role with web identity test in sts-tests diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index a65b54c..2f9f7e1 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -174,7 +174,7 @@ def configured_storage_classes(): return sc -def setup(): +def configure(): cfg = configparser.RawConfigParser() try: path = os.environ['S3TEST_CONF'] @@ -260,24 +260,41 @@ def setup(): config.tenant_user_id = cfg.get('s3 tenant',"user_id") config.tenant_email = cfg.get('s3 tenant',"email") - # vars from the fixtures section - try: - template = cfg.get('fixtures', "bucket prefix") - except (configparser.NoOptionError): - template = 'test-{random}-' - prefix = choose_bucket_prefix(template=template) + config.iam_access_key = cfg.get('iam',"access_key") + config.iam_secret_key = cfg.get('iam',"secret_key") + config.iam_display_name = cfg.get('iam',"display_name") + config.iam_user_id = cfg.get('iam',"user_id") + config.iam_email = cfg.get('iam',"email") - alt_client = get_alt_client() - tenant_client = get_tenant_client() - nuke_prefixed_buckets(prefix=prefix) - nuke_prefixed_buckets(prefix=prefix, client=alt_client) - nuke_prefixed_buckets(prefix=prefix, client=tenant_client) + config.iam_root_access_key = cfg.get('iam root',"access_key") + config.iam_root_secret_key = cfg.get('iam root',"secret_key") + config.iam_root_user_id = cfg.get('iam root',"user_id") + config.iam_root_email = cfg.get('iam root',"email") + + config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key") + config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key") + config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id") + config.iam_alt_root_email = cfg.get('iam alt root',"email") + + # vars from the fixtures section + template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-') + prefix = choose_bucket_prefix(template=template) + template = cfg.get('fixtures', "iam name prefix", fallback="s3-tests-") + config.iam_name_prefix = choose_bucket_prefix(template=template) + template = cfg.get('fixtures', "iam path prefix", fallback="/s3-tests/") + config.iam_path_prefix = choose_bucket_prefix(template=template) if cfg.has_section("s3 cloud"): get_cloud_config(cfg) else: config.cloud_storage_class = None +def setup(): + alt_client = get_alt_client() + tenant_client = get_tenant_client() + nuke_prefixed_buckets(prefix=prefix) + nuke_prefixed_buckets(prefix=prefix, client=alt_client) + nuke_prefixed_buckets(prefix=prefix, client=tenant_client) def teardown(): alt_client = get_alt_client() @@ -306,11 +323,12 @@ def teardown(): @pytest.fixture(scope="package") def configfile(): - setup() + configure() return config @pytest.fixture(autouse=True) def setup_teardown(configfile): + setup() yield teardown() @@ -395,64 +413,65 @@ def get_v2_client(): config=Config(signature_version='s3')) return client -def get_sts_client(client_config=None): - if client_config == None: - client_config = Config(signature_version='s3v4') +def get_sts_client(**kwargs): + kwargs.setdefault('aws_access_key_id', config.alt_access_key) + kwargs.setdefault('aws_secret_access_key', config.alt_secret_key) + kwargs.setdefault('config', Config(signature_version='s3v4')) client = boto3.client(service_name='sts', - aws_access_key_id=config.alt_access_key, - aws_secret_access_key=config.alt_secret_key, - endpoint_url=config.default_endpoint, - region_name='', - use_ssl=config.default_is_secure, - verify=config.default_ssl_verify, - config=client_config) + endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) return client -def get_iam_client(client_config=None): - cfg = configparser.RawConfigParser() - try: - path = os.environ['S3TEST_CONF'] - except KeyError: - raise RuntimeError( - 'To run tests, point environment ' - + 'variable S3TEST_CONF to a config file.', - ) - cfg.read(path) - if not cfg.has_section("iam"): - raise RuntimeError('Your config file is missing the "iam" section!') +def get_iam_client(**kwargs): + kwargs.setdefault('aws_access_key_id', config.iam_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_secret_key) - config.iam_access_key = cfg.get('iam',"access_key") - config.iam_secret_key = cfg.get('iam',"secret_key") - config.iam_display_name = cfg.get('iam',"display_name") - config.iam_user_id = cfg.get('iam',"user_id") - config.iam_email = cfg.get('iam',"email") - - if client_config == None: - client_config = Config(signature_version='s3v4') - client = boto3.client(service_name='iam', - aws_access_key_id=config.iam_access_key, - aws_secret_access_key=config.iam_secret_key, endpoint_url=config.default_endpoint, region_name='', use_ssl=config.default_is_secure, verify=config.default_ssl_verify, - config=client_config) + **kwargs) return client -def get_iam_s3client(client_config=None): - if client_config == None: - client_config = Config(signature_version='s3v4') +def get_iam_s3client(**kwargs): + kwargs.setdefault('aws_access_key_id', config.iam_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_secret_key) + kwargs.setdefault('config', Config(signature_version='s3v4')) + client = boto3.client(service_name='s3', - aws_access_key_id=get_iam_access_key(), - aws_secret_access_key=get_iam_secret_key(), endpoint_url=config.default_endpoint, use_ssl=config.default_is_secure, verify=config.default_ssl_verify, - config=client_config) + **kwargs) return client +def get_iam_root_client(**kwargs): + kwargs.setdefault('service_name', 'iam') + kwargs.setdefault('aws_access_key_id', config.iam_root_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_root_secret_key) + + return boto3.client(endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) + +def get_iam_alt_root_client(**kwargs): + kwargs.setdefault('service_name', 'iam') + kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_alt_root_secret_key) + + return boto3.client(endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) + def get_alt_client(client_config=None): if client_config == None: client_config = Config(signature_version='s3v4') @@ -699,12 +718,33 @@ def get_token(): def get_realm_name(): return config.webidentity_realm +def get_iam_name_prefix(): + return config.iam_name_prefix + +def make_iam_name(name): + return config.iam_name_prefix + name + +def get_iam_path_prefix(): + return config.iam_path_prefix + def get_iam_access_key(): return config.iam_access_key def get_iam_secret_key(): return config.iam_secret_key +def get_iam_root_user_id(): + return config.iam_root_user_id + +def get_iam_root_email(): + return config.iam_root_email + +def get_iam_alt_root_user_id(): + return config.iam_alt_root_user_id + +def get_iam_alt_root_email(): + return config.iam_alt_root_email + def get_user_token(): return config.webidentity_user_token diff --git a/s3tests_boto3/functional/iam.py b/s3tests_boto3/functional/iam.py new file mode 100644 index 0000000..a070e5d --- /dev/null +++ b/s3tests_boto3/functional/iam.py @@ -0,0 +1,199 @@ +from botocore.exceptions import ClientError +import pytest + +from . import ( + configfile, + get_iam_root_client, + get_iam_root_user_id, + get_iam_root_email, + get_iam_alt_root_client, + get_iam_alt_root_user_id, + get_iam_alt_root_email, + get_iam_path_prefix, +) + +def nuke_user_keys(client, name): + p = client.get_paginator('list_access_keys') + for response in p.paginate(UserName=name): + for key in response['AccessKeyMetadata']: + try: + client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId']) + except: + pass + +def nuke_user_policies(client, name): + p = client.get_paginator('list_user_policies') + for response in p.paginate(UserName=name): + for policy in response['PolicyNames']: + try: + client.delete_user_policy(UserName=name, PolicyName=policy) + except: + pass + +def nuke_attached_user_policies(client, name): + p = client.get_paginator('list_attached_user_policies') + for response in p.paginate(UserName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn']) + except: + pass + +def nuke_user(client, name): + # delete access keys, user policies, etc + try: + nuke_user_keys(client, name) + except: + pass + try: + nuke_user_policies(client, name) + except: + pass + try: + nuke_attached_user_policies(client, name) + except: + pass + client.delete_user(UserName=name) + +def nuke_users(client, **kwargs): + p = client.get_paginator('list_users') + for response in p.paginate(**kwargs): + for user in response['Users']: + try: + nuke_user(client, user['UserName']) + except: + pass + +def nuke_group_policies(client, name): + p = client.get_paginator('list_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['PolicyNames']: + try: + client.delete_group_policy(GroupName=name, PolicyName=policy) + except: + pass + +def nuke_attached_group_policies(client, name): + p = client.get_paginator('list_attached_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn']) + except: + pass + +def nuke_group_users(client, name): + p = client.get_paginator('get_group') + for response in p.paginate(GroupName=name): + for user in response['Users']: + try: + client.remove_user_from_group(GroupName=name, UserName=user['UserName']) + except: + pass + +def nuke_group(client, name): + # delete group policies and remove all users + try: + nuke_group_policies(client, name) + except: + pass + try: + nuke_attached_group_policies(client, name) + except: + pass + try: + nuke_group_users(client, name) + except: + pass + client.delete_group(GroupName=name) + +def nuke_groups(client, **kwargs): + p = client.get_paginator('list_groups') + for response in p.paginate(**kwargs): + for user in response['Groups']: + try: + nuke_group(client, user['GroupName']) + except: + pass + +def nuke_role_policies(client, name): + p = client.get_paginator('list_role_policies') + for response in p.paginate(RoleName=name): + for policy in response['PolicyNames']: + try: + client.delete_role_policy(RoleName=name, PolicyName=policy) + except: + pass + +def nuke_attached_role_policies(client, name): + p = client.get_paginator('list_attached_role_policies') + for response in p.paginate(RoleName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn']) + except: + pass + +def nuke_role(client, name): + # delete role policies, etc + try: + nuke_role_policies(client, name) + except: + pass + try: + nuke_attached_role_policies(client, name) + except: + pass + client.delete_role(RoleName=name) + +def nuke_roles(client, **kwargs): + p = client.get_paginator('list_roles') + for response in p.paginate(**kwargs): + for role in response['Roles']: + try: + nuke_role(client, role['RoleName']) + except: + pass + +def nuke_oidc_providers(client, prefix): + result = client.list_open_id_connect_providers() + for provider in result['OpenIDConnectProviderList']: + arn = provider['Arn'] + if f':oidc-provider{prefix}' in arn: + try: + client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + except: + pass + + +# fixture for iam account root user +@pytest.fixture +def iam_root(configfile): + client = get_iam_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_groups(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) + nuke_oidc_providers(client, get_iam_path_prefix()) + +# fixture for iam alt account root user +@pytest.fixture +def iam_alt_root(configfile): + client = get_iam_alt_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam alt root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam alt root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index fa44357..fb288ce 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -1,4 +1,6 @@ import json +import datetime +import time from botocore.exceptions import ClientError import pytest @@ -10,16 +12,25 @@ from . import ( setup_teardown, get_alt_client, get_iam_client, + get_iam_root_client, + get_iam_alt_root_client, + get_iam_alt_root_user_id, + get_iam_alt_root_email, + make_iam_name, + get_iam_path_prefix, get_new_bucket, + get_new_bucket_name, get_iam_s3client, get_alt_iam_client, get_alt_user_id, + get_sts_client, ) from .utils import _get_status, _get_status_and_error_code +from .iam import iam_root, iam_alt_root @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_user_policy(): client = get_iam_client() @@ -39,7 +50,7 @@ def test_put_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_user_policy_invalid_user(): client = get_iam_client() @@ -57,7 +68,7 @@ def test_put_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_user_policy_parameter_limit(): client = get_iam_client() @@ -76,7 +87,7 @@ def test_put_user_policy_parameter_limit(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_rgw def test_put_user_policy_invalid_element(): client = get_iam_client() @@ -142,7 +153,7 @@ def test_put_user_policy_invalid_element(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_existing_user_policy(): client = get_iam_client() @@ -163,7 +174,7 @@ def test_put_existing_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_list_user_policy(): client = get_iam_client() @@ -184,7 +195,7 @@ def test_list_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_list_user_policy_invalid_user(): client = get_iam_client() e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id") @@ -193,7 +204,7 @@ def test_list_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_get_user_policy(): client = get_iam_client() @@ -216,7 +227,7 @@ def test_get_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_get_user_policy_invalid_user(): client = get_iam_client() @@ -238,7 +249,7 @@ def test_get_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_rgw def test_get_user_policy_invalid_policy_name(): client = get_iam_client() @@ -260,7 +271,7 @@ def test_get_user_policy_invalid_policy_name(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_rgw def test_get_deleted_user_policy(): client = get_iam_client() @@ -282,7 +293,7 @@ def test_get_deleted_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_get_user_policy_from_multiple_policies(): client = get_iam_client() @@ -315,7 +326,7 @@ def test_get_user_policy_from_multiple_policies(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy(): client = get_iam_client() @@ -337,7 +348,7 @@ def test_delete_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy_invalid_user(): client = get_iam_client() @@ -363,7 +374,7 @@ def test_delete_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy_invalid_policy_name(): client = get_iam_client() @@ -389,7 +400,7 @@ def test_delete_user_policy_invalid_policy_name(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy_from_multiple_policies(): client = get_iam_client() @@ -429,7 +440,7 @@ def test_delete_user_policy_from_multiple_policies(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_allow_bucket_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() @@ -476,7 +487,7 @@ def test_allow_bucket_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_bucket_actions_in_user_policy(): client = get_iam_client() @@ -512,7 +523,7 @@ def test_deny_bucket_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_allow_object_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() @@ -551,7 +562,7 @@ def test_allow_object_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_object_actions_in_user_policy(): client = get_iam_client() @@ -591,7 +602,7 @@ def test_deny_object_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_allow_multipart_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() @@ -626,7 +637,7 @@ def test_allow_multipart_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_multipart_actions_in_user_policy(): client = get_iam_client() @@ -669,7 +680,7 @@ def test_deny_multipart_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_allow_tagging_actions_in_user_policy(): client = get_iam_client() @@ -715,7 +726,7 @@ def test_allow_tagging_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_tagging_actions_in_user_policy(): client = get_iam_client() @@ -767,7 +778,7 @@ def test_deny_tagging_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_verify_conflicting_user_policy_statements(): s3client = get_alt_client() @@ -799,7 +810,7 @@ def test_verify_conflicting_user_policy_statements(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_verify_conflicting_user_policies(): s3client = get_alt_client() @@ -838,7 +849,7 @@ def test_verify_conflicting_user_policies(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_verify_allow_iam_actions(): policy1 = json.dumps( {"Version": "2012-10-17", @@ -862,3 +873,1931 @@ def test_verify_allow_iam_actions(): response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + +# IAM User apis +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_create(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('U1') + response = iam_root.create_user(UserName=name1, Path=path) + user = response['User'] + assert user['Path'] == path + assert user['UserName'] == name1 + assert len(user['UserId']) + assert user['Arn'].startswith('arn:aws:iam:') + assert user['Arn'].endswith(f':user{path}{name1}') + assert user['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + path2 = get_iam_path_prefix() + 'foo/' + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_user(UserName=name1, Path=path2) + + name2 = make_iam_name('U2') + response = iam_root.create_user(UserName=name2, Path=path2) + user = response['User'] + assert user['Path'] == path2 + assert user['UserName'] == name2 + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_case_insensitive_name(iam_root): + path = get_iam_path_prefix() + name_upper = make_iam_name('U1') + name_lower = make_iam_name('u1') + response = iam_root.create_user(UserName=name_upper, Path=path) + user = response['User'] + + # name is case-insensitive, so 'u1' should also conflict + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_user(UserName=name_lower) + + # search for 'u1' should return the same 'U1' user + response = iam_root.get_user(UserName=name_lower) + assert user == response['User'] + + # delete for 'u1' should delete the same 'U1' user + iam_root.delete_user(UserName=name_lower) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user(UserName=name_lower) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_delete(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('U1') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user(UserName=name) + + response = iam_root.create_user(UserName=name, Path=path) + uid = response['User']['UserId'] + create_date = response['User']['CreateDate'] + + iam_root.delete_user(UserName=name) + + response = iam_root.create_user(UserName=name, Path=path) + assert uid != response['User']['UserId'] + assert create_date <= response['User']['CreateDate'] + +def user_list_names(client, **kwargs): + p = client.get_paginator('list_users') + usernames = [] + for response in p.paginate(**kwargs): + usernames += [u['UserName'] for u in response['Users']] + return usernames + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_list(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_users(PathPrefix=path) + assert len(response['Users']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('aa') + name2 = make_iam_name('Ab') + name3 = make_iam_name('ac') + name4 = make_iam_name('Ad') + + # sort order is independent of CreateDate, Path, and UserName capitalization + iam_root.create_user(UserName=name4, Path=path+'w/') + iam_root.create_user(UserName=name3, Path=path+'x/') + iam_root.create_user(UserName=name2, Path=path+'y/') + iam_root.create_user(UserName=name1, Path=path+'z/') + + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path, PaginationConfig={'PageSize': 1}) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_list_path_prefix(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_users(PathPrefix=path) + assert len(response['Users']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('a') + name2 = make_iam_name('b') + name3 = make_iam_name('c') + name4 = make_iam_name('d') + + iam_root.create_user(UserName=name1, Path=path) + iam_root.create_user(UserName=name2, Path=path) + iam_root.create_user(UserName=name3, Path=path+'a/') + iam_root.create_user(UserName=name4, Path=path+'a/x/') + + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path, + PaginationConfig={'PageSize': 1}) + assert [name3, name4] == \ + user_list_names(iam_root, PathPrefix=path+'a') + assert [name3, name4] == \ + user_list_names(iam_root, PathPrefix=path+'a', + PaginationConfig={'PageSize': 1}) + assert [name4] == \ + user_list_names(iam_root, PathPrefix=path+'a/x') + assert [name4] == \ + user_list_names(iam_root, PathPrefix=path+'a/x', + PaginationConfig={'PageSize': 1}) + assert [] == user_list_names(iam_root, PathPrefix=path+'a/x/d') + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_update_name(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('a') + new_name1 = make_iam_name('z') + name2 = make_iam_name('b') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_user(UserName=name1, NewUserName=new_name1) + + iam_root.create_user(UserName=name1, Path=path) + iam_root.create_user(UserName=name2, Path=path+'m/') + assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) + + response = iam_root.get_user(UserName=name1) + assert name1 == response['User']['UserName'] + uid = response['User']['UserId'] + + iam_root.update_user(UserName=name1, NewUserName=new_name1) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user(UserName=name1) + + response = iam_root.get_user(UserName=new_name1) + assert new_name1 == response['User']['UserName'] + assert uid == response['User']['UserId'] + assert response['User']['Arn'].endswith(f':user{path}{new_name1}') + + assert [name2, new_name1] == user_list_names(iam_root, PathPrefix=path) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_update_path(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('a') + name2 = make_iam_name('b') + iam_root.create_user(UserName=name1, Path=path) + iam_root.create_user(UserName=name2, Path=path+'m/') + assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) + + response = iam_root.get_user(UserName=name1) + assert name1 == response['User']['UserName'] + assert path == response['User']['Path'] + uid = response['User']['UserId'] + + iam_root.update_user(UserName=name1, NewPath=path+'z/') + + response = iam_root.get_user(UserName=name1) + assert name1 == response['User']['UserName'] + assert f'{path}z/' == response['User']['Path'] + assert uid == response['User']['UserId'] + assert response['User']['Arn'].endswith(f':user{path}z/{name1}') + + assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) + + +# IAM AccessKey apis +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_create(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('a') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.create_access_key(UserName=name) + + iam_root.create_user(UserName=name, Path=path) + + response = iam_root.create_access_key(UserName=name) + key = response['AccessKey'] + assert name == key['UserName'] + assert len(key['AccessKeyId']) + assert len(key['SecretAccessKey']) + assert 'Active' == key['Status'] + assert key['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_current_user_access_key_create(iam_root): + # omit the UserName argument to operate on the current authenticated + # user (assumed to be an account root user) + + response = iam_root.create_access_key() + key = response['AccessKey'] + keyid = key['AccessKeyId'] + assert len(keyid) + try: + assert len(key['SecretAccessKey']) + assert 'Active' == key['Status'] + assert key['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + finally: + # iam_root doesn't see the account root user, so clean up + # this key manually + iam_root.delete_access_key(AccessKeyId=keyid) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_update(iam_root): + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_access_key(UserName='nosuchuser', AccessKeyId='abcdefghijklmnopqrstu', Status='Active') + + path = get_iam_path_prefix() + name = make_iam_name('a') + iam_root.create_user(UserName=name, Path=path) + + response = iam_root.create_access_key(UserName=name) + key = response['AccessKey'] + keyid = key['AccessKeyId'] + create_date = key['CreateDate'] + assert create_date > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_access_key(UserName=name, AccessKeyId='abcdefghijklmnopqrstu', Status='Active') + + iam_root.update_access_key(UserName=name, AccessKeyId=keyid, Status='Active') + iam_root.update_access_key(UserName=name, AccessKeyId=keyid, Status='Inactive') + + response = iam_root.list_access_keys(UserName=name) + keys = response['AccessKeyMetadata'] + assert 1 == len(keys) + key = keys[0] + assert name == key['UserName'] + assert keyid == key['AccessKeyId'] + assert 'Inactive' == key['Status'] + assert create_date == key['CreateDate'] # CreateDate unchanged by update_access_key() + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_current_user_access_key_update(iam_root): + # omit the UserName argument to operate on the current authenticated + # user (assumed to be an account root user) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_access_key(AccessKeyId='abcdefghijklmnopqrstu', Status='Active') + + response = iam_root.create_access_key() + key = response['AccessKey'] + keyid = key['AccessKeyId'] + assert len(keyid) + try: + iam_root.update_access_key(AccessKeyId=keyid, Status='Active') + iam_root.update_access_key(AccessKeyId=keyid, Status='Inactive') + + # find the access key id we created + p = iam_root.get_paginator('list_access_keys') + for response in p.paginate(): + for key in response['AccessKeyMetadata']: + if keyid == key['AccessKeyId']: + assert 'Inactive' == key['Status'] + return + assert False, f'AccessKeyId={keyid} not found in list_access_keys()' + + finally: + # iam_root doesn't see the account root user, so clean up + # this key manually + iam_root.delete_access_key(AccessKeyId=keyid) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_delete(iam_root): + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(UserName='nosuchuser', AccessKeyId='abcdefghijklmnopqrstu') + + path = get_iam_path_prefix() + name = make_iam_name('a') + iam_root.create_user(UserName=name, Path=path) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(UserName=name, AccessKeyId='abcdefghijklmnopqrstu') + + response = iam_root.create_access_key(UserName=name) + keyid = response['AccessKey']['AccessKeyId'] + + iam_root.delete_access_key(UserName=name, AccessKeyId=keyid) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(UserName=name, AccessKeyId=keyid) + + response = iam_root.list_access_keys(UserName=name) + keys = response['AccessKeyMetadata'] + assert 0 == len(keys) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_current_user_access_key_delete(iam_root): + # omit the UserName argument to operate on the current authenticated + # user (assumed to be an account root user) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(AccessKeyId='abcdefghijklmnopqrstu') + + response = iam_root.create_access_key() + keyid = response['AccessKey']['AccessKeyId'] + + iam_root.delete_access_key(AccessKeyId=keyid) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(AccessKeyId=keyid) + + # make sure list_access_keys() doesn't return the access key id we deleted + p = iam_root.get_paginator('list_access_keys') + for response in p.paginate(): + for key in response['AccessKeyMetadata']: + assert keyid != key['AccessKeyId'] + +def user_list_key_ids(client, **kwargs): + p = client.get_paginator('list_access_keys') + ids = [] + for response in p.paginate(**kwargs): + ids += [k['AccessKeyId'] for k in response['AccessKeyMetadata']] + return ids + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_list(iam_root): + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_access_keys(UserName='nosuchuser') + + path = get_iam_path_prefix() + name = make_iam_name('a') + iam_root.create_user(UserName=name, Path=path) + + assert [] == user_list_key_ids(iam_root, UserName=name) + assert [] == user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1}) + + id1 = iam_root.create_access_key(UserName=name)['AccessKey']['AccessKeyId'] + + assert [id1] == user_list_key_ids(iam_root, UserName=name) + assert [id1] == user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1}) + + id2 = iam_root.create_access_key(UserName=name)['AccessKey']['AccessKeyId'] + # AccessKeysPerUser=2 is the default quota in aws + + keys = sorted([id1, id2]) + assert keys == sorted(user_list_key_ids(iam_root, UserName=name)) + assert keys == sorted(user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1})) + +def retry_on(code, tries, func, *args, **kwargs): + for i in range(tries): + try: + return func(*args, **kwargs) + except ClientError as e: + err = e.response['Error']['Code'] + if i + 1 < tries and err in code: + print(f'Got {err}, retrying in {i}s..') + time.sleep(i) + continue + raise + + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_bucket_policy_allow(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + response = iam_root.create_user(UserName=name, Path=path) + user_arn = response['User']['Arn'] + assert user_arn.startswith('arn:aws:iam:') + assert user_arn.endswith(f':user{path}{name}') + + key = iam_root.create_access_key(UserName=name)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + roots3 = get_iam_root_client(service_name='s3') + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_objects, Bucket=bucket) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': user_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + }) + roots3.put_bucket_policy(Bucket=bucket, Policy=policy) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + + +# IAM UserPolicy apis +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy_name = 'List' + bucket_name = get_new_bucket_name() + policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Deny', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + + # Get/Put/Delete fail on nonexistent UserName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1) + + iam_root.create_user(UserName=name, Path=path) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1) + + response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + assert policy1 == json.dumps(response['PolicyDocument']) + response = iam_root.list_user_policies(UserName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy2) + + response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + assert policy2 == json.dumps(response['PolicyDocument']) + response = iam_root.list_user_policies(UserName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + response = iam_root.list_user_policies(UserName=name) + assert [] == response['PolicyNames'] + +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy_managed(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent UserName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_user_policy(UserName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_user_policy(UserName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_user_policies(UserName=name) + + iam_root.create_user(UserName=name, Path=path) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_user_policy(UserName=name, PolicyArn=policy1) + + iam_root.attach_user_policy(UserName=name, PolicyArn=policy1) + iam_root.attach_user_policy(UserName=name, PolicyArn=policy1) + + response = iam_root.list_attached_user_policies(UserName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_user_policy(UserName=name, PolicyArn=policy2) + + response = iam_root.list_attached_user_policies(UserName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_user_policy(UserName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_user_policy(UserName=name, PolicyArn=policy2) + + response = iam_root.list_attached_user_policies(UserName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteUser fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_user(UserName=name) + +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy_allow(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + bucket_name = get_new_bucket_name() + iam_root.create_user(UserName=name, Path=path) + + key = iam_root.create_access_key(UserName=name)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + }) + policy_name = 'AllowStar' + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + + +def group_list_names(client, **kwargs): + p = client.get_paginator('list_groups') + names = [] + for response in p.paginate(**kwargs): + names += [u['GroupName'] for u in response['Groups']] + return names + +# IAM Group apis +@pytest.mark.group +@pytest.mark.iam_account +def test_account_group_create(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('G1') + + assert [] == group_list_names(iam_root, PathPrefix=path) + + response = iam_root.create_group(GroupName=name, Path=path) + group = response['Group'] + assert path == group['Path'] + assert name == group['GroupName'] + assert len(group['GroupId']) + arn = group['Arn'] + assert arn.startswith('arn:aws:iam:') + assert arn.endswith(f':group{path}{name}') + + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_group(GroupName=name) + + response = iam_root.get_group(GroupName=name) + assert group == response['Group'] + + assert [name] == group_list_names(iam_root, PathPrefix=path) + + iam_root.delete_group(GroupName=name) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group(GroupName=name) + + assert [] == group_list_names(iam_root, PathPrefix=path) + +@pytest.mark.iam_account +@pytest.mark.group +def test_account_group_case_insensitive_name(iam_root): + path = get_iam_path_prefix() + name_upper = make_iam_name('G1') + name_lower = make_iam_name('g1') + response = iam_root.create_group(GroupName=name_upper, Path=path) + group = response['Group'] + + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_group(GroupName=name_lower) + + response = iam_root.get_group(GroupName=name_lower) + assert group == response['Group'] + + iam_root.delete_group(GroupName=name_lower) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group(GroupName=name_upper) + +@pytest.mark.iam_account +@pytest.mark.group +def test_account_group_list(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_groups(PathPrefix=path) + assert len(response['Groups']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('aa') + name2 = make_iam_name('Ab') + name3 = make_iam_name('ac') + name4 = make_iam_name('Ad') + + # sort order is independent of Path and GroupName capitalization + iam_root.create_group(GroupName=name4, Path=path+'w/') + iam_root.create_group(GroupName=name3, Path=path+'x/') + iam_root.create_group(GroupName=name2, Path=path+'y/') + iam_root.create_group(GroupName=name1, Path=path+'z/') + + assert [name1, name2, name3, name4] == \ + group_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + group_list_names(iam_root, PathPrefix=path, PaginationConfig={'PageSize': 1}) + +@pytest.mark.group +@pytest.mark.iam_account +def test_account_group_update(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('G1') + response = iam_root.create_group(GroupName=name, Path=path) + group_id = response['Group']['GroupId'] + + username = make_iam_name('U1') + iam_root.create_user(UserName=username, Path=path) + + iam_root.add_user_to_group(GroupName=name, UserName=username) + + response = iam_root.list_groups_for_user(UserName=username) + groups = response['Groups'] + assert len(groups) == 1 + assert path == groups[0]['Path'] + assert name == groups[0]['GroupName'] + assert group_id == groups[0]['GroupId'] + + new_path = path + 'new/' + new_name = make_iam_name('NG1') + iam_root.update_group(GroupName=name, NewPath=new_path, NewGroupName=new_name) + + response = iam_root.get_group(GroupName=new_name) + group = response['Group'] + assert new_path == group['Path'] + assert new_name == group['GroupName'] + assert group_id == group['GroupId'] + arn = group['Arn'] + assert arn.startswith('arn:aws:iam:') + assert arn.endswith(f':group{new_path}{new_name}') + users = response['Users'] + assert len(users) == 1 + assert username == users[0]['UserName'] + + response = iam_root.list_groups_for_user(UserName=username) + groups = response['Groups'] + assert len(groups) == 1 + assert new_path == groups[0]['Path'] + assert new_name == groups[0]['GroupName'] + assert group_id == groups[0]['GroupId'] + + +# IAM GroupPolicy apis +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_inline_group_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy_name = 'List' + bucket_name = get_new_bucket_name() + policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Deny', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + + # Get/Put/Delete fail on nonexistent GroupName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1) + + iam_root.create_group(GroupName=name, Path=path) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1) + + response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + assert policy1 == json.dumps(response['PolicyDocument']) + response = iam_root.list_group_policies(GroupName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy2) + + response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + assert policy2 == json.dumps(response['PolicyDocument']) + response = iam_root.list_group_policies(GroupName=name) + assert [policy_name] == response['PolicyNames'] + + # DeleteGroup fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_group(GroupName=name) + + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + response = iam_root.list_group_policies(GroupName=name) + assert [] == response['PolicyNames'] + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_managed_group_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent GroupName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_group_policies(GroupName=name) + + iam_root.create_group(GroupName=name, Path=path) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1) + + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + + response = iam_root.list_attached_group_policies(GroupName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy2) + + response = iam_root.list_attached_group_policies(GroupName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2) + + response = iam_root.list_attached_group_policies(GroupName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteGroup fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_group(GroupName=name) + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_inline_group_policy_allow(iam_root): + path = get_iam_path_prefix() + username = make_iam_name('User') + groupname = make_iam_name('Group') + bucket_name = get_new_bucket_name() + + iam_root.create_user(UserName=username, Path=path) + + key = iam_root.create_access_key(UserName=username)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + iam_root.create_group(GroupName=groupname, Path=path) + iam_root.add_user_to_group(GroupName=groupname, UserName=username) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a group policy that allows s3 actions + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + }) + policy_name = 'AllowStar' + iam_root.put_group_policy(GroupName=groupname, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_managed_group_policy_allow(iam_root): + path = get_iam_path_prefix() + username = make_iam_name('User') + groupname = make_iam_name('Group') + bucket_name = get_new_bucket_name() + + iam_root.create_user(UserName=username, Path=path) + + key = iam_root.create_access_key(UserName=username)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + iam_root.create_group(GroupName=groupname, Path=path) + iam_root.add_user_to_group(GroupName=groupname, UserName=username) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a group policy that allows s3 read actions + policy_arn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + iam_root.attach_group_policy(GroupName=groupname, PolicyArn=policy_arn) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + + +assume_role_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': '*'} + }] + }) + +# IAM Role apis +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_create(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('R1') + desc = 'my role description' + max_duration = 43200 + response = iam_root.create_role(RoleName=name1, Path=path, AssumeRolePolicyDocument=assume_role_policy, Description=desc, MaxSessionDuration=max_duration) + role = response['Role'] + assert role['Path'] == path + assert role['RoleName'] == name1 + assert assume_role_policy == json.dumps(role['AssumeRolePolicyDocument']) + assert len(role['RoleId']) + arn = role['Arn'] + assert arn.startswith('arn:aws:iam:') + assert arn.endswith(f':role{path}{name1}') + assert role['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + # AWS doesn't include these for CreateRole, only GetRole + #assert desc == role['Description'] + #assert max_duration == role['MaxSessionDuration'] + + response = iam_root.get_role(RoleName=name1) + role = response['Role'] + assert arn == role['Arn'] + assert desc == role['Description'] + assert max_duration == role['MaxSessionDuration'] + + path2 = get_iam_path_prefix() + 'foo/' + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_role(RoleName=name1, Path=path2, AssumeRolePolicyDocument=assume_role_policy) + + name2 = make_iam_name('R2') + response = iam_root.create_role(RoleName=name2, Path=path2, AssumeRolePolicyDocument=assume_role_policy) + role = response['Role'] + assert role['Path'] == path2 + assert role['RoleName'] == name2 + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_case_insensitive_name(iam_root): + path = get_iam_path_prefix() + name_upper = make_iam_name('R1') + name_lower = make_iam_name('r1') + response = iam_root.create_role(RoleName=name_upper, Path=path, AssumeRolePolicyDocument=assume_role_policy) + rid = response['Role']['RoleId'] + + # name is case-insensitive, so 'r1' should also conflict + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_role(RoleName=name_lower, AssumeRolePolicyDocument=assume_role_policy) + + # search for 'r1' should return the same 'R1' role + response = iam_root.get_role(RoleName=name_lower) + assert rid == response['Role']['RoleId'] + + # delete for 'r1' should delete the same 'R1' role + iam_root.delete_role(RoleName=name_lower) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role(RoleName=name_lower) + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_delete(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('U1') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role(RoleName=name) + + response = iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + uid = response['Role']['RoleId'] + create_date = response['Role']['CreateDate'] + + iam_root.delete_role(RoleName=name) + + response = iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + assert uid != response['Role']['RoleId'] + assert create_date <= response['Role']['CreateDate'] + +def role_list_names(client, **kwargs): + p = client.get_paginator('list_roles') + rolenames = [] + for response in p.paginate(**kwargs): + rolenames += [u['RoleName'] for u in response['Roles']] + return rolenames + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_list(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_roles(PathPrefix=path) + assert len(response['Roles']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('aa') + name2 = make_iam_name('Ab') + name3 = make_iam_name('ac') + name4 = make_iam_name('Ad') + + # sort order is independent of CreateDate, Path, and RoleName capitalization + iam_root.create_role(RoleName=name4, Path=path+'w/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name3, Path=path+'x/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name2, Path=path+'y/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name1, Path=path+'z/', AssumeRolePolicyDocument=assume_role_policy) + + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path, PaginationConfig={'PageSize': 1}) + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_list_path_prefix(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_roles(PathPrefix=path) + assert len(response['Roles']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('a') + name2 = make_iam_name('b') + name3 = make_iam_name('c') + name4 = make_iam_name('d') + + iam_root.create_role(RoleName=name1, Path=path, AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name2, Path=path, AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name3, Path=path+'a/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name4, Path=path+'a/x/', AssumeRolePolicyDocument=assume_role_policy) + + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path, + PaginationConfig={'PageSize': 1}) + assert [name3, name4] == \ + role_list_names(iam_root, PathPrefix=path+'a') + assert [name3, name4] == \ + role_list_names(iam_root, PathPrefix=path+'a', + PaginationConfig={'PageSize': 1}) + assert [name4] == \ + role_list_names(iam_root, PathPrefix=path+'a/x') + assert [name4] == \ + role_list_names(iam_root, PathPrefix=path+'a/x', + PaginationConfig={'PageSize': 1}) + assert [] == role_list_names(iam_root, PathPrefix=path+'a/x/d') + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_update(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('a') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_role(RoleName=name) + + iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + response = iam_root.get_role(RoleName=name) + assert name == response['Role']['RoleName'] + arn = response['Role']['Arn'] + rid = response['Role']['RoleId'] + + desc = 'my role description' + iam_root.update_role(RoleName=name, Description=desc, MaxSessionDuration=43200) + + response = iam_root.get_role(RoleName=name) + assert rid == response['Role']['RoleId'] + assert arn == response['Role']['Arn'] + assert desc == response['Role']['Description'] + assert 43200 == response['Role']['MaxSessionDuration'] + + +role_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + "Resource": "*" + }] + }) + +# IAM RolePolicy apis +@pytest.mark.iam_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy(iam_root): + path = get_iam_path_prefix() + role_name = make_iam_name('r') + policy_name = 'MyPolicy' + policy2_name = 'AnotherPolicy' + + # Get/Put/Delete fail on nonexistent RoleName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy) + + iam_root.create_role(RoleName=role_name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy) + + response = iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + assert role_name == response['RoleName'] + assert policy_name == response['PolicyName'] + assert role_policy == json.dumps(response['PolicyDocument']) + + response = iam_root.list_role_policies(RoleName=role_name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy2_name, PolicyDocument=role_policy) + + response = iam_root.list_role_policies(RoleName=role_name) + assert [policy2_name, policy_name] == response['PolicyNames'] + + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy2_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + +@pytest.mark.role_policy +@pytest.mark.iam_account +def test_account_role_policy_managed(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent RoleName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_role_policies(RoleName=name) + + iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy1) + + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy1) + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy1) + + response = iam_root.list_attached_role_policies(RoleName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy2) + + response = iam_root.list_attached_role_policies(RoleName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy2) + + response = iam_root.list_attached_role_policies(RoleName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteRole fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_role(RoleName=name) + +@pytest.mark.iam_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow(iam_root): + path = get_iam_path_prefix() + user_name = make_iam_name('MyUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + + user = iam_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + key = iam_root.create_access_key(UserName=user_name)['AccessKey'] + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_buckets) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListAllMyBuckets' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListAllMyBuckets', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_buckets) + +# alt account user assumes main account role to access main account bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_same_account_role_policy_allow(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + s3_main = get_iam_root_client(service_name='s3') + s3_main.create_bucket(Bucket=bucket_name) + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name) + +# alt account user assumes main account role to access alt account bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_cross_account_role_policy_allow(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + s3_alt = get_iam_alt_root_client(service_name='s3') + s3_alt.create_bucket(Bucket=bucket_name) + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # expect AccessDenied because no resource policy allows the main account + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the main account's arn + main_arn = iam_root.get_user()['User']['Arn'] + s3_alt.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': main_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}' + }] + })) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name) + +# alt account user assumes main account role to create a bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow_create_bucket(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private') + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowCreateBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': ['s3:CreateBucket', 's3:PutBucketAcl'], + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private') + + # verify that the bucket is owned by the role's account + s3_main = get_iam_root_client(service_name='s3') + response = s3_main.get_bucket_acl(Bucket=bucket_name) + + main_arn = iam_root.get_user()['User']['Arn'] + account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + assert response['Owner']['ID'] == account_id + assert response['Grants'][0]['Grantee']['ID'] == account_id + +# alt account user assumes main account role to read the role info +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow_get_role(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + iam = get_iam_root_client(service_name='iam', + aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows iam actions + e = assert_raises(ClientError, iam.get_role, RoleName=role_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowGetRole' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'iam:GetRole', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, iam.get_role, RoleName=role_name) + + +# IAM OpenIDConnectProvider apis +@pytest.mark.iam_account +def test_account_oidc_provider(iam_root): + url_host = get_iam_path_prefix()[1:] + 'example.com' + url = 'http://' + url_host + + response = iam_root.create_open_id_connect_provider( + ClientIDList=['my-application-id'], + ThumbprintList=['3768084dfb3d2b68b7897bf5f565da8efEXAMPLE'], + Url=url) + arn = response['OpenIDConnectProviderArn'] + assert arn.endswith(f':oidc-provider/{url_host}') + + response = iam_root.list_open_id_connect_providers() + arns = [p['Arn'] for p in response['OpenIDConnectProviderList']] + assert arn in arns + + response = iam_root.get_open_id_connect_provider(OpenIDConnectProviderArn=arn) + assert url == response['Url'] + assert ['my-application-id'] == response['ClientIDList'] + assert ['3768084dfb3d2b68b7897bf5f565da8efEXAMPLE'] == response['ThumbprintList'] + + iam_root.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + + response = iam_root.list_open_id_connect_providers() + arns = [p['Arn'] for p in response['OpenIDConnectProviderList']] + assert arn not in arns + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_open_id_connect_provider(OpenIDConnectProviderArn=arn) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + + +# test cross-account access, adding user policy before the bucket policy +def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn): + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +# test cross-account access, adding bucket policy before the user policy +def _test_cross_account_bucket_user_policy(roots3, alt_root, alt_name, alt_arn): + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_user_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, user_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_user_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, user_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_arn = user_arn.replace(f':user{path}{user_name}', ':root') + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_arn = user_arn.replace(f':user{path}{user_name}', ':root') + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_id) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_id) + + +# test cross-account access, adding user policy before the bucket acl +def _test_cross_account_user_policy_bucket_acl(roots3, alt_root, alt_name, grantee): + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket acl that grants READ access + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +# test cross-account access, adding bucket acl before the user policy +def _test_cross_account_bucket_acl_user_policy(roots3, alt_root, alt_name, grantee): + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # add a bucket acl that grants READ access + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.fails_on_aws # can't grant to individual users +def test_cross_account_bucket_acl_user_policy_grant_user_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + response['User']['UserId'] + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.fails_on_aws # can't grant to individual users +def test_cross_account_user_policy_bucket_acl_grant_user_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + response['User']['UserId'] + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_acl_user_policy_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_policy_bucket_acl_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_acl_user_policy_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_policy_bucket_acl_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + + +# test root cross-account access with bucket policy +def _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn): + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + alt_arn = iam_alt_root.get_user()['User']['Arn'] + _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + alt_arn = iam_alt_root.get_user()['User']['Arn'] + account_id = alt_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + _test_cross_account_root_bucket_policy(roots3, alts3, account_id) + +# test root cross-account access with bucket acls +def _test_cross_account_root_bucket_acl(roots3, alts3, grantee): + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket acl that grants READ + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_acl_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_root_bucket_acl(roots3, alts3, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_acl_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_root_bucket_acl(roots3, alts3, grantee) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index d5ab01a..728f97b 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -4674,40 +4674,6 @@ def test_bucket_acl_grant_nonexist_user(): assert status == 400 assert error_code == 'InvalidArgument' -def test_bucket_acl_no_grants(): - bucket_name = get_new_bucket() - client = get_client() - - client.put_object(Bucket=bucket_name, Key='foo', Body='bar') - response = client.get_bucket_acl(Bucket=bucket_name) - old_grants = response['Grants'] - policy = {} - policy['Owner'] = response['Owner'] - # clear grants - policy['Grants'] = [] - - # remove read/write permission - response = client.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy) - - # can read - client.get_object(Bucket=bucket_name, Key='foo') - - # can't write - check_access_denied(client.put_object, Bucket=bucket_name, Key='baz', Body='a') - - #TODO fix this test once a fix is in for same issues in - # test_access_bucket_private_object_private - client2 = get_client() - # owner can read acl - client2.get_bucket_acl(Bucket=bucket_name) - - # owner can write acl - client2.put_bucket_acl(Bucket=bucket_name, ACL='private') - - # set policy back to original so that bucket can be cleaned up - policy['Grants'] = old_grants - client2.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy) - def _get_acl_header(user_id=None, perms=None): all_headers = ["read", "write", "read-acp", "write-acp", "full-control"] headers = [] diff --git a/s3tests_boto3/functional/test_sns.py b/s3tests_boto3/functional/test_sns.py new file mode 100644 index 0000000..360f14e --- /dev/null +++ b/s3tests_boto3/functional/test_sns.py @@ -0,0 +1,159 @@ +import json +import pytest +from botocore.exceptions import ClientError +from . import ( + configfile, + get_iam_root_client, + get_iam_alt_root_client, + get_new_bucket_name, + get_prefix, + nuke_prefixed_buckets, +) +from .iam import iam_root, iam_alt_root +from .utils import assert_raises, _get_status_and_error_code + +def get_new_topic_name(): + return get_new_bucket_name() + +def nuke_topics(client, prefix): + p = client.get_paginator('list_topics') + for response in p.paginate(): + for topic in response['Topics']: + arn = topic['TopicArn'] + if prefix not in arn: + pass + try: + client.delete_topic(TopicArn=arn) + except: + pass + +@pytest.fixture +def sns(iam_root): + client = get_iam_root_client(service_name='sns') + yield client + nuke_topics(client, get_prefix()) + +@pytest.fixture +def sns_alt(iam_alt_root): + client = get_iam_alt_root_client(service_name='sns') + yield client + nuke_topics(client, get_prefix()) + +@pytest.fixture +def s3(iam_root): + client = get_iam_root_client(service_name='s3') + yield client + nuke_prefixed_buckets(get_prefix(), client) + +@pytest.fixture +def s3_alt(iam_alt_root): + client = get_iam_alt_root_client(service_name='s3') + yield client + nuke_prefixed_buckets(get_prefix(), client) + + +@pytest.mark.iam_account +@pytest.mark.sns +def test_account_topic(sns): + name = get_new_topic_name() + + response = sns.create_topic(Name=name) + arn = response['TopicArn'] + assert arn.startswith('arn:aws:sns:') + assert arn.endswith(f':{name}') + + response = sns.list_topics() + assert arn in [p['TopicArn'] for p in response['Topics']] + + sns.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='') + + response = sns.get_topic_attributes(TopicArn=arn) + assert 'Attributes' in response + + sns.delete_topic(TopicArn=arn) + + response = sns.list_topics() + assert arn not in [p['TopicArn'] for p in response['Topics']] + + with pytest.raises(sns.exceptions.NotFoundException): + sns.get_topic_attributes(TopicArn=arn) + sns.delete_topic(TopicArn=arn) + +@pytest.mark.iam_account +@pytest.mark.sns +def test_cross_account_topic(sns, sns_alt): + name = get_new_topic_name() + arn = sns.create_topic(Name=name)['TopicArn'] + + # not visible to any alt user apis + with pytest.raises(sns.exceptions.NotFoundException): + sns_alt.get_topic_attributes(TopicArn=arn) + with pytest.raises(sns.exceptions.NotFoundException): + sns_alt.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='') + + # delete returns success + sns_alt.delete_topic(TopicArn=arn) + + response = sns_alt.list_topics() + assert arn not in [p['TopicArn'] for p in response['Topics']] + +@pytest.mark.iam_account +@pytest.mark.sns +def test_account_topic_publish(sns, s3): + name = get_new_topic_name() + + response = sns.create_topic(Name=name) + topic_arn = response['TopicArn'] + + bucket = get_new_bucket_name() + s3.create_bucket(Bucket=bucket) + + config = {'TopicConfigurations': [{ + 'Id': 'id', + 'TopicArn': topic_arn, + 'Events': [ 's3:ObjectCreated:*' ], + }]} + s3.put_bucket_notification_configuration( + Bucket=bucket, NotificationConfiguration=config) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.sns +def test_cross_account_topic_publish(sns, s3_alt, iam_alt_root): + name = get_new_topic_name() + + response = sns.create_topic(Name=name) + topic_arn = response['TopicArn'] + + bucket = get_new_bucket_name() + s3_alt.create_bucket(Bucket=bucket) + + config = {'TopicConfigurations': [{ + 'Id': 'id', + 'TopicArn': topic_arn, + 'Events': [ 's3:ObjectCreated:*' ], + }]} + + # expect AccessDenies because no resource policy allows cross-account access + e = assert_raises(ClientError, s3_alt.put_bucket_notification_configuration, + Bucket=bucket, NotificationConfiguration=config) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add topic policy to allow the alt user + alt_principal = iam_alt_root.get_user()['User']['Arn'] + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_principal}, + 'Action': 'sns:Publish', + 'Resource': topic_arn + }] + }) + sns.set_topic_attributes(TopicArn=topic_arn, AttributeName='Policy', + AttributeValue=policy) + + s3_alt.put_bucket_notification_configuration( + Bucket=bucket, NotificationConfiguration=config)