Refactor for cluster usage

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2022-12-06 01:31:45 +03:00 committed by abereziny
parent d9e881001e
commit bd05aae585
46 changed files with 3859 additions and 2703 deletions

View file

@ -10,7 +10,7 @@ from typing import Any, Dict, List, Optional, Union
import allure
import base58
from common import ASSETS_DIR, NEOFS_CLI_EXEC, NEOFS_ENDPOINT, WALLET_CONFIG
from common import ASSETS_DIR, NEOFS_CLI_EXEC, WALLET_CONFIG
from data_formatters import get_wallet_public_key
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
@ -116,10 +116,10 @@ class EACLRule:
@allure.title("Get extended ACL")
def get_eacl(wallet_path: str, cid: str, shell: Shell) -> Optional[str]:
def get_eacl(wallet_path: str, cid: str, shell: Shell, endpoint: str) -> Optional[str]:
cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
try:
result = cli.container.get_eacl(wallet=wallet_path, rpc_endpoint=NEOFS_ENDPOINT, cid=cid)
result = cli.container.get_eacl(wallet=wallet_path, rpc_endpoint=endpoint, cid=cid)
except RuntimeError as exc:
logger.info("Extended ACL table is not set for this container")
logger.info(f"Got exception while getting eacl: {exc}")
@ -135,12 +135,13 @@ def set_eacl(
cid: str,
eacl_table_path: str,
shell: Shell,
endpoint: str,
session_token: Optional[str] = None,
) -> None:
cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
cli.container.set_eacl(
wallet=wallet_path,
rpc_endpoint=NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
cid=cid,
table=eacl_table_path,
await_mode=True,
@ -166,7 +167,11 @@ def create_eacl(cid: str, rules_list: List[EACLRule], shell: Shell) -> str:
def form_bearertoken_file(
wif: str, cid: str, eacl_rule_list: List[Union[EACLRule, EACLPubKey]], shell: Shell
wif: str,
cid: str,
eacl_rule_list: List[Union[EACLRule, EACLPubKey]],
shell: Shell,
endpoint: str,
) -> str:
"""
This function fetches eACL for given <cid> on behalf of <wif>,
@ -176,7 +181,7 @@ def form_bearertoken_file(
enc_cid = _encode_cid_for_eacl(cid)
file_path = os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4()))
eacl = get_eacl(wif, cid, shell=shell)
eacl = get_eacl(wif, cid, shell, endpoint)
json_eacl = dict()
if eacl:
eacl = eacl.replace("eACL: ", "").split("Signature")[0]

View file

@ -15,7 +15,8 @@ from typing import Optional
import allure
import neofs_verbs
from common import NEOFS_NETMAP, WALLET_CONFIG
from cluster import StorageNode
from common import WALLET_CONFIG
from neofs_testlib.shell import Shell
logger = logging.getLogger("NeoLogger")
@ -27,6 +28,7 @@ def get_link_object(
cid: str,
oid: str,
shell: Shell,
nodes: list[StorageNode],
bearer: str = "",
wallet_config: str = WALLET_CONFIG,
is_direct: bool = True,
@ -38,6 +40,7 @@ def get_link_object(
cid (str): Container ID which stores the Large Object
oid (str): Large Object ID
shell: executor for cli command
nodes: list of nodes to do search on
bearer (optional, str): path to Bearer token file
wallet_config (optional, str): path to the neofs-cli config file
is_direct: send request directly to the node or not; this flag
@ -47,14 +50,15 @@ def get_link_object(
When no Link Object ID is found after all Storage Nodes polling,
the function throws an error.
"""
for node in NEOFS_NETMAP:
for node in nodes:
endpoint = node.get_rpc_endpoint()
try:
resp = neofs_verbs.head_object(
wallet,
cid,
oid,
shell=shell,
endpoint=node,
endpoint=endpoint,
is_raw=True,
is_direct=is_direct,
bearer=bearer,
@ -63,13 +67,15 @@ def get_link_object(
if resp["link"]:
return resp["link"]
except Exception:
logger.info(f"No Link Object found on {node}; continue")
logger.info(f"No Link Object found on {endpoint}; continue")
logger.error(f"No Link Object for {cid}/{oid} found among all Storage Nodes")
return None
@allure.step("Get Last Object")
def get_last_object(wallet: str, cid: str, oid: str, shell: Shell) -> Optional[str]:
def get_last_object(
wallet: str, cid: str, oid: str, shell: Shell, nodes: list[StorageNode]
) -> Optional[str]:
"""
Args:
wallet (str): path to the wallet on whose behalf the Storage Nodes
@ -77,19 +83,21 @@ def get_last_object(wallet: str, cid: str, oid: str, shell: Shell) -> Optional[s
cid (str): Container ID which stores the Large Object
oid (str): Large Object ID
shell: executor for cli command
nodes: list of nodes to do search on
Returns:
(str): Last Object ID
When no Last Object ID is found after all Storage Nodes polling,
the function throws an error.
"""
for node in NEOFS_NETMAP:
for node in nodes:
endpoint = node.get_rpc_endpoint()
try:
resp = neofs_verbs.head_object(
wallet, cid, oid, shell=shell, endpoint=node, is_raw=True, is_direct=True
wallet, cid, oid, shell=shell, endpoint=endpoint, is_raw=True, is_direct=True
)
if resp["lastPart"]:
return resp["lastPart"]
except Exception:
logger.info(f"No Last Object found on {node}; continue")
logger.info(f"No Last Object found on {endpoint}; continue")
logger.error(f"No Last Object for {cid}/{oid} found among all Storage Nodes")
return None

View file

@ -11,7 +11,7 @@ from typing import Optional, Union
import allure
import json_transformers
from common import NEOFS_CLI_EXEC, NEOFS_ENDPOINT, WALLET_CONFIG
from common import NEOFS_CLI_EXEC, WALLET_CONFIG
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
@ -24,6 +24,7 @@ DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
def create_container(
wallet: str,
shell: Shell,
endpoint: str,
rule: str = DEFAULT_PLACEMENT_RULE,
basic_acl: str = "",
attributes: Optional[dict] = None,
@ -49,6 +50,7 @@ def create_container(
the session token; this parameter makes sense
when paired with `session_token`
shell: executor for cli command
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
options (optional, dict): any other options to pass to the call
name (optional, str): container name attribute
await_mode (bool): block execution until container is persisted
@ -60,7 +62,7 @@ def create_container(
cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
result = cli.container.create(
rpc_endpoint=NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=session_wallet if session_wallet else wallet,
policy=rule,
basic_acl=basic_acl,
@ -76,16 +78,16 @@ def create_container(
logger.info("Container created; waiting until it is persisted in the sidechain")
if wait_for_creation:
wait_for_container_creation(wallet, cid, shell=shell)
wait_for_container_creation(wallet, cid, shell, endpoint)
return cid
def wait_for_container_creation(
wallet: str, cid: str, shell: Shell, attempts: int = 15, sleep_interval: int = 1
wallet: str, cid: str, shell: Shell, endpoint: str, attempts: int = 15, sleep_interval: int = 1
):
for _ in range(attempts):
containers = list_containers(wallet, shell=shell)
containers = list_containers(wallet, shell, endpoint)
if cid in containers:
return
logger.info(f"There is no {cid} in {containers} yet; sleep {sleep_interval} and continue")
@ -96,11 +98,11 @@ def wait_for_container_creation(
def wait_for_container_deletion(
wallet: str, cid: str, shell: Shell, attempts: int = 30, sleep_interval: int = 1
wallet: str, cid: str, shell: Shell, endpoint: str, attempts: int = 30, sleep_interval: int = 1
):
for _ in range(attempts):
try:
get_container(wallet, cid, shell=shell)
get_container(wallet, cid, shell=shell, endpoint=endpoint)
sleep(sleep_interval)
continue
except Exception as err:
@ -111,18 +113,19 @@ def wait_for_container_deletion(
@allure.step("List Containers")
def list_containers(wallet: str, shell: Shell) -> list[str]:
def list_containers(wallet: str, shell: Shell, endpoint: str) -> list[str]:
"""
A wrapper for `neofs-cli container list` call. It returns all the
available containers for the given wallet.
Args:
wallet (str): a wallet on whose behalf we list the containers
shell: executor for cli command
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
Returns:
(list): list of containers
"""
cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
result = cli.container.list(rpc_endpoint=NEOFS_ENDPOINT, wallet=wallet)
result = cli.container.list(rpc_endpoint=endpoint, wallet=wallet)
logger.info(f"Containers: \n{result}")
return result.stdout.split()
@ -132,6 +135,7 @@ def get_container(
wallet: str,
cid: str,
shell: Shell,
endpoint: str,
json_mode: bool = True,
) -> Union[dict, str]:
"""
@ -141,14 +145,14 @@ def get_container(
wallet (str): path to a wallet on whose behalf we get the container
cid (str): ID of the container to get
shell: executor for cli command
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
json_mode (bool): return container in JSON format
Returns:
(dict, str): dict of container attributes
"""
cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
result = cli.container.get(
rpc_endpoint=NEOFS_ENDPOINT, wallet=wallet, cid=cid, json_mode=json_mode
)
result = cli.container.get(rpc_endpoint=endpoint, wallet=wallet, cid=cid, json_mode=json_mode)
if not json_mode:
return result.stdout
@ -166,7 +170,12 @@ def get_container(
# TODO: make the error message about a non-found container more user-friendly
# https://github.com/nspcc-dev/neofs-contract/issues/121
def delete_container(
wallet: str, cid: str, shell: Shell, force: bool = False, session_token: Optional[str] = None
wallet: str,
cid: str,
shell: Shell,
endpoint: str,
force: bool = False,
session_token: Optional[str] = None,
) -> None:
"""
A wrapper for `neofs-cli container delete` call.
@ -174,6 +183,7 @@ def delete_container(
wallet (str): path to a wallet on whose behalf we delete the container
cid (str): ID of the container to delete
shell: executor for cli command
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
force (bool): do not check whether container contains locks and remove immediately
session_token: a path to session token file
This function doesn't return anything.
@ -181,7 +191,7 @@ def delete_container(
cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
cli.container.delete(
wallet=wallet, cid=cid, rpc_endpoint=NEOFS_ENDPOINT, force=force, session=session_token
wallet=wallet, cid=cid, rpc_endpoint=endpoint, force=force, session=session_token
)
@ -212,10 +222,10 @@ def _parse_cid(output: str) -> str:
@allure.step("Search container by name")
def search_container_by_name(wallet: str, name: str, shell: Shell):
list_cids = list_containers(wallet, shell)
def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str):
list_cids = list_containers(wallet, shell, endpoint)
for cid in list_cids:
cont_info = get_container(wallet, cid, shell, True)
cont_info = get_container(wallet, cid, shell, endpoint, True)
if cont_info.get("attributes").get("Name", None) == name:
return cid
return None

View file

@ -1,6 +1,7 @@
from typing import List, Optional
from acl import EACLOperation
from cluster import Cluster
from neofs_testlib.shell import Shell
from python_keywords.object_access import (
can_delete_object,
@ -19,17 +20,21 @@ def check_full_access_to_container(
oid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
):
assert can_put_object(wallet, cid, file_name, shell, bearer, wallet_config, xhdr)
assert can_get_head_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
assert can_get_range_of_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
assert can_get_range_hash_of_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
assert can_search_object(wallet, cid, shell, oid, bearer, wallet_config, xhdr)
assert can_get_object(wallet, cid, oid, file_name, shell, bearer, wallet_config, xhdr)
assert can_delete_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
endpoint = cluster.default_rpc_endpoint
assert can_put_object(wallet, cid, file_name, shell, cluster, bearer, wallet_config, xhdr)
assert can_get_head_object(wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr)
assert can_get_range_of_object(wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr)
assert can_get_range_hash_of_object(
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
assert can_search_object(wallet, cid, shell, endpoint, oid, bearer, wallet_config, xhdr)
assert can_get_object(wallet, cid, oid, file_name, shell, cluster, bearer, wallet_config, xhdr)
assert can_delete_object(wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr)
def check_no_access_to_container(
@ -38,17 +43,25 @@ def check_no_access_to_container(
oid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
):
assert not can_put_object(wallet, cid, file_name, shell, bearer, wallet_config, xhdr)
assert not can_get_head_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
assert not can_get_range_of_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
assert not can_get_range_hash_of_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
assert not can_search_object(wallet, cid, shell, oid, bearer, wallet_config, xhdr)
assert not can_get_object(wallet, cid, oid, file_name, shell, bearer, wallet_config, xhdr)
assert not can_delete_object(wallet, cid, oid, shell, bearer, wallet_config, xhdr)
endpoint = cluster.default_rpc_endpoint
assert not can_put_object(wallet, cid, file_name, shell, cluster, bearer, wallet_config, xhdr)
assert not can_get_head_object(wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr)
assert not can_get_range_of_object(
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
assert not can_get_range_hash_of_object(
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
assert not can_search_object(wallet, cid, shell, endpoint, oid, bearer, wallet_config, xhdr)
assert not can_get_object(
wallet, cid, oid, file_name, shell, cluster, bearer, wallet_config, xhdr
)
assert not can_delete_object(wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr)
def check_custom_access_to_container(
@ -57,42 +70,44 @@ def check_custom_access_to_container(
oid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
deny_operations: Optional[List[EACLOperation]] = None,
ignore_operations: Optional[List[EACLOperation]] = None,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
):
endpoint = cluster.default_rpc_endpoint
deny_operations = [op.value for op in deny_operations or []]
ignore_operations = [op.value for op in ignore_operations or []]
checks: dict = {}
if EACLOperation.PUT.value not in ignore_operations:
checks[EACLOperation.PUT.value] = can_put_object(
wallet, cid, file_name, shell, bearer, wallet_config, xhdr
wallet, cid, file_name, shell, cluster, bearer, wallet_config, xhdr
)
if EACLOperation.HEAD.value not in ignore_operations:
checks[EACLOperation.HEAD.value] = can_get_head_object(
wallet, cid, oid, shell, bearer, wallet_config, xhdr
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
if EACLOperation.GET_RANGE.value not in ignore_operations:
checks[EACLOperation.GET_RANGE.value] = can_get_range_of_object(
wallet, cid, oid, shell, bearer, wallet_config, xhdr
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
if EACLOperation.GET_RANGE_HASH.value not in ignore_operations:
checks[EACLOperation.GET_RANGE_HASH.value] = can_get_range_hash_of_object(
wallet, cid, oid, shell, bearer, wallet_config, xhdr
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
if EACLOperation.SEARCH.value not in ignore_operations:
checks[EACLOperation.SEARCH.value] = can_search_object(
wallet, cid, shell, oid, bearer, wallet_config, xhdr
wallet, cid, shell, endpoint, oid, bearer, wallet_config, xhdr
)
if EACLOperation.GET.value not in ignore_operations:
checks[EACLOperation.GET.value] = can_get_object(
wallet, cid, oid, file_name, shell, bearer, wallet_config, xhdr
wallet, cid, oid, file_name, shell, cluster, bearer, wallet_config, xhdr
)
if EACLOperation.DELETE.value not in ignore_operations:
checks[EACLOperation.DELETE.value] = can_delete_object(
wallet, cid, oid, shell, bearer, wallet_config, xhdr
wallet, cid, oid, shell, endpoint, bearer, wallet_config, xhdr
)
failed_checks = [
@ -114,6 +129,7 @@ def check_read_only_container(
oid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -128,4 +144,5 @@ def check_read_only_container(
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
cluster=cluster,
)

View file

@ -3,15 +3,8 @@ import logging
from time import sleep
import allure
from common import (
IR_WALLET_PASS,
IR_WALLET_PATH,
MAINNET_BLOCK_TIME,
MORPH_ENDPOINT,
NEOFS_ADM_CONFIG_PATH,
NEOFS_ADM_EXEC,
NEOGO_EXECUTABLE,
)
from cluster import Cluster
from common import MAINNET_BLOCK_TIME, NEOFS_ADM_CONFIG_PATH, NEOFS_ADM_EXEC, NEOGO_EXECUTABLE
from neofs_testlib.cli import NeofsAdm, NeoGo
from neofs_testlib.shell import Shell
from neofs_testlib.utils.wallet import get_last_address_from_wallet
@ -22,28 +15,32 @@ logger = logging.getLogger("NeoLogger")
@allure.step("Ensure fresh epoch")
def ensure_fresh_epoch(shell: Shell) -> int:
def ensure_fresh_epoch(shell: Shell, cluster: Cluster) -> int:
# ensure new fresh epoch to avoid epoch switch during test session
current_epoch = get_epoch(shell)
tick_epoch(shell)
epoch = get_epoch(shell)
current_epoch = get_epoch(shell, cluster)
tick_epoch(shell, cluster)
epoch = get_epoch(shell, cluster)
assert epoch > current_epoch, "Epoch wasn't ticked"
return epoch
@allure.step("Get Epoch")
def get_epoch(shell: Shell):
def get_epoch(shell: Shell, cluster: Cluster):
morph_chain = cluster.morph_chain_nodes[0]
morph_endpoint = morph_chain.get_endpoint()
neogo = NeoGo(shell=shell, neo_go_exec_path=NEOGO_EXECUTABLE)
out = neogo.contract.testinvokefunction(
scripthash=get_contract_hash("netmap.neofs", shell=shell),
scripthash=get_contract_hash(morph_chain, "netmap.neofs", shell=shell),
method="epoch",
rpc_endpoint=MORPH_ENDPOINT,
rpc_endpoint=morph_endpoint,
)
return int(json.loads(out.stdout.replace("\n", ""))["stack"][0]["value"])
@allure.step("Tick Epoch")
def tick_epoch(shell: Shell):
def tick_epoch(shell: Shell, cluster: Cluster):
if NEOFS_ADM_EXEC and NEOFS_ADM_CONFIG_PATH:
# If neofs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
neofsadm = NeofsAdm(
@ -52,21 +49,30 @@ def tick_epoch(shell: Shell):
neofsadm.morph.force_new_epoch()
return
# Otherwise we tick epoch using transaction
cur_epoch = get_epoch(shell)
# Use first node by default
ir_address = get_last_address_from_wallet(IR_WALLET_PATH, IR_WALLET_PASS)
# Otherwise we tick epoch using transaction
cur_epoch = get_epoch(shell, cluster)
ir_node = cluster.ir_nodes[0]
# In case if no local_wallet_path is provided, we use wallet_path
ir_wallet_path = ir_node.get_wallet_path()
ir_wallet_pass = ir_node.get_wallet_password()
ir_address = get_last_address_from_wallet(ir_wallet_path, ir_wallet_pass)
morph_chain = cluster.morph_chain_nodes[0]
morph_endpoint = morph_chain.get_endpoint()
neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
neogo.contract.invokefunction(
wallet=IR_WALLET_PATH,
wallet_password=IR_WALLET_PASS,
scripthash=get_contract_hash("netmap.neofs", shell=shell),
wallet=ir_wallet_path,
wallet_password=ir_wallet_pass,
scripthash=get_contract_hash(morph_chain, "netmap.neofs", shell=shell),
method="newEpoch",
arguments=f"int:{cur_epoch + 1}",
multisig_hash=f"{ir_address}:Global",
address=ir_address,
rpc_endpoint=MORPH_ENDPOINT,
rpc_endpoint=morph_endpoint,
force=True,
gas=1,
)

View file

@ -1,55 +1,51 @@
import logging
from time import sleep
from typing import Optional
import allure
from common import NEOFS_NETMAP_DICT
from neofs_testlib.hosting import Hosting
from cluster import Cluster, StorageNode
from neofs_testlib.shell import Shell
from python_keywords.node_management import node_healthcheck
from python_keywords.node_management import storage_node_healthcheck
from storage_policy import get_nodes_with_object
logger = logging.getLogger("NeoLogger")
@allure.step("Wait for object replication")
def wait_object_replication_on_nodes(
wallet: str,
def wait_object_replication(
cid: str,
oid: str,
expected_copies: int,
shell: Shell,
excluded_nodes: Optional[list[str]] = None,
) -> list[str]:
excluded_nodes = excluded_nodes or []
nodes: list[StorageNode],
) -> list[StorageNode]:
sleep_interval, attempts = 15, 20
nodes = []
for __attempt in range(attempts):
nodes = get_nodes_with_object(wallet, cid, oid, shell=shell, skip_nodes=excluded_nodes)
if len(nodes) >= expected_copies:
return nodes
nodes_with_object = []
for _ in range(attempts):
nodes_with_object = get_nodes_with_object(cid, oid, shell=shell, nodes=nodes)
if len(nodes_with_object) >= expected_copies:
return nodes_with_object
sleep(sleep_interval)
raise AssertionError(
f"Expected {expected_copies} copies of object, but found {len(nodes)}. "
f"Expected {expected_copies} copies of object, but found {len(nodes_with_object)}. "
f"Waiting time {sleep_interval * attempts}"
)
@allure.step("Wait for storage node returned to cluster")
def wait_all_storage_node_returned(hosting: Hosting) -> None:
@allure.step("Wait for storage nodes returned to cluster")
def wait_all_storage_nodes_returned(cluster: Cluster) -> None:
sleep_interval, attempts = 15, 20
for __attempt in range(attempts):
if is_all_storage_node_returned(hosting):
if is_all_storage_nodes_returned(cluster):
return
sleep(sleep_interval)
raise AssertionError("Storage node(s) is broken")
def is_all_storage_node_returned(hosting: Hosting) -> bool:
def is_all_storage_nodes_returned(cluster: Cluster) -> bool:
with allure.step("Run health check for all storage nodes"):
for node_name in NEOFS_NETMAP_DICT.keys():
for node in cluster.storage_nodes:
try:
health_check = node_healthcheck(hosting, node_name)
health_check = storage_node_healthcheck(node)
except Exception as err:
logger.warning(f"Node healthcheck fails with error {err}")
return False

View file

@ -9,7 +9,6 @@ from urllib.parse import quote_plus
import allure
import requests
from cli_helpers import _cmd_run
from common import HTTP_GATE
logger = logging.getLogger("NeoLogger")
@ -17,13 +16,14 @@ ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/")
@allure.step("Get via HTTP Gate")
def get_via_http_gate(cid: str, oid: str):
def get_via_http_gate(cid: str, oid: str, endpoint: str):
"""
This function gets given object from HTTP gate
:param cid: CID to get object from
:param oid: object OID
cid: container id to get object from
oid: object ID
endpoint: http gate endpoint
"""
request = f"{HTTP_GATE}/get/{cid}/{oid}"
request = f"{endpoint}/get/{cid}/{oid}"
resp = requests.get(request, stream=True)
if not resp.ok:
@ -44,13 +44,14 @@ def get_via_http_gate(cid: str, oid: str):
@allure.step("Get via Zip HTTP Gate")
def get_via_zip_http_gate(cid: str, prefix: str):
def get_via_zip_http_gate(cid: str, prefix: str, endpoint: str):
"""
This function gets given object from HTTP gate
:param cid: CID to get object from
:param prefix: common prefix
cid: container id to get object from
prefix: common prefix
endpoint: http gate endpoint
"""
request = f"{HTTP_GATE}/zip/{cid}/{prefix}"
request = f"{endpoint}/zip/{cid}/{prefix}"
resp = requests.get(request, stream=True)
if not resp.ok:
@ -75,15 +76,16 @@ def get_via_zip_http_gate(cid: str, prefix: str):
@allure.step("Get via HTTP Gate by attribute")
def get_via_http_gate_by_attribute(cid: str, attribute: dict):
def get_via_http_gate_by_attribute(cid: str, attribute: dict, endpoint: str):
"""
This function gets given object from HTTP gate
:param cid: CID to get object from
:param attribute: attribute name: attribute value pair
cid: CID to get object from
attribute: attribute {name: attribute} value pair
endpoint: http gate endpoint
"""
attr_name = list(attribute.keys())[0]
attr_value = quote_plus(str(attribute.get(attr_name)))
request = f"{HTTP_GATE}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}"
request = f"{endpoint}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}"
resp = requests.get(request, stream=True)
if not resp.ok:
@ -104,14 +106,15 @@ def get_via_http_gate_by_attribute(cid: str, attribute: dict):
@allure.step("Upload via HTTP Gate")
def upload_via_http_gate(cid: str, path: str, headers: dict = None) -> str:
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: dict = None) -> str:
"""
This function upload given object through HTTP gate
:param cid: CID to get object from
:param path: File path to upload
:param headers: Object header
cid: CID to get object from
path: File path to upload
endpoint: http gate endpoint
headers: Object header
"""
request = f"{HTTP_GATE}/upload/{cid}"
request = f"{endpoint}/upload/{cid}"
files = {"upload_file": open(path, "rb")}
body = {"filename": path}
resp = requests.post(request, files=files, data=body, headers=headers)
@ -134,15 +137,16 @@ def upload_via_http_gate(cid: str, path: str, headers: dict = None) -> str:
@allure.step("Upload via HTTP Gate using Curl")
def upload_via_http_gate_curl(
cid: str, filepath: str, large_object=False, headers: dict = None
cid: str, filepath: str, endpoint: str, large_object=False, headers: dict = None
) -> str:
"""
This function upload given object through HTTP gate using curl utility.
:param cid: CID to get object from
:param filepath: File path to upload
:param headers: Object header
cid: CID to get object from
filepath: File path to upload
headers: Object header
endpoint: http gate endpoint
"""
request = f"{HTTP_GATE}/upload/{cid}"
request = f"{endpoint}/upload/{cid}"
files = f"file=@{filepath};filename={os.path.basename(filepath)}"
cmd = f"curl -F '{files}' {request}"
if large_object:
@ -156,13 +160,14 @@ def upload_via_http_gate_curl(
@allure.step("Get via HTTP Gate using Curl")
def get_via_http_curl(cid: str, oid: str) -> str:
def get_via_http_curl(cid: str, oid: str, endpoint: str) -> str:
"""
This function gets given object from HTTP gate using curl utility.
:param cid: CID to get object from
:param oid: object OID
cid: CID to get object from
oid: object OID
endpoint: http gate endpoint
"""
request = f"{HTTP_GATE}/get/{cid}/{oid}"
request = f"{endpoint}/get/{cid}/{oid}"
file_path = os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}_{str(uuid.uuid4())}")
cmd = f"curl {request} > {file_path}"

View file

@ -1,29 +1,77 @@
import json
import logging
import os
import random
import re
import uuid
from typing import Any, Optional
import allure
import json_transformers
from common import ASSETS_DIR, NEOFS_CLI_EXEC, NEOFS_ENDPOINT, NEOFS_NETMAP, WALLET_CONFIG
from cluster import Cluster
from common import ASSETS_DIR, NEOFS_CLI_EXEC, WALLET_CONFIG
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
logger = logging.getLogger("NeoLogger")
@allure.step("Get object")
@allure.step("Get object from random node")
def get_object_from_random_node(
wallet: str,
cid: str,
oid: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
write_object: Optional[str] = None,
xhdr: Optional[dict] = None,
wallet_config: Optional[str] = None,
no_progress: bool = True,
session: Optional[str] = None,
) -> str:
"""
GET from NeoFS random storage node
Args:
wallet: wallet on whose behalf GET is done
cid: ID of Container where we get the Object from
oid: Object ID
shell: executor for cli command
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
write_object (optional, str): path to downloaded file, appends to `--file` key
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
wallet_config(optional, str): path to the wallet config
no_progress(optional, bool): do not show progress bar
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
Returns:
(str): path to downloaded file
"""
endpoint = cluster.get_random_storage_rpc_endpoint()
return get_object(
wallet,
cid,
oid,
shell,
endpoint,
bearer,
write_object,
xhdr,
wallet_config,
no_progress,
session,
)
@allure.step("Get object from {endpoint}")
def get_object(
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str = None,
bearer: Optional[str] = None,
write_object: str = "",
endpoint: str = "",
write_object: Optional[str] = None,
xhdr: Optional[dict] = None,
wallet_config: Optional[str] = None,
no_progress: bool = True,
@ -37,9 +85,9 @@ def get_object(
cid (str): ID of Container where we get the Object from
oid (str): Object ID
shell: executor for cli command
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
write_object (optional, str): path to downloaded file, appends to `--file` key
endpoint (optional, str): NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
bearer: path to Bearer Token file, appends to `--bearer` key
write_object: path to downloaded file, appends to `--file` key
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
wallet_config(optional, str): path to the wallet config
no_progress(optional, bool): do not show progress bar
xhdr (optional, dict): Request X-Headers in form of Key=Value
@ -52,12 +100,9 @@ def get_object(
write_object = str(uuid.uuid4())
file_path = os.path.join(ASSETS_DIR, write_object)
if not endpoint:
endpoint = random.sample(NEOFS_NETMAP, 1)[0]
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
cli.object.get(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
@ -71,15 +116,15 @@ def get_object(
return file_path
@allure.step("Get Range Hash")
@allure.step("Get Range Hash from {endpoint}")
def get_range_hash(
wallet: str,
cid: str,
oid: str,
range_cut: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
endpoint: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
session: Optional[str] = None,
@ -102,10 +147,9 @@ def get_range_hash(
Returns:
None
"""
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
result = cli.object.hash(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
@ -119,16 +163,69 @@ def get_range_hash(
return result.stdout.split(":")[1].strip()
@allure.step("Put object")
@allure.step("Put object to random node")
def put_object_to_random_node(
wallet: str,
path: str,
cid: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
attributes: Optional[dict] = None,
xhdr: Optional[dict] = None,
wallet_config: Optional[str] = None,
expire_at: Optional[int] = None,
no_progress: bool = True,
session: Optional[str] = None,
):
"""
PUT of given file to a random storage node.
Args:
wallet: wallet on whose behalf PUT is done
path: path to file to be PUT
cid: ID of Container where we get the Object from
shell: executor for cli command
cluster: cluster under test
bearer: path to Bearer Token file, appends to `--bearer` key
attributes: User attributes in form of Key1=Value1,Key2=Value2
cluster: cluster under test
wallet_config: path to the wallet config
no_progress: do not show progress bar
expire_at: Last epoch in the life of the object
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
Returns:
ID of uploaded Object
"""
endpoint = cluster.get_random_storage_rpc_endpoint()
return put_object(
wallet,
path,
cid,
shell,
endpoint,
bearer,
attributes,
xhdr,
wallet_config,
expire_at,
no_progress,
session,
)
@allure.step("Put object at {endpoint} in container {cid}")
def put_object(
wallet: str,
path: str,
cid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
attributes: Optional[dict] = None,
xhdr: Optional[dict] = None,
endpoint: Optional[str] = None,
wallet_config: Optional[str] = None,
expire_at: Optional[int] = None,
no_progress: bool = True,
@ -138,25 +235,21 @@ def put_object(
PUT of given file.
Args:
wallet (str): wallet on whose behalf PUT is done
path (str): path to file to be PUT
cid (str): ID of Container where we get the Object from
wallet: wallet on whose behalf PUT is done
path: path to file to be PUT
cid: ID of Container where we get the Object from
shell: executor for cli command
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
attributes (optional, str): User attributes in form of Key1=Value1,Key2=Value2
endpoint(optional, str): NeoFS endpoint to send request to
wallet_config(optional, str): path to the wallet config
no_progress(optional, bool): do not show progress bar
expire_at (optional, int): Last epoch in the life of the object
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
bearer: path to Bearer Token file, appends to `--bearer` key
attributes: User attributes in form of Key1=Value1,Key2=Value2
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
wallet_config: path to the wallet config
no_progress: do not show progress bar
expire_at: Last epoch in the life of the object
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
Returns:
(str): ID of uploaded Object
"""
if not endpoint:
endpoint = random.sample(NEOFS_NETMAP, 1)[0]
if not endpoint:
logger.info(f"---DEB:\n{NEOFS_NETMAP}")
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
result = cli.object.put(
@ -178,13 +271,13 @@ def put_object(
return oid.strip()
@allure.step("Delete object")
@allure.step("Delete object {cid}/{oid} from {endpoint}")
def delete_object(
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: Optional[str] = None,
endpoint: str = None,
bearer: str = "",
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -194,21 +287,22 @@ def delete_object(
DELETE an Object.
Args:
wallet (str): wallet on whose behalf DELETE is done
cid (str): ID of Container where we get the Object from
oid (str): ID of Object we are going to delete
wallet: wallet on whose behalf DELETE is done
cid: ID of Container where we get the Object from
oid: ID of Object we are going to delete
shell: executor for cli command
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
endpoint (optional, str): NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
wallet_config(optional, str): path to the wallet config
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
bearer: path to Bearer Token file, appends to `--bearer` key
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
wallet_config: path to the wallet config
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
Returns:
(str): Tombstone ID
"""
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
result = cli.object.delete(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
@ -229,7 +323,7 @@ def get_range(
oid: str,
range_cut: str,
shell: Shell,
endpoint: Optional[str] = None,
endpoint: str = None,
wallet_config: Optional[str] = None,
bearer: str = "",
xhdr: Optional[dict] = None,
@ -239,16 +333,16 @@ def get_range(
GETRANGE an Object.
Args:
wallet (str): wallet on whose behalf GETRANGE is done
cid (str): ID of Container where we get the Object from
oid (str): ID of Object we are going to request
range_cut (str): range to take data from in the form offset:length
wallet: wallet on whose behalf GETRANGE is done
cid: ID of Container where we get the Object from
oid: ID of Object we are going to request
range_cut: range to take data from in the form offset:length
shell: executor for cli command
endpoint (optional, str): NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
wallet_config(optional, str): path to the wallet config
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
bearer: path to Bearer Token file, appends to `--bearer` key
wallet_config: path to the wallet config
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
Returns:
(str, bytes) - path to the file with range content and content of this file as bytes
"""
@ -256,7 +350,7 @@ def get_range(
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
cli.object.range(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
@ -278,9 +372,9 @@ def lock_object(
cid: str,
oid: str,
shell: Shell,
endpoint: str,
lifetime: Optional[int] = None,
expire_at: Optional[int] = None,
endpoint: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
session: Optional[str] = None,
@ -298,7 +392,8 @@ def lock_object(
oid: Object ID.
lifetime: Lock lifetime.
expire_at: Lock expiration epoch.
endpoint: Remote node address.
shell: executor for cli command
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
session: Path to a JSON-encoded container session token.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
@ -310,7 +405,7 @@ def lock_object(
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
result = cli.object.lock(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
lifetime=lifetime,
expire_at=expire_at,
address=address,
@ -334,8 +429,8 @@ def search_object(
wallet: str,
cid: str,
shell: Shell,
endpoint: str,
bearer: str = "",
endpoint: Optional[str] = None,
filters: Optional[dict] = None,
expected_objects_list: Optional[list] = None,
wallet_config: Optional[str] = None,
@ -348,26 +443,26 @@ def search_object(
SEARCH an Object.
Args:
wallet (str): wallet on whose behalf SEARCH is done
cid (str): ID of Container where we get the Object from
wallet: wallet on whose behalf SEARCH is done
cid: ID of Container where we get the Object from
shell: executor for cli command
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
endpoint (optional, str): NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
filters (optional, dict): key=value pairs to filter Objects
expected_objects_list (optional, list): a list of ObjectIDs to compare found Objects with
wallet_config(optional, str): path to the wallet config
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
bearer: path to Bearer Token file, appends to `--bearer` key
endpoint: NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
filters: key=value pairs to filter Objects
expected_objects_list: a list of ObjectIDs to compare found Objects with
wallet_config: path to the wallet config
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
phy: Search physically stored objects.
root: Search for user objects.
Returns:
(list): list of found ObjectIDs
list of found ObjectIDs
"""
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
result = cli.object.search(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
bearer=bearer,
@ -401,8 +496,8 @@ def search_object(
def get_netmap_netinfo(
wallet: str,
shell: Shell,
endpoint: str,
wallet_config: Optional[str] = None,
endpoint: Optional[str] = None,
address: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
@ -411,7 +506,7 @@ def get_netmap_netinfo(
Get netmap netinfo output from node
Args:
wallet (str): wallet on whose behalf SEARCH is done
wallet (str): wallet on whose behalf request is done
shell: executor for cli command
endpoint (optional, str): NeoFS endpoint to send request to, appends to `--rpc-endpoint` key
address: Address of wallet account
@ -426,7 +521,7 @@ def get_netmap_netinfo(
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
output = cli.netmap.netinfo(
wallet=wallet,
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
address=address,
ttl=ttl,
xhdr=xhdr,
@ -452,9 +547,9 @@ def head_object(
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
endpoint: Optional[str] = None,
json_output: bool = True,
is_raw: bool = False,
is_direct: bool = False,
@ -489,7 +584,7 @@ def head_object(
cli = NeofsCli(shell, NEOFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
result = cli.object.head(
rpc_endpoint=endpoint or NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,

View file

@ -6,17 +6,10 @@ from dataclasses import dataclass
from typing import Optional
import allure
from common import (
MORPH_BLOCK_TIME,
NEOFS_CLI_EXEC,
NEOFS_NETMAP_DICT,
STORAGE_WALLET_CONFIG,
STORAGE_WALLET_PASS,
)
from data_formatters import get_wallet_public_key
from cluster import Cluster, StorageNode
from common import MORPH_BLOCK_TIME, NEOFS_CLI_EXEC
from epoch import tick_epoch
from neofs_testlib.cli import NeofsCli
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from utility import parse_time
@ -39,183 +32,189 @@ class HealthStatus:
return HealthStatus(network, health)
@allure.step("Stop storage nodes")
def stop_nodes(hosting: Hosting, number: int, nodes: list[str]) -> list[str]:
@allure.step("Stop random storage nodes")
def stop_random_storage_nodes(number: int, nodes: list[StorageNode]) -> list[StorageNode]:
"""
Shuts down the given number of randomly selected storage nodes.
Args:
number (int): the number of nodes to shut down
nodes (list): the list of nodes for possible shut down
number: the number of storage nodes to stop
nodes: the list of storage nodes to stop
Returns:
(list): the list of nodes that were shut down
the list of nodes that were stopped
"""
nodes_to_stop = random.sample(nodes, number)
for node in nodes_to_stop:
host = hosting.get_host_by_service(node)
host.stop_service(node)
node.stop_service()
return nodes_to_stop
@allure.step("Start storage nodes")
def start_nodes(hosting: Hosting, nodes: list[str]) -> None:
@allure.step("Start storage node")
def start_storage_nodes(nodes: list[StorageNode]) -> None:
"""
The function starts specified storage nodes.
Args:
nodes (list): the list of nodes to start
nodes: the list of nodes to start
"""
for node in nodes:
host = hosting.get_host_by_service(node)
host.start_service(node)
node.start_service()
@allure.step("Get Locode")
def get_locode() -> str:
endpoint_values = random.choice(list(NEOFS_NETMAP_DICT.values()))
locode = endpoint_values["UN-LOCODE"]
logger.info(f"Random locode chosen: {locode}")
@allure.step("Get Locode from random storage node")
def get_locode_from_random_node(cluster: Cluster) -> str:
node = random.choice(cluster.storage_nodes)
locode = node.get_un_locode()
logger.info(f"Chosen '{locode}' locode from node {node}")
return locode
@allure.step("Healthcheck for node {node_name}")
def node_healthcheck(hosting: Hosting, node_name: str) -> HealthStatus:
@allure.step("Healthcheck for storage node {node}")
def storage_node_healthcheck(node: StorageNode) -> HealthStatus:
"""
The function returns node's health status.
The function returns storage node's health status.
Args:
node_name str: node name for which health status should be retrieved.
node: storage node for which health status should be retrieved.
Returns:
health status as HealthStatus object.
"""
command = "control healthcheck"
output = _run_control_command_with_retries(hosting, node_name, command)
output = _run_control_command_with_retries(node, command)
return HealthStatus.from_stdout(output)
@allure.step("Set status for node {node_name}")
def node_set_status(hosting: Hosting, node_name: str, status: str, retries: int = 0) -> None:
@allure.step("Set status for {node}")
def storage_node_set_status(node: StorageNode, status: str, retries: int = 0) -> None:
"""
The function sets particular status for given node.
Args:
node_name: node name for which status should be set.
node: node for which status should be set.
status: online or offline.
retries (optional, int): number of retry attempts if it didn't work from the first time
"""
command = f"control set-status --status {status}"
_run_control_command_with_retries(hosting, node_name, command, retries)
_run_control_command_with_retries(node, command, retries)
@allure.step("Get netmap snapshot")
def get_netmap_snapshot(node_name: str, shell: Shell) -> str:
def get_netmap_snapshot(node: StorageNode, shell: Shell) -> str:
"""
The function returns string representation of netmap snapshot.
Args:
node_name str: node name from which netmap snapshot should be requested.
node: node from which netmap snapshot should be requested.
Returns:
string representation of netmap
"""
node_info = NEOFS_NETMAP_DICT[node_name]
cli = NeofsCli(shell, NEOFS_CLI_EXEC, config_file=STORAGE_WALLET_CONFIG)
storage_wallet_config = node.get_wallet_config_path()
storage_wallet_path = node.get_wallet_path()
cli = NeofsCli(shell, NEOFS_CLI_EXEC, config_file=storage_wallet_config)
return cli.netmap.snapshot(
rpc_endpoint=node_info["rpc"],
wallet=node_info["wallet_path"],
rpc_endpoint=node.get_rpc_endpoint(),
wallet=storage_wallet_path,
).stdout
@allure.step("Get shard list for node {node_name}")
def node_shard_list(hosting: Hosting, node_name: str) -> list[str]:
@allure.step("Get shard list for {node}")
def node_shard_list(node: StorageNode) -> list[str]:
"""
The function returns list of shards for specified node.
The function returns list of shards for specified storage node.
Args:
node_name str: node name for which shards should be returned.
node: node for which shards should be returned.
Returns:
list of shards.
"""
command = "control shards list"
output = _run_control_command_with_retries(hosting, node_name, command)
output = _run_control_command_with_retries(node, command)
return re.findall(r"Shard (.*):", output)
@allure.step("Shard set for node {node_name}")
def node_shard_set_mode(hosting: Hosting, node_name: str, shard: str, mode: str) -> str:
@allure.step("Shard set for {node}")
def node_shard_set_mode(node: StorageNode, shard: str, mode: str) -> str:
"""
The function sets mode for specified shard.
Args:
node_name str: node name on which shard mode should be set.
node: node on which shard mode should be set.
"""
command = f"control shards set-mode --id {shard} --mode {mode}"
return _run_control_command_with_retries(hosting, node_name, command)
return _run_control_command_with_retries(node, command)
@allure.step("Drop object from node {node_name}")
def drop_object(hosting: Hosting, node_name: str, cid: str, oid: str) -> str:
@allure.step("Drop object from {node}")
def drop_object(node: StorageNode, cid: str, oid: str) -> str:
"""
The function drops object from specified node.
Args:
node_name str: node name from which object should be dropped.
node_id str: node from which object should be dropped.
"""
command = f"control drop-objects -o {cid}/{oid}"
return _run_control_command_with_retries(hosting, node_name, command)
return _run_control_command_with_retries(node, command)
@allure.step("Delete data of node {node_name}")
def delete_node_data(hosting: Hosting, node_name: str) -> None:
host = hosting.get_host_by_service(node_name)
host.stop_service(node_name)
host.delete_storage_node_data(node_name)
@allure.step("Delete data from host for node {node}")
def delete_node_data(node: StorageNode) -> None:
node.stop_service()
node.host.delete_storage_node_data(node.name)
time.sleep(parse_time(MORPH_BLOCK_TIME))
@allure.step("Exclude node {node_to_exclude} from network map")
def exclude_node_from_network_map(
hosting: Hosting, node_to_exclude: str, alive_node: str, shell: Shell
node_to_exclude: StorageNode,
alive_node: StorageNode,
shell: Shell,
cluster: Cluster,
) -> None:
node_wallet_path = NEOFS_NETMAP_DICT[node_to_exclude]["wallet_path"]
node_netmap_key = get_wallet_public_key(node_wallet_path, STORAGE_WALLET_PASS)
node_netmap_key = node_to_exclude.get_wallet_public_key()
node_set_status(hosting, node_to_exclude, status="offline")
storage_node_set_status(node_to_exclude, status="offline")
time.sleep(parse_time(MORPH_BLOCK_TIME))
tick_epoch(shell=shell)
tick_epoch(shell, cluster)
snapshot = get_netmap_snapshot(node_name=alive_node, shell=shell)
snapshot = get_netmap_snapshot(node=alive_node, shell=shell)
assert (
node_netmap_key not in snapshot
), f"Expected node with key {node_netmap_key} not in network map"
), f"Expected node with key {node_netmap_key} to be absent in network map"
@allure.step("Include node {node_to_include} into network map")
def include_node_to_network_map(
hosting: Hosting, node_to_include: str, alive_node: str, shell: Shell
node_to_include: StorageNode,
alive_node: StorageNode,
shell: Shell,
cluster: Cluster,
) -> None:
node_set_status(hosting, node_to_include, status="online")
storage_node_set_status(node_to_include, status="online")
# Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch.
# First sleep can be ommited afer https://github.com/nspcc-dev/neofs-node/issues/1790 complete.
# First sleep can be omitted after https://github.com/nspcc-dev/neofs-node/issues/1790 complete.
time.sleep(parse_time(MORPH_BLOCK_TIME) * 2)
tick_epoch(shell=shell)
tick_epoch(shell, cluster)
time.sleep(parse_time(MORPH_BLOCK_TIME) * 2)
check_node_in_map(node_to_include, shell, alive_node)
@allure.step("Check node {node_name} in network map")
def check_node_in_map(node_name: str, shell: Shell, alive_node: Optional[str] = None) -> None:
alive_node = alive_node or node_name
node_wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"]
node_netmap_key = get_wallet_public_key(node_wallet_path, STORAGE_WALLET_PASS)
@allure.step("Check node {node} in network map")
def check_node_in_map(
node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None
) -> None:
alive_node = alive_node or node
logger.info(f"Node {node_name} netmap key: {node_netmap_key}")
node_netmap_key = node.get_wallet_public_key()
logger.info(f"Node ({node.label}) netmap key: {node_netmap_key}")
snapshot = get_netmap_snapshot(node_name=alive_node, shell=shell)
assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} in network map"
snapshot = get_netmap_snapshot(alive_node, shell)
assert (
node_netmap_key in snapshot
), f"Expected node with key {node_netmap_key} to be in network map"
def _run_control_command_with_retries(
hosting: Hosting, node_name: str, command: str, retries: int = 0
) -> str:
def _run_control_command_with_retries(node: StorageNode, command: str, retries: int = 0) -> str:
for attempt in range(1 + retries): # original attempt + specified retries
try:
return _run_control_command(hosting, node_name, command)
return _run_control_command(node, command)
except AssertionError as err:
if attempt < retries:
logger.warning(f"Command {command} failed with error {err} and will be retried")
@ -223,16 +222,16 @@ def _run_control_command_with_retries(
raise AssertionError(f"Command {command} failed with error {err}") from err
def _run_control_command(hosting: Hosting, service_name: str, command: str) -> None:
host = hosting.get_host_by_service(service_name)
def _run_control_command(node: StorageNode, command: str) -> None:
host = node.host
service_config = host.get_service_config(service_name)
service_config = host.get_service_config(node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
control_endpoint = service_config.attributes["control_endpoint"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{service_name}-config.yaml"
wallet_config_path = f"/tmp/{node.name}-config.yaml"
wallet_config = f'password: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")

View file

@ -1,16 +1,17 @@
from typing import Optional
import allure
from cluster import Cluster
from file_helper import get_file_hash
from grpc_responses import OBJECT_ACCESS_DENIED, error_matches_status
from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import (
delete_object,
get_object,
get_object_from_random_node,
get_range,
get_range_hash,
head_object,
put_object,
put_object_to_random_node,
search_object,
)
@ -23,13 +24,14 @@ def can_get_object(
oid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> bool:
with allure.step("Try get object from container"):
try:
got_file_path = get_object(
got_file_path = get_object_from_random_node(
wallet,
cid,
oid,
@ -37,6 +39,7 @@ def can_get_object(
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
cluster=cluster,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(
@ -52,6 +55,7 @@ def can_put_object(
cid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -59,7 +63,7 @@ def can_put_object(
) -> bool:
with allure.step("Try put object to container"):
try:
put_object(
put_object_to_random_node(
wallet,
file_name,
cid,
@ -68,6 +72,7 @@ def can_put_object(
xhdr=xhdr,
attributes=attributes,
shell=shell,
cluster=cluster,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(
@ -82,6 +87,7 @@ def can_delete_object(
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -89,7 +95,14 @@ def can_delete_object(
with allure.step("Try delete object from container"):
try:
delete_object(
wallet, cid, oid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell
wallet,
cid,
oid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(
@ -104,6 +117,7 @@ def can_get_head_object(
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -118,6 +132,7 @@ def can_get_head_object(
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(
@ -132,6 +147,7 @@ def can_get_range_of_object(
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -147,6 +163,7 @@ def can_get_range_of_object(
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(
@ -161,6 +178,7 @@ def can_get_range_hash_of_object(
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
@ -176,6 +194,7 @@ def can_get_range_hash_of_object(
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(
@ -189,6 +208,7 @@ def can_search_object(
wallet: str,
cid: str,
shell: Shell,
endpoint: str,
oid: Optional[str] = None,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
@ -197,7 +217,13 @@ def can_search_object(
with allure.step("Try search object in container"):
try:
oids = search_object(
wallet, cid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell
wallet,
cid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(

View file

@ -6,19 +6,9 @@ import time
from typing import Optional
import allure
from common import (
GAS_HASH,
MAINNET_BLOCK_TIME,
MAINNET_SINGLE_ADDR,
MAINNET_WALLET_PASS,
MAINNET_WALLET_PATH,
MORPH_ENDPOINT,
NEO_MAINNET_ENDPOINT,
NEOFS_CONTRACT,
NEOGO_EXECUTABLE,
)
from cluster import MainChain, MorphChain
from common import GAS_HASH, MAINNET_BLOCK_TIME, NEOFS_CONTRACT, NEOGO_EXECUTABLE
from neo3 import wallet as neo3_wallet
from neofs_testlib.blockchain import RPCClient
from neofs_testlib.cli import NeoGo
from neofs_testlib.shell import Shell
from neofs_testlib.utils.converters import contract_hash_to_address
@ -32,30 +22,26 @@ TX_PERSIST_TIMEOUT = 15 # seconds
ASSET_POWER_MAINCHAIN = 10**8
ASSET_POWER_SIDECHAIN = 10**12
morph_rpc_client = RPCClient(MORPH_ENDPOINT)
mainnet_rpc_client = RPCClient(NEO_MAINNET_ENDPOINT)
def get_nns_contract_hash(morph_chain: MorphChain) -> str:
return morph_chain.rpc_client.get_contract_state(1)["hash"]
def get_nns_contract_hash() -> str:
rpc_client = RPCClient(MORPH_ENDPOINT)
return rpc_client.get_contract_state(1)["hash"]
def get_contract_hash(resolve_name: str, shell: Shell) -> str:
nns_contract_hash = get_nns_contract_hash()
def get_contract_hash(morph_chain: MorphChain, resolve_name: str, shell: Shell) -> str:
nns_contract_hash = get_nns_contract_hash(morph_chain)
neogo = NeoGo(shell=shell, neo_go_exec_path=NEOGO_EXECUTABLE)
out = neogo.contract.testinvokefunction(
scripthash=nns_contract_hash,
method="resolve",
arguments=f"string:{resolve_name} int:16",
rpc_endpoint=MORPH_ENDPOINT,
rpc_endpoint=morph_chain.get_endpoint(),
)
stack_data = json.loads(out.stdout.replace("\n", ""))["stack"][0]["value"]
return bytes.decode(base64.b64decode(stack_data[0]["value"]))
@allure.step("Withdraw Mainnet Gas")
def withdraw_mainnet_gas(shell: Shell, wlt: str, amount: int):
def withdraw_mainnet_gas(shell: Shell, main_chain: MainChain, wlt: str, amount: int):
address = get_last_address_from_wallet(wlt, EMPTY_PASSWORD)
scripthash = neo3_wallet.Account.address_to_script_hash(address)
@ -63,7 +49,7 @@ def withdraw_mainnet_gas(shell: Shell, wlt: str, amount: int):
out = neogo.contract.invokefunction(
wallet=wlt,
address=address,
rpc_endpoint=NEO_MAINNET_ENDPOINT,
rpc_endpoint=main_chain.get_endpoint(),
scripthash=NEOFS_CONTRACT,
method="withdraw",
arguments=f"{scripthash} int:{amount}",
@ -79,7 +65,7 @@ def withdraw_mainnet_gas(shell: Shell, wlt: str, amount: int):
raise AssertionError(f"TX {tx} hasn't been processed")
def transaction_accepted(tx_id: str):
def transaction_accepted(main_chain: MainChain, tx_id: str):
"""
This function returns True in case of accepted TX.
Args:
@ -91,7 +77,7 @@ def transaction_accepted(tx_id: str):
try:
for _ in range(0, TX_PERSIST_TIMEOUT):
time.sleep(1)
resp = mainnet_rpc_client.get_transaction_height(tx_id)
resp = main_chain.rpc_client.get_transaction_height(tx_id)
if resp is not None:
logger.info(f"TX is accepted in block: {resp}")
return True
@ -102,7 +88,7 @@ def transaction_accepted(tx_id: str):
@allure.step("Get NeoFS Balance")
def get_balance(shell: Shell, wallet_path: str, wallet_password: str = ""):
def get_balance(shell: Shell, morph_chain: MorphChain, wallet_path: str, wallet_password: str = ""):
"""
This function returns NeoFS balance for given wallet.
"""
@ -111,8 +97,8 @@ def get_balance(shell: Shell, wallet_path: str, wallet_password: str = ""):
acc = wallet.accounts[-1]
payload = [{"type": "Hash160", "value": str(acc.script_hash)}]
try:
resp = morph_rpc_client.invoke_function(
get_contract_hash("balance.neofs", shell=shell), "balanceOf", payload
resp = morph_chain.rpc_client.invoke_function(
get_contract_hash(morph_chain, "balance.neofs", shell=shell), "balanceOf", payload
)
logger.info(f"Got response \n{resp}")
value = int(resp["stack"][0]["value"])
@ -126,9 +112,10 @@ def get_balance(shell: Shell, wallet_path: str, wallet_password: str = ""):
def transfer_gas(
shell: Shell,
amount: int,
wallet_from_path: str = MAINNET_WALLET_PATH,
wallet_from_password: str = MAINNET_WALLET_PASS,
address_from: str = MAINNET_SINGLE_ADDR,
main_chain: MainChain,
wallet_from_path: Optional[str] = None,
wallet_from_password: Optional[str] = None,
address_from: Optional[str] = None,
address_to: Optional[str] = None,
wallet_to_path: Optional[str] = None,
wallet_to_password: Optional[str] = None,
@ -148,11 +135,20 @@ def transfer_gas(
address_to: The address of the wallet to transfer assets to.
amount: Amount of gas to transfer.
"""
wallet_from_path = wallet_from_path or main_chain.get_wallet_path()
wallet_from_password = (
wallet_from_password
if wallet_from_password is not None
else main_chain.get_wallet_password()
)
address_from = address_from or get_last_address_from_wallet(
wallet_from_path, wallet_from_password
)
address_to = address_to or get_last_address_from_wallet(wallet_to_path, wallet_to_password)
neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
out = neogo.nep17.transfer(
rpc_endpoint=NEO_MAINNET_ENDPOINT,
rpc_endpoint=main_chain.get_endpoint(),
wallet=wallet_from_path,
wallet_password=wallet_from_password,
amount=amount,
@ -164,13 +160,19 @@ def transfer_gas(
txid = out.stdout.strip().split("\n")[-1]
if len(txid) != 64:
raise Exception("Got no TXID after run the command")
if not transaction_accepted(txid):
if not transaction_accepted(main_chain, txid):
raise AssertionError(f"TX {txid} hasn't been processed")
time.sleep(parse_time(MAINNET_BLOCK_TIME))
@allure.step("NeoFS Deposit")
def deposit_gas(shell: Shell, amount: int, wallet_from_path: str, wallet_from_password: str):
def deposit_gas(
shell: Shell,
main_chain: MainChain,
amount: int,
wallet_from_path: str,
wallet_from_password: str,
):
"""
Transferring GAS from given wallet to NeoFS contract address.
"""
@ -182,6 +184,7 @@ def deposit_gas(shell: Shell, amount: int, wallet_from_path: str, wallet_from_pa
)
transfer_gas(
shell=shell,
main_chain=main_chain,
amount=amount,
wallet_from_path=wallet_from_path,
wallet_from_password=wallet_from_password,
@ -191,8 +194,8 @@ def deposit_gas(shell: Shell, amount: int, wallet_from_path: str, wallet_from_pa
@allure.step("Get Mainnet Balance")
def get_mainnet_balance(address: str):
resp = mainnet_rpc_client.get_nep17_balances(address=address)
def get_mainnet_balance(main_chain: MainChain, address: str):
resp = main_chain.rpc_client.get_nep17_balances(address=address)
logger.info(f"Got getnep17balances response: {resp}")
for balance in resp["balance"]:
if balance["assethash"] == GAS_HASH:
@ -201,8 +204,8 @@ def get_mainnet_balance(address: str):
@allure.step("Get Sidechain Balance")
def get_sidechain_balance(address: str):
resp = morph_rpc_client.get_nep17_balances(address=address)
def get_sidechain_balance(morph_chain: MorphChain, address: str):
resp = morph_chain.rpc_client.get_nep17_balances(address=address)
logger.info(f"Got getnep17balances response: {resp}")
for balance in resp["balance"]:
if balance["assethash"] == GAS_HASH:

View file

@ -6,7 +6,8 @@ import logging
from typing import Optional
import allure
from common import COMPLEX_OBJ_SIZE, NEOFS_CLI_EXEC, NEOFS_ENDPOINT, SIMPLE_OBJ_SIZE, WALLET_CONFIG
from cluster import Cluster
from common import COMPLEX_OBJ_SIZE, NEOFS_CLI_EXEC, SIMPLE_OBJ_SIZE, WALLET_CONFIG
from complex_object_actions import get_link_object
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
@ -18,6 +19,7 @@ logger = logging.getLogger("NeoLogger")
@allure.step("Put Storagegroup")
def put_storagegroup(
shell: Shell,
endpoint: str,
wallet: str,
cid: str,
objects: list,
@ -47,7 +49,7 @@ def put_storagegroup(
lifetime=lifetime,
members=objects,
bearer=bearer,
rpc_endpoint=NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
)
gid = result.stdout.split("\n")[1].split(": ")[1]
return gid
@ -56,6 +58,7 @@ def put_storagegroup(
@allure.step("List Storagegroup")
def list_storagegroup(
shell: Shell,
endpoint: str,
wallet: str,
cid: str,
bearer: Optional[str] = None,
@ -78,7 +81,7 @@ def list_storagegroup(
wallet=wallet,
cid=cid,
bearer=bearer,
rpc_endpoint=NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
)
# throwing off the first string of output
found_objects = result.stdout.split("\n")[1:]
@ -88,6 +91,7 @@ def list_storagegroup(
@allure.step("Get Storagegroup")
def get_storagegroup(
shell: Shell,
endpoint: str,
wallet: str,
cid: str,
gid: str,
@ -112,7 +116,7 @@ def get_storagegroup(
cid=cid,
bearer=bearer,
id=gid,
rpc_endpoint=NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
)
# TODO: temporary solution for parsing output. Needs to be replaced with
@ -136,6 +140,7 @@ def get_storagegroup(
@allure.step("Delete Storagegroup")
def delete_storagegroup(
shell: Shell,
endpoint: str,
wallet: str,
cid: str,
gid: str,
@ -160,7 +165,7 @@ def delete_storagegroup(
cid=cid,
bearer=bearer,
id=gid,
rpc_endpoint=NEOFS_ENDPOINT,
rpc_endpoint=endpoint,
)
tombstone_id = result.stdout.strip().split("\n")[1].split(": ")[1]
return tombstone_id
@ -169,6 +174,7 @@ def delete_storagegroup(
@allure.step("Verify list operation over Storagegroup")
def verify_list_storage_group(
shell: Shell,
endpoint: str,
wallet: str,
cid: str,
gid: str,
@ -176,7 +182,12 @@ def verify_list_storage_group(
wallet_config: str = WALLET_CONFIG,
):
storage_groups = list_storagegroup(
shell=shell, wallet=wallet, cid=cid, bearer=bearer, wallet_config=wallet_config
shell=shell,
endpoint=endpoint,
wallet=wallet,
cid=cid,
bearer=bearer,
wallet_config=wallet_config,
)
assert gid in storage_groups
@ -184,6 +195,7 @@ def verify_list_storage_group(
@allure.step("Verify get operation over Storagegroup")
def verify_get_storage_group(
shell: Shell,
cluster: Cluster,
wallet: str,
cid: str,
gid: str,
@ -193,16 +205,24 @@ def verify_get_storage_group(
wallet_config: str = WALLET_CONFIG,
):
obj_parts = []
endpoint = cluster.default_rpc_endpoint
if object_size == COMPLEX_OBJ_SIZE:
for obj in obj_list:
link_oid = get_link_object(
wallet, cid, obj, shell=shell, bearer=bearer, wallet_config=wallet_config
wallet,
cid,
obj,
shell=shell,
nodes=cluster.storage_nodes,
bearer=bearer,
wallet_config=wallet_config,
)
obj_head = head_object(
wallet=wallet,
cid=cid,
oid=link_oid,
shell=shell,
endpoint=endpoint,
is_raw=True,
bearer=bearer,
wallet_config=wallet_config,
@ -212,6 +232,7 @@ def verify_get_storage_group(
obj_num = len(obj_list)
storagegroup_data = get_storagegroup(
shell=shell,
endpoint=endpoint,
wallet=wallet,
cid=cid,
gid=gid,

View file

@ -6,12 +6,12 @@
"""
import logging
from typing import List, Optional
from typing import List
import allure
import complex_object_actions
import neofs_verbs
from common import NEOFS_NETMAP
from cluster import StorageNode
from grpc_responses import OBJECT_NOT_FOUND, error_matches_status
from neofs_testlib.shell import Shell
@ -19,7 +19,9 @@ logger = logging.getLogger("NeoLogger")
@allure.step("Get Object Copies")
def get_object_copies(complexity: str, wallet: str, cid: str, oid: str, shell: Shell) -> int:
def get_object_copies(
complexity: str, wallet: str, cid: str, oid: str, shell: Shell, nodes: list[StorageNode]
) -> int:
"""
The function performs requests to all nodes of the container and
finds out if they store a copy of the object. The procedure is
@ -37,14 +39,16 @@ def get_object_copies(complexity: str, wallet: str, cid: str, oid: str, shell: S
(int): the number of object copies in the container
"""
return (
get_simple_object_copies(wallet, cid, oid, shell)
get_simple_object_copies(wallet, cid, oid, shell, nodes)
if complexity == "Simple"
else get_complex_object_copies(wallet, cid, oid, shell)
else get_complex_object_copies(wallet, cid, oid, shell, nodes)
)
@allure.step("Get Simple Object Copies")
def get_simple_object_copies(wallet: str, cid: str, oid: str, shell: Shell) -> int:
def get_simple_object_copies(
wallet: str, cid: str, oid: str, shell: Shell, nodes: list[StorageNode]
) -> int:
"""
To figure out the number of a simple object copies, only direct
HEAD requests should be made to the every node of the container.
@ -55,14 +59,15 @@ def get_simple_object_copies(wallet: str, cid: str, oid: str, shell: Shell) -> i
cid (str): ID of the container
oid (str): ID of the Object
shell: executor for cli command
nodes: nodes to search on
Returns:
(int): the number of object copies in the container
"""
copies = 0
for node in NEOFS_NETMAP:
for node in nodes:
try:
response = neofs_verbs.head_object(
wallet, cid, oid, shell=shell, endpoint=node, is_direct=True
wallet, cid, oid, shell=shell, endpoint=node.get_rpc_endpoint(), is_direct=True
)
if response:
logger.info(f"Found object {oid} on node {node}")
@ -74,7 +79,9 @@ def get_simple_object_copies(wallet: str, cid: str, oid: str, shell: Shell) -> i
@allure.step("Get Complex Object Copies")
def get_complex_object_copies(wallet: str, cid: str, oid: str, shell: Shell) -> int:
def get_complex_object_copies(
wallet: str, cid: str, oid: str, shell: Shell, nodes: list[StorageNode]
) -> int:
"""
To figure out the number of a complex object copies, we firstly
need to retrieve its Last object. We consider that the number of
@ -90,37 +97,40 @@ def get_complex_object_copies(wallet: str, cid: str, oid: str, shell: Shell) ->
Returns:
(int): the number of object copies in the container
"""
last_oid = complex_object_actions.get_last_object(wallet, cid, oid, shell)
last_oid = complex_object_actions.get_last_object(wallet, cid, oid, shell, nodes)
assert last_oid, f"No Last Object for {cid}/{oid} found among all Storage Nodes"
return get_simple_object_copies(wallet, cid, last_oid, shell)
return get_simple_object_copies(wallet, cid, last_oid, shell, nodes)
@allure.step("Get Nodes With Object")
def get_nodes_with_object(
wallet: str, cid: str, oid: str, shell: Shell, skip_nodes: Optional[list[str]] = None
) -> list[str]:
cid: str, oid: str, shell: Shell, nodes: list[StorageNode]
) -> list[StorageNode]:
"""
The function returns list of nodes which store
the given object.
Args:
wallet (str): the path to the wallet on whose behalf
we request the nodes
cid (str): ID of the container which store the object
oid (str): object ID
shell: executor for cli command
skip_nodes (list): list of nodes that should be excluded from check
nodes: nodes to find on
Returns:
(list): nodes which store the object
"""
nodes_to_search = NEOFS_NETMAP
if skip_nodes:
nodes_to_search = [node for node in NEOFS_NETMAP if node not in skip_nodes]
nodes_list = []
for node in nodes_to_search:
for node in nodes:
wallet = node.get_wallet_path()
wallet_config = node.get_wallet_config_path()
try:
res = neofs_verbs.head_object(
wallet, cid, oid, shell=shell, endpoint=node, is_direct=True
wallet,
cid,
oid,
shell=shell,
endpoint=node.get_rpc_endpoint(),
is_direct=True,
wallet_config=wallet_config,
)
if res is not None:
logger.info(f"Found object {oid} on node {node}")
@ -132,7 +142,9 @@ def get_nodes_with_object(
@allure.step("Get Nodes Without Object")
def get_nodes_without_object(wallet: str, cid: str, oid: str, shell: Shell) -> List[str]:
def get_nodes_without_object(
wallet: str, cid: str, oid: str, shell: Shell, nodes: list[StorageNode]
) -> list[StorageNode]:
"""
The function returns list of nodes which do not store
the given object.
@ -146,10 +158,10 @@ def get_nodes_without_object(wallet: str, cid: str, oid: str, shell: Shell) -> L
(list): nodes which do not store the object
"""
nodes_list = []
for node in NEOFS_NETMAP:
for node in nodes:
try:
res = neofs_verbs.head_object(
wallet, cid, oid, shell=shell, endpoint=node, is_direct=True
wallet, cid, oid, shell=shell, endpoint=node.get_rpc_endpoint(), is_direct=True
)
if res is None:
nodes_list.append(node)

View file

@ -10,8 +10,10 @@ logger = logging.getLogger("NeoLogger")
@allure.step("Verify Head Tombstone")
def verify_head_tombstone(wallet_path: str, cid: str, oid_ts: str, oid: str, shell: Shell):
header = head_object(wallet_path, cid, oid_ts, shell=shell)["header"]
def verify_head_tombstone(
wallet_path: str, cid: str, oid_ts: str, oid: str, shell: Shell, endpoint: str
):
header = head_object(wallet_path, cid, oid_ts, shell=shell, endpoint=endpoint)["header"]
s_oid = header["sessionToken"]["body"]["object"]["target"]["objects"]
logger.info(f"Header Session OIDs is {s_oid}")