From 841b0986193048ed54b9af537910498d577f4825 Mon Sep 17 00:00:00 2001
From: Wesley Spikes
Date: Fri, 15 Jul 2011 14:19:58 -0700
Subject: [PATCH] Refactor/cleanup TransferResults class.

Also cleans the output to be in the format TV seems to want.
---
 s3tests/common/greenlets.py |   40 +++++++++++---------
 s3tests/common/results.py   |   73 +++++++++++++++++++++++++++++--------
 s3tests/realistic.py        |   46 +++++++++++++++++------
 s3tests/roundtrip.py        |    2 +-
 4 files changed, 115 insertions(+), 46 deletions(-)

diff --git a/s3tests/common/greenlets.py b/s3tests/common/greenlets.py
index ddd82bd..885ee3d 100644
--- a/s3tests/common/greenlets.py
+++ b/s3tests/common/greenlets.py
@@ -38,8 +38,8 @@ class SafeTransferGreenlet(gevent.Greenlet):
     def _real_run(self):
         """ Return True if we need to retry, False otherwise. """
         result = self.result = TransferGreenletResult(self.type)
-        result.start_time = time.time()
-
+        result.markStarted()
+
         try:
             with gevent.Timeout(self.timeout, False):
                 result.success = self._doit()
@@ -52,13 +52,11 @@
             if e.args[0].startswith('This event is already used by another greenlet'):
                 return True # retry
             # Different assertion error, so fail normally.
-            result.comment = traceback.format_exc()
+            result.setError(show_traceback=True)
         except Exception:
-            result.comment = traceback.format_exc()
+            result.setError(show_traceback=True)
 
-        result.finish_time = time.time()
-        result.duration = result.finish_time - result.start_time
-        result.queue_finished()
+        result.markFinished()
 
         return False # don't retry
 
@@ -74,21 +72,22 @@ class ReaderGreenlet(SafeTransferGreenlet):
             key = random.choice(context.all_keys)
         else:
             time.sleep(1)
-            self.result.comment = 'No available keys to test with reader. Try again later.'
-            return False
+            return self.result.setError('No available keys to test with reader. Try again later.')
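+            # (setError records the failure and returns False, so returning
+            # its result both logs the error and aborts this transfer)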
 
         self.key = key
-        fp = FileVerifier()
-        self.result.name = key.name
+        self.result.setKey(key)
+
+        fp = FileVerifier()
 
-        request_start = time.time()
         key.get_contents_to_file(fp)
+
+        self.result.request_finish = time.time()
+        self.result.request_start = fp.created_at
+        self.result.chunks = fp.chunks
         self.result.size = fp.size
-        self.result.latency = fp.first_write - request_start
 
         if not fp.valid():
-            self.result.comment = 'Failed to validate key {name!s}'.format(name=key.name)
-            return False
+            return self.result.setError('Failed to validate key {name!s}'.format(name=key.name))
 
         return True
 
@@ -100,14 +99,19 @@ class WriterGreenlet(SafeTransferGreenlet):
         if self.key:
             key = self.key
         else:
-            key = self.key = get_next_key(context.bucket)
+            key = get_next_key(context.bucket)
+
+        self.key = key
+        self.result.setKey(key)
 
         fp = next(context.files_iter)
-        self.result.name = key.name
         self.result.size = fp.size
 
         key.set_contents_from_file(fp)
-        self.result.latency = time.time() - fp.last_read
+
+        self.result.request_finish = time.time()
+        self.result.request_start = fp.start_time
+        self.result.chunks = fp.last_chunks
 
         # And at the end, add to neads_first_read and shuffle
         context.neads_first_read.append(key)
diff --git a/s3tests/common/results.py b/s3tests/common/results.py
index 684bfc5..2a3da10 100644
--- a/s3tests/common/results.py
+++ b/s3tests/common/results.py
@@ -1,13 +1,13 @@
 import bunch
 import collections
 import gevent
+import sys
 import time
 import traceback
 import yaml
 
 from ..common import context
 
-# Make sure context has somewhere to store what we need
 context.update(bunch.Bunch(
     result_queue = collections.deque(),
     ))
@@ -16,39 +16,84 @@ context.update(bunch.Bunch(
 class TransferGreenletResult(object):
     """ Generic container object. Weeeeeeeeeeeeeee *short* """
     def __init__(self, type):
+        # About the Greenlet
+        self.type = type
+
         # About the key
-        self.name = None
+        self.bucket = None
+        self.key = None
         self.size = None
 
         # About the job
-        self.type = type
         self.success = False
-        self.comment = None
+        self.error = None
+
         self.start_time = None
         self.finish_time = None
-        self.latency = None
         self.duration = None
+        self.latency = None
 
-    def __repr__(self):
-        d = self.__dict__
-        d['success'] = d['success'] and 'ok' or 'FAILED'
+        self.request_start = None
+        self.request_finish = None
 
-        return self._format.format(**d)
+        self.chunks = None
 
-    def queue_finished(self):
+    def markStarted(self):
+        self.start_time = time.time()
+
+    def markFinished(self):
+        self.finish_time = time.time()
+        self.duration = self.finish_time - self.start_time
         context.result_queue.append(self)
 
+    def setKey(self, key):
+        self.key = key.name
+        self.bucket = key.bucket.name
+
+    def setError(self, message='Unhandled Exception', show_traceback=False):
+        """ Sets an error state in the result and returns False. Example usage:
+
+        return self.result.setError('Something happened', show_traceback=True)
+        """
+        self.error = dict()
+        self.error['msg'] = message
+        if show_traceback:
+            self.error['traceback'] = traceback.format_exc()
+        return False
+
+    @classmethod
+    def repr_yaml(c, dumper, self):
+        data = dict()
+        for x in ('type', 'bucket', 'key', 'chunks'):
+            data[x] = self.__dict__[x]
+
+        # reader => r, writer => w
+        data['type'] = data['type'][0]
+
+        # The 'error' key must be present ONLY on failure.
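+        # (downstream consumers treat the presence of 'error' as the failure
+        # flag, so the asserts below enforce that invariant)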
+        assert not (self.success and self.error)
+        if self.success:
+            assert self.error is None
+        else:
+            assert self.error is not None
+            data['error'] = self.error
+
+        data['start'] = self.request_start
+        if self.request_finish:
+            # Report the request duration in nanoseconds.
+            data['duration'] = 1000000000 * (self.request_finish - self.request_start)
+
+        return dumper.represent_dict(data)
 
 # And a representer for dumping a TransferGreenletResult as a YAML dict()
-yaml.add_representer(TransferGreenletResult, lambda dumper, data: dumper.represent_dict(data.__dict__) )
+yaml.add_representer(TransferGreenletResult, TransferGreenletResult.repr_yaml)
 
 
 class ResultsLogger(gevent.Greenlet):
     """ A quick little greenlet to always run and dump results. """
     def __init__(self):
         gevent.Greenlet.__init__(self)
-        self.outfile = None
+        self.outfile = sys.stderr
 
     def _run(self):
         while True:
@@ -63,7 +108,5 @@
         while context.result_queue:
             result = context.result_queue.popleft()
             yrep = yaml.dump(result)
-            if self.outfile:
-                self.outfile.write(yrep)
-            print yrep, "\n"
+            self.outfile.write(yrep + "---\n")
 
diff --git a/s3tests/realistic.py b/s3tests/realistic.py
index 41a0255..6a51a4c 100644
--- a/s3tests/realistic.py
+++ b/s3tests/realistic.py
@@ -7,27 +7,44 @@ import time
 
 class RandomContentFile(object):
     def __init__(self, size, seed):
+        self.size = size
         self.seed = seed
         self.random = random.Random(self.seed)
-        self.offset = 0
-        self.buffer = ''
-        self.size = size
-        self.hash = hashlib.md5()
-        self.digest_size = self.hash.digest_size
-        self.digest = None
-        self.last_read = 0
 
-        assert self.size >= self.digest_size, "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format(
+        # Boto likes to seek once more after it's done reading, so we need to save the last chunks/seek value.
+        self.last_chunks = self.chunks = None
+        self.last_seek = self.start_time = None
+
+        # Let seek initialize the rest of it, rather than duplicating code.
+        self.seek(0)
+
+        assert self.size >= self.digest_size, \
+            "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format(
             size=self.size,
             digest_size=self.digest_size
             )
 
+    def _mark_chunk(self):
+        ctime = time.time()
+        # Each chunk record is [bytes so far, nanoseconds since the last seek,
+        # wall-clock timestamp].
+        self.chunks.append([self.offset, (ctime - self.last_seek) * 1000000000, ctime])
+
     def seek(self, offset):
         assert offset == 0
         self.random.seed(self.seed)
         self.offset = offset
         self.buffer = ''
 
+        self.hash = hashlib.md5()
+        self.digest_size = self.hash.digest_size
+        self.digest = None
+
+        # Save the last seek time as our start time, and the last chunks,
+        # before emptying them below.
+        self.start_time = self.last_seek
+        self.last_chunks = self.chunks
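+        # Now reset: a fresh seek timestamp and an empty chunk list for the
+        # next pass.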
+        self.last_seek = time.time()
+        self.chunks = []
+
     def tell(self):
         return self.offset
 
@@ -65,7 +82,8 @@
             size -= digest_count
             data = self.digest[:digest_count]
             r.append(data)
-        self.last_read = time.time()
+
+        self._mark_chunk()
 
         return ''.join(r)
 
@@ -74,16 +92,20 @@ class FileVerifier(object):
         self.size = 0
         self.hash = hashlib.md5()
         self.buf = ''
-        self.first_write = 0
+        self.created_at = time.time()
+        self.chunks = []
+
+    def _mark_chunk(self):
+        ctime = time.time()
+        self.chunks.append([self.size, (ctime - self.created_at) * 1000000000, ctime])
 
     def write(self, data):
-        if self.size == 0:
-            self.first_write = time.time()
         self.size += len(data)
         self.buf += data
         digsz = -1*self.hash.digest_size
         new_data, self.buf = self.buf[0:digsz], self.buf[digsz:]
         self.hash.update(new_data)
+        self._mark_chunk()
 
     def valid(self):
         """
diff --git a/s3tests/roundtrip.py b/s3tests/roundtrip.py
index c97ce12..c69fb94 100644
--- a/s3tests/roundtrip.py
+++ b/s3tests/roundtrip.py
@@ -20,7 +20,7 @@ from common.results import ResultsLogger
 context.update(bunch.Bunch(
     # Set to False when it's time to exit main loop.
     running = True,
-
+
     # The pools our tasks run in.
     greenlet_pools = bunch.Bunch(
         writer=None,
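
Note on consuming the new output: ResultsLogger now writes one YAML document
per transfer to stderr, with documents separated by '---'. Below is a minimal
sketch of a consumer, assuming the stream is redirected into it on stdin; the
field names follow repr_yaml above ('duration' is in nanoseconds, and 'error'
appears only on failure), and PyYAML's safe_load_all does the splitting:

    import sys
    import yaml

    # Each document is a plain dict: type ('r' for reader, 'w' for writer),
    # bucket, key, chunks, start, plus duration on success or error on failure.
    for doc in yaml.safe_load_all(sys.stdin):
        if doc is None:
            continue  # the trailing '---' yields one empty document
        kind = {'r': 'read', 'w': 'write'}.get(doc['type'], doc['type'])
        if 'error' in doc:
            print '%s %s/%s FAILED: %s' % (kind, doc['bucket'], doc['key'], doc['error']['msg'])
        else:
            print '%s %s/%s: %d ns' % (kind, doc['bucket'], doc['key'], doc.get('duration', 0))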