frostfs-testlib/src/frostfs_testlib/storage/controllers/disk_controller.py
Andrey Berezin 997e768e92 Move shared code to testlib
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-05-14 13:43:59 +03:00

41 lines
1.6 KiB
Python

from frostfs_testlib.hosting.interfaces import DiskInfo
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.storage.cluster import StorageNode
from frostfs_testlib.testing.test_control import wait_for_success
class DiskController:
    """Detach, re-attach and remount one disk device of a storage node.

    All operations are delegated to the node's host object
    (``detach_disk`` / ``attach_disk`` / ``is_disk_attached``) and to the
    shell obtained from ``node.host.get_shell()``.

    Attributes:
        node: Storage node that owns the disk.
        device: Device path as passed by the caller.
        mountpoint: Mount point with surrounding whitespace stripped.
        disk_info: Value returned by the last ``detach_disk`` call; starts
            out as an empty ``DiskInfo`` and is handed back to
            ``attach_disk`` on re-attach.
        id: Identifier built by ``get_id`` from the host address and device.
        device_by_label: ``/dev/<name>`` path, where ``<name>`` is whatever
            the udevadm query run in ``__init__`` printed.
    """

    def __init__(self, node: StorageNode, device: str, mountpoint: str) -> None:
        """Remember the target disk and resolve its by-label device path.

        Runs a shell command on the node's host during construction.

        Args:
            node: Storage node whose host owns the disk.
            device: Device path, interpolated as-is into shell commands.
            mountpoint: Where the device is mounted; stripped of whitespace.
        """
        self.node: StorageNode = node
        self.device: str = device
        self.mountpoint: str = mountpoint.strip()
        self.disk_info: DiskInfo = DiskInfo()
        self.id = self.get_id(node, device)

        host_shell = node.host.get_shell()
        # Take the second field of the udevadm "S:" lines that mention
        # "label" (per udevadm(8) these are the device's symlink names).
        # NOTE(review): if several lines match, all of them end up in the
        # path below; if none match, the path is just "/dev/".
        label_query = f"sudo udevadm info -n {device} | egrep \"S:.*label\" | awk '{{print $2}}'"
        label_link = host_shell.exec(label_query).stdout.strip()
        self.device_by_label: str = f"/dev/{label_link}"

    def _is_attached(self):
        """Ask the host whether the device is currently attached."""
        host = self.node.host
        return host.is_disk_attached(self.device, self.disk_info)

    # Presumably: retry for up to 60 s at 3 s intervals until the probe
    # returns False -- confirm against wait_for_success.
    @wait_for_success(60, 3, False)
    def _wait_until_detached(self):
        """Probe the host's attachment state; retried by the decorator."""
        return self._is_attached()

    # Same probe as above, but with True as the third decorator argument
    # (presumably the expected result -- confirm against wait_for_success).
    @wait_for_success(60, 3, True)
    def _wait_until_attached(self):
        """Probe the host's attachment state; retried by the decorator."""
        return self._is_attached()

    def detach(self):
        """Detach the device via the host and wait for it to disappear.

        The value returned by ``detach_disk`` is kept in ``disk_info`` so
        that ``attach`` can pass it back to the host.
        """
        detached_info = self.node.host.detach_disk(self.device)
        self.disk_info = detached_info
        self._wait_until_detached()

    def attach(self):
        """Attach the device again and remount it at ``mountpoint``.

        After the host reports the disk as attached, the device is lazily
        unmounted (failure of that command is not checked) and then mounted
        from ``device_by_label`` onto ``mountpoint``.
        """
        host = self.node.host
        host.attach_disk(self.device, self.disk_info)
        self._wait_until_attached()

        shell = host.get_shell()
        umount_cmd = f"sudo umount -l {self.device}"
        # check=False: the umount result is deliberately not enforced.
        shell.exec(umount_cmd, options=CommandOptions(check=False))
        mount_cmd = f"sudo mount {self.device_by_label} {self.mountpoint}"
        shell.exec(mount_cmd)

    @staticmethod
    def get_id(node: StorageNode, device: str) -> str:
        """Build the controller identifier ``"<host address> - <device>"``.

        Args:
            node: Storage node; its ``host.config.address`` is used.
            device: Device path.

        Returns:
            The host address and device joined by ``" - "``.
        """
        address = node.host.config.address
        return f"{address} - {device}"