mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-25 13:47:27 +00:00
test Get/HeadObject with partNumber for single-multipart upload
test_multipart_get_part() tests 'normal' multipart uploads. add a new
test case for a multipart upload with a single part to tests the fix
for https://tracker.ceph.com/issues/66705
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit bebdfd1ba7
)
This commit is contained in:
parent
7d8331959a
commit
a1273e21af
1 changed files with 42 additions and 0 deletions
|
@ -6454,6 +6454,48 @@ def test_multipart_get_part():
|
||||||
assert status == 400
|
assert status == 400
|
||||||
assert error_code == 'InvalidPart'
|
assert error_code == 'InvalidPart'
|
||||||
|
|
||||||
|
@pytest.mark.fails_on_dbstore
|
||||||
|
def test_multipart_single_get_part():
|
||||||
|
bucket_name = get_new_bucket()
|
||||||
|
client = get_client()
|
||||||
|
key = "mymultipart"
|
||||||
|
|
||||||
|
part_size = 5*1024*1024
|
||||||
|
part_sizes = [part_size] # just one part
|
||||||
|
part_count = len(part_sizes)
|
||||||
|
total_size = sum(part_sizes)
|
||||||
|
|
||||||
|
(upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
|
||||||
|
|
||||||
|
# request part before complete
|
||||||
|
e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=1)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 404
|
||||||
|
assert error_code == 'NoSuchKey'
|
||||||
|
|
||||||
|
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
|
||||||
|
assert len(parts) == part_count
|
||||||
|
|
||||||
|
for part, size in zip(parts, part_sizes):
|
||||||
|
response = client.head_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber'])
|
||||||
|
assert response['PartsCount'] == part_count
|
||||||
|
assert response['ETag'] == '"{}"'.format(part['ETag'])
|
||||||
|
|
||||||
|
response = client.get_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber'])
|
||||||
|
assert response['PartsCount'] == part_count
|
||||||
|
assert response['ETag'] == '"{}"'.format(part['ETag'])
|
||||||
|
assert response['ContentLength'] == size
|
||||||
|
# compare contents
|
||||||
|
for chunk in response['Body'].iter_chunks():
|
||||||
|
assert chunk.decode() == data[0:len(chunk)]
|
||||||
|
data = data[len(chunk):]
|
||||||
|
|
||||||
|
# request PartNumber out of range
|
||||||
|
e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=5)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 400
|
||||||
|
assert error_code == 'InvalidPart'
|
||||||
|
|
||||||
@pytest.mark.fails_on_dbstore
|
@pytest.mark.fails_on_dbstore
|
||||||
def test_non_multipart_get_part():
|
def test_non_multipart_get_part():
|
||||||
bucket_name = get_new_bucket()
|
bucket_name = get_new_bucket()
|
||||||
|
|
Loading…
Reference in a new issue