From f8761f855b393f8c8a338f0a72db3df97ec9e606 Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Mon, 11 Jul 2016 15:43:52 +0200 Subject: [PATCH] Additional tests for server side encryption, S3 SSE-KMS mode. All tests belong to group 'encryption'. Signed-off-by: Adam Kupczyk --- s3tests/functional/test_s3.py | 312 ++++++++++++++++++++++++++++++++++ 1 file changed, 312 insertions(+) diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index 5c0c272..3f3f5f5 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -7867,3 +7867,315 @@ def test_encryption_sse_c_multipart_bad_download(): eq(k.content_type, content_type) e = assert_raises(boto.exception.S3ResponseError, k.get_contents_as_string, headers=get_headers) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds and returns written data') +@attr('encryption') +def test_encryption_sse_c_post_object_authenticated_request(): + bucket = get_new_bucket() + + url = _get_post_url(s3.main, bucket) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), \ + "conditions": [ \ + {"bucket": bucket.name}, \ + ["starts-with", "$key", "foo"], \ + {"acl": "private"}, \ + ["starts-with", "$Content-Type", "text/plain"], \ + ["starts-with", "$x-amz-server-side-encryption-customer-algorithm", ""], \ + ["starts-with", "$x-amz-server-side-encryption-customer-key", ""], \ + ["starts-with", "$x-amz-server-side-encryption-customer-key-md5", ""], \ + ["content-length-range", 0, 1024] \ + ] \ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + conn = s3.main + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id), \ + ("acl" , "private"),("signature" , signature),("policy" , policy), \ + ("Content-Type" , "text/plain"), \ + ('x-amz-server-side-encryption-customer-algorithm', 'AES256'), \ + ('x-amz-server-side-encryption-customer-key', 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs='), \ + ('x-amz-server-side-encryption-customer-key-md5', 'DWygnHRtgiJ77HCm+1rvHw=='), \ + ('file', ('bar'),), ]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + get_headers = { + 'x-amz-server-side-encryption-customer-algorithm': 'AES256', + 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==' + } + + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string(headers=get_headers) + eq(got, 'bar') + + +def _test_sse_kms_customer_write(file_size): + """ + Tests Create a file of A's, use it to set_contents_from_file. + Create a file of B's, use it to re-set_contents_from_file. + Re-read the contents, and confirm we get B's + """ + bucket = get_new_bucket() + sse_kms_client_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1' + } + key = bucket.new_key('testobj') + data = 'A'*file_size + key.set_contents_from_string(data, headers=sse_kms_client_headers) + rdata = key.get_contents_as_string(headers=sse_kms_client_headers) + eq(data, rdata) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test SSE-KMS encrypted transfer 1 byte') +@attr(assertion='success') +@attr('encryption') +def test_sse_kms_transfer_1b(): + _test_sse_kms_customer_write(1) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test SSE-KMS encrypted transfer 1KB') +@attr(assertion='success') +@attr('encryption') +def test_sse_kms_transfer_1kb(): + _test_sse_kms_customer_write(1024) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test SSE-KMS encrypted transfer 1MB') +@attr(assertion='success') +@attr('encryption') +def test_sse_kms_transfer_1MB(): + _test_sse_kms_customer_write(1024*1024) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test SSE-KMS encrypted transfer 13 bytes') +@attr(assertion='success') +@attr('encryption') +def test_sse_kms_transfer_13b(): + _test_sse_kms_customer_write(13) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='write encrypted with SSE-KMS and read without SSE-KMS') +@attr(assertion='operation success') +@attr('encryption') +def test_sse_kms_present(): + bucket = get_new_bucket() + sse_kms_client_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1' + } + key = bucket.new_key('testobj') + data = 'A'*100 + key.set_contents_from_string(data, headers=sse_kms_client_headers) + result = key.get_contents_as_string() + eq(data, result) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='write encrypted with SSE-KMS but read with other key') +@attr(assertion='operation fails') +@attr('encryption') +def test_sse_kms_other_key(): + bucket = get_new_bucket() + sse_kms_client_headers_A = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1' + } + sse_kms_client_headers_B = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2' + } + key = bucket.new_key('testobj') + data = 'A'*100 + key.set_contents_from_string(data, headers=sse_kms_client_headers_A) + result = key.get_contents_as_string(headers=sse_kms_client_headers_B) + eq(data, result) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='declare SSE-KMS but do not provide key_id') +@attr(assertion='operation fails') +@attr('encryption') +def test_sse_kms_no_key(): + bucket = get_new_bucket() + sse_kms_client_headers = { + 'x-amz-server-side-encryption': 'aws:kms' + } + key = bucket.new_key('testobj') + data = 'A'*100 + e = assert_raises(boto.exception.S3ResponseError, + key.set_contents_from_string, data, headers=sse_kms_client_headers) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='Do not declare SSE-KMS but provide key_id') +@attr(assertion='operation successfull, no encryption') +@attr('encryption') +def test_sse_kms_not_declared(): + bucket = get_new_bucket() + sse_kms_client_headers = { + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2' + } + key = bucket.new_key('testobj') + data = 'A'*100 + key.set_contents_from_string(data, headers=sse_kms_client_headers) + rdata = key.get_contents_as_string() + eq(data, rdata) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='complete KMS multi-part upload') +@attr(assertion='successful') +@attr('encryption') +def test_sse_kms_multipart_upload(): + bucket = get_new_bucket() + key = "multipart_enc" + content_type = 'text/plain' + objlen = 30 * 1024 * 1024 + enc_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2', + 'Content-Type': content_type + } + (upload, data) = _multipart_upload_enc(bucket, key, objlen, + init_headers=enc_headers, part_headers=enc_headers, + metadata={'foo': 'bar'}) + upload.complete_upload() + result = _head_bucket(bucket) + + eq(result.get('x-rgw-object-count', 1), 1) + eq(result.get('x-rgw-bytes-used', 30 * 1024 * 1024), 30 * 1024 * 1024) + + k = bucket.get_key(key) + eq(k.metadata['foo'], 'bar') + eq(k.content_type, content_type) + test_string = k.get_contents_as_string(headers=enc_headers) + eq(len(test_string), k.size) + eq(data, test_string) + eq(test_string, data) + + _check_content_using_range_enc(k, data, 1000000, enc_headers=enc_headers) + _check_content_using_range_enc(k, data, 10000000, enc_headers=enc_headers) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='multipart KMS upload with bad key_id for uploading chunks') +@attr(assertion='successful') +@attr('encryption') +def test_sse_kms_multipart_invalid_chunks_1(): + bucket = get_new_bucket() + key = "multipart_enc" + content_type = 'text/bla' + objlen = 30 * 1024 * 1024 + init_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', + 'Content-Type': content_type + } + part_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2' + } + _multipart_upload_enc(bucket, key, objlen, + init_headers=init_headers, part_headers=part_headers, + metadata={'foo': 'bar'}) + + +@attr(resource='object') +@attr(method='put') +@attr(operation='multipart KMS upload with unexistent key_id for chunks') +@attr(assertion='successful') +@attr('encryption') +def test_sse_kms_multipart_invalid_chunks_2(): + bucket = get_new_bucket() + key = "multipart_enc" + content_type = 'text/plain' + objlen = 30 * 1024 * 1024 + init_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', + 'Content-Type': content_type + } + part_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-not-present' + } + _multipart_upload_enc(bucket, key, objlen, + init_headers=init_headers, part_headers=part_headers, + metadata={'foo': 'bar'}) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated KMS browser based upload via POST request') +@attr(assertion='succeeds and returns written data') +@attr('encryption') +def test_sse_kms_post_object_authenticated_request(): + bucket = get_new_bucket() + + url = _get_post_url(s3.main, bucket) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), \ + "conditions": [ \ + {"bucket": bucket.name}, \ + ["starts-with", "$key", "foo"], \ + {"acl": "private"}, \ + ["starts-with", "$Content-Type", "text/plain"], \ + ["starts-with", "$x-amz-server-side-encryption", ""], \ + ["starts-with", "$x-amz-server-side-encryption-aws-kms-key-id", ""], \ + ["content-length-range", 0, 1024] \ + ] \ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + conn = s3.main + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id), \ + ("acl" , "private"),("signature" , signature),("policy" , policy), \ + ("Content-Type" , "text/plain"), \ + ('x-amz-server-side-encryption', 'aws:kms'), \ + ('x-amz-server-side-encryption-aws-kms-key-id', 'testkey-1'), \ + ('file', ('bar'),), ]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + get_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', + } + + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string(headers=get_headers) + eq(got, 'bar')