forked from TrueCloudLab/frostfs-testlib
Refactoring utils with adding several new ones
This commit is contained in:
parent
5568cbd0bf
commit
4fd9d69701
10 changed files with 276 additions and 61 deletions
|
@ -1,3 +1,5 @@
|
|||
import converters
|
||||
import errors
|
||||
import wallet
|
||||
import frostfs_testlib.utils.converting_utils
|
||||
import frostfs_testlib.utils.datetime_utils
|
||||
import frostfs_testlib.utils.json_utils
|
||||
import frostfs_testlib.utils.string_utils
|
||||
import frostfs_testlib.utils.wallet_utils
|
||||
|
|
|
@ -3,7 +3,6 @@ import binascii
|
|||
import json
|
||||
|
||||
import base58
|
||||
from neo3.wallet import wallet as neo3_wallet
|
||||
|
||||
|
||||
def str_to_ascii_hex(input: str) -> str:
|
||||
|
@ -61,9 +60,3 @@ def get_wif_from_private_key(priv_key: bytes) -> str:
|
|||
compressed_flag = b"\x01"
|
||||
wif = base58.b58encode_check(wif_version + priv_key + compressed_flag)
|
||||
return wif.decode("utf-8")
|
||||
|
||||
|
||||
def load_wallet(path: str, passwd: str = "") -> neo3_wallet.Wallet:
|
||||
with open(path, "r") as wallet_file:
|
||||
wlt_data = wallet_file.read()
|
||||
return neo3_wallet.Wallet.from_json(json.loads(wlt_data), password=passwd)
|
27
src/frostfs_testlib/utils/datetime_utils.py
Normal file
27
src/frostfs_testlib/utils/datetime_utils.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
# There is place for date time utils functions
|
||||
|
||||
|
||||
def parse_time(value: str) -> int:
|
||||
"""Converts time interval in text form into time interval as number of seconds.
|
||||
|
||||
Args:
|
||||
value: time interval as text.
|
||||
|
||||
Returns:
|
||||
Number of seconds in the parsed time interval.
|
||||
"""
|
||||
value = value.lower()
|
||||
|
||||
for suffix in ["s", "sec"]:
|
||||
if value.endswith(suffix):
|
||||
return int(value[: -len(suffix)])
|
||||
|
||||
for suffix in ["m", "min"]:
|
||||
if value.endswith(suffix):
|
||||
return int(value[: -len(suffix)]) * 60
|
||||
|
||||
for suffix in ["h", "hr", "hour"]:
|
||||
if value.endswith(suffix):
|
||||
return int(value[: -len(suffix)]) * 60 * 60
|
||||
|
||||
raise ValueError(f"Unknown units in time value '{value}'")
|
|
@ -1,11 +0,0 @@
|
|||
import re
|
||||
|
||||
|
||||
def error_matches_status(error: Exception, status_pattern: str) -> bool:
|
||||
"""
|
||||
Determines whether exception matches specified status pattern.
|
||||
|
||||
We use re.search() to be consistent with pytest.raises.
|
||||
"""
|
||||
match = re.search(status_pattern, str(error))
|
||||
return match is not None
|
136
src/frostfs_testlib/utils/json_utils.py
Normal file
136
src/frostfs_testlib/utils/json_utils.py
Normal file
|
@ -0,0 +1,136 @@
|
|||
"""
|
||||
When doing requests to FrostFS, we get JSON output as an automatically decoded
|
||||
structure from protobuf. Some fields are decoded with boilerplates and binary
|
||||
values are Base64-encoded.
|
||||
|
||||
This module contains functions which rearrange the structure and reencode binary
|
||||
data from Base64 to Base58.
|
||||
"""
|
||||
|
||||
import base64
|
||||
|
||||
import base58
|
||||
|
||||
|
||||
def decode_simple_header(data: dict) -> dict:
|
||||
"""
|
||||
This function reencodes Simple Object header and its attributes.
|
||||
"""
|
||||
try:
|
||||
data = decode_common_fields(data)
|
||||
|
||||
# Normalize object attributes
|
||||
data["header"]["attributes"] = {
|
||||
attr["key"]: attr["value"] for attr in data["header"]["attributes"]
|
||||
}
|
||||
except Exception as exc:
|
||||
raise ValueError(f"failed to decode JSON output: {exc}") from exc
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def decode_split_header(data: dict) -> dict:
|
||||
"""
|
||||
This function rearranges Complex Object header.
|
||||
The header holds SplitID, a random unique
|
||||
number, which is common among all splitted objects, and IDs of the Linking
|
||||
Object and the last splitted Object.
|
||||
"""
|
||||
try:
|
||||
data["splitId"] = json_reencode(data["splitId"])
|
||||
data["lastPart"] = json_reencode(data["lastPart"]["value"]) if data["lastPart"] else None
|
||||
data["link"] = json_reencode(data["link"]["value"]) if data["link"] else None
|
||||
except Exception as exc:
|
||||
raise ValueError(f"failed to decode JSON output: {exc}") from exc
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def decode_linking_object(data: dict) -> dict:
|
||||
"""
|
||||
This function reencodes Linking Object header.
|
||||
It contains IDs of child Objects and Split Chain data.
|
||||
"""
|
||||
try:
|
||||
data = decode_simple_header(data)
|
||||
split = data["header"]["split"]
|
||||
split["children"] = [json_reencode(item["value"]) for item in split["children"]]
|
||||
split["splitID"] = json_reencode(split["splitID"])
|
||||
split["previous"] = json_reencode(split["previous"]["value"]) if split["previous"] else None
|
||||
split["parent"] = json_reencode(split["parent"]["value"]) if split["parent"] else None
|
||||
except Exception as exc:
|
||||
raise ValueError(f"failed to decode JSON output: {exc}") from exc
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def decode_storage_group(data: dict) -> dict:
|
||||
"""
|
||||
This function reencodes Storage Group header.
|
||||
"""
|
||||
try:
|
||||
data = decode_common_fields(data)
|
||||
except Exception as exc:
|
||||
raise ValueError(f"failed to decode JSON output: {exc}") from exc
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def decode_tombstone(data: dict) -> dict:
|
||||
"""
|
||||
This function re-encodes Tombstone header.
|
||||
"""
|
||||
try:
|
||||
data = decode_simple_header(data)
|
||||
data["header"]["sessionToken"] = decode_session_token(data["header"]["sessionToken"])
|
||||
except Exception as exc:
|
||||
raise ValueError(f"failed to decode JSON output: {exc}") from exc
|
||||
return data
|
||||
|
||||
|
||||
def decode_session_token(data: dict) -> dict:
|
||||
"""
|
||||
This function re-encodes a fragment of header which contains
|
||||
information about session token.
|
||||
"""
|
||||
target = data["body"]["object"]["target"]
|
||||
target["container"] = json_reencode(target["container"]["value"])
|
||||
target["objects"] = [json_reencode(obj["value"]) for obj in target["objects"]]
|
||||
return data
|
||||
|
||||
|
||||
def json_reencode(data: str) -> str:
|
||||
"""
|
||||
According to JSON protocol, binary data (Object/Container/Storage Group IDs, etc)
|
||||
is converted to string via Base58 encoder. But we usually operate with Base64-encoded format.
|
||||
This function reencodes given Base58 string into the Base64 one.
|
||||
"""
|
||||
return base58.b58encode(base64.b64decode(data)).decode("utf-8")
|
||||
|
||||
|
||||
def encode_for_json(data: str) -> str:
|
||||
"""
|
||||
This function encodes binary data for sending them as protobuf
|
||||
structures.
|
||||
"""
|
||||
return base64.b64encode(base58.b58decode(data)).decode("utf-8")
|
||||
|
||||
|
||||
def decode_common_fields(data: dict) -> dict:
|
||||
"""
|
||||
Despite of type (simple/complex Object, Storage Group, etc) every Object
|
||||
header contains several common fields.
|
||||
This function rearranges these fields.
|
||||
"""
|
||||
data["objectID"] = json_reencode(data["objectID"]["value"])
|
||||
|
||||
header = data["header"]
|
||||
header["containerID"] = json_reencode(header["containerID"]["value"])
|
||||
header["ownerID"] = json_reencode(header["ownerID"]["value"])
|
||||
header["payloadHash"] = json_reencode(header["payloadHash"]["sum"])
|
||||
header["version"] = f"{header['version']['major']}{header['version']['minor']}"
|
||||
# Homomorphic hash is optional and its calculation might be disabled in trusted network
|
||||
if header.get("homomorphicHash"):
|
||||
header["homomorphicHash"] = json_reencode(header["homomorphicHash"]["sum"])
|
||||
|
||||
return data
|
31
src/frostfs_testlib/utils/string_utils.py
Normal file
31
src/frostfs_testlib/utils/string_utils.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
import random
|
||||
import re
|
||||
import string
|
||||
|
||||
ONLY_ASCII_LETTERS = string.ascii_letters
|
||||
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
|
||||
|
||||
|
||||
def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):
|
||||
"""
|
||||
Generate random string from source letters list
|
||||
|
||||
Args:
|
||||
length: length for generated string
|
||||
source: source string with letters for generate random string
|
||||
Returns:
|
||||
(str): random string with len == length
|
||||
"""
|
||||
|
||||
return "".join(random.choice(string.ascii_letters) for i in range(length))
|
||||
|
||||
|
||||
def is_str_match_pattern(error: Exception, status_pattern: str) -> bool:
|
||||
"""
|
||||
Determines whether exception matches specified status pattern.
|
||||
|
||||
We use re.search() to be consistent with pytest.raises.
|
||||
"""
|
||||
match = re.search(status_pattern, str(error))
|
||||
|
||||
return match is not None
|
|
@ -1,38 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from neo3.wallet import wallet as neo3_wallet
|
||||
from neo3.wallet import account as neo3_account
|
||||
|
||||
logger = logging.getLogger("frostfs.testlib.utils")
|
||||
|
||||
|
||||
def init_wallet(wallet_path: str, wallet_password: str):
|
||||
"""
|
||||
Create new wallet and new account.
|
||||
Args:
|
||||
wallet_path: The path to the wallet to save wallet.
|
||||
wallet_password: The password for new wallet.
|
||||
"""
|
||||
wallet = neo3_wallet.Wallet()
|
||||
account = neo3_account.Account.create_new(wallet_password)
|
||||
wallet.account_add(account)
|
||||
with open(wallet_path, "w") as out:
|
||||
json.dump(wallet.to_json(), out)
|
||||
logger.info(f"Init new wallet: {wallet_path}, address: {account.address}")
|
||||
|
||||
|
||||
def get_last_address_from_wallet(wallet_path: str, wallet_password: str):
|
||||
"""
|
||||
Extracting the last address from the given wallet.
|
||||
Args:
|
||||
wallet_path: The path to the wallet to extract address from.
|
||||
wallet_password: The password for the given wallet.
|
||||
Returns:
|
||||
The address for the wallet.
|
||||
"""
|
||||
with open(wallet_path) as wallet_file:
|
||||
wallet = neo3_wallet.Wallet.from_json(json.load(wallet_file), password=wallet_password)
|
||||
address = wallet.accounts[-1].address
|
||||
logger.info(f"got address: {address}")
|
||||
return address
|
75
src/frostfs_testlib/utils/wallet_utils.py
Normal file
75
src/frostfs_testlib/utils/wallet_utils.py
Normal file
|
@ -0,0 +1,75 @@
|
|||
import base64
|
||||
import json
|
||||
import logging
|
||||
|
||||
import base58
|
||||
from neo3.wallet import account as neo3_account
|
||||
from neo3.wallet import wallet as neo3_wallet
|
||||
|
||||
logger = logging.getLogger("frostfs.testlib.utils")
|
||||
|
||||
|
||||
def init_wallet(wallet_path: str, wallet_password: str):
|
||||
"""
|
||||
Create new wallet and new account.
|
||||
Args:
|
||||
wallet_path: The path to the wallet to save wallet.
|
||||
wallet_password: The password for new wallet.
|
||||
"""
|
||||
wallet = neo3_wallet.Wallet()
|
||||
account = neo3_account.Account.create_new(wallet_password)
|
||||
wallet.account_add(account)
|
||||
with open(wallet_path, "w") as out:
|
||||
json.dump(wallet.to_json(), out)
|
||||
logger.info(f"Init new wallet: {wallet_path}, address: {account.address}")
|
||||
|
||||
|
||||
def get_last_address_from_wallet(wallet_path: str, wallet_password: str):
|
||||
"""
|
||||
Extracting the last address from the given wallet.
|
||||
Args:
|
||||
wallet_path: The path to the wallet to extract address from.
|
||||
wallet_password: The password for the given wallet.
|
||||
Returns:
|
||||
The address for the wallet.
|
||||
"""
|
||||
with open(wallet_path) as wallet_file:
|
||||
wallet = neo3_wallet.Wallet.from_json(json.load(wallet_file), password=wallet_password)
|
||||
address = wallet.accounts[-1].address
|
||||
logger.info(f"got address: {address}")
|
||||
return address
|
||||
|
||||
|
||||
def get_wallet_public_key(wallet_path: str, wallet_password: str, format: str = "hex") -> str:
|
||||
def __fix_wallet_schema(wallet: dict) -> None:
|
||||
# Temporary function to fix wallets that do not conform to the schema
|
||||
# TODO: get rid of it once issue is solved
|
||||
if "name" not in wallet:
|
||||
wallet["name"] = None
|
||||
for account in wallet["accounts"]:
|
||||
if "extra" not in account:
|
||||
account["extra"] = None
|
||||
|
||||
# Get public key from wallet file
|
||||
with open(wallet_path, "r") as file:
|
||||
wallet_content = json.load(file)
|
||||
__fix_wallet_schema(wallet_content)
|
||||
wallet_from_json = neo3_wallet.Wallet.from_json(wallet_content, password=wallet_password)
|
||||
public_key_hex = str(wallet_from_json.accounts[0].public_key)
|
||||
|
||||
# Convert public key to specified format
|
||||
if format == "hex":
|
||||
return public_key_hex
|
||||
if format == "base58":
|
||||
public_key_base58 = base58.b58encode(bytes.fromhex(public_key_hex))
|
||||
return public_key_base58.decode("utf-8")
|
||||
if format == "base64":
|
||||
public_key_base64 = base64.b64encode(bytes.fromhex(public_key_hex))
|
||||
return public_key_base64.decode("utf-8")
|
||||
raise ValueError(f"Invalid public key format: {format}")
|
||||
|
||||
|
||||
def load_wallet(path: str, passwd: str = "") -> neo3_wallet.Wallet:
|
||||
with open(path, "r") as wallet_file:
|
||||
wlt_data = wallet_file.read()
|
||||
return neo3_wallet.Wallet.from_json(json.loads(wlt_data), password=passwd)
|
|
@ -1,6 +1,6 @@
|
|||
from unittest import TestCase
|
||||
|
||||
from frostfs_testlib.utils import converters
|
||||
from frostfs_testlib.utils import converting_utils
|
||||
|
||||
|
||||
class TestConverters(TestCase):
|
||||
|
|
|
@ -5,7 +5,7 @@ from uuid import uuid4
|
|||
|
||||
from neo3.wallet.wallet import Wallet
|
||||
|
||||
from frostfs_testlib.utils.wallet import init_wallet, get_last_address_from_wallet
|
||||
from frostfs_testlib.utils.wallet_utils import get_last_address_from_wallet, init_wallet
|
||||
|
||||
|
||||
class TestWallet(TestCase):
|
||||
|
|
Loading…
Reference in a new issue