# (Web viewer navigation text "Blob Blame History Raw" -- not part of the module.)
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

from copy import deepcopy

from libnmstate.error import NmstateValueError
from libnmstate.error import NmstateVerificationError
from libnmstate.error import NmstateNotImplementedError
from libnmstate.iplib import is_ipv6_address
from libnmstate.prettystate import format_desired_current_state_diff
from libnmstate.schema import DNS
from libnmstate.schema import Interface


class DnsState:
    """
    Hold the effective DNS state and the current DNS state.

    The effective state is the desired state when it defines DNS.CONFIG,
    otherwise the current state is used. The class can generate the
    per-interface DNS metadata and verify the state against a freshly
    retrieved current state.
    """

    # Metadata key holding the position (in the configured name server list)
    # of the first name server stored for an interface/family.
    PRIORITY_METADATA = "_priority"

    def __init__(self, des_dns_state, cur_dns_state):
        """
        :param des_dns_state: Desired DNS state, or None when not defined.
        :param cur_dns_state: Current DNS state; None or empty is treated as
            "nothing configured".
        :raises NmstateNotImplementedError: From ``_validate()`` when the
            desired name servers are not supported.
        """
        self._config_changed = False
        if des_dns_state is None or des_dns_state.get(DNS.CONFIG) is None:
            # Use current config if DNS.KEY not defined or DNS.CONFIG not
            # defined.
            self._dns_state = cur_dns_state or {}
        else:
            self._dns_state = des_dns_state
            self._validate()
            # Normalize a missing current state to an empty dict, the same
            # way the other users of `cur_dns_state` in this method do, so
            # the comparison does not fail on None.
            self._config_changed = _is_dns_config_changed(
                des_dns_state, cur_dns_state or {}
            )
        self._cur_dns_state = deepcopy(cur_dns_state) if cur_dns_state else {}

    @property
    def current_config(self):
        """
        DNS config of the current state passed to the constructor.
        """
        return _get_config(self._cur_dns_state)

    @property
    def config(self):
        """
        DNS config of the effective (desired or fallback current) state.
        """
        return _get_config(self._dns_state)

    @property
    def _config_servers(self):
        return _get_config_servers(self._dns_state)

    @property
    def _config_searches(self):
        return _get_config_searches(self._dns_state)

    def gen_metadata(self, ifaces, route_state):
        """
        Return DNS configure targeting to store as metadata of interface.
        Data structure returned is:
            {
                iface_name: {
                    Interface.IPV4: {
                        DNS.SERVER: dns_servers,
                        DNS.SEARCH: dns_searches,
                    },
                    Interface.IPV6: {
                        DNS.SERVER: dns_servers,
                        DNS.SEARCH: dns_searches,
                    },
                }
            }
        Only interfaces actually found are used as keys. An empty dict is
        returned when neither name servers nor searches are configured.
        The searches are stored together with the first name server only.
        Each family entry holding a name server also gets
        DnsState.PRIORITY_METADATA set to the position of its first name
        server in the configured list.

        Raise NmstateValueError when a name server has no suitable interface
        for its IP family.
        """
        iface_metadata = {}
        servers = self._config_servers
        searches = self._config_searches
        if not servers and not searches:
            return iface_metadata
        ipv4_iface, ipv6_iface = self._find_ifaces_for_name_servers(
            ifaces, route_state
        )
        # The truthiness check matters: when no interface was found for
        # either family both values are None and compare equal, which must
        # not create an entry keyed by None.
        if ipv4_iface and ipv4_iface == ipv6_iface:
            iface_metadata[ipv4_iface] = {
                Interface.IPV4: {DNS.SERVER: [], DNS.SEARCH: []},
                Interface.IPV6: {DNS.SERVER: [], DNS.SEARCH: []},
            }
        else:
            if ipv4_iface:
                iface_metadata[ipv4_iface] = {
                    Interface.IPV4: {DNS.SERVER: [], DNS.SEARCH: []},
                }
            if ipv6_iface:
                iface_metadata[ipv6_iface] = {
                    Interface.IPV6: {DNS.SERVER: [], DNS.SEARCH: []},
                }
        searches_saved = False
        for index, server in enumerate(servers):
            if is_ipv6_address(server):
                iface_name, family = ipv6_iface, Interface.IPV6
            else:
                iface_name, family = ipv4_iface, Interface.IPV4
            if not iface_name:
                raise NmstateValueError(
                    "Failed to find suitable interface for saving DNS "
                    "name servers: %s" % server
                )
            iface_dns_metadata = iface_metadata[iface_name][family]
            iface_dns_metadata[DNS.SERVER].append(server)
            # setdefault keeps the index of the first server of this family.
            iface_dns_metadata.setdefault(DnsState.PRIORITY_METADATA, index)
            if not searches_saved:
                iface_dns_metadata[DNS.SEARCH] = searches
                searches_saved = True
        return iface_metadata

    def _find_ifaces_for_name_servers(self, ifaces, route_state):
        """
        Find interface to store the DNS configurations in the order of:
            * Any interface with static gateway
            * Any interface configured as dynamic IP with 'auto-dns:False'
        Return tuple: (ipv4_iface, ipv6_iface)
        """
        ipv4_iface, ipv6_iface = self._find_ifaces_with_static_gateways(
            route_state
        )
        if not (ipv4_iface and ipv6_iface):
            # Only fall back for the families still missing an interface.
            (
                auto_ipv4_iface,
                auto_ipv6_iface,
            ) = self._find_ifaces_with_auto_dns_false(ifaces)
            if not ipv4_iface and auto_ipv4_iface:
                ipv4_iface = auto_ipv4_iface
            if not ipv6_iface and auto_ipv6_iface:
                ipv6_iface = auto_ipv6_iface

        return ipv4_iface, ipv6_iface

    def _find_ifaces_with_static_gateways(self, route_state):
        """
        Return tuple of interfaces with IPv4 and IPv6 static gateways.
        A family without any gateway route is reported as None.
        """
        ipv4_iface = None
        ipv6_iface = None
        for iface_name, route_set in route_state.config_iface_routes.items():
            for route in route_set:
                # Stop searching once both families are settled.
                if ipv4_iface and ipv6_iface:
                    return (ipv4_iface, ipv6_iface)
                if route.is_gateway:
                    if route.is_ipv6:
                        ipv6_iface = iface_name
                    else:
                        ipv4_iface = iface_name
        return (ipv4_iface, ipv6_iface)

    def _find_ifaces_with_auto_dns_false(self, ifaces):
        """
        Return tuple (ipv4_iface, ipv6_iface) holding the names of interfaces
        whose IP state of that family is dynamic with auto_dns disabled.
        A family without such an interface is reported as None.
        """
        ipv4_iface = None
        ipv6_iface = None
        for iface in ifaces.values():
            if ipv4_iface and ipv6_iface:
                return (ipv4_iface, ipv6_iface)
            for family in (Interface.IPV4, Interface.IPV6):
                ip_state = iface.ip_state(family)
                if ip_state.is_dynamic and (not ip_state.auto_dns):
                    if family == Interface.IPV4:
                        ipv4_iface = iface.name
                    else:
                        ipv6_iface = iface.name

        return (ipv4_iface, ipv6_iface)

    def verify(self, cur_dns_state):
        """
        Compare name servers and searches of this state with
        ``cur_dns_state``.

        Raise NmstateVerificationError, carrying the formatted difference,
        when either list differs.
        """
        cur_dns = DnsState(des_dns_state=None, cur_dns_state=cur_dns_state)
        servers_differ = self.config.get(
            DNS.SERVER, []
        ) != cur_dns.config.get(DNS.SERVER, [])
        if servers_differ or self.config.get(
            DNS.SEARCH, []
        ) != cur_dns.config.get(DNS.SEARCH, []):
            raise NmstateVerificationError(
                format_desired_current_state_diff(
                    {DNS.KEY: self.config}, {DNS.KEY: cur_dns.config},
                )
            )

    def _validate(self):
        """
        Raise NmstateNotImplementedError when more than two name servers are
        configured and they mix IPv4 and IPv6 addresses.
        """
        if (
            len(self._config_servers) > 2
            and any(is_ipv6_address(n) for n in self._config_servers)
            and any(not is_ipv6_address(n) for n in self._config_servers)
        ):
            raise NmstateNotImplementedError(
                "Three or more nameservers are only supported when using "
                "either IPv4 or IPv6 nameservers but not both."
            )

    @property
    def config_changed(self):
        """
        True when the desired name servers or searches differ from the
        current ones; always False when no desired DNS config was given.
        """
        return self._config_changed


def _get_config(state):
    """
    Return the DNS.CONFIG section of ``state``.

    When ``state`` is None or empty, or its DNS.CONFIG entry is missing or
    empty, a new config with empty DNS.SERVER and DNS.SEARCH lists is
    returned instead.
    """
    # A None state is treated like an empty one instead of raising
    # AttributeError on `.get`.
    conf = state.get(DNS.CONFIG) if state else None
    if not conf:
        conf = {DNS.SERVER: [], DNS.SEARCH: []}
    return conf


def _get_config_servers(state):
    """
    Return the DNS.SERVER entry of the DNS config found in ``state``,
    or an empty list when the config has no such entry.
    """
    dns_config = _get_config(state)
    return dns_config.get(DNS.SERVER, [])


def _get_config_searches(state):
    """
    Return the DNS.SEARCH entry of the DNS config found in ``state``,
    or an empty list when the config has no such entry.
    """
    dns_config = _get_config(state)
    return dns_config.get(DNS.SEARCH, [])


def _is_dns_config_changed(des_dns_state, cur_dns_state):
    """
    Return True when the name servers or the searches of the desired DNS
    state differ from those of the current DNS state.
    """
    # Name servers are compared first; searches are only inspected when the
    # name servers are identical.
    if _get_config_servers(des_dns_state) != _get_config_servers(
        cur_dns_state
    ):
        return True
    desired_searches = _get_config_searches(des_dns_state)
    current_searches = _get_config_searches(cur_dns_state)
    return desired_searches != current_searches