# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019, Mellanox Technologies. All rights reserved.
import weakref
from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
from pyverbs.base import PyverbsRDMAErrno
from pyverbs.pd cimport PD, ParentDomain
from pyverbs.base cimport close_weakrefs
cimport pyverbs.libibverbs_enums as e
from pyverbs.device cimport Context
from pyverbs.srq cimport SRQ
from pyverbs.qp cimport QP
cdef class CompChannel(PyverbsCM):
    """
    A completion channel is a file descriptor used to deliver completion
    notifications to a userspace process. When a completion event is generated
    for a CQ, the event is delivered via the completion channel attached to the
    CQ.
    """
    def __init__(self, Context context not None):
        """
        Initializes a completion channel object on the given device.
        :param context: The device's context to use
        :return: A CompChannel object on success
        """
        super().__init__()
        self.cc = v.ibv_create_comp_channel(context.context)
        if self.cc == NULL:
            raise PyverbsRDMAErrno('Failed to create a completion channel')
        self.context = context
        context.add_ref(self)
        # CQs attached to this channel; weak references so that the channel
        # does not keep them alive.
        self.cqs = weakref.WeakSet()
        self.logger.debug('Created a Completion Channel')

    def __dealloc__(self):
        self.close()

    cpdef close(self):
        """
        Closes the CQs registered on this channel and then destroys the
        underlying completion channel. Does nothing if the channel was
        already closed.
        :return: None
        """
        if self.cc != NULL:
            self.logger.debug('Closing completion channel')
            close_weakrefs([self.cqs])
            rc = v.ibv_destroy_comp_channel(self.cc)
            if rc != 0:
                raise PyverbsRDMAError('Failed to destroy a completion channel',
                                       rc)
            self.cc = NULL

    def get_cq_event(self, CQ expected_cq not None):
        """
        Waits for the next completion event in the completion event channel
        :param expected_cq: The CQ that is expected to get the event. Must not
                            be None, since its C-level fields are accessed
                            directly.
        :return: None
        """
        cdef v.ibv_cq *cq
        cdef void *ctx

        rc = v.ibv_get_cq_event(self.cc, &cq, &ctx)
        if rc != 0:
            raise PyverbsRDMAErrno('Failed to get CQ event')
        if cq != expected_cq.cq:
            raise PyverbsRDMAErrno('Received event on an unexpected CQ')
        # Track the unacknowledged event so CQ.close() can ack it later.
        expected_cq.num_events += 1

    cdef add_ref(self, obj):
        """
        Registers a CQ or CQEX that uses this channel. Objects of any other
        type are ignored.
        :param obj: The object to register
        """
        if isinstance(obj, (CQ, CQEX)):
            self.cqs.add(obj)
cdef class CQ(PyverbsCM):
    """
    A Completion Queue is the notification mechanism for work request
    completions. A CQ can have 0 or more associated QPs.
    """
    def __init__(self, Context context not None, cqe, cq_context=None,
                 CompChannel channel=None, comp_vector=0):
        """
        Initializes a CQ object with the given parameters.
        :param context: The device's context on which to open the CQ
        :param cqe: CQ's capacity
        :param cq_context: User context's pointer
        :param channel: If set, will be used to return completion events
        :param comp_vector: Will be used for signaling completion events.
                            Must be larger than or equal to 0 and smaller than
                            the context's num_comp_vectors
        :return: The newly created CQ
        """
        super().__init__()
        if channel is not None:
            self.cq = v.ibv_create_cq(context.context, cqe, <void*>cq_context,
                                      channel.cc, comp_vector)
        else:
            self.cq = v.ibv_create_cq(context.context, cqe, <void*>cq_context,
                                      NULL, comp_vector)
        # Check for failure right after the verbs call: running other code
        # first could overwrite errno, and a CQ that failed to open must not
        # be registered on the channel.
        if self.cq == NULL:
            raise PyverbsRDMAErrno('Failed to create a CQ')
        if channel is not None:
            channel.add_ref(self)
        self.channel = channel
        self.context = context
        context.add_ref(self)
        self.qps = weakref.WeakSet()
        self.srqs = weakref.WeakSet()
        # Number of events received on this CQ that were not acked yet.
        self.num_events = 0
        self.logger.debug('Created a CQ')

    cdef add_ref(self, obj):
        """
        Registers a QP or SRQ that uses this CQ.
        :param obj: The object to register
        """
        if isinstance(obj, QP):
            self.qps.add(obj)
        elif isinstance(obj, SRQ):
            self.srqs.add(obj)
        else:
            raise PyverbsError('Unrecognized object type')

    def __dealloc__(self):
        self.close()

    cpdef close(self):
        """
        Closes the registered QPs and SRQs, acknowledges any outstanding
        events and destroys the CQ. Does nothing if the CQ was already closed.
        :return: None
        """
        if self.cq != NULL:
            self.logger.debug('Closing CQ')
            close_weakrefs([self.qps, self.srqs])
            if self.num_events:
                self.ack_events(self.num_events)
            rc = v.ibv_destroy_cq(self.cq)
            if rc != 0:
                raise PyverbsRDMAError('Failed to close CQ', rc)
            self.cq = NULL
            self.context = None
            self.channel = None

    def poll(self, num_entries=1):
        """
        Polls the CQ for completions.
        :param num_entries: number of completions to pull
        :return: (npolled, wcs): The number of polled completions and an array
                 of the polled completions
        """
        cdef v.ibv_wc wc
        wcs = []
        npolled = 0

        # Completions are pulled one at a time and copied into WC objects.
        while npolled < num_entries:
            rc = v.ibv_poll_cq(self.cq, 1, &wc)
            if rc < 0:
                raise PyverbsRDMAErrno('Failed to poll CQ')
            if rc == 0:
                # The CQ is empty, return what was collected so far.
                break
            npolled += 1
            wcs.append(WC(wr_id=wc.wr_id, status=wc.status, opcode=wc.opcode,
                          vendor_err=wc.vendor_err, byte_len=wc.byte_len,
                          qp_num=wc.qp_num, src_qp=wc.src_qp,
                          imm_data=wc.imm_data, wc_flags=wc.wc_flags,
                          pkey_index=wc.pkey_index, slid=wc.slid, sl=wc.sl,
                          dlid_path_bits=wc.dlid_path_bits))
        return npolled, wcs

    def req_notify(self, solicited_only = False):
        """
        Request completion notification on the completion queue.
        :param solicited_only: If non-zero, notifications will be created only
                               for incoming send / RDMA write WRs with
                               immediate data that have the solicited bit set in
                               their send flags.
        :return: None
        """
        rc = v.ibv_req_notify_cq(self.cq, solicited_only)
        if rc != 0:
            # The return value carries the error code, so report it directly
            # instead of relying on errno.
            raise PyverbsRDMAError('Request notify CQ returned {rc}'.
                                   format(rc=rc), rc)

    def ack_events(self, num_events):
        """
        Get and acknowledge CQ events
        :param num_events: Number of events to acknowledge
        :return: None
        """
        v.ibv_ack_cq_events(self.cq, num_events)
        self.num_events -= num_events

    def __str__(self):
        print_format = '{:22}: {:<20}\n'
        return 'CQ\n' +\
               print_format.format('Handle', self.cq.handle) +\
               print_format.format('CQEs', self.cq.cqe)

    @property
    def comp_channel(self):
        return self.channel
cdef class CqInitAttrEx(PyverbsObject):
    """
    Holds the attributes passed to ibv_create_cq_ex when creating a CQEX,
    together with references to the Python objects (completion channel and
    parent domain) whose C handles are stored in the attributes.
    """
    def __init__(self, cqe = 100, CompChannel channel = None, comp_vector = 0,
                 wc_flags = 0, comp_mask = 0, flags = 0, PD parent_domain = None):
        """
        Initializes a CqInitAttrEx object with the given parameters.
        :param cqe: CQ's capacity
        :param channel: If set, will be used to return completion events
        :param comp_vector: Will be used for signaling completion events.
                            Must be larger than or equal to 0 and smaller than
                            the context's num_comp_vectors
        :param wc_flags: The wc_flags that should be returned in ibv_poll_cq_ex.
                         Or'ed bit of enum ibv_wc_flags_ex.
        :param comp_mask: compatibility mask (extended verb)
        :param flags: create cq attr flags - one or more flags from
                      ibv_create_cq_attr_flags enum
        :param parent_domain: If set, will be used to custom alloc cq buffers.
        :return:
        """
        super().__init__()
        self.attr.cqe = cqe
        self.attr.cq_context = NULL
        self.attr.channel = NULL if channel is None else channel.cc
        self.attr.comp_vector = comp_vector
        self.attr.wc_flags = wc_flags
        self.attr.comp_mask = comp_mask
        self.attr.flags = flags
        self.attr.parent_domain = NULL if parent_domain is None else parent_domain.pd
        # Keep the Python objects next to the raw pointers stored in attr.
        self.channel = channel
        self.parent_domain = parent_domain

    @property
    def cqe(self):
        return self.attr.cqe
    @cqe.setter
    def cqe(self, val):
        self.attr.cqe = val

    # Setter-only properties require the older syntax
    property cq_context:
        def __set__(self, val):
            self.attr.cq_context = <void*>val

    @property
    def parent_domain(self):
        return self.parent_domain
    @parent_domain.setter
    def parent_domain(self, PD val):
        self.parent_domain = val
        # None clears the parent domain, same as in __init__.
        self.attr.parent_domain = NULL if val is None else val.pd

    @property
    def comp_channel(self):
        return self.channel
    @comp_channel.setter
    def comp_channel(self, CompChannel val):
        self.channel = val
        # None clears the completion channel, same as in __init__.
        self.attr.channel = NULL if val is None else val.cc

    @property
    def comp_vector(self):
        return self.attr.comp_vector
    @comp_vector.setter
    def comp_vector(self, val):
        self.attr.comp_vector = val

    @property
    def wc_flags(self):
        return self.attr.wc_flags
    @wc_flags.setter
    def wc_flags(self, val):
        self.attr.wc_flags = val

    @property
    def comp_mask(self):
        return self.attr.comp_mask
    @comp_mask.setter
    def comp_mask(self, val):
        self.attr.comp_mask = val

    @property
    def flags(self):
        return self.attr.flags
    @flags.setter
    def flags(self, val):
        self.attr.flags = val

    def __str__(self):
        print_format = '{:22}: {:<20}\n'
        return print_format.format('Number of CQEs', self.cqe) +\
               print_format.format('WC flags', create_wc_flags_to_str(self.wc_flags)) +\
               print_format.format('comp mask', self.comp_mask) +\
               print_format.format('flags', self.flags)
cdef class CQEX(PyverbsCM):
    """
    An extended Completion Queue, created with ibv_create_cq_ex and polled
    with the start_poll / poll_next / end_poll batch interface.
    """
    def __init__(self, Context context not None, CqInitAttrEx init_attr):
        """
        Initializes a CQEX object on the given device's context with the given
        attributes.
        :param context: The device's context on which to open the CQ
        :param init_attr: Initial attributes that describe the CQ. If None,
                          a default CqInitAttrEx is used.
        :return: The newly created CQEX on success
        """
        super().__init__()
        self.qps = weakref.WeakSet()
        self.srqs = weakref.WeakSet()
        if self.cq != NULL:
            # Leave CQ initialization to the provider
            return
        if init_attr is None:
            init_attr = CqInitAttrEx()
        self.cq = v.ibv_create_cq_ex(context.context, &init_attr.attr)
        # Check for failure right after the verbs call: running other code
        # first could overwrite errno, and a CQ that failed to open must not
        # be registered on the channel or the parent domain.
        if self.cq == NULL:
            raise PyverbsRDMAErrno('Failed to create extended CQ')
        if init_attr.comp_channel:
            # add_ref is a cdef method, so the object returned by the
            # property has to be cast to its extension type to call it.
            (<CompChannel>init_attr.comp_channel).add_ref(self)
        if init_attr.parent_domain:
            (<ParentDomain>init_attr.parent_domain).add_ref(self)
        self.ibv_cq = v.ibv_cq_ex_to_cq(self.cq)
        self.context = context
        context.add_ref(self)

    cdef add_ref(self, obj):
        """
        Registers a QP or SRQ that uses this CQ.
        :param obj: The object to register
        """
        if isinstance(obj, QP):
            self.qps.add(obj)
        elif isinstance(obj, SRQ):
            self.srqs.add(obj)
        else:
            raise PyverbsError('Unrecognized object type')

    def __dealloc__(self):
        self.close()

    cpdef close(self):
        """
        Closes the registered SRQs and QPs and destroys the CQ. Does nothing
        if the CQ was already closed.
        :return: None
        """
        if self.cq != NULL:
            self.logger.debug('Closing CQEx')
            close_weakrefs([self.srqs, self.qps])
            rc = v.ibv_destroy_cq(<v.ibv_cq*>self.cq)
            if rc != 0:
                raise PyverbsRDMAError('Failed to destroy CQEX', rc)
            self.cq = NULL
            self.context = None

    def start_poll(self, PollCqAttr attr):
        """
        Start polling a batch of work completions.
        :param attr: For easy future extensions. If None, a default PollCqAttr
                     is used.
        :return: 0 on success, ENOENT when no completions are available
        """
        if attr is None:
            attr = PollCqAttr()
        return v.ibv_start_poll(self.cq, &attr.attr)

    def poll_next(self):
        """
        Get the next work completion.
        :return: 0 on success, ENOENT when no completions are available
        """
        return v.ibv_next_poll(self.cq)

    def end_poll(self):
        """
        Indicates the end of polling batch of work completions
        :return: None
        """
        return v.ibv_end_poll(self.cq)

    # The read_* methods below return a field of the current work completion
    # by calling the matching ibv_wc_read_* verb on this CQ.
    def read_opcode(self):
        return v.ibv_wc_read_opcode(self.cq)
    def read_vendor_err(self):
        return v.ibv_wc_read_vendor_err(self.cq)
    def read_byte_len(self):
        return v.ibv_wc_read_byte_len(self.cq)
    def read_imm_data(self):
        return v.ibv_wc_read_imm_data(self.cq)
    def read_qp_num(self):
        return v.ibv_wc_read_qp_num(self.cq)
    def read_src_qp(self):
        return v.ibv_wc_read_src_qp(self.cq)
    def read_wc_flags(self):
        return v.ibv_wc_read_wc_flags(self.cq)
    def read_slid(self):
        return v.ibv_wc_read_slid(self.cq)
    def read_sl(self):
        return v.ibv_wc_read_sl(self.cq)
    def read_dlid_path_bits(self):
        return v.ibv_wc_read_dlid_path_bits(self.cq)
    def read_timestamp(self):
        return v.ibv_wc_read_completion_ts(self.cq)
    def read_cvlan(self):
        return v.ibv_wc_read_cvlan(self.cq)
    def read_flow_tag(self):
        return v.ibv_wc_read_flow_tag(self.cq)
    def read_tm_info(self):
        info = WcTmInfo()
        v.ibv_wc_read_tm_info(self.cq, &info.info)
        return info
    def read_completion_wallclock_ns(self):
        return v.ibv_wc_read_completion_wallclock_ns(self.cq)

    @property
    def status(self):
        return self.cq.status
    @status.setter
    def status(self, val):
        self.cq.status = val

    @property
    def wr_id(self):
        return self.cq.wr_id
    @wr_id.setter
    def wr_id(self, val):
        self.cq.wr_id = val

    def __str__(self):
        print_format = '{:<22}: {:<20}\n'
        return 'Extended CQ:\n' +\
               print_format.format('Handle', self.cq.handle) +\
               print_format.format('CQEs', self.cq.cqe)
cdef class WC(PyverbsObject):
    """
    A work completion. Wraps a C work completion struct and exposes each of
    its fields as a read/write property.
    """
    def __init__(self, wr_id=0, status=0, opcode=0, vendor_err=0, byte_len=0,
                 qp_num=0, src_qp=0, imm_data=0, wc_flags=0, pkey_index=0,
                 slid=0, sl=0, dlid_path_bits=0):
        """
        Initializes a WC object, copying each argument into the field of the
        same name. All fields default to 0.
        """
        super().__init__()
        # Identification and outcome of the completed work request
        self.wc.wr_id = wr_id
        self.wc.status = status
        self.wc.opcode = opcode
        self.wc.vendor_err = vendor_err
        # Payload information
        self.wc.byte_len = byte_len
        self.wc.imm_data = imm_data
        self.wc.wc_flags = wc_flags
        # Addressing information
        self.wc.qp_num = qp_num
        self.wc.src_qp = src_qp
        self.wc.pkey_index = pkey_index
        self.wc.slid = slid
        self.wc.sl = sl
        self.wc.dlid_path_bits = dlid_path_bits

    @property
    def wr_id(self):
        return self.wc.wr_id
    @wr_id.setter
    def wr_id(self, value):
        self.wc.wr_id = value

    @property
    def status(self):
        return self.wc.status
    @status.setter
    def status(self, value):
        self.wc.status = value

    @property
    def opcode(self):
        return self.wc.opcode
    @opcode.setter
    def opcode(self, value):
        self.wc.opcode = value

    @property
    def vendor_err(self):
        return self.wc.vendor_err
    @vendor_err.setter
    def vendor_err(self, value):
        self.wc.vendor_err = value

    @property
    def byte_len(self):
        return self.wc.byte_len
    @byte_len.setter
    def byte_len(self, value):
        self.wc.byte_len = value

    @property
    def qp_num(self):
        return self.wc.qp_num
    @qp_num.setter
    def qp_num(self, value):
        self.wc.qp_num = value

    @property
    def src_qp(self):
        return self.wc.src_qp
    @src_qp.setter
    def src_qp(self, value):
        self.wc.src_qp = value

    @property
    def wc_flags(self):
        return self.wc.wc_flags
    @wc_flags.setter
    def wc_flags(self, value):
        self.wc.wc_flags = value

    @property
    def pkey_index(self):
        return self.wc.pkey_index
    @pkey_index.setter
    def pkey_index(self, value):
        self.wc.pkey_index = value

    @property
    def slid(self):
        return self.wc.slid
    @slid.setter
    def slid(self, value):
        self.wc.slid = value

    @property
    def sl(self):
        return self.wc.sl
    @sl.setter
    def sl(self, value):
        self.wc.sl = value

    @property
    def imm_data(self):
        return self.wc.imm_data
    @imm_data.setter
    def imm_data(self, value):
        self.wc.imm_data = value

    @property
    def dlid_path_bits(self):
        return self.wc.dlid_path_bits
    @dlid_path_bits.setter
    def dlid_path_bits(self, value):
        self.wc.dlid_path_bits = value

    def __str__(self):
        # One "label: value" line per field, in a fixed order.
        row = '{:22}: {:<20}\n'
        fields = [
            ('WR ID', self.wr_id),
            ('status', cqe_status_to_str(self.status)),
            ('opcode', cqe_opcode_to_str(self.opcode)),
            ('vendor error', self.vendor_err),
            ('byte length', self.byte_len),
            ('QP num', self.qp_num),
            ('source QP', self.src_qp),
            ('WC flags', cqe_flags_to_str(self.wc_flags)),
            ('pkey index', self.pkey_index),
            ('slid', self.slid),
            ('sl', self.sl),
            ('imm_data', self.imm_data),
            ('dlid path bits', self.dlid_path_bits),
        ]
        return ''.join([row.format(label, value) for label, value in fields])
cdef class PollCqAttr(PyverbsObject):
    """
    Attributes handed to CQEX.start_poll. Only the compatibility mask is
    exposed.
    """
    @property
    def comp_mask(self):
        return self.attr.comp_mask

    @comp_mask.setter
    def comp_mask(self, value):
        self.attr.comp_mask = value
cdef class WcTmInfo(PyverbsObject):
    """
    Tag matching information of a work completion, as filled in by
    CQEX.read_tm_info. Exposes the 'tag' and 'priv' fields.
    """
    @property
    def tag(self):
        return self.info.tag

    @tag.setter
    def tag(self, value):
        self.info.tag = value

    @property
    def priv(self):
        return self.info.priv

    @priv.setter
    def priv(self, value):
        self.info.priv = value
def cqe_status_to_str(status):
    """
    Translates a work completion status code into a readable string.
    :param status: The status value (one of the IBV_WC_* status codes)
    :return: The matching description, or "Unknown CQE status" if the value
             is not a known status
    """
    status_names = {
        e.IBV_WC_SUCCESS: "success",
        e.IBV_WC_LOC_LEN_ERR: "local length error",
        e.IBV_WC_LOC_QP_OP_ERR: "local QP op error",
        e.IBV_WC_LOC_EEC_OP_ERR: "local EEC op error",
        e.IBV_WC_LOC_PROT_ERR: "local protection error",
        e.IBV_WC_WR_FLUSH_ERR: "WR flush error",
        e.IBV_WC_MW_BIND_ERR: "memory window bind error",
        e.IBV_WC_BAD_RESP_ERR: "bad response error",
        e.IBV_WC_LOC_ACCESS_ERR: "local access error",
        e.IBV_WC_REM_INV_REQ_ERR: "remote invalidate request error",
        e.IBV_WC_REM_ACCESS_ERR: "remote access error",
        e.IBV_WC_REM_OP_ERR: "remote op error",
        e.IBV_WC_RETRY_EXC_ERR: "retry exceeded error",
        e.IBV_WC_RNR_RETRY_EXC_ERR: "RNR retry exceeded",
        e.IBV_WC_LOC_RDD_VIOL_ERR: "local RDD violation error",
        e.IBV_WC_REM_INV_RD_REQ_ERR: "remote invalidate RD request error",
        e.IBV_WC_REM_ABORT_ERR: "remote abort error",
        e.IBV_WC_INV_EECN_ERR: "invalidate EECN error",
        e.IBV_WC_INV_EEC_STATE_ERR: "invalidate EEC state error",
        e.IBV_WC_FATAL_ERR: "WC fatal error",
        e.IBV_WC_RESP_TIMEOUT_ERR: "response timeout error",
        e.IBV_WC_GENERAL_ERR: "general error",
    }
    return status_names.get(status, "Unknown CQE status")
def cqe_opcode_to_str(opcode):
    """
    Translates a work completion opcode into a readable string.
    :param opcode: The numeric opcode value
    :return: The matching description, or "Unknown CQE opcode <value>" if the
             value is not in the table
    """
    opcode_names = {
        0x0: "Send",
        0x1: "RDMA write",
        0x2: "RDMA read",
        0x3: "Compare and swap",
        0x4: "Fetch and add",
        0x5: "Bind Memory window",
        0x6: "Local invalidate",
        0x7: "TSO",
        0x80: "Receive",
        0x81: "Receive RDMA with immediate",
        0x82: "Tag matching - add",
        0x83: "Tag matching - delete",
        0x84: "Tag matching - sync",
        0x85: "Tag matching - receive",
        0x86: "Tag matching - no tag",
    }
    if opcode in opcode_names:
        return opcode_names[opcode]
    return "Unknown CQE opcode {op}".format(op=opcode)
def flags_to_str(flags, dictionary):
    """
    Builds a string naming every flag that is set in a bitmask.
    :param flags: The bitmask to decode
    :param dictionary: A mapping from flag bit value to the flag's name
    :return: The names of the set flags, in the mapping's iteration order,
             each followed by a single space (empty string if none is set)
    """
    set_names = [dictionary[bit] + " " for bit in dictionary if flags & bit]
    return "".join(set_names)
def cqe_flags_to_str(flags):
    """
    Translates a work completion's wc_flags bitmask into a readable string.
    :param flags: The wc_flags bitmask
    :return: The names of the set flags as produced by flags_to_str
    """
    wc_flag_names = {
        1: "GRH",
        2: "With immediate",
        4: "IP csum OK",
        8: "With invalidate",
        16: "TM sync request",
        32: "TM match",
        64: "TM data valid",
    }
    return flags_to_str(flags, wc_flag_names)
def create_wc_flags_to_str(flags):
    """
    Translates the wc_flags bitmask used when creating an extended CQ into a
    readable string.
    :param flags: Or'ed IBV_WC_EX_WITH_* bits
    :return: The names of the set flags as produced by flags_to_str
    """
    wc_ex_flag_names = {
        e.IBV_WC_EX_WITH_BYTE_LEN: 'IBV_WC_EX_WITH_BYTE_LEN',
        e.IBV_WC_EX_WITH_IMM: 'IBV_WC_EX_WITH_IMM',
        e.IBV_WC_EX_WITH_QP_NUM: 'IBV_WC_EX_WITH_QP_NUM',
        e.IBV_WC_EX_WITH_SRC_QP: 'IBV_WC_EX_WITH_SRC_QP',
        e.IBV_WC_EX_WITH_SLID: 'IBV_WC_EX_WITH_SLID',
        e.IBV_WC_EX_WITH_SL: 'IBV_WC_EX_WITH_SL',
        e.IBV_WC_EX_WITH_DLID_PATH_BITS: 'IBV_WC_EX_WITH_DLID_PATH_BITS',
        e.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP:
            'IBV_WC_EX_WITH_COMPLETION_TIMESTAMP',
        e.IBV_WC_EX_WITH_CVLAN: 'IBV_WC_EX_WITH_CVLAN',
        e.IBV_WC_EX_WITH_FLOW_TAG: 'IBV_WC_EX_WITH_FLOW_TAG',
        e.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK:
            'IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK',
    }
    return flags_to_str(flags, wc_ex_flag_names)