From 99867e8a2eca4421075900e44f24cfd749db7dcb Mon Sep 17 00:00:00 2001 From: Edward Rudd Date: Wed, 11 Mar 2009 04:27:34 +0000 Subject: add in splitting code fixed time display (modulus of seconds) --- utility/config.c | 67 +++++++++++++++---- utility/config.h | 16 ++++- utility/logparse.c | 169 ++++++++++++++++++++++++++++++++++++++++++++++- utility/logparse.h | 2 + utility/mod_log_sql.conf | 10 ++- utility/shell.c | 7 +- 6 files changed, 251 insertions(+), 20 deletions(-) diff --git a/utility/config.c b/utility/config.c index 6d3f61e..b1ba4fa 100644 --- a/utility/config.c +++ b/utility/config.c @@ -24,6 +24,18 @@ static apr_status_t config_set_string(config_t *cfg, config_opt_t *opt, return APR_SUCCESS; } +static apr_status_t config_set_file(config_t *cfg, config_opt_t *opt, + int argc, const char **argv) +{ + int offset = (int)(long)opt->data; + char **data = (char **)((void *)cfg + offset); + if (argc != 2) + return APR_EINVAL; + apr_filepath_merge(data, NULL, argv[1], + APR_FILEPATH_TRUENAME, cfg->pool); + return APR_SUCCESS; +} + static apr_status_t config_set_int(config_t *cfg, config_opt_t *opt, int argc, const char **argv) { @@ -70,7 +82,10 @@ static apr_status_t config_set_inputfile(config_t *cfg, config_opt_t *opt, if (argc != 2) return APR_EINVAL; newp = (config_filestat_t *)apr_array_push(cfg->input_files); - newp->fname = apr_pstrdup(cfg->pool, argv[1]); + char *temp; + apr_filepath_merge(&temp, NULL, argv[1], + APR_FILEPATH_TRUENAME, cfg->pool); + newp->fname = temp; newp->result = "Not Parsed"; return APR_SUCCESS; } @@ -213,6 +228,12 @@ void config_dump(config_t *cfg) printf("InputDir: %s\n", cfg->input_dir); + printf("Split input files: %d\n", cfg->split_enabled); + printf("Split output directory: %s\n", cfg->split_dir); + printf("Split file count: %d\n", cfg->split_count); + printf("Split min lines: %'d\n", cfg->split_minimum); + printf("Split max lines: %'d\n", cfg->split_maximum); + printf("DB Driver: %s\n", cfg->dbdriver); printf("DB Params: %s\n", cfg->dbparams); @@ -299,24 +320,42 @@ static void config_add_option(apr_pool_t *p, const char *const name, void config_init(apr_pool_t *p) { - config_add_option(p, "ErrorLog", "File to log errors", config_set_string, + config_add_option(p, "ErrorLog", "File to log errors", config_set_file, (void *)APR_OFFSETOF(config_t, errorlog)); - config_add_option(p, "LogLevel", - "Set Log Level (error, warn, debug, quiet)", config_set_loglevel, - NULL); + config_add_option(p, "LogLevel", "Set Log Level (error, warn, debug, quiet)", + config_set_loglevel, NULL); - config_add_option(p, "BadLineFile", "File to log bad log lines", config_set_string, + config_add_option(p, "BadLineFile", "File to log bad log lines", config_set_file, (void *)APR_OFFSETOF(config_t, badlinefile)); - config_add_option(p, "BadLineMax", - "Max number of bad lines before aborting", config_set_int, - (void *)APR_OFFSETOF(config_t, badlinemax)); + config_add_option(p, "BadLineMax", "Max number of bad lines before aborting", + config_set_int, (void *)APR_OFFSETOF(config_t, badlinemax)); config_add_option(p, "InputDirectory", "Directory to scan for log files", - config_set_string, (void *)APR_OFFSETOF(config_t, input_dir)); + config_set_file, (void *)APR_OFFSETOF(config_t, input_dir)); config_add_option(p, "InputFile", "Parse only this file", config_set_inputfile, NULL); + config_add_option(p, "SplitInput", + "Split the file into pieces, then process", + config_set_flag, (void *)APR_OFFSETOF(config_t, split_enabled)); + config_add_option(p, "SplitCount", + "Split the file into N number of pieces", + config_set_int, (void *)APR_OFFSETOF(config_t, split_count)); + config_add_option(p, "SplitMinLines", + "Each split piece will have a minumum of N lines", + config_set_int, (void *)APR_OFFSETOF(config_t, split_minimum)); + config_add_option(p, "SplitMaxLines", + "Each split piece will have a maximum of N lines", + config_set_int, (void *)APR_OFFSETOF(config_t, split_maximum)); + config_add_option(p, "SplitDirectory", + "Output directory to put intermediate split files", + config_set_file, (void *)APR_OFFSETOF(config_t, split_dir)); + + config_add_option(p, "ThreadCount", + "Numer of threads to use for processing the input files", + config_set_int, (void *)APR_OFFSETOF(config_t, thread_count)); + config_add_option(p, "DBDDriver", "DBD Driver to use", config_set_string, (void *)APR_OFFSETOF(config_t, dbdriver)); config_add_option(p, "DBDParams", "DBD Connection Parameters", @@ -325,8 +364,8 @@ void config_init(apr_pool_t *p) config_set_string, (void *)APR_OFFSETOF(config_t, table)); config_add_option(p, "UseTransactions", "Enable Transactions?", config_set_flag, (void *)APR_OFFSETOF(config_t, transactions)); - config_add_option(p, "MachineID", "Machine ID to set", config_set_string, - (void *)APR_OFFSETOF(config_t, machineid)); + config_add_option(p, "MachineID", "Machine ID to set", + config_set_string, (void *)APR_OFFSETOF(config_t, machineid)); config_add_option(p, "LogFormatConfig", "Define input log formats", config_set_logformat, NULL); @@ -367,6 +406,10 @@ config_t *config_create(apr_pool_t *p) cfg->loglevel = LOGLEVEL_ERROR; cfg->summary = 1; cfg->transactions = 1; + cfg->thread_count = 1; // default one thread (aka non-threaded) + cfg->split_count = 4; + cfg->split_minimum = 10000; + cfg->split_maximum = 50000; cfg->input_files = apr_array_make(cfg->pool, 2, sizeof(config_filestat_t)); cfg->log_formats = apr_hash_make(cfg->pool); cfg->output_fields = apr_array_make(cfg->pool, 10, diff --git a/utility/config.h b/utility/config.h index ebedec3..91c6f65 100644 --- a/utility/config.h +++ b/utility/config.h @@ -41,6 +41,20 @@ struct config_t { /** list of files to scan */ apr_array_header_t *input_files; + /** split the input file before processing */ + int split_enabled; + /** the number of files to split each input file into */ + int split_count; + /** the minimum number of lines for each piece */ + int split_minimum; + /** the maximum number of lines for each piece */ + int split_maximum; + /** directory to put ouput split files */ + const char *split_dir; + + /** the number of threads to run the import in */ + int thread_count; + /** db connection configuration */ const char *dbdriver; const char *dbparams; @@ -77,7 +91,7 @@ struct config_t { typedef struct config_filestat_t config_filestat_t; struct config_filestat_t { - char *fname; + const char *fname; apr_size_t linesparsed; apr_size_t lineskipped; apr_size_t linesbad; diff --git a/utility/logparse.c b/utility/logparse.c index 534703d..7267682 100644 --- a/utility/logparse.c +++ b/utility/logparse.c @@ -200,18 +200,180 @@ void parser_find_logs(config_t *cfg) if (apr_dir_open(&dir, cfg->input_dir, tp)==APR_SUCCESS) { while (apr_dir_read(&finfo, APR_FINFO_NAME | APR_FINFO_TYPE, dir) == APR_SUCCESS) { + char *temp; if (finfo.filetype == APR_DIR) continue; newp = (config_filestat_t *)apr_array_push(cfg->input_files); newp->result = "Not Parsed"; - apr_filepath_merge(&(newp->fname), cfg->input_dir, finfo.name, + apr_filepath_merge(&temp, cfg->input_dir, finfo.name, APR_FILEPATH_TRUENAME, cfg->pool); + newp->fname = temp; } apr_dir_close(dir); } apr_pool_destroy(tp); } +#define BUFFER_SIZE (16 * 1024) + +void parser_split_logs(config_t *cfg) +{ + apr_pool_t *tp, *tfp; + apr_array_header_t *foundfiles; + config_filestat_t *filelist; + config_filestat_t *newfile; + apr_file_t *infile; + int f, l; + apr_status_t rv; + apr_finfo_t finfo; + char buff[BUFFER_SIZE]; + int linecount; + int piecesize; + + if (!cfg->split_enabled) return; + if (!cfg->split_dir) { + logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Missing Split Output directory"); + return; + } + apr_pool_create(&tp, cfg->pool); + apr_pool_create(&tfp, tp); + + if (APR_SUCCESS != apr_stat(&finfo, cfg->split_dir, APR_FINFO_MIN, tp)) { + logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Directory %s does not exist", cfg->split_dir); + return; + } + foundfiles = apr_array_copy(tp, cfg->input_files); + apr_array_clear(cfg->input_files); + + filelist = (config_filestat_t *)foundfiles->elts; + for (f=0, l=foundfiles->nelts; f < l; f++) { + apr_pool_clear(tfp); + logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Begin Splitting Log File '%s'", filelist[f].fname); + rv = apr_file_open(&infile, filelist[f].fname, APR_FOPEN_READ, APR_OS_DEFAULT, tfp); + + if (rv != APR_SUCCESS) { + logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Could not open %s", filelist[f].fname); + return; + } + linecount = 0; + while (apr_file_eof(infile) == APR_SUCCESS) { + apr_size_t read = BUFFER_SIZE; + char *p; + apr_file_read(infile, buff, &read); + p = buff; + while ((p = memchr(p, '\n', (buff + read) - p))) { + ++p; + ++linecount; + } + } + printf("Lines %'d\n",linecount); + // now we know how long it is. Lets split up the file + piecesize = linecount / cfg->split_count; + if (piecesize < cfg->split_minimum) + piecesize = cfg->split_minimum; + if (piecesize > cfg->split_maximum && cfg->split_maximum > 0) + piecesize = cfg->split_maximum; + printf("Piece size %'d\n", piecesize); + if (piecesize > linecount) { + // File is smaller than piece size just add it back in as is + newfile = (config_filestat_t *)apr_array_push(cfg->input_files); + newfile->result = "Not Parsed"; + newfile->fname = filelist[f].fname; + } else { + //split apart the files + int cur_line = 0; + int file_count = 1; + int out_lines = 0; + const char *basefile, *file; + apr_file_t *outfile; + char trail[2048]; + apr_size_t trail_size = 0; + apr_size_t write; + apr_off_t off = 0; + + apr_file_seek(infile, APR_SET, &off); + + basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname))); + + file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); + printf("Out file %s\n", file); + logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); + rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); + if (rv != APR_SUCCESS) { + logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Could not open %s (%d)", file, rv); + return; + } + newfile = (config_filestat_t *)apr_array_push(cfg->input_files); + newfile->result = "Not Parsed"; + newfile->fname = apr_pstrdup(cfg->pool, file); + + while (apr_file_eof(infile) == APR_SUCCESS) { + apr_size_t read = BUFFER_SIZE; + char *p, *pp, *buff_start; + apr_file_read(infile, buff, &read); + buff_start = p = pp = buff; + if (trail_size) { + p = memchr(p, '\n', (buff + read) - p); + if (p) { + //printf("Trail Line: %p, %p, %d\n", pp, p, (p - pp) + trail_size); + ++p; + pp = p; + ++cur_line; + ++out_lines; + // write out to file + apr_file_write(outfile, trail, &trail_size); + trail_size = 0; + } else { + if ((read + trail_size) > 2048) { + logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Excessively long line %d in file %s", cur_line, filelist[f].fname); + exit(1); + } else { + memcpy(trail+trail_size, buff, read); + trail_size += read; + } + } + } + while ((p = memchr(p, '\n', (buff + read) - p))) { + //printf("Line: %p, %p, %d\n", pp, p, (p - pp)); + if (out_lines == piecesize) { + // Write out to file + write = pp - buff_start; + apr_file_write(outfile, buff_start, &write); + buff_start = pp; + out_lines = 0; + // Open new file + file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++); + printf("Out file %s\n", file); + logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file); + rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp); + if (rv != APR_SUCCESS) { + logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Could not open %s (%d)", file, rv); + return; + } + newfile = (config_filestat_t *)apr_array_push(cfg->input_files); + newfile->result = "Not Parsed"; + newfile->fname = apr_pstrdup(cfg->pool, file); + } + ++p; + pp = p; + ++cur_line; + ++out_lines; + } + // Write out to file + write = pp - buff_start; + apr_file_write(outfile, buff_start, &write); + + trail_size = (buff+read) - pp; + if (trail_size) { + memcpy(trail, pp, trail_size); + } + } + } + } + apr_pool_destroy(tfp); + apr_pool_destroy(tp); +} + apr_status_t parser_logbadline(config_t *cfg, const char *filename, const char *badline) { @@ -392,6 +554,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) apr_file_t *file; apr_status_t rv; char buff[2048]; + char readbuff[BUFFER_SIZE]; char **targv; int targc; @@ -400,8 +563,8 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat) logging_log(cfg, LOGLEVEL_NOTICE, "PARSER: Begin Parsing Log File '%s'", fstat->fname); - rv = apr_file_open(&file, fstat->fname, APR_FOPEN_READ | APR_BUFFERED, - APR_OS_DEFAULT, tp); + rv = apr_file_open(&file, fstat->fname, APR_FOPEN_READ, APR_OS_DEFAULT, tp); + apr_file_buffer_set(file, readbuff, BUFFER_SIZE); if (rv != APR_SUCCESS) { logging_log(cfg, LOGLEVEL_NOISE, "PARSER: Could not open %s", fstat->fname); return rv; diff --git a/utility/logparse.h b/utility/logparse.h index 8f0fc42..cd085b5 100644 --- a/utility/logparse.h +++ b/utility/logparse.h @@ -23,6 +23,8 @@ void parser_init(apr_pool_t *p); void parser_find_logs(config_t *cfg); +void parser_split_logs(config_t *cfg); + apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, apr_pool_t *token_context); diff --git a/utility/mod_log_sql.conf b/utility/mod_log_sql.conf index 5b2c0f9..92d567c 100644 --- a/utility/mod_log_sql.conf +++ b/utility/mod_log_sql.conf @@ -6,10 +6,18 @@ DBDParams "host=localhost;user=root;dbname=apache_log" Table access_log MachineID 7of9 UseTransactions on -LogLevel debug +LogLevel notice DryRun off Summary on +SplitInput on +#SplitCount 4 +SplitMinLines 0 +SplitMaxLines 50000 +SplitDirectory ./split_temp + +#ThreadCount 1 + BadLineFile ./badlines.log BadLineMax 10 diff --git a/utility/shell.c b/utility/shell.c index 0e9d646..2f2f43b 100644 --- a/utility/shell.c +++ b/utility/shell.c @@ -61,10 +61,10 @@ void print_summary(config_t *cfg) { fstat = (config_filestat_t *)cfg->input_files->elts; - printf("Execution Summary\n"); + printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts); for (i=0, m=cfg->input_files->nelts; iinput_files)) { + parser_split_logs(cfg); config_filestat_t *filelist; int f, l; filelist = (config_filestat_t *)cfg->input_files->elts; -- cgit