Refactor/cleanup TransferResults class. Also cleans the output to be in the format TV seems to want.

Wesley Spikes 2011-07-15 14:19:58 -07:00
parent 3c07a4a363
commit 841b098619
4 changed files with 115 additions and 46 deletions
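
The net effect of the diff below: per-transfer bookkeeping moves off the greenlets and into TransferGreenletResult itself, behind a small lifecycle API (markStarted / markFinished / setKey / setError), and results are dumped as a stream of YAML documents. A minimal sketch of the new flow, using only names that appear in the diff (result.success is presumably set elsewhere from the run's return value; that code is not part of this commit):

    result = TransferGreenletResult('reader')
    result.markStarted()                      # stamps start_time
    try:
        # ... perform the transfer, then record what happened ...
        result.setKey(key)                    # copies key.name and key.bucket.name
    except Exception:
        result.setError(show_traceback=True)  # stores {'msg': ..., 'traceback': ...}
    result.markFinished()                     # stamps finish_time, computes duration,
                                              # and queues the result for the logger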


@@ -38,7 +38,7 @@ class SafeTransferGreenlet(gevent.Greenlet):
     def _real_run(self):
         """ Return True if we need to retry, False otherwise. """
         result = self.result = TransferGreenletResult(self.type)
-        result.start_time = time.time()
+        result.markStarted()

         try:
             with gevent.Timeout(self.timeout, False):
@@ -52,13 +52,11 @@ class SafeTransferGreenlet(gevent.Greenlet):
             if e.args[0].startswith('This event is already used by another greenlet'):
                 return True # retry
             # Different assertion error, so fail normally.
-            result.comment = traceback.format_exc()
+            result.setError(show_traceback=True)
         except Exception:
-            result.comment = traceback.format_exc()
+            result.setError(show_traceback=True)

-        result.finish_time = time.time()
-        result.duration = result.finish_time - result.start_time
-        result.queue_finished()
+        result.markFinished()
         return False # don't retry
@@ -74,21 +72,22 @@ class ReaderGreenlet(SafeTransferGreenlet):
             key = random.choice(context.all_keys)
         else:
             time.sleep(1)
-            self.result.comment = 'No available keys to test with reader. Try again later.'
-            return False
+            return self.result.setError('No available keys to test with reader. Try again later.')

         self.key = key
+        self.result.setKey(key)

         fp = FileVerifier()
-        self.result.name = key.name
-        request_start = time.time()
         key.get_contents_to_file(fp)
+        self.result.request_finish = time.time()
+        self.result.request_start = fp.created_at
+        self.result.chunks = fp.chunks
         self.result.size = fp.size
-        self.result.latency = fp.first_write - request_start

         if not fp.valid():
-            self.result.comment = 'Failed to validate key {name!s}'.format(name=key.name)
-            return False
+            return self.result.setError('Failed to validate key {name!s}'.format(name=key.name))
         return True
@@ -100,14 +99,19 @@ class WriterGreenlet(SafeTransferGreenlet):
         if self.key:
             key = self.key
         else:
-            key = self.key = get_next_key(context.bucket)
+            key = get_next_key(context.bucket)
+
+        self.key = key
+        self.result.setKey(key)

         fp = next(context.files_iter)
-        self.result.name = key.name
         self.result.size = fp.size

         key.set_contents_from_file(fp)
-        self.result.latency = time.time() - fp.last_read
+        self.result.request_finish = time.time()
+        self.result.request_start = fp.start_time
+        self.result.chunks = fp.last_chunks

         # And at the end, add to neads_first_read and shuffle
         context.neads_first_read.append(key)
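
Both greenlets now record per-chunk progress in place of the old single latency figure. Each entry appended by _mark_chunk (see the hunks further down) is a [bytes_so_far, elapsed_ns, timestamp] triple, where elapsed_ns is nanoseconds since the transfer's reference point (last_seek for the writer's file, created_at for the verifier). A hypothetical consumer, not part of this commit, could derive per-chunk throughput like so:

    def chunk_rates(chunks):
        """ Yield (bytes, secs, bytes/sec) for each interval between marks. """
        prev_bytes, prev_ns = 0, 0
        for cur_bytes, elapsed_ns, _stamp in chunks:
            delta_bytes = cur_bytes - prev_bytes
            delta_secs = (elapsed_ns - prev_ns) / 1e9
            if delta_secs > 0:
                yield delta_bytes, delta_secs, delta_bytes / delta_secs
            prev_bytes, prev_ns = cur_bytes, elapsed_ns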


@@ -1,13 +1,13 @@
 import bunch
 import collections
 import gevent
+import sys
 import time
 import traceback
 import yaml

 from ..common import context

+# Make sure context has somewhere to store what we need
 context.update(bunch.Bunch(
     result_queue = collections.deque(),
 ))
@@ -16,39 +16,84 @@ context.update(bunch.Bunch(
 class TransferGreenletResult(object):
     """ Generic container object. Weeeeeeeeeeeeeee *short* """
     def __init__(self, type):
+        # About the Greenlet
+        self.type = type
+
         # About the key
-        self.name = None
+        self.bucket = None
+        self.key = None
         self.size = None

         # About the job
-        self.type = type
         self.success = False
-        self.comment = None
+        self.error = None
+
         self.start_time = None
         self.finish_time = None
-        self.latency = None
         self.duration = None

-    def __repr__(self):
-        d = self.__dict__
-        d['success'] = d['success'] and 'ok' or 'FAILED'
-        return self._format.format(**d)
+        self.latency = None
+        self.request_start = None
+        self.request_finish = None
+
+        self.chunks = None

-    def queue_finished(self):
+    def markStarted(self):
+        self.start_time = time.time()
+
+    def markFinished(self):
+        self.finish_time = time.time()
+        self.duration = self.finish_time - self.start_time
         context.result_queue.append(self)

+    def setKey(self, key):
+        self.key = key.name
+        self.bucket = key.bucket.name
+
+    def setError(self, message='Unhandled Exception', show_traceback=False):
+        """ Sets an error state in the result, and returns False... example usage:
+            return self.result.setError('Something happened', show_traceback=True)
+        """
+        self.error = dict()
+        self.error['msg'] = message
+        if show_traceback:
+            self.error['traceback'] = traceback.format_exc()
+        return False
+
+    @classmethod
+    def repr_yaml(c, dumper, self):
+        data = dict()
+        for x in ('type', 'bucket', 'key', 'chunks'):
+            data[x] = self.__dict__[x]
+        # reader => r, writer => w
+        data['type'] = data['type'][0]
+        # the error key must be present ONLY on failure.
+        assert not (self.success and self.error)
+        if self.success:
+            assert self.error == None
+        else:
+            assert self.error != None
+            data['error'] = self.error
+        data['start'] = self.request_start
+        if self.request_finish:
+            data['duration'] = 1000000000 * (self.request_finish - self.request_start)
+        return dumper.represent_dict(data)

 # And a representer for dumping a TransferGreenletResult as a YAML dict()
-yaml.add_representer(TransferGreenletResult, lambda dumper, data: dumper.represent_dict(data.__dict__) )
+yaml.add_representer(TransferGreenletResult, TransferGreenletResult.repr_yaml)

 class ResultsLogger(gevent.Greenlet):
     """ A quick little greenlet to always run and dump results. """
     def __init__(self):
         gevent.Greenlet.__init__(self)
-        self.outfile = None
+        self.outfile = sys.stderr

     def _run(self):
         while True:
@@ -63,7 +108,5 @@ class ResultsLogger(gevent.Greenlet):
             while context.result_queue:
                 result = context.result_queue.popleft()
                 yrep = yaml.dump(result)
-                if self.outfile:
-                    self.outfile.write(yrep)
-                print yrep, "\n"
+                self.outfile.write(yrep + "---\n")
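
With the class-specific representer and the "---" separator appended after every record, the logger now emits a stream of YAML documents, one per transfer. A record would look roughly like this (illustrative values; PyYAML sorts the keys, duration is in nanoseconds, type is 'r' or 'w', and an error mapping appears only for failed transfers):

    bucket: test-bucket-0
    chunks:
    - [65536, 18000000.0, 1310765998.02]
    - [131072, 35000000.0, 1310765998.04]
    duration: 41000000.0
    key: obj-00042
    start: 1310765998.0
    type: r
    ---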


@@ -7,27 +7,44 @@ import time
 class RandomContentFile(object):
     def __init__(self, size, seed):
+        self.size = size
         self.seed = seed
         self.random = random.Random(self.seed)
-        self.offset = 0
-        self.buffer = ''
-        self.size = size
-        self.hash = hashlib.md5()
-        self.digest_size = self.hash.digest_size
-        self.digest = None
-        self.last_read = 0

-        assert self.size >= self.digest_size, "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format(
+        # Boto likes to seek once more after it's done reading, so we need to save the last chunks/seek value.
+        self.last_chunks = self.chunks = None
+        self.last_seek = self.start_time = None
+
+        # Let seek initialize the rest of it, rather than dup code
+        self.seek(0)
+
+        assert self.size >= self.digest_size, \
+            "Can not create RandomContentFile: size ({size}) must be >= digest_size ({digest_size})".format(
             size=self.size,
             digest_size=self.digest_size
         )

+    def _mark_chunk(self):
+        ctime = time.time()
+        self.chunks.append([self.offset, (ctime - self.last_seek) * 1000000000, ctime])
+
     def seek(self, offset):
         assert offset == 0
         self.random.seed(self.seed)
         self.offset = offset
         self.buffer = ''

+        self.hash = hashlib.md5()
+        self.digest_size = self.hash.digest_size
+        self.digest = None
+
+        # Save the last seek time as our start time, and the last chunks, before emptying them.
+        self.start_time = self.last_seek
+        self.last_chunks = self.chunks
+        self.last_seek = time.time()
+        self.chunks = []
+
     def tell(self):
         return self.offset
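
The snapshotting in seek() above is there because boto rewinds the file once to start the upload and once more after it has finished reading; without it, that trailing seek would clear the chunk data before WriterGreenlet copies it into the result. Roughly the sequence boto produces (a sketch, not code from this commit):

    fp.seek(0)                 # upload starts: chunks -> [], last_seek stamped
    while True:
        data = fp.read(8192)   # each read() ends by calling _mark_chunk()
        if not data:
            break
    fp.seek(0)                 # trailing seek: the finished pass survives in
                               # fp.start_time and fp.last_chunks for the writer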
@@ -65,7 +82,8 @@ class RandomContentFile(object):
             size -= digest_count
             data = self.digest[:digest_count]
             r.append(data)

-        self.last_read = time.time()
+        self._mark_chunk()

         return ''.join(r)
@@ -74,16 +92,20 @@ class FileVerifier(object):
         self.size = 0
         self.hash = hashlib.md5()
         self.buf = ''
-        self.first_write = 0
+        self.created_at = time.time()
+        self.chunks = []
+
+    def _mark_chunk(self):
+        ctime = time.time()
+        self.chunks.append([self.size, (ctime - self.created_at) * 1000000000, ctime])

     def write(self, data):
-        if self.size == 0:
-            self.first_write = time.time()
         self.size += len(data)
         self.buf += data
         digsz = -1*self.hash.digest_size
         new_data, self.buf = self.buf[0:digsz], self.buf[digsz:]
         self.hash.update(new_data)
+        self._mark_chunk()

     def valid(self):
         """