From 71cfd4956c59bb063dfe491bbc1a796c11c14770 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Wed, 20 Jul 2011 09:50:10 -0700 Subject: [PATCH] Move end test logic out of individual workers. Parent waits for the duration anyway, just make it unconditionally kill the workers. They already had timeouts aborting them; they may be in the middle of an operation, but we really don't care, time's up. This also avoids oddities with the queue where there may be items after the StopIteration. This also avoids workers potentially not exiting within the 1-second timeout the old code had. --- s3tests/rand_readwrite.py | 98 +++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/s3tests/rand_readwrite.py b/s3tests/rand_readwrite.py index 1aae304..df5a929 100644 --- a/s3tests/rand_readwrite.py +++ b/s3tests/rand_readwrite.py @@ -48,57 +48,55 @@ class Result: details=self.details, ) -def reader(seconds, bucket, name=None, queue=None): - with gevent.Timeout(seconds, False): - while (1): - count = 0 - for key in bucket.list(): - fp = realistic.FileVerifier() - start = time.time() - key.get_contents_to_file(fp) - end = time.time() - elapsed = end - start - if queue: - queue.put( - Result( - name, - type=Result.TYPE_READER, - time=elapsed, - success=fp.valid(), - size=fp.size / 1024, - ), - ) - count += 1 - if count == 0: - gevent.sleep(1) - -def writer(seconds, bucket, name=None, queue=None, quantity=1, file_size=1, file_stddev=0, file_name_seed=None): - with gevent.Timeout(seconds, False): - while (1): - r = random.randint(0, 65535) - r2 = r - if file_name_seed != None: - r2 = file_name_seed - - files = generate_objects.get_random_files( - quantity=quantity, - mean=1024 * file_size, - stddev=1024 * file_stddev, - seed=r, - ) - +def reader(bucket, name=None, queue=None): + while (1): + count = 0 + for key in bucket.list(): + fp = realistic.FileVerifier() start = time.time() - generate_objects.upload_objects(bucket, files, r2) + key.get_contents_to_file(fp) end = time.time() elapsed = end - start - if queue: - queue.put(Result(name, - type=Result.TYPE_WRITER, - time=elapsed, - size=sum(f.size/1024 for f in files), + queue.put( + Result( + name, + type=Result.TYPE_READER, + time=elapsed, + success=fp.valid(), + size=fp.size / 1024, + ), ) + count += 1 + if count == 0: + gevent.sleep(1) + +def writer(bucket, name=None, queue=None, quantity=1, file_size=1, file_stddev=0, file_name_seed=None): + while (1): + r = random.randint(0, 65535) + r2 = r + if file_name_seed != None: + r2 = file_name_seed + + files = generate_objects.get_random_files( + quantity=quantity, + mean=1024 * file_size, + stddev=1024 * file_stddev, + seed=r, + ) + + start = time.time() + generate_objects.upload_objects(bucket, files, r2) + end = time.time() + elapsed = end - start + + if queue: + queue.put(Result(name, + type=Result.TYPE_WRITER, + time=elapsed, + size=sum(f.size/1024 for f in files), ) + ) def parse_options(): parser = optparse.OptionParser() @@ -140,7 +138,7 @@ def main(): print "Spawning {r} readers and {w} writers...".format(r=options.num_readers, w=options.num_writers) group = gevent.pool.Group() for x in xrange(options.num_writers): - group.spawn(writer, options.duration, bucket, + group.spawn(writer, bucket, name=x, queue=q, file_size=options.file_size, @@ -149,11 +147,14 @@ def main(): file_name_seed=r, ) for x in xrange(options.num_readers): - group.spawn(reader, options.duration, bucket, + group.spawn(reader, bucket, name=x, queue=q, ) - gevent.spawn_later(options.duration, lambda: q.put(StopIteration)) + def stop(): + group.kill(block=True) + q.put(StopIteration) + gevent.spawn_later(options.duration, stop) total_read = 0 total_write = 0 @@ -195,7 +196,6 @@ def main(): percent=(100.0*write_failure/max(write_failure+write_success, 1)), ) - group.join(timeout=1) except Exception as e: print e finally: