diff --git a/README.rst b/README.rst index f94f7f2..42ad7d5 100644 --- a/README.rst +++ b/README.rst @@ -71,3 +71,31 @@ You can filter tests based on the attributes. There is a attribute named ``test_ For running ``webidentity_test`` you'll need have Keycloak running. In order to run any STS test you'll need to add "iam" section to the config file. For further reference on how your config file should look check ``s3tests.conf.SAMPLE``. + +======================== + IAM policy tests +======================== + +This is a set of IAM policy tests. +This section covers tests for user policies such as Put, Get, List, Delete, user policies with s3 actions, conflicting user policies etc +These tests uses Boto3 libraries. Tests are written in the ``s3test_boto3`` directory. + +These iam policy tests uses two users with profile name "iam" and "s3 alt" as mentioned in s3tests.conf.SAMPLE. +If Ceph cluster is started with vstart, then above two users will get created as part of vstart with same access key, secrete key etc as mentioned in s3tests.conf.SAMPLE. +Out of those two users, "iam" user is with capabilities --caps=user-policy=* and "s3 alt" user is without capabilities. +Adding above capabilities to "iam" user is also taken care by vstart (If Ceph cluster is started with vstart). + +To run these tests, create configuration file with section "iam" and "s3 alt" refer s3tests.conf.SAMPLE. +Once you have that configuration file copied and edited, you can run all the tests with:: + + S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam + +You can also specify specific test to run:: + + S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam:test_put_user_policy + +Some tests have attributes set such as "fails_on_rgw". +You can filter tests based on their attributes:: + + S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam -a '!fails_on_rgw' + diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 1f1763c..4d03d65 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -435,6 +435,18 @@ def get_iam_client(client_config=None): config=client_config) return client +def get_iam_s3client(client_config=None): + if client_config == None: + client_config = Config(signature_version='s3v4') + client = boto3.client(service_name='s3', + aws_access_key_id=get_iam_access_key(), + aws_secret_access_key=get_iam_secret_key(), + endpoint_url=config.default_endpoint, + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + config=client_config) + return client + def get_alt_client(client_config=None): if client_config == None: client_config = Config(signature_version='s3v4') @@ -484,6 +496,17 @@ def get_tenant_iam_client(): use_ssl=config.default_is_secure) return client +def get_alt_iam_client(): + + client = boto3.client(service_name='iam', + region_name='', + aws_access_key_id=config.alt_access_key, + aws_secret_access_key=config.alt_secret_key, + endpoint_url=config.default_endpoint, + verify=config.default_ssl_verify, + use_ssl=config.default_is_secure) + return client + def get_unauthenticated_client(): client = boto3.client(service_name='s3', aws_access_key_id='', diff --git a/s3tests_boto3/functional/test_iam.py b/s3tests_boto3/functional/test_iam.py new file mode 100644 index 0000000..a927c9d --- /dev/null +++ b/s3tests_boto3/functional/test_iam.py @@ -0,0 +1,963 @@ +import json + +from botocore.exceptions import ClientError +from nose.plugins.attrib import attr +from nose.tools import eq_ as eq + +from s3tests_boto3.functional.utils import assert_raises +from s3tests_boto3.functional.test_s3 import _multipart_upload +from . import ( + get_alt_client, + get_iam_client, + get_new_bucket, + get_iam_s3client, + get_alt_iam_client, + get_alt_user_id, +) +from .utils import _get_status, _get_status_and_error_code + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify Put User Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_put_user_policy(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.delete_user_policy(PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify Put User Policy with invalid user') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_put_user_policy_invalid_user(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, + PolicyName='AllAccessPolicy', UserName="some-non-existing-user-id") + status = _get_status(e.response) + eq(status, 404) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify Put User Policy using parameter value outside limit') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_put_user_policy_parameter_limit(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "*", + "Resource": "*"}] * 1000 + } + ) + e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, + PolicyName='AllAccessPolicy' * 10, UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 400) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify Put User Policy using invalid policy document elements') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +@attr('fails_on_rgw') +def test_put_user_policy_invalid_element(): + client = get_iam_client() + + # With Version other than 2012-10-17 + policy_document = json.dumps( + {"Version": "2010-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "*", + "Resource": "*"}] + } + ) + e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, + PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 400) + + # With no Statement + policy_document = json.dumps( + { + "Version": "2012-10-17", + } + ) + e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, + PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 400) + + # with same Sid for 2 statements + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": [ + {"Sid": "98AB54CF", + "Effect": "Allow", + "Action": "*", + "Resource": "*"}, + {"Sid": "98AB54CF", + "Effect": "Allow", + "Action": "*", + "Resource": "*"}] + } + ) + e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, + PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 400) + + # with Principal + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "*", + "Resource": "*", + "Principal": "arn:aws:iam:::username"}] + } + ) + e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document, + PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 400) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify Put a policy that already exists') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_put_existing_user_policy(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"} + } + ) + response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify List User policies') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_list_user_policy(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"} + } + ) + response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.list_user_policies(UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify List User policies with invalid user') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_list_user_policy_invalid_user(): + client = get_iam_client() + e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id") + status = _get_status(e.response) + eq(status, 404) + + +@attr(resource='user-policy') +@attr(method='get') +@attr(operation='Verify Get User policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_get_user_policy(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.get_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + response = client.delete_user_policy(PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='get') +@attr(operation='Verify Get User Policy with invalid user') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_get_user_policy_invalid_user(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy', + UserName="some-non-existing-user-id") + status = _get_status(e.response) + eq(status, 404) + client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + + +@attr(resource='user-policy') +@attr(method='get') +@attr(operation='Verify Get User Policy with invalid policy name') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +@attr('fails_on_rgw') +def test_get_user_policy_invalid_policy_name(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + e = assert_raises(ClientError, client.get_user_policy, PolicyName='non-existing-policy-name', + UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 404) + client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + + +@attr(resource='user-policy') +@attr(method='get') +@attr(operation='Verify Get Deleted User Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +@attr('fails_on_rgw') +def test_get_deleted_user_policy(): + client = get_iam_client() + + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id()) + e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy', + UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 404) + + +@attr(resource='user-policy') +@attr(method='get') +@attr(operation='Verify Get a policy from multiple policies for a user') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_get_user_policy_from_multiple_policies(): + client = get_iam_client() + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy1', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy2', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.get_user_policy(PolicyName='AllowAccessPolicy2', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + response = client.delete_user_policy(PolicyName='AllowAccessPolicy1', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy2', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='delete') +@attr(operation='Verify Delete User Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_delete_user_policy(): + client = get_iam_client() + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='delete') +@attr(operation='Verify Delete User Policy with invalid user') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_delete_user_policy_invalid_user(): + client = get_iam_client() + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + e = assert_raises(ClientError, client.delete_user_policy, PolicyName='AllAccessPolicy', + UserName="some-non-existing-user-id") + status = _get_status(e.response) + eq(status, 404) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='delete') +@attr(operation='Verify Delete User Policy with invalid policy name') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_delete_user_policy_invalid_policy_name(): + client = get_iam_client() + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + e = assert_raises(ClientError, client.delete_user_policy, PolicyName='non-existing-policy-name', + UserName=get_alt_user_id()) + status = _get_status(e.response) + eq(status, 404) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='delete') +@attr(operation='Verify Delete multiple User policies for a user') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_delete_user_policy_from_multiple_policies(): + client = get_iam_client() + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": "*", + "Resource": "*"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy1', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy2', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy3', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy1', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy2', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.get_user_policy(PolicyName='AllowAccessPolicy3', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + response = client.delete_user_policy(PolicyName='AllowAccessPolicy3', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Allow Bucket Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_allow_bucket_actions_in_user_policy(): + client = get_iam_client() + s3_client_alt = get_alt_client() + + s3_client_iam = get_iam_s3client() + bucket = get_new_bucket(client=s3_client_iam) + s3_client_iam.put_object(Bucket=bucket, Key='foo', Body='bar') + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": ["s3:ListBucket", "s3:DeleteBucket"], + "Resource": f"arn:aws:s3:::{bucket}"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + response = s3_client_alt.list_objects(Bucket=bucket) + object_found = False + for object_received in response['Contents']: + if "foo" == object_received['Key']: + object_found = True + break + if not object_found: + raise AssertionError("Object is not listed") + + response = s3_client_iam.delete_object(Bucket=bucket, Key='foo') + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + + response = s3_client_alt.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + + response = s3_client_iam.list_buckets() + for bucket in response['Buckets']: + if bucket == bucket['Name']: + raise AssertionError("deleted bucket is getting listed") + + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Deny Bucket Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_deny_bucket_actions_in_user_policy(): + client = get_iam_client() + s3_client = get_alt_client() + bucket = get_new_bucket(client=s3_client) + + policy_document_deny = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Deny", + "Action": ["s3:ListAllMyBuckets", "s3:DeleteBucket"], + "Resource": "arn:aws:s3:::*"}} + ) + + response = client.put_user_policy(PolicyDocument=policy_document_deny, + PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + e = assert_raises(ClientError, s3_client.list_buckets, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + e = assert_raises(ClientError, s3_client.delete_bucket, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + response = client.delete_user_policy(PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = s3_client.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Allow Object Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_allow_object_actions_in_user_policy(): + client = get_iam_client() + s3_client_alt = get_alt_client() + s3_client_iam = get_iam_s3client() + bucket = get_new_bucket(client=s3_client_iam) + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"], + "Resource": f"arn:aws:s3:::{bucket}/*"}} + ) + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar') + response = s3_client_alt.get_object(Bucket=bucket, Key='foo') + body = response['Body'].read() + if type(body) is bytes: + body = body.decode() + eq(body, "bar") + response = s3_client_alt.delete_object(Bucket=bucket, Key='foo') + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + + e = assert_raises(ClientError, s3_client_iam.get_object, Bucket=bucket, Key='foo') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 404) + eq(error_code, 'NoSuchKey') + response = s3_client_iam.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Deny Object Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_deny_object_actions_in_user_policy(): + client = get_iam_client() + s3_client_alt = get_alt_client() + bucket = get_new_bucket(client=s3_client_alt) + s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar') + + policy_document_deny = json.dumps( + {"Version": "2012-10-17", + "Statement": [{ + "Effect": "Deny", + "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"], + "Resource": f"arn:aws:s3:::{bucket}/*"}, { + "Effect": "Allow", + "Action": ["s3:DeleteBucket"], + "Resource": f"arn:aws:s3:::{bucket}"}]} + ) + client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + + e = assert_raises(ClientError, s3_client_alt.put_object, Bucket=bucket, Key='foo') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + e = assert_raises(ClientError, s3_client_alt.get_object, Bucket=bucket, Key='foo') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + e = assert_raises(ClientError, s3_client_alt.delete_object, Bucket=bucket, Key='foo') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + + response = client.delete_user_policy(PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Allow Multipart Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_allow_multipart_actions_in_user_policy(): + client = get_iam_client() + s3_client_alt = get_alt_client() + s3_client_iam = get_iam_s3client() + bucket = get_new_bucket(client=s3_client_iam) + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"], + "Resource": "arn:aws:s3:::*"}} + ) + response = client.put_user_policy(PolicyDocument=policy_document_allow, + PolicyName='AllowAccessPolicy', UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + key = "mymultipart" + mb = 1024 * 1024 + + (upload_id, _, _) = _multipart_upload(client=s3_client_iam, bucket_name=bucket, key=key, + size=5 * mb) + response = s3_client_alt.list_multipart_uploads(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = s3_client_alt.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + + response = s3_client_iam.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Deny Multipart Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_deny_multipart_actions_in_user_policy(): + client = get_iam_client() + s3_client = get_alt_client() + bucket = get_new_bucket(client=s3_client) + + policy_document_deny = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Deny", + "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"], + "Resource": "arn:aws:s3:::*"}} + ) + response = client.put_user_policy(PolicyDocument=policy_document_deny, + PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + key = "mymultipart" + mb = 1024 * 1024 + + (upload_id, _, _) = _multipart_upload(client=s3_client, bucket_name=bucket, key=key, + size=5 * mb) + + e = assert_raises(ClientError, s3_client.list_multipart_uploads, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + + e = assert_raises(ClientError, s3_client.abort_multipart_upload, Bucket=bucket, + Key=key, UploadId=upload_id) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + + response = s3_client.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = client.delete_user_policy(PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Allow Tagging Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_allow_tagging_actions_in_user_policy(): + client = get_iam_client() + s3_client_alt = get_alt_client() + s3_client_iam = get_iam_s3client() + bucket = get_new_bucket(client=s3_client_iam) + + policy_document_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Action": ["s3:PutBucketTagging", "s3:GetBucketTagging", + "s3:PutObjectTagging", "s3:GetObjectTagging"], + "Resource": f"arn:aws:s3:::*"}} + ) + client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]} + + response = s3_client_alt.put_bucket_tagging(Bucket=bucket, Tagging=tags) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = s3_client_alt.get_bucket_tagging(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + eq(response['TagSet'][0]['Key'], 'Hello') + eq(response['TagSet'][0]['Value'], 'World') + + obj_key = 'obj' + response = s3_client_iam.put_object(Bucket=bucket, Key=obj_key, Body='obj_body') + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = s3_client_alt.put_object_tagging(Bucket=bucket, Key=obj_key, Tagging=tags) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = s3_client_alt.get_object_tagging(Bucket=bucket, Key=obj_key) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + eq(response['TagSet'], tags['TagSet']) + + response = s3_client_iam.delete_object(Bucket=bucket, Key=obj_key) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = s3_client_iam.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='s3 Actions') +@attr(operation='Verify Deny Tagging Actions in user Policy') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_deny_tagging_actions_in_user_policy(): + client = get_iam_client() + s3_client = get_alt_client() + bucket = get_new_bucket(client=s3_client) + + policy_document_deny = json.dumps( + {"Version": "2012-10-17", + "Statement": { + "Effect": "Deny", + "Action": ["s3:PutBucketTagging", "s3:GetBucketTagging", + "s3:PutObjectTagging", "s3:DeleteObjectTagging"], + "Resource": "arn:aws:s3:::*"}} + ) + client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]} + + e = assert_raises(ClientError, s3_client.put_bucket_tagging, Bucket=bucket, Tagging=tags) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + e = assert_raises(ClientError, s3_client.get_bucket_tagging, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + + obj_key = 'obj' + response = s3_client.put_object(Bucket=bucket, Key=obj_key, Body='obj_body') + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + e = assert_raises(ClientError, s3_client.put_object_tagging, Bucket=bucket, Key=obj_key, + Tagging=tags) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + e = assert_raises(ClientError, s3_client.delete_object_tagging, Bucket=bucket, Key=obj_key) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + + response = s3_client.delete_object(Bucket=bucket, Key=obj_key) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = s3_client.delete_bucket(Bucket=bucket) + eq(response['ResponseMetadata']['HTTPStatusCode'], 204) + response = client.delete_user_policy(PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify conflicting user policy statements') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_verify_conflicting_user_policy_statements(): + s3client = get_alt_client() + bucket = get_new_bucket(client=s3client) + policy_document = json.dumps( + {"Version": "2012-10-17", + "Statement": [ + {"Sid": "98AB54CG", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": f"arn:aws:s3:::{bucket}"}, + {"Sid": "98AB54CA", + "Effect": "Deny", + "Action": "s3:ListBucket", + "Resource": f"arn:aws:s3:::{bucket}"} + ]} + ) + client = get_iam_client() + response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + response = client.delete_user_policy(PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(method='put') +@attr(operation='Verify conflicting user policies') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_verify_conflicting_user_policies(): + s3client = get_alt_client() + bucket = get_new_bucket(client=s3client) + policy_allow = json.dumps( + {"Version": "2012-10-17", + "Statement": {"Sid": "98AB54CG", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": f"arn:aws:s3:::{bucket}"}} + ) + policy_deny = json.dumps( + {"Version": "2012-10-17", + "Statement": {"Sid": "98AB54CGZ", + "Effect": "Deny", + "Action": "s3:ListBucket", + "Resource": f"arn:aws:s3:::{bucket}"}} + ) + client = get_iam_client() + response = client.put_user_policy(PolicyDocument=policy_allow, PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.put_user_policy(PolicyDocument=policy_deny, PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket) + status, error_code = _get_status_and_error_code(e.response) + eq(status, 403) + eq(error_code, 'AccessDenied') + response = client.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = client.delete_user_policy(PolicyName='DenyAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +@attr(resource='user-policy') +@attr(operation='Verify Allow Actions for IAM user policies') +@attr(assertion='succeeds') +@attr('user-policy') +@attr('test_of_iam') +def test_verify_allow_iam_actions(): + policy1 = json.dumps( + {"Version": "2012-10-17", + "Statement": {"Sid": "98AB54CGA", + "Effect": "Allow", + "Action": ["iam:PutUserPolicy", "iam:GetUserPolicy", + "iam:ListUserPolicies", "iam:DeleteUserPolicy"], + "Resource": f"arn:aws:iam:::user/{get_alt_user_id()}"}} + ) + client1 = get_iam_client() + iam_client_alt = get_alt_iam_client() + + response = client1.put_user_policy(PolicyDocument=policy1, PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = iam_client_alt.get_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = iam_client_alt.list_user_policies(UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200) + response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy', + UserName=get_alt_user_id()) + eq(response['ResponseMetadata']['HTTPStatusCode'], 200)