From 6ff4e6b245f0f6cf501b9377a04aee415326bdbf Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 9 Feb 2024 14:22:37 -0500 Subject: [PATCH] s3: object lock tests for deletion of multipart objects Signed-off-by: Casey Bodley (cherry picked from commit a87f0b63e7c0aee617499d4e161c254e90d62995) --- s3tests_boto3/functional/test_s3.py | 46 +++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index dbd20e8..4a188bf 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -12153,6 +12153,31 @@ def test_object_lock_delete_object_with_retention(): response = client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True) assert response['ResponseMetadata']['HTTPStatusCode'] == 204 +@pytest.mark.fails_on_dbstore +def test_object_lock_delete_multipart_object_with_retention(): + bucket_name = get_new_bucket_name() + client = get_client() + client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True) + + key = 'file1' + body = 'abc' + response = client.create_multipart_upload(Bucket=bucket_name, Key=key, ObjectLockMode='GOVERNANCE', + ObjectLockRetainUntilDate=datetime.datetime(2030,1,1,tzinfo=pytz.UTC)) + upload_id = response['UploadId'] + + response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=1, Body=body) + parts = [{'ETag': response['ETag'].strip('"'), 'PartNumber': 1}] + + response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + e = assert_raises(ClientError, client.delete_object, Bucket=bucket_name, Key=key, VersionId=response['VersionId']) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + response = client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True) + assert response['ResponseMetadata']['HTTPStatusCode'] == 204 + @pytest.mark.fails_on_dbstore def test_object_lock_delete_object_with_retention_and_marker(): bucket_name = get_new_bucket_name() @@ -12329,6 +12354,27 @@ def test_object_lock_delete_object_with_legal_hold_on(): assert error_code == 'AccessDenied' client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'}) +@pytest.mark.fails_on_dbstore +def test_object_lock_delete_multipart_object_with_legal_hold_on(): + bucket_name = get_new_bucket_name() + client = get_client() + client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True) + + key = 'file1' + body = 'abc' + response = client.create_multipart_upload(Bucket=bucket_name, Key=key, ObjectLockLegalHoldStatus='ON') + upload_id = response['UploadId'] + + response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=1, Body=body) + parts = [{'ETag': response['ETag'].strip('"'), 'PartNumber': 1}] + + response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + e = assert_raises(ClientError, client.delete_object, Bucket=bucket_name, Key=key, VersionId=response['VersionId']) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'}) @pytest.mark.fails_on_dbstore def test_object_lock_delete_object_with_legal_hold_off():