policy: add a new policy class to make creation of complex policies

Since policies can have allow/deny rules etc

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
This commit is contained in:
Abhishek Lekshmanan 2017-10-19 16:24:30 +02:00
parent ef827b745e
commit ecea466666
2 changed files with 59 additions and 9 deletions

View file

@ -0,0 +1,38 @@
import json
class Statement(object):
def __init__(self, action, resource, principal = {"AWS" : "*"}, effect= "Allow", condition = None):
self.principal = principal
self.action = action
self.resource = resource
self.condition = condition
self.effect = effect
def to_dict(self):
d = { "Action" : self.action,
"Principal" : self.principal,
"Effect" : self.effect,
"Resource" : self.resource
}
if self.condition is not None:
d["Condition"] = self.condition
return d
class Policy(object):
def __init__(self):
self.statements = []
def add_statement(self, s):
self.statements.append(s)
return self
def to_json(self):
policy_dict = {
"Version" : "2012-10-17",
"Statement":
[s.to_dict() for s in self.statements]
}
return json.dumps(policy_dict)

View file

@ -42,6 +42,9 @@ from boto.s3.tagging import TagSet
from .utils import assert_raises
from .utils import generate_random
from .utils import region_sync_meta
from .policy import Policy, Statement
import AnonymousAuth
from email.header import decode_header
@ -9876,27 +9879,36 @@ def test_bucket_policy_put_obj_acl():
bucket = get_new_bucket()
tag_conditional = {"StringNotLike": {
# An allow conditional will require atleast the presence of an x-amz-acl
# attribute a Deny conditional would negate any requests that try to set a
# public-read/write acl
conditional = {"StringLike": {
"s3:x-amz-acl" : "public*"
}}
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
policy_document = make_json_policy("s3:PutObject",
resource,
conditions=tag_conditional)
s1 = Statement("s3:PutObject",resource)
s2 = Statement("s3:PutOBject", resource, effect="Deny", condition=conditional)
policy_document = p.add_statement(s1).add_statement(s2).to_json()
bucket.set_policy(policy_document)
new_conn = _get_alt_connection()
alt_bucket = new_conn.get_bucket(bucket.name, validate=False)
#alt_bucket = new_conn.get_bucket(bucket.name, validate=False)
key1 ='private-key'
obj1 = bucket.new_key(key1)
#obj1 = alt_bucket.new_key(key1)
# if we want to be really pedantic, we should check that this doesn't raise
# and mark failure, however if this does raise nose will mark it as an
# ERROR anyway
obj1.set_contents_from_string(key1)
# and mark a failure, however if this does raise nosetests would mark this
# as an ERROR anyway
#obj1.set_contents_from_string(key1)
headers = {"x-amz-acl" : "private"}
res = new_conn.make_request('PUT', bucket.name, key1, data=key1)
eq(res.status, 200)
key2 = 'public-key'
headers = {"x-amz-acl":"public-read"}