Blob Blame History Raw
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

from libnmstate.schema import LinuxBridge as LB

from .common import NM


class PortVlanFilter:
    def __init__(self):
        self._trunk_tags = []
        self._tag = None
        self._is_native = None
        self._port_mode = None

    def create_configuration(self, trunk_tags, tag, is_native_vlan=False):
        """
        Fill the PortVlanFilter object with data whose format is tied to the
        API.
        :param trunk_tags: list of schema.LinuxBridge.Port.Vlan.TrunkTags
               objects.
        :param tag: the access tag for access ports, the native vlan ID for
               trunk ports
        :param is_native_vlan: boolean attribute indicating if the trunk port
               has a native vlan.
        """
        self._trunk_tags = trunk_tags
        self._tag = tag
        self._is_native = is_native_vlan
        self._port_mode = (
            LB.Port.Vlan.Mode.TRUNK if trunk_tags else LB.Port.Vlan.Mode.ACCESS
        )

    @property
    def trunk_tags(self):
        return self._trunk_tags

    @property
    def tag(self):
        return self._tag

    @property
    def is_native(self):
        return self._is_native

    @property
    def port_mode(self):
        return self._port_mode

    def to_nm(self):
        """
        Generate a list of NM.BridgeVlan objects from the encapsulated
        PortVlanFilter data
        """

        port_vlan_config = []
        if self._port_mode == LB.Port.Vlan.Mode.TRUNK:
            port_vlan_config += map(
                PortVlanFilter._generate_vlan_trunk_port_config,
                self._trunk_tags,
            )
            if self._is_native and self._tag:
                port_vlan_config.append(
                    PortVlanFilter._generate_vlan_access_port_config(self._tag)
                )
        elif self._port_mode == LB.Port.Vlan.Mode.ACCESS and self._tag:
            port_vlan_config.append(
                PortVlanFilter._generate_vlan_access_port_config(self._tag)
            )

        return port_vlan_config

    def to_dict(self):
        """
        Get the port vlan filtering configuration in dict format - e.g. in yaml
        format:
        - name: eth1
          vlan:
            type: trunk
            trunk-tags:
              - id: 101
              - id-range:
                  min: 200
                  max: 4095
            tag: 100
            enable-native: true
        """

        port_vlan_state = {
            LB.Port.Vlan.MODE: self._port_mode,
            LB.Port.Vlan.TRUNK_TAGS: self._trunk_tags,
        }
        if self._tag:
            port_vlan_state[LB.Port.Vlan.TAG] = self._tag
        if self._port_mode == LB.Port.Vlan.Mode.TRUNK:
            port_vlan_state[LB.Port.Vlan.ENABLE_NATIVE] = self._is_native
        return port_vlan_state

    def import_from_bridge_settings(self, nm_bridge_vlans):
        """
        Instantiates a PortVlanFilter object from a list of NM.BridgeVlan
        objects.
        """

        self._is_native = False
        trunk_tags = []

        is_access_port = PortVlanFilter._is_access_port(nm_bridge_vlans)
        for nm_bridge_vlan in nm_bridge_vlans:
            vlan_min, vlan_max = PortVlanFilter.get_vlan_tag_range(
                nm_bridge_vlan
            )
            if is_access_port:
                self._tag = vlan_min
            elif nm_bridge_vlan.is_pvid() and nm_bridge_vlan.is_untagged():
                # an NM.BridgeVlan has a range and can be PVID and/or untagged
                # according to NM's model, PVID / untagged apply to the 'max'
                # part of the range
                self._tag = vlan_max
                self._is_native = True
            else:
                trunk_tags.append(
                    PortVlanFilter._translate_nm_bridge_vlan_to_trunk_tags(
                        vlan_min, vlan_max
                    )
                )

        self._trunk_tags = trunk_tags
        self._port_mode = (
            LB.Port.Vlan.Mode.TRUNK if trunk_tags else LB.Port.Vlan.Mode.ACCESS
        )

    @staticmethod
    def get_vlan_tag_range(nm_bridge_vlan):
        """
        Extract the vlan tags from the NM.BridgeVlan object.
        A single NM.BridgeVlan object can have a range of vlan tags, or a
        single one.
        When a NM.BridgeVlan holds a single tag, the min_range and max_range
        returned will have the same vlan tag.
        :return: min_range, max_range
        """

        port_vlan_tags = nm_bridge_vlan.to_str().split()

        if "-" in port_vlan_tags[0]:
            vlan_min, vlan_max = port_vlan_tags[0].split("-")
            return int(vlan_min), int(vlan_max)
        else:
            tag = int(port_vlan_tags[0])
            return tag, tag

    @staticmethod
    def _is_access_port(nm_bridge_vlan_ports):
        return (
            len(nm_bridge_vlan_ports) == 1
            and nm_bridge_vlan_ports[0].is_pvid()
            and nm_bridge_vlan_ports[0].is_untagged()
        )

    @staticmethod
    def _translate_nm_bridge_vlan_to_trunk_tags(min_vlan, max_vlan):
        if max_vlan != min_vlan:
            port_data = {
                LB.Port.Vlan.TrunkTags.ID_RANGE: {
                    LB.Port.Vlan.TrunkTags.MIN_RANGE: min_vlan,
                    LB.Port.Vlan.TrunkTags.MAX_RANGE: max_vlan,
                }
            }
        else:
            port_data = {LB.Port.Vlan.TrunkTags.ID: min_vlan}

        return port_data

    @staticmethod
    def _generate_vlan_trunk_port_config(trunk_port):
        min_range = max_range = trunk_port.get(LB.Port.Vlan.TrunkTags.ID)
        if min_range is None:
            ranged_vlan_tags = trunk_port.get(LB.Port.Vlan.TrunkTags.ID_RANGE)
            min_range = ranged_vlan_tags[LB.Port.Vlan.TrunkTags.MIN_RANGE]
            max_range = ranged_vlan_tags[LB.Port.Vlan.TrunkTags.MAX_RANGE]
        port_vlan = NM.BridgeVlan.new(min_range, max_range)
        port_vlan.set_untagged(False)
        port_vlan.set_pvid(False)
        return port_vlan

    @staticmethod
    def _generate_vlan_access_port_config(vlan_tag):
        port_vlan = NM.BridgeVlan.new(vlan_tag, vlan_tag)
        port_vlan.set_untagged(True)
        port_vlan.set_pvid(True)
        return port_vlan