/*
* ModSecurity for Apache 2.x, http://www.modsecurity.org/
* Copyright (c) 2004-2013 Trustwave Holdings, Inc. (http://www.trustwave.com/)
*
* You may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* If any of the files related to licensing are missing or if you have any
* other questions related to licensing please contact Trustwave Holdings, Inc.
* directly using the email address security@modsecurity.org.
*/
#include <apr.h>
#include <apr_errno.h>
#include <apr_general.h>
#include <apr_file_io.h>
#include <apr_file_info.h>
#include <apr_hash.h>
#include <apr_lib.h>
#include <apr_strings.h>
#include <apr_signal.h>
#include <apr_thread_proc.h>
#include <apr_global_mutex.h>
#include <apr_getopt.h>
#include <apr_version.h>
#if APR_HAVE_UNISTD_H
#include <unistd.h> /* for getpid() */
#endif
#include <pcre.h>
#include <curl/curl.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "msc_release.h"
static void logc_shutdown(int rc);
static void create_new_worker(int lock);
static void error_log(int level, void *thread,
const char *text, ...) PRINTF_ATTRIBUTE(3,4);
/* -- Constants -- */
/* Error log levels. */
#define LOG_ERROR 1
#define LOG_WARNING 2
#define LOG_NOTICE 3
#define LOG_DEBUG 4
#define LOG_DEBUG2 5
/* The management thread will wake up every five seconds. */
#define MANAGER_SLEEP 5000000
#define MANAGER_SUBSLEEP 10000
/* Hack to allow multiple mlogc with single delete */
#define KEEP_ENTRIES_REMOVE_HACK 2600
#define KEEP_ENTRIES_REMOVE_TIME 0l
#ifdef TEST_HACK
#define TEST_WITH_RAND_SLEEP(n) \
do { \
int sec = rand()/(RAND_MAX/n); \
error_log(LOG_DEBUG2, NULL, "TEST_HACK: Sleeping for %ds", sec); \
apr_sleep(apr_time_from_sec(sec)); \
} while(0)
#else
#define TEST_WITH_RAND_SLEEP(n)
#endif
#define CAPTUREVECTORSIZE 60
#define PIPE_BUF_SIZE 65536
#define MEMALLOC_ERROR_MSG "Memory allocation failed!"
#define VERSION MODSEC_VERSION
#define CMDLINE_OPTS "fvh"
#define TXIN 0
#define TXOUT 1
#define STATUSBUF_SIZE 256
#define ISHEXCHAR(X) ( ((X >= '0')&&(X <= '9')) \
|| ((X >= 'a')&&(X <= 'f')) \
|| ((X >= 'A')&&(X <= 'F')) )
/* -- Regex Patterns -- */
/**
* This regular expression is used to parse the entire
* log line we receive from Apache. The REQUEST_LINE is
* treated as a single parameter to allow for invalid
* requests.
*/
static const char logline_pattern[] =
"^(\\S+)"
"\\ (\\S+)\\ (\\S+)\\ (\\S+)"
"\\ \\[([^:]+):(\\d+:\\d+:\\d+)\\ ([^\\]]+)\\]"
"\\ \"(.*)\""
"\\ (\\d+)\\ (\\S+)"
"\\ \"(.*)\"\\ \"(.*)\""
"\\ (\\S+)\\ \"(.*)\""
"\\ /?(\\S+)\\ (\\d+)\\ (\\d+)"
"\\ (\\S+)"
"(.*)$";
/**
* This regular expression can be used to parse
* a REQUEST_LINE field into method, URI, and
* protocol.
*/
static const char requestline_pattern[] =
"(\\S+)\\ (.*?)\\ (\\S+)";
/* -- Structures -- */
typedef struct {
unsigned long int id;
const char *line;
apr_size_t line_size;
} entry_t;
/* -- Global variables -- */
static pid_t logc_pid = 0;
static const char *conffile = NULL;
static const char *lockfile = NULL;
static int have_read_data = 0;
static int checkpoint_interval = 60;
static apr_time_t checkpoint_time_last = 0;
static const char *collector_root = NULL;
static apr_table_t *conf = NULL;
static const char *console_uri = NULL;
static apr_array_header_t *curl_handles = NULL;
static int current_workers = 0;
static int management_thread_active = 0;
static unsigned long int entry_counter = 1;
static const char *error_log_path = NULL;
static apr_file_t *error_log_fd = NULL;
static int error_log_level = 2;
static apr_hash_t *in_progress = NULL;
static int keep_alive = 150; /* Not used yet. */
static int keep_alive_timeout = 300; /* Not used yet. */
static int keep_entries = 0;
static const char *log_repository = NULL;
static void *logline_regex = NULL;
static int max_connections = 10;
static int max_worker_requests = 1000;
static apr_global_mutex_t *gmutex = NULL;
static apr_thread_mutex_t *mutex = NULL;
static apr_pool_t *pool = NULL;
static apr_pool_t *thread_pool = NULL;
static apr_pool_t *recv_pool = NULL;
static apr_array_header_t *queue = NULL;
static const char *queue_path = NULL;
static int ssl_validation = 0;
static int tlsprotocol = 1;
static curl_version_info_data* curlversion = NULL;
/* static apr_time_t queue_time = 0; */
static void *requestline_regex = NULL;
static int running = 0;
static const char *sensor_password = NULL;
static const char *sensor_username = NULL;
static int server_error = 0;
static apr_time_t server_error_last_check_time = 0;
static int server_error_timeout = 60;
static int startup_delay = 100;
static int transaction_delay = 100;
static const char *transaction_log_path = NULL;
static apr_file_t *transaction_log_fd = NULL;
/* -- Commandline opts -- */
static int opt_force = 0;
/* -- Code -- */
static char *_log_escape(apr_pool_t *mp, const char *input,
apr_size_t input_len)
{
static const char c2x_table[] = "0123456789abcdef";
unsigned char *d = NULL;
char *ret = NULL;
unsigned long int i;
if (input == NULL) return NULL;
ret = apr_palloc(mp, input_len * 4 + 1);
if (ret == NULL) return NULL;
d = (unsigned char *)ret;
i = 0;
while(i < input_len) {
switch(input[i]) {
case '"' :
*d++ = '\\';
*d++ = '"';
break;
case '\b' :
*d++ = '\\';
*d++ = 'b';
break;
case '\n' :
*d++ = '\\';
*d++ = 'n';
break;
case '\r' :
*d++ = '\\';
*d++ = 'r';
break;
case '\t' :
*d++ = '\\';
*d++ = 't';
break;
case '\v' :
*d++ = '\\';
*d++ = 'v';
break;
case '\\' :
*d++ = '\\';
*d++ = '\\';
break;
default :
if ((input[i] <= 0x1f)||(input[i] >= 0x7f)) {
*d++ = '\\';
*d++ = 'x';
*d++ = c2x_table[input[i] >> 4];
*d++ = c2x_table[input[i] & 0x0f];
} else {
*d++ = input[i];
}
break;
}
i++;
}
*d = 0;
return ret;
}
/**
* Converts a byte given as its hexadecimal representation
* into a proper byte. Handles uppercase and lowercase letters
* but does not check for overflows.
*/
static unsigned char x2c(unsigned char *what) {
register unsigned char digit;
digit = (what[0] >= 'A' ? ((what[0] & 0xdf) - 'A') + 10 : (what[0] - '0'));
digit *= 16;
digit += (what[1] >= 'A' ? ((what[1] & 0xdf) - 'A') + 10 : (what[1] - '0'));
return digit;
}
/**
* URL Decodes a string in-place
*/
static int urldecode_inplace(unsigned char *input, apr_size_t input_len) {
unsigned char *d = (unsigned char *)input;
apr_size_t i;
if (input == NULL) return 0;
i = 0;
while (i < input_len) {
if (input[i] == '%') {
/* Character is a percent sign. */
/* Are there enough bytes available? */
if (i + 2 < input_len) {
char c1 = input[i + 1];
char c2 = input[i + 2];
if (ISHEXCHAR(c1) && ISHEXCHAR(c2)) {
/* Valid encoding - decode it. */
*d++ = x2c(&input[i + 1]);
i += 3;
} else {
/* Not a valid encoding, skip this % */
*d++ = input[i++];
}
} else {
/* Not enough bytes available, copy the raw bytes. */
*d++ = input[i++];
}
} else {
/* Character is not a percent sign. */
if (input[i] == '+') {
*d++ = ' ';
} else {
*d++ = input[i];
}
i++;
}
}
*d = '\0';
return 1;
}
/**
* Detect a relative path and merge it with the collector root
* path. Leave absolute paths as they are.
*/
static const char *file_path(const char *path)
{
char *newpath = NULL;
apr_status_t rc;
if (path == NULL) return NULL;
rc = apr_filepath_merge(&newpath, collector_root,
path, APR_FILEPATH_TRUENAME, pool);
if ((newpath != NULL) && (rc == APR_SUCCESS || APR_STATUS_IS_EPATHWILD(rc)
|| APR_STATUS_IS_ENOENT(rc) || APR_STATUS_IS_ENOTDIR(rc)))
{
return newpath;
}
else {
return NULL;
}
}
/**
* Returns the current datetime as a string.
*/
static char *current_logtime(char *dest, int dlen)
{
apr_time_exp_t t;
apr_size_t len;
apr_time_exp_lt(&t, apr_time_now());
apr_strftime(dest, &len, dlen, "%a %b %d %H:%M:%S %Y", &t);
return dest;
}
/**
* Logs error to the error log (if available) or
* to the stderr.
*/
static void error_log(int level, void *thread, const char *text, ...)
{
char msg1[4096] = "";
char msg2[4096] = "";
char datetime[100];
va_list ap;
if (level > error_log_level) return;
va_start(ap, text);
apr_vsnprintf(msg1, sizeof(msg1), text, ap);
apr_snprintf(msg2, sizeof(msg2), "[%s] [%d] [%" APR_PID_T_FMT "/%pp] %s\n",
current_logtime(datetime, sizeof(datetime)),
level, logc_pid, (thread ? thread : 0), msg1);
if (error_log_fd != NULL) {
apr_size_t nbytes_written;
apr_size_t nbytes = strlen(msg2);
apr_file_write_full(error_log_fd, msg2, nbytes, &nbytes_written);
}
else {
fprintf(stderr, "%s", msg2);
}
va_end(ap);
}
/**
* Adds one entry to the internal queue. It will (optionally) start
* a new thread to handle it.
*/
static void add_entry(const char *data, int start_worker)
{
entry_t *entry = NULL;
entry = (entry_t *)malloc(sizeof(entry_t));
entry->id = 0;
entry->line = strdup(data);
entry->line_size = strlen(entry->line);
error_log(LOG_DEBUG, NULL, "Queue locking thread mutex.");
if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
error_log(LOG_DEBUG, NULL, "Queue waiting on thread mutex.");
apr_thread_mutex_lock(mutex);
}
/* Assign unique ID to this log entry. */
entry->id = entry_counter++;
/* Add the new audit log entry to the queue. */
*(entry_t **)apr_array_push(queue) = entry;
/* Create a new worker if we can, but not if there is a
* known problem with the server.
*/
if ( (start_worker != 0)
&& (current_workers < max_connections)&&(server_error == 0))
{
create_new_worker(0);
}
error_log(LOG_DEBUG, NULL, "Queue unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
}
/**
* Read the queue entries.
*/
static int read_queue_entries(apr_file_t *fd, apr_time_t *queue_time)
{
char linebuf[4100];
int line_count = -1;
int line_size = 0;
apr_status_t rc = 0;
char *p = NULL;
for(;;) {
memset(linebuf, 0, 4100);
rc = apr_file_gets(linebuf, 4096, fd);
if (rc == APR_EOF) break;
if (rc != APR_SUCCESS) {
error_log(LOG_ERROR, NULL, "Error reading from the queue file.");
logc_shutdown(1);
}
if (line_count < 0) {
/* First line contains the queue time. */
*queue_time = (apr_time_t)apr_atoi64(linebuf);
line_count = 0;
continue;
}
p = &linebuf[0];
line_size = strlen(p);
/* Remove the \n from the end of the line. */
while(*p != '\0' && line_size > 0) {
if (*p == '\n') {
*p = '\0';
break;
}
p++;
line_size--;
}
if (linebuf[0] == '#') { /* Ignore comments. */
continue;
}
add_entry((const char *)&linebuf, 0);
line_count++;
}
apr_file_close(fd);
return line_count;
}
/**
* Initialise the transaction log. This code should be
* executed only once at startup.
*/
static void transaction_log_init(void)
{
/* ENH: These big enough? */
char new_queue_path[256];
char old_queue_path[256];
apr_file_t *queue_fd = NULL;
apr_time_t queue_time;
apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path);
apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path);
/* Put a lock in place to ensure exclusivity. */
error_log(LOG_DEBUG, NULL,
"Transaction initialization locking global mutex.");
if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) {
error_log(LOG_DEBUG, NULL,
"Transaction initialization waiting on global mutex.");
apr_global_mutex_lock(gmutex);
}
error_log(LOG_DEBUG, NULL, "Transaction initialization started.");
/* Delete .new file if there is one. */
apr_file_remove(new_queue_path, pool);
/* Read in the data from the queue. */
if (apr_file_open(&queue_fd, queue_path, APR_READ | APR_FILE_NOCLEANUP,
0, pool) == APR_SUCCESS)
{
int line_count = read_queue_entries(queue_fd, &queue_time);
apr_file_close(queue_fd);
if (line_count > 0) {
error_log(LOG_NOTICE, NULL,
"Loaded %d entries from the queue file.", line_count);
}
}
/* Try the old queue file. */
else if (apr_file_open(&queue_fd, old_queue_path,
APR_READ | APR_FILE_NOCLEANUP,
0, pool) == APR_SUCCESS)
{
int line_count = read_queue_entries(queue_fd, &queue_time);
apr_file_close(queue_fd);
error_log(LOG_NOTICE, NULL,
"Loaded %d entries from the OLD queue file.", line_count);
apr_file_rename(old_queue_path, queue_path, pool);
}
else {
error_log(LOG_NOTICE, NULL,
"Queue file not found. New one will be created.");
}
/* Delete the old queue file. */
apr_file_remove(old_queue_path, pool);
checkpoint_time_last = apr_time_now();
/* Start fresh with the transaction log. Do note that
* we do not truncate the transaction log on purpose. Apache
* will start copies of piped logging binaries during configuration
* testing. Truncating would erase the log of a currently running
* instance.
*/
if (apr_file_open(&transaction_log_fd, transaction_log_path,
APR_WRITE | APR_CREATE | APR_APPEND | APR_XTHREAD,
APR_OS_DEFAULT, pool) != APR_SUCCESS)
{
error_log(LOG_ERROR, NULL,
"Failed to open the transaction log: %s\n",
transaction_log_path);
error_log(LOG_DEBUG, NULL,
"Transaction initialization unlocking global mutex.");
apr_global_mutex_unlock(gmutex);
logc_shutdown(1);
}
error_log(LOG_DEBUG, NULL, "Transaction initialization completed.");
/* Unlock */
error_log(LOG_DEBUG, NULL,
"Transaction initialization unlocking global mutex.");
apr_global_mutex_unlock(gmutex);
}
/**
* Log entry event (incoming or outgoing) to the transaction log.
*/
static void transaction_log(int direction, const char *entry)
{
apr_size_t nbytes, nbytes_written;
char msg[8196] = "";
apr_snprintf(msg, sizeof(msg), "%u %s: %s\n",
(unsigned int)apr_time_sec(apr_time_now()),
(direction == TXIN ? "IN" : "OUT"), entry);
nbytes = strlen(msg);
apr_file_write_full(transaction_log_fd, msg, nbytes, &nbytes_written);
}
/**
* Executes a checkpoint, which causes the current queue to be
* written to a file and the transaction log to be truncated.
*/
static void transaction_checkpoint(void)
{
/* ENH: These big enough? */
char new_queue_path[256];
char old_queue_path[256];
apr_file_t *queue_fd = NULL;
apr_hash_index_t *hi = NULL;
char msg[256];
int i;
apr_pool_t *cpool;
apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path);
apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path);
apr_snprintf(msg, sizeof(msg), "%u\n",
(unsigned int)apr_time_sec(apr_time_now()));
if (! have_read_data) {
error_log(LOG_DEBUG, NULL, "Checkpoint not required.");
return;
}
/* Put a lock in place to ensure exclusivity. */
error_log(LOG_DEBUG, NULL, "Checkpoint locking global mutex.");
if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) {
error_log(LOG_DEBUG, NULL, "Checkpoint waiting on global mutex.");
apr_global_mutex_lock(gmutex);
}
error_log(LOG_DEBUG, NULL, "Checkpoint started.");
apr_pool_create(&cpool, NULL);
/* Dump active entries into a new queue file. */
if (apr_file_open(&queue_fd, new_queue_path,
APR_WRITE | APR_CREATE | APR_EXCL |
APR_TRUNCATE | APR_FILE_NOCLEANUP,
APR_OS_DEFAULT, cpool) != APR_SUCCESS)
{
error_log(LOG_ERROR, NULL, "Failed to create file: %s", new_queue_path);
error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex.");
apr_pool_destroy(cpool);
apr_global_mutex_unlock(gmutex);
return;
}
/* Write the time first. */
apr_file_write_full(queue_fd, msg, strlen(msg), NULL);
/* Dump the entries sitting in the queue first. */
for (i = 0; i < queue->nelts; i++) {
entry_t *entry = ((entry_t **)queue->elts)[i];
apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL);
apr_file_write_full(queue_fd, &"\n", 1, NULL);
}
error_log(LOG_DEBUG2, NULL,
"Checkpoint wrote %d queued entries to new queue.", i);
/* Then dump the ones that are currently being processed. */
i = 0;
for (hi = apr_hash_first(NULL, in_progress);
hi != NULL; hi = apr_hash_next(hi))\
{
void *e;
entry_t *entry = NULL;
i++;
apr_hash_this(hi, NULL, NULL, &e);
entry = e; /* quiet type-punned warning */
apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL);
apr_file_write_full(queue_fd, &"\n", 1, NULL);
}
error_log(LOG_DEBUG2, NULL,
"Checkpoint wrote %d additional entries to new queue.", i);
apr_file_close(queue_fd);
/* Switch the files and truncate the transaction log file. */
apr_file_remove(old_queue_path, cpool);
apr_file_rename(queue_path, old_queue_path, cpool);
apr_file_rename(new_queue_path, queue_path, cpool);
apr_file_remove(old_queue_path, cpool);
apr_file_trunc(transaction_log_fd, 0);
error_log(LOG_DEBUG, NULL, "Checkpoint completed.");
apr_pool_destroy(cpool);
/* Unlock and exit. */
error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex.");
apr_global_mutex_unlock(gmutex);
}
/**
* Parse one confguration line and add it to the
* configuration table.
*/
static void parse_configuration_line(const char *line, int line_count)
{
char *start = NULL, *command = NULL;
char *p = NULL;
/* Remove the trailing newline character. */
p = (char *)line;
while(*p != '\0') p++;
if ((p > start)&&(*(p - 1) == '\n')) *(p - 1) = '\0';
p = (char *)line;
/* Ignore whitespace at the beginning of the line. */
while(apr_isspace(*p)) p++;
/* Ignore empty lines and comments. */
if ((*p == '\0')||(*p == '#')) return;
start = p;
while(!apr_isspace(*p)&&(*p != '\0')) p++;
command = apr_pstrmemdup(pool, start, p - start);
while(apr_isspace(*p)) p++;
/* Remove whitespace at the end. */
start = p;
while(*p != '\0') p++;
if (p > start) {
p--;
while(apr_isspace(*p)) {
*p-- = '\0';
}
}
/* Remove quotes, but only if we have matching */
if ((*start == '"') && (p > start) && (*p == '"')) {
start++;
*p-- = '\0';
}
/* Take the last directive */
/* ENH: Error on dup directives? */
apr_table_set(conf, command, start);
}
/**
* Reads configuration from a file.
*/
static void read_configuration(void)
{
char linebuf[4096];
apr_status_t rc;
apr_file_t *fd;
int line_count;
conf = apr_table_make(pool, 32);
if (conf == NULL) {
error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
logc_shutdown(1);
}
rc = apr_file_open(&fd, conffile, APR_READ | APR_FILE_NOCLEANUP, 0, pool);
if (rc != APR_SUCCESS) {
error_log(LOG_ERROR, NULL,
"Unable to open configuration file: %s", conffile);
logc_shutdown(1);
}
line_count = 0;
for(;;) {
rc = apr_file_gets(linebuf, 4096, fd);
if (rc == APR_EOF) return;
if (rc != APR_SUCCESS) {
error_log(LOG_ERROR, NULL,
"Error reading from the configuration file.");
logc_shutdown(1);
}
line_count++;
parse_configuration_line(linebuf, line_count);
}
apr_file_close(fd);
}
/**
* Initialize the configuration.
*/
static void init_configuration(void)
{
char errstr[1024];
apr_status_t rc = 0;
const char *s = NULL;
/* Other values may be based off the collector root. */
s = apr_table_get(conf, "CollectorRoot");
if (s != NULL) {
collector_root = s;
}
/* Error Log */
s = apr_table_get(conf, "ErrorLog");
if (s != NULL) {
error_log_path = file_path(s);
}
s = apr_table_get(conf, "ErrorLogLevel");
if (s != NULL) {
error_log_level = atoi(s);
}
if ((rc = apr_file_open(&error_log_fd, error_log_path,
APR_WRITE | APR_CREATE | APR_APPEND,
APR_OS_DEFAULT, pool)) != APR_SUCCESS)
{
error_log(LOG_ERROR, NULL, "Failed to open the error log %s: %s\n",
error_log_path, apr_strerror(rc, errstr, 1024));
logc_shutdown(1);
}
error_log(LOG_NOTICE, NULL,
"Configuring ModSecurity Audit Log Collector %s.", VERSION);
/* Startup Delay */
s = apr_table_get(conf, "StartupDelay");
if (s != NULL) {
startup_delay = atoi(s);
}
/* TLS Protocol - TLSv1(0) TLSv1.1(1) TLSv1.2(2) (SSLv3 not supported) */
s = apr_table_get(conf, "TLSProtocol");
if (s != NULL) {
int num = atoi(s);
switch (num) {
case 0:
tlsprotocol = 0;
break;
case 1:
tlsprotocol = 1;
break;
case 2:
tlsprotocol = 2;
break;
default:
tlsprotocol = 2; /* Default is TLSv1.2 */
}
}
curlversion = curl_version_info(CURLVERSION_NOW);
if ( startup_delay > 0 ) {
error_log(LOG_NOTICE, NULL,
"Delaying execution for %dms.", startup_delay);
apr_sleep(startup_delay * 1000);
error_log(LOG_DEBUG, NULL,
"Continuing execution after %dms delay.", startup_delay);
}
/* Remaining Configuration */
error_log(LOG_DEBUG2, NULL, "CollectorRoot=%s", collector_root);
error_log(LOG_DEBUG2, NULL, "ErrorLog=%s", error_log_path);
error_log(LOG_DEBUG2, NULL, "ErrorLogLevel=%d", error_log_level);
error_log(LOG_DEBUG2, NULL, "StartupDelay=%d", startup_delay);
error_log(LOG_DEBUG2, NULL, "TLSProtocol=%d", tlsprotocol);
error_log(LOG_DEBUG2, NULL, "cURL version=%s", curlversion->version);
s = apr_table_get(conf, "CheckpointInterval");
if (s != NULL) {
checkpoint_interval = atoi(s);
error_log(LOG_DEBUG2, NULL,
"CheckpointInterval=%d", checkpoint_interval);
}
s = apr_table_get(conf, "InsecureNoCheckCert");
if (s != NULL) {
int num = atoi(s);
if (num)
{
ssl_validation = 0;
}
else
{
ssl_validation = 1;
}
error_log(LOG_DEBUG2, NULL, "InsecureNoCheckCert=%d", num);
}
s = apr_table_get(conf, "QueuePath");
if (s != NULL) {
queue_path = file_path(s);
error_log(LOG_DEBUG2, NULL, "QueuePath=%s", queue_path);
}
else {
error_log(LOG_ERROR, NULL,
"QueuePath not defined in the configuration file.");
logc_shutdown(1);
}
s = apr_table_get(conf, "LockFile");
if (s != NULL) {
lockfile = file_path(s);
error_log(LOG_DEBUG2, NULL, "LockFile=%s", lockfile);
}
s = apr_table_get(conf, "ServerErrorTimeout");
if (s != NULL) {
server_error_timeout = atoi(s);
error_log(LOG_DEBUG2, NULL,
"ServerErrorTimeout=%d", server_error_timeout);
}
s = apr_table_get(conf, "TransactionDelay");
if (s != NULL) {
transaction_delay = atoi(s);
error_log(LOG_DEBUG2, NULL, "TransactionDelay=%d", transaction_delay);
}
s = apr_table_get(conf, "TransactionLog");
if (s != NULL) {
transaction_log_path = file_path(s);
error_log(LOG_DEBUG2, NULL, "TransactionLog=%s", transaction_log_path);
}
s = apr_table_get(conf, "MaxConnections");
if (s != NULL) {
int v = atoi(s);
if (v >= 0) max_connections = v;
error_log(LOG_DEBUG2, NULL, "MaxConnections=%d", max_connections);
}
s = apr_table_get(conf, "MaxWorkerRequests");
if (s != NULL) {
int v = atoi(s);
if (v >= 0) max_worker_requests = v;
error_log(LOG_DEBUG2, NULL,
"MaxWorkerRequests=%d", max_worker_requests);
}
s = apr_table_get(conf, "KeepAlive");
if (s != NULL) {
int v = atoi(s);
if (v >= 0) keep_alive = v;
error_log(LOG_DEBUG2, NULL, "KeepAlive=%d", keep_alive);
}
s = apr_table_get(conf, "KeepAliveTimeout");
if (s != NULL) {
int v = atoi(s);
if (v >= 0) keep_alive_timeout = v;
error_log(LOG_DEBUG2, NULL, "KeepAliveTimeout=%d", keep_alive_timeout);
}
s = apr_table_get(conf, "LogStorageDir");
if (s != NULL) {
log_repository = file_path(s);
error_log(LOG_DEBUG2, NULL, "LogStorageDir=%s", log_repository);
}
else {
error_log(LOG_ERROR, NULL,
"Missing mandatory parameter LogStorageDir.\n");
logc_shutdown(1);
}
s = apr_table_get(conf, "ConsoleURI");
if (s != NULL) {
console_uri = s;
error_log(LOG_DEBUG2, NULL, "ConsoleURI=%s", console_uri);
}
else {
error_log(LOG_ERROR, NULL, "Missing mandatory parameter ConsoleURI.\n");
logc_shutdown(1);
}
s = apr_table_get(conf, "SensorUsername");
if (s != NULL) {
sensor_username = s;
error_log(LOG_DEBUG2, NULL, "SensorUsername=%s", sensor_username);
}
else {
error_log(LOG_ERROR, NULL,
"Missing mandatory parameter SensorUsername.\n");
logc_shutdown(1);
}
s = apr_table_get(conf, "SensorPassword");
if (s != NULL) {
sensor_password = s;
error_log(LOG_DEBUG2, NULL, "SensorPassword=%s", sensor_password);
}
else {
error_log(LOG_ERROR, NULL,
"Missing mandatory parameter SensorPassword.\n");
logc_shutdown(1);
}
s = apr_table_get(conf, "KeepEntries");
if (s != NULL) {
keep_entries = atoi(s);
}
else {
keep_entries = 0;
}
error_log(LOG_DEBUG2, NULL, "KeepEntries=%d", keep_entries);
}
/**
* Clean-up resources before process shutdown.
*/
static void logc_cleanup(void)
{
curl_global_cleanup();
}
/**
* Shutdown the logger.
*/
static void logc_shutdown(int rc)
{
/* Tell the threads to shut down. */
running = 0;
error_log(LOG_DEBUG, NULL, "Shutting down");
/* Wait for the management thread to stop */
/* ENH: Need a fixed timeout if this never happens */
while(management_thread_active != 0) {
apr_sleep(10 * 1000);
}
if (rc == 0) {
error_log(LOG_NOTICE, NULL,
"ModSecurity Audit Log Collector %s terminating normally.",
VERSION);
}
else {
error_log(LOG_NOTICE, NULL,
"ModSecurity Audit Log Collector %s "
"terminating with error %d", VERSION, rc);
}
if (error_log_fd != NULL) {
apr_file_flush(error_log_fd);
}
exit(rc);
}
/**
* Handle signals.
*/
static int handle_signals(int signum)
{
switch (signum) {
case SIGINT:
error_log(LOG_NOTICE, NULL, "Caught SIGINT, shutting down.");
logc_shutdown(0);
case SIGTERM:
error_log(LOG_NOTICE, NULL, "Caught SIGTERM, shutting down.");
logc_shutdown(0);
#ifndef WIN32
case SIGHUP:
error_log(LOG_NOTICE, NULL, "Caught SIGHUP, ignored.");
/* ENH: reload config? */
return 0;
case SIGALRM:
error_log(LOG_DEBUG, NULL, "Caught SIGALRM, ignored.");
return 0;
case SIGTSTP:
error_log(LOG_DEBUG, NULL, "Caught SIGTSTP, ignored.");
return 0;
#endif /* WIN32 */
}
#ifndef WIN32
error_log(LOG_NOTICE, NULL,
"Caught unexpected signal %d: %s",
signum, apr_signal_description_get(signum));
#else
error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d", signum);
#endif /* WIN32 */
logc_shutdown(1);
return 0; /* should never reach */
}
#ifdef WIN32
/**
* This function is invoked by Curl to read the source file on Windows
*/
static size_t curl_readfunction(void *ptr, size_t size,
size_t nmemb, void *stream)
{
return fread(ptr, size, nmemb, (FILE *)stream);
}
#endif
/**
* This function is invoked by Curl to read the response
* body. Since we don't care about the response body the function
* pretends it is retrieving data where it isn't.
*/
static size_t curl_writefunction(void *ptr, size_t size,
size_t nmemb, void *stream)
{
unsigned char *data = (unsigned char *)ptr;
unsigned char *status = (unsigned char *)stream;
/* Grab the status line text from the first line of output */
if ((status[0] == 0) && (status[1] == 1)) {
apr_size_t i, j;
int ismsg = 0;
status[1] = 0; /* reset hidden init flag */
for (i = 0, j = 0; i < STATUSBUF_SIZE; i++) {
/* We found a line ending so we are done */
if ( data[i] == '\r' ) {
break;
}
/* Skip to after the first space (where msg is) */
if (ismsg < 3) {
if ((ismsg == 1) && !isspace(data[i])) {
ismsg++;
}
else if (isspace(data[i])) {
ismsg++;
}
continue;
}
/* Copy data (msg) from data to status */
status[j++] = data[i];
}
status[j] = '\0';
urldecode_inplace(status, j);
}
/* do nothing */
return (size * nmemb);
}
/**
* This function is invoked by Curl whenever it has something
* to say. We forward its messages to the error log at level
* DEBUG or DEBUG2 depending on the verbosity.
*/
static int curl_debugfunction(CURL *curl, curl_infotype infotype,
char *data, size_t datalen, void *ourdata)
{
apr_size_t i, effectivelen;
apr_thread_t *thread = (apr_thread_t *)ourdata;
if (error_log_level < LOG_DEBUG) return 0;
effectivelen = datalen;
for(i = 0; i < datalen; i++) {
if ((data[i] == 0x0a)||(data[i] == 0x0d)) {
effectivelen = i;
break;
}
}
switch (infotype) {
case CURLINFO_TEXT:
/* More verbose data starts with an indent */
if (apr_isspace(data[0])) {
char *dataptr = data + 1;
/* Skip initial whitespace (indent) */
while ( ((size_t)(dataptr - data) > datalen)
&& apr_isspace(*dataptr)) dataptr++;
dataptr++;
error_log(LOG_DEBUG2, thread, "CURL: %s",
_log_escape(apr_thread_pool_get(thread), dataptr,
effectivelen - (dataptr - data)));
}
else {
error_log(LOG_DEBUG, thread, "CURL: %s",
_log_escape(apr_thread_pool_get(thread), data,
effectivelen));
}
break;
case CURLINFO_HEADER_IN:
error_log(LOG_DEBUG, thread, "CURL: HEADER_IN %s",
_log_escape(apr_thread_pool_get(thread), data,
effectivelen));
break;
case CURLINFO_HEADER_OUT:
error_log(LOG_DEBUG, thread, "CURL: HEADER_OUT %s",
_log_escape(apr_thread_pool_get(thread), data,
effectivelen));
break;
case CURLINFO_DATA_IN:
error_log(LOG_DEBUG2, thread, "CURL: DATA_IN %s",
_log_escape(apr_thread_pool_get(thread), data,
effectivelen));
break;
case CURLINFO_DATA_OUT:
error_log(LOG_DEBUG2, thread, "CURL: DATA_OUT %s",
_log_escape(apr_thread_pool_get(thread), data,
effectivelen));
break;
default:
/* Ignore anything else */
break;
}
return 0;
}
/**
* Initialise the necessary resources and structures.
*/
static void logc_init(void)
{
char errstr[1024];
apr_status_t rc = 0;
const char *errptr = NULL;
int i, erroffset;
/* cURL major, minor and patch version */
short cmaj, cmin, cpat = 0;
queue = apr_array_make(pool, 64, sizeof(entry_t *));
if (queue == NULL) {
error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
logc_shutdown(1);
}
in_progress = apr_hash_make(pool);
if (in_progress == NULL) {
error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
logc_shutdown(1);
}
if ((rc = apr_global_mutex_create(&gmutex, lockfile,
APR_LOCK_DEFAULT, pool)) != APR_SUCCESS)
{
error_log(LOG_ERROR, NULL, "Failed to create global mutex: %s",
apr_strerror(rc, errstr, 1024));
logc_shutdown(1);
}
if ((rc = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED,
pool)) != APR_SUCCESS)
{
error_log(LOG_ERROR, NULL, "Failed to create thread mutex: %s",
apr_strerror(rc, errstr, 1024));
logc_shutdown(1);
}
entry_counter = 1;
curl_handles = apr_array_make(pool, max_connections, sizeof(CURL *));
if (curl_handles == NULL) {
error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
logc_shutdown(1);
}
/* Initialise a number of Curl handles. */
for(i = 0; i < max_connections; i++) {
CURL *curl = NULL;
/* Create cURL handle. */
curl = curl_easy_init();
/* Pre-configure the handle. */
curl_easy_setopt(curl, CURLOPT_UPLOAD, TRUE);
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, (char *)NULL);
curl_easy_setopt(curl, CURLOPT_URL, console_uri);
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
if (ssl_validation)
{
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 1);
}
else
{
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, FALSE);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0);
}
/* Seems like CURL_SSLVERSION_TLSv1_2 is not supported on libcurl
* < v7.34.0
*
* version_num is a 24 bit number created like this:
* <8 bits major number> | <8 bits minor number> | <8 bits patch number>.
*/
switch (tlsprotocol) {
case 0:
curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_0);
break;
case 1:
curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_1);
break;
case 2:
curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
break;
default:
curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
break;
}
cmaj = curlversion->version_num >> 16;
cmin = (curlversion->version_num & 0x00ff00) >> 8;
cpat = (curlversion->version_num & 0x0000ff);
/* If cURL version < v7.34.0, use TLS v1.x */
if (cmaj <= 7 && cmin < 34) {
curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1);
}
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 15);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, TRUE);
curl_easy_setopt(curl, CURLOPT_HEADER, TRUE);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_writefunction);
*(CURL **)apr_array_push(curl_handles) = curl;
}
if (cmaj <= 7 && cmin < 34) {
error_log(LOG_DEBUG2, NULL, "TLSv1.2 is unsupported in cURL %d.%d.%d", cmaj, cmin, cpat);
}
logline_regex = pcre_compile(logline_pattern, PCRE_CASELESS,
&errptr, &erroffset, NULL);
if (logline_regex == NULL) {
error_log(LOG_ERROR, NULL,
"Failed to compile pattern: %s\n", logline_pattern);
logc_shutdown(1);
}
requestline_regex = pcre_compile(requestline_pattern,
PCRE_CASELESS, &errptr, &erroffset, NULL);
if (requestline_regex == NULL) {
error_log(LOG_ERROR, NULL,
"Failed to compile pattern: %s\n", requestline_pattern);
logc_shutdown(1);
}
}
/**
* HACK: To allow two mlogcs running against a single dataset we use the
* mtime as a flag for deletion.
*
* 1) Check file date.
* 2) If it is KEEP_ENTRIES_REMOVE_TIME, then remove the file.
* 3) Otherwise set the date and let the other mlogc remove it.
*/
static void keep_entries_hack(apr_pool_t *mp,
apr_thread_t *thread, const char *fn)
{
apr_file_t *f = NULL;
apr_finfo_t finfo;
char errstr[1024];
apr_status_t rc;
/* Opening for write as required for exclusive lock */
if ((rc = apr_file_open(&f, fn,
APR_READ|APR_WRITE|APR_APPEND,
APR_OS_DEFAULT, mp)) != APR_SUCCESS)
{
error_log(LOG_ERROR, thread,
"Could not open \"%s\": %s",
fn, apr_strerror(rc, errstr, 1024));
return;
}
if ((rc = apr_file_lock(f,
APR_FLOCK_EXCLUSIVE|APR_FLOCK_NONBLOCK)) != APR_SUCCESS)
{
error_log(LOG_DEBUG2, thread,
"Waiting for lock on \"%s\": %s",
fn, apr_strerror(rc, errstr, 1024));
if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE)) != APR_SUCCESS) {
error_log(LOG_ERROR, thread,
"Could not lock \"%s\": %s",
fn, apr_strerror(rc, errstr, 1024));
apr_file_close(f);
return;
}
}
error_log(LOG_DEBUG2, thread, "Locked: %s", fn);
/* For testing only */
TEST_WITH_RAND_SLEEP(2);
if ((rc = apr_stat(&finfo, fn, APR_FINFO_MIN, mp)) != APR_SUCCESS) {
error_log(LOG_ERROR, thread,
"Could not stat \"%s\": %s",
fn, apr_strerror(rc, errstr, 1024));
error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
apr_file_close(f);
return;
}
if (error_log_level >= LOG_DEBUG) {
error_log(LOG_DEBUG, thread,
"STAT \"%s\" {"
"uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; "
"csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; "
"ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT
"}",
fn, finfo.user, finfo.group, finfo.size,
finfo.csize, finfo.atime, finfo.ctime, finfo.mtime);
}
if (finfo.mtime != KEEP_ENTRIES_REMOVE_TIME) {
error_log(LOG_DEBUG2, thread, "Set mtime: %s", fn);
if ((rc = apr_file_mtime_set(fn,
(apr_time_t)KEEP_ENTRIES_REMOVE_TIME, mp))
!= APR_SUCCESS)
{
error_log(LOG_ERROR, thread,
"Could not set mtime on \"%s\": %s",
fn, apr_strerror(rc, errstr, 1024));
}
error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
apr_file_close(f);
return;
}
error_log(LOG_DEBUG, thread, "Removing: %s", fn);
error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
apr_file_close(f);
apr_file_remove(fn, mp);
}
/**
* Worker thread. Works in a loop, fetching jobs from the queue,
* until the queue is empty or it is otherwise told to quit.
*/
static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data)
{
unsigned int loop_count = 0;
CURL *curl = (CURL *)data;
entry_t **entryptr = NULL;
entry_t *entry = NULL;
apr_status_t rc;
apr_finfo_t finfo;
int capturevector[CAPTUREVECTORSIZE];
int take_new = 1;
apr_pool_t *tpool;
struct curl_slist *headerlist = NULL;
char curl_error_buffer[CURL_ERROR_SIZE] = "";
int num_requests = 0;
/* There is no need to do the sleep if this was an invalid entry
* as the sleep is just to protect flooding the console server
* with rapid requests. With an invalid entry we never hit the
* server, so we should not delay processing the next event.
*/
int nodelay = 0;
error_log(LOG_DEBUG, thread, "Worker thread starting.");
/* Each worker uses its own pool to manage memory. To avoid
* memory leaks the pool is cleared after each processed
* entry.
*/
apr_pool_create(&tpool, thread_pool);
/* Process jobs in a queue until there are no more jobs to process. */
for(;;) {
nodelay = 0;
/* Do we need to shut down? */
if (running == 0) {
error_log(LOG_DEBUG, thread, "We were told to shut down.");
goto THREAD_SHUTDOWN;
}
/* Is there a problem with the server? We need
* to shut down if there is. Except that we don't
* want to shut down if we were launched to investigate
* if the server came back online (loop_count will be
* zero in that case).
*/
if ((server_error == 1)&&(loop_count != 0)) {
error_log(LOG_DEBUG, thread, "Shutting down due to server error.");
goto THREAD_SHUTDOWN;
}
loop_count++;
/* Get a new entry, but only if we need one. */
if (take_new) {
error_log(LOG_DEBUG, thread, "Worker fetch locking thread mutex.");
if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
error_log(LOG_DEBUG, thread,
"Worker fetch waiting on thread mutex.");
apr_thread_mutex_lock(mutex);
}
error_log(LOG_DEBUG, thread, "Worker fetch started.");
/* Deal with the previous entry. */
if (entry != NULL) {
error_log(LOG_DEBUG, thread,
"Removing previous entry from storage.");
transaction_log(TXOUT, entry->line);
/* Remove previous entry from storage. */
apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL);
/* Release the memory it used to occupy. */
free((void *)entry->line);
free(entry);
entry = NULL;
}
error_log(LOG_DEBUG, thread, "Getting one entry from the queue.");
/* Get one entry. */
entryptr = (entry_t **)apr_array_pop(queue);
if (entryptr == NULL) {
error_log(LOG_DEBUG, thread,
"Worker fetch unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
error_log(LOG_DEBUG, thread,
"No more work for this thread, exiting.");
goto THREAD_SHUTDOWN;
}
entry = *entryptr;
apr_hash_set(in_progress, &entry->id, sizeof(entry->id), entry);
error_log(LOG_DEBUG, thread, "Worker fetch completed.");
error_log(LOG_DEBUG, thread,
"Worker fetch unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
}
/* Send one entry. */
error_log(LOG_DEBUG, thread, "Processing entry.");
take_new = 0;
/* Keep track of requests processed if we need to */
if (max_worker_requests > 0) {
num_requests++;
}
rc = pcre_exec(logline_regex, NULL, entry->line, entry->line_size, 0, 0,
capturevector, CAPTUREVECTORSIZE);
if (rc == PCRE_ERROR_NOMATCH) { /* No match. */
error_log(LOG_WARNING, thread,
"Invalid entry (failed to match regex): %s",
_log_escape(tpool, entry->line, entry->line_size));
take_new = 1;
nodelay = 1;
}
else if (rc < 0) { /* Error condition. */
error_log(LOG_WARNING, thread,
"Invalid entry (PCRE error %d): %s",
rc, _log_escape(tpool, entry->line, entry->line_size));
take_new = 1;
nodelay = 1;
}
else { /* We have a match. */
char *uniqueid = NULL;
char *auditlogentry = NULL;
char *hash = NULL;
char *summary = NULL;
char *credentials = NULL;
error_log(LOG_DEBUG, thread, "Regular expression matched.");
/* For testing only */
TEST_WITH_RAND_SLEEP(2);
uniqueid = apr_psprintf(tpool, "%.*s",
(capturevector[2*13+1] - capturevector[2*13]),
(entry->line + capturevector[2*13]));
auditlogentry = apr_psprintf(tpool, "%s/%.*s", log_repository,
(capturevector[2*15+1] - capturevector[2*15]),
(entry->line + capturevector[2*15]));
hash = apr_psprintf(tpool, "X-Content-Hash: %.*s",
(capturevector[2*18+1] - capturevector[2*15]),
(entry->line + capturevector[2*18]));
summary = apr_psprintf(tpool, "X-ForensicLog-Summary: %s",
entry->line);
credentials = apr_psprintf(tpool, "%s:%s",
sensor_username, sensor_password);
rc = apr_stat(&finfo, auditlogentry, APR_FINFO_SIZE, tpool);
if (rc == APR_SUCCESS) {
FILE *hd_src;
char response_buf[STATUSBUF_SIZE];
CURLcode res;
if (error_log_level >= LOG_DEBUG) {
error_log(LOG_DEBUG, thread,
"STAT \"%s\" {"
"uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; "
"csize=%" APR_OFF_T_FMT "; "
"atime=%" APR_TIME_T_FMT "; "
"ctime=%" APR_TIME_T_FMT "; "
"mtime=%" APR_TIME_T_FMT
"}",
auditlogentry, finfo.user, finfo.group,
finfo.size, finfo.csize, finfo.atime,
finfo.ctime, finfo.mtime);
}
/* Initialize the respone buffer with a hidden value */
response_buf[0] = 0;
response_buf[1] = 1;
if (finfo.size == 0) {
error_log(LOG_WARNING, thread,
"File found (%" APR_OFF_T_FMT
" bytes), skipping.", finfo.size);
take_new = 1;
nodelay = 1;
goto THREAD_CLEANUP;
}
else {
error_log(LOG_DEBUG, thread,
"File found (%" APR_OFF_T_FMT
" bytes), activating cURL.", finfo.size);
}
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION,
curl_debugfunction);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, thread);
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_error_buffer);
curl_easy_setopt(curl, CURLOPT_USERPWD, credentials);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)response_buf);
headerlist = curl_slist_append(headerlist, "Expect:");
headerlist = curl_slist_append(headerlist, hash);
headerlist = curl_slist_append(headerlist, summary);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
hd_src = fopen(auditlogentry, "rb");
if (hd_src == NULL) {
error_log(LOG_WARNING, thread,
"Invalid entry (failed to open file for "
"reading): %s", auditlogentry);
take_new = 1;
nodelay = 1;
goto THREAD_CLEANUP;
}
curl_easy_setopt(curl, CURLOPT_READDATA, hd_src);
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, finfo.size);
#ifdef WIN32
/* Mandatory on win32 */
curl_easy_setopt(curl, CURLOPT_READFUNCTION, curl_readfunction);
#endif
res = curl_easy_perform(curl);
fclose(hd_src);
if (res == 0) {
long response_code = 0;
res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
&response_code);
error_log(LOG_DEBUG, thread,
"Request returned with status \"%ld %s\": %s",
response_code, response_buf, uniqueid);
if (response_code == 0) {
/* Assume problem with connection */
error_log(LOG_WARNING, thread,
"Flagging server as errored after failure "
"to retrieve response code for entry %s "
"(cURL code %d): Possible SSL negotiation "
"error",
uniqueid, res);
apr_sleep(1000 * 1000);
take_new = 0;
server_error = 1;
server_error_last_check_time = apr_time_now();
}
else if (res != 0) {
error_log(LOG_WARNING, thread,
"Flagging server as errored after failure "
"to retrieve response code for entry %s "
"(cURL code %d): %s",
uniqueid, res, curl_error_buffer);
apr_sleep(1000 * 1000);
take_new = 0;
server_error = 1;
server_error_last_check_time = apr_time_now();
}
else {
if (response_code == 200) {
double total_time, upload_size;
if (server_error == 1) {
error_log(LOG_NOTICE, thread,
"Clearing the server error flag "
"after successful entry "
"submission: %s", uniqueid);
}
server_error = 0;
server_error_last_check_time = 0;
curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME,
&total_time);
curl_easy_getinfo(curl, CURLINFO_SIZE_UPLOAD,
&upload_size);
if (!keep_entries) {
error_log(LOG_DEBUG, thread,
"Removing: %s", auditlogentry);
apr_file_remove(auditlogentry, tpool);
}
else if (keep_entries == KEEP_ENTRIES_REMOVE_HACK)
{
keep_entries_hack(tpool, thread, auditlogentry);
}
error_log(LOG_NOTICE, thread,
"Entry completed (%.3f seconds, %.0f "
"bytes): %s",
total_time, upload_size,
uniqueid);
take_new = 1;
}
else if (response_code == 409) {
/* Assume problem with audit log entry. */
error_log(LOG_WARNING, thread,
"Failed to submit entry with "
"\"409 %s\": %s",
response_buf, uniqueid);
take_new = 1;
}
else {
/* Assume problem with server. */
error_log(LOG_WARNING, thread,
"Flagging server as errored after "
"failure to submit entry %s with "
"HTTP response code %ld: %s",
uniqueid, response_code, response_buf);
server_error = 1;
server_error_last_check_time = apr_time_now();
take_new = 0;
}
}
}
else { /* Something isn't right. */
error_log(LOG_WARNING, thread,
"Flagging server as errored after "
"failure to submit entry %s "
"(cURL code %d): %s",
uniqueid, res, curl_error_buffer);
server_error = 1;
server_error_last_check_time = apr_time_now();
take_new = 0;
}
}
else {
error_log(LOG_WARNING, thread,
"Invalid entry (file not found %d): %s",
rc, auditlogentry);
take_new = 1;
nodelay = 1;
}
/* If we are tracking num_requests, then shutdown if we are
* over our threshold.
*/
if (num_requests && (num_requests >= max_worker_requests)) {
error_log(LOG_NOTICE, thread,
"Reached max requests (%d) for this worker, exiting.",
max_worker_requests);
goto THREAD_SHUTDOWN;
}
}
THREAD_CLEANUP:
/* Sleep if we sent data to the server so we do not flood */
/* ENH: Need to sleep for 1ms in a loop checking for shutdown */
if ((nodelay == 0) && (transaction_delay > 0)) {
error_log(LOG_DEBUG, thread,
"Sleeping for %d msec.", transaction_delay);
apr_sleep(transaction_delay * 1000);
}
if (headerlist != NULL) {
curl_slist_free_all(headerlist);
headerlist = NULL;
}
apr_pool_clear(tpool);
error_log(LOG_DEBUG, thread, "Worker processing completed.");
}
THREAD_SHUTDOWN:
error_log(LOG_DEBUG, thread, "Worker shutdown locking thread mutex.");
if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
error_log(LOG_DEBUG, thread,
"Worker shutdown waiting on thread mutex.");
apr_thread_mutex_lock(mutex);
}
/* Deal with the previous entry, if any. */
if (entry != NULL) {
apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL);
if (take_new == 0) { /* Not done. */
*(entry_t **)apr_array_push(queue) = entry;
}
else {
transaction_log(TXOUT, entry->line);
free((void *)entry->line);
free(entry);
}
entry = NULL;
}
/* Return curl handle to the pool for reuse. */
*(CURL **)apr_array_push(curl_handles) = curl;
/* No more work, exit. */
current_workers--;
error_log(LOG_DEBUG, thread, "Worker shutdown unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
apr_pool_destroy(tpool);
error_log(LOG_DEBUG, thread, "Worker thread completed.");
apr_thread_exit(thread, 0);
return NULL;
}
/**
* Creates one new worker, giving it one of the available
* Curl handles to work with.
*/
static void create_new_worker(int lock)
{
apr_thread_t *thread = NULL;
CURL **curlptr = NULL;
if (lock) {
error_log(LOG_DEBUG, NULL, "Worker creation locking thread mutex.");
if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
error_log(LOG_DEBUG, NULL,
"Worker creation waiting on thread mutex.");
apr_thread_mutex_lock(mutex);
}
}
error_log(LOG_DEBUG, NULL, "Worker creation started.");
/* A sanity check: this part executes under lock and
* we want to make *sure* we don't create more threads
* than we are allowed.
*/
if (current_workers >= max_connections) {
if (lock) {
error_log(LOG_DEBUG, NULL,
"Worker creation unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
}
return;
}
/* Cleanup thread pool when idle */
if (current_workers <= 0) {
if (thread_pool != NULL) {
error_log(LOG_DEBUG, NULL, "Destroying thread_pool.");
apr_pool_destroy(thread_pool);
thread_pool = NULL;
}
error_log(LOG_DEBUG, NULL, "Creating thread_pool.");
apr_pool_create(&thread_pool, NULL);
}
curlptr = (CURL **)apr_array_pop(curl_handles);
if (curlptr != NULL) {
apr_threadattr_t *thread_attrs;
apr_status_t rc;
apr_threadattr_create(&thread_attrs, thread_pool);
apr_threadattr_detach_set(thread_attrs, 1);
apr_threadattr_stacksize_set(thread_attrs, 1024);
rc = apr_thread_create(&thread, thread_attrs,
thread_worker, *curlptr, thread_pool);
if (rc != APR_SUCCESS) {
if (lock) {
error_log(LOG_DEBUG, NULL,
"Worker creation unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
}
error_log(LOG_ERROR, NULL,
"Failed to create new worker thread: %d", rc);
logc_shutdown(1);
}
current_workers++;
}
else {
if (lock) {
error_log(LOG_DEBUG, NULL,
"Worker creation unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
}
error_log(LOG_ERROR, NULL, "No more cURL handles (Internal Error).");
logc_shutdown(1);
}
error_log(LOG_DEBUG, NULL, "Worker creation completed: %pp", thread);
if (lock) {
error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
apr_thread_mutex_unlock(mutex);
}
}
/**
* This function implements the management thread.
*/
static void * APR_THREAD_FUNC thread_manager(apr_thread_t *thread, void *data)
{
apr_time_t last = 0;
apr_time_t now = 0;
error_log(LOG_DEBUG, thread, "Management thread: Starting.");
for(;;) {
now = apr_time_now();
/* Should we stop running? */
if (running == 0) {
/* We need to be last */
error_log(LOG_DEBUG, thread,
"Management thread: Waiting for worker "
"threads to finish.");
while(current_workers > 0) {
apr_sleep(10 * 1000);
}
if (have_read_data) {
error_log(LOG_NOTICE, thread,
"Running final transaction checkpoint.");
transaction_checkpoint();
}
error_log(LOG_DEBUG, thread, "Management thread: Exiting.");
management_thread_active = 0;
apr_thread_exit(thread, 0);
}
/* Sleep for a while, but wake up often to check running status */
if ((last > 0) && ((now - last) < MANAGER_SLEEP)) {
apr_sleep(MANAGER_SUBSLEEP);
continue;
}
last = now;
error_log(LOG_DEBUG2, thread, "Management thread: Processing");
/* When the server is flagged errored we need to
* create a worker thread from time to time to
* investigate.
*/
if (server_error) {
if ((current_workers == 0)&&
(apr_time_sec(now - server_error_last_check_time)
> server_error_timeout))
{
server_error_last_check_time = now;
error_log(LOG_DEBUG, thread,
"Management thread: Creating worker thread to "
"investigate server.");
create_new_worker(1);
}
}
else {
if ( (current_workers < max_connections)
&& (queue->nelts > current_workers) )
{
error_log(LOG_DEBUG, thread,
"Management thread: Creating worker thread to "
"catch up with the queue.");
create_new_worker(1);
}
}
/* Initiate a transaction log checkpoint if enough time passed
* since the last one.
*/
if (apr_time_sec(now - checkpoint_time_last) > checkpoint_interval) {
error_log(LOG_DEBUG, thread,
"Management thread: Initiating a checkpoint "
"(previous was %" APR_TIME_T_FMT " seconds ago).",
apr_time_sec(now - checkpoint_time_last));
checkpoint_time_last = now;
transaction_checkpoint();
}
else {
error_log(LOG_DEBUG2, thread,
"Management thread: Last checkpoint was %" APR_TIME_T_FMT
" seconds ago.",
apr_time_sec(now - checkpoint_time_last));
}
}
return NULL;
}
#ifndef WIN32
/**
* Thread to handle all signals
*/
static void * APR_THREAD_FUNC thread_signals(apr_thread_t *thread, void *data)
{
apr_status_t rc;
error_log(LOG_DEBUG, thread, "Signal thread: Starting.");
rc = apr_signal_thread(handle_signals);
if (rc != APR_SUCCESS) {
error_log(LOG_DEBUG, thread, "Signal thread: Error %d", rc);
logc_shutdown(1);
}
return NULL;
}
#endif /* WIN32 */
/**
* The main loop where we receive log entries from
* Apache and add them to the queue, sometimes creating
* new worker threads to handle them.
*/
static void receive_loop(void) {
apr_file_t *fd_stdin;
apr_size_t nbytes = PIPE_BUF_SIZE;
char *buf = apr_palloc(pool, PIPE_BUF_SIZE + 1);
char errstr[1024];
apr_size_t evnt = 0; /* Index in buf to first event char */
apr_size_t curr = 0; /* Index in buf to current processing char */
apr_size_t next = 0; /* Index in buf to next unused char */
int done = 0;
int drop_next = 0;
int buffered_events = 0;
int count = 0;
apr_pool_t *tmp_pool;
/* Open stdin. */
if (apr_file_open_stdin(&fd_stdin, pool) != APR_SUCCESS) {
error_log(LOG_ERROR, NULL, "Unable to open stdin for reading");
logc_shutdown(1);
}
/* Always want this NUL terminated */
buf[PIPE_BUF_SIZE] = '\0';
apr_pool_create(&tmp_pool, NULL);
/* Loop forever receiving entries from stdin. */
while(!done || (curr < next)) {
apr_status_t rc;
if (error_log_level >= LOG_DEBUG2) {
error_log(LOG_DEBUG2, NULL,
"Internal state: "
"[evnt \"%" APR_SIZE_T_FMT "\"]"
"[curr \"%" APR_SIZE_T_FMT "\"]"
"[next \"%" APR_SIZE_T_FMT "\"]"
"[nbytes \"%" APR_SIZE_T_FMT "\"]",
evnt, curr, next, nbytes);
}
/* If we are not done and have the space, read more */
if (!done && (nbytes > 0)) {
buffered_events = 0;
nbytes = PIPE_BUF_SIZE - next;
rc = apr_file_read(fd_stdin, (buf + next), &nbytes);
if (rc != APR_SUCCESS) {
if (have_read_data) {
error_log(LOG_NOTICE, NULL,
"No more data to read, emptying buffer: %s",
apr_strerror(rc, errstr, 1024));
}
done = 1;
}
else {
have_read_data = 1;
if (error_log_level == LOG_DEBUG) {
error_log(LOG_DEBUG, NULL,
"Read %" APR_SIZE_T_FMT " bytes from pipe",
nbytes);
}
else {
error_log(LOG_DEBUG2, NULL,
"Read %" APR_SIZE_T_FMT " bytes from pipe: `%s'",
nbytes,
_log_escape(tmp_pool, (buf + next), nbytes));
}
}
next += nbytes;
}
/**
* Each chunk of data we receive can contain one or more lines for
* which we need to find the EOL marker and then queue the event
* up to that. So, find/queue as many lines in the buffer as we
* can. Any remaining data will get shifted back to the beginning
* of the buffer and the buffer size for the next read adjusted.
*/
while(curr < next) {
/* Look for EOL so we can parse the event */
while((curr < next) && (buf[curr] != 0x0a)) {
curr++;
}
if (buf[curr] == 0x0a) {
buf[curr] = '\0';
/* We may have to drop this one if it previously failed */
if (drop_next) {
error_log(LOG_ERROR, NULL,
"Dropping remaining portion of failed "
"event: `%s'",
_log_escape(tmp_pool,
(buf + evnt), (curr - evnt)));
drop_next = 0;
}
else {
transaction_log(TXIN, buf + evnt);
error_log(LOG_DEBUG2, NULL,
"Received audit log entry "
"(count %lu queue %d workers %d): %s",
entry_counter, queue->nelts,
current_workers,
_log_escape(tmp_pool,
(buf + evnt), strlen(buf + evnt)));
add_entry(buf + evnt, 1);
buffered_events++;
}
/* Advance indexes to next event in buf */
evnt = curr = curr + 1;
}
else {
error_log(LOG_DEBUG2, NULL,
"Event buffer contains partial event: `%s'",
_log_escape(tmp_pool, (buf + evnt), (next - evnt)));
break;
}
}
if (buffered_events > 0) {
error_log(LOG_DEBUG, NULL,
"Processed %d entries from buffer.", buffered_events);
/* Move the unused portion of the buffer to the beginning */
next -= evnt;
curr -= evnt;
memmove(buf, (buf + evnt), next);
error_log(LOG_DEBUG2, NULL,
"Shifted buffer back %" APR_SIZE_T_FMT
" and offset %" APR_SIZE_T_FMT
" bytes for next read: `%s'",
evnt, next, _log_escape(tmp_pool, buf, next));
evnt = 0;
}
else if (next == PIPE_BUF_SIZE) {
/**
* There is a chance we could fill the buffer, but not have finished
* reading the event (no EOL yet), so we need to say so and drop
* all data until we find the end of the event that is too large.
*/
if (drop_next) {
error_log(LOG_ERROR, NULL,
"Event continuation too large, "
"dropping it as well: `%s'",
_log_escape(tmp_pool, buf, PIPE_BUF_SIZE));
}
else {
error_log(LOG_ERROR, NULL,
"Event too large, dropping event: `%s'",
_log_escape(tmp_pool, buf, PIPE_BUF_SIZE));
}
/* Rewind buf and mark that we need to drop up to the next event */
evnt = curr = next = 0;
drop_next = 1;
}
nbytes = PIPE_BUF_SIZE - next;
if (count++ > 1000) {
count = 0;
error_log(LOG_DEBUG, NULL, "Recycling tmp_pool.");
apr_pool_destroy(tmp_pool);
apr_pool_create(&tmp_pool, NULL);
}
else {
apr_pool_clear(tmp_pool);
}
}
/* Wait for queue to empty if specified */
if ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) {
error_log(LOG_NOTICE, NULL,
"Waiting for queue to empty (%d active).", queue->nelts);
while ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) {
apr_sleep(10 * 1000);
}
if (queue->nelts > 0) {
error_log(LOG_ERROR, NULL,
"Could not empty queue (%d active).", queue->nelts);
}
}
}
/**
* Creates the management thread.
*/
static void start_management_thread(void)
{
apr_thread_t *thread = NULL;
apr_threadattr_t *thread_attrs;
apr_status_t rc;
apr_threadattr_create(&thread_attrs, pool);
apr_threadattr_detach_set(thread_attrs, 1);
apr_threadattr_stacksize_set(thread_attrs, 1024);
management_thread_active = 1;
rc = apr_thread_create(&thread, thread_attrs, thread_manager, NULL, pool);
if (rc != APR_SUCCESS) {
error_log(LOG_ERROR, NULL,
"Failed to create new management thread: %d", rc);
management_thread_active = 0;
logc_shutdown(1);
}
}
#ifndef WIN32
/**
* Creates a thread to handle all signals
*/
static void start_signal_thread(void)
{
apr_thread_t *thread = NULL;
apr_threadattr_t *thread_attrs;
apr_status_t rc;
apr_threadattr_create(&thread_attrs, pool);
apr_threadattr_detach_set(thread_attrs, 1);
apr_threadattr_stacksize_set(thread_attrs, 1024);
rc = apr_thread_create(&thread, thread_attrs, thread_signals, NULL, pool);
if (rc != APR_SUCCESS) {
error_log(LOG_ERROR, NULL,
"Failed to create new signal thread: %d", rc);
logc_shutdown(1);
}
}
#endif /* WIN32 */
/**
* Usage text.
*/
static void usage(void) {
fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION);
fprintf(stderr, " Usage: mlogc [options] /path/to/the/mlogc.conf\n");
fprintf(stderr, "\n");
fprintf(stderr, " Options:\n");
fprintf(stderr, " -f Force depletion of queue on exit\n");
fprintf(stderr, " -v Version information\n");
fprintf(stderr, " -h This help\n\n");
}
/**
* Version text.
*/
static void version(void) {
fprintf(stderr,
"ModSecurity Log Collector (mlogc) v%s\n", VERSION);
fprintf(stderr,
" APR: compiled=\"%s\"; "
"loaded=\"%s\"\n", APR_VERSION_STRING, apr_version_string());
fprintf(stderr,
" PCRE: compiled=\"%d.%d\"; "
"loaded=\"%s\"\n", PCRE_MAJOR, PCRE_MINOR, pcre_version());
fprintf(stderr,
" cURL: compiled=\"%s\"; "
"loaded=\"%s\"\n", LIBCURL_VERSION, curl_version());
fprintf(stderr, "\n");
}
/**
* This is the main entry point.
*/
int main(int argc, const char * const argv[]) {
apr_getopt_t *opt;
apr_status_t rc;
apr_app_initialize(&argc, &argv, NULL);
atexit(apr_terminate);
curl_global_init(CURL_GLOBAL_ALL);
atexit(logc_cleanup);
logc_pid = getpid();
apr_pool_create(&pool, NULL);
apr_pool_create(&recv_pool, NULL);
#ifndef WIN32
apr_setup_signal_thread();
#else
apr_signal(SIGINT, handle_signals);
apr_signal(SIGTERM, handle_signals);
#endif /* WIN32 */
if (argc < 2) {
usage();
logc_shutdown(1);
}
/* Commandline opts */
rc = apr_getopt_init(&opt, pool, argc, argv);
if (rc != APR_SUCCESS) {
usage();
logc_shutdown(1);
}
do {
char ch;
const char *val;
rc = apr_getopt(opt, CMDLINE_OPTS, &ch, &val);
switch (rc) {
case APR_SUCCESS:
switch (ch) {
case 'f':
opt_force = 1;
break;
case 'v':
version();
logc_shutdown(0);
case 'h':
usage();
logc_shutdown(0);
}
break;
case APR_BADCH:
case APR_BADARG:
usage();
logc_shutdown(1);
}
} while (rc != APR_EOF);
/* Conf file is last */
conffile = argv[argc - 1];
read_configuration();
init_configuration();
logc_init();
transaction_log_init();
running = 1;
server_error = 0;
start_management_thread();
#ifndef WIN32
start_signal_thread();
#endif /* WIN32 */
/* Process stdin until EOF */
receive_loop();
logc_shutdown(0);
return 0;
}