diff --git a/s3tests/rand_readwrite.py b/s3tests/rand_readwrite.py index 1aae304..df5a929 100644 --- a/s3tests/rand_readwrite.py +++ b/s3tests/rand_readwrite.py @@ -48,57 +48,55 @@ class Result: details=self.details, ) -def reader(seconds, bucket, name=None, queue=None): - with gevent.Timeout(seconds, False): - while (1): - count = 0 - for key in bucket.list(): - fp = realistic.FileVerifier() - start = time.time() - key.get_contents_to_file(fp) - end = time.time() - elapsed = end - start - if queue: - queue.put( - Result( - name, - type=Result.TYPE_READER, - time=elapsed, - success=fp.valid(), - size=fp.size / 1024, - ), - ) - count += 1 - if count == 0: - gevent.sleep(1) - -def writer(seconds, bucket, name=None, queue=None, quantity=1, file_size=1, file_stddev=0, file_name_seed=None): - with gevent.Timeout(seconds, False): - while (1): - r = random.randint(0, 65535) - r2 = r - if file_name_seed != None: - r2 = file_name_seed - - files = generate_objects.get_random_files( - quantity=quantity, - mean=1024 * file_size, - stddev=1024 * file_stddev, - seed=r, - ) - +def reader(bucket, name=None, queue=None): + while (1): + count = 0 + for key in bucket.list(): + fp = realistic.FileVerifier() start = time.time() - generate_objects.upload_objects(bucket, files, r2) + key.get_contents_to_file(fp) end = time.time() elapsed = end - start - if queue: - queue.put(Result(name, - type=Result.TYPE_WRITER, - time=elapsed, - size=sum(f.size/1024 for f in files), + queue.put( + Result( + name, + type=Result.TYPE_READER, + time=elapsed, + success=fp.valid(), + size=fp.size / 1024, + ), ) + count += 1 + if count == 0: + gevent.sleep(1) + +def writer(bucket, name=None, queue=None, quantity=1, file_size=1, file_stddev=0, file_name_seed=None): + while (1): + r = random.randint(0, 65535) + r2 = r + if file_name_seed != None: + r2 = file_name_seed + + files = generate_objects.get_random_files( + quantity=quantity, + mean=1024 * file_size, + stddev=1024 * file_stddev, + seed=r, + ) + + start = time.time() + generate_objects.upload_objects(bucket, files, r2) + end = time.time() + elapsed = end - start + + if queue: + queue.put(Result(name, + type=Result.TYPE_WRITER, + time=elapsed, + size=sum(f.size/1024 for f in files), ) + ) def parse_options(): parser = optparse.OptionParser() @@ -140,7 +138,7 @@ def main(): print "Spawning {r} readers and {w} writers...".format(r=options.num_readers, w=options.num_writers) group = gevent.pool.Group() for x in xrange(options.num_writers): - group.spawn(writer, options.duration, bucket, + group.spawn(writer, bucket, name=x, queue=q, file_size=options.file_size, @@ -149,11 +147,14 @@ def main(): file_name_seed=r, ) for x in xrange(options.num_readers): - group.spawn(reader, options.duration, bucket, + group.spawn(reader, bucket, name=x, queue=q, ) - gevent.spawn_later(options.duration, lambda: q.put(StopIteration)) + def stop(): + group.kill(block=True) + q.put(StopIteration) + gevent.spawn_later(options.duration, stop) total_read = 0 total_write = 0 @@ -195,7 +196,6 @@ def main(): percent=(100.0*write_failure/max(write_failure+write_success, 1)), ) - group.join(timeout=1) except Exception as e: print e finally: