forked from TrueCloudLab/frostfs-testlib
Compare commits
1 commit
master
...
stop-cm-be
Author | SHA1 | Date | |
---|---|---|---|
4bfe99f13c |
5 changed files with 83 additions and 81 deletions
|
@ -5,6 +5,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||||
MorphChain,
|
MorphChain,
|
||||||
S3Gate,
|
S3Gate,
|
||||||
StorageNode,
|
StorageNode,
|
||||||
|
CM
|
||||||
)
|
)
|
||||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||||
|
|
||||||
|
@ -16,7 +17,7 @@ __class_registry.register_service(_FrostfsServicesNames.INNER_RING, InnerRing)
|
||||||
__class_registry.register_service(_FrostfsServicesNames.MORPH_CHAIN, MorphChain)
|
__class_registry.register_service(_FrostfsServicesNames.MORPH_CHAIN, MorphChain)
|
||||||
__class_registry.register_service(_FrostfsServicesNames.S3_GATE, S3Gate)
|
__class_registry.register_service(_FrostfsServicesNames.S3_GATE, S3Gate)
|
||||||
__class_registry.register_service(_FrostfsServicesNames.HTTP_GATE, HTTPGate)
|
__class_registry.register_service(_FrostfsServicesNames.HTTP_GATE, HTTPGate)
|
||||||
|
__class_registry.register_service(_FrostfsServicesNames.CM, CM)
|
||||||
|
|
||||||
def get_service_registry() -> ServiceRegistry:
|
def get_service_registry() -> ServiceRegistry:
|
||||||
"""Returns registry with registered classes related to cluster and cluster nodes.
|
"""Returns registry with registered classes related to cluster and cluster nodes.
|
||||||
|
|
|
@ -15,6 +15,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||||
MorphChain,
|
MorphChain,
|
||||||
S3Gate,
|
S3Gate,
|
||||||
StorageNode,
|
StorageNode,
|
||||||
|
CM
|
||||||
)
|
)
|
||||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||||
|
@ -76,6 +77,9 @@ class ClusterNode:
|
||||||
@property
|
@property
|
||||||
def s3_gate(self) -> S3Gate:
|
def s3_gate(self) -> S3Gate:
|
||||||
return self.service(S3Gate)
|
return self.service(S3Gate)
|
||||||
|
|
||||||
|
def cm(self) -> CM:
|
||||||
|
return self.service(CM)
|
||||||
|
|
||||||
def get_config(self, config_file_path: str) -> dict:
|
def get_config(self, config_file_path: str) -> dict:
|
||||||
shell = self.host.get_shell()
|
shell = self.host.get_shell()
|
||||||
|
@ -98,7 +102,7 @@ class ClusterNode:
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
service_type: type of the service which should be returned,
|
service_type: type of the service which should be returned,
|
||||||
for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain and InnerRing.
|
for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain, InnerRing and CM.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
service of service_type class.
|
service of service_type class.
|
||||||
|
@ -132,6 +136,7 @@ class Cluster:
|
||||||
default_http_gate_endpoint: str
|
default_http_gate_endpoint: str
|
||||||
default_http_hostname: str
|
default_http_hostname: str
|
||||||
default_s3_hostname: str
|
default_s3_hostname: str
|
||||||
|
default_cm_hostname: str
|
||||||
|
|
||||||
def __init__(self, hosting: Hosting) -> None:
|
def __init__(self, hosting: Hosting) -> None:
|
||||||
self._hosting = hosting
|
self._hosting = hosting
|
||||||
|
|
|
@ -13,6 +13,7 @@ class ConfigAttributes:
|
||||||
UN_LOCODE = "un_locode"
|
UN_LOCODE = "un_locode"
|
||||||
HTTP_HOSTNAME = "http_hostname"
|
HTTP_HOSTNAME = "http_hostname"
|
||||||
S3_HOSTNAME = "s3_hostname"
|
S3_HOSTNAME = "s3_hostname"
|
||||||
|
CM_HOSTNAME = "cm_hostname"
|
||||||
|
|
||||||
|
|
||||||
class _FrostfsServicesNames:
|
class _FrostfsServicesNames:
|
||||||
|
@ -21,3 +22,4 @@ class _FrostfsServicesNames:
|
||||||
HTTP_GATE = "http-gate"
|
HTTP_GATE = "http-gate"
|
||||||
MORPH_CHAIN = "morph-chain"
|
MORPH_CHAIN = "morph-chain"
|
||||||
INNER_RING = "ir"
|
INNER_RING = "ir"
|
||||||
|
CM = "cm"
|
||||||
|
|
|
@ -1,15 +1,12 @@
|
||||||
import copy
|
|
||||||
import itertools
|
|
||||||
import time
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
import frostfs_testlib.resources.optionals as optionals
|
import frostfs_testlib.resources.optionals as optionals
|
||||||
from frostfs_testlib.reporter import get_reporter
|
from frostfs_testlib.reporter import get_reporter
|
||||||
from frostfs_testlib.shell import CommandOptions, Shell
|
from frostfs_testlib.shell import CommandOptions, Shell
|
||||||
from frostfs_testlib.steps import epoch
|
from frostfs_testlib.steps import epoch
|
||||||
from frostfs_testlib.steps.iptables import IpTablesHelper
|
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||||
from frostfs_testlib.testing import parallel
|
|
||||||
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
||||||
from frostfs_testlib.utils.failover_utils import (
|
from frostfs_testlib.utils.failover_utils import (
|
||||||
wait_all_storage_nodes_returned,
|
wait_all_storage_nodes_returned,
|
||||||
|
@ -27,7 +24,7 @@ class ClusterStateController:
|
||||||
self.detached_disks: dict[str, DiskController] = {}
|
self.detached_disks: dict[str, DiskController] = {}
|
||||||
self.stopped_storage_nodes: list[ClusterNode] = []
|
self.stopped_storage_nodes: list[ClusterNode] = []
|
||||||
self.stopped_s3_gates: list[ClusterNode] = []
|
self.stopped_s3_gates: list[ClusterNode] = []
|
||||||
self.dropped_traffic: list[ClusterNode] = []
|
self.stopped_cms: list[ClusterNode] = []
|
||||||
self.cluster = cluster
|
self.cluster = cluster
|
||||||
self.shell = shell
|
self.shell = shell
|
||||||
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
||||||
|
@ -73,7 +70,17 @@ class ClusterStateController:
|
||||||
|
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
self.stop_s3_gate(node)
|
self.stop_s3_gate(node)
|
||||||
|
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Stop all cm services on cluster")
|
||||||
|
def stop_all_cm_services(self, reversed_order: bool = False):
|
||||||
|
nodes = (
|
||||||
|
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
|
||||||
|
)
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
self.stop_cm_service(node)
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start host of node {node}")
|
@reporter.step_deco("Start host of node {node}")
|
||||||
def start_node_host(self, node: ClusterNode):
|
def start_node_host(self, node: ClusterNode):
|
||||||
|
@ -134,6 +141,33 @@ class ClusterStateController:
|
||||||
node.storage_node.start_service()
|
node.storage_node.start_service()
|
||||||
self.stopped_storage_nodes.remove(node)
|
self.stopped_storage_nodes.remove(node)
|
||||||
|
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Stop cm service on {node}")
|
||||||
|
def stop_cm_service(self, node: ClusterNode):
|
||||||
|
node.cm().stop_service()
|
||||||
|
self.stopped_cms.append(node)
|
||||||
|
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Start cm service on {node}")
|
||||||
|
def start_cm_service(self, node: ClusterNode):
|
||||||
|
node.cm().start_service()
|
||||||
|
self.stopped_cms.remove(node)
|
||||||
|
|
||||||
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
|
@reporter.step_deco("Start stopped cm services")
|
||||||
|
def start_stopped_cm_services(self):
|
||||||
|
if not self.stopped_cms:
|
||||||
|
return
|
||||||
|
|
||||||
|
# In case if we stopped couple services, for example (s01-s04):
|
||||||
|
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
||||||
|
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
||||||
|
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
|
||||||
|
parallel(self.start_cm_service, copy.copy(self.stopped_cms))
|
||||||
|
|
||||||
|
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||||
|
self.stopped_cms = []
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Start stopped storage services")
|
@reporter.step_deco("Start stopped storage services")
|
||||||
def start_stopped_storage_services(self):
|
def start_stopped_storage_services(self):
|
||||||
|
@ -143,8 +177,15 @@ class ClusterStateController:
|
||||||
# In case if we stopped couple services, for example (s01-s04):
|
# In case if we stopped couple services, for example (s01-s04):
|
||||||
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
||||||
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
||||||
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
|
# So in order to make sure that services are at least attempted to be started, using threads here.
|
||||||
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
|
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
|
||||||
|
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
|
||||||
|
|
||||||
|
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
||||||
|
# But will be thrown here.
|
||||||
|
# Not ideal solution, but okay for now
|
||||||
|
for _ in start_result:
|
||||||
|
pass
|
||||||
|
|
||||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||||
self.stopped_storage_nodes = []
|
self.stopped_storage_nodes = []
|
||||||
|
@ -167,8 +208,14 @@ class ClusterStateController:
|
||||||
if not self.stopped_s3_gates:
|
if not self.stopped_s3_gates:
|
||||||
return
|
return
|
||||||
|
|
||||||
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
|
with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor:
|
||||||
self.stopped_s3_gates = []
|
start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates)
|
||||||
|
|
||||||
|
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
||||||
|
# But will be thrown here.
|
||||||
|
# Not ideal solution, but okay for now
|
||||||
|
for _ in start_result:
|
||||||
|
pass
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Suspend {process_name} service in {node}")
|
@reporter.step_deco("Suspend {process_name} service in {node}")
|
||||||
|
@ -195,62 +242,6 @@ class ClusterStateController:
|
||||||
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
|
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
|
||||||
self.suspended_services = {}
|
self.suspended_services = {}
|
||||||
|
|
||||||
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
|
|
||||||
def drop_traffic(
|
|
||||||
self,
|
|
||||||
mode: str,
|
|
||||||
node: ClusterNode,
|
|
||||||
wakeup_timeout: int,
|
|
||||||
ports: list[str] = None,
|
|
||||||
block_nodes: list[ClusterNode] = None,
|
|
||||||
) -> None:
|
|
||||||
allowed_modes = ["ports", "nodes"]
|
|
||||||
assert mode in allowed_modes
|
|
||||||
|
|
||||||
match mode:
|
|
||||||
case "ports":
|
|
||||||
IpTablesHelper.drop_input_traffic_to_port(node, ports)
|
|
||||||
case "nodes":
|
|
||||||
list_ip = self._parse_intefaces(block_nodes)
|
|
||||||
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
|
|
||||||
time.sleep(wakeup_timeout)
|
|
||||||
self.dropped_traffic.append(node)
|
|
||||||
|
|
||||||
@reporter.step_deco("Ping traffic")
|
|
||||||
def ping_traffic(
|
|
||||||
self,
|
|
||||||
node: ClusterNode,
|
|
||||||
nodes_list: list[ClusterNode],
|
|
||||||
expect_result: int,
|
|
||||||
) -> bool:
|
|
||||||
shell = node.host.get_shell()
|
|
||||||
options = CommandOptions(check=False)
|
|
||||||
ips = self._parse_intefaces(nodes_list)
|
|
||||||
for ip in ips:
|
|
||||||
code = shell.exec(f"ping {ip} -c 1", options).return_code
|
|
||||||
if code != expect_result:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
@reporter.step_deco("Start traffic to {node}")
|
|
||||||
def restore_traffic(
|
|
||||||
self,
|
|
||||||
mode: str,
|
|
||||||
node: ClusterNode,
|
|
||||||
) -> None:
|
|
||||||
allowed_modes = ["ports", "nodes"]
|
|
||||||
assert mode in allowed_modes
|
|
||||||
|
|
||||||
match mode:
|
|
||||||
case "ports":
|
|
||||||
IpTablesHelper.restore_input_traffic_to_port(node=node)
|
|
||||||
case "nodes":
|
|
||||||
IpTablesHelper.restore_input_traffic_to_node(node=node)
|
|
||||||
|
|
||||||
@reporter.step_deco("Restore blocked nodes")
|
|
||||||
def restore_all_traffic(self):
|
|
||||||
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
||||||
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
|
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
|
||||||
|
@ -277,16 +268,3 @@ class ClusterStateController:
|
||||||
disk_controller = DiskController(node, device, mountpoint)
|
disk_controller = DiskController(node, device, mountpoint)
|
||||||
|
|
||||||
return disk_controller
|
return disk_controller
|
||||||
|
|
||||||
def _restore_traffic_to_node(self, node):
|
|
||||||
IpTablesHelper.restore_input_traffic_to_port(node)
|
|
||||||
IpTablesHelper.restore_input_traffic_to_node(node)
|
|
||||||
|
|
||||||
def _parse_intefaces(self, nodes: list[ClusterNode]):
|
|
||||||
interfaces = []
|
|
||||||
for node in nodes:
|
|
||||||
dict_interfaces = node.host.config.interfaces
|
|
||||||
for type, ip in dict_interfaces.items():
|
|
||||||
if "mgmt" not in type:
|
|
||||||
interfaces.append(ip)
|
|
||||||
return interfaces
|
|
||||||
|
|
|
@ -61,7 +61,6 @@ class S3Gate(NodeBase):
|
||||||
def label(self) -> str:
|
def label(self) -> str:
|
||||||
return f"{self.name}: {self.get_endpoint()}"
|
return f"{self.name}: {self.get_endpoint()}"
|
||||||
|
|
||||||
|
|
||||||
class HTTPGate(NodeBase):
|
class HTTPGate(NodeBase):
|
||||||
"""
|
"""
|
||||||
Class represents HTTP gateway in a cluster
|
Class represents HTTP gateway in a cluster
|
||||||
|
@ -152,6 +151,9 @@ class StorageNode(NodeBase):
|
||||||
|
|
||||||
def get_s3_hostname(self) -> str:
|
def get_s3_hostname(self) -> str:
|
||||||
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)
|
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)
|
||||||
|
|
||||||
|
def get_cm_hostname(self) -> str:
|
||||||
|
return self._get_attribute(ConfigAttributes.CM_HOSTNAME)
|
||||||
|
|
||||||
def delete_blobovnicza(self):
|
def delete_blobovnicza(self):
|
||||||
self.host.delete_blobovnicza(self.name)
|
self.host.delete_blobovnicza(self.name)
|
||||||
|
@ -171,3 +173,17 @@ class StorageNode(NodeBase):
|
||||||
@property
|
@property
|
||||||
def label(self) -> str:
|
def label(self) -> str:
|
||||||
return f"{self.name}: {self.get_rpc_endpoint()}"
|
return f"{self.name}: {self.get_rpc_endpoint()}"
|
||||||
|
|
||||||
|
class CM(NodeBase):
|
||||||
|
"""
|
||||||
|
Class represents cm service in a cluster
|
||||||
|
"""
|
||||||
|
@property
|
||||||
|
def label(self) -> str:
|
||||||
|
return f"{self.name}"
|
||||||
|
|
||||||
|
def service_healthcheck(self) -> bool:
|
||||||
|
"""Service healthcheck."""
|
||||||
|
|
||||||
|
def get_cm_hostname(self) -> str:
|
||||||
|
return self._get_attribute(ConfigAttributes.CM_HOSTNAME)
|
Loading…
Add table
Reference in a new issue