Add test shard mode #180
1 changed files with 106 additions and 2 deletions
|
@ -3,15 +3,80 @@ import pathlib
|
||||||
|
|
||||||
import allure
|
import allure
|
||||||
import pytest
|
import pytest
|
||||||
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.cli import FrostfsCli
|
from frostfs_testlib.cli import FrostfsCli
|
||||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
||||||
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
||||||
from frostfs_testlib.storage.cluster import Cluster, StorageNode
|
from frostfs_testlib.steps.cli.container import create_container, delete_container
|
||||||
|
from frostfs_testlib.steps.cli.object import delete_object, get_object, get_object_nodes, put_object
|
||||||
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||||
|
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
|
||||||
|
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||||
from frostfs_testlib.storage.dataclasses.shard import Shard
|
from frostfs_testlib.storage.dataclasses.shard import Shard
|
||||||
|
from frostfs_testlib.testing import parallel
|
||||||
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||||
|
from frostfs_testlib.utils.file_utils import generate_file
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.shard
|
@pytest.mark.shard
|
||||||
class TestControlShard:
|
class TestControlShard(ClusterTestBase):
|
||||||
|
@staticmethod
|
||||||
|
def get_object_path_and_name_file(oid: str, cid: str, node: ClusterNode) -> tuple[str, str]:
|
||||||
|
oid_path = f"{oid[0]}/{oid[1]}/{oid[2]}/{oid[3]}"
|
||||||
|
with reporter.step("Search object file"):
|
||||||
|
node_shell = node.storage_node.host.get_shell()
|
||||||
|
data_path = node.storage_node.get_data_directory()
|
||||||
|
all_datas = node_shell.exec(f"ls -la {data_path} | awk '{{ print $9 }}'").stdout.strip()
|
||||||
|
for data in all_datas.replace(".", "").strip().split("\n"):
|
||||||
|
check_dir = node_shell.exec(f" [ -d {data_path}/{data}/{oid_path} ] && echo 1 || echo 0").stdout
|
||||||
|
if "1" in check_dir:
|
||||||
|
object_path = f"{data_path}/{data}/{oid_path}"
|
||||||
|
object_name = f"{oid[4:]}.{cid}"
|
||||||
|
break
|
||||||
|
return object_path, object_name
|
||||||
|
|
||||||
|
def set_shard_rw_mode(self, node: ClusterNode):
|
||||||
|
watcher = ShardsWatcher(node)
|
||||||
|
shards = watcher.get_shards()
|
||||||
|
for shard in shards:
|
||||||
|
watcher.set_shard_mode(shard["shard_id"], mode="read-write")
|
||||||
|
watcher.await_for_all_shards_status(status="read-write")
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
@allure.title("Revert all shards mode")
|
||||||
|
def revert_all_shards_mode(self) -> None:
|
||||||
|
yield
|
||||||
|
parallel(self.set_shard_rw_mode, self.cluster.cluster_nodes)
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def oid_cid_node(self, default_wallet: str) -> tuple[str, str, ClusterNode]:
|
||||||
|
with reporter.step("Create container, and put object"):
|
||||||
|
cid = create_container(
|
||||||
|
wallet=default_wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, rule="REP 1 CBF 1"
|
||||||
|
)
|
||||||
|
file = generate_file(5242880)
|
||||||
|
oid = put_object(
|
||||||
|
wallet=default_wallet, path=file, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
|
||||||
|
)
|
||||||
|
with reporter.step("Search node with object"):
|
||||||
|
nodes = get_object_nodes(
|
||||||
|
cluster=self.cluster,
|
||||||
|
wallet=default_wallet,
|
||||||
|
cid=cid,
|
||||||
|
oid=oid,
|
||||||
|
shell=self.shell,
|
||||||
|
endpoint=self.cluster.default_rpc_endpoint,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield oid, cid, nodes[0]
|
||||||
|
|
||||||
|
object_path, object_name = self.get_object_path_and_name_file(oid, cid, nodes[0])
|
||||||
|
nodes[0].host.get_shell().exec(f"chmod +r {object_path}/{object_name}")
|
||||||
|
delete_object(
|
||||||
|
wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
|
||||||
|
)
|
||||||
|
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_shards_from_config(node: StorageNode) -> list[Shard]:
|
def get_shards_from_config(node: StorageNode) -> list[Shard]:
|
||||||
config_file = node.get_shard_config_path()
|
config_file = node.get_shard_config_path()
|
||||||
|
@ -44,6 +109,16 @@ class TestControlShard:
|
||||||
)
|
)
|
||||||
return [Shard.from_object(shard) for shard in json.loads(result.stdout.split(">", 1)[1])]
|
return [Shard.from_object(shard) for shard in json.loads(result.stdout.split(">", 1)[1])]
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def change_config_storage(self, cluster_state_controller: ClusterStateController):
|
||||||
|
with reporter.step("Change threshold error shards"):
|
||||||
|
cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes(
|
||||||
|
service_type=StorageNode, values={"storage:shard_ro_error_threshold": "5"}
|
||||||
|
)
|
||||||
|
yield
|
||||||
|
with reporter.step("Restore threshold error shards"):
|
||||||
|
cluster_state_controller.manager(ConfigStateManager).revert_all()
|
||||||
|
|
||||||
@allure.title("All shards are available")
|
@allure.title("All shards are available")
|
||||||
def test_control_shard(self, cluster: Cluster):
|
def test_control_shard(self, cluster: Cluster):
|
||||||
for storage_node in cluster.storage_nodes:
|
for storage_node in cluster.storage_nodes:
|
||||||
|
@ -51,3 +126,32 @@ class TestControlShard:
|
||||||
shards_from_cli = self.get_shards_from_cli(storage_node)
|
shards_from_cli = self.get_shards_from_cli(storage_node)
|
||||||
|
|
||||||
assert set(shards_from_config) == set(shards_from_cli)
|
assert set(shards_from_config) == set(shards_from_cli)
|
||||||
|
|
||||||
|
@pytest.mark.failover
|
||||||
|
def test_shard_errors(
|
||||||
|
self,
|
||||||
|
default_wallet: str,
|
||||||
|
oid_cid_node: tuple[str, str, ClusterNode],
|
||||||
|
change_config_storage: None,
|
||||||
|
revert_all_shards_mode: None,
|
||||||
|
):
|
||||||
|
oid, cid, node = oid_cid_node
|
||||||
|
object_path, object_name = self.get_object_path_and_name_file(oid, cid, node)
|
||||||
|
with reporter.step("Block read file"):
|
||||||
|
node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")
|
||||||
|
with reporter.step("Get object, expect 6 errors"):
|
||||||
|
for _ in range(6):
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
get_object(
|
||||||
|
wallet=default_wallet,
|
||||||
|
cid=cid,
|
||||||
|
oid=oid,
|
||||||
|
shell=self.shell,
|
||||||
|
endpoint=node.storage_node.get_rpc_endpoint(),
|
||||||
|
)
|
||||||
|
with reporter.step("Check shard status"):
|
||||||
|
for shard in ShardsWatcher(node).get_shards():
|
||||||
|
if shard["blobstor"][1]["path"] in object_path:
|
||||||
|
with reporter.step(f"Shard - {shard['shard_id']} to {node.host_ip}, mode - {shard['mode']}"):
|
||||||
|
assert shard["mode"] == "read-only"
|
||||||
|
break
|
||||||
|
|
Loading…
Reference in a new issue