[#114] Add yaml configuration controllers
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
parent
f8562da7e0
commit
72bd467c53
9 changed files with 244 additions and 22 deletions
|
@ -47,6 +47,10 @@ docker = "frostfs_testlib.hosting.docker_host:DockerHost"
|
|||
[project.entry-points."frostfs.testlib.healthcheck"]
|
||||
basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
|
||||
|
||||
[project.entry-points."frostfs.testlib.csc_managers"]
|
||||
config = "frostfs_testlib.storage.controllers.state_managers.config_state_manager:ConfigStateManager"
|
||||
|
||||
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
src_paths = ["src", "tests"]
|
||||
|
|
|
@ -17,3 +17,16 @@ def load_plugin(plugin_group: str, name: str) -> Any:
|
|||
return None
|
||||
plugin = plugins[name]
|
||||
return plugin.load()
|
||||
|
||||
|
||||
def load_all(group: str) -> Any:
|
||||
"""Loads all plugins using entry point specification.
|
||||
|
||||
Args:
|
||||
plugin_group: Name of plugin group.
|
||||
|
||||
Returns:
|
||||
Classes from specified group.
|
||||
"""
|
||||
plugins = entry_points(group=group)
|
||||
return [plugin.load() for plugin in plugins]
|
||||
|
|
|
@ -8,14 +8,10 @@ from frostfs_testlib.hosting import Host, Hosting
|
|||
from frostfs_testlib.hosting.config import ServiceConfig
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.storage import get_service_registry
|
||||
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
|
||||
from frostfs_testlib.storage.configuration.service_configuration import ServiceConfiguration
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||
HTTPGate,
|
||||
InnerRing,
|
||||
MorphChain,
|
||||
S3Gate,
|
||||
StorageNode,
|
||||
)
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
|
@ -93,6 +89,9 @@ class ClusterNode:
|
|||
config_str = yaml.dump(new_config)
|
||||
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
|
||||
|
||||
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
|
||||
return ServiceConfiguration(self.service(service_type))
|
||||
|
||||
def service(self, service_type: type[ServiceClass]) -> ServiceClass:
|
||||
"""
|
||||
Get a service cluster node of specified type.
|
||||
|
@ -118,9 +117,7 @@ class ClusterNode:
|
|||
)
|
||||
|
||||
def get_list_of_services(self) -> list[str]:
|
||||
return [
|
||||
config.attributes[ConfigAttributes.SERVICE_NAME] for config in self.host.config.services
|
||||
]
|
||||
return [config.attributes[ConfigAttributes.SERVICE_NAME] for config in self.host.config.services]
|
||||
|
||||
def get_all_interfaces(self) -> dict[str, str]:
|
||||
return self.host.config.interfaces
|
||||
|
@ -130,9 +127,7 @@ class ClusterNode:
|
|||
|
||||
def get_data_interfaces(self) -> list[str]:
|
||||
return [
|
||||
ip_address
|
||||
for name_interface, ip_address in self.host.config.interfaces.items()
|
||||
if "data" in name_interface
|
||||
ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface
|
||||
]
|
||||
|
||||
def get_data_interface(self, search_interface: str) -> list[str]:
|
||||
|
@ -221,9 +216,7 @@ class Cluster:
|
|||
|
||||
cluster_nodes = set()
|
||||
for service in services:
|
||||
cluster_nodes.update(
|
||||
[node for node in self.cluster_nodes if node.service(type(service)) == service]
|
||||
)
|
||||
cluster_nodes.update([node for node in self.cluster_nodes if node.service(type(service)) == service])
|
||||
|
||||
return list(cluster_nodes)
|
||||
|
||||
|
@ -331,8 +324,6 @@ class Cluster:
|
|||
return [node.get_endpoint() for node in nodes]
|
||||
|
||||
def get_nodes_by_ip(self, ips: list[str]) -> list[ClusterNode]:
|
||||
cluster_nodes = [
|
||||
node for node in self.cluster_nodes if URL(node.morph_chain.get_endpoint()).host in ips
|
||||
]
|
||||
cluster_nodes = [node for node in self.cluster_nodes if URL(node.morph_chain.get_endpoint()).host in ips]
|
||||
with reporter.step(f"Return cluster nodes - {cluster_nodes}"):
|
||||
return cluster_nodes
|
||||
|
|
65
src/frostfs_testlib/storage/configuration/interfaces.py
Normal file
65
src/frostfs_testlib/storage/configuration/interfaces.py
Normal file
|
@ -0,0 +1,65 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ServiceConfigurationYml(ABC):
|
||||
"""
|
||||
Class to manipulate yml configuration for service
|
||||
"""
|
||||
|
||||
def _find_option(self, key: str, data: dict):
|
||||
tree = key.split(":")
|
||||
current = data
|
||||
for node in tree:
|
||||
if isinstance(current, list) and len(current) - 1 >= int(node):
|
||||
current = current[int(node)]
|
||||
continue
|
||||
|
||||
if node not in current:
|
||||
return None
|
||||
|
||||
current = current[node]
|
||||
|
||||
return current
|
||||
|
||||
def _set_option(self, key: str, value: Any, data: dict):
|
||||
tree = key.split(":")
|
||||
current = data
|
||||
for node in tree[:-1]:
|
||||
if isinstance(current, list) and len(current) - 1 >= int(node):
|
||||
current = current[int(node)]
|
||||
continue
|
||||
|
||||
if node not in current:
|
||||
current[node] = {}
|
||||
|
||||
current = current[node]
|
||||
|
||||
current[tree[-1]] = value
|
||||
|
||||
@abstractmethod
|
||||
def get(self, key: str) -> str:
|
||||
"""
|
||||
Get parameter value from current configuration
|
||||
|
||||
Args:
|
||||
key: key of the parameter in yaml format like 'storage:shard:default:resync_metabase'
|
||||
|
||||
Returns:
|
||||
value of the parameter
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set(self, values: dict[str, Any]):
|
||||
"""
|
||||
Sets parameters to configuration
|
||||
|
||||
Args:
|
||||
values: dict where key is the key of the parameter in yaml format like 'storage:shard:default:resync_metabase' and value is the value of the option to set
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def revert(self):
|
||||
"""
|
||||
Revert changes
|
||||
"""
|
|
@ -0,0 +1,67 @@
|
|||
import os
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.shell.interfaces import CommandOptions
|
||||
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
|
||||
from frostfs_testlib.storage.dataclasses.node_base import ServiceClass
|
||||
|
||||
reporter = get_reporter()
|
||||
|
||||
|
||||
class ServiceConfiguration(ServiceConfigurationYml):
|
||||
def __init__(self, service: "ServiceClass") -> None:
|
||||
self.service = service
|
||||
self.shell = self.service.host.get_shell()
|
||||
self.confd_path = os.path.join(self.service.config_dir, "conf.d")
|
||||
self.custom_file = os.path.join(self.confd_path, "99_changes.yml")
|
||||
|
||||
def _path_exists(self, path: str) -> bool:
|
||||
return not self.shell.exec(f"test -e {path}", options=CommandOptions(check=False)).return_code
|
||||
|
||||
def _get_data_from_file(self, path: str) -> dict:
|
||||
content = self.shell.exec(f"cat {path}").stdout
|
||||
data = yaml.safe_load(content)
|
||||
return data
|
||||
|
||||
def get(self, key: str) -> str:
|
||||
with reporter.step(f"Get {key} configuration value for {self.service}"):
|
||||
config_files = [self.service.main_config_path]
|
||||
|
||||
if self._path_exists(self.confd_path):
|
||||
files = self.shell.exec(f"find {self.confd_path} -type f").stdout.strip().split()
|
||||
# Sorting files in backwards order from latest to first one
|
||||
config_files.extend(sorted(files, key=lambda x: -int(re.findall("^\d+", os.path.basename(x))[0])))
|
||||
|
||||
result = None
|
||||
for file in files:
|
||||
data = self._get_data_from_file(file)
|
||||
result = self._find_option(key, data)
|
||||
if result is not None:
|
||||
break
|
||||
|
||||
return result
|
||||
|
||||
def set(self, values: dict[str, Any]):
|
||||
with reporter.step(f"Change configuration for {self.service}"):
|
||||
if not self._path_exists(self.confd_path):
|
||||
self.shell.exec(f"mkdir {self.confd_path}")
|
||||
|
||||
if self._path_exists(self.custom_file):
|
||||
data = self._get_data_from_file(self.custom_file)
|
||||
else:
|
||||
data = {}
|
||||
|
||||
for key, value in values.items():
|
||||
self._set_option(key, value, data)
|
||||
|
||||
content = yaml.dump(data)
|
||||
self.shell.exec(f"echo '{content}' | sudo tee {self.custom_file}")
|
||||
self.shell.exec(f"chmod 777 {self.custom_file}")
|
||||
|
||||
def revert(self):
|
||||
with reporter.step(f"Revert changed options for {self.service}"):
|
||||
self.shell.exec(f"rm -rf {self.custom_file}")
|
|
@ -3,6 +3,7 @@ class ConfigAttributes:
|
|||
WALLET_PASSWORD = "wallet_password"
|
||||
WALLET_PATH = "wallet_path"
|
||||
WALLET_CONFIG = "wallet_config"
|
||||
CONFIG_DIR = "service_config_dir"
|
||||
CONFIG_PATH = "config_path"
|
||||
SHARD_CONFIG_PATH = "shard_config_path"
|
||||
LOCAL_WALLET_PATH = "local_wallet_path"
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import datetime
|
||||
import time
|
||||
from typing import TypeVar
|
||||
|
||||
import frostfs_testlib.resources.optionals as optionals
|
||||
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
||||
from frostfs_testlib.plugins import load_all
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
|
||||
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
|
||||
|
@ -22,6 +24,14 @@ reporter = get_reporter()
|
|||
if_up_down_helper = IfUpDownHelper()
|
||||
|
||||
|
||||
class StateManager:
|
||||
def __init__(self, cluster_state_controller: "ClusterStateController") -> None:
|
||||
self.csc = cluster_state_controller
|
||||
|
||||
|
||||
StateManagerClass = TypeVar("StateManagerClass", bound=StateManager)
|
||||
|
||||
|
||||
class ClusterStateController:
|
||||
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
|
||||
self.stopped_nodes: list[ClusterNode] = []
|
||||
|
@ -33,6 +43,18 @@ class ClusterStateController:
|
|||
self.shell = shell
|
||||
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
||||
self.nodes_with_modified_interface: list[ClusterNode] = []
|
||||
self.managers: list[StateManagerClass] = []
|
||||
|
||||
# TODO: move all functionality to managers
|
||||
managers = set(load_all(group="frostfs.testlib.csc_managers"))
|
||||
for manager in managers:
|
||||
self.managers.append(manager(self))
|
||||
|
||||
def manager(self, manager_type: type[StateManagerClass]) -> StateManagerClass:
|
||||
for manager in self.managers:
|
||||
# Subclasses here for the future if we have overriding subclasses of base interface
|
||||
if issubclass(type(manager), manager_type):
|
||||
return manager
|
||||
|
||||
def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]:
|
||||
stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host]
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
from typing import Any
|
||||
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.storage.cluster import ClusterNode
|
||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController, StateManager
|
||||
from frostfs_testlib.storage.dataclasses.node_base import ServiceClass
|
||||
from frostfs_testlib.testing import parallel
|
||||
|
||||
reporter = get_reporter()
|
||||
|
||||
|
||||
class ConfigStateManager(StateManager):
|
||||
def __init__(self, cluster_state_controller: ClusterStateController) -> None:
|
||||
super().__init__(cluster_state_controller)
|
||||
self.services_with_changed_config: set[tuple[ClusterNode, ServiceClass]] = set()
|
||||
self.cluster = self.csc.cluster
|
||||
|
||||
@reporter.step_deco("Change configuration for {service_type} on all nodes")
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
services = self.cluster.services(service_type)
|
||||
nodes = self.cluster.nodes(services)
|
||||
self.services_with_changed_config.update([(node, service_type) for node in nodes])
|
||||
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
parallel([node.config(service_type).set for node in nodes], values=values)
|
||||
self.csc.start_services_of_type(service_type)
|
||||
|
||||
@reporter.step_deco("Change configuration for {service_type} on {node}")
|
||||
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
self.services_with_changed_config.add((node, service_type))
|
||||
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
node.config(service_type).set(values)
|
||||
self.csc.start_service_of_type(node, service_type)
|
||||
|
||||
@reporter.step_deco("Revert all configuration changes")
|
||||
def revert_all(self):
|
||||
if not self.services_with_changed_config:
|
||||
return
|
||||
|
||||
parallel(self._revert_svc, self.services_with_changed_config)
|
||||
self.services_with_changed_config.clear()
|
||||
|
||||
self.csc.start_all_stopped_services()
|
||||
|
||||
# TODO: parallel can't have multiple parallel_items :(
|
||||
@reporter.step_deco("Revert all configuration {node_and_service}")
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]):
|
||||
node, service_type = node_and_service
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
node.config(service_type).revert()
|
|
@ -120,6 +120,15 @@ class NodeBase(HumanReadableABC):
|
|||
ConfigAttributes.WALLET_CONFIG,
|
||||
)
|
||||
|
||||
@property
|
||||
def config_dir(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.CONFIG_DIR)
|
||||
|
||||
@property
|
||||
def main_config_path(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.CONFIG_PATH)
|
||||
|
||||
# TODO: Deprecated
|
||||
def get_config(self, config_file_path: Optional[str] = None) -> tuple[str, dict]:
|
||||
if config_file_path is None:
|
||||
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
|
||||
|
@ -132,6 +141,7 @@ class NodeBase(HumanReadableABC):
|
|||
config = yaml.safe_load(config_text)
|
||||
return config_file_path, config
|
||||
|
||||
# TODO: Deprecated
|
||||
def save_config(self, new_config: dict, config_file_path: Optional[str] = None) -> None:
|
||||
if config_file_path is None:
|
||||
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
|
||||
|
@ -146,9 +156,7 @@ class NodeBase(HumanReadableABC):
|
|||
storage_wallet_pass = self.get_wallet_password()
|
||||
return wallet_utils.get_wallet_public_key(storage_wallet_path, storage_wallet_pass)
|
||||
|
||||
def _get_attribute(
|
||||
self, attribute_name: str, default_attribute_name: Optional[str] = None
|
||||
) -> str:
|
||||
def _get_attribute(self, attribute_name: str, default_attribute_name: Optional[str] = None) -> str:
|
||||
config = self.host.get_service_config(self.name)
|
||||
|
||||
if attribute_name not in config.attributes:
|
||||
|
|
Loading…
Reference in a new issue