s3-tests/s3tests/roundtrip.py
2011-07-19 09:09:51 -07:00

95 lines
2.6 KiB
Python

import gevent.monkey
gevent.monkey.patch_all()
import bunch
import gevent
import gevent.pool
import realistic
import time
import traceback
import sys
import common
from common import context, config
from common.greenlets import ReaderGreenlet, WriterGreenlet
from common.results import ResultsLogger
# Set up the common context to use our information. Wee.
context.update(bunch.Bunch(
# Set to False when it's time to exit main loop.
running=True,
# The pools our tasks run in.
greenlet_pools=bunch.Bunch(
writer=None,
reader=None,
),
# The greenlet that keeps logs going.
results_logger=None,
))
def setup():
config_rt = config.roundtrip
context.bucket = common.get_new_bucket()
print "Using bucket: {name}".format(name=context.bucket.name)
context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
context.files_iter = realistic.files_varied(config_rt.create_objects)
def _main():
def _stop_running():
""" Since we can't do assignment in a lambda, we have this little stub """
context.running = False
grace_period = config.roundtrip.grace_wait
print "Launching/Scheduling essential services..."
gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
context.results_logger = ResultsLogger.spawn()
print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
writers_start_time = time.time()
while time.time() - writers_start_time < grace_period:
common.fill_pools(context.greenlet_pools.writer)
time.sleep(0.1)
# Main work loop.
print "Starting main work loop..."
while context.running:
common.fill_pools(*context.greenlet_pools.values())
time.sleep(0.1)
print "We've hit duration. Time to stop!"
print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
time.sleep(grace_period)
print "Killing off any remaining jobs."
context.greenlet_pools.reader.kill()
context.greenlet_pools.writer.kill()
print "Waiting 10 seconds for them to finish dying off and collections to complete!"
time.sleep(10)
print "Killing essential services..."
context.results_logger.kill()
print "Done!"
def main():
sys.stdout = sys.stderr # Original steam already saved by common
common.setup()
setup()
# Normal
try:
_main()
except:
traceback.print_exc()
common.teardown()