mirror of
https://github.com/ceph/s3-tests.git
synced 2025-05-13 05:18:16 +00:00
boto3: Foundation laid for boto3 tests
Added sample config file for boto3 and vstart.sh Modified setup.py, requirements.txt, and README Added an rgw_interactive.py to use interactively with vstart.sh and python -i Ported 400+ tests over to boto3 from functional/test_s3.py Signed-off-by: Ali Maredia <amaredia@redhat.com>
This commit is contained in:
parent
fa979f416d
commit
67f4f5d356
25 changed files with 13990 additions and 150 deletions
301
s3tests_boto3/common.py
Normal file
301
s3tests_boto3/common.py
Normal file
|
@ -0,0 +1,301 @@
|
|||
import boto.s3.connection
|
||||
import bunch
|
||||
import itertools
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import yaml
|
||||
import re
|
||||
from lxml import etree
|
||||
|
||||
from doctest import Example
|
||||
from lxml.doctestcompare import LXMLOutputChecker
|
||||
|
||||
s3 = bunch.Bunch()
|
||||
config = bunch.Bunch()
|
||||
prefix = ''
|
||||
|
||||
bucket_counter = itertools.count(1)
|
||||
key_counter = itertools.count(1)
|
||||
|
||||
def choose_bucket_prefix(template, max_len=30):
|
||||
"""
|
||||
Choose a prefix for our test buckets, so they're easy to identify.
|
||||
|
||||
Use template and feed it more and more random filler, until it's
|
||||
as long as possible but still below max_len.
|
||||
"""
|
||||
rand = ''.join(
|
||||
random.choice(string.ascii_lowercase + string.digits)
|
||||
for c in range(255)
|
||||
)
|
||||
|
||||
while rand:
|
||||
s = template.format(random=rand)
|
||||
if len(s) <= max_len:
|
||||
return s
|
||||
rand = rand[:-1]
|
||||
|
||||
raise RuntimeError(
|
||||
'Bucket prefix template is impossible to fulfill: {template!r}'.format(
|
||||
template=template,
|
||||
),
|
||||
)
|
||||
|
||||
def nuke_bucket(bucket):
|
||||
try:
|
||||
bucket.set_canned_acl('private')
|
||||
# TODO: deleted_cnt and the while loop is a work around for rgw
|
||||
# not sending the
|
||||
deleted_cnt = 1
|
||||
while deleted_cnt:
|
||||
deleted_cnt = 0
|
||||
for key in bucket.list():
|
||||
print 'Cleaning bucket {bucket} key {key}'.format(
|
||||
bucket=bucket,
|
||||
key=key,
|
||||
)
|
||||
key.set_canned_acl('private')
|
||||
key.delete()
|
||||
deleted_cnt += 1
|
||||
bucket.delete()
|
||||
except boto.exception.S3ResponseError as e:
|
||||
# TODO workaround for buggy rgw that fails to send
|
||||
# error_code, remove
|
||||
if (e.status == 403
|
||||
and e.error_code is None
|
||||
and e.body == ''):
|
||||
e.error_code = 'AccessDenied'
|
||||
if e.error_code != 'AccessDenied':
|
||||
print 'GOT UNWANTED ERROR', e.error_code
|
||||
raise
|
||||
# seems like we're not the owner of the bucket; ignore
|
||||
pass
|
||||
|
||||
def nuke_prefixed_buckets():
|
||||
for name, conn in s3.items():
|
||||
print 'Cleaning buckets from connection {name}'.format(name=name)
|
||||
for bucket in conn.get_all_buckets():
|
||||
if bucket.name.startswith(prefix):
|
||||
print 'Cleaning bucket {bucket}'.format(bucket=bucket)
|
||||
nuke_bucket(bucket)
|
||||
|
||||
print 'Done with cleanup of test buckets.'
|
||||
|
||||
def read_config(fp):
|
||||
config = bunch.Bunch()
|
||||
g = yaml.safe_load_all(fp)
|
||||
for new in g:
|
||||
config.update(bunch.bunchify(new))
|
||||
return config
|
||||
|
||||
def connect(conf):
|
||||
mapping = dict(
|
||||
port='port',
|
||||
host='host',
|
||||
is_secure='is_secure',
|
||||
access_key='aws_access_key_id',
|
||||
secret_key='aws_secret_access_key',
|
||||
)
|
||||
kwargs = dict((mapping[k],v) for (k,v) in conf.iteritems() if k in mapping)
|
||||
#process calling_format argument
|
||||
calling_formats = dict(
|
||||
ordinary=boto.s3.connection.OrdinaryCallingFormat(),
|
||||
subdomain=boto.s3.connection.SubdomainCallingFormat(),
|
||||
vhost=boto.s3.connection.VHostCallingFormat(),
|
||||
)
|
||||
kwargs['calling_format'] = calling_formats['ordinary']
|
||||
if conf.has_key('calling_format'):
|
||||
raw_calling_format = conf['calling_format']
|
||||
try:
|
||||
kwargs['calling_format'] = calling_formats[raw_calling_format]
|
||||
except KeyError:
|
||||
raise RuntimeError(
|
||||
'calling_format unknown: %r' % raw_calling_format
|
||||
)
|
||||
# TODO test vhost calling format
|
||||
conn = boto.s3.connection.S3Connection(**kwargs)
|
||||
return conn
|
||||
|
||||
def setup():
|
||||
global s3, config, prefix
|
||||
s3.clear()
|
||||
config.clear()
|
||||
|
||||
try:
|
||||
path = os.environ['S3TEST_CONF']
|
||||
except KeyError:
|
||||
raise RuntimeError(
|
||||
'To run tests, point environment '
|
||||
+ 'variable S3TEST_CONF to a config file.',
|
||||
)
|
||||
with file(path) as f:
|
||||
config.update(read_config(f))
|
||||
|
||||
# These 3 should always be present.
|
||||
if 's3' not in config:
|
||||
raise RuntimeError('Your config file is missing the s3 section!')
|
||||
if 'defaults' not in config.s3:
|
||||
raise RuntimeError('Your config file is missing the s3.defaults section!')
|
||||
if 'fixtures' not in config:
|
||||
raise RuntimeError('Your config file is missing the fixtures section!')
|
||||
|
||||
template = config.fixtures.get('bucket prefix', 'test-{random}-')
|
||||
prefix = choose_bucket_prefix(template=template)
|
||||
if prefix == '':
|
||||
raise RuntimeError("Empty Prefix! Aborting!")
|
||||
|
||||
defaults = config.s3.defaults
|
||||
for section in config.s3.keys():
|
||||
if section == 'defaults':
|
||||
continue
|
||||
|
||||
conf = {}
|
||||
conf.update(defaults)
|
||||
conf.update(config.s3[section])
|
||||
conn = connect(conf)
|
||||
s3[section] = conn
|
||||
|
||||
# WARNING! we actively delete all buckets we see with the prefix
|
||||
# we've chosen! Choose your prefix with care, and don't reuse
|
||||
# credentials!
|
||||
|
||||
# We also assume nobody else is going to use buckets with that
|
||||
# prefix. This is racy but given enough randomness, should not
|
||||
# really fail.
|
||||
nuke_prefixed_buckets()
|
||||
|
||||
def get_new_bucket(connection=None):
|
||||
"""
|
||||
Get a bucket that exists and is empty.
|
||||
|
||||
Always recreates a bucket from scratch. This is useful to also
|
||||
reset ACLs and such.
|
||||
"""
|
||||
if connection is None:
|
||||
connection = s3.main
|
||||
name = '{prefix}{num}'.format(
|
||||
prefix=prefix,
|
||||
num=next(bucket_counter),
|
||||
)
|
||||
# the only way for this to fail with a pre-existing bucket is if
|
||||
# someone raced us between setup nuke_prefixed_buckets and here;
|
||||
# ignore that as astronomically unlikely
|
||||
bucket = connection.create_bucket(name)
|
||||
return bucket
|
||||
|
||||
def teardown():
|
||||
nuke_prefixed_buckets()
|
||||
|
||||
def with_setup_kwargs(setup, teardown=None):
|
||||
"""Decorator to add setup and/or teardown methods to a test function::
|
||||
|
||||
@with_setup_args(setup, teardown)
|
||||
def test_something():
|
||||
" ... "
|
||||
|
||||
The setup function should return (kwargs) which will be passed to
|
||||
test function, and teardown function.
|
||||
|
||||
Note that `with_setup_kwargs` is useful *only* for test functions, not for test
|
||||
methods or inside of TestCase subclasses.
|
||||
"""
|
||||
def decorate(func):
|
||||
kwargs = {}
|
||||
|
||||
def test_wrapped(*args, **kwargs2):
|
||||
k2 = kwargs.copy()
|
||||
k2.update(kwargs2)
|
||||
k2['testname'] = func.__name__
|
||||
func(*args, **k2)
|
||||
|
||||
test_wrapped.__name__ = func.__name__
|
||||
|
||||
def setup_wrapped():
|
||||
k = setup()
|
||||
kwargs.update(k)
|
||||
if hasattr(func, 'setup'):
|
||||
func.setup()
|
||||
test_wrapped.setup = setup_wrapped
|
||||
|
||||
if teardown:
|
||||
def teardown_wrapped():
|
||||
if hasattr(func, 'teardown'):
|
||||
func.teardown()
|
||||
teardown(**kwargs)
|
||||
|
||||
test_wrapped.teardown = teardown_wrapped
|
||||
else:
|
||||
if hasattr(func, 'teardown'):
|
||||
test_wrapped.teardown = func.teardown()
|
||||
return test_wrapped
|
||||
return decorate
|
||||
|
||||
# Demo case for the above, when you run test_gen():
|
||||
# _test_gen will run twice,
|
||||
# with the following stderr printing
|
||||
# setup_func {'b': 2}
|
||||
# testcase ('1',) {'b': 2, 'testname': '_test_gen'}
|
||||
# teardown_func {'b': 2}
|
||||
# setup_func {'b': 2}
|
||||
# testcase () {'b': 2, 'testname': '_test_gen'}
|
||||
# teardown_func {'b': 2}
|
||||
#
|
||||
#def setup_func():
|
||||
# kwargs = {'b': 2}
|
||||
# print("setup_func", kwargs, file=sys.stderr)
|
||||
# return kwargs
|
||||
#
|
||||
#def teardown_func(**kwargs):
|
||||
# print("teardown_func", kwargs, file=sys.stderr)
|
||||
#
|
||||
#@with_setup_kwargs(setup=setup_func, teardown=teardown_func)
|
||||
#def _test_gen(*args, **kwargs):
|
||||
# print("testcase", args, kwargs, file=sys.stderr)
|
||||
#
|
||||
#def test_gen():
|
||||
# yield _test_gen, '1'
|
||||
# yield _test_gen
|
||||
|
||||
def trim_xml(xml_str):
|
||||
p = etree.XMLParser(remove_blank_text=True)
|
||||
elem = etree.XML(xml_str, parser=p)
|
||||
return etree.tostring(elem)
|
||||
|
||||
def normalize_xml(xml, pretty_print=True):
|
||||
if xml is None:
|
||||
return xml
|
||||
|
||||
root = etree.fromstring(xml.encode(encoding='ascii'))
|
||||
|
||||
for element in root.iter('*'):
|
||||
if element.text is not None and not element.text.strip():
|
||||
element.text = None
|
||||
if element.text is not None:
|
||||
element.text = element.text.strip().replace("\n", "").replace("\r", "")
|
||||
if element.tail is not None and not element.tail.strip():
|
||||
element.tail = None
|
||||
if element.tail is not None:
|
||||
element.tail = element.tail.strip().replace("\n", "").replace("\r", "")
|
||||
|
||||
# Sort the elements
|
||||
for parent in root.xpath('//*[./*]'): # Search for parent elements
|
||||
parent[:] = sorted(parent,key=lambda x: x.tag)
|
||||
|
||||
xmlstr = etree.tostring(root, encoding="utf-8", xml_declaration=True, pretty_print=pretty_print)
|
||||
# there are two different DTD URIs
|
||||
xmlstr = re.sub(r'xmlns="[^"]+"', 'xmlns="s3"', xmlstr)
|
||||
xmlstr = re.sub(r'xmlns=\'[^\']+\'', 'xmlns="s3"', xmlstr)
|
||||
for uri in ['http://doc.s3.amazonaws.com/doc/2006-03-01/', 'http://s3.amazonaws.com/doc/2006-03-01/']:
|
||||
xmlstr = xmlstr.replace(uri, 'URI-DTD')
|
||||
#xmlstr = re.sub(r'>\s+', '>', xmlstr, count=0, flags=re.MULTILINE)
|
||||
return xmlstr
|
||||
|
||||
def assert_xml_equal(got, want):
|
||||
assert want is not None, 'Wanted XML cannot be None'
|
||||
if got is None:
|
||||
raise AssertionError('Got input to validate was None')
|
||||
checker = LXMLOutputChecker()
|
||||
if not checker.check_output(want, got, 0):
|
||||
message = checker.output_difference(Example("", want), got, 0)
|
||||
raise AssertionError(message)
|
Loading…
Add table
Add a link
Reference in a new issue