Added interfaces for put/get lifecycle configuration to s3 clients

This commit is contained in:
Yaroslava Lukoyanova 2024-09-23 17:54:40 +03:00
parent 4a2ac8a9b6
commit a814ab3e74
3 changed files with 42 additions and 0 deletions

View file

@ -754,6 +754,26 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output) response = self._to_json(output)
return response.get("ObjectLockConfiguration") return response.get("ObjectLockConfiguration")
@reporter.step("Put bucket lifecycle configuration")
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Get bucket lifecycle configuration")
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@staticmethod @staticmethod
def _to_json(output: str) -> dict: def _to_json(output: str) -> dict:
json_output = {} json_output = {}

View file

@ -296,6 +296,20 @@ class Boto3ClientWrapper(S3ClientWrapper):
response = self.boto3_client.delete_bucket_cors(Bucket=bucket) response = self.boto3_client.delete_bucket_cors(Bucket=bucket)
log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_cors result", response, {"Bucket": bucket}) log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_cors result", response, {"Bucket": bucket})
@reporter.step("Put bucket lifecycle configuration")
@report_error
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
response = self.boto3_client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle_configuration)
log_command_execution(self.s3gate_endpoint, "S3 put_bucket_lifecycle_configuration result", response, {"Bucket": bucket})
return response
@reporter.step("Get bucket lifecycle configuration")
@report_error
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
response = self.boto3_client.get_bucket_lifecycle_configuration(Bucket=bucket)
log_command_execution(self.s3gate_endpoint, "S3 get_bucket_lifecycle_configuration result", response, {"Bucket": bucket})
return response
# END OF BUCKET METHODS # # END OF BUCKET METHODS #
# OBJECT METHODS # # OBJECT METHODS #

View file

@ -366,6 +366,14 @@ class S3ClientWrapper(HumanReadableABC):
def delete_object_tagging(self, bucket: str, key: str) -> None: def delete_object_tagging(self, bucket: str, key: str) -> None:
"""Removes the entire tag set from the specified object.""" """Removes the entire tag set from the specified object."""
@abstractmethod
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
"""Adds or updates bucket lifecycle configuration"""
@abstractmethod
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
"""Gets bucket lifecycle configuration"""
@abstractmethod @abstractmethod
def get_object_attributes( def get_object_attributes(
self, self,