Stop cm before wipe data, start cm after wipe data

Signed-off-by: m.malygina <m.malygina@yadro.com>
stop-cm-before-wipe-data
m.malygina 2023-08-24 13:23:58 +03:00
parent 7059596506
commit 4bfe99f13c
5 changed files with 83 additions and 81 deletions

View File

@ -5,6 +5,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
MorphChain,
S3Gate,
StorageNode,
CM
)
from frostfs_testlib.storage.service_registry import ServiceRegistry
@ -16,7 +17,7 @@ __class_registry.register_service(_FrostfsServicesNames.INNER_RING, InnerRing)
__class_registry.register_service(_FrostfsServicesNames.MORPH_CHAIN, MorphChain)
__class_registry.register_service(_FrostfsServicesNames.S3_GATE, S3Gate)
__class_registry.register_service(_FrostfsServicesNames.HTTP_GATE, HTTPGate)
__class_registry.register_service(_FrostfsServicesNames.CM, CM)
def get_service_registry() -> ServiceRegistry:
"""Returns registry with registered classes related to cluster and cluster nodes.

View File

@ -15,6 +15,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
MorphChain,
S3Gate,
StorageNode,
CM
)
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.service_registry import ServiceRegistry
@ -76,6 +77,9 @@ class ClusterNode:
@property
def s3_gate(self) -> S3Gate:
return self.service(S3Gate)
def cm(self) -> CM:
return self.service(CM)
def get_config(self, config_file_path: str) -> dict:
shell = self.host.get_shell()
@ -98,7 +102,7 @@ class ClusterNode:
Args:
service_type: type of the service which should be returned,
for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain and InnerRing.
for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain, InnerRing and CM.
Returns:
service of service_type class.
@ -132,6 +136,7 @@ class Cluster:
default_http_gate_endpoint: str
default_http_hostname: str
default_s3_hostname: str
default_cm_hostname: str
def __init__(self, hosting: Hosting) -> None:
self._hosting = hosting

View File

@ -13,6 +13,7 @@ class ConfigAttributes:
UN_LOCODE = "un_locode"
HTTP_HOSTNAME = "http_hostname"
S3_HOSTNAME = "s3_hostname"
CM_HOSTNAME = "cm_hostname"
class _FrostfsServicesNames:
@ -21,3 +22,4 @@ class _FrostfsServicesNames:
HTTP_GATE = "http-gate"
MORPH_CHAIN = "morph-chain"
INNER_RING = "ir"
CM = "cm"

View File

@ -1,15 +1,12 @@
import copy
import itertools
import time
from concurrent.futures import ThreadPoolExecutor
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps import epoch
from frostfs_testlib.steps.iptables import IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
@ -27,7 +24,7 @@ class ClusterStateController:
self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.stopped_s3_gates: list[ClusterNode] = []
self.dropped_traffic: list[ClusterNode] = []
self.stopped_cms: list[ClusterNode] = []
self.cluster = cluster
self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {}
@ -73,7 +70,17 @@ class ClusterStateController:
for node in nodes:
self.stop_s3_gate(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all cm services on cluster")
def stop_all_cm_services(self, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
self.stop_cm_service(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode):
@ -134,6 +141,33 @@ class ClusterStateController:
node.storage_node.start_service()
self.stopped_storage_nodes.remove(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop cm service on {node}")
def stop_cm_service(self, node: ClusterNode):
node.cm().stop_service()
self.stopped_cms.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start cm service on {node}")
def start_cm_service(self, node: ClusterNode):
node.cm().start_service()
self.stopped_cms.remove(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped cm services")
def start_stopped_cm_services(self):
if not self.stopped_cms:
return
# In case if we stopped couple services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
parallel(self.start_cm_service, copy.copy(self.stopped_cms))
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_cms = []
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped storage services")
def start_stopped_storage_services(self):
@ -143,8 +177,15 @@ class ClusterStateController:
# In case if we stopped couple services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
# So in order to make sure that services are at least attempted to be started, using threads here.
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
# But will be thrown here.
# Not ideal solution, but okay for now
for _ in start_result:
pass
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_storage_nodes = []
@ -167,8 +208,14 @@ class ClusterStateController:
if not self.stopped_s3_gates:
return
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
self.stopped_s3_gates = []
with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor:
start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates)
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
# But will be thrown here.
# Not ideal solution, but okay for now
for _ in start_result:
pass
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}")
@ -195,62 +242,6 @@ class ClusterStateController:
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
self.suspended_services = {}
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
def drop_traffic(
self,
mode: str,
node: ClusterNode,
wakeup_timeout: int,
ports: list[str] = None,
block_nodes: list[ClusterNode] = None,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.drop_input_traffic_to_port(node, ports)
case "nodes":
list_ip = self._parse_intefaces(block_nodes)
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout)
self.dropped_traffic.append(node)
@reporter.step_deco("Ping traffic")
def ping_traffic(
self,
node: ClusterNode,
nodes_list: list[ClusterNode],
expect_result: int,
) -> bool:
shell = node.host.get_shell()
options = CommandOptions(check=False)
ips = self._parse_intefaces(nodes_list)
for ip in ips:
code = shell.exec(f"ping {ip} -c 1", options).return_code
if code != expect_result:
return False
return True
@reporter.step_deco("Start traffic to {node}")
def restore_traffic(
self,
mode: str,
node: ClusterNode,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.restore_input_traffic_to_port(node=node)
case "nodes":
IpTablesHelper.restore_input_traffic_to_node(node=node)
@reporter.step_deco("Restore blocked nodes")
def restore_all_traffic(self):
parallel(self._restore_traffic_to_node, self.dropped_traffic)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
@ -277,16 +268,3 @@ class ClusterStateController:
disk_controller = DiskController(node, device, mountpoint)
return disk_controller
def _restore_traffic_to_node(self, node):
IpTablesHelper.restore_input_traffic_to_port(node)
IpTablesHelper.restore_input_traffic_to_node(node)
def _parse_intefaces(self, nodes: list[ClusterNode]):
interfaces = []
for node in nodes:
dict_interfaces = node.host.config.interfaces
for type, ip in dict_interfaces.items():
if "mgmt" not in type:
interfaces.append(ip)
return interfaces

View File

@ -61,7 +61,6 @@ class S3Gate(NodeBase):
def label(self) -> str:
return f"{self.name}: {self.get_endpoint()}"
class HTTPGate(NodeBase):
"""
Class represents HTTP gateway in a cluster
@ -152,6 +151,9 @@ class StorageNode(NodeBase):
def get_s3_hostname(self) -> str:
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)
def get_cm_hostname(self) -> str:
return self._get_attribute(ConfigAttributes.CM_HOSTNAME)
def delete_blobovnicza(self):
self.host.delete_blobovnicza(self.name)
@ -171,3 +173,17 @@ class StorageNode(NodeBase):
@property
def label(self) -> str:
return f"{self.name}: {self.get_rpc_endpoint()}"
class CM(NodeBase):
"""
Class represents cm service in a cluster
"""
@property
def label(self) -> str:
return f"{self.name}"
def service_healthcheck(self) -> bool:
"""Service healthcheck."""
def get_cm_hostname(self) -> str:
return self._get_attribute(ConfigAttributes.CM_HOSTNAME)