# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved. See COPYING file

from tests.rdmacm_utils import sync_traffic, async_traffic, \
    async_traffic_with_ext_qp
from pyverbs.pyverbs_error import PyverbsError
from tests.base import RDMATestCase
import multiprocessing as mp
import pyverbs.device as d
import subprocess
import unittest
import json

# Number of processes taking part in one traffic test. It sizes the barrier
# the processes synchronize on and is the number of results read back from
# the notifier queue.
NUM_OF_PROCESSES = 2


class CMTestCase(RDMATestCase):
    """
    RDMA-CM traffic tests run between two forked processes on one host.

    setUp() resolves an IP address for the RDMA device under test and skips
    the test when no such address can be found.
    """

    # Errors meaning "no usable address for this device":
    #   KeyError / IndexError - the JSON printed by `ip` lacks the expected
    #                           entry (e.g. an empty 'addr_info' list).
    #   CalledProcessError    - `ls` or `ip` exited with a non-zero status
    #                           (e.g. no net directory / unknown interface).
    _NO_ADDRESS_ERRORS = (KeyError, IndexError, subprocess.CalledProcessError)

    def setUp(self):
        """
        Pick the device and IP address to run the tests with.

        If self.dev_name is already set, only that device is considered.
        Otherwise the first device in the device list for which an IP address
        can be resolved is selected and stored in self.dev_name.
        :raises unittest.SkipTest: If no IP address could be resolved.
        """
        if self.dev_name is not None:
            try:
                net_name = self.get_net_name(self.dev_name)
                self.ip_addr = self.get_ip_address(net_name)
            except self._NO_ADDRESS_ERRORS:
                raise unittest.SkipTest('Device {} doesn\'t have net interface'
                                        .format(self.dev_name))
        else:
            for dev in d.get_device_list():
                name = dev.name.decode()
                try:
                    self.ip_addr = self.get_ip_address(self.get_net_name(name))
                except self._NO_ADDRESS_ERRORS:
                    # This device is unusable, try the next one.
                    continue
                self.dev_name = name
                break
            if self.dev_name is None:
                raise unittest.SkipTest('No devices with net interface')
        super().setUp()

    @staticmethod
    def get_net_name(dev):
        """
        Get the name of the first net interface listed for an RDMA device.
        :param dev: RDMA device name, as it appears under
                    /sys/class/infiniband/
        :return: First line of the `ls` output for the device's net directory
                 (an empty string if the directory is empty)
        :raises subprocess.CalledProcessError: If `ls` fails, e.g. when the
                                               directory doesn't exist
        """
        out = subprocess.check_output(['ls', '/sys/class/infiniband/{}/device/net/'
                                      .format(dev)])
        return out.decode().split('\n')[0]

    @staticmethod
    def get_ip_address(ifname):
        """
        Get an IP address of a net interface using `ip -j addr show`.
        :param ifname: Net interface name
        :return: The 'local' field of the first 'addr_info' entry of the first
                 object in the JSON output. If it contains 'fe80::', the
                 interface name is appended as '%<ifname>'.
        :raises IndexError, KeyError: If the output has no such entry
        :raises subprocess.CalledProcessError: If `ip` fails
        """
        out = subprocess.check_output(['ip', '-j', 'addr', 'show', ifname])
        loaded_json = json.loads(out.decode())
        address = loaded_json[0]['addr_info'][0]['local']
        if 'fe80::' in address:
            address = address + '%' + ifname
        return address

    @staticmethod
    def two_nodes_rdmacm_traffic(ip_addr, traffic_func):
        """
        Run traffic_func in two forked processes and collect their results.

        Both processes get the same IP address, a shared barrier and a shared
        queue; the last argument is True for the passive process and False for
        the active one. One result per process is read from the queue, where
        None means success and anything else is reported as an error.
        :param ip_addr: IP address passed to both processes
        :param traffic_func: Callable invoked as
                             traffic_func(ip_addr, syncer, notifier, passive)
        :raises PyverbsError: With the first non-None result read from the
                              queue
        """
        ctx = mp.get_context('fork')
        syncer = ctx.Barrier(NUM_OF_PROCESSES, timeout=5)
        notifier = ctx.Queue()
        passive = ctx.Process(target=traffic_func,
                              args=[ip_addr, syncer, notifier, True])
        active = ctx.Process(target=traffic_func,
                             args=[ip_addr, syncer, notifier, False])
        passive.start()
        active.start()

        # Queue.get() blocks until a result is available, so there is no need
        # to spin on Queue.empty() before reading.
        for _ in range(NUM_OF_PROCESSES):
            res = notifier.get()
            if res is not None:
                passive.terminate()
                active.terminate()
                # Reap the terminated children so they don't stay as zombies.
                passive.join()
                active.join()
                raise PyverbsError(res)

        passive.join()
        active.join()

    # Each test below runs one traffic flavour from tests.rdmacm_utils over
    # the address resolved in setUp().
    def test_rdmacm_sync_traffic(self):
        self.two_nodes_rdmacm_traffic(self.ip_addr, sync_traffic)

    def test_rdmacm_async_traffic(self):
        self.two_nodes_rdmacm_traffic(self.ip_addr, async_traffic)

    def test_rdmacm_async_traffic_external_qp(self):
        self.two_nodes_rdmacm_traffic(self.ip_addr, async_traffic_with_ext_qp)