diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 0d47f1c..333604a 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -9365,6 +9365,48 @@ def test_encryption_sse_c_multipart_upload(): _check_content_using_range_enc(client, bucket_name, key, data, 1000000, enc_headers=enc_headers) _check_content_using_range_enc(client, bucket_name, key, data, 10000000, enc_headers=enc_headers) +@pytest.mark.encryption +def test_encryption_sse_c_unaligned_multipart_upload(): + bucket_name = get_new_bucket() + client = get_client() + key = "multipart_enc" + content_type = 'text/plain' + objlen = 30 * 1024 * 1024 + partlen = 1 + 5 * 1024 * 1024 # not a multiple of the 4k encryption block size + metadata = {'foo': 'bar'} + enc_headers = { + 'x-amz-server-side-encryption-customer-algorithm': 'AES256', + 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==', + 'Content-Type': content_type + } + resend_parts = [] + + (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen, + part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts) + + lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers)) + client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf) + client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + response = client.head_bucket(Bucket=bucket_name) + rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-object-count']) + assert rgw_object_count == 1 + rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-bytes-used']) + assert rgw_bytes_used == objlen + + lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers)) + client.meta.events.register('before-call.s3.GetObject', lf) + response = client.get_object(Bucket=bucket_name, Key=key) + + assert response['Metadata'] == metadata + assert response['ResponseMetadata']['HTTPHeaders']['content-type'] == content_type + + body = _get_body(response) + assert body == data + size = response['ContentLength'] + assert len(body) == size + @pytest.mark.encryption # TODO: remove this fails_on_rgw when I fix it @pytest.mark.fails_on_rgw