From 92a60df56df3d50337dcea67958cbcf9e8f8026d Mon Sep 17 00:00:00 2001 From: caleb miles Date: Mon, 15 Oct 2012 13:59:10 -0400 Subject: [PATCH] test_s3: Add tests of post object. Tests the implementation of browser based uploads via a POST request. Signed-off-by: caleb miles --- requirements.txt | 3 + s3tests/functional/test_s3.py | 1078 ++++++++++++++++++++++++++++++++- 2 files changed, 1080 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 461e6cd..919a567 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,6 @@ bunch >=1.0.0 # 0.14 switches to libev, that means bootstrap needs to change too gevent ==0.13.6 isodate >=0.4.4 +requests +pytz +ordereddict diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index 7f9a495..de30c37 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -13,6 +13,14 @@ import string import socket import ssl import os +import requests +import base64 +import hmac +import sha +import pytz +import json + +import xml.etree.ElementTree as ET from httplib import HTTPConnection, HTTPSConnection from urlparse import urlparse @@ -24,6 +32,8 @@ from .utils import assert_raises import AnonymousAuth from email.header import decode_header +from ordereddict import OrderedDict + from . import ( nuke_prefixed_buckets, @@ -1013,7 +1023,7 @@ def test_object_metadata_replaced_on_put(): @attr(resource='object') @attr(method='put') @attr(operation='data write from file (w/100-Continue)') -@attr(assertion='returns written data') +@attr(assertion='succeeds and returns written data') def test_object_write_file(): # boto Key.set_contents_from_file / .send_file uses Expect: # 100-Continue, so this test exercises that (though a bit too @@ -1026,6 +1036,1072 @@ def test_object_write_file(): eq(got, 'bar') +@attr(resource='object') +@attr(method='post') +@attr(operation='anonymous browser based upload via POST request') +@attr(assertion='succeeds and returns written data') +def test_post_object_anonymous_request(): + bucket = get_new_bucket() + bucket.set_acl('public-read-write') + conn = s3.main + host_name = conn.host + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 200) + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string() + eq(got, 'bar') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds and returns written data') +def test_post_object_authenticated_request(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string() + eq(got, 'bar') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='anonymous browser based upload via POST request') +@attr(assertion='succeeds with status 201') +def test_post_object_set_success_code(): + bucket = get_new_bucket() + bucket.set_acl('public-read-write') + conn = s3.main + host_name = conn.host + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ + ("success_action_status" , "201"),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 201) + message = ET.fromstring(r.content).find('Key') + eq(message.text,'foo.txt') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='anonymous browser based upload via POST request') +@attr(assertion='succeeds with status 204') +def test_post_object_set_invalid_success_code(): + bucket = get_new_bucket() + bucket.set_acl('public-read-write') + conn = s3.main + host_name = conn.host + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ + ("success_action_status" , "404"),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + eq(r.content,'') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds and returns written data') +def test_post_object_upload_larger_than_chunk(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 5*1024*1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + foo_string = 'foo' * 1024*1024 + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', foo_string)]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string() + eq(got, foo_string) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds and returns written data') +def test_post_object_set_key_from_filename(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 5*1024*1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + foo_string = 'foo' * 1024*1024 + + payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('foo.txt', 'bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string() + eq(got, foo_string) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds with status 204') +def test_post_object_ignored_header(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http', + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),("x-ignore-foo" , "bar"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds with status 204') +def test_post_object_case_insensitive_condition_fields(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',host=host_name,\ + bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bUcKeT": bucket.name},\ + ["StArTs-WiTh", "$KeY", "foo"],\ + {"AcL": "private"},\ + ["StArTs-WiTh", "$CoNtEnT-TyPe", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("aCl" , "private"),("signature" , signature),("pOLICy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds with escaped leading $ and returns written data') +def test_post_object_escaped_field_values(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "\$foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("\$foo.txt") + got = key.get_contents_as_string() + eq(got, 'bar') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds and returns redirect url') +def test_post_object_success_redirect_action(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["$eq", "$success_action_redirect", "http://localhost"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),("success_action_redirect" , "http://localhost"),\ + ('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 200) + url = r.url + key = bucket.get_key("foo.txt") + eq(url, + 'http://localhost/?bucket={bucket}&key={key}&etag=%22{etag}%22'.format(bucket = bucket.name, + key = key.name, etag = key.etag.strip('"'))) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with invalid signature error') +def test_post_object_invalid_signature(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "\$foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1] + + payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 403) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'The request signature we calculated does not match the\ + signature you provided. Check your key and signing method.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with access key does not exist error') +def test_post_object_invalid_access_key(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "\$foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 403) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'The AWS Access Key Id you provided does not exist in our records.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with invalid expiration error') +def test_post_object_invalid_date_format(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": str(expires),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "\$foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text,\ + "Invalid Policy: Invalid 'expiration' value: '{date}'".format(date=str(expires))) + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with missing key error') +def test_post_object_no_key_specified(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http', + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "\$foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, "Bucket POST must contain a field named 'key'.\ + If it is specified, please check the order of the fields.") + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with missing signature error') +def test_post_object_missing_signature(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "\$foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq("Bucket POST must contain a field named 'Signature'.\ + If it is specified, please check the order of the fields.") + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with extra input fields policy error') +def test_post_object_missing_policy_condition(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 403) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid according to Policy: Extra input fields: bucket') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='succeeds using starts-with restriction on metadata header') +def test_post_object_user_specified_header(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ["starts-with", "$x-amz-meta-foo", "bar"] + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('x-amz-meta-foo' , 'barclamp'),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("foo.txt") + eq(key.get_metadata('foo'), 'barclamp') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with policy condition failed error due to missing field in POST request') +def test_post_object_request_missing_policy_specified_field(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http', + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ["starts-with", "$x-amz-meta-foo", "bar"] + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 403) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid according to Policy: Policy Condition failed: ["starts-with", "$x-amz-meta-foo", "bar"]') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with conditions must be list error') +def test_post_object_condition_is_case_sensitive(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "CONDITIONS": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, "Invalid Policy: Invalid 'conditions' value: must be a List.") + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with expiration must be string error') +def test_post_object_expires_is_case_sensitive(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"EXPIRATION": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, "Invalid Policy: Invalid 'expiration' value: must be a String.") + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with policy expired error') +def test_post_object_expired_policy(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 403) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid according to Policy: Policy expired.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails using equality restriction on metadata header') +def test_post_object_invalid_request_field_value(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ["eq", "$x-amz-meta-foo", ""] + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('x-amz-meta-foo' , 'barclamp'),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 403) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid according to Policy: Policy Condition failed:\ + ["eq", "$x-amz-meta-foo", ""]') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with policy missing expiration error') +def test_post_object_missing_expires_condition(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 1024],\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid Policy: Policy missing expiration.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with policy missing conditions error') +def test_post_object_missing_conditions_list(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid Policy: Policy missing conditions.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with allowable upload size exceeded error') +def test_post_object_upload_size_limit_exceeded(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0, 0]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Your proposed upload exceeds the maximum allowed size') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with invalid content length error') +def test_post_object_missing_content_length_argument(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 0]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid Policy: Invalid content-length-range: wrong number of arguments.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with invalid JSON error') +def test_post_object_invalid_content_length_argument(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", -1, 0]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Invalid Policy: Invalid JSON.') + + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr(assertion='fails with upload size less than minimum allowable error') +def test_post_object_upload_size_below_minimum(): + bucket = get_new_bucket() + conn = s3.main + host_name = conn.host + + url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=host_name,bucket=bucket.name) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [\ + {"bucket": bucket.name},\ + ["starts-with", "$key", "foo"],\ + {"acl": "private"},\ + ["starts-with", "$Content-Type", "text/plain"],\ + ["content-length-range", 512, 1000]\ + ]\ + } + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ + ("acl" , "private"),("signature" , signature),("policy" , policy),\ + ("Content-Type" , "text/plain"),('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 400) + message = ET.fromstring(r.content).find('Message') + message_text = message.text + eq(message_text, 'Your proposed upload is smaller than the minimum allowed size') + + def _setup_request(bucket_acl=None, object_acl=None): """ add a foo key, and specified key and bucket acls to