diff options
author | Edward Rudd | 2009-03-14 22:07:56 +0000 |
---|---|---|
committer | Edward Rudd | 2009-03-14 22:07:56 +0000 |
commit | b8790b1b48c238f2ba266e34625b8e8f5db0ad6e (patch) | |
tree | a9c7b08bc4ed3ee59aaf574f5f1738efa915fadc | |
parent | 99867e8a2eca4421075900e44f24cfd749db7dcb (diff) |
refactoroed to allo wdb connections to be per-thread
added initial threading implementation using apr_queues and apr_thread_pools
-rw-r--r-- | utility/config.h | 1 | ||||
-rw-r--r-- | utility/database.c | 73 | ||||
-rw-r--r-- | utility/database.h | 15 | ||||
-rw-r--r-- | utility/logparse.c | 19 | ||||
-rw-r--r-- | utility/logparse.h | 5 | ||||
-rw-r--r-- | utility/shell.c | 166 |
6 files changed, 204 insertions, 75 deletions
diff --git a/utility/config.h b/utility/config.h index 91c6f65..a9bd1b5 100644 --- a/utility/config.h +++ b/utility/config.h | |||
@@ -58,7 +58,6 @@ struct config_t { | |||
58 | /** db connection configuration */ | 58 | /** db connection configuration */ |
59 | const char *dbdriver; | 59 | const char *dbdriver; |
60 | const char *dbparams; | 60 | const char *dbparams; |
61 | config_dbd_t *dbconn; | ||
62 | 61 | ||
63 | /** Logging table */ | 62 | /** Logging table */ |
64 | const char *table; | 63 | const char *table; |
diff --git a/utility/database.c b/utility/database.c index 98c203b..ff81caa 100644 --- a/utility/database.c +++ b/utility/database.c | |||
@@ -19,15 +19,16 @@ void database_init(apr_pool_t *p) | |||
19 | apr_dbd_init(p); | 19 | apr_dbd_init(p); |
20 | } | 20 | } |
21 | 21 | ||
22 | apr_status_t database_connect(config_t *cfg) | 22 | /** @todo split this into load and connect */ |
23 | apr_status_t database_connect(config_t *cfg, config_dbd_t **dbconn) | ||
23 | { | 24 | { |
24 | apr_status_t rv; | 25 | apr_status_t rv; |
25 | if (!cfg->dbdriver || !cfg->dbparams) | 26 | if (!cfg->dbdriver || !cfg->dbparams) |
26 | return APR_EINVAL; | 27 | return APR_EINVAL; |
27 | if (!cfg->dbconn) { | 28 | if (!*dbconn) { |
28 | cfg->dbconn = apr_pcalloc(cfg->pool, sizeof(config_dbd_t)); | 29 | *dbconn = apr_pcalloc(cfg->pool, sizeof(config_dbd_t)); |
29 | } | 30 | } |
30 | rv = apr_dbd_get_driver(cfg->pool, cfg->dbdriver, &(cfg->dbconn->driver)); | 31 | rv = apr_dbd_get_driver(cfg->pool, cfg->dbdriver, &((*dbconn)->driver)); |
31 | if (rv) { | 32 | if (rv) { |
32 | 33 | ||
33 | logging_log(cfg, LOGLEVEL_ERROR, | 34 | logging_log(cfg, LOGLEVEL_ERROR, |
@@ -36,8 +37,8 @@ apr_status_t database_connect(config_t *cfg) | |||
36 | return rv; | 37 | return rv; |
37 | } | 38 | } |
38 | 39 | ||
39 | rv = apr_dbd_open(cfg->dbconn->driver, cfg->pool, cfg->dbparams, | 40 | rv = apr_dbd_open((*dbconn)->driver, cfg->pool, cfg->dbparams, |
40 | &(cfg->dbconn->dbd)); | 41 | &((*dbconn)->dbd)); |
41 | if (rv) { | 42 | if (rv) { |
42 | logging_log(cfg, LOGLEVEL_ERROR, | 43 | logging_log(cfg, LOGLEVEL_ERROR, |
43 | "DB: Could not connect to database. Error (%d)%s", rv, | 44 | "DB: Could not connect to database. Error (%d)%s", rv, |
@@ -48,12 +49,13 @@ apr_status_t database_connect(config_t *cfg) | |||
48 | return APR_SUCCESS; | 49 | return APR_SUCCESS; |
49 | } | 50 | } |
50 | 51 | ||
51 | apr_status_t database_disconnect(config_t *cfg) | 52 | apr_status_t database_disconnect(config_dbd_t *dbconn) |
52 | { | 53 | { |
53 | return apr_dbd_close(cfg->dbconn->driver, cfg->dbconn->dbd); | 54 | return apr_dbd_close(dbconn->driver, dbconn->dbd); |
54 | } | 55 | } |
55 | 56 | ||
56 | static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, apr_pool_t *p) | 57 | static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, |
58 | config_dbd_t *dbconn, apr_pool_t *p) | ||
57 | { | 59 | { |
58 | apr_status_t rv; | 60 | apr_status_t rv; |
59 | char *sql; | 61 | char *sql; |
@@ -93,19 +95,20 @@ static apr_dbd_prepared_t *database_prepare_insert(config_t *cfg, apr_pool_t *p) | |||
93 | 95 | ||
94 | logging_log(cfg, LOGLEVEL_DEBUG, "DB: Generated SQL: %s", sql); | 96 | logging_log(cfg, LOGLEVEL_DEBUG, "DB: Generated SQL: %s", sql); |
95 | 97 | ||
96 | rv = apr_dbd_prepare(cfg->dbconn->driver, cfg->pool, cfg->dbconn->dbd, sql, | 98 | rv = apr_dbd_prepare(dbconn->driver, cfg->pool, dbconn->dbd, sql, |
97 | "INSERT", &stmt); | 99 | "INSERT", &stmt); |
98 | 100 | ||
99 | if (rv) { | 101 | if (rv) { |
100 | logging_log(cfg, LOGLEVEL_NOISE, | 102 | logging_log(cfg, LOGLEVEL_NOISE, |
101 | "DB: Unable to Prepare SQL insert: %s", apr_dbd_error( | 103 | "DB: Unable to Prepare SQL insert: %s", apr_dbd_error( |
102 | cfg->dbconn->driver, cfg->dbconn->dbd, rv)); | 104 | dbconn->driver, dbconn->dbd, rv)); |
103 | return NULL; | 105 | return NULL; |
104 | } | 106 | } |
105 | return stmt; | 107 | return stmt; |
106 | } | 108 | } |
107 | 109 | ||
108 | apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data) | 110 | apr_status_t database_insert(config_t *cfg, config_dbd_t *dbconn, |
111 | apr_pool_t *p, apr_table_t *data) | ||
109 | { | 112 | { |
110 | apr_status_t rv; | 113 | apr_status_t rv; |
111 | int f, nfs; | 114 | int f, nfs; |
@@ -113,93 +116,95 @@ apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data) | |||
113 | ofields = (config_output_field_t *)cfg->output_fields->elts; | 116 | ofields = (config_output_field_t *)cfg->output_fields->elts; |
114 | nfs = cfg->output_fields->nelts; | 117 | nfs = cfg->output_fields->nelts; |
115 | // Prepare statement | 118 | // Prepare statement |
116 | if (!cfg->dbconn->stmt) { | 119 | if (!dbconn->stmt) { |
117 | cfg->dbconn->stmt = database_prepare_insert(cfg, p); | 120 | dbconn->stmt = database_prepare_insert(cfg, dbconn, p); |
118 | if (!cfg->dbconn->stmt) { | 121 | if (!dbconn->stmt) { |
119 | return APR_EINVAL; | 122 | return APR_EINVAL; |
120 | } | 123 | } |
121 | cfg->dbconn->args = apr_palloc(cfg->pool, nfs * sizeof(char *)); | 124 | dbconn->args = apr_palloc(cfg->pool, nfs * sizeof(char *)); |
122 | } | 125 | } |
123 | for (f=0; f<nfs; f++) { | 126 | for (f=0; f<nfs; f++) { |
124 | cfg->dbconn->args[f] = apr_table_get(data, ofields[f].field); | 127 | dbconn->args[f] = apr_table_get(data, ofields[f].field); |
125 | } | 128 | } |
126 | rv = apr_dbd_pquery(cfg->dbconn->driver, p, cfg->dbconn->dbd, &f, | 129 | rv = apr_dbd_pquery(dbconn->driver, p, dbconn->dbd, &f, |
127 | cfg->dbconn->stmt, nfs, cfg->dbconn->args); | 130 | dbconn->stmt, nfs, dbconn->args); |
128 | if (rv) { | 131 | if (rv) { |
129 | logging_log(cfg, LOGLEVEL_ERROR, "DB: Unable to Insert SQL: %s", | 132 | logging_log(cfg, LOGLEVEL_ERROR, "DB: Unable to Insert SQL: %s", |
130 | apr_dbd_error(cfg->dbconn->driver, cfg->dbconn->dbd, rv)); | 133 | apr_dbd_error(dbconn->driver, dbconn->dbd, rv)); |
131 | return rv; | 134 | return rv; |
132 | } | 135 | } |
133 | return APR_SUCCESS; | 136 | return APR_SUCCESS; |
134 | } | 137 | } |
135 | 138 | ||
136 | apr_status_t database_trans_start(config_t *cfg, apr_pool_t *p) | 139 | apr_status_t database_trans_start(config_t *cfg, config_dbd_t *dbconn, |
140 | apr_pool_t *p) | ||
137 | { | 141 | { |
138 | #if HAVE_APR_DBD_TRANSACTION_MODE_GET | 142 | #if HAVE_APR_DBD_TRANSACTION_MODE_GET |
139 | apr_status_t rv; | 143 | apr_status_t rv; |
140 | if (!cfg->transactions) | 144 | if (!cfg->transactions) |
141 | return APR_SUCCESS; | 145 | return APR_SUCCESS; |
142 | if (cfg->dbconn->txn) { | 146 | if (dbconn->txn) { |
143 | logging_log(cfg, LOGLEVEL_NOISE, | 147 | logging_log(cfg, LOGLEVEL_NOISE, |
144 | "Transaction Already Started. Something is BROKE"); | 148 | "Transaction Already Started. Something is BROKE"); |
145 | return APR_EINVAL; | 149 | return APR_EINVAL; |
146 | } | 150 | } |
147 | logging_log(cfg, LOGLEVEL_DEBUG, "DB: Starting Transaction"); | 151 | logging_log(cfg, LOGLEVEL_DEBUG, "DB: Starting Transaction"); |
148 | rv = apr_dbd_transaction_start(cfg->dbconn->driver, p, cfg->dbconn->dbd, | 152 | rv = apr_dbd_transaction_start(dbconn->driver, p, dbconn->dbd, |
149 | &cfg->dbconn->txn); | 153 | &dbconn->txn); |
150 | if (rv) | 154 | if (rv) |
151 | logging_log(cfg, LOGLEVEL_NOISE, | 155 | logging_log(cfg, LOGLEVEL_NOISE, |
152 | "DB: Error Starting Transaction: (%d)%s", rv, apr_dbd_error( | 156 | "DB: Error Starting Transaction: (%d)%s", rv, apr_dbd_error( |
153 | cfg->dbconn->driver, cfg->dbconn->dbd, rv)); | 157 | dbconn->driver, dbconn->dbd, rv)); |
154 | return rv; | 158 | return rv; |
155 | #else | 159 | #else |
156 | return APR_SUCCESS; | 160 | return APR_SUCCESS; |
157 | #endif | 161 | #endif |
158 | } | 162 | } |
159 | 163 | ||
160 | apr_status_t database_trans_stop(config_t *cfg, apr_pool_t *p) | 164 | apr_status_t database_trans_stop(config_t *cfg, config_dbd_t *dbconn, |
165 | apr_pool_t *p) | ||
161 | { | 166 | { |
162 | #if HAVE_APR_DBD_TRANSACTION_MODE_GET | 167 | #if HAVE_APR_DBD_TRANSACTION_MODE_GET |
163 | apr_status_t rv; | 168 | apr_status_t rv; |
164 | if (!cfg->transactions) | 169 | if (!cfg->transactions) |
165 | return APR_SUCCESS; | 170 | return APR_SUCCESS; |
166 | if (!cfg->dbconn->txn) { | 171 | if (!dbconn->txn) { |
167 | logging_log(cfg, LOGLEVEL_NOISE, | 172 | logging_log(cfg, LOGLEVEL_NOISE, |
168 | "No Transaction Started. Something is BROKE"); | 173 | "No Transaction Started. Something is BROKE"); |
169 | return APR_EINVAL; | 174 | return APR_EINVAL; |
170 | } | 175 | } |
171 | logging_log(cfg, LOGLEVEL_DEBUG, "DB: Stopping Transaction"); | 176 | logging_log(cfg, LOGLEVEL_DEBUG, "DB: Stopping Transaction"); |
172 | rv = apr_dbd_transaction_end(cfg->dbconn->driver, p, cfg->dbconn->txn); | 177 | rv = apr_dbd_transaction_end(dbconn->driver, p, dbconn->txn); |
173 | if (rv) | 178 | if (rv) |
174 | logging_log(cfg, LOGLEVEL_NOISE, | 179 | logging_log(cfg, LOGLEVEL_NOISE, |
175 | "DB: Error Stopping Transaction: (%d)%s", rv, apr_dbd_error( | 180 | "DB: Error Stopping Transaction: (%d)%s", rv, apr_dbd_error( |
176 | cfg->dbconn->driver, cfg->dbconn->dbd, rv)); | 181 | dbconn->driver, dbconn->dbd, rv)); |
177 | 182 | ||
178 | cfg->dbconn->txn = NULL; | 183 | dbconn->txn = NULL; |
179 | return rv; | 184 | return rv; |
180 | #else | 185 | #else |
181 | return APR_SUCCESS; | 186 | return APR_SUCCESS; |
182 | #endif | 187 | #endif |
183 | } | 188 | } |
184 | 189 | ||
185 | apr_status_t database_trans_abort(config_t *cfg) | 190 | apr_status_t database_trans_abort(config_t *cfg, config_dbd_t *dbconn) |
186 | { | 191 | { |
187 | #if HAVE_APR_DBD_TRANSACTION_MODE_GET | 192 | #if HAVE_APR_DBD_TRANSACTION_MODE_GET |
188 | apr_status_t rv; | 193 | apr_status_t rv; |
189 | if (!cfg->transactions) | 194 | if (!cfg->transactions) |
190 | return APR_SUCCESS; | 195 | return APR_SUCCESS; |
191 | if (!cfg->dbconn->txn) { | 196 | if (!dbconn->txn) { |
192 | logging_log(cfg, LOGLEVEL_NOISE, | 197 | logging_log(cfg, LOGLEVEL_NOISE, |
193 | "No Transaction Started. Something is BROKE"); | 198 | "No Transaction Started. Something is BROKE"); |
194 | return APR_EINVAL; | 199 | return APR_EINVAL; |
195 | } | 200 | } |
196 | logging_log(cfg, LOGLEVEL_NOTICE, "DB: Aborting Transaction"); | 201 | logging_log(cfg, LOGLEVEL_NOTICE, "DB: Aborting Transaction"); |
197 | rv = apr_dbd_transaction_mode_set(cfg->dbconn->driver, cfg->dbconn->txn, | 202 | rv = apr_dbd_transaction_mode_set(dbconn->driver, dbconn->txn, |
198 | APR_DBD_TRANSACTION_ROLLBACK); | 203 | APR_DBD_TRANSACTION_ROLLBACK); |
199 | if (rv) | 204 | if (rv) |
200 | logging_log(cfg, LOGLEVEL_NOISE, | 205 | logging_log(cfg, LOGLEVEL_NOISE, |
201 | "DB: Error Aborting Transaction: (%d)%s", rv, apr_dbd_error( | 206 | "DB: Error Aborting Transaction: (%d)%s", rv, apr_dbd_error( |
202 | cfg->dbconn->driver, cfg->dbconn->dbd, rv)); | 207 | dbconn->driver, dbconn->dbd, rv)); |
203 | return rv; | 208 | return rv; |
204 | #else | 209 | #else |
205 | return APR_SUCCESS; | 210 | return APR_SUCCESS; |
diff --git a/utility/database.h b/utility/database.h index eed8898..9797d51 100644 --- a/utility/database.h +++ b/utility/database.h | |||
@@ -7,16 +7,19 @@ | |||
7 | 7 | ||
8 | void database_init(apr_pool_t *p); | 8 | void database_init(apr_pool_t *p); |
9 | 9 | ||
10 | apr_status_t database_connect(config_t *cfg); | 10 | apr_status_t database_connect(config_t *cfg, config_dbd_t **dbconn); |
11 | 11 | ||
12 | apr_status_t database_disconnect(config_t *cfg); | 12 | apr_status_t database_disconnect(config_dbd_t *dbconn); |
13 | 13 | ||
14 | apr_status_t database_insert(config_t *cfg, apr_pool_t *p, apr_table_t *data); | 14 | apr_status_t database_insert(config_t *cfg, config_dbd_t *dbconn, |
15 | apr_pool_t *p, apr_table_t *data); | ||
15 | 16 | ||
16 | apr_status_t database_trans_start(config_t *cfg, apr_pool_t *p); | 17 | apr_status_t database_trans_start(config_t *cfg, config_dbd_t *dbconn, |
18 | apr_pool_t *p); | ||
17 | 19 | ||
18 | apr_status_t database_trans_stop(config_t *cfg, apr_pool_t *p); | 20 | apr_status_t database_trans_stop(config_t *cfg, config_dbd_t *dbconn, |
21 | apr_pool_t *p); | ||
19 | 22 | ||
20 | apr_status_t database_trans_abort(config_t *cfg); | 23 | apr_status_t database_trans_abort(config_t *cfg, config_dbd_t *dbconn); |
21 | 24 | ||
22 | #endif /*DATABASE_H_*/ | 25 | #endif /*DATABASE_H_*/ |
diff --git a/utility/logparse.c b/utility/logparse.c index 7267682..1b3cc97 100644 --- a/utility/logparse.c +++ b/utility/logparse.c | |||
@@ -266,14 +266,12 @@ void parser_split_logs(config_t *cfg) | |||
266 | ++linecount; | 266 | ++linecount; |
267 | } | 267 | } |
268 | } | 268 | } |
269 | printf("Lines %'d\n",linecount); | ||
270 | // now we know how long it is. Lets split up the file | 269 | // now we know how long it is. Lets split up the file |
271 | piecesize = linecount / cfg->split_count; | 270 | piecesize = linecount / cfg->split_count; |
272 | if (piecesize < cfg->split_minimum) | 271 | if (piecesize < cfg->split_minimum) |
273 | piecesize = cfg->split_minimum; | 272 | piecesize = cfg->split_minimum; |
274 | if (piecesize > cfg->split_maximum && cfg->split_maximum > 0) | 273 | if (piecesize > cfg->split_maximum && cfg->split_maximum > 0) |
275 | piecesize = cfg->split_maximum; | 274 | piecesize = cfg->split_maximum; |
276 | printf("Piece size %'d\n", piecesize); | ||
277 | if (piecesize > linecount) { | 275 | if (piecesize > linecount) { |
278 | // File is smaller than piece size just add it back in as is | 276 | // File is smaller than piece size just add it back in as is |
279 | newfile = (config_filestat_t *)apr_array_push(cfg->input_files); | 277 | newfile = (config_filestat_t *)apr_array_push(cfg->input_files); |
@@ -296,7 +294,6 @@ void parser_split_logs(config_t *cfg) | |||
296 | basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname))); | 294 | basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname))); |
297 | 295 | ||
298 | file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); | 296 | file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); |
299 | printf("Out file %s\n", file); | ||
300 | logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); | 297 | logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); |
301 | rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); | 298 | rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); |
302 | if (rv != APR_SUCCESS) { | 299 | if (rv != APR_SUCCESS) { |
@@ -343,7 +340,6 @@ void parser_split_logs(config_t *cfg) | |||
343 | out_lines = 0; | 340 | out_lines = 0; |
344 | // Open new file | 341 | // Open new file |
345 | file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); | 342 | file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); |
346 | printf("Out file %s\n", file); | ||
347 | logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); | 343 | logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); |
348 | rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); | 344 | rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); |
349 | if (rv != APR_SUCCESS) { | 345 | if (rv != APR_SUCCESS) { |
@@ -548,7 +544,8 @@ apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, | |||
548 | return APR_SUCCESS; | 544 | return APR_SUCCESS; |
549 | } | 545 | } |
550 | 546 | ||
551 | apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) | 547 | apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn, |
548 | config_filestat_t *fstat) | ||
552 | { | 549 | { |
553 | apr_pool_t *tp, *targp; | 550 | apr_pool_t *tp, *targp; |
554 | apr_file_t *file; | 551 | apr_file_t *file; |
@@ -573,7 +570,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) | |||
573 | fstat->linesparsed = 0; | 570 | fstat->linesparsed = 0; |
574 | // Start Transaction | 571 | // Start Transaction |
575 | fstat->start = apr_time_now(); | 572 | fstat->start = apr_time_now(); |
576 | if (!cfg->dryrun && database_trans_start(cfg,tp)) { | 573 | if (!cfg->dryrun && database_trans_start(cfg, dbconn, tp)) { |
577 | fstat->result = "Database Transaction Error"; | 574 | fstat->result = "Database Transaction Error"; |
578 | fstat->stop = apr_time_now(); | 575 | fstat->stop = apr_time_now(); |
579 | return rv; | 576 | return rv; |
@@ -614,14 +611,14 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) | |||
614 | targc = 0; | 611 | targc = 0; |
615 | while (targv[targc]) | 612 | while (targv[targc]) |
616 | targc++; | 613 | targc++; |
617 | rv = parser_processline(targp, cfg, fstat, targv, targc); | 614 | rv = parser_processline(targp, cfg, dbconn, fstat, targv, targc); |
618 | if (rv != APR_SUCCESS) { | 615 | if (rv != APR_SUCCESS) { |
619 | int i; | 616 | int i; |
620 | 617 | ||
621 | fstat->linesbad++; | 618 | fstat->linesbad++; |
622 | rv = parser_logbadline(cfg, fstat->fname, buff); | 619 | rv = parser_logbadline(cfg, fstat->fname, buff); |
623 | if (rv) { | 620 | if (rv) { |
624 | if (!cfg->dryrun) database_trans_abort(cfg); | 621 | if (!cfg->dryrun) database_trans_abort(cfg, dbconn); |
625 | logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed, | 622 | logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed, |
626 | targc, buff); | 623 | targc, buff); |
627 | for (i = 0; targv[i]; i++) { | 624 | for (i = 0; targv[i]; i++) { |
@@ -637,7 +634,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) | |||
637 | } while (rv == APR_SUCCESS); | 634 | } while (rv == APR_SUCCESS); |
638 | apr_file_close(file); | 635 | apr_file_close(file); |
639 | // Finish Transaction | 636 | // Finish Transaction |
640 | if (!cfg->dryrun && database_trans_stop(cfg,tp)) { | 637 | if (!cfg->dryrun && database_trans_stop(cfg, dbconn, tp)) { |
641 | fstat->result = apr_psprintf(cfg->pool, | 638 | fstat->result = apr_psprintf(cfg->pool, |
642 | "Input line %d, Database Transaction Error", | 639 | "Input line %d, Database Transaction Error", |
643 | fstat->linesparsed); | 640 | fstat->linesparsed); |
@@ -655,7 +652,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) | |||
655 | } | 652 | } |
656 | 653 | ||
657 | apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, | 654 | apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, |
658 | config_filestat_t *fstat, char **argv, int argc) | 655 | config_dbd_t *dbconn, config_filestat_t *fstat, char **argv, int argc) |
659 | { | 656 | { |
660 | config_logformat_t *fmt; | 657 | config_logformat_t *fmt; |
661 | config_logformat_field_t *ifields; | 658 | config_logformat_field_t *ifields; |
@@ -760,7 +757,7 @@ apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, | |||
760 | 757 | ||
761 | // Process DB Query | 758 | // Process DB Query |
762 | if (!cfg->dryrun) { | 759 | if (!cfg->dryrun) { |
763 | rv = database_insert(cfg, ptemp, dataout); | 760 | rv = database_insert(cfg, dbconn, ptemp, dataout); |
764 | if (rv) { | 761 | if (rv) { |
765 | fstat->result = apr_psprintf(cfg->pool, | 762 | fstat->result = apr_psprintf(cfg->pool, |
766 | "Input line %d, Database Error", | 763 | "Input line %d, Database Error", |
diff --git a/utility/logparse.h b/utility/logparse.h index cd085b5..7ca0958 100644 --- a/utility/logparse.h +++ b/utility/logparse.h | |||
@@ -28,9 +28,10 @@ void parser_split_logs(config_t *cfg); | |||
28 | apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, | 28 | apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, |
29 | apr_pool_t *token_context); | 29 | apr_pool_t *token_context); |
30 | 30 | ||
31 | apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat); | 31 | apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn, |
32 | config_filestat_t *fstat); | ||
32 | 33 | ||
33 | apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, | 34 | apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, |
34 | config_filestat_t *line, char **argv, int argc); | 35 | config_dbd_t *dbconn, config_filestat_t *line, char **argv, int argc); |
35 | 36 | ||
36 | #endif /*LOGPARSE_H_*/ | 37 | #endif /*LOGPARSE_H_*/ |
diff --git a/utility/shell.c b/utility/shell.c index 2f2f43b..0fea741 100644 --- a/utility/shell.c +++ b/utility/shell.c | |||
@@ -12,6 +12,17 @@ | |||
12 | #include "database.h" | 12 | #include "database.h" |
13 | #include "util.h" | 13 | #include "util.h" |
14 | 14 | ||
15 | #if APR_HAS_THREADS | ||
16 | #include "apr_queue.h" | ||
17 | #include "apr_thread_pool.h" | ||
18 | |||
19 | static apr_queue_t *queue; | ||
20 | |||
21 | void run_multithreaded(config_t *cfg); | ||
22 | #endif | ||
23 | |||
24 | void run_singlethreaded(config_t *cfg); | ||
25 | |||
15 | const apr_getopt_option_t _opt_config[] = { | 26 | const apr_getopt_option_t _opt_config[] = { |
16 | {"machineid", 'm', 1, "Machine ID for the log file"}, | 27 | {"machineid", 'm', 1, "Machine ID for the log file"}, |
17 | {"transaction", 't', 1, "Use a Transaction (yes,no)"}, | 28 | {"transaction", 't', 1, "Use a Transaction (yes,no)"}, |
@@ -23,6 +34,7 @@ const apr_getopt_option_t _opt_config[] = { | |||
23 | {"dump", 'd', 0, "Dump the configuration after parsing and quit"}, | 34 | {"dump", 'd', 0, "Dump the configuration after parsing and quit"}, |
24 | {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"}, | 35 | {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"}, |
25 | {"summary", 's', 1, "Summary (yes,no)"}, | 36 | {"summary", 's', 1, "Summary (yes,no)"}, |
37 | {"threadcount", 'p', 1, "Set thread count (a number greater than 0)"}, | ||
26 | {"help", 'h', 0, "Show Help"}, | 38 | {"help", 'h', 0, "Show Help"}, |
27 | {NULL} | 39 | {NULL} |
28 | }; | 40 | }; |
@@ -58,25 +70,45 @@ void show_help(const char *prog, const apr_getopt_option_t *opts, FILE *output) | |||
58 | void print_summary(config_t *cfg) { | 70 | void print_summary(config_t *cfg) { |
59 | config_filestat_t *fstat; | 71 | config_filestat_t *fstat; |
60 | int i,m; | 72 | int i,m; |
73 | apr_time_t totaltime = 0; | ||
74 | apr_size_t totalparsed = 0, totalskipped = 0, totalbad = 0; | ||
61 | 75 | ||
62 | fstat = (config_filestat_t *)cfg->input_files->elts; | 76 | fstat = (config_filestat_t *)cfg->input_files->elts; |
63 | 77 | ||
64 | printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); | 78 | printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); |
65 | for (i=0, m=cfg->input_files->nelts; i<m; i++) { | 79 | for (i=0, m=cfg->input_files->nelts; i<m; i++) { |
80 | totaltime += fstat[i].stop - fstat[i].start; | ||
81 | totalparsed += fstat[i].linesparsed; | ||
82 | totalskipped += fstat[i].lineskipped; | ||
83 | totalbad += fstat[i].linesbad; | ||
66 | printf(" File: %s\n" | 84 | printf(" File: %s\n" |
67 | " Lines Parsed %'d out of %'d (Skipped %'d, Bad %'d)\n" | 85 | " Lines Added %'d out of %'d (Skipped %'d, Bad %'d)\n" |
68 | " Status: %s\n" | 86 | " Status: %s\n" |
69 | " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n" | 87 | " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n" |
70 | "\n", | 88 | "\n", |
71 | fstat[i].fname, | 89 | fstat[i].fname, |
72 | fstat[i].linesparsed - fstat[i].lineskipped - fstat[i].linesbad, | 90 | fstat[i].linesparsed - fstat[i].lineskipped - fstat[i].linesbad, |
73 | fstat[i].linesparsed, fstat[i].lineskipped, fstat[i].linesbad, | 91 | fstat[i].linesparsed, |
92 | fstat[i].lineskipped, | ||
93 | fstat[i].linesbad, | ||
74 | fstat[i].result, | 94 | fstat[i].result, |
75 | apr_time_sec(fstat[i].stop - fstat[i].start)/60, | 95 | apr_time_sec(fstat[i].stop - fstat[i].start)/60, |
76 | apr_time_sec(fstat[i].stop - fstat[i].start) % 60, | 96 | apr_time_sec(fstat[i].stop - fstat[i].start) % 60, |
77 | apr_time_msec(fstat[i].stop - fstat[i].start) | 97 | apr_time_msec(fstat[i].stop - fstat[i].start) |
78 | ); | 98 | ); |
79 | } | 99 | } |
100 | printf("Totals\n" | ||
101 | " Lines Added %'d out of %'d (Skipped %'d, Bad %'d)\n" | ||
102 | " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n" | ||
103 | "\n", | ||
104 | totalparsed - totalskipped - totalbad, | ||
105 | totalparsed, | ||
106 | totalskipped, | ||
107 | totalbad, | ||
108 | apr_time_sec(totaltime)/60, | ||
109 | apr_time_sec(totaltime) % 60, | ||
110 | apr_time_msec(totaltime) | ||
111 | ); | ||
80 | } | 112 | } |
81 | 113 | ||
82 | int main(int argc, const char *const argv[]) | 114 | int main(int argc, const char *const argv[]) |
@@ -130,6 +162,9 @@ int main(int argc, const char *const argv[]) | |||
130 | case 'n': | 162 | case 'n': |
131 | apr_table_setn(args,"dryrun","yes"); | 163 | apr_table_setn(args,"dryrun","yes"); |
132 | break; | 164 | break; |
165 | case 'p': | ||
166 | apr_table_setn(args,"threadcount",opt_arg); | ||
167 | break; | ||
133 | case 'r': | 168 | case 'r': |
134 | apr_table_setn(args,"logformat",opt_arg); | 169 | apr_table_setn(args,"logformat",opt_arg); |
135 | break; | 170 | break; |
@@ -185,34 +220,123 @@ int main(int argc, const char *const argv[]) | |||
185 | if (apr_is_empty_array(cfg->input_files)) { | 220 | if (apr_is_empty_array(cfg->input_files)) { |
186 | parser_find_logs(cfg); | 221 | parser_find_logs(cfg); |
187 | } | 222 | } |
188 | if (!cfg->dryrun) { | ||
189 | if ((rv = database_connect(cfg))) { | ||
190 | logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); | ||
191 | exit(1); | ||
192 | } | ||
193 | } | ||
194 | if (!apr_is_empty_array(cfg->input_files)) { | 223 | if (!apr_is_empty_array(cfg->input_files)) { |
195 | parser_split_logs(cfg); | 224 | parser_split_logs(cfg); |
196 | config_filestat_t *filelist; | 225 | #if APR_HAS_THREADS |
197 | int f, l; | 226 | if (cfg->thread_count > 1) { |
198 | filelist = (config_filestat_t *)cfg->input_files->elts; | 227 | run_multithreaded(cfg); |
199 | for (f=0, l=cfg->input_files->nelts; f < l; f++) { | 228 | } else { |
200 | rv = parser_parsefile(cfg, &filelist[f]); | 229 | #endif |
201 | if (rv) { | 230 | run_singlethreaded(cfg); |
202 | logging_log(cfg, LOGLEVEL_NOISE, | 231 | #if APR_HAS_THREADS |
203 | "Error occured parsing log files. Aborting"); | ||
204 | break; | ||
205 | } | ||
206 | } | 232 | } |
233 | #endif | ||
207 | } else { | 234 | } else { |
208 | logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse"); | 235 | logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse"); |
209 | } | 236 | } |
210 | if (!cfg->dryrun) { | ||
211 | database_disconnect(cfg); | ||
212 | } | ||
213 | 237 | ||
214 | if (cfg->summary) { | 238 | if (cfg->summary) { |
215 | print_summary(cfg); | 239 | print_summary(cfg); |
216 | } | 240 | } |
217 | return 0; | 241 | return 0; |
218 | } | 242 | } |
243 | |||
244 | void run_singlethreaded(config_t *cfg) | ||
245 | { | ||
246 | config_filestat_t *filelist; | ||
247 | config_dbd_t *dbconn = NULL; | ||
248 | int f, l; | ||
249 | apr_status_t rv; | ||
250 | |||
251 | if (!cfg->dryrun) { | ||
252 | if ((rv = database_connect(cfg, &dbconn))) { | ||
253 | logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); | ||
254 | exit(1); | ||
255 | } | ||
256 | } | ||
257 | |||
258 | filelist = (config_filestat_t *)cfg->input_files->elts; | ||
259 | for (f=0, l=cfg->input_files->nelts; f < l; f++) { | ||
260 | rv = parser_parsefile(cfg, dbconn, &filelist[f]); | ||
261 | if (rv) { | ||
262 | logging_log(cfg, LOGLEVEL_NOISE, | ||
263 | "Error occured parsing log files. Aborting"); | ||
264 | break; | ||
265 | } | ||
266 | } | ||
267 | |||
268 | if (!cfg->dryrun) { | ||
269 | database_disconnect(dbconn); | ||
270 | } | ||
271 | } | ||
272 | |||
273 | #if APR_HAS_THREADS | ||
274 | void * APR_THREAD_FUNC run_filethread(apr_thread_t *thd, void *data) | ||
275 | { | ||
276 | config_t *cfg = data; | ||
277 | config_dbd_t *dbconn = NULL; | ||
278 | config_filestat_t *fileentry; | ||
279 | apr_status_t rv; | ||
280 | |||
281 | if (!cfg->dryrun) { | ||
282 | if ((rv = database_connect(cfg, &dbconn))) { | ||
283 | logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database"); | ||
284 | exit(1); | ||
285 | } | ||
286 | } | ||
287 | |||
288 | while (1) { | ||
289 | rv = apr_queue_pop(queue, (void **)&fileentry); | ||
290 | if (rv == APR_EINTR) | ||
291 | continue; | ||
292 | if (rv == APR_EOF) | ||
293 | break; | ||
294 | rv = parser_parsefile(cfg, dbconn, fileentry); | ||
295 | if (rv) { | ||
296 | logging_log(cfg, LOGLEVEL_NOISE, | ||
297 | "Error occured parsing log file %s", fileentry->fname); | ||
298 | } | ||
299 | } | ||
300 | |||
301 | if (!cfg->dryrun) { | ||
302 | database_disconnect(dbconn); | ||
303 | } | ||
304 | return NULL; | ||
305 | } | ||
306 | |||
307 | void run_multithreaded(config_t *cfg) | ||
308 | { | ||
309 | logging_log(cfg, LOGLEVEL_NOISE, "Running Multithreaded"); | ||
310 | |||
311 | config_filestat_t *filelist; | ||
312 | int f, l; | ||
313 | apr_status_t rv; | ||
314 | apr_pool_t *tp; | ||
315 | apr_thread_pool_t *thrp; | ||
316 | unsigned int count; | ||
317 | |||
318 | apr_pool_create(&tp, cfg->pool); | ||
319 | rv = apr_queue_create(&queue, cfg->input_files->nelts, tp); | ||
320 | |||
321 | rv = apr_thread_pool_create(&thrp, 0, cfg->thread_count, tp); | ||
322 | |||
323 | //populate queue | ||
324 | filelist = (config_filestat_t *)cfg->input_files->elts; | ||
325 | for (f=0, l=cfg->input_files->nelts; f < l; f++) { | ||
326 | rv = apr_queue_push(queue, &filelist[f]); | ||
327 | } | ||
328 | // populate the worker threads | ||
329 | for (f=0; f<cfg->thread_count; f++) { | ||
330 | rv = apr_thread_pool_push(thrp, run_filethread, cfg, 0, NULL); | ||
331 | } | ||
332 | |||
333 | do { | ||
334 | apr_sleep(apr_time_from_sec(1)); | ||
335 | count = apr_queue_size(queue); | ||
336 | } while (count > 0); | ||
337 | |||
338 | rv = apr_queue_term(queue); | ||
339 | |||
340 | rv = apr_thread_pool_destroy(thrp); | ||
341 | } | ||
342 | #endif | ||