mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-23 23:30:38 +00:00
add test_post_object_upload_checksum
this tests a two-megabyte binary upload with validated (awscli-computed) SHA256 checksum, and also verifies failure when a bad checksum is provided Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
This commit is contained in:
parent
9577cde013
commit
95df503ced
1 changed files with 53 additions and 0 deletions
|
@ -13560,3 +13560,56 @@ def test_multipart_checksum_3parts():
|
||||||
assert 'ChecksumSHA256' not in response
|
assert 'ChecksumSHA256' not in response
|
||||||
response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
|
response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
|
||||||
assert composite_sha256sum == response['ChecksumSHA256']
|
assert composite_sha256sum == response['ChecksumSHA256']
|
||||||
|
|
||||||
|
def test_post_object_upload_checksum():
|
||||||
|
megabytes = 1024 * 1024
|
||||||
|
min_size = 0
|
||||||
|
max_size = 5 * megabytes
|
||||||
|
test_payload_size = 2 * megabytes
|
||||||
|
|
||||||
|
bucket_name = get_new_bucket()
|
||||||
|
client = get_client()
|
||||||
|
|
||||||
|
url = _get_post_url(bucket_name)
|
||||||
|
utc = pytz.utc
|
||||||
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
|
||||||
|
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
|
||||||
|
"conditions": [\
|
||||||
|
{"bucket": bucket_name},\
|
||||||
|
["starts-with", "$key", "foo_cksum_test"],\
|
||||||
|
{"acl": "private"},\
|
||||||
|
["starts-with", "$Content-Type", "text/plain"],\
|
||||||
|
["content-length-range", min_size, max_size],\
|
||||||
|
]\
|
||||||
|
}
|
||||||
|
|
||||||
|
test_payload = b'x' * test_payload_size
|
||||||
|
|
||||||
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
|
bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
|
||||||
|
policy = base64.b64encode(bytes_json_policy_document)
|
||||||
|
aws_secret_access_key = get_main_aws_secret_key()
|
||||||
|
aws_access_key_id = get_main_aws_access_key()
|
||||||
|
|
||||||
|
signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
|
||||||
|
|
||||||
|
# good checksum payload (checked via upload from awscli)
|
||||||
|
payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\
|
||||||
|
("acl" , "private"),("signature" , signature),("policy" , policy),\
|
||||||
|
("Content-Type" , "text/plain"),\
|
||||||
|
('x-amz-checksum-sha256', 'aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w='),\
|
||||||
|
('file', (test_payload)),])
|
||||||
|
|
||||||
|
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
|
||||||
|
assert r.status_code == 204
|
||||||
|
|
||||||
|
# bad checksum payload
|
||||||
|
payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\
|
||||||
|
("acl" , "private"),("signature" , signature),("policy" , policy),\
|
||||||
|
("Content-Type" , "text/plain"),\
|
||||||
|
('x-amz-checksum-sha256', 'sailorjerry'),\
|
||||||
|
('file', (test_payload)),])
|
||||||
|
|
||||||
|
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
|
||||||
|
assert r.status_code == 400
|
||||||
|
|
Loading…
Reference in a new issue