Compare commits

...

10 commits

Author SHA1 Message Date
aa277fdd6a Increase default load time
Signed-off-by: anikeev-yadro <a.anikeev@yadro.com>
2023-08-29 16:55:25 +03:00
7059596506 Support prepare locally flag
Signed-off-by: m.malygina <m.malygina@yadro.com>
2023-08-21 11:59:05 +00:00
7112bf9c88 Change NodeNetmapInfo class
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-08-17 12:54:05 +03:00
b1c21e0e5b Add Iptables helper
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-08-16 12:22:14 +00:00
02c079eda3 [OBJECT-3949] delete mainchain ready 2023-08-15 12:32:36 +00:00
d28f3cdc28 Add UNDEFINED versioning status
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-08-04 14:19:49 +03:00
e4878f4d1e Add readable enums
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-08-02 21:38:27 +03:00
807235af95 Fix multiple services start (copy array for upstream functions)
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-07-31 14:08:12 +03:00
716a780a13 Add epoch align after tick
Signed-off-by: anikeev-yadro <a.anikeev@yadro.com>
2023-07-27 16:25:53 +00:00
d6e08c477b fix division by zero when total operations is zero 2023-07-27 11:33:43 +00:00
22 changed files with 346 additions and 144 deletions

View file

@ -351,3 +351,45 @@ class FrostfsCliObject(CliCommand):
"object search",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def nodes(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Search object nodes.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
generate_key: Generate new private key.
oid: Object ID.
trace: Generate trace ID and print it.
root: Search for user objects.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
verify_presence_all: Verify the actual presence of the object on all netmap nodes.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object nodes",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
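A minimal usage sketch for the new wrapper (endpoint and wallet values are placeholders; assumes an existing Shell and wallet config, as elsewhere in the testlib):

cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config_path)  # wallet_config_path is hypothetical
result = cli.object.nodes(
    rpc_endpoint="node1.example:8080",  # placeholder endpoint
    wallet="/path/to/wallet.json",      # placeholder wallet path
    cid=cid,                            # container ID from the test context
    oid=oid,                            # object ID from the test context
    verify_presence_all=True,
)
print(result.stdout)  # pipe-delimited table, parsed later by parse_cmd_table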

View file

@ -64,6 +64,7 @@ class HostConfig:
services: list[ServiceConfig] = field(default_factory=list)
clis: list[CLIConfig] = field(default_factory=list)
attributes: dict[str, str] = field(default_factory=dict)
interfaces: dict[str, str] = field(default_factory=dict)
def __post_init__(self) -> None:
self.services = [ServiceConfig(**service) for service in self.services or []]
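For context, a hedged sketch of the new field's shape (keys and addresses hypothetical): it maps an interface type to its IP, and the traffic-blocking helpers added below skip entries whose type contains "mgmt".

interfaces = {
    "mgmt": "10.0.0.2",        # management interface: never blocked
    "internal0": "192.168.0.2" # data-plane interface: eligible for blocking
}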

View file

@ -178,6 +178,10 @@ class LoadParams:
min_iteration_duration: Optional[str] = metadata_field(
all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False
)
# Prepare/cut objects locally on client before sending
prepare_locally: Optional[bool] = metadata_field(
[LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False
)
# Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios
# https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
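A brief sketch of enabling the new flag; LoadParams construction follows the testlib's dataclass style, and the LoadType name is assumed:

params = LoadParams(load_type=LoadType.gRPC, prepare_locally=True)  # LoadType name assumed
# prepare_locally is exported to k6 as the PREPARE_LOCALLY environment variable.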

View file

@ -100,7 +100,7 @@ class LoadReport:
return model_map[self.load_params.scenario]
def _get_oprations_sub_section_html(
def _get_operations_sub_section_html(
self,
operation_type: str,
total_operations: int,
@ -132,7 +132,9 @@ class LoadReport:
model = self._get_model_string()
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
errors_percent = 0
if total_operations:
    errors_percent = total_errors / total_operations * 100.0
html = f"""
<table border="1" cellpadding="5px"><tbody>
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
@ -143,7 +145,7 @@ class LoadReport:
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
{per_node_errors_html}
{self._row("Total", f"{total_errors} ({total_errors/total_operations*100.0:.2f}%)")}
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")}
{self._row("Threshold", f"{self.load_params.error_threshold:.2f}%")}
</tbody></table><br><hr>
"""
@ -228,7 +230,7 @@ class LoadReport:
delete_errors[node_key] = metrics.delete_failed_iterations
if write_section_required:
html += self._get_oprations_sub_section_html(
html += self._get_operations_sub_section_html(
"Write",
write_operations,
requested_write_rate_str,
@ -239,7 +241,7 @@ class LoadReport:
)
if read_section_required:
html += self._get_oprations_sub_section_html(
html += self._get_operations_sub_section_html(
"Read",
read_operations,
requested_read_rate_str,
@ -250,7 +252,7 @@ class LoadReport:
)
if delete_section_required:
html += self._get_oprations_sub_section_html(
html += self._get_operations_sub_section_html(
"Delete",
delete_operations,
requested_delete_rate_str,

View file

@ -49,15 +49,15 @@ class LoadVerifier:
if deleters and not delete_operations:
exceptions.append(f"No any delete operation was performed")
if writers and write_errors / write_operations * 100 > self.load_params.error_threshold:
if write_operations and writers and write_errors / write_operations * 100 > self.load_params.error_threshold:
exceptions.append(
f"Write error rate is greater than threshold: {write_errors / write_operations * 100} > {self.load_params.error_threshold}"
)
if readers and read_errors / read_operations * 100 > self.load_params.error_threshold:
if read_operations and readers and read_errors / read_operations * 100 > self.load_params.error_threshold:
exceptions.append(
f"Read error rate is greater than threshold: {read_errors / read_operations * 100} > {self.load_params.error_threshold}"
)
if deleters and delete_errors / delete_operations * 100 > self.load_params.error_threshold:
if delete_operations and deleters and delete_errors / delete_operations * 100 > self.load_params.error_threshold:
exceptions.append(
f"Delete error rate is greater than threshold: {delete_errors / delete_operations * 100} > {self.load_params.error_threshold}"
)
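The same zero-guard pattern, extracted as a standalone sketch (helper name hypothetical):

def error_rate_exceeds(errors: int, operations: int, threshold: float) -> bool:
    # Zero operations means there is no rate to compute, so the threshold cannot be exceeded.
    return bool(operations) and errors / operations * 100 > threshold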

View file

@ -11,7 +11,7 @@ BACKGROUND_WRITERS_COUNT = os.getenv("BACKGROUND_WRITERS_COUNT", 0)
BACKGROUND_READERS_COUNT = os.getenv("BACKGROUND_READERS_COUNT", 0)
BACKGROUND_DELETERS_COUNT = os.getenv("BACKGROUND_DELETERS_COUNT", 0)
BACKGROUND_VERIFIERS_COUNT = os.getenv("BACKGROUND_VERIFIERS_COUNT", 0)
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 600)
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 1200)
BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE = os.getenv("BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE", 32)
BACKGROUND_LOAD_SETUP_TIMEOUT = os.getenv("BACKGROUND_LOAD_SETUP_TIMEOUT", "5s")

View file

@ -1,9 +1,8 @@
from abc import abstractmethod
from datetime import datetime
from enum import Enum
from typing import Literal, Optional, Union
from frostfs_testlib.testing.readable import HumanReadableABC
from frostfs_testlib.testing.readable import HumanReadableABC, HumanReadableEnum
def _make_objs_dict(key_names):
@ -15,7 +14,8 @@ def _make_objs_dict(key_names):
return objs_dict
class VersioningStatus(Enum):
class VersioningStatus(HumanReadableEnum):
UNDEFINED = None
ENABLED = "Enabled"
SUSPENDED = "Suspended"

View file

@ -11,8 +11,9 @@ from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.utils import json_utils
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
logger = logging.getLogger("NeoLogger")
reporter = get_reporter()
@ -731,3 +732,62 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
latest_block[0].replace(":", ""): int(latest_block[1]),
validated_state[0].replace(":", ""): int(validated_state[1]),
}
@reporter.step_deco("Search object nodes")
def get_object_nodes(
cluster: Cluster,
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
wallet_config: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[ClusterNode]:
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG)
result_object_nodes = cli.object.nodes(
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
list_object_nodes = [
node
for node in parsing_output
if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
]
netmap_nodes_list = parse_netmap_output(
cli.netmap.snapshot(
rpc_endpoint=endpoint,
wallet=wallet,
).stdout
)
netmap_nodes = [
netmap_node
for object_node in list_object_nodes
for netmap_node in netmap_nodes_list
if object_node["node_id"] == netmap_node.node_id
]
result = [
cluster_node
for netmap_node in netmap_nodes
for cluster_node in cluster.cluster_nodes
if netmap_node.node == cluster_node.host_ip
]
return result
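A usage sketch (variable names hypothetical; assumes a ready Cluster and Shell from the test fixtures):

object_nodes = get_object_nodes(
    cluster=cluster,
    wallet=default_wallet,
    cid=cid,
    oid=oid,
    shell=shell,
    endpoint=cluster.default_rpc_endpoint,  # attribute name assumed
)
# object_nodes is a list[ClusterNode] that both should and actually do hold the object.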

View file

@ -0,0 +1,42 @@
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import ClusterNode
class IpTablesHelper:
@staticmethod
def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None:
shell = node.host.get_shell()
for port in ports:
shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP")
@staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
shell = node.host.get_shell()
for ip in block_ip:
shell.exec(f"iptables -A INPUT -s {ip} -j DROP")
@staticmethod
def restore_input_traffic_to_port(node: ClusterNode) -> None:
shell = node.host.get_shell()
ports = (
shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'")
.stdout.strip()
.split("\n")
)
if ports[0] == "":
return
for port in ports:
shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP")
@staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell()
unlock_ip = (
shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'")
.stdout.strip()
.split("\n")
)
if unlock_ip[0] == "":
return
for ip in unlock_ip:
shell.exec(f"iptables -D INPUT -s {ip} -j DROP")

View file

@ -16,6 +16,7 @@ from frostfs_testlib.resources.cli import (
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.epoch import tick_epoch
from frostfs_testlib.steps.epoch import wait_for_epochs_align
from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.utils import datetime_utils
@ -189,6 +190,7 @@ def exclude_node_from_network_map(
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
tick_epoch(shell, cluster)
wait_for_epochs_align(shell, cluster)
snapshot = get_netmap_snapshot(node=alive_node, shell=shell)
assert (

View file

@ -13,7 +13,7 @@ from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import NEOGO_EXECUTABLE
from frostfs_testlib.resources.common import FROSTFS_CONTRACT, GAS_HASH, MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.frostfs_services import MainChain, MorphChain
from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
from frostfs_testlib.utils import converting_utils, datetime_utils, wallet_utils
reporter = get_reporter()
@ -21,10 +21,8 @@ logger = logging.getLogger("NeoLogger")
EMPTY_PASSWORD = ""
TX_PERSIST_TIMEOUT = 15 # seconds
ASSET_POWER_MAINCHAIN = 10**8
ASSET_POWER_SIDECHAIN = 10**12
def get_nns_contract_hash(morph_chain: MorphChain) -> str:
return morph_chain.rpc_client.get_contract_state(1)["hash"]
@ -41,33 +39,7 @@ def get_contract_hash(morph_chain: MorphChain, resolve_name: str, shell: Shell)
stack_data = json.loads(out.stdout.replace("\n", ""))["stack"][0]["value"]
return bytes.decode(base64.b64decode(stack_data[0]["value"]))
@reporter.step_deco("Withdraw Mainnet Gas")
def withdraw_mainnet_gas(shell: Shell, main_chain: MainChain, wlt: str, amount: int):
address = wallet_utils.get_last_address_from_wallet(wlt, EMPTY_PASSWORD)
scripthash = neo3_utils.address_to_script_hash(address)
neogo = NeoGo(shell=shell, neo_go_exec_path=NEOGO_EXECUTABLE)
out = neogo.contract.invokefunction(
wallet=wlt,
address=address,
rpc_endpoint=main_chain.get_endpoint(),
scripthash=FROSTFS_CONTRACT,
method="withdraw",
arguments=f"{scripthash} int:{amount}",
multisig_hash=f"{scripthash}:Global",
wallet_password="",
)
m = re.match(r"^Sent invocation transaction (\w{64})$", out.stdout)
if m is None:
raise Exception("Can not get Tx.")
tx = m.group(1)
if not transaction_accepted(main_chain, tx):
raise AssertionError(f"TX {tx} hasn't been processed")
def transaction_accepted(main_chain: MainChain, tx_id: str):
def transaction_accepted(morph_chain: MorphChain, tx_id: str):
"""
This function returns True in case of accepted TX.
Args:
@ -79,8 +51,8 @@ def transaction_accepted(main_chain: MainChain, tx_id: str):
try:
for _ in range(0, TX_PERSIST_TIMEOUT):
time.sleep(1)
neogo = NeoGo(shell=main_chain.host.get_shell(), neo_go_exec_path=NEOGO_EXECUTABLE)
resp = neogo.query.tx(tx_hash=tx_id, rpc_endpoint=main_chain.get_endpoint())
neogo = NeoGo(shell=morph_chain.host.get_shell(), neo_go_exec_path=NEOGO_EXECUTABLE)
resp = neogo.query.tx(tx_hash=tx_id, rpc_endpoint=morph_chain.get_endpoint())
if resp is not None:
logger.info(f"TX is accepted in block: {resp}")
return True, resp
@ -110,12 +82,11 @@ def get_balance(shell: Shell, morph_chain: MorphChain, wallet_path: str, wallet_
logger.error(f"failed to get wallet balance: {out}")
raise out
@reporter.step_deco("Transfer Gas")
def transfer_gas(
shell: Shell,
amount: int,
main_chain: MainChain,
morph_chain: MorphChain,
wallet_from_path: Optional[str] = None,
wallet_from_password: Optional[str] = None,
address_from: Optional[str] = None,
@ -138,11 +109,11 @@ def transfer_gas(
address_to: The address of the wallet to transfer assets to.
amount: Amount of gas to transfer.
"""
wallet_from_path = wallet_from_path or main_chain.get_wallet_path()
wallet_from_path = wallet_from_path or morph_chain.get_wallet_path()
wallet_from_password = (
wallet_from_password
if wallet_from_password is not None
else main_chain.get_wallet_password()
else morph_chain.get_wallet_password()
)
address_from = address_from or wallet_utils.get_last_address_from_wallet(
wallet_from_path, wallet_from_password
@ -153,7 +124,7 @@ def transfer_gas(
neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
out = neogo.nep17.transfer(
rpc_endpoint=main_chain.get_endpoint(),
rpc_endpoint=morph_chain.get_endpoint(),
wallet=wallet_from_path,
wallet_password=wallet_from_password,
amount=amount,
@ -165,49 +136,11 @@ def transfer_gas(
txid = out.stdout.strip().split("\n")[-1]
if len(txid) != 64:
raise Exception("Got no TXID after run the command")
if not transaction_accepted(main_chain, txid):
if not transaction_accepted(morph_chain, txid):
raise AssertionError(f"TX {txid} hasn't been processed")
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
@reporter.step_deco("FrostFS Deposit")
def deposit_gas(
shell: Shell,
main_chain: MainChain,
amount: int,
wallet_from_path: str,
wallet_from_password: str,
):
"""
Transferring GAS from given wallet to FrostFS contract address.
"""
# get FrostFS contract address
deposit_addr = converting_utils.contract_hash_to_address(FROSTFS_CONTRACT)
logger.info(f"FrostFS contract address: {deposit_addr}")
address_from = wallet_utils.get_last_address_from_wallet(
wallet_path=wallet_from_path, wallet_password=wallet_from_password
)
transfer_gas(
shell=shell,
main_chain=main_chain,
amount=amount,
wallet_from_path=wallet_from_path,
wallet_from_password=wallet_from_password,
address_to=deposit_addr,
address_from=address_from,
)
@reporter.step_deco("Get Mainnet Balance")
def get_mainnet_balance(main_chain: MainChain, address: str):
resp = main_chain.rpc_client.get_nep17_balances(address=address)
logger.info(f"Got getnep17balances response: {resp}")
for balance in resp["balance"]:
if balance["assethash"] == GAS_HASH:
return float(balance["amount"]) / ASSET_POWER_MAINCHAIN
return float(0)
@reporter.step_deco("Get Sidechain Balance")
def get_sidechain_balance(morph_chain: MorphChain, address: str):
resp = morph_chain.rpc_client.get_nep17_balances(address=address)

View file

@ -67,6 +67,9 @@ def try_to_get_objects_and_expect_error(
@reporter.step_deco("Set versioning status to '{status}' for bucket '{bucket}'")
def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: VersioningStatus):
if status == VersioningStatus.UNDEFINED:
return
s3_client.get_bucket_versioning_status(bucket)
s3_client.put_bucket_versioning(bucket, status=status)
bucket_status = s3_client.get_bucket_versioning_status(bucket)
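With this guard, UNDEFINED becomes a safe "leave as is" case for parametrized tests; a brief sketch:

set_bucket_versioning(s3_client, bucket, VersioningStatus.UNDEFINED)  # returns immediately, state untouched
set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)    # actually applies versioning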

View file

@ -14,6 +14,7 @@ from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.readable import HumanReadableEnum
from frostfs_testlib.utils import json_utils, wallet_utils
reporter = get_reporter()
@ -26,7 +27,7 @@ WRONG_VERB = "wrong verb of the session"
INVALID_SIGNATURE = "invalid signature of the session data"
class ObjectVerb(Enum):
class ObjectVerb(HumanReadableEnum):
PUT = "PUT"
DELETE = "DELETE"
GET = "GET"
@ -36,7 +37,7 @@ class ObjectVerb(Enum):
SEARCH = "SEARCH"
class ContainerVerb(Enum):
class ContainerVerb(HumanReadableEnum):
CREATE = "PUT"
DELETE = "DELETE"
SETEACL = "SETEACL"

View file

@ -2,7 +2,6 @@ from frostfs_testlib.storage.constants import _FrostfsServicesNames
from frostfs_testlib.storage.dataclasses.frostfs_services import (
HTTPGate,
InnerRing,
MainChain,
MorphChain,
S3Gate,
StorageNode,
@ -17,8 +16,6 @@ __class_registry.register_service(_FrostfsServicesNames.INNER_RING, InnerRing)
__class_registry.register_service(_FrostfsServicesNames.MORPH_CHAIN, MorphChain)
__class_registry.register_service(_FrostfsServicesNames.S3_GATE, S3Gate)
__class_registry.register_service(_FrostfsServicesNames.HTTP_GATE, HTTPGate)
# # TODO: Remove this since we are no longer have main chain
__class_registry.register_service(_FrostfsServicesNames.MAIN_CHAIN, MainChain)
def get_service_registry() -> ServiceRegistry:

View file

@ -21,4 +21,3 @@ class _FrostfsServicesNames:
HTTP_GATE = "http-gate"
MORPH_CHAIN = "morph-chain"
INNER_RING = "ir"
MAIN_CHAIN = "main-chain"

View file

@ -1,12 +1,15 @@
import copy
import itertools
import time
from concurrent.futures import ThreadPoolExecutor
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps import epoch
from frostfs_testlib.steps.iptables import IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils.failover_utils import (
wait_all_storage_nodes_returned,
@ -24,6 +27,7 @@ class ClusterStateController:
self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.stopped_s3_gates: list[ClusterNode] = []
self.dropped_traffic: list[ClusterNode] = []
self.cluster = cluster
self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {}
@ -139,15 +143,8 @@ class ClusterStateController:
# In case we stopped several services, for example (s01-s04):
# After starting only s01, it may require connections to s02-s04, which are still down, and fail to start.
# Also, if something goes wrong here, we might skip starting s02-s04 at all, and the cluster will be left in a bad state.
# So in order to make sure that services are at least attempted to be started, using threads here.
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
# But will be thrown here.
# Not ideal solution, but okay for now
for _ in start_result:
pass
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_storage_nodes = []
@ -170,14 +167,8 @@ class ClusterStateController:
if not self.stopped_s3_gates:
return
with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor:
start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates)
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
# But will be thrown here.
# Not ideal solution, but okay for now
for _ in start_result:
pass
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
self.stopped_s3_gates = []
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Suspend {process_name} service in {node}")
@ -204,6 +195,62 @@ class ClusterStateController:
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
self.suspended_services = {}
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
def drop_traffic(
self,
mode: str,
node: ClusterNode,
wakeup_timeout: int,
ports: list[str] | None = None,
block_nodes: list[ClusterNode] | None = None,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.drop_input_traffic_to_port(node, ports)
case "nodes":
list_ip = self._parse_interfaces(block_nodes)
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout)
self.dropped_traffic.append(node)
@reporter.step_deco("Ping traffic")
def ping_traffic(
self,
node: ClusterNode,
nodes_list: list[ClusterNode],
expect_result: int,
) -> bool:
shell = node.host.get_shell()
options = CommandOptions(check=False)
ips = self._parse_interfaces(nodes_list)
for ip in ips:
code = shell.exec(f"ping {ip} -c 1", options).return_code
if code != expect_result:
return False
return True
@reporter.step_deco("Start traffic to {node}")
def restore_traffic(
self,
mode: str,
node: ClusterNode,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.restore_input_traffic_to_port(node=node)
case "nodes":
IpTablesHelper.restore_input_traffic_to_node(node=node)
@reporter.step_deco("Restore blocked nodes")
def restore_all_traffic(self):
parallel(self._restore_traffic_to_node, self.dropped_traffic)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
@ -230,3 +277,16 @@ class ClusterStateController:
disk_controller = DiskController(node, device, mountpoint)
return disk_controller
def _restore_traffic_to_node(self, node: ClusterNode) -> None:
    IpTablesHelper.restore_input_traffic_to_port(node)
    IpTablesHelper.restore_input_traffic_to_node(node)
def _parse_interfaces(self, nodes: list[ClusterNode]) -> list[str]:
    interfaces = []
    for node in nodes:
        dict_interfaces = node.host.config.interfaces
        for interface_type, ip in dict_interfaces.items():
            if "mgmt" not in interface_type:
                interfaces.append(ip)
    return interfaces
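A hedged end-to-end sketch of the new traffic controls (node variables hypothetical; expect_result=1 assumes ping's non-zero exit code for an unreachable host):

controller.drop_traffic(mode="nodes", node=target_node, wakeup_timeout=5, block_nodes=other_nodes)
assert controller.ping_traffic(target_node, other_nodes, expect_result=1)
controller.restore_all_traffic()  # removes DROP rules on every node in dropped_traffic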

View file

@ -3,6 +3,7 @@ from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from frostfs_testlib.testing.readable import HumanReadableEnum
from frostfs_testlib.utils import wallet_utils
logger = logging.getLogger("NeoLogger")
@ -10,7 +11,7 @@ EACL_LIFETIME = 100500
FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
class EACLOperation(Enum):
class EACLOperation(HumanReadableEnum):
PUT = "put"
GET = "get"
HEAD = "head"
@ -20,24 +21,24 @@ class EACLOperation(Enum):
DELETE = "delete"
class EACLAccess(Enum):
class EACLAccess(HumanReadableEnum):
ALLOW = "allow"
DENY = "deny"
class EACLRole(Enum):
class EACLRole(HumanReadableEnum):
OTHERS = "others"
USER = "user"
SYSTEM = "system"
class EACLHeaderType(Enum):
class EACLHeaderType(HumanReadableEnum):
REQUEST = "req" # Filter request headers
OBJECT = "obj" # Filter object headers
SERVICE = "SERVICE" # Filter service headers. These are not processed by FrostFS nodes and exist for service use only
class EACLMatchType(Enum):
class EACLMatchType(HumanReadableEnum):
STRING_EQUAL = "=" # Return true if strings are equal
STRING_NOT_EQUAL = "!=" # Return true if strings are different

View file

@ -110,30 +110,6 @@ class MorphChain(NodeBase):
def label(self) -> str:
return f"{self.name}: {self.get_endpoint()}"
class MainChain(NodeBase):
"""
Class represents main-chain consensus node in a cluster
Consensus node is not always the same as physical host:
It can be service running in a container or on physical host (or physical node, if you will):
For testing perspective, it's not relevant how it is actually running,
since frostfs network will still treat it as "node"
"""
rpc_client: RPCClient
def construct(self):
self.rpc_client = RPCClient(self.get_endpoint())
def get_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_INTERNAL)
@property
def label(self) -> str:
return f"{self.name}: {self.get_endpoint()}"
class StorageNode(NodeBase):
"""
Class represents storage node in a storage cluster

View file

@ -23,3 +23,22 @@ class StorageObjectInfo(ObjectRef):
attributes: Optional[list[dict[str, str]]] = None
tombstone: Optional[str] = None
locks: Optional[list[LockObjectInfo]] = None
@dataclass
class NodeNetmapInfo:
node_id: str
node_status: str
node_data_ip: str
cluster_name: str
continent: str
country: str
country_code: str
external_address: str
location: str
node: str
price: int
sub_div: str
sub_div_code: int
un_locode: str
role: str

View file

@ -1,4 +1,13 @@
from abc import ABCMeta
from enum import Enum
class HumanReadableEnum(Enum):
def __str__(self):
return self._name_
def __repr__(self):
return self._name_
class HumanReadableABCMeta(ABCMeta):
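The new enum base overrides both string forms; a quick demonstration (the example class is hypothetical):

class Color(HumanReadableEnum):
    RED = "red"

assert str(Color.RED) == "RED"   # instead of "Color.RED"
assert repr(Color.RED) == "RED"  # instead of "<Color.RED: 'red'>"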

View file

@ -5,18 +5,21 @@
"""
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
"""
import csv
import json
import logging
import subprocess
import sys
from contextlib import suppress
from datetime import datetime
from io import StringIO
from textwrap import shorten
from typing import TypedDict, Union
from typing import Dict, List, TypedDict, Union
import pexpect
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
reporter = get_reporter()
logger = logging.getLogger("NeoLogger")
@ -131,3 +134,49 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n"
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
reporter.attach(command_attachment, "Command execution")
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
"""
The CLI command will return something like:
Epoch: 240
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
Continent: Europe
Country: Russia
CountryCode: RU
ExternalAddr: /ip4/10.10.11.18/tcp/8080
Location: Moskva
Node: 10.10.10.12
Price: 5
SubDiv: Moskva
SubDivCode: MOW
UN-LOCODE: RU MOW
role: alphabet
The code parses each line and returns each node as a dataclass.
"""
netmap_list = output.split("Node ")[1:]
dataclass_list = []
for node in netmap_list:
node = node.replace("\t", "").split("\n")
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
dataclass_list.append(NodeNetmapInfo(*node))
return dataclass_list
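A usage sketch, assuming a FrostfsCli instance as in get_object_nodes above:

netmap = parse_netmap_output(cli.netmap.snapshot(rpc_endpoint=endpoint, wallet=wallet).stdout)
online = [n for n in netmap if n.node_status == "ONLINE"]  # status strings per the sample above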
def parse_cmd_table(output: str, delimiter: str = "|") -> list[dict[str, str]]:
parsing_output = []
reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)
iter_reader = iter(reader)
header_row = next(iter_reader)
for row in iter_reader:
table = {}
for i in range(len(row)):
header = header_row[i].strip().lower().replace(" ", "_")
value = row[i].strip().lower()
if header:
table[header] = value
parsing_output.append(table)
return parsing_output
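A self-contained example against the implementation shown above (headers become lowercase snake_case keys; values are stripped and lowercased):

rows = parse_cmd_table("OID | PHY | REQUIRED\n123 | true | true\n", "|")
assert rows == [{"oid": "123", "phy": "true", "required": "true"}]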

View file

@ -3,6 +3,7 @@ from typing import Any
import pytest
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper
from frostfs_testlib.storage.dataclasses.acl import EACLRole
from frostfs_testlib.storage.dataclasses.frostfs_services import (
HTTPGate,
InnerRing,
@ -30,6 +31,7 @@ class TestDataclassesStr:
(S3Gate, "S3Gate"),
(HTTPGate, "HTTPGate"),
(InnerRing, "InnerRing"),
(EACLRole.OTHERS, "OTHERS"),
],
)
def test_classes_string_representation(self, obj: Any, expected: str):