From 98fd3f55a02fbbcfd492131b0e1f9a626765accd Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 11 Feb 2024 11:51:17 -0500 Subject: [PATCH] iam: add account tests for GroupPolicy apis Signed-off-by: Casey Bodley (cherry picked from commit 7ebc530e042333636338d298cfeaefa58592b22d) --- pytest.ini | 1 + s3tests_boto3/functional/test_iam.py | 220 +++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) diff --git a/pytest.ini b/pytest.ini index 5da379d..c7e0418 100644 --- a/pytest.ini +++ b/pytest.ini @@ -17,6 +17,7 @@ markers = fails_on_s3 fails_with_subdomain group + group_policy iam_account iam_cross_account iam_role diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py index e873fe1..313a896 100644 --- a/s3tests_boto3/functional/test_iam.py +++ b/s3tests_boto3/functional/test_iam.py @@ -928,6 +928,24 @@ def nuke_users(client, **kwargs): except: pass +def nuke_group_policies(client, name): + p = client.get_paginator('list_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['PolicyNames']: + try: + client.delete_group_policy(GroupName=name, PolicyName=policy) + except: + pass + +def nuke_attached_group_policies(client, name): + p = client.get_paginator('list_attached_group_policies') + for response in p.paginate(GroupName=name): + for policy in response['AttachedPolicies']: + try: + client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn']) + except: + pass + def nuke_group_users(client, name): p = client.get_paginator('get_group') for response in p.paginate(GroupName=name): @@ -939,6 +957,14 @@ def nuke_group_users(client, name): def nuke_group(client, name): # delete group policies and remove all users + try: + nuke_group_policies(client, name) + except: + pass + try: + nuke_attached_group_policies(client, name) + except: + pass try: nuke_group_users(client, name) except: @@ -1727,6 +1753,200 @@ def test_account_group_update(iam_root): assert group_id == groups[0]['GroupId'] +# IAM GroupPolicy apis +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_inline_group_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy_name = 'List' + bucket_name = get_new_bucket_name() + policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Deny', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [ + {'Effect': 'Allow', + 'Action': 's3:ListBucket', + 'Resource': f'arn:aws:s3:::{bucket_name}'}]}) + + # Get/Put/Delete fail on nonexistent GroupName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1) + + iam_root.create_group(GroupName=name, Path=path) + + # Get/Delete fail on nonexistent PolicyName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1) + + response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + assert policy1 == json.dumps(response['PolicyDocument']) + response = iam_root.list_group_policies(GroupName=name) + assert [policy_name] == response['PolicyNames'] + + iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy2) + + response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + assert policy2 == json.dumps(response['PolicyDocument']) + response = iam_root.list_group_policies(GroupName=name) + assert [policy_name] == response['PolicyNames'] + + # DeleteGroup fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_group(GroupName=name) + + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + # Get/Delete fail after Delete + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.get_group_policy(GroupName=name, PolicyName=policy_name) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name) + + response = iam_root.list_group_policies(GroupName=name) + assert [] == response['PolicyNames'] + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_managed_group_policy(iam_root): + path = get_iam_path_prefix() + name = make_iam_name('name') + policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess' + policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + + # Attach/Detach/List fail on nonexistent GroupName + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1) + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.list_attached_group_policies(GroupName=name) + + iam_root.create_group(GroupName=name, Path=path) + + # Detach fails on unattached PolicyArn + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1) + + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1) + + response = iam_root.list_attached_group_policies(GroupName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + iam_root.attach_group_policy(GroupName=name, PolicyArn=policy2) + + response = iam_root.list_attached_group_policies(GroupName=name) + policies = response['AttachedPolicies'] + assert len(policies) == 2 + names = [p['PolicyName'] for p in policies] + arns = [p['PolicyArn'] for p in policies] + assert 'AmazonS3FullAccess' in names + assert policy1 in arns + assert 'AmazonS3ReadOnlyAccess' in names + assert policy2 in arns + + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2) + + # Detach fails after Detach + with pytest.raises(iam_root.exceptions.NoSuchEntityException): + iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2) + + response = iam_root.list_attached_group_policies(GroupName=name) + assert len(response['AttachedPolicies']) == 1 + assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName'] + assert policy1 == response['AttachedPolicies'][0]['PolicyArn'] + + # DeleteGroup fails while policies are still attached + with pytest.raises(iam_root.exceptions.DeleteConflictException): + iam_root.delete_group(GroupName=name) + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_inline_group_policy_allow(iam_root): + path = get_iam_path_prefix() + username = make_iam_name('User') + groupname = make_iam_name('Group') + bucket_name = get_new_bucket_name() + + iam_root.create_user(UserName=username, Path=path) + + key = iam_root.create_access_key(UserName=username)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + iam_root.create_group(GroupName=groupname, Path=path) + iam_root.add_user_to_group(GroupName=groupname, UserName=username) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a group policy that allows s3 actions + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': '*' + }] + }) + policy_name = 'AllowStar' + iam_root.put_group_policy(GroupName=groupname, PolicyName=policy_name, PolicyDocument=policy) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + +@pytest.mark.group_policy +@pytest.mark.iam_account +def test_account_managed_group_policy_allow(iam_root): + path = get_iam_path_prefix() + username = make_iam_name('User') + groupname = make_iam_name('Group') + bucket_name = get_new_bucket_name() + + iam_root.create_user(UserName=username, Path=path) + + key = iam_root.create_access_key(UserName=username)['AccessKey'] + client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + iam_root.create_group(GroupName=groupname, Path=path) + iam_root.add_user_to_group(GroupName=groupname, UserName=username) + + # the access key may take a bit to start working. retry until it returns + # something other than InvalidAccessKeyId + e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets) + # expect AccessDenied because no identity policy allows s3 actions + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # add a group policy that allows s3 read actions + policy_arn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess' + iam_root.attach_group_policy(GroupName=groupname, PolicyArn=policy_arn) + + # the policy may take a bit to start working. retry until it returns + # something other than AccessDenied + retry_on('AccessDenied', 10, client.list_buckets) + + assume_role_policy = json.dumps({ 'Version': '2012-10-17', 'Statement': [{