Blame libnmstate/nm/lldp.py

Packit b9ca78
#
Packit b9ca78
# Copyright (c) 2020 Red Hat, Inc.
Packit b9ca78
#
Packit b9ca78
# This file is part of nmstate
Packit b9ca78
#
Packit b9ca78
# This program is free software: you can redistribute it and/or modify
Packit b9ca78
# it under the terms of the GNU Lesser General Public License as published by
Packit b9ca78
# the Free Software Foundation, either version 2.1 of the License, or
Packit b9ca78
# (at your option) any later version.
Packit b9ca78
#
Packit b9ca78
# This program is distributed in the hope that it will be useful,
Packit b9ca78
# but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b9ca78
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b9ca78
# GNU Lesser General Public License for more details.
Packit b9ca78
#
Packit b9ca78
# You should have received a copy of the GNU Lesser General Public License
Packit b9ca78
# along with this program. If not, see <https://www.gnu.org/licenses/>.
Packit b9ca78
#
Packit b9ca78
Packit b9ca78
from libnmstate.schema import LLDP
Packit b9ca78
Packit b9ca78
from .common import NM
Packit b9ca78
Packit b9ca78
Packit b9ca78
# Keys of the per-entry dictionaries NetworkManager reports for the LLDP
# VLAN, MAC/PHY, PPVID and management-address attributes read below.
NM_VLAN_ID_KEY = "vid"
Packit b9ca78
NM_VLAN_NAME_KEY = "name"
Packit b9ca78
NM_MACPHY_AUTONEG_KEY = "autoneg"
Packit b9ca78
NM_MACPHY_PMD_AUTONEG_KEY = "pmd-autoneg-cap"
Packit b9ca78
NM_MACPHY_MAU_TYPE_KEY = "operational-mau-type"
Packit b9ca78
NM_PPVLAN_ID_KEY = "ppvid"
Packit b9ca78
NM_PPVLAN_FLAGS_KEY = "flags"
Packit b9ca78
NM_MANAGEMENT_ADDR_KEY = "address"
Packit b9ca78
NM_MANAGEMENT_ADDR_TYPE_KEY = "address-subtype"
Packit b9ca78
NM_MANAGEMENT_ADDR_IFACE_NUMBER_KEY = "interface-number"
Packit b9ca78
NM_MANAGEMENT_ADDR_IFACE_NUMBER_TYPE_KEY = "interface-number-subtype"
Packit b9ca78
# Management address subtype codes recognised by
# _decode_management_address_type().
NM_MANAGEMENT_ADDR_TYPE_IPV4 = 1
Packit b9ca78
NM_MANAGEMENT_ADDR_TYPE_MAC = 6
Packit b9ca78
# NOTE(review): presumably interface-number subtype codes; they are not
# referenced by the functions in this module.
NM_INTERFACE_TYPE_IFINDEX = 2
Packit b9ca78
NM_INTERFACE_TYPE_SYSTEM_PORT = 3
Packit b9ca78
# Value of the connection "lldp" property that get_info() reports as
# disabled; see the _get_lldp_status() docstring for the rationale.
NM_LLDP_STATUS_DEFAULT = -1
Packit b9ca78
# Descriptions returned when a type code is outside the lookup tables below.
CHASSIS_TYPE_UNKNOWN = "unknown"
Packit b9ca78
PORT_TYPE_UNKNOWN = "unknown"
Packit b9ca78
Packit b9ca78
# Values reported under LLDP.Neighbors.TLV_TYPE for each kind of entry.
CHASSIS_ID_TLV = 1
Packit b9ca78
PORT_TLV = 2
Packit b9ca78
SYSTEM_NAME_TLV = 5
Packit b9ca78
SYSTEM_DESCRIPTION_TLV = 6
Packit b9ca78
SYSTEM_CAPABILITIES_TLV = 7
Packit b9ca78
MANAGEMENT_ADDRESSES_TLV = 8
Packit b9ca78
ORGANIZATION_SPECIFIC_TLV = 127
Packit b9ca78
Packit b9ca78
# Organization code and TLV subtypes reported for the VLAN and PPVID entries.
IEEE = "00:80:c2"
Packit b9ca78
PORT_VLAN_SUBTYPE_TLV = 2
Packit b9ca78
VLAN_SUBTYPE_TLV = 3
Packit b9ca78
Packit b9ca78
# Organization code and TLV subtypes reported for the MAC/PHY configuration
# and maximum frame size entries.
IEEE_802_3 = "00:12:0f"
Packit b9ca78
MAC_PHY_SUBTYPE_TLV = 1
Packit b9ca78
MFS_SUBTYPE_TLV = 4
Packit b9ca78
Packit b9ca78
# System capabilities bit mask -> capability name; see _decode_sys_caps().
LLDP_CAP_NAMES = {
Packit b9ca78
    0b1: "Other",
Packit b9ca78
    0b10: "Repeater",
Packit b9ca78
    0b100: "MAC Bridge component",
Packit b9ca78
    0b1000: "802.11 Access Point (AP)",
Packit b9ca78
    0b1_0000: "Router",
Packit b9ca78
    0b10_0000: "Telephone",
Packit b9ca78
    0b100_0000: "DOCSIS cable device",
Packit b9ca78
    0b1000_0000: "Station Only",
Packit b9ca78
    0b1_0000_0000: "C-VLAN component",
Packit b9ca78
    0b10_0000_0000: "S-VLAN component",
Packit b9ca78
    0b100_0000_0000: "Two-port MAC Relay component",
Packit b9ca78
}
Packit b9ca78
Packit b9ca78
Packit b9ca78
# Chassis ID type code (used as list index) -> description; see
# _decode_chassis_type().
LLDP_CHASSIS_TYPE_TO_NMSTATE = [
Packit b9ca78
    "Reserved",
Packit b9ca78
    "Chassis component",
Packit b9ca78
    "Interface alias",
Packit b9ca78
    "Port component",
Packit b9ca78
    "MAC address",
Packit b9ca78
    "Network address",
Packit b9ca78
    "Interface name",
Packit b9ca78
    "Locally assigned",
Packit b9ca78
]
Packit b9ca78
Packit b9ca78
Packit b9ca78
# Port ID type code (used as list index) -> description; see
# _decode_port_type().
LLDP_PORT_TYPE_TO_NMSTATE = [
Packit b9ca78
    "Reserved",
Packit b9ca78
    "Interface alias",
Packit b9ca78
    "Port component",
Packit b9ca78
    "MAC address",
Packit b9ca78
    "Network address",
Packit b9ca78
    "Interface name",
Packit b9ca78
    "Agent circuit ID",
Packit b9ca78
    "Locally assigned",
Packit b9ca78
]
Packit b9ca78
Packit b9ca78
Packit b9ca78
def apply_lldp_setting(con_setting, iface_desired_state):
    """
    Copy the desired LLDP enabled flag into the connection setting.

    The flag is read from iface_desired_state[LLDP.CONFIG_SUBTREE]
    [LLDP.ENABLED]. When it is absent (or None) the setting is left untouched;
    otherwise it is converted with int() and stored in
    con_setting.setting.props.lldp.
    """
    lldp_config = iface_desired_state.get(LLDP.CONFIG_SUBTREE, {})
    enabled = lldp_config.get(LLDP.ENABLED, None)
    if enabled is None:
        return
    con_setting.setting.props.lldp = int(enabled)
Packit b9ca78
Packit b9ca78
Packit Service dd31ba
def get_info(nm_client, nmdev):
    """
    Report the LLDP state of a device.

    Returns {LLDP.CONFIG_SUBTREE: info} where info[LLDP.ENABLED] is False when
    the profile status is missing, falsy or NM_LLDP_STATUS_DEFAULT, and True
    otherwise. When enabled, the neighbors found on the device (if any) are
    added to info as well.
    """
    status = _get_lldp_status(nm_client, nmdev)
    enabled = not (status == NM_LLDP_STATUS_DEFAULT or not status)

    info = {LLDP.ENABLED: enabled}
    if enabled:
        _get_neighbors_info(info, nmdev)

    return {LLDP.CONFIG_SUBTREE: info}
Packit b9ca78
Packit b9ca78
Packit Service dd31ba
def _get_lldp_status(nm_client, nmdev):
Packit b9ca78
    """
Packit b9ca78
    Default means NM global config file value which is by default disabled.
Packit b9ca78
    According to NM folks, there is no way from libnm to know if lldp is
Packit b9ca78
    enable or not with libnm if the value in the profile is default.
Packit b9ca78
    Therefore, the best option is to force the users to enable it explicitly.
Packit b9ca78
    This is going to be solved by a property in the NM.Device object to know if
Packit b9ca78
    the device is listening on LLDP.
Packit b9ca78
Packit b9ca78
    Ref: https://bugzilla.redhat.com/1832273
Packit b9ca78
    """
Packit Service dd31ba
    lldp_status = None
Packit Service dd31ba
    lldp_profile = None
Packit Service dd31ba
    act_conn = nmdev.get_active_connection()
Packit Service dd31ba
    if act_conn:
Packit Service dd31ba
        lldp_profile = act_conn.props.connection
Packit Service dd31ba
    if lldp_profile:
Packit Service dd31ba
        con_setting = lldp_profile.get_setting_connection()
Packit b9ca78
        if con_setting:
Packit Service dd31ba
            lldp_status = con_setting.get_lldp()
Packit b9ca78
Packit b9ca78
    return lldp_status
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _get_neighbors_info(info, nmdev):
    """
    Collect the LLDP neighbors of nmdev into info[LLDP.NEIGHBORS_SUBTREE].

    Every neighbor is described by a list of TLV dictionaries filled in by the
    collectors below, in this order. The key is only set when the device
    reports at least one neighbor.
    """
    collectors = (
        _add_neighbor_system_info,
        _add_neighbor_chassis_info,
        _add_neighbor_port_info,
        _add_neighbor_vlans_info,
        _add_neighbor_macphy_info,
        _add_neighbor_port_vlans_info,
        _add_neighbor_management_addresses,
        _add_max_frame_size,
    )

    neighbors_info = []
    for neighbor in nmdev.get_lldp_neighbors():
        tlvs = []
        for collect in collectors:
            collect(neighbor, tlvs)
        neighbors_info.append(tlvs)

    if neighbors_info:
        info[LLDP.NEIGHBORS_SUBTREE] = neighbors_info
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_system_info(neighbor, info):
    """
    Append the system name, system description and system capabilities
    entries of the neighbor to info, each only when the attribute is present
    and truthy.
    """
    name_attr = neighbor.get_attr_value(NM.LLDP_ATTR_SYSTEM_NAME)
    if name_attr:
        info.append(
            {
                LLDP.Neighbors.TLV_TYPE: SYSTEM_NAME_TLV,
                NM.LLDP_ATTR_SYSTEM_NAME: name_attr.get_string(),
            }
        )

    description_attr = neighbor.get_attr_value(
        NM.LLDP_ATTR_SYSTEM_DESCRIPTION
    )
    if description_attr:
        # Trailing whitespace is dropped from the reported description.
        description = description_attr.get_string().rstrip()
        info.append(
            {
                LLDP.Neighbors.TLV_TYPE: SYSTEM_DESCRIPTION_TLV,
                NM.LLDP_ATTR_SYSTEM_DESCRIPTION: description,
            }
        )

    capabilities_attr = neighbor.get_attr_value(
        NM.LLDP_ATTR_SYSTEM_CAPABILITIES
    )
    if capabilities_attr:
        capabilities = _decode_sys_caps(capabilities_attr.get_uint32())
        info.append(
            {
                LLDP.Neighbors.TLV_TYPE: SYSTEM_CAPABILITIES_TLV,
                NM.LLDP_ATTR_SYSTEM_CAPABILITIES: capabilities,
            }
        )
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _decode_sys_caps(code):
    """
    Return the names of the capabilities whose bit is set in code, in the
    order LLDP_CAP_NAMES lists them.
    """
    return [name for bit, name in LLDP_CAP_NAMES.items() if code & bit]
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_chassis_info(neighbor, info):
    """
    Append the chassis ID entry of the neighbor to info.

    The entry carries the chassis ID and/or the chassis ID type together with
    its description; nothing is appended when neither attribute is available.
    """
    details = {}

    id_attr = neighbor.get_attr_value(NM.LLDP_ATTR_CHASSIS_ID)
    if id_attr:
        details[NM.LLDP_ATTR_CHASSIS_ID] = id_attr.get_string()

    type_attr = neighbor.get_attr_value(NM.LLDP_ATTR_CHASSIS_ID_TYPE)
    if type_attr:
        type_code = type_attr.get_uint32()
        details[NM.LLDP_ATTR_CHASSIS_ID_TYPE] = type_code
        details[LLDP.Neighbors.DESCRIPTION] = _decode_chassis_type(type_code)

    if not details:
        return

    # The TLV type goes first so the entry keeps its usual key order.
    info.append({LLDP.Neighbors.TLV_TYPE: CHASSIS_ID_TLV, **details})
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _decode_chassis_type(code):
    """
    Translate a chassis ID type code into its description, falling back to
    CHASSIS_TYPE_UNKNOWN for codes beyond LLDP_CHASSIS_TYPE_TO_NMSTATE.
    """
    try:
        description = LLDP_CHASSIS_TYPE_TO_NMSTATE[code]
    except IndexError:
        description = CHASSIS_TYPE_UNKNOWN
    return description
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_port_info(neighbor, info):
    """
    Append the port ID entry of the neighbor to info.

    The entry carries the port ID and/or the port ID type together with its
    description; nothing is appended when neither attribute is available.
    """
    details = {}

    id_attr = neighbor.get_attr_value(NM.LLDP_ATTR_PORT_ID)
    if id_attr:
        details[NM.LLDP_ATTR_PORT_ID] = id_attr.get_string()

    type_attr = neighbor.get_attr_value(NM.LLDP_ATTR_PORT_ID_TYPE)
    if type_attr:
        type_code = type_attr.get_uint32()
        details[NM.LLDP_ATTR_PORT_ID_TYPE] = type_code
        details[LLDP.Neighbors.DESCRIPTION] = _decode_port_type(type_code)

    if not details:
        return

    # The TLV type goes first so the entry keeps its usual key order.
    info.append({LLDP.Neighbors.TLV_TYPE: PORT_TLV, **details})
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _decode_port_type(code):
    """
    Translate a port ID type code into its description, falling back to
    PORT_TYPE_UNKNOWN for codes beyond LLDP_PORT_TYPE_TO_NMSTATE.
    """
    try:
        description = LLDP_PORT_TYPE_TO_NMSTATE[code]
    except IndexError:
        description = PORT_TYPE_UNKNOWN
    return description
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_vlans_info(neighbor, info):
    """
    Append the VLAN entry of the neighbor to info.

    Each VLAN dictionary reported by the neighbor is copied and its name is
    cleaned up. Empty VLAN dictionaries are skipped and a VLAN without a name
    is reported as is. Nothing is appended when no VLAN remains.
    """
    vlans = neighbor.get_attr_value(NM.LLDP_ATTR_IEEE_802_1_VLANS)
    if not vlans:
        return

    vlan_objects = []
    for vlan in vlans.unpack():
        # Skip empty entries before touching any key: the name lookup used to
        # run first and raised KeyError instead of skipping the entry.
        if not vlan:
            continue
        vlan_object = vlan.copy()
        name = vlan_object.get(NM_VLAN_NAME_KEY)
        if name is not None:
            # Removes the literal four characters backslash-0-0-0 from the
            # name. NOTE(review): presumably escaped NUL padding -- confirm.
            vlan_object[NM_VLAN_NAME_KEY] = name.replace("\\000", "")
        vlan_objects.append(vlan_object)

    if vlan_objects:
        info.append(
            {
                LLDP.Neighbors.TLV_TYPE: ORGANIZATION_SPECIFIC_TLV,
                LLDP.Neighbors.ORGANIZATION_CODE: IEEE,
                LLDP.Neighbors.TLV_SUBTYPE: VLAN_SUBTYPE_TLV,
                NM.LLDP_ATTR_IEEE_802_1_VLANS: vlan_objects,
            }
        )
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_macphy_info(neighbor, info):
    """
    Append the MAC/PHY configuration entry of the neighbor to info.

    The auto-negotiation flag is reported as a bool; the PMD auto-negotiation
    capability and the operational MAU type are reported as received. Nothing
    is appended when the attribute is missing or falsy.
    """
    conf = neighbor.get_attr_value(NM.LLDP_ATTR_IEEE_802_3_MAC_PHY_CONF)
    if not conf:
        return

    settings = {
        NM_MACPHY_AUTONEG_KEY: bool(conf[NM_MACPHY_AUTONEG_KEY]),
        NM_MACPHY_PMD_AUTONEG_KEY: conf[NM_MACPHY_PMD_AUTONEG_KEY],
        NM_MACPHY_MAU_TYPE_KEY: conf[NM_MACPHY_MAU_TYPE_KEY],
    }
    info.append(
        {
            LLDP.Neighbors.TLV_TYPE: ORGANIZATION_SPECIFIC_TLV,
            LLDP.Neighbors.ORGANIZATION_CODE: IEEE_802_3,
            LLDP.Neighbors.TLV_SUBTYPE: MAC_PHY_SUBTYPE_TLV,
            NM.LLDP_ATTR_IEEE_802_3_MAC_PHY_CONF: settings,
        }
    )
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_port_vlans_info(neighbor, info):
    """
    Append the port VLAN ID entry of the neighbor to info.

    Only the ID of every reported port VLAN is kept. Nothing is appended when
    the attribute is missing or holds no entries.
    """
    ppvids_attr = neighbor.get_attr_value(NM.LLDP_ATTR_IEEE_802_1_PPVIDS)
    if not ppvids_attr:
        return

    ppvids = [entry[NM_PPVLAN_ID_KEY] for entry in ppvids_attr.unpack()]
    if not ppvids:
        return

    info.append(
        {
            LLDP.Neighbors.TLV_TYPE: ORGANIZATION_SPECIFIC_TLV,
            LLDP.Neighbors.ORGANIZATION_CODE: IEEE,
            LLDP.Neighbors.TLV_SUBTYPE: PORT_VLAN_SUBTYPE_TLV,
            NM.LLDP_ATTR_IEEE_802_1_PPVIDS: ppvids,
        }
    )
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_neighbor_management_addresses(neighbor, info):
    """
    Append the management addresses entry of the neighbor to info.

    Every reported address is converted to text and its subtype to a name by
    _decode_management_address_type(); the interface number and its subtype
    are passed through unchanged. Nothing is appended when the attribute is
    missing or holds no addresses.
    """
    addresses_attr = neighbor.get_attr_value(
        NM.LLDP_ATTR_MANAGEMENT_ADDRESSES
    )
    if not addresses_attr:
        return

    addresses = []
    for entry in addresses_attr.unpack():
        address, address_type = _decode_management_address_type(
            entry[NM_MANAGEMENT_ADDR_TYPE_KEY],
            entry[NM_MANAGEMENT_ADDR_KEY],
        )
        addresses.append(
            {
                NM_MANAGEMENT_ADDR_KEY: address,
                NM_MANAGEMENT_ADDR_TYPE_KEY: address_type,
                NM_MANAGEMENT_ADDR_IFACE_NUMBER_KEY: entry[
                    NM_MANAGEMENT_ADDR_IFACE_NUMBER_KEY
                ],
                NM_MANAGEMENT_ADDR_IFACE_NUMBER_TYPE_KEY: entry[
                    NM_MANAGEMENT_ADDR_IFACE_NUMBER_TYPE_KEY
                ],
            }
        )

    if not addresses:
        return

    info.append(
        {
            LLDP.Neighbors.TLV_TYPE: MANAGEMENT_ADDRESSES_TLV,
            NM.LLDP_ATTR_MANAGEMENT_ADDRESSES: addresses,
        }
    )
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _add_max_frame_size(neighbor, info):
    """
    Append the maximum frame size entry of the neighbor to info when the
    attribute is present and truthy.
    """
    frame_size_attr = neighbor.get_attr_value(
        NM.LLDP_ATTR_IEEE_802_3_MAX_FRAME_SIZE
    )
    if not frame_size_attr:
        return

    info.append(
        {
            LLDP.Neighbors.TLV_TYPE: ORGANIZATION_SPECIFIC_TLV,
            LLDP.Neighbors.ORGANIZATION_CODE: IEEE_802_3,
            LLDP.Neighbors.TLV_SUBTYPE: MFS_SUBTYPE_TLV,
            NM.LLDP_ATTR_IEEE_802_3_MAX_FRAME_SIZE: frame_size_attr.get_uint32(),
        }
    )
Packit b9ca78
Packit b9ca78
Packit b9ca78
def _decode_management_address_type(code, address):
    """
    Convert a management address into text and name its subtype.

    code is the address subtype and address is the sequence of octets of the
    address. Returns an (address, type) tuple:

    * NM_MANAGEMENT_ADDR_TYPE_IPV4: dotted decimal text, "ipv4".
    * NM_MANAGEMENT_ADDR_TYPE_MAC: colon separated upper case hex, "MAC".
    * anything else: reported as "ipv6". A 16 octet address is rendered in
      the standard compressed IPv6 notation; any other length keeps the
      legacy rendering of one four digit hex group per octet.
    """
    import ipaddress

    if code == NM_MANAGEMENT_ADDR_TYPE_IPV4:
        return ".".join(map(str, address)), "ipv4"

    if code == NM_MANAGEMENT_ADDR_TYPE_MAC:
        return ":".join("{:02X}".format(octet) for octet in address), "MAC"

    try:
        packed = bytes(address)
    except (TypeError, ValueError):
        # Not a plain sequence of octets: fall through to the legacy form.
        packed = b""

    if len(packed) == 16:
        # One four digit group per octet would produce 16 groups, which is
        # not valid IPv6 text, so the octets are decoded as a whole.
        return str(ipaddress.IPv6Address(packed)), "ipv6"

    # Kept as is for addresses that are not 16 octets long, so the output for
    # unrecognised subtypes does not change.
    return ":".join("{:04X}".format(octet) for octet in address), "ipv6"