mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-21 23:29:47 +00:00
add tests for ObjectSizeGreater(Less)Than
Add tests for the new ObjectSizeGreaterThan and
ObjectSizeLessThan lifecycle operators.
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit f752d6b6d8
)
This commit is contained in:
parent
4a6fe93f52
commit
75f5210436
1 changed files with 114 additions and 0 deletions
|
@ -27,6 +27,7 @@ import ssl
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
from email.header import decode_header
|
from email.header import decode_header
|
||||||
|
|
||||||
|
@ -8490,6 +8491,119 @@ def test_lifecycle_expiration_newer_noncurrent():
|
||||||
# at T+20, 6 objects should exist (1 current and (9 - 5) noncurrent)
|
# at T+20, 6 objects should exist (1 current and (9 - 5) noncurrent)
|
||||||
assert num_objs == 6
|
assert num_objs == 6
|
||||||
|
|
||||||
|
def get_byte_buffer(nbytes):
|
||||||
|
buf = BytesIO(b"")
|
||||||
|
for x in range(nbytes):
|
||||||
|
buf.write(b"b")
|
||||||
|
buf.seek(0)
|
||||||
|
return buf
|
||||||
|
|
||||||
|
@pytest.mark.lifecycle
|
||||||
|
@pytest.mark.lifecycle_expiration
|
||||||
|
@pytest.mark.fails_on_aws
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
def test_lifecycle_expiration_size_gt():
|
||||||
|
bucket_name = get_new_bucket()
|
||||||
|
client = get_client()
|
||||||
|
|
||||||
|
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
|
||||||
|
|
||||||
|
# create one object lt and one object gt 2000 bytes
|
||||||
|
key = "myobject_small"
|
||||||
|
body = get_byte_buffer(1000)
|
||||||
|
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
|
||||||
|
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||||
|
|
||||||
|
key = "myobject_big"
|
||||||
|
body = get_byte_buffer(3000)
|
||||||
|
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
|
||||||
|
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||||
|
|
||||||
|
# add a lifecycle rule which expires objects greater than 2000 bytes
|
||||||
|
days = 1
|
||||||
|
lifecycle_config = {
|
||||||
|
'Rules': [
|
||||||
|
{
|
||||||
|
'Expiration': {
|
||||||
|
'Days': days
|
||||||
|
},
|
||||||
|
'ID': 'object_gt1',
|
||||||
|
'Filter': {
|
||||||
|
'Prefix': '',
|
||||||
|
'ObjectSizeGreaterThan': 2000
|
||||||
|
},
|
||||||
|
'Status': 'Enabled',
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.put_bucket_lifecycle_configuration(
|
||||||
|
Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
|
||||||
|
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||||
|
|
||||||
|
lc_interval = get_lc_debug_interval()
|
||||||
|
time.sleep(2*lc_interval)
|
||||||
|
|
||||||
|
# we should find only the small object present
|
||||||
|
response = client.list_objects(Bucket=bucket_name)
|
||||||
|
objects = response['Contents']
|
||||||
|
|
||||||
|
assert len(objects) == 1
|
||||||
|
assert objects[0]['Key'] == "myobject_small"
|
||||||
|
|
||||||
|
@pytest.mark.lifecycle
|
||||||
|
@pytest.mark.lifecycle_expiration
|
||||||
|
@pytest.mark.fails_on_aws
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
def test_lifecycle_expiration_size_lt():
|
||||||
|
bucket_name = get_new_bucket()
|
||||||
|
client = get_client()
|
||||||
|
|
||||||
|
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
|
||||||
|
|
||||||
|
# create one object lt and one object gt 2000 bytes
|
||||||
|
key = "myobject_small"
|
||||||
|
body = get_byte_buffer(1000)
|
||||||
|
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
|
||||||
|
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||||
|
|
||||||
|
key = "myobject_big"
|
||||||
|
body = get_byte_buffer(3000)
|
||||||
|
response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
|
||||||
|
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||||
|
|
||||||
|
# add a lifecycle rule which expires objects greater than 2000 bytes
|
||||||
|
days = 1
|
||||||
|
lifecycle_config = {
|
||||||
|
'Rules': [
|
||||||
|
{
|
||||||
|
'Expiration': {
|
||||||
|
'Days': days
|
||||||
|
},
|
||||||
|
'ID': 'object_lt1',
|
||||||
|
'Filter': {
|
||||||
|
'Prefix': '',
|
||||||
|
'ObjectSizeLessThan': 2000
|
||||||
|
},
|
||||||
|
'Status': 'Enabled',
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.put_bucket_lifecycle_configuration(
|
||||||
|
Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
|
||||||
|
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||||
|
|
||||||
|
lc_interval = get_lc_debug_interval()
|
||||||
|
time.sleep(2*lc_interval)
|
||||||
|
|
||||||
|
# we should find only the large object present
|
||||||
|
response = client.list_objects(Bucket=bucket_name)
|
||||||
|
objects = response['Contents']
|
||||||
|
|
||||||
|
assert len(objects) == 1
|
||||||
|
assert objects[0]['Key'] == "myobject_big"
|
||||||
|
|
||||||
@pytest.mark.lifecycle
|
@pytest.mark.lifecycle
|
||||||
def test_lifecycle_id_too_long():
|
def test_lifecycle_id_too_long():
|
||||||
bucket_name = get_new_bucket()
|
bucket_name = get_new_bucket()
|
||||||
|
|
Loading…
Reference in a new issue