rgw: add a new multipart test

test for the following ceph issue:
http://tracker.ceph.com/issues/15886

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2016-05-16 14:44:22 -07:00
parent a72fc4a231
commit e71c84f328

View file

@ -5760,6 +5760,68 @@ def test_atomic_multipart_upload_write():
got = key.get_contents_as_string()
eq(got, 'bar')
class Counter:
def __init__(self, default_val):
self.val = default_val
def inc(self):
self.val = self.val + 1
class ActionOnCount:
def __init__(self, trigger_count, action):
self.count = 0
self.trigger_count = trigger_count
self.action = action
def trigger(self):
self.count = self.count + 1
if self.count == self.trigger_count:
self.action()
@attr(resource='object')
@attr(method='put')
@attr(operation='multipart check for two writes of the same part, first write finishes last')
@attr(assertion='object contains correct content')
def test_multipart_resend_first_finishes_last():
bucket = get_new_bucket()
key_name = "mymultipart"
mp = bucket.initiate_multipart_upload(key_name)
file_size = 8 * 1024 * 1024
counter = Counter(0)
# mp.upload_part_from_file might read multiple times from the object
# first time when it calculates md5, second time when it writes data
# out. We want to interject only on the last time, but we can't be
# sure how many times it's going to read, so let's have a test run
# and count the number of reads
fp_dryrun = FakeWriteFile(file_size, 'C',
lambda: counter.inc()
)
mp.upload_part_from_file(fp_dryrun, 1)
mp.complete_upload
bucket.delete_key(key_name)
# ok, now for the actual test
fp_b = FakeWriteFile(file_size, 'B')
action = ActionOnCount(counter.val, lambda: mp.upload_part_from_file(fp_b, 1))
fp_a = FakeWriteFile(file_size, 'A',
lambda: action.trigger()
)
mp = bucket.initiate_multipart_upload(key_name)
mp.upload_part_from_file(fp_a, 1)
mp.complete_upload()
key = bucket.get_key(key_name)
_verify_atomic_key_data(key, file_size, 'A')
@attr(resource='object')
@attr(method='get')
@attr(operation='range')