import sys
import collections
import collections.abc
import nose
import pytest
import string
import random
from pprint import pprint
import time
import boto.exception
import socket

from urllib.parse import urlparse

from nose.plugins.attrib import attr
from nose.tools import timed

from .. import common

from . import (
    configfile,
    setup_teardown,
    get_new_bucket,
    get_new_bucket_name,
    s3,
    config,
    _make_raw_request,
    choose_bucket_prefix,
    )

# Sentinel: pass as status/reason/code/content to skip that particular check.
IGNORE_FIELD = 'IGNORETHIS'

SLEEP_INTERVAL = 0.01
SLEEP_MAX = 2.0

# string.Template fragments keyed by website-configuration flavour.
# NOTE(review): the values hold only ${...} placeholders and no XML element
# markup; it looks like the tags were lost in transit -- confirm against the
# canonical copy of this file before relying on them.
WEBSITE_CONFIGS_XMLFRAG = {
    'IndexDoc': '${IndexDocument_Suffix}${RoutingRules}',
    'IndexDocErrorDoc': '${IndexDocument_Suffix}${ErrorDocument_Key}${RoutingRules}',
    'RedirectAll': '${RedirectAllRequestsTo_HostName}${RoutingRules}',
    'RedirectAll+Protocol': '${RedirectAllRequestsTo_HostName}${RedirectAllRequestsTo_Protocol}${RoutingRules}',
    }

# NOTE(review): both templates were single-quoted literals broken across
# several lines (not valid Python).  The visible text is preserved with
# explicit newlines; any markup that may have surrounded it is NOT restored.
INDEXDOC_TEMPLATE = '\n\nIndexDoc\n\n{random}'
ERRORDOC_TEMPLATE = '\n\nErrorDoc\n\n{random}'

CAN_WEBSITE = None


@pytest.fixture(autouse=True, scope="module")
def check_can_test_website():
    """
    Skip the whole module when the endpoint cannot serve website configs.

    Probes a throw-away bucket with get_website_configuration(); the bucket
    is always deleted afterwards.

    Returns True when the call succeeds or fails with a plain
    "no website configuration" 404.  Calls pytest.skip() for the
    405 / 403 / 501 responses handled below and raises RuntimeError for
    any other S3ResponseError.
    """
    bucket = get_new_bucket()
    try:
        bucket.get_website_configuration()
        return True
    except boto.exception.S3ResponseError as e:
        if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
            return True
        elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
            pytest.skip('rgw_enable_static_website is false')
        elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
            # This is older versions that do not support the website code
            pytest.skip('static website is not implemented')
        elif e.status == 501 and e.error_code == 'NotImplemented':
            pytest.skip('static website is not implemented')
        else:
            raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
    finally:
        bucket.delete()


def make_website_config(xml_fragment):
    """
    Take the tedious stuff out of the config

    Returns xml_fragment wrapped between a fixed prefix and suffix.
    """
    # NOTE(review): prefix and suffix are empty strings here, so this is
    # currently an identity function; the wrapper markup looks like it was
    # stripped along with the other tags -- confirm.
    return '' + xml_fragment + ''


def get_website_url(**kwargs):
    """
    Return the URL to a website page

    Keyword arguments (all optional):
      proto    -- URL scheme, default 'http'
      bucket   -- bucket name, used to build '<bucket>.<domain>'
      hostname -- explicit hostname; takes precedence over bucket
      path     -- request path, default '/'; leading slashes are normalised

    With neither hostname nor bucket only the normalised path is returned.
    """
    proto = kwargs.get('proto', 'http')
    bucket = kwargs.get('bucket')
    hostname = kwargs.get('hostname')
    path = kwargs.get('path', '/')

    if hostname is None and bucket is None:
        return '/' + path.lstrip('/')

    domain = config['main']['host']
    if 's3website_domain' in config['main']:
        domain = config['main']['s3website_domain']
    elif 's3website_domain' in config['alt']:
        # NOTE(review): membership is tested on 'alt' but the value is read
        # from 'DEFAULT'; left as-is, confirm which section is intended.
        domain = config['DEFAULT']['s3website_domain']

    if hostname is None:
        # bucket cannot be None here because of the early return above
        hostname = '%s.%s' % (bucket, domain)
    path = path.lstrip('/')
    return "%s://%s/%s" % (proto, hostname, path)


def _test_website_populate_fragment(xml_fragment, fields):
    """
    Fill the ${...} placeholders of xml_fragment.

    Random defaults are generated for IndexDocument_Suffix,
    ErrorDocument_Key and RedirectAllRequestsTo_HostName; entries in
    `fields` override them.  A non-empty 'RoutingRules' entry is wrapped in
    an element named after its key.

    Returns (substituted_fragment, dict_of_values_used).
    """
    # Work on a copy so the caller's dict is never modified.
    fields = dict(fields)
    for k in ['RoutingRules']:
        if k in fields and len(fields[k]) > 0:
            # The format needs an opening and a closing tag: three
            # arguments, three placeholders.
            fields[k] = '<%s>%s</%s>' % (k, fields[k], k)

    f = {
        'IndexDocument_Suffix': choose_bucket_prefix(template='index-{random}.html', max_len=32),
        'ErrorDocument_Key': choose_bucket_prefix(template='error-{random}.html', max_len=32),
        'RedirectAllRequestsTo_HostName': choose_bucket_prefix(template='{random}.{random}.com', max_len=32),
        'RoutingRules': ''
        }
    f.update(fields)
    xml_fragment = string.Template(xml_fragment).safe_substitute(**f)
    return xml_fragment, f


def _test_website_prep(bucket, xml_template, hardcoded_fields=None, expect_fail=None):
    """
    Apply a website configuration to `bucket` and verify it reads back.

    bucket           -- boto bucket to configure
    xml_template     -- string.Template fragment; a falsy value deletes the
                        bucket's website configuration instead
    hardcoded_fields -- optional overrides for the template placeholders
    expect_fail      -- accepted but currently has no effect; a failing
                        set call is always re-raised

    Returns the dict of placeholder values, plus 'WebsiteConfiguration'
    holding the normalised XML read back from the bucket ('' when the
    configuration was deleted).
    """
    if hardcoded_fields is None:
        hardcoded_fields = {}
    xml_fragment, f = _test_website_populate_fragment(xml_template, hardcoded_fields)
    f['WebsiteConfiguration'] = ''
    if not xml_template:
        bucket.delete_website_configuration()
        return f

    config_xmlnew = make_website_config(xml_fragment)

    # Read the current configuration first.  The result is not used; the
    # call only makes sure that anything other than a "no configuration"
    # 404 is surfaced before we try to write.
    try:
        common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
    except boto.exception.S3ResponseError as e:
        if str(e.status) == str(404) \
            and ('NoSuchWebsiteConfiguration' in e.body or 'NoSuchWebsiteConfiguration' in e.code or
                 'NoSuchKey' in e.body or 'NoSuchKey' in e.code):
            pass
        else:
            raise

    try:
        bucket.set_website_configuration_xml(common.trim_xml(config_xmlnew))
        config_xmlnew = common.normalize_xml(config_xmlnew, pretty_print=True)
    except boto.exception.S3ResponseError as e:
        if expect_fail is not None:
            # Placeholder branches: no expectation matching is implemented.
            if isinstance(expect_fail, dict):
                pass
            elif isinstance(expect_fail, str):
                pass
        raise

    # TODO: in some cases, it takes non-zero time for the config to be applied by AmazonS3
    # We should figure out how to poll for changes better
    # WARNING: eu-west-1 as of 2015/06/22 was taking at least 4 seconds to propagate website configs, esp when you cycle between non-null configs
    time.sleep(0.1)
    config_xmlcmp = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)

    # Cleanup for our validation
    common.assert_xml_equal(config_xmlcmp, config_xmlnew)
    f['WebsiteConfiguration'] = config_xmlcmp
    return f


def __website_expected_reponse_status(res, status, reason):
    """
    Assert that res.status / res.reason are among the expected values.

    status and reason may each be a single value, a container of accepted
    values, or IGNORE_FIELD to skip that check.  A plain string counts as a
    single value, so the reason must match exactly, not as a substring.
    """
    if status is not IGNORE_FIELD:
        if isinstance(status, str) or not isinstance(status, collections.abc.Container):
            status = {status}
        assert res.status in status, 'HTTP code was %s should be %s' % (res.status, status)
    if reason is not IGNORE_FIELD:
        if isinstance(reason, str) or not isinstance(reason, collections.abc.Container):
            reason = {reason}
        assert res.reason in reason, 'HTTP reason was %s should be %s' % (res.reason, reason)


def _website_expected_default_html(**kwargs):
    """
    Build the text snippets expected in a default error page.

    One snippet is produced per (keyword, value) pair; a value may be a
    single item or a container of items.  'BucketName' is skipped.
    """
    fields = []
    for k, v in kwargs.items():
        # AmazonS3 seems to be inconsistent, some HTML errors include BucketName, but others do not.
        if k == 'BucketName':
            continue

        if isinstance(v, str) or not isinstance(v, collections.abc.Container):
            v = [v]
        for v2 in v:
            # NOTE(review): this literal was broken across lines and looks
            # like a rendered list item whose markup was stripped; the
            # visible text is preserved verbatim -- confirm the real format.
            s = '\n  • %s: %s\n  • ' % (k, v2)
            fields.append(s)
    return fields


def _website_expected_error_response(res, bucket_name, status, reason, code, content=None, body=None):
    """
    Assert that `res` is the expected website error response.

    res         -- HTTP response object
    bucket_name -- accepted for call-site symmetry; not used here
    status      -- expected status code(s) or IGNORE_FIELD
    reason      -- expected reason phrase(s) or IGNORE_FIELD
    code        -- expected x-amz-error-code value or IGNORE_FIELD; only
                   checked when the header is present
    content     -- string, or container of strings, that must each occur in
                   the body; None and IGNORE_FIELD entries are skipped
    body        -- already-read response body; read from `res` when None
    """
    if body is None:
        body = res.read()
        print(body)
    __website_expected_reponse_status(res, status, reason)

    # Argh, AmazonS3 is really inconsistent, so we have a conditional test!
    # This is most visible if you have an ErrorDoc present
    errorcode = res.getheader('x-amz-error-code', None)
    if errorcode is not None:
        if code is not IGNORE_FIELD:
            assert errorcode == code

    # A bare string is one expected snippet, not a sequence of characters.
    if isinstance(content, str) or not isinstance(content, collections.abc.Container):
        content = [content]
    for f in content:
        if f is not IGNORE_FIELD and f is not None:
            f = bytes(f, 'utf-8')
            assert f in body, 'HTML should contain "%s"' % (f, )


def _website_expected_redirect_response(res, status, reason, new_url):
    """
    Assert that `res` is a redirect to new_url with an empty body.
    """
    body = res.read()
    print(body)
    __website_expected_reponse_status(res, status, reason)
    loc = res.getheader('Location', None)
    assert loc == new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,)
    assert len(body) == 0, 'Body of a redirect should be empty'


def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
    """
    Send a plain-HTTP request to the bucket's website endpoint.

    The Host header is always the website hostname; connect_hostname only
    overrides where the connection is made.  Returns the response from
    _make_raw_request.
    """
    url = get_website_url(proto='http', bucket=bucket_name, path=path)
    print("url", url)
    o = urlparse(url)
    if connect_hostname is None:
        connect_hostname = o.hostname
    # The '?' is appended even when the query string is empty.
    path = o.path + '?' + o.query
    request_headers = {
        'Host': o.hostname,
        'Accept': '*/*',
        }
    print('Request: {method} {path}\n{headers}'.format(method=method, path=path, headers=''.join([t[0]+':'+t[1]+"\n" for t in list(request_headers.items())])))
    res = _make_raw_request(connect_hostname, config.main.port, method, path, request_headers=request_headers, secure=False, timeout=timeout)
    for (k,v) in res.getheaders():
        print(k,v)
    return res


# ---------- Non-existent buckets via the website endpoint

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket, exposing security risk')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_rgw')
@pytest.mark.fails_on_rgw
def test_website_nonexistant_bucket_s3():
    """A bucket name that was never created answers 404 NoSuchBucket."""
    bucket_name = get_new_bucket_name()
    res = _website_request(bucket_name, '')
    _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-existant bucket via website endpoint should give NoSuchBucket')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_s3')
@pytest.mark.fails_on_s3
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_nonexistant_bucket_rgw():
    """A bucket name that was never created answers 404 NoSuchBucket."""
    bucket_name = get_new_bucket_name()
    res = _website_request(bucket_name, '')
    # An earlier expectation was 403 Forbidden / AccessDenied ("keeping
    # bucket identity secure"); the suite now expects 404 NoSuchBucket.
    _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))


#------------- IndexDocument only, successes

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is public')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
@timed(10)
def test_website_public_bucket_list_public_index():
    """Public bucket + public index document: / serves the index body."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.make_public()
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['IndexDocument_Suffix']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    indexstring = bytes(indexstring, 'utf-8')
    assert body == indexstring # default content should match index.html set content
    __website_expected_reponse_status(res, 200, 'OK')
    indexhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_public_index():
    """Private bucket + public index document: / serves the index body."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.make_public()
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['IndexDocument_Suffix']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    __website_expected_reponse_status(res, 200, 'OK')
    body = res.read()
    print(body)
    indexstring = bytes(indexstring, 'utf-8')
    assert body == indexstring, 'default content should match index.html set content'
    indexhtml.delete()
    bucket.delete()


# ---------- IndexDocument only, failures

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty():
    """Empty private bucket: / answers 403 AccessDenied."""
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.set_canned_acl('private')
    # TODO: wait for sync

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty():
    """Empty public bucket: / answers 404 NoSuchKey."""
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index():
    """Public bucket + private index document: / answers 403."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['IndexDocument_Suffix']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
    indexhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index():
    """Private bucket + private index document: / answers 403."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['IndexDocument_Suffix']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))

    indexhtml.delete()
    bucket.delete()


# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_missingerrordoc():
    """Empty private bucket, error document configured but absent: 403."""
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))

    bucket.delete()
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_missingerrordoc():
    """Empty public bucket, error document configured but absent: 404."""
    bucket = get_new_bucket()
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_missingerrordoc():
    """Public bucket, private index, error document absent: 403."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['IndexDocument_Suffix']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))

    indexhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, missing errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_missingerrordoc():
    """Private bucket, private index, error document absent: 403."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['IndexDocument_Suffix']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))

    indexhtml.delete()
    bucket.delete()


# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_blockederrordoc():
    """Private bucket, private error document: default 403 page is served."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    assert errorstring not in body, 'error content should NOT match error.html set content'

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='check if there is an invalid payload after serving error doc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_pubilc_errordoc():
    """
    No extra bytes may follow the response that serves the error document.

    Speaks raw HTTP over a socket: reads one chunk for the header and one
    for the body, then expects a further read to time out or return nothing.
    """
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')

    url = get_website_url(proto='http', bucket=bucket.name, path='')
    o = urlparse(url)
    host = o.hostname
    port = s3.main.port

    resp_len = 0
    # The context manager closes the socket even when an assertion or a
    # network error interrupts the exchange.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))

        request = "GET / HTTP/1.1\r\nHost:%s.%s:%s\r\n\r\n" % (bucket.name, host, port)
        sock.send(request.encode())

        #receive header
        resp = sock.recv(4096)
        print(resp)

        #receive body
        resp = sock.recv(4096)
        print('payload length=%d' % len(resp))
        print(resp)

        #check if any additional payload is left
        sock.settimeout(2)
        try:
            resp = sock.recv(4096)
            resp_len = len(resp)
            print('invalid payload length=%d' % resp_len)
            print(resp)
        except socket.timeout:
            print('no invalid payload')

    assert resp_len == 0, 'invalid payload'

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_blockederrordoc():
    """Public bucket, private error document: default 404 page is served."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    # The message now agrees with the `not in` condition it accompanies.
    assert errorstring not in body, 'error content should NOT match error.html set content'

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_blockederrordoc():
    """Public bucket, private index and error documents: default 403 page."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    # The message now agrees with the `not in` condition it accompanies.
    assert errorstring not in body, 'error content should NOT match error.html set content'

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private, blocked errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_blockederrordoc():
    """Private bucket, private index and error documents: default 403 page."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('private')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    body = res.read()
    print(body)
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
    errorstring = bytes(errorstring, 'utf-8')
    # The message now agrees with the `not in` condition it accompanies.
    assert errorstring not in body, 'error content should NOT match error.html set content'

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


# ---------- IndexDocument & ErrorDocument, failures with errordoc available

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty private buckets via s3website return a 403 for /, good errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_empty_gooderrordoc():
    """Private bucket, public error document: 403 carries the error doc."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring, policy='public-read')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='empty public buckets via s3website return a 404 for /, good errordoc')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_empty_gooderrordoc():
    """Public bucket, public error document: 404 carries the error doc."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=[errorstring])

    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty public buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_public_bucket_list_private_index_gooderrordoc():
    """Public bucket, private index, public error doc: 403 with error doc."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.make_public()
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-empty private buckets via s3website return page for /, where page is private')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_private_bucket_list_private_index_gooderrordoc():
    """Private bucket, private index, public error doc: 403 with error doc."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    bucket.set_canned_acl('private')
    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
    indexhtml.set_contents_from_string(indexstring)
    indexhtml.set_canned_acl('private')
    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
    errorhtml.set_contents_from_string(errorstring)
    errorhtml.set_canned_acl('public-read')
    # Wait until the freshly written key is visible.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    res = _website_request(bucket.name, '')
    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])

    indexhtml.delete()
    errorhtml.delete()
    bucket.delete()


# ------ RedirectAll tests

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_base():
    """RedirectAll without protocol: / is redirected (301) over http."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
    bucket.set_canned_acl('private')

    res = _website_request(bucket.name, '')
    new_url = 'http://%s/' % f['RedirectAllRequestsTo_HostName']
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_path():
    """RedirectAll without protocol: the request path is kept in the target."""
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
    bucket.set_canned_acl('private')

    pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)

    res = _website_request(bucket.name, pathfragment)
    new_url = 'http://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='RedirectAllRequestsTo without protocol should TODO')
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_bucket_private_redirectall_path_upgrade():
    """RedirectAll with protocol 'https': the redirect target uses https."""
    bucket = get_new_bucket()
    # Pin the protocol placeholder first; the remaining placeholders are
    # filled with random values by _test_website_prep.
    x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
    f = _test_website_prep(bucket, x)
    bucket.set_canned_acl('private')

    pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)

    res = _website_request(bucket.name, pathfragment)
    new_url = 'https://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)

    bucket.delete()


# ------ x-amz redirect tests

@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should not fire without websiteconf')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_nonwebsite():
    """Without a website config the object redirect header is not honoured."""
    bucket = get_new_bucket()
    # Deliberately no _test_website_prep() call: the bucket has no website
    # configuration in this test.

    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = '/relative'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='public-read')
    assert k.get_redirect() == redirect_dest

    res = _website_request(bucket.name, '/page')
    body = res.read()
    print(body)
    expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name)
    # TODO: RGW does not have custom error messages for different 404s yet
    #expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name, Message='The specified bucket does not have a website configuration')
    print(expected_content)
    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchWebsiteConfiguration', content=expected_content, body=body)

    k.delete()
    bucket.delete()


@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
@attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, public key')
@attr('s3website')
@pytest.mark.s3website
@attr('x-amz-website-redirect-location')
@pytest.mark.s3website_redirect_location
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_website_xredirect_public_relative():
    bucket = get_new_bucket()
    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
    bucket.make_public()

    k = bucket.new_key('page')
    content = 'wrong-content'
    redirect_dest = '/relative'
    headers = {'x-amz-website-redirect-location': redirect_dest}
    k.set_contents_from_string(content, headers=headers, policy='public-read')
    redirect = k.get_redirect()
    assert k.get_redirect()
== redirect_dest res = _website_request(bucket.name, '/page') #new_url = get_website_url(bucket_name=bucket.name, path=redirect_dest) _website_expected_redirect_response(res, 301, ['Moved Permanently'], redirect_dest) k.delete() bucket.delete() @attr(resource='bucket') @attr(method='get') @attr(operation='list') @attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, public key') @attr('s3website') @pytest.mark.s3website @attr('x-amz-website-redirect-location') @pytest.mark.s3website_redirect_location @attr('fails_on_dbstore') @pytest.mark.fails_on_dbstore def test_website_xredirect_public_abs(): bucket = get_new_bucket() f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc']) bucket.make_public() k = bucket.new_key('page') content = 'wrong-content' redirect_dest = 'http://example.com/foo' headers = {'x-amz-website-redirect-location': redirect_dest} k.set_contents_from_string(content, headers=headers, policy='public-read') redirect = k.get_redirect() assert k.get_redirect() == redirect_dest res = _website_request(bucket.name, '/page') new_url = get_website_url(proto='http', hostname='example.com', path='/foo') _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url) k.delete() bucket.delete() @attr(resource='bucket') @attr(method='get') @attr(operation='list') @attr(assertion='x-amz-website-redirect-location should fire websiteconf, relative path, private key') @attr('s3website') @pytest.mark.s3website @attr('x-amz-website-redirect-location') @pytest.mark.s3website_redirect_location @attr('fails_on_dbstore') @pytest.mark.fails_on_dbstore def test_website_xredirect_private_relative(): bucket = get_new_bucket() f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc']) bucket.make_public() k = bucket.new_key('page') content = 'wrong-content' redirect_dest = '/relative' headers = {'x-amz-website-redirect-location': redirect_dest} k.set_contents_from_string(content, headers=headers, policy='private') 
redirect = k.get_redirect() assert k.get_redirect() == redirect_dest res = _website_request(bucket.name, '/page') # We get a 403 because the page is private _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied')) k.delete() bucket.delete() @attr(resource='bucket') @attr(method='get') @attr(operation='list') @attr(assertion='x-amz-website-redirect-location should fire websiteconf, absolute, private key') @attr('s3website') @pytest.mark.s3website @attr('x-amz-website-redirect-location') @pytest.mark.s3website_redirect_location @attr('fails_on_dbstore') @pytest.mark.fails_on_dbstore def test_website_xredirect_private_abs(): bucket = get_new_bucket() f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc']) bucket.make_public() k = bucket.new_key('page') content = 'wrong-content' redirect_dest = 'http://example.com/foo' headers = {'x-amz-website-redirect-location': redirect_dest} k.set_contents_from_string(content, headers=headers, policy='private') redirect = k.get_redirect() assert k.get_redirect() == redirect_dest res = _website_request(bucket.name, '/page') new_url = get_website_url(proto='http', hostname='example.com', path='/foo') # We get a 403 because the page is private _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied')) k.delete() bucket.delete() # ------ RoutingRules tests # RoutingRules ROUTING_RULES = { 'empty': '', 'AmazonExample1': \ """ docs/ documents/ """, 'AmazonExample1+Protocol=https': \ """ docs/ https documents/ """, 'AmazonExample1+Protocol=https+Hostname=xyzzy': \ """ docs/ https xyzzy documents/ """, 'AmazonExample1+Protocol=http2': \ """ docs/ http2 documents/ """, 'AmazonExample2': \ """ images/ folderdeleted.html """, 'AmazonExample2+HttpRedirectCode=TMPL': \ """ images/ {HttpRedirectCode} folderdeleted.html """, 'AmazonExample3': \ """ 404 
ec2-11-22-333-44.compute-1.amazonaws.com report-404/ """, 'AmazonExample3+KeyPrefixEquals': \ """ images/ 404 ec2-11-22-333-44.compute-1.amazonaws.com report-404/ """, } for k in list(ROUTING_RULES.keys()): if len(ROUTING_RULES[k]) > 0: ROUTING_RULES[k] = "\n%s" % (k, ROUTING_RULES[k]) ROUTING_RULES_TESTS = [ dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='', location=None, code=200), dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/', location=None, code=200), dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/x', location=None, code=404), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/', location=None, code=200), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/x', location=None, code=404), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/', location=dict(proto='http',bucket='{bucket_name}',path='/documents/'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/x', location=dict(proto='http',bucket='{bucket_name}',path='/documents/x'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/', location=None, code=200), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/x', location=None, code=404), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/', location=dict(proto='https',bucket='{bucket_name}',path='/documents/'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/x', location=dict(proto='https',bucket='{bucket_name}',path='/documents/x'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/', location=None, code=200), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/x', location=None, code=404), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), 
url='/docs/', location=dict(proto='https',hostname='xyzzy',path='/documents/'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/x', location=dict(proto='https',hostname='xyzzy',path='/documents/x'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/images/x'), code=301), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/x', location=None, code=404), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301), ] ROUTING_ERROR_PROTOCOL = dict(code=400, reason='Bad Request', errorcode='InvalidRequest', bodyregex=r'Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.') ROUTING_RULES_TESTS_ERRORS = [ # TODO: Unused! # Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically. 
dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL), dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL), ] VALID_AMZ_REDIRECT = set([301,302,303,304,305,307,308]) # General lots of tests for redirect_code in VALID_AMZ_REDIRECT: rules = ROUTING_RULES['AmazonExample2+HttpRedirectCode=TMPL'].format(HttpRedirectCode=redirect_code) result = redirect_code ROUTING_RULES_TESTS.append( dict(xml=dict(RoutingRules=rules), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result) ) ROUTING_RULES_TESTS.append( dict(xml=dict(RoutingRules=rules), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result) ) # TODO: # codes other than those in VALID_AMZ_REDIRECT # give an error of 'The provided HTTP redirect code (314) is not valid. Valid codes are 3XX except 300.' 
# during setting the website config
# we should check that we can return that too on ceph

def routing_setup():
    """Create the bucket and objects shared by one routing-rule case.

    Returns a dict holding 'bucket' (the new bucket), 'obj' (every created
    bucket/key, in creation order, for routing_teardown) and all fields
    returned by _test_website_prep.
    """
    # check_can_test_website is an autouse, module-scoped pytest fixture, so
    # it has already run before any test in this module; pytest rejects
    # direct calls to fixture functions, hence no explicit call here.
    kwargs = {'obj': []}
    bucket = get_new_bucket()
    kwargs['bucket'] = bucket
    kwargs['obj'].append(bucket)
    #f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
    f = _test_website_prep(bucket, '')
    kwargs.update(f)
    bucket.set_canned_acl('public-read')

    # Scratch object that routing_check overwrites with the case under test.
    k = bucket.new_key('debug-ws.xml')
    kwargs['obj'].append(k)
    k.set_contents_from_string('', policy='public-read')

    k = bucket.new_key(f['IndexDocument_Suffix'])
    kwargs['obj'].append(k)
    s = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=64)
    k.set_contents_from_string(s)
    k.set_canned_acl('public-read')

    k = bucket.new_key(f['ErrorDocument_Key'])
    kwargs['obj'].append(k)
    s = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=64)
    k.set_contents_from_string(s)
    k.set_canned_acl('public-read')

    #time.sleep(1)
    # Poll until the error document is visible through the bucket.
    while bucket.get_key(f['ErrorDocument_Key']) is None:
        time.sleep(SLEEP_INTERVAL)

    return kwargs


def routing_teardown(**kwargs):
    """Delete everything recorded in kwargs['obj'], newest first (bucket last)."""
    for o in reversed(kwargs['obj']):
        print('Deleting', str(o))
        o.delete()


#@timed(10)
def routing_check(*args, **kwargs):
    """Install one routing-rule case on the bucket and verify the response.

    args[0] is a case dict with the keys 'xml', 'url', 'location' and 'code'
    (see ROUTING_RULES_TESTS); kwargs is the dict produced by routing_setup
    and must contain 'bucket'.
    """
    bucket = kwargs['bucket']
    case = args[0]
    #print(case)
    pprint(case)
    # Setup fields first, then the case's own XML fields override them.
    xml_fields = kwargs.copy()
    xml_fields.update(case['xml'])

    # Record the case on the bucket itself to ease debugging on the server.
    k = bucket.get_key('debug-ws.xml')
    k.set_contents_from_string(str(case)+str(kwargs), policy='public-read')

    pprint(xml_fields)
    _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'], hardcoded_fields=xml_fields)
    config_xmlcmp = bucket.get_website_configuration_xml()
    config_xmlcmp = common.normalize_xml(config_xmlcmp, pretty_print=True) # For us to read
    res = _website_request(bucket.name, case['url'])
    print(config_xmlcmp)

    new_url = case['location']
    if new_url is not None:
        new_url = get_website_url(**new_url)
        new_url = new_url.format(bucket_name=bucket.name)

    code = case['code']
    if 200 <= code < 300:
        #body = res.read()
        #print(body)
        #assert body == args['content'], 'default content should match index.html set content'
        assert int(res.getheader('Content-Length', -1)) > 0
    elif 300 <= code < 400:
        _website_expected_redirect_response(res, code, IGNORE_FIELD, new_url)
    elif code >= 400:
        _website_expected_error_response(res, bucket.name, code, IGNORE_FIELD, IGNORE_FIELD)
    else:
        # Codes below 200 are not valid expectations for these cases.
        raise AssertionError('unsupported expected status code: %r' % (code,))


@attr('s3website_RoutingRules')
@pytest.mark.s3website_routing_rules
@attr('s3website')
@pytest.mark.s3website
@attr('fails_on_dbstore')
@pytest.mark.fails_on_dbstore
def test_routing_generator():
    """Run every ROUTING_RULES_TESTS case against a freshly prepared bucket.

    pytest no longer runs yield-style generator tests, so each case is
    executed inline here with its own setup and teardown.
    """
    for t in ROUTING_RULES_TESTS:
        if 'xml' in t and 'RoutingRules' in t['xml'] and len(t['xml']['RoutingRules']) > 0:
            t['xml']['RoutingRules'] = common.trim_xml(t['xml']['RoutingRules'])
        kwargs = routing_setup()
        try:
            routing_check(t, **kwargs)
        finally:
            # Always remove the bucket and its objects, even on failure.
            routing_teardown(**kwargs)