Blob Blame History Raw
import contextlib
import ctypes
import fcntl
import os
import stat


__all__ = [
    "Loop",
    "LoopControl",
    "UnexpectedDevice"
]


class UnexpectedDevice(Exception):
    def __init__(self, expected_minor, rdev, mode):
        super(UnexpectedDevice, self).__init__()
        self.expected_minor = expected_minor
        self.rdev = rdev
        self.mode = mode


class LoopInfo(ctypes.Structure):
    _fields_ = [
        ('lo_device', ctypes.c_uint64),
        ('lo_inode', ctypes.c_uint64),
        ('lo_rdevice', ctypes.c_uint64),
        ('lo_offset', ctypes.c_uint64),
        ('lo_sizelimit', ctypes.c_uint64),
        ('lo_number', ctypes.c_uint32),
        ('lo_encrypt_type', ctypes.c_uint32),
        ('lo_encrypt_key_size', ctypes.c_uint32),
        ('lo_flags', ctypes.c_uint32),
        ('lo_file_name', ctypes.c_uint8 * 64),
        ('lo_crypt_name', ctypes.c_uint8 * 64),
        ('lo_encrypt_key', ctypes.c_uint8 * 32),
        ('lo_init', ctypes.c_uint64 * 2)
    ]


class Loop:
    """Loopback device

    A class representing a Linux loopback device, typically found at
    /dev/loop{minor}.

    Methods
    -------
    set_fd(fd)
        Bind a file descriptor to the loopback device
    clear_fd()
        Unbind the file descriptor from the loopback device
    change_fd(fd)
        Replace the bound file descriptor
    set_capacity()
        Re-read the capacity of the backing file
    set_status(offset=None, sizelimit=None, autoclear=None, partscan=None)
        Set properties of the loopback device
    mknod(dir_fd, mode=0o600)
        Create a secondary device node
    """

    LOOP_MAJOR = 7

    LO_FLAGS_READ_ONLY = 1
    LO_FLAGS_AUTOCLEAR = 4
    LO_FLAGS_PARTSCAN = 8
    LO_FLAGS_DIRECT_IO = 16

    LOOP_SET_FD = 0x4C00
    LOOP_CLR_FD = 0x4C01
    LOOP_SET_STATUS64 = 0x4C04
    LOOP_GET_STATUS64 = 0x4C05
    LOOP_CHANGE_FD = 0x4C06
    LOOP_SET_CAPACITY = 0x4C07
    LOOP_SET_DIRECT_IO = 0x4C08
    LOOP_SET_BLOCK_SIZE = 0x4C09

    def __init__(self, minor, dir_fd=None):
        """
        Parameters
        ----------
        minor
            the minor number of the underlying device
        dir_fd : int, optional
            A directory file descriptor to a filesystem containing the
            underlying device node, or None to use /dev (default is None)

        Raises
        ------
        UnexpectedDevice
            If the file in the expected device node location is not the
            expected device node
        """

        self.devname = f"loop{minor}"
        self.minor = minor

        with contextlib.ExitStack() as stack:
            if not dir_fd:
                dir_fd = os.open("/dev", os.O_DIRECTORY)
                stack.callback(lambda: os.close(dir_fd))
            self.fd = os.open(self.devname, os.O_RDWR, dir_fd=dir_fd)

        info = os.stat(self.fd)
        if ((not stat.S_ISBLK(info.st_mode)) or
                (not os.major(info.st_rdev) == self.LOOP_MAJOR) or
                (not os.minor(info.st_rdev) == minor)):
            raise UnexpectedDevice(minor, info.st_rdev, info.st_mode)

    def __del__(self):
        self.close()

    def close(self):
        """Close this loop device.

        No operations on this object are valid after this call.
        """
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1
            self.devname = "<closed>"

    def set_fd(self, fd):
        """Bind a file descriptor to the loopback device

        The loopback device must be unbound. The backing file must be
        either a regular file or a block device. If the backing file is
        itself a loopback device, then a cycle must not be created. If
        the backing file is opened read-only, then the resulting
        loopback device will be read-only too.

        Parameters
        ----------
        fd : int
            the file descriptor to bind
        """

        fcntl.ioctl(self.fd, self.LOOP_SET_FD, fd)

    def clear_fd(self):
        """Unbind the file descriptor from the loopback device

        The loopback device must be bound. The device is then marked
        to be cleared, so once nobody holds it open any longer the
        backing file is unbound and the device returns to the unbound
        state.
        """

        fcntl.ioctl(self.fd, self.LOOP_CLR_FD)

    def change_fd(self, fd):
        """Replace the bound filedescriptor

        Atomically replace the backing filedescriptor of the loopback
        device, even if the device is held open.

        The effective size (taking sizelimit into account) of the new
        and existing backing file descriptors must be the same, and
        the loopback device must be read-only. The loopback device will
        remain read-only, even if the new file descriptor was opened
        read-write.

        Parameters
        ----------
        fd : int
            the file descriptor to change to
        """

        fcntl.ioctl(self.fd, self.LOOP_CHANGE_FD, fd)

    def set_status(self, offset=None, sizelimit=None, autoclear=None, partscan=None):
        """Set properties of the loopback device

        The loopback device must be bound, and the properties will be
        cleared once the device is unbound, but preserved by changing
        the backing file descriptor.

        Note that this operation is not atomic: All the current properties
        are read out, the ones specified in this function call are modified,
        and then they are written back. For this reason, concurrent
        modification of the properties must be avoided.

        Setting sizelimit means the size of the loopback device is taken
        to be the max of the size of the backing file and the limit. A
        limit of 0 is taken to mean unlimited.

        Enabling autoclear has the same effect as calling clear_fd().

        When partscan is first enabled, the partition table of the
        device is scanned, and new blockdevices potentially added for
        the partitions.

        Parameters
        ----------
        offset : int, optional
            The offset in bytes from the start of the backing file, or
            None to leave unchanged (default is None)
        sizelimit : int, optional
            The max size in bytes to make the loopback device, or None
            to leave unchanged (default is None)
        autoclear : bool, optional
            Whether or not to enable autoclear, or None to leave unchanged
            (default is None)
        partscan : bool, optional
            Whether or not to enable partition scanning, or None to leave
            unchanged (default is None)
        """

        info = LoopInfo()
        fcntl.ioctl(self.fd, self.LOOP_GET_STATUS64, info)
        if offset:
            info.lo_offset = offset
        if sizelimit:
            info.lo_sizelimit = sizelimit
        if autoclear is not None:
            if autoclear:
                info.lo_flags |= self.LO_FLAGS_AUTOCLEAR
            else:
                info.lo_flags &= ~self.LO_FLAGS_AUTOCLEAR
        if partscan is not None:
            if partscan:
                info.lo_flags |= self.LO_FLAGS_PARTSCAN
            else:
                info.lo_flags &= ~self.LO_FLAGS_PARTSCAN
        fcntl.ioctl(self.fd, self.LOOP_SET_STATUS64, info)

    def set_direct_io(self, dio=True):
        """Set the direct-IO property on the loopback device

        Enabling direct IO allows one to avoid double caching, which
        should improve performance and memory usage.

        Parameters
        ----------
        dio : bool, optional
            Whether or not to enable direct IO (default is True)
        """

        fcntl.ioctl(self.fd, self.LOOP_SET_DIRECT_IO, dio)

    def mknod(self, dir_fd, mode=0o600):
        """Create a secondary device node

        Create a device node with the correct name, mode, minor and major
        number in the provided directory.

        Note that the device node will survive even if a device is
        unbound and rebound, so anyone with access to the device node
        will have access to any future devices with the same minor
        number. The intended use of this is to first bind a file
        descriptor to a loopback device, then mknod it where it should
        be accessed from, and only after the destination directory is
        ensured to have been destroyed/made inaccessible should the the
        loopback device be unbound.

        Note that the provided directory should not be devtmpfs, as the
        device node is guaranteed to already exist there, and the call
        would hence fail.

        Parameters
        ----------
        dir_fd : int
            Target directory file descriptor
        mode : int, optional
            Access mode on the created device node (0o600 is default)
        """

        os.mknod(self.devname,
                 mode=(stat.S_IMODE(mode) | stat.S_IFBLK),
                 device=os.makedev(self.LOOP_MAJOR, self.minor),
                 dir_fd=dir_fd)


class LoopControl:
    """Loopback control device

    A class representing the Linux loopback control device, typically
    found at /dev/loop-control. It allows the creation and destruction
    of loopback devices.

    A loopback device may be bound, which means that a file descriptor
    has been attached to it as its backing file. Otherwise, it is
    considered unbound.

    Methods
    -------
    add(minor)
        Add a new loopback device
    remove(minor)
        Remove an existing loopback device
    get_unbound()
        Get or create the first unbound loopback device
    """

    LOOP_CTL_ADD = 0x4C80
    LOOP_CTL_REMOVE = 0x4C81
    LOOP_CTL_GET_FREE = 0x4C82

    def __init__(self, dir_fd=None):
        """
        Parameters
        ----------
        dir_fd : int, optional
            A directory filedescriptor to a devtmpfs filesystem,
            or None to use /dev (default is None)
        """

        if not dir_fd:
            dir_fd = os.open("/dev", os.O_DIRECTORY)
        self.fd = os.open("loop-control", os.O_RDWR, dir_fd=dir_fd)

    def add(self, minor=-1):
        """Add a new loopback device

        Add a new, unbound loopback device. If a minor number is given
        and it is positive, a loopback device with that minor number
        is added. Otherwise, if there are no unbound devices, a device
        using the first unused minor number is created.

        Parameters
        ----------
        minor : int, optional
            The requested minor number, or a negative value for
            unspecified (default is -1)

        Returns
        -------
        int
            The minor number of the created device
        """

        return fcntl.ioctl(self.fd, self.LOOP_CTL_ADD, minor)

    def remove(self, minor=-1):
        """Remove an existing loopback device

        Removes an unbound and unopen loopback device. If a minor
        number is given and it is positive, the loopback device
        with that minor number is removed. Otherwise, the first
        unbound device is attempted removed.

        Parameters
        ----------
        minor : int, optional
            The requested minor number, or a negative value for
            unspecified (default is -1)
        """

        fcntl.ioctl(self.fd, self.LOOP_CTL_REMOVE, minor)

    def get_unbound(self):
        """Get or create an unbound loopback device

        If an unbound loopback device exists, returns it.
        Otherwise, create a new one.

        Returns
        -------
        int
            The minor number of the returned device
        """

        return fcntl.ioctl(self.fd, self.LOOP_CTL_GET_FREE)