mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-28 09:31:38 +00:00
s3tests: concurrent object versioning tests
test concurrent creation and removal Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
parent
5be9694863
commit
460e3f1f1e
1 changed files with 95 additions and 7 deletions
|
@ -21,6 +21,7 @@ import sha
|
|||
import pytz
|
||||
import json
|
||||
import httplib2
|
||||
import threading
|
||||
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
|
@ -5570,7 +5571,7 @@ def test_versioning_copy_obj_version():
|
|||
new_key = another_bucket.copy_key('new_key', bucket.name, objname)
|
||||
eq(new_key.get_contents_as_string(), c[num_versions - 1])
|
||||
|
||||
def _count_bucket_objs(bucket):
|
||||
def _count_bucket_versioned_objs(bucket):
|
||||
k = []
|
||||
for key in bucket.list_versions():
|
||||
k.insert(0, key)
|
||||
|
@ -5604,14 +5605,14 @@ def test_versioning_multi_object_delete():
|
|||
eq(len(result.deleted), 2)
|
||||
eq(len(result.errors), 0)
|
||||
|
||||
eq(_count_bucket_objs(bucket), 0)
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
|
||||
# now remove again, should all succeed due to idempotency
|
||||
result = bucket.delete_keys(stored_keys)
|
||||
eq(len(result.deleted), 2)
|
||||
eq(len(result.errors), 0)
|
||||
|
||||
eq(_count_bucket_objs(bucket), 0)
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='delete')
|
||||
|
@ -5642,7 +5643,7 @@ def test_versioning_multi_object_delete_with_marker():
|
|||
result = bucket.delete_keys(stored_keys)
|
||||
eq(len(result.deleted), 3)
|
||||
eq(len(result.errors), 0)
|
||||
eq(_count_bucket_objs(bucket), 0)
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
|
||||
delete_markers = []
|
||||
for o in result.deleted:
|
||||
|
@ -5657,7 +5658,7 @@ def test_versioning_multi_object_delete_with_marker():
|
|||
eq(len(result.deleted), 3)
|
||||
eq(len(result.errors), 0)
|
||||
|
||||
eq(_count_bucket_objs(bucket), 0)
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='delete')
|
||||
|
@ -5673,11 +5674,11 @@ def test_versioning_multi_object_delete_with_marker_create():
|
|||
|
||||
rmkeys = { bucket.new_key(keyname) }
|
||||
|
||||
eq(_count_bucket_objs(bucket), 0)
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
|
||||
result = bucket.delete_keys(rmkeys)
|
||||
eq(len(result.deleted), 1)
|
||||
eq(_count_bucket_objs(bucket), 1)
|
||||
eq(_count_bucket_versioned_objs(bucket), 1)
|
||||
|
||||
delete_markers = []
|
||||
for o in result.deleted:
|
||||
|
@ -5760,3 +5761,90 @@ def test_versioned_object_acl():
|
|||
k = bucket.new_key(keyname)
|
||||
check_grants(k.get_acl().acl.grants, default_policy)
|
||||
|
||||
|
||||
def _do_create_object(bucket, objname, i):
|
||||
k = bucket.new_key(objname)
|
||||
k.set_contents_from_string('data {i}'.format(i=i))
|
||||
|
||||
def _do_remove_ver(bucket, obj):
|
||||
bucket.delete_key(obj.name, version_id = obj.version_id)
|
||||
|
||||
def _do_create_versioned_obj_concurrent(bucket, objname, num):
|
||||
t = []
|
||||
for i in range(num):
|
||||
thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i))
|
||||
thr.start()
|
||||
t.append(thr)
|
||||
return t
|
||||
|
||||
def _do_clear_versioned_bucket_concurrent(bucket):
|
||||
t = []
|
||||
for o in bucket.list_versions():
|
||||
thr = threading.Thread(target = _do_remove_ver, args=(bucket, o))
|
||||
thr.start()
|
||||
t.append(thr)
|
||||
return t
|
||||
|
||||
def _do_wait_completion(t):
|
||||
for thr in t:
|
||||
thr.join()
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='concurrent creation of objects, concurrent removal')
|
||||
@attr(assertion='works')
|
||||
@attr('versioning')
|
||||
def test_versioned_concurrent_object_create_concurrent_remove():
|
||||
bucket = get_new_bucket()
|
||||
|
||||
check_configure_versioning_retry(bucket, True, "Enabled")
|
||||
|
||||
keyname = 'myobj'
|
||||
|
||||
num_objs = 5
|
||||
|
||||
for i in xrange(5):
|
||||
t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
|
||||
_do_wait_completion(t)
|
||||
|
||||
eq(_count_bucket_versioned_objs(bucket), num_objs)
|
||||
eq(len(bucket.get_all_keys()), 1)
|
||||
|
||||
t = _do_clear_versioned_bucket_concurrent(bucket)
|
||||
_do_wait_completion(t)
|
||||
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
eq(len(bucket.get_all_keys()), 0)
|
||||
|
||||
@attr(resource='object')
|
||||
@attr(method='put')
|
||||
@attr(operation='concurrent creation and removal of objects')
|
||||
@attr(assertion='works')
|
||||
@attr('versioning')
|
||||
def test_versioned_concurrent_object_create_and_remove():
|
||||
bucket = get_new_bucket()
|
||||
|
||||
check_configure_versioning_retry(bucket, True, "Enabled")
|
||||
|
||||
keyname = 'myobj'
|
||||
|
||||
num_objs = 3
|
||||
|
||||
all_threads = []
|
||||
|
||||
for i in xrange(3):
|
||||
t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
|
||||
all_threads.append(t)
|
||||
|
||||
t = _do_clear_versioned_bucket_concurrent(bucket)
|
||||
all_threads.append(t)
|
||||
|
||||
|
||||
for t in all_threads:
|
||||
_do_wait_completion(t)
|
||||
|
||||
t = _do_clear_versioned_bucket_concurrent(bucket)
|
||||
_do_wait_completion(t)
|
||||
|
||||
eq(_count_bucket_versioned_objs(bucket), 0)
|
||||
eq(len(bucket.get_all_keys()), 0)
|
||||
|
|
Loading…
Reference in a new issue