[#312] add new ACL test to s3

Signed-off-by: Yulia Kovshova <y.kovshova@yadro.com>
This commit is contained in:
Юлия Ковшова 2022-09-23 15:09:41 +04:00 committed by Julia Kovshova
parent c71d24ea76
commit 987df42542
5 changed files with 241 additions and 4 deletions

View file

@ -28,7 +28,12 @@ class AwsCliClient:
logger.info(f"Executing command: {cmd}")
_configure_aws_cli(cmd, self.access_key_id, self.secret_access_key)
def create_bucket(self, Bucket: str, ObjectLockEnabledForBucket: bool = None):
def create_bucket(
self,
Bucket: str,
ObjectLockEnabledForBucket: Optional[bool] = None,
ACL: Optional[str] = None,
):
if ObjectLockEnabledForBucket is None:
object_lock = ""
elif ObjectLockEnabledForBucket:
@ -39,6 +44,8 @@ class AwsCliClient:
f"aws {self.common_flags} s3api create-bucket --bucket {Bucket} "
f"{object_lock} --endpoint {S3_GATE}"
)
if ACL:
cmd += f" --acl {ACL}"
_cmd_run(cmd, REGULAR_TIMEOUT)
def list_buckets(self) -> dict:
@ -46,6 +53,14 @@ class AwsCliClient:
output = _cmd_run(cmd)
return self._to_json(output)
def get_bucket_acl(self, Bucket: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api get-bucket-acl --bucket {Bucket} "
f"--endpoint {S3_GATE}"
)
output = _cmd_run(cmd, REGULAR_TIMEOUT)
return self._to_json(output)
def get_bucket_versioning(self, Bucket: str) -> dict:
cmd = (
f"aws {self.common_flags} s3api get-bucket-versioning --bucket {Bucket} "
@ -196,6 +211,47 @@ class AwsCliClient:
output = _cmd_run(cmd, REGULAR_TIMEOUT)
return self._to_json(output)
def put_object_acl(
self,
Bucket: str,
Key: str,
ACL: Optional[str] = None,
GrantWrite: Optional[str] = None,
GrantRead: Optional[str] = None,
) -> dict:
cmd = (
f"aws {self.common_flags} s3api put-object-acl --bucket {Bucket} --key {Key} "
f" --endpoint {S3_GATE}"
)
if ACL:
cmd += f" --acl {ACL}"
if GrantWrite:
cmd += f" --grant-write {GrantWrite}"
if GrantRead:
cmd += f" --grant-read {GrantRead}"
output = _cmd_run(cmd, REGULAR_TIMEOUT)
return self._to_json(output)
def put_bucket_acl(
self,
Bucket: str,
ACL: Optional[str] = None,
GrantWrite: Optional[str] = None,
GrantRead: Optional[str] = None,
) -> dict:
cmd = (
f"aws {self.common_flags} s3api put-bucket-acl --bucket {Bucket} "
f" --endpoint {S3_GATE}"
)
if ACL:
cmd += f" --acl {ACL}"
if GrantWrite:
cmd += f" --grant-write {GrantWrite}"
if GrantRead:
cmd += f" --grant-read {GrantRead}"
output = _cmd_run(cmd, REGULAR_TIMEOUT)
return self._to_json(output)
def delete_objects(self, Bucket: str, Delete: dict) -> dict:
file_path = f"{os.getcwd()}/{ASSETS_DIR}/delete.json"
with open(file_path, "w") as out_file: