summaryrefslogtreecommitdiffstatsabout
path: root/utility/shell.c
diff options
context:
space:
mode:
Diffstat (limited to 'utility/shell.c')
-rw-r--r--utility/shell.c166
1 files changed, 145 insertions, 21 deletions
diff --git a/utility/shell.c b/utility/shell.c
index 2f2f43b..0fea741 100644
--- a/utility/shell.c
+++ b/utility/shell.c
@@ -12,6 +12,17 @@
12#include "database.h" 12#include "database.h"
13#include "util.h" 13#include "util.h"
14 14
15#if APR_HAS_THREADS
16#include "apr_queue.h"
17#include "apr_thread_pool.h"
18
19static apr_queue_t *queue;
20
21void run_multithreaded(config_t *cfg);
22#endif
23
24void run_singlethreaded(config_t *cfg);
25
15const apr_getopt_option_t _opt_config[] = { 26const apr_getopt_option_t _opt_config[] = {
16 {"machineid", 'm', 1, "Machine ID for the log file"}, 27 {"machineid", 'm', 1, "Machine ID for the log file"},
17 {"transaction", 't', 1, "Use a Transaction (yes,no)"}, 28 {"transaction", 't', 1, "Use a Transaction (yes,no)"},
@@ -23,6 +34,7 @@ const apr_getopt_option_t _opt_config[] = {
23 {"dump", 'd', 0, "Dump the configuration after parsing and quit"}, 34 {"dump", 'd', 0, "Dump the configuration after parsing and quit"},
24 {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"}, 35 {"loglevel", 'l', 1, "Log Level (deubg, notice, error)"},
25 {"summary", 's', 1, "Summary (yes,no)"}, 36 {"summary", 's', 1, "Summary (yes,no)"},
37 {"threadcount", 'p', 1, "Set thread count (a number greater than 0)"},
26 {"help", 'h', 0, "Show Help"}, 38 {"help", 'h', 0, "Show Help"},
27 {NULL} 39 {NULL}
28}; 40};
@@ -58,25 +70,45 @@ void show_help(const char *prog, const apr_getopt_option_t *opts, FILE *output)
58void print_summary(config_t *cfg) { 70void print_summary(config_t *cfg) {
59 config_filestat_t *fstat; 71 config_filestat_t *fstat;
60 int i,m; 72 int i,m;
73 apr_time_t totaltime = 0;
74 apr_size_t totalparsed = 0, totalskipped = 0, totalbad = 0;
61 75
62 fstat = (config_filestat_t *)cfg->input_files->elts; 76 fstat = (config_filestat_t *)cfg->input_files->elts;
63 77
64 printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); 78 printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts);
65 for (i=0, m=cfg->input_files->nelts; i<m; i++) { 79 for (i=0, m=cfg->input_files->nelts; i<m; i++) {
80 totaltime += fstat[i].stop - fstat[i].start;
81 totalparsed += fstat[i].linesparsed;
82 totalskipped += fstat[i].lineskipped;
83 totalbad += fstat[i].linesbad;
66 printf(" File: %s\n" 84 printf(" File: %s\n"
67 " Lines Parsed %'d out of %'d (Skipped %'d, Bad %'d)\n" 85 " Lines Added %'d out of %'d (Skipped %'d, Bad %'d)\n"
68 " Status: %s\n" 86 " Status: %s\n"
69 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n" 87 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n"
70 "\n", 88 "\n",
71 fstat[i].fname, 89 fstat[i].fname,
72 fstat[i].linesparsed - fstat[i].lineskipped - fstat[i].linesbad, 90 fstat[i].linesparsed - fstat[i].lineskipped - fstat[i].linesbad,
73 fstat[i].linesparsed, fstat[i].lineskipped, fstat[i].linesbad, 91 fstat[i].linesparsed,
92 fstat[i].lineskipped,
93 fstat[i].linesbad,
74 fstat[i].result, 94 fstat[i].result,
75 apr_time_sec(fstat[i].stop - fstat[i].start)/60, 95 apr_time_sec(fstat[i].stop - fstat[i].start)/60,
76 apr_time_sec(fstat[i].stop - fstat[i].start) % 60, 96 apr_time_sec(fstat[i].stop - fstat[i].start) % 60,
77 apr_time_msec(fstat[i].stop - fstat[i].start) 97 apr_time_msec(fstat[i].stop - fstat[i].start)
78 ); 98 );
79 } 99 }
100 printf("Totals\n"
101 " Lines Added %'d out of %'d (Skipped %'d, Bad %'d)\n"
102 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n"
103 "\n",
104 totalparsed - totalskipped - totalbad,
105 totalparsed,
106 totalskipped,
107 totalbad,
108 apr_time_sec(totaltime)/60,
109 apr_time_sec(totaltime) % 60,
110 apr_time_msec(totaltime)
111 );
80} 112}
81 113
82int main(int argc, const char *const argv[]) 114int main(int argc, const char *const argv[])
@@ -130,6 +162,9 @@ int main(int argc, const char *const argv[])
130 case 'n': 162 case 'n':
131 apr_table_setn(args,"dryrun","yes"); 163 apr_table_setn(args,"dryrun","yes");
132 break; 164 break;
165 case 'p':
166 apr_table_setn(args,"threadcount",opt_arg);
167 break;
133 case 'r': 168 case 'r':
134 apr_table_setn(args,"logformat",opt_arg); 169 apr_table_setn(args,"logformat",opt_arg);
135 break; 170 break;
@@ -185,34 +220,123 @@ int main(int argc, const char *const argv[])
185 if (apr_is_empty_array(cfg->input_files)) { 220 if (apr_is_empty_array(cfg->input_files)) {
186 parser_find_logs(cfg); 221 parser_find_logs(cfg);
187 } 222 }
188 if (!cfg->dryrun) {
189 if ((rv = database_connect(cfg))) {
190 logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database");
191 exit(1);
192 }
193 }
194 if (!apr_is_empty_array(cfg->input_files)) { 223 if (!apr_is_empty_array(cfg->input_files)) {
195 parser_split_logs(cfg); 224 parser_split_logs(cfg);
196 config_filestat_t *filelist; 225#if APR_HAS_THREADS
197 int f, l; 226 if (cfg->thread_count > 1) {
198 filelist = (config_filestat_t *)cfg->input_files->elts; 227 run_multithreaded(cfg);
199 for (f=0, l=cfg->input_files->nelts; f < l; f++) { 228 } else {
200 rv = parser_parsefile(cfg, &filelist[f]); 229#endif
201 if (rv) { 230 run_singlethreaded(cfg);
202 logging_log(cfg, LOGLEVEL_NOISE, 231#if APR_HAS_THREADS
203 "Error occured parsing log files. Aborting");
204 break;
205 }
206 } 232 }
233#endif
207 } else { 234 } else {
208 logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse"); 235 logging_log(cfg,LOGLEVEL_NOISE,"No log files found to parse");
209 } 236 }
210 if (!cfg->dryrun) {
211 database_disconnect(cfg);
212 }
213 237
214 if (cfg->summary) { 238 if (cfg->summary) {
215 print_summary(cfg); 239 print_summary(cfg);
216 } 240 }
217 return 0; 241 return 0;
218} 242}
243
244void run_singlethreaded(config_t *cfg)
245{
246 config_filestat_t *filelist;
247 config_dbd_t *dbconn = NULL;
248 int f, l;
249 apr_status_t rv;
250
251 if (!cfg->dryrun) {
252 if ((rv = database_connect(cfg, &dbconn))) {
253 logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database");
254 exit(1);
255 }
256 }
257
258 filelist = (config_filestat_t *)cfg->input_files->elts;
259 for (f=0, l=cfg->input_files->nelts; f < l; f++) {
260 rv = parser_parsefile(cfg, dbconn, &filelist[f]);
261 if (rv) {
262 logging_log(cfg, LOGLEVEL_NOISE,
263 "Error occured parsing log files. Aborting");
264 break;
265 }
266 }
267
268 if (!cfg->dryrun) {
269 database_disconnect(dbconn);
270 }
271}
272
273#if APR_HAS_THREADS
274void * APR_THREAD_FUNC run_filethread(apr_thread_t *thd, void *data)
275{
276 config_t *cfg = data;
277 config_dbd_t *dbconn = NULL;
278 config_filestat_t *fileentry;
279 apr_status_t rv;
280
281 if (!cfg->dryrun) {
282 if ((rv = database_connect(cfg, &dbconn))) {
283 logging_log(cfg,LOGLEVEL_NOISE, "Error Connecting to Database");
284 exit(1);
285 }
286 }
287
288 while (1) {
289 rv = apr_queue_pop(queue, (void **)&fileentry);
290 if (rv == APR_EINTR)
291 continue;
292 if (rv == APR_EOF)
293 break;
294 rv = parser_parsefile(cfg, dbconn, fileentry);
295 if (rv) {
296 logging_log(cfg, LOGLEVEL_NOISE,
297 "Error occured parsing log file %s", fileentry->fname);
298 }
299 }
300
301 if (!cfg->dryrun) {
302 database_disconnect(dbconn);
303 }
304 return NULL;
305}
306
307void run_multithreaded(config_t *cfg)
308{
309 logging_log(cfg, LOGLEVEL_NOISE, "Running Multithreaded");
310
311 config_filestat_t *filelist;
312 int f, l;
313 apr_status_t rv;
314 apr_pool_t *tp;
315 apr_thread_pool_t *thrp;
316 unsigned int count;
317
318 apr_pool_create(&tp, cfg->pool);
319 rv = apr_queue_create(&queue, cfg->input_files->nelts, tp);
320
321 rv = apr_thread_pool_create(&thrp, 0, cfg->thread_count, tp);
322
323 //populate queue
324 filelist = (config_filestat_t *)cfg->input_files->elts;
325 for (f=0, l=cfg->input_files->nelts; f < l; f++) {
326 rv = apr_queue_push(queue, &filelist[f]);
327 }
328 // populate the worker threads
329 for (f=0; f<cfg->thread_count; f++) {
330 rv = apr_thread_pool_push(thrp, run_filethread, cfg, 0, NULL);
331 }
332
333 do {
334 apr_sleep(apr_time_from_sec(1));
335 count = apr_queue_size(queue);
336 } while (count > 0);
337
338 rv = apr_queue_term(queue);
339
340 rv = apr_thread_pool_destroy(thrp);
341}
342#endif