Merge pull request #306 from hairesis/wip-sse-kms-tests-refactor

Passing sse-kms keys from configuration instead of hard coding in tests
This commit is contained in:
Ali Maredia 2019-10-03 12:08:44 -04:00 committed by GitHub
commit 65f3441636
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 123 additions and 140 deletions

View file

@ -169,8 +169,11 @@ def setup():
try: try:
config.main_kms_keyid = cfg.get('s3 main',"kms_keyid") config.main_kms_keyid = cfg.get('s3 main',"kms_keyid")
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
config.main_kms_keyid = None config.main_kms_keyid = 'testkey-1'
pass try:
config.main_kms_keyid2 = cfg.get('s3 main',"kms_keyid2")
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
config.main_kms_keyid2 = 'testkey-2'
try: try:
config.main_api_name = cfg.get('s3 main',"api_name") config.main_api_name = cfg.get('s3 main',"api_name")
@ -356,6 +359,9 @@ def get_main_api_name():
def get_main_kms_keyid(): def get_main_kms_keyid():
return config.main_kms_keyid return config.main_kms_keyid
def get_secondary_kms_keyid():
return config.main_kms_keyid2
def get_alt_aws_access_key(): def get_alt_aws_access_key():
return config.alt_access_key return config.alt_access_key

View file

@ -68,6 +68,7 @@ from . import (
get_buckets_list, get_buckets_list,
get_objects_list, get_objects_list,
get_main_kms_keyid, get_main_kms_keyid,
get_secondary_kms_keyid,
nuke_prefixed_buckets, nuke_prefixed_buckets,
) )
@ -4395,7 +4396,6 @@ def test_bucket_acl_canned_publicreadwrite():
display_name = get_main_display_name() display_name = get_main_display_name()
user_id = get_main_user_id() user_id = get_main_user_id()
grants = response['Grants'] grants = response['Grants']
check_grants( check_grants(
grants, grants,
@ -9765,52 +9765,22 @@ def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
eq(body, data) eq(body, data)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-KMS encrypted transfer 1 byte')
@attr(assertion='success')
@attr('encryption')
def test_sse_kms_transfer_1b():
_test_sse_kms_customer_write(1)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-KMS encrypted transfer 1KB')
@attr(assertion='success')
@attr('encryption')
def test_sse_kms_transfer_1kb():
_test_sse_kms_customer_write(1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-KMS encrypted transfer 1MB')
@attr(assertion='success')
@attr('encryption')
def test_sse_kms_transfer_1MB():
_test_sse_kms_customer_write(1024*1024)
@attr(resource='object')
@attr(method='put')
@attr(operation='Test SSE-KMS encrypted transfer 13 bytes')
@attr(assertion='success')
@attr('encryption')
def test_sse_kms_transfer_13b():
_test_sse_kms_customer_write(13)
@attr(resource='object') @attr(resource='object')
@attr(method='head') @attr(method='head')
@attr(operation='Test SSE-KMS encrypted does perform head properly') @attr(operation='Test SSE-KMS encrypted does perform head properly')
@attr(assertion='success') @attr(assertion='success')
@attr('encryption') @attr('encryption')
def test_sse_kms_method_head(): def test_sse_kms_method_head():
kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
sse_kms_client_headers = { sse_kms_client_headers = {
'x-amz-server-side-encryption': 'aws:kms', 'x-amz-server-side-encryption': 'aws:kms',
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1' 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid
} }
data = 'A'*1000 data = 'A'*1000
key = 'testobj' key = 'testobj'
@ -9821,7 +9791,7 @@ def test_sse_kms_method_head():
response = client.head_object(Bucket=bucket_name, Key=key) response = client.head_object(Bucket=bucket_name, Key=key)
eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms') eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms')
eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], 'testkey-1') eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], kms_keyid)
lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_kms_client_headers)) lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_kms_client_headers))
client.meta.events.register('before-call.s3.HeadObject', lf) client.meta.events.register('before-call.s3.HeadObject', lf)
@ -9835,11 +9805,12 @@ def test_sse_kms_method_head():
@attr(assertion='operation success') @attr(assertion='operation success')
@attr('encryption') @attr('encryption')
def test_sse_kms_present(): def test_sse_kms_present():
kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
sse_kms_client_headers = { sse_kms_client_headers = {
'x-amz-server-side-encryption': 'aws:kms', 'x-amz-server-side-encryption': 'aws:kms',
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1' 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid
} }
data = 'A'*100 data = 'A'*100
key = 'testobj' key = 'testobj'
@ -9899,6 +9870,7 @@ def test_sse_kms_not_declared():
@attr(assertion='successful') @attr(assertion='successful')
@attr('encryption') @attr('encryption')
def test_sse_kms_multipart_upload(): def test_sse_kms_multipart_upload():
kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
key = "multipart_enc" key = "multipart_enc"
@ -9907,7 +9879,7 @@ def test_sse_kms_multipart_upload():
metadata = {'foo': 'bar'} metadata = {'foo': 'bar'}
enc_headers = { enc_headers = {
'x-amz-server-side-encryption': 'aws:kms', 'x-amz-server-side-encryption': 'aws:kms',
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2', 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
'Content-Type': content_type 'Content-Type': content_type
} }
resend_parts = [] resend_parts = []
@ -9948,6 +9920,8 @@ def test_sse_kms_multipart_upload():
@attr(assertion='successful') @attr(assertion='successful')
@attr('encryption') @attr('encryption')
def test_sse_kms_multipart_invalid_chunks_1(): def test_sse_kms_multipart_invalid_chunks_1():
kms_keyid = get_main_kms_keyid()
kms_keyid2 = get_secondary_kms_keyid()
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
key = "multipart_enc" key = "multipart_enc"
@ -9956,12 +9930,12 @@ def test_sse_kms_multipart_invalid_chunks_1():
metadata = {'foo': 'bar'} metadata = {'foo': 'bar'}
init_headers = { init_headers = {
'x-amz-server-side-encryption': 'aws:kms', 'x-amz-server-side-encryption': 'aws:kms',
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
'Content-Type': content_type 'Content-Type': content_type
} }
part_headers = { part_headers = {
'x-amz-server-side-encryption': 'aws:kms', 'x-amz-server-side-encryption': 'aws:kms',
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2' 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid2
} }
resend_parts = [] resend_parts = []
@ -9976,6 +9950,7 @@ def test_sse_kms_multipart_invalid_chunks_1():
@attr(assertion='successful') @attr(assertion='successful')
@attr('encryption') @attr('encryption')
def test_sse_kms_multipart_invalid_chunks_2(): def test_sse_kms_multipart_invalid_chunks_2():
kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
key = "multipart_enc" key = "multipart_enc"
@ -9984,7 +9959,7 @@ def test_sse_kms_multipart_invalid_chunks_2():
metadata = {'foo': 'bar'} metadata = {'foo': 'bar'}
init_headers = { init_headers = {
'x-amz-server-side-encryption': 'aws:kms', 'x-amz-server-side-encryption': 'aws:kms',
'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
'Content-Type': content_type 'Content-Type': content_type
} }
part_headers = { part_headers = {
@ -9997,12 +9972,14 @@ def test_sse_kms_multipart_invalid_chunks_2():
init_headers=init_headers, part_headers=part_headers, metadata=metadata, init_headers=init_headers, part_headers=part_headers, metadata=metadata,
resend_parts=resend_parts) resend_parts=resend_parts)
@attr(resource='object') @attr(resource='object')
@attr(method='post') @attr(method='post')
@attr(operation='authenticated KMS browser based upload via POST request') @attr(operation='authenticated KMS browser based upload via POST request')
@attr(assertion='succeeds and returns written data') @attr(assertion='succeeds and returns written data')
@attr('encryption') @attr('encryption')
def test_sse_kms_post_object_authenticated_request(): def test_sse_kms_post_object_authenticated_request():
kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
client = get_client() client = get_client()
@ -10034,7 +10011,7 @@ def test_sse_kms_post_object_authenticated_request():
("acl" , "private"),("signature" , signature),("policy" , policy),\ ("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"), ("Content-Type" , "text/plain"),
('x-amz-server-side-encryption', 'aws:kms'), \ ('x-amz-server-side-encryption', 'aws:kms'), \
('x-amz-server-side-encryption-aws-kms-key-id', 'testkey-1'), \ ('x-amz-server-side-encryption-aws-kms-key-id', kms_keyid), \
('file', ('bar'))]) ('file', ('bar'))])
r = requests.post(url, files = payload) r = requests.post(url, files = payload)
@ -10049,7 +10026,7 @@ def test_sse_kms_post_object_authenticated_request():
@attr(operation='Test SSE-KMS encrypted transfer 1 byte') @attr(operation='Test SSE-KMS encrypted transfer 1 byte')
@attr(assertion='success') @attr(assertion='success')
@attr('encryption') @attr('encryption')
def test_sse_kms_barb_transfer_1b(): def test_sse_kms_transfer_1b():
kms_keyid = get_main_kms_keyid() kms_keyid = get_main_kms_keyid()
if kms_keyid is None: if kms_keyid is None:
raise SkipTest raise SkipTest
@ -10061,7 +10038,7 @@ def test_sse_kms_barb_transfer_1b():
@attr(operation='Test SSE-KMS encrypted transfer 1KB') @attr(operation='Test SSE-KMS encrypted transfer 1KB')
@attr(assertion='success') @attr(assertion='success')
@attr('encryption') @attr('encryption')
def test_sse_kms_barb_transfer_1kb(): def test_sse_kms_transfer_1kb():
kms_keyid = get_main_kms_keyid() kms_keyid = get_main_kms_keyid()
if kms_keyid is None: if kms_keyid is None:
raise SkipTest raise SkipTest
@ -10073,7 +10050,7 @@ def test_sse_kms_barb_transfer_1kb():
@attr(operation='Test SSE-KMS encrypted transfer 1MB') @attr(operation='Test SSE-KMS encrypted transfer 1MB')
@attr(assertion='success') @attr(assertion='success')
@attr('encryption') @attr('encryption')
def test_sse_kms_barb_transfer_1MB(): def test_sse_kms_transfer_1MB():
kms_keyid = get_main_kms_keyid() kms_keyid = get_main_kms_keyid()
if kms_keyid is None: if kms_keyid is None:
raise SkipTest raise SkipTest
@ -10085,7 +10062,7 @@ def test_sse_kms_barb_transfer_1MB():
@attr(operation='Test SSE-KMS encrypted transfer 13 bytes') @attr(operation='Test SSE-KMS encrypted transfer 13 bytes')
@attr(assertion='success') @attr(assertion='success')
@attr('encryption') @attr('encryption')
def test_sse_kms_barb_transfer_13b(): def test_sse_kms_transfer_13b():
kms_keyid = get_main_kms_keyid() kms_keyid = get_main_kms_keyid()
if kms_keyid is None: if kms_keyid is None:
raise SkipTest raise SkipTest