forked from TrueCloudLab/frostfs-testlib
19 lines
746 B
Python
19 lines
746 B
Python
from frostfs_testlib.shell import CommandOptions
|
|
from frostfs_testlib.storage.cluster import ClusterNode
|
|
|
|
|
|
class IpHelper:
|
|
@staticmethod
|
|
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
|
|
shell = node.host.get_shell()
|
|
for ip in block_ip:
|
|
shell.exec(f"ip route add blackhole {ip}")
|
|
|
|
@staticmethod
|
|
def restore_input_traffic_to_node(node: ClusterNode) -> None:
|
|
shell = node.host.get_shell()
|
|
unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False))
|
|
if unlock_ip.return_code != 0:
|
|
return
|
|
for ip in unlock_ip.stdout.strip().split("\n"):
|
|
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")
|