# s3-tests/s3tests/functional/__init__.py

from __future__ import print_function
import sys
import ConfigParser
import boto.exception
import boto.s3.connection
import bunch
import itertools
import os
import random
import string
from httplib import HTTPConnection, HTTPSConnection
from urlparse import urlparse
from .utils import region_sync_meta
# Module-level state shared by the functional tests; all of it is filled in
# by setup():
#   s3      -- default connection for each "s3 <name>" config section
#   config  -- selected raw options of each "s3 <name>" config section
#   targets -- RegionsConn (one connection per region) for each section
s3 = bunch.Bunch()
config = bunch.Bunch()
targets = bunch.Bunch()

# this will be assigned by setup()
prefix = None

# Maps the 'calling_format' config option to the boto calling-format object
# handed to the S3 connection.
calling_formats = {
    'ordinary': boto.s3.connection.OrdinaryCallingFormat(),
    'subdomain': boto.s3.connection.SubdomainCallingFormat(),
    'vhost': boto.s3.connection.VHostCallingFormat(),
}
def get_prefix():
    """
    Return the bucket-name prefix stored in the module-level ``prefix``.

    Fails an assertion while ``prefix`` is still None, i.e. before setup()
    has assigned it.
    """
    current = prefix
    assert current is not None
    return current
def is_slow_backend():
    """
    Return the 'slow backend' flag that setup() read from the config.

    ``slow_backend`` only comes into existence as a module global when
    setup() assigns it, so before setup() has run the name is undefined.
    In that case report False -- the same default setup() uses when the
    option is missing -- instead of failing with NameError.
    """
    try:
        return slow_backend
    except NameError:
        return False
def choose_bucket_prefix(template, max_len=30):
    """
    Pick the prefix used for this run's test buckets.

    ``template`` is formatted with a ``random`` field.  Starting from 255
    random lowercase-alphanumeric characters, the filler is shortened one
    character at a time and the first (i.e. longest) result that is at
    most ``max_len`` characters long is returned.

    Raises RuntimeError if not even a single filler character fits.
    """
    alphabet = string.ascii_lowercase + string.digits
    filler = ''.join([random.choice(alphabet) for _ in range(255)])

    # Longest filler first; an empty filler is never tried.
    for length in range(len(filler), 0, -1):
        candidate = template.format(random=filler[:length])
        if len(candidate) <= max_len:
            return candidate

    raise RuntimeError(
        'Bucket prefix template is impossible to fulfill: {template!r}'.format(
            template=template,
        ),
    )
def _list_keys_for_cleanup(bucket):
    """
    Return an iterable over everything stored in ``bucket``.

    Uses the versioned listing when available and falls back to the plain
    listing when the server answers 'NotImplemented'.
    """
    try:
        iterator = iter(bucket.list_versions())
        # peek into iterator to issue list operation, so that a listing
        # error surfaces here rather than later during iteration
        try:
            first = next(iterator)
        except StopIteration:
            return []  # empty iterator
        return itertools.chain([first], iterator)
    except boto.exception.S3ResponseError as e:
        # some S3 implementations do not support object
        # versioning - fall back to listing without versions
        if e.error_code != 'NotImplemented':
            # bare raise keeps the original traceback
            raise
        return bucket.list()


def nuke_prefixed_buckets_on_conn(prefix, name, conn):
    """
    Delete every bucket on ``conn`` whose name starts with ``prefix``.

    For each matching bucket all keys are deleted and then the bucket
    itself.  Each bucket gets up to two attempts: when an attempt fails
    with 'AccessDenied' the bucket ACL is reset to 'private' before the
    next attempt.  Any other S3ResponseError is reported and re-raised.

    ``name`` is only used in the progress output.
    """
    print('Cleaning buckets from connection {name} prefix {prefix!r}.'.format(
        name=name,
        prefix=prefix,
    ))

    for bucket in conn.get_all_buckets():
        if not bucket.name.startswith(prefix):
            continue
        print('Cleaning bucket {bucket}'.format(bucket=bucket))
        for _attempt in range(2):
            try:
                for key in _list_keys_for_cleanup(bucket):
                    print('Cleaning bucket {bucket} key {key}'.format(
                        bucket=bucket,
                        key=key,
                    ))
                    # key.set_canned_acl('private')
                    bucket.delete_key(key.name, version_id=key.version_id)
                bucket.delete()
            except boto.exception.S3ResponseError as e:
                if e.error_code != 'AccessDenied':
                    print('GOT UNWANTED ERROR', e.error_code)
                    raise
                # seems like we don't have permissions set appropriately,
                # we'll modify permissions and retry
            else:
                break

            bucket.set_canned_acl('private')
def nuke_prefixed_buckets(prefix):
    """
    Delete all buckets starting with ``prefix`` on every connection in ``s3``.

    Without a master region every connection is simply cleaned in turn.
    With a master region the master connection is cleaned first, then
    region_sync_meta() is called to propagate the deletes to the
    secondaries, and finally the remaining connections are cleaned.
    """
    master = targets.main.master

    # If no regions are specified, use the simple method
    if master is None:
        for name, conn in s3.items():
            print('Deleting buckets on {name}'.format(name=name))
            nuke_prefixed_buckets_on_conn(prefix, name, conn)
    else:
        master_conn = master.connection

        # First, delete all buckets on the master connection
        for name, conn in s3.items():
            if conn == master_conn:
                print('Deleting buckets on {name} (master)'.format(name=name))
                nuke_prefixed_buckets_on_conn(prefix, name, conn)

        # Then sync to propagate deletes to secondaries
        region_sync_meta(targets.main, master_conn)
        print('region-sync in nuke_prefixed_buckets')

        # Now delete remaining buckets on any other connection
        for name, conn in s3.items():
            if conn != master_conn:
                print('Deleting buckets on {name} (non-master)'.format(name=name))
                nuke_prefixed_buckets_on_conn(prefix, name, conn)

    print('Done with cleanup of test buckets.')
class TargetConfig:
    """
    Endpoint settings read from one section of the test configuration.

    Only ``host`` is mandatory (RuntimeError when absent).  Every other
    option falls back to a default: api_name '', port None, is_master and
    is_secure False, calling_format 'ordinary', sync_agent_addr None,
    sync_agent_port and sync_meta_wait 0.  An unrecognised calling_format
    raises RuntimeError.
    """

    def __init__(self, cfg, section):
        # Which "missing" errors are tolerated differs per option, so the
        # tolerated exception tuple is passed explicitly for each read.
        option_only = (ConfigParser.NoOptionError,)
        section_or_option = (
            ConfigParser.NoSectionError,
            ConfigParser.NoOptionError,
        )

        def read(getter, option, default, tolerated):
            try:
                return getter(section, option)
            except tolerated:
                return default

        self.api_name = read(cfg.get, 'api_name', '', section_or_option)
        self.port = read(cfg.getint, 'port', None, option_only)

        try:
            self.host = cfg.get(section, 'host')
        except ConfigParser.NoOptionError:
            raise RuntimeError(
                'host not specified for section {s}'.format(s=section)
            )

        self.is_master = read(cfg.getboolean, 'is_master', False, option_only)
        self.is_secure = read(cfg.getboolean, 'is_secure', False, option_only)
        raw_calling_format = read(
            cfg.get, 'calling_format', 'ordinary', option_only)

        self.sync_agent_addr = read(
            cfg.get, 'sync_agent_addr', None, section_or_option)
        self.sync_agent_port = read(
            cfg.getint, 'sync_agent_port', 0, section_or_option)
        self.sync_meta_wait = read(
            cfg.getint, 'sync_meta_wait', 0, section_or_option)

        try:
            self.calling_format = calling_formats[raw_calling_format]
        except KeyError:
            raise RuntimeError(
                'calling_format unknown: %r' % raw_calling_format
            )
class TargetConnection:
    """Pairs a target's configuration (``conf``) with its ``connection``."""

    def __init__(self, conf, conn):
        self.conf, self.connection = conf, conn
class RegionsInfo:
    """
    Registry of region configurations, keyed by region name.

    Tracks at most one master region (``master``) and a list of all
    non-master regions (``secondaries``).
    """

    def __init__(self):
        self.m = bunch.Bunch()
        self.master = None
        self.secondaries = []

    def add(self, name, region_config):
        """
        Register ``region_config`` under ``name``.

        Raises RuntimeError if it is flagged as master while another
        master is already registered.
        """
        self.m[name] = region_config
        if region_config.is_master:
            if self.master is not None:
                raise RuntimeError(
                    'multiple regions defined as master'
                )
            self.master = region_config
        else:
            self.secondaries.append(region_config)

    def get(self, name=None):
        """
        Return the config registered as ``name``, or the whole mapping
        when called without a name.

        The class used to define ``get`` twice, so the by-name variant was
        silently replaced by the no-argument one; both forms are served by
        this single method.
        """
        if name is None:
            return self.m
        return self.m[name]

    def iteritems(self):
        """Iterate over (name, region_config) pairs."""
        return self.m.iteritems()
# Module-wide registry of region configurations; setup() populates it from
# the "region <name>" config sections (or with a single "default" entry).
regions = RegionsInfo()
class RegionsConn:
    """
    The per-region connections belonging to one configured target.

    ``default`` is the first connection added unless set_default()
    overrides it; ``master`` is the most recently added connection whose
    conf is flagged is_master; all others are collected in ``secondaries``.
    """

    def __init__(self):
        self.m = bunch.Bunch()
        self.default = None
        self.master = None
        self.secondaries = []

    def iteritems(self):
        """Iterate over (region name, connection) pairs."""
        return self.m.iteritems()

    def set_default(self, conn):
        """Make ``conn`` the default connection."""
        self.default = conn

    def add(self, name, conn):
        """Register ``conn`` under the region ``name``."""
        self.m[name] = conn
        # keep an existing (truthy) default, otherwise adopt this one
        self.default = self.default or conn
        if conn.conf.is_master:
            self.master = conn
            return
        self.secondaries.append(conn)
# nosetests --processes=N with N>1 is safe
# NOTE(review): presumably this flag is read by nose's multiprocess plugin to
# allow the tests of this package to be split across worker processes --
# confirm against the nose documentation.
_multiprocess_can_split_ = True
def setup():
cfg = ConfigParser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
except KeyError:
raise RuntimeError(
'To run tests, point environment '
+ 'variable S3TEST_CONF to a config file.',
)
with file(path) as f:
cfg.readfp(f)
global prefix
global targets
global slow_backend
try:
template = cfg.get('fixtures', 'bucket prefix')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
template = 'test-{random}-'
prefix = choose_bucket_prefix(template=template)
try:
slow_backend = cfg.getboolean('fixtures', 'slow backend')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
slow_backend = False
# pull the default_region out, if it exists
try:
default_region = cfg.get('fixtures', 'default_region')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
default_region = None
s3.clear()
config.clear()
for section in cfg.sections():
try:
(type_, name) = section.split(None, 1)
except ValueError:
continue
if type_ != 'region':
continue
regions.add(name, TargetConfig(cfg, section))
for section in cfg.sections():
try:
(type_, name) = section.split(None, 1)
except ValueError:
continue
if type_ != 's3':
continue
if len(regions.get()) == 0:
regions.add("default", TargetConfig(cfg, section))
config[name] = bunch.Bunch()
for var in [
'user_id',
'display_name',
'email',
's3website_domain',
'host',
'port',
'is_secure',
'kms_keyid',
]:
try:
config[name][var] = cfg.get(section, var)
except ConfigParser.NoOptionError:
pass
targets[name] = RegionsConn()
for (k, conf) in regions.iteritems():
conn = boto.s3.connection.S3Connection(
aws_access_key_id=cfg.get(section, 'access_key'),
aws_secret_access_key=cfg.get(section, 'secret_key'),
is_secure=conf.is_secure,
port=conf.port,
host=conf.host,
# TODO test vhost calling format
calling_format=conf.calling_format,
)
temp_targetConn = TargetConnection(conf, conn)
targets[name].add(k, temp_targetConn)
# Explicitly test for and set the default region, if specified.
# If it was not specified, use the 'is_master' flag to set it.
if default_region:
if default_region == name:
targets[name].set_default(temp_targetConn)
elif conf.is_master:
targets[name].set_default(temp_targetConn)
s3[name] = targets[name].default.connection
# WARNING! we actively delete all buckets we see with the prefix
# we've chosen! Choose your prefix with care, and don't reuse
# credentials!
# We also assume nobody else is going to use buckets with that
# prefix. This is racy but given enough randomness, should not
# really fail.
nuke_prefixed_buckets(prefix=prefix)
def teardown():
    """
    Package-level teardown: remove our buckets here also, to avoid
    littering.
    """
    nuke_prefixed_buckets(prefix)
# Counter starting at 1; get_new_bucket_name() appends its next value to the
# prefix so that successive names generated in this process differ.
bucket_counter = itertools.count(1)
def get_new_bucket_name():
    """
    Return a bucket name that probably does not exist yet.

    The name is the module-level prefix followed by the next value of
    ``bucket_counter``.  We make every attempt to use a unique random
    prefix, so if a bucket by this name happens to exist, it's ok if
    tests give false negatives.
    """
    sequence_number = next(bucket_counter)
    return '{prefix}{num}'.format(prefix=prefix, num=sequence_number)
def get_new_bucket(target=None, name=None, headers=None):
    """
    Create a bucket and return it.

    ``target`` defaults to ``targets.main.default`` and ``name`` to a
    freshly generated bucket name.  The bucket is created on the target's
    connection with the target's api_name as location; ``headers`` are
    passed through to create_bucket().
    """
    chosen_target = targets.main.default if target is None else target
    connection = chosen_target.connection
    bucket_name = get_new_bucket_name() if name is None else name

    # the only way for this to fail with a pre-existing bucket is if
    # someone raced us between setup nuke_prefixed_buckets and here;
    # ignore that as astronomically unlikely
    return connection.create_bucket(
        bucket_name,
        location=chosen_target.conf.api_name,
        headers=headers,
    )
def _make_request(method, bucket, key, body=None, authenticated=False, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
"""
issue a request for a specified method, on a specified <bucket,key>,
with a specified (optional) body (encrypted per the connection), and
return the response (status, reason).
If key is None, then this will be treated as a bucket-level request.
If the request or response headers are None, then default values will be
provided by later methods.
"""
if not path_style:
conn = bucket.connection
request_headers['Host'] = conn.calling_format.build_host(conn.server_name(), bucket.name)
if authenticated:
urlobj = None
if key is not None:
urlobj = key
elif bucket is not None:
urlobj = bucket
else:
raise RuntimeError('Unable to find bucket name')
url = urlobj.generate_url(expires_in, method=method, response_headers=response_headers, headers=request_headers)
o = urlparse(url)
path = o.path + '?' + o.query
else:
bucketobj = None
if key is not None:
path = '/{obj}'.format(obj=key.name)
bucketobj = key.bucket
elif bucket is not None:
path = '/'
bucketobj = bucket
else:
raise RuntimeError('Unable to find bucket name')
if path_style:
path = '/{bucket}'.format(bucket=bucketobj.name) + path
return _make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path, body=body, request_headers=request_headers, secure=s3.main.is_secure, timeout=timeout)
def _make_bucket_request(method, bucket, body=None, authenticated=False, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
    """
    Bucket-level convenience wrapper: forwards every argument to
    _make_request() with ``key=None`` and returns its response.
    """
    return _make_request(
        method=method,
        bucket=bucket,
        key=None,
        body=body,
        authenticated=authenticated,
        response_headers=response_headers,
        request_headers=request_headers,
        expires_in=expires_in,
        path_style=path_style,
        timeout=timeout,
    )
def _make_raw_request(host, port, method, path, body=None, request_headers=None, secure=False, timeout=None):
    """
    Send one HTTP request to ``host``:``port`` and return the response.

    An HTTPS connection is used when ``secure`` is true, plain HTTP
    otherwise.  ``request_headers`` of None means no extra headers.  The
    response status and reason are printed before the response object is
    returned.

    This allows construction of special cases not covered by the
    bucket/key to URL mapping of _make_request/_make_bucket_request.
    """
    connection_class = HTTPSConnection if secure else HTTPConnection
    headers = {} if request_headers is None else request_headers

    connection = connection_class(host, port, strict=True, timeout=timeout)

    # TODO: We might have to modify this in future if we need to interact with
    # how httplib.request handles Accept-Encoding and Host.
    connection.request(method, path, body=body, headers=headers)
    response = connection.getresponse()
    # The connection is not closed here; presumably because the caller
    # still has to read the returned response -- confirm.
    print(response.status, response.reason)
    return response