Initial commit.

Tommi Virtanen 2011-04-04 14:45:42 -07:00
commit 46f2e3b00e
6 changed files with 825 additions and 0 deletions

.gitignore (new file, 11 lines)

@@ -0,0 +1,11 @@
*~
.#*
## the next line needs to start with a backslash to avoid looking like
## a comment
\#*#
.*.swp

*.pyc
*.pyo

/virtualenv

README.rst (new file, 91 lines)

@@ -0,0 +1,91 @@
========================
 S3 compatibility tests
========================

This is a set of completely unofficial Amazon AWS S3 compatibility
tests that will hopefully be useful to people implementing software
that exposes an S3-like API.

The tests only cover the REST interface.

TODO: test direct HTTP downloads, like a web browser would do.

The tests use the Boto library, so they cannot detect any differences
(e.g. at the HTTP level) that Boto papers over. Raw HTTP tests may be
added later.
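For instance, object reads and writes go through Boto's key
abstraction, so the exact requests on the wire are whatever Boto
chooses to emit; this snippet from ``test_s3.py`` shows the level the
tests operate at::

    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')  # PUT, request built by Boto
    got = key.get_contents_as_string()   # GET, response parsed by Boto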
The tests use the Nose test framework. To get started, ensure you have
the ``virtualenv`` software installed; e.g. on Debian/Ubuntu::

    sudo apt-get install python-virtualenv

and then run::

    ./bootstrap
You will need to create a configuration file with the location of the
service and two different credentials, something like this::

    [DEFAULT]
    ## this section is just used as default for all the "s3 *"
    ## sections, you can place these variables also directly there

    ## replace with e.g. "localhost" to run against local software
    host = s3.amazonaws.com

    ## uncomment the port to use something other than 80
    # port = 8080

    ## say "no" to disable TLS
    is_secure = yes

    [fixtures]
    ## all the buckets created will start with this prefix;
    ## {random} will be filled with random characters to pad
    ## the prefix to 30 characters long, and avoid collisions
    bucket prefix = YOURNAMEHERE-{random}-

    [s3 main]
    ## the tests assume two accounts are defined, "main" and "alt".

    ## user_id is a 64-character hexstring
    user_id = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef

    ## display name typically looks more like a unix login, "jdoe" etc
    display_name = youruseridhere

    ## replace these with your access keys
    access_key = ABCDEFGHIJKLMNOPQRST
    secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn

    [s3 alt]
    ## another user account, used for ACL-related tests
    user_id = 56789abcdef0123456789abcdef0123456789abcdef0123456789abcdef01234
    display_name = john.doe
    ## the "alt" user needs to have email set, too
    email = john.doe@example.com
    access_key = NOPQRSTUVWXYZABCDEFG
    secret_key = nopqrstuvwxyzabcdefghijklmnabcdefghijklm
Once you have that, you can run the tests with::

    S3TEST_CONF=your.conf ./virtualenv/bin/nosetests

You can specify which test(s) to run::

    S3TEST_CONF=your.conf ./virtualenv/bin/nosetests test_s3:test_object_acl_grant_public_read

Some tests have attributes set based on their current reliability and
things like AWS not enforcing their spec strictly. You can filter tests
based on their attributes::

    S3TEST_CONF=aws.conf ./virtualenv/bin/nosetests -a '!fails_on_aws'
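The attributes are set on the test functions with Nose's ``attrib``
plugin; for example, this is how ``test_s3.py`` tags a bucket-naming
test that AWS is known not to reject::

    from nose.plugins.attrib import attr

    # AWS does not enforce all documented bucket restrictions.
    @attr('fails_on_aws')
    def test_bucket_create_naming_bad_starts_nonalpha():
        check_bad_bucket_name('_alphasoup')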
TODO
====

- We should assume read-after-write consistency, and make the tests
  actually request such a location.

  http://aws.amazon.com/s3/faqs/#What_data_consistency_model_does_Amazon_S3_employ
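A minimal sketch of what requesting such a location could look like
with Boto, assuming ``conn`` is an ``S3Connection`` like the ones
``setup()`` creates, and the bucket name is a placeholder::

    from boto.s3.connection import Location

    # hypothetical: ask for a region the FAQ above describes as
    # providing read-after-write consistency for new objects
    # (US Standard is only eventually consistent)
    bucket = conn.create_bucket('some-test-bucket', location=Location.USWest)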

bootstrap (new executable file, 4 lines)

@@ -0,0 +1,4 @@
#!/bin/sh
set -e
virtualenv --no-site-packages --distribute virtualenv
./virtualenv/bin/pip install -r requirements.txt

requirements.txt (new file, 3 lines)

@@ -0,0 +1,3 @@
nose >=1.0.0
boto >=2.0b4
bunch >=1.0.0

test_s3.py (new file, 702 lines)

@@ -0,0 +1,702 @@
import ConfigParser
import boto.exception
import boto.s3.connection
import bunch
import itertools
import os
import random
import string
import time

from nose.tools import eq_ as eq
from nose.plugins.attrib import attr

from utils import assert_raises

NONEXISTENT_EMAIL = 'doesnotexist@dreamhost.com.invalid'

s3 = bunch.Bunch()
config = bunch.Bunch()

# this will be assigned by setup()
prefix = None


def choose_bucket_prefix(template, max_len=30):
    """
    Choose a prefix for our test buckets, so they're easy to identify.

    Use template and feed it more and more random filler, until it's
    as long as possible but still below max_len.
    """
    rand = ''.join(
        random.choice(string.ascii_lowercase + string.digits)
        for c in range(255)
        )
    while rand:
        s = template.format(random=rand)
        if len(s) <= max_len:
            return s
        rand = rand[:-1]
    raise RuntimeError(
        'Bucket prefix template is impossible to fulfill: {template!r}'.format(
            template=template,
            ),
        )


def nuke_prefixed_buckets():
    for name, conn in s3.items():
        print 'Cleaning buckets from connection {name}'.format(name=name)
        for bucket in conn.get_all_buckets():
            if bucket.name.startswith(prefix):
                print 'Cleaning bucket {bucket}'.format(bucket=bucket)
                try:
                    for key in bucket.list():
                        print 'Cleaning bucket {bucket} key {key}'.format(
                            bucket=bucket,
                            key=key,
                            )
                        key.delete()
                    bucket.delete()
                except boto.exception.S3ResponseError as e:
                    # TODO workaround for buggy rgw that fails to send
                    # error_code, remove
                    if (e.status == 403
                        and e.error_code is None
                        and e.body == ''):
                        e.error_code = 'AccessDenied'
                    if e.error_code != 'AccessDenied':
                        print 'GOT UNWANTED ERROR', e.error_code
                        raise
                    # seems like we're not the owner of the bucket; ignore
                    pass

    print 'Done with cleanup of test buckets.'


def setup():
    cfg = ConfigParser.RawConfigParser()
    try:
        path = os.environ['S3TEST_CONF']
    except KeyError:
        raise RuntimeError(
            'To run tests, point environment '
            + 'variable S3TEST_CONF to a config file.',
            )
    with file(path) as f:
        cfg.readfp(f)

    global prefix
    try:
        template = cfg.get('fixtures', 'bucket prefix')
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        template = 'test-{random}-'
    prefix = choose_bucket_prefix(template=template)

    s3.clear()
    config.clear()
    for section in cfg.sections():
        try:
            (type_, name) = section.split(None, 1)
        except ValueError:
            continue
        if type_ != 's3':
            continue
        try:
            port = cfg.getint(section, 'port')
        except ConfigParser.NoOptionError:
            port = None

        config[name] = bunch.Bunch()
        for var in [
            'user_id',
            'display_name',
            'email',
            ]:
            try:
                config[name][var] = cfg.get(section, var)
            except ConfigParser.NoOptionError:
                pass
        conn = boto.s3.connection.S3Connection(
            aws_access_key_id=cfg.get(section, 'access_key'),
            aws_secret_access_key=cfg.get(section, 'secret_key'),
            is_secure=cfg.getboolean(section, 'is_secure'),
            port=port,
            host=cfg.get(section, 'host'),
            # TODO support & test all variations
            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
            )
        s3[name] = conn

    # WARNING! we actively delete all buckets we see with the prefix
    # we've chosen! Choose your prefix with care, and don't reuse
    # credentials!

    # We also assume nobody else is going to use buckets with that
    # prefix. This is racy but given enough randomness, should not
    # really fail.
    nuke_prefixed_buckets()


def teardown():
    # remove our buckets here also, to avoid littering
    nuke_prefixed_buckets()


bucket_counter = itertools.count(1)


def get_new_bucket(connection=None):
    """
    Get a bucket that exists and is empty.

    Always recreates a bucket from scratch. This is useful to also
    reset ACLs and such.
    """
    if connection is None:
        connection = s3.main
    name = '{prefix}{num}'.format(
        prefix=prefix,
        num=next(bucket_counter),
        )
    # the only way for this to fail with a pre-existing bucket is if
    # someone raced us between setup nuke_prefixed_buckets and here;
    # ignore that as astronomically unlikely
    bucket = connection.create_bucket(name)
    return bucket


def check_access_denied(fn, *args, **kwargs):
    e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)
    eq(e.status, 403)
    eq(e.reason, 'Forbidden')
    eq(e.error_code, 'AccessDenied')


def test_bucket_list_empty():
    bucket = get_new_bucket()
    l = bucket.list()
    l = list(l)
    eq(l, [])


# TODO rgw gives NoSuchKey instead of NoSuchBucket
@attr('fails_on_rgw')
def test_bucket_create_delete():
    name = '{prefix}foo'.format(prefix=prefix)
    print 'Trying bucket {name!r}'.format(name=name)
    bucket = s3.main.create_bucket(name)
    # make sure it's actually there
    s3.main.get_bucket(bucket.name)
    bucket.delete()
    # make sure it's gone
    e = assert_raises(boto.exception.S3ResponseError, bucket.delete)
    eq(e.status, 404)
    eq(e.reason, 'Not Found')
    eq(e.error_code, 'NoSuchBucket')


def test_object_read_notexist():
    bucket = get_new_bucket()
    key = bucket.new_key('foobar')
    e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
    eq(e.status, 404)
    eq(e.reason, 'Not Found')
    eq(e.error_code, 'NoSuchKey')


def test_object_write_then_read():
    bucket = get_new_bucket()
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    got = key.get_contents_as_string()
    eq(got, 'bar')


def check_bad_bucket_name(name):
    e = assert_raises(boto.exception.S3ResponseError, s3.main.create_bucket, name)
    eq(e.status, 400)
    eq(e.reason, 'Bad Request')
    eq(e.error_code, 'InvalidBucketName')


# AWS does not enforce all documented bucket restrictions.
# http://docs.amazonwebservices.com/AmazonS3/2006-03-01/dev/index.html?BucketRestrictions.html
@attr('fails_on_aws')
# TODO rgw fails to provide error_code
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_starts_nonalpha():
    check_bad_bucket_name('_alphasoup')


# TODO this seems to hang until timeout on rgw?
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_short_empty():
    # bucket creates where name is empty look like PUTs to the parent
    # resource (with slash), hence their error response is different
    e = assert_raises(boto.exception.S3ResponseError, s3.main.create_bucket, '')
    eq(e.status, 405)
    eq(e.reason, 'Method Not Allowed')
    eq(e.error_code, 'MethodNotAllowed')


# TODO rgw fails to provide error_code
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_short_one():
    check_bad_bucket_name('a')


# TODO rgw fails to provide error_code
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_short_two():
    check_bad_bucket_name('aa')


# TODO rgw fails to provide error_code
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_long():
    check_bad_bucket_name(256*'a')
    check_bad_bucket_name(280*'a')
    check_bad_bucket_name(3000*'a')


def check_good_bucket_name(name):
    # prefixing to make them unique; tests using this must *not* rely
    # on being able to set the initial character, or exceed the max
    # len
    s3.main.create_bucket('{prefix}{name}'.format(
        prefix=prefix,
        name=name,
        ))


def _test_bucket_create_naming_good_long(length):
    assert len(prefix) < 255
    num = length - len(prefix)
    s3.main.create_bucket('{prefix}{name}'.format(
        prefix=prefix,
        name=num*'a',
        ))


def test_bucket_create_naming_good_long_250():
    _test_bucket_create_naming_good_long(250)


# breaks nuke_prefixed_buckets in teardown, claims a bucket from
# conn.get_all_buckets() suddenly does not exist
@attr('fails_on_rgw')
def test_bucket_create_naming_good_long_251():
    _test_bucket_create_naming_good_long(251)


# breaks nuke_prefixed_buckets in teardown, claims a bucket from
# conn.get_all_buckets() suddenly does not exist
@attr('fails_on_rgw')
def test_bucket_create_naming_good_long_252():
    _test_bucket_create_naming_good_long(252)


# breaks nuke_prefixed_buckets in teardown, claims a bucket from
# conn.get_all_buckets() suddenly does not exist
@attr('fails_on_rgw')
def test_bucket_create_naming_good_long_253():
    _test_bucket_create_naming_good_long(253)


# breaks nuke_prefixed_buckets in teardown, claims a bucket from
# conn.get_all_buckets() suddenly does not exist
@attr('fails_on_rgw')
def test_bucket_create_naming_good_long_254():
    _test_bucket_create_naming_good_long(254)


# breaks nuke_prefixed_buckets in teardown, claims a bucket from
# conn.get_all_buckets() suddenly does not exist
@attr('fails_on_rgw')
def test_bucket_create_naming_good_long_255():
    _test_bucket_create_naming_good_long(255)


# AWS does not enforce all documented bucket restrictions.
# http://docs.amazonwebservices.com/AmazonS3/2006-03-01/dev/index.html?BucketRestrictions.html
@attr('fails_on_aws')
# TODO rgw fails to provide error_code
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_ip():
    check_bad_bucket_name('192.168.5.123')


# TODO rgw fails to provide error_code
@attr('fails_on_rgw')
def test_bucket_create_naming_bad_punctuation():
    # characters other than [a-zA-Z0-9._-]
    check_bad_bucket_name('alpha!soup')


# test_bucket_create_naming_dns_* are valid but not recommended
def test_bucket_create_naming_dns_underscore():
    check_good_bucket_name('foo_bar')


def test_bucket_create_naming_dns_long():
    assert len(prefix) < 50
    num = 100 - len(prefix)
    check_good_bucket_name(num * 'a')


def test_bucket_create_naming_dns_dash_at_end():
    check_good_bucket_name('foo-')


def test_bucket_create_naming_dns_dot_dot():
    check_good_bucket_name('foo..bar')


def test_bucket_create_naming_dns_dot_dash():
    check_good_bucket_name('foo.-bar')


def test_bucket_create_naming_dns_dash_dot():
    check_good_bucket_name('foo-.bar')


# TODO rgw create_bucket() gives 409 Conflict even when owner matches;
# AWS ignores double-PUT
@attr('fails_on_rgw')
def test_bucket_create_exists():
    bucket = get_new_bucket()
    # REST idempotency means this should be a nop
    s3.main.create_bucket(bucket.name)


def test_bucket_create_exists_nonowner():
    # Names are shared across a global namespace. As such, no two
    # users can create a bucket with that same name.
    bucket = get_new_bucket()
    e = assert_raises(boto.exception.S3CreateError, s3.alt.create_bucket, bucket.name)
    eq(e.status, 409)
    eq(e.reason, 'Conflict')
    eq(e.error_code, 'BucketAlreadyExists')


def test_bucket_delete_nonowner():
    bucket = get_new_bucket()
    check_access_denied(s3.alt.delete_bucket, bucket.name)


# TODO radosgw returns the access_key instead of user_id
@attr('fails_on_rgw')
def test_bucket_acl_default():
    bucket = get_new_bucket()
    policy = bucket.get_acl()
    print repr(policy)
    eq(policy.owner.type, None)
    eq(policy.owner.id, config.main.user_id)
    eq(policy.owner.display_name, config.main.display_name)
    eq(len(policy.acl.grants), 1)
    eq(policy.acl.grants[0].permission, 'FULL_CONTROL')
    eq(policy.acl.grants[0].id, policy.owner.id)
    eq(policy.acl.grants[0].display_name, policy.owner.display_name)
    eq(policy.acl.grants[0].uri, None)
    eq(policy.acl.grants[0].email_address, None)
    eq(policy.acl.grants[0].type, 'CanonicalUser')


# TODO rgw bucket.set_acl() gives 403 Forbidden
@attr('fails_on_rgw')
def test_bucket_acl_canned():
    bucket = get_new_bucket()
    # Since it defaults to private, set it public-read first
    bucket.set_acl('public-read')
    policy = bucket.get_acl()
    print repr(policy)
    eq(len(policy.acl.grants), 2)
    eq(policy.acl.grants[0].permission, 'FULL_CONTROL')
    eq(policy.acl.grants[0].id, policy.owner.id)
    eq(policy.acl.grants[0].display_name, policy.owner.display_name)
    eq(policy.acl.grants[0].uri, None)
    eq(policy.acl.grants[0].email_address, None)
    eq(policy.acl.grants[0].type, 'CanonicalUser')
    eq(policy.acl.grants[1].permission, 'READ')
    eq(policy.acl.grants[1].id, None)
    eq(policy.acl.grants[1].display_name, None)
    eq(policy.acl.grants[1].uri, 'http://acs.amazonaws.com/groups/global/AllUsers')
    eq(policy.acl.grants[1].email_address, None)
    eq(policy.acl.grants[1].type, 'Group')

    # Then back to private.
    bucket.set_acl('private')
    policy = bucket.get_acl()
    print repr(policy)
    eq(len(policy.acl.grants), 1)
    eq(policy.acl.grants[0].permission, 'FULL_CONTROL')
    eq(policy.acl.grants[0].id, policy.owner.id)
    eq(policy.acl.grants[0].display_name, policy.owner.display_name)
    eq(policy.acl.grants[0].uri, None)
    eq(policy.acl.grants[0].email_address, None)
    eq(policy.acl.grants[0].type, 'CanonicalUser')


# TODO rgw bucket.set_acl() gives 403 Forbidden
@attr('fails_on_rgw')
def test_bucket_acl_canned_private_to_private():
    bucket = get_new_bucket()
    bucket.set_acl('private')


# TODO rgw bucket.set_acl() gives 403 Forbidden
@attr('fails_on_rgw')
def test_bucket_acl_grant_userid():
    bucket = get_new_bucket()
    # add alt user
    policy = bucket.get_acl()
    policy.acl.add_user_grant('FULL_CONTROL', config.alt.user_id)
    bucket.set_acl(policy)
    policy = bucket.get_acl()
    eq(len(policy.acl.grants), 2)
    eq(policy.acl.grants[1].permission, 'FULL_CONTROL')
    eq(policy.acl.grants[1].id, config.alt.user_id)
    eq(policy.acl.grants[1].display_name, config.alt.display_name)
    eq(policy.acl.grants[1].uri, None)
    eq(policy.acl.grants[1].email_address, None)
    eq(policy.acl.grants[1].type, 'CanonicalUser')

    # alt user can write
    bucket2 = s3.alt.get_bucket(bucket.name)
    key = bucket2.new_key('foo')
    key.set_contents_from_string('bar')


# TODO rgw bucket.set_acl() gives 403 Forbidden
@attr('fails_on_rgw')
def test_bucket_acl_grant_email():
    bucket = get_new_bucket()
    # add alt user
    policy = bucket.get_acl()
    policy.acl.add_email_grant('FULL_CONTROL', config.alt.email)
    bucket.set_acl(policy)
    policy = bucket.get_acl()
    eq(len(policy.acl.grants), 2)
    eq(policy.acl.grants[1].permission, 'FULL_CONTROL')
    eq(policy.acl.grants[1].id, config.alt.user_id)
    eq(policy.acl.grants[1].display_name, config.alt.display_name)
    eq(policy.acl.grants[1].uri, None)
    eq(policy.acl.grants[1].email_address, None)
    eq(policy.acl.grants[1].type, 'CanonicalUser')

    # alt user can write
    bucket2 = s3.alt.get_bucket(bucket.name)
    key = bucket2.new_key('foo')
    key.set_contents_from_string('bar')


# TODO rgw gives 403 error
@attr('fails_on_rgw')
def test_bucket_acl_grant_email_notexist():
    # behavior not documented by amazon
    bucket = get_new_bucket()
    policy = bucket.get_acl()
    policy.acl.add_email_grant('FULL_CONTROL', NONEXISTENT_EMAIL)
    e = assert_raises(boto.exception.S3ResponseError, bucket.set_acl, policy)
    eq(e.status, 400)
    eq(e.reason, 'Bad Request')
    eq(e.error_code, 'UnresolvableGrantByEmailAddress')


# TODO rgw bucket.set_acl() gives 403 Forbidden
@attr('fails_on_rgw')
def test_bucket_acl_revoke_all():
    # revoke all access, including the owner's access
    bucket = get_new_bucket()
    policy = bucket.get_acl()
    policy.acl.grants = []
    bucket.set_acl(policy)
    policy = bucket.get_acl()
    eq(len(policy.acl.grants), 0)


# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
@attr('fails_on_rgw')
def test_logging_toggle():
    bucket = get_new_bucket()
    log_bucket = s3.main.create_bucket(bucket.name + '-log')
    log_bucket.set_as_logging_target()
    bucket.enable_logging(target_bucket=log_bucket, target_prefix=bucket.name)
    bucket.disable_logging()


def _setup_access(bucket_acl, object_acl):
    """
    Simple test fixture: create a bucket with given ACL, with objects:

    - a: given ACL
    - b: default ACL
    """
    obj = bunch.Bunch()
    bucket = get_new_bucket()
    bucket.set_acl(bucket_acl)
    obj.a = bucket.new_key('foo')
    obj.a.set_contents_from_string('foocontent')
    obj.a.set_acl(object_acl)
    obj.b = bucket.new_key('bar')
    obj.b.set_contents_from_string('barcontent')

    obj.bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
    obj.a2 = obj.bucket2.new_key(obj.a.name)
    obj.b2 = obj.bucket2.new_key(obj.b.name)
    obj.new = obj.bucket2.new_key('new')

    return obj


def get_bucket_key_names(bucket):
    return frozenset(k.name for k in bucket.list())


# TODO bucket.set_acl('private') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_private_object_private():
    # all the test_access_* tests follow this template
    obj = _setup_access(bucket_acl='private', object_acl='private')
    # acled object read fail
    check_access_denied(obj.a2.get_contents_as_string)
    # acled object write fail
    check_access_denied(obj.a2.set_contents_from_string, 'barcontent')
    # default object read fail
    check_access_denied(obj.b2.get_contents_as_string)
    # default object write fail
    check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
    # bucket read fail
    check_access_denied(get_bucket_key_names, obj.bucket2)
    # bucket write fail
    check_access_denied(obj.new.set_contents_from_string, 'newcontent')


# TODO bucket.set_acl('private') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_private_object_publicread():
    obj = _setup_access(bucket_acl='private', object_acl='public-read')
    eq(obj.a2.get_contents_as_string(), 'foocontent')
    check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
    check_access_denied(obj.b2.get_contents_as_string)
    check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
    check_access_denied(get_bucket_key_names, obj.bucket2)
    check_access_denied(obj.new.set_contents_from_string, 'newcontent')


# TODO bucket.set_acl('private') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_private_object_publicreadwrite():
    obj = _setup_access(bucket_acl='private', object_acl='public-read-write')
    eq(obj.a2.get_contents_as_string(), 'foocontent')
    ### TODO: it seems AWS denies this write, even when we expected it
    ### to complete; as it is unclear what the actual desired behavior
    ### is (the docs are somewhat unclear), we'll just codify current
    ### AWS behavior, at least for now.
    # obj.a2.set_contents_from_string('foooverwrite')
    check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
    check_access_denied(obj.b2.get_contents_as_string)
    check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
    check_access_denied(get_bucket_key_names, obj.bucket2)
    check_access_denied(obj.new.set_contents_from_string, 'newcontent')


# TODO bucket.set_acl('public-read') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_publicread_object_private():
    obj = _setup_access(bucket_acl='public-read', object_acl='private')
    check_access_denied(obj.a2.get_contents_as_string)
    check_access_denied(obj.a2.set_contents_from_string, 'barcontent')
    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(obj.b2.get_contents_as_string)
    check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
    eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
    check_access_denied(obj.new.set_contents_from_string, 'newcontent')


# TODO bucket.set_acl('public-read') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_publicread_object_publicread():
    obj = _setup_access(bucket_acl='public-read', object_acl='public-read')
    eq(obj.a2.get_contents_as_string(), 'foocontent')
    check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(obj.b2.get_contents_as_string)
    check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
    eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
    check_access_denied(obj.new.set_contents_from_string, 'newcontent')


# TODO bucket.set_acl('public-read') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_publicread_object_publicreadwrite():
    obj = _setup_access(bucket_acl='public-read', object_acl='public-read-write')
    eq(obj.a2.get_contents_as_string(), 'foocontent')
    ### TODO: it seems AWS denies this write, even when we expected it
    ### to complete; as it is unclear what the actual desired behavior
    ### is (the docs are somewhat unclear), we'll just codify current
    ### AWS behavior, at least for now.
    # obj.a2.set_contents_from_string('foooverwrite')
    check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(obj.b2.get_contents_as_string)
    check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
    eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
    check_access_denied(obj.new.set_contents_from_string, 'newcontent')


# TODO bucket.set_acl('public-read-write') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_publicreadwrite_object_private():
    obj = _setup_access(bucket_acl='public-read-write', object_acl='private')
    check_access_denied(obj.a2.get_contents_as_string)
    obj.a2.set_contents_from_string('barcontent')
    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(obj.b2.get_contents_as_string)
    obj.b2.set_contents_from_string('baroverwrite')
    eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
    obj.new.set_contents_from_string('newcontent')


# TODO bucket.set_acl('public-read-write') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_publicreadwrite_object_publicread():
    obj = _setup_access(bucket_acl='public-read-write', object_acl='public-read')
    eq(obj.a2.get_contents_as_string(), 'foocontent')
    obj.a2.set_contents_from_string('barcontent')
    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(obj.b2.get_contents_as_string)
    obj.b2.set_contents_from_string('baroverwrite')
    eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
    obj.new.set_contents_from_string('newcontent')


# TODO bucket.set_acl('public-read-write') fails on rgw
@attr('fails_on_rgw')
def test_access_bucket_publicreadwrite_object_publicreadwrite():
    obj = _setup_access(bucket_acl='public-read-write', object_acl='public-read-write')
    eq(obj.a2.get_contents_as_string(), 'foocontent')
    obj.a2.set_contents_from_string('foooverwrite')
    ### TODO: i don't understand why this gets denied, but codifying what
    ### AWS does
    # eq(obj.b2.get_contents_as_string(), 'barcontent')
    check_access_denied(obj.b2.get_contents_as_string)
    obj.b2.set_contents_from_string('baroverwrite')
    eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
    obj.new.set_contents_from_string('newcontent')

utils.py (new file, 14 lines)

@@ -0,0 +1,14 @@
def assert_raises(excClass, callableObj, *args, **kwargs):
    """
    Like unittest.TestCase.assertRaises, but returns the exception.
    """
    try:
        callableObj(*args, **kwargs)
    except excClass as e:
        return e
    else:
        if hasattr(excClass, '__name__'):
            excName = excClass.__name__
        else:
            excName = str(excClass)
        raise AssertionError("%s not raised" % excName)
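For reference, ``check_access_denied`` in ``test_s3.py`` shows the intended
usage of this helper: pass the expected exception class, the callable, and
its arguments, then make assertions on the returned exception:

    def check_access_denied(fn, *args, **kwargs):
        e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)
        eq(e.status, 403)
        eq(e.reason, 'Forbidden')
        eq(e.error_code, 'AccessDenied')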