From 414a107b4630a750dee890ccae92c49c6b877aa2 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 23 Jan 2013 10:41:38 -0800 Subject: [PATCH] rgw: fix post tests to include tcp port Signed-off-by: Yehuda Sadeh --- s3tests/functional/test_s3.py | 178 ++++++++++++---------------------- 1 file changed, 61 insertions(+), 117 deletions(-) diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index 0e4d075..258e3d8 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -1034,17 +1034,20 @@ def test_object_write_file(): eq(got, 'bar') +def _get_post_url(conn, bucket): + + url = '{protocol}://{host}:{port}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ + host=conn.host, port=conn.port, bucket=bucket.name) + return url + @attr(resource='object') @attr(method='post') @attr(operation='anonymous browser based upload via POST request') @attr(assertion='succeeds and returns written data') def test_post_object_anonymous_request(): bucket = get_new_bucket() + url = _get_post_url(s3.main, bucket) bucket.set_acl('public-read-write') - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ ("Content-Type" , "text/plain"),('file', ('bar'))]) @@ -1062,11 +1065,8 @@ def test_post_object_anonymous_request(): @attr(assertion='succeeds and returns written data') def test_post_object_authenticated_request(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1083,6 +1083,7 @@ def test_post_object_authenticated_request(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1103,10 +1104,7 @@ def test_post_object_authenticated_request(): def test_post_object_set_success_code(): bucket = get_new_bucket() bucket.set_acl('public-read-write') - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ ("success_action_status" , "201"),\ @@ -1125,10 +1123,7 @@ def test_post_object_set_success_code(): def test_post_object_set_invalid_success_code(): bucket = get_new_bucket() bucket.set_acl('public-read-write') - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ ("success_action_status" , "404"),\ @@ -1145,11 +1140,8 @@ def test_post_object_set_invalid_success_code(): @attr(assertion='succeeds and returns written data') def test_post_object_upload_larger_than_chunk(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1166,6 +1158,7 @@ def test_post_object_upload_larger_than_chunk(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) foo_string = 'foo' * 1024*1024 @@ -1187,11 +1180,8 @@ def test_post_object_upload_larger_than_chunk(): @attr(assertion='succeeds and returns written data') def test_post_object_set_key_from_filename(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1208,6 +1198,7 @@ def test_post_object_set_key_from_filename(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1227,11 +1218,8 @@ def test_post_object_set_key_from_filename(): @attr(assertion='succeeds with status 204') def test_post_object_ignored_header(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http', - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1248,6 +1236,7 @@ def test_post_object_ignored_header(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1264,11 +1253,8 @@ def test_post_object_ignored_header(): @attr(assertion='succeeds with status 204') def test_post_object_case_insensitive_condition_fields(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',host=host_name,\ - bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1285,6 +1271,7 @@ def test_post_object_case_insensitive_condition_fields(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1301,11 +1288,8 @@ def test_post_object_case_insensitive_condition_fields(): @attr(assertion='succeeds with escaped leading $ and returns written data') def test_post_object_escaped_field_values(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1322,6 +1306,7 @@ def test_post_object_escaped_field_values(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1342,15 +1327,9 @@ def test_post_object_escaped_field_values(): def test_post_object_success_redirect_action(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) - - redirect_url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) - bucket.set_acl('public-read') + url = _get_post_url(s3.main, bucket) + redirect_url = _get_post_url(s3.main, bucket) + bucket.set_acl('public-read') utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1368,6 +1347,7 @@ def test_post_object_success_redirect_action(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1390,11 +1370,8 @@ def test_post_object_success_redirect_action(): @attr(assertion='fails with invalid signature error') def test_post_object_invalid_signature(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1411,6 +1388,7 @@ def test_post_object_invalid_signature(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1] payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1427,11 +1405,8 @@ def test_post_object_invalid_signature(): @attr(assertion='fails with access key does not exist error') def test_post_object_invalid_access_key(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1448,6 +1423,7 @@ def test_post_object_invalid_access_key(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\ @@ -1464,11 +1440,8 @@ def test_post_object_invalid_access_key(): @attr(assertion='fails with invalid expiration error') def test_post_object_invalid_date_format(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1485,6 +1458,7 @@ def test_post_object_invalid_date_format(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1501,11 +1475,8 @@ def test_post_object_invalid_date_format(): @attr(assertion='fails with missing key error') def test_post_object_no_key_specified(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http', - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1521,6 +1492,7 @@ def test_post_object_no_key_specified(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1537,11 +1509,8 @@ def test_post_object_no_key_specified(): @attr(assertion='fails with missing signature error') def test_post_object_missing_signature(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1558,6 +1527,7 @@ def test_post_object_missing_signature(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1574,11 +1544,8 @@ def test_post_object_missing_signature(): @attr(assertion='fails with extra input fields policy error') def test_post_object_missing_policy_condition(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1594,6 +1561,7 @@ def test_post_object_missing_policy_condition(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1610,11 +1578,8 @@ def test_post_object_missing_policy_condition(): @attr(assertion='succeeds using starts-with restriction on metadata header') def test_post_object_user_specified_header(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1632,6 +1597,7 @@ def test_post_object_user_specified_header(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1650,11 +1616,8 @@ def test_post_object_user_specified_header(): @attr(assertion='fails with policy condition failed error due to missing field in POST request') def test_post_object_request_missing_policy_specified_field(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http', - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1672,6 +1635,7 @@ def test_post_object_request_missing_policy_specified_field(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1688,11 +1652,8 @@ def test_post_object_request_missing_policy_specified_field(): @attr(assertion='fails with conditions must be list error') def test_post_object_condition_is_case_sensitive(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1709,6 +1670,7 @@ def test_post_object_condition_is_case_sensitive(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1725,11 +1687,8 @@ def test_post_object_condition_is_case_sensitive(): @attr(assertion='fails with expiration must be string error') def test_post_object_expires_is_case_sensitive(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1746,6 +1705,7 @@ def test_post_object_expires_is_case_sensitive(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1762,11 +1722,8 @@ def test_post_object_expires_is_case_sensitive(): @attr(assertion='fails with policy expired error') def test_post_object_expired_policy(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000) @@ -1783,6 +1740,7 @@ def test_post_object_expired_policy(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1799,11 +1757,8 @@ def test_post_object_expired_policy(): @attr(assertion='fails using equality restriction on metadata header') def test_post_object_invalid_request_field_value(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1821,6 +1776,7 @@ def test_post_object_invalid_request_field_value(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1837,11 +1793,8 @@ def test_post_object_invalid_request_field_value(): @attr(assertion='fails with policy missing expiration error') def test_post_object_missing_expires_condition(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1858,6 +1811,7 @@ def test_post_object_missing_expires_condition(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1874,11 +1828,8 @@ def test_post_object_missing_expires_condition(): @attr(assertion='fails with policy missing conditions error') def test_post_object_missing_conditions_list(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1888,6 +1839,7 @@ def test_post_object_missing_conditions_list(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1904,11 +1856,8 @@ def test_post_object_missing_conditions_list(): @attr(assertion='fails with allowable upload size exceeded error') def test_post_object_upload_size_limit_exceeded(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1925,6 +1874,7 @@ def test_post_object_upload_size_limit_exceeded(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1941,11 +1891,8 @@ def test_post_object_upload_size_limit_exceeded(): @attr(assertion='fails with invalid content length error') def test_post_object_missing_content_length_argument(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1962,6 +1909,7 @@ def test_post_object_missing_content_length_argument(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -1978,11 +1926,8 @@ def test_post_object_missing_content_length_argument(): @attr(assertion='fails with invalid JSON error') def test_post_object_invalid_content_length_argument(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -1999,6 +1944,7 @@ def test_post_object_invalid_content_length_argument(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\ @@ -2015,11 +1961,8 @@ def test_post_object_invalid_content_length_argument(): @attr(assertion='fails with upload size less than minimum allowable error') def test_post_object_upload_size_below_minimum(): bucket = get_new_bucket() - conn = s3.main - host_name = conn.host - url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\ - host=host_name,bucket=bucket.name) + url = _get_post_url(s3.main, bucket) utc = pytz.utc expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) @@ -2036,6 +1979,7 @@ def test_post_object_upload_size_below_minimum(): json_policy_document = json.JSONEncoder().encode(policy_document) policy = base64.b64encode(json_policy_document) + conn = s3.main signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\