Merge branch 'roundtrip'

This commit is contained in:
Tommi Virtanen 2011-07-19 09:00:44 -07:00
commit 5c25c66649
6 changed files with 418 additions and 14 deletions

View file

@ -4,13 +4,24 @@ import itertools
import os import os
import random import random
import string import string
import sys
import yaml import yaml
s3 = bunch.Bunch() s3 = bunch.Bunch()
config = bunch.Bunch() config = bunch.Bunch()
prefix = '' prefix = ''
# For those scripts that use a context, these are pretty univerally needed.
context = bunch.Bunch(
bucket = None,
# Save stdout/stderr in case they get fudged with.
real_stdout = sys.stdout,
real_stderr = sys.stderr,
)
bucket_counter = itertools.count(1) bucket_counter = itertools.count(1)
key_counter = itertools.count(1)
def choose_bucket_prefix(template, max_len=30): def choose_bucket_prefix(template, max_len=30):
""" """
@ -44,13 +55,19 @@ def nuke_prefixed_buckets():
print 'Cleaning bucket {bucket}'.format(bucket=bucket) print 'Cleaning bucket {bucket}'.format(bucket=bucket)
try: try:
bucket.set_canned_acl('private') bucket.set_canned_acl('private')
for key in bucket.list(): # TODO: deleted_cnt and the while loop is a work around for rgw
print 'Cleaning bucket {bucket} key {key}'.format( # not sending the
bucket=bucket, deleted_cnt = 1
key=key, while deleted_cnt:
) deleted_cnt = 0
key.set_canned_acl('private') for key in bucket.list():
key.delete() print 'Cleaning bucket {bucket} key {key}'.format(
bucket=bucket,
key=key,
)
key.set_canned_acl('private')
key.delete()
deleted_cnt += 1
bucket.delete() bucket.delete()
except boto.exception.S3ResponseError as e: except boto.exception.S3ResponseError as e:
# TODO workaround for buggy rgw that fails to send # TODO workaround for buggy rgw that fails to send
@ -156,3 +173,12 @@ def get_new_bucket(connection=None):
def teardown(): def teardown():
nuke_prefixed_buckets() nuke_prefixed_buckets()
def fill_pools(*args):
for pool in args:
while not pool.full():
pool.spawn()
def get_next_key(bucket=None):
return bucket.new_key("seqkey-{num}".format(num=next(key_counter)))

View file

@ -0,0 +1,93 @@
import bunch
import collections
import gevent
import random
import time
import traceback
from ..common import context, get_next_key
from ..common.results import TransferGreenletResult
from ..realistic import FileVerifier
# Make sure context has somewhere to store what we need
context.update(bunch.Bunch(
needs_first_read = collections.deque(),
all_keys = [],
files_iter = None,
))
class SafeTransferGreenlet(gevent.Greenlet):
def __init__(self, timeout=120):
gevent.Greenlet.__init__(self)
self.timeout = timeout
self.result = None
def _run(self):
result = self.result = TransferGreenletResult(self.type)
result.markStarted()
try:
with gevent.Timeout(self.timeout, False):
result.success = self._doit()
except gevent.GreenletExit:
return
except:
result.setError(show_traceback=True)
result.markFinished()
class ReaderGreenlet(SafeTransferGreenlet):
type = 'reader'
def _doit(self):
if context.needs_first_read:
key = context.needs_first_read.popleft()
elif context.all_keys:
key = random.choice(context.all_keys)
else:
time.sleep(1)
return self.result.setError('No available keys to test with reader. Try again later.')
# Copynew the key object
key = key.bucket.new_key(key.name)
self.result.setKey(key)
fp = FileVerifier()
key.get_contents_to_file(fp)
self.result.request_finish = time.time()
self.result.request_start = fp.created_at
self.result.chunks = fp.chunks
self.result.size = fp.size
if not fp.valid():
return self.result.setError('Failed to validate key {name!s}'.format(name=key.name))
return True
class WriterGreenlet(SafeTransferGreenlet):
type = 'writer'
def _doit(self):
key = get_next_key(context.bucket)
self.result.setKey(key)
fp = next(context.files_iter)
self.result.size = fp.size
key.set_contents_from_file(fp)
self.result.request_finish = time.time()
self.result.request_start = fp.start_time
self.result.chunks = fp.last_chunks
# And at the end, add to needs_first_read and shuffle
context.needs_first_read.append(key)
context.all_keys.append(key)
return True

112
s3tests/common/results.py Normal file
View file

@ -0,0 +1,112 @@
import bunch
import collections
import gevent
import sys
import time
import traceback
import yaml
from ..common import context
context.update(bunch.Bunch(
result_queue = collections.deque(),
))
class TransferGreenletResult(object):
""" Generic container object. Weeeeeeeeeeeeeee *short* """
def __init__(self, type):
# About the Greenlet
self.type = type
# About the key
self.bucket = None
self.key = None
self.size = None
# About the job
self.success = False
self.error = None
self.start_time = None
self.finish_time = None
self.duration = None
self.latency = None
self.request_start = None
self.request_finish = None
self.chunks = None
def markStarted(self):
self.start_time = time.time()
def markFinished(self):
self.finish_time = time.time()
self.duration = self.finish_time - self.start_time
context.result_queue.append(self)
def setKey(self, key):
self.key = key.name
self.bucket = key.bucket.name
def setError(self, message='Unhandled Exception', show_traceback=False):
""" Sets an error state in the result, and returns False... example usage:
return self.result.setError('Something happened', traceback=True)
"""
self.error = dict()
self.error['msg'] = message
if show_traceback:
self.error['traceback'] = traceback.format_exc()
return False
@classmethod
def repr_yaml(c, dumper, self):
data = dict()
for x in ('type', 'bucket', 'key', 'chunks'):
data[x] = self.__dict__[x]
# reader => r, writer => w
data['type'] = data['type'][0]#chunks
# the error key must be present ONLY on failure.
assert not (self.success and self.error)
if self.success:
assert self.error == None
else:
assert self.error != None
data['error'] = self.error
data['start'] = self.request_start
if self.request_finish:
data['duration'] = 1000000000 * (self.request_finish - self.request_start)
return dumper.represent_dict(data)
# And a representer for dumping a TransferGreenletResult as a YAML dict()
yaml.add_representer(TransferGreenletResult, TransferGreenletResult.repr_yaml)
class ResultsLogger(gevent.Greenlet):
""" A quick little greenlet to always run and dump results. """
def __init__(self):
gevent.Greenlet.__init__(self)
self.outfile = context.real_stdout
def _run(self):
while True:
try:
self._doit()
except:
print "An exception was encountered while dumping the results... this shouldn't happen!"
traceback.print_exc()
time.sleep(0.1)
def _doit(self):
while context.result_queue:
result = context.result_queue.popleft()
yrep = yaml.dump(result)
self.outfile.write(yrep + "---\n")

View file

@ -1,18 +1,25 @@
import bunch
import hashlib import hashlib
import random import random
import string import string
import struct import struct
import time
class RandomContentFile(object): class RandomContentFile(object):
def __init__(self, size, seed): def __init__(self, size, seed):
self.size = size
self.seed = seed self.seed = seed
self.random = random.Random(self.seed) self.random = random.Random(self.seed)
self.offset = 0
self.buffer = '' # Boto likes to seek once more after it's done reading, so we need to save the last chunks/seek value.
self.size = size self.last_chunks = self.chunks = None
self.hash = hashlib.md5() self.last_seek = self.start_time = None
self.digest_size = self.hash.digest_size
self.digest = None # Let seek initialize the rest of it, rather than dup code
self.seek(0)
def _mark_chunk(self):
self.chunks.append([self.offset, (time.time() - self.last_seek) * 1000000000])
def seek(self, offset): def seek(self, offset):
assert offset == 0 assert offset == 0
@ -20,6 +27,17 @@ class RandomContentFile(object):
self.offset = offset self.offset = offset
self.buffer = '' self.buffer = ''
self.hash = hashlib.md5()
self.digest_size = self.hash.digest_size
self.digest = None
# Save the last seek time as our start time, and the last chunks
self.start_time = self.last_seek
self.last_chunks = self.chunks
# Before emptying.
self.last_seek = time.time()
self.chunks = []
def tell(self): def tell(self):
return self.offset return self.offset
@ -58,6 +76,8 @@ class RandomContentFile(object):
data = self.digest[:digest_count] data = self.digest[:digest_count]
r.append(data) r.append(data)
self._mark_chunk()
return ''.join(r) return ''.join(r)
class FileVerifier(object): class FileVerifier(object):
@ -65,6 +85,11 @@ class FileVerifier(object):
self.size = 0 self.size = 0
self.hash = hashlib.md5() self.hash = hashlib.md5()
self.buf = '' self.buf = ''
self.created_at = time.time()
self.chunks = []
def _mark_chunk(self):
self.chunks.append([self.size, (time.time() - self.created_at) * 1000000000])
def write(self, data): def write(self, data):
self.size += len(data) self.size += len(data)
@ -72,6 +97,7 @@ class FileVerifier(object):
digsz = -1*self.hash.digest_size digsz = -1*self.hash.digest_size
new_data, self.buf = self.buf[0:digsz], self.buf[digsz:] new_data, self.buf = self.buf[0:digsz], self.buf[digsz:]
self.hash.update(new_data) self.hash.update(new_data)
self._mark_chunk()
def valid(self): def valid(self):
""" """
@ -123,7 +149,54 @@ def names(mean, stddev, charset=None, seed=None):
while True: while True:
while True: while True:
length = int(rand.normalvariate(mean, stddev)) length = int(rand.normalvariate(mean, stddev))
if length >= 0: if length > 0:
break break
name = ''.join(rand.choice(charset) for _ in xrange(length)) name = ''.join(rand.choice(charset) for _ in xrange(length))
yield name yield name
def files_varied(groups, unlimited=False):
""" Yields a weighted-random selection of file-like objects. """
# Quick data type sanity.
assert groups and isinstance(groups, (list, tuple))
total_num = 0
file_sets = []
rand = random.Random(time.time())
# Build the sets for our yield
for num, size, stddev in groups:
assert num and size
file_sets.append(bunch.Bunch(
num = num,
size = size,
stddev = stddev,
files = files(size, stddev, time.time())
))
total_num += num
while True:
if not total_num:
raise StopIteration
num = rand.randrange(total_num)
ok = 0
for file_set in file_sets:
if num > file_set.num:
num -= file_set.num
continue
if not unlimited:
total_num -= 1
file_set.num -= 1
# None left in this set!
if file_set.num == 0:
file_sets.remove(file_set)
ok = 1
yield next(file_set.files)
if not ok:
raise RuntimeError, "Couldn't find a match."

99
s3tests/roundtrip.py Normal file
View file

@ -0,0 +1,99 @@
import gevent.monkey
gevent.monkey.patch_all()
import bunch
import collections
import gevent
import gevent.pool
import itertools
import random
import realistic
import time
import traceback
import sys
import common
from common import context, config
from common.greenlets import ReaderGreenlet, WriterGreenlet
from common.results import ResultsLogger
# Set up the common context to use our information. Wee.
context.update(bunch.Bunch(
# Set to False when it's time to exit main loop.
running = True,
# The pools our tasks run in.
greenlet_pools = bunch.Bunch(
writer=None,
reader=None,
),
# The greenlet that keeps logs going.
results_logger = None,
))
def setup():
config_rt = config.roundtrip
context.bucket = common.get_new_bucket()
print "Using bucket: {name}".format(name=context.bucket.name)
context.greenlet_pools.reader = gevent.pool.Pool(config_rt.pool_sizes.reader, ReaderGreenlet)
context.greenlet_pools.writer = gevent.pool.Pool(config_rt.pool_sizes.writer, WriterGreenlet)
context.key_iter = itertools.count(1)
context.files_iter = realistic.files_varied(config_rt.create_objects)
def _main():
def _stop_running():
""" Since we can't do assignment in a lambda, we have this little stub """
context.running = False
grace_period = config.roundtrip.grace_wait
print "Launching/Scheduling essential services..."
gevent.spawn_later(config.roundtrip.duration + grace_period, _stop_running)
context.results_logger = ResultsLogger.spawn()
print "Launching the pool of writers, and giving them {grace} seconds to get ahead of us!".format(grace=grace_period)
writers_start_time = time.time()
while time.time() - writers_start_time < grace_period:
common.fill_pools(context.greenlet_pools.writer)
time.sleep(0.1)
# Main work loop.
print "Starting main work loop..."
while context.running:
common.fill_pools(*context.greenlet_pools.values())
time.sleep(0.1)
print "We've hit duration. Time to stop!"
print "Waiting {grace} seconds for jobs to finish normally.".format(grace=grace_period)
time.sleep(grace_period)
print "Killing off any remaining jobs."
context.greenlet_pools.reader.kill()
context.greenlet_pools.writer.kill()
print "Waiting 10 seconds for them to finish dying off and collections to complete!"
time.sleep(10)
print "Killing essential services..."
context.results_logger.kill()
print "Done!"
def main():
sys.stdout = sys.stderr # Original steam already saved by common
common.setup()
setup()
# Normal
try:
_main()
except:
traceback.print_exc()
common.teardown()

View file

@ -23,6 +23,7 @@ setup(
'console_scripts': [ 'console_scripts': [
's3tests-generate-objects = s3tests.generate_objects:main', 's3tests-generate-objects = s3tests.generate_objects:main',
's3tests-test-readwrite = s3tests.rand_readwrite:main', 's3tests-test-readwrite = s3tests.rand_readwrite:main',
's3tests-test-roundtrip = s3tests.roundtrip:main',
], ],
}, },