frostfs-testlib/src/frostfs_testlib/storage/controllers/cluster_state_controller.py

413 lines
17 KiB
Python

import time
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
wait_for_host_offline,
wait_for_host_online,
wait_for_node_online,
)
# Module-wide reporter; supplies the `step_deco` decorators and `step` context
# managers used by ClusterStateController.
reporter = get_reporter()
# Shared helper used by the interface down/up/restore methods of ClusterStateController.
if_up_down_helper = IfUpDownHelper()
class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: list[ClusterNode] = []
self.stopped_services: set[NodeBase] = set()
self.cluster = cluster
self.healthcheck = healthcheck
self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {}
self.nodes_with_modified_interface: list[ClusterNode] = []
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
return set(stopped_by_node)
def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)]
return set(stopped_by_type)
def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes])
return set(stopped_on_nodes)
def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
stopped_svc = self._get_stopped_by_type(service_type).union(
self._from_stopped_nodes(service_type)
)
online_svc = set(self.cluster.services(service_type)) - stopped_svc
return online_svc
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str):
# Drop ssh connection for this node before shutdown
provider = SshConnectionProvider()
provider.drop(node.host_ip)
self.stopped_nodes.append(node)
with reporter.step(f"Stop host {node.host.config.address}"):
node.host.stop_host(mode=mode)
wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Shutdown whole cluster")
def shutdown_cluster(self, mode: str, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
# Drop all ssh connections before shutdown
provider = SshConnectionProvider()
provider.drop_all()
for node in nodes:
with reporter.step(f"Stop host {node.host.config.address}"):
self.stopped_nodes.append(node)
node.host.stop_host(mode=mode)
for node in nodes:
wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode):
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
self.stopped_nodes.remove(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped hosts")
def start_stopped_hosts(self, reversed_order: bool = False):
if not self.stopped_nodes:
return
nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
for node in nodes:
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
self.stopped_services.difference_update(self._get_stopped_by_node(node))
self.stopped_nodes = []
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
def detach_disk(self, node: StorageNode, device: str, mountpoint: str):
disk_controller = self._get_disk_controller(node, device, mountpoint)
self.detached_disks[disk_controller.id] = disk_controller
disk_controller.detach()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Attach disk {device} at {mountpoint} on node {node}")
def attach_disk(self, node: StorageNode, device: str, mountpoint: str):
disk_controller = self._get_disk_controller(node, device, mountpoint)
disk_controller.attach()
self.detached_disks.pop(disk_controller.id, None)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Restore detached disks")
def restore_disks(self):
for disk_controller in self.detached_disks.values():
disk_controller.attach()
self.detached_disks = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services")
def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True):
services = self.cluster.services(service_type)
self.stopped_services.update(services)
parallel([service.stop_service for service in services], mask=mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all {service_type} services")
def start_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.start_service for service in services])
self.stopped_services.difference_update(set(services))
if service_type == StorageNode:
self.wait_after_storage_startup()
@wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"):
result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
assert (
'address="127.0.0.1' in result.stdout
), "S3Gate should connect to local storage node"
@reporter.step_deco("Wait for S3Gates reconnection to local storage")
def wait_s3gates(self):
online_s3gates = self._get_online(S3Gate)
parallel(self.wait_s3gate, online_s3gates)
@wait_for_success(600, 60)
def wait_tree_healthcheck(self):
nodes = self.cluster.nodes(self._get_online(StorageNode))
parallel(self.healthcheck.tree_healthcheck, nodes)
@reporter.step_deco("Wait for storage reconnection to the system")
def wait_after_storage_startup(self):
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.wait_s3gates()
self.wait_tree_healthcheck()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped services")
def start_all_stopped_services(self):
stopped_storages = self._get_stopped_by_type(StorageNode)
parallel([service.start_service for service in self.stopped_services])
self.stopped_services.clear()
if stopped_storages:
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type(
self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True
):
service = node.service(service_type)
service.stop_service(mask)
self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type)
service.start_service()
self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc:
return
parallel([svc.start_service for svc in stopped_svc])
self.stopped_services.difference_update(stopped_svc)
if service_type == StorageNode:
self.wait_after_storage_startup()
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all S3 gates on cluster")
def stop_all_s3_gates(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_service_of_type(node, S3Gate)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stop_service_of_type(node, StorageNode, mask)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start storage service on {node}")
def start_storage_service(self, node: ClusterNode):
self.start_service_of_type(node, StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped storage services")
def start_stopped_storage_services(self):
self.start_stopped_services_of_type(StorageNode)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop s3 gate on {node}")
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
self.stop_service_of_type(node, S3Gate, mask)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start s3 gate on {node}")
def start_s3_gate(self, node: ClusterNode):
self.start_service_of_type(node, S3Gate)
# TODO: Deprecated
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped S3 gates")
def start_stopped_s3_gates(self):
self.start_stopped_services_of_type(S3Gate)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}")
def suspend_service(self, process_name: str, node: ClusterNode):
node.host.wait_success_suspend_process(process_name)
if self.suspended_services.get(process_name):
self.suspended_services[process_name].append(node)
else:
self.suspended_services[process_name] = [node]
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Resume {process_name} service in {node}")
def resume_service(self, process_name: str, node: ClusterNode):
node.host.wait_success_resume_process(process_name)
if (
self.suspended_services.get(process_name)
and node in self.suspended_services[process_name]
):
self.suspended_services[process_name].remove(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start suspend processes services")
def resume_suspended_services(self):
for process_name, list_nodes in self.suspended_services.items():
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
self.suspended_services = {}
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
def drop_traffic(
self,
mode: str,
node: ClusterNode,
wakeup_timeout: int,
ports: list[str] = None,
block_nodes: list[ClusterNode] = None,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.drop_input_traffic_to_port(node, ports)
case "nodes":
list_ip = self._parse_intefaces(block_nodes)
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout)
self.dropped_traffic.append(node)
@reporter.step_deco("Ping traffic")
def ping_traffic(
self,
node: ClusterNode,
nodes_list: list[ClusterNode],
expect_result: int,
) -> bool:
shell = node.host.get_shell()
options = CommandOptions(check=False)
ips = self._parse_intefaces(nodes_list)
for ip in ips:
code = shell.exec(f"ping {ip} -c 1", options).return_code
if code != expect_result:
return False
return True
@reporter.step_deco("Start traffic to {node}")
def restore_traffic(
self,
mode: str,
node: ClusterNode,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.restore_input_traffic_to_port(node=node)
case "nodes":
IpTablesHelper.restore_input_traffic_to_node(node=node)
@reporter.step_deco("Restore blocked nodes")
def restore_all_traffic(self):
parallel(self._restore_traffic_to_node, self.dropped_traffic)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
shell = node.host.get_shell()
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
options = CommandOptions(close_stdin=True, timeout=1, check=False)
shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)
# Drop ssh connection for this node
provider = SshConnectionProvider()
provider.drop(node.host_ip)
if wait_for_return:
# Let the things to be settled
# A little wait here to prevent ssh stuck during panic
time.sleep(10)
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
@reporter.step_deco("Down {interface} to {nodes}")
def down_interface(self, nodes: list[ClusterNode], interface: str):
for node in nodes:
if_up_down_helper.down_interface(node=node, interface=interface)
assert if_up_down_helper.check_state(node=node, interface=interface) == "DOWN"
self.nodes_with_modified_interface.append(node)
@reporter.step_deco("Up {interface} to {nodes}")
def up_interface(self, nodes: list[ClusterNode], interface: str):
for node in nodes:
if_up_down_helper.up_interface(node=node, interface=interface)
assert if_up_down_helper.check_state(node=node, interface=interface) == "UP"
if node in self.nodes_with_modified_interface:
self.nodes_with_modified_interface.remove(node)
@reporter.step_deco("Restore interface")
def restore_interfaces(self):
for node in self.nodes_with_modified_interface:
if_up_down_helper.up_all_interface(node)
def _get_disk_controller(
self, node: StorageNode, device: str, mountpoint: str
) -> DiskController:
disk_controller_id = DiskController.get_id(node, device)
if disk_controller_id in self.detached_disks.keys():
disk_controller = self.detached_disks[disk_controller_id]
else:
disk_controller = DiskController(node, device, mountpoint)
return disk_controller
def _restore_traffic_to_node(self, node):
IpTablesHelper.restore_input_traffic_to_port(node)
IpTablesHelper.restore_input_traffic_to_node(node)
def _parse_intefaces(self, nodes: list[ClusterNode]):
interfaces = []
for node in nodes:
dict_interfaces = node.host.config.interfaces
for type, ip in dict_interfaces.items():
if "mgmt" not in type:
interfaces.append(ip)
return interfaces