From b8790b1b48c238f2ba266e34625b8e8f5db0ad6e Mon Sep 17 00:00:00 2001 From: Edward Rudd Date: Sat, 14 Mar 2009 22:07:56 +0000 Subject: refactoroed to allo wdb connections to be per-thread added initial threading implementation using apr_queues and apr_thread_pools --- diff --git a/utility/config.h b/utility/config.h index 91c6f65..a9bd1b5 100644 --- a/utility/config.h +++ b/utility/config.h @@ -58,7 +58,6 @@ struct config_t { /** db connection configuration */ const char *dbdriver; const char *dbparams; - config_dbd_t *dbconn; /** Logging table */ const char *table; diff --git a/utility/database.c b/utility/database.c index 98c203b..ff81caa 100644 --- a/utility/database.c +++ b/utility/database.c @@ -19,15 +19,16 @@ void database_init(apr_pool_t *p) apr_dbd_init(p); } -apr_status_t database_connect(config_t *cfg) +/** @todo split this into load and connect */ +apr_status_t database_connect(config_t *cfg, config_dbd_t **dbconn) { apr_status_t rv; if (!cfg->dbdriver || !cfg->dbparams) return APR_EINVAL; - if (!cfg->dbconn) { - cfg->dbconn = apr_pcalloc(cfg->pool, sizeof(config_dbd_t)); + if (!*dbconn) { + *dbconn = apr_pcalloc(cfg->pool, sizeof(config_dbd_t)); } - rv = apr_dbd_get_driver(cfg->pool, cfg->dbdriver, &(cfg->dbconn->driver)); + rv = apr_dbd_get_driver(cfg->pool, cfg->dbdriver, &((*dbconn)->driver)); if (rv) { logging_log(cfg, LOGLEVEL_ERROR, @@ -36,8 +37,8 @@ apr_status_t database_connect(config_t *cfg) return rv; } - rv = apr_dbd_open(cfg->dbconn->driver, cfg->pool, cfg->dbparams, - &(cfg->dbconn->dbd)); + rv = apr_dbd_open((*dbconn)->driver, cfg->pool, cfg->dbparams, + &((*dbconn)->dbd)); if (rv) { logging_log(cfg, LOGLEVEL_ERROR, "DB: Could not connect to database. Error (%d)%s", rv, @@ -48,12 +49,13 @@ apr_status_t database_connect(config_t *cfg) return APR_SUCCESS; } -apr_status_t database_disconnect(config_t *cfg) +apr_status_t database_disconnect(config_dbd_t *dbconn) { - return apr_dbd_close(cfg->dbconn->driver, cfg->dbconn->dbd); + return apr_dbd_close(dbconn->driver, dbconn->dbd); } -static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, apr_pool_t *p) +static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, + config_dbd_t *dbconn, apr_pool_t *p) { apr_status_t rv; char *sql; @@ -93,19 +95,20 @@ static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, apr_pool_t *p) logging_log(cfg, LOGLEVEL_DEBUG, "DB: Generated SQL: %s", sql); - rv = apr_dbd_prepare(cfg->dbconn->driver, cfg->pool, cfg->dbconn->dbd, sql, + rv = apr_dbd_prepare(dbconn->driver, cfg->pool, dbconn->dbd, sql, "INSERT", &stmt); if (rv) { logging_log(cfg, LOGLEVEL_NOISE, "DB: Unable to Prepare SQL insert: %s", apr_dbd_error( - cfg->dbconn->driver, cfg->dbconn->dbd, rv)); + dbconn->driver, dbconn->dbd, rv)); return NULL; } return stmt; } -apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data) +apr_status_t database_insert(config_t *cfg, config_dbd_t *dbconn, + apr_pool_t *p, apr_table_t *data) { apr_status_t rv; int f, nfs; @@ -113,93 +116,95 @@ apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data) ofields = (config_output_field_t *)cfg->output_fields->elts; nfs = cfg->output_fields->nelts; // Prepare statement - if (!cfg->dbconn->stmt) { - cfg->dbconn->stmt = database_prepare_insert(cfg, p); - if (!cfg->dbconn->stmt) { + if (!dbconn->stmt) { + dbconn->stmt = database_prepare_insert(cfg, dbconn, p); + if (!dbconn->stmt) { return APR_EINVAL; } - cfg->dbconn->args = apr_palloc(cfg->pool, nfs * sizeof(char *)); + dbconn->args = apr_palloc(cfg->pool, nfs * sizeof(char *)); } for (f=0; fdbconn->args[f] = apr_table_get(data, ofields[f].field); + dbconn->args[f] = apr_table_get(data, ofields[f].field); } - rv = apr_dbd_pquery(cfg->dbconn->driver, p, cfg->dbconn->dbd, &f, - cfg->dbconn->stmt, nfs, cfg->dbconn->args); + rv = apr_dbd_pquery(dbconn->driver, p, dbconn->dbd, &f, + dbconn->stmt, nfs, dbconn->args); if (rv) { logging_log(cfg, LOGLEVEL_ERROR, "DB: Unable to Insert SQL: %s", - apr_dbd_error(cfg->dbconn->driver, cfg->dbconn->dbd, rv)); + apr_dbd_error(dbconn->driver, dbconn->dbd, rv)); return rv; } return APR_SUCCESS; } -apr_status_t database_trans_start(config_t *cfg, apr_pool_t *p) +apr_status_t database_trans_start(config_t *cfg, config_dbd_t *dbconn, + apr_pool_t *p) { #if HAVE_APR_DBD_TRANSACTION_MODE_GET apr_status_t rv; if (!cfg->transactions) return APR_SUCCESS; - if (cfg->dbconn->txn) { + if (dbconn->txn) { logging_log(cfg, LOGLEVEL_NOISE, "Transaction Already Started. Something is BROKE"); return APR_EINVAL; } logging_log(cfg, LOGLEVEL_DEBUG, "DB: Starting Transaction"); - rv = apr_dbd_transaction_start(cfg->dbconn->driver, p, cfg->dbconn->dbd, - &cfg->dbconn->txn); + rv = apr_dbd_transaction_start(dbconn->driver, p, dbconn->dbd, + &dbconn->txn); if (rv) logging_log(cfg, LOGLEVEL_NOISE, "DB: Error Starting Transaction: (%d)%s", rv, apr_dbd_error( - cfg->dbconn->driver, cfg->dbconn->dbd, rv)); + dbconn->driver, dbconn->dbd, rv)); return rv; #else return APR_SUCCESS; #endif } -apr_status_t database_trans_stop(config_t *cfg, apr_pool_t *p) +apr_status_t database_trans_stop(config_t *cfg, config_dbd_t *dbconn, + apr_pool_t *p) { #if HAVE_APR_DBD_TRANSACTION_MODE_GET apr_status_t rv; if (!cfg->transactions) return APR_SUCCESS; - if (!cfg->dbconn->txn) { + if (!dbconn->txn) { logging_log(cfg, LOGLEVEL_NOISE, "No Transaction Started. Something is BROKE"); return APR_EINVAL; } logging_log(cfg, LOGLEVEL_DEBUG, "DB: Stopping Transaction"); - rv = apr_dbd_transaction_end(cfg->dbconn->driver, p, cfg->dbconn->txn); + rv = apr_dbd_transaction_end(dbconn->driver, p, dbconn->txn); if (rv) logging_log(cfg, LOGLEVEL_NOISE, "DB: Error Stopping Transaction: (%d)%s", rv, apr_dbd_error( - cfg->dbconn->driver, cfg->dbconn->dbd, rv)); + dbconn->driver, dbconn->dbd, rv)); - cfg->dbconn->txn = NULL; + dbconn->txn = NULL; return rv; #else return APR_SUCCESS; #endif } -apr_status_t database_trans_abort(config_t *cfg) +apr_status_t database_trans_abort(config_t *cfg, config_dbd_t *dbconn) { #if HAVE_APR_DBD_TRANSACTION_MODE_GET apr_status_t rv; if (!cfg->transactions) return APR_SUCCESS; - if (!cfg->dbconn->txn) { + if (!dbconn->txn) { logging_log(cfg, LOGLEVEL_NOISE, "No Transaction Started. Something is BROKE"); return APR_EINVAL; } logging_log(cfg, LOGLEVEL_NOTICE, "DB: Aborting Transaction"); - rv = apr_dbd_transaction_mode_set(cfg->dbconn->driver, cfg->dbconn->txn, + rv = apr_dbd_transaction_mode_set(dbconn->driver, dbconn->txn, APR_DBD_TRANSACTION_ROLLBACK); if (rv) logging_log(cfg, LOGLEVEL_NOISE, "DB: Error Aborting Transaction: (%d)%s", rv, apr_dbd_error( - cfg->dbconn->driver, cfg->dbconn->dbd, rv)); + dbconn->driver, dbconn->dbd, rv)); return rv; #else return APR_SUCCESS; diff --git a/utility/database.h b/utility/database.h index eed8898..9797d51 100644 --- a/utility/database.h +++ b/utility/database.h @@ -7,16 +7,19 @@ void database_init(apr_pool_t *p); -apr_status_t database_connect(config_t *cfg); +apr_status_t database_connect(config_t *cfg, config_dbd_t **dbconn); -apr_status_t database_disconnect(config_t *cfg); +apr_status_t database_disconnect(config_dbd_t *dbconn); -apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data); +apr_status_t database_insert(config_t *cfg, config_dbd_t *dbconn, + apr_pool_t *p, apr_table_t *data); -apr_status_t database_trans_start(config_t *cfg, apr_pool_t *p); +apr_status_t database_trans_start(config_t *cfg, config_dbd_t *dbconn, + apr_pool_t *p); -apr_status_t database_trans_stop(config_t *cfg, apr_pool_t *p); +apr_status_t database_trans_stop(config_t *cfg, config_dbd_t *dbconn, + apr_pool_t *p); -apr_status_t database_trans_abort(config_t *cfg); +apr_status_t database_trans_abort(config_t *cfg, config_dbd_t *dbconn); #endif /*DATABASE_H_*/ diff --git a/utility/logparse.c b/utility/logparse.c index 7267682..1b3cc97 100644 --- a/utility/logparse.c +++ b/utility/logparse.c @@ -266,14 +266,12 @@ void parser_split_logs(config_t *cfg) ++linecount; } } - printf("Lines %'d\n",linecount); // now we know how long it is. Lets split up the file piecesize = linecount / cfg->split_count; if (piecesize < cfg->split_minimum) piecesize = cfg->split_minimum; if (piecesize > cfg->split_maximum && cfg->split_maximum > 0) piecesize = cfg->split_maximum; - printf("Piece size %'d\n", piecesize); if (piecesize > linecount) { // File is smaller than piece size just add it back in as is newfile = (config_filestat_t *)apr_array_push(cfg->input_files); @@ -296,7 +294,6 @@ void parser_split_logs(config_t *cfg) basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname))); file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); - printf("Out file %s\n", file); logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); if (rv != APR_SUCCESS) { @@ -343,7 +340,6 @@ void parser_split_logs(config_t *cfg) out_lines = 0; // Open new file file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); - printf("Out file %s\n", file); logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); if (rv != APR_SUCCESS) { @@ -548,7 +544,8 @@ apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, return APR_SUCCESS; } -apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) +apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn, + config_filestat_t *fstat) { apr_pool_t *tp, *targp; apr_file_t *file; @@ -573,7 +570,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) fstat->linesparsed = 0; // Start Transaction fstat->start = apr_time_now(); - if (!cfg->dryrun && database_trans_start(cfg,tp)) { + if (!cfg->dryrun && database_trans_start(cfg, dbconn, tp)) { fstat->result = "Database Transaction Error"; fstat->stop = apr_time_now(); return rv; @@ -614,14 +611,14 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) targc = 0; while (targv[targc]) targc++; - rv = parser_processline(targp, cfg, fstat, targv, targc); + rv = parser_processline(targp, cfg, dbconn, fstat, targv, targc); if (rv != APR_SUCCESS) { int i; fstat->linesbad++; rv = parser_logbadline(cfg, fstat->fname, buff); if (rv) { - if (!cfg->dryrun) database_trans_abort(cfg); + if (!cfg->dryrun) database_trans_abort(cfg, dbconn); logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed, targc, buff); for (i = 0; targv[i]; i++) { @@ -637,7 +634,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) } while (rv == APR_SUCCESS); apr_file_close(file); // Finish Transaction - if (!cfg->dryrun && database_trans_stop(cfg,tp)) { + if (!cfg->dryrun && database_trans_stop(cfg, dbconn, tp)) { fstat->result = apr_psprintf(cfg->pool, "Input line %d, Database Transaction Error", fstat->linesparsed); @@ -655,7 +652,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) } apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, - config_filestat_t *fstat, char **argv, int argc) + config_dbd_t *dbconn, config_filestat_t *fstat, char **argv, int argc) { config_logformat_t *fmt; config_logformat_field_t *ifields; @@ -760,7 +757,7 @@ apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, // Process DB Query if (!cfg->dryrun) { - rv = database_insert(cfg, ptemp, dataout); + rv = database_insert(cfg, dbconn, ptemp, dataout); if (rv) { fstat->result = apr_psprintf(cfg->pool, "Input line %d, Database Error", diff --git a/utility/logparse.h b/utility/logparse.h index cd085b5..7ca0958 100644 --- a/utility/logparse.h +++ b/utility/logparse.h @@ -28,9 +28,10 @@ void parser_split_logs(config_t *cfg); apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, apr_pool_t *token_context); -apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat); +apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn, + config_filestat_t *fstat); apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, - config_filestat_t *line, char **argv, int argc); + config_dbd_t *dbconn, config_filestat_t *line, char **argv, int argc); #endif /*LOGPARSE_H_*/ diff --git a/utility/shell.c b/utility/shell.c index 2f2f43b..0fea741 100644 --- a/utility/shell.c +++ b/utility/shell.c @@ -12,6 +12,17 @@ #include "database.h" #include "util.h" +#if APR_HAS_THREADS +#include "apr_queue.h" +#include "apr_thread_pool.h" + +static apr_queue_t *queue; + +void run_multithreaded(config_t *cfg); +#endif + +void run_singlethreaded(config_t *cfg); + const apr_getopt_option_t _opt_config[] = { {"machineid", 'm', 1, "Machine ID for the log file"}, {"transaction", 't', 1, "Use a Transaction (yes,no)"}, @@ -23,6 +34,7 @@ const apr_getopt_option_t _opt_config[] = { {"dump", 'd', 0, "Dump the configuration after parsing and quit"}, {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"}, {"summary", 's', 1, "Summary (yes,no)"}, + {"threadcount", 'p', 1, "Set thread count (a number greater than 0)"}, {"help", 'h', 0, "Show Help"}, {NULL} }; @@ -58,25 +70,45 @@ void show_help(const char *prog, const apr_getopt_option_t *opts, FILE *output) void print_summary(config_t *cfg) { config_filestat_t *fstat; int i,m; + apr_time_t totaltime = 0; + apr_size_t totalparsed = 0, totalskipped = 0, totalbad = 0; fstat = (config_filestat_t *)cfg->input_files->elts; printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); for (i=0, m=cfg->input_files->nelts; iinput_files)) { parser_find_logs(cfg); } - if (!cfg->dryrun) { - if ((rv = database_connect(cfg))) { - logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); - exit(1); - } - } if (!apr_is_empty_array(cfg->input_files)) { parser_split_logs(cfg); - config_filestat_t *filelist; - int f, l; - filelist = (config_filestat_t *)cfg->input_files->elts; - for (f=0, l=cfg->input_files->nelts; f < l; f++) { - rv = parser_parsefile(cfg, &filelist[f]); - if (rv) { - logging_log(cfg, LOGLEVEL_NOISE, - "Error occured parsing log files. Aborting"); - break; - } +#if APR_HAS_THREADS + if (cfg->thread_count > 1) { + run_multithreaded(cfg); + } else { +#endif + run_singlethreaded(cfg); +#if APR_HAS_THREADS } +#endif } else { logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse"); } - if (!cfg->dryrun) { - database_disconnect(cfg); - } if (cfg->summary) { print_summary(cfg); } return 0; } + +void run_singlethreaded(config_t *cfg) +{ + config_filestat_t *filelist; + config_dbd_t *dbconn = NULL; + int f, l; + apr_status_t rv; + + if (!cfg->dryrun) { + if ((rv = database_connect(cfg, &dbconn))) { + logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); + exit(1); + } + } + + filelist = (config_filestat_t *)cfg->input_files->elts; + for (f=0, l=cfg->input_files->nelts; f < l; f++) { + rv = parser_parsefile(cfg, dbconn, &filelist[f]); + if (rv) { + logging_log(cfg, LOGLEVEL_NOISE, + "Error occured parsing log files. Aborting"); + break; + } + } + + if (!cfg->dryrun) { + database_disconnect(dbconn); + } +} + +#if APR_HAS_THREADS +void * APR_THREAD_FUNC run_filethread(apr_thread_t *thd, void *data) +{ + config_t *cfg = data; + config_dbd_t *dbconn = NULL; + config_filestat_t *fileentry; + apr_status_t rv; + + if (!cfg->dryrun) { + if ((rv = database_connect(cfg, &dbconn))) { + logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); + exit(1); + } + } + + while (1) { + rv = apr_queue_pop(queue, (void **)&fileentry); + if (rv == APR_EINTR) + continue; + if (rv == APR_EOF) + break; + rv = parser_parsefile(cfg, dbconn, fileentry); + if (rv) { + logging_log(cfg, LOGLEVEL_NOISE, + "Error occured parsing log file %s", fileentry->fname); + } + } + + if (!cfg->dryrun) { + database_disconnect(dbconn); + } + return NULL; +} + +void run_multithreaded(config_t *cfg) +{ + logging_log(cfg, LOGLEVEL_NOISE, "Running Multithreaded"); + + config_filestat_t *filelist; + int f, l; + apr_status_t rv; + apr_pool_t *tp; + apr_thread_pool_t *thrp; + unsigned int count; + + apr_pool_create(&tp, cfg->pool); + rv = apr_queue_create(&queue, cfg->input_files->nelts, tp); + + rv = apr_thread_pool_create(&thrp, 0, cfg->thread_count, tp); + + //populate queue + filelist = (config_filestat_t *)cfg->input_files->elts; + for (f=0, l=cfg->input_files->nelts; f < l; f++) { + rv = apr_queue_push(queue, &filelist[f]); + } + // populate the worker threads + for (f=0; fthread_count; f++) { + rv = apr_thread_pool_push(thrp, run_filethread, cfg, 0, NULL); + } + + do { + apr_sleep(apr_time_from_sec(1)); + count = apr_queue_size(queue); + } while (count > 0); + + rv = apr_queue_term(queue); + + rv = apr_thread_pool_destroy(thrp); +} +#endif -- cgit v0.9.2