Merge pull request #601 from pritha-srivastava/wip-rgw-merge-store-tests

rgw/s3-tests: adding missing tests for testing delete operations (related to bucket policy and bucket public access block)
This commit is contained in:
Casey Bodley 2024-12-17 16:34:03 -05:00 committed by GitHub
commit 14768d8554
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -10895,6 +10895,43 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
response = client.get_bucket_policy(Bucket=bucket_name)
print(response)
@pytest.mark.bucket_policy
def test_set_get_del_bucket_policy():
bucket_name = get_new_bucket()
client = get_client()
key = 'asdf'
client.put_object(Bucket=bucket_name, Key=key, Body='asdf')
resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
response = client.get_bucket_policy(Bucket=bucket_name)
response_policy = response['Policy']
assert policy_document == response_policy
client.delete_bucket_policy(Bucket=bucket_name)
try:
response = client.get_bucket_policy(Bucket=bucket_name)
except botocore.exceptions.ClientError as e:
print (e)
assert e.response['Error']['Code'] == 'NoSuchBucketPolicy'
def _create_simple_tagset(count):
tagset = []
for i in range(count):
@ -13243,6 +13280,33 @@ def test_ignore_public_acls():
check_access_denied(alt_client.list_objects, Bucket=bucket_name)
check_access_denied(alt_client.get_object, Bucket=bucket_name, Key='key1')
def test_put_get_delete_public_block():
bucket_name = get_new_bucket()
client = get_client()
access_conf = {'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': False}
client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
resp = client.get_public_access_block(Bucket=bucket_name)
assert resp['PublicAccessBlockConfiguration']['BlockPublicAcls'] == access_conf['BlockPublicAcls']
assert resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'] == access_conf['BlockPublicPolicy']
assert resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'] == access_conf['IgnorePublicAcls']
assert resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'] == access_conf['RestrictPublicBuckets']
resp = client.delete_public_access_block(Bucket=bucket_name)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 204
response_code = ""
try:
resp = client.get_public_access_block(Bucket=bucket_name)
except ClientError as e:
response_code = e.response['Error']['Code']
assert response_code == 'NoSuchPublicAccessBlockConfiguration'
def test_multipart_upload_on_a_bucket_with_policy():
bucket_name = get_new_bucket()