/* * ModSecurity for Apache 2.x, http://www.modsecurity.org/ * Copyright (c) 2004-2013 Trustwave Holdings, Inc. (http://www.trustwave.com/) * * You may not use this file except in compliance with * the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * If any of the files related to licensing are missing or if you have any * other questions related to licensing please contact Trustwave Holdings, Inc. * directly using the email address security@modsecurity.org. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #if APR_HAVE_UNISTD_H #include /* for getpid() */ #endif #include #include #include #include #include #include "msc_release.h" static void logc_shutdown(int rc); static void create_new_worker(int lock); static void error_log(int level, void *thread, const char *text, ...) PRINTF_ATTRIBUTE(3,4); /* -- Constants -- */ /* Error log levels. */ #define LOG_ERROR 1 #define LOG_WARNING 2 #define LOG_NOTICE 3 #define LOG_DEBUG 4 #define LOG_DEBUG2 5 /* The management thread will wake up every five seconds. */ #define MANAGER_SLEEP 5000000 #define MANAGER_SUBSLEEP 10000 /* Hack to allow multiple mlogc with single delete */ #define KEEP_ENTRIES_REMOVE_HACK 2600 #define KEEP_ENTRIES_REMOVE_TIME 0l #ifdef TEST_HACK #define TEST_WITH_RAND_SLEEP(n) \ do { \ int sec = rand()/(RAND_MAX/n); \ error_log(LOG_DEBUG2, NULL, "TEST_HACK: Sleeping for %ds", sec); \ apr_sleep(apr_time_from_sec(sec)); \ } while(0) #else #define TEST_WITH_RAND_SLEEP(n) #endif #define CAPTUREVECTORSIZE 60 #define PIPE_BUF_SIZE 65536 #define MEMALLOC_ERROR_MSG "Memory allocation failed!" #define VERSION MODSEC_VERSION #define CMDLINE_OPTS "fvh" #define TXIN 0 #define TXOUT 1 #define STATUSBUF_SIZE 256 #define ISHEXCHAR(X) ( ((X >= '0')&&(X <= '9')) \ || ((X >= 'a')&&(X <= 'f')) \ || ((X >= 'A')&&(X <= 'F')) ) /* -- Regex Patterns -- */ /** * This regular expression is used to parse the entire * log line we receive from Apache. The REQUEST_LINE is * treated as a single parameter to allow for invalid * requests. */ static const char logline_pattern[] = "^(\\S+)" "\\ (\\S+)\\ (\\S+)\\ (\\S+)" "\\ \\[([^:]+):(\\d+:\\d+:\\d+)\\ ([^\\]]+)\\]" "\\ \"(.*)\"" "\\ (\\d+)\\ (\\S+)" "\\ \"(.*)\"\\ \"(.*)\"" "\\ (\\S+)\\ \"(.*)\"" "\\ /?(\\S+)\\ (\\d+)\\ (\\d+)" "\\ (\\S+)" "(.*)$"; /** * This regular expression can be used to parse * a REQUEST_LINE field into method, URI, and * protocol. */ static const char requestline_pattern[] = "(\\S+)\\ (.*?)\\ (\\S+)"; /* -- Structures -- */ typedef struct { unsigned long int id; const char *line; apr_size_t line_size; } entry_t; /* -- Global variables -- */ static pid_t logc_pid = 0; static const char *conffile = NULL; static const char *lockfile = NULL; static int have_read_data = 0; static int checkpoint_interval = 60; static apr_time_t checkpoint_time_last = 0; static const char *collector_root = NULL; static apr_table_t *conf = NULL; static const char *console_uri = NULL; static apr_array_header_t *curl_handles = NULL; static int current_workers = 0; static int management_thread_active = 0; static unsigned long int entry_counter = 1; static const char *error_log_path = NULL; static apr_file_t *error_log_fd = NULL; static int error_log_level = 2; static apr_hash_t *in_progress = NULL; static int keep_alive = 150; /* Not used yet. */ static int keep_alive_timeout = 300; /* Not used yet. */ static int keep_entries = 0; static const char *log_repository = NULL; static void *logline_regex = NULL; static int max_connections = 10; static int max_worker_requests = 1000; static apr_global_mutex_t *gmutex = NULL; static apr_thread_mutex_t *mutex = NULL; static apr_pool_t *pool = NULL; static apr_pool_t *thread_pool = NULL; static apr_pool_t *recv_pool = NULL; static apr_array_header_t *queue = NULL; static const char *queue_path = NULL; static int ssl_validation = 0; static int tlsprotocol = 1; static curl_version_info_data* curlversion = NULL; /* static apr_time_t queue_time = 0; */ static void *requestline_regex = NULL; static int running = 0; static const char *sensor_password = NULL; static const char *sensor_username = NULL; static int server_error = 0; static apr_time_t server_error_last_check_time = 0; static int server_error_timeout = 60; static int startup_delay = 100; static int transaction_delay = 100; static const char *transaction_log_path = NULL; static apr_file_t *transaction_log_fd = NULL; /* -- Commandline opts -- */ static int opt_force = 0; /* -- Code -- */ static char *_log_escape(apr_pool_t *mp, const char *input, apr_size_t input_len) { static const char c2x_table[] = "0123456789abcdef"; unsigned char *d = NULL; char *ret = NULL; unsigned long int i; if (input == NULL) return NULL; ret = apr_palloc(mp, input_len * 4 + 1); if (ret == NULL) return NULL; d = (unsigned char *)ret; i = 0; while(i < input_len) { switch(input[i]) { case '"' : *d++ = '\\'; *d++ = '"'; break; case '\b' : *d++ = '\\'; *d++ = 'b'; break; case '\n' : *d++ = '\\'; *d++ = 'n'; break; case '\r' : *d++ = '\\'; *d++ = 'r'; break; case '\t' : *d++ = '\\'; *d++ = 't'; break; case '\v' : *d++ = '\\'; *d++ = 'v'; break; case '\\' : *d++ = '\\'; *d++ = '\\'; break; default : if ((input[i] <= 0x1f)||(input[i] >= 0x7f)) { *d++ = '\\'; *d++ = 'x'; *d++ = c2x_table[input[i] >> 4]; *d++ = c2x_table[input[i] & 0x0f]; } else { *d++ = input[i]; } break; } i++; } *d = 0; return ret; } /** * Converts a byte given as its hexadecimal representation * into a proper byte. Handles uppercase and lowercase letters * but does not check for overflows. */ static unsigned char x2c(unsigned char *what) { register unsigned char digit; digit = (what[0] >= 'A' ? ((what[0] & 0xdf) - 'A') + 10 : (what[0] - '0')); digit *= 16; digit += (what[1] >= 'A' ? ((what[1] & 0xdf) - 'A') + 10 : (what[1] - '0')); return digit; } /** * URL Decodes a string in-place */ static int urldecode_inplace(unsigned char *input, apr_size_t input_len) { unsigned char *d = (unsigned char *)input; apr_size_t i; if (input == NULL) return 0; i = 0; while (i < input_len) { if (input[i] == '%') { /* Character is a percent sign. */ /* Are there enough bytes available? */ if (i + 2 < input_len) { char c1 = input[i + 1]; char c2 = input[i + 2]; if (ISHEXCHAR(c1) && ISHEXCHAR(c2)) { /* Valid encoding - decode it. */ *d++ = x2c(&input[i + 1]); i += 3; } else { /* Not a valid encoding, skip this % */ *d++ = input[i++]; } } else { /* Not enough bytes available, copy the raw bytes. */ *d++ = input[i++]; } } else { /* Character is not a percent sign. */ if (input[i] == '+') { *d++ = ' '; } else { *d++ = input[i]; } i++; } } *d = '\0'; return 1; } /** * Detect a relative path and merge it with the collector root * path. Leave absolute paths as they are. */ static const char *file_path(const char *path) { char *newpath = NULL; apr_status_t rc; if (path == NULL) return NULL; rc = apr_filepath_merge(&newpath, collector_root, path, APR_FILEPATH_TRUENAME, pool); if ((newpath != NULL) && (rc == APR_SUCCESS || APR_STATUS_IS_EPATHWILD(rc) || APR_STATUS_IS_ENOENT(rc) || APR_STATUS_IS_ENOTDIR(rc))) { return newpath; } else { return NULL; } } /** * Returns the current datetime as a string. */ static char *current_logtime(char *dest, int dlen) { apr_time_exp_t t; apr_size_t len; apr_time_exp_lt(&t, apr_time_now()); apr_strftime(dest, &len, dlen, "%a %b %d %H:%M:%S %Y", &t); return dest; } /** * Logs error to the error log (if available) or * to the stderr. */ static void error_log(int level, void *thread, const char *text, ...) { char msg1[4096] = ""; char msg2[4096] = ""; char datetime[100]; va_list ap; if (level > error_log_level) return; va_start(ap, text); apr_vsnprintf(msg1, sizeof(msg1), text, ap); apr_snprintf(msg2, sizeof(msg2), "[%s] [%d] [%" APR_PID_T_FMT "/%pp] %s\n", current_logtime(datetime, sizeof(datetime)), level, logc_pid, (thread ? thread : 0), msg1); if (error_log_fd != NULL) { apr_size_t nbytes_written; apr_size_t nbytes = strlen(msg2); apr_file_write_full(error_log_fd, msg2, nbytes, &nbytes_written); } else { fprintf(stderr, "%s", msg2); } va_end(ap); } /** * Adds one entry to the internal queue. It will (optionally) start * a new thread to handle it. */ static void add_entry(const char *data, int start_worker) { entry_t *entry = NULL; entry = (entry_t *)malloc(sizeof(entry_t)); entry->id = 0; entry->line = strdup(data); entry->line_size = strlen(entry->line); error_log(LOG_DEBUG, NULL, "Queue locking thread mutex."); if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) { error_log(LOG_DEBUG, NULL, "Queue waiting on thread mutex."); apr_thread_mutex_lock(mutex); } /* Assign unique ID to this log entry. */ entry->id = entry_counter++; /* Add the new audit log entry to the queue. */ *(entry_t **)apr_array_push(queue) = entry; /* Create a new worker if we can, but not if there is a * known problem with the server. */ if ( (start_worker != 0) && (current_workers < max_connections)&&(server_error == 0)) { create_new_worker(0); } error_log(LOG_DEBUG, NULL, "Queue unlocking thread mutex."); apr_thread_mutex_unlock(mutex); } /** * Read the queue entries. */ static int read_queue_entries(apr_file_t *fd, apr_time_t *queue_time) { char linebuf[4100]; int line_count = -1; int line_size = 0; apr_status_t rc = 0; char *p = NULL; for(;;) { memset(linebuf, 0, 4100); rc = apr_file_gets(linebuf, 4096, fd); if (rc == APR_EOF) break; if (rc != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Error reading from the queue file."); logc_shutdown(1); } if (line_count < 0) { /* First line contains the queue time. */ *queue_time = (apr_time_t)apr_atoi64(linebuf); line_count = 0; continue; } p = &linebuf[0]; line_size = strlen(p); /* Remove the \n from the end of the line. */ while(*p != '\0' && line_size > 0) { if (*p == '\n') { *p = '\0'; break; } p++; line_size--; } if (linebuf[0] == '#') { /* Ignore comments. */ continue; } add_entry((const char *)&linebuf, 0); line_count++; } apr_file_close(fd); return line_count; } /** * Initialise the transaction log. This code should be * executed only once at startup. */ static void transaction_log_init(void) { /* ENH: These big enough? */ char new_queue_path[256]; char old_queue_path[256]; apr_file_t *queue_fd = NULL; apr_time_t queue_time; apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path); apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path); /* Put a lock in place to ensure exclusivity. */ error_log(LOG_DEBUG, NULL, "Transaction initialization locking global mutex."); if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) { error_log(LOG_DEBUG, NULL, "Transaction initialization waiting on global mutex."); apr_global_mutex_lock(gmutex); } error_log(LOG_DEBUG, NULL, "Transaction initialization started."); /* Delete .new file if there is one. */ apr_file_remove(new_queue_path, pool); /* Read in the data from the queue. */ if (apr_file_open(&queue_fd, queue_path, APR_READ | APR_FILE_NOCLEANUP, 0, pool) == APR_SUCCESS) { int line_count = read_queue_entries(queue_fd, &queue_time); apr_file_close(queue_fd); if (line_count > 0) { error_log(LOG_NOTICE, NULL, "Loaded %d entries from the queue file.", line_count); } } /* Try the old queue file. */ else if (apr_file_open(&queue_fd, old_queue_path, APR_READ | APR_FILE_NOCLEANUP, 0, pool) == APR_SUCCESS) { int line_count = read_queue_entries(queue_fd, &queue_time); apr_file_close(queue_fd); error_log(LOG_NOTICE, NULL, "Loaded %d entries from the OLD queue file.", line_count); apr_file_rename(old_queue_path, queue_path, pool); } else { error_log(LOG_NOTICE, NULL, "Queue file not found. New one will be created."); } /* Delete the old queue file. */ apr_file_remove(old_queue_path, pool); checkpoint_time_last = apr_time_now(); /* Start fresh with the transaction log. Do note that * we do not truncate the transaction log on purpose. Apache * will start copies of piped logging binaries during configuration * testing. Truncating would erase the log of a currently running * instance. */ if (apr_file_open(&transaction_log_fd, transaction_log_path, APR_WRITE | APR_CREATE | APR_APPEND | APR_XTHREAD, APR_OS_DEFAULT, pool) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to open the transaction log: %s\n", transaction_log_path); error_log(LOG_DEBUG, NULL, "Transaction initialization unlocking global mutex."); apr_global_mutex_unlock(gmutex); logc_shutdown(1); } error_log(LOG_DEBUG, NULL, "Transaction initialization completed."); /* Unlock */ error_log(LOG_DEBUG, NULL, "Transaction initialization unlocking global mutex."); apr_global_mutex_unlock(gmutex); } /** * Log entry event (incoming or outgoing) to the transaction log. */ static void transaction_log(int direction, const char *entry) { apr_size_t nbytes, nbytes_written; char msg[8196] = ""; apr_snprintf(msg, sizeof(msg), "%u %s: %s\n", (unsigned int)apr_time_sec(apr_time_now()), (direction == TXIN ? "IN" : "OUT"), entry); nbytes = strlen(msg); apr_file_write_full(transaction_log_fd, msg, nbytes, &nbytes_written); } /** * Executes a checkpoint, which causes the current queue to be * written to a file and the transaction log to be truncated. */ static void transaction_checkpoint(void) { /* ENH: These big enough? */ char new_queue_path[256]; char old_queue_path[256]; apr_file_t *queue_fd = NULL; apr_hash_index_t *hi = NULL; char msg[256]; int i; apr_pool_t *cpool; apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path); apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path); apr_snprintf(msg, sizeof(msg), "%u\n", (unsigned int)apr_time_sec(apr_time_now())); if (! have_read_data) { error_log(LOG_DEBUG, NULL, "Checkpoint not required."); return; } /* Put a lock in place to ensure exclusivity. */ error_log(LOG_DEBUG, NULL, "Checkpoint locking global mutex."); if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) { error_log(LOG_DEBUG, NULL, "Checkpoint waiting on global mutex."); apr_global_mutex_lock(gmutex); } error_log(LOG_DEBUG, NULL, "Checkpoint started."); apr_pool_create(&cpool, NULL); /* Dump active entries into a new queue file. */ if (apr_file_open(&queue_fd, new_queue_path, APR_WRITE | APR_CREATE | APR_EXCL | APR_TRUNCATE | APR_FILE_NOCLEANUP, APR_OS_DEFAULT, cpool) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to create file: %s", new_queue_path); error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex."); apr_pool_destroy(cpool); apr_global_mutex_unlock(gmutex); return; } /* Write the time first. */ apr_file_write_full(queue_fd, msg, strlen(msg), NULL); /* Dump the entries sitting in the queue first. */ for (i = 0; i < queue->nelts; i++) { entry_t *entry = ((entry_t **)queue->elts)[i]; apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL); apr_file_write_full(queue_fd, &"\n", 1, NULL); } error_log(LOG_DEBUG2, NULL, "Checkpoint wrote %d queued entries to new queue.", i); /* Then dump the ones that are currently being processed. */ i = 0; for (hi = apr_hash_first(NULL, in_progress); hi != NULL; hi = apr_hash_next(hi))\ { void *e; entry_t *entry = NULL; i++; apr_hash_this(hi, NULL, NULL, &e); entry = e; /* quiet type-punned warning */ apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL); apr_file_write_full(queue_fd, &"\n", 1, NULL); } error_log(LOG_DEBUG2, NULL, "Checkpoint wrote %d additional entries to new queue.", i); apr_file_close(queue_fd); /* Switch the files and truncate the transaction log file. */ apr_file_remove(old_queue_path, cpool); apr_file_rename(queue_path, old_queue_path, cpool); apr_file_rename(new_queue_path, queue_path, cpool); apr_file_remove(old_queue_path, cpool); apr_file_trunc(transaction_log_fd, 0); error_log(LOG_DEBUG, NULL, "Checkpoint completed."); apr_pool_destroy(cpool); /* Unlock and exit. */ error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex."); apr_global_mutex_unlock(gmutex); } /** * Parse one confguration line and add it to the * configuration table. */ static void parse_configuration_line(const char *line, int line_count) { char *start = NULL, *command = NULL; char *p = NULL; /* Remove the trailing newline character. */ p = (char *)line; while(*p != '\0') p++; if ((p > start)&&(*(p - 1) == '\n')) *(p - 1) = '\0'; p = (char *)line; /* Ignore whitespace at the beginning of the line. */ while(apr_isspace(*p)) p++; /* Ignore empty lines and comments. */ if ((*p == '\0')||(*p == '#')) return; start = p; while(!apr_isspace(*p)&&(*p != '\0')) p++; command = apr_pstrmemdup(pool, start, p - start); while(apr_isspace(*p)) p++; /* Remove whitespace at the end. */ start = p; while(*p != '\0') p++; if (p > start) { p--; while(apr_isspace(*p)) { *p-- = '\0'; } } /* Remove quotes, but only if we have matching */ if ((*start == '"') && (p > start) && (*p == '"')) { start++; *p-- = '\0'; } /* Take the last directive */ /* ENH: Error on dup directives? */ apr_table_set(conf, command, start); } /** * Reads configuration from a file. */ static void read_configuration(void) { char linebuf[4096]; apr_status_t rc; apr_file_t *fd; int line_count; conf = apr_table_make(pool, 32); if (conf == NULL) { error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG); logc_shutdown(1); } rc = apr_file_open(&fd, conffile, APR_READ | APR_FILE_NOCLEANUP, 0, pool); if (rc != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Unable to open configuration file: %s", conffile); logc_shutdown(1); } line_count = 0; for(;;) { rc = apr_file_gets(linebuf, 4096, fd); if (rc == APR_EOF) return; if (rc != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Error reading from the configuration file."); logc_shutdown(1); } line_count++; parse_configuration_line(linebuf, line_count); } apr_file_close(fd); } /** * Initialize the configuration. */ static void init_configuration(void) { char errstr[1024]; apr_status_t rc = 0; const char *s = NULL; /* Other values may be based off the collector root. */ s = apr_table_get(conf, "CollectorRoot"); if (s != NULL) { collector_root = s; } /* Error Log */ s = apr_table_get(conf, "ErrorLog"); if (s != NULL) { error_log_path = file_path(s); } s = apr_table_get(conf, "ErrorLogLevel"); if (s != NULL) { error_log_level = atoi(s); } if ((rc = apr_file_open(&error_log_fd, error_log_path, APR_WRITE | APR_CREATE | APR_APPEND, APR_OS_DEFAULT, pool)) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to open the error log %s: %s\n", error_log_path, apr_strerror(rc, errstr, 1024)); logc_shutdown(1); } error_log(LOG_NOTICE, NULL, "Configuring ModSecurity Audit Log Collector %s.", VERSION); /* Startup Delay */ s = apr_table_get(conf, "StartupDelay"); if (s != NULL) { startup_delay = atoi(s); } /* TLS Protocol - TLSv1(0) TLSv1.1(1) TLSv1.2(2) (SSLv3 not supported) */ s = apr_table_get(conf, "TLSProtocol"); if (s != NULL) { int num = atoi(s); switch (num) { case 0: tlsprotocol = 0; break; case 1: tlsprotocol = 1; break; case 2: tlsprotocol = 2; break; default: tlsprotocol = 2; /* Default is TLSv1.2 */ } } curlversion = curl_version_info(CURLVERSION_NOW); if ( startup_delay > 0 ) { error_log(LOG_NOTICE, NULL, "Delaying execution for %dms.", startup_delay); apr_sleep(startup_delay * 1000); error_log(LOG_DEBUG, NULL, "Continuing execution after %dms delay.", startup_delay); } /* Remaining Configuration */ error_log(LOG_DEBUG2, NULL, "CollectorRoot=%s", collector_root); error_log(LOG_DEBUG2, NULL, "ErrorLog=%s", error_log_path); error_log(LOG_DEBUG2, NULL, "ErrorLogLevel=%d", error_log_level); error_log(LOG_DEBUG2, NULL, "StartupDelay=%d", startup_delay); error_log(LOG_DEBUG2, NULL, "TLSProtocol=%d", tlsprotocol); error_log(LOG_DEBUG2, NULL, "cURL version=%s", curlversion->version); s = apr_table_get(conf, "CheckpointInterval"); if (s != NULL) { checkpoint_interval = atoi(s); error_log(LOG_DEBUG2, NULL, "CheckpointInterval=%d", checkpoint_interval); } s = apr_table_get(conf, "InsecureNoCheckCert"); if (s != NULL) { int num = atoi(s); if (num) { ssl_validation = 0; } else { ssl_validation = 1; } error_log(LOG_DEBUG2, NULL, "InsecureNoCheckCert=%d", num); } s = apr_table_get(conf, "QueuePath"); if (s != NULL) { queue_path = file_path(s); error_log(LOG_DEBUG2, NULL, "QueuePath=%s", queue_path); } else { error_log(LOG_ERROR, NULL, "QueuePath not defined in the configuration file."); logc_shutdown(1); } s = apr_table_get(conf, "LockFile"); if (s != NULL) { lockfile = file_path(s); error_log(LOG_DEBUG2, NULL, "LockFile=%s", lockfile); } s = apr_table_get(conf, "ServerErrorTimeout"); if (s != NULL) { server_error_timeout = atoi(s); error_log(LOG_DEBUG2, NULL, "ServerErrorTimeout=%d", server_error_timeout); } s = apr_table_get(conf, "TransactionDelay"); if (s != NULL) { transaction_delay = atoi(s); error_log(LOG_DEBUG2, NULL, "TransactionDelay=%d", transaction_delay); } s = apr_table_get(conf, "TransactionLog"); if (s != NULL) { transaction_log_path = file_path(s); error_log(LOG_DEBUG2, NULL, "TransactionLog=%s", transaction_log_path); } s = apr_table_get(conf, "MaxConnections"); if (s != NULL) { int v = atoi(s); if (v >= 0) max_connections = v; error_log(LOG_DEBUG2, NULL, "MaxConnections=%d", max_connections); } s = apr_table_get(conf, "MaxWorkerRequests"); if (s != NULL) { int v = atoi(s); if (v >= 0) max_worker_requests = v; error_log(LOG_DEBUG2, NULL, "MaxWorkerRequests=%d", max_worker_requests); } s = apr_table_get(conf, "KeepAlive"); if (s != NULL) { int v = atoi(s); if (v >= 0) keep_alive = v; error_log(LOG_DEBUG2, NULL, "KeepAlive=%d", keep_alive); } s = apr_table_get(conf, "KeepAliveTimeout"); if (s != NULL) { int v = atoi(s); if (v >= 0) keep_alive_timeout = v; error_log(LOG_DEBUG2, NULL, "KeepAliveTimeout=%d", keep_alive_timeout); } s = apr_table_get(conf, "LogStorageDir"); if (s != NULL) { log_repository = file_path(s); error_log(LOG_DEBUG2, NULL, "LogStorageDir=%s", log_repository); } else { error_log(LOG_ERROR, NULL, "Missing mandatory parameter LogStorageDir.\n"); logc_shutdown(1); } s = apr_table_get(conf, "ConsoleURI"); if (s != NULL) { console_uri = s; error_log(LOG_DEBUG2, NULL, "ConsoleURI=%s", console_uri); } else { error_log(LOG_ERROR, NULL, "Missing mandatory parameter ConsoleURI.\n"); logc_shutdown(1); } s = apr_table_get(conf, "SensorUsername"); if (s != NULL) { sensor_username = s; error_log(LOG_DEBUG2, NULL, "SensorUsername=%s", sensor_username); } else { error_log(LOG_ERROR, NULL, "Missing mandatory parameter SensorUsername.\n"); logc_shutdown(1); } s = apr_table_get(conf, "SensorPassword"); if (s != NULL) { sensor_password = s; error_log(LOG_DEBUG2, NULL, "SensorPassword=%s", sensor_password); } else { error_log(LOG_ERROR, NULL, "Missing mandatory parameter SensorPassword.\n"); logc_shutdown(1); } s = apr_table_get(conf, "KeepEntries"); if (s != NULL) { keep_entries = atoi(s); } else { keep_entries = 0; } error_log(LOG_DEBUG2, NULL, "KeepEntries=%d", keep_entries); } /** * Clean-up resources before process shutdown. */ static void logc_cleanup(void) { curl_global_cleanup(); } /** * Shutdown the logger. */ static void logc_shutdown(int rc) { /* Tell the threads to shut down. */ running = 0; error_log(LOG_DEBUG, NULL, "Shutting down"); /* Wait for the management thread to stop */ /* ENH: Need a fixed timeout if this never happens */ while(management_thread_active != 0) { apr_sleep(10 * 1000); } if (rc == 0) { error_log(LOG_NOTICE, NULL, "ModSecurity Audit Log Collector %s terminating normally.", VERSION); } else { error_log(LOG_NOTICE, NULL, "ModSecurity Audit Log Collector %s " "terminating with error %d", VERSION, rc); } if (error_log_fd != NULL) { apr_file_flush(error_log_fd); } exit(rc); } /** * Handle signals. */ static int handle_signals(int signum) { switch (signum) { case SIGINT: error_log(LOG_NOTICE, NULL, "Caught SIGINT, shutting down."); logc_shutdown(0); case SIGTERM: error_log(LOG_NOTICE, NULL, "Caught SIGTERM, shutting down."); logc_shutdown(0); #ifndef WIN32 case SIGHUP: error_log(LOG_NOTICE, NULL, "Caught SIGHUP, ignored."); /* ENH: reload config? */ return 0; case SIGALRM: error_log(LOG_DEBUG, NULL, "Caught SIGALRM, ignored."); return 0; case SIGTSTP: error_log(LOG_DEBUG, NULL, "Caught SIGTSTP, ignored."); return 0; #endif /* WIN32 */ } #ifndef WIN32 error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d: %s", signum, apr_signal_description_get(signum)); #else error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d", signum); #endif /* WIN32 */ logc_shutdown(1); return 0; /* should never reach */ } #ifdef WIN32 /** * This function is invoked by Curl to read the source file on Windows */ static size_t curl_readfunction(void *ptr, size_t size, size_t nmemb, void *stream) { return fread(ptr, size, nmemb, (FILE *)stream); } #endif /** * This function is invoked by Curl to read the response * body. Since we don't care about the response body the function * pretends it is retrieving data where it isn't. */ static size_t curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) { unsigned char *data = (unsigned char *)ptr; unsigned char *status = (unsigned char *)stream; /* Grab the status line text from the first line of output */ if ((status[0] == 0) && (status[1] == 1)) { apr_size_t i, j; int ismsg = 0; status[1] = 0; /* reset hidden init flag */ for (i = 0, j = 0; i < STATUSBUF_SIZE; i++) { /* We found a line ending so we are done */ if ( data[i] == '\r' ) { break; } /* Skip to after the first space (where msg is) */ if (ismsg < 3) { if ((ismsg == 1) && !isspace(data[i])) { ismsg++; } else if (isspace(data[i])) { ismsg++; } continue; } /* Copy data (msg) from data to status */ status[j++] = data[i]; } status[j] = '\0'; urldecode_inplace(status, j); } /* do nothing */ return (size * nmemb); } /** * This function is invoked by Curl whenever it has something * to say. We forward its messages to the error log at level * DEBUG or DEBUG2 depending on the verbosity. */ static int curl_debugfunction(CURL *curl, curl_infotype infotype, char *data, size_t datalen, void *ourdata) { apr_size_t i, effectivelen; apr_thread_t *thread = (apr_thread_t *)ourdata; if (error_log_level < LOG_DEBUG) return 0; effectivelen = datalen; for(i = 0; i < datalen; i++) { if ((data[i] == 0x0a)||(data[i] == 0x0d)) { effectivelen = i; break; } } switch (infotype) { case CURLINFO_TEXT: /* More verbose data starts with an indent */ if (apr_isspace(data[0])) { char *dataptr = data + 1; /* Skip initial whitespace (indent) */ while ( ((size_t)(dataptr - data) > datalen) && apr_isspace(*dataptr)) dataptr++; dataptr++; error_log(LOG_DEBUG2, thread, "CURL: %s", _log_escape(apr_thread_pool_get(thread), dataptr, effectivelen - (dataptr - data))); } else { error_log(LOG_DEBUG, thread, "CURL: %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); } break; case CURLINFO_HEADER_IN: error_log(LOG_DEBUG, thread, "CURL: HEADER_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); break; case CURLINFO_HEADER_OUT: error_log(LOG_DEBUG, thread, "CURL: HEADER_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); break; case CURLINFO_DATA_IN: error_log(LOG_DEBUG2, thread, "CURL: DATA_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); break; case CURLINFO_DATA_OUT: error_log(LOG_DEBUG2, thread, "CURL: DATA_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); break; default: /* Ignore anything else */ break; } return 0; } /** * Initialise the necessary resources and structures. */ static void logc_init(void) { char errstr[1024]; apr_status_t rc = 0; const char *errptr = NULL; int i, erroffset; /* cURL major, minor and patch version */ short cmaj, cmin, cpat = 0; queue = apr_array_make(pool, 64, sizeof(entry_t *)); if (queue == NULL) { error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG); logc_shutdown(1); } in_progress = apr_hash_make(pool); if (in_progress == NULL) { error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG); logc_shutdown(1); } if ((rc = apr_global_mutex_create(&gmutex, lockfile, APR_LOCK_DEFAULT, pool)) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to create global mutex: %s", apr_strerror(rc, errstr, 1024)); logc_shutdown(1); } if ((rc = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool)) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to create thread mutex: %s", apr_strerror(rc, errstr, 1024)); logc_shutdown(1); } entry_counter = 1; curl_handles = apr_array_make(pool, max_connections, sizeof(CURL *)); if (curl_handles == NULL) { error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG); logc_shutdown(1); } /* Initialise a number of Curl handles. */ for(i = 0; i < max_connections; i++) { CURL *curl = NULL; /* Create cURL handle. */ curl = curl_easy_init(); /* Pre-configure the handle. */ curl_easy_setopt(curl, CURLOPT_UPLOAD, TRUE); curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, (char *)NULL); curl_easy_setopt(curl, CURLOPT_URL, console_uri); curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); if (ssl_validation) { curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1); curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 1); } else { curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, FALSE); curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0); } /* Seems like CURL_SSLVERSION_TLSv1_2 is not supported on libcurl * < v7.34.0 * * version_num is a 24 bit number created like this: * <8 bits major number> | <8 bits minor number> | <8 bits patch number>. */ switch (tlsprotocol) { case 0: curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_0); break; case 1: curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_1); break; case 2: curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); break; default: curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); break; } cmaj = curlversion->version_num >> 16; cmin = (curlversion->version_num & 0x00ff00) >> 8; cpat = (curlversion->version_num & 0x0000ff); /* If cURL version < v7.34.0, use TLS v1.x */ if (cmaj <= 7 && cmin < 34) { curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1); } curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 15); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, TRUE); curl_easy_setopt(curl, CURLOPT_HEADER, TRUE); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_writefunction); *(CURL **)apr_array_push(curl_handles) = curl; } if (cmaj <= 7 && cmin < 34) { error_log(LOG_DEBUG2, NULL, "TLSv1.2 is unsupported in cURL %d.%d.%d", cmaj, cmin, cpat); } logline_regex = pcre_compile(logline_pattern, PCRE_CASELESS, &errptr, &erroffset, NULL); if (logline_regex == NULL) { error_log(LOG_ERROR, NULL, "Failed to compile pattern: %s\n", logline_pattern); logc_shutdown(1); } requestline_regex = pcre_compile(requestline_pattern, PCRE_CASELESS, &errptr, &erroffset, NULL); if (requestline_regex == NULL) { error_log(LOG_ERROR, NULL, "Failed to compile pattern: %s\n", requestline_pattern); logc_shutdown(1); } } /** * HACK: To allow two mlogcs running against a single dataset we use the * mtime as a flag for deletion. * * 1) Check file date. * 2) If it is KEEP_ENTRIES_REMOVE_TIME, then remove the file. * 3) Otherwise set the date and let the other mlogc remove it. */ static void keep_entries_hack(apr_pool_t *mp, apr_thread_t *thread, const char *fn) { apr_file_t *f = NULL; apr_finfo_t finfo; char errstr[1024]; apr_status_t rc; /* Opening for write as required for exclusive lock */ if ((rc = apr_file_open(&f, fn, APR_READ|APR_WRITE|APR_APPEND, APR_OS_DEFAULT, mp)) != APR_SUCCESS) { error_log(LOG_ERROR, thread, "Could not open \"%s\": %s", fn, apr_strerror(rc, errstr, 1024)); return; } if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE|APR_FLOCK_NONBLOCK)) != APR_SUCCESS) { error_log(LOG_DEBUG2, thread, "Waiting for lock on \"%s\": %s", fn, apr_strerror(rc, errstr, 1024)); if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE)) != APR_SUCCESS) { error_log(LOG_ERROR, thread, "Could not lock \"%s\": %s", fn, apr_strerror(rc, errstr, 1024)); apr_file_close(f); return; } } error_log(LOG_DEBUG2, thread, "Locked: %s", fn); /* For testing only */ TEST_WITH_RAND_SLEEP(2); if ((rc = apr_stat(&finfo, fn, APR_FINFO_MIN, mp)) != APR_SUCCESS) { error_log(LOG_ERROR, thread, "Could not stat \"%s\": %s", fn, apr_strerror(rc, errstr, 1024)); error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn); apr_file_close(f); return; } if (error_log_level >= LOG_DEBUG) { error_log(LOG_DEBUG, thread, "STAT \"%s\" {" "uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; " "csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; " "ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", fn, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime); } if (finfo.mtime != KEEP_ENTRIES_REMOVE_TIME) { error_log(LOG_DEBUG2, thread, "Set mtime: %s", fn); if ((rc = apr_file_mtime_set(fn, (apr_time_t)KEEP_ENTRIES_REMOVE_TIME, mp)) != APR_SUCCESS) { error_log(LOG_ERROR, thread, "Could not set mtime on \"%s\": %s", fn, apr_strerror(rc, errstr, 1024)); } error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn); apr_file_close(f); return; } error_log(LOG_DEBUG, thread, "Removing: %s", fn); error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn); apr_file_close(f); apr_file_remove(fn, mp); } /** * Worker thread. Works in a loop, fetching jobs from the queue, * until the queue is empty or it is otherwise told to quit. */ static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data) { unsigned int loop_count = 0; CURL *curl = (CURL *)data; entry_t **entryptr = NULL; entry_t *entry = NULL; apr_status_t rc; apr_finfo_t finfo; int capturevector[CAPTUREVECTORSIZE]; int take_new = 1; apr_pool_t *tpool; struct curl_slist *headerlist = NULL; char curl_error_buffer[CURL_ERROR_SIZE] = ""; int num_requests = 0; /* There is no need to do the sleep if this was an invalid entry * as the sleep is just to protect flooding the console server * with rapid requests. With an invalid entry we never hit the * server, so we should not delay processing the next event. */ int nodelay = 0; error_log(LOG_DEBUG, thread, "Worker thread starting."); /* Each worker uses its own pool to manage memory. To avoid * memory leaks the pool is cleared after each processed * entry. */ apr_pool_create(&tpool, thread_pool); /* Process jobs in a queue until there are no more jobs to process. */ for(;;) { nodelay = 0; /* Do we need to shut down? */ if (running == 0) { error_log(LOG_DEBUG, thread, "We were told to shut down."); goto THREAD_SHUTDOWN; } /* Is there a problem with the server? We need * to shut down if there is. Except that we don't * want to shut down if we were launched to investigate * if the server came back online (loop_count will be * zero in that case). */ if ((server_error == 1)&&(loop_count != 0)) { error_log(LOG_DEBUG, thread, "Shutting down due to server error."); goto THREAD_SHUTDOWN; } loop_count++; /* Get a new entry, but only if we need one. */ if (take_new) { error_log(LOG_DEBUG, thread, "Worker fetch locking thread mutex."); if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) { error_log(LOG_DEBUG, thread, "Worker fetch waiting on thread mutex."); apr_thread_mutex_lock(mutex); } error_log(LOG_DEBUG, thread, "Worker fetch started."); /* Deal with the previous entry. */ if (entry != NULL) { error_log(LOG_DEBUG, thread, "Removing previous entry from storage."); transaction_log(TXOUT, entry->line); /* Remove previous entry from storage. */ apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL); /* Release the memory it used to occupy. */ free((void *)entry->line); free(entry); entry = NULL; } error_log(LOG_DEBUG, thread, "Getting one entry from the queue."); /* Get one entry. */ entryptr = (entry_t **)apr_array_pop(queue); if (entryptr == NULL) { error_log(LOG_DEBUG, thread, "Worker fetch unlocking thread mutex."); apr_thread_mutex_unlock(mutex); error_log(LOG_DEBUG, thread, "No more work for this thread, exiting."); goto THREAD_SHUTDOWN; } entry = *entryptr; apr_hash_set(in_progress, &entry->id, sizeof(entry->id), entry); error_log(LOG_DEBUG, thread, "Worker fetch completed."); error_log(LOG_DEBUG, thread, "Worker fetch unlocking thread mutex."); apr_thread_mutex_unlock(mutex); } /* Send one entry. */ error_log(LOG_DEBUG, thread, "Processing entry."); take_new = 0; /* Keep track of requests processed if we need to */ if (max_worker_requests > 0) { num_requests++; } rc = pcre_exec(logline_regex, NULL, entry->line, entry->line_size, 0, 0, capturevector, CAPTUREVECTORSIZE); if (rc == PCRE_ERROR_NOMATCH) { /* No match. */ error_log(LOG_WARNING, thread, "Invalid entry (failed to match regex): %s", _log_escape(tpool, entry->line, entry->line_size)); take_new = 1; nodelay = 1; } else if (rc < 0) { /* Error condition. */ error_log(LOG_WARNING, thread, "Invalid entry (PCRE error %d): %s", rc, _log_escape(tpool, entry->line, entry->line_size)); take_new = 1; nodelay = 1; } else { /* We have a match. */ char *uniqueid = NULL; char *auditlogentry = NULL; char *hash = NULL; char *summary = NULL; char *credentials = NULL; error_log(LOG_DEBUG, thread, "Regular expression matched."); /* For testing only */ TEST_WITH_RAND_SLEEP(2); uniqueid = apr_psprintf(tpool, "%.*s", (capturevector[2*13+1] - capturevector[2*13]), (entry->line + capturevector[2*13])); auditlogentry = apr_psprintf(tpool, "%s/%.*s", log_repository, (capturevector[2*15+1] - capturevector[2*15]), (entry->line + capturevector[2*15])); hash = apr_psprintf(tpool, "X-Content-Hash: %.*s", (capturevector[2*18+1] - capturevector[2*15]), (entry->line + capturevector[2*18])); summary = apr_psprintf(tpool, "X-ForensicLog-Summary: %s", entry->line); credentials = apr_psprintf(tpool, "%s:%s", sensor_username, sensor_password); rc = apr_stat(&finfo, auditlogentry, APR_FINFO_SIZE, tpool); if (rc == APR_SUCCESS) { FILE *hd_src; char response_buf[STATUSBUF_SIZE]; CURLcode res; if (error_log_level >= LOG_DEBUG) { error_log(LOG_DEBUG, thread, "STAT \"%s\" {" "uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; " "csize=%" APR_OFF_T_FMT "; " "atime=%" APR_TIME_T_FMT "; " "ctime=%" APR_TIME_T_FMT "; " "mtime=%" APR_TIME_T_FMT "}", auditlogentry, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime); } /* Initialize the respone buffer with a hidden value */ response_buf[0] = 0; response_buf[1] = 1; if (finfo.size == 0) { error_log(LOG_WARNING, thread, "File found (%" APR_OFF_T_FMT " bytes), skipping.", finfo.size); take_new = 1; nodelay = 1; goto THREAD_CLEANUP; } else { error_log(LOG_DEBUG, thread, "File found (%" APR_OFF_T_FMT " bytes), activating cURL.", finfo.size); } curl_easy_setopt(curl, CURLOPT_VERBOSE, 1); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_debugfunction); curl_easy_setopt(curl, CURLOPT_DEBUGDATA, thread); curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_error_buffer); curl_easy_setopt(curl, CURLOPT_USERPWD, credentials); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)response_buf); headerlist = curl_slist_append(headerlist, "Expect:"); headerlist = curl_slist_append(headerlist, hash); headerlist = curl_slist_append(headerlist, summary); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist); hd_src = fopen(auditlogentry, "rb"); if (hd_src == NULL) { error_log(LOG_WARNING, thread, "Invalid entry (failed to open file for " "reading): %s", auditlogentry); take_new = 1; nodelay = 1; goto THREAD_CLEANUP; } curl_easy_setopt(curl, CURLOPT_READDATA, hd_src); curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, finfo.size); #ifdef WIN32 /* Mandatory on win32 */ curl_easy_setopt(curl, CURLOPT_READFUNCTION, curl_readfunction); #endif res = curl_easy_perform(curl); fclose(hd_src); if (res == 0) { long response_code = 0; res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code); error_log(LOG_DEBUG, thread, "Request returned with status \"%ld %s\": %s", response_code, response_buf, uniqueid); if (response_code == 0) { /* Assume problem with connection */ error_log(LOG_WARNING, thread, "Flagging server as errored after failure " "to retrieve response code for entry %s " "(cURL code %d): Possible SSL negotiation " "error", uniqueid, res); apr_sleep(1000 * 1000); take_new = 0; server_error = 1; server_error_last_check_time = apr_time_now(); } else if (res != 0) { error_log(LOG_WARNING, thread, "Flagging server as errored after failure " "to retrieve response code for entry %s " "(cURL code %d): %s", uniqueid, res, curl_error_buffer); apr_sleep(1000 * 1000); take_new = 0; server_error = 1; server_error_last_check_time = apr_time_now(); } else { if (response_code == 200) { double total_time, upload_size; if (server_error == 1) { error_log(LOG_NOTICE, thread, "Clearing the server error flag " "after successful entry " "submission: %s", uniqueid); } server_error = 0; server_error_last_check_time = 0; curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME, &total_time); curl_easy_getinfo(curl, CURLINFO_SIZE_UPLOAD, &upload_size); if (!keep_entries) { error_log(LOG_DEBUG, thread, "Removing: %s", auditlogentry); apr_file_remove(auditlogentry, tpool); } else if (keep_entries == KEEP_ENTRIES_REMOVE_HACK) { keep_entries_hack(tpool, thread, auditlogentry); } error_log(LOG_NOTICE, thread, "Entry completed (%.3f seconds, %.0f " "bytes): %s", total_time, upload_size, uniqueid); take_new = 1; } else if (response_code == 409) { /* Assume problem with audit log entry. */ error_log(LOG_WARNING, thread, "Failed to submit entry with " "\"409 %s\": %s", response_buf, uniqueid); take_new = 1; } else { /* Assume problem with server. */ error_log(LOG_WARNING, thread, "Flagging server as errored after " "failure to submit entry %s with " "HTTP response code %ld: %s", uniqueid, response_code, response_buf); server_error = 1; server_error_last_check_time = apr_time_now(); take_new = 0; } } } else { /* Something isn't right. */ error_log(LOG_WARNING, thread, "Flagging server as errored after " "failure to submit entry %s " "(cURL code %d): %s", uniqueid, res, curl_error_buffer); server_error = 1; server_error_last_check_time = apr_time_now(); take_new = 0; } } else { error_log(LOG_WARNING, thread, "Invalid entry (file not found %d): %s", rc, auditlogentry); take_new = 1; nodelay = 1; } /* If we are tracking num_requests, then shutdown if we are * over our threshold. */ if (num_requests && (num_requests >= max_worker_requests)) { error_log(LOG_NOTICE, thread, "Reached max requests (%d) for this worker, exiting.", max_worker_requests); goto THREAD_SHUTDOWN; } } THREAD_CLEANUP: /* Sleep if we sent data to the server so we do not flood */ /* ENH: Need to sleep for 1ms in a loop checking for shutdown */ if ((nodelay == 0) && (transaction_delay > 0)) { error_log(LOG_DEBUG, thread, "Sleeping for %d msec.", transaction_delay); apr_sleep(transaction_delay * 1000); } if (headerlist != NULL) { curl_slist_free_all(headerlist); headerlist = NULL; } apr_pool_clear(tpool); error_log(LOG_DEBUG, thread, "Worker processing completed."); } THREAD_SHUTDOWN: error_log(LOG_DEBUG, thread, "Worker shutdown locking thread mutex."); if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) { error_log(LOG_DEBUG, thread, "Worker shutdown waiting on thread mutex."); apr_thread_mutex_lock(mutex); } /* Deal with the previous entry, if any. */ if (entry != NULL) { apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL); if (take_new == 0) { /* Not done. */ *(entry_t **)apr_array_push(queue) = entry; } else { transaction_log(TXOUT, entry->line); free((void *)entry->line); free(entry); } entry = NULL; } /* Return curl handle to the pool for reuse. */ *(CURL **)apr_array_push(curl_handles) = curl; /* No more work, exit. */ current_workers--; error_log(LOG_DEBUG, thread, "Worker shutdown unlocking thread mutex."); apr_thread_mutex_unlock(mutex); apr_pool_destroy(tpool); error_log(LOG_DEBUG, thread, "Worker thread completed."); apr_thread_exit(thread, 0); return NULL; } /** * Creates one new worker, giving it one of the available * Curl handles to work with. */ static void create_new_worker(int lock) { apr_thread_t *thread = NULL; CURL **curlptr = NULL; if (lock) { error_log(LOG_DEBUG, NULL, "Worker creation locking thread mutex."); if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) { error_log(LOG_DEBUG, NULL, "Worker creation waiting on thread mutex."); apr_thread_mutex_lock(mutex); } } error_log(LOG_DEBUG, NULL, "Worker creation started."); /* A sanity check: this part executes under lock and * we want to make *sure* we don't create more threads * than we are allowed. */ if (current_workers >= max_connections) { if (lock) { error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex."); apr_thread_mutex_unlock(mutex); } return; } /* Cleanup thread pool when idle */ if (current_workers <= 0) { if (thread_pool != NULL) { error_log(LOG_DEBUG, NULL, "Destroying thread_pool."); apr_pool_destroy(thread_pool); thread_pool = NULL; } error_log(LOG_DEBUG, NULL, "Creating thread_pool."); apr_pool_create(&thread_pool, NULL); } curlptr = (CURL **)apr_array_pop(curl_handles); if (curlptr != NULL) { apr_threadattr_t *thread_attrs; apr_status_t rc; apr_threadattr_create(&thread_attrs, thread_pool); apr_threadattr_detach_set(thread_attrs, 1); apr_threadattr_stacksize_set(thread_attrs, 1024); rc = apr_thread_create(&thread, thread_attrs, thread_worker, *curlptr, thread_pool); if (rc != APR_SUCCESS) { if (lock) { error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex."); apr_thread_mutex_unlock(mutex); } error_log(LOG_ERROR, NULL, "Failed to create new worker thread: %d", rc); logc_shutdown(1); } current_workers++; } else { if (lock) { error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex."); apr_thread_mutex_unlock(mutex); } error_log(LOG_ERROR, NULL, "No more cURL handles (Internal Error)."); logc_shutdown(1); } error_log(LOG_DEBUG, NULL, "Worker creation completed: %pp", thread); if (lock) { error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex."); apr_thread_mutex_unlock(mutex); } } /** * This function implements the management thread. */ static void * APR_THREAD_FUNC thread_manager(apr_thread_t *thread, void *data) { apr_time_t last = 0; apr_time_t now = 0; error_log(LOG_DEBUG, thread, "Management thread: Starting."); for(;;) { now = apr_time_now(); /* Should we stop running? */ if (running == 0) { /* We need to be last */ error_log(LOG_DEBUG, thread, "Management thread: Waiting for worker " "threads to finish."); while(current_workers > 0) { apr_sleep(10 * 1000); } if (have_read_data) { error_log(LOG_NOTICE, thread, "Running final transaction checkpoint."); transaction_checkpoint(); } error_log(LOG_DEBUG, thread, "Management thread: Exiting."); management_thread_active = 0; apr_thread_exit(thread, 0); } /* Sleep for a while, but wake up often to check running status */ if ((last > 0) && ((now - last) < MANAGER_SLEEP)) { apr_sleep(MANAGER_SUBSLEEP); continue; } last = now; error_log(LOG_DEBUG2, thread, "Management thread: Processing"); /* When the server is flagged errored we need to * create a worker thread from time to time to * investigate. */ if (server_error) { if ((current_workers == 0)&& (apr_time_sec(now - server_error_last_check_time) > server_error_timeout)) { server_error_last_check_time = now; error_log(LOG_DEBUG, thread, "Management thread: Creating worker thread to " "investigate server."); create_new_worker(1); } } else { if ( (current_workers < max_connections) && (queue->nelts > current_workers) ) { error_log(LOG_DEBUG, thread, "Management thread: Creating worker thread to " "catch up with the queue."); create_new_worker(1); } } /* Initiate a transaction log checkpoint if enough time passed * since the last one. */ if (apr_time_sec(now - checkpoint_time_last) > checkpoint_interval) { error_log(LOG_DEBUG, thread, "Management thread: Initiating a checkpoint " "(previous was %" APR_TIME_T_FMT " seconds ago).", apr_time_sec(now - checkpoint_time_last)); checkpoint_time_last = now; transaction_checkpoint(); } else { error_log(LOG_DEBUG2, thread, "Management thread: Last checkpoint was %" APR_TIME_T_FMT " seconds ago.", apr_time_sec(now - checkpoint_time_last)); } } return NULL; } #ifndef WIN32 /** * Thread to handle all signals */ static void * APR_THREAD_FUNC thread_signals(apr_thread_t *thread, void *data) { apr_status_t rc; error_log(LOG_DEBUG, thread, "Signal thread: Starting."); rc = apr_signal_thread(handle_signals); if (rc != APR_SUCCESS) { error_log(LOG_DEBUG, thread, "Signal thread: Error %d", rc); logc_shutdown(1); } return NULL; } #endif /* WIN32 */ /** * The main loop where we receive log entries from * Apache and add them to the queue, sometimes creating * new worker threads to handle them. */ static void receive_loop(void) { apr_file_t *fd_stdin; apr_size_t nbytes = PIPE_BUF_SIZE; char *buf = apr_palloc(pool, PIPE_BUF_SIZE + 1); char errstr[1024]; apr_size_t evnt = 0; /* Index in buf to first event char */ apr_size_t curr = 0; /* Index in buf to current processing char */ apr_size_t next = 0; /* Index in buf to next unused char */ int done = 0; int drop_next = 0; int buffered_events = 0; int count = 0; apr_pool_t *tmp_pool; /* Open stdin. */ if (apr_file_open_stdin(&fd_stdin, pool) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Unable to open stdin for reading"); logc_shutdown(1); } /* Always want this NUL terminated */ buf[PIPE_BUF_SIZE] = '\0'; apr_pool_create(&tmp_pool, NULL); /* Loop forever receiving entries from stdin. */ while(!done || (curr < next)) { apr_status_t rc; if (error_log_level >= LOG_DEBUG2) { error_log(LOG_DEBUG2, NULL, "Internal state: " "[evnt \"%" APR_SIZE_T_FMT "\"]" "[curr \"%" APR_SIZE_T_FMT "\"]" "[next \"%" APR_SIZE_T_FMT "\"]" "[nbytes \"%" APR_SIZE_T_FMT "\"]", evnt, curr, next, nbytes); } /* If we are not done and have the space, read more */ if (!done && (nbytes > 0)) { buffered_events = 0; nbytes = PIPE_BUF_SIZE - next; rc = apr_file_read(fd_stdin, (buf + next), &nbytes); if (rc != APR_SUCCESS) { if (have_read_data) { error_log(LOG_NOTICE, NULL, "No more data to read, emptying buffer: %s", apr_strerror(rc, errstr, 1024)); } done = 1; } else { have_read_data = 1; if (error_log_level == LOG_DEBUG) { error_log(LOG_DEBUG, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe", nbytes); } else { error_log(LOG_DEBUG2, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe: `%s'", nbytes, _log_escape(tmp_pool, (buf + next), nbytes)); } } next += nbytes; } /** * Each chunk of data we receive can contain one or more lines for * which we need to find the EOL marker and then queue the event * up to that. So, find/queue as many lines in the buffer as we * can. Any remaining data will get shifted back to the beginning * of the buffer and the buffer size for the next read adjusted. */ while(curr < next) { /* Look for EOL so we can parse the event */ while((curr < next) && (buf[curr] != 0x0a)) { curr++; } if (buf[curr] == 0x0a) { buf[curr] = '\0'; /* We may have to drop this one if it previously failed */ if (drop_next) { error_log(LOG_ERROR, NULL, "Dropping remaining portion of failed " "event: `%s'", _log_escape(tmp_pool, (buf + evnt), (curr - evnt))); drop_next = 0; } else { transaction_log(TXIN, buf + evnt); error_log(LOG_DEBUG2, NULL, "Received audit log entry " "(count %lu queue %d workers %d): %s", entry_counter, queue->nelts, current_workers, _log_escape(tmp_pool, (buf + evnt), strlen(buf + evnt))); add_entry(buf + evnt, 1); buffered_events++; } /* Advance indexes to next event in buf */ evnt = curr = curr + 1; } else { error_log(LOG_DEBUG2, NULL, "Event buffer contains partial event: `%s'", _log_escape(tmp_pool, (buf + evnt), (next - evnt))); break; } } if (buffered_events > 0) { error_log(LOG_DEBUG, NULL, "Processed %d entries from buffer.", buffered_events); /* Move the unused portion of the buffer to the beginning */ next -= evnt; curr -= evnt; memmove(buf, (buf + evnt), next); error_log(LOG_DEBUG2, NULL, "Shifted buffer back %" APR_SIZE_T_FMT " and offset %" APR_SIZE_T_FMT " bytes for next read: `%s'", evnt, next, _log_escape(tmp_pool, buf, next)); evnt = 0; } else if (next == PIPE_BUF_SIZE) { /** * There is a chance we could fill the buffer, but not have finished * reading the event (no EOL yet), so we need to say so and drop * all data until we find the end of the event that is too large. */ if (drop_next) { error_log(LOG_ERROR, NULL, "Event continuation too large, " "dropping it as well: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE)); } else { error_log(LOG_ERROR, NULL, "Event too large, dropping event: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE)); } /* Rewind buf and mark that we need to drop up to the next event */ evnt = curr = next = 0; drop_next = 1; } nbytes = PIPE_BUF_SIZE - next; if (count++ > 1000) { count = 0; error_log(LOG_DEBUG, NULL, "Recycling tmp_pool."); apr_pool_destroy(tmp_pool); apr_pool_create(&tmp_pool, NULL); } else { apr_pool_clear(tmp_pool); } } /* Wait for queue to empty if specified */ if ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) { error_log(LOG_NOTICE, NULL, "Waiting for queue to empty (%d active).", queue->nelts); while ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) { apr_sleep(10 * 1000); } if (queue->nelts > 0) { error_log(LOG_ERROR, NULL, "Could not empty queue (%d active).", queue->nelts); } } } /** * Creates the management thread. */ static void start_management_thread(void) { apr_thread_t *thread = NULL; apr_threadattr_t *thread_attrs; apr_status_t rc; apr_threadattr_create(&thread_attrs, pool); apr_threadattr_detach_set(thread_attrs, 1); apr_threadattr_stacksize_set(thread_attrs, 1024); management_thread_active = 1; rc = apr_thread_create(&thread, thread_attrs, thread_manager, NULL, pool); if (rc != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to create new management thread: %d", rc); management_thread_active = 0; logc_shutdown(1); } } #ifndef WIN32 /** * Creates a thread to handle all signals */ static void start_signal_thread(void) { apr_thread_t *thread = NULL; apr_threadattr_t *thread_attrs; apr_status_t rc; apr_threadattr_create(&thread_attrs, pool); apr_threadattr_detach_set(thread_attrs, 1); apr_threadattr_stacksize_set(thread_attrs, 1024); rc = apr_thread_create(&thread, thread_attrs, thread_signals, NULL, pool); if (rc != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to create new signal thread: %d", rc); logc_shutdown(1); } } #endif /* WIN32 */ /** * Usage text. */ static void usage(void) { fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION); fprintf(stderr, " Usage: mlogc [options] /path/to/the/mlogc.conf\n"); fprintf(stderr, "\n"); fprintf(stderr, " Options:\n"); fprintf(stderr, " -f Force depletion of queue on exit\n"); fprintf(stderr, " -v Version information\n"); fprintf(stderr, " -h This help\n\n"); } /** * Version text. */ static void version(void) { fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION); fprintf(stderr, " APR: compiled=\"%s\"; " "loaded=\"%s\"\n", APR_VERSION_STRING, apr_version_string()); fprintf(stderr, " PCRE: compiled=\"%d.%d\"; " "loaded=\"%s\"\n", PCRE_MAJOR, PCRE_MINOR, pcre_version()); fprintf(stderr, " cURL: compiled=\"%s\"; " "loaded=\"%s\"\n", LIBCURL_VERSION, curl_version()); fprintf(stderr, "\n"); } /** * This is the main entry point. */ int main(int argc, const char * const argv[]) { apr_getopt_t *opt; apr_status_t rc; apr_app_initialize(&argc, &argv, NULL); atexit(apr_terminate); curl_global_init(CURL_GLOBAL_ALL); atexit(logc_cleanup); logc_pid = getpid(); apr_pool_create(&pool, NULL); apr_pool_create(&recv_pool, NULL); #ifndef WIN32 apr_setup_signal_thread(); #else apr_signal(SIGINT, handle_signals); apr_signal(SIGTERM, handle_signals); #endif /* WIN32 */ if (argc < 2) { usage(); logc_shutdown(1); } /* Commandline opts */ rc = apr_getopt_init(&opt, pool, argc, argv); if (rc != APR_SUCCESS) { usage(); logc_shutdown(1); } do { char ch; const char *val; rc = apr_getopt(opt, CMDLINE_OPTS, &ch, &val); switch (rc) { case APR_SUCCESS: switch (ch) { case 'f': opt_force = 1; break; case 'v': version(); logc_shutdown(0); case 'h': usage(); logc_shutdown(0); } break; case APR_BADCH: case APR_BADARG: usage(); logc_shutdown(1); } } while (rc != APR_EOF); /* Conf file is last */ conffile = argv[argc - 1]; read_configuration(); init_configuration(); logc_init(); transaction_log_init(); running = 1; server_error = 0; start_management_thread(); #ifndef WIN32 start_signal_thread(); #endif /* WIN32 */ /* Process stdin until EOF */ receive_loop(); logc_shutdown(0); return 0; }