/*
Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <sys/poll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include "glusterfs/logging.h"
#include "glusterfs/gf-event.h"
#include "glusterfs/mem-pool.h"
#include "glusterfs/common-utils.h"
#include "glusterfs/syscall.h"
#include "glusterfs/libglusterfs-messages.h"
/* One entry of the poll backend's registration table (event_pool->reg).
 * Entries are filled in by event_register_poll() and copied into the
 * pollfd array by event_dispatch_poll_resize().
 */
struct event_slot_poll {
/* descriptor being watched */
int fd;
/* poll(2) event mask requested for fd (POLLPRI, POLLIN, POLLOUT bits) */
int events;
/* opaque pointer handed back to handler on dispatch */
void *data;
/* callback invoked when poll() reports activity on fd */
event_handler_t handler;
};
/* Forward declaration: event_pool_new_poll() registers the breaker pipe
 * with this function before its definition appears further down.
 */
static int
event_register_poll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
int poll_out, char notify_poller_death);
/* Event handler that discards pending bytes on a descriptor; it is the
 * handler event_pool_new_poll() installs for the read end of the breaker
 * pipe.
 *
 * Only fd and poll_in are used; the remaining handler arguments are
 * ignored.  Nothing is done unless poll_in is set.  Reading continues for
 * as long as each read completely fills the scratch buffer.  A failed read
 * is logged unless errno is EAGAIN.
 */
static void
__flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
           int poll_err, char event_thread_died)
{
    char scratch[64];
    int nread = -1;

    if (poll_in) {
        for (;;) {
            nread = sys_read(fd, scratch, sizeof(scratch));
            if (nread == -1 && errno != EAGAIN) {
                gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_FILE_OP_FAILED,
                       "read on %d returned error", fd);
            }

            /* a short (or failed) read means there is nothing more queued */
            if (nread != (int)sizeof(scratch)) {
                break;
            }
        }
    }
}
/* Find the registration slot that holds fd.
 *
 * idx is a hint: if it lies inside the used part of the table and that
 * slot holds fd, it is returned directly.  Otherwise the used part of the
 * table is scanned from the start.
 *
 * Returns the slot index, or -1 if fd is not registered or event_pool is
 * NULL.  Every caller in this file invokes it with event_pool->mutex held.
 */
static int
__event_getindex(struct event_pool *event_pool, int fd, int idx)
{
    int found = -1;
    int slot = 0;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    /* fast path: the caller's hint is still accurate */
    if (idx >= 0 && idx < event_pool->used &&
        event_pool->reg[idx].fd == fd) {
        found = idx;
        goto out;
    }

    /* slow path: linear scan over the slots in use */
    while (slot < event_pool->used) {
        if (event_pool->reg[slot].fd == fd) {
            found = slot;
            break;
        }
        slot++;
    }

out:
    return found;
}
/* Create an event pool driven by poll(2).
 *
 * count:            initial number of slots in the registration table
 * eventthreadcount: requested number of event threads; a value above 1 is
 *                   logged and ignored, the pool always records 1
 *
 * The pool gets a non-blocking "breaker" pipe whose read end is registered
 * with __flush_fd as its handler.
 *
 * Returns the new pool, or NULL on failure.  On failure everything acquired
 * so far (pipe descriptors, mutex, registration table, pool) is released.
 */
static struct event_pool *
event_pool_new_poll(int count, int eventthreadcount)
{
    struct event_pool *event_pool = NULL;
    int ret = -1;
    int end = 0;

    event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool);
    if (!event_pool) {
        return NULL;
    }

    event_pool->count = count;
    event_pool->reg = GF_CALLOC(event_pool->count, sizeof(*event_pool->reg),
                                gf_common_mt_reg);
    if (!event_pool->reg) {
        GF_FREE(event_pool);
        return NULL;
    }

    pthread_mutex_init(&event_pool->mutex, NULL);

    ret = pipe(event_pool->breaker);
    if (ret == -1) {
        gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_PIPE_CREATE_FAILED,
               "pipe creation failed");
        goto err_free;
    }

    /* both ends of the breaker pipe are switched to non-blocking mode */
    for (end = 0; end < 2; end++) {
        ret = fcntl(event_pool->breaker[end], F_SETFL, O_NONBLOCK);
        if (ret == -1) {
            gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_SET_PIPE_FAILED,
                   "could not set pipe to non blocking mode");
            goto err_close;
        }
    }

    ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
                              NULL, 1, 0, 0);
    if (ret == -1) {
        gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
               "could not register pipe fd with poll event loop");
        goto err_close;
    }

    if (eventthreadcount > 1) {
        gf_msg("poll", GF_LOG_INFO, 0, LG_MSG_POLL_IGNORE_MULTIPLE_THREADS,
               "Currently poll "
               "does not use multiple event processing threads, "
               "thread count (%d) ignored",
               eventthreadcount);
    }

    /* although eventthreadcount for the poll implementation is always
     * going to be 1, it needs to be set to 1 so that
     * rpcsvc_request_handler() thread scaling works flawlessly in
     * both epoll and poll models
     */
    event_pool->eventthreadcount = 1;

    return event_pool;

err_close:
    sys_close(event_pool->breaker[0]);
    sys_close(event_pool->breaker[1]);
err_free:
    /* the mutex was initialised above and is not held here */
    pthread_mutex_destroy(&event_pool->mutex);
    GF_FREE(event_pool->reg);
    GF_FREE(event_pool);
    return NULL;
}
/* Register fd with the pool and return the slot index it was given.
 *
 * handler / data:     callback and opaque argument stored for dispatch
 * poll_in / poll_out: 1 requests the event, 0 clears it, -1 leaves it
 *                     untouched; any other value is logged and ignored
 * notify_poller_death: not used by the poll backend
 *
 * POLLPRI is always requested.  When the table is full it is grown by 256
 * slots.  Returns -1 if event_pool is NULL or the table cannot be grown;
 * in the latter case the existing table and its size are left untouched.
 */
static int
event_register_poll(struct event_pool *event_pool, int fd,
                    event_handler_t handler, void *data, int poll_in,
                    int poll_out, char notify_poller_death)
{
    int idx = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        if (event_pool->count == event_pool->used) {
            int new_count = event_pool->count + 256;
            /* Keep the result in a temporary: on failure the old table is
             * still valid and must stay reachable through event_pool->reg.
             */
            void *grown = GF_REALLOC(
                event_pool->reg,
                (size_t)new_count * sizeof(*event_pool->reg));

            if (!grown) {
                goto unlock;
            }

            event_pool->reg = grown;
            event_pool->count = new_count;
        }

        idx = event_pool->used++;

        event_pool->reg[idx].fd = fd;
        event_pool->reg[idx].events = POLLPRI;
        event_pool->reg[idx].handler = handler;
        event_pool->reg[idx].data = data;

        switch (poll_in) {
            case 1:
                event_pool->reg[idx].events |= POLLIN;
                break;
            case 0:
                event_pool->reg[idx].events &= ~POLLIN;
                break;
            case -1:
                /* do nothing */
                break;
            default:
                gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN,
                       "invalid poll_in value %d", poll_in);
                break;
        }

        switch (poll_out) {
            case 1:
                event_pool->reg[idx].events |= POLLOUT;
                break;
            case 0:
                event_pool->reg[idx].events &= ~POLLOUT;
                break;
            case -1:
                /* do nothing */
                break;
            default:
                gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT,
                       "invalid poll_out value %d", poll_out);
                break;
        }

        /* tell the dispatcher that its pollfd array is out of date */
        event_pool->changed = 1;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

out:
    return idx;
}
/* Remove fd from the registration table.
 *
 * idx_hint is the slot the caller believes fd occupies; it is passed to
 * __event_getindex() as a lookup hint.  The freed slot is filled with the
 * last used entry, so the index of that entry changes.
 *
 * Returns the slot fd occupied, or -1 (with errno set to ENOENT) if fd is
 * not registered.  Also returns -1 if event_pool is NULL.
 */
static int
event_unregister_poll(struct event_pool *event_pool, int fd, int idx_hint)
{
    int slot = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);

    slot = __event_getindex(event_pool, fd, idx_hint);
    if (slot != -1) {
        /* compact the table: move the last used entry into the hole */
        event_pool->used--;
        event_pool->reg[slot] = event_pool->reg[event_pool->used];
        event_pool->changed = 1;
    } else {
        gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND,
               "index not found for fd=%d (idx_hint=%d)", fd, idx_hint);
        errno = ENOENT;
    }

    pthread_mutex_unlock(&event_pool->mutex);

out:
    return slot;
}
/* Unregister fd from the pool and then close it.
 *
 * The descriptor is closed whether or not it was found in the table.
 * Returns whatever event_unregister_poll() returned; the result of the
 * close itself is not reported.
 */
static int
event_unregister_close_poll(struct event_pool *event_pool, int fd, int idx_hint)
{
    const int unregistered = event_unregister_poll(event_pool, fd, idx_hint);

    sys_close(fd);

    return unregistered;
}
static int
event_select_on_poll(struct event_pool *event_pool, int fd, int idx_hint,
int poll_in, int poll_out)
{
int idx = -1;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
pthread_mutex_lock(&event_pool->mutex);
{
idx = __event_getindex(event_pool, fd, idx_hint);
if (idx == -1) {
gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND,
"index not found for fd=%d (idx_hint=%d)", fd, idx_hint);
errno = ENOENT;
goto unlock;
}
switch (poll_in) {
case 1:
event_pool->reg[idx].events |= POLLIN;
break;
case 0:
event_pool->reg[idx].events &= ~POLLIN;
break;
case -1:
/* do nothing */
break;
default:
/* TODO: log error */
break;
}
switch (poll_out) {
case 1:
event_pool->reg[idx].events |= POLLOUT;
break;
case 0:
event_pool->reg[idx].events &= ~POLLOUT;
break;
case -1:
/* do nothing */
break;
default:
/* TODO: log error */
break;
}
if (poll_in + poll_out > -2)
event_pool->changed = 1;
}
unlock:
pthread_mutex_unlock(&event_pool->mutex);
out:
return idx;
}
static int
event_dispatch_poll_handler(struct event_pool *event_pool, struct pollfd *ufds,
int i)
{
event_handler_t handler = NULL;
void *data = NULL;
int idx = -1;
int ret = 0;
handler = NULL;
data = NULL;
pthread_mutex_lock(&event_pool->mutex);
{
idx = __event_getindex(event_pool, ufds[i].fd, i);
if (idx == -1) {
gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND,
"index not found for "
"fd=%d (idx_hint=%d)",
ufds[i].fd, i);
goto unlock;
}
handler = event_pool->reg[idx].handler;
data = event_pool->reg[idx].data;
}
unlock:
pthread_mutex_unlock(&event_pool->mutex);
if (handler)
handler(ufds[i].fd, idx, 0, data,
(ufds[i].revents & (POLLIN | POLLPRI)),
(ufds[i].revents & (POLLOUT)),
(ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);
return ret;
}
/* Refresh the pollfd array from the registration table.
 *
 * ufds / size: the array currently used by the dispatcher and its number
 *              of valid entries
 *
 * Nothing is done while event_pool->changed is 0.  Otherwise the cached
 * array (event_pool->evcache) is reallocated if the table has outgrown it,
 * and every used registration is copied into it with revents cleared.
 *
 * Returns the number of valid entries in event_pool->evcache.  If the
 * array cannot be allocated, 0 is returned and the cache is recorded as
 * empty, so the caller polls nothing and the allocation is retried on the
 * next call.
 */
static int
event_dispatch_poll_resize(struct event_pool *event_pool, struct pollfd *ufds,
                           int size)
{
    int i = 0;

    pthread_mutex_lock(&event_pool->mutex);
    {
        if (event_pool->changed == 0) {
            goto unlock;
        }

        if (event_pool->used > event_pool->evcache_size) {
            GF_FREE(event_pool->evcache);
            event_pool->evcache = ufds = NULL;

            ufds = GF_CALLOC(event_pool->used, sizeof(*ufds),
                             gf_common_mt_pollfd);
            if (!ufds) {
                /* The old array is gone: report an empty set instead of
                 * the stale size, and keep evcache_size at 0 so the next
                 * call tries the allocation again.
                 */
                event_pool->evcache_size = 0;
                size = 0;
                goto unlock;
            }

            event_pool->evcache = ufds;
            event_pool->evcache_size = event_pool->used;
        }

        if (ufds == NULL) {
            goto unlock;
        }

        for (i = 0; i < event_pool->used; i++) {
            ufds[i].fd = event_pool->reg[i].fd;
            ufds[i].events = event_pool->reg[i].events;
            ufds[i].revents = 0;
        }

        size = i;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

    return size;
}
/* Main loop of the poll backend.
 *
 * Marks one thread as active, then repeatedly:
 *   - stops if event_pool->destroy is set (clears the active thread count,
 *     broadcasts event_pool->cond and returns 0);
 *   - refreshes the pollfd array via event_dispatch_poll_resize();
 *   - calls poll() with a 1 ms timeout;
 *   - runs the handler of every entry that reported events.
 *
 * Returns 0 when the pool is being destroyed, -1 if event_pool is NULL.
 */
static int
event_dispatch_poll(struct event_pool *event_pool)
{
    struct pollfd *ufds = NULL;
    int size = 0;
    int i = 0;
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        event_pool->activethreadcount = 1;
    }
    pthread_mutex_unlock(&event_pool->mutex);

    while (1) {
        pthread_mutex_lock(&event_pool->mutex);
        {
            if (event_pool->destroy == 1) {
                event_pool->activethreadcount = 0;
                pthread_cond_broadcast(&event_pool->cond);
                pthread_mutex_unlock(&event_pool->mutex);
                return 0;
            }
        }
        pthread_mutex_unlock(&event_pool->mutex);

        size = event_dispatch_poll_resize(event_pool, ufds, size);
        ufds = event_pool->evcache;

        /* never hand poll() a NULL array with a non-zero count */
        if (ufds == NULL) {
            size = 0;
        }

        ret = poll(ufds, size, 1);

        /* 0 is a timeout; a negative value is EINTR or another failure.
         * In every one of these cases revents carries nothing to
         * dispatch, so go round again.
         */
        if (ret <= 0) {
            continue;
        }

        for (i = 0; i < size; i++) {
            if (!ufds[i].revents) {
                continue;
            }

            event_dispatch_poll_handler(event_pool, ufds, i);
        }
    }

out:
    return -1;
}
/* Thread-count reconfiguration hook for the poll backend.
 *
 * Both arguments are ignored and 0 is always returned.
 */
int
event_reconfigure_threads_poll(struct event_pool *event_pool, int value)
{
    (void)event_pool;
    (void)value;

    return 0;
}
/* Destructor for the event_pool data structure.
 * Should be called only after poller_threads_destroy() is called,
 * else will lead to crashes.
 *
 * Closes both ends of the breaker pipe, then releases the pollfd cache,
 * the registration table, the mutex and the pool itself.
 *
 * Returns 0 on success.  If closing a pipe end fails, the error from
 * sys_close() is returned and the pool is left allocated.  An end that was
 * closed successfully is recorded as -1 so that it is never closed twice.
 */
static int
event_pool_destroy_poll(struct event_pool *event_pool)
{
    int ret = 0;
    int end = 0;

    for (end = 0; end < 2; end++) {
        if (event_pool->breaker[end] < 0) {
            /* already closed by an earlier, partially failed attempt */
            continue;
        }

        ret = sys_close(event_pool->breaker[end]);
        if (ret) {
            return ret;
        }

        event_pool->breaker[end] = -1;
    }

    /* pollfd array allocated by event_dispatch_poll_resize() */
    GF_FREE(event_pool->evcache);
    event_pool->evcache = NULL;

    GF_FREE(event_pool->reg);
    pthread_mutex_destroy(&event_pool->mutex);
    GF_FREE(event_pool);

    return 0;
}
/* Operations table that exposes the poll(2) backend defined in this file. */
struct event_ops event_ops_poll = {
    /* pool lifetime */
    .new = event_pool_new_poll,
    .event_pool_destroy = event_pool_destroy_poll,

    /* descriptor registration */
    .event_register = event_register_poll,
    .event_unregister = event_unregister_poll,
    .event_unregister_close = event_unregister_close_poll,
    .event_select_on = event_select_on_poll,

    /* event loop */
    .event_dispatch = event_dispatch_poll,
    .event_reconfigure_threads = event_reconfigure_threads_poll,
};