cloud-restore: testcase for non-current versioned object

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
This commit is contained in:
Soumya Koduri 2025-03-04 23:19:24 +05:30
parent 78458f02d9
commit bce66ded33

View file

@ -9536,8 +9536,11 @@ def verify_object(client, bucket, key, content=None, sc=None):
body = _get_body(response)
assert body == content
def verify_transition(client, bucket, key, sc=None):
response = client.head_object(Bucket=bucket, Key=key)
def verify_transition(client, bucket, key, sc=None, version=None):
if (version != None):
response = client.head_object(Bucket=bucket, Key=key, VersionId=version)
else:
response = client.head_object(Bucket=bucket, Key=key)
# Iterate over the contents to find the StorageClass
if 'StorageClass' in response:
@ -9966,6 +9969,92 @@ def test_read_through():
response = client.get_object(Bucket=bucket, Key=key)
assert e.exception.response['Error']['Code'] == '403'
@pytest.mark.cloud_restore
@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_restore_noncur_obj():
cloud_sc = get_cloud_storage_class()
if cloud_sc == None:
pytest.skip('[s3 cloud] section missing cloud_storage_class')
retain_head_object = get_cloud_retain_head_object()
target_path = get_cloud_target_path()
target_sc = get_cloud_target_storage_class()
sc = ['STANDARD', cloud_sc]
bucket = get_new_bucket()
client = get_client()
# before enabling versioning, create a plain entry
# which should get transitioned/expired similar to
# other non-current versioned entries.
key = 'test1/a'
content = 'fooz'
client.put_object(Bucket=bucket, Key=key, Body=content)
check_configure_versioning_retry(bucket, "Enabled", "Enabled")
rules = [
{
'ID': 'rule1',
'Prefix': 'test1/',
'Status': 'Enabled',
'NoncurrentVersionTransitions': [
{
'NoncurrentDays': 2,
'StorageClass': cloud_sc
}
],
}
]
lifecycle = {'Rules': rules}
response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
keys = ['test1/a']
(version_ids, contents) = create_multiple_versions(client, bucket, "test1/a", 2)
contents.append(content)
init_keys = list_bucket_storage_class(client, bucket)
print(init_keys)
assert len(init_keys['STANDARD']) == 3
version_ids = []
response = client.list_object_versions(Bucket=bucket)
for version in response['Versions']:
version_ids.append(version['VersionId'])
lc_interval = get_lc_debug_interval()
time.sleep(7*lc_interval)
expire1_keys = list_bucket_storage_class(client, bucket)
assert len(expire1_keys['STANDARD']) == 1
assert len(expire1_keys[cloud_sc]) == 2
restore_interval = get_restore_debug_interval()
for num in range(1, 2):
verify_transition(client, bucket, key, cloud_sc, version_ids[num])
# Restore object temporarily
client.restore_object(Bucket=bucket, Key=key, VersionId=version_ids[num], RestoreRequest={'Days': 2})
time.sleep(2)
# Verify object is restored temporarily
response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num])
assert response['ContentLength'] == len(contents[num])
response = client.list_object_versions(Bucket=bucket)
versions = response['Versions']
assert versions[1]['IsLatest'] == False
time.sleep(2 * (restore_interval + lc_interval))
#verify object expired
for num in range(1, 2):
response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num])
assert response['ContentLength'] == 0
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encrypted_transfer_1b():