Compare commits

...

1 commit

Author SHA1 Message Date
5687b79b38 [#109] Update CSC with healthchecks
(cherry picked from commit e970fe2788)
Signed-off-by: Dmitry Anurin <d.anurin@yadro.com>
2023-11-13 12:03:59 +03:00
6 changed files with 212 additions and 113 deletions

View file

@ -1,5 +1,7 @@
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.steps.node_management import storage_node_healthcheck from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
@ -9,6 +11,33 @@ reporter = get_reporter()
class BasicHealthcheck(Healthcheck): class BasicHealthcheck(Healthcheck):
@reporter.step_deco("Perform healthcheck for {cluster_node}") @reporter.step_deco("Perform healthcheck for {cluster_node}")
def perform(self, cluster_node: ClusterNode): def perform(self, cluster_node: ClusterNode):
health_check = storage_node_healthcheck(cluster_node.storage_node) result = self.storage_healthcheck(cluster_node)
if health_check.health_status != "READY" or health_check.network_status != "ONLINE": if result:
raise AssertionError("Node {cluster_node} is not healthy") raise AssertionError(result)
@reporter.step_deco("Tree healthcheck on {cluster_node}")
def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None:
host = cluster_node.host
service_config = host.get_service_config(cluster_node.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
remote_cli = FrostfsCli(
shell,
host.get_cli_config(FROSTFS_CLI_EXEC).exec_path,
config_file=wallet_config_path,
)
result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080")
if result.return_code != 0:
return f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}"
@reporter.step_deco("Storage healthcheck on {cluster_node}")
def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None:
result = storage_node_healthcheck(cluster_node.storage_node)
if result.health_status != "READY" or result.network_status != "ONLINE":
return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}"

View file

@ -7,3 +7,11 @@ class Healthcheck(ABC):
@abstractmethod @abstractmethod
def perform(self, cluster_node: ClusterNode): def perform(self, cluster_node: ClusterNode):
"""Perform healthcheck on the target cluster node""" """Perform healthcheck on the target cluster node"""
@abstractmethod
def tree_healthcheck(self, cluster_node: ClusterNode):
"""Check tree sync status on target cluster node"""
@abstractmethod
def storage_healthcheck(self, cluster_node: ClusterNode):
"""Perform storage node healthcheck on target cluster node"""

View file

@ -3,7 +3,6 @@ import itertools
import math import math
import re import re
import time import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import fields from dataclasses import fields
from typing import Optional from typing import Optional
from urllib.parse import urlparse from urllib.parse import urlparse
@ -24,13 +23,16 @@ from frostfs_testlib.resources.load_params import (
LOAD_NODE_SSH_USER, LOAD_NODE_SSH_USER,
LOAD_NODES, LOAD_NODES,
) )
from frostfs_testlib.shell.command_inspectors import SuInspector
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel, run_optionally from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.utils import FileKeeper, datetime_utils from frostfs_testlib.testing.test_control import retry
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_keeper import FileKeeper
reporter = get_reporter() reporter = get_reporter()
@ -296,40 +298,53 @@ class LocalRunner(RunnerBase):
nodes_under_load: list[ClusterNode], nodes_under_load: list[ClusterNode],
k6_dir: str, k6_dir: str,
): ):
@reporter.step_deco("Prepare node {cluster_node}") parallel(self.prepare_node, nodes_under_load, k6_dir, load_params)
def prepare_node(cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
with reporter.step("Allow storage user to login into system"): @retry(3, 5, expected_result=True)
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") def allow_user_to_login_in_system(self, cluster_node: ClusterNode):
shell.exec("sudo chattr +i /etc/passwd") shell = cluster_node.host.get_shell()
with reporter.step("Update limits.conf"): result = None
limits_path = "/etc/security/limits.conf" try:
self.file_keeper.add(cluster_node.storage_node, limits_path) shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" self.lock_passwd_on_node(cluster_node)
shell.exec(f"echo '{content}' | sudo tee {limits_path}") options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)])
result = shell.exec("whoami", options)
finally:
if not result or result.return_code:
self.restore_passwd_on_node(cluster_node)
return False
with reporter.step("Download K6"): return True
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"): @reporter.step_deco("Prepare node {cluster_node}")
self.wallet = WalletInfo( def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" shell = cluster_node.host.get_shell()
)
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor: with reporter.step("Allow storage user to login into system"):
result = executor.map(prepare_node, nodes_under_load) self.allow_user_to_login_in_system(cluster_node)
# Check for exceptions with reporter.step("Update limits.conf"):
for _ in result: limits_path = "/etc/security/limits.conf"
pass self.file_keeper.add(cluster_node.storage_node, limits_path)
content = (
f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
)
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
with reporter.step("Download K6"):
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"):
self.wallet = WalletInfo(
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
)
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
@reporter.step_deco("Init k6 instances") @reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
@ -379,16 +394,21 @@ class LocalRunner(RunnerBase):
): ):
time.sleep(wait_after_start_time) time.sleep(wait_after_start_time)
@reporter.step_deco("Restore passwd on {cluster_node}")
def restore_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
@reporter.step_deco("Lock passwd on {cluster_node}")
def lock_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr +i /etc/passwd")
def stop(self): def stop(self):
for k6_instance in self.k6_instances: for k6_instance in self.k6_instances:
k6_instance.stop() k6_instance.stop()
@reporter.step_deco("Restore passwd on {cluster_node}") parallel(self.restore_passwd_on_node, self.nodes_under_load)
def restore_passwd_attr_on_node(cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
parallel(restore_passwd_attr_on_node, self.nodes_under_load)
self.cluster_state_controller.start_stopped_storage_services() self.cluster_state_controller.start_stopped_storage_services()
self.cluster_state_controller.start_stopped_s3_gates() self.cluster_state_controller.start_stopped_s3_gates()

View file

@ -1,15 +1,15 @@
import copy
import time import time
import frostfs_testlib.resources.optionals as optionals import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import ( from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned, wait_all_storage_nodes_returned,
wait_for_host_offline, wait_for_host_offline,
@ -22,18 +22,36 @@ if_up_down_helper = IfUpDownHelper()
class ClusterStateController: class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster) -> None: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.stopped_s3_gates: list[ClusterNode] = []
self.dropped_traffic: list[ClusterNode] = [] self.dropped_traffic: list[ClusterNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
self.healthcheck = healthcheck
self.shell = shell self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {} self.suspended_services: dict[str, list[ClusterNode]] = {}
self.nodes_with_modified_interface: list[ClusterNode] = [] self.nodes_with_modified_interface: list[ClusterNode] = []
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
return set(stopped_by_node)
def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)]
return set(stopped_by_type)
def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes])
return set(stopped_on_nodes)
def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_svc = self._get_stopped_by_type(service_type).union(
self._from_stopped_nodes(service_type)
)
online_svc = set(self.cluster.services(service_type)) - stopped_svc
return online_svc
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}") @reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str): def stop_node_host(self, node: ClusterNode, mode: str):
@ -65,26 +83,6 @@ class ClusterStateController:
for node in nodes: for node in nodes:
wait_for_host_offline(self.shell, node.storage_node) wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_storage_service(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_s3_gate(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}") @reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode): def start_node_host(self, node: ClusterNode):
@ -104,13 +102,10 @@ class ClusterStateController:
for node in nodes: for node in nodes:
with reporter.step(f"Start host {node.host.config.address}"): with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host() node.host.start_host()
if node in self.stopped_storage_nodes: self.stopped_services.difference_update(self._get_stopped_by_node(node))
self.stopped_storage_nodes.remove(node)
if node in self.stopped_s3_gates:
self.stopped_s3_gates.remove(node)
self.stopped_nodes = [] self.stopped_nodes = []
wait_all_storage_nodes_returned(self.shell, self.cluster) self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}") @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
@ -133,42 +128,57 @@ class ClusterStateController:
disk_controller.attach() disk_controller.attach()
self.detached_disks = {} self.detached_disks = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stopped_storage_nodes.append(node)
node.storage_node.stop_service(mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services") @reporter.step_deco("Stop all {service_type} services")
def stop_services_of_type(self, service_type: type[ServiceClass]): def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
self.stopped_services.update(services) self.stopped_services.update(services)
parallel([service.stop_service for service in services]) parallel([service.stop_service for service in services], mask=mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all {service_type} services") @reporter.step_deco("Start all {service_type} services")
def start_services_of_type(self, service_type: type[ServiceClass]): def start_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
parallel([service.start_service for service in services]) parallel([service.start_service for service in services])
self.stopped_services.difference_update(set(services))
if service_type == StorageNode: if service_type == StorageNode:
wait_all_storage_nodes_returned(self.shell, self.cluster) self.wait_after_storage_startup()
self.stopped_services = self.stopped_services - set(services) @wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"):
result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
assert (
'address="127.0.0.1' in result.stdout
), "S3Gate should connect to local storage node"
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
def wait_s3gates(self):
online_s3gates = self._get_online(S3Gate)
parallel(self.wait_s3gate, online_s3gates)
@wait_for_success(600, 60)
def wait_tree_healthcheck(self):
nodes = self.cluster.nodes(self._get_online(StorageNode))
parallel(self.healthcheck.tree_healthcheck, nodes)
@reporter.step_deco("Wait for storage reconnection to the system")
def wait_after_storage_startup(self):
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.wait_s3gates()
self.wait_tree_healthcheck()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped services") @reporter.step_deco("Start all stopped services")
def start_all_stopped_services(self): def start_all_stopped_services(self):
stopped_storages = self._get_stopped_by_type(StorageNode)
parallel([service.start_service for service in self.stopped_services]) parallel([service.start_service for service in self.stopped_services])
for service in self.stopped_services:
if isinstance(service, StorageNode):
wait_all_storage_nodes_returned(self.shell, self.cluster)
break
self.stopped_services.clear() self.stopped_services.clear()
if stopped_storages:
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}") @reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type( def stop_service_of_type(
@ -183,50 +193,78 @@ class ClusterStateController:
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type) service = node.service(service_type)
service.start_service() service.start_service()
if service in self.stopped_services: self.stopped_services.discard(service)
self.stopped_services.remove(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc:
return
parallel([svc.start_service for svc in stopped_svc])
self.stopped_services.difference_update(stopped_svc)
if service_type == StorageNode:
self.wait_after_storage_startup()
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, S3Gate)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stop_service_of_type(node, StorageNode, mask)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start storage service on {node}") @reporter.step_deco("Start storage service on {node}")
def start_storage_service(self, node: ClusterNode): def start_storage_service(self, node: ClusterNode):
node.storage_node.start_service() self.start_service_of_type(node, StorageNode)
self.stopped_storage_nodes.remove(node)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped storage services") @reporter.step_deco("Start stopped storage services")
def start_stopped_storage_services(self): def start_stopped_storage_services(self):
if not self.stopped_storage_nodes: self.start_stopped_services_of_type(StorageNode)
return
# In case if we stopped couple services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_storage_nodes = []
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop s3 gate on {node}") @reporter.step_deco("Stop s3 gate on {node}")
def stop_s3_gate(self, node: ClusterNode, mask: bool = True): def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
node.s3_gate.stop_service(mask) self.stop_service_of_type(node, S3Gate, mask)
self.stopped_s3_gates.append(node)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start s3 gate on {node}") @reporter.step_deco("Start s3 gate on {node}")
def start_s3_gate(self, node: ClusterNode): def start_s3_gate(self, node: ClusterNode):
node.s3_gate.start_service() self.start_service_of_type(node, S3Gate)
self.stopped_s3_gates.remove(node)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped S3 gates") @reporter.step_deco("Start stopped S3 gates")
def start_stopped_s3_gates(self): def start_stopped_s3_gates(self):
if not self.stopped_s3_gates: self.start_stopped_services_of_type(S3Gate)
return
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
self.stopped_s3_gates = []
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}") @reporter.step_deco("Suspend {process_name} service in {node}")

View file

@ -7,6 +7,7 @@ import yaml
from frostfs_testlib.hosting.config import ServiceConfig from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.hosting.interfaces import Host from frostfs_testlib.hosting.interfaces import Host
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.testing.readable import HumanReadableABC from frostfs_testlib.testing.readable import HumanReadableABC
from frostfs_testlib.utils import wallet_utils from frostfs_testlib.utils import wallet_utils
@ -67,6 +68,12 @@ class NodeBase(HumanReadableABC):
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
# TODO: Migrate to sub-class Metrcis (not yet exists :))
def get_metric(self, metric: str) -> CommandResult:
shell = self.host.get_shell()
result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'")
return result
def get_metrics_endpoint(self) -> str: def get_metrics_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS) return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)

View file

@ -3,6 +3,3 @@ import frostfs_testlib.utils.datetime_utils
import frostfs_testlib.utils.json_utils import frostfs_testlib.utils.json_utils
import frostfs_testlib.utils.string_utils import frostfs_testlib.utils.string_utils
import frostfs_testlib.utils.wallet_utils import frostfs_testlib.utils.wallet_utils
# TODO: Circullar dependency FileKeeper -> NodeBase -> Utils -> FileKeeper -> NodeBase
from frostfs_testlib.utils.file_keeper import FileKeeper