[#312] Add new Locking test

Signed-off-by: Yulia Kovshova <y.kovshova@yadro.com>
This commit is contained in:
Юлия Ковшова 2022-10-27 19:42:05 +03:00 committed by Julia Kovshova
parent d21a89485b
commit c9e42a7a0a
5 changed files with 360 additions and 8 deletions

View file

@ -348,6 +348,35 @@ class AwsCliClient:
output = _cmd_run(cmd)
return self._to_json(output)
def put_object_retention(
self,
Bucket: str,
Key: str,
Retention: dict,
VersionId: Optional[str] = None,
BypassGovernanceRetention: Optional[bool] = None,
) -> dict:
version = f" --version-id {VersionId}" if VersionId else ""
cmd = (
f"aws {self.common_flags} s3api put-object-retention --bucket {Bucket} --key {Key} "
f"{version} --retention '{json.dumps(Retention, indent=4, sort_keys=True, default=str)}' --endpoint {S3_GATE}"
)
if not BypassGovernanceRetention is None:
cmd += " --bypass-governance-retention"
output = _cmd_run(cmd)
return self._to_json(output)
def put_object_legal_hold(
self, Bucket: str, Key: str, LegalHold: dict, VersionId: Optional[str] = None
) -> dict:
version = f" --version-id {VersionId}" if VersionId else ""
cmd = (
f"aws {self.common_flags} s3api put-object-legal-hold --bucket {Bucket} --key {Key} "
f"{version} --legal-hold '{json.dumps(LegalHold)}' --endpoint {S3_GATE}"
)
output = _cmd_run(cmd)
return self._to_json(output)
def put_object_tagging(self, Bucket: str, Key: str, Tagging: dict) -> dict:
cmd = (
f"aws {self.common_flags} s3api put-object-tagging --bucket {Bucket} --key {Key} "
@ -483,6 +512,22 @@ class AwsCliClient:
output = _cmd_run(cmd)
return self._to_json(output)
def put_object_lock_configuration(self, Bucket, ObjectLockConfiguration):
cmd = (
f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {Bucket} "
f"--object-lock-configuration '{json.dumps(ObjectLockConfiguration)}' --endpoint-url {S3_GATE}"
)
output = _cmd_run(cmd)
return self._to_json(output)
def get_object_lock_configuration(self, Bucket):
cmd = (
f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {Bucket} "
f"--endpoint-url {S3_GATE}"
)
output = _cmd_run(cmd)
return self._to_json(output)
@staticmethod
def _to_json(output: str) -> dict:
json_output = {}