// Copyright(c) 2018-2020, Intel Corporation
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
// * Neither the name of Intel Corporation nor the names of its contributors
// may be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif // HAVE_CONFIG_H
#include <semaphore.h>
#include <time.h>
#include <inttypes.h>
#include "event_dispatcher_thread.h"
#ifdef LOG
#undef LOG
#endif
#define LOG(format, ...) \
log_printf("event_dispatcher_thread: " format, ##__VA_ARGS__)
event_dispatcher_thread_config event_dispatcher_config = {
.global = &global_config,
.sched_policy = SCHED_RR,
.sched_priority = 30,
};
#define EVENT_DISPATCH_QUEUE_DEPTH 512
typedef struct _evt_dispatch_queue {
event_dispatch_queue_item q[EVENT_DISPATCH_QUEUE_DEPTH];
unsigned head;
unsigned tail;
pthread_mutex_t lock;
} evt_dispatch_queue;
STATIC sem_t evt_dispatch_sem;
STATIC evt_dispatch_queue normal_queue = {
{ { NULL, NULL, NULL }, },
0,
0,
PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
};
STATIC evt_dispatch_queue high_priority_queue = {
{ { NULL, NULL, NULL }, },
0,
0,
PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
};
STATIC void evt_queue_init(evt_dispatch_queue *q)
{
memset(q->q, 0, sizeof(q->q));
q->head = q->tail = 0;
}
STATIC void evt_queue_destroy(evt_dispatch_queue *q)
{
q->head = q->tail = 0;
}
STATIC volatile bool dispatcher_is_ready = false;
bool evt_dispatcher_is_ready(void)
{
return dispatcher_is_ready;
}
STATIC bool evt_queue_is_full(evt_dispatch_queue *q)
{
const size_t num = sizeof(q->q) / sizeof(q->q[0]);
if (q->tail > q->head) {
if ((q->head == 0) && (q->tail == (num - 1)))
return true;
} else if (q->tail < q->head) {
if (q->tail == (q->head - 1))
return true;
}
return false;
}
STATIC bool evt_queue_is_empty(evt_dispatch_queue *q)
{
return q->head == q->tail;
}
STATIC bool _evt_queue_response(evt_dispatch_queue *q,
fpgad_respond_event_t callback,
fpgad_monitored_device *device,
void *context)
{
int res;
opae_mutex_lock(res, &q->lock);
if (evt_queue_is_full(q)) {
opae_mutex_unlock(res, &q->lock);
return false;
}
q->q[q->tail].callback = callback;
q->q[q->tail].device = device;
q->q[q->tail].context = context;
q->tail = (q->tail + 1) % EVENT_DISPATCH_QUEUE_DEPTH;
opae_mutex_unlock(res, &q->lock);
sem_post(&evt_dispatch_sem);
return true;
}
STATIC bool _evt_queue_get(evt_dispatch_queue *q,
event_dispatch_queue_item *item)
{
int res;
opae_mutex_lock(res, &q->lock);
if (evt_queue_is_empty(q)) {
opae_mutex_unlock(res, &q->lock);
return false;
}
*item = q->q[q->head];
memset(&q->q[q->head], 0, sizeof(q->q[0]));
q->head = (q->head + 1) % EVENT_DISPATCH_QUEUE_DEPTH;
opae_mutex_unlock(res, &q->lock);
return true;
}
bool evt_queue_response(fpgad_respond_event_t callback,
fpgad_monitored_device *device,
void *context)
{
return _evt_queue_response(&normal_queue,
callback,
device,
context);
}
bool evt_queue_get(event_dispatch_queue_item *item)
{
return _evt_queue_get(&normal_queue, item);
}
bool evt_queue_response_high(fpgad_respond_event_t callback,
fpgad_monitored_device *device,
void *context)
{
return _evt_queue_response(&high_priority_queue,
callback,
device,
context);
}
bool evt_queue_get_high(event_dispatch_queue_item *item)
{
return _evt_queue_get(&high_priority_queue, item);
}
void *event_dispatcher_thread(void *thread_context)
{
event_dispatcher_thread_config *c =
(event_dispatcher_thread_config *)thread_context;
struct sched_param sched_param;
int policy = 0;
int res;
struct timespec ts;
LOG("starting\n");
res = pthread_getschedparam(pthread_self(), &policy, &sched_param);
if (res) {
LOG("error getting scheduler params: %s\n", strerror(res));
} else {
policy = c->sched_policy;
sched_param.sched_priority = c->sched_priority;
res = pthread_setschedparam(pthread_self(),
policy,
&sched_param);
if (res) {
LOG("error setting scheduler params"
" (got root?): %s\n", strerror(res));
}
}
evt_queue_init(&normal_queue);
evt_queue_init(&high_priority_queue);
if (sem_init(&evt_dispatch_sem, 0, 0)) {
LOG("failed to init queue sem.\n");
goto out_exit;
}
dispatcher_is_ready = true;
while (c->global->running) {
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_nsec += c->global->poll_interval_usec * 1000;
if (ts.tv_nsec > 1000000000) {
++ts.tv_sec;
ts.tv_nsec -= 1000000000;
}
res = sem_timedwait(&evt_dispatch_sem, &ts);
if (!res) {
event_dispatch_queue_item item;
// Process all high-priority items first
while (evt_queue_get_high(&item)) {
LOG("dispatching (high) for object_id: 0x%" PRIx64 ".\n",
item.device->object_id);
item.callback(item.device, item.context);
}
if (evt_queue_get(&item)) {
LOG("dispatching for object_id: 0x%" PRIx64 ".\n",
item.device->object_id);
item.callback(item.device, item.context);
}
}
}
dispatcher_is_ready = false;
evt_queue_destroy(&normal_queue);
evt_queue_destroy(&high_priority_queue);
sem_destroy(&evt_dispatch_sem);
out_exit:
LOG("exiting\n");
return NULL;
}