readwrite tool improvements

Now generates a list of filenames to use during the setup, and passes this
list to the workers rather than having the workers list the bucket contents
or generate file names themselves.

Added a "warmup" stage that first writes out all the files so that content
exists for each filename.

Added a command line parameter for setting the random seed; this seed is used
for both the file name generator and the file content generator.
This commit is contained in:
Steven Berler 2011-07-22 11:54:19 -07:00 committed by Tommi Virtanen
parent b465596063
commit 15cf598b12

View file

@ -17,10 +17,11 @@ import common
NANOSECOND = int(1e9) NANOSECOND = int(1e9)
def reader(bucket, worker_id, queue): def reader(bucket, worker_id, file_names, queue):
while True: while True:
count = 0 objname = random.choice(file_names)
for key in bucket.list(): key = bucket.new_key(objname)
fp = realistic.FileVerifier() fp = realistic.FileVerifier()
result = dict( result = dict(
type='r', type='r',
@ -63,31 +64,11 @@ def reader(bucket, worker_id, queue):
duration=int(round(elapsed * NANOSECOND)), duration=int(round(elapsed * NANOSECOND)),
) )
queue.put(result) queue.put(result)
count += 1
if count == 0:
gevent.sleep(1)
def writer(bucket, worker_id, queue, file_size=1, file_stddev=0, file_name_seed=None):
r = random.randint(0, 65535)
r2 = r
if file_name_seed != None:
r2 = file_name_seed
files = realistic.files(
mean=1024 * file_size,
stddev=1024 * file_stddev,
seed=r,
)
names = realistic.names(
mean=15,
stddev=4,
seed=r2,
)
def writer(bucket, worker_id, file_names, files, queue):
while True: while True:
fp = next(files) fp = next(files)
objname = next(names) objname = random.choice(file_names)
key = bucket.new_key(objname) key = bucket.new_key(objname)
result = dict( result = dict(
@ -137,13 +118,23 @@ def parse_options():
help="file size to use, in kb", default=1024, metavar="KB") help="file size to use, in kb", default=1024, metavar="KB")
parser.add_option("-d", "--stddev", dest="stddev", type="float", parser.add_option("-d", "--stddev", dest="stddev", type="float",
help="stddev of file size", default=0, metavar="KB") help="stddev of file size", default=0, metavar="KB")
parser.add_option("-W", "--rewrite", dest="rewrite", action="store_true", parser.add_option("-n", "--numfiles", dest="num_files", type="int",
help="rewrite the same files (total=quantity)") help="total number of files to write", default=1, metavar="NUM")
parser.add_option("--seed", dest="seed", type="int",
help="seed to use for random number generator", metavar="NUM")
parser.add_option("--no-cleanup", dest="cleanup", action="store_false", parser.add_option("--no-cleanup", dest="cleanup", action="store_false",
help="skip cleaning up all created buckets", default=True) help="skip cleaning up all created buckets", default=True)
return parser.parse_args() return parser.parse_args()
def write_file(bucket, file_name, file):
    """
    Upload one file object to the bucket under the name file_name.

    Called once per file name during the warmup so that each name has
    content stored in the bucket.
    """
    # Create the key for this name and upload the file object in one step.
    bucket.new_key(file_name).set_contents_from_file(file)
def main(): def main():
# parse options # parse options
(options, args) = parse_options() (options, args) = parse_options()
@ -153,30 +144,52 @@ def main():
common.setup() common.setup()
bucket = common.get_new_bucket() bucket = common.get_new_bucket()
print "Created bucket: {name}".format(name=bucket.name) print "Created bucket: {name}".format(name=bucket.name)
r = None file_names = list(realistic.names(
if (options.rewrite): mean=15,
r = random.randint(0, 65535) stddev=4,
seed=options.seed,
max_amount=options.num_files
))
files = realistic.files(
mean=options.file_size,
stddev=options.stddev,
seed=options.seed,
)
q = gevent.queue.Queue() q = gevent.queue.Queue()
# warmup - get initial set of files uploaded
print "Uploading initial set of {num} files".format(num=options.num_files)
warmup_pool = gevent.pool.Pool(size=100)
for file_name in file_names:
file = next(files)
warmup_pool.spawn_link_exception(
write_file,
bucket=bucket,
file_name=file_name,
file=file,
)
warmup_pool.join()
# main work # main work
print "Starting main worker loop."
print "Using file size: {size} +- {stddev}".format(size=options.file_size, stddev=options.stddev) print "Using file size: {size} +- {stddev}".format(size=options.file_size, stddev=options.stddev)
print "Spawning {r} readers and {w} writers...".format(r=options.num_readers, w=options.num_writers) print "Spawning {w} writers and {r} readers...".format(r=options.num_readers, w=options.num_writers)
group = gevent.pool.Group() group = gevent.pool.Group()
for x in xrange(options.num_writers): for x in xrange(options.num_writers):
group.spawn_link_exception( group.spawn_link_exception(
writer, writer,
bucket=bucket, bucket=bucket,
worker_id=x, worker_id=x,
file_names=file_names,
files=files,
queue=q, queue=q,
file_size=options.file_size,
file_stddev=options.stddev,
file_name_seed=r,
) )
for x in xrange(options.num_readers): for x in xrange(options.num_readers):
group.spawn_link_exception( group.spawn_link_exception(
reader, reader,
bucket=bucket, bucket=bucket,
worker_id=x, worker_id=x,
file_names=file_names,
queue=q, queue=q,
) )
def stop(): def stop():