Updates for s3 k6

This commit is contained in:
Andrey Berezin 2023-07-14 16:04:44 +03:00 committed by Andrey Berezin
parent 59b4157991
commit 62216293f8
4 changed files with 80 additions and 58 deletions

View file

@ -2,9 +2,10 @@ import json
import logging
import math
import os
from dataclasses import dataclass, fields
from dataclasses import dataclass
from time import sleep
from typing import Any
from urllib.parse import urlparse
from frostfs_testlib.load.interfaces import Loader
from frostfs_testlib.load.load_config import (
@ -16,11 +17,7 @@ from frostfs_testlib.load.load_config import (
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import (
K6_STOP_SIGNAL_TIMEOUT,
K6_TEARDOWN_PERIOD,
LOAD_NODE_SSH_USER,
)
from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEARDOWN_PERIOD
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import wait_for_success
@ -60,10 +57,9 @@ class K6:
self.loader: Loader = loader
self.shell: Shell = shell
self.wallet = wallet
self.scenario: LoadScenario = load_params.scenario
self.summary_json: str = os.path.join(
self.load_params.working_dir,
f"{self.load_params.load_id}_{self.scenario.value}_summary.json",
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
)
self._k6_dir: str = k6_dir
@ -98,24 +94,7 @@ class K6:
preset_scenario = preset_map[self.load_params.load_type]
command_args = base_args[preset_scenario].copy()
command_args += [
f"--{field.metadata['preset_argument']} '{getattr(self.load_params, field.name)}'"
for field in fields(self.load_params)
if field.metadata
and self.scenario in field.metadata["applicable_scenarios"]
and field.metadata["preset_argument"]
and getattr(self.load_params, field.name) is not None
]
if self.load_params.preset:
command_args += [
f"--{field.metadata['preset_argument']} '{getattr(self.load_params.preset, field.name)}'"
for field in fields(self.load_params.preset)
if field.metadata
and self.scenario in field.metadata["applicable_scenarios"]
and field.metadata["preset_argument"]
and getattr(self.load_params.preset, field.name) is not None
]
command_args += self.load_params.get_preset_arguments()
command = " ".join(command_args)
result = self.shell.exec(command)
@ -127,26 +106,7 @@ class K6:
@reporter.step_deco("Generate K6 command")
def _generate_env_variables(self) -> str:
env_vars = {
field.metadata["env_variable"]: getattr(self.load_params, field.name)
for field in fields(self.load_params)
if field.metadata
and self.scenario in field.metadata["applicable_scenarios"]
and field.metadata["env_variable"]
and getattr(self.load_params, field.name) is not None
}
if self.load_params.preset:
env_vars.update(
{
field.metadata["env_variable"]: getattr(self.load_params.preset, field.name)
for field in fields(self.load_params.preset)
if field.metadata
and self.scenario in field.metadata["applicable_scenarios"]
and field.metadata["env_variable"]
and getattr(self.load_params.preset, field.name) is not None
}
)
env_vars = self.load_params.get_env_vars()
env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
env_vars["SUMMARY_JSON"] = self.summary_json
@ -164,7 +124,7 @@ class K6:
):
command = (
f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
f"{self._k6_dir}/scenarios/{self.scenario.value}.js"
f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
)
user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
self._k6_process = RemoteProcess.create(
@ -215,10 +175,10 @@ class K6:
summary_text = self.shell.exec(f"cat {self.summary_json}").stdout
summary_json = json.loads(summary_text)
endpoint = urlparse(self.endpoints[0]).netloc or self.endpoints[0]
allure_filenames = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.loader.ip}_{self.scenario.value}_summary.json",
K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.loader.ip}_{self.scenario.value}_{self.endpoints[0]}_summary.json",
K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.loader.ip}_{self.load_params.scenario.value}_summary.json",
K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.loader.ip}_{self.load_params.scenario.value}_{endpoint}_summary.json",
}
allure_filename = allure_filenames[self.load_params.k6_process_allocation_strategy]

View file

@ -1,7 +1,8 @@
import os
from dataclasses import dataclass, field
from dataclasses import dataclass, field, fields, is_dataclass
from enum import Enum
from typing import Optional
from types import MappingProxyType
from typing import Any, Optional, get_args
class LoadType(Enum):
@ -42,6 +43,12 @@ grpc_preset_scenarios = [
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR]
@dataclass
class MetaField:
metadata: MappingProxyType
value: Any
def metadata_field(
applicable_scenarios: list[LoadScenario],
preset_param: Optional[str] = None,
@ -138,6 +145,12 @@ class LoadParams:
preset: Optional[Preset] = None
# K6 download url
k6_url: Optional[str] = None
# No ssl verification flag
no_verify_ssl: Optional[bool] = metadata_field(
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.VERIFY, LoadScenario.HTTP],
"no-verify-ssl",
"NO_VERIFY_SSL",
)
# ------- COMMON SCENARIO PARAMS -------
# Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
@ -225,3 +238,53 @@ class LoadParams:
self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt")
if self.preset:
self.preset.pregen_json = os.path.join(self.working_dir, f"{load_id}_prepare.json")
def get_env_vars(self):
env_vars = {
meta_field.metadata["env_variable"]: meta_field.value
for meta_field in self._get_meta_fields(self)
if self.scenario in meta_field.metadata["applicable_scenarios"]
and meta_field.metadata["env_variable"]
and meta_field.value
}
return env_vars
def get_preset_arguments(self):
command_args = [
self._get_preset_argument(meta_field)
for meta_field in self._get_meta_fields(self)
if self.scenario in meta_field.metadata["applicable_scenarios"]
and meta_field.metadata["preset_argument"]
and meta_field.value
and self._get_preset_argument(meta_field)
]
return command_args
@staticmethod
def _get_preset_argument(meta_field: MetaField) -> str:
if isinstance(meta_field.value, bool):
# For preset calls, bool values are passed with just --<argument_name> if the value is True
return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else ""
return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'"
@staticmethod
def _get_meta_fields(instance) -> list[MetaField]:
data_fields = fields(instance)
fields_with_data = [
MetaField(field.metadata, getattr(instance, field.name))
for field in data_fields
if field.metadata and getattr(instance, field.name)
]
for field in data_fields:
actual_field_type = (
get_args(field.type)[0] if len(get_args(field.type)) else get_args(field.type)
)
if is_dataclass(actual_field_type) and getattr(instance, field.name):
fields_with_data += LoadParams._get_meta_fields(getattr(instance, field.name))
return fields_with_data or []

View file

@ -6,6 +6,7 @@ import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import fields
from typing import Optional
from urllib.parse import urlparse
import yaml
@ -257,9 +258,10 @@ class DefaultRunner(RunnerBase):
raise RuntimeError("k6_process_allocation_strategy should not be none")
result = k6_instance.get_results()
endpoint = urlparse(k6_instance.endpoints[0]).netloc or k6_instance.endpoints[0]
keys_map = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.loader.ip,
K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0],
K6ProcessAllocationStrategy.PER_ENDPOINT: endpoint,
}
key = keys_map[k6_instance.load_params.k6_process_allocation_strategy]
results[key] = result

View file

@ -80,17 +80,14 @@ class BackgroundLoadController:
LoadType.S3: {
EndpointSelectionStrategy.ALL: list(
set(
endpoint.replace("http://", "").replace("https://", "")
endpoint
for node_under_load in self.nodes_under_load
for endpoint in node_under_load.service(S3Gate).get_all_endpoints()
)
),
EndpointSelectionStrategy.FIRST: list(
set(
node_under_load.service(S3Gate)
.get_endpoint()
.replace("http://", "")
.replace("https://", "")
node_under_load.service(S3Gate).get_endpoint()
for node_under_load in self.nodes_under_load
)
),