mirror of
https://github.com/ceph/s3-tests.git
synced 2025-01-12 05:21:09 +00:00
95 lines
2.6 KiB
Python
95 lines
2.6 KiB
Python
import gevent.monkey
|
|
gevent.monkey.patch_all()
|
|
|
|
import bunch
|
|
import gevent
|
|
import gevent.pool
|
|
import realistic
|
|
import time
|
|
import traceback
|
|
import sys
|
|
|
|
import common
|
|
from common import context, config
|
|
from common.greenlets import ReaderGreenlet, WriterGreenlet
|
|
from common.results import ResultsLogger
|
|
|
|
# Set up the common context to use our information. Wee.
|
|
context.update(bunch.Bunch(
|
|
# Set to False when it's time to exit main loop.
|
|
running=True,
|
|
|
|
# The pools our tasks run in.
|
|
greenlet_pools=bunch.Bunch(
|
|
writer=None,
|
|
reader=None,
|
|
),
|
|
|
|
# The greenlet that keeps logs going.
|
|
results_logger=None,
|
|
))
|
|
|
|
|
|
def setup():
|
|
config_rt = config.roundtrip
|
|
|
|
context.bucket = common.get_new_bucket()
|
|
print "Using bucket: {name}".format(name=context.bucket.name)
|
|
|
|
context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
|
|
context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
|
|
|
|
context.files_iter = realistic.files_varied(config_rt.create_objects)
|
|
|
|
|
|
def _main():
|
|
def _stop_running():
|
|
""" Since we can't do assignment in a lambda, we have this little stub """
|
|
context.running = False
|
|
|
|
grace_period = config.roundtrip.grace_wait
|
|
|
|
print "Launching/Scheduling essential services..."
|
|
gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
|
|
context.results_logger = ResultsLogger.spawn()
|
|
|
|
print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
|
|
writers_start_time = time.time()
|
|
while time.time() - writers_start_time < grace_period:
|
|
common.fill_pools(context.greenlet_pools.writer)
|
|
time.sleep(0.1)
|
|
|
|
# Main work loop.
|
|
print "Starting main work loop..."
|
|
while context.running:
|
|
common.fill_pools(*context.greenlet_pools.values())
|
|
time.sleep(0.1)
|
|
|
|
print "We've hit duration. Time to stop!"
|
|
print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
|
|
time.sleep(grace_period)
|
|
|
|
print "Killing off any remaining jobs."
|
|
context.greenlet_pools.reader.kill()
|
|
context.greenlet_pools.writer.kill()
|
|
|
|
print "Waiting 10 seconds for them to finish dying off and collections to complete!"
|
|
time.sleep(10)
|
|
|
|
print "Killing essential services..."
|
|
context.results_logger.kill()
|
|
|
|
print "Done!"
|
|
|
|
|
|
def main():
|
|
sys.stdout = sys.stderr # Original steam already saved by common
|
|
common.setup()
|
|
setup()
|
|
|
|
# Normal
|
|
try:
|
|
_main()
|
|
except:
|
|
traceback.print_exc()
|
|
common.teardown()
|