import json
import logging
import os
import random
import shutil
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

import allure
import pytest
from dateutil import parser
from frostfs_testlib import plugins, reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.credentials.interfaces import CredentialsProvider, User
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, MORPH_BLOCK_TIME, SIMPLE_OBJECT_SIZE
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.steps.cli.container import (
    DEFAULT_EC_PLACEMENT_RULE,
    DEFAULT_PLACEMENT_RULE,
    FROSTFS_CLI_EXEC,
    create_container,
    search_nodes_with_container,
)
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
from frostfs_testlib.steps.epoch import ensure_fresh_epoch
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses import ape
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper
from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
from frostfs_testlib.utils import datetime_utils, env_utils, string_utils, version_utils
from frostfs_testlib.utils.file_utils import TestFile, generate_file
from workspace.frostfs_testcases.pytest_tests.helpers.container_spec import ContainerSpec

from ..resources.common import TEST_CYCLES_COUNT

logger = logging.getLogger("NeoLogger")

# Minimal uptime (in minutes) a storage service must have to be treated as ready.
SERVICE_ACTIVE_TIME = 20
# Number of extra wallets prepared once per session for the `other_wallet*` fixtures.
WALLTETS_IN_POOL = 2


# pytest hook. Do not rename
def pytest_configure(config: pytest.Config):
    """Extend a user-supplied ``-m`` expression so that ``logs_after_session`` tests are selected too.

    Nothing is changed when no marker expression was given or when it mentions ``sanity``.
    """
    selected = config.option.markexpr
    if selected == "" or "sanity" in selected:
        return

    config.option.markexpr = f"logs_after_session or ({selected})"


# Per-item stash slots shared by the reporting hooks in this module.
number_key = pytest.StashKey[str]()
start_time = pytest.StashKey[int]()
test_outcome = pytest.StashKey[str]()


# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
    """Sort collected tests in place by the integer given in ``@pytest.mark.order(...)``.

    Tests without the marker get order 0. The sort is stable, so tests sharing
    an order value keep their collection order.

    Raises:
        RuntimeError: the marker does not carry exactly one ``int`` positional argument.
    """

    def _order_of(test_item: pytest.Item) -> int:
        marker = test_item.get_closest_marker("order")
        if not marker:
            return 0

        args = marker.args
        if len(args) != 1 or not isinstance(args[0], int):
            raise RuntimeError("Incorrect usage of pytest.mark.order")

        return args[0]

    items.sort(key=_order_of)


# pytest hook. Do not rename
def pytest_collection_finish(session: pytest.Session):
    """Initialise the per-item stash: "[n/total]" label, empty outcome and zero start time."""
    total = len(session.items)
    for position, test_item in enumerate(session.items, start=1):
        stash = test_item.stash
        stash[number_key] = f"[{position}/{total}]"
        stash[test_outcome] = ""
        stash[start_time] = 0


# pytest hook. Do not rename
def pytest_runtest_setup(item: pytest.Item):
    """Remember when the test started (whole seconds) and log a STARTED line."""
    started_at = int(datetime.now().timestamp())
    item.stash[start_time] = started_at

    label = item.stash[number_key]
    logger.info(f"STARTED {label}: {item.name}")


# pytest hook.
# Do not rename
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
    """Accumulate the per-phase outcome of a test and log an ENDED line after teardown.

    Each phase that raised appends "SKIPPED on <phase>; " or "FAILED on <phase>; "
    to the item's stashed outcome. A skip also resets the stashed start time.
    When the teardown phase is reported, the duration and the accumulated outcome
    ("PASSED " if nothing was recorded) are logged.
    """
    stash = item.stash
    failure = call.excinfo

    if failure is not None:
        if failure.typename == "Skipped":
            stash[start_time] = int(datetime.now().timestamp())
            stash[test_outcome] += f"SKIPPED on {call.when}; "
        else:
            stash[test_outcome] += f"FAILED on {call.when}; "

    if call.when != "teardown":
        return

    duration = int(datetime.now().timestamp()) - stash[start_time]
    outcome = stash[test_outcome] or "PASSED "
    logger.info(f"ENDED {stash[number_key]}: {item.name}: {outcome}(duration={duration}s)")


# pytest hook. Do not rename
def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Repeat every test ``TEST_CYCLES_COUNT`` times through a synthetic ``cycle`` parameter.

    Skipped entirely when the cycle count is 1 or less, or when the test is marked
    with ``logs_after_session`` or ``no_cycles``.
    """
    if TEST_CYCLES_COUNT <= 1:
        return

    definition = metafunc.definition
    if definition.get_closest_marker("logs_after_session") or definition.get_closest_marker("no_cycles"):
        return

    cycles = range(1, TEST_CYCLES_COUNT + 1)
    metafunc.fixturenames.append("cycle")
    metafunc.parametrize("cycle", cycles, ids=[f"cycle {cycle}" for cycle in cycles])


@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
    """Session-wide local shell; requested after ``configure_testlib``."""
    yield LocalShell()


@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
    """Designates tests that require environment with multiple hosts.

    These tests will be skipped on an environment that has only 1 host.
    """
    hosts_count = len(hosting.hosts)
    if hosts_count <= 1:
        pytest.skip("Test only works with multiple hosts")
    yield


@pytest.fixture(scope="session")
def require_multiple_interfaces(cluster: Cluster):
    """Designates tests that need both the ``internal1`` and ``data1`` interfaces.

    The interfaces are looked up in the host config of the first cluster node;
    the test is skipped when either one is missing.
    """
    interfaces = cluster.cluster_nodes[0].host.config.interfaces
    if not ("internal1" in interfaces and "data1" in interfaces):
        pytest.skip("This test requires multiple internal and data interfaces")
    yield


@pytest.fixture(scope="session")
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
    """Value of ``maximum_object_size`` from the netmap netinfo of the first storage node."""
    node = cluster.storage_nodes[0]
    net_info = get_netmap_netinfo(
        wallet=WalletInfo.from_node(node),
        endpoint=node.get_rpc_endpoint(),
        shell=client_shell,
    )
    yield net_info["maximum_object_size"]


@pytest.fixture(scope="session")
def simple_object_size(max_object_size: int) -> ObjectSize:
    """"simple" object size: ``SIMPLE_OBJECT_SIZE`` capped by the network maximum."""
    return ObjectSize("simple", min(int(SIMPLE_OBJECT_SIZE), max_object_size))


@pytest.fixture(scope="session")
def complex_object_size(max_object_size: int) -> ObjectSize:
    """"complex" object size: ``COMPLEX_OBJECT_CHUNKS_COUNT`` maximum-size chunks plus a tail."""
    chunks = int(COMPLEX_OBJECT_CHUNKS_COUNT)
    tail = int(COMPLEX_OBJECT_TAIL_SIZE)
    return ObjectSize("complex", max_object_size * chunks + tail)


# By default we want all tests to be executed with both object sizes
# This can be overridden in chosen tests if needed
@pytest.fixture(
    scope="session", params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)]
)
def object_size(simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest) -> ObjectSize:
    """Parametrized object size: the simple one for "simple", the complex one otherwise."""
    return simple_object_size if request.param == "simple" else complex_object_size


@pytest.fixture()
def test_file(object_size: ObjectSize) -> TestFile:
    """Generate a file whose size is ``object_size.value``."""
    size = object_size.value
    return generate_file(size)


# Deprecated.
# Please migrate all to test_file
@pytest.fixture()
def file_path(test_file: TestFile) -> TestFile:
    """Deprecated alias of the ``test_file`` fixture; returns the very same object."""
    return test_file


@pytest.fixture(scope="session")
def rep_placement_policy() -> PlacementPolicy:
    """Placement policy named "rep" built from ``DEFAULT_PLACEMENT_RULE``."""
    return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE)


@pytest.fixture(scope="session")
def ec_placement_policy() -> PlacementPolicy:
    """Placement policy named "ec" built from ``DEFAULT_EC_PLACEMENT_RULE``."""
    return PlacementPolicy("ec", DEFAULT_EC_PLACEMENT_RULE)


@pytest.fixture(scope="session")
@allure.title("Init Frostfs CLI")
def frostfs_cli(client_shell: Shell, default_wallet: WalletInfo) -> FrostfsCli:
    """FrostFS CLI wrapper running in the local shell with the default wallet's config."""
    return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, default_wallet.config_path)


@pytest.fixture(scope="session")
@allure.title("Init GrpcClientWrapper with local Frostfs CLI")
def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
    """gRPC client wrapper backed by the session ``frostfs_cli``."""
    return CliClientWrapper(frostfs_cli)


# By default we want all tests to be executed with both storage policies.
# This can be overridden in chosen tests if needed.
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)])
def placement_policy(
    rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest
) -> PlacementPolicy:
    """Parametrized placement policy: the REP policy for "rep", the EC policy otherwise."""
    if request.param == "rep":
        return rep_placement_policy

    return ec_placement_policy


@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
    """Build the session ``Cluster`` and publish it (and the shell) on ``ClusterTestBase``.

    Wallet configs are created only when the cluster reports itself as a local devenv.
    """
    cluster = Cluster(hosting)
    if cluster.is_local_devenv():
        cluster.create_wallet_configs(hosting)

    # Test classes derived from ClusterTestBase read these class attributes.
    ClusterTestBase.shell = client_shell
    ClusterTestBase.cluster = cluster

    yield cluster


@allure.title("[Session]: Provide S3 policy")
@pytest.fixture(scope="session")
def s3_policy(request: pytest.FixtureRequest):
    """Return the indirect parameter of this fixture, or ``None`` when it is not parametrized."""
    policy = None
    # `param` exists on the request only when the fixture was parametrized.
    if "param" in request.__dict__:
        policy = request.param

    return policy


@pytest.fixture(scope="session")
@allure.title("[Session] Create healthcheck object")
def healthcheck(cluster: Cluster) -> Healthcheck:
    """Instantiate the healthcheck plugin named in the first cluster node's host config."""
    plugin_name = cluster.cluster_nodes[0].host.config.healthcheck_plugin_name
    healthcheck_cls = plugins.load_plugin("frostfs.testlib.healthcheck", plugin_name)

    return healthcheck_cls()


@pytest.fixture(scope="session")
def cluster_state_controller_session(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController:
    """Session-wide ``ClusterStateController`` instance."""
    controller = ClusterStateController(client_shell, cluster, healthcheck)
    return controller


@pytest.fixture
def cluster_state_controller(cluster_state_controller_session: ClusterStateController) -> ClusterStateController:
    """Hand out the session controller and restore hosts and services after the test.

    The controller is shared by the whole session, so a failure while starting
    hosts must not prevent the attempt to start stopped services; otherwise every
    following test inherits a degraded cluster.
    """
    yield cluster_state_controller_session
    try:
        cluster_state_controller_session.start_stopped_hosts()
    finally:
        cluster_state_controller_session.start_all_stopped_services()


@pytest.fixture(scope="session")
def credentials_provider(cluster: Cluster) -> CredentialsProvider:
    """Session-wide credentials provider bound to the cluster."""
    return CredentialsProvider(cluster)


@allure.title("[Session]: Create S3 client")
@pytest.fixture(
    scope="session",
    params=[
        pytest.param(AwsCliClient, marks=[pytest.mark.aws, pytest.mark.weekly]),
        pytest.param(Boto3ClientWrapper, marks=[pytest.mark.boto3, pytest.mark.nightly]),
    ],
)
def s3_client(
    default_user: User,
    s3_policy: Optional[str],
    cluster: Cluster,
    request: pytest.FixtureRequest,
    credentials_provider: CredentialsProvider,
) -> S3ClientWrapper:
    """Provide S3 credentials for the default user and build the parametrized S3 client.

    Credentials are issued via the first cluster node; the client class comes from
    ``request.param`` and talks to the cluster's default S3 gate endpoint.
    """
    node = cluster.cluster_nodes[0]
    credentials_provider.S3.provide(default_user, node, s3_policy)

    s3_client_cls = request.param
    client = s3_client_cls(default_user.s3_credentials.access_key, default_user.s3_credentials.secret_key, cluster.default_s3_gate_endpoint)
    return client


@pytest.fixture
def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus:
    """Return the indirect parameter if given, otherwise ``VersioningStatus.UNDEFINED``."""
    if "param" in request.__dict__:
        return request.param

    return VersioningStatus.UNDEFINED


@allure.title("[Session] Bulk create buckets for tests")
@pytest.fixture(scope="session")
def buckets_pool(s3_client: S3ClientWrapper, request: pytest.FixtureRequest):
    """Pre-create, in parallel, the buckets needed by tests of the current S3 client type.

    One name is reserved for each test using the ``bucket`` fixture and two for
    each test using ``two_buckets``. Only tests whose name contains the client
    class name are counted. Returns the list of created bucket names; the
    ``bucket`` / ``two_buckets`` fixtures pop from it.
    """
    test_buckets: list = []

    s3_client_type = type(s3_client).__name__

    for test in request.session.items:
        if s3_client_type not in test.name:
            continue

        if "bucket" in test.fixturenames:
            test_buckets.append(string_utils.unique_name("bucket-"))

        if "two_buckets" in test.fixturenames:
            test_buckets.append(string_utils.unique_name("bucket-"))
            test_buckets.append(string_utils.unique_name("bucket-"))

    if test_buckets:
        parallel(s3_client.create_bucket, test_buckets)

    return test_buckets


@allure.title("[Test] Create bucket")
@pytest.fixture
def bucket(buckets_pool: list[str], s3_client: S3ClientWrapper, versioning_status: VersioningStatus):
    """Take a bucket from the pool (or create one when the pool is empty) and set its versioning."""
    if buckets_pool:
        bucket_name = buckets_pool.pop()
    else:
        bucket_name = s3_client.create_bucket()

    if versioning_status:
        s3_helper.set_bucket_versioning(s3_client, bucket_name, versioning_status)

    return bucket_name


@allure.title("[Test] Create two buckets")
@pytest.fixture
def two_buckets(buckets_pool: list[str], s3_client: S3ClientWrapper) -> list[str]:
    """Return two bucket names, each popped from the pool or freshly created if the pool is empty."""
    buckets: list[str] = []

    for _ in range(2):
        if buckets_pool:
            buckets.append(buckets_pool.pop())
        else:
            buckets.append(s3_client.create_bucket())

    return buckets


@allure.title("[Autouse/Session] Collect binary versions")
@pytest.fixture(scope="session", autouse=True)
@run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED)
def collect_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest):
    """Write local and remote binary versions to ``environment.properties`` in the allure dir.

    Does nothing when ``--alluredir`` is not set. Remote entries are keyed as
    ``<binary name>_<NN>`` where ``NN`` is the 1-based, zero-padded position of the
    host in the mapping returned by ``get_remote_binaries_versions``.
    """
    environment_dir = request.config.getoption("--alluredir")
    if not environment_dir:
        return None

    local_versions = version_utils.get_local_binaries_versions(client_shell)
    remote_versions = version_utils.get_remote_binaries_versions(hosting)

    # enumerate() numbers the hosts directly instead of a list.index() lookup per binary.
    numbered_remote_versions = {
        f"{name}_{host_number:02d}": version
        for host_number, versions in enumerate(remote_versions.values(), 1)
        for name, version in versions.items()
    }
    all_versions = {**local_versions, **numbered_remote_versions}

    properties_path = f"{environment_dir}/environment.properties"
    env_utils.save_env_properties(properties_path, all_versions)


@reporter.step("[Autouse/Session] Test session start time")
@pytest.fixture(scope="session", autouse=True)
def session_start_time(configure_testlib):
    """Return the naive UTC datetime at which the test session started."""
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; kept because consumers
    # of this fixture presumably compare against naive datetimes - confirm before switching
    # to datetime.now(timezone.utc).
    session_started_at = datetime.utcnow()
    return session_started_at


@allure.title("[Autouse/Session] After deploy healthcheck")
@pytest.fixture(scope="session", autouse=True)
@run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED)
def after_deploy_healthcheck(cluster: Cluster):
    """Run ``readiness_on_node`` for all cluster nodes in parallel."""
    with reporter.step("Wait for cluster readiness after deploy"):
        parallel(readiness_on_node, cluster.cluster_nodes)


@pytest.fixture(scope="session")
def rpc_endpoint(cluster: Cluster):
    """Default RPC endpoint of the cluster."""
    return cluster.default_rpc_endpoint


@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
def readiness_on_node(cluster_node: ClusterNode):
    """Assert that the node's storage service is active for more than ``SERVICE_ACTIVE_TIME`` minutes.

    Returns immediately when the host config has a truthy ``skip_readiness_check``
    attribute. The checks are plain asserts and the function is wrapped in
    ``wait_for_success``.
    """
    attributes = cluster_node.host.config.attributes
    if "skip_readiness_check" in attributes and attributes["skip_readiness_check"]:
        return

    # TODO: Move to healthcheck classes
    svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
    with reporter.step(f"Check service {svc_name} is active"):
        result = cluster_node.host.get_shell().exec(f"systemctl is-active {svc_name}")
        assert "active" == result.stdout.strip(), f"Service {svc_name} should be in active state"

    with reporter.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"):
        result = cluster_node.host.get_shell().exec(f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2")
        # Named so that it does not shadow the module-level `start_time` stash key.
        service_started_at = parser.parse(result.stdout.strip())
        current_time = datetime.now(tz=timezone.utc)
        active_time = current_time - service_started_at

        # Only used for the failure message below.
        active_minutes, active_seconds = divmod(active_time.seconds, 60)

        assert active_time > timedelta(
            minutes=SERVICE_ACTIVE_TIME
        ), f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, current {active_minutes}m:{active_seconds}s"


@reporter.step("Prepare default user with wallet")
@pytest.fixture(scope="session")
def default_user(credentials_provider: CredentialsProvider, cluster: Cluster) -> User:
    """Create a uniquely named user and provide gRPC credentials for it via the first cluster node."""
    user = User(string_utils.unique_name("user-"))
    node = cluster.cluster_nodes[0]

    credentials_provider.GRPC.provide(user, node)

    return user


@reporter.step("Get wallet for default user")
@pytest.fixture(scope="session")
def default_wallet(default_user: User) -> WalletInfo:
    """Wallet of the default user."""
    return default_user.wallet


@pytest.fixture(scope="session")
def wallets_pool(credentials_provider: CredentialsProvider, cluster: Cluster) -> list[WalletInfo]:
    """Create ``WALLTETS_IN_POOL`` users with gRPC credentials (in parallel) and return their wallets."""
    users = [User(string_utils.unique_name("user-")) for _ in range(WALLTETS_IN_POOL)]
    parallel(credentials_provider.GRPC.provide, users, cluster_node=cluster.cluster_nodes[0])

    return [user.wallet for user in users]


@pytest.fixture(scope="session")
def other_wallet(wallets_pool: list[WalletInfo]) -> WalletInfo:
    """Pop one wallet from the pool.

    Raises:
        RuntimeError: the pool is already empty.
    """
    if not wallets_pool:
        raise RuntimeError("[other_wallet] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.")

    return wallets_pool.pop()


@pytest.fixture(scope="session")
def other_wallet_2(wallets_pool: list[WalletInfo]) -> WalletInfo:
    """Pop one more wallet from the pool.

    Raises:
        RuntimeError: the pool is already empty.
    """
    if not wallets_pool:
        raise RuntimeError("[other_wallet2] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.")

    return wallets_pool.pop()


@pytest.fixture()
@allure.title("Select random node for testing")
def node_under_test(cluster: Cluster) -> ClusterNode:
    """Pick a random cluster node and attach its string form to the report."""
    selected_node = random.choice(cluster.cluster_nodes)
    reporter.attach(f"{selected_node}", "Selected node")
    return selected_node


@allure.title("Init bucket container resolver")
@pytest.fixture()
def bucket_container_resolver(node_under_test: ClusterNode) -> BucketContainerResolver:
    """Instantiate the bucket-to-container resolver plugin for the product of the node under test."""
    resolver_cls = plugins.load_plugin("frostfs.testlib.bucket_cid_resolver", node_under_test.host.config.product)
    resolver: BucketContainerResolver = resolver_cls()
    return resolver


@pytest.fixture
def container(
    default_wallet: WalletInfo,
    frostfs_cli: FrostfsCli,
    client_shell: Shell,
    cluster: Cluster,
    request: pytest.FixtureRequest,
    rpc_endpoint: str,
) -> str:
    """Create a container according to the test's ``ContainerSpec`` and return its id.

    The spec comes from ``_get_container_spec``. After creation the owner-allow
    APE rule and the spec's own APE rules are applied when the spec asks for them.
    """
    with reporter.step("Get container specification for test"):
        container_spec = _get_container_spec(request)

    with reporter.step("Create container"):
        cid = _create_container_by_spec(default_wallet, client_shell, cluster, rpc_endpoint, container_spec)

    # TODO: deprecate this. Use generic ContainerSpec.ape_rule param
    if container_spec.allow_owner_via_ape:
        with reporter.step("Allow owner via APE on container"):
            _allow_owner_via_ape(frostfs_cli, cluster, cid)

    with reporter.step("Apply APE rules for container"):
        if container_spec.ape_rules:
            _apply_ape_rules(frostfs_cli, cluster, cid, container_spec.ape_rules)

    return cid


def _apply_ape_rules(frostfs_cli: FrostfsCli, cluster: Cluster, container: str, ape_rules: list[ape.Rule]):
    """Add every given APE rule to the container, then sleep for one morph block time."""
    for ape_rule in ape_rules:
        rule_str = ape_rule.as_string()
        with reporter.step(f"Apply APE rule '{rule_str}' for container {container}"):
            frostfs_cli.ape_manager.add(
                cluster.default_rpc_endpoint,
                ape_rule.chain_id,
                target_name=container,
                target_type="container",
                rule=rule_str,
            )

    with reporter.step("Wait for one block"):
        time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))


def _create_container_by_spec(
    default_wallet: WalletInfo, client_shell: Shell, cluster: Cluster, rpc_endpoint: str, container_spec: ContainerSpec
) -> str:
    """Create a container from the spec, attach the list of nodes holding it to the report, return its id."""
    # TODO: add container spec to step message
    with reporter.step("Create container"):
        cid = create_container(
            default_wallet, client_shell, rpc_endpoint, basic_acl=container_spec.basic_acl, rule=container_spec.parsed_rule(cluster)
        )

    with reporter.step("Search nodes holding the container"):
        container_holder_nodes = search_nodes_with_container(default_wallet, cid, client_shell, cluster.default_rpc_endpoint, cluster)
        report_data = {node.id: node.host_ip for node in container_holder_nodes}

        reporter.attach(json.dumps(report_data, indent=2), "container_nodes.json")

    return cid


def _get_container_spec(request: pytest.FixtureRequest) -> ContainerSpec:
    """Resolve the ``ContainerSpec`` for the requesting test.

    Precedence, lowest to highest: a public-ACL default, the single argument of
    ``@pytest.mark.container(...)``, the indirect fixture parameter.

    Raises:
        RuntimeError: the marker does not have exactly one positional argument,
            or the resolved spec is falsy.
    """
    container_marker = request.node.get_closest_marker("container")
    # let default container to be public at the moment
    container_spec = ContainerSpec(basic_acl=PUBLIC_ACL)

    if container_marker:
        if len(container_marker.args) != 1:
            raise RuntimeError(f"Something wrong with container marker: {container_marker}")
        container_spec = container_marker.args[0]

    # An indirect parametrization overrides the marker.
    if "param" in request.__dict__:
        container_spec = request.param

    if not container_spec:
        raise RuntimeError(
            f"""Container specification is empty. Either add @pytest.mark.container(ContainerSpec(...)) or @pytest.mark.parametrize(\"container\", [ContainerSpec(...)], indirect=True) decorator"""
        )

    return container_spec


def _allow_owner_via_ape(frostfs_cli: FrostfsCli, cluster: Cluster, container: str):
    """Add an APE rule allowing all object operations to the container owner, then sleep one morph block time."""
    with reporter.step("Create allow APE rule for container owner"):
        role_condition = ape.Condition.by_role(ape.Role.OWNER)
        ape_rule = ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL, role_condition)

        frostfs_cli.ape_manager.add(
            cluster.default_rpc_endpoint,
            ape_rule.chain_id,
            target_name=container,
            target_type="container",
            rule=ape_rule.as_string(),
        )

    with reporter.step("Wait for one block"):
        time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))


@pytest.fixture()
def new_epoch(client_shell: Shell, cluster: Cluster) -> int:
    """Return the result of ``ensure_fresh_epoch`` for the cluster."""
    return ensure_fresh_epoch(client_shell, cluster)