Compare commits

...

4 commits

Author SHA1 Message Date
Yehuda Sadeh
f2ee66d1cf s3tests: more thorough multipart resend tests
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 725bb83e5e)

Conflicts:
	s3tests/functional/test_s3.py
2015-05-13 17:13:18 -07:00
Yehuda Sadeh
2b3ba675d6 radosgw-admin: test resend of a multipart upload part
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>

Conflicts:
	s3tests/functional/test_s3.py
2015-05-13 14:55:45 -07:00
Sage Weil
f041a73153 Merge pull request #41 from ceph/wip-10770
rgw: concurrent non-versioned object creation, deletion

Reviewed-by: Sage Weil <sage@redhat.com>
2015-02-05 11:32:23 -08:00
Yehuda Sadeh
4342e14114 rgw: concurrent non-versioned object creation, deletion
Tests issue #10770: bad bucket index entry on bucket index cancelled
operation.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
2015-02-05 10:02:45 -08:00

View file

@ -4352,25 +4352,66 @@ def generate_random(size, part_size=5*1024*1024):
this_part_size = min(left, part_size) this_part_size = min(left, part_size)
for y in range(this_part_size / chunk): for y in range(this_part_size / chunk):
s = s + strpart s = s + strpart
remaining = this_part_size - len(s)
if remaining > 0:
s += strpart[0:remaining]
yield s yield s
if (x == size): if (x == size):
return return
def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None): def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, resend_parts=[]):
""" """
generate a multi-part upload for a random file of specifed size, generate a multi-part upload for a random file of specifed size,
if requested, generate a list of the parts if requested, generate a list of the parts
return the upload descriptor return the upload descriptor
""" """
upload = bucket.initiate_multipart_upload(s3_key_name, headers=headers, metadata=metadata) upload = bucket.initiate_multipart_upload(s3_key_name, headers=headers, metadata=metadata)
s = ''
for i, part in enumerate(generate_random(size, part_size)): for i, part in enumerate(generate_random(size, part_size)):
s += part
transfer_part(bucket, upload.id, upload.key_name, i, part)
if i in resend_parts:
transfer_part(bucket, upload.id, upload.key_name, i, part) transfer_part(bucket, upload.id, upload.key_name, i, part)
if do_list is not None: if do_list is not None:
l = bucket.list_multipart_uploads() l = bucket.list_multipart_uploads()
l = list(l) l = list(l)
return upload return (upload, s)
@attr(resource='object')
@attr(method='put')
@attr(operation='check multipart upload without parts')
def test_multipart_upload_empty():
bucket = get_new_bucket()
key = "mymultipart"
(upload, data) = _multipart_upload(bucket, key, 0)
e = assert_raises(boto.exception.S3ResponseError, upload.complete_upload)
eq(e.status, 400)
eq(e.error_code, u'MalformedXML')
@attr(resource='object')
@attr(method='put')
@attr(operation='check multipart uploads with single small part')
def test_multipart_upload_small():
bucket = get_new_bucket()
key = "mymultipart"
size = 1
(upload, data) = _multipart_upload(bucket, key, size)
upload.complete_upload()
key2 = bucket.get_key(key)
eq(key2.size, size)
def _check_content_using_range(k, data, step):
objlen = k.size
for ofs in xrange(0, k.size, step):
toread = k.size - ofs
if toread > step:
toread = step
end = ofs + toread - 1
read_range = k.get_contents_as_string(headers={'Range': 'bytes={s}-{e}'.format(s=ofs, e=end)})
eq(len(read_range), toread)
eq(read_range, data[ofs:end+1])
@attr(resource='object') @attr(resource='object')
@attr(method='put') @attr(method='put')
@ -4380,7 +4421,8 @@ def test_multipart_upload():
bucket = get_new_bucket() bucket = get_new_bucket()
key="mymultipart" key="mymultipart"
content_type='text/bla' content_type='text/bla'
upload = _multipart_upload(bucket, key, 30 * 1024 * 1024, headers={'Content-Type': content_type}, metadata={'foo': 'bar'}) objlen = 30 * 1024 * 1024
(upload, data) = _multipart_upload(bucket, key, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'bar'})
upload.complete_upload() upload.complete_upload()
(obj_count, bytes_used) = _head_bucket(bucket) (obj_count, bytes_used) = _head_bucket(bucket)
@ -4391,30 +4433,69 @@ def test_multipart_upload():
k=bucket.get_key(key) k=bucket.get_key(key)
eq(k.metadata['foo'], 'bar') eq(k.metadata['foo'], 'bar')
eq(k.content_type, content_type) eq(k.content_type, content_type)
test_string=k.get_contents_as_string()
eq(len(test_string), k.size)
eq(test_string, data)
_check_content_using_range(k, data, 1000000)
_check_content_using_range(k, data, 10000000)
def _check_upload_multipart_resend(bucket, key, objlen, resend_parts):
content_type='text/bla'
(upload, data) = _multipart_upload(bucket, key, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'bar'}, resend_parts=resend_parts)
upload.complete_upload()
(obj_count, bytes_used) = _head_bucket(bucket)
k=bucket.get_key(key)
eq(k.metadata['foo'], 'bar')
eq(k.content_type, content_type)
test_string=k.get_contents_as_string()
eq(k.size, len(test_string))
eq(k.size, objlen)
eq(test_string, data)
_check_content_using_range(k, data, 1000000)
_check_content_using_range(k, data, 10000000)
@attr(resource='object') @attr(resource='object')
@attr(method='put') @attr(method='put')
@attr(operation='complete multiple multi-part upload with different sizes') @attr(operation='complete multiple multi-part upload with different sizes')
@attr(resource='object')
@attr(method='put')
@attr(operation='complete multi-part upload')
@attr(assertion='successful')
def test_multipart_upload_resend_part():
bucket = get_new_bucket()
key="mymultipart"
objlen = 30 * 1024 * 1024
_check_upload_multipart_resend(bucket, key, objlen, [0])
_check_upload_multipart_resend(bucket, key, objlen, [1])
_check_upload_multipart_resend(bucket, key, objlen, [2])
_check_upload_multipart_resend(bucket, key, objlen, [1,2])
_check_upload_multipart_resend(bucket, key, objlen, [0,1,2,3,4,5])
@attr(assertion='successful') @attr(assertion='successful')
def test_multipart_upload_multiple_sizes(): def test_multipart_upload_multiple_sizes():
bucket = get_new_bucket() bucket = get_new_bucket()
key="mymultipart" key="mymultipart"
upload = _multipart_upload(bucket, key, 5 * 1024 * 1024) (upload, data) = _multipart_upload(bucket, key, 5 * 1024 * 1024)
upload.complete_upload() upload.complete_upload()
upload = _multipart_upload(bucket, key, 5 * 1024 * 1024 + 100 * 1024) (upload, data) = _multipart_upload(bucket, key, 5 * 1024 * 1024 + 100 * 1024)
upload.complete_upload() upload.complete_upload()
upload = _multipart_upload(bucket, key, 5 * 1024 * 1024 + 600 * 1024) (upload, data) = _multipart_upload(bucket, key, 5 * 1024 * 1024 + 600 * 1024)
upload.complete_upload() upload.complete_upload()
upload = _multipart_upload(bucket, key, 10 * 1024 * 1024 + 100 * 1024) (upload, data) = _multipart_upload(bucket, key, 10 * 1024 * 1024 + 100 * 1024)
upload.complete_upload() upload.complete_upload()
upload = _multipart_upload(bucket, key, 10 * 1024 * 1024 + 600 * 1024) (upload, data) = _multipart_upload(bucket, key, 10 * 1024 * 1024 + 600 * 1024)
upload.complete_upload() upload.complete_upload()
upload = _multipart_upload(bucket, key, 10 * 1024 * 1024) (upload, data) = _multipart_upload(bucket, key, 10 * 1024 * 1024)
upload.complete_upload() upload.complete_upload()
@attr(resource='object') @attr(resource='object')
@ -4424,7 +4505,7 @@ def test_multipart_upload_multiple_sizes():
def test_multipart_upload_size_too_small(): def test_multipart_upload_size_too_small():
bucket = get_new_bucket() bucket = get_new_bucket()
key="mymultipart" key="mymultipart"
upload = _multipart_upload(bucket, key, 100 * 1024, part_size=10*1024) (upload, data) = _multipart_upload(bucket, key, 100 * 1024, part_size=10*1024)
e = assert_raises(boto.exception.S3ResponseError, upload.complete_upload) e = assert_raises(boto.exception.S3ResponseError, upload.complete_upload)
eq(e.status, 400) eq(e.status, 400)
eq(e.error_code, u'EntityTooSmall') eq(e.error_code, u'EntityTooSmall')
@ -4489,7 +4570,7 @@ def test_multipart_upload_overwrite_existing_object():
def test_abort_multipart_upload(): def test_abort_multipart_upload():
bucket = get_new_bucket() bucket = get_new_bucket()
key="mymultipart" key="mymultipart"
upload = _multipart_upload(bucket, key, 10 * 1024 * 1024) (upload, data) = _multipart_upload(bucket, key, 10 * 1024 * 1024)
upload.cancel_upload() upload.cancel_upload()
(obj_count, bytes_used) = _head_bucket(bucket) (obj_count, bytes_used) = _head_bucket(bucket)
@ -4505,11 +4586,11 @@ def test_list_multipart_upload():
bucket = get_new_bucket() bucket = get_new_bucket()
key="mymultipart" key="mymultipart"
mb = 1024 * 1024 mb = 1024 * 1024
upload1 = _multipart_upload(bucket, key, 5 * mb, do_list = True) (upload1, data) = _multipart_upload(bucket, key, 5 * mb, do_list = True)
upload2 = _multipart_upload(bucket, key, 6 * mb, do_list = True) (upload2, data) = _multipart_upload(bucket, key, 6 * mb, do_list = True)
key2="mymultipart2" key2="mymultipart2"
upload3 = _multipart_upload(bucket, key2, 5 * mb, do_list = True) (upload3, data) = _multipart_upload(bucket, key2, 5 * mb, do_list = True)
l = bucket.list_multipart_uploads() l = bucket.list_multipart_uploads()
l = list(l) l = list(l)
@ -5852,17 +5933,18 @@ def test_versioned_object_acl():
check_grants(k.get_acl().acl.grants, default_policy) check_grants(k.get_acl().acl.grants, default_policy)
def _do_create_object(bucket, objname, i): def _do_create_object(bucket, objname, i, obj_size):
k = bucket.new_key(objname) k = bucket.new_key(objname)
k.set_contents_from_string('data {i}'.format(i=i)) s = 'x' * obj_size
k.set_contents_from_string(s)
def _do_remove_ver(bucket, obj): def _do_remove_ver(bucket, obj):
bucket.delete_key(obj.name, version_id = obj.version_id) bucket.delete_key(obj.name, version_id = obj.version_id)
def _do_create_versioned_obj_concurrent(bucket, objname, num): def _do_create_obj_concurrent(bucket, objname, num, obj_size=0):
t = [] t = []
for i in range(num): for i in range(num):
thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i)) thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i, obj_size))
thr.start() thr.start()
t.append(thr) t.append(thr)
return t return t
@ -5894,7 +5976,7 @@ def test_versioned_concurrent_object_create_concurrent_remove():
num_objs = 5 num_objs = 5
for i in xrange(5): for i in xrange(5):
t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs) t = _do_create_obj_concurrent(bucket, keyname, num_objs)
_do_wait_completion(t) _do_wait_completion(t)
eq(_count_bucket_versioned_objs(bucket), num_objs) eq(_count_bucket_versioned_objs(bucket), num_objs)
@ -5923,7 +6005,7 @@ def test_versioned_concurrent_object_create_and_remove():
all_threads = [] all_threads = []
for i in xrange(3): for i in xrange(3):
t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs) t = _do_create_obj_concurrent(bucket, keyname, num_objs)
all_threads.append(t) all_threads.append(t)
t = _do_clear_versioned_bucket_concurrent(bucket) t = _do_clear_versioned_bucket_concurrent(bucket)
@ -5938,3 +6020,36 @@ def test_versioned_concurrent_object_create_and_remove():
eq(_count_bucket_versioned_objs(bucket), 0) eq(_count_bucket_versioned_objs(bucket), 0)
eq(len(bucket.get_all_keys()), 0) eq(len(bucket.get_all_keys()), 0)
@attr(resource='object')
@attr(method='put')
@attr(operation='concurrent creation of objects, concurrent removal')
@attr(assertion='works')
def test_non_versioned_concurrent_object_create_concurrent_remove():
bucket = get_new_bucket()
# not configuring versioning here! this test is non-versioned
keyname = 'myobj'
num_objs = 5
obj_size = 10 # non-zero
for i in xrange(5):
t = _do_create_obj_concurrent(bucket, keyname, num_objs, obj_size)
_do_wait_completion(t)
keys = []
for k in bucket.get_all_keys():
keys.append(k)
eq(len(keys), 1)
eq(keys[0].size, obj_size)
t = _do_clear_versioned_bucket_concurrent(bucket)
_do_wait_completion(t)
eq(_count_bucket_versioned_objs(bucket), 0)
eq(len(bucket.get_all_keys()), 0)