Additional tests for server side encryption, S3 SSE-C mode.

All tests belong to group 'encryption'.

Signed-off-by: Adam Kupczyk <akupczyk@mirantis.com>
This commit is contained in:
Adam Kupczyk 2016-06-02 18:13:56 +02:00 committed by Casey Bodley
parent 4eec3bebc7
commit d92031ef16

View file

@ -5119,14 +5119,14 @@ def test_object_copy_versioning_multipart_upload():
got = key6.get_contents_as_string() got = key6.get_contents_as_string()
eq(got, data) eq(got, data)
def transfer_part(bucket, mp_id, mp_keyname, i, part): def transfer_part(bucket, mp_id, mp_keyname, i, part, headers=None):
"""Transfer a part of a multipart upload. Designed to be run in parallel. """Transfer a part of a multipart upload. Designed to be run in parallel.
""" """
mp = boto.s3.multipart.MultiPartUpload(bucket) mp = boto.s3.multipart.MultiPartUpload(bucket)
mp.key_name = mp_keyname mp.key_name = mp_keyname
mp.id = mp_id mp.id = mp_id
part_out = StringIO(part) part_out = StringIO(part)
mp.upload_part_from_file(part_out, i+1) mp.upload_part_from_file(part_out, i+1, headers=headers)
def copy_part(src_bucket, src_keyname, dst_bucket, dst_keyname, mp_id, i, start=None, end=None, src_version_id=None): def copy_part(src_bucket, src_keyname, dst_bucket, dst_keyname, mp_id, i, start=None, end=None, src_version_id=None):
"""Copy a part of a multipart upload from other bucket. """Copy a part of a multipart upload from other bucket.
@ -5167,9 +5167,9 @@ def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=
s = '' s = ''
for i, part in enumerate(generate_random(size, part_size)): for i, part in enumerate(generate_random(size, part_size)):
s += part s += part
transfer_part(bucket, upload.id, upload.key_name, i, part) transfer_part(bucket, upload.id, upload.key_name, i, part, headers)
if i in resend_parts: if i in resend_parts:
transfer_part(bucket, upload.id, upload.key_name, i, part) transfer_part(bucket, upload.id, upload.key_name, i, part, headers)
if do_list is not None: if do_list is not None:
l = bucket.list_multipart_uploads() l = bucket.list_multipart_uploads()
@ -7533,3 +7533,337 @@ def test_lifecycle_rules_conflicted():
eq(e.status, 400) eq(e.status, 400)
eq(e.error_code, 'InvalidRequest') eq(e.error_code, 'InvalidRequest')
def _test_encryption_sse_customer_write(file_size):
"""
Tests Create a file of A's, use it to set_contents_from_file.
Create a file of B's, use it to re-set_contents_from_file.
Re-read the contents, and confirm we get B's
"""
bucket = get_new_bucket()
sse_client_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
key = bucket.new_key('testobj')
data = 'A'*file_size
key.set_contents_from_string(data, headers=sse_client_headers)
rdata = key.get_contents_as_string(headers=sse_client_headers)
eq(data, rdata)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-C encrypted transfer 1 byte')
@attr(assertion='success')
@attr('encryption')
def test_encrypted_transfer_1b():
_test_encryption_sse_customer_write(1)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-C encrypted transfer 1KB')
@attr(assertion='success')
@attr('encryption')
def test_encrypted_transfer_1kb():
_test_encryption_sse_customer_write(1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-C encrypted transfer 1MB')
@attr(assertion='success')
@attr('encryption')
def test_encrypted_transfer_1MB():
_test_encryption_sse_customer_write(1024*1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-C encrypted transfer 13 bytes')
@attr(assertion='success')
@attr('encryption')
def test_encrypted_transfer_13b():
_test_encryption_sse_customer_write(13)
@attr(resource='object')
@attr(method='put')
@attr(operation='write encrypted with SSE-C and read without SSE-C')
@attr(assertion='operation fails')
@attr('encryption')
def test_encryption_sse_c_present():
bucket = get_new_bucket()
sse_client_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
key = bucket.new_key('testobj')
data = 'A'*100
key.set_contents_from_string(data, headers=sse_client_headers)
e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
eq(e.status, 400)
@attr(resource='object')
@attr(method='put')
@attr(operation='write encrypted with SSE-C but read with other key')
@attr(assertion='operation fails')
@attr('encryption')
def test_encryption_sse_c_other_key():
bucket = get_new_bucket()
sse_client_headers_A = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
sse_client_headers_B = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
'x-amz-server-side-encryption-customer-key-md5': 'arxBvwY2V4SiOne6yppVPQ=='
}
key = bucket.new_key('testobj')
data = 'A'*100
key.set_contents_from_string(data, headers=sse_client_headers_A)
e = assert_raises(boto.exception.S3ResponseError,
key.get_contents_as_string, headers=sse_client_headers_B)
eq(e.status, 400)
@attr(resource='object')
@attr(method='put')
@attr(operation='write encrypted with SSE-C, but md5 is bad')
@attr(assertion='operation fails')
@attr('encryption')
def test_encryption_sse_c_invalid_md5():
bucket = get_new_bucket()
sse_client_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'AAAAAAAAAAAAAAAAAAAAAA=='
}
key = bucket.new_key('testobj')
data = 'A'*100
e = assert_raises(boto.exception.S3ResponseError,
key.set_contents_from_string, data, headers=sse_client_headers)
eq(e.status, 400)
@attr(resource='object')
@attr(method='put')
@attr(operation='write encrypted with SSE-C, but dont provide MD5')
@attr(assertion='operation fails')
@attr('encryption')
def test_encryption_sse_c_no_md5():
bucket = get_new_bucket()
sse_client_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs='
}
key = bucket.new_key('testobj')
data = 'A'*100
e = assert_raises(boto.exception.S3ResponseError,
key.set_contents_from_string, data, headers=sse_client_headers)
@attr(resource='object')
@attr(method='put')
@attr(operation='declare SSE-C but do not provide key')
@attr(assertion='operation fails')
@attr('encryption')
def test_encryption_sse_c_no_key():
bucket = get_new_bucket()
sse_client_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256'
}
key = bucket.new_key('testobj')
data = 'A'*100
e = assert_raises(boto.exception.S3ResponseError,
key.set_contents_from_string, data, headers=sse_client_headers)
@attr(resource='object')
@attr(method='put')
@attr(operation='Do not declare SSE-C but provide key and MD5')
@attr(assertion='operation successfull, no encryption')
@attr('encryption')
def test_encryption_key_no_sse_c():
bucket = get_new_bucket()
sse_client_headers = {
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
key = bucket.new_key('testobj')
data = 'A'*100
key.set_contents_from_string(data, headers=sse_client_headers)
rdata = key.get_contents_as_string()
eq(data, rdata)
def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
do_list=None, init_headers=None, part_headers=None,
metadata=None, resend_parts=[]):
"""
generate a multi-part upload for a random file of specifed size,
if requested, generate a list of the parts
return the upload descriptor
"""
upload = bucket.initiate_multipart_upload(s3_key_name, headers=init_headers, metadata=metadata)
s = ''
for i, part in enumerate(generate_random(size, part_size)):
s += part
transfer_part(bucket, upload.id, upload.key_name, i, part, part_headers)
if i in resend_parts:
transfer_part(bucket, upload.id, upload.key_name, i, part, part_headers)
if do_list is not None:
l = bucket.list_multipart_uploads()
l = list(l)
return (upload, s)
def _check_content_using_range_enc(k, data, step, enc_headers=None):
objlen = k.size
for ofs in xrange(0, k.size, step):
toread = k.size - ofs
if toread > step:
toread = step
end = ofs + toread - 1
read_range = k.get_contents_as_string(
headers=dict({'Range': 'bytes={s}-{e}'.format(s=ofs, e=end)}, **enc_headers))
eq(len(read_range), toread)
eq(read_range, data[ofs:end+1])
@attr(resource='object')
@attr(method='put')
@attr(operation='complete multi-part upload')
@attr(assertion='successful')
@attr('encryption')
def test_encryption_sse_c_multipart_upload():
bucket = get_new_bucket()
key = "multipart_enc"
content_type = 'text/plain'
objlen = 30 * 1024 * 1024
enc_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
'Content-Type': content_type
}
(upload, data) = _multipart_upload_enc(bucket, key, objlen,
init_headers=enc_headers, part_headers=enc_headers,
metadata={'foo': 'bar'})
upload.complete_upload()
result = _head_bucket(bucket)
eq(result.get('x-rgw-object-count', 1), 1)
eq(result.get('x-rgw-bytes-used', 30 * 1024 * 1024), 30 * 1024 * 1024)
k = bucket.get_key(key)
eq(k.metadata['foo'], 'bar')
eq(k.content_type, content_type)
test_string = k.get_contents_as_string(headers=enc_headers)
eq(len(test_string), k.size)
eq(data, test_string)
eq(test_string, data)
_check_content_using_range_enc(k, data, 1000000, enc_headers=enc_headers)
_check_content_using_range_enc(k, data, 10000000, enc_headers=enc_headers)
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad key for uploading chunks')
@attr(assertion='successful')
@attr('encryption')
def test_encryption_sse_c_multipart_invalid_chunks_1():
bucket = get_new_bucket()
key = "multipart_enc"
content_type = 'text/bla'
objlen = 30 * 1024 * 1024
init_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
'Content-Type': content_type
}
part_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
'x-amz-server-side-encryption-customer-key-md5': 'arxBvwY2V4SiOne6yppVPQ=='
}
e = assert_raises(boto.exception.S3ResponseError,
_multipart_upload_enc, bucket, key, objlen,
init_headers=init_headers, part_headers=part_headers,
metadata={'foo': 'bar'})
eq(e.status, 400)
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart upload with bad md5 for chunks')
@attr(assertion='successful')
@attr('encryption')
def test_encryption_sse_c_multipart_invalid_chunks_2():
bucket = get_new_bucket()
key = "multipart_enc"
content_type = 'text/plain'
objlen = 30 * 1024 * 1024
init_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
'Content-Type': content_type
}
part_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'AAAAAAAAAAAAAAAAAAAAAA=='
}
e = assert_raises(boto.exception.S3ResponseError,
_multipart_upload_enc, bucket, key, objlen,
init_headers=init_headers, part_headers=part_headers,
metadata={'foo': 'bar'})
eq(e.status, 400)
@attr(resource='object')
@attr(method='put')
@attr(operation='complete multi-part upload and download with bad key')
@attr(assertion='successful')
@attr('encryption')
def test_encryption_sse_c_multipart_bad_download():
bucket = get_new_bucket()
key = "multipart_enc"
content_type = 'text/plain'
objlen = 30 * 1024 * 1024
put_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
'Content-Type': content_type
}
get_headers = {
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
'x-amz-server-side-encryption-customer-key': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
'x-amz-server-side-encryption-customer-key-md5': 'arxBvwY2V4SiOne6yppVPQ=='
}
(upload, data) = _multipart_upload_enc(bucket, key, objlen,
init_headers=put_headers, part_headers=put_headers,
metadata={'foo': 'bar'})
upload.complete_upload()
result = _head_bucket(bucket)
eq(result.get('x-rgw-object-count', 1), 1)
eq(result.get('x-rgw-bytes-used', 30 * 1024 * 1024), 30 * 1024 * 1024)
k = bucket.get_key(key)
eq(k.metadata['foo'], 'bar')
eq(k.content_type, content_type)
e = assert_raises(boto.exception.S3ResponseError,
k.get_contents_as_string, headers=get_headers)