diff --git a/s3tests/functional/policy.py b/s3tests/functional/policy.py new file mode 100644 index 0000000..910461a --- /dev/null +++ b/s3tests/functional/policy.py @@ -0,0 +1,38 @@ +import json + +class Statement(object): + def __init__(self, action, resource, principal = {"AWS" : "*"}, effect= "Allow", condition = None): + self.principal = principal + self.action = action + self.resource = resource + self.condition = condition + self.effect = effect + + def to_dict(self): + d = { "Action" : self.action, + "Principal" : self.principal, + "Effect" : self.effect, + "Resource" : self.resource + } + + if self.condition is not None: + d["Condition"] = self.condition + + return d + +class Policy(object): + def __init__(self): + self.statements = [] + + def add_statement(self, s): + self.statements.append(s) + return self + + def to_json(self): + policy_dict = { + "Version" : "2012-10-17", + "Statement": + [s.to_dict() for s in self.statements] + } + + return json.dumps(policy_dict) diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index f75c7a8..1b32535 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -42,6 +42,9 @@ from boto.s3.tagging import TagSet from .utils import assert_raises from .utils import generate_random from .utils import region_sync_meta + +from .policy import Policy, Statement + import AnonymousAuth from email.header import decode_header @@ -9876,27 +9879,36 @@ def test_bucket_policy_put_obj_acl(): bucket = get_new_bucket() - tag_conditional = {"StringNotLike": { + # An allow conditional will require atleast the presence of an x-amz-acl + # attribute a Deny conditional would negate any requests that try to set a + # public-read/write acl + conditional = {"StringLike": { "s3:x-amz-acl" : "public*" }} + p = Policy() resource = _make_arn_resource("{}/{}".format(bucket.name, "*")) - policy_document = make_json_policy("s3:PutObject", - resource, - conditions=tag_conditional) + s1 = Statement("s3:PutObject",resource) + s2 = Statement("s3:PutOBject", resource, effect="Deny", condition=conditional) + + policy_document = p.add_statement(s1).add_statement(s2).to_json() bucket.set_policy(policy_document) new_conn = _get_alt_connection() - alt_bucket = new_conn.get_bucket(bucket.name, validate=False) + #alt_bucket = new_conn.get_bucket(bucket.name, validate=False) key1 ='private-key' - obj1 = bucket.new_key(key1) + #obj1 = alt_bucket.new_key(key1) # if we want to be really pedantic, we should check that this doesn't raise - # and mark failure, however if this does raise nose will mark it as an - # ERROR anyway - obj1.set_contents_from_string(key1) + # and mark a failure, however if this does raise nosetests would mark this + # as an ERROR anyway + #obj1.set_contents_from_string(key1) + headers = {"x-amz-acl" : "private"} + res = new_conn.make_request('PUT', bucket.name, key1, data=key1) + eq(res.status, 200) + key2 = 'public-key' headers = {"x-amz-acl":"public-read"}