s3-tests/s3tests/functional/__init__.py
Robin H. Johnson de65c582b7 s3tests: do not exit cleanup after first bucket!
Bucket cleanup code was exiting after the first succesful bucket
removed, instead of cleaning up all buckets. This left many buckets
behind over time, causing large slowdowns of the test suite.

This was introduced in commit c41ebab9cf,
when changes for versioned files were added.

Signed-off-by: Kobi Laredo <kobi.laredo@dreamhost.com>
Signed-off-by: Robin H. Johnson <robin.johnson@dreamhost.com>
Backport: hammer
2015-12-01 17:28:23 -08:00

394 lines
12 KiB
Python

import ConfigParser
import boto.exception
import boto.s3.connection
import bunch
import itertools
import os
import random
import string
from .utils import region_sync_meta
s3 = bunch.Bunch()
config = bunch.Bunch()
targets = bunch.Bunch()
# this will be assigned by setup()
prefix = None
calling_formats = dict(
ordinary=boto.s3.connection.OrdinaryCallingFormat(),
subdomain=boto.s3.connection.SubdomainCallingFormat(),
vhost=boto.s3.connection.VHostCallingFormat(),
)
def get_prefix():
assert prefix is not None
return prefix
def is_slow_backend():
return slow_backend
def choose_bucket_prefix(template, max_len=30):
"""
Choose a prefix for our test buckets, so they're easy to identify.
Use template and feed it more and more random filler, until it's
as long as possible but still below max_len.
"""
rand = ''.join(
random.choice(string.ascii_lowercase + string.digits)
for c in range(255)
)
while rand:
s = template.format(random=rand)
if len(s) <= max_len:
return s
rand = rand[:-1]
raise RuntimeError(
'Bucket prefix template is impossible to fulfill: {template!r}'.format(
template=template,
),
)
def nuke_prefixed_buckets_on_conn(prefix, name, conn):
print 'Cleaning buckets from connection {name} prefix {prefix!r}.'.format(
name=name,
prefix=prefix,
)
for bucket in conn.get_all_buckets():
print 'prefix=',prefix
if bucket.name.startswith(prefix):
print 'Cleaning bucket {bucket}'.format(bucket=bucket)
success = False
for i in xrange(2):
try:
try:
iterator = iter(bucket.list_versions())
# peek into iterator to issue list operation
try:
keys = itertools.chain([next(iterator)], iterator)
except StopIteration:
keys = [] # empty iterator
except boto.exception.S3ResponseError as e:
# some S3 implementations do not support object
# versioning - fall back to listing without versions
if e.error_code != 'NotImplemented':
raise e
keys = bucket.list();
for key in keys:
print 'Cleaning bucket {bucket} key {key}'.format(
bucket=bucket,
key=key,
)
# key.set_canned_acl('private')
bucket.delete_key(key.name, version_id = key.version_id)
bucket.delete()
success = True
except boto.exception.S3ResponseError as e:
if e.error_code != 'AccessDenied':
print 'GOT UNWANTED ERROR', e.error_code
raise
# seems like we don't have permissions set appropriately, we'll
# modify permissions and retry
pass
if success:
break
bucket.set_canned_acl('private')
def nuke_prefixed_buckets(prefix):
# If no regions are specified, use the simple method
if targets.main.master == None:
for name, conn in s3.items():
print 'Deleting buckets on {name}'.format(name=name)
nuke_prefixed_buckets_on_conn(prefix, name, conn)
else:
# First, delete all buckets on the master connection
for name, conn in s3.items():
if conn == targets.main.master.connection:
print 'Deleting buckets on {name} (master)'.format(name=name)
nuke_prefixed_buckets_on_conn(prefix, name, conn)
# Then sync to propagate deletes to secondaries
region_sync_meta(targets.main, targets.main.master.connection)
print 'region-sync in nuke_prefixed_buckets'
# Now delete remaining buckets on any other connection
for name, conn in s3.items():
if conn != targets.main.master.connection:
print 'Deleting buckets on {name} (non-master)'.format(name=name)
nuke_prefixed_buckets_on_conn(prefix, name, conn)
print 'Done with cleanup of test buckets.'
class TargetConfig:
def __init__(self, cfg, section):
self.port = None
self.api_name = ''
self.is_master = False
self.is_secure = False
self.sync_agent_addr = None
self.sync_agent_port = 0
self.sync_meta_wait = 0
try:
self.api_name = cfg.get(section, 'api_name')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
pass
try:
self.port = cfg.getint(section, 'port')
except ConfigParser.NoOptionError:
pass
try:
self.host=cfg.get(section, 'host')
except ConfigParser.NoOptionError:
raise RuntimeError(
'host not specified for section {s}'.format(s=section)
)
try:
self.is_master=cfg.getboolean(section, 'is_master')
except ConfigParser.NoOptionError:
pass
try:
self.is_secure=cfg.getboolean(section, 'is_secure')
except ConfigParser.NoOptionError:
pass
try:
raw_calling_format = cfg.get(section, 'calling_format')
except ConfigParser.NoOptionError:
raw_calling_format = 'ordinary'
try:
self.sync_agent_addr = cfg.get(section, 'sync_agent_addr')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
pass
try:
self.sync_agent_port = cfg.getint(section, 'sync_agent_port')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
pass
try:
self.sync_meta_wait = cfg.getint(section, 'sync_meta_wait')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
pass
try:
self.calling_format = calling_formats[raw_calling_format]
except KeyError:
raise RuntimeError(
'calling_format unknown: %r' % raw_calling_format
)
class TargetConnection:
def __init__(self, conf, conn):
self.conf = conf
self.connection = conn
class RegionsInfo:
def __init__(self):
self.m = bunch.Bunch()
self.master = None
self.secondaries = []
def add(self, name, region_config):
self.m[name] = region_config
if (region_config.is_master):
if not self.master is None:
raise RuntimeError(
'multiple regions defined as master'
)
self.master = region_config
else:
self.secondaries.append(region_config)
def get(self, name):
return self.m[name]
def get(self):
return self.m
def iteritems(self):
return self.m.iteritems()
regions = RegionsInfo()
class RegionsConn:
def __init__(self):
self.m = bunch.Bunch()
self.default = None
self.master = None
self.secondaries = []
def iteritems(self):
return self.m.iteritems()
def set_default(self, conn):
self.default = conn
def add(self, name, conn):
self.m[name] = conn
if not self.default:
self.default = conn
if (conn.conf.is_master):
self.master = conn
else:
self.secondaries.append(conn)
# nosetests --processes=N with N>1 is safe
_multiprocess_can_split_ = True
def setup():
cfg = ConfigParser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
except KeyError:
raise RuntimeError(
'To run tests, point environment '
+ 'variable S3TEST_CONF to a config file.',
)
with file(path) as f:
cfg.readfp(f)
global prefix
global targets
global slow_backend
try:
template = cfg.get('fixtures', 'bucket prefix')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
template = 'test-{random}-'
prefix = choose_bucket_prefix(template=template)
try:
slow_backend = cfg.getboolean('fixtures', 'slow backend')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
slow_backend = False
# pull the default_region out, if it exists
try:
default_region = cfg.get('fixtures', 'default_region')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
default_region = None
s3.clear()
config.clear()
for section in cfg.sections():
try:
(type_, name) = section.split(None, 1)
except ValueError:
continue
if type_ != 'region':
continue
regions.add(name, TargetConfig(cfg, section))
for section in cfg.sections():
try:
(type_, name) = section.split(None, 1)
except ValueError:
continue
if type_ != 's3':
continue
if len(regions.get()) == 0:
regions.add("default", TargetConfig(cfg, section))
config[name] = bunch.Bunch()
for var in [
'user_id',
'display_name',
'email',
]:
try:
config[name][var] = cfg.get(section, var)
except ConfigParser.NoOptionError:
pass
targets[name] = RegionsConn()
for (k, conf) in regions.iteritems():
conn = boto.s3.connection.S3Connection(
aws_access_key_id=cfg.get(section, 'access_key'),
aws_secret_access_key=cfg.get(section, 'secret_key'),
is_secure=conf.is_secure,
port=conf.port,
host=conf.host,
# TODO test vhost calling format
calling_format=conf.calling_format,
)
temp_targetConn = TargetConnection(conf, conn)
targets[name].add(k, temp_targetConn)
# Explicitly test for and set the default region, if specified.
# If it was not specified, use the 'is_master' flag to set it.
if default_region:
if default_region == name:
targets[name].set_default(temp_targetConn)
elif conf.is_master:
targets[name].set_default(temp_targetConn)
s3[name] = targets[name].default.connection
# WARNING! we actively delete all buckets we see with the prefix
# we've chosen! Choose your prefix with care, and don't reuse
# credentials!
# We also assume nobody else is going to use buckets with that
# prefix. This is racy but given enough randomness, should not
# really fail.
nuke_prefixed_buckets(prefix=prefix)
def teardown():
# remove our buckets here also, to avoid littering
nuke_prefixed_buckets(prefix=prefix)
bucket_counter = itertools.count(1)
def get_new_bucket_name():
"""
Get a bucket name that probably does not exist.
We make every attempt to use a unique random prefix, so if a
bucket by this name happens to exist, it's ok if tests give
false negatives.
"""
name = '{prefix}{num}'.format(
prefix=prefix,
num=next(bucket_counter),
)
return name
def get_new_bucket(target=None, name=None, headers=None):
"""
Get a bucket that exists and is empty.
Always recreates a bucket from scratch. This is useful to also
reset ACLs and such.
"""
if target is None:
target = targets.main.default
connection = target.connection
if name is None:
name = get_new_bucket_name()
# the only way for this to fail with a pre-existing bucket is if
# someone raced us between setup nuke_prefixed_buckets and here;
# ignore that as astronomically unlikely
bucket = connection.create_bucket(name, location=target.conf.api_name, headers=headers)
return bucket