/*
* sockem - socket-level network emulation
*
* Copyright (c) 2016, Magnus Edenhill, Andreas Smas
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#define _GNU_SOURCE /* for strdupa() and RTLD_NEXT */
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include <poll.h>
#include <assert.h>
#include <netinet/in.h>
#include <dlfcn.h>
#include "sockem.h"
#include <sys/queue.h>
#ifdef __APPLE__
#include <sys/time.h> /* for gettimeofday() */
#endif
#ifdef _MSC_VER
#define socket_errno() WSAGetLastError()
#else
#define socket_errno() errno
#define SOCKET_ERROR -1
#endif
#ifndef strdupa
#define strdupa(s) \
({ \
const char *_s = (s); \
size_t _len = strlen(_s)+1; \
char *_d = (char *)alloca(_len); \
(char *)memcpy(_d, _s, _len); \
})
#endif
#include <pthread.h>
typedef pthread_mutex_t mtx_t;
#define mtx_init(M) pthread_mutex_init(M, NULL)
#define mtx_destroy(M) pthread_mutex_destroy(M)
#define mtx_lock(M) pthread_mutex_lock(M)
#define mtx_unlock(M) pthread_mutex_unlock(M)
typedef pthread_t thrd_t;
#define thrd_create(THRD,START_ROUTINE,ARG) \
pthread_create(THRD, NULL, START_ROUTINE, ARG)
#define thrd_join(THRD,RETVAL) \
pthread_join(THRD, NULL)
static mtx_t sockem_lock;
static LIST_HEAD(, sockem_s) sockems;
static pthread_once_t sockem_once = PTHREAD_ONCE_INIT;
static char *sockem_conf_str = "";
typedef int64_t sockem_ts_t;
#ifdef LIBSOCKEM_PRELOAD
static int (*sockem_orig_connect) (int, const struct sockaddr *, socklen_t);
static int (*sockem_orig_close) (int);
#define sockem_close0(S) (sockem_orig_close(S))
#define sockem_connect0(S,A,AL) (sockem_orig_connect(S,A,AL))
#else
#define sockem_close0(S) close(S)
#define sockem_connect0(S,A,AL) connect(S,A,AL)
#endif
/**
 * @brief Emulation parameters.
 *
 * The application-facing copy lives in sockem_s.conf (protected by the
 * instance lock); the forwarder thread works from its own snapshot in
 * sockem_s.use.
 */
struct sockem_conf {
        /* FIXME: these needs to be implemented */
        int tx_thruput;  /* app->peer bytes/second */
        int rx_thruput;  /* peer->app bytes/second */
        int delay;       /* latency in ms */
        int jitter;      /* latency variation in ms */
        int debug;       /* enable sockem printf debugging */
        size_t recv_bufsz; /* recv chunk/buffer size */
        int direct;      /* direct forward, no delay or rate-limiting */
};
/**
 * @brief A chunk of received data queued for delayed transmission.
 *        Header and payload are allocated as one chunk (see
 *        sockem_buf_add()).
 */
typedef struct sockem_buf_s {
        TAILQ_ENTRY(sockem_buf_s) sb_link;
        size_t sb_size;  /* Total payload size in bytes */
        size_t sb_of;    /* Number of payload bytes already sent */
        char *sb_data;   /* Payload (points into the same allocation) */
        int64_t sb_at;   /* Transmit at this absolute time. */
} sockem_buf_t;
/**
 * @brief Per-connection sockem instance state.
 */
struct sockem_s {
        LIST_ENTRY(sockem_s) link;  /* On global sockems list */
        enum {
                /* Forwarder thread run states */
                SOCKEM_INIT,
                SOCKEM_START,
                SOCKEM_RUN,
                SOCKEM_TERM
        } run;
        int as;  /* application's socket. */
        int ls;  /* internal application listen socket */
        int ps;  /* internal peer socket connecting sockem to the peer.*/
        void *recv_buf;    /* Receive buffer */
        size_t recv_bufsz; /* .. size */
        int linked;  /* On sockems list */
        thrd_t thrd; /* Forwarder thread */
        mtx_t lock;  /* Protects .conf and coordinates with forwarder */
        struct sockem_conf conf; /* application-set config.
                                  * protected by .lock */
        struct sockem_conf use;  /* last copy of .conf
                                  * local to skm thread */
        TAILQ_HEAD(, sockem_buf_s) bufs; /* Buffers in queue waiting for
                                          * transmission (delayed) */
        size_t bufs_size; /* Total number of bytes currently enqueued
                           * for transmission */
        size_t bufs_size_max; /* Soft max threshold for bufs_size,
                               * when this value is exceeded the app fd
                               * is removed from the poll set until
                               * bufs_size falls below the threshold again. */
        int poll_fd_cnt; /* Number of pollfd entries passed to poll():
                          * 2 normally, 1 while the app fd is halted. */
        int64_t ts_last_fwd; /* For rate-limiter: timestamp of last forward */
};
static int sockem_vset (sockem_t *skm, va_list ap);
/**
 * @returns a microsecond-resolution timestamp from a monotonic clock
 *          where one is available (falls back to wall clock on Darwin).
 */
static __attribute__((unused)) __inline int64_t sockem_clock (void) {
#ifdef __APPLE__
        /* No monotonic clock on Darwin */
        struct timeval tv;
        gettimeofday(&tv, NULL);
        return (int64_t)tv.tv_usec + ((int64_t)tv.tv_sec * 1000000LLU);
#elif _MSC_VER
        /* Millisecond tick count scaled up to microseconds. */
        return (int64_t)GetTickCount64() * 1000LLU;
#else
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ((int64_t)ts.tv_nsec / 1000LLU) +
                ((int64_t)ts.tv_sec * 1000000LLU);
#endif
}
/**
 * @brief Initialize libsockem once (called through pthread_once()).
 *
 * Reads the SOCKEM_CONF environment variable (used as the default
 * config string by the preload connect() overload) and, in preload
 * builds, resolves the real libc connect()/close() symbols.
 */
static void sockem_init (void) {
        mtx_init(&sockem_lock);
        /* Default config string for preloaded connections;
         * fall back to "" when the env var is unset. */
        sockem_conf_str = getenv("SOCKEM_CONF");
        if (!sockem_conf_str)
                sockem_conf_str = "";
        if (strstr(sockem_conf_str, "debug"))
                fprintf(stderr, "%% libsockem pre-loaded (%s)\n",
                        sockem_conf_str);
#ifdef LIBSOCKEM_PRELOAD
        /* Look up the next (real) definitions of the overloaded
         * symbols so the overloads can chain to them. */
        sockem_orig_connect = dlsym(RTLD_NEXT, "connect");
        sockem_orig_close = dlsym(RTLD_NEXT, "close");
#endif
}
/**
 * @returns the maximum time in ms that poll() may block before the
 *          oldest queued buffer's transmit deadline passes.
 * @remark lock must be held
 */
static int sockem_calc_waittime (sockem_t *skm, int64_t now) {
        const sockem_buf_t *oldest = TAILQ_FIRST(&skm->bufs);

        if (!oldest)
                return 1000; /* Nothing queued: arbitrary 1s wakeup */

        if (now > oldest->sb_at)
                return 0;    /* Already overdue: don't block */

        /* Microseconds until the deadline, converted to ms. */
        return (int)((oldest->sb_at - now) / 1000);
}
/**
 * @brief Unlink \p sb from the transmit queue, subtract its remaining
 *        (unsent) bytes from the queue accounting, and free it.
 */
static void sockem_buf_destroy (sockem_t *skm, sockem_buf_t *sb) {
        size_t remain = sb->sb_size - sb->sb_of;

        skm->bufs_size -= remain;
        TAILQ_REMOVE(&skm->bufs, sb, sb_link);
        free(sb);
}
/**
 * @brief Enqueue a delayed transmit buffer holding a copy of \p data.
 *
 * The transmit deadline is now + configured delay + half the jitter.
 * If the soft queue-size threshold is exceeded the app fd is removed
 * from the poll set (poll_fd_cnt = 1) until the queue drains again
 * (see sockem_fwd_bufs()).
 *
 * @returns the new buffer, or NULL on allocation failure (in which
 *          case the data is dropped rather than dereferencing NULL,
 *          which the previous version did).
 * @remark lock must be held.
 */
static sockem_buf_t *sockem_buf_add (sockem_t *skm,
                                     size_t size, const void *data) {
        sockem_buf_t *sb;

        /* Header and payload in a single allocation. */
        sb = malloc(sizeof(*sb) + size);
        if (!sb)
                return NULL;

        /* Only account for the bytes once the allocation succeeded. */
        skm->bufs_size += size;
        if (skm->bufs_size > skm->bufs_size_max) {
                /* No more buffer space, halt recv fd until
                 * queued buffers drop below threshold. */
                skm->poll_fd_cnt = 1;
        }

        sb->sb_of   = 0;
        sb->sb_size = size;
        sb->sb_data = (char *)(sb+1);
        sb->sb_at   = sockem_clock() +
                ((skm->use.delay +
                  (skm->use.jitter / 2)/*FIXME*/) * 1000);
        memcpy(sb->sb_data, data, size);

        TAILQ_INSERT_TAIL(&skm->bufs, sb, sb_link);

        return sb;
}
/**
 * @brief Forward any delayed buffers that have passed their deadline
 *        to \p ofd, subject to the configured tx rate-limit.
 *
 * @returns 0 on success (including transient ENOBUFS/EAGAIN send
 *          failures, which are retried on the next wakeup) or -1 on a
 *          fatal socket error.
 * @remark lock must be held but will be released momentarily while
 *         performing send syscall.
 */
static int sockem_fwd_bufs (sockem_t *skm, int ofd) {
        sockem_buf_t *sb;
        int64_t now = sockem_clock();
        size_t to_write;
        int64_t elapsed = now - skm->ts_last_fwd;
        /* Zero elapsed time: rate-limit budget would be zero anyway. */
        if (!elapsed)
                return 0;
        /* Calculate how many bytes to send to adhere to rate-limit */
        to_write = (size_t)((double)skm->use.tx_thruput *
                            ((double)elapsed / 1000000.0));
        while (to_write > 0 &&
               (sb = TAILQ_FIRST(&skm->bufs)) && sb->sb_at <= now) {
                ssize_t r;
                size_t remain = sb->sb_size - sb->sb_of;
                size_t wr = to_write < remain ? to_write : remain;
                if (wr == 0)
                        break;
                /* Drop the lock across the (potentially blocking)
                 * send() syscall, as documented above. */
                mtx_unlock(&skm->lock);
                r = send(ofd, sb->sb_data+sb->sb_of, wr, 0);
                mtx_lock(&skm->lock);
                if (r == -1) {
                        /* Transient conditions: keep buffer queued. */
                        if (errno == ENOBUFS || errno == EAGAIN ||
                            errno == EWOULDBLOCK)
                                return 0;
                        return -1;
                }
                skm->ts_last_fwd = now;
                sb->sb_of += r;
                to_write -= r;
                /* Partial send: leave the remainder at the queue head. */
                if (sb->sb_of < sb->sb_size)
                        break;
                sockem_buf_destroy(skm, sb);
                now = sockem_clock();
        }
        /* Re-enable app fd poll if queued buffers are below threshold */
        if (skm->bufs_size < skm->bufs_size_max)
                skm->poll_fd_cnt = 2;
        return 0;
}
/**
 * @brief Read once (non-blocking) from \p ifd and forward the data to
 *        \p ofd, either immediately (\p direct) or via the delay queue.
 *
 * @returns the number of bytes forwarded/queued, 0 if no data was
 *          available, or -1 on error or closed connection.
 */
static int sockem_recv_fwd (sockem_t *skm, int ifd, int ofd, int direct) {
        ssize_t rxlen, txlen;

        rxlen = recv(ifd, skm->recv_buf, skm->recv_bufsz, MSG_DONTWAIT);
        if (rxlen == 0)
                return -1; /* Socket closed */
        if (rxlen == -1) {
                int serr = socket_errno();
                /* No data pending is not an error. */
                return (serr == EAGAIN || serr == EWOULDBLOCK) ? 0 : -1;
        }

        if (!direct) {
                /* Queue for delayed/rate-limited transmission. */
                sockem_buf_add(skm, rxlen, skm->recv_buf);
                return rxlen;
        }

        /* No delay or rate limit, send right away */
        txlen = send(ofd, skm->recv_buf, rxlen, 0);
        if (txlen < rxlen)
                return -1;
        return txlen;
}
/**
 * @brief Close the listen and peer sockets (if open) and mark the
 *        instance terminated.
 * @remark Preserves caller's errno.
 * @remark lock must be held.
 */
static void sockem_close_all (sockem_t *skm) {
        int saved_errno = socket_errno();
        int *fds[2] = { &skm->ls, &skm->ps };
        int i;

        for (i = 0 ; i < 2 ; i++) {
                if (*fds[i] == -1)
                        continue;
                sockem_close0(*fds[i]);
                *fds[i] = -1;
        }

        skm->run = SOCKEM_TERM;

        errno = saved_errno;
}
/**
 * @brief Copy desired (app) config to internally use(d) configuration.
 * @remark lock must be held
 */
static __inline void sockem_conf_use (sockem_t *skm) {
        skm->use = skm->conf;
        /* Direct (pass-through) forwarding is only possible when there
         * is no delay, no jitter and an effectively unlimited tx rate. */
        skm->use.direct = (skm->use.delay == 0 &&
                           skm->use.jitter == 0 &&
                           skm->use.tx_thruput >= (1 << 30));
}
/**
 * @brief sockem internal per-socket forwarder thread.
 *
 * Accepts the application's redirected connection on the internal
 * listen socket, then shuffles data between the application-side
 * socket and the peer socket, applying the configured delay and
 * rate-limit (see sockem_fwd_bufs()/sockem_recv_fwd()).
 *
 * Lock discipline: skm->lock is held at the top of each loop
 * iteration and at the `done:` label.
 */
static void *sockem_run (void *arg) {
        sockem_t *skm = arg;
        int cs = -1;
        int ls;
        struct pollfd pfd[2];

        mtx_lock(&skm->lock);
        if (skm->run == SOCKEM_START)
                skm->run = SOCKEM_RUN;
        sockem_conf_use(skm);
        ls = skm->ls;
        mtx_unlock(&skm->lock);

        skm->recv_bufsz = skm->use.recv_bufsz;
        skm->recv_buf = malloc(skm->recv_bufsz);
        assert(skm->recv_buf != NULL); /* FIX: don't recv() into NULL on OOM */

        /* Accept connection from sockfd in sockem_connect() */
        cs = accept(ls, NULL, 0);
        if (cs == -1) {
                mtx_lock(&skm->lock);
                if (skm->run == SOCKEM_TERM) {
                        /* App socket was closed. */
                        goto done;
                }
                fprintf(stderr, "%% sockem: accept(%d) failed: %s\n",
                        ls, strerror(socket_errno()));
                assert(cs != -1);
        }

        /* Set up poll (blocking IO) */
        memset(pfd, 0, sizeof(pfd));
        pfd[1].fd = cs;
        pfd[1].events = POLLIN;

        mtx_lock(&skm->lock);
        pfd[0].fd = skm->ps;
        mtx_unlock(&skm->lock);
        pfd[0].events = POLLIN;

        skm->poll_fd_cnt = 2;

        mtx_lock(&skm->lock);
        while (skm->run == SOCKEM_RUN) {
                int r;
                int i;
                int waittime = sockem_calc_waittime(skm, sockem_clock());

                mtx_unlock(&skm->lock);
                r = poll(pfd, skm->poll_fd_cnt, waittime);
                if (r == -1) {
                        /* FIX: re-acquire the lock before breaking so
                         * that the unlock at `done:` is balanced
                         * (previously broke out unlocked: UB). */
                        mtx_lock(&skm->lock);
                        break;
                }

                /* Send/forward delayed buffers */
                mtx_lock(&skm->lock);
                sockem_conf_use(skm);

                if (sockem_fwd_bufs(skm, skm->ps) == -1) {
                        /* FIX: set .run while still holding the lock
                         * and keep it held for the `done:` path
                         * (previously unlocked first, then wrote .run
                         * unsynchronized and double-unlocked). */
                        skm->run = SOCKEM_TERM;
                        break;
                }
                mtx_unlock(&skm->lock);

                for (i = 0 ; r > 0 && i < 2 ; i++) {
                        if (pfd[i].revents & (POLLHUP|POLLERR)) {
                                skm->run = SOCKEM_TERM;

                        } else if (pfd[i].revents & POLLIN) {
                                if (sockem_recv_fwd(
                                            skm,
                                            pfd[i].fd,
                                            pfd[i^1].fd,
                                            /* direct mode for app socket
                                             * with delay, and always for
                                             * peer socket (receive channel) */
                                            i == 0 ||
                                            skm->use.direct) == -1) {
                                        skm->run = SOCKEM_TERM;
                                        break;
                                }
                        }
                }

                mtx_lock(&skm->lock);
        }
 done:
        if (cs != -1)
                sockem_close0(cs);
        sockem_close_all(skm);

        mtx_unlock(&skm->lock);

        free(skm->recv_buf);

        return NULL;
}
/**
 * @brief Connect socket \p s to \p addr, treating an in-progress
 *        non-blocking connect as success.
 *
 * @returns 0 on success (or connect pending), -1 on error
 *          (errno is set on POSIX platforms).
 */
static int sockem_do_connect (int s, const struct sockaddr *addr,
                              socklen_t addrlen) {
        int serr;

        if (sockem_connect0(s, addr, addrlen) != SOCKET_ERROR)
                return 0;

        serr = socket_errno();

        /* A non-blocking connect still in progress is not a failure. */
        if (serr == EINPROGRESS)
                return 0;
#ifdef _MSC_VER
        if (serr == WSAEWOULDBLOCK)
                return 0;
#endif

#ifndef _MSC_VER
        errno = serr;
#endif
        return -1;
}
/**
 * @brief Create a sockem instance interposing between application
 *        socket \p sockfd and the peer at \p addr.
 *
 * Creates an internal listen socket (which \p sockfd is redirected
 * to), an internal peer socket connected to the real \p addr, and a
 * forwarder thread shuffling data between the two.
 *
 * Trailing varargs are a NULL-terminated list of (const char *key,
 * int value) config pairs, see sockem_set0() for valid keys.
 *
 * @returns the new sockem instance, or NULL on failure.
 */
sockem_t *sockem_connect (int sockfd, const struct sockaddr *addr,
                          socklen_t addrlen, ...) {
        sockem_t *skm;
        int ls, ps;
        /* Zero address + family only: binds to a wildcard address with
         * an ephemeral port below. */
        struct sockaddr_in6 sin6 = { .sin6_family = addr->sa_family };
        socklen_t addrlen2 = addrlen;
        va_list ap;

        pthread_once(&sockem_once, sockem_init);

        /* Create internal app listener socket */
        ls = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
        if (ls == -1)
                return NULL;

        if (bind(ls, (struct sockaddr *)&sin6, addrlen) == -1) {
                sockem_close0(ls);
                return NULL;
        }

        /* Get bound address */
        if (getsockname(ls, (struct sockaddr *)&sin6, &addrlen2) == -1) {
                sockem_close0(ls);
                return NULL;
        }

        if (listen(ls, 1) == -1) {
                sockem_close0(ls);
                return NULL;
        }

        /* Create internal peer socket */
        ps = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
        if (ps == -1) {
                sockem_close0(ls);
                return NULL;
        }

        /* Connect to peer */
        if (sockem_do_connect(ps, addr, addrlen) == -1) {
                sockem_close0(ls);
                sockem_close0(ps);
                return NULL;
        }

        /* Create sockem handle */
        skm = calloc(1, sizeof(*skm));
        skm->as = sockfd;
        skm->ls = ls;
        skm->ps = ps;
        skm->bufs_size_max = 16 * 1024 * 1024; /* 16 MB of queue buffer */
        TAILQ_INIT(&skm->bufs);
        mtx_init(&skm->lock);

        /* Default config */
        skm->conf.rx_thruput = 1 << 30;
        skm->conf.tx_thruput = 1 << 30;
        skm->conf.delay = 0;
        skm->conf.jitter = 0;
        skm->conf.recv_bufsz = 1024*1024;
        skm->conf.direct = 1;

        /* Apply passed configuration */
        va_start(ap, addrlen);
        if (sockem_vset(skm, ap) == -1) {
                va_end(ap);
                sockem_close(skm);
                return NULL;
        }
        va_end(ap);

        mtx_lock(&skm->lock);
        skm->run = SOCKEM_START;

        /* Create pipe thread */
        if (thrd_create(&skm->thrd, sockem_run, skm) != 0) {
                mtx_unlock(&skm->lock);
                sockem_close(skm);
                return NULL;
        }
        mtx_unlock(&skm->lock);

        /* Connect application socket to listen socket */
        if (sockem_do_connect(sockfd,
                              (struct sockaddr *)&sin6, addrlen2) == -1) {
                sockem_close(skm);
                return NULL;
        }

        mtx_lock(&sockem_lock);
        LIST_INSERT_HEAD(&sockems, skm, link);
        mtx_lock(&skm->lock);
        skm->linked = 1;
        mtx_unlock(&skm->lock);
        mtx_unlock(&sockem_lock);

        return skm;
}
/**
 * @brief Drop every buffer still queued for transmission.
 */
static void sockem_bufs_purge (sockem_t *skm) {
        sockem_buf_t *sb;

        for (;;) {
                sb = TAILQ_FIRST(&skm->bufs);
                if (!sb)
                        break;
                sockem_buf_destroy(skm, sb);
        }
}
/**
 * @brief Close the sockem instance and free all its resources.
 *
 * Unlinks it from the global sockems list, requests (or performs)
 * socket teardown, joins the forwarder thread and frees the handle.
 * The handle is invalid after this call.
 */
void sockem_close (sockem_t *skm) {
        mtx_lock(&sockem_lock);
        mtx_lock(&skm->lock);
        if (skm->linked)
                LIST_REMOVE(skm, link);
        mtx_unlock(&sockem_lock);
        /* If thread is running let it close the sockets
         * to avoid race condition. */
        if (skm->run == SOCKEM_START ||
            skm->run == SOCKEM_RUN)
                skm->run = SOCKEM_TERM;
        else
                sockem_close_all(skm);
        mtx_unlock(&skm->lock);
        /* Wait for the forwarder thread to finish teardown. */
        thrd_join(skm->thrd, NULL);
        sockem_bufs_purge(skm);
        mtx_destroy(&skm->lock);
        free(skm);
}
/**
 * @brief Set single conf key.
 *
 * \p key may also be a "key=val[,key=val..]" string (e.g. from the
 * SOCKEM_CONF environment variable), in which case each pair is
 * applied recursively and \p val is ignored.
 *
 * @remark lock must be held.
 * @returns 0 on success or -1 if a key is unknown or malformed.
 */
static int sockem_set0 (sockem_t *skm, const char *key, int val) {
        if (!strcmp(key, "rx.thruput") ||
            !strcmp(key, "rx.throughput"))
                skm->conf.rx_thruput = val;
        else if (!strcmp(key, "tx.thruput") ||
                 !strcmp(key, "tx.throughput"))
                skm->conf.tx_thruput = val;
        else if (!strcmp(key, "delay"))
                skm->conf.delay = val;
        else if (!strcmp(key, "jitter"))
                skm->conf.jitter = val;
        else if (!strcmp(key, "rx.bufsz"))
                skm->conf.recv_bufsz = val;
        else if (!strcmp(key, "debug"))
                skm->conf.debug = val;
        else if (!strcmp(key, "true"))
                ; /* dummy key for allowing non-empty but default config */
        else if (strchr(key, '=')) {
                /* FIX: discriminate on '=' rather than the previous
                 * inverted comma test, so that both a single "key=val"
                 * and comma-separated "key=val,key=val" lists parse. */
                char *s = strdupa(key);
                while (*s) {
                        char *t = strchr(s, ',');
                        char *d = strchr(s, '=');
                        if (t)
                                *t = '\0';
                        if (!d)
                                return -1;
                        *(d++) = '\0';

                        if (sockem_set0(skm, s, atoi(d)) == -1)
                                return -1;

                        if (!t)
                                break;
                        /* FIX: continue after the comma; previously
                         * advanced only one character, re-parsing the
                         * truncated first pair. */
                        s = t + 1;
                }
        } else
                return -1;

        return 0;
}
/**
 * @brief Apply a NULL-terminated (const char *key, int val) vararg
 *        list of config parameters to \p skm.
 * @returns 0 on success or -1 on the first unknown/malformed key.
 */
static int sockem_vset (sockem_t *skm, va_list ap) {
        const char *key;
        int ret = 0;

        mtx_lock(&skm->lock);
        while (ret == 0 && (key = va_arg(ap, const char *)) != NULL) {
                int val = va_arg(ap, int);
                ret = sockem_set0(skm, key, val);
        }
        mtx_unlock(&skm->lock);

        return ret;
}
/**
 * @brief Public vararg wrapper around sockem_vset(): applies a
 *        NULL-terminated (key, val) list of config parameters.
 * @returns 0 on success or -1 on unknown key.
 */
int sockem_set (sockem_t *skm, ...) {
        va_list ap;
        int ret;

        va_start(ap, skm);
        ret = sockem_vset(skm, ap);
        va_end(ap);

        return ret;
}
/**
 * @brief Look up the sockem instance (if any) wrapping application
 *        socket \p sockfd.
 * @returns the instance or NULL if not found.
 */
sockem_t *sockem_find (int sockfd) {
        sockem_t *found = NULL, *skm;

        pthread_once(&sockem_once, sockem_init);

        mtx_lock(&sockem_lock);
        LIST_FOREACH(skm, &sockems, link) {
                if (skm->as == sockfd) {
                        found = skm;
                        break;
                }
        }
        mtx_unlock(&sockem_lock);

        return found;
}
#ifdef LIBSOCKEM_PRELOAD
/**
* Provide overloading socket APIs and conf bootstrapping from env vars.
*
*/
/**
 * @brief connect(2) overload
 *
 * Interposes a sockem instance (configured from the SOCKEM_CONF
 * environment variable, read in sockem_init()) on every connection.
 */
int connect (int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
        sockem_t *skm;
        pthread_once(&sockem_once, sockem_init);
        /* (conf_str, 0) is a single key/value pair; sockem_set0()
         * parses the "key=val,.." string form of the key. */
        skm = sockem_connect(sockfd, addr, addrlen, sockem_conf_str, 0, NULL);
        if (!skm)
                return -1;
        return 0;
}
/**
 * @brief close(2) overload
 *
 * FIX: must not hold sockem_lock here: sockem_find() and
 * sockem_close() both acquire sockem_lock themselves, and default
 * pthread mutexes are non-recursive, so taking it in this function
 * self-deadlocked on the first intercepted close().
 */
int close (int fd) {
        sockem_t *skm;

        pthread_once(&sockem_once, sockem_init);

        skm = sockem_find(fd);
        if (skm)
                sockem_close(skm);

        return sockem_close0(fd);
}
#endif