import json as module_json
from typing import List, Optional

from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo

from .. import interfaces


class NetmapOperations(interfaces.NetmapInterface):
    """
    Netmap commands of the FrostFS CLI with their stdout converted to Python values.

    Every public method forwards its keyword arguments unchanged to the matching
    ``self.cli.netmap.<command>`` call, drops everything from the first
    "Trace ID" marker onwards in the command's stdout, and hands the remaining
    text to a parser.
    """

    def __init__(self, cli: FrostfsCli) -> None:
        """
        Args:
            cli: CLI wrapper whose ``netmap`` attribute is used to run the commands.
        """
        self.cli = cli

    @staticmethod
    def _payload(stdout: str) -> str:
        """
        Return the part of ``stdout`` that precedes the first "Trace ID" marker.

        Surrounding whitespace is stripped. When the marker is absent the whole
        text is returned (stripped), because ``str.split`` then yields a single item.
        """
        # Presumably the CLI appends a "Trace ID: ..." line when tracing is on — TODO confirm.
        return stdout.split("Trace ID")[0].strip()

    def epoch(
        self,
        rpc_endpoint: str,
        wallet: Optional[str] = None,
        address: Optional[str] = None,
        generate_key: bool = False,
        ttl: Optional[int] = None,
        trace: Optional[bool] = True,
        xhdr: Optional[dict] = None,
        timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
    ) -> int:
        """
        Get current epoch number.

        Runs ``netmap epoch`` through the CLI and converts its output to ``int``.

        Returns:
            The command output (without the trace suffix) parsed as an integer.

        Raises:
            ValueError: if the remaining output is not a valid integer literal.
        """
        result = self.cli.netmap.epoch(
            rpc_endpoint=rpc_endpoint,
            wallet=wallet,
            address=address,
            generate_key=generate_key,
            ttl=ttl,
            trace=trace,
            xhdr=xhdr,
            timeout=timeout,
        )
        return int(self._payload(result.stdout))

    def netinfo(
        self,
        rpc_endpoint: str,
        wallet: Optional[str] = None,
        address: Optional[str] = None,
        generate_key: bool = False,
        ttl: Optional[int] = None,
        trace: Optional[bool] = True,
        xhdr: Optional[dict] = None,
        # NOTE(review): the sibling methods default to CLI_DEFAULT_TIMEOUT; confirm None is intended here.
        timeout: Optional[str] = None,
    ) -> NodeNetInfo:
        """
        Get network info as reported by the target node.

        Runs ``netmap netinfo`` through the CLI and parses the text output with
        ``NetmapParser.netinfo``.

        Returns:
            Whatever ``NetmapParser.netinfo`` produces for the command output.
        """
        result = self.cli.netmap.netinfo(
            rpc_endpoint=rpc_endpoint,
            wallet=wallet,
            address=address,
            generate_key=generate_key,
            ttl=ttl,
            trace=trace,
            xhdr=xhdr,
            timeout=timeout,
        )
        return NetmapParser.netinfo(self._payload(result.stdout))

    def nodeinfo(
        self,
        rpc_endpoint: str,
        wallet: Optional[str] = None,
        address: Optional[str] = None,
        generate_key: bool = False,
        json: bool = True,
        ttl: Optional[int] = None,
        trace: Optional[bool] = True,
        xhdr: Optional[dict] = None,
        timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
    ) -> NodeNetmapInfo:
        """
        Get target node info.

        Runs ``netmap nodeinfo`` through the CLI, decodes the output as JSON and
        passes the decoded object to ``NetmapParser.node_info``.

        Args:
            json: forwarded to the CLI. The output is always decoded as JSON
                regardless of this flag, so passing ``False`` presumably makes
                decoding fail — verify against the CLI before relying on it.

        Returns:
            Whatever ``NetmapParser.node_info`` produces for the decoded output.

        Raises:
            json.JSONDecodeError: if the remaining output is not valid JSON.
        """
        result = self.cli.netmap.nodeinfo(
            rpc_endpoint=rpc_endpoint,
            wallet=wallet,
            address=address,
            generate_key=generate_key,
            json=json,
            ttl=ttl,
            trace=trace,
            xhdr=xhdr,
            timeout=timeout,
        )
        # The ``json`` parameter shadows the stdlib module inside this method,
        # hence the ``module_json`` alias used for decoding.
        decoded = module_json.loads(self._payload(result.stdout))
        return NetmapParser.node_info(decoded)

    def snapshot(
        self,
        rpc_endpoint: str,
        wallet: Optional[str] = None,
        address: Optional[str] = None,
        generate_key: bool = False,
        ttl: Optional[int] = None,
        trace: Optional[bool] = True,
        xhdr: Optional[dict] = None,
        timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
    ) -> List[NodeNetmapInfo]:
        """
        Get info about all nodes listed in the netmap snapshot.

        Runs ``netmap snapshot`` through the CLI and parses the text output with
        ``NetmapParser.snapshot_all_nodes``.

        Returns:
            Whatever ``NetmapParser.snapshot_all_nodes`` produces for the command output.
        """
        result = self.cli.netmap.snapshot(
            rpc_endpoint=rpc_endpoint,
            wallet=wallet,
            address=address,
            generate_key=generate_key,
            ttl=ttl,
            trace=trace,
            xhdr=xhdr,
            timeout=timeout,
        )
        return NetmapParser.snapshot_all_nodes(self._payload(result.stdout))

    def snapshot_one_node(
        self,
        rpc_endpoint: str,
        wallet: Optional[str] = None,
        address: Optional[str] = None,
        generate_key: bool = False,
        ttl: Optional[int] = None,
        trace: Optional[bool] = True,
        xhdr: Optional[dict] = None,
        timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
    ) -> List[NodeNetmapInfo]:
        """
        Get target one node info from the netmap snapshot.

        Runs the same ``netmap snapshot`` command as ``snapshot`` and passes the
        output together with ``rpc_endpoint`` to ``NetmapParser.snapshot_one_node``.

        Returns:
            Whatever ``NetmapParser.snapshot_one_node`` produces.
            NOTE(review): the ``List[...]`` annotation is identical to
            ``snapshot``'s — confirm it matches the parser's actual return type.
        """
        result = self.cli.netmap.snapshot(
            rpc_endpoint=rpc_endpoint,
            wallet=wallet,
            address=address,
            generate_key=generate_key,
            ttl=ttl,
            trace=trace,
            xhdr=xhdr,
            timeout=timeout,
        )
        return NetmapParser.snapshot_one_node(self._payload(result.stdout), rpc_endpoint)