frostfs-testlib/src/frostfs_testlib/s3/boto3_client.py
Kirill Sosnovskikh 07ac43f498
Some checks failed
DCO action / DCO (pull_request) Has been cancelled
[#334] Automation of PATCH method in S3
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 23:26:25 +03:00

1347 lines
51 KiB
Python

import json
import logging
import os
from collections.abc import Callable
from datetime import datetime
from time import sleep
from typing import Literal, Optional, Union
import boto3
import urllib3
from botocore.config import Config
from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import log_command_execution
from frostfs_testlib.utils.file_utils import TestFile
# Module logger, registered under the name "NeoLogger".
logger = logging.getLogger("NeoLogger")
# Disable warnings on self-signed certificate which the
# boto library produces on requests to S3-gate in dev-env
urllib3.disable_warnings()
class Boto3ClientWrapper(S3ClientWrapper):
__repr_name__: str = "Boto3 client"
@reporter.step("Configure S3 client (boto3)")
def __init__(
    self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default", region: str = "us-east-1"
) -> None:
    """Remember credentials and settings, then create the S3 client.

    Args:
        access_key_id: Access key used for every client built by this wrapper.
        secret_access_key: Secret key paired with ``access_key_id``.
        s3gate_endpoint: Endpoint URL handed to ``set_endpoint``.
        profile: Stored on the instance and forwarded to command logging.
        region: Region name used when the S3 client is created.
    """
    # Credentials and settings first; the clients are built from them.
    self.access_key_id: str = access_key_id
    self.secret_access_key: str = secret_access_key
    self.profile = profile
    self.region = region

    # Start with empty endpoints and no clients; set_endpoint / set_iam_endpoint
    # compare against these values before (re)building a client.
    self.s3gate_endpoint: str = ""
    self.iam_endpoint: str = ""
    self.boto3_client: S3Client = None
    self.boto3_iam_client: S3Client = None
    self.boto3_sts_client: S3Client = None

    retry_settings = {"max_attempts": MAX_REQUEST_ATTEMPTS, "mode": RETRY_MODE}
    self.session = boto3.Session()
    self.config = Config(retries=retry_settings)

    self.set_endpoint(s3gate_endpoint)
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
def set_endpoint(self, s3gate_endpoint: str):
    """Point the wrapper at ``s3gate_endpoint``, rebuilding the S3 client if it changed."""
    if self.s3gate_endpoint != s3gate_endpoint:
        self.s3gate_endpoint = s3gate_endpoint
        client_options = {
            "service_name": "s3",
            "aws_access_key_id": self.access_key_id,
            "aws_secret_access_key": self.secret_access_key,
            "region_name": self.region,
            "config": self.config,
            "endpoint_url": s3gate_endpoint,
            # Certificate verification is switched off for this client.
            "verify": False,
        }
        self.boto3_client: S3Client = self.session.client(**client_options)
@reporter.step("Set endpoint IAM to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str):
    """Point the wrapper at ``iam_endpoint``, rebuilding the IAM and STS clients if it changed."""
    if self.iam_endpoint == iam_endpoint:
        return

    self.iam_endpoint = iam_endpoint
    # Both clients share credentials and endpoint; only the service name differs.
    shared_options = {
        "aws_access_key_id": self.access_key_id,
        "aws_secret_access_key": self.secret_access_key,
        "endpoint_url": iam_endpoint,
        "verify": False,
    }
    self.boto3_iam_client = self.session.client(service_name="iam", **shared_options)
    # Since STS does not have an endpoint of its own, the IAM endpoint is used.
    self.boto3_sts_client = self.session.client(service_name="sts", **shared_options)
def _to_s3_param(self, param: str) -> str:
replacement_map = {
"Acl": "ACL",
"Cors": "CORS",
"_": "",
}
result = param.title()
for find, replace in replacement_map.items():
result = result.replace(find, replace)
return result
def _convert_to_s3_params(self, scope: dict, exclude: Optional[list[str]] = None) -> dict:
exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"]
return {self._to_s3_param(param): value for param, value in scope.items() if param not in exclude and value is not None}
def _exec_request(self, method: Callable, params: Optional[dict] = None, **kwargs):
    """Call ``method`` with ``params`` as keyword arguments and log the outcome.

    On ``ClientError`` the error response is logged and the exception is
    re-raised; otherwise the result is logged and returned. Extra ``kwargs``
    are forwarded to ``log_command_execution`` only.
    """
    params = params or {}
    try:
        result = method(**params)
    except ClientError as err:
        log_command_execution(method.__name__, err.response, params, **kwargs)
        raise
    else:
        log_command_execution(method.__name__, result, params, **kwargs)
        return result
# BUCKET METHODS #
@reporter.step("Create bucket S3")
def create_bucket(
    self,
    bucket: Optional[str] = None,
    object_lock_enabled_for_bucket: Optional[bool] = None,
    acl: Optional[str] = None,
    grant_write: Optional[str] = None,
    grant_read: Optional[str] = None,
    grant_full_control: Optional[str] = None,
    location_constraint: Optional[str] = None,
) -> str:
    """Create a bucket and return its name.

    Args:
        bucket: Bucket name; a unique ``bucket-`` prefixed name is generated when omitted.
        object_lock_enabled_for_bucket: Sent as ``ObjectLockEnabledForBucket`` when not ``None``.
        acl: Canned ACL. When given, the ``grant_*`` arguments are ignored.
        grant_write: Sent as ``GrantWrite`` when ``acl`` is not given.
        grant_read: Sent as ``GrantRead`` when ``acl`` is not given.
        grant_full_control: Sent as ``GrantFullControl`` when ``acl`` is not given.
        location_constraint: Sent inside ``CreateBucketConfiguration`` when truthy.

    Returns:
        The name of the bucket that was requested.
    """
    if bucket is None:
        bucket = string_utils.unique_name("bucket-")

    params = {"Bucket": bucket}
    if object_lock_enabled_for_bucket is not None:
        params["ObjectLockEnabledForBucket"] = object_lock_enabled_for_bucket

    if acl is not None:
        params["ACL"] = acl
    else:
        # Each grant is independent: previously an if/elif chain silently
        # dropped every grant after the first one that was set.
        if grant_write:
            params["GrantWrite"] = grant_write
        if grant_read:
            params["GrantRead"] = grant_read
        if grant_full_control:
            params["GrantFullControl"] = grant_full_control

    if location_constraint:
        params["CreateBucketConfiguration"] = {"LocationConstraint": location_constraint}

    self._exec_request(self.boto3_client.create_bucket, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return bucket
@reporter.step("List buckets S3")
def list_buckets(self) -> list[str]:
    """Return the ``Name`` of every entry under ``Buckets`` in the list-buckets response."""
    response = self._exec_request(self.boto3_client.list_buckets, endpoint=self.s3gate_endpoint, profile=self.profile)
    names = []
    for entry in response["Buckets"]:
        names.append(entry["Name"])
    return names
@reporter.step("Delete bucket S3")
def delete_bucket(self, bucket: str) -> None:
    """Send a delete-bucket request for ``bucket``; the response is discarded."""
    request = {"Bucket": bucket}
    self._exec_request(self.boto3_client.delete_bucket, params=request, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Head bucket S3")
def head_bucket(self, bucket: str) -> None:
    """Send a head-bucket request for ``bucket``; the response is discarded."""
    request = {"Bucket": bucket}
    self._exec_request(self.boto3_client.head_bucket, params=request, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put bucket versioning status")
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
    """Set the versioning status of ``bucket`` to ``status.value``."""
    versioning = {"Status": status.value}
    request = {"Bucket": bucket, "VersioningConfiguration": versioning}
    self._exec_request(self.boto3_client.put_bucket_versioning, request, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Get bucket versioning status")
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
    """Return the ``Status`` field of the bucket-versioning response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_versioning, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("Status")
@reporter.step("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
    """Attach tags to ``bucket``.

    Args:
        bucket: Target bucket name.
        tags: Iterable of ``(key, value)`` pairs; each becomes one ``TagSet`` entry.
    """
    tag_set = []
    for tag_key, tag_value in tags:
        tag_set.append({"Key": tag_key, "Value": tag_value})
    # Explicit scope instead of locals(): only the bucket and the tagging go out.
    params = self._convert_to_s3_params({"bucket": bucket, "tagging": {"TagSet": tag_set}})
    self._exec_request(self.boto3_client.put_bucket_tagging, params, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list:
    """Return the ``TagSet`` field of the bucket-tagging response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_tagging, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("TagSet")
@reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list:
    """Return the ``Grants`` field of the bucket-ACL response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_acl, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("Grants")
@reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None:
    """Send a delete-bucket-tagging request for ``bucket``; the response is discarded."""
    request = {"Bucket": bucket}
    self._exec_request(self.boto3_client.delete_bucket_tagging, params=request, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put bucket ACL")
def put_bucket_acl(
    self,
    bucket: str,
    acl: Optional[str] = None,
    grant_write: Optional[str] = None,
    grant_read: Optional[str] = None,
) -> None:
    """Set the ACL of ``bucket``; arguments left as ``None`` are not sent."""
    scope = {"bucket": bucket, "acl": acl, "grant_write": grant_write, "grant_read": grant_read}
    params = self._convert_to_s3_params(scope)
    self._exec_request(self.boto3_client.put_bucket_acl, params, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
    """Send ``configuration`` as the ``ObjectLockConfiguration`` of ``bucket`` and return the response."""
    request = {"Bucket": bucket, "ObjectLockConfiguration": configuration}
    response = self._exec_request(
        self.boto3_client.put_object_lock_configuration, request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response
@reporter.step("Get object lock configuration")
def get_object_lock_configuration(self, bucket: str) -> dict:
    """Return the ``ObjectLockConfiguration`` field of the response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_object_lock_configuration, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("ObjectLockConfiguration")
@reporter.step("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> str:
    """Return the ``Policy`` field of the bucket-policy response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_policy, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("Policy")
@reporter.step("Delete bucket policy")
def delete_bucket_policy(self, bucket: str) -> str:
    """Send a delete-bucket-policy request for ``bucket`` and return the raw response."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.delete_bucket_policy, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response
@reporter.step("Put bucket policy")
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
    """Send ``policy`` (serialized to JSON) as the policy of ``bucket`` and return the response."""
    serialized_policy = json.dumps(policy)
    request = {"Bucket": bucket, "Policy": serialized_policy}
    # The un-serialized policy is passed as an extra keyword for command
    # logging only ("Overriding option for AWS CLI").
    log_options = {"policy": policy, "endpoint": self.s3gate_endpoint, "profile": self.profile}
    return self._exec_request(self.boto3_client.put_bucket_policy, request, **log_options)
@reporter.step("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict:
    """Return the ``CORSRules`` field of the bucket-CORS response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_cors, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("CORSRules")
@reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> str:
    """Return the ``LocationConstraint`` field of the bucket-location response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_location, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("LocationConstraint")
@reporter.step("Put bucket cors")
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
    """Send ``cors_configuration`` as the ``CORSConfiguration`` of ``bucket`` and return the response."""
    scope = {"bucket": bucket, "cors_configuration": cors_configuration}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.put_bucket_cors, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response
@reporter.step("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None:
    """Send a delete-bucket-CORS request for ``bucket``; the response is discarded."""
    request = {"Bucket": bucket}
    self._exec_request(self.boto3_client.delete_bucket_cors, params=request, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put bucket lifecycle configuration")
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
    """Send ``lifecycle_configuration`` for ``bucket`` and return the response.

    ``dumped_configuration`` is accepted but not used by this implementation.
    """
    scope = {"bucket": bucket, "lifecycle_configuration": lifecycle_configuration}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_client.put_bucket_lifecycle_configuration, params, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response
@reporter.step("Get bucket lifecycle configuration")
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
    """Return ``{"Rules": ...}`` holding the ``Rules`` field of the response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.get_bucket_lifecycle_configuration, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    rules = response.get("Rules")
    return {"Rules": rules}
@reporter.step("Delete bucket lifecycle configuration")
def delete_bucket_lifecycle(self, bucket: str) -> dict:
    """Send a delete-bucket-lifecycle request for ``bucket`` and return the raw response."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.delete_bucket_lifecycle, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response
# END OF BUCKET METHODS #
# OBJECT METHODS #
@reporter.step("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
    """List objects of ``bucket`` via ``list_objects_v2``.

    Returns:
        The raw response when ``full_output`` is true, otherwise the list of
        ``Key`` values found under ``Contents`` (empty if there is none).
    """
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.list_objects_v2, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    obj_list = []
    for entry in response.get("Contents", []):
        obj_list.append(entry["Key"])
    logger.info(f"Found s3 objects: {obj_list}")
    if full_output:
        return response
    return obj_list
@reporter.step("List objects S3")
def list_objects(
    self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
    """List objects of ``bucket`` via ``list_objects``.

    Args:
        bucket: Bucket to list.
        full_output: Return the raw response instead of the key list.
        page_size: Sent as ``MaxKeys`` when truthy.
        prefix: Sent as ``Prefix`` when truthy.
    """
    request = {"Bucket": bucket}
    # Falsy values (None, 0, "") are not sent at all.
    for name, value in (("MaxKeys", page_size), ("Prefix", prefix)):
        if value:
            request[name] = value

    response = self._exec_request(self.boto3_client.list_objects, request, endpoint=self.s3gate_endpoint, profile=self.profile)
    obj_list = []
    for entry in response.get("Contents", []):
        obj_list.append(entry["Key"])
    logger.info(f"Found s3 objects: {obj_list}")
    if full_output:
        return response
    return obj_list
@reporter.step("List objects versions S3")
def list_objects_versions(self, bucket: str, full_output: bool = False) -> Union[dict, list]:
    """List object versions of ``bucket``.

    Returns:
        The raw response dict when ``full_output`` is true, otherwise the
        ``Versions`` list from the response (empty list if absent). The
        annotation reflects both shapes; it previously claimed ``dict`` only.
    """
    response = self._exec_request(
        self.boto3_client.list_object_versions,
        params={"Bucket": bucket},
        endpoint=self.s3gate_endpoint,
        profile=self.profile,
    )
    return response if full_output else response.get("Versions", [])
@reporter.step("List objects delete markers S3")
def list_delete_markers(self, bucket: str, full_output: bool = False) -> Union[dict, list]:
    """List delete markers of ``bucket``.

    Returns:
        The raw response dict when ``full_output`` is true, otherwise the
        ``DeleteMarkers`` list from the response (empty list if absent). The
        annotation reflects both shapes; it previously claimed ``list`` only.
    """
    response = self._exec_request(
        self.boto3_client.list_object_versions,
        params={"Bucket": bucket},
        endpoint=self.s3gate_endpoint,
        profile=self.profile,
    )
    return response if full_output else response.get("DeleteMarkers", [])
@reporter.step("Put object S3")
def put_object(
    self,
    bucket: str,
    filepath: str,
    key: Optional[str] = None,
    metadata: Optional[dict] = None,
    tagging: Optional[str] = None,
    acl: Optional[str] = None,
    object_lock_mode: Optional[str] = None,
    object_lock_retain_until_date: Optional[datetime] = None,
    object_lock_legal_hold_status: Optional[str] = None,
    grant_full_control: Optional[str] = None,
    grant_read: Optional[str] = None,
) -> str:
    """Upload the file at ``filepath`` to ``bucket``.

    Args:
        bucket: Target bucket.
        filepath: Local file whose whole content is read and sent as ``Body``.
        key: Object key; defaults to the base name of ``filepath``.
        metadata, tagging, acl, object_lock_mode, object_lock_retain_until_date,
        object_lock_legal_hold_status, grant_full_control, grant_read:
            Optional request parameters; those left as ``None`` are not sent.

    Returns:
        The ``VersionId`` field of the response (``None`` if absent).
    """
    if key is None:
        key = os.path.basename(filepath)

    with open(filepath, "rb") as source:
        body = source.read()

    # Explicit scope in the same order the arguments are declared.
    scope = {
        "bucket": bucket,
        "key": key,
        "metadata": metadata,
        "tagging": tagging,
        "acl": acl,
        "object_lock_mode": object_lock_mode,
        "object_lock_retain_until_date": object_lock_retain_until_date,
        "object_lock_legal_hold_status": object_lock_legal_hold_status,
        "grant_full_control": grant_full_control,
        "grant_read": grant_read,
        "body": body,
    }
    params = self._convert_to_s3_params(scope)
    # body=filepath goes to the command logging as an extra keyword, not to boto3.
    response = self._exec_request(
        self.boto3_client.put_object, params, body=filepath, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("VersionId")
@reporter.step("Head object S3")
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
    """Send a head-object request and return the raw response; ``version_id`` is sent only when not ``None``."""
    scope = {"bucket": bucket, "key": key, "version_id": version_id}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.head_object, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response
@reporter.step("Delete object S3")
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
    """Send a delete-object request and return the raw response; ``version_id`` is sent only when not ``None``."""
    scope = {"bucket": bucket, "key": key, "version_id": version_id}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.delete_object, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response
@reporter.step("Delete objects S3")
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
    """Delete several objects from ``bucket`` in one request.

    Args:
        bucket: Bucket holding the objects.
        keys: Object keys, converted to the request payload by ``_make_objs_dict``.

    Returns:
        The raw response.

    Raises:
        AssertionError: If the response contains an ``Errors`` entry.
    """
    params = {"Bucket": bucket, "Delete": _make_objs_dict(keys)}
    response = self._exec_request(
        self.boto3_client.delete_objects,
        params,
        endpoint=self.s3gate_endpoint,
        profile=self.profile,
    )

    # "Errors" is a list of per-object entries. The old message indexed the list
    # itself with ["Message"], which raised TypeError instead of AssertionError.
    errors = response.get("Errors") or []
    failed_keys = [err_info.get("Key") for err_info in errors]
    error_messages = [err_info.get("Message") for err_info in errors]
    assert (
        "Errors" not in response
    ), f"The following objects have not been deleted: {failed_keys}.\nError Message: {error_messages}"
    return response
@reporter.step("Delete object versions S3")
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
    """Delete specific object versions from ``bucket`` in one request.

    Args:
        bucket: Bucket holding the objects.
        object_versions: Items providing ``Key`` and ``VersionId`` entries.

    Returns:
        The raw response of the ``delete_objects`` call.
    """
    # Keep only the two fields used for the deletion list.
    targets = []
    for version in object_versions:
        targets.append({"Key": version["Key"], "VersionId": version["VersionId"]})

    request = {"Bucket": bucket, "Delete": {"Objects": targets}}
    response = self._exec_request(self.boto3_client.delete_objects, request, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response
@reporter.step("Delete object versions S3 without delete markers")
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
    """Delete each given object version with its own ``delete_object`` request.

    Every item of ``object_versions`` must provide ``Key`` and ``VersionId``.
    """
    remove = self.boto3_client.delete_object
    # Delete objects without creating delete markers
    for version in object_versions:
        request = {"Bucket": bucket, "Key": version["Key"], "VersionId": version["VersionId"]}
        self._exec_request(remove, request, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put object ACL")
def put_object_acl(
    self,
    bucket: str,
    key: str,
    acl: Optional[str] = None,
    grant_write: Optional[str] = None,
    grant_read: Optional[str] = None,
) -> list:
    """Set the ACL of an object and return the ``Grants`` field of the response (``None`` if absent).

    Arguments left as ``None`` are not sent.
    """
    scope = {"bucket": bucket, "key": key, "acl": acl, "grant_write": grant_write, "grant_read": grant_read}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.put_object_acl, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response.get("Grants")
@reporter.step("Get object ACL")
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
    """Return the ``Grants`` field of the object-ACL response (``None`` if absent)."""
    scope = {"bucket": bucket, "key": key, "version_id": version_id}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.get_object_acl, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response.get("Grants")
@reporter.step("Copy object S3")
def copy_object(
    self,
    source_bucket: str,
    source_key: str,
    bucket: Optional[str] = None,
    key: Optional[str] = None,
    acl: Optional[str] = None,
    metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None,
    metadata: Optional[dict] = None,
    tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None,
    tagging: Optional[str] = None,
) -> str:
    """Copy ``source_bucket/source_key`` and return the destination key.

    Args:
        source_bucket: Bucket of the object to copy.
        source_key: Key of the object to copy.
        bucket: Destination bucket; defaults to ``source_bucket``.
        key: Destination key; a unique ``copy-object-`` prefixed name is generated when omitted.
        acl, metadata_directive, metadata, tagging_directive, tagging:
            Optional request parameters; those left as ``None`` are not sent.
    """
    bucket = source_bucket if bucket is None else bucket
    key = string_utils.unique_name("copy-object-") if key is None else key

    scope = {
        "bucket": bucket,
        "key": key,
        "acl": acl,
        "metadata_directive": metadata_directive,
        "metadata": metadata,
        "tagging_directive": tagging_directive,
        "tagging": tagging,
        "copy_source": f"{source_bucket}/{source_key}",
    }
    params = self._convert_to_s3_params(scope)
    self._exec_request(self.boto3_client.copy_object, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return key
@reporter.step("Get object S3")
def get_object(
    self,
    bucket: str,
    key: str,
    version_id: Optional[str] = None,
    object_range: Optional[tuple[int, int]] = None,
    full_output: bool = False,
) -> dict | TestFile:
    """Fetch an object and either return the response or save the body to a file.

    Args:
        bucket: Bucket holding the object.
        key: Object key.
        version_id: Sent as ``VersionId`` when not ``None``.
        object_range: ``(first, last)`` byte offsets; sent as ``Range: bytes=first-last`` when truthy.
        full_output: Return the raw response instead of downloading the body.

    Returns:
        The raw response when ``full_output`` is true, otherwise a ``TestFile``
        under ``<cwd>/ASSETS_DIR`` with a unique ``dl-object-`` prefixed name
        that holds the response body.
    """
    # Build the scope explicitly. The previous code mutated the dict returned by
    # locals(), which is fragile (the dict can pick up the ``params`` local
    # itself when frame locals are re-synced) and needed a dead ``range_str``.
    scope = {"bucket": bucket, "key": key, "version_id": version_id}
    if object_range:
        scope["range"] = f"bytes={object_range[0]}-{object_range[1]}"
    params = self._convert_to_s3_params(scope)

    response = self._exec_request(
        self.boto3_client.get_object,
        params,
        endpoint=self.s3gate_endpoint,
        profile=self.profile,
    )

    if full_output:
        return response

    test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
    body = response["Body"]
    with open(test_file, "wb") as file:
        # Copy the body in 1 KiB chunks until read() returns an empty value.
        while chunk := body.read(1024):
            file.write(chunk)
    return test_file
@reporter.step("Create multipart upload S3")
def create_multipart_upload(self, bucket: str, key: str) -> str:
    """Start a multipart upload and return its ``UploadId``.

    Raises:
        AssertionError: If the response has no truthy ``UploadId``.
    """
    params = self._convert_to_s3_params({"bucket": bucket, "key": key})
    response = self._exec_request(
        self.boto3_client.create_multipart_upload, params, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    upload_id = response.get("UploadId")
    assert upload_id, f"Expected UploadId in response:\n{response}"
    return upload_id
@reporter.step("List multipart uploads S3")
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
    """Return the ``Uploads`` field of the list-multipart-uploads response (``None`` if absent)."""
    request = {"Bucket": bucket}
    response = self._exec_request(
        self.boto3_client.list_multipart_uploads, params=request, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response.get("Uploads")
@reporter.step("Abort multipart upload S3")
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
    """Send an abort-multipart-upload request for ``upload_id``; the response is discarded."""
    scope = {"bucket": bucket, "key": key, "upload_id": upload_id}
    params = self._convert_to_s3_params(scope)
    self._exec_request(self.boto3_client.abort_multipart_upload, params, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Upload part S3")
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
    """Upload the file at ``filepath`` as part ``part_num`` and return the part's ``ETag``.

    Raises:
        AssertionError: If the response has no truthy ``ETag``.
    """
    with open(filepath, "rb") as source:
        body = source.read()

    scope = {"bucket": bucket, "key": key, "upload_id": upload_id, "body": body}
    params = self._convert_to_s3_params(scope)
    # Set directly: the generic name conversion would produce "PartNum".
    params["PartNumber"] = part_num

    # body=filepath goes to the command logging as an extra keyword, not to boto3.
    response = self._exec_request(
        self.boto3_client.upload_part, params, body=filepath, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    etag = response.get("ETag")
    assert etag, f"Expected ETag in response:\n{response}"
    return etag
@reporter.step("Upload copy part S3")
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
    """Upload part ``part_num`` by copying from ``copy_source`` and return the part's ``ETag``.

    Args:
        bucket: Destination bucket.
        key: Destination object key.
        upload_id: Multipart upload the part belongs to.
        part_num: Sent as ``PartNumber``.
        copy_source: Sent as ``CopySource``.

    Raises:
        AssertionError: If the response has no ``CopyPartResult`` with a truthy ``ETag``.
    """
    # Only ``part_num`` needs excluding; it is sent under its real name below.
    # (The old exclude list also named locals that do not exist in this method.)
    params = self._convert_to_s3_params(locals(), exclude=["part_num"])
    params["PartNumber"] = part_num

    response = self._exec_request(
        self.boto3_client.upload_part_copy,
        params,
        endpoint=self.s3gate_endpoint,
        profile=self.profile,
    )

    # Default to a dict: the old ``[]`` default has no .get(), so a missing
    # CopyPartResult raised AttributeError instead of this assertion.
    assert response.get("CopyPartResult", {}).get("ETag"), f"Expected ETag in response:\n{response}"
    return response["CopyPartResult"]["ETag"]
@reporter.step("List parts S3")
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
    """Return the ``Parts`` of a multipart upload.

    Raises:
        AssertionError: If the response has no truthy ``Parts``.
    """
    scope = {"bucket": bucket, "key": key, "upload_id": upload_id}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.list_parts, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    parts = response.get("Parts")
    assert parts, f"Expected Parts in response:\n{response}"
    return parts
@reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
    """Complete a multipart upload and return the raw response.

    Args:
        bucket: Bucket of the upload.
        key: Object key of the upload.
        upload_id: Identifier of the multipart upload.
        parts: Iterable of ``(part_number, etag)`` pairs.
    """
    part_entries = []
    for part_num, etag in parts:
        part_entries.append({"ETag": etag, "PartNumber": part_num})

    scope = {"bucket": bucket, "key": key, "upload_id": upload_id}
    params = self._convert_to_s3_params(scope)
    params["MultipartUpload"] = {"Parts": part_entries}
    response = self._exec_request(
        self.boto3_client.complete_multipart_upload, params, endpoint=self.s3gate_endpoint, profile=self.profile
    )
    return response
@reporter.step("Put object retention")
def put_object_retention(
    self,
    bucket: str,
    key: str,
    retention: dict,
    version_id: Optional[str] = None,
    bypass_governance_retention: Optional[bool] = None,
) -> None:
    """Send ``retention`` for an object; optional arguments left as ``None`` are not sent."""
    scope = {
        "bucket": bucket,
        "key": key,
        "retention": retention,
        "version_id": version_id,
        "bypass_governance_retention": bypass_governance_retention,
    }
    params = self._convert_to_s3_params(scope)
    self._exec_request(self.boto3_client.put_object_retention, params, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put object legal hold")
def put_object_legal_hold(
    self,
    bucket: str,
    key: str,
    legal_hold_status: Literal["ON", "OFF"],
    version_id: Optional[str] = None,
) -> None:
    """Send ``{"Status": legal_hold_status}`` as the ``LegalHold`` of an object.

    ``version_id`` is sent only when not ``None``.
    """
    scope = {
        "bucket": bucket,
        "key": key,
        "version_id": version_id,
        "legal_hold": {"Status": legal_hold_status},
    }
    params = self._convert_to_s3_params(scope)
    self._exec_request(self.boto3_client.put_object_legal_hold, params, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Put object tagging")
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None:
    """Attach tags to an object.

    Args:
        bucket: Bucket holding the object.
        key: Object key.
        tags: Iterable of ``(key, value)`` pairs; each becomes one ``TagSet`` entry.
        version_id: Object version to tag. An empty string (the default) or
            ``None`` means "no version", so ``VersionId`` is not sent.
    """
    tag_set = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
    scope = {
        "bucket": bucket,
        "key": key,
        # The "" default is not None, so it used to be sent as an empty
        # VersionId with every unversioned call; normalize it away here.
        "version_id": version_id or None,
        "tagging": {"TagSet": tag_set},
    }
    params = self._convert_to_s3_params(scope)
    self._exec_request(
        self.boto3_client.put_object_tagging,
        params,
        endpoint=self.s3gate_endpoint,
        profile=self.profile,
    )
@reporter.step("Get object tagging")
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
    """Return the ``TagSet`` field of the object-tagging response (``None`` if absent)."""
    scope = {"bucket": bucket, "key": key, "version_id": version_id}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_client.get_object_tagging, params, endpoint=self.s3gate_endpoint, profile=self.profile)
    return response.get("TagSet")
@reporter.step("Delete object tagging")
def delete_object_tagging(self, bucket: str, key: str) -> None:
    """Send a delete-object-tagging request for ``key``; the response is discarded."""
    params = self._convert_to_s3_params({"bucket": bucket, "key": key})
    self._exec_request(self.boto3_client.delete_object_tagging, params, endpoint=self.s3gate_endpoint, profile=self.profile)
@reporter.step("Get object attributes")
def get_object_attributes(
    self,
    bucket: str,
    key: str,
    attributes: list[str],
    version_id: Optional[str] = None,
    max_parts: Optional[int] = None,
    part_number: Optional[int] = None,
    full_output: bool = True,
) -> dict:
    """Log a warning and return an empty dict.

    No request is made and all arguments are ignored.
    """
    unsupported_notice = "Method get_object_attributes is not supported by boto3 client"
    logger.warning(unsupported_notice)
    return dict()
@reporter.step("Sync directory S3")
def sync(
    self,
    bucket: str,
    dir_path: str,
    acl: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> dict:
    """Always raise: directory sync is not available with this client.

    Raises:
        NotImplementedError: On every call.
    """
    reason = "Sync is not supported for boto3 client"
    raise NotImplementedError(reason)
@reporter.step("CP directory S3")
def cp(
    self,
    bucket: str,
    dir_path: str,
    acl: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> dict:
    """Always raise: directory copy is not available with this client.

    Raises:
        NotImplementedError: On every call.
    """
    reason = "Cp is not supported for boto3 client"
    raise NotImplementedError(reason)
# END OBJECT METHODS #
# IAM METHODS #
# Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.)
@reporter.step("Adds the specified user to the specified group")
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
    """Send an add-user-to-group request and return the raw response."""
    scope = {"user_name": user_name, "group_name": group_name}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_iam_client.add_user_to_group, params, endpoint=self.iam_endpoint, profile=self.profile)
    return response
@reporter.step("Attaches the specified managed policy to the specified IAM group")
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
    """Attach ``policy_arn`` to ``group_name``, wait, and return the raw response.

    After the request the call sleeps for ``S3_SYNC_WAIT_TIME * 10``.
    """
    scope = {"group_name": group_name, "policy_arn": policy_arn}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.attach_group_policy, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    wait_time = S3_SYNC_WAIT_TIME * 10
    sleep(wait_time)
    return response
@reporter.step("Attaches the specified managed policy to the specified user")
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
    """Attach ``policy_arn`` to ``user_name``, wait, and return the raw response.

    After the request the call sleeps for ``S3_SYNC_WAIT_TIME * 10``.
    """
    scope = {"user_name": user_name, "policy_arn": policy_arn}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.attach_user_policy, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    wait_time = S3_SYNC_WAIT_TIME * 10
    sleep(wait_time)
    return response
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
def iam_create_access_key(self, user_name: str) -> tuple[str, str]:
    """Create an access key for ``user_name``.

    Returns:
        ``(access_key_id, secret_access_key)`` taken from the ``AccessKey``
        entry of the response. The annotation used to say ``dict`` although a
        tuple has always been returned.

    Raises:
        AssertionError: If either value is missing from the response.
    """
    response = self._exec_request(
        self.boto3_iam_client.create_access_key,
        params={"UserName": user_name},
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    # Tolerate a missing "AccessKey" entry so the assertions below report it
    # with the full response instead of a bare KeyError.
    access_key = response.get("AccessKey") or {}
    access_key_id = access_key.get("AccessKeyId")
    secret_access_key = access_key.get("SecretAccessKey")
    assert access_key_id, f"Expected AccessKeyId in response:\n{response}"
    assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
    return access_key_id, secret_access_key
@reporter.step("Creates a new group")
def iam_create_group(self, group_name: str) -> dict:
    """Create a group and return the raw response.

    Raises:
        AssertionError: If the response has no ``Group`` or its ``GroupName`` differs from ``group_name``.
    """
    request = {"GroupName": group_name}
    response = self._exec_request(
        self.boto3_iam_client.create_group, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    assert response.get("Group"), f"Expected Group in response:\n{response}"
    created_name = response["Group"].get("GroupName")
    assert created_name == group_name, f"GroupName should be equal to {group_name}"
    return response
@reporter.step("Creates a new managed policy for your AWS account")
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
    """Create a managed policy from ``policy_document`` and return the raw response.

    The document is sent serialized to JSON.

    Raises:
        AssertionError: If the response has no ``Policy`` or its ``PolicyName`` differs from ``policy_name``.
    """
    params = self._convert_to_s3_params({"policy_name": policy_name})
    params["PolicyDocument"] = json.dumps(policy_document)
    # The un-serialized document is passed as an extra keyword for command
    # logging only ("Overriding option for AWS CLI").
    log_options = {"policy_document": policy_document, "endpoint": self.iam_endpoint, "profile": self.profile}
    response = self._exec_request(self.boto3_iam_client.create_policy, params, **log_options)

    assert response.get("Policy"), f"Expected Policy in response:\n{response}"
    created_name = response["Policy"].get("PolicyName")
    assert created_name == policy_name, f"PolicyName should be equal to {policy_name}"
    return response
@reporter.step("Creates a new IAM user for your AWS account")
def iam_create_user(self, user_name: str) -> dict:
    """Create a user and return the raw response.

    Raises:
        AssertionError: If the response has no ``User`` or its ``UserName`` differs from ``user_name``.
    """
    request = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.create_user, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    assert response.get("User"), f"Expected User in response:\n{response}"
    created_name = response["User"].get("UserName")
    assert created_name == user_name, f"UserName should be equal to {user_name}"
    return response
@reporter.step("Deletes the access key pair associated with the specified IAM user")
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
    """Send a delete-access-key request and return the raw response."""
    scope = {"access_key_id": access_key_id, "user_name": user_name}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_iam_client.delete_access_key, params, endpoint=self.iam_endpoint, profile=self.profile)
    return response
@reporter.step("Deletes the specified IAM group")
def iam_delete_group(self, group_name: str) -> dict:
    """Send a delete-group request for ``group_name`` and return the raw response."""
    request = {"GroupName": group_name}
    response = self._exec_request(
        self.boto3_iam_client.delete_group, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
    """Send a delete-group-policy request and return the raw response."""
    scope = {"group_name": group_name, "policy_name": policy_name}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.delete_group_policy, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    return response
@reporter.step("Deletes the specified managed policy")
def iam_delete_policy(self, policy_arn: str) -> dict:
    """Send a delete-policy request for ``policy_arn`` and return the raw response."""
    request = {"PolicyArn": policy_arn}
    response = self._exec_request(
        self.boto3_iam_client.delete_policy, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    return response
@reporter.step("Deletes the specified IAM user")
def iam_delete_user(self, user_name: str) -> dict:
    """Send a delete-user request for ``user_name`` and return the raw response."""
    request = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.delete_user, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
    """Send a delete-user-policy request and return the raw response."""
    scope = {"user_name": user_name, "policy_name": policy_name}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.delete_user_policy, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    return response
@reporter.step("Removes the specified managed policy from the specified IAM group")
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
    """Detach ``policy_arn`` from ``group_name``, wait, and return the raw response.

    After the request the call sleeps for ``S3_SYNC_WAIT_TIME * 10``.
    """
    scope = {"group_name": group_name, "policy_arn": policy_arn}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.detach_group_policy, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    wait_time = S3_SYNC_WAIT_TIME * 10
    sleep(wait_time)
    return response
@reporter.step("Removes the specified managed policy from the specified user")
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
    """Detach ``policy_arn`` from ``user_name``, wait, and return the raw response.

    After the request the call sleeps for ``S3_SYNC_WAIT_TIME * 10``.
    """
    scope = {"user_name": user_name, "policy_arn": policy_arn}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.detach_user_policy, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    wait_time = S3_SYNC_WAIT_TIME * 10
    sleep(wait_time)
    return response
@reporter.step("Returns a list of IAM users that are in the specified IAM group")
def iam_get_group(self, group_name: str) -> dict:
    """Fetch ``group_name`` and return the raw response.

    Raises:
        AssertionError: If the response has no ``Group`` or its ``GroupName`` differs from ``group_name``.
    """
    response = self._exec_request(
        self.boto3_iam_client.get_group,
        params={"GroupName": group_name},
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
    # Check presence first, like the other IAM getters do; chaining .get() on a
    # missing "Group" raised AttributeError instead of a readable assertion.
    assert response.get("Group"), f"Expected Group in response:\n{response}"
    assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
    return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
    """Send a get-group-policy request and return the raw response."""
    scope = {"group_name": group_name, "policy_name": policy_name}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(self.boto3_iam_client.get_group_policy, params, endpoint=self.iam_endpoint, profile=self.profile)
    return response
@reporter.step("Retrieves information about the specified managed policy")
def iam_get_policy(self, policy_arn: str) -> dict:
    """Fetch the policy ``policy_arn`` and return the raw response.

    Raises:
        AssertionError: If the response has no ``Policy`` or its ``Arn`` differs from ``policy_arn``.
    """
    request = {"PolicyArn": policy_arn}
    response = self._exec_request(
        self.boto3_iam_client.get_policy, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    assert response.get("Policy"), f"Expected Policy in response:\n{response}"
    returned_arn = response["Policy"].get("Arn")
    assert returned_arn == policy_arn, f"PolicyArn should be equal to {policy_arn}"
    return response
@reporter.step("Retrieves information about the specified version of the specified managed policy")
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
    """Fetch version ``version_id`` of policy ``policy_arn`` and return the raw response.

    Raises:
        AssertionError: If the response has no ``PolicyVersion`` or its ``VersionId`` differs from ``version_id``.
    """
    scope = {"policy_arn": policy_arn, "version_id": version_id}
    params = self._convert_to_s3_params(scope)
    response = self._exec_request(
        self.boto3_iam_client.get_policy_version, params, endpoint=self.iam_endpoint, profile=self.profile
    )
    assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
    returned_version = response["PolicyVersion"].get("VersionId")
    assert returned_version == version_id, f"VersionId should be equal to {version_id}"
    return response
@reporter.step("Retrieves information about the specified IAM user")
def iam_get_user(self, user_name: str) -> dict:
    """Fetch ``user_name`` and return the raw response.

    Raises:
        AssertionError: If the response has no ``User`` or its ``UserName`` differs from ``user_name``.
    """
    request = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.get_user, params=request, endpoint=self.iam_endpoint, profile=self.profile
    )
    assert response.get("User"), f"Expected User in response:\n{response}"
    returned_name = response["User"].get("UserName")
    assert returned_name == user_name, f"UserName should be equal to {user_name}"
    return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
    """Fetch an inline policy that is embedded in an IAM user.

    Args:
        user_name: Name of the IAM user that owns the policy.
        policy_name: Name of the inline policy to fetch.

    Returns:
        The full response of the ``get_user_policy`` request.

    Raises:
        AssertionError: If the response carries no ``UserName`` value.
    """
    # Keep this first: locals() must only contain the method arguments.
    request_params = self._convert_to_s3_params(locals())
    response = self._exec_request(
        self.boto3_iam_client.get_user_policy,
        request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    returned_user_name = response.get("UserName")
    assert returned_user_name, f"Expected UserName in response:\n{response}"
    return response
@reporter.step("Returns information about the access key IDs associated with the specified IAM user")
def iam_list_access_keys(self, user_name: str) -> dict:
    """List the access keys that belong to an IAM user.

    Args:
        user_name: Name of the IAM user whose keys are listed.

    Returns:
        The result of the ``list_access_keys`` request; no checks are applied.
    """
    request_params = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.list_access_keys,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
    return response
@reporter.step("Lists all managed policies that are attached to the specified IAM group")
def iam_list_attached_group_policies(self, group_name: str) -> dict:
    """List the managed policies attached to an IAM group.

    Args:
        group_name: Name of the IAM group to inspect.

    Returns:
        The full response of the ``list_attached_group_policies`` request.

    Raises:
        AssertionError: If ``AttachedPolicies`` is missing or empty in the response.
    """
    request_params = {"GroupName": group_name}
    response = self._exec_request(
        self.boto3_iam_client.list_attached_group_policies,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    attached_policies = response.get("AttachedPolicies")
    assert attached_policies, f"Expected AttachedPolicies in response:\n{response}"
    return response
@reporter.step("Lists all managed policies that are attached to the specified IAM user")
def iam_list_attached_user_policies(self, user_name: str) -> dict:
    """List the managed policies attached to an IAM user.

    Args:
        user_name: Name of the IAM user to inspect.

    Returns:
        The full response of the ``list_attached_user_policies`` request.

    Raises:
        AssertionError: If ``AttachedPolicies`` is missing or empty in the response.
    """
    request_params = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.list_attached_user_policies,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    attached_policies = response.get("AttachedPolicies")
    assert attached_policies, f"Expected AttachedPolicies in response:\n{response}"
    return response
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
    """List the entities a managed policy is attached to.

    Args:
        policy_arn: ARN of the managed policy to inspect.

    Returns:
        The full response of the ``list_entities_for_policy`` request.

    Raises:
        AssertionError: If ``PolicyGroups`` or ``PolicyUsers`` is missing or
            empty in the response (both are required to be non-empty).
    """
    request_params = {"PolicyArn": policy_arn}
    response = self._exec_request(
        self.boto3_iam_client.list_entities_for_policy,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    policy_groups = response.get("PolicyGroups")
    assert policy_groups, f"Expected PolicyGroups in response:\n{response}"

    policy_users = response.get("PolicyUsers")
    assert policy_users, f"Expected PolicyUsers in response:\n{response}"
    return response
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
def iam_list_group_policies(self, group_name: str) -> dict:
    """List the names of the inline policies embedded in an IAM group.

    Args:
        group_name: Name of the IAM group to inspect.

    Returns:
        The full response of the ``list_group_policies`` request.

    Raises:
        AssertionError: If ``PolicyNames`` is missing or empty in the response.
    """
    request_params = {"GroupName": group_name}
    response = self._exec_request(
        self.boto3_iam_client.list_group_policies,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    policy_names = response.get("PolicyNames")
    assert policy_names, f"Expected PolicyNames in response:\n{response}"
    return response
@reporter.step("Lists the IAM groups")
def iam_list_groups(self) -> dict:
    """List IAM groups.

    Returns:
        The full response of the ``list_groups`` request.

    Raises:
        AssertionError: If ``Groups`` is missing or empty in the response.
    """
    list_groups = self.boto3_iam_client.list_groups
    response = self._exec_request(
        list_groups,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    groups = response.get("Groups")
    assert groups, f"Expected Groups in response:\n{response}"
    return response
@reporter.step("Lists the IAM groups that the specified IAM user belongs to")
def iam_list_groups_for_user(self, user_name: str) -> dict:
    """List the IAM groups an IAM user belongs to.

    Args:
        user_name: Name of the IAM user to inspect.

    Returns:
        The full response of the ``list_groups_for_user`` request.

    Raises:
        AssertionError: If ``Groups`` is missing or empty in the response.
    """
    request_params = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.list_groups_for_user,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    groups = response.get("Groups")
    assert groups, f"Expected Groups in response:\n{response}"
    return response
@reporter.step("Lists all the managed policies that are available in your AWS account")
def iam_list_policies(self) -> dict:
    """List managed policies.

    Returns:
        The full response of the ``list_policies`` request.

    Raises:
        AssertionError: If ``Policies`` is missing or empty in the response.
    """
    list_policies = self.boto3_iam_client.list_policies
    response = self._exec_request(
        list_policies,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    policies = response.get("Policies")
    assert policies, f"Expected Policies in response:\n{response}"
    return response
@reporter.step("Lists information about the versions of the specified managed policy")
def iam_list_policy_versions(self, policy_arn: str) -> dict:
    """List the versions of a managed policy.

    Args:
        policy_arn: ARN of the managed policy to inspect.

    Returns:
        The full response of the ``list_policy_versions`` request.

    Raises:
        AssertionError: If ``Versions`` is missing or empty in the response.
    """
    request_params = {"PolicyArn": policy_arn}
    response = self._exec_request(
        self.boto3_iam_client.list_policy_versions,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    versions = response.get("Versions")
    assert versions, f"Expected Versions in response:\n{response}"
    return response
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
def iam_list_user_policies(self, user_name: str) -> dict:
    """List the names of the inline policies embedded in an IAM user.

    Args:
        user_name: Name of the IAM user to inspect.

    Returns:
        The full response of the ``list_user_policies`` request.

    Raises:
        AssertionError: If ``PolicyNames`` is missing or empty in the response.
    """
    request_params = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.list_user_policies,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    policy_names = response.get("PolicyNames")
    assert policy_names, f"Expected PolicyNames in response:\n{response}"
    return response
@reporter.step("Lists the IAM users")
def iam_list_users(self) -> dict:
    """List IAM users.

    Returns:
        The full response of the ``list_users`` request.

    Raises:
        AssertionError: If ``Users`` is missing or empty in the response.
    """
    list_users = self.boto3_iam_client.list_users
    response = self._exec_request(
        list_users,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    users = response.get("Users")
    assert users, f"Expected Users in response:\n{response}"
    return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
    """Create or replace an inline policy on an IAM group.

    Args:
        group_name: Name of the IAM group that receives the policy.
        policy_name: Name of the inline policy.
        policy_document: Policy body; it is serialized to JSON before sending.

    Returns:
        The result of the ``put_group_policy`` request.
    """
    # Keep this first: locals() must only contain the method arguments.
    request_params = self._convert_to_s3_params(locals())
    serialized_document = json.dumps(policy_document)
    request_params["PolicyDocument"] = serialized_document

    response = self._exec_request(
        self.boto3_iam_client.put_group_policy,
        request_params,
        # Overriding option for AWS CLI
        policy_document=policy_document,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    # Pause after the change before returning to the caller.
    sync_delay = S3_SYNC_WAIT_TIME * 10
    sleep(sync_delay)
    return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
    """Create or replace an inline policy on an IAM user.

    Args:
        user_name: Name of the IAM user that receives the policy.
        policy_name: Name of the inline policy.
        policy_document: Policy body; it is serialized to JSON before sending.

    Returns:
        The result of the ``put_user_policy`` request.
    """
    # Keep this first: locals() must only contain the method arguments.
    request_params = self._convert_to_s3_params(locals())
    serialized_document = json.dumps(policy_document)
    request_params["PolicyDocument"] = serialized_document

    response = self._exec_request(
        self.boto3_iam_client.put_user_policy,
        request_params,
        # Overriding option for AWS CLI
        policy_document=policy_document,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )

    # Pause after the change before returning to the caller.
    sync_delay = S3_SYNC_WAIT_TIME * 10
    sleep(sync_delay)
    return response
@reporter.step("Removes the specified user from the specified group")
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
    """Remove an IAM user from an IAM group.

    Args:
        group_name: Name of the IAM group to remove the user from.
        user_name: Name of the IAM user to remove.

    Returns:
        The result of the ``remove_user_from_group`` request.
    """
    # Keep this first: locals() must only contain the method arguments.
    request_params = self._convert_to_s3_params(locals())
    response = self._exec_request(
        self.boto3_iam_client.remove_user_from_group,
        request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
    return response
@reporter.step("Updates the name and/or the path of the specified IAM group")
def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
    """Rename an IAM group and/or move it to another path.

    Args:
        group_name: Current name of the IAM group.
        new_name: Name the group should have after the update.
        new_path: Path the group should have after the update. When omitted
            (or empty) the root path ``"/"`` is sent, as before.

    Returns:
        The result of the ``update_group`` request.
    """
    params = {
        "GroupName": group_name,
        "NewGroupName": new_name,
        # Previously "/" was always sent and the caller's new_path was silently dropped.
        "NewPath": new_path or "/",
    }
    return self._exec_request(
        self.boto3_iam_client.update_group,
        params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
@reporter.step("Updates the name and/or the path of the specified IAM user")
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
    """Rename an IAM user and/or move it to another path.

    Args:
        user_name: Current name of the IAM user.
        new_name: Name the user should have after the update.
        new_path: Path the user should have after the update. When omitted
            (or empty) the root path ``"/"`` is sent, as before.

    Returns:
        The result of the ``update_user`` request.
    """
    params = {
        "UserName": user_name,
        "NewUserName": new_name,
        # Previously "/" was always sent and the caller's new_path was silently dropped.
        "NewPath": new_path or "/",
    }
    return self._exec_request(
        self.boto3_iam_client.update_user,
        params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
@reporter.step("Adds one or more tags to an IAM user")
def iam_tag_user(self, user_name: str, tags: list) -> dict:
    """Attach tags to an IAM user.

    Args:
        user_name: Name of the IAM user to tag.
        tags: Iterable of two-item ``(key, value)`` pairs; each pair becomes
            a ``{"Key": ..., "Value": ...}`` entry in the request.

    Returns:
        The result of the ``tag_user`` request.
    """
    # Keep this first: locals() must only contain the method arguments.
    request_params = self._convert_to_s3_params(locals())

    # Replace whatever the converter produced for "Tags" with the Key/Value form.
    tag_entries = [{"Key": key, "Value": value} for key, value in tags]
    request_params["Tags"] = tag_entries

    response = self._exec_request(
        self.boto3_iam_client.tag_user,
        request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
    return response
@reporter.step("List tags of IAM user")
def iam_list_user_tags(self, user_name: str) -> dict:
    """List the tags attached to an IAM user.

    Args:
        user_name: Name of the IAM user to inspect.

    Returns:
        The result of the ``list_user_tags`` request; no checks are applied.
    """
    request_params = {"UserName": user_name}
    response = self._exec_request(
        self.boto3_iam_client.list_user_tags,
        params=request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
    return response
@reporter.step("Removes the specified tags from the user")
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
    """Remove tags from an IAM user.

    Args:
        user_name: Name of the IAM user to untag.
        tag_keys: Keys of the tags to remove.

    Returns:
        The result of the ``untag_user`` request.
    """
    # Keep this first: locals() must only contain the method arguments.
    request_params = self._convert_to_s3_params(locals())
    response = self._exec_request(
        self.boto3_iam_client.untag_user,
        request_params,
        endpoint=self.iam_endpoint,
        profile=self.profile,
    )
    return response
# MFA methods
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(
    self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
    """Create a virtual MFA device and return its serial number and seed.

    Args:
        virtual_mfa_device_name: Name for the new virtual MFA device.
        outfile: Not used by this implementation.
        bootstrap_method: Not used by this implementation.

    Returns:
        A ``(serial_number, base32_string_seed)`` tuple taken from the
        ``VirtualMFADevice`` entry of the response.

    Raises:
        AssertionError: If the serial number or the seed is missing or empty.
    """
    response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)

    device = response.get("VirtualMFADevice", {})
    serial_number = device.get("SerialNumber")
    seed = device.get("Base32StringSeed")

    assert serial_number, f"Expected SerialNumber in response:\n{response}"
    assert seed, f"Expected Base32StringSeed in response:\n{response}"
    return serial_number, seed
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
    """Deactivate an MFA device for an IAM user.

    Args:
        user_name: Name of the IAM user the device is associated with.
        serial_number: Serial number of the MFA device.

    Returns:
        The response of the ``deactivate_mfa_device`` call on the IAM client.
    """
    request = {"UserName": user_name, "SerialNumber": serial_number}
    return self.boto3_iam_client.deactivate_mfa_device(**request)
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
    """Delete a virtual MFA device.

    Args:
        serial_number: Serial number of the virtual MFA device to delete.

    Returns:
        The response of the ``delete_virtual_mfa_device`` call on the IAM client.
    """
    request = {"SerialNumber": serial_number}
    return self.boto3_iam_client.delete_virtual_mfa_device(**request)
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
    """Enable an MFA device and associate it with an IAM user.

    Args:
        user_name: Name of the IAM user to associate the device with.
        serial_number: Serial number of the MFA device.
        authentication_code1: Value sent as ``AuthenticationCode1``.
        authentication_code2: Value sent as ``AuthenticationCode2``.

    Returns:
        The response of the ``enable_mfa_device`` call on the IAM client.
    """
    request = {
        "UserName": user_name,
        "SerialNumber": serial_number,
        "AuthenticationCode1": authentication_code1,
        "AuthenticationCode2": authentication_code2,
    }
    return self.boto3_iam_client.enable_mfa_device(**request)
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
    """List virtual MFA devices.

    Returns:
        The full response of the ``list_virtual_mfa_devices`` call.

    Raises:
        AssertionError: If ``VirtualMFADevices`` is missing or empty in the response.
    """
    response = self.boto3_iam_client.list_virtual_mfa_devices()

    devices = response.get("VirtualMFADevices")
    assert devices, f"Expected VirtualMFADevices in response:\n{response}"
    return response
@reporter.step("Get session token for user")
def sts_get_session_token(
    self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
) -> tuple:
    """Request temporary credentials from STS.

    Only arguments that were actually supplied are sent. Sending the empty
    defaults would make boto3 reject the call during parameter validation
    (``DurationSeconds`` must be an integer, and ``SerialNumber`` /
    ``TokenCode`` have minimum lengths).

    Args:
        duration_seconds: Lifetime of the credentials in seconds; accepted as
            a string or an integer and converted to ``int`` before sending.
        serial_number: Serial number of the MFA device, if MFA is used.
        token_code: Current code produced by the MFA device, if MFA is used.

    Returns:
        An ``(access_key_id, secret_access_key, session_token)`` tuple taken
        from the ``Credentials`` entry of the response.

    Raises:
        AssertionError: If any of the three credential values is missing or empty.
        ValueError: If ``duration_seconds`` is supplied but is not a valid integer.
    """
    request_params = {}
    if duration_seconds:
        request_params["DurationSeconds"] = int(duration_seconds)
    if serial_number:
        request_params["SerialNumber"] = serial_number
    if token_code:
        request_params["TokenCode"] = token_code

    response = self.boto3_sts_client.get_session_token(**request_params)

    credentials = response.get("Credentials", {})
    access_key = credentials.get("AccessKeyId")
    secret_access_key = credentials.get("SecretAccessKey")
    session_token = credentials.get("SessionToken")

    assert access_key, f"Expected AccessKeyId in response:\n{response}"
    assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
    assert session_token, f"Expected SessionToken in response:\n{response}"
    return access_key, secret_access_key, session_token