forked from TrueCloudLab/frostfs-testlib
546 lines
25 KiB
Python
546 lines
25 KiB
Python
import datetime
|
|
import logging
|
|
import time
|
|
from typing import TypeVar
|
|
|
|
import frostfs_testlib.resources.optionals as optionals
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
|
|
from frostfs_testlib.cli.netmap_parser import NetmapParser
|
|
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
|
from frostfs_testlib.hosting.interfaces import HostStatus
|
|
from frostfs_testlib.plugins import load_all
|
|
from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
|
|
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
|
|
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
|
|
from frostfs_testlib.steps.network import IpHelper
|
|
from frostfs_testlib.steps.node_management import include_node_to_network_map, remove_nodes_from_map_morph
|
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
|
|
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
|
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
|
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing import parallel
|
|
from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for_success
|
|
from frostfs_testlib.utils.datetime_utils import parse_time
|
|
|
|
logger = logging.getLogger("NeoLogger")
|
|
|
|
|
|
class StateManager:
|
|
def __init__(self, cluster_state_controller: "ClusterStateController") -> None:
|
|
self.csc = cluster_state_controller
|
|
|
|
|
|
StateManagerClass = TypeVar("StateManagerClass", bound=StateManager)
|
|
|
|
|
|
class ClusterStateController:
|
|
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
|
|
self.stopped_nodes: list[ClusterNode] = []
|
|
self.detached_disks: dict[str, DiskController] = {}
|
|
self.dropped_traffic: list[ClusterNode] = []
|
|
self.excluded_from_netmap: list[StorageNode] = []
|
|
self.stopped_services: set[NodeBase] = set()
|
|
self.cluster = cluster
|
|
self.healthcheck = healthcheck
|
|
self.shell = shell
|
|
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
|
self.nodes_with_modified_interface: list[ClusterNode] = []
|
|
self.managers: list[StateManagerClass] = []
|
|
|
|
# TODO: move all functionality to managers
|
|
managers = set(load_all(group="frostfs.testlib.csc_managers"))
|
|
for manager in managers:
|
|
self.managers.append(manager(self))
|
|
|
|
def manager(self, manager_type: type[StateManagerClass]) -> StateManagerClass:
|
|
for manager in self.managers:
|
|
# Subclasses here for the future if we have overriding subclasses of base interface
|
|
if issubclass(type(manager), manager_type):
|
|
return manager
|
|
|
|
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
|
|
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
|
|
return set(stopped_by_node)
|
|
|
|
def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
|
|
stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)]
|
|
return set(stopped_by_type)
|
|
|
|
def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
|
|
stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes])
|
|
return set(stopped_on_nodes)
|
|
|
|
def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
|
|
stopped_svc = self._get_stopped_by_type(service_type).union(self._from_stopped_nodes(service_type))
|
|
online_svc = set(self.cluster.services(service_type)) - stopped_svc
|
|
return online_svc
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop host of node {node}")
|
|
def stop_node_host(self, node: ClusterNode, mode: str):
|
|
# Drop ssh connection for this node before shutdown
|
|
provider = SshConnectionProvider()
|
|
provider.drop(node.host_ip)
|
|
|
|
self.stopped_nodes.append(node)
|
|
with reporter.step(f"Stop host {node.host.config.address}"):
|
|
node.host.stop_host(mode=mode)
|
|
self._wait_for_host_offline(node)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Shutdown whole cluster")
|
|
def shutdown_cluster(self, mode: str, reversed_order: bool = False):
|
|
nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
|
|
|
# Drop all ssh connections before shutdown
|
|
provider = SshConnectionProvider()
|
|
provider.drop_all()
|
|
|
|
for node in nodes:
|
|
with reporter.step(f"Stop host {node.host.config.address}"):
|
|
self.stopped_nodes.append(node)
|
|
node.host.stop_host(mode=mode)
|
|
|
|
for node in nodes:
|
|
self._wait_for_host_offline(node)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start host of node {node}")
|
|
def start_node_host(self, node: ClusterNode, startup_healthcheck: bool = True):
|
|
with reporter.step(f"Start host {node.host.config.address}"):
|
|
node.host.start_host()
|
|
self._wait_for_host_online(node)
|
|
self.stopped_nodes.remove(node)
|
|
if startup_healthcheck:
|
|
self.wait_startup_healthcheck()
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start stopped hosts")
|
|
def start_stopped_hosts(self, reversed_order: bool = False):
|
|
if not self.stopped_nodes:
|
|
return
|
|
|
|
nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
|
|
for node in nodes:
|
|
with reporter.step(f"Start host {node.host.config.address}"):
|
|
node.host.start_host()
|
|
self.stopped_services.difference_update(self._get_stopped_by_node(node))
|
|
|
|
self.stopped_nodes = []
|
|
with reporter.step("Wait for all nodes to go online"):
|
|
parallel(self._wait_for_host_online, self.cluster.cluster_nodes)
|
|
|
|
self.wait_after_storage_startup()
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Detach disk {device} at {mountpoint} on node {node}")
|
|
def detach_disk(self, node: StorageNode, device: str, mountpoint: str):
|
|
disk_controller = self._get_disk_controller(node, device, mountpoint)
|
|
self.detached_disks[disk_controller.id] = disk_controller
|
|
disk_controller.detach()
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Attach disk {device} at {mountpoint} on node {node}")
|
|
def attach_disk(self, node: StorageNode, device: str, mountpoint: str):
|
|
disk_controller = self._get_disk_controller(node, device, mountpoint)
|
|
disk_controller.attach()
|
|
self.detached_disks.pop(disk_controller.id, None)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Restore detached disks")
|
|
def restore_disks(self):
|
|
for disk_controller in self.detached_disks.values():
|
|
disk_controller.attach()
|
|
self.detached_disks = {}
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop all {service_type} services")
|
|
def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True):
|
|
services = self.cluster.services(service_type)
|
|
self.stopped_services.update(services)
|
|
parallel([service.stop_service for service in services], mask=mask)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start all {service_type} services")
|
|
def start_services_of_type(self, service_type: type[ServiceClass]):
|
|
services = self.cluster.services(service_type)
|
|
parallel([service.start_service for service in services])
|
|
self.stopped_services.difference_update(set(services))
|
|
|
|
if service_type == StorageNode:
|
|
self.wait_after_storage_startup()
|
|
|
|
@wait_for_success(600, 60)
|
|
def wait_s3gate(self, s3gate: S3Gate):
|
|
with reporter.step(f"Wait for {s3gate} reconnection"):
|
|
result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
|
|
assert 'address="127.0.0.1' in result.stdout, "S3Gate should connect to local storage node"
|
|
|
|
@reporter.step("Wait for S3Gates reconnection to local storage")
|
|
def wait_s3gates(self):
|
|
online_s3gates = self._get_online(S3Gate)
|
|
if online_s3gates:
|
|
parallel(self.wait_s3gate, online_s3gates)
|
|
|
|
@reporter.step("Wait for cluster startup healtcheck")
|
|
def wait_startup_healthcheck(self):
|
|
nodes = self.cluster.nodes(self._get_online(StorageNode))
|
|
parallel(self.healthcheck.startup_healthcheck, nodes)
|
|
|
|
@reporter.step("Wait for storage reconnection to the system")
|
|
def wait_after_storage_startup(self):
|
|
self.wait_startup_healthcheck()
|
|
self.wait_s3gates()
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start all stopped services")
|
|
def start_all_stopped_services(self):
|
|
stopped_storages = self._get_stopped_by_type(StorageNode)
|
|
parallel([service.start_service for service in self.stopped_services])
|
|
self.stopped_services.clear()
|
|
|
|
if stopped_storages:
|
|
self.wait_after_storage_startup()
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop {service_type} service on {node}")
|
|
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
|
|
service = node.service(service_type)
|
|
service.stop_service(mask)
|
|
self.stopped_services.add(service)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start {service_type} service on {node}")
|
|
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
|
service = node.service(service_type)
|
|
service.start_service()
|
|
self.stopped_services.discard(service)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start all stopped {service_type} services")
|
|
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
|
|
stopped_svc = self._get_stopped_by_type(service_type)
|
|
if not stopped_svc:
|
|
return
|
|
|
|
parallel([svc.start_service for svc in stopped_svc])
|
|
self.stopped_services.difference_update(stopped_svc)
|
|
|
|
if service_type == StorageNode:
|
|
self.wait_after_storage_startup()
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop all storage services on cluster")
|
|
def stop_all_storage_services(self, reversed_order: bool = False):
|
|
nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
|
|
|
for node in nodes:
|
|
self.stop_service_of_type(node, StorageNode)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop all S3 gates on cluster")
|
|
def stop_all_s3_gates(self, reversed_order: bool = False):
|
|
nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
|
|
|
for node in nodes:
|
|
self.stop_service_of_type(node, S3Gate)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop storage service on {node}")
|
|
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
|
|
self.stop_service_of_type(node, StorageNode, mask)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start storage service on {node}")
|
|
def start_storage_service(self, node: ClusterNode):
|
|
self.start_service_of_type(node, StorageNode)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start stopped storage services")
|
|
def start_stopped_storage_services(self):
|
|
self.start_stopped_services_of_type(StorageNode)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Stop s3 gate on {node}")
|
|
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
|
|
self.stop_service_of_type(node, S3Gate, mask)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start s3 gate on {node}")
|
|
def start_s3_gate(self, node: ClusterNode):
|
|
self.start_service_of_type(node, S3Gate)
|
|
|
|
# TODO: Deprecated
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start stopped S3 gates")
|
|
def start_stopped_s3_gates(self):
|
|
self.start_stopped_services_of_type(S3Gate)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Suspend {process_name} service in {node}")
|
|
def suspend_service(self, process_name: str, node: ClusterNode):
|
|
node.host.wait_success_suspend_process(process_name)
|
|
if self.suspended_services.get(process_name):
|
|
self.suspended_services[process_name].append(node)
|
|
else:
|
|
self.suspended_services[process_name] = [node]
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Resume {process_name} service in {node}")
|
|
def resume_service(self, process_name: str, node: ClusterNode):
|
|
node.host.wait_success_resume_process(process_name)
|
|
if self.suspended_services.get(process_name) and node in self.suspended_services[process_name]:
|
|
self.suspended_services[process_name].remove(node)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Start suspend processes services")
|
|
def resume_suspended_services(self):
|
|
for process_name, list_nodes in self.suspended_services.items():
|
|
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
|
|
self.suspended_services = {}
|
|
|
|
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
|
|
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
|
|
list_ip = self._parse_interfaces(block_nodes, name_interface)
|
|
IpHelper.drop_input_traffic_to_node(node, list_ip)
|
|
time.sleep(wakeup_timeout)
|
|
self.dropped_traffic.append(node)
|
|
|
|
@reporter.step("Start traffic to {node}")
|
|
def restore_traffic(self, node: ClusterNode) -> None:
|
|
IpHelper.restore_input_traffic_to_node(node=node)
|
|
index = self.dropped_traffic.index(node)
|
|
self.dropped_traffic.pop(index)
|
|
|
|
@reporter.step("Restore blocked nodes")
|
|
def restore_all_traffic(self):
|
|
if not self.dropped_traffic:
|
|
return
|
|
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
|
|
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
|
@reporter.step("Hard reboot host {node} via magic SysRq option")
|
|
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True, startup_healthcheck: bool = True):
|
|
shell = node.host.get_shell()
|
|
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
|
|
|
|
options = CommandOptions(close_stdin=True, timeout=1, check=False)
|
|
shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)
|
|
|
|
# Drop ssh connection for this node
|
|
provider = SshConnectionProvider()
|
|
provider.drop(node.host_ip)
|
|
|
|
if wait_for_return:
|
|
# Let the things to be settled
|
|
# A little wait here to prevent ssh stuck during panic
|
|
time.sleep(10)
|
|
self._wait_for_host_online(node)
|
|
if startup_healthcheck:
|
|
self.wait_startup_healthcheck()
|
|
|
|
@reporter.step("Down {interface} to {nodes}")
|
|
def down_interface(self, nodes: list[ClusterNode], interface: str):
|
|
for node in nodes:
|
|
node.host.down_interface(interface=interface)
|
|
assert node.host.check_state(interface=interface) == "DOWN"
|
|
self.nodes_with_modified_interface.append(node)
|
|
|
|
@reporter.step("Up {interface} to {nodes}")
|
|
def up_interface(self, nodes: list[ClusterNode], interface: str):
|
|
for node in nodes:
|
|
node.host.up_interface(interface=interface)
|
|
assert node.host.check_state(interface=interface) == "UP"
|
|
if node in self.nodes_with_modified_interface:
|
|
self.nodes_with_modified_interface.remove(node)
|
|
|
|
@reporter.step("Restore interface")
|
|
def restore_interfaces(self):
|
|
for node in self.nodes_with_modified_interface:
|
|
dict_interfaces = node.host.config.interfaces.keys()
|
|
for name_interface in dict_interfaces:
|
|
if "mgmt" not in name_interface:
|
|
node.host.up_interface(interface=name_interface)
|
|
|
|
@reporter.step("Get node time")
|
|
def get_node_date(self, node: ClusterNode) -> datetime:
|
|
shell = node.host.get_shell()
|
|
return datetime.datetime.strptime(shell.exec("hwclock -r").stdout.strip(), "%Y-%m-%d %H:%M:%S.%f%z")
|
|
|
|
@reporter.step("Set node time to {in_date}")
|
|
def change_node_date(self, node: ClusterNode, in_date: datetime) -> None:
|
|
shell = node.host.get_shell()
|
|
shell.exec(f"date -s @{time.mktime(in_date.timetuple())}")
|
|
shell.exec("hwclock --systohc")
|
|
node_time = self.get_node_date(node)
|
|
with reporter.step(f"Verify difference between {node_time} and {in_date} is less than a minute"):
|
|
assert (self.get_node_date(node) - in_date) < datetime.timedelta(minutes=1)
|
|
|
|
@reporter.step(f"Restore time")
|
|
def restore_node_date(self, node: ClusterNode) -> None:
|
|
shell = node.host.get_shell()
|
|
now_time = datetime.datetime.now(datetime.timezone.utc)
|
|
with reporter.step(f"Set {now_time} time"):
|
|
shell.exec(f"date -s @{time.mktime(now_time.timetuple())}")
|
|
shell.exec("hwclock --systohc")
|
|
|
|
@reporter.step("Change the synchronizer status to {status}")
|
|
def set_sync_date_all_nodes(self, status: str):
|
|
if status == "active":
|
|
parallel(self._enable_date_synchronizer, self.cluster.cluster_nodes)
|
|
return
|
|
parallel(self._disable_date_synchronizer, self.cluster.cluster_nodes)
|
|
|
|
@reporter.step("Set MaintenanceModeAllowed - {status}")
|
|
def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None:
|
|
frostfs_adm = FrostfsAdm(
|
|
shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH
|
|
)
|
|
frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}")
|
|
|
|
@reporter.step("Set node status to {status} in CSC")
|
|
def set_node_status(self, cluster_node: ClusterNode, wallet: WalletInfo, status: NodeStatus, await_tick: bool = True) -> None:
|
|
rpc_endpoint = cluster_node.storage_node.get_rpc_endpoint()
|
|
control_endpoint = cluster_node.service(StorageNode).get_control_endpoint()
|
|
|
|
frostfs_adm, frostfs_cli, frostfs_cli_remote = self._get_cli(self.shell, wallet, cluster_node)
|
|
node_netinfo = NetmapParser.netinfo(frostfs_cli.netmap.netinfo(rpc_endpoint).stdout)
|
|
|
|
if node_netinfo.maintenance_mode_allowed == "false":
|
|
with reporter.step("Enable maintenance mode"):
|
|
frostfs_adm.morph.set_config("MaintenanceModeAllowed=true")
|
|
|
|
with reporter.step(f"Set node status to {status} using FrostfsCli"):
|
|
frostfs_cli_remote.control.set_status(control_endpoint, status.value)
|
|
|
|
if not await_tick:
|
|
return
|
|
|
|
with reporter.step("Tick 2 epoch with 2 block await."):
|
|
for _ in range(2):
|
|
frostfs_adm.morph.force_new_epoch()
|
|
time.sleep(parse_time(MORPH_BLOCK_TIME) * 2)
|
|
|
|
self.await_node_status(status, wallet, cluster_node)
|
|
|
|
@wait_for_success(80, 8, title="Wait for node status become {status}")
|
|
def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode, checker_node: ClusterNode = None):
|
|
frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
|
|
if not checker_node:
|
|
checker_node = cluster_node
|
|
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
|
|
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
|
|
if status == NodeStatus.OFFLINE:
|
|
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
|
|
else:
|
|
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
|
|
|
|
def remove_node_from_netmap(self, removes_nodes: list[StorageNode]) -> None:
|
|
alive_storage = list(set(self.cluster.storage_nodes) - set(removes_nodes))[0]
|
|
remove_nodes_from_map_morph(self.shell, self.cluster, removes_nodes, alive_storage)
|
|
self.excluded_from_netmap.extend(removes_nodes)
|
|
|
|
def include_node_to_netmap(self, include_node: StorageNode, alive_node: StorageNode):
|
|
include_node_to_network_map(include_node, alive_node, self.shell, self.cluster)
|
|
self.excluded_from_netmap.pop(self.excluded_from_netmap.index(include_node))
|
|
|
|
def include_all_excluded_nodes(self):
|
|
if not self.excluded_from_netmap:
|
|
return
|
|
alive_node = list(set(self.cluster.storage_nodes) - set(self.excluded_from_netmap))[0]
|
|
if not alive_node:
|
|
return
|
|
|
|
for exclude_node in self.excluded_from_netmap.copy():
|
|
self.include_node_to_netmap(exclude_node, alive_node)
|
|
|
|
def _get_cli(
|
|
self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode
|
|
) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]:
|
|
# TODO Move to service config
|
|
host = cluster_node.host
|
|
service_config = host.get_service_config(cluster_node.storage_node.name)
|
|
wallet_path = service_config.attributes["wallet_path"]
|
|
wallet_password = service_config.attributes["wallet_password"]
|
|
|
|
shell = host.get_shell()
|
|
wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
|
|
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
|
|
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
|
|
|
|
frostfs_adm = FrostfsAdm(shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
|
|
frostfs_cli = FrostfsCli(local_shell, FROSTFS_CLI_EXEC, local_wallet.config_path)
|
|
frostfs_cli_remote = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
|
|
return frostfs_adm, frostfs_cli, frostfs_cli_remote
|
|
|
|
def _enable_date_synchronizer(self, cluster_node: ClusterNode):
|
|
shell = cluster_node.host.get_shell()
|
|
shell.exec("timedatectl set-ntp true")
|
|
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 15)
|
|
|
|
def _disable_date_synchronizer(self, cluster_node: ClusterNode):
|
|
shell = cluster_node.host.get_shell()
|
|
shell.exec("timedatectl set-ntp false")
|
|
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 15)
|
|
|
|
def _get_disk_controller(self, node: StorageNode, device: str, mountpoint: str) -> DiskController:
|
|
disk_controller_id = DiskController.get_id(node, device)
|
|
if disk_controller_id in self.detached_disks.keys():
|
|
disk_controller = self.detached_disks[disk_controller_id]
|
|
else:
|
|
disk_controller = DiskController(node, device, mountpoint)
|
|
|
|
return disk_controller
|
|
|
|
def _restore_traffic_to_node(self, node):
|
|
IpHelper.restore_input_traffic_to_node(node)
|
|
|
|
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str):
|
|
interfaces = []
|
|
for node in nodes:
|
|
dict_interfaces = node.host.config.interfaces
|
|
for type, ip in dict_interfaces.items():
|
|
if name_interface in type:
|
|
interfaces.append(ip)
|
|
return interfaces
|
|
|
|
@reporter.step("Ping node")
|
|
def _ping_host(self, node: ClusterNode):
|
|
options = CommandOptions(check=False)
|
|
return self.shell.exec(f"ping {node.host.config.address} -c 1", options).return_code
|
|
|
|
@retry(max_attempts=60, sleep_interval=10, expected_result=HostStatus.ONLINE, title="Waiting for {node} to go online")
|
|
def _wait_for_host_online(self, node: ClusterNode):
|
|
try:
|
|
ping_result = self._ping_host(node)
|
|
if ping_result != 0:
|
|
return HostStatus.OFFLINE
|
|
return node.host.get_host_status()
|
|
except Exception as err:
|
|
logger.warning(f"Host ping fails with error {err}")
|
|
return HostStatus.OFFLINE
|
|
|
|
@retry(max_attempts=60, sleep_interval=10, expected_result=HostStatus.OFFLINE, title="Waiting for {node} to go offline")
|
|
def _wait_for_host_offline(self, node: ClusterNode):
|
|
try:
|
|
ping_result = self._ping_host(node)
|
|
if ping_result == 0:
|
|
return HostStatus.ONLINE
|
|
return node.host.get_host_status()
|
|
except Exception as err:
|
|
logger.warning(f"Host ping fails with error {err}")
|
|
return HostStatus.ONLINE
|
|
|
|
@reporter.step("Get contract by domain - {domain_name}")
|
|
def get_domain_contracts(self, cluster_node: ClusterNode, domain_name: str):
|
|
frostfs_adm = FrostfsAdm(shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC)
|
|
return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_http_endpoint(), domain_name).stdout
|