Merge branch 'master' into feature--10059-MFA

This commit is contained in:
Roman Chernykh 2024-10-30 07:03:23 +00:00
commit 0204d93986
33 changed files with 917 additions and 188 deletions

View file

@ -27,8 +27,8 @@ dependencies = [
"testrail-api>=1.12.0", "testrail-api>=1.12.0",
"pytest==7.1.2", "pytest==7.1.2",
"tenacity==8.0.1", "tenacity==8.0.1",
"boto3==1.16.33", "boto3==1.35.30",
"boto3-stubs[essential]==1.16.33", "boto3-stubs[essential]==1.35.30",
] ]
requires-python = ">=3.10" requires-python = ">=3.10"

View file

@ -8,8 +8,8 @@ docstring_parser==0.15
testrail-api==1.12.0 testrail-api==1.12.0
tenacity==8.0.1 tenacity==8.0.1
pytest==7.1.2 pytest==7.1.2
boto3==1.16.33 boto3==1.35.30
boto3-stubs[essential]==1.16.33 boto3-stubs[essential]==1.35.30
# Dev dependencies # Dev dependencies
black==22.8.0 black==22.8.0

View file

@ -1,3 +1,4 @@
__version__ = "2.0.1" __version__ = "2.0.1"
from .fixtures import configure_testlib, hosting from .fixtures import configure_testlib, hosting, temp_directory
from .hooks import pytest_collection_modifyitems

View file

@ -69,9 +69,7 @@ class FrostfsAdmMorph(CliCommand):
**{param: param_value for param, param_value in locals().items() if param not in ["self"]}, **{param: param_value for param, param_value in locals().items() if param not in ["self"]},
) )
def set_config( def set_config(self, set_key_value: str, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None) -> CommandResult:
self, set_key_value: str, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None
) -> CommandResult:
"""Add/update global config value in the FrostFS network. """Add/update global config value in the FrostFS network.
Args: Args:
@ -125,7 +123,7 @@ class FrostfsAdmMorph(CliCommand):
) )
def force_new_epoch( def force_new_epoch(
self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None, delta: Optional[int] = None
) -> CommandResult: ) -> CommandResult:
"""Create new FrostFS epoch event in the side chain. """Create new FrostFS epoch event in the side chain.
@ -344,9 +342,131 @@ class FrostfsAdmMorph(CliCommand):
return self._execute( return self._execute(
f"morph remove-nodes {' '.join(node_netmap_keys)}", f"morph remove-nodes {' '.join(node_netmap_keys)}",
**{ **{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]},
param: param_value
for param, param_value in locals().items()
if param not in ["self", "node_netmap_keys"]
},
) )
def add_rule(
    self,
    endpoint: str,
    chain_id: str,
    target_name: str,
    target_type: str,
    rule: Optional[list[str]] = None,
    path: Optional[str] = None,
    chain_id_hex: Optional[bool] = None,
    wallet: Optional[str] = None,
    address: Optional[str] = None,
    timeout: Optional[str] = None,
) -> CommandResult:
    """Add an APE rule chain for the given target (`control add-rule`).

    Note: the previous docstring summary ("Drop objects from the node's
    local storage") was copy-pasted from an unrelated command.

    Args:
        address: Address of wallet account.
        chain_id: Assign ID to the parsed chain.
        chain_id_hex: Flag to parse chain ID as hex.
        endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>').
        path: Path to encoded chain in JSON or binary format.
        rule: Rule statement.
        target_name: Resource name in APE resource name format.
        target_type: Resource type (container/namespace).
        timeout: Timeout for an operation (default 15s).
        wallet: Path to the wallet or binary key.

    Returns:
        Command's result.
    """
    return self._execute(
        "control add-rule",
        # Forward every argument except `self` to the CLI flag builder.
        **{param: value for param, value in locals().items() if param not in ["self"]},
    )
def get_rule(
    self,
    endpoint: str,
    chain_id: str,
    target_name: str,
    target_type: str,
    chain_id_hex: Optional[bool] = None,
    wallet: Optional[str] = None,
    address: Optional[str] = None,
    timeout: Optional[str] = None,
) -> CommandResult:
    """Get an APE rule chain for the given target (`control get-rule`).

    Note: the previous docstring summary ("Drop objects from the node's
    local storage") was copy-pasted from an unrelated command.

    Args:
        address: Address of wallet account.
        chain_id: Chain id.
        chain_id_hex: Flag to parse chain ID as hex.
        endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>').
        target_name: Resource name in APE resource name format.
        target_type: Resource type (container/namespace).
        timeout: Timeout for an operation (default 15s).
        wallet: Path to the wallet or binary key.

    Returns:
        Command's result.
    """
    return self._execute(
        "control get-rule",
        # Forward every argument except `self` to the CLI flag builder.
        **{param: value for param, value in locals().items() if param not in ["self"]},
    )
def list_rules(
    self,
    target_type: str,
    target_name: Optional[str] = None,
    rpc_endpoint: Optional[str] = None,
    wallet: Optional[str] = None,
    address: Optional[str] = None,
    timeout: Optional[str] = None,
) -> CommandResult:
    """List APE rule chains for the given target (`morph ape list-rule-chains`).

    Note: the previous docstring summary ("Drop objects from the node's
    local storage") was copy-pasted from an unrelated command, and it
    documented a non-existent `endpoint` parameter instead of `rpc_endpoint`.

    Args:
        address: Address of wallet account.
        rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
        target_name: Resource name in APE resource name format.
        target_type: Resource type (container/namespace).
        timeout: Timeout for an operation (default 15s).
        wallet: Path to the wallet or binary key.

    Returns:
        Command's result.
    """
    return self._execute(
        "morph ape list-rule-chains",
        # Forward every argument except `self` to the CLI flag builder.
        **{param: value for param, value in locals().items() if param not in ["self"]},
    )
def remove_rule(
    self,
    endpoint: str,
    chain_id: str,
    target_name: str,
    target_type: str,
    all: Optional[bool] = None,  # noqa: A002 -- shadows builtin; kept for CLI flag/caller compatibility
    chain_id_hex: Optional[bool] = None,
    wallet: Optional[str] = None,
    address: Optional[str] = None,
    timeout: Optional[str] = None,
) -> CommandResult:
    """Remove an APE rule chain for the given target (`control remove-rule`).

    Note: the previous docstring summary ("Drop objects from the node's
    local storage") was copy-pasted from an unrelated command.

    Args:
        address: Address of wallet account.
        all: Remove all chains.
        chain_id: Assign ID to the parsed chain.
        chain_id_hex: Flag to parse chain ID as hex.
        endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>').
        target_name: Resource name in APE resource name format.
        target_type: Resource type (container/namespace).
        timeout: Timeout for an operation (default 15s).
        wallet: Path to the wallet or binary key.

    Returns:
        Command's result.
    """
    return self._execute(
        "control remove-rule",
        # Forward every argument except `self` to the CLI flag builder.
        **{param: value for param, value in locals().items() if param not in ["self"]},
    )

View file

@ -16,6 +16,8 @@ class FrostfsCliContainer(CliCommand):
basic_acl: Optional[str] = None, basic_acl: Optional[str] = None,
await_mode: bool = False, await_mode: bool = False,
disable_timestamp: bool = False, disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None, name: Optional[str] = None,
nonce: Optional[str] = None, nonce: Optional[str] = None,
policy: Optional[str] = None, policy: Optional[str] = None,
@ -37,6 +39,8 @@ class FrostfsCliContainer(CliCommand):
basic_acl: Hex encoded basic ACL value or keywords like 'public-read-write', basic_acl: Hex encoded basic ACL value or keywords like 'public-read-write',
'private', 'eacl-public-read' (default "private"). 'private', 'eacl-public-read' (default "private").
disable_timestamp: Disable timestamp container attribute. disable_timestamp: Disable timestamp container attribute.
force: Skip placement validity check.
trace: Generate trace ID and print it.
name: Container name attribute. name: Container name attribute.
nonce: UUIDv4 nonce value for container. nonce: UUIDv4 nonce value for container.
policy: QL-encoded or JSON-encoded placement policy or path to file with it. policy: QL-encoded or JSON-encoded placement policy or path to file with it.
@ -69,6 +73,7 @@ class FrostfsCliContainer(CliCommand):
ttl: Optional[int] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
force: bool = False, force: bool = False,
trace: bool = False,
) -> CommandResult: ) -> CommandResult:
""" """
Delete an existing container. Delete an existing container.
@ -78,6 +83,7 @@ class FrostfsCliContainer(CliCommand):
address: Address of wallet account. address: Address of wallet account.
await_mode: Block execution until container is removed. await_mode: Block execution until container is removed.
cid: Container ID. cid: Container ID.
trace: Generate trace ID and print it.
force: Do not check whether container contains locks and remove immediately. force: Do not check whether container contains locks and remove immediately.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>'). rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Path to a JSON-encoded container session token. session: Path to a JSON-encoded container session token.
@ -104,6 +110,7 @@ class FrostfsCliContainer(CliCommand):
await_mode: bool = False, await_mode: bool = False,
to: Optional[str] = None, to: Optional[str] = None,
json_mode: bool = False, json_mode: bool = False,
trace: bool = False,
ttl: Optional[int] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -116,6 +123,7 @@ class FrostfsCliContainer(CliCommand):
await_mode: Block execution until container is removed. await_mode: Block execution until container is removed.
cid: Container ID. cid: Container ID.
json_mode: Print or dump container in JSON format. json_mode: Print or dump container in JSON format.
trace: Generate trace ID and print it.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>'). rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
to: Path to dump encoded container. to: Path to dump encoded container.
ttl: TTL value in request meta header (default 2). ttl: TTL value in request meta header (default 2).
@ -155,6 +163,8 @@ class FrostfsCliContainer(CliCommand):
cid: Container ID. cid: Container ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>'). rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
to: Path to dump encoded container. to: Path to dump encoded container.
json_mode: Print or dump container in JSON format.
trace: Generate trace ID and print it.
session: Path to a JSON-encoded container session token. session: Path to a JSON-encoded container session token.
ttl: TTL value in request meta header (default 2). ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key. wallet: WIF (NEP-2) string or path to the wallet or binary key.
@ -174,6 +184,7 @@ class FrostfsCliContainer(CliCommand):
def list( def list(
self, self,
rpc_endpoint: str, rpc_endpoint: str,
name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
@ -188,11 +199,13 @@ class FrostfsCliContainer(CliCommand):
Args: Args:
address: Address of wallet account. address: Address of wallet account.
name: List containers by the attribute name.
owner: Owner of containers (omit to use owner from private key). owner: Owner of containers (omit to use owner from private key).
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>'). rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2). ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key. wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers. xhdr: Dict with request X-Headers.
trace: Generate trace ID and print it.
timeout: Timeout for the operation (default 15s). timeout: Timeout for the operation (default 15s).
generate_key: Generate a new private key. generate_key: Generate a new private key.
@ -208,9 +221,11 @@ class FrostfsCliContainer(CliCommand):
self, self,
rpc_endpoint: str, rpc_endpoint: str,
cid: str, cid: str,
bearer: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
trace: bool = False,
ttl: Optional[int] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -221,10 +236,12 @@ class FrostfsCliContainer(CliCommand):
Args: Args:
address: Address of wallet account. address: Address of wallet account.
cid: Container ID. cid: Container ID.
bearer: File with signed JSON or binary encoded bearer token.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>'). rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2). ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key. wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers. xhdr: Dict with request X-Headers.
trace: Generate trace ID and print it.
timeout: Timeout for the operation (default 15s). timeout: Timeout for the operation (default 15s).
generate_key: Generate a new private key. generate_key: Generate a new private key.
@ -236,6 +253,7 @@ class FrostfsCliContainer(CliCommand):
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
# TODO Deprecated method with 0.42
def set_eacl( def set_eacl(
self, self,
rpc_endpoint: str, rpc_endpoint: str,
@ -281,6 +299,7 @@ class FrostfsCliContainer(CliCommand):
address: Optional[str] = None, address: Optional[str] = None,
ttl: Optional[int] = None, ttl: Optional[int] = None,
from_file: Optional[str] = None, from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True, short: Optional[bool] = True,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
@ -298,6 +317,7 @@ class FrostfsCliContainer(CliCommand):
from_file: string File path with encoded container from_file: string File path with encoded container
timeout: duration Timeout for the operation (default 15 s) timeout: duration Timeout for the operation (default 15 s)
short: shorten the output of node information. short: shorten the output of node information.
trace: Generate trace ID and print it.
xhdr: Dict with request X-Headers. xhdr: Dict with request X-Headers.
generate_key: Generate a new private key. generate_key: Generate a new private key.

View file

@ -370,11 +370,11 @@ class FrostfsCliObject(CliCommand):
self, self,
rpc_endpoint: str, rpc_endpoint: str,
cid: str, cid: str,
oid: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
bearer: Optional[str] = None, bearer: Optional[str] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = False, trace: bool = False,
root: bool = False, root: bool = False,
verify_presence_all: bool = False, verify_presence_all: bool = False,

View file

@ -40,7 +40,7 @@ class FrostfsCliShards(CliCommand):
self, self,
endpoint: str, endpoint: str,
mode: str, mode: str,
id: Optional[list[str]], id: Optional[list[str]] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
wallet_password: Optional[str] = None, wallet_password: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,

View file

@ -1,5 +1,4 @@
import re import re
from datetime import datetime
from typing import Optional from typing import Optional
from frostfs_testlib import reporter from frostfs_testlib import reporter
@ -10,6 +9,7 @@ from frostfs_testlib.shell import LocalShell
from frostfs_testlib.steps.cli.container import list_containers from frostfs_testlib.steps.cli.container import list_containers
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.utils import string_utils
class AuthmateS3CredentialsProvider(S3CredentialsProvider): class AuthmateS3CredentialsProvider(S3CredentialsProvider):
@ -22,7 +22,7 @@ class AuthmateS3CredentialsProvider(S3CredentialsProvider):
gate_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes] gate_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
# unique short bucket name # unique short bucket name
bucket = f"bucket-{hex(int(datetime.now().timestamp()*1000000))}" bucket = string_utils.unique_name("bucket-")
frostfs_authmate: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC) frostfs_authmate: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate.secret.issue( issue_secret_output = frostfs_authmate.secret.issue(

View file

@ -7,7 +7,7 @@ import yaml
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.hosting.hosting import Hosting from frostfs_testlib.hosting.hosting import Hosting
from frostfs_testlib.resources.common import HOSTING_CONFIG_FILE from frostfs_testlib.resources.common import ASSETS_DIR, HOSTING_CONFIG_FILE
from frostfs_testlib.storage import get_service_registry from frostfs_testlib.storage import get_service_registry
@ -24,6 +24,16 @@ def configure_testlib():
registry.register_service(svc.name, svc.load()) registry.register_service(svc.name, svc.load())
@pytest.fixture(scope="session")
def temp_directory(configure_testlib):
    """Session-scoped fixture that ensures the assets directory exists.

    Returns:
        Path to the assets directory (ASSETS_DIR).
    """
    with reporter.step("Prepare tmp directory"):
        full_path = ASSETS_DIR
        # makedirs(..., exist_ok=True) replaces the original
        # `if not os.path.exists(...): os.mkdir(...)` pair: it is race-free
        # when tests run in parallel sessions and also creates missing parents.
        os.makedirs(full_path, exist_ok=True)
    return full_path
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting: def hosting(configure_testlib) -> Hosting:
with open(HOSTING_CONFIG_FILE, "r") as file: with open(HOSTING_CONFIG_FILE, "r") as file:

View file

@ -0,0 +1,13 @@
import pytest
@pytest.hookimpl
def pytest_collection_modifyitems(items: list[pytest.Item]):
    """Grant the 'frostfs' marker to tests collected from frostfs paths.

    The nodeid (item.location[0]) is the full path of the test; tests that
    come from plugins or from the testlib itself are excluded.
    """
    excluded_tokens = ("plugin", "testlib")
    for test_item in items:
        path = test_item.location[0]
        if "frostfs" not in path:
            continue
        if any(token in path for token in excluded_tokens):
            continue
        test_item.add_marker("frostfs")

View file

@ -185,6 +185,12 @@ class DockerHost(Host):
def is_file_exist(self, file_path: str) -> None: def is_file_exist(self, file_path: str) -> None:
raise NotImplementedError("Not implemented for docker") raise NotImplementedError("Not implemented for docker")
def wipefs_storage_node_data(self, service_name: str) -> None:
    # Device-level wipe is not supported for docker-based hosts.
    raise NotImplementedError("Not implemented for docker")
def finish_wipefs(self, service_name: str) -> None:
    # Device-level wipe is not supported for docker-based hosts, so there
    # is nothing to finalize either.
    raise NotImplementedError("Not implemented for docker")
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None: def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
volume_path = self.get_data_directory(service_name) volume_path = self.get_data_directory(service_name)
@ -240,7 +246,7 @@ class DockerHost(Host):
until: Optional[datetime] = None, until: Optional[datetime] = None,
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None priority: Optional[str] = None,
) -> str: ) -> str:
client = self._get_docker_client() client = self._get_docker_client()
filtered_logs = "" filtered_logs = ""

View file

@ -178,6 +178,21 @@ class Host(ABC):
cache_only: To delete cache only. cache_only: To delete cache only.
""" """
@abstractmethod
def wipefs_storage_node_data(self, service_name: str) -> None:
    """Erases all data of the storage node with specified name.

    NOTE(review): presumably wipes the node's storage devices (the name
    suggests wipefs) — confirm against concrete Host implementations.

    Args:
        service_name: Name of storage node service.
    """
def finish_wipefs(self, service_name: str) -> None:
    """Finalize a wipefs operation for the storage node with specified name.

    NOTE(review): the original docstring duplicated
    wipefs_storage_node_data's ("Erases all data..."); the actual semantics
    look like post-wipe cleanup — confirm against concrete implementations.
    Also note this method is NOT marked @abstractmethod (unlike its
    neighbors), so subclasses inherit a silent no-op — verify that is
    intentional.

    Args:
        service_name: Name of storage node service.
    """
@abstractmethod @abstractmethod
def delete_fstree(self, service_name: str) -> None: def delete_fstree(self, service_name: str) -> None:
""" """
@ -297,7 +312,7 @@ class Host(ABC):
until: Optional[datetime] = None, until: Optional[datetime] = None,
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None priority: Optional[str] = None,
) -> str: ) -> str:
"""Get logs from host filtered by regex. """Get logs from host filtered by regex.
@ -306,7 +321,7 @@ class Host(ABC):
since: If set, limits the time from which logs should be collected. Must be in UTC. since: If set, limits the time from which logs should be collected. Must be in UTC.
until: If set, limits the time until which logs should be collected. Must be in UTC. until: If set, limits the time until which logs should be collected. Must be in UTC.
unit: required unit. unit: required unit.
priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher. priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher.
For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0. For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0.
Returns: Returns:

View file

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.k6 import K6 from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import LoadParams from frostfs_testlib.load.load_config import LoadParams
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
@ -48,3 +49,7 @@ class ScenarioRunner(ABC):
@abstractmethod @abstractmethod
def get_results(self) -> dict: def get_results(self) -> dict:
"""Get results from K6 run""" """Get results from K6 run"""
@abstractmethod
def get_loaders(self) -> list[Loader]:
    """Return the list of loaders used by this scenario runner."""

View file

@ -30,6 +30,7 @@ from frostfs_testlib.utils.file_keeper import FileKeeper
class RunnerBase(ScenarioRunner): class RunnerBase(ScenarioRunner):
k6_instances: list[K6] k6_instances: list[K6]
loaders: list[Loader]
@reporter.step("Run preset on loaders") @reporter.step("Run preset on loaders")
def preset(self): def preset(self):
@ -49,9 +50,11 @@ class RunnerBase(ScenarioRunner):
def get_k6_instances(self): def get_k6_instances(self):
return self.k6_instances return self.k6_instances
def get_loaders(self) -> list[Loader]:
    # Accessor over the `loaders` attribute populated by concrete runners.
    return self.loaders
class DefaultRunner(RunnerBase): class DefaultRunner(RunnerBase):
loaders: list[Loader]
user: User user: User
def __init__( def __init__(
@ -228,7 +231,6 @@ class DefaultRunner(RunnerBase):
class LocalRunner(RunnerBase): class LocalRunner(RunnerBase):
loaders: list[Loader]
cluster_state_controller: ClusterStateController cluster_state_controller: ClusterStateController
file_keeper: FileKeeper file_keeper: FileKeeper
user: User user: User

View file

@ -29,3 +29,4 @@ S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied" RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied" RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound" NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"

View file

@ -754,6 +754,36 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output) response = self._to_json(output)
return response.get("ObjectLockConfiguration") return response.get("ObjectLockConfiguration")
@reporter.step("Put bucket lifecycle configuration")
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
    """Apply a lifecycle configuration to the bucket via `aws s3api`.

    Args:
        bucket: Target bucket name.
        lifecycle_configuration: Parsed configuration dict.
            NOTE(review): unused in this implementation — the CLI reads the
            configuration from `dumped_configuration`; presumably kept for
            interface parity with the boto3 wrapper — confirm.
        dumped_configuration: Path to a file with the encoded configuration,
            passed to the CLI as a file:// reference.

    Returns:
        Parsed JSON response of the command.
    """
    cmd = (
        f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}"
    )
    output = self.local_shell.exec(cmd).stdout
    response = self._to_json(output)
    return response
@reporter.step("Get bucket lifecycle configuration")
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
    """Fetch the bucket's lifecycle configuration via `aws s3api`.

    Args:
        bucket: Target bucket name.

    Returns:
        Parsed JSON response of the command.
    """
    cmd = (
        f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    )
    raw_output = self.local_shell.exec(cmd).stdout
    return self._to_json(raw_output)
@reporter.step("Delete bucket lifecycle configuration")
def delete_bucket_lifecycle(self, bucket: str) -> dict:
    """Delete the bucket's lifecycle configuration via `aws s3api`.

    Args:
        bucket: Target bucket name.

    Returns:
        Parsed JSON response of the command.
    """
    cmd = (
        f"aws {self.common_flags} s3api delete-bucket-lifecycle --bucket {bucket} "
        f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
    )
    raw_output = self.local_shell.exec(cmd).stdout
    return self._to_json(raw_output)
@staticmethod @staticmethod
def _to_json(output: str) -> dict: def _to_json(output: str) -> dict:
json_output = {} json_output = {}

View file

@ -68,6 +68,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
self.access_key_id: str = access_key_id self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key self.secret_access_key: str = secret_access_key
self.s3gate_endpoint: str = "" self.s3gate_endpoint: str = ""
self.iam_endpoint: str = ""
self.boto3_iam_client: S3Client = None self.boto3_iam_client: S3Client = None
self.boto3_sts_client: S3Client = None self.boto3_sts_client: S3Client = None
self.set_endpoint(s3gate_endpoint) self.set_endpoint(s3gate_endpoint)
@ -91,11 +92,16 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("Set endpoint IAM to {iam_endpoint}") @reporter.step("Set endpoint IAM to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str): def set_iam_endpoint(self, iam_endpoint: str):
if self.iam_endpoint == iam_endpoint:
return
self.iam_endpoint = iam_endpoint
self.boto3_iam_client = self.session.client( self.boto3_iam_client = self.session.client(
service_name="iam", service_name="iam",
aws_access_key_id=self.access_key_id, aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key, aws_secret_access_key=self.secret_access_key,
endpoint_url=iam_endpoint, endpoint_url=self.iam_endpoint,
verify=False, verify=False,
) )
# since the STS does not have an enpoint, IAM is used # since the STS does not have an enpoint, IAM is used
@ -305,6 +311,27 @@ class Boto3ClientWrapper(S3ClientWrapper):
response = self.boto3_client.delete_bucket_cors(Bucket=bucket) response = self.boto3_client.delete_bucket_cors(Bucket=bucket)
log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_cors result", response, {"Bucket": bucket}) log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_cors result", response, {"Bucket": bucket})
@reporter.step("Put bucket lifecycle configuration")
@report_error
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
    """Apply a lifecycle configuration to the bucket through boto3.

    Args:
        bucket: Target bucket name.
        lifecycle_configuration: Configuration dict passed to boto3 as-is.
        dumped_configuration: Unused in this implementation; presumably kept
            for interface parity with AwsCliClient, which reads the
            configuration from a file — confirm.

    Returns:
        Raw boto3 response.
    """
    response = self.boto3_client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle_configuration)
    log_command_execution(self.s3gate_endpoint, "S3 put_bucket_lifecycle_configuration result", response, {"Bucket": bucket})
    return response
@reporter.step("Get bucket lifecycle configuration")
@report_error
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
    """Fetch the bucket's lifecycle configuration through boto3.

    Args:
        bucket: Target bucket name.

    Returns:
        Dict with only the "Rules" key of the boto3 response.
    """
    response = self.boto3_client.get_bucket_lifecycle_configuration(Bucket=bucket)
    log_command_execution(self.s3gate_endpoint, "S3 get_bucket_lifecycle_configuration result", response, {"Bucket": bucket})
    return {"Rules": response.get("Rules")}
@reporter.step("Delete bucket lifecycle configuration")
@report_error
def delete_bucket_lifecycle(self, bucket: str) -> dict:
    """Delete the bucket's lifecycle configuration through boto3.

    Args:
        bucket: Target bucket name.

    Returns:
        Raw boto3 response.
    """
    response = self.boto3_client.delete_bucket_lifecycle(Bucket=bucket)
    log_command_execution(self.s3gate_endpoint, "S3 delete_bucket_lifecycle result", response, {"Bucket": bucket})
    return response
# END OF BUCKET METHODS # # END OF BUCKET METHODS #
# OBJECT METHODS # # OBJECT METHODS #
@ -675,25 +702,36 @@ class Boto3ClientWrapper(S3ClientWrapper):
# Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.) # Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.)
@reporter.step("Adds the specified user to the specified group") @reporter.step("Adds the specified user to the specified group")
@report_error
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict: def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
response = self.boto3_iam_client.add_user_to_group(UserName=user_name, GroupName=group_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.add_user_to_group(**params)
log_command_execution(self.iam_endpoint, "IAM Add User to Group", response, params)
return response return response
@reporter.step("Attaches the specified managed policy to the specified IAM group") @reporter.step("Attaches the specified managed policy to the specified IAM group")
@report_error
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict: def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.attach_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Attach Group Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Attaches the specified managed policy to the specified user") @reporter.step("Attaches the specified managed policy to the specified user")
@report_error
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict: def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.attach_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Attach User Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user") @reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
@report_error
def iam_create_access_key(self, user_name: str) -> dict: def iam_create_access_key(self, user_name: str) -> dict:
response = self.boto3_iam_client.create_access_key(UserName=user_name) response = self.boto3_iam_client.create_access_key(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Create Access Key", response, {"UserName": user_name})
access_key_id = response["AccessKey"].get("AccessKeyId") access_key_id = response["AccessKey"].get("AccessKeyId")
secret_access_key = response["AccessKey"].get("SecretAccessKey") secret_access_key = response["AccessKey"].get("SecretAccessKey")
@ -703,138 +741,190 @@ class Boto3ClientWrapper(S3ClientWrapper):
return access_key_id, secret_access_key return access_key_id, secret_access_key
@reporter.step("Creates a new group") @reporter.step("Creates a new group")
@report_error
def iam_create_group(self, group_name: str) -> dict: def iam_create_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.create_group(GroupName=group_name) response = self.boto3_iam_client.create_group(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM Create Group", response, {"GroupName": group_name})
assert response.get("Group"), f"Expected Group in response:\n{response}" assert response.get("Group"), f"Expected Group in response:\n{response}"
assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}" assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response return response
@reporter.step("Creates a new managed policy for your AWS account") @reporter.step("Creates a new managed policy for your AWS account")
@report_error
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict: def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)) params = self._convert_to_s3_params(locals().items())
params["PolicyDocument"] = json.dumps(policy_document)
response = self.boto3_iam_client.create_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Create Policy", response, params)
assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}" assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
return response return response
@reporter.step("Creates a new IAM user for your AWS account") @reporter.step("Creates a new IAM user for your AWS account")
@report_error
def iam_create_user(self, user_name: str) -> dict: def iam_create_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.create_user(UserName=user_name) response = self.boto3_iam_client.create_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Create User", response, {"UserName": user_name})
assert response.get("User"), f"Expected User in response:\n{response}" assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response return response
@reporter.step("Deletes the access key pair associated with the specified IAM user") @reporter.step("Deletes the access key pair associated with the specified IAM user")
@report_error
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict: def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
response = self.boto3_iam_client.delete_access_key(AccessKeyId=access_key_id, UserName=user_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.delete_access_key(**params)
log_command_execution(self.iam_endpoint, "IAM Delete Access Key", response, params)
return response return response
@reporter.step("Deletes the specified IAM group") @reporter.step("Deletes the specified IAM group")
@report_error
def iam_delete_group(self, group_name: str) -> dict: def iam_delete_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.delete_group(GroupName=group_name) response = self.boto3_iam_client.delete_group(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM Delete Group", response, {"GroupName": group_name})
return response return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group") @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
@report_error
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict: def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.delete_group_policy(GroupName=group_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.delete_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Delete Group Policy", response, params)
return response return response
@reporter.step("Deletes the specified managed policy") @reporter.step("Deletes the specified managed policy")
@report_error
def iam_delete_policy(self, policy_arn: str) -> dict: def iam_delete_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn) response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM Delete Policy", response, {"PolicyArn": policy_arn})
return response return response
@reporter.step("Deletes the specified IAM user") @reporter.step("Deletes the specified IAM user")
@report_error
def iam_delete_user(self, user_name: str) -> dict: def iam_delete_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.delete_user(UserName=user_name) response = self.boto3_iam_client.delete_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Delete User", response, {"UserName": user_name})
return response return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user") @reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
@report_error
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict: def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.delete_user_policy(UserName=user_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.delete_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Delete User Policy", response, params)
return response return response
@reporter.step("Removes the specified managed policy from the specified IAM group") @reporter.step("Removes the specified managed policy from the specified IAM group")
@report_error
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict: def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.detach_group_policy(GroupName=group_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.detach_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Detach Group Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Removes the specified managed policy from the specified user") @reporter.step("Removes the specified managed policy from the specified user")
@report_error
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict: def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.detach_user_policy(UserName=user_name, PolicyArn=policy_arn) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.detach_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Detach User Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Returns a list of IAM users that are in the specified IAM group") @reporter.step("Returns a list of IAM users that are in the specified IAM group")
@report_error
def iam_get_group(self, group_name: str) -> dict: def iam_get_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.get_group(GroupName=group_name) response = self.boto3_iam_client.get_group(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM Get Group", response, {"GroupName": group_name})
assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}" assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group") @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
@report_error
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict: def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.get_group_policy(GroupName=group_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.get_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Get Group Policy", response, params)
return response return response
@reporter.step("Retrieves information about the specified managed policy") @reporter.step("Retrieves information about the specified managed policy")
@report_error
def iam_get_policy(self, policy_arn: str) -> dict: def iam_get_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn) response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM Get Policy", response, {"PolicyArn": policy_arn})
assert response.get("Policy"), f"Expected Policy in response:\n{response}" assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}" assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}"
return response return response
@reporter.step("Retrieves information about the specified version of the specified managed policy") @reporter.step("Retrieves information about the specified version of the specified managed policy")
@report_error
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict: def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
response = self.boto3_iam_client.get_policy_version(PolicyArn=policy_arn, VersionId=version_id) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.get_policy_version(**params)
log_command_execution(self.iam_endpoint, "IAM Get Policy Version", response, params)
assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}" assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}" assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"
return response return response
@reporter.step("Retrieves information about the specified IAM user") @reporter.step("Retrieves information about the specified IAM user")
@report_error
def iam_get_user(self, user_name: str) -> dict: def iam_get_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.get_user(UserName=user_name) response = self.boto3_iam_client.get_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM Get User", response, {"UserName": user_name})
assert response.get("User"), f"Expected User in response:\n{response}" assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}" assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user") @reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
@report_error
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict: def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.get_user_policy(UserName=user_name, PolicyName=policy_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.get_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Get User Policy", response, params)
assert response.get("UserName"), f"Expected UserName in response:\n{response}" assert response.get("UserName"), f"Expected UserName in response:\n{response}"
return response return response
@reporter.step("Returns information about the access key IDs associated with the specified IAM user") @reporter.step("Returns information about the access key IDs associated with the specified IAM user")
@report_error
def iam_list_access_keys(self, user_name: str) -> dict: def iam_list_access_keys(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_access_keys(UserName=user_name) response = self.boto3_iam_client.list_access_keys(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List Access Keys", response, {"UserName": user_name})
return response return response
@reporter.step("Lists all managed policies that are attached to the specified IAM group") @reporter.step("Lists all managed policies that are attached to the specified IAM group")
@report_error
def iam_list_attached_group_policies(self, group_name: str) -> dict: def iam_list_attached_group_policies(self, group_name: str) -> dict:
response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name) response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM List Attached Group Policies", response, {"GroupName": group_name})
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}" assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response return response
@reporter.step("Lists all managed policies that are attached to the specified IAM user") @reporter.step("Lists all managed policies that are attached to the specified IAM user")
@report_error
def iam_list_attached_user_policies(self, user_name: str) -> dict: def iam_list_attached_user_policies(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name) response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List Attached User Policies", response, {"UserName": user_name})
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}" assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response return response
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to") @reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
@report_error
def iam_list_entities_for_policy(self, policy_arn: str) -> dict: def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn) response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM List Entities For Policy", response, {"PolicyArn": policy_arn})
assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}" assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}"
assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}" assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}"
@ -842,100 +932,127 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response return response
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group") @reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
@report_error
def iam_list_group_policies(self, group_name: str) -> dict: def iam_list_group_policies(self, group_name: str) -> dict:
response = self.boto3_iam_client.list_group_policies(GroupName=group_name) response = self.boto3_iam_client.list_group_policies(GroupName=group_name)
log_command_execution(self.iam_endpoint, "IAM List Group Policies", response, {"GroupName": group_name})
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}" assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response return response
@reporter.step("Lists the IAM groups") @reporter.step("Lists the IAM groups")
@report_error
def iam_list_groups(self) -> dict: def iam_list_groups(self) -> dict:
response = self.boto3_iam_client.list_groups() response = self.boto3_iam_client.list_groups()
log_command_execution(self.iam_endpoint, "IAM List Groups", response)
assert response.get("Groups"), f"Expected Groups in response:\n{response}" assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response return response
@reporter.step("Lists the IAM groups that the specified IAM user belongs to") @reporter.step("Lists the IAM groups that the specified IAM user belongs to")
@report_error
def iam_list_groups_for_user(self, user_name: str) -> dict: def iam_list_groups_for_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_groups_for_user(UserName=user_name) response = self.boto3_iam_client.list_groups_for_user(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List Groups For User", response, {"UserName": user_name})
assert response.get("Groups"), f"Expected Groups in response:\n{response}" assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response return response
@reporter.step("Lists all the managed policies that are available in your AWS account") @reporter.step("Lists all the managed policies that are available in your AWS account")
@report_error
def iam_list_policies(self) -> dict: def iam_list_policies(self) -> dict:
response = self.boto3_iam_client.list_policies() response = self.boto3_iam_client.list_policies()
log_command_execution(self.iam_endpoint, "IAM List Policies", response)
assert response.get("Policies"), f"Expected Policies in response:\n{response}" assert response.get("Policies"), f"Expected Policies in response:\n{response}"
return response return response
@reporter.step("Lists information about the versions of the specified managed policy") @reporter.step("Lists information about the versions of the specified managed policy")
@report_error
def iam_list_policy_versions(self, policy_arn: str) -> dict: def iam_list_policy_versions(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn) response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn)
log_command_execution(self.iam_endpoint, "IAM List Policy Versions", response, {"PolicyArn": policy_arn})
assert response.get("Versions"), f"Expected Versions in response:\n{response}" assert response.get("Versions"), f"Expected Versions in response:\n{response}"
return response return response
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user") @reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
@report_error
def iam_list_user_policies(self, user_name: str) -> dict: def iam_list_user_policies(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_user_policies(UserName=user_name) response = self.boto3_iam_client.list_user_policies(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List User Policies", response, {"UserName": user_name})
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}" assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response return response
@reporter.step("Lists the IAM users") @reporter.step("Lists the IAM users")
@report_error
def iam_list_users(self) -> dict: def iam_list_users(self) -> dict:
response = self.boto3_iam_client.list_users() response = self.boto3_iam_client.list_users()
log_command_execution(self.iam_endpoint, "IAM List Users", response)
assert response.get("Users"), f"Expected Users in response:\n{response}" assert response.get("Users"), f"Expected Users in response:\n{response}"
return response return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group") @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
@report_error
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict: def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.put_group_policy( params = self._convert_to_s3_params(locals().items())
GroupName=group_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document) params["PolicyDocument"] = json.dumps(policy_document)
) response = self.boto3_iam_client.put_group_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Put Group Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user") @reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
@report_error
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict: def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.put_user_policy( params = self._convert_to_s3_params(locals().items())
UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document) params["PolicyDocument"] = json.dumps(policy_document)
) response = self.boto3_iam_client.put_user_policy(**params)
log_command_execution(self.iam_endpoint, "IAM Put User Policy", response, params)
sleep(S3_SYNC_WAIT_TIME * 10) sleep(S3_SYNC_WAIT_TIME * 10)
return response return response
@reporter.step("Removes the specified user from the specified group") @reporter.step("Removes the specified user from the specified group")
@report_error
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict: def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
response = self.boto3_iam_client.remove_user_from_group(GroupName=group_name, UserName=user_name) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.remove_user_from_group(**params)
log_command_execution(self.iam_endpoint, "IAM Remove User From Group", response, params)
return response return response
@reporter.step("Updates the name and/or the path of the specified IAM group") @reporter.step("Updates the name and/or the path of the specified IAM group")
@report_error
def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict: def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_group(GroupName=group_name, NewGroupName=new_name, NewPath="/") params = {"GroupName": group_name, "NewGroupName": new_name, "NewPath": "/"}
response = self.boto3_iam_client.update_group(**params)
log_command_execution(self.iam_endpoint, "IAM Update Group", response, params)
return response return response
@reporter.step("Updates the name and/or the path of the specified IAM user") @reporter.step("Updates the name and/or the path of the specified IAM user")
@report_error
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict: def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/") params = {"UserName": user_name, "NewUserName": new_name, "NewPath": "/"}
response = self.boto3_iam_client.update_user(**params)
log_command_execution(self.iam_endpoint, "IAM Update User", response, params)
return response return response
@reporter.step("Adds one or more tags to an IAM user") @reporter.step("Adds one or more tags to an IAM user")
@report_error
def iam_tag_user(self, user_name: str, tags: list) -> dict: def iam_tag_user(self, user_name: str, tags: list) -> dict:
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.tag_user(UserName=user_name, Tags=tags_json) params["Tags"] = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
response = self.boto3_iam_client.tag_user(**params)
log_command_execution(self.iam_endpoint, "IAM Tag User", response, params)
return response return response
@reporter.step("List tags of IAM user") @reporter.step("List tags of IAM user")
@report_error
def iam_list_user_tags(self, user_name: str) -> dict: def iam_list_user_tags(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_user_tags(UserName=user_name) response = self.boto3_iam_client.list_user_tags(UserName=user_name)
log_command_execution(self.iam_endpoint, "IAM List User Tags", response, {"UserName": user_name})
return response return response
@reporter.step("Removes the specified tags from the user") @reporter.step("Removes the specified tags from the user")
@report_error
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys) params = self._convert_to_s3_params(locals().items())
response = self.boto3_iam_client.untag_user(**params)
log_command_execution(self.iam_endpoint, "IAM Untag User", response, params)
return response return response
# MFA methods # MFA methods

View file

@ -58,6 +58,10 @@ class S3ClientWrapper(HumanReadableABC):
def set_endpoint(self, s3gate_endpoint: str): def set_endpoint(self, s3gate_endpoint: str):
"""Set endpoint""" """Set endpoint"""
@abstractmethod
def set_iam_endpoint(self, iam_endpoint: str):
"""Set iam endpoint"""
@abstractmethod @abstractmethod
def create_bucket( def create_bucket(
self, self,
@ -366,6 +370,18 @@ class S3ClientWrapper(HumanReadableABC):
def delete_object_tagging(self, bucket: str, key: str) -> None: def delete_object_tagging(self, bucket: str, key: str) -> None:
"""Removes the entire tag set from the specified object.""" """Removes the entire tag set from the specified object."""
@abstractmethod
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
"""Adds or updates bucket lifecycle configuration"""
@abstractmethod
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
"""Gets bucket lifecycle configuration"""
@abstractmethod
def delete_bucket_lifecycle(self, bucket: str) -> dict:
"""Deletes bucket lifecycle"""
@abstractmethod @abstractmethod
def get_object_attributes( def get_object_attributes(
self, self,

View file

@ -327,13 +327,6 @@ def _parse_cid(output: str) -> str:
return splitted[1] return splitted[1]
@reporter.step("Search container by name")
def search_container_by_name(name: str, node: ClusterNode):
resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver.resolve(node, name)
@reporter.step("Search for nodes with a container") @reporter.step("Search for nodes with a container")
def search_nodes_with_container( def search_nodes_with_container(
wallet: WalletInfo, wallet: WalletInfo,

View file

@ -69,7 +69,7 @@ def get_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
@reporter.step("Tick Epoch") @reporter.step("Tick Epoch")
def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None): def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None, delta: Optional[int] = None):
""" """
Tick epoch using frostfs-adm or NeoGo if frostfs-adm is not available (DevEnv) Tick epoch using frostfs-adm or NeoGo if frostfs-adm is not available (DevEnv)
Args: Args:
@ -88,12 +88,17 @@ def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
frostfs_adm_exec_path=FROSTFS_ADM_EXEC, frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH, config_file=FROSTFS_ADM_CONFIG_PATH,
) )
frostfs_adm.morph.force_new_epoch() frostfs_adm.morph.force_new_epoch(delta=delta)
return return
# Otherwise we tick epoch using transaction # Otherwise we tick epoch using transaction
cur_epoch = get_epoch(shell, cluster) cur_epoch = get_epoch(shell, cluster)
if delta:
next_epoch = cur_epoch + delta
else:
next_epoch = cur_epoch + 1
# Use first node by default # Use first node by default
ir_node = cluster.services(InnerRing)[0] ir_node = cluster.services(InnerRing)[0]
# In case if no local_wallet_path is provided, we use wallet_path # In case if no local_wallet_path is provided, we use wallet_path
@ -110,7 +115,7 @@ def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
wallet_password=ir_wallet_pass, wallet_password=ir_wallet_pass,
scripthash=get_contract_hash(morph_chain, "netmap.frostfs", shell=shell), scripthash=get_contract_hash(morph_chain, "netmap.frostfs", shell=shell),
method="newEpoch", method="newEpoch",
arguments=f"int:{cur_epoch + 1}", arguments=f"int:{next_epoch}",
multisig_hash=f"{ir_address}:Global", multisig_hash=f"{ir_address}:Global",
address=ir_address, address=ir_address,
rpc_endpoint=morph_endpoint, rpc_endpoint=morph_endpoint,

View file

@ -1,8 +1,8 @@
import re import re
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.testing.test_control import wait_for_success
@reporter.step("Check metrics result") @reporter.step("Check metrics result")
@ -19,7 +19,7 @@ def check_metrics_counter(
counter_act += get_metrics_value(cluster_node, parse_from_command, **metrics_greps) counter_act += get_metrics_value(cluster_node, parse_from_command, **metrics_greps)
assert eval( assert eval(
f"{counter_act} {operator} {counter_exp}" f"{counter_act} {operator} {counter_exp}"
), f"Expected: {counter_exp} {operator} Actual: {counter_act} in node: {cluster_node}" ), f"Expected: {counter_exp} {operator} Actual: {counter_act} in nodes: {cluster_nodes}"
@reporter.step("Get metrics value from node: {node}") @reporter.step("Get metrics value from node: {node}")

View file

@ -13,6 +13,7 @@ from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
from frostfs_testlib.storage.cluster import Cluster, StorageNode from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils import datetime_utils
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -111,10 +112,7 @@ def get_netmap_snapshot(node: StorageNode, shell: Shell) -> str:
storage_wallet_path = node.get_wallet_path() storage_wallet_path = node.get_wallet_path()
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_file=storage_wallet_config) cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_file=storage_wallet_config)
return cli.netmap.snapshot( return cli.netmap.snapshot(rpc_endpoint=node.get_rpc_endpoint(), wallet=storage_wallet_path).stdout
rpc_endpoint=node.get_rpc_endpoint(),
wallet=storage_wallet_path,
).stdout
@reporter.step("Get shard list for {node}") @reporter.step("Get shard list for {node}")
@ -202,12 +200,7 @@ def delete_node_data(node: StorageNode) -> None:
@reporter.step("Exclude node {node_to_exclude} from network map") @reporter.step("Exclude node {node_to_exclude} from network map")
def exclude_node_from_network_map( def exclude_node_from_network_map(node_to_exclude: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None:
node_to_exclude: StorageNode,
alive_node: StorageNode,
shell: Shell,
cluster: Cluster,
) -> None:
node_netmap_key = node_to_exclude.get_wallet_public_key() node_netmap_key = node_to_exclude.get_wallet_public_key()
storage_node_set_status(node_to_exclude, status="offline") storage_node_set_status(node_to_exclude, status="offline")
@ -221,12 +214,7 @@ def exclude_node_from_network_map(
@reporter.step("Include node {node_to_include} into network map") @reporter.step("Include node {node_to_include} into network map")
def include_node_to_network_map( def include_node_to_network_map(node_to_include: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None:
node_to_include: StorageNode,
alive_node: StorageNode,
shell: Shell,
cluster: Cluster,
) -> None:
storage_node_set_status(node_to_include, status="online") storage_node_set_status(node_to_include, status="online")
# Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch. # Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch.
@ -236,7 +224,7 @@ def include_node_to_network_map(
tick_epoch(shell, cluster) tick_epoch(shell, cluster)
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
check_node_in_map(node_to_include, shell, alive_node) await_node_in_map(node_to_include, shell, alive_node)
@reporter.step("Check node {node} in network map") @reporter.step("Check node {node} in network map")
@ -250,6 +238,11 @@ def check_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[Stor
assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} to be in network map" assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} to be in network map"
@wait_for_success(300, 15, title="Await node {node} in network map")
def await_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
check_node_in_map(node, shell, alive_node)
@reporter.step("Check node {node} NOT in network map") @reporter.step("Check node {node} NOT in network map")
def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None: def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
alive_node = alive_node or node alive_node = alive_node or node
@ -276,12 +269,7 @@ def wait_for_node_to_be_ready(node: StorageNode) -> None:
@reporter.step("Remove nodes from network map trough cli-adm morph command") @reporter.step("Remove nodes from network map trough cli-adm morph command")
def remove_nodes_from_map_morph( def remove_nodes_from_map_morph(shell: Shell, cluster: Cluster, remove_nodes: list[StorageNode], alive_node: Optional[StorageNode] = None):
shell: Shell,
cluster: Cluster,
remove_nodes: list[StorageNode],
alive_node: Optional[StorageNode] = None,
):
""" """
Move node to the Offline state in the candidates list and tick an epoch to update the netmap Move node to the Offline state in the candidates list and tick an epoch to update the netmap
using frostfs-adm using frostfs-adm
@ -300,9 +288,5 @@ def remove_nodes_from_map_morph(
if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH: if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH:
# If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests) # If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
frostfsadm = FrostfsAdm( frostfsadm = FrostfsAdm(shell=remote_shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
shell=remote_shell,
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH,
)
frostfsadm.morph.remove_nodes(node_netmap_keys) frostfsadm.morph.remove_nodes(node_netmap_keys)

View file

@ -7,8 +7,9 @@ from dateutil.parser import parse
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import search_container_by_name, search_nodes_with_container from frostfs_testlib.steps.cli.container import search_nodes_with_container
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
@ -175,10 +176,11 @@ def search_nodes_with_bucket(
wallet: WalletInfo, wallet: WalletInfo,
shell: Shell, shell: Shell,
endpoint: str, endpoint: str,
bucket_container_resolver: BucketContainerResolver,
) -> list[ClusterNode]: ) -> list[ClusterNode]:
cid = None cid = None
for cluster_node in cluster.cluster_nodes: for cluster_node in cluster.cluster_nodes:
cid = search_container_by_name(name=bucket_name, node=cluster_node) cid = bucket_container_resolver.resolve(cluster_node, bucket_name)
if cid: if cid:
break break
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster) nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)

View file

@ -12,6 +12,7 @@ class ConfigAttributes:
REMOTE_WALLET_CONFIG = "remote_wallet_config_path" REMOTE_WALLET_CONFIG = "remote_wallet_config_path"
ENDPOINT_DATA_0 = "endpoint_data0" ENDPOINT_DATA_0 = "endpoint_data0"
ENDPOINT_DATA_1 = "endpoint_data1" ENDPOINT_DATA_1 = "endpoint_data1"
ENDPOINT_DATA_0_NS = "endpoint_data0_namespace"
ENDPOINT_INTERNAL = "endpoint_internal0" ENDPOINT_INTERNAL = "endpoint_internal0"
ENDPOINT_PROMETHEUS = "endpoint_prometheus" ENDPOINT_PROMETHEUS = "endpoint_prometheus"
CONTROL_ENDPOINT = "control_endpoint" CONTROL_ENDPOINT = "control_endpoint"

View file

@ -14,6 +14,7 @@ from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_E
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IpHelper from frostfs_testlib.steps.network import IpHelper
from frostfs_testlib.steps.node_management import include_node_to_network_map, remove_nodes_from_map_morph
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
@ -39,6 +40,7 @@ class ClusterStateController:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: list[ClusterNode] = [] self.dropped_traffic: list[ClusterNode] = []
self.excluded_from_netmap: list[StorageNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
self.healthcheck = healthcheck self.healthcheck = healthcheck
@ -307,24 +309,17 @@ class ClusterStateController:
self.suspended_services = {} self.suspended_services = {}
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}") @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic( def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
self,
node: ClusterNode,
wakeup_timeout: int,
name_interface: str,
block_nodes: list[ClusterNode] = None,
) -> None:
list_ip = self._parse_interfaces(block_nodes, name_interface) list_ip = self._parse_interfaces(block_nodes, name_interface)
IpHelper.drop_input_traffic_to_node(node, list_ip) IpHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout) time.sleep(wakeup_timeout)
self.dropped_traffic.append(node) self.dropped_traffic.append(node)
@reporter.step("Start traffic to {node}") @reporter.step("Start traffic to {node}")
def restore_traffic( def restore_traffic(self, node: ClusterNode) -> None:
self,
node: ClusterNode,
) -> None:
IpHelper.restore_input_traffic_to_node(node=node) IpHelper.restore_input_traffic_to_node(node=node)
index = self.dropped_traffic.index(node)
self.dropped_traffic.pop(index)
@reporter.step("Restore blocked nodes") @reporter.step("Restore blocked nodes")
def restore_all_traffic(self): def restore_all_traffic(self):
@ -408,9 +403,7 @@ class ClusterStateController:
@reporter.step("Set MaintenanceModeAllowed - {status}") @reporter.step("Set MaintenanceModeAllowed - {status}")
def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None: def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None:
frostfs_adm = FrostfsAdm( frostfs_adm = FrostfsAdm(
shell=cluster_node.host.get_shell(), shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH,
) )
frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}") frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}")
@ -451,6 +444,25 @@ class ClusterStateController:
else: else:
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'" assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
def remove_node_from_netmap(self, removes_nodes: list[StorageNode]) -> None:
alive_storage = list(set(self.cluster.storage_nodes) - set(removes_nodes))[0]
remove_nodes_from_map_morph(self.shell, self.cluster, removes_nodes, alive_storage)
self.excluded_from_netmap.extend(removes_nodes)
def include_node_to_netmap(self, include_node: StorageNode, alive_node: StorageNode):
include_node_to_network_map(include_node, alive_node, self.shell, self.cluster)
self.excluded_from_netmap.pop(self.excluded_from_netmap.index(include_node))
def include_all_excluded_nodes(self):
if not self.excluded_from_netmap:
return
alive_node = list(set(self.cluster.storage_nodes) - set(self.excluded_from_netmap))[0]
if not alive_node:
return
for exclude_node in self.excluded_from_netmap.copy():
self.include_node_to_netmap(exclude_node, alive_node)
def _get_cli( def _get_cli(
self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode
) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]: ) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]:
@ -467,11 +479,7 @@ class ClusterStateController:
frostfs_adm = FrostfsAdm(shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH) frostfs_adm = FrostfsAdm(shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
frostfs_cli = FrostfsCli(local_shell, FROSTFS_CLI_EXEC, local_wallet.config_path) frostfs_cli = FrostfsCli(local_shell, FROSTFS_CLI_EXEC, local_wallet.config_path)
frostfs_cli_remote = FrostfsCli( frostfs_cli_remote = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
shell=shell,
frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
config_file=wallet_config_path,
)
return frostfs_adm, frostfs_cli, frostfs_cli_remote return frostfs_adm, frostfs_cli, frostfs_cli_remote
def _enable_date_synchronizer(self, cluster_node: ClusterNode): def _enable_date_synchronizer(self, cluster_node: ClusterNode):
@ -531,11 +539,8 @@ class ClusterStateController:
except Exception as err: except Exception as err:
logger.warning(f"Host ping fails with error {err}") logger.warning(f"Host ping fails with error {err}")
return HostStatus.ONLINE return HostStatus.ONLINE
@reporter.step("Get contract by domain - {domain_name}") @reporter.step("Get contract by domain - {domain_name}")
def get_domain_contracts(self, cluster_node: ClusterNode, domain_name: str): def get_domain_contracts(self, cluster_node: ClusterNode, domain_name: str):
frostfs_adm = FrostfsAdm( frostfs_adm = FrostfsAdm(shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC)
shell=cluster_node.host.get_shell(),
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
)
return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_http_endpoint(), domain_name).stdout return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_http_endpoint(), domain_name).stdout

View file

@ -39,12 +39,18 @@ class S3Gate(NodeBase):
def get_endpoint(self) -> str: def get_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0) return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)
def get_ns_endpoint(self, ns_name: str) -> str:
return self._get_attribute(f"{ConfigAttributes.ENDPOINT_DATA_0}_namespace").format(namespace=ns_name)
def get_all_endpoints(self) -> list[str]: def get_all_endpoints(self) -> list[str]:
return [ return [
self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0), self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0),
self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1), self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1),
] ]
def get_ns_endpoint(self, ns_name: str) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0_NS).format(namespace=ns_name)
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
health_metric = "frostfs_s3_gw_state_health" health_metric = "frostfs_s3_gw_state_health"
output = self.host.get_shell().exec(f"curl -s localhost:8086 | grep {health_metric} | sed 1,2d").stdout output = self.host.get_shell().exec(f"curl -s localhost:8086 | grep {health_metric} | sed 1,2d").stdout

View file

@ -90,3 +90,6 @@ class Chunk:
def __str__(self) -> str: def __str__(self) -> str:
return self.object_id return self.object_id
def __repr__(self) -> str:
return self.object_id

View file

@ -8,6 +8,7 @@ from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.cli_utils import parse_netmap_output from frostfs_testlib.utils.cli_utils import parse_netmap_output
@ -42,6 +43,7 @@ class ChunksOperations(interfaces.ChunksInterface):
if cluster_node.host_ip == node_info.node: if cluster_node.host_ip == node_info.node:
return (cluster_node, node_info) return (cluster_node, node_info)
@wait_for_success(300, 5, fail_testcase=None)
@reporter.step("Search shard with chunk {chunk}") @reporter.step("Search shard with chunk {chunk}")
def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str: def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str:
oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}" oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}"
@ -60,11 +62,10 @@ class ChunksOperations(interfaces.ChunksInterface):
rpc_endpoint: str, rpc_endpoint: str,
cid: str, cid: str,
oid: str, oid: str,
wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
bearer: Optional[str] = None, bearer: Optional[str] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
trace: bool = False, trace: bool = True,
root: bool = False, root: bool = False,
verify_presence_all: bool = False, verify_presence_all: bool = False,
json: bool = True, json: bool = True,
@ -72,20 +73,33 @@ class ChunksOperations(interfaces.ChunksInterface):
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
) -> list[Chunk]: ) -> list[Chunk]:
object_nodes = self.cli.object.nodes(**{param: value for param, value in locals().items() if param not in ["self"]}) object_nodes = self.cli.object.nodes(
return self._parse_object_nodes(object_nodes.stdout) rpc_endpoint=rpc_endpoint,
cid=cid,
address=address,
bearer=bearer,
generate_key=generate_key,
oid=oid,
trace=trace,
root=root,
verify_presence_all=verify_presence_all,
json=json,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])
@reporter.step("Get last parity chunk") @reporter.step("Get last parity chunk")
def get_parity( def get_parity(
self, self,
rpc_endpoint: str, rpc_endpoint: str,
cid: str, cid: str,
wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
bearer: Optional[str] = None, bearer: Optional[str] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
oid: Optional[str] = None, oid: Optional[str] = None,
trace: bool = False, trace: bool = True,
root: bool = False, root: bool = False,
verify_presence_all: bool = False, verify_presence_all: bool = False,
json: bool = True, json: bool = True,
@ -93,29 +107,56 @@ class ChunksOperations(interfaces.ChunksInterface):
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
) -> Chunk: ) -> Chunk:
object_nodes = self.cli.object.nodes(**{param: value for param, value in locals().items() if param not in ["self"]}) object_nodes = self.cli.object.nodes(
return self._parse_object_nodes(object_nodes.stdout)[-1] rpc_endpoint=rpc_endpoint,
cid=cid,
address=address,
bearer=bearer,
generate_key=generate_key,
oid=oid,
trace=trace,
root=root,
verify_presence_all=verify_presence_all,
json=json,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[-1]
@reporter.step("Get first data chunk") @reporter.step("Get first data chunk")
def get_first_data( def get_first_data(
self, self,
rpc_endpoint: str, rpc_endpoint: str,
cid: str, cid: str,
wallet: Optional[str] = None, oid: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
bearer: Optional[str] = None, bearer: Optional[str] = None,
generate_key: Optional[bool] = None, generate_key: Optional[bool] = None,
oid: Optional[str] = None, trace: bool = True,
trace: bool = False,
root: bool = False, root: bool = False,
verify_presence_all: bool = False, verify_presence_all: bool = False,
json: bool = True, json: bool = True,
ttl: Optional[int] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> Chunk: ) -> Chunk:
object_nodes = self.cli.object.nodes(**{param: value for param, value in locals().items() if param not in ["self"]}) object_nodes = self.cli.object.nodes(
return self._parse_object_nodes(object_nodes.stdout)[0] rpc_endpoint=rpc_endpoint,
cid=cid,
address=address,
bearer=bearer,
generate_key=generate_key,
oid=oid,
trace=trace,
root=root,
verify_presence_all=verify_presence_all,
json=json,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0]
def _parse_object_nodes(self, object_nodes: str) -> list[Chunk]: def _parse_object_nodes(self, object_nodes: str) -> list[Chunk]:
parse_result = json.loads(object_nodes) parse_result = json.loads(object_nodes)

View file

@ -1,11 +1,16 @@
import json
import logging import logging
from typing import Optional import re
from typing import List, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.storage.constants import PlacementRule from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.utils import json_utils
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -18,13 +23,22 @@ class ContainerOperations(interfaces.ContainerInterface):
def create( def create(
self, self,
endpoint: str, endpoint: str,
rule: str = PlacementRule.DEFAULT_PLACEMENT_RULE, nns_zone: Optional[str] = None,
basic_acl: str = "", nns_name: Optional[str] = None,
address: Optional[str] = None,
attributes: Optional[dict] = None, attributes: Optional[dict] = None,
session_token: str = "", basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None, name: Optional[str] = None,
options: Optional[dict] = None, nonce: Optional[str] = None,
await_mode: bool = True, policy: Optional[str] = None,
session: Optional[str] = None,
subnet: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str: ) -> str:
""" """
@ -54,14 +68,23 @@ class ContainerOperations(interfaces.ContainerInterface):
""" """
result = self.cli.container.create( result = self.cli.container.create(
rpc_endpoint=endpoint, rpc_endpoint=endpoint,
policy=rule, policy=policy,
basic_acl=basic_acl, nns_zone=nns_zone,
nns_name=nns_name,
address=address,
attributes=attributes, attributes=attributes,
name=name, basic_acl=basic_acl,
session=session_token,
await_mode=await_mode, await_mode=await_mode,
disable_timestamp=disable_timestamp,
force=force,
trace=trace,
name=name,
nonce=nonce,
session=session,
subnet=subnet,
ttl=ttl,
xhdr=xhdr,
timeout=timeout, timeout=timeout,
**options or {},
) )
cid = self._parse_cid(result.stdout) cid = self._parse_cid(result.stdout)
@ -71,21 +94,216 @@ class ContainerOperations(interfaces.ContainerInterface):
return cid return cid
@reporter.step("List Containers") @reporter.step("List Containers")
def list(self, endpoint: str, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT) -> list[str]: def list(
self,
endpoint: str,
name: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
**params,
) -> List[str]:
""" """
A wrapper for `frostfs-cli container list` call. It returns all the A wrapper for `frostfs-cli container list` call. It returns all the
available containers for the given wallet. available containers for the given wallet.
Args: Args:
wallet (WalletInfo): a wallet on whose behalf we list the containers
shell: executor for cli command shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
timeout: Timeout for the operation. timeout: Timeout for the operation.
Returns: Returns:
(list): list of containers (list): list of containers
""" """
result = self.cli.container.list(rpc_endpoint=endpoint, timeout=timeout) result = self.cli.container.list(
rpc_endpoint=endpoint,
name=name,
address=address,
generate_key=generate_key,
owner=owner,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
**params,
)
return result.stdout.split() return result.stdout.split()
@reporter.step("List Objects in container")
def list_objects(
self,
endpoint: str,
cid: str,
bearer: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> List[str]:
"""
A wrapper for `frostfs-cli container list-objects` call. It returns all the
available objects in container.
Args:
container_id: cid of container
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
timeout: Timeout for the operation.
Returns:
(list): list of containers
"""
result = self.cli.container.list_objects(
rpc_endpoint=endpoint,
cid=cid,
bearer=bearer,
wallet=wallet,
address=address,
generate_key=generate_key,
trace=trace,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
logger.info(f"Container objects: \n{result}")
return result.stdout.split()
@reporter.step("Delete container")
def delete(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
trace: bool = False,
):
try:
return self.cli.container.delete(
rpc_endpoint=endpoint,
cid=cid,
address=address,
await_mode=await_mode,
session=session,
ttl=ttl,
xhdr=xhdr,
force=force,
trace=trace,
).stdout
except RuntimeError as e:
print(f"Error request:\n{e}")
@reporter.step("Get container")
def get(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = True,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> Union[dict, str]:
result = self.cli.container.get(
rpc_endpoint=endpoint,
cid=cid,
address=address,
generate_key=generate_key,
await_mode=await_mode,
to=to,
json_mode=json_mode,
trace=trace,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
container_info = json.loads(result.stdout)
attributes = dict()
for attr in container_info["attributes"]:
attributes[attr["key"]] = attr["value"]
container_info["attributes"] = attributes
container_info["ownerID"] = json_utils.json_reencode(container_info["ownerID"]["value"])
return container_info
@reporter.step("Get eacl container")
def get_eacl(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
json_mode: bool = True,
trace: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
return self.cli.container.get_eacl(
rpc_endpoint=endpoint,
cid=cid,
address=address,
generate_key=generate_key,
await_mode=await_mode,
to=to,
session=session,
ttl=ttl,
xhdr=xhdr,
timeout=CLI_DEFAULT_TIMEOUT,
).stdout
@reporter.step("Get nodes container")
def nodes(
self,
endpoint: str,
cid: str,
cluster: Cluster,
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> List[ClusterNode]:
result = self.cli.container.search_node(
rpc_endpoint=endpoint,
cid=cid,
address=address,
ttl=ttl,
from_file=from_file,
trace=trace,
short=short,
xhdr=xhdr,
generate_key=generate_key,
timeout=timeout,
).stdout
pattern = r"[0-9]+(?:\.[0-9]+){3}"
nodes_ip = list(set(re.findall(pattern, result)))
with reporter.step(f"nodes ips = {nodes_ip}"):
nodes_list = cluster.get_nodes_by_ip(nodes_ip)
with reporter.step(f"Return nodes - {nodes_list}"):
return nodes_list
@reporter.step("Resolve container by name")
def resolve_container_by_name(name: str, node: ClusterNode):
resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver.resolve(node, name)
def _parse_cid(self, output: str) -> str: def _parse_cid(self, output: str) -> str:
""" """
Parses container ID from a given CLI output. The input string we expect: Parses container ID from a given CLI output. The input string we expect:

View file

@ -509,6 +509,7 @@ class ObjectOperations(interfaces.ObjectInterface):
cid: str, cid: str,
endpoint: str, endpoint: str,
bearer: str = "", bearer: str = "",
oid: Optional[str] = None,
filters: Optional[dict] = None, filters: Optional[dict] = None,
expected_objects_list: Optional[list] = None, expected_objects_list: Optional[list] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
@ -516,6 +517,9 @@ class ObjectOperations(interfaces.ObjectInterface):
phy: bool = False, phy: bool = False,
root: bool = False, root: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
ttl: Optional[int] = None,
) -> list: ) -> list:
""" """
SEARCH an Object. SEARCH an Object.
@ -541,11 +545,15 @@ class ObjectOperations(interfaces.ObjectInterface):
rpc_endpoint=endpoint, rpc_endpoint=endpoint,
cid=cid, cid=cid,
bearer=bearer, bearer=bearer,
oid=oid,
xhdr=xhdr, xhdr=xhdr,
filters=[f"{filter_key} EQ {filter_val}" for filter_key, filter_val in filters.items()] if filters else None, filters=[f"{filter_key} EQ {filter_val}" for filter_key, filter_val in filters.items()] if filters else None,
session=session, session=session,
phy=phy, phy=phy,
root=root, root=root,
address=address,
generate_key=generate_key,
ttl=ttl,
timeout=timeout, timeout=timeout,
) )

View file

@ -1,7 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Optional from typing import Any, List, Optional
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.constants import PlacementRule from frostfs_testlib.storage.constants import PlacementRule
@ -96,7 +95,7 @@ class ObjectInterface(ABC):
bearer: str = "", bearer: str = "",
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> str: ) -> str:
pass pass
@ -111,7 +110,7 @@ class ObjectInterface(ABC):
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
no_progress: bool = True, no_progress: bool = True,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> file_utils.TestFile: ) -> file_utils.TestFile:
pass pass
@ -126,14 +125,14 @@ class ObjectInterface(ABC):
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
no_progress: bool = True, no_progress: bool = True,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> str: ) -> str:
pass pass
@abstractmethod @abstractmethod
def hash( def hash(
self, self,
rpc_endpoint: str, endpoint: str,
cid: str, cid: str,
oid: str, oid: str,
address: Optional[str] = None, address: Optional[str] = None,
@ -145,7 +144,7 @@ class ObjectInterface(ABC):
session: Optional[str] = None, session: Optional[str] = None,
hash_type: Optional[str] = None, hash_type: Optional[str] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> str: ) -> str:
pass pass
@ -161,7 +160,7 @@ class ObjectInterface(ABC):
is_raw: bool = False, is_raw: bool = False,
is_direct: bool = False, is_direct: bool = False,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> CommandResult | Any: ) -> CommandResult | Any:
pass pass
@ -178,7 +177,7 @@ class ObjectInterface(ABC):
session: Optional[str] = None, session: Optional[str] = None,
ttl: Optional[int] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> str: ) -> str:
pass pass
@ -195,7 +194,7 @@ class ObjectInterface(ABC):
expire_at: Optional[int] = None, expire_at: Optional[int] = None,
no_progress: bool = True, no_progress: bool = True,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> str: ) -> str:
pass pass
@ -212,7 +211,7 @@ class ObjectInterface(ABC):
expire_at: Optional[int] = None, expire_at: Optional[int] = None,
no_progress: bool = True, no_progress: bool = True,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> str: ) -> str:
pass pass
@ -226,7 +225,7 @@ class ObjectInterface(ABC):
bearer: str = "", bearer: str = "",
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
session: Optional[str] = None, session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> tuple[file_utils.TestFile, bytes]: ) -> tuple[file_utils.TestFile, bytes]:
pass pass
@ -236,14 +235,18 @@ class ObjectInterface(ABC):
cid: str, cid: str,
endpoint: str, endpoint: str,
bearer: str = "", bearer: str = "",
oid: Optional[str] = None,
filters: Optional[dict] = None, filters: Optional[dict] = None,
expected_objects_list: Optional[list] = None, expected_objects_list: Optional[list] = None,
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
session: Optional[str] = None, session: Optional[str] = None,
phy: bool = False, phy: bool = False,
root: bool = False, root: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> list: address: Optional[str] = None,
generate_key: Optional[bool] = None,
ttl: Optional[int] = None,
) -> List:
pass pass
@abstractmethod @abstractmethod
@ -257,8 +260,8 @@ class ObjectInterface(ABC):
xhdr: Optional[dict] = None, xhdr: Optional[dict] = None,
is_direct: bool = False, is_direct: bool = False,
verify_presence_all: bool = False, verify_presence_all: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = None,
) -> list[ClusterNode]: ) -> List[ClusterNode]:
pass pass
@ -267,16 +270,120 @@ class ContainerInterface(ABC):
def create( def create(
self, self,
endpoint: str, endpoint: str,
rule: str = PlacementRule.DEFAULT_PLACEMENT_RULE, nns_zone: Optional[str] = None,
basic_acl: str = "", nns_name: Optional[str] = None,
address: Optional[str] = None,
attributes: Optional[dict] = None, attributes: Optional[dict] = None,
session_token: str = "", basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None, name: Optional[str] = None,
options: Optional[dict] = None, nonce: Optional[str] = None,
await_mode: bool = True, policy: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, session: Optional[str] = None,
subnet: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> str: ) -> str:
pass """
Create a new container and register it in the FrostFS.
It will be stored in the sidechain when the Inner Ring accepts it.
"""
raise NotImplementedError("No implemethed method create")
@abstractmethod
def delete(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
trace: bool = False,
) -> List[str]:
"""
Delete an existing container.
Only the owner of the container has permission to remove the container.
"""
raise NotImplementedError("No implemethed method delete")
@abstractmethod
def get(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = True,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> List[str]:
"""Get container field info."""
raise NotImplementedError("No implemethed method get")
@abstractmethod
def get_eacl(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
json_mode: bool = True,
trace: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> List[str]:
"""Get extended ACL table of container."""
raise NotImplementedError("No implemethed method get-eacl")
@abstractmethod
def list(
self,
endpoint: str,
name: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
**params,
) -> List[str]:
"""List all created containers."""
raise NotImplementedError("No implemethed method list")
@abstractmethod
def nodes(
self,
endpoint: str,
cid: str,
cluster: Cluster,
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
timeout: Optional[str] = None,
) -> List[ClusterNode]:
"""Show the nodes participating in the container in the current epoch."""
raise NotImplementedError("No implemethed method nodes")
class GrpcClientWrapper(ABC): class GrpcClientWrapper(ABC):

View file

@ -25,12 +25,8 @@ class ClusterTestBase:
for _ in range(epochs_to_tick): for _ in range(epochs_to_tick):
self.tick_epoch(alive_node, wait_block) self.tick_epoch(alive_node, wait_block)
def tick_epoch( def tick_epoch(self, alive_node: Optional[StorageNode] = None, wait_block: int = None, delta: Optional[int] = None):
self, epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node, delta=delta)
alive_node: Optional[StorageNode] = None,
wait_block: int = None,
):
epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node)
if wait_block: if wait_block:
self.wait_for_blocks(wait_block) self.wait_for_blocks(wait_block)

View file

@ -1,3 +1,4 @@
import itertools
import random import random
import re import re
import string import string
@ -7,6 +8,9 @@ ONLY_ASCII_LETTERS = string.ascii_letters
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
NON_DIGITS_AND_LETTERS = string.punctuation NON_DIGITS_AND_LETTERS = string.punctuation
# if unique_name is called multiple times within the same microsecond, append 0-4 to the name so it surely unique
FUSE = itertools.cycle(range(5))
def unique_name(prefix: str = "", postfix: str = ""): def unique_name(prefix: str = "", postfix: str = ""):
""" """
@ -18,7 +22,7 @@ def unique_name(prefix: str = "", postfix: str = ""):
Returns: Returns:
unique name string unique name string
""" """
return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}{postfix}" return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}{next(FUSE)}{postfix}"
def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS): def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):