import json

from botocore.exceptions import ClientError
import pytest

from s3tests_boto3.functional.utils import assert_raises
from s3tests_boto3.functional.test_s3 import _multipart_upload
from . import (
    configfile,
    setup_teardown,
    get_alt_client,
    get_iam_client,
    get_new_bucket,
    get_iam_s3client,
    get_alt_iam_client,
    get_alt_user_id,
)
from .utils import _get_status, _get_status_and_error_code


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_put_user_policy():
    """Put an allow-all policy on the alt user, then delete it; both return 200."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document,
                                      PolicyName='AllAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.delete_user_policy(PolicyName='AllAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_put_user_policy_invalid_user():
    """Putting a policy on a non-existing user name must fail with 404."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )
    e = assert_raises(ClientError, client.put_user_policy,
                      PolicyDocument=policy_document,
                      PolicyName='AllAccessPolicy',
                      UserName="some-non-existing-user-id")
    status = _get_status(e.response)
    assert status == 404


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_put_user_policy_parameter_limit():
    """A 1000-statement document with a 10x repeated policy name must fail with 400."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": [{
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}] * 1000
         }
    )
    e = assert_raises(ClientError, client.put_user_policy,
                      PolicyDocument=policy_document,
                      PolicyName='AllAccessPolicy' * 10,
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 400


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_rgw
def test_put_user_policy_invalid_element():
    """Each malformed policy document below must be rejected with 400."""
    client = get_iam_client()

    # With Version other than 2012-10-17
    policy_document = json.dumps(
        {"Version": "2010-10-17",
         "Statement": [{
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}]
         }
    )
    e = assert_raises(ClientError, client.put_user_policy,
                      PolicyDocument=policy_document,
                      PolicyName='AllAccessPolicy',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 400

    # With no Statement
    policy_document = json.dumps(
        {
            "Version": "2012-10-17",
        }
    )
    e = assert_raises(ClientError, client.put_user_policy,
                      PolicyDocument=policy_document,
                      PolicyName='AllAccessPolicy',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 400

    # with same Sid for 2 statements
    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": [
             {"Sid": "98AB54CF",
              "Effect": "Allow",
              "Action": "*",
              "Resource": "*"},
             {"Sid": "98AB54CF",
              "Effect": "Allow",
              "Action": "*",
              "Resource": "*"}]
         }
    )
    e = assert_raises(ClientError, client.put_user_policy,
                      PolicyDocument=policy_document,
                      PolicyName='AllAccessPolicy',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 400

    # with Principal
    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": [{
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*",
             "Principal": "arn:aws:iam:::username"}]
         }
    )
    e = assert_raises(ClientError, client.put_user_policy,
                      PolicyDocument=policy_document,
                      PolicyName='AllAccessPolicy',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 400


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_put_existing_user_policy():
    """Putting the same policy name twice on the same user must not raise."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}
         }
    )
    response = client.put_user_policy(PolicyDocument=policy_document,
                                      PolicyName='AllAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    # Second put with an identical name/document: only required not to raise.
    client.put_user_policy(PolicyDocument=policy_document,
                           PolicyName='AllAccessPolicy',
                           UserName=get_alt_user_id())
    client.delete_user_policy(PolicyName='AllAccessPolicy',
                              UserName=get_alt_user_id())


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_list_user_policy():
    """Listing the alt user's policies after a put returns 200."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}
         }
    )
    response = client.put_user_policy(PolicyDocument=policy_document,
                                      PolicyName='AllAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.list_user_policies(UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    client.delete_user_policy(PolicyName='AllAccessPolicy',
                              UserName=get_alt_user_id())


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_list_user_policy_invalid_user():
    """Listing policies for a non-existing user name must fail with 404."""
    client = get_iam_client()
    e = assert_raises(ClientError, client.list_user_policies,
                      UserName="some-non-existing-user-id")
    status = _get_status(e.response)
    assert status == 404


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_get_user_policy():
    """Put, get and delete of one policy on the alt user each return 200."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document,
                                      PolicyName='AllAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.get_user_policy(PolicyName='AllAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    response = client.delete_user_policy(PolicyName='AllAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_get_user_policy_invalid_user():
    """Getting a policy for a non-existing user name must fail with 404."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document,
                                      PolicyName='AllAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    e = assert_raises(ClientError, client.get_user_policy,
                      PolicyName='AllAccessPolicy',
                      UserName="some-non-existing-user-id")
    status = _get_status(e.response)
    assert status == 404
    client.delete_user_policy(PolicyName='AllAccessPolicy',
                              UserName=get_alt_user_id())


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_rgw
def test_get_user_policy_invalid_policy_name():
    """Getting a policy name that was never put on the user must fail with 404."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )
    client.put_user_policy(PolicyDocument=policy_document,
                           PolicyName='AllAccessPolicy',
                           UserName=get_alt_user_id())
    e = assert_raises(ClientError, client.get_user_policy,
                      PolicyName='non-existing-policy-name',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 404
    client.delete_user_policy(PolicyName='AllAccessPolicy',
                              UserName=get_alt_user_id())


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_rgw
def test_get_deleted_user_policy():
    """Getting a policy after it has been deleted must fail with 404."""
    client = get_iam_client()

    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )
    client.put_user_policy(PolicyDocument=policy_document,
                           PolicyName='AllAccessPolicy',
                           UserName=get_alt_user_id())
    client.delete_user_policy(PolicyName='AllAccessPolicy',
                              UserName=get_alt_user_id())
    e = assert_raises(ClientError, client.get_user_policy,
                      PolicyName='AllAccessPolicy',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 404


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_get_user_policy_from_multiple_policies():
    """With two policies on the user, getting the second one returns 200."""
    client = get_iam_client()

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy1',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy2',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.get_user_policy(PolicyName='AllowAccessPolicy2',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    response = client.delete_user_policy(PolicyName='AllowAccessPolicy1',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy2',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_delete_user_policy():
    """Deleting a policy that was just put on the alt user returns 200."""
    client = get_iam_client()

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_delete_user_policy_invalid_user():
    """Deleting a policy for a non-existing user name must fail with 404."""
    client = get_iam_client()

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    e = assert_raises(ClientError, client.delete_user_policy,
                      PolicyName='AllAccessPolicy',
                      UserName="some-non-existing-user-id")
    status = _get_status(e.response)
    assert status == 404
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_delete_user_policy_invalid_policy_name():
    """Deleting a policy name that was never put on the user must fail with 404."""
    client = get_iam_client()

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    e = assert_raises(ClientError, client.delete_user_policy,
                      PolicyName='non-existing-policy-name',
                      UserName=get_alt_user_id())
    status = _get_status(e.response)
    assert status == 404
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_delete_user_policy_from_multiple_policies():
    """After deleting two of three policies, the third can still be fetched."""
    client = get_iam_client()

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": "*",
             "Resource": "*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy1',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy2',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy3',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy1',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy2',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.get_user_policy(PolicyName='AllowAccessPolicy3',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    response = client.delete_user_policy(PolicyName='AllowAccessPolicy3',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_allow_bucket_actions_in_user_policy():
    """Allow ListBucket/DeleteBucket on one bucket and exercise both as the alt user."""
    client = get_iam_client()
    s3_client_alt = get_alt_client()

    s3_client_iam = get_iam_s3client()
    bucket = get_new_bucket(client=s3_client_iam)
    s3_client_iam.put_object(Bucket=bucket, Key='foo', Body='bar')

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": ["s3:ListBucket", "s3:DeleteBucket"],
             "Resource": f"arn:aws:s3:::{bucket}"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    response = s3_client_alt.list_objects(Bucket=bucket)
    object_found = False
    for object_received in response['Contents']:
        if "foo" == object_received['Key']:
            object_found = True
            break
    if not object_found:
        raise AssertionError("Object is not listed")

    response = s3_client_iam.delete_object(Bucket=bucket, Key='foo')
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    response = s3_client_alt.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    response = s3_client_iam.list_buckets()
    # Use a separate loop variable: reusing `bucket` here would rebind it to
    # each listing entry and compare the entry with its own name, so the
    # check could never fail.
    for listed_bucket in response['Buckets']:
        if bucket == listed_bucket['Name']:
            raise AssertionError("deleted bucket is getting listed")

    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_deny_bucket_actions_in_user_policy():
    """Deny ListAllMyBuckets/DeleteBucket and expect 403 AccessDenied for both."""
    client = get_iam_client()
    s3_client = get_alt_client()
    bucket = get_new_bucket(client=s3_client)

    policy_document_deny = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Deny",
             "Action": ["s3:ListAllMyBuckets", "s3:DeleteBucket"],
             "Resource": "arn:aws:s3:::*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_deny,
                                      PolicyName='DenyAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    # NOTE(review): list_buckets is invoked with a Bucket kwarg; confirm the
    # client accepts it, otherwise the failure may be a client-side parameter
    # validation error rather than the ClientError expected here.
    e = assert_raises(ClientError, s3_client.list_buckets, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    e = assert_raises(ClientError, s3_client.delete_bucket, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    # The deny policy is removed first so that the bucket delete can succeed.
    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = s3_client.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_allow_object_actions_in_user_policy():
    """Allow Put/Get/DeleteObject on a bucket and exercise them as the alt user."""
    client = get_iam_client()
    s3_client_alt = get_alt_client()
    s3_client_iam = get_iam_s3client()
    bucket = get_new_bucket(client=s3_client_iam)

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
             "Resource": f"arn:aws:s3:::{bucket}/*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar')
    response = s3_client_alt.get_object(Bucket=bucket, Key='foo')
    body = response['Body'].read()
    if isinstance(body, bytes):
        body = body.decode()
    assert body == "bar"
    response = s3_client_alt.delete_object(Bucket=bucket, Key='foo')
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    # The object must be gone when read back through the bucket owner's client.
    e = assert_raises(ClientError, s3_client_iam.get_object, Bucket=bucket, Key='foo')
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 404
    assert error_code == 'NoSuchKey'
    response = s3_client_iam.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_deny_object_actions_in_user_policy():
    """Deny Put/Get/DeleteObject on a bucket and expect 403 AccessDenied for each."""
    client = get_iam_client()
    s3_client_alt = get_alt_client()
    bucket = get_new_bucket(client=s3_client_alt)
    s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar')

    policy_document_deny = json.dumps(
        {"Version": "2012-10-17",
         "Statement": [{
             "Effect": "Deny",
             "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
             "Resource": f"arn:aws:s3:::{bucket}/*"}, {
             "Effect": "Allow",
             "Action": ["s3:DeleteBucket"],
             "Resource": f"arn:aws:s3:::{bucket}"}]}
    )
    client.put_user_policy(PolicyDocument=policy_document_deny,
                           PolicyName='DenyAccessPolicy',
                           UserName=get_alt_user_id())

    e = assert_raises(ClientError, s3_client_alt.put_object, Bucket=bucket, Key='foo')
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    e = assert_raises(ClientError, s3_client_alt.get_object, Bucket=bucket, Key='foo')
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    e = assert_raises(ClientError, s3_client_alt.delete_object, Bucket=bucket, Key='foo')
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_allow_multipart_actions_in_user_policy():
    """Allow list/abort multipart upload and exercise both as the alt user."""
    client = get_iam_client()
    s3_client_alt = get_alt_client()
    s3_client_iam = get_iam_s3client()
    bucket = get_new_bucket(client=s3_client_iam)

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"],
             "Resource": "arn:aws:s3:::*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    key = "mymultipart"
    mb = 1024 * 1024

    (upload_id, _, _) = _multipart_upload(client=s3_client_iam, bucket_name=bucket,
                                          key=key, size=5 * mb)
    response = s3_client_alt.list_multipart_uploads(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = s3_client_alt.abort_multipart_upload(Bucket=bucket, Key=key,
                                                    UploadId=upload_id)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    response = s3_client_iam.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_deny_multipart_actions_in_user_policy():
    """Deny list/abort multipart upload and expect 403 AccessDenied for both."""
    client = get_iam_client()
    s3_client = get_alt_client()
    bucket = get_new_bucket(client=s3_client)

    policy_document_deny = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Deny",
             "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"],
             "Resource": "arn:aws:s3:::*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document_deny,
                                      PolicyName='DenyAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    key = "mymultipart"
    mb = 1024 * 1024

    (upload_id, _, _) = _multipart_upload(client=s3_client, bucket_name=bucket,
                                          key=key, size=5 * mb)

    e = assert_raises(ClientError, s3_client.list_multipart_uploads, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    e = assert_raises(ClientError, s3_client.abort_multipart_upload, Bucket=bucket,
                      Key=key, UploadId=upload_id)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    response = s3_client.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_allow_tagging_actions_in_user_policy():
    """Allow bucket/object tagging actions and exercise them as the alt user."""
    client = get_iam_client()
    s3_client_alt = get_alt_client()
    s3_client_iam = get_iam_s3client()
    bucket = get_new_bucket(client=s3_client_iam)

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": ["s3:PutBucketTagging", "s3:GetBucketTagging",
                        "s3:PutObjectTagging", "s3:GetObjectTagging"],
             "Resource": "arn:aws:s3:::*"}}
    )
    client.put_user_policy(PolicyDocument=policy_document_allow,
                           PolicyName='AllowAccessPolicy',
                           UserName=get_alt_user_id())
    tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]}

    response = s3_client_alt.put_bucket_tagging(Bucket=bucket, Tagging=tags)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = s3_client_alt.get_bucket_tagging(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    assert response['TagSet'][0]['Key'] == 'Hello'
    assert response['TagSet'][0]['Value'] == 'World'

    obj_key = 'obj'
    response = s3_client_iam.put_object(Bucket=bucket, Key=obj_key, Body='obj_body')
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = s3_client_alt.put_object_tagging(Bucket=bucket, Key=obj_key, Tagging=tags)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = s3_client_alt.get_object_tagging(Bucket=bucket, Key=obj_key)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    assert response['TagSet'] == tags['TagSet']

    response = s3_client_iam.delete_object(Bucket=bucket, Key=obj_key)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = s3_client_iam.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_deny_tagging_actions_in_user_policy():
    """Deny bucket/object tagging actions and expect 403 AccessDenied for each."""
    client = get_iam_client()
    s3_client = get_alt_client()
    bucket = get_new_bucket(client=s3_client)

    policy_document_deny = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Deny",
             "Action": ["s3:PutBucketTagging", "s3:GetBucketTagging",
                        "s3:PutObjectTagging", "s3:DeleteObjectTagging"],
             "Resource": "arn:aws:s3:::*"}}
    )
    client.put_user_policy(PolicyDocument=policy_document_deny,
                           PolicyName='DenyAccessPolicy',
                           UserName=get_alt_user_id())
    tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]}

    e = assert_raises(ClientError, s3_client.put_bucket_tagging, Bucket=bucket,
                      Tagging=tags)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    e = assert_raises(ClientError, s3_client.get_bucket_tagging, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    obj_key = 'obj'
    response = s3_client.put_object(Bucket=bucket, Key=obj_key, Body='obj_body')
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    e = assert_raises(ClientError, s3_client.put_object_tagging, Bucket=bucket,
                      Key=obj_key, Tagging=tags)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    e = assert_raises(ClientError, s3_client.delete_object_tagging, Bucket=bucket,
                      Key=obj_key)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    response = s3_client.delete_object(Bucket=bucket, Key=obj_key)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = s3_client.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_verify_conflicting_user_policy_statements():
    """Allow and Deny statements for ListBucket in one policy: expect 403."""
    s3client = get_alt_client()
    bucket = get_new_bucket(client=s3client)
    policy_document = json.dumps(
        {"Version": "2012-10-17",
         "Statement": [
             {"Sid": "98AB54CG",
              "Effect": "Allow",
              "Action": "s3:ListBucket",
              "Resource": f"arn:aws:s3:::{bucket}"},
             {"Sid": "98AB54CA",
              "Effect": "Deny",
              "Action": "s3:ListBucket",
              "Resource": f"arn:aws:s3:::{bucket}"}
         ]}
    )
    client = get_iam_client()
    response = client.put_user_policy(PolicyDocument=policy_document,
                                      PolicyName='DenyAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
@pytest.mark.fails_on_dbstore
def test_verify_conflicting_user_policies():
    """Separate Allow and Deny policies for ListBucket on one user: expect 403."""
    s3client = get_alt_client()
    bucket = get_new_bucket(client=s3client)
    policy_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {"Sid": "98AB54CG",
                       "Effect": "Allow",
                       "Action": "s3:ListBucket",
                       "Resource": f"arn:aws:s3:::{bucket}"}}
    )
    policy_deny = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {"Sid": "98AB54CGZ",
                       "Effect": "Deny",
                       "Action": "s3:ListBucket",
                       "Resource": f"arn:aws:s3:::{bucket}"}}
    )
    client = get_iam_client()
    response = client.put_user_policy(PolicyDocument=policy_allow,
                                      PolicyName='AllowAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.put_user_policy(PolicyDocument=policy_deny,
                                      PolicyName='DenyAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'
    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200


@pytest.mark.user_policy
@pytest.mark.iam_tenant
def test_verify_allow_iam_actions():
    """Allow IAM user-policy actions, then get/list/delete via the alt IAM client."""
    policy1 = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {"Sid": "98AB54CGA",
                       "Effect": "Allow",
                       "Action": ["iam:PutUserPolicy", "iam:GetUserPolicy",
                                  "iam:ListUserPolicies", "iam:DeleteUserPolicy"],
                       "Resource": f"arn:aws:iam:::user/{get_alt_user_id()}"}}
    )
    client1 = get_iam_client()
    iam_client_alt = get_alt_iam_client()

    response = client1.put_user_policy(PolicyDocument=policy1,
                                       PolicyName='AllowAccessPolicy',
                                       UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = iam_client_alt.get_user_policy(PolicyName='AllowAccessPolicy',
                                              UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = iam_client_alt.list_user_policies(UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy',
                                                 UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200