from abc import ABC, abstractmethod from typing import Any, List, Optional from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.constants import PlacementRule from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo from frostfs_testlib.utils import file_utils class ChunksInterface(ABC): @abstractmethod def search_node_without_chunks(self, chunks: list[Chunk], cluster: Cluster, endpoint: str = None) -> list[ClusterNode]: pass @abstractmethod def get_chunk_node(self, cluster: Cluster, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]: pass @abstractmethod def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str: pass @abstractmethod def get_all( self, rpc_endpoint: str, cid: str, oid: str, wallet: Optional[str] = None, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, trace: bool = False, root: bool = False, verify_presence_all: bool = False, json: bool = True, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> list[Chunk]: pass @abstractmethod def get_parity( self, rpc_endpoint: str, cid: str, wallet: Optional[str] = None, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, oid: Optional[str] = None, trace: bool = False, root: bool = False, verify_presence_all: bool = False, json: bool = True, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> Chunk: pass @abstractmethod def get_first_data( self, rpc_endpoint: str, cid: str, wallet: Optional[str] = None, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, oid: Optional[str] = None, trace: bool = False, root: bool = False, verify_presence_all: bool = False, json: bool = True, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> Chunk: pass class ObjectInterface(ABC): def __init__(self) -> None: self.chunks: ChunksInterface @abstractmethod def delete( self, cid: str, oid: str, endpoint: str, bearer: str = "", xhdr: Optional[dict] = None, session: Optional[str] = None, timeout: Optional[str] = None, ) -> str: pass @abstractmethod def get( self, cid: str, oid: str, endpoint: str, bearer: Optional[str] = None, write_object: Optional[str] = None, xhdr: Optional[dict] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = None, ) -> file_utils.TestFile: pass @abstractmethod def get_from_random_node( self, cid: str, oid: str, cluster: Cluster, bearer: Optional[str] = None, write_object: Optional[str] = None, xhdr: Optional[dict] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = None, ) -> str: pass @abstractmethod def hash( self, endpoint: str, cid: str, oid: str, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, range: Optional[str] = None, salt: Optional[str] = None, ttl: Optional[int] = None, session: Optional[str] = None, hash_type: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> str: pass @abstractmethod def head( self, cid: str, oid: str, endpoint: str, bearer: str = "", xhdr: Optional[dict] = None, json_output: bool = True, is_raw: bool = False, is_direct: bool = False, session: Optional[str] = None, timeout: Optional[str] = None, ) -> CommandResult | Any: pass @abstractmethod def lock( self, cid: str, oid: str, endpoint: str, lifetime: Optional[int] = None, expire_at: Optional[int] = None, address: Optional[str] = None, bearer: Optional[str] = None, session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> str: pass @abstractmethod def put( self, path: str, cid: str, endpoint: str, bearer: Optional[str] = None, copies_number: Optional[int] = None, attributes: Optional[dict] = None, xhdr: Optional[dict] = None, expire_at: Optional[int] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = None, ) -> str: pass @abstractmethod def patch( self, cid: str, oid: str, endpoint: str, ranges: Optional[list[str]] = None, payloads: Optional[list[str]] = None, new_attrs: Optional[str] = None, replace_attrs: bool = False, bearer: Optional[str] = None, xhdr: Optional[dict] = None, session: Optional[str] = None, timeout: Optional[str] = None, trace: bool = False, ) -> str: pass @abstractmethod def put_to_random_node( self, path: str, cid: str, cluster: Cluster, bearer: Optional[str] = None, copies_number: Optional[int] = None, attributes: Optional[dict] = None, xhdr: Optional[dict] = None, expire_at: Optional[int] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = None, ) -> str: pass @abstractmethod def range( self, cid: str, oid: str, range_cut: str, endpoint: str, bearer: str = "", xhdr: Optional[dict] = None, session: Optional[str] = None, timeout: Optional[str] = None, ) -> tuple[file_utils.TestFile, bytes]: pass @abstractmethod def search( self, cid: str, endpoint: str, bearer: str = "", oid: Optional[str] = None, filters: Optional[dict] = None, expected_objects_list: Optional[list] = None, xhdr: Optional[dict] = None, session: Optional[str] = None, phy: bool = False, root: bool = False, timeout: Optional[str] = None, address: Optional[str] = None, generate_key: Optional[bool] = None, ttl: Optional[int] = None, ) -> List: pass @abstractmethod def nodes( self, cluster: Cluster, cid: str, oid: str, alive_node: ClusterNode, bearer: str = "", xhdr: Optional[dict] = None, is_direct: bool = False, verify_presence_all: bool = False, timeout: Optional[str] = None, ) -> List[ClusterNode]: pass @abstractmethod def parts( self, cid: str, oid: str, alive_node: ClusterNode, bearer: str = "", xhdr: Optional[dict] = None, is_direct: bool = False, verify_presence_all: bool = False, timeout: Optional[str] = None, ) -> List[str]: pass class ContainerInterface(ABC): @abstractmethod def create( self, endpoint: str, nns_zone: Optional[str] = None, nns_name: Optional[str] = None, address: Optional[str] = None, attributes: Optional[dict] = None, basic_acl: Optional[str] = None, await_mode: bool = False, disable_timestamp: bool = False, force: bool = False, trace: bool = False, name: Optional[str] = None, nonce: Optional[str] = None, policy: Optional[str] = None, session: Optional[str] = None, subnet: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> str: """ Create a new container and register it in the FrostFS. It will be stored in the sidechain when the Inner Ring accepts it. """ raise NotImplementedError("No implemethed method create") @abstractmethod def delete( self, endpoint: str, cid: str, address: Optional[str] = None, await_mode: bool = False, session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, force: bool = False, trace: bool = False, ) -> List[str]: """ Delete an existing container. Only the owner of the container has permission to remove the container. """ raise NotImplementedError("No implemethed method delete") @abstractmethod def get( self, endpoint: str, cid: str, address: Optional[str] = None, generate_key: Optional[bool] = None, await_mode: bool = False, to: Optional[str] = None, json_mode: bool = True, trace: bool = False, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> List[str]: """Get container field info.""" raise NotImplementedError("No implemethed method get") @abstractmethod def get_eacl( self, endpoint: str, cid: str, address: Optional[str] = None, generate_key: Optional[bool] = None, await_mode: bool = False, json_mode: bool = True, trace: bool = False, to: Optional[str] = None, session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, ) -> List[str]: """Get extended ACL table of container.""" raise NotImplementedError("No implemethed method get-eacl") @abstractmethod def list( self, endpoint: str, name: Optional[str] = None, address: Optional[str] = None, generate_key: Optional[bool] = None, trace: bool = False, owner: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = None, **params, ) -> List[str]: """List all created containers.""" raise NotImplementedError("No implemethed method list") @abstractmethod def nodes( self, endpoint: str, cid: str, cluster: Cluster, address: Optional[str] = None, ttl: Optional[int] = None, from_file: Optional[str] = None, trace: bool = False, short: Optional[bool] = True, xhdr: Optional[dict] = None, generate_key: Optional[bool] = None, timeout: Optional[str] = None, ) -> List[ClusterNode]: """Show the nodes participating in the container in the current epoch.""" raise NotImplementedError("No implemethed method nodes") class GrpcClientWrapper(ABC): def __init__(self) -> None: self.object: ObjectInterface self.container: ContainerInterface