frostfs-testlib/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
Andrey Berezin 2bad0f1db6 Add metabase and write_cache operations
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-05-30 16:32:38 +03:00

169 lines
7.5 KiB
Python

import time
from concurrent.futures import ThreadPoolExecutor
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps import epoch
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
wait_for_host_offline,
wait_for_host_online,
wait_for_node_online,
)
reporter = get_reporter()
class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster) -> None:
self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.cluster = cluster
self.shell = shell
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str):
with reporter.step(f"Stop host {node.host.config.address}"):
node.host.stop_host(mode=mode)
wait_for_host_offline(self.shell, node.storage_node)
self.stopped_nodes.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Shutdown whole cluster")
def shutdown_cluster(self, mode: str, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
with reporter.step(f"Stop host {node.host.config.address}"):
self.stopped_nodes.append(node)
node.host.stop_host(mode=mode)
for node in nodes:
wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all storage services on cluster")
def stop_all_storage_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_storage_service(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode):
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
self.stopped_nodes.remove(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped hosts")
def start_stopped_hosts(self, reversed_order: bool = False):
nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
for node in nodes:
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
self.stopped_nodes = []
wait_all_storage_nodes_returned(self.shell, self.cluster)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
def detach_disk(self, node: StorageNode, device: str, mountpoint: str):
disk_controller = self._get_disk_controller(node, device, mountpoint)
self.detached_disks[disk_controller.id] = disk_controller
disk_controller.detach()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Attach disk {device} at {mountpoint} on node {node}")
def attach_disk(self, node: StorageNode, device: str, mountpoint: str):
disk_controller = self._get_disk_controller(node, device, mountpoint)
disk_controller.attach()
self.detached_disks.pop(disk_controller.id, None)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Restore detached disks")
def restore_disks(self):
for disk_controller in self.detached_disks.values():
disk_controller.attach()
self.detached_disks = {}
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode):
node.storage_node.stop_service()
self.stopped_storage_nodes.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start storage service on {node}")
def start_storage_service(self, node: ClusterNode):
node.storage_node.start_service()
self.stopped_storage_nodes.remove(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped storage services")
def start_stopped_storage_services(self):
if self.stopped_storage_nodes:
# In case if we stopped couple services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using threads here.
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
# But will be thrown here.
# Not ideal solution, but okay for now
for _ in start_result:
pass
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_storage_nodes = []
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
shell = node.host.get_shell()
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
options = CommandOptions(close_stdin=True, timeout=1, check=False)
shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)
if wait_for_return:
# Let the things to be settled
# A little wait here to prevent ssh stuck during panic
time.sleep(10)
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
@reporter.step_deco("Wait up to {timeout} seconds for nodes on cluster to align epochs")
def wait_for_epochs_align(self, timeout=60):
@wait_for_success(timeout, 5, None, True)
def check_epochs():
epochs_by_node = epoch.get_epochs_from_nodes(self.shell, self.cluster)
assert (
len(set(epochs_by_node.values())) == 1
), f"unaligned epochs found: {epochs_by_node}"
check_epochs()
def _get_disk_controller(
self, node: StorageNode, device: str, mountpoint: str
) -> DiskController:
disk_controller_id = DiskController.get_id(node, device)
if disk_controller_id in self.detached_disks.keys():
disk_controller = self.detached_disks[disk_controller_id]
else:
disk_controller = DiskController(node, device, mountpoint)
return disk_controller