mirror of
https://github.com/ceph/s3-tests.git
synced 2025-04-09 23:07:14 +00:00
Merge pull request #159 from theanalyst/wip-tagging
add tests for tagging functionality
This commit is contained in:
commit
1f9f9d461f
1 changed files with 437 additions and 0 deletions
|
@ -35,6 +35,7 @@ from urlparse import urlparse
|
||||||
from nose.tools import eq_ as eq
|
from nose.tools import eq_ as eq
|
||||||
from nose.plugins.attrib import attr
|
from nose.plugins.attrib import attr
|
||||||
from nose.plugins.skip import SkipTest
|
from nose.plugins.skip import SkipTest
|
||||||
|
from boto.s3.tagging import TagSet
|
||||||
|
|
||||||
from .utils import assert_raises
|
from .utils import assert_raises
|
||||||
from .utils import generate_random
|
from .utils import generate_random
|
||||||
|
@ -45,6 +46,7 @@ from email.header import decode_header
|
||||||
from ordereddict import OrderedDict
|
from ordereddict import OrderedDict
|
||||||
|
|
||||||
from boto.s3.cors import CORSConfiguration
|
from boto.s3.cors import CORSConfiguration
|
||||||
|
from urllib import quote_plus
|
||||||
|
|
||||||
from . import (
|
from . import (
|
||||||
nuke_prefixed_buckets,
|
nuke_prefixed_buckets,
|
||||||
|
@ -101,6 +103,14 @@ def tag(*tags):
|
||||||
return func
|
return func
|
||||||
return wrap
|
return wrap
|
||||||
|
|
||||||
|
def parse_s3_errorcode(error_xml):
|
||||||
|
"""
|
||||||
|
Given an S3 error response return the Error Code message.
|
||||||
|
Useful for apis not fully implemented in boto
|
||||||
|
"""
|
||||||
|
return ET.fromstring(error_xml).find('./Code').text
|
||||||
|
|
||||||
|
|
||||||
@attr(resource='bucket')
|
@attr(resource='bucket')
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
@attr(operation='list')
|
@attr(operation='list')
|
||||||
|
@ -148,6 +158,15 @@ def _get_keys_prefixes(li):
|
||||||
prefixes = [x for x in li if not isinstance(x, boto.s3.key.Key)]
|
prefixes = [x for x in li if not isinstance(x, boto.s3.key.Key)]
|
||||||
return (keys, prefixes)
|
return (keys, prefixes)
|
||||||
|
|
||||||
|
def _get_alt_connection():
|
||||||
|
return boto.s3.connection.S3Connection(
|
||||||
|
aws_access_key_id=s3['alt'].aws_access_key_id,
|
||||||
|
aws_secret_access_key=s3['alt'].aws_secret_access_key,
|
||||||
|
is_secure=s3['alt'].is_secure,
|
||||||
|
port=s3['alt'].port,
|
||||||
|
host=s3['alt'].host,
|
||||||
|
calling_format=s3['alt'].calling_format,
|
||||||
|
)
|
||||||
|
|
||||||
@attr(resource='bucket')
|
@attr(resource='bucket')
|
||||||
@attr(method='get')
|
@attr(method='get')
|
||||||
|
@ -8768,3 +8787,421 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
|
||||||
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
|
res = _make_request('GET', bucket.name, bucket.get_key("foo"),
|
||||||
request_headers={'referer': 'http://example.com'})
|
request_headers={'referer': 'http://example.com'})
|
||||||
eq(res.status, 403)
|
eq(res.status, 403)
|
||||||
|
|
||||||
|
|
||||||
|
def _tags_from_dict(d):
|
||||||
|
tag_list = []
|
||||||
|
for k,v in d.items():
|
||||||
|
tag_list.append({
|
||||||
|
'Key' : k,
|
||||||
|
'Value': v if v is not None else ''
|
||||||
|
})
|
||||||
|
|
||||||
|
return tag_list
|
||||||
|
|
||||||
|
class S3TestTagSet(TagSet):
|
||||||
|
'''
|
||||||
|
version of TagSet that supports comparision, so that we can compare tagsets
|
||||||
|
'''
|
||||||
|
def to_dict(self):
|
||||||
|
d = dict()
|
||||||
|
for tag in self:
|
||||||
|
d[tag.key] = tag.value
|
||||||
|
return d
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return self.to_dict() == other.to_dict()
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
s = ''
|
||||||
|
for tag in self:
|
||||||
|
if s:
|
||||||
|
s += '&'
|
||||||
|
s += quote_plus(tag.key)
|
||||||
|
v = tag.value
|
||||||
|
if v is not None and v != '':
|
||||||
|
s += '=' + quote_plus(v)
|
||||||
|
return s
|
||||||
|
|
||||||
|
def to_xml(self):
|
||||||
|
xml = '<Tagging>'
|
||||||
|
xml += super(S3TestTagSet,self).to_xml()
|
||||||
|
xml += '</Tagging>'
|
||||||
|
return xml
|
||||||
|
|
||||||
|
def _parse_tagging_xml(tags_xml):
|
||||||
|
# Apparently ETree doesn't understand namespaces well, so let's define it
|
||||||
|
ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
|
||||||
|
tags_list = ET.fromstring(tags_xml).findall('./aws:TagSet/aws:Tag', ns)
|
||||||
|
tagset = S3TestTagSet()
|
||||||
|
|
||||||
|
for it in tags_list:
|
||||||
|
# unfortunately etree returns None when string is empty
|
||||||
|
tagset.add_tag(it.find('aws:Key', ns).text,it.find('aws:Value', ns).text or '')
|
||||||
|
return tagset
|
||||||
|
|
||||||
|
def _make_random_string(size):
|
||||||
|
return ''.join(random.choice(string.ascii_letters) for _ in range(size))
|
||||||
|
|
||||||
|
def _get_obj_tags_conn(conn, bucket_name, key_name):
|
||||||
|
res = conn.make_request('GET',bucket_name, key_name, query_args='tagging')
|
||||||
|
eq(res.status, 200)
|
||||||
|
return _parse_tagging_xml(res.read())
|
||||||
|
|
||||||
|
def _get_obj_tags(bucket, key_name):
|
||||||
|
# our _make_request doesn't sign query args, let's piggy back on boto
|
||||||
|
return _get_obj_tags_conn(bucket.connection, bucket.name, key_name)
|
||||||
|
|
||||||
|
def _put_obj_tags_conn(conn, bucket_name, key_name, tag_str):
|
||||||
|
return conn.make_request('PUT',bucket_name, key_name, query_args='tagging', data=tag_str)
|
||||||
|
|
||||||
|
def _put_obj_tags(bucket, key_name, tag_str):
|
||||||
|
return _put_obj_tags_conn(bucket.connection, bucket.name, key_name, tag_str)
|
||||||
|
|
||||||
|
def _delete_obj_tags(bucket, key_name):
|
||||||
|
return bucket.connection.make_request('DELETE', bucket.name, key_name, query_args='tagging')
|
||||||
|
|
||||||
|
def _create_simple_tagset(count):
|
||||||
|
tagset = S3TestTagSet()
|
||||||
|
for i in range(count):
|
||||||
|
tagset.add_tag('key'+str(i),'val'+str(i))
|
||||||
|
|
||||||
|
return tagset
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test Get/PutObjTagging output')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_get_obj_tagging():
|
||||||
|
bucket, key = _create_key_with_random_content('testputtags')
|
||||||
|
input_tagset = _create_simple_tagset(2)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test HEAD obj tagging output')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_get_obj_head_tagging():
|
||||||
|
bucket, key = _create_key_with_random_content('testputtags')
|
||||||
|
count = 2
|
||||||
|
input_tagset = _create_simple_tagset(count)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
|
||||||
|
res = _make_request('HEAD',bucket, key, authenticated=True)
|
||||||
|
eq(res.status, 200)
|
||||||
|
eq(int(res.getheader('x-amz-tagging-count')), count)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test Put max allowed tags')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_max_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputmaxtags')
|
||||||
|
input_tagset = _create_simple_tagset(10)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test Put max allowed tags')
|
||||||
|
@attr(assertion='fails')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_excess_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputexcesstags')
|
||||||
|
input_tagset = _create_simple_tagset(11)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 400)
|
||||||
|
eq(parse_s3_errorcode(res.read()), 'InvalidTag')
|
||||||
|
|
||||||
|
# Now assert that no tags have been put
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(len(res_tagset), 0)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test Put max allowed k-v size')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_max_kvsize_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputmaxkeysize')
|
||||||
|
input_tagset = S3TestTagSet()
|
||||||
|
for i in range(10):
|
||||||
|
k = _make_random_string(128)
|
||||||
|
v = _make_random_string(256)
|
||||||
|
input_tagset.add_tag(k, v)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test exceed key size')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_excess_key_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputexcesskeytags')
|
||||||
|
input_tagset = S3TestTagSet()
|
||||||
|
for i in range(10):
|
||||||
|
k = _make_random_string(129)
|
||||||
|
v = _make_random_string(256)
|
||||||
|
input_tagset.add_tag(k, v)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 400)
|
||||||
|
eq(parse_s3_errorcode(res.read()), 'InvalidTag')
|
||||||
|
|
||||||
|
# Now assert that no tags have been put
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(len(res_tagset), 0)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test exceed val size')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_excess_val_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputexcessvaltags')
|
||||||
|
input_tagset = S3TestTagSet()
|
||||||
|
for i in range(10):
|
||||||
|
k = _make_random_string(128)
|
||||||
|
v = _make_random_string(257)
|
||||||
|
input_tagset.add_tag(k, v)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 400)
|
||||||
|
eq(parse_s3_errorcode(res.read()), 'InvalidTag')
|
||||||
|
|
||||||
|
# Now assert that no tags have been put
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(len(res_tagset), 0)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test PUT modifies existing tags')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_modify_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputmodifytags')
|
||||||
|
input_tagset = S3TestTagSet()
|
||||||
|
input_tagset.add_tag('key','val')
|
||||||
|
input_tagset.add_tag('key2','val2')
|
||||||
|
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
input2_tagset = S3TestTagSet()
|
||||||
|
input2_tagset.add_tag('key3','val3')
|
||||||
|
|
||||||
|
res = _put_obj_tags(bucket, key.name, input2_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
res2_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input2_tagset, res2_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test Delete tags')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_delete_tags():
|
||||||
|
bucket, key = _create_key_with_random_content('testputmodifytags')
|
||||||
|
input_tagset = _create_simple_tagset(2)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
input2_tagset = S3TestTagSet()
|
||||||
|
input2_tagset.add_tag('key3','val3')
|
||||||
|
|
||||||
|
res = _delete_obj_tags(bucket, key.name)
|
||||||
|
eq(res.status, 204)
|
||||||
|
|
||||||
|
# TODO do a test to verify that we've *only* removed the xattr relating to
|
||||||
|
# tagging
|
||||||
|
res2_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(len(res2_tagset), 0)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='post')
|
||||||
|
@attr(operation='anonymous browser based upload via POST request')
|
||||||
|
@attr('tagging')
|
||||||
|
@attr(assertion='succeeds and returns written data')
|
||||||
|
def test_post_object_tags_anonymous_request():
|
||||||
|
bucket = get_new_bucket()
|
||||||
|
url = _get_post_url(s3.main, bucket)
|
||||||
|
bucket.set_acl('public-read-write')
|
||||||
|
input_tagset = _create_simple_tagset(2)
|
||||||
|
key_name = "foo.txt"
|
||||||
|
payload = OrderedDict([
|
||||||
|
("key" , key_name),
|
||||||
|
("acl" , "public-read"),
|
||||||
|
("Content-Type" , "text/plain"),
|
||||||
|
("tagging", input_tagset.to_xml()),
|
||||||
|
('file', ('bar')),
|
||||||
|
])
|
||||||
|
|
||||||
|
r = requests.post(url, files = payload)
|
||||||
|
eq(r.status_code, 204)
|
||||||
|
key = bucket.get_key("foo.txt")
|
||||||
|
got = key.get_contents_as_string()
|
||||||
|
eq(got, 'bar')
|
||||||
|
|
||||||
|
res_tagset = _get_obj_tags(bucket, key_name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='post')
|
||||||
|
@attr(operation='authenticated browser based upload via POST request')
|
||||||
|
@attr('tagging')
|
||||||
|
@attr(assertion='succeeds and returns written data')
|
||||||
|
def test_post_object_tags_authenticated_request():
|
||||||
|
bucket = get_new_bucket()
|
||||||
|
|
||||||
|
url = _get_post_url(s3.main, bucket)
|
||||||
|
|
||||||
|
utc = pytz.utc
|
||||||
|
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
|
||||||
|
|
||||||
|
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
|
||||||
|
"conditions": [
|
||||||
|
{"bucket": bucket.name},
|
||||||
|
["starts-with", "$key", "foo"],
|
||||||
|
{"acl": "private"},
|
||||||
|
["starts-with", "$Content-Type", "text/plain"],
|
||||||
|
["content-length-range", 0, 1024],
|
||||||
|
["starts-with", "$tagging", ""]
|
||||||
|
]}
|
||||||
|
input_tagset = _create_simple_tagset(2)
|
||||||
|
|
||||||
|
json_policy_document = json.JSONEncoder().encode(policy_document)
|
||||||
|
policy = base64.b64encode(json_policy_document)
|
||||||
|
conn = s3.main
|
||||||
|
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
|
||||||
|
|
||||||
|
payload = OrderedDict([
|
||||||
|
("key" , "foo.txt"),
|
||||||
|
("AWSAccessKeyId" , conn.aws_access_key_id),
|
||||||
|
("acl" , "private"),("signature" , signature),("policy" , policy),
|
||||||
|
('tagging',input_tagset.to_xml()),
|
||||||
|
("Content-Type" , "text/plain"),
|
||||||
|
('file', ('bar'))])
|
||||||
|
|
||||||
|
r = requests.post(url, files = payload)
|
||||||
|
eq(r.status_code, 204)
|
||||||
|
key = bucket.get_key("foo.txt")
|
||||||
|
got = key.get_contents_as_string()
|
||||||
|
eq(got, 'bar')
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='put')
|
||||||
|
@attr(operation='Test PutObj with tagging headers')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_obj_with_tags():
|
||||||
|
input_tagset = S3TestTagSet()
|
||||||
|
input_tagset.add_tag('foo','bar')
|
||||||
|
input_tagset.add_tag('bar', '')
|
||||||
|
put_obj_tag_headers = {
|
||||||
|
'x-amz-tagging' : str(input_tagset)
|
||||||
|
}
|
||||||
|
bucket = get_new_bucket()
|
||||||
|
key = bucket.new_key('testtagobj1')
|
||||||
|
data = 'A'*100
|
||||||
|
key.set_contents_from_string(data, headers=put_obj_tag_headers)
|
||||||
|
result = key.get_contents_as_string()
|
||||||
|
eq(data,result)
|
||||||
|
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset.to_dict(), res_tagset.to_dict())
|
||||||
|
|
||||||
|
def _make_arn_resource(path="*"):
|
||||||
|
return "arn:aws:s3:::{}".format(path)
|
||||||
|
|
||||||
|
def make_json_policy(action, resource, principal={"AWS": "*"}):
|
||||||
|
return json.dumps(
|
||||||
|
{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Principal": principal,
|
||||||
|
"Action": action,
|
||||||
|
"Resource": [
|
||||||
|
resource
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test GetObjTagging public read')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_get_tags_acl_public():
|
||||||
|
bucket, key = _create_key_with_random_content('testputtagsacl')
|
||||||
|
|
||||||
|
resource = _make_arn_resource("{}/{}".format(bucket.name, key.name))
|
||||||
|
policy_document = make_json_policy("s3:GetObjectTagging",
|
||||||
|
resource)
|
||||||
|
|
||||||
|
bucket.set_policy(policy_document)
|
||||||
|
input_tagset = _create_simple_tagset(10)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
new_conn = _get_alt_connection()
|
||||||
|
res_tagset = _get_obj_tags_conn(new_conn, bucket.name, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test PutObjTagging public wrote')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_put_tags_acl_public():
|
||||||
|
bucket, key = _create_key_with_random_content('testputtagsacl')
|
||||||
|
|
||||||
|
resource = _make_arn_resource("{}/{}".format(bucket.name, key.name))
|
||||||
|
#principal = {"AWS": "s3test2"} This needs a tenanted user?
|
||||||
|
policy_document = make_json_policy("s3:PutObjectTagging",
|
||||||
|
resource)
|
||||||
|
bucket.set_policy(policy_document)
|
||||||
|
new_conn = _get_alt_connection()
|
||||||
|
input_tagset = _create_simple_tagset(10)
|
||||||
|
res = _put_obj_tags_conn(new_conn, bucket.name, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
res_tagset = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(input_tagset, res_tagset)
|
||||||
|
|
||||||
|
@attr(resource='object')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='Test DeleteObjTagging public')
|
||||||
|
@attr(assertion='success')
|
||||||
|
@attr('tagging')
|
||||||
|
def test_delete_tags_obj_public():
|
||||||
|
bucket, key = _create_key_with_random_content('testputtagsacl')
|
||||||
|
|
||||||
|
resource = _make_arn_resource("{}/{}".format(bucket.name, key.name))
|
||||||
|
policy_document = make_json_policy("s3:DeleteObjectTagging",
|
||||||
|
resource)
|
||||||
|
|
||||||
|
bucket.set_policy(policy_document)
|
||||||
|
input_tagset = _create_simple_tagset(10)
|
||||||
|
res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
|
||||||
|
eq(res.status, 200)
|
||||||
|
new_conn = _get_alt_connection()
|
||||||
|
res = new_conn.make_request("DELETE",bucket.name, key.name, query_args='tagging')
|
||||||
|
eq(res.status, 204)
|
||||||
|
tags = _get_obj_tags(bucket, key.name)
|
||||||
|
eq(len(tags),0)
|
||||||
|
#eq(input_tagset, res_tagset)
|
||||||
|
|
Loading…
Add table
Reference in a new issue