diff --git a/src/frostfs_testlib/cli/frostfs_cli/shards.py b/src/frostfs_testlib/cli/frostfs_cli/shards.py index 4399b13..e88707a 100644 --- a/src/frostfs_testlib/cli/frostfs_cli/shards.py +++ b/src/frostfs_testlib/cli/frostfs_cli/shards.py @@ -143,3 +143,101 @@ class FrostfsCliShards(CliCommand): **{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]}, ) + def evacuation_start( + self, + endpoint: str, + id: Optional[str] = None, + scope: Optional[str] = None, + all: bool = False, + no_errors: bool = True, + await_mode: bool = False, + address: Optional[str] = None, + timeout: Optional[str] = None, + no_progress: bool = False, + ) -> CommandResult: + """ + Objects evacuation from shard to other shards. + + Args: + address: Address of wallet account + all: Process all shards + await: Block execution until evacuation is completed + endpoint: Remote node control address (as 'multiaddr' or ':') + id: List of shard IDs in base58 encoding + no_errors: Skip invalid/unreadable objects (default true) + no_progress: Print progress if await provided + scope: Evacuation scope; possible values: trees, objects, all (default "all") + timeout: Timeout for an operation (default 15s) + + Returns: + Command's result. + """ + return self._execute( + "control shards evacuation start", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) + + def evacuation_reset( + self, + endpoint: str, + address: Optional[str] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """ + Reset evacuate objects from shard to other shards status. + + Args: + address: Address of wallet account + endpoint: Remote node control address (as 'multiaddr' or ':') + timeout: Timeout for an operation (default 15s) + Returns: + Command's result. + """ + return self._execute( + "control shards evacuation reset", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) + + def evacuation_stop( + self, + endpoint: str, + address: Optional[str] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """ + Stop running evacuate process from shard to other shards. + + Args: + address: Address of wallet account + endpoint: Remote node control address (as 'multiaddr' or ':') + timeout: Timeout for an operation (default 15s) + + Returns: + Command's result. + """ + return self._execute( + "control shards evacuation stop", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) + + def evacuation_status( + self, + endpoint: str, + address: Optional[str] = None, + timeout: Optional[str] = None, + ) -> CommandResult: + """ + Get evacuate objects from shard to other shards status. + + Args: + address: Address of wallet account + endpoint: Remote node control address (as 'multiaddr' or ':') + timeout: Timeout for an operation (default 15s) + + Returns: + Command's result. + """ + return self._execute( + "control shards evacuation status", + **{param: value for param, value in locals().items() if param not in ["self"]}, + ) diff --git a/src/frostfs_testlib/storage/controllers/shards_watcher.py b/src/frostfs_testlib/storage/controllers/shards_watcher.py index 3d313f1..5017406 100644 --- a/src/frostfs_testlib/storage/controllers/shards_watcher.py +++ b/src/frostfs_testlib/storage/controllers/shards_watcher.py @@ -2,22 +2,22 @@ import json from typing import Any from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards +from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.testing.test_control import wait_for_success class ShardsWatcher: - shards_snapshots: list[dict[str, Any]] = [] - def __init__(self, node_under_test: ClusterNode) -> None: + self.shards_snapshots: list[dict[str, Any]] = [] self.storage_node = node_under_test.storage_node self.take_shards_snapshot() - def take_shards_snapshot(self): + def take_shards_snapshot(self) -> None: snapshot = self.get_shards_snapshot() self.shards_snapshots.append(snapshot) - def get_shards_snapshot(self): + def get_shards_snapshot(self) -> dict[str, Any]: shards_snapshot: dict[str, Any] = {} shards = self.get_shards() @@ -26,17 +26,17 @@ class ShardsWatcher: return shards_snapshot - def _get_current_snapshot(self): + def _get_current_snapshot(self) -> dict[str, Any]: return self.shards_snapshots[-1] - def _get_previous_snapshot(self): + def _get_previous_snapshot(self) -> dict[str, Any]: return self.shards_snapshots[-2] - def _is_shard_present(self, shard_id): + def _is_shard_present(self, shard_id) -> bool: snapshot = self._get_current_snapshot() return shard_id in snapshot - def get_shards_with_new_errors(self): + def get_shards_with_new_errors(self) -> dict[str, Any]: current_snapshot = self._get_current_snapshot() previous_snapshot = self._get_previous_snapshot() shards_with_new_errors: dict[str, Any] = {} @@ -46,7 +46,7 @@ class ShardsWatcher: return shards_with_new_errors - def get_shards_with_errors(self): + def get_shards_with_errors(self) -> dict[str, Any]: snapshot = self.get_shards_snapshot() shards_with_errors: dict[str, Any] = {} for shard_id, shard in snapshot.items(): @@ -55,7 +55,7 @@ class ShardsWatcher: return shards_with_errors - def get_shard_status(self, shard_id: str): + def get_shard_status(self, shard_id: str): # -> Any: snapshot = self.get_shards_snapshot() assert shard_id in snapshot, f"Shard {shard_id} is missing: {snapshot}" @@ -63,18 +63,18 @@ class ShardsWatcher: return snapshot[shard_id]["mode"] @wait_for_success(60, 2) - def await_for_all_shards_status(self, status: str): + def await_for_all_shards_status(self, status: str) -> None: snapshot = self.get_shards_snapshot() for shard_id in snapshot: assert snapshot[shard_id]["mode"] == status, f"Shard {shard_id} have wrong shard status" @wait_for_success(60, 2) - def await_for_shard_status(self, shard_id: str, status: str): + def await_for_shard_status(self, shard_id: str, status: str) -> None: assert self.get_shard_status(shard_id) == status @wait_for_success(60, 2) - def await_for_shard_have_new_errors(self, shard_id: str): + def await_for_shard_have_new_errors(self, shard_id: str) -> None: self.take_shards_snapshot() assert self._is_shard_present(shard_id) shards_with_new_errors = self.get_shards_with_new_errors() @@ -82,7 +82,7 @@ class ShardsWatcher: assert shard_id in shards_with_new_errors, f"Expected shard {shard_id} to have new errors, but haven't {self.shards_snapshots[-1]}" @wait_for_success(300, 5) - def await_for_shards_have_no_new_errors(self): + def await_for_shards_have_no_new_errors(self) -> None: self.take_shards_snapshot() shards_with_new_errors = self.get_shards_with_new_errors() assert len(shards_with_new_errors) == 0 @@ -102,7 +102,7 @@ class ShardsWatcher: return json.loads(response.stdout.split(">", 1)[1]) - def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True): + def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True) -> CommandResult: shards_cli = FrostfsCliShards( self.storage_node.host.get_shell(), self.storage_node.host.get_cli_config("frostfs-cli").exec_path,