Rename neofs to frostfs

Signed-off-by: Yulia Kovshova <y.kovshova@yadro.com>
This commit is contained in:
Юлия Ковшова 2023-01-10 16:02:24 +03:00 committed by Stanislav Bogatyrev
parent 5a2c7ac98d
commit 6d3b6f0f2f
83 changed files with 330 additions and 338 deletions

View file

@ -0,0 +1,4 @@
from frostfs_testlib.cli.frostfs_adm import FrostfsAdm
from frostfs_testlib.cli.frostfs_authmate import FrostfsAuthmate
from frostfs_testlib.cli.frostfs_cli import FrostfsCli
from frostfs_testlib.cli.neogo import NeoGo, NetworkType

View file

@ -0,0 +1,74 @@
from typing import Optional
from frostfs_testlib.shell import CommandOptions, CommandResult, InteractiveInput, Shell
class CliCommand:
WALLET_SOURCE_ERROR_MSG = "Provide either wallet or wallet_config to specify wallet location"
WALLET_PASSWD_ERROR_MSG = "Provide either wallet_password or wallet_config to specify password"
cli_exec_path: Optional[str] = None
__base_params: Optional[str] = None
map_params = {
"json_mode": "json",
"await_mode": "await",
"hash_type": "hash",
"doc_type": "type",
"to_address": "to",
"from_address": "from",
"to_file": "to",
"from_file": "from",
}
def __init__(self, shell: Shell, cli_exec_path: str, **base_params):
self.shell = shell
self.cli_exec_path = cli_exec_path
self.__base_params = " ".join(
[f"--{param} {value}" for param, value in base_params.items() if value]
)
def _format_command(self, command: str, **params) -> str:
param_str = []
for param, value in params.items():
if param == "post_data":
param_str.append(value)
continue
if param in self.map_params.keys():
param = self.map_params[param]
param = param.replace("_", "-")
if not value:
continue
if isinstance(value, bool):
param_str.append(f"--{param}")
elif isinstance(value, int):
param_str.append(f"--{param} {value}")
elif isinstance(value, list):
for value_item in value:
val_str = str(value_item).replace("'", "\\'")
param_str.append(f"--{param} '{val_str}'")
elif isinstance(value, dict):
param_str.append(
f'--{param} \'{",".join(f"{key}={val}" for key, val in value.items())}\''
)
else:
if "'" in str(value):
value_str = str(value).replace('"', '\\"')
param_str.append(f'--{param} "{value_str}"')
else:
param_str.append(f"--{param} '{value}'")
param_str = " ".join(param_str)
return f"{self.cli_exec_path} {self.__base_params} {command or ''} {param_str}"
def _execute(self, command: Optional[str], **params) -> CommandResult:
return self.shell.exec(self._format_command(command, **params))
def _execute_with_password(self, command: Optional[str], password, **params) -> CommandResult:
return self.shell.exec(
self._format_command(command, **params),
options=CommandOptions(
interactive_inputs=[InteractiveInput(prompt_pattern="assword", input=password)]
),
)

View file

@ -0,0 +1 @@
from frostfs_testlib.cli.frostfs_adm.adm import FrostfsAdm

View file

@ -0,0 +1,22 @@
from typing import Optional
from frostfs_testlib.cli.frostfs_adm.config import FrostfsAdmConfig
from frostfs_testlib.cli.frostfs_adm.morph import FrostfsAdmMorph
from frostfs_testlib.cli.frostfs_adm.storage_config import FrostfsAdmStorageConfig
from frostfs_testlib.cli.frostfs_adm.subnet import FrostfsAdmMorphSubnet
from frostfs_testlib.cli.frostfs_adm.version import FrostfsAdmVersion
from frostfs_testlib.shell import Shell
class FrostfsAdm:
morph: Optional[FrostfsAdmMorph] = None
subnet: Optional[FrostfsAdmMorphSubnet] = None
storage_config: Optional[FrostfsAdmStorageConfig] = None
version: Optional[FrostfsAdmVersion] = None
def __init__(self, shell: Shell, frostfs_adm_exec_path: str, config_file: Optional[str] = None):
self.config = FrostfsAdmConfig(shell, frostfs_adm_exec_path, config=config_file)
self.morph = FrostfsAdmMorph(shell, frostfs_adm_exec_path, config=config_file)
self.subnet = FrostfsAdmMorphSubnet(shell, frostfs_adm_exec_path, config=config_file)
self.storage_config = FrostfsAdmStorageConfig(shell, frostfs_adm_exec_path, config=config_file)
self.version = FrostfsAdmVersion(shell, frostfs_adm_exec_path, config=config_file)

View file

@ -0,0 +1,22 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAdmConfig(CliCommand):
def init(self, path: str = "~/.frostfs/adm/config.yml") -> CommandResult:
"""Initialize basic frostfs-adm configuration file.
Args:
path: Path to config (default ~/.frostfs/adm/config.yml).
Returns:
Command's result.
"""
return self._execute(
"config init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,356 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAdmMorph(CliCommand):
def deposit_notary(
self,
rpc_endpoint: str,
account: str,
gas: str,
storage_wallet: Optional[str] = None,
till: Optional[str] = None,
) -> CommandResult:
"""Deposit GAS for notary service.
Args:
account: Wallet account address.
gas: Amount of GAS to deposit.
rpc_endpoint: N3 RPC node endpoint.
storage_wallet: Path to storage node wallet.
till: Notary deposit duration in blocks.
Returns:
Command's result.
"""
return self._execute(
"morph deposit-notary",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_balances(
self,
rpc_endpoint: str,
alphabet: Optional[str] = None,
proxy: Optional[str] = None,
script_hash: Optional[str] = None,
storage: Optional[str] = None,
) -> CommandResult:
"""Dump GAS balances.
Args:
alphabet: Dump balances of alphabet contracts.
proxy: Dump balances of the proxy contract.
rpc_endpoint: N3 RPC node endpoint.
script_hash: Use script-hash format for addresses.
storage: Dump balances of storage nodes from the current netmap.
Returns:
Command's result.
"""
return self._execute(
"morph dump-balances",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_config(self, rpc_endpoint: str) -> CommandResult:
"""Section for morph network configuration commands.
Args:
rpc_endpoint: N3 RPC node endpoint
Returns:
Command's result.
"""
return self._execute(
"morph dump-config",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_containers(
self,
rpc_endpoint: str,
cid: Optional[str] = None,
container_contract: Optional[str] = None,
dump: str = "./testlib_dump_container",
) -> CommandResult:
"""Dump FrostFS containers to file.
Args:
cid: Containers to dump.
container_contract: Container contract hash (for networks without NNS).
dump: File where to save dumped containers (default: ./testlib_dump_container).
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph dump-containers",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_hashes(self, rpc_endpoint: str) -> CommandResult:
"""Dump deployed contract hashes.
Args:
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph dump-hashes",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def force_new_epoch(
self, rpc_endpoint: Optional[str] = None, alphabet: Optional[str] = None
) -> CommandResult:
"""Create new FrostFS epoch event in the side chain.
Args:
alphabet: Path to alphabet wallets dir.
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph force-new-epoch",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def generate_alphabet(
self,
rpc_endpoint: str,
alphabet_wallets: str,
size: int = 7,
) -> CommandResult:
"""Generate alphabet wallets for consensus nodes of the morph network.
Args:
alphabet_wallets: Path to alphabet wallets dir.
size: Amount of alphabet wallets to generate (default 7).
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph generate-alphabet",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def generate_storage_wallet(
self,
rpc_endpoint: str,
alphabet_wallets: str,
storage_wallet: str,
initial_gas: Optional[str] = None,
) -> CommandResult:
"""Generate storage node wallet for the morph network.
Args:
alphabet_wallets: Path to alphabet wallets dir.
initial_gas: Initial amount of GAS to transfer.
rpc_endpoint: N3 RPC node endpoint.
storage_wallet: Path to new storage node wallet.
Returns:
Command's result.
"""
return self._execute(
"morph generate-storage-wallet",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def init(
self,
rpc_endpoint: str,
alphabet_wallets: str,
contracts: str,
protocol: str,
container_alias_fee: int = 500,
container_fee: int = 1000,
epoch_duration: int = 240,
homomorphic_disabled: bool = False,
local_dump: Optional[str] = None,
max_object_size: int = 67108864,
) -> CommandResult:
"""Section for morph network configuration commands.
Args:
alphabet_wallets: Path to alphabet wallets dir.
container_alias_fee: Container alias fee (default 500).
container_fee: Container registration fee (default 1000).
contracts: Path to archive with compiled FrostFS contracts
(default fetched from latest github release).
epoch_duration: Amount of side chain blocks in one FrostFS epoch (default 240).
homomorphic_disabled: Disable object homomorphic hashing.
local_dump: Path to the blocks dump file.
max_object_size: Max single object size in bytes (default 67108864).
protocol: Path to the consensus node configuration.
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def refill_gas(
self,
rpc_endpoint: str,
alphabet_wallets: str,
storage_wallet: str,
gas: Optional[str] = None,
) -> CommandResult:
"""Refill GAS of storage node's wallet in the morph network
Args:
alphabet_wallets: Path to alphabet wallets dir.
gas: Additional amount of GAS to transfer.
rpc_endpoint: N3 RPC node endpoint.
storage_wallet: Path to new storage node wallet.
Returns:
Command's result.
"""
return self._execute(
"morph refill-gas",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def restore_containers(
self,
rpc_endpoint: str,
alphabet_wallets: str,
cid: str,
dump: str,
) -> CommandResult:
"""Restore FrostFS containers from file.
Args:
alphabet_wallets: Path to alphabet wallets dir.
cid: Containers to restore.
dump: File to restore containers from.
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph restore-containers",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def set_policy(
self,
rpc_endpoint: str,
alphabet_wallets: str,
exec_fee_factor: Optional[int] = None,
storage_price: Optional[int] = None,
fee_per_byte: Optional[int] = None,
) -> CommandResult:
"""Set global policy values.
Args:
alphabet_wallets: Path to alphabet wallets dir.
exec_fee_factor: ExecFeeFactor=<n1>.
storage_price: StoragePrice=<n2>.
fee_per_byte: FeePerByte=<n3>.
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
non_param_attribute = ""
if exec_fee_factor:
non_param_attribute += f"ExecFeeFactor={exec_fee_factor} "
if storage_price:
non_param_attribute += f"StoragePrice={storage_price} "
if fee_per_byte:
non_param_attribute += f"FeePerByte={fee_per_byte} "
return self._execute(
f"morph restore-containers {non_param_attribute}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "exec_fee_factor", "storage_price", "fee_per_byte"]
},
)
def update_contracts(
self,
rpc_endpoint: str,
alphabet_wallets: str,
contracts: Optional[str] = None,
) -> CommandResult:
"""Update FrostFS contracts.
Args:
alphabet_wallets: Path to alphabet wallets dir.
contracts: Path to archive with compiled FrostFS contracts
(default fetched from latest github release).
rpc_endpoint: N3 RPC node endpoint.
Returns:
Command's result.
"""
return self._execute(
"morph update-contracts",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,23 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAdmStorageConfig(CliCommand):
def set(self, account: str, wallet: str) -> CommandResult:
"""Initialize basic frostfs-adm configuration file.
Args:
account: Wallet account.
wallet: Path to wallet.
Returns:
Command's result.
"""
return self._execute(
"storage-config",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,239 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAdmMorphSubnet(CliCommand):
def create(
self, rpc_endpoint: str, address: str, wallet: str, notary: bool = False
) -> CommandResult:
"""Create FrostFS subnet.
Args:
address: Address in the wallet, optional.
notary: Flag to create subnet in notary environment.
rpc_endpoint: N3 RPC node endpoint.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet create",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def get(self, rpc_endpoint: str, subnet: str) -> CommandResult:
"""Read information about the FrostFS subnet.
Args:
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
Returns:
Command's result.
"""
return self._execute(
"morph subnet get",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def remove(
self, rpc_endpoint: str, wallet: str, subnet: str, address: Optional[str] = None
) -> CommandResult:
"""Remove FrostFS subnet.
Args:
address: Address in the wallet, optional.
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def admin_add(
self,
rpc_endpoint: str,
wallet: str,
admin: str,
subnet: str,
client: Optional[str] = None,
group: Optional[str] = None,
address: Optional[str] = None,
) -> CommandResult:
"""Add admin to the FrostFS subnet.
Args:
address: Address in the wallet, optional.
admin: Hex-encoded public key of the admin.
client: Add client admin instead of node one.
group: Client group ID in text format (needed with --client only).
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet admin add",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def admin_remove(
self,
rpc_endpoint: str,
wallet: str,
admin: str,
subnet: str,
client: Optional[str] = None,
address: Optional[str] = None,
) -> CommandResult:
"""Remove admin of the FrostFS subnet.
Args:
address: Address in the wallet, optional.
admin: Hex-encoded public key of the admin.
client: Remove client admin instead of node one.
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet admin remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def client_add(
self,
rpc_endpoint: str,
wallet: str,
subnet: str,
client: Optional[str] = None,
group: Optional[str] = None,
address: Optional[str] = None,
) -> CommandResult:
"""Add client to the FrostFS subnet.
Args:
address: Address in the wallet, optional.
client: Add client admin instead of node one.
group: Client group ID in text format (needed with --client only).
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet client add",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def client_remove(
self,
rpc_endpoint: str,
wallet: str,
client: str,
group: str,
subnet: str,
address: Optional[str] = None,
) -> CommandResult:
"""Remove client of the FrostFS subnet.
Args:
address: Address in the wallet, optional.
client: Remove client admin instead of node one.
group: ID of the client group to work with.
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet client remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def node_add(self, rpc_endpoint: str, wallet: str, node: str, subnet: str) -> CommandResult:
"""Add node to the FrostFS subnet.
Args:
node: Hex-encoded public key of the node.
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet node add",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def node_remove(self, rpc_endpoint: str, wallet: str, node: str, subnet: str) -> CommandResult:
"""Remove node from the FrostFS subnet.
Args:
node: Hex-encoded public key of the node.
rpc_endpoint: N3 RPC node endpoint.
subnet: ID of the subnet to read.
wallet: Path to file with wallet.
Returns:
Command's result.
"""
return self._execute(
"morph subnet node remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,12 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAdmVersion(CliCommand):
def get(self) -> CommandResult:
"""Application version
Returns:
Command's result.
"""
return self._execute("", version=True)

View file

@ -0,0 +1 @@
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate

View file

@ -0,0 +1,14 @@
from typing import Optional
from frostfs_testlib.cli.frostfs_authmate.secret import FrostfsAuthmateSecret
from frostfs_testlib.cli.frostfs_authmate.version import FrostfsAuthmateVersion
from frostfs_testlib.shell import Shell
class FrostfsAuthmate:
secret: Optional[FrostfsAuthmateSecret] = None
version: Optional[FrostfsAuthmateVersion] = None
def __init__(self, shell: Shell, frostfs_authmate_exec_path: str):
self.secret = FrostfsAuthmateSecret(shell, frostfs_authmate_exec_path)
self.version = FrostfsAuthmateVersion(shell, frostfs_authmate_exec_path)

View file

@ -0,0 +1,91 @@
from typing import Optional, Union
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAuthmateSecret(CliCommand):
def obtain(
self,
wallet: str,
wallet_password: str,
peer: str,
gate_wallet: str,
access_key_id: str,
address: Optional[str] = None,
gate_address: Optional[str] = None,
) -> CommandResult:
"""Obtain a secret from FrostFS network.
Args:
wallet: Path to the wallet.
wallet_password: Wallet password.
address: Address of wallet account.
peer: Address of frostfs peer to connect to.
gate_wallet: Path to the wallet.
gate_address: Address of wallet account.
access_key_id: Access key id for s3.
Returns:
Command's result.
"""
return self._execute_with_password(
"obtain-secret",
wallet_password,
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def issue(
self,
wallet: str,
wallet_password: str,
peer: str,
bearer_rules: str,
gate_public_key: Union[str, list[str]],
address: Optional[str] = None,
container_id: Optional[str] = None,
container_friendly_name: Optional[str] = None,
container_placement_policy: Optional[str] = None,
session_tokens: Optional[str] = None,
lifetime: Optional[str] = None,
container_policy: Optional[str] = None,
aws_cli_credentials: Optional[str] = None,
) -> CommandResult:
"""Obtain a secret from FrostFS network
Args:
wallet: Path to the wallet.
wallet_password: Wallet password.
address: Address of wallet account.
peer: Address of a frostfs peer to connect to.
bearer_rules: Rules for bearer token as plain json string.
gate_public_key: Public 256r1 key of a gate (send list[str] of keys to use multiple gates).
container_id: Auth container id to put the secret into.
container_friendly_name: Friendly name of auth container to put the secret into.
container_placement_policy: Placement policy of auth container to put the secret into
(default: "REP 2 IN X CBF 3 SELECT 2 FROM * AS X").
session_tokens: Create session tokens with rules, if the rules are set as 'none', no
session tokens will be created.
lifetime: Lifetime of tokens. For example 50h30m (note: max time unit is an hour so to
set a day you should use 24h). It will be ceil rounded to the nearest amount of
epoch. (default: 720h0m0s).
container_policy: Mapping AWS storage class to FrostFS storage policy as plain json string
or path to json file.
aws_cli_credentials: Path to the aws cli credential file.
Returns:
Command's result.
"""
return self._execute_with_password(
"issue-secret",
wallet_password,
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,12 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsAuthmateVersion(CliCommand):
def get(self) -> CommandResult:
"""Application version
Returns:
Command's result.
"""
return self._execute("", version=True)

View file

@ -0,0 +1 @@
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli

View file

@ -0,0 +1,30 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliAccounting(CliCommand):
def balance(
self,
wallet: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
address: Optional[str] = None,
owner: Optional[str] = None,
) -> CommandResult:
"""Get internal balance of FrostFS account
Args:
address: Address of wallet account.
owner: Owner of balance account (omit to use owner from private key).
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
wallet: WIF (NEP-2) string or path to the wallet or binary key.
Returns:
Command's result.
"""
return self._execute(
"accounting balance",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,52 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliACL(CliCommand):
def extended_create(
self, cid: str, out: str, file: Optional[str] = None, rule: Optional[list] = None
) -> CommandResult:
"""Create extended ACL from the text representation.
Rule consist of these blocks: <action> <operation> [<filter1> ...] [<target1> ...]
Action is 'allow' or 'deny'.
Operation is an object service verb: 'get', 'head', 'put', 'search', 'delete', 'getrange',
or 'getrangehash'.
Filter consists of <typ>:<key><match><value>
Typ is 'obj' for object applied filter or 'req' for request applied filter.
Key is a valid unicode string corresponding to object or request header key.
Well-known system object headers start with '$Object:' prefix.
User defined headers start without prefix.
Read more about filter keys at:
http://github.com/TrueCloudLab/frostfs-api/blob/master/proto-docs/acl.md#message-eaclrecordfilter
Match is '=' for matching and '!=' for non-matching filter.
Value is a valid unicode string corresponding to object or request header value.
Target is
'user' for container owner,
'system' for Storage nodes in container and Inner Ring nodes,
'others' for all other request senders,
'pubkey:<key1>,<key2>,...' for exact request sender, where <key> is a hex-encoded 33-byte
public key.
When both '--rule' and '--file' arguments are used, '--rule' records will be placed higher
in resulting extended ACL table.
Args:
cid: Container ID.
file: Read list of extended ACL table records from from text file.
out: Save JSON formatted extended ACL table in file.
rule: Extended ACL table record to apply.
Returns:
Command's result.
"""
return self._execute(
"acl extended create",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,38 @@
from typing import Optional
from frostfs_testlib.cli.frostfs_cli.accounting import FrostfsCliAccounting
from frostfs_testlib.cli.frostfs_cli.acl import FrostfsCliACL
from frostfs_testlib.cli.frostfs_cli.container import FrostfsCliContainer
from frostfs_testlib.cli.frostfs_cli.netmap import FrostfsCliNetmap
from frostfs_testlib.cli.frostfs_cli.object import FrostfsCliObject
from frostfs_testlib.cli.frostfs_cli.session import FrostfsCliSession
from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards
from frostfs_testlib.cli.frostfs_cli.storagegroup import FrostfsCliStorageGroup
from frostfs_testlib.cli.frostfs_cli.util import FrostfsCliUtil
from frostfs_testlib.cli.frostfs_cli.version import FrostfsCliVersion
from frostfs_testlib.shell import Shell
class FrostfsCli:
accounting: Optional[FrostfsCliAccounting] = None
acl: Optional[FrostfsCliACL] = None
container: Optional[FrostfsCliContainer] = None
netmap: Optional[FrostfsCliNetmap] = None
object: Optional[FrostfsCliObject] = None
session: Optional[FrostfsCliSession] = None
shards: Optional[FrostfsCliShards] = None
storagegroup: Optional[FrostfsCliStorageGroup] = None
util: Optional[FrostfsCliUtil] = None
version: Optional[FrostfsCliVersion] = None
def __init__(self, shell: Shell, frostfs_cli_exec_path: str, config_file: Optional[str] = None):
self.accounting = FrostfsCliAccounting(shell, frostfs_cli_exec_path, config=config_file)
self.acl = FrostfsCliACL(shell, frostfs_cli_exec_path, config=config_file)
self.container = FrostfsCliContainer(shell, frostfs_cli_exec_path, config=config_file)
self.netmap = FrostfsCliNetmap(shell, frostfs_cli_exec_path, config=config_file)
self.object = FrostfsCliObject(shell, frostfs_cli_exec_path, config=config_file)
self.session = FrostfsCliSession(shell, frostfs_cli_exec_path, config=config_file)
self.shards = FrostfsCliShards(shell, frostfs_cli_exec_path, config=config_file)
self.storagegroup = FrostfsCliStorageGroup(shell, frostfs_cli_exec_path, config=config_file)
self.util = FrostfsCliUtil(shell, frostfs_cli_exec_path, config=config_file)
self.version = FrostfsCliVersion(shell, frostfs_cli_exec_path, config=config_file)

View file

@ -0,0 +1,264 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliContainer(CliCommand):
def create(
self,
rpc_endpoint: str,
wallet: str,
address: Optional[str] = None,
attributes: Optional[dict] = None,
basic_acl: Optional[str] = None,
await_mode: bool = False,
disable_timestamp: bool = False,
name: Optional[str] = None,
nonce: Optional[str] = None,
policy: Optional[str] = None,
session: Optional[str] = None,
subnet: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Create a new container and register it in the FrostFS.
It will be stored in the sidechain when the Inner Ring accepts it.
Args:
address: Address of wallet account.
attributes: Comma separated pairs of container attributes in form of
Key1=Value1,Key2=Value2.
await_mode: Block execution until container is persisted.
basic_acl: Hex encoded basic ACL value or keywords like 'public-read-write',
'private', 'eacl-public-read' (default "private").
disable_timestamp: Disable timestamp container attribute.
name: Container name attribute.
nonce: UUIDv4 nonce value for container.
policy: QL-encoded or JSON-encoded placement policy or path to file with it.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Path to a JSON-encoded container session token.
subnet: String representation of container subnetwork.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container create",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def delete(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Delete an existing container.
Only the owner of the container has permission to remove the container.
Args:
address: Address of wallet account.
await_mode: Block execution until container is removed.
cid: Container ID.
force: Do not check whether container contains locks and remove immediately.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Path to a JSON-encoded container session token.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container delete",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def get(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
to: Optional[str] = None,
json_mode: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get container field info.
Args:
address: Address of wallet account.
await_mode: Block execution until container is removed.
cid: Container ID.
json_mode: Print or dump container in JSON format.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
to: Path to dump encoded container.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container get",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def get_eacl(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
to: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get extended ACL table of container.
Args:
address: Address of wallet account.
await_mode: Block execution until container is removed.
cid: Container ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
to: Path to dump encoded container.
session: Path to a JSON-encoded container session token.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container get-eacl",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def list(
self,
rpc_endpoint: str,
wallet: str,
address: Optional[str] = None,
owner: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
**params,
) -> CommandResult:
"""
List all created containers.
Args:
address: Address of wallet account.
owner: Owner of containers (omit to use owner from private key).
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container list",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def list_objects(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
List existing objects in container.
Args:
address: Address of wallet account.
cid: Container ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container list-objects",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def set_eacl(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
await_mode: bool = False,
table: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Set a new extended ACL table for the container.
Container ID in the EACL table will be substituted with the ID from the CLI.
Args:
address: Address of wallet account.
await_mode: Block execution until container is removed.
cid: Container ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Path to a JSON-encoded container session token.
table: Path to file with JSON or binary encoded EACL table.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"container set-eacl",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,120 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliNetmap(CliCommand):
def epoch(
self,
rpc_endpoint: str,
wallet: str,
address: Optional[str] = None,
generate_key: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Get current epoch number.
Args:
address: Address of wallet account.
generate_key: Generate new private key.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: Path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
return self._execute(
"netmap epoch",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def netinfo(
self,
rpc_endpoint: str,
wallet: str,
address: Optional[str] = None,
generate_key: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Get information about FrostFS network.
Args:
address: Address of wallet account
generate_key: Generate new private key
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>')
ttl: TTL value in request meta header (default 2)
wallet: Path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
Command's result.
"""
return self._execute(
"netmap netinfo",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def nodeinfo(
self,
rpc_endpoint: str,
wallet: str,
address: Optional[str] = None,
generate_key: bool = False,
json: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Get target node info.
Args:
address: Address of wallet account.
generate_key: Generate new private key.
json: Print node info in JSON format.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: Path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
return self._execute(
"netmap nodeinfo",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def snapshot(
self,
rpc_endpoint: str,
wallet: str,
address: Optional[str] = None,
generate_key: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Request current local snapshot of the network map.
Args:
address: Address of wallet account.
generate_key: Generate new private key.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header (default 2).
wallet: Path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
return self._execute(
"netmap snapshot",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,351 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliObject(CliCommand):
def delete(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Delete object from FrostFS.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
oid: Object ID.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object DELETE session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object delete",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def get(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
file: Optional[str] = None,
header: Optional[str] = None,
no_progress: bool = False,
raw: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get object from FrostFS.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
file: File to write object payload to. Default: stdout.
header: File to write header to. Default: stdout.
no_progress: Do not show progress bar.
oid: Object ID.
raw: Set raw request option.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object GET session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object get",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def hash(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
range: Optional[str] = None,
salt: Optional[str] = None,
ttl: Optional[int] = None,
session: Optional[str] = None,
hash_type: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get object hash.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
oid: Object ID.
range: Range to take hash from in the form offset1:length1,...
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
salt: Salt in hex format.
ttl: TTL value in request meta header (default 2).
session: Filepath to a JSON- or binary-encoded token of the object RANGEHASH session.
hash_type: Hash type. Either 'sha256' or 'tz' (default "sha256").
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object hash",
**{
param: value for param, value in locals().items() if param not in ["self", "params"]
},
)
def head(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
oid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
file: Optional[str] = None,
json_mode: bool = False,
main_only: bool = False,
proto: bool = False,
raw: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get object header.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
file: File to write object payload to. Default: stdout.
json_mode: Marshal output in JSON.
main_only: Return only main fields.
oid: Object ID.
proto: Marshal output in Protobuf.
raw: Set raw request option.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object HEAD session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object head",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def lock(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
oid: str,
lifetime: Optional[int] = None,
expire_at: Optional[int] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Lock object in container.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
oid: Object ID.
lifetime: Lock lifetime.
expire_at: Lock expiration epoch.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object PUT session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object lock",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def put(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
file: str,
address: Optional[str] = None,
attributes: Optional[dict] = None,
bearer: Optional[str] = None,
disable_filename: bool = False,
disable_timestamp: bool = False,
expire_at: Optional[int] = None,
no_progress: bool = False,
notify: Optional[str] = None,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Put object to FrostFS.
Args:
address: Address of wallet account.
attributes: User attributes in form of Key1=Value1,Key2=Value2.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
disable_filename: Do not set well-known filename attribute.
disable_timestamp: Do not set well-known timestamp attribute.
expire_at: Last epoch in the life of the object.
file: File with object payload.
no_progress: Do not show progress bar.
notify: Object notification in the form of *epoch*:*topic*; '-'
topic means using default.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object PUT session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object put",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def range(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
oid: str,
range: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
file: Optional[str] = None,
json_mode: bool = False,
raw: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Get payload range data of an object.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
file: File to write object payload to. Default: stdout.
json_mode: Marshal output in JSON.
oid: Object ID.
range: Range to take data from in the form offset:length.
raw: Set raw request option.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object RANGE session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object range",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def search(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
filters: Optional[list] = None,
oid: Optional[str] = None,
phy: bool = False,
root: bool = False,
session: Optional[str] = None,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Search object.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
filters: Repeated filter expressions or files with protobuf JSON.
oid: Object ID.
phy: Search physically stored objects.
root: Search for user objects.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
session: Filepath to a JSON- or binary-encoded token of the object SEARCH session.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object search",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,41 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliSession(CliCommand):
def create(
self,
rpc_endpoint: str,
wallet: str,
wallet_password: str,
out: str,
lifetime: Optional[int] = None,
address: Optional[str] = None,
json: Optional[bool] = False,
) -> CommandResult:
"""
Create session token.
Args:
address: Address of wallet account.
out: File to write session token to.
lifetime: Number of epochs for token to stay valid.
json: Output token in JSON.
wallet: WIF (NEP-2) string or path to the wallet or binary key.
wallet_password: Wallet password.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
Returns:
Command's result.
"""
return self._execute_with_password(
"session create",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
)

View file

@ -0,0 +1,138 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliShards(CliCommand):
def flush_cache(
self,
endpoint: str,
wallet: str,
wallet_password: str,
id: Optional[list[str]],
address: Optional[str] = None,
all: bool = False,
) -> CommandResult:
"""
Flush objects from the write-cache to the main storage.
Args:
address: Address of wallet account.
id: List of shard IDs in base58 encoding.
all: Process all shards.
endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
wallet: WIF (NEP-2) string or path to the wallet or binary key.
wallet_password: Wallet password.
Returns:
Command's result.
"""
return self._execute_with_password(
f"control shards flush-cache",
wallet_password,
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def set_mode(
self,
endpoint: str,
wallet: str,
wallet_password: str,
mode: str,
id: Optional[list[str]],
address: Optional[str] = None,
all: bool = False,
clear_errors: bool = False,
) -> CommandResult:
"""
Set work mode of the shard.
Args:
address: Address of wallet account.
id: List of shard IDs in base58 encoding.
mode: New shard mode ('degraded-read-only', 'read-only', 'read-write').
all: Process all shards.
clear_errors: Set shard error count to 0.
endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
wallet: WIF (NEP-2) string or path to the wallet or binary key.
wallet_password: Wallet password.
Returns:
Command's result.
"""
return self._execute_with_password(
f"control shards set-mode",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
)
def dump(
self,
endpoint: str,
wallet: str,
wallet_password: str,
id: str,
path: str,
address: Optional[str] = None,
no_errors: bool = False,
) -> CommandResult:
"""
Dump objects from shard to a file.
Args:
address: Address of wallet account.
no_errors: Skip invalid/unreadable objects.
id: Shard ID in base58 encoding.
path: File to write objects to.
endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
wallet: WIF (NEP-2) string or path to the wallet or binary key.
wallet_password: Wallet password.
Returns:
Command's result.
"""
return self._execute_with_password(
f"control shards dump",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
)
def list(
self,
endpoint: str,
wallet: str,
wallet_password: str,
address: Optional[str] = None,
json_mode: bool = False,
) -> CommandResult:
"""
List shards of the storage node.
Args:
address: Address of wallet account.
json_mode: Print shard info as a JSON array.
endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
wallet: WIF (NEP-2) string or path to the wallet or binary key.
wallet_password: Wallet password.
Returns:
Command's result.
"""
return self._execute_with_password(
f"control shards list",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
)

View file

@ -0,0 +1,147 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliStorageGroup(CliCommand):
def put(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
members: list[str],
ttl: Optional[int] = None,
bearer: Optional[str] = None,
lifetime: Optional[int] = None,
address: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Put storage group to FrostFS.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
members: ID list of storage group members.
lifetime: Storage group lifetime in epochs.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header.
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
members = ",".join(members)
return self._execute(
"storagegroup put",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def get(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
id: str,
raw: Optional[bool] = False,
ttl: Optional[int] = None,
bearer: Optional[str] = None,
lifetime: Optional[int] = None,
address: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Get storage group from FrostFS.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
id: Storage group identifier.
raw: Set raw request option.
lifetime: Storage group lifetime in epochs.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header.
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
return self._execute(
"storagegroup get",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def list(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
raw: Optional[bool] = False,
ttl: Optional[int] = None,
bearer: Optional[str] = None,
lifetime: Optional[int] = None,
address: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
List storage groups in FrostFS container.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
raw: Set raw request option.
lifetime: Storage group lifetime in epochs.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header.
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
return self._execute(
"storagegroup list",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def delete(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
id: str,
raw: Optional[bool] = False,
ttl: Optional[int] = None,
bearer: Optional[str] = None,
lifetime: Optional[int] = None,
address: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
Delete storage group from FrostFS.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
id: Storage group identifier.
raw: Set raw request option.
lifetime: Storage group lifetime in epochs.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
ttl: TTL value in request meta header.
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
Returns:
Command's result.
"""
return self._execute(
"storagegroup delete",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,56 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliUtil(CliCommand):
def sign_bearer_token(
self,
wallet: str,
from_file: str,
to_file: str,
address: Optional[str] = None,
json: Optional[bool] = False,
) -> CommandResult:
"""
Sign bearer token to use it in requests.
Args:
address: Address of wallet account.
from_file: File with JSON or binary encoded bearer token to sign.
to_file: File to dump signed bearer token (default: binary encoded).
json: Dump bearer token in JSON encoding.
wallet: WIF (NEP-2) string or path to the wallet or binary key.
Returns:
Command's result.
"""
return self._execute(
"util sign bearer-token",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def sign_session_token(
self,
wallet: str,
from_file: str,
to_file: str,
address: Optional[str] = None,
) -> CommandResult:
"""
Sign session token to use it in requests.
Args:
address: Address of wallet account.
from_file: File with JSON encoded session token to sign.
to_file: File to dump signed bearer token (default: binary encoded).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
Returns:
Command's result.
"""
return self._execute(
"util sign session-token",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,13 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class FrostfsCliVersion(CliCommand):
def get(self) -> CommandResult:
"""
Application version and FrostFS API compatibility.
Returns:
Command's result.
"""
return self._execute("", version=True)

View file

@ -0,0 +1,2 @@
from frostfs_testlib.cli.neogo.go import NeoGo
from frostfs_testlib.cli.neogo.network_type import NetworkType

View file

@ -0,0 +1,134 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class NeoGoCandidate(CliCommand):
def register(
self,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
gas: Optional[float] = None,
timeout: int = 10,
) -> CommandResult:
"""Register as a new candidate.
Args:
address: Address to register.
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wallet_password: Wallet password.
gas: Network fee to add to the transaction (prioritizing it).
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
exec_param["timeout"] = f"{timeout}s"
if wallet_password is not None:
return self._execute_with_password(
"wallet candidate register", wallet_password, **exec_param
)
if wallet_config:
return self._execute("wallet candidate register", **exec_param)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)
def unregister(
self,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
gas: Optional[float] = None,
timeout: int = 10,
) -> CommandResult:
"""Unregister self as a candidate.
Args:
address: Address to unregister.
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wallet_password: Wallet password.
gas: Network fee to add to the transaction (prioritizing it).
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
exec_param["timeout"] = f"{timeout}s"
if wallet_password is not None:
return self._execute_with_password(
"wallet candidate unregister", wallet_password, **exec_param
)
if wallet_config:
return self._execute("wallet candidate unregister", **exec_param)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)
def vote(
self,
address: str,
candidate: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
gas: Optional[float] = None,
timeout: int = 10,
) -> CommandResult:
"""Votes for a validator.
Voting happens by calling "vote" method of a NEO native contract. Do not provide
candidate argument to perform unvoting.
Args:
address: Address to vote from
candidate: Public key of candidate to vote for.
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wallet_password: Wallet password.
gas: Network fee to add to the transaction (prioritizing it).
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
exec_param["timeout"] = f"{timeout}s"
if wallet_password is not None:
return self._execute_with_password(
"wallet candidate vote", wallet_password, **exec_param
)
if wallet_config:
return self._execute("wallet candidate vote", **exec_param)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)

View file

@ -0,0 +1,398 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class NeoGoContract(CliCommand):
def compile(
self,
input_file: str,
out: str,
manifest: str,
config: str,
no_standards: bool = False,
no_events: bool = False,
no_permissions: bool = False,
bindings: Optional[str] = None,
) -> CommandResult:
"""Compile a smart contract to a .nef file.
Args:
input_file: Input file for the smart contract to be compiled.
out: Output of the compiled contract.
manifest: Emit contract manifest (*.manifest.json) file into separate file using
configuration input file (*.yml).
config: Configuration input file (*.yml).
no_standards: Do not check compliance with supported standards.
no_events: Do not check emitted events with the manifest.
no_permissions: Do not check if invoked contracts are allowed in manifest.
bindings: Output file for smart-contract bindings configuration.
Returns:
Command's result.
"""
return self._execute(
"contract compile",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def deploy(
self,
address: str,
input_file: str,
manifest: str,
rpc_endpoint: str,
sysgas: Optional[float] = None,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
gas: Optional[float] = None,
out: Optional[str] = None,
force: bool = False,
timeout: int = 10,
) -> CommandResult:
"""Deploy a smart contract (.nef with description)
Args:
wallet: Wallet to use to get the key for transaction signing;
conflicts with wallet_config.
wallet_config: Path to wallet config to use to get the key for transaction signing;
conflicts with wallet.
wallet_password: Wallet password.
address: Address to use as transaction signee (and gas source).
gas: Network fee to add to the transaction (prioritizing it).
sysgas: System fee to add to transaction (compensating for execution).
out: File to put JSON transaction to.
force: Do not ask for a confirmation.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
input_file: Input file for the smart contract (*.nef).
manifest: Emit contract manifest (*.manifest.json) file into separate file using
configuration input file (*.yml).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
exec_param["timeout"] = f"{timeout}s"
if wallet_password is not None:
return self._execute_with_password(
"contract deploy",
wallet_password,
**exec_param,
)
if wallet_config:
return self._execute(
"contract deploy",
**exec_param,
)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)
def generate_wrapper(
self,
out: str,
hash: str,
config: Optional[str] = None,
manifest: Optional[str] = None,
) -> CommandResult:
"""Generate wrapper to use in other contracts.
Args:
config: Configuration file to use.
manifest: Read contract manifest (*.manifest.json) file.
out: Output of the compiled contract.
hash: Smart-contract hash.
Returns:
Command's result.
"""
return self._execute(
"contract generate-wrapper",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def invokefunction(
self,
scripthash: str,
address: Optional[str] = None,
wallet: Optional[str] = None,
method: Optional[str] = None,
arguments: Optional[str] = None,
multisig_hash: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
gas: Optional[float] = None,
sysgas: Optional[float] = None,
out: Optional[str] = None,
force: bool = False,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Executes given (as a script hash) deployed script.
Script is executed with the given method, arguments and signers. Sender is included in
the list of signers by default with None witness scope. If you'd like to change default
sender's scope, specify it via signers parameter. See testinvokefunction documentation
for the details about parameters. It differs from testinvokefunction in that this command
sends an invocation transaction to the network.
Args:
scripthash: Function hash.
method: Call method.
arguments: Method arguments.
multisig_hash: Multisig hash.
wallet: Wallet to use to get the key for transaction signing;
conflicts with wallet_config.
wallet_config: Path to wallet config to use to get the key for transaction signing;
conflicts with wallet.
wallet_password: Wallet password.
address: Address to use as transaction signee (and gas source).
gas: Network fee to add to the transaction (prioritizing it).
sysgas: System fee to add to transaction (compensating for execution).
out: File to put JSON transaction to.
force: Force-push the transaction in case of bad VM state after test script invocation.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
multisig_hash = f"-- {multisig_hash}" or ""
post_data = f"{scripthash} {method or ''} {arguments or ''} {multisig_hash}"
exec_param = {
param: param_value
for param, param_value in locals().items()
if param
not in [
"self",
"scripthash",
"method",
"arguments",
"multisig_hash",
"wallet_password",
]
}
exec_param["timeout"] = f"{timeout}s"
exec_param["post_data"] = post_data
if wallet_password is not None:
return self._execute_with_password(
"contract invokefunction", wallet_password, **exec_param
)
if wallet_config:
return self._execute("contract invokefunction", **exec_param)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)
def testinvokefunction(
self,
scripthash: str,
wallet: Optional[str] = None,
wallet_password: Optional[str] = None,
method: Optional[str] = None,
arguments: Optional[str] = None,
multisig_hash: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Executes given (as a script hash) deployed script.
Script is executed with the given method, arguments and signers (sender is not included
by default). If no method is given "" is passed to the script, if no arguments are given,
an empty array is passed, if no signers are given no array is passed. If signers are
specified, the first one of them is treated as a sender. All of the given arguments are
encapsulated into array before invoking the script. The script thus should follow the
regular convention of smart contract arguments (method string and an array of other
arguments).
See more information and samples in `neo-go contract testinvokefunction --help`.
Args:
scripthash: Function hash.
wallet: Wallet to use for testinvoke.
wallet_password: Wallet password.
method: Call method.
arguments: Method arguments.
multisig_hash: Multisig hash.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
multisig_hash = f"-- {multisig_hash}" if multisig_hash else ""
post_data = f"{scripthash} {method or ''} {arguments or ''} {multisig_hash}"
exec_param = {
param: param_value
for param, param_value in locals().items()
if param
not in [
"self",
"scripthash",
"method",
"arguments",
"multisig_hash",
"wallet_password",
]
}
exec_param["timeout"] = f"{timeout}s"
exec_param["post_data"] = post_data
if wallet_password is not None:
return self._execute_with_password(
"contract testinvokefunction", wallet_password, **exec_param
)
return self._execute("contract testinvokefunction", **exec_param)
def testinvokescript(
self,
input_file: str,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Executes given compiled AVM instructions in NEF format.
Instructions are executed with the given set of signers not including sender by default.
See testinvokefunction documentation for the details about parameters.
Args:
input_file: Input location of the .nef file that needs to be invoked.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
exec_param = {
param: param_value for param, param_value in locals().items() if param not in ["self"]
}
exec_param["timeout"] = f"{timeout}s"
return self._execute(
"contract testinvokescript",
**exec_param,
)
def init(self, name: str, skip_details: bool = False) -> CommandResult:
"""Initialize a new smart-contract in a directory with boiler plate code.
Args:
name: Name of the smart-contract to be initialized.
skip_details: Skip filling in the projects and contract details.
Returns:
Command's result.
"""
return self._execute(
"contract init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def inspect(
self,
input_file: Optional[str] = None,
compile: Optional[str] = None,
) -> CommandResult:
"""Creates a user readable dump of the program instructions.
Args:
input_file: Input file of the program (either .go or .nef).
compile: Compile input file (it should be go code then).
Returns:
Command's result.
"""
return self._execute(
"contract inspect",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def calc_hash(
self,
input_file: str,
manifest: str,
sender: Optional[str] = None,
) -> CommandResult:
"""Calculates hash of a contract after deployment.
Args:
input_file: Path to NEF file.
sender: Sender script hash or address.
manifest: Path to manifest file.
Returns:
Command's result.
"""
return self._execute(
"contract calc-hash",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def add_group(
self,
manifest: str,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
sender: Optional[str] = None,
nef: Optional[str] = None,
) -> CommandResult:
"""Adds group to the manifest.
Args:
wallet: Wallet to use to get the key for transaction signing;
conflicts with wallet_config.
wallet_config: Path to wallet config to use to get the key for transaction signing;
conflicts with wallet.
wallet_password: Wallet password.
sender: Deploy transaction sender.
address: Account to sign group with.
nef: Path to the NEF file.
manifest: Path to the manifest.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
if wallet_password is not None:
return self._execute_with_password(
"contract manifest add-group", wallet_password, **exec_param
)
if wallet_config:
return self._execute("contract manifest add-group", **exec_param)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)

View file

@ -0,0 +1,69 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.cli.neogo.network_type import NetworkType
from frostfs_testlib.shell import CommandResult
class NeoGoDb(CliCommand):
def dump(
self,
config_path: str,
out: str,
network: NetworkType = NetworkType.PRIVATE,
count: int = 0,
start: int = 0,
) -> CommandResult:
"""Dump blocks (starting with block #1) to the file.
Args:
config_path: Path to config.
network: Select network type (default: private).
count: Number of blocks to be processed (default or 0: all chain) (default: 0).
start: Block number to start from (default: 0) (default: 0).
out: Output file (stdout if not given).
Returns:
Command's result.
"""
return self._execute(
"db dump",
**{network.value: True},
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def restore(
self,
config_path: str,
input_file: str,
network: NetworkType = NetworkType.PRIVATE,
count: int = 0,
dump: Optional[str] = None,
incremental: bool = False,
) -> CommandResult:
"""Dump blocks (starting with block #1) to the file.
Args:
config_path: Path to config.
network: Select network type (default: private).
count: Number of blocks to be processed (default or 0: all chain) (default: 0).
input_file: Input file (stdin if not given).
dump: Directory for storing JSON dumps.
incremental: Use if dump is incremental.
Returns:
Command's result.
"""
return self._execute(
"db restore",
**{network.value: True},
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,37 @@
from typing import Optional
from frostfs_testlib.cli.neogo.candidate import NeoGoCandidate
from frostfs_testlib.cli.neogo.contract import NeoGoContract
from frostfs_testlib.cli.neogo.db import NeoGoDb
from frostfs_testlib.cli.neogo.nep17 import NeoGoNep17
from frostfs_testlib.cli.neogo.node import NeoGoNode
from frostfs_testlib.cli.neogo.query import NeoGoQuery
from frostfs_testlib.cli.neogo.version import NeoGoVersion
from frostfs_testlib.cli.neogo.wallet import NeoGoWallet
from frostfs_testlib.shell import Shell
class NeoGo:
candidate: Optional[NeoGoCandidate] = None
contract: Optional[NeoGoContract] = None
db: Optional[NeoGoDb] = None
nep17: Optional[NeoGoNep17] = None
node: Optional[NeoGoNode] = None
query: Optional[NeoGoQuery] = None
version: Optional[NeoGoVersion] = None
wallet: Optional[NeoGoWallet] = None
def __init__(
self,
shell: Shell,
neo_go_exec_path: str,
config_path: Optional[str] = None,
):
self.candidate = NeoGoCandidate(shell, neo_go_exec_path, config_path=config_path)
self.contract = NeoGoContract(shell, neo_go_exec_path, config_path=config_path)
self.db = NeoGoDb(shell, neo_go_exec_path, config_path=config_path)
self.nep17 = NeoGoNep17(shell, neo_go_exec_path, config_path=config_path)
self.node = NeoGoNode(shell, neo_go_exec_path, config_path=config_path)
self.query = NeoGoQuery(shell, neo_go_exec_path, config_path=config_path)
self.version = NeoGoVersion(shell, neo_go_exec_path, config_path=config_path)
self.wallet = NeoGoWallet(shell, neo_go_exec_path, config_path=config_path)

View file

@ -0,0 +1,240 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class NeoGoNep17(CliCommand):
def balance(
self,
address: str,
token: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Get address balance.
Args:
address: Address to use.
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
token: Token to use (hash or name (for NEO/GAS or imported tokens)).
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value for param, param_value in locals().items() if param not in ["self"]
}
exec_param["timeout"] = f"{timeout}s"
return self._execute(
"wallet nep17 balance",
**exec_param,
)
def import_token(
self,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
token: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Import NEP-17 token to a wallet.
Args:
address: Token contract address or hash in LE.
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
token: Token to use (hash or name (for NEO/GAS or imported tokens)).
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value for param, param_value in locals().items() if param not in ["self"]
}
exec_param["timeout"] = f"{timeout}s"
return self._execute(
"wallet nep17 import",
**exec_param,
)
def info(
self,
token: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""Print imported NEP-17 token info.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
token: Token to use (hash or name (for NEO/GAS or imported tokens)).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet nep17 info",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def remove(
self,
token: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
force: bool = False,
) -> CommandResult:
"""Remove NEP-17 token from the wallet.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
token: Token to use (hash or name (for NEO/GAS or imported tokens)).
force: Do not ask for a confirmation.
Returns:
Command's result.
"""
return self._execute(
"wallet nep17 remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def transfer(
self,
token: str,
to_address: str,
rpc_endpoint: str,
sysgas: Optional[float] = None,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
out: Optional[str] = None,
from_address: Optional[str] = None,
force: bool = False,
gas: Optional[float] = None,
amount: float = 0,
timeout: int = 10,
) -> CommandResult:
"""Transfers specified NEP-17 token amount.
Transfer is executed with optional 'data' parameter and cosigners list attached to the
transfer. See 'contract testinvokefunction' documentation for the details about 'data'
parameter and cosigners syntax. If no 'data' is given then default nil value will be used.
If no cosigners are given then the sender with CalledByEntry scope will be used as the only
signer.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wallet_password: Wallet password.
out: File to put JSON transaction to.
from_address: Address to send an asset from.
to_address: Address to send an asset to.
token: Token to use (hash or name (for NEO/GAS or imported tokens)).
force: Do not ask for a confirmation.
gas: Network fee to add to the transaction (prioritizing it).
sysgas: System fee to add to transaction (compensating for execution).
force: Do not ask for a confirmation.
amount: Amount of asset to send.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
exec_param["timeout"] = f"{timeout}s"
if wallet_password is not None:
return self._execute_with_password(
"wallet nep17 transfer",
wallet_password,
**exec_param,
)
if wallet_config:
return self._execute(
"wallet nep17 transfer",
**exec_param,
)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)
def multitransfer(
self,
token: str,
to_address: list[str],
sysgas: float,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
out: Optional[str] = None,
from_address: Optional[str] = None,
force: bool = False,
gas: Optional[float] = None,
amount: float = 0,
timeout: int = 10,
) -> CommandResult:
"""Transfer NEP-17 tokens to multiple recipients.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
out: File to put JSON transaction to.
from_address: Address to send an asset from.
to_address: Address to send an asset to.
token: Token to use (hash or name (for NEO/GAS or imported tokens)).
force: Do not ask for a confirmation.
gas: Network fee to add to the transaction (prioritizing it).
sysgas: System fee to add to transaction (compensating for execution).
force: Do not ask for a confirmation.
amount: Amount of asset to send.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value for param, param_value in locals().items() if param not in ["self"]
}
exec_param["timeout"] = f"{timeout}s"
return self._execute(
"wallet nep17 multitransfer",
**exec_param,
)

View file

@ -0,0 +1,7 @@
from enum import Enum
class NetworkType(Enum):
PRIVATE = "privnet"
MAIN = "mainnet"
TEST = "testnet"

View file

@ -0,0 +1,16 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.cli.neogo.network_type import NetworkType
from frostfs_testlib.shell import CommandResult
class NeoGoNode(CliCommand):
def start(self, network: NetworkType = NetworkType.PRIVATE) -> CommandResult:
"""Start a NEO node.
Args:
network: Select network type (default: private).
Returns:
Command's result.
"""
return self._execute("start", **{network.value: True})

View file

@ -0,0 +1,100 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class NeoGoQuery(CliCommand):
def candidates(self, rpc_endpoint: str, timeout: str = "10s") -> CommandResult:
"""Get candidates and votes.
Args:
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
return self._execute(
"query candidates",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def committee(self, rpc_endpoint: str, timeout: str = "10s") -> CommandResult:
"""Get committee list.
Args:
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
return self._execute(
"query committee",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def height(self, rpc_endpoint: str, timeout: str = "10s") -> CommandResult:
"""Get node height.
Args:
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
return self._execute(
"query height",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def tx(self, tx_hash: str, rpc_endpoint: str, timeout: str = "10s") -> CommandResult:
"""Query transaction status.
Args:
tx_hash: Hash of transaction.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
return self._execute(
f"query tx {tx_hash}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "hash"]
},
)
def voter(self, rpc_endpoint: str, timeout: str = "10s") -> CommandResult:
"""Print NEO holder account state.
Args:
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
return self._execute(
"query voter",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,12 @@
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class NeoGoVersion(CliCommand):
def get(self) -> CommandResult:
"""Application version.
Returns:
Command's result.
"""
return self._execute("", version=True)

View file

@ -0,0 +1,381 @@
from typing import Optional
from frostfs_testlib.cli.cli_command import CliCommand
from frostfs_testlib.shell import CommandResult
class NeoGoWallet(CliCommand):
def claim(
self,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Claim GAS.
Args:
address: Address to claim GAS for.
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value for param, param_value in locals().items() if param not in ["self"]
}
exec_param["timeout"] = f"{timeout}s"
return self._execute(
"wallet claim",
**exec_param,
)
def init(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
account: bool = False,
) -> CommandResult:
"""Create a new wallet.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
account: Create a new account.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def convert(
self,
out: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""Convert addresses from existing NEO2 NEP6-wallet to NEO3 format.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
out: Where to write converted wallet.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet convert",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def create(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""Add an account to the existing wallet.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet create",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
decrypt: bool = False,
) -> CommandResult:
"""Check and dump an existing NEO wallet.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
decrypt: Decrypt encrypted keys.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet dump",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_keys(
self,
address: Optional[str] = None,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""Check and dump an existing NEO wallet.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
address: Address to print public keys for.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet dump-keys",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def export(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
decrypt: bool = False,
) -> CommandResult:
"""Export keys for address.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
decrypt: Decrypt encrypted keys.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet export",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_wif(
self,
wif: str,
name: str,
contract: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""Import WIF of a standard signature contract.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wif: WIF to import.
name: Optional account name.
contract: Verification script for custom contracts.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet import",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_multisig(
self,
wif: str,
name: Optional[str] = None,
min_number: int = 0,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""Import multisig contract.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wif: WIF to import.
name: Optional account name.
min_number: Minimal number of signatures (default: 0).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet import-multisig",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_deployed(
self,
wif: str,
rpc_endpoint: str,
name: Optional[str] = None,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
contract: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Import deployed contract.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wif: WIF to import.
name: Optional account name.
contract: Contract hash or address.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value for param, param_value in locals().items() if param not in ["self"]
}
exec_param["timeout"] = f"{timeout}s"
return self._execute(
"wallet import-deployed",
**exec_param,
)
def remove(
self,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
force: bool = False,
) -> CommandResult:
"""Remove an account from the wallet.
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
address: Account address or hash in LE form to be removed.
force: Do not ask for a confirmation.
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def sign(
self,
input_file: str,
address: str,
rpc_endpoint: Optional[str] = None,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
wallet_password: Optional[str] = None,
out: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Cosign transaction with multisig/contract/additional account.
Signs the given (in the input file) context (which must be a transaction signing context)
for the given address using the given wallet. This command can output the resulting JSON
(with additional signature added) right to the console (if no output file and no RPC
endpoint specified) or into a file (which can be the same as input one). If an RPC endpoint
is given it'll also try to construct a complete transaction and send it via RPC (printing
its hash if everything is OK).
Args:
wallet: Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config: Target location of the wallet config file; conflicts with --wallet flag.
wallet_password: Wallet password.
out: File to put JSON transaction to.
input_file: File with JSON transaction.
address: Address to use.
rpc_endpoint: RPC node address.
timeout: Timeout for the operation (default: 10s).
Returns:
Command's result.
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
exec_param = {
param: param_value
for param, param_value in locals().items()
if param not in ["self", "wallet_password"]
}
exec_param["timeout"] = f"{timeout}s"
if wallet_password is not None:
return self._execute_with_password("wallet sign", wallet_password, **exec_param)
if wallet_config:
return self._execute("wallet sign", **exec_param)
raise Exception(self.WALLET_PASSWD_ERROR_MSG)