mirror of
https://github.com/ceph/s3-tests.git
synced 2024-11-25 03:47:22 +00:00
STS Tests Files and modification in __init__.py
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
This commit is contained in:
parent
982d15c30e
commit
8dbe896f89
4 changed files with 310 additions and 0 deletions
14
README.rst
14
README.rst
|
@ -54,3 +54,17 @@ You can run only the boto3 tests with::
|
||||||
|
|
||||||
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'not fails_on_rgw' s3tests_boto3.functional
|
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'not fails_on_rgw' s3tests_boto3.functional
|
||||||
|
|
||||||
|
========================
|
||||||
|
STS compatibility tests
|
||||||
|
========================
|
||||||
|
|
||||||
|
This section contains some basic tests for the AssumeRole and GetSessionToken API's. The test file is located under ``s3tests_boto3/functional``.
|
||||||
|
|
||||||
|
You can run only the sts tests with::
|
||||||
|
|
||||||
|
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
|
||||||
|
|
||||||
|
You can filter tests based on the attributes. There is a attribute named ``sts_test`` to run specifically the sts tests as below::
|
||||||
|
|
||||||
|
S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'sts_test' s3tests_boto3.functional.test_sts
|
||||||
|
|
||||||
|
|
|
@ -68,3 +68,17 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
|
||||||
|
|
||||||
# tenant email set in vstart.sh
|
# tenant email set in vstart.sh
|
||||||
email = tenanteduser@example.com
|
email = tenanteduser@example.com
|
||||||
|
|
||||||
|
[iam]
|
||||||
|
#used for iam operations in sts-tests
|
||||||
|
#user_id from vstart.sh
|
||||||
|
user_id = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
|
||||||
|
|
||||||
|
#access_key from vstart.sh
|
||||||
|
access_key = ABCDEFGHIJKLMNOPQRST
|
||||||
|
|
||||||
|
#secret_key vstart.sh
|
||||||
|
secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
|
||||||
|
|
||||||
|
#display_name from vstart.sh
|
||||||
|
display_name = youruseridhere
|
||||||
|
|
|
@ -158,6 +158,8 @@ def setup():
|
||||||
raise RuntimeError('Your config file is missing the "s3 alt" section!')
|
raise RuntimeError('Your config file is missing the "s3 alt" section!')
|
||||||
if not cfg.has_section("s3 tenant"):
|
if not cfg.has_section("s3 tenant"):
|
||||||
raise RuntimeError('Your config file is missing the "s3 tenant" section!')
|
raise RuntimeError('Your config file is missing the "s3 tenant" section!')
|
||||||
|
if not cfg.has_section("iam"):
|
||||||
|
raise RuntimeError('Your config file is missing the "iam" section!')
|
||||||
|
|
||||||
global prefix
|
global prefix
|
||||||
|
|
||||||
|
@ -205,6 +207,12 @@ def setup():
|
||||||
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
|
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
|
||||||
config.tenant_email = cfg.get('s3 tenant',"email")
|
config.tenant_email = cfg.get('s3 tenant',"email")
|
||||||
|
|
||||||
|
config.iam_access_key = cfg.get('iam',"access_key")
|
||||||
|
config.iam_secret_key = cfg.get('iam',"secret_key")
|
||||||
|
config.iam_display_name = cfg.get('iam',"display_name")
|
||||||
|
config.iam_user_id = cfg.get('iam',"user_id")
|
||||||
|
#config.iam_email = cfg.get('iam',"email")
|
||||||
|
|
||||||
# vars from the fixtures section
|
# vars from the fixtures section
|
||||||
try:
|
try:
|
||||||
template = cfg.get('fixtures', "bucket prefix")
|
template = cfg.get('fixtures', "bucket prefix")
|
||||||
|
@ -246,6 +254,32 @@ def get_v2_client():
|
||||||
config=Config(signature_version='s3'))
|
config=Config(signature_version='s3'))
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
def get_sts_client(client_config=None):
|
||||||
|
if client_config == None:
|
||||||
|
client_config = Config(signature_version='s3v4')
|
||||||
|
|
||||||
|
client = boto3.client(service_name='sts',
|
||||||
|
aws_access_key_id=config.alt_access_key,
|
||||||
|
aws_secret_access_key=config.alt_secret_key,
|
||||||
|
endpoint_url=config.default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
use_ssl=config.default_is_secure,
|
||||||
|
config=client_config)
|
||||||
|
return client
|
||||||
|
|
||||||
|
def get_iam_client(client_config=None):
|
||||||
|
if client_config == None:
|
||||||
|
client_config = Config(signature_version='s3v4')
|
||||||
|
|
||||||
|
client = boto3.client(service_name='iam',
|
||||||
|
aws_access_key_id=config.iam_access_key,
|
||||||
|
aws_secret_access_key=config.iam_secret_key,
|
||||||
|
endpoint_url=config.default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
use_ssl=config.default_is_secure,
|
||||||
|
config=client_config)
|
||||||
|
return client
|
||||||
|
|
||||||
def get_alt_client(client_config=None):
|
def get_alt_client(client_config=None):
|
||||||
if client_config == None:
|
if client_config == None:
|
||||||
client_config = Config(signature_version='s3v4')
|
client_config = Config(signature_version='s3v4')
|
||||||
|
@ -359,6 +393,21 @@ def get_new_bucket(client=None, name=None):
|
||||||
client.create_bucket(Bucket=name)
|
client.create_bucket(Bucket=name)
|
||||||
return name
|
return name
|
||||||
|
|
||||||
|
def get_parameter_name():
|
||||||
|
parameter_name=""
|
||||||
|
rand = ''.join(
|
||||||
|
random.choice(string.ascii_lowercase + string.digits)
|
||||||
|
for c in range(255)
|
||||||
|
)
|
||||||
|
while rand:
|
||||||
|
parameter_name = '{random}'.format(random=rand)
|
||||||
|
if len(parameter_name) <= 10:
|
||||||
|
return parameter_name
|
||||||
|
rand = rand[:-1]
|
||||||
|
return parameter_name
|
||||||
|
|
||||||
|
def get_sts_user_id():
|
||||||
|
return config.alt_user_id
|
||||||
|
|
||||||
def get_config_is_secure():
|
def get_config_is_secure():
|
||||||
return config.default_is_secure
|
return config.default_is_secure
|
||||||
|
|
233
s3tests_boto3/functional/test_sts.py
Normal file
233
s3tests_boto3/functional/test_sts.py
Normal file
|
@ -0,0 +1,233 @@
|
||||||
|
import boto3
|
||||||
|
import botocore.session
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
from botocore.exceptions import ParamValidationError
|
||||||
|
from nose.tools import eq_ as eq
|
||||||
|
from nose.plugins.attrib import attr
|
||||||
|
from nose.plugins.skip import SkipTest
|
||||||
|
import isodate
|
||||||
|
import email.utils
|
||||||
|
import datetime
|
||||||
|
import threading
|
||||||
|
import re
|
||||||
|
import pytz
|
||||||
|
from collections import OrderedDict
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
import hmac
|
||||||
|
import hashlib
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
import time
|
||||||
|
import operator
|
||||||
|
import nose
|
||||||
|
import os
|
||||||
|
import string
|
||||||
|
import random
|
||||||
|
import socket
|
||||||
|
import ssl
|
||||||
|
import logging
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
from email.header import decode_header
|
||||||
|
|
||||||
|
from . import(
|
||||||
|
get_iam_client,
|
||||||
|
get_sts_client,
|
||||||
|
get_client,
|
||||||
|
get_alt_user_id,
|
||||||
|
get_config_endpoint,
|
||||||
|
get_new_bucket_name,
|
||||||
|
get_parameter_name,
|
||||||
|
get_main_aws_access_key,
|
||||||
|
get_main_aws_secret_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
|
||||||
|
role_err=None
|
||||||
|
if rolename is None:
|
||||||
|
rolename=get_parameter_name()
|
||||||
|
try:
|
||||||
|
role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,)
|
||||||
|
except ClientError as e:
|
||||||
|
role_err = e.response['Code']
|
||||||
|
return (role_err,role_response,rolename)
|
||||||
|
|
||||||
|
def put_role_policy(iam_client,rolename,policyname,role_policy):
|
||||||
|
role_err=None
|
||||||
|
if policyname is None:
|
||||||
|
policyname=get_parameter_name()
|
||||||
|
try:
|
||||||
|
role_response = iam_client.put_role_policy(RoleName=rolename,PolicyName=policyname,PolicyDocument=role_policy)
|
||||||
|
except ClientError as e:
|
||||||
|
role_err = e.response['Code']
|
||||||
|
return (role_err,role_response)
|
||||||
|
|
||||||
|
def put_user_policy(iam_client,username,policyname,policy_document):
|
||||||
|
role_err=None
|
||||||
|
if policyname is None:
|
||||||
|
policyname=get_parameter_name()
|
||||||
|
try:
|
||||||
|
role_response = iam_client.put_user_policy(UserName=username,PolicyName=policyname,PolicyDocument=policy_document)
|
||||||
|
except ClientError as e:
|
||||||
|
role_err = e.response['Code']
|
||||||
|
return (role_err,role_response)
|
||||||
|
|
||||||
|
@attr(resource='get session token')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='s3 ops only accessible by temporary credentials')
|
||||||
|
@attr('sts_test')
|
||||||
|
def test_get_session_token():
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
sts_user_id=get_alt_user_id()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
|
||||||
|
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
response=sts_client.get_session_token(DurationSeconds=43200)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
s3_client=boto3.client('s3',
|
||||||
|
aws_access_key_id = response['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = response['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = response['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
finish=s3_client.delete_bucket(Bucket=bucket_name)
|
||||||
|
|
||||||
|
@attr(resource='get session token')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='s3 ops denied by permanent credentials')
|
||||||
|
@attr('sts_test')
|
||||||
|
def test_get_session_token_permanent_creds_denied():
|
||||||
|
s3bucket_error=None
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
sts_user_id=get_alt_user_id()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
s3_main_access_key=get_main_aws_access_key()
|
||||||
|
s3_main_secret_key=get_main_aws_secret_key()
|
||||||
|
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
|
||||||
|
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
response=sts_client.get_session_token(DurationSeconds=43200)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
s3_client=boto3.client('s3',
|
||||||
|
aws_access_key_id = s3_main_access_key,
|
||||||
|
aws_secret_access_key = s3_main_secret_key,
|
||||||
|
aws_session_token = response['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
try:
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||||
|
except ClientError as e:
|
||||||
|
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3bucket_error,'AccessDenied')
|
||||||
|
|
||||||
|
@attr(resource='assume role')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='role policy allows all s3 ops')
|
||||||
|
@attr('sts_test')
|
||||||
|
def test_assume_role_allow():
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
sts_user_id=get_alt_user_id()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||||
|
eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
bkt = s3_client.delete_bucket(Bucket=bucket_name)
|
||||||
|
eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
|
||||||
|
|
||||||
|
@attr(resource='assume role')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='role policy denies all s3 ops')
|
||||||
|
@attr('sts_test')
|
||||||
|
def test_assume_role_deny():
|
||||||
|
s3bucket_error=None
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
sts_user_id=get_alt_user_id()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
try:
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||||
|
except ClientError as e:
|
||||||
|
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3bucket_error,'AccessDenied')
|
||||||
|
|
||||||
|
@attr(resource='assume role')
|
||||||
|
@attr(method='get')
|
||||||
|
@attr(operation='check')
|
||||||
|
@attr(assertion='creds expire so all s3 ops fails')
|
||||||
|
@attr('sts_test')
|
||||||
|
def test_assume_role_creds_expiry():
|
||||||
|
iam_client=get_iam_client()
|
||||||
|
sts_client=get_sts_client()
|
||||||
|
sts_user_id=get_alt_user_id()
|
||||||
|
default_endpoint=get_config_endpoint()
|
||||||
|
role_session_name=get_parameter_name()
|
||||||
|
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
|
||||||
|
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
|
||||||
|
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
|
||||||
|
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
|
||||||
|
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
|
||||||
|
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
|
||||||
|
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
|
||||||
|
time.sleep(900)
|
||||||
|
s3_client = boto3.client('s3',
|
||||||
|
aws_access_key_id = resp['Credentials']['AccessKeyId'],
|
||||||
|
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
|
||||||
|
aws_session_token = resp['Credentials']['SessionToken'],
|
||||||
|
endpoint_url=default_endpoint,
|
||||||
|
region_name='',
|
||||||
|
)
|
||||||
|
bucket_name = get_new_bucket_name()
|
||||||
|
try:
|
||||||
|
s3bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||||
|
except ClientError as e:
|
||||||
|
s3bucket_error = e.response.get("Error", {}).get("Code")
|
||||||
|
eq(s3bucket_error,'AccessDenied')
|
Loading…
Reference in a new issue