add test_post_object_upload_checksum

this tests a two-megabyte binary upload with validated
(awscli-computed) SHA256 checksum, and also verifies failure when
a bad checksum is provided

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit 95df503ced)
This commit is contained in:
Matt Benjamin 2024-05-03 16:25:19 -04:00 committed by Casey Bodley
parent 7d91b2507f
commit 98c0931419

View file

@ -13560,3 +13560,56 @@ def test_multipart_checksum_3parts():
assert 'ChecksumSHA256' not in response
response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
assert composite_sha256sum == response['ChecksumSHA256']
def test_post_object_upload_checksum():
megabytes = 1024 * 1024
min_size = 0
max_size = 5 * megabytes
test_payload_size = 2 * megabytes
bucket_name = get_new_bucket()
client = get_client()
url = _get_post_url(bucket_name)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
"conditions": [\
{"bucket": bucket_name},\
["starts-with", "$key", "foo_cksum_test"],\
{"acl": "private"},\
["starts-with", "$Content-Type", "text/plain"],\
["content-length-range", min_size, max_size],\
]\
}
test_payload = b'x' * test_payload_size
json_policy_document = json.JSONEncoder().encode(policy_document)
bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
policy = base64.b64encode(bytes_json_policy_document)
aws_secret_access_key = get_main_aws_secret_key()
aws_access_key_id = get_main_aws_access_key()
signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
# good checksum payload (checked via upload from awscli)
payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),\
('x-amz-checksum-sha256', 'aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w='),\
('file', (test_payload)),])
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 204
# bad checksum payload
payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),\
('x-amz-checksum-sha256', 'sailorjerry'),\
('file', (test_payload)),])
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400