diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index 7abfe7a..b8dc47f 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -35,6 +35,7 @@ from urlparse import urlparse from nose.tools import eq_ as eq from nose.plugins.attrib import attr from nose.plugins.skip import SkipTest +from boto.s3.tagging import TagSet from .utils import assert_raises from .utils import generate_random @@ -45,6 +46,7 @@ from email.header import decode_header from ordereddict import OrderedDict from boto.s3.cors import CORSConfiguration +from urllib import quote_plus from . import ( nuke_prefixed_buckets, @@ -101,6 +103,14 @@ def tag(*tags): return func return wrap +def parse_s3_errorcode(error_xml): + """ + Given an S3 error response return the Error Code message. + Useful for apis not fully implemented in boto + """ + return ET.fromstring(error_xml).find('./Code').text + + @attr(resource='bucket') @attr(method='get') @attr(operation='list') @@ -148,6 +158,15 @@ def _get_keys_prefixes(li): prefixes = [x for x in li if not isinstance(x, boto.s3.key.Key)] return (keys, prefixes) +def _get_alt_connection(): + return boto.s3.connection.S3Connection( + aws_access_key_id=s3['alt'].aws_access_key_id, + aws_secret_access_key=s3['alt'].aws_secret_access_key, + is_secure=s3['alt'].is_secure, + port=s3['alt'].port, + host=s3['alt'].host, + calling_format=s3['alt'].calling_format, + ) @attr(resource='bucket') @attr(method='get') @@ -8768,3 +8787,421 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists(): res = _make_request('GET', bucket.name, bucket.get_key("foo"), request_headers={'referer': 'http://example.com'}) eq(res.status, 403) + + +def _tags_from_dict(d): + tag_list = [] + for k,v in d.items(): + tag_list.append({ + 'Key' : k, + 'Value': v if v is not None else '' + }) + + return tag_list + +class S3TestTagSet(TagSet): + ''' + version of TagSet that supports comparision, so that we can compare tagsets + ''' + def to_dict(self): + d = dict() + for tag in self: + d[tag.key] = tag.value + return d + + def __eq__(self, other): + return self.to_dict() == other.to_dict() + + def __str__(self): + s = '' + for tag in self: + if s: + s += '&' + s += quote_plus(tag.key) + v = tag.value + if v is not None and v != '': + s += '=' + quote_plus(v) + return s + + def to_xml(self): + xml = '' + xml += super(S3TestTagSet,self).to_xml() + xml += '' + return xml + +def _parse_tagging_xml(tags_xml): + # Apparently ETree doesn't understand namespaces well, so let's define it + ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"} + tags_list = ET.fromstring(tags_xml).findall('./aws:TagSet/aws:Tag', ns) + tagset = S3TestTagSet() + + for it in tags_list: + # unfortunately etree returns None when string is empty + tagset.add_tag(it.find('aws:Key', ns).text,it.find('aws:Value', ns).text or '') + return tagset + +def _make_random_string(size): + return ''.join(random.choice(string.ascii_letters) for _ in range(size)) + +def _get_obj_tags_conn(conn, bucket_name, key_name): + res = conn.make_request('GET',bucket_name, key_name, query_args='tagging') + eq(res.status, 200) + return _parse_tagging_xml(res.read()) + +def _get_obj_tags(bucket, key_name): + # our _make_request doesn't sign query args, let's piggy back on boto + return _get_obj_tags_conn(bucket.connection, bucket.name, key_name) + +def _put_obj_tags_conn(conn, bucket_name, key_name, tag_str): + return conn.make_request('PUT',bucket_name, key_name, query_args='tagging', data=tag_str) + +def _put_obj_tags(bucket, key_name, tag_str): + return _put_obj_tags_conn(bucket.connection, bucket.name, key_name, tag_str) + +def _delete_obj_tags(bucket, key_name): + return bucket.connection.make_request('DELETE', bucket.name, key_name, query_args='tagging') + +def _create_simple_tagset(count): + tagset = S3TestTagSet() + for i in range(count): + tagset.add_tag('key'+str(i),'val'+str(i)) + + return tagset + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test Get/PutObjTagging output') +@attr(assertion='success') +@attr('tagging') +def test_get_obj_tagging(): + bucket, key = _create_key_with_random_content('testputtags') + input_tagset = _create_simple_tagset(2) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset, res_tagset) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test HEAD obj tagging output') +@attr(assertion='success') +@attr('tagging') +def test_get_obj_head_tagging(): + bucket, key = _create_key_with_random_content('testputtags') + count = 2 + input_tagset = _create_simple_tagset(count) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + + res = _make_request('HEAD',bucket, key, authenticated=True) + eq(res.status, 200) + eq(int(res.getheader('x-amz-tagging-count')), count) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test Put max allowed tags') +@attr(assertion='success') +@attr('tagging') +def test_put_max_tags(): + bucket, key = _create_key_with_random_content('testputmaxtags') + input_tagset = _create_simple_tagset(10) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset, res_tagset) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test Put max allowed tags') +@attr(assertion='fails') +@attr('tagging') +def test_put_excess_tags(): + bucket, key = _create_key_with_random_content('testputexcesstags') + input_tagset = _create_simple_tagset(11) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 400) + eq(parse_s3_errorcode(res.read()), 'InvalidTag') + + # Now assert that no tags have been put + res_tagset = _get_obj_tags(bucket, key.name) + eq(len(res_tagset), 0) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test Put max allowed k-v size') +@attr(assertion='success') +@attr('tagging') +def test_put_max_kvsize_tags(): + bucket, key = _create_key_with_random_content('testputmaxkeysize') + input_tagset = S3TestTagSet() + for i in range(10): + k = _make_random_string(128) + v = _make_random_string(256) + input_tagset.add_tag(k, v) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset, res_tagset) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test exceed key size') +@attr(assertion='success') +@attr('tagging') +def test_put_excess_key_tags(): + bucket, key = _create_key_with_random_content('testputexcesskeytags') + input_tagset = S3TestTagSet() + for i in range(10): + k = _make_random_string(129) + v = _make_random_string(256) + input_tagset.add_tag(k, v) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 400) + eq(parse_s3_errorcode(res.read()), 'InvalidTag') + + # Now assert that no tags have been put + res_tagset = _get_obj_tags(bucket, key.name) + eq(len(res_tagset), 0) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test exceed val size') +@attr(assertion='success') +@attr('tagging') +def test_put_excess_val_tags(): + bucket, key = _create_key_with_random_content('testputexcessvaltags') + input_tagset = S3TestTagSet() + for i in range(10): + k = _make_random_string(128) + v = _make_random_string(257) + input_tagset.add_tag(k, v) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 400) + eq(parse_s3_errorcode(res.read()), 'InvalidTag') + + # Now assert that no tags have been put + res_tagset = _get_obj_tags(bucket, key.name) + eq(len(res_tagset), 0) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test PUT modifies existing tags') +@attr(assertion='success') +@attr('tagging') +def test_put_modify_tags(): + bucket, key = _create_key_with_random_content('testputmodifytags') + input_tagset = S3TestTagSet() + input_tagset.add_tag('key','val') + input_tagset.add_tag('key2','val2') + + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset, res_tagset) + + input2_tagset = S3TestTagSet() + input2_tagset.add_tag('key3','val3') + + res = _put_obj_tags(bucket, key.name, input2_tagset.to_xml()) + eq(res.status, 200) + res2_tagset = _get_obj_tags(bucket, key.name) + eq(input2_tagset, res2_tagset) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test Delete tags') +@attr(assertion='success') +@attr('tagging') +def test_put_delete_tags(): + bucket, key = _create_key_with_random_content('testputmodifytags') + input_tagset = _create_simple_tagset(2) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset, res_tagset) + + input2_tagset = S3TestTagSet() + input2_tagset.add_tag('key3','val3') + + res = _delete_obj_tags(bucket, key.name) + eq(res.status, 204) + + # TODO do a test to verify that we've *only* removed the xattr relating to + # tagging + res2_tagset = _get_obj_tags(bucket, key.name) + eq(len(res2_tagset), 0) + +@attr(resource='object') +@attr(method='post') +@attr(operation='anonymous browser based upload via POST request') +@attr('tagging') +@attr(assertion='succeeds and returns written data') +def test_post_object_tags_anonymous_request(): + bucket = get_new_bucket() + url = _get_post_url(s3.main, bucket) + bucket.set_acl('public-read-write') + input_tagset = _create_simple_tagset(2) + key_name = "foo.txt" + payload = OrderedDict([ + ("key" , key_name), + ("acl" , "public-read"), + ("Content-Type" , "text/plain"), + ("tagging", input_tagset.to_xml()), + ('file', ('bar')), + ]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string() + eq(got, 'bar') + + res_tagset = _get_obj_tags(bucket, key_name) + eq(input_tagset, res_tagset) + +@attr(resource='object') +@attr(method='post') +@attr(operation='authenticated browser based upload via POST request') +@attr('tagging') +@attr(assertion='succeeds and returns written data') +def test_post_object_tags_authenticated_request(): + bucket = get_new_bucket() + + url = _get_post_url(s3.main, bucket) + + utc = pytz.utc + expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000) + + policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\ + "conditions": [ + {"bucket": bucket.name}, + ["starts-with", "$key", "foo"], + {"acl": "private"}, + ["starts-with", "$Content-Type", "text/plain"], + ["content-length-range", 0, 1024], + ["starts-with", "$tagging", ""] + ]} + input_tagset = _create_simple_tagset(2) + + json_policy_document = json.JSONEncoder().encode(policy_document) + policy = base64.b64encode(json_policy_document) + conn = s3.main + signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest()) + + payload = OrderedDict([ + ("key" , "foo.txt"), + ("AWSAccessKeyId" , conn.aws_access_key_id), + ("acl" , "private"),("signature" , signature),("policy" , policy), + ('tagging',input_tagset.to_xml()), + ("Content-Type" , "text/plain"), + ('file', ('bar'))]) + + r = requests.post(url, files = payload) + eq(r.status_code, 204) + key = bucket.get_key("foo.txt") + got = key.get_contents_as_string() + eq(got, 'bar') + +@attr(resource='object') +@attr(method='put') +@attr(operation='Test PutObj with tagging headers') +@attr(assertion='success') +@attr('tagging') +def test_put_obj_with_tags(): + input_tagset = S3TestTagSet() + input_tagset.add_tag('foo','bar') + input_tagset.add_tag('bar', '') + put_obj_tag_headers = { + 'x-amz-tagging' : str(input_tagset) + } + bucket = get_new_bucket() + key = bucket.new_key('testtagobj1') + data = 'A'*100 + key.set_contents_from_string(data, headers=put_obj_tag_headers) + result = key.get_contents_as_string() + eq(data,result) + + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset.to_dict(), res_tagset.to_dict()) + +def _make_arn_resource(path="*"): + return "arn:aws:s3:::{}".format(path) + +def make_json_policy(action, resource, principal={"AWS": "*"}): + return json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": principal, + "Action": action, + "Resource": [ + resource + ] + }] + }) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test GetObjTagging public read') +@attr(assertion='success') +@attr('tagging') +def test_get_tags_acl_public(): + bucket, key = _create_key_with_random_content('testputtagsacl') + + resource = _make_arn_resource("{}/{}".format(bucket.name, key.name)) + policy_document = make_json_policy("s3:GetObjectTagging", + resource) + + bucket.set_policy(policy_document) + input_tagset = _create_simple_tagset(10) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + new_conn = _get_alt_connection() + res_tagset = _get_obj_tags_conn(new_conn, bucket.name, key.name) + eq(input_tagset, res_tagset) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test PutObjTagging public wrote') +@attr(assertion='success') +@attr('tagging') +def test_put_tags_acl_public(): + bucket, key = _create_key_with_random_content('testputtagsacl') + + resource = _make_arn_resource("{}/{}".format(bucket.name, key.name)) + #principal = {"AWS": "s3test2"} This needs a tenanted user? + policy_document = make_json_policy("s3:PutObjectTagging", + resource) + bucket.set_policy(policy_document) + new_conn = _get_alt_connection() + input_tagset = _create_simple_tagset(10) + res = _put_obj_tags_conn(new_conn, bucket.name, key.name, input_tagset.to_xml()) + eq(res.status, 200) + res_tagset = _get_obj_tags(bucket, key.name) + eq(input_tagset, res_tagset) + +@attr(resource='object') +@attr(method='get') +@attr(operation='Test DeleteObjTagging public') +@attr(assertion='success') +@attr('tagging') +def test_delete_tags_obj_public(): + bucket, key = _create_key_with_random_content('testputtagsacl') + + resource = _make_arn_resource("{}/{}".format(bucket.name, key.name)) + policy_document = make_json_policy("s3:DeleteObjectTagging", + resource) + + bucket.set_policy(policy_document) + input_tagset = _create_simple_tagset(10) + res = _put_obj_tags(bucket, key.name, input_tagset.to_xml()) + eq(res.status, 200) + new_conn = _get_alt_connection() + res = new_conn.make_request("DELETE",bucket.name, key.name, query_args='tagging') + eq(res.status, 204) + tags = _get_obj_tags(bucket, key.name) + eq(len(tags),0) + #eq(input_tagset, res_tagset)