forked from TrueCloudLab/frostfs-testlib

[#250] Adjustments for tests optimization

Signed-off-by: a.berezin <a.berezin@yadro.com>

parent cb31d41f15
commit f1b2fbd47b

5 changed files with 12 additions and 16 deletions
@@ -91,7 +91,6 @@ class AwsCliClient(S3ClientWrapper):
         if location_constraint:
             cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}"
         self.local_shell.exec(cmd)
-        sleep(S3_SYNC_WAIT_TIME)

         return bucket

@@ -106,7 +105,6 @@ class AwsCliClient(S3ClientWrapper):
     def delete_bucket(self, bucket: str) -> None:
         cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
         self.local_shell.exec(cmd, command_options)
-        sleep(S3_SYNC_WAIT_TIME)

     @reporter.step("Head bucket S3")
     def head_bucket(self, bucket: str) -> None:
@@ -397,7 +395,6 @@ class AwsCliClient(S3ClientWrapper):
         )
         output = self.local_shell.exec(cmd, command_options).stdout
         response = self._to_json(output)
-        sleep(S3_SYNC_WAIT_TIME)
         return response

     @reporter.step("Delete object S3")
@@ -408,7 +405,6 @@ class AwsCliClient(S3ClientWrapper):
             f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
         )
         output = self.local_shell.exec(cmd, command_options).stdout
-        sleep(S3_SYNC_WAIT_TIME)
         return self._to_json(output)

     @reporter.step("Delete object versions S3")
@@ -435,7 +431,6 @@ class AwsCliClient(S3ClientWrapper):
             f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
         )
         output = self.local_shell.exec(cmd, command_options).stdout
-        sleep(S3_SYNC_WAIT_TIME)
         return self._to_json(output)

     @reporter.step("Delete object versions S3 without delete markers")

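All five AwsCliClient hunks above make the same change: the unconditional sleep(S3_SYNC_WAIT_TIME) that followed each mutating S3 call (create/delete bucket, delete object, delete objects) is removed, so the wrapper returns as soon as the aws CLI command finishes. A minimal sketch of the before/after shape of such a wrapper; run_cli and the wait value are placeholders for illustration, not the library's real shell layer:

import time

S3_SYNC_WAIT_TIME = 5  # placeholder value for illustration


def delete_bucket_before(run_cli, bucket: str) -> None:
    # Old shape: every mutating call paid a fixed delay, whether or not
    # anything actually needed time to settle.
    run_cli(f"aws s3api delete-bucket --bucket {bucket}")
    time.sleep(S3_SYNC_WAIT_TIME)


def delete_bucket_after(run_cli, bucket: str) -> None:
    # New shape: return immediately; a test that truly needs to wait for a
    # condition can poll for it explicitly instead of sleeping a fixed time.
    run_cli(f"aws s3api delete-bucket --bucket {bucket}")


# Example: delete_bucket_after(print, "demo-bucket") just prints the command.
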
@@ -134,7 +134,6 @@ class Boto3ClientWrapper(S3ClientWrapper):

         s3_bucket = self.boto3_client.create_bucket(**params)
         log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
-        sleep(S3_SYNC_WAIT_TIME)
         return bucket

     @reporter.step("List buckets S3")
@@ -155,7 +154,6 @@ class Boto3ClientWrapper(S3ClientWrapper):
     def delete_bucket(self, bucket: str) -> None:
         response = self.boto3_client.delete_bucket(Bucket=bucket)
         log_command_execution("S3 Delete bucket result", response)
-        sleep(S3_SYNC_WAIT_TIME)

     @reporter.step("Head bucket S3")
     @report_error
@@ -364,7 +362,6 @@ class Boto3ClientWrapper(S3ClientWrapper):
         params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
         response = self.boto3_client.delete_object(**params)
         log_command_execution("S3 Delete object result", response)
-        sleep(S3_SYNC_WAIT_TIME)
         return response

     @reporter.step("Delete objects S3")
@@ -375,7 +372,6 @@ class Boto3ClientWrapper(S3ClientWrapper):
         assert (
             "Errors" not in response
         ), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
-        sleep(S3_SYNC_WAIT_TIME)
         return response

     @reporter.step("Delete object versions S3")
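The Boto3ClientWrapper hunks above mirror the CLI-client change: the same fixed post-call sleep is dropped from create_bucket, delete_bucket, delete_object and delete_objects. The saving per test is simply the number of removed sleep sites it hits times the wait value; a quick back-of-the-envelope sketch (the wait value and call counts below are assumed examples, not measurements from this repository):

S3_SYNC_WAIT_TIME = 5  # assumed example value, in seconds

# Hypothetical per-test call counts through the affected wrappers.
calls = {"create_bucket": 3, "delete_object": 10, "delete_bucket": 3}

saved_seconds = sum(calls.values()) * S3_SYNC_WAIT_TIME
print(f"~{saved_seconds}s of fixed sleeping removed per such test")  # ~80s
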
@@ -413,8 +409,10 @@ class Boto3ClientWrapper(S3ClientWrapper):
         grant_write: Optional[str] = None,
         grant_read: Optional[str] = None,
     ) -> list:
-        # pytest.skip("Method put_object_acl is not supported by boto3 client")
-        raise NotImplementedError("Unsupported for boto3 client")
+        params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
+        response = self.boto3_client.put_object_acl(**params)
+        log_command_execution("S3 put object ACL", response)
+        return response.get("Grants")

     @reporter.step("Get object ACL")
     @report_error

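This hunk replaces the NotImplementedError stub: put_object_acl now maps its keyword arguments to boto3 parameter names (via _to_s3_param) and forwards them to the real boto3 call. A standalone sketch of the underlying boto3 calls, with placeholder endpoint, bucket and key names:

import boto3

# Placeholder endpoint; in the test library the client is built from the
# configured S3 gateway profile.
s3 = boto3.client("s3", endpoint_url="http://s3gate.example:8080")

# Apply a canned ACL to an existing object.
s3.put_object_acl(Bucket="demo-bucket", Key="demo-object", ACL="public-read")

# Read the resulting grants back; it is GetObjectAcl that returns the
# "Grants" list in its response.
grants = s3.get_object_acl(Bucket="demo-bucket", Key="demo-object")["Grants"]
print(grants)

Note that the PutObjectAcl response itself does not carry a Grants field, so callers that need the effective grants typically read them back with get_object_acl, as in the sketch.
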
@@ -47,7 +47,6 @@ def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: Versi
     if status == VersioningStatus.UNDEFINED:
         return

-    s3_client.get_bucket_versioning_status(bucket)
     s3_client.put_bucket_versioning(bucket, status=status)
     bucket_status = s3_client.get_bucket_versioning_status(bucket)
     assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}"

@@ -440,9 +440,11 @@ class ClusterStateController:
         self.await_node_status(status, wallet, cluster_node)

     @wait_for_success(80, 8, title="Wait for node status become {status}")
-    def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode):
+    def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode, checker_node: ClusterNode = None):
         frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
-        netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(cluster_node.storage_node.get_rpc_endpoint()).stdout)
+        if not checker_node:
+            checker_node = cluster_node
+        netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
         netmap = [node for node in netmap if cluster_node.host_ip == node.node]
         if status == NodeStatus.OFFLINE:
             assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"

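await_node_status gains an optional checker_node: when it is omitted the method behaves exactly as before, and when it is supplied the netmap snapshot is taken through that node's RPC endpoint instead of the target's, presumably so the status of a node being taken offline can be verified from a still-reachable peer. A generic, runnable sketch of the same default-to-target pattern, using toy classes rather than the real ClusterNode/FrostfsCli types:

from typing import Optional


class ToyNode:
    def __init__(self, name: str):
        self.name = name

    def snapshot(self) -> str:
        # Stand-in for taking a netmap snapshot via this node's endpoint.
        return f"netmap as seen by {self.name}"


def await_status(target: ToyNode, checker: Optional[ToyNode] = None) -> str:
    # Same shape as the change above: default the checker to the target,
    # then always query through the checker.
    if not checker:
        checker = target
    return checker.snapshot()


print(await_status(ToyNode("node-1")))                     # queried via itself
print(await_status(ToyNode("node-1"), ToyNode("node-2")))  # queried via node-2
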
@@ -2,6 +2,8 @@ import itertools
 from concurrent.futures import Future, ThreadPoolExecutor
 from typing import Callable, Collection, Optional, Union

+MAX_WORKERS = 50
+

 def parallel(
     fn: Union[Callable, list[Callable]],
@@ -54,7 +56,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:

     futures: list[Future] = []

-    with ThreadPoolExecutor(max_workers=len(fn_list)) as executor:
+    with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor:
         for fn in fn_list:
             task_args = _get_args(*args)
             task_kwargs = _get_kwargs(**kwargs)
@@ -67,7 +69,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
 def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
     futures: list[Future] = []

-    with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor:
+    with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor:
         for item in parallel_items:
             task_args = _get_args(*args)
             task_kwargs = _get_kwargs(**kwargs)

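The last three hunks introduce MAX_WORKERS = 50 and cap both thread pools with min(...), so a very large list of functions or items no longer spawns one thread per entry. A self-contained sketch of the capped-pool pattern (the names here are illustrative, not the library's helpers):

from concurrent.futures import Future, ThreadPoolExecutor

MAX_WORKERS = 50  # mirrors the cap added above


def run_all(tasks) -> list:
    # Bound the pool size: at most MAX_WORKERS threads even for huge inputs.
    # Assumes a non-empty task list, as the library helpers do.
    futures: list[Future] = []
    with ThreadPoolExecutor(max_workers=min(len(tasks), MAX_WORKERS)) as executor:
        for task in tasks:
            futures.append(executor.submit(task))
    return [future.result() for future in futures]


# 200 tasks, but no more than 50 worker threads at a time.
print(sum(run_all([lambda i=i: i * i for i in range(200)])))

Without the cap, a parallel() call over a 500-item collection would create 500 threads; with it the pool tops out at 50 and the remaining work is queued.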