Merge pull request #158 from zhangsw/lifecycle-noncurrent

Test case for non-current expiration,multipart upload expiration and delete marker expiration

Reviewed-by: Daniel Gryniewicz <dang@redhat.com>
This commit is contained in:
Yehuda Sadeh 2017-05-04 09:37:12 -07:00 committed by GitHub
commit 77241f587f

View file

@ -7533,6 +7533,175 @@ def test_lifecycle_rules_conflicted():
eq(e.status, 400) eq(e.status, 400)
eq(e.error_code, 'InvalidRequest') eq(e.error_code, 'InvalidRequest')
def generate_lifecycle_body(rules):
body = '<?xml version="1.0" encoding="UTF-8"?><LifecycleConfiguration>'
for rule in rules:
body += '<Rule><ID>%s</ID><Prefix>%s</Prefix><Status>%s</Status>' % (rule['ID'], rule['Prefix'], rule['Status'])
if 'Expiration' in rule.keys():
if 'ExpiredObjectDeleteMarker' in rule['Expiration'].keys():
body += '<Expiration><ExpiredObjectDeleteMarker>%s</ExpiredObjectDeleteMarker></Expiration>' \
% rule['Expiration']['ExpiredObjectDeleteMarker']
else:
body += '<Expiration><Days>%d</Days></Expiration>' % rule['Expiration']['Days']
if 'NoncurrentVersionExpiration' in rule.keys():
body += '<NoncurrentVersionExpiration><NoncurrentDays>%d</NoncurrentDays></NoncurrentVersionExpiration>' % \
rule['NoncurrentVersionExpiration']['NoncurrentDays']
if 'AbortIncompleteMultipartUpload' in rule.keys():
body += '<AbortIncompleteMultipartUpload><DaysAfterInitiation>%d</DaysAfterInitiation>' \
'</AbortIncompleteMultipartUpload>' % rule['AbortIncompleteMultipartUpload']['DaysAfterInitiation']
body += '</Rule>'
body += '</LifecycleConfiguration>'
return body
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')
@attr('lifecycle')
def test_lifecycle_set_noncurrent():
bucket = get_new_bucket()
rules = [
{'ID': 'rule1', 'Prefix': 'test1/', 'Status': 'Enabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 2}},
{'ID': 'rule2', 'Prefix': 'test2/', 'Status': 'Disabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 3}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
eq(res.status, 200)
eq(res.reason, 'OK')
# The test harnass for lifecycle is configured to treat days as 2 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle non-current version expiration')
@attr('lifecycle')
@attr('fails_on_aws')
def test_lifecycle_noncur_expiration():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
create_multiple_versions(bucket, "test1/a", 3)
create_multiple_versions(bucket, "test2/abc", 3)
init_keys = bucket.get_all_versions()
rules = [
{'ID': 'rule1', 'Prefix': 'test1/', 'Status': 'Enabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 2}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
time.sleep(50)
expire_keys = bucket.get_all_versions()
eq(len(init_keys), 6)
eq(len(expire_keys), 4)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with delete marker expiration')
@attr('lifecycle')
def test_lifecycle_set_deletemarker():
bucket = get_new_bucket()
rules = [
{'ID': 'rule1', 'Prefix': 'test1/', 'Status': 'Enabled', 'Expiration': {'ExpiredObjectDeleteMarker': 'true'}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
eq(res.status, 200)
eq(res.reason, 'OK')
# The test harnass for lifecycle is configured to treat days as 1 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle delete marker expiration')
@attr('lifecycle')
@attr('fails_on_aws')
def test_lifecycle_deletemarker_expiration():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
create_multiple_versions(bucket, "test1/a", 1)
create_multiple_versions(bucket, "test2/abc", 1)
bucket.delete_key('test1/a')
bucket.delete_key('test2/abc')
init_keys = bucket.get_all_versions()
rules = [
{'ID': 'rule1', 'Prefix': 'test1/', 'Status': 'Enabled', 'Expiration': {'ExpiredObjectDeleteMarker': 'true'},
'NoncurrentVersionExpiration': {'NoncurrentDays': 1}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
time.sleep(50)
expire_keys = bucket.get_all_versions()
eq(len(init_keys), 4)
eq(len(expire_keys), 2)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with multipart expiration')
@attr('lifecycle')
def test_lifecycle_set_multipart():
bucket = get_new_bucket()
rules = [
{'ID': 'rule1', 'Prefix': 'test1/', 'Status': 'Enabled',
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 2}},
{'ID': 'rule2', 'Prefix': 'test2/', 'Status': 'Disabled',
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 3}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
eq(res.status, 200)
eq(res.reason, 'OK')
# The test harnass for lifecycle is configured to treat days as 1 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle multipart expiration')
@attr('lifecycle')
@attr('fails_on_aws')
def test_lifecycle_multipart_expiration():
bucket = get_new_bucket()
key_names = ['test1/a', 'test2/']
for key_name in key_names:
bucket.initiate_multipart_upload(key_name)
init_keys = bucket.get_all_multipart_uploads()
rules = [
{'ID': 'rule1', 'Prefix': 'test1/', 'Status': 'Enabled',
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 2}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
time.sleep(50)
expire_keys = bucket.get_all_multipart_uploads()
eq(len(init_keys), 2)
eq(len(expire_keys), 1)
def _test_encryption_sse_customer_write(file_size): def _test_encryption_sse_customer_write(file_size):
""" """
Tests Create a file of A's, use it to set_contents_from_file. Tests Create a file of A's, use it to set_contents_from_file.