forked from TrueCloudLab/frostfs-testcases
a97e1ee1e9
Signed-off-by: anastasia prasolova <anastasia@nspcc.ru>
155 lines
5.4 KiB
Python
155 lines
5.4 KiB
Python
#!/usr/bin/python3
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
import random
|
|
import uuid
|
|
import base58
|
|
|
|
from neo3 import wallet
|
|
from common import (NEOFS_NETMAP, WALLET_PASS, NEOFS_ENDPOINT,
|
|
NEOFS_NETMAP_DICT, ASSETS_DIR)
|
|
from cli_helpers import _cmd_run
|
|
import json_transformers
|
|
from robot.api.deco import keyword
|
|
from robot.api import logger
|
|
|
|
ROBOT_AUTO_KEYWORDS = False
|
|
|
|
# path to neofs-cli executable
|
|
NEOFS_CLI_EXEC = os.getenv('NEOFS_CLI_EXEC', 'neofs-cli')
|
|
|
|
|
|
# TODO: move to neofs-keywords
|
|
@keyword('Get ScriptHash')
|
|
def get_scripthash(wif: str):
|
|
acc = wallet.Account.from_wif(wif, '')
|
|
return str(acc.script_hash)
|
|
|
|
|
|
@keyword('Verify Head Tombstone')
|
|
def verify_head_tombstone(wallet: str, cid: str, oid_ts: str, oid: str, addr: str):
|
|
# TODO: replace with HEAD from neofs_verbs.py
|
|
object_cmd = (
|
|
f'{NEOFS_CLI_EXEC} --rpc-endpoint {NEOFS_ENDPOINT} --wallet {wallet} '
|
|
f'--config {WALLET_PASS} object head --cid {cid} --oid {oid_ts} --json'
|
|
)
|
|
output = _cmd_run(object_cmd)
|
|
full_headers = json.loads(output)
|
|
logger.info(f"Output: {full_headers}")
|
|
|
|
# Header verification
|
|
header_cid = full_headers["header"]["containerID"]["value"]
|
|
if json_transformers.json_reencode(header_cid) == cid:
|
|
logger.info(f"Header CID is expected: {cid} ({header_cid} in the output)")
|
|
else:
|
|
raise Exception("Header CID is not expected.")
|
|
|
|
header_owner = full_headers["header"]["ownerID"]["value"]
|
|
if json_transformers.json_reencode(header_owner) == addr:
|
|
logger.info(f"Header ownerID is expected: {addr} ({header_owner} in the output)")
|
|
else:
|
|
raise Exception("Header ownerID is not expected.")
|
|
|
|
header_type = full_headers["header"]["objectType"]
|
|
if header_type == "TOMBSTONE":
|
|
logger.info(f"Header Type is expected: {header_type}")
|
|
else:
|
|
raise Exception("Header Type is not expected.")
|
|
|
|
header_session_type = full_headers["header"]["sessionToken"]["body"]["object"]["verb"]
|
|
if header_session_type == "DELETE":
|
|
logger.info(f"Header Session Type is expected: {header_session_type}")
|
|
else:
|
|
raise Exception("Header Session Type is not expected.")
|
|
|
|
header_session_cid = full_headers["header"]["sessionToken"]["body"]["object"]["address"]["containerID"]["value"]
|
|
if json_transformers.json_reencode(header_session_cid) == cid:
|
|
logger.info(f"Header ownerID is expected: {addr} ({header_session_cid} in the output)")
|
|
else:
|
|
raise Exception("Header Session CID is not expected.")
|
|
|
|
header_session_oid = full_headers["header"]["sessionToken"]["body"]["object"]["address"]["objectID"]["value"]
|
|
if json_transformers.json_reencode(header_session_oid) == oid:
|
|
logger.info(f"Header Session OID (deleted object) is expected: {oid} ({header_session_oid} in the output)")
|
|
else:
|
|
raise Exception("Header Session OID (deleted object) is not expected.")
|
|
|
|
|
|
@keyword('Get control endpoint with wif')
|
|
def get_control_endpoint_with_wif(endpoint_number: str = ''):
|
|
if endpoint_number == '':
|
|
netmap = []
|
|
for key in NEOFS_NETMAP_DICT.keys():
|
|
netmap.append(key)
|
|
endpoint_num = random.sample(netmap, 1)[0]
|
|
logger.info(f'Random node chosen: {endpoint_num}')
|
|
else:
|
|
endpoint_num = endpoint_number
|
|
|
|
endpoint_values = NEOFS_NETMAP_DICT[f'{endpoint_num}']
|
|
endpoint_control = endpoint_values['control']
|
|
wif = endpoint_values['wif']
|
|
|
|
return endpoint_num, endpoint_control, wif
|
|
|
|
|
|
@keyword('Get Locode')
|
|
def get_locode():
|
|
endpoint_values = random.choice(list(NEOFS_NETMAP_DICT.values()))
|
|
locode = endpoint_values['UN-LOCODE']
|
|
logger.info(f'Random locode chosen: {locode}')
|
|
|
|
return locode
|
|
|
|
|
|
@keyword('Generate Session Token')
|
|
def generate_session_token(owner: str, pub_key: str, cid: str = "", wildcard: bool = False) -> str:
|
|
|
|
file_path = f"{os.getcwd()}/{ASSETS_DIR}/{str(uuid.uuid4())}"
|
|
|
|
owner_64 = base64.b64encode(base58.b58decode(owner)).decode('utf-8')
|
|
cid_64 = base64.b64encode(cid.encode('utf-8')).decode('utf-8')
|
|
pub_key_64 = base64.b64encode(bytes.fromhex(pub_key)).decode('utf-8')
|
|
id_64 = base64.b64encode(uuid.uuid4().bytes).decode('utf-8')
|
|
|
|
session_token = {
|
|
"body":{
|
|
"id":f"{id_64}",
|
|
"ownerID":{
|
|
"value":f"{owner_64}"
|
|
},
|
|
"lifetime":{
|
|
"exp":"100000000",
|
|
"nbf":"0",
|
|
"iat":"0"
|
|
},
|
|
"sessionKey":f"{pub_key_64}",
|
|
"container":{
|
|
"verb":"PUT",
|
|
"wildcard": wildcard,
|
|
**({ "containerID":{"value":f"{cid_64}"} } if not wildcard else {})
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.info(f"Got this Session Token: {session_token}")
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as session_token_file:
|
|
json.dump(session_token, session_token_file, ensure_ascii=False, indent=4)
|
|
|
|
return file_path
|
|
|
|
|
|
@keyword ('Sign Session Token')
|
|
def sign_session_token(session_token: str, wallet: str, to_file: str=''):
|
|
if to_file:
|
|
to_file = f'--to {to_file}'
|
|
cmd = (
|
|
f'{NEOFS_CLI_EXEC} util sign session-token --from {session_token} '
|
|
f'-w {wallet} {to_file} --config {WALLET_PASS}'
|
|
)
|
|
logger.info(f"cmd: {cmd}")
|
|
_cmd_run(cmd)
|