failover kill services #3

Closed
vkarak1 wants to merge 1 commit from vkarak1_failover_kill_services into master
2 changed files with 335 additions and 1 deletion

View file

@@ -272,7 +272,7 @@ class Cluster:
         return self._get_nodes(_ServicesNames.S3_GATE)

     @property
-    def http_gates(self) -> list[S3Gate]:
+    def http_gates(self) -> list[HTTPGate]:
         """
         Returns list of HTTP gates
         """

View file

@@ -0,0 +1,334 @@
import logging
import random

import allure
import pytest
from cluster import HTTPGate, MorphChain, StorageNode
from epoch import ensure_fresh_epoch, wait_for_epochs_align
from file_helper import generate_file
from neofs_testlib.hosting import Host
from neofs_testlib.shell import Shell
from python_keywords.container import create_container
from python_keywords.http_gate import (
    get_object_and_verify_hashes,
    get_via_http_curl,
    upload_via_http_gate_curl,
)
from python_keywords.neofs_verbs import delete_object, put_object_to_random_node, search_object
from python_keywords.node_management import check_node_in_map, storage_node_healthcheck
from tenacity import retry, stop_after_attempt, wait_fixed
from test_control import wait_for_success
from wallet import WalletFactory, WalletFile
from wellknown_acl import PUBLIC_ACL

from steps.cluster_test_base import ClusterTestBase

logger = logging.getLogger("NeoLogger")

NEOFS_IR = "neofs-ir"
NEOFS_HTTP = "neofs-http"
NEOFS_NODE = "neofs-storage"
NEO_GO = "neo-go"


@pytest.mark.failover
@pytest.mark.failover_network
abereziny commented 2023-01-18 16:38:31 +00:00 (Migrated from github.com)

failover_network?
class TestFailoverServiceKill(ClusterTestBase):
    PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"

    @pytest.fixture(scope="class")
    def user_wallet(self, wallet_factory: WalletFactory) -> WalletFile:
        return wallet_factory.create_wallet()

    @pytest.fixture(scope="class")
    @allure.title("Create container")
    def user_container(self, user_wallet: WalletFile):
        return create_container(
            wallet=user_wallet.path,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=self.PLACEMENT_RULE,
            basic_acl=PUBLIC_ACL,
        )

    @allure.title("Check and return status of given service")
    def service_status(self, service: str, shell: Shell) -> str:
        return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip()

    @allure.step("Run health check on node")
    def health_check(self, node: StorageNode):
abereziny commented 2023-01-18 16:42:23 +00:00 (Migrated from github.com)
We have similar functionality here: https://github.com/TrueCloudLab/frostfs-testcases/blob/6e4558d7920415717246409568add971b146ce1b/pytest_tests/testsuites/network/test_node_management.py#L435,L465
        health_check = storage_node_healthcheck(node)
        assert health_check.health_status == "READY" and health_check.network_status == "ONLINE"
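
A rough sketch of the retryable health check the review comment above is pointing at, built only from names already imported in this file (wait_for_success, storage_node_healthcheck); the linked helper itself is not reproduced here, and the method name and timeout values are illustrative assumptions.

    @allure.step("Wait for node to become READY/ONLINE")
    @wait_for_success(60, 5)  # same positional style as wait_for_expected_status below; values are an example
    def wait_for_node_ready(self, node: StorageNode) -> None:
        # Retried by wait_for_success until the node reports a healthy state or the timeout expires.
        health_check = storage_node_healthcheck(node)
        assert health_check.health_status == "READY" and health_check.network_status == "ONLINE"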
@allure.step("Wait for expected status of passed service")
@wait_for_success(60, 5)
def wait_for_expected_status(self, service: str, status: str, shell: Shell):
real_status = self.service_status(service=service, shell=shell)
assert (
status == real_status
), f"Service {service}: expected status= {status}, real status {real_status}"
@allure.step("Run neo-go dump-keys")
def neo_go_dump_keys(self, shell: Shell, node: StorageNode) -> dict:
abereziny commented 2023-01-18 16:44:19 +00:00 (Migrated from github.com)
Already in testlib https://github.com/TrueCloudLab/frostfs-testlib/blob/master/src/neofs_testlib/cli/neogo/wallet.py#L149
        host = node.host
        service_config = host.get_service_config(node.name)
        wallet_path = service_config.attributes["wallet_path"]
        output = shell.exec(f"neo-go wallet dump-keys -w {wallet_path}").stdout
        try:
            # the first line of the command output contains the wallet address
            first_line = output.split("\n")[0]
        except Exception:
            logger.error(f"Got empty output (neo-go dump keys): {output}")
        address_id = first_line.split()[0]
        # the second line of the command output contains the wallet key
        wallet_key = output.split("\n")[1]
        return {address_id: wallet_key}
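
The review comment above points to a neo-go wrapper that already exists in the testlib. A minimal sketch of what delegating to it instead of a raw shell call could look like; the NeoGo import path, constructor arguments and wallet.dump_keys signature are assumptions based on the linked wallet.py, not verified against the testlib version pinned by this PR.

    def neo_go_dump_keys_via_testlib(self, shell: Shell, node: StorageNode) -> str:
        # Hypothetical helper name; uses the testlib CLI wrapper instead of shell.exec().
        from neofs_testlib.cli import NeoGo  # assumed import path

        wallet_path = node.host.get_service_config(node.name).attributes["wallet_path"]
        neo_go = NeoGo(shell, neo_go_exec_path="neo-go")  # assumed constructor signature
        return neo_go.wallet.dump_keys(wallet=wallet_path).stdout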
@allure.step("Run neo-go query height")
def neo_go_query_height(self, shell: Shell, node: StorageNode) -> dict:
abereziny commented 2023-01-18 16:46:05 +00:00 (Migrated from github.com)
In a testlib https://github.com/TrueCloudLab/frostfs-testlib/blob/980158ad72180f93d6e7acfc05946faa9feba21c/src/neofs_testlib/cli/neogo/query.py#L44
        morph_chain = self.morph_chain(node)
        output = shell.exec(f"neo-go query height -r {morph_chain.get_endpoint()}").stdout
        try:
            # the first line of the command output contains the latest block height
            first_line = output.split("\n")[0]
        except Exception:
            logger.error(f"Got empty output (neo-go query height): {output}")
        latest_block = first_line.split(":")
        # the second line of the command output contains the validated state height
        second_line = output.split("\n")[1]
        validated_state = second_line.split(":")
        return {
            latest_block[0].replace(":", ""): int(latest_block[1]),
            validated_state[0].replace(":", ""): int(validated_state[1]),
        }
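
Same idea for the query height call: a sketch of delegating to the testlib wrapper referenced in the review above; the query.height signature and its return value are assumptions based on the linked query.py.

    def neo_go_query_height_via_testlib(self, shell: Shell, node: StorageNode) -> str:
        # Hypothetical helper name; same assumed NeoGo wrapper as in the dump-keys sketch above.
        from neofs_testlib.cli import NeoGo  # assumed import path

        neo_go = NeoGo(shell, neo_go_exec_path="neo-go")  # assumed constructor signature
        return neo_go.query.height(rpc_endpoint=self.morph_chain(node).get_endpoint()).stdout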
@allure.step("Kill process")
def kill_by_pid(self, pid: int, shell: Shell):
shell.exec(f"sudo kill -9 {pid}")
@allure.step("Kill by process name")
def kill_by_service_name(self, service: str, shell: Shell):
self.kill_by_pid(self.service_pid(service, shell), shell)
@allure.step("Return pid by service name")
# retry mechanism cause when the task has been started recently '0' PID could be returned
@retry(wait=wait_fixed(10), stop=stop_after_attempt(5), reraise=True)
abereziny commented 2023-01-18 16:47:33 +00:00 (Migrated from github.com)

wait_for_success will also work
    def service_pid(self, service: str, shell: Shell) -> int:
        output = shell.exec(f"systemctl show --property MainPID {service}").stdout.rstrip()
        splitted = output.split("=")
        PID = int(splitted[1])
        assert PID > 0, f"Service {service} has invalid PID={PID}"
        return PID
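
As the review comment above notes, the tenacity-based retry could also be expressed with the wait_for_success helper that this file already imports and uses; a minimal sketch (the method name and the timeout/interval values are illustrative only).

    @allure.step("Return pid by service name")
    @wait_for_success(50, 10)  # retried until systemd reports a non-zero MainPID for the restarted unit
    def service_pid_via_wait_for_success(self, service: str, shell: Shell) -> int:
        output = shell.exec(f"systemctl show --property MainPID {service}").stdout.rstrip()
        pid = int(output.split("=")[1])
        assert pid > 0, f"Service {service} has invalid PID={pid}"
        return pid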
@allure.step("HTTPGate according to the passed node")
def http_gate(self, node: StorageNode) -> HTTPGate:
abereziny commented 2023-01-18 16:50:00 +00:00 (Migrated from github.com)

The better approach is to have ClusterNode (physical one) details in the dm.
        index = self.cluster.storage_nodes.index(node)
        return self.cluster.http_gates[index]
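
Purely as an illustration of the ClusterNode idea from the review comment above (such a class is hypothetical here, not something this PR or its testlib provides): grouping the services of one physical host in a single object would remove the index-based lookups used in http_gate and morph_chain. The type names reuse the imports already at the top of this file.

from dataclasses import dataclass

@dataclass
class ClusterNode:  # hypothetical illustration, not part of the current cluster module
    host: Host
    storage_node: StorageNode
    http_gate: HTTPGate
    morph_chain: MorphChain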
@allure.step("MorphChain according to the passed node")
def morph_chain(self, node: StorageNode) -> MorphChain:
index = self.cluster.storage_nodes.index(node)
return self.cluster.morph_chain_nodes[index]
@allure.step("WalletFile according to the passed node")
def storage_wallet(self, node: StorageNode) -> WalletFile:
return WalletFile.from_node(node)
def random_node(self, service: str):
with allure.step(f"Find random node to process"):
rand_node_num = random.randint(0, len(self.cluster.storage_nodes) - 1)
node = self.cluster.storage_nodes[rand_node_num]
shell = node.host.get_shell()
with allure.step(f"Get status of {service} from the node {node.get_rpc_endpoint()}"):
self.wait_for_expected_status(service=service, status="active", shell=shell)
return node

    @allure.title(
        f"Kill {NEO_GO}, wait for restart, then check that the service is healthy and can continue processing"
    )
    def test_neofs_go(self):
        node = self.random_node(service=NEO_GO)
        shell = node.host.get_shell()
        initial_pid = self.service_pid(NEO_GO, shell)
        dump_keys = self.neo_go_dump_keys(node=node, shell=shell)
        with allure.step(
            f"Node: {node.get_rpc_endpoint()} -> Kill {NEO_GO} service, PID {initial_pid}, then wait until the service is restarted"
        ):
            self.kill_by_service_name(NEO_GO, shell)
            self.wait_for_expected_status(service=NEO_GO, status="active", shell=shell)
        with allure.step("Verify that the PID has changed"):
            new_pid = self.service_pid(NEO_GO, shell)
            assert (
                initial_pid != new_pid
            ), f"PID hasn't changed - initial {initial_pid}, new {new_pid}"
        with allure.step(f"Verify that {NEO_GO} dump-keys and query height are working well"):
            dump_keys_after_restart = self.neo_go_dump_keys(node=node, shell=shell)
            assert (
                dump_keys == dump_keys_after_restart
            ), f"Dump keys should be equal, initial: {dump_keys}, after restart: {dump_keys_after_restart}"
            query_height_result = self.neo_go_query_height(shell=shell, node=node)
            logger.info(f"Query height result: {query_height_result}")

    @allure.title(
        f"Kill {NEOFS_IR}, wait for restart, then check that the service is healthy and can continue processing"
    )
    def test_neofs_ir(self):
        node = self.random_node(service=NEOFS_IR)
        shell = node.host.get_shell()
        initial_pid = self.service_pid(NEOFS_IR, shell)
        with allure.step(
            f"Node: {node.get_rpc_endpoint()} -> Kill {NEOFS_IR} service, PID {initial_pid}, then wait until the service is restarted"
        ):
            self.kill_by_service_name(NEOFS_IR, shell)
            self.wait_for_expected_status(service=NEOFS_IR, status="active", shell=shell)
        with allure.step("Verify that the PID has changed"):
            new_pid = self.service_pid(NEOFS_IR, shell)
            assert (
                initial_pid != new_pid
            ), f"PID hasn't changed - initial {initial_pid}, new {new_pid}"
        with allure.step(
            f"Node: {node.get_rpc_endpoint()} -> Force-new-epoch - check that {NEOFS_IR} is alive"
        ):
            ensure_fresh_epoch(self.shell, self.cluster)
            wait_for_epochs_align(self.shell, self.cluster)
            self.health_check(node)

    @pytest.mark.parametrize(
        "object_size",
        [pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")],
        ids=["simple object", "complex object"],
    )
    @allure.title(
        f"Kill {NEOFS_HTTP}, wait for restart, then check that the service is healthy and can continue processing"
    )
    def test_neofs_http(self, user_container: str, object_size: int, user_wallet: WalletFile):
        node = self.random_node(service=NEOFS_HTTP)
        http = self.http_gate(node)
        shell = node.host.get_shell()
        initial_pid = self.service_pid(NEOFS_HTTP, shell)
        file_path_grpc = generate_file(object_size)
        with allure.step("Put objects using gRPC"):
            oid_grpc = put_object_to_random_node(
                wallet=user_wallet.path,
                path=file_path_grpc,
                cid=user_container,
                shell=self.shell,
                cluster=self.cluster,
            )
        with allure.step(
            f"Node: {node.get_rpc_endpoint()} -> Kill {NEOFS_HTTP} service, PID {initial_pid}, then wait until the service is restarted"
        ):
            self.kill_by_service_name(NEOFS_HTTP, shell)
            self.wait_for_expected_status(service=NEOFS_HTTP, status="active", shell=shell)
        with allure.step("Verify that the PID has changed"):
            new_pid = self.service_pid(NEOFS_HTTP, shell)
            assert (
                initial_pid != new_pid
            ), f"PID hasn't changed - initial {initial_pid}, new {new_pid}"
        with allure.step(f"Get object_grpc and verify hashes: endpoint {node.get_rpc_endpoint()}"):
            get_object_and_verify_hashes(
                oid=oid_grpc,
                file_name=file_path_grpc,
                wallet=user_wallet.path,
                cid=user_container,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
                endpoint=http.get_endpoint(),
                object_getter=get_via_http_curl,
            )
        with allure.step(f"Put objects using curl utility - endpoint {node.get_rpc_endpoint()}"):
            file_path_http = generate_file(object_size)
            oid_http = upload_via_http_gate_curl(
                cid=user_container, filepath=file_path_http, endpoint=http.get_endpoint()
            )
        with allure.step(f"Get object_http and verify hashes: endpoint {http.get_endpoint()}"):
            get_object_and_verify_hashes(
                oid=oid_http,
                file_name=file_path_http,
                wallet=user_wallet.path,
                cid=user_container,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
                endpoint=http.get_endpoint(),
                object_getter=get_via_http_curl,
            )

    @pytest.mark.parametrize(
        "object_size",
        [pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")],
        ids=["simple object", "complex object"],
    )
    @allure.title(
        f"Kill {NEOFS_NODE}, wait for restart, then check that the service is healthy and can continue processing"
    )
    def test_neofs_node(self, user_container: str, object_size: int, user_wallet: WalletFile):
        node = self.random_node(service=NEOFS_NODE)
        http = self.http_gate(node)
        shell = node.host.get_shell()
        initial_pid = self.service_pid(NEOFS_NODE, shell)
        file_path1 = generate_file(object_size)
        with allure.step("Put object#1 using gRPC"):
            oid1 = put_object_to_random_node(
                wallet=user_wallet.path,
                path=file_path1,
                cid=user_container,
                shell=self.shell,
                cluster=self.cluster,
            )
        with allure.step(
            f"Node: {node.get_rpc_endpoint()} -> Kill {NEOFS_NODE} service, PID {initial_pid}, then wait until the service is restarted"
        ):
            self.kill_by_service_name(NEOFS_NODE, shell)
            self.wait_for_expected_status(service=NEOFS_NODE, status="active", shell=shell)
        with allure.step("Verify that the PID has changed"):
            new_pid = self.service_pid(NEOFS_NODE, shell)
            assert (
                initial_pid != new_pid
            ), f"PID hasn't changed - initial {initial_pid}, new {new_pid}"
        with allure.step(f"Get object#1 and verify hashes: endpoint {http.get_endpoint()}"):
            get_object_and_verify_hashes(
                oid=oid1,
                file_name=file_path1,
                wallet=user_wallet.path,
                cid=user_container,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
                endpoint=http.get_endpoint(),
            )
        file_path2 = generate_file(object_size)
        with allure.step("Put object#2 using gRPC"):
            oid2 = put_object_to_random_node(
                wallet=user_wallet.path,
                path=file_path2,
                cid=user_container,
                shell=self.shell,
                cluster=self.cluster,
            )
        with allure.step("Search object#2"):
            search_result = search_object(
                user_wallet.path, user_container, shell=self.shell, endpoint=node.get_rpc_endpoint()
            )
            if oid2 not in search_result:
                raise AssertionError(f"Object_id {oid2} not found in {search_result}")
        with allure.step("Get object#2"):
            get_object_and_verify_hashes(
                oid=oid2,
                file_name=file_path2,
                wallet=user_wallet.path,
                cid=user_container,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
                endpoint=http.get_endpoint(),
                object_getter=get_via_http_curl,
            )
        with allure.step("Delete objects"):
            for oid_to_delete in (oid1, oid2):
                delete_object(
                    user_wallet.path,
                    user_container,
                    oid_to_delete,
                    self.shell,
                    node.get_rpc_endpoint(),
                )