#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import logging
from operator import attrgetter
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceIP
from libnmstate.schema import InterfaceIPv6
from libnmstate.schema import InterfaceState
from libnmstate.schema import InterfaceType
DEFAULT_MAC_ADDRESS = "00:00:00:00:00:00"
class NisporPluginBaseIface:
def __init__(self, np_iface):
self._np_iface = np_iface
@property
def np_iface(self):
return self._np_iface
@property
def mac(self):
if self._np_iface.mac_address:
return self._np_iface.mac_address.upper()
return DEFAULT_MAC_ADDRESS
@property
def mtu(self):
return self._np_iface.mtu
@property
def type(self):
return InterfaceType.UNKNOWN
@property
def state(self):
np_state = self._np_iface.state
np_flags = self._np_iface.flags
if np_state == "up" or "up" in np_flags or "running" in np_flags:
return InterfaceState.UP
elif np_state == "down":
return InterfaceState.DOWN
else:
logging.debug(
f"Got unexpect nispor interface state {np_state} for "
f"{self._np_iface}"
)
return InterfaceState.DOWN
def _ip_info(self, config_only):
return {
Interface.IPV4: NisporPlugintIpState(
Interface.IPV4, self.np_iface.ipv4
).to_dict(config_only),
Interface.IPV6: NisporPlugintIpState(
Interface.IPV6, self.np_iface.ipv6
).to_dict(config_only),
}
def to_dict(self, config_only):
iface_info = {
Interface.NAME: self.np_iface.name,
Interface.TYPE: self.type,
Interface.STATE: self.state,
Interface.MAC: self.mac,
}
if self.mtu:
iface_info[Interface.MTU] = self.mtu
ip_info = self._ip_info(config_only)
if ip_info:
iface_info.update(ip_info)
return iface_info
class NisporPlugintIpState:
def __init__(self, family, np_ip_state):
self._family = family
self._np_ip_state = np_ip_state
self._addresses = []
if np_ip_state:
self._addresses = sorted(
np_ip_state.addresses, key=attrgetter("address")
)
@property
def _is_ipv6(self):
return self._family == Interface.IPV6
def _has_dhcp_address(self):
return any(
_is_dhcp_addr(addr, self._is_ipv6) for addr in self._addresses
)
def _has_autoconf_address(self):
return self._is_ipv6 and any(
_is_autoconf_addr(addr) for addr in self._addresses
)
def to_dict(self, config_only):
if not self._addresses or not self._np_ip_state:
return {InterfaceIP.ENABLED: False, InterfaceIP.ADDRESS: []}
else:
if config_only:
addresses = [
addr
for addr in self._addresses
if not _is_autoconf_addr(addr)
and not _is_dhcp_addr(addr, self._is_ipv6)
]
else:
addresses = self._addresses
info = {
InterfaceIP.ENABLED: True,
InterfaceIP.ADDRESS: [
{
InterfaceIP.ADDRESS_IP: addr.address,
InterfaceIP.ADDRESS_PREFIX_LENGTH: addr.prefix_len,
}
for addr in addresses
],
}
if self._has_dhcp_address():
info[InterfaceIP.DHCP] = True
if self._has_autoconf_address():
info[InterfaceIPv6.AUTOCONF] = True
return info
def _is_dhcp_addr(np_addr, is_ipv6):
if is_ipv6:
return np_addr.valid_lft != "forever" and np_addr.prefix_len == 128
else:
return np_addr.valid_lft != "forever"
def _is_autoconf_addr(np_addr):
return np_addr.valid_lft != "forever" and np_addr.prefix_len == 64