Blob Blame History Raw
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved.  See COPYING file
"""
Provide some useful helper functions for pyverbs' rdmacm tests.
"""
from tests.utils import validate, poll_cq, get_send_element, get_recv_wr
from pyverbs.pyverbs_error import PyverbsError
from tests.base import CMResources
from pyverbs.cmid import CMEvent
import pyverbs.cm_enums as ce
import os

# Readable names for RDMA CM event types that event_handler() has no dedicated
# branch for. Receiving one of these makes it raise a descriptive
# 'Unexpected event - <name>' error instead of the generic
# 'not supported' one.
events_dict = dict([
    (ce.RDMA_CM_EVENT_ADDR_ERROR, 'Resolve Address Error'),
    (ce.RDMA_CM_EVENT_ROUTE_ERROR, 'Resolve Route Error'),
    (ce.RDMA_CM_EVENT_CONNECT_ERROR, 'Connection Error'),
    (ce.RDMA_CM_EVENT_UNREACHABLE, 'Node is Unreachable'),
    (ce.RDMA_CM_EVENT_REJECTED, 'Connection Rejected'),
    (ce.RDMA_CM_EVENT_DEVICE_REMOVAL, 'Device Removal'),
    (ce.RDMA_CM_EVENT_MULTICAST_JOIN, 'Multicast Join'),
    (ce.RDMA_CM_EVENT_MULTICAST_ERROR, 'Multicast Error'),
    (ce.RDMA_CM_EVENT_ADDR_CHANGE, 'Address Change'),
    (ce.RDMA_CM_EVENT_TIMEWAIT_EXIT, 'Time wait Exit'),
])


def _server_traffic_with_ext_qp(agr_obj, syncer):
    """
    Passive-side traffic that drives the external QP (agr_obj.qp) directly.

    One receive is posted before the initial barrier. The active side also
    waits on that barrier before its first send. Each of the
    <agr_obj.num_msgs> iterations then does the following:
    - polls agr_obj.cq for the incoming message;
    - re-posts the receive;
    - validates the payload at offset 0 of agr_obj.mr;
    - posts a reply and polls agr_obj.cq again for it.
    """
    wr_recv = get_recv_wr(agr_obj)
    agr_obj.qp.post_recv(wr_recv)
    syncer.wait()
    for _msg_idx in range(agr_obj.num_msgs):
        # Expected: completion of the receive posted earlier.
        poll_cq(agr_obj.cq)
        # Re-arm the receive for the next incoming message.
        agr_obj.qp.post_recv(wr_recv)
        validate(agr_obj.mr.read(agr_obj.msg_size, 0), agr_obj.is_server,
                 agr_obj.msg_size)
        send_elements = get_send_element(agr_obj, agr_obj.is_server)
        agr_obj.qp.post_send(send_elements[0])
        # Expected: completion of the reply just posted.
        poll_cq(agr_obj.cq)


def server_traffic(agr_obj, syncer):
    """
    Passive-side (server) RDMACM traffic.

    Runs <agr_obj.num_msgs> rounds. Each round receives the client's message
    into agr_obj.mr, validates it, and sends back a reply made of 's'
    characters. If agr_obj.with_ext_qp is set, the traffic goes through the
    external QP (agr_obj.qp) instead of the connected CM ID.
    :param agr_obj: Aggregation object which contains all necessary resources
    :param syncer: multiprocessing.Barrier object for processes synchronization
    :return: None
    """
    if not agr_obj.with_ext_qp:
        reply = agr_obj.msg_size * 's'
        conn_id = agr_obj.child_id
        for _round in range(agr_obj.num_msgs):
            conn_id.post_recv(agr_obj.mr)
            # First barrier: our receive is posted before the client sends.
            # Second barrier: the client has finished its send.
            syncer.wait()
            syncer.wait()
            conn_id.get_recv_comp()
            received = agr_obj.mr.read(agr_obj.msg_size, 0)
            validate(received, agr_obj.is_server, agr_obj.msg_size)
            agr_obj.mr.write(reply, agr_obj.msg_size)
            conn_id.post_send(agr_obj.mr)
            conn_id.get_send_comp()
            # Pairs with the client's wait after it posts its receive.
            syncer.wait()
        return None
    return _server_traffic_with_ext_qp(agr_obj, syncer)


def _client_traffic_with_ext_qp(agr_obj, syncer):
    """
    Active-side traffic that drives the external QP (agr_obj.qp) directly.

    After one barrier with the passive side, each of the <agr_obj.num_msgs>
    iterations does the following:
    - posts a send and polls agr_obj.cq for it;
    - posts a receive and polls agr_obj.cq for the reply;
    - validates the payload at offset 0 of agr_obj.mr.
    """
    wr_recv = get_recv_wr(agr_obj)
    syncer.wait()
    for _msg_idx in range(agr_obj.num_msgs):
        wr_send = get_send_element(agr_obj, agr_obj.is_server)[0]
        agr_obj.qp.post_send(wr_send)
        # Expected: completion of the send above.
        poll_cq(agr_obj.cq)
        agr_obj.qp.post_recv(wr_recv)
        # Expected: completion of the receive carrying the server's reply.
        poll_cq(agr_obj.cq)
        reply = agr_obj.mr.read(agr_obj.msg_size, 0)
        validate(reply, agr_obj.is_server, agr_obj.msg_size)


def client_traffic(agr_obj, syncer):
    """
    Active-side (client) RDMACM traffic.

    Runs <agr_obj.num_msgs> rounds. Each round sends a message made of 'c'
    characters, then receives the server's reply into agr_obj.mr and
    validates it. If agr_obj.with_ext_qp is set, the traffic goes through the
    external QP (agr_obj.qp) instead of the CM ID.
    :param agr_obj: Aggregation object which contains all necessary resources
    :param syncer: multiprocessing.Barrier object for processes synchronization
    :return: None
    """
    if not agr_obj.with_ext_qp:
        payload = agr_obj.msg_size * 'c'
        conn_id = agr_obj.cmid
        for _round in range(agr_obj.num_msgs):
            agr_obj.mr.write(payload, agr_obj.msg_size)
            # Once past this barrier, the server has posted its receive.
            syncer.wait()
            conn_id.post_send(agr_obj.mr)
            conn_id.get_send_comp()
            # Let the server know our send has completed.
            syncer.wait()
            conn_id.post_recv(agr_obj.mr)
            syncer.wait()
            conn_id.get_recv_comp()
            validate(agr_obj.mr.read(agr_obj.msg_size, 0), agr_obj.is_server,
                     agr_obj.msg_size)
        return None
    return _client_traffic_with_ext_qp(agr_obj, syncer)


def event_handler(agr_obj):
    """
    Handle one RDMACM event of asynchronous communication.

    The event is read from agr_obj.cmid.event_channel, the API call it
    requires is executed, and the event is then acknowledged.
    The event is also acknowledged when handling fails, including when the
    event is unexpected or unsupported. librdmacm requires every event
    obtained with rdma_get_cm_event() to be released with
    rdma_ack_cm_event(). It also requires related events to be acknowledged
    before the CM ID is destroyed. If that acknowledgement fails on the error
    path, the original exception is still the one propagated.
    :param agr_obj: Aggregation object which contains all necessary resources
    :raises PyverbsError: If the event type is not handled here
    :return: None
    """
    cm_event = CMEvent(agr_obj.cmid.event_channel)
    try:
        _dispatch_cm_event(agr_obj, cm_event)
    except Exception:
        try:
            cm_event.ack_cm_event()
        except PyverbsError:
            # Keep the handler's error as the reported root cause.
            pass
        raise
    cm_event.ack_cm_event()


def _dispatch_cm_event(agr_obj, cm_event):
    # Run the RDMACM action matching cm_event's type (does not ack the event).
    event_type = cm_event.event_type
    if event_type == ce.RDMA_CM_EVENT_ADDR_RESOLVED:
        agr_obj.cmid.resolve_route()
    elif event_type == ce.RDMA_CM_EVENT_ROUTE_RESOLVED:
        agr_obj.create_qp()
        param = agr_obj.create_conn_param()
        if agr_obj.with_ext_qp:
            # Tell the peer which (external) QP number to talk to.
            param.qpn = agr_obj.qp.qp_num
        agr_obj.cmid.connect(param)
    elif event_type == ce.RDMA_CM_EVENT_CONNECT_REQUEST:
        agr_obj.create_child_id(cm_event)
        param = agr_obj.create_conn_param()
        agr_obj.create_qp()
        if agr_obj.with_ext_qp:
            agr_obj.modify_ext_qp_to_rts()
            param.qpn = agr_obj.qp.qp_num
        agr_obj.child_id.accept(param)
    elif event_type == ce.RDMA_CM_EVENT_ESTABLISHED:
        agr_obj.connected = True
    elif event_type == ce.RDMA_CM_EVENT_CONNECT_RESPONSE:
        agr_obj.connected = True
        if agr_obj.with_ext_qp:
            agr_obj.modify_ext_qp_to_rts()
        agr_obj.cmid.establish()
    elif event_type == ce.RDMA_CM_EVENT_DISCONNECTED:
        if agr_obj.is_server:
            agr_obj.child_id.disconnect()
            agr_obj.connected = False
        else:
            agr_obj.cmid.disconnect()
            agr_obj.connected = False
    elif event_type in events_dict:
        raise PyverbsError('Unexpected event - {}'.format(
                           events_dict[event_type]))
    else:
        raise PyverbsError('The event {} is not supported'.format(
                           event_type))


def sync_traffic(addr, syncer, notifier, is_server):
    """
    RDMACM synchronous control and data path.

    Establishes a connection with RDMACM's synchronous API, runs the
    synchronous traffic, and disconnects. The outcome is reported through
    the notifier: None on success, or a message describing the exception
    that was caught.
    :param addr: Address to connect to and to bind to
    :param syncer: multiprocessing.Barrier object for processes synchronization
    :param notifier: Notify parent process about any exceptions or success
    :param is_server: A flag which indicates if this is a server or client
    :return: None
    """
    try:
        if not is_server:
            client = CMResources(dst=addr)
            # Once past this barrier, the server side has called listen().
            syncer.wait()
            client.cmid.connect()
            client.create_mr()
            client_traffic(client, syncer)
            client.cmid.disconnect()
        else:
            server = CMResources(src=addr)
            server.cmid.listen()
            syncer.wait()
            server.create_child_id()
            server.child_id.accept()
            server.create_mr()
            server_traffic(server, syncer)
            server.child_id.disconnect()
    except Exception as err:
        if is_server:
            role = 'passive'
        else:
            role = 'active'
        template = ('Caught exception in {side} side process: pid {pid}\n'
                    'Exception message: {ex}')
        notifier.put(template.format(side=role, pid=os.getpid(),
                                     ex=str(err)))
    else:
        notifier.put(None)


def async_traffic_with_ext_qp(addr, syncer, notifier, is_server):
    """
    Same as async_traffic(), but with with_ext_qp=True. An external RC QP is
    created and used by RDMACM instead of a CM-managed one.
    """
    return async_traffic(addr, syncer, notifier, is_server, with_ext_qp=True)


def async_traffic(addr, syncer, notifier, is_server, with_ext_qp=False):
    """
    RDMACM asynchronous control and data path.

    Establishes a connection through the RDMACM events API (event_handler()
    is driven until the connection is marked as connected), then executes
    RDMACM traffic. The outcome is reported through the notifier: None on
    success, or a message describing the exception that was caught.
    :param addr: Address to connect to and to bind to
    :param syncer: multiprocessing.Barrier object for processes synchronization
    :param notifier: Notify parent process about any exceptions or success
    :param is_server: A flag which indicates if this is a server or not
    :param with_ext_qp: If set, an external RC QP will be created and used by
                        RDMACM (default: False)
    :return: None
    """
    try:
        if is_server:
            server = CMResources(src=addr, is_async=True,
                                 with_ext_qp=with_ext_qp)
            listen_id = server.cmid
            listen_id.bind_addr(server.ai)
            listen_id.listen()
            syncer.wait()
            # Process CM events until event_handler() marks us connected.
            while not server.connected:
                event_handler(server)
            server.create_mr()
            server_traffic(server, syncer)
            server.child_id.disconnect()
        else:
            client = CMResources(src=addr, dst=addr, is_async=True,
                                 with_ext_qp=with_ext_qp)
            # Named client_id (not `id`) to avoid shadowing the builtin.
            client_id = client.cmid
            client_id.resolve_addr(client.ai)
            syncer.wait()
            # Process CM events until event_handler() marks us connected.
            while not client.connected:
                event_handler(client)
            client.create_mr()
            client_traffic(client, syncer)
            # Consume one more CM event. Presumably this is
            # RDMA_CM_EVENT_DISCONNECTED, raised when the passive side calls
            # disconnect() after its traffic.
            event_handler(client)
    except Exception as ex:
        side = 'passive' if is_server else 'active'
        notifier.put('Caught exception in {side} side process: pid {pid}\n'
                     .format(side=side, pid=os.getpid()) +
                     'Exception message: {ex}'.format(ex=str(ex)))
    else:
        notifier.put(None)