From e4effe5479e550956c38f9526d30a797af9ecf41 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Mon, 11 Jul 2011 14:17:06 -0700 Subject: [PATCH 01/17] realistic.names shouldn't return a 0-length filename --- s3tests/realistic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index e5d234b..0ead55b 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -123,7 +123,7 @@ def names(mean, stddev, charset=None, seed=None): while True: while True: length = int(rand.normalvariate(mean, stddev)) - if length >= 0: + if length > 0: break name = ''.join(rand.choice(charset) for _ in xrange(length)) yield name From 298bc99c5dad585bb942a29e0fbcc2b1bdec5555 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 11:44:05 -0700 Subject: [PATCH 02/17] Packagify s3tests.common so we can add more magic into it later. --- s3tests/{common.py => common/__init__.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename s3tests/{common.py => common/__init__.py} (100%) diff --git a/s3tests/common.py b/s3tests/common/__init__.py similarity index 100% rename from s3tests/common.py rename to s3tests/common/__init__.py From b6e0287810bcf136cd99c641ef70b3756c52dd7c Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 11:45:38 -0700 Subject: [PATCH 03/17] Add context, get_next_key, and fill_pools to s3tests.common context is a generic Bunch that we can store items in to be shared across modules, without invoking the wrath of the globals -- yuck! get_next_key will generate a sequentially numbered key for us to use fill_pools will take a variable list of gevent.pool.Pool objects and top them all off --- s3tests/common/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/s3tests/common/__init__.py b/s3tests/common/__init__.py index 373d191..7039d43 100644 --- a/s3tests/common/__init__.py +++ b/s3tests/common/__init__.py @@ -10,7 +10,13 @@ s3 = bunch.Bunch() config = bunch.Bunch() prefix = '' +# For those scripts that use a context, these are pretty univerally needed. +context = bunch.Bunch( + bucket = None, +) + bucket_counter = itertools.count(1) +key_counter = itertools.count(1) def choose_bucket_prefix(template, max_len=30): """ @@ -156,3 +162,12 @@ def get_new_bucket(connection=None): def teardown(): nuke_prefixed_buckets() + +def fill_pools(*args): + for pool in args: + while not pool.full(): + pool.spawn() + +def get_next_key(bucket=None): + return bucket.new_key("seqkey-{num}".format(num=next(key_counter))) + From 4959c5e1a4433a2fc006ff9438680c0a6b06c226 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 11:47:57 -0700 Subject: [PATCH 04/17] Work-around for RGW failing to send IsTruncated on listing all keys. Without this, it fails to delete the bucket when there's more than 1000 keys. 
--- s3tests/common/__init__.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/s3tests/common/__init__.py b/s3tests/common/__init__.py index 7039d43..a64fe10 100644 --- a/s3tests/common/__init__.py +++ b/s3tests/common/__init__.py @@ -50,13 +50,19 @@ def nuke_prefixed_buckets(): print 'Cleaning bucket {bucket}'.format(bucket=bucket) try: bucket.set_canned_acl('private') - for key in bucket.list(): - print 'Cleaning bucket {bucket} key {key}'.format( - bucket=bucket, - key=key, - ) - key.set_canned_acl('private') - key.delete() + # TODO: deleted_cnt and the while loop is a work around for rgw + # not sending the + deleted_cnt = 1 + while deleted_cnt: + deleted_cnt = 0 + for key in bucket.list(): + print 'Cleaning bucket {bucket} key {key}'.format( + bucket=bucket, + key=key, + ) + key.set_canned_acl('private') + key.delete() + deleted_cnt += 1 bucket.delete() except boto.exception.S3ResponseError as e: # TODO workaround for buggy rgw that fails to send From 12716d854b42e5795703b29ad066f973300f9795 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 12:03:23 -0700 Subject: [PATCH 05/17] Adding generator files_varied to s3tests.realistic Given a tuple of tuples, construct several files() generators, and yield from those randomly. Randomness is weighted based on the number of files remaining in each group. --- s3tests/realistic.py | 48 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index 0ead55b..b3f47ba 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -1,7 +1,9 @@ +import bunch import hashlib import random import string import struct +import time class RandomContentFile(object): def __init__(self, size, seed): @@ -127,3 +129,49 @@ def names(mean, stddev, charset=None, seed=None): break name = ''.join(rand.choice(charset) for _ in xrange(length)) yield name + +def files_varied(groups): + """ Yields a weighted-random selection of file-like objects. """ + # Quick data type sanity. + assert groups and isinstance(groups, (list, tuple)) + + total_num = 0 + file_sets = [] + rand = random.Random(time.time()) + + # Build the sets for our yield + for num, size, stddev in groups: + assert num and size + + file_sets.append(bunch.Bunch( + num = num, + size = size, + stddev = stddev, + files = files(size, stddev, time.time()) + )) + total_num += num + + while True: + if not total_num: + raise StopIteration + + num = rand.randrange(total_num) + + ok = 0 + for file_set in file_sets: + if num > file_set.num: + num -= file_set.num + continue + + total_num -= 1 + file_set.num -= 1 + + # None left in this set! + if file_set.num == 0: + file_sets.remove(file_set) + + ok = 1 + yield next(file_set.files) + + if not ok: + raise RuntimeError, "Couldn't find a match." From 6afc7fdcdb143fb8bb071509cb1c6f7cdd24b87e Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 12:05:02 -0700 Subject: [PATCH 06/17] Adding last_read to RandomContentFile and first_write to FileVerifier, so we can measure latency. 
--- s3tests/realistic.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index b3f47ba..d025c91 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -15,6 +15,7 @@ class RandomContentFile(object): self.hash = hashlib.md5() self.digest_size = self.hash.digest_size self.digest = None + self.last_read = 0 def seek(self, offset): assert offset == 0 @@ -59,6 +60,7 @@ class RandomContentFile(object): size -= digest_count data = self.digest[:digest_count] r.append(data) + self.last_read = time.time() return ''.join(r) @@ -67,8 +69,11 @@ class FileVerifier(object): self.size = 0 self.hash = hashlib.md5() self.buf = '' + self.first_write = 0 def write(self, data): + if self.size == 0: + self.first_write = time.time() self.size += len(data) self.buf += data digsz = -1*self.hash.digest_size From 653c2b22007ed1d66a4e284861aefbeb6c832e29 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 12:06:02 -0700 Subject: [PATCH 07/17] Adding new testing tool, roundtrip --- s3tests/common/greenlets.py | 116 ++++++++++++++++++++++++++++++++++++ s3tests/common/results.py | 69 +++++++++++++++++++++ s3tests/roundtrip.py | 97 ++++++++++++++++++++++++++++++ setup.py | 1 + 4 files changed, 283 insertions(+) create mode 100644 s3tests/common/greenlets.py create mode 100644 s3tests/common/results.py create mode 100644 s3tests/roundtrip.py diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py new file mode 100644 index 0000000..3191097 --- /dev/null +++ b/s3tests/common/greenlets.py @@ -0,0 +1,116 @@ +import bunch +import collections +import gevent +import random +import time +import traceback + +from ..common import context, get_next_key +from ..common.results import TransferGreenletResult +from ..realistic import FileVerifier + +# Make sure context has somewhere to store what we need +context.update(bunch.Bunch( + neads_first_read = collections.deque(), + all_keys = [], + files_iter = None, +)) + +class SafeTransferGreenlet(gevent.Greenlet): + def __init__(self, timeout=120): + gevent.Greenlet.__init__(self) + self.timeout = timeout + self.result = None + self.key = None # We store key in case we ned to retry due to gevent being a jerk + + def _run(self): + """ A runner loop... using gevent creates a fun little bug where if two gevents try to + do the same op (reading, for ex), it raises an AssertionError rather than just switching + contexts again. Oh joy. + + To combat this, we've put the main work to do in _call_doit, which handles detecting the + gevent quirk, and we'll retry as long as _call_doit requests that we retry, as indicated + by _call_doit returning True. + """ + while self._call_doit(): + time.sleep(0.1) + + def _call_doit(self): + """ Return True if we need to retry, False otherwise. """ + result = self.result = TransferGreenletResult(self.type) + result.start_time = time.time() + + try: + with gevent.Timeout(self.timeout, False): + result.success = self._doit() + except gevent.GreenletExit: + # We don't want to retry, as it's time to exit, but we also don't want to count + # this as a failure. + return False + except AssertionError as e: + # If we've raised this damn gevent error, we simply need to retry. + if e.args[0].startswith('This event is already used by another greenlet'): + return True # retry + # Different assertion error, so fail normally. 
+ result.comment = traceback.format_exc() + except Exception: + result.comment = traceback.format_exc() + + result.finish_time = time.time() + result.duration = result.finish_time - result.start_time + result.queue_finished() + return False # don't retry + + +class ReaderGreenlet(SafeTransferGreenlet): + type = 'reader' + + def _doit(self): + if self.key: + key = self.key + elif context.neads_first_read: + key = context.neads_first_read.popleft() + elif context.all_keys: + key = random.choice(context.all_keys) + else: + time.sleep(1) + self.result.comment = 'No available keys to test with reader. Try again later.' + return False + + self.key = key + fp = FileVerifier() + self.result.name = key.name + + request_start = time.time() + key.get_contents_to_file(fp) + self.result.size = fp.size + self.result.latency = fp.first_write - request_start + + if not fp.valid(): + self.result.comment = 'Failed to validate key {name!s}'.format(name=key.name) + return False + + return True + + +class WriterGreenlet(SafeTransferGreenlet): + type = 'writer' + + def _doit(self): + if self.key: + key = self.key + else: + key = self.key = get_next_key(context.bucket) + + fp = next(context.files_iter) + self.result.name = key.name + self.result.size = fp.size + + key.set_contents_from_file(fp) + self.result.latency = time.time() - fp.last_read + + # And at the end, add to neads_first_read and shuffle + context.neads_first_read.append(key) + context.all_keys.append(key) + + return True diff --git a/s3tests/common/results.py b/s3tests/common/results.py new file mode 100644 index 0000000..684bfc5 --- /dev/null +++ b/s3tests/common/results.py @@ -0,0 +1,69 @@ +import bunch +import collections +import gevent +import time +import traceback +import yaml + +from ..common import context + +# Make sure context has somewhere to store what we need +context.update(bunch.Bunch( + result_queue = collections.deque(), +)) + + +class TransferGreenletResult(object): + """ Generic container object. Weeeeeeeeeeeeeee *short* """ + def __init__(self, type): + # About the key + self.name = None + self.size = None + + # About the job + self.type = type + self.success = False + self.comment = None + self.start_time = None + self.finish_time = None + + self.latency = None + self.duration = None + + def __repr__(self): + d = self.__dict__ + d['success'] = d['success'] and 'ok' or 'FAILED' + + return self._format.format(**d) + + def queue_finished(self): + context.result_queue.append(self) + + +# And a representer for dumping a TransferGreenletResult as a YAML dict() +yaml.add_representer(TransferGreenletResult, lambda dumper, data: dumper.represent_dict(data.__dict__) ) + + +class ResultsLogger(gevent.Greenlet): + """ A quick little greenlet to always run and dump results. """ + def __init__(self): + gevent.Greenlet.__init__(self) + self.outfile = None + + def _run(self): + while True: + try: + self._doit() + except: + print "An exception was encountered while dumping the results... this shouldn't happen!" 
+ traceback.print_exc() + time.sleep(0.1) + + def _doit(self): + while context.result_queue: + result = context.result_queue.popleft() + yrep = yaml.dump(result) + if self.outfile: + self.outfile.write(yrep) + print yrep, "\n" + diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py new file mode 100644 index 0000000..3642237 --- /dev/null +++ b/s3tests/roundtrip.py @@ -0,0 +1,97 @@ +import gevent.monkey +gevent.monkey.patch_all() + +import bunch +import collections +import gevent +import gevent.pool +import itertools +import random +import realistic +import time +import traceback + +import common +from common import context, config +from common.greenlets import ReaderGreenlet, WriterGreenlet +from common.results import ResultsLogger + +# Set up the common context to use our information. Wee. +context.update(bunch.Bunch( + # Set to False when it's time to exit main loop. + running = True, + + # The pools our tasks run in. + greenlet_pools = bunch.Bunch( + writer=None, + reader=None, + ), + + # The greenlet that keeps logs going. + results_logger = None, +)) + + +def setup(): + config_rt = config.roundtrip + + context.bucket = common.get_new_bucket() + print "Using bucket: {name}".format(name=context.bucket.name) + + context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet) + context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet) + + context.key_iter = itertools.count(1) + context.files_iter = realistic.files_varied(config_rt.create_objects) + + +def _main(): + def _stop_running(): + """ Since we can't do assignment in a lambda, we have this little stub """ + context.running = False + + grace_period = config.roundtrip.grace_wait + + print "Launching/Scheduling essential services..." + gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running) + context.results_logger = ResultsLogger.spawn() + + print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period) + writers_start_time = time.time() + while time.time() - writers_start_time < grace_period: + common.fill_pools(context.greenlet_pools.writer) + time.sleep(0.1) + + # Main work loop. + print "Starting main work loop..." + while context.running: + common.fill_pools(context.greenlet_pools.writer, context.greenlet_pools.reader) + time.sleep(0.1) + + print "We've hit duration. Time to stop!" + print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period) + time.sleep(grace_period) + + print "Killing off any remaining jobs." + context.greenlet_pools.reader.kill() + context.greenlet_pools.writer.kill() + + print "Waiting 10 seconds for them to finish dying off and collections to complete!" + time.sleep(10) + + print "Killing essential services..." + context.results_logger.kill() + + print "Done!" 
+ + +def main(): + common.setup() + setup() + + # Normal + try: + _main() + except: + traceback.print_exc() + common.teardown() diff --git a/setup.py b/setup.py index de01ab6..686fadd 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ setup( 'console_scripts': [ 's3tests-generate-objects = s3tests.generate_objects:main', 's3tests-test-readwrite = s3tests.rand_readwrite:main', + 's3tests-test-roundtrip = s3tests.roundtrip:main', ], }, From d0bb148dea1446bf22999196c5e321fef45acf8d Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 13:18:21 -0700 Subject: [PATCH 08/17] Rename _call_doit to _real_run -- it's a bit more clear that way, IMHO --- s3tests/common/greenlets.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py index 3191097..ddd82bd 100644 --- a/s3tests/common/greenlets.py +++ b/s3tests/common/greenlets.py @@ -28,14 +28,14 @@ class SafeTransferGreenlet(gevent.Greenlet): do the same op (reading, for ex), it raises an AssertionError rather than just switching contexts again. Oh joy. - To combat this, we've put the main work to do in _call_doit, which handles detecting the - gevent quirk, and we'll retry as long as _call_doit requests that we retry, as indicated - by _call_doit returning True. + To combat this, we've put the main work to do in _real_run, which handles detecting the + gevent quirk, and we'll retry as long as _real_run requests that we retry, as indicated + by _real_run returning True. """ - while self._call_doit(): + while self._real_run(): time.sleep(0.1) - def _call_doit(self): + def _real_run(self): """ Return True if we need to retry, False otherwise. """ result = self.result = TransferGreenletResult(self.type) result.start_time = time.time() From fb8f110e690e68d6c37a76b5bc9e261ae3d7c50b Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 13:19:01 -0700 Subject: [PATCH 09/17] Add assert to RandomContentFile, ensure size >= digest_size It clearly will not work to have the size smaller than the digest size, so we shouldn't allow it. --- s3tests/realistic.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index d025c91..41a0255 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -17,6 +17,11 @@ class RandomContentFile(object): self.digest = None self.last_read = 0 + assert self.size >= self.digest_size, "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format( + size=self.size, + digest_size=self.digest_size + ) + def seek(self, offset): assert offset == 0 self.random.seed(self.seed) From 3c07a4a3633d1a9b235cfa66de91d675d49c89a8 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 13:20:50 -0700 Subject: [PATCH 10/17] Update roundtrip._main -- while loop should auto-replenish all pools, and not ones so named. --- s3tests/roundtrip.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py index 3642237..c97ce12 100644 --- a/s3tests/roundtrip.py +++ b/s3tests/roundtrip.py @@ -65,7 +65,7 @@ def _main(): # Main work loop. print "Starting main work loop..." while context.running: - common.fill_pools(context.greenlet_pools.writer, context.greenlet_pools.reader) + common.fill_pools(*context.greenlet_pools.values()) time.sleep(0.1) print "We've hit duration. Time to stop!" 
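
For reference, the fill_pools() helper that this main loop now relies on (added in PATCH 03) simply tops off every gevent pool handed to it. Below is a minimal standalone sketch of that behaviour under stated assumptions: NoopGreenlet and the pool sizes are illustrative stand-ins for ReaderGreenlet/WriterGreenlet and the configured sizes, and are not part of this series.

import gevent
import gevent.pool

class NoopGreenlet(gevent.Greenlet):
    # Stand-in for ReaderGreenlet/WriterGreenlet; does a token amount of work.
    def _run(self):
        gevent.sleep(0.01)

def fill_pools(*args):
    # Same logic as the helper added in PATCH 03: spawn until each pool is full.
    for pool in args:
        while not pool.full():
            pool.spawn()

pools = [gevent.pool.Pool(4, NoopGreenlet), gevent.pool.Pool(2, NoopGreenlet)]
fill_pools(*pools)   # mirrors common.fill_pools(*context.greenlet_pools.values())
for pool in pools:
    pool.join()
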
From 841b0986193048ed54b9af537910498d577f4825 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Fri, 15 Jul 2011 14:19:58 -0700 Subject: [PATCH 11/17] Refactor/cleanup TransferResults class. Also cleans the output to be in the format TV seems to want. --- s3tests/common/greenlets.py | 40 +++++++++++--------- s3tests/common/results.py | 73 +++++++++++++++++++++++++++++-------- s3tests/realistic.py | 46 +++++++++++++++++------ s3tests/roundtrip.py | 2 +- 4 files changed, 115 insertions(+), 46 deletions(-) diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py index ddd82bd..885ee3d 100644 --- a/s3tests/common/greenlets.py +++ b/s3tests/common/greenlets.py @@ -38,8 +38,8 @@ class SafeTransferGreenlet(gevent.Greenlet): def _real_run(self): """ Return True if we need to retry, False otherwise. """ result = self.result = TransferGreenletResult(self.type) - result.start_time = time.time() - + result.markStarted() + try: with gevent.Timeout(self.timeout, False): result.success = self._doit() @@ -52,13 +52,11 @@ class SafeTransferGreenlet(gevent.Greenlet): if e.args[0].startswith('This event is already used by another greenlet'): return True # retry # Different assertion error, so fail normally. - result.comment = traceback.format_exc() + result.setError(show_traceback=True) except Exception: - result.comment = traceback.format_exc() + result.setError(show_traceback=True) - result.finish_time = time.time() - result.duration = result.finish_time - result.start_time - result.queue_finished() + result.markFinished() return False # don't retry @@ -74,21 +72,22 @@ class ReaderGreenlet(SafeTransferGreenlet): key = random.choice(context.all_keys) else: time.sleep(1) - self.result.comment = 'No available keys to test with reader. Try again later.' - return False + return self.result.setError('No available keys to test with reader. 
Try again later.') self.key = key - fp = FileVerifier() - self.result.name = key.name + self.result.setKey(key) + + fp = FileVerifier() - request_start = time.time() key.get_contents_to_file(fp) + + self.result.request_finish = time.time() + self.result.request_start = fp.created_at + self.result.chunks = fp.chunks self.result.size = fp.size - self.result.latency = fp.first_write - request_start if not fp.valid(): - self.result.comment = 'Failed to validate key {name!s}'.format(name=key.name) - return False + return self.result.setError('Failed to validate key {name!s}'.format(name=key.name)) return True @@ -100,14 +99,19 @@ class WriterGreenlet(SafeTransferGreenlet): if self.key: key = self.key else: - key = self.key = get_next_key(context.bucket) + key = get_next_key(context.bucket) + + self.key = key + self.result.setKey(key) fp = next(context.files_iter) - self.result.name = key.name self.result.size = fp.size key.set_contents_from_file(fp) - self.result.latency = time.time() - fp.last_read + + self.result.request_finish = time.time() + self.result.request_start = fp.start_time + self.result.chunks = fp.last_chunks # And at the end, add to neads_first_read and shuffle context.neads_first_read.append(key) diff --git a/s3tests/common/results.py b/s3tests/common/results.py index 684bfc5..2a3da10 100644 --- a/s3tests/common/results.py +++ b/s3tests/common/results.py @@ -1,13 +1,13 @@ import bunch import collections import gevent +import sys import time import traceback import yaml from ..common import context -# Make sure context has somewhere to store what we need context.update(bunch.Bunch( result_queue = collections.deque(), )) @@ -16,39 +16,84 @@ context.update(bunch.Bunch( class TransferGreenletResult(object): """ Generic container object. Weeeeeeeeeeeeeee *short* """ def __init__(self, type): + # About the Greenlet + self.type = type + # About the key - self.name = None + self.bucket = None + self.key = None self.size = None # About the job - self.type = type self.success = False - self.comment = None + self.error = None + self.start_time = None self.finish_time = None - self.latency = None self.duration = None + self.latency = None - def __repr__(self): - d = self.__dict__ - d['success'] = d['success'] and 'ok' or 'FAILED' + self.request_start = None + self.request_finish = None - return self._format.format(**d) + self.chunks = None - def queue_finished(self): + def markStarted(self): + self.start_time = time.time() + + def markFinished(self): + self.finish_time = time.time() + self.duration = self.finish_time - self.start_time context.result_queue.append(self) + def setKey(self, key): + self.key = key.name + self.bucket = key.bucket.name + + def setError(self, message='Unhandled Exception', show_traceback=False): + """ Sets an error state in the result, and returns False... example usage: + + return self.result.setError('Something happened', traceback=True) + """ + self.error = dict() + self.error['msg'] = message + if show_traceback: + self.error['traceback'] = traceback.format_exc() + return False + + @classmethod + def repr_yaml(c, dumper, self): + data = dict() + for x in ('type', 'bucket', 'key', 'chunks'): + data[x] = self.__dict__[x] + + # reader => r, writer => w + data['type'] = data['type'][0]#chunks + + # the error key must be present ONLY on failure. 
+ assert not (self.success and self.error) + if self.success: + assert self.error == None + else: + assert self.error != None + data['error'] = self.error + + data['start'] = self.request_start + if self.request_finish: + data['duration'] = 1000000000 * (self.request_finish - self.request_start) + + return dumper.represent_dict(data) # And a representer for dumping a TransferGreenletResult as a YAML dict() -yaml.add_representer(TransferGreenletResult, lambda dumper, data: dumper.represent_dict(data.__dict__) ) +yaml.add_representer(TransferGreenletResult, TransferGreenletResult.repr_yaml) class ResultsLogger(gevent.Greenlet): """ A quick little greenlet to always run and dump results. """ def __init__(self): gevent.Greenlet.__init__(self) - self.outfile = None + self.outfile = sys.stderr def _run(self): while True: @@ -63,7 +108,5 @@ class ResultsLogger(gevent.Greenlet): while context.result_queue: result = context.result_queue.popleft() yrep = yaml.dump(result) - if self.outfile: - self.outfile.write(yrep) - print yrep, "\n" + self.outfile.write(yrep + "---\n") diff --git a/s3tests/realistic.py b/s3tests/realistic.py index 41a0255..6a51a4c 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -7,27 +7,44 @@ import time class RandomContentFile(object): def __init__(self, size, seed): + self.size = size self.seed = seed self.random = random.Random(self.seed) - self.offset = 0 - self.buffer = '' - self.size = size - self.hash = hashlib.md5() - self.digest_size = self.hash.digest_size - self.digest = None - self.last_read = 0 - assert self.size >= self.digest_size, "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format( + # Boto likes to seek once more after it's done reading, so we need to save the last chunks/seek value. + self.last_chunks = self.chunks = None + self.last_seek = self.start_time = None + + # Let seek initialize the rest of it, rather than dup code + self.seek(0) + + assert self.size >= self.digest_size, \ + "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format( size=self.size, digest_size=self.digest_size ) + def _mark_chunk(self): + ctime = time.time() + self.chunks.append([self.offset, (ctime - self.last_seek) * 1000000000, ctime]) + def seek(self, offset): assert offset == 0 self.random.seed(self.seed) self.offset = offset self.buffer = '' + self.hash = hashlib.md5() + self.digest_size = self.hash.digest_size + self.digest = None + + # Save the last seek time as our start time, and the last chunks + self.start_time = self.last_seek + self.last_chunks = self.chunks + # Before emptying. 
+ self.last_seek = time.time() + self.chunks = [] + def tell(self): return self.offset @@ -65,7 +82,8 @@ class RandomContentFile(object): size -= digest_count data = self.digest[:digest_count] r.append(data) - self.last_read = time.time() + + self._mark_chunk() return ''.join(r) @@ -74,16 +92,20 @@ class FileVerifier(object): self.size = 0 self.hash = hashlib.md5() self.buf = '' - self.first_write = 0 + self.created_at = time.time() + self.chunks = [] + + def _mark_chunk(self): + ctime = time.time() + self.chunks.append([self.size, (ctime - self.created_at) * 1000000000, ctime]) def write(self, data): - if self.size == 0: - self.first_write = time.time() self.size += len(data) self.buf += data digsz = -1*self.hash.digest_size new_data, self.buf = self.buf[0:digsz], self.buf[digsz:] self.hash.update(new_data) + self._mark_chunk() def valid(self): """ diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py index c97ce12..c69fb94 100644 --- a/s3tests/roundtrip.py +++ b/s3tests/roundtrip.py @@ -20,7 +20,7 @@ from common.results import ResultsLogger context.update(bunch.Bunch( # Set to False when it's time to exit main loop. running = True, - + # The pools our tasks run in. greenlet_pools = bunch.Bunch( writer=None, From 43b2f8695d08360b215ed32170a6b1e0823aa002 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Mon, 18 Jul 2011 13:13:08 -0700 Subject: [PATCH 12/17] Cleanup output just a little bit more. --- s3tests/common/greenlets.py | 2 ++ s3tests/realistic.py | 10 ++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py index 885ee3d..8ed90ac 100644 --- a/s3tests/common/greenlets.py +++ b/s3tests/common/greenlets.py @@ -9,6 +9,7 @@ from ..common import context, get_next_key from ..common.results import TransferGreenletResult from ..realistic import FileVerifier + # Make sure context has somewhere to store what we need context.update(bunch.Bunch( neads_first_read = collections.deque(), @@ -16,6 +17,7 @@ context.update(bunch.Bunch( files_iter = None, )) + class SafeTransferGreenlet(gevent.Greenlet): def __init__(self, timeout=120): gevent.Greenlet.__init__(self) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index 6a51a4c..f4a3ddf 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -25,8 +25,7 @@ class RandomContentFile(object): ) def _mark_chunk(self): - ctime = time.time() - self.chunks.append([self.offset, (ctime - self.last_seek) * 1000000000, ctime]) + self.chunks.append([self.offset, (time.time() - self.last_seek) * 1000000000]) def seek(self, offset): assert offset == 0 @@ -37,8 +36,8 @@ class RandomContentFile(object): self.hash = hashlib.md5() self.digest_size = self.hash.digest_size self.digest = None - - # Save the last seek time as our start time, and the last chunks + + # Save the last seek time as our start time, and the last chunks self.start_time = self.last_seek self.last_chunks = self.chunks # Before emptying. 
@@ -96,8 +95,7 @@ class FileVerifier(object): self.chunks = [] def _mark_chunk(self): - ctime = time.time() - self.chunks.append([self.size, (ctime - self.created_at) * 1000000000, ctime]) + self.chunks.append([self.size, (time.time() - self.created_at) * 1000000000]) def write(self, data): self.size += len(data) From 44cb2f256e643784f393404820f8985f04876678 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Mon, 18 Jul 2011 15:26:03 -0700 Subject: [PATCH 13/17] Adding unlimited=bool to files_varied -- if set to true, it will never issue StopIteration --- s3tests/realistic.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index f4a3ddf..64c406c 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -160,7 +160,7 @@ def names(mean, stddev, charset=None, seed=None): name = ''.join(rand.choice(charset) for _ in xrange(length)) yield name -def files_varied(groups): +def files_varied(groups, unlimited=False): """ Yields a weighted-random selection of file-like objects. """ # Quick data type sanity. assert groups and isinstance(groups, (list, tuple)) @@ -193,12 +193,13 @@ def files_varied(groups): num -= file_set.num continue - total_num -= 1 - file_set.num -= 1 + if not unlimited: + total_num -= 1 + file_set.num -= 1 - # None left in this set! - if file_set.num == 0: - file_sets.remove(file_set) + # None left in this set! + if file_set.num == 0: + file_sets.remove(file_set) ok = 1 yield next(file_set.files) From c154d98b9636d453ddbc4784fab43e8bdf2b2087 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Mon, 18 Jul 2011 15:34:06 -0700 Subject: [PATCH 14/17] Resolve the random AssertionError By doing a copy-new on the key, it seems to work around the random AssertionError's I've been seeing. Here's hoping this fully resolves it. --- s3tests/common/greenlets.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py index 8ed90ac..4da0ba1 100644 --- a/s3tests/common/greenlets.py +++ b/s3tests/common/greenlets.py @@ -49,13 +49,7 @@ class SafeTransferGreenlet(gevent.Greenlet): # We don't want to retry, as it's time to exit, but we also don't want to count # this as a failure. return False - except AssertionError as e: - # If we've raised this damn gevent error, we simply need to retry. - if e.args[0].startswith('This event is already used by another greenlet'): - return True # retry - # Different assertion error, so fail normally. - result.setError(show_traceback=True) - except Exception: + except: result.setError(show_traceback=True) result.markFinished() @@ -76,6 +70,8 @@ class ReaderGreenlet(SafeTransferGreenlet): time.sleep(1) return self.result.setError('No available keys to test with reader. Try again later.') + # Copynew the key object + key = key.bucket.new_key(key.name) self.key = key self.result.setKey(key) From ce6c57841a0192125b68a375aea2f43c819547f7 Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Mon, 18 Jul 2011 16:09:13 -0700 Subject: [PATCH 15/17] Now that we no longer have the need to retry, get rid of that logic. 
Also fix a typo -- s/neads_first_read/needs_first_read/ --- s3tests/common/greenlets.py | 39 +++++++------------------------------ 1 file changed, 7 insertions(+), 32 deletions(-) diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py index 4da0ba1..e4e52bc 100644 --- a/s3tests/common/greenlets.py +++ b/s3tests/common/greenlets.py @@ -12,7 +12,7 @@ from ..realistic import FileVerifier # Make sure context has somewhere to store what we need context.update(bunch.Bunch( - neads_first_read = collections.deque(), + needs_first_read = collections.deque(), all_keys = [], files_iter = None, )) @@ -23,22 +23,8 @@ class SafeTransferGreenlet(gevent.Greenlet): gevent.Greenlet.__init__(self) self.timeout = timeout self.result = None - self.key = None # We store key in case we ned to retry due to gevent being a jerk def _run(self): - """ A runner loop... using gevent creates a fun little bug where if two gevents try to - do the same op (reading, for ex), it raises an AssertionError rather than just switching - contexts again. Oh joy. - - To combat this, we've put the main work to do in _real_run, which handles detecting the - gevent quirk, and we'll retry as long as _real_run requests that we retry, as indicated - by _real_run returning True. - """ - while self._real_run(): - time.sleep(0.1) - - def _real_run(self): - """ Return True if we need to retry, False otherwise. """ result = self.result = TransferGreenletResult(self.type) result.markStarted() @@ -46,24 +32,19 @@ class SafeTransferGreenlet(gevent.Greenlet): with gevent.Timeout(self.timeout, False): result.success = self._doit() except gevent.GreenletExit: - # We don't want to retry, as it's time to exit, but we also don't want to count - # this as a failure. - return False + return except: result.setError(show_traceback=True) result.markFinished() - return False # don't retry class ReaderGreenlet(SafeTransferGreenlet): type = 'reader' def _doit(self): - if self.key: - key = self.key - elif context.neads_first_read: - key = context.neads_first_read.popleft() + if context.needs_first_read: + key = context.needs_first_read.popleft() elif context.all_keys: key = random.choice(context.all_keys) else: @@ -72,7 +53,6 @@ class ReaderGreenlet(SafeTransferGreenlet): # Copynew the key object key = key.bucket.new_key(key.name) - self.key = key self.result.setKey(key) fp = FileVerifier() @@ -94,12 +74,7 @@ class WriterGreenlet(SafeTransferGreenlet): type = 'writer' def _doit(self): - if self.key: - key = self.key - else: - key = get_next_key(context.bucket) - - self.key = key + key = get_next_key(context.bucket) self.result.setKey(key) fp = next(context.files_iter) @@ -111,8 +86,8 @@ class WriterGreenlet(SafeTransferGreenlet): self.result.request_start = fp.start_time self.result.chunks = fp.last_chunks - # And at the end, add to neads_first_read and shuffle - context.neads_first_read.append(key) + # And at the end, add to needs_first_read and shuffle + context.needs_first_read.append(key) context.all_keys.append(key) return True From 0297d61e40b81d9ebb154f3225478991b3483a5f Mon Sep 17 00:00:00 2001 From: Wesley Spikes Date: Mon, 18 Jul 2011 17:04:36 -0700 Subject: [PATCH 16/17] Swap stdout/stderr -- debug messages then go to stderr, and yaml data to stdout --- s3tests/common/__init__.py | 5 +++++ s3tests/common/results.py | 2 +- s3tests/roundtrip.py | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/s3tests/common/__init__.py b/s3tests/common/__init__.py index a64fe10..8eb4232 100644 --- a/s3tests/common/__init__.py +++ 
b/s3tests/common/__init__.py @@ -4,6 +4,7 @@ import itertools import os import random import string +import sys import yaml s3 = bunch.Bunch() @@ -13,6 +14,10 @@ prefix = '' # For those scripts that use a context, these are pretty univerally needed. context = bunch.Bunch( bucket = None, + + # Save stdout/stderr in case they get fudged with. + real_stdout = sys.stdout, + real_stderr = sys.stderr, ) bucket_counter = itertools.count(1) diff --git a/s3tests/common/results.py b/s3tests/common/results.py index 2a3da10..b9aa9ce 100644 --- a/s3tests/common/results.py +++ b/s3tests/common/results.py @@ -93,7 +93,7 @@ class ResultsLogger(gevent.Greenlet): """ A quick little greenlet to always run and dump results. """ def __init__(self): gevent.Greenlet.__init__(self) - self.outfile = sys.stderr + self.outfile = context.real_stdout def _run(self): while True: diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py index c69fb94..69b35a1 100644 --- a/s3tests/roundtrip.py +++ b/s3tests/roundtrip.py @@ -10,6 +10,7 @@ import random import realistic import time import traceback +import sys import common from common import context, config @@ -86,6 +87,7 @@ def _main(): def main(): + sys.stdout = sys.stderr # Original steam already saved by common common.setup() setup() From 56ff0f6374b9da69163c2eb259ca3c1acec8a6e8 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Tue, 19 Jul 2011 08:56:17 -0700 Subject: [PATCH 17/17] Revert "Add assert to RandomContentFile, ensure size >= digest_size" This reverts commit fb8f110e690e68d6c37a76b5bc9e261ae3d7c50b. realistic.files was meant to work just fine with short files. Commit 91c4f88860efa5f7d11d9cd6436ab30d78a83ab1 has a fix for the verifying side, to support short files. Conflicts: s3tests/realistic.py --- s3tests/realistic.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/s3tests/realistic.py b/s3tests/realistic.py index 64c406c..0755516 100644 --- a/s3tests/realistic.py +++ b/s3tests/realistic.py @@ -18,12 +18,6 @@ class RandomContentFile(object): # Let seek initialize the rest of it, rather than dup code self.seek(0) - assert self.size >= self.digest_size, \ - "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format( - size=self.size, - digest_size=self.digest_size - ) - def _mark_chunk(self): self.chunks.append([self.offset, (time.time() - self.last_seek) * 1000000000])
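
After PATCH 12, both RandomContentFile and FileVerifier record their chunks entries as [cumulative_bytes, nanoseconds_elapsed] pairs (elapsed since the last seek for RandomContentFile, since object creation for FileVerifier), so the final entry of a transfer carries its total size and elapsed time. The helper below is a minimal illustration of consuming that structure; summarize_chunks and the throughput arithmetic are assumptions made for the example, not code from this series.

def summarize_chunks(chunks):
    # chunks: list of [cumulative_bytes, elapsed_nanoseconds] as recorded by
    # RandomContentFile._mark_chunk / FileVerifier._mark_chunk after PATCH 12.
    if not chunks:
        return None
    total_bytes, total_ns = chunks[-1]
    seconds = total_ns / 1e9
    return {
        'bytes': total_bytes,
        'seconds': seconds,
        'mbytes_per_sec': (total_bytes / seconds) / (1024.0 * 1024.0) if seconds else None,
    }

# e.g. summarize_chunks(result.chunks) for a finished TransferGreenletResult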