Add tests for node shutdown and object replication integrity

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
Branch: master
Dmitriy Zayakin 2023-05-03 11:45:44 +03:00 committed by Юлия Ковшова
parent bbf9ea7143
commit a6e1190f23
7 changed files with 450 additions and 22 deletions


@@ -1,9 +1,10 @@
import random
import pathlib
import random
import re
from dataclasses import dataclass
from typing import Any
from typing import Any, Optional
import pytest
import yaml
from frostfs_testlib.blockchain import RPCClient
from frostfs_testlib.hosting import Host, Hosting
@@ -111,14 +112,14 @@ class InnerRingNode(NodeBase):
since frostfs network will still treat it as "node"
"""
def get_netmap_cleaner_threshold(self) -> str:
def get_netmap_cleaner_threshold(self) -> int:
config_file = self.get_remote_config_path()
contents = self.host.get_shell().exec(f"cat {config_file}").stdout
config = yaml.safe_load(contents)
value = config["netmap_cleaner"]["threshold"]
return value
return int(value)
class S3Gate(NodeBase):
@@ -217,6 +218,73 @@ class StorageNode(NodeBase):
return f"{self.name}: {self.get_rpc_endpoint()}"
class ClusterNode:
"""
Represents physical node where multiple different services may be located
"""
host: Host
storage_node: Optional[StorageNode] = None
ir_node: Optional[InnerRingNode] = None
s3_gate: Optional[S3Gate] = None
http_gate: Optional[HTTPGate] = None
morph_chain: Optional[MorphChain] = None
main_chain: Optional[MainChain] = None
def __init__(self, host: Host, nodes: list[NodeBase]) -> None:
self.host = host
attributes_map = {
StorageNode: "storage_node",
InnerRingNode: "ir_node",
S3Gate: "s3_gate",
HTTPGate: "http_gate",
MorphChain: "morph_chain",
MainChain: "main_chain",
}
for node in nodes:
if node.host.config.address == host.config.address:
self.__setattr__(attributes_map[node.__class__], node)
@property
def host_ip(self):
return self.host.config.address
def __eq__(self, other):
return self.host.config.address == other.host.config.address
def __hash__(self):
return id(self.host.config.address)
def __str__(self):
return self.host.config.address
def __repr__(self) -> str:
return self.host.config.address
def get_service_by_type(self, service_type: type[NodeBase]) -> type[NodeBase]:
class_name = service_type.__name__
class_field_map = {
StorageNode.__name__: self.storage_node,
InnerRingNode.__name__: self.ir_node,
S3Gate.__name__: self.s3_gate,
HTTPGate.__name__: self.http_gate,
MorphChain.__name__: self.morph_chain,
}
if class_name not in class_field_map:
raise pytest.fail(f"Invalid type passed {class_name}")
return class_field_map[class_name]
def get_list_of_services(self) -> list[str]:
return [
self.storage_node.get_service_systemctl_name(),
self.ir_node.get_service_systemctl_name(),
self.s3_gate.get_service_systemctl_name(),
self.http_gate.get_service_systemctl_name(),
self.morph_chain.get_service_systemctl_name(),
]
class Cluster:
"""
This class represents a Cluster object for the whole storage based on provided hosting
@@ -225,6 +293,7 @@ class Cluster:
default_rpc_endpoint: str
default_s3_gate_endpoint: str
default_http_gate_endpoint: str
cluster_nodes: list[ClusterNode]
def __init__(self, hosting: Hosting) -> None:
self._hosting = hosting
@@ -239,6 +308,17 @@ class Cluster:
"""
return self._hosting.hosts
@property
def cluster_nodes(self) -> list[ClusterNode]:
"""
Returns list of Cluster Nodes
"""
return [
ClusterNode(host, self._find_nodes_by_pattern(f".*{id:02d}$"))
for id, host in enumerate(self.hosts, start=1)
]
@property
def hosting(self) -> Hosting:
return self._hosting
@@ -304,8 +384,11 @@ class Cluster:
"""
return self._get_nodes(_ServicesNames.INNER_RING)
def _get_nodes(self, service_name) -> list[StorageNode]:
configs = self.hosting.find_service_configs(f"{service_name}\d*$")
def _get_nodes(self, service_name) -> list[NodeBase]:
return self._find_nodes_by_pattern(f"{service_name}\d*$")
def _find_nodes_by_pattern(self, pattern) -> list[NodeBase]:
configs = self.hosting.find_service_configs(pattern)
class_mapping: dict[str, Any] = {
_ServicesNames.STORAGE: StorageNode,
@@ -316,15 +399,23 @@
_ServicesNames.MAIN_CHAIN: MainChain,
}
cls = class_mapping.get(service_name)
return [
cls(
self._get_id(config.name),
config.name,
self.hosting.get_host_by_service(config.name),
)
for config in configs
]
found_nodes = []
for config in configs:
# config.name is something like s3-gate01. Strip the trailing digits to determine the service type
service_type = re.findall(".*\D", config.name)[0]
# skip unsupported services
if service_type not in class_mapping.keys():
continue
cls = class_mapping.get(service_type)
found_nodes.append(
cls(
self._get_id(config.name),
config.name,
self.hosting.get_host_by_service(config.name),
)
)
return found_nodes
def _get_id(self, node_name) -> str:
pattern = "\d*$"


@@ -2,11 +2,13 @@ import logging
from time import sleep
import allure
from frostfs_testlib.shell import Shell
from frostfs_testlib.hosting import Host
from frostfs_testlib.shell import CommandOptions, Shell
from pytest_tests.helpers.cluster import Cluster, StorageNode
from pytest_tests.helpers.node_management import storage_node_healthcheck
from pytest_tests.helpers.storage_policy import get_nodes_with_object
from pytest_tests.helpers.test_control import retry
logger = logging.getLogger("NeoLogger")
@@ -18,8 +20,9 @@ def wait_object_replication(
expected_copies: int,
shell: Shell,
nodes: list[StorageNode],
sleep_interval: int = 15,
attempts: int = 20,
) -> list[StorageNode]:
sleep_interval, attempts = 15, 20
nodes_with_object = []
for _ in range(attempts):
nodes_with_object = get_nodes_with_object(cid, oid, shell=shell, nodes=nodes)
@@ -53,3 +56,20 @@ def is_all_storage_nodes_returned(cluster: Cluster) -> bool:
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
return False
return True
@allure.step("Ping node")
def ping_host(shell: Shell, host: Host):
options = CommandOptions(check=False)
return shell.exec(f"ping {host.config.address} -c 1", options).return_code
@retry(max_attempts=60, sleep_interval=5, expected_result=1)
@allure.step("Waiting for host of {node} to go offline")
def wait_for_host_offline(shell: Shell, node: StorageNode):
try:
# TODO: Quick solution for now, should be replaced by lib interactions
return ping_host(shell, node.host)
except Exception as err:
logger.warning(f"Host ping fails with error {err}")
return 0
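A short sketch of how these helpers are meant to be combined; cluster_node and shell are assumed to come from existing fixtures. On Linux, ping exits with a non-zero code when no reply is received, so decorating wait_for_host_offline with retry(expected_result=1) keeps polling until the host stops answering.

# Illustrative only: hard-stop a node's host, then wait until it no longer answers pings.
cluster_node.host.stop_host(mode="hard")
wait_for_host_offline(shell, cluster_node.storage_node)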


@@ -6,7 +6,7 @@ import uuid
from typing import Any, Optional
import allure
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.cli import FrostfsCli, NeoGo
from frostfs_testlib.shell import Shell
from frostfs_testlib.utils import json_utils
@@ -15,6 +15,7 @@ from pytest_tests.resources.common import (
ASSETS_DIR,
CLI_DEFAULT_TIMEOUT,
FROSTFS_CLI_EXEC,
NEOGO_EXECUTABLE,
WALLET_CONFIG,
)
@@ -670,3 +671,34 @@ def head_object(
logger.info("decoding simple header")
return json_utils.decode_simple_header(decoded)
@allure.step("Run neo-go query height")
def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
"""
Run neo-go query height command
Args:
shell: executor for cli command
endpoint: RPC endpoint to query
Returns:
dict->
Latest block: {value}
Validated state: {value}
"""
neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
output = neogo.query.height(rpc_endpoint=endpoint).stdout
try:
# the first line of the command's output contains the latest block height
first_line = output.split("\n")[0]
except Exception:
logger.error(f"Got empty output (neo-go query height): {output}")
latest_block = first_line.split(":")
# the second line of the command's output contains the validated state height
second_line = output.split("\n")[1]
validated_state = second_line.split(":")
return {
latest_block[0].replace(":", ""): int(latest_block[1]),
validated_state[0].replace(":", ""): int(validated_state[1]),
}
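A hedged usage sketch of neo_go_query_height; the endpoint below is a placeholder, not a real value. Per the parsing above, the returned dictionary is keyed by the text before the colon on each output line ("Latest block" and "Validated state").

# Illustrative only: replace the endpoint with an actual morph chain RPC endpoint.
heights = neo_go_query_height(shell, endpoint="http://morph-chain01:30333")
assert heights["Latest block"] >= heights["Validated state"]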


@@ -1,6 +1,7 @@
import logging
from functools import wraps
from time import sleep, time
from typing import Any
from _pytest.outcomes import Failed
from pytest import fail
@@ -78,3 +79,39 @@ def wait_for_success(max_wait_time: int = 60, interval: int = 1):
return impl
return wrapper
def retry(max_attempts: int, sleep_interval: int = 1, expected_result: Any = None):
"""
Decorator to wait for some conditions/functions to pass successfully.
This is useful if you don't know the exact time when something should pass successfully and do not
want to use sleep(X) with an overly large X.
Be careful though: the wrapped function should only check the state of something, not change it.
"""
def wrapper(func):
@wraps(func)
def impl(*a, **kw):
last_exception = None
for _ in range(max_attempts):
try:
actual_result = func(*a, **kw)
if expected_result is not None:
assert expected_result == actual_result
return actual_result
except Exception as ex:
logger.debug(ex)
last_exception = ex
sleep(sleep_interval)
except Failed as ex:
logger.debug(ex)
last_exception = ex
sleep(sleep_interval)
# all attempts exhausted without success, raise the last exception
raise last_exception
return impl
return wrapper
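A minimal sketch of how the retry decorator might be applied; the systemd status check below is a hypothetical helper, and, as the docstring requires, it only inspects state without changing it.

from frostfs_testlib.shell import CommandOptions, Shell

from pytest_tests.helpers.test_control import retry


@retry(max_attempts=10, sleep_interval=2, expected_result=True)
def service_is_active(shell: Shell, service_name: str) -> bool:
    # Re-invoked every 2 seconds, up to 10 times, until the expected result (True) is returned.
    result = shell.exec(f"systemctl is-active {service_name}", CommandOptions(check=False))
    return result.stdout.strip() == "active"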


@@ -3,7 +3,7 @@ import pytest
from frostfs_testlib.shell import Shell
from pytest_tests.helpers import epoch
from pytest_tests.helpers.cluster import Cluster
from pytest_tests.helpers.cluster import Cluster, StorageNode
# To skip adding every mandatory singleton dependency to EACH test function
@@ -18,12 +18,12 @@ class ClusterTestBase:
yield
@allure.title("Tick {epochs_to_tick} epochs")
def tick_epochs(self, epochs_to_tick: int):
def tick_epochs(self, epochs_to_tick: int, alive_node: StorageNode):
for _ in range(epochs_to_tick):
self.tick_epoch()
self.tick_epoch(alive_node)
def tick_epoch(self):
epoch.tick_epoch(self.shell, self.cluster)
def tick_epoch(self, alive_node: StorageNode):
epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node)
def wait_for_epochs_align(self):
epoch.wait_for_epochs_align(self.shell, self.cluster)
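With this change, every epoch tick goes through an explicitly chosen alive node. A hedged sketch of a call site inside a ClusterTestBase-derived test, where node_to_stop is assumed to come from a fixture (as in the new failover tests below):

# Illustrative only: exclude the node that will be stopped, then tick through a surviving one.
alive_storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node})
self.tick_epochs(1, alive_node=alive_storage_nodes[0])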


@@ -0,0 +1,248 @@
import logging
import os.path
import random
import time
import allure
import pytest
from frostfs_testlib.resources.common import PUBLIC_ACL
from frostfs_testlib.utils import datetime_utils
from pytest import FixtureRequest
from pytest_tests.helpers.cluster import ClusterNode, StorageNode
from pytest_tests.helpers.container import StorageContainer, StorageContainerInfo, create_container
from pytest_tests.helpers.failover_utils import wait_for_host_offline, wait_object_replication
from pytest_tests.helpers.file_helper import get_file_hash
from pytest_tests.helpers.frostfs_verbs import get_object
from pytest_tests.helpers.node_management import (
check_node_in_map,
check_node_not_in_map,
wait_for_node_to_be_ready,
)
from pytest_tests.helpers.storage_object_info import StorageObjectInfo
from pytest_tests.helpers.test_control import wait_for_success
from pytest_tests.helpers.wallet import WalletFile
from pytest_tests.resources.common import MORPH_BLOCK_TIME
from pytest_tests.steps.cluster_test_base import ClusterTestBase
logger = logging.getLogger("NeoLogger")
@pytest.mark.failover
@pytest.mark.failover_server
class TestFailoverServer(ClusterTestBase):
@wait_for_success(max_wait_time=120, interval=1)
def wait_node_not_in_map(self, *args, **kwargs):
check_node_not_in_map(*args, **kwargs)
@wait_for_success(max_wait_time=120, interval=1)
def wait_node_in_map(self, *args, **kwargs):
check_node_in_map(*args, **kwargs)
@allure.step("Create {count_containers} containers and {count_files} objects")
@pytest.fixture
def containers(
self,
request: FixtureRequest,
default_wallet: str,
) -> list[StorageContainer]:
placement_rule = "REP 2 CBF 2 SELECT 2 FROM * AS X"
containers = []
for _ in range(request.param):
cont_id = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
basic_acl=PUBLIC_ACL,
)
wallet = WalletFile(path=default_wallet)
storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
containers.append(
StorageContainer(
storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster
)
)
return containers
@pytest.fixture(
params=[
pytest.lazy_fixture("simple_object_size"),
pytest.lazy_fixture("complex_object_size"),
],
ids=["simple object", "complex object"],
# Class scope so that each file set is uploaded/deleted only once
scope="class",
)
def object_size(self, request):
return request.param
@allure.step("Create object and delete after test")
@pytest.fixture(scope="class")
def storage_objects(
self, request: FixtureRequest, containers: list[StorageContainer], object_size: int
) -> StorageObjectInfo:
object_list = []
for cont in containers:
for _ in range(request.param):
object_list.append(cont.generate_object(size=object_size))
yield object_list
for storage_object in object_list:
os.remove(storage_object.file_path)
@allure.step("Select random node to stop and start it after test")
@pytest.fixture
def node_to_stop(self) -> ClusterNode:
node = random.choice(self.cluster.cluster_nodes)
yield node
with allure.step(f"start {node.storage_node}"):
node.host.start_host()
with allure.step(f"Waiting status ready for node {node.storage_node}"):
wait_for_node_to_be_ready(node.storage_node)
@allure.step("Upload object with nodes and compare")
def get_corrupted_objects_list(
self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
) -> list[StorageObjectInfo]:
corrupted_objects = []
for node in nodes:
for storage_object in storage_objects:
got_file_path = get_object(
storage_object.wallet_file_path,
storage_object.cid,
storage_object.oid,
endpoint=node.get_rpc_endpoint(),
shell=self.shell,
timeout="60s",
)
if storage_object.file_hash != get_file_hash(got_file_path):
corrupted_objects.append(storage_object)
os.remove(got_file_path)
return corrupted_objects
def check_objects_replication(
self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode]
) -> None:
for storage_object in storage_objects:
wait_object_replication(
storage_object.cid,
storage_object.oid,
2,
shell=self.shell,
nodes=storage_nodes,
sleep_interval=45,
attempts=60,
)
@allure.title("Full shutdown node")
@pytest.mark.parametrize("containers, storage_objects", [(5, 20)], indirect=True)
def test_complete_node_shutdown(
self,
containers: list[StorageContainer],
storage_objects: list[StorageObjectInfo],
default_wallet: str,
node_to_stop: ClusterNode,
):
with allure.step("Checking that the objects are loader according to the policy"):
self.check_objects_replication(storage_objects, self.cluster.storage_nodes)
with allure.step(f"Remove {node_to_stop} from the list of nodes"):
alive_nodes = list(set(self.cluster.cluster_nodes) - {node_to_stop})
storage_nodes = [cluster.storage_node for cluster in alive_nodes]
with allure.step("Tick epoch"):
self.tick_epochs(1, storage_nodes[0])
with allure.step("Wait 2 block time"):
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
with allure.step(f"Stop {node_to_stop} node"):
node_to_stop.host.stop_host(mode="hard")
with allure.step(f"Check if the node {node_to_stop.storage_node} has stopped"):
wait_for_host_offline(self.shell, node_to_stop.storage_node)
with allure.step("Wait for objects replication"):
self.check_objects_replication(storage_objects, storage_nodes)
with allure.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
with allure.step(f"check {node_to_stop.storage_node} in map"):
self.wait_node_in_map(
node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]
)
count_tick_epoch = alive_nodes[0].ir_node.get_netmap_cleaner_threshold() + 1
with allure.step(f"Tick {count_tick_epoch} epoch, in {storage_nodes[0]} node"):
self.tick_epochs(count_tick_epoch, storage_nodes[0])
with allure.step("Wait 2 block time"):
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
with allure.step(f"Check {node_to_stop} in not map"):
self.wait_node_not_in_map(
node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]
)
with allure.step(
f"Verify that there are no corrupted objects after {count_tick_epoch} epoch"
):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
@allure.title("Temporarily disable a node")
@pytest.mark.parametrize("containers, storage_objects", [(1, 2)], indirect=True)
def test_temporarily_disable_a_node(
self,
containers: list[StorageContainer],
storage_objects: list[StorageObjectInfo],
default_wallet: str,
node_to_stop,
):
with allure.step("Checking that the objects are loader according to the policy"):
self.check_objects_replication(storage_objects, self.cluster.storage_nodes)
with allure.step(f"Remove {node_to_stop} from the list of nodes"):
storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node})
with allure.step("Tick epoch"):
self.tick_epochs(1, storage_nodes[0])
with allure.step("Wait 2 block time"):
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
with allure.step(f"Stop {node_to_stop.storage_node} node"):
node_to_stop.host.stop_host(mode="hard")
with allure.step(f"Check if the node {node_to_stop} has stopped"):
wait_for_host_offline(self.shell, node_to_stop.storage_node)
with allure.step("Wait for objects replication"):
self.check_objects_replication(storage_objects, storage_nodes)
with allure.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list
with allure.step(f"Check {node_to_stop} in map"):
self.wait_node_in_map(
node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]
)
with allure.step(f"Start {node_to_stop}"):
node_to_stop.host.start_host()
with allure.step("Verify that there are no corrupted objects"):
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
assert not corrupted_objects_list