Add tests for node shutdown and object replication integrity #34
8 changed files with 451 additions and 23 deletions
|
@ -3,7 +3,7 @@ repos:
|
||||||
rev: 22.8.0
|
rev: 22.8.0
|
||||||
hooks:
|
hooks:
|
||||||
- id: black
|
- id: black
|
||||||
language_version: python3.9
|
language_version: python3.10
|
||||||
- repo: https://github.com/pycqa/isort
|
- repo: https://github.com/pycqa/isort
|
||||||
rev: 5.12.0
|
rev: 5.12.0
|
||||||
hooks:
|
hooks:
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
import random
|
|
||||||
import pathlib
|
import pathlib
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
import pytest
|
||||||
import yaml
|
import yaml
|
||||||
from frostfs_testlib.blockchain import RPCClient
|
from frostfs_testlib.blockchain import RPCClient
|
||||||
from frostfs_testlib.hosting import Host, Hosting
|
from frostfs_testlib.hosting import Host, Hosting
|
||||||
|
@ -111,14 +112,14 @@ class InnerRingNode(NodeBase):
|
||||||
since frostfs network will still treat it as "node"
|
since frostfs network will still treat it as "node"
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_netmap_cleaner_threshold(self) -> str:
|
def get_netmap_cleaner_threshold(self) -> int:
|
||||||
config_file = self.get_remote_config_path()
|
config_file = self.get_remote_config_path()
|
||||||
contents = self.host.get_shell().exec(f"cat {config_file}").stdout
|
contents = self.host.get_shell().exec(f"cat {config_file}").stdout
|
||||||
|
|
||||||
config = yaml.safe_load(contents)
|
config = yaml.safe_load(contents)
|
||||||
value = config["netmap_cleaner"]["threshold"]
|
value = config["netmap_cleaner"]["threshold"]
|
||||||
|
|
||||||
return value
|
return int(value)
|
||||||
|
|
||||||
|
|
||||||
class S3Gate(NodeBase):
|
class S3Gate(NodeBase):
|
||||||
|
@ -217,6 +218,73 @@ class StorageNode(NodeBase):
|
||||||
return f"{self.name}: {self.get_rpc_endpoint()}"
|
return f"{self.name}: {self.get_rpc_endpoint()}"
|
||||||
|
|
||||||
|
|
||||||
|
class ClusterNode:
|
||||||
|
"""
|
||||||
|
Represents physical node where multiple different services may be located
|
||||||
|
"""
|
||||||
|
|
||||||
|
host: Host
|
||||||
|
storage_node: Optional[StorageNode] = None
|
||||||
|
ir_node: Optional[InnerRingNode] = None
|
||||||
|
s3_gate: Optional[S3Gate] = None
|
||||||
|
http_gate: Optional[HTTPGate] = None
|
||||||
|
morph_chain: Optional[MorphChain] = None
|
||||||
|
main_chain: Optional[MainChain] = None
|
||||||
|
|
||||||
|
def __init__(self, host: Host, nodes: list[NodeBase]) -> None:
|
||||||
|
self.host = host
|
||||||
|
attributes_map = {
|
||||||
|
StorageNode: "storage_node",
|
||||||
|
InnerRingNode: "ir_node",
|
||||||
|
S3Gate: "s3_gate",
|
||||||
|
HTTPGate: "http_gate",
|
||||||
|
MorphChain: "morph_chain",
|
||||||
|
MainChain: "main_chain",
|
||||||
|
}
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
if node.host.config.address == host.config.address:
|
||||||
|
self.__setattr__(attributes_map[node.__class__], node)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def host_ip(self):
|
||||||
|
return self.host.config.address
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return self.host.config.address == other.host.config.address
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return id(self.host.config.address)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.host.config.address
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return self.host.config.address
|
||||||
|
|
||||||
|
def get_service_by_type(self, service_type: type[NodeBase]) -> type[NodeBase]:
|
||||||
|
class_name = service_type.__name__
|
||||||
|
class_field_map = {
|
||||||
|
StorageNode.__name__: self.storage_node,
|
||||||
|
InnerRingNode.__name__: self.ir_node,
|
||||||
|
S3Gate.__name__: self.s3_gate,
|
||||||
|
HTTPGate.__name__: self.http_gate,
|
||||||
|
MorphChain.__name__: self.morph_chain,
|
||||||
|
}
|
||||||
|
if class_name not in class_field_map:
|
||||||
|
raise pytest.fail(f"Invalid type passed {class_name}")
|
||||||
|
return class_field_map[class_name]
|
||||||
|
|
||||||
|
def get_list_of_services(self) -> list[str]:
|
||||||
|
return [
|
||||||
|
self.storage_node.get_service_systemctl_name(),
|
||||||
|
self.ir_node.get_service_systemctl_name(),
|
||||||
|
self.s3_gate.get_service_systemctl_name(),
|
||||||
|
self.http_gate.get_service_systemctl_name(),
|
||||||
|
self.morph_chain.get_service_systemctl_name(),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class Cluster:
|
class Cluster:
|
||||||
"""
|
"""
|
||||||
This class represents a Cluster object for the whole storage based on provided hosting
|
This class represents a Cluster object for the whole storage based on provided hosting
|
||||||
|
@ -225,6 +293,7 @@ class Cluster:
|
||||||
default_rpc_endpoint: str
|
default_rpc_endpoint: str
|
||||||
default_s3_gate_endpoint: str
|
default_s3_gate_endpoint: str
|
||||||
default_http_gate_endpoint: str
|
default_http_gate_endpoint: str
|
||||||
|
cluster_nodes: list[ClusterNode]
|
||||||
|
|
||||||
def __init__(self, hosting: Hosting) -> None:
|
def __init__(self, hosting: Hosting) -> None:
|
||||||
self._hosting = hosting
|
self._hosting = hosting
|
||||||
|
@ -239,6 +308,17 @@ class Cluster:
|
||||||
"""
|
"""
|
||||||
return self._hosting.hosts
|
return self._hosting.hosts
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cluster_nodes(self) -> list[ClusterNode]:
|
||||||
|
"""
|
||||||
|
Returns list of Cluster Nodes
|
||||||
|
"""
|
||||||
|
|
||||||
|
return [
|
||||||
|
ClusterNode(host, self._find_nodes_by_pattern(f".*{id:02d}$"))
|
||||||
|
for id, host in enumerate(self.hosts, start=1)
|
||||||
|
]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def hosting(self) -> Hosting:
|
def hosting(self) -> Hosting:
|
||||||
return self._hosting
|
return self._hosting
|
||||||
|
@ -304,8 +384,11 @@ class Cluster:
|
||||||
"""
|
"""
|
||||||
return self._get_nodes(_ServicesNames.INNER_RING)
|
return self._get_nodes(_ServicesNames.INNER_RING)
|
||||||
|
|
||||||
def _get_nodes(self, service_name) -> list[StorageNode]:
|
def _get_nodes(self, service_name) -> list[NodeBase]:
|
||||||
configs = self.hosting.find_service_configs(f"{service_name}\d*$")
|
return self._find_nodes_by_pattern(f"{service_name}\d*$")
|
||||||
|
|
||||||
|
def _find_nodes_by_pattern(self, pattern) -> list[NodeBase]:
|
||||||
|
configs = self.hosting.find_service_configs(pattern)
|
||||||
|
|
||||||
class_mapping: dict[str, Any] = {
|
class_mapping: dict[str, Any] = {
|
||||||
_ServicesNames.STORAGE: StorageNode,
|
_ServicesNames.STORAGE: StorageNode,
|
||||||
|
@ -316,15 +399,23 @@ class Cluster:
|
||||||
_ServicesNames.MAIN_CHAIN: MainChain,
|
_ServicesNames.MAIN_CHAIN: MainChain,
|
||||||
}
|
}
|
||||||
|
|
||||||
cls = class_mapping.get(service_name)
|
found_nodes = []
|
||||||
return [
|
for config in configs:
|
||||||
cls(
|
# config.name is something like s3-gate01. Cut last digits to know service type
|
||||||
self._get_id(config.name),
|
service_type = re.findall(".*\D", config.name)[0]
|
||||||
config.name,
|
# exclude unsupported services
|
||||||
self.hosting.get_host_by_service(config.name),
|
if service_type not in class_mapping.keys():
|
||||||
|
continue
|
||||||
|
|
||||||
|
cls = class_mapping.get(service_type)
|
||||||
|
found_nodes.append(
|
||||||
|
cls(
|
||||||
|
self._get_id(config.name),
|
||||||
|
config.name,
|
||||||
|
self.hosting.get_host_by_service(config.name),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
for config in configs
|
return found_nodes
|
||||||
]
|
|
||||||
|
|
||||||
def _get_id(self, node_name) -> str:
|
def _get_id(self, node_name) -> str:
|
||||||
pattern = "\d*$"
|
pattern = "\d*$"
|
||||||
|
|
|
@ -2,11 +2,13 @@ import logging
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
import allure
|
import allure
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.hosting import Host
|
||||||
|
from frostfs_testlib.shell import CommandOptions, Shell
|
||||||
|
|
||||||
from pytest_tests.helpers.cluster import Cluster, StorageNode
|
from pytest_tests.helpers.cluster import Cluster, StorageNode
|
||||||
from pytest_tests.helpers.node_management import storage_node_healthcheck
|
from pytest_tests.helpers.node_management import storage_node_healthcheck
|
||||||
from pytest_tests.helpers.storage_policy import get_nodes_with_object
|
from pytest_tests.helpers.storage_policy import get_nodes_with_object
|
||||||
|
from pytest_tests.helpers.test_control import retry
|
||||||
|
|
||||||
logger = logging.getLogger("NeoLogger")
|
logger = logging.getLogger("NeoLogger")
|
||||||
|
|
||||||
|
@ -18,8 +20,9 @@ def wait_object_replication(
|
||||||
expected_copies: int,
|
expected_copies: int,
|
||||||
shell: Shell,
|
shell: Shell,
|
||||||
nodes: list[StorageNode],
|
nodes: list[StorageNode],
|
||||||
|
sleep_interval: int = 15,
|
||||||
|
attempts: int = 20,
|
||||||
) -> list[StorageNode]:
|
) -> list[StorageNode]:
|
||||||
sleep_interval, attempts = 15, 20
|
|
||||||
nodes_with_object = []
|
nodes_with_object = []
|
||||||
for _ in range(attempts):
|
for _ in range(attempts):
|
||||||
nodes_with_object = get_nodes_with_object(cid, oid, shell=shell, nodes=nodes)
|
nodes_with_object = get_nodes_with_object(cid, oid, shell=shell, nodes=nodes)
|
||||||
|
@ -53,3 +56,20 @@ def is_all_storage_nodes_returned(cluster: Cluster) -> bool:
|
||||||
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
|
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@allure.step("Ping node")
|
||||||
|
def ping_host(shell: Shell, host: Host):
|
||||||
|
options = CommandOptions(check=False)
|
||||||
|
return shell.exec(f"ping {host.config.address} -c 1", options).return_code
|
||||||
|
|
||||||
|
|
||||||
|
@retry(max_attempts=60, sleep_interval=5, expected_result=1)
|
||||||
|
@allure.step("Waiting for host of {node} to go offline")
|
||||||
|
def wait_for_host_offline(shell: Shell, node: StorageNode):
|
||||||
|
try:
|
||||||
|
# TODO: Quick solution for now, should be replaced by lib interactions
|
||||||
|
return ping_host(shell, node.host)
|
||||||
|
except Exception as err:
|
||||||
|
logger.warning(f"Host ping fails with error {err}")
|
||||||
|
return 0
|
||||||
|
|
|
@ -6,7 +6,7 @@ import uuid
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
import allure
|
import allure
|
||||||
from frostfs_testlib.cli import FrostfsCli
|
from frostfs_testlib.cli import FrostfsCli, NeoGo
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.shell import Shell
|
||||||
from frostfs_testlib.utils import json_utils
|
from frostfs_testlib.utils import json_utils
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ from pytest_tests.resources.common import (
|
||||||
ASSETS_DIR,
|
ASSETS_DIR,
|
||||||
CLI_DEFAULT_TIMEOUT,
|
CLI_DEFAULT_TIMEOUT,
|
||||||
FROSTFS_CLI_EXEC,
|
FROSTFS_CLI_EXEC,
|
||||||
|
NEOGO_EXECUTABLE,
|
||||||
WALLET_CONFIG,
|
WALLET_CONFIG,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -670,3 +671,34 @@ def head_object(
|
||||||
|
|
||||||
logger.info("decoding simple header")
|
logger.info("decoding simple header")
|
||||||
return json_utils.decode_simple_header(decoded)
|
return json_utils.decode_simple_header(decoded)
|
||||||
|
|
||||||
|
|
||||||
|
@allure.step("Run neo-go query height")
|
||||||
|
def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
|
||||||
|
"""
|
||||||
|
Run neo-go query height command
|
||||||
|
|
||||||
|
Args:
|
||||||
|
shell: executor for cli command
|
||||||
|
endpoint: endpoint to execute
|
||||||
|
Returns:
|
||||||
|
dict->
|
||||||
|
Latest block: {value}
|
||||||
|
Validated state: {value}
|
||||||
|
|
||||||
|
"""
|
||||||
|
neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
|
||||||
|
output = neogo.query.height(rpc_endpoint=endpoint).stdout
|
||||||
|
try:
|
||||||
|
# taking first line from command's output contain the latest block in blockchain
|
||||||
|
first_line = output.split("\n")[0]
|
||||||
|
except Exception:
|
||||||
|
logger.error(f"Got empty output (neo-go query height): {output}")
|
||||||
|
latest_block = first_line.split(":")
|
||||||
|
# taking second line from command's output contain wallet key
|
||||||
|
second_line = output.split("\n")[1]
|
||||||
|
validated_state = second_line.split(":")
|
||||||
|
return {
|
||||||
|
latest_block[0].replace(":", ""): int(latest_block[1]),
|
||||||
|
validated_state[0].replace(":", ""): int(validated_state[1]),
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from time import sleep, time
|
from time import sleep, time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from _pytest.outcomes import Failed
|
from _pytest.outcomes import Failed
|
||||||
from pytest import fail
|
from pytest import fail
|
||||||
|
@ -78,3 +79,39 @@ def wait_for_success(max_wait_time: int = 60, interval: int = 1):
|
||||||
return impl
|
return impl
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def retry(max_attempts: int, sleep_interval: int = 1, expected_result: Any = None):
|
||||||
|
"""
|
||||||
|
Decorator to wait for some conditions/functions to pass successfully.
|
||||||
|
This is useful if you don't know exact time when something should pass successfully and do not
|
||||||
|
want to use sleep(X) with too big X.
|
||||||
|
|
||||||
|
Be careful though, wrapped function should only check the state of something, not change it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def wrapper(func):
|
||||||
|
@wraps(func)
|
||||||
|
def impl(*a, **kw):
|
||||||
|
last_exception = None
|
||||||
|
for _ in range(max_attempts):
|
||||||
|
try:
|
||||||
|
actual_result = func(*a, **kw)
|
||||||
|
if expected_result is not None:
|
||||||
|
assert expected_result == actual_result
|
||||||
|
return actual_result
|
||||||
|
except Exception as ex:
|
||||||
|
logger.debug(ex)
|
||||||
|
last_exception = ex
|
||||||
|
sleep(sleep_interval)
|
||||||
|
except Failed as ex:
|
||||||
|
logger.debug(ex)
|
||||||
|
last_exception = ex
|
||||||
|
sleep(sleep_interval)
|
||||||
|
|
||||||
|
# timeout exceeded with no success, raise last_exception
|
||||||
|
raise last_exception
|
||||||
|
|
||||||
|
return impl
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
|
@ -3,7 +3,7 @@ import pytest
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.shell import Shell
|
||||||
|
|
||||||
from pytest_tests.helpers import epoch
|
from pytest_tests.helpers import epoch
|
||||||
from pytest_tests.helpers.cluster import Cluster
|
from pytest_tests.helpers.cluster import Cluster, StorageNode
|
||||||
|
|
||||||
|
|
||||||
# To skip adding every mandatory singleton dependency to EACH test function
|
# To skip adding every mandatory singleton dependency to EACH test function
|
||||||
|
@ -18,12 +18,12 @@ class ClusterTestBase:
|
||||||
yield
|
yield
|
||||||
|
|
||||||
@allure.title("Tick {epochs_to_tick} epochs")
|
@allure.title("Tick {epochs_to_tick} epochs")
|
||||||
def tick_epochs(self, epochs_to_tick: int):
|
def tick_epochs(self, epochs_to_tick: int, alive_node: StorageNode):
|
||||||
for _ in range(epochs_to_tick):
|
for _ in range(epochs_to_tick):
|
||||||
self.tick_epoch()
|
self.tick_epoch(alive_node)
|
||||||
|
|
||||||
def tick_epoch(self):
|
def tick_epoch(self, alive_node: StorageNode):
|
||||||
epoch.tick_epoch(self.shell, self.cluster)
|
epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node)
|
||||||
|
|
||||||
def wait_for_epochs_align(self):
|
def wait_for_epochs_align(self):
|
||||||
epoch.wait_for_epochs_align(self.shell, self.cluster)
|
epoch.wait_for_epochs_align(self.shell, self.cluster)
|
||||||
|
|
248
pytest_tests/testsuites/failovers/test_failover_server.py
Normal file
248
pytest_tests/testsuites/failovers/test_failover_server.py
Normal file
|
@ -0,0 +1,248 @@
|
||||||
|
import logging
|
||||||
|
import os.path
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
|
import allure
|
||||||
|
import pytest
|
||||||
|
from frostfs_testlib.resources.common import PUBLIC_ACL
|
||||||
|
from frostfs_testlib.utils import datetime_utils
|
||||||
|
from pytest import FixtureRequest
|
||||||
|
|
||||||
|
from pytest_tests.helpers.cluster import ClusterNode, StorageNode
|
||||||
|
from pytest_tests.helpers.container import StorageContainer, StorageContainerInfo, create_container
|
||||||
|
from pytest_tests.helpers.failover_utils import wait_for_host_offline, wait_object_replication
|
||||||
|
from pytest_tests.helpers.file_helper import get_file_hash
|
||||||
|
from pytest_tests.helpers.frostfs_verbs import get_object
|
||||||
|
from pytest_tests.helpers.node_management import (
|
||||||
|
check_node_in_map,
|
||||||
|
check_node_not_in_map,
|
||||||
|
wait_for_node_to_be_ready,
|
||||||
|
)
|
||||||
|
from pytest_tests.helpers.storage_object_info import StorageObjectInfo
|
||||||
|
from pytest_tests.helpers.test_control import wait_for_success
|
||||||
|
from pytest_tests.helpers.wallet import WalletFile
|
||||||
|
from pytest_tests.resources.common import MORPH_BLOCK_TIME
|
||||||
|
from pytest_tests.steps.cluster_test_base import ClusterTestBase
|
||||||
|
|
||||||
|
logger = logging.getLogger("NeoLogger")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.failover
|
||||||
|
@pytest.mark.failover_server
|
||||||
|
class TestFailoverServer(ClusterTestBase):
|
||||||
|
@wait_for_success(max_wait_time=120, interval=1)
|
||||||
|
def wait_node_not_in_map(self, *args, **kwargs):
|
||||||
|
check_node_not_in_map(*args, **kwargs)
|
||||||
|
|
||||||
|
@wait_for_success(max_wait_time=120, interval=1)
|
||||||
|
def wait_node_in_map(self, *args, **kwargs):
|
||||||
|
check_node_in_map(*args, **kwargs)
|
||||||
|
|
||||||
|
@allure.step("Create {count_containers} containers and {count_files} objects")
|
||||||
|
@pytest.fixture
|
||||||
|
def containers(
|
||||||
|
self,
|
||||||
|
request: FixtureRequest,
|
||||||
|
default_wallet: str,
|
||||||
|
) -> list[StorageContainer]:
|
||||||
|
|
||||||
|
placement_rule = "REP 2 CBF 2 SELECT 2 FROM * AS X"
|
||||||
|
|
||||||
|
containers = []
|
||||||
|
|
||||||
|
for _ in range(request.param):
|
||||||
|
cont_id = create_container(
|
||||||
|
default_wallet,
|
||||||
|
shell=self.shell,
|
||||||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||||||
|
rule=placement_rule,
|
||||||
|
basic_acl=PUBLIC_ACL,
|
||||||
|
)
|
||||||
|
wallet = WalletFile(path=default_wallet)
|
||||||
|
storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
|
||||||
|
containers.append(
|
||||||
|
StorageContainer(
|
||||||
|
storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return containers
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
params=[
|
||||||
|
pytest.lazy_fixture("simple_object_size"),
|
||||||
|
pytest.lazy_fixture("complex_object_size"),
|
||||||
|
],
|
||||||
|
ids=["simple object", "complex object"],
|
||||||
|
# Scope session to upload/delete each files set only once
|
||||||
|
scope="class",
|
||||||
|
)
|
||||||
|
def object_size(self, request):
|
||||||
|
return request.param
|
||||||
|
|
||||||
|
@allure.step("Create object and delete after test")
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
abereziny marked this conversation as resolved
Outdated
|
|||||||
|
def storage_objects(
|
||||||
|
self, request: FixtureRequest, containers: list[StorageContainer], object_size: int
|
||||||
|
) -> StorageObjectInfo:
|
||||||
|
object_list = []
|
||||||
|
for cont in containers:
|
||||||
|
for _ in range(request.param):
|
||||||
|
object_list.append(cont.generate_object(size=object_size))
|
||||||
|
|
||||||
|
yield object_list
|
||||||
|
|
||||||
|
for storage_object in object_list:
|
||||||
|
os.remove(storage_object.file_path)
|
||||||
|
|
||||||
abereziny marked this conversation as resolved
Outdated
abereziny
commented
Select random node to stop and start it after test Select random node to stop and start it after test
d.zayakin
commented
Corrected Corrected
|
|||||||
|
@allure.step("Select random node to stop and start it after test")
|
||||||
|
@pytest.fixture
|
||||||
|
def node_to_stop(self) -> ClusterNode:
|
||||||
|
node = random.choice(self.cluster.cluster_nodes)
|
||||||
|
yield node
|
||||||
|
with allure.step(f"start {node.storage_node}"):
|
||||||
|
node.host.start_host()
|
||||||
|
with allure.step(f"Waiting status ready for node {node.storage_node}"):
|
||||||
|
wait_for_node_to_be_ready(node.storage_node)
|
||||||
|
|
||||||
|
@allure.step("Upload object with nodes and compare")
|
||||||
|
def get_corrupted_objects_list(
|
||||||
|
self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
|
||||||
|
) -> list[StorageObjectInfo]:
|
||||||
|
corrupted_objects = []
|
||||||
|
for node in nodes:
|
||||||
|
for storage_object in storage_objects:
|
||||||
|
got_file_path = get_object(
|
||||||
|
storage_object.wallet_file_path,
|
||||||
|
storage_object.cid,
|
||||||
|
storage_object.oid,
|
||||||
|
endpoint=node.get_rpc_endpoint(),
|
||||||
|
shell=self.shell,
|
||||||
|
timeout="60s",
|
||||||
|
)
|
||||||
|
if storage_object.file_hash != get_file_hash(got_file_path):
|
||||||
|
corrupted_objects.append(storage_object)
|
||||||
|
os.remove(got_file_path)
|
||||||
|
|
||||||
|
return corrupted_objects
|
||||||
|
|
||||||
|
def check_objects_replication(
|
||||||
|
self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode]
|
||||||
|
) -> None:
|
||||||
|
for storage_object in storage_objects:
|
||||||
|
wait_object_replication(
|
||||||
|
storage_object.cid,
|
||||||
|
storage_object.oid,
|
||||||
|
2,
|
||||||
|
shell=self.shell,
|
||||||
|
nodes=storage_nodes,
|
||||||
|
sleep_interval=45,
|
||||||
|
attempts=60,
|
||||||
|
)
|
||||||
|
|
||||||
|
@allure.title("Full shutdown node")
|
||||||
|
@pytest.mark.parametrize("containers, storage_objects", [(5, 20)], indirect=True)
|
||||||
|
def test_complete_node_shutdown(
|
||||||
|
self,
|
||||||
|
containers: list[StorageContainer],
|
||||||
|
storage_objects: list[StorageObjectInfo],
|
||||||
|
default_wallet: str,
|
||||||
|
node_to_stop: ClusterNode,
|
||||||
|
):
|
||||||
|
with allure.step("Checking that the objects are loader according to the policy"):
|
||||||
|
self.check_objects_replication(storage_objects, self.cluster.storage_nodes)
|
||||||
|
|
||||||
|
with allure.step(f"Remove {node_to_stop} from the list of nodes"):
|
||||||
|
alive_nodes = list(set(self.cluster.cluster_nodes) - {node_to_stop})
|
||||||
|
|
||||||
|
storage_nodes = [cluster.storage_node for cluster in alive_nodes]
|
||||||
|
|
||||||
|
with allure.step("Tick epoch"):
|
||||||
|
self.tick_epochs(1, storage_nodes[0])
|
||||||
|
|
||||||
|
with allure.step("Wait 2 block time"):
|
||||||
|
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
|
||||||
|
|
||||||
|
with allure.step(f"Stop {node_to_stop} node"):
|
||||||
|
node_to_stop.host.stop_host(mode="hard")
|
||||||
|
|
||||||
|
with allure.step(f"Check if the node {node_to_stop.storage_node} has stopped"):
|
||||||
|
wait_for_host_offline(self.shell, node_to_stop.storage_node)
|
||||||
|
|
||||||
|
with allure.step("Wait for objects replication"):
|
||||||
|
self.check_objects_replication(storage_objects, storage_nodes)
|
||||||
|
|
||||||
|
with allure.step("Verify that there are no corrupted objects"):
|
||||||
|
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||||
|
|
||||||
|
assert not corrupted_objects_list
|
||||||
|
|
||||||
|
with allure.step(f"check {node_to_stop.storage_node} in map"):
|
||||||
|
self.wait_node_in_map(
|
||||||
|
node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]
|
||||||
|
)
|
||||||
|
|
||||||
|
count_tick_epoch = alive_nodes[0].ir_node.get_netmap_cleaner_threshold() + 1
|
||||||
|
with allure.step(f"Tick {count_tick_epoch} epoch, in {storage_nodes[0]} node"):
|
||||||
|
self.tick_epochs(count_tick_epoch, storage_nodes[0])
|
||||||
|
|
||||||
|
with allure.step("Wait 2 block time"):
|
||||||
|
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
|
||||||
|
|
||||||
|
with allure.step(f"Check {node_to_stop} in not map"):
|
||||||
|
self.wait_node_not_in_map(
|
||||||
|
node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]
|
||||||
|
)
|
||||||
|
|
||||||
|
with allure.step(
|
||||||
|
f"Verify that there are no corrupted objects after {count_tick_epoch} epoch"
|
||||||
|
):
|
||||||
|
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||||
|
assert not corrupted_objects_list
|
||||||
|
|
||||||
|
@allure.title("Temporarily disable a node")
|
||||||
|
@pytest.mark.parametrize("containers, storage_objects", [(1, 2)], indirect=True)
|
||||||
|
def test_temporarily_disable_a_node(
|
||||||
|
self,
|
||||||
|
containers: list[StorageContainer],
|
||||||
|
storage_objects: list[StorageObjectInfo],
|
||||||
|
default_wallet: str,
|
||||||
|
node_to_stop,
|
||||||
|
):
|
||||||
|
with allure.step("Checking that the objects are loader according to the policy"):
|
||||||
|
self.check_objects_replication(storage_objects, self.cluster.storage_nodes)
|
||||||
|
|
||||||
|
with allure.step(f"Remove {node_to_stop} from the list of nodes"):
|
||||||
abereziny marked this conversation as resolved
Outdated
abereziny
commented
suggest to rename it as suggest to rename it as `alive_nodes` or smth as in the previous test case
d.zayakin
commented
There is rather a wrong naming initially. Corrected There is rather a wrong naming initially. Corrected
|
|||||||
|
storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node})
|
||||||
|
|
||||||
|
with allure.step("Tick epoch"):
|
||||||
|
self.tick_epochs(1, storage_nodes[0])
|
||||||
|
|
||||||
|
with allure.step("Wait 2 block time"):
|
||||||
|
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
|
||||||
|
|
||||||
|
with allure.step(f"Stop {node_to_stop.storage_node} node"):
|
||||||
|
node_to_stop.host.stop_host(mode="hard")
|
||||||
|
|
||||||
|
with allure.step(f"Check if the node {node_to_stop} has stopped"):
|
||||||
|
wait_for_host_offline(self.shell, node_to_stop.storage_node)
|
||||||
|
|
||||||
|
with allure.step("Wait for objects replication"):
|
||||||
|
self.check_objects_replication(storage_objects, storage_nodes)
|
||||||
|
|
||||||
|
with allure.step("Verify that there are no corrupted objects"):
|
||||||
|
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||||
|
assert not corrupted_objects_list
|
||||||
|
|
||||||
|
with allure.step(f"Check {node_to_stop} in map"):
|
||||||
|
self.wait_node_in_map(
|
||||||
|
node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0]
|
||||||
|
)
|
||||||
|
|
||||||
|
with allure.step(f"Start {node_to_stop}"):
|
||||||
|
node_to_stop.host.start_host()
|
||||||
|
|
||||||
|
with allure.step("Verify that there are no corrupted objects"):
|
||||||
|
corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
|
||||||
|
assert not corrupted_objects_list
|
Loading…
Reference in a new issue
This fixture also deserves an allure step :)
Corrected