Merge pull request #571 from linuxbox2/wip-get-objattrs

test get_object_attributes
This commit is contained in:
Matt Benjamin 2025-01-13 11:09:29 -05:00 committed by GitHub
commit ff16b9c0c3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1212,6 +1212,7 @@ def test_bucket_listv2_unordered():
intersect = set(unordered_keys_out).intersection(unordered_keys_out2) intersect = set(unordered_keys_out).intersection(unordered_keys_out2)
assert 0 == len(intersect) assert 0 == len(intersect)
#pdb.set_trace()
# verify that unordered used with delimiter results in error # verify that unordered used with delimiter results in error
e = assert_raises(ClientError, e = assert_raises(ClientError,
client.list_objects, Bucket=bucket_name, Delimiter="/") client.list_objects, Bucket=bucket_name, Delimiter="/")
@ -5713,6 +5714,44 @@ def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None
return (upload_id, s, parts) return (upload_id, s, parts)
def _multipart_upload_checksum(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=[]):
"""
generate a multi-part upload for a random file of specifed size,
if requested, generate a list of the parts
return the upload descriptor
"""
if client == None:
client = get_client()
if content_type == None and metadata == None:
response = client.create_multipart_upload(Bucket=bucket_name, Key=key, ChecksumAlgorithm='SHA256')
else:
response = client.create_multipart_upload(Bucket=bucket_name, Key=key, Metadata=metadata, ContentType=content_type,
ChecksumAlgorithm='SHA256')
upload_id = response['UploadId']
s = ''
parts = []
part_checksums = []
for i, part in enumerate(generate_random(size, part_size)):
# part_num is necessary because PartNumber for upload_part and in parts must start at 1 and i starts at 0
part_num = i+1
s += part
response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part,
ChecksumAlgorithm='SHA256')
parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': part_num})
armored_part_cksum = base64.b64encode(hashlib.sha256(part.encode('utf-8')).digest())
part_checksums.append(armored_part_cksum.decode())
if i in resend_parts:
client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part,
ChecksumAlgorithm='SHA256')
return (upload_id, s, parts, part_checksums)
@pytest.mark.fails_on_dbstore @pytest.mark.fails_on_dbstore
def test_object_copy_versioning_multipart_upload(): def test_object_copy_versioning_multipart_upload():
bucket_name = get_new_bucket() bucket_name = get_new_bucket()
@ -13959,6 +13998,47 @@ def test_multipart_checksum_3parts():
response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED') response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
assert composite_sha256sum == response['ChecksumSHA256'] assert composite_sha256sum == response['ChecksumSHA256']
@pytest.mark.checksum
@pytest.mark.fails_on_dbstore
def test_multipart_checksum_upload_fallback():
bucket = get_new_bucket()
client = get_client()
key = "mpu_cksum_fallback"
alg = 'SHA256'
response = client.create_multipart_upload(
Bucket=bucket, Key=key, ChecksumAlgorithm=alg)
assert alg == response['ChecksumAlgorithm']
upload_id = response['UploadId']
nparts = 3
parts = []
size = 5 * 1024 * 1024 # each part but the last must be at least 5M
for ix in range(0,nparts):
body = FakeWriteFile(size, 'A')
part_num = ix + 1
res = client.upload_part(UploadId=upload_id, Bucket=bucket,
Key=key, PartNumber=part_num, Body=body)
etag = res['ETag']
part = {'ETag': etag, 'PartNumber': part_num}
parts.append(part)
res = client.complete_multipart_upload(
Bucket=bucket, Key=key, UploadId=upload_id,
MultipartUpload={'Parts': parts})
#pdb.set_trace()
assert res['ResponseMetadata']['HTTPStatusCode'] == 200
# not yet merged
#request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass',
# 'ObjectSize']
#res = client.get_object_attributes(Bucket=bucket, Key=key, \
# ObjectAttributes=request_attributes)
#upload_checksum = res['Checksum']['ChecksumSHA256']
@pytest.mark.checksum @pytest.mark.checksum
def test_post_object_upload_checksum(): def test_post_object_upload_checksum():
megabytes = 1024 * 1024 megabytes = 1024 * 1024
@ -14013,7 +14093,6 @@ def test_post_object_upload_checksum():
r = requests.post(url, files=payload, verify=get_config_ssl_verify()) r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400 assert r.status_code == 400
def _has_bucket_logging_extension(): def _has_bucket_logging_extension():
src_bucket_name = get_new_bucket_name() src_bucket_name = get_new_bucket_name()
src_bucket = get_new_bucket_resource(name=src_bucket_name) src_bucket = get_new_bucket_resource(name=src_bucket_name)
@ -15321,3 +15400,308 @@ def test_bucket_logging_single_prefix():
src_keys = _get_keys(response) src_keys = _get_keys(response)
found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys) found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
assert found assert found
def check_parts_count(parts, expected):
# AWS docs disagree on the name of this element
if 'TotalPartsCount' in parts:
assert parts['TotalPartsCount'] == expected
else:
assert parts['PartsCount'] == expected
@pytest.mark.checksum
@pytest.mark.fails_on_dbstore
def test_get_multipart_checksum_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
#pdb.set_trace()
key = "multipart_checksum"
key_metadata = {'foo': 'bar'}
content_type = 'text/plain'
objlen = 64 * 1024 * 1024
(upload_id, data, parts, checksums) = \
_multipart_upload_checksum(bucket_name=bucket_name, key=key, size=objlen,
content_type=content_type, metadata=key_metadata)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
upload_checksum = response['ChecksumSHA256']
response = client.get_object(Bucket=bucket_name, Key=key)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key, \
ObjectAttributes=request_attributes)
# check overall object
nparts = len(parts)
assert response['ObjectSize'] == objlen
assert response['Checksum']['ChecksumSHA256'] == upload_checksum
check_parts_count(response['ObjectParts'], nparts)
# check the parts
partno = 1
for obj_part in response['ObjectParts']['Parts']:
assert obj_part['PartNumber'] == partno
if partno < len(parts):
assert obj_part['Size'] == 5 * 1024 * 1024
else:
assert obj_part['Size'] == objlen - ((nparts-1) * (5 * 1024 * 1024))
assert obj_part['ChecksumSHA256'] == checksums[partno - 1]
partno += 1
@pytest.mark.fails_on_dbstore
def test_get_multipart_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = "multipart"
part_size = 5*1024*1024
objlen = 30*1024*1024
(upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
etag = response['ETag'].strip('"')
assert len(etag)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key, \
ObjectAttributes=request_attributes)
# check overall object
nparts = len(parts)
assert response['ObjectSize'] == objlen
check_parts_count(response['ObjectParts'], len(parts))
assert response['ObjectParts']['IsTruncated'] == False
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
# check the parts
partno = 1
for obj_part in response['ObjectParts']['Parts']:
assert obj_part['PartNumber'] == partno
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part
partno += 1
@pytest.mark.fails_on_dbstore
def test_get_paginated_multipart_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = "multipart"
part_size = 5*1024*1024
objlen = 30*1024*1024
(upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
etag = response['ETag'].strip('"')
assert len(etag)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes,
MaxParts=1, PartNumberMarker=3)
# check overall object
assert response['ObjectSize'] == objlen
check_parts_count(response['ObjectParts'], len(parts))
assert response['ObjectParts']['MaxParts'] == 1
assert response['ObjectParts']['PartNumberMarker'] == 3
assert response['ObjectParts']['IsTruncated'] == True
assert response['ObjectParts']['NextPartNumberMarker'] == 4
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
# check the part
assert len(response['ObjectParts']['Parts']) == 1
obj_part = response['ObjectParts']['Parts'][0]
assert obj_part['PartNumber'] == 4
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes,
MaxParts=10, PartNumberMarker=4)
# check overall object
assert response['ObjectSize'] == objlen
check_parts_count(response['ObjectParts'], len(parts))
assert response['ObjectParts']['MaxParts'] == 10
assert response['ObjectParts']['IsTruncated'] == False
assert response['ObjectParts']['PartNumberMarker'] == 4
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
# check the parts
assert len(response['ObjectParts']['Parts']) == 2
partno = 5
for obj_part in response['ObjectParts']['Parts']:
assert obj_part['PartNumber'] == partno
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part
partno += 1
@pytest.mark.fails_on_dbstore
def test_get_single_multipart_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = "multipart"
part_size = 5*1024*1024
part_sizes = [part_size] # just one part
part_count = len(part_sizes)
total_size = sum(part_sizes)
(upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
etag = response['ETag'].strip('"')
assert len(etag)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)
assert response['ObjectSize'] == total_size
check_parts_count(response['ObjectParts'], 1)
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert len(response['ObjectParts']['Parts']) == 1
obj_part = response['ObjectParts']['Parts'][0]
assert obj_part['PartNumber'] == 1
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part
def test_get_checksum_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = "myobj"
size = 1024
body = FakeWriteFile(size, 'A')
sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
response = client.put_object(Bucket=bucket_name, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=sha256sum)
assert sha256sum == response['ChecksumSHA256']
etag = response['ETag'].strip('"')
assert len(etag)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)
assert response['ObjectSize'] == size
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert response['Checksum']['ChecksumSHA256'] == sha256sum
assert 'ObjectParts' not in response
def test_get_versioned_object_attributes():
bucket_name = get_new_bucket()
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
client = get_client()
key = "obj"
objlen = 3
response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
etag = response['ETag'].strip('"')
assert len(etag)
version = response['VersionId']
assert len(version)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)
assert 'DeleteMarker' not in response
assert response['VersionId'] == version
assert response['ObjectSize'] == 3
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response
# write a new current version
client.put_object(Bucket=bucket_name, Key=key, Body='foo')
# ask for the original version again
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key, VersionId=version,
ObjectAttributes=request_attributes)
assert 'DeleteMarker' not in response
assert response['VersionId'] == version
assert response['ObjectSize'] == 3
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_get_sse_c_encrypted_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = 'obj'
objlen = 1000
data = 'A'*objlen
sse_args = {
'SSECustomerAlgorithm': 'AES256',
'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
attrs = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.put_object(Bucket=bucket_name, Key=key, Body=data, **sse_args)
etag = response['ETag'].strip('"')
assert len(etag)
# GetObjectAttributes fails without sse-c headers
e = assert_raises(ClientError, client.get_object_attributes,
Bucket=bucket_name, Key=key, ObjectAttributes=attrs)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400
# and succeeds sse-c headers
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=attrs, **sse_args)
assert 'DeleteMarker' not in response
assert 'VersionId' not in response
assert response['ObjectSize'] == objlen
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response
@pytest.mark.fails_on_dbstore
def test_get_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = "obj"
objlen = 3
response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
etag = response['ETag'].strip('"')
assert len(etag)
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)
assert 'DeleteMarker' not in response
assert 'VersionId' not in response
assert response['ObjectSize'] == 3
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response