summaryrefslogtreecommitdiffstatsabout
path: root/utility/logparse.c
diff options
context:
space:
mode:
authorEdward Rudd <urkle@outoforder.cc>2009-03-14 22:07:56 (GMT)
committer Edward Rudd <urkle@outoforder.cc>2009-03-14 22:07:56 (GMT)
commitb8790b1b48c238f2ba266e34625b8e8f5db0ad6e (patch)
treea9c7b08bc4ed3ee59aaf574f5f1738efa915fadc /utility/logparse.c
parent99867e8a2eca4421075900e44f24cfd749db7dcb (diff)
refactoroed to allo wdb connections to be per-thread
added initial threading implementation using apr_queues and apr_thread_pools
Diffstat (limited to 'utility/logparse.c')
-rw-r--r--utility/logparse.c19
1 files changed, 8 insertions, 11 deletions
diff --git a/utility/logparse.c b/utility/logparse.c
index 7267682..1b3cc97 100644
--- a/utility/logparse.c
+++ b/utility/logparse.c
@@ -266,14 +266,12 @@ void parser_split_logs(config_t *cfg)
266 ++linecount; 266 ++linecount;
267 } 267 }
268 } 268 }
269 printf("Lines %'d\n",linecount);
270 // now we know how long it is. Lets split up the file 269 // now we know how long it is. Lets split up the file
271 piecesize = linecount / cfg->split_count; 270 piecesize = linecount / cfg->split_count;
272 if (piecesize < cfg->split_minimum) 271 if (piecesize < cfg->split_minimum)
273 piecesize = cfg->split_minimum; 272 piecesize = cfg->split_minimum;
274 if (piecesize > cfg->split_maximum && cfg->split_maximum > 0) 273 if (piecesize > cfg->split_maximum && cfg->split_maximum > 0)
275 piecesize = cfg->split_maximum; 274 piecesize = cfg->split_maximum;
276 printf("Piece size %'d\n", piecesize);
277 if (piecesize > linecount) { 275 if (piecesize > linecount) {
278 // File is smaller than piece size just add it back in as is 276 // File is smaller than piece size just add it back in as is
279 newfile = (config_filestat_t *)apr_array_push(cfg->input_files); 277 newfile = (config_filestat_t *)apr_array_push(cfg->input_files);
@@ -296,7 +294,6 @@ void parser_split_logs(config_t *cfg)
296 basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname))); 294 basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname)));
297 295
298 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); 296 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++);
299 printf("Out file %s\n", file);
300 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); 297 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file);
301 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); 298 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp);
302 if (rv != APR_SUCCESS) { 299 if (rv != APR_SUCCESS) {
@@ -343,7 +340,6 @@ void parser_split_logs(config_t *cfg)
343 out_lines = 0; 340 out_lines = 0;
344 // Open new file 341 // Open new file
345 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); 342 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++);
346 printf("Out file %s\n", file);
347 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); 343 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file);
348 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); 344 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp);
349 if (rv != APR_SUCCESS) { 345 if (rv != APR_SUCCESS) {
@@ -548,7 +544,8 @@ apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out,
548 return APR_SUCCESS; 544 return APR_SUCCESS;
549} 545}
550 546
551apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) 547apr_status_t parser_parsefile(config_t *cfg, config_dbd_t *dbconn,
548 config_filestat_t *fstat)
552{ 549{
553 apr_pool_t *tp, *targp; 550 apr_pool_t *tp, *targp;
554 apr_file_t *file; 551 apr_file_t *file;
@@ -573,7 +570,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
573 fstat->linesparsed = 0; 570 fstat->linesparsed = 0;
574 // Start Transaction 571 // Start Transaction
575 fstat->start = apr_time_now(); 572 fstat->start = apr_time_now();
576 if (!cfg->dryrun && database_trans_start(cfg,tp)) { 573 if (!cfg->dryrun && database_trans_start(cfg, dbconn, tp)) {
577 fstat->result = "Database Transaction Error"; 574 fstat->result = "Database Transaction Error";
578 fstat->stop = apr_time_now(); 575 fstat->stop = apr_time_now();
579 return rv; 576 return rv;
@@ -614,14 +611,14 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
614 targc = 0; 611 targc = 0;
615 while (targv[targc]) 612 while (targv[targc])
616 targc++; 613 targc++;
617 rv = parser_processline(targp, cfg, fstat, targv, targc); 614 rv = parser_processline(targp, cfg, dbconn, fstat, targv, targc);
618 if (rv != APR_SUCCESS) { 615 if (rv != APR_SUCCESS) {
619 int i; 616 int i;
620 617
621 fstat->linesbad++; 618 fstat->linesbad++;
622 rv = parser_logbadline(cfg, fstat->fname, buff); 619 rv = parser_logbadline(cfg, fstat->fname, buff);
623 if (rv) { 620 if (rv) {
624 if (!cfg->dryrun) database_trans_abort(cfg); 621 if (!cfg->dryrun) database_trans_abort(cfg, dbconn);
625 logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed, 622 logging_log(cfg, LOGLEVEL_ERROR, "Line %d(%d): %s", fstat->linesparsed,
626 targc, buff); 623 targc, buff);
627 for (i = 0; targv[i]; i++) { 624 for (i = 0; targv[i]; i++) {
@@ -637,7 +634,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
637 } while (rv == APR_SUCCESS); 634 } while (rv == APR_SUCCESS);
638 apr_file_close(file); 635 apr_file_close(file);
639 // Finish Transaction 636 // Finish Transaction
640 if (!cfg->dryrun && database_trans_stop(cfg,tp)) { 637 if (!cfg->dryrun && database_trans_stop(cfg, dbconn, tp)) {
641 fstat->result = apr_psprintf(cfg->pool, 638 fstat->result = apr_psprintf(cfg->pool,
642 "Input line %d, Database Transaction Error", 639 "Input line %d, Database Transaction Error",
643 fstat->linesparsed); 640 fstat->linesparsed);
@@ -655,7 +652,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
655} 652}
656 653
657apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg, 654apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg,
658 config_filestat_t *fstat, char **argv, int argc) 655 config_dbd_t *dbconn, config_filestat_t *fstat, char **argv, int argc)
659{ 656{
660 config_logformat_t *fmt; 657 config_logformat_t *fmt;
661 config_logformat_field_t *ifields; 658 config_logformat_field_t *ifields;
@@ -760,7 +757,7 @@ apr_status_t parser_processline(apr_pool_t *ptemp, config_t *cfg,
760 757
761 // Process DB Query 758 // Process DB Query
762 if (!cfg->dryrun) { 759 if (!cfg->dryrun) {
763 rv = database_insert(cfg, ptemp, dataout); 760 rv = database_insert(cfg, dbconn, ptemp, dataout);
764 if (rv) { 761 if (rv) {
765 fstat->result = apr_psprintf(cfg->pool, 762 fstat->result = apr_psprintf(cfg->pool,
766 "Input line %d, Database Error", 763 "Input line %d, Database Error",