diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py index 78b4f14..e1a0f1c 100644 --- a/s3tests/roundtrip.py +++ b/s3tests/roundtrip.py @@ -1,95 +1,205 @@ -import gevent.monkey -gevent.monkey.patch_all() - -import bunch import gevent import gevent.pool -import realistic +import gevent.queue +import gevent.monkey; gevent.monkey.patch_all() +import itertools +import optparse +import os +import sys import time import traceback -import sys +import random +import yaml +import realistic import common -from common import context, config -from common.greenlets import ReaderGreenlet, WriterGreenlet -from common.results import ResultsLogger -# Set up the common context to use our information. Wee. -context.update(bunch.Bunch( - # Set to False when it's time to exit main loop. - running=True, +NANOSECOND = int(1e9) - # The pools our tasks run in. - greenlet_pools=bunch.Bunch( - writer=None, - reader=None, - ), +def writer(bucket, objname, fp, queue): + key = bucket.new_key(objname) - # The greenlet that keeps logs going. - results_logger=None, -)) + result = dict( + type='w', + bucket=bucket.name, + key=key.name, + ) + + start = time.time() + try: + key.set_contents_from_file(fp) + except gevent.GreenletExit: + raise + except Exception as e: + # stop timer ASAP, even on errors + end = time.time() + result.update( + error=dict( + msg=str(e), + traceback=traceback.format_exc(), + ), + ) + # certain kinds of programmer errors make this a busy + # loop; let parent greenlet get some time too + time.sleep(0) + else: + end = time.time() + + elapsed = end - start + result.update( + start=start, + duration=int(round(elapsed * NANOSECOND)), + chunks=fp.last_chunks, + ) + queue.put(result) -def setup(): - config_rt = config.roundtrip +def reader(bucket, objname, queue): + key = bucket.new_key(objname) - context.bucket = common.get_new_bucket() - print "Using bucket: {name}".format(name=context.bucket.name) + fp = realistic.FileVerifier() + result = dict( + type='r', + bucket=bucket.name, + key=key.name, + ) - context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet) - context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet) + start = time.time() + try: + key.get_contents_to_file(fp) + except gevent.GreenletExit: + raise + except Exception as e: + # stop timer ASAP, even on errors + end = time.time() + result.update( + error=dict( + msg=str(e), + traceback=traceback.format_exc(), + ), + ) + # certain kinds of programmer errors make this a busy + # loop; let parent greenlet get some time too + time.sleep(0) + else: + end = time.time() - context.files_iter = realistic.files_varied(config_rt.create_objects) + if not fp.valid(): + result.update( + error=dict( + msg='md5sum check failed', + ), + ) + elapsed = end - start + result.update( + start=start, + duration=int(round(elapsed * NANOSECOND)), + chunks=fp.chunks, + ) + queue.put(result) -def _main(): - def _stop_running(): - """ Since we can't do assignment in a lambda, we have this little stub """ - context.running = False - - grace_period = config.roundtrip.grace_wait - - print "Launching/Scheduling essential services..." - gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running) - context.results_logger = ResultsLogger.spawn() - - print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period) - writers_start_time = time.time() - while time.time() - writers_start_time < grace_period: - common.fill_pools(context.greenlet_pools.writer) - time.sleep(0.1) - - # Main work loop. - print "Starting main work loop..." - while context.running: - common.fill_pools(*context.greenlet_pools.values()) - time.sleep(0.1) - - print "We've hit duration. Time to stop!" - print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period) - time.sleep(grace_period) - - print "Killing off any remaining jobs." - context.greenlet_pools.reader.kill() - context.greenlet_pools.writer.kill() - - print "Waiting 10 seconds for them to finish dying off and collections to complete!" - time.sleep(10) - - print "Killing essential services..." - context.results_logger.kill() - - print "Done!" +def parse_options(): + parser = optparse.OptionParser( + usage='%prog [OPTS]