/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
* Copyright (c) 2016-2016 Carbonite, Inc. All Rights Reserved.
* All Rights Reserved.
*
* Permission to use, copy, modify, distribute, and sell this software and its
* documentation for any purpose is hereby granted without fee, provided that
* the above copyright notice appear in all copies and that both that
* copyright notice and this permission notice appear in supporting
* documentation, and that the name of U.M. not be used in advertising or
* publicity pertaining to distribution of the software without specific,
* written prior permission. U.M. makes no representations about the
* suitability of this software for any purpose. It is provided "as is"
* without express or implied warranty.
*
* U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
* BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Authors: the Amanda Development Team. Its members are listed in a
* file named AUTHORS, in the root directory of this distribution.
*/
/*
* shared memory ring buffer
*/
#include <config.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <glib.h>
#include <semaphore.h>
#include <glob.h>
#include "amanda.h"
#include "glib.h"
#include "conffile.h"
#include "security.h"
#include "shm-ring.h"
#define DEFAULT_SHM_RING_BLOCK_SIZE (NETWORK_BLOCK_BYTES)
#define DEFAULT_SHM_RING_SIZE (DEFAULT_SHM_RING_BLOCK_SIZE*8)
/* NetBSD only supports 14 character semaphore names */
#if __NetBSD__
# define SHM_CONTROL_NAME "/Ac-%04x-%05x"
# define SHM_DATA_NAME "/Ad-%04x-%05x"
# define SEM_WRITE_NAME "/Aw-%04x-%05x"
# define SEM_READ_NAME "/Ar-%04x-%05x"
# define SEM_READY_NAME "/Ay-%04x-%05x"
# define SEM_START_NAME "/As-%04x-%05x"
# define SHM_CONTROL_GLOB "/dev/shm/Ac-*-*"
# define AMANDA_GLOB "/dev/shm/A?-*-*"
#else
# define SHM_CONTROL_NAME "/amanda_shm_control-%d-%d"
# define SHM_DATA_NAME "/amanda_shm_data-%d-%d"
# define SEM_WRITE_NAME "/amanda_sem_write-%d-%d"
# define SEM_READ_NAME "/amanda_sem_read-%d-%d"
# define SEM_READY_NAME "/amanda_sem_ready-%d-%d"
# define SEM_START_NAME "/amanda_sem_start-%d-%d"
# define SHM_CONTROL_GLOB "/dev/shm/amanda_shm_control-*-*"
# define AMANDA_GLOB "/dev/shm/amanda*-*-*"
#endif
static int shm_ring_id = 0;
GMutex *shm_ring_mutex = NULL;
static GHashTable *hash_sem = NULL;
static void alloc_shm_ring(shm_ring_t *shm_ring);
static sem_t *am_sem_create(char *name);
static sem_t *am_sem_open(char *name);
static void am_sem_close(sem_t *sem);
static int
get_next_shm_ring_id(void)
{
int id;
g_mutex_lock(shm_ring_mutex);
id = shm_ring_id++;
g_mutex_unlock(shm_ring_mutex);
return id;
}
void
clean_shm_ring(void)
{
if (shm_ring_mutex) {
g_mutex_free(shm_ring_mutex);
}
}
void
cleanup_shm_ring(void)
{
glob_t globbuf;
char **aglob;
int r;
time_t now = time(NULL) - 300;
GHashTable *names;
names = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
r = glob(SHM_CONTROL_GLOB, GLOB_NOSORT, NULL, &globbuf);
if (r == 0) {
for (aglob = globbuf.gl_pathv; *aglob != NULL; aglob++) {
int cfd;
g_hash_table_insert(names, g_strdup(*aglob), GINT_TO_POINTER(1));
g_debug("cleanup_shm_ring: control_name: %s", *aglob);
cfd = shm_open(*aglob+8, O_RDONLY, 0);
if (cfd >= 0) {
struct stat statbuf;
if (fstat(cfd, &statbuf) == 0 &&
statbuf.st_atime < now &&
statbuf.st_mtime < now &&
statbuf.st_ctime < now &&
statbuf.st_size == sizeof(shm_ring_control_t)) {
shm_ring_control_t *mc;
mc = mmap(NULL, sizeof(shm_ring_control_t),
PROT_READ, MAP_SHARED, cfd, 0);
if (mc != MAP_FAILED) {
gboolean all_dead = TRUE;
int i;
g_hash_table_insert(names, g_strdup(mc->sem_write_name), GINT_TO_POINTER(1));
g_hash_table_insert(names, g_strdup(mc->sem_read_name), GINT_TO_POINTER(1));
g_hash_table_insert(names, g_strdup(mc->sem_ready_name), GINT_TO_POINTER(1));
g_hash_table_insert(names, g_strdup(mc->sem_start_name), GINT_TO_POINTER(1));
g_hash_table_insert(names, g_strdup(mc->shm_data_name), GINT_TO_POINTER(1));
for (i=0; i<SHM_RING_MAX_PID; i++) {
if (mc->pids[i] != 0) {
if (kill(mc->pids[i], 0) == -1) {
if (errno != ESRCH) {
all_dead = FALSE;
}
} else {
all_dead = FALSE;
}
}
}
// check all pids
if (all_dead) {
g_debug("sem_unlink %s", mc->sem_write_name);
g_debug("sem_unlink %s", mc->sem_read_name);
g_debug("sem_unlink %s", mc->sem_ready_name);
g_debug("sem_unlink %s", mc->sem_start_name);
g_debug("shm_unlink %s", mc->shm_data_name);
sem_unlink(mc->sem_write_name);
sem_unlink(mc->sem_read_name);
sem_unlink(mc->sem_ready_name);
sem_unlink(mc->sem_start_name);
shm_unlink(mc->shm_data_name);
munmap(mc, sizeof(shm_ring_control_t));
g_debug("shm_unlink %s", *aglob+8);
shm_unlink(*aglob+8);
} else {
munmap(mc, sizeof(shm_ring_control_t));
}
} else {
g_debug("mmap failed '%s': %s", *aglob+8, strerror(errno));
}
} else {
g_debug("fstat failed '%s': %s", *aglob+8, strerror(errno));
}
close(cfd);
} else {
g_debug("shm_open failed '%s': %s", *aglob+8, strerror(errno));
}
}
} else if (r == GLOB_NOSPACE) {
g_debug("glob failed because no space");
} else if (r == GLOB_ABORTED) {
g_debug("glob aborted");
} else if (r == GLOB_NOMATCH) {
// good
} else {
// impossible
}
globfree(&globbuf);
r = glob("AMANDA_GLOB", GLOB_NOSORT, NULL, &globbuf);
if (r == 0) {
int one_day_old = time(NULL) - 60*60*24;
for (aglob = globbuf.gl_pathv; *aglob != NULL; aglob++) {
if (!g_hash_table_lookup(names, *aglob)) {
struct stat statbuf;
if (stat(*aglob, &statbuf) == 0 &&
statbuf.st_mtime < one_day_old) {
g_debug("unlink unknown old file: %s", *aglob);
unlink(*aglob);
}
}
}
}
globfree(&globbuf);
g_hash_table_destroy(names);
}
int
shm_ring_sem_wait(
shm_ring_t *shm_ring,
sem_t *sem)
{
int i;
while(1) {
struct timespec tv = {time(NULL)+300, 0};
#ifdef HAVE_SEM_TIMEDWAIT
if (sem_timedwait(sem, &tv) == 0)
return 0;
#else
if (sem_wait(sem) == 0)
return 0;
#endif
if (shm_ring->mc->cancelled) {
g_debug("shm_ring_sem_wait: shm-ring is cancelled");
return -1;
}
if (errno == EINTR)
continue;
if (errno != ETIMEDOUT) {
goto failed_sem_wait;
}
/* Check all pids */
for (i=0; i<SHM_RING_MAX_PID; i++) {
if (shm_ring->mc->pids[i] != 0) {
if (kill(shm_ring->mc->pids[i], 0) == -1) {
if (errno == ESRCH) {
goto failed_sem_wait;
}
}
}
}
}
failed_sem_wait:
g_debug("shm_ring_sem_wait: failed_sem_wait: %s", strerror(errno));
shm_ring->mc->cancelled = 1;
sem_post(shm_ring->sem_read);
sem_post(shm_ring->sem_write);
sem_post(shm_ring->sem_ready);
sem_post(shm_ring->sem_start);
return -1;
}
void
fd_to_shm_ring(
int fd,
shm_ring_t *shm_ring,
crc_t *crc)
{
uint64_t write_offset;
uint64_t written;
uint64_t readx;
uint64_t shm_ring_size;
struct iovec iov[2];
int iov_count;
ssize_t n;
size_t consumer_block_size;
g_debug("fd_to_shm_ring");
shm_ring_size = shm_ring->mc->ring_size;
consumer_block_size = shm_ring->mc->consumer_block_size;
crc32_init(crc);
while (!shm_ring->mc->cancelled) {
write_offset = shm_ring->mc->write_offset;
written = shm_ring->mc->written;
while (!shm_ring->mc->cancelled) {
readx = shm_ring->mc->readx;
if (shm_ring_size - (written - readx) >= shm_ring->block_size)
break;
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_write) != 0) {
break;
}
}
if (shm_ring->mc->cancelled)
break;
iov[0].iov_base = shm_ring->data + write_offset;
if (write_offset + shm_ring->block_size <= shm_ring_size) {
iov[0].iov_len = shm_ring->block_size;
iov_count = 1;
} else {
iov[0].iov_len = shm_ring_size - write_offset;
iov[1].iov_base = shm_ring->data;
iov[1].iov_len = shm_ring->block_size - iov[0].iov_len;
iov_count = 2;
}
n = readv(fd, iov, iov_count);
if (n > 0) {
if (shm_ring->mc->written == 0 && shm_ring->mc->need_sem_ready) {
sem_post(shm_ring->sem_ready);
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_start) != 0) {
break;
}
}
write_offset += n;
write_offset %= shm_ring_size;
shm_ring->mc->write_offset = write_offset;
shm_ring->mc->written += n;
shm_ring->data_avail += n;
if (shm_ring->data_avail >= consumer_block_size) {
sem_post(shm_ring->sem_read);
shm_ring->data_avail -= consumer_block_size;
}
if (n <= (ssize_t)iov[0].iov_len) {
crc32_add((uint8_t *)iov[0].iov_base, n, crc);
} else {
crc32_add((uint8_t *)iov[0].iov_base, iov[0].iov_len, crc);
crc32_add((uint8_t *)iov[1].iov_base, n - iov[0].iov_len, crc);
}
} else {
shm_ring->mc->eof_flag = TRUE;
break;
}
}
sem_post(shm_ring->sem_read);
sem_post(shm_ring->sem_read);
// wait for the consumer to read everything
while (!shm_ring->mc->cancelled &&
(shm_ring->mc->written != shm_ring->mc->readx ||
!shm_ring->mc->eof_flag)) {
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_write) != 0) {
break;
}
}
}
void
close_producer_shm_ring(
shm_ring_t *shm_ring)
{
if (!shm_ring->mc->eof_flag) {
shm_ring->mc->eof_flag = TRUE;
}
sem_post(shm_ring->sem_ready);
sem_post(shm_ring->sem_start);
sem_post(shm_ring->sem_write);
sem_post(shm_ring->sem_read);
g_debug("close_producer_shm_ring sem_close(sem_write %p", shm_ring->sem_write);
am_sem_close(shm_ring->sem_write);
am_sem_close(shm_ring->sem_ready);
am_sem_close(shm_ring->sem_read);
am_sem_close(shm_ring->sem_start);
if (shm_ring->shm_data_mmap_size > 0 && shm_ring->data) {
if (munmap(shm_ring->data, shm_ring->shm_data_mmap_size) == -1) {;
g_debug("munmap(data) failed: %s", strerror(errno));
exit(0);
}
}
if (munmap(shm_ring->mc, sizeof(shm_ring_control_t)) == -1) {
g_debug("munmap(mc) failed: %s", strerror(errno));
exit(1);
}
aclose(shm_ring->shm_data);
aclose(shm_ring->shm_control);
g_free(shm_ring->shm_control_name);
g_free(shm_ring);
}
void
shm_ring_to_security_stream(
shm_ring_t *shm_ring,
struct security_stream_t *netfd,
crc_t *crc)
{
uint64_t read_offset;
uint64_t shm_ring_size;
gsize usable = 0;
gboolean eof_flag = FALSE;
g_debug("shm_ring_to_security_stream");
shm_ring_size = shm_ring->mc->ring_size;
sem_post(shm_ring->sem_write);
while (!shm_ring->mc->cancelled) {
do {
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_read) != 0) {
break;
}
usable = shm_ring->mc->written - shm_ring->mc->readx;
eof_flag = shm_ring->mc->eof_flag;
} while (!shm_ring->mc->cancelled &&
usable < shm_ring->block_size && !eof_flag);
read_offset = shm_ring->mc->read_offset;
while (usable >= shm_ring->block_size || eof_flag) {
gsize to_write = usable;
if (to_write > shm_ring->block_size)
to_write = shm_ring->block_size;
if (to_write + read_offset <= shm_ring_size) {
security_stream_write(netfd, shm_ring->data + read_offset,
to_write);
if (crc) {
crc32_add((uint8_t *)shm_ring->data + read_offset, to_write,
crc);
}
} else {
security_stream_write(netfd, shm_ring->data + read_offset,
shm_ring_size - read_offset);
security_stream_write(netfd, shm_ring->data,
to_write - shm_ring_size + read_offset);
if (crc) {
crc32_add((uint8_t *)shm_ring->data + read_offset, shm_ring_size - read_offset, crc);
crc32_add((uint8_t *)shm_ring->data, usable - shm_ring_size + read_offset, crc);
}
}
if (to_write) {
read_offset += to_write;
if (read_offset >= shm_ring_size)
read_offset -= shm_ring_size;
shm_ring->mc->read_offset = read_offset;
shm_ring->mc->readx += to_write;
sem_post(shm_ring->sem_write);
usable -= to_write;
}
if (shm_ring->mc->write_offset == shm_ring->mc->read_offset &&
shm_ring->mc->eof_flag) {
// notify the producer that everything is read
sem_post(shm_ring->sem_write);
return;
}
}
}
}
void
shm_ring_to_fd(
shm_ring_t *shm_ring,
int fd,
crc_t *crc)
{
uint64_t read_offset;
uint64_t shm_ring_size;
gsize usable = 0;
gboolean eof_flag = FALSE;
g_debug("shm_ring_to_fd");
shm_ring_size = shm_ring->mc->ring_size;
sem_post(shm_ring->sem_write);
while (!shm_ring->mc->cancelled) {
do {
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_read) != 0) {
break;
}
usable = shm_ring->mc->written - shm_ring->mc->readx;
eof_flag = shm_ring->mc->eof_flag;
} while (!shm_ring->mc->cancelled &&
usable < shm_ring->block_size && !eof_flag);
read_offset = shm_ring->mc->read_offset;
while (usable >= shm_ring->block_size || eof_flag) {
gsize to_write = usable;
if (to_write > shm_ring->block_size)
to_write = shm_ring->block_size;
if (to_write + read_offset <= shm_ring_size) {
if (full_write(fd, shm_ring->data + read_offset, to_write) != to_write) {
g_debug("full_write failed: %s", strerror(errno));
shm_ring->mc->cancelled = TRUE;
sem_post(shm_ring->sem_write);
return;
}
if (crc) {
crc32_add((uint8_t *)shm_ring->data + read_offset, to_write,
crc);
}
} else {
if (full_write(fd, shm_ring->data + read_offset,
shm_ring_size - read_offset) != shm_ring_size - read_offset) {
g_debug("full_write failed: %s", strerror(errno));
shm_ring->mc->cancelled = TRUE;
sem_post(shm_ring->sem_write);
return;
}
if (full_write(fd, shm_ring->data,
to_write - shm_ring_size + read_offset) != to_write - shm_ring_size + read_offset) {
g_debug("full_write failed: %s", strerror(errno));
shm_ring->mc->cancelled = TRUE;
sem_post(shm_ring->sem_write);
return;
}
if (crc) {
crc32_add((uint8_t *)shm_ring->data + read_offset, shm_ring_size - read_offset, crc);
crc32_add((uint8_t *)shm_ring->data, usable - shm_ring_size + read_offset, crc);
}
}
if (to_write) {
read_offset += to_write;
if (read_offset >= shm_ring_size)
read_offset -= shm_ring_size;
shm_ring->mc->read_offset = read_offset;
shm_ring->mc->readx += to_write;
sem_post(shm_ring->sem_write);
usable -= to_write;
}
if (shm_ring->mc->write_offset == shm_ring->mc->read_offset &&
shm_ring->mc->eof_flag) {
// notify the producer that everythinng is read
sem_post(shm_ring->sem_write);
return;
}
}
}
}
void
shm_ring_producer_set_size(
shm_ring_t *shm_ring,
ssize_t ring_size,
ssize_t block_size)
{
g_debug("shm_ring_producer_set_size");
shm_ring->ring_size = ring_size;
shm_ring->block_size = block_size;
shm_ring->mc->producer_ring_size = ring_size;
shm_ring->mc->producer_block_size = block_size;
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_write) == -1) {
exit(1);
}
alloc_shm_ring(shm_ring);
if (ftruncate(shm_ring->shm_data, shm_ring->mc->ring_size) == -1) {
g_debug("ftruncate of shm_data failed: %s", strerror(errno));
exit(1);
}
shm_ring->shm_data_mmap_size = shm_ring->mc->ring_size;
shm_ring->data = mmap(NULL, shm_ring->shm_data_mmap_size,
PROT_READ|PROT_WRITE, MAP_SHARED,
shm_ring->shm_data, 0);
if (shm_ring->data == MAP_FAILED) {
g_debug("shm_ring shm_ring->data failed: %s", strerror(errno));
exit(1);
}
sem_post(shm_ring->sem_read);
}
static void
alloc_shm_ring(
shm_ring_t *shm_ring)
{
uint64_t best_ring_size;
if (shm_ring->mc->producer_ring_size > shm_ring->mc->consumer_ring_size) {
best_ring_size = shm_ring->mc->producer_ring_size;
if (best_ring_size < shm_ring->mc->producer_block_size * 2)
best_ring_size = shm_ring->mc->producer_block_size * 2;
} else {
best_ring_size = shm_ring->mc->consumer_ring_size;
if (best_ring_size < shm_ring->mc->consumer_block_size * 2)
best_ring_size = shm_ring->mc->consumer_block_size * 2;
}
if (best_ring_size % shm_ring->mc->producer_block_size != 0) {
best_ring_size = ((best_ring_size % shm_ring->mc->producer_block_size)+1) * shm_ring->mc->producer_block_size;
}
while (best_ring_size % shm_ring->mc->consumer_block_size != 0) {
best_ring_size += shm_ring->mc->producer_block_size;
}
shm_ring->ring_size = best_ring_size;
shm_ring->mc->ring_size = shm_ring->ring_size;
}
static sem_t *
am_sem_create(
char *name)
{
sem_t *r;
int nb;
g_mutex_lock(shm_ring_mutex);
r = sem_open(name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 0);
if (r == SEM_FAILED) {
g_debug("am_sem_create failed '%s': %s", name, strerror(errno));
exit(1);
}
if (!hash_sem) {
hash_sem = g_hash_table_new(g_direct_hash, g_direct_equal);
}
nb = 1;
g_debug("am_sem_create %p %d", r, nb);
g_hash_table_insert(hash_sem, r, GINT_TO_POINTER(nb));
g_mutex_unlock(shm_ring_mutex);
return r;
}
static sem_t *
am_sem_open(
char *name)
{
sem_t *r;
int nb;
g_mutex_lock(shm_ring_mutex);
r = sem_open(name, 0);
if (r == SEM_FAILED) {
g_debug("am_sem_open failed '%s': %s", name, strerror(errno));
exit(1);
}
if (!hash_sem) {
hash_sem = g_hash_table_new(g_direct_hash, g_direct_equal);
}
nb = GPOINTER_TO_INT(g_hash_table_lookup(hash_sem, r));
nb++;
g_debug("am_sem_open %p %d", r, nb);
g_hash_table_insert(hash_sem, r, GINT_TO_POINTER(nb));
g_mutex_unlock(shm_ring_mutex);
return r;
}
static void
am_sem_close(
sem_t *sem)
{
int nb;
g_mutex_lock(shm_ring_mutex);
nb = GPOINTER_TO_INT(g_hash_table_lookup(hash_sem, sem));
nb--;
g_debug("am_sem_close %p %d", sem, nb);
if (nb <= 0) {
g_hash_table_remove(hash_sem, sem);
if (sem_close(sem) == -1) {
g_debug("sem_close(%p) failed: %s", sem, strerror(errno));
exit(1);
}
} else {
g_hash_table_insert(hash_sem, sem, GINT_TO_POINTER(nb));
}
g_mutex_unlock(shm_ring_mutex);
}
shm_ring_t *
shm_ring_create(
char **errmsg)
{
char *msg;
shm_ring_t *shm_ring = g_new0(shm_ring_t, 1);
g_debug("shm_ring_create");
shm_ring->shm_control_name = g_strdup_printf(SHM_CONTROL_NAME, (int)getpid(), get_next_shm_ring_id());
shm_unlink(shm_ring->shm_control_name);
shm_ring->shm_control = shm_open(shm_ring->shm_control_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (shm_ring->shm_control == -1) {
msg = g_strdup_printf("shm_control failed '%s': %s", shm_ring->shm_control_name, strerror(errno));
g_debug("%s", msg);
if (*errmsg) {
*errmsg = msg;
return NULL;
}
exit(1);
}
if (ftruncate(shm_ring->shm_control, sizeof(shm_ring_control_t)) == -1) {
msg = g_strdup_printf("ftruncate of shm_control failed '%s': %s", shm_ring->shm_control_name, strerror(errno));
g_debug("%s", msg);
if (*errmsg) {
*errmsg = msg;
return NULL;
}
exit(1);
}
shm_ring->mc = mmap(NULL, sizeof(shm_ring_control_t), PROT_READ|PROT_WRITE,
MAP_SHARED, shm_ring->shm_control, 0);
if (shm_ring->mc == MAP_FAILED) {
msg = g_strdup_printf("shm_ring shm_ring.mc failed '%s': %s", shm_ring->shm_control_name, strerror(errno));
g_debug("%s", msg);
if (*errmsg) {
*errmsg = msg;
return NULL;
}
exit(1);
}
shm_ring->mc->write_offset = 0;
shm_ring->mc->read_offset = 0;
shm_ring->mc->eof_flag = FALSE;
shm_ring->mc->pids[0] = getpid();;
g_snprintf(shm_ring->mc->sem_write_name,
sizeof(shm_ring->mc->sem_write_name),
SEM_WRITE_NAME, (int)getpid(), get_next_shm_ring_id());
g_snprintf(shm_ring->mc->sem_read_name,
sizeof(shm_ring->mc->sem_read_name),
SEM_READ_NAME, (int)getpid(), get_next_shm_ring_id());
g_snprintf(shm_ring->mc->sem_ready_name,
sizeof(shm_ring->mc->sem_ready_name),
SEM_READY_NAME, (int)getpid(), get_next_shm_ring_id());
g_snprintf(shm_ring->mc->sem_start_name,
sizeof(shm_ring->mc->sem_start_name),
SEM_START_NAME, (int)getpid(), get_next_shm_ring_id());
g_snprintf(shm_ring->mc->shm_data_name,
sizeof(shm_ring->mc->shm_data_name),
SHM_DATA_NAME, (int)getpid(), get_next_shm_ring_id());
shm_unlink(shm_ring->mc->shm_data_name);
shm_ring->shm_data = shm_open(shm_ring->mc->shm_data_name,
O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
if (shm_ring->shm_data == -1) {
msg = g_strdup_printf("shm_data failed '%s': %s", shm_ring->mc->shm_data_name,strerror(errno));
g_debug("%s", msg);
if (*errmsg) {
*errmsg = msg;
return NULL;
}
exit(1);
}
sem_unlink(shm_ring->mc->sem_write_name);
shm_ring->sem_write = am_sem_create(shm_ring->mc->sem_write_name);
sem_unlink(shm_ring->mc->sem_read_name);
shm_ring->sem_read = am_sem_create(shm_ring->mc->sem_read_name);
sem_unlink(shm_ring->mc->sem_ready_name);
shm_ring->sem_ready = am_sem_create(shm_ring->mc->sem_ready_name);
sem_unlink(shm_ring->mc->sem_start_name);
shm_ring->sem_start = am_sem_create(shm_ring->mc->sem_start_name);
g_debug("shm_data: %s", shm_ring->mc->shm_data_name);
g_debug("sem_write: %s", shm_ring->mc->sem_write_name);
g_debug("sem_read: %s", shm_ring->mc->sem_read_name);
g_debug("sem_ready: %s", shm_ring->mc->sem_ready_name);
g_debug("sem_start: %s", shm_ring->mc->sem_start_name);
return shm_ring;
}
void
shm_ring_consumer_set_size(
shm_ring_t *shm_ring,
ssize_t ring_size, /* shm_ring desired size */
ssize_t block_size)
{
g_debug("shm_ring_consumer_set_size");
shm_ring->ring_size = ring_size;
shm_ring->block_size = block_size;
shm_ring->mc->consumer_ring_size = ring_size;
shm_ring->mc->consumer_block_size = block_size;
sem_post(shm_ring->sem_write);
if (shm_ring_sem_wait(shm_ring, shm_ring->sem_read) == -1) {
g_debug("shm_ring_consumer_set_size: fail shm_ring_sem_wait");
return;
}
if (shm_ring->mc->cancelled) {
g_debug("shm_ring_consumer_set_size: cancelled");
return;
}
if (shm_ring->mc->ring_size == 0) {
g_debug("shm_ring_consumer_set_size: ring_size == 0");
shm_ring->mc->cancelled = TRUE;
sem_post(shm_ring->sem_read);
sem_post(shm_ring->sem_write);
sem_post(shm_ring->sem_ready);
sem_post(shm_ring->sem_start);
return;
}
shm_ring->ring_size = shm_ring->mc->ring_size;
shm_ring->shm_data_mmap_size = shm_ring->mc->ring_size;
shm_ring->data = mmap(NULL, shm_ring->shm_data_mmap_size,
PROT_READ|PROT_WRITE, MAP_SHARED,
shm_ring->shm_data, 0);
if (shm_ring->data == MAP_FAILED) {
g_debug("shm_ring shm_ring->data failed (%lld): %s", (long long)shm_ring->shm_data_mmap_size, strerror(errno));
g_debug("shm_ring->ring_size %lld", (long long)shm_ring->ring_size);
g_debug("shm_ring->block_size %lld", (long long)shm_ring->block_size);
g_debug("shm_ring->mc->consumer_ring_size %lld", (long long)shm_ring->mc->consumer_ring_size);
g_debug("shm_ring->mc->producer_ring_size %lld", (long long)shm_ring->mc->producer_ring_size);
g_debug("shm_ring->mc->consumer_block_size %lld", (long long)shm_ring->mc->consumer_block_size);
g_debug("shm_ring->mc->producer_block_size %lld", (long long)shm_ring->mc->producer_block_size);
g_debug("shm_ring->mc->ring_size %lld", (long long)shm_ring->mc->ring_size);
exit(1);
}
}
shm_ring_t *
shm_ring_link(
char *name)
{
shm_ring_t *shm_ring = g_new0(shm_ring_t, 1);
int i;
g_debug("shm_ring_link %s", name);
shm_ring->shm_control_name = g_strdup(name);
shm_ring->shm_control = shm_open(shm_ring->shm_control_name, O_RDWR, S_IRUSR | S_IWUSR);
if (shm_ring->shm_control == -1) {
g_debug("shm_control failed '%s': %s", shm_ring->shm_control_name, strerror(errno));
exit(1);
}
shm_ring->mc = mmap(NULL, sizeof(shm_ring_control_t), PROT_READ|PROT_WRITE, MAP_SHARED,
shm_ring->shm_control, 0);
if (shm_ring->mc == MAP_FAILED) {
g_debug("shm_ring shm_ring.mc failed '%s': %s", shm_ring->shm_control_name, strerror(errno));
exit(1);
}
shm_ring->shm_data = shm_open(shm_ring->mc->shm_data_name, O_RDWR, S_IRUSR | S_IWUSR);
if (shm_ring->shm_data == -1) {
g_debug("shm_data failed '%s': %s", shm_ring->mc->shm_data_name, strerror(errno));
exit(1);
}
shm_ring->shm_data_mmap_size = 0;
shm_ring->sem_write = am_sem_open(shm_ring->mc->sem_write_name);
shm_ring->sem_read = am_sem_open(shm_ring->mc->sem_read_name);
shm_ring->sem_ready = am_sem_open(shm_ring->mc->sem_ready_name);
shm_ring->sem_start = am_sem_open(shm_ring->mc->sem_start_name);
for (i=1; i < SHM_RING_MAX_PID; i++) {
if (shm_ring->mc->pids[i] == 0) {
shm_ring->mc->pids[i] = getpid();
break;
}
}
return shm_ring;
}
void
close_consumer_shm_ring(
shm_ring_t *shm_ring)
{
g_debug("close_consumer_shm_ring sem_close(sem_write %p", shm_ring->sem_write);
am_sem_close(shm_ring->sem_write);
am_sem_close(shm_ring->sem_read);
am_sem_close(shm_ring->sem_ready);
am_sem_close(shm_ring->sem_start);
if (sem_unlink(shm_ring->mc->sem_write_name) == -1 && errno != ENOENT) {
g_debug("sem_unlink(sem_write_name) failed: %s", strerror(errno));
exit(1);
}
if (sem_unlink(shm_ring->mc->sem_read_name) == -1 && errno != ENOENT) {
g_debug("sem_unlink(sem_read_name) failed: %s", strerror(errno));
exit(1);
}
if (sem_unlink(shm_ring->mc->sem_ready_name) == -1 && errno != ENOENT) {
g_debug("sem_unlink(sem_ready_name) failed: %s", strerror(errno));
exit(1);
}
if (sem_unlink(shm_ring->mc->sem_start_name) == -1 && errno != ENOENT) {
g_debug("sem_unlink(sem_start_name) failed: %s", strerror(errno));
exit(1);
}
if (shm_ring->shm_data_mmap_size > 0 && shm_ring->data) {
if (munmap(shm_ring->data, shm_ring->shm_data_mmap_size) == -1) {
g_debug("munmap(data) failed: %s", strerror(errno));
exit(1);
}
}
if (shm_unlink(shm_ring->mc->shm_data_name) == -1 && errno != ENOENT) {
g_debug("shm_unlink(shm_ring_data_name) failed: %s", strerror(errno));
exit(1);
}
if (munmap(shm_ring->mc, sizeof(shm_ring_control_t)) == -1) {
g_debug("munmap(mc) failed: %s", strerror(errno));
exit(1);
}
if (shm_unlink(shm_ring->shm_control_name) == -1 && errno != ENOENT) {
g_debug("shm_unlink(shm_ring_control_name) failed: %s", strerror(errno));
exit(1);
}
aclose(shm_ring->shm_data);
aclose(shm_ring->shm_control);
g_free(shm_ring->shm_control_name);
g_free(shm_ring);
}