Make readwrite test not use unittest setup/teardown logic.

This commit is contained in:
Tommi Virtanen 2011-07-26 14:13:11 -07:00
parent f58641d0bf
commit 513d5fabf2

View file

@ -6,6 +6,7 @@ import gevent.queue
import gevent.monkey; gevent.monkey.patch_all() import gevent.monkey; gevent.monkey.patch_all()
import itertools import itertools
import optparse import optparse
import os
import sys import sys
import time import time
import traceback import traceback
@ -107,7 +108,9 @@ def writer(bucket, worker_id, file_names, files, queue):
queue.put(result) queue.put(result)
def parse_options(): def parse_options():
parser = optparse.OptionParser() parser = optparse.OptionParser(
usage='%prog [OPTS] <CONFIG_YAML',
)
parser.add_option("--seed", dest="seed", type="int", parser.add_option("--seed", dest="seed", type="int",
help="seed to use for random number generator", metavar="NUM") help="seed to use for random number generator", metavar="NUM")
parser.add_option("--no-cleanup", dest="cleanup", action="store_false", parser.add_option("--no-cleanup", dest="cleanup", action="store_false",
@ -127,42 +130,47 @@ def main():
# parse options # parse options
(options, args) = parse_options() (options, args) = parse_options()
if os.isatty(sys.stdin.fileno()):
raise RuntimeError('Need configuration in stdin.')
config = common.read_config(sys.stdin)
conn = common.connect(config.s3)
bucket = None
try: try:
# setup # setup
real_stdout = sys.stdout real_stdout = sys.stdout
sys.stdout = sys.stderr sys.stdout = sys.stderr
common.setup()
# verify all required config items are present # verify all required config items are present
if 'readwrite' not in common.config: if 'readwrite' not in config:
raise RuntimeError('readwrite section not found in config') raise RuntimeError('readwrite section not found in config')
config = common.config.readwrite for item in ['readers', 'writers', 'duration', 'files', 'bucket']:
for item in ['readers', 'writers', 'duration', 'files']: if item not in config.readwrite:
if item not in config:
raise RuntimeError("Missing readwrite config item: {item}".format(item=item)) raise RuntimeError("Missing readwrite config item: {item}".format(item=item))
for item in ['num', 'size', 'stddev']: for item in ['num', 'size', 'stddev']:
if item not in config.files: if item not in config.readwrite.files:
raise RuntimeError("Missing readwrite config item: files.{item}".format(item=item)) raise RuntimeError("Missing readwrite config item: files.{item}".format(item=item))
# setup bucket and other objects # setup bucket and other objects
bucket = common.get_new_bucket() bucket_name = common.choose_bucket_prefix(config.readwrite.bucket, max_len=30)
bucket = conn.create_bucket(bucket_name)
print "Created bucket: {name}".format(name=bucket.name) print "Created bucket: {name}".format(name=bucket.name)
file_names = realistic.names( file_names = realistic.names(
mean=15, mean=15,
stddev=4, stddev=4,
seed=options.seed, seed=options.seed,
) )
file_names = itertools.islice(file_names, config.files.num) file_names = itertools.islice(file_names, config.readwrite.files.num)
file_names = list(file_names) file_names = list(file_names)
files = realistic.files( files = realistic.files(
mean=1024 * config.files.size, mean=1024 * config.readwrite.files.size,
stddev=1024 * config.files.stddev, stddev=1024 * config.readwrite.files.stddev,
seed=options.seed, seed=options.seed,
) )
q = gevent.queue.Queue() q = gevent.queue.Queue()
# warmup - get initial set of files uploaded # warmup - get initial set of files uploaded
print "Uploading initial set of {num} files".format(num=config.files.num) print "Uploading initial set of {num} files".format(num=config.readwrite.files.num)
warmup_pool = gevent.pool.Pool(size=100) warmup_pool = gevent.pool.Pool(size=100)
for file_name in file_names: for file_name in file_names:
fp = next(files) fp = next(files)
@ -176,10 +184,10 @@ def main():
# main work # main work
print "Starting main worker loop." print "Starting main worker loop."
print "Using file size: {size} +- {stddev}".format(size=config.files.size, stddev=config.files.stddev) print "Using file size: {size} +- {stddev}".format(size=config.readwrite.files.size, stddev=config.readwrite.files.stddev)
print "Spawning {w} writers and {r} readers...".format(w=config.writers, r=config.readers) print "Spawning {w} writers and {r} readers...".format(w=config.readwrite.writers, r=config.readwrite.readers)
group = gevent.pool.Group() group = gevent.pool.Group()
for x in xrange(config.writers): for x in xrange(config.readwrite.writers):
group.spawn_link_exception( group.spawn_link_exception(
writer, writer,
bucket=bucket, bucket=bucket,
@ -188,7 +196,7 @@ def main():
files=files, files=files,
queue=q, queue=q,
) )
for x in xrange(config.readers): for x in xrange(config.readwrite.readers):
group.spawn_link_exception( group.spawn_link_exception(
reader, reader,
bucket=bucket, bucket=bucket,
@ -199,11 +207,12 @@ def main():
def stop(): def stop():
group.kill(block=True) group.kill(block=True)
q.put(StopIteration) q.put(StopIteration)
gevent.spawn_later(config.duration, stop) gevent.spawn_later(config.readwrite.duration, stop)
yaml.safe_dump_all(q, stream=real_stdout) yaml.safe_dump_all(q, stream=real_stdout)
finally: finally:
# cleanup # cleanup
if options.cleanup: if options.cleanup:
common.teardown() if bucket is not None:
common.nuke_bucket(bucket)