Merge pull request #250 from yehudasa/wip-rgw-lifecycle-trans

Lifecycle transitions and storage-class-related tests
Ali Maredia, 2019-01-22 12:01:50 -05:00, committed by GitHub
commit a2c6edff27
2 changed files with 413 additions and 5 deletions
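
The new tests are tagged with nose attributes ('storage_class', 'lifecycle_transition'), so they can be selected without running the whole suite. Assuming the usual s3-tests nose workflow, something along these lines would run just the additions (config file name illustrative):

    S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -a 'storage_class'
    S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -a 'lifecycle_transition'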


@@ -319,6 +319,7 @@ def setup():
            'port',
            'is_secure',
            'kms_keyid',
+           'storage_classes',
            ]:
            try:
                config[name][var] = cfg.get(section, var)
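
For the new option to have any effect, the extra classes must be listed in the test configuration; the value is read back by configured_storage_classes() further down, which splits on non-word characters, so commas or spaces both work as separators. A hypothetical entry (section header and class names illustrative; the classes must match placement targets actually configured on the gateway):

    [s3 main]
    ...
    storage_classes = LUKEWARM, FROZEN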


@@ -29,6 +29,7 @@ import re
import xml.etree.ElementTree as ET
from collections import namedtuple
+from collections import defaultdict
from email.Utils import formatdate
from httplib import HTTPConnection, HTTPSConnection
from urlparse import urlparse
@@ -5297,12 +5298,18 @@ def generate_random(size, part_size=5*1024*1024):
        if (x == size):
            return

-def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, resend_parts=[]):
+def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, storage_class=None, resend_parts=[]):
    """
    generate a multi-part upload for a random file of specified size,
    if requested, generate a list of the parts
    return the upload descriptor
    """
+    if storage_class is not None:
+        if not headers:
+            headers = {}
+        headers['X-Amz-Storage-Class'] = storage_class
+
    upload = bucket.initiate_multipart_upload(s3_key_name, headers=headers, metadata=metadata)
    s = ''
    for i, part in enumerate(generate_random(size, part_size)):
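
The storage class has to be supplied when the upload is initiated, since the completed object takes its class from the upload as a whole rather than from individual parts. On the wire this is just one extra header on the InitiateMultipartUpload request, roughly (bucket, key, and class name illustrative):

    POST /mybucket/mymultipart-LUKEWARM?uploads HTTP/1.1
    x-amz-storage-class: LUKEWARM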
@@ -5334,12 +5341,20 @@ def _multipart_copy(src_bucketname, src_keyname, dst_bucket, dst_keyname, size,
    return upload

-def _create_key_with_random_content(keyname, size=7*1024*1024, bucket=None):
+def _populate_key(bucket, keyname, size=7*1024*1024, storage_class=None):
    if bucket is None:
        bucket = get_new_bucket()
    key = bucket.new_key(keyname)
-    data = StringIO(str(generate_random(size, size).next()))
+    if storage_class:
+        key.storage_class = storage_class
+    data_str = str(generate_random(size, size).next())
+    data = StringIO(data_str)
    key.set_contents_from_file(fp=data)
+    return (key, data_str)
+
+def _create_key_with_random_content(keyname, size=7*1024*1024, bucket=None):
+    bucket = get_new_bucket()
+    key, _ = _populate_key(bucket, keyname, size)
    return (bucket, key)

@attr(resource='object')
@@ -5758,6 +5773,153 @@ def test_multipart_upload_incorrect_etag():
    eq(e.reason.lower(), 'bad request') # some proxies vary the case
    eq(e.error_code, 'InvalidPart')
+
+def verify_object(bucket, k, data=None, storage_class=None):
+    if storage_class:
+        eq(k.storage_class, storage_class)
+
+    if data:
+        read_data = k.get_contents_as_string()
+        equal = data == read_data # avoid spamming log if data not equal
+        eq(equal, True)
+
+def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
+    query_args = None
+    if dest_key.version_id:
+        # pass the versionId through when copying onto a specific version
+        query_args = 'versionId={v}'.format(v=dest_key.version_id)
+
+    headers = {}
+    headers['X-Amz-Copy-Source'] = '/{bucket}/{object}'.format(bucket=src_bucket.name, object=src_key.name)
+    if src_key.version_id:
+        headers['X-Amz-Copy-Source-Version-Id'] = src_key.version_id
+    headers['X-Amz-Storage-Class'] = storage_class
+
+    res = dest_bucket.connection.make_request('PUT', dest_bucket.name, dest_key.name,
+                                              query_args=query_args, headers=headers)
+    eq(res.status, 200)
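
copy_object_storage_class drops down to a raw make_request, presumably because boto2's copy helpers only expose REDUCED_REDUNDANCY rather than arbitrary storage classes. The underlying operation is a plain S3 CopyObject, and copying a key onto itself with a new class is the standard way to rewrite an object into different storage; roughly (names illustrative):

    PUT /mybucket/foo-STANDARD HTTP/1.1
    x-amz-copy-source: /mybucket/foo-STANDARD
    x-amz-storage-class: LUKEWARM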
+
+def _populate_multipart_key(bucket, kname, size, storage_class=None):
+    (upload, data) = _multipart_upload(bucket, kname, size, storage_class=storage_class)
+    upload.complete_upload()
+
+    k = bucket.get_key(kname)
+    return (k, data)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test create object with storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_storage_class():
+    sc = configured_storage_classes()
+    if len(sc) < 2:
+        raise SkipTest
+
+    bucket = get_new_bucket()
+
+    for storage_class in sc:
+        kname = 'foo-' + storage_class
+        k, data = _populate_key(bucket, kname, size=9*1024*1024, storage_class=storage_class)
+        verify_object(bucket, k, data, storage_class)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test create multipart object with storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_storage_class_multipart():
+    sc = configured_storage_classes()
+    if len(sc) < 2:
+        raise SkipTest
+
+    bucket = get_new_bucket()
+    size = 11 * 1024 * 1024
+
+    for storage_class in sc:
+        key = "mymultipart-" + storage_class
+        (upload, data) = _multipart_upload(bucket, key, size, storage_class=storage_class)
+        upload.complete_upload()
+
+        key2 = bucket.get_key(key)
+        eq(key2.size, size)
+        eq(key2.storage_class, storage_class)
+
+def _do_test_object_modify_storage_class(obj_write_func, size):
+    sc = configured_storage_classes()
+    if len(sc) < 2:
+        raise SkipTest
+
+    bucket = get_new_bucket()
+
+    for storage_class in sc:
+        kname = 'foo-' + storage_class
+        k, data = obj_write_func(bucket, kname, size, storage_class=storage_class)
+        verify_object(bucket, k, data, storage_class)
+
+        # now change the storage class via copy-onto-self and verify
+        for new_storage_class in sc:
+            if new_storage_class == storage_class:
+                continue
+
+            copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
+            verify_object(bucket, k, data, new_storage_class)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test changing object storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_modify_storage_class():
+    _do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test changing multipart object storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_modify_storage_class_multipart():
+    _do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
+
+def _do_test_object_storage_class_copy(obj_write_func, size):
+    sc = configured_storage_classes()
+    if len(sc) < 2:
+        raise SkipTest
+
+    src_bucket = get_new_bucket()
+    dest_bucket = get_new_bucket()
+    kname = 'foo'
+
+    src_key, data = obj_write_func(src_bucket, kname, size)
+    verify_object(src_bucket, src_key, data)
+
+    for new_storage_class in sc:
+        if new_storage_class == src_key.storage_class:
+            continue
+
+        dest_key = dest_bucket.get_key('foo-' + new_storage_class, validate=False)
+
+        copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, new_storage_class)
+        verify_object(dest_bucket, dest_key, data, new_storage_class)
+
+@attr(resource='object')
+@attr(method='copy')
+@attr(operation='test copy object to object with different storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_storage_class_copy():
+    _do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
+
+@attr(resource='object')
+@attr(method='copy')
+@attr(operation='test copy multipart object to object with different storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_storage_class_copy_multipart():
+    _do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)

def _simple_http_req_100_cont(host, port, is_secure, method, resource):
    """
    Send the specified request w/expect 100-continue
@@ -7629,10 +7791,21 @@ def create_lifecycle(days = None, prefix = 'test/', rules = None):
        lifecycle.append(rule)
    else:
        for rule in rules:
-            expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
+            expiration = None
+            transition = None
+            try:
+                expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
+            except:
+                pass
+
+            try:
+                transition = rule['transition']
+            except:
+                pass
+
            _id = rule.get('id',None)
            rule = boto.s3.lifecycle.Rule(id=_id, prefix=rule['prefix'],
-                                          status=rule['status'], expiration=expiration)
+                                          status=rule['status'], expiration=expiration, transition=transition)
            lifecycle.append(rule)
    return lifecycle
@@ -7642,6 +7815,28 @@ def set_lifecycle(rules = None):
    bucket.configure_lifecycle(lifecycle)
    return bucket

+def configured_storage_classes():
+    sc = ['STANDARD']
+
+    if 'storage_classes' in config['main']:
+        extra_sc = re.split('\W+', config['main']['storage_classes'])
+        for item in extra_sc:
+            if item != 'STANDARD':
+                sc.append(item)
+
+    return sc
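
With a configuration like the storage_classes example above, this helper returns the full class list, which the len(sc) < 2 / len(sc) < 3 guards in the tests key off:

    >>> configured_storage_classes()   # illustrative
    ['STANDARD', 'LUKEWARM', 'FROZEN']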
+
+def lc_transition(days=None, date=None, storage_class=None):
+    return boto.s3.lifecycle.Transition(days=days, date=date, storage_class=storage_class)
+
+def lc_transitions(transitions=None):
+    result = boto.s3.lifecycle.Transitions()
+    for t in transitions:
+        result.add_transition(days=t.days, date=t.date, storage_class=t.storage_class)
+
+    return result
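
boto serializes these into the standard S3 lifecycle XML. A single-transition rule like the ones used in test_lifecycle_transition below would come out roughly as (values illustrative, whitespace added):

    <Rule>
      <ID>rule1</ID>
      <Prefix>expire1/</Prefix>
      <Status>Enabled</Status>
      <Transition>
        <Days>1</Days>
        <StorageClass>LUKEWARM</StorageClass>
      </Transition>
    </Rule>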

@attr(resource='bucket')
@attr(method='put')

@@ -7728,6 +7923,102 @@ def test_lifecycle_expiration():
    eq(len(keep2_keys), 4)
    eq(len(expire3_keys), 2)
+
+def list_bucket_storage_class(bucket):
+    result = defaultdict(list)
+    for k in bucket.get_all_versions():
+        result[k.storage_class].append(k)
+
+    return result
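
The 10-second "days" assumed by the sleeps below are not stock S3 behavior; they rely on the gateway under test running with a shortened lifecycle clock, presumably something like this in the RGW section of ceph.conf (a debug setting, not for production):

    rgw lc debug interval = 10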
+
+# The test harness for lifecycle is configured to treat days as 10 second intervals.
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='test lifecycle transition')
+@attr('lifecycle')
+@attr('lifecycle_transition')
+@attr('fails_on_aws')
+def test_lifecycle_transition():
+    sc = configured_storage_classes()
+    if len(sc) < 3:
+        raise SkipTest
+
+    bucket = set_lifecycle(rules=[{'id': 'rule1', 'transition': lc_transition(days=1, storage_class=sc[1]), 'prefix': 'expire1/', 'status': 'Enabled'},
+                                  {'id': 'rule2', 'transition': lc_transition(days=4, storage_class=sc[2]), 'prefix': 'expire3/', 'status': 'Enabled'}])
+    _create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
+                                      'keep2/bar', 'expire3/foo', 'expire3/bar'])
+
+    # Get list of all keys
+    init_keys = bucket.get_all_keys()
+    eq(len(init_keys), 6)
+
+    # Wait for first transition (plus fudge to handle the timer window)
+    time.sleep(25)
+    expire1_keys = list_bucket_storage_class(bucket)
+    eq(len(expire1_keys['STANDARD']), 4)
+    eq(len(expire1_keys[sc[1]]), 2)
+    eq(len(expire1_keys[sc[2]]), 0)
+
+    # Wait for next transition cycle
+    time.sleep(10)
+    keep2_keys = list_bucket_storage_class(bucket)
+    eq(len(keep2_keys['STANDARD']), 4)
+    eq(len(keep2_keys[sc[1]]), 2)
+    eq(len(keep2_keys[sc[2]]), 0)
+
+    # Wait for final transition cycle
+    time.sleep(20)
+    expire3_keys = list_bucket_storage_class(bucket)
+    eq(len(expire3_keys['STANDARD']), 2)
+    eq(len(expire3_keys[sc[1]]), 2)
+    eq(len(expire3_keys[sc[2]]), 2)
+
+# The test harness for lifecycle is configured to treat days as 10 second intervals.
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='test lifecycle transition')
+@attr('lifecycle')
+@attr('lifecycle_transition')
+@attr('fails_on_aws')
+def test_lifecycle_transition_single_rule_multi_trans():
+    sc = configured_storage_classes()
+    if len(sc) < 3:
+        raise SkipTest
+
+    bucket = set_lifecycle(rules=[
+        {'id': 'rule1',
+         'transition': lc_transitions([
+             lc_transition(days=1, storage_class=sc[1]),
+             lc_transition(days=4, storage_class=sc[2])]),
+         'prefix': 'expire1/',
+         'status': 'Enabled'}])
+
+    _create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
+                                      'keep2/bar', 'expire3/foo', 'expire3/bar'])
+
+    # Get list of all keys
+    init_keys = bucket.get_all_keys()
+    eq(len(init_keys), 6)
+
+    # Wait for first transition (plus fudge to handle the timer window)
+    time.sleep(25)
+    expire1_keys = list_bucket_storage_class(bucket)
+    eq(len(expire1_keys['STANDARD']), 4)
+    eq(len(expire1_keys[sc[1]]), 2)
+    eq(len(expire1_keys[sc[2]]), 0)
+
+    # Wait for next transition cycle
+    time.sleep(10)
+    keep2_keys = list_bucket_storage_class(bucket)
+    eq(len(keep2_keys['STANDARD']), 4)
+    eq(len(keep2_keys[sc[1]]), 2)
+    eq(len(keep2_keys[sc[2]]), 0)
+
+    # Wait for final transition cycle
+    time.sleep(20)
+    expire3_keys = list_bucket_storage_class(bucket)
+    eq(len(expire3_keys['STANDARD']), 4)
+    eq(len(expire3_keys[sc[1]]), 0)
+    eq(len(expire3_keys[sc[2]]), 2)

@attr(resource='bucket')
@attr(method='put')
@attr(operation='id too long in lifecycle rule')

@@ -7813,6 +8104,14 @@ def generate_lifecycle_body(rules):
        if 'NoncurrentVersionExpiration' in rule.keys():
            body += '<NoncurrentVersionExpiration><NoncurrentDays>%d</NoncurrentDays></NoncurrentVersionExpiration>' % \
                    rule['NoncurrentVersionExpiration']['NoncurrentDays']
+        if 'NoncurrentVersionTransition' in rule.keys():
+            for t in rule['NoncurrentVersionTransition']:
+                body += '<NoncurrentVersionTransition>'
+                body += '<NoncurrentDays>%d</NoncurrentDays>' % \
+                        t['NoncurrentDays']
+                body += '<StorageClass>%s</StorageClass>' % \
+                        t['StorageClass']
+                body += '</NoncurrentVersionTransition>'
        if 'AbortIncompleteMultipartUpload' in rule.keys():
            body += '<AbortIncompleteMultipartUpload><DaysAfterInitiation>%d</DaysAfterInitiation>' \
                    '</AbortIncompleteMultipartUpload>' % rule['AbortIncompleteMultipartUpload']['DaysAfterInitiation']
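
For the rule dicts used in the tests below, the new branch emits one element per transition entry; pretty-printed, the output looks roughly like (values illustrative):

    <NoncurrentVersionTransition>
      <NoncurrentDays>2</NoncurrentDays>
      <StorageClass>LUKEWARM</StorageClass>
    </NoncurrentVersionTransition>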
@@ -7936,6 +8235,114 @@ def test_lifecycle_noncur_expiration():
    eq(len(expire_keys), 4)
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='set lifecycle config with noncurrent version transition')
+@attr('lifecycle')
+@attr('lifecycle_transition')
+def test_lifecycle_set_noncurrent_transition():
+    sc = configured_storage_classes()
+    if len(sc) < 3:
+        raise SkipTest
+
+    bucket = get_new_bucket()
+    rules = [
+        {
+            'ID': 'rule1',
+            'Prefix': 'test1/',
+            'Status': 'Enabled',
+            'NoncurrentVersionTransition': [
+                {
+                    'NoncurrentDays': 2,
+                    'StorageClass': sc[1]
+                },
+                {
+                    'NoncurrentDays': 4,
+                    'StorageClass': sc[2]
+                }
+            ],
+            'NoncurrentVersionExpiration': {
+                'NoncurrentDays': 6
+            }
+        },
+        {'ID': 'rule2', 'Prefix': 'test2/', 'Status': 'Disabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 3}}
+    ]
+    body = generate_lifecycle_body(rules)
+    fp = StringIO(body)
+    md5 = boto.utils.compute_md5(fp)
+    headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
+    res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
+                                         headers=headers)
+    eq(res.status, 200)
+    eq(res.reason, 'OK')
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='test lifecycle non-current version transition')
+@attr('lifecycle')
+@attr('lifecycle_expiration')
+@attr('lifecycle_transition')
+@attr('fails_on_aws')
+def test_lifecycle_noncur_transition():
+    sc = configured_storage_classes()
+    if len(sc) < 3:
+        raise SkipTest
+
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    rules = [
+        {
+            'ID': 'rule1',
+            'Prefix': 'test1/',
+            'Status': 'Enabled',
+            'NoncurrentVersionTransition': [
+                {
+                    'NoncurrentDays': 1,
+                    'StorageClass': sc[1]
+                },
+                {
+                    'NoncurrentDays': 3,
+                    'StorageClass': sc[2]
+                }
+            ],
+            'NoncurrentVersionExpiration': {
+                'NoncurrentDays': 5
+            }
+        }
+    ]
+    body = generate_lifecycle_body(rules)
+    fp = StringIO(body)
+    md5 = boto.utils.compute_md5(fp)
+    headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
+    bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
+                                   headers=headers)
+
+    create_multiple_versions(bucket, "test1/a", 3)
+    create_multiple_versions(bucket, "test1/b", 3)
+
+    init_keys = bucket.get_all_versions()
+    eq(len(init_keys), 6)
+
+    # after the first window, the noncurrent versions (4 of 6) move to sc[1]
+    time.sleep(25)
+    expire1_keys = list_bucket_storage_class(bucket)
+    eq(len(expire1_keys['STANDARD']), 2)
+    eq(len(expire1_keys[sc[1]]), 4)
+    eq(len(expire1_keys[sc[2]]), 0)
+
+    # after the second window, they move on to sc[2]
+    time.sleep(20)
+    expire1_keys = list_bucket_storage_class(bucket)
+    eq(len(expire1_keys['STANDARD']), 2)
+    eq(len(expire1_keys[sc[1]]), 0)
+    eq(len(expire1_keys[sc[2]]), 4)
+
+    # finally the noncurrent versions expire altogether
+    time.sleep(20)
+    expire1_keys = list_bucket_storage_class(bucket)
+    eq(len(expire1_keys['STANDARD']), 2)
+    eq(len(expire1_keys[sc[1]]), 0)
+    eq(len(expire1_keys[sc[2]]), 0)

@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with delete marker expiration')