diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index 716ec1c..9e7cedc 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -4940,42 +4940,110 @@ def test_object_copy_key_not_found(): def test_object_copy_versioned_bucket(): bucket = get_new_bucket() check_configure_versioning_retry(bucket, True, "Enabled") - key = bucket.new_key('foo123bar') - _create_key_with_random_content(key.name, 1*1024*1024) + size = 1*1024*1024 + data = str(bytearray(size)) + key.set_contents_from_string(data) # copy object in the same bucket key2 = bucket.copy_key('bar321foo', bucket.name, key.name, src_version_id = key.version_id) - res = _make_request('GET', bucket, key2) - eq(res.status, 200) - eq(res.reason, 'OK') + key2 = bucket.get_key(key2.name) + eq(key2.size, size) + got = key2.get_contents_as_string() + eq(got, data) # second copy key3 = bucket.copy_key('bar321foo2', bucket.name, key2.name, src_version_id = key2.version_id) - res = _make_request('GET', bucket, key3) - eq(res.status, 200) - eq(res.reason, 'OK') + key3 = bucket.get_key(key3.name) + eq(key3.size, size) + got = key3.get_contents_as_string() + eq(got, data) # copy to another versioned bucket bucket2 = get_new_bucket() check_configure_versioning_retry(bucket2, True, "Enabled") key4 = bucket2.copy_key('bar321foo3', bucket.name, key.name, src_version_id = key.version_id) - res = _make_request('GET', bucket, key4) - eq(res.status, 200) - eq(res.reason, 'OK') + key4 = bucket2.get_key(key4.name) + eq(key4.size, size) + got = key4.get_contents_as_string() + eq(got, data) # copy to another non versioned bucket bucket3 = get_new_bucket() key5 = bucket3.copy_key('bar321foo4', bucket.name, key.name , src_version_id = key.version_id) - res = _make_request('GET', bucket, key5) - eq(res.status, 200) - eq(res.reason, 'OK') + key5 = bucket3.get_key(key5.name) + eq(key5.size, size) + got = key5.get_contents_as_string() + eq(got, data) # copy from a non versioned bucket key6 = bucket.copy_key('foo123bar2', bucket3.name, key5.name) - res = _make_request('GET', bucket, key6) - eq(res.status, 200) - eq(res.reason, 'OK') + key6 = bucket.get_key(key6.name) + eq(key6.size, size) + got = key6.get_contents_as_string() + eq(got, data) + +@attr(resource='object') +@attr(method='put') +@attr(operation='test copy object of a multipart upload') +@attr(assertion='successful') +def test_object_copy_versioning_multipart_upload(): + bucket = get_new_bucket() + check_configure_versioning_retry(bucket, True, "Enabled") + key_name="srcmultipart" + content_type='text/bla' + objlen = 30 * 1024 * 1024 + (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'bar'}) + upload.complete_upload() + key = bucket.get_key(key_name) + + # copy object in the same bucket + key2 = bucket.copy_key('dstmultipart', bucket.name, key.name, src_version_id = key.version_id) + key2 = bucket.get_key(key2.name) + eq(key2.metadata['foo'], 'bar') + eq(key2.content_type, content_type) + eq(key2.size, key.size) + got = key2.get_contents_as_string() + eq(got, data) + + # second copy + key3 = bucket.copy_key('dstmultipart2', bucket.name, key2.name, src_version_id = key2.version_id) + key3 = bucket.get_key(key3.name) + eq(key3.metadata['foo'], 'bar') + eq(key3.content_type, content_type) + eq(key3.size, key.size) + got = key3.get_contents_as_string() + eq(got, data) + + # copy to another versioned bucket + bucket2 = get_new_bucket() + check_configure_versioning_retry(bucket2, True, "Enabled") + key4 = bucket2.copy_key('dstmultipart3', bucket.name, key.name, src_version_id = key.version_id) + key4 = bucket2.get_key(key4.name) + eq(key4.metadata['foo'], 'bar') + eq(key4.content_type, content_type) + eq(key4.size, key.size) + got = key4.get_contents_as_string() + eq(got, data) + + # copy to another non versioned bucket + bucket3 = get_new_bucket() + key5 = bucket3.copy_key('dstmultipart4', bucket.name, key.name, src_version_id = key.version_id) + key5 = bucket3.get_key(key5.name) + eq(key5.metadata['foo'], 'bar') + eq(key5.content_type, content_type) + eq(key5.size, key.size) + got = key5.get_contents_as_string() + eq(got, data) + + # copy from a non versioned bucket + key6 = bucket.copy_key('dstmultipart5', bucket3.name, key5.name) + key6 = bucket3.get_key(key6.name) + eq(key6.metadata['foo'], 'bar') + eq(key6.content_type, content_type) + eq(key6.size, key.size) + got = key6.get_contents_as_string() + eq(got, data) def transfer_part(bucket, mp_id, mp_keyname, i, part): """Transfer a part of a multipart upload. Designed to be run in parallel.