forked from TrueCloudLab/s3-tests

Compare commits: master...wip-ceph-m (15 commits)

62e9fc0faf
8dfbf70d16
e050c10295
277c2993eb
55e1c95b2f
b1c1d2d98b
a271ebeaa1
74e0956621
c82bd9a403
d46175f31e
9e7b2ae53d
7a6d52afd5
d8fecfdc66
af18c59d22
2d7d79c95d
3 changed files with 267 additions and 45 deletions
requirements.txt

@@ -5,7 +5,7 @@ bunch >=1.0.0
 # 0.14 switches to libev, that means bootstrap needs to change too
 gevent >=1.0
 isodate >=0.4.4
-requests ==0.14.0
+requests >=0.14.0
 pytz >=2011k
 ordereddict
 httplib2
s3tests/functional/test_headers.py

@@ -13,6 +13,7 @@ import socket
 import ssl
 import os
 import re
+from email.utils import formatdate
 
 from urlparse import urlparse
@@ -469,6 +470,28 @@ def test_object_create_bad_authorization_empty():
     eq(e.reason, 'Forbidden')
     eq(e.error_code, 'AccessDenied')
 
+@tag('auth_common')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/date and x-amz-date')
+@attr(assertion='succeeds')
+@nose.with_setup(teardown=_clear_custom_headers)
+def test_object_create_date_and_amz_date():
+    date = formatdate(usegmt=True)
+    key = _setup_bad_object({'Date': date, 'X-Amz-Date': date})
+    key.set_contents_from_string('bar')
+
+@tag('auth_common')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/x-amz-date and no date')
+@attr(assertion='succeeds')
+@nose.with_setup(teardown=_clear_custom_headers)
+def test_object_create_amz_date_and_no_date():
+    date = formatdate(usegmt=True)
+    key = _setup_bad_object({'X-Amz-Date': date}, ('Date',))
+    key.set_contents_from_string('bar')
+
 
 # the teardown is really messed up here. check it out
 @tag('auth_common')
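Note: both new tests build their timestamps with formatdate(usegmt=True), which yields the RFC 1123 format S3 expects in Date and X-Amz-Date, and _setup_bad_object (as used here) merges the given headers into the request, with its second argument naming headers to drop. A minimal sketch of just the header sets being sent:

    from email.utils import formatdate

    # RFC 1123 timestamp, e.g. 'Tue, 07 Mar 2017 14:05:00 GMT'
    date = formatdate(usegmt=True)
    headers_both = {'Date': date, 'X-Amz-Date': date}  # first test: both headers
    headers_amz_only = {'X-Amz-Date': date}            # second test: 'Date' suppressed
    print(headers_both)
    print(headers_amz_only)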
s3tests/functional/test_s3.py

@@ -28,11 +28,13 @@ import re
 
 import xml.etree.ElementTree as ET
 
+from collections import namedtuple
 from email.Utils import formatdate
 from httplib import HTTPConnection, HTTPSConnection
 from urlparse import urlparse
 
 from nose.tools import eq_ as eq
+from nose.tools import assert_is_none
 from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 from boto.s3.tagging import TagSet
@@ -80,8 +82,8 @@ def check_grants(got, want):
     in any order.
     """
     eq(len(got), len(want))
-    got = sorted(got, key=operator.attrgetter('id'))
-    want = sorted(want, key=operator.itemgetter('id'))
+    got = sorted(got, key=operator.attrgetter('id', 'permission'))
+    want = sorted(want, key=operator.itemgetter('id', 'permission'))
     for g, w in zip(got, want):
         w = dict(w)
         eq(g.permission, w.pop('permission'))
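Note: the sort-key change matters when one grantee holds several grants (the same id with READ and WRITE, say): sorting by id alone leaves the relative order of those grants unspecified, so the element-wise comparison that follows can fail spuriously. A toy illustration, standard library only:

    import operator
    from collections import namedtuple

    Grant = namedtuple('Grant', ['id', 'permission'])
    got = [Grant('user1', 'WRITE'), Grant('user1', 'READ')]
    want = [{'id': 'user1', 'permission': 'READ'},
            {'id': 'user1', 'permission': 'WRITE'}]

    # sorting on ('id', 'permission') lines both sides up deterministically
    got = sorted(got, key=operator.attrgetter('id', 'permission'))
    want = sorted(want, key=operator.itemgetter('id', 'permission'))
    assert [(g.id, g.permission) for g in got] == \
           [(w['id'], w['permission']) for w in want]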
@@ -893,6 +895,21 @@ def test_bucket_list_return_data_versioning():
         eq(key.version_id, key_data['version_id'])
 
 
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list keys after marker when bucket versioning is configured')
+@attr(assertion='marker list on versioning bucket')
+def test_bucket_list_marker_versioning():
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    key_names = ['bar', 'baz', 'foo']
+    bucket = _create_keys(bucket=bucket, keys=key_names)
+    li = bucket.get_all_keys(marker='baz')
+    eq(li.marker, 'baz')
+    names = [e.name for e in li]
+    eq(names, ['foo'])
+
+
 @attr(resource='object.metadata')
 @attr(method='head')
 @attr(operation='modification-times')
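Note: marker semantics are unchanged by versioning; the listing returns only current versions, and only keys lexicographically greater than the marker. In miniature:

    # get_all_keys lists current versions only; 'marker' starts the
    # listing strictly after the given key name:
    names = ['bar', 'baz', 'foo']
    after_marker = [n for n in names if n > 'baz']
    assert after_marker == ['foo']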
@@ -1483,6 +1500,42 @@ def test_post_object_authenticated_request():
     got = key.get_contents_as_string()
     eq(got, 'bar')
 
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request, no content-type header')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_authenticated_no_content_type():
+    bucket = get_new_bucket()
+
+    url = _get_post_url(s3.main, bucket)
+
+    utc = pytz.utc
+    expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+    policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+    "conditions": [\
+    {"bucket": bucket.name},\
+    ["starts-with", "$key", "foo"],\
+    {"acl": "private"},\
+    ["content-length-range", 0, 1024]\
+    ]\
+    }
+
+    json_policy_document = json.JSONEncoder().encode(policy_document)
+    policy = base64.b64encode(json_policy_document)
+    conn = s3.main
+    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+    payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+    ("acl" , "private"),("signature" , signature),("policy" , policy),\
+    ('file', ('bar'))])
+
+    r = requests.post(url, files = payload)
+    eq(r.status_code, 204)
+    key = bucket.get_key("foo.txt")
+    got = key.get_contents_as_string()
+    eq(got, 'bar')
+
+
 @attr(resource='object')
 @attr(method='post')
 @attr(operation='authenticated browser based upload via POST request, bad access key')
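Note: the new test lets requests choose the multipart Content-Type itself rather than setting one explicitly. The signature construction is the standard S3 POST-policy flow, JSON policy, base64-encode, then HMAC-SHA1 over the encoded policy with the secret key. A condensed, self-contained sketch of just that flow (dummy bucket name and credentials):

    import base64, hmac, json
    from hashlib import sha1

    policy = {"expiration": "2038-01-01T00:00:00Z",
              "conditions": [{"bucket": "my-bucket"},
                             ["starts-with", "$key", "foo"]]}
    encoded = base64.b64encode(json.dumps(policy).encode('utf-8'))
    secret = b'dummy-secret-key'
    signature = base64.b64encode(hmac.new(secret, encoded, sha1).digest())
    # 'policy' and 'signature' then travel as form fields next to AWSAccessKeyId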
@@ -5779,8 +5832,8 @@ def _cors_request_and_check(func, url, headers, expect_status, expect_allow_orig
     r = func(url, headers=headers)
     eq(r.status_code, expect_status)
 
-    assert r.headers['access-control-allow-origin'] == expect_allow_origin
-    assert r.headers['access-control-allow-methods'] == expect_allow_methods
+    assert r.headers.get('access-control-allow-origin', None) == expect_allow_origin
+    assert r.headers.get('access-control-allow-methods', None) == expect_allow_methods
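Note: the switch to dict.get matters when the server omits a CORS header entirely: indexing r.headers[...] raises KeyError and aborts the test, while .get(..., None) lets the assertion pass when None is the expected value, or fail with a meaningful comparison. Illustrated with a plain dict standing in for the response headers:

    headers = {}  # simulate a response carrying no CORS headers at all
    expect_allow_origin = None
    # old style: headers['access-control-allow-origin'] raises KeyError here
    assert headers.get('access-control-allow-origin', None) == expect_allow_origin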
@@ -7492,7 +7545,8 @@ def create_lifecycle(days = None, prefix = 'test/', rules = None):
     else:
         for rule in rules:
             expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
-            rule = boto.s3.lifecycle.Rule(id=rule['id'], prefix=rule['prefix'],
+            _id = rule.get('id',None)
+            rule = boto.s3.lifecycle.Rule(id=_id, prefix=rule['prefix'],
                 status=rule['status'], expiration=expiration)
             lifecycle.append(rule)
     return lifecycle
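Note: rule.get('id', None) makes the 'id' key optional; boto.s3.lifecycle.Rule accepts id=None, in which case no <ID> element is sent and the server generates one, which is exactly what test_lifecycle_get_no_id below verifies. Minimal illustration:

    rule = {'days': 31, 'prefix': 'test1/', 'status': 'Enabled'}  # no 'id' key
    _id = rule.get('id', None)  # old code did rule['id'] and raised KeyError
    assert _id is None          # boto omits <ID>; the server fills one in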
@@ -7529,27 +7583,59 @@ def test_lifecycle_get():
     eq(current[1].id, 'test2/')
     eq(current[1].prefix, 'test2/')
 
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get lifecycle config no id')
+@attr('lifecycle')
+def test_lifecycle_get_no_id():
+    Rule = namedtuple('Rule',['prefix','status','days'])
+    rules = {'rule1' : Rule('test1/','Enabled',31),
+             'rule2' : Rule('test2/','Enabled',120)}
+    bucket = set_lifecycle(rules=[{'days': rules['rule1'].days ,
+                                   'prefix': rules['rule1'].prefix,
+                                   'status': rules['rule1'].status},
+                                  {'days': rules['rule2'].days,
+                                   'prefix': rules['rule2'].prefix,
+                                   'status': rules['rule2'].status}])
+    current_lc = bucket.get_lifecycle_config()
+    # We can't guarantee the order of XML, since the id is random, let's walk
+    # through the rules and validate that both were present
+    for lc_rule in current_lc:
+        if lc_rule.expiration.days == rules['rule1'].days:
+            eq(lc_rule.prefix, rules['rule1'].prefix)
+            assert len(lc_rule.id) > 0
+        elif lc_rule.expiration.days == rules['rule2'].days:
+            eq(lc_rule.prefix, rules['rule2'].prefix)
+            assert len(lc_rule.id) > 0
+        else:
+            # neither of the rules we supplied, something wrong
+            assert False
+
 
 # The test harnass for lifecycle is configured to treat days as 2 second intervals.
 @attr(resource='bucket')
 @attr(method='put')
 @attr(operation='test lifecycle expiration')
 @attr('lifecycle')
 @attr('lifecycle_expiration')
+@attr('fails_on_aws')
 def test_lifecycle_expiration():
-    bucket = set_lifecycle(rules=[{'id': 'rule1', 'days': 2, 'prefix': 'expire1/', 'status': 'Enabled'},
-                                  {'id':'rule2', 'days': 6, 'prefix': 'expire3/', 'status': 'Enabled'}])
+    bucket = set_lifecycle(rules=[{'id': 'rule1', 'days': 1, 'prefix': 'expire1/', 'status': 'Enabled'},
+                                  {'id':'rule2', 'days': 4, 'prefix': 'expire3/', 'status': 'Enabled'}])
     _create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
                                       'keep2/bar', 'expire3/foo', 'expire3/bar'])
     # Get list of all keys
     init_keys = bucket.get_all_keys()
     # Wait for first expiration (plus fudge to handle the timer window)
-    time.sleep(6)
+    time.sleep(28)
     expire1_keys = bucket.get_all_keys()
     # Wait for next expiration cycle
-    time.sleep(2)
+    time.sleep(10)
     keep2_keys = bucket.get_all_keys()
     # Wait for final expiration cycle
-    time.sleep(8)
+    time.sleep(20)
     expire3_keys = bucket.get_all_keys()
 
     eq(len(init_keys), 6)
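Note: the retuned day counts and sleeps only make sense relative to how the harness maps lifecycle days to seconds (an RGW debug setting, rgw_lc_debug_interval; a value of 10 seconds per day fits these numbers, though that is an assumption, not something stated in the diff). Each check then lands between rule boundaries. A sketch of the arithmetic:

    def boundary(days, interval=10):
        # hypothetical: seconds until a rule with 'days' fires when the
        # harness treats one lifecycle day as 'interval' real seconds
        return days * interval

    # cumulative sleeps in the test: 28s, then 28+10=38s, then 38+20=58s
    assert boundary(1) < 28           # rule1 (expire1/) fired before check 1
    assert 38 < boundary(4) < 58      # rule2 (expire3/) fires between checks 2 and 3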
@@ -7622,7 +7708,15 @@ def test_lifecycle_rules_conflicted():
 def generate_lifecycle_body(rules):
     body = '<?xml version="1.0" encoding="UTF-8"?><LifecycleConfiguration>'
     for rule in rules:
-        body += '<Rule><ID>%s</ID><Prefix>%s</Prefix><Status>%s</Status>' % (rule['ID'], rule['Prefix'], rule['Status'])
+        body += '<Rule><ID>%s</ID><Status>%s</Status>' % (rule['ID'], rule['Status'])
+        if 'Prefix' in rule.keys():
+            body += '<Prefix>%s</Prefix>' % rule['Prefix']
+        if 'Filter' in rule.keys():
+            prefix_str= '' # AWS supports empty filters
+            if 'Prefix' in rule['Filter'].keys():
+                prefix_str = '<Prefix>%s</Prefix>' % rule['Filter']['Prefix']
+            body += '<Filter>%s</Filter>' % prefix_str
+
         if 'Expiration' in rule.keys():
             if 'ExpiredObjectDeleteMarker' in rule['Expiration'].keys():
                 body += '<Expiration><ExpiredObjectDeleteMarker>%s</ExpiredObjectDeleteMarker></Expiration>' \
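Note: for a rule like {'ID': 'rule1', 'Filter': {'Prefix': 'foo'}, ...} the generator now emits <Filter><Prefix>foo</Prefix></Filter> rather than a top-level <Prefix>, matching the newer lifecycle schema where Filter supersedes Prefix; an empty Filter dict yields an empty <Filter></Filter>, which AWS accepts. A trimmed-down rendering of the same logic:

    def render_rule(rule):
        # simplified copy of the body-builder above, Filter/Prefix handling only
        body = '<Rule><ID>%s</ID><Status>%s</Status>' % (rule['ID'], rule['Status'])
        if 'Prefix' in rule:
            body += '<Prefix>%s</Prefix>' % rule['Prefix']
        if 'Filter' in rule:
            prefix_str = ''  # AWS supports empty filters
            if 'Prefix' in rule['Filter']:
                prefix_str = '<Prefix>%s</Prefix>' % rule['Filter']['Prefix']
            body += '<Filter>%s</Filter>' % prefix_str
        return body + '</Rule>'

    print(render_rule({'ID': 'rule1', 'Filter': {'Prefix': 'foo'}, 'Status': 'Enabled'}))
    # <Rule><ID>rule1</ID><Status>Enabled</Status><Filter><Prefix>foo</Prefix></Filter></Rule>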
@@ -7686,6 +7780,7 @@ def test_lifecycle_set_invalid_date():
 @attr(method='put')
 @attr(operation='test lifecycle expiration with date')
 @attr('lifecycle')
 @attr('lifecycle_expiration')
+@attr('fails_on_aws')
 def test_lifecycle_expiration_date():
     bucket = get_new_bucket()
@@ -7734,6 +7829,7 @@ def test_lifecycle_set_noncurrent():
 @attr(method='put')
 @attr(operation='test lifecycle non-current version expiration')
 @attr('lifecycle')
 @attr('lifecycle_expiration')
+@attr('fails_on_aws')
 def test_lifecycle_noncur_expiration():
     bucket = get_new_bucket()
@@ -7774,12 +7870,51 @@ def test_lifecycle_set_deletemarker():
     eq(res.status, 200)
     eq(res.reason, 'OK')
 
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='set lifecycle config with Filter')
+@attr('lifecycle')
+def test_lifecycle_set_filter():
+    bucket = get_new_bucket()
+    rules = [
+        {'ID': 'rule1', 'Filter': {'Prefix': 'foo'}, 'Status': 'Enabled', 'Expiration': {'ExpiredObjectDeleteMarker': 'true'}}
+    ]
+    body = generate_lifecycle_body(rules)
+    fp = StringIO(body)
+    md5 = boto.utils.compute_md5(fp)
+    headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
+    res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
+                                         headers=headers)
+    eq(res.status, 200)
+    eq(res.reason, 'OK')
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='set lifecycle config with empty Filter')
+@attr('lifecycle')
+def test_lifecycle_set_empty_filter():
+    bucket = get_new_bucket()
+    rules = [
+        {'ID': 'rule1', 'Filter': {}, 'Status': 'Enabled', 'Expiration': {'ExpiredObjectDeleteMarker': 'true'}}
+    ]
+    body = generate_lifecycle_body(rules)
+    fp = StringIO(body)
+    md5 = boto.utils.compute_md5(fp)
+    headers = {'Content-MD5': md5[1], 'Content-Type': 'text/xml'}
+    res = bucket.connection.make_request('PUT', bucket.name, data=fp.getvalue(), query_args='lifecycle',
+                                         headers=headers)
+    eq(res.status, 200)
+    eq(res.reason, 'OK')
+
+
 
 # The test harnass for lifecycle is configured to treat days as 1 second intervals.
 @attr(resource='bucket')
 @attr(method='put')
 @attr(operation='test lifecycle delete marker expiration')
 @attr('lifecycle')
 @attr('lifecycle_expiration')
+@attr('fails_on_aws')
 def test_lifecycle_deletemarker_expiration():
     bucket = get_new_bucket()
@@ -7832,6 +7967,7 @@ def test_lifecycle_set_multipart():
 @attr(method='put')
 @attr(operation='test lifecycle multipart expiration')
 @attr('lifecycle')
 @attr('lifecycle_expiration')
+@attr('fails_on_aws')
 def test_lifecycle_multipart_expiration():
     bucket = get_new_bucket()
@@ -8043,9 +8179,9 @@ def test_encryption_key_no_sse_c():
     }
     key = bucket.new_key('testobj')
     data = 'A'*100
-    key.set_contents_from_string(data, headers=sse_client_headers)
-    rdata = key.get_contents_as_string()
-    eq(data, rdata)
+    e = assert_raises(boto.exception.S3ResponseError,
+        key.set_contents_from_string, data, headers=sse_client_headers)
+    eq(e.status, 400)
 
 
 def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
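Note: the expectation flips here, from accepting and round-tripping an incomplete SSE-C request to rejecting it outright with a 400. The pattern relies on the suite's own assert_raises helper, which (unlike the stock nose assertion) returns the caught exception so its HTTP status can be inspected; its shape is essentially this sketch:

    def assert_raises(excClass, callableObj, *args, **kwargs):
        """Call callableObj and return the exception it raises; fail if it
        raises nothing (sketch of s3tests.functional.utils.assert_raises)."""
        try:
            callableObj(*args, **kwargs)
        except excClass as e:
            return e
        raise AssertionError('%s not raised' % excClass.__name__)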
@@ -8281,7 +8417,7 @@ def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
     key = bucket.new_key('testobj')
     data = 'A'*file_size
     key.set_contents_from_string(data, headers=sse_kms_client_headers)
-    rdata = key.get_contents_as_string(headers=sse_kms_client_headers)
+    rdata = key.get_contents_as_string()
     eq(data, rdata)
@@ -8340,6 +8476,9 @@ def test_sse_kms_method_head():
     eq(res.status, 200)
     eq(res.getheader('x-amz-server-side-encryption'), 'aws:kms')
     eq(res.getheader('x-amz-server-side-encryption-aws-kms-key-id'), 'testkey-1')
 
+    res = _make_request('HEAD', bucket, key, authenticated=True, request_headers=sse_kms_client_headers)
+    eq(res.status, 400)
+
 
 @attr(resource='object')
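Note: this pair of hunks encodes one rule from both directions: SSE-KMS objects decrypt transparently, so a plain GET or HEAD needs no encryption headers, while re-sending the write-side x-amz-server-side-encryption headers on a read is now rejected ('testkey-1' is the fixture key id used throughout this file). The two header sets involved:

    # valid on PUT; sending these on GET/HEAD now draws a 400:
    sse_kms_client_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    read_headers = {}  # SSE-KMS objects decrypt transparently on read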
@@ -8360,28 +8499,6 @@ def test_sse_kms_present():
     eq(data, result)
 
 
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='write encrypted with SSE-KMS but read with other key')
-@attr(assertion='operation fails')
-@attr('encryption')
-def test_sse_kms_other_key():
-    bucket = get_new_bucket()
-    sse_kms_client_headers_A = {
-        'x-amz-server-side-encryption': 'aws:kms',
-        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
-    }
-    sse_kms_client_headers_B = {
-        'x-amz-server-side-encryption': 'aws:kms',
-        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
-    }
-    key = bucket.new_key('testobj')
-    data = 'A'*100
-    key.set_contents_from_string(data, headers=sse_kms_client_headers_A)
-    result = key.get_contents_as_string(headers=sse_kms_client_headers_B)
-    eq(data, result)
-
-
 @attr(resource='object')
 @attr(method='put')
 @attr(operation='declare SSE-KMS but do not provide key_id')
@@ -8410,9 +8527,9 @@ def test_sse_kms_not_declared():
     }
     key = bucket.new_key('testobj')
     data = 'A'*100
-    key.set_contents_from_string(data, headers=sse_kms_client_headers)
-    rdata = key.get_contents_as_string()
-    eq(data, rdata)
+    e = assert_raises(boto.exception.S3ResponseError,
+        key.set_contents_from_string, data, headers=sse_kms_client_headers)
+    eq(e.status, 400)
 
 
 @attr(resource='object')
@@ -8442,13 +8559,13 @@ def test_sse_kms_multipart_upload():
     k = bucket.get_key(key)
     eq(k.metadata['foo'], 'bar')
     eq(k.content_type, content_type)
-    test_string = k.get_contents_as_string(headers=enc_headers)
+    test_string = k.get_contents_as_string()
     eq(len(test_string), k.size)
-    eq(data, test_string)
+    eq(test_string, data)
 
-    _check_content_using_range_enc(k, data, 1000000, enc_headers=enc_headers)
-    _check_content_using_range_enc(k, data, 10000000, enc_headers=enc_headers)
+    _check_content_using_range(k, data, 1000000)
+    _check_content_using_range(k, data, 10000000)
 
 
 @attr(resource='object')
@@ -8544,7 +8661,7 @@ def test_sse_kms_post_object_authenticated_request():
     }
 
     key = bucket.get_key("foo.txt")
-    got = key.get_contents_as_string(headers=get_headers)
+    got = key.get_contents_as_string()
     eq(got, 'bar')
 
 @attr(resource='object')
@@ -8590,6 +8707,23 @@ def test_sse_kms_barb_transfer_13b():
         raise SkipTest
     _test_sse_kms_customer_write(13, key_id = config['main']['kms_keyid'])
 
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='write encrypted with SSE-KMS and read with SSE-KMS')
+@attr(assertion='operation fails')
+@attr('encryption')
+def test_sse_kms_read_declare():
+    bucket = get_new_bucket()
+    sse_kms_client_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*100
+    key.set_contents_from_string(data, headers=sse_kms_client_headers)
+    e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string, headers=sse_kms_client_headers)
+    eq(e.status, 400)
+
 
 @attr(resource='bucket')
 @attr(method='get')
 @attr(operation='Test Bucket Policy')
@@ -9205,3 +9339,68 @@ def test_delete_tags_obj_public():
     tags = _get_obj_tags(bucket, key.name)
     eq(len(tags),0)
     #eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test whether a correct version-id returned')
+@attr(assertion='version-id is same as bucket list')
+def test_versioning_bucket_atomic_upload_return_version_id():
+    # for versioning-enabled-bucket, an non-empty version-id should return
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    key_name = 'bar'
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string(key_name)
+    # list_verions will return an non-empty version-id
+    li = bucket.list_versions()
+    for k in li:
+        eq(key.version_id, k.version_id)
+
+    # for versioning-default-bucket, no version-id should return.
+    bucket = get_new_bucket()
+    key_name = 'baz'
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string(key_name)
+    assert_is_none(key.version_id)
+
+    # for versioning-suspended-bucket, no version-id should return.
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, False, "Suspended")
+    key_name = 'foo'
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string(key_name)
+    assert_is_none(key.version_id)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test whether a correct version-id returned')
+@attr(assertion='version-id is same as bucket list')
+def test_versioning_bucket_multipart_upload_return_version_id():
+    content_type='text/bla'
+    objlen = 30 * 1024 * 1024
+
+    # for versioning-enabled-bucket, an non-empty version-id should return
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    key_name = 'bar'
+    (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
+    res = upload.complete_upload()
+    # list_verions will return an non-empty version-id
+    li = bucket.list_versions()
+    for k in li:
+        eq(k.version_id, res.version_id)
+
+    # for versioning-default-bucket, no version-id should return.
+    bucket = get_new_bucket()
+    key_name = 'baz'
+    (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
+    res = upload.complete_upload()
+    assert_is_none(res.version_id)
+
+    # for versioning-suspended-bucket, no version-id should return
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, False, "Suspended")
+    key_name = 'foo'
+    (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'baz'})
+    res = upload.complete_upload()
+    assert_is_none(res.version_id)
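Note: taken together, the two tests pin down boto's version_id in all three bucket states, the same way for atomic and multipart uploads. A compact restatement of what they assert (a summary of these tests, not of the S3 spec in general):

    def version_id_expected(versioning_state):
        # per the assertions above: only an Enabled bucket returns a version id
        return versioning_state == 'Enabled'

    assert version_id_expected('Enabled')
    assert not version_id_expected(None)         # versioning never configured
    assert not version_id_expected('Suspended')  # versioning suspended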