Blob Blame History Raw
/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 * 
 *  Libmemcached library
 *
 *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
 *  Copyright (C) 2010 Brian Aker All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are
 *  met:
 *
 *      * Redistributions of source code must retain the above copyright
 *  notice, this list of conditions and the following disclaimer.
 *
 *      * Redistributions in binary form must reproduce the above
 *  copyright notice, this list of conditions and the following disclaimer
 *  in the documentation and/or other materials provided with the
 *  distribution.
 *
 *      * The names of its contributors may not be used to endorse or
 *  promote products derived from this software without specific prior
 *  written permission.
 *
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */


#include <libmemcachedutil/common.h>

#include <cassert>
#include <cerrno>
#include <pthread.h>
#include <memory>

struct memcached_pool_st
{
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  memcached_st *master;
  memcached_st **server_pool;
  int firstfree;
  const uint32_t size;
  uint32_t current_size;
  bool _owns_master;
  struct timespec _timeout;

  memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
    master(master_arg),
    server_pool(NULL),
    firstfree(-1),
    size(uint32_t(max_arg)),
    current_size(0),
    _owns_master(false)
  {
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);
    _timeout.tv_sec= 5;
    _timeout.tv_nsec= 0;
  }

  const struct timespec& timeout() const
  {
    return _timeout;
  }

  bool release(memcached_st*, memcached_return_t& rc);

  memcached_st *fetch(memcached_return_t& rc);
  memcached_st *fetch(const struct timespec&, memcached_return_t& rc);

  bool init(uint32_t initial);

  ~memcached_pool_st()
  {
    for (int x= 0; x <= firstfree; ++x)
    {
      memcached_free(server_pool[x]);
      server_pool[x]= NULL;
    }

    int error;
    if ((error= pthread_mutex_destroy(&mutex)) != 0)
    {
      assert_vmsg(error != 0, "pthread_mutex_destroy() %s(%d)", strerror(error), error);
    }

    if ((error= pthread_cond_destroy(&cond)) != 0)
    {
      assert_vmsg(error != 0, "pthread_cond_destroy() %s", strerror(error));
    }

    delete [] server_pool;
    if (_owns_master)
    {
      memcached_free(master);
    }
  }

  void increment_version()
  {
    ++master->configure.version;
  }

  bool compare_version(const memcached_st *arg) const
  {
    return (arg->configure.version == version());
  }

  int32_t version() const
  {
    return master->configure.version;
  }
};


/**
 * Grow the connection pool by creating a connection structure and clone the
 * original memcached handle.
 */
static bool grow_pool(memcached_pool_st* pool)
{
  assert(pool);

  memcached_st *obj;
  if (not (obj= memcached_clone(NULL, pool->master)))
  {
    return false;
  }

  pool->server_pool[++pool->firstfree]= obj;
  pool->current_size++;
  obj->configure.version= pool->version();

  return true;
}

bool memcached_pool_st::init(uint32_t initial)
{
  server_pool= new (std::nothrow) memcached_st *[size];
  if (server_pool == NULL)
  {
    return false;
  }

  /*
    Try to create the initial size of the pool. An allocation failure at
    this time is not fatal..
  */
  for (unsigned int x= 0; x < initial; ++x)
  {
    if (grow_pool(this) == false)
    {
      break;
    }
  }

  return true;
}


static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
{
  if (initial == 0 or max == 0 or (initial > max))
  {
    return NULL;
  }

  memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
  if (object == NULL)
  {
    return NULL;
  }

  /*
    Try to create the initial size of the pool. An allocation failure at
    this time is not fatal..
  */
  if (not object->init(initial))
  {
    delete object;
    return NULL;
  }

  return object;
}

memcached_pool_st *memcached_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
{
  return _pool_create(master, initial, max);
}

memcached_pool_st * memcached_pool(const char *option_string, size_t option_string_length)
{
  memcached_st *memc= memcached(option_string, option_string_length);

  if (memc == NULL)
  {
    return NULL;
  }

  memcached_pool_st *self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
  if (self == NULL)
  {
    memcached_free(memc);
    return NULL;
  }

  self->_owns_master= true;

  return self;
}

memcached_st*  memcached_pool_destroy(memcached_pool_st* pool)
{
  if (pool == NULL)
  {
    return NULL;
  }

  // Legacy that we return the original structure
  memcached_st *ret= NULL;
  if (pool->_owns_master)
  { }
  else
  {
    ret= pool->master;
  }

  delete pool;

  return ret;
}

memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
{
  static struct timespec relative_time= { 0, 0 };
  return fetch(relative_time, rc);
}

memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
{
  rc= MEMCACHED_SUCCESS;

  int error;
  if ((error= pthread_mutex_lock(&mutex)) != 0)
  {
    rc= MEMCACHED_IN_PROGRESS;
    return NULL;
  }

  memcached_st *ret= NULL;
  do
  {
    if (firstfree > -1)
    {
      ret= server_pool[firstfree--];
    }
    else if (current_size == size)
    {
      if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
      {
        error= pthread_mutex_unlock(&mutex);
        rc= MEMCACHED_NOTFOUND;

        return NULL;
      }

      struct timespec time_to_wait= {0, 0};
      time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
      time_to_wait.tv_nsec= relative_time.tv_nsec;

      int thread_ret;
      if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
      {
        int unlock_error;
        if ((unlock_error= pthread_mutex_unlock(&mutex)) != 0)
        {
          assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
        }

        if (thread_ret == ETIMEDOUT)
        {
          rc= MEMCACHED_TIMEOUT;
        }
        else
        {
          errno= thread_ret;
          rc= MEMCACHED_ERRNO;
        }

        return NULL;
      }
    }
    else if (grow_pool(this) == false)
    {
      int unlock_error;
      if ((unlock_error= pthread_mutex_unlock(&mutex)) != 0)
      {
        assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
      }

      return NULL;
    }
  } while (ret == NULL);

  if ((error= pthread_mutex_unlock(&mutex)) != 0)
  {
    assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
  }

  return ret;
}

bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
{
  rc= MEMCACHED_SUCCESS;
  if (released == NULL)
  {
    rc= MEMCACHED_INVALID_ARGUMENTS;
    return false;
  }

  int error;
  if ((error= pthread_mutex_lock(&mutex)))
  {
    rc= MEMCACHED_IN_PROGRESS;
    return false;
  }

  /* 
    Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
  */
  if (compare_version(released) == false)
  {
    memcached_st *memc;
    if ((memc= memcached_clone(NULL, master)))
    {
      memcached_free(released);
      released= memc;
    }
  }

  server_pool[++firstfree]= released;

  if (firstfree == 0 and current_size == size)
  {
    /* we might have people waiting for a connection.. wake them up :-) */
    if ((error= pthread_cond_broadcast(&cond)) != 0)
    {
      assert_vmsg(error != 0, "pthread_cond_broadcast() %s", strerror(error));
    }
  }

  if ((error= pthread_mutex_unlock(&mutex)) != 0)
  {
  }

  return true;
}

memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
{
  if (pool == NULL)
  {
    return NULL;
  }

  memcached_return_t unused;
  if (rc == NULL)
  {
    rc= &unused;
  }

  if (relative_time == NULL)
  {
    return pool->fetch(*rc);
  }

  return pool->fetch(*relative_time, *rc);
}

memcached_st* memcached_pool_pop(memcached_pool_st* pool,
                                 bool block,
                                 memcached_return_t *rc)
{
  if (pool == NULL)
  {
    return NULL;
  }

  memcached_return_t unused;
  if (rc == NULL)
  {
    rc= &unused;
  }

  memcached_st *memc;
  if (block)
  {
    memc= pool->fetch(pool->timeout(), *rc);
  }
  else
  {
    memc= pool->fetch(*rc);
  }

  return memc;
}

memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
{
  if (pool == NULL)
  {
    return MEMCACHED_INVALID_ARGUMENTS;
  }

  memcached_return_t rc;

  (void) pool->release(released, rc);

  return rc;
}

memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
{
  return memcached_pool_release(pool, released);
}


memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
                                               memcached_behavior_t flag,
                                               uint64_t data)
{
  if (pool == NULL)
  {
    return MEMCACHED_INVALID_ARGUMENTS;
  }

  int error;
  if ((error= pthread_mutex_lock(&pool->mutex)))
  {
    return MEMCACHED_IN_PROGRESS;
  }

  /* update the master */
  memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
  if (memcached_failed(rc))
  {
    if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
    {
      assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
    }
    return rc;
  }

  pool->increment_version();
  /* update the clones */
  for (int xx= 0; xx <= pool->firstfree; ++xx)
  {
    if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
    {
      pool->server_pool[xx]->configure.version= pool->version();
    }
    else
    {
      memcached_st *memc;
      if ((memc= memcached_clone(NULL, pool->master)))
      {
        memcached_free(pool->server_pool[xx]);
        pool->server_pool[xx]= memc;
        /* I'm not sure what to do in this case.. this would happen
          if we fail to push the server list inside the client..
          I should add a testcase for this, but I believe the following
          would work, except that you would add a hole in the pool list..
          in theory you could end up with an empty pool....
        */
      }
    }
  }

  if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
  {
    assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
  }

  return rc;
}

memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
                                               memcached_behavior_t flag,
                                               uint64_t *value)
{
  if (pool == NULL)
  {
    return MEMCACHED_INVALID_ARGUMENTS;
  }

  int error;
  if ((error= pthread_mutex_lock(&pool->mutex)))
  {
    return MEMCACHED_IN_PROGRESS;
  }

  *value= memcached_behavior_get(pool->master, flag);

  if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
  {
    assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
  }

  return MEMCACHED_SUCCESS;
}