Blame complib/cl_dispatcher.c

Packit 13e616
/*
Packit 13e616
 * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
Packit 13e616
 * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
Packit 13e616
 * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
Packit 13e616
 *
Packit 13e616
 * This software is available to you under a choice of one of two
Packit 13e616
 * licenses.  You may choose to be licensed under the terms of the GNU
Packit 13e616
 * General Public License (GPL) Version 2, available from the file
Packit 13e616
 * COPYING in the main directory of this source tree, or the
Packit 13e616
 * OpenIB.org BSD license below:
Packit 13e616
 *
Packit 13e616
 *     Redistribution and use in source and binary forms, with or
Packit 13e616
 *     without modification, are permitted provided that the following
Packit 13e616
 *     conditions are met:
Packit 13e616
 *
Packit 13e616
 *      - Redistributions of source code must retain the above
Packit 13e616
 *        copyright notice, this list of conditions and the following
Packit 13e616
 *        disclaimer.
Packit 13e616
 *
Packit 13e616
 *      - Redistributions in binary form must reproduce the above
Packit 13e616
 *        copyright notice, this list of conditions and the following
Packit 13e616
 *        disclaimer in the documentation and/or other materials
Packit 13e616
 *        provided with the distribution.
Packit 13e616
 *
Packit 13e616
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
Packit 13e616
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
Packit 13e616
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
Packit 13e616
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
Packit 13e616
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
Packit 13e616
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
Packit 13e616
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
Packit 13e616
 * SOFTWARE.
Packit 13e616
 *
Packit 13e616
 */
Packit 13e616
Packit 13e616
/*
Packit 13e616
 * Abstract:
Packit 13e616
 *    Implementation of Dispatcher abstraction.
Packit 13e616
 *
Packit 13e616
 */
Packit 13e616
Packit 13e616
#if HAVE_CONFIG_H
Packit 13e616
#  include <config.h>
Packit 13e616
#endif				/* HAVE_CONFIG_H */
Packit 13e616
Packit 13e616
#include <stdlib.h>
Packit 13e616
#include <complib/cl_dispatcher.h>
Packit 13e616
#include <complib/cl_thread.h>
Packit 13e616
#include <complib/cl_timer.h>
Packit 13e616
Packit 13e616
/* give some guidance when we build our cl_pool of messages */
Packit 13e616
#define CL_DISP_INITIAL_MSG_COUNT   256
Packit 13e616
#define CL_DISP_MSG_GROW_SIZE       64
Packit 13e616
Packit 13e616
/* give some guidance when we size the pointer vector of registrants */
Packit 13e616
#define CL_DISP_INITIAL_REG_COUNT   16
Packit 13e616
#define CL_DISP_REG_GROW_SIZE       16
Packit 13e616
Packit 13e616
/********************************************************************
Packit 13e616
   __cl_disp_worker
Packit 13e616
Packit 13e616
   Description:
Packit 13e616
   This function takes messages off the FIFO and invokes the recipient's
   receive callback, then the sender's completion callback if one was given.
Packit 13e616
   This function executes at passive level.
Packit 13e616
Packit 13e616
   Inputs:
Packit 13e616
   context - Pointer to the Dispatcher object (cast to cl_dispatcher_t *)
Packit 13e616
Packit 13e616
   Outputs:
Packit 13e616
   None
Packit 13e616
Packit 13e616
   Returns:
Packit 13e616
   None
Packit 13e616
********************************************************************/
Packit 13e616
void __cl_disp_worker(IN void *context)
{
	cl_disp_msg_t *p_msg;
	cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;

	cl_spinlock_acquire(&p_disp->lock);

	/* Process the FIFO until we drain it dry. */
	while (cl_qlist_count(&p_disp->msg_fifo)) {
		/* Pop the message at the head of the FIFO. */
		p_msg =
		    (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);

		/* Track the time the last message spent in the queue. */
		p_disp->last_msg_queue_time_us =
		    cl_get_time_stamp() - p_msg->in_time;

		/*
		 * Release the spinlock while the message is processed.
		 * The user's callback may reenter the dispatcher
		 * and cause the lock to be reacquired.
		 */
		cl_spinlock_release(&p_disp->lock);

		/*
		 * The receive callback is OPTIONAL at registration time, so a
		 * registrant that owns a message ID may not have one.  Skip
		 * the call in that case rather than jumping through NULL.
		 */
		if (p_msg->p_dest_reg->pfn_rcv_callback)
			p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->
							    p_dest_reg->context,
							    (void *)p_msg->
							    p_data);

		/* Drop the recipient reference taken when the message was posted. */
		cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);

		/* The client has seen the data.  Notify the sender as appropriate. */
		if (p_msg->pfn_xmt_callback) {
			p_msg->pfn_xmt_callback((void *)p_msg->context,
						(void *)p_msg->p_data);
			/*
			 * The sender reference is only taken when a completion
			 * callback was requested, so only drop it here.
			 */
			cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
		}

		/* Grab the lock for the next iteration through the list. */
		cl_spinlock_acquire(&p_disp->lock);

		/* Return this message to the pool (done under the lock). */
		cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
	}

	cl_spinlock_release(&p_disp->lock);
}
Packit 13e616
Packit 13e616
/*
 * Put a dispatcher into a known, empty state so that cl_disp_destroy()
 * may be called on it safely even if cl_disp_init() fails part way.
 *
 * p_disp - dispatcher to construct; must not be NULL.
 */
void cl_disp_construct(IN cl_dispatcher_t * const p_disp)
{
	CL_ASSERT(p_disp);

	cl_qlist_init(&p_disp->reg_list);
	cl_ptr_vector_construct(&p_disp->reg_vec);
	cl_qlist_init(&p_disp->msg_fifo);
	cl_spinlock_construct(&p_disp->lock);
	cl_qpool_construct(&p_disp->msg_pool);

	/*
	 * No message has been dequeued yet.  Without this the queue-status
	 * query would report an indeterminate value until the worker has
	 * processed its first message.
	 */
	p_disp->last_msg_queue_time_us = 0;
}
Packit 13e616
Packit 13e616
/*
 * Stop the dispatcher: tear down the worker thread pool, deliver whatever
 * is still queued on the calling thread, and release every registration
 * record that remains on the registrant list.
 *
 * p_disp - dispatcher to shut down; must not be NULL.
 */
void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
{
	cl_list_item_t *p_item;

	CL_ASSERT(p_disp);

	/* No worker threads may run past this point. */
	cl_thread_pool_destroy(&p_disp->worker_threads);

	/* Drain the FIFO ourselves so pending callbacks still fire. */
	__cl_disp_worker(p_disp);

	/* Release the registration records still linked on the list. */
	while (!cl_is_qlist_empty(&p_disp->reg_list)) {
		p_item = cl_qlist_remove_head(&p_disp->reg_list);
		free(p_item);
	}
}
Packit 13e616
Packit 13e616
/*
 * Release the resources owned by a dispatcher: its lock, its message
 * pool and its registrant pointer vector.
 *
 * p_disp - dispatcher to destroy; must not be NULL.
 */
void cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
{
	CL_ASSERT(p_disp);

	/* Lock protecting the FIFO, pool and registrant tables. */
	cl_spinlock_destroy(&p_disp->lock);

	/* Pool that backs the queued message objects. */
	cl_qpool_destroy(&p_disp->msg_pool);

	/* Message-ID to registrant lookup table. */
	cl_ptr_vector_destroy(&p_disp->reg_vec);
}
Packit 13e616
Packit 13e616
/*
 * Construct and initialize a dispatcher.
 *
 * p_disp       - dispatcher to initialize; must not be NULL.
 * thread_count - passed through to cl_thread_pool_init().
 * name         - passed through to cl_thread_pool_init().
 *
 * Returns CL_SUCCESS when every sub-object initialized, otherwise the
 * status of the first step that failed; in that case the dispatcher has
 * already been handed to cl_disp_destroy().
 */
cl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp,
			 IN const uint32_t thread_count,
			 IN const char *const name)
{
	cl_status_t status;

	CL_ASSERT(p_disp);

	cl_disp_construct(p_disp);

	status = cl_spinlock_init(&p_disp->lock);
	if (status != CL_SUCCESS)
		goto fail;

	/* A maximum size of 0 places no upper limit on the message pool. */
	status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
			       0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
			       NULL, NULL, NULL);
	if (status != CL_SUCCESS)
		goto fail;

	status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
				    CL_DISP_REG_GROW_SIZE);
	if (status != CL_SUCCESS)
		goto fail;

	/* Workers run __cl_disp_worker with the dispatcher as their context. */
	status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
				     __cl_disp_worker, p_disp, name);
	if (status != CL_SUCCESS)
		goto fail;

	return (status);

fail:
	/* Common unwind path for every failed initialization step. */
	cl_disp_destroy(p_disp);
	return (status);
}
Packit 13e616
Packit 13e616
/*
 * Register a client with the dispatcher.
 *
 * p_disp       - dispatcher to register with; must not be NULL.
 * msg_id       - message ID this registrant receives, or
 *                CL_DISP_MSGID_NONE for a send-only registrant.
 * pfn_callback - receive callback stored in the registration (optional).
 * context      - opaque value stored alongside the callback (optional).
 *
 * Returns the registration handle, or NULL when the message ID is already
 * taken, the allocation fails, or the registrant vector cannot be updated.
 */
cl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp,
				      IN const cl_disp_msgid_t msg_id,
				      IN cl_pfn_msgrcv_cb_t pfn_callback
				      OPTIONAL,
				      IN const void *const context OPTIONAL)
{
	cl_disp_reg_info_t *p_reg;
	cl_status_t status;

	CL_ASSERT(p_disp);

	/* Check that the requested registrant ID is available. */
	cl_spinlock_acquire(&p_disp->lock);
	if ((msg_id != CL_DISP_MSGID_NONE) &&
	    (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
	    (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
		cl_spinlock_release(&p_disp->lock);
		return (NULL);
	}

	/* Allocate a zero-filled registration record (ref_cnt starts at 0). */
	p_reg = calloc(1, sizeof(*p_reg));
	if (!p_reg) {
		cl_spinlock_release(&p_disp->lock);
		return (NULL);
	}

	p_reg->p_disp = p_disp;
	p_reg->pfn_rcv_callback = pfn_callback;
	p_reg->context = context;
	p_reg->msg_id = msg_id;

	/*
	 * Set the array entry to the registrant first.  Doing this before
	 * the record is linked into reg_list means the failure path frees
	 * an object nothing else points to; previously the freed record
	 * stayed on the list and was freed again at shutdown.
	 * The ptr_vector grows automatically as necessary.
	 */
	if (msg_id != CL_DISP_MSGID_NONE) {
		status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
		if (status != CL_SUCCESS) {
			free(p_reg);
			cl_spinlock_release(&p_disp->lock);
			return (NULL);
		}
	}

	/* Insert the registration in the list. */
	cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);

	cl_spinlock_release(&p_disp->lock);

	return (p_reg);
}
Packit 13e616
Packit 13e616
/*
 * Remove a registration from its dispatcher and free it.
 *
 * handle - registration handle; CL_DISP_INVALID_HANDLE is ignored.
 *
 * The call polls until the registration's reference count reaches zero
 * before the record is freed.
 */
void cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
{
	cl_disp_reg_info_t *p_reg = (cl_disp_reg_info_t *) handle;
	cl_dispatcher_t *p_disp;

	if (handle == CL_DISP_INVALID_HANDLE)
		return;

	p_disp = p_reg->p_disp;
	CL_ASSERT(p_disp);

	/*
	 * Step 1: withdraw the message ID.  With the vector slot cleared,
	 * later posts to this ID no longer find a recipient.
	 */
	cl_spinlock_acquire(&p_disp->lock);
	if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
		CL_ASSERT(p_reg->msg_id <
			  cl_ptr_vector_get_size(&p_disp->reg_vec));
		cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
	}
	cl_spinlock_release(&p_disp->lock);

	/* Step 2: wait, without the lock, for outstanding references to go. */
	while (p_reg->ref_cnt > 0)
		cl_thread_suspend(1);

	/* Step 3: unlink the record from the registrant list and free it. */
	cl_spinlock_acquire(&p_disp->lock);
	cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
	free(p_reg);
	cl_spinlock_release(&p_disp->lock);
}
Packit 13e616
Packit 13e616
/*
 * Queue a message for the registrant that owns msg_id.
 *
 * handle       - sender's registration handle.
 * msg_id       - ID of the recipient; must not be CL_DISP_MSGID_NONE.
 * p_data       - payload pointer handed to the callbacks.
 * pfn_callback - completion callback run after delivery (optional).
 * context      - opaque value passed to pfn_callback (optional).
 *
 * Returns CL_SUCCESS when the message was queued, CL_NOT_FOUND when no
 * registrant owns msg_id, CL_INSUFFICIENT_MEMORY when the message pool
 * has no free message.
 */
cl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle,
			 IN const cl_disp_msgid_t msg_id,
			 IN const void *const p_data,
			 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
			 IN const void *const context OPTIONAL)
{
	cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
	cl_disp_reg_info_t *p_dest_reg = NULL;
	cl_dispatcher_t *p_disp;
	cl_disp_msg_t *p_msg;
	cl_status_t status = CL_SUCCESS;

	p_disp = handle->p_disp;
	CL_ASSERT(p_disp);
	CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);

	cl_spinlock_acquire(&p_disp->lock);

	/* Look up the recipient; an out-of-range ID has no recipient. */
	if (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec))
		p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
	if (!p_dest_reg) {
		status = CL_NOT_FOUND;
		goto unlock;
	}

	/* Take a message object from the pool. */
	p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
	if (!p_msg) {
		status = CL_INSUFFICIENT_MEMORY;
		goto unlock;
	}

	/* Fill in the message. */
	p_msg->p_src_reg = p_src_reg;
	p_msg->p_dest_reg = p_dest_reg;
	p_msg->p_data = p_data;
	p_msg->pfn_xmt_callback = pfn_callback;
	p_msg->context = context;
	p_msg->in_time = cl_get_time_stamp();

	/*
	 * The sender is referenced only when it asked for a completion
	 * notification; the recipient is always referenced.
	 */
	if (pfn_callback)
		cl_atomic_inc(&p_src_reg->ref_cnt);
	cl_atomic_inc(&p_dest_reg->ref_cnt);

	/* Append to the FIFO while still holding the lock. */
	cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);

unlock:
	cl_spinlock_release(&p_disp->lock);

	/* Wake the thread pool only when something was actually queued. */
	if (status == CL_SUCCESS)
		cl_thread_pool_signal(&p_disp->worker_threads);

	return (status);
}
Packit 13e616
Packit 13e616
void cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
Packit 13e616
			      OUT uint32_t * p_num_queued_msgs,
Packit 13e616
			      OUT uint64_t * p_last_msg_queue_time_ms)
Packit 13e616
{
Packit 13e616
	cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
Packit 13e616
Packit 13e616
	cl_spinlock_acquire(&p_disp->lock);
Packit 13e616
Packit 13e616
	if (p_last_msg_queue_time_ms)
Packit 13e616
		*p_last_msg_queue_time_ms =
Packit 13e616
		    p_disp->last_msg_queue_time_us / 1000;
Packit 13e616
Packit 13e616
	if (p_num_queued_msgs)
Packit 13e616
		*p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
Packit 13e616
Packit 13e616
	cl_spinlock_release(&p_disp->lock);
Packit 13e616
}