Compare commits

..

2 commits

Author SHA1 Message Date
galsalomon66
a99d90faa3 Revert "Revert "Using get bucket name""
This reverts commit 9aaca64335.

revert to get_bucket_name
2023-06-22 19:40:25 +03:00
Ali Maredia
9aaca64335
Revert "Using get bucket name" 2023-06-21 16:47:56 -04:00
13 changed files with 300 additions and 3851 deletions

View file

@ -22,7 +22,7 @@ Once you have that file copied and edited, you can run the tests with::
You can specify which directory of tests to run:: You can specify which directory of tests to run::
S3TEST_CONF=your.conf tox -- s3tests_boto3/functional S3TEST_CONF=your.conf tox s3tests_boto3/functional
You can specify which file of tests to run:: You can specify which file of tests to run::
@ -44,7 +44,7 @@ located in the ``s3test_boto3`` directory.
You can run only the boto3 tests with:: You can run only the boto3 tests with::
S3TEST_CONF=your.conf tox -- s3tests_boto3/functional S3TEST_CONF=your.conf tox -- -m 'not fails_on_rgw' s3tests_boto3/functional
======================== ========================
STS compatibility tests STS compatibility tests

View file

@ -7,7 +7,6 @@ markers =
auth_common auth_common
bucket_policy bucket_policy
bucket_encryption bucket_encryption
checksum
cloud_transition cloud_transition
encryption encryption
fails_on_aws fails_on_aws
@ -17,28 +16,21 @@ markers =
fails_on_rgw fails_on_rgw
fails_on_s3 fails_on_s3
fails_with_subdomain fails_with_subdomain
group
group_policy
iam_account
iam_cross_account
iam_role
iam_tenant
iam_user
lifecycle lifecycle
lifecycle_expiration lifecycle_expiration
lifecycle_transition lifecycle_transition
list_objects_v2 list_objects_v2
object_lock object_lock
role_policy
session_policy session_policy
s3select s3select
s3website s3website
s3website_routing_rules s3website_routing_rules
s3website_redirect_location s3website_redirect_location
sns 3website
sse_s3 sse_s3
storage_class storage_class
tagging tagging
test_of_iam
test_of_sts test_of_sts
token_claims_trust_policy_test token_claims_trust_policy_test
token_principal_tag_role_policy_test token_principal_tag_role_policy_test

View file

@ -8,7 +8,7 @@ munch >=2.0.0
gevent >=1.0 gevent >=1.0
isodate >=0.4.4 isodate >=0.4.4
requests >=2.23.0 requests >=2.23.0
pytz pytz >=2011k
httplib2 httplib2
lxml lxml
pytest pytest

View file

@ -19,14 +19,6 @@ ssl_verify = False
## the prefix to 30 characters long, and avoid collisions ## the prefix to 30 characters long, and avoid collisions
bucket prefix = yournamehere-{random}- bucket prefix = yournamehere-{random}-
# all the iam account resources (users, roles, etc) created
# will start with this name prefix
iam name prefix = s3-tests-
# all the iam account resources (users, roles, etc) created
# will start with this path prefix
iam path prefix = /s3-tests/
[s3 main] [s3 main]
# main display_name set in vstart.sh # main display_name set in vstart.sh
display_name = M. Tester display_name = M. Tester
@ -117,9 +109,6 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
# tenant email set in vstart.sh # tenant email set in vstart.sh
email = tenanteduser@example.com email = tenanteduser@example.com
# tenant name
tenant = testx
#following section needs to be added for all sts-tests #following section needs to be added for all sts-tests
[iam] [iam]
#used for iam operations in sts-tests #used for iam operations in sts-tests
@ -138,20 +127,6 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
#display_name from vstart.sh #display_name from vstart.sh
display_name = youruseridhere display_name = youruseridhere
# iam account root user for iam_account tests
[iam root]
access_key = AAAAAAAAAAAAAAAAAAaa
secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
user_id = RGW11111111111111111
email = account1@ceph.com
# iam account root user in a different account than [iam root]
[iam alt root]
access_key = BBBBBBBBBBBBBBBBBBbb
secret_key = bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
user_id = RGW22222222222222222
email = account2@ceph.com
#following section needs to be added when you want to run Assume Role With Webidentity test #following section needs to be added when you want to run Assume Role With Webidentity test
[webidentity] [webidentity]
#used for assume role with web identity test in sts-tests #used for assume role with web identity test in sts-tests

View file

@ -36,9 +36,11 @@ from . import (
) )
_orig_conn = {}
_orig_authorize = None _orig_authorize = None
_custom_headers = {} _custom_headers = {}
_remove_headers = [] _remove_headers = []
boto_type = None
# HeaderS3Connection and _our_authorize are necessary to be able to arbitrarily # HeaderS3Connection and _our_authorize are necessary to be able to arbitrarily
@ -82,15 +84,15 @@ def _our_authorize(self, connection, **kwargs):
_update_headers(self.headers) _update_headers(self.headers)
@pytest.fixture def setup():
def hook_headers(setup_teardown): global boto_type
boto_type = None
_orig_conn = {}
# we determine what we need to replace by the existence of particular # we determine what we need to replace by the existence of particular
# attributes. boto 2.0rc1 as fill_in_auth for S3Connection, while boto 2.0 # attributes. boto 2.0rc1 as fill_in_auth for S3Connection, while boto 2.0
# has authorize for HTTPRequest. # has authorize for HTTPRequest.
if hasattr(S3Connection, 'fill_in_auth'): if hasattr(S3Connection, 'fill_in_auth'):
global _orig_conn
boto_type = 'S3Connection' boto_type = 'S3Connection'
for conn in s3: for conn in s3:
_orig_conn[conn] = s3[conn] _orig_conn[conn] = s3[conn]
@ -114,14 +116,19 @@ def hook_headers(setup_teardown):
else: else:
raise RuntimeError raise RuntimeError
yield
def teardown():
global boto_type
# replace original functionality depending on the boto version # replace original functionality depending on the boto version
if boto_type is 'S3Connection': if boto_type is 'S3Connection':
global _orig_conn
for conn in s3: for conn in s3:
s3[conn] = _orig_conn[conn] s3[conn] = _orig_conn[conn]
_orig_conn = {} _orig_conn = {}
elif boto_type is 'HTTPRequest': elif boto_type is 'HTTPRequest':
global _orig_authorize
boto.connection.HTTPRequest.authorize = _orig_authorize boto.connection.HTTPRequest.authorize = _orig_authorize
_orig_authorize = None _orig_authorize = None
else: else:
@ -136,7 +143,7 @@ def _clear_custom_headers():
_remove_headers = [] _remove_headers = []
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def clear_custom_headers(setup_teardown, hook_headers): def clear_custom_headers(setup_teardown):
yield yield
_clear_custom_headers() # clear headers before teardown() _clear_custom_headers() # clear headers before teardown()

View file

@ -290,7 +290,7 @@ def verify_object(bucket, k, data=None, storage_class=None):
if data: if data:
read_data = k.get_contents_as_string() read_data = k.get_contents_as_string()
equal = data == read_data.decode() # avoid spamming log if data not equal equal = data == read_data # avoid spamming log if data not equal
assert equal == True assert equal == True
def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class): def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
@ -362,9 +362,6 @@ def configured_storage_classes():
if item != 'STANDARD': if item != 'STANDARD':
sc.append(item) sc.append(item)
sc = [i for i in sc if i]
print("storage classes configured: " + str(sc))
return sc return sc
def lc_transition(days=None, date=None, storage_class=None): def lc_transition(days=None, date=None, storage_class=None):

View file

@ -174,7 +174,7 @@ def configured_storage_classes():
return sc return sc
def configure(): def setup():
cfg = configparser.RawConfigParser() cfg = configparser.RawConfigParser()
try: try:
path = os.environ['S3TEST_CONF'] path = os.environ['S3TEST_CONF']
@ -259,43 +259,25 @@ def configure():
config.tenant_display_name = cfg.get('s3 tenant',"display_name") config.tenant_display_name = cfg.get('s3 tenant',"display_name")
config.tenant_user_id = cfg.get('s3 tenant',"user_id") config.tenant_user_id = cfg.get('s3 tenant',"user_id")
config.tenant_email = cfg.get('s3 tenant',"email") config.tenant_email = cfg.get('s3 tenant',"email")
config.tenant_name = cfg.get('s3 tenant',"tenant")
config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key")
config.iam_display_name = cfg.get('iam',"display_name")
config.iam_user_id = cfg.get('iam',"user_id")
config.iam_email = cfg.get('iam',"email")
config.iam_root_access_key = cfg.get('iam root',"access_key")
config.iam_root_secret_key = cfg.get('iam root',"secret_key")
config.iam_root_user_id = cfg.get('iam root',"user_id")
config.iam_root_email = cfg.get('iam root',"email")
config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key")
config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key")
config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id")
config.iam_alt_root_email = cfg.get('iam alt root',"email")
# vars from the fixtures section # vars from the fixtures section
template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-') try:
template = cfg.get('fixtures', "bucket prefix")
except (configparser.NoOptionError):
template = 'test-{random}-'
prefix = choose_bucket_prefix(template=template) prefix = choose_bucket_prefix(template=template)
template = cfg.get('fixtures', "iam name prefix", fallback="s3-tests-")
config.iam_name_prefix = choose_bucket_prefix(template=template) alt_client = get_alt_client()
template = cfg.get('fixtures', "iam path prefix", fallback="/s3-tests/") tenant_client = get_tenant_client()
config.iam_path_prefix = choose_bucket_prefix(template=template) nuke_prefixed_buckets(prefix=prefix)
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
if cfg.has_section("s3 cloud"): if cfg.has_section("s3 cloud"):
get_cloud_config(cfg) get_cloud_config(cfg)
else: else:
config.cloud_storage_class = None config.cloud_storage_class = None
def setup():
alt_client = get_alt_client()
tenant_client = get_tenant_client()
nuke_prefixed_buckets(prefix=prefix)
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
def teardown(): def teardown():
alt_client = get_alt_client() alt_client = get_alt_client()
@ -324,12 +306,11 @@ def teardown():
@pytest.fixture(scope="package") @pytest.fixture(scope="package")
def configfile(): def configfile():
configure() setup()
return config return config
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_teardown(configfile): def setup_teardown(configfile):
setup()
yield yield
teardown() teardown()
@ -414,65 +395,64 @@ def get_v2_client():
config=Config(signature_version='s3')) config=Config(signature_version='s3'))
return client return client
def get_sts_client(**kwargs): def get_sts_client(client_config=None):
kwargs.setdefault('aws_access_key_id', config.alt_access_key) if client_config == None:
kwargs.setdefault('aws_secret_access_key', config.alt_secret_key) client_config = Config(signature_version='s3v4')
kwargs.setdefault('config', Config(signature_version='s3v4'))
client = boto3.client(service_name='sts', client = boto3.client(service_name='sts',
endpoint_url=config.default_endpoint, aws_access_key_id=config.alt_access_key,
region_name='', aws_secret_access_key=config.alt_secret_key,
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
return client
def get_iam_client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)
client = boto3.client(service_name='iam',
endpoint_url=config.default_endpoint, endpoint_url=config.default_endpoint,
region_name='', region_name='',
use_ssl=config.default_is_secure, use_ssl=config.default_is_secure,
verify=config.default_ssl_verify, verify=config.default_ssl_verify,
**kwargs) config=client_config)
return client return client
def get_iam_s3client(**kwargs): def get_iam_client(client_config=None):
kwargs.setdefault('aws_access_key_id', config.iam_access_key) cfg = configparser.RawConfigParser()
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key) try:
kwargs.setdefault('config', Config(signature_version='s3v4')) path = os.environ['S3TEST_CONF']
except KeyError:
raise RuntimeError(
'To run tests, point environment '
+ 'variable S3TEST_CONF to a config file.',
)
cfg.read(path)
if not cfg.has_section("iam"):
raise RuntimeError('Your config file is missing the "iam" section!')
config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key")
config.iam_display_name = cfg.get('iam',"display_name")
config.iam_user_id = cfg.get('iam',"user_id")
config.iam_email = cfg.get('iam',"email")
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name='iam',
aws_access_key_id=config.iam_access_key,
aws_secret_access_key=config.iam_secret_key,
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
config=client_config)
return client
def get_iam_s3client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
client = boto3.client(service_name='s3', client = boto3.client(service_name='s3',
aws_access_key_id=get_iam_access_key(),
aws_secret_access_key=get_iam_secret_key(),
endpoint_url=config.default_endpoint, endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure, use_ssl=config.default_is_secure,
verify=config.default_ssl_verify, verify=config.default_ssl_verify,
**kwargs) config=client_config)
return client return client
def get_iam_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_root_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_root_secret_key)
return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
def get_iam_alt_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_alt_root_secret_key)
return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
def get_alt_client(client_config=None): def get_alt_client(client_config=None):
if client_config == None: if client_config == None:
client_config = Config(signature_version='s3v4') client_config = Config(signature_version='s3v4')
@ -511,17 +491,6 @@ def get_tenant_client(client_config=None):
config=client_config) config=client_config)
return client return client
def get_v2_tenant_client():
client_config = Config(signature_version='s3')
client = boto3.client(service_name='s3',
aws_access_key_id=config.tenant_access_key,
aws_secret_access_key=config.tenant_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
config=client_config)
return client
def get_tenant_iam_client(): def get_tenant_iam_client():
client = boto3.client(service_name='iam', client = boto3.client(service_name='iam',
@ -706,9 +675,6 @@ def get_tenant_aws_secret_key():
def get_tenant_display_name(): def get_tenant_display_name():
return config.tenant_display_name return config.tenant_display_name
def get_tenant_name():
return config.tenant_name
def get_tenant_user_id(): def get_tenant_user_id():
return config.tenant_user_id return config.tenant_user_id
@ -733,33 +699,12 @@ def get_token():
def get_realm_name(): def get_realm_name():
return config.webidentity_realm return config.webidentity_realm
def get_iam_name_prefix():
return config.iam_name_prefix
def make_iam_name(name):
return config.iam_name_prefix + name
def get_iam_path_prefix():
return config.iam_path_prefix
def get_iam_access_key(): def get_iam_access_key():
return config.iam_access_key return config.iam_access_key
def get_iam_secret_key(): def get_iam_secret_key():
return config.iam_secret_key return config.iam_secret_key
def get_iam_root_user_id():
return config.iam_root_user_id
def get_iam_root_email():
return config.iam_root_email
def get_iam_alt_root_user_id():
return config.iam_alt_root_user_id
def get_iam_alt_root_email():
return config.iam_alt_root_email
def get_user_token(): def get_user_token():
return config.webidentity_user_token return config.webidentity_user_token

View file

@ -1,199 +0,0 @@
from botocore.exceptions import ClientError
import pytest
from . import (
configfile,
get_iam_root_client,
get_iam_root_user_id,
get_iam_root_email,
get_iam_alt_root_client,
get_iam_alt_root_user_id,
get_iam_alt_root_email,
get_iam_path_prefix,
)
def nuke_user_keys(client, name):
p = client.get_paginator('list_access_keys')
for response in p.paginate(UserName=name):
for key in response['AccessKeyMetadata']:
try:
client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId'])
except:
pass
def nuke_user_policies(client, name):
p = client.get_paginator('list_user_policies')
for response in p.paginate(UserName=name):
for policy in response['PolicyNames']:
try:
client.delete_user_policy(UserName=name, PolicyName=policy)
except:
pass
def nuke_attached_user_policies(client, name):
p = client.get_paginator('list_attached_user_policies')
for response in p.paginate(UserName=name):
for policy in response['AttachedPolicies']:
try:
client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn'])
except:
pass
def nuke_user(client, name):
# delete access keys, user policies, etc
try:
nuke_user_keys(client, name)
except:
pass
try:
nuke_user_policies(client, name)
except:
pass
try:
nuke_attached_user_policies(client, name)
except:
pass
client.delete_user(UserName=name)
def nuke_users(client, **kwargs):
p = client.get_paginator('list_users')
for response in p.paginate(**kwargs):
for user in response['Users']:
try:
nuke_user(client, user['UserName'])
except:
pass
def nuke_group_policies(client, name):
p = client.get_paginator('list_group_policies')
for response in p.paginate(GroupName=name):
for policy in response['PolicyNames']:
try:
client.delete_group_policy(GroupName=name, PolicyName=policy)
except:
pass
def nuke_attached_group_policies(client, name):
p = client.get_paginator('list_attached_group_policies')
for response in p.paginate(GroupName=name):
for policy in response['AttachedPolicies']:
try:
client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
except:
pass
def nuke_group_users(client, name):
p = client.get_paginator('get_group')
for response in p.paginate(GroupName=name):
for user in response['Users']:
try:
client.remove_user_from_group(GroupName=name, UserName=user['UserName'])
except:
pass
def nuke_group(client, name):
# delete group policies and remove all users
try:
nuke_group_policies(client, name)
except:
pass
try:
nuke_attached_group_policies(client, name)
except:
pass
try:
nuke_group_users(client, name)
except:
pass
client.delete_group(GroupName=name)
def nuke_groups(client, **kwargs):
p = client.get_paginator('list_groups')
for response in p.paginate(**kwargs):
for user in response['Groups']:
try:
nuke_group(client, user['GroupName'])
except:
pass
def nuke_role_policies(client, name):
p = client.get_paginator('list_role_policies')
for response in p.paginate(RoleName=name):
for policy in response['PolicyNames']:
try:
client.delete_role_policy(RoleName=name, PolicyName=policy)
except:
pass
def nuke_attached_role_policies(client, name):
p = client.get_paginator('list_attached_role_policies')
for response in p.paginate(RoleName=name):
for policy in response['AttachedPolicies']:
try:
client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn'])
except:
pass
def nuke_role(client, name):
# delete role policies, etc
try:
nuke_role_policies(client, name)
except:
pass
try:
nuke_attached_role_policies(client, name)
except:
pass
client.delete_role(RoleName=name)
def nuke_roles(client, **kwargs):
p = client.get_paginator('list_roles')
for response in p.paginate(**kwargs):
for role in response['Roles']:
try:
nuke_role(client, role['RoleName'])
except:
pass
def nuke_oidc_providers(client, prefix):
result = client.list_open_id_connect_providers()
for provider in result['OpenIDConnectProviderList']:
arn = provider['Arn']
if f':oidc-provider{prefix}' in arn:
try:
client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn)
except:
pass
# fixture for iam account root user
@pytest.fixture
def iam_root(configfile):
client = get_iam_root_client()
try:
arn = client.get_user()['User']['Arn']
if not arn.endswith(':root'):
pytest.skip('[iam root] user does not have :root arn')
except ClientError as e:
pytest.skip('[iam root] user does not belong to an account')
yield client
nuke_users(client, PathPrefix=get_iam_path_prefix())
nuke_groups(client, PathPrefix=get_iam_path_prefix())
nuke_roles(client, PathPrefix=get_iam_path_prefix())
nuke_oidc_providers(client, get_iam_path_prefix())
# fixture for iam alt account root user
@pytest.fixture
def iam_alt_root(configfile):
client = get_iam_alt_root_client()
try:
arn = client.get_user()['User']['Arn']
if not arn.endswith(':root'):
pytest.skip('[iam alt root] user does not have :root arn')
except ClientError as e:
pytest.skip('[iam alt root] user does not belong to an account')
yield client
nuke_users(client, PathPrefix=get_iam_path_prefix())
nuke_roles(client, PathPrefix=get_iam_path_prefix())

View file

@ -37,10 +37,10 @@ class Policy(object):
return json.dumps(policy_dict) return json.dumps(policy_dict)
def make_json_policy(action, resource, principal={"AWS": "*"}, effect="Allow", conditions=None): def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
""" """
Helper function to make single statement policies Helper function to make single statement policies
""" """
s = Statement(action, resource, principal, effect=effect, condition=conditions) s = Statement(action, resource, principal, condition=conditions)
p = Policy() p = Policy()
return p.add_statement(s).to_json() return p.add_statement(s).to_json()

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -4,11 +4,8 @@ import string
import re import re
import json import json
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from botocore.exceptions import EventStreamError
import uuid import uuid
import warnings
import traceback
from . import ( from . import (
configfile, configfile,
@ -98,7 +95,6 @@ def test_generate_where_clause():
for _ in range(100): for _ in range(100):
generate_s3select_where_clause(bucket_name,obj_name) generate_s3select_where_clause(bucket_name,obj_name)
@pytest.mark.s3select @pytest.mark.s3select
def test_generate_projection(): def test_generate_projection():
@ -115,26 +111,12 @@ def s3select_assert_result(a,b):
if type(a) == str: if type(a) == str:
a_strip = a.strip() a_strip = a.strip()
b_strip = b.strip() b_strip = b.strip()
if a=="" and b=="":
warnings.warn(UserWarning("{}".format("both results are empty, it may indicates a wrong input, please check the test input")))
## print the calling function that created the empty result.
stack = traceback.extract_stack(limit=2)
formatted_stack = traceback.format_list(stack)[0]
warnings.warn(UserWarning("{}".format(formatted_stack)))
return True
assert a_strip != "" assert a_strip != ""
assert b_strip != "" assert b_strip != ""
else: else:
if a=="" and b=="":
warnings.warn(UserWarning("{}".format("both results are empty, it may indicates a wrong input, please check the test input")))
## print the calling function that created the empty result.
stack = traceback.extract_stack(limit=2)
formatted_stack = traceback.format_list(stack)[0]
warnings.warn(UserWarning("{}".format(formatted_stack)))
return True
assert a != "" assert a != ""
assert b != "" assert b != ""
assert True assert a == b
def create_csv_object_for_datetime(rows,columns): def create_csv_object_for_datetime(rows,columns):
result = "" result = ""
@ -295,7 +277,6 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
s3 = get_client() s3 = get_client()
result = "" result = ""
result_status = {} result_status = {}
try: try:
r = s3.select_object_content( r = s3.select_object_content(
Bucket=bucket, Bucket=bucket,
@ -311,34 +292,26 @@ def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',
return result return result
if progress == False: if progress == False:
for event in r['Payload']:
try: if 'Records' in event:
for event in r['Payload']: records = event['Records']['Payload'].decode('utf-8')
if 'Records' in event: result += records
records = event['Records']['Payload'].decode('utf-8')
result += records
except EventStreamError as c:
result = str(c)
return result
else: else:
result = [] result = []
max_progress_scanned = 0 max_progress_scanned = 0
for event in r['Payload']: for event in r['Payload']:
if 'Records' in event: if 'Records' in event:
records = event['Records'] records = event['Records']
result.append(records.copy()) result.append(records.copy())
if 'Progress' in event: if 'Progress' in event:
if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned): if(event['Progress']['Details']['BytesScanned'] > max_progress_scanned):
max_progress_scanned = event['Progress']['Details']['BytesScanned'] max_progress_scanned = event['Progress']['Details']['BytesScanned']
result_status['Progress'] = event['Progress'] result_status['Progress'] = event['Progress']
if 'Stats' in event:
result_status['Stats'] = event['Stats']
if 'End' in event:
result_status['End'] = event['End']
if 'Stats' in event:
result_status['Stats'] = event['Stats']
if 'End' in event:
result_status['End'] = event['End']
if progress == False: if progress == False:
return result return result
@ -823,9 +796,6 @@ def test_true_false_in_expressions():
csv_obj_name = get_random_string() csv_obj_name = get_random_string()
bucket_name = get_new_bucket_name() bucket_name = get_new_bucket_name()
## 1,2 must exist in first/second column (to avoid empty results)
csv_obj = csv_obj + "1,2,,,,,,,,,,\n"
upload_object(bucket_name,csv_obj_name,csv_obj) upload_object(bucket_name,csv_obj_name,csv_obj)
res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(1)) = true;')).replace("\n","") res_s3select_in = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select int(_1) from s3object where (int(_1) in(1)) = true;')).replace("\n","")
@ -894,7 +864,7 @@ def test_like_expressions():
res_s3select_like = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _1 like "%aeio%" like;')).replace("\n","") res_s3select_like = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,'select count(*) from stdin where _1 like "%aeio%" like;')).replace("\n","")
find_like = res_s3select_like.find("UnsupportedSyntax") find_like = res_s3select_like.find("s3select-Syntax-Error")
assert int(find_like) >= 0 assert int(find_like) >= 0
@ -1359,6 +1329,7 @@ def test_schema_definition():
# using the scheme on first line, query is using the attach schema # using the scheme on first line, query is using the attach schema
res_use = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select c1,c3 from s3object;",csv_header_info="USE") ).replace("\n","") res_use = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select c1,c3 from s3object;",csv_header_info="USE") ).replace("\n","")
# result of both queries should be the same # result of both queries should be the same
s3select_assert_result( res_ignore, res_use) s3select_assert_result( res_ignore, res_use)
@ -1367,8 +1338,8 @@ def test_schema_definition():
assert ((res_multiple_defintion.find("alias {c11} or column not exist in schema")) >= 0) assert ((res_multiple_defintion.find("alias {c11} or column not exist in schema")) >= 0)
#find_processing_error = res_multiple_defintion.find("ProcessingTimeError") #find_processing_error = res_multiple_defintion.find("s3select-ProcessingTime-Error")
assert ((res_multiple_defintion.find("ProcessingTimeError")) >= 0) assert ((res_multiple_defintion.find("s3select-ProcessingTime-Error")) >= 0)
# alias-name is identical to column-name # alias-name is identical to column-name
res_multiple_defintion = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select int(c1)+int(c2) as c4,c4 from s3object;",csv_header_info="USE") ).replace("\n","") res_multiple_defintion = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name,"select int(c1)+int(c2) as c4,c4 from s3object;",csv_header_info="USE") ).replace("\n","")

View file

@ -1,159 +0,0 @@
import json
import pytest
from botocore.exceptions import ClientError
from . import (
configfile,
get_iam_root_client,
get_iam_alt_root_client,
get_new_bucket_name,
get_prefix,
nuke_prefixed_buckets,
)
from .iam import iam_root, iam_alt_root
from .utils import assert_raises, _get_status_and_error_code
def get_new_topic_name():
return get_new_bucket_name()
def nuke_topics(client, prefix):
p = client.get_paginator('list_topics')
for response in p.paginate():
for topic in response['Topics']:
arn = topic['TopicArn']
if prefix not in arn:
pass
try:
client.delete_topic(TopicArn=arn)
except:
pass
@pytest.fixture
def sns(iam_root):
client = get_iam_root_client(service_name='sns')
yield client
nuke_topics(client, get_prefix())
@pytest.fixture
def sns_alt(iam_alt_root):
client = get_iam_alt_root_client(service_name='sns')
yield client
nuke_topics(client, get_prefix())
@pytest.fixture
def s3(iam_root):
client = get_iam_root_client(service_name='s3')
yield client
nuke_prefixed_buckets(get_prefix(), client)
@pytest.fixture
def s3_alt(iam_alt_root):
client = get_iam_alt_root_client(service_name='s3')
yield client
nuke_prefixed_buckets(get_prefix(), client)
@pytest.mark.iam_account
@pytest.mark.sns
def test_account_topic(sns):
name = get_new_topic_name()
response = sns.create_topic(Name=name)
arn = response['TopicArn']
assert arn.startswith('arn:aws:sns:')
assert arn.endswith(f':{name}')
response = sns.list_topics()
assert arn in [p['TopicArn'] for p in response['Topics']]
sns.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='')
response = sns.get_topic_attributes(TopicArn=arn)
assert 'Attributes' in response
sns.delete_topic(TopicArn=arn)
response = sns.list_topics()
assert arn not in [p['TopicArn'] for p in response['Topics']]
with pytest.raises(sns.exceptions.NotFoundException):
sns.get_topic_attributes(TopicArn=arn)
sns.delete_topic(TopicArn=arn)
@pytest.mark.iam_account
@pytest.mark.sns
def test_cross_account_topic(sns, sns_alt):
name = get_new_topic_name()
arn = sns.create_topic(Name=name)['TopicArn']
# not visible to any alt user apis
with pytest.raises(sns.exceptions.NotFoundException):
sns_alt.get_topic_attributes(TopicArn=arn)
with pytest.raises(sns.exceptions.NotFoundException):
sns_alt.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='')
# delete returns success
sns_alt.delete_topic(TopicArn=arn)
response = sns_alt.list_topics()
assert arn not in [p['TopicArn'] for p in response['Topics']]
@pytest.mark.iam_account
@pytest.mark.sns
def test_account_topic_publish(sns, s3):
name = get_new_topic_name()
response = sns.create_topic(Name=name)
topic_arn = response['TopicArn']
bucket = get_new_bucket_name()
s3.create_bucket(Bucket=bucket)
config = {'TopicConfigurations': [{
'Id': 'id',
'TopicArn': topic_arn,
'Events': [ 's3:ObjectCreated:*' ],
}]}
s3.put_bucket_notification_configuration(
Bucket=bucket, NotificationConfiguration=config)
@pytest.mark.iam_account
@pytest.mark.iam_cross_account
@pytest.mark.sns
def test_cross_account_topic_publish(sns, s3_alt, iam_alt_root):
name = get_new_topic_name()
response = sns.create_topic(Name=name)
topic_arn = response['TopicArn']
bucket = get_new_bucket_name()
s3_alt.create_bucket(Bucket=bucket)
config = {'TopicConfigurations': [{
'Id': 'id',
'TopicArn': topic_arn,
'Events': [ 's3:ObjectCreated:*' ],
}]}
# expect AccessDenies because no resource policy allows cross-account access
e = assert_raises(ClientError, s3_alt.put_bucket_notification_configuration,
Bucket=bucket, NotificationConfiguration=config)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'
# add topic policy to allow the alt user
alt_principal = iam_alt_root.get_user()['User']['Arn']
policy = json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'AWS': alt_principal},
'Action': 'sns:Publish',
'Resource': topic_arn
}]
})
sns.set_topic_attributes(TopicArn=topic_arn, AttributeName='Policy',
AttributeValue=policy)
s3_alt.put_bucket_notification_configuration(
Bucket=bucket, NotificationConfiguration=config)