Adding new testing tool, roundtrip

This commit is contained in:
Wesley Spikes 2011-07-15 12:06:02 -07:00
parent 6afc7fdcdb
commit 653c2b2200
4 changed files with 283 additions and 0 deletions

116 s3tests/common/greenlets.py Normal file

@@ -0,0 +1,116 @@
import bunch
import collections
import gevent
import random
import time
import traceback
from ..common import context, get_next_key
from ..common.results import TransferGreenletResult
from ..realistic import FileVerifier
# Make sure context has somewhere to store what we need
context.update(bunch.Bunch(
    needs_first_read = collections.deque(),
all_keys = [],
files_iter = None,
))
class SafeTransferGreenlet(gevent.Greenlet):
def __init__(self, timeout=120):
gevent.Greenlet.__init__(self)
self.timeout = timeout
self.result = None
        self.key = None # We store the key in case we need to retry, since gevent can be a jerk
def _run(self):
""" A runner loop... using gevent creates a fun little bug where if two gevents try to
do the same op (reading, for ex), it raises an AssertionError rather than just switching
contexts again. Oh joy.
To combat this, we've put the main work to do in _call_doit, which handles detecting the
gevent quirk, and we'll retry as long as _call_doit requests that we retry, as indicated
by _call_doit returning True.
"""
while self._call_doit():
time.sleep(0.1)
def _call_doit(self):
""" Return True if we need to retry, False otherwise. """
result = self.result = TransferGreenletResult(self.type)
result.start_time = time.time()
try:
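            # Passing False as the exception to gevent.Timeout makes the timeout
            # silent: on expiry the block just exits, so result.success is
            # simply never set to True and the transfer counts as a failure.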
with gevent.Timeout(self.timeout, False):
result.success = self._doit()
except gevent.GreenletExit:
# We don't want to retry, as it's time to exit, but we also don't want to count
# this as a failure.
return False
        except AssertionError as e:
            # If gevent raised its damn 'event already used' error, we simply need to retry.
            if e.args and e.args[0].startswith('This event is already used by another greenlet'):
                return True # retry
            # A different assertion error, so fail normally.
            result.comment = traceback.format_exc()
except Exception:
result.comment = traceback.format_exc()
result.finish_time = time.time()
result.duration = result.finish_time - result.start_time
result.queue_finished()
return False # don't retry
class ReaderGreenlet(SafeTransferGreenlet):
type = 'reader'
def _doit(self):
if self.key:
key = self.key
        elif context.needs_first_read:
            key = context.needs_first_read.popleft()
elif context.all_keys:
key = random.choice(context.all_keys)
else:
time.sleep(1)
self.result.comment = 'No available keys to test with reader. Try again later.'
return False
self.key = key
fp = FileVerifier()
self.result.name = key.name
request_start = time.time()
key.get_contents_to_file(fp)
self.result.size = fp.size
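        # Latency here is time-to-first-byte: fp.first_write is presumably
        # stamped by FileVerifier when the first chunk of the response arrives.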
self.result.latency = fp.first_write - request_start
if not fp.valid():
self.result.comment = 'Failed to validate key {name!s}'.format(name=key.name)
return False
return True
class WriterGreenlet(SafeTransferGreenlet):
type = 'writer'
def _doit(self):
if self.key:
key = self.key
else:
key = self.key = get_next_key(context.bucket)
fp = next(context.files_iter)
self.result.name = key.name
self.result.size = fp.size
key.set_contents_from_file(fp)
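        # Writer latency: fp.last_read is presumably stamped when the final
        # chunk was read out of the source file, so this measures upload tail time.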
self.result.latency = time.time() - fp.last_read
        # Finally, queue the key up: readers drain needs_first_read first,
        # then sample the rest of all_keys at random.
        context.needs_first_read.append(key)
context.all_keys.append(key)
return True
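
The retry-on-AssertionError loop above is the heart of this module. A minimal standalone sketch of the same pattern, where flaky_op stands in for any gevent-colliding operation (hypothetical name, not part of this commit):

import time

def run_with_retry(flaky_op, pause=0.1):
    # Keep retrying while flaky_op trips gevent's greenlet-collision quirk.
    while True:
        try:
            return flaky_op()
        except AssertionError as e:
            if e.args and e.args[0].startswith('This event is already used'):
                time.sleep(pause) # let the other greenlet finish, then retry
                continue
            raise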

69 s3tests/common/results.py Normal file

@@ -0,0 +1,69 @@
import bunch
import collections
import gevent
import time
import traceback
import yaml
from ..common import context
# Make sure context has somewhere to store what we need
context.update(bunch.Bunch(
result_queue = collections.deque(),
))
class TransferGreenletResult(object):
""" Generic container object. Weeeeeeeeeeeeeee *short* """
def __init__(self, type):
# About the key
self.name = None
self.size = None
# About the job
self.type = type
self.success = False
self.comment = None
self.start_time = None
self.finish_time = None
self.latency = None
self.duration = None
    def __repr__(self):
        # Work on a copy so repr() doesn't clobber the stored success flag.
        d = dict(self.__dict__)
        d['success'] = 'ok' if d['success'] else 'FAILED'
        return '{type} {name}: {success}'.format(**d)
def queue_finished(self):
context.result_queue.append(self)
# And a representer for dumping a TransferGreenletResult as a YAML dict()
yaml.add_representer(TransferGreenletResult, lambda dumper, data: dumper.represent_dict(data.__dict__) )
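# With that representer registered, a finished result dumps as a flat YAML
# mapping of its attributes, along these lines (values illustrative):
#
#   duration: 0.84
#   latency: 0.12
#   name: obj-17
#   size: 4096
#   success: true
#   type: writer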
class ResultsLogger(gevent.Greenlet):
""" A quick little greenlet to always run and dump results. """
def __init__(self):
gevent.Greenlet.__init__(self)
self.outfile = None
def _run(self):
while True:
try:
self._doit()
            except Exception:
                # Catch everything except GreenletExit, so kill() can still
                # stop this logger.
                print "An exception was encountered while dumping the results... this shouldn't happen!"
                traceback.print_exc()
time.sleep(0.1)
def _doit(self):
while context.result_queue:
result = context.result_queue.popleft()
yrep = yaml.dump(result)
if self.outfile:
self.outfile.write(yrep)
print yrep, "\n"
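
A minimal usage sketch, assuming a hypothetical output path; roundtrip.py below spawns the logger the same way, just without an outfile:

logger = ResultsLogger.spawn()
logger.outfile = open('/tmp/results.yaml', 'w') # hypothetical path
# ... transfer greenlets run and queue_finished() their results ...
logger.kill()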

97 s3tests/roundtrip.py Normal file

@@ -0,0 +1,97 @@
import gevent.monkey
gevent.monkey.patch_all()
import bunch
import collections
import gevent
import gevent.pool
import itertools
import random
import realistic
import time
import traceback
import common
from common import context, config
from common.greenlets import ReaderGreenlet, WriterGreenlet
from common.results import ResultsLogger
# Set up the common context to use our information. Wee.
context.update(bunch.Bunch(
# Set to False when it's time to exit main loop.
running = True,
# The pools our tasks run in.
greenlet_pools = bunch.Bunch(
writer=None,
reader=None,
),
# The greenlet that keeps logs going.
results_logger = None,
))
def setup():
config_rt = config.roundtrip
context.bucket = common.get_new_bucket()
print "Using bucket: {name}".format(name=context.bucket.name)
context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
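    # Each pool is constructed with a greenlet_class, so pool.spawn() with no
    # arguments creates (and starts) one Reader/WriterGreenlet.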
context.key_iter = itertools.count(1)
context.files_iter = realistic.files_varied(config_rt.create_objects)
def _main():
def _stop_running():
""" Since we can't do assignment in a lambda, we have this little stub """
context.running = False
grace_period = config.roundtrip.grace_wait
print "Launching/Scheduling essential services..."
gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
context.results_logger = ResultsLogger.spawn()
print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
writers_start_time = time.time()
while time.time() - writers_start_time < grace_period:
common.fill_pools(context.greenlet_pools.writer)
time.sleep(0.1)
# Main work loop.
print "Starting main work loop..."
while context.running:
common.fill_pools(context.greenlet_pools.writer, context.greenlet_pools.reader)
time.sleep(0.1)
print "We've hit duration. Time to stop!"
print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
time.sleep(grace_period)
print "Killing off any remaining jobs."
context.greenlet_pools.reader.kill()
context.greenlet_pools.writer.kill()
print "Waiting 10 seconds for them to finish dying off and collections to complete!"
time.sleep(10)
print "Killing essential services..."
context.results_logger.kill()
print "Done!"
def main():
common.setup()
setup()
    # Run, and always tear down afterwards, even on Ctrl-C.
    try:
        _main()
    except BaseException:
        traceback.print_exc()
common.teardown()
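
setup() and _main() read their knobs from config.roundtrip. A hypothetical stub showing the shape implied by the attribute accesses above (values illustrative; bunch is already imported at the top of this file):

roundtrip = bunch.Bunch(
    duration=60, # seconds of steady-state reads and writes
    grace_wait=5, # writers' head start, and the wind-down period
    pool_sizes=bunch.Bunch(
        reader=10, # concurrent ReaderGreenlets
        writer=10, # concurrent WriterGreenlets
    ),
    create_objects=None, # spec handed to realistic.files_varied()
)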

1 setup.py

@@ -23,6 +23,7 @@ setup(
'console_scripts': [
's3tests-generate-objects = s3tests.generate_objects:main',
's3tests-test-readwrite = s3tests.rand_readwrite:main',
's3tests-test-roundtrip = s3tests.roundtrip:main',
],
},