diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py
new file mode 100644
index 0000000..3191097
--- /dev/null
+++ b/s3tests/common/greenlets.py
@@ -0,0 +1,116 @@
+import bunch
+import collections
+import gevent
+import random
+import time
+import traceback
+
+from ..common import context, get_next_key
+from ..common.results import TransferGreenletResult
+from ..realistic import FileVerifier
+
+# Make sure context has somewhere to store what we need
+context.update(bunch.Bunch(
+    needs_first_read = collections.deque(),
+    all_keys = [],
+    files_iter = None,
+))
+
+
+class SafeTransferGreenlet(gevent.Greenlet):
+    def __init__(self, timeout=120):
+        gevent.Greenlet.__init__(self)
+        self.timeout = timeout
+        self.result = None
+        self.key = None  # Remember the key in case we need to retry (see below)
+
+    def _run(self):
+        """ A runner loop. gevent has a quirk where, if two greenlets try to
+        perform the same operation at the same time (a read, for example), it
+        raises an AssertionError rather than just switching contexts again.
+
+        To work around this, the real work lives in _call_doit, which detects
+        that quirk, and we retry for as long as _call_doit requests it, as
+        indicated by _call_doit returning True.
+        """
+        while self._call_doit():
+            time.sleep(0.1)
+
+    def _call_doit(self):
+        """ Return True if we need to retry, False otherwise. """
+        result = self.result = TransferGreenletResult(self.type)
+        result.start_time = time.time()
+
+        try:
+            with gevent.Timeout(self.timeout, False):
+                result.success = self._doit()
+        except gevent.GreenletExit:
+            # We don't want to retry, as it's time to exit, but we also
+            # don't want to count this as a failure.
+            return False
+        except AssertionError as e:
+            # If we've hit the gevent quirk described above, simply retry.
+            if e.args[0].startswith('This event is already used by another greenlet'):
+                return True  # retry
+            # Different assertion error, so fail normally.
+            result.comment = traceback.format_exc()
+        except Exception:
+            result.comment = traceback.format_exc()
+
+        result.finish_time = time.time()
+        result.duration = result.finish_time - result.start_time
+        result.queue_finished()
+        return False  # don't retry
+
+
+class ReaderGreenlet(SafeTransferGreenlet):
+    type = 'reader'
+
+    def _doit(self):
+        if self.key:
+            key = self.key
+        elif context.needs_first_read:
+            key = context.needs_first_read.popleft()
+        elif context.all_keys:
+            key = random.choice(context.all_keys)
+        else:
+            time.sleep(1)
+            self.result.comment = 'No available keys to test with reader. Try again later.'
+            return False
+
+        self.key = key
+        fp = FileVerifier()
+        self.result.name = key.name
+
+        request_start = time.time()
+        key.get_contents_to_file(fp)
+        self.result.size = fp.size
+        self.result.latency = fp.first_write - request_start
+
+        if not fp.valid():
+            self.result.comment = 'Failed to validate key {name!s}'.format(name=key.name)
+            return False
+
+        return True
+
+
+class WriterGreenlet(SafeTransferGreenlet):
+    type = 'writer'
+
+    def _doit(self):
+        if self.key:
+            key = self.key
+        else:
+            key = self.key = get_next_key(context.bucket)
+
+        fp = next(context.files_iter)
+        self.result.name = key.name
+        self.result.size = fp.size
+
+        key.set_contents_from_file(fp)
+        self.result.latency = time.time() - fp.last_read
+
+        # When done, queue the key for a first read and add it to the pool
+        # of keys readers may pick at random.
+        context.needs_first_read.append(key)
+        context.all_keys.append(key)
+
+        return True
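For orientation, here is a minimal sketch of driving one writer and one reader by hand, outside the pools that roundtrip.py (below) manages. The import paths are inferred from the file locations in this patch, and the context setup mirrors what roundtrip.setup() does; treat it as an illustration rather than a supported entry point.

    import gevent.monkey
    gevent.monkey.patch_all()

    import itertools

    from s3tests import common, realistic
    from s3tests.common import context
    from s3tests.common.greenlets import ReaderGreenlet, WriterGreenlet

    common.setup()                        # load config and connect, as roundtrip.main() does
    context.bucket = common.get_new_bucket()
    context.key_iter = itertools.count(1)
    context.files_iter = realistic.files_varied(common.config.roundtrip.create_objects)

    writer = WriterGreenlet(timeout=60)   # uploads one generated file, then queues its key
    writer.start()
    writer.join()

    reader = ReaderGreenlet(timeout=60)   # pops that key from needs_first_read and verifies it
    reader.start()
    reader.join()

    print writer.result
    print reader.result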
diff --git a/s3tests/common/results.py b/s3tests/common/results.py
new file mode 100644
index 0000000..684bfc5
--- /dev/null
+++ b/s3tests/common/results.py
@@ -0,0 +1,69 @@
+import bunch
+import collections
+import gevent
+import time
+import traceback
+import yaml
+
+from ..common import context
+
+# Make sure context has somewhere to store what we need
+context.update(bunch.Bunch(
+    result_queue = collections.deque(),
+))
+
+
+class TransferGreenletResult(object):
+    """ Generic container for the outcome of a single transfer. """
+    def __init__(self, type):
+        # About the key
+        self.name = None
+        self.size = None
+
+        # About the job
+        self.type = type
+        self.success = False
+        self.comment = None
+        self.start_time = None
+        self.finish_time = None
+
+        self.latency = None
+        self.duration = None
+
+    def __repr__(self):
+        # Format a copy, so repr() doesn't clobber the real success flag.
+        d = dict(self.__dict__)
+        d['success'] = 'ok' if d['success'] else 'FAILED'
+        return '{type} {success}: {name} ({size} bytes) in {duration}s'.format(**d)
+
+    def queue_finished(self):
+        context.result_queue.append(self)
+
+
+# A representer for dumping a TransferGreenletResult as a YAML dict
+yaml.add_representer(TransferGreenletResult, lambda dumper, data: dumper.represent_dict(data.__dict__))
+
+
+class ResultsLogger(gevent.Greenlet):
+    """ A small greenlet that runs for the life of the test and dumps results. """
+    def __init__(self):
+        gevent.Greenlet.__init__(self)
+        self.outfile = None
+
+    def _run(self):
+        while True:
+            try:
+                self._doit()
+            except:
+                print "An exception was encountered while dumping the results... this shouldn't happen!"
+                traceback.print_exc()
+            time.sleep(0.1)
+
+    def _doit(self):
+        while context.result_queue:
+            result = context.result_queue.popleft()
+            yrep = yaml.dump(result)
+            if self.outfile:
+                self.outfile.write(yrep)
+            print yrep, "\n"
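To make the logging path concrete, here is a rough sketch of one result being built and dumped by hand; every field value below is invented for illustration.

    import time
    import yaml

    from s3tests.common.results import TransferGreenletResult

    result = TransferGreenletResult('writer')
    result.name = 'mykey-17'              # hypothetical key name
    result.size = 4096
    result.success = True
    result.start_time = time.time()
    result.finish_time = result.start_time + 0.25
    result.duration = 0.25
    result.latency = 0.02

    # yaml.dump() picks up the representer registered above and serializes
    # the instance's __dict__ as a plain YAML mapping.
    print yaml.dump(result)

This is the same formatting the ResultsLogger greenlet applies to everything the transfer greenlets push onto context.result_queue.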
diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py
new file mode 100644
index 0000000..3642237
--- /dev/null
+++ b/s3tests/roundtrip.py
@@ -0,0 +1,97 @@
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import bunch
+import gevent
+import gevent.pool
+import itertools
+import realistic
+import time
+import traceback
+
+import common
+from common import context, config
+from common.greenlets import ReaderGreenlet, WriterGreenlet
+from common.results import ResultsLogger
+
+# Set up the common context to use our information.
+context.update(bunch.Bunch(
+    # Set to False when it's time to exit the main loop.
+    running = True,
+
+    # The pools our tasks run in.
+    greenlet_pools = bunch.Bunch(
+        writer=None,
+        reader=None,
+    ),
+
+    # The greenlet that keeps logs going.
+    results_logger = None,
+))
+
+
+def setup():
+    config_rt = config.roundtrip
+
+    context.bucket = common.get_new_bucket()
+    print "Using bucket: {name}".format(name=context.bucket.name)
+
+    context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
+    context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
+
+    context.key_iter = itertools.count(1)
+    context.files_iter = realistic.files_varied(config_rt.create_objects)
+
+
+def _main():
+    def _stop_running():
+        """ Since we can't do assignment in a lambda, we have this little stub """
+        context.running = False
+
+    grace_period = config.roundtrip.grace_wait
+
+    print "Launching/scheduling essential services..."
+    gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
+    context.results_logger = ResultsLogger.spawn()
+
+    print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
+    writers_start_time = time.time()
+    while time.time() - writers_start_time < grace_period:
+        common.fill_pools(context.greenlet_pools.writer)
+        time.sleep(0.1)
+
+    # Main work loop.
+    print "Starting main work loop..."
+    while context.running:
+        common.fill_pools(context.greenlet_pools.writer, context.greenlet_pools.reader)
+        time.sleep(0.1)
+
+    print "We've hit the duration limit. Time to stop!"
+    print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
+    time.sleep(grace_period)
+
+    print "Killing off any remaining jobs."
+    context.greenlet_pools.reader.kill()
+    context.greenlet_pools.writer.kill()
+
+    print "Waiting 10 seconds for them to finish dying off and collections to complete!"
+    time.sleep(10)
+
+    print "Killing essential services..."
+    context.results_logger.kill()
+
+    print "Done!"
+
+
+def main():
+    common.setup()
+    setup()
+
+    try:
+        _main()
+    except:
+        traceback.print_exc()
+    common.teardown()
diff --git a/setup.py b/setup.py
index de01ab6..686fadd 100644
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,7 @@ setup(
         'console_scripts': [
             's3tests-generate-objects = s3tests.generate_objects:main',
             's3tests-test-readwrite = s3tests.rand_readwrite:main',
+            's3tests-test-roundtrip = s3tests.roundtrip:main',
         ],
     },
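For reference, the new roundtrip tool reads its settings from config.roundtrip. The key names below are inferred from the attribute accesses in setup() and _main(); the values are placeholders, and the exact shape of create_objects (consumed by realistic.files_varied()) is not shown in this patch.

    import bunch

    # Hypothetical config.roundtrip contents, inferred from roundtrip.py above.
    config.roundtrip = bunch.Bunch(
        duration = 60,          # seconds the main read/write loop runs
        grace_wait = 5,         # writers' head start, also the shutdown grace period
        pool_sizes = bunch.Bunch(
            reader = 10,        # max concurrent ReaderGreenlets
            writer = 5,         # max concurrent WriterGreenlets
        ),
        create_objects = None,  # spec handed to realistic.files_varied(); shape not shown here
    )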