add test test_lifecycle_expiration_newer_noncurrent()

This verifies the new NewerNoncurrentVersions lifecycle filter
operator.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit a5aa59df04)
This commit is contained in:
Matt Benjamin 2024-03-07 14:27:13 -05:00 committed by Casey Bodley
parent ee3139ba04
commit fa75ccfe18

View file

@ -8434,6 +8434,63 @@ def test_lifecycle_expiration_noncur_tags1():
# at T+60, only the current object version should exist # at T+60, only the current object version should exist
assert num_objs == 1 assert num_objs == 1
def wait_interval_list_object_versions(client, bucket_name, secs):
time.sleep(secs)
try:
response = client.list_object_versions(Bucket=bucket_name)
objs_list = response['Versions']
except:
objs_list = []
return len(objs_list)
@pytest.mark.lifecycle
@pytest.mark.lifecycle_expiration
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_lifecycle_expiration_newer_noncurrent():
bucket_name = get_new_bucket()
client = get_client()
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
# create 10 object versions (9 noncurrent)
key = "myobject_"
for ix in range(10):
body = "%s v%d" % (key, ix)
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
# add a lifecycle rule which sets newer-noncurrent-versions to 5
days = 1
lifecycle_config = {
'Rules': [
{
'NoncurrentVersionExpiration': {
'NoncurrentDays': days,
'NewerNoncurrentVersions': 5,
},
'ID': 'newer_noncurrent1',
'Filter': {
'Prefix': '',
},
'Status': 'Enabled',
},
]
}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
lc_interval = get_lc_debug_interval()
num_objs = wait_interval_list_object_versions(
client, bucket_name, 2*lc_interval)
# at T+20, 6 objects should exist (1 current and (9 - 5) noncurrent)
assert num_objs == 6
@pytest.mark.lifecycle @pytest.mark.lifecycle
def test_lifecycle_id_too_long(): def test_lifecycle_id_too_long():
bucket_name = get_new_bucket() bucket_name = get_new_bucket()