fixed [#233]; keywords from neofs-keywords moved to neofs-testcases

Signed-off-by: anastasia prasolova <anastasia@nspcc.ru>
This commit is contained in:
anastasia prasolova 2022-07-04 22:49:14 +03:00 committed by Anastasia Prasolova
parent b1b86351df
commit 34d282cf5a
19 changed files with 147 additions and 72 deletions

View file

@ -0,0 +1,31 @@
#!/usr/bin/python3.9
import contract
from robot.api import logger
from robot.api.deco import keyword
from robot.libraries.BuiltIn import BuiltIn
IR_WALLET_PATH = BuiltIn().get_variable_value("${IR_WALLET_PATH}")
IR_WALLET_PASS = BuiltIn().get_variable_value("${IR_WALLET_PASS}")
SIDECHAIN_EP = BuiltIn().get_variable_value("${MORPH_ENDPOINT}")
@keyword('Get Epoch')
def get_epoch():
epoch = int(contract.testinvoke_contract(
contract.get_netmap_contract_hash(SIDECHAIN_EP),
'epoch',
SIDECHAIN_EP)
)
logger.info(f"Got epoch {epoch}")
return epoch
@keyword('Tick Epoch')
def tick_epoch():
cur_epoch = get_epoch()
return contract.invoke_contract_multisig(
contract.get_netmap_contract_hash(SIDECHAIN_EP),
f"newEpoch int:{cur_epoch+1}",
IR_WALLET_PATH, IR_WALLET_PASS, SIDECHAIN_EP)

View file

@ -3,29 +3,26 @@
import re
import time
from common import (MAINNET_WALLET_PATH, MORPH_ENDPOINT,
NEO_MAINNET_ENDPOINT, NEOFS_CONTRACT, MAINNET_SINGLE_ADDR)
import rpc_client
import contract
import converters
from wallet import nep17_transfer
from wrappers import run_sh_with_passwd_contract
import rpc_client
from common import (GAS_HASH, MAINNET_SINGLE_ADDR, MAINNET_WALLET_PATH,
MORPH_ENDPOINT, NEO_MAINNET_ENDPOINT, NEOFS_CONTRACT,
NEOGO_CLI_EXEC)
from converters import load_wallet
from neo3 import wallet
from robot.api.deco import keyword
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn
from robot.api.deco import keyword
from wallet import nep17_transfer
from wrappers import run_sh_with_passwd_contract
ROBOT_AUTO_KEYWORDS = False
MORPH_TOKEN_POWER = 12
EMPTY_PASSWORD = ''
MAINNET_WALLET_PASS = 'one'
TX_PERSIST_TIMEOUT = 15 #seconds
NEOGO_CLI_EXEC = BuiltIn().get_variable_value("${NEOGO_CLI_EXEC}")
TX_PERSIST_TIMEOUT = 15 # seconds
ASSET_POWER_MAINCHAIN = 10 ** 8
ASSET_POWER_SIDECHAIN = 10 ** 12
morph_rpc_cli = rpc_client.RPCClient(MORPH_ENDPOINT)
mainnet_rpc_cli = rpc_client.RPCClient(NEO_MAINNET_ENDPOINT)
@ -43,7 +40,8 @@ def withdraw_mainnet_gas(wlt: str, amount: int):
)
logger.info(f"Executing command: {cmd}")
out = (run_sh_with_passwd_contract('', cmd, expect_confirmation=True)).decode('utf-8')
raw_out = run_sh_with_passwd_contract('', cmd, expect_confirmation=True)
out = raw_out.decode('utf-8')
logger.info(f"Command completed with output: {out}")
m = re.match(r'^Sent invocation transaction (\w{64})$', out)
if m is None:
@ -90,20 +88,21 @@ def get_balance(wallet_path: str):
]
try:
resp = morph_rpc_cli.invoke_function(
contract.get_balance_contract_hash(MORPH_ENDPOINT),
'balanceOf',
payload
)
contract.get_balance_contract_hash(MORPH_ENDPOINT),
'balanceOf',
payload
)
logger.info(f"Got response \n{resp}")
value = int(resp['stack'][0]['value'])
return value / (10 ** MORPH_TOKEN_POWER)
return value / ASSET_POWER_SIDECHAIN
except Exception as out:
logger.error(f"failed to get wallet balance: {out}")
raise out
@keyword('Transfer Mainnet Gas')
def transfer_mainnet_gas(wallet_to: str, amount: int, wallet_password: str = EMPTY_PASSWORD):
def transfer_mainnet_gas(wallet_to: str, amount: int,
wallet_password: str = EMPTY_PASSWORD):
'''
This function transfer GAS in main chain from mainnet wallet to
the provided wallet. If the wallet contains more than one address,
@ -118,14 +117,17 @@ def transfer_mainnet_gas(wallet_to: str, amount: int, wallet_password: str = EMP
'''
address_to = _address_from_wallet(wallet_to, wallet_password)
txid = nep17_transfer(MAINNET_WALLET_PATH, address_to, amount, NEO_MAINNET_ENDPOINT,
wallet_pass=MAINNET_WALLET_PASS, addr_from=MAINNET_SINGLE_ADDR)
txid = nep17_transfer(MAINNET_WALLET_PATH, address_to, amount,
NEO_MAINNET_ENDPOINT,
wallet_pass=MAINNET_WALLET_PASS,
addr_from=MAINNET_SINGLE_ADDR)
if not transaction_accepted(txid):
raise AssertionError(f"TX {txid} hasn't been processed")
@keyword('NeoFS Deposit')
def neofs_deposit(wallet_to: str, amount: int, wallet_password: str = EMPTY_PASSWORD):
def neofs_deposit(wallet_to: str, amount: int,
wallet_password: str = EMPTY_PASSWORD):
"""
Transferring GAS from given wallet to NeoFS contract address.
"""
@ -135,11 +137,13 @@ def neofs_deposit(wallet_to: str, amount: int, wallet_password: str = EMPTY_PASS
address_to = _address_from_wallet(wallet_to, wallet_password)
txid = nep17_transfer(wallet_to, deposit_addr, amount, NEO_MAINNET_ENDPOINT,
wallet_pass=wallet_password, addr_from=address_to)
txid = nep17_transfer(wallet_to, deposit_addr, amount,
NEO_MAINNET_ENDPOINT, wallet_pass=wallet_password,
addr_from=address_to)
if not transaction_accepted(txid):
raise AssertionError(f"TX {txid} hasn't been processed")
def _address_from_wallet(wlt: str, wallet_password: str):
"""
Extracting the address from the given wallet.
@ -153,3 +157,23 @@ def _address_from_wallet(wlt: str, wallet_password: str):
address = wallet_loaded.accounts[-1].address
logger.info(f"got address: {address}")
return address
@keyword('Get Mainnet Balance')
def get_mainnet_balance(address: str):
resp = mainnet_rpc_cli.get_nep17_balances(address=address)
logger.info(f"Got getnep17balances response: {resp}")
for balance in resp['balance']:
if balance['assethash'] == GAS_HASH:
return float(balance['amount'])/ASSET_POWER_MAINCHAIN
return float(0)
@keyword('Get Sidechain Balance')
def get_sidechain_balance(address: str):
resp = morph_rpc_cli.get_nep17_balances(address=address)
logger.info(f"Got getnep17balances response: {resp}")
for balance in resp['balance']:
if balance['assethash'] == GAS_HASH:
return float(balance['amount'])/ASSET_POWER_SIDECHAIN
return float(0)

View file

@ -6,12 +6,13 @@ import tarfile
import uuid
import docker
import wallet
from common import ASSETS_DIR, SIMPLE_OBJ_SIZE
from robot.api import logger
from robot.api.deco import keyword
from robot.libraries.BuiltIn import BuiltIn
from cli_helpers import _cmd_run
from common import SIMPLE_OBJ_SIZE, ASSETS_DIR
ROBOT_AUTO_KEYWORDS = False
@ -19,7 +20,8 @@ ROBOT_AUTO_KEYWORDS = False
@keyword('Generate file')
def generate_file_and_file_hash(size: int) -> str:
"""
Function generates a big binary file with the specified size in bytes and its hash.
Function generates a big binary file with the specified size in bytes
and its hash.
Args:
size (int): the size in bytes, can be declared as 6e+6 for example
Returns:
@ -51,6 +53,18 @@ def get_file_hash(filename: str):
return file_hash.hexdigest()
@keyword('Generate Wallet')
def generate_wallet():
return wallet.init_wallet_w_addr(ASSETS_DIR)
# TODO: should be deleted in the scope
# of https://github.com/nspcc-dev/neofs-testcases/issues/191
@keyword('Init Wallet from WIF')
def init_wallet_from_wif(dir_path: str, wif: str):
return wallet.init_wallet_from_wif(dir_path, wif)
@keyword('Get Docker Logs')
def get_container_logs(testcase_name: str) -> None:
client = docker.APIClient(base_url='unix://var/run/docker.sock')
@ -59,7 +73,8 @@ def get_container_logs(testcase_name: str) -> None:
tar = tarfile.open(tar_name, "w:gz")
for container in client.containers():
container_name = container['Names'][0][1:]
if client.inspect_container(container_name)['Config']['Domainname'] == "neofs.devenv":
if (client.inspect_container(container_name)['Config']['Domainname']
== "neofs.devenv"):
file_name = f"{logs_dir}/docker_log_{container_name}"
with open(file_name, 'wb') as out:
out.write(client.logs(container_name))
@ -84,7 +99,10 @@ def make_up(services: list = [], config_dict: dict = {}):
cmd = f'make up/{service}'
_cmd_run(cmd)
else:
cmd = f'make up/basic; make update.max_object_size val={SIMPLE_OBJ_SIZE}'
cmd = (
f'make up/basic;'
f'make update.max_object_size val={SIMPLE_OBJ_SIZE}'
)
_cmd_run(cmd, timeout=120)
os.chdir(test_path)