[#154] Add new test
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
parent fbefa422e8
commit 269caf8e03
6 changed files with 833 additions and 677 deletions
@@ -53,7 +53,8 @@ def pytest_collection_modifyitems(items: list[pytest.Item]):
     def priority(item: pytest.Item) -> int:
         is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
         is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
-        return is_node_mgmt_test + is_logs_check_test
+        is_system_time_test = 10 if item.get_closest_marker("system_time") else 0
+        return is_node_mgmt_test + is_logs_check_test + is_system_time_test
 
     items.sort(key=lambda item: priority(item))
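Since items.sort is ascending, the weights above keep the marker groups from interleaving: unmarked tests (0) run first, then node_mgmt (1), then system_time (10), with logs_after_session (100) last. A minimal standalone sketch of the same ordering logic, using plain dicts instead of pytest.Item (the test names are made up for illustration):

    # Sketch only - mirrors the priority() weights from the hunk above.
    markers_per_test = {
        "test_logs": {"logs_after_session"},
        "test_plain": set(),
        "test_clock": {"system_time"},
        "test_nodes": {"node_mgmt"},
    }

    def priority(markers: set) -> int:
        return (
            (1 if "node_mgmt" in markers else 0)
            + (10 if "system_time" in markers else 0)
            + (100 if "logs_after_session" in markers else 0)
        )

    order = sorted(markers_per_test, key=lambda name: priority(markers_per_test[name]))
    print(order)  # ['test_plain', 'test_nodes', 'test_clock', 'test_logs']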
@@ -1,3 +1,4 @@
+import os
 import random
 from datetime import datetime
 
@@ -5,6 +6,8 @@ import allure
 import pytest
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode
 from frostfs_testlib.storage.controllers import ShardsWatcher
+from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
+from frostfs_testlib.utils.file_utils import generate_file
 
 
 @pytest.fixture()
@@ -27,3 +30,11 @@ def shards_watcher(node_under_test: ClusterNode) -> ShardsWatcher:
 def test_start_time() -> datetime:
     start_time = datetime.utcnow()
     return start_time
+
+
+@pytest.fixture()
+@allure.title("Generate simple size file")
+def simple_file(simple_object_size: ObjectSize) -> str:
+    path_file = generate_file(size=simple_object_size.value)
+    yield path_file
+    os.remove(path_file)
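The new simple_file fixture yields a path and deletes the file after the test, so callers never have to clean up themselves. A hypothetical usage (the test body is illustrative, not from this commit):

    import os

    def test_put_simple_file(simple_file: str):
        # simple_file already points at a generated file of simple_object_size bytes.
        assert os.path.exists(simple_file)
        # ... put simple_file into a container here ...
        # After the test returns, the fixture resumes past `yield` and removes the file.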
@@ -1,12 +1,14 @@
 import logging
 import os.path
+import random
+import time
 
 import allure
 import pytest
 from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
 from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
 from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
-from frostfs_testlib.steps.cli.object import get_object
+from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
 from frostfs_testlib.steps.node_management import check_node_in_map, check_node_not_in_map
 from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
 from frostfs_testlib.storage.controllers import ClusterStateController
@@ -15,6 +17,7 @@ from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
+from frostfs_testlib.testing.test_control import wait_for_success
 from frostfs_testlib.utils import datetime_utils
 from frostfs_testlib.utils.failover_utils import wait_object_replication
 from frostfs_testlib.utils.file_utils import get_file_hash
 from pytest import FixtureRequest
@@ -61,6 +64,22 @@ class TestFailoverServer(ClusterTestBase):
 
         return containers
 
+    @allure.step("Create container")
+    @pytest.fixture()
+    def container(self, default_wallet: str) -> StorageContainer:
+        select = len(self.cluster.cluster_nodes)
+        placement_rule = f"REP {select - 1} CBF 1 SELECT {select} FROM *"
+        cont_id = create_container(
+            default_wallet,
+            shell=self.shell,
+            endpoint=self.cluster.default_rpc_endpoint,
+            rule=placement_rule,
+            basic_acl=PUBLIC_ACL,
+        )
+        wallet = WalletInfo(path=default_wallet)
+        storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
+        return StorageContainer(storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster)
+
     @allure.step("Create object and delete after test")
     @pytest.fixture(scope="class")
     def storage_objects(
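For context, the fixture's placement rule scales with cluster size. Assuming a hypothetical four-node cluster, the f-string expands as follows:

    # Sketch: len(self.cluster.cluster_nodes) == 4 is an assumption for illustration.
    select = 4
    placement_rule = f"REP {select - 1} CBF 1 SELECT {select} FROM *"
    print(placement_rule)  # REP 3 CBF 1 SELECT 4 FROM *
    # Three replicas selected from all four nodes, so losing one replica holder
    # still leaves copies to serve GET - which is what the tests below exploit.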
@@ -96,20 +115,25 @@
         self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
     ) -> list[StorageObjectInfo]:
         corrupted_objects = []
+        errors_get = []
         for node in nodes:
             for storage_object in storage_objects:
-                got_file_path = get_object(
-                    storage_object.wallet_file_path,
-                    storage_object.cid,
-                    storage_object.oid,
-                    endpoint=node.get_rpc_endpoint(),
-                    shell=self.shell,
-                    timeout="60s",
-                )
-                if storage_object.file_hash != get_file_hash(got_file_path):
-                    corrupted_objects.append(storage_object)
-                os.remove(got_file_path)
+                try:
+                    got_file_path = get_object(
+                        storage_object.wallet_file_path,
+                        storage_object.cid,
+                        storage_object.oid,
+                        endpoint=node.get_rpc_endpoint(),
+                        shell=self.shell,
+                        timeout="60s",
+                    )
+                    if storage_object.file_hash != get_file_hash(got_file_path):
+                        corrupted_objects.append(storage_object)
+                    os.remove(got_file_path)
+                except RuntimeError:
+                    errors_get.append(storage_object.oid)
 
+        assert len(errors_get) == 0, f"Get failed - {errors_get}"
         return corrupted_objects
 
     def check_objects_replication(
@@ -126,6 +150,26 @@
             attempts=60,
         )
 
+    @pytest.fixture()
+    def object_and_nodes(
+        self, simple_object_size: ObjectSize, container: StorageContainer, default_wallet: str
+    ) -> tuple[StorageObjectInfo, list[ClusterNode]]:
+        object_info = container.generate_object(simple_object_size.value)
+        object_nodes = get_object_nodes(
+            cluster=self.cluster,
+            wallet=default_wallet,
+            cid=object_info.cid,
+            oid=object_info.oid,
+            shell=self.shell,
+            endpoint=self.cluster.default_rpc_endpoint,
+        )
+        return object_info, object_nodes
+
+    @pytest.fixture()
+    def up_stop_nodes(self, cluster_state_controller: ClusterStateController):
+        yield
+        cluster_state_controller.start_stopped_hosts()
+
     @allure.title("Full shutdown node")
     @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
     def test_complete_node_shutdown(
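up_stop_nodes is a pure-teardown fixture: the bare yield means there is no setup, and everything after it runs during teardown whether the test passed or failed, which is what guarantees the stopped hosts come back. A self-contained sketch of the pattern (names are illustrative):

    import pytest

    @pytest.fixture()
    def restore_hosts():
        yield  # no setup work; hand control to the test immediately
        print("teardown: restart whatever the test stopped")  # runs even on failure

    def test_destructive(restore_hosts):
        ...  # stop nodes, break things; the fixture restores them afterwards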
@@ -157,7 +201,7 @@
         count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4
 
         with allure.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
-            self.tick_epochs(count_tick_epoch, wait_block=2)
+            self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)
 
         with allure.step(f"Check {node_to_stop} is not in map"):
             self.wait_node_not_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])
@@ -195,3 +239,111 @@
         with allure.step("Verify that there are no corrupted objects"):
             corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
             assert not corrupted_objects_list
+
+    @allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
+    def test_not_enough_nodes_in_container_rep_3(
+        self,
+        container: list[StorageContainer],
+        object_and_nodes: tuple[StorageObjectInfo, list[ClusterNode]],
+        default_wallet: str,
+        cluster_state_controller: ClusterStateController,
+        simple_file: str,
+        up_stop_nodes: None,
+    ):
+        object_info, object_nodes = object_and_nodes
+        node_not_object = list(set(self.cluster.cluster_nodes) - set(object_nodes))[0]
+
+        with allure.step("Stop all nodes with object, except 1"):
+            for cluster_node in object_nodes[1:]:
+                cluster_state_controller.stop_node_host(node=cluster_node, mode="hard")
+
+        with allure.step("Get object"):
+            with allure.step(f"Get operation to {node_not_object} where it does not exist, expect success"):
+                get_object(
+                    wallet=default_wallet,
+                    cid=object_info.cid,
+                    oid=object_info.oid,
+                    shell=self.shell,
+                    endpoint=node_not_object.storage_node.get_rpc_endpoint(),
+                )
+            with allure.step(f"Get operation to {object_nodes[0]} with object, expect success"):
+                get_object(
+                    wallet=default_wallet,
+                    cid=object_info.cid,
+                    oid=object_info.oid,
+                    shell=self.shell,
+                    endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
+                )
+
+        with allure.step("Put operation to node without object, expect error"):
+            with pytest.raises(RuntimeError):
+                put_object(
+                    wallet=default_wallet,
+                    path=simple_file,
+                    cid=object_info.cid,
+                    shell=self.shell,
+                    endpoint=node_not_object.storage_node.get_rpc_endpoint(),
+                )
+
+    @allure.title("Not enough nodes in the container with policy - 'REP 2 CBF 2 SELECT 4 FROM *'")
+    def test_not_enough_nodes_in_container_rep_2(
+        self,
+        default_wallet: str,
+        cluster_state_controller: ClusterStateController,
+        simple_file: str,
+        up_stop_nodes: None,
+    ):
+        with allure.step("Create a container with a full network map"):
+            select = len(self.cluster.cluster_nodes)
+            placement_rule = f"REP {select - 2} IN X CBF 2 SELECT {select} FROM * AS X"
+            cid_1 = create_container(
+                default_wallet,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                rule=placement_rule,
+                basic_acl=PUBLIC_ACL,
+            )
+        with allure.step("Put object"):
+            oid = put_object(
+                wallet=default_wallet,
+                path=simple_file,
+                cid=cid_1,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+            )
+        with allure.step("Search nodes with object"):
+            object_nodes = get_object_nodes(
+                cluster=self.cluster,
+                wallet=default_wallet,
+                cid=cid_1,
+                oid=oid,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+            )
+        with allure.step("Turn off random node with object"):
+            cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")
+        with allure.step("Check PUT operation"):
+            oid_2 = put_object(
+                wallet=default_wallet,
+                path=simple_file,
+                cid=cid_1,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+            )
+        with allure.step("Check GET operation"):
+            get_file = get_object(
+                wallet=default_wallet,
+                cid=cid_1,
+                oid=oid_2,
+                shell=self.shell,
+                endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
+            )
+            os.remove(get_file)
+        with allure.step("Check container creation"):
+            _ = create_container(
+                default_wallet,
+                shell=self.shell,
+                endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
+                rule=placement_rule,
+                basic_acl=PUBLIC_ACL,
+            )
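Both new tests pick a node without the object via list(set(cluster_nodes) - set(object_nodes))[0]. Note that set difference carries no defined order in Python, so [0] is an arbitrary, run-to-run unstable pick; that is fine here because any node outside the object's placement works. A quick illustration:

    cluster_nodes = ["node1", "node2", "node3", "node4"]
    object_nodes = ["node2", "node4"]
    candidates = list(set(cluster_nodes) - set(object_nodes))
    print(candidates)  # e.g. ['node3', 'node1'] - ordering may differ between runs
    node_not_object = candidates[0]  # acceptable when any non-object node will do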
@@ -1,30 +1,15 @@
 import logging
 import os
 import random
-from datetime import datetime
 from time import sleep
 
 import allure
 import pytest
-from frostfs_testlib.cli import FrostfsCli
-from frostfs_testlib.cli.netmap_parser import NetmapParser
-from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
-from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
+from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
 from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
 from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
-from frostfs_testlib.steps.cli.container import (
-    StorageContainer,
-    StorageContainerInfo,
-    create_container,
-    search_nodes_with_container,
-)
-from frostfs_testlib.steps.cli.object import (
-    delete_object,
-    get_object,
-    put_object,
-    put_object_to_random_node,
-    search_object,
-)
+from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
+from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
 from frostfs_testlib.steps.node_management import (
     check_node_in_map,
     check_node_not_in_map,
@@ -37,7 +22,7 @@ from frostfs_testlib.steps.s3 import s3_helper
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
 from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
 from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
-from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode, StorageObjectInfo
+from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.testing.test_control import expect_not_raises
@@ -727,325 +712,3 @@ class TestStorageDataLoss(ClusterTestBase):
 
         with allure.step("Delete bucket"):
             s3_client.delete_bucket(bucket)
-
-
-@pytest.mark.maintenance
-class TestMaintenanceMode(ClusterTestBase):
-    change_node: ClusterNode = None
-    [... the remaining ~315 removed lines are the TestMaintenanceMode class body,
-     moved verbatim to pytest_tests/testsuites/management/test_node_management.py below ...]
pytest_tests/testsuites/management/test_node_management.py (new file, 651 lines)
@@ -0,0 +1,651 @@
+import logging
+import os
+import random
+from time import sleep
+from typing import Optional, Tuple
+
+import allure
+import pytest
+from frostfs_testlib.cli import FrostfsCli
+from frostfs_testlib.cli.netmap_parser import NetmapParser
+from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
+from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
+from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
+from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
+from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
+from frostfs_testlib.steps.cli.object import (
+    delete_object,
+    get_object,
+    get_object_from_random_node,
+    head_object,
+    put_object,
+    put_object_to_random_node,
+    search_object,
+)
+from frostfs_testlib.steps.node_management import (
+    check_node_in_map,
+    delete_node_data,
+    drop_object,
+    exclude_node_from_network_map,
+    get_locode_from_random_node,
+    include_node_to_network_map,
+    node_shard_list,
+    node_shard_set_mode,
+    storage_node_set_status,
+    wait_for_node_to_be_ready,
+)
+from frostfs_testlib.steps.storage_policy import get_nodes_with_object
+from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
+from frostfs_testlib.storage.controllers import ClusterStateController
+from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
+from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode
+from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
+from frostfs_testlib.utils import datetime_utils, string_utils
+from frostfs_testlib.utils.failover_utils import wait_object_replication
+from frostfs_testlib.utils.file_utils import generate_file
+
+from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes
+
+logger = logging.getLogger("NeoLogger")
+check_nodes: list[StorageNode] = []
+
+
+@allure.title("Add one node to cluster")
+@pytest.mark.add_nodes
+@pytest.mark.node_mgmt
+class TestNodeManagement(ClusterTestBase):
+    @pytest.fixture
+    @allure.title("Create container and pick the node with data")
+    def create_container_and_pick_node(
+        self, default_wallet: str, simple_object_size: ObjectSize
+    ) -> Tuple[str, StorageNode]:
+        file_path = generate_file(simple_object_size.value)
+        placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
+        endpoint = self.cluster.default_rpc_endpoint
+
+        cid = create_container(
+            default_wallet,
+            shell=self.shell,
+            endpoint=endpoint,
+            rule=placement_rule,
+            basic_acl=PUBLIC_ACL,
+        )
+        oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
+
+        nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
+        assert len(nodes) == 1
+        node = nodes[0]
+
+        yield cid, node
+
+        shards = node_shard_list(node)
+        assert shards
+
+        for shard in shards:
+            node_shard_set_mode(node, shard, "read-write")
+
+        node_shard_list(node)
+
+    @allure.step("Tick epoch with retries")
+    def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: Optional[int] = None):
+        for attempt in range(attempts):
+            try:
+                self.tick_epoch(wait_block=wait_block)
+            except RuntimeError:
+                sleep(timeout)
+                if attempt >= attempts - 1:
+                    raise
+                continue
+            return
+
+    @pytest.fixture
+    def after_run_start_all_nodes(self):
+        yield
+        self.return_nodes()
+
+    @pytest.fixture
+    def return_nodes_after_test_run(self):
+        yield
+        self.return_nodes()
+
+    @allure.step("Return node to cluster")
+    def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
+        for node in list(check_nodes):
+            with allure.step(f"Start node {node}"):
+                node.start_service()
+            with allure.step(f"Waiting status ready for node {node}"):
+                wait_for_node_to_be_ready(node)
+
+            # We need to wait for the node to establish notifications from the morph chain;
+            # otherwise it will hang when we try to set its status.
+            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
+
+            with allure.step(f"Move node {node} to online state"):
+                storage_node_set_status(node, status="online", retries=2)
+
+            check_nodes.remove(node)
+            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
+            self.tick_epoch_with_retries(3, wait_block=2)
+            check_node_in_map(node, shell=self.shell, alive_node=alive_node)
+
+    @allure.title("Add one node to cluster")
+    @pytest.mark.add_nodes
+    def test_add_nodes(
+        self,
+        default_wallet: str,
+        simple_object_size: ObjectSize,
+        return_nodes_after_test_run,
+    ):
+        """
+        This test removes one node from the cluster and then adds it back. It exercises the basic
+        control operations on storage nodes (healthcheck, netmap-snapshot, set-status).
+        """
+        wallet = default_wallet
+        placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
+        placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
+        source_file_path = generate_file(simple_object_size.value)
+
+        storage_nodes = self.cluster.storage_nodes
+        random_node = random.choice(storage_nodes[1:])
+        alive_node = random.choice(
+            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
+        )
+
+        check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)
+
+        # Add node to recovery list before messing with it
+        check_nodes.append(random_node)
+        exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
+        delete_node_data(random_node)
+
+        cid = create_container(
+            wallet,
+            rule=placement_rule_3,
+            basic_acl=PUBLIC_ACL,
+            shell=self.shell,
+            endpoint=alive_node.get_rpc_endpoint(),
+        )
+        oid = put_object(
+            wallet,
+            source_file_path,
+            cid,
+            shell=self.shell,
+            endpoint=alive_node.get_rpc_endpoint(),
+        )
+        wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
+
+        self.return_nodes(alive_node)
+
+        with allure.step("Check data could be replicated to new node"):
+            random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
+            # Add node to recovery list before messing with it
+            check_nodes.append(random_node)
+            exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
+
+            wait_object_replication(
+                cid,
+                oid,
+                3,
+                shell=self.shell,
+                nodes=list(set(storage_nodes) - {random_node}),
+            )
+            include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
+            wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
+
+        with allure.step("Check container could be created with new node"):
+            cid = create_container(
+                wallet,
+                rule=placement_rule_4,
+                basic_acl=PUBLIC_ACL,
+                shell=self.shell,
+                endpoint=alive_node.get_rpc_endpoint(),
+            )
+            oid = put_object(
+                wallet,
+                source_file_path,
+                cid,
+                shell=self.shell,
+                endpoint=alive_node.get_rpc_endpoint(),
+            )
+            wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)
+
+    @pytest.mark.node_mgmt
+    @allure.title("Drop object using control command")
+    def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
+        """
+        This test checks that an object can be dropped using the `frostfs-cli control drop-objects` command.
+        """
+        wallet = default_wallet
+        endpoint = self.cluster.default_rpc_endpoint
+        file_path_simple = generate_file(simple_object_size.value)
+        file_path_complex = generate_file(complex_object_size.value)
+
+        locode = get_locode_from_random_node(self.cluster)
+        rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
+        cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
+        oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
+        oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)
+
+        for oid in (oid_simple, oid_complex):
+            get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
+            head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
+
+        nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
+        random_node = random.choice(nodes_with_object)
+
+        for oid in (oid_simple, oid_complex):
+            with allure.step(f"Drop object {oid}"):
+                get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
+                head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
+                drop_object(random_node, cid, oid)
+                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
+                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)
+
+    @pytest.mark.node_mgmt
+    @pytest.mark.skip(reason="Need to clarify scenario")
+    @allure.title("Control Operations with storage nodes")
+    def test_shards(
+        self,
+        default_wallet,
+        create_container_and_pick_node,
+        simple_object_size: ObjectSize,
+    ):
+        wallet = default_wallet
+        file_path = generate_file(simple_object_size.value)
+
+        cid, node = create_container_and_pick_node
+        original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
+
+        # for mode in ('read-only', 'degraded'):
+        for mode in ("degraded",):
+            shards = node_shard_list(node)
+            assert shards
+
+            for shard in shards:
+                node_shard_set_mode(node, shard, mode)
+
+            shards = node_shard_list(node)
+            assert shards
+
+            with pytest.raises(RuntimeError):
+                put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
+
+            with pytest.raises(RuntimeError):
+                delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)
+
+            get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)
+
+            for shard in shards:
+                node_shard_set_mode(node, shard, "read-write")
+
+            shards = node_shard_list(node)
+            assert shards
+
+            oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
+            delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
+
+    @pytest.mark.node_mgmt
+    @allure.title("Put object with stopped node")
+    def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
+        wallet = default_wallet
+        placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
+        source_file_path = generate_file(simple_object_size.value)
+        storage_nodes = self.cluster.storage_nodes
+        random_node = random.choice(storage_nodes[1:])
+        alive_node = random.choice(
+            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
+        )
+
+        cid = create_container(
+            wallet,
+            rule=placement_rule,
+            basic_acl=PUBLIC_ACL,
+            shell=self.shell,
+            endpoint=random_node.get_rpc_endpoint(),
+        )
+        with allure.step("Stop the random node"):
+            check_nodes.append(random_node)
+            random_node.stop_service()
+        with allure.step("Try to put an object and expect success"):
+            put_object(
+                wallet,
+                source_file_path,
+                cid,
+                shell=self.shell,
+                endpoint=alive_node.get_rpc_endpoint(),
+            )
+        self.return_nodes(alive_node)
+
+    @allure.step("Wait for object to be dropped")
+    def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
+        for _ in range(3):
+            try:
+                checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
+                wait_for_gc_pass_on_storage_nodes()
+            except Exception as err:
+                if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
+                    return
+                raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}')
+
+        raise AssertionError(f"Object {oid} was not dropped from node")
+
+
+@pytest.mark.maintenance
+class TestMaintenanceMode(ClusterTestBase):
+    change_node: ClusterNode = None
+
+    @pytest.fixture()
+    @allure.title("Init Frostfs CLI remote")
+    def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
+        host = node_under_test.host
+        service_config = host.get_service_config(node_under_test.storage_node.name)
+        wallet_path = service_config.attributes["wallet_path"]
+        wallet_password = service_config.attributes["wallet_password"]
+
+        shell = host.get_shell()
+        wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
+        wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
+        shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
+        cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
+        return cli
+
+    @pytest.fixture()
+    @allure.title("Init Frostfs CLI")
+    def frostfs_cli(self) -> FrostfsCli:
+        cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
+        return cli
+
+    @pytest.fixture()
+    def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: str):
+        yield
+        cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online")
+        self.tick_epoch(wait_block=2)
+
+    def basic_operations(self, wallet, cid, oid, shell, endpoint, matchs, object_size):
+        file_path = generate_file(object_size)
+        default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint}
+        operations = {
+            get_object: {"oid": oid},
+            search_object: {},
+            delete_object: {"oid": oid},
+            put_object: {"path": file_path},
+        }
+        for index, (operation, kw) in enumerate(operations.items()):
+            with allure.step(f"Run {operation.__name__} object, waiting response - {matchs[index]}"):
+                default_kw.update(kw)
+                if operation == search_object and "Found" in matchs[index]:
+                    operation(**default_kw)
+                    continue
+                with pytest.raises(RuntimeError, match=matchs[index]):
+                    operation(**default_kw)
+        os.remove(file_path)
+
+    @allure.title("Test of basic node operations in maintenance mode")
+    def test_maintenance_mode(
+        self,
+        default_wallet: str,
+        simple_object_size: ObjectSize,
+        cluster_state_controller: ClusterStateController,
+        restore_online_mode_node: None,
+    ):
+        with allure.step("Create container and create/put object"):
+            cid = create_container(
+                wallet=default_wallet,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                rule="REP 1 CBF 1",
+            )
+            node = search_nodes_with_container(
+                wallet=default_wallet,
+                cid=cid,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                cluster=self.cluster,
+            )
+            self.change_node = node[0]
+            file_path = generate_file(simple_object_size.value)
+            oid = put_object(
+                wallet=default_wallet,
+                path=file_path,
+                cid=cid,
+                shell=self.shell,
+                endpoint=self.change_node.storage_node.get_rpc_endpoint(),
+            )
+        with allure.step("Enable MaintenanceModeAllowed"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=self.change_node, wallet=default_wallet, status="maintenance"
+            )
+
+        other_nodes = list(set(self.cluster.cluster_nodes) - set(node))
+        node_and_match = {
+            self.change_node: ["node is under maintenance"] * 4,
+            other_nodes[0]: [
+                "object not found",
+                "Found 0 objects",
+                "object not found",
+                "node is under maintenance",
+            ],
+        }
+        with allure.step("Run basic operations"):
+            for cluster_node, matchs in node_and_match.items():
+                self.basic_operations(
+                    wallet=default_wallet,
+                    cid=cid,
+                    oid=oid,
+                    shell=self.shell,
+                    endpoint=cluster_node.storage_node.get_rpc_endpoint(),
+                    matchs=matchs,
+                    object_size=simple_object_size.value,
+                )
+
+    @pytest.mark.sanity
+    @allure.title("MAINTENANCE and OFFLINE mode transitions")
+    def test_mode_transitions(
+        self,
+        cluster_state_controller: ClusterStateController,
+        node_under_test: ClusterNode,
+        default_wallet: str,
+        frostfs_cli: FrostfsCli,
+        restore_online_mode_node: None,
+    ):
+        self.change_node = node_under_test
+        cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Set node mode to offline"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=node_under_test,
+                wallet=default_wallet,
+                status=ModeNode.OFFLINE.value,
+            )
+
+        with allure.step("Tick epoch to update the network map"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = offline, after network map update"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            netmap = NetmapParser.snapshot_all_nodes(netmap)
+            assert node_under_test.host_ip not in [
+                netmap_node.node for netmap_node in netmap
+            ], f"Node {node_under_test.host_ip} is still in the network map, expected state - offline"
+
+        with allure.step("Restart storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Tick epoch after restart storage service and set mode to offline"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = online, after restart storage service"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.ONLINE.value.upper(), f"Node actual state - {node_state}, expect - {ModeNode.ONLINE.value}"
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Set node mode to maintenance"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
+            )
+        with allure.step("Restart storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Tick epoch after restart storage service"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = maintenance, after restart storage service and tick epoch"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.MAINTENANCE.value.upper(), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}"
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = offline"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.OFFLINE.value.upper(), f"Node actual state - {node_state}, expect - {ModeNode.OFFLINE.value}"
+
+        with allure.step("Stop storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Start storage service"):
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Check node mode = offline, after tick epoch and start storage service"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.OFFLINE.value.upper(), f"Node actual state - {node_state}, expect - {ModeNode.OFFLINE.value}"
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = online, after start storage service and tick epoch"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.ONLINE.value.upper(), f"Node actual state - {node_state}, expect - {ModeNode.ONLINE.value}"
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Set node mode to maintenance"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
+            )
+
+        with allure.step("Stop storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Start storage service"):
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Check node mode = maintenance"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+
+            assert (
+                node_state == ModeNode.MAINTENANCE.value.upper()
+            ), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}"
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = maintenance"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.MAINTENANCE.value.upper(), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}"
+
+    @allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
+    def test_maintenance_globally_forbidden(
+        self,
+        cluster_state_controller: ClusterStateController,
+        node_under_test: ClusterNode,
+        frostfs_cli_remote: FrostfsCli,
+        frostfs_cli: FrostfsCli,
+        default_wallet: str,
+        restore_online_mode_node: None,
+    ):
+        self.change_node = node_under_test
+        control_endpoint = node_under_test.service(StorageNode).get_control_endpoint()
+
+        with allure.step("Set MaintenanceModeAllowed = false"):
+            cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test)
+
+        with allure.step("Set node status to maintenance"):
+            with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"):
+                frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
+
+        with allure.step("Set MaintenanceModeAllowed = true"):
+            cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test)
+
+        with allure.step("Set node status to maintenance"):
+            output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
+            assert "update request successfully sent" in output.stdout, f"Response = {output}"
+
+        with allure.step("Tick epoch"):
+            self.tick_epoch(wait_block=2)
+
+        with allure.step("Check node state = maintenance"):
+            netmap_node = NetmapParser.snapshot_one_node(
+                frostfs_cli.netmap.snapshot(
+                    rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
+                ).stdout,
+                node_under_test,
+            )
+            assert (
+                netmap_node.node_status == ModeNode.MAINTENANCE.value.upper()
+            ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}"
+
+        with allure.step("Set node status to online"):
+            frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online")
+
+        with allure.step("Tick epoch"):
+            self.tick_epoch()
+
+        with allure.step("Check node state = online"):
+            netmap_node = NetmapParser.snapshot_one_node(
+                frostfs_cli.netmap.snapshot(
+                    rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
+                ).stdout,
+                node_under_test,
+            )
+            assert (
+                netmap_node.node_status == ModeNode.ONLINE.value.upper()
+            ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}"
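One line in basic_operations above is easy to get wrong: enumerate over dict.items() yields two-element (index, (key, value)) tuples, so the pair must be unpacked with parentheses, as the file now does. A minimal demonstration of the difference:

    operations = {"get": {"oid": 1}, "put": {"path": "/tmp/f"}}

    # Correct: unpack the (key, value) pair that enumerate wraps with an index.
    for index, (operation, kw) in enumerate(operations.items()):
        print(index, operation, kw)

    # Incorrect: `for index, operation, kw in enumerate(operations.items())`
    # raises ValueError: not enough values to unpack (expected 3, got 2).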
@ -1,322 +0,0 @@
|
|||
import logging
import random
from time import sleep
from typing import Optional, Tuple

import allure
import pytest
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import (
    delete_object,
    get_object,
    get_object_from_random_node,
    head_object,
    put_object,
    put_object_to_random_node,
)
from frostfs_testlib.steps.node_management import (
    check_node_in_map,
    delete_node_data,
    drop_object,
    exclude_node_from_network_map,
    get_locode_from_random_node,
    include_node_to_network_map,
    node_shard_list,
    node_shard_set_mode,
    storage_node_set_status,
    wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils, string_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file

from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes

logger = logging.getLogger("NeoLogger")
check_nodes: list[StorageNode] = []
@allure.title("Add one node to cluster")
|
||||
@pytest.mark.add_nodes
|
||||
@pytest.mark.node_mgmt
|
||||
class TestNodeManagement(ClusterTestBase):
|
||||
@pytest.fixture
|
||||
@allure.title("Create container and pick the node with data")
|
||||
def create_container_and_pick_node(
|
||||
self, default_wallet: str, simple_object_size: ObjectSize
|
||||
) -> Tuple[str, StorageNode]:
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
||||
cid = create_container(
|
||||
default_wallet,
|
||||
shell=self.shell,
|
||||
endpoint=endpoint,
|
||||
rule=placement_rule,
|
||||
basic_acl=PUBLIC_ACL,
|
||||
)
|
||||
oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
|
||||
|
||||
nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
|
||||
assert len(nodes) == 1
|
||||
node = nodes[0]
|
||||
|
||||
yield cid, node
|
||||
|
||||
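        # Teardown: switch every shard on the node back to read-write so that
        # later tests see the node in its default state.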
        shards = node_shard_list(node)
        assert shards

        for shard in shards:
            node_shard_set_mode(node, shard, "read-write")

        node_shard_list(node)
@allure.step("Tick epoch with retries")
|
||||
def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: int = None):
|
||||
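        # An epoch tick may transiently fail (e.g. while the chain catches up),
        # so retry a few times before propagating the error.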
        for attempt in range(attempts):
            try:
                self.tick_epoch(wait_block=wait_block)
            except RuntimeError:
                sleep(timeout)
                if attempt >= attempts - 1:
                    raise
                continue
            return
    @pytest.fixture
    def after_run_start_all_nodes(self):
        yield
        self.return_nodes()

    @pytest.fixture
    def return_nodes_after_test_run(self):
        yield
        self.return_nodes()
@allure.step("Return node to cluster")
|
||||
def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
|
||||
for node in list(check_nodes):
|
||||
with allure.step(f"Start node {node}"):
|
||||
node.start_service()
|
||||
with allure.step(f"Waiting status ready for node {node}"):
|
||||
wait_for_node_to_be_ready(node)
|
||||
|
||||
# We need to wait for node to establish notifications from morph-chain
|
||||
# Otherwise it will hang up when we will try to set status
|
||||
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
||||
|
||||
with allure.step(f"Move node {node} to online state"):
|
||||
storage_node_set_status(node, status="online", retries=2)
|
||||
|
||||
check_nodes.remove(node)
|
||||
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
||||
self.tick_epoch_with_retries(3, wait_block=2)
|
||||
check_node_in_map(node, shell=self.shell, alive_node=alive_node)
|
||||
|
||||
@allure.title("Add one node to cluster")
|
||||
@pytest.mark.add_nodes
|
||||
def test_add_nodes(
|
||||
self,
|
||||
default_wallet: str,
|
||||
simple_object_size: ObjectSize,
|
||||
return_nodes_after_test_run,
|
||||
):
|
||||
"""
|
||||
This test remove one node from frostfs_testlib.storage.cluster then add it back. Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
|
||||
"""
|
||||
wallet = default_wallet
|
||||
placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
|
||||
placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
|
||||
source_file_path = generate_file(simple_object_size.value)
|
||||
|
||||
storage_nodes = self.cluster.storage_nodes
|
||||
random_node = random.choice(storage_nodes[1:])
|
||||
alive_node = random.choice(
|
||||
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
|
||||
)
|
||||
|
||||
check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)
|
||||
|
||||
# Add node to recovery list before messing with it
|
||||
check_nodes.append(random_node)
|
||||
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
|
||||
delete_node_data(random_node)
|
||||
|
||||
cid = create_container(
|
||||
wallet,
|
||||
rule=placement_rule_3,
|
||||
basic_acl=PUBLIC_ACL,
|
||||
shell=self.shell,
|
||||
endpoint=alive_node.get_rpc_endpoint(),
|
||||
)
|
||||
oid = put_object(
|
||||
wallet,
|
||||
source_file_path,
|
||||
cid,
|
||||
shell=self.shell,
|
||||
endpoint=alive_node.get_rpc_endpoint(),
|
||||
)
|
||||
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
|
||||
|
||||
self.return_nodes(alive_node)
|
||||
|
||||
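        # Taking a second node out of the map forces the REP 3 object to be
        # replicated onto the node that has just been returned to the cluster.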
with allure.step("Check data could be replicated to new node"):
|
||||
random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
|
||||
# Add node to recovery list before messing with it
|
||||
check_nodes.append(random_node)
|
||||
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
|
||||
|
||||
wait_object_replication(
|
||||
cid,
|
||||
oid,
|
||||
3,
|
||||
shell=self.shell,
|
||||
nodes=list(set(storage_nodes) - {random_node}),
|
||||
)
|
||||
include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
|
||||
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
|
||||
|
||||
with allure.step("Check container could be created with new node"):
|
||||
cid = create_container(
|
||||
wallet,
|
||||
rule=placement_rule_4,
|
||||
basic_acl=PUBLIC_ACL,
|
||||
shell=self.shell,
|
||||
endpoint=alive_node.get_rpc_endpoint(),
|
||||
)
|
||||
oid = put_object(
|
||||
wallet,
|
||||
source_file_path,
|
||||
cid,
|
||||
shell=self.shell,
|
||||
endpoint=alive_node.get_rpc_endpoint(),
|
||||
)
|
||||
wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)
|
||||
|
||||
    @pytest.mark.node_mgmt
    @allure.title("Drop object using control command")
    def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
        """
        Test checks that objects can be dropped using the `frostfs-cli control drop-objects` command.
        """
        wallet = default_wallet
        endpoint = self.cluster.default_rpc_endpoint
        file_path_simple = generate_file(simple_object_size.value)
        file_path_complex = generate_file(complex_object_size.value)
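        # Pin the container to a single node by filtering on its UN-LOCODE,
        # so it is known exactly where the objects are stored.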
        locode = get_locode_from_random_node(self.cluster)
        rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
        cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
        oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
        oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)

        for oid in (oid_simple, oid_complex):
            get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
            head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)

        nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
        random_node = random.choice(nodes_with_object)

        for oid in (oid_simple, oid_complex):
            with allure.step(f"Drop object {oid}"):
                get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
                head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                drop_object(random_node, cid, oid)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)
    @pytest.mark.node_mgmt
    @pytest.mark.skip(reason="Need to clarify scenario")
    @allure.title("Control Operations with storage nodes")
    def test_shards(
        self,
        default_wallet,
        create_container_and_pick_node,
        simple_object_size: ObjectSize,
    ):
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)

        cid, node = create_container_and_pick_node
        original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

        # for mode in ('read-only', 'degraded'):
        for mode in ("degraded",):
            shards = node_shard_list(node)
            assert shards

            for shard in shards:
                node_shard_set_mode(node, shard, mode)

            shards = node_shard_list(node)
            assert shards
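            # With all shards in degraded mode, writes and deletes are expected
            # to fail while reads must still succeed.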
            with pytest.raises(RuntimeError):
                put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

            with pytest.raises(RuntimeError):
                delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)

            get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)

            for shard in shards:
                node_shard_set_mode(node, shard, "read-write")

            shards = node_shard_list(node)
            assert shards

            oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
            delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
    @pytest.mark.node_mgmt
    @allure.title("Put object with stopped node")
    def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
        wallet = default_wallet
        placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)
        storage_nodes = self.cluster.storage_nodes
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice(
            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
        )

        cid = create_container(
            wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=random_node.get_rpc_endpoint(),
        )
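        # REP 3 with SELECT 4 leaves one spare node, so a put must still
        # succeed after a single node is stopped.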
with allure.step("Stop the random node"):
|
||||
check_nodes.append(random_node)
|
||||
random_node.stop_service()
|
||||
with allure.step("Try to put an object and expect success"):
|
||||
put_object(
|
||||
wallet,
|
||||
source_file_path,
|
||||
cid,
|
||||
shell=self.shell,
|
||||
endpoint=alive_node.get_rpc_endpoint(),
|
||||
)
|
||||
self.return_nodes(alive_node)
|
||||
|
||||
@allure.step("Wait for object to be dropped")
|
||||
def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
|
||||
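        # The object counts as dropped only once the checker fails with
        # OBJECT_NOT_FOUND; until then, wait for another GC pass and retry.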
        for _ in range(3):
            try:
                checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                wait_for_gc_pass_on_storage_nodes()
            except Exception as err:
                if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
                    return
                raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}')

        raise AssertionError(f"Object {oid} was not dropped from node")