Compare commits
1 commit
master
...
support/v0
Author | SHA1 | Date | |
---|---|---|---|
5687b79b38 |
6 changed files with 212 additions and 113 deletions
|
@ -1,5 +1,7 @@
|
||||||
|
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
|
||||||
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
||||||
from frostfs_testlib.reporter import get_reporter
|
from frostfs_testlib.reporter import get_reporter
|
||||||
|
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
|
||||||
from frostfs_testlib.steps.node_management import storage_node_healthcheck
|
from frostfs_testlib.steps.node_management import storage_node_healthcheck
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
|
|
||||||
|
@ -9,6 +11,33 @@ reporter = get_reporter()
|
||||||
class BasicHealthcheck(Healthcheck):
|
class BasicHealthcheck(Healthcheck):
|
||||||
@reporter.step_deco("Perform healthcheck for {cluster_node}")
|
@reporter.step_deco("Perform healthcheck for {cluster_node}")
|
||||||
def perform(self, cluster_node: ClusterNode):
|
def perform(self, cluster_node: ClusterNode):
|
||||||
health_check = storage_node_healthcheck(cluster_node.storage_node)
|
result = self.storage_healthcheck(cluster_node)
|
||||||
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
|
if result:
|
||||||
raise AssertionError("Node {cluster_node} is not healthy")
|
raise AssertionError(result)
|
||||||
|
|
||||||
|
@reporter.step_deco("Tree healthcheck on {cluster_node}")
|
||||||
|
def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None:
|
||||||
|
host = cluster_node.host
|
||||||
|
service_config = host.get_service_config(cluster_node.storage_node.name)
|
||||||
|
wallet_path = service_config.attributes["wallet_path"]
|
||||||
|
wallet_password = service_config.attributes["wallet_password"]
|
||||||
|
|
||||||
|
shell = host.get_shell()
|
||||||
|
wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
|
||||||
|
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
|
||||||
|
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
|
||||||
|
|
||||||
|
remote_cli = FrostfsCli(
|
||||||
|
shell,
|
||||||
|
host.get_cli_config(FROSTFS_CLI_EXEC).exec_path,
|
||||||
|
config_file=wallet_config_path,
|
||||||
|
)
|
||||||
|
result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080")
|
||||||
|
if result.return_code != 0:
|
||||||
|
return f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}"
|
||||||
|
|
||||||
|
@reporter.step_deco("Storage healthcheck on {cluster_node}")
|
||||||
|
def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None:
|
||||||
|
result = storage_node_healthcheck(cluster_node.storage_node)
|
||||||
|
if result.health_status != "READY" or result.network_status != "ONLINE":
|
||||||
|
return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}"
|
||||||
|
|
|
@ -7,3 +7,11 @@ class Healthcheck(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def perform(self, cluster_node: ClusterNode):
|
def perform(self, cluster_node: ClusterNode):
|
||||||
"""Perform healthcheck on the target cluster node"""
|
"""Perform healthcheck on the target cluster node"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def tree_healthcheck(self, cluster_node: ClusterNode):
|
||||||
|
"""Check tree sync status on target cluster node"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def storage_healthcheck(self, cluster_node: ClusterNode):
|
||||||
|
"""Perform storage node healthcheck on target cluster node"""
|
||||||
|
|
|
@ -3,7 +3,6 @@ import itertools
|
||||||
import math
|
import math
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from dataclasses import fields
|
from dataclasses import fields
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
@ -24,13 +23,16 @@ from frostfs_testlib.resources.load_params import (
|
||||||
LOAD_NODE_SSH_USER,
|
LOAD_NODE_SSH_USER,
|
||||||
LOAD_NODES,
|
LOAD_NODES,
|
||||||
)
|
)
|
||||||
|
from frostfs_testlib.shell.command_inspectors import SuInspector
|
||||||
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
|
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||||
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
||||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||||
from frostfs_testlib.testing import parallel, run_optionally
|
from frostfs_testlib.testing import parallel, run_optionally
|
||||||
from frostfs_testlib.utils import FileKeeper, datetime_utils
|
from frostfs_testlib.testing.test_control import retry
|
||||||
|
from frostfs_testlib.utils import datetime_utils
|
||||||
|
from frostfs_testlib.utils.file_keeper import FileKeeper
|
||||||
|
|
||||||
reporter = get_reporter()
|
reporter = get_reporter()
|
||||||
|
|
||||||
|
@ -296,40 +298,53 @@ class LocalRunner(RunnerBase):
|
||||||
nodes_under_load: list[ClusterNode],
|
nodes_under_load: list[ClusterNode],
|
||||||
k6_dir: str,
|
k6_dir: str,
|
||||||
):
|
):
|
||||||
@reporter.step_deco("Prepare node {cluster_node}")
|
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params)
|
||||||
def prepare_node(cluster_node: ClusterNode):
|
|
||||||
shell = cluster_node.host.get_shell()
|
|
||||||
|
|
||||||
with reporter.step("Allow storage user to login into system"):
|
@retry(3, 5, expected_result=True)
|
||||||
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
|
def allow_user_to_login_in_system(self, cluster_node: ClusterNode):
|
||||||
shell.exec("sudo chattr +i /etc/passwd")
|
shell = cluster_node.host.get_shell()
|
||||||
|
|
||||||
with reporter.step("Update limits.conf"):
|
result = None
|
||||||
limits_path = "/etc/security/limits.conf"
|
try:
|
||||||
self.file_keeper.add(cluster_node.storage_node, limits_path)
|
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
|
||||||
content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
|
self.lock_passwd_on_node(cluster_node)
|
||||||
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
|
options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)])
|
||||||
|
result = shell.exec("whoami", options)
|
||||||
|
finally:
|
||||||
|
if not result or result.return_code:
|
||||||
|
self.restore_passwd_on_node(cluster_node)
|
||||||
|
return False
|
||||||
|
|
||||||
with reporter.step("Download K6"):
|
return True
|
||||||
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
|
|
||||||
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
|
|
||||||
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
|
|
||||||
shell.exec(f"sudo chmod -R 777 {k6_dir}")
|
|
||||||
|
|
||||||
with reporter.step("Create empty_passwd"):
|
@reporter.step_deco("Prepare node {cluster_node}")
|
||||||
self.wallet = WalletInfo(
|
def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
|
||||||
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
|
shell = cluster_node.host.get_shell()
|
||||||
)
|
|
||||||
content = yaml.dump({"password": ""})
|
|
||||||
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
|
|
||||||
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
|
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor:
|
with reporter.step("Allow storage user to login into system"):
|
||||||
result = executor.map(prepare_node, nodes_under_load)
|
self.allow_user_to_login_in_system(cluster_node)
|
||||||
|
|
||||||
# Check for exceptions
|
with reporter.step("Update limits.conf"):
|
||||||
for _ in result:
|
limits_path = "/etc/security/limits.conf"
|
||||||
pass
|
self.file_keeper.add(cluster_node.storage_node, limits_path)
|
||||||
|
content = (
|
||||||
|
f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
|
||||||
|
)
|
||||||
|
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
|
||||||
|
|
||||||
|
with reporter.step("Download K6"):
|
||||||
|
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
|
||||||
|
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
|
||||||
|
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
|
||||||
|
shell.exec(f"sudo chmod -R 777 {k6_dir}")
|
||||||
|
|
||||||
|
with reporter.step("Create empty_passwd"):
|
||||||
|
self.wallet = WalletInfo(
|
||||||
|
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
|
||||||
|
)
|
||||||
|
content = yaml.dump({"password": ""})
|
||||||
|
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
|
||||||
|
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
|
||||||
|
|
||||||
@reporter.step_deco("Init k6 instances")
|
@reporter.step_deco("Init k6 instances")
|
||||||
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
||||||
|
@ -379,16 +394,21 @@ class LocalRunner(RunnerBase):
|
||||||
):
|
):
|
||||||
time.sleep(wait_after_start_time)
|
time.sleep(wait_after_start_time)
|
||||||
|
|
||||||
|
@reporter.step_deco("Restore passwd on {cluster_node}")
|
||||||
|
def restore_passwd_on_node(self, cluster_node: ClusterNode):
|
||||||
|
shell = cluster_node.host.get_shell()
|
||||||
|
shell.exec("sudo chattr -i /etc/passwd")
|
||||||
|
|
||||||
|
@reporter.step_deco("Lock passwd on {cluster_node}")
|
||||||
|
def lock_passwd_on_node(self, cluster_node: ClusterNode):
|
||||||
|
shell = cluster_node.host.get_shell()
|
||||||
|
shell.exec("sudo chattr +i /etc/passwd")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
for k6_instance in self.k6_instances:
|
for k6_instance in self.k6_instances:
|
||||||
k6_instance.stop()
|
k6_instance.stop()
|
||||||
|
|
||||||
@reporter.step_deco("Restore passwd on {cluster_node}")
|
parallel(self.restore_passwd_on_node, self.nodes_under_load)
|
||||||
def restore_passwd_attr_on_node(cluster_node: ClusterNode):
|
|
||||||
shell = cluster_node.host.get_shell()
|
|
||||||
shell.exec("sudo chattr -i /etc/passwd")
|
|
||||||
|
|
||||||
parallel(restore_passwd_attr_on_node, self.nodes_under_load)
|
|
||||||
|
|
||||||
self.cluster_state_controller.start_stopped_storage_services()
|
self.cluster_state_controller.start_stopped_storage_services()
|
||||||
self.cluster_state_controller.start_stopped_s3_gates()
|
self.cluster_state_controller.start_stopped_s3_gates()
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
import copy
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import frostfs_testlib.resources.optionals as optionals
|
import frostfs_testlib.resources.optionals as optionals
|
||||||
|
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
||||||
from frostfs_testlib.reporter import get_reporter
|
from frostfs_testlib.reporter import get_reporter
|
||||||
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
|
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
|
||||||
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
|
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
|
||||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||||
from frostfs_testlib.testing import parallel
|
from frostfs_testlib.testing import parallel
|
||||||
from frostfs_testlib.testing.test_control import run_optionally
|
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
||||||
from frostfs_testlib.utils.failover_utils import (
|
from frostfs_testlib.utils.failover_utils import (
|
||||||
wait_all_storage_nodes_returned,
|
wait_all_storage_nodes_returned,
|
||||||
wait_for_host_offline,
|
wait_for_host_offline,
|
||||||
|
@ -22,18 +22,36 @@ if_up_down_helper = IfUpDownHelper()
|
||||||
|
|
||||||
|
|
||||||
class ClusterStateController:
|
class ClusterStateController:
|
||||||
def __init__(self, shell: Shell, cluster: Cluster) -> None:
|
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
|
||||||
self.stopped_nodes: list[ClusterNode] = []
|
self.stopped_nodes: list[ClusterNode] = []
|
||||||
self.detached_disks: dict[str, DiskController] = {}
|
self.detached_disks: dict[str, DiskController] = {}
|
||||||
self.stopped_storage_nodes: list[ClusterNode] = []
|
|
||||||
self.stopped_s3_gates: list[ClusterNode] = []
|
|
||||||
self.dropped_traffic: list[ClusterNode] = []
|
self.dropped_traffic: list[ClusterNode] = []
|
||||||
self.stopped_services: set[NodeBase] = set()
|
self.stopped_services: set[NodeBase] = set()
|
||||||
self.cluster = cluster
|
self.cluster = cluster
|
||||||
|
self.healthcheck = healthcheck
|
||||||
self.shell = shell
|
self.shell = shell
|
||||||
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
||||||
self.nodes_with_modified_interface: list[ClusterNode] = []
|
self.nodes_with_modified_interface: list[ClusterNode] = []
|
||||||
|
|
||||||
|
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
|
||||||
|
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
|
||||||
|
return set(stopped_by_node)
|
||||||
|
|
||||||
|
def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
|
||||||
|
stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)]
|
||||||
|
return set(stopped_by_type)
|
||||||
|
|
||||||
|
def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
|
||||||
|
stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes])
|
||||||
|
return set(stopped_on_nodes)
|
||||||
|
|
||||||
|
def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
|
||||||
|
stopped_svc = self._get_stopped_by_type(service_type).union(
|
||||||
|
self._from_stopped_nodes(service_type)
|
||||||
|
)
|
||||||
|
online_svc = set(self.cluster.services(service_type)) - stopped_svc
|
||||||
|
return online_svc
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Stop host of node {node}")
|
@reporter.step_deco("Stop host of node {node}")
|
||||||
def stop_node_host(self, node: ClusterNode, mode: str):
|
def stop_node_host(self, node: ClusterNode, mode: str):
|
||||||
|
@ -65,26 +83,6 @@ class ClusterStateController:
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
wait_for_host_offline(self.shell, node.storage_node)
|
wait_for_host_offline(self.shell, node.storage_node)
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
||||||
@reporter.step_deco("Stop all storage services on cluster")
|
|
||||||
def stop_all_storage_services(self, reversed_order: bool = False):
|
|
||||||
nodes = (
|
|
||||||
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
|
||||||
)
|
|
||||||
|
|
||||||
for node in nodes:
|
|
||||||
self.stop_storage_service(node)
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
||||||
@reporter.step_deco("Stop all S3 gates on cluster")
|
|
||||||
def stop_all_s3_gates(self, reversed_order: bool = False):
|
|
||||||
nodes = (
|
|
||||||
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
|
||||||
)
|
|
||||||
|
|
||||||
for node in nodes:
|
|
||||||
self.stop_s3_gate(node)
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start host of node {node}")
|
@reporter.step_deco("Start host of node {node}")
|
||||||
def start_node_host(self, node: ClusterNode):
|
def start_node_host(self, node: ClusterNode):
|
||||||
|
@ -104,13 +102,10 @@ class ClusterStateController:
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
with reporter.step(f"Start host {node.host.config.address}"):
|
with reporter.step(f"Start host {node.host.config.address}"):
|
||||||
node.host.start_host()
|
node.host.start_host()
|
||||||
if node in self.stopped_storage_nodes:
|
self.stopped_services.difference_update(self._get_stopped_by_node(node))
|
||||||
self.stopped_storage_nodes.remove(node)
|
|
||||||
|
|
||||||
if node in self.stopped_s3_gates:
|
|
||||||
self.stopped_s3_gates.remove(node)
|
|
||||||
self.stopped_nodes = []
|
self.stopped_nodes = []
|
||||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
self.wait_after_storage_startup()
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
|
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
|
||||||
|
@ -133,42 +128,57 @@ class ClusterStateController:
|
||||||
disk_controller.attach()
|
disk_controller.attach()
|
||||||
self.detached_disks = {}
|
self.detached_disks = {}
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
||||||
@reporter.step_deco("Stop storage service on {node}")
|
|
||||||
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
|
|
||||||
self.stopped_storage_nodes.append(node)
|
|
||||||
node.storage_node.stop_service(mask)
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Stop all {service_type} services")
|
@reporter.step_deco("Stop all {service_type} services")
|
||||||
def stop_services_of_type(self, service_type: type[ServiceClass]):
|
def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True):
|
||||||
services = self.cluster.services(service_type)
|
services = self.cluster.services(service_type)
|
||||||
self.stopped_services.update(services)
|
self.stopped_services.update(services)
|
||||||
parallel([service.stop_service for service in services])
|
parallel([service.stop_service for service in services], mask=mask)
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start all {service_type} services")
|
@reporter.step_deco("Start all {service_type} services")
|
||||||
def start_services_of_type(self, service_type: type[ServiceClass]):
|
def start_services_of_type(self, service_type: type[ServiceClass]):
|
||||||
services = self.cluster.services(service_type)
|
services = self.cluster.services(service_type)
|
||||||
parallel([service.start_service for service in services])
|
parallel([service.start_service for service in services])
|
||||||
|
self.stopped_services.difference_update(set(services))
|
||||||
|
|
||||||
if service_type == StorageNode:
|
if service_type == StorageNode:
|
||||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
self.wait_after_storage_startup()
|
||||||
|
|
||||||
self.stopped_services = self.stopped_services - set(services)
|
@wait_for_success(600, 60)
|
||||||
|
def wait_s3gate(self, s3gate: S3Gate):
|
||||||
|
with reporter.step(f"Wait for {s3gate} reconnection"):
|
||||||
|
result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
|
||||||
|
assert (
|
||||||
|
'address="127.0.0.1' in result.stdout
|
||||||
|
), "S3Gate should connect to local storage node"
|
||||||
|
|
||||||
|
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
|
||||||
|
def wait_s3gates(self):
|
||||||
|
online_s3gates = self._get_online(S3Gate)
|
||||||
|
parallel(self.wait_s3gate, online_s3gates)
|
||||||
|
|
||||||
|
@wait_for_success(600, 60)
|
||||||
|
def wait_tree_healthcheck(self):
|
||||||
|
nodes = self.cluster.nodes(self._get_online(StorageNode))
|
||||||
|
parallel(self.healthcheck.tree_healthcheck, nodes)
|
||||||
|
|
||||||
|
@reporter.step_deco("Wait for storage reconnection to the system")
|
||||||
|
def wait_after_storage_startup(self):
|
||||||
|
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||||
|
self.wait_s3gates()
|
||||||
|
self.wait_tree_healthcheck()
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start all stopped services")
|
@reporter.step_deco("Start all stopped services")
|
||||||
def start_all_stopped_services(self):
|
def start_all_stopped_services(self):
|
||||||
|
stopped_storages = self._get_stopped_by_type(StorageNode)
|
||||||
parallel([service.start_service for service in self.stopped_services])
|
parallel([service.start_service for service in self.stopped_services])
|
||||||
|
|
||||||
for service in self.stopped_services:
|
|
||||||
if isinstance(service, StorageNode):
|
|
||||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
|
||||||
break
|
|
||||||
|
|
||||||
self.stopped_services.clear()
|
self.stopped_services.clear()
|
||||||
|
|
||||||
|
if stopped_storages:
|
||||||
|
self.wait_after_storage_startup()
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Stop {service_type} service on {node}")
|
@reporter.step_deco("Stop {service_type} service on {node}")
|
||||||
def stop_service_of_type(
|
def stop_service_of_type(
|
||||||
|
@ -183,50 +193,78 @@ class ClusterStateController:
|
||||||
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||||
service = node.service(service_type)
|
service = node.service(service_type)
|
||||||
service.start_service()
|
service.start_service()
|
||||||
if service in self.stopped_services:
|
self.stopped_services.discard(service)
|
||||||
self.stopped_services.remove(service)
|
|
||||||
|
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Start all stopped {service_type} services")
|
||||||
|
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
|
||||||
|
stopped_svc = self._get_stopped_by_type(service_type)
|
||||||
|
if not stopped_svc:
|
||||||
|
return
|
||||||
|
|
||||||
|
parallel([svc.start_service for svc in stopped_svc])
|
||||||
|
self.stopped_services.difference_update(stopped_svc)
|
||||||
|
|
||||||
|
if service_type == StorageNode:
|
||||||
|
self.wait_after_storage_startup()
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Stop all storage services on cluster")
|
||||||
|
def stop_all_storage_services(self, reversed_order: bool = False):
|
||||||
|
nodes = (
|
||||||
|
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
||||||
|
)
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
self.stop_service_of_type(node, StorageNode)
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Stop all S3 gates on cluster")
|
||||||
|
def stop_all_s3_gates(self, reversed_order: bool = False):
|
||||||
|
nodes = (
|
||||||
|
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
||||||
|
)
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
self.stop_service_of_type(node, S3Gate)
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Stop storage service on {node}")
|
||||||
|
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
|
||||||
|
self.stop_service_of_type(node, StorageNode, mask)
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start storage service on {node}")
|
@reporter.step_deco("Start storage service on {node}")
|
||||||
def start_storage_service(self, node: ClusterNode):
|
def start_storage_service(self, node: ClusterNode):
|
||||||
node.storage_node.start_service()
|
self.start_service_of_type(node, StorageNode)
|
||||||
self.stopped_storage_nodes.remove(node)
|
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start stopped storage services")
|
@reporter.step_deco("Start stopped storage services")
|
||||||
def start_stopped_storage_services(self):
|
def start_stopped_storage_services(self):
|
||||||
if not self.stopped_storage_nodes:
|
self.start_stopped_services_of_type(StorageNode)
|
||||||
return
|
|
||||||
|
|
||||||
# In case if we stopped couple services, for example (s01-s04):
|
|
||||||
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
|
||||||
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
|
||||||
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
|
|
||||||
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
|
|
||||||
|
|
||||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
|
||||||
self.stopped_storage_nodes = []
|
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Stop s3 gate on {node}")
|
@reporter.step_deco("Stop s3 gate on {node}")
|
||||||
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
|
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
|
||||||
node.s3_gate.stop_service(mask)
|
self.stop_service_of_type(node, S3Gate, mask)
|
||||||
self.stopped_s3_gates.append(node)
|
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start s3 gate on {node}")
|
@reporter.step_deco("Start s3 gate on {node}")
|
||||||
def start_s3_gate(self, node: ClusterNode):
|
def start_s3_gate(self, node: ClusterNode):
|
||||||
node.s3_gate.start_service()
|
self.start_service_of_type(node, S3Gate)
|
||||||
self.stopped_s3_gates.remove(node)
|
|
||||||
|
|
||||||
|
# TODO: Deprecated
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start stopped S3 gates")
|
@reporter.step_deco("Start stopped S3 gates")
|
||||||
def start_stopped_s3_gates(self):
|
def start_stopped_s3_gates(self):
|
||||||
if not self.stopped_s3_gates:
|
self.start_stopped_services_of_type(S3Gate)
|
||||||
return
|
|
||||||
|
|
||||||
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
|
|
||||||
self.stopped_s3_gates = []
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Suspend {process_name} service in {node}")
|
@reporter.step_deco("Suspend {process_name} service in {node}")
|
||||||
|
|
|
@ -7,6 +7,7 @@ import yaml
|
||||||
from frostfs_testlib.hosting.config import ServiceConfig
|
from frostfs_testlib.hosting.config import ServiceConfig
|
||||||
from frostfs_testlib.hosting.interfaces import Host
|
from frostfs_testlib.hosting.interfaces import Host
|
||||||
from frostfs_testlib.reporter import get_reporter
|
from frostfs_testlib.reporter import get_reporter
|
||||||
|
from frostfs_testlib.shell.interfaces import CommandResult
|
||||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||||
from frostfs_testlib.testing.readable import HumanReadableABC
|
from frostfs_testlib.testing.readable import HumanReadableABC
|
||||||
from frostfs_testlib.utils import wallet_utils
|
from frostfs_testlib.utils import wallet_utils
|
||||||
|
@ -67,6 +68,12 @@ class NodeBase(HumanReadableABC):
|
||||||
def service_healthcheck(self) -> bool:
|
def service_healthcheck(self) -> bool:
|
||||||
"""Service healthcheck."""
|
"""Service healthcheck."""
|
||||||
|
|
||||||
|
# TODO: Migrate to sub-class Metrcis (not yet exists :))
|
||||||
|
def get_metric(self, metric: str) -> CommandResult:
|
||||||
|
shell = self.host.get_shell()
|
||||||
|
result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'")
|
||||||
|
return result
|
||||||
|
|
||||||
def get_metrics_endpoint(self) -> str:
|
def get_metrics_endpoint(self) -> str:
|
||||||
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
|
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,3 @@ import frostfs_testlib.utils.datetime_utils
|
||||||
import frostfs_testlib.utils.json_utils
|
import frostfs_testlib.utils.json_utils
|
||||||
import frostfs_testlib.utils.string_utils
|
import frostfs_testlib.utils.string_utils
|
||||||
import frostfs_testlib.utils.wallet_utils
|
import frostfs_testlib.utils.wallet_utils
|
||||||
|
|
||||||
# TODO: Circullar dependency FileKeeper -> NodeBase -> Utils -> FileKeeper -> NodeBase
|
|
||||||
from frostfs_testlib.utils.file_keeper import FileKeeper
|
|
||||||
|
|
Loading…
Reference in a new issue