[#284] Add container operational in CliWrapper
Some checks reported warnings
DCO action / DCO (pull_request) Has been cancelled

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2024-08-28 12:12:05 +03:00
parent 0caca54e36
commit 85c2707ec8
3 changed files with 377 additions and 38 deletions

View file

@ -1,11 +1,16 @@
import json
import logging
from typing import Optional
import re
from typing import List, Optional, Union
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.storage.constants import PlacementRule
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.utils import json_utils
logger = logging.getLogger("NeoLogger")
@ -18,13 +23,22 @@ class ContainerOperations(interfaces.ContainerInterface):
def create(
self,
endpoint: str,
rule: str = PlacementRule.DEFAULT_PLACEMENT_RULE,
basic_acl: str = "",
nns_zone: Optional[str] = None,
nns_name: Optional[str] = None,
address: Optional[str] = None,
attributes: Optional[dict] = None,
session_token: str = "",
basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None,
options: Optional[dict] = None,
await_mode: bool = True,
nonce: Optional[str] = None,
policy: Optional[str] = None,
session: Optional[str] = None,
subnet: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
@ -54,14 +68,23 @@ class ContainerOperations(interfaces.ContainerInterface):
"""
result = self.cli.container.create(
rpc_endpoint=endpoint,
policy=rule,
basic_acl=basic_acl,
policy=policy,
nns_zone=nns_zone,
nns_name=nns_name,
address=address,
attributes=attributes,
name=name,
session=session_token,
basic_acl=basic_acl,
await_mode=await_mode,
disable_timestamp=disable_timestamp,
force=force,
trace=trace,
name=name,
nonce=nonce,
session=session,
subnet=subnet,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
**options or {},
)
cid = self._parse_cid(result.stdout)
@ -71,21 +94,215 @@ class ContainerOperations(interfaces.ContainerInterface):
return cid
@reporter.step("List Containers")
def list(self, endpoint: str, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT) -> list[str]:
def list(
self,
endpoint: str,
name: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
**params,
) -> List[str]:
"""
A wrapper for `frostfs-cli container list` call. It returns all the
available containers for the given wallet.
Args:
wallet (WalletInfo): a wallet on whose behalf we list the containers
shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
timeout: Timeout for the operation.
Returns:
(list): list of containers
"""
result = self.cli.container.list(rpc_endpoint=endpoint, timeout=timeout)
result = self.cli.container.list(
rpc_endpoint=endpoint,
name=name,
address=address,
generate_key=generate_key,
owner=owner,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
**params,
)
return result.stdout.split()
@reporter.step("List Objects in container")
def list_objects(
self,
endpoint: str,
cid: str,
bearer: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> List[str]:
"""
A wrapper for `frostfs-cli container list-objects` call. It returns all the
available objects in container.
Args:
container_id: cid of container
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
timeout: Timeout for the operation.
Returns:
(list): list of containers
"""
result = self.cli.container.list_objects(
rpc_endpoint=endpoint,
cid=cid,
bearer=bearer,
wallet=wallet,
address=address,
generate_key=generate_key,
trace=trace,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
logger.info(f"Container objects: \n{result}")
return result.stdout.split()
@reporter.step("Delete container")
def delete(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
trace: bool = False,
):
try:
return self.cli.container.delete(
rpc_endpoint=endpoint,
cid=cid,
address=address,
await_mode=await_mode,
session=session,
ttl=ttl,
xhdr=xhdr,
force=force,
trace=trace,
).stdout
except RuntimeError as e:
print(f"Error request:\n{e}")
@reporter.step("Get container")
def get(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = True,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> Union[dict, str]:
result = self.cli.container.get(
rpc_endpoint=endpoint,
cid=cid,
address=address,
generate_key=generate_key,
await_mode=await_mode,
to=to,
json_mode=json_mode,
trace=trace,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
container_info = json.loads(result.stdout)
attributes = dict()
for attr in container_info["attributes"]:
attributes[attr["key"]] = attr["value"]
container_info["attributes"] = attributes
container_info["ownerID"] = json_utils.json_reencode(container_info["ownerID"]["value"])
return container_info
@reporter.step("Get eacl container")
def get_eacl(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
json_mode: bool = True,
trace: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
return self.cli.container.get_eacl(
rpc_endpoint=endpoint,
cid=cid,
address=address,
generate_key=generate_key,
await_mode=await_mode,
to=to,
session=session,
ttl=ttl,
xhdr=xhdr,
timeout=CLI_DEFAULT_TIMEOUT,
).stdout
@reporter.step("Get nodes container")
def nodes(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> List[ClusterNode]:
result = self.cli.container.search_node(
rpc_endpoint=endpoint,
cid=cid,
address=address,
ttl=ttl,
from_file=from_file,
trace=trace,
short=short,
xhdr=xhdr,
generate_key=generate_key,
timeout=timeout,
).stdout
pattern = r"[0-9]+(?:\.[0-9]+){3}"
nodes_ip = list(set(re.findall(pattern, result)))
with reporter.step(f"nodes ips = {nodes_ip}"):
nodes_list = cluster.get_nodes_by_ip(nodes_ip)
with reporter.step(f"Return nodes - {nodes_list}"):
return nodes_list
@reporter.step("Resolve container by name")
def resolve_container_by_name(name: str, node: ClusterNode):
resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver.resolve(node, name)
def _parse_cid(self, output: str) -> str:
"""
Parses container ID from a given CLI output. The input string we expect: