Rewrite roundtrip test based on the readwrite one.

They probably should share more code in the future, but it's only
200 lines of python, so I'm not all that worried.
Tommi Virtanen 2011-07-26 16:22:40 -07:00
parent f3718da14c
commit 700dd3f880

@@ -1,95 +1,205 @@
-import gevent.monkey
-gevent.monkey.patch_all()
-
-import bunch
-import gevent
-import gevent.pool
-import realistic
-import time
-import traceback
-import sys
-
-import common
-from common import context, config
-from common.greenlets import ReaderGreenlet, WriterGreenlet
-from common.results import ResultsLogger
-
-# Set up the common context to use our information. Wee.
-context.update(bunch.Bunch(
-    # Set to False when it's time to exit main loop.
-    running=True,
-
-    # The pools our tasks run in.
-    greenlet_pools=bunch.Bunch(
-        writer=None,
-        reader=None,
-        ),
-
-    # The greenlet that keeps logs going.
-    results_logger=None,
-    ))
-
-
-def setup():
-    config_rt = config.roundtrip
-
-    context.bucket = common.get_new_bucket()
-    print "Using bucket: {name}".format(name=context.bucket.name)
-
-    context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
-    context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
-
-    context.files_iter = realistic.files_varied(config_rt.create_objects)
-
-
-def _main():
-    def _stop_running():
-        """ Since we can't do assignment in a lambda, we have this little stub """
-        context.running = False
-
-    grace_period = config.roundtrip.grace_wait
-
-    print "Launching/Scheduling essential services..."
-    gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
-    context.results_logger = ResultsLogger.spawn()
-
-    print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
-    writers_start_time = time.time()
-    while time.time() - writers_start_time < grace_period:
-        common.fill_pools(context.greenlet_pools.writer)
-        time.sleep(0.1)
-
-    # Main work loop.
-    print "Starting main work loop..."
-    while context.running:
-        common.fill_pools(*context.greenlet_pools.values())
-        time.sleep(0.1)
-
-    print "We've hit duration. Time to stop!"
-    print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
-    time.sleep(grace_period)
-
-    print "Killing off any remaining jobs."
-    context.greenlet_pools.reader.kill()
-    context.greenlet_pools.writer.kill()
-
-    print "Waiting 10 seconds for them to finish dying off and collections to complete!"
-    time.sleep(10)
-
-    print "Killing essential services..."
-    context.results_logger.kill()
-    print "Done!"
-
-
-def main():
-    sys.stdout = sys.stderr # Original steam already saved by common
-    common.setup()
-    setup()
-
-    # Normal
-    try:
-        _main()
-    except:
-        traceback.print_exc()
-    common.teardown()
+import gevent
+import gevent.pool
+import gevent.queue
+import gevent.monkey; gevent.monkey.patch_all()
+import itertools
+import optparse
+import os
+import sys
+import time
+import traceback
+import random
+import yaml
+
+import realistic
+import common
+
+NANOSECOND = int(1e9)
+
+
+def writer(bucket, objname, fp, queue):
+    key = bucket.new_key(objname)
+
+    result = dict(
+        type='w',
+        bucket=bucket.name,
+        key=key.name,
+        )
+
+    start = time.time()
+    try:
+        key.set_contents_from_file(fp)
+    except gevent.GreenletExit:
+        raise
+    except Exception as e:
+        # stop timer ASAP, even on errors
+        end = time.time()
+        result.update(
+            error=dict(
+                msg=str(e),
+                traceback=traceback.format_exc(),
+                ),
+            )
+        # certain kinds of programmer errors make this a busy
+        # loop; let parent greenlet get some time too
+        time.sleep(0)
+    else:
+        end = time.time()
+
+    elapsed = end - start
+    result.update(
+        start=start,
+        duration=int(round(elapsed * NANOSECOND)),
+        chunks=fp.last_chunks,
+        )
+    queue.put(result)
+
+
+def reader(bucket, objname, queue):
+    key = bucket.new_key(objname)
+
+    fp = realistic.FileVerifier()
+    result = dict(
+        type='r',
+        bucket=bucket.name,
+        key=key.name,
+        )
+
+    start = time.time()
+    try:
+        key.get_contents_to_file(fp)
+    except gevent.GreenletExit:
+        raise
+    except Exception as e:
+        # stop timer ASAP, even on errors
+        end = time.time()
+        result.update(
+            error=dict(
+                msg=str(e),
+                traceback=traceback.format_exc(),
+                ),
+            )
+        # certain kinds of programmer errors make this a busy
+        # loop; let parent greenlet get some time too
+        time.sleep(0)
+    else:
+        end = time.time()
+
+        if not fp.valid():
+            result.update(
+                error=dict(
+                    msg='md5sum check failed',
+                    ),
+                )
+
+    elapsed = end - start
+    result.update(
+        start=start,
+        duration=int(round(elapsed * NANOSECOND)),
+        chunks=fp.chunks,
+        )
+    queue.put(result)
+
+
+def parse_options():
+    parser = optparse.OptionParser(
+        usage='%prog [OPTS] <CONFIG_YAML',
+        )
+    parser.add_option("--no-cleanup", dest="cleanup", action="store_false",
+        help="skip cleaning up all created buckets", default=True)
+
+    return parser.parse_args()
+
+
+def main():
+    # parse options
+    (options, args) = parse_options()
+
+    if os.isatty(sys.stdin.fileno()):
+        raise RuntimeError('Need configuration in stdin.')
+    config = common.read_config(sys.stdin)
+    conn = common.connect(config.s3)
+    bucket = None
+
+    try:
+        # setup
+        real_stdout = sys.stdout
+        sys.stdout = sys.stderr
+
+        # verify all required config items are present
+        if 'roundtrip' not in config:
+            raise RuntimeError('roundtrip section not found in config')
+        for item in ['readers', 'writers', 'duration', 'files', 'bucket']:
+            if item not in config.roundtrip:
+                raise RuntimeError("Missing roundtrip config item: {item}".format(item=item))
+        for item in ['num', 'size', 'stddev']:
+            if item not in config.roundtrip.files:
+                raise RuntimeError("Missing roundtrip config item: files.{item}".format(item=item))
+
+        seeds = dict(config.roundtrip.get('random_seed', {}))
+        seeds.setdefault('main', random.randrange(2**32))
+
+        rand = random.Random(seeds['main'])
+
+        for name in ['names', 'contents', 'writer', 'reader']:
+            seeds.setdefault(name, rand.randrange(2**32))
+
+        print 'Using random seeds: {seeds}'.format(seeds=seeds)
+
+        # setup bucket and other objects
+        bucket_name = common.choose_bucket_prefix(config.roundtrip.bucket, max_len=30)
+        bucket = conn.create_bucket(bucket_name)
+        print "Created bucket: {name}".format(name=bucket.name)
+        objnames = realistic.names(
+            mean=15,
+            stddev=4,
+            seed=seeds['names'],
+            )
+        objnames = itertools.islice(objnames, config.roundtrip.files.num)
+        objnames = list(objnames)
+        files = realistic.files(
+            mean=1024 * config.roundtrip.files.size,
+            stddev=1024 * config.roundtrip.files.stddev,
+            seed=seeds['contents'],
+            )
+        q = gevent.queue.Queue()
+
+        logger_g = gevent.spawn_link_exception(yaml.safe_dump_all, q, stream=real_stdout)
+
+        print "Writing {num} objects with {w} workers...".format(
+            num=config.roundtrip.files.num,
+            w=config.roundtrip.writers,
+            )
+        pool = gevent.pool.Pool(size=config.roundtrip.writers)
+        for objname in objnames:
+            fp = next(files)
+            pool.spawn_link_exception(
+                writer,
+                bucket=bucket,
+                objname=objname,
+                fp=fp,
+                queue=q,
+                )
+        pool.join()
+
+        print "Reading {num} objects with {w} workers...".format(
+            num=config.roundtrip.files.num,
+            w=config.roundtrip.readers,
+            )
+        # avoid accessing them in the same order as the writing
+        rand.shuffle(objnames)
+        pool = gevent.pool.Pool(size=config.roundtrip.readers)
+        for objname in objnames:
+            pool.spawn_link_exception(
+                reader,
+                bucket=bucket,
+                objname=objname,
+                queue=q,
+                )
+        pool.join()
+
+        q.put(StopIteration)
+
+        logger_g.get()
+
+    finally:
+        # cleanup
+        if options.cleanup:
+            if bucket is not None:
+                common.nuke_bucket(bucket)
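
For reference, the rewritten test takes its configuration as YAML on stdin (hence the `<CONFIG_YAML` usage string and the isatty check). A minimal config might look like the sketch below. Only the roundtrip keys come from the validation code above; the s3 section is whatever common.connect expects, so its key names, like every value here, are placeholder assumptions:

s3:
  # consumed by common.connect(); these key names are assumptions,
  # not taken from this commit
  host: s3.example.com
  access_key: AKIAEXAMPLEKEY
  secret_key: examplesecretkey

roundtrip:
  bucket: roundtrip          # prefix passed to common.choose_bucket_prefix
  readers: 4                 # size of the reader greenlet pool
  writers: 2                 # size of the writer greenlet pool
  duration: 30               # required by the validation above
  files:
    num: 10                  # objects to write, then read back
    size: 1024               # mean size in KiB (the code multiplies by 1024)
    stddev: 512              # size spread in KiB
  random_seed:               # optional; any seed left unset is derived from 'main'
    main: 12345

Progress messages go to stderr (note the sys.stdout = sys.stderr swap), so the original stdout carries nothing but the per-operation results.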
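
One idiom worth calling out: yaml.safe_dump_all simply iterates over whatever it is given, and gevent queues are iterable, with iteration ending when the StopIteration class itself is dequeued; that is what the final q.put(StopIteration) is for. A standalone toy demonstration of the mechanism (invented example, not from this commit):

import gevent
import gevent.queue

q = gevent.queue.Queue()

def consumer():
    # iteration ends when the StopIteration sentinel is dequeued
    for item in q:
        print 'got %r' % (item,)

g = gevent.spawn(consumer)
for i in range(3):
    q.put(i)
q.put(StopIteration)
g.join()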
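
Because the logger greenlet streams each result dict as its own YAML document, downstream tooling can replay a saved run with yaml.safe_load_all. A minimal consumer sketch; the field names (type, bucket, key, error, duration) come from the writer/reader result dicts above, while the summary format itself is invented:

import sys
import yaml

NANOSECOND = int(1e9)

def summarize(stream):
    # one YAML document per completed operation
    for result in yaml.safe_load_all(stream):
        name = '%s %s/%s' % (result['type'], result['bucket'], result['key'])
        if 'error' in result:
            print '%s FAILED: %s' % (name, result['error']['msg'])
        else:
            print '%s took %.3fs' % (name, result['duration'] / float(NANOSECOND))

if __name__ == '__main__':
    summarize(sys.stdin)

Piping the test's stdout into a script like this prints one line per object written and read; the exact invocation depends on how main() is wired up as an entry point.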