frostfs-testcases/pytest_tests/helpers/frostfs_verbs.py
Dmitriy Zayakin 81a5734e76 Add tests for node shutdown and object replication integrity
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-06-22 09:00:57 +00:00

704 lines
22 KiB
Python

import json
import logging
import os
import re
import uuid
from typing import Any, Optional
import allure
from frostfs_testlib.cli import FrostfsCli, NeoGo
from frostfs_testlib.shell import Shell
from frostfs_testlib.utils import json_utils
from pytest_tests.helpers.cluster import Cluster
from pytest_tests.resources.common import (
ASSETS_DIR,
CLI_DEFAULT_TIMEOUT,
FROSTFS_CLI_EXEC,
NEOGO_EXECUTABLE,
WALLET_CONFIG,
)
# Module-level logger shared by every helper in this file; it is looked up under
# the fixed name "NeoLogger" rather than the module's own name.
logger = logging.getLogger("NeoLogger")
@allure.step("Get object from random node")
def get_object_from_random_node(
    wallet: str,
    cid: str,
    oid: str,
    shell: Shell,
    cluster: Cluster,
    bearer: Optional[str] = None,
    write_object: Optional[str] = None,
    xhdr: Optional[dict] = None,
    wallet_config: Optional[str] = None,
    no_progress: bool = True,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
    """
    GET an object through a randomly selected storage node.

    The RPC endpoint is taken from `cluster.get_random_storage_rpc_endpoint()`;
    all remaining arguments are forwarded unchanged to `get_object`.

    Args:
        wallet: wallet on whose behalf GET is done
        cid: ID of Container where we get the Object from
        oid: Object ID
        shell: executor for cli command
        cluster: cluster under test, used to pick the random storage endpoint
        bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
        write_object (optional, str): path to downloaded file, appends to `--file` key
        xhdr (optional, dict): Request X-Headers in form of Key=Value
        wallet_config (optional, str): path to the wallet config
        no_progress (optional, bool): do not show progress bar
        session (optional, str): path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        (str): path to downloaded file
    """
    return get_object(
        wallet=wallet,
        cid=cid,
        oid=oid,
        shell=shell,
        endpoint=cluster.get_random_storage_rpc_endpoint(),
        bearer=bearer,
        write_object=write_object,
        xhdr=xhdr,
        wallet_config=wallet_config,
        no_progress=no_progress,
        session=session,
        timeout=timeout,
    )
@allure.step("Get object from {endpoint}")
def get_object(
    wallet: str,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str = None,
    bearer: Optional[str] = None,
    write_object: Optional[str] = None,
    xhdr: Optional[dict] = None,
    wallet_config: Optional[str] = None,
    no_progress: bool = True,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
    """
    GET an object from FrostFS and store it under ASSETS_DIR.

    Args:
        wallet (str): wallet on whose behalf GET is done
        cid (str): ID of Container where we get the Object from
        oid (str): Object ID
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        bearer: path to Bearer Token file, appends to `--bearer` key
        write_object: name of the downloaded file inside ASSETS_DIR; a random
            UUID is used when it is empty or not given
        xhdr (optional, dict): Request X-Headers in form of Key=Value
        wallet_config (optional, str): path to the wallet config; falls back to WALLET_CONFIG
        no_progress (optional, bool): do not show progress bar
        session (optional, str): path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        (str): path to downloaded file
    """
    # Fall back to a unique generated name so repeated downloads do not collide
    target_name = write_object if write_object else str(uuid.uuid4())
    file_path = os.path.join(ASSETS_DIR, target_name)

    config_path = wallet_config or WALLET_CONFIG
    FrostfsCli(shell, FROSTFS_CLI_EXEC, config_path).object.get(
        rpc_endpoint=endpoint,
        wallet=wallet,
        cid=cid,
        oid=oid,
        file=file_path,
        bearer=bearer,
        no_progress=no_progress,
        xhdr=xhdr,
        session=session,
        timeout=timeout,
    )
    return file_path
@allure.step("Get Range Hash from {endpoint}")
def get_range_hash(
    wallet: str,
    cid: str,
    oid: str,
    range_cut: str,
    shell: Shell,
    endpoint: str,
    bearer: Optional[str] = None,
    wallet_config: Optional[str] = None,
    xhdr: Optional[dict] = None,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
    """
    GETRANGEHASH of given Object.

    Args:
        wallet: wallet on whose behalf GETRANGEHASH is done
        cid: ID of Container where we get the Object from
        oid: Object ID
        range_cut: Range to take hash from in the form offset1:length1,...,
            value to pass to the `--range` parameter
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        bearer: path to Bearer Token file, appends to `--bearer` key
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        xhdr: Request X-Headers in form of Key=Values
        session: Filepath to a JSON- or binary-encoded token of the object RANGEHASH session.
        timeout: Timeout for the operation.
    Returns:
        (str): the second ":"-separated field of the command output with
            surrounding whitespace removed (the hash without the range prefix)
    Raises:
        IndexError: if the command output contains no ":" separator
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
    result = cli.object.hash(
        rpc_endpoint=endpoint,
        wallet=wallet,
        cid=cid,
        oid=oid,
        range=range_cut,
        bearer=bearer,
        xhdr=xhdr,
        session=session,
        timeout=timeout,
    )

    # cutting off output about range offset and length
    return result.stdout.split(":")[1].strip()
@allure.step("Put object to random node")
def put_object_to_random_node(
    wallet: str,
    path: str,
    cid: str,
    shell: Shell,
    cluster: Cluster,
    bearer: Optional[str] = None,
    attributes: Optional[dict] = None,
    xhdr: Optional[dict] = None,
    wallet_config: Optional[str] = None,
    expire_at: Optional[int] = None,
    no_progress: bool = True,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
    """
    PUT the given file through a randomly selected storage node.

    The RPC endpoint is taken from `cluster.get_random_storage_rpc_endpoint()`;
    all remaining arguments are forwarded unchanged to `put_object`.

    Args:
        wallet: wallet on whose behalf PUT is done
        path: path to file to be PUT
        cid: ID of Container the Object is put into
        shell: executor for cli command
        cluster: cluster under test, used to pick the random storage endpoint
        bearer: path to Bearer Token file, appends to `--bearer` key
        attributes: User attributes in form of Key1=Value1,Key2=Value2
        xhdr: Request X-Headers in form of Key=Value
        wallet_config: path to the wallet config
        expire_at: Last epoch in the life of the object
        no_progress: do not show progress bar
        session: path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        ID of uploaded Object
    """
    return put_object(
        wallet=wallet,
        path=path,
        cid=cid,
        shell=shell,
        endpoint=cluster.get_random_storage_rpc_endpoint(),
        bearer=bearer,
        attributes=attributes,
        xhdr=xhdr,
        wallet_config=wallet_config,
        expire_at=expire_at,
        no_progress=no_progress,
        session=session,
        timeout=timeout,
    )
@allure.step("Put object at {endpoint} in container {cid}")
def put_object(
    wallet: str,
    path: str,
    cid: str,
    shell: Shell,
    endpoint: str,
    bearer: Optional[str] = None,
    attributes: Optional[dict] = None,
    xhdr: Optional[dict] = None,
    wallet_config: Optional[str] = None,
    expire_at: Optional[int] = None,
    no_progress: bool = True,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
    """
    PUT the given file to FrostFS and return the new Object ID.

    Args:
        wallet: wallet on whose behalf PUT is done
        path: path to file to be PUT
        cid: ID of Container the Object is put into
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        bearer: path to Bearer Token file, appends to `--bearer` key
        attributes: User attributes in form of Key1=Value1,Key2=Value2
        xhdr: Request X-Headers in form of Key=Value
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        expire_at: Last epoch in the life of the object
        no_progress: do not show progress bar
        session: path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        (str): ID of uploaded Object
    """
    config_path = wallet_config or WALLET_CONFIG
    put_result = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_path).object.put(
        rpc_endpoint=endpoint,
        wallet=wallet,
        file=path,
        cid=cid,
        attributes=attributes,
        bearer=bearer,
        expire_at=expire_at,
        no_progress=no_progress,
        xhdr=xhdr,
        session=session,
        timeout=timeout,
    )

    # The ID is read from the penultimate line of the stripped output,
    # from the field that follows the first ":".
    output_lines = put_result.stdout.strip().split("\n")
    id_line = output_lines[-2]
    return id_line.split(":")[1].strip()
@allure.step("Delete object {cid}/{oid} from {endpoint}")
def delete_object(
    wallet: str,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str = None,
    bearer: str = "",
    wallet_config: Optional[str] = None,
    xhdr: Optional[dict] = None,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
    """
    DELETE an Object and return the ID of the resulting tombstone.

    Args:
        wallet: wallet on whose behalf DELETE is done
        cid: ID of Container where the Object is stored
        oid: ID of Object we are going to delete
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        bearer: path to Bearer Token file, appends to `--bearer` key
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        xhdr: Request X-Headers in form of Key=Value
        session: path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        (str): Tombstone ID
    """
    config_path = wallet_config or WALLET_CONFIG
    delete_result = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_path).object.delete(
        rpc_endpoint=endpoint,
        wallet=wallet,
        cid=cid,
        oid=oid,
        bearer=bearer,
        xhdr=xhdr,
        session=session,
        timeout=timeout,
    )

    # The tombstone ID is read from the second output line,
    # from the field that follows the first ":".
    tombstone_line = delete_result.stdout.split("\n")[1]
    return tombstone_line.split(":")[1].strip()
@allure.step("Get Range")
def get_range(
    wallet: str,
    cid: str,
    oid: str,
    range_cut: str,
    shell: Shell,
    endpoint: str = None,
    wallet_config: Optional[str] = None,
    bearer: str = "",
    xhdr: Optional[dict] = None,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
    """
    GETRANGE an Object into a uniquely named file under ASSETS_DIR.

    Args:
        wallet: wallet on whose behalf GETRANGE is done
        cid: ID of Container where we get the Object from
        oid: ID of Object we are going to request
        range_cut: range to take data from in the form offset:length
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        bearer: path to Bearer Token file, appends to `--bearer` key
        xhdr: Request X-Headers in form of Key=Value
        session: path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        (str, bytes) - path to the file with range content and content of this file as bytes
    """
    # A random UUID is used as the file name so each call writes a distinct file
    unique_name = str(uuid.uuid4())
    range_file_path = os.path.join(ASSETS_DIR, unique_name)

    config_path = wallet_config or WALLET_CONFIG
    FrostfsCli(shell, FROSTFS_CLI_EXEC, config_path).object.range(
        rpc_endpoint=endpoint,
        wallet=wallet,
        cid=cid,
        oid=oid,
        range=range_cut,
        file=range_file_path,
        bearer=bearer,
        xhdr=xhdr,
        session=session,
        timeout=timeout,
    )

    with open(range_file_path, "rb") as range_file:
        return range_file_path, range_file.read()
@allure.step("Lock Object")
def lock_object(
    wallet: str,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str,
    lifetime: Optional[int] = None,
    expire_at: Optional[int] = None,
    address: Optional[str] = None,
    bearer: Optional[str] = None,
    session: Optional[str] = None,
    wallet_config: Optional[str] = None,
    ttl: Optional[int] = None,
    xhdr: Optional[dict] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
    """
    Lock object in container and return the ID of the created lock object.

    Args:
        wallet: WIF (NEP-2) string or path to the wallet or binary key.
        cid: Container ID.
        oid: Object ID.
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        lifetime: Lock lifetime.
        expire_at: Lock expiration epoch.
        address: Address of wallet account.
        bearer: File with signed JSON or binary encoded bearer token.
        session: Path to a JSON-encoded container session token.
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        ttl: TTL value in request meta header (default 2).
        xhdr: Dict with request X-Headers.
        timeout: Timeout for the operation.
    Returns:
        Lock object ID
    """
    config_path = wallet_config or WALLET_CONFIG
    lock_result = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_path).object.lock(
        rpc_endpoint=endpoint,
        lifetime=lifetime,
        expire_at=expire_at,
        address=address,
        wallet=wallet,
        cid=cid,
        oid=oid,
        bearer=bearer,
        xhdr=xhdr,
        session=session,
        ttl=ttl,
        timeout=timeout,
    )

    # The lock ID is read from the first line of the stripped output,
    # from the field that follows the first ":".
    first_line = lock_result.stdout.strip().split("\n")[0]
    return first_line.split(":")[1].strip()
@allure.step("Search object")
def search_object(
    wallet: str,
    cid: str,
    shell: Shell,
    endpoint: str,
    bearer: str = "",
    filters: Optional[dict] = None,
    expected_objects_list: Optional[list] = None,
    wallet_config: Optional[str] = None,
    xhdr: Optional[dict] = None,
    session: Optional[str] = None,
    phy: bool = False,
    root: bool = False,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list:
    """
    SEARCH for Objects in a container.

    A mismatch with `expected_objects_list` is only logged as a warning;
    the function neither raises nor changes its return value because of it.

    Args:
        wallet: wallet on whose behalf SEARCH is done
        cid: ID of Container where we search the Objects
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        bearer: path to Bearer Token file, appends to `--bearer` key
        filters: key=value pairs to filter Objects; each pair is sent as "<key> EQ <value>"
        expected_objects_list: a list of ObjectIDs to compare found Objects with
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        xhdr: Request X-Headers in form of Key=Value
        session: path to a JSON-encoded container session token
        phy: Search physically stored objects.
        root: Search for user objects.
        timeout: Timeout for the operation.
    Returns:
        list of found ObjectIDs
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or WALLET_CONFIG)

    # An empty or missing filter dict is passed to the CLI as None
    search_filters = None
    if filters:
        search_filters = [f"{name} EQ {value}" for name, value in filters.items()]

    result = cli.object.search(
        rpc_endpoint=endpoint,
        wallet=wallet,
        cid=cid,
        bearer=bearer,
        xhdr=xhdr,
        filters=search_filters,
        session=session,
        phy=phy,
        root=root,
        timeout=timeout,
    )

    # Every run of 43-44 word characters in the output is treated as an Object ID
    found_objects = re.findall(r"(\w{43,44})", result.stdout)

    if not expected_objects_list:
        return found_objects

    if sorted(found_objects) == sorted(expected_objects_list):
        logger.info(
            f"Found objects list '{found_objects}' "
            f"is equal for expected list '{expected_objects_list}'"
        )
    else:
        logger.warning(
            f"Found object list {found_objects} "
            f"is not equal to expected list '{expected_objects_list}'"
        )
    return found_objects
@allure.step("Get netmap netinfo")
def get_netmap_netinfo(
    wallet: str,
    shell: Shell,
    endpoint: str,
    wallet_config: Optional[str] = None,
    address: Optional[str] = None,
    ttl: Optional[int] = None,
    xhdr: Optional[dict] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> dict[str, Any]:
    """
    Get netmap netinfo output from node and parse it into a dict.

    Every "<name>: <value>" pair whose value starts with an integer, a
    float or the literal `true`/`false` is collected. The setting name is
    lower-cased, stripped and has its spaces replaced with underscores.

    Args:
        wallet: Path to the wallet or binary key on whose behalf request is done
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        wallet_config: path to the wallet config; falls back to WALLET_CONFIG
        address: Address of wallet account
        ttl: TTL value in request meta header (default 2)
        xhdr: Request X-Headers in form of Key=Value
        timeout: Timeout for the operation.
    Returns:
        (dict): dict of parsed command output with int, bool or float values
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
    output = cli.netmap.netinfo(
        wallet=wallet,
        rpc_endpoint=endpoint,
        address=address,
        ttl=ttl,
        xhdr=xhdr,
        timeout=timeout,
    )

    settings: dict[str, Any] = {}
    # Order matters: the int pattern also matches the integer part of a float
    # value, so the float pattern runs last and overwrites that partial result.
    converters = [
        (re.compile(r"(.*): (\d+)"), int),
        # bool("false") is True, so the literal has to be compared explicitly
        (re.compile(r"(.*): (false|true)"), lambda raw: raw == "true"),
        (re.compile(r"(.*): (\d+\.\d+)"), float),
    ]
    for pattern, convert in converters:
        for setting, value in pattern.findall(output.stdout):
            settings[setting.lower().strip().replace(" ", "_")] = convert(value)

    return settings
@allure.step("Head object")
def head_object(
    wallet: str,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str,
    bearer: str = "",
    xhdr: Optional[dict] = None,
    json_output: bool = True,
    is_raw: bool = False,
    is_direct: bool = False,
    wallet_config: Optional[str] = None,
    session: Optional[str] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
    """
    HEAD an Object.

    Args:
        wallet (str): wallet on whose behalf HEAD is done
        cid (str): ID of Container where we get the Object from
        oid (str): ObjectID to HEAD
        shell: executor for cli command
        endpoint (str): FrostFS endpoint to send request to
        bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
        xhdr (optional, dict): Request X-Headers in form of Key=Value
        json_output (optional, bool): return response in JSON format or not; this flag
            turns into `--json` key
        is_raw (optional, bool): send "raw" request or not; this flag
            turns into `--raw` key
        is_direct (optional, bool): send request directly to the node or not; this flag
            turns into `--ttl 1` key
        wallet_config (optional, str): path to the wallet config; falls back to WALLET_CONFIG
        session (optional, str): path to a JSON-encoded container session token
        timeout: Timeout for the operation.
    Returns:
        depending on the `json_output` parameter value, the function returns
            (dict): HEAD response decoded by the matching `json_utils` decoder
        or
            the result object of the CLI call, unchanged (its text is in `.stdout`)
    Raises:
        json.JSONDecodeError: if the output is not JSON even after the first
            line has been cut off
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or WALLET_CONFIG)
    result = cli.object.head(
        rpc_endpoint=endpoint,
        wallet=wallet,
        cid=cid,
        oid=oid,
        bearer=bearer,
        json_mode=json_output,
        raw=is_raw,
        ttl=1 if is_direct else None,
        xhdr=xhdr,
        session=session,
        timeout=timeout,
    )

    if not json_output:
        return result

    try:
        decoded = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        # If we failed to parse output as JSON, the cause might be
        # the plain text string in the beginning of the output.
        # Here we cut off first string and try to parse again.
        logger.info(f"failed to parse output: {exc}")
        logger.info("parsing output in another way")
        fst_line_idx = result.stdout.find("\n")
        if fst_line_idx == -1:
            # Single-line output: there is no leading line to cut off, and
            # slicing from -1 would parse only the last character.
            raise
        decoded = json.loads(result.stdout[fst_line_idx:])

    # If response is Complex Object header, it has `splitId` key
    if "splitId" in decoded:
        logger.info("decoding split header")
        return json_utils.decode_split_header(decoded)

    header = decoded["header"]

    # If response is Last or Linking Object header,
    # it has `header` dictionary and non-null `split` dictionary
    if header.get("split"):
        logger.info("decoding linking object")
        return json_utils.decode_linking_object(decoded)

    if header["objectType"] == "STORAGE_GROUP":
        logger.info("decoding storage group")
        return json_utils.decode_storage_group(decoded)

    if header["objectType"] == "TOMBSTONE":
        logger.info("decoding tombstone")
        return json_utils.decode_tombstone(decoded)

    logger.info("decoding simple header")
    return json_utils.decode_simple_header(decoded)
@allure.step("Run neo-go query height")
def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
    """
    Run neo-go query height command and parse its first two output lines.

    Args:
        shell: executor for cli command
        endpoint: endpoint to execute
    Returns:
        dict->
            Latest block: {value}
            Validated state: {value}
        Each key is the text before the first ":" of its line and each value
        is the following field converted to int.
    Raises:
        IndexError: if the output has fewer than two lines or a line has no ":"
        ValueError: if a value is not an integer
    """
    neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
    output = neogo.query.height(rpc_endpoint=endpoint).stdout

    lines = output.split("\n")
    try:
        # first line of the command's output contains the latest block in blockchain
        latest_block = lines[0].split(":")
        # second line of the command's output contains the validated state
        validated_state = lines[1].split(":")
        return {
            latest_block[0]: int(latest_block[1]),
            validated_state[0]: int(validated_state[1]),
        }
    except (IndexError, ValueError):
        # Log the raw output before propagating so a malformed or empty
        # response can be diagnosed from the test log.
        logger.error(f"Got unexpected output (neo-go query height): {output}")
        raise