UploadPartCopy: add test for source bucket with policy

Ref: https://github.com/ceph/ceph/pull/59253

Signed-off-by: Seena Fallah <seenafallah@gmail.com>
This commit is contained in:
Seena Fallah 2024-08-16 14:30:17 +02:00
parent 3458971054
commit ecf7a8a7a9

View file

@ -11410,6 +11410,63 @@ def test_bucket_policy_put_obj_tagging_existing_tag():
assert status == 403 assert status == 403
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
def test_bucket_policy_upload_part_copy():
bucket_name = _create_objects(keys=['public/foo', 'public/bar', 'private/foo'])
client = get_client()
src_resource = _make_arn_resource("{}/{}".format(bucket_name, "public/*"))
policy_document = make_json_policy("s3:GetObject", src_resource)
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
alt_client = get_alt_client()
bucket_name2 = get_new_bucket(alt_client)
copy_source = {'Bucket': bucket_name, 'Key': 'public/foo'}
# Create a multipart upload
response = alt_client.create_multipart_upload(Bucket=bucket_name2, Key='new_foo')
upload_id = response['UploadId']
# Upload a part
response = alt_client.upload_part_copy(Bucket=bucket_name2, Key='new_foo', PartNumber=1, UploadId=upload_id, CopySource=copy_source)
# Complete the multipart upload
response = alt_client.complete_multipart_upload(
Bucket=bucket_name2, Key='new_foo', UploadId=upload_id,
MultipartUpload={'Parts': [{'PartNumber': 1, 'ETag': response['CopyPartResult']['ETag']}]},
)
response = alt_client.get_object(Bucket=bucket_name2, Key='new_foo')
body = _get_body(response)
assert body == 'public/foo'
copy_source = {'Bucket': bucket_name, 'Key': 'public/bar'}
# Create a multipart upload
response = alt_client.create_multipart_upload(Bucket=bucket_name2, Key='new_foo2')
upload_id = response['UploadId']
# Upload a part
response = alt_client.upload_part_copy(Bucket=bucket_name2, Key='new_foo2', PartNumber=1, UploadId=upload_id, CopySource=copy_source)
# Complete the multipart upload
response = alt_client.complete_multipart_upload(
Bucket=bucket_name2, Key='new_foo2', UploadId=upload_id,
MultipartUpload={'Parts': [{'PartNumber': 1, 'ETag': response['CopyPartResult']['ETag']}]},
)
response = alt_client.get_object(Bucket=bucket_name2, Key='new_foo2')
body = _get_body(response)
assert body == 'public/bar'
copy_source = {'Bucket': bucket_name, 'Key': 'private/foo'}
# Create a multipart upload
response = alt_client.create_multipart_upload(Bucket=bucket_name2, Key='new_foo2')
upload_id = response['UploadId']
# Upload a part
check_access_denied(alt_client.upload_part_copy, Bucket=bucket_name2, Key='new_foo2', PartNumber=1, UploadId=upload_id, CopySource=copy_source)
# Abort the multipart upload
alt_client.abort_multipart_upload(Bucket=bucket_name2, Key='new_foo2', UploadId=upload_id)
@pytest.mark.tagging @pytest.mark.tagging
@pytest.mark.bucket_policy @pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore @pytest.mark.fails_on_dbstore