xk6-frostfs/scenarios/preset/helpers/aws_cli.py
a.berezin 9b9db46a07
All checks were successful
DCO action / DCO (pull_request) Successful in 43s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m10s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m21s
Tests and linters / Tests with -race (pull_request) Successful in 3m34s
Tests and linters / Lint (pull_request) Successful in 2m56s
[#152] Allow to set mix of policies for containers and buckets
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-07-02 20:45:31 +03:00

54 lines
2 KiB
Python

import uuid
from helpers.cmd import execute_cmd, log
def create_bucket(endpoint, versioning, location, acl, no_verify_ssl):
    """Create an S3 bucket with a random UUID name through the ``aws`` CLI.

    Args:
        endpoint: Value passed to ``--endpoint``; also forwarded to ``log``.
        versioning: Bucket versioning is enabled only when this equals the
            string ``"True"``.
        location: Optional location constraint. A falsy value omits
            ``--create-bucket-configuration`` entirely.
        acl: Optional value for ``--acl`` on bucket creation. A falsy value
            omits the option instead of leaking ``None`` into the command.
        no_verify_ssl: When truthy, ``--no-verify-ssl`` is added to every
            command.

    Returns:
        The generated bucket name, or ``False`` when the create command
        fails. A failure to enable versioning is logged but does not change
        the return value.
    """
    bucket_name = str(uuid.uuid4())
    aws = "aws --no-verify-ssl" if no_verify_ssl else "aws"

    # Assemble the command from the options that are actually set, so an
    # absent location or ACL leaves neither stray whitespace nor a literal
    # "None" token on the command line.
    create_parts = [
        aws,
        "s3api create-bucket",
        f"--bucket {bucket_name}",
        f"--endpoint {endpoint}",
    ]
    if location:
        create_parts.append(
            f"--create-bucket-configuration 'LocationConstraint={location}'"
        )
    if acl:
        create_parts.append(f"--acl {acl}")
    cmd_line = " ".join(create_parts)

    output, success = execute_cmd(cmd_line)

    # A failure whose output says the bucket is already owned is tolerated.
    if not success and "succeeded and you already own it" not in output:
        log(f"{cmd_line}\n"
            f"Bucket {bucket_name} has not been created:\n"
            f"Error: {output}", endpoint)
        return False

    if versioning == "True":
        # put-bucket-versioning has no --acl option, so the ACL is
        # deliberately not forwarded to this command.
        cmd_line_ver = " ".join([
            aws,
            "s3api put-bucket-versioning",
            f"--bucket {bucket_name}",
            "--versioning-configuration Status=Enabled",
            f"--endpoint {endpoint}",
        ])
        output, success = execute_cmd(cmd_line_ver)
        if not success:
            log(f"{cmd_line_ver}\n"
                f"Bucket versioning has not been applied for bucket {bucket_name}\n"
                f"Error: {output}", endpoint)
        else:
            log(f"Bucket versioning has been applied for bucket {bucket_name}", endpoint)

    log(f"Created bucket: {bucket_name} ({location})", endpoint)
    return bucket_name
def upload_object(bucket, payload_filepath, endpoint, no_verify_ssl):
    """Upload a file to ``bucket`` under a random UUID key via the ``aws`` CLI.

    Args:
        bucket: Name of the target bucket.
        payload_filepath: Path passed to ``--body``.
        endpoint: Value passed to ``--endpoint``; also forwarded to ``log``.
        no_verify_ssl: When truthy, ``--no-verify-ssl`` is added to the command.

    Returns:
        A ``(bucket, endpoint, object_name)`` tuple on success, or ``False``
        when the command fails (the failure is logged).
    """
    key = str(uuid.uuid4())

    ssl_flag = ""
    if no_verify_ssl:
        ssl_flag = "--no-verify-ssl"

    command = (
        f"aws {ssl_flag} s3api put-object --bucket {bucket} --key {key} "
        f"--body {payload_filepath} --endpoint {endpoint}"
    )
    output, success = execute_cmd(command)

    # Success path first; everything below handles the failed upload.
    if success:
        return bucket, endpoint, key

    log(
        f"{command}\n"
        f"Object {key} has not been uploaded\n"
        f"Error: {output}",
        endpoint,
    )
    return False