Blame libnmstate/nm/connection.py

Packit b9ca78
#
Packit b9ca78
# Copyright (c) 2018-2020 Red Hat, Inc.
Packit b9ca78
#
Packit b9ca78
# This file is part of nmstate
Packit b9ca78
#
Packit b9ca78
# This program is free software: you can redistribute it and/or modify
Packit b9ca78
# it under the terms of the GNU Lesser General Public License as published by
Packit b9ca78
# the Free Software Foundation, either version 2.1 of the License, or
Packit b9ca78
# (at your option) any later version.
Packit b9ca78
#
Packit b9ca78
# This program is distributed in the hope that it will be useful,
Packit b9ca78
# but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b9ca78
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b9ca78
# GNU Lesser General Public License for more details.
Packit b9ca78
#
Packit b9ca78
# You should have received a copy of the GNU Lesser General Public License
Packit b9ca78
# along with this program. If not, see <https://www.gnu.org/licenses/>.
Packit b9ca78
#
Packit b9ca78
Packit b9ca78
import logging
Packit b9ca78
import uuid
Packit b9ca78
Packit b9ca78
from libnmstate.error import NmstateLibnmError
Packit b9ca78
from libnmstate.error import NmstateInternalError
Packit b9ca78
from libnmstate.error import NmstateValueError
Packit b9ca78
Packit b9ca78
from .common import NM
Packit b9ca78
from . import ipv4
Packit b9ca78
from . import ipv6
Packit b9ca78
Packit b9ca78
# Activation timeout used for bridges; presumably in seconds, going by the
# trailing note (35 leaves a margin of 5 over the 30 that STP needs).
# NOTE(review): not referenced by the code visible in this file - confirm
# the consumer and the unit.
ACTIVATION_TIMEOUT_FOR_BRIDGE = 35  # Bridge STP requires 30 seconds.
Packit b9ca78
Packit b9ca78
Packit b9ca78
class ConnectionProfile:
    """Wrap one NetworkManager connection profile and its async operations.

    An instance holds at most one profile, one NM device and one explicit
    connection id.  Each of the three can be assigned only once through its
    property setter, which asserts that the previous value is ``None``.

    ``add()``, ``update()``, ``delete()`` and ``activate()`` register an
    action on the context with ``register_async()`` and return immediately.
    The outcome is reported later from the libnm callbacks through
    ``context.finish_async(action)`` or ``context.fail(error)``.
    """

    def __init__(self, context, profile=None):
        """Store the context and, optionally, an already known profile.

        :param context: object giving access to ``client``, ``cancellable``
            and the async bookkeeping methods used by this class.
        :param profile: optional NM connection to wrap.
        """
        self._ctx = context
        self._con_profile = profile
        self._nm_dev = None
        self._con_id = None
        self._nm_ac = None
        # Signal handler ids connected on self._nm_ac / self._nm_dev while
        # waiting for an activation to complete.
        self._ac_handlers = set()
        self._dev_handlers = set()

    def create(self, settings):
        """Build a new ``NM.SimpleConnection`` holding every given setting."""
        self.profile = NM.SimpleConnection.new()
        for setting in settings:
            self.profile.add_setting(setting)

    def import_by_device(self, nmdev=None):
        """Adopt the profile of the active connection of a device.

        Uses ``nmdev`` when given, otherwise the device already stored on
        this object.  Nothing is changed when the device has no active
        connection.
        """
        ac = get_device_active_connection(nmdev or self.nmdevice)
        if ac:
            if nmdev:
                self.nmdevice = nmdev
            self.profile = ac.props.connection

    def import_by_id(self, con_id=None):
        """Look the profile up by connection id through the NM client.

        ``con_id``, when given, is stored first.  The lookup is skipped when
        no connection id is known at all.
        """
        if con_id:
            self.con_id = con_id
        if self.con_id:
            self.profile = self._ctx.client.get_connection_by_id(self.con_id)

    def update(self, con_profile, save_to_disk=True):
        """Asynchronously overwrite this profile with ``con_profile``.

        :param con_profile: ``ConnectionProfile`` holding the new content.
        :param save_to_disk: request ``TO_DISK`` when true, ``IN_MEMORY``
            otherwise.  ``BLOCK_AUTOCONNECT`` is always requested.
        """
        flags = NM.SettingsUpdate2Flags.BLOCK_AUTOCONNECT
        if save_to_disk:
            flags |= NM.SettingsUpdate2Flags.TO_DISK
        else:
            flags |= NM.SettingsUpdate2Flags.IN_MEMORY
        action = f"Update profile: {self.profile.get_id()}"
        # The action string doubles as callback user data so the callback
        # can finish or fail the very same registered action.
        user_data = action
        args = None

        self._ctx.register_async(action, fast=True)
        self.profile.update2(
            con_profile.profile.to_dbus(NM.ConnectionSerializationFlags.ALL),
            flags,
            args,
            self._ctx.cancellable,
            self._update2_callback,
            user_data,
        )

    def add(self, save_to_disk=True):
        """Asynchronously add this profile to NetworkManager.

        :param save_to_disk: request ``TO_DISK`` when true, ``IN_MEMORY``
            otherwise.  ``BLOCK_AUTOCONNECT`` is always requested.
        """
        nm_add_conn2_flags = NM.SettingsAddConnection2Flags
        flags = nm_add_conn2_flags.BLOCK_AUTOCONNECT
        if save_to_disk:
            flags |= nm_add_conn2_flags.TO_DISK
        else:
            flags |= nm_add_conn2_flags.IN_MEMORY

        action = f"Add profile: {self.profile.get_id()}"
        self._ctx.register_async(action, fast=True)

        user_data = action
        args = None
        ignore_out_result = False  # Don't fall back to old AddConnection()
        self._ctx.client.add_connection2(
            self.profile.to_dbus(NM.ConnectionSerializationFlags.ALL),
            flags,
            args,
            ignore_out_result,
            self._ctx.cancellable,
            self._add_connection2_callback,
            user_data,
        )

    def delete(self):
        """Asynchronously delete the profile, resolving it first if needed.

        When no profile is loaded it is looked up by connection id and then
        by device.  Nothing happens when no profile can be found.
        """
        if not self.profile:
            self.import_by_id()
            if not self.profile:
                self.import_by_device()
        if self.profile:
            action = (
                f"Delete profile: id:{self.profile.get_id()}, "
                f"uuid:{self.profile.get_uuid()}"
            )
            user_data = action
            self._ctx.register_async(action, fast=True)
            self.profile.delete_async(
                self._ctx.cancellable,
                self._delete_connection_callback,
                user_data,
            )

    def activate(self):
        """Asynchronously activate the profile.

        The profile is imported by connection id when one is known,
        otherwise from the stored device.

        :raises NmstateInternalError: when neither a connection id, a device
            nor a profile is available.
        """
        if self.con_id:
            self.import_by_id()
        elif self.nmdevice:
            self.import_by_device()
        elif not self.profile:
            raise NmstateInternalError(
                "BUG: Failed  to find valid profile to activate: "
                f"id={self.con_id}, dev={self.devname}"
            )

        specific_object = None
        if self.profile:
            action = f"Activate profile: {self.profile.get_id()}"
        elif self.nmdevice:
            action = f"Activate profile: {self.nmdevice.get_iface()}"
        else:
            raise NmstateInternalError(
                "BUG: Cannot activate a profile with empty profile id and "
                "empty NM.Device"
            )
        user_data = action
        self._ctx.register_async(action)
        self._ctx.client.activate_connection_async(
            self.profile,
            self.nmdevice,
            specific_object,
            self._ctx.cancellable,
            self._active_connection_callback,
            user_data,
        )

    @property
    def profile(self):
        """The wrapped NM connection, or ``None``."""
        return self._con_profile

    @profile.setter
    def profile(self, con_profile):
        # Write-once: use _reset_profile() before assigning again.
        assert self._con_profile is None
        self._con_profile = con_profile

    @property
    def devname(self):
        """Interface name stored in the profile, ``None`` without profile."""
        if self._con_profile:
            return self._con_profile.get_interface_name()
        return None

    @property
    def nmdevice(self):
        """The NM device associated with this profile, or ``None``."""
        return self._nm_dev

    @nmdevice.setter
    def nmdevice(self, dev):
        # Write-once, same as the profile.
        assert self._nm_dev is None
        self._nm_dev = dev

    @property
    def con_id(self):
        """Explicitly set connection id, else the id of the profile."""
        con_id = self._con_profile.get_id() if self._con_profile else None
        return self._con_id or con_id

    @con_id.setter
    def con_id(self, connection_id):
        # Write-once, same as the profile.
        assert self._con_id is None
        self._con_id = connection_id

    def get_setting_duplicate(self, setting_name):
        """Return a duplicate of the named setting.

        ``None`` is returned when there is no profile or the profile has no
        such setting.
        """
        setting = None
        if self.profile:
            setting = self.profile.get_setting_by_name(setting_name)
            if setting:
                setting = setting.duplicate()
        return setting

    def _active_connection_callback(self, src_object, result, user_data):
        """Handle the completion of ``activate_connection_async()``.

        Finishes the action when the connection is already activated, starts
        watching the active connection (and the device, when known) while it
        is still activating, and fails the action otherwise.
        """
        if self._ctx.is_cancelled():
            self._activation_clean_up()
            return
        action = user_data

        try:
            nm_act_con = src_object.activate_connection_finish(result)
        except Exception as e:
            self._ctx.fail(NmstateLibnmError(f"{action} failed: error={e}"))
            return

        if nm_act_con is None:
            self._ctx.fail(
                NmstateLibnmError(
                    f"{action} failed: "
                    "error='None return from activate_connection_finish()'"
                )
            )
            return

        devname = self.devname
        logging.debug(
            "Connection activation initiated: dev=%s, con-state=%s",
            devname,
            nm_act_con.props.state,
        )
        self._nm_ac = nm_act_con
        self._nm_dev = self._ctx.get_nm_dev(devname)

        if is_activated(self._nm_ac, self._nm_dev):
            self._ctx.finish_async(action)
        elif self._is_activating():
            # _is_activating() is also true when the device is not known
            # yet, so the device watch is optional here.
            self._wait_ac_activation(action)
            if self._nm_dev:
                self.wait_dev_activation(action)
        else:
            error_msg = (
                f"Connection {self.profile.get_id()} failed: "
                f"state={self._nm_ac.get_state()} "
                f"reason={self._nm_ac.get_state_reason()} "
            )
            if self._nm_dev:
                error_msg += (
                    f"dev_state={self._nm_dev.get_state()} "
                    f"dev_reason={self._nm_dev.get_state_reason()}"
                )
            else:
                error_msg += "dev=None"
            logging.error(error_msg)
            self._ctx.fail(NmstateLibnmError(f"{action} failed: {error_msg}"))

    def _wait_ac_activation(self, action):
        """Watch state and state-flags changes of the active connection."""
        self._ac_handlers.add(
            self._nm_ac.connect(
                "state-changed", self._ac_state_change_callback, action
            )
        )
        self._ac_handlers.add(
            self._nm_ac.connect(
                "notify::state-flags",
                self._ac_state_flags_change_callback,
                action,
            )
        )

    def wait_dev_activation(self, action):
        """Watch state changes of the device; no-op without a device."""
        if self._nm_dev:
            self._dev_handlers.add(
                self._nm_dev.connect(
                    "state-changed", self._dev_state_change_callback, action
                )
            )

    def _dev_state_change_callback(
        self, _dev, _new_state, _old_state, _reason, action,
    ):
        """Re-evaluate the activation after a device state change."""
        if self._ctx.is_cancelled():
            self._activation_clean_up()
            return
        self._activation_progress_check(action)

    def _ac_state_flags_change_callback(self, _nm_act_con, _state, action):
        """Re-evaluate the activation after a state-flags change."""
        if self._ctx.is_cancelled():
            self._activation_clean_up()
            return
        self._activation_progress_check(action)

    def _ac_state_change_callback(self, _nm_act_con, _state, _reason, action):
        """Re-evaluate the activation after an active connection change."""
        if self._ctx.is_cancelled():
            self._activation_clean_up()
            return
        self._activation_progress_check(action)

    def _activation_progress_check(self, action):
        """Finish or fail ``action`` according to the current NM state.

        The device and the active connection are refreshed first, because
        NetworkManager may have replaced either object since the signal
        handlers were connected.  The action stays pending while the
        activation is still in progress.
        """
        if self._ctx.is_cancelled():
            self._activation_clean_up()
            return
        # The device may still be unknown: _active_connection_callback()
        # starts watching the active connection even when no NM.Device was
        # found.  Fall back to the interface name of the profile instead of
        # dereferencing None.
        devname = self._nm_dev.get_iface() if self._nm_dev else self.devname
        cur_nm_dev = self._ctx.get_nm_dev(devname)
        if cur_nm_dev and cur_nm_dev != self._nm_dev:
            logging.debug("The NM.Device of profile %s changed", devname)
            self._remove_dev_handlers()
            self._nm_dev = cur_nm_dev
            self.wait_dev_activation(action)

        if not self._nm_dev:
            # Without a device is_activated() is false and _is_activating()
            # is true, so there is nothing to conclude yet: keep waiting for
            # the next signal.
            return

        cur_nm_ac = get_device_active_connection(self.nmdevice)
        if cur_nm_ac and cur_nm_ac != self._nm_ac:
            logging.debug(
                "Active connection of device %s has been replaced",
                self.devname,
            )
            self._remove_ac_handlers()
            self._nm_ac = cur_nm_ac
            self._wait_ac_activation(action)
        if is_activated(self._nm_ac, self._nm_dev):
            logging.debug(
                "Connection activation succeeded: dev=%s, con-state=%s, "
                "dev-state=%s, state-flags=%s",
                devname,
                self._nm_ac.get_state(),
                self._nm_dev.get_state(),
                self._nm_ac.get_state_flags(),
            )
            self._activation_clean_up()
            self._ctx.finish_async(action)
        elif (
            not self._is_activating()
            and self._is_sriov_parameter_not_supported_by_driver()
        ):
            reason = (
                f"The device={self.devname} does not support one or "
                "more of the SR-IOV parameters set."
            )
            self._activation_clean_up()
            self._ctx.fail(
                NmstateValueError(f"{action} failed: reason={reason}")
            )
        elif not self._is_activating():
            reason = f"{self._nm_ac.get_state_reason()}"
            if self.nmdevice:
                reason += f" {self.nmdevice.get_state_reason()}"
            self._activation_clean_up()
            self._ctx.fail(
                NmstateLibnmError(f"{action} failed: reason={reason}")
            )

    def _is_sriov_parameter_not_supported_by_driver(self):
        """Tell whether the device reports ``SRIOV_CONFIGURATION_FAILED``.

        The result is falsy (not necessarily ``False``) without a device.
        """
        return (
            self.nmdevice
            and self.nmdevice.props.state_reason
            == NM.DeviceStateReason.SRIOV_CONFIGURATION_FAILED
        )

    def _activation_clean_up(self):
        """Disconnect every signal handler connected for the activation."""
        self._remove_ac_handlers()
        self._remove_dev_handlers()

    def _is_activating(self):
        """Tell whether the activation should still be waited for.

        True when the active connection or the device is unknown, when the
        device state reason is ``NEW_ACTIVATION``, or when the active
        connection is ``ACTIVATING`` without counting as activated yet.
        """
        if not self._nm_ac or not self._nm_dev:
            return True
        if (
            self._nm_dev.get_state_reason()
            == NM.DeviceStateReason.NEW_ACTIVATION
        ):
            return True

        return (
            self._nm_ac.get_state() == NM.ActiveConnectionState.ACTIVATING
        ) and not is_activated(self._nm_ac, self._nm_dev)

    def _remove_dev_handlers(self):
        """Disconnect and forget the handlers connected on the device."""
        for handler_id in self._dev_handlers:
            self._nm_dev.handler_disconnect(handler_id)
        self._dev_handlers = set()

    def _remove_ac_handlers(self):
        """Disconnect and forget the handlers on the active connection."""
        for handler_id in self._ac_handlers:
            self._nm_ac.handler_disconnect(handler_id)
        self._ac_handlers = set()

    def _add_connection2_callback(self, src_object, result, user_data):
        """Finish or fail the "add profile" action started by ``add()``."""
        if self._ctx.is_cancelled():
            return
        action = user_data
        try:
            # Only the first element of the result is used: the profile.
            profile = src_object.add_connection2_finish(result)[0]
        except Exception as e:
            self._ctx.fail(
                NmstateLibnmError(f"{action} failed with error: {e}")
            )
            return

        if profile is None:
            self._ctx.fail(
                NmstateLibnmError(
                    f"{action} failed with error: 'None returned from "
                    "add_connection2_finish()"
                )
            )
        else:
            self._ctx.finish_async(action)

    def _update2_callback(self, src_object, result, user_data):
        """Finish or fail the "update profile" action from ``update()``."""
        if self._ctx.is_cancelled():
            return
        action = user_data
        try:
            ret = src_object.update2_finish(result)
        except Exception as e:
            self._ctx.fail(
                NmstateLibnmError(f"{action} failed with error={e}")
            )
            return
        if ret is None:
            self._ctx.fail(
                NmstateLibnmError(
                    f"{action} failed with error='None returned from "
                    "update2_finish()'"
                )
            )
        else:
            self._ctx.finish_async(action)

    def _delete_connection_callback(self, src_object, result, user_data):
        """Finish or fail the "delete profile" action from ``delete()``."""
        if self._ctx.is_cancelled():
            return
        action = user_data
        try:
            success = src_object.delete_finish(result)
        except Exception as e:
            self._ctx.fail(NmstateLibnmError(f"{action} failed: error={e}"))
            return

        if success:
            self._ctx.finish_async(action)
        else:
            self._ctx.fail(
                NmstateLibnmError(
                    f"{action} failed: "
                    "error='None returned from delete_finish()'"
                )
            )

    def _reset_profile(self):
        """Forget the wrapped profile so that a new one can be assigned."""
        self._con_profile = None
Packit b9ca78
Packit b9ca78
Packit b9ca78
class ConnectionSetting:
    """Thin holder around an ``NM.SettingConnection`` object."""

    def __init__(self, con_setting=None):
        """Wrap ``con_setting``; it may be filled in later."""
        self._setting = con_setting

    @property
    def setting(self):
        """The wrapped setting, or ``None`` when none was created yet."""
        return self._setting

    def create(self, con_name, iface_name, iface_type):
        """Replace the wrapped setting with a brand new one.

        The new setting gets a freshly generated UUID, has autoconnect
        enabled and autoconnect-slaves set to ``YES``.
        """
        fresh = NM.SettingConnection.new()
        props = fresh.props
        props.id = con_name
        props.interface_name = iface_name
        props.uuid = str(uuid.uuid4())
        props.type = iface_type
        props.autoconnect = True
        props.autoconnect_slaves = NM.SettingConnectionAutoconnectSlaves.YES

        self._setting = fresh

    def import_by_profile(self, con_profile):
        """Replace the wrapped setting with a copy taken from a profile.

        Id, interface name, UUID, type and autoconnect-slaves are copied
        from the connection setting of ``con_profile``; autoconnect is
        always set to true.
        """
        source = con_profile.profile.get_setting_connection()
        copy = NM.SettingConnection.new()
        for prop_name in ("id", "interface_name", "uuid", "type"):
            setattr(copy.props, prop_name, getattr(source.props, prop_name))
        copy.props.autoconnect = True
        copy.props.autoconnect_slaves = source.props.autoconnect_slaves

        self._setting = copy

    def set_master(self, master, slave_type):
        """Record master and slave type; ignored when master is ``None``."""
        if master is None:
            return
        props = self._setting.props
        props.master = master
        props.slave_type = slave_type

    def set_profile_name(self, con_name):
        """Rename the connection by overwriting its ``id`` property."""
        self._setting.props.id = con_name
Packit b9ca78
Packit b9ca78
Packit b9ca78
def get_device_active_connection(nm_device):
    """Return the active connection of ``nm_device``.

    ``None`` is returned when no device is given.
    """
    if not nm_device:
        return None
    return nm_device.get_active_connection()
Packit b9ca78
Packit b9ca78
Packit b9ca78
def delete_iface_inactive_connections(context, ifname):
    """Request deletion of every profile bound to interface ``ifname``.

    NOTE(review): despite the name, no check for "inactive" is done here;
    every profile returned by list_connections_by_ifname() is deleted.
    """
    matching_profiles = list_connections_by_ifname(context, ifname)
    for con_profile in matching_profiles:
        con_profile.delete()
Packit b9ca78
Packit b9ca78
Packit b9ca78
def list_connections_by_ifname(context, ifname):
    """Return the profiles known to the client for interface ``ifname``.

    Each matching NM connection is wrapped in a ``ConnectionProfile``
    sharing the given context.
    """
    wrapped = []
    for nm_profile in context.client.get_connections():
        if nm_profile.get_interface_name() != ifname:
            continue
        wrapped.append(ConnectionProfile(context, profile=nm_profile))
    return wrapped
Packit b9ca78
Packit b9ca78
Packit b9ca78
def is_activated(nm_ac, nm_dev):
    """Tell whether an active connection can be treated as activated.

    An ``ACTIVATED`` connection always qualifies.  An ``ACTIVATING`` one
    qualifies as soon as its device is between ``IP_CONFIG`` and
    ``ACTIVATED`` (inclusive), provided one of these holds:

      * it is a master device;
      * IPv4 is dynamic and the ``IP6_READY`` flag is set;
      * IPv6 is dynamic and the ``IP4_READY`` flag is set;
      * both IPv4 and IPv6 are dynamic.

    ``False`` is returned when either argument is missing.
    """
    if not nm_ac or not nm_dev:
        return False

    ac_state = nm_ac.get_state()
    if ac_state == NM.ActiveConnectionState.ACTIVATED:
        return True
    if ac_state != NM.ActiveConnectionState.ACTIVATING:
        return False

    flags = nm_ac.get_state_flags()
    flag_names = NM.ActivationStateFlags
    dynamic_v4 = ipv4.is_dynamic(nm_ac)
    dynamic_v6 = ipv6.is_dynamic(nm_ac)

    is_master = flags & flag_names.IS_MASTER
    v4_pending_v6_ready = dynamic_v4 and flags & flag_names.IP6_READY
    v6_pending_v4_ready = dynamic_v6 and flags & flag_names.IP4_READY
    both_dynamic = dynamic_v4 and dynamic_v6

    if not (
        is_master
        or v4_pending_v6_ready
        or v6_pending_v4_ready
        or both_dynamic
    ):
        return False

    lowest = NM.DeviceState.IP_CONFIG
    dev_state = nm_dev.get_state()
    return lowest <= dev_state <= NM.DeviceState.ACTIVATED