forked from TrueCloudLab/s3-tests
Merge pull request #163 from pritha-srivastava/wip-rgw-bucket-policy-tests
rgw: Adding tests for Bucket Policy. Reviewed-by: Adam C. Emerson <aemerson@redhat.com>
This commit is contained in:
commit
ed701f0c65
1 changed files with 163 additions and 0 deletions
|
@ -8478,3 +8478,166 @@ def test_sse_kms_barb_transfer_13b():
|
|||
if 'kms_keyid' not in config['main']:
|
||||
raise SkipTest
|
||||
_test_sse_kms_customer_write(13, key_id = config['main']['kms_keyid'])
|
||||
|
||||
@attr(resource='bucket')
|
||||
@attr(method='get')
|
||||
@attr(operation='Test Bucket Policy')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('bucket-policy')
|
||||
def test_bucket_policy():
|
||||
bucket = get_new_bucket()
|
||||
key = bucket.new_key('asdf')
|
||||
key.set_contents_from_string('asdf')
|
||||
l = bucket.list()
|
||||
resource1 = "arn:aws:s3:::" + bucket.name
|
||||
resource2 = "arn:aws:s3:::" + bucket.name + "/*"
|
||||
policy_document = json.dumps(
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": [
|
||||
"{}".format(resource1),
|
||||
"{}".format(resource2)
|
||||
]
|
||||
}]
|
||||
})
|
||||
bucket.set_policy(policy_document)
|
||||
|
||||
new_conn = boto.s3.connection.S3Connection(
|
||||
aws_access_key_id=s3['alt'].aws_access_key_id,
|
||||
aws_secret_access_key=s3['alt'].aws_secret_access_key,
|
||||
is_secure=s3['alt'].is_secure,
|
||||
port=s3['alt'].port,
|
||||
host=s3['alt'].host,
|
||||
calling_format=s3['alt'].calling_format,
|
||||
)
|
||||
b = new_conn.get_bucket(bucket.name)
|
||||
b.get_all_keys()
|
||||
|
||||
@attr(resource='bucket')
|
||||
@attr(method='get')
|
||||
@attr(operation='Test Bucket Policy and ACL')
|
||||
@attr(assertion='fails')
|
||||
@attr('bucket-policy')
|
||||
def test_bucket_policy_acl():
|
||||
bucket = get_new_bucket()
|
||||
key = bucket.new_key('asdf')
|
||||
key.set_contents_from_string('asdf')
|
||||
l = bucket.list()
|
||||
resource1 = "arn:aws:s3:::" + bucket.name
|
||||
resource2 = "arn:aws:s3:::" + bucket.name + "/*"
|
||||
policy_document = json.dumps(
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Deny",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": [
|
||||
"{}".format(resource1),
|
||||
"{}".format(resource2)
|
||||
]
|
||||
}]
|
||||
})
|
||||
bucket.set_canned_acl('authenticated-read')
|
||||
bucket.set_policy(policy_document)
|
||||
|
||||
new_conn = boto.s3.connection.S3Connection(
|
||||
aws_access_key_id=s3['alt'].aws_access_key_id,
|
||||
aws_secret_access_key=s3['alt'].aws_secret_access_key,
|
||||
is_secure=s3['alt'].is_secure,
|
||||
port=s3['alt'].port,
|
||||
host=s3['alt'].host,
|
||||
calling_format=s3['alt'].calling_format,
|
||||
)
|
||||
e = assert_raises(boto.exception.S3ResponseError, new_conn.get_bucket, bucket.name)
|
||||
eq(e.status, 403)
|
||||
eq(e.reason, 'Forbidden')
|
||||
eq(e.error_code, 'AccessDenied')
|
||||
|
||||
@attr(resource='bucket')
|
||||
@attr(method='get')
|
||||
@attr(operation='Test Bucket Policy for a user belonging to a different tenant')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('bucket-policy')
|
||||
def test_bucket_policy_different_tenant():
|
||||
bucket = get_new_bucket()
|
||||
key = bucket.new_key('asdf')
|
||||
key.set_contents_from_string('asdf')
|
||||
l = bucket.list()
|
||||
resource1 = "arn:aws:s3::*:" + bucket.name
|
||||
resource2 = "arn:aws:s3::*:" + bucket.name + "/*"
|
||||
policy_document = json.dumps(
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": [
|
||||
"{}".format(resource1),
|
||||
"{}".format(resource2)
|
||||
]
|
||||
}]
|
||||
})
|
||||
bucket.set_policy(policy_document)
|
||||
|
||||
new_conn = boto.s3.connection.S3Connection(
|
||||
aws_access_key_id=s3['tenant'].aws_access_key_id,
|
||||
aws_secret_access_key=s3['tenant'].aws_secret_access_key,
|
||||
is_secure=s3['tenant'].is_secure,
|
||||
port=s3['tenant'].port,
|
||||
host=s3['tenant'].host,
|
||||
calling_format=s3['tenant'].calling_format,
|
||||
)
|
||||
bucket_name = ":" + bucket.name
|
||||
b = new_conn.get_bucket(bucket_name)
|
||||
b.get_all_keys()
|
||||
|
||||
@attr(resource='bucket')
|
||||
@attr(method='get')
|
||||
@attr(operation='Test Bucket Policy on another bucket')
|
||||
@attr(assertion='succeeds')
|
||||
@attr('bucket-policy')
|
||||
def test_bucket_policy_another_bucket():
|
||||
bucket1 = get_new_bucket()
|
||||
bucket2 = get_new_bucket()
|
||||
key1 = bucket1.new_key('asdf')
|
||||
key1.set_contents_from_string('asdf')
|
||||
key2 = bucket2.new_key('abcd')
|
||||
key2.set_contents_from_string('abcd')
|
||||
l = bucket1.list()
|
||||
policy_document = json.dumps(
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [{
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": [
|
||||
"arn:aws:s3:::*",
|
||||
"arn:aws:s3:::*/*"
|
||||
]
|
||||
}]
|
||||
})
|
||||
bucket1.set_policy(policy_document)
|
||||
|
||||
json_policy = bucket1.get_policy()
|
||||
bucket2.set_policy(json_policy)
|
||||
|
||||
new_conn = boto.s3.connection.S3Connection(
|
||||
aws_access_key_id=s3['alt'].aws_access_key_id,
|
||||
aws_secret_access_key=s3['alt'].aws_secret_access_key,
|
||||
is_secure=s3['alt'].is_secure,
|
||||
port=s3['alt'].port,
|
||||
host=s3['alt'].host,
|
||||
calling_format=s3['alt'].calling_format,
|
||||
)
|
||||
b1 = new_conn.get_bucket(bucket1.name)
|
||||
b1.get_all_keys()
|
||||
|
||||
b2 = new_conn.get_bucket(bucket2.name)
|
||||
b2.get_all_keys()
|
||||
|
|
Loading…
Reference in a new issue