Compare commits

15 commits

Author SHA1 Message Date
Casey Bodley
62e9fc0faf requirements: unpin 'requests' from old v0.14
Signed-off-by: Casey Bodley <cbodley@redhat.com>
2018-02-16 11:29:53 -05:00
Casey Bodley
8dfbf70d16 cors: allow None to match missing headers
this is apparently needed to run against newer versions of the requests
library

Signed-off-by: Casey Bodley <cbodley@redhat.com>
2018-02-16 11:29:53 -05:00
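
A minimal sketch of the idea behind this change, using plain dicts (illustration only; the helper name is made up):

    # Newer requests releases report a header the server never sent as
    # missing, rather than as an empty string. Treating None as "header
    # absent" keeps the comparison working either way.
    def header_matches(headers, name, expected):
        # dict.get returns None for a missing key, so expected=None
        # matches a response that omitted the header entirely.
        return headers.get(name, None) == expected

    assert header_matches({}, 'access-control-allow-origin', None)
    assert header_matches({'access-control-allow-origin': '*'},
                          'access-control-allow-origin', '*')
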
Matt Benjamin
e050c10295 Add test case for POST with no Content-Type header
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit f5ad2bfaee)
2018-01-04 14:35:54 -05:00
Xinying Song
277c2993eb Add test case for getting version-id when upload objects to bucket
For a default bucket and a versioning-suspended bucket, no version-id should be returned on upload.
For a versioning-enabled bucket, a non-empty version-id should be returned, matching what the
bucket listing shows.

Signed-off-by: Xinying Song <songxinying@cloudin.cn>
(cherry picked from commit 9506ac220f)
2017-11-27 14:58:06 -05:00
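
A rough boto sketch of the contract these tests check (assumes a reachable S3-compatible endpoint and credentials in the boto config; the bucket name is made up):

    import boto

    conn = boto.connect_s3()
    bucket = conn.create_bucket('versioning-demo')

    key = bucket.new_key('bar')
    key.set_contents_from_string('bar')
    assert key.version_id is None        # versioning never enabled: no id

    bucket.configure_versioning(True)
    key = bucket.new_key('bar')
    key.set_contents_from_string('bar')
    assert key.version_id is not None    # enabled: a non-empty version-id
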
Casey Bodley
55e1c95b2f check precedence of Date and X-Amz-Date in signatures
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit a2f73f6a84)
2017-11-06 10:31:40 -05:00
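
For reference, a sketch of why precedence matters in SigV2: when x-amz-date is present, the Date slot of the string-to-sign is left empty and the timestamp is taken from the canonicalized amz headers (illustrative helper, not part of the suite):

    import base64
    import hmac
    from hashlib import sha1
    from email.utils import formatdate

    def string_to_sign_v2(method, path, amz_date):
        return '\n'.join([
            method,
            '',                           # Content-MD5
            '',                           # Content-Type
            '',                           # Date: empty when x-amz-date is set
            'x-amz-date:%s' % amz_date,   # CanonicalizedAmzHeaders
            path,                         # CanonicalizedResource
        ])

    sts = string_to_sign_v2('PUT', '/bucket/obj', formatdate(usegmt=True))
    signature = base64.b64encode(hmac.new('secret-key', sts, sha1).digest())
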
Tim Burke
b1c1d2d98b Care less about ordering when checking grants
Some tests, like test_object_header_acl_grants and
test_bucket_header_acl_grants, use the same id for all permissions.
Sort() is stable, so those tests end up testing the order of acl grants
returned by Boto. Not sure, but I think this may in turn depend on the
order of HTTP headers, where the order is not significant.

Signed-off-by: Tim Burke <tim.burke@gmail.com>
(cherry picked from commit 301272c0ef)
2017-11-06 10:30:57 -05:00
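
The failure mode is easy to reproduce with plain tuples: sorting on id alone leaves equal-id grants in arrival order, while a compound key makes the comparison order-independent (illustration only):

    import operator

    grants = [('user1', 'READ'), ('user1', 'FULL_CONTROL')]
    reordered = list(reversed(grants))

    by_id = operator.itemgetter(0)
    # sort() is stable, so equal ids keep their arrival order: mismatch
    assert sorted(grants, key=by_id) != sorted(reordered, key=by_id)

    by_id_perm = operator.itemgetter(0, 1)
    # keying on (id, permission) yields one canonical order: match
    assert sorted(grants, key=by_id_perm) == sorted(reordered, key=by_id_perm)
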
yuliyang
a271ebeaa1 test for https://github.com/ceph/ceph/pull/17934
Signed-off-by: yuliyang <yuliyang@cmss.chinamobile.com>
(cherry picked from commit f464a5a3b1)
2017-10-26 13:12:56 -04:00
Casey Bodley
74e0956621 expect 400 for missing sse headers
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 42debb8be7)
2017-10-10 15:01:10 -04:00
Yehuda Sadeh
c82bd9a403 Merge pull request #187 from theanalyst/wip-lc-fixes
add the LC fixes to ceph-master

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
2017-10-04 07:37:21 -07:00
Abhishek Lekshmanan
d46175f31e lifecycle: add test cases using filter & no id
Added test cases for LC policies with no ID in the xml, no prefix, and both an
empty and a non-empty Filter.

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit cc2d6f076f)
2017-09-20 09:55:17 +02:00
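
The rule bodies these cases exercise look roughly like this (hand-written to mirror what generate_lifecycle_body() in the diff below emits):

    rule_no_prefix    = '<Rule><ID>rule1</ID><Status>Enabled</Status></Rule>'
    rule_empty_filter = ('<Rule><ID>rule1</ID><Status>Enabled</Status>'
                         '<Filter></Filter></Rule>')  # AWS accepts an empty Filter
    rule_with_filter  = ('<Rule><ID>rule1</ID><Status>Enabled</Status>'
                         '<Filter><Prefix>foo</Prefix></Filter></Rule>')
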
Abhishek Lekshmanan
9e7b2ae53d lc: Give more flexibility for LC expiration times
Earlier values assumed an LC debug interval of 2s, which is a pretty short
time window and can often lead to failures if processing didn't complete
before the next simulated day. This commit assumes the currently configured
LC debug interval of 10s, and derives the time intervals using the
following logic:

Worst case:
LC start-time : 00:00
obj upload    : 00:01
LC run1       : 00:10 -> object not expired, as it is only 9s old
LC run2       : 00:20 -> object will expire in this run; however, we
can't guess exactly when this run will complete. For a moderate number of
objects it can take anywhere from 1s up to a max of 10s, so give it
wiggle room of around 8s to complete. Given the number of objects in a
teuthology run, the object will most probably already be deleted within
this time, so by 28s we should have seen day-1 objects being expired.

Best case:
LC start-time: 00:00
obj upload : 00:09
LC run1 : 00:10
LC run2 : 00:20 -> obj expires, so the elapsed time is around 11-19s (of
course it could also be close to 10s). We should probably configure the
LC lock time to 10s as well, to ensure the lock isn't held for the
default 60s, in which case the object might expire only after a time
greater than the lock interval.

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit 9204283def)
2017-09-20 09:49:45 +02:00
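
Worked out against the 10s debug interval, the worst case pencils out as follows (illustration only):

    lc_interval = 10      # one simulated "day" under the LC debug interval
    upload_at   = 1       # object lands just after the LC cycle starts
    run1_at     = 10      # object is only 9s old here -> not expired yet
    run2_at     = 20      # expiration happens somewhere during this run
    wiggle      = 8       # allowance for run2 to finish deleting
    assert run2_at + wiggle == 28   # hence time.sleep(28) in the diff below
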
Abhishek Lekshmanan
7a6d52afd5 lc: add attr lifecycle_expiration for lc tests using expiration
Allows us to additionally filter out these tests, which currently fail in
the teuthology runs.

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit edeb956a82)
2017-09-20 09:49:30 +02:00
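
(With the nose attrib plugin this looks something like: nosetests -a '!lifecycle_expiration' to skip them, or -a lifecycle_expiration to run only them.)
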
hechuang
d8fecfdc66 modify test_sse_kms_method_head() for aws standard
Signed-off-by: hechuang <hechuang@xsky.com>
(cherry picked from commit 1ca6a94e6a)
2017-08-30 08:56:55 -04:00
hechuang
af18c59d22 add test_sse_kms_read_declare() for aws standard
Signed-off-by: hechuang <hechuang@xsky.com>
(cherry picked from commit b554973239)
2017-08-30 08:56:55 -04:00
hechuang
2d7d79c95d rgw: Data encryption does not follow the AWS agreement
Encryption request headers should not be sent for GET or HEAD requests if
your object uses SSE-KMS/SSE-S3; otherwise you’ll get an HTTP 400
BadRequest error.

Signed-off-by: hechuang <hechuang@xsky.com>
(cherry picked from commit 58944d0ba6)
2017-08-30 08:56:55 -04:00
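
In boto terms the rule is roughly this (sketch; assumes a live endpoint configured for boto and a 'testkey-1' KMS key, as in the tests below):

    import boto

    conn = boto.connect_s3()
    bucket = conn.create_bucket('sse-demo')
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj')
    key.set_contents_from_string('A' * 100, headers=sse_kms_headers)  # PUT: ok
    key.get_contents_as_string()                   # GET without SSE headers: ok
    try:
        key.get_contents_as_string(headers=sse_kms_headers)  # SSE headers on GET
    except boto.exception.S3ResponseError as e:
        assert e.status == 400                               # BadRequest
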
3 changed files with 267 additions and 45 deletions

requirements.txt

@@ -5,7 +5,7 @@ bunch >=1.0.0
 # 0.14 switches to libev, that means bootstrap needs to change too
 gevent >=1.0
 isodate >=0.4.4
-requests ==0.14.0
+requests >=0.14.0
 pytz >=2011k
 ordereddict
 httplib2

s3tests/functional/test_headers.py

@@ -13,6 +13,7 @@ import socket
 import ssl
 import os
 import re
+from email.utils import formatdate
 from urlparse import urlparse
@@ -469,6 +470,28 @@ def test_object_create_bad_authorization_empty():
     eq(e.reason, 'Forbidden')
     eq(e.error_code, 'AccessDenied')
 
+@tag('auth_common')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/date and x-amz-date')
+@attr(assertion='succeeds')
+@nose.with_setup(teardown=_clear_custom_headers)
+def test_object_create_date_and_amz_date():
+    date = formatdate(usegmt=True)
+    key = _setup_bad_object({'Date': date, 'X-Amz-Date': date})
+    key.set_contents_from_string('bar')
+
+@tag('auth_common')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/x-amz-date and no date')
+@attr(assertion='succeeds')
+@nose.with_setup(teardown=_clear_custom_headers)
+def test_object_create_amz_date_and_no_date():
+    date = formatdate(usegmt=True)
+    key = _setup_bad_object({'X-Amz-Date': date}, ('Date',))
+    key.set_contents_from_string('bar')
+
 # the teardown is really messed up here. check it out
 @tag('auth_common')

s3tests/functional/test_s3.py

@@ -28,11 +28,13 @@ import re
 import xml.etree.ElementTree as ET
 from collections import namedtuple
+from email.Utils import formatdate
 from httplib import HTTPConnection, HTTPSConnection
 from urlparse import urlparse
 from nose.tools import eq_ as eq
+from nose.tools import assert_is_none
 from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 from boto.s3.tagging import TagSet
@@ -80,8 +82,8 @@ def check_grants(got, want):
     in any order.
     """
     eq(len(got), len(want))
-    got = sorted(got, key=operator.attrgetter('id'))
-    want = sorted(want, key=operator.itemgetter('id'))
+    got = sorted(got, key=operator.attrgetter('id', 'permission'))
+    want = sorted(want, key=operator.itemgetter('id', 'permission'))
     for g, w in zip(got, want):
         w = dict(w)
         eq(g.permission, w.pop('permission'))
@@ -893,6 +895,21 @@ def test_bucket_list_return_data_versioning():
         eq(key.version_id, key_data['version_id'])
 
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list keys after marker when bucket versioning is configured')
+@attr(assertion='marker list on versioning bucket')
+def test_bucket_list_marker_versioning():
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    key_names = ['bar', 'baz', 'foo']
+    bucket = _create_keys(bucket=bucket, keys=key_names)
+    li = bucket.get_all_keys(marker='baz')
+    eq(li.marker, 'baz')
+    names = [e.name for e in li]
+    eq(names, ['foo'])
+
 @attr(resource='object.metadata')
 @attr(method='head')
 @attr(operation='modification-times')
@@ -1483,6 +1500,42 @@ def test_post_object_authenticated_request():
     got = key.get_contents_as_string()
     eq(got, 'bar')
 
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request, no content-type header')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_authenticated_no_content_type():
+    bucket = get_new_bucket()
+    url = _get_post_url(s3.main, bucket)
+
+    utc = pytz.utc
+    expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+    policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+    "conditions": [\
+    {"bucket": bucket.name},\
+    ["starts-with", "$key", "foo"],\
+    {"acl": "private"},\
+    ["content-length-range", 0, 1024]\
+    ]\
+    }
+
+    json_policy_document = json.JSONEncoder().encode(policy_document)
+    policy = base64.b64encode(json_policy_document)
+    conn = s3.main
+    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+    payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+    ("acl" , "private"),("signature" , signature),("policy" , policy),\
+    ('file', ('bar'))])
+
+    r = requests.post(url, files = payload)
+    eq(r.status_code, 204)
+    key = bucket.get_key("foo.txt")
+    got = key.get_contents_as_string()
+    eq(got, 'bar')
+
 @attr(resource='object')
 @attr(method='post')
 @attr(operation='authenticated browser based upload via POST request, bad access key')
@@ -5779,8 +5832,8 @@ def _cors_request_and_check(func, url, headers, expect_status, expect_allow_orig
     r = func(url, headers=headers)
     eq(r.status_code, expect_status)
 
-    assert r.headers['access-control-allow-origin'] == expect_allow_origin
-    assert r.headers['access-control-allow-methods'] == expect_allow_methods
+    assert r.headers.get('access-control-allow-origin', None) == expect_allow_origin
+    assert r.headers.get('access-control-allow-methods', None) == expect_allow_methods
@@ -7492,7 +7545,8 @@ def create_lifecycle(days = None, prefix = 'test/', rules = None):
     else:
         for rule in rules:
             expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
-            rule = boto.s3.lifecycle.Rule(id=rule['id'], prefix=rule['prefix'],
+            _id = rule.get('id',None)
+            rule = boto.s3.lifecycle.Rule(id=_id, prefix=rule['prefix'],
                                           status=rule['status'], expiration=expiration)
             lifecycle.append(rule)
     return lifecycle
@@ -7529,27 +7583,59 @@ def test_lifecycle_get():
     eq(current[1].id, 'test2/')
     eq(current[1].prefix, 'test2/')
 
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get lifecycle config no id')
+@attr('lifecycle')
+def test_lifecycle_get_no_id():
+    Rule = namedtuple('Rule',['prefix','status','days'])
+    rules = {'rule1' : Rule('test1/','Enabled',31),
+             'rule2' : Rule('test2/','Enabled',120)}
+    bucket = set_lifecycle(rules=[{'days': rules['rule1'].days ,
+                                   'prefix': rules['rule1'].prefix,
+                                   'status': rules['rule1'].status},
+                                  {'days': rules['rule2'].days,
+                                   'prefix': rules['rule2'].prefix,
+                                   'status': rules['rule2'].status}])
+    current_lc = bucket.get_lifecycle_config()
+
+    # We can't guarantee the order of XML, since the id is random, let's walk
+    # through the rules and validate that both were present
+    for lc_rule in current_lc:
+        if lc_rule.expiration.days == rules['rule1'].days:
+            eq(lc_rule.prefix, rules['rule1'].prefix)
+            assert len(lc_rule.id) > 0
+        elif lc_rule.expiration.days == rules['rule2'].days:
+            eq(lc_rule.prefix, rules['rule2'].prefix)
+            assert len(lc_rule.id) > 0
+        else:
+            # neither of the rules we supplied, something wrong
+            assert False
+
 # The test harnass for lifecycle is configured to treat days as 2 second intervals.
 @attr(resource='bucket')
 @attr(method='put')
 @attr(operation='test lifecycle expiration')
 @attr('lifecycle')
+@attr('lifecycle_expiration')
 @attr('fails_on_aws')
 def test_lifecycle_expiration():
-    bucket = set_lifecycle(rules=[{'id': 'rule1', 'days': 2, 'prefix': 'expire1/', 'status': 'Enabled'},
-                                  {'id':'rule2', 'days': 6, 'prefix': 'expire3/', 'status': 'Enabled'}])
+    bucket = set_lifecycle(rules=[{'id': 'rule1', 'days': 1, 'prefix': 'expire1/', 'status': 'Enabled'},
+                                  {'id':'rule2', 'days': 4, 'prefix': 'expire3/', 'status': 'Enabled'}])
     _create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
                                       'keep2/bar', 'expire3/foo', 'expire3/bar'])
 
     # Get list of all keys
     init_keys = bucket.get_all_keys()
 
     # Wait for first expiration (plus fudge to handle the timer window)
-    time.sleep(6)
+    time.sleep(28)
     expire1_keys = bucket.get_all_keys()
 
     # Wait for next expiration cycle
-    time.sleep(2)
+    time.sleep(10)
     keep2_keys = bucket.get_all_keys()
 
     # Wait for final expiration cycle
-    time.sleep(8)
+    time.sleep(20)
     expire3_keys = bucket.get_all_keys()
 
     eq(len(init_keys), 6)
@@ -7622,7 +7708,15 @@ def test_lifecycle_rules_conflicted():
 def generate_lifecycle_body(rules):
     body = '<?xml version="1.0" encoding="UTF-8"?><LifecycleConfiguration>'
     for rule in rules:
-        body += '<Rule><ID>%s</ID><Prefix>%s</Prefix><Status>%s</Status>' % (rule['ID'], rule['Prefix'], rule['Status'])
+        body += '<Rule><ID>%s</ID><Status>%s</Status>' % (rule['ID'], rule['Status'])
+        if 'Prefix' in rule.keys():
+            body += '<Prefix>%s</Prefix>' % rule['Prefix']
+        if 'Filter' in rule.keys():
+            prefix_str= '' # AWS supports empty filters
+            if 'Prefix' in rule['Filter'].keys():
+                prefix_str = '<Prefix>%s</Prefix>' % rule['Filter']['Prefix']
+            body += '<Filter>%s</Filter>' % prefix_str
         if 'Expiration' in rule.keys():
             if 'ExpiredObjectDeleteMarker' in rule['Expiration'].keys():
                 body += '<Expiration><ExpiredObjectDeleteMarker>%s</ExpiredObjectDeleteMarker></Expiration>' \
@@ -7686,6 +7780,7 @@ def test_lifecycle_set_invalid_date():
 @attr(method='put')
 @attr(operation='test lifecycle expiration with date')
 @attr('lifecycle')
+@attr('lifecycle_expiration')
 @attr('fails_on_aws')
 def test_lifecycle_expiration_date():
     bucket = get_new_bucket()
@@ -7734,6 +7829,7 @@ def test_lifecycle_set_noncurrent():
 @attr(method='put')
 @attr(operation='test lifecycle non-current version expiration')
 @attr('lifecycle')
+@attr('lifecycle_expiration')
 @attr('fails_on_aws')
 def test_lifecycle_noncur_expiration():
     bucket = get_new_bucket()
@@ -7774,12 +7870,51 @@ def test_lifecycle_set_deletemarker():
     eq(res.status, 200)
     eq(res.reason, 'OK')
 
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='set lifecycle config with Filter')
+@attr('lifecycle')
+def test_lifecycle_set_filter():
+    bucket = get_new_bucket()
+    rules = [
+        {'ID': 'rule1', 'Filter': {'Prefix': 'foo'}, 'Status': 'Enabled', 'Expiration': {'ExpiredObjectDeleteMarker': 'true'}}
+    ]
+    body = generate_lifecycle_body(rules)
+    fp = StringIO(body)
+    md5 = boto.utils.compute_md5(fp)
+    headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
+    res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
+                                         headers=headers)
+    eq(res.status, 200)
+    eq(res.reason, 'OK')
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='set lifecycle config with empty Filter')
+@attr('lifecycle')
+def test_lifecycle_set_empty_filter():
+    bucket = get_new_bucket()
+    rules = [
+        {'ID': 'rule1', 'Filter': {}, 'Status': 'Enabled', 'Expiration': {'ExpiredObjectDeleteMarker': 'true'}}
+    ]
+    body = generate_lifecycle_body(rules)
+    fp = StringIO(body)
+    md5 = boto.utils.compute_md5(fp)
+    headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
+    res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
+                                         headers=headers)
+    eq(res.status, 200)
+    eq(res.reason, 'OK')
+
 # The test harnass for lifecycle is configured to treat days as 1 second intervals.
 @attr(resource='bucket')
 @attr(method='put')
 @attr(operation='test lifecycle delete marker expiration')
 @attr('lifecycle')
+@attr('lifecycle_expiration')
 @attr('fails_on_aws')
 def test_lifecycle_deletemarker_expiration():
     bucket = get_new_bucket()
@@ -7832,6 +7967,7 @@ def test_lifecycle_set_multipart():
 @attr(method='put')
 @attr(operation='test lifecycle multipart expiration')
 @attr('lifecycle')
+@attr('lifecycle_expiration')
 @attr('fails_on_aws')
 def test_lifecycle_multipart_expiration():
     bucket = get_new_bucket()
@@ -8043,9 +8179,9 @@ def test_encryption_key_no_sse_c():
     }
     key = bucket.new_key('testobj')
     data = 'A'*100
-    key.set_contents_from_string(data, headers=sse_client_headers)
-    rdata = key.get_contents_as_string()
-    eq(data, rdata)
+    e = assert_raises(boto.exception.S3ResponseError,
+        key.set_contents_from_string, data, headers=sse_client_headers)
+    eq(e.status, 400)
 
 def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
@@ -8281,7 +8417,7 @@ def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
     key = bucket.new_key('testobj')
     data = 'A'*file_size
     key.set_contents_from_string(data, headers=sse_kms_client_headers)
-    rdata = key.get_contents_as_string(headers=sse_kms_client_headers)
+    rdata = key.get_contents_as_string()
     eq(data, rdata)
@@ -8340,6 +8476,9 @@ def test_sse_kms_method_head():
     eq(res.status, 200)
     eq(res.getheader('x-amz-server-side-encryption'), 'aws:kms')
     eq(res.getheader('x-amz-server-side-encryption-aws-kms-key-id'), 'testkey-1')
 
+    res = _make_request('HEAD', bucket, key, authenticated=True, request_headers=sse_kms_client_headers)
+    eq(res.status, 400)
+
 @attr(resource='object')
@@ -8360,28 +8499,6 @@ def test_sse_kms_present():
     eq(data, result)
 
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='write encrypted with SSE-KMS but read with other key')
-@attr(assertion='operation fails')
-@attr('encryption')
-def test_sse_kms_other_key():
-    bucket = get_new_bucket()
-    sse_kms_client_headers_A = {
-        'x-amz-server-side-encryption': 'aws:kms',
-        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
-    }
-    sse_kms_client_headers_B = {
-        'x-amz-server-side-encryption': 'aws:kms',
-        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
-    }
-    key = bucket.new_key('testobj')
-    data = 'A'*100
-
-    key.set_contents_from_string(data, headers=sse_kms_client_headers_A)
-    result = key.get_contents_as_string(headers=sse_kms_client_headers_B)
-    eq(data, result)
-
 @attr(resource='object')
 @attr(method='put')
 @attr(operation='declare SSE-KMS but do not provide key_id')
@@ -8410,9 +8527,9 @@ def test_sse_kms_not_declared():
     }
     key = bucket.new_key('testobj')
     data = 'A'*100
-    key.set_contents_from_string(data, headers=sse_kms_client_headers)
-    rdata = key.get_contents_as_string()
-    eq(data, rdata)
+    e = assert_raises(boto.exception.S3ResponseError,
+        key.set_contents_from_string, data, headers=sse_kms_client_headers)
+    eq(e.status, 400)
 
 @attr(resource='object')
@@ -8442,13 +8559,13 @@ def test_sse_kms_multipart_upload():
     k = bucket.get_key(key)
     eq(k.metadata['foo'], 'bar')
     eq(k.content_type, content_type)
-    test_string = k.get_contents_as_string(headers=enc_headers)
+    test_string = k.get_contents_as_string()
     eq(len(test_string), k.size)
-    eq(data, test_string)
-    _check_content_using_range_enc(k, data, 1000000, enc_headers=enc_headers)
-    _check_content_using_range_enc(k, data, 10000000, enc_headers=enc_headers)
+    eq(test_string, data)
+    _check_content_using_range(k, data, 1000000)
+    _check_content_using_range(k, data, 10000000)
 
 @attr(resource='object')
@@ -8544,7 +8661,7 @@ def test_sse_kms_post_object_authenticated_request():
     }
 
     key = bucket.get_key("foo.txt")
-    got = key.get_contents_as_string(headers=get_headers)
+    got = key.get_contents_as_string()
     eq(got, 'bar')
 
 @attr(resource='object')
@@ -8590,6 +8707,23 @@ def test_sse_kms_barb_transfer_13b():
         raise SkipTest
     _test_sse_kms_customer_write(13, key_id = config['main']['kms_keyid'])
 
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='write encrypted with SSE-KMS and read with SSE-KMS')
+@attr(assertion='operation fails')
+@attr('encryption')
+def test_sse_kms_read_declare():
+    bucket = get_new_bucket()
+    sse_kms_client_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*100
+    key.set_contents_from_string(data, headers=sse_kms_client_headers)
+    e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string, headers=sse_kms_client_headers)
+    eq(e.status, 400)
+
 @attr(resource='bucket')
 @attr(method='get')
 @attr(operation='Test Bucket Policy')
@@ -9205,3 +9339,68 @@ def test_delete_tags_obj_public():
     tags = _get_obj_tags(bucket, key.name)
     eq(len(tags),0)
     #eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test whether a correct version-id returned')
+@attr(assertion='version-id is same as bucket list')
+def test_versioning_bucket_atomic_upload_return_version_id():
+    # for versioning-enabled-bucket, an non-empty version-id should return
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    key_name = 'bar'
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string(key_name)
+
+    # list_verions will return an non-empty version-id
+    li = bucket.list_versions()
+    for k in li:
+        eq(key.version_id, k.version_id)
+
+    # for versioning-default-bucket, no version-id should return.
+    bucket = get_new_bucket()
+    key_name = 'baz'
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string(key_name)
+    assert_is_none(key.version_id)
+
+    # for versioning-suspended-bucket, no version-id should return.
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, False, "Suspended")
+    key_name = 'foo'
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string(key_name)
+    assert_is_none(key.version_id)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test whether a correct version-id returned')
+@attr(assertion='version-id is same as bucket list')
+def test_versioning_bucket_multipart_upload_return_version_id():
+    content_type='text/bla'
+    objlen = 30 * 1024 * 1024
+
+    # for versioning-enabled-bucket, an non-empty version-id should return
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    key_name = 'bar'
+    (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
+    res = upload.complete_upload()
+
+    # list_verions will return an non-empty version-id
+    li = bucket.list_versions()
+    for k in li:
+        eq(k.version_id, res.version_id)
+
+    # for versioning-default-bucket, no version-id should return.
+    bucket = get_new_bucket()
+    key_name = 'baz'
+    (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
+    res = upload.complete_upload()
+    assert_is_none(res.version_id)
+
+    # for versioning-suspended-bucket, no version-id should return
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, False, "Suspended")
+    key_name = 'foo'
+    (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
+    res = upload.complete_upload()
+    assert_is_none(res.version_id)