From 8182d100480e4346ad949430a7c46fe135958759 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 30 Jan 2024 18:17:17 -0500 Subject: [PATCH] iam: test cross-account permissions test the [iam alt root] user's access to buckets owned by [iam root] using various policy principals and acl grantees Signed-off-by: Casey Bodley (cherry picked from commit ba292fbf59897006d4aba32365129c27cea6c200) --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 379 +++++++++++++++++++++++++++ 2 files changed, 380 insertions(+) diff --git a/pytest.ini b/pytest.ini index 3ce92cf..4c456be 100644 --- a/pytest.ini +++ b/pytest.ini @@ -17,6 +17,7 @@ markers = fails_on_s3 fails_with_subdomain iam_account + iam_cross_account iam_role iam_tenant iam_user diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index a49f129..4cfbfad 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -13,6 +13,11 @@ from . import ( get_alt_client, get_iam_client, get_iam_root_client, + get_iam_root_user_id, + get_iam_root_email, + get_iam_alt_root_client, + get_iam_alt_root_user_id, + get_iam_alt_root_email, make_iam_name, get_iam_path_prefix, get_new_bucket, @@ -1908,3 +1913,377 @@ def test_account_role_policy_allow(iam_root): # the policy may take a bit to start working. retry until it returns # something other than AccessDenied retry_on('AccessDenied', 10, s3.list_buckets) + + +# fixture for iam alt account root user +@pytest.fixture +def iam_alt_root(configfile): + client = get_iam_alt_root_client() + try: + arn = client.get_user()['User']['Arn'] + if not arn.endswith(':root'): + pytest.skip('[iam alt root] user does not have :root arn') + except ClientError as e: + pytest.skip('[iam alt root] user does not belong to an account') + + yield client + nuke_users(client, PathPrefix=get_iam_path_prefix()) + nuke_roles(client, PathPrefix=get_iam_path_prefix()) + + +# test cross-account access, adding user policy before the bucket policy +def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn): + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +# test cross-account access, adding bucket policy before the user policy +def _test_cross_account_bucket_user_policy(roots3, alt_root, alt_name, alt_arn): + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_user_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, user_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_user_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, user_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_arn = user_arn.replace(f':user{path}{user_name}', ':root') + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_arn = user_arn.replace(f':user{path}{user_name}', ':root') + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_bucket_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_id) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_user_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + user_arn = response['User']['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_id) + + +# test cross-account access, adding user policy before the bucket acl +def _test_cross_account_user_policy_bucket_acl(roots3, alt_root, alt_name, grantee): + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket acl that grants READ access + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +# test cross-account access, adding bucket acl before the user policy +def _test_cross_account_bucket_acl_user_policy(roots3, alt_root, alt_name, grantee): + key = alt_root.create_access_key(UserName=alt_name)['AccessKey'] + alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + # add a bucket acl that grants READ access + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a user policy that allows s3 actions + alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.fails_on_aws # can't grant to individual users +def test_cross_account_bucket_acl_user_policy_grant_user_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + response['User']['UserId'] + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.fails_on_aws # can't grant to individual users +def test_cross_account_user_policy_bucket_acl_grant_user_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + response = iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + response['User']['UserId'] + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_acl_user_policy_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_policy_bucket_acl_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_bucket_acl_user_policy_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_user_policy_bucket_acl_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + iam_alt_root.create_user(UserName=user_name, Path=path) + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee) + + +# test root cross-account access with bucket policy +def _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn): + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the iam user's arn + roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': alt_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket}' + }] + })) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_policy_allow_account_arn(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + alt_arn = iam_alt_root.get_user()['User']['Arn'] + _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_policy_allow_account_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + alt_arn = iam_alt_root.get_user()['User']['Arn'] + account_id = alt_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + _test_cross_account_root_bucket_policy(roots3, alts3, account_id) + +# test root cross-account access with bucket acls +def _test_cross_account_root_bucket_acl(roots3, alts3, grantee): + # create a bucket with the root user + bucket = get_new_bucket(roots3) + try: + e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket acl that grants READ + roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee) + + # verify that the iam user can eventually access it + retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket) + finally: + roots3.delete_bucket(Bucket=bucket) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_acl_grant_canonical_id(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + grantee = 'id=' + get_iam_alt_root_user_id() + _test_cross_account_root_bucket_acl(roots3, alts3, grantee) + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +def test_cross_account_root_bucket_acl_grant_account_email(iam_root, iam_alt_root): + roots3 = get_iam_root_client(service_name='s3') + alts3 = get_iam_alt_root_client(service_name='s3') + grantee = 'emailAddress=' + get_iam_alt_root_email() + _test_cross_account_root_bucket_acl(roots3, alts3, grantee)