# s3-tests/s3tests/common.py

import boto.s3.connection
import bunch
import itertools
import os
import random
import string
import yaml
from lxml import etree
# Open S3 connections keyed by config section name; populated by setup().
s3 = bunch.Bunch()
# Parsed test configuration; cleared and refilled by setup().
config = bunch.Bunch()
# Name prefix shared by all test buckets; chosen by setup().
prefix = ''
# Source of the numeric suffix for bucket names (see get_new_bucket()).
bucket_counter = itertools.count(1)
# Counter starting at 1; not referenced in the visible part of this file.
key_counter = itertools.count(1)
def choose_bucket_prefix(template, max_len=30):
    """
    Choose a prefix for our test buckets, so they're easy to identify.

    Use template and feed it more and more random filler, until it's
    as long as possible but still no longer than max_len.

    :param template: ``str.format`` template; its ``random`` field
        receives the random filler.
    :param max_len: maximum allowed length of the formatted prefix.
    :returns: the longest formatted string that fits within max_len.
    :raises RuntimeError: if the template does not fit even with a
        single character of filler.
    """
    alphabet = string.ascii_lowercase + string.digits
    rand = ''.join(random.choice(alphabet) for _ in range(255))

    # Shrink the filler one character at a time, so the first fit is
    # the longest one. The template is never formatted with an empty
    # filler: running out of filler means failure.
    while rand:
        s = template.format(random=rand)
        if len(s) <= max_len:
            return s
        rand = rand[:-1]

    raise RuntimeError(
        'Bucket prefix template is impossible to fulfill: {template!r}'.format(
            template=template,
        ),
    )
def nuke_bucket(bucket):
    """
    Delete every key in ``bucket`` and then the bucket itself.

    The bucket and each key are first switched to the 'private' canned
    ACL. An ``S3ResponseError`` whose error code is 'AccessDenied' is
    swallowed (we are apparently not the owner); any other
    ``S3ResponseError`` is reported and re-raised.

    :param bucket: bucket object providing ``set_canned_acl``, ``list``
        and ``delete``.
    """
    try:
        bucket.set_canned_acl('private')
        # TODO: deleted_cnt and the while loop is a work around for rgw
        # not sending the
        # NOTE(review): the TODO above is cut off in the original source;
        # presumably it refers to a "listing truncated" marker -- confirm.
        # Keep re-listing until a pass deletes nothing.
        deleted_cnt = 1
        while deleted_cnt:
            deleted_cnt = 0
            for key in bucket.list():
                print('Cleaning bucket {bucket} key {key}'.format(
                    bucket=bucket,
                    key=key,
                ))
                key.set_canned_acl('private')
                key.delete()
                deleted_cnt += 1
        bucket.delete()
    except boto.exception.S3ResponseError as e:
        # TODO workaround for buggy rgw that fails to send
        # error_code, remove
        if (e.status == 403
                and e.error_code is None
                and e.body == ''):
            e.error_code = 'AccessDenied'
        if e.error_code != 'AccessDenied':
            print('GOT UNWANTED ERROR {0}'.format(e.error_code))
            raise
        # seems like we're not the owner of the bucket; ignore
def nuke_prefixed_buckets():
    """
    Delete, on every configured connection, all buckets whose name
    starts with the module-level ``prefix``.

    Iterates the connections stored in ``s3`` and hands each matching
    bucket to ``nuke_bucket``. Note that an empty ``prefix`` would match
    every bucket name; ``setup`` refuses to continue with an empty one.
    """
    for name, conn in s3.items():
        print('Cleaning buckets from connection {name}'.format(name=name))
        for bucket in conn.get_all_buckets():
            if bucket.name.startswith(prefix):
                print('Cleaning bucket {bucket}'.format(bucket=bucket))
                nuke_bucket(bucket)

    print('Done with cleanup of test buckets.')
def read_config(fp):
    """
    Load a (possibly multi-document) YAML stream into a single Bunch.

    Documents are merged in order with ``update``, so top-level keys in
    later documents replace those from earlier ones.

    :param fp: YAML source accepted by ``yaml.safe_load_all``.
    :returns: a ``bunch.Bunch`` holding the merged configuration.
    """
    merged = bunch.Bunch()
    for document in yaml.safe_load_all(fp):
        # An empty document (e.g. after a trailing '---') loads as None,
        # which update() would reject with a TypeError; skip it.
        if document is None:
            continue
        merged.update(bunch.bunchify(document))
    return merged
def connect(conf):
    """
    Build a boto S3 connection from one config section.

    :param conf: dict of settings. The keys ``port``, ``host``,
        ``is_secure``, ``access_key`` and ``secret_key`` are translated
        to ``S3Connection`` keyword arguments; other keys are ignored,
        except ``calling_format`` which selects one of 'ordinary'
        (the default), 'subdomain' or 'vhost'.
    :returns: a ``boto.s3.connection.S3Connection``.
    :raises RuntimeError: if ``calling_format`` names an unknown format.
    """
    # config key -> S3Connection keyword argument
    mapping = dict(
        port='port',
        host='host',
        is_secure='is_secure',
        access_key='aws_access_key_id',
        secret_key='aws_secret_access_key',
    )
    kwargs = dict(
        (mapping[k], v) for (k, v) in conf.items() if k in mapping
    )

    # process calling_format argument
    calling_formats = dict(
        ordinary=boto.s3.connection.OrdinaryCallingFormat(),
        subdomain=boto.s3.connection.SubdomainCallingFormat(),
        vhost=boto.s3.connection.VHostCallingFormat(),
    )
    kwargs['calling_format'] = calling_formats['ordinary']
    if 'calling_format' in conf:
        raw_calling_format = conf['calling_format']
        try:
            kwargs['calling_format'] = calling_formats[raw_calling_format]
        except KeyError:
            # Tuple-wrap the argument so %-formatting is safe for any value.
            raise RuntimeError(
                'calling_format unknown: %r' % (raw_calling_format,)
            )
    # TODO test vhost calling format
    conn = boto.s3.connection.S3Connection(**kwargs)
    return conn
def setup():
    """
    Load the test configuration, open all connections and clean up.

    Reads the YAML file named by the ``S3TEST_CONF`` environment
    variable into ``config``, chooses the bucket ``prefix``, opens one
    connection per ``s3`` section (each layered on top of
    ``s3.defaults``) into ``s3``, and finally deletes every bucket that
    already carries the chosen prefix.

    :raises RuntimeError: if ``S3TEST_CONF`` is unset, a required config
        section is missing, or the chosen prefix is empty.
    """
    # s3 and config are only mutated in place; prefix is rebound.
    global s3, config, prefix
    s3.clear()
    config.clear()

    try:
        path = os.environ['S3TEST_CONF']
    except KeyError:
        raise RuntimeError(
            'To run tests, point environment '
            + 'variable S3TEST_CONF to a config file.',
        )
    with open(path) as f:
        config.update(read_config(f))

    # These 3 should always be present.
    if 's3' not in config:
        raise RuntimeError('Your config file is missing the s3 section!')
    if 'defaults' not in config.s3:
        raise RuntimeError('Your config file is missing the s3.defaults section!')
    if 'fixtures' not in config:
        raise RuntimeError('Your config file is missing the fixtures section!')

    template = config.fixtures.get('bucket prefix', 'test-{random}-')
    prefix = choose_bucket_prefix(template=template)
    # An empty prefix would match every bucket during cleanup.
    if prefix == '':
        raise RuntimeError("Empty Prefix! Aborting!")

    defaults = config.s3.defaults
    for section in config.s3.keys():
        if section == 'defaults':
            continue

        # Section-specific settings override the shared defaults.
        conf = {}
        conf.update(defaults)
        conf.update(config.s3[section])
        conn = connect(conf)
        s3[section] = conn

    # WARNING! we actively delete all buckets we see with the prefix
    # we've chosen! Choose your prefix with care, and don't reuse
    # credentials!

    # We also assume nobody else is going to use buckets with that
    # prefix. This is racy but given enough randomness, should not
    # really fail.
    nuke_prefixed_buckets()
def get_new_bucket(connection=None):
    """
    Create and return a fresh, empty test bucket.

    Each call creates a brand-new bucket named from the shared prefix
    plus the next counter value, so ACLs and other bucket state always
    start from scratch.

    :param connection: connection to create the bucket on; when None,
        ``s3.main`` is used.
    :returns: the result of ``connection.create_bucket``.
    """
    target = s3.main if connection is None else connection
    bucket_name = '{prefix}{num}'.format(
        num=next(bucket_counter),
        prefix=prefix,
    )
    # A pre-existing bucket of this name is only possible if someone
    # raced us between setup's nuke_prefixed_buckets and this call;
    # that is treated as astronomically unlikely and ignored.
    return target.create_bucket(bucket_name)
def teardown():
    """Clean up after a test run by calling nuke_prefixed_buckets()."""
    nuke_prefixed_buckets()
def with_setup_kwargs(setup, teardown=None):
    """Decorator to add setup and/or teardown methods to a test function::

      @with_setup_kwargs(setup, teardown)
      def test_something():
          " ... "

    The setup function should return (kwargs) which will be passed to
    test function, and teardown function. The test function additionally
    receives ``testname``, set to its own ``__name__``.

    If the decorated function already has ``setup`` / ``teardown``
    attributes they are still honoured: its ``setup`` runs after the
    given one, its ``teardown`` runs before the given one (or alone when
    no teardown is given).

    Note that `with_setup_kwargs` is useful *only* for test functions, not for test
    methods or inside of TestCase subclasses.
    """
    def decorate(func):
        # Shared between setup, the test call and teardown.
        kwargs = {}

        def test_wrapped(*args, **kwargs2):
            # Explicit keyword arguments win over the ones from setup.
            k2 = kwargs.copy()
            k2.update(kwargs2)
            k2['testname'] = func.__name__
            func(*args, **k2)

        test_wrapped.__name__ = func.__name__

        def setup_wrapped():
            k = setup()
            # Tolerate a setup function that returns nothing.
            if k is not None:
                kwargs.update(k)
            if hasattr(func, 'setup'):
                func.setup()
        test_wrapped.setup = setup_wrapped

        if teardown:
            def teardown_wrapped():
                if hasattr(func, 'teardown'):
                    func.teardown()
                teardown(**kwargs)

            test_wrapped.teardown = teardown_wrapped
        else:
            if hasattr(func, 'teardown'):
                # Attach the callable itself; calling it here would run
                # the teardown at decoration time and store its result.
                test_wrapped.teardown = func.teardown
        return test_wrapped
    return decorate
# Demo case for the above, when you run test_gen():
# _test_gen will run twice,
# with the following stderr printing
# setup_func {'b': 2}
# testcase ('1',) {'b': 2, 'testname': '_test_gen'}
# teardown_func {'b': 2}
# setup_func {'b': 2}
# testcase () {'b': 2, 'testname': '_test_gen'}
# teardown_func {'b': 2}
#
#def setup_func():
# kwargs = {'b': 2}
# print("setup_func", kwargs, file=sys.stderr)
# return kwargs
#
#def teardown_func(**kwargs):
# print("teardown_func", kwargs, file=sys.stderr)
#
#@with_setup_kwargs(setup=setup_func, teardown=teardown_func)
#def _test_gen(*args, **kwargs):
# print("testcase", args, kwargs, file=sys.stderr)
#
#def test_gen():
# yield _test_gen, '1'
# yield _test_gen
def normalize_xml_whitespace(xml, pretty_print=True):
    """
    Re-serialize an XML string with insignificant whitespace removed.

    For every element, whitespace-only ``text`` / ``tail`` is dropped;
    any other ``text`` / ``tail`` is stripped and has its newline and
    carriage-return characters removed.

    :param xml: XML document as a string; it is encoded as ASCII before
        parsing.
    :param pretty_print: passed through to ``etree.tostring``.
    :returns: the document serialized as UTF-8 with an XML declaration.
    """
    def _tidy(value):
        # Whitespace-only content becomes None; anything else is
        # stripped and loses embedded line breaks.
        stripped = value.strip()
        if not stripped:
            return None
        return stripped.replace("\n", "").replace("\r", "")

    tree_root = etree.fromstring(xml.encode(encoding='ascii'))

    for node in tree_root.iter('*'):
        # text first, then tail; absent (None) content is left untouched.
        for attr in ('text', 'tail'):
            current = getattr(node, attr)
            if current is not None:
                setattr(node, attr, _tidy(current))

    return etree.tostring(
        tree_root,
        encoding="utf-8",
        xml_declaration=True,
        pretty_print=pretty_print,
    )