#
# Copyright (c) 2020-2021 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from distutils.version import StrictVersion
import logging
from operator import itemgetter
from libnmstate.error import NmstateDependencyError
from libnmstate.error import NmstateNotSupportedError
from libnmstate.error import NmstateValueError
from libnmstate.ifaces.ovs import is_ovs_running
from libnmstate.schema import DNS
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceType
from libnmstate.schema import LLDP
from libnmstate.plugin import NmstatePlugin
from .checkpoint import CheckPoint
from .checkpoint import get_checkpoints
from .common import NM
from .context import NmContext
from .device import get_device_common_info
from .device import get_iface_type
from .device import list_devices
from .dns import get_running as get_dns_running
from .dns import get_running_config as get_dns_running_config
from .infiniband import get_info as get_infiniband_info
from .ipv4 import get_info as get_ipv4_info
from .ipv6 import get_info as get_ipv6_info
from .lldp import get_info as get_lldp_info
from .macvlan import get_current_macvlan_type
from .ovs import get_interface_info as get_ovs_interface_info
from .ovs import get_ovs_bridge_info
from .ovs import get_ovsdb_external_ids
from .ovs import has_ovs_capability
from .profiles import NmProfiles
from .profiles import get_all_applied_configs
from .team import get_info as get_team_info
from .team import has_team_capability
from .translator import Nm2Api
from .user import get_info as get_user_info
from .veth import get_current_veth_type
from .wired import get_info as get_wired_info
from .ieee_802_1x import get_802_1x_info
class NetworkManagerPlugin(NmstatePlugin):
def __init__(self):
self._ctx = None
self._checkpoint = None
self.__applied_configs = None
@property
def priority(self):
return NmstatePlugin.DEFAULT_PRIORITY
@property
def name(self):
return "NetworkManager"
def unload(self):
if self._ctx:
self._ctx.clean_up()
self._ctx = None
@property
def _applied_configs(self):
if self.__applied_configs is None:
self.__applied_configs = get_all_applied_configs(self.context)
return self.__applied_configs
@property
def checkpoint(self):
return self._checkpoint
@property
def client(self):
return self.context.client if self.context else None
@property
def context(self):
if not self._ctx:
self._ctx = NmContext()
self._check_version_mismatch()
return self._ctx
@property
def capabilities(self):
capabilities = []
if has_ovs_capability(self.client) and is_ovs_running():
capabilities.append(NmstatePlugin.OVS_CAPABILITY)
if has_team_capability(self.client):
capabilities.append(NmstatePlugin.TEAM_CAPABILITY)
return capabilities
@property
def plugin_capabilities(self):
return [
NmstatePlugin.PLUGIN_CAPABILITY_IFACE,
NmstatePlugin.PLUGIN_CAPABILITY_ROUTE,
NmstatePlugin.PLUGIN_CAPABILITY_ROUTE_RULE,
NmstatePlugin.PLUGIN_CAPABILITY_DNS,
]
def get_interfaces(self):
info = []
applied_configs = self._applied_configs
devices_info = [
(dev, get_device_common_info(dev))
for dev in list_devices(self.client)
]
for dev, devinfo in devices_info:
if not dev.get_managed():
# Skip unmanaged interface
continue
nm_ac = dev.get_active_connection()
if (
nm_ac
and nm_ac.get_state_flags() & NM.ActivationStateFlags.EXTERNAL
):
# Skip external managed interface
continue
iface_info = Nm2Api.get_common_device_info(devinfo)
applied_config = applied_configs.get(iface_info[Interface.NAME])
act_con = dev.get_active_connection()
iface_info[Interface.IPV4] = get_ipv4_info(act_con, applied_config)
iface_info[Interface.IPV6] = get_ipv6_info(act_con, applied_config)
iface_info.update(get_wired_info(dev))
iface_info.update(get_user_info(self.context, dev))
iface_info.update(get_lldp_info(self.client, dev))
iface_info.update(get_team_info(dev))
iface_info.update(get_infiniband_info(applied_config))
iface_info.update(get_current_macvlan_type(applied_config))
iface_info.update(get_current_veth_type(applied_config))
iface_info.update(get_802_1x_info(self.context, act_con))
if iface_info[Interface.TYPE] == InterfaceType.OVS_BRIDGE:
iface_info.update(get_ovs_bridge_info(dev))
iface_info = _remove_ovs_bridge_unsupported_entries(iface_info)
elif iface_info[Interface.TYPE] == InterfaceType.OVS_INTERFACE:
iface_info.update(get_ovs_interface_info(act_con))
elif iface_info[Interface.TYPE] == InterfaceType.OVS_PORT:
continue
if applied_config:
iface_info.update(get_ovsdb_external_ids(applied_config))
info.append(iface_info)
info.sort(key=itemgetter("name"))
return info
def get_running_config_interfaces(self):
iface_infos = self.get_interfaces()
# Remove LLDP neighber information
for iface_info in iface_infos:
if LLDP.CONFIG_SUBTREE in iface_info:
iface_info[LLDP.CONFIG_SUBTREE].pop(
LLDP.NEIGHBORS_SUBTREE, None
)
return iface_infos
def get_routes(self):
return {}
def get_route_rules(self):
"""
Nispor will provide running config of route rule from kernel.
"""
return {}
def get_dns_client_config(self):
return {
DNS.RUNNING: get_dns_running(self.client),
DNS.CONFIG: get_dns_running_config(self._applied_configs),
}
def refresh_content(self):
self.__applied_configs = None
def apply_changes(self, net_state, save_to_disk):
NmProfiles(self.context).apply_config(net_state, save_to_disk)
def _load_checkpoint(self, checkpoint_path):
if checkpoint_path:
if self._checkpoint:
# Old checkpoint might timeout, hence it's legal to load
# another one.
self._checkpoint.clean_up()
candidates = get_checkpoints(self.client)
if checkpoint_path in candidates:
self._checkpoint = CheckPoint(
nm_context=self.context, dbuspath=checkpoint_path
)
else:
raise NmstateValueError("No checkpoint specified or found")
else:
if not self._checkpoint:
# Get latest one
candidates = get_checkpoints(self.client)
if candidates:
self._checkpoint = CheckPoint(
nm_context=self.context, dbuspath=candidates[0]
)
else:
raise NmstateValueError("No checkpoint specified or found")
def create_checkpoint(self, timeout=60):
self._checkpoint = CheckPoint.create(self.context, timeout)
return str(self._checkpoint)
def rollback_checkpoint(self, checkpoint=None):
self._load_checkpoint(checkpoint)
self._checkpoint.rollback()
self._checkpoint = None
def destroy_checkpoint(self, checkpoint=None):
self._load_checkpoint(checkpoint)
self._checkpoint.destroy()
self._checkpoint = None
def _check_version_mismatch(self):
nm_client_version = self.client.get_version()
nm_utils_version = _nm_utils_decode_version()
if nm_client_version is None:
raise NmstateDependencyError(
"NetworkManager daemon is not running which is required for "
"NetworkManager plugin"
)
elif StrictVersion(nm_client_version) != StrictVersion(
nm_utils_version
):
logging.warning(
"libnm version %s mismatches NetworkManager version %s",
nm_utils_version,
nm_client_version,
)
logging.debug(f"NetworkManager version {nm_client_version}")
def generate_configurations(self, net_state):
if not hasattr(NM, "keyfile_write"):
raise NmstateNotSupportedError(
f"Current NetworkManager version does not support generating "
"configurations, please upgrade to 1.30 or later versoin."
)
return NmProfiles(None).generate_config_strings(net_state)
def get_ignored_kernel_interface_names(self):
"""
Return a list of unmanged kernel interface names.
"""
ignored_ifaces = set()
for nm_dev in list_devices(self.client):
if (
nm_dev
and nm_dev.get_iface()
and not nm_dev.get_managed()
and _is_kernel_iface(nm_dev)
):
ignored_ifaces.add(nm_dev.get_iface())
return list(ignored_ifaces)
def _remove_ovs_bridge_unsupported_entries(iface_info):
"""
OVS bridges are not supporting several common interface key entries.
These entries are removed explicitly.
"""
iface_info.pop(Interface.IPV4, None)
iface_info.pop(Interface.IPV6, None)
iface_info.pop(Interface.MTU, None)
return iface_info
def _nm_utils_decode_version():
return f"{NM.MAJOR_VERSION}.{NM.MINOR_VERSION}.{NM.MICRO_VERSION}"
def _is_kernel_iface(nm_dev):
iface_type = get_iface_type(nm_dev)
return iface_type != InterfaceType.UNKNOWN and iface_type not in (
InterfaceType.OVS_BRIDGE,
InterfaceType.OVS_INTERFACE,
InterfaceType.OVS_PORT,
)