forked from TrueCloudLab/frostfs-testlib
515 lines
21 KiB
Python
515 lines
21 KiB
Python
import copy
|
|
import itertools
|
|
import math
|
|
import re
|
|
import time
|
|
from dataclasses import fields
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
import yaml
|
|
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
|
|
from frostfs_testlib.load.interfaces.loader import Loader
|
|
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
|
|
from frostfs_testlib.load.k6 import K6
|
|
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
|
|
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
|
|
from frostfs_testlib.resources import optionals
|
|
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
|
|
from frostfs_testlib.resources.common import STORAGE_USER_NAME
|
|
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR, LOAD_NODE_SSH_USER, LOAD_NODES
|
|
from frostfs_testlib.shell.command_inspectors import SuInspector
|
|
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
|
|
from frostfs_testlib.storage.cluster import ClusterNode
|
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
|
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing import parallel, run_optionally
|
|
from frostfs_testlib.testing.test_control import retry
|
|
from frostfs_testlib.utils import datetime_utils
|
|
from frostfs_testlib.utils.file_keeper import FileKeeper
|
|
|
|
|
|
class RunnerBase(ScenarioRunner):
    """Shared k6 orchestration used by the concrete scenario runners of this module."""

    # K6 wrappers driven by this runner; the subclasses' init_k6_instances() assigns it.
    k6_instances: list[K6]

    @reporter.step("Run preset on loaders")
    def preset(self):
        """Call ``preset`` of every managed k6 instance through the ``parallel`` helper."""
        preset_calls = [instance.preset for instance in self.k6_instances]
        parallel(preset_calls)

    @reporter.step("Wait until load finish")
    def wait_until_finish(self, soft_timeout: int = 0):
        """
        Call ``wait_until_finished`` of every managed k6 instance through ``parallel``.

        Args:
            soft_timeout: forwarded unchanged to ``parallel`` as ``soft_timeout``.
        """
        waiters = [instance.wait_until_finished for instance in self.k6_instances]
        parallel(waiters, soft_timeout=soft_timeout)

    @property
    def is_running(self):
        """True when at least one managed k6 instance reports a truthy ``is_running`` result."""
        status_futures = parallel([instance.is_running for instance in self.k6_instances])

        # Materialize every result first so that each future is consumed before reducing.
        statuses = [status_future.result() for status_future in status_futures]
        return any(statuses)

    def get_k6_instances(self):
        """Return the list of managed k6 instances."""
        return self.k6_instances
|
|
|
|
|
|
class DefaultRunner(RunnerBase):
    """Runner that starts k6 on loaders built from a list of load node addresses."""

    # Loaders produced by RemoteLoader.from_ip_list() in __init__.
    loaders: list[Loader]
    # Wallet passed to authmate and to every K6 instance created by this runner.
    loaders_wallet: WalletInfo

    def __init__(
        self,
        loaders_wallet: WalletInfo,
        load_ip_list: Optional[list[str]] = None,
    ) -> None:
        """
        Args:
            loaders_wallet: wallet used when issuing S3 credentials and when creating K6 instances.
            load_ip_list: addresses handed to ``RemoteLoader.from_ip_list``; ``LOAD_NODES`` is used when None.
        """
        if load_ip_list is None:
            load_ip_list = LOAD_NODES
        self.loaders = RemoteLoader.from_ip_list(load_ip_list)
        self.loaders_wallet = loaders_wallet

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step("Preparation steps")
    def prepare(
        self,
        load_params: LoadParams,
        cluster_nodes: list[ClusterNode],
        nodes_under_load: list[ClusterNode],
        k6_dir: str,
    ):
        """
        Configure the S3 client on every loader; does nothing unless the load type is S3.

        Args:
            load_params: load configuration; only ``load_type`` is inspected here.
            cluster_nodes: nodes whose S3 gate wallet public keys are collected.
            nodes_under_load: the first node's storage service provides the RPC endpoint used as peer.
            k6_dir: k6 directory on the loader, used to locate ``scenarios/files/policy.json``.
        """
        if load_params.load_type != LoadType.S3:
            return

        with reporter.step("Init s3 client on loaders"):
            storage_node = nodes_under_load[0].service(StorageNode)
            s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
            grpc_peer = storage_node.get_rpc_endpoint()

            parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)

    @staticmethod
    def _parse_issued_credential(issue_secret_output: str, field_name: str) -> str:
        """
        Extract one credential value from the output of ``authmate secret issue``.

        Args:
            issue_secret_output: stdout of the issue-secret command.
            field_name: key to look for, e.g. ``access_key_id``.

        Returns:
            The run of word characters captured for the key (may be empty, as the pattern allows it).

        Raises:
            RuntimeError: the key is not present in the expected ``<key>...: <value>`` form.
        """
        match = re.search(rf"{field_name}.*:\s.(?P<value>\w*)", issue_secret_output)
        if match is None:
            # The output is deliberately left out of the message: it may carry secret material.
            raise RuntimeError(f"Cannot find '{field_name}' in the output of authmate secret issue")
        return match.group("value")

    def _prepare_loader(
        self,
        loader: Loader,
        load_params: LoadParams,
        grpc_peer: str,
        s3_public_keys: list[str],
        k6_dir: str,
    ):
        """
        Issue S3 credentials with authmate on the loader and feed them to ``aws configure``.

        Raises:
            RuntimeError: the credentials cannot be parsed from the authmate output.
        """
        with reporter.step(f"Init s3 client on {loader.ip}"):
            shell = loader.get_shell()
            frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
            issue_secret_output = frostfs_authmate_exec.secret.issue(
                wallet=self.loaders_wallet.path,
                peer=grpc_peer,
                gate_public_key=s3_public_keys,
                container_placement_policy=load_params.preset.container_placement_policy,
                container_policy=f"{k6_dir}/scenarios/files/policy.json",
                wallet_password=self.loaders_wallet.password,
            ).stdout
            aws_access_key_id = self._parse_issued_credential(issue_secret_output, "access_key_id")
            aws_secret_access_key = self._parse_issued_credential(issue_secret_output, "secret_access_key")

            # The two trailing catch-all prompts are answered with an empty line.
            configure_input = [
                InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
                InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
                InteractiveInput(prompt_pattern=r".*", input=""),
                InteractiveInput(prompt_pattern=r".*", input=""),
            ]
            shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))

    @reporter.step("Init k6 instances")
    def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
        """
        Create the K6 instances according to ``load_params.k6_process_allocation_strategy``.

        PER_LOAD_NODE creates one process per loader, each receiving the full endpoint list;
        PER_ENDPOINT creates one process per endpoint, each receiving a single-endpoint list.
        Loaders are taken from a cycle over ``self.loaders``.

        Args:
            load_params: original load parameters; they are split between the processes.
            endpoints: endpoints to put under load.
            k6_dir: k6 directory passed to every K6 instance.
        """
        self.k6_instances = []
        cycled_loaders = itertools.cycle(self.loaders)

        k6_distribution_count = {
            K6ProcessAllocationStrategy.PER_LOAD_NODE: len(self.loaders),
            K6ProcessAllocationStrategy.PER_ENDPOINT: len(endpoints),
        }
        endpoints_generators = {
            K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
            K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle([[endpoint] for endpoint in endpoints]),
        }
        strategy = load_params.k6_process_allocation_strategy
        k6_processes_count = k6_distribution_count[strategy]
        endpoints_gen = endpoints_generators[strategy]

        distributed_load_params_list = self._get_distributed_load_params_list(load_params, k6_processes_count)

        futures = parallel(
            self._init_k6_instance,
            distributed_load_params_list,
            loader=cycled_loaders,
            endpoints=endpoints_gen,
            k6_dir=k6_dir,
        )
        self.k6_instances = [future.result() for future in futures]

    def _init_k6_instance(self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str):
        """Create the working directory on the loader, hand it to the SSH user and build a K6 instance."""
        shell = loader.get_shell()
        with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
            with reporter.step("Make working directory"):
                shell.exec(f"sudo mkdir -p {load_params_for_loader.working_dir}")
                shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {load_params_for_loader.working_dir}")

            return K6(
                load_params_for_loader,
                endpoints,
                k6_dir,
                shell,
                loader,
                self.loaders_wallet,
            )

    def _get_distributed_load_params_list(
        self, original_load_params: LoadParams, workers_count: int
    ) -> list[LoadParams]:
        """
        Build one deep copy of the load parameters per worker and split the distributed fields between them.

        A field is split when its metadata is non-empty, lists the current scenario in
        ``applicable_scenarios``, has a truthy ``distributed`` entry and its value is not None.
        The value is divided by ``BACKGROUND_LOAD_VUS_COUNT_DIVISOR`` (rounded up) before being spread.

        Args:
            original_load_params: parameters to copy; the original object is not modified.
            workers_count: number of k6 processes the load is split across.

        Returns:
            List of ``workers_count`` load parameter objects.
        """
        divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR)
        distributed_load_params: list[LoadParams] = []

        for i in range(workers_count):
            load_params = copy.deepcopy(original_load_params)
            # Append #i here in case multiple k6 processes go to the same load node
            load_params.set_id(f"{load_params.load_id}_{i}")
            distributed_load_params.append(load_params)

        for field in fields(original_load_params):
            metadata = field.metadata
            if not metadata:
                continue
            if original_load_params.scenario not in metadata["applicable_scenarios"]:
                continue
            if not metadata["distributed"]:
                continue

            original_value = getattr(original_load_params, field.name)
            if original_value is None:
                continue

            distribution = self._get_distribution(math.ceil(original_value / divisor), workers_count)
            for worker_params, share in zip(distributed_load_params, distribution):
                setattr(worker_params, field.name, share)

        return distributed_load_params

    def _get_distribution(self, clients_count: int, workers_count: int) -> list[int]:
        """
        Distribute X clients over Y workers as evenly as possible.
        For example, 150 readers (clients) spread over 4 load nodes (workers)
        give [38, 38, 37, 37].

        Args:
            clients_count: amount of things that need to be distributed.
            workers_count: amount of workers.

        Returns:
            list with the share of every worker.

        Raises:
            Exception: workers_count is less than 1.
        """
        if workers_count < 1:
            raise Exception("Workers cannot be less then 1")

        # Guaranteed share of every worker and the leftover handed out one by one to the first workers
        clients_per_worker, remainder = divmod(clients_count, workers_count)

        return [clients_per_worker + 1 if i < remainder else clients_per_worker for i in range(workers_count)]

    def start(self):
        """
        Start every k6 instance, then sleep for the setup timeout plus 5 seconds.

        The timeout is taken from the first instance's load parameters, so an empty
        instance list raises IndexError.
        """
        load_params = self.k6_instances[0].load_params

        parallel([k6.start for k6 in self.k6_instances])

        wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
        with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
            time.sleep(wait_after_start_time)

    def stop(self):
        """Stop the k6 instances one after another."""
        for k6_instance in self.k6_instances:
            k6_instance.stop()

    def get_results(self) -> dict:
        """
        Collect the results of every k6 instance.

        Returns:
            Results keyed by loader IP (PER_LOAD_NODE) or by the network location of the
            instance's first endpoint (PER_ENDPOINT); the raw endpoint string is used when
            ``urlparse`` yields an empty netloc.

        Raises:
            RuntimeError: an instance has no allocation strategy set.
        """
        results = {}
        for k6_instance in self.k6_instances:
            strategy = k6_instance.load_params.k6_process_allocation_strategy
            if strategy is None:
                raise RuntimeError("k6_process_allocation_strategy should not be none")

            result = k6_instance.get_results()
            first_endpoint = k6_instance.endpoints[0]
            endpoint = urlparse(first_endpoint).netloc or first_endpoint
            keys_map = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.loader.ip,
                K6ProcessAllocationStrategy.PER_ENDPOINT: endpoint,
            }
            results[keys_map[strategy]] = result

        return results
|
|
|
|
|
|
class LocalRunner(RunnerBase):
    """Runner that executes k6 directly on the cluster nodes under load."""

    # One NodeLoader per node under load, created in __init__.
    loaders: list[Loader]
    # Used to stop services before the load starts and to start them again afterwards.
    cluster_state_controller: ClusterStateController
    # Registers files that prepare_node() overwrites (limits.conf).
    file_keeper: FileKeeper
    # Assigned in prepare_node(); not available before the preparation step.
    wallet: WalletInfo

    def __init__(
        self,
        cluster_state_controller: ClusterStateController,
        file_keeper: FileKeeper,
        nodes_under_load: list[ClusterNode],
    ) -> None:
        """
        Args:
            cluster_state_controller: controller used to stop and start cluster services.
            file_keeper: keeper that limits.conf is registered with before it is rewritten.
            nodes_under_load: nodes k6 runs on; a NodeLoader is created for each of them.
        """
        self.cluster_state_controller = cluster_state_controller
        self.file_keeper = file_keeper
        self.loaders = [NodeLoader(node) for node in nodes_under_load]
        self.nodes_under_load = nodes_under_load

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step("Preparation steps")
    def prepare(
        self,
        load_params: LoadParams,
        cluster_nodes: list[ClusterNode],
        nodes_under_load: list[ClusterNode],
        k6_dir: str,
    ):
        """Run ``prepare_node`` for every node under load through ``parallel``; ``cluster_nodes`` is not used."""
        parallel(self.prepare_node, nodes_under_load, k6_dir, load_params)

    @retry(3, 5, expected_result=True)
    def allow_user_to_login_in_system(self, cluster_node: ClusterNode):
        """
        Give the storage user a bash login shell, lock /etc/passwd and verify the login with ``whoami``.

        Returns:
            True when ``whoami`` executed as the storage user exits with code 0. False when it
            fails or any step raises an ordinary exception; /etc/passwd is unlocked in that case.
        """
        shell = cluster_node.host.get_shell()

        succeeded = False
        try:
            shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
            self.lock_passwd_on_node(cluster_node)
            options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)])
            result = shell.exec("whoami", options)
            succeeded = bool(result) and not result.return_code
        except Exception:
            # A failed attempt is reported through the return value so that @retry can try again.
            # Unlike a `return` inside `finally`, this lets KeyboardInterrupt/SystemExit propagate.
            succeeded = False
        finally:
            if not succeeded:
                self.restore_passwd_on_node(cluster_node)

        return succeeded

    @reporter.step("Prepare node {cluster_node}")
    def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
        """
        Prepare one node for a local k6 run.

        Enables login for the storage user, rewrites limits.conf, downloads and unpacks k6
        into ``k6_dir`` and creates the empty-password wallet config; also sets ``self.wallet``.
        """
        shell = cluster_node.host.get_shell()

        with reporter.step("Allow storage user to login into system"):
            self.allow_user_to_login_in_system(cluster_node)

        with reporter.step("Update limits.conf"):
            limits_path = "/etc/security/limits.conf"
            # Registered with the file keeper before `tee` replaces the whole file.
            self.file_keeper.add(cluster_node.storage_node, limits_path)
            content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
            shell.exec(f"echo '{content}' | sudo tee {limits_path}")

        with reporter.step("Download K6"):
            shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
            shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
            shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
            shell.exec(f"sudo chmod -R 777 {k6_dir}")

        with reporter.step("Create empty_passwd"):
            self.wallet = WalletInfo(f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml")
            content = yaml.dump({"password": ""})
            shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
            shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")

    @reporter.step("Init k6 instances")
    def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
        """Create one K6 instance per loader; the ``endpoints`` argument is not used by this runner."""
        self.k6_instances = []
        futures = parallel(
            self._init_k6_instance,
            self.loaders,
            load_params,
            k6_dir,
        )
        self.k6_instances = [future.result() for future in futures]

    def _init_k6_instance(self, loader: Loader, load_params: LoadParams, k6_dir: str):
        """Create the working directory on the loader and build a K6 instance aimed at ``localhost:8080``."""
        shell = loader.get_shell()
        with reporter.step(f"Init K6 instance on {loader.ip}"):
            with reporter.step("Make working directory"):
                working_dir = load_params.working_dir
                home_dir = f"/home/{LOAD_NODE_SSH_USER}"
                shell.exec(f"sudo mkdir -p {working_dir}")
                # If we chmod /home/<user_name> folder we can no longer ssh to the node
                # !! IMPORTANT !!
                if working_dir and working_dir not in (home_dir, f"{home_dir}/"):
                    shell.exec(f"sudo chmod -R 777 {working_dir}")

            return K6(
                load_params,
                ["localhost:8080"],
                k6_dir,
                shell,
                loader,
                self.wallet,
            )

    def start(self):
        """
        Stop the S3Gate and StorageNode services, start every k6 instance, then sleep
        for the setup timeout plus 5 seconds.

        The timeout is taken from the first instance's load parameters, so an empty
        instance list raises IndexError.
        """
        load_params = self.k6_instances[0].load_params

        self.cluster_state_controller.stop_services_of_type(S3Gate)
        self.cluster_state_controller.stop_services_of_type(StorageNode)

        parallel([k6.start for k6 in self.k6_instances])

        wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
        with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
            time.sleep(wait_after_start_time)

    @reporter.step("Restore passwd on {cluster_node}")
    def restore_passwd_on_node(self, cluster_node: ClusterNode):
        """Remove the immutable attribute from /etc/passwd on the node."""
        shell = cluster_node.host.get_shell()
        shell.exec("sudo chattr -i /etc/passwd")

    @reporter.step("Lock passwd on {cluster_node}")
    def lock_passwd_on_node(self, cluster_node: ClusterNode):
        """Set the immutable attribute on /etc/passwd on the node."""
        shell = cluster_node.host.get_shell()
        shell.exec("sudo chattr +i /etc/passwd")

    def stop(self):
        """Stop the k6 instances one after another, then start all stopped services again."""
        for k6_instance in self.k6_instances:
            k6_instance.stop()

        self.cluster_state_controller.start_all_stopped_services()

    def get_results(self) -> dict:
        """
        Collect the results of every k6 instance, then unlock /etc/passwd on all nodes under load.

        Returns:
            Results keyed by loader IP.
        """
        results = {}
        for k6_instance in self.k6_instances:
            results[k6_instance.loader.ip] = k6_instance.get_results()

        parallel(self.restore_passwd_on_node, self.nodes_under_load)

        return results
|
|
|
|
|
|
class S3LocalRunner(LocalRunner):
    """Local runner variant that additionally installs and configures an S3 client on the nodes."""

    # Assigned in prepare_node() from the node's S3 gate.
    endpoints: list[str]
    # Assigned in prepare(); used to locate the preset helper script.
    k6_dir: str

    @reporter.step("Run preset on loaders")
    def preset(self):
        """Run the inherited preset, then resolve containers in the preset of every k6 instance."""
        super().preset()
        with reporter.step("Resolve containers in preset"):
            parallel(self._resolve_containers_in_preset, self.k6_instances)

    @reporter.step("Resolve containers in preset")
    def _resolve_containers_in_preset(self, k6_instance: K6):
        """Run ``resolve_containers_in_preset.py`` for the instance's first endpoint and its pregen file."""
        k6_instance.shell.exec(
            f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}"
        )

    @reporter.step("Init k6 instances")
    def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
        """Create one K6 instance per loader through ``parallel``."""
        self.k6_instances = []
        futures = parallel(
            self._init_k6_instance_,
            self.loaders,
            load_params,
            endpoints,
            k6_dir,
        )
        self.k6_instances = [future.result() for future in futures]

    def _init_k6_instance_(self, loader: Loader, load_params: LoadParams, endpoints: list[str], k6_dir: str):
        """Create the working directory on the loader and build a K6 instance for ``self.endpoints``."""
        shell = loader.get_shell()
        with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
            with reporter.step("Make working directory"):
                working_dir = load_params.working_dir
                home_dir = f"/home/{LOAD_NODE_SSH_USER}"
                shell.exec(f"sudo mkdir -p {working_dir}")
                # If we chmod /home/<user_name> folder we can no longer ssh to the node
                # !! IMPORTANT !!
                if working_dir and working_dir not in (home_dir, f"{home_dir}/"):
                    shell.exec(f"sudo chmod -R 777 {working_dir}")

            # NOTE(review): the `endpoints` argument only appears in the step title; K6 receives
            # self.endpoints set by prepare_node() - confirm that this is intended.
            return K6(
                load_params,
                self.endpoints,
                k6_dir,
                shell,
                loader,
                self.wallet,
            )

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step("Preparation steps")
    def prepare(
        self,
        load_params: LoadParams,
        cluster_nodes: list[ClusterNode],
        nodes_under_load: list[ClusterNode],
        k6_dir: str,
    ):
        """
        Remember ``k6_dir`` and run ``prepare_node`` for every node under load.

        Args:
            load_params: load configuration forwarded to ``prepare_node``.
            cluster_nodes: nodes whose S3 gate wallet public keys are collected.
            nodes_under_load: nodes to prepare; the first one's storage service provides the RPC endpoint.
            k6_dir: k6 directory on the nodes.
        """
        self.k6_dir = k6_dir
        with reporter.step("Init s3 client on loaders"):
            storage_node = nodes_under_load[0].service(StorageNode)
            s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
            grpc_peer = storage_node.get_rpc_endpoint()

            parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)

    @staticmethod
    def _parse_issued_credential(issue_secret_output: str, field_name: str) -> str:
        """
        Extract one credential value from the output of ``authmate secret issue``.

        Args:
            issue_secret_output: stdout of the issue-secret command.
            field_name: key to look for, e.g. ``access_key_id``.

        Returns:
            The run of word characters captured for the key (may be empty, as the pattern allows it).

        Raises:
            RuntimeError: the key is not present in the expected ``<key>...: <value>`` form.
        """
        match = re.search(rf"{field_name}.*:\s.(?P<value>\w*)", issue_secret_output)
        if match is None:
            # The output is deliberately left out of the message: it may carry secret material.
            raise RuntimeError(f"Cannot find '{field_name}' in the output of authmate secret issue")
        return match.group("value")

    @reporter.step("Prepare node {cluster_node}")
    def prepare_node(
        self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, s3_public_keys: list[str], grpc_peer: str
    ):
        """
        Run the inherited node preparation, then reinstall aws cli, install the requests
        module, issue S3 credentials with authmate and feed them to ``aws configure``.

        Also sets ``self.endpoints`` from the node's S3 gate.

        Raises:
            RuntimeError: the credentials cannot be parsed from the authmate output.
        """
        super().prepare_node(cluster_node, k6_dir, load_params)
        self.endpoints = cluster_node.s3_gate.get_all_endpoints()
        shell = cluster_node.host.get_shell()

        with reporter.step("Uninstall previous installation of aws cli"):
            shell.exec("sudo rm -rf /usr/local/aws-cli")
            shell.exec("sudo rm -rf /usr/local/bin/aws")
            shell.exec("sudo rm -rf /usr/local/bin/aws_completer")

        with reporter.step("Install aws cli"):
            shell.exec(f"sudo curl {load_params.awscli_url} -o {k6_dir}/awscliv2.zip")
            shell.exec(f"sudo unzip -q {k6_dir}/awscliv2.zip -d {k6_dir}")
            shell.exec(f"sudo {k6_dir}/aws/install")

        with reporter.step("Install requests python module"):
            shell.exec("sudo apt-get -y install python3-pip")
            shell.exec(f"sudo curl -so {k6_dir}/requests.tar.gz {load_params.requests_module_url}")
            shell.exec(f"sudo python3 -m pip install -I {k6_dir}/requests.tar.gz")

        with reporter.step(f"Init s3 client on {cluster_node.host_ip}"):
            frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
            issue_secret_output = frostfs_authmate_exec.secret.issue(
                wallet=self.wallet.path,
                peer=grpc_peer,
                gate_public_key=s3_public_keys,
                container_placement_policy=load_params.preset.container_placement_policy,
                container_policy=f"{k6_dir}/scenarios/files/policy.json",
                wallet_password=self.wallet.password,
            ).stdout
            aws_access_key_id = self._parse_issued_credential(issue_secret_output, "access_key_id")
            aws_secret_access_key = self._parse_issued_credential(issue_secret_output, "secret_access_key")

            # The two trailing catch-all prompts are answered with an empty line.
            configure_input = [
                InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
                InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
                InteractiveInput(prompt_pattern=r".*", input=""),
                InteractiveInput(prompt_pattern=r".*", input=""),
            ]
            shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
|