/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
* Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
* Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Contact information: Carbonite Inc., 756 N Pastoria Ave
* Sunnyvale, CA 94085, or: http://www.zmanda.com
*/
#include "amanda.h"
#include "amxfer.h"
#include "element-glue.h"
#include "directtcp.h"
#include "amutil.h"
#include "sockaddr-util.h"
#include "stream.h"
#include "debug.h"
#include "conffile.h"
#include "mem-ring.h"
#include "shm-ring.h"
/*
* Instance definition
*/
typedef struct XferElementGlue_ {
XferElement __parent__;
/* instructions to push_buffer_impl */
enum {
PUSH_TO_RING_BUFFER,
PUSH_TO_FD, /* write to write_fd */
PUSH_INVALID,
PUSH_ACCEPT_FIRST = (1 << 16),
PUSH_CONNECT_FIRST = (2 << 16),
} on_push;
/* instructions to pull_buffer_impl */
enum {
PULL_FROM_RING_BUFFER,
PULL_FROM_FD, /* read from read_fd */
PULL_INVALID,
PULL_ACCEPT_FIRST = (1 << 16),
PULL_CONNECT_FIRST = (2 << 16),
} on_pull;
int *write_fdp;
int *read_fdp;
mem_ring_t *mem_ring;
gboolean need_thread;
/* the stuff we might use, depending on what flavor of glue we're
* providing.. */
int pipe[2];
int input_listen_socket, output_listen_socket;
int input_data_socket, output_data_socket;
int read_fd, write_fd;
/* a ring buffer of ptr/size pairs with semaphores */
struct { gpointer buf; size_t size; } *ring;
amsemaphore_t *ring_used_sem, *ring_free_sem;
gint ring_head, ring_tail;
GThread *thread;
GThreadFunc threadfunc;
} XferElementGlue;
/*
* Class definition
*/
typedef struct XferElementGlueClass_ {
XferElementClass __parent__;
} XferElementGlueClass;
static GObjectClass *parent_class = NULL;
/*
* Utility functions, etc.
*/
static void
make_pipe(
XferElementGlue *self)
{
if (pipe(self->pipe) < 0)
g_critical(_("Could not create pipe: %s"), strerror(errno));
}
static void
send_xfer_done(
XferElementGlue *self)
{
xfer_queue_message(XFER_ELEMENT(self)->xfer,
xmsg_new((XferElement *)self, XMSG_DONE, 0));
}
static gboolean
do_directtcp_listen(
XferElement *elt,
int *sockp,
DirectTCPAddr **addrsp)
{
int sock;
sockaddr_union data_addr;
DirectTCPAddr *addrs;
socklen_t len;
struct addrinfo *res;
struct addrinfo *res_addr;
sockaddr_union *addr = NULL;
int r;
if ((r = resolve_hostname("localhost", 0, &res, NULL)) != 0) {
xfer_cancel_with_error(elt, "resolve_hostname(localhost): %s", gai_strerror(r));
return FALSE;
}
for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
if (res_addr->ai_family == AF_INET) {
addr = (sockaddr_union *)res_addr->ai_addr;
break;
}
}
if (!addr) {
addr = (sockaddr_union *)res->ai_addr;
}
sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
freeaddrinfo(res);
return FALSE;
}
len = SS_LEN(addr);
if (bind(sock, (struct sockaddr *)addr, len) != 0) {
xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
freeaddrinfo(res);
close(sock);
*sockp = -1;
return FALSE;
}
if (listen(sock, 1) < 0) {
xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
freeaddrinfo(res);
close(sock);
*sockp = -1;
return FALSE;
}
/* TODO: which addresses should this display? all ifaces? localhost? */
len = sizeof(data_addr);
if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
error("getsockname(): %s", strerror(errno));
addrs = g_new0(DirectTCPAddr, 2);
copy_sockaddr(&addrs[0], &data_addr);
*addrsp = addrs;
freeaddrinfo(res);
return TRUE;
}
static gboolean
prolong_accept(
gpointer data)
{
return !XFER_ELEMENT(data)->cancelled;
}
static int
do_directtcp_accept(
XferElementGlue *self,
int *socketp)
{
int sock;
time_t timeout_time;
time_t dtimeout = (time_t)getconf_int(CNF_DTIMEOUT);
timeout_time = time(NULL) + dtimeout;
g_assert(*socketp != -1);
if ((sock = interruptible_accept(*socketp, NULL, NULL,
prolong_accept, self, timeout_time)) == -1) {
close(*socketp);
*socketp = -1;
/* if the accept was interrupted due to a cancellation, then do not
* add a further error message */
if (errno == 0 && XFER_ELEMENT(self)->cancelled)
return -1;
xfer_cancel_with_error(XFER_ELEMENT(self),
_("Error accepting incoming connection: %s"), strerror(errno));
wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
return -1;
}
/* close the listening socket now, for good measure */
close(*socketp);
*socketp = -1;
g_debug("do_directtcp_accept: %d", sock);
return sock;
}
static int
do_directtcp_connect(
XferElementGlue *self,
DirectTCPAddr *addrs)
{
XferElement *elt = XFER_ELEMENT(self);
sockaddr_union addr;
int sock;
#ifdef WORKING_IPV6
char strsockaddr[INET6_ADDRSTRLEN + 20];
#else
char strsockaddr[INET_ADDRSTRLEN + 20];
#endif
if (!addrs) {
g_debug("element-glue got no directtcp addresses to connect to!");
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
"%s got no directtcp addresses to connect to",
xfer_element_repr(elt));
}
goto cancel_wait;
}
/* set up the sockaddr -- IPv4 only */
copy_sockaddr(&addr, addrs);
str_sockaddr_r(&addr, strsockaddr, sizeof(strsockaddr));
if (strncmp(strsockaddr,"255.255.255.255:", 16) == 0) {
char buffer[32770];
char *s;
int size;
char *data_host;
int data_port;
char *stream_msg = NULL;
g_debug("do_directtcp_connect making indirect data connection to %s",
strsockaddr);
data_port = SU_GET_PORT(&addr);
sock = stream_client(NULL, "localhost", data_port,
STREAM_BUFSIZE, 0, NULL, 0, &stream_msg);
if (stream_msg) {
xfer_cancel_with_error(elt, "stream_client(): %s", stream_msg);
g_free(stream_msg);
goto cancel_wait;
}
if (sock < 0) {
xfer_cancel_with_error(elt, "stream_client(): %s", strerror(errno));
goto cancel_wait;
}
size = full_read(sock, buffer, 32768);
if (size < 0 ) {
xfer_cancel_with_error(elt, "failed to read from indirecttcp: %s",
strerror(errno));
goto cancel_wait;
}
close(sock);
buffer[size++] = ' ';
buffer[size] = '\0';
if ((s = strchr(buffer, ':')) == NULL) {
xfer_cancel_with_error(elt,
"Failed to parse indirect data stream: %s",
buffer);
goto cancel_wait;
}
*s++ = '\0';
data_host = buffer;
data_port = atoi(s);
str_to_sockaddr(data_host, &addr);
SU_SET_PORT(&addr, data_port);
str_sockaddr_r(&addr, strsockaddr, sizeof(strsockaddr));
}
sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
g_debug("do_directtcp_connect making data connection to %s", strsockaddr);
if (sock < 0) {
xfer_cancel_with_error(elt,
"socket(): %s", strerror(errno));
goto cancel_wait;
}
if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
xfer_cancel_with_error(elt,
"connect(): %s", strerror(errno));
close(sock);
goto cancel_wait;
}
g_debug("do_directtcp_connect: connected to %s, fd %d", strsockaddr, sock);
return sock;
cancel_wait:
wait_until_xfer_cancelled(elt->xfer);
return -1;
}
#define GLUE_BUFFER_SIZE 32768
#define GLUE_RING_BUFFER_SIZE 32
#define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
/*
* fd handling
*/
/* if self->read_fdp or self->write_fdp are pointing to this integer, then they
* should be redirected to point to the upstream's output_fd or downstream's
* input_fd, respectively, at the first call to get_read_fd or get_write_fd,
* respectively. */
static int neighboring_element_fd = -1;
#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
static int
_get_read_fd(XferElementGlue *self)
{
assert(self->read_fdp);
if (self->read_fdp == &neighboring_element_fd) {
XferElement *elt = XFER_ELEMENT(self);
self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
} else {
self->read_fd = *self->read_fdp;
*self->read_fdp = -1;
}
self->read_fdp = NULL;
return self->read_fd;
}
#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
static int
_get_write_fd(XferElementGlue *self)
{
assert(self->write_fdp);
if (self->write_fdp == &neighboring_element_fd) {
XferElement *elt = XFER_ELEMENT(self);
self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
} else {
self->write_fd = *self->write_fdp;
*self->write_fdp = -1;
}
self->write_fdp = NULL;
return self->write_fd;
}
static int
close_read_fd(XferElementGlue *self)
{
int fd = get_read_fd(self);
self->read_fd = -1;
return close(fd);
}
static int
close_write_fd(XferElementGlue *self)
{
int fd = get_write_fd(self);
self->write_fd = -1;
return close(fd);
}
/*
* Worker thread utility functions
*/
static void
pull_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
int fd = get_write_fd(self);
XMsg *msg;
size_t written;
g_debug("pull_and_write");
self->write_fdp = NULL;
while (!elt->cancelled) {
size_t len;
char *buf;
/* get a buffer from upstream */
buf = xfer_element_pull_buffer(elt->upstream, &len);
if (!buf)
break;
/* write it */
if (!elt->downstream->drain_mode) {
written = full_write(fd, buf, len);
if (written < len) {
if (elt->downstream->must_drain) {
g_debug("Error writing to fd %d: %s", fd, strerror(errno));
} else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
} else {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error writing to fd %d: %s"), fd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
amfree(buf);
break;
}
elt->downstream->drain_mode = TRUE;
}
}
crc32_add((uint8_t *)buf, len, &elt->crc);
amfree(buf);
}
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_buffers(elt->upstream);
g_debug("sending XMSG_CRC message %p", elt->downstream);
g_debug("pull_and_write CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->downstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
/* close the fd we've been writing, as an EOF signal to downstream, and
* set it to -1 to avoid accidental re-use */
close_write_fd(self);
}
static void
pull_static_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
int fd = get_write_fd(self);
XMsg *msg;
size_t written;
size_t block_size_up = xfer_element_get_block_size(elt->upstream);
size_t block_size;
char *buf, *buf1;
g_debug("pull_static_and_write");
if (block_size_up != 0)
block_size = block_size_up;
else
block_size = NETWORK_BLOCK_BYTES;
buf = malloc(block_size);
self->write_fdp = NULL;
while (!elt->cancelled) {
size_t len;
/* get a buffer from upstream */
buf1 = xfer_element_pull_buffer_static(elt->upstream, buf, block_size, &len);
if (!buf1)
break;
/* write it */
if (!elt->downstream->drain_mode) {
written = full_write(fd, buf, len);
if (written < len) {
if (elt->downstream->must_drain) {
g_debug("Error writing to fd %d: %s", fd, strerror(errno));
} else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
} else {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error writing to fd %d: %s"), fd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
amfree(buf);
break;
}
elt->downstream->drain_mode = TRUE;
}
}
crc32_add((uint8_t *)buf, len, &elt->crc);
}
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_buffers(elt->upstream);
g_debug("sending XMSG_CRC message %p", elt->downstream);
g_debug("pull_static_and_write CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->downstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
amfree(buf);
/* close the fd we've been writing, as an EOF signal to downstream, and
* set it to -1 to avoid accidental re-use */
close_write_fd(self);
}
static void
read_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
/* dynamically allocate a buffer, in case this thread has
* a limited amount of stack allocated */
char *buf = g_malloc(GLUE_BUFFER_SIZE);
int rfd = get_read_fd(self);
int wfd = get_write_fd(self);
XMsg *msg;
crc32_init(&elt->crc);
g_debug("read_and_write: read from %d, write to %d", rfd, wfd);
while (!elt->cancelled) {
size_t len;
/* read from upstream */
len = read_fully(rfd, buf, GLUE_BUFFER_SIZE, NULL);
if (len < GLUE_BUFFER_SIZE) {
if (errno) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), rfd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
break;
}
}
/* write the buffer fully */
if (!elt->downstream->drain_mode && full_write(wfd, buf, len) < len) {
if (elt->downstream->must_drain) {
g_debug("Could not write to fd %d: %s", wfd, strerror(errno));
} else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
} else {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Could not write to fd %d: %s"),
wfd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
break;
}
}
crc32_add((uint8_t *)buf, len, &elt->crc);
}
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_fd(rfd);
/* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
* kill it and complete the cancellation */
close_read_fd(self);
/* close the fd we've been writing, as an EOF signal to downstream */
close_write_fd(self);
g_debug("read_and_write upstream CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
g_debug("sending XMSG_CRC message");
msg = xmsg_new(elt->upstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
g_debug("read_and_write downstream CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
g_debug("sending XMSG_CRC message");
msg = xmsg_new(elt->downstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
amfree(buf);
}
static void
read_and_push(
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
int fd = get_read_fd(self);
XMsg *msg;
crc32_init(&elt->crc);
while (!elt->cancelled) {
char *buf = g_malloc(GLUE_BUFFER_SIZE);
gsize len;
int read_error;
/* read a buffer from upstream */
len = read_fully(fd, buf, GLUE_BUFFER_SIZE, &read_error);
if (len < GLUE_BUFFER_SIZE) {
if (read_error) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), fd, strerror(read_error));
g_debug("element-glue: error reading from fd %d: %s",
fd, strerror(read_error));
wait_until_xfer_cancelled(elt->xfer);
}
amfree(buf);
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
amfree(buf);
break;
}
}
crc32_add((uint8_t *)buf, len, &elt->crc);
xfer_element_push_buffer(elt->downstream, buf, len);
}
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_fd(fd);
/* send an EOF indication downstream */
xfer_element_push_buffer(elt->downstream, NULL, 0);
/* close the read fd, since it's at EOF */
close_read_fd(self);
g_debug("sending XMSG_CRC message");
g_debug("read_and_push CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->upstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
}
static void
read_and_push_static(
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
int fd = get_read_fd(self);
XMsg *msg;
char *buf = g_malloc(GLUE_BUFFER_SIZE);
g_debug("read_and_push_static");
crc32_init(&elt->crc);
while (!elt->cancelled) {
gsize len;
int read_error;
/* read a buffer from upstream */
len = read_fully(fd, buf, GLUE_BUFFER_SIZE, &read_error);
if (len < GLUE_BUFFER_SIZE) {
if (read_error) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), fd, strerror(read_error));
g_debug("element-glue: error reading from fd %d: %s",
fd, strerror(read_error));
wait_until_xfer_cancelled(elt->xfer);
}
amfree(buf);
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
amfree(buf);
break;
}
}
crc32_add((uint8_t *)buf, len, &elt->crc);
xfer_element_push_buffer_static(elt->downstream, buf, len);
}
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_fd(fd);
/* send an EOF indication downstream */
xfer_element_push_buffer_static(elt->downstream, NULL, 0);
/* close the read fd, since it's at EOF */
close_read_fd(self);
g_debug("sending XMSG_CRC message");
g_debug("read_and_push_static CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->upstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
}
static void
read_to_mem_ring(
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
int fd = get_read_fd(self);
XMsg *msg;
uint64_t read_offset;
uint64_t write_offset;
uint64_t producer_block_size;
uint64_t consumer_block_size;
uint64_t mem_ring_size;
g_debug("read_to_mem_ring");
mem_ring_producer_set_size(self->mem_ring, GLUE_BUFFER_SIZE*4, GLUE_BUFFER_SIZE);
mem_ring_size = self->mem_ring->ring_size;
producer_block_size = self->mem_ring->producer_block_size;
consumer_block_size = self->mem_ring->consumer_block_size;
crc32_init(&elt->crc);
while (!elt->cancelled) {
gsize len;
gsize len2;
int read_error;
g_mutex_lock(self->mem_ring->mutex);
write_offset = self->mem_ring->write_offset;
read_offset = self->mem_ring->read_offset;
while (!(write_offset == read_offset) &&
!((write_offset < read_offset) &&
(read_offset - write_offset > producer_block_size)) &&
!((write_offset > read_offset) &&
(mem_ring_size - write_offset + read_offset > producer_block_size))) {
if (elt->cancelled) {
g_mutex_unlock(self->mem_ring->mutex);
goto return_eof;
}
g_cond_wait(self->mem_ring->free_cond, self->mem_ring->mutex);
write_offset = self->mem_ring->write_offset;
read_offset = self->mem_ring->read_offset;
}
g_mutex_unlock(self->mem_ring->mutex);
/* read a buffer from upstream */
if (write_offset + self->mem_ring->producer_block_size <= mem_ring_size) {
len = read_fully(fd, self->mem_ring->buffer+write_offset, producer_block_size, &read_error);
if (len > 0) {
crc32_add((uint8_t *)self->mem_ring->buffer+write_offset, len, &elt->crc);
write_offset += len;
write_offset %= mem_ring_size;
g_mutex_lock(self->mem_ring->mutex);
self->mem_ring->data_avail += len;
self->mem_ring->written += len;
self->mem_ring->write_offset = write_offset;
if (self->mem_ring->data_avail >= consumer_block_size) {
g_cond_broadcast(self->mem_ring->add_cond);
self->mem_ring->data_avail -= consumer_block_size;
}
g_mutex_unlock(self->mem_ring->mutex);
}
if (len < producer_block_size) {
if (read_error) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), fd, strerror(read_error));
g_debug("element-glue: error reading from fd %d: %s",
fd, strerror(read_error));
wait_until_xfer_cancelled(elt->xfer);
}
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
break;
}
}
} else {
len = read_fully(fd, self->mem_ring->buffer+write_offset, mem_ring_size - write_offset, &read_error);
if (len > 0) {
crc32_add((uint8_t *)self->mem_ring->buffer+write_offset, len, &elt->crc);
}
len2 = 0;
if (len == mem_ring_size - write_offset) {
len2 = read_fully(fd, self->mem_ring->buffer, producer_block_size - (mem_ring_size - write_offset), &read_error);
if (len2 > 0) {
crc32_add((uint8_t *)self->mem_ring->buffer, len2, &elt->crc);
len += len2;
}
}
if (len > 0) {
write_offset += len;
write_offset %= mem_ring_size;
g_mutex_lock(self->mem_ring->mutex);
self->mem_ring->write_offset = write_offset;
self->mem_ring->data_avail += len;
if (self->mem_ring->data_avail >= consumer_block_size) {
g_cond_broadcast(self->mem_ring->add_cond);
self->mem_ring->data_avail -= consumer_block_size;
}
g_mutex_unlock(self->mem_ring->mutex);
}
if (len < producer_block_size) {
if (read_error) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), fd, strerror(read_error));
g_debug("element-glue: error reading from fd %d: %s",
fd, strerror(read_error));
wait_until_xfer_cancelled(elt->xfer);
}
break;
} else if (len == 0 || len2 == 0) { /* we only count a zero-length read as EOF */
break;
}
}
}
}
return_eof:
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_fd(fd);
/* send an EOF indication downstream */
g_mutex_lock(self->mem_ring->mutex);
self->mem_ring->eof_flag = TRUE;
g_cond_broadcast(self->mem_ring->add_cond);
g_mutex_unlock(self->mem_ring->mutex);
/* close the read fd, since it's at EOF */
close_read_fd(self);
g_debug("sending XMSG_CRC message");
g_debug("read_to_mem_ring CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->upstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
}
static void
read_to_shm_ring(
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
int fd = get_read_fd(self);
XMsg *msg;
uint64_t write_offset;
uint64_t written;
uint64_t readx;
uint64_t shm_ring_size;
struct iovec iov[2];
int iov_count;
ssize_t n;
size_t consumer_block_size;
g_debug("read_to_shm_ring");
elt->shm_ring = shm_ring_link(xfer_element_get_shm_ring(elt->downstream)->shm_control_name);
shm_ring_producer_set_size(elt->shm_ring, GLUE_BUFFER_SIZE*4, GLUE_BUFFER_SIZE);
shm_ring_size = elt->shm_ring->mc->ring_size;
consumer_block_size = elt->shm_ring->mc->consumer_block_size;
crc32_init(&elt->crc);
while (!elt->cancelled && !elt->shm_ring->mc->cancelled) {
write_offset = elt->shm_ring->mc->write_offset;
written = elt->shm_ring->mc->written;
while (!elt->cancelled && !elt->shm_ring->mc->cancelled) {
readx = elt->shm_ring->mc->readx;
if (shm_ring_size - (written - readx) > elt->shm_ring->block_size)
break;
if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_write) != 0)
break;
}
if (elt->cancelled || elt->shm_ring->mc->cancelled) {
break;
}
iov[0].iov_base = elt->shm_ring->data + write_offset;
if (write_offset + elt->shm_ring->block_size <= shm_ring_size) {
iov[0].iov_len = elt->shm_ring->block_size;
iov_count = 1;
} else {
iov[0].iov_len = shm_ring_size - write_offset;
iov[1].iov_base = elt->shm_ring->data;
iov[1].iov_len = elt->shm_ring->block_size - iov[0].iov_len;
iov_count = 2;
}
n = readv(fd, iov, iov_count);
if (n > 0) {
write_offset += n;
write_offset %= shm_ring_size;
elt->shm_ring->mc->write_offset = write_offset;
elt->shm_ring->mc->written += n;
elt->shm_ring->data_avail += n;
if (elt->shm_ring->data_avail >= consumer_block_size) {
sem_post(elt->shm_ring->sem_read);
elt->shm_ring->data_avail -= consumer_block_size;
}
if (n <= (ssize_t)iov[0].iov_len) {
crc32_add((uint8_t *)iov[0].iov_base, n, &elt->crc);
} else {
crc32_add((uint8_t *)iov[0].iov_base, iov[0].iov_len, &elt->crc);
crc32_add((uint8_t *)iov[1].iov_base, n - iov[0].iov_len, &elt->crc);
}
} else {
elt->shm_ring->mc->eof_flag = TRUE;
break;
}
}
if (elt->cancelled) {
elt->shm_ring->mc->cancelled = TRUE;
g_debug("read_to_shm_ring: cancel shm-ring because elt cancelled");
} else if (elt->shm_ring->mc->cancelled) {
xfer_cancel_with_error(elt, "shm_ring cancelled");
}
sem_post(elt->shm_ring->sem_read);
sem_post(elt->shm_ring->sem_read);
// wait for the consumer to read everything
while (!elt->cancelled &&
!elt->shm_ring->mc->cancelled &&
(elt->shm_ring->mc->written != elt->shm_ring->mc->readx ||
!elt->shm_ring->mc->eof_flag)) {
if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_write) != 0)
break;
}
/* close the read fd, since it's at EOF */
close_read_fd(self);
g_debug("sending XMSG_CRC message");
g_debug("read_to_shm_ring CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->upstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
close_producer_shm_ring(elt->shm_ring);
elt->shm_ring = NULL;
return;
}
static void
pull_static_to_shm_ring(
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
XMsg *msg;
uint64_t write_offset;
uint64_t written;
uint64_t readx;
uint64_t shm_ring_size;
size_t block_size;
size_t len;
size_t consumer_block_size;
gpointer base;
g_debug("pull_static_to_shm_ring");
elt->shm_ring = shm_ring_link(xfer_element_get_shm_ring(elt->downstream)->shm_control_name);
shm_ring_producer_set_size(elt->shm_ring, GLUE_BUFFER_SIZE*4, GLUE_BUFFER_SIZE);
shm_ring_size = elt->shm_ring->mc->ring_size;
consumer_block_size = elt->shm_ring->mc->consumer_block_size;
crc32_init(&elt->crc);
while (!elt->cancelled && !elt->shm_ring->mc->cancelled) {
write_offset = elt->shm_ring->mc->write_offset;
written = elt->shm_ring->mc->written;
while (!elt->cancelled && !elt->shm_ring->mc->cancelled) {
readx = elt->shm_ring->mc->readx;
if (shm_ring_size - (written - readx) > elt->shm_ring->block_size)
break;
if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_write) != 0)
break;
}
if (elt->cancelled || elt->shm_ring->mc->cancelled)
break;
if (write_offset + elt->shm_ring->block_size <= shm_ring_size) {
block_size = elt->shm_ring->block_size;
} else {
block_size = shm_ring_size - write_offset;
}
base = elt->shm_ring->data + write_offset;
xfer_element_pull_buffer_static(elt->upstream, base, block_size, &len);
if (len > 0) {
write_offset += len;
write_offset %= shm_ring_size;
elt->shm_ring->mc->write_offset = write_offset;
elt->shm_ring->mc->written += len;
elt->shm_ring->data_avail += len;
if (elt->shm_ring->data_avail >= consumer_block_size) {
sem_post(elt->shm_ring->sem_read);
elt->shm_ring->data_avail -= consumer_block_size;
}
crc32_add((uint8_t *)base, len, &elt->crc);
} else {
elt->shm_ring->mc->eof_flag = TRUE;
break;
}
}
if (elt->cancelled) {
elt->shm_ring->mc->cancelled = TRUE;
g_debug("pull_static_to_shm_ring: cancel shm-ring because elt cancelled");
} else if (elt->shm_ring->mc->cancelled) {
xfer_cancel_with_error(elt, "shm_ring cancelled");
}
sem_post(elt->shm_ring->sem_read); // for the last block
sem_post(elt->shm_ring->sem_read); // for the eof_flag
// wait for the consumer to read everything
while (!elt->cancelled &&
!elt->shm_ring->mc->cancelled &&
(elt->shm_ring->mc->written != elt->shm_ring->mc->readx ||
!elt->shm_ring->mc->eof_flag)) {
if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_write) != 0)
break;
}
g_debug("sending XMSG_CRC message");
g_debug("pull_static_to_shm_ring CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(elt->upstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
return;
}
static void
shm_ring_and_push_buffer_static(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
uint64_t read_offset;
uint64_t shm_ring_size;
gsize usable = 0;
gboolean eof_flag = FALSE;
g_debug("shm_ring_and_push_buffer_static");
shm_ring_consumer_set_size(elt->shm_ring, SHM_RING_SIZE, SHM_RING_BLOCK_SIZE);
shm_ring_size = elt->shm_ring->mc->ring_size;
sem_post(elt->shm_ring->sem_write);
while (!elt->shm_ring->mc->cancelled) {
do {
usable = elt->shm_ring->mc->written - elt->shm_ring->mc->readx;
eof_flag = elt->shm_ring->mc->eof_flag;
if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_read) != 0)
break;
} while (!elt->shm_ring->mc->cancelled &&
usable < elt->shm_ring->block_size && !eof_flag);
read_offset = elt->shm_ring->mc->read_offset;
while (usable >= elt->shm_ring->block_size || eof_flag) {
gsize to_write = usable;
if (to_write > elt->shm_ring->block_size)
to_write = elt->shm_ring->block_size;
if (to_write > 0) {
xfer_element_push_buffer_static(elt->downstream,
elt->shm_ring->data +read_offset,
to_write);
}
if (to_write) {
read_offset += to_write;
if (read_offset >= shm_ring_size)
read_offset -= shm_ring_size;
elt->shm_ring->mc->read_offset = read_offset;
elt->shm_ring->mc->readx += to_write;
sem_post(elt->shm_ring->sem_write);
usable -= to_write;
}
if (elt->shm_ring->mc->write_offset == elt->shm_ring->mc->read_offset &&
elt->shm_ring->mc->eof_flag) {
// notify the producer that everythinng is read
xfer_element_push_buffer_static(elt->downstream, NULL, 0);
sem_post(elt->shm_ring->sem_write);
return;
}
}
}
}
static void
pull_and_push(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
gboolean eof_sent = FALSE;
g_debug("pull_and_push");
while (!elt->cancelled) {
char *buf;
size_t len;
/* get a buffer from upstream */
buf = xfer_element_pull_buffer(elt->upstream, &len);
/* and push it downstream */
xfer_element_push_buffer(elt->downstream, buf, len);
if (!buf) {
eof_sent = TRUE;
break;
}
}
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_buffers(elt->upstream);
if (!eof_sent)
xfer_element_push_buffer(elt->downstream, NULL, 0);
}
static void
pull_and_push_static(
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
gboolean eof_sent = FALSE;
size_t block_size_up = xfer_element_get_block_size(elt->upstream);
size_t block_size_down = xfer_element_get_block_size(elt->downstream);
size_t block_size;
char *buf;
g_debug("pull_and_push_static");
if (block_size_up != 0 && block_size_down != 0 && block_size_up != block_size_down) {
g_critical("pull_and_push_static with different block_size (%zu, %zu)", block_size_up, block_size_down);
}
if (block_size_up != 0)
block_size = block_size_up;
else if (block_size_down != 0)
block_size = block_size_down;
else
block_size = NETWORK_BLOCK_BYTES;
buf = malloc(block_size);
while (!elt->cancelled) {
size_t len;
/* get a buffer from upstream */
xfer_element_pull_buffer_static(elt->upstream, buf, block_size, &len);
/* and push it downstream */
if (len == 0) {
xfer_element_push_buffer_static(elt->downstream, NULL, len);
eof_sent = TRUE;
break;
} else {
xfer_element_push_buffer_static(elt->downstream, buf, len);
}
}
amfree(buf);
if (elt->cancelled && elt->expect_eof)
xfer_element_drain_buffers(elt->upstream);
if (!eof_sent)
xfer_element_push_buffer_static(elt->downstream, NULL, 0);
}
static gpointer
worker_thread(
gpointer data)
{
XferElement *elt = XFER_ELEMENT(data);
XferElementGlue *self = XFER_ELEMENT_GLUE(data);
switch (mech_pair(elt->input_mech, elt->output_mech)) {
case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
read_and_write(self);
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
read_and_push(self);
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER_STATIC):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER_STATIC):
read_and_push_static(self);
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_MEM_RING):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_MEM_RING):
read_to_mem_ring(self);
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_SHM_RING):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_SHM_RING):
read_to_shm_ring(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
pull_and_write(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_READFD):
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_WRITEFD):
pull_static_and_write(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
pull_and_push(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_PUSH_BUFFER_STATIC):
pull_and_push_static(self);
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
if ((self->output_data_socket = do_directtcp_connect(self,
elt->downstream->input_listen_addrs)) == -1)
break;
self->write_fdp = &self->output_data_socket;
read_and_write(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
if ((self->output_data_socket = do_directtcp_connect(self,
elt->downstream->input_listen_addrs)) == -1)
break;
self->write_fdp = &self->output_data_socket;
pull_and_write(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_DIRECTTCP_LISTEN):
if ((self->output_data_socket = do_directtcp_connect(self,
elt->downstream->input_listen_addrs)) == -1)
break;
self->write_fdp = &self->output_data_socket;
pull_static_and_write(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_write(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
if ((self->input_data_socket = do_directtcp_accept(self,
&self->input_listen_socket)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_push(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER_STATIC):
if ((self->input_data_socket = do_directtcp_accept(self,
&self->input_listen_socket)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_push_static(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_MEM_RING):
if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_to_mem_ring(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_SHM_RING):
if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_to_shm_ring(self);
break;
/* The following pair have no glue threads */
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER_STATIC):
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER_STATIC):
case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER_STATIC):
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_READFD):
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_WRITEFD):
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_PULL_BUFFER_STATIC):
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_DIRECTTCP_LISTEN):
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_DIRECTTCP_CONNECT):
default:
g_debug("Worker no thread: %d %d", elt->input_mech, elt->output_mech);
g_assert_not_reached();
break;
// case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_MEM_RING):
// pull_static_to_mem_ring(self);
// break;
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_SHM_RING):
pull_static_to_shm_ring(self);
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
if ((self->output_data_socket = do_directtcp_accept(self,
&self->output_listen_socket)) == -1)
break;
self->write_fdp = &self->output_data_socket;
read_and_write(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_write(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_push(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER_STATIC):
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_push_static(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_MEM_RING):
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_to_mem_ring(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_SHM_RING):
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_to_shm_ring(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
if ((self->output_data_socket = do_directtcp_accept(self,
&self->output_listen_socket)) == -1)
break;
self->write_fdp = &self->output_data_socket;
pull_and_write(self);
break;
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_DIRECTTCP_CONNECT):
if ((self->output_data_socket = do_directtcp_accept(self,
&self->output_listen_socket)) == -1)
break;
self->write_fdp = &self->output_data_socket;
pull_static_and_write(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
/* TODO: use async accept's here to avoid order dependency */
if ((self->output_data_socket = do_directtcp_accept(self,
&self->output_listen_socket)) == -1)
break;
self->write_fdp = &self->output_data_socket;
if ((self->input_data_socket = do_directtcp_accept(self,
&self->input_listen_socket)) == -1)
break;
self->read_fdp = &self->input_data_socket;
read_and_write(self);
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
/* TODO: use async connects and select() to avoid order dependency here */
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1)
break;
self->read_fdp = &self->input_data_socket;
if ((self->output_data_socket = do_directtcp_connect(self,
elt->downstream->input_listen_addrs)) == -1)
break;
self->write_fdp = &self->output_data_socket;
read_and_write(self);
break;
case mech_pair(XFER_MECH_SHM_RING, XFER_MECH_PUSH_BUFFER_STATIC):
shm_ring_and_push_buffer_static(self);
break;
}
send_xfer_done(self);
return NULL;
}
/*
* Implementation
*/
static gboolean
setup_impl(
XferElement *elt)
{
XferElementGlue *self = (XferElementGlue *)elt;
gboolean need_ring = FALSE;
gboolean need_listen_input = FALSE;
gboolean need_listen_output = FALSE;
g_assert(elt->input_mech != XFER_MECH_NONE);
g_assert(elt->output_mech != XFER_MECH_NONE);
g_assert(elt->input_mech != elt->output_mech);
self->read_fdp = NULL;
self->write_fdp = NULL;
self->on_push = PUSH_INVALID;
self->on_pull = PULL_INVALID;
self->need_thread = FALSE;
g_debug("setup_impl: %d, %d", elt->input_mech, elt->output_mech);
switch (mech_pair(elt->input_mech, elt->output_mech)) {
case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
/* thread will read from one fd and write to the other */
self->read_fdp = &neighboring_element_fd;
self->write_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER_STATIC):
/* thread will read from one fd and call push_buffer downstream */
self->read_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER_STATIC):
self->read_fdp = &neighboring_element_fd;
self->on_pull = PULL_FROM_FD;
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
/* thread will connect for output, then read from fd and write to the
* socket. */
self->read_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
/* thread will accept output conn, then read from upstream and write to socket */
self->read_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
need_listen_output = TRUE;
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_MEM_RING):
self->read_fdp = &neighboring_element_fd;
self->mem_ring = create_mem_ring();
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_READFD, XFER_MECH_SHM_RING):
self->read_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER_STATIC):
/* thread will read from pipe and call downstream's push_buffer */
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER_STATIC):
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->on_pull = PULL_FROM_FD;
self->read_fdp = &self->pipe[0];
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
/* thread will connect for output, then read from pipe and write to socket */
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
/* thread will accept output conn, then read from pipe and write to socket */
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
need_listen_output = TRUE;
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_MEM_RING):
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->mem_ring = create_mem_ring();
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_SHM_RING):
make_pipe(self);
g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_READFD):
make_pipe(self);
g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->on_push = PUSH_TO_FD;
self->write_fdp = &self->pipe[1];
break;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_WRITEFD):
self->on_push = PUSH_TO_FD;
self->write_fdp = &neighboring_element_fd;
break;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
self->on_push = PUSH_TO_RING_BUFFER;
self->on_pull = PULL_FROM_RING_BUFFER;
need_ring = TRUE;
break;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_DIRECTTCP_LISTEN):
/* push will connect for output first */
self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
break;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
case mech_pair(XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_DIRECTTCP_CONNECT):
/* push will accept for output first */
self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
need_listen_output = TRUE;
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_READFD):
/* thread will pull from upstream and write to pipe */
make_pipe(self);
g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_WRITEFD):
/* thread will pull from upstream and write to downstream */
self->write_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_PUSH_BUFFER_STATIC):
/* thread will pull from upstream and push to downstream */
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_DIRECTTCP_LISTEN):
/* thread will connect for output, then pull from upstream and write to socket */
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_DIRECTTCP_CONNECT):
/* thread will accept for output, then pull from upstream and write to socket */
self->need_thread = TRUE;
need_listen_output = TRUE;
break;
// case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_MEM_RING):
// /* thread will pull_static from upstream and add to a mem_ring */
// self->need_thread = TRUE;
// break;
case mech_pair(XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_SHM_RING):
/* thread will pull_static from upstream and add to a mem_ring */
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
/* thread will accept for input, then read from socket and write to pipe */
make_pipe(self);
g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
need_listen_input = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
/* thread will accept for input, then read from socket and write to downstream */
self->write_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
need_listen_input = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER_STATIC):
/* thread will accept for input, then read from socket and push downstream */
self->need_thread = TRUE;
need_listen_input = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER_STATIC):
/* first pull will accept for input, then read from socket */
self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
need_listen_input = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
/* thread will accept on both sides, then copy from socket to socket */
self->need_thread = TRUE;
need_listen_input = TRUE;
need_listen_output = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_MEM_RING):
need_listen_input = TRUE;
self->mem_ring = create_mem_ring();
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_MEM_RING):
self->mem_ring = create_mem_ring();
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_SHM_RING):
need_listen_input = TRUE;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_SHM_RING):
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
/* thread will connect for input, then read from socket and write to pipe */
make_pipe(self);
g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
/* thread will connect for input, then read from socket and write to downstream */
self->write_fdp = &neighboring_element_fd;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER_STATIC):
/* thread will connect for input, then read from socket and push downstream */
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER_STATIC):
/* first pull will connect for input, then read from socket */
self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
break;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
/* thread will connect on both sides, then copy from socket to socket */
self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
self->need_thread = TRUE;
break;
case mech_pair(XFER_MECH_SHM_RING, XFER_MECH_PUSH_BUFFER_STATIC):
elt->shm_ring = shm_ring_create(NULL);
self->need_thread = TRUE;
break;
default:
g_debug("setup_impl: %d, %d", elt->input_mech, elt->output_mech);
g_assert_not_reached();
break;
}
/* set up ring if desired */
if (need_ring) {
self->ring = g_try_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
if (self->ring == NULL) {
xfer_cancel_with_error(elt, "Can't allocate memory for ring");
return FALSE;
}
self->ring_used_sem = amsemaphore_new_with_value(0);
self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
}
if (need_listen_input) {
if (!do_directtcp_listen(elt,
&self->input_listen_socket, &elt->input_listen_addrs))
return FALSE;
}
if (need_listen_output) {
if (!do_directtcp_listen(elt,
&self->output_listen_socket, &elt->output_listen_addrs))
return FALSE;
}
return TRUE;
}
static mem_ring_t *
get_mem_ring_impl(
XferElement *elt)
{
XferElementGlue *self = (XferElementGlue *)elt;
return self->mem_ring;
}
static gboolean
start_impl(
XferElement *elt)
{
XferElementGlue *self = (XferElementGlue *)elt;
if (self->need_thread)
self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
/* we're active if we have a thread that will eventually die */
return self->need_thread;
}
static gpointer
pull_buffer_impl(
XferElement *elt,
size_t *size)
{
XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
g_debug("pUll_buffer_impl");
/* accept first, if required */
if (self->on_pull & PULL_ACCEPT_FIRST) {
/* don't accept the next time around */
self->on_pull &= ~PULL_ACCEPT_FIRST;
if (elt->cancelled) {
*size = 0;
return NULL;
}
if ((self->input_data_socket = do_directtcp_accept(self,
&self->input_listen_socket)) == -1) {
/* do_directtcp_accept already signalled an error; xfer
* is cancelled */
*size = 0;
return NULL;
}
/* read from this new socket */
self->read_fdp = &self->input_data_socket;
} else if (self->on_pull & PULL_CONNECT_FIRST) {
/* or connect first, if required */
/* don't connect the next time around */
self->on_pull &= ~PULL_CONNECT_FIRST;
if (elt->cancelled) {
*size = 0;
return NULL;
}
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1) {
/* do_directtcp_connect already signalled an error; xfer
* is cancelled */
*size = 0;
return NULL;
}
/* read from this new socket */
self->read_fdp = &self->input_data_socket;
}
switch (self->on_pull) {
case PULL_FROM_RING_BUFFER: {
gpointer buf;
if (elt->cancelled) {
/* the finalize method will empty the ring buffer */
*size = 0;
return NULL;
}
/* make sure there's at least one element available */
amsemaphore_down(self->ring_used_sem);
/* get it */
buf = self->ring[self->ring_tail].buf;
*size = self->ring[self->ring_tail].size;
self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as free to be overwritten */
amsemaphore_up(self->ring_free_sem);
return buf;
}
case PULL_FROM_FD: {
int fd = get_read_fd(self);
char *buf;
ssize_t len;
/* if the fd is already closed, it's possible upstream bailed out
* so quickly that we didn't even get a look at the fd */
if (elt->cancelled || fd == -1) {
if (fd != -1) {
if (elt->expect_eof)
xfer_element_drain_fd(fd);
close_read_fd(self);
}
*size = 0;
return NULL;
}
buf = g_malloc(GLUE_BUFFER_SIZE);
/* read from upstream */
len = read_fully(fd, buf, GLUE_BUFFER_SIZE, NULL);
if (len < GLUE_BUFFER_SIZE) {
if (errno) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), fd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
/* return an EOF */
amfree(buf);
len = 0;
/* and finish off the upstream */
if (elt->expect_eof) {
xfer_element_drain_fd(fd);
}
close_read_fd(self);
} else if (len == 0) {
/* EOF */
g_free(buf);
buf = NULL;
*size = 0;
/* signal EOF to downstream */
close_read_fd(self);
}
}
*size = (size_t)len;
return buf;
}
default:
case PULL_INVALID:
g_assert_not_reached();
return NULL;
}
}
static gpointer
pull_buffer_static_impl(
XferElement *elt,
gpointer buf,
size_t block_size,
size_t *size)
{
XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
g_debug("pUll_buffer_impl");
/* accept first, if required */
if (self->on_pull & PULL_ACCEPT_FIRST) {
/* don't accept the next time around */
self->on_pull &= ~PULL_ACCEPT_FIRST;
if (elt->cancelled) {
*size = 0;
return NULL;
}
if ((self->input_data_socket = do_directtcp_accept(self,
&self->input_listen_socket)) == -1) {
/* do_directtcp_accept already signalled an error; xfer
* is cancelled */
*size = 0;
return NULL;
}
/* read from this new socket */
self->read_fdp = &self->input_data_socket;
} else if (self->on_pull & PULL_CONNECT_FIRST) {
/* or connect first, if required */
/* don't connect the next time around */
self->on_pull &= ~PULL_CONNECT_FIRST;
if (elt->cancelled) {
*size = 0;
return NULL;
}
if ((self->input_data_socket = do_directtcp_connect(self,
elt->upstream->output_listen_addrs)) == -1) {
/* do_directtcp_connect already signalled an error; xfer
* is cancelled */
*size = 0;
return NULL;
}
/* read from this new socket */
self->read_fdp = &self->input_data_socket;
}
switch (self->on_pull) {
case PULL_FROM_RING_BUFFER: {
gpointer buf;
g_critical("PULL_FROM_RING_BUFFER unimplemented");
if (elt->cancelled) {
/* the finalize method will empty the ring buffer */
*size = 0;
return NULL;
}
/* make sure there's at least one element available */
amsemaphore_down(self->ring_used_sem);
/* get it */
buf = self->ring[self->ring_tail].buf;
*size = self->ring[self->ring_tail].size;
self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as free to be overwritten */
amsemaphore_up(self->ring_free_sem);
return buf;
}
case PULL_FROM_FD: {
int fd = get_read_fd(self);
ssize_t len;
/* if the fd is already closed, it's possible upstream bailed out
* so quickly that we didn't even get a look at the fd */
if (elt->cancelled || fd == -1) {
if (fd != -1) {
if (elt->expect_eof)
xfer_element_drain_fd(fd);
close_read_fd(self);
}
*size = 0;
return NULL;
}
/* read from upstream */
len = read_fully(fd, buf, block_size, NULL);
if (len < (ssize_t)block_size) {
if (errno) {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error reading from fd %d: %s"), fd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
/* return an EOF */
buf = NULL;
len = 0;
/* and finish off the upstream */
if (elt->expect_eof) {
xfer_element_drain_fd(fd);
}
close_read_fd(self);
} else if (len == 0) {
/* EOF */
buf = NULL;
len = 0;
/* signal EOF to downstream */
close_read_fd(self);
}
}
*size = (size_t)len;
return buf;
}
default:
case PULL_INVALID:
g_assert_not_reached();
return NULL;
}
}
static void
push_buffer_impl(
XferElement *elt,
gpointer buf,
size_t len)
{
XferElementGlue *self = (XferElementGlue *)elt;
XMsg *msg;
g_debug("push_buffer_impl");
/* accept first, if required */
if (self->on_push & PUSH_ACCEPT_FIRST) {
/* don't accept the next time around */
self->on_push &= ~PUSH_ACCEPT_FIRST;
if (elt->cancelled) {
return;
}
if ((self->output_data_socket = do_directtcp_accept(self,
&self->output_listen_socket)) == -1) {
/* do_directtcp_accept already signalled an error; xfer
* is cancelled */
return;
}
/* write to this new socket */
self->write_fdp = &self->output_data_socket;
}
/* or connect first, if required */
if (self->on_push & PUSH_CONNECT_FIRST) {
/* don't accept the next time around */
self->on_push &= ~PUSH_CONNECT_FIRST;
if (elt->cancelled) {
return;
}
if ((self->output_data_socket = do_directtcp_connect(self,
elt->downstream->input_listen_addrs)) == -1) {
/* do_directtcp_connect already signalled an error; xfer
* is cancelled */
return;
}
/* read from this new socket */
self->write_fdp = &self->output_data_socket;
}
switch (self->on_push) {
case PUSH_TO_RING_BUFFER:
/* just drop packets if the transfer has been cancelled */
if (elt->cancelled) {
amfree(buf);
return;
}
/* make sure there's at least one element free */
amsemaphore_down(self->ring_free_sem);
/* set it */
self->ring[self->ring_head].buf = buf;
self->ring[self->ring_head].size = len;
self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as available for reading */
amsemaphore_up(self->ring_used_sem);
return;
case PUSH_TO_FD: {
int fd = get_write_fd(self);
/* if the fd is already closed, it's possible upstream bailed out
* so quickly that we didn't even get a look at the fd. In this
* case we can assume the xfer has been cancelled and just discard
* the data. */
if (fd == -1)
return;
if (elt->cancelled) {
if (!elt->expect_eof || !buf) {
close_write_fd(self);
/* hack to ensure we won't close the fd again, if we get another push */
elt->expect_eof = TRUE;
}
amfree(buf);
return;
}
/* write the full buffer to the fd, or close on EOF */
if (buf) {
if (!elt->downstream->drain_mode &&
full_write(fd, buf, len) < len) {
if (elt->downstream->must_drain) {
g_debug("Error writing to fd %d: %s",
fd, strerror(errno));
} else if (elt->downstream->ignore_broken_pipe &&
errno == EPIPE) {
} else {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error writing to fd %d: %s"),
fd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
/* nothing special to do to handle a cancellation */
}
elt->downstream->drain_mode = TRUE;
}
crc32_add((uint8_t *)buf, len, &elt->crc);
amfree(buf);
} else {
g_debug("sending XMSG_CRC message");
g_debug("push_to_fd CRC: %08x", crc32_finish(&elt->crc));
msg = xmsg_new(elt->downstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
close_write_fd(self);
}
return;
}
default:
case PUSH_INVALID:
g_assert_not_reached();
break;
}
}
static void
push_buffer_static_impl(
XferElement *elt,
gpointer buf,
size_t len)
{
XferElementGlue *self = (XferElementGlue *)elt;
XMsg *msg;
/* accept first, if required */
if (self->on_push & PUSH_ACCEPT_FIRST) {
/* don't accept the next time around */
self->on_push &= ~PUSH_ACCEPT_FIRST;
if (elt->cancelled) {
return;
}
if ((self->output_data_socket = do_directtcp_accept(self,
&self->output_listen_socket)) == -1) {
/* do_directtcp_accept already signalled an error; xfer
* is cancelled */
return;
}
/* write to this new socket */
self->write_fdp = &self->output_data_socket;
}
/* or connect first, if required */
if (self->on_push & PUSH_CONNECT_FIRST) {
/* don't accept the next time around */
self->on_push &= ~PUSH_CONNECT_FIRST;
if (elt->cancelled) {
return;
}
if ((self->output_data_socket = do_directtcp_connect(self,
elt->downstream->input_listen_addrs)) == -1) {
/* do_directtcp_connect already signalled an error; xfer
* is cancelled */
return;
}
/* read from this new socket */
self->write_fdp = &self->output_data_socket;
}
switch (self->on_push) {
case PUSH_TO_RING_BUFFER:
/* just drop packets if the transfer has been cancelled */
if (elt->cancelled) {
amfree(buf);
return;
}
g_critical("PUSH_TO_RING_BUFFER not implemented");
/* make sure there's at least one element free */
amsemaphore_down(self->ring_free_sem);
/* set it */
self->ring[self->ring_head].buf = buf;
self->ring[self->ring_head].size = len;
self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as available for reading */
amsemaphore_up(self->ring_used_sem);
return;
case PUSH_TO_FD: {
int fd = get_write_fd(self);
/* if the fd is already closed, it's possible upstream bailed out
* so quickly that we didn't even get a look at the fd. In this
* case we can assume the xfer has been cancelled and just discard
* the data. */
if (fd == -1)
return;
if (elt->cancelled) {
if (!elt->expect_eof || !buf) {
close_write_fd(self);
/* hack to ensure we won't close the fd again, if we get another push */
elt->expect_eof = TRUE;
}
return;
}
/* write the full buffer to the fd, or close on EOF */
if (buf) {
if (!elt->downstream->drain_mode &&
full_write(fd, buf, len) < len) {
if (elt->downstream->must_drain) {
g_debug("Error writing to fd %d: %s",
fd, strerror(errno));
} else if (elt->downstream->ignore_broken_pipe &&
errno == EPIPE) {
} else {
if (!elt->cancelled) {
xfer_cancel_with_error(elt,
_("Error writing to fd %d: %s"),
fd, strerror(errno));
wait_until_xfer_cancelled(elt->xfer);
}
/* nothing special to do to handle a cancellation */
}
elt->downstream->drain_mode = TRUE;
}
crc32_add((uint8_t *)buf, len, &elt->crc);
} else {
g_debug("sending XMSG_CRC message");
g_debug("push_to_fd CRC: %08x", crc32_finish(&elt->crc));
msg = xmsg_new(elt->downstream, XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
close_write_fd(self);
}
return;
}
default:
case PUSH_INVALID:
g_assert_not_reached();
break;
}
}
static void
instance_init(
XferElementGlue *self)
{
XferElement *elt = (XferElement *)self;
elt->can_generate_eof = TRUE;
self->pipe[0] = self->pipe[1] = -1;
self->input_listen_socket = -1;
self->output_listen_socket = -1;
self->input_data_socket = -1;
self->output_data_socket = -1;
self->read_fd = -1;
self->write_fd = -1;
crc32_init(&elt->crc);
}
static void
finalize_impl(
GObject * obj_self)
{
XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
/* first make sure the worker thread has finished up */
if (self->thread)
g_thread_join(self->thread);
/* close our pipes and fd's if they're still open */
if (self->pipe[0] != -1) close(self->pipe[0]);
if (self->pipe[1] != -1) close(self->pipe[1]);
if (self->input_data_socket != -1) close(self->input_data_socket);
if (self->output_data_socket != -1) close(self->output_data_socket);
if (self->input_listen_socket != -1) close(self->input_listen_socket);
if (self->output_listen_socket != -1) close(self->output_listen_socket);
if (self->read_fd != -1) close(self->read_fd);
if (self->write_fd != -1) close(self->write_fd);
if (self->ring) {
/* empty the ring buffer, ignoring syncronization issues */
while (self->ring_used_sem->value) {
if (self->ring[self->ring_tail].buf)
amfree(self->ring[self->ring_tail].buf);
self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
}
amfree(self->ring);
amsemaphore_free(self->ring_used_sem);
amsemaphore_free(self->ring_free_sem);
}
/* chain up */
G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}
static xfer_element_mech_pair_t _pairs[] = {
{ XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy */
{ XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(1) }, /* read and call */
{ XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and call */
{ XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* read on demand */
{ XFER_MECH_READFD, XFER_MECH_PULL_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* read on demand */
{ XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy */
{ XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy */
{ XFER_MECH_READFD, XFER_MECH_MEM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and add to mem ring */
{ XFER_MECH_READFD, XFER_MECH_SHM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and add to shm ring */
{ XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* pipe */
{ XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(1) }, /* pipe + read and call*/
{ XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* pipe + read and call*/
{ XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* pipe + read on demand */
{ XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* pipe + read on demand */
{ XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* pipe + splice or copy*/
{ XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy + pipe */
{ XFER_MECH_WRITEFD, XFER_MECH_MEM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* pipe + read and add to mem ring */
{ XFER_MECH_WRITEFD, XFER_MECH_SHM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* pipe + read and add to shm ring */
{ XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand + pipe */
{ XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand */
{ XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* async queue */
{ XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand */
{ XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand */
{ XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand + pipe */
{ XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand */
{ XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_PULL_BUFFER_STATIC, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* async queue */
{ XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand */
{ XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) }, /* write on demand */
{ XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write + pipe */
{ XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write */
{ XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and call */
{ XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write */
{ XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write */
{ XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write + pipe */
{ XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write */
{ XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_PUSH_BUFFER_STATIC, XFER_NROPS(0), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and call */
// { XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write */
{ XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* call and write */
// { XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_MEM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* pull and add to mem ring*/
{ XFER_MECH_PULL_BUFFER_STATIC, XFER_MECH_SHM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* pull and add to shm ring*/
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy + pipe */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(1) }, /* read and call */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and call */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* read on demand */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* read on demand */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_MEM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and add to mem ring */
{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_SHM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and add to shm ring */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy + pipe */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy + pipe */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(1) }, /* read and call */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and call */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* read on demand */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER_STATIC, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(1) }, /* read on demand */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* splice or copy */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_MEM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and add to mem ring */
{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_SHM_RING, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* read and add to shm ring */
{ XFER_MECH_MEM_RING, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
{ XFER_MECH_MEM_RING, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
//TODO { XFER_MECH_MEM_RING, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
//TODO { XFER_MECH_MEM_RING, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
{ XFER_MECH_SHM_RING, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
{ XFER_MECH_SHM_RING, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
{ XFER_MECH_SHM_RING, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
{ XFER_MECH_SHM_RING, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
{ XFER_MECH_SHM_RING, XFER_MECH_PUSH_BUFFER_STATIC, XFER_NROPS(0), XFER_NTHREADS(1), XFER_NALLOC(0) }, /* */
/* terminator */
{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
};
xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
static void
class_init(
XferElementGlueClass * selfc)
{
XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
GObjectClass *goc = G_OBJECT_CLASS(selfc);
klass->setup = setup_impl;
klass->start = start_impl;
klass->get_mem_ring = get_mem_ring_impl;
klass->push_buffer = push_buffer_impl;
klass->push_buffer_static = push_buffer_static_impl;
klass->pull_buffer = pull_buffer_impl;
klass->pull_buffer_static = pull_buffer_static_impl;
klass->perl_class = "Amanda::Xfer::Element::Glue";
klass->mech_pairs = xfer_element_glue_mech_pairs;
goc->finalize = finalize_impl;
parent_class = g_type_class_peek_parent(selfc);
}
GType
xfer_element_glue_get_type (void)
{
static GType type = 0;
if (G_UNLIKELY(type == 0)) {
static const GTypeInfo info = {
sizeof (XferElementGlueClass),
(GBaseInitFunc) NULL,
(GBaseFinalizeFunc) NULL,
(GClassInitFunc) class_init,
(GClassFinalizeFunc) NULL,
NULL /* class_data */,
sizeof (XferElementGlue),
0 /* n_preallocs */,
(GInstanceInitFunc) instance_init,
NULL
};
type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
}
return type;
}
/* create an element of this class; prototype is in xfer-element.h */
XferElement *
xfer_element_glue(void)
{
XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
XferElement *elt = XFER_ELEMENT(self);
return elt;
}