mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-27 03:58:10 +00:00
CopyObject: add test for encrypted objects
Add test for copy on sse-s3 and sse-c encrypted objects. Signed-off-by: Seena Fallah <seenafallah@gmail.com>
This commit is contained in:
parent
08df9352f9
commit
21b9386ade
1 changed files with 172 additions and 0 deletions
|
@ -13915,3 +13915,175 @@ def test_post_object_upload_checksum():
|
||||||
|
|
||||||
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
|
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
|
||||||
assert r.status_code == 400
|
assert r.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
#########################
|
||||||
|
# COPY ENCRYPTION TESTS #
|
||||||
|
#########################
|
||||||
|
_copy_enc_source_modes = {
|
||||||
|
'unencrypted': {
|
||||||
|
'marks': [pytest.mark.fails_on_aws],
|
||||||
|
},
|
||||||
|
'sse-s3': {
|
||||||
|
'args': {'ServerSideEncryption': 'AES256'},
|
||||||
|
'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
|
||||||
|
'marks': [pytest.mark.sse_s3],
|
||||||
|
},
|
||||||
|
'sse-c': {
|
||||||
|
'args': {
|
||||||
|
'SSECustomerAlgorithm': 'AES256',
|
||||||
|
'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
|
||||||
|
'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
|
||||||
|
},
|
||||||
|
'source_copy_args': {
|
||||||
|
'CopySourceSSECustomerAlgorithm': 'AES256',
|
||||||
|
'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
|
||||||
|
'CopySourceSSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
|
||||||
|
},
|
||||||
|
'assert': lambda r: (
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'DWygnHRtgiJ77HCm+1rvHw=='
|
||||||
|
)
|
||||||
|
},
|
||||||
|
'sse-kms': {
|
||||||
|
'args': {
|
||||||
|
'ServerSideEncryption': 'aws:kms',
|
||||||
|
'SSEKMSKeyId': lambda: get_main_kms_keyid()
|
||||||
|
},
|
||||||
|
'assert': lambda r: (
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_main_kms_keyid()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_copy_enc_dest_modes = {
|
||||||
|
'unencrypted': {
|
||||||
|
'marks': [pytest.mark.fails_on_aws],
|
||||||
|
},
|
||||||
|
'sse-s3': {
|
||||||
|
'args': {'ServerSideEncryption': 'AES256'},
|
||||||
|
'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
|
||||||
|
'marks': [pytest.mark.sse_s3],
|
||||||
|
},
|
||||||
|
'sse-c': {
|
||||||
|
'args': {
|
||||||
|
'SSECustomerAlgorithm': 'AES256',
|
||||||
|
'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
|
||||||
|
'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
|
||||||
|
},
|
||||||
|
'get_args': {
|
||||||
|
'SSECustomerAlgorithm': 'AES256',
|
||||||
|
'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
|
||||||
|
'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
|
||||||
|
},
|
||||||
|
'assert': lambda r: (
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'arxBvwY2V4SiOne6yppVPQ=='
|
||||||
|
)
|
||||||
|
},
|
||||||
|
'sse-kms': {
|
||||||
|
'args': {
|
||||||
|
'ServerSideEncryption': 'aws:kms',
|
||||||
|
'SSEKMSKeyId': lambda: get_secondary_kms_keyid()
|
||||||
|
},
|
||||||
|
'assert': lambda r: (
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
|
||||||
|
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_secondary_kms_keyid()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def _test_copy_enc(file_size, source_mode_key, dest_mode_key):
|
||||||
|
source_args = _copy_enc_source_modes[source_mode_key]
|
||||||
|
dest_args = _copy_enc_dest_modes[dest_mode_key]
|
||||||
|
|
||||||
|
bucket_name = get_new_bucket()
|
||||||
|
client = get_client()
|
||||||
|
|
||||||
|
# upload original file with source encryption
|
||||||
|
data = 'A'*file_size
|
||||||
|
args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()}
|
||||||
|
response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, **args)
|
||||||
|
assert source_args.get('assert', lambda r: True)(response)
|
||||||
|
|
||||||
|
# copy the object to a new key, with destination encryption
|
||||||
|
dest_bucket_name = get_new_bucket()
|
||||||
|
copy_args = {key: value() if callable(value) else value for key, value in dest_args.get('args', {}).items()}
|
||||||
|
copy_args.update(source_args.get('source_copy_args', {}))
|
||||||
|
response = client.copy_object(Bucket=dest_bucket_name, Key='testobj2', CopySource={'Bucket': bucket_name, 'Key': 'testobj'}, **copy_args)
|
||||||
|
assert dest_args.get('assert', lambda r: True)(response)
|
||||||
|
|
||||||
|
# verify the copy is encrypted
|
||||||
|
get_args = dest_args.get('get_args', {})
|
||||||
|
response = client.get_object(Bucket=dest_bucket_name, Key='testobj2', **get_args)
|
||||||
|
assert dest_args.get('assert', lambda r: True)(response)
|
||||||
|
body = _get_body(response)
|
||||||
|
assert body == data
|
||||||
|
|
||||||
|
@pytest.mark.encryption
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
|
||||||
|
pytest.param(
|
||||||
|
source_key,
|
||||||
|
dest_key,
|
||||||
|
marks=[
|
||||||
|
*_copy_enc_source_modes[source_key].get('marks', []),
|
||||||
|
*_copy_enc_dest_modes[dest_key].get('marks', [])
|
||||||
|
]
|
||||||
|
)
|
||||||
|
for source_key in _copy_enc_source_modes.keys()
|
||||||
|
for dest_key in _copy_enc_dest_modes.keys()
|
||||||
|
])
|
||||||
|
def test_copy_enc_1b(source_mode_key, dest_mode_key):
|
||||||
|
_test_copy_enc(1, source_mode_key, dest_mode_key)
|
||||||
|
|
||||||
|
@pytest.mark.encryption
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
|
||||||
|
pytest.param(
|
||||||
|
source_key,
|
||||||
|
dest_key,
|
||||||
|
marks=[
|
||||||
|
*_copy_enc_source_modes[source_key].get('marks', []),
|
||||||
|
*_copy_enc_dest_modes[dest_key].get('marks', [])
|
||||||
|
]
|
||||||
|
)
|
||||||
|
for source_key in _copy_enc_source_modes.keys()
|
||||||
|
for dest_key in _copy_enc_dest_modes.keys()
|
||||||
|
])
|
||||||
|
def test_copy_enc_1kb(source_mode_key, dest_mode_key):
|
||||||
|
_test_copy_enc(1024, source_mode_key, dest_mode_key)
|
||||||
|
|
||||||
|
@pytest.mark.encryption
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
|
||||||
|
pytest.param(
|
||||||
|
source_key,
|
||||||
|
dest_key,
|
||||||
|
marks=[
|
||||||
|
*_copy_enc_source_modes[source_key].get('marks', []),
|
||||||
|
*_copy_enc_dest_modes[dest_key].get('marks', [])
|
||||||
|
]
|
||||||
|
)
|
||||||
|
for source_key in _copy_enc_source_modes.keys()
|
||||||
|
for dest_key in _copy_enc_dest_modes.keys()
|
||||||
|
])
|
||||||
|
def test_copy_enc_1mb(source_mode_key, dest_mode_key):
|
||||||
|
_test_copy_enc(1024*1024, source_mode_key, dest_mode_key)
|
||||||
|
|
||||||
|
@pytest.mark.encryption
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
|
||||||
|
pytest.param(
|
||||||
|
source_key,
|
||||||
|
dest_key,
|
||||||
|
marks=[
|
||||||
|
*_copy_enc_source_modes[source_key].get('marks', []),
|
||||||
|
*_copy_enc_dest_modes[dest_key].get('marks', [])
|
||||||
|
]
|
||||||
|
)
|
||||||
|
for source_key in _copy_enc_source_modes.keys()
|
||||||
|
for dest_key in _copy_enc_dest_modes.keys()
|
||||||
|
])
|
||||||
|
def test_copy_enc_8mb(source_mode_key, dest_mode_key):
|
||||||
|
_test_copy_enc(8*1024*1024, source_mode_key, dest_mode_key)
|
||||||
|
|
Loading…
Reference in a new issue