a.berezin
b999d7cf9b
Some checks failed
DCO check / DCO (pull_request) Has been cancelled
Signed-off-by: a.berezin <a.berezin@yadro.com>
149 lines
6.2 KiB
Python
149 lines
6.2 KiB
Python
import json
|
|
import os
|
|
|
|
import allure
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.cli import FrostfsCli
|
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
|
from frostfs_testlib.shell.interfaces import Shell
|
|
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
|
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
|
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
|
|
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
|
from frostfs_testlib.storage.dataclasses.shard import Shard
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing import parallel, wait_for_success
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.utils.file_utils import generate_file
|
|
|
|
from ...helpers.container_request import PUBLIC_WITH_POLICY, requires_container
|
|
|
|
|
|
def set_shard_rw_mode(node: ClusterNode):
    """Switch every shard on the given node to read-write and wait until all of them report that status."""
    shards_watcher = ShardsWatcher(node)
    for shard_info in shards_watcher.get_shards():
        shards_watcher.set_shard_mode(shard_info["shard_id"], mode="read-write")
    shards_watcher.await_for_all_shards_status(status="read-write")
|
|
|
|
|
|
@pytest.fixture()
@allure.title("Revert all shards mode")
def revert_all_shards_mode(cluster: Cluster) -> None:
    """Teardown-only fixture: after the test finishes, put every shard on every cluster node back into read-write mode."""
    yield
    # Revert all nodes concurrently rather than one by one.
    parallel(set_shard_rw_mode, cluster.cluster_nodes)
|
|
|
|
|
|
@pytest.fixture()
def object_id(client_shell: Shell, cluster: Cluster, container: str, default_wallet: WalletInfo, max_object_size: int) -> str:
    """Put a freshly generated object (~80% of the max object size) into the container and return its OID."""
    with reporter.step("Create container, and put object"):
        payload = generate_file(round(max_object_size * 0.8))
        return put_object(default_wallet, payload, container, client_shell, cluster.default_rpc_endpoint)
|
|
|
|
|
|
@pytest.fixture()
def node_with_object(cluster: Cluster, container: str, object_id: str) -> ClusterNode:
    """Return the first cluster node that actually stores the object."""
    with reporter.step("Search node with object"):
        object_nodes = get_object_nodes(cluster, container, object_id, cluster.cluster_nodes[0])

    return object_nodes[0]
|
|
|
|
|
|
@pytest.fixture()
@wait_for_success(180, 30, title="Search object in system")
def object_path_on_node(object_id: str, container: str, node_with_object: ClusterNode) -> str:
    """Locate the object's file on disk inside the storage node's blobstor tree.

    The object lives at <data_path>/data/<shard_dir>/data/<first 4 OID chars as
    nested dirs>/<rest of OID>.<container>. Retried via @wait_for_success because
    the object may not be flushed to disk immediately after put.

    Raises:
        AssertionError: if the object file is not found under any shard directory.
    """
    # The first four characters of the OID become four nested directory levels.
    oid_path = f"{object_id[0]}/{object_id[1]}/{object_id[2]}/{object_id[3]}"
    object_path = None
    object_name = None

    with reporter.step("Search object file"):
        node_shell = node_with_object.storage_node.host.get_shell()
        data_path = node_with_object.storage_node.get_data_directory()
        all_datas = node_shell.exec(f"ls -la {data_path}/data | awk '{{ print $9 }}'").stdout.strip()
        for data_dir in all_datas.strip().split("\n"):
            # Skip the '.' and '..' entries emitted by `ls -la`. (The previous
            # `replace(".", "")` stripped every dot from the whole listing, which
            # mangled directory names containing dots and then probed bogus
            # `.../data//data/...` paths for the empty leftovers.)
            if data_dir in (".", ".."):
                continue
            check_dir = node_shell.exec(f" [ -d {data_path}/data/{data_dir}/data/{oid_path} ] && echo 1 || echo 0").stdout
            if "1" in check_dir:
                object_path = f"{data_path}/data/{data_dir}/data/{oid_path}"
                object_name = f"{object_id[4:]}.{container}"
                break

    assert object_path is not None, f"{object_id} object not found in directory - {data_path}/data"
    return os.path.join(object_path, object_name)
|
|
|
|
|
|
@pytest.fixture()
def erroneous_object_id(object_id: str, object_path_on_node: str, node_with_object: ClusterNode):
    """Make the object's on-disk file unreadable for the duration of the test, restoring access on teardown.

    Yields:
        The OID of the object whose file has had read permission removed.
    """
    with reporter.step("Block read file"):
        # Drop read permission for user, group and others.
        node_with_object.host.get_shell().exec(f"chmod a-r {object_path_on_node}")

    yield object_id

    with reporter.step("Restore file access"):
        # Use `a+r` to mirror the `a-r` above: a bare `+r` is subject to the
        # process umask and may fail to restore group/other read bits.
        node_with_object.host.get_shell().exec(f"chmod a+r {object_path_on_node}")
|
|
|
|
|
|
@pytest.fixture()
def change_config_storage(cluster_state_controller: ClusterStateController):
    """Raise `storage:shard_ro_error_threshold` to 5 on all storage nodes; revert the config on teardown."""
    with reporter.step("Change threshold error shards"):
        cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes(
            service_type=StorageNode,
            values={"storage:shard_ro_error_threshold": "5"},
        )

    yield

    with reporter.step("Restore threshold error shards"):
        cluster_state_controller.manager(ConfigStateManager).revert_all()
|
|
|
|
|
|
@pytest.mark.nightly
@pytest.mark.shard
class TestControlShard(ClusterTestBase):
    @staticmethod
    def get_shards_from_cli(node: StorageNode) -> list[Shard]:
        """Query the node's control endpoint via frostfs-cli and parse the shard list it reports."""
        cli = FrostfsCli(node.host.get_shell(), node.host.get_cli_config("frostfs-cli").exec_path)

        result = cli.shards.list(
            endpoint=node.get_control_endpoint(),
            wallet=node.get_remote_wallet_path(),
            wallet_password=node.get_wallet_password(),
            json_mode=True,
            timeout=CLI_DEFAULT_TIMEOUT,
        )

        # The CLI echoes the command before the payload; JSON starts after the first '>'.
        raw_json = result.stdout.split(">", 1)[1]
        return [Shard.from_object(item) for item in json.loads(raw_json)]

    @allure.title("All shards are available")
    def test_control_shard(self, cluster: Cluster):
        """Shards reported over the control API must match the node's own configuration."""
        for storage_node in cluster.storage_nodes:
            shards_from_cli = self.get_shards_from_cli(storage_node)
            assert set(storage_node.get_shards()) == set(shards_from_cli)

    @allure.title("Shard become read-only when errors exceeds threshold")
    @pytest.mark.failover
    @requires_container(PUBLIC_WITH_POLICY("REP 1 CBF 1", short_name="REP 1"))
    def test_shard_errors(
        self,
        default_wallet: WalletInfo,
        container: str,
        node_with_object: ClusterNode,
        erroneous_object_id: str,
        object_path_on_node: str,
        change_config_storage: None,
        revert_all_shards_mode: None,
    ):
        """After enough failed reads, the shard that owns the object must switch itself to read-only."""
        with reporter.step("Get object, expect 6 errors"):
            for _ in range(6):
                with pytest.raises(RuntimeError):
                    get_object(default_wallet, container, erroneous_object_id, self.shell, node_with_object.storage_node.get_rpc_endpoint())

        with reporter.step("Check shard status"):
            for shard in ShardsWatcher(node_with_object).get_shards():
                # Match the shard whose blobstor path is a prefix of the object's on-disk path.
                if shard["blobstor"][1]["path"] in object_path_on_node:
                    with reporter.step(f"Shard {shard['shard_id']} should be in read-only mode"):
                        assert shard["mode"] == "read-only"
                    break
|