Blob Blame History Raw
#
# Copyright (c) 2018-2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

import logging
from operator import itemgetter

from libnmstate.schema import OVSBridge as OB
from libnmstate.schema import OVSInterface

from . import connection
from .common import NM


PORT_PROFILE_PREFIX = "ovs-port-"

NM_OVS_VLAN_MODE_MAP = {
    "trunk": OB.Port.Vlan.Mode.TRUNK,
    "access": OB.Port.Vlan.Mode.ACCESS,
    "native-tagged": OB.Port.Vlan.Mode.TRUNK,
    "native-untagged": OB.Port.Vlan.Mode.UNKNOWN,  # Not supported yet
    "dot1q-tunnel": OB.Port.Vlan.Mode.UNKNOWN,  # Not supported yet
}


class LacpValue:
    ACTIVE = "active"
    OFF = "off"


def has_ovs_capability(nm_client):
    return NM.Capability.OVS in nm_client.get_capabilities()


def create_bridge_setting(options_state):
    bridge_setting = NM.SettingOvsBridge.new()
    for option_name, option_value in options_state.items():
        if option_name == "fail-mode":
            if option_value:
                bridge_setting.props.fail_mode = option_value
        elif option_name == "mcast-snooping-enable":
            bridge_setting.props.mcast_snooping_enable = option_value
        elif option_name == "rstp":
            bridge_setting.props.rstp_enable = option_value
        elif option_name == "stp":
            bridge_setting.props.stp_enable = option_value

    return bridge_setting


def create_port_setting(port_state):
    port_setting = NM.SettingOvsPort.new()

    lag_state = port_state.get(OB.Port.LINK_AGGREGATION_SUBTREE)
    if lag_state:
        mode = lag_state.get(OB.Port.LinkAggregation.MODE)
        if mode == OB.Port.LinkAggregation.Mode.LACP:
            port_setting.props.lacp = LacpValue.ACTIVE
        elif mode in (
            OB.Port.LinkAggregation.Mode.ACTIVE_BACKUP,
            OB.Port.LinkAggregation.Mode.BALANCE_SLB,
        ):
            port_setting.props.lacp = LacpValue.OFF
            port_setting.props.bond_mode = mode
        elif mode == OB.Port.LinkAggregation.Mode.BALANCE_TCP:
            port_setting.props.lacp = LacpValue.ACTIVE
            port_setting.props.bond_mode = mode

        down_delay = lag_state.get(OB.Port.LinkAggregation.Options.DOWN_DELAY)
        if down_delay:
            port_setting.props.bond_downdelay = down_delay
        up_delay = lag_state.get(OB.Port.LinkAggregation.Options.UP_DELAY)
        if up_delay:
            port_setting.props.bond_updelay = up_delay

    vlan_state = port_state.get(OB.Port.VLAN_SUBTREE, {})
    if OB.Port.Vlan.MODE in vlan_state:
        if vlan_state[OB.Port.Vlan.MODE] != OB.Port.Vlan.Mode.UNKNOWN:
            port_setting.props.vlan_mode = vlan_state[OB.Port.Vlan.MODE]
    if OB.Port.Vlan.TAG in vlan_state:
        port_setting.props.tag = vlan_state[OB.Port.Vlan.TAG]

    return port_setting


def create_interface_setting(patch_state):
    interface_setting = NM.SettingOvsInterface.new()
    settings = [interface_setting]

    if patch_state and patch_state.get(OVSInterface.Patch.PEER):
        interface_setting.props.type = "patch"
        settings.append(create_patch_setting(patch_state))
    else:
        interface_setting.props.type = "internal"

    return settings


def create_patch_setting(patch_state):
    patch_setting = NM.SettingOvsPatch.new()
    patch_setting.props.peer = patch_state[OVSInterface.Patch.PEER]

    return patch_setting


def is_ovs_bridge_type_id(type_id):
    return type_id == NM.DeviceType.OVS_BRIDGE


def is_ovs_port_type_id(type_id):
    return type_id == NM.DeviceType.OVS_PORT


def is_ovs_interface_type_id(type_id):
    return type_id == NM.DeviceType.OVS_INTERFACE


def get_port_by_slave(nmdev):
    active_con = connection.get_device_active_connection(nmdev)
    if active_con:
        master = active_con.get_master()
        if master and is_ovs_port_type_id(master.get_device_type()):
            return master
    return None


def get_ovs_info(context, bridge_device, devices_info):
    port_profiles = _get_slave_profiles(bridge_device, devices_info)
    ports = _get_bridge_ports_info(context, port_profiles, devices_info)
    options = _get_bridge_options(context, bridge_device)

    if ports or options:
        return {"port": ports, "options": options}
    else:
        return {}


def get_interface_info(act_con):
    """
    Get OVS interface information from the NM profile.
    """
    info = {}
    if act_con:
        patch_setting = _get_patch_setting(act_con)
        if patch_setting:
            info[OVSInterface.PATCH_CONFIG_SUBTREE] = {
                OVSInterface.Patch.PEER: patch_setting.props.peer,
            }

    return info


def _get_patch_setting(act_con):
    """
    Get NM.SettingOvsPatch from NM.ActiveConnection.
    For any error, return None.
    """
    remote_con = act_con.get_connection()
    if remote_con:
        return remote_con.get_setting_ovs_patch()

    return None


def get_slaves(nm_device):
    return nm_device.get_slaves()


def _get_bridge_ports_info(context, port_profiles, devices_info):
    ports_info = []
    for p in port_profiles:
        port_info = _get_bridge_port_info(context, p, devices_info)
        if port_info:
            ports_info.append(port_info)
    ports_info.sort(key=itemgetter(OB.Port.NAME))
    return ports_info


def _get_bridge_port_info(context, port_profile, devices_info):
    """
    Report port information.
    Note: The current implementation supports only system OVS ports and
    access vlan-mode (trunks are not supported).
    """
    port_info = {}

    port_setting = port_profile.get_setting(NM.SettingOvsPort)
    vlan_mode = port_setting.props.vlan_mode

    port_name = port_profile.get_interface_name()
    port_device = context.get_nm_dev(port_name)
    port_slave_profiles = _get_slave_profiles(port_device, devices_info)
    port_slave_names = [c.get_interface_name() for c in port_slave_profiles]

    if port_slave_names:
        number_of_interfaces = len(port_slave_names)
        if number_of_interfaces == 1:
            port_info[OB.Port.NAME] = port_slave_names[0]
        else:
            port_lag_info = _get_lag_info(
                port_name, port_setting, port_slave_names
            )
            port_info.update(port_lag_info)

        if vlan_mode:
            nmstate_vlan_mode = NM_OVS_VLAN_MODE_MAP.get(
                vlan_mode, OB.Port.Vlan.Mode.UNKNOWN
            )
            if nmstate_vlan_mode == OB.Port.Vlan.Mode.UNKNOWN:
                logging.warning(
                    f"OVS Port VLAN mode '{vlan_mode}' is not supported yet"
                )
            port_info[OB.Port.VLAN_SUBTREE] = {
                OB.Port.Vlan.MODE: nmstate_vlan_mode,
                OB.Port.Vlan.TAG: port_setting.get_tag(),
            }
    return port_info


def _get_lag_info(port_name, port_setting, port_slave_names):
    port_info = {}

    lacp = port_setting.props.lacp
    mode = port_setting.props.bond_mode
    if not mode:
        if lacp == LacpValue.ACTIVE:
            mode = OB.Port.LinkAggregation.Mode.LACP
        else:
            mode = OB.Port.LinkAggregation.Mode.ACTIVE_BACKUP
    port_info[OB.Port.NAME] = port_name
    port_info[OB.Port.LINK_AGGREGATION_SUBTREE] = {
        OB.Port.LinkAggregation.MODE: mode,
        OB.Port.LinkAggregation.SLAVES_SUBTREE: sorted(
            [
                {OB.Port.LinkAggregation.Slave.NAME: iface_name}
                for iface_name in port_slave_names
            ],
            key=itemgetter(OB.Port.LinkAggregation.Slave.NAME),
        ),
    }
    return port_info


def _get_bridge_options(context, bridge_device):
    bridge_options = {}
    con = connection.ConnectionProfile(context)
    con.import_by_device(bridge_device)
    if con.profile:
        bridge_setting = con.profile.get_setting(NM.SettingOvsBridge)
        bridge_options["stp"] = bridge_setting.props.stp_enable
        bridge_options["rstp"] = bridge_setting.props.rstp_enable
        bridge_options["fail-mode"] = bridge_setting.props.fail_mode or ""
        bridge_options[
            "mcast-snooping-enable"
        ] = bridge_setting.props.mcast_snooping_enable

    return bridge_options


def _get_slave_profiles(master_device, devices_info):
    slave_profiles = []
    for dev, _ in devices_info:
        active_con = connection.get_device_active_connection(dev)
        if active_con:
            master = active_con.props.master
            if master and (master.get_iface() == master_device.get_iface()):
                slave_profiles.append(active_con.props.connection)
    return slave_profiles