MInor change for shard #277
2 changed files with 113 additions and 15 deletions
|
@ -143,3 +143,101 @@ class FrostfsCliShards(CliCommand):
|
||||||
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
|
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def evacuation_start(
|
||||||
|
self,
|
||||||
|
endpoint: str,
|
||||||
|
id: Optional[str] = None,
|
||||||
|
scope: Optional[str] = None,
|
||||||
|
all: bool = False,
|
||||||
|
no_errors: bool = True,
|
||||||
|
await_mode: bool = False,
|
||||||
|
address: Optional[str] = None,
|
||||||
|
timeout: Optional[str] = None,
|
||||||
|
no_progress: bool = False,
|
||||||
|
) -> CommandResult:
|
||||||
|
"""
|
||||||
|
Objects evacuation from shard to other shards.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
address: Address of wallet account
|
||||||
|
all: Process all shards
|
||||||
|
await: Block execution until evacuation is completed
|
||||||
|
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||||
|
id: List of shard IDs in base58 encoding
|
||||||
|
no_errors: Skip invalid/unreadable objects (default true)
|
||||||
|
no_progress: Print progress if await provided
|
||||||
|
scope: Evacuation scope; possible values: trees, objects, all (default "all")
|
||||||
|
timeout: Timeout for an operation (default 15s)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Command's result.
|
||||||
|
"""
|
||||||
|
return self._execute(
|
||||||
|
"control shards evacuation start",
|
||||||
|
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||||
|
)
|
||||||
|
|
||||||
|
def evacuation_reset(
|
||||||
|
self,
|
||||||
|
endpoint: str,
|
||||||
|
address: Optional[str] = None,
|
||||||
|
timeout: Optional[str] = None,
|
||||||
|
) -> CommandResult:
|
||||||
|
"""
|
||||||
|
Reset evacuate objects from shard to other shards status.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
address: Address of wallet account
|
||||||
|
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||||
|
timeout: Timeout for an operation (default 15s)
|
||||||
|
Returns:
|
||||||
|
Command's result.
|
||||||
|
"""
|
||||||
|
return self._execute(
|
||||||
|
"control shards evacuation reset",
|
||||||
|
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||||
|
)
|
||||||
|
|
||||||
|
def evacuation_stop(
|
||||||
|
self,
|
||||||
|
endpoint: str,
|
||||||
|
address: Optional[str] = None,
|
||||||
|
timeout: Optional[str] = None,
|
||||||
|
) -> CommandResult:
|
||||||
|
"""
|
||||||
|
Stop running evacuate process from shard to other shards.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
address: Address of wallet account
|
||||||
|
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||||
|
timeout: Timeout for an operation (default 15s)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Command's result.
|
||||||
|
"""
|
||||||
|
return self._execute(
|
||||||
|
"control shards evacuation stop",
|
||||||
|
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||||
|
)
|
||||||
|
|
||||||
|
def evacuation_status(
|
||||||
|
self,
|
||||||
|
endpoint: str,
|
||||||
|
address: Optional[str] = None,
|
||||||
|
timeout: Optional[str] = None,
|
||||||
|
) -> CommandResult:
|
||||||
|
"""
|
||||||
|
Get evacuate objects from shard to other shards status.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
address: Address of wallet account
|
||||||
|
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||||
|
timeout: Timeout for an operation (default 15s)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Command's result.
|
||||||
|
"""
|
||||||
|
return self._execute(
|
||||||
|
"control shards evacuation status",
|
||||||
|
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||||
|
)
|
||||||
|
|
|
@ -2,22 +2,22 @@ import json
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards
|
from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards
|
||||||
|
from frostfs_testlib.shell.interfaces import CommandResult
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
from frostfs_testlib.testing.test_control import wait_for_success
|
from frostfs_testlib.testing.test_control import wait_for_success
|
||||||
|
|
||||||
|
|
||||||
class ShardsWatcher:
|
class ShardsWatcher:
|
||||||
shards_snapshots: list[dict[str, Any]] = []
|
|
||||||
|
|
||||||
def __init__(self, node_under_test: ClusterNode) -> None:
|
def __init__(self, node_under_test: ClusterNode) -> None:
|
||||||
|
self.shards_snapshots: list[dict[str, Any]] = []
|
||||||
self.storage_node = node_under_test.storage_node
|
self.storage_node = node_under_test.storage_node
|
||||||
self.take_shards_snapshot()
|
self.take_shards_snapshot()
|
||||||
|
|
||||||
def take_shards_snapshot(self):
|
def take_shards_snapshot(self) -> None:
|
||||||
snapshot = self.get_shards_snapshot()
|
snapshot = self.get_shards_snapshot()
|
||||||
self.shards_snapshots.append(snapshot)
|
self.shards_snapshots.append(snapshot)
|
||||||
|
|
||||||
def get_shards_snapshot(self):
|
def get_shards_snapshot(self) -> dict[str, Any]:
|
||||||
shards_snapshot: dict[str, Any] = {}
|
shards_snapshot: dict[str, Any] = {}
|
||||||
|
|
||||||
shards = self.get_shards()
|
shards = self.get_shards()
|
||||||
|
@ -26,17 +26,17 @@ class ShardsWatcher:
|
||||||
|
|
||||||
return shards_snapshot
|
return shards_snapshot
|
||||||
|
|
||||||
def _get_current_snapshot(self):
|
def _get_current_snapshot(self) -> dict[str, Any]:
|
||||||
return self.shards_snapshots[-1]
|
return self.shards_snapshots[-1]
|
||||||
|
|
||||||
def _get_previous_snapshot(self):
|
def _get_previous_snapshot(self) -> dict[str, Any]:
|
||||||
return self.shards_snapshots[-2]
|
return self.shards_snapshots[-2]
|
||||||
|
|
||||||
def _is_shard_present(self, shard_id):
|
def _is_shard_present(self, shard_id) -> bool:
|
||||||
snapshot = self._get_current_snapshot()
|
snapshot = self._get_current_snapshot()
|
||||||
return shard_id in snapshot
|
return shard_id in snapshot
|
||||||
|
|
||||||
def get_shards_with_new_errors(self):
|
def get_shards_with_new_errors(self) -> dict[str, Any]:
|
||||||
current_snapshot = self._get_current_snapshot()
|
current_snapshot = self._get_current_snapshot()
|
||||||
previous_snapshot = self._get_previous_snapshot()
|
previous_snapshot = self._get_previous_snapshot()
|
||||||
shards_with_new_errors: dict[str, Any] = {}
|
shards_with_new_errors: dict[str, Any] = {}
|
||||||
|
@ -46,7 +46,7 @@ class ShardsWatcher:
|
||||||
|
|
||||||
return shards_with_new_errors
|
return shards_with_new_errors
|
||||||
|
|
||||||
def get_shards_with_errors(self):
|
def get_shards_with_errors(self) -> dict[str, Any]:
|
||||||
snapshot = self.get_shards_snapshot()
|
snapshot = self.get_shards_snapshot()
|
||||||
shards_with_errors: dict[str, Any] = {}
|
shards_with_errors: dict[str, Any] = {}
|
||||||
for shard_id, shard in snapshot.items():
|
for shard_id, shard in snapshot.items():
|
||||||
|
@ -55,7 +55,7 @@ class ShardsWatcher:
|
||||||
|
|
||||||
return shards_with_errors
|
return shards_with_errors
|
||||||
|
|
||||||
def get_shard_status(self, shard_id: str):
|
def get_shard_status(self, shard_id: str): # -> Any:
|
||||||
snapshot = self.get_shards_snapshot()
|
snapshot = self.get_shards_snapshot()
|
||||||
|
|
||||||
assert shard_id in snapshot, f"Shard {shard_id} is missing: {snapshot}"
|
assert shard_id in snapshot, f"Shard {shard_id} is missing: {snapshot}"
|
||||||
|
@ -63,18 +63,18 @@ class ShardsWatcher:
|
||||||
return snapshot[shard_id]["mode"]
|
return snapshot[shard_id]["mode"]
|
||||||
|
|
||||||
@wait_for_success(60, 2)
|
@wait_for_success(60, 2)
|
||||||
def await_for_all_shards_status(self, status: str):
|
def await_for_all_shards_status(self, status: str) -> None:
|
||||||
snapshot = self.get_shards_snapshot()
|
snapshot = self.get_shards_snapshot()
|
||||||
|
|
||||||
for shard_id in snapshot:
|
for shard_id in snapshot:
|
||||||
assert snapshot[shard_id]["mode"] == status, f"Shard {shard_id} have wrong shard status"
|
assert snapshot[shard_id]["mode"] == status, f"Shard {shard_id} have wrong shard status"
|
||||||
|
|
||||||
@wait_for_success(60, 2)
|
@wait_for_success(60, 2)
|
||||||
def await_for_shard_status(self, shard_id: str, status: str):
|
def await_for_shard_status(self, shard_id: str, status: str) -> None:
|
||||||
assert self.get_shard_status(shard_id) == status
|
assert self.get_shard_status(shard_id) == status
|
||||||
|
|
||||||
@wait_for_success(60, 2)
|
@wait_for_success(60, 2)
|
||||||
def await_for_shard_have_new_errors(self, shard_id: str):
|
def await_for_shard_have_new_errors(self, shard_id: str) -> None:
|
||||||
self.take_shards_snapshot()
|
self.take_shards_snapshot()
|
||||||
assert self._is_shard_present(shard_id)
|
assert self._is_shard_present(shard_id)
|
||||||
shards_with_new_errors = self.get_shards_with_new_errors()
|
shards_with_new_errors = self.get_shards_with_new_errors()
|
||||||
|
@ -82,7 +82,7 @@ class ShardsWatcher:
|
||||||
assert shard_id in shards_with_new_errors, f"Expected shard {shard_id} to have new errors, but haven't {self.shards_snapshots[-1]}"
|
assert shard_id in shards_with_new_errors, f"Expected shard {shard_id} to have new errors, but haven't {self.shards_snapshots[-1]}"
|
||||||
|
|
||||||
@wait_for_success(300, 5)
|
@wait_for_success(300, 5)
|
||||||
def await_for_shards_have_no_new_errors(self):
|
def await_for_shards_have_no_new_errors(self) -> None:
|
||||||
self.take_shards_snapshot()
|
self.take_shards_snapshot()
|
||||||
shards_with_new_errors = self.get_shards_with_new_errors()
|
shards_with_new_errors = self.get_shards_with_new_errors()
|
||||||
assert len(shards_with_new_errors) == 0
|
assert len(shards_with_new_errors) == 0
|
||||||
|
@ -102,7 +102,7 @@ class ShardsWatcher:
|
||||||
|
|
||||||
return json.loads(response.stdout.split(">", 1)[1])
|
return json.loads(response.stdout.split(">", 1)[1])
|
||||||
|
|
||||||
def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True):
|
def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True) -> CommandResult:
|
||||||
shards_cli = FrostfsCliShards(
|
shards_cli = FrostfsCliShards(
|
||||||
self.storage_node.host.get_shell(),
|
self.storage_node.host.get_shell(),
|
||||||
self.storage_node.host.get_cli_config("frostfs-cli").exec_path,
|
self.storage_node.host.get_cli_config("frostfs-cli").exec_path,
|
||||||
|
|
Loading…
Reference in a new issue