diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 1a085bd..b8a6142 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -13560,3 +13560,56 @@ def test_multipart_checksum_3parts(): assert 'ChecksumSHA256' not in response response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED') assert composite_sha256sum == response['ChecksumSHA256'] + +def test_post_object_upload_checksum(): + megabytes = 1024 * 1024 + min_size = 0 + max_size = 5 * megabytes + test_payload_size = 2 * megabytes + + bucket_name = get_new_bucket() + client = get_client() + + url = _get_post_url(bucket_name) + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket_name},\ + ["starts-with", "$key", "foo_cksum_test"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", min_size, max_size],\ + ]\ + } + + test_payload = b'x' * test_payload_size + + json_policy_document = json.JSONEncoder().encode(policy_document) + bytes_json_policy_document = bytes(json_policy_document, 'utf-8') + policy = base64.b64encode(bytes_json_policy_document) + aws_secret_access_key = get_main_aws_secret_key() + aws_access_key_id = get_main_aws_access_key() + + signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest()) + + # good checksum payload (checked via upload from awscli) + payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),\ + ('x-amz-checksum-sha256', 'aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w='),\ + ('file', (test_payload)),]) + + r = requests.post(url, files=payload, verify=get_config_ssl_verify()) + assert r.status_code == 204 + + # bad checksum payload + payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),\ + ('x-amz-checksum-sha256', 'sailorjerry'),\ + ('file', (test_payload)),]) + + r = requests.post(url, files=payload, verify=get_config_ssl_verify()) + assert r.status_code == 400