155 lines
5.2 KiB
Python
155 lines
5.2 KiB
Python
import json
|
|
import pathlib
|
|
import re
|
|
from dataclasses import dataclass
|
|
from io import StringIO
|
|
|
|
import allure
|
|
import pytest
|
|
import yaml
|
|
from configobj import ConfigObj
|
|
from frostfs_testlib.cli import FrostfsCli
|
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
|
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
|
from frostfs_testlib.storage.cluster import Cluster, StorageNode
|
|
|
|
# Leading part of every per-shard variable name; the shard id is appended directly after it.
SHARD_PREFIX = "FROSTFS_STORAGE_SHARD_"
# Infix placed between the shard id and the blobstor id in blobstor variable names.
BLOBSTOR_PREFIX = "_BLOBSTOR_"
|
|
|
|
|
|
@dataclass
class Blobstor:
    """Path and type of a single blobstor component belonging to a shard."""

    path: str
    path_type: str

    def __eq__(self, other) -> bool:
        """Return True when both ``path`` and ``path_type`` match.

        Raises:
            RuntimeError: if ``other`` is not an instance of this class.
        """
        same_kind = isinstance(other, self.__class__)
        if same_kind:
            return self.path == other.path and self.path_type == other.path_type
        raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")

    def __hash__(self):
        """Hash exactly the fields that ``__eq__`` compares."""
        compared_fields = (self.path, self.path_type)
        return hash(compared_fields)

    @staticmethod
    def from_config_object(section: ConfigObj, shard_id: str, blobstor_id: str):
        """Build a Blobstor from the ``..._PATH`` and ``..._TYPE`` variables of one blobstor.

        Both ids are only interpolated into the variable name, so any value whose
        string form matches the name used in the config works.

        Args:
            section: parsed configuration to look the variables up in.
            shard_id: id of the shard that owns the blobstor.
            blobstor_id: id of the blobstor inside that shard.

        Returns:
            A Blobstor whose fields hold whatever ``section.get`` returned for the two names.
        """
        path_key = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}{blobstor_id}_PATH"
        type_key = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}{blobstor_id}_TYPE"
        return Blobstor(section.get(path_key), section.get(type_key))
|
|
|
|
|
|
@dataclass
class Shard:
    """Storage locations of one shard: its blobstor components, metabase and write-cache."""

    blobstor: list[Blobstor]
    metabase: str
    writecache: str

    def __eq__(self, other) -> bool:
        """Return True when metabase, writecache and the set of blobstors all match.

        Blobstors are compared as sets, so their order and duplicates are ignored.

        Raises:
            RuntimeError: if ``other`` is not an instance of this class.
        """
        if not isinstance(other, self.__class__):
            raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
        return (
            set(self.blobstor) == set(other.blobstor)
            and self.metabase == other.metabase
            and self.writecache == other.writecache
        )

    def __hash__(self):
        """Hash on metabase and writecache only.

        ``blobstor`` is a list and is compared order-insensitively, so it is left
        out; shards that compare equal still hash equal.
        """
        return hash((self.metabase, self.writecache))

    @staticmethod
    def _get_blobstor_count_from_section(config_object: ConfigObj, shard_id: int):
        """Count the distinct blobstor ids configured for ``shard_id``.

        A key counts when it starts with
        ``<SHARD_PREFIX><shard_id><BLOBSTOR_PREFIX><digits>``; the digits are the
        blobstor id. Extracting the whole number (instead of a fixed-width slice)
        keeps ids of any length distinct and skips keys that carry no numeric id.
        """
        blobstor_key = re.compile(
            rf"{re.escape(SHARD_PREFIX)}{re.escape(str(shard_id))}{re.escape(BLOBSTOR_PREFIX)}(\d+)"
        )
        blobstor_ids = set()
        for key in config_object.keys():
            found = blobstor_key.match(key)
            if found:
                blobstor_ids.add(found.group(1))
        return len(blobstor_ids)

    @staticmethod
    def from_config_object(config_object: ConfigObj, shard_id: int):
        """Build a Shard from the flat ``<SHARD_PREFIX><shard_id>_...`` variables.

        Blobstor ids are taken to be ``0 .. count - 1``. The write-cache path is
        read only when ``..._WRITECACHE_ENABLED`` is true; otherwise it is ``""``.
        """
        var_prefix = f"{SHARD_PREFIX}{shard_id}"

        blobstor_count = Shard._get_blobstor_count_from_section(config_object, shard_id)
        blobstors = [
            Blobstor.from_config_object(config_object, shard_id, blobstor_id)
            for blobstor_id in range(blobstor_count)
        ]

        write_cache_enabled = config_object.as_bool(f"{var_prefix}_WRITECACHE_ENABLED")

        return Shard(
            blobstors,
            config_object.get(f"{var_prefix}_METABASE_PATH"),
            config_object.get(f"{var_prefix}_WRITECACHE_PATH") if write_cache_enabled else "",
        )

    @staticmethod
    def _component_path(component):
        """Return ``component["path"]`` for a mapping that has it, else ``component`` itself.

        The mapping check matters: on a plain string ``"path" in component`` is a
        substring test, so a path such as ``/srv/path/meta`` would wrongly be
        indexed with ``["path"]``.
        """
        if isinstance(component, dict) and "path" in component:
            return component["path"]
        return component

    @staticmethod
    def from_object(shard):
        """Build a Shard from a nested mapping (parsed YAML config or CLI JSON).

        ``shard["metabase"]`` and ``shard["writecache"]`` may each be either a
        mapping holding a ``"path"`` entry or the path value itself. Every entry
        of ``shard["blobstor"]`` must provide ``"path"`` and ``"type"``.
        """
        return Shard(
            blobstor=[
                Blobstor(path=blobstor["path"], path_type=blobstor["type"])
                for blobstor in shard["blobstor"]
            ],
            metabase=Shard._component_path(shard["metabase"]),
            writecache=Shard._component_path(shard["writecache"]),
        )
|
|
|
|
|
|
def shards_from_yaml(contents: str) -> list[Shard]:
    """Build Shard objects from the text of a YAML configuration.

    Shards are read from the ``storage.shard`` mapping, one per entry.

    Args:
        contents: YAML document as a string.

    Returns:
        One Shard per entry of ``storage.shard`` other than ``default``.
    """
    config = yaml.safe_load(contents)
    shard_sections = config["storage"]["shard"]
    # "default" is not parsed as a shard (presumably it holds shared defaults);
    # a config that does not define it is accepted as well.
    shard_sections.pop("default", None)

    return [Shard.from_object(shard) for shard in shard_sections.values()]
|
|
|
|
|
|
def shards_from_env(contents: str) -> list[Shard]:
    """Build Shard objects from the text of an env-style configuration.

    Shard ids are discovered by scanning ``contents`` for ``<SHARD_PREFIX><digits>``.
    At least one digit is required, so variables that merely share the prefix
    without naming a shard are not mistaken for an extra shard.

    Args:
        contents: configuration file text.

    Returns:
        One Shard per distinct shard id found, in ascending id order.
    """
    config_object = ConfigObj(StringIO(contents))

    found_ids = re.findall(rf"{re.escape(SHARD_PREFIX)}(\d+)", contents)
    shard_ids = sorted({int(shard_id) for shard_id in found_ids})

    return [Shard.from_config_object(config_object, shard_id) for shard_id in shard_ids]
|
|
|
|
|
|
@pytest.mark.sanity
@pytest.mark.shard
class TestControlShard:
    """Checks that the shards listed by the CLI match the shard configuration of each node."""

    @staticmethod
    def get_shards_from_config(node: StorageNode) -> list[Shard]:
        """Read the node's shard config file and parse it into Shard objects.

        The file is read with ``cat`` through the node host's shell and parsed
        according to its suffix: ``.env``, ``.yaml`` or ``.yml``. Any other
        suffix raises ``KeyError`` from the parser lookup.
        """
        config_file = node.get_shard_config_path()
        file_type = pathlib.Path(config_file).suffix
        contents = node.host.get_shell().exec(f"cat {config_file}").stdout

        parser_method = {
            ".env": shards_from_env,
            ".yaml": shards_from_yaml,
            ".yml": shards_from_yaml,
        }

        shards = parser_method[file_type](contents)
        return shards

    @staticmethod
    def get_shards_from_cli(node: StorageNode) -> list[Shard]:
        """List the node's shards in JSON mode through frostfs-cli and parse the result."""
        wallet_path = node.get_remote_wallet_path()
        wallet_password = node.get_wallet_password()
        control_endpoint = node.get_control_endpoint()

        cli_config = node.host.get_cli_config("frostfs-cli")

        cli = FrostfsCli(node.host.get_shell(), cli_config.exec_path, DEFAULT_WALLET_CONFIG)
        result = cli.shards.list(
            endpoint=control_endpoint,
            wallet=wallet_path,
            wallet_password=wallet_password,
            json_mode=True,
            timeout=CLI_DEFAULT_TIMEOUT,
        )

        # Everything up to the first ">" is discarded (presumably a password prompt
        # echoed before the JSON). When there is no ">" the whole output is parsed
        # instead of failing with IndexError.
        _, separator, after_prompt = result.stdout.partition(">")
        payload = after_prompt if separator else result.stdout
        return [Shard.from_object(shard) for shard in json.loads(payload)]

    @allure.title("All shards are available")
    def test_control_shard(self, cluster: Cluster):
        """For every storage node, the configured shards must equal the shards the CLI reports."""
        for storage_node in cluster.storage_nodes:
            shards_from_config = self.get_shards_from_config(storage_node)
            shards_from_cli = self.get_shards_from_cli(storage_node)
            assert set(shards_from_config) == set(shards_from_cli)
|