import json from typing import Optional from frostfs_testlib import reporter from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, Interfaces, NodeNetmapInfo from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils.cli_utils import parse_netmap_output class ChunksOperations(interfaces.ChunksInterface): def __init__(self, cli: FrostfsCli) -> None: self.cli = cli @reporter.step("Search node without chunks") def search_node_without_chunks(self, chunks: list[Chunk], cluster: Cluster, endpoint: str = None) -> list[ClusterNode]: if not endpoint: endpoint = cluster.default_rpc_endpoint netmap = parse_netmap_output(self.cli.netmap.snapshot(endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout) chunks_node_key = [] for chunk in chunks: chunks_node_key.extend(chunk.confirmed_nodes) for node_info in netmap.copy(): if node_info.node_id in chunks_node_key and node_info in netmap: netmap.remove(node_info) result = [] for node_info in netmap: for cluster_node in cluster.cluster_nodes: if node_info.node == cluster_node.get_interface(Interfaces.MGMT): result.append(cluster_node) return result @reporter.step("Search node with chunk {chunk}") def get_chunk_node(self, cluster: Cluster, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]: netmap = parse_netmap_output(self.cli.netmap.snapshot(cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout) for node_info in netmap: if node_info.node_id in chunk.confirmed_nodes: for cluster_node in cluster.cluster_nodes: if cluster_node.get_interface(Interfaces.MGMT) == node_info.node: return (cluster_node, node_info) @wait_for_success(300, 5, fail_testcase=None) @reporter.step("Search shard with chunk {chunk}") def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str: oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}" node_shell = node.storage_node.host.get_shell() shards_watcher = ShardsWatcher(node) with reporter.step("Search object file"): for shard_id, shard_info in shards_watcher.shards_snapshots[-1].items(): check_dir = node_shell.exec(f" [ -d {shard_info['blobstor'][1]['path']}/{oid_path} ] && echo 1 || echo 0").stdout if "1" in check_dir.strip(): return shard_id @reporter.step("Get all chunks") def get_all( self, rpc_endpoint: str, cid: str, oid: str, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, trace: bool = True, root: bool = False, verify_presence_all: bool = False, json: bool = True, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> list[Chunk]: object_nodes = self.cli.object.nodes( rpc_endpoint=rpc_endpoint, cid=cid, address=address, bearer=bearer, generate_key=generate_key, oid=oid, trace=trace, root=root, verify_presence_all=verify_presence_all, json=json, ttl=ttl, xhdr=xhdr, timeout=timeout, ) return self._parse_object_nodes(object_nodes.stdout.split("\n")[0]) @reporter.step("Get last parity chunk") def get_parity( self, rpc_endpoint: str, cid: str, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, oid: Optional[str] = None, trace: bool = True, root: bool = False, verify_presence_all: bool = False, json: bool = True, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> Chunk: object_nodes = self.cli.object.nodes( rpc_endpoint=rpc_endpoint, cid=cid, address=address, bearer=bearer, generate_key=generate_key, oid=oid, trace=trace, root=root, verify_presence_all=verify_presence_all, json=json, ttl=ttl, xhdr=xhdr, timeout=timeout, ) return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[-1] @reporter.step("Get first data chunk") def get_first_data( self, rpc_endpoint: str, cid: str, oid: Optional[str] = None, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, trace: bool = True, root: bool = False, verify_presence_all: bool = False, json: bool = True, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> Chunk: object_nodes = self.cli.object.nodes( rpc_endpoint=rpc_endpoint, cid=cid, address=address, bearer=bearer, generate_key=generate_key, oid=oid, trace=trace, root=root, verify_presence_all=verify_presence_all, json=json, ttl=ttl, xhdr=xhdr, timeout=timeout, ) return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0] def _parse_object_nodes(self, object_nodes: str) -> list[Chunk]: parse_result = json.loads(object_nodes) if parse_result.get("errors"): raise RuntimeError(", ".join(parse_result["errors"])) return [Chunk(**chunk) for chunk in parse_result["data_objects"]]