[#191] Credentials work overhaul

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2024-03-11 19:23:10 +03:00
parent 09a7f66d1e
commit 25925c637b
31 changed files with 370 additions and 485 deletions

View file

@ -9,13 +9,13 @@ from typing import Any
from urllib.parse import urlparse
from frostfs_testlib import reporter
from frostfs_testlib.credentials.interfaces import User
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEARDOWN_PERIOD
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import wait_for_success
EXIT_RESULT_CODE = 0
@ -42,16 +42,16 @@ class K6:
k6_dir: str,
shell: Shell,
loader: Loader,
wallet: WalletInfo,
user: User,
):
if load_params.scenario is None:
raise RuntimeError("Scenario should not be none")
self.load_params: LoadParams = load_params
self.load_params = load_params
self.endpoints = endpoints
self.loader: Loader = loader
self.shell: Shell = shell
self.wallet = wallet
self.loader = loader
self.shell = shell
self.user = user
self.preset_output: str = ""
self.summary_json: str = os.path.join(
self.load_params.working_dir,
@ -64,13 +64,9 @@ class K6:
f"{self._generate_env_variables()}{self._k6_dir}/k6 run {self._generate_k6_variables()} "
f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
)
user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
process_id = (
self.load_params.load_id
if self.load_params.scenario != LoadScenario.VERIFY
else f"{self.load_params.load_id}_verify"
)
self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user, process_id)
remote_user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
process_id = self.load_params.load_id if self.load_params.scenario != LoadScenario.VERIFY else f"{self.load_params.load_id}_verify"
self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, remote_user, process_id)
def _get_fill_percents(self):
fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep frostfs").stdout.split("\n")
@ -103,8 +99,8 @@ class K6:
preset_grpc: [
preset_grpc,
f"--endpoint {','.join(self.endpoints)}",
f"--wallet {self.wallet.path} ",
f"--config {self.wallet.config_path} ",
f"--wallet {self.user.wallet.path} ",
f"--config {self.user.wallet.config_path} ",
],
preset_s3: [
preset_s3,
@ -167,9 +163,7 @@ class K6:
remaining_time = timeout - working_time
setup_teardown_time = (
int(K6_TEARDOWN_PERIOD)
+ self.load_params.get_init_time()
+ int(self.load_params.setup_timeout.replace("s", "").strip())
int(K6_TEARDOWN_PERIOD) + self.load_params.get_init_time() + int(self.load_params.setup_timeout.replace("s", "").strip())
)
remaining_time_including_setup_and_teardown = remaining_time + setup_teardown_time
timeout = remaining_time_including_setup_and_teardown
@ -201,9 +195,7 @@ class K6:
if not self.load_params.fill_percent is None:
with reporter.step(f"Check the percentage of filling of all data disks on the node"):
if self.check_fill_percent():
logger.info(
f"Stopping load on because disks is filled more then {self.load_params.fill_percent}%"
)
logger.info(f"Stopping load on because disks is filled more then {self.load_params.fill_percent}%")
event.set()
self.stop()
return

View file

@ -1,24 +1,20 @@
import copy
import itertools
import math
import re
import time
from dataclasses import fields
from threading import Event
from typing import Optional
from urllib.parse import urlparse
import yaml
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.credentials.interfaces import S3Credentials, User
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR, LOAD_NODE_SSH_USER, LOAD_NODES
from frostfs_testlib.shell.command_inspectors import SuInspector
@ -26,7 +22,6 @@ from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.testing.test_control import retry
from frostfs_testlib.utils import datetime_utils
@ -57,17 +52,17 @@ class RunnerBase(ScenarioRunner):
class DefaultRunner(RunnerBase):
loaders: list[Loader]
loaders_wallet: WalletInfo
user: User
def __init__(
self,
loaders_wallet: WalletInfo,
user: User,
load_ip_list: Optional[list[str]] = None,
) -> None:
if load_ip_list is None:
load_ip_list = LOAD_NODES
self.loaders = RemoteLoader.from_ip_list(load_ip_list)
self.loaders_wallet = loaders_wallet
self.user = user
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step("Preparation steps")
@ -86,55 +81,27 @@ class DefaultRunner(RunnerBase):
return
with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
grpc_peer = storage_node.get_rpc_endpoint()
parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)
s3_credentials = self.user.s3_credentials
parallel(self._aws_configure_on_loader, self.loaders, s3_credentials)
def _force_fresh_registry(self, loader: Loader, load_params: LoadParams):
with reporter.step(f"Forcing fresh registry on {loader.ip}"):
shell = loader.get_shell()
shell.exec(f"rm -f {load_params.registry_file}")
def _prepare_loader(
def _aws_configure_on_loader(
self,
loader: Loader,
load_params: LoadParams,
grpc_peer: str,
s3_public_keys: list[str],
k6_dir: str,
s3_credentials: S3Credentials,
):
with reporter.step(f"Init s3 client on {loader.ip}"):
shell = loader.get_shell()
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate_exec.secret.issue(
wallet=self.loaders_wallet.path,
peer=grpc_peer,
gate_public_key=s3_public_keys,
container_placement_policy=load_params.preset.container_placement_policy,
container_policy=f"{k6_dir}/scenarios/files/policy.json",
wallet_password=self.loaders_wallet.password,
).stdout
aws_access_key_id = str(
re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
"aws_access_key_id"
)
)
aws_secret_access_key = str(
re.search(
r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)",
issue_secret_output,
).group("aws_secret_access_key")
)
with reporter.step(f"Aws configure on {loader.ip}"):
configure_input = [
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=s3_credentials.access_key),
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=s3_credentials.secret_key),
InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""),
]
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
loader.get_shell().exec("aws configure", CommandOptions(interactive_inputs=configure_input))
@reporter.step("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
@ -176,12 +143,10 @@ class DefaultRunner(RunnerBase):
k6_dir,
shell,
loader,
self.loaders_wallet,
self.user,
)
def _get_distributed_load_params_list(
self, original_load_params: LoadParams, workers_count: int
) -> list[LoadParams]:
def _get_distributed_load_params_list(self, original_load_params: LoadParams, workers_count: int) -> list[LoadParams]:
divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR)
distributed_load_params: list[LoadParams] = []
@ -266,18 +231,20 @@ class LocalRunner(RunnerBase):
loaders: list[Loader]
cluster_state_controller: ClusterStateController
file_keeper: FileKeeper
wallet: WalletInfo
user: User
def __init__(
self,
cluster_state_controller: ClusterStateController,
file_keeper: FileKeeper,
nodes_under_load: list[ClusterNode],
user: User,
) -> None:
self.cluster_state_controller = cluster_state_controller
self.file_keeper = file_keeper
self.loaders = [NodeLoader(node) for node in nodes_under_load]
self.nodes_under_load = nodes_under_load
self.user = user
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step("Preparation steps")
@ -326,11 +293,9 @@ class LocalRunner(RunnerBase):
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz --strip-components 2 -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"):
self.wallet = WalletInfo(f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml")
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
with reporter.step("chmod 777 wallet related files on loader"):
shell.exec(f"sudo chmod -R 777 {self.user.wallet.config_path}")
shell.exec(f"sudo chmod -R 777 {self.user.wallet.path}")
@reporter.step("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
@ -363,7 +328,7 @@ class LocalRunner(RunnerBase):
k6_dir,
shell,
loader,
self.wallet,
self.user,
)
def start(self):
@ -453,7 +418,7 @@ class S3LocalRunner(LocalRunner):
k6_dir,
shell,
loader,
self.wallet,
self.user,
)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@ -466,17 +431,10 @@ class S3LocalRunner(LocalRunner):
k6_dir: str,
):
self.k6_dir = k6_dir
with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
grpc_peer = storage_node.get_rpc_endpoint()
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, cluster_nodes)
@reporter.step("Prepare node {cluster_node}")
def prepare_node(
self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, s3_public_keys: list[str], grpc_peer: str
):
def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, cluster_nodes: list[ClusterNode]):
LocalRunner.prepare_node(self, cluster_node, k6_dir, load_params)
self.endpoints = cluster_node.s3_gate.get_all_endpoints()
shell = cluster_node.host.get_shell()
@ -497,29 +455,9 @@ class S3LocalRunner(LocalRunner):
shell.exec(f"sudo python3 -m pip install -I {k6_dir}/requests.tar.gz")
with reporter.step(f"Init s3 client on {cluster_node.host_ip}"):
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate_exec.secret.issue(
wallet=self.wallet.path,
peer=grpc_peer,
gate_public_key=s3_public_keys,
container_placement_policy=load_params.preset.container_placement_policy,
container_policy=f"{k6_dir}/scenarios/files/policy.json",
wallet_password=self.wallet.password,
).stdout
aws_access_key_id = str(
re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
"aws_access_key_id"
)
)
aws_secret_access_key = str(
re.search(
r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)",
issue_secret_output,
).group("aws_secret_access_key")
)
configure_input = [
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=self.user.s3_credentials.access_key),
InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=self.user.s3_credentials.secret_key),
InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""),
]