forked from TrueCloudLab/frostfs-testlib
Compare commits
10 commits
612e088763
...
aa277fdd6a
Author | SHA1 | Date | |
---|---|---|---|
aa277fdd6a | |||
7059596506 | |||
7112bf9c88 | |||
b1c21e0e5b | |||
02c079eda3 | |||
d28f3cdc28 | |||
e4878f4d1e | |||
807235af95 | |||
716a780a13 | |||
d6e08c477b |
22 changed files with 346 additions and 144 deletions
|
@ -351,3 +351,45 @@ class FrostfsCliObject(CliCommand):
|
|||
"object search",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def nodes(
|
||||
self,
|
||||
rpc_endpoint: str,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
address: Optional[str] = None,
|
||||
bearer: Optional[str] = None,
|
||||
generate_key: Optional = None,
|
||||
oid: Optional[str] = None,
|
||||
trace: bool = False,
|
||||
root: bool = False,
|
||||
verify_presence_all: bool = False,
|
||||
ttl: Optional[int] = None,
|
||||
xhdr: Optional[dict] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""
|
||||
Search object nodes.
|
||||
|
||||
Args:
|
||||
address: Address of wallet account.
|
||||
bearer: File with signed JSON or binary encoded bearer token.
|
||||
cid: Container ID.
|
||||
generate_key: Generate new private key.
|
||||
oid: Object ID.
|
||||
trace: Generate trace ID and print it.
|
||||
root: Search for user objects.
|
||||
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
|
||||
verify_presence_all: Verify the actual presence of the object on all netmap nodes.
|
||||
ttl: TTL value in request meta header (default 2).
|
||||
wallet: WIF (NEP-2) string or path to the wallet or binary key.
|
||||
xhdr: Dict with request X-Headers.
|
||||
timeout: Timeout for the operation (default 15s).
|
||||
|
||||
Returns:
|
||||
Command's result.
|
||||
"""
|
||||
return self._execute(
|
||||
"object nodes",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
|
|
@ -64,6 +64,7 @@ class HostConfig:
|
|||
services: list[ServiceConfig] = field(default_factory=list)
|
||||
clis: list[CLIConfig] = field(default_factory=list)
|
||||
attributes: dict[str, str] = field(default_factory=dict)
|
||||
interfaces: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.services = [ServiceConfig(**service) for service in self.services or []]
|
||||
|
|
|
@ -178,6 +178,10 @@ class LoadParams:
|
|||
min_iteration_duration: Optional[str] = metadata_field(
|
||||
all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False
|
||||
)
|
||||
# Prepare/cut objects locally on client before sending
|
||||
prepare_locally: Optional[bool] = metadata_field(
|
||||
[LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False
|
||||
)
|
||||
# Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios
|
||||
# https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
|
||||
setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
|
||||
|
|
|
@ -100,7 +100,7 @@ class LoadReport:
|
|||
|
||||
return model_map[self.load_params.scenario]
|
||||
|
||||
def _get_oprations_sub_section_html(
|
||||
def _get_operations_sub_section_html(
|
||||
self,
|
||||
operation_type: str,
|
||||
total_operations: int,
|
||||
|
@ -132,7 +132,9 @@ class LoadReport:
|
|||
model = self._get_model_string()
|
||||
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
|
||||
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
|
||||
|
||||
errors_percent = 0
|
||||
if total_operations:
|
||||
errors_percent = total_errors/total_operations*100.0
|
||||
html = f"""
|
||||
<table border="1" cellpadding="5px"><tbody>
|
||||
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
|
||||
|
@ -143,7 +145,7 @@ class LoadReport:
|
|||
|
||||
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
|
||||
{per_node_errors_html}
|
||||
{self._row("Total", f"{total_errors} ({total_errors/total_operations*100.0:.2f}%)")}
|
||||
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")}
|
||||
{self._row("Threshold", f"{self.load_params.error_threshold:.2f}%")}
|
||||
</tbody></table><br><hr>
|
||||
"""
|
||||
|
@ -228,7 +230,7 @@ class LoadReport:
|
|||
delete_errors[node_key] = metrics.delete_failed_iterations
|
||||
|
||||
if write_section_required:
|
||||
html += self._get_oprations_sub_section_html(
|
||||
html += self._get_operations_sub_section_html(
|
||||
"Write",
|
||||
write_operations,
|
||||
requested_write_rate_str,
|
||||
|
@ -239,7 +241,7 @@ class LoadReport:
|
|||
)
|
||||
|
||||
if read_section_required:
|
||||
html += self._get_oprations_sub_section_html(
|
||||
html += self._get_operations_sub_section_html(
|
||||
"Read",
|
||||
read_operations,
|
||||
requested_read_rate_str,
|
||||
|
@ -250,7 +252,7 @@ class LoadReport:
|
|||
)
|
||||
|
||||
if delete_section_required:
|
||||
html += self._get_oprations_sub_section_html(
|
||||
html += self._get_operations_sub_section_html(
|
||||
"Delete",
|
||||
delete_operations,
|
||||
requested_delete_rate_str,
|
||||
|
|
|
@ -49,15 +49,15 @@ class LoadVerifier:
|
|||
if deleters and not delete_operations:
|
||||
exceptions.append(f"No any delete operation was performed")
|
||||
|
||||
if writers and write_errors / write_operations * 100 > self.load_params.error_threshold:
|
||||
if write_operations and writers and write_errors / write_operations * 100 > self.load_params.error_threshold:
|
||||
exceptions.append(
|
||||
f"Write error rate is greater than threshold: {write_errors / write_operations * 100} > {self.load_params.error_threshold}"
|
||||
)
|
||||
if readers and read_errors / read_operations * 100 > self.load_params.error_threshold:
|
||||
if read_operations and readers and read_errors / read_operations * 100 > self.load_params.error_threshold:
|
||||
exceptions.append(
|
||||
f"Read error rate is greater than threshold: {read_errors / read_operations * 100} > {self.load_params.error_threshold}"
|
||||
)
|
||||
if deleters and delete_errors / delete_operations * 100 > self.load_params.error_threshold:
|
||||
if delete_operations and deleters and delete_errors / delete_operations * 100 > self.load_params.error_threshold:
|
||||
exceptions.append(
|
||||
f"Delete error rate is greater than threshold: {delete_errors / delete_operations * 100} > {self.load_params.error_threshold}"
|
||||
)
|
||||
|
|
|
@ -11,7 +11,7 @@ BACKGROUND_WRITERS_COUNT = os.getenv("BACKGROUND_WRITERS_COUNT", 0)
|
|||
BACKGROUND_READERS_COUNT = os.getenv("BACKGROUND_READERS_COUNT", 0)
|
||||
BACKGROUND_DELETERS_COUNT = os.getenv("BACKGROUND_DELETERS_COUNT", 0)
|
||||
BACKGROUND_VERIFIERS_COUNT = os.getenv("BACKGROUND_VERIFIERS_COUNT", 0)
|
||||
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 600)
|
||||
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 1200)
|
||||
BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE = os.getenv("BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE", 32)
|
||||
BACKGROUND_LOAD_SETUP_TIMEOUT = os.getenv("BACKGROUND_LOAD_SETUP_TIMEOUT", "5s")
|
||||
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
from abc import abstractmethod
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Literal, Optional, Union
|
||||
|
||||
from frostfs_testlib.testing.readable import HumanReadableABC
|
||||
from frostfs_testlib.testing.readable import HumanReadableABC, HumanReadableEnum
|
||||
|
||||
|
||||
def _make_objs_dict(key_names):
|
||||
|
@ -15,7 +14,8 @@ def _make_objs_dict(key_names):
|
|||
return objs_dict
|
||||
|
||||
|
||||
class VersioningStatus(Enum):
|
||||
class VersioningStatus(HumanReadableEnum):
|
||||
UNDEFINED = None
|
||||
ENABLED = "Enabled"
|
||||
SUSPENDED = "Suspended"
|
||||
|
||||
|
|
|
@ -11,8 +11,9 @@ from frostfs_testlib.reporter import get_reporter
|
|||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
|
||||
from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.utils import json_utils
|
||||
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
|
||||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
reporter = get_reporter()
|
||||
|
@ -731,3 +732,62 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
|
|||
latest_block[0].replace(":", ""): int(latest_block[1]),
|
||||
validated_state[0].replace(":", ""): int(validated_state[1]),
|
||||
}
|
||||
|
||||
|
||||
@reporter.step_deco("Search object nodes")
|
||||
def get_object_nodes(
|
||||
cluster: Cluster,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
oid: str,
|
||||
shell: Shell,
|
||||
endpoint: str,
|
||||
bearer: str = "",
|
||||
xhdr: Optional[dict] = None,
|
||||
is_direct: bool = False,
|
||||
verify_presence_all: bool = False,
|
||||
wallet_config: Optional[str] = None,
|
||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||
) -> list[ClusterNode]:
|
||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG)
|
||||
|
||||
result_object_nodes = cli.object.nodes(
|
||||
rpc_endpoint=endpoint,
|
||||
wallet=wallet,
|
||||
cid=cid,
|
||||
oid=oid,
|
||||
bearer=bearer,
|
||||
ttl=1 if is_direct else None,
|
||||
xhdr=xhdr,
|
||||
timeout=timeout,
|
||||
verify_presence_all=verify_presence_all,
|
||||
)
|
||||
|
||||
parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
|
||||
list_object_nodes = [
|
||||
node
|
||||
for node in parsing_output
|
||||
if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
|
||||
]
|
||||
|
||||
netmap_nodes_list = parse_netmap_output(
|
||||
cli.netmap.snapshot(
|
||||
rpc_endpoint=endpoint,
|
||||
wallet=wallet,
|
||||
).stdout
|
||||
)
|
||||
netmap_nodes = [
|
||||
netmap_node
|
||||
for object_node in list_object_nodes
|
||||
for netmap_node in netmap_nodes_list
|
||||
if object_node["node_id"] == netmap_node.node_id
|
||||
]
|
||||
|
||||
result = [
|
||||
cluster_node
|
||||
for netmap_node in netmap_nodes
|
||||
for cluster_node in cluster.cluster_nodes
|
||||
if netmap_node.node == cluster_node.host_ip
|
||||
]
|
||||
|
||||
return result
|
||||
|
|
42
src/frostfs_testlib/steps/iptables.py
Normal file
42
src/frostfs_testlib/steps/iptables.py
Normal file
|
@ -0,0 +1,42 @@
|
|||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.cluster import ClusterNode
|
||||
|
||||
|
||||
class IpTablesHelper:
|
||||
@staticmethod
|
||||
def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None:
|
||||
shell = node.host.get_shell()
|
||||
for port in ports:
|
||||
shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP")
|
||||
|
||||
@staticmethod
|
||||
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
|
||||
shell = node.host.get_shell()
|
||||
for ip in block_ip:
|
||||
shell.exec(f"iptables -A INPUT -s {ip} -j DROP")
|
||||
|
||||
@staticmethod
|
||||
def restore_input_traffic_to_port(node: ClusterNode) -> None:
|
||||
shell = node.host.get_shell()
|
||||
ports = (
|
||||
shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'")
|
||||
.stdout.strip()
|
||||
.split("\n")
|
||||
)
|
||||
if ports[0] == "":
|
||||
return
|
||||
for port in ports:
|
||||
shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP")
|
||||
|
||||
@staticmethod
|
||||
def restore_input_traffic_to_node(node: ClusterNode) -> None:
|
||||
shell = node.host.get_shell()
|
||||
unlock_ip = (
|
||||
shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'")
|
||||
.stdout.strip()
|
||||
.split("\n")
|
||||
)
|
||||
if unlock_ip[0] == "":
|
||||
return
|
||||
for ip in unlock_ip:
|
||||
shell.exec(f"iptables -D INPUT -s {ip} -j DROP")
|
|
@ -16,6 +16,7 @@ from frostfs_testlib.resources.cli import (
|
|||
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.steps.epoch import tick_epoch
|
||||
from frostfs_testlib.steps.epoch import wait_for_epochs_align
|
||||
from frostfs_testlib.storage.cluster import Cluster, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
|
||||
from frostfs_testlib.utils import datetime_utils
|
||||
|
@ -189,6 +190,7 @@ def exclude_node_from_network_map(
|
|||
|
||||
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
||||
tick_epoch(shell, cluster)
|
||||
wait_for_epochs_align(shell, cluster)
|
||||
|
||||
snapshot = get_netmap_snapshot(node=alive_node, shell=shell)
|
||||
assert (
|
||||
|
|
|
@ -13,7 +13,7 @@ from frostfs_testlib.reporter import get_reporter
|
|||
from frostfs_testlib.resources.cli import NEOGO_EXECUTABLE
|
||||
from frostfs_testlib.resources.common import FROSTFS_CONTRACT, GAS_HASH, MORPH_BLOCK_TIME
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import MainChain, MorphChain
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
|
||||
from frostfs_testlib.utils import converting_utils, datetime_utils, wallet_utils
|
||||
|
||||
reporter = get_reporter()
|
||||
|
@ -21,10 +21,8 @@ logger = logging.getLogger("NeoLogger")
|
|||
|
||||
EMPTY_PASSWORD = ""
|
||||
TX_PERSIST_TIMEOUT = 15 # seconds
|
||||
ASSET_POWER_MAINCHAIN = 10**8
|
||||
ASSET_POWER_SIDECHAIN = 10**12
|
||||
|
||||
|
||||
def get_nns_contract_hash(morph_chain: MorphChain) -> str:
|
||||
return morph_chain.rpc_client.get_contract_state(1)["hash"]
|
||||
|
||||
|
@ -41,33 +39,7 @@ def get_contract_hash(morph_chain: MorphChain, resolve_name: str, shell: Shell)
|
|||
stack_data = json.loads(out.stdout.replace("\n", ""))["stack"][0]["value"]
|
||||
return bytes.decode(base64.b64decode(stack_data[0]["value"]))
|
||||
|
||||
|
||||
@reporter.step_deco("Withdraw Mainnet Gas")
|
||||
def withdraw_mainnet_gas(shell: Shell, main_chain: MainChain, wlt: str, amount: int):
|
||||
address = wallet_utils.get_last_address_from_wallet(wlt, EMPTY_PASSWORD)
|
||||
scripthash = neo3_utils.address_to_script_hash(address)
|
||||
|
||||
neogo = NeoGo(shell=shell, neo_go_exec_path=NEOGO_EXECUTABLE)
|
||||
out = neogo.contract.invokefunction(
|
||||
wallet=wlt,
|
||||
address=address,
|
||||
rpc_endpoint=main_chain.get_endpoint(),
|
||||
scripthash=FROSTFS_CONTRACT,
|
||||
method="withdraw",
|
||||
arguments=f"{scripthash} int:{amount}",
|
||||
multisig_hash=f"{scripthash}:Global",
|
||||
wallet_password="",
|
||||
)
|
||||
|
||||
m = re.match(r"^Sent invocation transaction (\w{64})$", out.stdout)
|
||||
if m is None:
|
||||
raise Exception("Can not get Tx.")
|
||||
tx = m.group(1)
|
||||
if not transaction_accepted(main_chain, tx):
|
||||
raise AssertionError(f"TX {tx} hasn't been processed")
|
||||
|
||||
|
||||
def transaction_accepted(main_chain: MainChain, tx_id: str):
|
||||
def transaction_accepted(morph_chain: MorphChain, tx_id: str):
|
||||
"""
|
||||
This function returns True in case of accepted TX.
|
||||
Args:
|
||||
|
@ -79,8 +51,8 @@ def transaction_accepted(main_chain: MainChain, tx_id: str):
|
|||
try:
|
||||
for _ in range(0, TX_PERSIST_TIMEOUT):
|
||||
time.sleep(1)
|
||||
neogo = NeoGo(shell=main_chain.host.get_shell(), neo_go_exec_path=NEOGO_EXECUTABLE)
|
||||
resp = neogo.query.tx(tx_hash=tx_id, rpc_endpoint=main_chain.get_endpoint())
|
||||
neogo = NeoGo(shell=morph_chain.host.get_shell(), neo_go_exec_path=NEOGO_EXECUTABLE)
|
||||
resp = neogo.query.tx(tx_hash=tx_id, rpc_endpoint=morph_chain.get_endpoint())
|
||||
if resp is not None:
|
||||
logger.info(f"TX is accepted in block: {resp}")
|
||||
return True, resp
|
||||
|
@ -110,12 +82,11 @@ def get_balance(shell: Shell, morph_chain: MorphChain, wallet_path: str, wallet_
|
|||
logger.error(f"failed to get wallet balance: {out}")
|
||||
raise out
|
||||
|
||||
|
||||
@reporter.step_deco("Transfer Gas")
|
||||
def transfer_gas(
|
||||
shell: Shell,
|
||||
amount: int,
|
||||
main_chain: MainChain,
|
||||
morph_chain: MorphChain,
|
||||
wallet_from_path: Optional[str] = None,
|
||||
wallet_from_password: Optional[str] = None,
|
||||
address_from: Optional[str] = None,
|
||||
|
@ -138,11 +109,11 @@ def transfer_gas(
|
|||
address_to: The address of the wallet to transfer assets to.
|
||||
amount: Amount of gas to transfer.
|
||||
"""
|
||||
wallet_from_path = wallet_from_path or main_chain.get_wallet_path()
|
||||
wallet_from_path = wallet_from_path or morph_chain.get_wallet_path()
|
||||
wallet_from_password = (
|
||||
wallet_from_password
|
||||
if wallet_from_password is not None
|
||||
else main_chain.get_wallet_password()
|
||||
else morph_chain.get_wallet_password()
|
||||
)
|
||||
address_from = address_from or wallet_utils.get_last_address_from_wallet(
|
||||
wallet_from_path, wallet_from_password
|
||||
|
@ -153,7 +124,7 @@ def transfer_gas(
|
|||
|
||||
neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE)
|
||||
out = neogo.nep17.transfer(
|
||||
rpc_endpoint=main_chain.get_endpoint(),
|
||||
rpc_endpoint=morph_chain.get_endpoint(),
|
||||
wallet=wallet_from_path,
|
||||
wallet_password=wallet_from_password,
|
||||
amount=amount,
|
||||
|
@ -165,49 +136,11 @@ def transfer_gas(
|
|||
txid = out.stdout.strip().split("\n")[-1]
|
||||
if len(txid) != 64:
|
||||
raise Exception("Got no TXID after run the command")
|
||||
if not transaction_accepted(main_chain, txid):
|
||||
if not transaction_accepted(morph_chain, txid):
|
||||
raise AssertionError(f"TX {txid} hasn't been processed")
|
||||
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
||||
|
||||
|
||||
@reporter.step_deco("FrostFS Deposit")
|
||||
def deposit_gas(
|
||||
shell: Shell,
|
||||
main_chain: MainChain,
|
||||
amount: int,
|
||||
wallet_from_path: str,
|
||||
wallet_from_password: str,
|
||||
):
|
||||
"""
|
||||
Transferring GAS from given wallet to FrostFS contract address.
|
||||
"""
|
||||
# get FrostFS contract address
|
||||
deposit_addr = converting_utils.contract_hash_to_address(FROSTFS_CONTRACT)
|
||||
logger.info(f"FrostFS contract address: {deposit_addr}")
|
||||
address_from = wallet_utils.get_last_address_from_wallet(
|
||||
wallet_path=wallet_from_path, wallet_password=wallet_from_password
|
||||
)
|
||||
transfer_gas(
|
||||
shell=shell,
|
||||
main_chain=main_chain,
|
||||
amount=amount,
|
||||
wallet_from_path=wallet_from_path,
|
||||
wallet_from_password=wallet_from_password,
|
||||
address_to=deposit_addr,
|
||||
address_from=address_from,
|
||||
)
|
||||
|
||||
|
||||
@reporter.step_deco("Get Mainnet Balance")
|
||||
def get_mainnet_balance(main_chain: MainChain, address: str):
|
||||
resp = main_chain.rpc_client.get_nep17_balances(address=address)
|
||||
logger.info(f"Got getnep17balances response: {resp}")
|
||||
for balance in resp["balance"]:
|
||||
if balance["assethash"] == GAS_HASH:
|
||||
return float(balance["amount"]) / ASSET_POWER_MAINCHAIN
|
||||
return float(0)
|
||||
|
||||
|
||||
@reporter.step_deco("Get Sidechain Balance")
|
||||
def get_sidechain_balance(morph_chain: MorphChain, address: str):
|
||||
resp = morph_chain.rpc_client.get_nep17_balances(address=address)
|
||||
|
|
|
@ -67,6 +67,9 @@ def try_to_get_objects_and_expect_error(
|
|||
|
||||
@reporter.step_deco("Set versioning status to '{status}' for bucket '{bucket}'")
|
||||
def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: VersioningStatus):
|
||||
if status == VersioningStatus.UNDEFINED:
|
||||
return
|
||||
|
||||
s3_client.get_bucket_versioning_status(bucket)
|
||||
s3_client.put_bucket_versioning(bucket, status=status)
|
||||
bucket_status = s3_client.get_bucket_versioning_status(bucket)
|
||||
|
|
|
@ -14,6 +14,7 @@ from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG
|
|||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing.readable import HumanReadableEnum
|
||||
from frostfs_testlib.utils import json_utils, wallet_utils
|
||||
|
||||
reporter = get_reporter()
|
||||
|
@ -26,7 +27,7 @@ WRONG_VERB = "wrong verb of the session"
|
|||
INVALID_SIGNATURE = "invalid signature of the session data"
|
||||
|
||||
|
||||
class ObjectVerb(Enum):
|
||||
class ObjectVerb(HumanReadableEnum):
|
||||
PUT = "PUT"
|
||||
DELETE = "DELETE"
|
||||
GET = "GET"
|
||||
|
@ -36,7 +37,7 @@ class ObjectVerb(Enum):
|
|||
SEARCH = "SEARCH"
|
||||
|
||||
|
||||
class ContainerVerb(Enum):
|
||||
class ContainerVerb(HumanReadableEnum):
|
||||
CREATE = "PUT"
|
||||
DELETE = "DELETE"
|
||||
SETEACL = "SETEACL"
|
||||
|
|
|
@ -2,7 +2,6 @@ from frostfs_testlib.storage.constants import _FrostfsServicesNames
|
|||
from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||
HTTPGate,
|
||||
InnerRing,
|
||||
MainChain,
|
||||
MorphChain,
|
||||
S3Gate,
|
||||
StorageNode,
|
||||
|
@ -17,8 +16,6 @@ __class_registry.register_service(_FrostfsServicesNames.INNER_RING, InnerRing)
|
|||
__class_registry.register_service(_FrostfsServicesNames.MORPH_CHAIN, MorphChain)
|
||||
__class_registry.register_service(_FrostfsServicesNames.S3_GATE, S3Gate)
|
||||
__class_registry.register_service(_FrostfsServicesNames.HTTP_GATE, HTTPGate)
|
||||
# # TODO: Remove this since we are no longer have main chain
|
||||
__class_registry.register_service(_FrostfsServicesNames.MAIN_CHAIN, MainChain)
|
||||
|
||||
|
||||
def get_service_registry() -> ServiceRegistry:
|
||||
|
|
|
@ -21,4 +21,3 @@ class _FrostfsServicesNames:
|
|||
HTTP_GATE = "http-gate"
|
||||
MORPH_CHAIN = "morph-chain"
|
||||
INNER_RING = "ir"
|
||||
MAIN_CHAIN = "main-chain"
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
import copy
|
||||
import itertools
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import frostfs_testlib.resources.optionals as optionals
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.shell import CommandOptions, Shell
|
||||
from frostfs_testlib.steps import epoch
|
||||
from frostfs_testlib.steps.iptables import IpTablesHelper
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||
from frostfs_testlib.testing import parallel
|
||||
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
||||
from frostfs_testlib.utils.failover_utils import (
|
||||
wait_all_storage_nodes_returned,
|
||||
|
@ -24,6 +27,7 @@ class ClusterStateController:
|
|||
self.detached_disks: dict[str, DiskController] = {}
|
||||
self.stopped_storage_nodes: list[ClusterNode] = []
|
||||
self.stopped_s3_gates: list[ClusterNode] = []
|
||||
self.dropped_traffic: list[ClusterNode] = []
|
||||
self.cluster = cluster
|
||||
self.shell = shell
|
||||
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
||||
|
@ -139,15 +143,8 @@ class ClusterStateController:
|
|||
# In case if we stopped couple services, for example (s01-s04):
|
||||
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
||||
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
||||
# So in order to make sure that services are at least attempted to be started, using threads here.
|
||||
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
|
||||
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
|
||||
|
||||
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
||||
# But will be thrown here.
|
||||
# Not ideal solution, but okay for now
|
||||
for _ in start_result:
|
||||
pass
|
||||
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
|
||||
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
|
||||
|
||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||
self.stopped_storage_nodes = []
|
||||
|
@ -170,14 +167,8 @@ class ClusterStateController:
|
|||
if not self.stopped_s3_gates:
|
||||
return
|
||||
|
||||
with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor:
|
||||
start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates)
|
||||
|
||||
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
||||
# But will be thrown here.
|
||||
# Not ideal solution, but okay for now
|
||||
for _ in start_result:
|
||||
pass
|
||||
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
|
||||
self.stopped_s3_gates = []
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Suspend {process_name} service in {node}")
|
||||
|
@ -204,6 +195,62 @@ class ClusterStateController:
|
|||
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
|
||||
self.suspended_services = {}
|
||||
|
||||
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
|
||||
def drop_traffic(
|
||||
self,
|
||||
mode: str,
|
||||
node: ClusterNode,
|
||||
wakeup_timeout: int,
|
||||
ports: list[str] = None,
|
||||
block_nodes: list[ClusterNode] = None,
|
||||
) -> None:
|
||||
allowed_modes = ["ports", "nodes"]
|
||||
assert mode in allowed_modes
|
||||
|
||||
match mode:
|
||||
case "ports":
|
||||
IpTablesHelper.drop_input_traffic_to_port(node, ports)
|
||||
case "nodes":
|
||||
list_ip = self._parse_intefaces(block_nodes)
|
||||
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
|
||||
time.sleep(wakeup_timeout)
|
||||
self.dropped_traffic.append(node)
|
||||
|
||||
@reporter.step_deco("Ping traffic")
|
||||
def ping_traffic(
|
||||
self,
|
||||
node: ClusterNode,
|
||||
nodes_list: list[ClusterNode],
|
||||
expect_result: int,
|
||||
) -> bool:
|
||||
shell = node.host.get_shell()
|
||||
options = CommandOptions(check=False)
|
||||
ips = self._parse_intefaces(nodes_list)
|
||||
for ip in ips:
|
||||
code = shell.exec(f"ping {ip} -c 1", options).return_code
|
||||
if code != expect_result:
|
||||
return False
|
||||
return True
|
||||
|
||||
@reporter.step_deco("Start traffic to {node}")
|
||||
def restore_traffic(
|
||||
self,
|
||||
mode: str,
|
||||
node: ClusterNode,
|
||||
) -> None:
|
||||
allowed_modes = ["ports", "nodes"]
|
||||
assert mode in allowed_modes
|
||||
|
||||
match mode:
|
||||
case "ports":
|
||||
IpTablesHelper.restore_input_traffic_to_port(node=node)
|
||||
case "nodes":
|
||||
IpTablesHelper.restore_input_traffic_to_node(node=node)
|
||||
|
||||
@reporter.step_deco("Restore blocked nodes")
|
||||
def restore_all_traffic(self):
|
||||
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
||||
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
|
||||
|
@ -230,3 +277,16 @@ class ClusterStateController:
|
|||
disk_controller = DiskController(node, device, mountpoint)
|
||||
|
||||
return disk_controller
|
||||
|
||||
def _restore_traffic_to_node(self, node):
|
||||
IpTablesHelper.restore_input_traffic_to_port(node)
|
||||
IpTablesHelper.restore_input_traffic_to_node(node)
|
||||
|
||||
def _parse_intefaces(self, nodes: list[ClusterNode]):
|
||||
interfaces = []
|
||||
for node in nodes:
|
||||
dict_interfaces = node.host.config.interfaces
|
||||
for type, ip in dict_interfaces.items():
|
||||
if "mgmt" not in type:
|
||||
interfaces.append(ip)
|
||||
return interfaces
|
||||
|
|
|
@ -3,6 +3,7 @@ from dataclasses import dataclass
|
|||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from frostfs_testlib.testing.readable import HumanReadableEnum
|
||||
from frostfs_testlib.utils import wallet_utils
|
||||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
@ -10,7 +11,7 @@ EACL_LIFETIME = 100500
|
|||
FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
|
||||
|
||||
|
||||
class EACLOperation(Enum):
|
||||
class EACLOperation(HumanReadableEnum):
|
||||
PUT = "put"
|
||||
GET = "get"
|
||||
HEAD = "head"
|
||||
|
@ -20,24 +21,24 @@ class EACLOperation(Enum):
|
|||
DELETE = "delete"
|
||||
|
||||
|
||||
class EACLAccess(Enum):
|
||||
class EACLAccess(HumanReadableEnum):
|
||||
ALLOW = "allow"
|
||||
DENY = "deny"
|
||||
|
||||
|
||||
class EACLRole(Enum):
|
||||
class EACLRole(HumanReadableEnum):
|
||||
OTHERS = "others"
|
||||
USER = "user"
|
||||
SYSTEM = "system"
|
||||
|
||||
|
||||
class EACLHeaderType(Enum):
|
||||
class EACLHeaderType(HumanReadableEnum):
|
||||
REQUEST = "req" # Filter request headers
|
||||
OBJECT = "obj" # Filter object headers
|
||||
SERVICE = "SERVICE" # Filter service headers. These are not processed by FrostFS nodes and exist for service use only
|
||||
|
||||
|
||||
class EACLMatchType(Enum):
|
||||
class EACLMatchType(HumanReadableEnum):
|
||||
STRING_EQUAL = "=" # Return true if strings are equal
|
||||
STRING_NOT_EQUAL = "!=" # Return true if strings are different
|
||||
|
||||
|
|
|
@ -110,30 +110,6 @@ class MorphChain(NodeBase):
|
|||
def label(self) -> str:
|
||||
return f"{self.name}: {self.get_endpoint()}"
|
||||
|
||||
|
||||
class MainChain(NodeBase):
|
||||
"""
|
||||
Class represents main-chain consensus node in a cluster
|
||||
|
||||
Consensus node is not always the same as physical host:
|
||||
It can be service running in a container or on physical host (or physical node, if you will):
|
||||
For testing perspective, it's not relevant how it is actually running,
|
||||
since frostfs network will still treat it as "node"
|
||||
"""
|
||||
|
||||
rpc_client: RPCClient
|
||||
|
||||
def construct(self):
|
||||
self.rpc_client = RPCClient(self.get_endpoint())
|
||||
|
||||
def get_endpoint(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.ENDPOINT_INTERNAL)
|
||||
|
||||
@property
|
||||
def label(self) -> str:
|
||||
return f"{self.name}: {self.get_endpoint()}"
|
||||
|
||||
|
||||
class StorageNode(NodeBase):
|
||||
"""
|
||||
Class represents storage node in a storage cluster
|
||||
|
|
|
@ -23,3 +23,22 @@ class StorageObjectInfo(ObjectRef):
|
|||
attributes: Optional[list[dict[str, str]]] = None
|
||||
tombstone: Optional[str] = None
|
||||
locks: Optional[list[LockObjectInfo]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeNetmapInfo:
|
||||
node_id: str
|
||||
node_status: str
|
||||
node_data_ip: str
|
||||
cluster_name: str
|
||||
continent: str
|
||||
country: str
|
||||
country_code: str
|
||||
external_address: str
|
||||
location: str
|
||||
node: str
|
||||
price: int
|
||||
sub_div: str
|
||||
sub_div_code: int
|
||||
un_locode: str
|
||||
role: str
|
||||
|
|
|
@ -1,4 +1,13 @@
|
|||
from abc import ABCMeta
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class HumanReadableEnum(Enum):
|
||||
def __str__(self):
|
||||
return self._name_
|
||||
|
||||
def __repr__(self):
|
||||
return self._name_
|
||||
|
||||
|
||||
class HumanReadableABCMeta(ABCMeta):
|
||||
|
|
|
@ -5,18 +5,21 @@
|
|||
"""
|
||||
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
|
||||
"""
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from io import StringIO
|
||||
from textwrap import shorten
|
||||
from typing import TypedDict, Union
|
||||
from typing import Dict, List, TypedDict, Union
|
||||
|
||||
import pexpect
|
||||
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
|
||||
|
||||
reporter = get_reporter()
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
@ -131,3 +134,49 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
|
|||
command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n"
|
||||
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
|
||||
reporter.attach(command_attachment, "Command execution")
|
||||
|
||||
|
||||
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
|
||||
"""
|
||||
The cli command will return something like.
|
||||
|
||||
Epoch: 240
|
||||
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
|
||||
Continent: Europe
|
||||
Country: Russia
|
||||
CountryCode: RU
|
||||
ExternalAddr: /ip4/10.10.11.18/tcp/8080
|
||||
Location: Moskva
|
||||
Node: 10.10.10.12
|
||||
Price: 5
|
||||
SubDiv: Moskva
|
||||
SubDivCode: MOW
|
||||
UN-LOCODE: RU MOW
|
||||
role: alphabet
|
||||
|
||||
The code will parse each line and return each node as dataclass.
|
||||
"""
|
||||
netmap_list = output.split("Node ")[1:]
|
||||
dataclass_list = []
|
||||
for node in netmap_list:
|
||||
node = node.replace("\t", "").split("\n")
|
||||
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
|
||||
dataclass_list.append(NodeNetmapInfo(*node))
|
||||
|
||||
return dataclass_list
|
||||
|
||||
|
||||
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
|
||||
parsing_output = []
|
||||
reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)
|
||||
iter_reader = iter(reader)
|
||||
header_row = next(iter_reader)
|
||||
for row in iter_reader:
|
||||
table = {}
|
||||
for i in range(len(row)):
|
||||
header = header_row[i].strip().lower().replace(" ", "_")
|
||||
value = row[i].strip().lower()
|
||||
if header:
|
||||
table[header] = value
|
||||
parsing_output.append(table)
|
||||
return parsing_output
|
||||
|
|
|
@ -3,6 +3,7 @@ from typing import Any
|
|||
import pytest
|
||||
|
||||
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper
|
||||
from frostfs_testlib.storage.dataclasses.acl import EACLRole
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||
HTTPGate,
|
||||
InnerRing,
|
||||
|
@ -30,6 +31,7 @@ class TestDataclassesStr:
|
|||
(S3Gate, "S3Gate"),
|
||||
(HTTPGate, "HTTPGate"),
|
||||
(InnerRing, "InnerRing"),
|
||||
(EACLRole.OTHERS, "OTHERS"),
|
||||
],
|
||||
)
|
||||
def test_classes_string_representation(self, obj: Any, expected: str):
|
||||
|
|
Loading…
Reference in a new issue