Compare commits

...

1 commit

Author SHA1 Message Date
caleb miles
1d39e5aee7 test_s3: Add test of ACL grants through HTTP headers.
Signed-off-by: caleb miles <caleb.miles@inktank.com>
2013-02-07 15:58:32 -05:00

View file

@ -28,6 +28,7 @@ from email.header import decode_header
from . import (
nuke_prefixed_buckets,
get_new_bucket,
get_new_bucket_name,
s3,
config,
get_prefix,
@ -2328,6 +2329,143 @@ def test_bucket_acl_no_grants():
# can write acl
bucket.set_acl('private')
def _get_acl_header(user=None, perms=None):
all_headers = ["read", "write", "read-acp", "write-acp", "full-control"]
headers = {}
if user == None:
user = config.alt.user_id
if perms != None:
for perm in perms:
headers["x-amz-grant-{perm}".format(perm=perm)] = "id={uid}".format(uid=user)
else:
for perm in all_headers:
headers["x-amz-grant-{perm}".format(perm=perm)] = "id={uid}".format(uid=user)
return headers
@attr(resource='object')
@attr(method='PUT')
@attr(operation='add all grants to user through headers')
@attr(assertion='adds all grants individually to second user')
@attr('fails_on_dho')
def test_object_header_acl_grants():
bucket = get_new_bucket()
headers = _get_acl_header()
k = bucket.new_key("foo_key")
k.set_contents_from_string("bar", headers=headers)
policy = k.get_acl()
check_grants(
policy.acl.grants,
[
dict(
permission='READ',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='WRITE',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='READ_ACP',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='WRITE_ACP',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='FULL_CONTROL',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
],
)
@attr(resource='bucket')
@attr(method='PUT')
@attr(operation='add all grants to user through headers')
@attr(assertion='adds all grants individually to second user')
@attr('fails_on_dho')
def test_bucket_header_acl_grants():
headers = _get_acl_header()
bucket = s3.main.create_bucket(get_prefix(), headers=headers)
policy = bucket.get_acl()
check_grants(
policy.acl.grants,
[
dict(
permission='READ',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='WRITE',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='READ_ACP',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='WRITE_ACP',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='FULL_CONTROL',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
],
)
# alt user can write
bucket2 = s3.alt.get_bucket(bucket.name)
key = bucket2.new_key('foo')
key.set_contents_from_string('bar')
# This test will fail on DH Objects. DHO allows multiple users with one account, which
# would violate the uniqueness requirement of a user's email. As such, DHO users are