diff --git a/src/frostfs_testlib/s3/aws_cli_client.py b/src/frostfs_testlib/s3/aws_cli_client.py index f6488f5..3568037 100644 --- a/src/frostfs_testlib/s3/aws_cli_client.py +++ b/src/frostfs_testlib/s3/aws_cli_client.py @@ -91,7 +91,6 @@ class AwsCliClient(S3ClientWrapper): if location_constraint: cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}" self.local_shell.exec(cmd) - sleep(S3_SYNC_WAIT_TIME) return bucket @@ -106,7 +105,6 @@ class AwsCliClient(S3ClientWrapper): def delete_bucket(self, bucket: str) -> None: cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}" self.local_shell.exec(cmd, command_options) - sleep(S3_SYNC_WAIT_TIME) @reporter.step("Head bucket S3") def head_bucket(self, bucket: str) -> None: @@ -397,7 +395,6 @@ class AwsCliClient(S3ClientWrapper): ) output = self.local_shell.exec(cmd, command_options).stdout response = self._to_json(output) - sleep(S3_SYNC_WAIT_TIME) return response @reporter.step("Delete object S3") @@ -408,7 +405,6 @@ class AwsCliClient(S3ClientWrapper): f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout - sleep(S3_SYNC_WAIT_TIME) return self._to_json(output) @reporter.step("Delete object versions S3") @@ -435,7 +431,6 @@ class AwsCliClient(S3ClientWrapper): f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}" ) output = self.local_shell.exec(cmd, command_options).stdout - sleep(S3_SYNC_WAIT_TIME) return self._to_json(output) @reporter.step("Delete object versions S3 without delete markers") diff --git a/src/frostfs_testlib/s3/boto3_client.py b/src/frostfs_testlib/s3/boto3_client.py index bdf7a9f..a8a7828 100644 --- a/src/frostfs_testlib/s3/boto3_client.py +++ b/src/frostfs_testlib/s3/boto3_client.py @@ -134,7 +134,6 @@ class Boto3ClientWrapper(S3ClientWrapper): s3_bucket = self.boto3_client.create_bucket(**params) log_command_execution(f"Created S3 bucket {bucket}", s3_bucket) - sleep(S3_SYNC_WAIT_TIME) return bucket @reporter.step("List buckets S3") @@ -155,7 +154,6 @@ class Boto3ClientWrapper(S3ClientWrapper): def delete_bucket(self, bucket: str) -> None: response = self.boto3_client.delete_bucket(Bucket=bucket) log_command_execution("S3 Delete bucket result", response) - sleep(S3_SYNC_WAIT_TIME) @reporter.step("Head bucket S3") @report_error @@ -364,7 +362,6 @@ class Boto3ClientWrapper(S3ClientWrapper): params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None} response = self.boto3_client.delete_object(**params) log_command_execution("S3 Delete object result", response) - sleep(S3_SYNC_WAIT_TIME) return response @reporter.step("Delete objects S3") @@ -375,7 +372,6 @@ class Boto3ClientWrapper(S3ClientWrapper): assert ( "Errors" not in response ), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}' - sleep(S3_SYNC_WAIT_TIME) return response @reporter.step("Delete object versions S3") @@ -413,8 +409,10 @@ class Boto3ClientWrapper(S3ClientWrapper): grant_write: Optional[str] = None, grant_read: Optional[str] = None, ) -> list: - # pytest.skip("Method put_object_acl is not supported by boto3 client") - raise NotImplementedError("Unsupported for boto3 client") + params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None} + response = self.boto3_client.put_object_acl(**params) + log_command_execution("S3 put object ACL", response) + return response.get("Grants") @reporter.step("Get object ACL") @report_error diff --git a/src/frostfs_testlib/steps/s3/s3_helper.py b/src/frostfs_testlib/steps/s3/s3_helper.py index ab0cee3..9b85766 100644 --- a/src/frostfs_testlib/steps/s3/s3_helper.py +++ b/src/frostfs_testlib/steps/s3/s3_helper.py @@ -47,7 +47,6 @@ def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: Versi if status == VersioningStatus.UNDEFINED: return - s3_client.get_bucket_versioning_status(bucket) s3_client.put_bucket_versioning(bucket, status=status) bucket_status = s3_client.get_bucket_versioning_status(bucket) assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}" diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 4003dfd..3c6c268 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -440,9 +440,11 @@ class ClusterStateController: self.await_node_status(status, wallet, cluster_node) @wait_for_success(80, 8, title="Wait for node status become {status}") - def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode): + def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode, checker_node: ClusterNode = None): frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path) - netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(cluster_node.storage_node.get_rpc_endpoint()).stdout) + if not checker_node: + checker_node = cluster_node + netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout) netmap = [node for node in netmap if cluster_node.host_ip == node.node] if status == NodeStatus.OFFLINE: assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline" diff --git a/src/frostfs_testlib/testing/parallel.py b/src/frostfs_testlib/testing/parallel.py index 1c30cec..9c36118 100644 --- a/src/frostfs_testlib/testing/parallel.py +++ b/src/frostfs_testlib/testing/parallel.py @@ -2,6 +2,8 @@ import itertools from concurrent.futures import Future, ThreadPoolExecutor from typing import Callable, Collection, Optional, Union +MAX_WORKERS = 50 + def parallel( fn: Union[Callable, list[Callable]], @@ -54,7 +56,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]: futures: list[Future] = [] - with ThreadPoolExecutor(max_workers=len(fn_list)) as executor: + with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor: for fn in fn_list: task_args = _get_args(*args) task_kwargs = _get_kwargs(**kwargs) @@ -67,7 +69,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]: def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]: futures: list[Future] = [] - with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor: + with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor: for item in parallel_items: task_args = _get_args(*args) task_kwargs = _get_kwargs(**kwargs)