Merge pull request #266 from theanalyst/wip-38700-range

add multipart ranges to encryption tests
This commit is contained in:
Casey Bodley 2023-06-14 16:11:52 -04:00 committed by GitHub
commit 4476773180
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -9302,9 +9302,7 @@ def _multipart_upload_enc(client, bucket_name, key, size, part_size, init_header
return (upload_id, s, parts)
def _check_content_using_range_enc(client, bucket_name, key, data, step, enc_headers=None):
response = client.get_object(Bucket=bucket_name, Key=key)
size = response['ContentLength']
def _check_content_using_range_enc(client, bucket_name, key, data, size, step, enc_headers=None):
for ofs in range(0, size, step):
toread = size - ofs
if toread > step:
@ -9328,6 +9326,7 @@ def test_encryption_sse_c_multipart_upload():
key = "multipart_enc"
content_type = 'text/plain'
objlen = 30 * 1024 * 1024
partlen = 5*1024*1024
metadata = {'foo': 'bar'}
enc_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
@ -9338,7 +9337,7 @@ def test_encryption_sse_c_multipart_upload():
resend_parts = []
(upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
part_size=5*1024*1024, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
@ -9362,8 +9361,57 @@ def test_encryption_sse_c_multipart_upload():
size = response['ContentLength']
assert len(body) == size
_check_content_using_range_enc(client, bucket_name, key, data, 1000000, enc_headers=enc_headers)
_check_content_using_range_enc(client, bucket_name, key, data, 10000000, enc_headers=enc_headers)
_check_content_using_range_enc(client, bucket_name, key, data, size, 1000000, enc_headers=enc_headers)
_check_content_using_range_enc(client, bucket_name, key, data, size, 10000000, enc_headers=enc_headers)
for i in range(-1,2):
_check_content_using_range_enc(client, bucket_name, key, data, size, partlen + i, enc_headers=enc_headers)
@pytest.mark.encryption
def test_encryption_sse_c_unaligned_multipart_upload():
bucket_name = get_new_bucket()
client = get_client()
key = "multipart_enc"
content_type = 'text/plain'
objlen = 30 * 1024 * 1024
partlen = 1 + 5 * 1024 * 1024 # not a multiple of the 4k encryption block size
metadata = {'foo': 'bar'}
enc_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
'Content-Type': content_type
}
resend_parts = []
(upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
response = client.head_bucket(Bucket=bucket_name)
rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-object-count'])
assert rgw_object_count == 1
rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-bytes-used'])
assert rgw_bytes_used == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.GetObject', lf)
response = client.get_object(Bucket=bucket_name, Key=key)
assert response['Metadata'] == metadata
assert response['ResponseMetadata']['HTTPHeaders']['content-type'] == content_type
body = _get_body(response)
assert body == data
size = response['ContentLength']
assert len(body) == size
_check_content_using_range_enc(client, bucket_name, key, data, size, 1000000, enc_headers=enc_headers)
_check_content_using_range_enc(client, bucket_name, key, data, size, 10000000, enc_headers=enc_headers)
for i in range(-1,2):
_check_content_using_range_enc(client, bucket_name, key, data, size, partlen + i, enc_headers=enc_headers)
@pytest.mark.encryption
# TODO: remove this fails_on_rgw when I fix it