#
# Copyright (c) 2019-2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import re
import subprocess
from libnmstate.error import NmstateNotSupportedError
from libnmstate.schema import Ethernet
from libnmstate.schema import Interface
from .common import NM
from .common import GLib
SRIOV_NMSTATE_TO_NM_MAP = {
Ethernet.SRIOV.VFS.MAC_ADDRESS: (
NM.SRIOV_VF_ATTRIBUTE_MAC,
GLib.Variant.new_string,
),
Ethernet.SRIOV.VFS.SPOOF_CHECK: (
NM.SRIOV_VF_ATTRIBUTE_SPOOF_CHECK,
GLib.Variant.new_boolean,
),
Ethernet.SRIOV.VFS.TRUST: (
NM.SRIOV_VF_ATTRIBUTE_TRUST,
GLib.Variant.new_boolean,
),
Ethernet.SRIOV.VFS.MIN_TX_RATE: (
NM.SRIOV_VF_ATTRIBUTE_MIN_TX_RATE,
GLib.Variant.new_uint32,
),
Ethernet.SRIOV.VFS.MAX_TX_RATE: (
NM.SRIOV_VF_ATTRIBUTE_MAX_TX_RATE,
GLib.Variant.new_uint32,
),
}
SRIOV_NMSTATE_TO_REGEX = {
Ethernet.SRIOV.VFS.MAC_ADDRESS: re.compile(
r"[a-fA-F0-9:]{17}|[a-fA-F0-9]{12}"
),
Ethernet.SRIOV.VFS.SPOOF_CHECK: re.compile(r"checking (on|off)"),
Ethernet.SRIOV.VFS.TRUST: re.compile(r"trust (on|off)"),
Ethernet.SRIOV.VFS.MIN_TX_RATE: re.compile(r"min_tx_rate ([0-9]+)"),
Ethernet.SRIOV.VFS.MAX_TX_RATE: re.compile(r"max_tx_rate ([0-9]+)"),
}
def create_setting(context, iface_state, base_con_profile):
sriov_setting = None
ifname = iface_state[Interface.NAME]
sriov_config = iface_state.get(Ethernet.CONFIG_SUBTREE, {}).get(
Ethernet.SRIOV_SUBTREE
)
if sriov_config:
if not _has_sriov_capability(context, ifname):
raise NmstateNotSupportedError(
f"Interface '{ifname}' does not support SR-IOV"
)
sriov_setting = base_con_profile.get_setting_duplicate(
NM.SETTING_SRIOV_SETTING_NAME
)
if not sriov_setting:
sriov_setting = NM.SettingSriov.new()
vfs_config = sriov_config.get(Ethernet.SRIOV.VFS_SUBTREE, [])
vf_object_ids = {vf.get_index() for vf in sriov_setting.props.vfs}
vf_config_ids = {
vf_config[Ethernet.SRIOV.VFS.ID] for vf_config in vfs_config
}
# As the user must do full edit of vfs, nmstate is deleting all the vfs
# and then adding all the vfs from the config.
for vf_id in _remove_sriov_vfs_in_setting(
vfs_config, sriov_setting, vf_object_ids
):
sriov_setting.remove_vf_by_index(vf_id)
for vf_object in _create_sriov_vfs_from_config(
vfs_config, sriov_setting, vf_config_ids
):
sriov_setting.add_vf(vf_object)
sriov_setting.props.total_vfs = sriov_config[Ethernet.SRIOV.TOTAL_VFS]
return sriov_setting
def _create_sriov_vfs_from_config(vfs_config, sriov_setting, vf_ids_to_add):
vfs_config_to_add = (
vf_config
for vf_config in vfs_config
if vf_config[Ethernet.SRIOV.VFS.ID] in vf_ids_to_add
)
for vf_config in vfs_config_to_add:
vf_id = vf_config.pop(Ethernet.SRIOV.VFS.ID)
vf_object = NM.SriovVF.new(vf_id)
for key, val in vf_config.items():
_set_nm_attribute(vf_object, key, val)
yield vf_object
def _set_nm_attribute(vf_object, key, value):
nm_attr, nm_variant = SRIOV_NMSTATE_TO_NM_MAP[key]
vf_object.set_attribute(nm_attr, nm_variant(value))
def _remove_sriov_vfs_in_setting(vfs_config, sriov_setting, vf_ids_to_remove):
for vf_id in vf_ids_to_remove:
yield vf_id
def _has_sriov_capability(context, ifname):
dev = context.get_nm_dev(ifname)
return dev and (NM.DeviceCapabilities.SRIOV & dev.props.capabilities)
def get_info(device):
"""
Provide the current active SR-IOV runtime values
"""
sriov_running_info = {}
ifname = device.get_iface()
numvf_path = f"/sys/class/net/{ifname}/device/sriov_numvfs"
try:
with open(numvf_path) as f:
sriov_running_info[Ethernet.SRIOV.TOTAL_VFS] = int(f.read())
except FileNotFoundError:
return sriov_running_info
if sriov_running_info[Ethernet.SRIOV.TOTAL_VFS]:
sriov_running_info[Ethernet.SRIOV.VFS_SUBTREE] = _get_sriov_vfs_info(
ifname
)
else:
sriov_running_info[Ethernet.SRIOV.VFS_SUBTREE] = []
return {Ethernet.SRIOV_SUBTREE: sriov_running_info}
def _get_sriov_vfs_info(ifname):
"""
This is a workaround to get the VFs configuration from runtime.
Ref: https://bugzilla.redhat.com/1777520
"""
proc = subprocess.run(
("ip", "link", "show", ifname),
stdout=subprocess.PIPE,
encoding="utf-8",
)
iplink_output = proc.stdout
# This is ignoring the first two line of the ip link output because they
# are about the PF and we don't need them.
vfs = iplink_output.splitlines(False)[2:]
vfs_config = [
vf_config for vf_config in _parse_ip_link_output_for_vfs(vfs)
]
return vfs_config
def _parse_ip_link_output_for_vfs(vfs):
for vf_id, vf in enumerate(vfs):
vf_config = _parse_ip_link_output_options_for_vf(vf)
vf_config[Ethernet.SRIOV.VFS.ID] = vf_id
yield vf_config
def _parse_ip_link_output_options_for_vf(vf):
vf_options = {}
for option, expr in SRIOV_NMSTATE_TO_REGEX.items():
match_expr = expr.search(vf)
if match_expr:
if option == Ethernet.SRIOV.VFS.MAC_ADDRESS:
value = match_expr.group(0).upper()
else:
value = match_expr.group(1)
if value.isdigit():
value = int(value)
elif value == "on":
value = True
elif value == "off":
value = False
vf_options[option] = value
return vf_options