forked from TrueCloudLab/frostfs-testlib
675 lines
25 KiB
Python
675 lines
25 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from time import sleep
|
|
from typing import Literal, Optional, Union
|
|
|
|
import boto3
|
|
import urllib3
|
|
from botocore.config import Config
|
|
from botocore.exceptions import ClientError
|
|
from mypy_boto3_s3 import S3Client
|
|
|
|
from frostfs_testlib.reporter import get_reporter
|
|
from frostfs_testlib.resources.common import (
|
|
ASSETS_DIR,
|
|
MAX_REQUEST_ATTEMPTS,
|
|
RETRY_MODE,
|
|
S3_SYNC_WAIT_TIME,
|
|
)
|
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
|
from frostfs_testlib.utils.cli_utils import log_command_execution
|
|
|
|
reporter = get_reporter()
|
|
logger = logging.getLogger("NeoLogger")
|
|
|
|
# Disable warnings on self-signed certificate which the
|
|
# boto library produces on requests to S3-gate in dev-env
|
|
urllib3.disable_warnings()
|
|
|
|
|
|
def report_error(func):
|
|
@wraps(func)
|
|
def deco(*a, **kw):
|
|
try:
|
|
return func(*a, **kw)
|
|
except ClientError as err:
|
|
log_command_execution("Result", str(err))
|
|
raise
|
|
|
|
return deco
|
|
|
|
|
|
class Boto3ClientWrapper(S3ClientWrapper):
|
|
__repr_name__: str = "Boto3 client"
|
|
|
|
@reporter.step_deco("Configure S3 client (boto3)")
|
|
@report_error
|
|
def __init__(self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str) -> None:
|
|
self.boto3_client: S3Client = None
|
|
self.session = boto3.Session()
|
|
self.config = Config(
|
|
retries={
|
|
"max_attempts": MAX_REQUEST_ATTEMPTS,
|
|
"mode": RETRY_MODE,
|
|
}
|
|
)
|
|
self.access_key_id: str = access_key_id
|
|
self.secret_access_key: str = secret_access_key
|
|
self.s3gate_endpoint: str = ""
|
|
self.set_endpoint(s3gate_endpoint)
|
|
|
|
@reporter.step_deco("Set endpoint S3 to {s3gate_endpoint}")
|
|
def set_endpoint(self, s3gate_endpoint: str):
|
|
if self.s3gate_endpoint == s3gate_endpoint:
|
|
return
|
|
|
|
self.s3gate_endpoint = s3gate_endpoint
|
|
|
|
self.boto3_client: S3Client = self.session.client(
|
|
service_name="s3",
|
|
aws_access_key_id=self.access_key_id,
|
|
aws_secret_access_key=self.secret_access_key,
|
|
config=self.config,
|
|
endpoint_url=s3gate_endpoint,
|
|
verify=False,
|
|
)
|
|
|
|
def _to_s3_param(self, param: str):
|
|
replacement_map = {
|
|
"Acl": "ACL",
|
|
"Cors": "CORS",
|
|
"_": "",
|
|
}
|
|
result = param.title()
|
|
for find, replace in replacement_map.items():
|
|
result = result.replace(find, replace)
|
|
return result
|
|
|
|
# BUCKET METHODS #
|
|
@reporter.step_deco("Create bucket S3")
|
|
@report_error
|
|
def create_bucket(
|
|
self,
|
|
bucket: Optional[str] = None,
|
|
object_lock_enabled_for_bucket: Optional[bool] = None,
|
|
acl: Optional[str] = None,
|
|
grant_write: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
grant_full_control: Optional[str] = None,
|
|
location_constraint: Optional[str] = None,
|
|
) -> str:
|
|
if bucket is None:
|
|
bucket = str(uuid.uuid4())
|
|
|
|
params = {"Bucket": bucket}
|
|
if object_lock_enabled_for_bucket is not None:
|
|
params.update({"ObjectLockEnabledForBucket": object_lock_enabled_for_bucket})
|
|
if acl is not None:
|
|
params.update({"ACL": acl})
|
|
elif grant_write or grant_read or grant_full_control:
|
|
if grant_write:
|
|
params.update({"GrantWrite": grant_write})
|
|
elif grant_read:
|
|
params.update({"GrantRead": grant_read})
|
|
elif grant_full_control:
|
|
params.update({"GrantFullControl": grant_full_control})
|
|
if location_constraint:
|
|
params.update(
|
|
{"CreateBucketConfiguration": {"LocationConstraint": location_constraint}}
|
|
)
|
|
|
|
s3_bucket = self.boto3_client.create_bucket(**params)
|
|
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
|
|
sleep(S3_SYNC_WAIT_TIME)
|
|
return bucket
|
|
|
|
@reporter.step_deco("List buckets S3")
|
|
@report_error
|
|
def list_buckets(self) -> list[str]:
|
|
found_buckets = []
|
|
|
|
response = self.boto3_client.list_buckets()
|
|
log_command_execution("S3 List buckets result", response)
|
|
|
|
for bucket in response["Buckets"]:
|
|
found_buckets.append(bucket["Name"])
|
|
|
|
return found_buckets
|
|
|
|
@reporter.step_deco("Delete bucket S3")
|
|
@report_error
|
|
def delete_bucket(self, bucket: str) -> None:
|
|
response = self.boto3_client.delete_bucket(Bucket=bucket)
|
|
log_command_execution("S3 Delete bucket result", response)
|
|
sleep(S3_SYNC_WAIT_TIME)
|
|
|
|
@reporter.step_deco("Head bucket S3")
|
|
@report_error
|
|
def head_bucket(self, bucket: str) -> None:
|
|
response = self.boto3_client.head_bucket(Bucket=bucket)
|
|
log_command_execution("S3 Head bucket result", response)
|
|
|
|
@reporter.step_deco("Put bucket versioning status")
|
|
@report_error
|
|
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
|
|
response = self.boto3_client.put_bucket_versioning(
|
|
Bucket=bucket, VersioningConfiguration={"Status": status.value}
|
|
)
|
|
log_command_execution("S3 Set bucket versioning to", response)
|
|
|
|
@reporter.step_deco("Get bucket versioning status")
|
|
@report_error
|
|
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
|
|
response = self.boto3_client.get_bucket_versioning(Bucket=bucket)
|
|
status = response.get("Status")
|
|
log_command_execution("S3 Got bucket versioning status", response)
|
|
return status
|
|
|
|
@reporter.step_deco("Put bucket tagging")
|
|
@report_error
|
|
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
|
|
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
|
tagging = {"TagSet": tags}
|
|
response = self.boto3_client.put_bucket_tagging(Bucket=bucket, Tagging=tagging)
|
|
log_command_execution("S3 Put bucket tagging", response)
|
|
|
|
@reporter.step_deco("Get bucket tagging")
|
|
@report_error
|
|
def get_bucket_tagging(self, bucket: str) -> list:
|
|
response = self.boto3_client.get_bucket_tagging(Bucket=bucket)
|
|
log_command_execution("S3 Get bucket tagging", response)
|
|
return response.get("TagSet")
|
|
|
|
@reporter.step_deco("Get bucket acl")
|
|
@report_error
|
|
def get_bucket_acl(self, bucket: str) -> list:
|
|
response = self.boto3_client.get_bucket_acl(Bucket=bucket)
|
|
log_command_execution("S3 Get bucket acl", response)
|
|
return response.get("Grants")
|
|
|
|
@reporter.step_deco("Delete bucket tagging")
|
|
@report_error
|
|
def delete_bucket_tagging(self, bucket: str) -> None:
|
|
response = self.boto3_client.delete_bucket_tagging(Bucket=bucket)
|
|
log_command_execution("S3 Delete bucket tagging", response)
|
|
|
|
@reporter.step_deco("Put bucket ACL")
|
|
@report_error
|
|
def put_bucket_acl(
|
|
self,
|
|
bucket: str,
|
|
acl: Optional[str] = None,
|
|
grant_write: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
) -> None:
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self"] and value is not None
|
|
}
|
|
response = self.boto3_client.put_bucket_acl(**params)
|
|
log_command_execution("S3 ACL bucket result", response)
|
|
|
|
@reporter.step_deco("Put object lock configuration")
|
|
@report_error
|
|
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
|
|
response = self.boto3_client.put_object_lock_configuration(
|
|
Bucket=bucket, ObjectLockConfiguration=configuration
|
|
)
|
|
log_command_execution("S3 put_object_lock_configuration result", response)
|
|
return response
|
|
|
|
@reporter.step_deco("Get object lock configuration")
|
|
@report_error
|
|
def get_object_lock_configuration(self, bucket: str) -> dict:
|
|
response = self.boto3_client.get_object_lock_configuration(Bucket=bucket)
|
|
log_command_execution("S3 get_object_lock_configuration result", response)
|
|
return response.get("ObjectLockConfiguration")
|
|
|
|
@reporter.step_deco("Get bucket policy")
|
|
@report_error
|
|
def get_bucket_policy(self, bucket: str) -> str:
|
|
response = self.boto3_client.get_bucket_policy(Bucket=bucket)
|
|
log_command_execution("S3 get_bucket_policy result", response)
|
|
return response.get("Policy")
|
|
|
|
@reporter.step_deco("Put bucket policy")
|
|
@report_error
|
|
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
|
|
response = self.boto3_client.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
|
|
log_command_execution("S3 put_bucket_policy result", response)
|
|
return response
|
|
|
|
@reporter.step_deco("Get bucket cors")
|
|
@report_error
|
|
def get_bucket_cors(self, bucket: str) -> dict:
|
|
response = self.boto3_client.get_bucket_cors(Bucket=bucket)
|
|
log_command_execution("S3 get_bucket_cors result", response)
|
|
return response.get("CORSRules")
|
|
|
|
@reporter.step_deco("Get bucket location")
|
|
@report_error
|
|
def get_bucket_location(self, bucket: str) -> str:
|
|
response = self.boto3_client.get_bucket_location(Bucket=bucket)
|
|
log_command_execution("S3 get_bucket_location result", response)
|
|
return response.get("LocationConstraint")
|
|
|
|
@reporter.step_deco("Put bucket cors")
|
|
@report_error
|
|
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
|
|
response = self.boto3_client.put_bucket_cors(
|
|
Bucket=bucket, CORSConfiguration=cors_configuration
|
|
)
|
|
log_command_execution("S3 put_bucket_cors result", response)
|
|
return response
|
|
|
|
@reporter.step_deco("Delete bucket cors")
|
|
@report_error
|
|
def delete_bucket_cors(self, bucket: str) -> None:
|
|
response = self.boto3_client.delete_bucket_cors(Bucket=bucket)
|
|
log_command_execution("S3 delete_bucket_cors result", response)
|
|
|
|
# END OF BUCKET METHODS #
|
|
# OBJECT METHODS #
|
|
|
|
@reporter.step_deco("List objects S3 v2")
|
|
@report_error
|
|
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
|
response = self.boto3_client.list_objects_v2(Bucket=bucket)
|
|
log_command_execution("S3 v2 List objects result", response)
|
|
|
|
obj_list = [obj["Key"] for obj in response.get("Contents", [])]
|
|
logger.info(f"Found s3 objects: {obj_list}")
|
|
|
|
return response if full_output else obj_list
|
|
|
|
@reporter.step_deco("List objects S3")
|
|
@report_error
|
|
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
|
response = self.boto3_client.list_objects(Bucket=bucket)
|
|
log_command_execution("S3 List objects result", response)
|
|
|
|
obj_list = [obj["Key"] for obj in response.get("Contents", [])]
|
|
logger.info(f"Found s3 objects: {obj_list}")
|
|
|
|
return response if full_output else obj_list
|
|
|
|
@reporter.step_deco("List objects versions S3")
|
|
@report_error
|
|
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
|
|
response = self.boto3_client.list_object_versions(Bucket=bucket)
|
|
log_command_execution("S3 List objects versions result", response)
|
|
return response if full_output else response.get("Versions", [])
|
|
|
|
@reporter.step_deco("List objects delete markers S3")
|
|
@report_error
|
|
def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
|
|
response = self.boto3_client.list_object_versions(Bucket=bucket)
|
|
log_command_execution("S3 List objects delete markers result", response)
|
|
return response if full_output else response.get("DeleteMarkers", [])
|
|
|
|
@reporter.step_deco("Put object S3")
|
|
@report_error
|
|
def put_object(
|
|
self,
|
|
bucket: str,
|
|
filepath: str,
|
|
key: Optional[str] = None,
|
|
metadata: Optional[dict] = None,
|
|
tagging: Optional[str] = None,
|
|
acl: Optional[str] = None,
|
|
object_lock_mode: Optional[str] = None,
|
|
object_lock_retain_until_date: Optional[datetime] = None,
|
|
object_lock_legal_hold_status: Optional[str] = None,
|
|
grant_full_control: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
) -> str:
|
|
if key is None:
|
|
key = os.path.basename(filepath)
|
|
|
|
with open(filepath, "rb") as put_file:
|
|
body = put_file.read()
|
|
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self", "filepath", "put_file"] and value is not None
|
|
}
|
|
response = self.boto3_client.put_object(**params)
|
|
log_command_execution("S3 Put object result", response)
|
|
return response.get("VersionId")
|
|
|
|
@reporter.step_deco("Head object S3")
|
|
@report_error
|
|
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self"] and value is not None
|
|
}
|
|
response = self.boto3_client.head_object(**params)
|
|
log_command_execution("S3 Head object result", response)
|
|
return response
|
|
|
|
@reporter.step_deco("Delete object S3")
|
|
@report_error
|
|
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self"] and value is not None
|
|
}
|
|
response = self.boto3_client.delete_object(**params)
|
|
log_command_execution("S3 Delete object result", response)
|
|
sleep(S3_SYNC_WAIT_TIME)
|
|
return response
|
|
|
|
@reporter.step_deco("Delete objects S3")
|
|
@report_error
|
|
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
|
|
response = self.boto3_client.delete_objects(Bucket=bucket, Delete=_make_objs_dict(keys))
|
|
log_command_execution("S3 Delete objects result", response)
|
|
assert (
|
|
"Errors" not in response
|
|
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
|
|
sleep(S3_SYNC_WAIT_TIME)
|
|
return response
|
|
|
|
@reporter.step_deco("Delete object versions S3")
|
|
@report_error
|
|
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
|
|
# Build deletion list in S3 format
|
|
delete_list = {
|
|
"Objects": [
|
|
{
|
|
"Key": object_version["Key"],
|
|
"VersionId": object_version["VersionId"],
|
|
}
|
|
for object_version in object_versions
|
|
]
|
|
}
|
|
response = self.boto3_client.delete_objects(Bucket=bucket, Delete=delete_list)
|
|
log_command_execution("S3 Delete objects result", response)
|
|
return response
|
|
|
|
@reporter.step_deco("Delete object versions S3 without delete markers")
|
|
@report_error
|
|
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
|
|
# Delete objects without creating delete markers
|
|
for object_version in object_versions:
|
|
response = self.boto3_client.delete_object(
|
|
Bucket=bucket, Key=object_version["Key"], VersionId=object_version["VersionId"]
|
|
)
|
|
log_command_execution("S3 Delete object result", response)
|
|
|
|
@reporter.step_deco("Put object ACL")
|
|
@report_error
|
|
def put_object_acl(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
acl: Optional[str] = None,
|
|
grant_write: Optional[str] = None,
|
|
grant_read: Optional[str] = None,
|
|
) -> list:
|
|
# pytest.skip("Method put_object_acl is not supported by boto3 client")
|
|
raise NotImplementedError("Unsupported for boto3 client")
|
|
|
|
@reporter.step_deco("Get object ACL")
|
|
@report_error
|
|
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self"] and value is not None
|
|
}
|
|
response = self.boto3_client.get_object_acl(**params)
|
|
log_command_execution("S3 ACL objects result", response)
|
|
return response.get("Grants")
|
|
|
|
@reporter.step_deco("Copy object S3")
|
|
@report_error
|
|
def copy_object(
|
|
self,
|
|
source_bucket: str,
|
|
source_key: str,
|
|
bucket: Optional[str] = None,
|
|
key: Optional[str] = None,
|
|
acl: Optional[str] = None,
|
|
metadata_directive: Optional[Literal["COPY", "REPLACE"]] = None,
|
|
metadata: Optional[dict] = None,
|
|
tagging_directive: Optional[Literal["COPY", "REPLACE"]] = None,
|
|
tagging: Optional[str] = None,
|
|
) -> str:
|
|
if bucket is None:
|
|
bucket = source_bucket
|
|
if key is None:
|
|
key = os.path.join(os.getcwd(), str(uuid.uuid4()))
|
|
copy_source = f"{source_bucket}/{source_key}"
|
|
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self", "source_bucket", "source_key"] and value is not None
|
|
}
|
|
response = self.boto3_client.copy_object(**params)
|
|
log_command_execution("S3 Copy objects result", response)
|
|
return key
|
|
|
|
@reporter.step_deco("Get object S3")
|
|
@report_error
|
|
def get_object(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
version_id: Optional[str] = None,
|
|
object_range: Optional[tuple[int, int]] = None,
|
|
full_output: bool = False,
|
|
) -> Union[dict, str]:
|
|
filename = os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4()))
|
|
range_str = None
|
|
if object_range:
|
|
range_str = f"bytes={object_range[0]}-{object_range[1]}"
|
|
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in {**locals(), **{"Range": range_str}}.items()
|
|
if param not in ["self", "object_range", "full_output", "range_str", "filename"]
|
|
and value is not None
|
|
}
|
|
response = self.boto3_client.get_object(**params)
|
|
log_command_execution("S3 Get objects result", response)
|
|
|
|
with open(f"{filename}", "wb") as get_file:
|
|
chunk = response["Body"].read(1024)
|
|
while chunk:
|
|
get_file.write(chunk)
|
|
chunk = response["Body"].read(1024)
|
|
return response if full_output else filename
|
|
|
|
@reporter.step_deco("Create multipart upload S3")
|
|
@report_error
|
|
def create_multipart_upload(self, bucket: str, key: str) -> str:
|
|
response = self.boto3_client.create_multipart_upload(Bucket=bucket, Key=key)
|
|
log_command_execution("S3 Created multipart upload", response)
|
|
assert response.get("UploadId"), f"Expected UploadId in response:\n{response}"
|
|
|
|
return response["UploadId"]
|
|
|
|
@reporter.step_deco("List multipart uploads S3")
|
|
@report_error
|
|
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
|
|
response = self.boto3_client.list_multipart_uploads(Bucket=bucket)
|
|
log_command_execution("S3 List multipart upload", response)
|
|
|
|
return response.get("Uploads")
|
|
|
|
@reporter.step_deco("Abort multipart upload S3")
|
|
@report_error
|
|
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
|
|
response = self.boto3_client.abort_multipart_upload(
|
|
Bucket=bucket, Key=key, UploadId=upload_id
|
|
)
|
|
log_command_execution("S3 Abort multipart upload", response)
|
|
|
|
@reporter.step_deco("Upload part S3")
|
|
@report_error
|
|
def upload_part(
|
|
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
|
|
) -> str:
|
|
with open(filepath, "rb") as put_file:
|
|
body = put_file.read()
|
|
|
|
response = self.boto3_client.upload_part(
|
|
UploadId=upload_id,
|
|
Bucket=bucket,
|
|
Key=key,
|
|
PartNumber=part_num,
|
|
Body=body,
|
|
)
|
|
log_command_execution("S3 Upload part", response)
|
|
assert response.get("ETag"), f"Expected ETag in response:\n{response}"
|
|
|
|
return response["ETag"]
|
|
|
|
@reporter.step_deco("Upload copy part S3")
|
|
@report_error
|
|
def upload_part_copy(
|
|
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
|
|
) -> str:
|
|
response = self.boto3_client.upload_part_copy(
|
|
UploadId=upload_id,
|
|
Bucket=bucket,
|
|
Key=key,
|
|
PartNumber=part_num,
|
|
CopySource=copy_source,
|
|
)
|
|
log_command_execution("S3 Upload copy part", response)
|
|
assert response.get("CopyPartResult", []).get(
|
|
"ETag"
|
|
), f"Expected ETag in response:\n{response}"
|
|
|
|
return response["CopyPartResult"]["ETag"]
|
|
|
|
@reporter.step_deco("List parts S3")
|
|
@report_error
|
|
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
|
|
response = self.boto3_client.list_parts(UploadId=upload_id, Bucket=bucket, Key=key)
|
|
log_command_execution("S3 List part", response)
|
|
assert response.get("Parts"), f"Expected Parts in response:\n{response}"
|
|
|
|
return response["Parts"]
|
|
|
|
@reporter.step_deco("Complete multipart upload S3")
|
|
@report_error
|
|
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
|
|
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
|
|
response = self.boto3_client.complete_multipart_upload(
|
|
Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts}
|
|
)
|
|
log_command_execution("S3 Complete multipart upload", response)
|
|
|
|
@reporter.step_deco("Put object retention")
|
|
@report_error
|
|
def put_object_retention(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
retention: dict,
|
|
version_id: Optional[str] = None,
|
|
bypass_governance_retention: Optional[bool] = None,
|
|
) -> None:
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self"] and value is not None
|
|
}
|
|
response = self.boto3_client.put_object_retention(**params)
|
|
log_command_execution("S3 Put object retention ", response)
|
|
|
|
@reporter.step_deco("Put object legal hold")
|
|
@report_error
|
|
def put_object_legal_hold(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
legal_hold_status: Literal["ON", "OFF"],
|
|
version_id: Optional[str] = None,
|
|
) -> None:
|
|
legal_hold = {"Status": legal_hold_status}
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self", "legal_hold_status"] and value is not None
|
|
}
|
|
response = self.boto3_client.put_object_legal_hold(**params)
|
|
log_command_execution("S3 Put object legal hold ", response)
|
|
|
|
@reporter.step_deco("Put object tagging")
|
|
@report_error
|
|
def put_object_tagging(self, bucket: str, key: str, tags: list) -> None:
|
|
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
|
tagging = {"TagSet": tags}
|
|
response = self.boto3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging)
|
|
log_command_execution("S3 Put object tagging", response)
|
|
|
|
@reporter.step_deco("Get object tagging")
|
|
@report_error
|
|
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
|
|
params = {
|
|
self._to_s3_param(param): value
|
|
for param, value in locals().items()
|
|
if param not in ["self"] and value is not None
|
|
}
|
|
response = self.boto3_client.get_object_tagging(**params)
|
|
log_command_execution("S3 Get object tagging", response)
|
|
return response.get("TagSet")
|
|
|
|
@reporter.step_deco("Delete object tagging")
|
|
@report_error
|
|
def delete_object_tagging(self, bucket: str, key: str) -> None:
|
|
response = self.boto3_client.delete_object_tagging(Bucket=bucket, Key=key)
|
|
log_command_execution("S3 Delete object tagging", response)
|
|
|
|
@reporter.step_deco("Get object attributes")
|
|
@report_error
|
|
def get_object_attributes(
|
|
self,
|
|
bucket: str,
|
|
key: str,
|
|
attributes: list[str],
|
|
version_id: Optional[str] = None,
|
|
max_parts: Optional[int] = None,
|
|
part_number: Optional[int] = None,
|
|
full_output: bool = True,
|
|
) -> dict:
|
|
logger.warning("Method get_object_attributes is not supported by boto3 client")
|
|
return {}
|
|
|
|
@reporter.step_deco("Sync directory S3")
|
|
@report_error
|
|
def sync(
|
|
self,
|
|
bucket: str,
|
|
dir_path: str,
|
|
acl: Optional[str] = None,
|
|
metadata: Optional[dict] = None,
|
|
) -> dict:
|
|
raise NotImplementedError("Sync is not supported for boto3 client")
|
|
|
|
@reporter.step_deco("CP directory S3")
|
|
@report_error
|
|
def cp(
|
|
self,
|
|
bucket: str,
|
|
dir_path: str,
|
|
acl: Optional[str] = None,
|
|
metadata: Optional[dict] = None,
|
|
) -> dict:
|
|
raise NotImplementedError("Cp is not supported for boto3 client")
|
|
|
|
# END OBJECT METHODS #
|