diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index ecf69e5..6c306ed 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -12211,3 +12211,57 @@ def test_object_lock_uploading_obj(): eq(response['ObjectLockLegalHoldStatus'], 'ON') client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'}) client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True) + +@attr(resource='object') +@attr(method='copy') +@attr(operation='copy w/ x-amz-copy-source-if-match: the latest ETag') +@attr(assertion='succeeds') +def test_copy_object_ifmatch_good(): + bucket_name = get_new_bucket() + client = get_client() + resp = client.put_object(Bucket=bucket_name, Key='foo', Body='bar') + + client.copy_object(Bucket=bucket_name, CopySource=bucket_name+'/foo', CopySourceIfMatch=resp['ETag'], Key='bar') + resp = client.get_object(Bucket=bucket_name, Key='bar') + eq(resp['Body'].read(), 'bar') + +@attr(resource='object') +@attr(method='copy') +@attr(operation='copy w/ x-amz-copy-source-if-match: bogus ETag') +@attr(assertion='fails 412') +def test_copy_object_ifmatch_failed(): + bucket_name = get_new_bucket() + client = get_client() + client.put_object(Bucket=bucket_name, Key='foo', Body='bar') + + e = assert_raises(ClientError, client.copy_object, Bucket=bucket_name, CopySource=bucket_name+'/foo', CopySourceIfMatch='ABCORZ', Key='bar') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 412) + eq(error_code, 'PreconditionFailed') + +@attr(resource='object') +@attr(method='copy') +@attr(operation='copy w/ x-amz-copy-source-if-none-match: the latest ETag') +@attr(assertion='fails 412') +def test_copy_object_ifnonematch_good(): + bucket_name = get_new_bucket() + client = get_client() + resp = client.put_object(Bucket=bucket_name, Key='foo', Body='bar') + + e = assert_raises(ClientError, client.copy_object, Bucket=bucket_name, CopySource=bucket_name+'/foo', CopySourceIfNoneMatch=resp['ETag'], Key='bar') + status, error_code = _get_status_and_error_code(e.response) + eq(status, 412) + eq(error_code, 'PreconditionFailed') + +@attr(resource='object') +@attr(method='copy') +@attr(operation='copy w/ x-amz-copy-source-if-none-match: bogus ETag') +@attr(assertion='succeeds') +def test_copy_object_ifnonematch_failed(): + bucket_name = get_new_bucket() + client = get_client() + resp = client.put_object(Bucket=bucket_name, Key='foo', Body='bar') + + client.copy_object(Bucket=bucket_name, CopySource=bucket_name+'/foo', CopySourceIfNoneMatch='ABCORZ', Key='bar') + resp = client.get_object(Bucket=bucket_name, Key='bar') + eq(resp['Body'].read(), 'bar')