import json from botocore.exceptions import ClientError from nose.plugins.attrib import attr from nose.tools import eq_ as eq from s3tests_boto3.functional.utils import assert_raises from s3tests_boto3.functional.test_s3 import _multipart_upload from . import ( configfile, setup_teardown, get_alt_client, get_iam_client, get_new_bucket, get_iam_s3client, get_alt_iam_client, get_alt_user_id, ) from .utils import _get_status, _get_status_and_error_code @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify Put User Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_put_user_policy(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify Put User Policy with invalid user') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_put_user_policy_invalid_user(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName="some-non-existing-user-id") status = _get_status(e.response) eq(status, 404) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify Put User Policy using parameter value outside limit') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_put_user_policy_parameter_limit(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": "*", "Resource": "*"}] * 1000 } ) e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, PolicyName='AllAccessPolicy' * 10, UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 400) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify Put User Policy using invalid policy document elements') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_rgw') def test_put_user_policy_invalid_element(): client = get_iam_client() # With Version other than 2012-10-17 policy_document = json.dumps( {"Version": "2010-10-17", "Statement": [{ "Effect": "Allow", "Action": "*", "Resource": "*"}] } ) e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 400) # With no Statement policy_document = json.dumps( { "Version": "2012-10-17", } ) e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 400) # with same Sid for 2 statements policy_document = json.dumps( {"Version": "2012-10-17", "Statement": [ {"Sid": "98AB54CF", "Effect": "Allow", "Action": "*", "Resource": "*"}, {"Sid": "98AB54CF", "Effect": "Allow", "Action": "*", "Resource": "*"}] } ) e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 400) # with Principal policy_document = json.dumps( {"Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": "*", "Resource": "*", "Principal": "arn:aws:iam:::username"}] } ) e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 400) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify Put a policy that already exists') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_put_existing_user_policy(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"} } ) response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify List User policies') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_list_user_policy(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"} } ) response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.list_user_policies(UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify List User policies with invalid user') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_list_user_policy_invalid_user(): client = get_iam_client() e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id") status = _get_status(e.response) eq(status, 404) @attr(resource='user-policy') @attr(method='get') @attr(operation='Verify Get User policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_get_user_policy(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.get_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='get') @attr(operation='Verify Get User Policy with invalid user') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_get_user_policy_invalid_user(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy', UserName="some-non-existing-user-id") status = _get_status(e.response) eq(status, 404) client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) @attr(resource='user-policy') @attr(method='get') @attr(operation='Verify Get User Policy with invalid policy name') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_rgw') def test_get_user_policy_invalid_policy_name(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) e = assert_raises(ClientError, client.get_user_policy, PolicyName='non-existing-policy-name', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 404) client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) @attr(resource='user-policy') @attr(method='get') @attr(operation='Verify Get Deleted User Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_rgw') def test_get_deleted_user_policy(): client = get_iam_client() policy_document = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 404) @attr(resource='user-policy') @attr(method='get') @attr(operation='Verify Get a policy from multiple policies for a user') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_get_user_policy_from_multiple_policies(): client = get_iam_client() policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy1', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy2', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.get_user_policy(PolicyName='AllowAccessPolicy2', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllowAccessPolicy1', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllowAccessPolicy2', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='delete') @attr(operation='Verify Delete User Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_delete_user_policy(): client = get_iam_client() policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='delete') @attr(operation='Verify Delete User Policy with invalid user') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_delete_user_policy_invalid_user(): client = get_iam_client() policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, client.delete_user_policy, PolicyName='AllAccessPolicy', UserName="some-non-existing-user-id") status = _get_status(e.response) eq(status, 404) response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='delete') @attr(operation='Verify Delete User Policy with invalid policy name') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_delete_user_policy_invalid_policy_name(): client = get_iam_client() policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, client.delete_user_policy, PolicyName='non-existing-policy-name', UserName=get_alt_user_id()) status = _get_status(e.response) eq(status, 404) response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='delete') @attr(operation='Verify Delete multiple User policies for a user') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_delete_user_policy_from_multiple_policies(): client = get_iam_client() policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "*", "Resource": "*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy1', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy2', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy3', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllowAccessPolicy1', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllowAccessPolicy2', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.get_user_policy(PolicyName='AllowAccessPolicy3', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='AllowAccessPolicy3', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Allow Bucket Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_allow_bucket_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() s3_client_iam = get_iam_s3client() bucket = get_new_bucket(client=s3_client_iam) s3_client_iam.put_object(Bucket=bucket, Key='foo', Body='bar') policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": ["s3:ListBucket", "s3:DeleteBucket"], "Resource": f"arn:aws:s3:::{bucket}"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = s3_client_alt.list_objects(Bucket=bucket) object_found = False for object_received in response['Contents']: if "foo" == object_received['Key']: object_found = True break if not object_found: raise AssertionError("Object is not listed") response = s3_client_iam.delete_object(Bucket=bucket, Key='foo') eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = s3_client_alt.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = s3_client_iam.list_buckets() for bucket in response['Buckets']: if bucket == bucket['Name']: raise AssertionError("deleted bucket is getting listed") response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Deny Bucket Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_deny_bucket_actions_in_user_policy(): client = get_iam_client() s3_client = get_alt_client() bucket = get_new_bucket(client=s3_client) policy_document_deny = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Deny", "Action": ["s3:ListAllMyBuckets", "s3:DeleteBucket"], "Resource": "arn:aws:s3:::*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, s3_client.list_buckets, Bucket=bucket) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') e = assert_raises(ClientError, s3_client.delete_bucket, Bucket=bucket) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') response = client.delete_user_policy(PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = s3_client.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Allow Object Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_allow_object_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() s3_client_iam = get_iam_s3client() bucket = get_new_bucket(client=s3_client_iam) policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"], "Resource": f"arn:aws:s3:::{bucket}/*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar') response = s3_client_alt.get_object(Bucket=bucket, Key='foo') body = response['Body'].read() if type(body) is bytes: body = body.decode() eq(body, "bar") response = s3_client_alt.delete_object(Bucket=bucket, Key='foo') eq(response['ResponseMetadata']['HTTPStatusCode'], 204) e = assert_raises(ClientError, s3_client_iam.get_object, Bucket=bucket, Key='foo') status, error_code = _get_status_and_error_code(e.response) eq(status, 404) eq(error_code, 'NoSuchKey') response = s3_client_iam.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Deny Object Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_deny_object_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() bucket = get_new_bucket(client=s3_client_alt) s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar') policy_document_deny = json.dumps( {"Version": "2012-10-17", "Statement": [{ "Effect": "Deny", "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"], "Resource": f"arn:aws:s3:::{bucket}/*"}, { "Effect": "Allow", "Action": ["s3:DeleteBucket"], "Resource": f"arn:aws:s3:::{bucket}"}]} ) client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) e = assert_raises(ClientError, s3_client_alt.put_object, Bucket=bucket, Key='foo') status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') e = assert_raises(ClientError, s3_client_alt.get_object, Bucket=bucket, Key='foo') status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') e = assert_raises(ClientError, s3_client_alt.delete_object, Bucket=bucket, Key='foo') status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') response = client.delete_user_policy(PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Allow Multipart Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_allow_multipart_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() s3_client_iam = get_iam_s3client() bucket = get_new_bucket(client=s3_client_iam) policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"], "Resource": "arn:aws:s3:::*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) key = "mymultipart" mb = 1024 * 1024 (upload_id, _, _) = _multipart_upload(client=s3_client_iam, bucket_name=bucket, key=key, size=5 * mb) response = s3_client_alt.list_multipart_uploads(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = s3_client_alt.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = s3_client_iam.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Deny Multipart Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_deny_multipart_actions_in_user_policy(): client = get_iam_client() s3_client = get_alt_client() bucket = get_new_bucket(client=s3_client) policy_document_deny = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Deny", "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"], "Resource": "arn:aws:s3:::*"}} ) response = client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) key = "mymultipart" mb = 1024 * 1024 (upload_id, _, _) = _multipart_upload(client=s3_client, bucket_name=bucket, key=key, size=5 * mb) e = assert_raises(ClientError, s3_client.list_multipart_uploads, Bucket=bucket) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') e = assert_raises(ClientError, s3_client.abort_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') response = s3_client.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = client.delete_user_policy(PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Allow Tagging Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_allow_tagging_actions_in_user_policy(): client = get_iam_client() s3_client_alt = get_alt_client() s3_client_iam = get_iam_s3client() bucket = get_new_bucket(client=s3_client_iam) policy_document_allow = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": ["s3:PutBucketTagging", "s3:GetBucketTagging", "s3:PutObjectTagging", "s3:GetObjectTagging"], "Resource": f"arn:aws:s3:::*"}} ) client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]} response = s3_client_alt.put_bucket_tagging(Bucket=bucket, Tagging=tags) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = s3_client_alt.get_bucket_tagging(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) eq(response['TagSet'][0]['Key'], 'Hello') eq(response['TagSet'][0]['Value'], 'World') obj_key = 'obj' response = s3_client_iam.put_object(Bucket=bucket, Key=obj_key, Body='obj_body') eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = s3_client_alt.put_object_tagging(Bucket=bucket, Key=obj_key, Tagging=tags) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = s3_client_alt.get_object_tagging(Bucket=bucket, Key=obj_key) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) eq(response['TagSet'], tags['TagSet']) response = s3_client_iam.delete_object(Bucket=bucket, Key=obj_key) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = s3_client_iam.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='s3 Actions') @attr(operation='Verify Deny Tagging Actions in user Policy') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_deny_tagging_actions_in_user_policy(): client = get_iam_client() s3_client = get_alt_client() bucket = get_new_bucket(client=s3_client) policy_document_deny = json.dumps( {"Version": "2012-10-17", "Statement": { "Effect": "Deny", "Action": ["s3:PutBucketTagging", "s3:GetBucketTagging", "s3:PutObjectTagging", "s3:DeleteObjectTagging"], "Resource": "arn:aws:s3:::*"}} ) client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]} e = assert_raises(ClientError, s3_client.put_bucket_tagging, Bucket=bucket, Tagging=tags) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') e = assert_raises(ClientError, s3_client.get_bucket_tagging, Bucket=bucket) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') obj_key = 'obj' response = s3_client.put_object(Bucket=bucket, Key=obj_key, Body='obj_body') eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, s3_client.put_object_tagging, Bucket=bucket, Key=obj_key, Tagging=tags) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') e = assert_raises(ClientError, s3_client.delete_object_tagging, Bucket=bucket, Key=obj_key) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') response = s3_client.delete_object(Bucket=bucket, Key=obj_key) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = s3_client.delete_bucket(Bucket=bucket) eq(response['ResponseMetadata']['HTTPStatusCode'], 204) response = client.delete_user_policy(PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify conflicting user policy statements') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_verify_conflicting_user_policy_statements(): s3client = get_alt_client() bucket = get_new_bucket(client=s3client) policy_document = json.dumps( {"Version": "2012-10-17", "Statement": [ {"Sid": "98AB54CG", "Effect": "Allow", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{bucket}"}, {"Sid": "98AB54CA", "Effect": "Deny", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{bucket}"} ]} ) client = get_iam_client() response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') response = client.delete_user_policy(PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(method='put') @attr(operation='Verify conflicting user policies') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') @attr('fails_on_dbstore') def test_verify_conflicting_user_policies(): s3client = get_alt_client() bucket = get_new_bucket(client=s3client) policy_allow = json.dumps( {"Version": "2012-10-17", "Statement": {"Sid": "98AB54CG", "Effect": "Allow", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{bucket}"}} ) policy_deny = json.dumps( {"Version": "2012-10-17", "Statement": {"Sid": "98AB54CGZ", "Effect": "Deny", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{bucket}"}} ) client = get_iam_client() response = client.put_user_policy(PolicyDocument=policy_allow, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.put_user_policy(PolicyDocument=policy_deny, PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket) status, error_code = _get_status_and_error_code(e.response) eq(status, 403) eq(error_code, 'AccessDenied') response = client.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = client.delete_user_policy(PolicyName='DenyAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) @attr(resource='user-policy') @attr(operation='Verify Allow Actions for IAM user policies') @attr(assertion='succeeds') @attr('user-policy') @attr('test_of_iam') def test_verify_allow_iam_actions(): policy1 = json.dumps( {"Version": "2012-10-17", "Statement": {"Sid": "98AB54CGA", "Effect": "Allow", "Action": ["iam:PutUserPolicy", "iam:GetUserPolicy", "iam:ListUserPolicies", "iam:DeleteUserPolicy"], "Resource": f"arn:aws:iam:::user/{get_alt_user_id()}"}} ) client1 = get_iam_client() iam_client_alt = get_alt_iam_client() response = client1.put_user_policy(PolicyDocument=policy1, PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = iam_client_alt.get_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = iam_client_alt.list_user_policies(UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200) response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) eq(response['ResponseMetadata']['HTTPStatusCode'], 200)