pytest: add custom marks for each nose @attr

and register them in pytest.ini

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 4864dbc340)
This commit is contained in:
Casey Bodley 2023-01-20 14:35:45 -05:00 committed by Ali Maredia
parent 60cc478d70
commit b6dcf111fa
9 changed files with 896 additions and 2 deletions

40
pytest.ini Normal file
View file

@ -0,0 +1,40 @@
[pytest]
markers =
abac_test
appendobject
bucket_policy
bucket_encryption
cloud_transition
encryption
fails_on_aws
fails_on_dbstore
fails_on_dho
fails_on_mod_proxy_fcgi
fails_on_rgw
fails_on_s3
fails_with_subdomain
lifecycle
lifecycle_expiration
lifecycle_transition
list_objects_v2
object_lock
session_policy
s3select
s3website
s3website_routing_rules
s3website_redirect_location
3website
sse_s3
storage_class
tagging
test_of_iam
test_of_sts
token_claims_trust_policy_test
token_principal_tag_role_policy_test
token_request_tag_trust_policy_test
token_resource_tags_test
token_role_tags_test
token_tag_keys_test
user_policy
versioning
webidentity_test

View file

@ -5,6 +5,7 @@ import boto.s3.connection
import boto.s3.acl
import boto.utils
import nose
import pytest
import operator
import random
import string
@ -185,6 +186,7 @@ def tag(*tags):
@attr(operation='create w/no content length')
@attr(assertion='fails 411')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contentlength_none():
key = _setup_bad_object(remove=('Content-Length',))
@ -202,6 +204,7 @@ def test_object_create_bad_contentlength_none():
@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_mismatch_above():
content = 'bar'
length = len(content) + 1
@ -225,6 +228,7 @@ def test_object_create_bad_contentlength_mismatch_above():
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_authorization_empty():
key = _setup_bad_object({'Authorization': ''})
@ -240,6 +244,7 @@ def test_object_create_bad_authorization_empty():
@attr(operation='create w/date and x-amz-date')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_date_and_amz_date():
date = formatdate(usegmt=True)
@ -252,6 +257,7 @@ def test_object_create_date_and_amz_date():
@attr(operation='create w/x-amz-date and no date')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_amz_date_and_no_date():
date = formatdate(usegmt=True)
@ -266,6 +272,7 @@ def test_object_create_amz_date_and_no_date():
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_authorization_none():
key = _setup_bad_object(remove=('Authorization',))
@ -282,6 +289,7 @@ def test_object_create_bad_authorization_none():
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_contentlength_none():
_add_custom_headers(remove=('Content-Length',))
@ -294,6 +302,7 @@ def test_bucket_create_contentlength_none():
@attr(operation='set w/no content length')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_acl_create_contentlength_none():
bucket = get_new_bucket()
@ -324,6 +333,7 @@ def _create_new_connection():
@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_empty():
conn = _create_new_connection()
_add_custom_headers({'Content-Length': ''})
@ -338,6 +348,7 @@ def test_bucket_create_bad_contentlength_empty():
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_contentlength_none():
_add_custom_headers(remove=('Content-Length',))
@ -350,6 +361,7 @@ def test_bucket_create_bad_contentlength_none():
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_authorization_empty():
_add_custom_headers({'Authorization': ''})
@ -366,6 +378,7 @@ def test_bucket_create_bad_authorization_empty():
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_authorization_none():
_add_custom_headers(remove=('Authorization',))
@ -384,6 +397,7 @@ def test_bucket_create_bad_authorization_none():
@attr(operation='create w/content length too short')
@attr(assertion='fails 400')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contentlength_mismatch_below_aws2():
check_aws2_support()
@ -402,6 +416,7 @@ def test_object_create_bad_contentlength_mismatch_below_aws2():
@attr(operation='create w/incorrect authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_authorization_incorrect_aws2():
check_aws2_support()
@ -419,6 +434,7 @@ def test_object_create_bad_authorization_incorrect_aws2():
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_authorization_invalid_aws2():
check_aws2_support()
key = _setup_bad_object({'Authorization': 'AWS HAHAHA'})
@ -433,6 +449,7 @@ def test_object_create_bad_authorization_invalid_aws2():
@attr(operation='create w/no date')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_none_aws2():
check_aws2_support()
@ -463,6 +480,7 @@ def test_bucket_create_bad_authorization_invalid_aws2():
@attr(operation='create w/no date')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_none_aws2():
check_aws2_support()

View file

@ -8,6 +8,7 @@ import time
import email.utils
import isodate
import nose
import pytest
import operator
import socket
import ssl
@ -97,6 +98,7 @@ def _get_alt_connection():
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/! in name')
@ -135,7 +137,9 @@ def check_configure_versioning_retry(bucket, status, expected_string):
@attr(operation='create versioned object, read not exist null version')
@attr(assertion='read null version behaves correctly')
@attr('versioning')
@pytest.mark.versioning
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_versioning_obj_read_not_exist_null():
bucket = get_new_bucket()
check_versioning(bucket, None)
@ -156,9 +160,13 @@ def test_versioning_obj_read_not_exist_null():
@attr(operation='append object')
@attr(assertion='success')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('appendobject')
@pytest.mark.appendobject
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_append_object():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -181,9 +189,13 @@ def test_append_object():
@attr(operation='append to normal object')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('appendobject')
@pytest.mark.appendobject
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_append_normal_object():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -202,9 +214,13 @@ def test_append_normal_object():
@attr(operation='append position not right')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('appendobject')
@pytest.mark.appendobject
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_append_object_position_wrong():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -227,6 +243,7 @@ def test_append_object_position_wrong():
@attr(operation='set/enable/disable logging target')
@attr(assertion='operations succeed')
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_logging_toggle():
bucket = get_new_bucket()
log_bucket = get_new_bucket(targets.main.default, bucket.name + '-log')
@ -407,8 +424,11 @@ def lc_transitions(transitions=None):
@attr(method='put')
@attr(operation='test create object with storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class():
sc = configured_storage_classes()
if len(sc) < 2:
@ -426,8 +446,11 @@ def test_object_storage_class():
@attr(method='put')
@attr(operation='test create multipart object with storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class_multipart():
sc = configured_storage_classes()
if len(sc) < 2:
@ -468,8 +491,11 @@ def _do_test_object_modify_storage_class(obj_write_func, size):
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_modify_storage_class():
_do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
@ -478,8 +504,11 @@ def test_object_modify_storage_class():
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_modify_storage_class_multipart():
_do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
@ -508,8 +537,11 @@ def _do_test_object_storage_class_copy(obj_write_func, size):
@attr(method='copy')
@attr(operation='test copy object to object with different storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class_copy():
_do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
@ -517,8 +549,11 @@ def test_object_storage_class_copy():
@attr(method='copy')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class_copy_multipart():
_do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)
@ -625,7 +660,9 @@ def _test_atomic_dual_conditional_write(file_size):
@attr(operation='write one or the other')
@attr(assertion='1MB successful')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_atomic_dual_conditional_write_1mb():
_test_atomic_dual_conditional_write(1024*1024)
@ -634,7 +671,9 @@ def test_atomic_dual_conditional_write_1mb():
@attr(operation='write file in deleted bucket')
@attr(assertion='fail 404')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_atomic_write_bucket_gone():
bucket = get_new_bucket()
@ -679,7 +718,9 @@ def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
@attr(operation='multipart upload with bad key for uploading chunks')
@attr(assertion='successful')
@attr('encryption')
@pytest.mark.encryption
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_invalid_chunks_1():
bucket = get_new_bucket()
key = "multipart_enc"
@ -707,7 +748,9 @@ def test_encryption_sse_c_multipart_invalid_chunks_1():
@attr(operation='multipart upload with bad md5 for chunks')
@attr(assertion='successful')
@attr('encryption')
@pytest.mark.encryption
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_invalid_chunks_2():
bucket = get_new_bucket()
key = "multipart_enc"
@ -735,8 +778,11 @@ def test_encryption_sse_c_multipart_invalid_chunks_2():
@attr(operation='Test Bucket Policy for a user belonging to a different tenant')
@attr(assertion='succeeds')
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('bucket-policy')
@pytest.mark.bucket_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_policy_different_tenant():
bucket = get_new_bucket()
key = bucket.new_key('asdf')
@ -775,7 +821,9 @@ def test_bucket_policy_different_tenant():
@attr(method='put')
@attr(operation='Test put condition operator end with ifExists')
@attr('bucket-policy')
@pytest.mark.bucket_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_policy_set_condition_operator_end_with_IfExists():
bucket = _create_keys(keys=['foo'])
policy = '''{
@ -815,8 +863,11 @@ def _make_arn_resource(path="*"):
@attr(operation='put obj with RequestObjectTag')
@attr(assertion='success')
@attr('tagging')
@pytest.mark.tagging
@attr('bucket-policy')
@pytest.mark.bucket_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_policy_put_obj_request_obj_tag():
bucket = get_new_bucket()

View file

@ -2,6 +2,7 @@
import sys
import collections
import nose
import pytest
import string
import random
from pprint import pprint
@ -45,7 +46,6 @@ ERRORDOC_TEMPLATE = '<html><h1>ErrorDoc</h1><body>{random}</body></html>'
CAN_WEBSITE = None
@attr('fails_on_dbstore')
def check_can_test_website():
global CAN_WEBSITE
# This is a bit expensive, so we cache this
@ -254,7 +254,9 @@ def _website_request(bucket_name, path, connect_hostname=None, method='GET', tim
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_s3():
bucket_name = get_new_bucket_name()
@ -267,8 +269,11 @@ def test_website_nonexistant_bucket_s3():
#@attr(assertion='non-existant bucket via website endpoint should give Forbidden, keeping bucket identity secure')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_s3')
@pytest.mark.fails_on_s3
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_rgw():
bucket_name = get_new_bucket_name()
@ -282,7 +287,9 @@ def test_website_nonexistant_bucket_rgw():
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@timed(10)
def test_website_public_bucket_list_public_index():
@ -311,7 +318,9 @@ def test_website_public_bucket_list_public_index():
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_public_index():
bucket = get_new_bucket()
@ -342,7 +351,9 @@ def test_website_private_bucket_list_public_index():
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty():
bucket = get_new_bucket()
@ -359,7 +370,9 @@ def test_website_private_bucket_list_empty():
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty():
bucket = get_new_bucket()
@ -375,7 +388,9 @@ def test_website_public_bucket_list_empty():
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index():
bucket = get_new_bucket()
@ -401,7 +416,9 @@ def test_website_public_bucket_list_private_index():
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index():
bucket = get_new_bucket()
@ -428,7 +445,9 @@ def test_website_private_bucket_list_private_index():
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
@ -445,7 +464,9 @@ def test_website_private_bucket_list_empty_missingerrordoc():
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
@ -461,7 +482,9 @@ def test_website_public_bucket_list_empty_missingerrordoc():
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
@ -486,7 +509,9 @@ def test_website_public_bucket_list_private_index_missingerrordoc():
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
@ -512,7 +537,9 @@ def test_website_private_bucket_list_private_index_missingerrordoc():
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
@ -541,7 +568,9 @@ def test_website_private_bucket_list_empty_blockederrordoc():
@attr(operation='list')
@attr(assertion='check if there is an invalid payload after serving error doc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_pubilc_errordoc():
bucket = get_new_bucket()
@ -593,7 +622,9 @@ def test_website_public_bucket_list_pubilc_errordoc():
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
@ -621,7 +652,9 @@ def test_website_public_bucket_list_empty_blockederrordoc():
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
@ -655,7 +688,9 @@ def test_website_public_bucket_list_private_index_blockederrordoc():
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
@ -690,8 +725,10 @@ def test_website_private_bucket_list_private_index_blockederrordoc():
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@pytest.mark.s3website
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -714,7 +751,9 @@ def test_website_private_bucket_list_empty_gooderrordoc():
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
@ -739,7 +778,9 @@ def test_website_public_bucket_list_empty_gooderrordoc():
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
@ -769,7 +810,9 @@ def test_website_public_bucket_list_private_index_gooderrordoc():
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
@ -800,7 +843,9 @@ def test_website_private_bucket_list_private_index_gooderrordoc():
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_base():
bucket = get_new_bucket()
@ -818,7 +863,9 @@ def test_website_bucket_private_redirectall_base():
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path():
bucket = get_new_bucket()
@ -838,7 +885,9 @@ def test_website_bucket_private_redirectall_path():
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path_upgrade():
bucket = get_new_bucket()
@ -860,8 +909,11 @@ def test_website_bucket_private_redirectall_path_upgrade():
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_nonwebsite():
bucket = get_new_bucket()
@ -893,8 +945,11 @@ def test_website_xredirect_nonwebsite():
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_relative():
bucket = get_new_bucket()
@ -921,8 +976,11 @@ def test_website_xredirect_public_relative():
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_abs():
bucket = get_new_bucket()
@ -949,8 +1007,11 @@ def test_website_xredirect_public_abs():
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_relative():
bucket = get_new_bucket()
@ -977,8 +1038,11 @@ def test_website_xredirect_private_relative():
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_abs():
bucket = get_new_bucket()
@ -1241,8 +1305,11 @@ def routing_check(*args, **kwargs):
assert(False)
@attr('s3website_RoutingRules')
@pytest.mark.s3website_routing_rules
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_routing_generator():
for t in ROUTING_RULES_TESTS:

View file

@ -2,6 +2,7 @@ import boto3
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
import nose
import pytest
from botocore.exceptions import ClientError
from email.utils import formatdate
@ -242,6 +243,7 @@ def test_object_create_bad_expect_none():
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_empty():
e = _add_header_create_bad_object({'Content-Length':''})
status, error_code = _get_status_and_error_code(e.response)
@ -253,6 +255,7 @@ def test_object_create_bad_contentlength_empty():
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
@pytest.mark.fails_on_mod_proxy_fcgi
def test_object_create_bad_contentlength_negative():
client = get_client()
bucket_name = get_new_bucket()
@ -268,6 +271,7 @@ def test_object_create_bad_contentlength_negative():
@attr(assertion='fails 411')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_none():
remove = 'Content-Length'
e = _remove_header_create_bad_object('Content-Length')
@ -316,6 +320,7 @@ def test_object_create_bad_contenttype_none():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_empty():
e = _add_header_create_bad_object({'Authorization': ''})
status, error_code = _get_status_and_error_code(e.response)
@ -328,6 +333,7 @@ def test_object_create_bad_authorization_empty():
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to pass both the 'Date' and 'X-Amz-Date' header during signing and not 'X-Amz-Date' before
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_date_and_amz_date():
date = formatdate(usegmt=True)
bucket_name, key_name = _add_header_create_object({'Date': date, 'X-Amz-Date': date})
@ -341,6 +347,7 @@ def test_object_create_date_and_amz_date():
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to pass both the 'Date' and 'X-Amz-Date' header during signing and not 'X-Amz-Date' before
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_amz_date_and_no_date():
date = formatdate(usegmt=True)
bucket_name, key_name = _add_header_create_object({'Date': '', 'X-Amz-Date': date})
@ -355,6 +362,7 @@ def test_object_create_amz_date_and_no_date():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_none():
e = _remove_header_create_bad_object('Authorization')
status, error_code = _get_status_and_error_code(e.response)
@ -367,6 +375,7 @@ def test_object_create_bad_authorization_none():
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@ -378,6 +387,7 @@ def test_bucket_create_contentlength_none():
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_acl_create_contentlength_none():
bucket_name = get_new_bucket()
client = get_client()
@ -439,6 +449,7 @@ def test_bucket_create_bad_expect_empty():
# TODO: The request isn't even making it to the RGW past the frontend
# This test had 'fails_on_rgw' before the move to boto3
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_empty():
headers = {'Content-Length': ''}
e = _add_header_create_bad_bucket(headers)
@ -451,6 +462,7 @@ def test_bucket_create_bad_contentlength_empty():
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
@pytest.mark.fails_on_mod_proxy_fcgi
def test_bucket_create_bad_contentlength_negative():
headers = {'Content-Length': '-1'}
e = _add_header_create_bad_bucket(headers)
@ -464,6 +476,7 @@ def test_bucket_create_bad_contentlength_negative():
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@ -475,6 +488,7 @@ def test_bucket_create_bad_contentlength_none():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_empty():
headers = {'Authorization': ''}
e = _add_header_create_bad_bucket(headers)
@ -489,6 +503,7 @@ def test_bucket_create_bad_authorization_empty():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_none():
e = _remove_header_create_bad_bucket('Authorization')
status, error_code = _get_status_and_error_code(e.response)
@ -515,6 +530,7 @@ def test_object_create_bad_md5_invalid_garbage_aws2():
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the Content-Length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_mismatch_below_aws2():
v2_client = get_v2_client()
content = 'bar'
@ -532,6 +548,7 @@ def test_object_create_bad_contentlength_mismatch_below_aws2():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_incorrect_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS AKIAIGR7ZNNBHC5BKSUB:FWeDfwojDSdS2Ztmpfeubhd9isU='}
@ -547,6 +564,7 @@ def test_object_create_bad_authorization_incorrect_aws2():
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS HAHAHA'}
@ -610,6 +628,7 @@ def test_object_create_bad_date_empty_aws2():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_date_none_aws2():
v2_client = get_v2_client()
remove = 'x-amz-date'
@ -664,6 +683,7 @@ def test_object_create_bad_date_after_end_aws2():
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS HAHAHA'}
@ -725,6 +745,7 @@ def test_bucket_create_bad_date_empty_aws2():
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_date_none_aws2():
v2_client = get_v2_client()
remove = 'x-amz-date'

View file

@ -3,6 +3,7 @@ import json
from botocore.exceptions import ClientError
from nose.plugins.attrib import attr
from nose.tools import eq_ as eq
import pytest
from s3tests_boto3.functional.utils import assert_raises
from s3tests_boto3.functional.test_s3 import _multipart_upload
@ -24,7 +25,9 @@ from .utils import _get_status, _get_status_and_error_code
@attr(operation='Verify Put User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_user_policy():
client = get_iam_client()
@ -48,7 +51,9 @@ def test_put_user_policy():
@attr(operation='Verify Put User Policy with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_user_policy_invalid_user():
client = get_iam_client()
@ -70,7 +75,9 @@ def test_put_user_policy_invalid_user():
@attr(operation='Verify Put User Policy using parameter value outside limit')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_user_policy_parameter_limit():
client = get_iam_client()
@ -93,8 +100,11 @@ def test_put_user_policy_parameter_limit():
@attr(operation='Verify Put User Policy using invalid policy document elements')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_put_user_policy_invalid_element():
client = get_iam_client()
@ -163,7 +173,9 @@ def test_put_user_policy_invalid_element():
@attr(operation='Verify Put a policy that already exists')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_existing_user_policy():
client = get_iam_client()
@ -188,7 +200,9 @@ def test_put_existing_user_policy():
@attr(operation='Verify List User policies')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_list_user_policy():
client = get_iam_client()
@ -213,7 +227,9 @@ def test_list_user_policy():
@attr(operation='Verify List User policies with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_list_user_policy_invalid_user():
client = get_iam_client()
e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id")
@ -226,7 +242,9 @@ def test_list_user_policy_invalid_user():
@attr(operation='Verify Get User policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_get_user_policy():
client = get_iam_client()
@ -253,7 +271,9 @@ def test_get_user_policy():
@attr(operation='Verify Get User Policy with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_get_user_policy_invalid_user():
client = get_iam_client()
@ -279,8 +299,11 @@ def test_get_user_policy_invalid_user():
@attr(operation='Verify Get User Policy with invalid policy name')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_get_user_policy_invalid_policy_name():
client = get_iam_client()
@ -305,8 +328,11 @@ def test_get_user_policy_invalid_policy_name():
@attr(operation='Verify Get Deleted User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_get_deleted_user_policy():
client = get_iam_client()
@ -331,7 +357,9 @@ def test_get_deleted_user_policy():
@attr(operation='Verify Get a policy from multiple policies for a user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_get_user_policy_from_multiple_policies():
client = get_iam_client()
@ -368,7 +396,9 @@ def test_get_user_policy_from_multiple_policies():
@attr(operation='Verify Delete User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy():
client = get_iam_client()
@ -394,7 +424,9 @@ def test_delete_user_policy():
@attr(operation='Verify Delete User Policy with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_user():
client = get_iam_client()
@ -424,7 +456,9 @@ def test_delete_user_policy_invalid_user():
@attr(operation='Verify Delete User Policy with invalid policy name')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_policy_name():
client = get_iam_client()
@ -454,7 +488,9 @@ def test_delete_user_policy_invalid_policy_name():
@attr(operation='Verify Delete multiple User policies for a user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy_from_multiple_policies():
client = get_iam_client()
@ -498,7 +534,9 @@ def test_delete_user_policy_from_multiple_policies():
@attr(operation='Verify Allow Bucket Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_allow_bucket_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -549,7 +587,9 @@ def test_allow_bucket_actions_in_user_policy():
@attr(operation='Verify Deny Bucket Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_bucket_actions_in_user_policy():
client = get_iam_client()
@ -589,7 +629,9 @@ def test_deny_bucket_actions_in_user_policy():
@attr(operation='Verify Allow Object Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_allow_object_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -632,7 +674,9 @@ def test_allow_object_actions_in_user_policy():
@attr(operation='Verify Deny Object Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_object_actions_in_user_policy():
client = get_iam_client()
@ -676,7 +720,9 @@ def test_deny_object_actions_in_user_policy():
@attr(operation='Verify Allow Multipart Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_allow_multipart_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -715,7 +761,9 @@ def test_allow_multipart_actions_in_user_policy():
@attr(operation='Verify Deny Multipart Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_multipart_actions_in_user_policy():
client = get_iam_client()
@ -762,7 +810,9 @@ def test_deny_multipart_actions_in_user_policy():
@attr(operation='Verify Allow Tagging Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_allow_tagging_actions_in_user_policy():
client = get_iam_client()
@ -812,7 +862,9 @@ def test_allow_tagging_actions_in_user_policy():
@attr(operation='Verify Deny Tagging Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_tagging_actions_in_user_policy():
client = get_iam_client()
@ -868,7 +920,9 @@ def test_deny_tagging_actions_in_user_policy():
@attr(operation='Verify conflicting user policy statements')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_verify_conflicting_user_policy_statements():
s3client = get_alt_client()
@ -904,7 +958,9 @@ def test_verify_conflicting_user_policy_statements():
@attr(operation='Verify conflicting user policies')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_verify_conflicting_user_policies():
s3client = get_alt_client()
@ -946,7 +1002,9 @@ def test_verify_conflicting_user_policies():
@attr(operation='Verify Allow Actions for IAM user policies')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_verify_allow_iam_actions():
policy1 = json.dumps(
{"Version": "2012-10-17",

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,5 @@
import nose
import pytest
import random
import string
import re
@ -79,11 +80,13 @@ def generate_s3select_expression_projection(bucket_name,obj_name):
assert( abs(float(res.split("\n")[1]) - eval(e)) < epsilon )
@attr('s3select')
@pytest.mark.s3select
def get_random_string():
return uuid.uuid4().hex[:6].upper()
@attr('s3select')
@pytest.mark.s3select
def test_generate_where_clause():
# create small csv file for testing the random expressions
@ -96,6 +99,7 @@ def test_generate_where_clause():
generate_s3select_where_clause(bucket_name,obj_name)
@attr('s3select')
@pytest.mark.s3select
def test_generate_projection():
# create small csv file for testing the random expressions
@ -312,6 +316,7 @@ def create_list_of_int(column_pos,obj,field_split=",",row_split="\n"):
return list_of_int
@attr('s3select')
@pytest.mark.s3select
def test_count_operation():
csv_obj_name = get_random_string()
bucket_name = "test"
@ -323,6 +328,7 @@ def test_count_operation():
s3select_assert_result( num_of_rows, int( res ))
@attr('s3select')
@pytest.mark.s3select
def test_column_sum_min_max():
csv_obj = create_random_csv_object(10000,10)
@ -388,6 +394,7 @@ def test_column_sum_min_max():
s3select_assert_result( int(count)*4 , int(sum1)-int(sum2) )
@attr('s3select')
@pytest.mark.s3select
def test_nullif_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -443,6 +450,7 @@ def test_nullif_expressions():
s3select_assert_result( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_nulliftrue_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -470,6 +478,7 @@ def test_nulliftrue_expressions():
s3select_assert_result( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_is_not_null_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -491,6 +500,7 @@ def test_is_not_null_expressions():
s3select_assert_result( res_s3select_null, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_lowerupper_expressions():
csv_obj = create_random_csv_object(1,10)
@ -508,6 +518,7 @@ def test_lowerupper_expressions():
s3select_assert_result( res_s3select, "AB12CD$$")
@attr('s3select')
@pytest.mark.s3select
def test_in_expressions():
# purpose of test: engine is process correctly several projections containing aggregation-functions
@ -578,6 +589,7 @@ def test_in_expressions():
s3select_assert_result( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_true_false_in_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -623,6 +635,7 @@ def test_true_false_in_expressions():
s3select_assert_result( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_like_expressions():
csv_obj = create_random_csv_object_string(1000,10)
@ -710,6 +723,7 @@ def test_like_expressions():
s3select_assert_result( res_s3select_like, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_truefalselike_expressions():
csv_obj = create_random_csv_object_string(1000,10)
@ -755,6 +769,7 @@ def test_truefalselike_expressions():
s3select_assert_result( res_s3select_like, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_nullif_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -782,6 +797,7 @@ def test_nullif_expressions():
nose.tools.assert_equal( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_lowerupper_expressions():
csv_obj = create_random_csv_object(1,10)
@ -799,6 +815,7 @@ def test_lowerupper_expressions():
nose.tools.assert_equal( res_s3select, "AB12CD$$")
@attr('s3select')
@pytest.mark.s3select
def test_in_expressions():
# purpose of test: engine is process correctly several projections containing aggregation-functions
@ -839,6 +856,7 @@ def test_in_expressions():
nose.tools.assert_equal( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_like_expressions():
csv_obj = create_random_csv_object_string(10000,10)
@ -885,6 +903,7 @@ def test_like_expressions():
@attr('s3select')
@pytest.mark.s3select
def test_complex_expressions():
# purpose of test: engine is process correctly several projections containing aggregation-functions
@ -917,6 +936,7 @@ def test_complex_expressions():
s3select_assert_result( res_s3select_between_numbers, res_s3select_eq_modolu)
@attr('s3select')
@pytest.mark.s3select
def test_alias():
# purpose: test is comparing result of exactly the same queries , one with alias the other without.
@ -938,6 +958,7 @@ def test_alias():
@attr('s3select')
@pytest.mark.s3select
def test_alias_cyclic_refernce():
number_of_rows = 10000
@ -956,6 +977,7 @@ def test_alias_cyclic_refernce():
assert int(find_res) >= 0
@attr('s3select')
@pytest.mark.s3select
def test_datetime():
# purpose of test is to validate date-time functionality is correct,
@ -987,6 +1009,7 @@ def test_datetime():
s3select_assert_result( res_s3select_date_time_to_timestamp, res_s3select_substring)
@attr('s3select')
@pytest.mark.s3select
def test_true_false_datetime():
# purpose of test is to validate date-time functionality is correct,
@ -1021,6 +1044,7 @@ def test_true_false_datetime():
s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)
@attr('s3select')
@pytest.mark.s3select
def test_csv_parser():
# purpuse: test default csv values(, \n " \ ), return value may contain meta-char
@ -1061,6 +1085,7 @@ def test_csv_parser():
s3select_assert_result( res_s3select_alias, 'null')
@attr('s3select')
@pytest.mark.s3select
def test_csv_definition():
number_of_rows = 10000
@ -1091,6 +1116,7 @@ def test_csv_definition():
@attr('s3select')
@pytest.mark.s3select
def test_schema_definition():
number_of_rows = 10000
@ -1126,6 +1152,7 @@ def test_schema_definition():
assert ((res_multiple_defintion.find("multiple definition of column {c4} as schema-column and alias")) >= 0)
@attr('s3select')
@pytest.mark.s3select
def test_when_then_else_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1155,6 +1182,7 @@ def test_when_then_else_expressions():
s3select_assert_result( str(count3) , res2)
@attr('s3select')
@pytest.mark.s3select
def test_coalesce_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1177,6 +1205,7 @@ def test_coalesce_expressions():
@attr('s3select')
@pytest.mark.s3select
def test_cast_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1198,6 +1227,7 @@ def test_cast_expressions():
s3select_assert_result( res_s3select, res)
@attr('s3select')
@pytest.mark.s3select
def test_version():
return
@ -1216,6 +1246,7 @@ def test_version():
s3select_assert_result( res_version, "41.a," )
@attr('s3select')
@pytest.mark.s3select
def test_trim_expressions():
csv_obj = create_random_csv_object_trim(10000,10)
@ -1255,6 +1286,7 @@ def test_trim_expressions():
s3select_assert_result( res_s3select_trim, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_truefalse_trim_expressions():
csv_obj = create_random_csv_object_trim(10000,10)
@ -1294,6 +1326,7 @@ def test_truefalse_trim_expressions():
s3select_assert_result( res_s3select_trim, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_escape_expressions():
csv_obj = create_random_csv_object_escape(10000,10)
@ -1315,6 +1348,7 @@ def test_escape_expressions():
s3select_assert_result( res_s3select_escape, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_case_value_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1330,6 +1364,7 @@ def test_case_value_expressions():
s3select_assert_result( res_s3select_case, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_bool_cast_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1345,6 +1380,7 @@ def test_bool_cast_expressions():
s3select_assert_result( res_s3select_cast, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_progress_expressions():
csv_obj = create_random_csv_object(1000000,10)
@ -1372,6 +1408,7 @@ def test_progress_expressions():
s3select_assert_result({}, res_s3select_response[total_response-1])
@attr('s3select')
@pytest.mark.s3select
def test_output_serial_expressions():
return # TODO fix test

View file

@ -5,6 +5,7 @@ from botocore.exceptions import ParamValidationError
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
import pytest
import isodate
import email.utils
import datetime
@ -156,7 +157,9 @@ def get_s3_resource_using_iam_creds():
@attr(operation='check')
@attr(assertion='s3 ops only accessible by temporary credentials')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_get_session_token():
iam_client=get_iam_client()
sts_client=get_sts_client()
@ -190,7 +193,9 @@ def test_get_session_token():
@attr(operation='check')
@attr(assertion='s3 ops denied by permanent credentials')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_get_session_token_permanent_creds_denied():
s3bucket_error=None
iam_client=get_iam_client()
@ -227,7 +232,9 @@ def test_get_session_token_permanent_creds_denied():
@attr(operation='check')
@attr(assertion='role policy allows all s3 ops')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_allow():
iam_client=get_iam_client()
sts_client=get_sts_client()
@ -264,7 +271,9 @@ def test_assume_role_allow():
@attr(operation='check')
@attr(assertion='role policy denies all s3 ops')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_deny():
s3bucket_error=None
iam_client=get_iam_client()
@ -303,7 +312,9 @@ def test_assume_role_deny():
@attr(operation='check')
@attr(assertion='creds expire so all s3 ops fails')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_creds_expiry():
iam_client=get_iam_client()
sts_client=get_sts_client()
@ -342,7 +353,9 @@ def test_assume_role_creds_expiry():
@attr(operation='check')
@attr(assertion='HEAD fails with 403 when role policy denies s3:ListBucket')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_deny_head_nonexistent():
# create a bucket with the normal s3 client
bucket_name = get_new_bucket_name()
@ -384,7 +397,9 @@ def test_assume_role_deny_head_nonexistent():
@attr(operation='check')
@attr(assertion='HEAD fails with 404 when role policy allows s3:ListBucket')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_allow_head_nonexistent():
# create a bucket with the normal s3 client
bucket_name = get_new_bucket_name()
@ -427,8 +442,11 @@ def test_assume_role_allow_head_nonexistent():
@attr(operation='check')
@attr(assertion='assuming role through web token')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity():
check_webidentity()
iam_client=get_iam_client()
@ -481,6 +499,7 @@ def test_assume_role_with_web_identity():
@attr(operation='check')
@attr(assertion='assume_role_with_web_token creds expire')
@attr('webidentity_test')
@pytest.mark.webidentity_test
def test_assume_role_with_web_identity_invalid_webtoken():
resp_error=None
iam_client=get_iam_client()
@ -531,8 +550,11 @@ def test_assume_role_with_web_identity_invalid_webtoken():
@attr(operation='check')
@attr(assertion='checking session policy working for two different buckets')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_on_different_buckets():
check_webidentity()
iam_client=get_iam_client()
@ -604,8 +626,11 @@ def test_session_policy_check_on_different_buckets():
@attr(operation='check')
@attr(assertion='checking session policy working for same bucket')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_on_same_bucket():
check_webidentity()
iam_client=get_iam_client()
@ -665,8 +690,11 @@ def test_session_policy_check_on_same_bucket():
@attr(operation='check')
@attr(assertion='checking put_obj op denial')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_put_obj_denial():
check_webidentity()
iam_client=get_iam_client()
@ -731,8 +759,11 @@ def test_session_policy_check_put_obj_denial():
@attr(operation='check')
@attr(assertion='checking put_obj working by swapping policies')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_swapping_role_policy_and_session_policy():
check_webidentity()
iam_client=get_iam_client()
@ -792,8 +823,11 @@ def test_swapping_role_policy_and_session_policy():
@attr(operation='check')
@attr(assertion='checking put_obj working by setting different permissions to role and session policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_different_op_permissions():
check_webidentity()
iam_client=get_iam_client()
@ -858,8 +892,11 @@ def test_session_policy_check_different_op_permissions():
@attr(operation='check')
@attr(assertion='checking op behaviour with deny effect')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_with_deny_effect():
check_webidentity()
iam_client=get_iam_client()
@ -923,8 +960,11 @@ def test_session_policy_check_with_deny_effect():
@attr(operation='check')
@attr(assertion='checking put_obj working with deny and allow on same op')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_with_deny_on_same_op():
check_webidentity()
iam_client=get_iam_client()
@ -988,8 +1028,11 @@ def test_session_policy_check_with_deny_on_same_op():
@attr(operation='check')
@attr(assertion='checking op when bucket policy has role arn')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_role_arn():
check_webidentity()
iam_client=get_iam_client()
@ -1068,8 +1111,11 @@ def test_session_policy_bucket_policy_role_arn():
@attr(operation='check')
@attr(assertion='checking op when bucket policy has session arn')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_session_arn():
check_webidentity()
iam_client=get_iam_client()
@ -1146,8 +1192,11 @@ def test_session_policy_bucket_policy_session_arn():
@attr(operation='check')
@attr(assertion='checking copy object op with role, session and bucket policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_copy_object():
check_webidentity()
iam_client=get_iam_client()
@ -1231,8 +1280,11 @@ def test_session_policy_copy_object():
@attr(operation='check')
@attr(assertion='checking op is denied when no role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_no_bucket_role_policy():
check_webidentity()
iam_client=get_iam_client()
@ -1287,8 +1339,11 @@ def test_session_policy_no_bucket_role_policy():
@attr(operation='check')
@attr(assertion='checking op is denied when resource policy denies')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_deny():
check_webidentity()
iam_client=get_iam_client()
@ -1365,8 +1420,11 @@ def test_session_policy_bucket_policy_deny():
@attr(operation='check')
@attr(assertion='assuming role using web token using sub in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_sub():
check_webidentity()
iam_client=get_iam_client()
@ -1418,8 +1476,11 @@ def test_assume_role_with_web_identity_with_sub():
@attr(operation='check')
@attr(assertion='assuming role using web token using azp in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_azp():
check_webidentity()
iam_client=get_iam_client()
@ -1471,9 +1532,13 @@ def test_assume_role_with_web_identity_with_azp():
@attr(operation='check')
@attr(assertion='assuming role using web token using aws:RequestTag in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_request_tag_trust_policy_test')
@pytest.mark.token_request_tag_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_request_tag():
check_webidentity()
iam_client=get_iam_client()
@ -1524,9 +1589,13 @@ def test_assume_role_with_web_identity_with_request_tag():
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_principal_tag():
check_webidentity()
iam_client=get_iam_client()
@ -1577,9 +1646,13 @@ def test_assume_role_with_web_identity_with_principal_tag():
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_for_all_values():
check_webidentity()
iam_client=get_iam_client()
@ -1630,9 +1703,13 @@ def test_assume_role_with_web_identity_for_all_values():
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_for_all_values_deny():
check_webidentity()
iam_client=get_iam_client()
@ -1685,9 +1762,13 @@ def test_assume_role_with_web_identity_for_all_values_deny():
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_tag_keys_test')
@pytest.mark.token_tag_keys_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_tag_keys_trust_policy():
check_webidentity()
iam_client=get_iam_client()
@ -1738,9 +1819,13 @@ def test_assume_role_with_web_identity_tag_keys_trust_policy():
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_tag_keys_test')
@pytest.mark.token_tag_keys_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_tag_keys_role_policy():
check_webidentity()
iam_client=get_iam_client()
@ -1791,9 +1876,13 @@ def test_assume_role_with_web_identity_tag_keys_role_policy():
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag():
check_webidentity()
iam_client=get_iam_client()
@ -1854,9 +1943,13 @@ def test_assume_role_with_web_identity_resource_tag():
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag with missing tags on bucket')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_deny():
check_webidentity()
iam_client=get_iam_client()
@ -1917,9 +2010,13 @@ def test_assume_role_with_web_identity_resource_tag_deny():
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag with wrong resource tag in policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_wrong_resource_tag_deny():
check_webidentity()
iam_client=get_iam_client()
@ -1983,9 +2080,13 @@ def test_assume_role_with_web_identity_wrong_resource_tag_deny():
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag matching aws:PrincipalTag in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_princ_tag():
check_webidentity()
iam_client=get_iam_client()
@ -2051,9 +2152,13 @@ def test_assume_role_with_web_identity_resource_tag_princ_tag():
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag used to test copy object')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_copy_obj():
check_webidentity()
iam_client=get_iam_client()
@ -2146,9 +2251,13 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj():
@attr(operation='check')
@attr(assertion='assuming role using web token with iam:ResourceTag in role trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_role_tags_test')
@pytest.mark.token_role_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_role_resource_tag():
check_webidentity()
iam_client=get_iam_client()