nose: remove nose attrs and imports

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 4c75fba0de)
This commit is contained in:
Casey Bodley 2023-01-21 14:24:31 -05:00 committed by Ali Maredia
parent 8186dd7561
commit b4bb9afb52
8 changed files with 1 additions and 3836 deletions

View file

@ -4,7 +4,6 @@ import boto.exception
import boto.s3.connection
import boto.s3.acl
import boto.utils
import nose
import pytest
import operator
import random
@ -19,8 +18,6 @@ from urllib.parse import urlparse
from boto.s3.connection import S3Connection
from nose.plugins.attrib import attr
from .utils import assert_raises
from email.header import decode_header
@ -183,11 +180,6 @@ def tag(*tags):
#
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='fails 411')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_contentlength_none():
key = _setup_bad_object(remove=('Content-Length',))
@ -199,11 +191,6 @@ def test_object_create_bad_contentlength_none():
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content length too long')
@attr(assertion='fails 400')
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_mismatch_above():
content = 'bar'
@ -223,11 +210,6 @@ def test_object_create_bad_contentlength_mismatch_above():
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_authorization_empty():
key = _setup_bad_object({'Authorization': ''})
@ -238,11 +220,6 @@ def test_object_create_bad_authorization_empty():
assert e.error_code == 'AccessDenied'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date and x-amz-date')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_date_and_amz_date():
date = formatdate(usegmt=True)
@ -250,11 +227,6 @@ def test_object_create_date_and_amz_date():
key.set_contents_from_string('bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date and no date')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_amz_date_and_no_date():
date = formatdate(usegmt=True)
@ -264,11 +236,6 @@ def test_object_create_amz_date_and_no_date():
# the teardown is really messed up here. check it out
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_authorization_none():
key = _setup_bad_object(remove=('Authorization',))
@ -280,11 +247,6 @@ def test_object_create_bad_authorization_none():
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_create_contentlength_none():
_add_custom_headers(remove=('Content-Length',))
@ -292,11 +254,6 @@ def test_bucket_create_contentlength_none():
@tag('auth_common')
@attr(resource='bucket')
@attr(method='acls')
@attr(operation='set w/no content length')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_acl_create_contentlength_none():
bucket = get_new_bucket()
@ -321,11 +278,6 @@ def _create_new_connection():
return TargetConnection(targets.main.default.conf, conn)
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty content length')
@attr(assertion='fails 400')
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_empty():
conn = _create_new_connection()
@ -336,11 +288,6 @@ def test_bucket_create_bad_contentlength_empty():
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_create_bad_contentlength_none():
_add_custom_headers(remove=('Content-Length',))
@ -348,11 +295,6 @@ def test_bucket_create_bad_contentlength_none():
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_create_bad_authorization_empty():
_add_custom_headers({'Authorization': ''})
@ -364,11 +306,6 @@ def test_bucket_create_bad_authorization_empty():
# the teardown is really messed up here. check it out
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_create_bad_authorization_none():
_add_custom_headers(remove=('Authorization',))
@ -382,11 +319,6 @@ def test_bucket_create_bad_authorization_none():
#
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content length too short')
@attr(assertion='fails 400')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_contentlength_mismatch_below_aws2():
check_aws2_support()
@ -400,11 +332,6 @@ def test_object_create_bad_contentlength_mismatch_below_aws2():
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/incorrect authorization')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_authorization_incorrect_aws2():
check_aws2_support()
@ -416,11 +343,6 @@ def test_object_create_bad_authorization_incorrect_aws2():
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_authorization_invalid_aws2():
check_aws2_support()
@ -431,11 +353,6 @@ def test_object_create_bad_authorization_invalid_aws2():
assert e.error_code == 'InvalidArgument'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_create_bad_date_none_aws2():
check_aws2_support()
@ -447,10 +364,6 @@ def test_object_create_bad_date_none_aws2():
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
def test_bucket_create_bad_authorization_invalid_aws2():
check_aws2_support()
_add_custom_headers({'Authorization': 'AWS HAHAHA'})
@ -460,11 +373,6 @@ def test_bucket_create_bad_authorization_invalid_aws2():
assert e.error_code == 'InvalidArgument'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='fails 403')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_create_bad_date_none_aws2():
check_aws2_support()
@ -488,10 +396,6 @@ def check_aws2_support():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_invalid_garbage_aws4():
check_aws4_support()
key = _setup_bad_object({'Content-MD5':'AWS4 HAHAHA'})
@ -503,10 +407,6 @@ def test_object_create_bad_md5_invalid_garbage_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content length too short')
@attr(assertion='fails 400')
def test_object_create_bad_contentlength_mismatch_below_aws4():
check_aws4_support()
content = 'bar'
@ -520,10 +420,6 @@ def test_object_create_bad_contentlength_mismatch_below_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/incorrect authorization')
@attr(assertion='fails 403')
def test_object_create_bad_authorization_incorrect_aws4():
check_aws4_support()
key = _setup_bad_object({'Authorization': 'AWS4-HMAC-SHA256 Credential=AKIAIGR7ZNNBHC5BKSUB/20150930/us-east-1/s3/aws4_request,SignedHeaders=host;user-agent,Signature=FWeDfwojDSdS2Ztmpfeubhd9isU='})
@ -535,10 +431,6 @@ def test_object_create_bad_authorization_incorrect_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
def test_object_create_bad_authorization_invalid_aws4():
check_aws4_support()
key = _setup_bad_object({'Authorization': 'AWS4-HMAC-SHA256 Credential=HAHAHA'})
@ -550,10 +442,6 @@ def test_object_create_bad_authorization_invalid_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty user agent')
@attr(assertion='fails 403')
def test_object_create_bad_ua_empty_aws4():
check_aws4_support()
key = _setup_bad_object({'User-Agent': ''})
@ -565,10 +453,6 @@ def test_object_create_bad_ua_empty_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no user agent')
@attr(assertion='fails 403')
def test_object_create_bad_ua_none_aws4():
check_aws4_support()
key = _setup_bad_object(remove=('User-Agent',))
@ -580,10 +464,6 @@ def test_object_create_bad_ua_none_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid date')
@attr(assertion='succeeds')
def test_object_create_bad_date_invalid_aws4():
check_aws4_support()
key = _setup_bad_object({'Date': 'Bad Date'})
@ -591,10 +471,6 @@ def test_object_create_bad_date_invalid_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid x-amz-date')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_invalid_aws4():
check_aws4_support()
key = _setup_bad_object({'X-Amz-Date': 'Bad Date'})
@ -606,10 +482,6 @@ def test_object_create_bad_amz_date_invalid_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty date')
@attr(assertion='succeeds')
def test_object_create_bad_date_empty_aws4():
check_aws4_support()
key = _setup_bad_object({'Date': ''})
@ -617,10 +489,6 @@ def test_object_create_bad_date_empty_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty x-amz-date')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_empty_aws4():
check_aws4_support()
key = _setup_bad_object({'X-Amz-Date': ''})
@ -632,10 +500,6 @@ def test_object_create_bad_amz_date_empty_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='succeeds')
def test_object_create_bad_date_none_aws4():
check_aws4_support()
key = _setup_bad_object(remove=('Date',))
@ -643,10 +507,6 @@ def test_object_create_bad_date_none_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no x-amz-date')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_none_aws4():
check_aws4_support()
key = _setup_bad_object(remove=('X-Amz-Date',))
@ -658,10 +518,6 @@ def test_object_create_bad_amz_date_none_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date in past')
@attr(assertion='succeeds')
def test_object_create_bad_date_before_today_aws4():
check_aws4_support()
key = _setup_bad_object({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
@ -669,10 +525,6 @@ def test_object_create_bad_date_before_today_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date in past')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_before_today_aws4():
check_aws4_support()
key = _setup_bad_object({'X-Amz-Date': '20100707T215304Z'})
@ -684,10 +536,6 @@ def test_object_create_bad_amz_date_before_today_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date in future')
@attr(assertion='succeeds')
def test_object_create_bad_date_after_today_aws4():
check_aws4_support()
key = _setup_bad_object({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
@ -695,10 +543,6 @@ def test_object_create_bad_date_after_today_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date in future')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_after_today_aws4():
check_aws4_support()
key = _setup_bad_object({'X-Amz-Date': '20300707T215304Z'})
@ -710,10 +554,6 @@ def test_object_create_bad_amz_date_after_today_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date before epoch')
@attr(assertion='succeeds')
def test_object_create_bad_date_before_epoch_aws4():
check_aws4_support()
key = _setup_bad_object({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
@ -721,10 +561,6 @@ def test_object_create_bad_date_before_epoch_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date before epoch')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_before_epoch_aws4():
check_aws4_support()
key = _setup_bad_object({'X-Amz-Date': '19500707T215304Z'})
@ -736,10 +572,6 @@ def test_object_create_bad_amz_date_before_epoch_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date after 9999')
@attr(assertion='fails 403')
def test_object_create_bad_date_after_end_aws4():
check_aws4_support()
key = _setup_bad_object({'Date': 'Tue, 07 Jul 9999 21:53:04 GMT'})
@ -747,10 +579,6 @@ def test_object_create_bad_date_after_end_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date after 9999')
@attr(assertion='fails 403')
def test_object_create_bad_amz_date_after_end_aws4():
check_aws4_support()
key = _setup_bad_object({'X-Amz-Date': '99990707T215304Z'})
@ -762,10 +590,6 @@ def test_object_create_bad_amz_date_after_end_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(operation='create with missing signed custom header')
@attr(assertion='fails 403')
def test_object_create_missing_signed_custom_header_aws4():
check_aws4_support()
method='PUT'
@ -792,10 +616,6 @@ def test_object_create_missing_signed_custom_header_aws4():
@tag('auth_aws4')
@attr(resource='object')
@attr(method='put')
@attr(opearation='create with missing signed header')
@attr(assertion='fails 403')
def test_object_create_missing_signed_header_aws4():
check_aws4_support()
method='PUT'
@ -823,10 +643,6 @@ def test_object_create_missing_signed_header_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
def test_bucket_create_bad_authorization_invalid_aws4():
check_aws4_support()
_add_custom_headers({'Authorization': 'AWS4 HAHAHA'})
@ -838,10 +654,6 @@ def test_bucket_create_bad_authorization_invalid_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty user agent')
@attr(assertion='fails 403')
def test_bucket_create_bad_ua_empty_aws4():
check_aws4_support()
_add_custom_headers({'User-Agent': ''})
@ -852,10 +664,6 @@ def test_bucket_create_bad_ua_empty_aws4():
assert e.error_code == 'SignatureDoesNotMatch'
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no user agent')
@attr(assertion='fails 403')
def test_bucket_create_bad_ua_none_aws4():
check_aws4_support()
_add_custom_headers(remove=('User-Agent',))
@ -867,10 +675,6 @@ def test_bucket_create_bad_ua_none_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid date')
@attr(assertion='succeeds')
def test_bucket_create_bad_date_invalid_aws4():
check_aws4_support()
_add_custom_headers({'Date': 'Bad Date'})
@ -878,10 +682,6 @@ def test_bucket_create_bad_date_invalid_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid x-amz-date')
@attr(assertion='fails 403')
def test_bucket_create_bad_amz_date_invalid_aws4():
check_aws4_support()
_add_custom_headers({'X-Amz-Date': 'Bad Date'})
@ -893,10 +693,6 @@ def test_bucket_create_bad_amz_date_invalid_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty date')
@attr(assertion='succeeds')
def test_bucket_create_bad_date_empty_aws4():
check_aws4_support()
_add_custom_headers({'Date': ''})
@ -904,10 +700,6 @@ def test_bucket_create_bad_date_empty_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty x-amz-date')
@attr(assertion='fails 403')
def test_bucket_create_bad_amz_date_empty_aws4():
check_aws4_support()
_add_custom_headers({'X-Amz-Date': ''})
@ -918,10 +710,6 @@ def test_bucket_create_bad_amz_date_empty_aws4():
assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='succeeds')
def test_bucket_create_bad_date_none_aws4():
check_aws4_support()
_add_custom_headers(remove=('Date',))
@ -929,10 +717,6 @@ def test_bucket_create_bad_date_none_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no x-amz-date')
@attr(assertion='fails 403')
def test_bucket_create_bad_amz_date_none_aws4():
check_aws4_support()
_add_custom_headers(remove=('X-Amz-Date',))
@ -944,10 +728,6 @@ def test_bucket_create_bad_amz_date_none_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date in past')
@attr(assertion='succeeds')
def test_bucket_create_bad_date_before_today_aws4():
check_aws4_support()
_add_custom_headers({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
@ -955,10 +735,6 @@ def test_bucket_create_bad_date_before_today_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/x-amz-date in past')
@attr(assertion='fails 403')
def test_bucket_create_bad_amz_date_before_today_aws4():
check_aws4_support()
_add_custom_headers({'X-Amz-Date': '20100707T215304Z'})
@ -970,10 +746,6 @@ def test_bucket_create_bad_amz_date_before_today_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date in future')
@attr(assertion='succeeds')
def test_bucket_create_bad_date_after_today_aws4():
check_aws4_support()
_add_custom_headers({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
@ -981,10 +753,6 @@ def test_bucket_create_bad_date_after_today_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/x-amz-date in future')
@attr(assertion='fails 403')
def test_bucket_create_bad_amz_date_after_today_aws4():
check_aws4_support()
_add_custom_headers({'X-Amz-Date': '20300707T215304Z'})
@ -996,10 +764,6 @@ def test_bucket_create_bad_amz_date_after_today_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date before epoch')
@attr(assertion='succeeds')
def test_bucket_create_bad_date_before_epoch_aws4():
check_aws4_support()
_add_custom_headers({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
@ -1007,10 +771,6 @@ def test_bucket_create_bad_date_before_epoch_aws4():
@tag('auth_aws4')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/x-amz-date before epoch')
@attr(assertion='fails 403')
def test_bucket_create_bad_amz_date_before_epoch_aws4():
check_aws4_support()
_add_custom_headers({'X-Amz-Date': '19500707T215304Z'})

View file

@ -7,7 +7,6 @@ import datetime
import time
import email.utils
import isodate
import nose
import pytest
import operator
import socket
@ -28,8 +27,6 @@ import re
from collections import defaultdict
from urllib.parse import urlparse
from nose.plugins.attrib import attr
from . import utils
from .utils import assert_raises
@ -95,12 +92,7 @@ def _get_alt_connection():
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/! in name')
@attr(assertion='fails with subdomain')
def test_bucket_create_naming_bad_punctuation():
# characters other than [a-zA-Z0-9._-]
check_bad_bucket_name('alpha!soup')
@ -130,13 +122,7 @@ def check_configure_versioning_retry(bucket, status, expected_string):
assert expected_string == read_status
@attr(resource='object')
@attr(method='create')
@attr(operation='create versioned object, read not exist null version')
@attr(assertion='read null version behaves correctly')
@attr('versioning')
@pytest.mark.versioning
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_versioning_obj_read_not_exist_null():
bucket = get_new_bucket()
@ -153,17 +139,9 @@ def test_versioning_obj_read_not_exist_null():
key = bucket.get_key(objname, version_id='null')
assert key == None
@attr(resource='object')
@attr(method='put')
@attr(operation='append object')
@attr(assertion='success')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('appendobject')
@pytest.mark.appendobject
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_append_object():
bucket = get_new_bucket()
@ -182,17 +160,9 @@ def test_append_object():
key = bucket.get_key('foo')
assert key.size == 6
@attr(resource='object')
@attr(method='put')
@attr(operation='append to normal object')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('appendobject')
@pytest.mark.appendobject
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_append_normal_object():
bucket = get_new_bucket()
@ -207,17 +177,9 @@ def test_append_normal_object():
assert res.status == 409
@attr(resource='object')
@attr(method='put')
@attr(operation='append position not right')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('appendobject')
@pytest.mark.appendobject
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_append_object_position_wrong():
bucket = get_new_bucket()
@ -236,11 +198,6 @@ def test_append_object_position_wrong():
# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
# http://tracker.newdream.net/issues/984
@attr(resource='bucket.log')
@attr(method='put')
@attr(operation='set/enable/disable logging target')
@attr(assertion='operations succeed')
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_logging_toggle():
bucket = get_new_bucket()
@ -418,14 +375,8 @@ def lc_transitions(transitions=None):
return result
@attr(resource='object')
@attr(method='put')
@attr(operation='test create object with storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class():
sc = configured_storage_classes()
@ -440,14 +391,8 @@ def test_object_storage_class():
verify_object(bucket, k, data, storage_class)
@attr(resource='object')
@attr(method='put')
@attr(operation='test create multipart object with storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class_multipart():
sc = configured_storage_classes()
@ -485,27 +430,15 @@ def _do_test_object_modify_storage_class(obj_write_func, size):
copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
verify_object(bucket, k, data, storage_class)
@attr(resource='object')
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_modify_storage_class():
_do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_modify_storage_class_multipart():
_do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
@ -531,26 +464,14 @@ def _do_test_object_storage_class_copy(obj_write_func, size):
copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, new_storage_class)
verify_object(dest_bucket, dest_key, data, new_storage_class)
@attr(resource='object')
@attr(method='copy')
@attr(operation='test copy object to object with different storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class_copy():
_do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
@attr(resource='object')
@attr(method='copy')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@pytest.mark.storage_class
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_object_storage_class_copy_multipart():
_do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)
@ -653,24 +574,12 @@ def _test_atomic_dual_conditional_write(file_size):
# verify the file
_verify_atomic_key_data(key, file_size, 'B')
@attr(resource='object')
@attr(method='put')
@attr(operation='write one or the other')
@attr(assertion='1MB successful')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_atomic_dual_conditional_write_1mb():
_test_atomic_dual_conditional_write(1024*1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='write file in deleted bucket')
@attr(assertion='fail 404')
@attr('fails_on_aws')
@pytest.mark.fails_on_aws
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_atomic_write_bucket_gone():
bucket = get_new_bucket()
@ -711,13 +620,7 @@ def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad key for uploading chunks')
@attr(assertion='successful')
@attr('encryption')
@pytest.mark.encryption
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_invalid_chunks_1():
bucket = get_new_bucket()
@ -741,13 +644,7 @@ def test_encryption_sse_c_multipart_invalid_chunks_1():
metadata={'foo': 'bar'})
assert e.status == 400
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad md5 for chunks')
@attr(assertion='successful')
@attr('encryption')
@pytest.mark.encryption
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_invalid_chunks_2():
bucket = get_new_bucket()
@ -771,15 +668,8 @@ def test_encryption_sse_c_multipart_invalid_chunks_2():
metadata={'foo': 'bar'})
assert e.status == 400
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test Bucket Policy for a user belonging to a different tenant')
@attr(assertion='succeeds')
@attr('fails_with_subdomain')
@pytest.mark.fails_with_subdomain
@attr('bucket-policy')
@pytest.mark.bucket_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_policy_different_tenant():
bucket = get_new_bucket()
@ -815,12 +705,7 @@ def test_bucket_policy_different_tenant():
b = new_conn.get_bucket(bucket_name)
b.get_all_keys()
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put condition operator end with ifExists')
@attr('bucket-policy')
@pytest.mark.bucket_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_policy_set_condition_operator_end_with_IfExists():
bucket = _create_keys(keys=['foo'])
@ -856,15 +741,8 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
def _make_arn_resource(path="*"):
return "arn:aws:s3:::{}".format(path)
@attr(resource='object')
@attr(method='put')
@attr(operation='put obj with RequestObjectTag')
@attr(assertion='success')
@attr('tagging')
@pytest.mark.tagging
@attr('bucket-policy')
@pytest.mark.bucket_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_bucket_policy_put_obj_request_obj_tag():

View file

@ -1,7 +1,5 @@
import sys
import collections
import nose
import pytest
import string
import random
@ -12,9 +10,6 @@ import socket
from urllib.parse import urlparse
from nose.plugins.attrib import attr
from nose.tools import timed
from .. import common
from . import (
@ -236,29 +231,15 @@ def _website_request(bucket_name, path, connect_hostname=None, method='GET', tim
return res
# ---------- Non-existant buckets via the website endpoint
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_website_nonexistant_bucket_s3():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
#@attr(assertion='non-existant bucket via website endpoint should give Forbidden, keeping bucket identity secure')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_s3')
@pytest.mark.fails_on_s3
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_nonexistant_bucket_rgw():
bucket_name = get_new_bucket_name()
@ -267,15 +248,9 @@ def test_website_nonexistant_bucket_rgw():
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
#------------- IndexDocument only, successes
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@timed(10)
@pytest.mark.timeout(10)
def test_website_public_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -297,13 +272,7 @@ def test_website_public_bucket_list_public_index():
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_public_index():
bucket = get_new_bucket()
@ -329,13 +298,7 @@ def test_website_private_bucket_list_public_index():
# ---------- IndexDocument only, failures
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty():
bucket = get_new_bucket()
@ -347,13 +310,7 @@ def test_website_private_bucket_list_empty():
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty():
bucket = get_new_bucket()
@ -364,13 +321,7 @@ def test_website_public_bucket_list_empty():
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index():
bucket = get_new_bucket()
@ -391,13 +342,7 @@ def test_website_public_bucket_list_private_index():
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index():
bucket = get_new_bucket()
@ -419,13 +364,7 @@ def test_website_private_bucket_list_private_index():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
@ -437,13 +376,7 @@ def test_website_private_bucket_list_empty_missingerrordoc():
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
@ -454,13 +387,7 @@ def test_website_public_bucket_list_empty_missingerrordoc():
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
@ -480,13 +407,7 @@ def test_website_public_bucket_list_private_index_missingerrordoc():
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
@ -507,13 +428,7 @@ def test_website_private_bucket_list_private_index_missingerrordoc():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
@ -537,13 +452,7 @@ def test_website_private_bucket_list_empty_blockederrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='check if there is an invalid payload after serving error doc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_pubilc_errordoc():
bucket = get_new_bucket()
@ -590,13 +499,7 @@ def test_website_public_bucket_list_pubilc_errordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
@ -619,13 +522,7 @@ def test_website_public_bucket_list_empty_blockederrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
@ -654,13 +551,7 @@ def test_website_public_bucket_list_private_index_blockederrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
@ -690,13 +581,7 @@ def test_website_private_bucket_list_private_index_blockederrordoc():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures with errordoc available
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
@ -715,13 +600,7 @@ def test_website_private_bucket_list_empty_gooderrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
@ -741,13 +620,7 @@ def test_website_public_bucket_list_empty_gooderrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
@ -772,13 +645,7 @@ def test_website_public_bucket_list_private_index_gooderrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
@ -804,13 +671,7 @@ def test_website_private_bucket_list_private_index_gooderrordoc():
bucket.delete()
# ------ RedirectAll tests
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_base():
bucket = get_new_bucket()
@ -823,13 +684,7 @@ def test_website_bucket_private_redirectall_base():
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_path():
bucket = get_new_bucket()
@ -844,13 +699,7 @@ def test_website_bucket_private_redirectall_path():
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_path_upgrade():
bucket = get_new_bucket()
@ -867,15 +716,8 @@ def test_website_bucket_private_redirectall_path_upgrade():
bucket.delete()
# ------ x-amz redirect tests
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_nonwebsite():
bucket = get_new_bucket()
@ -902,15 +744,8 @@ def test_website_xredirect_nonwebsite():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_public_relative():
bucket = get_new_bucket()
@ -932,15 +767,8 @@ def test_website_xredirect_public_relative():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_public_abs():
bucket = get_new_bucket()
@ -962,15 +790,8 @@ def test_website_xredirect_public_abs():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_private_relative():
bucket = get_new_bucket()
@ -992,15 +813,8 @@ def test_website_xredirect_private_relative():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_private_abs():
bucket = get_new_bucket()
@ -1226,7 +1040,6 @@ def routing_teardown(**kwargs):
print('Deleting', str(o))
o.delete()
#@timed(10)
def routing_check(*args, **kwargs):
bucket = kwargs['bucket']
args=args[0]
@ -1261,11 +1074,8 @@ def routing_check(*args, **kwargs):
else:
assert(False)
@attr('s3website_RoutingRules')
@pytest.mark.s3website_routing_rules
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_routing_generator():
for t in ROUTING_RULES_TESTS:

View file

@ -1,6 +1,4 @@
import boto3
from nose.plugins.attrib import attr
import nose
import pytest
from botocore.exceptions import ClientError
from email.utils import formatdate
@ -163,10 +161,6 @@ def tag(*tags):
#
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_invalid_short():
e = _add_header_create_bad_object({'Content-MD5':'YWJyYWNhZGFicmE='})
status, error_code = _get_status_and_error_code(e.response)
@ -174,10 +168,6 @@ def test_object_create_bad_md5_invalid_short():
assert error_code == 'InvalidDigest'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/mismatched MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_bad():
e = _add_header_create_bad_object({'Content-MD5':'rL0Y20xC+Fzt72VPzMSk2A=='})
status, error_code = _get_status_and_error_code(e.response)
@ -185,10 +175,6 @@ def test_object_create_bad_md5_bad():
assert error_code == 'BadDigest'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_empty():
e = _add_header_create_bad_object({'Content-MD5':''})
status, error_code = _get_status_and_error_code(e.response)
@ -196,52 +182,31 @@ def test_object_create_bad_md5_empty():
assert error_code == 'InvalidDigest'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no MD5 header')
@attr(assertion='succeeds')
def test_object_create_bad_md5_none():
bucket_name, key_name = _remove_header_create_object('Content-MD5')
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/Expect 200')
@attr(assertion='garbage, but S3 succeeds!')
def test_object_create_bad_expect_mismatch():
bucket_name, key_name = _add_header_create_object({'Expect': 200})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty expect')
@attr(assertion='succeeds ... should it?')
def test_object_create_bad_expect_empty():
bucket_name, key_name = _add_header_create_object({'Expect': ''})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no expect')
@attr(assertion='succeeds')
def test_object_create_bad_expect_none():
bucket_name, key_name = _remove_header_create_object('Expect')
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty content length')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_empty():
e = _add_header_create_bad_object({'Content-Length':''})
@ -249,11 +214,6 @@ def test_object_create_bad_contentlength_empty():
assert status == 400
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
@pytest.mark.fails_on_mod_proxy_fcgi
def test_object_create_bad_contentlength_negative():
client = get_client()
@ -264,12 +224,7 @@ def test_object_create_bad_contentlength_negative():
assert status == 400
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='fails 411')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_none():
remove = 'Content-Length'
@ -279,20 +234,12 @@ def test_object_create_bad_contentlength_none():
assert error_code == 'MissingContentLength'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content type text/plain')
@attr(assertion='succeeds')
def test_object_create_bad_contenttype_invalid():
bucket_name, key_name = _add_header_create_object({'Content-Type': 'text/plain'})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty content type')
@attr(assertion='succeeds')
def test_object_create_bad_contenttype_empty():
client = get_client()
key_name = 'foo'
@ -300,10 +247,6 @@ def test_object_create_bad_contenttype_empty():
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar', ContentType='')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content type')
@attr(assertion='succeeds')
def test_object_create_bad_contenttype_none():
bucket_name = get_new_bucket()
key_name = 'foo'
@ -313,12 +256,7 @@ def test_object_create_bad_contenttype_none():
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_empty():
e = _add_header_create_bad_object({'Authorization': ''})
@ -326,12 +264,7 @@ def test_object_create_bad_authorization_empty():
assert status == 403
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date and x-amz-date')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to pass both the 'Date' and 'X-Amz-Date' header during signing and not 'X-Amz-Date' before
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_date_and_amz_date():
date = formatdate(usegmt=True)
@ -340,12 +273,7 @@ def test_object_create_date_and_amz_date():
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date and no date')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to pass both the 'Date' and 'X-Amz-Date' header during signing and not 'X-Amz-Date' before
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_amz_date_and_no_date():
date = formatdate(usegmt=True)
@ -355,12 +283,7 @@ def test_object_create_amz_date_and_no_date():
# the teardown is really messed up here. check it out
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_none():
e = _remove_header_create_bad_object('Authorization')
@ -368,24 +291,14 @@ def test_object_create_bad_authorization_none():
assert status == 403
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@tag('auth_common')
@attr(resource='bucket')
@attr(method='acls')
@attr(operation='set w/no content length')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_acl_create_contentlength_none():
bucket_name = get_new_bucket()
@ -401,10 +314,6 @@ def test_object_acl_create_contentlength_none():
client.put_object_acl(Bucket=bucket_name, Key='foo', ACL='public-read')
@tag('auth_common')
@attr(resource='bucket')
@attr(method='acls')
@attr(operation='set w/invalid permission')
@attr(assertion='fails 400')
def test_bucket_put_bad_canned_acl():
bucket_name = get_new_bucket()
client = get_client()
@ -418,10 +327,6 @@ def test_bucket_put_bad_canned_acl():
assert status == 400
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/expect 200')
@attr(assertion='garbage, but S3 succeeds!')
def test_bucket_create_bad_expect_mismatch():
bucket_name = get_new_bucket_name()
client = get_client()
@ -432,22 +337,13 @@ def test_bucket_create_bad_expect_mismatch():
client.create_bucket(Bucket=bucket_name)
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/expect empty')
@attr(assertion='garbage, but S3 succeeds!')
def test_bucket_create_bad_expect_empty():
headers = {'Expect': ''}
_add_header_create_bucket(headers)
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty content length')
@attr(assertion='fails 400')
# TODO: The request isn't even making it to the RGW past the frontend
# This test had 'fails_on_rgw' before the move to boto3
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_empty():
headers = {'Content-Length': ''}
@ -456,11 +352,6 @@ def test_bucket_create_bad_contentlength_empty():
assert status == 400
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
@pytest.mark.fails_on_mod_proxy_fcgi
def test_bucket_create_bad_contentlength_negative():
headers = {'Content-Length': '-1'}
@ -469,24 +360,14 @@ def test_bucket_create_bad_contentlength_negative():
assert status == 400
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_empty():
headers = {'Authorization': ''}
@ -496,12 +377,7 @@ def test_bucket_create_bad_authorization_empty():
assert error_code == 'AccessDenied'
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_none():
e = _remove_header_create_bad_bucket('Authorization')
@ -510,10 +386,6 @@ def test_bucket_create_bad_authorization_none():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_invalid_garbage_aws2():
v2_client = get_v2_client()
headers = {'Content-MD5': 'AWS HAHAHA'}
@ -523,12 +395,7 @@ def test_object_create_bad_md5_invalid_garbage_aws2():
assert error_code == 'InvalidDigest'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content length too short')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the Content-Length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_mismatch_below_aws2():
v2_client = get_v2_client()
@ -541,12 +408,7 @@ def test_object_create_bad_contentlength_mismatch_below_aws2():
assert error_code == 'BadDigest'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/incorrect authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_incorrect_aws2():
v2_client = get_v2_client()
@ -557,12 +419,7 @@ def test_object_create_bad_authorization_incorrect_aws2():
assert error_code == 'InvalidDigest'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
@ -573,10 +430,6 @@ def test_object_create_bad_authorization_invalid_aws2():
assert error_code == 'InvalidArgument'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty user agent')
@attr(assertion='succeeds')
def test_object_create_bad_ua_empty_aws2():
v2_client = get_v2_client()
headers = {'User-Agent': ''}
@ -584,10 +437,6 @@ def test_object_create_bad_ua_empty_aws2():
v2_client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no user agent')
@attr(assertion='succeeds')
def test_object_create_bad_ua_none_aws2():
v2_client = get_v2_client()
remove = 'User-Agent'
@ -595,10 +444,6 @@ def test_object_create_bad_ua_none_aws2():
v2_client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid date')
@attr(assertion='fails 403')
def test_object_create_bad_date_invalid_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Bad Date'}
@ -608,10 +453,6 @@ def test_object_create_bad_date_invalid_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty date')
@attr(assertion='fails 403')
def test_object_create_bad_date_empty_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': ''}
@ -621,12 +462,7 @@ def test_object_create_bad_date_empty_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_date_none_aws2():
v2_client = get_v2_client()
@ -637,10 +473,6 @@ def test_object_create_bad_date_none_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date in past')
@attr(assertion='fails 403')
def test_object_create_bad_date_before_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2010 21:53:04 GMT'}
@ -650,10 +482,6 @@ def test_object_create_bad_date_before_today_aws2():
assert error_code == 'RequestTimeTooSkewed'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date before epoch')
@attr(assertion='fails 403')
def test_object_create_bad_date_before_epoch_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 1950 21:53:04 GMT'}
@ -663,10 +491,6 @@ def test_object_create_bad_date_before_epoch_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date after 9999')
@attr(assertion='fails 403')
def test_object_create_bad_date_after_end_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 9999 21:53:04 GMT'}
@ -676,12 +500,7 @@ def test_object_create_bad_date_after_end_aws2():
assert error_code == 'RequestTimeTooSkewed'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
@ -692,30 +511,18 @@ def test_bucket_create_bad_authorization_invalid_aws2():
assert error_code == 'InvalidArgument'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty user agent')
@attr(assertion='succeeds')
def test_bucket_create_bad_ua_empty_aws2():
v2_client = get_v2_client()
headers = {'User-Agent': ''}
_add_header_create_bucket(headers, v2_client)
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no user agent')
@attr(assertion='succeeds')
def test_bucket_create_bad_ua_none_aws2():
v2_client = get_v2_client()
remove = 'User-Agent'
_remove_header_create_bucket(remove, v2_client)
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid date')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_invalid_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Bad Date'}
@ -725,10 +532,6 @@ def test_bucket_create_bad_date_invalid_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty date')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_empty_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': ''}
@ -738,12 +541,7 @@ def test_bucket_create_bad_date_empty_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_date_none_aws2():
v2_client = get_v2_client()
@ -754,10 +552,6 @@ def test_bucket_create_bad_date_none_aws2():
assert error_code == 'AccessDenied'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date in past')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_before_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2010 21:53:04 GMT'}
@ -767,10 +561,6 @@ def test_bucket_create_bad_date_before_today_aws2():
assert error_code == 'RequestTimeTooSkewed'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date in future')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_after_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2030 21:53:04 GMT'}
@ -780,10 +570,6 @@ def test_bucket_create_bad_date_after_today_aws2():
assert error_code == 'RequestTimeTooSkewed'
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date before epoch')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_before_epoch_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 1950 21:53:04 GMT'}

View file

@ -1,7 +1,6 @@
import json
from botocore.exceptions import ClientError
from nose.plugins.attrib import attr
import pytest
from s3tests_boto3.functional.utils import assert_raises
@ -19,13 +18,7 @@ from . import (
from .utils import _get_status, _get_status_and_error_code
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify Put User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_user_policy():
client = get_iam_client()
@ -45,13 +38,7 @@ def test_put_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify Put User Policy with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_user_policy_invalid_user():
client = get_iam_client()
@ -69,13 +56,7 @@ def test_put_user_policy_invalid_user():
assert status == 404
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify Put User Policy using parameter value outside limit')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_user_policy_parameter_limit():
client = get_iam_client()
@ -94,15 +75,8 @@ def test_put_user_policy_parameter_limit():
assert status == 400
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify Put User Policy using invalid policy document elements')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_put_user_policy_invalid_element():
client = get_iam_client()
@ -167,13 +141,7 @@ def test_put_user_policy_invalid_element():
assert status == 400
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify Put a policy that already exists')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_put_existing_user_policy():
client = get_iam_client()
@ -194,13 +162,7 @@ def test_put_existing_user_policy():
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify List User policies')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_list_user_policy():
client = get_iam_client()
@ -221,13 +183,7 @@ def test_list_user_policy():
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify List User policies with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_list_user_policy_invalid_user():
client = get_iam_client()
@ -236,13 +192,7 @@ def test_list_user_policy_invalid_user():
assert status == 404
@attr(resource='user-policy')
@attr(method='get')
@attr(operation='Verify Get User policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_get_user_policy():
client = get_iam_client()
@ -265,13 +215,7 @@ def test_get_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='get')
@attr(operation='Verify Get User Policy with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_get_user_policy_invalid_user():
client = get_iam_client()
@ -293,15 +237,8 @@ def test_get_user_policy_invalid_user():
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@attr(resource='user-policy')
@attr(method='get')
@attr(operation='Verify Get User Policy with invalid policy name')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_get_user_policy_invalid_policy_name():
client = get_iam_client()
@ -322,15 +259,8 @@ def test_get_user_policy_invalid_policy_name():
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@attr(resource='user-policy')
@attr(method='get')
@attr(operation='Verify Get Deleted User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_get_deleted_user_policy():
client = get_iam_client()
@ -351,13 +281,7 @@ def test_get_deleted_user_policy():
assert status == 404
@attr(resource='user-policy')
@attr(method='get')
@attr(operation='Verify Get a policy from multiple policies for a user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_get_user_policy_from_multiple_policies():
client = get_iam_client()
@ -390,13 +314,7 @@ def test_get_user_policy_from_multiple_policies():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='delete')
@attr(operation='Verify Delete User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy():
client = get_iam_client()
@ -418,13 +336,7 @@ def test_delete_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='delete')
@attr(operation='Verify Delete User Policy with invalid user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_user():
client = get_iam_client()
@ -450,13 +362,7 @@ def test_delete_user_policy_invalid_user():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='delete')
@attr(operation='Verify Delete User Policy with invalid policy name')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_policy_name():
client = get_iam_client()
@ -482,13 +388,7 @@ def test_delete_user_policy_invalid_policy_name():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='delete')
@attr(operation='Verify Delete multiple User policies for a user')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_delete_user_policy_from_multiple_policies():
client = get_iam_client()
@ -528,13 +428,7 @@ def test_delete_user_policy_from_multiple_policies():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Bucket Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_allow_bucket_actions_in_user_policy():
client = get_iam_client()
@ -581,15 +475,8 @@ def test_allow_bucket_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Bucket Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_bucket_actions_in_user_policy():
client = get_iam_client()
s3_client = get_alt_client()
@ -623,13 +510,7 @@ def test_deny_bucket_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Object Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_allow_object_actions_in_user_policy():
client = get_iam_client()
@ -668,15 +549,8 @@ def test_allow_object_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Object Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_object_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -714,13 +588,7 @@ def test_deny_object_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Multipart Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_allow_multipart_actions_in_user_policy():
client = get_iam_client()
@ -755,15 +623,8 @@ def test_allow_multipart_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Multipart Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_multipart_actions_in_user_policy():
client = get_iam_client()
s3_client = get_alt_client()
@ -804,15 +665,8 @@ def test_deny_multipart_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Tagging Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_allow_tagging_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -856,15 +710,8 @@ def test_allow_tagging_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Tagging Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_deny_tagging_actions_in_user_policy():
client = get_iam_client()
s3_client = get_alt_client()
@ -914,15 +761,8 @@ def test_deny_tagging_actions_in_user_policy():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify conflicting user policy statements')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_verify_conflicting_user_policy_statements():
s3client = get_alt_client()
bucket = get_new_bucket(client=s3client)
@ -952,15 +792,8 @@ def test_verify_conflicting_user_policy_statements():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify conflicting user policies')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
@attr('fails_on_dbstore')
def test_verify_conflicting_user_policies():
s3client = get_alt_client()
bucket = get_new_bucket(client=s3client)
@ -997,12 +830,7 @@ def test_verify_conflicting_user_policies():
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(operation='Verify Allow Actions for IAM user policies')
@attr(assertion='succeeds')
@attr('user-policy')
@pytest.mark.user_policy
@attr('test_of_iam')
@pytest.mark.test_of_iam
def test_verify_allow_iam_actions():
policy1 = json.dumps(

File diff suppressed because it is too large Load diff

View file

@ -1,9 +1,7 @@
import nose
import pytest
import random
import string
import re
from nose.plugins.attrib import attr
from botocore.exceptions import ClientError
import uuid
@ -78,13 +76,11 @@ def generate_s3select_expression_projection(bucket_name,obj_name):
# both results should be close (epsilon)
assert( abs(float(res.split("\n")[1]) - eval(e)) < epsilon )
@attr('s3select')
@pytest.mark.s3select
def get_random_string():
return uuid.uuid4().hex[:6].upper()
@attr('s3select')
@pytest.mark.s3select
def test_generate_where_clause():
@ -97,7 +93,6 @@ def test_generate_where_clause():
for _ in range(100):
generate_s3select_where_clause(bucket_name,obj_name)
@attr('s3select')
@pytest.mark.s3select
def test_generate_projection():
@ -313,7 +308,6 @@ def create_list_of_int(column_pos,obj,field_split=",",row_split="\n"):
return list_of_int
@attr('s3select')
@pytest.mark.s3select
def test_count_operation():
csv_obj_name = get_random_string()
@ -325,7 +319,6 @@ def test_count_operation():
s3select_assert_result( num_of_rows, int( res ))
@attr('s3select')
@pytest.mark.s3select
def test_column_sum_min_max():
csv_obj = create_random_csv_object(10000,10)
@ -391,7 +384,6 @@ def test_column_sum_min_max():
s3select_assert_result( int(count)*4 , int(sum1)-int(sum2) )
@attr('s3select')
@pytest.mark.s3select
def test_nullif_expressions():
@ -447,7 +439,6 @@ def test_nullif_expressions():
s3select_assert_result( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_nulliftrue_expressions():
@ -475,7 +466,6 @@ def test_nulliftrue_expressions():
s3select_assert_result( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_is_not_null_expressions():
@ -497,7 +487,6 @@ def test_is_not_null_expressions():
s3select_assert_result( res_s3select_null, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_lowerupper_expressions():
@ -515,7 +504,6 @@ def test_lowerupper_expressions():
s3select_assert_result( res_s3select, "AB12CD$$")
@attr('s3select')
@pytest.mark.s3select
def test_in_expressions():
@ -586,7 +574,6 @@ def test_in_expressions():
s3select_assert_result( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_true_false_in_expressions():
@ -632,7 +619,6 @@ def test_true_false_in_expressions():
s3select_assert_result( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_like_expressions():
@ -720,7 +706,6 @@ def test_like_expressions():
s3select_assert_result( res_s3select_like, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_truefalselike_expressions():
@ -766,7 +751,6 @@ def test_truefalselike_expressions():
s3select_assert_result( res_s3select_like, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_nullif_expressions():
@ -794,7 +778,6 @@ def test_nullif_expressions():
assert res_s3select_nullif == res_s3select
@attr('s3select')
@pytest.mark.s3select
def test_lowerupper_expressions():
@ -812,7 +795,6 @@ def test_lowerupper_expressions():
assert res_s3select == "AB12CD$$"
@attr('s3select')
@pytest.mark.s3select
def test_in_expressions():
@ -853,7 +835,6 @@ def test_in_expressions():
assert res_s3select_in == res_s3select
@attr('s3select')
@pytest.mark.s3select
def test_like_expressions():
@ -900,7 +881,6 @@ def test_like_expressions():
assert res_s3select_in == res_s3select
@attr('s3select')
@pytest.mark.s3select
def test_complex_expressions():
@ -933,7 +913,6 @@ def test_complex_expressions():
s3select_assert_result( res_s3select_between_numbers, res_s3select_eq_modolu)
@attr('s3select')
@pytest.mark.s3select
def test_alias():
@ -955,7 +934,6 @@ def test_alias():
s3select_assert_result( res_s3select_alias, res_s3select_no_alias)
@attr('s3select')
@pytest.mark.s3select
def test_alias_cyclic_refernce():
@ -974,7 +952,6 @@ def test_alias_cyclic_refernce():
assert int(find_res) >= 0
@attr('s3select')
@pytest.mark.s3select
def test_datetime():
@ -1006,7 +983,6 @@ def test_datetime():
s3select_assert_result( res_s3select_date_time_to_timestamp, res_s3select_substring)
@attr('s3select')
@pytest.mark.s3select
def test_true_false_datetime():
@ -1041,7 +1017,6 @@ def test_true_false_datetime():
s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)
@attr('s3select')
@pytest.mark.s3select
def test_csv_parser():
@ -1082,7 +1057,6 @@ def test_csv_parser():
res_s3select_alias = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select _9 from s3object;") ).replace("\n","")
s3select_assert_result( res_s3select_alias, 'null')
@attr('s3select')
@pytest.mark.s3select
def test_csv_definition():
@ -1113,7 +1087,6 @@ def test_csv_definition():
s3select_assert_result( res_s3select, __res )
@attr('s3select')
@pytest.mark.s3select
def test_schema_definition():
@ -1149,7 +1122,6 @@ def test_schema_definition():
assert ((res_multiple_defintion.find("multiple definition of column {c4} as schema-column and alias")) >= 0)
@attr('s3select')
@pytest.mark.s3select
def test_when_then_else_expressions():
@ -1179,7 +1151,6 @@ def test_when_then_else_expressions():
s3select_assert_result( str(count3) , res2)
@attr('s3select')
@pytest.mark.s3select
def test_coalesce_expressions():
@ -1202,7 +1173,6 @@ def test_coalesce_expressions():
s3select_assert_result( res_s3select, res_coalesce)
@attr('s3select')
@pytest.mark.s3select
def test_cast_expressions():
@ -1224,7 +1194,6 @@ def test_cast_expressions():
s3select_assert_result( res_s3select, res)
@attr('s3select')
@pytest.mark.s3select
def test_version():
@ -1243,7 +1212,6 @@ def test_version():
s3select_assert_result( res_version, "41.a," )
@attr('s3select')
@pytest.mark.s3select
def test_trim_expressions():
@ -1283,7 +1251,6 @@ def test_trim_expressions():
s3select_assert_result( res_s3select_trim, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_truefalse_trim_expressions():
@ -1323,7 +1290,6 @@ def test_truefalse_trim_expressions():
s3select_assert_result( res_s3select_trim, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_escape_expressions():
@ -1345,7 +1311,6 @@ def test_escape_expressions():
s3select_assert_result( res_s3select_escape, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_case_value_expressions():
@ -1361,7 +1326,6 @@ def test_case_value_expressions():
s3select_assert_result( res_s3select_case, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_bool_cast_expressions():
@ -1377,7 +1341,6 @@ def test_bool_cast_expressions():
s3select_assert_result( res_s3select_cast, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_progress_expressions():
@ -1405,7 +1368,6 @@ def test_progress_expressions():
# end response
s3select_assert_result({}, res_s3select_response[total_response-1])
@attr('s3select')
@pytest.mark.s3select
def test_output_serial_expressions():
return # TODO fix test

View file

@ -2,7 +2,6 @@ import boto3
import botocore.session
from botocore.exceptions import ClientError
from botocore.exceptions import ParamValidationError
from nose.plugins.attrib import attr
import pytest
import isodate
import email.utils
@ -19,7 +18,6 @@ import hashlib
import xml.etree.ElementTree as ET
import time
import operator
import nose
import os
import string
import random
@ -150,13 +148,7 @@ def get_s3_resource_using_iam_creds():
return s3_res_iam_creds
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops only accessible by temporary credentials')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_get_session_token():
iam_client=get_iam_client()
@ -186,13 +178,7 @@ def test_get_session_token():
finally: # clean up user policy even if create_bucket/delete_bucket fails
iam_client.delete_user_policy(UserName=sts_user_id,PolicyName=policy_name)
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops denied by permanent credentials')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_get_session_token_permanent_creds_denied():
s3bucket_error=None
@ -225,13 +211,7 @@ def test_get_session_token_permanent_creds_denied():
assert s3bucket_error == 'AccessDenied'
iam_client.delete_user_policy(UserName=sts_user_id,PolicyName=policy_name)
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy allows all s3 ops')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_allow():
iam_client=get_iam_client()
@ -264,13 +244,7 @@ def test_assume_role_allow():
bkt = s3_client.delete_bucket(Bucket=bucket_name)
assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy denies all s3 ops')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_deny():
s3bucket_error=None
@ -305,13 +279,7 @@ def test_assume_role_deny():
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
@attr(resource='assume role')
@attr(method='get')
@attr(operation='check')
@attr(assertion='creds expire so all s3 ops fails')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_creds_expiry():
iam_client=get_iam_client()
@ -346,13 +314,7 @@ def test_assume_role_creds_expiry():
s3bucket_error = e.response.get("Error", {}).get("Code")
assert s3bucket_error == 'AccessDenied'
@attr(resource='assume role')
@attr(method='head')
@attr(operation='check')
@attr(assertion='HEAD fails with 403 when role policy denies s3:ListBucket')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_deny_head_nonexistent():
# create a bucket with the normal s3 client
@ -390,13 +352,7 @@ def test_assume_role_deny_head_nonexistent():
status = e.response['ResponseMetadata']['HTTPStatusCode']
assert status == 403
@attr(resource='assume role')
@attr(method='head')
@attr(operation='check')
@attr(assertion='HEAD fails with 404 when role policy allows s3:ListBucket')
@attr('test_of_sts')
@pytest.mark.test_of_sts
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_allow_head_nonexistent():
# create a bucket with the normal s3 client
@ -435,15 +391,8 @@ def test_assume_role_allow_head_nonexistent():
assert status == 404
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role through web token')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity():
check_webidentity()
@ -492,11 +441,6 @@ def test_assume_role_with_web_identity():
)
'''
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assume_role_with_web_token creds expire')
@attr('webidentity_test')
@pytest.mark.webidentity_test
def test_assume_role_with_web_identity_invalid_webtoken():
resp_error=None
@ -543,15 +487,8 @@ def test_assume_role_with_web_identity_invalid_webtoken():
# Session Policy Tests
#######################
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking session policy working for two different buckets')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_on_different_buckets():
check_webidentity()
@ -619,15 +556,8 @@ def test_session_policy_check_on_different_buckets():
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking session policy working for same bucket')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_on_same_bucket():
check_webidentity()
@ -683,15 +613,8 @@ def test_session_policy_check_on_same_bucket():
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking put_obj op denial')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_put_obj_denial():
check_webidentity()
@ -752,15 +675,8 @@ def test_session_policy_check_put_obj_denial():
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking put_obj working by swapping policies')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_swapping_role_policy_and_session_policy():
check_webidentity()
@ -816,15 +732,8 @@ def test_swapping_role_policy_and_session_policy():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking put_obj working by setting different permissions to role and session policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_different_op_permissions():
check_webidentity()
@ -885,15 +794,8 @@ def test_session_policy_check_different_op_permissions():
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op behaviour with deny effect')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_with_deny_effect():
check_webidentity()
@ -953,15 +855,8 @@ def test_session_policy_check_with_deny_effect():
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking put_obj working with deny and allow on same op')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_check_with_deny_on_same_op():
check_webidentity()
@ -1021,15 +916,8 @@ def test_session_policy_check_with_deny_on_same_op():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op when bucket policy has role arn')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_role_arn():
check_webidentity()
@ -1104,15 +992,8 @@ def test_session_policy_bucket_policy_role_arn():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='checking op when bucket policy has session arn')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_session_arn():
check_webidentity()
@ -1185,15 +1066,8 @@ def test_session_policy_bucket_policy_session_arn():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking copy object op with role, session and bucket policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_copy_object():
check_webidentity()
@ -1273,15 +1147,8 @@ def test_session_policy_copy_object():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op is denied when no role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_no_bucket_role_policy():
check_webidentity()
@ -1332,15 +1199,8 @@ def test_session_policy_no_bucket_role_policy():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='checking op is denied when resource policy denies')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('session_policy')
@pytest.mark.session_policy
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_session_policy_bucket_policy_deny():
check_webidentity()
@ -1413,15 +1273,8 @@ def test_session_policy_bucket_policy_deny():
OpenIDConnectProviderArn=oidc_arn
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using sub in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_sub():
check_webidentity()
@ -1469,15 +1322,8 @@ def test_assume_role_with_web_identity_with_sub():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using azp in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('token_claims_trust_policy_test')
@pytest.mark.token_claims_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_azp():
check_webidentity()
@ -1525,17 +1371,9 @@ def test_assume_role_with_web_identity_with_azp():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token using aws:RequestTag in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_request_tag_trust_policy_test')
@pytest.mark.token_request_tag_trust_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_request_tag():
check_webidentity()
@ -1582,17 +1420,9 @@ def test_assume_role_with_web_identity_with_request_tag():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_with_principal_tag():
check_webidentity()
@ -1639,17 +1469,9 @@ def test_assume_role_with_web_identity_with_principal_tag():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_for_all_values():
check_webidentity()
@ -1696,17 +1518,9 @@ def test_assume_role_with_web_identity_for_all_values():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_principal_tag_role_policy_test')
@pytest.mark.token_principal_tag_role_policy_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_for_all_values_deny():
check_webidentity()
@ -1755,17 +1569,9 @@ def test_assume_role_with_web_identity_for_all_values_deny():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_tag_keys_test')
@pytest.mark.token_tag_keys_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_tag_keys_trust_policy():
check_webidentity()
@ -1812,17 +1618,9 @@ def test_assume_role_with_web_identity_tag_keys_trust_policy():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='get')
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_tag_keys_test')
@pytest.mark.token_tag_keys_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_tag_keys_role_policy():
check_webidentity()
@ -1869,17 +1667,9 @@ def test_assume_role_with_web_identity_tag_keys_role_policy():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag():
check_webidentity()
@ -1936,17 +1726,9 @@ def test_assume_role_with_web_identity_resource_tag():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag with missing tags on bucket')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_deny():
check_webidentity()
@ -2003,17 +1785,9 @@ def test_assume_role_with_web_identity_resource_tag_deny():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag with wrong resource tag in policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_wrong_resource_tag_deny():
check_webidentity()
@ -2073,17 +1847,9 @@ def test_assume_role_with_web_identity_wrong_resource_tag_deny():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag matching aws:PrincipalTag in role permission policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_princ_tag():
check_webidentity()
@ -2145,17 +1911,9 @@ def test_assume_role_with_web_identity_resource_tag_princ_tag():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with s3:ResourceTag used to test copy object')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_resource_tags_test')
@pytest.mark.token_resource_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_resource_tag_copy_obj():
check_webidentity()
@ -2244,17 +2002,9 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj():
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
@attr(resource='assume role with web identity')
@attr(method='put')
@attr(operation='check')
@attr(assertion='assuming role using web token with iam:ResourceTag in role trust policy')
@attr('webidentity_test')
@pytest.mark.webidentity_test
@attr('abac_test')
@pytest.mark.abac_test
@attr('token_role_tags_test')
@pytest.mark.token_role_tags_test
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_assume_role_with_web_identity_role_resource_tag():
check_webidentity()