/*
Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <sys/poll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include "glusterfs/logging.h"
#include "glusterfs/gf-event.h"
#include "glusterfs/mem-pool.h"
#include "glusterfs/common-utils.h"
#include "glusterfs/syscall.h"
#include "glusterfs/libglusterfs-messages.h"
#ifdef HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
struct event_slot_epoll {
int fd;
int events;
int gen;
int idx;
gf_atomic_t ref;
int do_close;
int in_handler;
int handled_error;
void *data;
event_handler_t handler;
gf_lock_t lock;
struct list_head poller_death;
};
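/* Registered fds live in a two-level registry: event_pool->ereg[] holds
 * up to EVENT_EPOLL_TABLES tables of EVENT_EPOLL_SLOTS slots each, and a
 * slot is addressed by one flat index:
 *
 *     idx = table_idx * EVENT_EPOLL_SLOTS + offset
 *
 * For example, with EVENT_EPOLL_SLOTS == 1024, idx 1025 refers to table
 * 1, slot 1. The (idx, gen) pair is packed into epoll_event.data (as a
 * struct event_data), so a dispatched event can be matched back to its
 * slot, and events for a recycled slot are detected by a generation
 * mismatch.
 */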
struct event_thread_data {
struct event_pool *event_pool;
int event_index;
};
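/* event_index is 1-based: worker i (0-based) is created with
 * event_index = i + 1 and its thread id is stored in
 * event_pool->pollers[i]; a worker retires once
 * event_pool->eventthreadcount drops below its event_index.
 */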
static struct event_slot_epoll *
__event_newtable(struct event_pool *event_pool, int table_idx)
{
struct event_slot_epoll *table = NULL;
int i = -1;
    table = GF_CALLOC(EVENT_EPOLL_SLOTS, sizeof(*table), gf_common_mt_ereg);
if (!table)
return NULL;
for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
table[i].fd = -1;
LOCK_INIT(&table[i].lock);
INIT_LIST_HEAD(&table[i].poller_death);
}
event_pool->ereg[table_idx] = table;
event_pool->slots_used[table_idx] = 0;
return table;
}
static int
event_slot_ref(struct event_slot_epoll *slot)
{
if (!slot)
return -1;
return GF_ATOMIC_INC(slot->ref);
}
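/* Reference-count lifecycle, informally: a successful
 * event_register_epoll() leaves one reference held on behalf of the
 * registration, and each event_slot_get() in the dispatch, select-on
 * and handled paths takes a short-lived reference that is dropped with
 * event_slot_unref() before returning. When the count reaches zero the
 * slot is deallocated and, if do_close was set (see
 * event_unregister_close_epoll()), the fd is closed as well.
 */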
static int
__event_slot_alloc(struct event_pool *event_pool, int fd,
char notify_poller_death, struct event_slot_epoll **slot)
{
int i = 0;
int j = 0;
int table_idx = -1;
int gen = -1;
struct event_slot_epoll *table = NULL;
retry:
while (i < EVENT_EPOLL_TABLES) {
switch (event_pool->slots_used[i]) {
case EVENT_EPOLL_SLOTS:
break;
case 0:
if (!event_pool->ereg[i]) {
table = __event_newtable(event_pool, i);
if (!table)
return -1;
} else {
table = event_pool->ereg[i];
}
break;
default:
table = event_pool->ereg[i];
break;
}
if (table)
/* break out of the loop */
break;
i++;
}
if (!table)
return -1;
table_idx = i;
for (j = 0; j < EVENT_EPOLL_SLOTS; j++) {
if (table[j].fd == -1) {
            /* wipe the slot clean, but preserve and bump the generation */
gen = table[j].gen;
memset(&table[j], 0, sizeof(table[j]));
table[j].gen = gen + 1;
LOCK_INIT(&table[j].lock);
INIT_LIST_HEAD(&table[j].poller_death);
table[j].fd = fd;
if (notify_poller_death) {
table[j].idx = table_idx * EVENT_EPOLL_SLOTS + j;
list_add_tail(&table[j].poller_death,
&event_pool->poller_death);
}
event_pool->slots_used[table_idx]++;
break;
}
}
if (j == EVENT_EPOLL_SLOTS) {
table = NULL;
i++;
goto retry;
} else {
(*slot) = &table[j];
event_slot_ref(*slot);
return table_idx * EVENT_EPOLL_SLOTS + j;
}
}
static int
event_slot_alloc(struct event_pool *event_pool, int fd,
char notify_poller_death, struct event_slot_epoll **slot)
{
int idx = -1;
pthread_mutex_lock(&event_pool->mutex);
{
idx = __event_slot_alloc(event_pool, fd, notify_poller_death, slot);
}
pthread_mutex_unlock(&event_pool->mutex);
return idx;
}
static void
__event_slot_dealloc(struct event_pool *event_pool, int idx)
{
int table_idx = 0;
int offset = 0;
struct event_slot_epoll *table = NULL;
struct event_slot_epoll *slot = NULL;
int fd = -1;
table_idx = idx / EVENT_EPOLL_SLOTS;
offset = idx % EVENT_EPOLL_SLOTS;
table = event_pool->ereg[table_idx];
if (!table)
return;
slot = &table[offset];
slot->gen++;
fd = slot->fd;
slot->fd = -1;
slot->handled_error = 0;
slot->in_handler = 0;
list_del_init(&slot->poller_death);
if (fd != -1)
event_pool->slots_used[table_idx]--;
return;
}
static void
event_slot_dealloc(struct event_pool *event_pool, int idx)
{
pthread_mutex_lock(&event_pool->mutex);
{
__event_slot_dealloc(event_pool, idx);
}
pthread_mutex_unlock(&event_pool->mutex);
return;
}
static struct event_slot_epoll *
event_slot_get(struct event_pool *event_pool, int idx)
{
struct event_slot_epoll *slot = NULL;
struct event_slot_epoll *table = NULL;
int table_idx = 0;
int offset = 0;
table_idx = idx / EVENT_EPOLL_SLOTS;
offset = idx % EVENT_EPOLL_SLOTS;
table = event_pool->ereg[table_idx];
if (!table)
return NULL;
slot = &table[offset];
event_slot_ref(slot);
return slot;
}
static void
__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
int idx)
{
int ref = -1;
int fd = -1;
int do_close = 0;
ref = GF_ATOMIC_DEC(slot->ref);
if (ref)
/* slot still alive */
goto done;
LOCK(&slot->lock);
{
fd = slot->fd;
do_close = slot->do_close;
slot->do_close = 0;
}
UNLOCK(&slot->lock);
__event_slot_dealloc(event_pool, idx);
if (do_close)
sys_close(fd);
done:
return;
}
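/* event_slot_unref() below is identical to __event_slot_unref() except
 * that it deallocates through event_slot_dealloc(), which acquires
 * event_pool->mutex. The double-underscore variant exists for callers
 * that already hold the pool mutex, such as the poller-death cleanup in
 * event_dispatch_epoll_worker().
 */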
static void
event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
int idx)
{
int ref = -1;
int fd = -1;
int do_close = 0;
ref = GF_ATOMIC_DEC(slot->ref);
if (ref)
/* slot still alive */
goto done;
LOCK(&slot->lock);
{
fd = slot->fd;
do_close = slot->do_close;
slot->do_close = 0;
}
UNLOCK(&slot->lock);
event_slot_dealloc(event_pool, idx);
if (do_close)
sys_close(fd);
done:
return;
}
static struct event_pool *
event_pool_new_epoll(int count, int eventthreadcount)
{
struct event_pool *event_pool = NULL;
int epfd = -1;
event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool);
if (!event_pool)
goto out;
epfd = epoll_create(count);
if (epfd == -1) {
gf_msg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_CREATE_FAILED,
"epoll fd creation "
"failed");
GF_FREE(event_pool->reg);
GF_FREE(event_pool);
event_pool = NULL;
goto out;
}
event_pool->fd = epfd;
event_pool->count = count;
INIT_LIST_HEAD(&event_pool->poller_death);
event_pool->eventthreadcount = eventthreadcount;
event_pool->auto_thread_count = 0;
pthread_mutex_init(&event_pool->mutex, NULL);
out:
return event_pool;
}
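/* A minimal creation sketch (hypothetical values; in practice the pool
 * is obtained through the generic event_ops dispatch table at the end
 * of this file rather than by calling this function directly):
 *
 *     struct event_pool *pool = event_pool_new_epoll(16384, 2);
 *
 * Note that since Linux 2.6.8 the size hint given to epoll_create() is
 * ignored by the kernel; it merely has to be greater than zero.
 */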
static void
__slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out)
{
switch (poll_in) {
case 1:
slot->events |= EPOLLIN;
break;
case 0:
slot->events &= ~EPOLLIN;
break;
case -1:
/* do nothing */
break;
default:
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN,
"invalid poll_in value %d", poll_in);
break;
}
switch (poll_out) {
case 1:
slot->events |= EPOLLOUT;
break;
case 0:
slot->events &= ~EPOLLOUT;
break;
case -1:
/* do nothing */
break;
default:
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT,
"invalid poll_out value %d", poll_out);
break;
}
}
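/* poll_in and poll_out are tri-state: 1 sets the corresponding EPOLLIN
 * or EPOLLOUT bit, 0 clears it, and -1 leaves it untouched. For
 * example, __slot_update_events(slot, -1, 1) enables EPOLLOUT while
 * preserving the current EPOLLIN setting.
 */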
int
event_register_epoll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
int poll_out, char notify_poller_death)
{
int idx = -1;
int ret = -1;
int destroy = 0;
struct epoll_event epoll_event = {
0,
};
struct event_data *ev_data = (void *)&epoll_event.data;
struct event_slot_epoll *slot = NULL;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
    /* TODO: Even with the check below, there is a possibility of a race:
     * what if destroy mode is set right after the check is done?
     * Not sure of the best way to prevent this race; ref counting
     * is one possibility.
     * There is no harm in registering and unregistering the fd
     * even after destroy mode is set, just that such fds will remain
     * open until unregister is called, and events on those fds will be
     * delivered as long as at least one poller thread is alive.
     */
pthread_mutex_lock(&event_pool->mutex);
{
destroy = event_pool->destroy;
}
pthread_mutex_unlock(&event_pool->mutex);
if (destroy == 1)
goto out;
idx = event_slot_alloc(event_pool, fd, notify_poller_death, &slot);
if (idx == -1) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d", fd);
return -1;
}
assert(slot->fd == fd);
LOCK(&slot->lock);
{
        /* Make epoll "one-shot" (EPOLLONESHOT): the fd has to be
           re-armed with epoll_ctl(EPOLL_CTL_MOD) after delivery of
           every single event. This guarantees that while one poller
           thread has picked up an event and is processing it, no
           other poller thread will pick up an event on the same fd
           at the same time.
        */
slot->events = EPOLLPRI | EPOLLHUP | EPOLLERR | EPOLLONESHOT;
slot->handler = handler;
slot->data = data;
__slot_update_events(slot, poll_in, poll_out);
epoll_event.events = slot->events;
ev_data->idx = idx;
ev_data->gen = slot->gen;
ret = epoll_ctl(event_pool->fd, EPOLL_CTL_ADD, fd, &epoll_event);
        /* ret is checked only after UNLOCK(), because the failure path
           calls event_slot_unref(), which may take slot->lock and
           would deadlock here
        */
}
UNLOCK(&slot->lock);
if (ret == -1) {
gf_msg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_ADD_FAILED,
"failed to add fd(=%d) to "
"epoll fd(=%d)",
fd, event_pool->fd);
event_slot_unref(event_pool, slot, idx);
idx = -1;
}
    /* on success, keep the ref taken in event_slot_alloc()
       (i.e. no event_slot_unref() here) */
out:
return idx;
}
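/* A registration sketch (hypothetical handler and names, for
 * illustration only; the parameter list mirrors the handler invocations
 * in this file):
 *
 *     static void
 *     my_handler(int fd, int idx, int gen, void *data, int poll_in,
 *                int poll_out, int poll_err, char event_thread_died)
 *     {
 *         ... service the fd, then re-arm it through the pool's
 *         event_handled op (event_handled_epoll() here) ...
 *     }
 *
 *     idx = event_register_epoll(pool, fd, my_handler, data, 1, 0, 0);
 *
 * The returned idx, together with the generation delivered to the
 * handler, identifies the slot in later event_select_on,
 * event_unregister and event_handled calls.
 */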
static int
event_unregister_epoll_common(struct event_pool *event_pool, int fd, int idx,
int do_close)
{
int ret = -1;
struct event_slot_epoll *slot = NULL;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
/* During shutdown, it may happen that a socket registration with
* the event sub-system may fail and an rpc_transport_unref() may
* be called for such an unregistered socket with idx == -1. This
* may cause the following assert(slot->fd == fd) to fail.
*/
if (idx < 0)
goto out;
slot = event_slot_get(event_pool, idx);
if (!slot) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d idx=%d", fd, idx);
return -1;
}
assert(slot->fd == fd);
LOCK(&slot->lock);
{
ret = epoll_ctl(event_pool->fd, EPOLL_CTL_DEL, fd, NULL);
if (ret == -1) {
gf_msg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_DEL_FAILED,
"fail to del "
"fd(=%d) from epoll fd(=%d)",
fd, event_pool->fd);
goto unlock;
}
slot->do_close = do_close;
slot->gen++; /* detect unregister in dispatch_handler() */
}
unlock:
UNLOCK(&slot->lock);
event_slot_unref(event_pool, slot, idx); /* one for event_register() */
event_slot_unref(event_pool, slot, idx); /* one for event_slot_get() */
out:
return ret;
}
static int
event_unregister_epoll(struct event_pool *event_pool, int fd, int idx_hint)
{
int ret = -1;
ret = event_unregister_epoll_common(event_pool, fd, idx_hint, 0);
return ret;
}
static int
event_unregister_close_epoll(struct event_pool *event_pool, int fd,
int idx_hint)
{
int ret = -1;
ret = event_unregister_epoll_common(event_pool, fd, idx_hint, 1);
return ret;
}
static int
event_select_on_epoll(struct event_pool *event_pool, int fd, int idx,
int poll_in, int poll_out)
{
int ret = -1;
struct event_slot_epoll *slot = NULL;
struct epoll_event epoll_event = {
0,
};
struct event_data *ev_data = (void *)&epoll_event.data;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
slot = event_slot_get(event_pool, idx);
if (!slot) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d idx=%d", fd, idx);
return -1;
}
assert(slot->fd == fd);
LOCK(&slot->lock);
{
__slot_update_events(slot, poll_in, poll_out);
epoll_event.events = slot->events;
ev_data->idx = idx;
ev_data->gen = slot->gen;
        if (slot->in_handler) {
            /*
             * in_handler indicates that at least one thread is
             * currently executing event_dispatch_epoll_handler();
             * the fd is registered with EPOLLONESHOT, so that thread
             * will re-arm it (via the event_handled path, see
             * event_handled_epoll()) once the handler completes.
             *
             * Skipping epoll_ctl() here not only saves a system
             * call, but also avoids the possibility of another epoll
             * thread picking up the next event while the ongoing
             * handler is still in progress (which would cause
             * unnecessary contention on rpc_transport_t->mutex).
             */
            goto unlock;
        }
ret = epoll_ctl(event_pool->fd, EPOLL_CTL_MOD, fd, &epoll_event);
if (ret == -1) {
gf_msg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_MODIFY_FAILED,
"failed to "
"modify fd(=%d) events to %d",
fd, epoll_event.events);
}
}
unlock:
UNLOCK(&slot->lock);
event_slot_unref(event_pool, slot, idx);
out:
return idx;
}
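/* A typical toggle (sketch): a transport with queued outgoing data can
 * enable write events via
 *
 *     event_select_on_epoll(pool, fd, idx, -1, 1);
 *
 * and disable them with poll_out == 0 once the queue drains, leaving
 * the read side untouched (poll_in == -1) in both calls.
 */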
static int
event_dispatch_epoll_handler(struct event_pool *event_pool,
struct epoll_event *event)
{
struct event_data *ev_data = NULL;
struct event_slot_epoll *slot = NULL;
event_handler_t handler = NULL;
void *data = NULL;
int idx = -1;
int gen = -1;
int ret = -1;
int fd = -1;
gf_boolean_t handled_error_previously = _gf_false;
ev_data = (void *)&event->data;
handler = NULL;
data = NULL;
idx = ev_data->idx;
gen = ev_data->gen;
slot = event_slot_get(event_pool, idx);
if (!slot) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for idx=%d", idx);
return -1;
}
LOCK(&slot->lock);
{
fd = slot->fd;
if (fd == -1) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_STALE_FD_FOUND,
"stale fd found on "
"idx=%d, gen=%d, events=%d, slot->gen=%d",
idx, gen, event->events, slot->gen);
/* fd got unregistered in another thread */
goto pre_unlock;
}
if (gen != slot->gen) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_GENERATION_MISMATCH,
"generation "
"mismatch on idx=%d, gen=%d, slot->gen=%d, "
"slot->fd=%d",
idx, gen, slot->gen, slot->fd);
/* slot was re-used and therefore is another fd! */
goto pre_unlock;
}
handler = slot->handler;
data = slot->data;
if (slot->in_handler > 0) {
            /* Another handler is in progress; skip this one. */
handler = NULL;
goto pre_unlock;
}
if (slot->handled_error) {
handled_error_previously = _gf_true;
} else {
slot->handled_error = (event->events & (EPOLLERR | EPOLLHUP));
slot->in_handler++;
}
}
pre_unlock:
UNLOCK(&slot->lock);
ret = 0;
if (!handler)
goto out;
if (!handled_error_previously) {
handler(fd, idx, gen, data, (event->events & (EPOLLIN | EPOLLPRI)),
(event->events & (EPOLLOUT)),
(event->events & (EPOLLERR | EPOLLHUP)), 0);
}
out:
event_slot_unref(event_pool, slot, idx);
return ret;
}
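/* The one-shot handshake: because fds are registered with EPOLLONESHOT,
 * an fd is disarmed as soon as epoll_wait() returns an event for it.
 * This function marks the slot busy (in_handler) before invoking the
 * handler, and the handler is expected to eventually call back into
 * event_handled_epoll() (via the event_handled op), which drops
 * in_handler and re-arms the fd with EPOLL_CTL_MOD. Until then, no
 * poller thread can receive further events for the same fd.
 */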
static void *
event_dispatch_epoll_worker(void *data)
{
struct epoll_event event;
int ret = -1;
struct event_thread_data *ev_data = data;
struct event_pool *event_pool;
int myindex = -1;
int timetodie = 0, gen = 0;
struct list_head poller_death_notify;
struct event_slot_epoll *slot = NULL, *tmp = NULL;
GF_VALIDATE_OR_GOTO("event", ev_data, out);
event_pool = ev_data->event_pool;
myindex = ev_data->event_index;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD,
"Started"
" thread with index %d",
myindex - 1);
pthread_mutex_lock(&event_pool->mutex);
{
event_pool->activethreadcount++;
}
pthread_mutex_unlock(&event_pool->mutex);
for (;;) {
if (event_pool->eventthreadcount < myindex) {
            /* ...time to die: the thread count was decreased below
             * this thread's index */
            /* Recheck under the mutex for extra safety; this keeps
             * lock contention low in the normal case, when threads
             * are not being reconfigured */
pthread_mutex_lock(&event_pool->mutex);
{
if (event_pool->eventthreadcount < myindex) {
while (event_pool->poller_death_sliced) {
pthread_cond_wait(&event_pool->cond,
&event_pool->mutex);
}
INIT_LIST_HEAD(&poller_death_notify);
                    /* the condition held inside the critical
                     * section too: time to die */
event_pool->pollers[myindex - 1] = 0;
event_pool->activethreadcount--;
timetodie = 1;
gen = ++event_pool->poller_gen;
list_for_each_entry(slot, &event_pool->poller_death,
poller_death)
{
event_slot_ref(slot);
}
list_splice_init(&event_pool->poller_death,
&poller_death_notify);
event_pool->poller_death_sliced = 1;
pthread_cond_broadcast(&event_pool->cond);
}
}
pthread_mutex_unlock(&event_pool->mutex);
if (timetodie) {
list_for_each_entry(slot, &poller_death_notify, poller_death)
{
slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
}
pthread_mutex_lock(&event_pool->mutex);
{
list_for_each_entry_safe(slot, tmp, &poller_death_notify,
poller_death)
{
__event_slot_unref(event_pool, slot, slot->idx);
}
list_splice(&poller_death_notify,
&event_pool->poller_death);
event_pool->poller_death_sliced = 0;
pthread_cond_broadcast(&event_pool->cond);
}
pthread_mutex_unlock(&event_pool->mutex);
gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD,
"Exited thread with index %d", myindex);
goto out;
}
}
ret = epoll_wait(event_pool->fd, &event, 1, -1);
        if (ret == 0)
            /* timeout (cannot happen with a -1 timeout, but be safe) */
            continue;
        if (ret == -1 && errno == EINTR)
            /* interrupted system call; retry */
            continue;
ret = event_dispatch_epoll_handler(event_pool, &event);
if (ret) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_EXITED_EPOLL_THREAD,
"Failed to dispatch handler");
}
}
out:
if (ev_data)
GF_FREE(ev_data);
return NULL;
}
/* Attempts to start the configured number of pollers, ensuring that at
 * least the first one is started in a joinable state */
static int
event_dispatch_epoll(struct event_pool *event_pool)
{
int i = 0;
pthread_t t_id;
int pollercount = 0;
int ret = -1;
struct event_thread_data *ev_data = NULL;
/* Start the configured number of pollers */
pthread_mutex_lock(&event_pool->mutex);
{
pollercount = event_pool->eventthreadcount;
/* Set to MAX if greater */
if (pollercount > EVENT_MAX_THREADS)
pollercount = EVENT_MAX_THREADS;
/* Default pollers to 1 in case this is incorrectly set */
if (pollercount <= 0)
pollercount = 1;
event_pool->activethreadcount++;
for (i = 0; i < pollercount; i++) {
ev_data = GF_CALLOC(1, sizeof(*ev_data), gf_common_mt_event_pool);
if (!ev_data) {
if (i == 0) {
                    /* Creation of the 0th thread must succeed: it
                     * is the one joined and waited on below */
break;
} else {
                    /* Failure to create the other threads is a
                     * lesser evil, and is ignored */
continue;
}
}
ev_data->event_pool = event_pool;
ev_data->event_index = i + 1;
ret = gf_thread_create(&t_id, NULL, event_dispatch_epoll_worker,
ev_data, "epoll%03hx", i & 0x3ff);
if (!ret) {
event_pool->pollers[i] = t_id;
                /* mark all threads other than the one at index 0
                 * as detached. Errors can be ignored: an undetached
                 * thread merely lingers as a zombie, and the thread
                 * counts are decreased regardless */
if (i != 0)
pthread_detach(event_pool->pollers[i]);
} else {
gf_msg("epoll", GF_LOG_WARNING, 0,
LG_MSG_START_EPOLL_THREAD_FAILED,
"Failed to start thread for index %d", i);
if (i == 0) {
GF_FREE(ev_data);
break;
} else {
GF_FREE(ev_data);
continue;
}
}
}
}
pthread_mutex_unlock(&event_pool->mutex);
    /* Wait for the first thread, which is created in a joinable state;
     * it runs until the pool is destroyed, so this call effectively
     * blocks for the lifetime of the event pool */
if (event_pool->pollers[0] != 0)
pthread_join(event_pool->pollers[0], NULL);
pthread_mutex_lock(&event_pool->mutex);
{
event_pool->activethreadcount--;
}
pthread_mutex_unlock(&event_pool->mutex);
return ret;
}
/**
 * @param event_pool event_pool on which fds of interest are registered
 *                   for events.
 *
 * @return 1 if at least one epoll worker thread has been spawned,
 *         0 otherwise.
 *
 * NB: This function must be called while holding event_pool->mutex.
 */
static int
event_pool_dispatched_unlocked(struct event_pool *event_pool)
{
return (event_pool->pollers[0] != 0);
}
int
event_reconfigure_threads_epoll(struct event_pool *event_pool, int value)
{
int i;
int ret = 0;
pthread_t t_id;
int oldthreadcount;
struct event_thread_data *ev_data = NULL;
pthread_mutex_lock(&event_pool->mutex);
{
        /* Reconfiguring to 0 threads is allowed only in destroy mode */
if (event_pool->destroy == 1) {
value = 0;
} else {
/* Set to MAX if greater */
if (value > EVENT_MAX_THREADS)
value = EVENT_MAX_THREADS;
/* Default pollers to 1 in case this is set incorrectly */
if (value <= 0)
value = 1;
}
oldthreadcount = event_pool->eventthreadcount;
/* Start 'worker' threads as necessary only if event_dispatch()
* was called before. If event_dispatch() was not called, there
* will be no epoll 'worker' threads running yet. */
if (event_pool_dispatched_unlocked(event_pool) &&
(oldthreadcount < value)) {
/* create more poll threads */
for (i = oldthreadcount; i < value; i++) {
                /* Start a thread only if the slot at this index
                 * is 0, confirming that the older thread which
                 * occupied it is dead */
if (event_pool->pollers[i] == 0) {
ev_data = GF_CALLOC(1, sizeof(*ev_data),
gf_common_mt_event_pool);
if (!ev_data) {
continue;
}
ev_data->event_pool = event_pool;
ev_data->event_index = i + 1;
ret = gf_thread_create(&t_id, NULL,
event_dispatch_epoll_worker, ev_data,
"epoll%03hx", i & 0x3ff);
if (ret) {
gf_msg("epoll", GF_LOG_WARNING, 0,
LG_MSG_START_EPOLL_THREAD_FAILED,
"Failed to start thread"
" for index %d",
i);
GF_FREE(ev_data);
} else {
pthread_detach(t_id);
event_pool->pollers[i] = t_id;
}
}
}
}
        /* if value decreases, the surplus threads will terminate themselves */
event_pool->eventthreadcount = value;
}
pthread_mutex_unlock(&event_pool->mutex);
return 0;
}
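/* A scaling sketch: raising the count spawns detached workers for the
 * vacant indices, while lowering it only stores the new value; each
 * surplus worker notices eventthreadcount < myindex at the top of its
 * loop and exits on its own. For example (hypothetical pool):
 *
 *     event_reconfigure_threads_epoll(pool, 4);   grow to 4 pollers
 *     event_reconfigure_threads_epoll(pool, 1);   workers 2-4 retire
 */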
/* This function is the destructor for the event_pool data structure.
 * It should be called only after poller_threads_destroy() has been
 * called, else it will lead to crashes.
 */
static int
event_pool_destroy_epoll(struct event_pool *event_pool)
{
int ret = 0, i = 0, j = 0;
struct event_slot_epoll *table = NULL;
ret = sys_close(event_pool->fd);
for (i = 0; i < EVENT_EPOLL_TABLES; i++) {
if (event_pool->ereg[i]) {
table = event_pool->ereg[i];
event_pool->ereg[i] = NULL;
for (j = 0; j < EVENT_EPOLL_SLOTS; j++) {
LOCK_DESTROY(&table[j].lock);
}
GF_FREE(table);
}
}
pthread_mutex_destroy(&event_pool->mutex);
pthread_cond_destroy(&event_pool->cond);
GF_FREE(event_pool->evcache);
GF_FREE(event_pool->reg);
GF_FREE(event_pool);
return ret;
}
static int
event_handled_epoll(struct event_pool *event_pool, int fd, int idx, int gen)
{
struct event_slot_epoll *slot = NULL;
struct epoll_event epoll_event = {
0,
};
struct event_data *ev_data = (void *)&epoll_event.data;
int ret = 0;
slot = event_slot_get(event_pool, idx);
if (!slot) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d idx=%d", fd, idx);
return -1;
}
assert(slot->fd == fd);
LOCK(&slot->lock);
{
slot->in_handler--;
if (gen != slot->gen) {
/* event_unregister() happened while we were
in handler()
*/
gf_msg_debug("epoll", 0,
"generation bumped on idx=%d"
" from gen=%d to slot->gen=%d, fd=%d, "
"slot->fd=%d",
idx, gen, slot->gen, fd, slot->fd);
goto unlock;
}
/* This call also picks up the changes made by another
thread calling event_select_on_epoll() while this
thread was busy in handler()
*/
if (slot->in_handler == 0) {
epoll_event.events = slot->events;
ev_data->idx = idx;
ev_data->gen = gen;
ret = epoll_ctl(event_pool->fd, EPOLL_CTL_MOD, fd, &epoll_event);
}
}
unlock:
UNLOCK(&slot->lock);
event_slot_unref(event_pool, slot, idx);
return ret;
}
struct event_ops event_ops_epoll = {
.new = event_pool_new_epoll,
.event_register = event_register_epoll,
.event_select_on = event_select_on_epoll,
.event_unregister = event_unregister_epoll,
.event_unregister_close = event_unregister_close_epoll,
.event_dispatch = event_dispatch_epoll,
.event_reconfigure_threads = event_reconfigure_threads_epoll,
.event_pool_destroy = event_pool_destroy_epoll,
.event_handled = event_handled_epoll,
};
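/* This dispatch table is what the generic event layer selects when
 * epoll support is available (HAVE_SYS_EPOLL_H); the public event_*
 * entry points funnel through these callbacks.
 */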
#endif