Add AWS retries #113
2 changed files with 17 additions and 56 deletions
|
@ -43,6 +43,6 @@ with open(DEFAULT_WALLET_CONFIG, "w") as file:
|
||||||
|
|
||||||
# Number of attempts that S3 clients will attempt per each request (1 means single attempt
|
# Number of attempts that S3 clients will attempt per each request (1 means single attempt
|
||||||
# without any retries)
|
# without any retries)
|
||||||
MAX_REQUEST_ATTEMPTS = 1
|
MAX_REQUEST_ATTEMPTS = 5
|
||||||
RETRY_MODE = "standard"
|
RETRY_MODE = "standard"
|
||||||
CREDENTIALS_CREATE_TIMEOUT = "1m"
|
CREDENTIALS_CREATE_TIMEOUT = "1m"
|
||||||
|
|
|
@ -7,12 +7,7 @@ from time import sleep
|
||||||
from typing import Literal, Optional, Union
|
from typing import Literal, Optional, Union
|
||||||
|
|
||||||
from frostfs_testlib.reporter import get_reporter
|
from frostfs_testlib.reporter import get_reporter
|
||||||
from frostfs_testlib.resources.common import (
|
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
||||||
ASSETS_DIR,
|
|
||||||
MAX_REQUEST_ATTEMPTS,
|
|
||||||
RETRY_MODE,
|
|
||||||
S3_SYNC_WAIT_TIME,
|
|
||||||
)
|
|
||||||
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
||||||
from frostfs_testlib.shell import CommandOptions
|
from frostfs_testlib.shell import CommandOptions
|
||||||
from frostfs_testlib.shell.local_shell import LocalShell
|
from frostfs_testlib.shell.local_shell import LocalShell
|
||||||
|
@ -128,9 +123,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step_deco("Put bucket tagging")
|
@reporter.step_deco("Put bucket tagging")
|
||||||
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
|
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
|
||||||
tags_json = {
|
tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
|
||||||
"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
|
||||||
}
|
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
|
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
|
||||||
f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}"
|
f"--tagging '{json.dumps(tags_json)}' --endpoint {self.s3gate_endpoint}"
|
||||||
|
@ -140,8 +133,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
@reporter.step_deco("Get bucket tagging")
|
@reporter.step_deco("Get bucket tagging")
|
||||||
def get_bucket_tagging(self, bucket: str) -> list:
|
def get_bucket_tagging(self, bucket: str) -> list:
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
|
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
@ -149,10 +141,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step_deco("Get bucket acl")
|
@reporter.step_deco("Get bucket acl")
|
||||||
def get_bucket_acl(self, bucket: str) -> list:
|
def get_bucket_acl(self, bucket: str) -> list:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} "
|
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
return response.get("Grants")
|
return response.get("Grants")
|
||||||
|
@ -160,8 +149,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
@reporter.step_deco("Get bucket location")
|
@reporter.step_deco("Get bucket location")
|
||||||
def get_bucket_location(self, bucket: str) -> dict:
|
def get_bucket_location(self, bucket: str) -> dict:
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
|
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
@ -169,10 +157,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step_deco("List objects S3")
|
@reporter.step_deco("List objects S3")
|
||||||
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3api list-objects --bucket {bucket} "
|
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
|
||||||
|
@ -183,10 +168,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step_deco("List objects S3 v2")
|
@reporter.step_deco("List objects S3 v2")
|
||||||
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
|
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
|
||||||
|
@ -371,10 +353,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
grant_write: Optional[str] = None,
|
grant_write: Optional[str] = None,
|
||||||
grant_read: Optional[str] = None,
|
grant_read: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f" --endpoint {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
|
|
||||||
f" --endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
if acl:
|
if acl:
|
||||||
cmd += f" --acl {acl}"
|
cmd += f" --acl {acl}"
|
||||||
if grant_write:
|
if grant_write:
|
||||||
|
@ -442,9 +421,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
|
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
|
||||||
# Delete objects without creating delete markers
|
# Delete objects without creating delete markers
|
||||||
for object_version in object_versions:
|
for object_version in object_versions:
|
||||||
self.delete_object(
|
self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
|
||||||
bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]
|
|
||||||
)
|
|
||||||
|
|
||||||
@reporter.step_deco("Get object attributes")
|
@reporter.step_deco("Get object attributes")
|
||||||
def get_object_attributes(
|
def get_object_attributes(
|
||||||
|
@ -480,10 +457,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step_deco("Get bucket policy")
|
@reporter.step_deco("Get bucket policy")
|
||||||
def get_bucket_policy(self, bucket: str) -> dict:
|
def get_bucket_policy(self, bucket: str) -> dict:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
|
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
return response.get("Policy")
|
return response.get("Policy")
|
||||||
|
@ -505,10 +479,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step_deco("Get bucket cors")
|
@reporter.step_deco("Get bucket cors")
|
||||||
def get_bucket_cors(self, bucket: str) -> dict:
|
def get_bucket_cors(self, bucket: str) -> dict:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
|
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
return response.get("CORSRules")
|
return response.get("CORSRules")
|
||||||
|
@ -524,8 +495,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
@reporter.step_deco("Delete bucket cors")
|
@reporter.step_deco("Delete bucket cors")
|
||||||
def delete_bucket_cors(self, bucket: str) -> None:
|
def delete_bucket_cors(self, bucket: str) -> None:
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
|
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"--endpoint {self.s3gate_endpoint}"
|
||||||
f"--endpoint {self.s3gate_endpoint}"
|
|
||||||
)
|
)
|
||||||
self.local_shell.exec(cmd)
|
self.local_shell.exec(cmd)
|
||||||
|
|
||||||
|
@ -608,10 +578,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
acl: Optional[str] = None,
|
acl: Optional[str] = None,
|
||||||
metadata: Optional[dict] = None,
|
metadata: Optional[dict] = None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
cmd = (
|
cmd = f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint}"
|
||||||
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} "
|
|
||||||
f"--endpoint-url {self.s3gate_endpoint}"
|
|
||||||
)
|
|
||||||
if metadata:
|
if metadata:
|
||||||
cmd += " --metadata"
|
cmd += " --metadata"
|
||||||
for key, value in metadata.items():
|
for key, value in metadata.items():
|
||||||
|
@ -674,9 +641,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
self.local_shell.exec(cmd)
|
self.local_shell.exec(cmd)
|
||||||
|
|
||||||
@reporter.step_deco("Upload part S3")
|
@reporter.step_deco("Upload part S3")
|
||||||
def upload_part(
|
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
|
||||||
self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str
|
|
||||||
) -> str:
|
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
|
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
|
||||||
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
|
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
|
||||||
|
@ -688,9 +653,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
return response["ETag"]
|
return response["ETag"]
|
||||||
|
|
||||||
@reporter.step_deco("Upload copy part S3")
|
@reporter.step_deco("Upload copy part S3")
|
||||||
def upload_part_copy(
|
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
|
||||||
self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str
|
|
||||||
) -> str:
|
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
|
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
|
||||||
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
|
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
|
||||||
|
@ -698,9 +661,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd, command_options).stdout
|
output = self.local_shell.exec(cmd, command_options).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
assert response.get("CopyPartResult", []).get(
|
assert response.get("CopyPartResult", []).get("ETag"), f"Expected ETag in response:\n{response}"
|
||||||
"ETag"
|
|
||||||
), f"Expected ETag in response:\n{response}"
|
|
||||||
|
|
||||||
return response["CopyPartResult"]["ETag"]
|
return response["CopyPartResult"]["ETag"]
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue