diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 2e86d44..a7da2f9 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -9536,8 +9536,11 @@ def verify_object(client, bucket, key, content=None, sc=None): body = _get_body(response) assert body == content -def verify_transition(client, bucket, key, sc=None): - response = client.head_object(Bucket=bucket, Key=key) +def verify_transition(client, bucket, key, sc=None, version=None): + if (version != None): + response = client.head_object(Bucket=bucket, Key=key, VersionId=version) + else: + response = client.head_object(Bucket=bucket, Key=key) # Iterate over the contents to find the StorageClass if 'StorageClass' in response: @@ -9966,6 +9969,92 @@ def test_read_through(): response = client.get_object(Bucket=bucket, Key=key) assert e.exception.response['Error']['Code'] == '403' +@pytest.mark.cloud_restore +@pytest.mark.fails_on_aws +@pytest.mark.fails_on_dbstore +def test_restore_noncur_obj(): + cloud_sc = get_cloud_storage_class() + if cloud_sc == None: + pytest.skip('[s3 cloud] section missing cloud_storage_class') + + retain_head_object = get_cloud_retain_head_object() + target_path = get_cloud_target_path() + target_sc = get_cloud_target_storage_class() + + sc = ['STANDARD', cloud_sc] + + bucket = get_new_bucket() + client = get_client() + + # before enabling versioning, create a plain entry + # which should get transitioned/expired similar to + # other non-current versioned entries. + key = 'test1/a' + content = 'fooz' + client.put_object(Bucket=bucket, Key=key, Body=content) + + check_configure_versioning_retry(bucket, "Enabled", "Enabled") + + rules = [ + { + 'ID': 'rule1', + 'Prefix': 'test1/', + 'Status': 'Enabled', + 'NoncurrentVersionTransitions': [ + { + 'NoncurrentDays': 2, + 'StorageClass': cloud_sc + } + ], + } + ] + lifecycle = {'Rules': rules} + response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle) + + keys = ['test1/a'] + + (version_ids, contents) = create_multiple_versions(client, bucket, "test1/a", 2) + + contents.append(content) + + init_keys = list_bucket_storage_class(client, bucket) + print(init_keys) + assert len(init_keys['STANDARD']) == 3 + + version_ids = [] + response = client.list_object_versions(Bucket=bucket) + for version in response['Versions']: + version_ids.append(version['VersionId']) + + lc_interval = get_lc_debug_interval() + + time.sleep(7*lc_interval) + expire1_keys = list_bucket_storage_class(client, bucket) + assert len(expire1_keys['STANDARD']) == 1 + assert len(expire1_keys[cloud_sc]) == 2 + + restore_interval = get_restore_debug_interval() + + for num in range(1, 2): + verify_transition(client, bucket, key, cloud_sc, version_ids[num]) + # Restore object temporarily + client.restore_object(Bucket=bucket, Key=key, VersionId=version_ids[num], RestoreRequest={'Days': 2}) + time.sleep(2) + + # Verify object is restored temporarily + response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num]) + assert response['ContentLength'] == len(contents[num]) + response = client.list_object_versions(Bucket=bucket) + versions = response['Versions'] + assert versions[1]['IsLatest'] == False + + time.sleep(2 * (restore_interval + lc_interval)) + + #verify object expired + for num in range(1, 2): + response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num]) + assert response['ContentLength'] == 0 + @pytest.mark.encryption @pytest.mark.fails_on_dbstore def test_encrypted_transfer_1b():