From f752d6b6d88dd607710a876d1f195b0d23abd7ef Mon Sep 17 00:00:00 2001 From: Matt Benjamin Date: Fri, 8 Mar 2024 10:10:47 -0500 Subject: [PATCH] add tests for ObjectSizeGreater(Less)Than Add tests for the new ObjectSizeGreaterThan and ObjectSizeLessThan lifecycle operators. Signed-off-by: Matt Benjamin --- s3tests_boto3/functional/test_s3.py | 114 ++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 47c9656..b99e0bc 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -27,6 +27,7 @@ import ssl from collections import namedtuple from collections import defaultdict from io import StringIO +from io import BytesIO from email.header import decode_header @@ -8490,6 +8491,119 @@ def test_lifecycle_expiration_newer_noncurrent(): # at T+20, 6 objects should exist (1 current and (9 - 5) noncurrent) assert num_objs == 6 +def get_byte_buffer(nbytes): + buf = BytesIO(b"") + for x in range(nbytes): + buf.write(b"b") + buf.seek(0) + return buf + +@pytest.mark.lifecycle +@pytest.mark.lifecycle_expiration +@pytest.mark.fails_on_aws +@pytest.mark.fails_on_dbstore +def test_lifecycle_expiration_size_gt(): + bucket_name = get_new_bucket() + client = get_client() + + check_configure_versioning_retry(bucket_name, "Enabled", "Enabled") + + # create one object lt and one object gt 2000 bytes + key = "myobject_small" + body = get_byte_buffer(1000) + response = client.put_object(Bucket=bucket_name, Key=key, Body=body) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + key = "myobject_big" + body = get_byte_buffer(3000) + response = client.put_object(Bucket=bucket_name, Key=key, Body=body) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + # add a lifecycle rule which expires objects greater than 2000 bytes + days = 1 + lifecycle_config = { + 'Rules': [ + { + 'Expiration': { + 'Days': days + }, + 'ID': 'object_gt1', + 'Filter': { + 'Prefix': '', + 'ObjectSizeGreaterThan': 2000 + }, + 'Status': 'Enabled', + }, + ] + } + + response = client.put_bucket_lifecycle_configuration( + Bucket=bucket_name, LifecycleConfiguration=lifecycle_config) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + lc_interval = get_lc_debug_interval() + time.sleep(2*lc_interval) + + # we should find only the small object present + response = client.list_objects(Bucket=bucket_name) + objects = response['Contents'] + + assert len(objects) == 1 + assert objects[0]['Key'] == "myobject_small" + +@pytest.mark.lifecycle +@pytest.mark.lifecycle_expiration +@pytest.mark.fails_on_aws +@pytest.mark.fails_on_dbstore +def test_lifecycle_expiration_size_lt(): + bucket_name = get_new_bucket() + client = get_client() + + check_configure_versioning_retry(bucket_name, "Enabled", "Enabled") + + # create one object lt and one object gt 2000 bytes + key = "myobject_small" + body = get_byte_buffer(1000) + response = client.put_object(Bucket=bucket_name, Key=key, Body=body) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + key = "myobject_big" + body = get_byte_buffer(3000) + response = client.put_object(Bucket=bucket_name, Key=key, Body=body) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + # add a lifecycle rule which expires objects greater than 2000 bytes + days = 1 + lifecycle_config = { + 'Rules': [ + { + 'Expiration': { + 'Days': days + }, + 'ID': 'object_lt1', + 'Filter': { + 'Prefix': '', + 'ObjectSizeLessThan': 2000 + }, + 'Status': 'Enabled', + }, + ] + } + + response = client.put_bucket_lifecycle_configuration( + Bucket=bucket_name, LifecycleConfiguration=lifecycle_config) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + lc_interval = get_lc_debug_interval() + time.sleep(2*lc_interval) + + # we should find only the large object present + response = client.list_objects(Bucket=bucket_name) + objects = response['Contents'] + + assert len(objects) == 1 + assert objects[0]['Key'] == "myobject_big" + @pytest.mark.lifecycle def test_lifecycle_id_too_long(): bucket_name = get_new_bucket()