[#312] Add new policy test

Signed-off-by: Yulia Kovshova <y.kovshova@yadro.com>
This commit is contained in:
Юлия Ковшова 2022-11-14 15:04:15 +03:00 committed by Julia Kovshova
parent a0da15e60b
commit 2b08a932ac
6 changed files with 302 additions and 4 deletions

View file

@ -1,3 +1,4 @@
import json
import logging
import uuid
from enum import Enum
@ -234,3 +235,81 @@ def get_object_lock_configuration(s3_client, bucket: str):
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err
def get_bucket_policy(s3_client, bucket: str):
params = {"Bucket": bucket}
try:
response = s3_client.get_bucket_policy(**params)
log_command_execution("S3 get_object_lock_configuration result", response)
return response.get("ObjectLockConfiguration")
except ClientError as err:
raise Exception(
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err
def put_bucket_policy(s3_client, bucket: str, policy: dict):
params = {"Bucket": bucket, "Policy": json.dumps(policy)}
try:
response = s3_client.put_bucket_policy(**params)
log_command_execution("S3 put_bucket_policy result", response)
return response
except ClientError as err:
raise Exception(
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err
def get_bucket_cors(s3_client, bucket: str):
params = {"Bucket": bucket}
try:
response = s3_client.get_bucket_cors(**params)
log_command_execution("S3 get_bucket_cors result", response)
return response.get("CORSRules")
except ClientError as err:
raise Exception(
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err
def get_bucket_location(s3_client, bucket: str):
params = {"Bucket": bucket}
try:
response = s3_client.get_bucket_location(**params)
log_command_execution("S3 get_bucket_location result", response)
return response.get("LocationConstraint")
except ClientError as err:
raise Exception(
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err
def put_bucket_cors(s3_client, bucket: str, cors_configuration: dict):
params = {"Bucket": bucket, "CORSConfiguration": cors_configuration}
try:
response = s3_client.put_bucket_cors(**params)
log_command_execution("S3 put_bucket_cors result", response)
return response
except ClientError as err:
raise Exception(
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err
def delete_bucket_cors(s3_client, bucket: str):
params = {"Bucket": bucket}
try:
response = s3_client.delete_bucket_cors(**params)
log_command_execution("S3 delete_bucket_cors result", response)
return response.get("ObjectLockConfiguration")
except ClientError as err:
raise Exception(
f'Error Message: {err.response["Error"]["Message"]}\n'
f'Http status code: {err.response["ResponseMetadata"]["HTTPStatusCode"]}'
) from err