frostfs-testlib/src/frostfs_testlib/s3/interfaces.py

384 lines
14 KiB
Python

from abc import abstractmethod
from datetime import datetime
from typing import Literal, Optional, Union
from frostfs_testlib.testing.readable import HumanReadableABC, HumanReadableEnum
def _make_objs_dict(key_names):
objs_list = []
for key in key_names:
obj_dict = {"Key": key}
objs_list.append(obj_dict)
objs_dict = {"Objects": objs_list}
return objs_dict
class VersioningStatus(HumanReadableEnum):
UNDEFINED = None
ENABLED = "Enabled"
SUSPENDED = "Suspended"
ACL_COPY = [
"private",
"public-read",
"public-read-write",
"authenticated-read",
"aws-exec-read",
"bucket-owner-read",
"bucket-owner-full-control",
]
class S3ClientWrapper(HumanReadableABC):
@abstractmethod
def __init__(self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str) -> None:
pass
@abstractmethod
def set_endpoint(self, s3gate_endpoint: str):
"""Set endpoint"""
@abstractmethod
def create_bucket(
self,
bucket: Optional[str] = None,
object_lock_enabled_for_bucket: Optional[bool] = None,
acl: Optional[str] = None,
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
grant_full_control: Optional[str] = None,
location_constraint: Optional[str] = None,
) -> str:
"""Create a bucket."""
# BUCKET METHODS #
@abstractmethod
def list_buckets(self) -> list[str]:
"""List buckets."""
@abstractmethod
def delete_bucket(self, bucket: str) -> None:
"""Delete bucket"""
@abstractmethod
def head_bucket(self, bucket: str) -> None:
"""This action is useful to determine if a bucket exists and you have permission to access it.
The action returns a 200 OK if the bucket exists and you have permission to access it.
If the bucket does not exist or you do not have permission to access it, the HEAD request
returns a generic 400 Bad Request, 403 Forbidden or 404 Not Found code.
A message body is not included, so you cannot determine the exception beyond these error codes.
"""
@abstractmethod
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
"""Sets the versioning state of an existing bucket.
You can set the versioning state with one of the following values:
Enabled—Enables versioning for the objects in the bucket. All objects added to the bucket receive a unique version ID.
Suspended—Disables versioning for the objects in the bucket. All objects added to the bucket receive the version ID null.
If the versioning state has never been set on a bucket, it has no versioning state
"""
@abstractmethod
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
"""Returns the versioning state of a bucket.
To retrieve the versioning state of a bucket, you must be the bucket owner.
"""
@abstractmethod
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
"""Sets the tags for a bucket."""
@abstractmethod
def get_bucket_tagging(self, bucket: str) -> list:
"""Returns the tag set associated with the Outposts bucket."""
@abstractmethod
def delete_bucket_tagging(self, bucket: str) -> None:
"""Deletes the tags from the bucket."""
@abstractmethod
def get_bucket_acl(self, bucket: str) -> list:
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
@abstractmethod
def put_bucket_acl(
self,
bucket: str,
acl: Optional[str] = None,
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> list:
"""Sets the permissions on an existing bucket using access control lists (ACL)."""
@abstractmethod
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
"""Places an Object Lock configuration on the specified bucket.
The rule specified in the Object Lock configuration will be applied by
default to every new object placed in the specified bucket."""
@abstractmethod
def get_object_lock_configuration(self, bucket: str) -> dict:
"""Gets the Object Lock configuration for a bucket.
The rule specified in the Object Lock configuration will be applied by
default to every new object placed in the specified bucket."""
@abstractmethod
def get_bucket_policy(self, bucket: str) -> str:
"""Returns the policy of a specified bucket."""
@abstractmethod
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
"""Applies S3 bucket policy to an S3 bucket."""
@abstractmethod
def get_bucket_cors(self, bucket: str) -> dict:
"""Returns the Cross-Origin Resource Sharing (CORS) configuration information set for the bucket."""
@abstractmethod
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
"""Sets the cors configuration for your bucket. If the configuration exists, S3 replaces it."""
@abstractmethod
def delete_bucket_cors(self, bucket: str) -> None:
"""Deletes the cors configuration information set for the bucket."""
@abstractmethod
def get_bucket_location(self, bucket: str) -> str:
"""Returns the LocationConstraint the bucket resides in. You can set the it
using the LocationConstraint request parameter in a CreateBucket request."""
# END OF BUCKET METHODS #
# OBJECT METHODS #
@abstractmethod
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
A 200 OK response can contain valid or invalid XML. Make sure to design your application
to parse the contents of the response and handle it appropriately.
"""
@abstractmethod
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
A 200 OK response can contain valid or invalid XML. Make sure to design your application
to parse the contents of the response and handle it appropriately.
"""
@abstractmethod
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
"""Returns metadata about all versions of the objects in a bucket."""
@abstractmethod
def list_delete_markers(self, bucket: str, full_output: bool = False) -> dict:
"""Returns metadata about all delete markers of the objects in a bucket."""
@abstractmethod
def put_object(
self,
bucket: str,
filepath: str,
key: Optional[str] = None,
metadata: Optional[dict] = None,
tagging: Optional[str] = None,
acl: Optional[str] = None,
object_lock_mode: Optional[str] = None,
object_lock_retain_until_date: Optional[datetime] = None,
object_lock_legal_hold_status: Optional[str] = None,
grant_full_control: Optional[str] = None,
grant_read: Optional[str] = None,
) -> str:
"""Adds an object to a bucket."""
@abstractmethod
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
"""The HEAD action retrieves metadata from an object without returning the object itself.
This action is useful if you're only interested in an object's metadata."""
@abstractmethod
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
"""Removes the null version (if there is one) of an object and inserts a delete marker,
which becomes the latest version of the object. If there isn't a null version,
S3 does not remove any objects but will still respond that the command was successful."""
@abstractmethod
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
"""This action enables you to delete multiple objects from a bucket
using a single HTTP request. If you know the object keys that
you want to delete, then this action provides a suitable alternative
to sending individual delete requests, reducing per-request overhead.
The request contains a list of up to 1000 keys that you want to delete."""
@abstractmethod
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
"""Delete object versions"""
@abstractmethod
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
"""Delete object versions without delete markers"""
@abstractmethod
def put_object_acl(
self,
bucket: str,
key: str,
acl: Optional[str] = None,
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> list:
"""Uses the acl subresource to set the access control
list (ACL) permissions for a new or existing object in an S3 bucket."""
@abstractmethod
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
"""Returns the access control list (ACL) of an object."""
@abstractmethod
def copy_object(
self,
source_bucket: str,
source_key: str,
bucket: Optional[str] = None,
key: Optional[str] = None,
acl: Optional[str] = None,
metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None,
metadata: Optional[dict] = None,
tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None,
tagging: Optional[str] = None,
) -> str:
"""Creates a copy of an object"""
@abstractmethod
def get_object(
self,
bucket: str,
key: str,
version_id: Optional[str] = None,
object_range: Optional[tuple[int, int]] = None,
full_output: bool = False,
) -> Union[dict, str]:
"""Retrieves objects from S3."""
@abstractmethod
def create_multipart_upload(self, bucket: str, key: str) -> str:
"""This action initiates a multipart upload and returns an upload ID.
This upload ID is used to associate all of the parts in the specific multipart upload.
You specify this upload ID in each of your subsequent upload part requests (see UploadPart).
You also include this upload ID in the final request to either complete or abort the multipart upload request."""
@abstractmethod
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
"""This action lists in-progress multipart uploads.
An in-progress multipart upload is a multipart upload that has been initiated
using the Initiate Multipart Upload request, but has not yet been completed or aborted.
This action returns at most 1,000 multipart uploads in the response."""
@abstractmethod
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
"""This action aborts a multipart upload. After a multipart upload is aborted,
no additional parts can be uploaded using that upload ID.
The storage consumed by any previously uploaded parts will be freed.
However, if any part uploads are currently in progress, those part
uploads might or might not succeed. As a result, it might be necessary to
abort a given multipart upload multiple times in order to completely free all storage consumed by all parts."""
@abstractmethod
def upload_part(
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
) -> str:
"""Uploads a part in a multipart upload."""
@abstractmethod
def upload_part_copy(
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
) -> str:
"""Uploads a part by copying data from an existing object as data source."""
@abstractmethod
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
"""Lists the parts that have been uploaded for a specific multipart upload."""
@abstractmethod
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
"""Completes a multipart upload by assembling previously uploaded parts."""
@abstractmethod
def put_object_retention(
self,
bucket: str,
key: str,
retention: dict,
version_id: Optional[str] = None,
bypass_governance_retention: Optional[bool] = None,
) -> None:
"""Places an Object Retention configuration on an object."""
@abstractmethod
def put_object_legal_hold(
self,
bucket: str,
key: str,
legal_hold_status: Literal["ON", "OFF"],
version_id: Optional[str] = None,
) -> None:
"""Applies a legal hold configuration to the specified object."""
@abstractmethod
def put_object_tagging(self, bucket: str, key: str, tags: list) -> None:
"""Sets the tag-set for an object."""
@abstractmethod
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
"""Returns the tag-set of an object."""
@abstractmethod
def delete_object_tagging(self, bucket: str, key: str) -> None:
"""Removes the entire tag set from the specified object."""
@abstractmethod
def get_object_attributes(
self,
bucket: str,
key: str,
attributes: list[str],
version_id: str = "",
max_parts: int = 0,
part_number: int = 0,
full_output: bool = True,
) -> dict:
"""Retrieves all the metadata from an object without returning the object itself."""
@abstractmethod
def sync(
self,
bucket: str,
dir_path: str,
acl: Optional[str] = None,
metadata: Optional[dict] = None,
) -> dict:
"""sync directory TODO: Add proper description"""
@abstractmethod
def cp(
self,
bucket: str,
dir_path: str,
acl: Optional[str] = None,
metadata: Optional[dict] = None,
) -> dict:
"""cp directory TODO: Add proper description"""
# END OF OBJECT METHODS #