add tests for ObjectSizeGreater(Less)Than

Add tests for the new ObjectSizeGreaterThan and
ObjectSizeLessThan lifecycle operators.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
This commit is contained in:
Matt Benjamin 2024-03-08 10:10:47 -05:00
parent a5aa59df04
commit f752d6b6d8

View file

@ -27,6 +27,7 @@ import ssl
from collections import namedtuple from collections import namedtuple
from collections import defaultdict from collections import defaultdict
from io import StringIO from io import StringIO
from io import BytesIO
from email.header import decode_header from email.header import decode_header
@ -8490,6 +8491,119 @@ def test_lifecycle_expiration_newer_noncurrent():
# at T+20, 6 objects should exist (1 current and (9 - 5) noncurrent) # at T+20, 6 objects should exist (1 current and (9 - 5) noncurrent)
assert num_objs == 6 assert num_objs == 6
def get_byte_buffer(nbytes):
buf = BytesIO(b"")
for x in range(nbytes):
buf.write(b"b")
buf.seek(0)
return buf
@pytest.mark.lifecycle
@pytest.mark.lifecycle_expiration
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_lifecycle_expiration_size_gt():
bucket_name = get_new_bucket()
client = get_client()
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
# create one object lt and one object gt 2000 bytes
key = "myobject_small"
body = get_byte_buffer(1000)
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
key = "myobject_big"
body = get_byte_buffer(3000)
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
# add a lifecycle rule which expires objects greater than 2000 bytes
days = 1
lifecycle_config = {
'Rules': [
{
'Expiration': {
'Days': days
},
'ID': 'object_gt1',
'Filter': {
'Prefix': '',
'ObjectSizeGreaterThan': 2000
},
'Status': 'Enabled',
},
]
}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
lc_interval = get_lc_debug_interval()
time.sleep(2*lc_interval)
# we should find only the small object present
response = client.list_objects(Bucket=bucket_name)
objects = response['Contents']
assert len(objects) == 1
assert objects[0]['Key'] == "myobject_small"
@pytest.mark.lifecycle
@pytest.mark.lifecycle_expiration
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_lifecycle_expiration_size_lt():
bucket_name = get_new_bucket()
client = get_client()
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
# create one object lt and one object gt 2000 bytes
key = "myobject_small"
body = get_byte_buffer(1000)
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
key = "myobject_big"
body = get_byte_buffer(3000)
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
# add a lifecycle rule which expires objects greater than 2000 bytes
days = 1
lifecycle_config = {
'Rules': [
{
'Expiration': {
'Days': days
},
'ID': 'object_lt1',
'Filter': {
'Prefix': '',
'ObjectSizeLessThan': 2000
},
'Status': 'Enabled',
},
]
}
response = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
lc_interval = get_lc_debug_interval()
time.sleep(2*lc_interval)
# we should find only the large object present
response = client.list_objects(Bucket=bucket_name)
objects = response['Contents']
assert len(objects) == 1
assert objects[0]['Key'] == "myobject_big"
@pytest.mark.lifecycle @pytest.mark.lifecycle
def test_lifecycle_id_too_long(): def test_lifecycle_id_too_long():
bucket_name = get_new_bucket() bucket_name = get_new_bucket()