mirror of
https://github.com/ceph/s3-tests.git
synced 2025-01-23 05:20:17 +00:00
5219b86db9
Fixes: https://tracker.ceph.com/issues/58762 Signed-off-by: Casey Bodley <cbodley@redhat.com>
864 lines
34 KiB
Python
864 lines
34 KiB
Python
import json
|
|
|
|
from botocore.exceptions import ClientError
|
|
import pytest
|
|
|
|
from s3tests_boto3.functional.utils import assert_raises
|
|
from s3tests_boto3.functional.test_s3 import _multipart_upload
|
|
from . import (
|
|
configfile,
|
|
setup_teardown,
|
|
get_alt_client,
|
|
get_iam_client,
|
|
get_new_bucket,
|
|
get_iam_s3client,
|
|
get_alt_iam_client,
|
|
get_alt_user_id,
|
|
)
|
|
from .utils import _get_status, _get_status_and_error_code
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_user_policy():
    """Put an allow-all inline policy on the alt user, then delete it.

    Both IAM calls are expected to answer with HTTP 200.
    """
    iam = get_iam_client()
    allow_all = {
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    }

    put_result = iam.put_user_policy(
        PolicyDocument=json.dumps(allow_all),
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    delete_result = iam.delete_user_policy(
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert delete_result['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_user_policy_invalid_user():
    """Expect HTTP 404 when putting a policy for "some-non-existing-user-id"."""
    iam = get_iam_client()
    allow_all = {
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    }

    err = assert_raises(
        ClientError,
        iam.put_user_policy,
        PolicyDocument=json.dumps(allow_all),
        PolicyName='AllAccessPolicy',
        UserName="some-non-existing-user-id",
    )
    assert _get_status(err.response) == 404
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_user_policy_parameter_limit():
    """Expect HTTP 400 for an oversized policy put.

    The document repeats the same statement 1000 times and the policy name
    is 'AllAccessPolicy' repeated ten times.
    """
    iam = get_iam_client()
    one_statement = {
        "Effect": "Allow",
        "Action": "*",
        "Resource": "*",
    }
    oversized = {
        "Version": "2012-10-17",
        "Statement": [one_statement] * 1000,
    }

    err = assert_raises(
        ClientError,
        iam.put_user_policy,
        PolicyDocument=json.dumps(oversized),
        PolicyName='AllAccessPolicy' * 10,
        UserName=get_alt_user_id(),
    )
    assert _get_status(err.response) == 400
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_rgw
def test_put_user_policy_invalid_element():
    """Submit four malformed policy documents and expect HTTP 400 for each.

    The documents are tried in this order: a Version other than 2012-10-17,
    no Statement at all, two statements sharing one Sid, and a statement that
    carries a Principal element.
    """
    iam = get_iam_client()

    # With Version other than 2012-10-17
    wrong_version = {
        "Version": "2010-10-17",
        "Statement": [{"Effect": "Allow", "Action": "*", "Resource": "*"}],
    }
    # With no Statement
    missing_statement = {"Version": "2012-10-17"}
    # with same Sid for 2 statements
    repeated_sid = {
        "Version": "2012-10-17",
        "Statement": [
            {"Sid": "98AB54CF", "Effect": "Allow", "Action": "*", "Resource": "*"},
            {"Sid": "98AB54CF", "Effect": "Allow", "Action": "*", "Resource": "*"},
        ],
    }
    # with Principal
    has_principal = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
            "Principal": "arn:aws:iam:::username",
        }],
    }

    for malformed in (wrong_version, missing_statement, repeated_sid, has_principal):
        err = assert_raises(
            ClientError,
            iam.put_user_policy,
            PolicyDocument=json.dumps(malformed),
            PolicyName='AllAccessPolicy',
            UserName=get_alt_user_id(),
        )
        assert _get_status(err.response) == 400
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_put_existing_user_policy():
    """Put the same named policy twice for the alt user, then delete it.

    Only the first put has its HTTP status asserted (200); the second put and
    the delete merely have to complete without raising.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    first_put = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert first_put['ResponseMetadata']['HTTPStatusCode'] == 200

    # Second put under the same name; its response is not inspected.
    iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    iam.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_list_user_policy():
    """Put a policy on the alt user and check that listing policies returns 200."""
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    listing = iam.list_user_policies(UserName=get_alt_user_id())
    assert listing['ResponseMetadata']['HTTPStatusCode'] == 200

    # Clean up; the delete response is not inspected.
    iam.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_list_user_policy_invalid_user():
    """Expect HTTP 404 when listing policies for "some-non-existing-user-id"."""
    iam = get_iam_client()
    err = assert_raises(
        ClientError,
        iam.list_user_policies,
        UserName="some-non-existing-user-id",
    )
    assert _get_status(err.response) == 404
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_get_user_policy():
    """Put, fetch and delete one policy on the alt user; each call must return 200."""
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    fetched = iam.get_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
    assert fetched['ResponseMetadata']['HTTPStatusCode'] == 200

    removed = iam.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_get_user_policy_invalid_user():
    """Expect HTTP 404 when fetching a policy for "some-non-existing-user-id".

    A policy of the same name is first put on the alt user and removed again
    at the end.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    err = assert_raises(
        ClientError,
        iam.get_user_policy,
        PolicyName='AllAccessPolicy',
        UserName="some-non-existing-user-id",
    )
    assert _get_status(err.response) == 404

    iam.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_rgw
def test_get_user_policy_invalid_policy_name():
    """Expect HTTP 404 when fetching 'non-existing-policy-name' for the alt user.

    A differently named policy ('AllAccessPolicy') is put beforehand and
    deleted afterwards; neither of those responses is inspected.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )

    err = assert_raises(
        ClientError,
        iam.get_user_policy,
        PolicyName='non-existing-policy-name',
        UserName=get_alt_user_id(),
    )
    assert _get_status(err.response) == 404

    iam.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_rgw
def test_get_deleted_user_policy():
    """Put and delete a policy, then expect HTTP 404 when fetching it."""
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    # Neither the put nor the delete response is inspected.
    iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    iam.delete_user_policy(PolicyName='AllAccessPolicy', UserName=get_alt_user_id())

    err = assert_raises(
        ClientError,
        iam.get_user_policy,
        PolicyName='AllAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert _get_status(err.response) == 404
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_get_user_policy_from_multiple_policies():
    """Put two policies on the alt user and fetch the second one by name.

    Every put, get and delete is expected to answer with HTTP 200.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })
    policy_names = ('AllowAccessPolicy1', 'AllowAccessPolicy2')

    for name in policy_names:
        put_result = iam.put_user_policy(
            PolicyDocument=document,
            PolicyName=name,
            UserName=get_alt_user_id(),
        )
        assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    fetched = iam.get_user_policy(
        PolicyName='AllowAccessPolicy2',
        UserName=get_alt_user_id(),
    )
    assert fetched['ResponseMetadata']['HTTPStatusCode'] == 200

    for name in policy_names:
        removed = iam.delete_user_policy(PolicyName=name, UserName=get_alt_user_id())
        assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy():
    """Put 'AllowAccessPolicy' on the alt user and delete it; both must return 200."""
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    removed = iam.delete_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_user():
    """Expect HTTP 404 when deleting a policy for "some-non-existing-user-id".

    'AllowAccessPolicy' is put on the alt user first and removed at the end;
    both of those calls must return 200.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    # NOTE(review): this uses 'AllAccessPolicy', not the 'AllowAccessPolicy'
    # put above — confirm whether the differing name is intentional.
    err = assert_raises(
        ClientError,
        iam.delete_user_policy,
        PolicyName='AllAccessPolicy',
        UserName="some-non-existing-user-id",
    )
    assert _get_status(err.response) == 404

    removed = iam.delete_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy_invalid_policy_name():
    """Expect HTTP 404 when deleting 'non-existing-policy-name' from the alt user.

    'AllowAccessPolicy' is put first and removed at the end; both of those
    calls must return 200.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    err = assert_raises(
        ClientError,
        iam.delete_user_policy,
        PolicyName='non-existing-policy-name',
        UserName=get_alt_user_id(),
    )
    assert _get_status(err.response) == 404

    removed = iam.delete_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_delete_user_policy_from_multiple_policies():
    """Delete two of three policies and check that the third can still be fetched.

    Every put, delete and get is expected to answer with HTTP 200; the third
    policy is deleted last.
    """
    iam = get_iam_client()
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*",
        },
    })

    for name in ('AllowAccessPolicy1', 'AllowAccessPolicy2', 'AllowAccessPolicy3'):
        put_result = iam.put_user_policy(
            PolicyDocument=document,
            PolicyName=name,
            UserName=get_alt_user_id(),
        )
        assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    for name in ('AllowAccessPolicy1', 'AllowAccessPolicy2'):
        removed = iam.delete_user_policy(PolicyName=name, UserName=get_alt_user_id())
        assert removed['ResponseMetadata']['HTTPStatusCode'] == 200

    survivor = iam.get_user_policy(
        PolicyName='AllowAccessPolicy3',
        UserName=get_alt_user_id(),
    )
    assert survivor['ResponseMetadata']['HTTPStatusCode'] == 200

    removed = iam.delete_user_policy(
        PolicyName='AllowAccessPolicy3',
        UserName=get_alt_user_id(),
    )
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_bucket_actions_in_user_policy():
    """Check that an Allow policy lets the alt client list and delete a bucket.

    The bucket and its single object 'foo' are created with the IAM S3
    client. The alt user is then granted s3:ListBucket and s3:DeleteBucket on
    that bucket, after which the alt client must be able to list the object
    and delete the (emptied) bucket. The policy is removed at the end.
    """
    client = get_iam_client()
    s3_client_alt = get_alt_client()

    s3_client_iam = get_iam_s3client()
    bucket = get_new_bucket(client=s3_client_iam)
    s3_client_iam.put_object(Bucket=bucket, Key='foo', Body='bar')

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": ["s3:ListBucket", "s3:DeleteBucket"],
             "Resource": f"arn:aws:s3:::{bucket}"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    response = s3_client_alt.list_objects(Bucket=bucket)
    # A listing with no objects has no 'Contents' key; default to an empty
    # list so that case fails with the assertion below, not a KeyError.
    listed_keys = [entry['Key'] for entry in response.get('Contents', [])]
    if 'foo' not in listed_keys:
        raise AssertionError("Object is not listed")

    # The bucket has to be empty before the alt client may delete it.
    response = s3_client_iam.delete_object(Bucket=bucket, Key='foo')
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    response = s3_client_alt.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    response = s3_client_iam.list_buckets()
    # Use a distinct loop variable: reusing `bucket` would overwrite the
    # bucket name and turn this check into a comparison that is never true.
    for listed_bucket in response['Buckets']:
        if listed_bucket['Name'] == bucket:
            raise AssertionError("deleted bucket is getting listed")

    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_deny_bucket_actions_in_user_policy():
    """Check that a Deny policy blocks bucket listing and deletion for the alt user.

    With s3:ListAllMyBuckets and s3:DeleteBucket denied on arn:aws:s3:::*,
    both list_buckets and delete_bucket must fail with 403 AccessDenied.
    After the policy is deleted, the alt client deletes the bucket it created.
    """
    client = get_iam_client()
    s3_client = get_alt_client()
    bucket = get_new_bucket(client=s3_client)

    policy_document_deny = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Deny",
             "Action": ["s3:ListAllMyBuckets", "s3:DeleteBucket"],
             "Resource": "arn:aws:s3:::*"}}
    )

    response = client.put_user_policy(PolicyDocument=policy_document_deny,
                                      PolicyName='DenyAccessPolicy',
                                      UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    # list_buckets is called without arguments: ListBuckets has no Bucket
    # parameter, and passing one can raise a client-side validation error
    # (not a ClientError) before any request is sent.
    e = assert_raises(ClientError, s3_client.list_buckets)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    e = assert_raises(ClientError, s3_client.delete_bucket, Bucket=bucket)
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 403
    assert error_code == 'AccessDenied'

    # Remove the deny policy first so the cleanup delete below is permitted.
    response = client.delete_user_policy(PolicyName='DenyAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    response = s3_client.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_object_actions_in_user_policy():
    """Check that an Allow policy lets the alt client put, get and delete objects.

    The bucket is created with the IAM S3 client; the alt user is granted
    s3:PutObject, s3:GetObject and s3:DeleteObject on its objects. After the
    alt client deletes the object, the IAM S3 client must see 404 NoSuchKey.
    """
    client = get_iam_client()
    s3_client_alt = get_alt_client()
    s3_client_iam = get_iam_s3client()
    bucket = get_new_bucket(client=s3_client_iam)

    policy_document_allow = json.dumps(
        {"Version": "2012-10-17",
         "Statement": {
             "Effect": "Allow",
             "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
             "Resource": f"arn:aws:s3:::{bucket}/*"}}
    )
    response = client.put_user_policy(PolicyDocument=policy_document_allow,
                                      PolicyName='AllowAccessPolicy', UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    s3_client_alt.put_object(Bucket=bucket, Key='foo', Body='bar')
    response = s3_client_alt.get_object(Bucket=bucket, Key='foo')
    body = response['Body'].read()
    # Decode a bytes payload so it can be compared with the str that was put.
    if isinstance(body, bytes):
        body = body.decode()
    assert body == "bar"

    response = s3_client_alt.delete_object(Bucket=bucket, Key='foo')
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    # The object was deleted by the alt client, so it must be gone for the
    # IAM S3 client as well.
    e = assert_raises(ClientError, s3_client_iam.get_object, Bucket=bucket, Key='foo')
    status, error_code = _get_status_and_error_code(e.response)
    assert status == 404
    assert error_code == 'NoSuchKey'

    response = s3_client_iam.delete_bucket(Bucket=bucket)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 204

    response = client.delete_user_policy(PolicyName='AllowAccessPolicy',
                                         UserName=get_alt_user_id())
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_deny_object_actions_in_user_policy():
    """Check that a Deny policy blocks put/get/delete of objects for the alt user.

    The alt client creates the bucket and the object 'foo' before the policy
    is put. Each of the three object calls must then fail with 403
    AccessDenied. The policy is deleted at the end.
    """
    iam = get_iam_client()
    alt_s3 = get_alt_client()
    bucket = get_new_bucket(client=alt_s3)
    alt_s3.put_object(Bucket=bucket, Key='foo', Body='bar')

    deny_object_ops = {
        "Effect": "Deny",
        "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
        "Resource": f"arn:aws:s3:::{bucket}/*",
    }
    allow_bucket_delete = {
        "Effect": "Allow",
        "Action": ["s3:DeleteBucket"],
        "Resource": f"arn:aws:s3:::{bucket}",
    }
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [deny_object_ops, allow_bucket_delete],
    })
    # The put response is not inspected.
    iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )

    for denied_call in (alt_s3.put_object, alt_s3.get_object, alt_s3.delete_object):
        err = assert_raises(ClientError, denied_call, Bucket=bucket, Key='foo')
        http_status, code = _get_status_and_error_code(err.response)
        assert http_status == 403
        assert code == 'AccessDenied'

    removed = iam.delete_user_policy(
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_allow_multipart_actions_in_user_policy():
    """Check that an Allow policy lets the alt client list and abort multipart uploads.

    The bucket and the multipart upload are created with the IAM S3 client;
    the alt user is granted s3:ListBucketMultipartUploads and
    s3:AbortMultipartUpload on arn:aws:s3:::*.
    """
    iam = get_iam_client()
    alt_s3 = get_alt_client()
    iam_s3 = get_iam_s3client()
    bucket = get_new_bucket(client=iam_s3)

    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"],
            "Resource": "arn:aws:s3:::*",
        },
    })
    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    object_key = "mymultipart"
    mebibyte = 1024 * 1024
    # Only the upload id (first element of the returned triple) is needed.
    upload_id, _, _ = _multipart_upload(
        client=iam_s3,
        bucket_name=bucket,
        key=object_key,
        size=5 * mebibyte,
    )

    listing = alt_s3.list_multipart_uploads(Bucket=bucket)
    assert listing['ResponseMetadata']['HTTPStatusCode'] == 200

    aborted = alt_s3.abort_multipart_upload(Bucket=bucket, Key=object_key, UploadId=upload_id)
    assert aborted['ResponseMetadata']['HTTPStatusCode'] == 204

    bucket_removed = iam_s3.delete_bucket(Bucket=bucket)
    assert bucket_removed['ResponseMetadata']['HTTPStatusCode'] == 204

    policy_removed = iam.delete_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert policy_removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_deny_multipart_actions_in_user_policy():
    """Check that a Deny policy blocks listing and aborting multipart uploads.

    The alt client creates the bucket and the multipart upload itself; with
    s3:ListBucketMultipartUploads and s3:AbortMultipartUpload denied, both
    calls must fail with 403 AccessDenied.
    """
    iam = get_iam_client()
    alt_s3 = get_alt_client()
    bucket = get_new_bucket(client=alt_s3)

    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Deny",
            "Action": ["s3:ListBucketMultipartUploads", "s3:AbortMultipartUpload"],
            "Resource": "arn:aws:s3:::*",
        },
    })
    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    object_key = "mymultipart"
    mebibyte = 1024 * 1024
    # Only the upload id (first element of the returned triple) is needed.
    upload_id, _, _ = _multipart_upload(
        client=alt_s3,
        bucket_name=bucket,
        key=object_key,
        size=5 * mebibyte,
    )

    err = assert_raises(ClientError, alt_s3.list_multipart_uploads, Bucket=bucket)
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    err = assert_raises(
        ClientError,
        alt_s3.abort_multipart_upload,
        Bucket=bucket,
        Key=object_key,
        UploadId=upload_id,
    )
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    bucket_removed = alt_s3.delete_bucket(Bucket=bucket)
    assert bucket_removed['ResponseMetadata']['HTTPStatusCode'] == 204

    policy_removed = iam.delete_user_policy(
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert policy_removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_allow_tagging_actions_in_user_policy():
    """Check that an Allow policy lets the alt client put and get bucket/object tags.

    The bucket and the object 'obj' are created with the IAM S3 client; the
    alt client sets the tag Hello=World on each and reads it back.
    """
    iam = get_iam_client()
    alt_s3 = get_alt_client()
    iam_s3 = get_iam_s3client()
    bucket = get_new_bucket(client=iam_s3)

    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": [
                "s3:PutBucketTagging",
                "s3:GetBucketTagging",
                "s3:PutObjectTagging",
                "s3:GetObjectTagging",
            ],
            "Resource": "arn:aws:s3:::*",
        },
    })
    # The put response is not inspected.
    iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    tagging = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}]}

    result = alt_s3.put_bucket_tagging(Bucket=bucket, Tagging=tagging)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    result = alt_s3.get_bucket_tagging(Bucket=bucket)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200
    first_tag = result['TagSet'][0]
    assert first_tag['Key'] == 'Hello'
    assert first_tag['Value'] == 'World'

    object_key = 'obj'
    result = iam_s3.put_object(Bucket=bucket, Key=object_key, Body='obj_body')
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    result = alt_s3.put_object_tagging(Bucket=bucket, Key=object_key, Tagging=tagging)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    result = alt_s3.get_object_tagging(Bucket=bucket, Key=object_key)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200
    assert result['TagSet'] == tagging['TagSet']

    result = iam_s3.delete_object(Bucket=bucket, Key=object_key)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 204

    result = iam_s3.delete_bucket(Bucket=bucket)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 204

    result = iam.delete_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_deny_tagging_actions_in_user_policy():
    """Check that a Deny policy blocks bucket and object tagging calls.

    With s3:PutBucketTagging, s3:GetBucketTagging, s3:PutObjectTagging and
    s3:DeleteObjectTagging denied, the four matching calls from the alt
    client must each fail with 403 AccessDenied. Plain object put/delete and
    bucket delete are still expected to succeed.
    """
    iam = get_iam_client()
    alt_s3 = get_alt_client()
    bucket = get_new_bucket(client=alt_s3)

    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Deny",
            "Action": [
                "s3:PutBucketTagging",
                "s3:GetBucketTagging",
                "s3:PutObjectTagging",
                "s3:DeleteObjectTagging",
            ],
            "Resource": "arn:aws:s3:::*",
        },
    })
    # The put response is not inspected.
    iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    tagging = {'TagSet': [{'Key': 'Hello', 'Value': 'World'}]}

    err = assert_raises(ClientError, alt_s3.put_bucket_tagging, Bucket=bucket, Tagging=tagging)
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    err = assert_raises(ClientError, alt_s3.get_bucket_tagging, Bucket=bucket)
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    object_key = 'obj'
    result = alt_s3.put_object(Bucket=bucket, Key=object_key, Body='obj_body')
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    err = assert_raises(
        ClientError,
        alt_s3.put_object_tagging,
        Bucket=bucket,
        Key=object_key,
        Tagging=tagging,
    )
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    err = assert_raises(ClientError, alt_s3.delete_object_tagging, Bucket=bucket, Key=object_key)
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    result = alt_s3.delete_object(Bucket=bucket, Key=object_key)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 204

    result = alt_s3.delete_bucket(Bucket=bucket)
    assert result['ResponseMetadata']['HTTPStatusCode'] == 204

    result = iam.delete_user_policy(
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_verify_conflicting_user_policy_statements():
    """Expect Deny to win when one policy both allows and denies s3:ListBucket.

    The single policy document carries an Allow and a Deny statement for the
    same bucket; listing objects as the alt user must fail with 403
    AccessDenied.
    """
    alt_s3 = get_alt_client()
    bucket = get_new_bucket(client=alt_s3)
    bucket_arn = f"arn:aws:s3:::{bucket}"

    allow_statement = {
        "Sid": "98AB54CG",
        "Effect": "Allow",
        "Action": "s3:ListBucket",
        "Resource": bucket_arn,
    }
    deny_statement = {
        "Sid": "98AB54CA",
        "Effect": "Deny",
        "Action": "s3:ListBucket",
        "Resource": bucket_arn,
    }
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [allow_statement, deny_statement],
    })

    iam = get_iam_client()
    put_result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    err = assert_raises(ClientError, alt_s3.list_objects, Bucket=bucket)
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    removed = iam.delete_user_policy(
        PolicyName='DenyAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
@pytest.mark.fails_on_dbstore
def test_verify_conflicting_user_policies():
    """Expect Deny to win when separate policies allow and deny s3:ListBucket.

    An Allow policy and a Deny policy for the same bucket are both put on the
    alt user; listing objects as that user must fail with 403 AccessDenied.
    """
    alt_s3 = get_alt_client()
    bucket = get_new_bucket(client=alt_s3)
    bucket_arn = f"arn:aws:s3:::{bucket}"

    allow_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Sid": "98AB54CG",
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": bucket_arn,
        },
    })
    deny_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Sid": "98AB54CGZ",
            "Effect": "Deny",
            "Action": "s3:ListBucket",
            "Resource": bucket_arn,
        },
    })

    iam = get_iam_client()
    policies = (
        ('AllowAccessPolicy', allow_document),
        ('DenyAccessPolicy', deny_document),
    )
    for name, document in policies:
        put_result = iam.put_user_policy(
            PolicyDocument=document,
            PolicyName=name,
            UserName=get_alt_user_id(),
        )
        assert put_result['ResponseMetadata']['HTTPStatusCode'] == 200

    err = assert_raises(ClientError, alt_s3.list_objects, Bucket=bucket)
    http_status, code = _get_status_and_error_code(err.response)
    assert http_status == 403
    assert code == 'AccessDenied'

    for name, _ in policies:
        removed = iam.delete_user_policy(PolicyName=name, UserName=get_alt_user_id())
        assert removed['ResponseMetadata']['HTTPStatusCode'] == 200
|
|
|
|
|
|
@pytest.mark.user_policy
@pytest.mark.test_of_iam
def test_verify_allow_iam_actions():
    """Check that a policy granting IAM user-policy actions lets the alt user use them.

    The policy is put with the main IAM client. The alt IAM client must then
    be able to get the policy, list the user's policies and delete the
    policy, each with HTTP 200.
    """
    document = json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Sid": "98AB54CGA",
            "Effect": "Allow",
            "Action": [
                "iam:PutUserPolicy",
                "iam:GetUserPolicy",
                "iam:ListUserPolicies",
                "iam:DeleteUserPolicy",
            ],
            "Resource": f"arn:aws:iam:::user/{get_alt_user_id()}",
        },
    })
    iam = get_iam_client()
    alt_iam = get_alt_iam_client()

    result = iam.put_user_policy(
        PolicyDocument=document,
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    result = alt_iam.get_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    result = alt_iam.list_user_policies(UserName=get_alt_user_id())
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200

    result = alt_iam.delete_user_policy(
        PolicyName='AllowAccessPolicy',
        UserName=get_alt_user_id(),
    )
    assert result['ResponseMetadata']['HTTPStatusCode'] == 200
|