Merge pull request #482 from cbodley/wip-tox-pytest

replace deprecated nose with pytest
Ali Maredia 2023-01-25 17:46:57 -05:00 committed by GitHub
commit b8422a2055
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 2761 additions and 5922 deletions


@ -6,14 +6,10 @@ This is a set of unofficial Amazon AWS S3 compatibility
tests, that can be useful to people implementing software
that exposes an S3-like API. The tests use the Boto2 and Boto3 libraries.
The tests use the Nose test framework. To get started, ensure you have
the ``virtualenv`` software installed; e.g. on Debian/Ubuntu::
The tests use the Tox tool. To get started, ensure you have the ``tox``
software installed; e.g. on Debian/Ubuntu::
sudo apt-get install python-virtualenv
and then run::
./bootstrap
sudo apt-get install tox
You will need to create a configuration file with the location of the
service and two different credentials. A sample configuration file named
@ -22,29 +18,25 @@ used to run the s3 tests on a Ceph cluster started with vstart.
Once you have that file copied and edited, you can run the tests with::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests
S3TEST_CONF=your.conf tox
You can specify which directory of tests to run::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional
S3TEST_CONF=your.conf tox s3tests_boto3/functional
You can specify which file of tests to run::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_s3
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_s3.py
You can specify which test to run::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_s3:test_bucket_list_empty
To gather a list of tests being run, use the flags::
-v --collect-only
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_s3.py::test_bucket_list_empty
Some tests have attributes set based on their current reliability and
things like AWS not enforcing their spec strictly. You can filter tests
based on their attributes::
S3TEST_CONF=aws.conf ./virtualenv/bin/nosetests -a '!fails_on_aws'
S3TEST_CONF=aws.conf tox -- -m 'not fails_on_aws'
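The ``-m`` option takes a boolean expression over marker names, so filters can be combined (e.g. ``-m 'lifecycle and not fails_on_aws'``). A rough Python stand-in for how such an expression selects a test — pytest uses its own expression parser rather than ``eval``, and the names here are purely illustrative:

```python
from collections import defaultdict

def selected(test_markers, expression):
    """Return True if a test carrying test_markers matches a -m expression.

    Rough stand-in for pytest's marker-expression matching: every marker
    present on the test evaluates as True, every other name as False.
    """
    env = defaultdict(bool, {name: True for name in test_markers})
    return bool(eval(expression, {"__builtins__": {}}, env))
```

For example, a test marked only ``lifecycle`` passes the filter ``'not fails_on_aws'``, while a test marked ``fails_on_aws`` does not.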
Most of the tests have both Boto3 and Boto2 versions. Tests written in
Boto2 are in the ``s3tests`` directory. Tests written in Boto3 are
@ -52,7 +44,7 @@ located in the ``s3test_boto3`` directory.
You can run only the boto3 tests with::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'not fails_on_rgw' s3tests_boto3.functional
S3TEST_CONF=your.conf tox -- -m 'not fails_on_aws' s3tests_boto3/functional
========================
STS compatibility tests
@ -62,11 +54,11 @@ This section contains some basic tests for the AssumeRole, GetSessionToken and A
You can run only the sts tests (all three APIs) with::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_sts.py
You can filter tests based on their attributes. There is an attribute named ``test_of_sts`` to run the AssumeRole and GetSessionToken tests, and ``webidentity_test`` to run the AssumeRoleWithWebIdentity tests. To execute only the ``test_of_sts`` tests, apply that filter as below::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'test_of_sts' s3tests_boto3.functional.test_sts
S3TEST_CONF=your.conf tox -- -m test_of_sts s3tests_boto3/functional/test_sts.py
For running ``webidentity_test`` you'll need to have Keycloak running.
@ -88,14 +80,14 @@ Adding above capabilities to "iam" user is also taken care by vstart (If Ceph cl
To run these tests, create a configuration file with the sections "iam" and "s3 alt"; refer to s3tests.conf.SAMPLE.
Once you have that configuration file copied and edited, you can run all the tests with::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_iam.py
You can also specify a specific test to run::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam:test_put_user_policy
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_iam.py::test_put_user_policy
Some tests have attributes set such as "fails_on_rgw".
You can filter tests based on their attributes::
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_iam -a '!fails_on_rgw'
S3TEST_CONF=your.conf tox -- s3tests_boto3/functional/test_iam.py -m 'not fails_on_rgw'


@ -1,76 +0,0 @@
#!/bin/bash
set -e
virtualenv="virtualenv"
declare -a packages
source /etc/os-release
case "$ID" in
debian|ubuntu|devuan)
packages=(debianutils python3-pip python3-virtualenv python3-dev libevent-dev libffi-dev libxml2-dev libxslt-dev zlib1g-dev)
for package in ${packages[@]}; do
if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
# add a space after old values
missing="${missing:+$missing }$package"
fi
done
if [ -n "$missing" ]; then
echo "$0: missing required DEB packages. Installing via sudo." 1>&2
sudo apt-get -y install $missing
fi
;;
centos|fedora|rhel|rocky|ol|virtuozzo)
packages=(which python3-virtualenv python36-devel libevent-devel libffi-devel libxml2-devel libxslt-devel zlib-devel)
for package in ${packages[@]}; do
# When the package is python36-devel we change it to python3-devel on Fedora
if [[ ${package} == "python36-devel" && -f /etc/fedora-release ]]; then
package=python36
fi
if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
missing="${missing:+$missing }$package"
fi
done
if [ -n "$missing" ]; then
echo "$0: Missing required RPM packages: ${missing}." 1>&2
sudo yum -y install $missing
fi
;;
opensuse*|suse|sles)
packages=(which python3-virtualenv python3-devel libev-devel libffi-devel libxml2-devel libxslt-devel zlib-devel)
for package in ${packages[@]}; do
if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
missing="${missing:+$missing }$package"
fi
if [ -n "$missing" ]; then
echo "$0: Missing required RPM packages: ${missing}." 1>&2
sudo zypper --non-interactive install --no-recommends $missing
fi
done
;;
*)
echo "Bootstrap script does not support this distro yet, consider adding the packages"
exit 1
esac
# s3-tests only works on python 3.6 not newer versions of python3
${virtualenv} --python=$(which python3.6) virtualenv
# avoid pip bugs
./virtualenv/bin/pip3 install --upgrade pip
# latest setuptools supporting python 2.7
./virtualenv/bin/pip install setuptools==44.1.0
./virtualenv/bin/pip3 install -r requirements.txt
# forbid setuptools from using the network because it'll try to use
# easy_install, and we really wanted pip; next line will fail if pip
# requirements.txt does not match setup.py requirements -- sucky but
# good enough for now
./virtualenv/bin/python3 setup.py develop

pytest.ini (new file, 43 lines)

@ -0,0 +1,43 @@
[pytest]
markers =
abac_test
appendobject
auth_aws2
auth_aws4
auth_common
bucket_policy
bucket_encryption
cloud_transition
encryption
fails_on_aws
fails_on_dbstore
fails_on_dho
fails_on_mod_proxy_fcgi
fails_on_rgw
fails_on_s3
fails_with_subdomain
lifecycle
lifecycle_expiration
lifecycle_transition
list_objects_v2
object_lock
session_policy
s3select
s3website
s3website_routing_rules
s3website_redirect_location
3website
sse_s3
storage_class
tagging
test_of_iam
test_of_sts
token_claims_trust_policy_test
token_principal_tag_role_policy_test
token_request_tag_trust_policy_test
token_resource_tags_test
token_role_tags_test
token_tag_keys_test
user_policy
versioning
webidentity_test


@ -1,5 +1,4 @@
PyYAML
nose >=1.0.0
boto >=2.6.0
boto3 >=1.0.0
# botocore-1.28 broke v2 signatures, see https://tracker.ceph.com/issues/58059
@ -12,3 +11,4 @@ requests >=2.23.0
pytz >=2011k
httplib2
lxml
pytest


@ -7,6 +7,7 @@ import itertools
import os
import random
import string
import pytest
from http.client import HTTPConnection, HTTPSConnection
from urllib.parse import urlparse
@ -370,6 +371,15 @@ def teardown():
# remove our buckets here also, to avoid littering
nuke_prefixed_buckets(prefix=prefix)
@pytest.fixture(scope="package")
def configfile():
setup()
yield config
@pytest.fixture(autouse=True)
def setup_teardown(configfile):
yield
teardown()
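The two fixtures above replace nose's module-level setup/teardown: ``configfile`` runs ``setup()`` once per package, and the autouse ``setup_teardown`` runs ``teardown()`` after every test. The ordering comes from pytest resuming a yield fixture after the test body finishes; a minimal model of that sequencing with a plain generator (no pytest involved, names are illustrative):

```python
calls = []

def setup_teardown():
    # everything before the yield is the fixture's setup phase
    calls.append("setup")
    yield
    # everything after the yield is the teardown phase
    calls.append("teardown")

gen = setup_teardown()
next(gen)              # pytest advances the generator up to the yield
calls.append("test")   # ...then the test body runs
try:
    next(gen)          # after the test, pytest resumes past the yield
except StopIteration:
    pass               # the generator is exhausted once teardown finishes
```

This is why teardown code after the ``yield`` runs even though the fixture body looks like a single function.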
bucket_counter = itertools.count(1)

File diff suppressed because it is too large


@ -7,7 +7,7 @@ import datetime
import time
import email.utils
import isodate
import nose
import pytest
import operator
import socket
import ssl
@ -27,16 +27,14 @@ import re
from collections import defaultdict
from urllib.parse import urlparse
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
from . import utils
from .utils import assert_raises
from .policy import Policy, Statement, make_json_policy
from . import (
configfile,
setup_teardown,
nuke_prefixed_buckets,
get_new_bucket,
get_new_bucket_name,
@ -53,9 +51,9 @@ from . import (
def check_access_denied(fn, *args, **kwargs):
e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)
eq(e.status, 403)
eq(e.reason, 'Forbidden')
eq(e.error_code, 'AccessDenied')
assert e.status == 403
assert e.reason == 'Forbidden'
assert e.error_code == 'AccessDenied'
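``check_access_denied`` relies on the ``assert_raises`` helper imported from ``.utils``, which returns the caught exception so its attributes can be checked with plain ``assert``. A simplified sketch of what such a helper looks like (an assumption for illustration, not the suite's exact implementation):

```python
def assert_raises(exc_class, fn, *args, **kwargs):
    """Call fn and return the exception it raises; fail if it doesn't raise.

    Simplified sketch of the helper the tests import from .utils.
    """
    try:
        fn(*args, **kwargs)
    except exc_class as e:
        return e
    raise AssertionError('%s not raised' % exc_class.__name__)
```

Returning the exception object is what lets the callers assert on ``e.status``, ``e.reason``, and ``e.error_code`` directly.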
def check_bad_bucket_name(name):
"""
@ -63,9 +61,9 @@ def check_bad_bucket_name(name):
that the request fails because of an invalid bucket name.
"""
e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, targets.main.default, name)
eq(e.status, 400)
eq(e.reason.lower(), 'bad request') # some proxies vary the case
eq(e.error_code, 'InvalidBucketName')
assert e.status == 400
assert e.reason.lower() == 'bad request' # some proxies vary the case
assert e.error_code == 'InvalidBucketName'
def _create_keys(bucket=None, keys=[]):
"""
@ -94,20 +92,16 @@ def _get_alt_connection():
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/! in name')
@attr(assertion='fails with subdomain')
@pytest.mark.fails_with_subdomain
def test_bucket_create_naming_bad_punctuation():
# characters other than [a-zA-Z0-9._-]
check_bad_bucket_name('alpha!soup')
def check_versioning(bucket, status):
try:
eq(bucket.get_versioning_status()['Versioning'], status)
assert bucket.get_versioning_status()['Versioning'] == status
except KeyError:
eq(status, None)
assert status == None
# amazon is eventual consistent, retry a bit if failed
def check_configure_versioning_retry(bucket, status, expected_string):
@ -126,14 +120,10 @@ def check_configure_versioning_retry(bucket, status, expected_string):
time.sleep(1)
eq(expected_string, read_status)
assert expected_string == read_status
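As the comment above notes, S3 on AWS is eventually consistent, so the versioning check polls with a short sleep between attempts rather than asserting immediately. That retry shape, extracted into a standalone helper (the names and attempt count are illustrative, not taken from the suite):

```python
import time

def retry_until_equal(read_fn, expected, attempts=5, delay=1.0):
    """Poll read_fn() until it returns expected, sleeping between tries.

    Returns the last value read so the caller can assert on it either way.
    """
    value = read_fn()
    for _ in range(attempts - 1):
        if value == expected:
            break
        time.sleep(delay)
        value = read_fn()
    return value
```

The final assertion then fails with the last observed value if the service never converges within the allotted attempts.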
@attr(resource='object')
@attr(method='create')
@attr(operation='create versioned object, read not exist null version')
@attr(assertion='read null version behaves correctly')
@attr('versioning')
@attr('fails_on_dbstore')
@pytest.mark.versioning
@pytest.mark.fails_on_dbstore
def test_versioning_obj_read_not_exist_null():
bucket = get_new_bucket()
check_versioning(bucket, None)
@ -147,16 +137,12 @@ def test_versioning_obj_read_not_exist_null():
key.set_contents_from_string(content)
key = bucket.get_key(objname, version_id='null')
eq(key, None)
assert key == None
@attr(resource='object')
@attr(method='put')
@attr(operation='append object')
@attr(assertion='success')
@attr('fails_on_aws')
@attr('fails_with_subdomain')
@attr('appendobject')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_aws
@pytest.mark.fails_with_subdomain
@pytest.mark.appendobject
@pytest.mark.fails_on_dbstore
def test_append_object():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -168,20 +154,16 @@ def test_append_object():
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
path2 = path + '&append&position=3'
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
eq(res.status, 200)
eq(res.reason, 'OK')
assert res.status == 200
assert res.reason == 'OK'
key = bucket.get_key('foo')
eq(key.size, 6)
assert key.size == 6
@attr(resource='object')
@attr(method='put')
@attr(operation='append to normal object')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@attr('fails_with_subdomain')
@attr('appendobject')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_aws
@pytest.mark.fails_with_subdomain
@pytest.mark.appendobject
@pytest.mark.fails_on_dbstore
def test_append_normal_object():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -192,17 +174,13 @@ def test_append_normal_object():
path = o.path + '?' + o.query
path = path + '&append&position=3'
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path, body='abc', secure=s3.main.is_secure)
eq(res.status, 409)
assert res.status == 409
@attr(resource='object')
@attr(method='put')
@attr(operation='append position not right')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@attr('fails_with_subdomain')
@attr('appendobject')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_aws
@pytest.mark.fails_with_subdomain
@pytest.mark.appendobject
@pytest.mark.fails_on_dbstore
def test_append_object_position_wrong():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -214,17 +192,13 @@ def test_append_object_position_wrong():
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
path2 = path + '&append&position=9'
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
eq(res.status, 409)
eq(int(res.getheader('x-rgw-next-append-position')), 3)
assert res.status == 409
assert int(res.getheader('x-rgw-next-append-position')) == 3
# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
# http://tracker.newdream.net/issues/984
@attr(resource='bucket.log')
@attr(method='put')
@attr(operation='set/enable/disable logging target')
@attr(assertion='operations succeed')
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_logging_toggle():
bucket = get_new_bucket()
log_bucket = get_new_bucket(targets.main.default, bucket.name + '-log')
@ -311,13 +285,13 @@ def gen_rand_string(size, chars=string.ascii_uppercase + string.digits):
def verify_object(bucket, k, data=None, storage_class=None):
if storage_class:
eq(k.storage_class, storage_class)
assert k.storage_class == storage_class
if data:
read_data = k.get_contents_as_string()
equal = data == read_data # avoid spamming log if data not equal
eq(equal, True)
assert equal == True
def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
query_args=None
@ -333,7 +307,7 @@ def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storag
res = dest_bucket.connection.make_request('PUT', dest_bucket.name, dest_key.name,
query_args=query_args, headers=headers)
eq(res.status, 200)
assert res.status == 200
def _populate_multipart_key(bucket, kname, size, storage_class=None):
(upload, data) = _multipart_upload(bucket, kname, size, storage_class=storage_class)
@ -401,16 +375,13 @@ def lc_transitions(transitions=None):
return result
@attr(resource='object')
@attr(method='put')
@attr(operation='test create object with storage class')
@attr('storage_class')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_object_storage_class():
sc = configured_storage_classes()
if len(sc) < 2:
raise SkipTest
pytest.skip('requires multiple storage classes')
bucket = get_new_bucket()
@ -420,16 +391,13 @@ def test_object_storage_class():
verify_object(bucket, k, data, storage_class)
@attr(resource='object')
@attr(method='put')
@attr(operation='test create multipart object with storage class')
@attr('storage_class')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_object_storage_class_multipart():
sc = configured_storage_classes()
if len(sc) < 2:
raise SkipTest
pytest.skip('requires multiple storage classes')
bucket = get_new_bucket()
size = 11 * 1024 * 1024
@ -439,13 +407,13 @@ def test_object_storage_class_multipart():
(upload, data) = _multipart_upload(bucket, key, size, storage_class=storage_class)
upload.complete_upload()
key2 = bucket.get_key(key)
eq(key2.size, size)
eq(key2.storage_class, storage_class)
assert key2.size == size
assert key2.storage_class == storage_class
def _do_test_object_modify_storage_class(obj_write_func, size):
sc = configured_storage_classes()
if len(sc) < 2:
raise SkipTest
pytest.skip('requires multiple storage classes')
bucket = get_new_bucket()
@ -462,29 +430,23 @@ def _do_test_object_modify_storage_class(obj_write_func, size):
copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
verify_object(bucket, k, data, storage_class)
@attr(resource='object')
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_object_modify_storage_class():
_do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_object_modify_storage_class_multipart():
_do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
def _do_test_object_storage_class_copy(obj_write_func, size):
sc = configured_storage_classes()
if len(sc) < 2:
raise SkipTest
pytest.skip('requires multiple storage classes')
src_bucket = get_new_bucket()
dest_bucket = get_new_bucket()
@ -502,21 +464,15 @@ def _do_test_object_storage_class_copy(obj_write_func, size):
copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, new_storage_class)
verify_object(dest_bucket, dest_key, data, new_storage_class)
@attr(resource='object')
@attr(method='copy')
@attr(operation='test copy object to object with different storage class')
@attr('storage_class')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_object_storage_class_copy():
_do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
@attr(resource='object')
@attr(method='copy')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_object_storage_class_copy_multipart():
_do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)
@ -573,7 +529,7 @@ class FakeFileVerifier(object):
if self.char == None:
self.char = data[0]
self.size += size
eq(data.decode(), self.char*size)
assert data.decode() == self.char*size
def _verify_atomic_key_data(key, size=-1, char=None):
"""
@ -582,7 +538,7 @@ def _verify_atomic_key_data(key, size=-1, char=None):
fp_verify = FakeFileVerifier(char)
key.get_contents_to_file(fp_verify)
if size >= 0:
eq(fp_verify.size, size)
assert fp_verify.size == size
def _test_atomic_dual_conditional_write(file_size):
"""
@ -611,28 +567,20 @@ def _test_atomic_dual_conditional_write(file_size):
# key.set_contents_from_file(fp_c, headers={'If-Match': etag_fp_a})
e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_c,
headers={'If-Match': etag_fp_a})
eq(e.status, 412)
eq(e.reason, 'Precondition Failed')
eq(e.error_code, 'PreconditionFailed')
assert e.status == 412
assert e.reason == 'Precondition Failed'
assert e.error_code == 'PreconditionFailed'
# verify the file
_verify_atomic_key_data(key, file_size, 'B')
@attr(resource='object')
@attr(method='put')
@attr(operation='write one or the other')
@attr(assertion='1MB successful')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_atomic_dual_conditional_write_1mb():
_test_atomic_dual_conditional_write(1024*1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='write file in deleted bucket')
@attr(assertion='fail 404')
@attr('fails_on_aws')
@attr('fails_on_dbstore')
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_atomic_write_bucket_gone():
bucket = get_new_bucket()
@ -644,9 +592,9 @@ def test_atomic_write_bucket_gone():
key = bucket.new_key('foo')
fp_a = FakeWriteFile(1024*1024, 'A', remove_bucket)
e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_a)
eq(e.status, 404)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchBucket')
assert e.status == 404
assert e.reason == 'Not Found'
assert e.error_code == 'NoSuchBucket'
def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
do_list=None, init_headers=None, part_headers=None,
@ -672,12 +620,8 @@ def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad key for uploading chunks')
@attr(assertion='successful')
@attr('encryption')
@attr('fails_on_dbstore')
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_invalid_chunks_1():
bucket = get_new_bucket()
key = "multipart_enc"
@ -698,14 +642,10 @@ def test_encryption_sse_c_multipart_invalid_chunks_1():
_multipart_upload_enc, bucket, key, objlen,
init_headers=init_headers, part_headers=part_headers,
metadata={'foo': 'bar'})
eq(e.status, 400)
assert e.status == 400
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad md5 for chunks')
@attr(assertion='successful')
@attr('encryption')
@attr('fails_on_dbstore')
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_invalid_chunks_2():
bucket = get_new_bucket()
key = "multipart_enc"
@ -726,15 +666,11 @@ def test_encryption_sse_c_multipart_invalid_chunks_2():
_multipart_upload_enc, bucket, key, objlen,
init_headers=init_headers, part_headers=part_headers,
metadata={'foo': 'bar'})
eq(e.status, 400)
assert e.status == 400
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test Bucket Policy for a user belonging to a different tenant')
@attr(assertion='succeeds')
@attr('fails_with_subdomain')
@attr('bucket-policy')
@attr('fails_on_dbstore')
@pytest.mark.fails_with_subdomain
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
def test_bucket_policy_different_tenant():
bucket = get_new_bucket()
key = bucket.new_key('asdf')
@ -769,11 +705,8 @@ def test_bucket_policy_different_tenant():
b = new_conn.get_bucket(bucket_name)
b.get_all_keys()
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put condition operator end with ifExists')
@attr('bucket-policy')
@attr('fails_on_dbstore')
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
def test_bucket_policy_set_condition_operator_end_with_IfExists():
bucket = _create_keys(keys=['foo'])
policy = '''{
@ -792,29 +725,25 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
}
]
}''' % bucket.name
eq(bucket.set_policy(policy), True)
assert bucket.set_policy(policy) == True
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
request_headers={'referer': 'http://www.example.com/'})
eq(res.status, 200)
assert res.status == 200
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
request_headers={'referer': 'http://www.example.com/index.html'})
eq(res.status, 200)
assert res.status == 200
res = _make_request('GET', bucket.name, bucket.get_key("foo"))
eq(res.status, 200)
assert res.status == 200
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
request_headers={'referer': 'http://example.com'})
eq(res.status, 403)
assert res.status == 403
def _make_arn_resource(path="*"):
return "arn:aws:s3:::{}".format(path)
@attr(resource='object')
@attr(method='put')
@attr(operation='put obj with RequestObjectTag')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
@attr('fails_on_dbstore')
@pytest.mark.tagging
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
def test_bucket_policy_put_obj_request_obj_tag():
bucket = get_new_bucket()


@ -1,7 +1,6 @@
import sys
import collections
import nose
import pytest
import string
import random
from pprint import pprint
@ -11,14 +10,11 @@ import socket
from urllib.parse import urlparse
from nose.tools import eq_ as eq, ok_ as ok
from nose.plugins.attrib import attr
from nose.tools import timed
from nose.plugins.skip import SkipTest
from .. import common
from . import (
configfile,
setup_teardown,
get_new_bucket,
get_new_bucket_name,
s3,
@ -43,38 +39,26 @@ ERRORDOC_TEMPLATE = '<html><h1>ErrorDoc</h1><body>{random}</body></html>'
CAN_WEBSITE = None
@attr('fails_on_dbstore')
@pytest.fixture(autouse=True, scope="module")
def check_can_test_website():
global CAN_WEBSITE
# This is a bit expensive, so we cache this
if CAN_WEBSITE is None:
bucket = get_new_bucket()
try:
wsconf = bucket.get_website_configuration()
CAN_WEBSITE = True
except boto.exception.S3ResponseError as e:
if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
CAN_WEBSITE = True
elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
# rgw_enable_static_website is false
CAN_WEBSITE = False
elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
# This is older versions that do not support the website code
CAN_WEBSITE = False
elif e.status == 501 and e.error_code == 'NotImplemented':
CAN_WEBSITE = False
else:
raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
finally:
bucket.delete()
if CAN_WEBSITE is True:
bucket = get_new_bucket()
try:
wsconf = bucket.get_website_configuration()
return True
elif CAN_WEBSITE is False:
raise SkipTest
else:
raise RuntimeError("Unknown cached response in checking if WebsiteConf is supported")
except boto.exception.S3ResponseError as e:
if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
return True
elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
pytest.skip('rgw_enable_static_website is false')
elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
# This is older versions that do not support the website code
pytest.skip('static website is not implemented')
elif e.status == 501 and e.error_code == 'NotImplemented':
pytest.skip('static website is not implemented')
else:
raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
finally:
bucket.delete()
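The rewritten fixture folds the old module-global caching into a module-scoped fixture, and turns each probe outcome into a pass, a ``pytest.skip``, or an error. The decision table, pulled out as a standalone function (a sketch; the names are illustrative and the real fixture also matches on ``e.reason``):

```python
def classify_website_probe(status, error_code):
    """Map an S3 error from probing get_website_configuration() to a decision."""
    if status == 404 and error_code in ('NoSuchWebsiteConfiguration', 'NoSuchKey'):
        return 'supported'   # bucket exists but has no website config yet
    if status == 405 and error_code == 'MethodNotAllowed':
        return 'skip'        # rgw_enable_static_website is false
    if status == 403 and error_code == 'Forbidden':
        return 'skip'        # older gateway without website support
    if status == 501 and error_code == 'NotImplemented':
        return 'skip'
    return 'error'           # unknown response: the fixture raises RuntimeError
```

Skipping via ``pytest.skip`` inside an autouse module fixture means every website test in the module is skipped with the stated reason, matching the old ``SkipTest`` behavior.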
def make_website_config(xml_fragment):
"""
@ -170,7 +154,7 @@ def _test_website_prep(bucket, xml_template, hardcoded_fields = {}, expect_fail=
# Cleanup for our validation
common.assert_xml_equal(config_xmlcmp, config_xmlnew)
#print("config_xmlcmp\n", config_xmlcmp)
#eq (config_xmlnew, config_xmlcmp)
#assert config_xmlnew == config_xmlcmp
f['WebsiteConfiguration'] = config_xmlcmp
return f
@ -181,9 +165,9 @@ def __website_expected_reponse_status(res, status, reason):
reason = set([reason])
if status is not IGNORE_FIELD:
ok(res.status in status, 'HTTP code was %s should be %s' % (res.status, status))
assert res.status in status, 'HTTP code was %s should be %s' % (res.status, status)
if reason is not IGNORE_FIELD:
ok(res.reason in reason, 'HTTP reason was was %s should be %s' % (res.reason, reason))
assert res.reason in reason, 'HTTP reason was %s should be %s' % (res.reason, reason)
def _website_expected_default_html(**kwargs):
fields = []
@ -213,22 +197,22 @@ def _website_expected_error_response(res, bucket_name, status, reason, code, con
errorcode = res.getheader('x-amz-error-code', None)
if errorcode is not None:
if code is not IGNORE_FIELD:
eq(errorcode, code)
assert errorcode == code
if not isinstance(content, collections.Container):
content = set([content])
for f in content:
if f is not IGNORE_FIELD and f is not None:
f = bytes(f, 'utf-8')
ok(f in body, 'HTML should contain "%s"' % (f, ))
assert f in body, 'HTML should contain "%s"' % (f, )
def _website_expected_redirect_response(res, status, reason, new_url):
body = res.read()
print(body)
__website_expected_reponse_status(res, status, reason)
loc = res.getheader('Location', None)
eq(loc, new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,))
ok(len(body) == 0, 'Body of a redirect should be empty')
assert loc == new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,)
assert len(body) == 0, 'Body of a redirect should be empty'
def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
url = get_website_url(proto='http', bucket=bucket_name, path=path)
@ -247,27 +231,16 @@ def _website_request(bucket_name, path, connect_hostname=None, method='GET', tim
return res
# ---------- Non-existent buckets via the website endpoint
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@attr('fails_on_rgw')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_rgw
def test_website_nonexistant_bucket_s3():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
#@attr(assertion='non-existant bucket via website endpoint should give Forbidden, keeping bucket identity secure')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@attr('fails_on_s3')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_s3
@pytest.mark.fails_on_dbstore
def test_website_nonexistant_bucket_rgw():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
@ -275,14 +248,9 @@ def test_website_nonexistant_bucket_rgw():
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
#------------- IndexDocument only, successes
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@timed(10)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@pytest.mark.timeout(10)
def test_website_public_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -299,18 +267,13 @@ def test_website_public_bucket_list_public_index():
body = res.read()
print(body)
indexstring = bytes(indexstring, 'utf-8')
eq(body, indexstring) # default content should match index.html set content
assert body == indexstring # default content should match index.html set content
__website_expected_reponse_status(res, 200, 'OK')
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -329,19 +292,14 @@ def test_website_private_bucket_list_public_index():
body = res.read()
print(body)
indexstring = bytes(indexstring, 'utf-8')
eq(body, indexstring, 'default content should match index.html set content')
assert body == indexstring, 'default content should match index.html set content'
indexhtml.delete()
bucket.delete()
# ---------- IndexDocument only, failures
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -352,13 +310,8 @@ def test_website_private_bucket_list_empty():
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -368,13 +321,8 @@ def test_website_public_bucket_list_empty():
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -394,13 +342,8 @@ def test_website_public_bucket_list_private_index():
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -421,13 +364,8 @@ def test_website_private_bucket_list_private_index():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -438,13 +376,8 @@ def test_website_private_bucket_list_empty_missingerrordoc():
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -454,13 +387,8 @@ def test_website_public_bucket_list_empty_missingerrordoc():
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -479,13 +407,8 @@ def test_website_public_bucket_list_private_index_missingerrordoc():
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -505,13 +428,8 @@ def test_website_private_bucket_list_private_index_missingerrordoc():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -529,18 +447,13 @@ def test_website_private_bucket_list_empty_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
errorstring = bytes(errorstring, 'utf-8')
ok(errorstring not in body, 'error content should NOT match error.html set content')
assert errorstring not in body, 'error content should NOT match error.html set content'
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='check if there is an invalid payload after serving error doc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_pubilc_errordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -581,18 +494,13 @@ def test_website_public_bucket_list_pubilc_errordoc():
except socket.timeout:
print('no invalid payload')
ok(resp_len == 0, 'invalid payload')
assert resp_len == 0, 'invalid payload'
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -609,18 +517,13 @@ def test_website_public_bucket_list_empty_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
errorstring = bytes(errorstring, 'utf-8')
ok(errorstring not in body, 'error content should match error.html set content')
assert errorstring not in body, 'error content should NOT match error.html set content'
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -642,19 +545,14 @@ def test_website_public_bucket_list_private_index_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
errorstring = bytes(errorstring, 'utf-8')
ok(errorstring not in body, 'error content should match error.html set content')
assert errorstring not in body, 'error content should NOT match error.html set content'
indexhtml.delete()
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -676,20 +574,15 @@ def test_website_private_bucket_list_private_index_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
errorstring = bytes(errorstring, 'utf-8')
ok(errorstring not in body, 'error content should match error.html set content')
assert errorstring not in body, 'error content should NOT match error.html set content'
indexhtml.delete()
errorhtml.delete()
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures with errordoc available
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@attr('fails_on_dbstore')
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -707,13 +600,8 @@ def test_website_private_bucket_list_empty_gooderrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -732,13 +620,8 @@ def test_website_public_bucket_list_empty_gooderrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -762,13 +645,8 @@ def test_website_public_bucket_list_private_index_gooderrordoc():
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@@ -793,13 +671,8 @@ def test_website_private_bucket_list_private_index_gooderrordoc():
bucket.delete()
# ------ RedirectAll tests
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_base():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
@@ -811,13 +684,8 @@ def test_website_bucket_private_redirectall_base():
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_path():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
@@ -831,13 +699,8 @@ def test_website_bucket_private_redirectall_path():
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_path_upgrade():
bucket = get_new_bucket()
x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
@@ -853,14 +716,9 @@ def test_website_bucket_private_redirectall_path_upgrade():
bucket.delete()
# ------ x-amz redirect tests
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
def test_website_xredirect_nonwebsite():
bucket = get_new_bucket()
#f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
@@ -872,7 +730,7 @@ def test_website_xredirect_nonwebsite():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
assert k.get_redirect() == redirect_dest
res = _website_request(bucket.name, '/page')
body = res.read()
@@ -886,14 +744,9 @@ def test_website_xredirect_nonwebsite():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
def test_website_xredirect_public_relative():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -905,7 +758,7 @@ def test_website_xredirect_public_relative():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
assert k.get_redirect() == redirect_dest
res = _website_request(bucket.name, '/page')
#new_url = get_website_url(bucket_name=bucket.name, path=redirect_dest)
@@ -914,14 +767,9 @@ def test_website_xredirect_public_relative():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
def test_website_xredirect_public_abs():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -933,7 +781,7 @@ def test_website_xredirect_public_abs():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
assert k.get_redirect() == redirect_dest
res = _website_request(bucket.name, '/page')
new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
@@ -942,14 +790,9 @@ def test_website_xredirect_public_abs():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
def test_website_xredirect_private_relative():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -961,7 +804,7 @@ def test_website_xredirect_private_relative():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='private')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
assert k.get_redirect() == redirect_dest
res = _website_request(bucket.name, '/page')
# We get a 403 because the page is private
@@ -970,14 +813,9 @@ def test_website_xredirect_private_relative():
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
def test_website_xredirect_private_abs():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@@ -989,7 +827,7 @@ def test_website_xredirect_private_abs():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='private')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
assert k.get_redirect() == redirect_dest
res = _website_request(bucket.name, '/page')
new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
@@ -1202,8 +1040,6 @@ def routing_teardown(**kwargs):
print('Deleting', str(o))
o.delete()
@common.with_setup_kwargs(setup=routing_setup, teardown=routing_teardown)
#@timed(10)
def routing_check(*args, **kwargs):
bucket = kwargs['bucket']
args=args[0]
@@ -1229,8 +1065,8 @@ def routing_check(*args, **kwargs):
if args['code'] >= 200 and args['code'] < 300:
#body = res.read()
#print(body)
#eq(body, args['content'], 'default content should match index.html set content')
ok(int(res.getheader('Content-Length', -1)) > 0)
#assert body == args['content'], 'default content should match index.html set content'
assert int(res.getheader('Content-Length', -1)) > 0
elif args['code'] >= 300 and args['code'] < 400:
_website_expected_redirect_response(res, args['code'], IGNORE_FIELD, new_url)
elif args['code'] >= 400:
@@ -1238,10 +1074,9 @@ def routing_check(*args, **kwargs):
else:
assert False
@attr('s3website_RoutingRules')
@attr('s3website')
@attr('fails_on_dbstore')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@pytest.mark.s3website_routing_rules
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
def test_routing_generator():
for t in ROUTING_RULES_TESTS:
if 'xml' in t and 'RoutingRules' in t['xml'] and len(t['xml']['RoutingRules']) > 0:
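Nose-style test generators such as `test_routing_generator` (functions that yield one check per case) are not collected by modern pytest; the idiomatic replacement is `@pytest.mark.parametrize`, which turns each case into its own collected test item. A self-contained sketch of that pattern (the case data is hypothetical, standing in for this suite's `ROUTING_RULES_TESTS`):

```python
import pytest

# hypothetical stand-in cases; the real suite would parametrize over
# ROUTING_RULES_TESTS entries instead
CASES = [
    ("redirect", 301),
    ("not_found", 404),
]

@pytest.mark.parametrize("name,expected_code", CASES)
def test_routing_case(name, expected_code):
    # each (name, expected_code) tuple runs and reports as a separate test
    assert 200 <= expected_code < 600
```

Parametrized tests also keep per-case reporting, which the generator style only got through nose-specific collection.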


@@ -1,11 +1,9 @@
from nose.tools import eq_ as eq
from . import utils
def test_generate():
FIVE_MB = 5 * 1024 * 1024
eq(len(''.join(utils.generate_random(0))), 0)
eq(len(''.join(utils.generate_random(1))), 1)
eq(len(''.join(utils.generate_random(FIVE_MB - 1))), FIVE_MB - 1)
eq(len(''.join(utils.generate_random(FIVE_MB))), FIVE_MB)
eq(len(''.join(utils.generate_random(FIVE_MB + 1))), FIVE_MB + 1)
assert len(''.join(utils.generate_random(0))) == 0
assert len(''.join(utils.generate_random(1))) == 1
assert len(''.join(utils.generate_random(FIVE_MB - 1))) == FIVE_MB - 1
assert len(''.join(utils.generate_random(FIVE_MB))) == FIVE_MB
assert len(''.join(utils.generate_random(FIVE_MB + 1))) == FIVE_MB + 1


@@ -3,8 +3,6 @@ import requests
import string
import time
from nose.tools import eq_ as eq
def assert_raises(excClass, callableObj, *args, **kwargs):
"""
Like unittest.TestCase.assertRaises, but returns the exception.
@@ -48,7 +46,7 @@ def region_sync_meta(targets, region):
conf = r.conf
if conf.sync_agent_addr:
ret = requests.post('http://{addr}:{port}/metadata/incremental'.format(addr = conf.sync_agent_addr, port = conf.sync_agent_port))
eq(ret.status_code, 200)
assert ret.status_code == 200
if conf.sync_meta_wait:
time.sleep(conf.sync_meta_wait)


@@ -1,3 +1,4 @@
import pytest
import boto3
from botocore import UNSIGNED
from botocore.client import Config
@@ -308,6 +309,16 @@ def teardown():
except:
pass
@pytest.fixture(scope="package")
def configfile():
setup()
return config
@pytest.fixture(autouse=True)
def setup_teardown(configfile):
yield
teardown()
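This fixture pair is how the diff replaces nose's implicit module setup/teardown: `configfile` runs `setup()` once per package, and the autouse `setup_teardown` fixture depends on it and runs `teardown()` after every test. A runnable miniature of the same pattern (the `setup`/`teardown` bodies here are stand-ins, not the module's real ones):

```python
import pytest

state = {"configured": False, "teardowns": 0}

def setup():
    # stand-in for the real setup(): read config, create connections
    state["configured"] = True

def teardown():
    # stand-in for the real teardown(): delete buckets the test created
    state["teardowns"] += 1

@pytest.fixture(scope="package")
def configfile():
    setup()          # runs once per package, like nose's package-level setup
    return state

@pytest.fixture(autouse=True)
def setup_teardown(configfile):
    yield            # the test body runs here
    teardown()       # runs after every test, like nose's with_setup teardown
```

Making `setup_teardown` autouse is what lets the diff delete every per-test `@nose.with_setup(...)` line instead of porting each one individually.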
def check_webidentity():
cfg = configparser.RawConfigParser()
try:


@@ -1,7 +1,5 @@
import boto3
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
import nose
import pytest
from botocore.exceptions import ClientError
from email.utils import formatdate
@@ -10,6 +8,8 @@ from .utils import _get_status
from .utils import _get_status
from . import (
configfile,
setup_teardown,
get_client,
get_v2_client,
get_new_bucket,
@@ -149,156 +149,97 @@ def _remove_header_create_bad_bucket(remove, client=None):
return e
def tag(*tags):
def wrap(func):
for tag in tags:
setattr(func, tag, True)
return func
return wrap
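The deleted `tag` helper exists only to set boolean attributes that `nosetests -a auth_common` could select on; `@pytest.mark.auth_common` with `pytest -m auth_common` is the direct equivalent. The attribute mechanism it implemented, shown in isolation (the sample test is hypothetical):

```python
# what the removed nose-era helper did: attach selectable attributes to tests
def tag(*tags):
    def wrap(func):
        for t in tags:              # loop variable renamed to avoid shadowing tag()
            setattr(func, t, True)
        return func
    return wrap

@tag('auth_common')
def test_example():
    pass

# nosetests' attrib plugin matched tests on exactly this attribute
assert test_example.auth_common is True
```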
#
# common tests
#
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid MD5')
@attr(assertion='fails 400')
@pytest.mark.auth_common
def test_object_create_bad_md5_invalid_short():
e = _add_header_create_bad_object({'Content-MD5':'YWJyYWNhZGFicmE='})
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidDigest')
assert status == 400
assert error_code == 'InvalidDigest'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/mismatched MD5')
@attr(assertion='fails 400')
@pytest.mark.auth_common
def test_object_create_bad_md5_bad():
e = _add_header_create_bad_object({'Content-MD5':'rL0Y20xC+Fzt72VPzMSk2A=='})
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'BadDigest')
assert status == 400
assert error_code == 'BadDigest'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty MD5')
@attr(assertion='fails 400')
@pytest.mark.auth_common
def test_object_create_bad_md5_empty():
e = _add_header_create_bad_object({'Content-MD5':''})
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidDigest')
assert status == 400
assert error_code == 'InvalidDigest'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no MD5 header')
@attr(assertion='succeeds')
@pytest.mark.auth_common
def test_object_create_bad_md5_none():
bucket_name, key_name = _remove_header_create_object('Content-MD5')
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/Expect 200')
@attr(assertion='garbage, but S3 succeeds!')
@pytest.mark.auth_common
def test_object_create_bad_expect_mismatch():
bucket_name, key_name = _add_header_create_object({'Expect': 200})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty expect')
@attr(assertion='succeeds ... should it?')
@pytest.mark.auth_common
def test_object_create_bad_expect_empty():
bucket_name, key_name = _add_header_create_object({'Expect': ''})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no expect')
@attr(assertion='succeeds')
@pytest.mark.auth_common
def test_object_create_bad_expect_none():
bucket_name, key_name = _remove_header_create_object('Expect')
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty content length')
@attr(assertion='fails 400')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_empty():
e = _add_header_create_bad_object({'Content-Length':''})
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
assert status == 400
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
@pytest.mark.auth_common
@pytest.mark.fails_on_mod_proxy_fcgi
def test_object_create_bad_contentlength_negative():
client = get_client()
bucket_name = get_new_bucket()
key_name = 'foo'
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key_name, ContentLength=-1)
status = _get_status(e.response)
eq(status, 400)
assert status == 400
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='fails 411')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_none():
remove = 'Content-Length'
e = _remove_header_create_bad_object(remove)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 411)
eq(error_code, 'MissingContentLength')
assert status == 411
assert error_code == 'MissingContentLength'
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content type text/plain')
@attr(assertion='succeeds')
@pytest.mark.auth_common
def test_object_create_bad_contenttype_invalid():
bucket_name, key_name = _add_header_create_object({'Content-Type': 'text/plain'})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty content type')
@attr(assertion='succeeds')
@pytest.mark.auth_common
def test_object_create_bad_contenttype_empty():
client = get_client()
key_name = 'foo'
bucket_name = get_new_bucket()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar', ContentType='')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content type')
@attr(assertion='succeeds')
@pytest.mark.auth_common
def test_object_create_bad_contenttype_none():
bucket_name = get_new_bucket()
key_name = 'foo'
@@ -307,38 +248,26 @@ def test_object_create_bad_contenttype_none():
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_empty():
e = _add_header_create_bad_object({'Authorization': ''})
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
assert status == 403
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date and x-amz-date')
@attr(assertion='succeeds')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to pass both the 'Date' and 'X-Amz-Date' headers during signing, not just 'X-Amz-Date'
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_date_and_amz_date():
date = formatdate(usegmt=True)
bucket_name, key_name = _add_header_create_object({'Date': date, 'X-Amz-Date': date})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date and no date')
@attr(assertion='succeeds')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to pass both the 'Date' and 'X-Amz-Date' headers during signing, not just 'X-Amz-Date'
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_amz_date_and_no_date():
date = formatdate(usegmt=True)
bucket_name, key_name = _add_header_create_object({'Date': '', 'X-Amz-Date': date})
@@ -346,36 +275,24 @@ def test_object_create_amz_date_and_no_date():
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
# the teardown is really messed up here. check it out
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the authorization header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_none():
e = _remove_header_create_bad_object('Authorization')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
assert status == 403
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the content-length header
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_bucket_create_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
def test_object_acl_create_contentlength_none():
bucket_name = get_new_bucket()
client = get_client()
@@ -389,11 +306,7 @@ def test_object_acl_create_contentlength_none():
client.meta.events.register('before-call.s3.PutObjectAcl', remove_header)
client.put_object_acl(Bucket=bucket_name, Key='foo', ACL='public-read')
@pytest.mark.auth_common
def test_bucket_put_bad_canned_acl():
bucket_name = get_new_bucket()
client = get_client()
@@ -404,13 +317,9 @@ def test_bucket_put_bad_canned_acl():
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name, ACL='public-read')
status = _get_status(e.response)
assert status == 400
@pytest.mark.auth_common
def test_bucket_create_bad_expect_mismatch():
bucket_name = get_new_bucket_name()
client = get_client()
@@ -420,99 +329,67 @@ def test_bucket_create_bad_expect_mismatch():
client.meta.events.register('before-call.s3.CreateBucket', add_headers)
client.create_bucket(Bucket=bucket_name)
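The `add_headers` hook registered on `before-call.s3.CreateBucket` mutates the serialized request before it is sent; a hypothetical sketch of such a hook, assuming the `params['headers']` dict shape that botocore hands to `before-call` handlers:

```python
# Hypothetical sketch of a header-injecting before-call hook.
# botocore passes the serialized request as 'params'; its 'headers'
# mapping can be mutated in place before the request goes out.
def make_add_headers(headers):
    def add_headers(**kwargs):
        kwargs['params']['headers'].update(headers)
    return add_headers

# Demonstrate against a stand-in params dict (no client or network needed):
params = {'headers': {'Host': 'example.invalid'}}
make_add_headers({'Expect': '200'})(params=params)
```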
@pytest.mark.auth_common
def test_bucket_create_bad_expect_empty():
headers = {'Expect': ''}
_add_header_create_bucket(headers)
@pytest.mark.auth_common
# TODO: The request isn't even making it to the RGW past the frontend
# This test had 'fails_on_rgw' before the move to boto3
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_empty():
headers = {'Content-Length': ''}
e = _add_header_create_bad_bucket(headers)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
@pytest.mark.auth_common
@pytest.mark.fails_on_mod_proxy_fcgi
def test_bucket_create_bad_contentlength_negative():
headers = {'Content-Length': '-1'}
e = _add_header_create_bad_bucket(headers)
status = _get_status(e.response)
assert status == 400
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_empty():
headers = {'Authorization': ''}
e = _add_header_create_bad_bucket(headers)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_common
# TODO: remove 'fails_on_rgw' once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_none():
e = _remove_header_create_bad_bucket('Authorization')
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
def test_object_create_bad_md5_invalid_garbage_aws2():
v2_client = get_v2_client()
headers = {'Content-MD5': 'AWS HAHAHA'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidDigest'
@pytest.mark.auth_aws2
# TODO: remove 'fails_on_rgw' once we have learned how to manipulate the Content-Length header
@pytest.mark.fails_on_rgw
def test_object_create_bad_contentlength_mismatch_below_aws2():
v2_client = get_v2_client()
content = 'bar'
@@ -520,252 +397,176 @@ def test_object_create_bad_contentlength_mismatch_below_aws2():
headers = {'Content-Length': str(length)}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'BadDigest'
@pytest.mark.auth_aws2
# TODO: remove 'fails_on_rgw' once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_incorrect_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS AKIAIGR7ZNNBHC5BKSUB:FWeDfwojDSdS2Ztmpfeubhd9isU='}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'InvalidDigest'
@pytest.mark.auth_aws2
# TODO: remove 'fails_on_rgw' once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
def test_object_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS HAHAHA'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidArgument'
@pytest.mark.auth_aws2
def test_object_create_bad_ua_empty_aws2():
v2_client = get_v2_client()
headers = {'User-Agent': ''}
bucket_name, key_name = _add_header_create_object(headers, v2_client)
v2_client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_aws2
def test_object_create_bad_ua_none_aws2():
v2_client = get_v2_client()
remove = 'User-Agent'
bucket_name, key_name = _remove_header_create_object(remove, v2_client)
v2_client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_aws2
def test_object_create_bad_date_invalid_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Bad Date'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
def test_object_create_bad_date_empty_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': ''}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
# TODO: remove 'fails_on_rgw' once we have learned how to remove the date header
@pytest.mark.fails_on_rgw
def test_object_create_bad_date_none_aws2():
v2_client = get_v2_client()
remove = 'x-amz-date'
e = _remove_header_create_bad_object(remove, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
def test_object_create_bad_date_before_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2010 21:53:04 GMT'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
@pytest.mark.auth_aws2
def test_object_create_bad_date_before_epoch_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 1950 21:53:04 GMT'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
def test_object_create_bad_date_after_end_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 9999 21:53:04 GMT'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
@pytest.mark.auth_aws2
# TODO: remove 'fails_on_rgw' once we have learned how to remove the date header
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS HAHAHA'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidArgument'
@pytest.mark.auth_aws2
def test_bucket_create_bad_ua_empty_aws2():
v2_client = get_v2_client()
headers = {'User-Agent': ''}
_add_header_create_bucket(headers, v2_client)
@pytest.mark.auth_aws2
def test_bucket_create_bad_ua_none_aws2():
v2_client = get_v2_client()
remove = 'User-Agent'
_remove_header_create_bucket(remove, v2_client)
@pytest.mark.auth_aws2
def test_bucket_create_bad_date_invalid_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Bad Date'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
def test_bucket_create_bad_date_empty_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': ''}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
# TODO: remove 'fails_on_rgw' once we have learned how to remove the date header
@pytest.mark.fails_on_rgw
def test_bucket_create_bad_date_none_aws2():
v2_client = get_v2_client()
remove = 'x-amz-date'
e = _remove_header_create_bad_bucket(remove, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@pytest.mark.auth_aws2
def test_bucket_create_bad_date_before_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2010 21:53:04 GMT'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
@pytest.mark.auth_aws2
def test_bucket_create_bad_date_after_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2030 21:53:04 GMT'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
@pytest.mark.auth_aws2
def test_bucket_create_bad_date_before_epoch_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 1950 21:53:04 GMT'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
@@ -1,12 +1,13 @@
import json
from botocore.exceptions import ClientError
import pytest
from s3tests_boto3.functional.utils import assert_raises
from s3tests_boto3.functional.test_s3 import _multipart_upload
from . import (
configfile,
setup_teardown,
get_alt_client,
get_iam_client,
get_new_bucket,
@@ -17,12 +18,8 @@ from . import (
from .utils import _get_status, _get_status_and_error_code
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_user_policy():
client = get_iam_client()
@@ -35,18 +32,14 @@ def test_put_user_policy():
)
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
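The `policy_document` elided by the hunk above is built with `json.dumps`; a minimal all-access identity policy of the shape these IAM tests attach (the values here are illustrative, not the exact document from the suite):

```python
import json

# Illustrative stand-in for the elided policy_document: a minimal
# "allow everything" identity policy in AWS policy-document form.
policy_document = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "*", "Resource": "*"}
    ]
})
```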
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_user_policy_invalid_user():
client = get_iam_client()
@@ -60,15 +53,11 @@ def test_put_user_policy_invalid_user():
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
PolicyName='AllAccessPolicy', UserName="some-non-existing-user-id")
status = _get_status(e.response)
assert status == 404
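`assert_raises`, imported from `s3tests_boto3.functional.utils`, is also outside this diff; a hypothetical reconstruction of its contract: call the callable, require that the expected exception is raised, and return the caught instance so the test can inspect `e.response`:

```python
# Hypothetical reconstruction of the assert_raises helper's contract.
def assert_raises(exc_class, callable_obj, *args, **kwargs):
    try:
        callable_obj(*args, **kwargs)
    except exc_class as e:
        return e
    raise AssertionError('%s was not raised' % exc_class.__name__)

# Demonstrate with a stand-in callable instead of a boto3 client call:
err = assert_raises(ValueError, int, 'not-a-number')
```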
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_user_policy_parameter_limit():
client = get_iam_client()
@@ -83,16 +72,12 @@ def test_put_user_policy_parameter_limit():
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
PolicyName='AllAccessPolicy' * 10, UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 400
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_rgw
def test_put_user_policy_invalid_element():
client = get_iam_client()
@@ -108,7 +93,7 @@ def test_put_user_policy_invalid_element():
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 400
# With no Statement
policy_document = json.dumps(
@@ -119,7 +104,7 @@ def test_put_user_policy_invalid_element():
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 400
# with same Sid for 2 statements
policy_document = json.dumps(
@@ -138,7 +123,7 @@ def test_put_user_policy_invalid_element():
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 400
# with Principal
policy_document = json.dumps(
@@ -153,15 +138,11 @@ def test_put_user_policy_invalid_element():
e = assert_raises(ClientError, client.put_user_policy, PolicyDocument=policy_document,
PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 400
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_existing_user_policy():
client = get_iam_client()
@@ -175,18 +156,14 @@ def test_put_existing_user_policy():
)
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_list_user_policy():
client = get_iam_client()
@@ -200,31 +177,23 @@ def test_list_user_policy():
)
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.list_user_policies(UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_list_user_policy_invalid_user():
client = get_iam_client()
e = assert_raises(ClientError, client.list_user_policies, UserName="some-non-existing-user-id")
status = _get_status(e.response)
assert status == 404
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_get_user_policy():
client = get_iam_client()
@@ -237,21 +206,17 @@ def test_get_user_policy():
)
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.get_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_get_user_policy_invalid_user():
client = get_iam_client()
@@ -264,21 +229,17 @@ def test_get_user_policy_invalid_user():
)
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy',
UserName="some-non-existing-user-id")
status = _get_status(e.response)
assert status == 404
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_rgw
def test_get_user_policy_invalid_policy_name():
client = get_iam_client()
@@ -294,17 +255,13 @@ def test_get_user_policy_invalid_policy_name():
e = assert_raises(ClientError, client.get_user_policy, PolicyName='non-existing-policy-name',
UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 404
client.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_rgw
def test_get_deleted_user_policy():
client = get_iam_client()
@@ -321,15 +278,11 @@ def test_get_deleted_user_policy():
e = assert_raises(ClientError, client.get_user_policy, PolicyName='AllAccessPolicy',
UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 404
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_get_user_policy_from_multiple_policies():
client = get_iam_client()
@@ -344,29 +297,25 @@ def test_get_user_policy_from_multiple_policies():
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy1',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy2',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.get_user_policy(PolicyName='AllowAccessPolicy2',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllowAccessPolicy1',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllowAccessPolicy2',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy():
client = get_iam_client()
@@ -381,18 +330,14 @@ def test_delete_user_policy():
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_user():
client = get_iam_client()
@@ -407,22 +352,18 @@ def test_delete_user_policy_invalid_user():
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, client.delete_user_policy, PolicyName='AllAccessPolicy',
UserName="some-non-existing-user-id")
status = _get_status(e.response)
assert status == 404
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_policy_name():
client = get_iam_client()
@@ -437,22 +378,18 @@ def test_delete_user_policy_invalid_policy_name():
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, client.delete_user_policy, PolicyName='non-existing-policy-name',
UserName=get_alt_user_id())
status = _get_status(e.response)
assert status == 404
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy_from_multiple_policies():
client = get_iam_client()
@@ -467,36 +404,32 @@ def test_delete_user_policy_from_multiple_policies():
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy1',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy2',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy3',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllowAccessPolicy1',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllowAccessPolicy2',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.get_user_policy(PolicyName='AllowAccessPolicy3',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='AllowAccessPolicy3',
UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_bucket_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@@ -515,7 +448,7 @@ def test_allow_bucket_actions_in_user_policy():
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = s3_client_alt.list_objects(Bucket=bucket)
object_found = False
@@ -527,10 +460,10 @@ def test_allow_bucket_actions_in_user_policy():
raise AssertionError("Object is not listed")
response = s3_client_iam.delete_object(Bucket=bucket, Key='foo')
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = s3_client_alt.delete_bucket(Bucket=bucket)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = s3_client_iam.list_buckets()
for bucket in response['Buckets']:
@ -539,16 +472,11 @@ def test_allow_bucket_actions_in_user_policy():
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Bucket Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_deny_bucket_actions_in_user_policy():
client = get_iam_client()
s3_client = get_alt_client()
@ -565,29 +493,25 @@ def test_deny_bucket_actions_in_user_policy():
response = client.put_user_policy(PolicyDocument=policy_document_deny,
PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, s3_client.list_buckets, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
e = assert_raises(ClientError, s3_client.delete_bucket, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = s3_client.delete_bucket(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Object Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_object_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -603,35 +527,30 @@ def test_allow_object_actions_in_user_policy():
)
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar')
response = s3_client_alt.get_object(Bucket=bucket, Key='foo')
body = response['Body'].read()
if type(body) is bytes:
body = body.decode()
eq(body, "bar")
assert body == "bar"
response = s3_client_alt.delete_object(Bucket=bucket, Key='foo')
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
e = assert_raises(ClientError, s3_client_iam.get_object, Bucket=bucket, Key='foo')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 404)
eq(error_code, 'NoSuchKey')
assert status == 404
assert error_code == 'NoSuchKey'
response = s3_client_iam.delete_bucket(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Object Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_deny_object_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -653,28 +572,24 @@ def test_deny_object_actions_in_user_policy():
e = assert_raises(ClientError, s3_client_alt.put_object, Bucket=bucket, Key='foo')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
e = assert_raises(ClientError, s3_client_alt.get_object, Bucket=bucket, Key='foo')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
e = assert_raises(ClientError, s3_client_alt.delete_object, Bucket=bucket, Key='foo')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Multipart Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_multipart_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -690,31 +605,26 @@ def test_allow_multipart_actions_in_user_policy():
)
response = client.put_user_policy(PolicyDocument=policy_document_allow,
PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
key = "mymultipart"
mb = 1024 * 1024
(upload_id, _, _) = _multipart_upload(client=s3_client_iam, bucket_name=bucket, key=key,
size=5 * mb)
response = s3_client_alt.list_multipart_uploads(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = s3_client_alt.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = s3_client_iam.delete_bucket(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Multipart Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_deny_multipart_actions_in_user_policy():
client = get_iam_client()
s3_client = get_alt_client()
@ -730,7 +640,7 @@ def test_deny_multipart_actions_in_user_policy():
response = client.put_user_policy(PolicyDocument=policy_document_deny,
PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
key = "mymultipart"
mb = 1024 * 1024
@ -739,29 +649,24 @@ def test_deny_multipart_actions_in_user_policy():
e = assert_raises(ClientError, s3_client.list_multipart_uploads, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
e = assert_raises(ClientError, s3_client.abort_multipart_upload, Bucket=bucket,
Key=key, UploadId=upload_id)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
response = s3_client.delete_bucket(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Allow Tagging Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_tagging_actions_in_user_policy():
client = get_iam_client()
s3_client_alt = get_alt_client()
@ -781,37 +686,32 @@ def test_allow_tagging_actions_in_user_policy():
tags = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}, ]}
response = s3_client_alt.put_bucket_tagging(Bucket=bucket, Tagging=tags)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = s3_client_alt.get_bucket_tagging(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(response['TagSet'][0]['Key'], 'Hello')
eq(response['TagSet'][0]['Value'], 'World')
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
assert response['TagSet'][0]['Key'] == 'Hello'
assert response['TagSet'][0]['Value'] == 'World'
obj_key = 'obj'
response = s3_client_iam.put_object(Bucket=bucket, Key=obj_key, Body='obj_body')
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = s3_client_alt.put_object_tagging(Bucket=bucket, Key=obj_key, Tagging=tags)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = s3_client_alt.get_object_tagging(Bucket=bucket, Key=obj_key)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(response['TagSet'], tags['TagSet'])
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
assert response['TagSet'] == tags['TagSet']
response = s3_client_iam.delete_object(Bucket=bucket, Key=obj_key)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = s3_client_iam.delete_bucket(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='s3 Actions')
@attr(operation='Verify Deny Tagging Actions in user Policy')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_deny_tagging_actions_in_user_policy():
client = get_iam_client()
s3_client = get_alt_client()
@ -831,43 +731,38 @@ def test_deny_tagging_actions_in_user_policy():
e = assert_raises(ClientError, s3_client.put_bucket_tagging, Bucket=bucket, Tagging=tags)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
e = assert_raises(ClientError, s3_client.get_bucket_tagging, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
obj_key = 'obj'
response = s3_client.put_object(Bucket=bucket, Key=obj_key, Body='obj_body')
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, s3_client.put_object_tagging, Bucket=bucket, Key=obj_key,
Tagging=tags)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
e = assert_raises(ClientError, s3_client.delete_object_tagging, Bucket=bucket, Key=obj_key)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
response = s3_client.delete_object(Bucket=bucket, Key=obj_key)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = s3_client.delete_bucket(Bucket=bucket)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
assert response['ResponseMetadata']['HTTPStatusCode'] == 204
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify conflicting user policy statements')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_verify_conflicting_user_policy_statements():
s3client = get_alt_client()
bucket = get_new_bucket(client=s3client)
@ -887,23 +782,18 @@ def test_verify_conflicting_user_policy_statements():
client = get_iam_client()
response = client.put_user_policy(PolicyDocument=policy_document, PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(method='put')
@attr(operation='Verify conflicting user policies')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@attr('fails_on_dbstore')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_verify_conflicting_user_policies():
s3client = get_alt_client()
bucket = get_new_bucket(client=s3client)
@ -924,27 +814,24 @@ def test_verify_conflicting_user_policies():
client = get_iam_client()
response = client.put_user_policy(PolicyDocument=policy_allow, PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.put_user_policy(PolicyDocument=policy_deny, PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
e = assert_raises(ClientError, s3client.list_objects, Bucket=bucket)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'AccessDenied')
assert status == 403
assert error_code == 'AccessDenied'
response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@attr(resource='user-policy')
@attr(operation='Verify Allow Actions for IAM user policies')
@attr(assertion='succeeds')
@attr('user-policy')
@attr('test_of_iam')
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_verify_allow_iam_actions():
policy1 = json.dumps(
{"Version": "2012-10-17",
@ -959,12 +846,12 @@ def test_verify_allow_iam_actions():
response = client1.put_user_policy(PolicyDocument=policy1, PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = iam_client_alt.get_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = iam_client_alt.list_user_policies(UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
response = iam_client_alt.delete_user_policy(PolicyName='AllowAccessPolicy',
UserName=get_alt_user_id())
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
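The same pattern repeats throughout the conversion above: nose's `eq()` helper becomes a bare `assert`, which pytest's assertion rewriting reports with both operands on failure, so no message argument is needed. A minimal, self-contained sketch (the response dict is hypothetical, standing in for a real boto3 call):

```python
# Hypothetical boto3-style response, used only to illustrate the change.
response = {'ResponseMetadata': {'HTTPStatusCode': 200}}

# old (nose):   eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
# new (pytest): a bare assert; pytest prints both sides when it fails
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
```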

File diff suppressed because it is too large


@ -1,14 +1,14 @@
import nose
import pytest
import random
import string
import re
from nose.plugins.attrib import attr
from botocore.exceptions import ClientError
import uuid
from nose.tools import eq_ as eq
from . import (
configfile,
setup_teardown,
get_client
)
@ -76,12 +76,12 @@ def generate_s3select_expression_projection(bucket_name,obj_name):
# both results should be close (epsilon)
assert( abs(float(res.split("\n")[1]) - eval(e)) < epsilon )
@attr('s3select')
@pytest.mark.s3select
def get_random_string():
return uuid.uuid4().hex[:6].upper()
@attr('s3select')
@pytest.mark.s3select
def test_generate_where_clause():
# create small csv file for testing the random expressions
@ -93,7 +93,7 @@ def test_generate_where_clause():
for _ in range(100):
generate_s3select_where_clause(bucket_name,obj_name)
@attr('s3select')
@pytest.mark.s3select
def test_generate_projection():
# create small csv file for testing the random expressions
@ -114,8 +114,7 @@ def s3select_assert_result(a,b):
else:
assert a != ""
assert b != ""
nose.tools.assert_equal(a,b)
assert a == b
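With `nose.tools.assert_equal` removed, the comparison helper reduces to plain asserts. A runnable sketch of the same shape (the name `compare_results` is hypothetical, mirroring `s3select_assert_result` above):

```python
def compare_results(a, b):
    # Refuse to treat two empty result strings as a match,
    # then compare directly; pytest's assertion rewriting
    # shows both operands if the final assert fails.
    if isinstance(a, str) and isinstance(b, str):
        assert a != ""
        assert b != ""
    assert a == b

compare_results("128", "128")  # equal: passes silently
```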
def create_csv_object_for_datetime(rows,columns):
result = ""
@ -219,7 +218,7 @@ def upload_csv_object(bucket_name,new_key,obj):
# validate uploaded object
c2 = get_client()
response = c2.get_object(Bucket=bucket_name, Key=new_key)
eq(response['Body'].read().decode('utf-8'), obj, 's3select error[ downloaded object not equal to uploaded objecy')
    assert response['Body'].read().decode('utf-8') == obj, 's3select error: downloaded object not equal to uploaded object'
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
@ -291,7 +290,7 @@ def remove_xml_tags_from_result(obj):
x = bool(re.search("^failure.*$", result_strip))
if x:
logging.info(result)
nose.tools.assert_equal(x, False)
assert x == False
return result
@ -309,7 +308,7 @@ def create_list_of_int(column_pos,obj,field_split=",",row_split="\n"):
return list_of_int
@attr('s3select')
@pytest.mark.s3select
def test_count_operation():
csv_obj_name = get_random_string()
bucket_name = "test"
@ -320,7 +319,7 @@ def test_count_operation():
s3select_assert_result( num_of_rows, int( res ))
@attr('s3select')
@pytest.mark.s3select
def test_column_sum_min_max():
csv_obj = create_random_csv_object(10000,10)
@ -385,7 +384,7 @@ def test_column_sum_min_max():
s3select_assert_result( int(count)*4 , int(sum1)-int(sum2) )
@attr('s3select')
@pytest.mark.s3select
def test_nullif_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -440,7 +439,7 @@ def test_nullif_expressions():
s3select_assert_result( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_nulliftrue_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -467,7 +466,7 @@ def test_nulliftrue_expressions():
s3select_assert_result( res_s3select_nullif, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_is_not_null_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -488,7 +487,7 @@ def test_is_not_null_expressions():
s3select_assert_result( res_s3select_null, res_s3select)
@attr('s3select')
@pytest.mark.s3select
def test_lowerupper_expressions():
csv_obj = create_random_csv_object(1,10)
@ -505,7 +504,7 @@ def test_lowerupper_expressions():
s3select_assert_result( res_s3select, "AB12CD$$")
@attr('s3select')
@pytest.mark.s3select
def test_in_expressions():
    # purpose of test: engine correctly processes several projections containing aggregation-functions
@ -575,7 +574,7 @@ def test_in_expressions():
s3select_assert_result( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_true_false_in_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -620,7 +619,7 @@ def test_true_false_in_expressions():
s3select_assert_result( res_s3select_in, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_like_expressions():
csv_obj = create_random_csv_object_string(1000,10)
@ -707,7 +706,7 @@ def test_like_expressions():
s3select_assert_result( res_s3select_like, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_truefalselike_expressions():
csv_obj = create_random_csv_object_string(1000,10)
@ -752,7 +751,7 @@ def test_truefalselike_expressions():
s3select_assert_result( res_s3select_like, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_nullif_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -765,21 +764,21 @@ def test_nullif_expressions():
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from stdin where _1 = _2 ;") ).replace("\n","")
nose.tools.assert_equal( res_s3select_nullif, res_s3select)
assert res_s3select_nullif == res_s3select
res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from stdin where not nullif(_1,_2) is null ;") ).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from stdin where _1 != _2 ;") ).replace("\n","")
nose.tools.assert_equal( res_s3select_nullif, res_s3select)
assert res_s3select_nullif == res_s3select
res_s3select_nullif = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from stdin where nullif(_1,_2) = _1 ;") ).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select count(0) from stdin where _1 != _2 ;") ).replace("\n","")
nose.tools.assert_equal( res_s3select_nullif, res_s3select)
assert res_s3select_nullif == res_s3select
@attr('s3select')
@pytest.mark.s3select
def test_lowerupper_expressions():
csv_obj = create_random_csv_object(1,10)
@ -790,13 +789,13 @@ def test_lowerupper_expressions():
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select lower("AB12cd$$") from stdin ;') ).replace("\n","")
nose.tools.assert_equal( res_s3select, "ab12cd$$")
assert res_s3select == "ab12cd$$"
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select upper("ab12CD$$") from stdin ;') ).replace("\n","")
nose.tools.assert_equal( res_s3select, "AB12CD$$")
assert res_s3select == "AB12CD$$"
@attr('s3select')
@pytest.mark.s3select
def test_in_expressions():
    # purpose of test: engine correctly processes several projections containing aggregation-functions
@ -810,33 +809,33 @@ def test_in_expressions():
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from stdin where int(_1) = 1;')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from stdin where int(_1) in(1,0);')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from stdin where int(_1) = 1 or int(_1) = 0;')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_2) from stdin where int(_2) in(1,0,2);')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_2) from stdin where int(_2) = 1 or int(_2) = 0 or int(_2) = 2;')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_2) from stdin where int(_2)*2 in(int(_3)*2,int(_4)*3,int(_5)*5);')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_2) from stdin where int(_2)*2 = int(_3)*2 or int(_2)*2 = int(_4)*3 or int(_2)*2 = int(_5)*5;')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from stdin where character_length(_1) = 2 and substring(_1,2,1) in ("3");')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from stdin where _1 like "_3";')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
@attr('s3select')
@pytest.mark.s3select
def test_like_expressions():
csv_obj = create_random_csv_object_string(10000,10)
@ -849,40 +848,40 @@ def test_like_expressions():
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from stdin where substring(_1,11,4) = "aeio" ;')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _1 like "cbcd%";')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from stdin where substring(_1,1,4) = "cbcd";')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _3 like "%y[y-z]";')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from stdin where substring(_3,character_length(_3),1) between "y" and "z" and substring(_3,character_length(_3)-1,1) = "y";')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _2 like "%yz";')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from stdin where substring(_2,character_length(_2),1) = "z" and substring(_2,character_length(_2)-1,1) = "y";')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _3 like "c%z";')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from stdin where substring(_3,character_length(_3),1) = "z" and substring(_3,1,1) = "c";')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _2 like "%xy_";')).replace("\n","")
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from stdin where substring(_2,character_length(_2)-1,1) = "y" and substring(_2,character_length(_2)-2,1) = "x";')).replace("\n","")
nose.tools.assert_equal( res_s3select_in, res_s3select )
assert res_s3select_in == res_s3select
@attr('s3select')
@pytest.mark.s3select
def test_complex_expressions():
    # purpose of test: engine correctly processes several projections containing aggregation-functions
@ -914,7 +913,7 @@ def test_complex_expressions():
s3select_assert_result( res_s3select_between_numbers, res_s3select_eq_modolu)
@attr('s3select')
@pytest.mark.s3select
def test_alias():
    # purpose: test compares results of exactly the same queries, one with an alias and the other without.
@ -935,7 +934,7 @@ def test_alias():
s3select_assert_result( res_s3select_alias, res_s3select_no_alias)
@attr('s3select')
@pytest.mark.s3select
def test_alias_cyclic_refernce():
number_of_rows = 10000
@ -953,7 +952,7 @@ def test_alias_cyclic_refernce():
assert int(find_res) >= 0
@attr('s3select')
@pytest.mark.s3select
def test_datetime():
    # purpose of test is to validate that date-time functionality is correct,
@ -984,7 +983,7 @@ def test_datetime():
s3select_assert_result( res_s3select_date_time_to_timestamp, res_s3select_substring)
@attr('s3select')
@pytest.mark.s3select
def test_true_false_datetime():
    # purpose of test is to validate that date-time functionality is correct,
@ -1018,7 +1017,7 @@ def test_true_false_datetime():
s3select_assert_result( res_s3select_date_time_utcnow, res_s3select_count)
@attr('s3select')
@pytest.mark.s3select
def test_csv_parser():
    # purpose: test default csv values(, \n " \ ), return value may contain meta-char
@ -1058,7 +1057,7 @@ def test_csv_parser():
res_s3select_alias = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select _9 from s3object;") ).replace("\n","")
s3select_assert_result( res_s3select_alias, 'null')
@attr('s3select')
@pytest.mark.s3select
def test_csv_definition():
number_of_rows = 10000
@ -1088,7 +1087,7 @@ def test_csv_definition():
s3select_assert_result( res_s3select, __res )
@attr('s3select')
@pytest.mark.s3select
def test_schema_definition():
number_of_rows = 10000
@ -1123,7 +1122,7 @@ def test_schema_definition():
assert ((res_multiple_defintion.find("multiple definition of column {c4} as schema-column and alias")) >= 0)
@attr('s3select')
@pytest.mark.s3select
def test_when_then_else_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1152,7 +1151,7 @@ def test_when_then_else_expressions():
s3select_assert_result( str(count3) , res2)
@attr('s3select')
@pytest.mark.s3select
def test_coalesce_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1174,7 +1173,7 @@ def test_coalesce_expressions():
s3select_assert_result( res_s3select, res_coalesce)
@attr('s3select')
@pytest.mark.s3select
def test_cast_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1195,7 +1194,7 @@ def test_cast_expressions():
s3select_assert_result( res_s3select, res)
@attr('s3select')
@pytest.mark.s3select
def test_version():
return
@ -1213,7 +1212,7 @@ def test_version():
s3select_assert_result( res_version, "41.a," )
@attr('s3select')
@pytest.mark.s3select
def test_trim_expressions():
csv_obj = create_random_csv_object_trim(10000,10)
@ -1252,7 +1251,7 @@ def test_trim_expressions():
s3select_assert_result( res_s3select_trim, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_truefalse_trim_expressions():
csv_obj = create_random_csv_object_trim(10000,10)
@ -1291,7 +1290,7 @@ def test_truefalse_trim_expressions():
s3select_assert_result( res_s3select_trim, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_escape_expressions():
csv_obj = create_random_csv_object_escape(10000,10)
@ -1312,7 +1311,7 @@ def test_escape_expressions():
s3select_assert_result( res_s3select_escape, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_case_value_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1327,7 +1326,7 @@ def test_case_value_expressions():
s3select_assert_result( res_s3select_case, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_bool_cast_expressions():
csv_obj = create_random_csv_object(10000,10)
@ -1342,7 +1341,7 @@ def test_bool_cast_expressions():
s3select_assert_result( res_s3select_cast, res_s3select )
@attr('s3select')
@pytest.mark.s3select
def test_progress_expressions():
csv_obj = create_random_csv_object(1000000,10)
@ -1369,7 +1368,7 @@ def test_progress_expressions():
# end response
s3select_assert_result({}, res_s3select_response[total_response-1])
@attr('s3select')
@pytest.mark.s3select
def test_output_serial_expressions():
return # TODO fix test

File diff suppressed because it is too large


@ -1,11 +1,9 @@
from nose.tools import eq_ as eq
from . import utils
def test_generate():
FIVE_MB = 5 * 1024 * 1024
eq(len(''.join(utils.generate_random(0))), 0)
eq(len(''.join(utils.generate_random(1))), 1)
eq(len(''.join(utils.generate_random(FIVE_MB - 1))), FIVE_MB - 1)
eq(len(''.join(utils.generate_random(FIVE_MB))), FIVE_MB)
eq(len(''.join(utils.generate_random(FIVE_MB + 1))), FIVE_MB + 1)
assert len(''.join(utils.generate_random(0))) == 0
assert len(''.join(utils.generate_random(1))) == 1
assert len(''.join(utils.generate_random(FIVE_MB - 1))) == FIVE_MB - 1
assert len(''.join(utils.generate_random(FIVE_MB))) == FIVE_MB
assert len(''.join(utils.generate_random(FIVE_MB + 1))) == FIVE_MB + 1


@ -3,8 +3,6 @@ import requests
import string
import time
from nose.tools import eq_ as eq
def assert_raises(excClass, callableObj, *args, **kwargs):
"""
Like unittest.TestCase.assertRaises, but returns the exception.

tox.ini Normal file

@ -0,0 +1,7 @@
[tox]
envlist = py
[testenv]
deps = -rrequirements.txt
passenv = S3TEST_CONF S3_USE_SIGV4
commands = pytest {posargs}
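One caveat of the `@attr(...)` → `@pytest.mark.*` conversion above: recent pytest releases warn about unregistered marks (and reject them under `--strict-markers`). If the repo does not already declare them, the custom markers used in these tests could be registered in a `pytest.ini`; a hedged sketch (the descriptions after each colon are illustrative, not from this repo):

```ini
[pytest]
markers =
    user_policy: IAM user-policy tests
    test_of_iam: IAM behaviour tests
    s3select: s3 select tests
    fails_on_dbstore: tests known to fail on dbstore
```

Tests can then be filtered through tox's `{posargs}`, e.g. `tox -- -m 'user_policy and not fails_on_dbstore'`.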