2011-08-08 23:51:10 +00:00
|
|
|
import nose
|
|
|
|
import random
|
|
|
|
import string
|
|
|
|
import yaml
|
|
|
|
|
|
|
|
from s3tests.fuzz_headers import *
|
|
|
|
|
|
|
|
from nose.tools import eq_ as eq
|
|
|
|
from nose.plugins.attrib import attr
|
|
|
|
|
|
|
|
from .utils import assert_raises
|
|
|
|
|
|
|
|
# Module-level dict, initialised empty. Nothing in the visible part of this
# file reads or writes it. NOTE(review): confirm whether it is still needed.
_decision_graph = {}
|
|
|
|
|
|
|
|
def check_access_denied(fn, *args, **kwargs):
    """Assert that calling ``fn`` is rejected with an S3 "access denied" error.

    ``fn(*args, **kwargs)`` is invoked through ``assert_raises`` and must raise
    ``boto.exception.S3ResponseError``.  The value handed back by
    ``assert_raises`` is then checked for ``status == 403``,
    ``reason == 'Forbidden'`` and ``error_code == 'AccessDenied'``.

    NOTE(review): ``boto`` is not imported explicitly in the visible part of
    this file; presumably it arrives through the star import -- confirm.
    """
    denied = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)

    # Checked in this order; the first mismatch fails the calling test.
    expected_attributes = (
        ('status', 403),
        ('reason', 'Forbidden'),
        ('error_code', 'AccessDenied'),
    )
    for attribute, wanted in expected_attributes:
        eq(getattr(denied, attribute), wanted)
|
|
|
|
|
|
|
|
|
2011-08-09 18:56:38 +00:00
|
|
|
def build_graph():
    """Return a fresh, hand-written decision graph used as a test fixture.

    The result maps node names to node dicts.  Every node has two entries:

    * ``'set'``     -- dict of key/value pairs the node contributes.
    * ``'choices'`` -- list of node names that may follow this node.

    Nodes defined here:

    * ``'start'``    -- sets nothing; its only choice is ``'node2'``.
    * ``'leaf'``     -- sets ``key1`` and ``key2``; has no choices.
    * ``'node1'``    -- sets ``key3``; leads to ``'leaf'``.  No node in this
      graph lists it as a choice.
    * ``'node2'``    -- sets values containing ``{...}`` placeholders
      (``randkey``, ``path``, ``indirect_key1``); leads to ``'leaf'``.
    * ``'bad_node'`` -- sets ``key1``, which ``'leaf'`` also sets; leads to
      ``'leaf'``.

    A new dict (with new nested dicts and lists) is built on every call, so
    callers may mutate the result freely.
    """
    return {
        'start': {
            'set': {},
            'choices': ['node2'],
        },
        'leaf': {
            'set': {
                'key1': 'value1',
                'key2': 'value2',
            },
            'choices': [],
        },
        'node1': {
            'set': {
                'key3': 'value3',
            },
            'choices': ['leaf'],
        },
        'node2': {
            'set': {
                'randkey': 'value-{random 10-15 printable}',
                'path': '/{bucket_readable}',
                'indirect_key1': '{key1}',
            },
            'choices': ['leaf'],
        },
        'bad_node': {
            'set': {
                'key1': 'value1',
            },
            'choices': ['leaf'],
        },
    }
|
|
|
|
|
|
|
|
|
|
|
|
def test_load_graph():
    """Check that the decision-graph YAML file parses and has a 'start' node.

    The file ``request_decision_graph.yml`` is opened relative to the current
    working directory, so the test depends on where the runner is started.

    Raises:
        IOError/OSError: if the file cannot be opened.
        KeyError: if the parsed graph has no ``'start'`` entry.
    """
    # The context manager closes the handle even when parsing raises; the
    # previous version left the file open.
    with open('request_decision_graph.yml', 'r') as graph_file:
        graph = yaml.safe_load(graph_file)

    # Bare lookup on purpose: a missing 'start' node raises KeyError and
    # thereby fails the test.
    graph['start']
|
|
|
|
|
|
|
|
|
|
|
|
def test_descend_leaf_node():
    """Descending from 'leaf' yields only the keys that 'leaf' itself sets.

    Expects ``key1`` and ``key2`` with the fixture's values, and a KeyError
    when looking up ``key3`` (which only 'node1' sets).
    """
    rng = random.Random(1)
    leaf_decision = descend_graph(build_graph(), 'leaf', rng)

    for name, wanted in (('key1', 'value1'), ('key2', 'value2')):
        eq(leaf_decision[name], wanted)

    # 'key3' must be absent from the result.
    assert_raises(KeyError, lambda name: leaf_decision[name], 'key3')
|
|
|
|
|
2011-08-09 22:44:25 +00:00
|
|
|
|
2011-08-09 18:56:38 +00:00
|
|
|
def test_descend_node():
    """Descending from 'node1' collects its own key plus those of 'leaf'.

    'node1' sets ``key3`` and its only choice is 'leaf', which sets ``key1``
    and ``key2``; all three are expected in the resulting decision.
    """
    rng = random.Random(1)
    node_decision = descend_graph(build_graph(), 'node1', rng)

    expected_pairs = (
        ('key1', 'value1'),
        ('key2', 'value2'),
        ('key3', 'value3'),
    )
    for name, wanted in expected_pairs:
        eq(node_decision[name], wanted)
|
2011-08-08 23:51:10 +00:00
|
|
|
|
2011-08-09 22:44:25 +00:00
|
|
|
|
2011-08-09 18:56:38 +00:00
|
|
|
def test_descend_bad_node():
    """Descending from 'bad_node' must raise KeyError.

    In the fixture, 'bad_node' sets ``key1`` and then leads to 'leaf', which
    sets ``key1`` as well -- presumably the duplicate key is what
    ``descend_graph`` rejects (confirm against its implementation).
    """
    assert_raises(
        KeyError,
        descend_graph,
        build_graph(),
        'bad_node',
        random.Random(1),
    )
|
2011-08-08 23:51:10 +00:00
|
|
|
|
2011-08-09 22:44:25 +00:00
|
|
|
|
|
|
|
def test_SpecialVariables_dict():
    """SpecialVariables serves stored keys and the 'random ...' special key.

    A plain key (``'foo'``) must come back with its stored value, and the key
    ``'random 10-15 printable'`` must produce the exact string expected for a
    PRNG seeded with 1.
    """
    special = SpecialVariables({'foo': 'bar'}, random.Random(1))

    eq(special['foo'], 'bar')
    # FIXME: how should pseudorandom content be tested?  The literal below is
    # tied to the seed used above.
    eq(special['random 10-15 printable'], '[/pNI$;92@')
|
|
|
|
|
|
|
|
def test_assemble_decision():
    """assemble_decision gathers the raw values along the path from 'start'.

    The values are expected exactly as written in the fixture, with the
    ``{...}`` placeholders still in place.  ``key3`` (set only by 'node1',
    which 'start' does not lead to) must be absent.
    """
    rng = random.Random(1)
    assembled = assemble_decision(build_graph(), rng)

    expected_pairs = (
        ('key1', 'value1'),
        ('key2', 'value2'),
        ('randkey', 'value-{random 10-15 printable}'),
        ('indirect_key1', '{key1}'),
        ('path', '/{bucket_readable}'),
    )
    for name, wanted in expected_pairs:
        eq(assembled[name], wanted)

    assert_raises(KeyError, lambda name: assembled[name], 'key3')
|
|
|
|
|
2011-08-10 18:27:06 +00:00
|
|
|
def test_expand_key():
    """expand_key resolves placeholders, including chained references.

    Covers three cases: a ``{random ...}`` placeholder embedded in a value, a
    direct reference to another key, and a reference to a key that is itself
    a reference.
    """
    raw_values = {
        'key1': 'value1',
        'randkey': 'value-{random 10-15 printable}',
        'indirect': '{key1}',
        'dbl_indirect': '{indirect}',
    }
    variables = SpecialVariables(raw_values, random.Random(1))

    # Expansion order is kept as-is: the calls share one PRNG, so reordering
    # them could change the generated random value.
    expanded_random = expand_key(variables, 'randkey')
    expanded_indirect = expand_key(variables, 'indirect')
    expanded_double = expand_key(variables, 'dbl_indirect')

    eq(expanded_indirect, 'value1')
    eq(expanded_double, 'value1')
    eq(expanded_random, 'value-[/pNI$;92@')
|
|
|
|
|
|
|
|
def test_expand_loop():
    """expand_key must raise RuntimeError for mutually referencing keys.

    ``key1`` refers to ``key2`` and ``key2`` refers back to ``key1``, so the
    expansion can never terminate on its own.
    """
    rng = random.Random(1)
    cyclic_values = {'key1': '{key2}', 'key2': '{key1}'}
    variables = SpecialVariables(cyclic_values, rng)

    assert_raises(RuntimeError, expand_key, variables, 'key1')
|
|
|
|
|
2011-08-09 22:44:25 +00:00
|
|
|
def test_expand_decision():
    """expand_decision turns an assembled decision into concrete values.

    The decision assembled from the fixture graph is extended with a
    ``bucket_readable`` entry, then expanded.  Plain values must pass through
    and every ``{...}`` placeholder must be replaced.
    """
    rng = random.Random(1)
    assembled = assemble_decision(build_graph(), rng)
    assembled.update({'bucket_readable': 'my-readable-bucket'})

    expanded = expand_decision(assembled, rng)

    expected_pairs = (
        ('key1', 'value1'),
        ('indirect_key1', 'value1'),
        ('path', '/my-readable-bucket'),
        # FIXME: again, how to handle the pseudorandom content?  The literal
        # is tied to the seed used above.
        ('randkey', 'value-NI$;92@H/0I'),
    )
    for name, wanted in expected_pairs:
        eq(expanded[name], wanted)

    # NOTE(review): this checks the assembled decision, not the expanded
    # request -- confirm that is the intended target.
    assert_raises(KeyError, lambda name: assembled[name], 'key3')
|
|
|
|
|