Changes from pull - https://github.com/ceph/s3-tests/pull/490
This commit is contained in:
Vishnu Vardhan 2023-02-18 20:57:44 -08:00
parent b1472019d7
commit 9a8863785f

View file

@ -82,18 +82,13 @@ def get_objects_list(bucket, client=None, prefix=None):
# generator function that returns object listings in batches, where each
# batch is a list of dicts compatible with delete_objects()
def list_versions(client, bucket, batch_size):
key_marker = ''
version_marker = ''
kwargs = {'Bucket': bucket, 'MaxKeys': batch_size}
truncated = True
while truncated:
listing = client.list_object_versions(
Bucket=bucket,
KeyMarker=key_marker,
VersionIdMarker=version_marker,
MaxKeys=batch_size)
listing = client.list_object_versions(**kwargs)
key_marker = listing.get('NextKeyMarker')
version_marker = listing.get('NextVersionIdMarker')
kwargs['KeyMarker'] = listing.get('NextKeyMarker')
kwargs['VersionIdMarker'] = listing.get('NextVersionIdMarker')
truncated = listing['IsTruncated']
objs = listing.get('Versions', []) + listing.get('DeleteMarkers', [])
@ -104,11 +99,25 @@ def nuke_bucket(client, bucket):
batch_size = 128
max_retain_date = None
try:
response = client.get_object_lock_configuration(Bucket=bucket)
LockEnabled = True
except ClientError as error:
if error.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError':
LockEnabled = False
else:
raise error
# list and delete objects in batches
for objects in list_versions(client, bucket, batch_size):
delete = client.delete_objects(Bucket=bucket,
Delete={'Objects': objects, 'Quiet': True},
BypassGovernanceRetention=True)
if(LockEnabled):
delete = client.delete_objects(Bucket=bucket,
Delete={'Objects': objects, 'Quiet': True},
BypassGovernanceRetention = LockEnabled)
else:
delete = client.delete_objects(Bucket=bucket,
Delete={'Objects': objects, 'Quiet': True})
# check for object locks on 403 AccessDenied errors
for err in delete.get('Errors', []):