From 15cf598b125a970d210ffcaa8ccddbaa434645f0 Mon Sep 17 00:00:00 2001
From: Steven Berler
Date: Fri, 22 Jul 2011 11:54:19 -0700
Subject: [PATCH] readwrite tool improvements

Now generates a list of file names during setup and passes this list
to the workers, rather than having the workers list the bucket
contents or generate file names themselves.

Added a "warmup" stage that first writes out all the files so that
content exists for each file name.

Added a command line parameter to set the random seed; the same seed
drives both the file name generator and the file content generator.
---
 s3tests/rand_readwrite.py | 153 +++++++++++++++++++++-----------------
 1 file changed, 83 insertions(+), 70 deletions(-)

diff --git a/s3tests/rand_readwrite.py b/s3tests/rand_readwrite.py
index ab606c8..0f00bea 100644
--- a/s3tests/rand_readwrite.py
+++ b/s3tests/rand_readwrite.py
@@ -17,77 +17,58 @@ import common
 
 NANOSECOND = int(1e9)
 
-def reader(bucket, worker_id, queue):
+def reader(bucket, worker_id, file_names, queue):
     while True:
-        count = 0
-        for key in bucket.list():
-            fp = realistic.FileVerifier()
-            result = dict(
-                type='r',
-                bucket=bucket.name,
-                key=key.name,
-                #TODO chunks
-                worker=worker_id,
-                )
+        objname = random.choice(file_names)
+        key = bucket.new_key(objname)
 
-            start = time.time()
-            try:
-                key.get_contents_to_file(fp)
-            except gevent.GreenletExit:
-                raise
-            except Exception as e:
-                # stop timer ASAP, even on errors
-                end = time.time()
+        fp = realistic.FileVerifier()
+        result = dict(
+            type='r',
+            bucket=bucket.name,
+            key=key.name,
+            #TODO chunks
+            worker=worker_id,
+            )
+
+        start = time.time()
+        try:
+            key.get_contents_to_file(fp)
+        except gevent.GreenletExit:
+            raise
+        except Exception as e:
+            # stop timer ASAP, even on errors
+            end = time.time()
+            result.update(
+                error=dict(
+                    msg=str(e),
+                    traceback=traceback.format_exc(),
+                    ),
+                )
+            # certain kinds of programmer errors make this a busy
+            # loop; let parent greenlet get some time too
+            time.sleep(0)
+        else:
+            end = time.time()
+
+        if not fp.valid():
             result.update(
                 error=dict(
-                    msg=str(e),
-                    traceback=traceback.format_exc(),
+                    msg='md5sum check failed',
                     ),
                 )
-                # certain kinds of programmer errors make this a busy
-                # loop; let parent greenlet get some time too
-                time.sleep(0)
-            else:
-                end = time.time()
-            if not fp.valid():
-                result.update(
-                    error=dict(
-                        msg='md5sum check failed',
-                        ),
-                    )
-
-            elapsed = end - start
-            result.update(
-                start=start,
-                duration=int(round(elapsed * NANOSECOND)),
-                )
-            queue.put(result)
-            count += 1
-        if count == 0:
-            gevent.sleep(1)
-
-def writer(bucket, worker_id, queue, file_size=1, file_stddev=0, file_name_seed=None):
-    r = random.randint(0, 65535)
-    r2 = r
-    if file_name_seed != None:
-        r2 = file_name_seed
-
-    files = realistic.files(
-        mean=1024 * file_size,
-        stddev=1024 * file_stddev,
-        seed=r,
-        )
-
-    names = realistic.names(
-        mean=15,
-        stddev=4,
-        seed=r2,
-        )
+        elapsed = end - start
+        result.update(
+            start=start,
+            duration=int(round(elapsed * NANOSECOND)),
+            )
+        queue.put(result)
 
+def writer(bucket, worker_id, file_names, files, queue):
     while True:
         fp = next(files)
-        objname = next(names)
+        objname = random.choice(file_names)
         key = bucket.new_key(objname)
 
         result = dict(
@@ -137,13 +118,23 @@ def parse_options():
                       help="file size to use, in kb", default=1024, metavar="KB")
     parser.add_option("-d", "--stddev", dest="stddev", type="float",
                       help="stddev of file size", default=0, metavar="KB")
-    parser.add_option("-W", "--rewrite", dest="rewrite", action="store_true",
-                      help="rewrite the same files (total=quantity)")
+    parser.add_option("-n", "--numfiles", dest="num_files", type="int",
+                      help="total number of files to write", default=1, metavar="NUM")
+    parser.add_option("--seed", dest="seed", type="int",
+                      help="seed to use for random number generator", metavar="NUM")
     parser.add_option("--no-cleanup", dest="cleanup", action="store_false",
                       help="skip cleaning up all created buckets", default=True)
 
     return parser.parse_args()
 
+def write_file(bucket, file_name, file):
+    """
+    Write a single file to the bucket using the file_name.
+    This is used during the warmup to initialize the files.
+    """
+    key = bucket.new_key(file_name)
+    key.set_contents_from_file(file)
+
 def main():
     # parse options
     (options, args) = parse_options()
@@ -153,30 +144,52 @@ def main():
     common.setup()
     bucket = common.get_new_bucket()
     print "Created bucket: {name}".format(name=bucket.name)
-    r = None
-    if (options.rewrite):
-        r = random.randint(0, 65535)
+    file_names = list(realistic.names(
+        mean=15,
+        stddev=4,
+        seed=options.seed,
+        max_amount=options.num_files
+        ))
+    files = realistic.files(
+        mean=options.file_size,
+        stddev=options.stddev,
+        seed=options.seed,
+        )
     q = gevent.queue.Queue()
 
+    # warmup - get initial set of files uploaded
+    print "Uploading initial set of {num} files".format(num=options.num_files)
+    warmup_pool = gevent.pool.Pool(size=100)
+    for file_name in file_names:
+        file = next(files)
+        warmup_pool.spawn_link_exception(
+            write_file,
+            bucket=bucket,
+            file_name=file_name,
+            file=file,
+            )
+    warmup_pool.join()
+
+    # main work
+    print "Starting main worker loop."
     print "Using file size: {size} +- {stddev}".format(size=options.file_size, stddev=options.stddev)
-    print "Spawning {r} readers and {w} writers...".format(r=options.num_readers, w=options.num_writers)
+    print "Spawning {w} writers and {r} readers...".format(r=options.num_readers, w=options.num_writers)
     group = gevent.pool.Group()
     for x in xrange(options.num_writers):
         group.spawn_link_exception(
             writer,
             bucket=bucket,
             worker_id=x,
+            file_names=file_names,
+            files=files,
             queue=q,
-            file_size=options.file_size,
-            file_stddev=options.stddev,
-            file_name_seed=r,
             )
     for x in xrange(options.num_readers):
         group.spawn_link_exception(
             reader,
             bucket=bucket,
             worker_id=x,
+            file_names=file_names,
            queue=q,
             )
 
     def stop():
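
A note on the seeding behaviour this patch introduces: with a fixed --seed,
realistic.names() and realistic.files() should yield the same sequences from
run to run, which is what makes a given test run reproducible. Below is a
minimal standalone sketch of that assumption, reusing the keyword arguments
the patch itself passes; the seed (42) and max_amount (10) are arbitrary
example values, the bare import follows this module's own "import common"
style, and it presumes names() seeds its own generator internally, as its
seed argument suggests:

    import realistic

    # Same parameters and seed should give the same file name sequence on
    # every run. Within a single run the patch does not depend on this:
    # main() generates file_names once and the one list is shared by the
    # warmup, all writers, and all readers.
    names_a = list(realistic.names(mean=15, stddev=4, seed=42, max_amount=10))
    names_b = list(realistic.names(mean=15, stddev=4, seed=42, max_amount=10))
    assert names_a == names_b

The warmup stage removes the other failure mode: every name in file_names is
uploaded before the workers start, so random.choice(file_names) in reader()
can never pick a key that does not exist yet.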