Blob Blame History Raw
/*
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <sys/poll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>

#include "glusterfs/logging.h"
#include "glusterfs/gf-event.h"
#include "glusterfs/mem-pool.h"
#include "glusterfs/common-utils.h"
#include "glusterfs/libglusterfs-messages.h"
#include "glusterfs/syscall.h"

struct event_pool *
event_pool_new(int count, int eventthreadcount)
{
    struct event_pool *event_pool = NULL;
    extern struct event_ops event_ops_poll;

#ifdef HAVE_SYS_EPOLL_H
    extern struct event_ops event_ops_epoll;

    event_pool = event_ops_epoll.new(count, eventthreadcount);

    if (event_pool) {
        event_pool->ops = &event_ops_epoll;
    } else {
        gf_msg("event", GF_LOG_WARNING, 0, LG_MSG_FALLBACK_TO_POLL,
               "falling back to poll based event handling");
    }
#endif

    if (!event_pool) {
        event_pool = event_ops_poll.new(count, eventthreadcount);

        if (event_pool)
            event_pool->ops = &event_ops_poll;
    }

    return event_pool;
}

int
event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
               void *data, int poll_in, int poll_out, char notify_poller_death)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    ret = event_pool->ops->event_register(
        event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);
out:
    return ret;
}

int
event_unregister(struct event_pool *event_pool, int fd, int idx)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    ret = event_pool->ops->event_unregister(event_pool, fd, idx);

out:
    return ret;
}

int
event_unregister_close(struct event_pool *event_pool, int fd, int idx)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    ret = event_pool->ops->event_unregister_close(event_pool, fd, idx);

out:
    return ret;
}

int
event_select_on(struct event_pool *event_pool, int fd, int idx_hint,
                int poll_in, int poll_out)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    ret = event_pool->ops->event_select_on(event_pool, fd, idx_hint, poll_in,
                                           poll_out);
out:
    return ret;
}

int
event_dispatch(struct event_pool *event_pool)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    ret = event_pool->ops->event_dispatch(event_pool);
    if (ret)
        goto out;

out:
    return ret;
}

int
event_reconfigure_threads(struct event_pool *event_pool, int value)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    /* call event refresh function */
    ret = event_pool->ops->event_reconfigure_threads(event_pool, value);

out:
    return ret;
}

int
event_pool_destroy(struct event_pool *event_pool)
{
    int ret = -1;
    int destroy = 0, activethreadcount = 0;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        destroy = event_pool->destroy;
        activethreadcount = event_pool->activethreadcount;
    }
    pthread_mutex_unlock(&event_pool->mutex);

    if (!destroy || (activethreadcount > 0)) {
        goto out;
    }

    ret = event_pool->ops->event_pool_destroy(event_pool);
out:
    return ret;
}

void
poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out,
                       int poll_in, int poll_err, char event_thread_exit)
{
    struct event_destroy_data *destroy = NULL;
    int readfd = -1;
    char buf = '\0';

    destroy = data;
    readfd = destroy->readfd;
    if (readfd < 0) {
        goto out;
    }

    while (sys_read(readfd, &buf, 1) > 0) {
    }

out:
    event_handled(destroy->pool, fd, idx, gen);

    return;
}

/* This function destroys all the poller threads.
 * Note: to be called before event_pool_destroy is called.
 * The order in which cleaning is performed:
 * - Register a pipe fd(this is for waking threads in poll()/epoll_wait())
 * - Set the destroy mode, which this no new event registration will succeed
 * - Reconfigure the thread count to 0(this will succeed only in destroy mode)
 * - Wake up all the threads in poll() or epoll_wait(), so that they can
 *   destroy themselves.
 * - Wait for the thread to join(which will happen only after all the other
 *   threads are destroyed)
 */
int
event_dispatch_destroy(struct event_pool *event_pool)
{
    int ret = -1, threadcount = 0;
    int fd[2] = {-1};
    int idx = -1;
    int flags = 0;
    struct timespec sleep_till = {
        0,
    };
    struct event_destroy_data data = {
        0,
    };

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    ret = pipe(fd);
    if (ret < 0)
        goto out;

    /* Make the read end of the pipe nonblocking */
    flags = fcntl(fd[0], F_GETFL);
    flags |= O_NONBLOCK;
    ret = fcntl(fd[0], F_SETFL, flags);
    if (ret < 0)
        goto out;

    /* Make the write end of the pipe nonblocking */
    flags = fcntl(fd[1], F_GETFL);
    flags |= O_NONBLOCK;
    ret = fcntl(fd[1], F_SETFL, flags);
    if (ret < 0)
        goto out;

    data.pool = event_pool;
    data.readfd = fd[1];

    /* From the main thread register an event on the pipe fd[0],
     */
    idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0,
                         0);
    if (idx < 0)
        goto out;

    /* Enter the destroy mode first, set this before reconfiguring to 0
     * threads, to prevent further reconfigure to thread count > 0.
     */
    pthread_mutex_lock(&event_pool->mutex);
    {
        threadcount = event_pool->eventthreadcount;
        event_pool->destroy = 1;
    }
    pthread_mutex_unlock(&event_pool->mutex);

    ret = event_reconfigure_threads(event_pool, 0);
    if (ret < 0)
        goto out;

    /* Write something onto the write end of the pipe(fd[1]) so that
     * poll wakes up and calls the handler, poller_destroy_handler()
     */
    pthread_mutex_lock(&event_pool->mutex);
    {
        /* Write to pipe(fd[1]) and then wait for 1 second or until
         * a poller thread that is dying, broadcasts. Make sure we
         * do not loop forever by limiting to 10 retries
         */
        int retry = 0;

        while (event_pool->activethreadcount > 0 &&
               (retry++ < (threadcount + 10))) {
            if (sys_write(fd[1], "dummy", 6) == -1) {
                break;
            }
            clock_gettime(CLOCK_REALTIME, &sleep_till);
            sleep_till.tv_sec += 1;
            ret = pthread_cond_timedwait(&event_pool->cond, &event_pool->mutex,
                                         &sleep_till);
            if (ret) {
                gf_msg_debug("event", 0,
                             "thread cond-timedwait failed "
                             "active-thread-count: %d, "
                             "retry: %d",
                             event_pool->activethreadcount, retry);
            }
        }
    }
    pthread_mutex_unlock(&event_pool->mutex);

    ret = event_unregister(event_pool, fd[0], idx);

out:
    if (fd[0] != -1)
        sys_close(fd[0]);
    if (fd[1] != -1)
        sys_close(fd[1]);

    return ret;
}

int
event_handled(struct event_pool *event_pool, int fd, int idx, int gen)
{
    int ret = 0;

    if (event_pool->ops->event_handled)
        ret = event_pool->ops->event_handled(event_pool, fd, idx, gen);

    return ret;
}