mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-24 19:30:38 +00:00
Merge pull request #537 from cbodley/wip-iam-user-apis
iam: add tests for account-based IAM apis
This commit is contained in:
commit
54c1488a43
7 changed files with 2448 additions and 116 deletions
11
pytest.ini
11
pytest.ini
|
@ -16,21 +16,28 @@ markers =
|
||||||
fails_on_rgw
|
fails_on_rgw
|
||||||
fails_on_s3
|
fails_on_s3
|
||||||
fails_with_subdomain
|
fails_with_subdomain
|
||||||
|
group
|
||||||
|
group_policy
|
||||||
|
iam_account
|
||||||
|
iam_cross_account
|
||||||
|
iam_role
|
||||||
|
iam_tenant
|
||||||
|
iam_user
|
||||||
lifecycle
|
lifecycle
|
||||||
lifecycle_expiration
|
lifecycle_expiration
|
||||||
lifecycle_transition
|
lifecycle_transition
|
||||||
list_objects_v2
|
list_objects_v2
|
||||||
object_lock
|
object_lock
|
||||||
|
role_policy
|
||||||
session_policy
|
session_policy
|
||||||
s3select
|
s3select
|
||||||
s3website
|
s3website
|
||||||
s3website_routing_rules
|
s3website_routing_rules
|
||||||
s3website_redirect_location
|
s3website_redirect_location
|
||||||
3website
|
sns
|
||||||
sse_s3
|
sse_s3
|
||||||
storage_class
|
storage_class
|
||||||
tagging
|
tagging
|
||||||
test_of_iam
|
|
||||||
test_of_sts
|
test_of_sts
|
||||||
token_claims_trust_policy_test
|
token_claims_trust_policy_test
|
||||||
token_principal_tag_role_policy_test
|
token_principal_tag_role_policy_test
|
||||||
|
|
|
@ -19,6 +19,14 @@ ssl_verify = False
|
||||||
## the prefix to 30 characters long, and avoid collisions
|
## the prefix to 30 characters long, and avoid collisions
|
||||||
bucket prefix = yournamehere-{random}-
|
bucket prefix = yournamehere-{random}-
|
||||||
|
|
||||||
|
# all the iam account resources (users, roles, etc) created
|
||||||
|
# will start with this name prefix
|
||||||
|
iam name prefix = s3-tests-
|
||||||
|
|
||||||
|
# all the iam account resources (users, roles, etc) created
|
||||||
|
# will start with this path prefix
|
||||||
|
iam path prefix = /s3-tests/
|
||||||
|
|
||||||
[s3 main]
|
[s3 main]
|
||||||
# main display_name set in vstart.sh
|
# main display_name set in vstart.sh
|
||||||
display_name = M. Tester
|
display_name = M. Tester
|
||||||
|
@ -127,6 +135,20 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
|
||||||
#display_name from vstart.sh
|
#display_name from vstart.sh
|
||||||
display_name = youruseridhere
|
display_name = youruseridhere
|
||||||
|
|
||||||
|
# iam account root user for iam_account tests
|
||||||
|
[iam root]
|
||||||
|
access_key = AAAAAAAAAAAAAAAAAAaa
|
||||||
|
secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
|
||||||
|
user_id = RGW11111111111111111
|
||||||
|
email = account1@ceph.com
|
||||||
|
|
||||||
|
# iam account root user in a different account than [iam root]
|
||||||
|
[iam alt root]
|
||||||
|
access_key = BBBBBBBBBBBBBBBBBBbb
|
||||||
|
secret_key = bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
|
||||||
|
user_id = RGW22222222222222222
|
||||||
|
email = account2@ceph.com
|
||||||
|
|
||||||
#following section needs to be added when you want to run Assume Role With Webidentity test
|
#following section needs to be added when you want to run Assume Role With Webidentity test
|
||||||
[webidentity]
|
[webidentity]
|
||||||
#used for assume role with web identity test in sts-tests
|
#used for assume role with web identity test in sts-tests
|
||||||
|
|
|
@ -174,7 +174,7 @@ def configured_storage_classes():
|
||||||
|
|
||||||
return sc
|
return sc
|
||||||
|
|
||||||
def setup():
|
def configure():
|
||||||
cfg = configparser.RawConfigParser()
|
cfg = configparser.RawConfigParser()
|
||||||
try:
|
try:
|
||||||
path = os.environ['S3TEST_CONF']
|
path = os.environ['S3TEST_CONF']
|
||||||
|
@ -260,24 +260,41 @@ def setup():
|
||||||
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
|
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
|
||||||
config.tenant_email = cfg.get('s3 tenant',"email")
|
config.tenant_email = cfg.get('s3 tenant',"email")
|
||||||
|
|
||||||
# vars from the fixtures section
|
config.iam_access_key = cfg.get('iam',"access_key")
|
||||||
try:
|
config.iam_secret_key = cfg.get('iam',"secret_key")
|
||||||
template = cfg.get('fixtures', "bucket prefix")
|
config.iam_display_name = cfg.get('iam',"display_name")
|
||||||
except (configparser.NoOptionError):
|
config.iam_user_id = cfg.get('iam',"user_id")
|
||||||
template = 'test-{random}-'
|
config.iam_email = cfg.get('iam',"email")
|
||||||
prefix = choose_bucket_prefix(template=template)
|
|
||||||
|
|
||||||
alt_client = get_alt_client()
|
config.iam_root_access_key = cfg.get('iam root',"access_key")
|
||||||
tenant_client = get_tenant_client()
|
config.iam_root_secret_key = cfg.get('iam root',"secret_key")
|
||||||
nuke_prefixed_buckets(prefix=prefix)
|
config.iam_root_user_id = cfg.get('iam root',"user_id")
|
||||||
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
|
config.iam_root_email = cfg.get('iam root',"email")
|
||||||
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
|
|
||||||
|
config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key")
|
||||||
|
config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key")
|
||||||
|
config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id")
|
||||||
|
config.iam_alt_root_email = cfg.get('iam alt root',"email")
|
||||||
|
|
||||||
|
# vars from the fixtures section
|
||||||
|
template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-')
|
||||||
|
prefix = choose_bucket_prefix(template=template)
|
||||||
|
template = cfg.get('fixtures', "iam name prefix", fallback="s3-tests-")
|
||||||
|
config.iam_name_prefix = choose_bucket_prefix(template=template)
|
||||||
|
template = cfg.get('fixtures', "iam path prefix", fallback="/s3-tests/")
|
||||||
|
config.iam_path_prefix = choose_bucket_prefix(template=template)
|
||||||
|
|
||||||
if cfg.has_section("s3 cloud"):
|
if cfg.has_section("s3 cloud"):
|
||||||
get_cloud_config(cfg)
|
get_cloud_config(cfg)
|
||||||
else:
|
else:
|
||||||
config.cloud_storage_class = None
|
config.cloud_storage_class = None
|
||||||
|
|
||||||
|
def setup():
|
||||||
|
alt_client = get_alt_client()
|
||||||
|
tenant_client = get_tenant_client()
|
||||||
|
nuke_prefixed_buckets(prefix=prefix)
|
||||||
|
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
|
||||||
|
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
|
||||||
|
|
||||||
def teardown():
|
def teardown():
|
||||||
alt_client = get_alt_client()
|
alt_client = get_alt_client()
|
||||||
|
@ -306,11 +323,12 @@ def teardown():
|
||||||
|
|
||||||
@pytest.fixture(scope="package")
|
@pytest.fixture(scope="package")
|
||||||
def configfile():
|
def configfile():
|
||||||
setup()
|
configure()
|
||||||
return config
|
return config
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def setup_teardown(configfile):
|
def setup_teardown(configfile):
|
||||||
|
setup()
|
||||||
yield
|
yield
|
||||||
teardown()
|
teardown()
|
||||||
|
|
||||||
|
@ -395,64 +413,65 @@ def get_v2_client():
|
||||||
config=Config(signature_version='s3'))
|
config=Config(signature_version='s3'))
|
||||||
return client
|
return client
|
||||||
|
|
||||||
def get_sts_client(client_config=None):
|
def get_sts_client(**kwargs):
|
||||||
if client_config == None:
|
kwargs.setdefault('aws_access_key_id', config.alt_access_key)
|
||||||
client_config = Config(signature_version='s3v4')
|
kwargs.setdefault('aws_secret_access_key', config.alt_secret_key)
|
||||||
|
kwargs.setdefault('config', Config(signature_version='s3v4'))
|
||||||
|
|
||||||
client = boto3.client(service_name='sts',
|
client = boto3.client(service_name='sts',
|
||||||
aws_access_key_id=config.alt_access_key,
|
|
||||||
aws_secret_access_key=config.alt_secret_key,
|
|
||||||
endpoint_url=config.default_endpoint,
|
endpoint_url=config.default_endpoint,
|
||||||
region_name='',
|
region_name='',
|
||||||
use_ssl=config.default_is_secure,
|
use_ssl=config.default_is_secure,
|
||||||
verify=config.default_ssl_verify,
|
verify=config.default_ssl_verify,
|
||||||
config=client_config)
|
**kwargs)
|
||||||
return client
|
return client
|
||||||
|
|
||||||
def get_iam_client(client_config=None):
|
def get_iam_client(**kwargs):
|
||||||
cfg = configparser.RawConfigParser()
|
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
|
||||||
try:
|
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)
|
||||||
path = os.environ['S3TEST_CONF']
|
|
||||||
except KeyError:
|
|
||||||
raise RuntimeError(
|
|
||||||
'To run tests, point environment '
|
|
||||||
+ 'variable S3TEST_CONF to a config file.',
|
|
||||||
)
|
|
||||||
cfg.read(path)
|
|
||||||
if not cfg.has_section("iam"):
|
|
||||||
raise RuntimeError('Your config file is missing the "iam" section!')
|
|
||||||
|
|
||||||
config.iam_access_key = cfg.get('iam',"access_key")
|
|
||||||
config.iam_secret_key = cfg.get('iam',"secret_key")
|
|
||||||
config.iam_display_name = cfg.get('iam',"display_name")
|
|
||||||
config.iam_user_id = cfg.get('iam',"user_id")
|
|
||||||
config.iam_email = cfg.get('iam',"email")
|
|
||||||
|
|
||||||
if client_config == None:
|
|
||||||
client_config = Config(signature_version='s3v4')
|
|
||||||
|
|
||||||
client = boto3.client(service_name='iam',
|
client = boto3.client(service_name='iam',
|
||||||
aws_access_key_id=config.iam_access_key,
|
|
||||||
aws_secret_access_key=config.iam_secret_key,
|
|
||||||
endpoint_url=config.default_endpoint,
|
endpoint_url=config.default_endpoint,
|
||||||
region_name='',
|
region_name='',
|
||||||
use_ssl=config.default_is_secure,
|
use_ssl=config.default_is_secure,
|
||||||
verify=config.default_ssl_verify,
|
verify=config.default_ssl_verify,
|
||||||
config=client_config)
|
**kwargs)
|
||||||
return client
|
return client
|
||||||
|
|
||||||
def get_iam_s3client(client_config=None):
|
def get_iam_s3client(**kwargs):
|
||||||
if client_config == None:
|
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
|
||||||
client_config = Config(signature_version='s3v4')
|
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)
|
||||||
|
kwargs.setdefault('config', Config(signature_version='s3v4'))
|
||||||
|
|
||||||
client = boto3.client(service_name='s3',
|
client = boto3.client(service_name='s3',
|
||||||
aws_access_key_id=get_iam_access_key(),
|
|
||||||
aws_secret_access_key=get_iam_secret_key(),
|
|
||||||
endpoint_url=config.default_endpoint,
|
endpoint_url=config.default_endpoint,
|
||||||
use_ssl=config.default_is_secure,
|
use_ssl=config.default_is_secure,
|
||||||
verify=config.default_ssl_verify,
|
verify=config.default_ssl_verify,
|
||||||
config=client_config)
|
**kwargs)
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
def get_iam_root_client(**kwargs):
|
||||||
|
kwargs.setdefault('service_name', 'iam')
|
||||||
|
kwargs.setdefault('aws_access_key_id', config.iam_root_access_key)
|
||||||
|
kwargs.setdefault('aws_secret_access_key', config.iam_root_secret_key)
|
||||||
|
|
||||||
|
return boto3.client(endpoint_url=config.default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
use_ssl=config.default_is_secure,
|
||||||
|
verify=config.default_ssl_verify,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
|
def get_iam_alt_root_client(**kwargs):
|
||||||
|
kwargs.setdefault('service_name', 'iam')
|
||||||
|
kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key)
|
||||||
|
kwargs.setdefault('aws_secret_access_key', config.iam_alt_root_secret_key)
|
||||||
|
|
||||||
|
return boto3.client(endpoint_url=config.default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
use_ssl=config.default_is_secure,
|
||||||
|
verify=config.default_ssl_verify,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
def get_alt_client(client_config=None):
|
def get_alt_client(client_config=None):
|
||||||
if client_config == None:
|
if client_config == None:
|
||||||
client_config = Config(signature_version='s3v4')
|
client_config = Config(signature_version='s3v4')
|
||||||
|
@ -699,12 +718,33 @@ def get_token():
|
||||||
def get_realm_name():
|
def get_realm_name():
|
||||||
return config.webidentity_realm
|
return config.webidentity_realm
|
||||||
|
|
||||||
|
def get_iam_name_prefix():
|
||||||
|
return config.iam_name_prefix
|
||||||
|
|
||||||
|
def make_iam_name(name):
|
||||||
|
return config.iam_name_prefix + name
|
||||||
|
|
||||||
|
def get_iam_path_prefix():
|
||||||
|
return config.iam_path_prefix
|
||||||
|
|
||||||
def get_iam_access_key():
|
def get_iam_access_key():
|
||||||
return config.iam_access_key
|
return config.iam_access_key
|
||||||
|
|
||||||
def get_iam_secret_key():
|
def get_iam_secret_key():
|
||||||
return config.iam_secret_key
|
return config.iam_secret_key
|
||||||
|
|
||||||
|
def get_iam_root_user_id():
|
||||||
|
return config.iam_root_user_id
|
||||||
|
|
||||||
|
def get_iam_root_email():
|
||||||
|
return config.iam_root_email
|
||||||
|
|
||||||
|
def get_iam_alt_root_user_id():
|
||||||
|
return config.iam_alt_root_user_id
|
||||||
|
|
||||||
|
def get_iam_alt_root_email():
|
||||||
|
return config.iam_alt_root_email
|
||||||
|
|
||||||
def get_user_token():
|
def get_user_token():
|
||||||
return config.webidentity_user_token
|
return config.webidentity_user_token
|
||||||
|
|
||||||
|
|
199
s3tests_boto3/functional/iam.py
Normal file
199
s3tests_boto3/functional/iam.py
Normal file
|
@ -0,0 +1,199 @@
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from . import (
|
||||||
|
configfile,
|
||||||
|
get_iam_root_client,
|
||||||
|
get_iam_root_user_id,
|
||||||
|
get_iam_root_email,
|
||||||
|
get_iam_alt_root_client,
|
||||||
|
get_iam_alt_root_user_id,
|
||||||
|
get_iam_alt_root_email,
|
||||||
|
get_iam_path_prefix,
|
||||||
|
)
|
||||||
|
|
||||||
|
def nuke_user_keys(client, name):
|
||||||
|
p = client.get_paginator('list_access_keys')
|
||||||
|
for response in p.paginate(UserName=name):
|
||||||
|
for key in response['AccessKeyMetadata']:
|
||||||
|
try:
|
||||||
|
client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_user_policies(client, name):
|
||||||
|
p = client.get_paginator('list_user_policies')
|
||||||
|
for response in p.paginate(UserName=name):
|
||||||
|
for policy in response['PolicyNames']:
|
||||||
|
try:
|
||||||
|
client.delete_user_policy(UserName=name, PolicyName=policy)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_attached_user_policies(client, name):
|
||||||
|
p = client.get_paginator('list_attached_user_policies')
|
||||||
|
for response in p.paginate(UserName=name):
|
||||||
|
for policy in response['AttachedPolicies']:
|
||||||
|
try:
|
||||||
|
client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_user(client, name):
|
||||||
|
# delete access keys, user policies, etc
|
||||||
|
try:
|
||||||
|
nuke_user_keys(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
nuke_user_policies(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
nuke_attached_user_policies(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
client.delete_user(UserName=name)
|
||||||
|
|
||||||
|
def nuke_users(client, **kwargs):
|
||||||
|
p = client.get_paginator('list_users')
|
||||||
|
for response in p.paginate(**kwargs):
|
||||||
|
for user in response['Users']:
|
||||||
|
try:
|
||||||
|
nuke_user(client, user['UserName'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_group_policies(client, name):
|
||||||
|
p = client.get_paginator('list_group_policies')
|
||||||
|
for response in p.paginate(GroupName=name):
|
||||||
|
for policy in response['PolicyNames']:
|
||||||
|
try:
|
||||||
|
client.delete_group_policy(GroupName=name, PolicyName=policy)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_attached_group_policies(client, name):
|
||||||
|
p = client.get_paginator('list_attached_group_policies')
|
||||||
|
for response in p.paginate(GroupName=name):
|
||||||
|
for policy in response['AttachedPolicies']:
|
||||||
|
try:
|
||||||
|
client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_group_users(client, name):
|
||||||
|
p = client.get_paginator('get_group')
|
||||||
|
for response in p.paginate(GroupName=name):
|
||||||
|
for user in response['Users']:
|
||||||
|
try:
|
||||||
|
client.remove_user_from_group(GroupName=name, UserName=user['UserName'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_group(client, name):
|
||||||
|
# delete group policies and remove all users
|
||||||
|
try:
|
||||||
|
nuke_group_policies(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
nuke_attached_group_policies(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
nuke_group_users(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
client.delete_group(GroupName=name)
|
||||||
|
|
||||||
|
def nuke_groups(client, **kwargs):
|
||||||
|
p = client.get_paginator('list_groups')
|
||||||
|
for response in p.paginate(**kwargs):
|
||||||
|
for user in response['Groups']:
|
||||||
|
try:
|
||||||
|
nuke_group(client, user['GroupName'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_role_policies(client, name):
|
||||||
|
p = client.get_paginator('list_role_policies')
|
||||||
|
for response in p.paginate(RoleName=name):
|
||||||
|
for policy in response['PolicyNames']:
|
||||||
|
try:
|
||||||
|
client.delete_role_policy(RoleName=name, PolicyName=policy)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_attached_role_policies(client, name):
|
||||||
|
p = client.get_paginator('list_attached_role_policies')
|
||||||
|
for response in p.paginate(RoleName=name):
|
||||||
|
for policy in response['AttachedPolicies']:
|
||||||
|
try:
|
||||||
|
client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_role(client, name):
|
||||||
|
# delete role policies, etc
|
||||||
|
try:
|
||||||
|
nuke_role_policies(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
nuke_attached_role_policies(client, name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
client.delete_role(RoleName=name)
|
||||||
|
|
||||||
|
def nuke_roles(client, **kwargs):
|
||||||
|
p = client.get_paginator('list_roles')
|
||||||
|
for response in p.paginate(**kwargs):
|
||||||
|
for role in response['Roles']:
|
||||||
|
try:
|
||||||
|
nuke_role(client, role['RoleName'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def nuke_oidc_providers(client, prefix):
|
||||||
|
result = client.list_open_id_connect_providers()
|
||||||
|
for provider in result['OpenIDConnectProviderList']:
|
||||||
|
arn = provider['Arn']
|
||||||
|
if f':oidc-provider{prefix}' in arn:
|
||||||
|
try:
|
||||||
|
client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# fixture for iam account root user
|
||||||
|
@pytest.fixture
|
||||||
|
def iam_root(configfile):
|
||||||
|
client = get_iam_root_client()
|
||||||
|
try:
|
||||||
|
arn = client.get_user()['User']['Arn']
|
||||||
|
if not arn.endswith(':root'):
|
||||||
|
pytest.skip('[iam root] user does not have :root arn')
|
||||||
|
except ClientError as e:
|
||||||
|
pytest.skip('[iam root] user does not belong to an account')
|
||||||
|
|
||||||
|
yield client
|
||||||
|
nuke_users(client, PathPrefix=get_iam_path_prefix())
|
||||||
|
nuke_groups(client, PathPrefix=get_iam_path_prefix())
|
||||||
|
nuke_roles(client, PathPrefix=get_iam_path_prefix())
|
||||||
|
nuke_oidc_providers(client, get_iam_path_prefix())
|
||||||
|
|
||||||
|
# fixture for iam alt account root user
|
||||||
|
@pytest.fixture
|
||||||
|
def iam_alt_root(configfile):
|
||||||
|
client = get_iam_alt_root_client()
|
||||||
|
try:
|
||||||
|
arn = client.get_user()['User']['Arn']
|
||||||
|
if not arn.endswith(':root'):
|
||||||
|
pytest.skip('[iam alt root] user does not have :root arn')
|
||||||
|
except ClientError as e:
|
||||||
|
pytest.skip('[iam alt root] user does not belong to an account')
|
||||||
|
|
||||||
|
yield client
|
||||||
|
nuke_users(client, PathPrefix=get_iam_path_prefix())
|
||||||
|
nuke_roles(client, PathPrefix=get_iam_path_prefix())
|
File diff suppressed because it is too large
Load diff
|
@ -4674,40 +4674,6 @@ def test_bucket_acl_grant_nonexist_user():
|
||||||
assert status == 400
|
assert status == 400
|
||||||
assert error_code == 'InvalidArgument'
|
assert error_code == 'InvalidArgument'
|
||||||
|
|
||||||
def test_bucket_acl_no_grants():
|
|
||||||
bucket_name = get_new_bucket()
|
|
||||||
client = get_client()
|
|
||||||
|
|
||||||
client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
|
|
||||||
response = client.get_bucket_acl(Bucket=bucket_name)
|
|
||||||
old_grants = response['Grants']
|
|
||||||
policy = {}
|
|
||||||
policy['Owner'] = response['Owner']
|
|
||||||
# clear grants
|
|
||||||
policy['Grants'] = []
|
|
||||||
|
|
||||||
# remove read/write permission
|
|
||||||
response = client.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy)
|
|
||||||
|
|
||||||
# can read
|
|
||||||
client.get_object(Bucket=bucket_name, Key='foo')
|
|
||||||
|
|
||||||
# can't write
|
|
||||||
check_access_denied(client.put_object, Bucket=bucket_name, Key='baz', Body='a')
|
|
||||||
|
|
||||||
#TODO fix this test once a fix is in for same issues in
|
|
||||||
# test_access_bucket_private_object_private
|
|
||||||
client2 = get_client()
|
|
||||||
# owner can read acl
|
|
||||||
client2.get_bucket_acl(Bucket=bucket_name)
|
|
||||||
|
|
||||||
# owner can write acl
|
|
||||||
client2.put_bucket_acl(Bucket=bucket_name, ACL='private')
|
|
||||||
|
|
||||||
# set policy back to original so that bucket can be cleaned up
|
|
||||||
policy['Grants'] = old_grants
|
|
||||||
client2.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy)
|
|
||||||
|
|
||||||
def _get_acl_header(user_id=None, perms=None):
|
def _get_acl_header(user_id=None, perms=None):
|
||||||
all_headers = ["read", "write", "read-acp", "write-acp", "full-control"]
|
all_headers = ["read", "write", "read-acp", "write-acp", "full-control"]
|
||||||
headers = []
|
headers = []
|
||||||
|
|
159
s3tests_boto3/functional/test_sns.py
Normal file
159
s3tests_boto3/functional/test_sns.py
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
from . import (
|
||||||
|
configfile,
|
||||||
|
get_iam_root_client,
|
||||||
|
get_iam_alt_root_client,
|
||||||
|
get_new_bucket_name,
|
||||||
|
get_prefix,
|
||||||
|
nuke_prefixed_buckets,
|
||||||
|
)
|
||||||
|
from .iam import iam_root, iam_alt_root
|
||||||
|
from .utils import assert_raises, _get_status_and_error_code
|
||||||
|
|
||||||
|
def get_new_topic_name():
|
||||||
|
return get_new_bucket_name()
|
||||||
|
|
||||||
|
def nuke_topics(client, prefix):
|
||||||
|
p = client.get_paginator('list_topics')
|
||||||
|
for response in p.paginate():
|
||||||
|
for topic in response['Topics']:
|
||||||
|
arn = topic['TopicArn']
|
||||||
|
if prefix not in arn:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
client.delete_topic(TopicArn=arn)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sns(iam_root):
|
||||||
|
client = get_iam_root_client(service_name='sns')
|
||||||
|
yield client
|
||||||
|
nuke_topics(client, get_prefix())
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sns_alt(iam_alt_root):
|
||||||
|
client = get_iam_alt_root_client(service_name='sns')
|
||||||
|
yield client
|
||||||
|
nuke_topics(client, get_prefix())
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def s3(iam_root):
|
||||||
|
client = get_iam_root_client(service_name='s3')
|
||||||
|
yield client
|
||||||
|
nuke_prefixed_buckets(get_prefix(), client)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def s3_alt(iam_alt_root):
|
||||||
|
client = get_iam_alt_root_client(service_name='s3')
|
||||||
|
yield client
|
||||||
|
nuke_prefixed_buckets(get_prefix(), client)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.sns
|
||||||
|
def test_account_topic(sns):
|
||||||
|
name = get_new_topic_name()
|
||||||
|
|
||||||
|
response = sns.create_topic(Name=name)
|
||||||
|
arn = response['TopicArn']
|
||||||
|
assert arn.startswith('arn:aws:sns:')
|
||||||
|
assert arn.endswith(f':{name}')
|
||||||
|
|
||||||
|
response = sns.list_topics()
|
||||||
|
assert arn in [p['TopicArn'] for p in response['Topics']]
|
||||||
|
|
||||||
|
sns.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='')
|
||||||
|
|
||||||
|
response = sns.get_topic_attributes(TopicArn=arn)
|
||||||
|
assert 'Attributes' in response
|
||||||
|
|
||||||
|
sns.delete_topic(TopicArn=arn)
|
||||||
|
|
||||||
|
response = sns.list_topics()
|
||||||
|
assert arn not in [p['TopicArn'] for p in response['Topics']]
|
||||||
|
|
||||||
|
with pytest.raises(sns.exceptions.NotFoundException):
|
||||||
|
sns.get_topic_attributes(TopicArn=arn)
|
||||||
|
sns.delete_topic(TopicArn=arn)
|
||||||
|
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.sns
|
||||||
|
def test_cross_account_topic(sns, sns_alt):
|
||||||
|
name = get_new_topic_name()
|
||||||
|
arn = sns.create_topic(Name=name)['TopicArn']
|
||||||
|
|
||||||
|
# not visible to any alt user apis
|
||||||
|
with pytest.raises(sns.exceptions.NotFoundException):
|
||||||
|
sns_alt.get_topic_attributes(TopicArn=arn)
|
||||||
|
with pytest.raises(sns.exceptions.NotFoundException):
|
||||||
|
sns_alt.set_topic_attributes(TopicArn=arn, AttributeName='Policy', AttributeValue='')
|
||||||
|
|
||||||
|
# delete returns success
|
||||||
|
sns_alt.delete_topic(TopicArn=arn)
|
||||||
|
|
||||||
|
response = sns_alt.list_topics()
|
||||||
|
assert arn not in [p['TopicArn'] for p in response['Topics']]
|
||||||
|
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.sns
|
||||||
|
def test_account_topic_publish(sns, s3):
|
||||||
|
name = get_new_topic_name()
|
||||||
|
|
||||||
|
response = sns.create_topic(Name=name)
|
||||||
|
topic_arn = response['TopicArn']
|
||||||
|
|
||||||
|
bucket = get_new_bucket_name()
|
||||||
|
s3.create_bucket(Bucket=bucket)
|
||||||
|
|
||||||
|
config = {'TopicConfigurations': [{
|
||||||
|
'Id': 'id',
|
||||||
|
'TopicArn': topic_arn,
|
||||||
|
'Events': [ 's3:ObjectCreated:*' ],
|
||||||
|
}]}
|
||||||
|
s3.put_bucket_notification_configuration(
|
||||||
|
Bucket=bucket, NotificationConfiguration=config)
|
||||||
|
|
||||||
|
@pytest.mark.iam_account
|
||||||
|
@pytest.mark.iam_cross_account
|
||||||
|
@pytest.mark.sns
|
||||||
|
def test_cross_account_topic_publish(sns, s3_alt, iam_alt_root):
|
||||||
|
name = get_new_topic_name()
|
||||||
|
|
||||||
|
response = sns.create_topic(Name=name)
|
||||||
|
topic_arn = response['TopicArn']
|
||||||
|
|
||||||
|
bucket = get_new_bucket_name()
|
||||||
|
s3_alt.create_bucket(Bucket=bucket)
|
||||||
|
|
||||||
|
config = {'TopicConfigurations': [{
|
||||||
|
'Id': 'id',
|
||||||
|
'TopicArn': topic_arn,
|
||||||
|
'Events': [ 's3:ObjectCreated:*' ],
|
||||||
|
}]}
|
||||||
|
|
||||||
|
# expect AccessDenies because no resource policy allows cross-account access
|
||||||
|
e = assert_raises(ClientError, s3_alt.put_bucket_notification_configuration,
|
||||||
|
Bucket=bucket, NotificationConfiguration=config)
|
||||||
|
status, error_code = _get_status_and_error_code(e.response)
|
||||||
|
assert status == 403
|
||||||
|
assert error_code == 'AccessDenied'
|
||||||
|
|
||||||
|
# add topic policy to allow the alt user
|
||||||
|
alt_principal = iam_alt_root.get_user()['User']['Arn']
|
||||||
|
policy = json.dumps({
|
||||||
|
'Version': '2012-10-17',
|
||||||
|
'Statement': [{
|
||||||
|
'Effect': 'Allow',
|
||||||
|
'Principal': {'AWS': alt_principal},
|
||||||
|
'Action': 'sns:Publish',
|
||||||
|
'Resource': topic_arn
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
sns.set_topic_attributes(TopicArn=topic_arn, AttributeName='Policy',
|
||||||
|
AttributeValue=policy)
|
||||||
|
|
||||||
|
s3_alt.put_bucket_notification_configuration(
|
||||||
|
Bucket=bucket, NotificationConfiguration=config)
|
Loading…
Reference in a new issue