mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-24 19:30:38 +00:00
Merge pull request #453 from Seagate/master
Contributing User Policy tests
This commit is contained in:
commit
952beb9ebd
3 changed files with 1014 additions and 0 deletions
28
README.rst
28
README.rst
|
@ -71,3 +71,31 @@ You can filter tests based on the attributes. There is a attribute named ``test_
|
|||
For running ``webidentity_test`` you'll need have Keycloak running.
|
||||
|
||||
In order to run any STS test you'll need to add "iam" section to the config file. For further reference on how your config file should look check ``s3tests.conf.SAMPLE``.
|
||||
|
||||
========================
|
||||
IAM policy tests
|
||||
========================
|
||||
|
||||
This is a set of IAM policy tests.
|
||||
This section covers tests for user policies such as Put, Get, List, Delete, user policies with s3 actions, conflicting user policies etc
|
||||
These tests uses Boto3 libraries. Tests are written in the ``s3test_boto3`` directory.
|
||||
|
||||
These iam policy tests uses two users with profile name "iam" and "s3 alt" as mentioned in s3tests.conf.SAMPLE.
|
||||
If Ceph cluster is started with vstart, then above two users will get created as part of vstart with same access key, secrete key etc as mentioned in s3tests.conf.SAMPLE.
|
||||
Out of those two users, "iam" user is with capabilities --caps=user-policy=* and "s3 alt" user is without capabilities.
|
||||
Adding above capabilities to "iam" user is also taken care by vstart (If Ceph cluster is started with vstart).
|
||||
|
||||
To run these tests, create configuration file with section "iam" and "s3 alt" refer s3tests.conf.SAMPLE.
|
||||
Once you have that configuration file copied and edited, you can run all the tests with::
|
||||
|
||||
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam
|
||||
|
||||
You can also specify specific test to run::
|
||||
|
||||
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam:test_put_user_policy
|
||||
|
||||
Some tests have attributes set such as "fails_on_rgw".
|
||||
You can filter tests based on their attributes::
|
||||
|
||||
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam -a '!fails_on_rgw'
|
||||
|
||||
|
|
|
@ -435,6 +435,18 @@ def get_iam_client(client_config=None):
|
|||
config=client_config)
|
||||
return client
|
||||
|
||||
def get_iam_s3client(client_config=None):
|
||||
if client_config == None:
|
||||
client_config = Config(signature_version='s3v4')
|
||||
client = boto3.client(service_name='s3',
|
||||
aws_access_key_id=get_iam_access_key(),
|
||||
aws_secret_access_key=get_iam_secret_key(),
|
||||
endpoint_url=config.default_endpoint,
|
||||
use_ssl=config.default_is_secure,
|
||||
verify=config.default_ssl_verify,
|
||||
config=client_config)
|
||||
return client
|
||||
|
||||
def get_alt_client(client_config=None):
|
||||
if client_config == None:
|
||||
client_config = Config(signature_version='s3v4')
|
||||
|
@ -484,6 +496,17 @@ def get_tenant_iam_client():
|
|||
use_ssl=config.default_is_secure)
|
||||
return client
|
||||
|
||||
def get_alt_iam_client():
|
||||
|
||||
client = boto3.client(service_name='iam',
|
||||
region_name='',
|
||||
aws_access_key_id=config.alt_access_key,
|
||||
aws_secret_access_key=config.alt_secret_key,
|
||||
endpoint_url=config.default_endpoint,
|
||||
verify=config.default_ssl_verify,
|
||||
use_ssl=config.default_is_secure)
|
||||
return client
|
||||
|
||||
def get_unauthenticated_client():
|
||||
client = boto3.client(service_name='s3',
|
||||
aws_access_key_id='',
|
||||
|
|
963
s3tests_boto3/functional/test_iam.py
Normal file
963
s3tests_boto3/functional/test_iam.py
Normal file
|
@ -0,0 +1,963 @@
|
|||
import json
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
from nose.plugins.attrib import attr
|
||||
from nose.tools import eq_ as eq
|
||||
|
||||
from s3tests_boto3.functional.utils import assert_raises
|
||||
from s3tests_boto3.functional.test_s3 import _multipart_upload
|
||||
from . import (
|
||||
get_alt_client,
|
||||
get_iam_client,
|
||||
get_new_bucket,
|
||||
get_iam_s3client,
|
||||
get_alt_iam_client,
|
||||
get_alt_user_id,
|
||||
)
|
||||
from .utils import _get_status, _get_status_and_error_code
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify Put User Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_put_user_policy():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.delete_user_policy(PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify Put User Policy with invalid user')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_put_user_policy_invalid_user():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
|
||||
PolicyName='AllAccessPolicy', UserName="some-non-existing-user-id")
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify Put User Policy using parameter value outside limit')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_put_user_policy_parameter_limit():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}] * 1000
|
||||
}
|
||||
)
|
||||
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
|
||||
PolicyName='AllAccessPolicy' * 10, UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 400)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify Put User Policy using invalid policy document elements')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
@attr('fails_on_rgw')
|
||||
def test_put_user_policy_invalid_element():
|
||||
client = get_iam_client()
|
||||
|
||||
# With Version other than 2012-10-17
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2010-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}]
|
||||
}
|
||||
)
|
||||
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
|
||||
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 400)
|
||||
|
||||
# With no Statement
|
||||
policy_document = json.dumps(
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
}
|
||||
)
|
||||
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
|
||||
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 400)
|
||||
|
||||
# with same Sid for 2 statements
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
{"Sid": "98AB54CF",
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"},
|
||||
{"Sid": "98AB54CF",
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}]
|
||||
}
|
||||
)
|
||||
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
|
||||
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 400)
|
||||
|
||||
# with Principal
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*",
|
||||
"Principal": "arn:aws:iam:::username"}]
|
||||
}
|
||||
)
|
||||
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
|
||||
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 400)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify Put a policy that already exists')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_put_existing_user_policy():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}
|
||||
}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify List User policies')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_list_user_policy():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}
|
||||
}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.list_user_policies(UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify List User policies with invalid user')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_list_user_policy_invalid_user():
|
||||
client = get_iam_client()
|
||||
e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id")
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='get')
|
||||
@attr(operation='Verify Get User policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_get_user_policy():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.get_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
response = client.delete_user_policy(PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='get')
|
||||
@attr(operation='Verify Get User Policy with invalid user')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_get_user_policy_invalid_user():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy',
|
||||
UserName="some-non-existing-user-id")
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='get')
|
||||
@attr(operation='Verify Get User Policy with invalid policy name')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
@attr('fails_on_rgw')
|
||||
def test_get_user_policy_invalid_policy_name():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
e = assert_raises(ClientError, client.get_user_policy, PolicyName='non-existing-policy-name',
|
||||
UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='get')
|
||||
@attr(operation='Verify Get Deleted User Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
@attr('fails_on_rgw')
|
||||
def test_get_deleted_user_policy():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
||||
e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='get')
|
||||
@attr(operation='Verify Get a policy from multiple policies for a user')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_get_user_policy_from_multiple_policies():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy1',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy2',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.get_user_policy(PolicyName='AllowAccessPolicy2',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy1',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy2',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='delete')
|
||||
@attr(operation='Verify Delete User Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_delete_user_policy():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='delete')
|
||||
@attr(operation='Verify Delete User Policy with invalid user')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_delete_user_policy_invalid_user():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
e = assert_raises(ClientError, client.delete_user_policy, PolicyName='AllAccessPolicy',
|
||||
UserName="some-non-existing-user-id")
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='delete')
|
||||
@attr(operation='Verify Delete User Policy with invalid policy name')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_delete_user_policy_invalid_policy_name():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
e = assert_raises(ClientError, client.delete_user_policy, PolicyName='non-existing-policy-name',
|
||||
UserName=get_alt_user_id())
|
||||
status = _get_status(e.response)
|
||||
eq(status, 404)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='delete')
|
||||
@attr(operation='Verify Delete multiple User policies for a user')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_delete_user_policy_from_multiple_policies():
|
||||
client = get_iam_client()
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": "*",
|
||||
"Resource": "*"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy1',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy2',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy3',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy1',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy2',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.get_user_policy(PolicyName='AllowAccessPolicy3',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy3',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Allow Bucket Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_allow_bucket_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client_alt = get_alt_client()
|
||||
|
||||
s3_client_iam = get_iam_s3client()
|
||||
bucket = get_new_bucket(client=s3_client_iam)
|
||||
s3_client_iam.put_object(Bucket=bucket, Key='foo', Body='bar')
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": ["s3:ListBucket", "s3:DeleteBucket"],
|
||||
"Resource": f"arn:aws:s3:::{bucket}"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
response = s3_client_alt.list_objects(Bucket=bucket)
|
||||
object_found = False
|
||||
for object_received in response['Contents']:
|
||||
if "foo" == object_received['Key']:
|
||||
object_found = True
|
||||
break
|
||||
if not object_found:
|
||||
raise AssertionError("Object is not listed")
|
||||
|
||||
response = s3_client_iam.delete_object(Bucket=bucket, Key='foo')
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
|
||||
response = s3_client_alt.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
|
||||
response = s3_client_iam.list_buckets()
|
||||
for bucket in response['Buckets']:
|
||||
if bucket == bucket['Name']:
|
||||
raise AssertionError("deleted bucket is getting listed")
|
||||
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Deny Bucket Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_deny_bucket_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client = get_alt_client()
|
||||
bucket = get_new_bucket(client=s3_client)
|
||||
|
||||
policy_document_deny = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Deny",
|
||||
"Action": ["s3:ListAllMyBuckets", "s3:DeleteBucket"],
|
||||
"Resource": "arn:aws:s3:::*"}}
|
||||
)
|
||||
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_deny,
|
||||
PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
e = assert_raises(ClientError, s3_client.list_buckets, Bucket=bucket)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
e = assert_raises(ClientError, s3_client.delete_bucket, Bucket=bucket)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = s3_client.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Allow Object Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_allow_object_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client_alt = get_alt_client()
|
||||
s3_client_iam = get_iam_s3client()
|
||||
bucket = get_new_bucket(client=s3_client_iam)
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
|
||||
"Resource": f"arn:aws:s3:::{bucket}/*"}}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar')
|
||||
response = s3_client_alt.get_object(Bucket=bucket, Key='foo')
|
||||
body = response['Body'].read()
|
||||
if type(body) is bytes:
|
||||
body = body.decode()
|
||||
eq(body, "bar")
|
||||
response = s3_client_alt.delete_object(Bucket=bucket, Key='foo')
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
|
||||
e = assert_raises(ClientError, s3_client_iam.get_object, Bucket=bucket, Key='foo')
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 404)
|
||||
eq(error_code, 'NoSuchKey')
|
||||
response = s3_client_iam.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Deny Object Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_deny_object_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client_alt = get_alt_client()
|
||||
bucket = get_new_bucket(client=s3_client_alt)
|
||||
s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar')
|
||||
|
||||
policy_document_deny = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Deny",
|
||||
"Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
|
||||
"Resource": f"arn:aws:s3:::{bucket}/*"}, {
|
||||
"Effect": "Allow",
|
||||
"Action": ["s3:DeleteBucket"],
|
||||
"Resource": f"arn:aws:s3:::{bucket}"}]}
|
||||
)
|
||||
client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
|
||||
e = assert_raises(ClientError, s3_client_alt.put_object, Bucket=bucket, Key='foo')
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
e = assert_raises(ClientError, s3_client_alt.get_object, Bucket=bucket, Key='foo')
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
e = assert_raises(ClientError, s3_client_alt.delete_object, Bucket=bucket, Key='foo')
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
|
||||
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Allow Multipart Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_allow_multipart_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client_alt = get_alt_client()
|
||||
s3_client_iam = get_iam_s3client()
|
||||
bucket = get_new_bucket(client=s3_client_iam)
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"],
|
||||
"Resource": "arn:aws:s3:::*"}}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_allow,
|
||||
PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
key = "mymultipart"
|
||||
mb = 1024 * 1024
|
||||
|
||||
(upload_id, _, _) = _multipart_upload(client=s3_client_iam, bucket_name=bucket, key=key,
|
||||
size=5 * mb)
|
||||
response = s3_client_alt.list_multipart_uploads(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = s3_client_alt.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
|
||||
response = s3_client_iam.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Deny Multipart Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_deny_multipart_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client = get_alt_client()
|
||||
bucket = get_new_bucket(client=s3_client)
|
||||
|
||||
policy_document_deny = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Deny",
|
||||
"Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"],
|
||||
"Resource": "arn:aws:s3:::*"}}
|
||||
)
|
||||
response = client.put_user_policy(PolicyDocument=policy_document_deny,
|
||||
PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
key = "mymultipart"
|
||||
mb = 1024 * 1024
|
||||
|
||||
(upload_id, _, _) = _multipart_upload(client=s3_client, bucket_name=bucket, key=key,
|
||||
size=5 * mb)
|
||||
|
||||
e = assert_raises(ClientError, s3_client.list_multipart_uploads, Bucket=bucket)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
|
||||
e = assert_raises(ClientError, s3_client.abort_multipart_upload, Bucket=bucket,
|
||||
Key=key, UploadId=upload_id)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
|
||||
response = s3_client.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Allow Tagging Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_allow_tagging_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client_alt = get_alt_client()
|
||||
s3_client_iam = get_iam_s3client()
|
||||
bucket = get_new_bucket(client=s3_client_iam)
|
||||
|
||||
policy_document_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Allow",
|
||||
"Action": ["s3:PutBucketTagging", "s3:GetBucketTagging",
|
||||
"s3:PutObjectTagging", "s3:GetObjectTagging"],
|
||||
"Resource": f"arn:aws:s3:::*"}}
|
||||
)
|
||||
client.put_user_policy(PolicyDocument=policy_document_allow, PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]}
|
||||
|
||||
response = s3_client_alt.put_bucket_tagging(Bucket=bucket, Tagging=tags)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = s3_client_alt.get_bucket_tagging(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
eq(response['TagSet'][0]['Key'], 'Hello')
|
||||
eq(response['TagSet'][0]['Value'], 'World')
|
||||
|
||||
obj_key = 'obj'
|
||||
response = s3_client_iam.put_object(Bucket=bucket, Key=obj_key, Body='obj_body')
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = s3_client_alt.put_object_tagging(Bucket=bucket, Key=obj_key, Tagging=tags)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = s3_client_alt.get_object_tagging(Bucket=bucket, Key=obj_key)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
eq(response['TagSet'], tags['TagSet'])
|
||||
|
||||
response = s3_client_iam.delete_object(Bucket=bucket, Key=obj_key)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = s3_client_iam.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='s3 Actions')
|
||||
@attr(operation='Verify Deny Tagging Actions in user Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_deny_tagging_actions_in_user_policy():
|
||||
client = get_iam_client()
|
||||
s3_client = get_alt_client()
|
||||
bucket = get_new_bucket(client=s3_client)
|
||||
|
||||
policy_document_deny = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {
|
||||
"Effect": "Deny",
|
||||
"Action": ["s3:PutBucketTagging", "s3:GetBucketTagging",
|
||||
"s3:PutObjectTagging", "s3:DeleteObjectTagging"],
|
||||
"Resource": "arn:aws:s3:::*"}}
|
||||
)
|
||||
client.put_user_policy(PolicyDocument=policy_document_deny, PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]}
|
||||
|
||||
e = assert_raises(ClientError, s3_client.put_bucket_tagging, Bucket=bucket, Tagging=tags)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
e = assert_raises(ClientError, s3_client.get_bucket_tagging, Bucket=bucket)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
|
||||
obj_key = 'obj'
|
||||
response = s3_client.put_object(Bucket=bucket, Key=obj_key, Body='obj_body')
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
e = assert_raises(ClientError, s3_client.put_object_tagging, Bucket=bucket, Key=obj_key,
|
||||
Tagging=tags)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
e = assert_raises(ClientError, s3_client.delete_object_tagging, Bucket=bucket, Key=obj_key)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
|
||||
response = s3_client.delete_object(Bucket=bucket, Key=obj_key)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = s3_client.delete_bucket(Bucket=bucket)
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
|
||||
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify conflicting user policy statements')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_verify_conflicting_user_policy_statements():
|
||||
s3client = get_alt_client()
|
||||
bucket = get_new_bucket(client=s3client)
|
||||
policy_document = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
{"Sid": "98AB54CG",
|
||||
"Effect": "Allow",
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": f"arn:aws:s3:::{bucket}"},
|
||||
{"Sid": "98AB54CA",
|
||||
"Effect": "Deny",
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": f"arn:aws:s3:::{bucket}"}
|
||||
]}
|
||||
)
|
||||
client = get_iam_client()
|
||||
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(method='put')
|
||||
@attr(operation='Verify conflicting user policies')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_verify_conflicting_user_policies():
|
||||
s3client = get_alt_client()
|
||||
bucket = get_new_bucket(client=s3client)
|
||||
policy_allow = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {"Sid": "98AB54CG",
|
||||
"Effect": "Allow",
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": f"arn:aws:s3:::{bucket}"}}
|
||||
)
|
||||
policy_deny = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {"Sid": "98AB54CGZ",
|
||||
"Effect": "Deny",
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": f"arn:aws:s3:::{bucket}"}}
|
||||
)
|
||||
client = get_iam_client()
|
||||
response = client.put_user_policy(PolicyDocument=policy_allow, PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.put_user_policy(PolicyDocument=policy_deny, PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket)
|
||||
status, error_code = _get_status_and_error_code(e.response)
|
||||
eq(status, 403)
|
||||
eq(error_code, 'AccessDenied')
|
||||
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
|
||||
|
||||
@attr(resource='user-policy')
|
||||
@attr(operation='Verify Allow Actions for IAM user policies')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('user-policy')
|
||||
@attr('test_of_iam')
|
||||
def test_verify_allow_iam_actions():
|
||||
policy1 = json.dumps(
|
||||
{"Version": "2012-10-17",
|
||||
"Statement": {"Sid": "98AB54CGA",
|
||||
"Effect": "Allow",
|
||||
"Action": ["iam:PutUserPolicy", "iam:GetUserPolicy",
|
||||
"iam:ListUserPolicies", "iam:DeleteUserPolicy"],
|
||||
"Resource": f"arn:aws:iam:::user/{get_alt_user_id()}"}}
|
||||
)
|
||||
client1 = get_iam_client()
|
||||
iam_client_alt = get_alt_iam_client()
|
||||
|
||||
response = client1.put_user_policy(PolicyDocument=policy1, PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = iam_client_alt.get_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = iam_client_alt.list_user_policies(UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
||||
response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy',
|
||||
UserName=get_alt_user_id())
|
||||
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
|
Loading…
Reference in a new issue