Add unit tests for hosting

Signed-off-by: Vladimir Avdeev <v.avdeev@yadro.com>
This commit is contained in:
Vladimir Avdeev 2022-11-17 10:14:29 +03:00 committed by Vladimir Avdeev
parent 3816a3c7f1
commit 7b8b286a14
3 changed files with 155 additions and 32 deletions

View file

@ -1,15 +1,10 @@
import json
import os
from unittest import TestCase from unittest import TestCase
from uuid import uuid4
from neo3 import wallet as neo3_wallet from neofs_testlib.utils import converters
from neofs_testlib.utils import converters, wallet
class TestUtils(TestCase): class TestConverters(TestCase):
def test_converters_str_to_ascii_hex(self): def test_str_to_ascii_hex(self):
source_str = "" source_str = ""
result_str = "" result_str = ""
self.assertEqual(converters.str_to_ascii_hex(source_str), result_str) self.assertEqual(converters.str_to_ascii_hex(source_str), result_str)
@ -18,6 +13,7 @@ class TestUtils(TestCase):
result_str = "22746573745f646174612220663072205e636f6e766572742a" result_str = "22746573745f646174612220663072205e636f6e766572742a"
self.assertEqual(converters.str_to_ascii_hex(source_str), result_str) self.assertEqual(converters.str_to_ascii_hex(source_str), result_str)
def test_ascii_hex_to_str(self):
source_str = "" source_str = ""
result_bytes = b"" result_bytes = b""
self.assertEqual(converters.ascii_hex_to_str(source_str), result_bytes) self.assertEqual(converters.ascii_hex_to_str(source_str), result_bytes)
@ -48,27 +44,3 @@ class TestUtils(TestCase):
source_str = "d01a381aae45f1ed181db9d554cc5ccc69c69f4e" source_str = "d01a381aae45f1ed181db9d554cc5ccc69c69f4e"
result_str = "NT5hJ5peVmvYdZCsFKUM5MTcEGw5TB4k89" result_str = "NT5hJ5peVmvYdZCsFKUM5MTcEGw5TB4k89"
self.assertEqual(converters.contract_hash_to_address(source_str), result_str) self.assertEqual(converters.contract_hash_to_address(source_str), result_str)
def test_init_wallet(self):
wallet_file_path = f"{str(uuid4())}.json"
for password in ("", "password"):
wrong_password = "wrong_password"
wallet.init_wallet(wallet_file_path, password)
self.assertTrue(os.path.exists(wallet_file_path))
with open(wallet_file_path, "r") as wallet_file:
neo3_wallet.Wallet.from_json(json.load(wallet_file), password=password)
with self.assertRaises(Exception):
neo3_wallet.Wallet.from_json(json.load(wallet_file), password=wrong_password)
os.unlink(wallet_file_path)
def test_get_last_address_from_wallet(self):
wallet_file_path = f"{str(uuid4())}.json"
for password in ("", "password"):
wallet.init_wallet(wallet_file_path, password)
with open(wallet_file_path, "r") as wallet_file:
wlt = neo3_wallet.Wallet.from_json(json.load(wallet_file), password=password)
last_address = wlt.accounts[-1].address
self.assertEqual(
wallet.get_last_address_from_wallet(wallet_file_path, password), last_address
)
os.unlink(wallet_file_path)

113
tests/test_hosting.py Normal file
View file

@ -0,0 +1,113 @@
from unittest import TestCase
from neofs_testlib.hosting import CLIConfig, Hosting, ServiceConfig
class TestHosting(TestCase):
SERVICE_NAME_PREFIX = "service"
HOST1_ADDRESS = "10.10.10.10"
HOST1_PLUGIN = "docker"
HOST1_ATTRIBUTES = {"param1": "value1"}
SERVICE1_ATTRIBUTES = {"rpc_endpoint": "service1_endpoint"}
HOST1_CLIS = [{"name": "cli1", "exec_path": "cli1.exe", "attributes": {"param1": "value1"}}]
SERVICE1 = {"name": f"{SERVICE_NAME_PREFIX}1", "attributes": SERVICE1_ATTRIBUTES}
HOST1_SERVICES = [SERVICE1]
HOST1 = {
"address": HOST1_ADDRESS,
"plugin_name": HOST1_PLUGIN,
"attributes": HOST1_ATTRIBUTES,
"clis": HOST1_CLIS,
"services": HOST1_SERVICES,
}
HOST2_ADDRESS = "localhost"
HOST2_PLUGIN = "docker"
HOST2_ATTRIBUTES = {"param2": "value2"}
SERVICE2_ATTRIBUTES = {"rpc_endpoint": "service2_endpoint"}
SERVICE3_ATTRIBUTES = {"rpc_endpoint": "service3_endpoint"}
HOST2_CLIS = [{"name": "cli2", "exec_path": "/bin/cli", "attributes": {}}]
SERVICE2 = {"name": f"{SERVICE_NAME_PREFIX}", "attributes": SERVICE2_ATTRIBUTES}
SERVICE3 = {"name": f"text_before_{SERVICE_NAME_PREFIX}3", "attributes": SERVICE3_ATTRIBUTES}
HOST2_SERVICES = [SERVICE2, SERVICE3]
HOST2 = {
"address": HOST2_ADDRESS,
"plugin_name": HOST2_PLUGIN,
"attributes": HOST2_ATTRIBUTES,
"clis": HOST2_CLIS,
"services": HOST2_SERVICES,
}
HOSTING_CONFIG = {"hosts": [HOST1, HOST2]}
def test_hosting_configure(self):
hosting = Hosting()
hosting.configure(self.HOSTING_CONFIG)
self.assertEqual(len(hosting.hosts), 2)
def test_get_host_by_address(self):
hosting = Hosting()
hosting.configure(self.HOSTING_CONFIG)
host1 = hosting.get_host_by_address(self.HOST1_ADDRESS)
self.assertEqual(host1.config.address, self.HOST1_ADDRESS)
self.assertEqual(host1.config.plugin_name, self.HOST1_PLUGIN)
self.assertDictEqual(host1.config.attributes, self.HOST1_ATTRIBUTES)
self.assertListEqual(host1.config.clis, [CLIConfig(**cli) for cli in self.HOST1_CLIS])
self.assertListEqual(
host1.config.services, [ServiceConfig(**service) for service in self.HOST1_SERVICES]
)
host2 = hosting.get_host_by_address(self.HOST2_ADDRESS)
self.assertEqual(host2.config.address, self.HOST2_ADDRESS)
self.assertEqual(host2.config.plugin_name, self.HOST2_PLUGIN)
self.assertDictEqual(host2.config.attributes, self.HOST2_ATTRIBUTES)
self.assertListEqual(host2.config.clis, [CLIConfig(**cli) for cli in self.HOST2_CLIS])
self.assertListEqual(
host2.config.services, [ServiceConfig(**service) for service in self.HOST2_SERVICES]
)
def test_get_host_by_service(self):
hosting = Hosting()
hosting.configure(self.HOSTING_CONFIG)
host_with_service1 = hosting.get_host_by_service(self.SERVICE1["name"])
host_with_service2 = hosting.get_host_by_service(self.SERVICE2["name"])
host_with_service3 = hosting.get_host_by_service(self.SERVICE3["name"])
self.assertEqual(host_with_service1.config.address, self.HOST1_ADDRESS)
self.assertEqual(host_with_service2.config.address, self.HOST2_ADDRESS)
self.assertEqual(host_with_service3.config.address, self.HOST2_ADDRESS)
def test_get_service_config(self):
hosting = Hosting()
hosting.configure(self.HOSTING_CONFIG)
service1_config = hosting.get_service_config(self.SERVICE1["name"])
service2_config = hosting.get_service_config(self.SERVICE2["name"])
service3_config = hosting.get_service_config(self.SERVICE3["name"])
self.assertEqual(service1_config.name, self.SERVICE1["name"])
self.assertDictEqual(service1_config.attributes, self.SERVICE1_ATTRIBUTES)
self.assertEqual(service2_config.name, self.SERVICE2["name"])
self.assertDictEqual(service2_config.attributes, self.SERVICE2_ATTRIBUTES)
self.assertEqual(service3_config.name, self.SERVICE3["name"])
self.assertDictEqual(service3_config.attributes, self.SERVICE3_ATTRIBUTES)
def test_find_service_configs(self):
hosting = Hosting()
hosting.configure(self.HOSTING_CONFIG)
all_services = hosting.find_service_configs(r".+")
self.assertEqual(len(all_services), 3)
services = hosting.find_service_configs(rf"^{self.SERVICE_NAME_PREFIX}")
self.assertEqual(len(services), 2)
for service in services:
self.assertEqual(
service.name[: len(self.SERVICE_NAME_PREFIX)], self.SERVICE_NAME_PREFIX
)
service1 = hosting.find_service_configs(self.SERVICE1["name"])
self.assertEqual(len(service1), 1)
self.assertDictEqual(service1[0].attributes, self.SERVICE1_ATTRIBUTES)

38
tests/test_wallet.py Normal file
View file

@ -0,0 +1,38 @@
import json
import os
from unittest import TestCase
from uuid import uuid4
from neo3.wallet import Wallet
from neofs_testlib.utils.wallet import init_wallet, get_last_address_from_wallet
class TestWallet(TestCase):
DEFAULT_PASSWORD = "password"
EMPTY_PASSWORD = ""
def test_init_wallet(self):
wallet_file_path = f"{str(uuid4())}.json"
for password in (self.EMPTY_PASSWORD, self.DEFAULT_PASSWORD):
wrong_password = "wrong_password"
init_wallet(wallet_file_path, password)
self.assertTrue(os.path.exists(wallet_file_path))
with open(wallet_file_path, "r") as wallet_file:
Wallet.from_json(json.load(wallet_file), password=password)
with self.assertRaises(ValueError):
with open(wallet_file_path, "r") as wallet_file:
Wallet.from_json(json.load(wallet_file), password=wrong_password)
os.unlink(wallet_file_path)
def test_get_last_address_from_wallet(self):
wallet_file_path = f"{str(uuid4())}.json"
init_wallet(wallet_file_path, self.DEFAULT_PASSWORD)
with open(wallet_file_path, "r") as wallet_file:
wallet = Wallet.from_json(json.load(wallet_file), password=self.DEFAULT_PASSWORD)
last_address = wallet.accounts[-1].address
self.assertEqual(
get_last_address_from_wallet(wallet_file_path, self.DEFAULT_PASSWORD),
last_address,
)
os.unlink(wallet_file_path)