All checks were successful
DCO action / DCO (pull_request) Successful in 1m22s
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
611 lines
23 KiB
Python
611 lines
23 KiB
Python
from abc import ABC, abstractmethod
|
|
from datetime import datetime
|
|
from typing import Literal, Optional, Union
|
|
|
|
from frostfs_testlib.storage.cluster import ClusterNode
|
|
from frostfs_testlib.testing.readable import HumanReadableABC, HumanReadableEnum
|
|
from frostfs_testlib.utils.file_utils import TestFile
|
|
|
|
|
|
def _make_objs_dict(key_names):
|
|
objs_list = []
|
|
for key in key_names:
|
|
obj_dict = {"Key": key}
|
|
objs_list.append(obj_dict)
|
|
objs_dict = {"Objects": objs_list}
|
|
return objs_dict
|
|
|
|
|
|
class VersioningStatus(HumanReadableEnum):
|
|
UNDEFINED = None
|
|
ENABLED = "Enabled"
|
|
SUSPENDED = "Suspended"
|
|
|
|
|
|
ACL_COPY = [
|
|
"private",
|
|
"public-read",
|
|
"public-read-write",
|
|
"authenticated-read",
|
|
"aws-exec-read",
|
|
"bucket-owner-read",
|
|
"bucket-owner-full-control",
|
|
]
|
|
|
|
|
|
class BucketContainerResolver(ABC):
|
|
@abstractmethod
|
|
def resolve(self, node: ClusterNode, bucket_name: str, **kwargs: dict) -> str:
|
|
"""
|
|
Resolve Container ID from bucket name
|
|
|
|
Args:
|
|
node: node from where we want to resolve
|
|
bucket_name: name of the bucket
|
|
**kwargs: any other required params
|
|
|
|
Returns: Container ID
|
|
"""
|
|
raise NotImplementedError("Call from abstract class")
|
|
|
|
|
|
class S3ClientWrapper(HumanReadableABC):
|
|
@abstractmethod
|
|
def __init__(self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str, region: str) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def set_endpoint(self, s3gate_endpoint: str):
|
|
"""Set endpoint"""
|
|
|
|
@abstractmethod
|
|
def set_iam_endpoint(self, iam_endpoint: str):
|
|
"""Set iam endpoint"""
|
|
|
|
@abstractmethod
|
|
def create_bucket(
|
|
self,
|
|
bucket: Optional[str] = None,
|
|
object_lock_enabled_for_bucket: Optional[bool] = None,
|
|
acl: Optional[str] = None,
|
|
grant_write: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
grant_full_control: Optional[str] = None,
|
|
location_constraint: Optional[str] = None,
|
|
) -> str:
|
|
"""Create a bucket."""
|
|
|
|
# BUCKET METHODS #
|
|
|
|
@abstractmethod
|
|
def list_buckets(self) -> list[str]:
|
|
"""List buckets."""
|
|
|
|
@abstractmethod
|
|
def delete_bucket(self, bucket: str) -> None:
|
|
"""Delete bucket"""
|
|
|
|
@abstractmethod
|
|
def head_bucket(self, bucket: str) -> None:
|
|
"""This action is useful to determine if a bucket exists and you have permission to access it.
|
|
The action returns a 200 OK if the bucket exists and you have permission to access it.
|
|
|
|
If the bucket does not exist or you do not have permission to access it, the HEAD request
|
|
returns a generic 400 Bad Request, 403 Forbidden or 404 Not Found code.
|
|
A message body is not included, so you cannot determine the exception beyond these error codes.
|
|
"""
|
|
|
|
@abstractmethod
|
|
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
|
|
"""Sets the versioning state of an existing bucket.
|
|
|
|
You can set the versioning state with one of the following values:
|
|
|
|
Enabled—Enables versioning for the objects in the bucket. All objects added to the bucket receive a unique version ID.
|
|
|
|
Suspended—Disables versioning for the objects in the bucket. All objects added to the bucket receive the version ID null.
|
|
|
|
If the versioning state has never been set on a bucket, it has no versioning state
|
|
"""
|
|
|
|
@abstractmethod
|
|
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
|
|
"""Returns the versioning state of a bucket.
|
|
|
|
To retrieve the versioning state of a bucket, you must be the bucket owner.
|
|
"""
|
|
|
|
@abstractmethod
|
|
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
|
|
"""Sets the tags for a bucket."""
|
|
|
|
@abstractmethod
|
|
def get_bucket_tagging(self, bucket: str) -> list:
|
|
"""Returns the tag set associated with the Outposts bucket."""
|
|
|
|
@abstractmethod
|
|
def delete_bucket_tagging(self, bucket: str) -> None:
|
|
"""Deletes the tags from the bucket."""
|
|
|
|
@abstractmethod
|
|
def get_bucket_acl(self, bucket: str) -> dict:
|
|
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
|
|
|
|
@abstractmethod
|
|
def put_bucket_acl(
|
|
self,
|
|
bucket: str,
|
|
acl: Optional[str] = None,
|
|
grant_write: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
) -> list:
|
|
"""Sets the permissions on an existing bucket using access control lists (ACL)."""
|
|
|
|
@abstractmethod
|
|
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
|
|
"""Places an Object Lock configuration on the specified bucket.
|
|
The rule specified in the Object Lock configuration will be applied by
|
|
default to every new object placed in the specified bucket."""
|
|
|
|
@abstractmethod
|
|
def get_object_lock_configuration(self, bucket: str) -> dict:
|
|
"""Gets the Object Lock configuration for a bucket.
|
|
The rule specified in the Object Lock configuration will be applied by
|
|
default to every new object placed in the specified bucket."""
|
|
|
|
@abstractmethod
|
|
def get_bucket_policy(self, bucket: str) -> str:
|
|
"""Returns the policy of a specified bucket."""
|
|
|
|
@abstractmethod
|
|
def delete_bucket_policy(self, bucket: str) -> str:
|
|
"""Deletes the policy of a specified bucket."""
|
|
|
|
@abstractmethod
|
|
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
|
|
"""Applies S3 bucket policy to an S3 bucket."""
|
|
|
|
@abstractmethod
|
|
def get_bucket_cors(self, bucket: str) -> dict:
|
|
"""Returns the Cross-Origin Resource Sharing (CORS) configuration information set for the bucket."""
|
|
|
|
@abstractmethod
|
|
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
|
|
"""Sets the cors configuration for your bucket. If the configuration exists, S3 replaces it."""
|
|
|
|
@abstractmethod
|
|
def delete_bucket_cors(self, bucket: str) -> None:
|
|
"""Deletes the cors configuration information set for the bucket."""
|
|
|
|
@abstractmethod
|
|
def get_bucket_location(self, bucket: str) -> str:
|
|
"""Returns the LocationConstraint the bucket resides in. You can set the it
|
|
using the LocationConstraint request parameter in a CreateBucket request."""
|
|
|
|
# END OF BUCKET METHODS #
|
|
|
|
# OBJECT METHODS #
|
|
|
|
@abstractmethod
|
|
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
|
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
|
|
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
|
|
A 200 OK response can contain valid or invalid XML. Make sure to design your application
|
|
to parse the contents of the response and handle it appropriately.
|
|
"""
|
|
|
|
@abstractmethod
|
|
def list_objects(
|
|
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
|
|
) -> Union[dict, list[str]]:
|
|
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
|
|
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
|
|
A 200 OK response can contain valid or invalid XML. Make sure to design your application
|
|
to parse the contents of the response and handle it appropriately.
|
|
"""
|
|
|
|
@abstractmethod
|
|
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
|
|
"""Returns metadata about all versions of the objects in a bucket."""
|
|
|
|
@abstractmethod
|
|
def list_delete_markers(self, bucket: str, full_output: bool = False) -> dict:
|
|
"""Returns metadata about all delete markers of the objects in a bucket."""
|
|
|
|
@abstractmethod
|
|
def put_object(
|
|
self,
|
|
bucket: str,
|
|
filepath: str,
|
|
key: Optional[str] = None,
|
|
metadata: Optional[dict] = None,
|
|
tagging: Optional[str] = None,
|
|
acl: Optional[str] = None,
|
|
object_lock_mode: Optional[str] = None,
|
|
object_lock_retain_until_date: Optional[datetime] = None,
|
|
object_lock_legal_hold_status: Optional[str] = None,
|
|
grant_full_control: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
) -> str:
|
|
"""Adds an object to a bucket."""
|
|
|
|
@abstractmethod
|
|
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
|
|
"""The HEAD action retrieves metadata from an object without returning the object itself.
|
|
This action is useful if you're only interested in an object's metadata."""
|
|
|
|
@abstractmethod
|
|
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
|
|
"""Removes the null version (if there is one) of an object and inserts a delete marker,
|
|
which becomes the latest version of the object. If there isn't a null version,
|
|
S3 does not remove any objects but will still respond that the command was successful."""
|
|
|
|
@abstractmethod
|
|
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
|
|
"""This action enables you to delete multiple objects from a bucket
|
|
using a single HTTP request. If you know the object keys that
|
|
you want to delete, then this action provides a suitable alternative
|
|
to sending individual delete requests, reducing per-request overhead.
|
|
|
|
The request contains a list of up to 1000 keys that you want to delete."""
|
|
|
|
@abstractmethod
|
|
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
|
|
"""Delete object versions"""
|
|
|
|
@abstractmethod
|
|
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
|
|
"""Delete object versions without delete markers"""
|
|
|
|
@abstractmethod
|
|
def put_object_acl(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
acl: Optional[str] = None,
|
|
grant_write: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
) -> list:
|
|
"""Uses the acl subresource to set the access control
|
|
list (ACL) permissions for a new or existing object in an S3 bucket."""
|
|
|
|
@abstractmethod
|
|
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
|
|
"""Returns the access control list (ACL) of an object."""
|
|
|
|
@abstractmethod
|
|
def copy_object(
|
|
self,
|
|
source_bucket: str,
|
|
source_key: str,
|
|
bucket: Optional[str] = None,
|
|
key: Optional[str] = None,
|
|
acl: Optional[str] = None,
|
|
metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None,
|
|
metadata: Optional[dict] = None,
|
|
tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None,
|
|
tagging: Optional[str] = None,
|
|
) -> str:
|
|
"""Creates a copy of an object"""
|
|
|
|
@abstractmethod
|
|
def get_object(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
version_id: Optional[str] = None,
|
|
object_range: Optional[tuple[int, int]] = None,
|
|
full_output: bool = False,
|
|
) -> dict | TestFile:
|
|
"""Retrieves objects from S3."""
|
|
|
|
@abstractmethod
|
|
def create_multipart_upload(self, bucket: str, key: str) -> str:
|
|
"""This action initiates a multipart upload and returns an upload ID.
|
|
This upload ID is used to associate all of the parts in the specific multipart upload.
|
|
You specify this upload ID in each of your subsequent upload part requests (see UploadPart).
|
|
You also include this upload ID in the final request to either complete or abort the multipart upload request."""
|
|
|
|
@abstractmethod
|
|
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
|
|
"""This action lists in-progress multipart uploads.
|
|
An in-progress multipart upload is a multipart upload that has been initiated
|
|
using the Initiate Multipart Upload request, but has not yet been completed or aborted.
|
|
|
|
This action returns at most 1,000 multipart uploads in the response."""
|
|
|
|
@abstractmethod
|
|
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
|
|
"""This action aborts a multipart upload. After a multipart upload is aborted,
|
|
no additional parts can be uploaded using that upload ID.
|
|
The storage consumed by any previously uploaded parts will be freed.
|
|
However, if any part uploads are currently in progress, those part
|
|
uploads might or might not succeed. As a result, it might be necessary to
|
|
abort a given multipart upload multiple times in order to completely free all storage consumed by all parts."""
|
|
|
|
@abstractmethod
|
|
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
|
|
"""Uploads a part in a multipart upload."""
|
|
|
|
@abstractmethod
|
|
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
|
|
"""Uploads a part by copying data from an existing object as data source."""
|
|
|
|
@abstractmethod
|
|
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
|
|
"""Lists the parts that have been uploaded for a specific multipart upload."""
|
|
|
|
@abstractmethod
|
|
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
|
|
"""Completes a multipart upload by assembling previously uploaded parts."""
|
|
|
|
@abstractmethod
|
|
def put_object_retention(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
retention: dict,
|
|
version_id: Optional[str] = None,
|
|
bypass_governance_retention: Optional[bool] = None,
|
|
) -> None:
|
|
"""Places an Object Retention configuration on an object."""
|
|
|
|
@abstractmethod
|
|
def put_object_legal_hold(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
legal_hold_status: Literal["ON", "OFF"],
|
|
version_id: Optional[str] = None,
|
|
) -> None:
|
|
"""Applies a legal hold configuration to the specified object."""
|
|
|
|
@abstractmethod
|
|
def put_object_tagging(self, bucket: str, key: str, tags: list) -> None:
|
|
"""Sets the tag-set for an object."""
|
|
|
|
@abstractmethod
|
|
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
|
|
"""Returns the tag-set of an object."""
|
|
|
|
@abstractmethod
|
|
def delete_object_tagging(self, bucket: str, key: str) -> None:
|
|
"""Removes the entire tag set from the specified object."""
|
|
|
|
@abstractmethod
|
|
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
|
|
"""Adds or updates bucket lifecycle configuration"""
|
|
|
|
@abstractmethod
|
|
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
|
|
"""Gets bucket lifecycle configuration"""
|
|
|
|
@abstractmethod
|
|
def delete_bucket_lifecycle(self, bucket: str) -> dict:
|
|
"""Deletes bucket lifecycle"""
|
|
|
|
@abstractmethod
|
|
def get_object_attributes(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
attributes: list[str],
|
|
version_id: str = "",
|
|
max_parts: int = 0,
|
|
part_number: int = 0,
|
|
full_output: bool = True,
|
|
) -> dict:
|
|
"""Retrieves all the metadata from an object without returning the object itself."""
|
|
|
|
@abstractmethod
|
|
def sync(
|
|
self,
|
|
bucket: str,
|
|
dir_path: str,
|
|
acl: Optional[str] = None,
|
|
metadata: Optional[dict] = None,
|
|
) -> dict:
|
|
"""sync directory TODO: Add proper description"""
|
|
|
|
@abstractmethod
|
|
def cp(
|
|
self,
|
|
bucket: str,
|
|
dir_path: str,
|
|
acl: Optional[str] = None,
|
|
metadata: Optional[dict] = None,
|
|
) -> dict:
|
|
"""cp directory TODO: Add proper description"""
|
|
|
|
# END OF OBJECT METHODS #
|
|
|
|
# IAM METHODS #
|
|
|
|
@abstractmethod
|
|
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
|
|
"""Adds the specified user to the specified group"""
|
|
|
|
@abstractmethod
|
|
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
|
|
"""Attaches the specified managed policy to the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
|
|
"""Attaches the specified managed policy to the specified user"""
|
|
|
|
@abstractmethod
|
|
def iam_create_access_key(self, user_name: str) -> dict:
|
|
"""Creates a new AWS secret access key and access key ID for the specified user"""
|
|
|
|
@abstractmethod
|
|
def iam_create_group(self, group_name: str) -> dict:
|
|
"""Creates a new group"""
|
|
|
|
@abstractmethod
|
|
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
|
|
"""Creates a new managed policy for your AWS account"""
|
|
|
|
@abstractmethod
|
|
def iam_create_user(self, user_name: str) -> dict:
|
|
"""Creates a new IAM user for your AWS account"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
|
|
"""Deletes the access key pair associated with the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_group(self, group_name: str) -> dict:
|
|
"""Deletes the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
|
|
"""Deletes the specified inline policy that is embedded in the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_policy(self, policy_arn: str) -> dict:
|
|
"""Deletes the specified managed policy"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_user(self, user_name: str) -> dict:
|
|
"""Deletes the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
|
|
"""Deletes the specified inline policy that is embedded in the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
|
|
"""Removes the specified managed policy from the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
|
|
"""Removes the specified managed policy from the specified user"""
|
|
|
|
@abstractmethod
|
|
def iam_get_group(self, group_name: str) -> dict:
|
|
"""Returns a list of IAM users that are in the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
|
|
"""Retrieves the specified inline policy document that is embedded in the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_get_policy(self, policy_arn: str) -> dict:
|
|
"""Retrieves information about the specified managed policy"""
|
|
|
|
@abstractmethod
|
|
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
|
|
"""Retrieves information about the specified version of the specified managed policy"""
|
|
|
|
@abstractmethod
|
|
def iam_get_user(self, user_name: str) -> dict:
|
|
"""Retrieves information about the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
|
|
"""Retrieves the specified inline policy document that is embedded in the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_list_access_keys(self, user_name: str) -> dict:
|
|
"""Returns information about the access key IDs associated with the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_list_attached_group_policies(self, group_name: str) -> dict:
|
|
"""Lists all managed policies that are attached to the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_list_attached_user_policies(self, user_name: str) -> dict:
|
|
"""Lists all managed policies that are attached to the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
|
|
"""Lists all IAM users, groups, and roles that the specified managed policy is attached to"""
|
|
|
|
@abstractmethod
|
|
def iam_list_group_policies(self, group_name: str) -> dict:
|
|
"""Lists the names of the inline policies that are embedded in the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_list_groups(self) -> dict:
|
|
"""Lists the IAM groups"""
|
|
|
|
@abstractmethod
|
|
def iam_list_groups_for_user(self, user_name: str) -> dict:
|
|
"""Lists the IAM groups that the specified IAM user belongs to"""
|
|
|
|
@abstractmethod
|
|
def iam_list_policies(self) -> dict:
|
|
"""Lists all the managed policies that are available in your AWS account"""
|
|
|
|
@abstractmethod
|
|
def iam_list_policy_versions(self, policy_arn: str) -> dict:
|
|
"""Lists information about the versions of the specified managed policy"""
|
|
|
|
@abstractmethod
|
|
def iam_list_user_policies(self, user_name: str) -> dict:
|
|
"""Lists the names of the inline policies embedded in the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_list_users(self) -> dict:
|
|
"""Lists the IAM users"""
|
|
|
|
@abstractmethod
|
|
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
|
|
"""Adds or updates an inline policy document that is embedded in the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
|
|
"""Adds or updates an inline policy document that is embedded in the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
|
|
"""Removes the specified user from the specified group"""
|
|
|
|
@abstractmethod
|
|
def iam_update_group(self, group_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
|
|
"""Updates the name and/or the path of the specified IAM group"""
|
|
|
|
@abstractmethod
|
|
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
|
|
"""Updates the name and/or the path of the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_tag_user(self, user_name: str, tags: list) -> dict:
|
|
"""Adds one or more tags to an IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_list_user_tags(self, user_name: str) -> dict:
|
|
"""List tags of IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
|
"""Removes the specified tags from the user"""
|
|
|
|
# MFA methods
|
|
@abstractmethod
|
|
def iam_create_virtual_mfa_device(
|
|
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
|
) -> tuple:
|
|
"""Creates a new virtual MFA device"""
|
|
|
|
@abstractmethod
|
|
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
|
"""Deactivates the specified MFA device and removes it from association with the user name"""
|
|
|
|
@abstractmethod
|
|
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
|
"""Deletes a virtual MFA device"""
|
|
|
|
@abstractmethod
|
|
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
|
"""Enables the specified MFA device and associates it with the specified IAM user"""
|
|
|
|
@abstractmethod
|
|
def iam_list_virtual_mfa_devices(self) -> dict:
|
|
"""Lists the MFA devices for an IAM user"""
|
|
|
|
@abstractmethod
|
|
def sts_get_session_token(
|
|
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
|
|
) -> tuple:
|
|
"""Get session token for user"""
|