diff --git a/s3tests/functional/__init__.py b/s3tests/functional/__init__.py
index 51dd506..6d0d680 100644
--- a/s3tests/functional/__init__.py
+++ b/s3tests/functional/__init__.py
@@ -59,18 +59,18 @@ def nuke_prefixed_buckets_on_conn(prefix, name, conn):
         name=name,
         prefix=prefix,
         )
     for bucket in conn.get_all_buckets():
         if bucket.name.startswith(prefix):
             print 'Cleaning bucket {bucket}'.format(bucket=bucket)
             try:
-                bucket.set_canned_acl('private')
-                for key in bucket.list():
+                # remove every version and delete marker, not just the
+                # current keys, so versioned buckets can be deleted too
+                for key in bucket.list_versions():
                     print 'Cleaning bucket {bucket} key {key}'.format(
                         bucket=bucket,
                         key=key,
                         )
-                    key.set_canned_acl('private')
-                    key.delete()
+                    bucket.delete_key(key.name, version_id=key.version_id)
                 bucket.delete()
             except boto.exception.S3ResponseError as e:
                 if e.error_code != 'AccessDenied':
diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py
index ab02f1d..16bacf5 100644
--- a/s3tests/functional/test_s3.py
+++ b/s3tests/functional/test_s3.py
@@ -21,6 +21,8 @@ import sha
 import pytz
 import json
 import httplib2
+import threading
+import itertools
 
 import xml.etree.ElementTree as ET
 
@@ -4392,26 +4394,35 @@ def test_multipart_upload_size_too_small():
     eq(e.status, 400)
     eq(e.error_code, u'EntityTooSmall')
 
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='check contents of multi-part upload')
-@attr(assertion='successful')
-def test_multipart_upload_contents():
-    bucket = get_new_bucket()
-    key_name="mymultipart"
-    num_parts=3
-    payload='12345'*1024*1024
+def gen_rand_string(size, chars=string.ascii_uppercase + string.digits):
+    return ''.join(random.choice(chars) for _ in range(size))
+
+def _do_test_multipart_upload_contents(bucket, key_name, num_parts):
+    payload=gen_rand_string(5)*1024*1024
     mp=bucket.initiate_multipart_upload(key_name)
     for i in range(0, num_parts):
         mp.upload_part_from_file(StringIO(payload), i+1)
 
     last_payload='123'*1024*1024
-    mp.upload_part_from_file(StringIO(last_payload), 4)
+    mp.upload_part_from_file(StringIO(last_payload), num_parts + 1)
 
     mp.complete_upload()
     key=bucket.get_key(key_name)
     test_string=key.get_contents_as_string()
-    assert test_string == payload*num_parts+last_payload
+
+    all_payload = payload*num_parts + last_payload
+
+    assert test_string == all_payload
+
+    return all_payload
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='check contents of multi-part upload')
+@attr(assertion='successful')
+def test_multipart_upload_contents():
+    _do_test_multipart_upload_contents(get_new_bucket(), 'mymultipart', 3)
 
 
 @attr(resource='object')
@@ -5187,4 +5199,697 @@ def test_region_copy_object():
     e = assert_raises(boto.exception.S3ResponseError, conn.get_bucket, dest_bucket.name)
     eq(e.status, 404)
 
+def check_versioning(bucket, status):
+    try:
+        eq(bucket.get_versioning_status()['Versioning'], status)
+    except KeyError:
+        eq(status, None)
+
+# Amazon S3 is eventually consistent; retry a few times before failing.
+def check_configure_versioning_retry(bucket, status, expected_string):
+    bucket.configure_versioning(status)
+
+    read_status = None
+
+    for i in xrange(5):
+        try:
+            read_status = bucket.get_versioning_status()['Versioning']
+        except KeyError:
+            read_status = None
+
+        if expected_string == read_status:
+            break
+
+        time.sleep(1)
+
+    eq(expected_string, read_status)
+
+
+@attr(resource='bucket')
+@attr(method='create')
+@attr(operation='create versioned bucket')
+@attr(assertion='can create and suspend bucket versioning')
+@attr('versioning')
+def test_versioning_bucket_create_suspend():
+    bucket = get_new_bucket()
+    check_versioning(bucket, None)
+
+    check_configure_versioning_retry(bucket, False, "Suspended")
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    check_configure_versioning_retry(bucket, True, "Enabled")
+    check_configure_versioning_retry(bucket, False, "Suspended")
+
+
+def check_head_obj_content(key, content):
+    if content is not None:
+        eq(key.get_contents_as_string(), content)
+    else:
+        eq(key, None)
+
+def check_obj_content(key, content):
+    if content is not None:
+        eq(key.get_contents_as_string(), content)
+    else:
+        eq(isinstance(key, boto.s3.deletemarker.DeleteMarker), True)
+
+
+def check_obj_versions(bucket, objname, keys, contents):
+    # check that the object head points at the correct version
+    key = bucket.get_key(objname)
+
+    if len(contents) > 0:
+        print 'testing obj head', objname
+        check_head_obj_content(key, contents[-1])
+        i = len(contents)
+        for key in bucket.list_versions():
+            if key.name != objname:
+                continue
+
+            i -= 1
+            eq(keys[i].version_id or 'null', key.version_id)
+            print 'testing obj version-id=', key.version_id
+            check_obj_content(key, contents[i])
+    else:
+        eq(key, None)
+
+def create_multiple_versions(bucket, objname, num_versions, k=None, c=None):
+    c = c or []
+    k = k or []
+    for i in xrange(num_versions):
+        c.append('content-{i}'.format(i=i))
+
+        key = bucket.new_key(objname)
+        key.set_contents_from_string(c[-1])
+
+        if i == 0:
+            check_configure_versioning_retry(bucket, True, "Enabled")
+
+    k_pos = len(k)
+    i = 0
+    for o in bucket.list_versions():
+        if o.name != objname:
+            continue
+        i += 1
+        if i > num_versions:
+            break
+
+        k.insert(k_pos, o)
+        print 'created obj name=', objname, 'version-id=', o.version_id
+
+    eq(len(k), len(c))
+
+    check_obj_versions(bucket, objname, k, c)
+
+    return (k, c)
+
+
+def remove_obj_version(bucket, k, c, i):
+    # check by versioned key
+    i = i % len(k)
+    rmkey = k.pop(i)
+    content = c.pop(i)
+    if not rmkey.delete_marker:
+        eq(rmkey.get_contents_as_string(), content)
+
+    # remove version
+    print 'removing version_id=', rmkey.version_id
+    bucket.delete_key(rmkey.name, version_id=rmkey.version_id)
+    check_obj_versions(bucket, rmkey.name, k, c)
+
+def remove_obj_head(bucket, objname, k, c):
+    print 'removing obj=', objname
+    key = bucket.delete_key(objname)
+
+    k.append(key)
+    c.append(None)
+
+    eq(key.delete_marker, True)
+
+    check_obj_versions(bucket, objname, k, c)
+
+def _do_test_create_remove_versions(bucket, objname, num_versions, remove_start_idx, idx_inc):
+    (k, c) = create_multiple_versions(bucket, objname, num_versions)
+
+    idx = remove_start_idx
+
+    for j in xrange(num_versions):
+        remove_obj_version(bucket, k, c, idx)
+        idx += idx_inc
+
+def _do_remove_versions(bucket, objname, remove_start_idx, idx_inc, head_rm_ratio, k, c):
+    idx = remove_start_idx
+
+    r = 0
+
+    total = len(k)
+
+    for j in xrange(total):
+        r += head_rm_ratio
+        if r >= 1:
+            r %= 1
+            remove_obj_head(bucket, objname, k, c)
+        else:
+            remove_obj_version(bucket, k, c, idx)
+            idx += idx_inc
+
+    check_obj_versions(bucket, objname, k, c)
+
+def _do_test_create_remove_versions_and_head(bucket, objname, num_versions, remove_start_idx, idx_inc, head_rm_ratio):
+    (k, c) = create_multiple_versions(bucket, objname, num_versions)
+
+    _do_remove_versions(bucket, objname, remove_start_idx, idx_inc, head_rm_ratio, k, c)
+
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='create and remove versioned object')
+@attr(assertion='can create, access, and remove appropriate versions')
+@attr('versioning')
+def test_versioning_obj_create_read_remove():
+    bucket = get_new_bucket()
+    objname = 'testobj'
+    num_vers = 5
+
+    _do_test_create_remove_versions(bucket, objname, num_vers, -1, 0)
+    _do_test_create_remove_versions(bucket, objname, num_vers, -1, 0)
+    _do_test_create_remove_versions(bucket, objname, num_vers, 0, 0)
+    _do_test_create_remove_versions(bucket, objname, num_vers, 1, 0)
+    _do_test_create_remove_versions(bucket, objname, num_vers, 4, -1)
+    _do_test_create_remove_versions(bucket, objname, num_vers, 3, 3)
+
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='create and remove versioned object and head')
+@attr(assertion='can create, access, and remove appropriate versions')
+@attr('versioning')
+def test_versioning_obj_create_read_remove_head():
+    bucket = get_new_bucket()
+    objname = 'testobj'
+    num_vers = 5
+
+    _do_test_create_remove_versions_and_head(bucket, objname, num_vers, -1, 0, 0.5)
+
+def is_null_key(k):
+    return (k.version_id is None) or (k.version_id == 'null')
+
+def delete_suspended_versioning_obj(bucket, objname, k, c):
+    key = bucket.delete_key(objname)
+
+    # with versioning suspended, the delete marker overwrites any 'null' version
+    i = 0
+    while i < len(k):
+        if is_null_key(k[i]):
+            k.pop(i)
+            c.pop(i)
+        else:
+            i += 1
+
+    key.version_id = "null"
+    k.append(key)
+    c.append(None)
+
+    check_obj_versions(bucket, objname, k, c)
+
+def overwrite_suspended_versioning_obj(bucket, objname, k, c, content):
+    key = bucket.new_key(objname)
+    key.set_contents_from_string(content)
+
+    # with versioning suspended, the overwrite replaces any 'null' version
+    i = 0
+    while i < len(k):
+        if is_null_key(k[i]):
+            k.pop(i)
+            c.pop(i)
+        else:
+            i += 1
+
+    k.append(key)
+    c.append(content)
+
+    check_obj_versions(bucket, objname, k, c)
+
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='suspend versioned bucket')
+@attr(assertion='suspended versioning behaves correctly')
+@attr('versioning')
+def test_versioning_obj_suspend_versions():
+    bucket = get_new_bucket()
+    check_versioning(bucket, None)
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    num_versions = 5
+    objname = 'testobj'
+
+    (k, c) = create_multiple_versions(bucket, objname, num_versions)
+
+    check_configure_versioning_retry(bucket, False, "Suspended")
+
+    delete_suspended_versioning_obj(bucket, objname, k, c)
+    delete_suspended_versioning_obj(bucket, objname, k, c)
+    overwrite_suspended_versioning_obj(bucket, objname, k, c, 'null content 1')
+    overwrite_suspended_versioning_obj(bucket, objname, k, c, 'null content 2')
+    delete_suspended_versioning_obj(bucket, objname, k, c)
+    overwrite_suspended_versioning_obj(bucket, objname, k, c, 'null content 3')
+    delete_suspended_versioning_obj(bucket, objname, k, c)
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    (k, c) = create_multiple_versions(bucket, objname, 3, k, c)
+
+    _do_remove_versions(bucket, objname, 0, 5, 0.5, k, c)
+    _do_remove_versions(bucket, objname, 0, 5, 0, k, c)
+
+    eq(len(k), 0)
+    eq(len(k), len(c))
+
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='suspend versioned bucket')
+@attr(assertion='suspended versioning behaves correctly')
+@attr('versioning')
+def test_versioning_obj_suspend_versions_simple():
+    bucket = get_new_bucket()
+    check_versioning(bucket, None)
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    num_versions = 1
+    objname = 'testobj'
+
+    (k, c) = create_multiple_versions(bucket, objname, num_versions)
+
+    check_configure_versioning_retry(bucket, False, "Suspended")
+
+    delete_suspended_versioning_obj(bucket, objname, k, c)
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    (k, c) = create_multiple_versions(bucket, objname, 1, k, c)
+
+    _do_remove_versions(bucket, objname, 0, 0, 0.5, k, c)
+    _do_remove_versions(bucket, objname, 0, 0, 0, k, c)
+
+    eq(len(k), 0)
+    eq(len(k), len(c))
+
+@attr(resource='object')
+@attr(method='remove')
+@attr(operation='create and remove versions')
+@attr(assertion='everything works')
+@attr('versioning')
+def test_versioning_obj_create_versions_remove_all():
+    bucket = get_new_bucket()
+    check_versioning(bucket, None)
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    num_versions = 10
+    objname = 'testobj'
+
+    (k, c) = create_multiple_versions(bucket, objname, num_versions)
+
+    _do_remove_versions(bucket, objname, 0, 5, 0.5, k, c)
+    _do_remove_versions(bucket, objname, 0, 5, 0, k, c)
+
+    eq(len(k), 0)
+    eq(len(k), len(c))
+
+@attr(resource='object')
+@attr(method='multipart')
+@attr(operation='create and test multipart object')
+@attr(assertion='everything works')
+@attr('versioning')
+def test_versioning_obj_create_overwrite_multipart():
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    objname = 'testobj'
+
+    c = []
+
+    num_vers = 3
+
+    for i in xrange(num_vers):
+        c.append(_do_test_multipart_upload_contents(bucket, objname, 3))
+
+    k = []
+    for key in bucket.list_versions():
+        k.insert(0, key)
+
+    eq(len(k), num_vers)
+    check_obj_versions(bucket, objname, k, c)
+
+    _do_remove_versions(bucket, objname, 0, 3, 0.5, k, c)
+    _do_remove_versions(bucket, objname, 0, 3, 0, k, c)
+
+    eq(len(k), 0)
+    eq(len(k), len(c))
+
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='list versioned objects')
+@attr(assertion='everything works')
+@attr('versioning')
+def test_versioning_obj_list_marker():
+    bucket = get_new_bucket()
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    objname = 'testobj'
+    objname2 = 'testobj-1'
+
+    num_vers = 5
+
+    (k, c) = create_multiple_versions(bucket, objname, num_vers)
+    (k2, c2) = create_multiple_versions(bucket, objname2, num_vers)
+
+    k.reverse()
+    k2.reverse()
+
+    allkeys = k + k2
+
+    names = []
+
+    for key1, key2 in itertools.izip_longest(bucket.list_versions(), allkeys):
+        eq(key1.version_id, key2.version_id)
+        names.append(key1.name)
+
+    for i in xrange(len(allkeys)):
+        for key1, key2 in itertools.izip_longest(bucket.list_versions(key_marker=names[i], version_id_marker=allkeys[i].version_id), allkeys[i+1:]):
+            eq(key1.version_id, key2.version_id)
+
+    # with a nonexistent version id, the listing skips to the next object
+    for key1, key2 in itertools.izip_longest(bucket.list_versions(key_marker=objname, version_id_marker='nosuchversion'), allkeys[num_vers:]):
+        eq(key1.version_id, key2.version_id)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create and test versioned object copying')
+@attr(assertion='everything works')
+@attr('versioning')
+def test_versioning_copy_obj_version():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    num_versions = 3
+    objname = 'testobj'
+
+    (k, c) = create_multiple_versions(bucket, objname, num_versions)
+
+    # copy into the same bucket
+    for i in xrange(num_versions):
+        new_key_name = 'key_{i}'.format(i=i)
+        new_key = bucket.copy_key(new_key_name, bucket.name, k[i].name, src_version_id=k[i].version_id)
+        eq(new_key.get_contents_as_string(), c[i])
+
+    another_bucket = get_new_bucket()
+
+    # copy into a different bucket
+    for i in xrange(num_versions):
+        new_key_name = 'key_{i}'.format(i=i)
+        new_key = another_bucket.copy_key(new_key_name, bucket.name, k[i].name, src_version_id=k[i].version_id)
+        eq(new_key.get_contents_as_string(), c[i])
+
+    # test copy of head object
+    new_key = another_bucket.copy_key('new_key', bucket.name, objname)
+    eq(new_key.get_contents_as_string(), c[num_versions - 1])
+
+def _count_bucket_versioned_objs(bucket):
+    k = []
+    for key in bucket.list_versions():
+        k.insert(0, key)
+    return len(k)
+
+
+@attr(resource='object')
+@attr(method='delete')
+@attr(operation='delete multiple versions')
+@attr(assertion='deletes multiple versions of an object with a single call')
+@attr('versioning')
+def test_versioning_multi_object_delete():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'key'
+
+    key0 = bucket.new_key(keyname)
+    key0.set_contents_from_string('foo')
+    key1 = bucket.new_key(keyname)
+    key1.set_contents_from_string('bar')
+
+    stored_keys = []
+    for key in bucket.list_versions():
+        stored_keys.insert(0, key)
+
+    eq(len(stored_keys), 2)
+
+    result = bucket.delete_keys(stored_keys)
+    eq(len(result.deleted), 2)
+    eq(len(result.errors), 0)
+
+    eq(_count_bucket_versioned_objs(bucket), 0)
+
+    # remove again; all deletes should succeed due to idempotency
+    result = bucket.delete_keys(stored_keys)
+    eq(len(result.deleted), 2)
+    eq(len(result.errors), 0)
+
+    eq(_count_bucket_versioned_objs(bucket), 0)
+
+@attr(resource='object')
+@attr(method='delete')
+@attr(operation='delete multiple versions')
+@attr(assertion='deletes multiple versions of an object and delete marker with a single call')
+@attr('versioning')
+def test_versioning_multi_object_delete_with_marker():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'key'
+
+    key0 = bucket.new_key(keyname)
+    key0.set_contents_from_string('foo')
+    key1 = bucket.new_key(keyname)
+    key1.set_contents_from_string('bar')
+
+    key2 = bucket.delete_key(keyname)
+    eq(key2.delete_marker, True)
+
+    stored_keys = []
+    for key in bucket.list_versions():
+        stored_keys.insert(0, key)
+
+    eq(len(stored_keys), 3)
+
+    result = bucket.delete_keys(stored_keys)
+    eq(len(result.deleted), 3)
+    eq(len(result.errors), 0)
+    eq(_count_bucket_versioned_objs(bucket), 0)
+
+    delete_markers = []
+    for o in result.deleted:
+        if o.delete_marker:
+            delete_markers.insert(0, o)
+
+    eq(len(delete_markers), 1)
+    eq(key2.version_id, delete_markers[0].version_id)
+
+    # remove again; all deletes should succeed due to idempotency
+    result = bucket.delete_keys(stored_keys)
+    eq(len(result.deleted), 3)
+    eq(len(result.errors), 0)
+
+    eq(_count_bucket_versioned_objs(bucket), 0)
+
+@attr(resource='object')
+@attr(method='delete')
+@attr(operation='multi delete create marker')
+@attr(assertion='returns correct marker version id')
+@attr('versioning')
+def test_versioning_multi_object_delete_with_marker_create():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'key'
+
+    rmkeys = { bucket.new_key(keyname) }
+
+    eq(_count_bucket_versioned_objs(bucket), 0)
+
+    result = bucket.delete_keys(rmkeys)
+    eq(len(result.deleted), 1)
+    eq(_count_bucket_versioned_objs(bucket), 1)
+
+    delete_markers = []
+    for o in result.deleted:
+        if o.delete_marker:
+            delete_markers.insert(0, o)
+
+    eq(len(delete_markers), 1)
+
+    for o in bucket.list_versions():
+        eq(o.name, keyname)
+        eq(o.version_id, delete_markers[0].delete_marker_version_id)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='change acl on an object version changes specific version')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_object_acl():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'foo'
+
+    key0 = bucket.new_key(keyname)
+    key0.set_contents_from_string('bar')
+    key1 = bucket.new_key(keyname)
+    key1.set_contents_from_string('bla')
+    key2 = bucket.new_key(keyname)
+    key2.set_contents_from_string('zxc')
+
+    stored_keys = []
+    for key in bucket.list_versions():
+        stored_keys.insert(0, key)
+
+    k1 = stored_keys[1]
+
+    policy = bucket.get_acl(key_name=k1.name, version_id=k1.version_id)
+
+    default_policy = [
+        dict(
+            permission='FULL_CONTROL',
+            id=policy.owner.id,
+            display_name=policy.owner.display_name,
+            uri=None,
+            email_address=None,
+            type='CanonicalUser',
+            ),
+        ]
+
+    print repr(policy)
+    check_grants(policy.acl.grants, default_policy)
+
+    bucket.set_canned_acl('public-read', key_name=k1.name, version_id=k1.version_id)
+
+    policy = bucket.get_acl(key_name=k1.name, version_id=k1.version_id)
+    print repr(policy)
+    check_grants(
+        policy.acl.grants,
+        [
+            dict(
+                permission='FULL_CONTROL',
+                id=policy.owner.id,
+                display_name=policy.owner.display_name,
+                uri=None,
+                email_address=None,
+                type='CanonicalUser',
+                ),
+            dict(
+                permission='READ',
+                id=None,
+                display_name=None,
+                uri='http://acs.amazonaws.com/groups/global/AllUsers',
+                email_address=None,
+                type='Group',
+                ),
+            ],
+        )
+
+    k = bucket.new_key(keyname)
+    check_grants(k.get_acl().acl.grants, default_policy)
+
+
+def _do_create_object(bucket, objname, i):
+    k = bucket.new_key(objname)
+    k.set_contents_from_string('data {i}'.format(i=i))
+
+def _do_remove_ver(bucket, obj):
+    bucket.delete_key(obj.name, version_id=obj.version_id)
+
+def _do_create_versioned_obj_concurrent(bucket, objname, num):
+    t = []
+    for i in range(num):
+        thr = threading.Thread(target=_do_create_object, args=(bucket, objname, i))
+        thr.start()
+        t.append(thr)
+    return t
+
+def _do_clear_versioned_bucket_concurrent(bucket):
+    t = []
+    for o in bucket.list_versions():
+        thr = threading.Thread(target=_do_remove_ver, args=(bucket, o))
+        thr.start()
+        t.append(thr)
+    return t
+
+def _do_wait_completion(t):
+    for thr in t:
+        thr.join()
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent creation of objects, concurrent removal')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_concurrent_object_create_concurrent_remove():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'myobj'
+
+    num_objs = 5
+
+    for i in xrange(5):
+        t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
+        _do_wait_completion(t)
+
+        eq(_count_bucket_versioned_objs(bucket), num_objs)
+        eq(len(bucket.get_all_keys()), 1)
+
+        t = _do_clear_versioned_bucket_concurrent(bucket)
+        _do_wait_completion(t)
+
+        eq(_count_bucket_versioned_objs(bucket), 0)
+        eq(len(bucket.get_all_keys()), 0)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent creation and removal of objects')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_concurrent_object_create_and_remove():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'myobj'
+
+    num_objs = 3
+
+    all_threads = []
+
+    for i in xrange(3):
+        t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
+        all_threads.append(t)
+
+        t = _do_clear_versioned_bucket_concurrent(bucket)
+        all_threads.append(t)
+
+    for t in all_threads:
+        _do_wait_completion(t)
+
+    t = _do_clear_versioned_bucket_concurrent(bucket)
+    _do_wait_completion(t)
+
+    eq(_count_bucket_versioned_objs(bucket), 0)
+    eq(len(bucket.get_all_keys()), 0)
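
Note: the cleanup change in nuke_prefixed_buckets_on_conn() relies on the fact
that a versioned bucket can only be deleted once every object version and
delete marker has been removed explicitly; a plain key.delete() on a versioned
bucket just adds another delete marker. A minimal standalone sketch of that
pattern in boto2 (the same client library the suite uses); the endpoint,
credentials, and bucket name below are illustrative placeholders, not values
from this patch:

    import boto
    import boto.s3.connection

    conn = boto.connect_s3(
        aws_access_key_id='ACCESS_KEY',          # placeholder
        aws_secret_access_key='SECRET_KEY',      # placeholder
        host='s3.example.com',                   # placeholder endpoint
        calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        )

    bucket = conn.get_bucket('my-versioned-bucket')  # placeholder name

    # list_versions() yields Key objects for object versions and
    # DeleteMarker objects for markers; both carry name and version_id,
    # which is all delete_key() needs to remove them for good.
    for key in bucket.list_versions():
        bucket.delete_key(key.name, version_id=key.version_id)

    bucket.delete()

Deleting by explicit version_id is also why the helper works unchanged on
unversioned buckets: there list_versions() reports each key with the 'null'
version, and a versioned delete of 'null' removes the object itself.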