sse: add sse-c test with unaligned multipart parts

Fixes: http://tracker.ceph.com/issues/38700

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2019-03-22 11:41:44 -04:00
parent 13e0d736a8
commit b05a394738

View file

@ -9365,6 +9365,48 @@ def test_encryption_sse_c_multipart_upload():
_check_content_using_range_enc(client, bucket_name, key, data, 1000000, enc_headers=enc_headers)
_check_content_using_range_enc(client, bucket_name, key, data, 10000000, enc_headers=enc_headers)
@pytest.mark.encryption
def test_encryption_sse_c_unaligned_multipart_upload():
bucket_name = get_new_bucket()
client = get_client()
key = "multipart_enc"
content_type = 'text/plain'
objlen = 30 * 1024 * 1024
partlen = 1 + 5 * 1024 * 1024 # not a multiple of the 4k encryption block size
metadata = {'foo': 'bar'}
enc_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
'Content-Type': content_type
}
resend_parts = []
(upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
response = client.head_bucket(Bucket=bucket_name)
rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-object-count'])
assert rgw_object_count == 1
rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-bytes-used'])
assert rgw_bytes_used == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.GetObject', lf)
response = client.get_object(Bucket=bucket_name, Key=key)
assert response['Metadata'] == metadata
assert response['ResponseMetadata']['HTTPHeaders']['content-type'] == content_type
body = _get_body(response)
assert body == data
size = response['ContentLength']
assert len(body) == size
@pytest.mark.encryption
# TODO: remove this fails_on_rgw when I fix it
@pytest.mark.fails_on_rgw