Merge branch 'wip-rgw-versioning-4'

This commit is contained in:
Yehuda Sadeh 2015-01-30 07:29:42 -08:00
commit ecbb481bea
2 changed files with 732 additions and 15 deletions

View file

@ -59,18 +59,20 @@ def nuke_prefixed_buckets_on_conn(prefix, name, conn):
name=name, name=name,
prefix=prefix, prefix=prefix,
) )
for bucket in conn.get_all_buckets(): for bucket in conn.get_all_buckets():
print 'prefix=',prefix
if bucket.name.startswith(prefix): if bucket.name.startswith(prefix):
print 'Cleaning bucket {bucket}'.format(bucket=bucket) print 'Cleaning bucket {bucket}'.format(bucket=bucket)
try: try:
bucket.set_canned_acl('private') # bucket.set_canned_acl('private')
for key in bucket.list(): for key in bucket.list_versions():
print 'Cleaning bucket {bucket} key {key}'.format( print 'Cleaning bucket {bucket} key {key}'.format(
bucket=bucket, bucket=bucket,
key=key, key=key,
) )
key.set_canned_acl('private') # key.set_canned_acl('private')
key.delete() bucket.delete_key(key.name, version_id = key.version_id)
bucket.delete() bucket.delete()
except boto.exception.S3ResponseError as e: except boto.exception.S3ResponseError as e:
if e.error_code != 'AccessDenied': if e.error_code != 'AccessDenied':

View file

@ -21,6 +21,8 @@ import sha
import pytz import pytz
import json import json
import httplib2 import httplib2
import threading
import itertools
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -4392,26 +4394,36 @@ def test_multipart_upload_size_too_small():
eq(e.status, 400) eq(e.status, 400)
eq(e.error_code, u'EntityTooSmall') eq(e.error_code, u'EntityTooSmall')
@attr(resource='object') def gen_rand_string(size, chars=string.ascii_uppercase + string.digits):
@attr(method='put') return ''.join(random.choice(chars) for _ in range(size))
@attr(operation='check contents of multi-part upload')
@attr(assertion='successful') def _do_test_multipart_upload_contents(bucket, key_name, num_parts):
def test_multipart_upload_contents(): payload=gen_rand_string(5)*1024*1024
bucket = get_new_bucket()
key_name="mymultipart"
num_parts=3
payload='12345'*1024*1024
mp=bucket.initiate_multipart_upload(key_name) mp=bucket.initiate_multipart_upload(key_name)
for i in range(0, num_parts): for i in range(0, num_parts):
mp.upload_part_from_file(StringIO(payload), i+1) mp.upload_part_from_file(StringIO(payload), i+1)
last_payload='123'*1024*1024 last_payload='123'*1024*1024
mp.upload_part_from_file(StringIO(last_payload), 4) mp.upload_part_from_file(StringIO(last_payload), num_parts + 1)
mp.complete_upload() mp.complete_upload()
key=bucket.get_key(key_name) key=bucket.get_key(key_name)
test_string=key.get_contents_as_string() test_string=key.get_contents_as_string()
assert test_string == payload*num_parts+last_payload
all_payload = payload*num_parts + last_payload
print 'JJJ', key_name, len(all_payload), len(test_string)
assert test_string == all_payload
return all_payload
@attr(resource='object')
@attr(method='put')
@attr(operation='check contents of multi-part upload')
@attr(assertion='successful')
def test_multipart_upload_contents():
_do_test_multipart_upload_contents(get_new_bucket(), 'mymultipart', 3)
@attr(resource='object') @attr(resource='object')
@ -5187,4 +5199,707 @@ def test_region_copy_object():
e = assert_raises(boto.exception.S3ResponseError, conn.get_bucket, dest_bucket.name) e = assert_raises(boto.exception.S3ResponseError, conn.get_bucket, dest_bucket.name)
eq(e.status, 404) eq(e.status, 404)
def check_versioning(bucket, status):
try:
eq(bucket.get_versioning_status()['Versioning'], status)
except KeyError:
eq(status, None)
# amazon is eventual consistent, retry a bit if failed
def check_configure_versioning_retry(bucket, status, expected_string):
bucket.configure_versioning(status)
read_status = None
for i in xrange(5):
try:
read_status = bucket.get_versioning_status()['Versioning']
except KeyError:
read_status = None
if (expected_string == read_status):
break
time.sleep(1)
eq(expected_string, read_status)
@attr(resource='bucket')
@attr(method='create')
@attr(operation='create versioned bucket')
@attr(assertion='can create and suspend bucket versioning')
@attr('versioning')
def test_versioning_bucket_create_suspend():
bucket = get_new_bucket()
check_versioning(bucket, None)
check_configure_versioning_retry(bucket, False, "Suspended")
check_configure_versioning_retry(bucket, True, "Enabled")
check_configure_versioning_retry(bucket, True, "Enabled")
check_configure_versioning_retry(bucket, False, "Suspended")
def check_head_obj_content(key, content):
if content is not None:
eq(key.get_contents_as_string(), content)
else:
print 'check head', key
eq(key, None)
def check_obj_content(key, content):
if content is not None:
eq(key.get_contents_as_string(), content)
else:
eq(isinstance(key, boto.s3.deletemarker.DeleteMarker), True)
def check_obj_versions(bucket, objname, keys, contents):
# check to see if object is pointing at correct version
key = bucket.get_key(objname)
if len(contents) > 0:
print 'testing obj head', objname
check_head_obj_content(key, contents[-1])
i = len(contents)
for key in bucket.list_versions():
if key.name != objname:
continue
i -= 1
eq(keys[i].version_id or 'null', key.version_id)
print 'testing obj version-id=', key.version_id
check_obj_content(key, contents[i])
else:
eq(key, None)
def create_multiple_versions(bucket, objname, num_versions, k = None, c = None):
c = c or []
k = k or []
for i in xrange(num_versions):
c.append('content-{i}'.format(i=i))
key = bucket.new_key(objname)
key.set_contents_from_string(c[i])
if i == 0:
check_configure_versioning_retry(bucket, True, "Enabled")
k_pos = len(k)
i = 0
for o in bucket.list_versions():
if o.name != objname:
continue
i += 1
if i > num_versions:
break
print o, o.version_id
k.insert(k_pos, o)
print 'created obj name=', objname, 'version-id=', o.version_id
eq(len(k), len(c))
for j in xrange(num_versions):
print j, k[j], k[j].version_id
check_obj_versions(bucket, objname, k, c)
return (k, c)
def remove_obj_version(bucket, k, c, i):
# check by versioned key
i = i % len(k)
rmkey = k.pop(i)
content = c.pop(i)
if (not rmkey.delete_marker):
eq(rmkey.get_contents_as_string(), content)
# remove version
print 'removing version_id=', rmkey.version_id
bucket.delete_key(rmkey.name, version_id = rmkey.version_id)
check_obj_versions(bucket, rmkey.name, k, c)
def remove_obj_head(bucket, objname, k, c):
print 'removing obj=', objname
key = bucket.delete_key(objname)
k.append(key)
c.append(None)
eq(key.delete_marker, True)
check_obj_versions(bucket, objname, k, c)
def _do_test_create_remove_versions(bucket, objname, num_versions, remove_start_idx, idx_inc):
(k, c) = create_multiple_versions(bucket, objname, num_versions)
idx = remove_start_idx
for j in xrange(num_versions):
remove_obj_version(bucket, k, c, idx)
idx += idx_inc
def _do_remove_versions(bucket, objname, remove_start_idx, idx_inc, head_rm_ratio, k, c):
idx = remove_start_idx
r = 0
total = len(k)
for j in xrange(total):
r += head_rm_ratio
if r >= 1:
r %= 1
remove_obj_head(bucket, objname, k, c)
else:
remove_obj_version(bucket, k, c, idx)
idx += idx_inc
check_obj_versions(bucket, objname, k, c)
def _do_test_create_remove_versions_and_head(bucket, objname, num_versions, num_ops, remove_start_idx, idx_inc, head_rm_ratio):
(k, c) = create_multiple_versions(bucket, objname, num_versions)
_do_remove_versions(bucket, objname, remove_start_idx, idx_inc, head_rm_ratio, k, c)
@attr(resource='object')
@attr(method='create')
@attr(operation='create and remove versioned object')
@attr(assertion='can create access and remove appropriate versions')
@attr('versioning')
def test_versioning_obj_create_read_remove():
bucket = get_new_bucket()
objname = 'testobj'
num_vers = 5
_do_test_create_remove_versions(bucket, objname, num_vers, -1, 0)
_do_test_create_remove_versions(bucket, objname, num_vers, -1, 0)
_do_test_create_remove_versions(bucket, objname, num_vers, 0, 0)
_do_test_create_remove_versions(bucket, objname, num_vers, 1, 0)
_do_test_create_remove_versions(bucket, objname, num_vers, 4, -1)
_do_test_create_remove_versions(bucket, objname, num_vers, 3, 3)
@attr(resource='object')
@attr(method='create')
@attr(operation='create and remove versioned object and head')
@attr(assertion='can create access and remove appropriate versions')
@attr('versioning')
def test_versioning_obj_create_read_remove_head():
bucket = get_new_bucket()
objname = 'testobj'
num_vers = 5
_do_test_create_remove_versions_and_head(bucket, objname, num_vers, num_vers * 2, -1, 0, 0.5)
def is_null_key(k):
return (k.version_id is None) or (k.version_id == 'null')
def delete_suspended_versioning_obj(bucket, objname, k, c):
key = bucket.delete_key(objname)
i = 0
while i < len(k):
if is_null_key(k[i]):
k.pop(i)
c.pop(i)
else:
i += 1
key.version_id = "null"
k.append(key)
c.append(None)
check_obj_versions(bucket, objname, k, c)
def overwrite_suspended_versioning_obj(bucket, objname, k, c, content):
key = bucket.new_key(objname)
key.set_contents_from_string(content)
i = 0
while i < len(k):
print 'kkk', i, k[i], k[i].version_id
if is_null_key(k[i]):
print 'null key!'
k.pop(i)
c.pop(i)
else:
i += 1
k.append(key)
c.append(content)
check_obj_versions(bucket, objname, k, c)
@attr(resource='object')
@attr(method='create')
@attr(operation='suspend versioned bucket')
@attr(assertion='suspended versioning behaves correctly')
@attr('versioning')
def test_versioning_obj_suspend_versions():
bucket = get_new_bucket()
check_versioning(bucket, None)
check_configure_versioning_retry(bucket, True, "Enabled")
num_versions = 5
objname = 'testobj'
(k, c) = create_multiple_versions(bucket, objname, num_versions)
check_configure_versioning_retry(bucket, False, "Suspended")
delete_suspended_versioning_obj(bucket, objname, k, c)
delete_suspended_versioning_obj(bucket, objname, k, c)
overwrite_suspended_versioning_obj(bucket, objname, k, c, 'null content 1')
overwrite_suspended_versioning_obj(bucket, objname, k, c, 'null content 2')
delete_suspended_versioning_obj(bucket, objname, k, c)
overwrite_suspended_versioning_obj(bucket, objname, k, c, 'null content 3')
delete_suspended_versioning_obj(bucket, objname, k, c)
check_configure_versioning_retry(bucket, True, "Enabled")
(k, c) = create_multiple_versions(bucket, objname, 3, k, c)
_do_remove_versions(bucket, objname, 0, 5, 0.5, k, c)
_do_remove_versions(bucket, objname, 0, 5, 0, k, c)
eq(len(k), 0)
eq(len(k), len(c))
@attr(resource='object')
@attr(method='create')
@attr(operation='suspend versioned bucket')
@attr(assertion='suspended versioning behaves correctly')
@attr('versioning')
def test_versioning_obj_suspend_versions_simple():
bucket = get_new_bucket()
check_versioning(bucket, None)
check_configure_versioning_retry(bucket, True, "Enabled")
num_versions = 1
objname = 'testobj'
(k, c) = create_multiple_versions(bucket, objname, num_versions)
check_configure_versioning_retry(bucket, False, "Suspended")
delete_suspended_versioning_obj(bucket, objname, k, c)
check_configure_versioning_retry(bucket, True, "Enabled")
(k, c) = create_multiple_versions(bucket, objname, 1, k, c)
for i in xrange(len(k)):
print 'JJJ: ', k[i].version_id, c[i]
_do_remove_versions(bucket, objname, 0, 0, 0.5, k, c)
_do_remove_versions(bucket, objname, 0, 0, 0, k, c)
eq(len(k), 0)
eq(len(k), len(c))
@attr(resource='object')
@attr(method='remove')
@attr(operation='create and remove versions')
@attr(assertion='everything works')
@attr('versioning')
def test_versioning_obj_create_versions_remove_all():
bucket = get_new_bucket()
check_versioning(bucket, None)
check_configure_versioning_retry(bucket, True, "Enabled")
num_versions = 10
objname = 'testobj'
(k, c) = create_multiple_versions(bucket, objname, num_versions)
_do_remove_versions(bucket, objname, 0, 5, 0.5, k, c)
_do_remove_versions(bucket, objname, 0, 5, 0, k, c)
eq(len(k), 0)
eq(len(k), len(c))
@attr(resource='object')
@attr(method='multipart')
@attr(operation='create and test multipart object')
@attr(assertion='everything works')
@attr('versioning')
def test_versioning_obj_create_overwrite_multipart():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
objname = 'testobj'
c = []
num_vers = 3
for i in xrange(num_vers):
c.append(_do_test_multipart_upload_contents(bucket, objname, 3))
k = []
for key in bucket.list_versions():
k.insert(0, key)
eq(len(k), num_vers)
check_obj_versions(bucket, objname, k, c)
_do_remove_versions(bucket, objname, 0, 3, 0.5, k, c)
_do_remove_versions(bucket, objname, 0, 3, 0, k, c)
eq(len(k), 0)
eq(len(k), len(c))
@attr(resource='object')
@attr(method='multipart')
@attr(operation='list versioned objects')
@attr(assertion='everything works')
@attr('versioning')
def test_versioning_obj_list_marker():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
objname = 'testobj'
objname2 = 'testobj-1'
num_vers = 5
(k, c) = create_multiple_versions(bucket, objname, num_vers)
(k2, c2) = create_multiple_versions(bucket, objname2, num_vers)
k.reverse()
k2.reverse()
allkeys = k + k2
names = []
for key1, key2 in itertools.izip_longest(bucket.list_versions(), allkeys):
eq(key1.version_id, key2.version_id)
names.append(key1.name)
for i in xrange(len(allkeys)):
for key1, key2 in itertools.izip_longest(bucket.list_versions(key_marker=names[i], version_id_marker=allkeys[i].version_id), allkeys[i+1:]):
eq(key1.version_id, key2.version_id)
# with nonexisting version id, skip to next object
for key1, key2 in itertools.izip_longest(bucket.list_versions(key_marker=objname, version_id_marker='nosuchversion'), allkeys[5:]):
eq(key1.version_id, key2.version_id)
@attr(resource='object')
@attr(method='multipart')
@attr(operation='create and test versioned object copying')
@attr(assertion='everything works')
@attr('versioning')
def test_versioning_copy_obj_version():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
num_versions = 3
objname = 'testobj'
(k, c) = create_multiple_versions(bucket, objname, num_versions)
# copy into the same bucket
for i in xrange(num_versions):
new_key_name = 'key_{i}'.format(i=i)
new_key = bucket.copy_key(new_key_name, bucket.name, k[i].name, src_version_id=k[i].version_id)
eq(new_key.get_contents_as_string(), c[i])
another_bucket = get_new_bucket()
# copy into a different bucket
for i in xrange(num_versions):
new_key_name = 'key_{i}'.format(i=i)
new_key = another_bucket.copy_key(new_key_name, bucket.name, k[i].name, src_version_id=k[i].version_id)
eq(new_key.get_contents_as_string(), c[i])
# test copy of head object
new_key = another_bucket.copy_key('new_key', bucket.name, objname)
eq(new_key.get_contents_as_string(), c[num_versions - 1])
def _count_bucket_versioned_objs(bucket):
k = []
for key in bucket.list_versions():
k.insert(0, key)
return len(k)
@attr(resource='object')
@attr(method='delete')
@attr(operation='delete multiple versions')
@attr(assertion='deletes multiple versions of an object with a single call')
@attr('versioning')
def test_versioning_multi_object_delete():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
keyname = 'key'
key0 = bucket.new_key(keyname)
key0.set_contents_from_string('foo')
key1 = bucket.new_key(keyname)
key1.set_contents_from_string('bar')
stored_keys = []
for key in bucket.list_versions():
stored_keys.insert(0, key)
eq(len(stored_keys), 2)
result = bucket.delete_keys(stored_keys)
eq(len(result.deleted), 2)
eq(len(result.errors), 0)
eq(_count_bucket_versioned_objs(bucket), 0)
# now remove again, should all succeed due to idempotency
result = bucket.delete_keys(stored_keys)
eq(len(result.deleted), 2)
eq(len(result.errors), 0)
eq(_count_bucket_versioned_objs(bucket), 0)
@attr(resource='object')
@attr(method='delete')
@attr(operation='delete multiple versions')
@attr(assertion='deletes multiple versions of an object and delete marker with a single call')
@attr('versioning')
def test_versioning_multi_object_delete_with_marker():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
keyname = 'key'
key0 = bucket.new_key(keyname)
key0.set_contents_from_string('foo')
key1 = bucket.new_key(keyname)
key1.set_contents_from_string('bar')
key2 = bucket.delete_key(keyname)
eq(key2.delete_marker, True)
stored_keys = []
for key in bucket.list_versions():
stored_keys.insert(0, key)
eq(len(stored_keys), 3)
result = bucket.delete_keys(stored_keys)
eq(len(result.deleted), 3)
eq(len(result.errors), 0)
eq(_count_bucket_versioned_objs(bucket), 0)
delete_markers = []
for o in result.deleted:
if o.delete_marker:
delete_markers.insert(0, o)
eq(len(delete_markers), 1)
eq(key2.version_id, delete_markers[0].version_id)
# now remove again, should all succeed due to idempotency
result = bucket.delete_keys(stored_keys)
eq(len(result.deleted), 3)
eq(len(result.errors), 0)
eq(_count_bucket_versioned_objs(bucket), 0)
@attr(resource='object')
@attr(method='delete')
@attr(operation='multi delete create marker')
@attr(assertion='returns correct marker version id')
@attr('versioning')
def test_versioning_multi_object_delete_with_marker_create():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
keyname = 'key'
rmkeys = { bucket.new_key(keyname) }
eq(_count_bucket_versioned_objs(bucket), 0)
result = bucket.delete_keys(rmkeys)
eq(len(result.deleted), 1)
eq(_count_bucket_versioned_objs(bucket), 1)
delete_markers = []
for o in result.deleted:
if o.delete_marker:
delete_markers.insert(0, o)
eq(len(delete_markers), 1)
for o in bucket.list_versions():
eq(o.name, keyname)
eq(o.version_id, delete_markers[0].delete_marker_version_id)
@attr(resource='object')
@attr(method='put')
@attr(operation='change acl on an object version changes specific version')
@attr(assertion='works')
@attr('versioning')
def test_versioned_object_acl():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
keyname = 'foo'
key0 = bucket.new_key(keyname)
key0.set_contents_from_string('bar')
key1 = bucket.new_key(keyname)
key1.set_contents_from_string('bla')
key2 = bucket.new_key(keyname)
key2.set_contents_from_string('zxc')
stored_keys = []
for key in bucket.list_versions():
stored_keys.insert(0, key)
k1 = stored_keys[1]
policy = bucket.get_acl(key_name=k1.name, version_id=k1.version_id)
default_policy = [
dict(
permission='FULL_CONTROL',
id=policy.owner.id,
display_name=policy.owner.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
]
print repr(policy)
check_grants(policy.acl.grants, default_policy)
bucket.set_canned_acl('public-read', key_name=k1.name, version_id=k1.version_id)
policy = bucket.get_acl(key_name=k1.name, version_id=k1.version_id)
print repr(policy)
check_grants(
policy.acl.grants,
[
dict(
permission='FULL_CONTROL',
id=policy.owner.id,
display_name=policy.owner.display_name,
uri=None,
email_address=None,
type='CanonicalUser',
),
dict(
permission='READ',
id=None,
display_name=None,
uri='http://acs.amazonaws.com/groups/global/AllUsers',
email_address=None,
type='Group',
),
],
)
k = bucket.new_key(keyname)
check_grants(k.get_acl().acl.grants, default_policy)
def _do_create_object(bucket, objname, i):
k = bucket.new_key(objname)
k.set_contents_from_string('data {i}'.format(i=i))
def _do_remove_ver(bucket, obj):
bucket.delete_key(obj.name, version_id = obj.version_id)
def _do_create_versioned_obj_concurrent(bucket, objname, num):
t = []
for i in range(num):
thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i))
thr.start()
t.append(thr)
return t
def _do_clear_versioned_bucket_concurrent(bucket):
t = []
for o in bucket.list_versions():
thr = threading.Thread(target = _do_remove_ver, args=(bucket, o))
thr.start()
t.append(thr)
return t
def _do_wait_completion(t):
for thr in t:
thr.join()
@attr(resource='object')
@attr(method='put')
@attr(operation='concurrent creation of objects, concurrent removal')
@attr(assertion='works')
@attr('versioning')
def test_versioned_concurrent_object_create_concurrent_remove():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
keyname = 'myobj'
num_objs = 5
for i in xrange(5):
t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
_do_wait_completion(t)
eq(_count_bucket_versioned_objs(bucket), num_objs)
eq(len(bucket.get_all_keys()), 1)
t = _do_clear_versioned_bucket_concurrent(bucket)
_do_wait_completion(t)
eq(_count_bucket_versioned_objs(bucket), 0)
eq(len(bucket.get_all_keys()), 0)
@attr(resource='object')
@attr(method='put')
@attr(operation='concurrent creation and removal of objects')
@attr(assertion='works')
@attr('versioning')
def test_versioned_concurrent_object_create_and_remove():
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
keyname = 'myobj'
num_objs = 3
all_threads = []
for i in xrange(3):
t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
all_threads.append(t)
t = _do_clear_versioned_bucket_concurrent(bucket)
all_threads.append(t)
for t in all_threads:
_do_wait_completion(t)
t = _do_clear_versioned_bucket_concurrent(bucket)
_do_wait_completion(t)
eq(_count_bucket_versioned_objs(bucket), 0)
eq(len(bucket.get_all_keys()), 0)