Add logic that deletes all objects versions from versioned s3 bucket.

This is required per AWS S3 specification.
This commit is contained in:
Vladimir Domnich 2022-07-21 14:19:49 +04:00
parent e4a3f45a10
commit 8167c1ccd5
2 changed files with 36 additions and 3 deletions

View file

@ -61,9 +61,21 @@ class TestS3Gate:
def bucket(self): def bucket(self):
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client) bucket = s3_gate_bucket.create_bucket_s3(self.s3_client)
yield bucket yield bucket
# Delete all objects from bucket
versioning_status = s3_gate_bucket.get_bucket_versioning_status(self.s3_client, bucket)
if versioning_status == s3_gate_bucket.VersioningStatus.ENABLED.value:
# From versioned bucket we should delete all versions of all objects
objects_versions = s3_gate_object.list_objects_versions_s3(self.s3_client, bucket)
if objects_versions:
s3_gate_object.delete_object_versions_s3(self.s3_client, bucket, objects_versions)
else:
# From non-versioned bucket it's sufficient to delete objects by key
objects = s3_gate_object.list_objects_s3(self.s3_client, bucket) objects = s3_gate_object.list_objects_s3(self.s3_client, bucket)
if objects: if objects:
s3_gate_object.delete_objects_s3(self.s3_client, bucket, objects) s3_gate_object.delete_objects_s3(self.s3_client, bucket, objects)
# Delete the bucket itself
s3_gate_bucket.delete_bucket_s3(self.s3_client, bucket) s3_gate_bucket.delete_bucket_s3(self.s3_client, bucket)
@allure.title('Test S3 Bucket API') @allure.title('Test S3 Bucket API')

View file

@ -137,6 +137,27 @@ def delete_objects_s3(s3_client, bucket: str, object_keys: list):
raise Exception(f'Error Message: {err.response["Error"]["Message"]}\n' raise Exception(f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}') from err f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}') from err
@keyword('Delete object versions S3')
def delete_object_versions_s3(s3_client, bucket: str, object_versions: list):
try:
# Build deletion list in S3 format
delete_list = {
"Objects": [
{
"Key": object_version["Key"],
"VersionId": object_version["VersionId"],
}
for object_version in object_versions
]
}
response = s3_client.delete_objects(Bucket=bucket, Delete=delete_list)
log_command_execution('S3 Delete objects result', response)
return response
except ClientError as err:
raise Exception(f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}') from err
@keyword('Copy object S3') @keyword('Copy object S3')
def copy_object_s3(s3_client, bucket, object_key, bucket_dst=None): def copy_object_s3(s3_client, bucket, object_key, bucket_dst=None):