/* queue.c --
* Copyright 2007,2013,2015,2018 Red Hat Inc., Durham, North Carolina.
* All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Authors:
* Steve Grubb <sgrubb@redhat.com>
*/
#include "config.h"
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <syslog.h>
#include "queue.h"
static volatile event_t **q;
static pthread_mutex_t queue_lock;
static pthread_cond_t queue_nonempty;
static unsigned int q_next, q_last, q_depth, processing_suspended;
static unsigned int currently_used, max_used, overflowed;
static const char *SINGLE = "1";
static const char *HALT = "0";
static int queue_full_warning = 0;
extern volatile int disp_hup;
#define QUEUE_FULL_LIMIT 5
void reset_suspended(void)
{
processing_suspended = 0;
queue_full_warning = 0;
}
int init_queue(unsigned int size)
{
unsigned int i;
processing_suspended = 0;
q_next = 0;
q_last = 0;
currently_used = 0;
max_used = 0;
overflowed = 0;
q_depth = size;
q = malloc(q_depth * sizeof(event_t *));
if (q == NULL)
return -1;
for (i=0; i < q_depth; i++)
q[i] = NULL;
/* Setup IPC mechanisms */
pthread_mutex_init(&queue_lock, NULL);
pthread_cond_init(&queue_nonempty, NULL);
return 0;
}
static void change_runlevel(const char *level)
{
char *argv[3];
int pid;
static const char *init_pgm = "/sbin/init";
pid = fork();
if (pid < 0) {
syslog(LOG_ALERT, "Audispd failed to fork switching runlevels");
return;
}
if (pid) /* Parent */
return;
/* Child */
argv[0] = (char *)init_pgm;
argv[1] = (char *)level;
argv[2] = NULL;
execve(init_pgm, argv, NULL);
syslog(LOG_ALERT, "Audispd failed to exec %s", init_pgm);
exit(1);
}
static void do_overflow_action(struct disp_conf *config)
{
overflowed = 1;
switch (config->overflow_action)
{
case O_IGNORE:
break;
case O_SYSLOG:
if (queue_full_warning < QUEUE_FULL_LIMIT) {
syslog(LOG_ERR,
"queue to plugins is full - dropping event");
queue_full_warning++;
if (queue_full_warning == QUEUE_FULL_LIMIT)
syslog(LOG_ERR,
"auditd queue full reporting "
"limit reached - ending "
"dropped event notifications");
}
break;
case O_SUSPEND:
syslog(LOG_ALERT,
"Auditd is suspending event passing to plugins due to overflowing its queue.");
processing_suspended = 1;
break;
case O_SINGLE:
syslog(LOG_ALERT,
"Auditd is now changing the system to single user mode due to overflowing its queue");
change_runlevel(SINGLE);
break;
case O_HALT:
syslog(LOG_ALERT,
"Auditd is now halting the system due to overflowing its queue");
change_runlevel(HALT);
break;
default:
syslog(LOG_ALERT, "Unknown overflow action requested");
break;
}
}
/* returns 0 on success and -1 on error */
int enqueue(event_t *e, struct disp_conf *config)
{
unsigned int n, retry_cnt = 0;
if (processing_suspended) {
free(e);
return 0;
}
retry:
// We allow 3 retries and then its over
if (retry_cnt > 3) {
do_overflow_action(config);
free(e);
return -1;
}
pthread_mutex_lock(&queue_lock);
// OK, have lock add event
n = q_next%q_depth;
if (q[n] == NULL) {
q[n] = e;
q_next = (n+1) % q_depth;
currently_used++;
if (currently_used > max_used)
max_used = currently_used;
pthread_cond_signal(&queue_nonempty);
pthread_mutex_unlock(&queue_lock);
} else {
pthread_mutex_unlock(&queue_lock);
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 2 * 1000 * 1000; // 2 milliseconds
nanosleep(&ts, NULL); /* Let other thread try to log it. */
retry_cnt++;
goto retry;
}
return 0;
}
event_t *dequeue(void)
{
event_t *e;
unsigned int n;
// Wait until its got something in it
pthread_mutex_lock(&queue_lock);
if (disp_hup) {
pthread_mutex_unlock(&queue_lock);
return NULL;
}
n = q_last%q_depth;
if (q[n] == NULL) {
pthread_cond_wait(&queue_nonempty, &queue_lock);
if (disp_hup) {
pthread_mutex_unlock(&queue_lock);
return NULL;
}
n = q_last%q_depth;
}
// OK, grab the next event
if (q[n] != NULL) {
e = (event_t *)q[n];
q[n] = NULL;
q_last = (n+1) % q_depth;
} else
e = NULL;
currently_used--;
pthread_mutex_unlock(&queue_lock);
// Process the event
return e;
}
void nudge_queue(void)
{
pthread_cond_signal(&queue_nonempty);
}
void increase_queue_depth(unsigned int size)
{
pthread_mutex_lock(&queue_lock);
if (size > q_depth) {
unsigned int i;
void *tmp_q;
tmp_q = realloc(q, size * sizeof(event_t *));
q = tmp_q;
for (i=q_depth; i<size; i++)
q[i] = NULL;
q_depth = size;
}
pthread_mutex_unlock(&queue_lock);
}
void write_queue_state(FILE *f)
{
fprintf(f, "current plugin queue depth = %u\n", currently_used);
fprintf(f, "max plugin queue depth used = %u\n", max_used);
fprintf(f, "plugin queue size = %u\n", q_depth);
fprintf(f, "plugin queue overflow detected = %s\n",
overflowed ? "yes" : "no");
fprintf(f, "plugin queueing suspended = %s\n",
processing_suspended ? "yes" : "no");
}
void resume_queue(void)
{
processing_suspended = 0;
}
void destroy_queue(void)
{
unsigned int i;
for (i=0; i<q_depth; i++)
free((void *)q[i]);
free(q);
}