Updates for local scenario teardown #116
2 changed files with 21 additions and 50 deletions
|
@ -18,11 +18,7 @@ from frostfs_testlib.reporter import get_reporter
|
||||||
from frostfs_testlib.resources import optionals
|
from frostfs_testlib.resources import optionals
|
||||||
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
|
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
|
||||||
from frostfs_testlib.resources.common import STORAGE_USER_NAME
|
from frostfs_testlib.resources.common import STORAGE_USER_NAME
|
||||||
from frostfs_testlib.resources.load_params import (
|
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR, LOAD_NODE_SSH_USER, LOAD_NODES
|
||||||
BACKGROUND_LOAD_VUS_COUNT_DIVISOR,
|
|
||||||
LOAD_NODE_SSH_USER,
|
|
||||||
LOAD_NODES,
|
|
||||||
)
|
|
||||||
from frostfs_testlib.shell.command_inspectors import SuInspector
|
from frostfs_testlib.shell.command_inspectors import SuInspector
|
||||||
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
|
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
|
@ -83,14 +79,10 @@ class DefaultRunner(RunnerBase):
|
||||||
|
|
||||||
with reporter.step("Init s3 client on loaders"):
|
with reporter.step("Init s3 client on loaders"):
|
||||||
storage_node = nodes_under_load[0].service(StorageNode)
|
storage_node = nodes_under_load[0].service(StorageNode)
|
||||||
s3_public_keys = [
|
s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
|
||||||
node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes
|
|
||||||
]
|
|
||||||
grpc_peer = storage_node.get_rpc_endpoint()
|
grpc_peer = storage_node.get_rpc_endpoint()
|
||||||
|
|
||||||
parallel(
|
parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)
|
||||||
self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir
|
|
||||||
)
|
|
||||||
|
|
||||||
def _prepare_loader(
|
def _prepare_loader(
|
||||||
self,
|
self,
|
||||||
|
@ -112,9 +104,9 @@ class DefaultRunner(RunnerBase):
|
||||||
wallet_password=self.loaders_wallet.password,
|
wallet_password=self.loaders_wallet.password,
|
||||||
).stdout
|
).stdout
|
||||||
aws_access_key_id = str(
|
aws_access_key_id = str(
|
||||||
re.search(
|
re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
|
||||||
r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output
|
"aws_access_key_id"
|
||||||
).group("aws_access_key_id")
|
)
|
||||||
)
|
)
|
||||||
aws_secret_access_key = str(
|
aws_secret_access_key = str(
|
||||||
re.search(
|
re.search(
|
||||||
|
@ -125,9 +117,7 @@ class DefaultRunner(RunnerBase):
|
||||||
|
|
||||||
configure_input = [
|
configure_input = [
|
||||||
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
|
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
|
||||||
InteractiveInput(
|
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
|
||||||
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
|
|
||||||
),
|
|
||||||
InteractiveInput(prompt_pattern=r".*", input=""),
|
InteractiveInput(prompt_pattern=r".*", input=""),
|
||||||
InteractiveInput(prompt_pattern=r".*", input=""),
|
InteractiveInput(prompt_pattern=r".*", input=""),
|
||||||
]
|
]
|
||||||
|
@ -144,16 +134,12 @@ class DefaultRunner(RunnerBase):
|
||||||
}
|
}
|
||||||
endpoints_generators = {
|
endpoints_generators = {
|
||||||
K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
|
K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
|
||||||
K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle(
|
K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle([[endpoint] for endpoint in endpoints]),
|
||||||
[[endpoint] for endpoint in endpoints]
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy]
|
k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy]
|
||||||
endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy]
|
endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy]
|
||||||
|
|
||||||
distributed_load_params_list = self._get_distributed_load_params_list(
|
distributed_load_params_list = self._get_distributed_load_params_list(load_params, k6_processes_count)
|
||||||
load_params, k6_processes_count
|
|
||||||
)
|
|
||||||
|
|
||||||
futures = parallel(
|
futures = parallel(
|
||||||
self._init_k6_instance,
|
self._init_k6_instance,
|
||||||
|
@ -164,9 +150,7 @@ class DefaultRunner(RunnerBase):
|
||||||
)
|
)
|
||||||
self.k6_instances = [future.result() for future in futures]
|
self.k6_instances = [future.result() for future in futures]
|
||||||
|
|
||||||
def _init_k6_instance(
|
def _init_k6_instance(self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str):
|
||||||
self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str
|
|
||||||
):
|
|
||||||
shell = loader.get_shell()
|
shell = loader.get_shell()
|
||||||
with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
|
with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
|
||||||
with reporter.step(f"Make working directory"):
|
with reporter.step(f"Make working directory"):
|
||||||
|
@ -204,9 +188,7 @@ class DefaultRunner(RunnerBase):
|
||||||
and getattr(original_load_params, field.name) is not None
|
and getattr(original_load_params, field.name) is not None
|
||||||
):
|
):
|
||||||
original_value = getattr(original_load_params, field.name)
|
original_value = getattr(original_load_params, field.name)
|
||||||
distribution = self._get_distribution(
|
distribution = self._get_distribution(math.ceil(original_value / divisor), workers_count)
|
||||||
math.ceil(original_value / divisor), workers_count
|
|
||||||
)
|
|
||||||
for i in range(workers_count):
|
for i in range(workers_count):
|
||||||
setattr(distributed_load_params[i], field.name, distribution[i])
|
setattr(distributed_load_params[i], field.name, distribution[i])
|
||||||
|
|
||||||
|
@ -233,10 +215,7 @@ class DefaultRunner(RunnerBase):
|
||||||
# Remainder of clients left to be distributed
|
# Remainder of clients left to be distributed
|
||||||
remainder = clients_count - clients_per_worker * workers_count
|
remainder = clients_count - clients_per_worker * workers_count
|
||||||
|
|
||||||
distribution = [
|
distribution = [clients_per_worker + 1 if i < remainder else clients_per_worker for i in range(workers_count)]
|
||||||
clients_per_worker + 1 if i < remainder else clients_per_worker
|
|
||||||
for i in range(workers_count)
|
|
||||||
]
|
|
||||||
return distribution
|
return distribution
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
|
@ -245,9 +224,7 @@ class DefaultRunner(RunnerBase):
|
||||||
parallel([k6.start for k6 in self.k6_instances])
|
parallel([k6.start for k6 in self.k6_instances])
|
||||||
|
|
||||||
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
||||||
with reporter.step(
|
with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
|
||||||
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
|
|
||||||
):
|
|
||||||
time.sleep(wait_after_start_time)
|
time.sleep(wait_after_start_time)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -327,9 +304,7 @@ class LocalRunner(RunnerBase):
|
||||||
with reporter.step("Update limits.conf"):
|
with reporter.step("Update limits.conf"):
|
||||||
limits_path = "/etc/security/limits.conf"
|
limits_path = "/etc/security/limits.conf"
|
||||||
self.file_keeper.add(cluster_node.storage_node, limits_path)
|
self.file_keeper.add(cluster_node.storage_node, limits_path)
|
||||||
content = (
|
content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
|
||||||
f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
|
|
||||||
)
|
|
||||||
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
|
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
|
||||||
|
|
||||||
with reporter.step("Download K6"):
|
with reporter.step("Download K6"):
|
||||||
|
@ -339,9 +314,7 @@ class LocalRunner(RunnerBase):
|
||||||
shell.exec(f"sudo chmod -R 777 {k6_dir}")
|
shell.exec(f"sudo chmod -R 777 {k6_dir}")
|
||||||
|
|
||||||
with reporter.step("Create empty_passwd"):
|
with reporter.step("Create empty_passwd"):
|
||||||
self.wallet = WalletInfo(
|
self.wallet = WalletInfo(f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml")
|
||||||
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
|
|
||||||
)
|
|
||||||
content = yaml.dump({"password": ""})
|
content = yaml.dump({"password": ""})
|
||||||
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
|
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
|
||||||
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
|
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
|
||||||
|
@ -383,15 +356,13 @@ class LocalRunner(RunnerBase):
|
||||||
def start(self):
|
def start(self):
|
||||||
load_params = self.k6_instances[0].load_params
|
load_params = self.k6_instances[0].load_params
|
||||||
|
|
||||||
self.cluster_state_controller.stop_all_s3_gates()
|
self.cluster_state_controller.stop_services_of_type(S3Gate)
|
||||||
self.cluster_state_controller.stop_all_storage_services()
|
self.cluster_state_controller.stop_services_of_type(StorageNode)
|
||||||
|
|
||||||
parallel([k6.start for k6 in self.k6_instances])
|
parallel([k6.start for k6 in self.k6_instances])
|
||||||
|
|
||||||
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
||||||
with reporter.step(
|
with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
|
||||||
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
|
|
||||||
):
|
|
||||||
time.sleep(wait_after_start_time)
|
time.sleep(wait_after_start_time)
|
||||||
|
|
||||||
@reporter.step_deco("Restore passwd on {cluster_node}")
|
@reporter.step_deco("Restore passwd on {cluster_node}")
|
||||||
|
@ -408,8 +379,7 @@ class LocalRunner(RunnerBase):
|
||||||
for k6_instance in self.k6_instances:
|
for k6_instance in self.k6_instances:
|
||||||
k6_instance.stop()
|
k6_instance.stop()
|
||||||
|
|
||||||
self.cluster_state_controller.start_stopped_storage_services()
|
self.cluster_state_controller.start_all_stopped_services()
|
||||||
self.cluster_state_controller.start_stopped_s3_gates()
|
|
||||||
|
|
||||||
def get_results(self) -> dict:
|
def get_results(self) -> dict:
|
||||||
results = {}
|
results = {}
|
||||||
|
|
|
@ -173,6 +173,7 @@ class ClusterStateController:
|
||||||
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
|
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
|
||||||
def wait_s3gates(self):
|
def wait_s3gates(self):
|
||||||
online_s3gates = self._get_online(S3Gate)
|
online_s3gates = self._get_online(S3Gate)
|
||||||
|
if online_s3gates:
|
||||||
parallel(self.wait_s3gate, online_s3gates)
|
parallel(self.wait_s3gate, online_s3gates)
|
||||||
|
|
||||||
@wait_for_success(600, 60)
|
@wait_for_success(600, 60)
|
||||||
|
|
Loading…
Reference in a new issue