Add testcases for rgw cloudtransition feature

Feature PR: https://github.com/ceph/ceph/pull/35100

Also ported lc testcases from boto2 to boto3

(cherry picked from commit 47292aee17)
Signed-off-by: Soumya Koduri <skoduri@redhat.com>
This commit is contained in:
Soumya Koduri 2021-12-06 10:53:18 +05:30
parent 1af1880b7a
commit 001b8c14a1
4 changed files with 643 additions and 238 deletions

View file

@ -56,6 +56,37 @@ access_key = NOPQRSTUVWXYZABCDEFG
# alt AWS secret key set in vstart.sh # alt AWS secret key set in vstart.sh
secret_key = nopqrstuvwxyzabcdefghijklmnabcdefghijklm secret_key = nopqrstuvwxyzabcdefghijklmnabcdefghijklm
[s3 cloud]
## to run the testcases with "cloud_transition" attribute.
## Note: the waiting time may have to tweaked depending on
## the I/O latency to the cloud endpoint.
## host set for cloud endpoint
# host = localhost
## port set for cloud endpoint
# port = 8001
## say "False" to disable TLS
# is_secure = False
## cloud endpoint credentials
# access_key = 0555b35654ad1656d804
# secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
## storage class configured as cloud tier on local rgw server
# cloud_storage_class = CLOUDTIER
## Below are optional -
## Above configured cloud storage class config options
# retain_head_object = false
# target_storage_class = Target_SC
# target_path = cloud-bucket
## another regular storage class to test multiple transition rules,
# storage_class = S1
[s3 tenant] [s3 tenant]
# tenant display_name set in vstart.sh # tenant display_name set in vstart.sh
display_name = testx$tenanteduser display_name = testx$tenanteduser

View file

@ -236,242 +236,6 @@ def list_bucket_storage_class(bucket):
return result return result
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = set_lifecycle(rules=[{'id': 'rule1', 'transition': lc_transition(days=1, storage_class=sc[1]), 'prefix': 'expire1/', 'status': 'Enabled'},
{'id':'rule2', 'transition': lc_transition(days=4, storage_class=sc[2]), 'prefix': 'expire3/', 'status': 'Enabled'}])
_create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
# Get list of all keys
init_keys = bucket.get_all_keys()
eq(len(init_keys), 6)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 4)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(bucket)
eq(len(keep2_keys['STANDARD']), 4)
eq(len(keep2_keys[sc[1]]), 2)
eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(bucket)
eq(len(expire3_keys['STANDARD']), 2)
eq(len(expire3_keys[sc[1]]), 2)
eq(len(expire3_keys[sc[2]]), 2)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition_single_rule_multi_trans():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = set_lifecycle(rules=[
{'id': 'rule1',
'transition': lc_transitions([
lc_transition(days=1, storage_class=sc[1]),
lc_transition(days=4, storage_class=sc[2])]),
'prefix': 'expire1/',
'status': 'Enabled'}])
_create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
# Get list of all keys
init_keys = bucket.get_all_keys()
eq(len(init_keys), 6)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 4)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(bucket)
eq(len(keep2_keys['STANDARD']), 4)
eq(len(keep2_keys[sc[1]]), 2)
eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(bucket)
eq(len(expire3_keys['STANDARD']), 4)
eq(len(expire3_keys[sc[1]]), 0)
eq(len(expire3_keys[sc[2]]), 2)
def generate_lifecycle_body(rules):
body = '<?xml version="1.0" encoding="UTF-8"?><LifecycleConfiguration>'
for rule in rules:
body += '<Rule><ID>%s</ID><Status>%s</Status>' % (rule['ID'], rule['Status'])
if 'Prefix' in list(rule.keys()):
body += '<Prefix>%s</Prefix>' % rule['Prefix']
if 'Filter' in list(rule.keys()):
prefix_str= '' # AWS supports empty filters
if 'Prefix' in list(rule['Filter'].keys()):
prefix_str = '<Prefix>%s</Prefix>' % rule['Filter']['Prefix']
body += '<Filter>%s</Filter>' % prefix_str
if 'Expiration' in list(rule.keys()):
if 'ExpiredObjectDeleteMarker' in list(rule['Expiration'].keys()):
body += '<Expiration><ExpiredObjectDeleteMarker>%s</ExpiredObjectDeleteMarker></Expiration>' \
% rule['Expiration']['ExpiredObjectDeleteMarker']
elif 'Date' in list(rule['Expiration'].keys()):
body += '<Expiration><Date>%s</Date></Expiration>' % rule['Expiration']['Date']
else:
body += '<Expiration><Days>%d</Days></Expiration>' % rule['Expiration']['Days']
if 'NoncurrentVersionExpiration' in list(rule.keys()):
body += '<NoncurrentVersionExpiration><NoncurrentDays>%d</NoncurrentDays></NoncurrentVersionExpiration>' % \
rule['NoncurrentVersionExpiration']['NoncurrentDays']
if 'NoncurrentVersionTransition' in list(rule.keys()):
for t in rule['NoncurrentVersionTransition']:
body += '<NoncurrentVersionTransition>'
body += '<NoncurrentDays>%d</NoncurrentDays>' % \
t['NoncurrentDays']
body += '<StorageClass>%s</StorageClass>' % \
t['StorageClass']
body += '</NoncurrentVersionTransition>'
if 'AbortIncompleteMultipartUpload' in list(rule.keys()):
body += '<AbortIncompleteMultipartUpload><DaysAfterInitiation>%d</DaysAfterInitiation>' \
'</AbortIncompleteMultipartUpload>' % rule['AbortIncompleteMultipartUpload']['DaysAfterInitiation']
body += '</Rule>'
body += '</LifecycleConfiguration>'
return body
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
def test_lifecycle_set_noncurrent_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = get_new_bucket()
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransition': [
{
'NoncurrentDays': 2,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 4,
'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 6
}
},
{'ID': 'rule2', 'Prefix': 'test2/', 'Status': 'Disabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 3}}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
eq(res.status, 200)
eq(res.reason, 'OK')
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle non-current version expiration')
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_noncur_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransition': [
{
'NoncurrentDays': 1,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 3,
'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 5
}
}
]
body = generate_lifecycle_body(rules)
fp = StringIO(body)
md5 = boto.utils.compute_md5(fp)
headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
headers=headers)
create_multiple_versions(bucket, "test1/a", 3)
create_multiple_versions(bucket, "test1/b", 3)
init_keys = bucket.get_all_versions()
eq(len(init_keys), 6)
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 4)
eq(len(expire1_keys[sc[2]]), 0)
time.sleep(20)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
eq(len(expire1_keys[sc[2]]), 4)
time.sleep(20)
expire_keys = bucket.get_all_versions()
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
eq(len(expire1_keys[sc[2]]), 0)
def transfer_part(bucket, mp_id, mp_keyname, i, part, headers=None): def transfer_part(bucket, mp_id, mp_keyname, i, part, headers=None):
"""Transfer a part of a multipart upload. Designed to be run in parallel. """Transfer a part of a multipart upload. Designed to be run in parallel.
""" """

View file

@ -12,6 +12,7 @@ import random
import string import string
import itertools import itertools
import urllib3 import urllib3
import re
config = munch.Munch config = munch.Munch
@ -163,6 +164,17 @@ def nuke_prefixed_buckets(prefix, client=None):
print('Done with cleanup of buckets in tests.') print('Done with cleanup of buckets in tests.')
def configured_storage_classes():
sc = [ 'STANDARD' ]
extra_sc = re.split('\W+', config.storage_classes)
for item in extra_sc:
if item != 'STANDARD':
sc.append(item)
return sc
def setup(): def setup():
cfg = configparser.RawConfigParser() cfg = configparser.RawConfigParser()
try: try:
@ -226,6 +238,12 @@ def setup():
config.main_api_name = "" config.main_api_name = ""
pass pass
try:
config.storage_classes = cfg.get('s3 main',"storage_classes")
except (configparser.NoSectionError, configparser.NoOptionError):
config.storage_classes = ""
pass
config.alt_access_key = cfg.get('s3 alt',"access_key") config.alt_access_key = cfg.get('s3 alt',"access_key")
config.alt_secret_key = cfg.get('s3 alt',"secret_key") config.alt_secret_key = cfg.get('s3 alt',"secret_key")
config.alt_display_name = cfg.get('s3 alt',"display_name") config.alt_display_name = cfg.get('s3 alt',"display_name")
@ -251,6 +269,11 @@ def setup():
nuke_prefixed_buckets(prefix=prefix, client=alt_client) nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client) nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
if cfg.has_section("s3 cloud"):
get_cloud_config(cfg)
else:
config.cloud_storage_class = None
def teardown(): def teardown():
alt_client = get_alt_client() alt_client = get_alt_client()
@ -298,6 +321,43 @@ def check_webidentity():
config.webidentity_azp = cfg.get('webidentity', "azp") config.webidentity_azp = cfg.get('webidentity', "azp")
config.webidentity_user_token = cfg.get('webidentity', "user_token") config.webidentity_user_token = cfg.get('webidentity', "user_token")
def get_cloud_config(cfg):
config.cloud_host = cfg.get('s3 cloud',"host")
config.cloud_port = int(cfg.get('s3 cloud',"port"))
config.cloud_is_secure = cfg.getboolean('s3 cloud', "is_secure")
proto = 'https' if config.cloud_is_secure else 'http'
config.cloud_endpoint = "%s://%s:%d" % (proto, config.cloud_host, config.cloud_port)
config.cloud_access_key = cfg.get('s3 cloud',"access_key")
config.cloud_secret_key = cfg.get('s3 cloud',"secret_key")
try:
config.cloud_storage_class = cfg.get('s3 cloud', "cloud_storage_class")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_storage_class = None
try:
config.cloud_retain_head_object = cfg.get('s3 cloud',"retain_head_object")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_retain_head_object = None
try:
config.cloud_target_path = cfg.get('s3 cloud',"target_path")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_target_path = None
try:
config.cloud_target_storage_class = cfg.get('s3 cloud',"target_storage_class")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_target_storage_class = 'STANDARD'
try:
config.cloud_regular_storage_class = cfg.get('s3 cloud', "storage_class")
except (configparser.NoSectionError, configparser.NoOptionError):
config.cloud_regular_storage_class = None
def get_client(client_config=None): def get_client(client_config=None):
if client_config == None: if client_config == None:
client_config = Config(signature_version='s3v4') client_config = Config(signature_version='s3v4')
@ -380,6 +440,18 @@ def get_alt_client(client_config=None):
config=client_config) config=client_config)
return client return client
def get_cloud_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name='s3',
aws_access_key_id=config.cloud_access_key,
aws_secret_access_key=config.cloud_secret_key,
endpoint_url=config.cloud_endpoint,
use_ssl=config.cloud_is_secure,
config=client_config)
return client
def get_tenant_client(client_config=None): def get_tenant_client(client_config=None):
if client_config == None: if client_config == None:
client_config = Config(signature_version='s3v4') client_config = Config(signature_version='s3v4')
@ -598,3 +670,18 @@ def get_iam_secret_key():
def get_user_token(): def get_user_token():
return config.webidentity_user_token return config.webidentity_user_token
def get_cloud_storage_class():
return config.cloud_storage_class
def get_cloud_retain_head_object():
return config.cloud_retain_head_object
def get_cloud_regular_storage_class():
return config.cloud_regular_storage_class
def get_cloud_target_path():
return config.cloud_target_path
def get_cloud_target_storage_class():
return config.cloud_target_storage_class

View file

@ -28,6 +28,8 @@ import socket
import dateutil.parser import dateutil.parser
import ssl import ssl
from collections import namedtuple from collections import namedtuple
from collections import defaultdict
from io import StringIO
from email.header import decode_header from email.header import decode_header
@ -72,7 +74,14 @@ from . import (
get_main_kms_keyid, get_main_kms_keyid,
get_secondary_kms_keyid, get_secondary_kms_keyid,
get_svc_client, get_svc_client,
get_cloud_storage_class,
get_cloud_retain_head_object,
get_cloud_regular_storage_class,
get_cloud_target_path,
get_cloud_target_storage_class,
get_cloud_client,
nuke_prefixed_buckets, nuke_prefixed_buckets,
configured_storage_classes,
) )
@ -5790,6 +5799,22 @@ def get_bucket_key_names(bucket_name):
objs_list = get_objects_list(bucket_name) objs_list = get_objects_list(bucket_name)
return frozenset(obj for obj in objs_list) return frozenset(obj for obj in objs_list)
def list_bucket_storage_class(client, bucket_name):
result = defaultdict(list)
response = client.list_object_versions(Bucket=bucket_name)
for k in response['Versions']:
result[k['StorageClass']].append(k)
return result
def list_bucket_versions(client, bucket_name):
result = defaultdict(list)
response = client.list_object_versions(Bucket=bucket_name)
for k in response['Versions']:
result[response['Name']].append(k)
return result
@attr(resource='object') @attr(resource='object')
@attr(method='ACLs') @attr(method='ACLs')
@attr(operation='set bucket/object acls: private/private') @attr(operation='set bucket/object acls: private/private')
@ -8266,6 +8291,7 @@ def check_obj_versions(client, bucket_name, key, version_ids, contents):
# check to see if objects is pointing at correct version # check to see if objects is pointing at correct version
response = client.list_object_versions(Bucket=bucket_name) response = client.list_object_versions(Bucket=bucket_name)
versions = []
versions = response['Versions'] versions = response['Versions']
# obj versions in versions come out created last to first not first to last like version_ids & contents # obj versions in versions come out created last to first not first to last like version_ids & contents
versions.reverse() versions.reverse()
@ -8289,8 +8315,8 @@ def create_multiple_versions(client, bucket_name, key, num_versions, version_ids
contents.append(body) contents.append(body)
version_ids.append(version_id) version_ids.append(version_id)
if check_versions: # if check_versions:
check_obj_versions(client, bucket_name, key, version_ids, contents) # check_obj_versions(client, bucket_name, key, version_ids, contents)
return (version_ids, contents) return (version_ids, contents)
@ -9954,6 +9980,503 @@ def _test_encryption_sse_customer_write(file_size):
body = _get_body(response) body = _get_body(response)
eq(body, data) eq(body, data)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle transition')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket_name = _create_objects(keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
client = get_client()
rules=[{'ID': 'rule1', 'Transitions': [{'Days': 1, 'StorageClass': sc[1]}], 'Prefix': 'expire1/', 'Status': 'Enabled'},
{'ID': 'rule2', 'Transitions': [{'Days': 4, 'StorageClass': sc[2]}], 'Prefix': 'expire3/', 'Status': 'Enabled'}]
lifecycle = {'Rules': rules}
client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lifecycle)
# Get list of all keys
response = client.list_objects(Bucket=bucket_name)
init_keys = _get_keys(response)
eq(len(init_keys), 6)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(25)
expire1_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire1_keys['STANDARD']), 4)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(client, bucket_name)
eq(len(keep2_keys['STANDARD']), 4)
eq(len(keep2_keys[sc[1]]), 2)
eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire3_keys['STANDARD']), 2)
eq(len(expire3_keys[sc[1]]), 2)
eq(len(expire3_keys[sc[2]]), 2)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition_single_rule_multi_trans():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket_name = _create_objects(keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
client = get_client()
rules=[{'ID': 'rule1', 'Transitions': [{'Days': 1, 'StorageClass': sc[1]}, {'Days': 4, 'StorageClass': sc[2]}], 'Prefix': 'expire1/', 'Status': 'Enabled'}]
lifecycle = {'Rules': rules}
client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lifecycle)
# Get list of all keys
response = client.list_objects(Bucket=bucket_name)
init_keys = _get_keys(response)
eq(len(init_keys), 6)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(25)
expire1_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire1_keys['STANDARD']), 4)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(client, bucket_name)
eq(len(keep2_keys['STANDARD']), 4)
eq(len(keep2_keys[sc[1]]), 2)
eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire3_keys['STANDARD']), 4)
eq(len(expire3_keys[sc[1]]), 0)
eq(len(expire3_keys[sc[2]]), 2)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
def test_lifecycle_set_noncurrent_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = get_new_bucket()
client = get_client()
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransitions': [
{
'NoncurrentDays': 2,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 4,
'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 6
}
},
{'ID': 'rule2', 'Prefix': 'test2/', 'Status': 'Disabled', 'NoncurrentVersionExpiration': {'NoncurrentDays': 3}}
]
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle non-current version expiration')
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_noncur_transition():
sc = configured_storage_classes()
if len(sc) < 3:
raise SkipTest
bucket = get_new_bucket()
client = get_client()
check_configure_versioning_retry(bucket, "Enabled", "Enabled")
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransitions': [
{
'NoncurrentDays': 1,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 3,
'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 5
}
}
]
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
create_multiple_versions(client, bucket, "test1/a", 3)
create_multiple_versions(client, bucket, "test1/b", 3)
init_keys = list_bucket_storage_class(client, bucket)
eq(len(init_keys['STANDARD']), 6)
time.sleep(25)
expire1_keys = list_bucket_storage_class(client, bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 4)
eq(len(expire1_keys[sc[2]]), 0)
time.sleep(20)
expire1_keys = list_bucket_storage_class(client, bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
eq(len(expire1_keys[sc[2]]), 4)
time.sleep(20)
expire1_keys = list_bucket_storage_class(client, bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
eq(len(expire1_keys[sc[2]]), 0)
def verify_object(client, bucket, key, content=None, sc=None):
response = client.get_object(Bucket=bucket, Key=key)
if (sc == None):
sc = 'STANDARD'
if ('StorageClass' in response):
eq(response['StorageClass'], sc)
else: #storage class should be STANDARD
eq('STANDARD', sc)
if (content != None):
body = _get_body(response)
eq(body, content)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle transition for cloud')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('cloud_transition')
@attr('fails_on_aws')
def test_lifecycle_cloud_transition():
cloud_sc = get_cloud_storage_class()
if cloud_sc == None:
raise SkipTest
retain_head_object = get_cloud_retain_head_object()
target_path = get_cloud_target_path()
target_sc = get_cloud_target_storage_class()
keys=['expire1/foo', 'expire1/bar', 'keep2/foo', 'keep2/bar']
bucket_name = _create_objects(keys=keys)
client = get_client()
rules=[{'ID': 'rule1', 'Transitions': [{'Days': 1, 'StorageClass': cloud_sc}], 'Prefix': 'expire1/', 'Status': 'Enabled'}]
lifecycle = {'Rules': rules}
client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lifecycle)
# Get list of all keys
response = client.list_objects(Bucket=bucket_name)
init_keys = _get_keys(response)
eq(len(init_keys), 4)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(30)
expire1_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire1_keys['STANDARD']), 2)
if (retain_head_object != None and retain_head_object == "true"):
eq(len(expire1_keys[cloud_sc]), 2)
else:
eq(len(expire1_keys[cloud_sc]), 0)
time.sleep(20)
# Check if objects copied to target path
if target_path == None:
target_path = "rgwx-default-" + cloud_sc.lower() + "-cloud-bucket"
prefix = bucket_name + "/"
cloud_client = get_cloud_client()
time.sleep(100)
expire1_key1_str = prefix + keys[0]
verify_object(cloud_client, target_path, expire1_key1_str, keys[0], target_sc)
expire1_key2_str = prefix + keys[1]
verify_object(cloud_client, target_path, expire1_key2_str, keys[1], target_sc)
# Now verify the object on source rgw
src_key = keys[0]
if (retain_head_object != None and retain_head_object == "true"):
# verify HEAD response
response = client.head_object(Bucket=bucket_name, Key=keys[0])
eq(0, response['ContentLength'])
eq(cloud_sc, response['StorageClass'])
# GET should return InvalidObjectState error
e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=src_key)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'InvalidObjectState')
# COPY of object should return InvalidObjectState error
copy_source = {'Bucket': bucket_name, 'Key': src_key}
e = assert_raises(ClientError, client.copy, CopySource=copy_source, Bucket=bucket_name, Key='copy_obj')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 403)
eq(error_code, 'InvalidObjectState')
# DELETE should succeed
response = client.delete_object(Bucket=bucket_name, Key=src_key)
e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=src_key)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 404)
eq(error_code, 'NoSuchKey')
# Similar to 'test_lifecycle_transition' but for cloud transition
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle transition for cloud')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('cloud_transition')
@attr('fails_on_aws')
def test_lifecycle_cloud_multiple_transition():
cloud_sc = get_cloud_storage_class()
if cloud_sc == None:
raise SkipTest
retain_head_object = get_cloud_retain_head_object()
target_path = get_cloud_target_path()
target_sc = get_cloud_target_storage_class()
sc1 = get_cloud_regular_storage_class()
if (sc1 == None):
raise SkipTest
sc = ['STANDARD', sc1, cloud_sc]
keys=['expire1/foo', 'expire1/bar', 'keep2/foo', 'keep2/bar']
bucket_name = _create_objects(keys=keys)
client = get_client()
rules=[{'ID': 'rule1', 'Transitions': [{'Days': 1, 'StorageClass': sc1}], 'Prefix': 'expire1/', 'Status': 'Enabled'},
{'ID': 'rule2', 'Transitions': [{'Days': 6, 'StorageClass': cloud_sc}], 'Prefix': 'expire1/', 'Status': 'Enabled'},
{'ID': 'rule3', 'Expiration': {'Days': 8}, 'Prefix': 'expire1/', 'Status': 'Enabled'}]
lifecycle = {'Rules': rules}
client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lifecycle)
# Get list of all keys
response = client.list_objects(Bucket=bucket_name)
init_keys = _get_keys(response)
eq(len(init_keys), 4)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(50)
expire1_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 2)
eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(50)
expire1_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
if (retain_head_object != None and retain_head_object == "true"):
eq(len(expire1_keys[sc[2]]), 2)
else:
eq(len(expire1_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(60)
expire3_keys = list_bucket_storage_class(client, bucket_name)
eq(len(expire3_keys['STANDARD']), 2)
eq(len(expire3_keys[sc[1]]), 0)
eq(len(expire3_keys[sc[2]]), 0)
# Noncurrent objects for cloud transition
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle non-current version expiration on cloud transition')
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('lifecycle_transition')
@attr('cloud_transition')
@attr('fails_on_aws')
def test_lifecycle_noncur_cloud_transition():
cloud_sc = get_cloud_storage_class()
if cloud_sc == None:
raise SkipTest
retain_head_object = get_cloud_retain_head_object()
target_path = get_cloud_target_path()
target_sc = get_cloud_target_storage_class()
sc1 = get_cloud_regular_storage_class()
if (sc1 == None):
raise SkipTest
sc = ['STANDARD', sc1, cloud_sc]
bucket = get_new_bucket()
client = get_client()
check_configure_versioning_retry(bucket, "Enabled", "Enabled")
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransitions': [
{
'NoncurrentDays': 1,
'StorageClass': sc[1]
},
{
'NoncurrentDays': 3,
'StorageClass': sc[2]
}
],
}
]
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
keys = ['test1/a', 'test1/b']
for k in keys:
create_multiple_versions(client, bucket, k, 3)
init_keys = list_bucket_storage_class(client, bucket)
eq(len(init_keys['STANDARD']), 6)
response = client.list_object_versions(Bucket=bucket)
time.sleep(25)
expire1_keys = list_bucket_storage_class(client, bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 4)
eq(len(expire1_keys[sc[2]]), 0)
time.sleep(80)
expire1_keys = list_bucket_storage_class(client, bucket)
eq(len(expire1_keys['STANDARD']), 2)
eq(len(expire1_keys[sc[1]]), 0)
if (retain_head_object == None or retain_head_object == "false"):
eq(len(expire1_keys[sc[2]]), 0)
else:
eq(len(expire1_keys[sc[2]]), 4)
#check if versioned object exists on cloud endpoint
if target_path == None:
target_path = "rgwx-default-" + cloud_sc.lower() + "-cloud-bucket"
prefix = bucket + "/"
cloud_client = get_cloud_client()
time.sleep(10)
result = list_bucket_versions(client, bucket)
for src_key in keys:
for k in result[src_key]:
expire1_key1_str = prefix + 'test1/a' + "-" + k['VersionId']
verify_object(cloud_client, target_path, expire1_key1_str, None, target_sc)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle transition for cloud')
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('cloud_transition')
@attr('fails_on_aws')
def test_lifecycle_cloud_transition_large_obj():
cloud_sc = get_cloud_storage_class()
if cloud_sc == None:
raise SkipTest
retain_head_object = get_cloud_retain_head_object()
target_path = get_cloud_target_path()
target_sc = get_cloud_target_storage_class()
bucket = get_new_bucket()
client = get_client()
rules=[{'ID': 'rule1', 'Transitions': [{'Days': 1, 'StorageClass': cloud_sc}], 'Prefix': 'expire1/', 'Status': 'Enabled'}]
keys = ['keep/multi', 'expire1/multi']
size = 9*1024*1024
data = 'A'*size
for k in keys:
client.put_object(Bucket=bucket, Body=data, Key=k)
verify_object(client, bucket, k, data)
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
# Wait for first expiration (plus fudge to handle the timer window)
time.sleep(30)
expire1_keys = list_bucket_storage_class(client, bucket)
eq(len(expire1_keys['STANDARD']), 1)
if (retain_head_object != None and retain_head_object == "true"):
eq(len(expire1_keys[cloud_sc]), 1)
else:
eq(len(expire1_keys[cloud_sc]), 0)
# Check if objects copied to target path
if target_path == None:
target_path = "rgwx-default-" + cloud_sc.lower() + "-cloud-bucket"
prefix = bucket + "/"
# multipart upload takes time
time.sleep(10)
cloud_client = get_cloud_client()
expire1_key1_str = prefix + keys[1]
verify_object(cloud_client, target_path, expire1_key1_str, data, target_sc)
@attr(resource='object') @attr(resource='object')
@attr(method='put') @attr(method='put')