s3local.js scenario

Signed-off-by: m.malygina <m.malygina@yadro.com>
This commit is contained in:
m.malygina 2023-11-14 14:00:08 +03:00
parent 6519cfafc9
commit 61a1b28652
5 changed files with 150 additions and 6 deletions

View file

@ -11,4 +11,4 @@ from frostfs_testlib.load.load_config import (
) )
from frostfs_testlib.load.load_report import LoadReport from frostfs_testlib.load.load_report import LoadReport
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
from frostfs_testlib.load.runners import DefaultRunner, LocalRunner from frostfs_testlib.load.runners import DefaultRunner, LocalRunner, S3LocalRunner

View file

@ -20,6 +20,7 @@ class LoadScenario(Enum):
S3 = "s3" S3 = "s3"
S3_CAR = "s3_car" S3_CAR = "s3_car"
S3_MULTIPART = "s3_multipart" S3_MULTIPART = "s3_multipart"
S3_LOCAL = "s3local"
HTTP = "http" HTTP = "http"
VERIFY = "verify" VERIFY = "verify"
LOCAL = "local" LOCAL = "local"
@ -38,11 +39,12 @@ all_load_scenarios = [
LoadScenario.S3_CAR, LoadScenario.S3_CAR,
LoadScenario.gRPC_CAR, LoadScenario.gRPC_CAR,
LoadScenario.LOCAL, LoadScenario.LOCAL,
LoadScenario.S3_MULTIPART LoadScenario.S3_MULTIPART,
LoadScenario.S3_LOCAL
] ]
all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY] all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY]
constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL, LoadScenario.S3_MULTIPART] constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL, LoadScenario.S3_MULTIPART, LoadScenario.S3_LOCAL]
constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR] constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]
grpc_preset_scenarios = [ grpc_preset_scenarios = [
@ -51,7 +53,7 @@ grpc_preset_scenarios = [
LoadScenario.gRPC_CAR, LoadScenario.gRPC_CAR,
LoadScenario.LOCAL, LoadScenario.LOCAL,
] ]
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART] s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.S3_LOCAL]
@dataclass @dataclass
@ -172,9 +174,13 @@ class LoadParams:
preset: Optional[Preset] = None preset: Optional[Preset] = None
# K6 download url # K6 download url
k6_url: Optional[str] = None k6_url: Optional[str] = None
# Requests module url
requests_module_url: Optional[str] = None
# aws cli download url
awscli_url: Optional[str] = None
# No ssl verification flag # No ssl verification flag
no_verify_ssl: Optional[bool] = metadata_field( no_verify_ssl: Optional[bool] = metadata_field(
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.VERIFY, LoadScenario.HTTP], [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.S3_LOCAL, LoadScenario.VERIFY, LoadScenario.HTTP],
"no-verify-ssl", "no-verify-ssl",
"NO_VERIFY_SSL", "NO_VERIFY_SSL",
False, False,
@ -283,7 +289,9 @@ class LoadParams:
# ------- LOCAL SCENARIO PARAMS ------- # ------- LOCAL SCENARIO PARAMS -------
# Config file location (filled automatically) # Config file location (filled automatically)
config_file: Optional[str] = metadata_field([LoadScenario.LOCAL], None, "CONFIG_FILE", False) config_file: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_FILE", False)
# Config directory location (filled automatically)
config_dir: Optional[str] = metadata_field([LoadScenario.S3_LOCAL], None, "CONFIG_DIR", False)
def set_id(self, load_id): def set_id(self, load_id):
self.load_id = load_id self.load_id = load_id

View file

@ -165,6 +165,14 @@ class S3Metrics(MetricsBase):
_DELETE_ERRORS = "aws_obj_delete_fails" _DELETE_ERRORS = "aws_obj_delete_fails"
_DELETE_LATENCY = "aws_obj_delete_duration" _DELETE_LATENCY = "aws_obj_delete_duration"
class S3LocalMetrics(MetricsBase):
_WRITE_SUCCESS = "s3local_obj_put_total"
_WRITE_ERRORS = "s3local_obj_put_fails"
_WRITE_LATENCY = "s3local_obj_put_duration"
_READ_SUCCESS = "s3local_obj_get_total"
_READ_ERRORS = "s3local_obj_get_fails"
_READ_LATENCY = "s3local_obj_get_duration"
class LocalMetrics(MetricsBase): class LocalMetrics(MetricsBase):
_WRITE_SUCCESS = "local_obj_put_total" _WRITE_SUCCESS = "local_obj_put_total"
@ -197,6 +205,7 @@ def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> Metr
LoadScenario.S3: S3Metrics, LoadScenario.S3: S3Metrics,
LoadScenario.S3_CAR: S3Metrics, LoadScenario.S3_CAR: S3Metrics,
LoadScenario.S3_MULTIPART: S3Metrics, LoadScenario.S3_MULTIPART: S3Metrics,
LoadScenario.S3_LOCAL: S3LocalMetrics,
LoadScenario.VERIFY: VerifyMetrics, LoadScenario.VERIFY: VerifyMetrics,
LoadScenario.LOCAL: LocalMetrics, LoadScenario.LOCAL: LocalMetrics,
} }

View file

@ -97,6 +97,7 @@ class LoadReport:
LoadScenario.gRPC_CAR: "open model", LoadScenario.gRPC_CAR: "open model",
LoadScenario.S3_CAR: "open model", LoadScenario.S3_CAR: "open model",
LoadScenario.LOCAL: "local fill", LoadScenario.LOCAL: "local fill",
LoadScenario.S3_LOCAL: "local fill"
} }
return model_map[self.load_params.scenario] return model_map[self.load_params.scenario]

View file

@ -390,3 +390,129 @@ class LocalRunner(RunnerBase):
parallel(self.restore_passwd_on_node, self.nodes_under_load) parallel(self.restore_passwd_on_node, self.nodes_under_load)
return results return results
class S3LocalRunner(LocalRunner):
endpoints: list[str]
k6_dir: str
@reporter.step_deco("Run preset on loaders")
def preset(self):
LocalRunner.preset(self)
with reporter.step(f"Resolve containers in preset"):
parallel(self._resolve_containers_in_preset, self.k6_instances)
@reporter.step_deco("Resolve containers in preset")
def _resolve_containers_in_preset(self, k6_instance: K6):
k6_instance.shell.exec(
f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}")
@reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
futures = parallel(
self._init_k6_instance_,
self.loaders,
load_params,
endpoints,
k6_dir,
)
self.k6_instances = [future.result() for future in futures]
def _init_k6_instance_(self, loader: Loader, load_params: LoadParams, endpoints: list[str], k6_dir: str):
shell = loader.get_shell()
with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
with reporter.step(f"Make working directory"):
shell.exec(f"sudo mkdir -p {load_params.working_dir}")
# If we chmod /home/<user_name> folder we can no longer ssh to the node
# !! IMPORTANT !!
if (
load_params.working_dir
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}"
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/"
):
shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")
return K6(
load_params,
self.endpoints,
k6_dir,
shell,
loader,
self.wallet,
)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Preparation steps")
def prepare(
self,
load_params: LoadParams,
cluster_nodes: list[ClusterNode],
nodes_under_load: list[ClusterNode],
k6_dir: str,
):
self.k6_dir = k6_dir
with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [
node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes
]
grpc_peer = storage_node.get_rpc_endpoint()
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)
@reporter.step_deco("Prepare node {cluster_node}")
def prepare_node(self,
cluster_node: ClusterNode,
k6_dir: str,
load_params: LoadParams,
s3_public_keys: list[str],
grpc_peer: str):
LocalRunner.prepare_node(self,cluster_node, k6_dir, load_params)
self.endpoints = cluster_node.s3_gate.get_all_endpoints()
shell = cluster_node.host.get_shell()
with reporter.step("Uninstall previous installation of aws cli"):
shell.exec(f"sudo rm -rf /usr/local/aws-cli")
shell.exec(f"sudo rm -rf /usr/local/bin/aws")
shell.exec(f"sudo rm -rf /usr/local/bin/aws_completer")
with reporter.step("Install aws cli"):
shell.exec(f"sudo curl {load_params.awscli_url} -o {k6_dir}/awscliv2.zip")
shell.exec(f"sudo unzip -q {k6_dir}/awscliv2.zip -d {k6_dir}")
shell.exec(f"sudo {k6_dir}/aws/install")
with reporter.step("Install requests python module"):
shell.exec(f"sudo apt-get -y install python3-pip")
shell.exec(f"sudo curl -so {k6_dir}/requests.tar.gz {load_params.requests_module_url}")
shell.exec(f"sudo python3 -m pip install -I {k6_dir}/requests.tar.gz")
with reporter.step(f"Init s3 client on {cluster_node.host_ip}"):
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate_exec.secret.issue(
wallet=self.wallet.path,
peer=grpc_peer,
gate_public_key=s3_public_keys,
container_placement_policy=load_params.preset.container_placement_policy,
container_policy=f"{k6_dir}/scenarios/files/policy.json",
wallet_password=self.wallet.password,
).stdout
aws_access_key_id = str(
re.search(
r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output
).group("aws_access_key_id")
)
aws_secret_access_key = str(
re.search(
r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)",
issue_secret_output,
).group("aws_secret_access_key")
)
configure_input = [
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
InteractiveInput(
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
),
InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""),
]
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))