From b8790b1b48c238f2ba266e34625b8e8f5db0ad6e Mon Sep 17 00:00:00 2001 From: Edward Rudd Date: Sat, 14 Mar 2009 22:07:56 +0000 Subject: refactoroed to allo wdb connections to be per-thread added initial threading implementation using apr_queues and apr_thread_pools --- (limited to 'utility/shell.c') diff --git a/utility/shell.c b/utility/shell.c index 2f2f43b..0fea741 100644 --- a/utility/shell.c +++ b/utility/shell.c @@ -12,6 +12,17 @@ #include "database.h" #include "util.h" +#if APR_HAS_THREADS +#include "apr_queue.h" +#include "apr_thread_pool.h" + +static apr_queue_t *queue; + +void run_multithreaded(config_t *cfg); +#endif + +void run_singlethreaded(config_t *cfg); + const apr_getopt_option_t _opt_config[] = { {"machineid", 'm', 1, "Machine ID for the log file"}, {"transaction", 't', 1, "Use a Transaction (yes,no)"}, @@ -23,6 +34,7 @@ const apr_getopt_option_t _opt_config[] = { {"dump", 'd', 0, "Dump the configuration after parsing and quit"}, {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"}, {"summary", 's', 1, "Summary (yes,no)"}, + {"threadcount", 'p', 1, "Set thread count (a number greater than 0)"}, {"help", 'h', 0, "Show Help"}, {NULL} }; @@ -58,25 +70,45 @@ void show_help(const char *prog, const apr_getopt_option_t *opts, FILE *output) void print_summary(config_t *cfg) { config_filestat_t *fstat; int i,m; + apr_time_t totaltime = 0; + apr_size_t totalparsed = 0, totalskipped = 0, totalbad = 0; fstat = (config_filestat_t *)cfg->input_files->elts; printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); for (i=0, m=cfg->input_files->nelts; iinput_files)) { parser_find_logs(cfg); } - if (!cfg->dryrun) { - if ((rv = database_connect(cfg))) { - logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); - exit(1); - } - } if (!apr_is_empty_array(cfg->input_files)) { parser_split_logs(cfg); - config_filestat_t *filelist; - int f, l; - filelist = (config_filestat_t *)cfg->input_files->elts; - for (f=0, l=cfg->input_files->nelts; f < l; f++) { - rv = parser_parsefile(cfg, &filelist[f]); - if (rv) { - logging_log(cfg, LOGLEVEL_NOISE, - "Error occured parsing log files. Aborting"); - break; - } +#if APR_HAS_THREADS + if (cfg->thread_count > 1) { + run_multithreaded(cfg); + } else { +#endif + run_singlethreaded(cfg); +#if APR_HAS_THREADS } +#endif } else { logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse"); } - if (!cfg->dryrun) { - database_disconnect(cfg); - } if (cfg->summary) { print_summary(cfg); } return 0; } + +void run_singlethreaded(config_t *cfg) +{ + config_filestat_t *filelist; + config_dbd_t *dbconn = NULL; + int f, l; + apr_status_t rv; + + if (!cfg->dryrun) { + if ((rv = database_connect(cfg, &dbconn))) { + logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); + exit(1); + } + } + + filelist = (config_filestat_t *)cfg->input_files->elts; + for (f=0, l=cfg->input_files->nelts; f < l; f++) { + rv = parser_parsefile(cfg, dbconn, &filelist[f]); + if (rv) { + logging_log(cfg, LOGLEVEL_NOISE, + "Error occured parsing log files. Aborting"); + break; + } + } + + if (!cfg->dryrun) { + database_disconnect(dbconn); + } +} + +#if APR_HAS_THREADS +void * APR_THREAD_FUNC run_filethread(apr_thread_t *thd, void *data) +{ + config_t *cfg = data; + config_dbd_t *dbconn = NULL; + config_filestat_t *fileentry; + apr_status_t rv; + + if (!cfg->dryrun) { + if ((rv = database_connect(cfg, &dbconn))) { + logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); + exit(1); + } + } + + while (1) { + rv = apr_queue_pop(queue, (void **)&fileentry); + if (rv == APR_EINTR) + continue; + if (rv == APR_EOF) + break; + rv = parser_parsefile(cfg, dbconn, fileentry); + if (rv) { + logging_log(cfg, LOGLEVEL_NOISE, + "Error occured parsing log file %s", fileentry->fname); + } + } + + if (!cfg->dryrun) { + database_disconnect(dbconn); + } + return NULL; +} + +void run_multithreaded(config_t *cfg) +{ + logging_log(cfg, LOGLEVEL_NOISE, "Running Multithreaded"); + + config_filestat_t *filelist; + int f, l; + apr_status_t rv; + apr_pool_t *tp; + apr_thread_pool_t *thrp; + unsigned int count; + + apr_pool_create(&tp, cfg->pool); + rv = apr_queue_create(&queue, cfg->input_files->nelts, tp); + + rv = apr_thread_pool_create(&thrp, 0, cfg->thread_count, tp); + + //populate queue + filelist = (config_filestat_t *)cfg->input_files->elts; + for (f=0, l=cfg->input_files->nelts; f < l; f++) { + rv = apr_queue_push(queue, &filelist[f]); + } + // populate the worker threads + for (f=0; fthread_count; f++) { + rv = apr_thread_pool_push(thrp, run_filethread, cfg, 0, NULL); + } + + do { + apr_sleep(apr_time_from_sec(1)); + count = apr_queue_size(queue); + } while (count > 0); + + rv = apr_queue_term(queue); + + rv = apr_thread_pool_destroy(thrp); +} +#endif -- cgit v0.9.2