Blob Blame History Raw
#
# Copyright (c) 2018-2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

import logging
from operator import itemgetter

from libnmstate.ifaces import ovs
from libnmstate.ifaces.bridge import BridgeIface
from libnmstate.ifaces.ovs import OvsPortIface
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceType
from libnmstate.schema import OVSBridge as OB
from libnmstate.schema import OVSInterface
from libnmstate.schema import OvsDB

from .common import NM


PORT_PROFILE_PREFIX = "ovs-port-"

CONTROLLER_TYPE_METADATA = "_controller_type"
CONTROLLER_METADATA = "_controller"
SETTING_OVS_EXTERNALIDS = "SettingOvsExternalIDs"
SETTING_OVS_EXTERNAL_IDS_SETTING_NAME = "ovs-external-ids"

NM_OVS_VLAN_MODE_MAP = {
    "trunk": OB.Port.Vlan.Mode.TRUNK,
    "access": OB.Port.Vlan.Mode.ACCESS,
    "native-tagged": OB.Port.Vlan.Mode.TRUNK,
    "native-untagged": OB.Port.Vlan.Mode.UNKNOWN,  # Not supported yet
    "dot1q-tunnel": OB.Port.Vlan.Mode.UNKNOWN,  # Not supported yet
}


class LacpValue:
    ACTIVE = "active"
    OFF = "off"


def has_ovs_capability(nm_client):
    return NM.Capability.OVS in nm_client.get_capabilities()


def create_bridge_setting(options_state):
    bridge_setting = NM.SettingOvsBridge.new()
    for option_name, option_value in options_state.items():
        if option_name == "fail-mode":
            if option_value:
                bridge_setting.props.fail_mode = option_value
        elif option_name == "mcast-snooping-enable":
            bridge_setting.props.mcast_snooping_enable = option_value
        elif option_name == "rstp":
            bridge_setting.props.rstp_enable = option_value
        elif option_name == "stp":
            bridge_setting.props.stp_enable = option_value

    return bridge_setting


def create_ovsdb_external_ids_setting(ovsdb_conf):
    if _is_nm_support_ovs_external_ids():
        nm_setting = getattr(NM, SETTING_OVS_EXTERNALIDS).new()
        for key, value in ovsdb_conf.get(OvsDB.EXTERNAL_IDS, {}).items():
            if not key.startswith("NM."):
                nm_setting.set_data(key, str(value))
        return nm_setting
    else:
        logging.warn(
            "Please upgrade NetworkManger to 1.30+ "
            "for the support OVS external ID modification"
        )
        return None


def create_port_setting(port_state):
    port_setting = NM.SettingOvsPort.new()

    lag_state = port_state.get(OB.Port.LINK_AGGREGATION_SUBTREE)
    if lag_state:
        mode = lag_state.get(OB.Port.LinkAggregation.MODE)
        if mode == OB.Port.LinkAggregation.Mode.LACP:
            port_setting.props.lacp = LacpValue.ACTIVE
        elif mode in (
            OB.Port.LinkAggregation.Mode.ACTIVE_BACKUP,
            OB.Port.LinkAggregation.Mode.BALANCE_SLB,
        ):
            port_setting.props.lacp = LacpValue.OFF
            port_setting.props.bond_mode = mode
        elif mode == OB.Port.LinkAggregation.Mode.BALANCE_TCP:
            port_setting.props.lacp = LacpValue.ACTIVE
            port_setting.props.bond_mode = mode

        down_delay = lag_state.get(OB.Port.LinkAggregation.Options.DOWN_DELAY)
        if down_delay:
            port_setting.props.bond_downdelay = down_delay
        up_delay = lag_state.get(OB.Port.LinkAggregation.Options.UP_DELAY)
        if up_delay:
            port_setting.props.bond_updelay = up_delay

    vlan_state = port_state.get(OB.Port.VLAN_SUBTREE, {})
    if OB.Port.Vlan.MODE in vlan_state:
        if vlan_state[OB.Port.Vlan.MODE] != OB.Port.Vlan.Mode.UNKNOWN:
            port_setting.props.vlan_mode = vlan_state[OB.Port.Vlan.MODE]
    if OB.Port.Vlan.TAG in vlan_state:
        port_setting.props.tag = vlan_state[OB.Port.Vlan.TAG]

    return port_setting


def create_interface_setting(patch_state):
    interface_setting = NM.SettingOvsInterface.new()
    settings = [interface_setting]

    if patch_state and patch_state.get(OVSInterface.Patch.PEER):
        interface_setting.props.type = "patch"
        settings.append(create_patch_setting(patch_state))
    else:
        interface_setting.props.type = "internal"

    return settings


def create_patch_setting(patch_state):
    patch_setting = NM.SettingOvsPatch.new()
    patch_setting.props.peer = patch_state[OVSInterface.Patch.PEER]

    return patch_setting


def get_ovs_bridge_info(nm_dev_ovs_br):
    iface_info = {OB.CONFIG_SUBTREE: {}}
    ports_info = _get_bridge_nmstate_ports_info(nm_dev_ovs_br)
    options = _get_bridge_options(nm_dev_ovs_br)

    if ports_info or options:
        iface_info[OB.CONFIG_SUBTREE] = {
            OB.PORT_SUBTREE: ports_info,
            OB.OPTIONS_SUBTREE: options,
        }
    return iface_info


def get_ovsdb_external_ids(nm_profile):
    iface_info = {}
    if _is_nm_support_ovs_external_ids():
        nm_setting = nm_profile.get_setting_by_name(
            SETTING_OVS_EXTERNAL_IDS_SETTING_NAME
        )
        if nm_setting:
            external_ids = {}
            for key in nm_setting.get_data_keys():
                external_ids[key] = nm_setting.get_data(key)
            iface_info[OvsDB.OVS_DB_SUBTREE] = {
                OvsDB.EXTERNAL_IDS: external_ids
            }
    return iface_info


def _get_bridge_nmstate_ports_info(nm_dev_ovs_br):
    ports_info = []
    for nm_dev_ovs_port in nm_dev_ovs_br.get_slaves():
        port_info = {}
        nm_dev_ovs_ifaces = nm_dev_ovs_port.get_slaves()
        if not nm_dev_ovs_ifaces:
            continue
        if len(nm_dev_ovs_ifaces) == 1:
            port_info[OB.Port.NAME] = nm_dev_ovs_ifaces[0].get_iface()
        else:
            port_info = _get_lag_nmstate_port_info(nm_dev_ovs_port)
        vlan_info = _get_vlan_info(nm_dev_ovs_port)
        if vlan_info:
            port_info[OB.Port.VLAN_SUBTREE] = vlan_info
        ports_info.append(port_info)
    return sorted(ports_info, key=itemgetter(OB.Port.NAME))


def get_interface_info(act_con):
    """
    Get OVS interface information from the NM profile.
    """
    info = {}
    if act_con:
        patch_setting = _get_patch_setting(act_con)
        if patch_setting:
            info[OVSInterface.PATCH_CONFIG_SUBTREE] = {
                OVSInterface.Patch.PEER: patch_setting.props.peer,
            }

    return info


def _get_patch_setting(act_con):
    """
    Get NM.SettingOvsPatch from NM.ActiveConnection.
    For any error, return None.
    """
    remote_con = act_con.get_connection()
    if remote_con:
        return remote_con.get_setting_ovs_patch()

    return None


def _get_lag_nmstate_port_info(nm_dev_ovs_port):
    OVS_LAG = OB.Port.LinkAggregation

    lag = {
        OVS_LAG.PORT_SUBTREE: sorted(
            [
                {OVS_LAG.Port.NAME: nm_dev_ovs_iface.get_iface()}
                for nm_dev_ovs_iface in nm_dev_ovs_port.get_slaves()
            ],
            key=itemgetter(OVS_LAG.Port.NAME),
        ),
    }
    mode = _get_lag_mode(nm_dev_ovs_port)
    if mode:
        lag[OVS_LAG.MODE] = mode
    return {
        OB.Port.NAME: nm_dev_ovs_port.get_iface(),
        OB.Port.LINK_AGGREGATION_SUBTREE: lag,
    }


def _get_lag_mode(nm_dev_ovs_port):
    """
    TODO: Use applied profile instead of on-disk one.
    """
    mode = None
    nm_setting = _get_nm_setting_ovs_port(nm_dev_ovs_port)
    if nm_setting:
        lacp = nm_setting.props.lacp
        mode = nm_setting.props.bond_mode
        if not mode:
            if lacp == LacpValue.ACTIVE:
                mode = OB.Port.LinkAggregation.Mode.LACP
            else:
                mode = OB.Port.LinkAggregation.Mode.ACTIVE_BACKUP
    return mode


def _get_vlan_info(nm_dev_ovs_port):
    nm_setting = _get_nm_setting_ovs_port(nm_dev_ovs_port)
    if nm_setting:
        vlan_mode = nm_setting.props.vlan_mode
        if vlan_mode:
            nmstate_vlan_mode = NM_OVS_VLAN_MODE_MAP.get(
                vlan_mode, OB.Port.Vlan.Mode.UNKNOWN
            )
            if nmstate_vlan_mode == OB.Port.Vlan.Mode.UNKNOWN:
                logging.warning(
                    f"OVS Port VLAN mode '{vlan_mode}' is not supported yet"
                )
                return {OB.Port.Vlan.MODE: OB.Port.Vlan.Mode.UNKNOWN}
            else:
                return {
                    OB.Port.Vlan.MODE: nmstate_vlan_mode,
                    OB.Port.Vlan.TAG: nm_setting.get_tag(),
                }
    return {}


def _get_nm_setting_ovs_port(nm_dev_ovs_port):
    nm_ac = nm_dev_ovs_port.get_active_connection()
    if nm_ac:
        nm_profile = nm_ac.props.connection
        if nm_profile:
            return nm_profile.get_setting(NM.SettingOvsPort)
    return None


def _get_bridge_options(bridge_device):
    bridge_options = {}
    bridge_profile = None
    act_conn = bridge_device.get_active_connection()
    if act_conn:
        bridge_profile = act_conn.props.connection

    if bridge_profile:
        bridge_setting = bridge_profile.get_setting(NM.SettingOvsBridge)
        bridge_options["stp"] = bridge_setting.props.stp_enable
        bridge_options["rstp"] = bridge_setting.props.rstp_enable
        bridge_options["fail-mode"] = bridge_setting.props.fail_mode or ""
        bridge_options[
            "mcast-snooping-enable"
        ] = bridge_setting.props.mcast_snooping_enable

    return bridge_options


def create_iface_for_nm_ovs_port(iface):
    iface_name = iface.name
    iface_info = iface.to_dict()
    port_options = iface_info.get(BridgeIface.BRPORT_OPTIONS_METADATA)
    if ovs.is_ovs_lag_port(port_options):
        port_name = port_options[OB.Port.NAME]
    else:
        port_name = PORT_PROFILE_PREFIX + iface_name
    return OvsPortIface(
        {
            Interface.NAME: port_name,
            Interface.TYPE: InterfaceType.OVS_PORT,
            Interface.STATE: iface.state,
            OB.OPTIONS_SUBTREE: port_options,
            CONTROLLER_METADATA: iface_info[CONTROLLER_METADATA],
            CONTROLLER_TYPE_METADATA: iface_info[CONTROLLER_TYPE_METADATA],
        }
    )


def _is_nm_support_ovs_external_ids():
    return hasattr(NM, SETTING_OVS_EXTERNALIDS)