boto3_s3tests: add tests for lifecycle expiration header

The response to a successful PUT or HEAD (or GET) of/on
an object matching any enabled lifecycle expiration rule should
include an x-amz-expiration header for the object.  The
x-amz-expiration header consists of an expiry-date, rule-id tuple
indicating the earliest matching rule and the corresponding
expiration date for the object.

Also, while at it, add test for lifecycle expiration rule with
'Days' : 0, which should both apply and...work.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit 2d12e98ce0)
This commit is contained in:
Matt Benjamin 2019-02-13 19:17:20 -05:00 committed by Ali Maredia
parent baec994fc6
commit 7c067c346b

View file

@ -822,7 +822,7 @@ def test_bucket_list_return_data():
eq(obj['Owner']['ID'],key_data['ID']) eq(obj['Owner']['ID'],key_data['ID'])
_compare_dates(obj['LastModified'],key_data['LastModified']) _compare_dates(obj['LastModified'],key_data['LastModified'])
# amazon is eventual consistent, retry a bit if failed # amazon is eventually consistent, retry a bit if failed
def check_configure_versioning_retry(bucket_name, status, expected_string): def check_configure_versioning_retry(bucket_name, status, expected_string):
client = get_client() client = get_client()
@ -8004,6 +8004,98 @@ def test_lifecycle_expiration_date():
eq(len(init_objects), 2) eq(len(init_objects), 2)
eq(len(expire_objects), 1) eq(len(expire_objects), 1)
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration days 0')
@attr('lifecycle')
@attr('lifecycle_expiration')
def test_lifecycle_expiration_days0():
bucket_name = _create_objects(keys=['days0/foo', 'days0/bar'])
client = get_client()
rules=[{'ID': 'rule1', 'Expiration': {'Days': 0}, 'Prefix': 'days0/',
'Status':'Enabled'}]
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
time.sleep(20)
response = client.list_objects(Bucket=bucket_name)
expire_objects = response['Contents']
eq(len(expire_objects), 0)
def setup_lifecycle_expiration_test(bucket_name, rule_id, delta_days,
rule_prefix):
"""
Common setup for lifecycle expiration header checks:
"""
rules=[{'ID': rule_id,
'Expiration': {'Days': delta_days}, 'Prefix': rule_prefix,
'Status':'Enabled'}]
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
key = rule_prefix + '/foo'
body = 'bar'
response = client.put_object(Bucket=bucket_name, Key=key, Body=bar)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
return response
def check_lifecycle_expiration_header(response, start_time, rule_id,
delta_days):
exp_header = response['ResponseMetadata']['HTTPHeaders']['x-amz-expiration']
m = re.search(r'expiry-date="(.+)", rule-id="(.+)"', exp_header)
expiration = datetime.datetime.strptime(m.group(1),
'%a %b %d %H:%M:%S %Y')
eq((expiration - start_time).days, delta_days)
eq(m.group(2), rule_id)
return True
@attr(resource='bucket')
@attr(method='put')
@attr(operation='test lifecycle expiration header put')
@attr('lifecycle')
@attr('lifecycle_expiration')
def test_lifecycle_expiration_header_put():
"""
Check for valid x-amz-expiration header after PUT
"""
bucket_name = get_new_bucket()
client = get_client()
now = datetime.datetime.now(None)
response = setup_lifecycle_expiration_test(
bucket_name, 'rule1', 1, 'days1/')
eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True)
@attr(resource='bucket')
@attr(method='head')
@attr(operation='test lifecycle expiration header head')
@attr('lifecycle')
@attr('lifecycle_expiration')
def test_lifecycle_expiration_header_head():
"""
Check for valid x-amz-expiration header on HEAD request
"""
bucket_name = get_new_bucket()
client = get_client()
now = datetime.datetime.now(None)
response = setup_lifecycle_expiration_test(
bucket_name, 'rule1', 1, 'days1/')
# stat the object, check header
response = client.head_object(Bucket=bucket_name, Key=key)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True)
@attr(resource='bucket') @attr(resource='bucket')
@attr(method='put') @attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration') @attr(operation='set lifecycle config with noncurrent version expiration')