summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Edward Rudd 2009-03-14 22:07:56 +0000
committerGravatar Edward Rudd 2009-03-14 22:07:56 +0000
commitb8790b1b48c238f2ba266e34625b8e8f5db0ad6e (patch)
treea9c7b08bc4ed3ee59aaf574f5f1738efa915fadc
parent99867e8a2eca4421075900e44f24cfd749db7dcb (diff)
refactoroed to allo wdb connections to be per-thread
added initial threading implementation using apr_queues and apr_thread_pools
-rw-r--r--utility/config.h1
-rw-r--r--utility/database.c73
-rw-r--r--utility/database.h15
-rw-r--r--utility/logparse.c19
-rw-r--r--utility/logparse.h5
-rw-r--r--utility/shell.c166
6 files changed, 204 insertions, 75 deletions
diff --git a/utility/config.h b/utility/config.h
index 91c6f65..a9bd1b5 100644
--- a/utility/config.h
+++ b/utility/config.h
@@ -58,7 +58,6 @@ struct config_t {
58 /** db connection configuration */ 58 /** db connection configuration */
59 const char *dbdriver; 59 const char *dbdriver;
60 const char *dbparams; 60 const char *dbparams;
61 config_dbd_t *dbconn;
62 61
63 /** Logging table */ 62 /** Logging table */
64 const char *table; 63 const char *table;
diff --git a/utility/database.c b/utility/database.c
index 98c203b..ff81caa 100644
--- a/utility/database.c
+++ b/utility/database.c
@@ -19,15 +19,16 @@ void database_init(apr_pool_t *p)
19 apr_dbd_init(p); 19 apr_dbd_init(p);
20} 20}
21 21
22apr_status_t database_connect(config_t *cfg) 22/** @todo split this into load and connect */
23apr_status_t database_connect(config_t *cfg, config_dbd_t **dbconn)
23{ 24{
24 apr_status_t rv; 25 apr_status_t rv;
25 if (!cfg->dbdriver || !cfg->dbparams) 26 if (!cfg->dbdriver || !cfg->dbparams)
26 return APR_EINVAL; 27 return APR_EINVAL;
27 if (!cfg->dbconn) { 28 if (!*dbconn) {
28 cfg->dbconn = apr_pcalloc(cfg->pool, sizeof(config_dbd_t)); 29 *dbconn = apr_pcalloc(cfg->pool, sizeof(config_dbd_t));
29 } 30 }
30 rv = apr_dbd_get_driver(cfg->pool, cfg->dbdriver, &(cfg->dbconn->driver)); 31 rv = apr_dbd_get_driver(cfg->pool, cfg->dbdriver, &((*dbconn)->driver));
31 if (rv) { 32 if (rv) {
32 33
33 logging_log(cfg, LOGLEVEL_ERROR, 34 logging_log(cfg, LOGLEVEL_ERROR,
@@ -36,8 +37,8 @@ apr_status_t database_connect(config_t *cfg)
36 return rv; 37 return rv;
37 } 38 }
38 39
39 rv = apr_dbd_open(cfg->dbconn->driver, cfg->pool, cfg->dbparams, 40 rv = apr_dbd_open((*dbconn)->driver, cfg->pool, cfg->dbparams,
40 &(cfg->dbconn->dbd)); 41 &((*dbconn)->dbd));
41 if (rv) { 42 if (rv) {
42 logging_log(cfg, LOGLEVEL_ERROR, 43 logging_log(cfg, LOGLEVEL_ERROR,
43 "DB: Could not connect to database. Error (%d)%s", rv, 44 "DB: Could not connect to database. Error (%d)%s", rv,
@@ -48,12 +49,13 @@ apr_status_t database_connect(config_t *cfg)
48 return APR_SUCCESS; 49 return APR_SUCCESS;
49} 50}
50 51
51apr_status_t database_disconnect(config_t *cfg) 52apr_status_t database_disconnect(config_dbd_t *dbconn)
52{ 53{
53 return apr_dbd_close(cfg->dbconn->driver, cfg->dbconn->dbd); 54 return apr_dbd_close(dbconn->driver, dbconn->dbd);
54} 55}
55 56
56static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, apr_pool_t *p) 57static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg,
58 config_dbd_t *dbconn, apr_pool_t *p)
57{ 59{
58 apr_status_t rv; 60 apr_status_t rv;
59 char *sql; 61 char *sql;
@@ -93,19 +95,20 @@ static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, apr_pool_t *p)
93 95
94 logging_log(cfg, LOGLEVEL_DEBUG, "DB: Generated SQL: %s", sql); 96 logging_log(cfg, LOGLEVEL_DEBUG, "DB: Generated SQL: %s", sql);
95 97
96 rv = apr_dbd_prepare(cfg->dbconn->driver, cfg->pool, cfg->dbconn->dbd, sql, 98 rv = apr_dbd_prepare(dbconn->driver, cfg->pool, dbconn->dbd, sql,
97 "INSERT", &stmt); 99 "INSERT", &stmt);
98 100
99 if (rv) { 101 if (rv) {
100 logging_log(cfg, LOGLEVEL_NOISE, 102 logging_log(cfg, LOGLEVEL_NOISE,
101 "DB: Unable to Prepare SQL insert: %s", apr_dbd_error( 103 "DB: Unable to Prepare SQL insert: %s", apr_dbd_error(
102 cfg->dbconn->driver, cfg->dbconn->dbd, rv)); 104 dbconn->driver, dbconn->dbd, rv));
103 return NULL; 105 return NULL;
104 } 106 }
105 return stmt; 107 return stmt;
106} 108}
107 109
108apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data) 110apr_status_t database_insert(config_t *cfg, config_dbd_t *dbconn,
111 apr_pool_t *p, apr_table_t *data)
109{ 112{
110 apr_status_t rv; 113 apr_status_t rv;
111 int f, nfs; 114 int f, nfs;
@@ -113,93 +116,95 @@ apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data)
113 ofields = (config_output_field_t *)cfg->output_fields->elts; 116 ofields = (config_output_field_t *)cfg->output_fields->elts;
114 nfs = cfg->output_fields->nelts; 117 nfs = cfg->output_fields->nelts;
115 // Prepare statement 118 // Prepare statement
116 if (!cfg->dbconn->stmt) { 119 if (!dbconn->stmt) {
117 cfg->dbconn->stmt = database_prepare_insert(cfg, p); 120 dbconn->stmt = database_prepare_insert(cfg, dbconn, p);
118 if (!cfg->dbconn->stmt) { 121 if (!dbconn->stmt) {
119 return APR_EINVAL; 122 return APR_EINVAL;
120 } 123 }
121 cfg->dbconn->args = apr_palloc(cfg->pool, nfs * sizeof(char *)); 124 dbconn->args = apr_palloc(cfg->pool, nfs * sizeof(char *));
122 } 125 }
123 for (f=0; f<nfs; f++) { 126 for (f=0; f<nfs; f++) {
124 cfg->dbconn->args[f] = apr_table_get(data, ofields[f].field); 127 dbconn->args[f] = apr_table_get(data, ofields[f].field);
125 } 128 }
126 rv = apr_dbd_pquery(cfg->dbconn->driver, p, cfg->dbconn->dbd, &f, 129 rv = apr_dbd_pquery(dbconn->driver, p, dbconn->dbd, &f,
127 cfg->dbconn->stmt, nfs, cfg->dbconn->args); 130 dbconn->stmt, nfs, dbconn->args);
128 if (rv) { 131 if (rv) {
129 logging_log(cfg, LOGLEVEL_ERROR, "DB: Unable to Insert SQL: %s", 132 logging_log(cfg, LOGLEVEL_ERROR, "DB: Unable to Insert SQL: %s",
130 apr_dbd_error(cfg->dbconn->driver, cfg->dbconn->dbd, rv)); 133 apr_dbd_error(dbconn->driver, dbconn->dbd, rv));
131 return rv; 134 return rv;
132 } 135 }
133 return APR_SUCCESS; 136 return APR_SUCCESS;
134} 137}
135 138
136apr_status_t database_trans_start(config_t *cfg, apr_pool_t *p) 139apr_status_t database_trans_start(config_t *cfg, config_dbd_t *dbconn,
140 apr_pool_t *p)
137{ 141{
138#if HAVE_APR_DBD_TRANSACTION_MODE_GET 142#if HAVE_APR_DBD_TRANSACTION_MODE_GET
139 apr_status_t rv; 143 apr_status_t rv;
140 if (!cfg->transactions) 144 if (!cfg->transactions)
141 return APR_SUCCESS; 145 return APR_SUCCESS;
142 if (cfg->dbconn->txn) { 146 if (dbconn->txn) {
143 logging_log(cfg, LOGLEVEL_NOISE, 147 logging_log(cfg, LOGLEVEL_NOISE,
144 "Transaction Already Started. Something is BROKE"); 148 "Transaction Already Started. Something is BROKE");
145 return APR_EINVAL; 149 return APR_EINVAL;
146 } 150 }
147 logging_log(cfg, LOGLEVEL_DEBUG, "DB: Starting Transaction"); 151 logging_log(cfg, LOGLEVEL_DEBUG, "DB: Starting Transaction");
148 rv = apr_dbd_transaction_start(cfg->dbconn->driver, p, cfg->dbconn->dbd, 152 rv = apr_dbd_transaction_start(dbconn->driver, p, dbconn->dbd,
149 &cfg->dbconn->txn); 153 &dbconn->txn);
150 if (rv) 154 if (rv)
151 logging_log(cfg, LOGLEVEL_NOISE, 155 logging_log(cfg, LOGLEVEL_NOISE,
152 "DB: Error Starting Transaction: (%d)%s", rv, apr_dbd_error( 156 "DB: Error Starting Transaction: (%d)%s", rv, apr_dbd_error(
153 cfg->dbconn->driver, cfg->dbconn->dbd, rv)); 157 dbconn->driver, dbconn->dbd, rv));
154 return rv; 158 return rv;
155#else 159#else
156 return APR_SUCCESS; 160 return APR_SUCCESS;
157#endif 161#endif
158} 162}
159 163
160apr_status_t database_trans_stop(config_t *cfg, apr_pool_t *p) 164apr_status_t database_trans_stop(config_t *cfg, config_dbd_t *dbconn,
165 apr_pool_t *p)
161{ 166{
162#if HAVE_APR_DBD_TRANSACTION_MODE_GET 167#if HAVE_APR_DBD_TRANSACTION_MODE_GET
163 apr_status_t rv; 168 apr_status_t rv;
164 if (!cfg->transactions) 169 if (!cfg->transactions)
165 return APR_SUCCESS; 170 return APR_SUCCESS;
166 if (!cfg->dbconn->txn) { 171 if (!dbconn->txn) {
167 logging_log(cfg, LOGLEVEL_NOISE, 172 logging_log(cfg, LOGLEVEL_NOISE,
168 "No Transaction Started. Something is BROKE"); 173 "No Transaction Started. Something is BROKE");
169 return APR_EINVAL; 174 return APR_EINVAL;
170 } 175 }
171 logging_log(cfg, LOGLEVEL_DEBUG, "DB: Stopping Transaction"); 176 logging_log(cfg, LOGLEVEL_DEBUG, "DB: Stopping Transaction");
172 rv = apr_dbd_transaction_end(cfg->dbconn->driver, p, cfg->dbconn->txn); 177 rv = apr_dbd_transaction_end(dbconn->driver, p, dbconn->txn);
173 if (rv) 178 if (rv)
174 logging_log(cfg, LOGLEVEL_NOISE, 179 logging_log(cfg, LOGLEVEL_NOISE,
175 "DB: Error Stopping Transaction: (%d)%s", rv, apr_dbd_error( 180 "DB: Error Stopping Transaction: (%d)%s", rv, apr_dbd_error(
176 cfg->dbconn->driver, cfg->dbconn->dbd, rv)); 181 dbconn->driver, dbconn->dbd, rv));
177 182
178 cfg->dbconn->txn = NULL; 183 dbconn->txn = NULL;
179 return rv; 184 return rv;
180#else 185#else
181 return APR_SUCCESS; 186 return APR_SUCCESS;
182#endif 187#endif
183} 188}
184 189
185apr_status_t database_trans_abort(config_t *cfg) 190apr_status_t database_trans_abort(config_t *cfg, config_dbd_t *dbconn)
186{ 191{
187#if HAVE_APR_DBD_TRANSACTION_MODE_GET 192#if HAVE_APR_DBD_TRANSACTION_MODE_GET
188 apr_status_t rv; 193 apr_status_t rv;
189 if (!cfg->transactions) 194 if (!cfg->transactions)
190 return APR_SUCCESS; 195 return APR_SUCCESS;
191 if (!cfg->dbconn->txn) { 196 if (!dbconn->txn) {
192 logging_log(cfg, LOGLEVEL_NOISE, 197 logging_log(cfg, LOGLEVEL_NOISE,
193 "No Transaction Started. Something is BROKE"); 198 "No Transaction Started. Something is BROKE");
194 return APR_EINVAL; 199 return APR_EINVAL;
195 } 200 }
196 logging_log(cfg, LOGLEVEL_NOTICE, "DB: Aborting Transaction"); 201 logging_log(cfg, LOGLEVEL_NOTICE, "DB: Aborting Transaction");
197 rv = apr_dbd_transaction_mode_set(cfg->dbconn->driver, cfg->dbconn->txn, 202 rv = apr_dbd_transaction_mode_set(dbconn->driver, dbconn->txn,
198 APR_DBD_TRANSACTION_ROLLBACK); 203 APR_DBD_TRANSACTION_ROLLBACK);
199 if (rv) 204 if (rv)
200 logging_log(cfg, LOGLEVEL_NOISE, 205 logging_log(cfg, LOGLEVEL_NOISE,
201 "DB: Error Aborting Transaction: (%d)%s", rv, apr_dbd_error( 206 "DB: Error Aborting Transaction: (%d)%s", rv, apr_dbd_error(
202 cfg->dbconn->driver, cfg->dbconn->dbd, rv)); 207 dbconn->driver, dbconn->dbd, rv));
203 return rv; 208 return rv;
204#else 209#else
205 return APR_SUCCESS; 210 return APR_SUCCESS;
diff --git a/utility/database.h b/utility/database.h
index eed8898..9797d51 100644
--- a/utility/database.h
+++ b/utility/database.h
@@ -7,16 +7,19 @@
7 7
8void database_init(apr_pool_t *p); 8void database_init(apr_pool_t *p);
9 9
10apr_status_t database_connect(config_t *cfg); 10apr_status_t database_connect(config_t *cfg, config_dbd_t **dbconn);
11 11
12apr_status_t database_disconnect(config_t *cfg); 12apr_status_t database_disconnect(config_dbd_t *dbconn);
13 13
14apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data); 14apr_status_t database_insert(config_t *cfg, config_dbd_t *dbconn,
15 apr_pool_t *p, apr_table_t *data);
15 16
16apr_status_t database_trans_start(config_t *cfg, apr_pool_t *p); 17apr_status_t database_trans_start(config_t *cfg, config_dbd_t *dbconn,
18 apr_pool_t *p);
17 19
18apr_status_t database_trans_stop(config_t *cfg, apr_pool_t *p); 20apr_status_t database_trans_stop(config_t *cfg, config_dbd_t *dbconn,
21 apr_pool_t *p);
19 22
20apr_status_t database_trans_abort(config_t *cfg); 23apr_status_t database_trans_abort(config_t *cfg, config_dbd_t *dbconn);
21 24
22#endif /*DATABASE_H_*/ 25#endif /*DATABASE_H_*/
diff --git a/utility/logparse.c b/utility/logparse.c
index 7267682..1b3cc97 100644
--- a/utility/logparse.c
+++ b/utility/logparse.c
@@ -266,14 +266,12 @@ void parser_split_logs(config_t *cfg)
266 ++linecount; 266 ++linecount;
267 } 267 }
268 } 268 }
269 printf("Lines %'d\n",linecount);
270 // now we know how long it is. Lets split up the file 269 // now we know how long it is. Lets split up the file
271 piecesize = linecount / cfg->split_count; 270 piecesize = linecount / cfg->split_count;
272 if (piecesize < cfg->split_minimum) 271 if (piecesize < cfg->split_minimum)
273 piecesize = cfg->split_minimum; 272 piecesize = cfg->split_minimum;
274 if (piecesize > cfg->split_maximum && cfg->split_maximum > 0) 273 if (piecesize > cfg->split_maximum && cfg->split_maximum > 0)
275 piecesize = cfg->split_maximum; 274 piecesize = cfg->split_maximum;
276 printf("Piece size %'d\n", piecesize);
277 if (piecesize > linecount) { 275 if (piecesize > linecount) {
278 // File is smaller than piece size just add it back in as is 276 // File is smaller than piece size just add it back in as is
279 newfile = (config_filestat_t *)apr_array_push(cfg->input_files); 277 newfile = (config_filestat_t *)apr_array_push(cfg->input_files);
@@ -296,7 +294,6 @@ void parser_split_logs(config_t *cfg)
296 basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname))); 294 basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname)));
297 295
298 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); 296 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++);
299 printf("Out file %s\n", file);
300 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); 297 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file);
301 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); 298 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp);
302 if (rv != APR_SUCCESS) { 299 if (rv != APR_SUCCESS) {
@@ -343,7 +340,6 @@ void parser_split_logs(config_t *cfg)
343 out_lines = 0; 340 out_lines = 0;
344 // Open new file 341 // Open new file
345 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); 342 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++);
346 printf("Out file %s\n", file);
347 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); 343 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file);
348 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); 344 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp);
349 if (rv != APR_SUCCESS) { 345 if (rv != APR_SUCCESS) {
@@ -548,7 +544,8 @@ apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out,
548 return APR_SUCCESS; 544 return APR_SUCCESS;
549} 545}
550 546
551apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) 547apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn,
548 config_filestat_t *fstat)
552{ 549{
553 apr_pool_t *tp, *targp; 550 apr_pool_t *tp, *targp;
554 apr_file_t *file; 551 apr_file_t *file;
@@ -573,7 +570,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
573 fstat->linesparsed = 0; 570 fstat->linesparsed = 0;
574 // Start Transaction 571 // Start Transaction
575 fstat->start = apr_time_now(); 572 fstat->start = apr_time_now();
576 if (!cfg->dryrun && database_trans_start(cfg,tp)) { 573 if (!cfg->dryrun && database_trans_start(cfg, dbconn, tp)) {
577 fstat->result = "Database Transaction Error"; 574 fstat->result = "Database Transaction Error";
578 fstat->stop = apr_time_now(); 575 fstat->stop = apr_time_now();
579 return rv; 576 return rv;
@@ -614,14 +611,14 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
614 targc = 0; 611 targc = 0;
615 while (targv[targc]) 612 while (targv[targc])
616 targc++; 613 targc++;
617 rv = parser_processline(targp, cfg, fstat, targv, targc); 614 rv = parser_processline(targp, cfg, dbconn, fstat, targv, targc);
618 if (rv != APR_SUCCESS) { 615 if (rv != APR_SUCCESS) {
619 int i; 616 int i;
620 617
621 fstat->linesbad++; 618 fstat->linesbad++;
622 rv = parser_logbadline(cfg, fstat->fname, buff); 619 rv = parser_logbadline(cfg, fstat->fname, buff);
623 if (rv) { 620 if (rv) {
624 if (!cfg->dryrun) database_trans_abort(cfg); 621 if (!cfg->dryrun) database_trans_abort(cfg, dbconn);
625 logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed, 622 logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed,
626 targc, buff); 623 targc, buff);
627 for (i = 0; targv[i]; i++) { 624 for (i = 0; targv[i]; i++) {
@@ -637,7 +634,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
637 } while (rv == APR_SUCCESS); 634 } while (rv == APR_SUCCESS);
638 apr_file_close(file); 635 apr_file_close(file);
639 // Finish Transaction 636 // Finish Transaction
640 if (!cfg->dryrun && database_trans_stop(cfg,tp)) { 637 if (!cfg->dryrun && database_trans_stop(cfg, dbconn, tp)) {
641 fstat->result = apr_psprintf(cfg->pool, 638 fstat->result = apr_psprintf(cfg->pool,
642 "Input line %d, Database Transaction Error", 639 "Input line %d, Database Transaction Error",
643 fstat->linesparsed); 640 fstat->linesparsed);
@@ -655,7 +652,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
655} 652}
656 653
657apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, 654apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg,
658 config_filestat_t *fstat, char **argv, int argc) 655 config_dbd_t *dbconn, config_filestat_t *fstat, char **argv, int argc)
659{ 656{
660 config_logformat_t *fmt; 657 config_logformat_t *fmt;
661 config_logformat_field_t *ifields; 658 config_logformat_field_t *ifields;
@@ -760,7 +757,7 @@ apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg,
760 757
761 // Process DB Query 758 // Process DB Query
762 if (!cfg->dryrun) { 759 if (!cfg->dryrun) {
763 rv = database_insert(cfg, ptemp, dataout); 760 rv = database_insert(cfg, dbconn, ptemp, dataout);
764 if (rv) { 761 if (rv) {
765 fstat->result = apr_psprintf(cfg->pool, 762 fstat->result = apr_psprintf(cfg->pool,
766 "Input line %d, Database Error", 763 "Input line %d, Database Error",
diff --git a/utility/logparse.h b/utility/logparse.h
index cd085b5..7ca0958 100644
--- a/utility/logparse.h
+++ b/utility/logparse.h
@@ -28,9 +28,10 @@ void parser_split_logs(config_t *cfg);
28apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, 28apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out,
29 apr_pool_t *token_context); 29 apr_pool_t *token_context);
30 30
31apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat); 31apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn,
32 config_filestat_t *fstat);
32 33
33apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, 34apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg,
34 config_filestat_t *line, char **argv, int argc); 35 config_dbd_t *dbconn, config_filestat_t *line, char **argv, int argc);
35 36
36#endif /*LOGPARSE_H_*/ 37#endif /*LOGPARSE_H_*/
diff --git a/utility/shell.c b/utility/shell.c
index 2f2f43b..0fea741 100644
--- a/utility/shell.c
+++ b/utility/shell.c
@@ -12,6 +12,17 @@
12#include "database.h" 12#include "database.h"
13#include "util.h" 13#include "util.h"
14 14
15#if APR_HAS_THREADS
16#include "apr_queue.h"
17#include "apr_thread_pool.h"
18
19static apr_queue_t *queue;
20
21void run_multithreaded(config_t *cfg);
22#endif
23
24void run_singlethreaded(config_t *cfg);
25
15const apr_getopt_option_t _opt_config[] = { 26const apr_getopt_option_t _opt_config[] = {
16 {"machineid", 'm', 1, "Machine ID for the log file"}, 27 {"machineid", 'm', 1, "Machine ID for the log file"},
17 {"transaction", 't', 1, "Use a Transaction (yes,no)"}, 28 {"transaction", 't', 1, "Use a Transaction (yes,no)"},
@@ -23,6 +34,7 @@ const apr_getopt_option_t _opt_config[] = {
23 {"dump", 'd', 0, "Dump the configuration after parsing and quit"}, 34 {"dump", 'd', 0, "Dump the configuration after parsing and quit"},
24 {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"}, 35 {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"},
25 {"summary", 's', 1, "Summary (yes,no)"}, 36 {"summary", 's', 1, "Summary (yes,no)"},
37 {"threadcount", 'p', 1, "Set thread count (a number greater than 0)"},
26 {"help", 'h', 0, "Show Help"}, 38 {"help", 'h', 0, "Show Help"},
27 {NULL} 39 {NULL}
28}; 40};
@@ -58,25 +70,45 @@ void show_help(const char *prog, const apr_getopt_option_t *opts, FILE *output)
58void print_summary(config_t *cfg) { 70void print_summary(config_t *cfg) {
59 config_filestat_t *fstat; 71 config_filestat_t *fstat;
60 int i,m; 72 int i,m;
73 apr_time_t totaltime = 0;
74 apr_size_t totalparsed = 0, totalskipped = 0, totalbad = 0;
61 75
62 fstat = (config_filestat_t *)cfg->input_files->elts; 76 fstat = (config_filestat_t *)cfg->input_files->elts;
63 77
64 printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); 78 printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts);
65 for (i=0, m=cfg->input_files->nelts; i<m; i++) { 79 for (i=0, m=cfg->input_files->nelts; i<m; i++) {
80 totaltime += fstat[i].stop - fstat[i].start;
81 totalparsed += fstat[i].linesparsed;
82 totalskipped += fstat[i].lineskipped;
83 totalbad += fstat[i].linesbad;
66 printf(" File: %s\n" 84 printf(" File: %s\n"
67 " Lines Parsed %'d out of %'d (Skipped %'d, Bad %'d)\n" 85 " Lines Added %'d out of %'d (Skipped %'d, Bad %'d)\n"
68 " Status: %s\n" 86 " Status: %s\n"
69 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n" 87 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n"
70 "\n", 88 "\n",
71 fstat[i].fname, 89 fstat[i].fname,
72 fstat[i].linesparsed - fstat[i].lineskipped - fstat[i].linesbad, 90 fstat[i].linesparsed - fstat[i].lineskipped - fstat[i].linesbad,
73 fstat[i].linesparsed, fstat[i].lineskipped, fstat[i].linesbad, 91 fstat[i].linesparsed,
92 fstat[i].lineskipped,
93 fstat[i].linesbad,
74 fstat[i].result, 94 fstat[i].result,
75 apr_time_sec(fstat[i].stop - fstat[i].start)/60, 95 apr_time_sec(fstat[i].stop - fstat[i].start)/60,
76 apr_time_sec(fstat[i].stop - fstat[i].start) % 60, 96 apr_time_sec(fstat[i].stop - fstat[i].start) % 60,
77 apr_time_msec(fstat[i].stop - fstat[i].start) 97 apr_time_msec(fstat[i].stop - fstat[i].start)
78 ); 98 );
79 } 99 }
100 printf("Totals\n"
101 " Lines Added %'d out of %'d (Skipped %'d, Bad %'d)\n"
102 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n"
103 "\n",
104 totalparsed - totalskipped - totalbad,
105 totalparsed,
106 totalskipped,
107 totalbad,
108 apr_time_sec(totaltime)/60,
109 apr_time_sec(totaltime) % 60,
110 apr_time_msec(totaltime)
111 );
80} 112}
81 113
82int main(int argc, const char *const argv[]) 114int main(int argc, const char *const argv[])
@@ -130,6 +162,9 @@ int main(int argc, const char *const argv[])
130 case 'n': 162 case 'n':
131 apr_table_setn(args,"dryrun","yes"); 163 apr_table_setn(args,"dryrun","yes");
132 break; 164 break;
165 case 'p':
166 apr_table_setn(args,"threadcount",opt_arg);
167 break;
133 case 'r': 168 case 'r':
134 apr_table_setn(args,"logformat",opt_arg); 169 apr_table_setn(args,"logformat",opt_arg);
135 break; 170 break;
@@ -185,34 +220,123 @@ int main(int argc, const char *const argv[])
185 if (apr_is_empty_array(cfg->input_files)) { 220 if (apr_is_empty_array(cfg->input_files)) {
186 parser_find_logs(cfg); 221 parser_find_logs(cfg);
187 } 222 }
188 if (!cfg->dryrun) {
189 if ((rv = database_connect(cfg))) {
190 logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database");
191 exit(1);
192 }
193 }
194 if (!apr_is_empty_array(cfg->input_files)) { 223 if (!apr_is_empty_array(cfg->input_files)) {
195 parser_split_logs(cfg); 224 parser_split_logs(cfg);
196 config_filestat_t *filelist; 225#if APR_HAS_THREADS
197 int f, l; 226 if (cfg->thread_count > 1) {
198 filelist = (config_filestat_t *)cfg->input_files->elts; 227 run_multithreaded(cfg);
199 for (f=0, l=cfg->input_files->nelts; f < l; f++) { 228 } else {
200 rv = parser_parsefile(cfg, &filelist[f]); 229#endif
201 if (rv) { 230 run_singlethreaded(cfg);
202 logging_log(cfg, LOGLEVEL_NOISE, 231#if APR_HAS_THREADS
203 "Error occured parsing log files. Aborting");
204 break;
205 }
206 } 232 }
233#endif
207 } else { 234 } else {
208 logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse"); 235 logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse");
209 } 236 }
210 if (!cfg->dryrun) {
211 database_disconnect(cfg);
212 }
213 237
214 if (cfg->summary) { 238 if (cfg->summary) {
215 print_summary(cfg); 239 print_summary(cfg);
216 } 240 }
217 return 0; 241 return 0;
218} 242}
243
244void run_singlethreaded(config_t *cfg)
245{
246 config_filestat_t *filelist;
247 config_dbd_t *dbconn = NULL;
248 int f, l;
249 apr_status_t rv;
250
251 if (!cfg->dryrun) {
252 if ((rv = database_connect(cfg, &dbconn))) {
253 logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database");
254 exit(1);
255 }
256 }
257
258 filelist = (config_filestat_t *)cfg->input_files->elts;
259 for (f=0, l=cfg->input_files->nelts; f < l; f++) {
260 rv = parser_parsefile(cfg, dbconn, &filelist[f]);
261 if (rv) {
262 logging_log(cfg, LOGLEVEL_NOISE,
263 "Error occured parsing log files. Aborting");
264 break;
265 }
266 }
267
268 if (!cfg->dryrun) {
269 database_disconnect(dbconn);
270 }
271}
272
273#if APR_HAS_THREADS
274void * APR_THREAD_FUNC run_filethread(apr_thread_t *thd, void *data)
275{
276 config_t *cfg = data;
277 config_dbd_t *dbconn = NULL;
278 config_filestat_t *fileentry;
279 apr_status_t rv;
280
281 if (!cfg->dryrun) {
282 if ((rv = database_connect(cfg, &dbconn))) {
283 logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database");
284 exit(1);
285 }
286 }
287
288 while (1) {
289 rv = apr_queue_pop(queue, (void **)&fileentry);
290 if (rv == APR_EINTR)
291 continue;
292 if (rv == APR_EOF)
293 break;
294 rv = parser_parsefile(cfg, dbconn, fileentry);
295 if (rv) {
296 logging_log(cfg, LOGLEVEL_NOISE,
297 "Error occured parsing log file %s", fileentry->fname);
298 }
299 }
300
301 if (!cfg->dryrun) {
302 database_disconnect(dbconn);
303 }
304 return NULL;
305}
306
307void run_multithreaded(config_t *cfg)
308{
309 logging_log(cfg, LOGLEVEL_NOISE, "Running Multithreaded");
310
311 config_filestat_t *filelist;
312 int f, l;
313 apr_status_t rv;
314 apr_pool_t *tp;
315 apr_thread_pool_t *thrp;
316 unsigned int count;
317
318 apr_pool_create(&tp, cfg->pool);
319 rv = apr_queue_create(&queue, cfg->input_files->nelts, tp);
320
321 rv = apr_thread_pool_create(&thrp, 0, cfg->thread_count, tp);
322
323 //populate queue
324 filelist = (config_filestat_t *)cfg->input_files->elts;
325 for (f=0, l=cfg->input_files->nelts; f < l; f++) {
326 rv = apr_queue_push(queue, &filelist[f]);
327 }
328 // populate the worker threads
329 for (f=0; f<cfg->thread_count; f++) {
330 rv = apr_thread_pool_push(thrp, run_filethread, cfg, 0, NULL);
331 }
332
333 do {
334 apr_sleep(apr_time_from_sec(1));
335 count = apr_queue_size(queue);
336 } while (count > 0);
337
338 rv = apr_queue_term(queue);
339
340 rv = apr_thread_pool_destroy(thrp);
341}
342#endif