Blame libnmstate/route.py

Packit b9ca78
#
Packit b9ca78
# Copyright (c) 2020 Red Hat, Inc.
Packit b9ca78
#
Packit b9ca78
# This file is part of nmstate
Packit b9ca78
#
Packit b9ca78
# This program is free software: you can redistribute it and/or modify
Packit b9ca78
# it under the terms of the GNU Lesser General Public License as published by
Packit b9ca78
# the Free Software Foundation, either version 2.1 of the License, or
Packit b9ca78
# (at your option) any later version.
Packit b9ca78
#
Packit b9ca78
# This program is distributed in the hope that it will be useful,
Packit b9ca78
# but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b9ca78
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b9ca78
# GNU Lesser General Public License for more details.
Packit b9ca78
#
Packit b9ca78
# You should have received a copy of the GNU Lesser General Public License
Packit b9ca78
# along with this program. If not, see <https://www.gnu.org/licenses/>.
Packit b9ca78
#
Packit b9ca78
Packit b9ca78
from collections import defaultdict
Packit b9ca78
Packit b9ca78
from libnmstate.error import NmstateValueError
Packit b9ca78
from libnmstate.error import NmstateVerificationError
Packit b9ca78
from libnmstate.iplib import is_ipv6_address
Packit b9ca78
from libnmstate.iplib import canonicalize_ip_network
Packit b9ca78
from libnmstate.iplib import canonicalize_ip_address
Packit b9ca78
from libnmstate.prettystate import format_desired_current_state_diff
Packit b9ca78
from libnmstate.schema import Interface
Packit b9ca78
from libnmstate.schema import Route
Packit b9ca78
Packit Service 588ec5
from .ifaces.base_iface import BaseIface
Packit b9ca78
from .state import StateEntry
Packit b9ca78
from .state import state_match
Packit b9ca78
Packit b9ca78
Packit b9ca78
class RouteEntry(StateEntry):
    """
    One route taken from a ``Route.CONFIG`` list.

    The entry stores the table ID, state, metric, destination, next hop
    address and next hop interface read from the route dictionary.
    ``to_dict()`` and ``match()`` are called on entries in this module but
    are not defined here; presumably they are inherited from ``StateEntry``
    -- confirm against ``libnmstate/state.py``.
    """

    IPV4_DEFAULT_GATEWAY_DESTINATION = "0.0.0.0/0"
    IPV6_DEFAULT_GATEWAY_DESTINATION = "::/0"

    def __init__(self, route):
        """
        Build the entry from a route dictionary.

        :param route: mapping keyed by the ``Route.*`` property names; any
            missing property is stored as ``None``.
        """
        self.table_id = route.get(Route.TABLE_ID)
        self.state = route.get(Route.STATE)
        self.metric = route.get(Route.METRIC)
        self.destination = route.get(Route.DESTINATION)
        self.next_hop_address = route.get(Route.NEXT_HOP_ADDRESS)
        self.next_hop_interface = route.get(Route.NEXT_HOP_INTERFACE)
        # TODO: Convert IPv6 full address to abbreviated address
        self.complement_defaults()
        self._invalid_reason = None
        self._canonicalize_ip_address()

    @property
    def is_ipv6(self):
        """Whether ``is_ipv6_address()`` accepts the destination."""
        return is_ipv6_address(self.destination)

    @property
    def is_gateway(self):
        """
        Whether the destination is the default gateway destination of the
        route's own address family.
        """
        if self.is_ipv6:
            return (
                self.destination == RouteEntry.IPV6_DEFAULT_GATEWAY_DESTINATION
            )
        else:
            return (
                self.destination == RouteEntry.IPV4_DEFAULT_GATEWAY_DESTINATION
            )

    @property
    def invalid_reason(self):
        """
        Message explaining why the last ``is_valid()`` call failed, or
        ``None`` when it has not been called or the last call succeeded.
        """
        return self._invalid_reason

    def complement_defaults(self):
        """
        Fill unset table ID, metric and next hop address with defaults.

        Absent routes are left untouched so that their unset properties
        stay ``None``.
        """
        if not self.absent:
            if self.table_id is None:
                self.table_id = Route.USE_DEFAULT_ROUTE_TABLE
            if self.metric is None:
                self.metric = Route.USE_DEFAULT_METRIC
            if self.next_hop_address is None:
                self.next_hop_address = ""

    def _keys(self):
        """
        Return the tuple of properties identifying this route.

        NOTE(review): presumably consumed by ``StateEntry`` for equality
        and hashing (entries are stored in sets in this module) -- confirm.
        """
        return (
            self.table_id,
            self.metric,
            self.destination,
            self.next_hop_address,
            self.next_hop_interface,
        )

    def __lt__(self, other):
        """
        Order routes by table ID, next hop interface, destination, next hop
        address and metric, in that priority.

        Falsy properties (``None``, ``0``, ``""``) are replaced by their
        default so that unset values never get compared against
        integers or strings.
        """
        return (
            self.table_id or Route.USE_DEFAULT_ROUTE_TABLE,
            self.next_hop_interface or "",
            self.destination or "",
            self.next_hop_address or "",
            self.metric or Route.USE_DEFAULT_METRIC,
        ) < (
            other.table_id or Route.USE_DEFAULT_ROUTE_TABLE,
            other.next_hop_interface or "",
            other.destination or "",
            other.next_hop_address or "",
            other.metric or Route.USE_DEFAULT_METRIC,
        )

    @property
    def absent(self):
        """Whether the route state is ``Route.STATE_ABSENT``."""
        return self.state == Route.STATE_ABSENT

    def is_valid(self, ifaces):
        """
        Return False when next hop interface or destination is not defined;
        Return False when the route is next hop to any of these interfaces:
            * Interface does not exist in ``ifaces.all_kernel_ifaces``.
            * Interface is not up.
            * Interface configured as dynamic IPv4/IPv6.
            * Interface has IPv4/IPv6 disabled.
        Return True otherwise.

        On failure the explanation is available through ``invalid_reason``.

        :param ifaces: object exposing ``all_kernel_ifaces``, a mapping from
            interface name to interface object.
        """
        # Forget the outcome of any previous validation so that a
        # successful check never leaves a stale failure message behind.
        self._invalid_reason = None
        if not self.next_hop_interface:
            self._invalid_reason = (
                "Route entry does not have next hop interface"
            )
            return False
        if not self.destination:
            self._invalid_reason = "Route entry does not have destination"
            return False
        iface = ifaces.all_kernel_ifaces.get(self.next_hop_interface)
        if not iface:
            self._invalid_reason = (
                f"Route {self.to_dict()} next hop to unknown interface"
            )
            return False
        if not iface.is_up:
            self._invalid_reason = (
                f"Route {self.to_dict()} next hop to down/absent interface"
            )
            return False
        if iface.is_dynamic(
            Interface.IPV6 if self.is_ipv6 else Interface.IPV4
        ):
            self._invalid_reason = (
                f"Route {self.to_dict()} next hop to interface with dynamic IP"
            )
            return False
        if self.is_ipv6:
            if not iface.is_ipv6_enabled():
                self._invalid_reason = (
                    f"Route {self.to_dict()} next hop to interface with IPv6 "
                    "disabled"
                )
                return False
        else:
            if not iface.is_ipv4_enabled():
                self._invalid_reason = (
                    f"Route {self.to_dict()} next hop to interface with IPv4 "
                    "disabled"
                )
                return False
        return True

    def _canonicalize_ip_address(self):
        """
        Canonicalize destination and next hop address of non-absent routes.

        Empty or unset values are skipped.
        """
        if not self.absent:
            if self.destination:
                self.destination = canonicalize_ip_network(self.destination)
            if self.next_hop_address:
                self.next_hop_address = canonicalize_ip_address(
                    self.next_hop_address
                )
Packit b9ca78
Packit b9ca78
Packit b9ca78
class RouteState:
    """
    Routes indexed by next hop interface, merged from current and desired
    route state.

    ``_cur_routes`` holds every route of the current state, ``_routes``
    holds the merged result: current routes that pass validation, minus the
    routes matched by desired absent entries, plus the desired routes.
    """

    def __init__(self, ifaces, des_route_state, cur_route_state):
        """
        :param ifaces: object exposing ``all_kernel_ifaces``; when falsy,
            current routes are kept without validation.
        :param des_route_state: desired route state mapping holding a
            ``Route.CONFIG`` list, or a falsy value for none.
        :param cur_route_state: current route state mapping holding a
            ``Route.CONFIG`` list, or a falsy value for none.
        :raises NmstateValueError: when a desired non-absent route is not
            valid for ``ifaces``.
        """
        self._cur_routes = defaultdict(set)
        self._routes = defaultdict(set)
        if cur_route_state:
            for entry in cur_route_state.get(Route.CONFIG, []):
                rt = RouteEntry(entry)
                self._cur_routes[rt.next_hop_interface].add(rt)
                if not ifaces or rt.is_valid(ifaces):
                    self._routes[rt.next_hop_interface].add(rt)
        if des_route_state:
            self._merge_routes(des_route_state, ifaces)

    def _merge_routes(self, des_route_state, ifaces):
        """
        Apply the desired routes on top of the routes collected so far.

        :raises NmstateValueError: when a desired non-absent route is not
            valid for ``ifaces``.
        """
        # Parse every desired entry only once; both passes below share them.
        des_routes = [
            RouteEntry(entry)
            for entry in des_route_state.get(Route.CONFIG, [])
        ]
        # Handle absent route before adding desired route entries to
        # make sure absent route does not delete route defined in
        # desire state
        for rt in des_routes:
            if rt.absent:
                self._apply_absent_routes(rt, ifaces)
        for rt in des_routes:
            if rt.absent:
                continue
            if not rt.is_valid(ifaces):
                raise NmstateValueError(rt.invalid_reason)
            ifaces.all_kernel_ifaces[rt.next_hop_interface].mark_as_changed()
            self._routes[rt.next_hop_interface].add(rt)

    def _apply_absent_routes(self, rt, ifaces):
        """
        Remove routes based on absent routes and treat missing property as
        wildcard match.

        :param rt: the absent ``RouteEntry``; when it names no next hop
            interface, the routes of every interface are considered.
        :param ifaces: not used by this method.
        """
        absent_iface_name = rt.next_hop_interface
        for iface_name, route_set in self._routes.items():
            if absent_iface_name and absent_iface_name != iface_name:
                continue
            remaining = {
                route for route in route_set if not rt.match(route)
            }
            if remaining != route_set:
                # Rebinding an existing key keeps the dictionary size
                # unchanged, so this is safe while iterating over it.
                self._routes[iface_name] = remaining

    def gen_metadata(self, ifaces):
        """
        Generate metadata which could be used for storing into interface.
        Data structure returned is:
            {
                iface_name: {
                    Interface.IPV4: ipv4_routes,
                    Interface.IPV6: ipv6_routes,
                }
            }
        An interface whose routes differ from its current routes also gets
        ``BaseIface.ROUTE_CHANGED_METADATA: True``.

        :param ifaces: not used by this method.
        """
        route_metadata = {}
        for iface_name, route_set in self._routes.items():
            route_metadata[iface_name] = {
                Interface.IPV4: [],
                Interface.IPV6: [],
            }
            # Use .get() so that a read-only comparison does not insert
            # empty sets into the _cur_routes defaultdict.
            if route_set != self._cur_routes.get(iface_name, set()):
                route_metadata[iface_name][
                    BaseIface.ROUTE_CHANGED_METADATA
                ] = True
            for route in route_set:
                family = Interface.IPV6 if route.is_ipv6 else Interface.IPV4
                route_metadata[iface_name][family].append(route.to_dict())
        return route_metadata

    @property
    def config_iface_routes(self):
        """
        Return configured routes indexed by next hop interface

        NOTE(review): only the case of exactly one interface holding an
        empty route set is collapsed to ``{}``; with several interfaces,
        empty sets are returned as they are -- confirm this is intended.
        """
        if list(self._routes.values()) == [set()]:
            return {}
        return self._routes

    def verify(self, cur_route_state):
        """
        Check that the merged routes are found in ``cur_route_state``.

        Only interfaces known to this object are compared; each side is
        sorted and converted with ``to_dict()`` before ``state_match()``.

        :param cur_route_state: route state mapping holding a
            ``Route.CONFIG`` list, or a falsy value for none.
        :raises NmstateVerificationError: when the routes of an interface
            do not match, carrying a diff of desired and current routes.
        """
        # No ifaces are passed, hence current routes are not validated.
        current = RouteState(
            ifaces=None, des_route_state=None, cur_route_state=cur_route_state
        )
        for iface_name, route_set in self._routes.items():
            routes_info = [r.to_dict() for r in sorted(route_set)]
            cur_routes_info = [
                r.to_dict()
                for r in sorted(current._routes.get(iface_name, set()))
            ]
            if not state_match(routes_info, cur_routes_info):
                raise NmstateVerificationError(
                    format_desired_current_state_diff(
                        {Route.KEY: {Route.CONFIG: routes_info}},
                        {Route.KEY: {Route.CONFIG: cur_routes_info}},
                    )
                )