diff --git a/s3tests/functional/policy.py b/s3tests/functional/policy.py
new file mode 100644
index 0000000..aae5454
--- /dev/null
+++ b/s3tests/functional/policy.py
@@ -0,0 +1,46 @@
+import json
+
+class Statement(object):
+    def __init__(self, action, resource, principal={"AWS": "*"}, effect="Allow", condition=None):
+        self.principal = principal
+        self.action = action
+        self.resource = resource
+        self.condition = condition
+        self.effect = effect
+
+    def to_dict(self):
+        d = { "Action" : self.action,
+              "Principal" : self.principal,
+              "Effect" : self.effect,
+              "Resource" : self.resource
+        }
+
+        if self.condition is not None:
+            d["Condition"] = self.condition
+
+        return d
+
+class Policy(object):
+    def __init__(self):
+        self.statements = []
+
+    def add_statement(self, s):
+        self.statements.append(s)
+        return self
+
+    def to_json(self):
+        policy_dict = {
+            "Version" : "2012-10-17",
+            "Statement":
+            [s.to_dict() for s in self.statements]
+        }
+
+        return json.dumps(policy_dict)
+
+def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
+    """
+    Helper function to make single statement policies
+    """
+    s = Statement(action, resource, principal, condition=conditions)
+    p = Policy()
+    return p.add_statement(s).to_json()
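For reference, a single-statement policy built with these helpers serializes to the JSON below (a sketch; the bucket ARN is illustrative, and dict key order may vary under Python 2):

    >>> from s3tests.functional.policy import make_json_policy
    >>> print(make_json_policy("s3:ListBucket", "arn:aws:s3:::mybucket"))
    {"Version": "2012-10-17", "Statement": [{"Action": "s3:ListBucket", "Principal": {"AWS": "*"}, "Effect": "Allow", "Resource": "arn:aws:s3:::mybucket"}]}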
diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py
index 45a70c3..12b065a 100644
--- a/s3tests/functional/test_s3.py
+++ b/s3tests/functional/test_s3.py
@@ -39,9 +39,13 @@ from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 from boto.s3.tagging import TagSet
 
+import utils
 from .utils import assert_raises
 from .utils import generate_random
 from .utils import region_sync_meta
+
+from .policy import Policy, Statement, make_json_policy
+
 import AnonymousAuth
 
 from email.header import decode_header
@@ -8737,6 +8741,9 @@ def test_sse_kms_read_declare():
     e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string, headers=sse_kms_client_headers)
     eq(e.status, 400)
 
+def _make_arn_resource(path="*"):
+    return "arn:aws:s3:::{}".format(path)
+
 @attr(resource='bucket')
 @attr(method='get')
 @attr(operation='Test Bucket Policy')
@@ -8903,7 +8910,7 @@ def test_bucket_policy_another_bucket():
 @attr(resource='bucket')
 @attr(method='put')
 @attr(operation='Test put condition operator end with ifExists')
-@attr('policy')
+@attr('bucket-policy')
 def test_bucket_policy_set_condition_operator_end_with_IfExists():
     bucket = _create_keys(keys=['foo'])
     policy = '''{
@@ -8936,6 +8943,194 @@
     eq(res.status, 403)
 
 
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='Test listbucket with prefix')
+@attr('bucket-policy')
+def test_bucket_policy_list_bucket_with_prefix():
+    bucket = _create_keys(keys=['foo','folder/foo1','folder/foo2','folder/foo3','foo2'])
+    conditional = {"StringEquals": {
+        "s3:prefix" : "folder"
+    }}
+
+    resource = _make_arn_resource(bucket.name)
+    p = Policy()
+    s = Statement("s3:ListBucket", resource, condition=conditional)
+    policy_document = p.add_statement(s).to_json()
+
+    eq(bucket.set_policy(policy_document), True)
+
+    new_conn = _get_alt_connection()
+
+    # boto2 cannot give us a bucket object without doing a GET on the
+    # bucket, so issue the raw requests instead
+    res = new_conn.make_request('GET', bucket.name, query_args = 'prefix=folder')
+    eq(res.status, 200)
+    ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
+    keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
+    eq(len(keys), 3)
+
+    res = new_conn.make_request('GET', bucket.name, query_args = 'prefix=somethingelse')
+    eq(res.status, 403)
+
+    res = new_conn.make_request('GET', bucket.name)
+    eq(res.status, 403)
+
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='Test listbucket with maxkeys')
+@attr('bucket-policy')
+def test_bucket_policy_list_bucket_with_maxkeys():
+    bucket = _create_keys(keys=['key'+str(i) for i in range(8)])
+
+    list_conditional = {"NumericLessThanEquals": {
+        "s3:max-keys" : "6"
+    }}
+
+    resource = _make_arn_resource(bucket.name)
+    policy_document = make_json_policy("s3:ListBucket",
+                                       resource,
+                                       conditions=list_conditional)
+
+    eq(bucket.set_policy(policy_document), True)
+
+    new_conn = _get_alt_connection()
+
+    res = new_conn.make_request('GET', bucket.name, query_args = 'max-keys=6')
+    eq(res.status, 200)
+    ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
+    keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
+    eq(len(keys), 6)
+
+    res = new_conn.make_request('GET', bucket.name, query_args = 'max-keys=5')
+    eq(res.status, 200)
+    ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
+    keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
+    eq(len(keys), 5)
+
+    res = new_conn.make_request('GET', bucket.name, query_args = 'max-keys=7')
+    eq(res.status, 403)
+
+    res = new_conn.make_request('GET', bucket.name)
+    eq(res.status, 403)
+
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='Test listbucket with delimiter')
+@attr('bucket-policy')
+def test_bucket_policy_list_bucket_with_delimiter():
+    bucket = _create_keys(keys=['key/'+str(i) for i in range(5)])
+
+    list_conditional = {"StringEquals": {
+        "s3:delimiter" : "/"
+    }}
+
+    resource = _make_arn_resource(bucket.name)
+    policy_document = make_json_policy("s3:ListBucket",
+                                       resource,
+                                       conditions=list_conditional)
+    eq(bucket.set_policy(policy_document), True)
+
+    new_conn = _get_alt_connection()
+
+    # with the delimiter set, keys containing it are rolled up into
+    # CommonPrefixes, so no Key elements come back in the listing
+    res = new_conn.make_request('GET', bucket.name, query_args = 'delimiter=/')
+    eq(res.status, 200)
+    ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
+    keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
+    eq(len(keys), 0)
+
+    # now create a second bucket whose keys do not contain the delimiter
+    bucket2 = _create_keys(keys=['key'+str(i) for i in range(5)])
+    resource = _make_arn_resource(bucket2.name)
+    policy2 = make_json_policy("s3:ListBucket",
+                               resource,
+                               conditions=list_conditional)
+    eq(bucket2.set_policy(policy2), True)
+    res = new_conn.make_request('GET', bucket2.name, query_args = 'delimiter=/')
+    eq(res.status, 200)
+    ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
+    keys = ET.fromstring(res.read()).findall('.//aws:Key', ns)
+    eq(len(keys), 5)
+
+    # a listing without the delimiter does not satisfy the condition
+    res = new_conn.make_request('GET', bucket.name)
+    eq(res.status, 403)
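These tests call an `_get_alt_connection` helper that is not part of this patch. Presumably it clones the alt user's connection parameters from the global `s3` targets dict, roughly along these lines (a sketch; the real helper lives elsewhere in test_s3.py and may differ):

    import boto.s3.connection

    def _get_alt_connection():
        # a fresh boto2 connection authenticated as the second ("alt") user
        return boto.s3.connection.S3Connection(
            aws_access_key_id=s3['alt'].aws_access_key_id,
            aws_secret_access_key=s3['alt'].aws_secret_access_key,
            is_secure=s3['alt'].is_secure,
            port=s3['alt'].port,
            host=s3['alt'].host,
            calling_format=s3['alt'].calling_format,
            )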
{"x-amz-acl":"bucket-owner-full-control"} + res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers) + + eq(res.status, 200) + + # now lets upload some keys again + headers = {"x-amz-acl":"public-read"} + res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers) + + eq(res.status, 403) + + + +@attr(resource='bucket') +@attr(method='put') +@attr(operation='Test put bucket acl with acl grant headers') +@attr('bucket-policy') +def test_bucket_policy_list_put_bucket_acl_grants(): + bucket = _create_keys(keys=['key/'+str(i) for i in range(5)]) + + + owner_id_str = "id="+config.main.user_id + policy_conditional = {"StringEquals": { + "s3:x-amz-grant-full-control" : owner_id_str + }} + + resource = _make_arn_resource(bucket.name) + policy_document = make_json_policy("s3:PutBucketAcl",resource, + conditions=policy_conditional) + eq(bucket.set_policy(policy_document), True) + + new_conn = _get_alt_connection() + + headers = {"x-amz-grant-full-control": owner_id_str} + res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers) + + eq(res.status, 200) + + # user trying to elevate himself as the owner + headers = {"x-amz-grant-full-control": "id=" + config.alt.user_id } + res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers) + + eq(res.status, 403) + + headers = {"x-amz-grant-read": owner_id_str} + res = new_conn.make_request('PUT', bucket.name, query_args = 'acl', headers=headers) + + eq(res.status, 403) + + + def _tags_from_dict(d): tag_list = [] for k,v in d.items(): @@ -9273,28 +9468,13 @@ def test_put_obj_with_tags(): res_tagset = _get_obj_tags(bucket, key.name) eq(input_tagset.to_dict(), res_tagset.to_dict()) -def _make_arn_resource(path="*"): - return "arn:aws:s3:::{}".format(path) - -def make_json_policy(action, resource, principal={"AWS": "*"}): - return json.dumps( - { - "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": principal, - "Action": action, - "Resource": [ - resource - ] - }] - }) @attr(resource='object') @attr(method='get') @attr(operation='Test GetObjTagging public read') @attr(assertion='success') @attr('tagging') +@attr('bucket-policy') def test_get_tags_acl_public(): bucket, key = _create_key_with_random_content('testputtagsacl') @@ -9315,6 +9495,7 @@ def test_get_tags_acl_public(): @attr(operation='Test PutObjTagging public wrote') @attr(assertion='success') @attr('tagging') +@attr('bucket-policy') def test_put_tags_acl_public(): bucket, key = _create_key_with_random_content('testputtagsacl') @@ -9335,6 +9516,7 @@ def test_put_tags_acl_public(): @attr(operation='Test DeleteObjTagging public') @attr(assertion='success') @attr('tagging') +@attr('bucket-policy') def test_delete_tags_obj_public(): bucket, key = _create_key_with_random_content('testputtagsacl') @@ -9417,3 +9599,493 @@ def test_versioning_bucket_multipart_upload_return_version_id(): (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'}) res = upload.complete_upload() assert_is_none(res.version_id) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test ExistingObjectTag conditional on get object') +@attr(assertion='success') +@attr('tagging') +@attr('bucket-policy') +def test_bucket_policy_get_obj_existing_tag(): + + bucket = _create_keys(keys=['publictag','privatetag','invalidtag']) + + + tag_conditional = {"StringEquals": { + "s3:ExistingObjectTag/security" : "public" + }} + + resource = 
_make_arn_resource("{}/{}".format(bucket.name, "*")) + policy_document = make_json_policy("s3:GetObject", + resource, + conditions=tag_conditional) + + bucket.set_policy(policy_document) + input_tagset = S3TestTagSet() + input_tagset.add_tag('security','public') + input_tagset.add_tag('foo','bar') + + input_tagset2 = S3TestTagSet() + input_tagset2.add_tag('security','private') + + input_tagset3 = S3TestTagSet() + input_tagset3.add_tag('security1','public') + + res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml()) + eq(res.status, 200) + + res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml()) + eq(res.status, 200) + + res = _put_obj_tags(bucket, 'invalidtag', input_tagset3.to_xml()) + eq(res.status, 200) + + new_conn = _get_alt_connection() + res = new_conn.make_request("GET",bucket.name, 'publictag') + eq(res.status, 200) + + res = new_conn.make_request("GET",bucket.name, 'privatetag') + eq(res.status, 403) + + res = new_conn.make_request("GET",bucket.name, 'invalidtag') + eq(res.status, 403) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test ExistingObjectTag conditional on get object tagging') +@attr(assertion='success') +@attr('tagging') +@attr('bucket-policy') +def test_bucket_policy_get_obj_tagging_existing_tag(): + + bucket = _create_keys(keys=['publictag','privatetag','invalidtag']) + + + tag_conditional = {"StringEquals": { + "s3:ExistingObjectTag/security" : "public" + }} + + resource = _make_arn_resource("{}/{}".format(bucket.name, "*")) + policy_document = make_json_policy("s3:GetObjectTagging", + resource, + conditions=tag_conditional) + + bucket.set_policy(policy_document) + input_tagset = S3TestTagSet() + input_tagset.add_tag('security','public') + input_tagset.add_tag('foo','bar') + + input_tagset2 = S3TestTagSet() + input_tagset2.add_tag('security','private') + + input_tagset3 = S3TestTagSet() + input_tagset3.add_tag('security1','public') + + res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml()) + eq(res.status, 200) + + res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml()) + eq(res.status, 200) + + res = _put_obj_tags(bucket, 'invalidtag', input_tagset3.to_xml()) + eq(res.status, 200) + + new_conn = _get_alt_connection() + res = new_conn.make_request("GET",bucket.name, 'publictag', query_args='tagging') + eq(res.status, 200) + + # A get object itself should fail since we allowed only GetObjectTagging + res = new_conn.make_request("GET",bucket.name, 'publictag') + eq(res.status, 403) + + res = new_conn.make_request("GET",bucket.name, 'privatetag', query_args='tagging') + eq(res.status, 403) + + res = new_conn.make_request("GET",bucket.name, 'invalidtag', query_args='tagging') + eq(res.status, 403) + + + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test ExistingObjectTag conditional on put object tagging') +@attr(assertion='success') +@attr('tagging') +@attr('bucket-policy') +def test_bucket_policy_put_obj_tagging_existing_tag(): + + bucket = _create_keys(keys=['publictag','privatetag','invalidtag']) + + + tag_conditional = {"StringEquals": { + "s3:ExistingObjectTag/security" : "public" + }} + + resource = _make_arn_resource("{}/{}".format(bucket.name, "*")) + policy_document = make_json_policy("s3:PutObjectTagging", + resource, + conditions=tag_conditional) + + bucket.set_policy(policy_document) + input_tagset = S3TestTagSet() + input_tagset.add_tag('security','public') + input_tagset.add_tag('foo','bar') + + res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml()) + eq(res.status, 
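With a condition attached, the document that make_json_policy generates gains a Condition block. The policy used in the test above comes out roughly as follows (<bucket> stands in for the real bucket name; key order may vary):

    {"Version": "2012-10-17", "Statement": [{"Action": "s3:GetObjectTagging",
     "Principal": {"AWS": "*"}, "Effect": "Allow",
     "Resource": "arn:aws:s3:::<bucket>/*",
     "Condition": {"StringEquals": {"s3:ExistingObjectTag/security": "public"}}}]}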
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test ExistingObjectTag conditional on put object tagging')
+@attr(assertion='success')
+@attr('tagging')
+@attr('bucket-policy')
+def test_bucket_policy_put_obj_tagging_existing_tag():
+
+    bucket = _create_keys(keys=['publictag','privatetag','invalidtag'])
+
+    tag_conditional = {"StringEquals": {
+        "s3:ExistingObjectTag/security" : "public"
+    }}
+
+    resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
+    policy_document = make_json_policy("s3:PutObjectTagging",
+                                       resource,
+                                       conditions=tag_conditional)
+
+    bucket.set_policy(policy_document)
+    input_tagset = S3TestTagSet()
+    input_tagset.add_tag('security','public')
+    input_tagset.add_tag('foo','bar')
+
+    res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml())
+    eq(res.status, 200)
+
+    input_tagset2 = S3TestTagSet()
+    input_tagset2.add_tag('security','private')
+
+    res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml())
+    eq(res.status, 200)
+
+    new_conn = _get_alt_connection()
+    # PUT requests with object tagging are a bit weird: PutObjectTagging
+    # replaces the entire tag set, and the condition is evaluated against
+    # the object's *existing* tags, so if a request drops the tag the policy
+    # matches on, subsequent put requests will fail
+    testtagset1 = S3TestTagSet()
+    testtagset1.add_tag('security','public')
+    testtagset1.add_tag('foo','bar')
+    res = _put_obj_tags_conn(new_conn, bucket.name, 'publictag', testtagset1.to_xml())
+    eq(res.status, 200)
+
+    res = _put_obj_tags_conn(new_conn, bucket.name, 'privatetag', testtagset1.to_xml())
+    eq(res.status, 403)
+
+    testtagset2 = S3TestTagSet()
+    testtagset2.add_tag('security','private')
+    res = _put_obj_tags_conn(new_conn, bucket.name, 'publictag', testtagset2.to_xml())
+    eq(res.status, 200)
+
+    # now try putting the original tags again; this fails because the
+    # previous request replaced security=public with security=private
+    res = _put_obj_tags_conn(new_conn, bucket.name, 'publictag', testtagset1.to_xml())
+    eq(res.status, 403)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test copy-source conditional on put obj')
+@attr(assertion='success')
+@attr('tagging')
+@attr('bucket-policy')
+def test_bucket_policy_put_obj_copy_source():
+
+    bucket_source = _create_keys(keys=['public/foo', 'public/bar', 'private/foo'])
+    src_resource = _make_arn_resource("{}/{}".format(bucket_source.name, "*"))
+    # let's make the source objects public for now
+    policy_document = make_json_policy("s3:GetObject",
+                                       src_resource)
+
+    bucket_source.set_policy(policy_document)
+
+    bucket = get_new_bucket()
+
+    tag_conditional = {"StringLike": {
+        "s3:x-amz-copy-source" : bucket_source.name + "/public/*"
+    }}
+
+    resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
+    policy_document = make_json_policy("s3:PutObject",
+                                       resource,
+                                       conditions=tag_conditional)
+
+    bucket.set_policy(policy_document)
+
+    new_conn = _get_alt_connection()
+    alt_bucket = new_conn.get_bucket(bucket.name, validate=False)
+    key = alt_bucket.copy_key('new_foo', bucket_source.name, 'public/foo')
+    # reading the key back is possible because we are still the owner of the
+    # new object; see the grant tests below on how to transfer ownership
+    eq(key.get_contents_as_string(), 'public/foo')
+
+    key = alt_bucket.copy_key('new_foo2', bucket_source.name, 'public/bar')
+    eq(key.get_contents_as_string(), 'public/bar')
+
+    check_access_denied(alt_bucket.copy_key, 'new_foo2', bucket_source.name, 'private/foo')
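boto2's copy_key builds the copy header internally; for illustration, the equivalent raw request that the StringLike condition above matches against would look roughly like this (a sketch, not part of the patch):

    # PUT object-copy: x-amz-copy-source is the value the policy inspects
    res = new_conn.make_request(
        'PUT', bucket.name, 'new_foo',
        headers={'x-amz-copy-source': bucket_source.name + '/public/foo'})
    eq(res.status, 200)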
headers={"x-metadata-directive" : "COPY"}) + # This is possible because we are still the owner, see the grants with + # policy on how to do this right + eq(key.get_contents_as_string(), 'public/foo') + + check_access_denied(alt_bucket.copy_key, 'new_foo2', bucket_source.name, 'public/bar', metadata={"foo" : "bar"}) + + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test put obj with canned-acl not to be public') +@attr(assertion='success') +@attr('tagging') +@attr('bucket-policy') +def test_bucket_policy_put_obj_acl(): + + bucket = get_new_bucket() + + # An allow conditional will require atleast the presence of an x-amz-acl + # attribute a Deny conditional would negate any requests that try to set a + # public-read/write acl + conditional = {"StringLike": { + "s3:x-amz-acl" : "public*" + }} + + p = Policy() + resource = _make_arn_resource("{}/{}".format(bucket.name, "*")) + s1 = Statement("s3:PutObject",resource) + s2 = Statement("s3:PutObject", resource, effect="Deny", condition=conditional) + + policy_document = p.add_statement(s1).add_statement(s2).to_json() + + bucket.set_policy(policy_document) + + new_conn = _get_alt_connection() + #alt_bucket = new_conn.get_bucket(bucket.name, validate=False) + + key1 ='private-key' + #obj1 = alt_bucket.new_key(key1) + + # if we want to be really pedantic, we should check that this doesn't raise + # and mark a failure, however if this does raise nosetests would mark this + # as an ERROR anyway + #obj1.set_contents_from_string(key1) + headers = {"x-amz-acl" : "private"} + res = new_conn.make_request('PUT', bucket.name, key1, data=key1) + eq(res.status, 200) + + + key2 = 'public-key' + headers = {"x-amz-acl":"public-read"} + # so there is no way to create a key and set canned acl in the same request in boto2 :( + res = new_conn.make_request('PUT', bucket.name, key2, headers=headers, data=key2) + eq(res.status, 403) + + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test put obj with amz-grant back to bucket-owner') +@attr(assertion='success') +@attr('bucket-policy') +def test_bucket_policy_put_obj_grant(): + + bucket1 = get_new_bucket() + bucket2 = get_new_bucket() + + # In normal cases a key owner would be the uploader of a key in first case + # we explicitly require that the bucket owner is granted full control over + # the object uploaded by any user, the second bucket is where no such + # policy is enforced meaning that the uploader still retains ownership + + owner_id_str = "id=" + config.main.user_id + grantee_id_str = "id=" + config.alt.user_id + s3_conditional = {"StringEquals": { + "s3:x-amz-grant-full-control" : owner_id_str + }} + + resource = _make_arn_resource("{}/{}".format(bucket1.name, "*")) + policy_document = make_json_policy("s3:PutObject", + resource, + conditions=s3_conditional) + + resource = _make_arn_resource("{}/{}".format(bucket2.name, "*")) + policy_document2 = make_json_policy("s3:PutObject", resource) + + bucket1.set_policy(policy_document) + bucket2.set_policy(policy_document2) + + new_conn = _get_alt_connection() + + key1 = 'key1' + headers = {"x-amz-grant-full-control": owner_id_str } + res = new_conn.make_request('PUT', bucket1.name, key1, headers=headers, data=key1) + eq(res.status, 200) + + key2 = 'key2' + res = new_conn.make_request('PUT', bucket2.name, key2, data=key2) + eq(res.status, 200) + + acl1 = bucket1.get_acl(key_name=key1) + + # user 1 is trying to get acl for the object from user2 where ownership + # wasn't transferred + check_access_denied(bucket2.get_acl, 
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Deny put obj requests without encryption')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-policy')
+def test_bucket_policy_put_obj_enc():
+
+    bucket = get_new_bucket()
+
+    deny_incorrect_algo = {
+        "StringNotEquals": {
+            "s3:x-amz-server-side-encryption": "AES256"
+        }
+    }
+
+    deny_unencrypted_obj = {
+        "Null" : {
+            "s3:x-amz-server-side-encryption": "true"
+        }
+    }
+
+    p = Policy()
+    resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
+
+    s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+    s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+    policy_document = p.add_statement(s1).add_statement(s2).to_json()
+
+    bucket.set_policy(policy_document)
+
+    # without an encryption header, the "Null" Deny statement matches
+    key1_str = 'testobj'
+    key1 = bucket.new_key(key1_str)
+    check_access_denied(key1.set_contents_from_string, key1_str)
+
+    sse_client_headers = {
+        'x-amz-server-side-encryption' : 'AES256',
+        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
+        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
+    }
+
+    key1.set_contents_from_string(key1_str, headers=sse_client_headers)
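The "Null" operator is worth a note: it tests for the presence or absence of a condition key, so with "s3:x-amz-server-side-encryption": "true" the statement denies any PUT that omits the header entirely, while the StringNotEquals statement denies requests that send it with any value other than AES256. A fragment of the serialized statement (illustrative):

    {"Action": "s3:PutObject", "Principal": {"AWS": "*"}, "Effect": "Deny",
     "Resource": "arn:aws:s3:::<bucket>/*",
     "Condition": {"Null": {"s3:x-amz-server-side-encryption": "true"}}}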
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='put obj with RequestObjectTag')
+@attr(assertion='success')
+@attr('tagging')
+@attr('bucket-policy')
+def test_bucket_policy_put_obj_request_obj_tag():
+
+    bucket = get_new_bucket()
+
+    tag_conditional = {"StringEquals": {
+        "s3:RequestObjectTag/security" : "public"
+    }}
+
+    p = Policy()
+    resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
+
+    s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
+    policy_document = p.add_statement(s1).to_json()
+
+    bucket.set_policy(policy_document)
+
+    new_conn = _get_alt_connection()
+    bucket1 = new_conn.get_bucket(bucket.name, validate=False)
+    key1_str = 'testobj'
+    key1 = bucket1.new_key(key1_str)
+    # without the required request tag, the PUT is denied
+    check_access_denied(key1.set_contents_from_string, key1_str)
+
+    headers = {"x-amz-tagging" : "security=public"}
+    key1.set_contents_from_string(key1_str, headers=headers)
+
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test ExistingObjectTag conditional on get object acl')
+@attr(assertion='success')
+@attr('tagging')
+@attr('bucket-policy')
+def test_bucket_policy_get_obj_acl_existing_tag():
+
+    bucket = _create_keys(keys=['publictag','privatetag','invalidtag'])
+
+    tag_conditional = {"StringEquals": {
+        "s3:ExistingObjectTag/security" : "public"
+    }}
+
+    resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
+    policy_document = make_json_policy("s3:GetObjectAcl",
+                                       resource,
+                                       conditions=tag_conditional)
+
+    bucket.set_policy(policy_document)
+    input_tagset = S3TestTagSet()
+    input_tagset.add_tag('security','public')
+    input_tagset.add_tag('foo','bar')
+
+    input_tagset2 = S3TestTagSet()
+    input_tagset2.add_tag('security','private')
+
+    input_tagset3 = S3TestTagSet()
+    input_tagset3.add_tag('security1','public')
+
+    res = _put_obj_tags(bucket, 'publictag', input_tagset.to_xml())
+    eq(res.status, 200)
+
+    res = _put_obj_tags(bucket, 'privatetag', input_tagset2.to_xml())
+    eq(res.status, 200)
+
+    res = _put_obj_tags(bucket, 'invalidtag', input_tagset3.to_xml())
+    eq(res.status, 200)
+
+    new_conn = _get_alt_connection()
+    res = new_conn.make_request("GET", bucket.name, 'publictag', query_args='acl')
+    eq(res.status, 200)
+
+    # a plain GET on the object must fail since only GetObjectAcl is allowed
+    res = new_conn.make_request("GET", bucket.name, 'publictag')
+    eq(res.status, 403)
+
+    res = new_conn.make_request("GET", bucket.name, 'privatetag', query_args='acl')
+    eq(res.status, 403)
+
+    res = new_conn.make_request("GET", bucket.name, 'invalidtag', query_args='acl')
+    eq(res.status, 403)
diff --git a/s3tests/functional/utils.py b/s3tests/functional/utils.py
index cb507ce..24f7d87 100644
--- a/s3tests/functional/utils.py
+++ b/s3tests/functional/utils.py
@@ -52,3 +52,12 @@ def region_sync_meta(targets, region):
 
         if conf.sync_meta_wait:
             time.sleep(conf.sync_meta_wait)
+
+def get_grantee(policy, permission):
+    '''
+    Given an ACL policy (as returned by get_acl), return the id of the first grantee with the required permission, or None
+    '''
+
+    for g in policy.acl.grants:
+        if g.permission == permission:
+            return g.id
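A short usage sketch for the new helper, mirroring how test_bucket_policy_put_obj_grant consumes it (assumes a boto2 ACL policy object as returned by get_acl; the loop falls through to an implicit None when no grant matches):

    acl = bucket1.get_acl(key_name='key1')
    owner_id = utils.get_grantee(acl, "FULL_CONTROL")   # grantee id, or None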