diff --git a/s3tests/functional/__init__.py b/s3tests/functional/__init__.py index e69de29..3c314d0 100644 --- a/s3tests/functional/__init__.py +++ b/s3tests/functional/__init__.py @@ -0,0 +1,173 @@ +import ConfigParser +import boto.exception +import boto.s3.connection +import bunch +import itertools +import os +import random +import string + +s3 = bunch.Bunch() +config = bunch.Bunch() + +# this will be assigned by setup() +prefix = None + + +def choose_bucket_prefix(template, max_len=30): + """ + Choose a prefix for our test buckets, so they're easy to identify. + + Use template and feed it more and more random filler, until it's + as long as possible but still below max_len. + """ + rand = ''.join( + random.choice(string.ascii_lowercase + string.digits) + for c in range(255) + ) + + while rand: + s = template.format(random=rand) + if len(s) <= max_len: + return s + rand = rand[:-1] + + raise RuntimeError( + 'Bucket prefix template is impossible to fulfill: {template!r}'.format( + template=template, + ), + ) + + +def nuke_prefixed_buckets(prefix): + for name, conn in s3.items(): + print 'Cleaning buckets from connection {name} prefix {prefix!r}.'.format( + name=name, + prefix=prefix, + ) + for bucket in conn.get_all_buckets(): + if bucket.name.startswith(prefix): + print 'Cleaning bucket {bucket}'.format(bucket=bucket) + try: + bucket.set_canned_acl('private') + for key in bucket.list(): + print 'Cleaning bucket {bucket} key {key}'.format( + bucket=bucket, + key=key, + ) + key.set_canned_acl('private') + key.delete() + bucket.delete() + except boto.exception.S3ResponseError as e: + if e.error_code != 'AccessDenied': + print 'GOT UNWANTED ERROR', e.error_code + raise + # seems like we're not the owner of the bucket; ignore + pass + + print 'Done with cleanup of test buckets.' + + +def setup(): + + cfg = ConfigParser.RawConfigParser() + try: + path = os.environ['S3TEST_CONF'] + except KeyError: + raise RuntimeError( + 'To run tests, point environment ' + + 'variable S3TEST_CONF to a config file.', + ) + with file(path) as f: + cfg.readfp(f) + + global prefix + try: + template = cfg.get('fixtures', 'bucket prefix') + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + template = 'test-{random}-' + prefix = choose_bucket_prefix(template=template) + + s3.clear() + config.clear() + for section in cfg.sections(): + try: + (type_, name) = section.split(None, 1) + except ValueError: + continue + if type_ != 's3': + continue + try: + port = cfg.getint(section, 'port') + except ConfigParser.NoOptionError: + port = None + + config[name] = bunch.Bunch() + for var in [ + 'user_id', + 'display_name', + 'email', + ]: + try: + config[name][var] = cfg.get(section, var) + except ConfigParser.NoOptionError: + pass + conn = boto.s3.connection.S3Connection( + aws_access_key_id=cfg.get(section, 'access_key'), + aws_secret_access_key=cfg.get(section, 'secret_key'), + is_secure=cfg.getboolean(section, 'is_secure'), + port=port, + host=cfg.get(section, 'host'), + # TODO support & test all variations + calling_format=boto.s3.connection.OrdinaryCallingFormat(), + ) + s3[name] = conn + + # WARNING! we actively delete all buckets we see with the prefix + # we've chosen! Choose your prefix with care, and don't reuse + # credentials! + + # We also assume nobody else is going to use buckets with that + # prefix. This is racy but given enough randomness, should not + # really fail. + nuke_prefixed_buckets(prefix=prefix) + + +def teardown(): + # remove our buckets here also, to avoid littering + nuke_prefixed_buckets(prefix=prefix) + + +bucket_counter = itertools.count(1) + + +def get_new_bucket_name(): + """ + Get a bucket name that probably does not exist. + + We make every attempt to use a unique random prefix, so if a + bucket by this name happens to exist, it's ok if tests give + false negatives. + """ + name = '{prefix}{num}'.format( + prefix=prefix, + num=next(bucket_counter), + ) + return name + + +def get_new_bucket(connection=None): + """ + Get a bucket that exists and is empty. + + Always recreates a bucket from scratch. This is useful to also + reset ACLs and such. + """ + if connection is None: + connection = s3.main + name = get_new_bucket_name() + # the only way for this to fail with a pre-existing bucket is if + # someone raced us between setup nuke_prefixed_buckets and here; + # ignore that as astronomically unlikely + bucket = connection.create_bucket(name) + return bucket diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index 2305390..d7d557e 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -1,13 +1,10 @@ from cStringIO import StringIO -import ConfigParser import boto.exception import boto.s3.connection import boto.s3.acl import bunch -import itertools import nose import operator -import os import random import string import socket @@ -15,178 +12,22 @@ import socket from nose.tools import eq_ as eq from nose.plugins.attrib import attr -from utils import assert_raises +from .utils import assert_raises import AnonymousAuth from email.header import decode_header +from . import ( + nuke_prefixed_buckets, + get_new_bucket, + s3, + config, + prefix, + ) + + NONEXISTENT_EMAIL = 'doesnotexist@dreamhost.com.invalid' -s3 = bunch.Bunch() -config = bunch.Bunch() - -# this will be assigned by setup() -prefix = None - - -def choose_bucket_prefix(template, max_len=30): - """ - Choose a prefix for our test buckets, so they're easy to identify. - - Use template and feed it more and more random filler, until it's - as long as possible but still below max_len. - """ - rand = ''.join( - random.choice(string.ascii_lowercase + string.digits) - for c in range(255) - ) - - while rand: - s = template.format(random=rand) - if len(s) <= max_len: - return s - rand = rand[:-1] - - raise RuntimeError( - 'Bucket prefix template is impossible to fulfill: {template!r}'.format( - template=template, - ), - ) - - -def nuke_prefixed_buckets(prefix): - for name, conn in s3.items(): - print 'Cleaning buckets from connection {name} prefix {prefix!r}.'.format( - name=name, - prefix=prefix, - ) - for bucket in conn.get_all_buckets(): - if bucket.name.startswith(prefix): - print 'Cleaning bucket {bucket}'.format(bucket=bucket) - try: - bucket.set_canned_acl('private') - for key in bucket.list(): - print 'Cleaning bucket {bucket} key {key}'.format( - bucket=bucket, - key=key, - ) - key.set_canned_acl('private') - key.delete() - bucket.delete() - except boto.exception.S3ResponseError as e: - if e.error_code != 'AccessDenied': - print 'GOT UNWANTED ERROR', e.error_code - raise - # seems like we're not the owner of the bucket; ignore - pass - - print 'Done with cleanup of test buckets.' - - -def setup(): - - cfg = ConfigParser.RawConfigParser() - try: - path = os.environ['S3TEST_CONF'] - except KeyError: - raise RuntimeError( - 'To run tests, point environment ' - + 'variable S3TEST_CONF to a config file.', - ) - with file(path) as f: - cfg.readfp(f) - - global prefix - try: - template = cfg.get('fixtures', 'bucket prefix') - except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): - template = 'test-{random}-' - prefix = choose_bucket_prefix(template=template) - - s3.clear() - config.clear() - for section in cfg.sections(): - try: - (type_, name) = section.split(None, 1) - except ValueError: - continue - if type_ != 's3': - continue - try: - port = cfg.getint(section, 'port') - except ConfigParser.NoOptionError: - port = None - - config[name] = bunch.Bunch() - for var in [ - 'user_id', - 'display_name', - 'email', - ]: - try: - config[name][var] = cfg.get(section, var) - except ConfigParser.NoOptionError: - pass - conn = boto.s3.connection.S3Connection( - aws_access_key_id=cfg.get(section, 'access_key'), - aws_secret_access_key=cfg.get(section, 'secret_key'), - is_secure=cfg.getboolean(section, 'is_secure'), - port=port, - host=cfg.get(section, 'host'), - # TODO support & test all variations - calling_format=boto.s3.connection.OrdinaryCallingFormat(), - ) - s3[name] = conn - - # WARNING! we actively delete all buckets we see with the prefix - # we've chosen! Choose your prefix with care, and don't reuse - # credentials! - - # We also assume nobody else is going to use buckets with that - # prefix. This is racy but given enough randomness, should not - # really fail. - nuke_prefixed_buckets(prefix=prefix) - - -def teardown(): - # remove our buckets here also, to avoid littering - nuke_prefixed_buckets(prefix=prefix) - - -bucket_counter = itertools.count(1) - - -def get_new_bucket_name(): - """ - Get a bucket name that probably does not exist. - - We make every attempt to use a unique random prefix, so if a - bucket by this name happens to exist, it's ok if tests give - false negatives. - """ - name = '{prefix}{num}'.format( - prefix=prefix, - num=next(bucket_counter), - ) - return name - - -def get_new_bucket(connection=None): - """ - Get a bucket that exists and is empty. - - Always recreates a bucket from scratch. This is useful to also - reset ACLs and such. - """ - if connection is None: - connection = s3.main - name = get_new_bucket_name() - # the only way for this to fail with a pre-existing bucket is if - # someone raced us between setup nuke_prefixed_buckets and here; - # ignore that as astronomically unlikely - bucket = connection.create_bucket(name) - return bucket - def check_access_denied(fn, *args, **kwargs): e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)