Add AWS retries

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2023-11-08 19:49:20 +03:00
parent 5687b79b38
commit 44e7c7eb29
2 changed files with 17 additions and 56 deletions

View file

@ -43,6 +43,6 @@ with open(DEFAULT_WALLET_CONFIG, "w") as file:
# Number of attempts that S3 clients will attempt per each request (1 means single attempt # Number of attempts that S3 clients will attempt per each request (1 means single attempt
# without any retries) # without any retries)
MAX_REQUEST_ATTEMPTS = 1 MAX_REQUEST_ATTEMPTS = 5
RETRY_MODE = "standard" RETRY_MODE = "standard"
CREDENTIALS_CREATE_TIMEOUT = "1m" CREDENTIALS_CREATE_TIMEOUT = "1m"

View file

@ -7,12 +7,7 @@ from time import sleep
from typing import Literal, Optional, Union from typing import Literal, Optional, Union
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.common import ( from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
ASSETS_DIR,
MAX_REQUEST_ATTEMPTS,
RETRY_MODE,
S3_SYNC_WAIT_TIME,
)
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
@ -128,9 +123,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Put bucket tagging") @reporter.step_deco("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None: def put_bucket_tagging(self, bucket: str, tags: list) -> None:
tags_json = { tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
}
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}" f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}"
@ -140,8 +133,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket tagging") @reporter.step_deco("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list: def get_bucket_tagging(self, bucket: str) -> list:
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"--endpoint {self.s3gate_endpoint}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -149,10 +141,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket acl") @reporter.step_deco("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> list:
cmd = ( cmd = f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
return response.get("Grants") return response.get("Grants")
@ -160,8 +149,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket location") @reporter.step_deco("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict: def get_bucket_location(self, bucket: str) -> dict:
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"--endpoint {self.s3gate_endpoint}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -169,10 +157,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("List objects S3") @reporter.step_deco("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = ( cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api list-objects --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -183,10 +168,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("List objects S3 v2") @reporter.step_deco("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = ( cmd = f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -371,10 +353,7 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None, grant_write: Optional[str] = None,
grant_read: Optional[str] = None, grant_read: Optional[str] = None,
) -> None: ) -> None:
cmd = ( cmd = f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f" --endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
f" --endpoint {self.s3gate_endpoint}"
)
if acl: if acl:
cmd += f" --acl {acl}" cmd += f" --acl {acl}"
if grant_write: if grant_write:
@ -442,9 +421,7 @@ class AwsCliClient(S3ClientWrapper):
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None: def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
# Delete objects without creating delete markers # Delete objects without creating delete markers
for object_version in object_versions: for object_version in object_versions:
self.delete_object( self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]
)
@reporter.step_deco("Get object attributes") @reporter.step_deco("Get object attributes")
def get_object_attributes( def get_object_attributes(
@ -480,10 +457,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket policy") @reporter.step_deco("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> dict: def get_bucket_policy(self, bucket: str) -> dict:
cmd = ( cmd = f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
return response.get("Policy") return response.get("Policy")
@ -505,10 +479,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Get bucket cors") @reporter.step_deco("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict: def get_bucket_cors(self, bucket: str) -> dict:
cmd = ( cmd = f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint}"
)
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
return response.get("CORSRules") return response.get("CORSRules")
@ -524,8 +495,7 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step_deco("Delete bucket cors") @reporter.step_deco("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None: def delete_bucket_cors(self, bucket: str) -> None:
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
f"--endpoint {self.s3gate_endpoint}"
) )
self.local_shell.exec(cmd) self.local_shell.exec(cmd)
@ -608,10 +578,7 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None, acl: Optional[str] = None,
metadata: Optional[dict] = None, metadata: Optional[dict] = None,
) -> dict: ) -> dict:
cmd = ( cmd = f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint}"
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} "
f"--endpoint-url {self.s3gate_endpoint}"
)
if metadata: if metadata:
cmd += " --metadata" cmd += " --metadata"
for key, value in metadata.items(): for key, value in metadata.items():
@ -674,9 +641,7 @@ class AwsCliClient(S3ClientWrapper):
self.local_shell.exec(cmd) self.local_shell.exec(cmd)
@reporter.step_deco("Upload part S3") @reporter.step_deco("Upload part S3")
def upload_part( def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
) -> str:
cmd = ( cmd = (
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} " f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
@ -688,9 +653,7 @@ class AwsCliClient(S3ClientWrapper):
return response["ETag"] return response["ETag"]
@reporter.step_deco("Upload copy part S3") @reporter.step_deco("Upload copy part S3")
def upload_part_copy( def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
) -> str:
cmd = ( cmd = (
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} " f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
@ -698,9 +661,7 @@ class AwsCliClient(S3ClientWrapper):
) )
output = self.local_shell.exec(cmd, command_options).stdout output = self.local_shell.exec(cmd, command_options).stdout
response = self._to_json(output) response = self._to_json(output)
assert response.get("CopyPartResult", []).get( assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}"
"ETag"
), f"Expected ETag in response:\n{response}"
return response["CopyPartResult"]["ETag"] return response["CopyPartResult"]["ETag"]