Update network tests #173
2 changed files with 38 additions and 36 deletions
|
@ -57,7 +57,7 @@ def pytest_collection_modifyitems(items: list[pytest.Item]):
|
||||||
def priority(item: pytest.Item) -> int:
|
def priority(item: pytest.Item) -> int:
|
||||||
is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
|
is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0
|
||||||
is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
|
is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0
|
||||||
is_system_time_test = 10 if item.get_closest_marker("system_time") else 0
|
is_system_time_test = 10 if item.get_closest_marker("time") else 0
|
||||||
return is_node_mgmt_test + is_logs_check_test + is_system_time_test
|
return is_node_mgmt_test + is_logs_check_test + is_system_time_test
|
||||||
|
|
||||||
items.sort(key=lambda item: priority(item))
|
items.sort(key=lambda item: priority(item))
|
||||||
|
|
|
@ -51,7 +51,7 @@ class TestFailoverNetwork(ClusterTestBase):
|
||||||
not_empty = len(blocked_nodes) != 0
|
not_empty = len(blocked_nodes) != 0
|
||||||
for node in list(blocked_nodes):
|
for node in list(blocked_nodes):
|
||||||
with reporter.step(f"Restore network for {node}"):
|
with reporter.step(f"Restore network for {node}"):
|
||||||
cluster_state_controller.restore_traffic(mode="ports", node=node)
|
cluster_state_controller.restore_traffic(node=node)
|
||||||
blocked_nodes.remove(node)
|
blocked_nodes.remove(node)
|
||||||
if not_empty:
|
if not_empty:
|
||||||
parallel(healthcheck.storage_healthcheck, self.cluster.cluster_nodes)
|
parallel(healthcheck.storage_healthcheck, self.cluster.cluster_nodes)
|
||||||
|
@ -149,43 +149,45 @@ class TestFailoverNetwork(ClusterTestBase):
|
||||||
# TODO: the intent of this logic is not clear, need to revisit
|
# TODO: the intent of this logic is not clear, need to revisit
|
||||||
nodes_to_block = random.choices(nodes, k=2)
|
nodes_to_block = random.choices(nodes, k=2)
|
||||||
|
|
||||||
excluded_nodes = []
|
nodes_non_block = list(set(self.cluster.storage_nodes) - set(nodes_to_block))
|
||||||
for node in nodes_to_block:
|
nodes_non_block_cluster = [
|
||||||
with reporter.step(f"Block incoming traffic at node {node} on port {PORTS_TO_BLOCK}"):
|
cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node in nodes_non_block
|
||||||
block_node = [
|
]
|
||||||
cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node == node
|
with reporter.step("Block traffic and check corrupted object"):
|
||||||
]
|
for node in nodes_non_block_cluster:
|
||||||
blocked_nodes.append(*block_node)
|
with reporter.step(f"Block incoming traffic at node {node}"):
|
||||||
excluded_nodes.append(node)
|
blocked_nodes.append(node)
|
||||||
cluster_state_controller.drop_traffic(
|
cluster_state_controller.drop_traffic(
|
||||||
mode="ports",
|
node=node, wakeup_timeout=wakeup_node_timeout, name_interface="data", block_nodes=nodes_to_block
|
||||||
node=node,
|
)
|
||||||
wakeup_timeout=wakeup_node_timeout,
|
|
||||||
ports=PORTS_TO_BLOCK,
|
|
||||||
)
|
|
||||||
|
|
||||||
with reporter.step(f"Check object is not stored on node {node}"):
|
with reporter.step(f"Check object is not stored on node {node}"):
|
||||||
new_nodes = wait_object_replication(
|
new_nodes = wait_object_replication(
|
||||||
cid,
|
cid,
|
||||||
oid,
|
oid,
|
||||||
2,
|
2,
|
||||||
shell=self.shell,
|
shell=self.shell,
|
||||||
nodes=list(set(self.cluster.storage_nodes) - set(excluded_nodes)),
|
nodes=list(set(self.cluster.storage_nodes) - set(nodes_non_block)),
|
||||||
)
|
)
|
||||||
assert node not in new_nodes
|
assert node.storage_node not in new_nodes
|
||||||
|
|
||||||
with reporter.step("Check object data is not corrupted"):
|
with reporter.step("Check object data is not corrupted"):
|
||||||
got_file_path = get_object(wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell)
|
got_file_path = get_object(
|
||||||
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
|
wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
|
||||||
|
)
|
||||||
|
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
|
||||||
|
|
||||||
for node in nodes_to_block:
|
with reporter.step(f"Unblock incoming traffic"):
|
||||||
with reporter.step(f"Unblock incoming traffic at host {node} on port {PORTS_TO_BLOCK}"):
|
for node in nodes_non_block_cluster:
|
||||||
cluster_state_controller.restore_traffic(mode="ports", node=node)
|
with reporter.step(f"Unblock at host {node}"):
|
||||||
block_node = [
|
cluster_state_controller.restore_traffic(node=node)
|
||||||
cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node == node
|
block_node = [
|
||||||
]
|
cluster_node
|
||||||
blocked_nodes.remove(*block_node)
|
for cluster_node in self.cluster.cluster_nodes
|
||||||
sleep(wakeup_node_timeout)
|
if cluster_node.storage_node == node.storage_node
|
||||||
|
]
|
||||||
|
blocked_nodes.remove(*block_node)
|
||||||
|
sleep(wakeup_node_timeout)
|
||||||
|
|
||||||
with reporter.step("Check object data is not corrupted"):
|
with reporter.step("Check object data is not corrupted"):
|
||||||
new_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes)
|
new_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes)
|
||||||
|
|
Loading…
Reference in a new issue