from __future__ import print_function
import sys
import collections
import nose
import string
import random
from pprint import pprint
import time
import boto.exception
from urlparse import urlparse
from nose.tools import eq_ as eq, ok_ as ok
from nose.plugins.attrib import attr
from nose.tools import timed
from nose.plugins.skip import SkipTest
from .. import common
from . import (
get_new_bucket,
get_new_bucket_name,
s3,
config,
_make_raw_request,
choose_bucket_prefix,
)
IGNORE_FIELD = 'IGNORETHIS'
SLEEP_INTERVAL = 0.01
SLEEP_MAX = 2.0
WEBSITE_CONFIGS_XMLFRAG = {
'IndexDoc': '${IndexDocument_Suffix}${RoutingRules}',
'IndexDocErrorDoc': '${IndexDocument_Suffix}${ErrorDocument_Key}${RoutingRules}',
'RedirectAll': '${RedirectAllRequestsTo_HostName}${RoutingRules}',
'RedirectAll+Protocol': '${RedirectAllRequestsTo_HostName}${RedirectAllRequestsTo_Protocol}${RoutingRules}',
}
INDEXDOC_TEMPLATE = '
IndexDoc
{random}'
ERRORDOC_TEMPLATE = 'ErrorDoc
{random}'
CAN_WEBSITE = None
def check_can_test_website():
global CAN_WEBSITE
# This is a bit expensive, so we cache this
if CAN_WEBSITE is None:
bucket = get_new_bucket()
try:
wsconf = bucket.get_website_configuration()
CAN_WEBSITE = True
except boto.exception.S3ResponseError as e:
if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
CAN_WEBSITE = True
elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
# rgw_enable_static_website is false
CAN_WEBSITE = False
elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
# This is older versions that do not support the website code
CAN_WEBSITE = False
else:
raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
finally:
bucket.delete()
if CAN_WEBSITE is True:
return True
elif CAN_WEBSITE is False:
raise SkipTest
else:
raise RuntimeError("Unknown cached response in checking if WebsiteConf is supported")
def make_website_config(xml_fragment):
"""
Take the tedious stuff out of the config
"""
return '' + xml_fragment + ''
def get_website_url(**kwargs):
"""
Return the URL to a website page
"""
proto, bucket, hostname, path = 'http', None, None, '/'
if 'proto' in kwargs:
proto = kwargs['proto']
if 'bucket' in kwargs:
bucket = kwargs['bucket']
if 'hostname' in kwargs:
hostname = kwargs['hostname']
if 'path' in kwargs:
path = kwargs['path']
if hostname is None and bucket is None:
return '/' + path.lstrip('/')
domain = config['main']['host']
if('s3website_domain' in config['main']):
domain = config['main']['s3website_domain']
elif('s3website_domain' in config['alt']):
domain = config['DEFAULT']['s3website_domain']
if hostname is None and bucket is not None:
hostname = '%s.%s' % (bucket, domain)
path = path.lstrip('/')
return "%s://%s/%s" % (proto, hostname, path)
def _test_website_populate_fragment(xml_fragment, fields):
for k in ['RoutingRules']:
if k in fields.keys() and len(fields[k]) > 0:
fields[k] = '<%s>%s%s>' % (k, fields[k], k)
f = {
'IndexDocument_Suffix': choose_bucket_prefix(template='index-{random}.html', max_len=32),
'ErrorDocument_Key': choose_bucket_prefix(template='error-{random}.html', max_len=32),
'RedirectAllRequestsTo_HostName': choose_bucket_prefix(template='{random}.{random}.com', max_len=32),
'RoutingRules': ''
}
f.update(fields)
xml_fragment = string.Template(xml_fragment).safe_substitute(**f)
return xml_fragment, f
def _test_website_prep(bucket, xml_template, hardcoded_fields = {}, expect_fail=None):
xml_fragment, f = _test_website_populate_fragment(xml_template, hardcoded_fields)
f['WebsiteConfiguration'] = ''
if not xml_template:
bucket.delete_website_configuration()
return f
config_xmlnew = make_website_config(xml_fragment)
config_xmlold = ''
try:
config_xmlold = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
except boto.exception.S3ResponseError as e:
if str(e.status) == str(404) \
and ('NoSuchWebsiteConfiguration' in e.body or 'NoSuchWebsiteConfiguration' in e.code or
'NoSuchKey' in e.body or 'NoSuchKey' in e.code):
pass
else:
raise e
try:
bucket.set_website_configuration_xml(common.trim_xml(config_xmlnew))
config_xmlnew = common.normalize_xml(config_xmlnew, pretty_print=True)
except boto.exception.S3ResponseError as e:
if expect_fail is not None:
if isinstance(expect_fail, dict):
pass
elif isinstance(expect_fail, str):
pass
raise e
# TODO: in some cases, it takes non-zero time for the config to be applied by AmazonS3
# We should figure out how to poll for changes better
# WARNING: eu-west-1 as of 2015/06/22 was taking at least 4 seconds to propogate website configs, esp when you cycle between non-null configs
time.sleep(0.1)
config_xmlcmp = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
#if config_xmlold is not None:
# print('old',config_xmlold.replace("\n",''))
#if config_xmlcmp is not None:
# print('cmp',config_xmlcmp.replace("\n",''))
#if config_xmlnew is not None:
# print('new',config_xmlnew.replace("\n",''))
# Cleanup for our validation
common.assert_xml_equal(config_xmlcmp, config_xmlnew)
#print("config_xmlcmp\n", config_xmlcmp)
#eq (config_xmlnew, config_xmlcmp)
f['WebsiteConfiguration'] = config_xmlcmp
return f
def __website_expected_reponse_status(res, status, reason):
if not isinstance(status, collections.Container):
status = set([status])
if not isinstance(reason, collections.Container):
reason = set([reason])
if status is not IGNORE_FIELD:
ok(res.status in status, 'HTTP code was %s should be %s' % (res.status, status))
if reason is not IGNORE_FIELD:
ok(res.reason in reason, 'HTTP reason was was %s should be %s' % (res.reason, reason))
def _website_expected_default_html(**kwargs):
fields = []
for k in kwargs.keys():
# AmazonS3 seems to be inconsistent, some HTML errors include BucketName, but others do not.
if k is 'BucketName':
continue
v = kwargs[k]
if isinstance(v, str):
v = [v]
elif not isinstance(v, collections.Container):
v = [v]
for v2 in v:
s = '%s: %s' % (k,v2)
fields.append(s)
return fields
def _website_expected_error_response(res, bucket_name, status, reason, code, content=None, body=None):
if body is None:
body = res.read()
print(body)
__website_expected_reponse_status(res, status, reason)
# Argh, AmazonS3 is really inconsistent, so we have a conditional test!
# This is most visible if you have an ErrorDoc present
errorcode = res.getheader('x-amz-error-code', None)
if errorcode is not None:
if code is not IGNORE_FIELD:
eq(errorcode, code)
if not isinstance(content, collections.Container):
content = set([content])
for f in content:
if f is not IGNORE_FIELD and f is not None:
ok(f in body, 'HTML should contain "%s"' % (f, ))
def _website_expected_redirect_response(res, status, reason, new_url):
body = res.read()
print(body)
__website_expected_reponse_status(res, status, reason)
loc = res.getheader('Location', None)
eq(loc, new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,))
ok(len(body) == 0, 'Body of a redirect should be empty')
def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
url = get_website_url(proto='http', bucket=bucket_name, path=path)
print("url", url)
o = urlparse(url)
if connect_hostname is None:
connect_hostname = o.hostname
path = o.path + '?' + o.query
request_headers={}
request_headers['Host'] = o.hostname
request_headers['Accept'] = '*/*'
print('Request: {method} {path}\n{headers}'.format(method=method, path=path, headers=''.join(map(lambda t: t[0]+':'+t[1]+"\n", request_headers.items()))))
res = _make_raw_request(connect_hostname, config.main.port, method, path, request_headers=request_headers, secure=False, timeout=timeout)
for (k,v) in res.getheaders():
print(k,v)
return res
# ---------- Non-existant buckets via the website endpoint
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@attr('fails_on_rgw')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_s3():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
#@attr(assertion='non-existant bucket via website endpoint should give Forbidden, keeping bucket identity secure')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@attr('fails_on_s3')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_rgw():
bucket_name = get_new_bucket_name()
res = _website_request(bucket_name, '')
#_website_expected_error_response(res, bucket_name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
_website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
#------------- IndexDocument only, successes
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@timed(10)
def test_website_public_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.make_public()
#time.sleep(1)
while bucket.get_key(f['IndexDocument_Suffix']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
body = res.read()
print(body)
eq(body, indexstring) # default content should match index.html set content
__website_expected_reponse_status(res, 200, 'OK')
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_public_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.set_canned_acl('private')
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.make_public()
#time.sleep(1)
while bucket.get_key(f['IndexDocument_Suffix']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
__website_expected_reponse_status(res, 200, 'OK')
body = res.read()
print(body)
eq(body, indexstring, 'default content should match index.html set content')
indexhtml.delete()
bucket.delete()
# ---------- IndexDocument only, failures
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.set_canned_acl('private')
# TODO: wait for sync
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
#time.sleep(1)
#time.sleep(1)
while bucket.get_key(f['IndexDocument_Suffix']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.set_canned_acl('private')
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
##time.sleep(1)
while bucket.get_key(f['IndexDocument_Suffix']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
indexhtml.delete()
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.set_canned_acl('private')
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.make_public()
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.make_public()
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
#time.sleep(1)
while bucket.get_key(f['IndexDocument_Suffix']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
indexhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_missingerrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.set_canned_acl('private')
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
#time.sleep(1)
while bucket.get_key(f['IndexDocument_Suffix']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
indexhtml.delete()
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.set_canned_acl('private')
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('private')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
body = res.read()
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
ok(errorstring not in body, 'error content should NOT match error.html set content')
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.make_public()
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('private')
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
body = res.read()
print(body)
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
ok(errorstring not in body, 'error content should match error.html set content')
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.make_public()
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('private')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
body = res.read()
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
ok(errorstring not in body, 'error content should match error.html set content')
indexhtml.delete()
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_blockederrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.set_canned_acl('private')
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('private')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
body = res.read()
print(body)
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
ok(errorstring not in body, 'error content should match error.html set content')
indexhtml.delete()
errorhtml.delete()
bucket.delete()
# ---------- IndexDocument & ErrorDocument, failures with errordoc available
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.set_canned_acl('private')
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring, policy='public-read')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.make_public()
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('public-read')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=[errorstring])
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.make_public()
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('public-read')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
indexhtml.delete()
errorhtml.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_gooderrordoc():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
bucket.set_canned_acl('private')
indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
indexhtml.set_contents_from_string(indexstring)
indexhtml.set_canned_acl('private')
errorhtml = bucket.new_key(f['ErrorDocument_Key'])
errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
errorhtml.set_contents_from_string(errorstring)
errorhtml.set_canned_acl('public-read')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
res = _website_request(bucket.name, '')
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
indexhtml.delete()
errorhtml.delete()
bucket.delete()
# ------ RedirectAll tests
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_base():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
bucket.set_canned_acl('private')
res = _website_request(bucket.name, '')
new_url = 'http://%s/' % f['RedirectAllRequestsTo_HostName']
_website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
bucket.set_canned_acl('private')
pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)
res = _website_request(bucket.name, pathfragment)
new_url = 'http://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
_website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path_upgrade():
bucket = get_new_bucket()
x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
f = _test_website_prep(bucket, x)
bucket.set_canned_acl('private')
pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)
res = _website_request(bucket.name, pathfragment)
new_url = 'https://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
_website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
bucket.delete()
# ------ x-amz redirect tests
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_nonwebsite():
bucket = get_new_bucket()
#f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
#bucket.set_canned_acl('private')
k = bucket.new_key('page')
content = 'wrong-content'
redirect_dest = '/relative'
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
body = res.read()
print(body)
expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name)
# TODO: RGW does not have custom error messages for different 404s yet
#expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name, Message='The specified bucket does not have a website configuration')
print(expected_content)
_website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchWebsiteConfiguration', content=expected_content, body=body)
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_relative():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
k = bucket.new_key('page')
content = 'wrong-content'
redirect_dest = '/relative'
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
#new_url = get_website_url(bucket_name=bucket.name, path=redirect_dest)
_website_expected_redirect_response(res, 301, ['Moved Permanently'], redirect_dest)
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_abs():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
k = bucket.new_key('page')
content = 'wrong-content'
redirect_dest = 'http://example.com/foo'
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='public-read')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
_website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_relative():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
k = bucket.new_key('page')
content = 'wrong-content'
redirect_dest = '/relative'
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='private')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
# We get a 403 because the page is private
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
k.delete()
bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_abs():
bucket = get_new_bucket()
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
bucket.make_public()
k = bucket.new_key('page')
content = 'wrong-content'
redirect_dest = 'http://example.com/foo'
headers = {'x-amz-website-redirect-location': redirect_dest}
k.set_contents_from_string(content, headers=headers, policy='private')
redirect = k.get_redirect()
eq(k.get_redirect(), redirect_dest)
res = _website_request(bucket.name, '/page')
new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
# We get a 403 because the page is private
_website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
k.delete()
bucket.delete()
# ------ RoutingRules tests
# RoutingRules
ROUTING_RULES = {
'empty': '',
'AmazonExample1': \
"""
docs/
documents/
""",
'AmazonExample1+Protocol=https': \
"""
docs/
https
documents/
""",
'AmazonExample1+Protocol=https+Hostname=xyzzy': \
"""
docs/
https
xyzzy
documents/
""",
'AmazonExample1+Protocol=http2': \
"""
docs/
http2
documents/
""",
'AmazonExample2': \
"""
images/
folderdeleted.html
""",
'AmazonExample2+HttpRedirectCode=TMPL': \
"""
images/
{HttpRedirectCode}
folderdeleted.html
""",
'AmazonExample3': \
"""
404
ec2-11-22-333-44.compute-1.amazonaws.com
report-404/
""",
'AmazonExample3+KeyPrefixEquals': \
"""
images/
404
ec2-11-22-333-44.compute-1.amazonaws.com
report-404/
""",
}
for k in ROUTING_RULES.keys():
if len(ROUTING_RULES[k]) > 0:
ROUTING_RULES[k] = "\n%s" % (k, ROUTING_RULES[k])
ROUTING_RULES_TESTS = [
dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='', location=None, code=200),
dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/', location=None, code=200),
dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/x', location=None, code=404),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/', location=None, code=200),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/x', location=None, code=404),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/', location=dict(proto='http',bucket='{bucket_name}',path='/documents/'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/x', location=dict(proto='http',bucket='{bucket_name}',path='/documents/x'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/', location=None, code=200),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/x', location=None, code=404),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/', location=dict(proto='https',bucket='{bucket_name}',path='/documents/'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/x', location=dict(proto='https',bucket='{bucket_name}',path='/documents/x'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/', location=None, code=200),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/x', location=None, code=404),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/', location=dict(proto='https',hostname='xyzzy',path='/documents/'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/x', location=dict(proto='https',hostname='xyzzy',path='/documents/x'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/images/x'), code=301),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/x', location=None, code=404),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301),
]
ROUTING_ERROR_PROTOCOL = dict(code=400, reason='Bad Request', errorcode='InvalidRequest', bodyregex=r'Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.')
ROUTING_RULES_TESTS_ERRORS = [ # TODO: Unused!
# Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
]
VALID_AMZ_REDIRECT = set([301,302,303,304,305,307,308])
# General lots of tests
for redirect_code in VALID_AMZ_REDIRECT:
rules = ROUTING_RULES['AmazonExample2+HttpRedirectCode=TMPL'].format(HttpRedirectCode=redirect_code)
result = redirect_code
ROUTING_RULES_TESTS.append(
dict(xml=dict(RoutingRules=rules), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result)
)
ROUTING_RULES_TESTS.append(
dict(xml=dict(RoutingRules=rules), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result)
)
# TODO:
# codes other than those in VALID_AMZ_REDIRECT
# give an error of 'The provided HTTP redirect code (314) is not valid. Valid codes are 3XX except 300.' during setting the website config
# we should check that we can return that too on ceph
def routing_setup():
check_can_test_website()
kwargs = {'obj':[]}
bucket = get_new_bucket()
kwargs['bucket'] = bucket
kwargs['obj'].append(bucket)
#f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
f = _test_website_prep(bucket, '')
kwargs.update(f)
bucket.set_canned_acl('public-read')
k = bucket.new_key('debug-ws.xml')
kwargs['obj'].append(k)
k.set_contents_from_string('', policy='public-read')
k = bucket.new_key(f['IndexDocument_Suffix'])
kwargs['obj'].append(k)
s = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=64)
k.set_contents_from_string(s)
k.set_canned_acl('public-read')
k = bucket.new_key(f['ErrorDocument_Key'])
kwargs['obj'].append(k)
s = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=64)
k.set_contents_from_string(s)
k.set_canned_acl('public-read')
#time.sleep(1)
while bucket.get_key(f['ErrorDocument_Key']) is None:
time.sleep(SLEEP_INTERVAL)
return kwargs
def routing_teardown(**kwargs):
for o in reversed(kwargs['obj']):
print('Deleting', str(o))
o.delete()
@common.with_setup_kwargs(setup=routing_setup, teardown=routing_teardown)
#@timed(10)
def routing_check(*args, **kwargs):
bucket = kwargs['bucket']
args=args[0]
#print(args)
pprint(args)
xml_fields = kwargs.copy()
xml_fields.update(args['xml'])
k = bucket.get_key('debug-ws.xml')
k.set_contents_from_string(str(args)+str(kwargs), policy='public-read')
pprint(xml_fields)
f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'], hardcoded_fields=xml_fields)
#print(f)
config_xmlcmp = bucket.get_website_configuration_xml()
config_xmlcmp = common.normalize_xml(config_xmlcmp, pretty_print=True) # For us to read
res = _website_request(bucket.name, args['url'])
print(config_xmlcmp)
new_url = args['location']
if new_url is not None:
new_url = get_website_url(**new_url)
new_url = new_url.format(bucket_name=bucket.name)
if args['code'] >= 200 and args['code'] < 300:
#body = res.read()
#print(body)
#eq(body, args['content'], 'default content should match index.html set content')
ok(res.getheader('Content-Length', -1) > 0)
elif args['code'] >= 300 and args['code'] < 400:
_website_expected_redirect_response(res, args['code'], IGNORE_FIELD, new_url)
elif args['code'] >= 400:
_website_expected_error_response(res, bucket.name, args['code'], IGNORE_FIELD, IGNORE_FIELD)
else:
assert(False)
@attr('s3website_RoutingRules')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_routing_generator():
for t in ROUTING_RULES_TESTS:
if 'xml' in t and 'RoutingRules' in t['xml'] and len(t['xml']['RoutingRules']) > 0:
t['xml']['RoutingRules'] = common.trim_xml(t['xml']['RoutingRules'])
yield routing_check, t