import logging
import os
import shutil
import uuid
from datetime import datetime

import allure
import pytest
import yaml
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.reporter import AllureHandler, get_reporter
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.utils import wallet_utils

from pytest_tests.helpers import binary_version, env_properties
from pytest_tests.helpers.cluster import Cluster
from pytest_tests.helpers.frostfs_verbs import get_netmap_netinfo
from pytest_tests.helpers.k6 import LoadParams
from pytest_tests.helpers.node_management import storage_node_healthcheck
from pytest_tests.helpers.payment_neogo import deposit_gas, transfer_gas
from pytest_tests.helpers.wallet import WalletFactory
from pytest_tests.resources.common import (
    ASSETS_DIR,
    COMPLEX_OBJECT_CHUNKS_COUNT,
    COMPLEX_OBJECT_TAIL_SIZE,
    FREE_STORAGE,
    HOSTING_CONFIG_FILE,
    SIMPLE_OBJECT_SIZE,
    STORAGE_NODE_SERVICE_NAME_REGEX,
    TEST_CYCLES_COUNT,
    WALLET_PASS,
)
from pytest_tests.resources.load_params import (
    BACKGROUND_LOAD_MAX_TIME,
    BACKGROUND_OBJ_SIZE,
    BACKGROUND_READERS_COUNT,
    BACKGROUND_WRITERS_COUNT,
    LOAD_NODE_SSH_PRIVATE_KEY_PATH,
    LOAD_NODE_SSH_USER,
    LOAD_NODES,
)
from pytest_tests.steps.load import get_services_endpoints, prepare_k6_instances

logger = logging.getLogger("NeoLogger")


# Add logs check test even if it's not fit to mark selectors
def pytest_configure(config: pytest.Config):
    """Extend a non-empty ``-m`` expression so ``logs_after_session`` tests stay selected."""
    markers = config.option.markexpr
    # An empty expression selects everything already, so only a non-empty one is wrapped.
    if markers:
        config.option.markexpr = f"logs_after_session or ({markers})"


# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
    """Reorder collected tests in place by marker-based priority."""

    # Make network tests last based on @pytest.mark.node_mgmt and logs_test to be latest
    def priority(item: pytest.Item) -> int:
        is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
        is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
        return is_node_mgmt_test + is_logs_check_test

    # list.sort is stable, so tests with equal priority keep their collection order.
    items.sort(key=priority)


def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Parametrize a test with a ``cycle`` fixture to repeat it TEST_CYCLES_COUNT times.

    Nothing is done when TEST_CYCLES_COUNT <= 1 or when the test carries the
    ``logs_after_session`` or ``no_cycles`` marker.
    """
    if (
        TEST_CYCLES_COUNT <= 1
        or metafunc.definition.get_closest_marker("logs_after_session")
        or metafunc.definition.get_closest_marker("no_cycles")
    ):
        return

    cycles = range(1, TEST_CYCLES_COUNT + 1)
    metafunc.fixturenames.append("cycle")
    metafunc.parametrize(
        "cycle",
        cycles,
        ids=[f"cycle {cycle}" for cycle in cycles],
    )


@pytest.fixture(scope="session")
def configure_testlib():
    """Register an AllureHandler with the testlib reporter for the session."""
    get_reporter().register_handler(AllureHandler())
    yield


@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
    """Yield a LocalShell instance shared by the whole session."""
    yield LocalShell()


@pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting:
    """Yield a Hosting instance configured from HOSTING_CONFIG_FILE."""
    with open(HOSTING_CONFIG_FILE, "r") as file:
        # NOTE(review): yaml.safe_load would be the safer choice if the hosting
        # config only uses plain YAML types - confirm before switching.
        hosting_config = yaml.full_load(file)

    hosting_instance = Hosting()
    hosting_instance.configure(hosting_config)
    yield hosting_instance


@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
    """Designates tests that require environment with multiple hosts.

    These tests will be skipped on an environment that has only 1 host.
    """
    if len(hosting.hosts) <= 1:
        pytest.skip("Test only works with multiple hosts")
    yield


@pytest.fixture(scope="session")
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
    """Yield ``maximum_object_size`` from the netmap netinfo of the first storage node."""
    storage_node = cluster.storage_nodes[0]
    net_info = get_netmap_netinfo(
        wallet=storage_node.get_wallet_path(),
        wallet_config=storage_node.get_wallet_config_path(),
        endpoint=storage_node.get_rpc_endpoint(),
        shell=client_shell,
    )
    yield net_info["maximum_object_size"]


@pytest.fixture(scope="session")
def simple_object_size(max_object_size: int) -> int:
    """Yield SIMPLE_OBJECT_SIZE capped at the network's maximum object size."""
    yield min(int(SIMPLE_OBJECT_SIZE), max_object_size)


@pytest.fixture(scope="session")
def complex_object_size(max_object_size: int) -> int:
    """Return COMPLEX_OBJECT_CHUNKS_COUNT maximum-size chunks plus COMPLEX_OBJECT_TAIL_SIZE."""
    return max_object_size * int(COMPLEX_OBJECT_CHUNKS_COUNT) + int(COMPLEX_OBJECT_TAIL_SIZE)


@pytest.fixture(scope="session")
def wallet_factory(temp_directory: str, client_shell: Shell, cluster: Cluster) -> WalletFactory:
    """Return a WalletFactory built from the temp directory, shell and cluster."""
    return WalletFactory(temp_directory, client_shell, cluster)


@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting) -> Cluster:
    """Yield the Cluster built on ``hosting``; wallet configs are created for a local dev env."""
    cluster = Cluster(hosting)
    if cluster.is_local_devevn():
        cluster.create_wallet_configs(hosting)
    yield cluster


@pytest.fixture(scope="session", autouse=True)
@allure.title("Check binary versions")
def check_binary_versions(request, hosting: Hosting, client_shell: Shell):
    """Collect local and remote binary versions and save them as environment properties."""
    local_versions = binary_version.get_local_binaries_versions(client_shell)
    remote_versions = binary_version.get_remote_binaries_versions(hosting)

    # On a key collision the remote version overrides the local one.
    all_versions = {**local_versions, **remote_versions}
    env_properties.save_env_properties(request.config, all_versions)


@pytest.fixture(scope="session")
@allure.title("Prepare tmp directory")
def temp_directory():
    """Yield a freshly created ASSETS_DIR under the current working directory.

    Any previous directory at that path is removed first; the directory is
    removed again on teardown.
    """
    with allure.step("Prepare tmp directory"):
        full_path = os.path.join(os.getcwd(), ASSETS_DIR)
        shutil.rmtree(full_path, ignore_errors=True)
        os.mkdir(full_path)

    yield full_path

    with allure.step("Remove tmp directory"):
        shutil.rmtree(full_path)


# NOTE(review): unlike the other fixtures here, the allure decorator is applied
# on top of pytest.fixture (and is `step`, not `title`) - confirm this is intended.
@allure.step("[Autouse/Session] Test session start time")
@pytest.fixture(scope="session", autouse=True)
def session_start_time():
    """Return the (naive) UTC time captured when the fixture is set up."""
    start_time = datetime.utcnow()
    return start_time


@pytest.fixture(scope="session", autouse=True)
@allure.title("Run health check for all storage nodes")
def run_health_check(session_start_time, cluster: Cluster):
    """Fail the session unless every storage node is READY and ONLINE.

    Raises:
        AssertionError: listing the nodes whose health check did not pass.
    """
    failed_nodes = []
    for node in cluster.storage_nodes:
        health_check = storage_node_healthcheck(node)
        if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
            failed_nodes.append(node)

    if failed_nodes:
        raise AssertionError(f"Nodes {failed_nodes} are not healthy")


@pytest.fixture(scope="session")
def background_grpc_load(client_shell: Shell, hosting: Hosting):
    """Run k6 gRPC write load in the background for the duration of the session.

    Setup starts one k6 instance per prepared load node. Teardown stops them and
    then runs k6 in ``verify`` mode against the same registry file.
    """
    registry_file = os.path.join("/tmp/", f"{uuid.uuid4()}.bolt")
    prepare_file = os.path.join("/tmp/", f"{uuid.uuid4()}.json")
    allure.dynamic.title(
        f"Start background load with parameters: "
        f"writers = {BACKGROUND_WRITERS_COUNT}, "
        f"obj_size = {BACKGROUND_OBJ_SIZE}, "
        # The separator after load_time was missing, gluing the two values together.
        f"load_time = {BACKGROUND_LOAD_MAX_TIME}, "
        f"prepare_json = {prepare_file}"
    )
    with allure.step("Get endpoints"):
        endpoints_list = get_services_endpoints(
            hosting=hosting,
            service_name_regex=STORAGE_NODE_SERVICE_NAME_REGEX,
            endpoint_attribute="rpc_endpoint",
        )
        endpoints = ",".join(endpoints_list)

    load_params = LoadParams(
        endpoint=endpoints,
        obj_size=BACKGROUND_OBJ_SIZE,
        registry_file=registry_file,
        containers_count=1,
        obj_count=0,
        out_file=prepare_file,
        readers=0,
        writers=BACKGROUND_WRITERS_COUNT,
        deleters=0,
        load_time=BACKGROUND_LOAD_MAX_TIME,
        load_type="grpc",
    )
    k6_load_instances = prepare_k6_instances(
        load_nodes=LOAD_NODES,
        login=LOAD_NODE_SSH_USER,
        pkey=LOAD_NODE_SSH_PRIVATE_KEY_PATH,
        load_params=load_params,
    )

    with allure.step("Run background load"):
        for k6_load_instance in k6_load_instances:
            k6_load_instance.start()

    yield

    with allure.step("Stop background load"):
        for k6_load_instance in k6_load_instances:
            k6_load_instance.stop()

    with allure.step("Verify background load data"):
        verify_params = LoadParams(
            endpoint=endpoints,
            clients=BACKGROUND_READERS_COUNT,
            registry_file=registry_file,
            load_time=BACKGROUND_LOAD_MAX_TIME,
            load_type="verify",
        )
        k6_verify_instances = prepare_k6_instances(
            load_nodes=LOAD_NODES,
            login=LOAD_NODE_SSH_USER,
            pkey=LOAD_NODE_SSH_PRIVATE_KEY_PATH,
            load_params=verify_params,
            prepare=False,
        )
        with allure.step("Run verify background load data"):
            # Each verify instance is started and awaited before the next one.
            for k6_verify_instance in k6_verify_instances:
                k6_verify_instance.start()
                k6_verify_instance.wait_until_finished(BACKGROUND_LOAD_MAX_TIME)


@pytest.fixture(scope="session")
@allure.title("Prepare wallet and deposit")
def default_wallet(client_shell: Shell, temp_directory: str, cluster: Cluster):
    """Create a wallet in the temp directory and, unless FREE_STORAGE, fund it.

    Returns:
        Path to the created wallet file.
    """
    # Use the directory the temp_directory fixture actually created instead of
    # re-deriving it from the current working directory.
    wallet_path = os.path.join(temp_directory, f"{uuid.uuid4()}.json")
    wallet_utils.init_wallet(wallet_path, WALLET_PASS)
    allure.attach.file(wallet_path, os.path.basename(wallet_path), allure.attachment_type.JSON)

    if not FREE_STORAGE:
        main_chain = cluster.main_chain_nodes[0]
        deposit = 30
        # One unit more than the deposit is transferred to the wallet -
        # presumably to cover fees; TODO confirm.
        transfer_gas(
            shell=client_shell,
            amount=deposit + 1,
            main_chain=main_chain,
            wallet_to_path=wallet_path,
            wallet_to_password=WALLET_PASS,
        )
        deposit_gas(
            shell=client_shell,
            main_chain=main_chain,
            amount=deposit,
            wallet_from_path=wallet_path,
            wallet_from_password=WALLET_PASS,
        )

    return wallet_path