[#334] Move common_steps*
Signed-off-by: Elizaveta Chichindaeva <elizaveta@nspcc.ru>
This commit is contained in:
parent
26f98c2cab
commit
18c30c39ff
69 changed files with 256 additions and 263 deletions
200
robot/resources/lib/python/acl.py
Normal file
200
robot/resources/lib/python/acl.py
Normal file
|
@ -0,0 +1,200 @@
|
|||
#!/usr/bin/python3.8
|
||||
|
||||
from enum import Enum, auto
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
|
||||
import base64
|
||||
import base58
|
||||
from cli_helpers import _cmd_run
|
||||
from common import ASSETS_DIR, NEOFS_ENDPOINT
|
||||
from robot.api.deco import keyword
|
||||
from robot.api import logger
|
||||
|
||||
|
||||
"""
|
||||
Robot Keywords and helper functions for work with NeoFS ACL.
|
||||
"""
|
||||
|
||||
|
||||
ROBOT_AUTO_KEYWORDS = False
|
||||
|
||||
# path to neofs-cli executable
|
||||
NEOFS_CLI_EXEC = os.getenv('NEOFS_CLI_EXEC', 'neofs-cli')
|
||||
EACL_LIFETIME = 100500
|
||||
|
||||
class AutoName(Enum):
|
||||
def _generate_next_value_(name, start, count, last_values):
|
||||
return name
|
||||
|
||||
class Role(AutoName):
|
||||
USER = auto()
|
||||
SYSTEM = auto()
|
||||
OTHERS = auto()
|
||||
|
||||
|
||||
@keyword('Get eACL')
|
||||
def get_eacl(wif: str, cid: str):
|
||||
cmd = (
|
||||
f'{NEOFS_CLI_EXEC} --rpc-endpoint {NEOFS_ENDPOINT} --wif {wif} '
|
||||
f'container get-eacl --cid {cid}'
|
||||
)
|
||||
logger.info(f"cmd: {cmd}")
|
||||
try:
|
||||
output = _cmd_run(cmd)
|
||||
if re.search(r'extended ACL table is not set for this container', output):
|
||||
return None
|
||||
return output
|
||||
except RuntimeError as exc:
|
||||
logger.info("Extended ACL table is not set for this container")
|
||||
logger.info(f"Got exception while getting eacl: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
@keyword('Set eACL')
|
||||
def set_eacl(wif: str, cid: str, eacl_table_path: str):
|
||||
cmd = (
|
||||
f'{NEOFS_CLI_EXEC} --rpc-endpoint {NEOFS_ENDPOINT} --wif {wif} '
|
||||
f'container set-eacl --cid {cid} --table {eacl_table_path} --await'
|
||||
)
|
||||
logger.info(f"cmd: {cmd}")
|
||||
_cmd_run(cmd)
|
||||
|
||||
|
||||
def _encode_cid_for_eacl(cid: str) -> str:
|
||||
cid_base58 = base58.b58decode(cid)
|
||||
return base64.b64encode(cid_base58).decode("utf-8")
|
||||
|
||||
|
||||
@keyword('Form BearerToken File')
|
||||
def form_bearertoken_file(wif: str, cid: str, eacl_records: list) -> str:
|
||||
"""
|
||||
This function fetches eACL for given <cid> on behalf of <wif>,
|
||||
then extends it with filters taken from <eacl_records>, signs
|
||||
with bearer token and writes to file
|
||||
"""
|
||||
enc_cid = _encode_cid_for_eacl(cid)
|
||||
file_path = f"{os.getcwd()}/{ASSETS_DIR}/{str(uuid.uuid4())}"
|
||||
|
||||
eacl = get_eacl(wif, cid)
|
||||
json_eacl = dict()
|
||||
if eacl:
|
||||
eacl = eacl.replace('eACL: ', '')
|
||||
eacl = eacl.split('Signature')[0]
|
||||
json_eacl = json.loads(eacl)
|
||||
logger.info(json_eacl)
|
||||
eacl_result = {
|
||||
"body":
|
||||
{
|
||||
"eaclTable":
|
||||
{
|
||||
"containerID":
|
||||
{
|
||||
"value": enc_cid
|
||||
},
|
||||
"records": []
|
||||
},
|
||||
"lifetime":
|
||||
{
|
||||
"exp": EACL_LIFETIME,
|
||||
"nbf": "1",
|
||||
"iat": "0"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if not eacl_records:
|
||||
raise(f"Got empty eacl_records list: {eacl_records}")
|
||||
for record in eacl_records:
|
||||
op_data = {
|
||||
"operation": record['Operation'],
|
||||
"action": record['Access'],
|
||||
"filters": [],
|
||||
"targets": []
|
||||
}
|
||||
|
||||
if Role(record['Role']):
|
||||
op_data['targets'] = [
|
||||
{
|
||||
"role": record['Role']
|
||||
}
|
||||
]
|
||||
else:
|
||||
op_data['targets'] = [
|
||||
{
|
||||
"keys": [ record['Role'] ]
|
||||
}
|
||||
]
|
||||
|
||||
if 'Filters' in record.keys():
|
||||
op_data["filters"].append(record['Filters'])
|
||||
|
||||
eacl_result["body"]["eaclTable"]["records"].append(op_data)
|
||||
|
||||
# Add records from current eACL
|
||||
if "records" in json_eacl.keys():
|
||||
for record in json_eacl["records"]:
|
||||
eacl_result["body"]["eaclTable"]["records"].append(record)
|
||||
|
||||
with open(file_path, 'w', encoding='utf-8') as eacl_file:
|
||||
json.dump(eacl_result, eacl_file, ensure_ascii=False, indent=4)
|
||||
|
||||
logger.info(f"Got these extended ACL records: {eacl_result}")
|
||||
sign_bearer_token(wif, file_path)
|
||||
return file_path
|
||||
|
||||
|
||||
def sign_bearer_token(wif: str, eacl_rules_file: str):
|
||||
cmd = (
|
||||
f'{NEOFS_CLI_EXEC} util sign bearer-token --from {eacl_rules_file} '
|
||||
f'--to {eacl_rules_file} --wif {wif} --json'
|
||||
)
|
||||
logger.info(f"cmd: {cmd}")
|
||||
_cmd_run(cmd)
|
||||
|
||||
|
||||
@keyword('Form eACL JSON Common File')
|
||||
def form_eacl_json_common_file(eacl_records: list) -> str:
|
||||
# Input role can be Role (USER, SYSTEM, OTHERS) or public key.
|
||||
eacl = {"records":[]}
|
||||
file_path = f"{os.getcwd()}/{ASSETS_DIR}/{str(uuid.uuid4())}"
|
||||
|
||||
for record in eacl_records:
|
||||
op_data = dict()
|
||||
|
||||
if Role(record['Role']):
|
||||
op_data = {
|
||||
"operation": record['Operation'],
|
||||
"action": record['Access'],
|
||||
"filters": [],
|
||||
"targets": [
|
||||
{
|
||||
"role": record['Role']
|
||||
}
|
||||
]
|
||||
}
|
||||
else:
|
||||
op_data = {
|
||||
"operation": record['Operation'],
|
||||
"action": record['Access'],
|
||||
"filters": [],
|
||||
"targets": [
|
||||
{
|
||||
"keys": [ record['Role'] ]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
if 'Filters' in record.keys():
|
||||
op_data["filters"].append(record['Filters'])
|
||||
|
||||
eacl["records"].append(op_data)
|
||||
|
||||
logger.info(f"Got these extended ACL records: {eacl}")
|
||||
|
||||
with open(file_path, 'w', encoding='utf-8') as eacl_file:
|
||||
json.dump(eacl, eacl_file, ensure_ascii=False, indent=4)
|
||||
|
||||
return file_path
|
38
robot/resources/lib/python/cli_helpers.py
Normal file
38
robot/resources/lib/python/cli_helpers.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
#!/usr/bin/python3.8
|
||||
|
||||
"""
|
||||
Helper functions to use with `neofs-cli`, `neo-go`
|
||||
and other CLIs.
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import pexpect
|
||||
|
||||
from robot.api import logger
|
||||
|
||||
ROBOT_AUTO_KEYWORDS = False
|
||||
|
||||
|
||||
def _cmd_run(cmd):
|
||||
"""
|
||||
Runs given shell command <cmd>, in case of success returns its stdout,
|
||||
in case of failure returns error message.
|
||||
"""
|
||||
try:
|
||||
compl_proc = subprocess.run(cmd, check=True, universal_newlines=True,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=30,
|
||||
shell=True)
|
||||
output = compl_proc.stdout
|
||||
logger.info(f"Output: {output}")
|
||||
return output
|
||||
except subprocess.CalledProcessError as exc:
|
||||
raise RuntimeError(f"Error:\nreturn code: {exc.returncode} "
|
||||
f"\nOutput: {exc.output}") from exc
|
||||
|
||||
def _run_with_passwd(cmd):
|
||||
p = pexpect.spawn(cmd)
|
||||
p.expect(".*")
|
||||
p.sendline('\r')
|
||||
p.wait()
|
||||
cmd = p.read()
|
||||
return cmd.decode()
|
23
robot/resources/lib/python/cli_keywords.py
Normal file
23
robot/resources/lib/python/cli_keywords.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
#!/usr/bin/python3.8
|
||||
|
||||
import pexpect
|
||||
|
||||
from robot.api.deco import keyword
|
||||
|
||||
ROBOT_AUTO_KEYWORDS = False
|
||||
|
||||
@keyword('Run Process And Enter Empty Password')
|
||||
def run_proccess_and_interact(cmd: str) -> str:
|
||||
p = pexpect.spawn(cmd)
|
||||
p.expect("[pP]assword")
|
||||
# enter empty password
|
||||
p.sendline('\r')
|
||||
p.wait()
|
||||
# throw a string with password prompt
|
||||
first = p.readline()
|
||||
# take all output
|
||||
child_output = p.readline()
|
||||
p.close()
|
||||
if p.exitstatus != 0:
|
||||
raise Exception(f"{first}\n{child_output}")
|
||||
return child_output
|
260
robot/resources/lib/python/gates.py
Normal file
260
robot/resources/lib/python/gates.py
Normal file
|
@ -0,0 +1,260 @@
|
|||
#!/usr/bin/python3.8
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import uuid
|
||||
import requests
|
||||
import botocore
|
||||
import boto3
|
||||
|
||||
from common import *
|
||||
from robot.api.deco import keyword
|
||||
from robot.api import logger
|
||||
from cli_helpers import _run_with_passwd
|
||||
|
||||
|
||||
ROBOT_AUTO_KEYWORDS = False
|
||||
|
||||
NEOFS_EXEC = os.getenv('NEOFS_EXEC', 'neofs-authmate')
|
||||
|
||||
@keyword('Init S3 Credentials')
|
||||
def init_s3_credentials(wallet):
|
||||
bucket = str(uuid.uuid4())
|
||||
records = ' \' {"records":[{"operation":"PUT","action":"ALLOW","filters":[],"targets":[{"role":"OTHERS","keys":[]}]}, {"operation":"SEARCH","action":"ALLOW","filters":[],"targets":[{"role":"OTHERS","keys":[]}]}, {"operation":"GET","action":"ALLOW","filters":[],"targets":[{"role":"OTHERS","keys":[]}]}]} \' '
|
||||
Cmd = (
|
||||
f'{NEOFS_EXEC} --debug --with-log issue-secret --wallet {wallet} '
|
||||
f'--gate-public-key={GATE_PUB_KEY} --peer {NEOFS_ENDPOINT} '
|
||||
f'--container-friendly-name {bucket} --create-session-token '
|
||||
f'--bearer-rules {records}'
|
||||
)
|
||||
logger.info(f"Executing command: {Cmd}")
|
||||
|
||||
try:
|
||||
output = _run_with_passwd(Cmd)
|
||||
logger.info(f"Command completed with output: {output}")
|
||||
|
||||
m = re.search(r'"container_id":\s+"(\w+)"', output)
|
||||
cid = m.group(1)
|
||||
logger.info("cid: %s" % cid)
|
||||
|
||||
m = re.search(r'"access_key_id":\s+"([\w\/]+)"', output)
|
||||
access_key_id = m.group(1)
|
||||
logger.info("access_key_id: %s" % access_key_id)
|
||||
|
||||
m = re.search(r'"secret_access_key":\s+"(\w+)"', output)
|
||||
secret_access_key = m.group(1)
|
||||
logger.info("secret_access_key: %s" % secret_access_key)
|
||||
|
||||
m = re.search(r'"owner_private_key":\s+"(\w+)"', output)
|
||||
owner_private_key = m.group(1)
|
||||
logger.info("owner_private_key: %s" % owner_private_key)
|
||||
|
||||
return cid, bucket, access_key_id, secret_access_key, owner_private_key
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise Exception(f"Error: \nreturn code: {e.returncode}. \nOutput: {e.stderr}")
|
||||
|
||||
|
||||
@keyword('Config S3 client')
|
||||
def config_s3_client(access_key_id, secret_access_key):
|
||||
try:
|
||||
session = boto3.session.Session()
|
||||
|
||||
s3_client = session.client(
|
||||
service_name='s3',
|
||||
aws_access_key_id=access_key_id,
|
||||
aws_secret_access_key=secret_access_key,
|
||||
endpoint_url=S3_GATE, verify=False
|
||||
)
|
||||
|
||||
return s3_client
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('List objects S3 v2')
|
||||
def list_objects_s3_v2(s3_client, bucket):
|
||||
try:
|
||||
response = s3_client.list_objects_v2(Bucket=bucket)
|
||||
logger.info("S3 v2 List objects result: %s" % response['Contents'])
|
||||
obj_list = []
|
||||
for obj in response['Contents']:
|
||||
obj_list.append(obj['Key'])
|
||||
logger.info("Found s3 objects: %s" % obj_list)
|
||||
return obj_list
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('List objects S3')
|
||||
def list_objects_s3(s3_client, bucket):
|
||||
try:
|
||||
response = s3_client.list_objects(Bucket=bucket)
|
||||
logger.info("S3 List objects result: %s" % response['Contents'])
|
||||
obj_list = []
|
||||
for obj in response['Contents']:
|
||||
obj_list.append(obj['Key'])
|
||||
logger.info("Found s3 objects: %s" % obj_list)
|
||||
return obj_list
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Create bucket S3')
|
||||
def create_bucket_s3(s3_client):
|
||||
bucket_name = str(uuid.uuid4())
|
||||
|
||||
try:
|
||||
s3_bucket = s3_client.create_bucket(Bucket=bucket_name)
|
||||
logger.info("Created S3 bucket: %s" % s3_bucket)
|
||||
return bucket_name
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('List buckets S3')
|
||||
def list_buckets_s3(s3_client):
|
||||
found_buckets = []
|
||||
try:
|
||||
response = s3_client.list_buckets()
|
||||
logger.info("S3 List buckets result: %s" % response)
|
||||
|
||||
for bucket in response['Buckets']:
|
||||
found_buckets.append(bucket['Name'])
|
||||
|
||||
return found_buckets
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Delete bucket S3')
|
||||
def delete_bucket_s3(s3_client, bucket):
|
||||
try:
|
||||
response = s3_client.delete_bucket(Bucket=bucket)
|
||||
logger.info(f"S3 Delete bucket result: {response}")
|
||||
|
||||
return response
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('HeadBucket S3')
|
||||
def headbucket(bucket, s3_client):
|
||||
try:
|
||||
response = s3_client.head_bucket(Bucket=bucket)
|
||||
logger.info(f"S3 HeadBucket result: {response}")
|
||||
return response
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Put object S3')
|
||||
def put_object_s3(s3_client, bucket, filepath):
|
||||
filename = os.path.basename(filepath)
|
||||
|
||||
with open(filepath, "rb") as f:
|
||||
fileContent = f.read()
|
||||
|
||||
try:
|
||||
response = s3_client.put_object(Body=fileContent, Bucket=bucket, Key=filename)
|
||||
logger.info("S3 Put object result: %s" % response)
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Head object S3')
|
||||
def head_object_s3(s3_client, bucket, object_key):
|
||||
|
||||
try:
|
||||
response = s3_client.head_object(Bucket=bucket, Key=object_key)
|
||||
logger.info("S3 Head object result: %s" % response)
|
||||
return response
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Delete object S3')
|
||||
def delete_object_s3(s3_client, bucket, object_key):
|
||||
try:
|
||||
response = s3_client.delete_object(Bucket=bucket, Key=object_key)
|
||||
logger.info("S3 Put object result: %s" % response)
|
||||
return response
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Copy object S3')
|
||||
def copy_object_s3(s3_client, bucket, object_key, new_object):
|
||||
try:
|
||||
response = s3_client.copy_object(Bucket=bucket, CopySource=bucket+"/"+object_key, Key=new_object)
|
||||
logger.info("S3 Copy object result: %s" % response)
|
||||
return response
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Get object S3')
|
||||
def get_object_s3(s3_client, bucket, object_key, target_file):
|
||||
try:
|
||||
response = s3_client.get_object(Bucket=bucket, Key=object_key)
|
||||
|
||||
with open(f"{target_file}", 'wb') as f:
|
||||
chunk = response['Body'].read(1024)
|
||||
while chunk:
|
||||
f.write(chunk)
|
||||
chunk = response['Body'].read(1024)
|
||||
|
||||
return target_file
|
||||
|
||||
except botocore.exceptions.ClientError as err:
|
||||
raise Exception(f"Error Message: {err.response['Error']['Message']}\n"
|
||||
f"Http status code: {err.response['ResponseMetadata']['HTTPStatusCode']}") from err
|
||||
|
||||
|
||||
@keyword('Get via HTTP Gate')
|
||||
def get_via_http_gate(cid: str, oid: str):
|
||||
"""
|
||||
This function gets given object from HTTP gate
|
||||
:param cid: CID to get object from
|
||||
:param oid: object OID
|
||||
"""
|
||||
request = f'{HTTP_GATE}/get/{cid}/{oid}'
|
||||
resp = requests.get(request, stream=True)
|
||||
|
||||
if not resp.ok:
|
||||
raise Exception(f"""Failed to get object via HTTP gate:
|
||||
request: {resp.request.path_url},
|
||||
response: {resp.text},
|
||||
status code: {resp.status_code} {resp.reason}""")
|
||||
return
|
||||
|
||||
logger.info(f'Request: {request}')
|
||||
filename = os.path.curdir + f"/{cid}_{oid}"
|
||||
with open(filename, "wb") as f:
|
||||
shutil.copyfileobj(resp.raw, f)
|
||||
del resp
|
||||
return filename
|
||||
|
1107
robot/resources/lib/python/neofs.py
Normal file
1107
robot/resources/lib/python/neofs.py
Normal file
File diff suppressed because it is too large
Load diff
96
robot/resources/lib/python/payment_neogo.py
Normal file
96
robot/resources/lib/python/payment_neogo.py
Normal file
|
@ -0,0 +1,96 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import os
|
||||
import pexpect
|
||||
import re
|
||||
|
||||
from robot.api.deco import keyword
|
||||
from robot.api import logger
|
||||
from neo3 import wallet
|
||||
|
||||
from common import *
|
||||
import rpc_client
|
||||
import contract
|
||||
|
||||
ROBOT_AUTO_KEYWORDS = False
|
||||
|
||||
NNS_CONTRACT = contract.get_nns_contract_hash(NEOFS_NEO_API_ENDPOINT)
|
||||
BALANCE_CONTRACT_HASH = contract.get_morph_contract_hash(
|
||||
'balance.neofs', NNS_CONTRACT, NEOFS_NEO_API_ENDPOINT
|
||||
)
|
||||
MORPH_TOKEN_POWER = 12
|
||||
|
||||
morph_rpc_cli = rpc_client.RPCClient(NEOFS_NEO_API_ENDPOINT)
|
||||
mainnet_rpc_cli = rpc_client.RPCClient(NEO_MAINNET_ENDPOINT)
|
||||
|
||||
|
||||
@keyword('Withdraw Mainnet Gas')
|
||||
def withdraw_mainnet_gas(wallet: str, address: str, scripthash: str, amount: int):
|
||||
cmd = (
|
||||
f"{NEOGO_CLI_EXEC} contract invokefunction -w {wallet} -a {address} "
|
||||
f"-r {NEO_MAINNET_ENDPOINT} {NEOFS_CONTRACT} withdraw {scripthash} "
|
||||
f"int:{amount} -- {scripthash}:Global"
|
||||
)
|
||||
|
||||
logger.info(f"Executing command: {cmd}")
|
||||
out = _run_sh_with_passwd('', cmd)
|
||||
logger.info(f"Command completed with output: {out}")
|
||||
m = re.match(r'^Sent invocation transaction (\w{64})$', out)
|
||||
if m is None:
|
||||
raise Exception("Can not get Tx.")
|
||||
tx = m.group(1)
|
||||
return tx
|
||||
|
||||
|
||||
@keyword('Transaction accepted in block')
|
||||
def transaction_accepted_in_block(tx_id: str):
|
||||
"""
|
||||
This function return True in case of accepted TX.
|
||||
Parameters:
|
||||
:param tx_id: transaction ID
|
||||
"""
|
||||
|
||||
try:
|
||||
resp = mainnet_rpc_cli.get_transaction_height(tx_id)
|
||||
if resp is not None:
|
||||
logger.info(f"got block height: {resp}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.info(f"request failed with error: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
@keyword('Get NeoFS Balance')
|
||||
def get_balance(wif: str):
|
||||
"""
|
||||
This function returns NeoFS balance for given WIF.
|
||||
"""
|
||||
|
||||
acc = wallet.Account.from_wif(wif, '')
|
||||
payload = [
|
||||
{
|
||||
'type': 'Hash160',
|
||||
'value': str(acc.script_hash)
|
||||
}
|
||||
]
|
||||
try:
|
||||
resp = morph_rpc_cli.invoke_function(
|
||||
BALANCE_CONTRACT_HASH, 'balanceOf', payload
|
||||
)
|
||||
logger.info(resp)
|
||||
value = int(resp['stack'][0]['value'])
|
||||
return value/(10**MORPH_TOKEN_POWER)
|
||||
except Exception as e:
|
||||
logger.error(f"failed to get {wif} balance: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
def _run_sh_with_passwd(passwd, cmd):
|
||||
p = pexpect.spawn(cmd)
|
||||
p.expect(".*")
|
||||
p.sendline(passwd + '\r')
|
||||
p.wait()
|
||||
# throw a string with password prompt
|
||||
# take a string with tx hash
|
||||
tx_hash = p.read().splitlines()[-1]
|
||||
return tx_hash.decode()
|
53
robot/resources/lib/python/utility_keywords.py
Normal file
53
robot/resources/lib/python/utility_keywords.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
#!/usr/bin/python3.8
|
||||
|
||||
import docker
|
||||
import os
|
||||
import tarfile
|
||||
import uuid
|
||||
|
||||
from neo3 import wallet
|
||||
from robot.api.deco import keyword
|
||||
from robot.api import logger
|
||||
from robot.libraries.BuiltIn import BuiltIn
|
||||
|
||||
from common import *
|
||||
|
||||
ROBOT_AUTO_KEYWORDS = False
|
||||
|
||||
@keyword('Generate file of bytes')
|
||||
def generate_file_of_bytes(size: str) -> str:
|
||||
"""
|
||||
Function generates big binary file with the specified size in bytes.
|
||||
:param size: the size in bytes, can be declared as 6e+6 for example
|
||||
"""
|
||||
size = int(float(size))
|
||||
filename = f"{os.getcwd()}/{ASSETS_DIR}/{str(uuid.uuid4())}"
|
||||
with open(filename, 'wb') as fout:
|
||||
fout.write(os.urandom(size))
|
||||
logger.info(f"file with size {size} bytes has been generated: {filename}")
|
||||
return filename
|
||||
|
||||
@keyword('Get Docker Logs')
|
||||
def get_container_logs(testcase_name: str) -> None:
|
||||
client = docker.APIClient(base_url='unix://var/run/docker.sock')
|
||||
logs_dir = BuiltIn().get_variable_value("${OUTPUT_DIR}")
|
||||
tar_name = f"{logs_dir}/dockerlogs({testcase_name}).tar.gz"
|
||||
tar = tarfile.open(tar_name, "w:gz")
|
||||
for container in client.containers():
|
||||
container_name = container['Names'][0][1:]
|
||||
if client.inspect_container(container_name)['Config']['Domainname'] == "neofs.devenv":
|
||||
file_name = f"{logs_dir}/docker_log_{container_name}"
|
||||
with open(file_name,'wb') as out:
|
||||
out.write(client.logs(container_name))
|
||||
logger.info(f"Collected logs from container {container_name}")
|
||||
tar.add(file_name)
|
||||
os.remove(file_name)
|
||||
tar.close()
|
||||
|
||||
@keyword('WIF to Binary')
|
||||
def wif_to_binary(wif: str) -> str:
|
||||
priv_key = wallet.Account.private_key_from_wif(wif)
|
||||
path = f"{os.getcwd()}/{ASSETS_DIR}/{str(uuid.uuid4())}"
|
||||
with open(path, "wb") as f:
|
||||
f.write(priv_key)
|
||||
return path
|
Loading…
Add table
Add a link
Reference in a new issue