import copy
import time

import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps.iptables import IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally
from frostfs_testlib.utils.failover_utils import (
    wait_all_storage_nodes_returned,
    wait_for_host_offline,
    wait_for_host_online,
    wait_for_node_online,
)

reporter = get_reporter()


class ClusterStateController:
    def __init__(self, shell: Shell, cluster: Cluster) -> None:
        self.stopped_nodes: list[ClusterNode] = []
        self.detached_disks: dict[str, DiskController] = {}
        self.stopped_storage_nodes: list[ClusterNode] = []
        self.stopped_s3_gates: list[ClusterNode] = []
        self.dropped_traffic: list[ClusterNode] = []
        self.stopped_services: set[NodeBase] = set()
        self.cluster = cluster
        self.shell = shell
        self.suspended_services: dict[str, list[ClusterNode]] = {}

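    # Usage sketch (illustrative; `shell`, `cluster`, and `node` are assumed to come
    # from the surrounding test fixtures, not from this module):
    #
    #     controller = ClusterStateController(shell, cluster)
    #     controller.stop_node_host(node, mode="hard")
    #     ...  # exercise the cluster while the host is down
    #     controller.start_node_host(node)
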
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop host of node {node}")
    def stop_node_host(self, node: ClusterNode, mode: str):
        with reporter.step(f"Stop host {node.host.config.address}"):
            node.host.stop_host(mode=mode)
            wait_for_host_offline(self.shell, node.storage_node)
            self.stopped_nodes.append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Shutdown whole cluster")
    def shutdown_cluster(self, mode: str, reversed_order: bool = False):
        # Materialize the node order into a list: reversed() returns a one-shot
        # iterator, and the nodes are iterated twice below.
        nodes = (
            list(reversed(self.cluster.cluster_nodes)) if reversed_order else self.cluster.cluster_nodes
        )
        for node in nodes:
            with reporter.step(f"Stop host {node.host.config.address}"):
                self.stopped_nodes.append(node)
                node.host.stop_host(mode=mode)

        for node in nodes:
            wait_for_host_offline(self.shell, node.storage_node)

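    # Sketch of a full-cluster restart; "hard" is assumed to be one of the stop modes
    # supported by the underlying Host implementation:
    #
    #     controller.shutdown_cluster(mode="hard", reversed_order=True)
    #     controller.start_stopped_hosts(reversed_order=True)
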
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop all storage services on cluster")
    def stop_all_storage_services(self, reversed_order: bool = False):
        nodes = (
            reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
        )

        for node in nodes:
            self.stop_storage_service(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop all S3 gates on cluster")
    def stop_all_s3_gates(self, reversed_order: bool = False):
        nodes = (
            reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
        )

        for node in nodes:
            self.stop_s3_gate(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start host of node {node}")
    def start_node_host(self, node: ClusterNode):
        with reporter.step(f"Start host {node.host.config.address}"):
            node.host.start_host()
            wait_for_host_online(self.shell, node.storage_node)
            wait_for_node_online(node.storage_node)
            self.stopped_nodes.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start stopped hosts")
    def start_stopped_hosts(self, reversed_order: bool = False):
        if not self.stopped_nodes:
            return

        nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
        for node in nodes:
            with reporter.step(f"Start host {node.host.config.address}"):
                node.host.start_host()
                if node in self.stopped_storage_nodes:
                    self.stopped_storage_nodes.remove(node)

                if node in self.stopped_s3_gates:
                    self.stopped_s3_gates.remove(node)
        self.stopped_nodes = []
        wait_all_storage_nodes_returned(self.shell, self.cluster)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
    def detach_disk(self, node: StorageNode, device: str, mountpoint: str):
        disk_controller = self._get_disk_controller(node, device, mountpoint)
        self.detached_disks[disk_controller.id] = disk_controller
        disk_controller.detach()

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Attach disk {device} at {mountpoint} on node {node}")
    def attach_disk(self, node: StorageNode, device: str, mountpoint: str):
        disk_controller = self._get_disk_controller(node, device, mountpoint)
        disk_controller.attach()
        self.detached_disks.pop(disk_controller.id, None)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Restore detached disks")
    def restore_disks(self):
        for disk_controller in self.detached_disks.values():
            disk_controller.attach()
        self.detached_disks = {}

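    # Sketch of a disk-failure scenario; the device and mountpoint values are
    # illustrative placeholders, not defaults of this library:
    #
    #     controller.detach_disk(storage_node, "/dev/vdb", "/srv/frostfs/meta")
    #     ...  # verify the cluster tolerates the missing disk
    #     controller.restore_disks()
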
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop storage service on {node}")
    def stop_storage_service(self, node: ClusterNode):
        node.storage_node.stop_service()
        self.stopped_storage_nodes.append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop all {service_type} services")
    def stop_services_of_type(self, service_type: type[ServiceClass]):
        services = self.cluster.services(service_type)
        self.stopped_services.update(services)
        parallel([service.stop_service for service in services])

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start all {service_type} services")
    def start_services_of_type(self, service_type: type[ServiceClass]):
        services = self.cluster.services(service_type)
        parallel([service.start_service for service in services])

        if service_type == StorageNode:
            wait_all_storage_nodes_returned(self.shell, self.cluster)

        self.stopped_services = self.stopped_services - set(services)

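    # Sketch: stop and later restart every service of one type across the cluster;
    # StorageNode is used because it is already imported here, but any registered
    # ServiceClass subtype works the same way:
    #
    #     controller.stop_services_of_type(StorageNode)
    #     ...  # assert cluster behavior while storage services are down
    #     controller.start_services_of_type(StorageNode)
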
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start all stopped services")
    def start_all_stopped_services(self):
        parallel([service.start_service for service in self.stopped_services])

        for service in self.stopped_services:
            if isinstance(service, StorageNode):
                wait_all_storage_nodes_returned(self.shell, self.cluster)
                break

        self.stopped_services.clear()

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop {service_type} service on {node}")
    def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
        service = node.service(service_type)
        service.stop_service()
        self.stopped_services.add(service)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start {service_type} service on {node}")
    def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
        service = node.service(service_type)
        service.start_service()
        if service in self.stopped_services:
            self.stopped_services.remove(service)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start storage service on {node}")
    def start_storage_service(self, node: ClusterNode):
        node.storage_node.start_service()
        self.stopped_storage_nodes.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start stopped storage services")
    def start_stopped_storage_services(self):
        if not self.stopped_storage_nodes:
            return

        # If we stopped several services (e.g. s01-s04), then after starting only s01
        # it may require connections to s02-s04, which are still down, and fail to start.
        # Also, if something goes wrong mid-way, we might skip starting s02-s04 entirely,
        # leaving the cluster in a bad state. To make sure every service is at least
        # attempted to be started, we run the starts in parallel here.
        parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))

        wait_all_storage_nodes_returned(self.shell, self.cluster)
        self.stopped_storage_nodes = []

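    # Sketch: stop storage on a subset of nodes, then bring everything back with one
    # parallel call (the slice is illustrative):
    #
    #     for node in cluster.cluster_nodes[:2]:
    #         controller.stop_storage_service(node)
    #     controller.start_stopped_storage_services()
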
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop s3 gate on {node}")
    def stop_s3_gate(self, node: ClusterNode):
        node.s3_gate.stop_service()
        self.stopped_s3_gates.append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start s3 gate on {node}")
    def start_s3_gate(self, node: ClusterNode):
        node.s3_gate.start_service()
        self.stopped_s3_gates.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start stopped S3 gates")
    def start_stopped_s3_gates(self):
        if not self.stopped_s3_gates:
            return

        parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
        self.stopped_s3_gates = []

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Suspend {process_name} service in {node}")
    def suspend_service(self, process_name: str, node: ClusterNode):
        node.host.wait_success_suspend_process(process_name)
        if self.suspended_services.get(process_name):
            self.suspended_services[process_name].append(node)
        else:
            self.suspended_services[process_name] = [node]

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Resume {process_name} service in {node}")
    def resume_service(self, process_name: str, node: ClusterNode):
        node.host.wait_success_resume_process(process_name)
        # Drop the node from the suspended-services bookkeeping once it is resumed.
        if self.suspended_services.get(process_name) and node in self.suspended_services[process_name]:
            self.suspended_services[process_name].remove(node)

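    # Sketch: pause a process by name and resume it later; the process name below is
    # an illustrative placeholder (how suspension is performed depends on the Host
    # implementation):
    #
    #     controller.suspend_service("frostfs-node", node)
    #     ...  # observe the cluster while the process is frozen
    #     controller.resume_service("frostfs-node", node)
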
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Resume suspended services")
    def resume_suspended_services(self):
        for process_name, nodes in self.suspended_services.items():
            for node in nodes:
                node.host.wait_success_resume_process(process_name)
        self.suspended_services = {}

@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
|
|
def drop_traffic(
|
|
self,
|
|
mode: str,
|
|
node: ClusterNode,
|
|
wakeup_timeout: int,
|
|
ports: list[str] = None,
|
|
block_nodes: list[ClusterNode] = None,
|
|
) -> None:
|
|
allowed_modes = ["ports", "nodes"]
|
|
assert mode in allowed_modes
|
|
|
|
match mode:
|
|
case "ports":
|
|
IpTablesHelper.drop_input_traffic_to_port(node, ports)
|
|
case "nodes":
|
|
list_ip = self._parse_intefaces(block_nodes)
|
|
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
|
|
time.sleep(wakeup_timeout)
|
|
self.dropped_traffic.append(node)
|
|
|
|
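    # Sketch of the two supported modes; the port, timeout, and node values are
    # illustrative:
    #
    #     controller.drop_traffic(mode="ports", node=node, wakeup_timeout=5, ports=["8080"])
    #     controller.drop_traffic(mode="nodes", node=node, wakeup_timeout=5, block_nodes=other_nodes)
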
@reporter.step_deco("Ping traffic")
|
|
def ping_traffic(
|
|
self,
|
|
node: ClusterNode,
|
|
nodes_list: list[ClusterNode],
|
|
expect_result: int,
|
|
) -> bool:
|
|
shell = node.host.get_shell()
|
|
options = CommandOptions(check=False)
|
|
ips = self._parse_intefaces(nodes_list)
|
|
for ip in ips:
|
|
code = shell.exec(f"ping {ip} -c 1", options).return_code
|
|
if code != expect_result:
|
|
return False
|
|
return True
|
|
|
|
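    # Sketch: after drop_traffic(mode="nodes", ...), pings from `node` to the blocked
    # nodes should fail; 1 is assumed here as ping's exit code for an unreachable host:
    #
    #     assert controller.ping_traffic(node, other_nodes, expect_result=1)
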
@reporter.step_deco("Start traffic to {node}")
|
|
def restore_traffic(
|
|
self,
|
|
mode: str,
|
|
node: ClusterNode,
|
|
) -> None:
|
|
allowed_modes = ["ports", "nodes"]
|
|
assert mode in allowed_modes
|
|
|
|
match mode:
|
|
case "ports":
|
|
IpTablesHelper.restore_input_traffic_to_port(node=node)
|
|
case "nodes":
|
|
IpTablesHelper.restore_input_traffic_to_node(node=node)
|
|
|
|
@reporter.step_deco("Restore blocked nodes")
|
|
def restore_all_traffic(self):
|
|
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
|
|
|
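    # Sketch: typically called from test teardown to undo any earlier drop_traffic calls:
    #
    #     controller.restore_all_traffic()
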
    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Hard reboot host {node} via magic SysRq option")
    def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
        shell = node.host.get_shell()
        # Enable the magic SysRq key, then trigger an immediate reboot ("b").
        shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')

        options = CommandOptions(close_stdin=True, timeout=1, check=False)
        shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)

        if wait_for_return:
            # Let things settle down: a short wait here prevents SSH from getting
            # stuck during the panic.
            time.sleep(10)
            wait_for_host_online(self.shell, node.storage_node)
            wait_for_node_online(node.storage_node)

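    # Sketch: force an unclean reboot and block until the node serves again; pass
    # wait_for_return=False to keep the test running while the host is down:
    #
    #     controller.panic_reboot_host(node)
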
    def _get_disk_controller(
        self, node: StorageNode, device: str, mountpoint: str
    ) -> DiskController:
        disk_controller_id = DiskController.get_id(node, device)
        if disk_controller_id in self.detached_disks:
            disk_controller = self.detached_disks[disk_controller_id]
        else:
            disk_controller = DiskController(node, device, mountpoint)

        return disk_controller

    def _restore_traffic_to_node(self, node):
        IpTablesHelper.restore_input_traffic_to_port(node)
        IpTablesHelper.restore_input_traffic_to_node(node)

    def _parse_interfaces(self, nodes: list[ClusterNode]) -> list[str]:
        interfaces = []
        for node in nodes:
            dict_interfaces = node.host.config.interfaces
            for interface_type, ip in dict_interfaces.items():
                # Skip management interfaces; only data-plane addresses are blocked/pinged.
                if "mgmt" not in interface_type:
                    interfaces.append(ip)
        return interfaces