import logging
import os
import random
import shutil
from datetime import datetime, timedelta, timezone
from importlib.metadata import entry_points
from typing import Optional

import allure
import pytest
import yaml
from dateutil import parser
from frostfs_testlib import plugins, reporter
from frostfs_testlib.credentials.interfaces import CredentialsProvider
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.reporter import AllureHandler, StepsLogger
from frostfs_testlib.resources.common import (
    ASSETS_DIR,
    COMPLEX_OBJECT_CHUNKS_COUNT,
    COMPLEX_OBJECT_TAIL_SIZE,
    DEFAULT_WALLET_PASS,
    SIMPLE_OBJECT_SIZE,
)
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletFactory, WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import env_utils, version_utils

from pytest_tests.resources.common import HOSTING_CONFIG_FILE, TEST_CYCLES_COUNT

logger = logging.getLogger("NeoLogger")

SERVICE_ACTIVE_TIME = 20


# Keep the logs check test in the run even when it does not match the provided mark selectors
def pytest_configure(config: pytest.Config):
    markers = config.option.markexpr
    if markers != "":
        config.option.markexpr = f"logs_after_session or ({markers})"


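# Stash keys used by the hooks below to number tests and to track each test's outcome and duration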
number_key = pytest.StashKey[str]()
start_time = pytest.StashKey[int]()
test_outcome = pytest.StashKey[str]()


# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
    # Run node management (@pytest.mark.node_mgmt) and system time tests after regular tests;
    # run the logs check test last
    def priority(item: pytest.Item) -> int:
        is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
        is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
        is_system_time_test = 10 if item.get_closest_marker("time") else 0
        return is_node_mgmt_test + is_logs_check_test + is_system_time_test

    items.sort(key=priority)


# pytest hook. Do not rename
def pytest_collection_finish(session: pytest.Session):
    items_total = len(session.items)
    for number, item in enumerate(session.items, 1):
        item.stash[number_key] = f"[{number}/{items_total}]"
        item.stash[test_outcome] = ""
        item.stash[start_time] = 0


# pytest hook. Do not rename
def pytest_runtest_setup(item: pytest.Item):
    item.stash[start_time] = int(datetime.now().timestamp())
    logger.info(f"STARTED {item.stash[number_key]}: {item.name}")


# pytest hook. Do not rename
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
    if call.excinfo is not None:
        if call.excinfo.typename == "Skipped":
            item.stash[start_time] = int(datetime.now().timestamp())
            item.stash[test_outcome] += f"SKIPPED on {call.when}; "
        else:
            item.stash[test_outcome] += f"FAILED on {call.when}; "

    if call.when == "teardown":
        duration = int(datetime.now().timestamp()) - item.stash[start_time]
        if not item.stash[test_outcome]:
            outcome = "PASSED "
        else:
            outcome = item.stash[test_outcome]
        logger.info(f"ENDED {item.stash[number_key]}: {item.name}: {outcome}(duration={duration}s)")


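# When TEST_CYCLES_COUNT is greater than 1, repeat every test that many times by adding
# a "cycle" parameter; tests marked logs_after_session or no_cycles are never repeated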
# pytest hook. Do not rename
def pytest_generate_tests(metafunc: pytest.Metafunc):
    if (
        TEST_CYCLES_COUNT <= 1
        or metafunc.definition.get_closest_marker("logs_after_session")
        or metafunc.definition.get_closest_marker("no_cycles")
    ):
        return

    metafunc.fixturenames.append("cycle")
    metafunc.parametrize(
        "cycle",
        range(1, TEST_CYCLES_COUNT + 1),
        ids=[f"cycle {cycle}" for cycle in range(1, TEST_CYCLES_COUNT + 1)],
    )


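# Register reporter handlers (Allure, step logging) and load service classes
# from the "frostfs.testlib.services" entry point group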
@pytest.fixture(scope="session")
def configure_testlib():
    reporter.get_reporter().register_handler(AllureHandler())
    reporter.get_reporter().register_handler(StepsLogger())

    logging.getLogger("paramiko").setLevel(logging.INFO)

    # Register Services for cluster
    registry = get_service_registry()
    services = entry_points(group="frostfs.testlib.services")
    for svc in services:
        registry.register_service(svc.name, svc.load())

    yield


@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
    yield LocalShell()


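# Build the Hosting instance from the YAML config referenced by HOSTING_CONFIG_FILE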
@pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting:
    with open(HOSTING_CONFIG_FILE, "r") as file:
        hosting_config = yaml.full_load(file)

    hosting_instance = Hosting()
    hosting_instance.configure(hosting_config)

    yield hosting_instance


@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
    """Designates tests that require an environment with multiple hosts.

    These tests will be skipped on an environment that has only 1 host.
    """
    if len(hosting.hosts) <= 1:
        pytest.skip("Test only works with multiple hosts")
    yield


@pytest.fixture(scope="session")
def require_multiple_interfaces(cluster: Cluster):
    """Designates tests that require nodes with multiple network interfaces.

    These tests will be skipped if the required internal and data interfaces are missing.
    """
    interfaces = cluster.cluster_nodes[0].host.config.interfaces
    if "internal1" not in interfaces or "data1" not in interfaces:
        pytest.skip("This test requires multiple internal and data interfaces")
    yield


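# Maximum object size is taken from the netmap netinfo of the first storage node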
@pytest.fixture(scope="session")
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
    storage_node = cluster.storage_nodes[0]
    net_info = get_netmap_netinfo(
        wallet=storage_node.get_wallet_path(),
        wallet_config=storage_node.get_wallet_config_path(),
        endpoint=storage_node.get_rpc_endpoint(),
        shell=client_shell,
    )
    yield net_info["maximum_object_size"]


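# A "simple" object does not exceed max_object_size, so it should fit into a single (non-split) object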
@pytest.fixture(scope="session")
def simple_object_size(max_object_size: int) -> ObjectSize:
    size = min(int(SIMPLE_OBJECT_SIZE), max_object_size)
    return ObjectSize("simple", size)


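# A "complex" object spans COMPLEX_OBJECT_CHUNKS_COUNT full chunks of max_object_size
# plus a tail of COMPLEX_OBJECT_TAIL_SIZE, so it is split into multiple parts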
@pytest.fixture(scope="session")
def complex_object_size(max_object_size: int) -> ObjectSize:
    size = max_object_size * int(COMPLEX_OBJECT_CHUNKS_COUNT) + int(COMPLEX_OBJECT_TAIL_SIZE)
    return ObjectSize("complex", size)


# By default we want all tests to be executed with both object sizes
# This can be overridden in chosen tests if needed
@pytest.fixture(
    scope="session",
    params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)],
)
def object_size(
    simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest
) -> ObjectSize:
    if request.param == "simple":
        return simple_object_size

    return complex_object_size


@pytest.fixture(scope="session")
def wallet_factory(temp_directory: str, client_shell: Shell, cluster: Cluster) -> WalletFactory:
    return WalletFactory(temp_directory, client_shell, cluster)


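# Build the Cluster from the hosting config and expose it via ClusterTestBase class attributes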
@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
    cluster = Cluster(hosting)
    if cluster.is_local_devenv():
        cluster.create_wallet_configs(hosting)

    ClusterTestBase.shell = client_shell
    ClusterTestBase.cluster = cluster

    yield cluster


@reporter.step("[Class]: Provide S3 policy")
@pytest.fixture(scope="class")
def s3_policy(request: pytest.FixtureRequest):
    policy = None
    if "param" in request.__dict__:
        policy = request.param

    return policy


@pytest.fixture(scope="session")
@allure.title("[Session] Create healthcheck object")
def healthcheck(cluster: Cluster) -> Healthcheck:
    healthcheck_cls = plugins.load_plugin(
        "frostfs.testlib.healthcheck", cluster.cluster_nodes[0].host.config.healthcheck_plugin_name
    )

    return healthcheck_cls()


@pytest.fixture(scope="session")
def cluster_state_controller(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController:
    controller = ClusterStateController(client_shell, cluster, healthcheck)
    yield controller


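# Parametrized over the AWS CLI and boto3 client wrappers; S3 credentials are issued for the
# default wallet through the node's s3_creds plugin, with s3_policy as the location constraint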
@reporter.step("[Class]: Create S3 client")
|
||
@pytest.fixture(
|
||
scope="class",
|
||
params=[
|
||
pytest.param(AwsCliClient, marks=pytest.mark.aws),
|
||
pytest.param(Boto3ClientWrapper, marks=pytest.mark.boto3),
|
||
],
|
||
)
|
||
def s3_client(
|
||
default_wallet: str,
|
||
client_shell: Shell,
|
||
s3_policy: Optional[str],
|
||
cluster: Cluster,
|
||
request: pytest.FixtureRequest,
|
||
) -> S3ClientWrapper:
|
||
wallet = WalletInfo(path=default_wallet, password=DEFAULT_WALLET_PASS)
|
||
node = cluster.cluster_nodes[0]
|
||
|
||
credentials_provider = CredentialsProvider(node.host.config.s3_creds_plugin_name)
|
||
credentials_provider.stash["cluster"] = cluster
|
||
credentials_provider.stash["wallet"] = wallet
|
||
credentials_provider.stash["shell"] = client_shell
|
||
credentials_provider.stash["location_constraints"] = s3_policy
|
||
|
||
access_key_id, secret_access_key = credentials_provider.S3.provide(node)
|
||
|
||
s3_client_cls = request.param
|
||
client = s3_client_cls(access_key_id, secret_access_key, cluster.default_s3_gate_endpoint)
|
||
return client
|
||
|
||
|
||
@pytest.fixture
|
||
def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus:
|
||
if "param" in request.__dict__:
|
||
return request.param
|
||
|
||
return VersioningStatus.UNDEFINED
|
||
|
||
|
||
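# Create a bucket for the test, optionally enable versioning, and remove the bucket with
# its objects on teardown (teardown is skipped for the sanity suite)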
@reporter.step("Create/delete bucket")
|
||
@pytest.fixture
|
||
def bucket(s3_client: S3ClientWrapper, versioning_status: VersioningStatus, request: pytest.FixtureRequest):
|
||
|
||
bucket_name = s3_client.create_bucket()
|
||
|
||
if versioning_status:
|
||
s3_helper.set_bucket_versioning(s3_client, bucket_name, versioning_status)
|
||
|
||
yield bucket_name
|
||
|
||
if "sanity" not in request.config.option.markexpr:
|
||
s3_helper.delete_bucket_with_objects(s3_client, bucket_name)
|
||
|
||
|
||
@reporter.step("Create two buckets")
|
||
@pytest.fixture
|
||
def two_buckets(s3_client: S3ClientWrapper, request: pytest.FixtureRequest):
|
||
bucket_1 = s3_client.create_bucket()
|
||
bucket_2 = s3_client.create_bucket()
|
||
yield bucket_1, bucket_2
|
||
|
||
if "sanity" not in request.config.option.markexpr:
|
||
for bucket_name in [bucket_1, bucket_2]:
|
||
s3_helper.delete_bucket_with_objects(s3_client, bucket_name)
|
||
|
||
|
||
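# Collect local and remote binary versions and save them to Allure's environment.properties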
@reporter.step("[Autouse/Session] Check binary versions")
|
||
@pytest.fixture(scope="session", autouse=True)
|
||
def check_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest):
|
||
local_versions = version_utils.get_local_binaries_versions(client_shell)
|
||
remote_versions, exсeptions_remote_binaries_versions = version_utils.get_remote_binaries_versions(hosting)
|
||
|
||
all_versions = {
|
||
**local_versions,
|
||
**{binary_name: binary["version"] for binary_name, binary in remote_versions.items()},
|
||
}
|
||
|
||
environment_dir = request.config.getoption("--alluredir")
|
||
if not environment_dir:
|
||
return None
|
||
|
||
file_path = f"{environment_dir}/environment.properties"
|
||
env_utils.save_env_properties(file_path, all_versions)
|
||
|
||
|
||
@reporter.step("Prepare tmp directory")
|
||
@pytest.fixture(scope="session")
|
||
def temp_directory(configure_testlib):
|
||
with reporter.step("Prepare tmp directory"):
|
||
full_path = os.path.join(os.getcwd(), ASSETS_DIR)
|
||
shutil.rmtree(full_path, ignore_errors=True)
|
||
os.mkdir(full_path)
|
||
|
||
yield full_path
|
||
|
||
with reporter.step("Remove tmp directory"):
|
||
shutil.rmtree(full_path)
|
||
|
||
|
||
@reporter.step("[Autouse/Session] Test session start time")
|
||
@pytest.fixture(scope="session", autouse=True)
|
||
def session_start_time(configure_testlib):
|
||
start_time = datetime.utcnow()
|
||
return start_time
|
||
|
||
|
||
@allure.title("[Autouse/Session] After deploy healthcheck")
|
||
@pytest.fixture(scope="session", autouse=True)
|
||
def after_deploy_healthcheck(cluster: Cluster):
|
||
with reporter.step("Wait for cluster readiness after deploy"):
|
||
parallel(readiness_on_node, cluster.cluster_nodes)
|
||
|
||
|
||
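# Retried by the wait_for_success decorator: a node is considered ready once its storage
# service is active and has stayed active for more than SERVICE_ACTIVE_TIME minutes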
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
def readiness_on_node(cluster_node: ClusterNode):
    if (
        "skip_readiness_check" in cluster_node.host.config.attributes
        and cluster_node.host.config.attributes["skip_readiness_check"]
    ):
        return

    # TODO: Move to healthcheck classes
    svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
    with reporter.step(f"Check service {svc_name} is active"):
        result = cluster_node.host.get_shell().exec(f"systemctl is-active {svc_name}")
        assert "active" == result.stdout.strip(), f"Service {svc_name} should be in active state"

    with reporter.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"):
        result = cluster_node.host.get_shell().exec(
            f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2"
        )
        start_time = parser.parse(result.stdout.strip())
        current_time = datetime.now(tz=timezone.utc)
        active_time = current_time - start_time

        active_minutes, active_seconds = divmod(int(active_time.total_seconds()), 60)

        assert active_time > timedelta(
            minutes=SERVICE_ACTIVE_TIME
        ), f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, current {active_minutes}m:{active_seconds}s"


@allure.title("[Autouse/Test] Run health check for all nodes")
|
||
@pytest.fixture(autouse=True)
|
||
def run_health_check(healthcheck: Healthcheck, cluster: Cluster, request: pytest.FixtureRequest):
|
||
if request.node.get_closest_marker("no_healthcheck"):
|
||
# Skip healthcheck for tests marked with no_healthcheck
|
||
return
|
||
parallel(healthcheck.storage_healthcheck, cluster.cluster_nodes)
|
||
|
||
|
||
@reporter.step("Prepare wallet and deposit")
|
||
@pytest.fixture(scope="session")
|
||
def default_wallet(wallet_factory: WalletFactory) -> str:
|
||
wallet = wallet_factory.create_wallet(password=DEFAULT_WALLET_PASS)
|
||
reporter.attach(wallet.path, os.path.basename(wallet.path))
|
||
return wallet.path
|
||
|
||
|
||
@pytest.fixture()
|
||
@allure.title("Select random node for testing")
|
||
def node_under_test(cluster: Cluster) -> ClusterNode:
|
||
selected_node = random.choice(cluster.cluster_nodes)
|
||
reporter.attach(f"{selected_node}", "Selected node")
|
||
return selected_node
|