a.berezin
ffdfff6ba0
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
Signed-off-by: a.berezin <a.berezin@yadro.com>
486 lines
17 KiB
Python
486 lines
17 KiB
Python
import logging
|
|
import os
|
|
import random
|
|
import shutil
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
import allure
|
|
import pytest
|
|
from dateutil import parser
|
|
from frostfs_testlib import plugins, reporter
|
|
from frostfs_testlib.cli import FrostfsCli
|
|
from frostfs_testlib.credentials.interfaces import CredentialsProvider, User
|
|
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
|
from frostfs_testlib.hosting import Hosting
|
|
from frostfs_testlib.resources import optionals
|
|
from frostfs_testlib.resources.common import ASSETS_DIR, COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, SIMPLE_OBJECT_SIZE
|
|
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
|
|
from frostfs_testlib.s3.interfaces import BucketContainerResolver
|
|
from frostfs_testlib.shell import LocalShell, Shell
|
|
from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE, FROSTFS_CLI_EXEC
|
|
from frostfs_testlib.steps.cli.object import get_netmap_netinfo
|
|
from frostfs_testlib.steps.s3 import s3_helper
|
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
|
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
|
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
|
from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.testing.parallel import parallel
|
|
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
|
from frostfs_testlib.utils import env_utils, string_utils, version_utils
|
|
from frostfs_testlib.utils.file_utils import TestFile, generate_file
|
|
|
|
from pytest_tests.resources.common import TEST_CYCLES_COUNT
|
|
|
|
# Logger shared by the session hooks and fixtures in this module.
logger = logging.getLogger("NeoLogger")

# Minutes the storage service must have been active for readiness_on_node to pass;
# the same value also scales that check's wait_for_success arguments.
SERVICE_ACTIVE_TIME = 20
# How many extra users the wallets_pool fixture creates.
# NOTE: the name is misspelled ("WALLTETS") but is kept because other code refers to it.
WALLTETS_IN_POOL = 2
|
|
|
|
# Add the logs check test even if it does not match the selected markers
def pytest_configure(config: pytest.Config):
    """Widen the marker expression so ``logs_after_session`` tests are always selected.

    The expression is left untouched when it is empty or when it mentions
    ``sanity``.
    """
    selected = config.option.markexpr
    if selected == "" or "sanity" in selected:
        return

    config.option.markexpr = f"logs_after_session or ({selected})"
|
|
|
|
|
|
# Per-item stash slots used by the collection/reporting hooks in this module:
#   number_key   - "[<index>/<total>]" label of the test within the session
#   start_time   - integer timestamp (seconds) taken when the test started
#   test_outcome - accumulated "SKIPPED on ...; " / "FAILED on ...; " text, empty when nothing failed
number_key = pytest.StashKey[str]()
start_time = pytest.StashKey[int]()
test_outcome = pytest.StashKey[str]()
|
|
# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
    """Reorder the collected tests in place by their ``@pytest.mark.order(<int>)`` value.

    Tests without the marker are treated as order 0. ``list.sort`` is stable,
    so tests sharing the same order keep their collection order.

    Raises:
        RuntimeError: if an ``order`` marker does not carry exactly one ``int`` argument.
    """

    def order(item: pytest.Item) -> int:
        marker = item.get_closest_marker("order")
        if not marker:
            return 0

        args = marker.args
        if len(args) != 1 or not isinstance(args[0], int):
            raise RuntimeError("Incorrect usage of pytest.mark.order")

        return args[0]

    items.sort(key=order)
|
|
|
|
|
|
# pytest hook. Do not rename
def pytest_collection_finish(session: pytest.Session):
    """Initialise the per-test stash entries once collection is complete.

    Every item gets its ``[index/total]`` label (1-based), an empty outcome
    string and a zero start time.
    """
    total = len(session.items)
    for position, test_item in enumerate(session.items, start=1):
        stash = test_item.stash
        stash[number_key] = f"[{position}/{total}]"
        stash[test_outcome] = ""
        stash[start_time] = 0
|
|
|
|
|
|
# pytest hook. Do not rename
def pytest_runtest_setup(item: pytest.Item):
    """Record the test start time (whole seconds) and log a STARTED line."""
    started_at = datetime.now().timestamp()
    item.stash[start_time] = int(started_at)
    logger.info(f"STARTED {item.stash[number_key]}: {item.name}")
|
|
|
|
|
|
# pytest hook. Do not rename
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
    """Accumulate the per-phase outcome of a test and log an ENDED line after teardown.

    A phase that raised appends ``SKIPPED on <phase>; `` or ``FAILED on <phase>; ``
    to the item's outcome text. When the teardown phase is reported, the
    accumulated text (or ``PASSED `` if it is empty) is logged together with
    the duration in whole seconds.
    """
    stash = item.stash
    phase = call.when
    error = call.excinfo

    if error is not None:
        if error.typename == "Skipped":
            # A skip resets the start time, so the logged duration is measured from the skip.
            stash[start_time] = int(datetime.now().timestamp())
            stash[test_outcome] += f"SKIPPED on {phase}; "
        else:
            stash[test_outcome] += f"FAILED on {phase}; "

    if phase != "teardown":
        return

    duration = int(datetime.now().timestamp()) - stash[start_time]
    outcome = stash[test_outcome] or "PASSED "
    logger.info(f"ENDED {stash[number_key]}: {item.name}: {outcome}(duration={duration}s)")
|
|
|
|
|
|
# pytest hook. Do not rename
def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Repeat each test ``TEST_CYCLES_COUNT`` times through a ``cycle`` parameter.

    Nothing is parametrized when ``TEST_CYCLES_COUNT`` is 1 or less, or when the
    test is marked ``logs_after_session`` or ``no_cycles``.
    """
    if TEST_CYCLES_COUNT <= 1:
        return

    definition = metafunc.definition
    if definition.get_closest_marker("logs_after_session") or definition.get_closest_marker("no_cycles"):
        return

    cycles = range(1, TEST_CYCLES_COUNT + 1)
    metafunc.fixturenames.append("cycle")
    metafunc.parametrize("cycle", cycles, ids=[f"cycle {cycle}" for cycle in cycles])
|
|
|
|
|
|
@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
    """Provide a session-wide ``LocalShell``.

    ``configure_testlib`` is requested only as a fixture dependency; its value
    is not used here.
    """
    shell = LocalShell()
    yield shell
|
|
|
|
|
|
@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
    """Designates tests that require an environment with multiple hosts.

    Tests requesting this fixture are skipped when the environment has fewer
    than two hosts.
    """
    host_count = len(hosting.hosts)
    if host_count < 2:
        pytest.skip("Test only works with multiple hosts")
    yield
|
|
|
|
|
|
@pytest.fixture(scope="session")
def require_multiple_interfaces(cluster: Cluster):
    """Designates tests that require both the ``internal1`` and ``data1`` interfaces.

    Only the host config of the first cluster node is inspected. If either
    interface is missing there, the requesting tests are skipped.
    """
    interfaces = cluster.cluster_nodes[0].host.config.interfaces
    has_required = "internal1" in interfaces and "data1" in interfaces
    if not has_required:
        pytest.skip("This test requires multiple internal and data interfaces")
    yield
|
|
|
|
|
|
@pytest.fixture(scope="session")
def max_object_size(cluster: Cluster, client_shell: Shell) -> int:
    """Yield the ``maximum_object_size`` value from the netmap netinfo.

    The value is queried through the first storage node, using that node's own
    wallet and RPC endpoint.
    """
    node = cluster.storage_nodes[0]
    node_wallet = WalletInfo.from_node(node)
    net_info = get_netmap_netinfo(wallet=node_wallet, endpoint=node.get_rpc_endpoint(), shell=client_shell)
    yield net_info["maximum_object_size"]
|
|
|
|
|
|
@pytest.fixture(scope="session")
def simple_object_size(max_object_size: int) -> ObjectSize:
    """Return the "simple" object size: ``SIMPLE_OBJECT_SIZE`` capped at ``max_object_size``."""
    configured_size = int(SIMPLE_OBJECT_SIZE)
    return ObjectSize("simple", min(configured_size, max_object_size))
|
|
|
|
|
|
@pytest.fixture()
def file_path(object_size: ObjectSize) -> TestFile:
    """Generate a file of ``object_size.value`` for the requesting test."""
    size = object_size.value
    return generate_file(size)
|
|
|
|
|
|
@pytest.fixture(scope="session")
def complex_object_size(max_object_size: int) -> ObjectSize:
    """Return the "complex" object size.

    The size is ``COMPLEX_OBJECT_CHUNKS_COUNT`` times ``max_object_size`` plus
    ``COMPLEX_OBJECT_TAIL_SIZE``.
    """
    chunks_size = max_object_size * int(COMPLEX_OBJECT_CHUNKS_COUNT)
    tail_size = int(COMPLEX_OBJECT_TAIL_SIZE)
    return ObjectSize("complex", chunks_size + tail_size)
|
|
|
|
|
|
# By default we want all tests to be executed with both object sizes.
# This can be overridden in chosen tests if needed.
@pytest.fixture(
    scope="session",
    params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)],
)
def object_size(simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest) -> ObjectSize:
    """Return the simple or the complex object size, depending on the fixture parameter.

    Any parameter other than ``"simple"`` selects the complex size.
    """
    return simple_object_size if request.param == "simple" else complex_object_size
|
|
|
|
|
|
@pytest.fixture(scope="session")
def rep_placement_policy() -> PlacementPolicy:
    """Return the ``"rep"`` placement policy built from ``DEFAULT_PLACEMENT_RULE``."""
    policy = PlacementPolicy("rep", DEFAULT_PLACEMENT_RULE)
    return policy
|
|
|
|
|
|
@pytest.fixture(scope="session")
def ec_placement_policy() -> PlacementPolicy:
    """Return the ``"ec"`` placement policy built from ``DEFAULT_EC_PLACEMENT_RULE``."""
    policy = PlacementPolicy("ec", DEFAULT_EC_PLACEMENT_RULE)
    return policy
|
|
|
|
|
|
@pytest.fixture(scope="session")
@allure.title("Init Frostfs CLI")
def frostfs_cli(client_shell: Shell, default_wallet: WalletInfo) -> FrostfsCli:
    """Build a ``FrostfsCli`` that runs in the client shell with the default wallet's config."""
    wallet_config = default_wallet.config_path
    return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet_config)
|
|
|
|
|
|
# By default we want all tests to be executed with both storage policies.
# This can be overridden in chosen tests if needed.
@pytest.fixture(
    scope="session",
    params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)],
)
def placement_policy(
    rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest
) -> PlacementPolicy:
    """Return the REP or the EC placement policy, depending on the fixture parameter.

    Any parameter other than ``"rep"`` selects the EC policy.
    """
    return rep_placement_policy if request.param == "rep" else ec_placement_policy
|
|
|
|
|
|
@pytest.fixture(scope="session")
def cluster(temp_directory: str, hosting: Hosting, client_shell: Shell) -> Cluster:
    """Build the session ``Cluster`` and publish it, with the shell, on ``ClusterTestBase``.

    On a local dev environment the wallet configs are created first.
    ``temp_directory`` is requested only as a fixture dependency; its value is
    not used here.
    """
    session_cluster = Cluster(hosting)
    if session_cluster.is_local_devenv():
        session_cluster.create_wallet_configs(hosting)

    # These class attributes are assigned so that ClusterTestBase exposes the shell and cluster.
    ClusterTestBase.shell = client_shell
    ClusterTestBase.cluster = session_cluster

    yield session_cluster
|
|
|
|
|
|
@allure.title("[Session]: Provide S3 policy")
@pytest.fixture(scope="session")
def s3_policy(request: pytest.FixtureRequest):
    """Return the parameter this fixture was parametrized with, or ``None`` if there is none."""
    # The instance dict is checked because the request only carries ``param`` when parametrized.
    return request.param if "param" in request.__dict__ else None
|
|
|
|
|
|
@pytest.fixture(scope="session")
@allure.title("[Session] Create healthcheck object")
def healthcheck(cluster: Cluster) -> Healthcheck:
    """Instantiate the healthcheck plugin named in the first cluster node's host config."""
    plugin_name = cluster.cluster_nodes[0].host.config.healthcheck_plugin_name
    healthcheck_cls = plugins.load_plugin("frostfs.testlib.healthcheck", plugin_name)
    return healthcheck_cls()
|
|
|
|
|
|
@pytest.fixture(scope="session")
def cluster_state_controller_session(client_shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> ClusterStateController:
    """Create the single ``ClusterStateController`` shared by the whole session."""
    return ClusterStateController(client_shell, cluster, healthcheck)
|
|
|
|
|
|
@pytest.fixture
def cluster_state_controller(cluster_state_controller_session: ClusterStateController) -> ClusterStateController:
    """Hand the session controller to a test and restore the cluster afterwards.

    After the test, stopped hosts are started first and then all stopped
    services.
    """
    controller = cluster_state_controller_session
    yield controller

    # Teardown: bring back whatever the test stopped.
    controller.start_stopped_hosts()
    controller.start_all_stopped_services()
|
|
|
|
|
|
@pytest.fixture(scope="session")
def credentials_provider(cluster: Cluster) -> CredentialsProvider:
    """Create the session-wide ``CredentialsProvider`` for the cluster."""
    provider = CredentialsProvider(cluster)
    return provider
|
|
|
|
|
|
@allure.title("[Session]: Create S3 client")
@pytest.fixture(
    scope="session",
    params=[
        pytest.param(AwsCliClient, marks=[pytest.mark.aws, pytest.mark.weekly]),
        pytest.param(Boto3ClientWrapper, marks=[pytest.mark.boto3, pytest.mark.nightly]),
    ],
)
def s3_client(
    default_user: User,
    s3_policy: Optional[str],
    cluster: Cluster,
    request: pytest.FixtureRequest,
    credentials_provider: CredentialsProvider,
) -> S3ClientWrapper:
    """Provide S3 credentials for the default user and build the parametrized S3 client.

    Credentials are provided through the first cluster node. The client class
    comes from the fixture parameter and is pointed at the cluster's default
    S3 gate endpoint.
    """
    first_node = cluster.cluster_nodes[0]
    credentials_provider.S3.provide(default_user, first_node, s3_policy)

    client_cls = request.param
    # Read the credentials only after provide() has run for this user.
    credentials = default_user.s3_credentials
    return client_cls(credentials.access_key, credentials.secret_key, cluster.default_s3_gate_endpoint)
|
|
|
|
|
|
@pytest.fixture
def versioning_status(request: pytest.FixtureRequest) -> VersioningStatus:
    """Return the parametrized versioning status, or ``VersioningStatus.UNDEFINED`` if there is none."""
    if "param" not in request.__dict__:
        return VersioningStatus.UNDEFINED

    return request.param
|
|
|
|
|
|
@allure.title("[Session] Bulk create buckets for tests")
@pytest.fixture(scope="session")
def buckets_pool(s3_client: S3ClientWrapper, request: pytest.FixtureRequest):
    """Create, in one parallel batch, the buckets needed by the session's tests.

    Only tests whose name contains the S3 client class name are counted. Each
    contributes one bucket if it uses the ``bucket`` fixture and two more if it
    uses ``two_buckets``.

    Returns:
        The list of created bucket names (empty if no test needs a bucket).
    """
    client_name = type(s3_client).__name__
    bucket_names: list = []

    for test in request.session.items:
        if client_name not in test.name:
            continue

        fixtures = test.fixturenames
        wanted = ("bucket" in fixtures) + 2 * ("two_buckets" in fixtures)
        bucket_names.extend(string_utils.unique_name("bucket-") for _ in range(wanted))

    if bucket_names:
        parallel(s3_client.create_bucket, bucket_names)

    return bucket_names
|
|
|
|
|
|
@allure.title("[Test] Create bucket")
@pytest.fixture
def bucket(buckets_pool: list[str], s3_client: S3ClientWrapper, versioning_status: VersioningStatus):
    """Return a bucket name for the test, taken from the pool when one is available.

    A new bucket is created when the pool is empty. Versioning is configured
    only when ``versioning_status`` is truthy.
    """
    bucket_name = buckets_pool.pop() if buckets_pool else s3_client.create_bucket()

    if versioning_status:
        s3_helper.set_bucket_versioning(s3_client, bucket_name, versioning_status)

    return bucket_name
|
|
|
|
|
|
@allure.title("[Test] Create two buckets")
@pytest.fixture
def two_buckets(buckets_pool: list[str], s3_client: S3ClientWrapper) -> list[str]:
    """Return two bucket names, each taken from the pool or newly created if the pool is empty."""
    return [buckets_pool.pop() if buckets_pool else s3_client.create_bucket() for _ in range(2)]
|
|
|
|
|
|
@allure.title("[Autouse/Session] Collect binary versions")
@pytest.fixture(scope="session", autouse=True)
@run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED)
def collect_binary_versions(hosting: Hosting, client_shell: Shell, request: pytest.FixtureRequest):
    """Save local and remote binary versions to ``environment.properties`` in the Allure dir.

    Nothing is written when pytest runs without ``--alluredir``. Remote entries
    are keyed ``<binary>_<NN>``, where ``NN`` is the 1-based, zero-padded
    position of the host in the mapping returned by
    ``get_remote_binaries_versions``. Remote entries override local ones that
    share a key.
    """
    environment_dir = request.config.getoption("--alluredir")
    if not environment_dir:
        return None

    local_versions = version_utils.get_local_binaries_versions(client_shell)
    remote_versions = version_utils.get_remote_binaries_versions(hosting)

    # enumerate() yields each host's position directly, so no list.index() scan
    # is needed per entry; the generated keys are the same.
    remote_by_host = {
        f"{name}_{host_number:02d}": version
        for host_number, versions in enumerate(remote_versions.values(), start=1)
        for name, version in versions.items()
    }
    all_versions = {**local_versions, **remote_by_host}

    properties_path = f"{environment_dir}/environment.properties"
    env_utils.save_env_properties(properties_path, all_versions)
|
|
|
|
|
|
@reporter.step("Prepare tmp directory")
@pytest.fixture(scope="session")
def temp_directory(configure_testlib):
    """Create a clean ``ASSETS_DIR`` under the current working directory for the session.

    Any existing directory at that path is removed first. The path is yielded
    to the session and the directory is deleted again on teardown.
    ``configure_testlib`` is requested only as a fixture dependency.
    """
    with reporter.step("Prepare tmp directory"):
        assets_path = os.path.join(os.getcwd(), ASSETS_DIR)
        # Errors while removing a leftover directory are ignored; mkdir then fails if it still exists.
        shutil.rmtree(assets_path, ignore_errors=True)
        os.mkdir(assets_path)

    yield assets_path

    with reporter.step("Remove tmp directory"):
        shutil.rmtree(assets_path)
|
|
|
|
|
|
@reporter.step("[Autouse/Session] Test session start time")
@pytest.fixture(scope="session", autouse=True)
def session_start_time(configure_testlib):
    """Return the UTC time at which the test session started, as a naive ``datetime``.

    ``configure_testlib`` is requested only as a fixture dependency.
    """
    # NOTE(review): datetime.utcnow() returns a naive value and is deprecated since Python 3.12.
    # Switching to an aware datetime would change what consumers receive - confirm before changing.
    return datetime.utcnow()
|
|
|
|
|
|
@allure.title("[Autouse/Session] After deploy healthcheck")
@pytest.fixture(scope="session", autouse=True)
@run_optionally(optionals.OPTIONAL_AUTOUSE_FIXTURES_ENABLED)
def after_deploy_healthcheck(cluster: Cluster):
    """Run ``readiness_on_node`` for every cluster node in parallel, once per session."""
    nodes = cluster.cluster_nodes
    with reporter.step("Wait for cluster readiness after deploy"):
        parallel(readiness_on_node, nodes)
|
|
|
|
|
|
@pytest.fixture(scope="session")
def rpc_endpoint(cluster: Cluster):
    """Return the cluster's default RPC endpoint."""
    endpoint = cluster.default_rpc_endpoint
    return endpoint
|
|
|
|
|
|
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
def readiness_on_node(cluster_node: ClusterNode):
    """Assert that the storage service on ``cluster_node`` is active and has been for long enough.

    The check is skipped when the host config attribute ``skip_readiness_check``
    is set to a truthy value. Otherwise the service must report ``active`` via
    ``systemctl is-active`` and its ``ActiveEnterTimestamp`` must be more than
    ``SERVICE_ACTIVE_TIME`` minutes in the past.

    Raises:
        AssertionError: if the service is not active or became active too recently.
    """
    host = cluster_node.host
    attributes = host.config.attributes
    if "skip_readiness_check" in attributes and attributes["skip_readiness_check"]:
        return

    # TODO: Move to healthcheck classes
    svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
    with reporter.step(f"Check service {svc_name} is active"):
        result = host.get_shell().exec(f"systemctl is-active {svc_name}")
        state = result.stdout.strip()
        assert "active" == state, f"Service {svc_name} should be in active state"

    with reporter.step(f"Check service {svc_name} is active more than {SERVICE_ACTIVE_TIME} minutes"):
        result = host.get_shell().exec(f"systemctl show {svc_name} --property ActiveEnterTimestamp | cut -d '=' -f 2")
        # NOTE(review): the subtraction below needs a timezone-aware value; presumably the
        # systemd timestamp carries a zone name that dateutil recognises - confirm.
        entered_active_at = parser.parse(result.stdout.strip())
        uptime = datetime.now(tz=timezone.utc) - entered_active_at

        # Only used in the failure message: split the seconds component into minutes and seconds.
        active_minutes, active_seconds = divmod(uptime.seconds, 60)

        required_uptime = timedelta(minutes=SERVICE_ACTIVE_TIME)
        assert (
            uptime > required_uptime
        ), f"Service should be in active state more than {SERVICE_ACTIVE_TIME} minutes, current {active_minutes}m:{active_seconds}s"
|
|
|
|
|
|
@reporter.step("Prepare default user with wallet")
@pytest.fixture(scope="session")
def default_user(credentials_provider: CredentialsProvider, cluster: Cluster) -> User:
    """Create a uniquely named user and provide its GRPC credentials via the first cluster node."""
    new_user = User(string_utils.unique_name("user-"))
    first_node = cluster.cluster_nodes[0]
    credentials_provider.GRPC.provide(new_user, first_node)
    return new_user
|
|
|
|
|
|
@reporter.step("Get wallet for default user")
@pytest.fixture(scope="session")
def default_wallet(default_user: User) -> WalletInfo:
    """Return the wallet of the default user."""
    wallet = default_user.wallet
    return wallet
|
|
|
|
|
|
@pytest.fixture(scope="session")
def wallets_pool(credentials_provider: CredentialsProvider, cluster: Cluster) -> list[WalletInfo]:
    """Create ``WALLTETS_IN_POOL`` users in parallel and return their wallets.

    GRPC credentials are provided for every user through the first cluster node.
    """
    pool_users: list[User] = []
    for _ in range(WALLTETS_IN_POOL):
        pool_users.append(User(string_utils.unique_name("user-")))

    parallel(credentials_provider.GRPC.provide, pool_users, cluster_node=cluster.cluster_nodes[0])

    return [pool_user.wallet for pool_user in pool_users]
|
|
|
|
|
|
@pytest.fixture(scope="session")
def other_wallet(wallets_pool: list[WalletInfo]) -> WalletInfo:
    """Take one wallet out of the shared pool.

    Raises:
        RuntimeError: if the pool is already empty.
    """
    if wallets_pool:
        return wallets_pool.pop()

    raise RuntimeError("[other_wallet] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.")
|
|
|
|
|
|
@pytest.fixture(scope="session")
def other_wallet_2(wallets_pool: list[WalletInfo]) -> WalletInfo:
    """Take one more wallet out of the shared pool.

    Raises:
        RuntimeError: if the pool is already empty.
    """
    if wallets_pool:
        return wallets_pool.pop()

    raise RuntimeError("[other_wallet2] No wallets in pool. Consider increasing WALLTETS_IN_POOL or review.")
|
|
|
|
|
|
@pytest.fixture()
@allure.title("Select random node for testing")
def node_under_test(cluster: Cluster) -> ClusterNode:
    """Pick a random cluster node for the test and attach its description to the report."""
    chosen = random.choice(cluster.cluster_nodes)
    reporter.attach(f"{chosen}", "Selected node")
    return chosen
|
|
|
|
|
|
@allure.title("Init bucket container resolver")
@pytest.fixture()
def bucket_container_resolver(node_under_test: ClusterNode) -> BucketContainerResolver:
    """Instantiate the bucket-to-container resolver plugin for the selected node's product."""
    product = node_under_test.host.config.product
    resolver_cls = plugins.load_plugin("frostfs.testlib.bucket_cid_resolver", product)
    return resolver_cls()
|