import logging import random from datetime import datetime, timedelta, timezone from typing import Optional import allure import pytest from dateutil import parser from frostfs_testlib import plugins, reporter from frostfs_testlib.cli import FrostfsCli from frostfs_testlib.clients import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, S3HttpClient from frostfs_testlib.clients.s3 import BucketContainerResolver, VersioningStatus from frostfs_testlib.credentials.interfaces import CredentialsProvider, User from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.hosting import Hosting from frostfs_testlib.resources import optionals from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, SIMPLE_OBJECT_SIZE from frostfs_testlib.shell import LocalShell, Shell from frostfs_testlib.steps import s3_helper from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE, FROSTFS_CLI_EXEC from frostfs_testlib.steps.cli.object import get_netmap_netinfo from frostfs_testlib.steps.epoch import ensure_fresh_epoch from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.parallel import parallel from frostfs_testlib.testing.test_control import cached_fixture, run_optionally, wait_for_success from frostfs_testlib.utils import env_utils, string_utils, version_utils from frostfs_testlib.utils.file_utils import TestFile, generate_file from ..helpers.container_creation import create_container_with_ape, create_containers_with_ape from ..helpers.container_request import EVERYONE_ALLOW_ALL, ContainerRequest, MultipleContainersRequest from ..resources.common import TEST_CYCLES_COUNT logger = logging.getLogger("NeoLogger") SERVICE_ACTIVE_TIME = 20 WALLTETS_IN_POOL = 2 # Add logs check test even if it's not fit to mark selectors def pytest_configure(config: pytest.Config): markers = config.option.markexpr if markers != "" and "sanity" not in markers: config.option.markexpr = f"logs_after_session or ({markers})" number_key = pytest.StashKey[str]() start_time = pytest.StashKey[int]() test_outcome = pytest.StashKey[str]() # pytest hook. Do not rename def pytest_collection_modifyitems(items: list[pytest.Item]): # Change order of tests based on @pytest.mark.order() marker def order(item: pytest.Item) -> int: order_marker = item.get_closest_marker("order") if order_marker and (len(order_marker.args) != 1 or not isinstance(order_marker.args[0], int)): raise RuntimeError("Incorrect usage of pytest.mark.order") order_value = order_marker.args[0] if order_marker else 0 return order_value items.sort(key=lambda item: order(item)) # pytest hook. Do not rename def pytest_collection_finish(session: pytest.Session): items_total = len(session.items) for number, item in enumerate(session.items, 1): item.stash[number_key] = f"[{number}/{items_total}]" item.stash[test_outcome] = "" item.stash[start_time] = 0 # pytest hook. Do not rename def pytest_runtest_setup(item: pytest.Item): item.stash[start_time] = int(datetime.now().timestamp()) logger.info(f"STARTED {item.stash[number_key]}: {item.name}") # pytest hook. Do not rename def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo): if call.excinfo is not None: if call.excinfo.typename == "Skipped": item.stash[start_time] = int(datetime.now().timestamp()) item.stash[test_outcome] += f"SKIPPED on {call.when}; " else: item.stash[test_outcome] += f"FAILED on {call.when}; " if call.when == "teardown": duration = int(datetime.now().timestamp()) - item.stash[start_time] if not item.stash[test_outcome]: outcome = "PASSED " else: outcome = item.stash[test_outcome] logger.info(f"ENDED {item.stash[number_key]}: {item.name}: {outcome}(duration={duration}s)") # pytest hook. Do not rename def pytest_generate_tests(metafunc: pytest.Metafunc): if ( TEST_CYCLES_COUNT <= 1 or metafunc.definition.get_closest_marker("logs_after_session") or metafunc.definition.get_closest_marker("no_cycles") ): return metafunc.fixturenames.append("cycle") metafunc.parametrize("cycle", range(1, TEST_CYCLES_COUNT + 1), ids=[f"cycle {cycle}" for cycle in range(1, TEST_CYCLES_COUNT + 1)]) @pytest.fixture(scope="session") def client_shell(configure_testlib) -> Shell: yield LocalShell() @pytest.fixture(scope="session") def require_multiple_hosts(hosting: Hosting): """Designates tests that require environment with multiple hosts. These tests will be skipped on an environment that has only 1 host. """ if len(hosting.hosts) <= 1: pytest.skip("Test only works with multiple hosts") yield @pytest.fixture(scope="session") def require_multiple_interfaces(cluster: Cluster): """ We determine that there are the required number of interfaces for tests If there are no required interfaces, the tests will be skipped. """ interfaces = cluster.cluster_nodes[0].host.config.interfaces if "internal1" not in interfaces or "data1" not in interfaces: pytest.skip("This test requires multiple internal and data interfaces") return @pytest.fixture(scope="session") @cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES) def max_object_size(cluster: Cluster, client_shell: Shell) -> int: storage_node = cluster.storage_nodes[0] wallet = WalletInfo.from_node(storage_node) net_info = get_netmap_netinfo(wallet=wallet, endpoint=storage_node.get_rpc_endpoint(), shell=client_shell) return net_info["maximum_object_size"] @pytest.fixture(scope="session") def simple_object_size(max_object_size: int) -> ObjectSize: size = min(int(SIMPLE_OBJECT_SIZE), max_object_size) return ObjectSize("simple", size) @pytest.fixture(scope="session") def complex_object_size(max_object_size: int) -> ObjectSize: size = max_object_size * int(COMPLEX_OBJECT_CHUNKS_COUNT) + int(COMPLEX_OBJECT_TAIL_SIZE) return ObjectSize("complex", size) # By default we want all tests to be executed with both object sizes # This can be overriden in choosen tests if needed @pytest.fixture( scope="session", params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)] ) def object_size(simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest) -> ObjectSize: if request.param == "simple": return simple_object_size return complex_object_size @pytest.fixture() def test_file(object_size: ObjectSize) -> TestFile: return generate_file(object_size.value) @pytest.fixture(scope="module") def test_file_module(object_size: ObjectSize) -> TestFile: return generate_file(object_size.value) # Deprecated. Please migrate all to test_file @pytest.fixture() def file_path(test_file: TestFile) -> TestFile: return test_file @pytest.fixture(scope="session") def rep_placement_policy() -> PlacementPolicy: return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE) @pytest.fixture(scope="session") def ec_placement_policy() -> PlacementPolicy: return PlacementPolicy("ec", DEFAULT_EC_PLACEMENT_RULE) @pytest.fixture(scope="session") @allure.title("Init Frostfs CLI") def frostfs_cli(client_shell: Shell, wallet: WalletInfo) -> FrostfsCli: return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet.config_path) @pytest.fixture(scope="session") @allure.title("Init GrpcClientWrapper with local Frostfs CLI") def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper: return CliClientWrapper(frostfs_cli) # By default we want all tests to be executed with both storage policies. # This can be overriden in choosen tests if needed. @pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)]) def placement_policy( rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest ) -> PlacementPolicy: if request.param == "rep": return rep_placement_policy elif request.param == "ec": return ec_placement_policy return request.param @pytest.fixture(scope="session") def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster: cluster = Cluster(hosting) if cluster.is_local_devenv(): cluster.create_wallet_configs(hosting) ClusterTestBase.shell = client_shell ClusterTestBase.cluster = cluster yield cluster @allure.title("[Session]: Provide S3 policy") @pytest.fixture(scope="session") def s3_policy(request: pytest.FixtureRequest): policy = None if "param" in request.__dict__: policy = request.param return policy @pytest.fixture(scope="session") @allure.title("[Session] Create healthcheck object") def healthcheck(cluster: Cluster) -> Healthcheck: healthcheck_cls = plugins.load_plugin("frostfs.testlib.healthcheck", cluster.cluster_nodes[0].host.config.healthcheck_plugin_name) return healthcheck_cls() @pytest.fixture(scope="session") def cluster_state_controller_session(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController: controller = ClusterStateController(client_shell, cluster, healthcheck) return controller @pytest.fixture def cluster_state_controller(cluster_state_controller_session: ClusterStateController) -> ClusterStateController: yield cluster_state_controller_session cluster_state_controller_session.start_stopped_hosts() cluster_state_controller_session.start_all_stopped_services() @pytest.fixture(scope="session") def credentials_provider(cluster: Cluster) -> CredentialsProvider: return CredentialsProvider(cluster) @allure.title("[Session]: Create S3 client") @pytest.fixture( scope="session", params=[ pytest.param(AwsCliClient, marks=[pytest.mark.aws, pytest.mark.weekly]), pytest.param(Boto3ClientWrapper, marks=[pytest.mark.boto3, pytest.mark.nightly]), ], ) def s3_client( user: User, s3_policy: Optional[str], cluster: Cluster, request: pytest.FixtureRequest, credentials_provider: CredentialsProvider, ) -> S3ClientWrapper: node = cluster.cluster_nodes[0] credentials_provider.S3.provide(user, node, s3_policy) s3_client_cls = request.param client = s3_client_cls(user.s3_credentials.access_key, user.s3_credentials.secret_key, cluster.default_s3_gate_endpoint) return client @allure.title("[Session] Create S3 http client") @pytest.fixture(scope="session") def s3_http_client( default_user: User, s3_policy: Optional[str], cluster: Cluster, credentials_provider: CredentialsProvider ) -> S3HttpClient: node = cluster.cluster_nodes[0] credentials_provider.S3.provide(default_user, node, s3_policy) return S3HttpClient( cluster.default_s3_gate_endpoint, default_user.s3_credentials.access_key, default_user.s3_credentials.secret_key, ) @pytest.fixture def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus: if "param" in request.__dict__: return request.param return VersioningStatus.UNDEFINED @allure.title("[Session] Bulk create buckets for tests") @pytest.fixture(scope="session") def buckets_pool(s3_client: S3ClientWrapper, request: pytest.FixtureRequest): test_buckets: list = [] s3_client_type = type(s3_client).__name__ for test in request.session.items: if s3_client_type not in test.name: continue if "bucket" in test.fixturenames: test_buckets.append(string_utils.unique_name("bucket-")) if "two_buckets" in test.fixturenames: test_buckets.append(string_utils.unique_name("bucket-")) test_buckets.append(string_utils.unique_name("bucket-")) if test_buckets: parallel(s3_client.create_bucket, test_buckets) return test_buckets @allure.title("[Test] Create bucket") @pytest.fixture def bucket(buckets_pool: list[str], s3_client: S3ClientWrapper, versioning_status: VersioningStatus): if buckets_pool: bucket_name = buckets_pool.pop() else: bucket_name = s3_client.create_bucket() if versioning_status: s3_helper.set_bucket_versioning(s3_client, bucket_name, versioning_status) return bucket_name @allure.title("[Test] Create two buckets") @pytest.fixture def two_buckets(buckets_pool: list[str], s3_client: S3ClientWrapper) -> list[str]: buckets: list[str] = [] for _ in range(2): if buckets_pool: buckets.append(buckets_pool.pop()) else: buckets.append(s3_client.create_bucket()) return buckets @allure.title("[Autouse/Session] Collect binary versions") @pytest.fixture(scope="session", autouse=True) @run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED) def collect_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest): environment_dir = request.config.getoption("--alluredir") if not environment_dir: return None local_versions = version_utils.get_local_binaries_versions(client_shell) remote_versions = version_utils.get_remote_binaries_versions(hosting) remote_versions_keys = list(remote_versions.keys()) all_versions = { **local_versions, **{ f"{name}_{remote_versions_keys.index(host) + 1:02d}": version for host, versions in remote_versions.items() for name, version in versions.items() }, } file_path = f"{environment_dir}/environment.properties" env_utils.save_env_properties(file_path, all_versions) @reporter.step("[Autouse/Session] Test session start time") @pytest.fixture(scope="session", autouse=True) def session_start_time(configure_testlib): start_time = datetime.utcnow() return start_time @allure.title("[Autouse/Session] After deploy healthcheck") @pytest.fixture(scope="session", autouse=True) @run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED) def after_deploy_healthcheck(cluster: Cluster): with reporter.step("Wait for cluster readiness after deploy"): parallel(readiness_on_node, cluster.cluster_nodes) @pytest.fixture(scope="session") def rpc_endpoint(cluster: Cluster): return cluster.default_rpc_endpoint @wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness") def readiness_on_node(cluster_node: ClusterNode): if "skip_readiness_check" in cluster_node.host.config.attributes and cluster_node.host.config.attributes["skip_readiness_check"]: return # TODO: Move to healtcheck classes svc_name = cluster_node.service(StorageNode).get_service_systemctl_name() with reporter.step(f"Check service {svc_name} is active"): result = cluster_node.host.get_shell().exec(f"systemctl is-active {svc_name}") assert "active" == result.stdout.strip(), f"Service {svc_name} should be in active state" with reporter.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"): result = cluster_node.host.get_shell().exec(f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2") start_time = parser.parse(result.stdout.strip()) current_time = datetime.now(tz=timezone.utc) active_time = current_time - start_time active_minutes = active_time.seconds // 60 active_seconds = active_time.seconds - active_minutes * 60 assert active_time > timedelta( minutes=SERVICE_ACTIVE_TIME ), f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, current {active_minutes}m:{active_seconds}s" @reporter.step("Prepare default user with wallet") @pytest.fixture(scope="session") @cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES) def default_user(credentials_provider: CredentialsProvider, cluster: Cluster) -> User: user = User(string_utils.unique_name("user-")) node = cluster.cluster_nodes[0] credentials_provider.GRPC.provide(user, node) return user @pytest.fixture(scope="session") @cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES) def users_pool(credentials_provider: CredentialsProvider, cluster: Cluster) -> list[User]: users = [User(string_utils.unique_name("user-")) for _ in range(WALLTETS_IN_POOL)] parallel(credentials_provider.GRPC.provide, users, cluster_node=cluster.cluster_nodes[0]) return users @pytest.fixture(scope="session") def user_tag(request: pytest.FixtureRequest) -> str: tag = "default" if "param" in request.__dict__: tag = request.param return tag @pytest.fixture(scope="session") @cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES) @reporter.step("Create {user_tag} user") def user(user_tag: str) -> User: user = User(string_utils.unique_name("user-")) user.attributes["tag"] = user_tag return user @pytest.fixture(scope="session") def wallet(user: User, credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo: credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0]) return user.wallet # TODO: Migrate tests to fixture wallet above @reporter.step("Get wallet for default user") @pytest.fixture(scope="session") def default_wallet(wallet: WalletInfo) -> WalletInfo: return wallet @pytest.fixture(scope="session") @cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES) def wallets_pool(users_pool: list[User]) -> list[WalletInfo]: return [user.wallet for user in users_pool] @pytest.fixture(scope="session") def other_wallet(wallets_pool: list[WalletInfo]) -> WalletInfo: if not wallets_pool: raise RuntimeError("[other_wallet] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.") return wallets_pool.pop() @pytest.fixture(scope="session") def other_wallet_2(wallets_pool: list[WalletInfo]) -> WalletInfo: if not wallets_pool: raise RuntimeError("[other_wallet2] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.") return wallets_pool.pop() @pytest.fixture() @allure.title("Select random node for testing") def node_under_test(cluster: Cluster) -> ClusterNode: selected_node = random.choice(cluster.cluster_nodes) reporter.attach(f"{selected_node}", "Selected node") return selected_node @allure.title("Init bucket container resolver") @pytest.fixture() def bucket_container_resolver(node_under_test: ClusterNode) -> BucketContainerResolver: resolver_cls = plugins.load_plugin("frostfs.testlib.bucket_cid_resolver", node_under_test.host.config.product) resolver: BucketContainerResolver = resolver_cls() return resolver @pytest.fixture(scope="session", params=[pytest.param(EVERYONE_ALLOW_ALL)]) def container_request(request: pytest.FixtureRequest) -> ContainerRequest: if "param" in request.__dict__: return request.param container_marker = request.node.get_closest_marker("container") # let default container to be public at the moment container_request = EVERYONE_ALLOW_ALL if container_marker: if len(container_marker.args) != 1: raise RuntimeError(f"Something wrong with container marker: {container_marker}") container_request = container_marker.args[0] if not container_request: raise RuntimeError( f"""Container specification is empty. Add @pytest.mark.parametrize("container_request", [ContainerRequest(...)], indirect=True) decorator.""" ) return container_request @pytest.fixture(scope="session") def multiple_containers_request(request: pytest.FixtureRequest) -> ContainerRequest: if "param" in request.__dict__: return request.param raise RuntimeError( f"""Container specification is empty. Add @pytest.mark.parametrize("container_requests", [[ContainerRequest(...), ..., ContainerRequest(...)]], indirect=True) decorator.""" ) @pytest.fixture def container( wallet: WalletInfo, frostfs_cli: FrostfsCli, client_shell: Shell, cluster: Cluster, rpc_endpoint: str, container_request: ContainerRequest, ) -> str: return create_container_with_ape(container_request, frostfs_cli, wallet, client_shell, cluster, rpc_endpoint) @pytest.fixture def containers( wallet: WalletInfo, frostfs_cli: FrostfsCli, client_shell: Shell, cluster: Cluster, rpc_endpoint: str, multiple_containers_request: MultipleContainersRequest, ) -> list[str]: return create_containers_with_ape(frostfs_cli, wallet, client_shell, cluster, rpc_endpoint, multiple_containers_request) @pytest.fixture() def new_epoch(client_shell: Shell, cluster: Cluster) -> int: return ensure_fresh_epoch(client_shell, cluster) @pytest.fixture(scope="module") def new_epoch_module_scope(client_shell: Shell, cluster: Cluster) -> int: return ensure_fresh_epoch(client_shell, cluster)