import json
from typing import Any

from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.testing.test_control import wait_for_success


class ShardsWatcher:
    """Track the shards of one storage node through a history of snapshots.

    A snapshot maps ``shard_id`` to the shard description returned by the
    ``frostfs-cli`` shards listing. Snapshots are appended to
    ``shards_snapshots`` so the two most recent ones can be compared to
    detect shards whose ``error_count`` has grown.

    The ``await_*`` methods use plain ``assert`` statements on purpose: they
    are wrapped in ``wait_for_success``, which presumably retries the call
    until the assertions stop failing (arguments look like timeout and
    interval in seconds -- TODO confirm against ``wait_for_success``).
    """

    # Declared here for type checkers only; the list itself is created per
    # instance in ``__init__`` so that watchers never share snapshot history.
    shards_snapshots: list[dict[str, Any]]

    def __init__(self, node_under_test: ClusterNode) -> None:
        """Bind to ``node_under_test``'s storage node and take a first snapshot."""
        # A class-level ``[]`` would be shared by every watcher, mixing the
        # snapshots of different nodes; keep the history on the instance.
        self.shards_snapshots = []
        self.storage_node = node_under_test.storage_node
        self.take_shards_snapshot()

    def take_shards_snapshot(self) -> None:
        """Query the node and append the resulting snapshot to the history."""
        self.shards_snapshots.append(self.get_shards_snapshot())

    def get_shards_snapshot(self) -> dict[str, Any]:
        """Return the current shards keyed by their ``shard_id``.

        The result is not stored in ``shards_snapshots``.
        """
        return {shard["shard_id"]: shard for shard in self.get_shards()}

    def _get_current_snapshot(self) -> dict[str, Any]:
        """Return the most recently stored snapshot."""
        return self.shards_snapshots[-1]

    def _get_previous_snapshot(self) -> dict[str, Any]:
        """Return the snapshot stored before the current one.

        Raises:
            IndexError: if fewer than two snapshots have been taken.
        """
        return self.shards_snapshots[-2]

    def _is_shard_present(self, shard_id: str) -> bool:
        """Tell whether ``shard_id`` is in the most recently stored snapshot."""
        return shard_id in self._get_current_snapshot()

    def get_shards_with_new_errors(self) -> dict[str, Any]:
        """Return shards whose ``error_count`` grew between the last two snapshots.

        Only stored snapshots are compared; no new snapshot is taken here.

        Raises:
            IndexError: if fewer than two snapshots have been taken.
            KeyError: if a shard of the previous snapshot is absent from the
                current one.
        """
        current_snapshot = self._get_current_snapshot()
        previous_snapshot = self._get_previous_snapshot()
        return {
            shard_id: current_snapshot[shard_id]
            for shard_id, shard in previous_snapshot.items()
            if current_snapshot[shard_id]["error_count"] > shard["error_count"]
        }

    def get_shards_with_errors(self) -> dict[str, Any]:
        """Return shards of a fresh snapshot whose ``error_count`` is positive."""
        snapshot = self.get_shards_snapshot()
        return {
            shard_id: shard
            for shard_id, shard in snapshot.items()
            if shard["error_count"] > 0
        }

    def get_shard_status(self, shard_id: str):
        """Return the ``mode`` of ``shard_id`` from a fresh snapshot.

        Raises:
            AssertionError: if the shard is not present in the snapshot.
        """
        snapshot = self.get_shards_snapshot()
        assert shard_id in snapshot, f"Shard {shard_id} is missing: {snapshot}"
        return snapshot[shard_id]["mode"]

    @wait_for_success(60, 2)
    def await_for_all_shards_status(self, status: str) -> None:
        """Assert that every shard of a fresh snapshot has mode ``status``."""
        snapshot = self.get_shards_snapshot()
        for shard_id, shard in snapshot.items():
            assert shard["mode"] == status, f"Shard {shard_id} have wrong shard status"

    @wait_for_success(60, 2)
    def await_for_shard_status(self, shard_id: str, status: str) -> None:
        """Assert that ``shard_id`` currently has mode ``status``."""
        assert self.get_shard_status(shard_id) == status

    @wait_for_success(60, 2)
    def await_for_shard_have_new_errors(self, shard_id: str) -> None:
        """Take a snapshot and assert ``shard_id`` gained errors since the previous one."""
        self.take_shards_snapshot()
        assert self._is_shard_present(shard_id)
        shards_with_new_errors = self.get_shards_with_new_errors()
        assert (
            shard_id in shards_with_new_errors
        ), f"Expected shard {shard_id} to have new errors, but haven't {self.shards_snapshots[-1]}"

    @wait_for_success(300, 5)
    def await_for_shards_have_no_new_errors(self) -> None:
        """Take a snapshot and assert no shard gained errors since the previous one."""
        self.take_shards_snapshot()
        shards_with_new_errors = self.get_shards_with_new_errors()
        assert not shards_with_new_errors

    def _shards_cli(self) -> FrostfsCliShards:
        """Build a shards CLI wrapper using the storage node's host shell."""
        host = self.storage_node.host
        return FrostfsCliShards(
            host.get_shell(),
            host.get_cli_config("frostfs-cli").exec_path,
        )

    def get_shards(self) -> list[dict[str, Any]]:
        """List the node's shards through ``frostfs-cli`` in JSON mode.

        Returns:
            The decoded JSON payload. Callers in this class iterate it as a
            sequence of shard dicts that carry a ``shard_id`` key.
        """
        response = self._shards_cli().list(
            endpoint=self.storage_node.get_control_endpoint(),
            wallet=self.storage_node.get_remote_wallet_path(),
            wallet_password=self.storage_node.get_wallet_password(),
            json_mode=True,
        )
        # Everything up to the first ">" is dropped (presumably an interactive
        # prompt echoed before the JSON -- TODO confirm). When no ">" is
        # present, parse the whole output rather than fail with an IndexError.
        _, separator, payload = response.stdout.partition(">")
        return json.loads(payload if separator else response.stdout)

    def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True):
        """Switch ``shard_id`` to ``mode`` through ``frostfs-cli``.

        Args:
            shard_id: Identifier of the shard to change.
            mode: Target mode, passed to the CLI unchanged.
            clear_errors: Forwarded to the CLI's ``clear_errors`` option.

        Returns:
            Whatever ``FrostfsCliShards.set_mode`` returns.
        """
        return self._shards_cli().set_mode(
            self.storage_node.get_control_endpoint(),
            self.storage_node.get_remote_wallet_path(),
            self.storage_node.get_wallet_password(),
            mode=mode,
            id=[shard_id],
            clear_errors=clear_errors,
        )