Store k6 output and add socket info collection #120
9 changed files with 59 additions and 26 deletions
|
@ -1,4 +1,5 @@
|
||||||
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
|
from frostfs_testlib.load.interfaces.loader import Loader
|
||||||
|
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
|
||||||
from frostfs_testlib.load.load_config import (
|
from frostfs_testlib.load.load_config import (
|
||||||
EndpointSelectionStrategy,
|
EndpointSelectionStrategy,
|
||||||
K6ProcessAllocationStrategy,
|
K6ProcessAllocationStrategy,
|
||||||
|
|
14
src/frostfs_testlib/load/interfaces/loader.py
Normal file
14
src/frostfs_testlib/load/interfaces/loader.py
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from frostfs_testlib.shell.interfaces import Shell
|
||||||
|
|
||||||
|
|
||||||
|
class Loader(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def get_shell(self) -> Shell:
|
||||||
|
"""Get shell for the loader"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
@abstractmethod
|
||||||
|
def ip(self):
|
||||||
|
"""Get address of the loader"""
|
|
@ -1,20 +1,8 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from frostfs_testlib.load.k6 import K6
|
||||||
from frostfs_testlib.load.load_config import LoadParams
|
from frostfs_testlib.load.load_config import LoadParams
|
||||||
from frostfs_testlib.shell.interfaces import Shell
|
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
||||||
|
|
||||||
|
|
||||||
class Loader(ABC):
|
|
||||||
@abstractmethod
|
|
||||||
def get_shell(self) -> Shell:
|
|
||||||
"""Get shell for the loader"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
@abstractmethod
|
|
||||||
def ip(self):
|
|
||||||
"""Get address of the loader"""
|
|
||||||
|
|
||||||
|
|
||||||
class ScenarioRunner(ABC):
|
class ScenarioRunner(ABC):
|
||||||
|
@ -32,6 +20,10 @@ class ScenarioRunner(ABC):
|
||||||
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
||||||
"""Init K6 instances"""
|
"""Init K6 instances"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_k6_instances(self) -> list[K6]:
|
||||||
|
"""Get K6 instances"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Start K6 instances"""
|
"""Start K6 instances"""
|
|
@ -8,7 +8,7 @@ from time import sleep
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
from frostfs_testlib.load.interfaces import Loader
|
from frostfs_testlib.load.interfaces.loader import Loader
|
||||||
from frostfs_testlib.load.load_config import (
|
from frostfs_testlib.load.load_config import (
|
||||||
K6ProcessAllocationStrategy,
|
K6ProcessAllocationStrategy,
|
||||||
LoadParams,
|
LoadParams,
|
||||||
|
@ -59,6 +59,7 @@ class K6:
|
||||||
self.loader: Loader = loader
|
self.loader: Loader = loader
|
||||||
self.shell: Shell = shell
|
self.shell: Shell = shell
|
||||||
self.wallet = wallet
|
self.wallet = wallet
|
||||||
|
self.preset_output: str = ""
|
||||||
self.summary_json: str = os.path.join(
|
self.summary_json: str = os.path.join(
|
||||||
self.load_params.working_dir,
|
self.load_params.working_dir,
|
||||||
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
|
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
|
||||||
|
@ -104,7 +105,9 @@ class K6:
|
||||||
assert (
|
assert (
|
||||||
result.return_code == EXIT_RESULT_CODE
|
result.return_code == EXIT_RESULT_CODE
|
||||||
), f"Return code of preset is not zero: {result.stdout}"
|
), f"Return code of preset is not zero: {result.stdout}"
|
||||||
return result.stdout.strip("\n")
|
|
||||||
|
self.preset_output = result.stdout.strip("\n")
|
||||||
|
return self.preset_output
|
||||||
|
|
||||||
@reporter.step_deco("Generate K6 command")
|
@reporter.step_deco("Generate K6 command")
|
||||||
def _generate_env_variables(self) -> str:
|
def _generate_env_variables(self) -> str:
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from frostfs_testlib.load.interfaces import Loader
|
from frostfs_testlib.load.interfaces.loader import Loader
|
||||||
from frostfs_testlib.resources.load_params import (
|
from frostfs_testlib.resources.load_params import (
|
||||||
LOAD_NODE_SSH_PASSWORD,
|
LOAD_NODE_SSH_PASSWORD,
|
||||||
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
|
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
|
||||||
|
|
|
@ -10,7 +10,8 @@ from urllib.parse import urlparse
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
|
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
|
||||||
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
|
from frostfs_testlib.load.interfaces.loader import Loader
|
||||||
|
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
|
||||||
from frostfs_testlib.load.k6 import K6
|
from frostfs_testlib.load.k6 import K6
|
||||||
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
|
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
|
||||||
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
|
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
|
||||||
|
@ -54,6 +55,9 @@ class RunnerBase(ScenarioRunner):
|
||||||
|
|
||||||
return any([future.result() for future in futures])
|
return any([future.result() for future in futures])
|
||||||
|
|
||||||
|
def get_k6_instances(self):
|
||||||
|
return self.k6_instances
|
||||||
|
|
||||||
|
|
||||||
class DefaultRunner(RunnerBase):
|
class DefaultRunner(RunnerBase):
|
||||||
loaders: list[Loader]
|
loaders: list[Loader]
|
||||||
|
|
|
@ -2,7 +2,7 @@ import copy
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import frostfs_testlib.resources.optionals as optionals
|
import frostfs_testlib.resources.optionals as optionals
|
||||||
from frostfs_testlib.load.interfaces import ScenarioRunner
|
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
|
||||||
from frostfs_testlib.load.load_config import (
|
from frostfs_testlib.load.load_config import (
|
||||||
EndpointSelectionStrategy,
|
EndpointSelectionStrategy,
|
||||||
LoadParams,
|
LoadParams,
|
||||||
|
|
|
@ -85,12 +85,14 @@ class ClusterStateController:
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start host of node {node}")
|
@reporter.step_deco("Start host of node {node}")
|
||||||
def start_node_host(self, node: ClusterNode):
|
def start_node_host(self, node: ClusterNode, tree_healthcheck: bool = True):
|
||||||
with reporter.step(f"Start host {node.host.config.address}"):
|
with reporter.step(f"Start host {node.host.config.address}"):
|
||||||
node.host.start_host()
|
node.host.start_host()
|
||||||
wait_for_host_online(self.shell, node.storage_node)
|
wait_for_host_online(self.shell, node.storage_node)
|
||||||
wait_for_node_online(node.storage_node)
|
|
||||||
self.stopped_nodes.remove(node)
|
self.stopped_nodes.remove(node)
|
||||||
|
wait_for_node_online(node.storage_node)
|
||||||
|
if tree_healthcheck:
|
||||||
|
self.wait_tree_healthcheck()
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start stopped hosts")
|
@reporter.step_deco("Start stopped hosts")
|
||||||
|
@ -350,7 +352,9 @@ class ClusterStateController:
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
||||||
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
|
def panic_reboot_host(
|
||||||
|
self, node: ClusterNode, wait_for_return: bool = True, tree_healthcheck: bool = True
|
||||||
|
):
|
||||||
shell = node.host.get_shell()
|
shell = node.host.get_shell()
|
||||||
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
|
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
|
||||||
|
|
||||||
|
@ -367,6 +371,8 @@ class ClusterStateController:
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
wait_for_host_online(self.shell, node.storage_node)
|
wait_for_host_online(self.shell, node.storage_node)
|
||||||
wait_for_node_online(node.storage_node)
|
wait_for_node_online(node.storage_node)
|
||||||
|
if tree_healthcheck:
|
||||||
|
self.wait_tree_healthcheck()
|
||||||
|
|
||||||
@reporter.step_deco("Down {interface} to {nodes}")
|
@reporter.step_deco("Down {interface} to {nodes}")
|
||||||
def down_interface(self, nodes: list[ClusterNode], interface: str):
|
def down_interface(self, nodes: list[ClusterNode], interface: str):
|
||||||
|
|
|
@ -12,6 +12,7 @@ from frostfs_testlib.steps.node_management import storage_node_healthcheck
|
||||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, NodeBase, StorageNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, NodeBase, StorageNode
|
||||||
from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
|
from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
|
||||||
|
from frostfs_testlib.testing.parallel import parallel
|
||||||
from frostfs_testlib.testing.test_control import retry, wait_for_success
|
from frostfs_testlib.testing.test_control import retry, wait_for_success
|
||||||
from frostfs_testlib.utils.datetime_utils import parse_time
|
from frostfs_testlib.utils.datetime_utils import parse_time
|
||||||
|
|
||||||
|
@ -26,10 +27,15 @@ def ping_host(shell: Shell, host: Host):
|
||||||
return shell.exec(f"ping {host.config.address} -c 1", options).return_code
|
return shell.exec(f"ping {host.config.address} -c 1", options).return_code
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Move to ClusterStateController
|
||||||
@reporter.step_deco("Wait for storage nodes returned to cluster")
|
@reporter.step_deco("Wait for storage nodes returned to cluster")
|
||||||
def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None:
|
def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None:
|
||||||
for node in cluster.services(StorageNode):
|
nodes = cluster.services(StorageNode)
|
||||||
with reporter.step(f"Run health check for storage at '{node}'"):
|
parallel(_wait_for_storage_node, nodes, shell=shell)
|
||||||
|
|
||||||
|
|
||||||
|
@reporter.step_deco("Run health check for storage at '{node}'")
|
||||||
|
def _wait_for_storage_node(node: StorageNode, shell: Shell) -> None:
|
||||||
wait_for_host_online(shell, node)
|
wait_for_host_online(shell, node)
|
||||||
wait_for_node_online(node)
|
wait_for_node_online(node)
|
||||||
|
|
||||||
|
@ -64,10 +70,17 @@ def wait_for_node_online(node: StorageNode):
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.warning(f"Node healthcheck fails with error {err}")
|
logger.warning(f"Node healthcheck fails with error {err}")
|
||||||
return False
|
return False
|
||||||
|
finally:
|
||||||
|
gather_socket_info(node)
|
||||||
|
|
||||||
return health_check.health_status == "READY" and health_check.network_status == "ONLINE"
|
return health_check.health_status == "READY" and health_check.network_status == "ONLINE"
|
||||||
|
|
||||||
|
|
||||||
|
@reporter.step_deco("Gather socket info for {node}")
|
||||||
|
def gather_socket_info(node: StorageNode):
|
||||||
|
node.host.get_shell().exec("ss -tuln | grep 8080", CommandOptions(check=False))
|
||||||
|
|
||||||
|
|
||||||
@reporter.step_deco("Check and return status of given service")
|
@reporter.step_deco("Check and return status of given service")
|
||||||
def service_status(service: str, shell: Shell) -> str:
|
def service_status(service: str, shell: Shell) -> str:
|
||||||
return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip()
|
return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip()
|
||||||
|
|
Loading…
Reference in a new issue