import logging
import os
import random
import shutil
from datetime import datetime, timedelta, timezone
from importlib.metadata import entry_points
from typing import Optional

import allure
import pytest
import yaml
from dateutil import parser
from frostfs_testlib import plugins, reporter
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.reporter import AllureHandler, StepsLogger
from frostfs_testlib.resources.common import (
    ASSETS_DIR,
    COMPLEX_OBJECT_CHUNKS_COUNT,
    COMPLEX_OBJECT_TAIL_SIZE,
    DEFAULT_WALLET_PASS,
    SIMPLE_OBJECT_SIZE,
)
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.steps.cli.container import list_containers
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletFactory, WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import env_utils, version_utils

from pytest_tests.resources.common import HOSTING_CONFIG_FILE, TEST_CYCLES_COUNT

logger = logging.getLogger("NeoLogger")

# Minimum uptime, in minutes, a storage service must have before the session proceeds.
SERVICE_ACTIVE_TIME = 20


# Add logs check test even if it's not fit to mark selectors
def pytest_configure(config: pytest.Config):
    """Extend a user-supplied ``-m`` expression so ``logs_after_session`` tests are always selected.

    When no marker expression was given, nothing is changed.
    """
    markers = config.option.markexpr
    if markers:
        config.option.markexpr = f"logs_after_session or ({markers})"


# Per-item stash slots shared by the reporting hooks below.
number_key = pytest.StashKey[str]()
start_time = pytest.StashKey[int]()
test_outcome = pytest.StashKey[str]()


# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
    """Reorder collected tests in place by marker-derived priority.

    Tests sort ascending by the sum of their marker weights: ``node_mgmt`` = 1,
    ``time`` = 10, ``logs_after_session`` = 100. ``list.sort`` is stable, so tests
    with equal priority keep their collection order.
    """

    # Make network tests last based on @pytest.mark.node_mgmt and logs_test to be latest
    def priority(item: pytest.Item) -> int:
        is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
        is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
        is_system_time_test = 10 if item.get_closest_marker("time") else 0
        return is_node_mgmt_test + is_logs_check_test + is_system_time_test

    items.sort(key=priority)


# pytest hook. Do not rename
def pytest_collection_finish(session: pytest.Session):
    """Initialise the per-item stash: ``[index/total]`` label, empty outcome, zero start time."""
    items_total = len(session.items)
    for number, item in enumerate(session.items, 1):
        item.stash[number_key] = f"[{number}/{items_total}]"
        item.stash[test_outcome] = ""
        item.stash[start_time] = 0


# pytest hook. Do not rename
def pytest_runtest_setup(item: pytest.Item):
    """Record the start timestamp (whole seconds) of the test and log that it started."""
    item.stash[start_time] = int(datetime.now().timestamp())
    logger.info(f"STARTED {item.stash[number_key]}: {item.name}")


# pytest hook. Do not rename
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
    """Accumulate the outcome of each phase and log a summary line after teardown.

    A phase that raised appends ``SKIPPED on <phase>; `` or ``FAILED on <phase>; `` to the
    stashed outcome. When the teardown phase is reported, the accumulated outcome
    (or ``PASSED `` if it is empty) is logged together with the elapsed whole seconds.
    """
    if call.excinfo is not None:
        if call.excinfo.typename == "Skipped":
            # Restart the clock at the moment of the skip so the duration is measured from here.
            item.stash[start_time] = int(datetime.now().timestamp())
            item.stash[test_outcome] += f"SKIPPED on {call.when}; "
        else:
            item.stash[test_outcome] += f"FAILED on {call.when}; "

    if call.when == "teardown":
        duration = int(datetime.now().timestamp()) - item.stash[start_time]
        if not item.stash[test_outcome]:
            outcome = "PASSED "
        else:
            outcome = item.stash[test_outcome]
        logger.info(f"ENDED {item.stash[number_key]}: {item.name}: {outcome}(duration={duration}s)")


# pytest hook.
# Do not rename
def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Parametrize each test with a ``cycle`` argument so it runs ``TEST_CYCLES_COUNT`` times.

    Nothing is done when ``TEST_CYCLES_COUNT`` is 1 or less, or when the test carries the
    ``logs_after_session`` or ``no_cycles`` marker.
    """
    if (
        TEST_CYCLES_COUNT <= 1
        or metafunc.definition.get_closest_marker("logs_after_session")
        or metafunc.definition.get_closest_marker("no_cycles")
    ):
        return

    cycles = range(1, TEST_CYCLES_COUNT + 1)
    metafunc.fixturenames.append("cycle")
    metafunc.parametrize(
        "cycle",
        cycles,
        ids=[f"cycle {cycle}" for cycle in cycles],
    )


@pytest.fixture(scope="session")
def configure_testlib():
    """Register reporter handlers, set paramiko logging to INFO and register cluster services."""
    reporter.get_reporter().register_handler(AllureHandler())
    reporter.get_reporter().register_handler(StepsLogger())
    logging.getLogger("paramiko").setLevel(logging.INFO)

    # Register Services for cluster
    registry = get_service_registry()
    services = entry_points(group="frostfs.testlib.services")
    for svc in services:
        registry.register_service(svc.name, svc.load())

    yield


@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
    """Yield a ``LocalShell`` instance."""
    yield LocalShell()


@pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting:
    """Yield a ``Hosting`` instance configured from ``HOSTING_CONFIG_FILE``."""
    with open(HOSTING_CONFIG_FILE, "r") as file:
        # NOTE(review): yaml.safe_load would be enough if the hosting config only uses
        # plain YAML types - confirm before switching.
        hosting_config = yaml.full_load(file)

    hosting_instance = Hosting()
    hosting_instance.configure(hosting_config)

    yield hosting_instance


@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
    """Designates tests that require environment with multiple hosts.

    These tests will be skipped on an environment that has only 1 host.
    """
    if len(hosting.hosts) <= 1:
        pytest.skip("Test only works with multiple hosts")
    yield


@pytest.fixture(scope="session")
def require_multiple_interfaces(cluster: Cluster):
    """
    We determine that there are the required number of interfaces for tests

    If there are no required interfaces, the tests will be skipped.
    """
    # Only the first cluster node's host config is inspected.
    interfaces = cluster.cluster_nodes[0].host.config.interfaces
    if "internal1" not in interfaces or "data1" not in interfaces:
        pytest.skip("This test requires multiple internal and data interfaces")
    yield


@pytest.fixture(scope="session")
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
    """Yield ``maximum_object_size`` from the netmap netinfo queried via the first storage node."""
    storage_node = cluster.storage_nodes[0]
    net_info = get_netmap_netinfo(
        wallet=storage_node.get_wallet_path(),
        wallet_config=storage_node.get_wallet_config_path(),
        endpoint=storage_node.get_rpc_endpoint(),
        shell=client_shell,
    )
    yield net_info["maximum_object_size"]


@pytest.fixture(scope="session")
def simple_object_size(max_object_size: int) -> ObjectSize:
    """Return the "simple" object size: ``SIMPLE_OBJECT_SIZE`` capped at ``max_object_size``."""
    size = min(int(SIMPLE_OBJECT_SIZE), max_object_size)
    return ObjectSize("simple", size)


@pytest.fixture(scope="session")
def complex_object_size(max_object_size: int) -> ObjectSize:
    """Return the "complex" object size: ``COMPLEX_OBJECT_CHUNKS_COUNT`` full chunks plus a tail."""
    size = max_object_size * int(COMPLEX_OBJECT_CHUNKS_COUNT) + int(COMPLEX_OBJECT_TAIL_SIZE)
    return ObjectSize("complex", size)


# By default we want all tests to be executed with both object sizes
# This can be overridden in chosen tests if needed
@pytest.fixture(
    scope="session",
    params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)],
)
def object_size(
    simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest
) -> ObjectSize:
    """Return the simple or the complex object size depending on the fixture parameter."""
    if request.param == "simple":
        return simple_object_size

    return complex_object_size


@pytest.fixture(scope="session")
def wallet_factory(temp_directory: str, client_shell: Shell, cluster: Cluster) -> WalletFactory:
    """Return a ``WalletFactory`` built from the temp directory, local shell and cluster."""
    return WalletFactory(temp_directory, client_shell, cluster)


@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
    """Yield the ``Cluster`` for this session and publish it (and the shell) on ``ClusterTestBase``."""
    cluster = Cluster(hosting)
    if cluster.is_local_devenv():
        cluster.create_wallet_configs(hosting)

    # Class attributes are assigned here so ClusterTestBase subclasses can read them.
    ClusterTestBase.shell = client_shell
    ClusterTestBase.cluster = cluster

    yield cluster


@reporter.step("[Class]: Provide S3 policy")
@pytest.fixture(scope="class")
def s3_policy(request: pytest.FixtureRequest):
    """Return the parametrized S3 policy, or ``None`` when the fixture is not parametrized."""
    policy = None
    if "param" in request.__dict__:
        policy = request.param

    return policy


@pytest.fixture(scope="session")
@allure.title("[Session] Create healthcheck object")
def healthcheck(cluster: Cluster) -> Healthcheck:
    """Load the healthcheck plugin named in the first node's host config and instantiate it."""
    healthcheck_cls = plugins.load_plugin(
        "frostfs.testlib.healthcheck", cluster.cluster_nodes[0].host.config.healthcheck_plugin_name
    )

    return healthcheck_cls()


@pytest.fixture(scope="session")
def cluster_state_controller(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController:
    """Yield a ``ClusterStateController`` for the session's shell, cluster and healthcheck."""
    controller = ClusterStateController(client_shell, cluster, healthcheck)
    yield controller


@reporter.step("[Class]: Create S3 client")
@pytest.fixture(
    scope="class",
    params=[
        pytest.param(AwsCliClient, marks=pytest.mark.aws),
        pytest.param(Boto3ClientWrapper, marks=pytest.mark.boto3),
    ],
)
def s3_client(
    default_wallet: str,
    client_shell: Shell,
    s3_policy: Optional[str],
    cluster: Cluster,
    auth_container_placement_policy: str,
    request: pytest.FixtureRequest,
) -> S3ClientWrapper:
    """Initialise S3 credentials for the default wallet and yield a client of the parametrized class.

    Asserts that the container id returned by credential initialisation appears in the
    wallet's container list before the client is created.
    """
    wallet = WalletInfo(path=default_wallet, password=DEFAULT_WALLET_PASS)

    (cid, access_key_id, secret_access_key) = s3_helper.init_s3_credentials(
        wallet,
        client_shell,
        cluster,
        s3gates=[cluster_node.s3_gate for cluster_node in cluster.cluster_nodes],
        policy=s3_policy,
        container_placement_policy=auth_container_placement_policy,
    )
    containers_list = list_containers(wallet.path, shell=client_shell, endpoint=cluster.default_rpc_endpoint)
    assert cid in containers_list, f"Expected cid {cid} in {containers_list}"

    s3_client_cls = request.param
    client = s3_client_cls(access_key_id, secret_access_key, cluster.default_s3_gate_endpoint)
    yield client


@pytest.fixture
def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus:
    """Return the parametrized versioning status, defaulting to ``VersioningStatus.UNDEFINED``."""
    if "param" in request.__dict__:
        return request.param

    return VersioningStatus.UNDEFINED


@reporter.step("Create/delete bucket")
@pytest.fixture
def bucket(s3_client: S3ClientWrapper, versioning_status: VersioningStatus, request: pytest.FixtureRequest):
    """Create a bucket, optionally set its versioning, yield its name and delete it afterwards.

    Deletion is skipped when the ``-m`` expression contains ``sanity``.
    """
    bucket_name = s3_client.create_bucket()

    if versioning_status:
        s3_helper.set_bucket_versioning(s3_client, bucket_name, versioning_status)

    yield bucket_name

    if "sanity" not in request.config.option.markexpr:
        s3_helper.delete_bucket_with_objects(s3_client, bucket_name)


@reporter.step("Create two buckets")
@pytest.fixture
def two_buckets(s3_client: S3ClientWrapper, request: pytest.FixtureRequest):
    """Create two buckets, yield their names as a pair and delete both afterwards.

    Deletion is skipped when the ``-m`` expression contains ``sanity``.
    """
    bucket_1 = s3_client.create_bucket()
    bucket_2 = s3_client.create_bucket()
    yield bucket_1, bucket_2

    if "sanity" not in request.config.option.markexpr:
        for bucket_name in [bucket_1, bucket_2]:
            s3_helper.delete_bucket_with_objects(s3_client, bucket_name)


@reporter.step("[Autouse/Session] Check binary versions")
@pytest.fixture(scope="session", autouse=True)
def check_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest):
    """Collect local and remote binary versions and save them as Allure environment properties.

    Nothing is written when ``--alluredir`` is not set.
    """
    local_versions = version_utils.get_local_binaries_versions(client_shell)
    remote_versions = version_utils.get_remote_binaries_versions(hosting)

    # Remote entries override local ones on a name clash because they are merged last.
    all_versions = {
        **local_versions,
        **{binary_name: binary["version"] for binary_name, binary in remote_versions.items()},
    }

    environment_dir = request.config.getoption("--alluredir")
    if not environment_dir:
        return None

    file_path = f"{environment_dir}/environment.properties"
    env_utils.save_env_properties(file_path, all_versions)


@reporter.step("Prepare tmp directory")
@pytest.fixture(scope="session")
def temp_directory(configure_testlib):
    """Recreate ``ASSETS_DIR`` under the current working directory, yield its path, then remove it."""
    with reporter.step("Prepare tmp directory"):
        full_path = os.path.join(os.getcwd(), ASSETS_DIR)
        # Drop leftovers from a previous run; a missing directory is not an error here.
        shutil.rmtree(full_path, ignore_errors=True)
        os.mkdir(full_path)

    yield full_path

    with reporter.step("Remove tmp directory"):
        shutil.rmtree(full_path)


@reporter.step("[Autouse/Session] Test session start time")
@pytest.fixture(scope="session", autouse=True)
def session_start_time(configure_testlib):
    """Return the session start as a naive UTC ``datetime``."""
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; switching to an aware
    # datetime would change what consumers receive, so it is left as is.
    session_started_at = datetime.utcnow()
    return session_started_at


@allure.title("[Autouse/Session] After deploy healthcheck")
@pytest.fixture(scope="session", autouse=True)
def after_deploy_healthcheck(cluster: Cluster):
    """Run ``readiness_on_node`` for every cluster node via ``parallel``."""
    with reporter.step("Wait for cluster readiness after deploy"):
        parallel(readiness_on_node, cluster.cluster_nodes)


# NOTE(review): the two positional arguments are presumably the overall timeout and the
# retry interval in seconds - confirm against wait_for_success.
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
def readiness_on_node(cluster_node: ClusterNode):
    """Assert the node's storage service is active and has been up longer than ``SERVICE_ACTIVE_TIME`` minutes.

    Raises:
        AssertionError: if ``systemctl is-active`` does not print ``active`` or the uptime
            computed from ``ActiveEnterTimestamp`` does not exceed the threshold.
    """
    # TODO: Move to healthcheck classes
    svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
    with reporter.step(f"Check service {svc_name} is active"):
        result = cluster_node.host.get_shell().exec(f"systemctl is-active {svc_name}")
        assert "active" == result.stdout.strip(), f"Service {svc_name} should be in active state"

    with reporter.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"):
        result = cluster_node.host.get_shell().exec(
            f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2"
        )
        # NOTE(review): if dateutil yields a naive datetime (no or unrecognised time zone in
        # the output), the subtraction below raises TypeError - confirm the hosts' output format.
        service_started_at = parser.parse(result.stdout.strip())
        current_time = datetime.now(tz=timezone.utc)
        active_time = current_time - service_started_at

        # total_seconds() rather than .seconds: .seconds drops the days component and yields
        # a misleading large value for negative deltas (e.g. clock skew between hosts).
        total_active_seconds = int(active_time.total_seconds())
        sign = "-" if total_active_seconds < 0 else ""
        active_minutes, active_seconds = divmod(abs(total_active_seconds), 60)

        assert active_time > timedelta(minutes=SERVICE_ACTIVE_TIME), (
            f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, "
            f"current {sign}{active_minutes}m:{active_seconds}s"
        )


@allure.title("[Autouse/Test] Run health check for all nodes")
@pytest.fixture(autouse=True)
def run_health_check(healthcheck: Healthcheck, cluster: Cluster, request: pytest.FixtureRequest):
    """Run the storage healthcheck on every cluster node before each test unless it is opted out."""
    if request.node.get_closest_marker("no_healthcheck"):
        # Skip healthcheck for tests marked with no_healthcheck
        return
    parallel(healthcheck.storage_healthcheck, cluster.cluster_nodes)


@reporter.step("Prepare wallet and deposit")
@pytest.fixture(scope="session")
def default_wallet(wallet_factory: WalletFactory) -> str:
    """Create a wallet protected by ``DEFAULT_WALLET_PASS``, attach it to the report and return its path."""
    wallet = wallet_factory.create_wallet(password=DEFAULT_WALLET_PASS)
    reporter.attach(wallet.path, os.path.basename(wallet.path))
    return wallet.path


@reporter.step("[Class]: Container placement policy for keys")
@pytest.fixture(scope="class")
def auth_container_placement_policy(cluster: Cluster, request: pytest.FixtureRequest):
    """Return the parametrized placement policy with node-count placeholders substituted.

    ``$ALPHABET_NODE_COUNT$`` becomes 4 for clusters with fewer than 8 nodes and 8 otherwise;
    ``$NODE_COUNT$`` becomes the number of cluster nodes. Returns ``None`` when the fixture
    is not parametrized.
    """
    # Without a parameter there is no policy string to substitute into.
    if "param" not in request.__dict__:
        return None

    node_count = len(cluster.cluster_nodes)
    placeholders = {
        "$ALPHABET_NODE_COUNT$": 4 if node_count < 8 else 8,
        "$NODE_COUNT$": node_count,
    }

    placement_policy = request.param
    for key, value in placeholders.items():
        placement_policy = placement_policy.replace(key, str(value))
    return placement_policy


@pytest.fixture()
@allure.title("Select random node for testing")
def node_under_test(cluster: Cluster) -> ClusterNode:
    """Pick a random cluster node, attach its string form to the report and return it."""
    selected_node = random.choice(cluster.cluster_nodes)
    reporter.attach(f"{selected_node}", "Selected node")
    return selected_node