// Copyright(c) 2018-2020, Intel Corporation // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // // * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // * Neither the name of Intel Corporation nor the names of its contributors // may be used to endorse or promote products derived from this software // without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. #ifdef HAVE_CONFIG_H #include #endif // HAVE_CONFIG_H #include #include #include #include "event_dispatcher_thread.h" #ifdef LOG #undef LOG #endif #define LOG(format, ...) \ log_printf("event_dispatcher_thread: " format, ##__VA_ARGS__) event_dispatcher_thread_config event_dispatcher_config = { .global = &global_config, .sched_policy = SCHED_RR, .sched_priority = 30, }; #define EVENT_DISPATCH_QUEUE_DEPTH 512 typedef struct _evt_dispatch_queue { event_dispatch_queue_item q[EVENT_DISPATCH_QUEUE_DEPTH]; unsigned head; unsigned tail; pthread_mutex_t lock; } evt_dispatch_queue; STATIC sem_t evt_dispatch_sem; STATIC evt_dispatch_queue normal_queue = { { { NULL, NULL, NULL }, }, 0, 0, PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, }; STATIC evt_dispatch_queue high_priority_queue = { { { NULL, NULL, NULL }, }, 0, 0, PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, }; STATIC void evt_queue_init(evt_dispatch_queue *q) { memset(q->q, 0, sizeof(q->q)); q->head = q->tail = 0; } STATIC void evt_queue_destroy(evt_dispatch_queue *q) { q->head = q->tail = 0; } STATIC volatile bool dispatcher_is_ready = false; bool evt_dispatcher_is_ready(void) { return dispatcher_is_ready; } STATIC bool evt_queue_is_full(evt_dispatch_queue *q) { const size_t num = sizeof(q->q) / sizeof(q->q[0]); if (q->tail > q->head) { if ((q->head == 0) && (q->tail == (num - 1))) return true; } else if (q->tail < q->head) { if (q->tail == (q->head - 1)) return true; } return false; } STATIC bool evt_queue_is_empty(evt_dispatch_queue *q) { return q->head == q->tail; } STATIC bool _evt_queue_response(evt_dispatch_queue *q, fpgad_respond_event_t callback, fpgad_monitored_device *device, void *context) { int res; opae_mutex_lock(res, &q->lock); if (evt_queue_is_full(q)) { opae_mutex_unlock(res, &q->lock); return false; } q->q[q->tail].callback = callback; q->q[q->tail].device = device; q->q[q->tail].context = context; q->tail = (q->tail + 1) % EVENT_DISPATCH_QUEUE_DEPTH; opae_mutex_unlock(res, &q->lock); sem_post(&evt_dispatch_sem); return true; } STATIC bool _evt_queue_get(evt_dispatch_queue *q, event_dispatch_queue_item *item) { int res; opae_mutex_lock(res, &q->lock); if (evt_queue_is_empty(q)) { opae_mutex_unlock(res, &q->lock); return false; } *item = q->q[q->head]; memset(&q->q[q->head], 0, sizeof(q->q[0])); q->head = (q->head + 1) % EVENT_DISPATCH_QUEUE_DEPTH; opae_mutex_unlock(res, &q->lock); return true; } bool evt_queue_response(fpgad_respond_event_t callback, fpgad_monitored_device *device, void *context) { return _evt_queue_response(&normal_queue, callback, device, context); } bool evt_queue_get(event_dispatch_queue_item *item) { return _evt_queue_get(&normal_queue, item); } bool evt_queue_response_high(fpgad_respond_event_t callback, fpgad_monitored_device *device, void *context) { return _evt_queue_response(&high_priority_queue, callback, device, context); } bool evt_queue_get_high(event_dispatch_queue_item *item) { return _evt_queue_get(&high_priority_queue, item); } void *event_dispatcher_thread(void *thread_context) { event_dispatcher_thread_config *c = (event_dispatcher_thread_config *)thread_context; struct sched_param sched_param; int policy = 0; int res; struct timespec ts; LOG("starting\n"); res = pthread_getschedparam(pthread_self(), &policy, &sched_param); if (res) { LOG("error getting scheduler params: %s\n", strerror(res)); } else { policy = c->sched_policy; sched_param.sched_priority = c->sched_priority; res = pthread_setschedparam(pthread_self(), policy, &sched_param); if (res) { LOG("error setting scheduler params" " (got root?): %s\n", strerror(res)); } } evt_queue_init(&normal_queue); evt_queue_init(&high_priority_queue); if (sem_init(&evt_dispatch_sem, 0, 0)) { LOG("failed to init queue sem.\n"); goto out_exit; } dispatcher_is_ready = true; while (c->global->running) { clock_gettime(CLOCK_REALTIME, &ts); ts.tv_nsec += c->global->poll_interval_usec * 1000; if (ts.tv_nsec > 1000000000) { ++ts.tv_sec; ts.tv_nsec -= 1000000000; } res = sem_timedwait(&evt_dispatch_sem, &ts); if (!res) { event_dispatch_queue_item item; // Process all high-priority items first while (evt_queue_get_high(&item)) { LOG("dispatching (high) for object_id: 0x%" PRIx64 ".\n", item.device->object_id); item.callback(item.device, item.context); } if (evt_queue_get(&item)) { LOG("dispatching for object_id: 0x%" PRIx64 ".\n", item.device->object_id); item.callback(item.device, item.context); } } } dispatcher_is_ready = false; evt_queue_destroy(&normal_queue); evt_queue_destroy(&high_priority_queue); sem_destroy(&evt_dispatch_sem); out_exit: LOG("exiting\n"); return NULL; }