From 8e01f2315c66a982a682ce871609f7810e91c9a1 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 11 Jan 2024 22:38:17 -0500 Subject: [PATCH 01/22] fixtures: split setup() and configure() Signed-off-by: Casey Bodley --- s3tests_boto3/functional/__init__.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index a65b54c..0636932 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -174,7 +174,7 @@ def configured_storage_classes(): return sc -def setup(): +def configure(): cfg = configparser.RawConfigParser() try: path = os.environ['S3TEST_CONF'] @@ -267,17 +267,17 @@ def setup(): template = 'test-{random}-' prefix = choose_bucket_prefix(template=template) - alt_client = get_alt_client() - tenant_client = get_tenant_client() - nuke_prefixed_buckets(prefix=prefix) - nuke_prefixed_buckets(prefix=prefix, client=alt_client) - nuke_prefixed_buckets(prefix=prefix, client=tenant_client) - if cfg.has_section("s3 cloud"): get_cloud_config(cfg) else: config.cloud_storage_class = None +def setup(): + alt_client = get_alt_client() + tenant_client = get_tenant_client() + nuke_prefixed_buckets(prefix=prefix) + nuke_prefixed_buckets(prefix=prefix, client=alt_client) + nuke_prefixed_buckets(prefix=prefix, client=tenant_client) def teardown(): alt_client = get_alt_client() @@ -306,11 +306,12 @@ def teardown(): @pytest.fixture(scope="package") def configfile(): - setup() + configure() return config @pytest.fixture(autouse=True) def setup_teardown(configfile): + setup() yield teardown() From 83af25722c2a1591d47d1e97bec3df4f8a19f6a2 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 11 Jan 2024 22:42:24 -0500 Subject: [PATCH 02/22] config: add fixtures for iam name/path prefixes Signed-off-by: Casey Bodley --- s3tests.conf.SAMPLE | 8 ++++++++ s3tests_boto3/functional/__init__.py | 18 ++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 9593fc1..60e9928 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -19,6 +19,14 @@ ssl_verify = False ## the prefix to 30 characters long, and avoid collisions bucket prefix = yournamehere-{random}- +# all the iam account resources (users, roles, etc) created +# will start with this name prefix +iam name prefix = s3-tests- + +# all the iam account resources (users, roles, etc) created +# will start with this path prefix +iam path prefix = /s3-tests/ + [s3 main] # main display_name set in vstart.sh display_name = M. Tester diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 0636932..d45e626 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -261,11 +261,12 @@ def configure(): config.tenant_email = cfg.get('s3 tenant',"email") # vars from the fixtures section - try: - template = cfg.get('fixtures', "bucket prefix") - except (configparser.NoOptionError): - template = 'test-{random}-' + template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-') prefix = choose_bucket_prefix(template=template) + template = cfg.get('fixtures', "iam name prefix", fallback="s3-tests-") + config.iam_name_prefix = choose_bucket_prefix(template=template) + template = cfg.get('fixtures', "iam path prefix", fallback="/s3-tests/") + config.iam_path_prefix = choose_bucket_prefix(template=template) if cfg.has_section("s3 cloud"): get_cloud_config(cfg) @@ -700,6 +701,15 @@ def get_token(): def get_realm_name(): return config.webidentity_realm +def get_iam_name_prefix(): + return config.iam_name_prefix + +def make_iam_name(name): + return config.iam_name_prefix + name + +def get_iam_path_prefix(): + return config.iam_path_prefix + def get_iam_access_key(): return config.iam_access_key From a35b3c609a5a74fbc17cf52264ab17d9d6ffa0a6 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 11 Jan 2024 22:45:03 -0500 Subject: [PATCH 03/22] iam: rename test_of_iam mark to iam_tenant differentiate the test cases that expect a tenant-wide IAM api from new ones that expect an account-wide api Signed-off-by: Casey Bodley --- pytest.ini | 2 +- s3tests_boto3/functional/test_iam.py | 54 ++++++++++++++-------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pytest.ini b/pytest.ini index 0e3bcba..824adec 100644 --- a/pytest.ini +++ b/pytest.ini @@ -16,6 +16,7 @@ markers = fails_on_rgw fails_on_s3 fails_with_subdomain + iam_tenant lifecycle lifecycle_expiration lifecycle_transition @@ -30,7 +31,6 @@ markers = sse_s3 storage_class tagging - test_of_iam test_of_sts token_claims_trust_policy_test token_principal_tag_role_policy_test diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index fa44357..195ce80 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -19,7 +19,7 @@ from .utils import _get_status, _get_status_and_error_code @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_user_policy(): client = get_iam_client() @@ -39,7 +39,7 @@ def test_put_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_user_policy_invalid_user(): client = get_iam_client() @@ -57,7 +57,7 @@ def test_put_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_user_policy_parameter_limit(): client = get_iam_client() @@ -76,7 +76,7 @@ def test_put_user_policy_parameter_limit(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_rgw def test_put_user_policy_invalid_element(): client = get_iam_client() @@ -142,7 +142,7 @@ def test_put_user_policy_invalid_element(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_put_existing_user_policy(): client = get_iam_client() @@ -163,7 +163,7 @@ def test_put_existing_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_list_user_policy(): client = get_iam_client() @@ -184,7 +184,7 @@ def test_list_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_list_user_policy_invalid_user(): client = get_iam_client() e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id") @@ -193,7 +193,7 @@ def test_list_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_get_user_policy(): client = get_iam_client() @@ -216,7 +216,7 @@ def test_get_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_get_user_policy_invalid_user(): client = get_iam_client() @@ -238,7 +238,7 @@ def test_get_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_rgw def test_get_user_policy_invalid_policy_name(): client = get_iam_client() @@ -260,7 +260,7 @@ def test_get_user_policy_invalid_policy_name(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_rgw def test_get_deleted_user_policy(): client = get_iam_client() @@ -282,7 +282,7 @@ def test_get_deleted_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_get_user_policy_from_multiple_policies(): client = get_iam_client() @@ -315,7 +315,7 @@ def test_get_user_policy_from_multiple_policies(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy(): client = get_iam_client() @@ -337,7 +337,7 @@ def test_delete_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy_invalid_user(): client = get_iam_client() @@ -363,7 +363,7 @@ def test_delete_user_policy_invalid_user(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy_invalid_policy_name(): client = get_iam_client() @@ -389,7 +389,7 @@ def test_delete_user_policy_invalid_policy_name(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_delete_user_policy_from_multiple_policies(): client = get_iam_client() @@ -429,7 +429,7 @@ def test_delete_user_policy_from_multiple_policies(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_allow_bucket_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() @@ -476,7 +476,7 @@ def test_allow_bucket_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_bucket_actions_in_user_policy(): client = get_iam_client() @@ -512,7 +512,7 @@ def test_deny_bucket_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_allow_object_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() @@ -551,7 +551,7 @@ def test_allow_object_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_object_actions_in_user_policy(): client = get_iam_client() @@ -591,7 +591,7 @@ def test_deny_object_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_allow_multipart_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() @@ -626,7 +626,7 @@ def test_allow_multipart_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_multipart_actions_in_user_policy(): client = get_iam_client() @@ -669,7 +669,7 @@ def test_deny_multipart_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_allow_tagging_actions_in_user_policy(): client = get_iam_client() @@ -715,7 +715,7 @@ def test_allow_tagging_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_deny_tagging_actions_in_user_policy(): client = get_iam_client() @@ -767,7 +767,7 @@ def test_deny_tagging_actions_in_user_policy(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_verify_conflicting_user_policy_statements(): s3client = get_alt_client() @@ -799,7 +799,7 @@ def test_verify_conflicting_user_policy_statements(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant @pytest.mark.fails_on_dbstore def test_verify_conflicting_user_policies(): s3client = get_alt_client() @@ -838,7 +838,7 @@ def test_verify_conflicting_user_policies(): @pytest.mark.user_policy -@pytest.mark.test_of_iam +@pytest.mark.iam_tenant def test_verify_allow_iam_actions(): policy1 = json.dumps( {"Version": "2012-10-17", From 5f3353e6b5a734b975465cfb27b32532d9fad091 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 1 Jan 2024 19:08:10 -0500 Subject: [PATCH 04/22] config: parse iam config during setup() Signed-off-by: Casey Bodley --- s3tests_boto3/functional/__init__.py | 65 +++++++++++----------------- 1 file changed, 25 insertions(+), 40 deletions(-) diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index d45e626..ec3f6dc 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -260,6 +260,12 @@ def configure(): config.tenant_user_id = cfg.get('s3 tenant',"user_id") config.tenant_email = cfg.get('s3 tenant',"email") + config.iam_access_key = cfg.get('iam',"access_key") + config.iam_secret_key = cfg.get('iam',"secret_key") + config.iam_display_name = cfg.get('iam',"display_name") + config.iam_user_id = cfg.get('iam',"user_id") + config.iam_email = cfg.get('iam',"email") + # vars from the fixtures section template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-') prefix = choose_bucket_prefix(template=template) @@ -397,62 +403,41 @@ def get_v2_client(): config=Config(signature_version='s3')) return client -def get_sts_client(client_config=None): - if client_config == None: - client_config = Config(signature_version='s3v4') +def get_sts_client(**kwargs): + kwargs.setdefault('aws_access_key_id', config.alt_access_key) + kwargs.setdefault('aws_secret_access_key', config.alt_secret_key) + kwargs.setdefault('config', Config(signature_version='s3v4')) client = boto3.client(service_name='sts', - aws_access_key_id=config.alt_access_key, - aws_secret_access_key=config.alt_secret_key, - endpoint_url=config.default_endpoint, - region_name='', - use_ssl=config.default_is_secure, - verify=config.default_ssl_verify, - config=client_config) + endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) return client -def get_iam_client(client_config=None): - cfg = configparser.RawConfigParser() - try: - path = os.environ['S3TEST_CONF'] - except KeyError: - raise RuntimeError( - 'To run tests, point environment ' - + 'variable S3TEST_CONF to a config file.', - ) - cfg.read(path) - if not cfg.has_section("iam"): - raise RuntimeError('Your config file is missing the "iam" section!') +def get_iam_client(**kwargs): + kwargs.setdefault('aws_access_key_id', config.iam_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_secret_key) - config.iam_access_key = cfg.get('iam',"access_key") - config.iam_secret_key = cfg.get('iam',"secret_key") - config.iam_display_name = cfg.get('iam',"display_name") - config.iam_user_id = cfg.get('iam',"user_id") - config.iam_email = cfg.get('iam',"email") - - if client_config == None: - client_config = Config(signature_version='s3v4') - client = boto3.client(service_name='iam', - aws_access_key_id=config.iam_access_key, - aws_secret_access_key=config.iam_secret_key, endpoint_url=config.default_endpoint, region_name='', use_ssl=config.default_is_secure, verify=config.default_ssl_verify, - config=client_config) + **kwargs) return client -def get_iam_s3client(client_config=None): - if client_config == None: - client_config = Config(signature_version='s3v4') +def get_iam_s3client(**kwargs): + kwargs.setdefault('aws_access_key_id', config.iam_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_secret_key) + kwargs.setdefault('config', Config(signature_version='s3v4')) + client = boto3.client(service_name='s3', - aws_access_key_id=get_iam_access_key(), - aws_secret_access_key=get_iam_secret_key(), endpoint_url=config.default_endpoint, use_ssl=config.default_is_secure, verify=config.default_ssl_verify, - config=client_config) + **kwargs) return client def get_alt_client(client_config=None): From 7cd46138837a42ceb800d2f44f42a0f43271affa Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 4 Feb 2024 18:25:31 -0500 Subject: [PATCH 05/22] config: add [iam root] for an account root user Signed-off-by: Casey Bodley --- s3tests.conf.SAMPLE | 7 +++++++ s3tests_boto3/functional/__init__.py | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 60e9928..550e806 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -135,6 +135,13 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn #display_name from vstart.sh display_name = youruseridhere +# iam account root user for iam_account tests +[iam root] +access_key = AAAAAAAAAAAAAAAAAAaa +secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +user_id = RGW11111111111111111 +email = account1@ceph.com + #following section needs to be added when you want to run Assume Role With Webidentity test [webidentity] #used for assume role with web identity test in sts-tests diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index ec3f6dc..d1330a9 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -266,6 +266,10 @@ def configure(): config.iam_user_id = cfg.get('iam',"user_id") config.iam_email = cfg.get('iam',"email") + config.iam_root_access_key = cfg.get('iam root',"access_key") + config.iam_root_secret_key = cfg.get('iam root',"secret_key") + config.iam_root_email = cfg.get('iam root',"email") + # vars from the fixtures section template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-') prefix = choose_bucket_prefix(template=template) @@ -440,6 +444,17 @@ def get_iam_s3client(**kwargs): **kwargs) return client +def get_iam_root_client(**kwargs): + kwargs.setdefault('service_name', 'iam') + kwargs.setdefault('aws_access_key_id', config.iam_root_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_root_secret_key) + + return boto3.client(endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) + def get_alt_client(client_config=None): if client_config == None: client_config = Config(signature_version='s3v4') @@ -701,6 +716,9 @@ def get_iam_access_key(): def get_iam_secret_key(): return config.iam_secret_key +def get_iam_root_email(): + return config.iam_root_email + def get_user_token(): return config.webidentity_user_token From d8becad96a24f260b50d06f7397a615c1ab52ed2 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 19 Dec 2023 10:26:19 -0500 Subject: [PATCH 06/22] iam: add tests for User apis adds test cases for the following iam actions: * CreateUser * GetUser * UpdateUser * DeleteUser * ListUsers verified to pass against aws when an account root user's credentials are provided in the [iam] section of s3tests.conf Signed-off-by: Casey Bodley --- pytest.ini | 2 + s3tests_boto3/functional/test_iam.py | 220 +++++++++++++++++++++++++++ 2 files changed, 222 insertions(+) diff --git a/pytest.ini b/pytest.ini index 824adec..dcaa0f7 100644 --- a/pytest.ini +++ b/pytest.ini @@ -16,7 +16,9 @@ markers = fails_on_rgw fails_on_s3 fails_with_subdomain + iam_account iam_tenant + iam_user lifecycle lifecycle_expiration lifecycle_transition diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 195ce80..8213627 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -1,4 +1,5 @@ import json +import datetime from botocore.exceptions import ClientError import pytest @@ -10,6 +11,9 @@ from . import ( setup_teardown, get_alt_client, get_iam_client, + get_iam_root_client, + make_iam_name, + get_iam_path_prefix, get_new_bucket, get_iam_s3client, get_alt_iam_client, @@ -862,3 +866,219 @@ def test_verify_allow_iam_actions(): response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + +def nuke_user(client, name): + # delete access keys, user policies, etc + client.delete_user(UserName=name) + +def nuke_users(client, **kwargs): + p = client.get_paginator('list_users') + for response in p.paginate(**kwargs): + for user in response['Users']: + try: + nuke_user(client, user['UserName']) + except: + pass + +# fixture for iam account root user +@pytest.fixture +def iam_root(configfile): + client = get_iam_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + + +# IAM User apis +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_create(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('U1') + response = iam_root.create_user(UserName=name1, Path=path) + user = response['User'] + assert user['Path'] == path + assert user['UserName'] == name1 + assert len(user['UserId']) + assert user['Arn'].startswith('arn:aws:iam:') + assert user['Arn'].endswith(f':user{path}{name1}') + assert user['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + path2 = get_iam_path_prefix() + 'foo/' + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_user(UserName=name1, Path=path2) + + name2 = make_iam_name('U2') + response = iam_root.create_user(UserName=name2, Path=path2) + user = response['User'] + assert user['Path'] == path2 + assert user['UserName'] == name2 + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_case_insensitive_name(iam_root): + path = get_iam_path_prefix() + name_upper = make_iam_name('U1') + name_lower = make_iam_name('u1') + response = iam_root.create_user(UserName=name_upper, Path=path) + user = response['User'] + + # name is case-insensitive, so 'u1' should also conflict + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_user(UserName=name_lower) + + # search for 'u1' should return the same 'U1' user + response = iam_root.get_user(UserName=name_lower) + assert user == response['User'] + + # delete for 'u1' should delete the same 'U1' user + iam_root.delete_user(UserName=name_lower) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user(UserName=name_lower) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_delete(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('U1') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user(UserName=name) + + response = iam_root.create_user(UserName=name, Path=path) + uid = response['User']['UserId'] + create_date = response['User']['CreateDate'] + + iam_root.delete_user(UserName=name) + + response = iam_root.create_user(UserName=name, Path=path) + assert uid != response['User']['UserId'] + assert create_date <= response['User']['CreateDate'] + +def user_list_names(client, **kwargs): + p = client.get_paginator('list_users') + usernames = [] + for response in p.paginate(**kwargs): + usernames += [u['UserName'] for u in response['Users']] + return usernames + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_list(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_users(PathPrefix=path) + assert len(response['Users']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('aa') + name2 = make_iam_name('Ab') + name3 = make_iam_name('ac') + name4 = make_iam_name('Ad') + + # sort order is independent of CreateDate, Path, and UserName capitalization + iam_root.create_user(UserName=name4, Path=path+'w/') + iam_root.create_user(UserName=name3, Path=path+'x/') + iam_root.create_user(UserName=name2, Path=path+'y/') + iam_root.create_user(UserName=name1, Path=path+'z/') + + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path, PaginationConfig={'PageSize': 1}) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_list_path_prefix(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_users(PathPrefix=path) + assert len(response['Users']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('a') + name2 = make_iam_name('b') + name3 = make_iam_name('c') + name4 = make_iam_name('d') + + iam_root.create_user(UserName=name1, Path=path) + iam_root.create_user(UserName=name2, Path=path) + iam_root.create_user(UserName=name3, Path=path+'a/') + iam_root.create_user(UserName=name4, Path=path+'a/x/') + + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + user_list_names(iam_root, PathPrefix=path, + PaginationConfig={'PageSize': 1}) + assert [name3, name4] == \ + user_list_names(iam_root, PathPrefix=path+'a') + assert [name3, name4] == \ + user_list_names(iam_root, PathPrefix=path+'a', + PaginationConfig={'PageSize': 1}) + assert [name4] == \ + user_list_names(iam_root, PathPrefix=path+'a/x') + assert [name4] == \ + user_list_names(iam_root, PathPrefix=path+'a/x', + PaginationConfig={'PageSize': 1}) + assert [] == user_list_names(iam_root, PathPrefix=path+'a/x/d') + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_update_name(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('a') + new_name1 = make_iam_name('z') + name2 = make_iam_name('b') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_user(UserName=name1, NewUserName=new_name1) + + iam_root.create_user(UserName=name1, Path=path) + iam_root.create_user(UserName=name2, Path=path+'m/') + assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) + + response = iam_root.get_user(UserName=name1) + assert name1 == response['User']['UserName'] + uid = response['User']['UserId'] + + iam_root.update_user(UserName=name1, NewUserName=new_name1) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user(UserName=name1) + + response = iam_root.get_user(UserName=new_name1) + assert new_name1 == response['User']['UserName'] + assert uid == response['User']['UserId'] + assert response['User']['Arn'].endswith(f':user{path}{new_name1}') + + assert [name2, new_name1] == user_list_names(iam_root, PathPrefix=path) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_update_path(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('a') + name2 = make_iam_name('b') + iam_root.create_user(UserName=name1, Path=path) + iam_root.create_user(UserName=name2, Path=path+'m/') + assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) + + response = iam_root.get_user(UserName=name1) + assert name1 == response['User']['UserName'] + assert path == response['User']['Path'] + uid = response['User']['UserId'] + + iam_root.update_user(UserName=name1, NewPath=path+'z/') + + response = iam_root.get_user(UserName=name1) + assert name1 == response['User']['UserName'] + assert f'{path}z/' == response['User']['Path'] + assert uid == response['User']['UserId'] + assert response['User']['Arn'].endswith(f':user{path}z/{name1}') + + assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) From db76dfe791c62e6e16969141a1f072c45ef930f3 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 20 Dec 2023 14:17:35 -0500 Subject: [PATCH 07/22] iam: add tests for AccessKey apis adds test cases for the following iam actions: * CreateAccessKey * UpdateAccessKey * DeleteAccessKey * ListAccessKeys verified to pass against aws when an account root user's credentials are provided in the [iam] section of s3tests.conf Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 194 +++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 8213627..be0a6bb 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -868,8 +868,21 @@ def test_verify_allow_iam_actions(): assert response['ResponseMetadata']['HTTPStatusCode'] == 200 +def nuke_user_keys(client, name): + p = client.get_paginator('list_access_keys') + for response in p.paginate(UserName=name): + for key in response['AccessKeyMetadata']: + try: + client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId']) + except: + pass + def nuke_user(client, name): # delete access keys, user policies, etc + try: + nuke_user_keys(client, name) + except: + pass client.delete_user(UserName=name) def nuke_users(client, **kwargs): @@ -1082,3 +1095,184 @@ def test_account_user_update_path(iam_root): assert response['User']['Arn'].endswith(f':user{path}z/{name1}') assert [name1, name2] == user_list_names(iam_root, PathPrefix=path) + + +# IAM AccessKey apis +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_create(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('a') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.create_access_key(UserName=name) + + iam_root.create_user(UserName=name, Path=path) + + response = iam_root.create_access_key(UserName=name) + key = response['AccessKey'] + assert name == key['UserName'] + assert len(key['AccessKeyId']) + assert len(key['SecretAccessKey']) + assert 'Active' == key['Status'] + assert key['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_current_user_access_key_create(iam_root): + # omit the UserName argument to operate on the current authenticated + # user (assumed to be an account root user) + + response = iam_root.create_access_key() + key = response['AccessKey'] + keyid = key['AccessKeyId'] + assert len(keyid) + try: + assert len(key['SecretAccessKey']) + assert 'Active' == key['Status'] + assert key['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + finally: + # iam_root doesn't see the account root user, so clean up + # this key manually + iam_root.delete_access_key(AccessKeyId=keyid) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_update(iam_root): + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_access_key(UserName='nosuchuser', AccessKeyId='abcdefghijklmnopqrstu', Status='Active') + + path = get_iam_path_prefix() + name = make_iam_name('a') + iam_root.create_user(UserName=name, Path=path) + + response = iam_root.create_access_key(UserName=name) + key = response['AccessKey'] + keyid = key['AccessKeyId'] + create_date = key['CreateDate'] + assert create_date > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_access_key(UserName=name, AccessKeyId='abcdefghijklmnopqrstu', Status='Active') + + iam_root.update_access_key(UserName=name, AccessKeyId=keyid, Status='Active') + iam_root.update_access_key(UserName=name, AccessKeyId=keyid, Status='Inactive') + + response = iam_root.list_access_keys(UserName=name) + keys = response['AccessKeyMetadata'] + assert 1 == len(keys) + key = keys[0] + assert name == key['UserName'] + assert keyid == key['AccessKeyId'] + assert 'Inactive' == key['Status'] + assert create_date == key['CreateDate'] # CreateDate unchanged by update_access_key() + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_current_user_access_key_update(iam_root): + # omit the UserName argument to operate on the current authenticated + # user (assumed to be an account root user) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_access_key(AccessKeyId='abcdefghijklmnopqrstu', Status='Active') + + response = iam_root.create_access_key() + key = response['AccessKey'] + keyid = key['AccessKeyId'] + assert len(keyid) + try: + iam_root.update_access_key(AccessKeyId=keyid, Status='Active') + iam_root.update_access_key(AccessKeyId=keyid, Status='Inactive') + + # find the access key id we created + p = iam_root.get_paginator('list_access_keys') + for response in p.paginate(): + for key in response['AccessKeyMetadata']: + if keyid == key['AccessKeyId']: + assert 'Inactive' == key['Status'] + return + assert False, f'AccessKeyId={keyid} not found in list_access_keys()' + + finally: + # iam_root doesn't see the account root user, so clean up + # this key manually + iam_root.delete_access_key(AccessKeyId=keyid) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_delete(iam_root): + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(UserName='nosuchuser', AccessKeyId='abcdefghijklmnopqrstu') + + path = get_iam_path_prefix() + name = make_iam_name('a') + iam_root.create_user(UserName=name, Path=path) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(UserName=name, AccessKeyId='abcdefghijklmnopqrstu') + + response = iam_root.create_access_key(UserName=name) + keyid = response['AccessKey']['AccessKeyId'] + + iam_root.delete_access_key(UserName=name, AccessKeyId=keyid) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(UserName=name, AccessKeyId=keyid) + + response = iam_root.list_access_keys(UserName=name) + keys = response['AccessKeyMetadata'] + assert 0 == len(keys) + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_current_user_access_key_delete(iam_root): + # omit the UserName argument to operate on the current authenticated + # user (assumed to be an account root user) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(AccessKeyId='abcdefghijklmnopqrstu') + + response = iam_root.create_access_key() + keyid = response['AccessKey']['AccessKeyId'] + + iam_root.delete_access_key(AccessKeyId=keyid) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_access_key(AccessKeyId=keyid) + + # make sure list_access_keys() doesn't return the access key id we deleted + p = iam_root.get_paginator('list_access_keys') + for response in p.paginate(): + for key in response['AccessKeyMetadata']: + assert keyid != key['AccessKeyId'] + +def user_list_key_ids(client, **kwargs): + p = client.get_paginator('list_access_keys') + ids = [] + for response in p.paginate(**kwargs): + ids += [k['AccessKeyId'] for k in response['AccessKeyMetadata']] + return ids + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_access_key_list(iam_root): + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_access_keys(UserName='nosuchuser') + + path = get_iam_path_prefix() + name = make_iam_name('a') + iam_root.create_user(UserName=name, Path=path) + + assert [] == user_list_key_ids(iam_root, UserName=name) + assert [] == user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1}) + + id1 = iam_root.create_access_key(UserName=name)['AccessKey']['AccessKeyId'] + + assert [id1] == user_list_key_ids(iam_root, UserName=name) + assert [id1] == user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1}) + + id2 = iam_root.create_access_key(UserName=name)['AccessKey']['AccessKeyId'] + # AccessKeysPerUser=2 is the default quota in aws + + keys = sorted([id1, id2]) + assert keys == sorted(user_list_key_ids(iam_root, UserName=name)) + assert keys == sorted(user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1})) From 0377466704fba6847e109b1ac035e48284293561 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 1 Jan 2024 19:11:24 -0500 Subject: [PATCH 08/22] iam: test bucket policy principal for iam user with path Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index be0a6bb..4919798 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -1,5 +1,6 @@ import json import datetime +import time from botocore.exceptions import ClientError import pytest @@ -1276,3 +1277,59 @@ def test_account_user_access_key_list(iam_root): keys = sorted([id1, id2]) assert keys == sorted(user_list_key_ids(iam_root, UserName=name)) assert keys == sorted(user_list_key_ids(iam_root, UserName=name, PaginationConfig={'PageSize': 1})) + +def retry_on(code, tries, func, *args, **kwargs): + for i in range(tries): + try: + return func(*args, **kwargs) + except ClientError as e: + err = e.response['Error']['Code'] + if i + 1 < tries and err in code: + print(f'Got {err}, retrying in {i}s..') + time.sleep(i) + continue + raise + + +@pytest.mark.iam_account +@pytest.mark.iam_user +def test_account_user_bucket_policy_allow(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + response = iam_root.create_user(UserName=name, Path=path) + user_arn = response['User']['Arn'] + assert user_arn.startswith('arn:aws:iam:') + assert user_arn.endswith(f':user{path}{name}') + + key = iam_root.create_access_key(UserName=name)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + roots3 = get_iam_root_client(service_name='s3') + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_objects, Bucket=bucket) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': user_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + }) + roots3.put_bucket_policy(Bucket=bucket, Policy=policy) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) From 364f29d087e12b5a3106598b9dfce8321f52f7dc Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 11 Jan 2024 22:49:54 -0500 Subject: [PATCH 09/22] iam: add account tests for UserPolicy apis adds test cases for the following iam actions: * PutUserPolicy * GetUserPolicy * DeleteUserPolicy * ListUserPolicies verified to pass against aws when an account root user's credentials are provided in the [iam] section of s3tests.conf Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 109 +++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 4919798..1e09a5e 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -16,6 +16,7 @@ from . import ( make_iam_name, get_iam_path_prefix, get_new_bucket, + get_new_bucket_name, get_iam_s3client, get_alt_iam_client, get_alt_user_id, @@ -878,12 +879,25 @@ def nuke_user_keys(client, name): except: pass +def nuke_user_policies(client, name): + p = client.get_paginator('list_user_policies') + for response in p.paginate(UserName=name): + for policy in response['PolicyNames']: + try: + client.delete_user_policy(UserName=name, PolicyName=policy) + except: + pass + def nuke_user(client, name): # delete access keys, user policies, etc try: nuke_user_keys(client, name) except: pass + try: + nuke_user_policies(client, name) + except: + pass client.delete_user(UserName=name) def nuke_users(client, **kwargs): @@ -1333,3 +1347,98 @@ def test_account_user_bucket_policy_allow(iam_root): retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket) finally: roots3.delete_bucket(Bucket=bucket) + + +# IAM UserPolicy apis +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy_name = 'List' + bucket_name = get_new_bucket_name() + policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Deny', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + + # Get/Put/Delete fail on nonexistent UserName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1) + + iam_root.create_user(UserName=name, Path=path) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1) + + response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + assert policy1 == json.dumps(response['PolicyDocument']) + response = iam_root.list_user_policies(UserName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy2) + + response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + assert policy2 == json.dumps(response['PolicyDocument']) + response = iam_root.list_user_policies(UserName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_user_policy(UserName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_user_policy(UserName=name, PolicyName=policy_name) + + response = iam_root.list_user_policies(UserName=name) + assert [] == response['PolicyNames'] + +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy_allow(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + bucket_name = get_new_bucket_name() + iam_root.create_user(UserName=name, Path=path) + + key = iam_root.create_access_key(UserName=name)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + }) + policy_name = 'AllowStar' + iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) From c6e40b4ffa4e216d9e07a5f6a97c3c1275c3e389 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 4 Feb 2024 18:06:39 -0500 Subject: [PATCH 10/22] iam: test managed user policy Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 1e09a5e..1eeb4e3 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -888,6 +888,15 @@ def nuke_user_policies(client, name): except: pass +def nuke_attached_user_policies(client, name): + p = client.get_paginator('list_attached_user_policies') + for response in p.paginate(UserName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn']) + except: + pass + def nuke_user(client, name): # delete access keys, user policies, etc try: @@ -898,6 +907,10 @@ def nuke_user(client, name): nuke_user_policies(client, name) except: pass + try: + nuke_attached_user_policies(client, name) + except: + pass client.delete_user(UserName=name) def nuke_users(client, **kwargs): @@ -1407,6 +1420,63 @@ def test_account_user_policy(iam_root): response = iam_root.list_user_policies(UserName=name) assert [] == response['PolicyNames'] +@pytest.mark.user_policy +@pytest.mark.iam_account +def test_account_user_policy_managed(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent UserName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_user_policy(UserName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_user_policy(UserName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_user_policies(UserName=name) + + iam_root.create_user(UserName=name, Path=path) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_user_policy(UserName=name, PolicyArn=policy1) + + iam_root.attach_user_policy(UserName=name, PolicyArn=policy1) + iam_root.attach_user_policy(UserName=name, PolicyArn=policy1) + + response = iam_root.list_attached_user_policies(UserName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_user_policy(UserName=name, PolicyArn=policy2) + + response = iam_root.list_attached_user_policies(UserName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_user_policy(UserName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_user_policy(UserName=name, PolicyArn=policy2) + + response = iam_root.list_attached_user_policies(UserName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteUser fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_user(UserName=name) + @pytest.mark.user_policy @pytest.mark.iam_account def test_account_user_policy_allow(iam_root): From d4ada317e1327dc89ca5e8dada94bc7098050843 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 26 Jan 2024 15:23:40 -0500 Subject: [PATCH 11/22] iam: add account tests for Role apis adds test cases for the following iam actions: * CreateRole * GetRole * ListRoles * DeleteRole * UpdateRole verified to pass against aws when an account root user's credentials are provided in the [iam] section of s3tests.conf Signed-off-by: Casey Bodley --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 206 +++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) diff --git a/pytest.ini b/pytest.ini index dcaa0f7..4a94bc8 100644 --- a/pytest.ini +++ b/pytest.ini @@ -17,6 +17,7 @@ markers = fails_on_s3 fails_with_subdomain iam_account + iam_role iam_tenant iam_user lifecycle diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 1eeb4e3..95fcb80 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -922,6 +922,32 @@ def nuke_users(client, **kwargs): except: pass +def nuke_role_policies(client, name): + p = client.get_paginator('list_role_policies') + for response in p.paginate(RoleName=name): + for policy in response['PolicyNames']: + try: + client.delete_role_policy(RoleName=name, PolicyName=policy) + except: + pass + +def nuke_role(client, name): + # delete role policies, etc + try: + nuke_role_policies(client, name) + except: + pass + client.delete_role(RoleName=name) + +def nuke_roles(client, **kwargs): + p = client.get_paginator('list_roles') + for response in p.paginate(**kwargs): + for role in response['Roles']: + try: + nuke_role(client, role['RoleName']) + except: + pass + # fixture for iam account root user @pytest.fixture def iam_root(configfile): @@ -935,6 +961,7 @@ def iam_root(configfile): yield client nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) # IAM User apis @@ -1512,3 +1539,182 @@ def test_account_user_policy_allow(iam_root): # the policy may take a bit to start working. retry until it returns # something other than AccessDenied retry_on('AccessDenied', 10, client.list_buckets) + + +assume_role_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': '*'} + }] + }) + +# IAM Role apis +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_create(iam_root): + path = get_iam_path_prefix() + name1 = make_iam_name('R1') + desc = 'my role description' + max_duration = 43200 + response = iam_root.create_role(RoleName=name1, Path=path, AssumeRolePolicyDocument=assume_role_policy, Description=desc, MaxSessionDuration=max_duration) + role = response['Role'] + assert role['Path'] == path + assert role['RoleName'] == name1 + assert assume_role_policy == json.dumps(role['AssumeRolePolicyDocument']) + assert len(role['RoleId']) + arn = role['Arn'] + assert arn.startswith('arn:aws:iam:') + assert arn.endswith(f':role{path}{name1}') + assert role['CreateDate'] > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + # AWS doesn't include these for CreateRole, only GetRole + #assert desc == role['Description'] + #assert max_duration == role['MaxSessionDuration'] + + response = iam_root.get_role(RoleName=name1) + role = response['Role'] + assert arn == role['Arn'] + assert desc == role['Description'] + assert max_duration == role['MaxSessionDuration'] + + path2 = get_iam_path_prefix() + 'foo/' + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_role(RoleName=name1, Path=path2, AssumeRolePolicyDocument=assume_role_policy) + + name2 = make_iam_name('R2') + response = iam_root.create_role(RoleName=name2, Path=path2, AssumeRolePolicyDocument=assume_role_policy) + role = response['Role'] + assert role['Path'] == path2 + assert role['RoleName'] == name2 + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_case_insensitive_name(iam_root): + path = get_iam_path_prefix() + name_upper = make_iam_name('R1') + name_lower = make_iam_name('r1') + response = iam_root.create_role(RoleName=name_upper, Path=path, AssumeRolePolicyDocument=assume_role_policy) + rid = response['Role']['RoleId'] + + # name is case-insensitive, so 'r1' should also conflict + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_role(RoleName=name_lower, AssumeRolePolicyDocument=assume_role_policy) + + # search for 'r1' should return the same 'R1' role + response = iam_root.get_role(RoleName=name_lower) + assert rid == response['Role']['RoleId'] + + # delete for 'r1' should delete the same 'R1' role + iam_root.delete_role(RoleName=name_lower) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role(RoleName=name_lower) + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_delete(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('U1') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role(RoleName=name) + + response = iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + uid = response['Role']['RoleId'] + create_date = response['Role']['CreateDate'] + + iam_root.delete_role(RoleName=name) + + response = iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + assert uid != response['Role']['RoleId'] + assert create_date <= response['Role']['CreateDate'] + +def role_list_names(client, **kwargs): + p = client.get_paginator('list_roles') + rolenames = [] + for response in p.paginate(**kwargs): + rolenames += [u['RoleName'] for u in response['Roles']] + return rolenames + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_list(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_roles(PathPrefix=path) + assert len(response['Roles']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('aa') + name2 = make_iam_name('Ab') + name3 = make_iam_name('ac') + name4 = make_iam_name('Ad') + + # sort order is independent of CreateDate, Path, and RoleName capitalization + iam_root.create_role(RoleName=name4, Path=path+'w/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name3, Path=path+'x/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name2, Path=path+'y/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name1, Path=path+'z/', AssumeRolePolicyDocument=assume_role_policy) + + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path, PaginationConfig={'PageSize': 1}) + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_list_path_prefix(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_roles(PathPrefix=path) + assert len(response['Roles']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('a') + name2 = make_iam_name('b') + name3 = make_iam_name('c') + name4 = make_iam_name('d') + + iam_root.create_role(RoleName=name1, Path=path, AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name2, Path=path, AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name3, Path=path+'a/', AssumeRolePolicyDocument=assume_role_policy) + iam_root.create_role(RoleName=name4, Path=path+'a/x/', AssumeRolePolicyDocument=assume_role_policy) + + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + role_list_names(iam_root, PathPrefix=path, + PaginationConfig={'PageSize': 1}) + assert [name3, name4] == \ + role_list_names(iam_root, PathPrefix=path+'a') + assert [name3, name4] == \ + role_list_names(iam_root, PathPrefix=path+'a', + PaginationConfig={'PageSize': 1}) + assert [name4] == \ + role_list_names(iam_root, PathPrefix=path+'a/x') + assert [name4] == \ + role_list_names(iam_root, PathPrefix=path+'a/x', + PaginationConfig={'PageSize': 1}) + assert [] == role_list_names(iam_root, PathPrefix=path+'a/x/d') + +@pytest.mark.iam_account +@pytest.mark.iam_role +def test_account_role_update(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('a') + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.update_role(RoleName=name) + + iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + response = iam_root.get_role(RoleName=name) + assert name == response['Role']['RoleName'] + arn = response['Role']['Arn'] + rid = response['Role']['RoleId'] + + desc = 'my role description' + iam_root.update_role(RoleName=name, Description=desc, MaxSessionDuration=43200) + + response = iam_root.get_role(RoleName=name) + assert rid == response['Role']['RoleId'] + assert arn == response['Role']['Arn'] + assert desc == response['Role']['Description'] + assert 43200 == response['Role']['MaxSessionDuration'] From cefea0fd26f75d94f2e8f100adeb2190faafb3ea Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 26 Jan 2024 15:51:55 -0500 Subject: [PATCH 12/22] iam: add account test for RolePolicy apis adds test cases for the following iam actions: * PutRolePolicy * GetRolePolicy * DeleteRolePolicy * ListRolePolicies verified to pass against aws when an account root user's credentials are provided in the [iam] section of s3tests.conf Signed-off-by: Casey Bodley --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 120 +++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/pytest.ini b/pytest.ini index 4a94bc8..3ce92cf 100644 --- a/pytest.ini +++ b/pytest.ini @@ -25,6 +25,7 @@ markers = lifecycle_transition list_objects_v2 object_lock + role_policy session_policy s3select s3website diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 95fcb80..b06c5b8 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -20,6 +20,7 @@ from . import ( get_iam_s3client, get_alt_iam_client, get_alt_user_id, + get_sts_client, ) from .utils import _get_status, _get_status_and_error_code @@ -1718,3 +1719,122 @@ def test_account_role_update(iam_root): assert arn == response['Role']['Arn'] assert desc == response['Role']['Description'] assert 43200 == response['Role']['MaxSessionDuration'] + + +role_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + "Resource": "*" + }] + }) + +# IAM RolePolicy apis +@pytest.mark.iam_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy(iam_root): + path = get_iam_path_prefix() + role_name = make_iam_name('r') + policy_name = 'MyPolicy' + policy2_name = 'AnotherPolicy' + + # Get/Put/Delete fail on nonexistent RoleName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy) + + iam_root.create_role(RoleName=role_name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy) + + response = iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + assert role_name == response['RoleName'] + assert policy_name == response['PolicyName'] + assert role_policy == json.dumps(response['PolicyDocument']) + + response = iam_root.list_role_policies(RoleName=role_name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy2_name, PolicyDocument=role_policy) + + response = iam_root.list_role_policies(RoleName=role_name) + assert [policy2_name, policy_name] == response['PolicyNames'] + + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy2_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) + +@pytest.mark.iam_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow(iam_root): + path = get_iam_path_prefix() + user_name = make_iam_name('MyUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + + user = iam_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + key = iam_root.create_access_key(UserName=user_name)['AccessKey'] + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_buckets) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListAllMyBuckets' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListAllMyBuckets', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_buckets) From 46217fcf812fb1462b1b846d0fa45a9a7436d868 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 4 Feb 2024 18:07:09 -0500 Subject: [PATCH 13/22] iam: test managed role policy Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index b06c5b8..a49f129 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -932,12 +932,25 @@ def nuke_role_policies(client, name): except: pass +def nuke_attached_role_policies(client, name): + p = client.get_paginator('list_attached_role_policies') + for response in p.paginate(RoleName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn']) + except: + pass + def nuke_role(client, name): # delete role policies, etc try: nuke_role_policies(client, name) except: pass + try: + nuke_attached_role_policies(client, name) + except: + pass client.delete_role(RoleName=name) def nuke_roles(client, **kwargs): @@ -1780,6 +1793,63 @@ def test_account_role_policy(iam_root): with pytest.raises(iam_root.exceptions.NoSuchEntityException): iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name) +@pytest.mark.role_policy +@pytest.mark.iam_account +def test_account_role_policy_managed(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent RoleName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_role_policies(RoleName=name) + + iam_root.create_role(RoleName=name, Path=path, AssumeRolePolicyDocument=assume_role_policy) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy1) + + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy1) + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy1) + + response = iam_root.list_attached_role_policies(RoleName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_role_policy(RoleName=name, PolicyArn=policy2) + + response = iam_root.list_attached_role_policies(RoleName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_role_policy(RoleName=name, PolicyArn=policy2) + + response = iam_root.list_attached_role_policies(RoleName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteRole fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_role(RoleName=name) + @pytest.mark.iam_account @pytest.mark.iam_role @pytest.mark.role_policy From ed4a8e2244372d583f204ab60e339c730d61ace3 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 30 Jan 2024 18:16:40 -0500 Subject: [PATCH 14/22] config: add [iam alt root] for an alt account's root user Signed-off-by: Casey Bodley --- s3tests.conf.SAMPLE | 7 +++++++ s3tests_boto3/functional/__init__.py | 26 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index 550e806..c0dc89a 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -142,6 +142,13 @@ secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa user_id = RGW11111111111111111 email = account1@ceph.com +# iam account root user in a different account than [iam root] +[iam alt root] +access_key = BBBBBBBBBBBBBBBBBBbb +secret_key = bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb +user_id = RGW22222222222222222 +email = account2@ceph.com + #following section needs to be added when you want to run Assume Role With Webidentity test [webidentity] #used for assume role with web identity test in sts-tests diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index d1330a9..2f9f7e1 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -268,8 +268,14 @@ def configure(): config.iam_root_access_key = cfg.get('iam root',"access_key") config.iam_root_secret_key = cfg.get('iam root',"secret_key") + config.iam_root_user_id = cfg.get('iam root',"user_id") config.iam_root_email = cfg.get('iam root',"email") + config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key") + config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key") + config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id") + config.iam_alt_root_email = cfg.get('iam alt root',"email") + # vars from the fixtures section template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-') prefix = choose_bucket_prefix(template=template) @@ -455,6 +461,17 @@ def get_iam_root_client(**kwargs): verify=config.default_ssl_verify, **kwargs) +def get_iam_alt_root_client(**kwargs): + kwargs.setdefault('service_name', 'iam') + kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_alt_root_secret_key) + + return boto3.client(endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) + def get_alt_client(client_config=None): if client_config == None: client_config = Config(signature_version='s3v4') @@ -716,9 +733,18 @@ def get_iam_access_key(): def get_iam_secret_key(): return config.iam_secret_key +def get_iam_root_user_id(): + return config.iam_root_user_id + def get_iam_root_email(): return config.iam_root_email +def get_iam_alt_root_user_id(): + return config.iam_alt_root_user_id + +def get_iam_alt_root_email(): + return config.iam_alt_root_email + def get_user_token(): return config.webidentity_user_token From ba292fbf59897006d4aba32365129c27cea6c200 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 30 Jan 2024 18:17:17 -0500 Subject: [PATCH 15/22] iam: test cross-account permissions test the [iam alt root] user's access to buckets owned by [iam root] using various policy principals and acl grantees Signed-off-by: Casey Bodley --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 379 +++++++++++++++++++++++++++ 2 files changed, 380 insertions(+) diff --git a/pytest.ini b/pytest.ini index 3ce92cf..4c456be 100644 --- a/pytest.ini +++ b/pytest.ini @@ -17,6 +17,7 @@ markers = fails_on_s3 fails_with_subdomain iam_account + iam_cross_account iam_role iam_tenant iam_user diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index a49f129..4cfbfad 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -13,6 +13,11 @@ from . import ( get_alt_client, get_iam_client, get_iam_root_client, + get_iam_root_user_id, + get_iam_root_email, + get_iam_alt_root_client, + get_iam_alt_root_user_id, + get_iam_alt_root_email, make_iam_name, get_iam_path_prefix, get_new_bucket, @@ -1908,3 +1913,377 @@ def test_account_role_policy_allow(iam_root): # the policy may take a bit to start working. retry until it returns # something other than AccessDenied retry_on('AccessDenied', 10, s3.list_buckets) + + +# fixture for iam alt account root user +@pytest.fixture +def iam_alt_root(configfile): + client = get_iam_alt_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam alt root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam alt root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) + + +# test cross-account access, adding user policy before the bucket policy +def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn): + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +# test cross-account access, adding bucket policy before the user policy +def _test_cross_account_bucket_user_policy(roots3, alt_root, alt_name, alt_arn): + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_user_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, user_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_user_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, user_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_arn = user_arn.replace(f':user{path}{user_name}', ':root') + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_arn = user_arn.replace(f':user{path}{user_name}', ':root') + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_id) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_id) + + +# test cross-account access, adding user policy before the bucket acl +def _test_cross_account_user_policy_bucket_acl(roots3, alt_root, alt_name, grantee): + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket acl that grants READ access + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +# test cross-account access, adding bucket acl before the user policy +def _test_cross_account_bucket_acl_user_policy(roots3, alt_root, alt_name, grantee): + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # add a bucket acl that grants READ access + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.fails_on_aws # can't grant to individual users +def test_cross_account_bucket_acl_user_policy_grant_user_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + response['User']['UserId'] + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.fails_on_aws # can't grant to individual users +def test_cross_account_user_policy_bucket_acl_grant_user_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + response['User']['UserId'] + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_acl_user_policy_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_policy_bucket_acl_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_acl_user_policy_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_policy_bucket_acl_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + + +# test root cross-account access with bucket policy +def _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn): + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + alt_arn = iam_alt_root.get_user()['User']['Arn'] + _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + alt_arn = iam_alt_root.get_user()['User']['Arn'] + account_id = alt_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + _test_cross_account_root_bucket_policy(roots3, alts3, account_id) + +# test root cross-account access with bucket acls +def _test_cross_account_root_bucket_acl(roots3, alts3, grantee): + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket acl that grants READ + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_acl_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_root_bucket_acl(roots3, alts3, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_acl_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_root_bucket_acl(roots3, alts3, grantee) From d5791d8da6c83917264139b3b4b92baa3ba1b38a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 6 Feb 2024 17:57:14 -0500 Subject: [PATCH 16/22] iam: add account test for OpenIDConnectProvider apis Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 46 ++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 4cfbfad..936dc24 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -967,6 +967,17 @@ def nuke_roles(client, **kwargs): except: pass +def nuke_oidc_providers(client, prefix): + result = client.list_open_id_connect_providers() + for provider in result['OpenIDConnectProviderList']: + arn = provider['Arn'] + if f':oidc-provider{prefix}' in arn: + try: + client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + except: + pass + + # fixture for iam account root user @pytest.fixture def iam_root(configfile): @@ -981,6 +992,7 @@ def iam_root(configfile): yield client nuke_users(client, PathPrefix=get_iam_path_prefix()) nuke_roles(client, PathPrefix=get_iam_path_prefix()) + nuke_oidc_providers(client, get_iam_path_prefix()) # IAM User apis @@ -1915,6 +1927,40 @@ def test_account_role_policy_allow(iam_root): retry_on('AccessDenied', 10, s3.list_buckets) +# IAM OpenIDConnectProvider apis +@pytest.mark.iam_account +def test_account_oidc_provider(iam_root): + url_host = get_iam_path_prefix()[1:] + 'example.com' + url = 'http://' + url_host + + response = iam_root.create_open_id_connect_provider( + ClientIDList=['my-application-id'], + ThumbprintList=['3768084dfb3d2b68b7897bf5f565da8efEXAMPLE'], + Url=url) + arn = response['OpenIDConnectProviderArn'] + assert arn.endswith(f':oidc-provider/{url_host}') + + response = iam_root.list_open_id_connect_providers() + arns = [p['Arn'] for p in response['OpenIDConnectProviderList']] + assert arn in arns + + response = iam_root.get_open_id_connect_provider(OpenIDConnectProviderArn=arn) + assert url == response['Url'] + assert ['my-application-id'] == response['ClientIDList'] + assert ['3768084dfb3d2b68b7897bf5f565da8efEXAMPLE'] == response['ThumbprintList'] + + iam_root.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + + response = iam_root.list_open_id_connect_providers() + arns = [p['Arn'] for p in response['OpenIDConnectProviderList']] + assert arn not in arns + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_open_id_connect_provider(OpenIDConnectProviderArn=arn) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + + # fixture for iam alt account root user @pytest.fixture def iam_alt_root(configfile): From 4ca7967ae7716df385b525c4a6a5e2aa60cd4033 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 11 Feb 2024 11:50:43 -0500 Subject: [PATCH 17/22] iam: add account tests for Group apis Signed-off-by: Casey Bodley --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 155 +++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) diff --git a/pytest.ini b/pytest.ini index 4c456be..5da379d 100644 --- a/pytest.ini +++ b/pytest.ini @@ -16,6 +16,7 @@ markers = fails_on_rgw fails_on_s3 fails_with_subdomain + group iam_account iam_cross_account iam_role diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 936dc24..e873fe1 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -928,6 +928,32 @@ def nuke_users(client, **kwargs): except: pass +def nuke_group_users(client, name): + p = client.get_paginator('get_group') + for response in p.paginate(GroupName=name): + for user in response['Users']: + try: + client.remove_user_from_group(GroupName=name, UserName=user['UserName']) + except: + pass + +def nuke_group(client, name): + # delete group policies and remove all users + try: + nuke_group_users(client, name) + except: + pass + client.delete_group(GroupName=name) + +def nuke_groups(client, **kwargs): + p = client.get_paginator('list_groups') + for response in p.paginate(**kwargs): + for user in response['Groups']: + try: + nuke_group(client, user['GroupName']) + except: + pass + def nuke_role_policies(client, name): p = client.get_paginator('list_role_policies') for response in p.paginate(RoleName=name): @@ -991,6 +1017,7 @@ def iam_root(configfile): yield client nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_groups(client, PathPrefix=get_iam_path_prefix()) nuke_roles(client, PathPrefix=get_iam_path_prefix()) nuke_oidc_providers(client, get_iam_path_prefix()) @@ -1572,6 +1599,134 @@ def test_account_user_policy_allow(iam_root): retry_on('AccessDenied', 10, client.list_buckets) +def group_list_names(client, **kwargs): + p = client.get_paginator('list_groups') + names = [] + for response in p.paginate(**kwargs): + names += [u['GroupName'] for u in response['Groups']] + return names + +# IAM Group apis +@pytest.mark.group +@pytest.mark.iam_account +def test_account_group_create(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('G1') + + assert [] == group_list_names(iam_root, PathPrefix=path) + + response = iam_root.create_group(GroupName=name, Path=path) + group = response['Group'] + assert path == group['Path'] + assert name == group['GroupName'] + assert len(group['GroupId']) + arn = group['Arn'] + assert arn.startswith('arn:aws:iam:') + assert arn.endswith(f':group{path}{name}') + + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_group(GroupName=name) + + response = iam_root.get_group(GroupName=name) + assert group == response['Group'] + + assert [name] == group_list_names(iam_root, PathPrefix=path) + + iam_root.delete_group(GroupName=name) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group(GroupName=name) + + assert [] == group_list_names(iam_root, PathPrefix=path) + +@pytest.mark.iam_account +@pytest.mark.group +def test_account_group_case_insensitive_name(iam_root): + path = get_iam_path_prefix() + name_upper = make_iam_name('G1') + name_lower = make_iam_name('g1') + response = iam_root.create_group(GroupName=name_upper, Path=path) + group = response['Group'] + + with pytest.raises(iam_root.exceptions.EntityAlreadyExistsException): + iam_root.create_group(GroupName=name_lower) + + response = iam_root.get_group(GroupName=name_lower) + assert group == response['Group'] + + iam_root.delete_group(GroupName=name_lower) + + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group(GroupName=name_upper) + +@pytest.mark.iam_account +@pytest.mark.group +def test_account_group_list(iam_root): + path = get_iam_path_prefix() + response = iam_root.list_groups(PathPrefix=path) + assert len(response['Groups']) == 0 + assert response['IsTruncated'] == False + + name1 = make_iam_name('aa') + name2 = make_iam_name('Ab') + name3 = make_iam_name('ac') + name4 = make_iam_name('Ad') + + # sort order is independent of Path and GroupName capitalization + iam_root.create_group(GroupName=name4, Path=path+'w/') + iam_root.create_group(GroupName=name3, Path=path+'x/') + iam_root.create_group(GroupName=name2, Path=path+'y/') + iam_root.create_group(GroupName=name1, Path=path+'z/') + + assert [name1, name2, name3, name4] == \ + group_list_names(iam_root, PathPrefix=path) + assert [name1, name2, name3, name4] == \ + group_list_names(iam_root, PathPrefix=path, PaginationConfig={'PageSize': 1}) + +@pytest.mark.group +@pytest.mark.iam_account +def test_account_group_update(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('G1') + response = iam_root.create_group(GroupName=name, Path=path) + group_id = response['Group']['GroupId'] + + username = make_iam_name('U1') + iam_root.create_user(UserName=username, Path=path) + + iam_root.add_user_to_group(GroupName=name, UserName=username) + + response = iam_root.list_groups_for_user(UserName=username) + groups = response['Groups'] + assert len(groups) == 1 + assert path == groups[0]['Path'] + assert name == groups[0]['GroupName'] + assert group_id == groups[0]['GroupId'] + + new_path = path + 'new/' + new_name = make_iam_name('NG1') + iam_root.update_group(GroupName=name, NewPath=new_path, NewGroupName=new_name) + + response = iam_root.get_group(GroupName=new_name) + group = response['Group'] + assert new_path == group['Path'] + assert new_name == group['GroupName'] + assert group_id == group['GroupId'] + arn = group['Arn'] + assert arn.startswith('arn:aws:iam:') + assert arn.endswith(f':group{new_path}{new_name}') + users = response['Users'] + assert len(users) == 1 + assert username == users[0]['UserName'] + + response = iam_root.list_groups_for_user(UserName=username) + groups = response['Groups'] + assert len(groups) == 1 + assert new_path == groups[0]['Path'] + assert new_name == groups[0]['GroupName'] + assert group_id == groups[0]['GroupId'] + + assume_role_policy = json.dumps({ 'Version': '2012-10-17', 'Statement': [{ From 7ebc530e042333636338d298cfeaefa58592b22d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 11 Feb 2024 11:51:17 -0500 Subject: [PATCH 18/22] iam: add account tests for GroupPolicy apis Signed-off-by: Casey Bodley --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 220 +++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) diff --git a/pytest.ini b/pytest.ini index 5da379d..c7e0418 100644 --- a/pytest.ini +++ b/pytest.ini @@ -17,6 +17,7 @@ markers = fails_on_s3 fails_with_subdomain group + group_policy iam_account iam_cross_account iam_role diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index e873fe1..313a896 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -928,6 +928,24 @@ def nuke_users(client, **kwargs): except: pass +def nuke_group_policies(client, name): + p = client.get_paginator('list_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['PolicyNames']: + try: + client.delete_group_policy(GroupName=name, PolicyName=policy) + except: + pass + +def nuke_attached_group_policies(client, name): + p = client.get_paginator('list_attached_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn']) + except: + pass + def nuke_group_users(client, name): p = client.get_paginator('get_group') for response in p.paginate(GroupName=name): @@ -939,6 +957,14 @@ def nuke_group_users(client, name): def nuke_group(client, name): # delete group policies and remove all users + try: + nuke_group_policies(client, name) + except: + pass + try: + nuke_attached_group_policies(client, name) + except: + pass try: nuke_group_users(client, name) except: @@ -1727,6 +1753,200 @@ def test_account_group_update(iam_root): assert group_id == groups[0]['GroupId'] +# IAM GroupPolicy apis +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_inline_group_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy_name = 'List' + bucket_name = get_new_bucket_name() + policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Deny', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + + # Get/Put/Delete fail on nonexistent GroupName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1) + + iam_root.create_group(GroupName=name, Path=path) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1) + + response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + assert policy1 == json.dumps(response['PolicyDocument']) + response = iam_root.list_group_policies(GroupName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy2) + + response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + assert policy2 == json.dumps(response['PolicyDocument']) + response = iam_root.list_group_policies(GroupName=name) + assert [policy_name] == response['PolicyNames'] + + # DeleteGroup fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_group(GroupName=name) + + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + response = iam_root.list_group_policies(GroupName=name) + assert [] == response['PolicyNames'] + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_managed_group_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent GroupName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_group_policies(GroupName=name) + + iam_root.create_group(GroupName=name, Path=path) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1) + + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + + response = iam_root.list_attached_group_policies(GroupName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy2) + + response = iam_root.list_attached_group_policies(GroupName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2) + + response = iam_root.list_attached_group_policies(GroupName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteGroup fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_group(GroupName=name) + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_inline_group_policy_allow(iam_root): + path = get_iam_path_prefix() + username = make_iam_name('User') + groupname = make_iam_name('Group') + bucket_name = get_new_bucket_name() + + iam_root.create_user(UserName=username, Path=path) + + key = iam_root.create_access_key(UserName=username)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + iam_root.create_group(GroupName=groupname, Path=path) + iam_root.add_user_to_group(GroupName=groupname, UserName=username) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a group policy that allows s3 actions + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + }) + policy_name = 'AllowStar' + iam_root.put_group_policy(GroupName=groupname, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_managed_group_policy_allow(iam_root): + path = get_iam_path_prefix() + username = make_iam_name('User') + groupname = make_iam_name('Group') + bucket_name = get_new_bucket_name() + + iam_root.create_user(UserName=username, Path=path) + + key = iam_root.create_access_key(UserName=username)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + iam_root.create_group(GroupName=groupname, Path=path) + iam_root.add_user_to_group(GroupName=groupname, UserName=username) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a group policy that allows s3 read actions + policy_arn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + iam_root.attach_group_policy(GroupName=groupname, PolicyArn=policy_arn) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + + assume_role_policy = json.dumps({ 'Version': '2012-10-17', 'Statement': [{ From a3a16eb66a020ce079a9ce1cc9398921e264ca3d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 21 Feb 2024 09:36:06 -0500 Subject: [PATCH 19/22] iam: test cross-account policy with assumed role Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 282 +++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 313a896..101cfb8 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -2301,6 +2301,288 @@ def test_account_role_policy_allow(iam_root): # something other than AccessDenied retry_on('AccessDenied', 10, s3.list_buckets) +# alt account user assumes main account role to access main account bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_same_account_role_policy_allow(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + s3_main = get_iam_root_client(service_name='s3') + s3_main.create_bucket(Bucket=bucket_name) + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name) + +# alt account user assumes main account role to access alt account bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_cross_account_role_policy_allow(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + s3_alt = get_iam_alt_root_client(service_name='s3') + s3_alt.create_bucket(Bucket=bucket_name) + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # expect AccessDenied because no resource policy allows the main account + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the main account's arn + main_arn = iam_root.get_user()['User']['Arn'] + s3_alt.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': main_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}' + }] + })) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name) + +# alt account user assumes main account role to create a bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow_create_bucket(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private') + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowCreateBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': ['s3:CreateBucket', 's3:PutBucketAcl'], + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private') + + # verify that the bucket is owned by the role's account + s3_main = get_iam_root_client(service_name='s3') + response = s3_main.get_bucket_acl(Bucket=bucket_name) + + main_arn = iam_root.get_user()['User']['Arn'] + account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + assert response['Owner']['ID'] == account_id + assert response['Grants'][0]['Grantee']['ID'] == account_id + +# alt account user assumes main account role to read the role info +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow_get_role(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + iam = get_iam_root_client(service_name='iam', + aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows iam actions + e = assert_raises(ClientError, iam.get_role, RoleName=role_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowGetRole' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'iam:GetRole', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, iam.get_role, RoleName=role_name) + # IAM OpenIDConnectProvider apis @pytest.mark.iam_account From 96d658444ac1b70a28acb5d0e88460f89d71aa0a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Mar 2024 09:59:09 -0500 Subject: [PATCH 20/22] s3: remove test_bucket_acl_no_grants() aws doesn't consult acls for same-account access. rgw doesn't for account users either Fixes: https://github.com/ceph/s3-tests/issues/184 Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_s3.py | 34 ----------------------------- 1 file changed, 34 deletions(-) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 4a188bf..997195e 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -4673,40 +4673,6 @@ def test_bucket_acl_grant_nonexist_user(): assert status == 400 assert error_code == 'InvalidArgument' -def test_bucket_acl_no_grants(): - bucket_name = get_new_bucket() - client = get_client() - - client.put_object(Bucket=bucket_name, Key='foo', Body='bar') - response = client.get_bucket_acl(Bucket=bucket_name) - old_grants = response['Grants'] - policy = {} - policy['Owner'] = response['Owner'] - # clear grants - policy['Grants'] = [] - - # remove read/write permission - response = client.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy) - - # can read - client.get_object(Bucket=bucket_name, Key='foo') - - # can't write - check_access_denied(client.put_object, Bucket=bucket_name, Key='baz', Body='a') - - #TODO fix this test once a fix is in for same issues in - # test_access_bucket_private_object_private - client2 = get_client() - # owner can read acl - client2.get_bucket_acl(Bucket=bucket_name) - - # owner can write acl - client2.put_bucket_acl(Bucket=bucket_name, ACL='private') - - # set policy back to original so that bucket can be cleaned up - policy['Grants'] = old_grants - client2.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy) - def _get_acl_header(user_id=None, perms=None): all_headers = ["read", "write", "read-acp", "write-acp", "full-control"] headers = [] From 7bd4b0ee142b2c58f9d8a700cdbadafb9a4387a4 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 12 Mar 2024 15:47:51 -0400 Subject: [PATCH 21/22] iam: move iam_root, iam_alt_root fixtures to iam.py Signed-off-by: Casey Bodley --- s3tests_boto3/functional/iam.py | 199 +++++++++++++++++++++++++++ s3tests_boto3/functional/test_iam.py | 191 +------------------------ 2 files changed, 200 insertions(+), 190 deletions(-) create mode 100644 s3tests_boto3/functional/iam.py diff --git a/s3tests_boto3/functional/iam.py b/s3tests_boto3/functional/iam.py new file mode 100644 index 0000000..a070e5d --- /dev/null +++ b/s3tests_boto3/functional/iam.py @@ -0,0 +1,199 @@ +from botocore.exceptions import ClientError +import pytest + +from . import ( + configfile, + get_iam_root_client, + get_iam_root_user_id, + get_iam_root_email, + get_iam_alt_root_client, + get_iam_alt_root_user_id, + get_iam_alt_root_email, + get_iam_path_prefix, +) + +def nuke_user_keys(client, name): + p = client.get_paginator('list_access_keys') + for response in p.paginate(UserName=name): + for key in response['AccessKeyMetadata']: + try: + client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId']) + except: + pass + +def nuke_user_policies(client, name): + p = client.get_paginator('list_user_policies') + for response in p.paginate(UserName=name): + for policy in response['PolicyNames']: + try: + client.delete_user_policy(UserName=name, PolicyName=policy) + except: + pass + +def nuke_attached_user_policies(client, name): + p = client.get_paginator('list_attached_user_policies') + for response in p.paginate(UserName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn']) + except: + pass + +def nuke_user(client, name): + # delete access keys, user policies, etc + try: + nuke_user_keys(client, name) + except: + pass + try: + nuke_user_policies(client, name) + except: + pass + try: + nuke_attached_user_policies(client, name) + except: + pass + client.delete_user(UserName=name) + +def nuke_users(client, **kwargs): + p = client.get_paginator('list_users') + for response in p.paginate(**kwargs): + for user in response['Users']: + try: + nuke_user(client, user['UserName']) + except: + pass + +def nuke_group_policies(client, name): + p = client.get_paginator('list_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['PolicyNames']: + try: + client.delete_group_policy(GroupName=name, PolicyName=policy) + except: + pass + +def nuke_attached_group_policies(client, name): + p = client.get_paginator('list_attached_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn']) + except: + pass + +def nuke_group_users(client, name): + p = client.get_paginator('get_group') + for response in p.paginate(GroupName=name): + for user in response['Users']: + try: + client.remove_user_from_group(GroupName=name, UserName=user['UserName']) + except: + pass + +def nuke_group(client, name): + # delete group policies and remove all users + try: + nuke_group_policies(client, name) + except: + pass + try: + nuke_attached_group_policies(client, name) + except: + pass + try: + nuke_group_users(client, name) + except: + pass + client.delete_group(GroupName=name) + +def nuke_groups(client, **kwargs): + p = client.get_paginator('list_groups') + for response in p.paginate(**kwargs): + for user in response['Groups']: + try: + nuke_group(client, user['GroupName']) + except: + pass + +def nuke_role_policies(client, name): + p = client.get_paginator('list_role_policies') + for response in p.paginate(RoleName=name): + for policy in response['PolicyNames']: + try: + client.delete_role_policy(RoleName=name, PolicyName=policy) + except: + pass + +def nuke_attached_role_policies(client, name): + p = client.get_paginator('list_attached_role_policies') + for response in p.paginate(RoleName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn']) + except: + pass + +def nuke_role(client, name): + # delete role policies, etc + try: + nuke_role_policies(client, name) + except: + pass + try: + nuke_attached_role_policies(client, name) + except: + pass + client.delete_role(RoleName=name) + +def nuke_roles(client, **kwargs): + p = client.get_paginator('list_roles') + for response in p.paginate(**kwargs): + for role in response['Roles']: + try: + nuke_role(client, role['RoleName']) + except: + pass + +def nuke_oidc_providers(client, prefix): + result = client.list_open_id_connect_providers() + for provider in result['OpenIDConnectProviderList']: + arn = provider['Arn'] + if f':oidc-provider{prefix}' in arn: + try: + client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) + except: + pass + + +# fixture for iam account root user +@pytest.fixture +def iam_root(configfile): + client = get_iam_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_groups(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) + nuke_oidc_providers(client, get_iam_path_prefix()) + +# fixture for iam alt account root user +@pytest.fixture +def iam_alt_root(configfile): + client = get_iam_alt_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam alt root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam alt root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 101cfb8..fb288ce 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -13,8 +13,6 @@ from . import ( get_alt_client, get_iam_client, get_iam_root_client, - get_iam_root_user_id, - get_iam_root_email, get_iam_alt_root_client, get_iam_alt_root_user_id, get_iam_alt_root_email, @@ -28,6 +26,7 @@ from . import ( get_sts_client, ) from .utils import _get_status, _get_status_and_error_code +from .iam import iam_root, iam_alt_root @pytest.mark.user_policy @@ -876,178 +875,6 @@ def test_verify_allow_iam_actions(): assert response['ResponseMetadata']['HTTPStatusCode'] == 200 -def nuke_user_keys(client, name): - p = client.get_paginator('list_access_keys') - for response in p.paginate(UserName=name): - for key in response['AccessKeyMetadata']: - try: - client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId']) - except: - pass - -def nuke_user_policies(client, name): - p = client.get_paginator('list_user_policies') - for response in p.paginate(UserName=name): - for policy in response['PolicyNames']: - try: - client.delete_user_policy(UserName=name, PolicyName=policy) - except: - pass - -def nuke_attached_user_policies(client, name): - p = client.get_paginator('list_attached_user_policies') - for response in p.paginate(UserName=name): - for policy in response['AttachedPolicies']: - try: - client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn']) - except: - pass - -def nuke_user(client, name): - # delete access keys, user policies, etc - try: - nuke_user_keys(client, name) - except: - pass - try: - nuke_user_policies(client, name) - except: - pass - try: - nuke_attached_user_policies(client, name) - except: - pass - client.delete_user(UserName=name) - -def nuke_users(client, **kwargs): - p = client.get_paginator('list_users') - for response in p.paginate(**kwargs): - for user in response['Users']: - try: - nuke_user(client, user['UserName']) - except: - pass - -def nuke_group_policies(client, name): - p = client.get_paginator('list_group_policies') - for response in p.paginate(GroupName=name): - for policy in response['PolicyNames']: - try: - client.delete_group_policy(GroupName=name, PolicyName=policy) - except: - pass - -def nuke_attached_group_policies(client, name): - p = client.get_paginator('list_attached_group_policies') - for response in p.paginate(GroupName=name): - for policy in response['AttachedPolicies']: - try: - client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn']) - except: - pass - -def nuke_group_users(client, name): - p = client.get_paginator('get_group') - for response in p.paginate(GroupName=name): - for user in response['Users']: - try: - client.remove_user_from_group(GroupName=name, UserName=user['UserName']) - except: - pass - -def nuke_group(client, name): - # delete group policies and remove all users - try: - nuke_group_policies(client, name) - except: - pass - try: - nuke_attached_group_policies(client, name) - except: - pass - try: - nuke_group_users(client, name) - except: - pass - client.delete_group(GroupName=name) - -def nuke_groups(client, **kwargs): - p = client.get_paginator('list_groups') - for response in p.paginate(**kwargs): - for user in response['Groups']: - try: - nuke_group(client, user['GroupName']) - except: - pass - -def nuke_role_policies(client, name): - p = client.get_paginator('list_role_policies') - for response in p.paginate(RoleName=name): - for policy in response['PolicyNames']: - try: - client.delete_role_policy(RoleName=name, PolicyName=policy) - except: - pass - -def nuke_attached_role_policies(client, name): - p = client.get_paginator('list_attached_role_policies') - for response in p.paginate(RoleName=name): - for policy in response['AttachedPolicies']: - try: - client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn']) - except: - pass - -def nuke_role(client, name): - # delete role policies, etc - try: - nuke_role_policies(client, name) - except: - pass - try: - nuke_attached_role_policies(client, name) - except: - pass - client.delete_role(RoleName=name) - -def nuke_roles(client, **kwargs): - p = client.get_paginator('list_roles') - for response in p.paginate(**kwargs): - for role in response['Roles']: - try: - nuke_role(client, role['RoleName']) - except: - pass - -def nuke_oidc_providers(client, prefix): - result = client.list_open_id_connect_providers() - for provider in result['OpenIDConnectProviderList']: - arn = provider['Arn'] - if f':oidc-provider{prefix}' in arn: - try: - client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) - except: - pass - - -# fixture for iam account root user -@pytest.fixture -def iam_root(configfile): - client = get_iam_root_client() - try: - arn = client.get_user()['User']['Arn'] - if not arn.endswith(':root'): - pytest.skip('[iam root] user does not have :root arn') - except ClientError as e: - pytest.skip('[iam root] user does not belong to an account') - - yield client - nuke_users(client, PathPrefix=get_iam_path_prefix()) - nuke_groups(client, PathPrefix=get_iam_path_prefix()) - nuke_roles(client, PathPrefix=get_iam_path_prefix()) - nuke_oidc_providers(client, get_iam_path_prefix()) - - # IAM User apis @pytest.mark.iam_account @pytest.mark.iam_user @@ -2618,22 +2445,6 @@ def test_account_oidc_provider(iam_root): iam_root.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn) -# fixture for iam alt account root user -@pytest.fixture -def iam_alt_root(configfile): - client = get_iam_alt_root_client() - try: - arn = client.get_user()['User']['Arn'] - if not arn.endswith(':root'): - pytest.skip('[iam alt root] user does not have :root arn') - except ClientError as e: - pytest.skip('[iam alt root] user does not belong to an account') - - yield client - nuke_users(client, PathPrefix=get_iam_path_prefix()) - nuke_roles(client, PathPrefix=get_iam_path_prefix()) - - # test cross-account access, adding user policy before the bucket policy def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn): # add a user policy that allows s3 actions From dfabbf5a8d835424147338693b4033c708b24543 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 12 Mar 2024 20:05:15 -0400 Subject: [PATCH 22/22] sns: add test_sns.py for simple topic testing Signed-off-by: Casey Bodley --- pytest.ini | 2 +- s3tests_boto3/functional/test_sns.py | 159 +++++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 s3tests_boto3/functional/test_sns.py diff --git a/pytest.ini b/pytest.ini index c7e0418..73d1563 100644 --- a/pytest.ini +++ b/pytest.ini @@ -34,7 +34,7 @@ markers = s3website s3website_routing_rules s3website_redirect_location - 3website + sns sse_s3 storage_class tagging diff --git a/s3tests_boto3/functional/test_sns.py b/s3tests_boto3/functional/test_sns.py new file mode 100644 index 0000000..360f14e --- /dev/null +++ b/s3tests_boto3/functional/test_sns.py @@ -0,0 +1,159 @@ +import json +import pytest +from botocore.exceptions import ClientError +from . import ( + configfile, + get_iam_root_client, + get_iam_alt_root_client, + get_new_bucket_name, + get_prefix, + nuke_prefixed_buckets, +) +from .iam import iam_root, iam_alt_root +from .utils import assert_raises, _get_status_and_error_code + +def get_new_topic_name(): + return get_new_bucket_name() + +def nuke_topics(client, prefix): + p = client.get_paginator('list_topics') + for response in p.paginate(): + for topic in response['Topics']: + arn = topic['TopicArn'] + if prefix not in arn: + pass + try: + client.delete_topic(TopicArn=arn) + except: + pass + +@pytest.fixture +def sns(iam_root): + client = get_iam_root_client(service_name='sns') + yield client + nuke_topics(client, get_prefix()) + +@pytest.fixture +def sns_alt(iam_alt_root): + client = get_iam_alt_root_client(service_name='sns') + yield client + nuke_topics(client, get_prefix()) + +@pytest.fixture +def s3(iam_root): + client = get_iam_root_client(service_name='s3') + yield client + nuke_prefixed_buckets(get_prefix(), client) + +@pytest.fixture +def s3_alt(iam_alt_root): + client = get_iam_alt_root_client(service_name='s3') + yield client + nuke_prefixed_buckets(get_prefix(), client) + + +@pytest.mark.iam_account +@pytest.mark.sns +def test_account_topic(sns): + name = get_new_topic_name() + + response = sns.create_topic(Name=name) + arn = response['TopicArn'] + assert arn.startswith('arn:aws:sns:') + assert arn.endswith(f':{name}') + + response = sns.list_topics() + assert arn in [p['TopicArn'] for p in response['Topics']] + + sns.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='') + + response = sns.get_topic_attributes(TopicArn=arn) + assert 'Attributes' in response + + sns.delete_topic(TopicArn=arn) + + response = sns.list_topics() + assert arn not in [p['TopicArn'] for p in response['Topics']] + + with pytest.raises(sns.exceptions.NotFoundException): + sns.get_topic_attributes(TopicArn=arn) + sns.delete_topic(TopicArn=arn) + +@pytest.mark.iam_account +@pytest.mark.sns +def test_cross_account_topic(sns, sns_alt): + name = get_new_topic_name() + arn = sns.create_topic(Name=name)['TopicArn'] + + # not visible to any alt user apis + with pytest.raises(sns.exceptions.NotFoundException): + sns_alt.get_topic_attributes(TopicArn=arn) + with pytest.raises(sns.exceptions.NotFoundException): + sns_alt.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='') + + # delete returns success + sns_alt.delete_topic(TopicArn=arn) + + response = sns_alt.list_topics() + assert arn not in [p['TopicArn'] for p in response['Topics']] + +@pytest.mark.iam_account +@pytest.mark.sns +def test_account_topic_publish(sns, s3): + name = get_new_topic_name() + + response = sns.create_topic(Name=name) + topic_arn = response['TopicArn'] + + bucket = get_new_bucket_name() + s3.create_bucket(Bucket=bucket) + + config = {'TopicConfigurations': [{ + 'Id': 'id', + 'TopicArn': topic_arn, + 'Events': [ 's3:ObjectCreated:*' ], + }]} + s3.put_bucket_notification_configuration( + Bucket=bucket, NotificationConfiguration=config) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.sns +def test_cross_account_topic_publish(sns, s3_alt, iam_alt_root): + name = get_new_topic_name() + + response = sns.create_topic(Name=name) + topic_arn = response['TopicArn'] + + bucket = get_new_bucket_name() + s3_alt.create_bucket(Bucket=bucket) + + config = {'TopicConfigurations': [{ + 'Id': 'id', + 'TopicArn': topic_arn, + 'Events': [ 's3:ObjectCreated:*' ], + }]} + + # expect AccessDenies because no resource policy allows cross-account access + e = assert_raises(ClientError, s3_alt.put_bucket_notification_configuration, + Bucket=bucket, NotificationConfiguration=config) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add topic policy to allow the alt user + alt_principal = iam_alt_root.get_user()['User']['Arn'] + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_principal}, + 'Action': 'sns:Publish', + 'Resource': topic_arn + }] + }) + sns.set_topic_attributes(TopicArn=topic_arn, AttributeName='Policy', + AttributeValue=policy) + + s3_alt.put_bucket_notification_configuration( + Bucket=bucket, NotificationConfiguration=config)