"""Functional tests for S3 static-website hosting (bucket website endpoint).

NOTE(review): this module was recovered from a copy in which whitespace was
collapsed and markup inside string literals had been stripped. The XML/HTML
literals below were rebuilt from the ``Parent_Child`` field names used in this
file and from the public S3 ``WebsiteConfiguration`` schema -- confirm them
against the upstream file before merging.
"""
import sys
import collections
import collections.abc
import nose
import string
import random
from pprint import pprint
import time
import boto.exception
import socket

from urllib.parse import urlparse

from nose.tools import eq_ as eq, ok_ as ok
from nose.plugins.attrib import attr
from nose.tools import timed
from nose.plugins.skip import SkipTest

from .. import common

from . import (
    get_new_bucket,
    get_new_bucket_name,
    s3,
    config,
    _make_raw_request,
    choose_bucket_prefix,
    )

# Sentinel: pass as status/reason/code/content to skip that particular check.
IGNORE_FIELD = 'IGNORETHIS'

SLEEP_INTERVAL = 0.01
SLEEP_MAX = 2.0

# XML fragments placed inside <WebsiteConfiguration>; ${...} placeholders are
# filled in by _test_website_populate_fragment().
# NOTE(review): element names reconstructed (the recovered copy only kept the
# ${...} placeholders) -- verify against upstream.
WEBSITE_CONFIGS_XMLFRAG = {
    'IndexDoc': '<IndexDocument><Suffix>${IndexDocument_Suffix}</Suffix></IndexDocument>${RoutingRules}',
    'IndexDocErrorDoc': '<IndexDocument><Suffix>${IndexDocument_Suffix}</Suffix></IndexDocument><ErrorDocument><Key>${ErrorDocument_Key}</Key></ErrorDocument>${RoutingRules}',
    'RedirectAll': '<RedirectAllRequestsTo><HostName>${RedirectAllRequestsTo_HostName}</HostName></RedirectAllRequestsTo>${RoutingRules}',
    'RedirectAll+Protocol': '<RedirectAllRequestsTo><HostName>${RedirectAllRequestsTo_HostName}</HostName><Protocol>${RedirectAllRequestsTo_Protocol}</Protocol></RedirectAllRequestsTo>${RoutingRules}',
}

# NOTE(review): the surrounding HTML tags are reconstructed; only the visible
# text ("IndexDoc"/"ErrorDoc" and "{random}") survived in the recovered copy.
INDEXDOC_TEMPLATE = '<html><h1>IndexDoc</h1><body>{random}</body></html>'
ERRORDOC_TEMPLATE = '<html><h1>ErrorDoc</h1><body>{random}</body></html>'

# Cached tri-state result of check_can_test_website(): None means "not probed".
CAN_WEBSITE = None


def check_can_test_website():
    """Probe once whether the endpoint supports website configuration.

    Used as the nose ``setup`` hook of every test in this module. The result
    is cached in ``CAN_WEBSITE`` because the probe creates and deletes a bucket.

    Returns True when website configuration is supported.
    Raises SkipTest when it is not, and RuntimeError on an unrecognised
    response or an invalid cached value.
    """
    global CAN_WEBSITE
    # This is a bit expensive, so we cache this
    if CAN_WEBSITE is None:
        bucket = get_new_bucket()
        try:
            bucket.get_website_configuration()
            CAN_WEBSITE = True
        except boto.exception.S3ResponseError as e:
            if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
                CAN_WEBSITE = True
            elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
                # rgw_enable_static_website is false
                CAN_WEBSITE = False
            elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
                # This is older versions that do not support the website code
                CAN_WEBSITE = False
            elif e.status == 501 and e.error_code == 'NotImplemented':
                CAN_WEBSITE = False
            else:
                raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
        finally:
            bucket.delete()

    if CAN_WEBSITE is True:
        return True
    elif CAN_WEBSITE is False:
        raise SkipTest
    else:
        raise RuntimeError("Unknown cached response in checking if WebsiteConf is supported")


def make_website_config(xml_fragment):
    """
    Take the tedious stuff out of the config

    Wraps ``xml_fragment`` in the WebsiteConfiguration root element.
    """
    # NOTE(review): the recovered copy had empty strings on both sides of the
    # fragment; the root element is reconstructed from the S3 schema.
    return '<WebsiteConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">' + xml_fragment + '</WebsiteConfiguration>'


def get_website_url(**kwargs):
    """
    Return the URL to a website page

    Keyword arguments: ``proto`` (default 'http'), ``bucket``, ``hostname``
    and ``path`` (default '/'). With neither ``bucket`` nor ``hostname`` the
    result is just the path with a single leading slash. Otherwise the host is
    ``hostname`` or, failing that, ``<bucket>.<website domain>``.
    """
    proto = kwargs.get('proto', 'http')
    bucket = kwargs.get('bucket')
    hostname = kwargs.get('hostname')
    path = kwargs.get('path', '/')

    if hostname is None and bucket is None:
        return '/' + path.lstrip('/')

    domain = config['main']['host']
    if 's3website_domain' in config['main']:
        domain = config['main']['s3website_domain']
    elif 's3website_domain' in config['alt']:
        # NOTE(review): tests the 'alt' section but reads from 'DEFAULT';
        # kept as-is, confirm which section is intended.
        domain = config['DEFAULT']['s3website_domain']

    if hostname is None and bucket is not None:
        hostname = '%s.%s' % (bucket, domain)
    path = path.lstrip('/')
    return "%s://%s/%s" % (proto, hostname, path)


def _test_website_populate_fragment(xml_fragment, fields):
    """Substitute template fields into ``xml_fragment``.

    ``fields`` overrides the randomly generated defaults. A non-empty
    'RoutingRules' value is wrapped in its own element first. The caller's
    dict is copied, not modified.

    Returns ``(substituted_fragment, fields_used)``.
    """
    # Work on a copy: wrapping in place would double-wrap a dict that is
    # passed in more than once.
    fields = dict(fields)
    for k in ['RoutingRules']:
        if k in fields and len(fields[k]) > 0:
            # The closing tag was missing, which left three arguments for two
            # placeholders and raised TypeError.
            fields[k] = '<%s>%s</%s>' % (k, fields[k], k)

    f = {
        'IndexDocument_Suffix': choose_bucket_prefix(template='index-{random}.html', max_len=32),
        'ErrorDocument_Key': choose_bucket_prefix(template='error-{random}.html', max_len=32),
        'RedirectAllRequestsTo_HostName': choose_bucket_prefix(template='{random}.{random}.com', max_len=32),
        'RoutingRules': ''
    }
    f.update(fields)
    xml_fragment = string.Template(xml_fragment).safe_substitute(**f)
    return xml_fragment, f


def _test_website_prep(bucket, xml_template, hardcoded_fields=None, expect_fail=None):
    """Apply a website configuration to ``bucket`` and verify it round-trips.

    An empty ``xml_template`` deletes the bucket's website configuration
    instead. ``expect_fail`` is accepted but not acted on: a failing PUT is
    always re-raised.

    Returns the dict of field values used, plus a 'WebsiteConfiguration' entry
    holding the normalized XML read back from the server ('' when the
    configuration was deleted).
    """
    if hardcoded_fields is None:
        hardcoded_fields = {}
    xml_fragment, f = _test_website_populate_fragment(xml_template, hardcoded_fields)
    f['WebsiteConfiguration'] = ''
    if not xml_template:
        bucket.delete_website_configuration()
        return f

    config_xmlnew = make_website_config(xml_fragment)

    # Read the current configuration first; "no configuration yet" is the only
    # error tolerated here. The result itself is not used.
    try:
        common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
    except boto.exception.S3ResponseError as e:
        if str(e.status) == str(404) \
            and ('NoSuchWebsiteConfiguration' in e.body or 'NoSuchWebsiteConfiguration' in e.code or
                 'NoSuchKey' in e.body or 'NoSuchKey' in e.code):
            pass
        else:
            raise

    # An S3ResponseError from the PUT propagates to the caller unchanged.
    bucket.set_website_configuration_xml(common.trim_xml(config_xmlnew))
    config_xmlnew = common.normalize_xml(config_xmlnew, pretty_print=True)

    # TODO: in some cases, it takes non-zero time for the config to be applied by AmazonS3
    # We should figure out how to poll for changes better
    # WARNING: eu-west-1 as of 2015/06/22 was taking at least 4 seconds to propagate website configs, esp when you cycle between non-null configs
    time.sleep(0.1)
    config_xmlcmp = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)

    # Cleanup for our validation
    common.assert_xml_equal(config_xmlcmp, config_xmlnew)
    f['WebsiteConfiguration'] = config_xmlcmp
    return f


def _wait_for_key(bucket, key_name):
    """Poll every SLEEP_INTERVAL seconds until ``key_name`` is visible in ``bucket``.

    There is no upper bound on the wait (same as the inline loops this
    replaces).
    """
    while bucket.get_key(key_name) is None:
        time.sleep(SLEEP_INTERVAL)


def _as_expected_collection(value):
    """Return ``value`` as a collection of acceptable values.

    A str counts as a single value, so membership tests are exact matches
    rather than substring matches.
    """
    if isinstance(value, str) or not isinstance(value, collections.abc.Container):
        return {value}
    return value


def __website_expected_reponse_status(res, status, reason):
    """Assert that ``res`` has one of the expected statuses and reasons.

    ``status`` and ``reason`` may each be a single value, a collection of
    acceptable values, or IGNORE_FIELD to skip that check.
    """
    if status is not IGNORE_FIELD:
        status = _as_expected_collection(status)
        ok(res.status in status, 'HTTP code was %s should be %s' % (res.status, status))
    if reason is not IGNORE_FIELD:
        reason = _as_expected_collection(reason)
        ok(res.reason in reason, 'HTTP reason was was %s should be %s' % (res.reason, reason))


def _website_expected_default_html(**kwargs):
    """Build the HTML snippets expected in a default error page.

    Each keyword maps a field name to one value or a collection of values.
    'BucketName' is always skipped. Returns a list of strings.
    """
    fields = []
    for k, v in kwargs.items():
        # AmazonS3 seems to be inconsistent, some HTML errors include BucketName, but others do not.
        if k == 'BucketName':
            continue

        if isinstance(v, str) or not isinstance(v, collections.abc.Container):
            v = [v]
        for v2 in v:
            # NOTE(review): <li> markup reconstructed; the recovered copy
            # showed only a bullet glyph around "%s: %s".
            fields.append('<li>%s: %s</li>' % (k, v2))
    return fields


def _website_expected_error_response(res, bucket_name, status, reason, code, content=None, body=None):
    """Assert that ``res`` is the expected error response.

    Checks status and reason, the 'x-amz-error-code' header when the server
    sent one, and that every entry of ``content`` appears in the body.
    ``body`` lets the caller supply a body it has already read from ``res``.
    ``bucket_name`` is not used by the checks.
    """
    if body is None:
        body = res.read()
        print(body)
    __website_expected_reponse_status(res, status, reason)

    # Argh, AmazonS3 is really inconsistent, so we have a conditional test!
    # This is most visible if you have an ErrorDoc present
    errorcode = res.getheader('x-amz-error-code', None)
    if errorcode is not None:
        if code is not IGNORE_FIELD:
            eq(errorcode, code)

    for f in _as_expected_collection(content):
        if f is not IGNORE_FIELD and f is not None:
            f = bytes(f, 'utf-8')
            ok(f in body, 'HTML should contain "%s"' % (f, ))


def _website_expected_redirect_response(res, status, reason, new_url):
    """Assert that ``res`` is a redirect to ``new_url`` with an empty body."""
    body = res.read()
    print(body)
    __website_expected_reponse_status(res, status, reason)
    loc = res.getheader('Location', None)
    eq(loc, new_url, 'Location header should be set "%s" != "%s"' % (loc, new_url,))
    ok(len(body) == 0, 'Body of a redirect should be empty')


def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
    """Send a plain-HTTP request to the bucket's website endpoint.

    ``connect_hostname`` overrides the host that is connected to; the Host
    header always carries the website hostname. Returns the response object
    from ``_make_raw_request``.
    """
    url = get_website_url(proto='http', bucket=bucket_name, path=path)
    print("url", url)
    o = urlparse(url)
    if connect_hostname is None:
        connect_hostname = o.hostname
    # A '?' is appended even when the query string is empty.
    path = o.path + '?' + o.query
    request_headers = {}
    request_headers['Host'] = o.hostname
    request_headers['Accept'] = '*/*'
    print('Request: {method} {path}\n{headers}'.format(method=method, path=path, headers=''.join([t[0]+':'+t[1]+"\n" for t in list(request_headers.items())])))
    res = _make_raw_request(connect_hostname, config.main.port, method, path, request_headers=request_headers, secure=False, timeout=timeout)
    for (k, v) in res.getheaders():
        print(k, v)
    return res


# ---------- Non-existent buckets via the website endpoint

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@attr('fails_on_rgw')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_s3():
    bucket_name = get_new_bucket_name()
    res = _website_request(bucket_name, '')
    _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
#@attr(assertion='non-existant bucket via website endpoint should give Forbidden, keeping bucket identity secure')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@attr('fails_on_s3')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_nonexistant_bucket_rgw():
    bucket_name = get_new_bucket_name()
    res = _website_request(bucket_name, '')
    #_website_expected_error_response(res, bucket_name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))


# ---------- IndexDocument only, successes

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
@timed(10)
def test_website_public_bucket_list_public_index():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.make_public()
    _wait_for_key(bucket, f['IndexDocument_Suffix'])

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    indexstring = bytes(indexstring, 'utf-8')
    eq(body, indexstring)  # default content should match index.html set content
    __website_expected_reponse_status(res, 200, 'OK')
    indexhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_public_index():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.make_public()
    _wait_for_key(bucket, f['IndexDocument_Suffix'])

    res = _website_request(bucket.name, '')
    __website_expected_reponse_status(res, 200, 'OK')
    body = res.read()
    print(body)
    indexstring = bytes(indexstring, 'utf-8')
    eq(body, indexstring, 'default content should match index.html set content')
    indexhtml.delete()
    bucket.delete()


# ---------- IndexDocument only, failures

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.set_canned_acl('private')
    # TODO: wait for sync

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['IndexDocument_Suffix'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    indexhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['IndexDocument_Suffix'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    indexhtml.delete()
    bucket.delete()


# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_missingerrordoc():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_missingerrordoc():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_missingerrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['IndexDocument_Suffix'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    indexhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_missingerrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['IndexDocument_Suffix'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    indexhtml.delete()
    bucket.delete()


# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_blockederrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    ok(errorstring not in body, 'error content should NOT match error.html set content')

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='check if there is an invalid payload after serving error doc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_pubilc_errordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')

    url = get_website_url(proto='http', bucket=bucket.name, path='')
    o = urlparse(url)
    host = o.hostname
    port = s3.main.port

    resp_len = 0
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))

        request = "GET / HTTP/1.1\r\nHost:%s.%s:%s\r\n\r\n" % (bucket.name, host, port)
        sock.send(request.encode())

        #receive header
        resp = sock.recv(4096)
        print(resp)

        #receive body
        resp = sock.recv(4096)
        print('payload length=%d' % len(resp))
        print(resp)

        #check if any additional payload is left
        sock.settimeout(2)
        try:
            resp = sock.recv(4096)
            resp_len = len(resp)
            print('invalid payload length=%d' % resp_len)
            print(resp)
        except socket.timeout:
            print('no invalid payload')
    finally:
        # Always release the socket, including when a step above raises.
        sock.close()

    ok(resp_len == 0, 'invalid payload')

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_blockederrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    ok(errorstring not in body, 'error content should NOT match error.html set content')

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_blockederrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    ok(errorstring not in body, 'error content should NOT match error.html set content')

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_blockederrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    ok(errorstring not in body, 'error content should NOT match error.html set content')

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


# ---------- IndexDocument & ErrorDocument, failures with errordoc available

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_empty_gooderrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring, policy='public-read')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_empty_gooderrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=[errorstring])

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_public_bucket_list_private_index_gooderrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_private_bucket_list_private_index_gooderrordoc():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')
    _wait_for_key(bucket, f['ErrorDocument_Key'])

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


# ------ RedirectAll tests

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_base():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
    bucket.set_canned_acl('private')

    res = _website_request(bucket.name, '')
    new_url = 'http://%s/' % f['RedirectAllRequestsTo_HostName']
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
    bucket.set_canned_acl('private')

    pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)

    res = _website_request(bucket.name, pathfragment)
    new_url = 'http://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_bucket_private_redirectall_path_upgrade():
    bucket = get_new_bucket()
    x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
    f = _test_website_prep(bucket, x)
    bucket.set_canned_acl('private')

    pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)

    res = _website_request(bucket.name, pathfragment)
    new_url = 'https://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    bucket.delete()


# ------ x-amz redirect tests

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_nonwebsite():
    bucket = get_new_bucket()
    #f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
    #bucket.set_canned_acl('private')

    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = '/relative'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='public-read')
    eq(k.get_redirect(), redirect_dest)

    res = _website_request(bucket.name, '/page')
    body = res.read()
    print(body)
    expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name)
    # TODO: RGW does not have custom error messages for different 404s yet
    #expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name, Message='The specified bucket does not have a website configuration')
    print(expected_content)
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchWebsiteConfiguration', content=expected_content, body=body)

    k.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_relative():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()

    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = '/relative'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='public-read')
    eq(k.get_redirect(), redirect_dest)

    res = _website_request(bucket.name, '/page')
    #new_url = get_website_url(bucket_name=bucket.name, path=redirect_dest)
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], redirect_dest)

    k.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_public_abs():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()

    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = 'http://example.com/foo'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='public-read')
    eq(k.get_redirect(), redirect_dest)

    res = _website_request(bucket.name, '/page')
    new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    k.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_relative():
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()

    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = '/relative'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='private')
    eq(k.get_redirect(), redirect_dest)

    res = _website_request(bucket.name, '/page')
    # We get a 403 because the page is private
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))

    k.delete()
    bucket.delete()

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key')
@attr('s3website')
@attr('x-amz-website-redirect-location')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_website_xredirect_private_abs():
    # A private key that carries an absolute x-amz-website-redirect-location
    # is expected to be refused with 403 AccessDenied by the website endpoint
    # rather than redirected.
    # (Plain comments instead of a docstring: nose would display a docstring
    # in place of the test name.)
    bucket = get_new_bucket()
    # Only the call matters here; the returned fields are not used.
    # NOTE(review): presumably this installs the website configuration on the
    # bucket -- the helper is defined elsewhere in this file.
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()
    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = 'http://example.com/foo'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='private')
    # The key must report the redirect location it was stored with.
    eq(k.get_redirect(), redirect_dest)

    res = _website_request(bucket.name, '/page')
    # We get a 403 because the page is private
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))

    k.delete()
    bucket.delete()


# ------ RoutingRules tests

# RoutingRules
# Routing rule fragments keyed by scenario name. The '...=TMPL' entry carries
# a '{HttpRedirectCode}' placeholder that is filled in with str.format() when
# the per-redirect-code tests are generated further down.
ROUTING_RULES = {
    'empty': '',
    'AmazonExample1':
        """ docs/ documents/ """,
    'AmazonExample1+Protocol=https':
        """ docs/ https documents/ """,
    'AmazonExample1+Protocol=https+Hostname=xyzzy':
        """ docs/ https xyzzy documents/ """,
    'AmazonExample1+Protocol=http2':
        """ docs/ http2 documents/ """,
    'AmazonExample2':
        """ images/ folderdeleted.html """,
    'AmazonExample2+HttpRedirectCode=TMPL':
        """ images/ {HttpRedirectCode} folderdeleted.html """,
    'AmazonExample3':
        """ 404 ec2-11-22-333-44.compute-1.amazonaws.com report-404/ """,
    'AmazonExample3+KeyPrefixEquals':
        """ images/ 404 ec2-11-22-333-44.compute-1.amazonaws.com report-404/ """,
}

# Label every non-empty fragment with an XML comment naming its scenario.
# The format string must provide one placeholder per argument (name and
# fragment); with a single '%s' the '%' operator raises TypeError at import.
# Only values of existing keys are replaced, so iterating items() is safe.
for rule_name, rule_xml in ROUTING_RULES.items():
    if rule_xml:
        ROUTING_RULES[rule_name] = "<!-- %s -->\n%s" % (rule_name, rule_xml)

# One entry per routing check:
#   xml      -- template fields overriding the generated website configuration
#   url      -- path requested from the website endpoint
#   location -- keyword arguments for get_website_url() describing the expected
#               redirect target ('{bucket_name}' is substituted at run time),
#               or None when no redirect is expected
#   code     -- expected HTTP status code
ROUTING_RULES_TESTS = [
    dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='', location=None, code=200),
    dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/', location=None, code=200),
    dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/x', location=None, code=404),

    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/', location=None, code=200),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/x', location=None, code=404),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/', location=dict(proto='http', bucket='{bucket_name}', path='/documents/'), code=301),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/x', location=dict(proto='http', bucket='{bucket_name}', path='/documents/x'), code=301),

    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/', location=None, code=200),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/x', location=None, code=404),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/', location=dict(proto='https', bucket='{bucket_name}', path='/documents/'), code=301),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/x', location=dict(proto='https', bucket='{bucket_name}', path='/documents/x'), code=301),

    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/', location=None, code=200),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/x', location=None, code=404),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/', location=dict(proto='https', hostname='xyzzy', path='/documents/'), code=301),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/x', location=dict(proto='https', hostname='xyzzy', path='/documents/x'), code=301),

    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/', location=dict(proto='http', bucket='{bucket_name}', path='/folderdeleted.html'), code=301),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/x', location=dict(proto='http', bucket='{bucket_name}', path='/folderdeleted.html'), code=301),

    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/x', location=dict(proto='http', hostname='ec2-11-22-333-44.compute-1.amazonaws.com', path='/report-404/x'), code=301),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/images/x', location=dict(proto='http', hostname='ec2-11-22-333-44.compute-1.amazonaws.com', path='/report-404/images/x'), code=301),

    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/x', location=None, code=404),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/images/x', location=dict(proto='http', hostname='ec2-11-22-333-44.compute-1.amazonaws.com', path='/report-404/x'), code=301),
]

ROUTING_ERROR_PROTOCOL = dict(code=400, reason='Bad Request', errorcode='InvalidRequest', bodyregex=r'Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.')

ROUTING_RULES_TESTS_ERRORS = [  # TODO: Unused!
    # Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
    dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
]

VALID_AMZ_REDIRECT = {301, 302, 303, 304, 305, 307, 308}

# General lots of tests
# Iterate in sorted order so the generated test cases are appended in a
# deterministic sequence (set iteration order is not guaranteed).
for redirect_code in sorted(VALID_AMZ_REDIRECT):
    rules = ROUTING_RULES['AmazonExample2+HttpRedirectCode=TMPL'].format(HttpRedirectCode=redirect_code)
    # The configured redirect code is also the status code expected back.
    ROUTING_RULES_TESTS.append(
        dict(xml=dict(RoutingRules=rules), url='/images/', location=dict(proto='http', bucket='{bucket_name}', path='/folderdeleted.html'), code=redirect_code)
    )
    ROUTING_RULES_TESTS.append(
        dict(xml=dict(RoutingRules=rules), url='/images/x', location=dict(proto='http', bucket='{bucket_name}', path='/folderdeleted.html'), code=redirect_code)
    )

# TODO:
# codes other than those in VALID_AMZ_REDIRECT
# give an error of 'The provided HTTP redirect code (314) is not valid. Valid codes are 3XX except 300.'
# during setting the website config
# we should check that we can return that too on ceph

def routing_setup():
    """Create and populate a public bucket for a single routing check.

    Returns:
        dict: the fields returned by ``_test_website_prep`` merged over
        ``obj`` (the bucket and every key created here, in creation order,
        consumed by ``routing_teardown``) and ``bucket`` (the new bucket).

    Raises:
        SkipTest: via ``check_can_test_website`` when website support is
            not available.
    """
    check_can_test_website()
    kwargs = {'obj': []}
    bucket = get_new_bucket()
    kwargs['bucket'] = bucket
    kwargs['obj'].append(bucket)
    f = _test_website_prep(bucket, '')
    kwargs.update(f)
    bucket.set_canned_acl('public-read')

    # Scratch object that routing_check overwrites with its test parameters.
    k = bucket.new_key('debug-ws.xml')
    kwargs['obj'].append(k)
    k.set_contents_from_string('', policy='public-read')

    # Upload the index document and then the error document, both public.
    for field, template in (
        ('IndexDocument_Suffix', INDEXDOC_TEMPLATE),
        ('ErrorDocument_Key', ERRORDOC_TEMPLATE),
    ):
        k = bucket.new_key(f[field])
        kwargs['obj'].append(k)
        s = choose_bucket_prefix(template=template, max_len=64)
        k.set_contents_from_string(s)
        k.set_canned_acl('public-read')

    # Wait until the last uploaded key is visible.
    # NOTE(review): this poll has no upper bound; if the key never becomes
    # visible the setup hangs.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    return kwargs


def routing_teardown(**kwargs):
    """Delete everything recorded in ``kwargs['obj']``, newest first.

    Reverse creation order removes the keys before the bucket that
    holds them.
    """
    for o in reversed(kwargs['obj']):
        print('Deleting', str(o))
        o.delete()


@common.with_setup_kwargs(setup=routing_setup, teardown=routing_teardown)
def routing_check(*args, **kwargs):
    # Run one routing test case against a freshly prepared bucket.
    #
    # args[0] is one entry of ROUTING_RULES_TESTS (keys 'xml', 'url',
    # 'location', 'code'); kwargs must contain 'bucket'.
    # NOTE(review): kwargs is presumably the dict returned by routing_setup,
    # injected by common.with_setup_kwargs -- confirm against that helper.
    # (Plain comments instead of a docstring: nose would display a docstring
    # in place of the test name.)
    bucket = kwargs['bucket']
    test_case = args[0]
    pprint(test_case)
    xml_fields = kwargs.copy()
    xml_fields.update(test_case['xml'])

    # Record the parameters of this run in the bucket as a debugging aid.
    k = bucket.get_key('debug-ws.xml')
    k.set_contents_from_string(str(test_case) + str(kwargs), policy='public-read')

    pprint(xml_fields)
    # Only the call matters here; the returned fields are not used.
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'], hardcoded_fields=xml_fields)
    config_xmlcmp = bucket.get_website_configuration_xml()
    config_xmlcmp = common.normalize_xml(config_xmlcmp, pretty_print=True)  # For us to read
    res = _website_request(bucket.name, test_case['url'])
    print(config_xmlcmp)

    new_url = test_case['location']
    if new_url is not None:
        new_url = get_website_url(**new_url)
        # Fill in the '{bucket_name}' placeholder used by the test table.
        new_url = new_url.format(bucket_name=bucket.name)

    code = test_case['code']
    if 200 <= code < 300:
        # The body content itself is not compared; it only has to be non-empty.
        ok(int(res.getheader('Content-Length', -1)) > 0)
    elif 300 <= code < 400:
        _website_expected_redirect_response(res, code, IGNORE_FIELD, new_url)
    elif code >= 400:
        _website_expected_error_response(res, bucket.name, code, IGNORE_FIELD, IGNORE_FIELD)
    else:
        # An explicit raise (not a bare assert) so the check survives
        # `python -O`.
        raise AssertionError('unsupported expected status code in routing test case: %r' % (code,))


@attr('s3website_RoutingRules')
@attr('s3website')
@nose.with_setup(setup=check_can_test_website, teardown=common.teardown)
def test_routing_generator():
    # nose test generator: yields one routing_check call per entry of
    # ROUTING_RULES_TESTS. Non-empty routing rules are passed through
    # common.trim_xml and stored back into the entry before it is yielded.
    for t in ROUTING_RULES_TESTS:
        rules = t.get('xml', {}).get('RoutingRules')
        if rules:
            t['xml']['RoutingRules'] = common.trim_xml(rules)
        yield routing_check, t