mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-21 23:29:47 +00:00
Additional tests for server side encryption, S3 SSE-KMS mode.
All tests belong to group 'encryption'.
Signed-off-by: Adam Kupczyk <akupczyk@mirantis.com>
(cherry picked from commit f8761f855b
)
This commit is contained in:
parent
ea2454654e
commit
f4cf8eceb7
1 changed files with 312 additions and 0 deletions
|
@ -7440,3 +7440,315 @@ def test_encryption_sse_c_multipart_bad_download():
|
|||
eq(k.content_type, content_type)
|
||||
e = assert_raises(boto.exception.S3ResponseError,
|
||||
k.get_contents_as_string, headers=get_headers)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='post')
|
||||
@attr(operation='authenticated browser based upload via POST request')
|
||||
@attr(assertion='succeeds and returns written data')
|
||||
@attr('encryption')
|
||||
def test_encryption_sse_c_post_object_authenticated_request():
|
||||
bucket = get_new_bucket()
|
||||
|
||||
url = _get_post_url(s3.main, bucket)
|
||||
|
||||
utc = pytz.utc
|
||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||
|
||||
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), \
|
||||
"conditions": [ \
|
||||
{"bucket": bucket.name}, \
|
||||
["starts-with", "$key", "foo"], \
|
||||
{"acl": "private"}, \
|
||||
["starts-with", "$Content-Type", "text/plain"], \
|
||||
["starts-with", "$x-amz-server-side-encryption-customer-algorithm", ""], \
|
||||
["starts-with", "$x-amz-server-side-encryption-customer-key", ""], \
|
||||
["starts-with", "$x-amz-server-side-encryption-customer-key-md5", ""], \
|
||||
["content-length-range", 0, 1024] \
|
||||
] \
|
||||
}
|
||||
|
||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||
policy = base64.b64encode(json_policy_document)
|
||||
conn = s3.main
|
||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||
|
||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id), \
|
||||
("acl" , "private"),("signature" , signature),("policy" , policy), \
|
||||
("Content-Type" , "text/plain"), \
|
||||
('x-amz-server-side-encryption-customer-algorithm', 'AES256'), \
|
||||
('x-amz-server-side-encryption-customer-key', 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs='), \
|
||||
('x-amz-server-side-encryption-customer-key-md5', 'DWygnHRtgiJ77HCm+1rvHw=='), \
|
||||
('file', ('bar'),), ])
|
||||
|
||||
r = requests.post(url, files = payload)
|
||||
eq(r.status_code, 204)
|
||||
get_headers = {
|
||||
'x-amz-server-side-encryption-customer-algorithm': 'AES256',
|
||||
'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
|
||||
'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
|
||||
}
|
||||
|
||||
key = bucket.get_key("foo.txt")
|
||||
got = key.get_contents_as_string(headers=get_headers)
|
||||
eq(got, 'bar')
|
||||
|
||||
|
||||
def _test_sse_kms_customer_write(file_size):
|
||||
"""
|
||||
Tests Create a file of A's, use it to set_contents_from_file.
|
||||
Create a file of B's, use it to re-set_contents_from_file.
|
||||
Re-read the contents, and confirm we get B's
|
||||
"""
|
||||
bucket = get_new_bucket()
|
||||
sse_kms_client_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
|
||||
}
|
||||
key = bucket.new_key('testobj')
|
||||
data = 'A'*file_size
|
||||
key.set_contents_from_string(data, headers=sse_kms_client_headers)
|
||||
rdata = key.get_contents_as_string(headers=sse_kms_client_headers)
|
||||
eq(data, rdata)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='Test SSE-KMS encrypted transfer 1 byte')
|
||||
@attr(assertion='success')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_transfer_1b():
|
||||
_test_sse_kms_customer_write(1)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='Test SSE-KMS encrypted transfer 1KB')
|
||||
@attr(assertion='success')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_transfer_1kb():
|
||||
_test_sse_kms_customer_write(1024)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='Test SSE-KMS encrypted transfer 1MB')
|
||||
@attr(assertion='success')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_transfer_1MB():
|
||||
_test_sse_kms_customer_write(1024*1024)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='Test SSE-KMS encrypted transfer 13 bytes')
|
||||
@attr(assertion='success')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_transfer_13b():
|
||||
_test_sse_kms_customer_write(13)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='write encrypted with SSE-KMS and read without SSE-KMS')
|
||||
@attr(assertion='operation success')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_present():
|
||||
bucket = get_new_bucket()
|
||||
sse_kms_client_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
|
||||
}
|
||||
key = bucket.new_key('testobj')
|
||||
data = 'A'*100
|
||||
key.set_contents_from_string(data, headers=sse_kms_client_headers)
|
||||
result = key.get_contents_as_string()
|
||||
eq(data, result)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='write encrypted with SSE-KMS but read with other key')
|
||||
@attr(assertion='operation fails')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_other_key():
|
||||
bucket = get_new_bucket()
|
||||
sse_kms_client_headers_A = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
|
||||
}
|
||||
sse_kms_client_headers_B = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
|
||||
}
|
||||
key = bucket.new_key('testobj')
|
||||
data = 'A'*100
|
||||
key.set_contents_from_string(data, headers=sse_kms_client_headers_A)
|
||||
result = key.get_contents_as_string(headers=sse_kms_client_headers_B)
|
||||
eq(data, result)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='declare SSE-KMS but do not provide key_id')
|
||||
@attr(assertion='operation fails')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_no_key():
|
||||
bucket = get_new_bucket()
|
||||
sse_kms_client_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms'
|
||||
}
|
||||
key = bucket.new_key('testobj')
|
||||
data = 'A'*100
|
||||
e = assert_raises(boto.exception.S3ResponseError,
|
||||
key.set_contents_from_string, data, headers=sse_kms_client_headers)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='Do not declare SSE-KMS but provide key_id')
|
||||
@attr(assertion='operation successfull, no encryption')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_not_declared():
|
||||
bucket = get_new_bucket()
|
||||
sse_kms_client_headers = {
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
|
||||
}
|
||||
key = bucket.new_key('testobj')
|
||||
data = 'A'*100
|
||||
key.set_contents_from_string(data, headers=sse_kms_client_headers)
|
||||
rdata = key.get_contents_as_string()
|
||||
eq(data, rdata)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='complete KMS multi-part upload')
|
||||
@attr(assertion='successful')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_multipart_upload():
|
||||
bucket = get_new_bucket()
|
||||
key = "multipart_enc"
|
||||
content_type = 'text/plain'
|
||||
objlen = 30 * 1024 * 1024
|
||||
enc_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2',
|
||||
'Content-Type': content_type
|
||||
}
|
||||
(upload, data) = _multipart_upload_enc(bucket, key, objlen,
|
||||
init_headers=enc_headers, part_headers=enc_headers,
|
||||
metadata={'foo': 'bar'})
|
||||
upload.complete_upload()
|
||||
result = _head_bucket(bucket)
|
||||
|
||||
eq(result.get('x-rgw-object-count', 1), 1)
|
||||
eq(result.get('x-rgw-bytes-used', 30 * 1024 * 1024), 30 * 1024 * 1024)
|
||||
|
||||
k = bucket.get_key(key)
|
||||
eq(k.metadata['foo'], 'bar')
|
||||
eq(k.content_type, content_type)
|
||||
test_string = k.get_contents_as_string(headers=enc_headers)
|
||||
eq(len(test_string), k.size)
|
||||
eq(data, test_string)
|
||||
eq(test_string, data)
|
||||
|
||||
_check_content_using_range_enc(k, data, 1000000, enc_headers=enc_headers)
|
||||
_check_content_using_range_enc(k, data, 10000000, enc_headers=enc_headers)
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='multipart KMS upload with bad key_id for uploading chunks')
|
||||
@attr(assertion='successful')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_multipart_invalid_chunks_1():
|
||||
bucket = get_new_bucket()
|
||||
key = "multipart_enc"
|
||||
content_type = 'text/bla'
|
||||
objlen = 30 * 1024 * 1024
|
||||
init_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
|
||||
'Content-Type': content_type
|
||||
}
|
||||
part_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
|
||||
}
|
||||
_multipart_upload_enc(bucket, key, objlen,
|
||||
init_headers=init_headers, part_headers=part_headers,
|
||||
metadata={'foo': 'bar'})
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='multipart KMS upload with unexistent key_id for chunks')
|
||||
@attr(assertion='successful')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_multipart_invalid_chunks_2():
|
||||
bucket = get_new_bucket()
|
||||
key = "multipart_enc"
|
||||
content_type = 'text/plain'
|
||||
objlen = 30 * 1024 * 1024
|
||||
init_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
|
||||
'Content-Type': content_type
|
||||
}
|
||||
part_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-not-present'
|
||||
}
|
||||
_multipart_upload_enc(bucket, key, objlen,
|
||||
init_headers=init_headers, part_headers=part_headers,
|
||||
metadata={'foo': 'bar'})
|
||||
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='post')
|
||||
@attr(operation='authenticated KMS browser based upload via POST request')
|
||||
@attr(assertion='succeeds and returns written data')
|
||||
@attr('encryption')
|
||||
def test_sse_kms_post_object_authenticated_request():
|
||||
bucket = get_new_bucket()
|
||||
|
||||
url = _get_post_url(s3.main, bucket)
|
||||
|
||||
utc = pytz.utc
|
||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||
|
||||
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), \
|
||||
"conditions": [ \
|
||||
{"bucket": bucket.name}, \
|
||||
["starts-with", "$key", "foo"], \
|
||||
{"acl": "private"}, \
|
||||
["starts-with", "$Content-Type", "text/plain"], \
|
||||
["starts-with", "$x-amz-server-side-encryption", ""], \
|
||||
["starts-with", "$x-amz-server-side-encryption-aws-kms-key-id", ""], \
|
||||
["content-length-range", 0, 1024] \
|
||||
] \
|
||||
}
|
||||
|
||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||
policy = base64.b64encode(json_policy_document)
|
||||
conn = s3.main
|
||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||
|
||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id), \
|
||||
("acl" , "private"),("signature" , signature),("policy" , policy), \
|
||||
("Content-Type" , "text/plain"), \
|
||||
('x-amz-server-side-encryption', 'aws:kms'), \
|
||||
('x-amz-server-side-encryption-aws-kms-key-id', 'testkey-1'), \
|
||||
('file', ('bar'),), ])
|
||||
|
||||
r = requests.post(url, files = payload)
|
||||
eq(r.status_code, 204)
|
||||
get_headers = {
|
||||
'x-amz-server-side-encryption': 'aws:kms',
|
||||
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
|
||||
}
|
||||
|
||||
key = bucket.get_key("foo.txt")
|
||||
got = key.get_contents_as_string(headers=get_headers)
|
||||
eq(got, 'bar')
|
||||
|
|
Loading…
Reference in a new issue