diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 0d47f1c..cfd0f0a 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -9302,9 +9302,7 @@ def _multipart_upload_enc(client, bucket_name, key, size, part_size, init_header return (upload_id, s, parts) -def _check_content_using_range_enc(client, bucket_name, key, data, step, enc_headers=None): - response = client.get_object(Bucket=bucket_name, Key=key) - size = response['ContentLength'] +def _check_content_using_range_enc(client, bucket_name, key, data, size, step, enc_headers=None): for ofs in range(0, size, step): toread = size - ofs if toread > step: @@ -9328,6 +9326,7 @@ def test_encryption_sse_c_multipart_upload(): key = "multipart_enc" content_type = 'text/plain' objlen = 30 * 1024 * 1024 + partlen = 5*1024*1024 metadata = {'foo': 'bar'} enc_headers = { 'x-amz-server-side-encryption-customer-algorithm': 'AES256', @@ -9338,7 +9337,7 @@ def test_encryption_sse_c_multipart_upload(): resend_parts = [] (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen, - part_size=5*1024*1024, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts) + part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts) lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers)) client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf) @@ -9362,8 +9361,57 @@ def test_encryption_sse_c_multipart_upload(): size = response['ContentLength'] assert len(body) == size - _check_content_using_range_enc(client, bucket_name, key, data, 1000000, enc_headers=enc_headers) - _check_content_using_range_enc(client, bucket_name, key, data, 10000000, enc_headers=enc_headers) + _check_content_using_range_enc(client, bucket_name, key, data, size, 1000000, enc_headers=enc_headers) + _check_content_using_range_enc(client, bucket_name, key, data, size, 10000000, enc_headers=enc_headers) + for i in range(-1,2): + _check_content_using_range_enc(client, bucket_name, key, data, size, partlen + i, enc_headers=enc_headers) + +@pytest.mark.encryption +def test_encryption_sse_c_unaligned_multipart_upload(): + bucket_name = get_new_bucket() + client = get_client() + key = "multipart_enc" + content_type = 'text/plain' + objlen = 30 * 1024 * 1024 + partlen = 1 + 5 * 1024 * 1024 # not a multiple of the 4k encryption block size + metadata = {'foo': 'bar'} + enc_headers = { + 'x-amz-server-side-encryption-customer-algorithm': 'AES256', + 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==', + 'Content-Type': content_type + } + resend_parts = [] + + (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen, + part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts) + + lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers)) + client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf) + client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + response = client.head_bucket(Bucket=bucket_name) + rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-object-count']) + assert rgw_object_count == 1 + rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-bytes-used']) + assert rgw_bytes_used == objlen + + lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers)) + client.meta.events.register('before-call.s3.GetObject', lf) + response = client.get_object(Bucket=bucket_name, Key=key) + + assert response['Metadata'] == metadata + assert response['ResponseMetadata']['HTTPHeaders']['content-type'] == content_type + + body = _get_body(response) + assert body == data + size = response['ContentLength'] + assert len(body) == size + + _check_content_using_range_enc(client, bucket_name, key, data, size, 1000000, enc_headers=enc_headers) + _check_content_using_range_enc(client, bucket_name, key, data, size, 10000000, enc_headers=enc_headers) + for i in range(-1,2): + _check_content_using_range_enc(client, bucket_name, key, data, size, partlen + i, enc_headers=enc_headers) @pytest.mark.encryption # TODO: remove this fails_on_rgw when I fix it