diff --git a/s3tests/common.py b/s3tests/common/__init__.py
similarity index 81%
rename from s3tests/common.py
rename to s3tests/common/__init__.py
index 373d191..8eb4232 100644
--- a/s3tests/common.py
+++ b/s3tests/common/__init__.py
@@ -4,13 +4,24 @@
 import itertools
 import os
 import random
 import string
+import sys
 import yaml
 
 s3 = bunch.Bunch()
 config = bunch.Bunch()
 prefix = ''
+
+# For those scripts that use a context, these are pretty universally needed.
+context = bunch.Bunch(
+    bucket = None,
+
+    # Save stdout/stderr in case they get fudged with.
+    real_stdout = sys.stdout,
+    real_stderr = sys.stderr,
+)
+
 bucket_counter = itertools.count(1)
+key_counter = itertools.count(1)
 
 def choose_bucket_prefix(template, max_len=30):
     """
@@ -44,13 +55,19 @@ def nuke_prefixed_buckets():
                 print 'Cleaning bucket {bucket}'.format(bucket=bucket)
                 try:
                     bucket.set_canned_acl('private')
-                    for key in bucket.list():
-                        print 'Cleaning bucket {bucket} key {key}'.format(
-                            bucket=bucket,
-                            key=key,
-                            )
-                        key.set_canned_acl('private')
-                        key.delete()
+                    # TODO: deleted_cnt and the while loop is a work around for rgw
+                    # not sending the
+                    deleted_cnt = 1
+                    while deleted_cnt:
+                        deleted_cnt = 0
+                        for key in bucket.list():
+                            print 'Cleaning bucket {bucket} key {key}'.format(
+                                bucket=bucket,
+                                key=key,
+                                )
+                            key.set_canned_acl('private')
+                            key.delete()
+                            deleted_cnt += 1
                     bucket.delete()
                 except boto.exception.S3ResponseError as e:
                     # TODO workaround for buggy rgw that fails to send
@@ -156,3 +173,12 @@ def get_new_bucket(connection=None):
 
 def teardown():
     nuke_prefixed_buckets()
+
+def fill_pools(*args):
+    for pool in args:
+        while not pool.full():
+            pool.spawn()
+
+def get_next_key(bucket=None):
+    return bucket.new_key("seqkey-{num}".format(num=next(key_counter)))
+
diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py
new file mode 100644
index 0000000..e4e52bc
--- /dev/null
+++ b/s3tests/common/greenlets.py
@@ -0,0 +1,93 @@
+import bunch
+import collections
+import gevent
+import random
+import time
+import traceback
+
+from ..common import context, get_next_key
+from ..common.results import TransferGreenletResult
+from ..realistic import FileVerifier
+
+
+# Make sure context has somewhere to store what we need
+context.update(bunch.Bunch(
+    needs_first_read = collections.deque(),
+    all_keys = [],
+    files_iter = None,
+))
+
+
+class SafeTransferGreenlet(gevent.Greenlet):
+    def __init__(self, timeout=120):
+        gevent.Greenlet.__init__(self)
+        self.timeout = timeout
+        self.result = None
+
+    def _run(self):
+        result = self.result = TransferGreenletResult(self.type)
+        result.markStarted()
+
+        try:
+            with gevent.Timeout(self.timeout, False):
+                result.success = self._doit()
+        except gevent.GreenletExit:
+            return
+        except:
+            result.setError(show_traceback=True)
+
+        result.markFinished()
+
+
+class ReaderGreenlet(SafeTransferGreenlet):
+    type = 'reader'
+
+    def _doit(self):
+        if context.needs_first_read:
+            key = context.needs_first_read.popleft()
+        elif context.all_keys:
+            key = random.choice(context.all_keys)
+        else:
+            time.sleep(1)
+            return self.result.setError('No available keys to test with reader. Try again later.')
+
+        # Copy the key object
+        key = key.bucket.new_key(key.name)
+        self.result.setKey(key)
+
+        fp = FileVerifier()
+
+        key.get_contents_to_file(fp)
+
+        self.result.request_finish = time.time()
+        self.result.request_start = fp.created_at
+        self.result.chunks = fp.chunks
+        self.result.size = fp.size
+
+        if not fp.valid():
+            return self.result.setError('Failed to validate key {name!s}'.format(name=key.name))
+
+        return True
+
+
+class WriterGreenlet(SafeTransferGreenlet):
+    type = 'writer'
+
+    def _doit(self):
+        key = get_next_key(context.bucket)
+        self.result.setKey(key)
+
+        fp = next(context.files_iter)
+        self.result.size = fp.size
+
+        key.set_contents_from_file(fp)
+
+        self.result.request_finish = time.time()
+        self.result.request_start = fp.start_time
+        self.result.chunks = fp.last_chunks
+
+        # And at the end, add to needs_first_read and shuffle
+        context.needs_first_read.append(key)
+        context.all_keys.append(key)
+
+        return True
diff --git a/s3tests/common/results.py b/s3tests/common/results.py
new file mode 100644
index 0000000..b9aa9ce
--- /dev/null
+++ b/s3tests/common/results.py
@@ -0,0 +1,112 @@
+import bunch
+import collections
+import gevent
+import sys
+import time
+import traceback
+import yaml
+
+from ..common import context
+
+context.update(bunch.Bunch(
+    result_queue = collections.deque(),
+))
+
+
+class TransferGreenletResult(object):
+    """ Generic container object. Weeeeeeeeeeeeeee *short* """
+    def __init__(self, type):
+        # About the Greenlet
+        self.type = type
+
+        # About the key
+        self.bucket = None
+        self.key = None
+        self.size = None
+
+        # About the job
+        self.success = False
+        self.error = None
+
+        self.start_time = None
+        self.finish_time = None
+
+        self.duration = None
+        self.latency = None
+
+        self.request_start = None
+        self.request_finish = None
+
+        self.chunks = None
+
+    def markStarted(self):
+        self.start_time = time.time()
+
+    def markFinished(self):
+        self.finish_time = time.time()
+        self.duration = self.finish_time - self.start_time
+        context.result_queue.append(self)
+
+    def setKey(self, key):
+        self.key = key.name
+        self.bucket = key.bucket.name
+
+    def setError(self, message='Unhandled Exception', show_traceback=False):
+        """ Sets an error state in the result, and returns False... example usage:
+
+        return self.result.setError('Something happened', traceback=True)
+        """
+        self.error = dict()
+        self.error['msg'] = message
+        if show_traceback:
+            self.error['traceback'] = traceback.format_exc()
+        return False
+
+    @classmethod
+    def repr_yaml(c, dumper, self):
+        data = dict()
+        for x in ('type', 'bucket', 'key', 'chunks'):
+            data[x] = self.__dict__[x]
+
+        # reader => r, writer => w
+        data['type'] = data['type'][0]
+
+        # the error key must be present ONLY on failure.
+        assert not (self.success and self.error)
+        if self.success:
+            assert self.error == None
+        else:
+            assert self.error != None
+            data['error'] = self.error
+
+        data['start'] = self.request_start
+        if self.request_finish:
+            data['duration'] = 1000000000 * (self.request_finish - self.request_start)
+
+        return dumper.represent_dict(data)
+
+# And a representer for dumping a TransferGreenletResult as a YAML dict()
+yaml.add_representer(TransferGreenletResult, TransferGreenletResult.repr_yaml)
+
+
+class ResultsLogger(gevent.Greenlet):
+    """ A quick little greenlet to always run and dump results. """
""" + def __init__(self): + gevent.Greenlet.__init__(self) + self.outfile = context.real_stdout + + def _run(self): + while True: + try: + self._doit() + except: + print "An exception was encountered while dumping the results... this shouldn't happen!" + traceback.print_exc() + time.sleep(0.1) + + def _doit(self): + while context.result_queue: + result = context.result_queue.popleft() + yrep = yaml.dump(result) + self.outfile.write(yrep + "---\n") + diff --git a/s3tests/realistic.py b/s3tests/realistic.py index e5d234b..0755516 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -1,18 +1,25 @@ +import bunch import hashlib import random import string import struct +import time class RandomContentFile(object): def __init__(self, size, seed): + self.size = size self.seed = seed self.random = random.Random(self.seed) - self.offset = 0 - self.buffer = '' - self.size = size - self.hash = hashlib.md5() - self.digest_size = self.hash.digest_size - self.digest = None + + # Boto likes to seek once more after it's done reading, so we need to save the last chunks/seek value. + self.last_chunks = self.chunks = None + self.last_seek = self.start_time = None + + # Let seek initialize the rest of it, rather than dup code + self.seek(0) + + def _mark_chunk(self): + self.chunks.append([self.offset, (time.time() - self.last_seek) * 1000000000]) def seek(self, offset): assert offset == 0 @@ -20,6 +27,17 @@ class RandomContentFile(object): self.offset = offset self.buffer = '' + self.hash = hashlib.md5() + self.digest_size = self.hash.digest_size + self.digest = None + + # Save the last seek time as our start time, and the last chunks + self.start_time = self.last_seek + self.last_chunks = self.chunks + # Before emptying. + self.last_seek = time.time() + self.chunks = [] + def tell(self): return self.offset @@ -58,6 +76,8 @@ class RandomContentFile(object): data = self.digest[:digest_count] r.append(data) + self._mark_chunk() + return ''.join(r) class FileVerifier(object): @@ -65,6 +85,11 @@ class FileVerifier(object): self.size = 0 self.hash = hashlib.md5() self.buf = '' + self.created_at = time.time() + self.chunks = [] + + def _mark_chunk(self): + self.chunks.append([self.size, (time.time() - self.created_at) * 1000000000]) def write(self, data): self.size += len(data) @@ -72,6 +97,7 @@ class FileVerifier(object): digsz = -1*self.hash.digest_size new_data, self.buf = self.buf[0:digsz], self.buf[digsz:] self.hash.update(new_data) + self._mark_chunk() def valid(self): """ @@ -123,7 +149,54 @@ def names(mean, stddev, charset=None, seed=None): while True: while True: length = int(rand.normalvariate(mean, stddev)) - if length >= 0: + if length > 0: break name = ''.join(rand.choice(charset) for _ in xrange(length)) yield name + +def files_varied(groups, unlimited=False): + """ Yields a weighted-random selection of file-like objects. """ + # Quick data type sanity. + assert groups and isinstance(groups, (list, tuple)) + + total_num = 0 + file_sets = [] + rand = random.Random(time.time()) + + # Build the sets for our yield + for num, size, stddev in groups: + assert num and size + + file_sets.append(bunch.Bunch( + num = num, + size = size, + stddev = stddev, + files = files(size, stddev, time.time()) + )) + total_num += num + + while True: + if not total_num: + raise StopIteration + + num = rand.randrange(total_num) + + ok = 0 + for file_set in file_sets: + if num > file_set.num: + num -= file_set.num + continue + + if not unlimited: + total_num -= 1 + file_set.num -= 1 + + # None left in this set! 
+                if file_set.num == 0:
+                    file_sets.remove(file_set)
+
+            ok = 1
+            yield next(file_set.files)
+
+        if not ok:
+            raise RuntimeError, "Couldn't find a match."
diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py
new file mode 100644
index 0000000..69b35a1
--- /dev/null
+++ b/s3tests/roundtrip.py
@@ -0,0 +1,99 @@
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import bunch
+import collections
+import gevent
+import gevent.pool
+import itertools
+import random
+import realistic
+import time
+import traceback
+import sys
+
+import common
+from common import context, config
+from common.greenlets import ReaderGreenlet, WriterGreenlet
+from common.results import ResultsLogger
+
+# Set up the common context to use our information. Wee.
+context.update(bunch.Bunch(
+    # Set to False when it's time to exit main loop.
+    running = True,
+
+    # The pools our tasks run in.
+    greenlet_pools = bunch.Bunch(
+        writer=None,
+        reader=None,
+    ),
+
+    # The greenlet that keeps logs going.
+    results_logger = None,
+))
+
+
+def setup():
+    config_rt = config.roundtrip
+
+    context.bucket = common.get_new_bucket()
+    print "Using bucket: {name}".format(name=context.bucket.name)
+
+    context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
+    context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
+
+    context.key_iter = itertools.count(1)
+    context.files_iter = realistic.files_varied(config_rt.create_objects)
+
+
+def _main():
+    def _stop_running():
+        """ Since we can't do assignment in a lambda, we have this little stub """
+        context.running = False
+
+    grace_period = config.roundtrip.grace_wait
+
+    print "Launching/Scheduling essential services..."
+    gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
+    context.results_logger = ResultsLogger.spawn()
+
+    print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
+    writers_start_time = time.time()
+    while time.time() - writers_start_time < grace_period:
+        common.fill_pools(context.greenlet_pools.writer)
+        time.sleep(0.1)
+
+    # Main work loop.
+    print "Starting main work loop..."
+    while context.running:
+        common.fill_pools(*context.greenlet_pools.values())
+        time.sleep(0.1)
+
+    print "We've hit duration. Time to stop!"
+    print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
+    time.sleep(grace_period)
+
+    print "Killing off any remaining jobs."
+    context.greenlet_pools.reader.kill()
+    context.greenlet_pools.writer.kill()
+
+    print "Waiting 10 seconds for them to finish dying off and collections to complete!"
+    time.sleep(10)
+
+    print "Killing essential services..."
+    context.results_logger.kill()
+
+    print "Done!"
+
+
+def main():
+    sys.stdout = sys.stderr # Original stream already saved by common
+    common.setup()
+    setup()
+
+    # Normal
+    try:
+        _main()
+    except:
+        traceback.print_exc()
+        common.teardown()
diff --git a/setup.py b/setup.py
index de01ab6..686fadd 100644
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,7 @@ setup(
         'console_scripts': [
            's3tests-generate-objects = s3tests.generate_objects:main',
            's3tests-test-readwrite = s3tests.rand_readwrite:main',
+           's3tests-test-roundtrip = s3tests.roundtrip:main',
            ],
        },
 
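
Configuration note: the new s3tests-test-roundtrip entry point reads its parameters from a "roundtrip" section of whatever configuration common.setup() already loads; this patch does not touch that loading code, so the layout below is an assumption (presumably YAML, since the common module imports yaml). The key names (duration, grace_wait, pool_sizes.reader, pool_sizes.writer, create_objects) come straight from roundtrip.py, and realistic.files_varied() unpacks each create_objects entry as a (num, size, stddev) group, but the structure and values shown are illustrative only:

    roundtrip:
      duration: 60          # seconds the main work loop keeps the pools full
      grace_wait: 5         # head start for the writers, and wind-down time at the end
      pool_sizes:
        writer: 10          # concurrent WriterGreenlet instances
        reader: 10          # concurrent ReaderGreenlet instances
      create_objects:       # (num, size, stddev) groups fed to files_varied()
        - [100, 4096, 1024]
        - [10, 1048576, 262144]

Results go to the original stdout via ResultsLogger (main() points sys.stdout at stderr so progress chatter stays separate): one YAML document per finished transfer, terminated by "---", with type shortened to r/w, duration reported in nanoseconds, and an error key present only on failure.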