rgw: add a few tests that check for null object versioning behavior

Deals with ceph issue #15243

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2016-03-22 18:40:39 -07:00
parent 4cc37c888f
commit 278113fbc5

View file

@ -6239,6 +6239,125 @@ def overwrite_suspended_versioning_obj(bucket, objname, k, c, content):
check_obj_versions(bucket, objname, k, c) check_obj_versions(bucket, objname, k, c)
@attr(resource='object')
@attr(method='create')
@attr(operation='create object, then switch to versioning')
@attr(assertion='behaves correctly')
@attr('versioning')
def test_versioning_obj_plain_null_version_removal():
bucket = get_new_bucket()
check_versioning(bucket, None)
content = 'fooz'
objname = 'testobj'
key = bucket.new_key(objname)
key.set_contents_from_string(content)
check_configure_versioning_retry(bucket, True, "Enabled")
bucket.delete_key(key, version_id='null')
e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
eq(e.status, 404)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchKey')
k = []
for key in bucket.list_versions():
k.insert(0, key)
eq(len(k), 0)
@attr(resource='object')
@attr(method='create')
@attr(operation='create object, then switch to versioning')
@attr(assertion='behaves correctly')
@attr('versioning')
def test_versioning_obj_plain_null_version_overwrite():
bucket = get_new_bucket()
check_versioning(bucket, None)
content = 'fooz'
objname = 'testobj'
key = bucket.new_key(objname)
key.set_contents_from_string(content)
check_configure_versioning_retry(bucket, True, "Enabled")
content2 = 'zzz'
key.set_contents_from_string(content2)
eq(key.get_contents_as_string(), content2)
version_id = None
for k in bucket.list_versions():
version_id = k.version_id
break
print 'version_id=', version_id
bucket.delete_key(key, version_id=version_id)
eq(key.get_contents_as_string(), content)
bucket.delete_key(key, version_id='null')
e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
eq(e.status, 404)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchKey')
k = []
for key in bucket.list_versions():
k.insert(0, key)
eq(len(k), 0)
@attr(resource='object')
@attr(method='create')
@attr(operation='create object, then switch to versioning')
@attr(assertion='behaves correctly')
@attr('versioning')
def test_versioning_obj_plain_null_version_overwrite_suspended():
bucket = get_new_bucket()
check_versioning(bucket, None)
content = 'fooz'
objname = 'testobj'
key = bucket.new_key(objname)
key.set_contents_from_string(content)
check_configure_versioning_retry(bucket, True, "Enabled")
check_configure_versioning_retry(bucket, False, "Suspended")
content2 = 'zzz'
key.set_contents_from_string(content2)
eq(key.get_contents_as_string(), content2)
version_id = None
for k in bucket.list_versions():
version_id = k.version_id
break
print 'version_id=', version_id
bucket.delete_key(key, version_id=version_id)
e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
eq(e.status, 404)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchKey')
k = []
for key in bucket.list_versions():
k.insert(0, key)
eq(len(k), 0)
@attr(resource='object') @attr(resource='object')
@attr(method='create') @attr(method='create')
@attr(operation='suspend versioned bucket') @attr(operation='suspend versioned bucket')