Compare commits

...

47 commits

Author SHA1 Message Date
24e1dfef28 [#324]Extension list_objects method 2024-11-26 07:37:56 +00:00
0c9660fffc [#323] Update APE related entities
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-20 17:14:33 +03:00
8eaa511e5c [#322] Added classmethod decorator in Http client
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-11-18 15:07:24 +00:00
a1953684b8 [#307] added methods for testing MFA 2024-11-18 07:08:42 +00:00
451de5e07e [#320] Added shards detach function
Signed-off-by: Dmitry Anurin <danurin@yadro.com>
2024-11-14 16:22:06 +03:00
f24bfc06fd [#319] Add cached fixture feature
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 17:46:03 +03:00
47bc11835b [#318] Add tombstone expiration test
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 10:11:03 +03:00
2a90ec74ff [#317] update morph rule chain 2024-11-12 16:01:12 +03:00
95b32a036a [#316] Extend parallel exception message output
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-12 12:28:10 +03:00
55d8ee5da0 [#315] Add http client
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-11-08 15:51:32 +03:00
ea40940514 [#313] update force_new_epoch 2024-11-05 12:37:56 +03:00
6f1baf3cf6 [#312] update morph remove_nodes 2024-11-01 15:50:17 +03:00
26139767f4 [#311] Add AWS CLI command to report from Boto3 request
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-10-31 12:14:51 +00:00
3d6a356e20 [#306] Fix handling of bucket names in AWS CLI
- Add quotes around container names if they contain spaces or `-`.

Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-10-31 12:14:36 +00:00
e6faddedeb [#297] add morph rule chain 2024-10-31 13:00:40 +03:00
b2bf6677f1 [#310] Update test marking
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-10-25 18:52:43 +03:00
3f3be83d90 [#305] Added IAM abstract method 2024-10-25 08:07:47 +00:00
5fa58a55c0 [#304] Improve logging Boto3 IAM methods
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-10-18 19:24:26 +03:00
738cfacbb7 [#300] Refactor tests: use unique_name instead hex + timestamp
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-10-14 10:09:13 +00:00
cf48f474eb [#303] add check if registry is on hdd
Signed-off-by: m.malygina <m.malygina@yadro.com>
2024-10-14 11:16:09 +03:00
2a41f2b0f6 [#301] Added interfaces for put/get lifecycle configuration to s3 clients 2024-10-11 13:35:33 +00:00
a04eba8aec [#302] Autoadd marks for frostfs
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-10-11 12:23:32 +03:00
2976e30b75 [#299] Add fuse to prevent similar names generation
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-10-07 15:59:00 +03:00
24b8ca73d7 [#291] get namespace endpoint 2024-09-18 12:30:02 +00:00
cef64e315e [#267] add no rule found object and morph chain 2024-09-18 12:29:54 +00:00
0d750ed114 [#293] Add in CSC methods change blockchain netmap and update CliWrapper
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-09-17 10:19:28 +00:00
1bee69042b [#294] add wipe data using wipefs method
Signed-off-by: m.malygina <m.malygina@yadro.com>
2024-09-17 09:38:03 +00:00
4a2ac8a9b6 [#290] Update restore traffic method
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-09-11 10:42:51 +03:00
36bfe385d5 Added method get s3 endpoint for namespace 2024-09-10 14:05:44 +00:00
565fd4c72b [#289] Move temp dir fixture to testlib
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-09-10 13:28:57 +00:00
84e83487f9 [#288] Update object and chunks Clients
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-09-10 13:54:51 +03:00
d2f8323fb9 [#286] Change args id in shards.set-mode command
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-09-03 15:11:43 +03:00
eba782e7d2 [#285] Change func search bucket nodes and remove old resolver bucket cnr
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-09-02 11:15:56 +00:00
85c2707ec8 [#284] Add container operational in CliWrapper
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-08-28 12:12:05 +03:00
0caca54e36 [#283] Fix mistakes
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-08-16 18:12:25 +03:00
8ae1b99db9 [#282] New grpc realization for object operations
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-08-16 10:22:21 +03:00
6926c09dbe [#281] add hostname to HostConfig
Signed-off-by: m.malygina <m.malygina@yadro.com>
2024-08-13 14:34:29 +00:00
1c2ed25929 [#280] Fix neo-go query height in steps
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-08-13 13:50:19 +00:00
0ba4a73db3 [#279] Add objectID filter for APE
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-08-08 18:34:46 +03:00
8a8b35846e [#278] Small QoL updates
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-08-07 18:01:03 +03:00
5bdacdf5ba [#269] Fix get contracts method 2024-08-05 12:54:31 +00:00
ae9e8d8c30 [#274] Fix iam_get_policy function 2024-08-05 12:48:58 +00:00
54b42e2d8d [#274] Fix iam_attach_group_policy function 2024-08-05 12:48:58 +00:00
ea60c2104a [#277] MInor change for shard
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-08-05 12:48:20 +00:00
8306a9f3ff [#276] Context manager for parralel func
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-08-05 12:47:29 +00:00
6b036a09b7 [#275] Add 'retry' and 'PRESET_CONTAINER_CREATION_RETRY_COUNT' variables to define max num of container creation retries 2024-08-02 11:32:02 +03:00
a983e0566e [#272] Add --generate-key flag to object operations
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-07-29 13:26:47 +03:00
55 changed files with 3415 additions and 471 deletions

View file

@ -1,5 +1,6 @@
hosts:
- address: localhost
hostname: localhost
attributes:
sudo_shell: false
plugin_name: docker

View file

@ -27,8 +27,8 @@ dependencies = [
"testrail-api>=1.12.0",
"pytest==7.1.2",
"tenacity==8.0.1",
"boto3==1.16.33",
"boto3-stubs[essential]==1.16.33",
"boto3==1.35.30",
"boto3-stubs[essential]==1.35.30",
]
requires-python = ">=3.10"

View file

@ -8,8 +8,8 @@ docstring_parser==0.15
testrail-api==1.12.0
tenacity==8.0.1
pytest==7.1.2
boto3==1.16.33
boto3-stubs[essential]==1.16.33
boto3==1.35.30
boto3-stubs[essential]==1.35.30
# Dev dependencies
black==22.8.0

View file

@ -1,3 +1,4 @@
__version__ = "2.0.1"
from .fixtures import configure_testlib, hosting
from .fixtures import configure_testlib, hosting, temp_directory
from .hooks import pytest_collection_modifyitems

View file

@ -69,9 +69,7 @@ class FrostfsAdmMorph(CliCommand):
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
)
def set_config(
self, set_key_value: str, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None
) -> CommandResult:
def set_config(self, set_key_value: str, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None) -> CommandResult:
"""Add/update global config value in the FrostFS network.
Args:
@ -125,7 +123,7 @@ class FrostfsAdmMorph(CliCommand):
)
def force_new_epoch(
self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None
self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None, delta: Optional[int] = None
) -> CommandResult:
"""Create new FrostFS epoch event in the side chain.
@ -344,9 +342,124 @@ class FrostfsAdmMorph(CliCommand):
return self._execute(
f"morph remove-nodes {' '.join(node_netmap_keys)}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "node_netmap_keys"]
},
**{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]},
)
def add_rule(
self,
chain_id: str,
target_name: str,
target_type: str,
rule: Optional[list[str]] = None,
path: Optional[str] = None,
chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""Drop objects from the node's local storage
Args:
chain-id: Assign ID to the parsed chain
chain-id-hex: Flag to parse chain ID as hex
path: Path to encoded chain in JSON or binary format
rule: Rule statement
target-name: Resource name in APE resource name format
target-type: Resource type(container/namespace)
timeout: Timeout for an operation (default 15s)
wallet: Path to the wallet or binary key
Returns:
Command`s result.
"""
return self._execute(
"morph ape add-rule-chain",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def get_rule(
self,
chain_id: str,
target_name: str,
target_type: str,
chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""Drop objects from the node's local storage
Args:
chain-id string Chain id
chain-id-hex Flag to parse chain ID as hex
target-name string Resource name in APE resource name format
target-type string Resource type(container/namespace)
timeout duration Timeout for an operation (default 15s)
wallet string Path to the wallet or binary key
Returns:
Command`s result.
"""
return self._execute(
"morph ape get-rule-chain",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def list_rules(
self,
target_type: str,
target_name: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""Drop objects from the node's local storage
Args:
target-name: Resource name in APE resource name format
target-type: Resource type(container/namespace)
timeout: Timeout for an operation (default 15s)
wallet: Path to the wallet or binary key
Returns:
Command`s result.
"""
return self._execute(
"morph ape list-rule-chains",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def remove_rule(
self,
chain_id: str,
target_name: str,
target_type: str,
all: Optional[bool] = None,
chain_name: Optional[str] = None,
chain_id_hex: Optional[bool] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""Drop objects from the node's local storage
Args:
all: Remove all chains
chain-id: Assign ID to the parsed chain
chain-id-hex: Flag to parse chain ID as hex
target-name: Resource name in APE resource name format
target-type: Resource type(container/namespace)
timeout: Timeout for an operation (default 15s)
wallet: Path to the wallet or binary key
Returns:
Command`s result.
"""
return self._execute(
"morph ape rm-rule-chain",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -16,6 +16,8 @@ class FrostfsCliContainer(CliCommand):
basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None,
nonce: Optional[str] = None,
policy: Optional[str] = None,
@ -37,6 +39,8 @@ class FrostfsCliContainer(CliCommand):
basic_acl: Hex encoded basic ACL value or keywords like 'public-read-write',
'private', 'eacl-public-read' (default "private").
disable_timestamp: Disable timestamp container attribute.
force: Skip placement validity check.
trace: Generate trace ID and print it.
name: Container name attribute.
nonce: UUIDv4 nonce value for container.
policy: QL-encoded or JSON-encoded placement policy or path to file with it.
@ -69,6 +73,7 @@ class FrostfsCliContainer(CliCommand):
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
trace: bool = False,
) -> CommandResult:
"""
Delete an existing container.
@ -78,6 +83,7 @@ class FrostfsCliContainer(CliCommand):
address: Address of wallet account.
await_mode: Block execution until container is removed.
cid: Container ID.
trace: Generate trace ID and print it.
force: Do not check whether container contains locks and remove immediately.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Path to a JSON-encoded container session token.
@ -100,9 +106,11 @@ class FrostfsCliContainer(CliCommand):
cid: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = False,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
@ -115,12 +123,14 @@ class FrostfsCliContainer(CliCommand):
await_mode: Block execution until container is removed.
cid: Container ID.
json_mode: Print or dump container in JSON format.
trace: Generate trace ID and print it.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
to: Path to dump encoded container.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
generate_key: Generate a new private key.
Returns:
Command's result.
@ -136,6 +146,7 @@ class FrostfsCliContainer(CliCommand):
cid: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
@ -152,11 +163,14 @@ class FrostfsCliContainer(CliCommand):
cid: Container ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
to: Path to dump encoded container.
json_mode: Print or dump container in JSON format.
trace: Generate trace ID and print it.
session: Path to a JSON-encoded container session token.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
generate_key: Generate a new private key.
Returns:
Command's result.
@ -170,8 +184,10 @@ class FrostfsCliContainer(CliCommand):
def list(
self,
rpc_endpoint: str,
name: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
@ -183,12 +199,15 @@ class FrostfsCliContainer(CliCommand):
Args:
address: Address of wallet account.
name: List containers by the attribute name.
owner: Owner of containers (omit to use owner from private key).
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
trace: Generate trace ID and print it.
timeout: Timeout for the operation (default 15s).
generate_key: Generate a new private key.
Returns:
Command's result.
@ -202,8 +221,11 @@ class FrostfsCliContainer(CliCommand):
self,
rpc_endpoint: str,
cid: str,
bearer: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
@ -214,11 +236,14 @@ class FrostfsCliContainer(CliCommand):
Args:
address: Address of wallet account.
cid: Container ID.
bearer: File with signed JSON or binary encoded bearer token.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
trace: Generate trace ID and print it.
timeout: Timeout for the operation (default 15s).
generate_key: Generate a new private key.
Returns:
Command's result.
@ -228,6 +253,7 @@ class FrostfsCliContainer(CliCommand):
**{param: value for param, value in locals().items() if param not in ["self"]},
)
# TODO Deprecated method with 0.42
def set_eacl(
self,
rpc_endpoint: str,
@ -273,6 +299,7 @@ class FrostfsCliContainer(CliCommand):
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
@ -290,8 +317,9 @@ class FrostfsCliContainer(CliCommand):
from_file: string File path with encoded container
timeout: duration Timeout for the operation (default 15 s)
short: shorten the output of node information.
trace: Generate trace ID and print it.
xhdr: Dict with request X-Headers.
generate_key: Generate a new private key
generate_key: Generate a new private key.
Returns:

View file

@ -13,6 +13,7 @@ class FrostfsCliObject(CliCommand):
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
@ -25,6 +26,7 @@ class FrostfsCliObject(CliCommand):
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
generate_key: Generate new private key.
oid: Object ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object DELETE session.
@ -49,6 +51,7 @@ class FrostfsCliObject(CliCommand):
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
file: Optional[str] = None,
header: Optional[str] = None,
no_progress: bool = False,
@ -66,6 +69,7 @@ class FrostfsCliObject(CliCommand):
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
file: File to write object payload to. Default: stdout.
generate_key: Generate new private key.
header: File to write header to. Default: stdout.
no_progress: Do not show progress bar.
oid: Object ID.
@ -93,6 +97,7 @@ class FrostfsCliObject(CliCommand):
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
range: Optional[str] = None,
salt: Optional[str] = None,
ttl: Optional[int] = None,
@ -108,6 +113,7 @@ class FrostfsCliObject(CliCommand):
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
generate_key: Generate new private key.
oid: Object ID.
range: Range to take hash from in the form offset1:length1,...
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
@ -135,6 +141,7 @@ class FrostfsCliObject(CliCommand):
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
file: Optional[str] = None,
json_mode: bool = False,
main_only: bool = False,
@ -153,6 +160,7 @@ class FrostfsCliObject(CliCommand):
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
file: File to write object payload to. Default: stdout.
generate_key: Generate new private key.
json_mode: Marshal output in JSON.
main_only: Return only main fields.
oid: Object ID.
@ -183,6 +191,7 @@ class FrostfsCliObject(CliCommand):
expire_at: Optional[int] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
@ -195,6 +204,7 @@ class FrostfsCliObject(CliCommand):
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
generate_key: Generate new private key.
oid: Object ID.
lifetime: Lock lifetime.
expire_at: Lock expiration epoch.
@ -222,6 +232,7 @@ class FrostfsCliObject(CliCommand):
address: Optional[str] = None,
attributes: Optional[dict] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
copies_number: Optional[int] = None,
disable_filename: bool = False,
disable_timestamp: bool = False,
@ -246,6 +257,7 @@ class FrostfsCliObject(CliCommand):
disable_timestamp: Do not set well-known timestamp attribute.
expire_at: Last epoch in the life of the object.
file: File with object payload.
generate_key: Generate new private key.
no_progress: Do not show progress bar.
notify: Object notification in the form of *epoch*:*topic*; '-'
topic means using default.
@ -273,6 +285,7 @@ class FrostfsCliObject(CliCommand):
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
file: Optional[str] = None,
json_mode: bool = False,
raw: bool = False,
@ -289,6 +302,7 @@ class FrostfsCliObject(CliCommand):
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
file: File to write object payload to. Default: stdout.
generate_key: Generate new private key.
json_mode: Marshal output in JSON.
oid: Object ID.
range: Range to take data from in the form offset:length.
@ -315,6 +329,7 @@ class FrostfsCliObject(CliCommand):
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
filters: Optional[list] = None,
oid: Optional[str] = None,
phy: bool = False,
@ -332,6 +347,7 @@ class FrostfsCliObject(CliCommand):
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
filters: Repeated filter expressions or files with protobuf JSON.
generate_key: Generate new private key.
oid: Object ID.
phy: Search physically stored objects.
root: Search for user objects.
@ -354,11 +370,11 @@ class FrostfsCliObject(CliCommand):
self,
rpc_endpoint: str,
cid: str,
oid: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,

View file

@ -40,7 +40,7 @@ class FrostfsCliShards(CliCommand):
self,
endpoint: str,
mode: str,
id: Optional[list[str]],
id: Optional[list[str]] = None,
wallet: Optional[str] = None,
wallet_password: Optional[str] = None,
address: Optional[str] = None,
@ -143,3 +143,119 @@ class FrostfsCliShards(CliCommand):
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
)
def evacuation_start(
self,
endpoint: str,
id: Optional[str] = None,
scope: Optional[str] = None,
all: bool = False,
no_errors: bool = True,
await_mode: bool = False,
address: Optional[str] = None,
timeout: Optional[str] = None,
no_progress: bool = False,
) -> CommandResult:
"""
Objects evacuation from shard to other shards.
Args:
address: Address of wallet account
all: Process all shards
await: Block execution until evacuation is completed
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
id: List of shard IDs in base58 encoding
no_errors: Skip invalid/unreadable objects (default true)
no_progress: Print progress if await provided
scope: Evacuation scope; possible values: trees, objects, all (default "all")
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards evacuation start",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def evacuation_reset(
self,
endpoint: str,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Reset evacuate objects from shard to other shards status.
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards evacuation reset",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def evacuation_stop(
self,
endpoint: str,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Stop running evacuate process from shard to other shards.
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards evacuation stop",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def evacuation_status(
self,
endpoint: str,
address: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get evacuate objects from shard to other shards status.
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards evacuation status",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def detach(self, endpoint: str, address: Optional[str] = None, id: Optional[str] = None, timeout: Optional[str] = None):
"""
Detach and close the shards
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
id: List of shard IDs in base58 encoding
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards detach",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -1,5 +1,4 @@
import re
from datetime import datetime
from typing import Optional
from frostfs_testlib import reporter
@ -10,6 +9,7 @@ from frostfs_testlib.shell import LocalShell
from frostfs_testlib.steps.cli.container import list_containers
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.utils import string_utils
class AuthmateS3CredentialsProvider(S3CredentialsProvider):
@ -22,7 +22,7 @@ class AuthmateS3CredentialsProvider(S3CredentialsProvider):
gate_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
# unique short bucket name
bucket = f"bucket-{hex(int(datetime.now().timestamp()*1000000))}"
bucket = string_utils.unique_name("bucket-")
frostfs_authmate: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate.secret.issue(

View file

@ -7,7 +7,7 @@ import yaml
from frostfs_testlib import reporter
from frostfs_testlib.hosting.hosting import Hosting
from frostfs_testlib.resources.common import HOSTING_CONFIG_FILE
from frostfs_testlib.resources.common import ASSETS_DIR, HOSTING_CONFIG_FILE
from frostfs_testlib.storage import get_service_registry
@ -24,6 +24,16 @@ def configure_testlib():
registry.register_service(svc.name, svc.load())
@pytest.fixture(scope="session")
def temp_directory(configure_testlib):
with reporter.step("Prepare tmp directory"):
full_path = ASSETS_DIR
if not os.path.exists(full_path):
os.mkdir(full_path)
return full_path
@pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting:
with open(HOSTING_CONFIG_FILE, "r") as file:

View file

@ -0,0 +1,13 @@
import pytest
@pytest.hookimpl
def pytest_collection_modifyitems(items: list[pytest.Item]):
# All tests which reside in frostfs nodeid are granted with frostfs marker, excluding
# nodeid = full path of the test
# 1. plugins
# 2. testlib itself
for item in items:
location = item.location[0]
if "frostfs" in location and "plugin" not in location and "testlib" not in location:
item.add_marker("frostfs")

View file

@ -60,6 +60,7 @@ class HostConfig:
"""
plugin_name: str
hostname: str
healthcheck_plugin_name: str
address: str
s3_creds_plugin_name: str = field(default="authmate")

View file

@ -164,6 +164,9 @@ class DockerHost(Host):
return volume_path
def send_signal_to_service(self, service_name: str, signal: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_metabase(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker")
@ -185,6 +188,12 @@ class DockerHost(Host):
def is_file_exist(self, file_path: str) -> None:
raise NotImplementedError("Not implemented for docker")
def wipefs_storage_node_data(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker")
def finish_wipefs(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
volume_path = self.get_data_directory(service_name)
@ -240,7 +249,7 @@ class DockerHost(Host):
until: Optional[datetime] = None,
unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
priority: Optional[str] = None
priority: Optional[str] = None,
) -> str:
client = self._get_docker_client()
filtered_logs = ""

View file

@ -117,6 +117,17 @@ class Host(ABC):
service_name: Name of the service to stop.
"""
@abstractmethod
def send_signal_to_service(self, service_name: str, signal: str) -> None:
"""Send signal to service with specified name using kill -<signal>
The service must be hosted on this host.
Args:
service_name: Name of the service to stop.
signal: signal name. See kill -l to all names
"""
@abstractmethod
def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it.
@ -178,6 +189,21 @@ class Host(ABC):
cache_only: To delete cache only.
"""
@abstractmethod
def wipefs_storage_node_data(self, service_name: str) -> None:
"""Erases all data of the storage node with specified name.
Args:
service_name: Name of storage node service.
"""
def finish_wipefs(self, service_name: str) -> None:
"""Erases all data of the storage node with specified name.
Args:
service_name: Name of storage node service.
"""
@abstractmethod
def delete_fstree(self, service_name: str) -> None:
"""
@ -297,7 +323,7 @@ class Host(ABC):
until: Optional[datetime] = None,
unit: Optional[str] = None,
exclude_filter: Optional[str] = None,
priority: Optional[str] = None
priority: Optional[str] = None,
) -> str:
"""Get logs from host filtered by regex.

View file

View file

@ -0,0 +1,97 @@
import json
import logging
import logging.config
import httpx
from frostfs_testlib import reporter
timeout = httpx.Timeout(60, read=150)
LOGGING_CONFIG = {
"disable_existing_loggers": False,
"version": 1,
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
"formatters": {
"http": {
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"loggers": {
"httpx": {
"handlers": ["default"],
"level": "DEBUG",
},
"httpcore": {
"handlers": ["default"],
"level": "ERROR",
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("NeoLogger")
class HttpClient:
@reporter.step("Send {method} request to {url}")
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
transport = httpx.HTTPTransport(verify=False, retries=5)
client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs)
self._attach_response(response)
logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code:
assert response.status_code == expected_status_code, (
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
)
return response
@classmethod
def _attach_response(cls, response: httpx.Response):
request = response.request
try:
request_headers = json.dumps(dict(request.headers), indent=4)
except json.JSONDecodeError:
request_headers = str(request.headers)
try:
request_body = request.read()
try:
request_body = request_body.decode("utf-8")
except UnicodeDecodeError as e:
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
except Exception as e:
request_body = f"Error reading request body: {str(e)}"
request_body = "" if request_body is None else request_body
try:
response_headers = json.dumps(dict(response.headers), indent=4)
except json.JSONDecodeError:
response_headers = str(response.headers)
report = (
f"Method: {request.method}\n\n"
f"URL: {request.url}\n\n"
f"Request Headers: {request_headers}\n\n"
f"Request Body: {request_body}\n\n"
f"Response Status Code: {response.status_code}\n\n"
f"Response Headers: {response_headers}\n\n"
f"Response Body: {response.text}\n\n"
)
curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body)
reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL")
@classmethod
def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else ""
# Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import LoadParams
from frostfs_testlib.storage.cluster import ClusterNode
@ -48,3 +49,7 @@ class ScenarioRunner(ABC):
@abstractmethod
def get_results(self) -> dict:
"""Get results from K6 run"""
@abstractmethod
def get_loaders(self) -> list[Loader]:
"""Return loaders"""

View file

@ -190,6 +190,8 @@ class Preset(MetaConfig):
containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
# Container placement policy for containers for gRPC
container_placement_policy: Optional[list[str]] = metadata_field(grpc_preset_scenarios, "policy", None, False, formatter=force_list)
# Number of retries for creation of container
container_creation_retry: Optional[int] = metadata_field(grpc_preset_scenarios, "retry", None, False)
# ------ S3 ------
# Amount of buckets which should be created

View file

@ -30,6 +30,7 @@ from frostfs_testlib.utils.file_keeper import FileKeeper
class RunnerBase(ScenarioRunner):
k6_instances: list[K6]
loaders: list[Loader]
@reporter.step("Run preset on loaders")
def preset(self):
@ -49,9 +50,11 @@ class RunnerBase(ScenarioRunner):
def get_k6_instances(self):
return self.k6_instances
def get_loaders(self) -> list[Loader]:
return self.loaders
class DefaultRunner(RunnerBase):
loaders: list[Loader]
user: User
def __init__(
@ -228,7 +231,6 @@ class DefaultRunner(RunnerBase):
class LocalRunner(RunnerBase):
loaders: list[Loader]
cluster_state_controller: ClusterStateController
file_keeper: FileKeeper
user: User

View file

@ -51,3 +51,6 @@ CREDENTIALS_CREATE_TIMEOUT = "1m"
HOSTING_CONFIG_FILE = os.getenv(
"HOSTING_CONFIG_FILE", os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".devenv.hosting.yaml"))
)
MORE_LOG = os.getenv("MORE_LOG", "1")
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"

View file

@ -27,5 +27,10 @@ S3_BUCKET_DOES_NOT_ALLOW_ACL = "The bucket does not allow ACLs"
S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema."
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
# Errors from node missing reasons if request was forwarded. Commenting for now
# RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request"
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
# Errors from node missing reasons if request was forwarded. Commenting for now
# NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request"

View file

@ -26,6 +26,7 @@ BACKGROUND_LOAD_CONTAINER_PLACEMENT_POLICY = os.getenv(
)
BACKGROUND_LOAD_S3_LOCATION = os.getenv("BACKGROUND_LOAD_S3_LOCATION", "node-off")
PRESET_CONTAINERS_COUNT = os.getenv("CONTAINERS_COUNT", "40")
PRESET_CONTAINER_CREATION_RETRY_COUNT = os.getenv("CONTAINER_CREATION_RETRY_COUNT", "20")
# TODO: At lease one object is required due to bug in xk6 (buckets with no objects produce millions exceptions in read)
PRESET_OBJECTS_COUNT = os.getenv("OBJ_COUNT", "1")
K6_DIRECTORY = os.getenv("K6_DIRECTORY", "/etc/k6")

View file

@ -16,11 +16,10 @@ OPTIONAL_NODE_UNDER_LOAD = os.getenv("OPTIONAL_NODE_UNDER_LOAD")
OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true"))
# Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped.
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(
os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")
)
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true"))
# Set this to False for disable autouse fixture like node healthcheck during developing time.
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(
os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")
)
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true"))
# Use cache for fixtures with @cachec_fixture decorator
OPTIONAL_CACHE_FIXTURES = str_to_bool(os.getenv("OPTIONAL_CACHE_FIXTURES", "false"))

View file

@ -70,6 +70,9 @@ class AwsCliClient(S3ClientWrapper):
if bucket is None:
bucket = string_utils.unique_name("bucket-")
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
if object_lock_enabled_for_bucket is None:
object_lock = ""
elif object_lock_enabled_for_bucket:
@ -103,16 +106,25 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket S3")
def delete_bucket(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
self.local_shell.exec(cmd, command_options)
@reporter.step("Head bucket S3")
def head_bucket(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api head-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
self.local_shell.exec(cmd)
@reporter.step("Put bucket versioning status")
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api put-bucket-versioning --bucket {bucket} "
f"--versioning-configuration Status={status.value} "
@ -122,6 +134,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket versioning status")
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-versioning --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -132,6 +147,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
cmd = (
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
@ -141,6 +159,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -151,6 +172,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
)
@ -160,6 +184,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -169,8 +196,20 @@ class AwsCliClient(S3ClientWrapper):
return response.get("LocationConstraint")
@reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} "
if page_size:
cmd = cmd.replace("--no-paginate", "")
cmd += f" --page-size {page_size} "
if prefix:
cmd += f" --prefix {prefix}"
if self.profile:
cmd += f" --profile {self.profile} "
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
@ -181,6 +220,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -195,6 +237,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects versions S3")
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -205,6 +250,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects delete markers S3")
def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -228,8 +276,13 @@ class AwsCliClient(S3ClientWrapper):
) -> str:
if bucket is None:
bucket = source_bucket
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
if key is None:
key = string_utils.unique_name("copy-object-")
copy_source = f"{source_bucket}/{source_key}"
cmd = (
@ -266,6 +319,9 @@ class AwsCliClient(S3ClientWrapper):
grant_full_control: Optional[str] = None,
grant_read: Optional[str] = None,
) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
if key is None:
key = os.path.basename(filepath)
@ -297,6 +353,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Head object S3")
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api head-object --bucket {bucket} --key {key} "
@ -315,6 +374,9 @@ class AwsCliClient(S3ClientWrapper):
object_range: Optional[tuple[int, int]] = None,
full_output: bool = False,
) -> dict | TestFile:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
version = f" --version-id {version_id}" if version_id else ""
cmd = (
@ -329,6 +391,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get object ACL")
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api get-object-acl --bucket {bucket} --key {key} "
@ -347,6 +412,9 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api put-object-acl --bucket {bucket} --key {key} "
f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -369,6 +437,9 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -383,6 +454,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete objects S3")
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json")
delete_structure = json.dumps(_make_objs_dict(keys))
with open(file_path, "w") as out_file:
@ -399,6 +473,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object S3")
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api delete-object --bucket {bucket} "
@ -409,6 +486,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object versions S3")
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
# Build deletion list in S3 format
delete_list = {
"Objects": [
@ -435,6 +515,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object versions S3 without delete markers")
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
# Delete objects without creating delete markers
for object_version in object_versions:
self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
@ -450,6 +533,8 @@ class AwsCliClient(S3ClientWrapper):
part_number: int = 0,
full_output: bool = True,
) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
attrs = ",".join(attributes)
version = f" --version-id {version_id}" if version_id else ""
@ -473,6 +558,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -483,6 +571,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket policy")
def delete_bucket_policy(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api delete-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -493,6 +584,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket policy")
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
# Leaving it as is was in test repo. Double dumps to escape resulting string
# Example:
# policy = {"a": 1}
@ -508,6 +602,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -518,6 +615,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket cors")
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api put-bucket-cors --bucket {bucket} "
f"--cors-configuration '{json.dumps(cors_configuration)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -526,6 +626,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -534,6 +637,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api delete-bucket-tagging --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -549,6 +655,9 @@ class AwsCliClient(S3ClientWrapper):
version_id: Optional[str] = None,
bypass_governance_retention: Optional[bool] = None,
) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api put-object-retention --bucket {bucket} --key {key} "
@ -566,6 +675,9 @@ class AwsCliClient(S3ClientWrapper):
legal_hold_status: Literal["ON", "OFF"],
version_id: Optional[str] = None,
) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
legal_hold = json.dumps({"Status": legal_hold_status})
cmd = (
@ -576,6 +688,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put object tagging")
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
tagging = {"TagSet": tags}
version = f" --version-id {version_id}" if version_id else ""
@ -587,6 +702,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get object tagging")
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api get-object-tagging --bucket {bucket} --key {key} "
@ -598,6 +716,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object tagging")
def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} "
@ -613,6 +734,9 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None,
metadata: Optional[dict] = None,
) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
)
@ -633,6 +757,9 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None,
metadata: Optional[dict] = None,
) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3 cp {dir_path} s3://{bucket} "
f"--endpoint-url {self.s3gate_endpoint} --recursive --profile {self.profile}"
@ -648,6 +775,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Create multipart upload S3")
def create_multipart_upload(self, bucket: str, key: str) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api create-multipart-upload --bucket {bucket} "
f"--key {key} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -661,6 +791,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List multipart uploads S3")
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api list-multipart-uploads --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -671,6 +804,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Abort multipart upload S3")
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api abort-multipart-upload --bucket {bucket} "
f"--key {key} --upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -679,6 +815,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Upload part S3")
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
@ -691,6 +830,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Upload copy part S3")
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
@ -704,6 +846,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List parts S3")
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api list-parts --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -717,6 +862,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
file_path = os.path.join(os.getcwd(), ASSETS_DIR, "parts.json")
parts_dict = {"Parts": [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]}
@ -737,6 +885,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {bucket} "
f"--object-lock-configuration '{json.dumps(configuration)}' --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -746,6 +897,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get object lock configuration")
def get_object_lock_configuration(self, bucket: str):
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -754,6 +908,45 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output)
return response.get("ObjectLockConfiguration")
@reporter.step("Put bucket lifecycle configuration")
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Get bucket lifecycle configuration")
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Delete bucket lifecycle configuration")
def delete_bucket_lifecycle(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = (
f"aws {self.common_flags} s3api delete-bucket-lifecycle --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@staticmethod
def _to_json(output: str) -> dict:
json_output = {}
@ -975,7 +1168,7 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output)
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
assert response["Policy"].get("Arn") == policy_arn, f"PolicyArn should be equal to {policy_arn}"
return response
@ -1256,3 +1449,90 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output)
return response
# MFA METHODS
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple:
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
return serial_number, False
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\
--authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}"
if duration_seconds:
cmd += f" --duration-seconds {duration_seconds}"
if serial_number:
cmd += f" --serial-number {serial_number}"
if token_code:
cmd += f" --token-code {token_code}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

File diff suppressed because it is too large Load diff

View file

@ -58,6 +58,10 @@ class S3ClientWrapper(HumanReadableABC):
def set_endpoint(self, s3gate_endpoint: str):
"""Set endpoint"""
@abstractmethod
def set_iam_endpoint(self, iam_endpoint: str):
"""Set iam endpoint"""
@abstractmethod
def create_bucket(
self,
@ -191,7 +195,9 @@ class S3ClientWrapper(HumanReadableABC):
"""
@abstractmethod
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
A 200 OK response can contain valid or invalid XML. Make sure to design your application
@ -366,6 +372,18 @@ class S3ClientWrapper(HumanReadableABC):
def delete_object_tagging(self, bucket: str, key: str) -> None:
"""Removes the entire tag set from the specified object."""
@abstractmethod
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
"""Adds or updates bucket lifecycle configuration"""
@abstractmethod
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
"""Gets bucket lifecycle configuration"""
@abstractmethod
def delete_bucket_lifecycle(self, bucket: str) -> dict:
"""Deletes bucket lifecycle"""
@abstractmethod
def get_object_attributes(
self,
@ -408,7 +426,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Adds the specified user to the specified group"""
@abstractmethod
def iam_attach_group_policy(self, group: str, policy_arn: str) -> dict:
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
"""Attaches the specified managed policy to the specified IAM group"""
@abstractmethod
@ -562,3 +580,32 @@ class S3ClientWrapper(HumanReadableABC):
@abstractmethod
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
"""Removes the specified tags from the user"""
# MFA methods
@abstractmethod
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
"""Creates a new virtual MFA device"""
@abstractmethod
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
"""Deactivates the specified MFA device and removes it from association with the user name"""
@abstractmethod
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
"""Deletes a virtual MFA device"""
@abstractmethod
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
"""Enables the specified MFA device and associates it with the specified IAM user"""
@abstractmethod
def iam_list_virtual_mfa_devices(self) -> dict:
"""Lists the MFA devices for an IAM user"""
@abstractmethod
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
"""Get session token for user"""

View file

@ -1,15 +1,18 @@
import logging
import subprocess
import tempfile
from contextlib import nullcontext
from datetime import datetime
from typing import IO, Optional
import pexpect
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import MORE_LOG
from frostfs_testlib.shell.interfaces import CommandInspector, CommandOptions, CommandResult, Shell
logger = logging.getLogger("frostfs.testlib.shell")
step_context = reporter.step if MORE_LOG == "1" else nullcontext
class LocalShell(Shell):
@ -28,7 +31,7 @@ class LocalShell(Shell):
for inspector in [*self.command_inspectors, *extra_inspectors]:
command = inspector.inspect(original_command, command)
with reporter.step(f"Executing command: {command}"):
with step_context(f"Executing command: {command}"):
if options.interactive_inputs:
return self._exec_interactive(command, options)
return self._exec_non_interactive(command, options)

View file

@ -327,13 +327,6 @@ def _parse_cid(output: str) -> str:
return splitted[1]
@reporter.step("Search container by name")
def search_container_by_name(name: str, node: ClusterNode):
resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver.resolve(node, name)
@reporter.step("Search for nodes with a container")
def search_nodes_with_container(
wallet: WalletInfo,

View file

@ -15,7 +15,7 @@ from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import wait_for_success
from frostfs_testlib.utils import json_utils
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
from frostfs_testlib.utils.cli_utils import parse_netmap_output
from frostfs_testlib.utils.file_utils import TestFile
logger = logging.getLogger("NeoLogger")
@ -623,25 +623,20 @@ def head_object(
# If response is Complex Object header, it has `splitId` key
if "splitId" in decoded.keys():
logger.info("decoding split header")
return json_utils.decode_split_header(decoded)
# If response is Last or Linking Object header,
# it has `header` dictionary and non-null `split` dictionary
if "split" in decoded["header"].keys():
if decoded["header"]["split"]:
logger.info("decoding linking object")
return json_utils.decode_linking_object(decoded)
if decoded["header"]["objectType"] == "STORAGE_GROUP":
logger.info("decoding storage group")
return json_utils.decode_storage_group(decoded)
if decoded["header"]["objectType"] == "TOMBSTONE":
logger.info("decoding tombstone")
return json_utils.decode_tombstone(decoded)
logger.info("decoding simple header")
return json_utils.decode_simple_header(decoded)
@ -695,11 +690,13 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
latest_block = first_line.split(":")
# taking second line from command's output contain wallet key
second_line = output.split("\n")[1]
validated_state = second_line.split(":")
return {
latest_block[0].replace(":", ""): int(latest_block[1]),
validated_state[0].replace(":", ""): int(validated_state[1]),
}
if second_line != "":
validated_state = second_line.split(":")
return {
latest_block[0].replace(":", ""): int(latest_block[1]),
validated_state[0].replace(":", ""): int(validated_state[1]),
}
return {latest_block[0].replace(":", ""): int(latest_block[1])}
@wait_for_success()

View file

@ -69,7 +69,7 @@ def get_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
@reporter.step("Tick Epoch")
def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None):
def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode] = None, delta: Optional[int] = None):
"""
Tick epoch using frostfs-adm or NeoGo if frostfs-adm is not available (DevEnv)
Args:
@ -88,12 +88,17 @@ def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH,
)
frostfs_adm.morph.force_new_epoch()
frostfs_adm.morph.force_new_epoch(delta=delta)
return
# Otherwise we tick epoch using transaction
cur_epoch = get_epoch(shell, cluster)
if delta:
next_epoch = cur_epoch + delta
else:
next_epoch = cur_epoch + 1
# Use first node by default
ir_node = cluster.services(InnerRing)[0]
# In case if no local_wallet_path is provided, we use wallet_path
@ -110,7 +115,7 @@ def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
wallet_password=ir_wallet_pass,
scripthash=get_contract_hash(morph_chain, "netmap.frostfs", shell=shell),
method="newEpoch",
arguments=f"int:{cur_epoch + 1}",
arguments=f"int:{next_epoch}",
multisig_hash=f"{ir_address}:Global",
address=ir_address,
rpc_endpoint=morph_endpoint,

View file

@ -1,8 +1,8 @@
import re
from frostfs_testlib import reporter
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.testing.test_control import wait_for_success
@reporter.step("Check metrics result")
@ -19,7 +19,7 @@ def check_metrics_counter(
counter_act += get_metrics_value(cluster_node, parse_from_command, **metrics_greps)
assert eval(
f"{counter_act} {operator} {counter_exp}"
), f"Expected: {counter_exp} {operator} Actual: {counter_act} in node: {cluster_node}"
), f"Expected: {counter_exp} {operator} Actual: {counter_act} in nodes: {cluster_nodes}"
@reporter.step("Get metrics value from node: {node}")

View file

@ -13,6 +13,7 @@ from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import datetime_utils
logger = logging.getLogger("NeoLogger")
@ -111,10 +112,7 @@ def get_netmap_snapshot(node: StorageNode, shell: Shell) -> str:
storage_wallet_path = node.get_wallet_path()
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, config_file=storage_wallet_config)
return cli.netmap.snapshot(
rpc_endpoint=node.get_rpc_endpoint(),
wallet=storage_wallet_path,
).stdout
return cli.netmap.snapshot(rpc_endpoint=node.get_rpc_endpoint(), wallet=storage_wallet_path).stdout
@reporter.step("Get shard list for {node}")
@ -202,12 +200,7 @@ def delete_node_data(node: StorageNode) -> None:
@reporter.step("Exclude node {node_to_exclude} from network map")
def exclude_node_from_network_map(
node_to_exclude: StorageNode,
alive_node: StorageNode,
shell: Shell,
cluster: Cluster,
) -> None:
def exclude_node_from_network_map(node_to_exclude: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None:
node_netmap_key = node_to_exclude.get_wallet_public_key()
storage_node_set_status(node_to_exclude, status="offline")
@ -221,12 +214,7 @@ def exclude_node_from_network_map(
@reporter.step("Include node {node_to_include} into network map")
def include_node_to_network_map(
node_to_include: StorageNode,
alive_node: StorageNode,
shell: Shell,
cluster: Cluster,
) -> None:
def include_node_to_network_map(node_to_include: StorageNode, alive_node: StorageNode, shell: Shell, cluster: Cluster) -> None:
storage_node_set_status(node_to_include, status="online")
# Per suggestion of @fyrchik we need to wait for 2 blocks after we set status and after tick epoch.
@ -236,7 +224,7 @@ def include_node_to_network_map(
tick_epoch(shell, cluster)
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
check_node_in_map(node_to_include, shell, alive_node)
await_node_in_map(node_to_include, shell, alive_node)
@reporter.step("Check node {node} in network map")
@ -250,6 +238,11 @@ def check_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[Stor
assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} to be in network map"
@wait_for_success(300, 15, title="Await node {node} in network map")
def await_node_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
check_node_in_map(node, shell, alive_node)
@reporter.step("Check node {node} NOT in network map")
def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[StorageNode] = None) -> None:
alive_node = alive_node or node
@ -276,12 +269,7 @@ def wait_for_node_to_be_ready(node: StorageNode) -> None:
@reporter.step("Remove nodes from network map trough cli-adm morph command")
def remove_nodes_from_map_morph(
shell: Shell,
cluster: Cluster,
remove_nodes: list[StorageNode],
alive_node: Optional[StorageNode] = None,
):
def remove_nodes_from_map_morph(shell: Shell, cluster: Cluster, remove_nodes: list[StorageNode], alive_node: Optional[StorageNode] = None):
"""
Move node to the Offline state in the candidates list and tick an epoch to update the netmap
using frostfs-adm
@ -300,9 +288,5 @@ def remove_nodes_from_map_morph(
if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH:
# If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
frostfsadm = FrostfsAdm(
shell=remote_shell,
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH,
)
frostfsadm = FrostfsAdm(shell=remote_shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
frostfsadm.morph.remove_nodes(node_netmap_keys)

View file

@ -7,8 +7,9 @@ from dateutil.parser import parse
from frostfs_testlib import reporter
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import search_container_by_name, search_nodes_with_container
from frostfs_testlib.steps.cli.container import search_nodes_with_container
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
@ -175,10 +176,11 @@ def search_nodes_with_bucket(
wallet: WalletInfo,
shell: Shell,
endpoint: str,
bucket_container_resolver: BucketContainerResolver,
) -> list[ClusterNode]:
cid = None
for cluster_node in cluster.cluster_nodes:
cid = search_container_by_name(name=bucket_name, node=cluster_node)
cid = bucket_container_resolver.resolve(cluster_node, bucket_name)
if cid:
break
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)

View file

@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.metrics import Metrics
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry
from frostfs_testlib.storage.dataclasses.metrics import Metrics
class ClusterNode:
@ -91,10 +91,10 @@ class ClusterNode:
config_str = yaml.dump(new_config)
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
return self.service(service_type).config
def service(self, service_type: type[ServiceClass]) -> ServiceClass:
def service(self, service_type: ServiceClass) -> ServiceClass:
"""
Get a service cluster node of specified type.

View file

@ -12,7 +12,15 @@ class ConfigAttributes:
REMOTE_WALLET_CONFIG = "remote_wallet_config_path"
ENDPOINT_DATA_0 = "endpoint_data0"
ENDPOINT_DATA_1 = "endpoint_data1"
ENDPOINT_DATA_0_NS = "endpoint_data0_namespace"
ENDPOINT_INTERNAL = "endpoint_internal0"
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
CONTROL_ENDPOINT = "control_endpoint"
UN_LOCODE = "un_locode"
class PlacementRule:
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"

View file

@ -14,6 +14,7 @@ from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_E
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IpHelper
from frostfs_testlib.steps.node_management import include_node_to_network_map, remove_nodes_from_map_morph
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
@ -39,6 +40,7 @@ class ClusterStateController:
self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: list[ClusterNode] = []
self.excluded_from_netmap: list[StorageNode] = []
self.stopped_services: set[NodeBase] = set()
self.cluster = cluster
self.healthcheck = healthcheck
@ -170,6 +172,15 @@ class ClusterStateController:
if service_type == StorageNode:
self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to all {service_type} services")
def sighup_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
if service_type == StorageNode:
self.wait_after_storage_startup()
@wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"):
@ -204,21 +215,27 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
service = node.service(service_type)
service.stop_service(mask)
self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to {service_type} service on {node}")
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type)
service.send_signal_to_service("SIGHUP")
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type)
service.start_service()
self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
def start_stopped_services_of_type(self, service_type: ServiceClass):
stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc:
return
@ -307,24 +324,17 @@ class ClusterStateController:
self.suspended_services = {}
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic(
self,
node: ClusterNode,
wakeup_timeout: int,
name_interface: str,
block_nodes: list[ClusterNode] = None,
) -> None:
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
list_ip = self._parse_interfaces(block_nodes, name_interface)
IpHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout)
self.dropped_traffic.append(node)
@reporter.step("Start traffic to {node}")
def restore_traffic(
self,
node: ClusterNode,
) -> None:
def restore_traffic(self, node: ClusterNode) -> None:
IpHelper.restore_input_traffic_to_node(node=node)
index = self.dropped_traffic.index(node)
self.dropped_traffic.pop(index)
@reporter.step("Restore blocked nodes")
def restore_all_traffic(self):
@ -408,9 +418,7 @@ class ClusterStateController:
@reporter.step("Set MaintenanceModeAllowed - {status}")
def set_maintenance_mode_allowed(self, status: str, cluster_node: ClusterNode) -> None:
frostfs_adm = FrostfsAdm(
shell=cluster_node.host.get_shell(),
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
config_file=FROSTFS_ADM_CONFIG_PATH,
shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH
)
frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}")
@ -451,6 +459,25 @@ class ClusterStateController:
else:
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
def remove_node_from_netmap(self, removes_nodes: list[StorageNode]) -> None:
alive_storage = list(set(self.cluster.storage_nodes) - set(removes_nodes))[0]
remove_nodes_from_map_morph(self.shell, self.cluster, removes_nodes, alive_storage)
self.excluded_from_netmap.extend(removes_nodes)
def include_node_to_netmap(self, include_node: StorageNode, alive_node: StorageNode):
include_node_to_network_map(include_node, alive_node, self.shell, self.cluster)
self.excluded_from_netmap.pop(self.excluded_from_netmap.index(include_node))
def include_all_excluded_nodes(self):
if not self.excluded_from_netmap:
return
alive_node = list(set(self.cluster.storage_nodes) - set(self.excluded_from_netmap))[0]
if not alive_node:
return
for exclude_node in self.excluded_from_netmap.copy():
self.include_node_to_netmap(exclude_node, alive_node)
def _get_cli(
self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode
) -> tuple[FrostfsAdm, FrostfsCli, FrostfsCli]:
@ -467,11 +494,7 @@ class ClusterStateController:
frostfs_adm = FrostfsAdm(shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
frostfs_cli = FrostfsCli(local_shell, FROSTFS_CLI_EXEC, local_wallet.config_path)
frostfs_cli_remote = FrostfsCli(
shell=shell,
frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
config_file=wallet_config_path,
)
frostfs_cli_remote = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
return frostfs_adm, frostfs_cli, frostfs_cli_remote
def _enable_date_synchronizer(self, cluster_node: ClusterNode):
@ -534,8 +557,5 @@ class ClusterStateController:
@reporter.step("Get contract by domain - {domain_name}")
def get_domain_contracts(self, cluster_node: ClusterNode, domain_name: str):
frostfs_adm = FrostfsAdm(
shell=cluster_node.host.get_shell(),
frostfs_adm_exec_path=FROSTFS_ADM_EXEC,
)
return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_endpoint(), domain_name).stdout
frostfs_adm = FrostfsAdm(shell=cluster_node.host.get_shell(), frostfs_adm_exec_path=FROSTFS_ADM_EXEC)
return frostfs_adm.morph.dump_hashes(cluster_node.morph_chain.get_http_endpoint(), domain_name).stdout

View file

@ -2,22 +2,22 @@ import json
from typing import Any
from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.testing.test_control import wait_for_success
class ShardsWatcher:
shards_snapshots: list[dict[str, Any]] = []
def __init__(self, node_under_test: ClusterNode) -> None:
self.shards_snapshots: list[dict[str, Any]] = []
self.storage_node = node_under_test.storage_node
self.take_shards_snapshot()
def take_shards_snapshot(self):
def take_shards_snapshot(self) -> None:
snapshot = self.get_shards_snapshot()
self.shards_snapshots.append(snapshot)
def get_shards_snapshot(self):
def get_shards_snapshot(self) -> dict[str, Any]:
shards_snapshot: dict[str, Any] = {}
shards = self.get_shards()
@ -26,17 +26,17 @@ class ShardsWatcher:
return shards_snapshot
def _get_current_snapshot(self):
def _get_current_snapshot(self) -> dict[str, Any]:
return self.shards_snapshots[-1]
def _get_previous_snapshot(self):
def _get_previous_snapshot(self) -> dict[str, Any]:
return self.shards_snapshots[-2]
def _is_shard_present(self, shard_id):
def _is_shard_present(self, shard_id) -> bool:
snapshot = self._get_current_snapshot()
return shard_id in snapshot
def get_shards_with_new_errors(self):
def get_shards_with_new_errors(self) -> dict[str, Any]:
current_snapshot = self._get_current_snapshot()
previous_snapshot = self._get_previous_snapshot()
shards_with_new_errors: dict[str, Any] = {}
@ -46,7 +46,7 @@ class ShardsWatcher:
return shards_with_new_errors
def get_shards_with_errors(self):
def get_shards_with_errors(self) -> dict[str, Any]:
snapshot = self.get_shards_snapshot()
shards_with_errors: dict[str, Any] = {}
for shard_id, shard in snapshot.items():
@ -55,7 +55,7 @@ class ShardsWatcher:
return shards_with_errors
def get_shard_status(self, shard_id: str):
def get_shard_status(self, shard_id: str): # -> Any:
snapshot = self.get_shards_snapshot()
assert shard_id in snapshot, f"Shard {shard_id} is missing: {snapshot}"
@ -63,18 +63,18 @@ class ShardsWatcher:
return snapshot[shard_id]["mode"]
@wait_for_success(60, 2)
def await_for_all_shards_status(self, status: str):
def await_for_all_shards_status(self, status: str) -> None:
snapshot = self.get_shards_snapshot()
for shard_id in snapshot:
assert snapshot[shard_id]["mode"] == status, f"Shard {shard_id} have wrong shard status"
@wait_for_success(60, 2)
def await_for_shard_status(self, shard_id: str, status: str):
def await_for_shard_status(self, shard_id: str, status: str) -> None:
assert self.get_shard_status(shard_id) == status
@wait_for_success(60, 2)
def await_for_shard_have_new_errors(self, shard_id: str):
def await_for_shard_have_new_errors(self, shard_id: str) -> None:
self.take_shards_snapshot()
assert self._is_shard_present(shard_id)
shards_with_new_errors = self.get_shards_with_new_errors()
@ -82,7 +82,7 @@ class ShardsWatcher:
assert shard_id in shards_with_new_errors, f"Expected shard {shard_id} to have new errors, but haven't {self.shards_snapshots[-1]}"
@wait_for_success(300, 5)
def await_for_shards_have_no_new_errors(self):
def await_for_shards_have_no_new_errors(self) -> None:
self.take_shards_snapshot()
shards_with_new_errors = self.get_shards_with_new_errors()
assert len(shards_with_new_errors) == 0
@ -102,7 +102,7 @@ class ShardsWatcher:
return json.loads(response.stdout.split(">", 1)[1])
def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True):
def set_shard_mode(self, shard_id: str, mode: str, clear_errors: bool = True) -> CommandResult:
shards_cli = FrostfsCliShards(
self.storage_node.host.get_shell(),
self.storage_node.host.get_cli_config("frostfs-cli").exec_path,

View file

@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
self.cluster = self.csc.cluster
@reporter.step("Change configuration for {service_type} on all nodes")
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]):
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
services = self.cluster.services(service_type)
nodes = self.cluster.nodes(services)
self.services_with_changed_config.update([(node, service_type) for node in nodes])
self.csc.stop_services_of_type(service_type)
if not sighup:
self.csc.stop_services_of_type(service_type)
parallel([node.config(service_type).set for node in nodes], values=values)
self.csc.start_services_of_type(service_type)
if not sighup:
self.csc.start_services_of_type(service_type)
else:
self.csc.sighup_services_of_type(service_type)
@reporter.step("Change configuration for {service_type} on {node}")
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
self.csc.start_service_of_type(node, service_type)
@reporter.step("Revert all configuration changes")
def revert_all(self):
def revert_all(self, sighup: bool = False):
if not self.services_with_changed_config:
return
parallel(self._revert_svc, self.services_with_changed_config)
parallel(self._revert_svc, self.services_with_changed_config, sighup)
self.services_with_changed_config.clear()
self.csc.start_all_stopped_services()
if not sighup:
self.csc.start_all_stopped_services()
# TODO: parallel can't have multiple parallel_items :(
@reporter.step("Revert all configuration {node_and_service}")
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]):
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
node, service_type = node_and_service
self.csc.stop_service_of_type(node, service_type)
service = node.service(service_type)
if not sighup:
self.csc.stop_service_of_type(node, service_type)
node.config(service_type).revert()
if sighup:
service.send_signal_to_service("SIGHUP")

View file

@ -26,6 +26,33 @@ class ObjectOperations(HumanReadableEnum):
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
class ContainerOperations(HumanReadableEnum):
PUT = "container.put"
GET = "container.get"
LIST = "container.list"
DELETE = "container.delete"
WILDCARD_ALL = "container.*"
@staticmethod
def get_all():
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
@dataclass
class Operations:
GET_CONTAINER = "GetContainer"
PUT_CONTAINER = "PutContainer"
DELETE_CONTAINER = "DeleteContainer"
LIST_CONTAINER = "ListContainers"
GET_OBJECT = "GetObject"
DELETE_OBJECT = "DeleteObject"
HASH_OBJECT = "HashObject"
RANGE_OBJECT = "RangeObject"
SEARCH_OBJECT = "SearchObject"
HEAD_OBJECT = "HeadObject"
PUT_OBJECT = "PutObject"
class Verb(HumanReadableEnum):
ALLOW = "allow"
DENY = "deny"
@ -47,6 +74,8 @@ class ConditionType(HumanReadableEnum):
class ConditionKey(HumanReadableEnum):
ROLE = '"\\$Actor:role"'
PUBLIC_KEY = '"\\$Actor:publicKey"'
OBJECT_TYPE = '"\\$Object:objectType"'
OBJECT_ID = '"\\$Object:objectID"'
class MatchType(HumanReadableEnum):
@ -75,6 +104,14 @@ class Condition:
def by_key(*args, **kwargs) -> "Condition":
return Condition(ConditionKey.PUBLIC_KEY, *args, **kwargs)
@staticmethod
def by_object_type(*args, **kwargs) -> "Condition":
return Condition(ConditionKey.OBJECT_TYPE, *args, **kwargs)
@staticmethod
def by_object_id(*args, **kwargs) -> "Condition":
return Condition(ConditionKey.OBJECT_ID, *args, **kwargs)
class Rule:
def __init__(
@ -99,7 +136,7 @@ class Rule:
if not operations:
self.operations = []
elif isinstance(operations, ObjectOperations):
elif isinstance(operations, (ObjectOperations, ContainerOperations)):
self.operations = [operations]
else:
self.operations = operations

View file

@ -39,12 +39,18 @@ class S3Gate(NodeBase):
def get_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0)
def get_ns_endpoint(self, ns_name: str) -> str:
return self._get_attribute(f"{ConfigAttributes.ENDPOINT_DATA_0}_namespace").format(namespace=ns_name)
def get_all_endpoints(self) -> list[str]:
return [
self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0),
self._get_attribute(ConfigAttributes.ENDPOINT_DATA_1),
]
def get_ns_endpoint(self, ns_name: str) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_DATA_0_NS).format(namespace=ns_name)
def service_healthcheck(self) -> bool:
health_metric = "frostfs_s3_gw_state_health"
output = self.host.get_shell().exec(f"curl -s localhost:8086 | grep {health_metric} | sed 1,2d").stdout

View file

@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name)
def send_signal_to_service(self, signal: str):
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
self.host.send_signal_to_service(self.name, signal)
@abstractmethod
def service_healthcheck(self) -> bool:
"""Service healthcheck."""
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
if attribute_name not in config.attributes:
if default_attribute_name is None:
raise RuntimeError(
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
)
raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
return config.attributes[default_attribute_name]
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec(
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time

View file

@ -77,3 +77,19 @@ class NodeNetInfo:
maintenance_mode_allowed: str = None
eigen_trust_alpha: str = None
eigen_trust_iterations: str = None
@dataclass
class Chunk:
def __init__(self, object_id: str, required_nodes: list, confirmed_nodes: list, ec_parent_object_id: str, ec_index: int) -> None:
self.object_id = object_id
self.required_nodes = required_nodes
self.confirmed_nodes = confirmed_nodes
self.ec_parent_object_id = ec_parent_object_id
self.ec_index = ec_index
def __str__(self) -> str:
return self.object_id
def __repr__(self) -> str:
return self.object_id

View file

@ -0,0 +1,14 @@
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.storage.grpc_operations.implementations import container, object
class CliClientWrapper(interfaces.GrpcClientWrapper):
def __init__(self, cli: FrostfsCli) -> None:
self.cli = cli
self.object: interfaces.ObjectInterface = object.ObjectOperations(self.cli)
self.container: interfaces.ContainerInterface = container.ContainerOperations(self.cli)
class RpcClientWrapper(interfaces.GrpcClientWrapper):
pass # The next series

View file

@ -0,0 +1,165 @@
import json
from typing import Optional
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo
from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.cli_utils import parse_netmap_output
class ChunksOperations(interfaces.ChunksInterface):
def __init__(self, cli: FrostfsCli) -> None:
self.cli = cli
@reporter.step("Search node without chunks")
def search_node_without_chunks(self, chunks: list[Chunk], cluster: Cluster, endpoint: str = None) -> list[ClusterNode]:
if not endpoint:
endpoint = cluster.default_rpc_endpoint
netmap = parse_netmap_output(self.cli.netmap.snapshot(endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout)
chunks_node_key = []
for chunk in chunks:
chunks_node_key.extend(chunk.confirmed_nodes)
for node_info in netmap.copy():
if node_info.node_id in chunks_node_key and node_info in netmap:
netmap.remove(node_info)
result = []
for node_info in netmap:
for cluster_node in cluster.cluster_nodes:
if node_info.node == cluster_node.host_ip:
result.append(cluster_node)
return result
@reporter.step("Search node with chunk {chunk}")
def get_chunk_node(self, cluster: Cluster, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]:
netmap = parse_netmap_output(self.cli.netmap.snapshot(cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout)
for node_info in netmap:
if node_info.node_id in chunk.confirmed_nodes:
for cluster_node in cluster.cluster_nodes:
if cluster_node.host_ip == node_info.node:
return (cluster_node, node_info)
@wait_for_success(300, 5, fail_testcase=None)
@reporter.step("Search shard with chunk {chunk}")
def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str:
oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}"
node_shell = node.storage_node.host.get_shell()
shards_watcher = ShardsWatcher(node)
with reporter.step("Search object file"):
for shard_id, shard_info in shards_watcher.shards_snapshots[-1].items():
check_dir = node_shell.exec(f" [ -d {shard_info['blobstor'][1]['path']}/{oid_path} ] && echo 1 || echo 0").stdout
if "1" in check_dir.strip():
return shard_id
@reporter.step("Get all chunks")
def get_all(
self,
rpc_endpoint: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = True,
root: bool = False,
verify_presence_all: bool = False,
json: bool = True,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> list[Chunk]:
object_nodes = self.cli.object.nodes(
rpc_endpoint=rpc_endpoint,
cid=cid,
address=address,
bearer=bearer,
generate_key=generate_key,
oid=oid,
trace=trace,
root=root,
verify_presence_all=verify_presence_all,
json=json,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])
@reporter.step("Get last parity chunk")
def get_parity(
self,
rpc_endpoint: str,
cid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = True,
root: bool = False,
verify_presence_all: bool = False,
json: bool = True,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> Chunk:
object_nodes = self.cli.object.nodes(
rpc_endpoint=rpc_endpoint,
cid=cid,
address=address,
bearer=bearer,
generate_key=generate_key,
oid=oid,
trace=trace,
root=root,
verify_presence_all=verify_presence_all,
json=json,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[-1]
@reporter.step("Get first data chunk")
def get_first_data(
self,
rpc_endpoint: str,
cid: str,
oid: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = True,
root: bool = False,
verify_presence_all: bool = False,
json: bool = True,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> Chunk:
object_nodes = self.cli.object.nodes(
rpc_endpoint=rpc_endpoint,
cid=cid,
address=address,
bearer=bearer,
generate_key=generate_key,
oid=oid,
trace=trace,
root=root,
verify_presence_all=verify_presence_all,
json=json,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
return self._parse_object_nodes(object_nodes.stdout.split("\n")[0])[0]
def _parse_object_nodes(self, object_nodes: str) -> list[Chunk]:
parse_result = json.loads(object_nodes)
if parse_result.get("errors"):
raise parse_result["errors"]
return [Chunk(**chunk) for chunk in parse_result["data_objects"]]

View file

@ -0,0 +1,330 @@
import json
import logging
import re
from typing import List, Optional, Union
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.utils import json_utils
logger = logging.getLogger("NeoLogger")
class ContainerOperations(interfaces.ContainerInterface):
def __init__(self, cli: FrostfsCli) -> None:
self.cli = cli
@reporter.step("Create Container")
def create(
self,
endpoint: str,
nns_zone: Optional[str] = None,
nns_name: Optional[str] = None,
address: Optional[str] = None,
attributes: Optional[dict] = None,
basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None,
nonce: Optional[str] = None,
policy: Optional[str] = None,
session: Optional[str] = None,
subnet: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
A wrapper for `frostfs-cli container create` call.
Args:
wallet (WalletInfo): a wallet on whose behalf a container is created
rule (optional, str): placement rule for container
basic_acl (optional, str): an ACL for container, will be
appended to `--basic-acl` key
attributes (optional, dict): container attributes , will be
appended to `--attributes` key
session_token (optional, str): a path to session token file
session_wallet(optional, str): a path to the wallet which signed
the session token; this parameter makes sense
when paired with `session_token`
shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
options (optional, dict): any other options to pass to the call
name (optional, str): container name attribute
await_mode (bool): block execution until container is persisted
wait_for_creation (): Wait for container shows in container list
timeout: Timeout for the operation.
Returns:
(str): CID of the created container
"""
result = self.cli.container.create(
rpc_endpoint=endpoint,
policy=policy,
nns_zone=nns_zone,
nns_name=nns_name,
address=address,
attributes=attributes,
basic_acl=basic_acl,
await_mode=await_mode,
disable_timestamp=disable_timestamp,
force=force,
trace=trace,
name=name,
nonce=nonce,
session=session,
subnet=subnet,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
cid = self._parse_cid(result.stdout)
logger.info("Container created; waiting until it is persisted in the sidechain")
return cid
@reporter.step("List Containers")
def list(
self,
endpoint: str,
name: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
**params,
) -> List[str]:
"""
A wrapper for `frostfs-cli container list` call. It returns all the
available containers for the given wallet.
Args:
shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
timeout: Timeout for the operation.
Returns:
(list): list of containers
"""
result = self.cli.container.list(
rpc_endpoint=endpoint,
name=name,
address=address,
generate_key=generate_key,
owner=owner,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
**params,
)
return result.stdout.split()
@reporter.step("List Objects in container")
def list_objects(
self,
endpoint: str,
cid: str,
bearer: Optional[str] = None,
wallet: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> List[str]:
"""
A wrapper for `frostfs-cli container list-objects` call. It returns all the
available objects in container.
Args:
container_id: cid of container
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
timeout: Timeout for the operation.
Returns:
(list): list of containers
"""
result = self.cli.container.list_objects(
rpc_endpoint=endpoint,
cid=cid,
bearer=bearer,
wallet=wallet,
address=address,
generate_key=generate_key,
trace=trace,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
logger.info(f"Container objects: \n{result}")
return result.stdout.split()
@reporter.step("Delete container")
def delete(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
trace: bool = False,
):
try:
return self.cli.container.delete(
rpc_endpoint=endpoint,
cid=cid,
address=address,
await_mode=await_mode,
session=session,
ttl=ttl,
xhdr=xhdr,
force=force,
trace=trace,
).stdout
except RuntimeError as e:
print(f"Error request:\n{e}")
@reporter.step("Get container")
def get(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = True,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> Union[dict, str]:
result = self.cli.container.get(
rpc_endpoint=endpoint,
cid=cid,
address=address,
generate_key=generate_key,
await_mode=await_mode,
to=to,
json_mode=json_mode,
trace=trace,
ttl=ttl,
xhdr=xhdr,
timeout=timeout,
)
container_info = json.loads(result.stdout)
attributes = dict()
for attr in container_info["attributes"]:
attributes[attr["key"]] = attr["value"]
container_info["attributes"] = attributes
container_info["ownerID"] = json_utils.json_reencode(container_info["ownerID"]["value"])
return container_info
@reporter.step("Get eacl container")
def get_eacl(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
json_mode: bool = True,
trace: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
):
return self.cli.container.get_eacl(
rpc_endpoint=endpoint,
cid=cid,
address=address,
generate_key=generate_key,
await_mode=await_mode,
to=to,
session=session,
ttl=ttl,
xhdr=xhdr,
timeout=CLI_DEFAULT_TIMEOUT,
).stdout
@reporter.step("Get nodes container")
def nodes(
self,
endpoint: str,
cid: str,
cluster: Cluster,
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> List[ClusterNode]:
result = self.cli.container.search_node(
rpc_endpoint=endpoint,
cid=cid,
address=address,
ttl=ttl,
from_file=from_file,
trace=trace,
short=short,
xhdr=xhdr,
generate_key=generate_key,
timeout=timeout,
).stdout
pattern = r"[0-9]+(?:\.[0-9]+){3}"
nodes_ip = list(set(re.findall(pattern, result)))
with reporter.step(f"nodes ips = {nodes_ip}"):
nodes_list = cluster.get_nodes_by_ip(nodes_ip)
with reporter.step(f"Return nodes - {nodes_list}"):
return nodes_list
@reporter.step("Resolve container by name")
def resolve_container_by_name(name: str, node: ClusterNode):
resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver.resolve(node, name)
def _parse_cid(self, output: str) -> str:
"""
Parses container ID from a given CLI output. The input string we expect:
container ID: 2tz86kVTDpJxWHrhw3h6PbKMwkLtBEwoqhHQCKTre1FN
awaiting...
container has been persisted on sidechain
We want to take 'container ID' value from the string.
Args:
output (str): CLI output to parse
Returns:
(str): extracted CID
"""
try:
# taking first line from command's output
first_line = output.split("\n")[0]
except Exception:
first_line = ""
logger.error(f"Got empty output: {output}")
splitted = first_line.split(": ")
if len(splitted) != 2:
raise ValueError(f"no CID was parsed from command output: \t{first_line}")
return splitted[1]

View file

@ -0,0 +1,624 @@
import json
import logging
import os
import re
import uuid
from typing import Any, Optional
from frostfs_testlib import reporter, utils
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.storage.grpc_operations.implementations.chunks import ChunksOperations
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import cli_utils, file_utils
logger = logging.getLogger("NeoLogger")
class ObjectOperations(interfaces.ObjectInterface):
def __init__(self, cli: FrostfsCli) -> None:
self.cli = cli
self.chunks: interfaces.ChunksInterface = ChunksOperations(self.cli)
@reporter.step("Delete object")
def delete(
self,
cid: str,
oid: str,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
DELETE an Object.
Args:
cid: ID of Container where we get the Object from
oid: ID of Object we are going to delete
bearer: path to Bearer Token file, appends to `--bearer` key
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
(str): Tombstone ID
"""
result = self.cli.object.delete(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
xhdr=xhdr,
session=session,
timeout=timeout,
)
id_str = result.stdout.split("\n")[1]
tombstone = id_str.split(":")[1]
return tombstone.strip()
@reporter.step("Get object")
def get(
self,
cid: str,
oid: str,
endpoint: str,
bearer: Optional[str] = None,
write_object: Optional[str] = None,
xhdr: Optional[dict] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> file_utils.TestFile:
"""
GET from FrostFS.
Args:
cid (str): ID of Container where we get the Object from
oid (str): Object ID
bearer: path to Bearer Token file, appends to `--bearer` key
write_object: path to downloaded file, appends to `--file` key
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
no_progress(optional, bool): do not show progress bar
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
(str): path to downloaded file
"""
if not write_object:
write_object = str(uuid.uuid4())
test_file = file_utils.TestFile(os.path.join(ASSETS_DIR, write_object))
self.cli.object.get(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
file=test_file,
bearer=bearer,
no_progress=no_progress,
xhdr=xhdr,
session=session,
timeout=timeout,
)
return test_file
@reporter.step("Get object from random node")
def get_from_random_node(
self,
cid: str,
oid: str,
cluster: Cluster,
bearer: Optional[str] = None,
write_object: Optional[str] = None,
xhdr: Optional[dict] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
GET from FrostFS random storage node
Args:
cid: ID of Container where we get the Object from
oid: Object ID
cluster: cluster object
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
write_object (optional, str): path to downloaded file, appends to `--file` key
no_progress(optional, bool): do not show progress bar
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
(str): path to downloaded file
"""
endpoint = cluster.get_random_storage_rpc_endpoint()
return self.get(
cid,
oid,
endpoint,
bearer,
write_object,
xhdr,
no_progress,
session,
timeout,
)
@reporter.step("Get hash object")
def hash(
self,
rpc_endpoint: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
range: Optional[str] = None,
salt: Optional[str] = None,
ttl: Optional[int] = None,
session: Optional[str] = None,
hash_type: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
Get object hash.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
generate_key: Generate new private key.
oid: Object ID.
range: Range to take hash from in the form offset1:length1,...
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
salt: Salt in hex format.
ttl: TTL value in request meta header (default 2).
session: Filepath to a JSON- or binary-encoded token of the object RANGEHASH session.
hash_type: Hash type. Either 'sha256' or 'tz' (default "sha256").
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
result = self.cli.object.hash(
rpc_endpoint=rpc_endpoint,
cid=cid,
oid=oid,
address=address,
bearer=bearer,
generate_key=generate_key,
range=range,
salt=salt,
ttl=ttl,
xhdr=xhdr,
session=session,
hash_type=hash_type,
timeout=timeout,
)
return result.stdout
@reporter.step("Head object")
def head(
self,
cid: str,
oid: str,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
json_output: bool = True,
is_raw: bool = False,
is_direct: bool = False,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> CommandResult | Any:
"""
HEAD an Object.
Args:
cid (str): ID of Container where we get the Object from
oid (str): ObjectID to HEAD
bearer (optional, str): path to Bearer Token file, appends to `--bearer` key
endpoint(optional, str): FrostFS endpoint to send request to
json_output(optional, bool): return response in JSON format or not; this flag
turns into `--json` key
is_raw(optional, bool): send "raw" request or not; this flag
turns into `--raw` key
is_direct(optional, bool): send request directly to the node or not; this flag
turns into `--ttl 1` key
xhdr (optional, dict): Request X-Headers in form of Key=Value
session (optional, dict): path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
depending on the `json_output` parameter value, the function returns
(dict): HEAD response in JSON format
or
(str): HEAD response as a plain text
"""
result = self.cli.object.head(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
json_mode=json_output,
raw=is_raw,
ttl=1 if is_direct else None,
xhdr=xhdr,
session=session,
timeout=timeout,
)
if not json_output:
return result
try:
decoded = json.loads(result.stdout)
except Exception as exc:
# If we failed to parse output as JSON, the cause might be
# the plain text string in the beginning of the output.
# Here we cut off first string and try to parse again.
logger.info(f"failed to parse output: {exc}")
logger.info("parsing output in another way")
fst_line_idx = result.stdout.find("\n")
decoded = json.loads(result.stdout[fst_line_idx:])
# if response
if "chunks" in decoded.keys():
logger.info("decoding ec chunks")
return decoded["chunks"]
# If response is Complex Object header, it has `splitId` key
if "splitId" in decoded.keys():
logger.info("decoding split header")
return utils.json_utils.decode_split_header(decoded)
# If response is Last or Linking Object header,
# it has `header` dictionary and non-null `split` dictionary
if "split" in decoded["header"].keys():
if decoded["header"]["split"]:
logger.info("decoding linking object")
return utils.json_utils.decode_linking_object(decoded)
if decoded["header"]["objectType"] == "STORAGE_GROUP":
logger.info("decoding storage group")
return utils.json_utils.decode_storage_group(decoded)
if decoded["header"]["objectType"] == "TOMBSTONE":
logger.info("decoding tombstone")
return utils.json_utils.decode_tombstone(decoded)
logger.info("decoding simple header")
return utils.json_utils.decode_simple_header(decoded)
@reporter.step("Lock Object")
def lock(
self,
cid: str,
oid: str,
endpoint: str,
lifetime: Optional[int] = None,
expire_at: Optional[int] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
Locks object in container.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
oid: Object ID.
lifetime: Lock lifetime.
expire_at: Lock expiration epoch.
shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
session: Path to a JSON-encoded container session token.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation.
Returns:
Lock object ID
"""
result = self.cli.object.lock(
rpc_endpoint=endpoint,
lifetime=lifetime,
expire_at=expire_at,
address=address,
cid=cid,
oid=oid,
bearer=bearer,
xhdr=xhdr,
session=session,
ttl=ttl,
timeout=timeout,
)
# Splitting CLI output to separate lines and taking the penultimate line
id_str = result.stdout.strip().split("\n")[0]
oid = id_str.split(":")[1]
return oid.strip()
@reporter.step("Put object")
def put(
self,
path: str,
cid: str,
endpoint: str,
bearer: Optional[str] = None,
copies_number: Optional[int] = None,
attributes: Optional[dict] = None,
xhdr: Optional[dict] = None,
expire_at: Optional[int] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
PUT of given file.
Args:
path: path to file to be PUT
cid: ID of Container where we get the Object from
bearer: path to Bearer Token file, appends to `--bearer` key
copies_number: Number of copies of the object to store within the RPC call
attributes: User attributes in form of Key1=Value1,Key2=Value2
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
no_progress: do not show progress bar
expire_at: Last epoch in the life of the object
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
(str): ID of uploaded Object
"""
result = self.cli.object.put(
rpc_endpoint=endpoint,
file=path,
cid=cid,
attributes=attributes,
bearer=bearer,
copies_number=copies_number,
expire_at=expire_at,
no_progress=no_progress,
xhdr=xhdr,
session=session,
timeout=timeout,
)
# Splitting CLI output to separate lines and taking the penultimate line
id_str = result.stdout.strip().split("\n")[-2]
oid = id_str.split(":")[1]
return oid.strip()
@reporter.step("Put object to random node")
def put_to_random_node(
self,
path: str,
cid: str,
cluster: Cluster,
bearer: Optional[str] = None,
copies_number: Optional[int] = None,
attributes: Optional[dict] = None,
xhdr: Optional[dict] = None,
expire_at: Optional[int] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
"""
PUT of given file to a random storage node.
Args:
path: path to file to be PUT
cid: ID of Container where we get the Object from
cluster: cluster under test
bearer: path to Bearer Token file, appends to `--bearer` key
copies_number: Number of copies of the object to store within the RPC call
attributes: User attributes in form of Key1=Value1,Key2=Value2
cluster: cluster under test
no_progress: do not show progress bar
expire_at: Last epoch in the life of the object
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
ID of uploaded Object
"""
endpoint = cluster.get_random_storage_rpc_endpoint()
return self.put(
path,
cid,
endpoint,
bearer,
copies_number,
attributes,
xhdr,
expire_at,
no_progress,
session,
timeout=timeout,
)
@reporter.step("Get Range")
def range(
self,
cid: str,
oid: str,
range_cut: str,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> tuple[file_utils.TestFile, bytes]:
"""
GETRANGE an Object.
Args:
wallet: wallet on whose behalf GETRANGE is done
cid: ID of Container where we get the Object from
oid: ID of Object we are going to request
range_cut: range to take data from in the form offset:length
shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
bearer: path to Bearer Token file, appends to `--bearer` key
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
timeout: Timeout for the operation.
Returns:
(str, bytes) - path to the file with range content and content of this file as bytes
"""
test_file = file_utils.TestFile(os.path.join(ASSETS_DIR, str(uuid.uuid4())))
self.cli.object.range(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
range=range_cut,
file=test_file,
bearer=bearer,
xhdr=xhdr,
session=session,
timeout=timeout,
)
with open(test_file, "rb") as file:
content = file.read()
return test_file, content
@reporter.step("Search object")
def search(
self,
cid: str,
endpoint: str,
bearer: str = "",
oid: Optional[str] = None,
filters: Optional[dict] = None,
expected_objects_list: Optional[list] = None,
xhdr: Optional[dict] = None,
session: Optional[str] = None,
phy: bool = False,
root: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
ttl: Optional[int] = None,
) -> list:
"""
SEARCH an Object.
Args:
wallet: wallet on whose behalf SEARCH is done
cid: ID of Container where we get the Object from
shell: executor for cli command
bearer: path to Bearer Token file, appends to `--bearer` key
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
filters: key=value pairs to filter Objects
expected_objects_list: a list of ObjectIDs to compare found Objects with
xhdr: Request X-Headers in form of Key=Value
session: path to a JSON-encoded container session token
phy: Search physically stored objects.
root: Search for user objects.
timeout: Timeout for the operation.
Returns:
list of found ObjectIDs
"""
result = self.cli.object.search(
rpc_endpoint=endpoint,
cid=cid,
bearer=bearer,
oid=oid,
xhdr=xhdr,
filters=[f"{filter_key} EQ {filter_val}" for filter_key, filter_val in filters.items()] if filters else None,
session=session,
phy=phy,
root=root,
address=address,
generate_key=generate_key,
ttl=ttl,
timeout=timeout,
)
found_objects = re.findall(r"(\w{43,44})", result.stdout)
if expected_objects_list:
if sorted(found_objects) == sorted(expected_objects_list):
logger.info(f"Found objects list '{found_objects}' " f"is equal for expected list '{expected_objects_list}'")
else:
logger.warning(f"Found object list {found_objects} " f"is not equal to expected list '{expected_objects_list}'")
return found_objects
@wait_for_success()
@reporter.step("Search object nodes")
def nodes(
self,
cluster: Cluster,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[ClusterNode]:
endpoint = alive_node.storage_node.get_rpc_endpoint()
response = self.cli.object.nodes(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
json=True,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
response_json = json.loads(response.stdout)
# Currently, the command will show expected and confirmed nodes.
# And we (currently) count only nodes which are both expected and confirmed
object_nodes_id = {
required_node
for data_object in response_json["data_objects"]
for required_node in data_object["required_nodes"]
if required_node in data_object["confirmed_nodes"]
}
netmap_nodes_list = cli_utils.parse_netmap_output(
self.cli.netmap.snapshot(
rpc_endpoint=endpoint,
).stdout
)
netmap_nodes = [
netmap_node for object_node in object_nodes_id for netmap_node in netmap_nodes_list if object_node == netmap_node.node_id
]
object_nodes = [
cluster_node
for netmap_node in netmap_nodes
for cluster_node in cluster.cluster_nodes
if netmap_node.node == cluster_node.host_ip
]
return object_nodes

View file

@ -0,0 +1,392 @@
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.constants import PlacementRule
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo
from frostfs_testlib.utils import file_utils
class ChunksInterface(ABC):
@abstractmethod
def search_node_without_chunks(self, chunks: list[Chunk], cluster: Cluster, endpoint: str = None) -> list[ClusterNode]:
pass
@abstractmethod
def get_chunk_node(self, cluster: Cluster, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]:
pass
@abstractmethod
def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str:
pass
@abstractmethod
def get_all(
self,
rpc_endpoint: str,
cid: str,
oid: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,
json: bool = True,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> list[Chunk]:
pass
@abstractmethod
def get_parity(
self,
rpc_endpoint: str,
cid: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,
json: bool = True,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> Chunk:
pass
@abstractmethod
def get_first_data(
self,
rpc_endpoint: str,
cid: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,
json: bool = True,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> Chunk:
pass
class ObjectInterface(ABC):
def __init__(self) -> None:
self.chunks: ChunksInterface
@abstractmethod
def delete(
self,
cid: str,
oid: str,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> str:
pass
@abstractmethod
def get(
self,
cid: str,
oid: str,
endpoint: str,
bearer: Optional[str] = None,
write_object: Optional[str] = None,
xhdr: Optional[dict] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> file_utils.TestFile:
pass
@abstractmethod
def get_from_random_node(
self,
cid: str,
oid: str,
cluster: Cluster,
bearer: Optional[str] = None,
write_object: Optional[str] = None,
xhdr: Optional[dict] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> str:
pass
@abstractmethod
def hash(
self,
endpoint: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
range: Optional[str] = None,
salt: Optional[str] = None,
ttl: Optional[int] = None,
session: Optional[str] = None,
hash_type: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> str:
pass
@abstractmethod
def head(
self,
cid: str,
oid: str,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
json_output: bool = True,
is_raw: bool = False,
is_direct: bool = False,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> CommandResult | Any:
pass
@abstractmethod
def lock(
self,
cid: str,
oid: str,
endpoint: str,
lifetime: Optional[int] = None,
expire_at: Optional[int] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> str:
pass
@abstractmethod
def put(
self,
path: str,
cid: str,
endpoint: str,
bearer: Optional[str] = None,
copies_number: Optional[int] = None,
attributes: Optional[dict] = None,
xhdr: Optional[dict] = None,
expire_at: Optional[int] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> str:
pass
@abstractmethod
def put_to_random_node(
self,
path: str,
cid: str,
cluster: Cluster,
bearer: Optional[str] = None,
copies_number: Optional[int] = None,
attributes: Optional[dict] = None,
xhdr: Optional[dict] = None,
expire_at: Optional[int] = None,
no_progress: bool = True,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> str:
pass
@abstractmethod
def range(
self,
cid: str,
oid: str,
range_cut: str,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
) -> tuple[file_utils.TestFile, bytes]:
pass
@abstractmethod
def search(
self,
cid: str,
endpoint: str,
bearer: str = "",
oid: Optional[str] = None,
filters: Optional[dict] = None,
expected_objects_list: Optional[list] = None,
xhdr: Optional[dict] = None,
session: Optional[str] = None,
phy: bool = False,
root: bool = False,
timeout: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
ttl: Optional[int] = None,
) -> List:
pass
@abstractmethod
def nodes(
self,
cluster: Cluster,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = None,
) -> List[ClusterNode]:
pass
class ContainerInterface(ABC):
@abstractmethod
def create(
self,
endpoint: str,
nns_zone: Optional[str] = None,
nns_name: Optional[str] = None,
address: Optional[str] = None,
attributes: Optional[dict] = None,
basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
force: bool = False,
trace: bool = False,
name: Optional[str] = None,
nonce: Optional[str] = None,
policy: Optional[str] = None,
session: Optional[str] = None,
subnet: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> str:
"""
Create a new container and register it in the FrostFS.
It will be stored in the sidechain when the Inner Ring accepts it.
"""
raise NotImplementedError("No implemethed method create")
@abstractmethod
def delete(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
trace: bool = False,
) -> List[str]:
"""
Delete an existing container.
Only the owner of the container has permission to remove the container.
"""
raise NotImplementedError("No implemethed method delete")
@abstractmethod
def get(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = True,
trace: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> List[str]:
"""Get container field info."""
raise NotImplementedError("No implemethed method get")
@abstractmethod
def get_eacl(
self,
endpoint: str,
cid: str,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
await_mode: bool = False,
json_mode: bool = True,
trace: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> List[str]:
"""Get extended ACL table of container."""
raise NotImplementedError("No implemethed method get-eacl")
@abstractmethod
def list(
self,
endpoint: str,
name: Optional[str] = None,
address: Optional[str] = None,
generate_key: Optional[bool] = None,
trace: bool = False,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
**params,
) -> List[str]:
"""List all created containers."""
raise NotImplementedError("No implemethed method list")
@abstractmethod
def nodes(
self,
endpoint: str,
cid: str,
cluster: Cluster,
address: Optional[str] = None,
ttl: Optional[int] = None,
from_file: Optional[str] = None,
trace: bool = False,
short: Optional[bool] = True,
xhdr: Optional[dict] = None,
generate_key: Optional[bool] = None,
timeout: Optional[str] = None,
) -> List[ClusterNode]:
"""Show the nodes participating in the container in the current epoch."""
raise NotImplementedError("No implemethed method nodes")
class GrpcClientWrapper(ABC):
def __init__(self) -> None:
self.object: ObjectInterface
self.container: ContainerInterface

View file

@ -25,12 +25,8 @@ class ClusterTestBase:
for _ in range(epochs_to_tick):
self.tick_epoch(alive_node, wait_block)
def tick_epoch(
self,
alive_node: Optional[StorageNode] = None,
wait_block: int = None,
):
epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node)
def tick_epoch(self, alive_node: Optional[StorageNode] = None, wait_block: int = None, delta: Optional[int] = None):
epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node, delta=delta)
if wait_block:
self.wait_for_blocks(wait_block)

View file

@ -1,10 +1,23 @@
import itertools
import traceback
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Callable, Collection, Optional, Union
MAX_WORKERS = 50
@contextmanager
def parallel_workers_limit(workers_count: int):
global MAX_WORKERS
original_value = MAX_WORKERS
MAX_WORKERS = workers_count
try:
yield
finally:
MAX_WORKERS = original_value
def parallel(
fn: Union[Callable, list[Callable]],
parallel_items: Optional[Collection] = None,
@ -43,7 +56,42 @@ def parallel(
# Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()]
if exceptions:
message = "\n".join([str(e) for e in exceptions])
# Prettify exception in parallel with all underlying stack traces
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
#
# RuntimeError: The following exceptions occured during parallel run:
# 1) Exception one text
# 2) Exception two text
# 3) Exception three text
# TRACES:
# ==== 1 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception one text")
# RuntimeError: Exception one text
#
# ==== 2 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception two text")
# RuntimeError: Exception two text
#
# ==== 3 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception three text")
# RuntimeError: Exception three text
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
stack_traces = "\n".join(
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
)
message = f"{short_summary}\nTRACES:\n{stack_traces}"
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
return futures

View file

@ -1,13 +1,16 @@
import inspect
import logging
import os
from functools import wraps
from time import sleep, time
from typing import Any
import yaml
from _pytest.outcomes import Failed
from pytest import fail
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils.func_utils import format_by_args
logger = logging.getLogger("NeoLogger")
@ -128,6 +131,42 @@ def run_optionally(enabled: bool, mock_value: Any = True):
return deco
def cached_fixture(enabled: bool):
"""
Decorator to cache fixtures.
MUST be placed after @pytest.fixture and before @allure decorators.
Args:
enabled: if true, decorated func will be cached.
"""
def deco(func):
@wraps(func)
def func_impl(*a, **kw):
# TODO: *a and *kw should be parsed to some kind of hashsum and used in filename to prevent cache load from different parameters
cache_file = os.path.join(ASSETS_DIR, f"fixture_cache_{func.__name__}.yml")
if enabled and os.path.exists(cache_file):
with open(cache_file, "r") as cache_input:
return yaml.load(cache_input, Loader=yaml.Loader)
result = func(*a, **kw)
if enabled:
with open(cache_file, "w") as cache_output:
yaml.dump(result, cache_output)
return result
# TODO: cache yielding fixtures
@wraps(func)
def gen_impl(*a, **kw):
raise NotImplementedError("Not implemented for yielding fixtures")
return gen_impl if inspect.isgeneratorfunction(func) else func_impl
return deco
def wait_for_success(
max_wait_time: int = 60,
interval: int = 1,

View file

@ -9,13 +9,12 @@ import csv
import json
import logging
import re
import subprocess
import sys
from contextlib import suppress
from datetime import datetime
from io import StringIO
from textwrap import shorten
from typing import Dict, List, Optional, TypedDict, Union
from typing import Any, Optional, Union
import pexpect
@ -75,22 +74,75 @@ def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: date
reporter.attach(command_attachment, "Command execution")
def log_command_execution(url: str, cmd: str, output: Union[str, dict], params: Optional[dict] = None) -> None:
def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[dict] = None, **kwargs) -> None:
logger.info(f"{cmd}: {output}")
with suppress(Exception):
json_output = json.dumps(output, indent=4, sort_keys=True)
output = json_output
if not params:
params = {}
output_params = params
try:
json_params = json.dumps(params, indent=4, sort_keys=True)
json_params = json.dumps(params, indent=4, sort_keys=True, default=str)
except TypeError as err:
logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}")
else:
params = json_params
output_params = json_params
command_attachment = f"COMMAND: '{cmd}'\n" f"URL: {url}\n" f"PARAMS:\n{params}\n" f"OUTPUT:\n{output}\n"
reporter.attach(command_attachment, "Command execution")
output = json.dumps(output, indent=4, sort_keys=True, default=str)
command_execution = f"COMMAND: '{cmd}'\n" f"URL: {kwargs['endpoint']}\n" f"PARAMS:\n{output_params}\n" f"OUTPUT:\n{output}\n"
aws_command = _convert_request_to_aws_cli_command(cmd, params, **kwargs)
reporter.attach(command_execution, "Command execution")
reporter.attach(aws_command, "AWS CLI Command")
def _convert_request_to_aws_cli_command(command: str, params: dict, **kwargs) -> str:
overriden_names = [_convert_json_name_to_aws_cli(name) for name in kwargs.keys()]
command = command.replace("_", "-")
options = []
for name, value in params.items():
name = _convert_json_name_to_aws_cli(name)
# To override parameters for AWS CLI
if name in overriden_names:
continue
if option := _create_option(name, value):
options.append(option)
for name, value in kwargs.items():
name = _convert_json_name_to_aws_cli(name)
if option := _create_option(name, value):
options.append(option)
options = " ".join(options)
api = "s3api" if "s3" in kwargs["endpoint"] else "iam"
return f"aws --no-verify-ssl --no-paginate {api} {command} {options}"
def _convert_json_name_to_aws_cli(name: str) -> str:
specific_names = {"CORSConfiguration": "cors-configuration"}
if aws_cli_name := specific_names.get(name):
return aws_cli_name
return re.sub(r"([a-z])([A-Z])", r"\1 \2", name).lower().replace(" ", "-").replace("_", "-")
def _create_option(name: str, value: Any) -> str | None:
if isinstance(value, bool) and value:
return f"--{name}"
if isinstance(value, dict):
value = json.dumps(value, indent=4, sort_keys=True, default=str)
return f"--{name} '{value}'"
if value:
return f"--{name} {value}"
return None
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:

View file

@ -45,7 +45,7 @@ def ensure_directory_opener(path, flags):
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file")
def generate_file(size: int) -> TestFile:
def generate_file(size: int, file_name: Optional[str] = None) -> TestFile:
"""Generates a binary file with the specified size in bytes.
Args:
@ -54,7 +54,11 @@ def generate_file(size: int) -> TestFile:
Returns:
The path to the generated file.
"""
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
if file_name is None:
file_name = string_utils.unique_name("object-")
test_file = TestFile(os.path.join(ASSETS_DIR, file_name))
with open(test_file, "wb", opener=ensure_directory_opener) as file:
file.write(os.urandom(size))
logger.info(f"File with size {size} bytes has been generated: {test_file}")

View file

@ -1,3 +1,4 @@
import itertools
import random
import re
import string
@ -7,6 +8,9 @@ ONLY_ASCII_LETTERS = string.ascii_letters
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
NON_DIGITS_AND_LETTERS = string.punctuation
# if unique_name is called multiple times within the same microsecond, append 0-4 to the name so it surely unique
FUSE = itertools.cycle(range(5))
def unique_name(prefix: str = "", postfix: str = ""):
"""
@ -18,7 +22,7 @@ def unique_name(prefix: str = "", postfix: str = ""):
Returns:
unique name string
"""
return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}{postfix}"
return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}{next(FUSE)}{postfix}"
def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):