frostfs-testcases/pytest_tests/testsuites/conftest.py
Andrey Berezin cd06a073a2 [#129] Updates for failover
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
(cherry picked from commit ae57672c7d)
2023-11-14 10:31:35 +03:00

369 lines
13 KiB
Python

import logging
import os
import shutil
from datetime import datetime, timedelta, timezone
from typing import Optional
import allure
import pytest
import yaml
from dateutil import parser
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.reporter import AllureHandler, get_reporter
from frostfs_testlib.resources.common import (
ASSETS_DIR,
COMPLEX_OBJECT_CHUNKS_COUNT,
COMPLEX_OBJECT_TAIL_SIZE,
DEFAULT_WALLET_PASS,
SIMPLE_OBJECT_SIZE,
)
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.steps.cli.container import list_containers
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletFactory, WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import env_utils, version_utils
from pytest_tests.resources.common import HOSTING_CONFIG_FILE, TEST_CYCLES_COUNT
# Module-level logger obtained under the fixed name "NeoLogger".
logger = logging.getLogger("NeoLogger")
# Keep the log-check test selected even when the user's ``-m`` expression
# would not match it.
def pytest_configure(config: pytest.Config):
    """Pytest hook: widen a non-empty marker expression with ``logs_after_session``.

    An empty marker expression is left untouched.
    """
    user_expression = config.option.markexpr
    if user_expression == "":
        return
    config.option.markexpr = f"logs_after_session or ({user_expression})"
# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
    """Reorder collected tests in place.

    Tests without special markers keep their relative order and run first,
    ``node_mgmt`` tests follow, and ``logs_after_session`` tests run last.
    """

    def weight(test_item: pytest.Item) -> int:
        # Larger weight means later execution; the sort is stable.
        score = 0
        if test_item.get_closest_marker("node_mgmt"):
            score += 1
        if test_item.get_closest_marker("logs_after_session"):
            score += 100
        return score

    items.sort(key=weight)
def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Pytest hook: repeat each test ``TEST_CYCLES_COUNT`` times via a ``cycle`` parameter.

    Nothing is parametrized when the cycle count is 1 or less, or when the
    test carries the ``logs_after_session`` or ``no_cycles`` marker.
    """
    if TEST_CYCLES_COUNT <= 1:
        return

    test_definition = metafunc.definition
    if test_definition.get_closest_marker("logs_after_session") or test_definition.get_closest_marker("no_cycles"):
        return

    cycle_numbers = range(1, TEST_CYCLES_COUNT + 1)
    metafunc.fixturenames.append("cycle")
    metafunc.parametrize(
        "cycle",
        cycle_numbers,
        ids=[f"cycle {number}" for number in cycle_numbers],
    )
@pytest.fixture(scope="session")
def configure_testlib():
    """Session fixture: register the Allure reporter handler and set paramiko logging to INFO."""
    reporter = get_reporter()
    reporter.register_handler(AllureHandler())

    paramiko_logger = logging.getLogger("paramiko")
    paramiko_logger.setLevel(logging.INFO)

    yield
@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
    """Session fixture: provide a ``LocalShell`` once the testlib is configured."""
    local_shell = LocalShell()
    yield local_shell
@pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting:
    """Session fixture: build a ``Hosting`` object configured from ``HOSTING_CONFIG_FILE``."""
    # NOTE(review): yaml.full_load is only appropriate for a trusted config file.
    with open(HOSTING_CONFIG_FILE, "r") as config_file:
        parsed_config = yaml.full_load(config_file)

    configured_hosting = Hosting()
    configured_hosting.configure(parsed_config)
    yield configured_hosting
@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
    """Skip the requesting tests unless the environment has more than one host.

    Request this fixture from tests that only make sense on a multi-host
    environment.
    """
    host_count = len(hosting.hosts)
    if host_count < 2:
        pytest.skip("Test only works with multiple hosts")
    yield
@pytest.fixture(scope="session")
def require_multiple_interfaces(cluster: Cluster):
    """Skip the requesting tests unless the required network interfaces exist.

    The first cluster node's host config must list both the ``internal1``
    and ``data1`` interfaces.
    """
    available = cluster.cluster_nodes[0].host.config.interfaces
    required = ("internal1", "data1")
    if not all(name in available for name in required):
        pytest.skip("This test requires multiple internal and data interfaces")
    yield
@pytest.fixture(scope="session")
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
    """Session fixture: read ``maximum_object_size`` from the netmap netinfo.

    The query is sent through the first storage node of the cluster, using
    that node's wallet, wallet config and RPC endpoint.
    """
    first_node = cluster.storage_nodes[0]
    netinfo = get_netmap_netinfo(
        wallet=first_node.get_wallet_path(),
        wallet_config=first_node.get_wallet_config_path(),
        endpoint=first_node.get_rpc_endpoint(),
        shell=client_shell,
    )
    maximum = netinfo["maximum_object_size"]
    yield maximum
@pytest.fixture(scope="session")
def simple_object_size(max_object_size: int) -> ObjectSize:
    """Session fixture: the "simple" object size, capped by the network maximum."""
    configured_size = int(SIMPLE_OBJECT_SIZE)
    effective_size = min(configured_size, max_object_size)
    return ObjectSize("simple", effective_size)
@pytest.fixture(scope="session")
def complex_object_size(max_object_size: int) -> ObjectSize:
    """Session fixture: the "complex" object size.

    Computed as ``COMPLEX_OBJECT_CHUNKS_COUNT`` maximum-size chunks plus a
    tail of ``COMPLEX_OBJECT_TAIL_SIZE``.
    """
    chunk_count = int(COMPLEX_OBJECT_CHUNKS_COUNT)
    tail_size = int(COMPLEX_OBJECT_TAIL_SIZE)
    return ObjectSize("complex", max_object_size * chunk_count + tail_size)
# By default we want all tests to be executed with both object sizes.
# This can be overridden in chosen tests if needed.
@pytest.fixture(scope="session", params=["simple", "complex"])
def object_size(
    simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest
) -> ObjectSize:
    """Parametrized session fixture: yields the simple size for "simple", the complex size otherwise."""
    return simple_object_size if request.param == "simple" else complex_object_size
@pytest.fixture(scope="session")
def wallet_factory(temp_directory: str, client_shell: Shell, cluster: Cluster) -> WalletFactory:
    """Session fixture: a ``WalletFactory`` built from the temp directory, shell and cluster."""
    factory = WalletFactory(temp_directory, client_shell, cluster)
    return factory
@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
    """Session fixture: create the ``Cluster`` and publish it on ``ClusterTestBase``.

    On a local dev environment the wallet configs are created first. The
    shell and the cluster are stored as class attributes of
    ``ClusterTestBase`` before the cluster is yielded.
    """
    cluster_instance = Cluster(hosting)
    if cluster_instance.is_local_devenv():
        cluster_instance.create_wallet_configs(hosting)

    ClusterTestBase.shell = client_shell
    ClusterTestBase.cluster = cluster_instance

    yield cluster_instance
@allure.step("[Class]: Provide S3 policy")
@pytest.fixture(scope="class")
def s3_policy(request: pytest.FixtureRequest):
    """Class fixture: return the parametrized S3 policy, or ``None`` when not parametrized."""
    # "param" is present on the request only when the fixture is parametrized.
    return request.param if "param" in vars(request) else None
@pytest.fixture(scope="session")
@allure.title("[Session] Create healthcheck object")
def healthcheck(cluster: Cluster) -> Healthcheck:
    """Session fixture: instantiate the healthcheck plugin named in the first node's host config."""
    plugin_name = cluster.cluster_nodes[0].host.config.healthcheck_plugin_name
    healthcheck_cls = load_plugin("frostfs.testlib.healthcheck", plugin_name)
    return healthcheck_cls()
@pytest.fixture(scope="session")
def cluster_state_controller(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController:
    """Session fixture: a ``ClusterStateController`` built from the shell, cluster and healthcheck."""
    yield ClusterStateController(client_shell, cluster, healthcheck)
@allure.step("[Class]: Create S3 client")
@pytest.fixture(scope="class")
def s3_client(
    default_wallet: str,
    client_shell: Shell,
    s3_policy: Optional[str],
    cluster: Cluster,
    auth_container_placement_policy: str,
    request: pytest.FixtureRequest,
) -> S3ClientWrapper:
    """Class fixture: issue S3 credentials and yield a client bound to the default S3 gate.

    The client class is taken from ``request.param``, so the fixture has to
    be parametrized. After the credentials are issued, the returned
    container id must appear in the wallet's container list.
    """
    owner_wallet = WalletInfo(path=default_wallet, password=DEFAULT_WALLET_PASS)
    all_s3_gates = [node.s3_gate for node in cluster.cluster_nodes]

    cid, access_key_id, secret_access_key = s3_helper.init_s3_credentials(
        owner_wallet,
        client_shell,
        cluster,
        s3gates=all_s3_gates,
        policy=s3_policy,
        container_placement_policy=auth_container_placement_policy,
    )

    containers_list = list_containers(
        owner_wallet.path,
        shell=client_shell,
        endpoint=cluster.default_rpc_endpoint,
    )
    assert cid in containers_list, f"Expected cid {cid} in {containers_list}"

    client_cls = request.param
    yield client_cls(access_key_id, secret_access_key, cluster.default_s3_gate_endpoint)
@pytest.fixture
def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus:
    """Return the parametrized versioning status, or ``VersioningStatus.UNDEFINED`` by default."""
    # "param" is present on the request only when the fixture is parametrized.
    return request.param if "param" in vars(request) else VersioningStatus.UNDEFINED
@allure.step("Create/delete bucket")
@pytest.fixture
def bucket(s3_client: S3ClientWrapper, versioning_status: VersioningStatus):
    """Create a bucket for the test and delete it, with its objects, afterwards.

    Versioning is applied through ``s3_helper.set_bucket_versioning`` when
    ``versioning_status`` is truthy.
    """
    created_bucket = s3_client.create_bucket()

    if versioning_status:
        s3_helper.set_bucket_versioning(s3_client, created_bucket, versioning_status)

    yield created_bucket

    # Teardown: remove the bucket together with everything stored in it.
    s3_helper.delete_bucket_with_objects(s3_client, created_bucket)
@allure.step("Create two buckets")
@pytest.fixture
def two_buckets(s3_client: S3ClientWrapper):
    """Create two buckets, yield them as a pair, and delete both with their objects afterwards."""
    first = s3_client.create_bucket()
    second = s3_client.create_bucket()
    bucket_pair = (first, second)

    yield bucket_pair

    # Teardown happens in creation order.
    for created_bucket in bucket_pair:
        s3_helper.delete_bucket_with_objects(s3_client, created_bucket)
@allure.step("[Autouse/Session] Check binary versions")
@pytest.fixture(scope="session", autouse=True)
def check_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest):
    """Collect local and remote binary versions and save them as Allure environment properties.

    The versions are always collected. They are written to
    ``environment.properties`` only when ``--alluredir`` is set. Remote
    entries override local ones on key collisions.
    """
    all_versions = dict(version_utils.get_local_binaries_versions(client_shell))
    all_versions.update(version_utils.get_remote_binaries_versions(hosting))

    environment_dir = request.config.getoption("--alluredir")
    if environment_dir:
        env_utils.save_env_properties(f"{environment_dir}/environment.properties", all_versions)
@allure.step("Prepare tmp directory")
@pytest.fixture(scope="session")
def temp_directory():
    """Session fixture: provide a fresh ``ASSETS_DIR`` under the current working directory.

    Any leftover directory is wiped before it is created, and the directory
    is removed again at session teardown.
    """
    with allure.step("Prepare tmp directory"):
        assets_path = os.path.join(os.getcwd(), ASSETS_DIR)
        # Leftovers from an earlier run are discarded; a missing directory is fine.
        shutil.rmtree(assets_path, ignore_errors=True)
        os.mkdir(assets_path)

    yield assets_path

    with allure.step("Remove tmp directory"):
        shutil.rmtree(assets_path)
@allure.step("[Autouse/Session] Test session start time")
@pytest.fixture(scope="session", autouse=True)
def session_start_time():
    """Autouse session fixture: the moment the session started, as a naive UTC ``datetime``."""
    # NOTE(review): utcnow() returns a naive datetime; consumers are not visible here.
    return datetime.utcnow()
@allure.title("[Autouse/Session] After deploy healthcheck")
@pytest.fixture(scope="session", autouse=True)
def after_deploy_healthcheck(cluster: Cluster):
    """Autouse session fixture: run ``readiness_on_node`` for every cluster node in parallel."""
    nodes_to_check = cluster.cluster_nodes
    with allure.step("Wait for cluster readiness after deploy"):
        parallel(readiness_on_node, nodes_to_check)
# Minimum time, in minutes, the storage service must have been active.
SERVICE_ACTIVE_TIME = 20


# NOTE(review): the arguments are presumably (max wait in seconds, retry interval
# in seconds), i.e. three times the required uptime, polled once a minute - confirm
# against wait_for_success.
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60)
@allure.step("Check readiness on node {cluster_node}")
def readiness_on_node(cluster_node: ClusterNode):
    """Assert that the node's storage service is active and has been for long enough.

    Two checks run through the node's host shell:

    1. ``systemctl is-active`` must print ``active``.
    2. The service's ``ActiveEnterTimestamp`` must be more than
       ``SERVICE_ACTIVE_TIME`` minutes in the past.

    Args:
        cluster_node: Node whose ``StorageNode`` service is inspected.

    Raises:
        AssertionError: If either check fails.
    """
    # TODO: Move to healtcheck classes
    svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
    with allure.step(f"Check service {svc_name} is active"):
        result = cluster_node.host.get_shell().exec(f"systemctl is-active {svc_name}")
        assert "active" == result.stdout.strip(), f"Service {svc_name} should be in active state"

    with allure.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"):
        result = cluster_node.host.get_shell().exec(
            f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2"
        )
        start_time = parser.parse(result.stdout.strip())
        current_time = datetime.now(tz=timezone.utc)
        active_time = current_time - start_time

        # timedelta.seconds drops the days component, so a start time ahead of the
        # local clock would be reported as roughly a full day of uptime. Use the
        # signed total so the failure message shows the real value.
        elapsed_seconds = int(active_time.total_seconds())
        sign = "-" if elapsed_seconds < 0 else ""
        active_minutes, active_seconds = divmod(abs(elapsed_seconds), 60)

        assert active_time > timedelta(
            minutes=SERVICE_ACTIVE_TIME
        ), f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, current {sign}{active_minutes}m:{active_seconds}s"
@allure.title("[Autouse/Test] Run health check for all nodes")
@pytest.fixture(autouse=True)
def run_health_check(cluster: Cluster, request: pytest.FixtureRequest):
    """Autouse fixture: health-check every cluster node in parallel before a test.

    Tests marked with ``no_healthcheck`` opt out of the check.
    """
    opted_out = request.node.get_closest_marker("no_healthcheck")
    if not opted_out:
        parallel(healthcheck_on_node, cluster.cluster_nodes)
@allure.title("Run health check for {cluster_node}")
def healthcheck_on_node(cluster_node: ClusterNode):
    """Assert that the node's storage node reports health ``READY`` and network ``ONLINE``."""
    node_state = storage_node_healthcheck(cluster_node.storage_node)
    assert (
        node_state.health_status == "READY" and node_state.network_status == "ONLINE"
    ), f"Node {cluster_node} is not healthy"
@allure.step("Prepare wallet and deposit")
@pytest.fixture(scope="session")
def default_wallet(wallet_factory: WalletFactory) -> str:
    """Session fixture: create a wallet with the default password and return its file path.

    The wallet file is also attached to the Allure report as JSON.
    """
    created_wallet = wallet_factory.create_wallet(password=DEFAULT_WALLET_PASS)
    wallet_path = created_wallet.path
    allure.attach.file(wallet_path, os.path.basename(wallet_path), allure.attachment_type.JSON)
    return wallet_path
@allure.step("[Class]: Container placement policy for keys")
@pytest.fixture(scope="class")
def auth_container_placement_policy(cluster: Cluster, request: pytest.FixtureRequest):
    """Class fixture: return the parametrized placement policy with placeholders filled in.

    ``$ALPHABET_NODE_COUNT$`` becomes 4 for clusters with fewer than 8 nodes
    and 8 otherwise. ``$NODE_COUNT$`` becomes the number of cluster nodes.
    Returns ``None`` when the fixture is not parametrized.
    """
    substitutions = {
        "$ALPHABET_NODE_COUNT$": 4 if len(cluster.cluster_nodes) < 8 else 8,
        "$NODE_COUNT$": len(cluster.cluster_nodes),
    }

    # "param" is present on the request only when the fixture is parametrized.
    if "param" not in vars(request):
        return None

    resolved_policy = request.param
    for placeholder, replacement in substitutions.items():
        resolved_policy = resolved_policy.replace(placeholder, str(replacement))
    return resolved_policy