Store k6 output and add socket info collection

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2023-11-20 17:39:15 +03:00
parent ed8f90dfc0
commit 9ab4def44f
9 changed files with 99 additions and 110 deletions

View file

@ -1,4 +1,5 @@
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy,
K6ProcessAllocationStrategy,

View file

@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from frostfs_testlib.shell.interfaces import Shell
class Loader(ABC):
@abstractmethod
def get_shell(self) -> Shell:
"""Get shell for the loader"""
@property
@abstractmethod
def ip(self):
"""Get address of the loader"""

View file

@ -1,20 +1,8 @@
from abc import ABC, abstractmethod
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import LoadParams
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
class Loader(ABC):
@abstractmethod
def get_shell(self) -> Shell:
"""Get shell for the loader"""
@property
@abstractmethod
def ip(self):
"""Get address of the loader"""
class ScenarioRunner(ABC):
@ -32,6 +20,10 @@ class ScenarioRunner(ABC):
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
"""Init K6 instances"""
@abstractmethod
def get_k6_instances(self) -> list[K6]:
"""Get K6 instances"""
@abstractmethod
def start(self):
"""Start K6 instances"""

View file

@ -8,13 +8,8 @@ from time import sleep
from typing import Any
from urllib.parse import urlparse
from frostfs_testlib.load.interfaces import Loader
from frostfs_testlib.load.load_config import (
K6ProcessAllocationStrategy,
LoadParams,
LoadScenario,
LoadType,
)
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.common import STORAGE_USER_NAME
@ -59,6 +54,7 @@ class K6:
self.loader: Loader = loader
self.shell: Shell = shell
self.wallet = wallet
self.preset_output: str = ""
self.summary_json: str = os.path.join(
self.load_params.working_dir,
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
@ -101,10 +97,10 @@ class K6:
command = " ".join(command_args)
result = self.shell.exec(command)
assert (
result.return_code == EXIT_RESULT_CODE
), f"Return code of preset is not zero: {result.stdout}"
return result.stdout.strip("\n")
assert result.return_code == EXIT_RESULT_CODE, f"Return code of preset is not zero: {result.stdout}"
self.preset_output = result.stdout.strip("\n")
return self.preset_output
@reporter.step_deco("Generate K6 command")
def _generate_env_variables(self) -> str:
@ -113,31 +109,21 @@ class K6:
env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
env_vars["SUMMARY_JSON"] = self.summary_json
reporter.attach(
"\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables"
)
return " ".join(
[f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None]
)
reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables")
return " ".join([f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None])
def start(self) -> None:
with reporter.step(
f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"
):
with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"):
self._start_time = int(datetime.utcnow().timestamp())
command = (
f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
)
user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
self._k6_process = RemoteProcess.create(
command, self.shell, self.load_params.working_dir, user
)
self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user)
def wait_until_finished(self, soft_timeout: int = 0) -> None:
with reporter.step(
f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"
):
with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
if self.load_params.scenario == LoadScenario.VERIFY:
timeout = self.load_params.verify_time or 0
else:
@ -180,9 +166,11 @@ class K6:
while timeout > 0:
if not self._k6_process.running():
return
remaining_time_hours = f"{timeout//3600}h" if timeout//3600 != 0 else ""
remaining_time_minutes = f"{timeout//60%60}m" if timeout//60%60 != 0 else ""
logger.info(f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds...")
remaining_time_hours = f"{timeout//3600}h" if timeout // 3600 != 0 else ""
remaining_time_minutes = f"{timeout//60%60}m" if timeout // 60 % 60 != 0 else ""
logger.info(
f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds..."
)
sleep(wait_interval)
timeout -= min(timeout, wait_interval)
wait_interval = max(
@ -198,9 +186,7 @@ class K6:
raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")
def get_results(self) -> Any:
with reporter.step(
f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"
):
with reporter.step(f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"):
self.__log_output()
if not self.summary_json:
@ -231,9 +217,7 @@ class K6:
return False
@reporter.step_deco("Wait until K6 process end")
@wait_for_success(
K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout"
)
@wait_for_success(K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout")
def _wait_until_process_end(self):
return self._k6_process.running()

View file

@ -1,4 +1,4 @@
from frostfs_testlib.load.interfaces import Loader
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.resources.load_params import (
LOAD_NODE_SSH_PASSWORD,
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,

View file

@ -10,7 +10,8 @@ from urllib.parse import urlparse
import yaml
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
@ -50,6 +51,9 @@ class RunnerBase(ScenarioRunner):
return any([future.result() for future in futures])
def get_k6_instances(self):
return self.k6_instances
class DefaultRunner(RunnerBase):
loaders: list[Loader]
@ -391,6 +395,7 @@ class LocalRunner(RunnerBase):
return results
class S3LocalRunner(LocalRunner):
endpoints: list[str]
k6_dir: str
@ -404,7 +409,8 @@ class S3LocalRunner(LocalRunner):
@reporter.step_deco("Resolve containers in preset")
def _resolve_containers_in_preset(self, k6_instance: K6):
k6_instance.shell.exec(
f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}")
f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}"
)
@reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
@ -426,9 +432,9 @@ class S3LocalRunner(LocalRunner):
# If we chmod /home/<user_name> folder we can no longer ssh to the node
# !! IMPORTANT !!
if (
load_params.working_dir
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}"
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/"
load_params.working_dir
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}"
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/"
):
shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")
@ -444,30 +450,25 @@ class S3LocalRunner(LocalRunner):
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Preparation steps")
def prepare(
self,
load_params: LoadParams,
cluster_nodes: list[ClusterNode],
nodes_under_load: list[ClusterNode],
k6_dir: str,
self,
load_params: LoadParams,
cluster_nodes: list[ClusterNode],
nodes_under_load: list[ClusterNode],
k6_dir: str,
):
self.k6_dir = k6_dir
with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [
node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes
]
s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
grpc_peer = storage_node.get_rpc_endpoint()
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)
@reporter.step_deco("Prepare node {cluster_node}")
def prepare_node(self,
cluster_node: ClusterNode,
k6_dir: str,
load_params: LoadParams,
s3_public_keys: list[str],
grpc_peer: str):
LocalRunner.prepare_node(self,cluster_node, k6_dir, load_params)
def prepare_node(
self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, s3_public_keys: list[str], grpc_peer: str
):
LocalRunner.prepare_node(self, cluster_node, k6_dir, load_params)
self.endpoints = cluster_node.s3_gate.get_all_endpoints()
shell = cluster_node.host.get_shell()
@ -497,9 +498,9 @@ class S3LocalRunner(LocalRunner):
wallet_password=self.wallet.password,
).stdout
aws_access_key_id = str(
re.search(
r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output
).group("aws_access_key_id")
re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
"aws_access_key_id"
)
)
aws_secret_access_key = str(
re.search(
@ -509,10 +510,8 @@ class S3LocalRunner(LocalRunner):
)
configure_input = [
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
InteractiveInput(
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
),
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""),
]
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))