Blame libnmstate/route_rule.py

Packit Service 0535c1
from collections import defaultdict
Packit Service 0535c1
import logging
Packit Service 0535c1
Packit Service 0535c1
from libnmstate.error import NmstateNotImplementedError
Packit Service 0535c1
from libnmstate.error import NmstateVerificationError
Packit Service 0535c1
from libnmstate.error import NmstateValueError
Packit Service 0535c1
from libnmstate.iplib import KERNEL_MAIN_ROUTE_TABLE_ID
Packit Service 0535c1
from libnmstate.iplib import is_ipv6_address
Packit Service 0535c1
from libnmstate.iplib import canonicalize_ip_network
Packit Service 0535c1
from libnmstate.prettystate import format_desired_current_state_diff
Packit Service 0535c1
from libnmstate.schema import Interface
Packit Service 0535c1
from libnmstate.schema import RouteRule
Packit Service 0535c1
from libnmstate.schema import Route
Packit Service 0535c1
Packit Service 0535c1
from .state import StateEntry
Packit Service 0535c1
from .state import state_match
Packit Service 0535c1
Packit Service 0535c1
Packit Service 0535c1
class RouteRuleEntry(StateEntry):
Packit Service 0535c1
    def __init__(self, route_rule):
Packit Service 0535c1
        self.ip_from = route_rule.get(RouteRule.IP_FROM)
Packit Service 0535c1
        self.ip_to = route_rule.get(RouteRule.IP_TO)
Packit Service 0535c1
        self.priority = route_rule.get(RouteRule.PRIORITY)
Packit Service 0535c1
        self.route_table = route_rule.get(RouteRule.ROUTE_TABLE)
Packit Service 0535c1
        self._complement_defaults()
Packit Service 0535c1
        self._canonicalize_ip_network()
Packit Service 0535c1
Packit Service 0535c1
    def _complement_defaults(self):
Packit Service 0535c1
        if self.ip_from is None:
Packit Service 0535c1
            self.ip_from = ""
Packit Service 0535c1
        if self.ip_to is None:
Packit Service 0535c1
            self.ip_to = ""
Packit Service 0535c1
        if self.priority is None:
Packit Service 0535c1
            self.priority = RouteRule.USE_DEFAULT_PRIORITY
Packit Service 0535c1
        if (
Packit Service 0535c1
            self.route_table is None
Packit Service 0535c1
            or self.route_table == RouteRule.USE_DEFAULT_ROUTE_TABLE
Packit Service 0535c1
        ):
Packit Service 0535c1
            self.route_table = KERNEL_MAIN_ROUTE_TABLE_ID
Packit Service 0535c1
Packit Service 0535c1
    def _canonicalize_ip_network(self):
Packit Service 0535c1
        if self.ip_from:
Packit Service 0535c1
            self.ip_from = canonicalize_ip_network(self.ip_from)
Packit Service 0535c1
        if self.ip_to:
Packit Service 0535c1
            self.ip_to = canonicalize_ip_network(self.ip_to)
Packit Service 0535c1
Packit Service 0535c1
    def _keys(self):
Packit Service 0535c1
        return (self.ip_from, self.ip_to, self.priority, self.route_table)
Packit Service 0535c1
Packit Service 0535c1
    @property
Packit Service 0535c1
    def is_ipv6(self):
Packit Service 0535c1
        if self.ip_from:
Packit Service 0535c1
            return is_ipv6_address(self.ip_from)
Packit Service 0535c1
        elif self.ip_to:
Packit Service 0535c1
            return is_ipv6_address(self.ip_to)
Packit Service 0535c1
        else:
Packit Service 0535c1
            logging.warning(
Packit Service 0535c1
                f"Neither {RouteRule.IP_FROM} nor {RouteRule.IP_TO} "
Packit Service 0535c1
                "is defined, treating it a IPv4 route rule"
Packit Service 0535c1
            )
Packit Service 0535c1
            return False
Packit Service 0535c1
Packit Service 0535c1
    @property
Packit Service 0535c1
    def absent(self):
Packit Service 0535c1
        raise NmstateNotImplementedError(
Packit Service 0535c1
            "RouteRuleEntry does not support absent property"
Packit Service 0535c1
        )
Packit Service 0535c1
Packit Service 0535c1
    def is_valid(self, config_iface_routes):
Packit Service 0535c1
        """
Packit Service 0535c1
        Return False when there is no route for defined route table.
Packit Service 0535c1
        """
Packit Service 0535c1
        found = False
Packit Service 0535c1
        for route_set in config_iface_routes.values():
Packit Service 0535c1
            for route in route_set:
Packit Service 0535c1
                if route.table_id == self.route_table or (
Packit Service 0535c1
                    route.table_id == Route.USE_DEFAULT_ROUTE_TABLE
Packit Service 0535c1
                    and self.route_table == KERNEL_MAIN_ROUTE_TABLE_ID
Packit Service 0535c1
                ):
Packit Service 0535c1
                    found = True
Packit Service 0535c1
                    break
Packit Service 0535c1
        return found
Packit Service 0535c1
Packit Service 0535c1
Packit Service 0535c1
class RouteRuleState:
Packit Service 0535c1
    def __init__(self, route_state, des_rule_state, cur_rule_state):
Packit Service 0535c1
        self._config_changed = False
Packit Service 0535c1
        self._cur_rules = defaultdict(set)
Packit Service 0535c1
        self._rules = defaultdict(set)
Packit Service 0535c1
        if cur_rule_state:
Packit Service 0535c1
            for rule_dict in _get_config(cur_rule_state):
Packit Service 0535c1
                rule = RouteRuleEntry(rule_dict)
Packit Service 0535c1
                self._cur_rules[rule.route_table].add(rule)
Packit Service 0535c1
        if des_rule_state:
Packit Service 0535c1
            for rule_dict in _get_config(des_rule_state):
Packit Service 0535c1
                rule = RouteRuleEntry(rule_dict)
Packit Service 0535c1
                self._rules[rule.route_table].add(rule)
Packit Service 0535c1
            if self._rules != self._cur_rules:
Packit Service 0535c1
                self._config_changed = True
Packit Service 0535c1
        else:
Packit Service 0535c1
            # Discard invalid route rule when merging from current
Packit Service 0535c1
            for rules in self._cur_rules.values():
Packit Service 0535c1
                for rule in rules:
Packit Service 0535c1
                    if not route_state or rule.is_valid(
Packit Service 0535c1
                        route_state.config_iface_routes
Packit Service 0535c1
                    ):
Packit Service 0535c1
                        self._rules[rule.route_table].add(rule)
Packit Service 0535c1
Packit Service 0535c1
    @property
Packit Service 0535c1
    def _config(self):
Packit Service 0535c1
        return _get_config(self._rules)
Packit Service 0535c1
Packit Service 0535c1
    def verify(self, cur_rule_state):
Packit Service 0535c1
        current = RouteRuleState(
Packit Service 0535c1
            route_state=None,
Packit Service 0535c1
            des_rule_state=None,
Packit Service 0535c1
            cur_rule_state=cur_rule_state,
Packit Service 0535c1
        )
Packit Service 0535c1
        for route_table, rules in self._rules.items():
Packit Service 0535c1
            rule_info = [
Packit Service 0535c1
                _remove_route_rule_default_values(r.to_dict())
Packit Service 0535c1
                for r in sorted(rules)
Packit Service 0535c1
            ]
Packit Service 0535c1
            cur_rule_info = [
Packit Service 0535c1
                r.to_dict()
Packit Service 0535c1
                for r in sorted(current._rules.get(route_table, set()))
Packit Service 0535c1
            ]
Packit Service 0535c1
Packit Service 0535c1
            if not state_match(rule_info, cur_rule_info):
Packit Service 0535c1
                raise NmstateVerificationError(
Packit Service 0535c1
                    format_desired_current_state_diff(
Packit Service 0535c1
                        {RouteRule.KEY: {RouteRule.CONFIG: rule_info}},
Packit Service 0535c1
                        {RouteRule.KEY: {RouteRule.CONFIG: cur_rule_info}},
Packit Service 0535c1
                    )
Packit Service 0535c1
                )
Packit Service 0535c1
Packit Service 0535c1
    @property
Packit Service 0535c1
    def config_changed(self):
Packit Service 0535c1
        return self._config_changed
Packit Service 0535c1
Packit Service 0535c1
    def gen_metadata(self, route_state):
Packit Service 0535c1
        """
Packit Service 0535c1
        Generate metada which could used for storing into interface.
Packit Service 0535c1
        Data structure returned is:
Packit Service 0535c1
            {
Packit Service 0535c1
                iface_name: {
Packit Service 0535c1
                    Interface.IPV4: ipv4_route_rules,
Packit Service 0535c1
                    Interface.IPV6: ipv6_route_rules,
Packit Service 0535c1
                }
Packit Service 0535c1
            }
Packit Service 0535c1
        """
Packit Service 0535c1
        route_rule_metadata = {}
Packit Service 0535c1
        for route_table, rules in self._rules.items():
Packit Service 0535c1
            iface_name = self._iface_for_route_table(route_state, route_table)
Packit Service 0535c1
            route_rule_metadata[iface_name] = {
Packit Service 0535c1
                Interface.IPV4: [],
Packit Service 0535c1
                Interface.IPV6: [],
Packit Service 0535c1
            }
Packit Service 0535c1
            for rule in rules:
Packit Service 0535c1
                family = Interface.IPV6 if rule.is_ipv6 else Interface.IPV4
Packit Service 0535c1
                route_rule_metadata[iface_name][family].append(rule.to_dict())
Packit Service 0535c1
        return route_rule_metadata
Packit Service 0535c1
Packit Service 0535c1
    def _iface_for_route_table(self, route_state, route_table):
Packit Service 0535c1
        for routes in route_state.config_iface_routes.values():
Packit Service 0535c1
            for route in routes:
Packit Service 0535c1
                if route.table_id == route_table:
Packit Service 0535c1
                    return route.next_hop_interface
Packit Service 0535c1
        raise NmstateValueError(
Packit Service 0535c1
            "Failed to find interface to with route table ID "
Packit Service 0535c1
            f"{route_table} to store route rules"
Packit Service 0535c1
        )
Packit Service 0535c1
Packit Service 0535c1
Packit Service 0535c1
def _get_config(state):
Packit Service 0535c1
    return state.get(RouteRule.CONFIG, [])
Packit Service 0535c1
Packit Service 0535c1
Packit Service 0535c1
def _remove_route_rule_default_values(rule):
Packit Service 0535c1
    if rule.get(RouteRule.PRIORITY) == RouteRule.USE_DEFAULT_PRIORITY:
Packit Service 0535c1
        del rule[RouteRule.PRIORITY]
Packit Service 0535c1
    if rule.get(RouteRule.ROUTE_TABLE) == RouteRule.USE_DEFAULT_ROUTE_TABLE:
Packit Service 0535c1
        del rule[RouteRule.ROUTE_TABLE]
Packit Service 0535c1
    return rule