rgw/s3_boto3: Add tests for bucket encryption APIs

Tests are added for GetBucketEncryption, PutBucketEncryption,
and DeleteBucketEncryption APIs.

Related PR: https://github.com/ceph/ceph/pull/42222

Signed-off-by: Rahul Dev Parashar <rahul.dev@flipkart.com>
(cherry picked from commit 44643af0b0)
This commit is contained in:
Rahul Dev Parashar 2021-07-26 16:15:02 +05:30 committed by Ali Maredia
parent 057432b9f5
commit df426ea041

View file

@ -13184,3 +13184,86 @@ def test_multipart_upload_on_a_bucket_with_policy():
(upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client) (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
eq(response['ResponseMetadata']['HTTPStatusCode'], 200) eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='put bucket encryption on bucket')
@attr(assertion='succeeds')
def test_put_bucket_encryption():
bucket_name = get_new_bucket()
client = get_client()
server_side_encryption_conf = {
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
},
]
}
response = client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
@attr(resource='bucket')
@attr(method='get')
@attr(operation='get bucket encryption on bucket')
@attr(assertion='succeeds')
def test_get_bucket_encryption():
bucket_name = get_new_bucket()
client = get_client()
response_code = ""
try:
client.get_bucket_encryption(Bucket=bucket_name)
except ClientError as e:
response_code = e.response['Error']['Code']
eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
server_side_encryption_conf = {
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
},
]
}
client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
response = client.get_bucket_encryption(Bucket=bucket_name)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'],
server_side_encryption_conf['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'])
@attr(resource='bucket')
@attr(method='delete')
@attr(operation='delete bucket encryption on bucket')
@attr(assertion='succeeds')
def test_delete_bucket_encryption():
bucket_name = get_new_bucket()
client = get_client()
response = client.delete_bucket_encryption(Bucket=bucket_name)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
server_side_encryption_conf = {
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
},
]
}
client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
response = client.delete_bucket_encryption(Bucket=bucket_name)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)