[#3] Move source code of testlib to src directory

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
Vladimir Domnich 2022-09-23 10:12:04 +04:00 committed by Vladimir
parent c48f7b7ff2
commit f5cd6a1954
41 changed files with 230 additions and 207 deletions

View file

@ -0,0 +1,3 @@
from neofs_testlib.cli.neofs_adm.adm import NeofsAdm
from neofs_testlib.cli.neofs_authmate.authmate import NeofsAuthmate
from neofs_testlib.cli.neogo.go import NeoGo

View file

@ -0,0 +1,58 @@
from typing import Optional
from neofs_testlib.shell import CommandResult, Shell
class CliCommand:
WALLET_SOURCE_ERROR_MSG = "Provide either wallet or wallet_config to specify wallet location"
cli_exec_path: Optional[str] = None
__base_params: Optional[str] = None
map_params = {
"json_mode": "json",
"await_mode": "await",
"hash_type": "hash",
"doc_type": "type",
}
def __init__(self, shell: Shell, cli_exec_path: str, **base_params):
self.shell = shell
self.cli_exec_path = cli_exec_path
self.__base_params = " ".join(
[f"--{param} {value}" for param, value in base_params.items() if value]
)
def _format_command(self, command: str, **params) -> str:
param_str = []
for param, value in params.items():
if param in self.map_params.keys():
param = self.map_params[param]
param = param.replace("_", "-")
if not value:
continue
if isinstance(value, bool):
param_str.append(f"--{param}")
elif isinstance(value, int):
param_str.append(f"--{param} {value}")
elif isinstance(value, list):
for value_item in value:
val_str = str(value_item).replace("'", "\\'")
param_str.append(f"--{param} '{val_str}'")
elif isinstance(value, dict):
param_str.append(
f'--{param} \'{",".join(f"{key}={val}" for key, val in value.items())}\''
)
else:
if "'" in str(value):
value_str = str(value).replace('"', '\\"')
param_str.append(f'--{param} "{value_str}"')
else:
param_str.append(f"--{param} '{value}'")
param_str = " ".join(param_str)
return f"{self.cli_exec_path} {self.__base_params} {command or ''} {param_str}"
def _execute(self, command: Optional[str], **params) -> CommandResult:
return self.shell.exec(self._format_command(command, **params))

View file

@ -0,0 +1 @@
from neofs_testlib.cli.neofs_adm.adm import NeofsAdm

View file

@ -0,0 +1,22 @@
from typing import Optional
from neofs_testlib.cli.neofs_adm.config import NeofsAdmConfig
from neofs_testlib.cli.neofs_adm.morph import NeofsAdmMorph
from neofs_testlib.cli.neofs_adm.storage_config import NeofsAdmStorageConfig
from neofs_testlib.cli.neofs_adm.subnet import NeofsAdmMorphSubnet
from neofs_testlib.cli.neofs_adm.version import NeofsAdmVersion
from neofs_testlib.shell import Shell
class NeofsAdm:
morph: Optional[NeofsAdmMorph] = None
subnet: Optional[NeofsAdmMorphSubnet] = None
storage_config: Optional[NeofsAdmStorageConfig] = None
version: Optional[NeofsAdmVersion] = None
def __init__(self, shell: Shell, neofs_adm_exec_path: str, config_file: Optional[str] = None):
self.config = NeofsAdmConfig(shell, neofs_adm_exec_path, config=config_file)
self.morph = NeofsAdmMorph(shell, neofs_adm_exec_path, config=config_file)
self.subnet = NeofsAdmMorphSubnet(shell, neofs_adm_exec_path, config=config_file)
self.storage_config = NeofsAdmStorageConfig(shell, neofs_adm_exec_path, config=config_file)
self.version = NeofsAdmVersion(shell, neofs_adm_exec_path, config=config_file)

View file

@ -0,0 +1,24 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAdmConfig(CliCommand):
def init(self, path: str = "~/.neofs/adm/config.yml") -> CommandResult:
"""Initialize basic neofs-adm configuration file.
Args:
path (str): path to config (default ~/.neofs/adm/config.yml)
Returns:
str: Command string
"""
return self._execute(
"config init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,384 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAdmMorph(CliCommand):
def deposit_notary(
self,
rpc_endpoint: str,
account: str,
gas: str,
storage_wallet: Optional[str] = None,
till: Optional[str] = None,
) -> CommandResult:
"""Deposit GAS for notary service.
Args:
account (str): wallet account address
gas (str): amount of GAS to deposit
rpc_endpoint (str): N3 RPC node endpoint
storage_wallet (str): path to storage node wallet
till (str): notary deposit duration in blocks
Returns:
str: Command string
"""
return self._execute(
"morph deposit-notary",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_balances(
self,
rpc_endpoint: str,
alphabet: Optional[str] = None,
proxy: Optional[str] = None,
script_hash: Optional[str] = None,
storage: Optional[str] = None,
) -> CommandResult:
"""Dump GAS balances
Args:
alphabet (str): dump balances of alphabet contracts
proxy (str): dump balances of the proxy contract
rpc_endpoint (str): N3 RPC node endpoint
script_hash (str): use script-hash format for addresses
storage (str): dump balances of storage nodes from the current netmap
Returns:
str: Command string
"""
return self._execute(
"morph dump-balances",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_config(self, rpc_endpoint: str) -> CommandResult:
"""Section for morph network configuration commands.
Args:
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph dump-config",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_containers(
self,
rpc_endpoint: str,
cid: Optional[str] = None,
container_contract: Optional[str] = None,
dump: str = "./testlib_dump_container",
) -> CommandResult:
"""Dump NeoFS containers to file.
Args:
cid (str): containers to dump
container_contract (str): container contract hash (for networks without NNS)
dump (str): file where to save dumped containers
(default: ./testlib_dump_container)
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph dump-containers",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_hashes(self, rpc_endpoint: str) -> CommandResult:
"""Dump deployed contract hashes.
Args:
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph dump-hashes",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def force_new_epoch(
self, rpc_endpoint: Optional[str] = None, alphabet: Optional[str] = None
) -> CommandResult:
"""Create new NeoFS epoch event in the side chain
Args:
alphabet (str): path to alphabet wallets dir
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph force-new-epoch",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def generate_alphabet(
self,
rpc_endpoint: str,
alphabet_wallets: str,
size: int = 7,
) -> CommandResult:
"""Generate alphabet wallets for consensus nodes of the morph network
Args:
alphabet_wallets (str): path to alphabet wallets dir
size (int): amount of alphabet wallets to generate (default 7)
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph generate-alphabet",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def generate_storage_wallet(
self,
rpc_endpoint: str,
alphabet_wallets: str,
storage_wallet: str,
initial_gas: Optional[str] = None,
) -> CommandResult:
"""Generate storage node wallet for the morph network
Args:
alphabet_wallets (str): path to alphabet wallets dir
initial_gas (str): initial amount of GAS to transfer
rpc_endpoint (str): N3 RPC node endpoint
storage_wallet (str): path to new storage node wallet
Returns:
str: Command string
"""
return self._execute(
"morph generate-storage-wallet",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def init(
self,
rpc_endpoint: str,
alphabet_wallets: str,
contracts: str,
protocol: str,
container_alias_fee: int = 500,
container_fee: int = 1000,
epoch_duration: int = 240,
homomorphic_disabled: bool = False,
local_dump: Optional[str] = None,
max_object_size: int = 67108864,
) -> CommandResult:
"""Section for morph network configuration commands.
Args:
alphabet_wallets (str): path to alphabet wallets dir
container_alias_fee (int): container alias fee (default 500)
container_fee (int): container registration fee (default 1000)
contracts (str): path to archive with compiled NeoFS contracts
(default fetched from latest github release)
epoch_duration (int): amount of side chain blocks in one NeoFS epoch
(default 240)
homomorphic_disabled: (bool): disable object homomorphic hashing
local_dump (str): path to the blocks dump file
max_object_size (int): max single object size in bytes (default 67108864)
protocol (str): path to the consensus node configuration
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def refill_gas(
self,
rpc_endpoint: str,
alphabet_wallets: str,
storage_wallet: str,
gas: Optional[str] = None,
) -> CommandResult:
"""Refill GAS of storage node's wallet in the morph network
Args:
alphabet_wallets (str): path to alphabet wallets dir
gas (str): additional amount of GAS to transfer
rpc_endpoint (str): N3 RPC node endpoint
storage_wallet (str): path to new storage node wallet
Returns:
str: Command string
"""
return self._execute(
"morph refill-gas",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def restore_containers(
self,
rpc_endpoint: str,
alphabet_wallets: str,
cid: str,
dump: str,
) -> CommandResult:
"""Restore NeoFS containers from file.
Args:
alphabet_wallets (str): path to alphabet wallets dir
cid (str): containers to restore
dump (str): file to restore containers from
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph restore-containers",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def set_policy(
self,
rpc_endpoint: str,
alphabet_wallets: str,
exec_fee_factor: Optional[int] = None,
storage_price: Optional[int] = None,
fee_per_byte: Optional[int] = None,
) -> CommandResult:
"""Set global policy values
Args:
alphabet_wallets (str): path to alphabet wallets dir
exec_fee_factor (int): ExecFeeFactor=<n1>
storage_price (int): StoragePrice=<n2>
fee_per_byte (int): FeePerByte=<n3>
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
non_param_attribute = ""
if exec_fee_factor:
non_param_attribute += f"ExecFeeFactor={exec_fee_factor} "
if storage_price:
non_param_attribute += f"StoragePrice={storage_price} "
if fee_per_byte:
non_param_attribute += f"FeePerByte={fee_per_byte} "
return self._execute(
f"morph restore-containers {non_param_attribute}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "exec_fee_factor", "storage_price", "fee_per_byte"]
},
)
def update_contracts(
self,
rpc_endpoint: str,
alphabet_wallets: str,
contracts: Optional[str] = None,
) -> CommandResult:
"""Update NeoFS contracts.
Args:
alphabet_wallets (str): path to alphabet wallets dir
contracts (str): path to archive with compiled NeoFS contracts
(default fetched from latest github release)
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
"morph update-contracts",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,25 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAdmStorageConfig(CliCommand):
def set(self, account: str, wallet: str) -> CommandResult:
"""Initialize basic neofs-adm configuration file.
Args:
account (str): wallet account
wallet (str): path to wallet
Returns:
str: Command string
"""
return self._execute(
"storage-config",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,257 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAdmMorphSubnet(CliCommand):
def create(
self, rpc_endpoint: str, address: str, wallet: str, notary: bool = False
) -> CommandResult:
"""Create NeoFS subnet.
Args:
address (str): Address in the wallet, optional
notary (bool): Flag to create subnet in notary environment
rpc_endpoint (str): N3 RPC node endpoint
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet create",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def get(self, rpc_endpoint: str, subnet: str) -> CommandResult:
"""Read information about the NeoFS subnet.
Args:
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
Returns:
str: Command string
"""
return self._execute(
"morph subnet get",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def remove(
self, rpc_endpoint: str, wallet: str, subnet: str, address: Optional[str] = None
) -> CommandResult:
"""Remove NeoFS subnet.
Args:
address (str): Address in the wallet, optional
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def admin_add(
self,
rpc_endpoint: str,
wallet: str,
admin: str,
subnet: str,
client: Optional[str] = None,
group: Optional[str] = None,
address: Optional[str] = None,
) -> CommandResult:
"""Add admin to the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
admin (str): Hex-encoded public key of the admin
client (str): Add client admin instead of node one
group (str): Client group ID in text format (needed with --client only)
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet admin add",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def admin_remove(
self,
rpc_endpoint: str,
wallet: str,
admin: str,
subnet: str,
client: Optional[str] = None,
address: Optional[str] = None,
) -> CommandResult:
"""Remove admin of the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
admin (str): Hex-encoded public key of the admin
client (str): Remove client admin instead of node one
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet admin remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def client_add(
self,
rpc_endpoint: str,
wallet: str,
subnet: str,
client: Optional[str] = None,
group: Optional[str] = None,
address: Optional[str] = None,
) -> CommandResult:
"""Add client to the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
client (str): Add client admin instead of node one
group (str): Client group ID in text format (needed with --client only)
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet client add",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def client_remove(
self,
rpc_endpoint: str,
wallet: str,
client: str,
group: str,
subnet: str,
address: Optional[str] = None,
) -> CommandResult:
"""Remove client of the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
client (str): Remove client admin instead of node one
group (str): ID of the client group to work with
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet client remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def node_add(self, rpc_endpoint: str, wallet: str, node: str, subnet: str) -> CommandResult:
"""Add node to the NeoFS subnet.
Args:
node (str): Hex-encoded public key of the node
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet node add",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def node_remove(self, rpc_endpoint: str, wallet: str, node: str, subnet: str) -> CommandResult:
"""Remove node from the NeoFS subnet.
Args:
node (str): Hex-encoded public key of the node
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
"morph subnet node remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,13 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAdmVersion(CliCommand):
def get(self) -> CommandResult:
"""Application version
Returns:
str: Command string
"""
return self._execute("", version=True)

View file

@ -0,0 +1 @@
from neofs_testlib.cli.neofs_authmate.authmate import NeofsAuthmate

View file

@ -0,0 +1,14 @@
from typing import Optional
from neofs_testlib.cli.neofs_authmate.secret import NeofsAuthmateSecret
from neofs_testlib.cli.neofs_authmate.version import NeofsAuthmateVersion
from neofs_testlib.shell import Shell
class NeofsAuthmate:
secret: Optional[NeofsAuthmateSecret] = None
version: Optional[NeofsAuthmateVersion] = None
def __init__(self, shell: Shell, neofs_authmate_exec_path: str):
self.secret = NeofsAuthmateSecret(shell, neofs_authmate_exec_path)
self.version = NeofsAuthmateVersion(shell, neofs_authmate_exec_path)

View file

@ -0,0 +1,92 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAuthmateSecret(CliCommand):
def obtain(
self,
wallet: str,
peer: str,
gate_wallet: str,
access_key_id: str,
address: Optional[str] = None,
gate_address: Optional[str] = None,
) -> CommandResult:
"""Obtain a secret from NeoFS network
Args:
wallet (str): path to the wallet
address (str): address of wallet account
peer (str): address of neofs peer to connect to
gate_wallet (str): path to the wallet
gate_address (str): address of wallet account
access_key_id (str): access key id for s3
Returns:
str: Command string
"""
return self._execute(
"obtain-secret",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def issue(
self,
wallet: str,
peer: str,
bearer_rules: str,
gate_public_key: str,
address: Optional[str] = None,
container_id: Optional[str] = None,
container_friendly_name: Optional[str] = None,
container_placement_policy: Optional[str] = None,
session_tokens: Optional[str] = None,
lifetime: Optional[str] = None,
container_policy: Optional[str] = None,
aws_cli_credentials: Optional[str] = None,
) -> CommandResult:
"""Obtain a secret from NeoFS network
Args:
wallet (str): path to the wallet
address (str): address of wallet account
peer (str): address of a neofs peer to connect to
bearer_rules (str): rules for bearer token as plain json string
gate_public_key (str): public 256r1 key of a gate (use flags repeatedly for
multiple gates)
container_id (str): auth container id to put the secret into
container_friendly_name (str): friendly name of auth container to put the
secret into
container_placement_policy (str): placement policy of auth container to put the
secret into
(default: "REP 2 IN X CBF 3 SELECT 2 FROM * AS X")
session_tokens (str): create session tokens with rules, if the rules are
set as 'none', no session tokens will be created
lifetime (str): Lifetime of tokens. For example 50h30m
(note: max time unit is an hour so to set a day you
should use 24h). It will be ceil rounded to the
nearest amount of epoch. (default: 720h0m0s)
container_policy (str): mapping AWS storage class to NeoFS storage policy as
plain json string or path to json file
aws_cli_credentials (str): path to the aws cli credential file
Returns:
str: Command string
"""
return self._execute(
"issue-secret",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,13 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeofsAuthmateVersion(CliCommand):
def get(self) -> CommandResult:
"""Application version
Returns:
str: Command string
"""
return self._execute("", version=True)

View file

@ -0,0 +1,2 @@
from neofs_testlib.cli.neogo.go import NeoGo
from neofs_testlib.cli.neogo.network_type import NetworkType

View file

@ -0,0 +1,118 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeoGoCandidate(CliCommand):
def register(
self,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
gas: Optional[float] = None,
timeout: int = 10,
) -> CommandResult:
"""Register as a new candidate
Args:
address (str): Address to register
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
gas (float): network fee to add to the transaction (prioritizing it)
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet candidate register",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def unregister(
self,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
gas: Optional[float] = None,
timeout: int = 10,
) -> CommandResult:
"""Unregister self as a candidate
Args:
address (str): Address to unregister
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
gas (float): network fee to add to the transaction (prioritizing it)
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet candidate unregister",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def vote(
self,
candidate: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
gas: Optional[float] = None,
timeout: int = 10,
) -> CommandResult:
"""Votes for a validator by calling "vote" method of a NEO native
contract. Do not provide candidate argument to perform unvoting.
Args:
candidate (str): Public key of candidate to vote for
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
gas (float): network fee to add to the transaction (prioritizing it)
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet candidate vote",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,357 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeoGoContract(CliCommand):
def compile(
self,
input_file: str,
out: str,
manifest: str,
config: str,
no_standards: bool = False,
no_events: bool = False,
no_permissions: bool = False,
bindings: Optional[str] = None,
) -> CommandResult:
"""Compile a smart contract to a .nef file
Args:
input_file (str): Input file for the smart contract to be compiled
out (str): Output of the compiled contract
manifest (str): Emit contract manifest (*.manifest.json) file into separate
file using configuration input file (*.yml)
config (str): Configuration input file (*.yml)
no_standards (bool): do not check compliance with supported standards
no_events (bool): do not check emitted events with the manifest
no_permissions (bool): do not check if invoked contracts are allowed in manifest
bindings (str): output file for smart-contract bindings configuration
Returns:
str: Command string
"""
return self._execute(
"contract compile",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def deploy(
self,
address: str,
input_file: str,
sysgas: float,
manifest: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
gas: Optional[float] = None,
out: Optional[str] = None,
force: bool = False,
timeout: int = 10,
) -> CommandResult:
"""Deploy a smart contract (.nef with description)
Args:
wallet (str): wallet to use to get the key for transaction signing;
conflicts with wallet_config
wallet_config (str): path to wallet config to use to get the key for transaction
signing; conflicts with wallet
address (str): address to use as transaction signee (and gas source)
gas (float): network fee to add to the transaction (prioritizing it)
sysgas (float): system fee to add to transaction (compensating for execution)
out (str): file to put JSON transaction to
force (bool): Do not ask for a confirmation
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
input_file (str): Input file for the smart contract (*.nef)
manifest (str): Emit contract manifest (*.manifest.json) file into separate
file using configuration input file (*.yml)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"contract deploy",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def generate_wrapper(
self,
out: str,
hash: str,
config: Optional[str] = None,
manifest: Optional[str] = None,
) -> CommandResult:
"""Generate wrapper to use in other contracts
Args:
config (str): Configuration file to use
manifest (str): Read contract manifest (*.manifest.json) file
out (str): Output of the compiled contract
hash (str): Smart-contract hash
Returns:
str: Command string
"""
return self._execute(
"contract generate-wrapper",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def invokefunction(
self,
address: str,
scripthash: str,
wallet: Optional[str] = None,
method: Optional[str] = None,
arguments: Optional[str] = None,
multisig_hash: Optional[str] = None,
wallet_config: Optional[str] = None,
gas: Optional[float] = None,
sysgas: Optional[float] = None,
out: Optional[str] = None,
force: bool = False,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Executes given (as a script hash) deployed script with the given method,
arguments and signers. Sender is included in the list of signers by default
with None witness scope. If you'd like to change default sender's scope,
specify it via signers parameter. See testinvokefunction documentation for
the details about parameters. It differs from testinvokefunction in that this
command sends an invocation transaction to the network.
Args:
scripthash (str): Function hash
method (str): Call method
arguments (str): Method arguments
multisig_hash (str): Multisig hash
wallet (str): wallet to use to get the key for transaction signing;
conflicts with wallet_config
wallet_config (str): path to wallet config to use to get the key for transaction
signing; conflicts with wallet
address (str): address to use as transaction signee (and gas source)
gas (float): network fee to add to the transaction (prioritizing it)
sysgas (float): system fee to add to transaction (compensating for execution)
out (str): file to put JSON transaction to
force (bool): force-push the transaction in case of bad VM state after
test script invocation
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
multisig_hash = f"-- {multisig_hash}" or ""
return self._execute(
"contract invokefunction "
f"{scripthash} {method or ''} {arguments or ''} {multisig_hash}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "scripthash", "method", "arguments", "multisig_hash"]
},
)
def testinvokefunction(
self,
scripthash: str,
wallet: Optional[str] = None,
method: Optional[str] = None,
arguments: Optional[str] = None,
multisig_hash: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Executes given (as a script hash) deployed script with the given method,
arguments and signers (sender is not included by default). If no method is given
"" is passed to the script, if no arguments are given, an empty array is
passed, if no signers are given no array is passed. If signers are specified,
the first one of them is treated as a sender. All of the given arguments are
encapsulated into array before invoking the script. The script thus should
follow the regular convention of smart contract arguments (method string and
an array of other arguments).
See more information and samples in `neo-go contract testinvokefunction --help`
Args:
scripthash (str): Function hash
method (str): Call method
arguments (str): Method arguments
multisig_hash (str): Multisig hash
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
multisig_hash = f"-- {multisig_hash}" or ""
return self._execute(
"contract testinvokefunction "
f"{scripthash} {method or ''} {arguments or ''} {multisig_hash}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "scripthash", "method", "arguments", "multisig_hash"]
},
)
def testinvokescript(
self,
input_file: str,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Executes given compiled AVM instructions in NEF format with the given set of
signers not included sender by default. See testinvokefunction documentation
for the details about parameters.
Args:
input_file (str): Input location of the .nef file that needs to be invoked
conflicts with wallet_config
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
return self._execute(
f"contract testinvokescript",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def init(
self,
name: str,
skip_details: bool = False,
) -> CommandResult:
"""Initialize a new smart-contract in a directory with boiler plate code
Args:
name (str): name of the smart-contract to be initialized
skip_details (bool): skip filling in the projects and contract details
Returns:
str: Command string
"""
return self._execute(
"contract init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def inspect(
self,
input_file: Optional[str] = None,
compile: Optional[str] = None,
) -> CommandResult:
"""Creates a user readable dump of the program instructions
Args:
input_file (str): input file of the program (either .go or .nef)
compile (str): compile input file (it should be go code then)
Returns:
str: Command string
"""
return self._execute(
"contract inspect",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def calc_hash(
self,
input_file: str,
manifest: str,
sender: Optional[str] = None,
) -> CommandResult:
"""Calculates hash of a contract after deployment
Args:
input_file (str): path to NEF file
sender (str): sender script hash or address
manifest (str): path to manifest file
Returns:
str: Command string
"""
return self._execute(
"contract calc-hash",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def add_group(
self,
manifest: str,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
sender: Optional[str] = None,
nef: Optional[str] = None,
) -> CommandResult:
"""Adds group to the manifest
Args:
wallet (str): wallet to use to get the key for transaction signing;
conflicts with wallet_config
wallet_config (str): path to wallet config to use to get the key for transaction
signing; conflicts with wallet
sender (str): deploy transaction sender
address (str): account to sign group with
nef (str): path to the NEF file
manifest (str): path to the manifest
Returns:
str: Command string
"""
return self._execute(
"contract manifest add-group",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,73 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.cli.neogo.network_type import NetworkType
from neofs_testlib.shell import CommandResult
class NeoGoDb(CliCommand):
def dump(
self,
config_path: str,
out: str,
network: NetworkType = NetworkType.PRIVATE,
count: int = 0,
start: int = 0,
) -> CommandResult:
"""Dump blocks (starting with block #1) to the file
Args:
config_path (str): path to config
network (NetworkType): Select network type (default: private)
count (int): number of blocks to be processed (default or 0: all chain)
(default: 0)
start (int): block number to start from (default: 0) (default: 0)
out (srt): Output file (stdout if not given)
Returns:
str: Command string
"""
return self._execute(
"db dump",
**{network.value: True},
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def restore(
self,
config_path: str,
input_file: str,
network: NetworkType = NetworkType.PRIVATE,
count: int = 0,
dump: Optional[str] = None,
incremental: bool = False,
) -> CommandResult:
"""Dump blocks (starting with block #1) to the file
Args:
config_path (str): path to config
network (NetworkType): Select network type (default: private)
count (int): number of blocks to be processed (default or 0: all chain)
(default: 0)
input_file (str): Input file (stdin if not given)
dump (str): directory for storing JSON dumps
incremental (bool): use if dump is incremental
Returns:
str: Command string
"""
return self._execute(
"db restore",
**{network.value: True},
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,37 @@
from typing import Optional
from neofs_testlib.cli.neogo.candidate import NeoGoCandidate
from neofs_testlib.cli.neogo.contract import NeoGoContract
from neofs_testlib.cli.neogo.db import NeoGoDb
from neofs_testlib.cli.neogo.nep17 import NeoGoNep17
from neofs_testlib.cli.neogo.node import NeoGoNode
from neofs_testlib.cli.neogo.query import NeoGoQuery
from neofs_testlib.cli.neogo.version import NeoGoVersion
from neofs_testlib.cli.neogo.wallet import NeoGoWallet
from neofs_testlib.shell import Shell
class NeoGo:
candidate: Optional[NeoGoCandidate] = None
contract: Optional[NeoGoContract] = None
db: Optional[NeoGoDb] = None
nep17: Optional[NeoGoNep17] = None
node: Optional[NeoGoNode] = None
query: Optional[NeoGoQuery] = None
version: Optional[NeoGoVersion] = None
wallet: Optional[NeoGoWallet] = None
def __init__(
self,
shell: Shell,
neo_go_exec_path: Optional[str] = None,
config_path: Optional[str] = None,
):
self.candidate = NeoGoCandidate(shell, neo_go_exec_path, config_path=config_path)
self.contract = NeoGoContract(shell, neo_go_exec_path, config_path=config_path)
self.db = NeoGoDb(shell, neo_go_exec_path, config_path=config_path)
self.nep17 = NeoGoNep17(shell, neo_go_exec_path, config_path=config_path)
self.node = NeoGoNode(shell, neo_go_exec_path, config_path=config_path)
self.query = NeoGoQuery(shell, neo_go_exec_path, config_path=config_path)
self.version = NeoGoVersion(shell, neo_go_exec_path, config_path=config_path)
self.wallet = NeoGoWallet(shell, neo_go_exec_path, config_path=config_path)

View file

@ -0,0 +1,242 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeoGoNep17(CliCommand):
def balance(
self,
address: str,
token: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""Get address balance
Args:
address (str): Address to use
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
token (str): Token to use (hash or name (for NEO/GAS or imported tokens))
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet nep17 balance",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_token(
self,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
token: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""import NEP-17 token to a wallet
Args:
address (str): Token contract address or hash in LE
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
token (str): Token to use (hash or name (for NEO/GAS or imported tokens))
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet nep17 import",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def info(
self,
token: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""print imported NEP-17 token info
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
token (str): Token to use (hash or name (for NEO/GAS or imported tokens))
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet nep17 info",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def remove(
self,
token: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
force: bool = False,
) -> CommandResult:
"""remove NEP-17 token from the wallet
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
token (str): Token to use (hash or name (for NEO/GAS or imported tokens))
force (bool): Do not ask for a confirmation
Returns:
str: Command string
"""
return self._execute(
"wallet nep17 remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def transfer(
self,
token: str,
to_address: str,
sysgas: float,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
out: Optional[str] = None,
from_address: Optional[str] = None,
force: bool = False,
gas: Optional[float] = None,
amount: float = 0,
timeout: int = 10,
) -> CommandResult:
"""Transfers specified NEP-17 token amount with optional 'data' parameter and cosigners
list attached to the transfer. See 'contract testinvokefunction' documentation
for the details about 'data' parameter and cosigners syntax. If no 'data' is
given then default nil value will be used. If no cosigners are given then the
sender with CalledByEntry scope will be used as the only signer.
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
out (str): file to put JSON transaction to
from_address (str): Address to send an asset from
to_address (str): Address to send an asset to
token (str): Token to use (hash or name (for NEO/GAS or imported tokens))
force (bool): Do not ask for a confirmation
gas (float): network fee to add to the transaction (prioritizing it)
sysgas (float): system fee to add to transaction (compensating for execution)
force (bool): Do not ask for a confirmation
amount (float) Amount of asset to send
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet nep17 transfer",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def multitransfer(
self,
token: str,
to_address: list[str],
sysgas: float,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
out: Optional[str] = None,
from_address: Optional[str] = None,
force: bool = False,
gas: Optional[float] = None,
amount: float = 0,
timeout: int = 10,
) -> CommandResult:
"""transfer NEP-17 tokens to multiple recipients
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
out (str): file to put JSON transaction to
from_address (str): Address to send an asset from
to_address (str): Address to send an asset to
token (str): Token to use (hash or name (for NEO/GAS or imported tokens))
force (bool): Do not ask for a confirmation
gas (float): network fee to add to the transaction (prioritizing it)
sysgas (float): system fee to add to transaction (compensating for execution)
force (bool): Do not ask for a confirmation
amount (float) Amount of asset to send
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet nep17 multitransfer",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,7 @@
from enum import Enum
class NetworkType(Enum):
PRIVATE = "privnet"
MAIN = "mainnet"
TEST = "testnet"

View file

@ -0,0 +1,17 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.cli.neogo.network_type import NetworkType
from neofs_testlib.shell import CommandResult
class NeoGoNode(CliCommand):
def start(self, network: NetworkType = NetworkType.PRIVATE) -> CommandResult:
"""Start a NEO node
Args:
network (NetworkType): Select network type (default: private)
Returns:
str: Command string
"""
return self._execute("start", **{network.value: True})

View file

@ -0,0 +1,126 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeoGoQuery(CliCommand):
def candidates(
self,
rpc_endpoint: str,
timeout: int = 10,
) -> CommandResult:
"""Get candidates and votes
Args:
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
return self._execute(
"query candidates",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def committee(
self,
rpc_endpoint: str,
timeout: int = 10,
) -> CommandResult:
"""Get committee list
Args:
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
return self._execute(
"query committee",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def height(
self,
rpc_endpoint: str,
timeout: int = 10,
) -> CommandResult:
"""Get node height
Args:
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
return self._execute(
"query height",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def tx(
self,
tx_hash: str,
rpc_endpoint: str,
timeout: int = 10,
) -> CommandResult:
"""Query transaction status
Args:
tx_hash (str): Hash of transaction
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
return self._execute(
f"query tx {tx_hash}",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self", "hash"]
},
)
def voter(
self,
rpc_endpoint: str,
timeout: int = 10,
) -> CommandResult:
"""Print NEO holder account state
Args:
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
return self._execute(
"query voter",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,13 @@
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeoGoVersion(CliCommand):
def get(self) -> CommandResult:
"""Application version
Returns:
str: Command string
"""
return self._execute("", version=True)

View file

@ -0,0 +1,394 @@
from typing import Optional
from neofs_testlib.cli.cli_command import CliCommand
from neofs_testlib.shell import CommandResult
class NeoGoWallet(CliCommand):
def claim(
self,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""claim GAS
Args:
address (str): Address to claim GAS for
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet claim",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def init(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
account: bool = False,
) -> CommandResult:
"""create a new wallet
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
account (bool): Create a new account
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet init",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def convert(
self,
out: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""convert addresses from existing NEO2 NEP6-wallet to NEO3 format
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
out (str): where to write converted wallet
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet convert",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def create(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""add an account to the existing wallet
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet create",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
decrypt: bool = False,
) -> CommandResult:
"""check and dump an existing NEO wallet
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
decrypt (bool): Decrypt encrypted keys.
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet dump",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def dump_keys(
self,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""check and dump an existing NEO wallet
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
address (str): address to print public keys for
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet dump-keys",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def export(
self,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
decrypt: bool = False,
) -> CommandResult:
"""export keys for address
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
decrypt (bool): Decrypt encrypted keys.
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet export",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_wif(
self,
wif: str,
name: str,
contract: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""import WIF of a standard signature contract
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
wif (str): WIF to import
name (str): Optional account name
contract (str): Verification script for custom contracts
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet import",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_multisig(
self,
wif: str,
name: Optional[str] = None,
min_number: int = 0,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
) -> CommandResult:
"""import multisig contract
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
wif (str): WIF to import
name (str): Optional account name
min_number (int): Minimal number of signatures (default: 0)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet import-multisig",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def import_deployed(
self,
wif: str,
rpc_endpoint: str,
name: Optional[str] = None,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
contract: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""import multisig contract
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
wif (str): WIF to import
name (str): Optional account name
contract (str): Contract hash or address
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet import-deployed",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def remove(
self,
address: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
force: bool = False,
) -> CommandResult:
"""check and dump an existing NEO wallet
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
address (str): Account address or hash in LE form to be removed
force (bool): Do not ask for a confirmation
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet remove",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)
def sign(
self,
input_file: str,
address: str,
rpc_endpoint: str,
wallet: Optional[str] = None,
wallet_config: Optional[str] = None,
out: Optional[str] = None,
timeout: int = 10,
) -> CommandResult:
"""import multisig contract
Args:
wallet (str): Target location of the wallet file ('-' to read from stdin);
conflicts with --wallet-config flag.
wallet_config (str): Target location of the wallet config file;
conflicts with --wallet flag.
out (str): file to put JSON transaction to
input_file (str): file with JSON transaction
address (str): Address to use
rpc_endpoint (str): RPC node address
timeout (int): Timeout for the operation (default: 10s)
Returns:
str: Command string
"""
assert bool(wallet) ^ bool(wallet_config), self.WALLET_SOURCE_ERROR_MSG
return self._execute(
"wallet sign",
**{
param: param_value
for param, param_value in locals().items()
if param not in ["self"]
},
)

View file

@ -0,0 +1,14 @@
import os
from neofs_testlib.reporter.allure_reporter import AllureReporter
from neofs_testlib.reporter.dummy_reporter import DummyReporter
from neofs_testlib.reporter.interfaces import Reporter
def get_reporter() -> Reporter:
# TODO: in scope of reporter implementation task here we will have extendable
# solution for configuring and providing reporter for the library
if os.getenv("TESTLIB_REPORTER_TYPE", "DUMMY") == "DUMMY":
return DummyReporter()
else:
return AllureReporter()

View file

@ -0,0 +1,36 @@
import os
from contextlib import AbstractContextManager
from textwrap import shorten
from typing import Any
import allure
from allure import attachment_type
from neofs_testlib.reporter.interfaces import Reporter
class AllureReporter(Reporter):
"""
Implements storing of test artifacts in Allure report.
"""
def step(self, name: str) -> AbstractContextManager:
name = shorten(name, width=70, placeholder="...")
return allure.step(name)
def attach(self, body: Any, file_name: str) -> None:
attachment_name, extension = os.path.splitext(file_name)
attachment_type = self._resolve_attachment_type(extension)
allure.attach(body, attachment_name, attachment_type)
def _resolve_attachment_type(self, extension: str) -> attachment_type:
"""
Try to find matching Allure attachment type by extension. If no match was found,
default to TXT format.
"""
extension = extension.lower()
return next(
(allure_type for allure_type in attachment_type if allure_type.extension == extension),
attachment_type.TXT,
)

View file

@ -0,0 +1,21 @@
from contextlib import AbstractContextManager, contextmanager
from typing import Any
from neofs_testlib.reporter.interfaces import Reporter
@contextmanager
def _dummy_step():
yield
class DummyReporter(Reporter):
"""
Dummy implementation of reporter, does not store artifacts anywhere.
"""
def step(self, name: str) -> AbstractContextManager:
return _dummy_step()
def attach(self, content: Any, file_name: str) -> None:
pass

View file

@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager
from typing import Any
class Reporter(ABC):
"""
Interface that supports storage of test artifacts in some reporting tool.
"""
@abstractmethod
def step(self, name: str) -> AbstractContextManager:
"""
Register a new step in test execution.
:param str name: Name of the step
:return: step context
"""
@abstractmethod
def attach(self, content: Any, file_name: str) -> None:
"""
Attach specified content with given file name to the test report.
:param any content: content to attach. If content value is not a string, it will be
converted to a string.
:param str file_name: file name of attachment.
"""

View file

@ -0,0 +1,3 @@
from neofs_testlib.shell.interfaces import CommandResult, Shell
from neofs_testlib.shell.local_shell import LocalShell
from neofs_testlib.shell.ssh_shell import SSHShell

View file

@ -0,0 +1,61 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class InteractiveInput:
"""
Interactive input for a shell command.
:attr str prompt_pattern: regular expression that defines expected prompt from the command.
:attr str input: user input that should be supplied to the command in response to the prompt.
"""
prompt_pattern: str
input: str
@dataclass
class CommandOptions:
"""
Options that control command execution.
:attr list interactive_inputs: user inputs that should be interactively supplied to
the command during execution.
:attr int timeout: timeout for command execution (in seconds).
:attr bool check: controls whether to check return code of the command. Set to False to
ignore non-zero return codes.
"""
interactive_inputs: Optional[list[InteractiveInput]] = None
timeout: int = 30
check: bool = True
@dataclass
class CommandResult:
"""
Represents a result of a command executed via shell.
"""
stdout: str
stderr: str
return_code: int
class Shell(ABC):
"""
Interface of a command shell on some system (local or remote).
"""
@abstractmethod
def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult:
"""
Executes specified command on this shell. To execute interactive command, user inputs
should be specified in *options*.
:param str command: command to execute on the shell.
:param CommandOptions options: options that control command execution.
:return command result.
"""

View file

@ -0,0 +1,173 @@
import logging
import subprocess
import tempfile
from datetime import datetime
from typing import IO, Optional
import pexpect
from neofs_testlib.reporter import get_reporter
from neofs_testlib.shell.interfaces import CommandOptions, CommandResult, Shell
logger = logging.getLogger("neofs.testlib.shell")
reporter = get_reporter()
class LocalShell(Shell):
"""
Implements command shell on a local machine.
"""
def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult:
# If no options were provided, use default options
options = options or CommandOptions()
logger.info(f"Executing command: {command}")
if options.interactive_inputs:
return self._exec_interactive(command, options)
return self._exec_non_interactive(command, options)
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
start_time = datetime.utcnow()
log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output
result = None
command_process = None
try:
command_process = pexpect.spawn(command, timeout=options.timeout)
command_process.delaybeforesend = 1
command_process.logfile_read = log_file
for interactive_input in options.interactive_inputs:
command_process.expect(interactive_input.prompt_pattern)
command_process.sendline(interactive_input.input)
result = self._get_pexpect_process_result(command_process, command)
if options.check and result.return_code != 0:
raise RuntimeError(
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}"
)
return result
except pexpect.ExceptionPexpect as exc:
result = self._get_pexpect_process_result(command_process, command)
message = (
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}"
)
if options.check:
raise RuntimeError(message) from exc
else:
logger.exception(message)
return result
except OSError as exc:
result = self._get_pexpect_process_result(command_process, command)
message = (
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {exc.strerror}"
)
if options.check:
raise RuntimeError(message) from exc
else:
logger.exception(message)
return result
except Exception:
result = self._get_pexpect_process_result(command_process, command)
raise
finally:
log_file.close()
end_time = datetime.utcnow()
self._report_command_result(command, start_time, end_time, result)
def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult:
start_time = datetime.utcnow()
result = None
try:
command_process = subprocess.run(
command,
check=options.check,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=options.timeout,
shell=True,
)
result = CommandResult(
stdout=command_process.stdout or "",
stderr=command_process.stderr or "",
return_code=command_process.returncode,
)
return result
except subprocess.CalledProcessError as exc:
# TODO: always set check flag to false and capture command result normally
result = self._get_failing_command_result(command)
raise RuntimeError(
f"Command: {command}\nError:\n"
f"return code: {exc.returncode}\n"
f"output: {exc.output}"
) from exc
except OSError as exc:
raise RuntimeError(f"Command: {command}\nOutput: {exc.strerror}") from exc
except Exception as exc:
result = self._get_failing_command_result(command)
raise
finally:
end_time = datetime.utcnow()
self._report_command_result(command, start_time, end_time, result)
def _get_failing_command_result(self, command: str) -> CommandResult:
return_code, cmd_output = subprocess.getstatusoutput(command)
return CommandResult(stdout=cmd_output, stderr="", return_code=return_code)
def _get_pexpect_process_result(
self, command_process: Optional[pexpect.spawn], command: str
) -> CommandResult:
"""
If command process is not None, captures output of this process.
If command process is None, then command fails when we attempt to start it, in this case
we use regular non-interactive process to get it's output.
"""
if command_process is None:
return self._get_failing_command_result(command)
# Wait for child process to end it's work
if command_process.isalive():
command_process.expect(pexpect.EOF)
# Close the process to obtain the exit code
command_process.close()
return_code = command_process.exitstatus
# Capture output from the log file
log_file: IO[bytes] = command_process.logfile_read
log_file.seek(0)
output = log_file.read().decode()
return CommandResult(stdout=output, stderr="", return_code=return_code)
def _report_command_result(
self,
command: str,
start_time: datetime,
end_time: datetime,
result: Optional[CommandResult],
) -> None:
# TODO: increase logging level if return code is non 0, should be warning at least
logger.info(
f"Command: {command}\n"
f"{'Success:' if result and result.return_code == 0 else 'Error:'}\n"
f"return code: {result.return_code if result else ''} "
f"\nOutput: {result.stdout if result else ''}"
)
if result:
elapsed_time = end_time - start_time
command_attachment = (
f"COMMAND: {command}\n"
f"RETCODE: {result.return_code}\n\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}\n"
f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {elapsed_time}"
)
with reporter.step(f"COMMAND: {command}"):
reporter.attach(command_attachment, "Command execution.txt")

View file

@ -0,0 +1,239 @@
import logging
import socket
import textwrap
from datetime import datetime
from functools import lru_cache, wraps
from time import sleep
from typing import ClassVar, Optional
from paramiko import (
AutoAddPolicy,
ECDSAKey,
Ed25519Key,
PKey,
RSAKey,
SSHClient,
SSHException,
ssh_exception,
)
from paramiko.ssh_exception import AuthenticationException
from neofs_testlib.reporter import get_reporter
from neofs_testlib.shell.interfaces import CommandOptions, CommandResult, Shell
logger = logging.getLogger("neofs.testlib.shell")
reporter = get_reporter()
class HostIsNotAvailable(Exception):
"""Raised when host is not reachable via SSH connection"""
def __init__(self, host: str = None):
msg = f"Host {host} is not available"
super().__init__(msg)
def log_command(func):
@wraps(func)
def wrapper(shell: "SSHShell", command: str, *args, **kwargs) -> CommandResult:
command_info = command.removeprefix("$ProgressPreference='SilentlyContinue'\n")
with reporter.step(command_info):
logging.info(f'Execute command "{command}" on "{shell.host}"')
start_time = datetime.utcnow()
result = func(shell, command, *args, **kwargs)
end_time = datetime.utcnow()
elapsed_time = end_time - start_time
log_message = (
f"HOST: {shell.host}\n"
f"COMMAND:\n{textwrap.indent(command, ' ')}\n"
f"RC:\n {result.return_code}\n"
f"STDOUT:\n{textwrap.indent(result.stdout, ' ')}\n"
f"STDERR:\n{textwrap.indent(result.stderr, ' ')}\n"
f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {elapsed_time}"
)
logger.info(log_message)
reporter.attach(log_message, "SSH command.txt")
return result
return wrapper
@lru_cache
def _load_private_key(file_path: str, password: Optional[str]) -> PKey:
"""
Loads private key from specified file.
We support several type formats, however paramiko doesn't provide functionality to determine
key type in advance. So we attempt to load file with each of the supported formats and then
cache the result so that we don't need to figure out type again on subsequent calls.
"""
logger.debug(f"Loading ssh key from {file_path}")
for key_type in (Ed25519Key, ECDSAKey, RSAKey):
try:
return key_type.from_private_key_file(file_path, password)
except SSHException as ex:
logger.warn(f"SSH key {file_path} can't be loaded with {key_type}: {ex}")
continue
raise SSHException(f"SSH key {file_path} is not supported")
class SSHShell(Shell):
"""
Implements command shell on a remote machine via SSH connection.
"""
# Time in seconds to delay after remote command has completed. The delay is required
# to allow remote command to flush its output buffer
DELAY_AFTER_EXIT = 0.2
SSH_CONNECTION_ATTEMPTS: ClassVar[int] = 3
CONNECTION_TIMEOUT = 90
def __init__(
self,
host: str,
login: str,
password: Optional[str] = None,
private_key_path: Optional[str] = None,
private_key_passphrase: Optional[str] = None,
port: str = "22",
) -> None:
self.host = host
self.port = port
self.login = login
self.password = password
self.private_key_path = private_key_path
self.private_key_passphrase = private_key_passphrase
self.__connection: Optional[SSHClient] = None
@property
def _connection(self):
if not self.__connection:
self.__connection = self._create_connection()
return self.__connection
def drop(self):
self._reset_connection()
def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult:
options = options or CommandOptions()
if options.interactive_inputs:
result = self._exec_interactive(command, options)
else:
result = self._exec_non_interactive(command, options)
if options.check and result.return_code != 0:
raise RuntimeError(
f"Command: {command}\nreturn code: {result.return_code}"
f"\nOutput: {result.stdout}"
)
return result
@log_command
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout)
for interactive_input in options.interactive_inputs:
input = interactive_input.input
if not input.endswith("\n"):
input = f"{input}\n"
try:
stdin.write(input)
except OSError:
logger.exception(f"Error while feeding {input} into command {command}")
# stdin.close()
# Wait for command to complete and flush its buffer before we attempt to read output
sleep(self.DELAY_AFTER_EXIT)
return_code = stdout.channel.recv_exit_status()
sleep(self.DELAY_AFTER_EXIT)
result = CommandResult(
stdout=stdout.read().decode(errors="ignore"),
stderr=stderr.read().decode(errors="ignore"),
return_code=return_code,
)
return result
@log_command
def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult:
try:
_, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout)
# Wait for command to complete and flush its buffer before we attempt to read output
return_code = stdout.channel.recv_exit_status()
sleep(self.DELAY_AFTER_EXIT)
return CommandResult(
stdout=stdout.read().decode(errors="ignore"),
stderr=stderr.read().decode(errors="ignore"),
return_code=return_code,
)
except (
SSHException,
TimeoutError,
ssh_exception.NoValidConnectionsError,
ConnectionResetError,
AttributeError,
socket.timeout,
) as exc:
logger.exception(f"Can't execute command {command} on host: {self.host}")
self._reset_connection()
raise HostIsNotAvailable(self.host) from exc
def _create_connection(self, attempts: int = SSH_CONNECTION_ATTEMPTS) -> SSHClient:
for attempt in range(attempts):
connection = SSHClient()
connection.set_missing_host_key_policy(AutoAddPolicy())
try:
if self.private_key_path:
logging.info(
f"Trying to connect to host {self.host} as {self.login} using SSH key "
f"{self.private_key_path} (attempt {attempt})"
)
connection.connect(
hostname=self.host,
port=self.port,
username=self.login,
pkey=_load_private_key(self.private_key_path, self.private_key_passphrase),
timeout=self.CONNECTION_TIMEOUT,
)
else:
logging.info(
f"Trying to connect to host {self.host} as {self.login} using password "
f"(attempt {attempt})"
)
connection.connect(
hostname=self.host,
port=self.port,
username=self.login,
password=self.password,
timeout=self.CONNECTION_TIMEOUT,
)
return connection
except AuthenticationException:
connection.close()
logger.exception(f"Can't connect to host {self.host}")
raise
except (
SSHException,
ssh_exception.NoValidConnectionsError,
AttributeError,
socket.timeout,
OSError,
) as exc:
connection.close()
can_retry = attempt + 1 < attempts
if can_retry:
logger.warn(f"Can't connect to host {self.host}, will retry. Error: {exc}")
continue
logger.exception(f"Can't connect to host {self.host}")
raise HostIsNotAvailable(self.host) from exc
def _reset_connection(self) -> None:
if self.__connection:
self.__connection.close()
self.__connection = None