From 7b8b286a14a500bc6bb34398342a71a7467e7f6f Mon Sep 17 00:00:00 2001 From: Vladimir Avdeev Date: Thu, 17 Nov 2022 10:14:29 +0300 Subject: [PATCH] Add unit tests for hosting Signed-off-by: Vladimir Avdeev --- tests/{test_ulils.py => test_converters.py} | 36 +------ tests/test_hosting.py | 113 ++++++++++++++++++++ tests/test_wallet.py | 38 +++++++ 3 files changed, 155 insertions(+), 32 deletions(-) rename tests/{test_ulils.py => test_converters.py} (55%) create mode 100644 tests/test_hosting.py create mode 100644 tests/test_wallet.py diff --git a/tests/test_ulils.py b/tests/test_converters.py similarity index 55% rename from tests/test_ulils.py rename to tests/test_converters.py index 50cf5fc6..f453c426 100644 --- a/tests/test_ulils.py +++ b/tests/test_converters.py @@ -1,15 +1,10 @@ -import json -import os from unittest import TestCase -from uuid import uuid4 -from neo3 import wallet as neo3_wallet - -from neofs_testlib.utils import converters, wallet +from neofs_testlib.utils import converters -class TestUtils(TestCase): - def test_converters_str_to_ascii_hex(self): +class TestConverters(TestCase): + def test_str_to_ascii_hex(self): source_str = "" result_str = "" self.assertEqual(converters.str_to_ascii_hex(source_str), result_str) @@ -18,6 +13,7 @@ class TestUtils(TestCase): result_str = "22746573745f646174612220663072205e636f6e766572742a" self.assertEqual(converters.str_to_ascii_hex(source_str), result_str) + def test_ascii_hex_to_str(self): source_str = "" result_bytes = b"" self.assertEqual(converters.ascii_hex_to_str(source_str), result_bytes) @@ -48,27 +44,3 @@ class TestUtils(TestCase): source_str = "d01a381aae45f1ed181db9d554cc5ccc69c69f4e" result_str = "NT5hJ5peVmvYdZCsFKUM5MTcEGw5TB4k89" self.assertEqual(converters.contract_hash_to_address(source_str), result_str) - - def test_init_wallet(self): - wallet_file_path = f"{str(uuid4())}.json" - for password in ("", "password"): - wrong_password = "wrong_password" - wallet.init_wallet(wallet_file_path, password) - self.assertTrue(os.path.exists(wallet_file_path)) - with open(wallet_file_path, "r") as wallet_file: - neo3_wallet.Wallet.from_json(json.load(wallet_file), password=password) - with self.assertRaises(Exception): - neo3_wallet.Wallet.from_json(json.load(wallet_file), password=wrong_password) - os.unlink(wallet_file_path) - - def test_get_last_address_from_wallet(self): - wallet_file_path = f"{str(uuid4())}.json" - for password in ("", "password"): - wallet.init_wallet(wallet_file_path, password) - with open(wallet_file_path, "r") as wallet_file: - wlt = neo3_wallet.Wallet.from_json(json.load(wallet_file), password=password) - last_address = wlt.accounts[-1].address - self.assertEqual( - wallet.get_last_address_from_wallet(wallet_file_path, password), last_address - ) - os.unlink(wallet_file_path) diff --git a/tests/test_hosting.py b/tests/test_hosting.py new file mode 100644 index 00000000..17cacb4c --- /dev/null +++ b/tests/test_hosting.py @@ -0,0 +1,113 @@ +from unittest import TestCase + +from neofs_testlib.hosting import CLIConfig, Hosting, ServiceConfig + + +class TestHosting(TestCase): + SERVICE_NAME_PREFIX = "service" + HOST1_ADDRESS = "10.10.10.10" + HOST1_PLUGIN = "docker" + HOST1_ATTRIBUTES = {"param1": "value1"} + SERVICE1_ATTRIBUTES = {"rpc_endpoint": "service1_endpoint"} + HOST1_CLIS = [{"name": "cli1", "exec_path": "cli1.exe", "attributes": {"param1": "value1"}}] + SERVICE1 = {"name": f"{SERVICE_NAME_PREFIX}1", "attributes": SERVICE1_ATTRIBUTES} + HOST1_SERVICES = [SERVICE1] + HOST1 = { + "address": HOST1_ADDRESS, + "plugin_name": HOST1_PLUGIN, + "attributes": HOST1_ATTRIBUTES, + "clis": HOST1_CLIS, + "services": HOST1_SERVICES, + } + + HOST2_ADDRESS = "localhost" + HOST2_PLUGIN = "docker" + HOST2_ATTRIBUTES = {"param2": "value2"} + SERVICE2_ATTRIBUTES = {"rpc_endpoint": "service2_endpoint"} + SERVICE3_ATTRIBUTES = {"rpc_endpoint": "service3_endpoint"} + HOST2_CLIS = [{"name": "cli2", "exec_path": "/bin/cli", "attributes": {}}] + SERVICE2 = {"name": f"{SERVICE_NAME_PREFIX}", "attributes": SERVICE2_ATTRIBUTES} + SERVICE3 = {"name": f"text_before_{SERVICE_NAME_PREFIX}3", "attributes": SERVICE3_ATTRIBUTES} + HOST2_SERVICES = [SERVICE2, SERVICE3] + HOST2 = { + "address": HOST2_ADDRESS, + "plugin_name": HOST2_PLUGIN, + "attributes": HOST2_ATTRIBUTES, + "clis": HOST2_CLIS, + "services": HOST2_SERVICES, + } + HOSTING_CONFIG = {"hosts": [HOST1, HOST2]} + + def test_hosting_configure(self): + hosting = Hosting() + hosting.configure(self.HOSTING_CONFIG) + self.assertEqual(len(hosting.hosts), 2) + + def test_get_host_by_address(self): + hosting = Hosting() + hosting.configure(self.HOSTING_CONFIG) + + host1 = hosting.get_host_by_address(self.HOST1_ADDRESS) + self.assertEqual(host1.config.address, self.HOST1_ADDRESS) + self.assertEqual(host1.config.plugin_name, self.HOST1_PLUGIN) + self.assertDictEqual(host1.config.attributes, self.HOST1_ATTRIBUTES) + self.assertListEqual(host1.config.clis, [CLIConfig(**cli) for cli in self.HOST1_CLIS]) + self.assertListEqual( + host1.config.services, [ServiceConfig(**service) for service in self.HOST1_SERVICES] + ) + + host2 = hosting.get_host_by_address(self.HOST2_ADDRESS) + self.assertEqual(host2.config.address, self.HOST2_ADDRESS) + self.assertEqual(host2.config.plugin_name, self.HOST2_PLUGIN) + self.assertDictEqual(host2.config.attributes, self.HOST2_ATTRIBUTES) + self.assertListEqual(host2.config.clis, [CLIConfig(**cli) for cli in self.HOST2_CLIS]) + self.assertListEqual( + host2.config.services, [ServiceConfig(**service) for service in self.HOST2_SERVICES] + ) + + def test_get_host_by_service(self): + hosting = Hosting() + hosting.configure(self.HOSTING_CONFIG) + + host_with_service1 = hosting.get_host_by_service(self.SERVICE1["name"]) + host_with_service2 = hosting.get_host_by_service(self.SERVICE2["name"]) + host_with_service3 = hosting.get_host_by_service(self.SERVICE3["name"]) + + self.assertEqual(host_with_service1.config.address, self.HOST1_ADDRESS) + self.assertEqual(host_with_service2.config.address, self.HOST2_ADDRESS) + self.assertEqual(host_with_service3.config.address, self.HOST2_ADDRESS) + + def test_get_service_config(self): + hosting = Hosting() + hosting.configure(self.HOSTING_CONFIG) + + service1_config = hosting.get_service_config(self.SERVICE1["name"]) + service2_config = hosting.get_service_config(self.SERVICE2["name"]) + service3_config = hosting.get_service_config(self.SERVICE3["name"]) + + self.assertEqual(service1_config.name, self.SERVICE1["name"]) + self.assertDictEqual(service1_config.attributes, self.SERVICE1_ATTRIBUTES) + + self.assertEqual(service2_config.name, self.SERVICE2["name"]) + self.assertDictEqual(service2_config.attributes, self.SERVICE2_ATTRIBUTES) + + self.assertEqual(service3_config.name, self.SERVICE3["name"]) + self.assertDictEqual(service3_config.attributes, self.SERVICE3_ATTRIBUTES) + + def test_find_service_configs(self): + hosting = Hosting() + hosting.configure(self.HOSTING_CONFIG) + + all_services = hosting.find_service_configs(r".+") + self.assertEqual(len(all_services), 3) + + services = hosting.find_service_configs(rf"^{self.SERVICE_NAME_PREFIX}") + self.assertEqual(len(services), 2) + for service in services: + self.assertEqual( + service.name[: len(self.SERVICE_NAME_PREFIX)], self.SERVICE_NAME_PREFIX + ) + + service1 = hosting.find_service_configs(self.SERVICE1["name"]) + self.assertEqual(len(service1), 1) + self.assertDictEqual(service1[0].attributes, self.SERVICE1_ATTRIBUTES) diff --git a/tests/test_wallet.py b/tests/test_wallet.py new file mode 100644 index 00000000..b9352e98 --- /dev/null +++ b/tests/test_wallet.py @@ -0,0 +1,38 @@ +import json +import os +from unittest import TestCase +from uuid import uuid4 + +from neo3.wallet import Wallet + +from neofs_testlib.utils.wallet import init_wallet, get_last_address_from_wallet + + +class TestWallet(TestCase): + DEFAULT_PASSWORD = "password" + EMPTY_PASSWORD = "" + + def test_init_wallet(self): + wallet_file_path = f"{str(uuid4())}.json" + for password in (self.EMPTY_PASSWORD, self.DEFAULT_PASSWORD): + wrong_password = "wrong_password" + init_wallet(wallet_file_path, password) + self.assertTrue(os.path.exists(wallet_file_path)) + with open(wallet_file_path, "r") as wallet_file: + Wallet.from_json(json.load(wallet_file), password=password) + with self.assertRaises(ValueError): + with open(wallet_file_path, "r") as wallet_file: + Wallet.from_json(json.load(wallet_file), password=wrong_password) + os.unlink(wallet_file_path) + + def test_get_last_address_from_wallet(self): + wallet_file_path = f"{str(uuid4())}.json" + init_wallet(wallet_file_path, self.DEFAULT_PASSWORD) + with open(wallet_file_path, "r") as wallet_file: + wallet = Wallet.from_json(json.load(wallet_file), password=self.DEFAULT_PASSWORD) + last_address = wallet.accounts[-1].address + self.assertEqual( + get_last_address_from_wallet(wallet_file_path, self.DEFAULT_PASSWORD), + last_address, + ) + os.unlink(wallet_file_path)