mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-22 09:29:43 +00:00
rgw: fix post tests to include tcp port
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
parent
efa352711b
commit
414a107b46
1 changed files with 61 additions and 117 deletions
|
@ -1034,17 +1034,20 @@ def test_object_write_file():
|
||||||
eq(got, 'bar')
|
eq(got, 'bar')
|
||||||
|
|
||||||
|
|
||||||
|
def _get_post_url(conn, bucket):
|
||||||
|
|
||||||
|
url = '{protocol}://{host}:{port}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
||||||
|
host=conn.host, port=conn.port, bucket=bucket.name)
|
||||||
|
return url
|
||||||
|
|
||||||
@attr(resource='object')
|
@attr(resource='object')
|
||||||
@attr(method='post')
|
@attr(method='post')
|
||||||
@attr(operation='anonymous browser based upload via POST request')
|
@attr(operation='anonymous browser based upload via POST request')
|
||||||
@attr(assertion='succeeds and returns written data')
|
@attr(assertion='succeeds and returns written data')
|
||||||
def test_post_object_anonymous_request():
|
def test_post_object_anonymous_request():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
|
url = _get_post_url(s3.main, bucket)
|
||||||
bucket.set_acl('public-read-write')
|
bucket.set_acl('public-read-write')
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
|
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
|
||||||
("Content-Type" , "text/plain"),('file', ('bar'))])
|
("Content-Type" , "text/plain"),('file', ('bar'))])
|
||||||
|
@ -1062,11 +1065,8 @@ def test_post_object_anonymous_request():
|
||||||
@attr(assertion='succeeds and returns written data')
|
@attr(assertion='succeeds and returns written data')
|
||||||
def test_post_object_authenticated_request():
|
def test_post_object_authenticated_request():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1083,6 +1083,7 @@ def test_post_object_authenticated_request():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1103,10 +1104,7 @@ def test_post_object_authenticated_request():
|
||||||
def test_post_object_set_success_code():
|
def test_post_object_set_success_code():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
bucket.set_acl('public-read-write')
|
bucket.set_acl('public-read-write')
|
||||||
conn = s3.main
|
url = _get_post_url(s3.main, bucket)
|
||||||
host_name = conn.host
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
|
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
|
||||||
("success_action_status" , "201"),\
|
("success_action_status" , "201"),\
|
||||||
|
@ -1125,10 +1123,7 @@ def test_post_object_set_success_code():
|
||||||
def test_post_object_set_invalid_success_code():
|
def test_post_object_set_invalid_success_code():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
bucket.set_acl('public-read-write')
|
bucket.set_acl('public-read-write')
|
||||||
conn = s3.main
|
url = _get_post_url(s3.main, bucket)
|
||||||
host_name = conn.host
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
|
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
|
||||||
("success_action_status" , "404"),\
|
("success_action_status" , "404"),\
|
||||||
|
@ -1145,11 +1140,8 @@ def test_post_object_set_invalid_success_code():
|
||||||
@attr(assertion='succeeds and returns written data')
|
@attr(assertion='succeeds and returns written data')
|
||||||
def test_post_object_upload_larger_than_chunk():
|
def test_post_object_upload_larger_than_chunk():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1166,6 +1158,7 @@ def test_post_object_upload_larger_than_chunk():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
foo_string = 'foo' * 1024*1024
|
foo_string = 'foo' * 1024*1024
|
||||||
|
@ -1187,11 +1180,8 @@ def test_post_object_upload_larger_than_chunk():
|
||||||
@attr(assertion='succeeds and returns written data')
|
@attr(assertion='succeeds and returns written data')
|
||||||
def test_post_object_set_key_from_filename():
|
def test_post_object_set_key_from_filename():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1208,6 +1198,7 @@ def test_post_object_set_key_from_filename():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1227,11 +1218,8 @@ def test_post_object_set_key_from_filename():
|
||||||
@attr(assertion='succeeds with status 204')
|
@attr(assertion='succeeds with status 204')
|
||||||
def test_post_object_ignored_header():
|
def test_post_object_ignored_header():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1248,6 +1236,7 @@ def test_post_object_ignored_header():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1264,11 +1253,8 @@ def test_post_object_ignored_header():
|
||||||
@attr(assertion='succeeds with status 204')
|
@attr(assertion='succeeds with status 204')
|
||||||
def test_post_object_case_insensitive_condition_fields():
|
def test_post_object_case_insensitive_condition_fields():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',host=host_name,\
|
url = _get_post_url(s3.main, bucket)
|
||||||
bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1285,6 +1271,7 @@ def test_post_object_case_insensitive_condition_fields():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1301,11 +1288,8 @@ def test_post_object_case_insensitive_condition_fields():
|
||||||
@attr(assertion='succeeds with escaped leading $ and returns written data')
|
@attr(assertion='succeeds with escaped leading $ and returns written data')
|
||||||
def test_post_object_escaped_field_values():
|
def test_post_object_escaped_field_values():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1322,6 +1306,7 @@ def test_post_object_escaped_field_values():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1342,15 +1327,9 @@ def test_post_object_escaped_field_values():
|
||||||
def test_post_object_success_redirect_action():
|
def test_post_object_success_redirect_action():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
|
|
||||||
conn = s3.main
|
url = _get_post_url(s3.main, bucket)
|
||||||
host_name = conn.host
|
redirect_url = _get_post_url(s3.main, bucket)
|
||||||
|
bucket.set_acl('public-read')
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
redirect_url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
bucket.set_acl('public-read')
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1368,6 +1347,7 @@ def test_post_object_success_redirect_action():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1390,11 +1370,8 @@ def test_post_object_success_redirect_action():
|
||||||
@attr(assertion='fails with invalid signature error')
|
@attr(assertion='fails with invalid signature error')
|
||||||
def test_post_object_invalid_signature():
|
def test_post_object_invalid_signature():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1411,6 +1388,7 @@ def test_post_object_invalid_signature():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1]
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1]
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1427,11 +1405,8 @@ def test_post_object_invalid_signature():
|
||||||
@attr(assertion='fails with access key does not exist error')
|
@attr(assertion='fails with access key does not exist error')
|
||||||
def test_post_object_invalid_access_key():
|
def test_post_object_invalid_access_key():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1448,6 +1423,7 @@ def test_post_object_invalid_access_key():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\
|
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\
|
||||||
|
@ -1464,11 +1440,8 @@ def test_post_object_invalid_access_key():
|
||||||
@attr(assertion='fails with invalid expiration error')
|
@attr(assertion='fails with invalid expiration error')
|
||||||
def test_post_object_invalid_date_format():
|
def test_post_object_invalid_date_format():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1485,6 +1458,7 @@ def test_post_object_invalid_date_format():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1501,11 +1475,8 @@ def test_post_object_invalid_date_format():
|
||||||
@attr(assertion='fails with missing key error')
|
@attr(assertion='fails with missing key error')
|
||||||
def test_post_object_no_key_specified():
|
def test_post_object_no_key_specified():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1521,6 +1492,7 @@ def test_post_object_no_key_specified():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1537,11 +1509,8 @@ def test_post_object_no_key_specified():
|
||||||
@attr(assertion='fails with missing signature error')
|
@attr(assertion='fails with missing signature error')
|
||||||
def test_post_object_missing_signature():
|
def test_post_object_missing_signature():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1558,6 +1527,7 @@ def test_post_object_missing_signature():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1574,11 +1544,8 @@ def test_post_object_missing_signature():
|
||||||
@attr(assertion='fails with extra input fields policy error')
|
@attr(assertion='fails with extra input fields policy error')
|
||||||
def test_post_object_missing_policy_condition():
|
def test_post_object_missing_policy_condition():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1594,6 +1561,7 @@ def test_post_object_missing_policy_condition():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1610,11 +1578,8 @@ def test_post_object_missing_policy_condition():
|
||||||
@attr(assertion='succeeds using starts-with restriction on metadata header')
|
@attr(assertion='succeeds using starts-with restriction on metadata header')
|
||||||
def test_post_object_user_specified_header():
|
def test_post_object_user_specified_header():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1632,6 +1597,7 @@ def test_post_object_user_specified_header():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1650,11 +1616,8 @@ def test_post_object_user_specified_header():
|
||||||
@attr(assertion='fails with policy condition failed error due to missing field in POST request')
|
@attr(assertion='fails with policy condition failed error due to missing field in POST request')
|
||||||
def test_post_object_request_missing_policy_specified_field():
|
def test_post_object_request_missing_policy_specified_field():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1672,6 +1635,7 @@ def test_post_object_request_missing_policy_specified_field():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1688,11 +1652,8 @@ def test_post_object_request_missing_policy_specified_field():
|
||||||
@attr(assertion='fails with conditions must be list error')
|
@attr(assertion='fails with conditions must be list error')
|
||||||
def test_post_object_condition_is_case_sensitive():
|
def test_post_object_condition_is_case_sensitive():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1709,6 +1670,7 @@ def test_post_object_condition_is_case_sensitive():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1725,11 +1687,8 @@ def test_post_object_condition_is_case_sensitive():
|
||||||
@attr(assertion='fails with expiration must be string error')
|
@attr(assertion='fails with expiration must be string error')
|
||||||
def test_post_object_expires_is_case_sensitive():
|
def test_post_object_expires_is_case_sensitive():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1746,6 +1705,7 @@ def test_post_object_expires_is_case_sensitive():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1762,11 +1722,8 @@ def test_post_object_expires_is_case_sensitive():
|
||||||
@attr(assertion='fails with policy expired error')
|
@attr(assertion='fails with policy expired error')
|
||||||
def test_post_object_expired_policy():
|
def test_post_object_expired_policy():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000)
|
||||||
|
@ -1783,6 +1740,7 @@ def test_post_object_expired_policy():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1799,11 +1757,8 @@ def test_post_object_expired_policy():
|
||||||
@attr(assertion='fails using equality restriction on metadata header')
|
@attr(assertion='fails using equality restriction on metadata header')
|
||||||
def test_post_object_invalid_request_field_value():
|
def test_post_object_invalid_request_field_value():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1821,6 +1776,7 @@ def test_post_object_invalid_request_field_value():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1837,11 +1793,8 @@ def test_post_object_invalid_request_field_value():
|
||||||
@attr(assertion='fails with policy missing expiration error')
|
@attr(assertion='fails with policy missing expiration error')
|
||||||
def test_post_object_missing_expires_condition():
|
def test_post_object_missing_expires_condition():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1858,6 +1811,7 @@ def test_post_object_missing_expires_condition():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1874,11 +1828,8 @@ def test_post_object_missing_expires_condition():
|
||||||
@attr(assertion='fails with policy missing conditions error')
|
@attr(assertion='fails with policy missing conditions error')
|
||||||
def test_post_object_missing_conditions_list():
|
def test_post_object_missing_conditions_list():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1888,6 +1839,7 @@ def test_post_object_missing_conditions_list():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1904,11 +1856,8 @@ def test_post_object_missing_conditions_list():
|
||||||
@attr(assertion='fails with allowable upload size exceeded error')
|
@attr(assertion='fails with allowable upload size exceeded error')
|
||||||
def test_post_object_upload_size_limit_exceeded():
|
def test_post_object_upload_size_limit_exceeded():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1925,6 +1874,7 @@ def test_post_object_upload_size_limit_exceeded():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1941,11 +1891,8 @@ def test_post_object_upload_size_limit_exceeded():
|
||||||
@attr(assertion='fails with invalid content length error')
|
@attr(assertion='fails with invalid content length error')
|
||||||
def test_post_object_missing_content_length_argument():
|
def test_post_object_missing_content_length_argument():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1962,6 +1909,7 @@ def test_post_object_missing_content_length_argument():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -1978,11 +1926,8 @@ def test_post_object_missing_content_length_argument():
|
||||||
@attr(assertion='fails with invalid JSON error')
|
@attr(assertion='fails with invalid JSON error')
|
||||||
def test_post_object_invalid_content_length_argument():
|
def test_post_object_invalid_content_length_argument():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -1999,6 +1944,7 @@ def test_post_object_invalid_content_length_argument():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
@ -2015,11 +1961,8 @@ def test_post_object_invalid_content_length_argument():
|
||||||
@attr(assertion='fails with upload size less than minimum allowable error')
|
@attr(assertion='fails with upload size less than minimum allowable error')
|
||||||
def test_post_object_upload_size_below_minimum():
|
def test_post_object_upload_size_below_minimum():
|
||||||
bucket = get_new_bucket()
|
bucket = get_new_bucket()
|
||||||
conn = s3.main
|
|
||||||
host_name = conn.host
|
|
||||||
|
|
||||||
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
|
url = _get_post_url(s3.main, bucket)
|
||||||
host=host_name,bucket=bucket.name)
|
|
||||||
|
|
||||||
utc = pytz.utc
|
utc = pytz.utc
|
||||||
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
@ -2036,6 +1979,7 @@ def test_post_object_upload_size_below_minimum():
|
||||||
|
|
||||||
json_policy_document = json.JSONEncoder().encode(policy_document)
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
policy = base64.b64encode(json_policy_document)
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
|
||||||
|
|
Loading…
Reference in a new issue