frostfs-testcases/pytest_tests/testsuites/conftest.py

602 lines
21 KiB
Python
Raw Normal View History

import logging
import random
from datetime import datetime, timedelta, timezone
from typing import Optional
import allure
import pytest
from dateutil import parser
from frostfs_testlib import plugins, reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.clients import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, S3HttpClient
from frostfs_testlib.clients.s3 import BucketContainerResolver, VersioningStatus
from frostfs_testlib.credentials.interfaces import CredentialsProvider, User
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, SIMPLE_OBJECT_SIZE
from frostfs_testlib.shell import LocalShell, Shell
from frostfs_testlib.steps import s3_helper
from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE, FROSTFS_CLI_EXEC
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
from frostfs_testlib.steps.epoch import ensure_fresh_epoch
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper
from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import cached_fixture, run_optionally, wait_for_success
from frostfs_testlib.utils import env_utils, string_utils, version_utils
from frostfs_testlib.utils.file_utils import TestFile, generate_file
from ..helpers.container_creation import create_container_with_ape, create_containers_with_ape
from ..helpers.container_request import EVERYONE_ALLOW_ALL, ContainerRequest, MultipleContainersRequest
from ..resources.common import TEST_CYCLES_COUNT
logger = logging.getLogger("NeoLogger")
SERVICE_ACTIVE_TIME = 20
WALLTETS_IN_POOL = 2
# Add logs check test even if it's not fit to mark selectors
def pytest_configure(config: pytest.Config):
markers = config.option.markexpr
if markers != "" and "sanity" not in markers:
config.option.markexpr = f"logs_after_session or ({markers})"
number_key = pytest.StashKey[str]()
start_time = pytest.StashKey[int]()
test_outcome = pytest.StashKey[str]()
# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
# Change order of tests based on @pytest.mark.order(<int>) marker
def order(item: pytest.Item) -> int:
order_marker = item.get_closest_marker("order")
if order_marker and (len(order_marker.args) != 1 or not isinstance(order_marker.args[0], int)):
raise RuntimeError("Incorrect usage of pytest.mark.order")
order_value = order_marker.args[0] if order_marker else 0
return order_value
items.sort(key=lambda item: order(item))
# pytest hook. Do not rename
def pytest_collection_finish(session: pytest.Session):
items_total = len(session.items)
for number, item in enumerate(session.items, 1):
item.stash[number_key] = f"[{number}/{items_total}]"
item.stash[test_outcome] = ""
item.stash[start_time] = 0
# pytest hook. Do not rename
def pytest_runtest_setup(item: pytest.Item):
item.stash[start_time] = int(datetime.now().timestamp())
logger.info(f"STARTED {item.stash[number_key]}: {item.name}")
# pytest hook. Do not rename
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
if call.excinfo is not None:
if call.excinfo.typename == "Skipped":
item.stash[start_time] = int(datetime.now().timestamp())
item.stash[test_outcome] += f"SKIPPED on {call.when}; "
else:
item.stash[test_outcome] += f"FAILED on {call.when}; "
if call.when == "teardown":
duration = int(datetime.now().timestamp()) - item.stash[start_time]
if not item.stash[test_outcome]:
outcome = "PASSED "
else:
outcome = item.stash[test_outcome]
logger.info(f"ENDED {item.stash[number_key]}: {item.name}: {outcome}(duration={duration}s)")
# pytest hook. Do not rename
def pytest_generate_tests(metafunc: pytest.Metafunc):
if (
TEST_CYCLES_COUNT <= 1
or metafunc.definition.get_closest_marker("logs_after_session")
or metafunc.definition.get_closest_marker("no_cycles")
):
return
metafunc.fixturenames.append("cycle")
metafunc.parametrize("cycle", range(1, TEST_CYCLES_COUNT + 1), ids=[f"cycle {cycle}" for cycle in range(1, TEST_CYCLES_COUNT + 1)])
@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
yield LocalShell()
@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
"""Designates tests that require environment with multiple hosts.
These tests will be skipped on an environment that has only 1 host.
"""
if len(hosting.hosts) <= 1:
pytest.skip("Test only works with multiple hosts")
yield
@pytest.fixture(scope="session")
def require_multiple_interfaces(cluster: Cluster):
"""
We determine that there are the required number of interfaces for tests
If there are no required interfaces, the tests will be skipped.
"""
interfaces = cluster.cluster_nodes[0].host.config.interfaces
if "internal1" not in interfaces or "data1" not in interfaces:
pytest.skip("This test requires multiple internal and data interfaces")
return
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
storage_node = cluster.storage_nodes[0]
wallet = WalletInfo.from_node(storage_node)
net_info = get_netmap_netinfo(wallet=wallet, endpoint=storage_node.get_rpc_endpoint(), shell=client_shell)
return net_info["maximum_object_size"]
@pytest.fixture(scope="session")
2023-09-08 10:35:34 +00:00
def simple_object_size(max_object_size: int) -> ObjectSize:
size = min(int(SIMPLE_OBJECT_SIZE), max_object_size)
return ObjectSize("simple", size)
@pytest.fixture(scope="session")
2023-09-08 10:35:34 +00:00
def complex_object_size(max_object_size: int) -> ObjectSize:
size = max_object_size * int(COMPLEX_OBJECT_CHUNKS_COUNT) + int(COMPLEX_OBJECT_TAIL_SIZE)
return ObjectSize("complex", size)
2023-09-08 10:35:34 +00:00
# By default we want all tests to be executed with both object sizes
# This can be overriden in choosen tests if needed
@pytest.fixture(
scope="session", params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)]
)
def object_size(simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest) -> ObjectSize:
2023-09-08 10:35:34 +00:00
if request.param == "simple":
return simple_object_size
return complex_object_size
2024-05-23 07:33:20 +00:00
@pytest.fixture()
def test_file(object_size: ObjectSize) -> TestFile:
return generate_file(object_size.value)
@pytest.fixture(scope="module")
def test_file_module(object_size: ObjectSize) -> TestFile:
return generate_file(object_size.value)
# Deprecated. Please migrate all to test_file
@pytest.fixture()
def file_path(test_file: TestFile) -> TestFile:
return test_file
@pytest.fixture(scope="session")
def rep_placement_policy() -> PlacementPolicy:
return PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE)
@pytest.fixture(scope="session")
def ec_placement_policy() -> PlacementPolicy:
return PlacementPolicy("ec", DEFAULT_EC_PLACEMENT_RULE)
@pytest.fixture(scope="session")
@allure.title("Init Frostfs CLI")
def frostfs_cli(client_shell: Shell, wallet: WalletInfo) -> FrostfsCli:
return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet.config_path)
@pytest.fixture(scope="session")
@allure.title("Init GrpcClientWrapper with local Frostfs CLI")
def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
return CliClientWrapper(frostfs_cli)
# By default we want all tests to be executed with both storage policies.
# This can be overriden in choosen tests if needed.
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)])
def placement_policy(
rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest
) -> PlacementPolicy:
if request.param == "rep":
return rep_placement_policy
elif request.param == "ec":
return ec_placement_policy
return request.param
2023-09-08 10:35:34 +00:00
2024-05-23 07:33:20 +00:00
@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
cluster = Cluster(hosting)
if cluster.is_local_devenv():
cluster.create_wallet_configs(hosting)
ClusterTestBase.shell = client_shell
ClusterTestBase.cluster = cluster
yield cluster
@allure.title("[Session]: Provide S3 policy")
@pytest.fixture(scope="session")
def s3_policy(request: pytest.FixtureRequest):
policy = None
if "param" in request.__dict__:
policy = request.param
return policy
@pytest.fixture(scope="session")
@allure.title("[Session] Create healthcheck object")
def healthcheck(cluster: Cluster) -> Healthcheck:
healthcheck_cls = plugins.load_plugin("frostfs.testlib.healthcheck", cluster.cluster_nodes[0].host.config.healthcheck_plugin_name)
return healthcheck_cls()
@pytest.fixture(scope="session")
def cluster_state_controller_session(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController:
controller = ClusterStateController(client_shell, cluster, healthcheck)
return controller
@pytest.fixture
def cluster_state_controller(cluster_state_controller_session: ClusterStateController) -> ClusterStateController:
yield cluster_state_controller_session
cluster_state_controller_session.start_stopped_hosts()
cluster_state_controller_session.start_all_stopped_services()
@pytest.fixture(scope="session")
def credentials_provider(cluster: Cluster) -> CredentialsProvider:
return CredentialsProvider(cluster)
@allure.title("[Session]: Create S3 client")
2023-10-31 14:51:09 +00:00
@pytest.fixture(
scope="session",
2023-10-31 14:51:09 +00:00
params=[
pytest.param(AwsCliClient, marks=[pytest.mark.aws, pytest.mark.weekly]),
pytest.param(Boto3ClientWrapper, marks=[pytest.mark.boto3, pytest.mark.nightly]),
2023-10-31 14:51:09 +00:00
],
)
def s3_client(
user: User,
s3_policy: Optional[str],
cluster: Cluster,
request: pytest.FixtureRequest,
credentials_provider: CredentialsProvider,
) -> S3ClientWrapper:
node = cluster.cluster_nodes[0]
credentials_provider.S3.provide(user, node, s3_policy)
s3_client_cls = request.param
client = s3_client_cls(user.s3_credentials.access_key, user.s3_credentials.secret_key, cluster.default_s3_gate_endpoint)
return client
@allure.title("[Session] Create S3 http client")
@pytest.fixture(scope="session")
def s3_http_client(
default_user: User, s3_policy: Optional[str], cluster: Cluster, credentials_provider: CredentialsProvider
) -> S3HttpClient:
node = cluster.cluster_nodes[0]
credentials_provider.S3.provide(default_user, node, s3_policy)
return S3HttpClient(
cluster.default_s3_gate_endpoint,
default_user.s3_credentials.access_key,
default_user.s3_credentials.secret_key,
)
@pytest.fixture
def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus:
if "param" in request.__dict__:
return request.param
return VersioningStatus.UNDEFINED
@allure.title("[Session] Bulk create buckets for tests")
@pytest.fixture(scope="session")
def buckets_pool(s3_client: S3ClientWrapper, request: pytest.FixtureRequest):
test_buckets: list = []
s3_client_type = type(s3_client).__name__
for test in request.session.items:
if s3_client_type not in test.name:
continue
if "bucket" in test.fixturenames:
test_buckets.append(string_utils.unique_name("bucket-"))
if "two_buckets" in test.fixturenames:
test_buckets.append(string_utils.unique_name("bucket-"))
test_buckets.append(string_utils.unique_name("bucket-"))
2023-11-01 16:20:15 +00:00
if test_buckets:
parallel(s3_client.create_bucket, test_buckets)
return test_buckets
@allure.title("[Test] Create bucket")
@pytest.fixture
def bucket(buckets_pool: list[str], s3_client: S3ClientWrapper, versioning_status: VersioningStatus):
if buckets_pool:
bucket_name = buckets_pool.pop()
else:
bucket_name = s3_client.create_bucket()
if versioning_status:
s3_helper.set_bucket_versioning(s3_client, bucket_name, versioning_status)
return bucket_name
@allure.title("[Test] Create two buckets")
@pytest.fixture
def two_buckets(buckets_pool: list[str], s3_client: S3ClientWrapper) -> list[str]:
buckets: list[str] = []
2023-10-31 14:51:09 +00:00
for _ in range(2):
if buckets_pool:
buckets.append(buckets_pool.pop())
else:
buckets.append(s3_client.create_bucket())
return buckets
@allure.title("[Autouse/Session] Collect binary versions")
@pytest.fixture(scope="session", autouse=True)
@run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED)
def collect_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest):
environment_dir = request.config.getoption("--alluredir")
if not environment_dir:
return None
local_versions = version_utils.get_local_binaries_versions(client_shell)
remote_versions = version_utils.get_remote_binaries_versions(hosting)
remote_versions_keys = list(remote_versions.keys())
all_versions = {
**local_versions,
**{
f"{name}_{remote_versions_keys.index(host) + 1:02d}": version
for host, versions in remote_versions.items()
for name, version in versions.items()
},
}
file_path = f"{environment_dir}/environment.properties"
env_utils.save_env_properties(file_path, all_versions)
@reporter.step("[Autouse/Session] Test session start time")
@pytest.fixture(scope="session", autouse=True)
def session_start_time(configure_testlib):
start_time = datetime.utcnow()
return start_time
@allure.title("[Autouse/Session] After deploy healthcheck")
@pytest.fixture(scope="session", autouse=True)
@run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED)
def after_deploy_healthcheck(cluster: Cluster):
with reporter.step("Wait for cluster readiness after deploy"):
parallel(readiness_on_node, cluster.cluster_nodes)
@pytest.fixture(scope="session")
def rpc_endpoint(cluster: Cluster):
return cluster.default_rpc_endpoint
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
def readiness_on_node(cluster_node: ClusterNode):
if "skip_readiness_check" in cluster_node.host.config.attributes and cluster_node.host.config.attributes["skip_readiness_check"]:
return
# TODO: Move to healtcheck classes
svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
with reporter.step(f"Check service {svc_name} is active"):
result = cluster_node.host.get_shell().exec(f"systemctl is-active {svc_name}")
assert "active" == result.stdout.strip(), f"Service {svc_name} should be in active state"
with reporter.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"):
result = cluster_node.host.get_shell().exec(f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2")
start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time
active_minutes = active_time.seconds // 60
active_seconds = active_time.seconds - active_minutes * 60
assert active_time > timedelta(
minutes=SERVICE_ACTIVE_TIME
), f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, current {active_minutes}m:{active_seconds}s"
@reporter.step("Prepare default user with wallet")
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
def default_user(credentials_provider: CredentialsProvider, cluster: Cluster) -> User:
user = User(string_utils.unique_name("user-"))
node = cluster.cluster_nodes[0]
credentials_provider.GRPC.provide(user, node)
return user
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
def users_pool(credentials_provider: CredentialsProvider, cluster: Cluster) -> list[User]:
users = [User(string_utils.unique_name("user-")) for _ in range(WALLTETS_IN_POOL)]
parallel(credentials_provider.GRPC.provide, users, cluster_node=cluster.cluster_nodes[0])
return users
@pytest.fixture(scope="session")
def user_tag(request: pytest.FixtureRequest) -> str:
tag = "default"
if "param" in request.__dict__:
tag = request.param
return tag
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
@reporter.step("Create {user_tag} user")
def user(user_tag: str) -> User:
user = User(string_utils.unique_name("user-"))
user.attributes["tag"] = user_tag
return user
@pytest.fixture(scope="session")
def wallet(user: User, credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo:
credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0])
return user.wallet
# TODO: Migrate tests to fixture wallet above
@reporter.step("Get wallet for default user")
@pytest.fixture(scope="session")
def default_wallet(wallet: WalletInfo) -> WalletInfo:
return wallet
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
def wallets_pool(users_pool: list[User]) -> list[WalletInfo]:
return [user.wallet for user in users_pool]
@pytest.fixture(scope="session")
def other_wallet(wallets_pool: list[WalletInfo]) -> WalletInfo:
if not wallets_pool:
raise RuntimeError("[other_wallet] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.")
return wallets_pool.pop()
@pytest.fixture(scope="session")
def other_wallet_2(wallets_pool: list[WalletInfo]) -> WalletInfo:
if not wallets_pool:
raise RuntimeError("[other_wallet2] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.")
return wallets_pool.pop()
@pytest.fixture()
@allure.title("Select random node for testing")
def node_under_test(cluster: Cluster) -> ClusterNode:
selected_node = random.choice(cluster.cluster_nodes)
reporter.attach(f"{selected_node}", "Selected node")
return selected_node
@allure.title("Init bucket container resolver")
@pytest.fixture()
def bucket_container_resolver(node_under_test: ClusterNode) -> BucketContainerResolver:
resolver_cls = plugins.load_plugin("frostfs.testlib.bucket_cid_resolver", node_under_test.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver
@pytest.fixture(scope="session", params=[pytest.param(EVERYONE_ALLOW_ALL)])
def container_request(request: pytest.FixtureRequest) -> ContainerRequest:
if "param" in request.__dict__:
return request.param
container_marker = request.node.get_closest_marker("container")
# let default container to be public at the moment
container_request = EVERYONE_ALLOW_ALL
if container_marker:
if len(container_marker.args) != 1:
raise RuntimeError(f"Something wrong with container marker: {container_marker}")
container_request = container_marker.args[0]
if not container_request:
raise RuntimeError(
f"""Container specification is empty.
Add @pytest.mark.parametrize("container_request", [ContainerRequest(...)], indirect=True) decorator."""
)
return container_request
@pytest.fixture(scope="session")
def multiple_containers_request(request: pytest.FixtureRequest) -> ContainerRequest:
if "param" in request.__dict__:
return request.param
raise RuntimeError(
f"""Container specification is empty.
Add @pytest.mark.parametrize("container_requests", [[ContainerRequest(...), ..., ContainerRequest(...)]], indirect=True) decorator."""
)
@pytest.fixture
def container(
wallet: WalletInfo,
frostfs_cli: FrostfsCli,
client_shell: Shell,
cluster: Cluster,
rpc_endpoint: str,
container_request: ContainerRequest,
) -> str:
return create_container_with_ape(container_request, frostfs_cli, wallet, client_shell, cluster, rpc_endpoint)
@pytest.fixture
def containers(
wallet: WalletInfo,
frostfs_cli: FrostfsCli,
client_shell: Shell,
cluster: Cluster,
rpc_endpoint: str,
multiple_containers_request: MultipleContainersRequest,
) -> list[str]:
return create_containers_with_ape(frostfs_cli, wallet, client_shell, cluster, rpc_endpoint, multiple_containers_request)
@pytest.fixture()
def new_epoch(client_shell: Shell, cluster: Cluster) -> int:
return ensure_fresh_epoch(client_shell, cluster)
@pytest.fixture(scope="module")
def new_epoch_module_scope(client_shell: Shell, cluster: Cluster) -> int:
return ensure_fresh_epoch(client_shell, cluster)