Compare commits
12 commits
master
...
grpc-metri
Author | SHA1 | Date | |
---|---|---|---|
2cffff3ffe | |||
d9f4e88f94 | |||
deb2f12bec | |||
f236c1b083 | |||
cc13a43bec | |||
a74d1bff4f | |||
547f6106ec | |||
c2aa41e5dc | |||
8e446ccb96 | |||
9c9fb7878a | |||
3a799afdcf | |||
b610e04a7b |
16 changed files with 307 additions and 10 deletions
|
@ -9,6 +9,8 @@ class FrostfsCliContainer(CliCommand):
|
|||
self,
|
||||
rpc_endpoint: str,
|
||||
wallet: Optional[str] = None,
|
||||
nns_zone: Optional[str] = None,
|
||||
nns_name: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
attributes: Optional[dict] = None,
|
||||
basic_acl: Optional[str] = None,
|
||||
|
@ -45,6 +47,8 @@ class FrostfsCliContainer(CliCommand):
|
|||
wallet: WIF (NEP-2) string or path to the wallet or binary key.
|
||||
xhdr: Dict with request X-Headers.
|
||||
timeout: Timeout for the operation (default 15s).
|
||||
nns_zone: Container nns zone attribute.
|
||||
nns_name: Container nns name attribute.
|
||||
|
||||
Returns:
|
||||
Command's result.
|
||||
|
|
|
@ -79,3 +79,154 @@ class FrostfsCliControl(CliCommand):
|
|||
"control drop-objects",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def add_rule(
|
||||
self,
|
||||
endpoint: str,
|
||||
chain_id: str,
|
||||
target_name: str,
|
||||
target_type: str,
|
||||
rule: Optional[list[str]] = None,
|
||||
path: Optional[str] = None,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""Drop objects from the node's local storage
|
||||
|
||||
Args:
|
||||
address: Address of wallet account
|
||||
chain-id: Assign ID to the parsed chain
|
||||
chain-id-hex: Flag to parse chain ID as hex
|
||||
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||
path: Path to encoded chain in JSON or binary format
|
||||
rule: Rule statement
|
||||
target-name: Resource name in APE resource name format
|
||||
target-type: Resource type(container/namespace)
|
||||
timeout: Timeout for an operation (default 15s)
|
||||
wallet: Path to the wallet or binary key
|
||||
|
||||
Returns:
|
||||
Command`s result.
|
||||
"""
|
||||
return self._execute(
|
||||
"control add-rule",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def get_rule(
|
||||
self,
|
||||
endpoint: str,
|
||||
chain_id: str,
|
||||
target_name: str,
|
||||
target_type: str,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""Drop objects from the node's local storage
|
||||
|
||||
Args:
|
||||
address string Address of wallet account
|
||||
chain-id string Chain id
|
||||
chain-id-hex Flag to parse chain ID as hex
|
||||
endpoint string Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||
target-name string Resource name in APE resource name format
|
||||
target-type string Resource type(container/namespace)
|
||||
timeout duration Timeout for an operation (default 15s)
|
||||
wallet string Path to the wallet or binary key
|
||||
|
||||
Returns:
|
||||
Command`s result.
|
||||
"""
|
||||
return self._execute(
|
||||
"control get-rule",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def list_rules(
|
||||
self,
|
||||
endpoint: str,
|
||||
target_name: str,
|
||||
target_type: str,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""Drop objects from the node's local storage
|
||||
|
||||
Args:
|
||||
address: Address of wallet account
|
||||
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||
target-name: Resource name in APE resource name format
|
||||
target-type: Resource type(container/namespace)
|
||||
timeout: Timeout for an operation (default 15s)
|
||||
wallet: Path to the wallet or binary key
|
||||
|
||||
Returns:
|
||||
Command`s result.
|
||||
"""
|
||||
return self._execute(
|
||||
"control list-rules",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def list_targets(
|
||||
self,
|
||||
endpoint: str,
|
||||
chain_name: str,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""Drop objects from the node's local storage
|
||||
|
||||
Args:
|
||||
address: Address of wallet account
|
||||
chain-name: Chain name(ingress|s3)
|
||||
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||
timeout: Timeout for an operation (default 15s)
|
||||
wallet: Path to the wallet or binary key
|
||||
|
||||
Returns:
|
||||
Command`s result.
|
||||
"""
|
||||
return self._execute(
|
||||
"control list-targets",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def remove_rule(
|
||||
self,
|
||||
endpoint: str,
|
||||
chain_id: str,
|
||||
target_name: str,
|
||||
target_type: str,
|
||||
all: Optional[bool] = None,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""Drop objects from the node's local storage
|
||||
|
||||
Args:
|
||||
address: Address of wallet account
|
||||
all: Remove all chains
|
||||
chain-id: Assign ID to the parsed chain
|
||||
chain-id-hex: Flag to parse chain ID as hex
|
||||
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||
target-name: Resource name in APE resource name format
|
||||
target-type: Resource type(container/namespace)
|
||||
timeout: Timeout for an operation (default 15s)
|
||||
wallet: Path to the wallet or binary key
|
||||
|
||||
Returns:
|
||||
Command`s result.
|
||||
"""
|
||||
return self._execute(
|
||||
"control remove-rule",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
|
|
@ -27,3 +27,27 @@ class FrostfsCliTree(CliCommand):
|
|||
"tree healthcheck",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def list(
|
||||
self,
|
||||
cid: str,
|
||||
rpc_endpoint: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""Get Tree List
|
||||
|
||||
Args:
|
||||
cid: Container ID.
|
||||
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
|
||||
wallet: WIF (NEP-2) string or path to the wallet or binary key.
|
||||
timeout: duration Timeout for the operation (default 15 s)
|
||||
|
||||
Returns:
|
||||
Command's result.
|
||||
|
||||
"""
|
||||
return self._execute(
|
||||
"tree list",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
|
|
@ -26,7 +26,7 @@ class S3CredentialsProvider(ABC):
|
|||
self.cluster = cluster
|
||||
|
||||
@abstractmethod
|
||||
def provide(self, user: User, cluster_node: ClusterNode, location_constraints: Optional[str] = None) -> S3Credentials:
|
||||
def provide(self, user: User, cluster_node: ClusterNode, location_constraints: Optional[str] = None, **kwargs) -> S3Credentials:
|
||||
raise NotImplementedError("Directly called abstract class?")
|
||||
|
||||
|
||||
|
@ -35,7 +35,7 @@ class GrpcCredentialsProvider(ABC):
|
|||
self.cluster = cluster
|
||||
|
||||
@abstractmethod
|
||||
def provide(self, user: User, cluster_node: ClusterNode) -> WalletInfo:
|
||||
def provide(self, user: User, cluster_node: ClusterNode, **kwargs) -> WalletInfo:
|
||||
raise NotImplementedError("Directly called abstract class?")
|
||||
|
||||
|
||||
|
|
|
@ -47,6 +47,14 @@ class BasicHealthcheck(Healthcheck):
|
|||
|
||||
self._perform(cluster_node, checks)
|
||||
|
||||
@wait_for_success(900, 30, title="Wait for tree healthcheck on {cluster_node}")
|
||||
def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None:
|
||||
checks = {
|
||||
self._tree_healthcheck: {},
|
||||
}
|
||||
|
||||
self._perform(cluster_node, checks)
|
||||
|
||||
@wait_for_success(120, 5, title="Wait for service healthcheck on {cluster_node}")
|
||||
def services_healthcheck(self, cluster_node: ClusterNode):
|
||||
svcs_to_check = cluster_node.services
|
||||
|
|
|
@ -19,3 +19,7 @@ class Healthcheck(ABC):
|
|||
@abstractmethod
|
||||
def services_healthcheck(self, cluster_node: ClusterNode):
|
||||
"""Perform service status check on target cluster node"""
|
||||
|
||||
@abstractmethod
|
||||
def tree_healthcheck(self, cluster_node: ClusterNode):
|
||||
"""Perform tree healthcheck on target cluster node"""
|
||||
|
|
|
@ -86,7 +86,7 @@ class SummarizedStats:
|
|||
target.latencies.by_node[node_key] = operation.latency
|
||||
target.throughput += operation.throughput
|
||||
target.errors.threshold = load_params.error_threshold
|
||||
target.total_bytes = operation.total_bytes
|
||||
target.total_bytes += operation.total_bytes
|
||||
if operation.failed_iterations:
|
||||
target.errors.by_node[node_key] = operation.failed_iterations
|
||||
|
||||
|
|
|
@ -233,6 +233,8 @@ class LoadParams:
|
|||
)
|
||||
# Percentage of filling of all data disks on all nodes
|
||||
fill_percent: Optional[float] = None
|
||||
# if specified, max payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved.
|
||||
max_total_size_gb: Optional[float] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "MAX_TOTAL_SIZE_GB")
|
||||
# if set, the payload is generated on the fly and is not read into memory fully.
|
||||
streaming: Optional[int] = metadata_field(all_load_scenarios, None, "STREAMING", False)
|
||||
# Output format
|
||||
|
|
|
@ -57,6 +57,8 @@ class LoadVerifier:
|
|||
invalid_objects = verify_metrics.read.failed_iterations
|
||||
total_left_objects = load_metrics.write.success_iterations - delete_success
|
||||
|
||||
if invalid_objects > 0:
|
||||
issues.append(f"There were {invalid_objects} verification fails (hash mismatch).")
|
||||
# Due to interruptions we may see total verified objects to be less than written on writers count
|
||||
if abs(total_left_objects - verified_objects) > writers:
|
||||
issues.append(
|
||||
|
|
|
@ -489,6 +489,16 @@ class AwsCliClient(S3ClientWrapper):
|
|||
response = self._to_json(output)
|
||||
return response.get("Policy")
|
||||
|
||||
@reporter.step("Delete bucket policy")
|
||||
def delete_bucket_policy(self, bucket: str) -> dict:
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-bucket-policy --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
)
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
return response
|
||||
|
||||
@reporter.step("Put bucket policy")
|
||||
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
|
||||
# Leaving it as is was in test repo. Double dumps to escape resulting string
|
||||
|
@ -729,7 +739,10 @@ class AwsCliClient(S3ClientWrapper):
|
|||
f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
)
|
||||
self.local_shell.exec(cmd)
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Put object lock configuration")
|
||||
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
|
||||
|
|
|
@ -135,7 +135,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
|
||||
s3_bucket = self.boto3_client.create_bucket(**params)
|
||||
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
sleep(S3_SYNC_WAIT_TIME)
|
||||
return bucket
|
||||
|
||||
@reporter.step("List buckets S3")
|
||||
|
@ -156,7 +156,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
def delete_bucket(self, bucket: str) -> None:
|
||||
response = self.boto3_client.delete_bucket(Bucket=bucket)
|
||||
log_command_execution("S3 Delete bucket result", response)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
sleep(S3_SYNC_WAIT_TIME)
|
||||
|
||||
@reporter.step("Head bucket S3")
|
||||
@report_error
|
||||
|
@ -246,6 +246,13 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
log_command_execution("S3 get_bucket_policy result", response)
|
||||
return response.get("Policy")
|
||||
|
||||
@reporter.step("Delete bucket policy")
|
||||
@report_error
|
||||
def delete_bucket_policy(self, bucket: str) -> str:
|
||||
response = self.boto3_client.delete_bucket_policy(Bucket=bucket)
|
||||
log_command_execution("S3 delete_bucket_policy result", response)
|
||||
return response
|
||||
|
||||
@reporter.step("Put bucket policy")
|
||||
@report_error
|
||||
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
|
||||
|
@ -372,7 +379,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
}
|
||||
response = self.boto3_client.delete_object(**params)
|
||||
log_command_execution("S3 Delete object result", response)
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
sleep(S3_SYNC_WAIT_TIME)
|
||||
return response
|
||||
|
||||
@reporter.step("Delete objects S3")
|
||||
|
@ -383,7 +390,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
assert (
|
||||
"Errors" not in response
|
||||
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
|
||||
sleep(S3_SYNC_WAIT_TIME * 10)
|
||||
sleep(S3_SYNC_WAIT_TIME)
|
||||
return response
|
||||
|
||||
@reporter.step("Delete object versions S3")
|
||||
|
@ -571,6 +578,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
)
|
||||
log_command_execution("S3 Complete multipart upload", response)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Put object retention")
|
||||
@report_error
|
||||
def put_object_retention(
|
||||
|
|
|
@ -152,6 +152,10 @@ class S3ClientWrapper(HumanReadableABC):
|
|||
def get_bucket_policy(self, bucket: str) -> str:
|
||||
"""Returns the policy of a specified bucket."""
|
||||
|
||||
@abstractmethod
|
||||
def delete_bucket_policy(self, bucket: str) -> str:
|
||||
"""Deletes the policy of a specified bucket."""
|
||||
|
||||
@abstractmethod
|
||||
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
|
||||
"""Applies S3 bucket policy to an S3 bucket."""
|
||||
|
|
|
@ -13,6 +13,7 @@ from frostfs_testlib.resources.common import ASSETS_DIR
|
|||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing import wait_for_success
|
||||
from frostfs_testlib.utils import json_utils
|
||||
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
|
||||
|
||||
|
@ -695,6 +696,7 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
|
|||
}
|
||||
|
||||
|
||||
@wait_for_success()
|
||||
@reporter.step("Search object nodes")
|
||||
def get_object_nodes(
|
||||
cluster: Cluster,
|
||||
|
|
35
src/frostfs_testlib/steps/cli/tree.py
Normal file
35
src/frostfs_testlib/steps/cli/tree.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.cli import FrostfsCli
|
||||
from frostfs_testlib.plugins import load_plugin
|
||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
||||
|
||||
|
||||
@reporter.step("Get Tree List")
|
||||
def get_tree_list(
|
||||
wallet: WalletInfo,
|
||||
cid: str,
|
||||
shell: Shell,
|
||||
endpoint: str,
|
||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||
) -> None:
|
||||
"""
|
||||
A wrapper for `frostfs-cli tree list` call.
|
||||
Args:
|
||||
wallet (WalletInfo): path to a wallet on whose behalf we delete the container
|
||||
cid (str): ID of the container to delete
|
||||
shell: executor for cli command
|
||||
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
|
||||
timeout: Timeout for the operation.
|
||||
This function doesn't return anything.
|
||||
"""
|
||||
|
||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
|
||||
cli.tree.list(cid=cid, rpc_endpoint=endpoint, timeout=timeout)
|
|
@ -14,6 +14,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, Inner
|
|||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
|
||||
|
||||
class ClusterNode:
|
||||
|
@ -24,11 +25,13 @@ class ClusterNode:
|
|||
class_registry: ServiceRegistry
|
||||
id: int
|
||||
host: Host
|
||||
metrics: Metrics
|
||||
|
||||
def __init__(self, host: Host, id: int) -> None:
|
||||
self.host = host
|
||||
self.id = id
|
||||
self.class_registry = get_service_registry()
|
||||
self.metrics = Metrics(host=self.host, metrics_endpoint=self.storage_node.get_metrics_endpoint())
|
||||
|
||||
@property
|
||||
def host_ip(self):
|
||||
|
|
36
src/frostfs_testlib/storage/dataclasses/metrics.py
Normal file
36
src/frostfs_testlib/storage/dataclasses/metrics.py
Normal file
|
@ -0,0 +1,36 @@
|
|||
from frostfs_testlib.hosting import Host
|
||||
from frostfs_testlib.shell.interfaces import CommandResult
|
||||
|
||||
|
||||
class Metrics:
|
||||
def __init__(self, host: Host, metrics_endpoint: str) -> None:
|
||||
self.storage = StorageMetrics(host, metrics_endpoint)
|
||||
|
||||
|
||||
|
||||
class StorageMetrics:
|
||||
"""
|
||||
Class represents storage metrics in a cluster
|
||||
"""
|
||||
def __init__(self, host: Host, metrics_endpoint: str) -> None:
|
||||
self.host = host
|
||||
self.metrics_endpoint = metrics_endpoint
|
||||
|
||||
def get_metric_container(self, metric: str, cid: str) -> CommandResult:
|
||||
shell = self.host.get_shell()
|
||||
result = shell.exec(f"curl -s {self.metrics_endpoint} | grep {metric} |grep {cid}")
|
||||
return result
|
||||
|
||||
def get_metrics_search_by_greps(self, **greps) -> CommandResult:
|
||||
"""
|
||||
Get a metrics, search by: cid, metric_type, shard_id etc.
|
||||
Args:
|
||||
greps: dict of grep-command-name and value
|
||||
for example get_metrics_search_by_greps(command='container_objects_total', cid='123456')
|
||||
Return:
|
||||
result of metrics
|
||||
"""
|
||||
shell = self.host.get_shell()
|
||||
additional_greps = " |grep ".join([grep_command for grep_command in greps.values()])
|
||||
result = shell.exec(f"curl -s {self.metrics_endpoint} | grep {additional_greps}")
|
||||
return result
|
Loading…
Reference in a new issue