Refactor grant checking to ignore order of grants.

This commit is contained in:
Tommi Virtanen 2011-04-18 09:58:10 -07:00
parent 344b182260
commit 3f0828b855

View file

@ -3,6 +3,7 @@ import boto.exception
import boto.s3.connection import boto.s3.connection
import bunch import bunch
import itertools import itertools
import operator
import os import os
import random import random
import string import string
@ -177,6 +178,24 @@ def check_access_denied(fn, *args, **kwargs):
eq(e.error_code, 'AccessDenied') eq(e.error_code, 'AccessDenied')
def check_grants(got, want):
"""
Check that grants list in got matches the dictionaries in want,
in any order.
"""
eq(len(got), len(want))
got = sorted(got, key=operator.attrgetter('id'))
want = sorted(want, key=operator.itemgetter('id'))
for g,w in zip(got, want):
w = dict(w)
eq(g.permission, w.pop('permission'))
eq(g.id, w.pop('id'))
eq(g.display_name, w.pop('display_name'))
eq(g.uri, w.pop('uri'))
eq(g.email_address, w.pop('email_address'))
eq(g.type, w.pop('type'))
eq(w, {})
def test_bucket_list_empty(): def test_bucket_list_empty():
bucket = get_new_bucket() bucket = get_new_bucket()
l = bucket.list() l = bucket.list()
@ -389,13 +408,19 @@ def test_bucket_acl_default():
eq(policy.owner.type, None) eq(policy.owner.type, None)
eq(policy.owner.id, config.main.user_id) eq(policy.owner.id, config.main.user_id)
eq(policy.owner.display_name, config.main.display_name) eq(policy.owner.display_name, config.main.display_name)
eq(len(policy.acl.grants), 1) check_grants(
eq(policy.acl.grants[0].permission, 'FULL_CONTROL') policy.acl.grants,
eq(policy.acl.grants[0].id, policy.owner.id) [
eq(policy.acl.grants[0].display_name, policy.owner.display_name) dict(
eq(policy.acl.grants[0].uri, None) permission='FULL_CONTROL',
eq(policy.acl.grants[0].email_address, None) id=policy.owner.id,
eq(policy.acl.grants[0].type, 'CanonicalUser') display_name=policy.owner.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
],
)
def test_bucket_acl_canned(): def test_bucket_acl_canned():
@ -404,31 +429,45 @@ def test_bucket_acl_canned():
bucket.set_acl('public-read') bucket.set_acl('public-read')
policy = bucket.get_acl() policy = bucket.get_acl()
print repr(policy) print repr(policy)
eq(len(policy.acl.grants), 2) check_grants(
eq(policy.acl.grants[0].permission, 'FULL_CONTROL') policy.acl.grants,
eq(policy.acl.grants[0].id, policy.owner.id) [
eq(policy.acl.grants[0].display_name, policy.owner.display_name) dict(
eq(policy.acl.grants[0].uri, None) permission='FULL_CONTROL',
eq(policy.acl.grants[0].email_address, None) id=policy.owner.id,
eq(policy.acl.grants[0].type, 'CanonicalUser') display_name=policy.owner.display_name,
eq(policy.acl.grants[1].permission, 'READ') uri=None,
eq(policy.acl.grants[1].id, None) email_address=None,
eq(policy.acl.grants[1].display_name, None) type='CanonicalUser',
eq(policy.acl.grants[1].uri, 'http://acs.amazonaws.com/groups/global/AllUsers') ),
eq(policy.acl.grants[1].email_address, None) dict(
eq(policy.acl.grants[1].type, 'Group') permission='READ',
id=None,
display_name=None,
uri='http://acs.amazonaws.com/groups/global/AllUsers',
email_address=None,
type='Group',
),
],
)
# Then back to private. # Then back to private.
bucket.set_acl('private') bucket.set_acl('private')
policy = bucket.get_acl() policy = bucket.get_acl()
print repr(policy) print repr(policy)
eq(len(policy.acl.grants), 1) check_grants(
eq(policy.acl.grants[0].permission, 'FULL_CONTROL') policy.acl.grants,
eq(policy.acl.grants[0].id, policy.owner.id) [
eq(policy.acl.grants[0].display_name, policy.owner.display_name) dict(
eq(policy.acl.grants[0].uri, None) permission='FULL_CONTROL',
eq(policy.acl.grants[0].email_address, None) id=policy.owner.id,
eq(policy.acl.grants[0].type, 'CanonicalUser') display_name=policy.owner.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
],
)
def test_bucket_acl_canned_private_to_private(): def test_bucket_acl_canned_private_to_private():
@ -445,13 +484,27 @@ def test_bucket_acl_grant_userid():
policy.acl.add_user_grant('FULL_CONTROL', config.alt.user_id) policy.acl.add_user_grant('FULL_CONTROL', config.alt.user_id)
bucket.set_acl(policy) bucket.set_acl(policy)
policy = bucket.get_acl() policy = bucket.get_acl()
eq(len(policy.acl.grants), 2) check_grants(
eq(policy.acl.grants[1].permission, 'FULL_CONTROL') policy.acl.grants,
eq(policy.acl.grants[1].id, config.alt.user_id) [
eq(policy.acl.grants[1].display_name, config.alt.display_name) dict(
eq(policy.acl.grants[1].uri, None) permission='FULL_CONTROL',
eq(policy.acl.grants[1].email_address, None) id=policy.owner.id,
eq(policy.acl.grants[1].type, 'CanonicalUser') display_name=policy.owner.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='FULL_CONTROL',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
],
)
# alt user can write # alt user can write
bucket2 = s3.alt.get_bucket(bucket.name) bucket2 = s3.alt.get_bucket(bucket.name)
@ -468,13 +521,27 @@ def test_bucket_acl_grant_email():
policy.acl.add_email_grant('FULL_CONTROL', config.alt.email) policy.acl.add_email_grant('FULL_CONTROL', config.alt.email)
bucket.set_acl(policy) bucket.set_acl(policy)
policy = bucket.get_acl() policy = bucket.get_acl()
eq(len(policy.acl.grants), 2) check_grants(
eq(policy.acl.grants[1].permission, 'FULL_CONTROL') policy.acl.grants,
eq(policy.acl.grants[1].id, config.alt.user_id) [
eq(policy.acl.grants[1].display_name, config.alt.display_name) dict(
eq(policy.acl.grants[1].uri, None) permission='FULL_CONTROL',
eq(policy.acl.grants[1].email_address, None) id=policy.owner.id,
eq(policy.acl.grants[1].type, 'CanonicalUser') display_name=policy.owner.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='FULL_CONTROL',
id=config.alt.user_id,
display_name=config.alt.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
],
)
# alt user can write # alt user can write
bucket2 = s3.alt.get_bucket(bucket.name) bucket2 = s3.alt.get_bucket(bucket.name)