From a3a16eb66a020ce079a9ce1cc9398921e264ca3d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 21 Feb 2024 09:36:06 -0500 Subject: [PATCH] iam: test cross-account policy with assumed role Signed-off-by: Casey Bodley --- s3tests_boto3/functional/test_iam.py | 282 +++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index 313a896..101cfb8 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -2301,6 +2301,288 @@ def test_account_role_policy_allow(iam_root): # something other than AccessDenied retry_on('AccessDenied', 10, s3.list_buckets) +# alt account user assumes main account role to access main account bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_same_account_role_policy_allow(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + s3_main = get_iam_root_client(service_name='s3') + s3_main.create_bucket(Bucket=bucket_name) + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name) + +# alt account user assumes main account role to access alt account bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_cross_account_role_policy_allow(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + s3_alt = get_iam_alt_root_client(service_name='s3') + s3_alt.create_bucket(Bucket=bucket_name) + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowListBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # expect AccessDenied because no resource policy allows the main account + e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a bucket policy that allows s3:ListBucket for the main account's arn + main_arn = iam_root.get_user()['User']['Arn'] + s3_alt.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Principal': {'AWS': main_arn}, + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}' + }] + })) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name) + +# alt account user assumes main account role to create a bucket +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow_create_bucket(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows s3 actions + e = assert_raises(ClientError, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private') + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowCreateBucket' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': ['s3:CreateBucket', 's3:PutBucketAcl'], + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private') + + # verify that the bucket is owned by the role's account + s3_main = get_iam_root_client(service_name='s3') + response = s3_main.get_bucket_acl(Bucket=bucket_name) + + main_arn = iam_root.get_user()['User']['Arn'] + account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + assert response['Owner']['ID'] == account_id + assert response['Grants'][0]['Grantee']['ID'] == account_id + +# alt account user assumes main account role to read the role info +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_account_role_policy_allow_get_role(iam_root, iam_alt_root): + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + bucket_name = get_new_bucket_name() + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + iam = get_iam_root_client(service_name='iam', + aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + # expect AccessDenied because no identity policy allows iam actions + e = assert_raises(ClientError, iam.get_role, RoleName=role_name) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + policy_name = 'AllowGetRole' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'iam:GetRole', + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, iam.get_role, RoleName=role_name) + # IAM OpenIDConnectProvider apis @pytest.mark.iam_account