Add support test maintenance

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2023-11-20 13:54:47 +03:00 committed by Dmitriy Zayakin
parent 22647c6d59
commit ed70dada96
8 changed files with 290 additions and 96 deletions

View file

@ -27,11 +27,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph deposit-notary",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def dump_balances(
@ -56,11 +52,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph dump-balances",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def dump_config(self, rpc_endpoint: str) -> CommandResult:
@ -74,11 +66,25 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph dump-config",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def set_config(
self, set_key_value: str, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None
) -> CommandResult:
"""Add/update global config value in the FrostFS network.
Args:
set_key_value: key1=val1 [key2=val2 ...]
alphabet_wallets: Path to alphabet wallets dir
rpc_endpoint: N3 RPC node endpoint
Returns:
Command's result.
"""
return self._execute(
f"morph set-config {set_key_value}",
**{param: param_value for param, param_value in locals().items() if param not in ["self", "set_key_value"]},
)
def dump_containers(
@ -101,11 +107,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph dump-containers",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def dump_hashes(self, rpc_endpoint: str) -> CommandResult:
@ -119,11 +121,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph dump-hashes",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def force_new_epoch(
@ -140,11 +138,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph force-new-epoch",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def generate_alphabet(
@ -165,11 +159,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph generate-alphabet",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def generate_storage_wallet(
@ -192,11 +182,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph generate-storage-wallet",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def init(
@ -232,11 +218,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def refill_gas(
@ -259,11 +241,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph refill-gas",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def restore_containers(
@ -286,11 +264,7 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph restore-containers",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def set_policy(
@ -348,17 +322,13 @@ class FrostfsAdmMorph(CliCommand):
"""
return self._execute(
"morph update-contracts",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def remove_nodes(
self, node_netmap_keys: list[str], rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None
) -> CommandResult:
""" Move node to the Offline state in the candidates list
"""Move node to the Offline state in the candidates list
and tick an epoch to update the netmap using frostfs-adm
Args:

View file

@ -3,6 +3,7 @@ from typing import Optional
from frostfs_testlib.cli.frostfs_cli.accounting import FrostfsCliAccounting
from frostfs_testlib.cli.frostfs_cli.acl import FrostfsCliACL
from frostfs_testlib.cli.frostfs_cli.container import FrostfsCliContainer
from frostfs_testlib.cli.frostfs_cli.control import FrostfsCliControl
from frostfs_testlib.cli.frostfs_cli.netmap import FrostfsCliNetmap
from frostfs_testlib.cli.frostfs_cli.object import FrostfsCliObject
from frostfs_testlib.cli.frostfs_cli.session import FrostfsCliSession
@ -25,6 +26,7 @@ class FrostfsCli:
storagegroup: FrostfsCliStorageGroup
util: FrostfsCliUtil
version: FrostfsCliVersion
control: FrostfsCliControl
def __init__(self, shell: Shell, frostfs_cli_exec_path: str, config_file: Optional[str] = None):
self.accounting = FrostfsCliAccounting(shell, frostfs_cli_exec_path, config=config_file)
@ -38,3 +40,4 @@ class FrostfsCli:
self.util = FrostfsCliUtil(shell, frostfs_cli_exec_path, config=config_file)
self.version = FrostfsCliVersion(shell, frostfs_cli_exec_path, config=config_file)
self.tree = FrostfsCliTree(shell, frostfs_cli_exec_path, config=config_file)
self.control = FrostfsCliControl(shell, frostfs_cli_exec_path, config=config_file)

View file

@ -0,0 +1,58 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliControl(CliCommand):
def set_status(
self,
endpoint: str,
status: str,
wallet: Optional[str] = None,
force: Optional[bool] = None,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""Set status of the storage node in FrostFS network map
Args:
wallet: Path to the wallet or binary key
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
force: Force turning to local maintenance
status: New netmap status keyword ('online', 'offline', 'maintenance')
timeout: Timeout for an operation (default 15s)
Returns:
Command`s result.
"""
return self._execute(
"control set-status",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def healthcheck(
self,
endpoint: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""Set status of the storage node in FrostFS network map
Args:
wallet: Path to the wallet or binary key
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
force: Force turning to local maintenance
status: New netmap status keyword ('online', 'offline', 'maintenance')
timeout: Timeout for an operation (default 15s)
Returns:
Command`s result.
"""
return self._execute(
"control healthcheck",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,86 @@
import re
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo
class NetmapParser:
@staticmethod
def netinfo(output: str) -> NodeNetInfo:
regexes = {
"epoch": r"Epoch: (?P<epoch>\d+)",
"network_magic": r"Network magic: (?P<network_magic>.*$)",
"time_per_block": r"Time per block: (?P<time_per_block>\d+\w+)",
"container_fee": r"Container fee: (?P<container_fee>\d+)",
"epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
"inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
"maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
"withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
"homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
"maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",
"eigen_trust_alpha": r"EigenTrustAlpha: (?P<eigen_trust_alpha>\d+\w+$)",
"eigen_trust_iterations": r"EigenTrustIterations: (?P<eigen_trust_iterations>\d+)",
}
parse_result = {}
for key, regex in regexes.items():
search_result = re.search(regex, output, flags=re.MULTILINE)
if search_result == None:
parse_result[key] = None
continue
parse_result[key] = search_result[key].strip()
node_netinfo = NodeNetInfo(**parse_result)
return node_netinfo
@staticmethod
def snapshot_all_nodes(output: str) -> list[NodeNetmapInfo]:
"""The code will parse each line and return each node as dataclass."""
netmap_nodes = output.split("Node ")[1:]
dataclasses_netmap = []
result_netmap = {}
regexes = {
"node_id": r"\d+: (?P<node_id>\w+)",
"node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
"node_status": r"(?P<node_status>ONLINE|OFFLINE)",
"cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
"continent": r"Continent: (?P<continent>\w+)",
"country": r"Country: (?P<country>\w+)",
"country_code": r"CountryCode: (?P<country_code>\w+)",
"external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
"location": r"Location: (?P<location>\w+.*)",
"node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
"price": r"Price: (?P<price>\d+)",
"sub_div": r"SubDiv: (?P<sub_div>.*)",
"sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
"un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
"role": r"role: (?P<role>\w+)",
}
for node in netmap_nodes:
for key, regex in regexes.items():
search_result = re.search(regex, node, flags=re.MULTILINE)
if key == "node_data_ips":
result_netmap[key] = search_result[key].strip().split(" ")
continue
if key == "external_address":
result_netmap[key] = search_result[key].strip().split(",")
continue
if search_result == None:
result_netmap[key] = None
continue
result_netmap[key] = search_result[key].strip()
dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))
return dataclasses_netmap
@staticmethod
def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None:
snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output)
snapshot_node = [node for node in snapshot_nodes if node.node == cluster_node.host_ip]
if not snapshot_node:
return None
return snapshot_node[0]

View file

@ -62,7 +62,8 @@ class LocalShell(Shell):
if options.check and result.return_code != 0:
raise RuntimeError(
f"Command: {command}\nreturn code: {result.return_code}\n"
f"Output: {result.stdout}"
f"Output: {result.stdout}\n"
f"Stderr: {result.stderr}\n"
)
return result
@ -94,9 +95,7 @@ class LocalShell(Shell):
return_code=exc.returncode,
)
raise RuntimeError(
f"Command: {command}\nError:\n"
f"return code: {exc.returncode}\n"
f"output: {exc.output}"
f"Command: {command}\nError:\n" f"return code: {exc.returncode}\n" f"output: {exc.output}"
) from exc
except OSError as exc:
raise RuntimeError(f"Command: {command}\nOutput: {exc.strerror}") from exc

View file

@ -6,27 +6,11 @@ from functools import lru_cache, wraps
from time import sleep
from typing import ClassVar, Optional, Tuple
from paramiko import (
AutoAddPolicy,
Channel,
ECDSAKey,
Ed25519Key,
PKey,
RSAKey,
SSHClient,
SSHException,
ssh_exception,
)
from paramiko import AutoAddPolicy, Channel, ECDSAKey, Ed25519Key, PKey, RSAKey, SSHClient, SSHException, ssh_exception
from paramiko.ssh_exception import AuthenticationException
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell.interfaces import (
CommandInspector,
CommandOptions,
CommandResult,
Shell,
SshCredentials,
)
from frostfs_testlib.shell.interfaces import CommandInspector, CommandOptions, CommandResult, Shell, SshCredentials
logger = logging.getLogger("frostfs.testlib.shell")
reporter = get_reporter()
@ -97,8 +81,7 @@ class SshConnectionProvider:
)
else:
logger.info(
f"Trying to connect to host {host} as {creds.ssh_login} using password "
f"(attempt {attempt})"
f"Trying to connect to host {host} as {creds.ssh_login} using password " f"(attempt {attempt})"
)
connection.connect(
hostname=host,
@ -141,9 +124,7 @@ class HostIsNotAvailable(Exception):
def log_command(func):
@wraps(func)
def wrapper(
shell: "SSHShell", command: str, options: CommandOptions, *args, **kwargs
) -> CommandResult:
def wrapper(shell: "SSHShell", command: str, options: CommandOptions, *args, **kwargs) -> CommandResult:
command_info = command.removeprefix("$ProgressPreference='SilentlyContinue'\n")
with reporter.step(command_info):
logger.info(f'Execute command "{command}" on "{shell.host}"')
@ -238,15 +219,13 @@ class SSHShell(Shell):
if options.check and result.return_code != 0:
raise RuntimeError(
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}"
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}\nStderr: {result.stderr}\n"
)
return result
@log_command
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
stdin, stdout, stderr = self._connection.exec_command(
command, timeout=options.timeout, get_pty=True
)
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, get_pty=True)
for interactive_input in options.interactive_inputs:
input = interactive_input.input
if not input.endswith("\n"):

View file

@ -3,9 +3,13 @@ import time
from typing import TypeVar
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.plugins import load_all
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
@ -13,6 +17,7 @@ from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.datetime_utils import parse_time
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
wait_for_host_offline,
@ -426,6 +431,79 @@ class ClusterStateController:
return
parallel(self._disable_date_synchronizer, self.cluster.cluster_nodes)
@reporter.step_deco("Set MaintenanceModeAllowed - {status}")
def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None:
frostfs_adm = FrostfsAdm(
shell=cluster_node.host.get_shell(),
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH,
)
frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}")
@reporter.step_deco("Set mode node to {status}")
def set_mode_node(self, cluster_node: ClusterNode, wallet: str, status: str, await_tick: bool = True) -> None:
rpc_endpoint = cluster_node.storage_node.get_rpc_endpoint()
control_endpoint = cluster_node.service(StorageNode).get_control_endpoint()
frostfs_adm, frostfs_cli, frostfs_cli_remote = self._get_cli(local_shell=self.shell, cluster_node=cluster_node)
node_netinfo = NetmapParser.netinfo(frostfs_cli.netmap.netinfo(rpc_endpoint=rpc_endpoint, wallet=wallet).stdout)
with reporter.step("If status maintenance, then check that the option is enabled"):
if node_netinfo.maintenance_mode_allowed == "false":
frostfs_adm.morph.set_config(set_key_value="MaintenanceModeAllowed=true")
with reporter.step(f"Change the status to {status}"):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status=status)
if not await_tick:
return
with reporter.step("Tick 1 epoch, and await 2 block"):
frostfs_adm.morph.force_new_epoch()
time.sleep(parse_time(MORPH_BLOCK_TIME) * 2)
self.check_node_status(status=status, wallet=wallet, cluster_node=cluster_node)
@wait_for_success(80, 8)
@reporter.step_deco("Check status node, status - {status}")
def check_node_status(self, status: str, wallet: str, cluster_node: ClusterNode):
frostfs_cli = FrostfsCli(
shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG
)
netmap = NetmapParser.snapshot_all_nodes(
frostfs_cli.netmap.snapshot(rpc_endpoint=cluster_node.storage_node.get_rpc_endpoint(), wallet=wallet).stdout
)
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
if status == "offline":
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
else:
assert netmap[0].node_status == status.upper(), f"Node state - {netmap[0].node_status} != {status} expect"
def _get_cli(self, local_shell: Shell, cluster_node: ClusterNode) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]:
# TODO Move to service config
host = cluster_node.host
service_config = host.get_service_config(cluster_node.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
frostfs_adm = FrostfsAdm(
shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH
)
frostfs_cli = FrostfsCli(
shell=local_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG
)
frostfs_cli_remote = FrostfsCli(
shell=shell,
frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
config_file=wallet_config_path,
)
return frostfs_adm, frostfs_cli, frostfs_cli_remote
def _enable_date_synchronizer(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("timedatectl set-ntp true")

View file

@ -1,5 +1,4 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from frostfs_testlib.testing.readable import HumanReadableEnum
@ -28,10 +27,16 @@ class StorageObjectInfo(ObjectRef):
locks: Optional[list[LockObjectInfo]] = None
class ModeNode(HumanReadableEnum):
MAINTENANCE: str = "maintenance"
ONLINE: str = "online"
OFFLINE: str = "offline"
@dataclass
class NodeNetmapInfo:
node_id: str = None
node_status: str = None
node_status: ModeNode = None
node_data_ips: list[str] = None
cluster_name: str = None
continent: str = None
@ -53,3 +58,19 @@ class Interfaces(HumanReadableEnum):
MGMT: str = "mgmt"
INTERNAL_0: str = "internal0"
INTERNAL_1: str = "internal1"
@dataclass
class NodeNetInfo:
epoch: str = None
network_magic: str = None
time_per_block: str = None
container_fee: str = None
epoch_duration: str = None
inner_ring_candidate_fee: str = None
maximum_object_size: str = None
withdrawal_fee: str = None
homomorphic_hashing_disabled: str = None
maintenance_mode_allowed: str = None
eigen_trust_alpha: str = None
eigen_trust_iterations: str = None