#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from contextlib import contextmanager
import os
import time
import yaml
import pytest
import libnmstate
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceState
from libnmstate.schema import LLDP
from .testlib import assertlib
from .testlib import cmdlib
from .testlib import ifacelib
from .testlib import statelib
from .testlib.veth import create_veth_pair
from .testlib.veth import remove_veth_pair
LLDPTEST = "lldptest"
LLDPTEST_PEER = "lldptest.peer"
LLDP_TEST_NS = "nmstate_lldp_test"
LLDP_SYSTEM_DESC = (
"Summit300-48 - Version 7.4e.1 (Build 5) by Release_Master "
"05/27/05 04:53:11"
)
EXPECTED_LLDP_NEIGHBOR = """
- system-name: Summit300-48
type: 5
- system-description: Summit300-48 - Version 7.4e.1 (Build 5) by Release_Master
05/27/05 04:53:11
type: 6
- system-capabilities:
- MAC Bridge component
- Router
type: 7
- _description: MAC address
chassis-id: 00:01:30:F9:AD:A0
chassis-id-type: 4
type: 1
- _description: Interface name
port-id: 1/1
port-id-type: 5
type: 2
- ieee-802-1-vlans:
- name: v2-0488-03-0505
vid: 488
oui: 00:80:c2
subtype: 3
type: 127
- ieee-802-3-mac-phy-conf:
autoneg: true
operational-mau-type: 16
pmd-autoneg-cap: 27648
oui: 00:12:0f
subtype: 1
type: 127
- ieee-802-1-ppvids:
- 0
oui: 00:80:c2
subtype: 2
type: 127
- management-addresses:
- address: 00:01:30:F9:AD:A0
address-subtype: MAC
interface-number: 1001
interface-number-subtype: 2
type: 8
- ieee-802-3-max-frame-size: 1522
oui: 00:12:0f
subtype: 4
type: 127
"""
LLDP_CAPS = ["MAC Bridge component", "Router"]
CHASSIS_ID = "chassis-id"
CHASSIS_ID_TYPE = "chassis-id-type"
MANAGEMENT_ADDRESSES = "management-addresses"
ADDRESS = "address"
ADDRESS_SUBTYPE = "address-subtype"
INTERFACE_NUMBER = "interface-number"
INTERFACE_NUMBER_SUBTYPE = "interface-number-subtype"
AUTONEG = "autoneg"
OPERATIONAL_MAU_TYPE = "operational-mau-type"
PMD_AUTONEG_CAP = "pmd-autoneg-cap"
PPVIDS_SUBTREE = "ieee-802-1-ppvids"
PPVID = "ppvid"
VLANS_SUBTREE = "ieee-802-1-vlans"
NAME = "name"
PORT_ID = "port-id"
PORT_ID_TYPE = "port-id-type"
SYSTEM_NAME = "system-name"
SYSTEM_DESCRIPTION = "system-description"
SYSTEM_CAPABILITIES = "system-capabilities"
MFS_KEY = "ieee-802-3-max-frame-size"
MAC_PHY_SUBTREE = "ieee-802-3-mac-phy-conf"
LLDP_TEST_SYSTEM_NAME = "Summit300-48"
@pytest.fixture(scope="module")
def lldpiface_env():
try:
create_veth_pair(LLDPTEST, LLDPTEST_PEER, LLDP_TEST_NS)
yield
finally:
remove_veth_pair(LLDPTEST, LLDP_TEST_NS)
@pytest.fixture
def lldptest_up(lldpiface_env):
with ifacelib.iface_up(LLDPTEST) as ifstate:
yield ifstate
def test_enable_lldp(lldptest_up):
with lldp_enabled(lldptest_up) as dstate:
assertlib.assert_state_match(dstate)
def test_lldp_yaml(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
assert test_neighbor == yaml.safe_load(EXPECTED_LLDP_NEIGHBOR)
def test_lldp_system(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
seen = set()
for tlv in test_neighbor:
seen.add(tlv[LLDP.Neighbors.TLV_TYPE])
if tlv[LLDP.Neighbors.TLV_TYPE] == 5:
assert tlv[SYSTEM_NAME] == "Summit300-48"
elif tlv[LLDP.Neighbors.TLV_TYPE] == 6:
assert tlv[SYSTEM_DESCRIPTION] == LLDP_SYSTEM_DESC
elif tlv[LLDP.Neighbors.TLV_TYPE] == 7:
assert tlv[SYSTEM_CAPABILITIES] == LLDP_CAPS
assert set([5, 6, 7]).issubset(seen)
def test_lldp_chassis(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 1, test_neighbor
)
)
assert len(tlvs) == 1
assert tlvs[0][CHASSIS_ID] == "00:01:30:F9:AD:A0"
assert tlvs[0][CHASSIS_ID_TYPE] == 4
def test_lldp_management_addresses(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 8, test_neighbor
)
)
assert len(tlvs) == 1
test_mngt = tlvs[0][MANAGEMENT_ADDRESSES][0]
assert test_mngt[ADDRESS] == "00:01:30:F9:AD:A0"
assert test_mngt[ADDRESS_SUBTYPE] == "MAC"
assert test_mngt[INTERFACE_NUMBER] == 1001
assert test_mngt[INTERFACE_NUMBER_SUBTYPE] == 2
def test_lldp_macphy(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 127
and tlv[LLDP.Neighbors.ORGANIZATION_CODE] == "00:12:0f"
and tlv[LLDP.Neighbors.TLV_SUBTYPE] == 1,
test_neighbor,
)
)
assert len(tlvs) == 1
assert tlvs[0][MAC_PHY_SUBTREE][AUTONEG] is True
assert tlvs[0][MAC_PHY_SUBTREE][OPERATIONAL_MAU_TYPE] == 16
assert tlvs[0][MAC_PHY_SUBTREE][PMD_AUTONEG_CAP] == 27648
def test_lldp_port(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 2, test_neighbor
)
)
assert len(tlvs) == 1
assert tlvs[0][PORT_ID] == "1/1"
assert tlvs[0][PORT_ID_TYPE] == 5
assert tlvs[0][LLDP.Neighbors.DESCRIPTION] == "Interface name"
def test_lldp_port_vlan(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 127
and tlv[LLDP.Neighbors.ORGANIZATION_CODE] == "00:80:c2"
and tlv[LLDP.Neighbors.TLV_SUBTYPE] == 2,
test_neighbor,
)
)
assert len(tlvs) == 1
assert tlvs[0][PPVIDS_SUBTREE][0] == 0
def test_lldp_vlan(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 127
and tlv[LLDP.Neighbors.ORGANIZATION_CODE] == "00:80:c2"
and tlv[LLDP.Neighbors.TLV_SUBTYPE] == 3,
test_neighbor,
)
)
assert len(tlvs) == 1
test_vlan = tlvs[0][VLANS_SUBTREE][0]
assert test_vlan[NAME] == "v2-0488-03-0505"
def test_lldp_mfs(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
test_neighbor = lldp_config[LLDP.NEIGHBORS_SUBTREE][0]
tlvs = list(
filter(
lambda tlv: tlv[LLDP.Neighbors.TLV_TYPE] == 127
and tlv[LLDP.Neighbors.ORGANIZATION_CODE] == "00:12:0f"
and tlv[LLDP.Neighbors.TLV_SUBTYPE] == 4,
test_neighbor,
)
)
assert len(tlvs) == 1
assert tlvs[0][MFS_KEY] == 1522
def test_lldp_empty_neighbors(lldptest_up):
with lldp_enabled(lldptest_up):
dstate = statelib.show_only((LLDPTEST,))
lldp_state = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert not lldp_state.get(LLDP.NEIGHBORS_SUBTREE, [])
def test_show_running_config_has_no_lldp_neighbor(lldptest_up):
with lldp_enabled(lldptest_up):
_send_lldp_packet()
dstate = statelib.show_only((LLDPTEST,))
lldp_config = dstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
assert len(lldp_config[LLDP.NEIGHBORS_SUBTREE]) == 1
running_config = libnmstate.show_running_config()
for iface_config in running_config[Interface.KEY]:
if iface_config[Interface.NAME] == LLDPTEST:
lldp_iface_config = iface_config
break
assert lldp_iface_config[LLDP.CONFIG_SUBTREE][LLDP.ENABLED]
assert (
LLDP.NEIGHBORS_SUBTREE
not in lldp_iface_config[LLDP.CONFIG_SUBTREE]
)
@contextmanager
def lldp_enabled(ifstate):
lldp_config = ifstate[Interface.KEY][0][LLDP.CONFIG_SUBTREE]
lldp_config[LLDP.ENABLED] = True
libnmstate.apply(ifstate)
try:
yield ifstate
finally:
lldp_config[LLDP.ENABLED] = False
libnmstate.apply(ifstate)
def _send_lldp_packet():
test_dir = os.path.dirname(os.path.realpath(__file__))
cmdlib.exec_cmd(
f"ip netns exec {LLDP_TEST_NS} "
f"tcpreplay --intf1={LLDPTEST_PEER} "
f"{test_dir}/test_captures/lldp.pcap".split(),
check=True,
)
time.sleep(1)
@pytest.fixture
def eth1_up_with_lldp(eth1_up):
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: "eth1",
LLDP.CONFIG_SUBTREE: {LLDP.ENABLED: True},
}
]
}
)
def test_show_saved_config_ethernet_down_with_lldp(eth1_up_with_lldp):
running_state = statelib.show_only(("eth1",))
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: "eth1",
Interface.STATE: InterfaceState.DOWN,
}
]
}
)
saved_state = statelib.show_saved_config_only(("eth1",))
assert saved_state[Interface.KEY][0][LLDP.CONFIG_SUBTREE] == {
LLDP.ENABLED: True
}
assertlib.assert_state_match_full(saved_state, running_state)