322 lines
13 KiB
Python
322 lines
13 KiB
Python
import logging
|
|
import random
|
|
from time import sleep
|
|
from typing import Optional, Tuple
|
|
|
|
import allure
|
|
import pytest
|
|
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
|
|
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
|
|
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
|
|
from frostfs_testlib.steps.cli.container import create_container
|
|
from frostfs_testlib.steps.cli.object import (
|
|
delete_object,
|
|
get_object,
|
|
get_object_from_random_node,
|
|
head_object,
|
|
put_object,
|
|
put_object_to_random_node,
|
|
)
|
|
from frostfs_testlib.steps.node_management import (
|
|
check_node_in_map,
|
|
delete_node_data,
|
|
drop_object,
|
|
exclude_node_from_network_map,
|
|
get_locode_from_random_node,
|
|
include_node_to_network_map,
|
|
node_shard_list,
|
|
node_shard_set_mode,
|
|
storage_node_set_status,
|
|
wait_for_node_to_be_ready,
|
|
)
|
|
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
|
from frostfs_testlib.storage.cluster import StorageNode
|
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.utils import datetime_utils, string_utils
|
|
from frostfs_testlib.utils.failover_utils import wait_object_replication
|
|
from frostfs_testlib.utils.file_utils import generate_file
|
|
|
|
from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes
|
|
|
|
logger = logging.getLogger("NeoLogger")
|
|
check_nodes: list[StorageNode] = []
|
|
|
|
|
|
@allure.title("Add one node to cluster")
|
|
@pytest.mark.add_nodes
|
|
@pytest.mark.node_mgmt
|
|
class TestNodeManagement(ClusterTestBase):
|
|
@pytest.fixture
|
|
@allure.title("Create container and pick the node with data")
|
|
def create_container_and_pick_node(
|
|
self, default_wallet: str, simple_object_size: ObjectSize
|
|
) -> Tuple[str, StorageNode]:
|
|
file_path = generate_file(simple_object_size.value)
|
|
placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
|
|
endpoint = self.cluster.default_rpc_endpoint
|
|
|
|
cid = create_container(
|
|
default_wallet,
|
|
shell=self.shell,
|
|
endpoint=endpoint,
|
|
rule=placement_rule,
|
|
basic_acl=PUBLIC_ACL,
|
|
)
|
|
oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
|
|
|
|
nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
|
|
assert len(nodes) == 1
|
|
node = nodes[0]
|
|
|
|
yield cid, node
|
|
|
|
shards = node_shard_list(node)
|
|
assert shards
|
|
|
|
for shard in shards:
|
|
node_shard_set_mode(node, shard, "read-write")
|
|
|
|
node_shard_list(node)
|
|
|
|
@allure.step("Tick epoch with retries")
|
|
def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: int = None):
|
|
for attempt in range(attempts):
|
|
try:
|
|
self.tick_epoch(wait_block=wait_block)
|
|
except RuntimeError:
|
|
sleep(timeout)
|
|
if attempt >= attempts - 1:
|
|
raise
|
|
continue
|
|
return
|
|
|
|
@pytest.fixture
|
|
def after_run_start_all_nodes(self):
|
|
yield
|
|
self.return_nodes()
|
|
|
|
@pytest.fixture
|
|
def return_nodes_after_test_run(self):
|
|
yield
|
|
self.return_nodes()
|
|
|
|
@allure.step("Return node to cluster")
|
|
def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
|
|
for node in list(check_nodes):
|
|
with allure.step(f"Start node {node}"):
|
|
node.start_service()
|
|
with allure.step(f"Waiting status ready for node {node}"):
|
|
wait_for_node_to_be_ready(node)
|
|
|
|
# We need to wait for node to establish notifications from morph-chain
|
|
# Otherwise it will hang up when we will try to set status
|
|
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
|
|
|
with allure.step(f"Move node {node} to online state"):
|
|
storage_node_set_status(node, status="online", retries=2)
|
|
|
|
check_nodes.remove(node)
|
|
sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
|
self.tick_epoch_with_retries(3, wait_block=2)
|
|
check_node_in_map(node, shell=self.shell, alive_node=alive_node)
|
|
|
|
@allure.title("Add one node to cluster")
|
|
@pytest.mark.add_nodes
|
|
def test_add_nodes(
|
|
self,
|
|
default_wallet: str,
|
|
simple_object_size: ObjectSize,
|
|
return_nodes_after_test_run,
|
|
):
|
|
"""
|
|
This test remove one node from frostfs_testlib.storage.cluster then add it back. Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
|
|
"""
|
|
wallet = default_wallet
|
|
placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
|
|
placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
|
|
source_file_path = generate_file(simple_object_size.value)
|
|
|
|
storage_nodes = self.cluster.storage_nodes
|
|
random_node = random.choice(storage_nodes[1:])
|
|
alive_node = random.choice(
|
|
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
|
|
)
|
|
|
|
check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)
|
|
|
|
# Add node to recovery list before messing with it
|
|
check_nodes.append(random_node)
|
|
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
|
|
delete_node_data(random_node)
|
|
|
|
cid = create_container(
|
|
wallet,
|
|
rule=placement_rule_3,
|
|
basic_acl=PUBLIC_ACL,
|
|
shell=self.shell,
|
|
endpoint=alive_node.get_rpc_endpoint(),
|
|
)
|
|
oid = put_object(
|
|
wallet,
|
|
source_file_path,
|
|
cid,
|
|
shell=self.shell,
|
|
endpoint=alive_node.get_rpc_endpoint(),
|
|
)
|
|
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
|
|
|
|
self.return_nodes(alive_node)
|
|
|
|
with allure.step("Check data could be replicated to new node"):
|
|
random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
|
|
# Add node to recovery list before messing with it
|
|
check_nodes.append(random_node)
|
|
exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
|
|
|
|
wait_object_replication(
|
|
cid,
|
|
oid,
|
|
3,
|
|
shell=self.shell,
|
|
nodes=list(set(storage_nodes) - {random_node}),
|
|
)
|
|
include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
|
|
wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)
|
|
|
|
with allure.step("Check container could be created with new node"):
|
|
cid = create_container(
|
|
wallet,
|
|
rule=placement_rule_4,
|
|
basic_acl=PUBLIC_ACL,
|
|
shell=self.shell,
|
|
endpoint=alive_node.get_rpc_endpoint(),
|
|
)
|
|
oid = put_object(
|
|
wallet,
|
|
source_file_path,
|
|
cid,
|
|
shell=self.shell,
|
|
endpoint=alive_node.get_rpc_endpoint(),
|
|
)
|
|
wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)
|
|
|
|
@pytest.mark.node_mgmt
|
|
@allure.title("Drop object using control command")
|
|
def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
|
|
"""
|
|
Test checks object could be dropped using `frostfs-cli control drop-objects` command.
|
|
"""
|
|
wallet = default_wallet
|
|
endpoint = self.cluster.default_rpc_endpoint
|
|
file_path_simple = generate_file(simple_object_size.value)
|
|
file_path_complex = generate_file(complex_object_size.value)
|
|
|
|
locode = get_locode_from_random_node(self.cluster)
|
|
rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
|
|
cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
|
|
oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
|
|
oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)
|
|
|
|
for oid in (oid_simple, oid_complex):
|
|
get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
|
|
head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
|
|
|
|
nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
|
|
random_node = random.choice(nodes_with_object)
|
|
|
|
for oid in (oid_simple, oid_complex):
|
|
with allure.step(f"Drop object {oid}"):
|
|
get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
|
|
head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
|
|
drop_object(random_node, cid, oid)
|
|
self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
|
|
self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)
|
|
|
|
@pytest.mark.node_mgmt
|
|
@pytest.mark.skip(reason="Need to clarify scenario")
|
|
@allure.title("Control Operations with storage nodes")
|
|
def test_shards(
|
|
self,
|
|
default_wallet,
|
|
create_container_and_pick_node,
|
|
simple_object_size: ObjectSize,
|
|
):
|
|
wallet = default_wallet
|
|
file_path = generate_file(simple_object_size.value)
|
|
|
|
cid, node = create_container_and_pick_node
|
|
original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
|
|
|
|
# for mode in ('read-only', 'degraded'):
|
|
for mode in ("degraded",):
|
|
shards = node_shard_list(node)
|
|
assert shards
|
|
|
|
for shard in shards:
|
|
node_shard_set_mode(node, shard, mode)
|
|
|
|
shards = node_shard_list(node)
|
|
assert shards
|
|
|
|
with pytest.raises(RuntimeError):
|
|
put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
|
|
|
|
with pytest.raises(RuntimeError):
|
|
delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)
|
|
|
|
get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)
|
|
|
|
for shard in shards:
|
|
node_shard_set_mode(node, shard, "read-write")
|
|
|
|
shards = node_shard_list(node)
|
|
assert shards
|
|
|
|
oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
|
|
delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
|
|
|
|
@pytest.mark.node_mgmt
|
|
@allure.title("Put object with stopped node")
|
|
def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
|
|
wallet = default_wallet
|
|
placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
|
|
source_file_path = generate_file(simple_object_size.value)
|
|
storage_nodes = self.cluster.storage_nodes
|
|
random_node = random.choice(storage_nodes[1:])
|
|
alive_node = random.choice(
|
|
[storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
|
|
)
|
|
|
|
cid = create_container(
|
|
wallet,
|
|
rule=placement_rule,
|
|
basic_acl=PUBLIC_ACL,
|
|
shell=self.shell,
|
|
endpoint=random_node.get_rpc_endpoint(),
|
|
)
|
|
with allure.step("Stop the random node"):
|
|
check_nodes.append(random_node)
|
|
random_node.stop_service()
|
|
with allure.step("Try to put an object and expect success"):
|
|
put_object(
|
|
wallet,
|
|
source_file_path,
|
|
cid,
|
|
shell=self.shell,
|
|
endpoint=alive_node.get_rpc_endpoint(),
|
|
)
|
|
self.return_nodes(alive_node)
|
|
|
|
@allure.step("Wait for object to be dropped")
|
|
def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
|
|
for _ in range(3):
|
|
try:
|
|
checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
|
|
wait_for_gc_pass_on_storage_nodes()
|
|
except Exception as err:
|
|
if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
|
|
return
|
|
raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}')
|
|
|
|
raise AssertionError(f"Object {oid} was not dropped from node")
|