Add AWS retries #113
2 changed files with 17 additions and 56 deletions
|
@ -43,6 +43,6 @@ with open(DEFAULT_WALLET_CONFIG, "w") as file:
|
|||
|
||||
# Number of attempts that S3 clients will attempt per each request (1 means single attempt
|
||||
# without any retries)
|
||||
MAX_REQUEST_ATTEMPTS = 1
|
||||
MAX_REQUEST_ATTEMPTS = 5
|
||||
RETRY_MODE = "standard"
|
||||
CREDENTIALS_CREATE_TIMEOUT = "1m"
|
||||
|
|
|
@ -7,12 +7,7 @@ from time import sleep
|
|||
from typing import Literal, Optional, Union
|
||||
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.resources.common import (
|
||||
ASSETS_DIR,
|
||||
MAX_REQUEST_ATTEMPTS,
|
||||
RETRY_MODE,
|
||||
S3_SYNC_WAIT_TIME,
|
||||
)
|
||||
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
||||
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
||||
from frostfs_testlib.shell import CommandOptions
|
||||
from frostfs_testlib.shell.local_shell import LocalShell
|
||||
|
@ -128,9 +123,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step_deco("Put bucket tagging")
|
||||
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
|
||||
tags_json = {
|
||||
"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
||||
}
|
||||
tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
|
||||
f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}"
|
||||
|
@ -140,8 +133,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
@reporter.step_deco("Get bucket tagging")
|
||||
def get_bucket_tagging(self, bucket: str) -> list:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
@ -149,10 +141,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step_deco("Get bucket acl")
|
||||
def get_bucket_acl(self, bucket: str) -> list:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
return response.get("Grants")
|
||||
|
@ -160,8 +149,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
@reporter.step_deco("Get bucket location")
|
||||
def get_bucket_location(self, bucket: str) -> dict:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
@ -169,10 +157,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step_deco("List objects S3")
|
||||
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-objects --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
|
@ -183,10 +168,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step_deco("List objects S3 v2")
|
||||
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
|
@ -371,10 +353,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
grant_write: Optional[str] = None,
|
||||
grant_read: Optional[str] = None,
|
||||
) -> None:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
|
||||
f" --endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f" --endpoint {self.s3gate_endpoint}"
|
||||
if acl:
|
||||
cmd += f" --acl {acl}"
|
||||
if grant_write:
|
||||
|
@ -442,9 +421,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
|
||||
# Delete objects without creating delete markers
|
||||
for object_version in object_versions:
|
||||
self.delete_object(
|
||||
bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]
|
||||
)
|
||||
self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
|
||||
|
||||
@reporter.step_deco("Get object attributes")
|
||||
def get_object_attributes(
|
||||
|
@ -480,10 +457,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step_deco("Get bucket policy")
|
||||
def get_bucket_policy(self, bucket: str) -> dict:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
return response.get("Policy")
|
||||
|
@ -505,10 +479,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step_deco("Get bucket cors")
|
||||
def get_bucket_cors(self, bucket: str) -> dict:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
return response.get("CORSRules")
|
||||
|
@ -524,8 +495,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
@reporter.step_deco("Delete bucket cors")
|
||||
def delete_bucket_cors(self, bucket: str) -> None:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint}"
|
||||
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||
)
|
||||
self.local_shell.exec(cmd)
|
||||
|
||||
|
@ -608,10 +578,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
acl: Optional[str] = None,
|
||||
metadata: Optional[dict] = None,
|
||||
) -> dict:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint}"
|
||||
)
|
||||
cmd = f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint}"
|
||||
if metadata:
|
||||
cmd += " --metadata"
|
||||
for key, value in metadata.items():
|
||||
|
@ -674,9 +641,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
self.local_shell.exec(cmd)
|
||||
|
||||
@reporter.step_deco("Upload part S3")
|
||||
def upload_part(
|
||||
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
|
||||
) -> str:
|
||||
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
|
||||
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
|
||||
|
@ -688,9 +653,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
return response["ETag"]
|
||||
|
||||
@reporter.step_deco("Upload copy part S3")
|
||||
def upload_part_copy(
|
||||
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
|
||||
) -> str:
|
||||
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
|
||||
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
|
||||
|
@ -698,9 +661,7 @@ class AwsCliClient(S3ClientWrapper):
|
|||
)
|
||||
output = self.local_shell.exec(cmd, command_options).stdout
|
||||
response = self._to_json(output)
|
||||
assert response.get("CopyPartResult", []).get(
|
||||
"ETag"
|
||||
), f"Expected ETag in response:\n{response}"
|
||||
assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}"
|
||||
|
||||
return response["CopyPartResult"]["ETag"]
|
||||
|
||||
|
|
Loading…
Reference in a new issue