s3-tests/s3tests/rand_readwrite.py

211 lines
6.3 KiB
Python
Raw Normal View History

#!/usr/bin/python
import gevent
import gevent.pool
import gevent.queue
import gevent.monkey; gevent.monkey.patch_all()
import itertools
import optparse
import sys
import time
import traceback
import random
import yaml
import generate_objects
import realistic
import common
# Number of nanoseconds in one second; multiplied with elapsed wall-clock
# seconds to report operation durations as integer nanoseconds.
NANOSECOND = 10 ** 9
def reader(bucket, worker_id, file_names, queue):
    """
    Worker loop: repeatedly download a randomly chosen object and report on it.

    Each iteration picks a name from file_names, reads that key from the
    bucket into a realistic.FileVerifier, and puts one result dict on queue
    with the keys type ('r'), bucket, key, worker, start, duration (integer
    nanoseconds) and chunks. An 'error' entry is added when the download
    raises or when the verifier reports the content as invalid.

    The loop never returns by itself; gevent.GreenletExit is re-raised so
    that the greenlet can be killed from outside.
    """
    while True:
        chosen = random.choice(file_names)
        key = bucket.new_key(chosen)

        verifier = realistic.FileVerifier()
        record = {
            'type': 'r',
            'bucket': bucket.name,
            'key': key.name,
            'worker': worker_id,
        }

        began = time.time()
        try:
            key.get_contents_to_file(verifier)
        except gevent.GreenletExit:
            # being killed is not an operation failure; let it propagate
            raise
        except Exception as exc:
            # take the end timestamp first so error handling is not timed
            finished = time.time()
            record['error'] = {
                'msg': str(exc),
                'traceback': traceback.format_exc(),
            }
            # some programmer errors fail instantly and would turn this
            # into a busy loop; yield so the parent greenlet can run
            time.sleep(0)
        else:
            finished = time.time()
            if not verifier.valid():
                record['error'] = {'msg': 'md5sum check failed'}

        # timing is recorded for failed operations as well
        record.update(
            start=began,
            duration=int(round((finished - began) * NANOSECOND)),
            chunks=verifier.chunks,
        )
        queue.put(record)
def writer(bucket, worker_id, file_names, files, queue):
    """
    Worker loop: repeatedly upload generated content under a random name.

    Each iteration takes the next file object from the files iterator,
    picks a name from file_names, uploads the content to that key and puts
    one result dict on queue with the keys type ('w'), bucket, key, worker,
    start, duration (integer nanoseconds) and chunks (taken from the file
    object's last_chunks attribute). An 'error' entry is added when the
    upload raises.

    The loop never returns by itself; gevent.GreenletExit is re-raised so
    that the greenlet can be killed from outside.
    """
    while True:
        payload = next(files)
        target = random.choice(file_names)
        key = bucket.new_key(target)

        record = {
            'type': 'w',
            'bucket': bucket.name,
            'key': key.name,
            'worker': worker_id,
        }

        began = time.time()
        try:
            key.set_contents_from_file(payload)
        except gevent.GreenletExit:
            # being killed is not an operation failure; let it propagate
            raise
        except Exception as exc:
            # take the end timestamp first so error handling is not timed
            finished = time.time()
            record['error'] = {
                'msg': str(exc),
                'traceback': traceback.format_exc(),
            }
            # some programmer errors fail instantly and would turn this
            # into a busy loop; yield so the parent greenlet can run
            time.sleep(0)
        else:
            finished = time.time()

        # timing is recorded for failed operations as well
        record.update(
            start=began,
            duration=int(round((finished - began) * NANOSECOND)),
            chunks=payload.last_chunks,
        )
        queue.put(record)
def parse_options():
    """
    Parse the command line.

    Recognized options:
      --seed NUM     integer stored as options.seed (None when not given)
      --no-cleanup   sets options.cleanup to False (it defaults to True)

    Returns the (options, args) pair produced by optparse.
    """
    cli = optparse.OptionParser()
    cli.add_option(
        "--seed",
        dest="seed",
        type="int",
        metavar="NUM",
        help="seed to use for random number generator",
    )
    cli.add_option(
        "--no-cleanup",
        dest="cleanup",
        action="store_false",
        default=True,
        help="skip cleaning up all created buckets",
    )
    parsed = cli.parse_args()
    return parsed
def write_file(bucket, file_name, file):
    """
    Upload the contents of one file object to the bucket as file_name.

    The warmup phase calls this once per name so that every object exists
    before the readers and writers start.
    """
    bucket.new_key(file_name).set_contents_from_file(file)
def main():
    """
    Run the random read/write workload and stream results as YAML.

    Steps:
      1. Parse the command line and redirect sys.stdout to sys.stderr, so
         that only the YAML result stream reaches the real stdout.
      2. Call common.setup() and check that the 'rand_readwrite' config
         section has readers, writers, duration and files.{num,size,stddev}.
      3. Create a bucket and upload files.num objects as a warmup.
      4. Spawn the writer and reader greenlets, schedule them to be killed
         after config.duration, and dump every result dict they queue to
         the real stdout until the end-of-stream sentinel arrives.
      5. Call common.teardown() unless --no-cleanup was given, then put
         sys.stdout back.

    Raises RuntimeError when a required config item is missing.
    """
    # positional arguments are accepted by the parser but not used here
    options, _args = parse_options()

    # Progress messages are printed while the workload runs; send them to
    # stderr and keep a handle on the real stdout for the YAML output.
    real_stdout = sys.stdout
    sys.stdout = sys.stderr
    try:
        # setup
        common.setup()

        # verify all required config items are present
        if 'rand_readwrite' not in common.config:
            raise RuntimeError('rand_readwrite section not found in config')
        config = common.config.rand_readwrite
        for item in ['readers', 'writers', 'duration', 'files']:
            if item not in config:
                raise RuntimeError("Missing rand_readwrite config item: {item}".format(item=item))
        for item in ['num', 'size', 'stddev']:
            if item not in config.files:
                raise RuntimeError("Missing rand_readwrite config item: files.{item}".format(item=item))

        # setup bucket and other objects
        bucket = common.get_new_bucket()
        print("Created bucket: {name}".format(name=bucket.name))
        file_names = realistic.names(
            mean=15,
            stddev=4,
            seed=options.seed,
        )
        file_names = list(itertools.islice(file_names, config.files.num))
        # config sizes are multiplied by 1024 before being handed over
        files = realistic.files(
            mean=1024 * config.files.size,
            stddev=1024 * config.files.stddev,
            seed=options.seed,
        )
        q = gevent.queue.Queue()

        # warmup - get initial set of files uploaded
        print("Uploading initial set of {num} files".format(num=config.files.num))
        warmup_pool = gevent.pool.Pool(size=100)
        for file_name in file_names:
            # local deliberately not called "file" to avoid hiding the builtin;
            # the keyword below must stay "file" to match write_file()
            warmup_file = next(files)
            warmup_pool.spawn_link_exception(
                write_file,
                bucket=bucket,
                file_name=file_name,
                file=warmup_file,
            )
        warmup_pool.join()

        # main work
        print("Starting main worker loop.")
        print("Using file size: {size} +- {stddev}".format(size=config.files.size, stddev=config.files.stddev))
        print("Spawning {w} writers and {r} readers...".format(w=config.writers, r=config.readers))
        group = gevent.pool.Group()
        for x in xrange(config.writers):
            group.spawn_link_exception(
                writer,
                bucket=bucket,
                worker_id=x,
                file_names=file_names,
                files=files,
                queue=q,
            )
        for x in xrange(config.readers):
            group.spawn_link_exception(
                reader,
                bucket=bucket,
                worker_id=x,
                file_names=file_names,
                queue=q,
            )

        def stop():
            # Always enqueue the end-of-stream sentinel, even if killing the
            # workers raises; otherwise the dump below would wait forever.
            try:
                group.kill(block=True)
            finally:
                q.put(StopIteration)

        gevent.spawn_later(config.duration, stop)

        # iterating the queue ends when the StopIteration sentinel is read
        yaml.safe_dump_all(q, stream=real_stdout)
    finally:
        try:
            # cleanup (still runs with stdout redirected, as setup did)
            if options.cleanup:
                common.teardown()
        finally:
            # do not leak the stdout redirection to whoever called main()
            sys.stdout = real_stdout