2022-03-15 11:58:59 +00:00
|
|
|
#!/usr/bin/python3
|
2020-07-01 02:28:31 +00:00
|
|
|
|
2021-08-25 17:08:54 +00:00
|
|
|
import json
|
2020-07-01 02:28:31 +00:00
|
|
|
import os
|
2021-08-25 17:08:54 +00:00
|
|
|
import random
|
|
|
|
|
|
|
|
from neo3 import wallet
|
2020-07-01 02:28:31 +00:00
|
|
|
from robot.api import logger
|
2022-06-09 13:08:11 +00:00
|
|
|
from robot.api.deco import keyword
|
2022-06-06 13:47:13 +00:00
|
|
|
from robot.libraries.BuiltIn import BuiltIn
|
2022-03-15 11:58:59 +00:00
|
|
|
|
2022-06-09 13:08:11 +00:00
|
|
|
import neofs_verbs
|
|
|
|
from common import NEOFS_NETMAP_DICT
|
|
|
|
|
2020-07-01 02:28:31 +00:00
|
|
|
# Robot Framework library flag: with automatic keyword discovery switched off,
# only the functions explicitly decorated with @keyword below are exposed.
ROBOT_AUTO_KEYWORDS = False
|
|
|
|
|
2021-01-17 11:55:10 +00:00
|
|
|
# path to neofs-cli executable
|
|
|
|
# Read from the NEOFS_CLI_EXEC environment variable; falls back to the bare
# command name 'neofs-cli' when the variable is not set.
NEOFS_CLI_EXEC = os.getenv('NEOFS_CLI_EXEC', 'neofs-cli')
|
2020-11-18 15:15:57 +00:00
|
|
|
|
|
|
|
|
2020-12-23 22:38:16 +00:00
|
|
|
@keyword('Verify Head Tombstone')
def verify_head_tombstone(wallet_path: str, cid: str, oid_ts: str, oid: str):
    """
    Verifies the header of a tombstone object.

    The header of `oid_ts` is requested with `neofs_verbs.head_object` and
    its fields are compared one by one with the expected values. The first
    mismatch fails the keyword through `BuiltIn().should_be_equal`.

    Args:
        wallet_path (str): path to the JSON wallet file; the address of
                        its first account is the expected owner ID
        cid (str): container ID expected both in the header and in the
                        session token address
        oid_ts (str): ID of the tombstone object whose header is checked
        oid (str): object ID expected in the session token address
    Returns:
        None
    """
    header = neofs_verbs.head_object(wallet_path, cid, oid_ts)['header']
    builtin = BuiltIn()

    builtin.should_be_equal(header["containerID"], cid,
                            msg="Tombstone Header CID is wrong")

    # The expected owner is the first account of the wallet, which is
    # opened with an empty password.
    with open(wallet_path, 'r', encoding='utf-8') as wallet_file:
        wlt_data = json.load(wallet_file)
    wlt = wallet.Wallet.from_json(wlt_data, password='')
    addr = wlt.accounts[0].address

    builtin.should_be_equal(header["ownerID"], addr,
                            msg="Tombstone Owner ID is wrong")
    builtin.should_be_equal(header["objectType"], 'TOMBSTONE',
                            msg="Header Type isn't Tombstone")

    # The session token part is looked up only after the plain header
    # fields have been checked, so those failures are reported first.
    session_object = header["sessionToken"]["body"]["object"]
    builtin.should_be_equal(session_object["verb"], 'DELETE',
                            msg="Header Session Type isn't DELETE")

    session_address = session_object["address"]
    builtin.should_be_equal(session_address["containerID"], cid,
                            msg="Header Session ID is wrong")
    builtin.should_be_equal(session_address["objectID"], oid,
                            msg="Header Session OID is wrong")
|
2020-07-01 02:28:31 +00:00
|
|
|
|
2020-11-28 03:41:35 +00:00
|
|
|
|
2022-06-24 12:19:25 +00:00
|
|
|
@keyword('Get control endpoint and wallet')
def get_control_endpoint_and_wallet(endpoint_number: str = ''):
    """
    Looks up the control endpoint and wallet of a node in NEOFS_NETMAP_DICT.

    When no node is given (empty string), one of the NEOFS_NETMAP_DICT keys
    is picked at random and the choice is written to the Robot log.

    Args:
        endpoint_number (optional, str): key of the node in
                        NEOFS_NETMAP_DICT, e.g. 's01', 's02', etc.
    Returns:
        (str): the key of the node that was used
        (str): the value stored under 'control' for that node
        (str): the value stored under 'wallet_path' for that node
    """
    node_key = endpoint_number
    if node_key == '':
        # No explicit node requested: fall back to a random one.
        node_key = random.choice(list(NEOFS_NETMAP_DICT))
        logger.info(f'Random node chosen: {node_key}')

    node_info = NEOFS_NETMAP_DICT[f'{node_key}']
    return node_key, node_info['control'], node_info['wallet_path']
|
2021-09-09 08:13:37 +00:00
|
|
|
|
2022-04-25 09:53:20 +00:00
|
|
|
|
2021-10-04 16:16:49 +00:00
|
|
|
@keyword('Get Locode')
def get_locode():
    """
    Returns the UN-LOCODE of a randomly chosen NEOFS_NETMAP_DICT entry.

    The chosen value is also written to the Robot log.

    Returns:
        the value stored under the 'UN-LOCODE' key of the chosen entry
    """
    candidates = list(NEOFS_NETMAP_DICT.values())
    chosen_node = random.choice(candidates)
    chosen_locode = chosen_node['UN-LOCODE']
    logger.info(f'Random locode chosen: {chosen_locode}')
    return chosen_locode
|