Compare commits

..

67 commits

Author SHA1 Message Date
Casey Bodley
f004493dcc object-lock: test changes between retention modes
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 8662815ebe)
2021-08-12 13:46:22 -04:00
Casey Bodley
9eacf29594 nuke_prefixed_buckets waits up to 60 seconds for object locks to expire
objects locked in GOVERNANCE mode can be removed with
BypassGovernanceRetention, but some tests may leave an object locked in
COMPLIANCE mode, which blocks deletion until the retention period
expires

nuke_prefixed_buckets now checks the retention policy of objects that it
fails to delete with AccessDenied, and will wait up to 60 seconds for
locks to expire before retrying the deletes. if the wait exceeds 60
seconds, it instead throws an error without deleting the bucket

instead of doing this in nuke_prefixed_buckets, we could potentially
have each object-lock test case handle this manually, but that would
add a separate delay to each test case

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 9c4f15a47e)
2021-08-12 13:46:03 -04:00
Casey Bodley
8090ea4629 nuke_prefixed_buckets deletes objects in batches
speed up the cleanup by using delete_objects() with batches of 128

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit bb995c2aeb)
2021-08-12 13:45:27 -04:00
Rahul Dev Parashar
df426ea041 rgw/s3_boto3: Add tests for bucket encryption APIs
Tests are added for GetBucketEncryption, PutBucketEncryption,
and DeleteBucketEncryption APIs.

Related PR: https://github.com/ceph/ceph/pull/42222

Signed-off-by: Rahul Dev Parashar <rahul.dev@flipkart.com>
(cherry picked from commit 44643af0b0)
2021-08-12 13:45:16 -04:00
Kalpesh Pandya
057432b9f5 rgw/sts: Addition of new STS tests for testing
session policies alongwith role's permission policy
and bucket policy.

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
(cherry picked from commit 245a93326e)
2021-08-12 13:45:07 -04:00
iraj465
e22a689a44 rgw/s3_boto3:Adds lifecycle transition test for invalid iso8601 date
(cherry picked from commit ba9525f425)
2021-08-12 13:44:56 -04:00
Pei
f344fd6ca7 Fix: Bucket resource leak when cleanup
In the function of nuke_prefixed_buckets, if some err is thrown when deleting buckets, the left buckets remain uncleaned.
It is kind of resoruce leak on some charged platform. We have to clear them manually.

I know the original code is meant to give the user some hint by rasing error. But the resource leak of left buckets is a little annoying.

This PR would skip the problem point and continue the teardown process. The last client error would be saved and re-raised after the loop completes.

Signed-off-by: Pei <huangp0600@126.com>
Signed-off-by: Pei <phuang1@dev-new-3-3854897.slc07.dev.ebayc3.com>
(cherry picked from commit 713012c178)
2021-08-12 13:38:40 -04:00
Ali Maredia
97fb5a7ee3 disable ssl verify by default
Signed-off-by: Ali Maredia <amaredia@redhat.com>
(cherry picked from commit b252638369)
2021-08-11 13:14:33 -04:00
Pragadeeswaran Sathyanarayanan
0fef1637ae Add support for disabling SSL certificate verification
Signed-off-by: Pragadeeswaran Sathyanarayanan <psathyan@redhat.com>
(cherry picked from commit ea3caaa76b)
2021-08-09 14:04:47 -04:00
Mark Houghton
a81ad3515e Add tests for issue 47586.
(cherry picked from commit 7fe0304e9c)
2021-08-05 11:29:25 -04:00
Danny Abukalam
0d7111ffc2 Add test to check retain date is in iso 8601 format
Signed-off-by: Danny Abukalam <danny@softiron.com>
(cherry picked from commit e229d1aaf6)
2021-07-28 09:38:30 -04:00
Casey Bodley
2ad7f81917 sts: test role policy around nonexistent objects
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 7276bee050)
2021-06-28 11:12:08 -04:00
Casey Bodley
72957ece35 test that listed buckets have creation time
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 66ced9af1d)
2021-06-28 11:09:59 -04:00
Or Friedmann
8423389033 test multipart upload with bucket policy
Signed-off-by: Or Friedmann <ofriedma@redhat.com>
(cherry picked from commit cfdf914c4b)
2021-06-07 14:05:58 -04:00
gal salomon
aea3f6b4c3 modifying tests to be align with change of s3select compare sign( == -> = )
Signed-off-by: gal salomon <gal.salomon@gmail.com>
(cherry picked from commit 1572fbc87b)
2021-05-21 20:10:14 -04:00
Albin Antony
6f74f2af07 tests
Signed-off-by: Albin Antony <aantony@redhat.com>
(cherry picked from commit c6a4ab9d12)
2021-05-21 20:10:14 -04:00
Albin Antony
6b412b509b tests
Signed-off-by: Albin Antony <aantony@redhat.com>
(cherry picked from commit e7102e8cb0)
2021-05-21 20:10:14 -04:00
Albin Antony
87993f147d tests
Signed-off-by: Albin Antony <aantony@redhat.com>
(cherry picked from commit 60dd3444b3)
2021-05-21 20:10:14 -04:00
Albin Antony
ecc4cbc5c4 s3select: align s3-tests with new changes in s3select
Fix when then, date functions and NULL, add escape, trim tests

Signed-off-by: Albin Antony <aantony@redhat.com>
(cherry picked from commit 4a86ebbe8b)
2021-05-21 20:09:48 -04:00
galsalomon66
287acbc6e7 change test_s3select.py permission; add s3select attribute
Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
(cherry picked from commit ea7d5fb563)
2021-01-14 00:08:50 -05:00
galsalomon66
68f1939942 reduce object size for test_like_expression, it may cause timeouts in teuthology runs
Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
2020-12-22 00:32:18 -05:00
galsalomon66
a0aa55d4ae rename charlength and character_length function names
Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
2020-12-22 00:32:07 -05:00
Or Friedmann
e3d31ef6eb Add test for head bucket usage headers
Signed-off-by: Or Friedmann <ofriedma@redhat.com>
(cherry picked from commit ef8f65d917)
2020-12-03 16:04:29 -05:00
Or Friedmann
3fe80dc877 Add test for GetUsage api
Signed-off-by: Or Friedmann <ofriedma@redhat.com>
(cherry picked from commit f4f7812efd)
2020-12-03 16:04:14 -05:00
Albin Antony
a5108a7d69 s3select tests for coalesce and case
Signed-off-by: Albin Antony <aantony@redhat.com>
2020-12-03 01:04:02 -05:00
gal salomon
3be10d722f per each new uploaded file(for test), it got unique name(random), and uploaded file is verified for its content
Signed-off-by: gal salomon <gal.salomon@gmail.com>

Signed-off-by: gal salomon <gal.salomon@gmail.com>
2020-12-03 01:03:55 -05:00
Albin Antony
adad16121f s3select predicate tests
Signed-off-by: Albin Antony <aantony@redhat.com>
2020-12-03 00:59:32 -05:00
root
dd163877d4 Webidentity Test addition to test_sts.py
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>

Few main changes/additions:
1. Webidentity test addition to test_sts.py.
2. A function named check_webidentity() added to __init__.py in order to check for section presence.
3. Few lines shifted from setup() to get_iam_client() to make them execute only when sts-tests run.
4. Documentation update (for sts section)
5. Changes in s3tests.conf.SAMPLE regarding sts sections
2020-11-25 20:34:32 -05:00
Matt Benjamin
86bc2a191f Add test for HeadBucket on a non-existent bucket
n.b., RGW does not send a response document for this operation,
which seems consistent with
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-11-25 00:06:12 -05:00
Ilsoo Byun
3698d093bf Check if invalid payload is added after serving errordoc
Signed-off-by: Ilsoo Byun <ilsoobyun@linecorp.com>
(cherry picked from commit c08de72d55)
2020-11-19 10:24:30 -05:00
Casey Bodley
65b067486e test bucket recreation with different acls
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit f6218fa1de)
2020-10-08 13:32:53 -04:00
root
0e36699571 STS issue fix (https://tracker.ceph.com/issues/47588)
This is the fix for the issue reported (https://tracker.ceph.com/issues/47588). The issue was with the argument which was passed to the function. After removing that argument (as it's already an optional argument) the issue is fixed.

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
(cherry picked from commit daf9062a22)
2020-10-08 13:32:53 -04:00
Matt Benjamin
3dc4ff5da8 add noncurrent version expiration rule w/tag filter
Create 10 object versions (9 noncurrent).  Install a noncurrent
version expiration at 4 days.  Verify that 10 versions exist at
T+20, and only 1 (current) at T+60.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:30:33 -04:00
Matt Benjamin
c9792cb975 add lifecycle expiration test mixing 2-tag filter w/versioning
By design this test duplicates test_lifecycle_expiration_tags2,
but enables object versioning on the bucket.

The tests install a rule which requires -2- tags to be matched,
and creates 2 objects, one matching only 1 of the required tags,
the other matching both.  Only the 2nd object should expire.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:30:24 -04:00
Matt Benjamin
b930f194e4 add tests for lifecycle expiration w/1 and 2 object tags
Note that the 1-tag case contains a filter prefix--which exposes
an apparent bug parsing Filter when it contains a Prefix element
and a single Tag element (without And).

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:30:15 -04:00
Matt Benjamin
253b63aa11 fix lifecycle expiration days: 0
In fact test_lifecycle_expiration_days0 is should fail, as 0-day
expiration is permitted for transition rules but not expiration
rules.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:30:08 -04:00
Matt Benjamin
6bd75be1d6 s/test_set_tagging/test_set_bucket_tagging/;
The test exercises bucket tagging, has nothing to do with object
tagging (clarity).

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:30:01 -04:00
Matt Benjamin
61804bcf91 fix test_lifecycle_expiration_header_{put,head}
Primarily fixes the expiration header() verifier function
check_lifecycle_expiration_header, but also cleans up
prefix handling in setup_lifecycle_expiration().

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:29:52 -04:00
Matt Benjamin
ea9f07a2bf fix and remark on test_lifecycle_expiration_days0
1. fix a python3-related KeyError exception

2. note here:  AWS documentation includes examples of "Days 0"
   in use, but boto3 will not accept them--this is why the days0
   test currently sets Days 1

3. delay increased to 30s, to avoid occasional failures due to
   jitter

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-10-01 11:29:45 -04:00
root
7d14452035 STS Tests Files and modification in __init__.py
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
2020-09-16 01:40:33 -04:00
gal salomon
6ea6cb6467 add filter for s3select tests
Signed-off-by: gal salomon <gal.salomon@gmail.com>
(cherry picked from commit fce9a52ef4)
2020-07-07 07:38:41 -04:00
gal salomon
10c801a2e0 fix comments;remove non-used imports;enable test for projection;using get_client()
(cherry picked from commit 72e251ed69)
2020-06-25 13:42:23 -04:00
gal salomon
9d670846a3 adding radom-generated tests, one for where clause , the second for prjection. random arithmetical expression is generated and used for building s3-select query; result is compared to python-engine
(cherry picked from commit fb39ac4829)
2020-06-25 13:42:23 -04:00
gal salomon
bb801b8625 python linter; replace assert with assert_equal; add complex query test(sum,count,where); add test-schema ;
(cherry picked from commit e006dd4753)
2020-06-25 13:42:23 -04:00
gal salomon
652619f46f using config parameters
(cherry picked from commit 1a9d3677f7)
2020-06-25 13:42:23 -04:00
gal salomon
b1ddeee6eb add tests to validate csv-header-info functionalities is correct
(cherry picked from commit 5dc8bc75ab)
2020-06-25 13:42:23 -04:00
gal salomon
ca9cb5cc2c adding test cases for processing CSV objects with different CSV definitions; validate null,escape-rules and quotes are processed correctly
(cherry picked from commit 94b1986228)
2020-06-25 13:42:23 -04:00
gal salomon
e010c4cfec adding utcnow test
(cherry picked from commit 4c7c279f70)
2020-06-25 13:42:23 -04:00
gal salomon
edea887e9c adding tests for date-time functionalities
(cherry picked from commit 5925f0fb3f)
2020-06-25 13:42:23 -04:00
gal salomon
cd4f7e1a7a add complex expression tests; for nested function calls; and different where-clause which create the same group of values
(cherry picked from commit c1bce6ac70)
2020-06-25 13:42:23 -04:00
gal salomon
048f9297a1 adding test for detection of cyclic reference to alias
(cherry picked from commit d543619e71)
2020-06-25 13:42:23 -04:00
gal salomon
8bd6158054 adding aggregation tests
(cherry picked from commit f42872fd53)
2020-06-25 13:42:23 -04:00
gal salomon
aca68a9d39 adding alias test case
(cherry picked from commit 74daf86fe5)
2020-06-25 13:42:23 -04:00
gal salomon
537431c686 commit first tests for s3select and initial framework
(cherry picked from commit dac38694ef)
2020-06-25 13:42:23 -04:00
Matt Benjamin
8ca96c4519 fix/restore test_lifecycle_expiration checks
Commit bf956df71e adding
listobvjectsv2 tests inadvertently changed the v1
test_lifecycle_expiration test, which it had copied to
create a v2 version.  Revert this.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
2020-05-26 11:07:53 -04:00
Kefu Chai
34040769ff bootstrap,requirements.txt: bump up setuptools and requests
Fixes: https://tracker.ceph.com/issues/45691
Signed-off-by: Kefu Chai <kchai@redhat.com>
2020-05-26 09:08:26 -04:00
Casey Bodley
8ebb504159 bootstrap: remove deprecated virtualenv options
this fails on Ubuntu 20.04:

> virtualenv: error: unrecognized arguments: --no-site-packages --distribute

according to `virtualenv -h`:

>   --no-site-packages    DEPRECATED. Retained only for backward compatibility.
>                         Not having access to global site-packages is now the
>                         default behavior.
>   --distribute          DEPRECATED. Retained only for backward compatibility.
>                         This option has no effect.

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit a0c15c80ad)
2020-05-21 11:09:26 -04:00
Abhishek L
9092d1ac61
Merge pull request #343 from theanalyst/ceph-master-public-buckets-qa
Ceph master public buckets backport

Reviewed-By: Casey Bodley <cbodley@redhat.com>
2020-03-30 15:25:57 +02:00
Abhishek Lekshmanan
7b3df700cc fix ignore public acls with py3 compatible code
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 4d675235dd)
2020-03-26 16:28:12 +01:00
Abhishek Lekshmanan
4fc133b1b5 add tests for ignore public acls
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 3b1571ace6)
2020-03-26 16:28:12 +01:00
Abhishek Lekshmanan
0a495efc8c add test for block public policy
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit b4516725f2)
2020-03-26 16:28:12 +01:00
Abhishek Lekshmanan
a48cf75391 use empty bodies for canned acl tests with BlockPublicAccess
This should be a temporary workaround until #42208 is fixed

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit d02c1819f6)
2020-03-26 16:28:12 +01:00
Abhishek Lekshmanan
a20e0d47f2 remove redundant get_client calls
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 4996430709)
2020-03-26 16:28:12 +01:00
Abhishek Lekshmanan
19947bd541 add ability to get svc client for s3config set of apis
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 6d3f574a8e)
2020-03-26 16:26:39 +01:00
Abhishek Lekshmanan
94168194fd add tests for public access configuration
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 1ad38530e0)
2020-03-26 16:26:19 +01:00
Abhishek Lekshmanan
0e3084c995 add a few test cases for public bucket policies
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 3f9d31c6c7)
2020-03-26 16:24:14 +01:00
Abhishek Lekshmanan
1d39198872 boto3: add bucket policy status checks for public ACLs
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 02b1d50ca7)
2020-03-26 16:23:46 +01:00
24 changed files with 5580 additions and 10079 deletions

1
.gitignore vendored
View file

@ -10,6 +10,5 @@
/*.egg-info
/virtualenv
/venv
config.yaml

View file

@ -6,10 +6,14 @@ This is a set of unofficial Amazon AWS S3 compatibility
tests, that can be useful to people implementing software
that exposes an S3-like API. The tests use the Boto2 and Boto3 libraries.
The tests use the Tox tool. To get started, ensure you have the ``tox``
software installed; e.g. on Debian/Ubuntu::
The tests use the Nose test framework. To get started, ensure you have
the ``virtualenv`` software installed; e.g. on Debian/Ubuntu::
sudo apt-get install tox
sudo apt-get install python-virtualenv
and then run::
./bootstrap
You will need to create a configuration file with the location of the
service and two different credentials. A sample configuration file named
@ -18,25 +22,29 @@ used to run the s3 tests on a Ceph cluster started with vstart.
Once you have that file copied and edited, you can run the tests with::
S3TEST_CONF=your.conf tox
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests
You can specify which directory of tests to run::
S3TEST_CONF=your.conf tox -- s3tests_boto3/functional
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests.functional
You can specify which file of tests to run::
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_s3.py
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests.functional.test_s3
You can specify which test to run::
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_s3.py::test_bucket_list_empty
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests.functional.test_s3:test_bucket_list_empty
To gather a list of tests being run, use the flags::
-v --collect-only
Some tests have attributes set based on their current reliability and
things like AWS not enforcing their spec stricly. You can filter tests
based on their attributes::
S3TEST_CONF=aws.conf tox -- -m 'not fails_on_aws'
S3TEST_CONF=aws.conf ./virtualenv/bin/nosetests -a '!fails_on_aws'
Most of the tests have both Boto3 and Boto2 versions. Tests written in
Boto2 are in the ``s3tests`` directory. Tests written in Boto3 are
@ -44,7 +52,7 @@ located in the ``s3test_boto3`` directory.
You can run only the boto3 tests with::
S3TEST_CONF=your.conf tox -- s3tests_boto3/functional
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'not fails_on_rgw' s3tests_boto3.functional
========================
STS compatibility tests
@ -52,51 +60,14 @@ You can run only the boto3 tests with::
This section contains some basic tests for the AssumeRole, GetSessionToken and AssumeRoleWithWebIdentity API's. The test file is located under ``s3tests_boto3/functional``.
To run the STS tests, the vstart cluster should be started with the following parameter (in addition to any parameters already used with it)::
vstart.sh -o rgw_sts_key=abcdefghijklmnop -o rgw_s3_auth_use_sts=true
Note that the ``rgw_sts_key`` can be set to anything that is 128 bits in length.
After the cluster is up the following command should be executed::
radosgw-admin caps add --tenant=testx --uid="9876543210abcdef0123456789abcdef0123456789abcdef0123456789abcdef" --caps="roles=*"
You can run only the sts tests (all the three API's) with::
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_sts.py
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
You can filter tests based on the attributes. There is a attribute named ``test_of_sts`` to run AssumeRole and GetSessionToken tests and ``webidentity_test`` to run the AssumeRoleWithWebIdentity tests. If you want to execute only ``test_of_sts`` tests you can apply that filter as below::
S3TEST_CONF=your.conf tox -- -m test_of_sts s3tests_boto3/functional/test_sts.py
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'test_of_sts' s3tests_boto3.functional.test_sts
For running ``webidentity_test`` you'll need have Keycloak running.
In order to run any STS test you'll need to add "iam" section to the config file. For further reference on how your config file should look check ``s3tests.conf.SAMPLE``.
========================
IAM policy tests
========================
This is a set of IAM policy tests.
This section covers tests for user policies such as Put, Get, List, Delete, user policies with s3 actions, conflicting user policies etc
These tests uses Boto3 libraries. Tests are written in the ``s3test_boto3`` directory.
These iam policy tests uses two users with profile name "iam" and "s3 alt" as mentioned in s3tests.conf.SAMPLE.
If Ceph cluster is started with vstart, then above two users will get created as part of vstart with same access key, secrete key etc as mentioned in s3tests.conf.SAMPLE.
Out of those two users, "iam" user is with capabilities --caps=user-policy=* and "s3 alt" user is without capabilities.
Adding above capabilities to "iam" user is also taken care by vstart (If Ceph cluster is started with vstart).
To run these tests, create configuration file with section "iam" and "s3 alt" refer s3tests.conf.SAMPLE.
Once you have that configuration file copied and edited, you can run all the tests with::
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_iam.py
You can also specify specific test to run::
S3TEST_CONF=your.conf tox s3tests_boto3/functional/test_iam.py::test_put_user_policy
Some tests have attributes set such as "fails_on_rgw".
You can filter tests based on their attributes::
S3TEST_CONF=your.conf tox -- s3tests_boto3/functional/test_iam.py -m 'not fails_on_rgw'

76
bootstrap Executable file
View file

@ -0,0 +1,76 @@
#!/bin/bash
set -e
virtualenv="virtualenv"
declare -a packages
source /etc/os-release
case "$ID" in
debian|ubuntu|devuan)
packages=(debianutils python3-pip python3-virtualenv python3-dev libevent-dev libffi-dev libxml2-dev libxslt-dev zlib1g-dev)
for package in ${packages[@]}; do
if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
# add a space after old values
missing="${missing:+$missing }$package"
fi
done
if [ -n "$missing" ]; then
echo "$0: missing required DEB packages. Installing via sudo." 1>&2
sudo apt-get -y install $missing
fi
;;
centos|fedora|rhel|ol|virtuozzo)
packages=(which python3-virtualenv python36-devel libevent-devel libffi-devel libxml2-devel libxslt-devel zlib-devel)
for package in ${packages[@]}; do
# When the package is python36-devel we change it to python3-devel on Fedora
if [[ ${package} == "python36-devel" && -f /etc/fedora-release ]]; then
package=python36
fi
if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
missing="${missing:+$missing }$package"
fi
done
if [ -n "$missing" ]; then
echo "$0: Missing required RPM packages: ${missing}." 1>&2
sudo yum -y install $missing
fi
;;
opensuse*|suse|sles)
packages=(which python3-virtualenv python3-devel libev-devel libffi-devel libxml2-devel libxslt-devel zlib-devel)
for package in ${packages[@]}; do
if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
missing="${missing:+$missing }$package"
fi
if [ -n "$missing" ]; then
echo "$0: Missing required RPM packages: ${missing}." 1>&2
sudo zypper --non-interactive install --no-recommends $missing
fi
done
;;
*)
echo "Bootstrap script does not support this distro yet, consider adding the packages"
exit 1
esac
# s3-tests only works on python 3.6 not newer versions of python3
${virtualenv} --python=$(which python3.6) virtualenv
# avoid pip bugs
./virtualenv/bin/pip3 install --upgrade pip
# latest setuptools supporting python 2.7
./virtualenv/bin/pip install setuptools==44.1.0
./virtualenv/bin/pip3 install -r requirements.txt
# forbid setuptools from using the network because it'll try to use
# easy_install, and we really wanted pip; next line will fail if pip
# requirements.txt does not match setup.py requirements -- sucky but
# good enough for now
./virtualenv/bin/python3 setup.py develop

View file

@ -1,51 +0,0 @@
[pytest]
markers =
abac_test
appendobject
auth_aws2
auth_aws4
auth_common
bucket_policy
bucket_encryption
checksum
cloud_transition
encryption
fails_on_aws
fails_on_dbstore
fails_on_dho
fails_on_mod_proxy_fcgi
fails_on_rgw
fails_on_s3
fails_with_subdomain
group
group_policy
iam_account
iam_cross_account
iam_role
iam_tenant
iam_user
lifecycle
lifecycle_expiration
lifecycle_transition
list_objects_v2
object_lock
role_policy
session_policy
s3select
s3website
s3website_routing_rules
s3website_redirect_location
sns
sse_s3
storage_class
tagging
test_of_sts
token_claims_trust_policy_test
token_principal_tag_role_policy_test
token_request_tag_trust_policy_test
token_resource_tags_test
token_role_tags_test
token_tag_keys_test
user_policy
versioning
webidentity_test

View file

@ -1,15 +1,12 @@
PyYAML
nose >=1.0.0
boto >=2.6.0
boto3 >=1.0.0
# botocore-1.28 broke v2 signatures, see https://tracker.ceph.com/issues/58059
botocore <1.28.0
munch >=2.0.0
# 0.14 switches to libev, that means bootstrap needs to change too
gevent >=1.0
isodate >=0.4.4
requests >=2.23.0
pytz
pytz >=2011k
httplib2
lxml
pytest
tox

View file

@ -19,14 +19,6 @@ ssl_verify = False
## the prefix to 30 characters long, and avoid collisions
bucket prefix = yournamehere-{random}-
# all the iam account resources (users, roles, etc) created
# will start with this name prefix
iam name prefix = s3-tests-
# all the iam account resources (users, roles, etc) created
# will start with this path prefix
iam path prefix = /s3-tests/
[s3 main]
# main display_name set in vstart.sh
display_name = M. Tester
@ -49,12 +41,6 @@ secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
## replace with key id obtained when secret is created, or delete if KMS not tested
#kms_keyid = 01234567-89ab-cdef-0123-456789abcdef
## Storage classes
#storage_classes = "LUKEWARM, FROZEN"
## Lifecycle debug interval (default: 10)
#lc_debug_interval = 20
[s3 alt]
# alt display_name set in vstart.sh
display_name = john.doe
@ -70,37 +56,6 @@ access_key = NOPQRSTUVWXYZABCDEFG
# alt AWS secret key set in vstart.sh
secret_key = nopqrstuvwxyzabcdefghijklmnabcdefghijklm
#[s3 cloud]
## to run the testcases with "cloud_transition" attribute.
## Note: the waiting time may have to tweaked depending on
## the I/O latency to the cloud endpoint.
## host set for cloud endpoint
# host = localhost
## port set for cloud endpoint
# port = 8001
## say "False" to disable TLS
# is_secure = False
## cloud endpoint credentials
# access_key = 0555b35654ad1656d804
# secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
## storage class configured as cloud tier on local rgw server
# cloud_storage_class = CLOUDTIER
## Below are optional -
## Above configured cloud storage class config options
# retain_head_object = false
# target_storage_class = Target_SC
# target_path = cloud-bucket
## another regular storage class to test multiple transition rules,
# storage_class = S1
[s3 tenant]
# tenant display_name set in vstart.sh
display_name = testx$tenanteduser
@ -117,9 +72,6 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
# tenant email set in vstart.sh
email = tenanteduser@example.com
# tenant name
tenant = testx
#following section needs to be added for all sts-tests
[iam]
#used for iam operations in sts-tests
@ -138,20 +90,6 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
#display_name from vstart.sh
display_name = youruseridhere
# iam account root user for iam_account tests
[iam root]
access_key = AAAAAAAAAAAAAAAAAAaa
secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
user_id = RGW11111111111111111
email = account1@ceph.com
# iam account root user in a different account than [iam root]
[iam alt root]
access_key = BBBBBBBBBBBBBBBBBBbb
secret_key = bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
user_id = RGW22222222222222222
email = account2@ceph.com
#following section needs to be added when you want to run Assume Role With Webidentity test
[webidentity]
#used for assume role with web identity test in sts-tests
@ -160,12 +98,6 @@ token=<access_token>
aud=<obtained after introspecting token>
sub=<obtained after introspecting token>
azp=<obtained after introspecting token>
user_token=<access token for a user, with attribute Department=[Engineering, Marketing>]
thumbprint=<obtained from x509 certificate>
KC_REALM=<name of the realm>

View file

@ -7,7 +7,6 @@ import itertools
import os
import random
import string
import pytest
from http.client import HTTPConnection, HTTPSConnection
from urllib.parse import urlparse
@ -371,15 +370,6 @@ def teardown():
# remove our buckets here also, to avoid littering
nuke_prefixed_buckets(prefix=prefix)
@pytest.fixture(scope="package")
def configfile():
setup()
yield config
@pytest.fixture(autouse=True)
def setup_teardown(configfile):
yield
teardown()
bucket_counter = itertools.count(1)

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ import datetime
import time
import email.utils
import isodate
import pytest
import nose
import operator
import socket
import ssl
@ -27,14 +27,16 @@ import re
from collections import defaultdict
from urllib.parse import urlparse
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
from . import utils
from .utils import assert_raises
from .policy import Policy, Statement, make_json_policy
from . import (
configfile,
setup_teardown,
nuke_prefixed_buckets,
get_new_bucket,
get_new_bucket_name,
@ -51,9 +53,9 @@ from . import (
def check_access_denied(fn, *args, **kwargs):
e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)
assert e.status == 403
assert e.reason == 'Forbidden'
assert e.error_code == 'AccessDenied'
eq(e.status, 403)
eq(e.reason, 'Forbidden')
eq(e.error_code, 'AccessDenied')
def check_bad_bucket_name(name):
"""
@ -61,9 +63,9 @@ def check_bad_bucket_name(name):
that the request fails because of an invalid bucket name.
"""
e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, targets.main.default, name)
assert e.status == 400
assert e.reason.lower() == 'bad request' # some proxies vary the case
assert e.error_code == 'InvalidBucketName'
eq(e.status, 400)
eq(e.reason.lower(), 'bad request') # some proxies vary the case
eq(e.error_code, 'InvalidBucketName')
def _create_keys(bucket=None, keys=[]):
"""
@ -92,16 +94,20 @@ def _get_alt_connection():
# Breaks DNS with SubdomainCallingFormat
@pytest.mark.fails_with_subdomain
@attr('fails_with_subdomain')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/! in name')
@attr(assertion='fails with subdomain')
def test_bucket_create_naming_bad_punctuation():
# characters other than [a-zA-Z0-9._-]
check_bad_bucket_name('alpha!soup')
def check_versioning(bucket, status):
try:
assert bucket.get_versioning_status()['Versioning'] == status
eq(bucket.get_versioning_status()['Versioning'], status)
except KeyError:
assert status == None
eq(status, None)
# amazon is eventual consistent, retry a bit if failed
def check_configure_versioning_retry(bucket, status, expected_string):
@ -120,10 +126,13 @@ def check_configure_versioning_retry(bucket, status, expected_string):
time.sleep(1)
assert expected_string == read_status
eq(expected_string, read_status)
@pytest.mark.versioning
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='create')
@attr(operation='create versioned object, read not exist null version')
@attr(assertion='read null version behaves correctly')
@attr('versioning')
def test_versioning_obj_read_not_exist_null():
bucket = get_new_bucket()
check_versioning(bucket, None)
@ -137,12 +146,15 @@ def test_versioning_obj_read_not_exist_null():
key.set_contents_from_string(content)
key = bucket.get_key(objname, version_id='null')
assert key == None
eq(key, None)
@pytest.mark.fails_on_aws
@pytest.mark.fails_with_subdomain
@pytest.mark.appendobject
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='append object')
@attr(assertion='success')
@attr('fails_on_aws')
@attr('fails_with_subdomain')
@attr('appendobject')
def test_append_object():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -154,16 +166,19 @@ def test_append_object():
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
path2 = path + '&append&position=3'
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
assert res.status == 200
assert res.reason == 'OK'
eq(res.status, 200)
eq(res.reason, 'OK')
key = bucket.get_key('foo')
assert key.size == 6
eq(key.size, 6)
@pytest.mark.fails_on_aws
@pytest.mark.fails_with_subdomain
@pytest.mark.appendobject
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='append to normal object')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@attr('fails_with_subdomain')
@attr('appendobject')
def test_append_normal_object():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -174,13 +189,16 @@ def test_append_normal_object():
path = o.path + '?' + o.query
path = path + '&append&position=3'
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path, body='abc', secure=s3.main.is_secure)
assert res.status == 409
eq(res.status, 409)
@pytest.mark.fails_on_aws
@pytest.mark.fails_with_subdomain
@pytest.mark.appendobject
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='append position not right')
@attr(assertion='fails 409')
@attr('fails_on_aws')
@attr('fails_with_subdomain')
@attr('appendobject')
def test_append_object_position_wrong():
bucket = get_new_bucket()
key = bucket.new_key('foo')
@ -192,13 +210,17 @@ def test_append_object_position_wrong():
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
path2 = path + '&append&position=9'
res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
assert res.status == 409
assert int(res.getheader('x-rgw-next-append-position')) == 3
eq(res.status, 409)
eq(int(res.getheader('x-rgw-next-append-position')), 3)
# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
# http://tracker.newdream.net/issues/984
@pytest.mark.fails_on_rgw
@attr(resource='bucket.log')
@attr(method='put')
@attr(operation='set/enable/disable logging target')
@attr(assertion='operations succeed')
@attr('fails_on_rgw')
def test_logging_toggle():
bucket = get_new_bucket()
log_bucket = get_new_bucket(targets.main.default, bucket.name + '-log')
@ -214,6 +236,242 @@ def list_bucket_storage_class(bucket):
return result
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = set_lifecycle(rules=[{'id': 'rule1', 'transition': lc_transition(days=1, storage_class=sc[1]), 'prefix': 'expire1/', 'status': 'Enabled'},
{'id':'rule2', 'transition': lc_transition(days=4, storage_class=sc[2]), 'prefix': 'expire3/', 'status': 'Enabled'}])
_create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
# Get list of all keys
init_keys = bucket.get_all_keys()
eq(len(init_keys), 6)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 4)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(bucket)
eq(len(keep2_keys['STANDARD']), 4)
eq(len(keep2_keys[sc[1]]), 2)
eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(bucket)
eq(len(expire3_keys['STANDARD']), 2)
eq(len(expire3_keys[sc[1]]), 2)
eq(len(expire3_keys[sc[2]]), 2)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition_single_rule_multi_trans():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = set_lifecycle(rules=[
{'id': 'rule1',
'transition': lc_transitions([
lc_transition(days=1, storage_class=sc[1]),
lc_transition(days=4, storage_class=sc[2])]),
'prefix': 'expire1/',
'status': 'Enabled'}])
_create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
# Get list of all keys
init_keys = bucket.get_all_keys()
eq(len(init_keys), 6)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 4)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(bucket)
eq(len(keep2_keys['STANDARD']), 4)
eq(len(keep2_keys[sc[1]]), 2)
eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(bucket)
eq(len(expire3_keys['STANDARD']), 4)
eq(len(expire3_keys[sc[1]]), 0)
eq(len(expire3_keys[sc[2]]), 2)
def generate_lifecycle_body(rules):
body = '<?xml version="1.0" encoding="UTF-8"?><LifecycleConfiguration>'
for rule in rules:
body += '<Rule><ID>%s</ID><Status>%s</Status>' % (rule['ID'], rule['Status'])
if 'Prefix' in list(rule.keys()):
body += '<Prefix>%s</Prefix>' % rule['Prefix']
if 'Filter' in list(rule.keys()):
prefix_str= '' # AWS supports empty filters
if 'Prefix' in list(rule['Filter'].keys()):
prefix_str = '<Prefix>%s</Prefix>' % rule['Filter']['Prefix']
body += '<Filter>%s</Filter>' % prefix_str
if 'Expiration' in list(rule.keys()):
if 'ExpiredObjectDeleteMarker' in list(rule['Expiration'].keys()):
body += '<Expiration><ExpiredObjectDeleteMarker>%s</ExpiredObjectDeleteMarker></Expiration>' \
% rule['Expiration']['ExpiredObjectDeleteMarker']
elif 'Date' in list(rule['Expiration'].keys()):
body += '<Expiration><Date>%s</Date></Expiration>' % rule['Expiration']['Date']
else:
body += '<Expiration><Days>%d</Days></Expiration>' % rule['Expiration']['Days']
if 'NoncurrentVersionExpiration' in list(rule.keys()):
body += '<NoncurrentVersionExpiration><NoncurrentDays>%d</NoncurrentDays></NoncurrentVersionExpiration>' % \
rule['NoncurrentVersionExpiration']['NoncurrentDays']
if 'NoncurrentVersionTransition' in list(rule.keys()):
for t in rule['NoncurrentVersionTransition']:
body += '<NoncurrentVersionTransition>'
body += '<NoncurrentDays>%d</NoncurrentDays>' % \
t['NoncurrentDays']
body += '<StorageClass>%s</StorageClass>' % \
t['StorageClass']
body += '</NoncurrentVersionTransition>'
if 'AbortIncompleteMultipartUpload' in list(rule.keys()):
body += '<AbortIncompleteMultipartUpload><DaysAfterInitiation>%d</DaysAfterInitiation>' \
'</AbortIncompleteMultipartUpload>' % rule['AbortIncompleteMultipartUpload']['DaysAfterInitiation']
body += '</Rule>'
body += '</LifecycleConfiguration>'
return body
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
def test_lifecycle_set_noncurrent_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = get_new_bucket()
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransition': [
{
'NoncurrentDays': 2,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 4,
'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 6
}
},
{'ID': 'rule2', 'Prefix': 'test2/', 'Status': 'Disabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 3}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
eq(res.status, 200)
eq(res.reason, 'OK')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle non-current version expiration')
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_noncur_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransition': [
{
'NoncurrentDays': 1,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 3,
'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 5
}
}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
create_multiple_versions(bucket, "test1/a", 3)
create_multiple_versions(bucket, "test1/b", 3)
init_keys = bucket.get_all_versions()
eq(len(init_keys), 6)
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 4)
eq(len(expire1_keys[sc[2]]), 0)
time.sleep(20)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
eq(len(expire1_keys[sc[2]]), 4)
time.sleep(20)
expire_keys = bucket.get_all_versions()
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
eq(len(expire1_keys[sc[2]]), 0)
def transfer_part(bucket, mp_id, mp_keyname, i, part, headers=None):
"""Transfer a part of a multipart upload. Designed to be run in parallel.
"""
@ -285,13 +543,13 @@ def gen_rand_string(size, chars=string.ascii_uppercase + string.digits):
def verify_object(bucket, k, data=None, storage_class=None):
if storage_class:
assert k.storage_class == storage_class
eq(k.storage_class, storage_class)
if data:
read_data = k.get_contents_as_string()
equal = data == read_data.decode() # avoid spamming log if data not equal
assert equal == True
equal = data == read_data # avoid spamming log if data not equal
eq(equal, True)
def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
query_args=None
@ -307,7 +565,7 @@ def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storag
res = dest_bucket.connection.make_request('PUT', dest_bucket.name, dest_key.name,
query_args=query_args, headers=headers)
assert res.status == 200
eq(res.status, 200)
def _populate_multipart_key(bucket, kname, size, storage_class=None):
(upload, data) = _multipart_upload(bucket, kname, size, storage_class=storage_class)
@ -362,9 +620,6 @@ def configured_storage_classes():
if item != 'STANDARD':
sc.append(item)
sc = [i for i in sc if i]
print("storage classes configured: " + str(sc))
return sc
def lc_transition(days=None, date=None, storage_class=None):
@ -378,13 +633,15 @@ def lc_transitions(transitions=None):
return result
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='test create object with storage class')
@attr('storage_class')
@attr('fails_on_aws')
def test_object_storage_class():
sc = configured_storage_classes()
if len(sc) < 2:
pytest.skip('requires multiple storage classes')
raise SkipTest
bucket = get_new_bucket()
@ -394,13 +651,15 @@ def test_object_storage_class():
verify_object(bucket, k, data, storage_class)
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='test create multipart object with storage class')
@attr('storage_class')
@attr('fails_on_aws')
def test_object_storage_class_multipart():
sc = configured_storage_classes()
if len(sc) < 2:
pytest.skip('requires multiple storage classes')
raise SkipTest
bucket = get_new_bucket()
size = 11 * 1024 * 1024
@ -410,13 +669,13 @@ def test_object_storage_class_multipart():
(upload, data) = _multipart_upload(bucket, key, size, storage_class=storage_class)
upload.complete_upload()
key2 = bucket.get_key(key)
assert key2.size == size
assert key2.storage_class == storage_class
eq(key2.size, size)
eq(key2.storage_class, storage_class)
def _do_test_object_modify_storage_class(obj_write_func, size):
sc = configured_storage_classes()
if len(sc) < 2:
pytest.skip('requires multiple storage classes')
raise SkipTest
bucket = get_new_bucket()
@ -433,23 +692,27 @@ def _do_test_object_modify_storage_class(obj_write_func, size):
copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
verify_object(bucket, k, data, storage_class)
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@attr('fails_on_aws')
def test_object_modify_storage_class():
_do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@attr('fails_on_aws')
def test_object_modify_storage_class_multipart():
_do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
def _do_test_object_storage_class_copy(obj_write_func, size):
sc = configured_storage_classes()
if len(sc) < 2:
pytest.skip('requires multiple storage classes')
raise SkipTest
src_bucket = get_new_bucket()
dest_bucket = get_new_bucket()
@ -467,15 +730,19 @@ def _do_test_object_storage_class_copy(obj_write_func, size):
copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, new_storage_class)
verify_object(dest_bucket, dest_key, data, new_storage_class)
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='copy')
@attr(operation='test copy object to object with different storage class')
@attr('storage_class')
@attr('fails_on_aws')
def test_object_storage_class_copy():
_do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
@pytest.mark.storage_class
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='copy')
@attr(operation='test changing objects storage class')
@attr('storage_class')
@attr('fails_on_aws')
def test_object_storage_class_copy_multipart():
_do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)
@ -532,7 +799,7 @@ class FakeFileVerifier(object):
if self.char == None:
self.char = data[0]
self.size += size
assert data.decode() == self.char*size
eq(data.decode(), self.char*size)
def _verify_atomic_key_data(key, size=-1, char=None):
"""
@ -541,7 +808,7 @@ def _verify_atomic_key_data(key, size=-1, char=None):
fp_verify = FakeFileVerifier(char)
key.get_contents_to_file(fp_verify)
if size >= 0:
assert fp_verify.size == size
eq(fp_verify.size, size)
def _test_atomic_dual_conditional_write(file_size):
"""
@ -570,20 +837,26 @@ def _test_atomic_dual_conditional_write(file_size):
# key.set_contents_from_file(fp_c, headers={'If-Match': etag_fp_a})
e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_c,
headers={'If-Match': etag_fp_a})
assert e.status == 412
assert e.reason == 'Precondition Failed'
assert e.error_code == 'PreconditionFailed'
eq(e.status, 412)
eq(e.reason, 'Precondition Failed')
eq(e.error_code, 'PreconditionFailed')
# verify the file
_verify_atomic_key_data(key, file_size, 'B')
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='write one or the other')
@attr(assertion='1MB successful')
@attr('fails_on_aws')
def test_atomic_dual_conditional_write_1mb():
_test_atomic_dual_conditional_write(1024*1024)
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='write file in deleted bucket')
@attr(assertion='fail 404')
@attr('fails_on_aws')
def test_atomic_write_bucket_gone():
bucket = get_new_bucket()
@ -595,9 +868,9 @@ def test_atomic_write_bucket_gone():
key = bucket.new_key('foo')
fp_a = FakeWriteFile(1024*1024, 'A', remove_bucket)
e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_a)
assert e.status == 404
assert e.reason == 'Not Found'
assert e.error_code == 'NoSuchBucket'
eq(e.status, 404)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchBucket')
def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
do_list=None, init_headers=None, part_headers=None,
@ -623,8 +896,11 @@ def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad key for uploading chunks')
@attr(assertion='successful')
@attr('encryption')
def test_encryption_sse_c_multipart_invalid_chunks_1():
bucket = get_new_bucket()
key = "multipart_enc"
@ -645,10 +921,13 @@ def test_encryption_sse_c_multipart_invalid_chunks_1():
_multipart_upload_enc, bucket, key, objlen,
init_headers=init_headers, part_headers=part_headers,
metadata={'foo': 'bar'})
assert e.status == 400
eq(e.status, 400)
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad md5 for chunks')
@attr(assertion='successful')
@attr('encryption')
def test_encryption_sse_c_multipart_invalid_chunks_2():
bucket = get_new_bucket()
key = "multipart_enc"
@ -669,11 +948,14 @@ def test_encryption_sse_c_multipart_invalid_chunks_2():
_multipart_upload_enc, bucket, key, objlen,
init_headers=init_headers, part_headers=part_headers,
metadata={'foo': 'bar'})
assert e.status == 400
eq(e.status, 400)
@pytest.mark.fails_with_subdomain
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test Bucket Policy for a user belonging to a different tenant')
@attr(assertion='succeeds')
@attr('fails_with_subdomain')
@attr('bucket-policy')
def test_bucket_policy_different_tenant():
bucket = get_new_bucket()
key = bucket.new_key('asdf')
@ -708,8 +990,10 @@ def test_bucket_policy_different_tenant():
b = new_conn.get_bucket(bucket_name)
b.get_all_keys()
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put condition operator end with ifExists')
@attr('bucket-policy')
def test_bucket_policy_set_condition_operator_end_with_IfExists():
bucket = _create_keys(keys=['foo'])
policy = '''{
@ -728,25 +1012,73 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
}
]
}''' % bucket.name
assert bucket.set_policy(policy) == True
eq(bucket.set_policy(policy), True)
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
request_headers={'referer': 'http://www.example.com/'})
assert res.status == 200
eq(res.status, 200)
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
request_headers={'referer': 'http://www.example.com/index.html'})
assert res.status == 200
eq(res.status, 200)
res = _make_request('GET', bucket.name, bucket.get_key("foo"))
assert res.status == 200
eq(res.status, 200)
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
request_headers={'referer': 'http://example.com'})
assert res.status == 403
eq(res.status, 403)
def _make_arn_resource(path="*"):
return "arn:aws:s3:::{}".format(path)
@pytest.mark.tagging
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
@attr(resource='object')
@attr(method='put')
@attr(operation='Deny put obj requests without encryption')
@attr(assertion='success')
@attr('encryption')
@attr('bucket-policy')
def test_bucket_policy_put_obj_enc():
bucket = get_new_bucket()
deny_incorrect_algo = {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption": "true"
}
}
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s1).add_statement(s2).to_json()
bucket.set_policy(policy_document)
key1_str ='testobj'
key1 = bucket.new_key(key1_str)
check_access_denied(key1.set_contents_from_string, key1_str)
sse_client_headers = {
'x-amz-server-side-encryption' : 'AES256',
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
key1.set_contents_from_string(key1_str, headers=sse_client_headers)
@attr(resource='object')
@attr(method='put')
@attr(operation='put obj with RequestObjectTag')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_put_obj_request_obj_tag():
bucket = get_new_bucket()

View file

@ -1,6 +1,7 @@
import sys
from collections.abc import Container
import pytest
import collections
import nose
import string
import random
from pprint import pprint
@ -10,11 +11,14 @@ import socket
from urllib.parse import urlparse
from nose.tools import eq_ as eq, ok_ as ok
from nose.plugins.attrib import attr
from nose.tools import timed
from nose.plugins.skip import SkipTest
from .. import common
from . import (
configfile,
setup_teardown,
get_new_bucket,
get_new_bucket_name,
s3,
@ -39,26 +43,37 @@ ERRORDOC_TEMPLATE = '<html><h1>ErrorDoc</h1><body>{random}</body></html>'
CAN_WEBSITE = None
@pytest.fixture(autouse=True, scope="module")
def check_can_test_website():
bucket = get_new_bucket()
try:
wsconf = bucket.get_website_configuration()
global CAN_WEBSITE
# This is a bit expensive, so we cache this
if CAN_WEBSITE is None:
bucket = get_new_bucket()
try:
wsconf = bucket.get_website_configuration()
CAN_WEBSITE = True
except boto.exception.S3ResponseError as e:
if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
CAN_WEBSITE = True
elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
# rgw_enable_static_website is false
CAN_WEBSITE = False
elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
# This is older versions that do not support the website code
CAN_WEBSITE = False
elif e.status == 501 and e.error_code == 'NotImplemented':
CAN_WEBSITE = False
else:
raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
finally:
bucket.delete()
if CAN_WEBSITE is True:
return True
except boto.exception.S3ResponseError as e:
if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
return True
elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
pytest.skip('rgw_enable_static_website is false')
elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
# This is older versions that do not support the website code
pytest.skip('static website is not implemented')
elif e.status == 501 and e.error_code == 'NotImplemented':
pytest.skip('static website is not implemented')
else:
raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
finally:
bucket.delete()
elif CAN_WEBSITE is False:
raise SkipTest
else:
raise RuntimeError("Unknown cached response in checking if WebsiteConf is supported")
def make_website_config(xml_fragment):
"""
@ -154,20 +169,20 @@ def _test_website_prep(bucket, xml_template, hardcoded_fields = {}, expect_fail=
# Cleanup for our validation
common.assert_xml_equal(config_xmlcmp, config_xmlnew)
#print("config_xmlcmp\n", config_xmlcmp)
#assert config_xmlnew == config_xmlcmp
#eq (config_xmlnew, config_xmlcmp)
f['WebsiteConfiguration'] = config_xmlcmp
return f
def __website_expected_reponse_status(res, status, reason):
if not isinstance(status, Container):
if not isinstance(status, collections.Container):
status = set([status])
if not isinstance(reason, Container):
if not isinstance(reason, collections.Container):
reason = set([reason])
if status is not IGNORE_FIELD:
assert res.status in status, 'HTTP code was %s should be %s' % (res.status, status)
ok(res.status in status, 'HTTP code was %s should be %s' % (res.status, status))
if reason is not IGNORE_FIELD:
assert res.reason in reason, 'HTTP reason was was %s should be %s' % (res.reason, reason)
ok(res.reason in reason, 'HTTP reason was was %s should be %s' % (res.reason, reason))
def _website_expected_default_html(**kwargs):
fields = []
@ -179,7 +194,7 @@ def _website_expected_default_html(**kwargs):
v = kwargs[k]
if isinstance(v, str):
v = [v]
elif not isinstance(v, Container):
elif not isinstance(v, collections.Container):
v = [v]
for v2 in v:
s = '<li>%s: %s</li>' % (k,v2)
@ -197,22 +212,22 @@ def _website_expected_error_response(res, bucket_name, status, reason, code, con
errorcode = res.getheader('x-amz-error-code', None)
if errorcode is not None:
if code is not IGNORE_FIELD:
assert errorcode == code
eq(errorcode, code)
if not isinstance(content, Container):
if not isinstance(content, collections.Container):
content = set([content])
for f in content:
if f is not IGNORE_FIELD and f is not None:
f = bytes(f, 'utf-8')
assert f in body, 'HTML should contain "%s"' % (f, )
ok(f in body, 'HTML should contain "%s"' % (f, ))
def _website_expected_redirect_response(res, status, reason, new_url):
body = res.read()
print(body)
__website_expected_reponse_status(res, status, reason)
loc = res.getheader('Location', None)
assert loc == new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,)
assert len(body) == 0, 'Body of a redirect should be empty'
eq(loc, new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,))
ok(len(body) == 0, 'Body of a redirect should be empty')
def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
url = get_website_url(proto='http', bucket=bucket_name, path=path)
@ -231,16 +246,26 @@ def _website_request(bucket_name, path, connect_hostname=None, method='GET', tim
return res
# ---------- Non-existant buckets via the website endpoint
@pytest.mark.s3website
@pytest.mark.fails_on_rgw
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@attr('fails_on_rgw')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_s3():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
@pytest.mark.s3website
@pytest.mark.fails_on_s3
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
#@attr(assertion='non-existant bucket via website endpoint should give Forbidden, keeping bucket identity secure')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@attr('fails_on_s3')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_rgw():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
@ -248,9 +273,13 @@ def test_website_nonexistant_bucket_rgw():
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
#------------- IndexDocument only, successes
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@pytest.mark.timeout(10)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@timed(10)
def test_website_public_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -267,13 +296,17 @@ def test_website_public_bucket_list_public_index():
body = res.read()
print(body)
indexstring = bytes(indexstring, 'utf-8')
assert body == indexstring # default content should match index.html set content
eq(body, indexstring) # default content should match index.html set content
__website_expected_reponse_status(res, 200, 'OK')
indexhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -292,14 +325,18 @@ def test_website_private_bucket_list_public_index():
body = res.read()
print(body)
indexstring = bytes(indexstring, 'utf-8')
assert body == indexstring, 'default content should match index.html set content'
eq(body, indexstring, 'default content should match index.html set content')
indexhtml.delete()
bucket.delete()
# ---------- IndexDocument only, failures
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -310,8 +347,12 @@ def test_website_private_bucket_list_empty():
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -321,8 +362,12 @@ def test_website_public_bucket_list_empty():
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -342,8 +387,12 @@ def test_website_public_bucket_list_private_index():
indexhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -364,8 +413,12 @@ def test_website_private_bucket_list_private_index():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -376,8 +429,12 @@ def test_website_private_bucket_list_empty_missingerrordoc():
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -387,8 +444,12 @@ def test_website_public_bucket_list_empty_missingerrordoc():
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -407,8 +468,12 @@ def test_website_public_bucket_list_private_index_missingerrordoc():
indexhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -428,8 +493,12 @@ def test_website_private_bucket_list_private_index_missingerrordoc():
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -447,13 +516,17 @@ def test_website_private_bucket_list_empty_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
errorstring = bytes(errorstring, 'utf-8')
assert errorstring not in body, 'error content should NOT match error.html set content'
ok(errorstring not in body, 'error content should NOT match error.html set content')
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='check if there is an invalid payload after serving error doc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_pubilc_errordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -494,13 +567,17 @@ def test_website_public_bucket_list_pubilc_errordoc():
except socket.timeout:
print('no invalid payload')
assert resp_len == 0, 'invalid payload'
ok(resp_len == 0, 'invalid payload')
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -517,13 +594,17 @@ def test_website_public_bucket_list_empty_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
errorstring = bytes(errorstring, 'utf-8')
assert errorstring not in body, 'error content should match error.html set content'
ok(errorstring not in body, 'error content should match error.html set content')
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -545,14 +626,18 @@ def test_website_public_bucket_list_private_index_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
errorstring = bytes(errorstring, 'utf-8')
assert errorstring not in body, 'error content should match error.html set content'
ok(errorstring not in body, 'error content should match error.html set content')
indexhtml.delete()
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -574,15 +659,19 @@ def test_website_private_bucket_list_private_index_blockederrordoc():
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
errorstring = bytes(errorstring, 'utf-8')
assert errorstring not in body, 'error content should match error.html set content'
ok(errorstring not in body, 'error content should match error.html set content')
indexhtml.delete()
errorhtml.delete()
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures with errordoc available
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -600,8 +689,12 @@ def test_website_private_bucket_list_empty_gooderrordoc():
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -620,8 +713,12 @@ def test_website_public_bucket_list_empty_gooderrordoc():
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -645,8 +742,12 @@ def test_website_public_bucket_list_private_index_gooderrordoc():
errorhtml.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
@ -671,8 +772,12 @@ def test_website_private_bucket_list_private_index_gooderrordoc():
bucket.delete()
# ------ RedirectAll tests
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_base():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
@ -684,8 +789,12 @@ def test_website_bucket_private_redirectall_base():
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
@ -699,8 +808,12 @@ def test_website_bucket_private_redirectall_path():
bucket.delete()
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path_upgrade():
bucket = get_new_bucket()
x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
@ -716,9 +829,13 @@ def test_website_bucket_private_redirectall_path_upgrade():
bucket.delete()
# ------ x-amz redirect tests
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_nonwebsite():
bucket = get_new_bucket()
#f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
@ -730,7 +847,7 @@ def test_website_xredirect_nonwebsite():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
assert k.get_redirect() == redirect_dest
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
body = res.read()
@ -744,9 +861,13 @@ def test_website_xredirect_nonwebsite():
k.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_relative():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -758,7 +879,7 @@ def test_website_xredirect_public_relative():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
assert k.get_redirect() == redirect_dest
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
#new_url = get_website_url(bucket_name=bucket.name, path=redirect_dest)
@ -767,9 +888,13 @@ def test_website_xredirect_public_relative():
k.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_abs():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -781,7 +906,7 @@ def test_website_xredirect_public_abs():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
assert k.get_redirect() == redirect_dest
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
@ -790,9 +915,13 @@ def test_website_xredirect_public_abs():
k.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_relative():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -804,7 +933,7 @@ def test_website_xredirect_private_relative():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='private')
redirect = k.get_redirect()
assert k.get_redirect() == redirect_dest
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
# We get a 403 because the page is private
@ -813,9 +942,13 @@ def test_website_xredirect_private_relative():
k.delete()
bucket.delete()
@pytest.mark.s3website
@pytest.mark.s3website_redirect_location
@pytest.mark.fails_on_dbstore
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_abs():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
@ -827,7 +960,7 @@ def test_website_xredirect_private_abs():
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='private')
redirect = k.get_redirect()
assert k.get_redirect() == redirect_dest
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
@ -1040,6 +1173,8 @@ def routing_teardown(**kwargs):
print('Deleting', str(o))
o.delete()
@common.with_setup_kwargs(setup=routing_setup, teardown=routing_teardown)
#@timed(10)
def routing_check(*args, **kwargs):
bucket = kwargs['bucket']
args=args[0]
@ -1065,8 +1200,8 @@ def routing_check(*args, **kwargs):
if args['code'] >= 200 and args['code'] < 300:
#body = res.read()
#print(body)
#assert body == args['content'], 'default content should match index.html set content'
assert int(res.getheader('Content-Length', -1)) > 0
#eq(body, args['content'], 'default content should match index.html set content')
ok(int(res.getheader('Content-Length', -1)) > 0)
elif args['code'] >= 300 and args['code'] < 400:
_website_expected_redirect_response(res, args['code'], IGNORE_FIELD, new_url)
elif args['code'] >= 400:
@ -1074,9 +1209,9 @@ def routing_check(*args, **kwargs):
else:
assert(False)
@pytest.mark.s3website_routing_rules
@pytest.mark.s3website
@pytest.mark.fails_on_dbstore
@attr('s3website_RoutingRules')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_routing_generator():
for t in ROUTING_RULES_TESTS:
if 'xml' in t and 'RoutingRules' in t['xml'] and len(t['xml']['RoutingRules']) > 0:

View file

@ -1,9 +1,11 @@
from nose.tools import eq_ as eq
from . import utils
def test_generate():
FIVE_MB = 5 * 1024 * 1024
assert len(''.join(utils.generate_random(0))) == 0
assert len(''.join(utils.generate_random(1))) == 1
assert len(''.join(utils.generate_random(FIVE_MB - 1))) == FIVE_MB - 1
assert len(''.join(utils.generate_random(FIVE_MB))) == FIVE_MB
assert len(''.join(utils.generate_random(FIVE_MB + 1))) == FIVE_MB + 1
eq(len(''.join(utils.generate_random(0))), 0)
eq(len(''.join(utils.generate_random(1))), 1)
eq(len(''.join(utils.generate_random(FIVE_MB - 1))), FIVE_MB - 1)
eq(len(''.join(utils.generate_random(FIVE_MB))), FIVE_MB)
eq(len(''.join(utils.generate_random(FIVE_MB + 1))), FIVE_MB + 1)

View file

@ -3,6 +3,8 @@ import requests
import string
import time
from nose.tools import eq_ as eq
def assert_raises(excClass, callableObj, *args, **kwargs):
"""
Like unittest.TestCase.assertRaises, but returns the exception.
@ -46,7 +48,7 @@ def region_sync_meta(targets, region):
conf = r.conf
if conf.sync_agent_addr:
ret = requests.post('http://{addr}:{port}/metadata/incremental'.format(addr = conf.sync_agent_addr, port = conf.sync_agent_port))
assert ret.status_code == 200
eq(ret.status_code, 200)
if conf.sync_meta_wait:
time.sleep(conf.sync_meta_wait)

View file

@ -1,4 +1,3 @@
import pytest
import boto3
from botocore import UNSIGNED
from botocore.client import Config
@ -13,7 +12,6 @@ import random
import string
import itertools
import urllib3
import re
config = munch.Munch
@ -82,13 +80,18 @@ def get_objects_list(bucket, client=None, prefix=None):
# generator function that returns object listings in batches, where each
# batch is a list of dicts compatible with delete_objects()
def list_versions(client, bucket, batch_size):
kwargs = {'Bucket': bucket, 'MaxKeys': batch_size}
key_marker = ''
version_marker = ''
truncated = True
while truncated:
listing = client.list_object_versions(**kwargs)
listing = client.list_object_versions(
Bucket=bucket,
KeyMarker=key_marker,
VersionIdMarker=version_marker,
MaxKeys=batch_size)
kwargs['KeyMarker'] = listing.get('NextKeyMarker')
kwargs['VersionIdMarker'] = listing.get('NextVersionIdMarker')
key_marker = listing.get('NextKeyMarker')
version_marker = listing.get('NextVersionIdMarker')
truncated = listing['IsTruncated']
objs = listing.get('Versions', []) + listing.get('DeleteMarkers', [])
@ -160,21 +163,7 @@ def nuke_prefixed_buckets(prefix, client=None):
print('Done with cleanup of buckets in tests.')
def configured_storage_classes():
sc = ['STANDARD']
extra_sc = re.split(r"[\b\W\b]+", config.storage_classes)
for item in extra_sc:
if item != 'STANDARD':
sc.append(item)
sc = [i for i in sc if i]
print("storage classes configured: " + str(sc))
return sc
def configure():
def setup():
cfg = configparser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
@ -237,17 +226,6 @@ def configure():
config.main_api_name = ""
pass
try:
config.storage_classes = cfg.get('s3 main',"storage_classes")
except (configparser.NoSectionError, configparser.NoOptionError):
config.storage_classes = ""
pass
try:
config.lc_debug_interval = int(cfg.get('s3 main',"lc_debug_interval"))
except (configparser.NoSectionError, configparser.NoOptionError):
config.lc_debug_interval = 10
config.alt_access_key = cfg.get('s3 alt',"access_key")
config.alt_secret_key = cfg.get('s3 alt',"secret_key")
config.alt_display_name = cfg.get('s3 alt',"display_name")
@ -259,44 +237,21 @@ def configure():
config.tenant_display_name = cfg.get('s3 tenant',"display_name")
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
config.tenant_email = cfg.get('s3 tenant',"email")
config.tenant_name = cfg.get('s3 tenant',"tenant")
config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key")
config.iam_display_name = cfg.get('iam',"display_name")
config.iam_user_id = cfg.get('iam',"user_id")
config.iam_email = cfg.get('iam',"email")
config.iam_root_access_key = cfg.get('iam root',"access_key")
config.iam_root_secret_key = cfg.get('iam root',"secret_key")
config.iam_root_user_id = cfg.get('iam root',"user_id")
config.iam_root_email = cfg.get('iam root',"email")
config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key")
config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key")
config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id")
config.iam_alt_root_email = cfg.get('iam alt root',"email")
# vars from the fixtures section
template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-')
try:
template = cfg.get('fixtures', "bucket prefix")
except (configparser.NoOptionError):
template = 'test-{random}-'
prefix = choose_bucket_prefix(template=template)
template = cfg.get('fixtures', "iam name prefix", fallback="s3-tests-")
config.iam_name_prefix = choose_bucket_prefix(template=template)
template = cfg.get('fixtures', "iam path prefix", fallback="/s3-tests/")
config.iam_path_prefix = choose_bucket_prefix(template=template)
if cfg.has_section("s3 cloud"):
get_cloud_config(cfg)
else:
config.cloud_storage_class = None
def setup():
alt_client = get_alt_client()
tenant_client = get_tenant_client()
nuke_prefixed_buckets(prefix=prefix)
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
def teardown():
alt_client = get_alt_client()
tenant_client = get_tenant_client()
@ -322,17 +277,6 @@ def teardown():
except:
pass
@pytest.fixture(scope="package")
def configfile():
configure()
return config
@pytest.fixture(autouse=True)
def setup_teardown(configfile):
setup()
yield
teardown()
def check_webidentity():
cfg = configparser.RawConfigParser()
try:
@ -350,46 +294,6 @@ def check_webidentity():
config.webidentity_aud = cfg.get('webidentity', "aud")
config.webidentity_token = cfg.get('webidentity', "token")
config.webidentity_realm = cfg.get('webidentity', "KC_REALM")
config.webidentity_sub = cfg.get('webidentity', "sub")
config.webidentity_azp = cfg.get('webidentity', "azp")
config.webidentity_user_token = cfg.get('webidentity', "user_token")
def get_cloud_config(cfg):
config.cloud_host = cfg.get('s3 cloud',"host")
config.cloud_port = int(cfg.get('s3 cloud',"port"))
config.cloud_is_secure = cfg.getboolean('s3 cloud', "is_secure")
proto = 'https' if config.cloud_is_secure else 'http'
config.cloud_endpoint = "%s://%s:%d" % (proto, config.cloud_host, config.cloud_port)
config.cloud_access_key = cfg.get('s3 cloud',"access_key")
config.cloud_secret_key = cfg.get('s3 cloud',"secret_key")
try:
config.cloud_storage_class = cfg.get('s3 cloud', "cloud_storage_class")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_storage_class = None
try:
config.cloud_retain_head_object = cfg.get('s3 cloud',"retain_head_object")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_retain_head_object = None
try:
config.cloud_target_path = cfg.get('s3 cloud',"target_path")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_target_path = None
try:
config.cloud_target_storage_class = cfg.get('s3 cloud',"target_storage_class")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_target_storage_class = 'STANDARD'
try:
config.cloud_regular_storage_class = cfg.get('s3 cloud', "storage_class")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_regular_storage_class = None
def get_client(client_config=None):
if client_config == None:
@ -414,64 +318,51 @@ def get_v2_client():
config=Config(signature_version='s3'))
return client
def get_sts_client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.alt_access_key)
kwargs.setdefault('aws_secret_access_key', config.alt_secret_key)
kwargs.setdefault('config', Config(signature_version='s3v4'))
def get_sts_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name='sts',
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
return client
def get_iam_client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)
client = boto3.client(service_name='iam',
aws_access_key_id=config.alt_access_key,
aws_secret_access_key=config.alt_secret_key,
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
config=client_config)
return client
def get_iam_s3client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)
kwargs.setdefault('config', Config(signature_version='s3v4'))
def get_iam_client(client_config=None):
cfg = configparser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
except KeyError:
raise RuntimeError(
'To run tests, point environment '
+ 'variable S3TEST_CONF to a config file.',
)
cfg.read(path)
if not cfg.has_section("iam"):
raise RuntimeError('Your config file is missing the "iam" section!')
client = boto3.client(service_name='s3',
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
return client
config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key")
config.iam_display_name = cfg.get('iam',"display_name")
config.iam_user_id = cfg.get('iam',"user_id")
config.iam_email = cfg.get('iam',"email")
def get_iam_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_root_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_root_secret_key)
return boto3.client(endpoint_url=config.default_endpoint,
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name='iam',
aws_access_key_id=config.iam_access_key,
aws_secret_access_key=config.iam_secret_key,
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
def get_iam_alt_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_alt_root_secret_key)
return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
config=client_config)
return client
def get_alt_client(client_config=None):
if client_config == None:
@ -486,18 +377,6 @@ def get_alt_client(client_config=None):
config=client_config)
return client
def get_cloud_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name='s3',
aws_access_key_id=config.cloud_access_key,
aws_secret_access_key=config.cloud_secret_key,
endpoint_url=config.cloud_endpoint,
use_ssl=config.cloud_is_secure,
config=client_config)
return client
def get_tenant_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
@ -511,17 +390,6 @@ def get_tenant_client(client_config=None):
config=client_config)
return client
def get_v2_tenant_client():
client_config = Config(signature_version='s3')
client = boto3.client(service_name='s3',
aws_access_key_id=config.tenant_access_key,
aws_secret_access_key=config.tenant_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
config=client_config)
return client
def get_tenant_iam_client():
client = boto3.client(service_name='iam',
@ -533,17 +401,6 @@ def get_tenant_iam_client():
use_ssl=config.default_is_secure)
return client
def get_alt_iam_client():
client = boto3.client(service_name='iam',
region_name='',
aws_access_key_id=config.alt_access_key,
aws_secret_access_key=config.alt_secret_key,
endpoint_url=config.default_endpoint,
verify=config.default_ssl_verify,
use_ssl=config.default_is_secure)
return client
def get_unauthenticated_client():
client = boto3.client(service_name='s3',
aws_access_key_id='',
@ -706,9 +563,6 @@ def get_tenant_aws_secret_key():
def get_tenant_display_name():
return config.tenant_display_name
def get_tenant_name():
return config.tenant_name
def get_tenant_user_id():
return config.tenant_user_id
@ -721,62 +575,14 @@ def get_thumbprint():
def get_aud():
return config.webidentity_aud
def get_sub():
return config.webidentity_sub
def get_azp():
return config.webidentity_azp
def get_token():
return config.webidentity_token
def get_realm_name():
return config.webidentity_realm
def get_iam_name_prefix():
return config.iam_name_prefix
def make_iam_name(name):
return config.iam_name_prefix + name
def get_iam_path_prefix():
return config.iam_path_prefix
def get_iam_access_key():
return config.iam_access_key
def get_iam_secret_key():
return config.iam_secret_key
def get_iam_root_user_id():
return config.iam_root_user_id
def get_iam_root_email():
return config.iam_root_email
def get_iam_alt_root_user_id():
return config.iam_alt_root_user_id
def get_iam_alt_root_email():
return config.iam_alt_root_email
def get_user_token():
return config.webidentity_user_token
def get_cloud_storage_class():
return config.cloud_storage_class
def get_cloud_retain_head_object():
return config.cloud_retain_head_object
def get_cloud_regular_storage_class():
return config.cloud_regular_storage_class
def get_cloud_target_path():
return config.cloud_target_path
def get_cloud_target_storage_class():
return config.cloud_target_storage_class
def get_lc_debug_interval():
return config.lc_debug_interval

View file

@ -1,199 +0,0 @@
from botocore.exceptions import ClientError
import pytest
from . import (
configfile,
get_iam_root_client,
get_iam_root_user_id,
get_iam_root_email,
get_iam_alt_root_client,
get_iam_alt_root_user_id,
get_iam_alt_root_email,
get_iam_path_prefix,
)
def nuke_user_keys(client, name):
p = client.get_paginator('list_access_keys')
for response in p.paginate(UserName=name):
for key in response['AccessKeyMetadata']:
try:
client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId'])
except:
pass
def nuke_user_policies(client, name):
p = client.get_paginator('list_user_policies')
for response in p.paginate(UserName=name):
for policy in response['PolicyNames']:
try:
client.delete_user_policy(UserName=name, PolicyName=policy)
except:
pass
def nuke_attached_user_policies(client, name):
p = client.get_paginator('list_attached_user_policies')
for response in p.paginate(UserName=name):
for policy in response['AttachedPolicies']:
try:
client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn'])
except:
pass
def nuke_user(client, name):
# delete access keys, user policies, etc
try:
nuke_user_keys(client, name)
except:
pass
try:
nuke_user_policies(client, name)
except:
pass
try:
nuke_attached_user_policies(client, name)
except:
pass
client.delete_user(UserName=name)
def nuke_users(client, **kwargs):
p = client.get_paginator('list_users')
for response in p.paginate(**kwargs):
for user in response['Users']:
try:
nuke_user(client, user['UserName'])
except:
pass
def nuke_group_policies(client, name):
p = client.get_paginator('list_group_policies')
for response in p.paginate(GroupName=name):
for policy in response['PolicyNames']:
try:
client.delete_group_policy(GroupName=name, PolicyName=policy)
except:
pass
def nuke_attached_group_policies(client, name):
p = client.get_paginator('list_attached_group_policies')
for response in p.paginate(GroupName=name):
for policy in response['AttachedPolicies']:
try:
client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
except:
pass
def nuke_group_users(client, name):
p = client.get_paginator('get_group')
for response in p.paginate(GroupName=name):
for user in response['Users']:
try:
client.remove_user_from_group(GroupName=name, UserName=user['UserName'])
except:
pass
def nuke_group(client, name):
# delete group policies and remove all users
try:
nuke_group_policies(client, name)
except:
pass
try:
nuke_attached_group_policies(client, name)
except:
pass
try:
nuke_group_users(client, name)
except:
pass
client.delete_group(GroupName=name)
def nuke_groups(client, **kwargs):
p = client.get_paginator('list_groups')
for response in p.paginate(**kwargs):
for user in response['Groups']:
try:
nuke_group(client, user['GroupName'])
except:
pass
def nuke_role_policies(client, name):
p = client.get_paginator('list_role_policies')
for response in p.paginate(RoleName=name):
for policy in response['PolicyNames']:
try:
client.delete_role_policy(RoleName=name, PolicyName=policy)
except:
pass
def nuke_attached_role_policies(client, name):
p = client.get_paginator('list_attached_role_policies')
for response in p.paginate(RoleName=name):
for policy in response['AttachedPolicies']:
try:
client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn'])
except:
pass
def nuke_role(client, name):
# delete role policies, etc
try:
nuke_role_policies(client, name)
except:
pass
try:
nuke_attached_role_policies(client, name)
except:
pass
client.delete_role(RoleName=name)
def nuke_roles(client, **kwargs):
p = client.get_paginator('list_roles')
for response in p.paginate(**kwargs):
for role in response['Roles']:
try:
nuke_role(client, role['RoleName'])
except:
pass
def nuke_oidc_providers(client, prefix):
result = client.list_open_id_connect_providers()
for provider in result['OpenIDConnectProviderList']:
arn = provider['Arn']
if f':oidc-provider{prefix}' in arn:
try:
client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn)
except:
pass
# fixture for iam account root user
@pytest.fixture
def iam_root(configfile):
client = get_iam_root_client()
try:
arn = client.get_user()['User']['Arn']
if not arn.endswith(':root'):
pytest.skip('[iam root] user does not have :root arn')
except ClientError as e:
pytest.skip('[iam root] user does not belong to an account')
yield client
nuke_users(client, PathPrefix=get_iam_path_prefix())
nuke_groups(client, PathPrefix=get_iam_path_prefix())
nuke_roles(client, PathPrefix=get_iam_path_prefix())
nuke_oidc_providers(client, get_iam_path_prefix())
# fixture for iam alt account root user
@pytest.fixture
def iam_alt_root(configfile):
client = get_iam_alt_root_client()
try:
arn = client.get_user()['User']['Arn']
if not arn.endswith(':root'):
pytest.skip('[iam alt root] user does not have :root arn')
except ClientError as e:
pytest.skip('[iam alt root] user does not belong to an account')
yield client
nuke_users(client, PathPrefix=get_iam_path_prefix())
nuke_roles(client, PathPrefix=get_iam_path_prefix())

View file

@ -37,10 +37,10 @@ class Policy(object):
return json.dumps(policy_dict)
def make_json_policy(action, resource, principal={"AWS": "*"}, effect="Allow", conditions=None):
def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
"""
Helper function to make single statement policies
"""
s = Statement(action, resource, principal, effect=effect, condition=conditions)
s = Statement(action, resource, principal, condition=conditions)
p = Policy()
return p.add_statement(s).to_json()

View file

@ -1,5 +1,7 @@
import boto3
import pytest
from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
import nose
from botocore.exceptions import ClientError
from email.utils import formatdate
@ -8,8 +10,6 @@ from .utils import _get_status_and_error_code
from .utils import _get_status
from . import (
configfile,
setup_teardown,
get_client,
get_v2_client,
get_new_bucket,
@ -149,97 +149,178 @@ def _remove_header_create_bad_bucket(remove, client=None):
return e
def tag(*tags):
def wrap(func):
for tag in tags:
setattr(func, tag, True)
return func
return wrap
#
# common tests
#
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_invalid_short():
e = _add_header_create_bad_object({'Content-MD5':'YWJyYWNhZGFicmE='})
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidDigest'
eq(status, 400)
eq(error_code, 'InvalidDigest')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/mismatched MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_bad():
e = _add_header_create_bad_object({'Content-MD5':'rL0Y20xC+Fzt72VPzMSk2A=='})
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'BadDigest'
eq(status, 400)
eq(error_code, 'BadDigest')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_empty():
e = _add_header_create_bad_object({'Content-MD5':''})
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidDigest'
eq(status, 400)
eq(error_code, 'InvalidDigest')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no MD5 header')
@attr(assertion='succeeds')
def test_object_create_bad_md5_none():
bucket_name, key_name = _remove_header_create_object('Content-MD5')
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/Expect 200')
@attr(assertion='garbage, but S3 succeeds!')
def test_object_create_bad_expect_mismatch():
bucket_name, key_name = _add_header_create_object({'Expect': 200})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty expect')
@attr(assertion='succeeds ... should it?')
def test_object_create_bad_expect_empty():
bucket_name, key_name = _add_header_create_object({'Expect': ''})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no expect')
@attr(assertion='succeeds')
def test_object_create_bad_expect_none():
bucket_name, key_name = _remove_header_create_object('Expect')
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty content length')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_contentlength_empty():
e = _add_header_create_bad_object({'Content-Length':''})
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
eq(status, 400)
@pytest.mark.auth_common
@pytest.mark.fails_on_mod_proxy_fcgi
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
def test_object_create_bad_contentlength_negative():
client = get_client()
bucket_name = get_new_bucket()
key_name = 'foo'
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key_name, ContentLength=-1)
status = _get_status(e.response)
assert status == 400
eq(status, 400)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='fails 411')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_contentlength_none():
remove = 'Content-Length'
e = _remove_header_create_bad_object('Content-Length')
status, error_code = _get_status_and_error_code(e.response)
assert status == 411
assert error_code == 'MissingContentLength'
eq(status, 411)
eq(error_code, 'MissingContentLength')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content length too long')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@attr('fails_on_rgw')
def test_object_create_bad_contentlength_mismatch_above():
content = 'bar'
length = len(content) + 1
client = get_client()
bucket_name = get_new_bucket()
key_name = 'foo'
headers = {'Content-Length': str(length)}
add_headers = (lambda **kwargs: kwargs['params']['headers'].update(headers))
client.meta.events.register('before-sign.s3.PutObject', add_headers)
e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key_name, Body=content)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content type text/plain')
@attr(assertion='succeeds')
def test_object_create_bad_contenttype_invalid():
bucket_name, key_name = _add_header_create_object({'Content-Type': 'text/plain'})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty content type')
@attr(assertion='succeeds')
def test_object_create_bad_contenttype_empty():
client = get_client()
key_name = 'foo'
bucket_name = get_new_bucket()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar', ContentType='')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no content type')
@attr(assertion='succeeds')
def test_object_create_bad_contenttype_none():
bucket_name = get_new_bucket()
key_name = 'foo'
@ -248,26 +329,38 @@ def test_object_create_bad_contenttype_none():
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the authorization header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_authorization_empty():
e = _add_header_create_bad_object({'Authorization': ''})
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
eq(status, 403)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date and x-amz-date')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to pass both the 'Date' and 'X-Amz-Date' header during signing and not 'X-Amz-Date' before
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_date_and_amz_date():
date = formatdate(usegmt=True)
bucket_name, key_name = _add_header_create_object({'Date': date, 'X-Amz-Date': date})
client = get_client()
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/x-amz-date and no date')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to pass both the 'Date' and 'X-Amz-Date' header during signing and not 'X-Amz-Date' before
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_amz_date_and_no_date():
date = formatdate(usegmt=True)
bucket_name, key_name = _add_header_create_object({'Date': '', 'X-Amz-Date': date})
@ -275,24 +368,36 @@ def test_object_create_amz_date_and_no_date():
client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
# the teardown is really messed up here. check it out
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the authorization header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_authorization_none():
e = _remove_header_create_bad_object('Authorization')
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
eq(status, 403)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='acls')
@attr(operation='set w/no content length')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_acl_create_contentlength_none():
bucket_name = get_new_bucket()
client = get_client()
@ -306,7 +411,11 @@ def test_object_acl_create_contentlength_none():
client.meta.events.register('before-call.s3.PutObjectAcl', remove_header)
client.put_object_acl(Bucket=bucket_name, Key='foo', ACL='public-read')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='acls')
@attr(operation='set w/invalid permission')
@attr(assertion='fails 400')
def test_bucket_put_bad_canned_acl():
bucket_name = get_new_bucket()
client = get_client()
@ -317,9 +426,13 @@ def test_bucket_put_bad_canned_acl():
e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name, ACL='public-read')
status = _get_status(e.response)
assert status == 400
eq(status, 400)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/expect 200')
@attr(assertion='garbage, but S3 succeeds!')
def test_bucket_create_bad_expect_mismatch():
bucket_name = get_new_bucket_name()
client = get_client()
@ -329,67 +442,99 @@ def test_bucket_create_bad_expect_mismatch():
client.meta.events.register('before-call.s3.CreateBucket', add_headers)
client.create_bucket(Bucket=bucket_name)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/expect empty')
@attr(assertion='garbage, but S3 succeeds!')
def test_bucket_create_bad_expect_empty():
headers = {'Expect': ''}
_add_header_create_bucket(headers)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty content length')
@attr(assertion='fails 400')
# TODO: The request isn't even making it to the RGW past the frontend
# This test had 'fails_on_rgw' before the move to boto3
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_bad_contentlength_empty():
headers = {'Content-Length': ''}
e = _add_header_create_bad_bucket(headers)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
eq(status, 400)
@pytest.mark.auth_common
@pytest.mark.fails_on_mod_proxy_fcgi
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/negative content length')
@attr(assertion='fails 400')
@attr('fails_on_mod_proxy_fcgi')
def test_bucket_create_bad_contentlength_negative():
headers = {'Content-Length': '-1'}
e = _add_header_create_bad_bucket(headers)
status = _get_status(e.response)
assert status == 400
eq(status, 400)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no content length')
@attr(assertion='succeeds')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the content-length header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_bad_contentlength_none():
remove = 'Content-Length'
_remove_header_create_bucket(remove)
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_bad_authorization_empty():
headers = {'Authorization': ''}
e = _add_header_create_bad_bucket(headers)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_common
@tag('auth_common')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_bad_authorization_none():
e = _remove_header_create_bad_bucket('Authorization')
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid MD5')
@attr(assertion='fails 400')
def test_object_create_bad_md5_invalid_garbage_aws2():
v2_client = get_v2_client()
headers = {'Content-MD5': 'AWS HAHAHA'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidDigest'
eq(status, 400)
eq(error_code, 'InvalidDigest')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/content length too short')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the Content-Length header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_contentlength_mismatch_below_aws2():
v2_client = get_v2_client()
content = 'bar'
@ -397,176 +542,252 @@ def test_object_create_bad_contentlength_mismatch_below_aws2():
headers = {'Content-Length': str(length)}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'BadDigest'
eq(status, 400)
eq(error_code, 'BadDigest')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/incorrect authorization')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_authorization_incorrect_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS AKIAIGR7ZNNBHC5BKSUB:FWeDfwojDSdS2Ztmpfeubhd9isU='}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'InvalidDigest'
eq(status, 403)
eq(error_code, 'InvalidDigest')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to manipulate the authorization header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS HAHAHA'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidArgument'
eq(status, 400)
eq(error_code, 'InvalidArgument')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty user agent')
@attr(assertion='succeeds')
def test_object_create_bad_ua_empty_aws2():
v2_client = get_v2_client()
headers = {'User-Agent': ''}
bucket_name, key_name = _add_header_create_object(headers, v2_client)
v2_client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no user agent')
@attr(assertion='succeeds')
def test_object_create_bad_ua_none_aws2():
v2_client = get_v2_client()
remove = 'User-Agent'
bucket_name, key_name = _remove_header_create_object(remove, v2_client)
v2_client.put_object(Bucket=bucket_name, Key=key_name, Body='bar')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/invalid date')
@attr(assertion='fails 403')
def test_object_create_bad_date_invalid_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Bad Date'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/empty date')
@attr(assertion='fails 403')
def test_object_create_bad_date_empty_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': ''}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_object_create_bad_date_none_aws2():
v2_client = get_v2_client()
remove = 'x-amz-date'
e = _remove_header_create_bad_object(remove, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date in past')
@attr(assertion='fails 403')
def test_object_create_bad_date_before_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2010 21:53:04 GMT'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
eq(status, 403)
eq(error_code, 'RequestTimeTooSkewed')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date before epoch')
@attr(assertion='fails 403')
def test_object_create_bad_date_before_epoch_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 1950 21:53:04 GMT'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='object')
@attr(method='put')
@attr(operation='create w/date after 9999')
@attr(assertion='fails 403')
def test_object_create_bad_date_after_end_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 9999 21:53:04 GMT'}
e = _add_header_create_bad_object(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
eq(status, 403)
eq(error_code, 'RequestTimeTooSkewed')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid authorization')
@attr(assertion='fails 400')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_bad_authorization_invalid_aws2():
v2_client = get_v2_client()
headers = {'Authorization': 'AWS HAHAHA'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
assert error_code == 'InvalidArgument'
eq(status, 400)
eq(error_code, 'InvalidArgument')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty user agent')
@attr(assertion='succeeds')
def test_bucket_create_bad_ua_empty_aws2():
v2_client = get_v2_client()
headers = {'User-Agent': ''}
_add_header_create_bucket(headers, v2_client)
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no user agent')
@attr(assertion='succeeds')
def test_bucket_create_bad_ua_none_aws2():
v2_client = get_v2_client()
remove = 'User-Agent'
_remove_header_create_bucket(remove, v2_client)
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/invalid date')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_invalid_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Bad Date'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/empty date')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_empty_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': ''}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/no date')
@attr(assertion='fails 403')
# TODO: remove 'fails_on_rgw' and once we have learned how to remove the date header
@pytest.mark.fails_on_rgw
@attr('fails_on_rgw')
def test_bucket_create_bad_date_none_aws2():
v2_client = get_v2_client()
remove = 'x-amz-date'
e = _remove_header_create_bad_bucket(remove, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date in past')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_before_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2010 21:53:04 GMT'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
eq(status, 403)
eq(error_code, 'RequestTimeTooSkewed')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date in future')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_after_today_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 2030 21:53:04 GMT'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'RequestTimeTooSkewed'
eq(status, 403)
eq(error_code, 'RequestTimeTooSkewed')
@pytest.mark.auth_aws2
@tag('auth_aws2')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='create w/date before epoch')
@attr(assertion='fails 403')
def test_bucket_create_bad_date_before_epoch_aws2():
v2_client = get_v2_client()
headers = {'x-amz-date': 'Tue, 07 Jul 1950 21:53:04 GMT'}
e = _add_header_create_bad_bucket(headers, v2_client)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
eq(status, 403)
eq(error_code, 'AccessDenied')

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,159 +0,0 @@
import json
import pytest
from botocore.exceptions import ClientError
from . import (
configfile,
get_iam_root_client,
get_iam_alt_root_client,
get_new_bucket_name,
get_prefix,
nuke_prefixed_buckets,
)
from .iam import iam_root, iam_alt_root
from .utils import assert_raises, _get_status_and_error_code
def get_new_topic_name():
return get_new_bucket_name()
def nuke_topics(client, prefix):
p = client.get_paginator('list_topics')
for response in p.paginate():
for topic in response['Topics']:
arn = topic['TopicArn']
if prefix not in arn:
pass
try:
client.delete_topic(TopicArn=arn)
except:
pass
@pytest.fixture
def sns(iam_root):
client = get_iam_root_client(service_name='sns')
yield client
nuke_topics(client, get_prefix())
@pytest.fixture
def sns_alt(iam_alt_root):
client = get_iam_alt_root_client(service_name='sns')
yield client
nuke_topics(client, get_prefix())
@pytest.fixture
def s3(iam_root):
client = get_iam_root_client(service_name='s3')
yield client
nuke_prefixed_buckets(get_prefix(), client)
@pytest.fixture
def s3_alt(iam_alt_root):
client = get_iam_alt_root_client(service_name='s3')
yield client
nuke_prefixed_buckets(get_prefix(), client)
@pytest.mark.iam_account
@pytest.mark.sns
def test_account_topic(sns):
name = get_new_topic_name()
response = sns.create_topic(Name=name)
arn = response['TopicArn']
assert arn.startswith('arn:aws:sns:')
assert arn.endswith(f':{name}')
response = sns.list_topics()
assert arn in [p['TopicArn'] for p in response['Topics']]
sns.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='')
response = sns.get_topic_attributes(TopicArn=arn)
assert 'Attributes' in response
sns.delete_topic(TopicArn=arn)
response = sns.list_topics()
assert arn not in [p['TopicArn'] for p in response['Topics']]
with pytest.raises(sns.exceptions.NotFoundException):
sns.get_topic_attributes(TopicArn=arn)
sns.delete_topic(TopicArn=arn)
@pytest.mark.iam_account
@pytest.mark.sns
def test_cross_account_topic(sns, sns_alt):
name = get_new_topic_name()
arn = sns.create_topic(Name=name)['TopicArn']
# not visible to any alt user apis
with pytest.raises(sns.exceptions.NotFoundException):
sns_alt.get_topic_attributes(TopicArn=arn)
with pytest.raises(sns.exceptions.NotFoundException):
sns_alt.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='')
# delete returns success
sns_alt.delete_topic(TopicArn=arn)
response = sns_alt.list_topics()
assert arn not in [p['TopicArn'] for p in response['Topics']]
@pytest.mark.iam_account
@pytest.mark.sns
def test_account_topic_publish(sns, s3):
name = get_new_topic_name()
response = sns.create_topic(Name=name)
topic_arn = response['TopicArn']
bucket = get_new_bucket_name()
s3.create_bucket(Bucket=bucket)
config = {'TopicConfigurations': [{
'Id': 'id',
'TopicArn': topic_arn,
'Events': [ 's3:ObjectCreated:*' ],
}]}
s3.put_bucket_notification_configuration(
Bucket=bucket, NotificationConfiguration=config)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
@pytest.mark.sns
def test_cross_account_topic_publish(sns, s3_alt, iam_alt_root):
name = get_new_topic_name()
response = sns.create_topic(Name=name)
topic_arn = response['TopicArn']
bucket = get_new_bucket_name()
s3_alt.create_bucket(Bucket=bucket)
config = {'TopicConfigurations': [{
'Id': 'id',
'TopicArn': topic_arn,
'Events': [ 's3:ObjectCreated:*' ],
}]}
# expect AccessDenies because no resource policy allows cross-account access
e = assert_raises(ClientError, s3_alt.put_bucket_notification_configuration,
Bucket=bucket, NotificationConfiguration=config)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add topic policy to allow the alt user
alt_principal = iam_alt_root.get_user()['User']['Arn']
policy = json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'AWS': alt_principal},
'Action': 'sns:Publish',
'Resource': topic_arn
}]
})
sns.set_topic_attributes(TopicArn=topic_arn, AttributeName='Policy',
AttributeValue=policy)
s3_alt.put_bucket_notification_configuration(
Bucket=bucket, NotificationConfiguration=config)

File diff suppressed because it is too large Load diff

View file

@ -1,9 +1,11 @@
from nose.tools import eq_ as eq
from . import utils
def test_generate():
FIVE_MB = 5 * 1024 * 1024
assert len(''.join(utils.generate_random(0))) == 0
assert len(''.join(utils.generate_random(1))) == 1
assert len(''.join(utils.generate_random(FIVE_MB - 1))) == FIVE_MB - 1
assert len(''.join(utils.generate_random(FIVE_MB))) == FIVE_MB
assert len(''.join(utils.generate_random(FIVE_MB + 1))) == FIVE_MB + 1
eq(len(''.join(utils.generate_random(0))), 0)
eq(len(''.join(utils.generate_random(1))), 1)
eq(len(''.join(utils.generate_random(FIVE_MB - 1))), FIVE_MB - 1)
eq(len(''.join(utils.generate_random(FIVE_MB))), FIVE_MB)
eq(len(''.join(utils.generate_random(FIVE_MB + 1))), FIVE_MB + 1)

View file

@ -3,6 +3,8 @@ import requests
import string
import time
from nose.tools import eq_ as eq
def assert_raises(excClass, callableObj, *args, **kwargs):
"""
Like unittest.TestCase.assertRaises, but returns the exception.

View file

@ -1,9 +0,0 @@
[tox]
envlist = py
[testenv]
deps = -rrequirements.txt
passenv =
S3TEST_CONF
S3_USE_SIGV4
commands = pytest {posargs}