diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 76fa585..da6d9ee 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -13915,3 +13915,175 @@ def test_post_object_upload_checksum(): r = requests.post(url, files=payload, verify=get_config_ssl_verify()) assert r.status_code == 400 + + +######################### +# COPY ENCRYPTION TESTS # +######################### +_copy_enc_source_modes = { + 'unencrypted': { + 'marks': [pytest.mark.fails_on_aws], + }, + 'sse-s3': { + 'args': {'ServerSideEncryption': 'AES256'}, + 'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256', + 'marks': [pytest.mark.sse_s3], + }, + 'sse-c': { + 'args': { + 'SSECustomerAlgorithm': 'AES256', + 'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + 'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==', + }, + 'source_copy_args': { + 'CopySourceSSECustomerAlgorithm': 'AES256', + 'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + 'CopySourceSSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==', + }, + 'assert': lambda r: ( + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'DWygnHRtgiJ77HCm+1rvHw==' + ) + }, + 'sse-kms': { + 'args': { + 'ServerSideEncryption': 'aws:kms', + 'SSEKMSKeyId': lambda: get_main_kms_keyid() + }, + 'assert': lambda r: ( + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_main_kms_keyid() + ) + } +} +_copy_enc_dest_modes = { + 'unencrypted': { + 'marks': [pytest.mark.fails_on_aws], + }, + 'sse-s3': { + 'args': {'ServerSideEncryption': 'AES256'}, + 'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256', + 'marks': [pytest.mark.sse_s3], + }, + 'sse-c': { + 'args': { + 'SSECustomerAlgorithm': 'AES256', + 'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=', + 'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ==' + }, + 'get_args': { + 'SSECustomerAlgorithm': 'AES256', + 'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=', + 'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ==' + }, + 'assert': lambda r: ( + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'arxBvwY2V4SiOne6yppVPQ==' + ) + }, + 'sse-kms': { + 'args': { + 'ServerSideEncryption': 'aws:kms', + 'SSEKMSKeyId': lambda: get_secondary_kms_keyid() + }, + 'assert': lambda r: ( + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and + r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_secondary_kms_keyid() + ) + } +} + +def _test_copy_enc(file_size, source_mode_key, dest_mode_key): + source_args = _copy_enc_source_modes[source_mode_key] + dest_args = _copy_enc_dest_modes[dest_mode_key] + + bucket_name = get_new_bucket() + client = get_client() + + # upload original file with source encryption + data = 'A'*file_size + args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()} + response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, **args) + assert source_args.get('assert', lambda r: True)(response) + + # copy the object to a new key, with destination encryption + dest_bucket_name = get_new_bucket() + copy_args = {key: value() if callable(value) else value for key, value in dest_args.get('args', {}).items()} + copy_args.update(source_args.get('source_copy_args', {})) + response = client.copy_object(Bucket=dest_bucket_name, Key='testobj2', CopySource={'Bucket': bucket_name, 'Key': 'testobj'}, **copy_args) + assert dest_args.get('assert', lambda r: True)(response) + + # verify the copy is encrypted + get_args = dest_args.get('get_args', {}) + response = client.get_object(Bucket=dest_bucket_name, Key='testobj2', **get_args) + assert dest_args.get('assert', lambda r: True)(response) + body = _get_body(response) + assert body == data + +@pytest.mark.encryption +@pytest.mark.fails_on_dbstore +@pytest.mark.parametrize("source_mode_key, dest_mode_key", [ + pytest.param( + source_key, + dest_key, + marks=[ + *_copy_enc_source_modes[source_key].get('marks', []), + *_copy_enc_dest_modes[dest_key].get('marks', []) + ] + ) + for source_key in _copy_enc_source_modes.keys() + for dest_key in _copy_enc_dest_modes.keys() +]) +def test_copy_enc_1b(source_mode_key, dest_mode_key): + _test_copy_enc(1, source_mode_key, dest_mode_key) + +@pytest.mark.encryption +@pytest.mark.fails_on_dbstore +@pytest.mark.parametrize("source_mode_key, dest_mode_key", [ + pytest.param( + source_key, + dest_key, + marks=[ + *_copy_enc_source_modes[source_key].get('marks', []), + *_copy_enc_dest_modes[dest_key].get('marks', []) + ] + ) + for source_key in _copy_enc_source_modes.keys() + for dest_key in _copy_enc_dest_modes.keys() +]) +def test_copy_enc_1kb(source_mode_key, dest_mode_key): + _test_copy_enc(1024, source_mode_key, dest_mode_key) + +@pytest.mark.encryption +@pytest.mark.fails_on_dbstore +@pytest.mark.parametrize("source_mode_key, dest_mode_key", [ + pytest.param( + source_key, + dest_key, + marks=[ + *_copy_enc_source_modes[source_key].get('marks', []), + *_copy_enc_dest_modes[dest_key].get('marks', []) + ] + ) + for source_key in _copy_enc_source_modes.keys() + for dest_key in _copy_enc_dest_modes.keys() +]) +def test_copy_enc_1mb(source_mode_key, dest_mode_key): + _test_copy_enc(1024*1024, source_mode_key, dest_mode_key) + +@pytest.mark.encryption +@pytest.mark.fails_on_dbstore +@pytest.mark.parametrize("source_mode_key, dest_mode_key", [ + pytest.param( + source_key, + dest_key, + marks=[ + *_copy_enc_source_modes[source_key].get('marks', []), + *_copy_enc_dest_modes[dest_key].get('marks', []) + ] + ) + for source_key in _copy_enc_source_modes.keys() + for dest_key in _copy_enc_dest_modes.keys() +]) +def test_copy_enc_8mb(source_mode_key, dest_mode_key): + _test_copy_enc(8*1024*1024, source_mode_key, dest_mode_key)