Merge pull request #191 from theanalyst/policy-conditionals

test bucket policy with conditionals 

Reviewed-by: Adam C. Emerson <aemerson@redhat.com>
Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2018-01-25 11:06:06 -05:00 committed by GitHub
commit 8ef3465f0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 744 additions and 17 deletions

View file

@ -0,0 +1,46 @@
import json
class Statement(object):
def __init__(self, action, resource, principal = {"AWS" : "*"}, effect= "Allow", condition = None):
self.principal = principal
self.action = action
self.resource = resource
self.condition = condition
self.effect = effect
def to_dict(self):
d = { "Action" : self.action,
"Principal" : self.principal,
"Effect" : self.effect,
"Resource" : self.resource
}
if self.condition is not None:
d["Condition"] = self.condition
return d
class Policy(object):
def __init__(self):
self.statements = []
def add_statement(self, s):
self.statements.append(s)
return self
def to_json(self):
policy_dict = {
"Version" : "2012-10-17",
"Statement":
[s.to_dict() for s in self.statements]
}
return json.dumps(policy_dict)
def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
"""
Helper function to make single statement policies
"""
s = Statement(action, resource, principal, condition=conditions)
p = Policy()
return p.add_statement(s).to_json()

View file

@ -39,9 +39,13 @@ from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest from nose.plugins.skip import SkipTest
from boto.s3.tagging import TagSet from boto.s3.tagging import TagSet
import utils
from .utils import assert_raises from .utils import assert_raises
from .utils import generate_random from .utils import generate_random
from .utils import region_sync_meta from .utils import region_sync_meta
from .policy import Policy, Statement, make_json_policy
import AnonymousAuth import AnonymousAuth
from email.header import decode_header from email.header import decode_header
@ -8737,6 +8741,9 @@ def test_sse_kms_read_declare():
e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string, headers=sse_kms_client_headers) e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string, headers=sse_kms_client_headers)
eq(e.status, 400) eq(e.status, 400)
def _make_arn_resource(path="*"):
return "arn:aws:s3:::{}".format(path)
@attr(resource='bucket') @attr(resource='bucket')
@attr(method='get') @attr(method='get')
@attr(operation='Test Bucket Policy') @attr(operation='Test Bucket Policy')
@ -8903,7 +8910,7 @@ def test_bucket_policy_another_bucket():
@attr(resource='bucket') @attr(resource='bucket')
@attr(method='put') @attr(method='put')
@attr(operation='Test put condition operator end with ifExists') @attr(operation='Test put condition operator end with ifExists')
@attr('policy') @attr('bucket-policy')
def test_bucket_policy_set_condition_operator_end_with_IfExists(): def test_bucket_policy_set_condition_operator_end_with_IfExists():
bucket = _create_keys(keys=['foo']) bucket = _create_keys(keys=['foo'])
policy = '''{ policy = '''{
@ -8936,6 +8943,194 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
eq(res.status, 403) eq(res.status, 403)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test listbucket with prefix')
@attr('bucket-policy')
def test_bucket_policy_list_bucket_with_prefix():
bucket = _create_keys(keys=['foo','folder/foo1','folder/foo2','folder/foo3','foo2'])
conditional = {"StringEquals": {
"s3:prefix" : "folder"
}}
resource = _make_arn_resource(bucket.name)
p = Policy()
s = Statement("s3:ListBucket", resource, condition=conditional)
policy_document = p.add_statement(s).to_json()
eq(bucket.set_policy(policy_document), True)
new_conn = _get_alt_connection()
# boto2 cannot give me a bucket object without doing a get bucket :/
res = new_conn.make_request('GET', bucket.name, query_args = 'prefix=folder')
eq(res.status, 200)
ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
eq(len(keys), 3)
res = new_conn.make_request('GET', bucket.name, query_args = 'prefix=somethingelse')
eq(res.status, 403)
res = new_conn.make_request('GET', bucket.name)
eq(res.status, 403)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test listbucket with maxkeys')
@attr('bucket-policy')
def test_bucket_policy_list_bucket_with_maxkeys():
bucket = _create_keys(keys=['key'+str(i) for i in range(8)])
list_conditional = {"NumericLessThanEquals": {
"s3:max-keys" : "6"
}}
resource = _make_arn_resource(bucket.name)
policy_document = make_json_policy("s3:ListBucket",
resource,
conditions=list_conditional)
eq(bucket.set_policy(policy_document), True)
new_conn = _get_alt_connection()
res = new_conn.make_request('GET', bucket.name, query_args = 'max-keys=6')
eq(res.status, 200)
ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
eq(len(keys), 6)
res = new_conn.make_request('GET', bucket.name, query_args = 'max-keys=5')
eq(res.status, 200)
ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
eq(len(keys), 5)
res = new_conn.make_request('GET', bucket.name, query_args = 'max-keys=7')
eq(res.status, 403)
res = new_conn.make_request('GET', bucket.name)
eq(res.status, 403)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='Test listbucket with delimiter')
@attr('bucket-policy')
def test_bucket_policy_list_bucket_with_delimiter():
bucket = _create_keys(keys=['key/'+str(i) for i in range(5)])
list_conditional = {"StringEquals": {
"s3:delimiter" : "/"
}}
resource = _make_arn_resource(bucket.name)
policy_document = make_json_policy("s3:ListBucket",
resource,
conditions=list_conditional)
eq(bucket.set_policy(policy_document), True)
new_conn = _get_alt_connection()
# specifying a delimiter will list contents without the delimiter
res = new_conn.make_request('GET', bucket.name, query_args = 'delimiter=/')
eq(res.status, 200)
ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
eq(len(keys), 0)
# now lets upload some keys again
bucket2 = _create_keys(keys=['key'+str(i) for i in range(5)])
resource = _make_arn_resource(bucket2.name)
policy2 = make_json_policy("s3:ListBucket",
resource,
conditions=list_conditional)
eq(bucket2.set_policy(policy2), True)
res = new_conn.make_request('GET', bucket2.name, query_args = 'delimiter=/')
eq(res.status, 200)
ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
eq(len(keys), 5)
res = new_conn.make_request('GET', bucket.name)
eq(res.status, 403)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put bucket acl with canned acl conditionals')
@attr('bucket-policy')
def test_bucket_policy_list_put_bucket_acl_canned_acl():
bucket = _create_keys(keys=['key/'+str(i) for i in range(5)])
policy_conditional = {"StringEquals": {
"s3:x-amz-acl" : "bucket-owner-full-control"
}}
resource = _make_arn_resource(bucket.name)
policy_document = make_json_policy("s3:PutBucketAcl",resource,
conditions=policy_conditional)
eq(bucket.set_policy(policy_document), True)
new_conn = _get_alt_connection()
# This doesn't make that much sense as a standalone bucket policy, however
# this is useful when this is used as an object level policy
headers = {"x-amz-acl":"bucket-owner-full-control"}
res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers)
eq(res.status, 200)
# now lets upload some keys again
headers = {"x-amz-acl":"public-read"}
res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers)
eq(res.status, 403)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='Test put bucket acl with acl grant headers')
@attr('bucket-policy')
def test_bucket_policy_list_put_bucket_acl_grants():
bucket = _create_keys(keys=['key/'+str(i) for i in range(5)])
owner_id_str = "id="+config.main.user_id
policy_conditional = {"StringEquals": {
"s3:x-amz-grant-full-control" : owner_id_str
}}
resource = _make_arn_resource(bucket.name)
policy_document = make_json_policy("s3:PutBucketAcl",resource,
conditions=policy_conditional)
eq(bucket.set_policy(policy_document), True)
new_conn = _get_alt_connection()
headers = {"x-amz-grant-full-control": owner_id_str}
res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers)
eq(res.status, 200)
# user trying to elevate himself as the owner
headers = {"x-amz-grant-full-control": "id=" + config.alt.user_id }
res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers)
eq(res.status, 403)
headers = {"x-amz-grant-read": owner_id_str}
res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers)
eq(res.status, 403)
def _tags_from_dict(d): def _tags_from_dict(d):
tag_list = [] tag_list = []
for k,v in d.items(): for k,v in d.items():
@ -9273,28 +9468,13 @@ def test_put_obj_with_tags():
res_tagset = _get_obj_tags(bucket, key.name) res_tagset = _get_obj_tags(bucket, key.name)
eq(input_tagset.to_dict(), res_tagset.to_dict()) eq(input_tagset.to_dict(), res_tagset.to_dict())
def _make_arn_resource(path="*"):
return "arn:aws:s3:::{}".format(path)
def make_json_policy(action, resource, principal={"AWS": "*"}):
return json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": principal,
"Action": action,
"Resource": [
resource
]
}]
})
@attr(resource='object') @attr(resource='object')
@attr(method='get') @attr(method='get')
@attr(operation='Test GetObjTagging public read') @attr(operation='Test GetObjTagging public read')
@attr(assertion='success') @attr(assertion='success')
@attr('tagging') @attr('tagging')
@attr('bucket-policy')
def test_get_tags_acl_public(): def test_get_tags_acl_public():
bucket, key = _create_key_with_random_content('testputtagsacl') bucket, key = _create_key_with_random_content('testputtagsacl')
@ -9315,6 +9495,7 @@ def test_get_tags_acl_public():
@attr(operation='Test PutObjTagging public wrote') @attr(operation='Test PutObjTagging public wrote')
@attr(assertion='success') @attr(assertion='success')
@attr('tagging') @attr('tagging')
@attr('bucket-policy')
def test_put_tags_acl_public(): def test_put_tags_acl_public():
bucket, key = _create_key_with_random_content('testputtagsacl') bucket, key = _create_key_with_random_content('testputtagsacl')
@ -9335,6 +9516,7 @@ def test_put_tags_acl_public():
@attr(operation='Test DeleteObjTagging public') @attr(operation='Test DeleteObjTagging public')
@attr(assertion='success') @attr(assertion='success')
@attr('tagging') @attr('tagging')
@attr('bucket-policy')
def test_delete_tags_obj_public(): def test_delete_tags_obj_public():
bucket, key = _create_key_with_random_content('testputtagsacl') bucket, key = _create_key_with_random_content('testputtagsacl')
@ -9417,3 +9599,493 @@ def test_versioning_bucket_multipart_upload_return_version_id():
(upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'}) (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
res = upload.complete_upload() res = upload.complete_upload()
assert_is_none(res.version_id) assert_is_none(res.version_id)
@attr(resource='object')
@attr(method='get')
@attr(operation='Test ExistingObjectTag conditional on get object')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_get_obj_existing_tag():
bucket = _create_keys(keys=['publictag','privatetag','invalidtag'])
tag_conditional = {"StringEquals": {
"s3:ExistingObjectTag/security" : "public"
}}
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:GetObject",
resource,
conditions=tag_conditional)
bucket.set_policy(policy_document)
input_tagset = S3TestTagSet()
input_tagset.add_tag('security','public')
input_tagset.add_tag('foo','bar')
input_tagset2 = S3TestTagSet()
input_tagset2.add_tag('security','private')
input_tagset3 = S3TestTagSet()
input_tagset3.add_tag('security1','public')
res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml())
eq(res.status, 200)
res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml())
eq(res.status, 200)
res = _put_obj_tags(bucket, 'invalidtag', input_tagset3.to_xml())
eq(res.status, 200)
new_conn = _get_alt_connection()
res = new_conn.make_request("GET",bucket.name, 'publictag')
eq(res.status, 200)
res = new_conn.make_request("GET",bucket.name, 'privatetag')
eq(res.status, 403)
res = new_conn.make_request("GET",bucket.name, 'invalidtag')
eq(res.status, 403)
@attr(resource='object')
@attr(method='get')
@attr(operation='Test ExistingObjectTag conditional on get object tagging')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_get_obj_tagging_existing_tag():
bucket = _create_keys(keys=['publictag','privatetag','invalidtag'])
tag_conditional = {"StringEquals": {
"s3:ExistingObjectTag/security" : "public"
}}
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:GetObjectTagging",
resource,
conditions=tag_conditional)
bucket.set_policy(policy_document)
input_tagset = S3TestTagSet()
input_tagset.add_tag('security','public')
input_tagset.add_tag('foo','bar')
input_tagset2 = S3TestTagSet()
input_tagset2.add_tag('security','private')
input_tagset3 = S3TestTagSet()
input_tagset3.add_tag('security1','public')
res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml())
eq(res.status, 200)
res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml())
eq(res.status, 200)
res = _put_obj_tags(bucket, 'invalidtag', input_tagset3.to_xml())
eq(res.status, 200)
new_conn = _get_alt_connection()
res = new_conn.make_request("GET",bucket.name, 'publictag', query_args='tagging')
eq(res.status, 200)
# A get object itself should fail since we allowed only GetObjectTagging
res = new_conn.make_request("GET",bucket.name, 'publictag')
eq(res.status, 403)
res = new_conn.make_request("GET",bucket.name, 'privatetag', query_args='tagging')
eq(res.status, 403)
res = new_conn.make_request("GET",bucket.name, 'invalidtag', query_args='tagging')
eq(res.status, 403)
@attr(resource='object')
@attr(method='get')
@attr(operation='Test ExistingObjectTag conditional on put object tagging')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_put_obj_tagging_existing_tag():
bucket = _create_keys(keys=['publictag','privatetag','invalidtag'])
tag_conditional = {"StringEquals": {
"s3:ExistingObjectTag/security" : "public"
}}
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:PutObjectTagging",
resource,
conditions=tag_conditional)
bucket.set_policy(policy_document)
input_tagset = S3TestTagSet()
input_tagset.add_tag('security','public')
input_tagset.add_tag('foo','bar')
res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml())
eq(res.status, 200)
input_tagset2 = S3TestTagSet()
input_tagset2.add_tag('security','private')
res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml())
eq(res.status, 200)
new_conn = _get_alt_connection()
# PUT requests with object tagging are a bit wierd, if you forget to put
# the tag which is supposed to be existing anymore well, well subsequent
# put requests will fail
testtagset1 = S3TestTagSet()
testtagset1.add_tag('security','public')
testtagset1.add_tag('foo','bar')
res = _put_obj_tags_conn(new_conn, bucket.name, 'publictag', testtagset1.to_xml())
eq(res.status, 200)
res = _put_obj_tags_conn(new_conn, bucket.name, 'privatetag', testtagset1.to_xml())
eq(res.status, 403)
testtagset2 = S3TestTagSet()
testtagset2.add_tag('security','private')
res = _put_obj_tags_conn(new_conn, bucket.name, 'publictag', testtagset2.to_xml())
eq(res.status, 200)
# Now try putting the original tags again, this should fail
res = _put_obj_tags_conn(new_conn, bucket.name, 'publictag', testtagset1.to_xml())
eq(res.status, 403)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test copy-source conditional on put obj')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_put_obj_copy_source():
bucket_source = _create_keys(keys=['public/foo', 'public/bar', 'private/foo'])
src_resource = _make_arn_resource("{}/{}".format(bucket_source.name, "*"))
# lets make the source objects public for now
policy_document = make_json_policy("s3:GetObject",
src_resource
)
bucket_source.set_policy(policy_document)
bucket = get_new_bucket()
tag_conditional = {"StringLike": {
"s3:x-amz-copy-source" : bucket_source.name + "/public/*"
}}
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:PutObject",
resource,
conditions=tag_conditional)
bucket.set_policy(policy_document)
new_conn = _get_alt_connection()
alt_bucket = new_conn.get_bucket(bucket.name, validate=False)
key = alt_bucket.copy_key('new_foo', bucket_source.name, 'public/foo')
# This is possible because we are still the owner, see the grants with
# policy on how to do this right
eq(key.get_contents_as_string(), 'public/foo')
key = alt_bucket.copy_key('new_foo2', bucket_source.name, 'public/bar')
eq(key.get_contents_as_string(), 'public/bar')
check_access_denied(alt_bucket.copy_key,'new_foo2', bucket_source.name, 'private/foo')
@attr(resource='object')
@attr(method='put')
@attr(operation='Test copy-source conditional on put obj')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_put_obj_copy_source_meta():
bucket_source = _create_keys(keys=['public/foo', 'public/bar'])
src_resource = _make_arn_resource("{}/{}".format(bucket_source.name, "*"))
# lets make the source objects public for now
policy_document = make_json_policy("s3:GetObject",
src_resource
)
bucket_source.set_policy(policy_document)
bucket = get_new_bucket()
tag_conditional = {"StringEquals": {
"s3:x-amz-metadata-directive" : "COPY"
}}
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:PutObject",
resource,
conditions=tag_conditional)
bucket.set_policy(policy_document)
new_conn = _get_alt_connection()
alt_bucket = new_conn.get_bucket(bucket.name, validate=False)
key = alt_bucket.copy_key('new_foo', bucket_source.name, 'public/foo', headers={"x-metadata-directive" : "COPY"})
# This is possible because we are still the owner, see the grants with
# policy on how to do this right
eq(key.get_contents_as_string(), 'public/foo')
check_access_denied(alt_bucket.copy_key, 'new_foo2', bucket_source.name, 'public/bar', metadata={"foo" : "bar"})
@attr(resource='object')
@attr(method='put')
@attr(operation='Test put obj with canned-acl not to be public')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_put_obj_acl():
bucket = get_new_bucket()
# An allow conditional will require atleast the presence of an x-amz-acl
# attribute a Deny conditional would negate any requests that try to set a
# public-read/write acl
conditional = {"StringLike": {
"s3:x-amz-acl" : "public*"
}}
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
s1 = Statement("s3:PutObject",resource)
s2 = Statement("s3:PutObject", resource, effect="Deny", condition=conditional)
policy_document = p.add_statement(s1).add_statement(s2).to_json()
bucket.set_policy(policy_document)
new_conn = _get_alt_connection()
#alt_bucket = new_conn.get_bucket(bucket.name, validate=False)
key1 ='private-key'
#obj1 = alt_bucket.new_key(key1)
# if we want to be really pedantic, we should check that this doesn't raise
# and mark a failure, however if this does raise nosetests would mark this
# as an ERROR anyway
#obj1.set_contents_from_string(key1)
headers = {"x-amz-acl" : "private"}
res = new_conn.make_request('PUT', bucket.name, key1, data=key1)
eq(res.status, 200)
key2 = 'public-key'
headers = {"x-amz-acl":"public-read"}
# so there is no way to create a key and set canned acl in the same request in boto2 :(
res = new_conn.make_request('PUT', bucket.name, key2, headers=headers, data=key2)
eq(res.status, 403)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test put obj with amz-grant back to bucket-owner')
@attr(assertion='success')
@attr('bucket-policy')
def test_bucket_policy_put_obj_grant():
bucket1 = get_new_bucket()
bucket2 = get_new_bucket()
# In normal cases a key owner would be the uploader of a key in first case
# we explicitly require that the bucket owner is granted full control over
# the object uploaded by any user, the second bucket is where no such
# policy is enforced meaning that the uploader still retains ownership
owner_id_str = "id=" + config.main.user_id
grantee_id_str = "id=" + config.alt.user_id
s3_conditional = {"StringEquals": {
"s3:x-amz-grant-full-control" : owner_id_str
}}
resource = _make_arn_resource("{}/{}".format(bucket1.name, "*"))
policy_document = make_json_policy("s3:PutObject",
resource,
conditions=s3_conditional)
resource = _make_arn_resource("{}/{}".format(bucket2.name, "*"))
policy_document2 = make_json_policy("s3:PutObject", resource)
bucket1.set_policy(policy_document)
bucket2.set_policy(policy_document2)
new_conn = _get_alt_connection()
key1 = 'key1'
headers = {"x-amz-grant-full-control": owner_id_str }
res = new_conn.make_request('PUT', bucket1.name, key1, headers=headers, data=key1)
eq(res.status, 200)
key2 = 'key2'
res = new_conn.make_request('PUT', bucket2.name, key2, data=key2)
eq(res.status, 200)
acl1 = bucket1.get_acl(key_name=key1)
# user 1 is trying to get acl for the object from user2 where ownership
# wasn't transferred
check_access_denied(bucket2.get_acl, key_name=key2)
acl2 = new_conn.get_bucket(bucket2.name, validate=False).get_acl(key_name=key2)
# With the full control grant, the owner of the object is the granted
# original bucket owner
eq(utils.get_grantee(acl1, "FULL_CONTROL"), config.main.user_id)
# Normal case without any restrictions, owner is the uploader
eq(utils.get_grantee(acl2, "FULL_CONTROL"), config.alt.user_id)
@attr(resource='object')
@attr(method='put')
@attr(operation='Deny put obj requests without encryption')
@attr(assertion='success')
@attr('encryption')
@attr('bucket-policy')
def test_bucket_policy_put_obj_enc():
bucket = get_new_bucket()
deny_incorrect_algo = {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption": "true"
}
}
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s1).add_statement(s2).to_json()
bucket.set_policy(policy_document)
key1_str ='testobj'
key1 = bucket.new_key(key1_str)
check_access_denied(key1.set_contents_from_string, key1_str)
sse_client_headers = {
'x-amz-server-side-encryption' : 'AES256',
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
key1.set_contents_from_string(key1_str, headers=sse_client_headers)
@attr(resource='object')
@attr(method='put')
@attr(operation='put obj with RequestObjectTag')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_put_obj_request_obj_tag():
bucket = get_new_bucket()
tag_conditional = {"StringEquals": {
"s3:RequestObjectTag/security" : "public"
}}
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
policy_document = p.add_statement(s1).to_json()
bucket.set_policy(policy_document)
new_conn = _get_alt_connection()
bucket1 = new_conn.get_bucket(bucket.name, validate=False)
key1_str ='testobj'
key1 = bucket1.new_key(key1_str)
check_access_denied(key1.set_contents_from_string, key1_str)
headers = {"x-amz-tagging" : "security=public"}
key1.set_contents_from_string(key1_str, headers=headers)
@attr(resource='object')
@attr(method='get')
@attr(operation='Test ExistingObjectTag conditional on get object acl')
@attr(assertion='success')
@attr('tagging')
@attr('bucket-policy')
def test_bucket_policy_get_obj_acl_existing_tag():
bucket = _create_keys(keys=['publictag','privatetag','invalidtag'])
tag_conditional = {"StringEquals": {
"s3:ExistingObjectTag/security" : "public"
}}
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:GetObjectAcl",
resource,
conditions=tag_conditional)
bucket.set_policy(policy_document)
input_tagset = S3TestTagSet()
input_tagset.add_tag('security','public')
input_tagset.add_tag('foo','bar')
input_tagset2 = S3TestTagSet()
input_tagset2.add_tag('security','private')
input_tagset3 = S3TestTagSet()
input_tagset3.add_tag('security1','public')
res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml())
eq(res.status, 200)
res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml())
eq(res.status, 200)
res = _put_obj_tags(bucket, 'invalidtag', input_tagset3.to_xml())
eq(res.status, 200)
new_conn = _get_alt_connection()
res = new_conn.make_request("GET",bucket.name, 'publictag', query_args='acl')
eq(res.status, 200)
# A get object itself should fail since we allowed only GetObjectTagging
res = new_conn.make_request("GET",bucket.name, 'publictag')
eq(res.status, 403)
res = new_conn.make_request("GET",bucket.name, 'privatetag', query_args='tagging')
eq(res.status, 403)
res = new_conn.make_request("GET",bucket.name, 'invalidtag', query_args='tagging')
eq(res.status, 403)

View file

@ -52,3 +52,12 @@ def region_sync_meta(targets, region):
if conf.sync_meta_wait: if conf.sync_meta_wait:
time.sleep(conf.sync_meta_wait) time.sleep(conf.sync_meta_wait)
def get_grantee(policy, permission):
'''
Given an object/bucket policy, extract the grantee with the required permission
'''
for g in policy.acl.grants:
if g.permission == permission:
return g.id