summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Edward Rudd 2009-03-11 04:27:34 +0000
committer: Edward Rudd 2009-03-11 04:27:34 +0000
commit: 99867e8a2eca4421075900e44f24cfd749db7dcb (patch)
tree: 4e895b4d820412ea502a7bb4e3804d2bdf673d67
parent: b8cb3df3992184fe9dcd3fda36157874bb3f0c9e (diff)
add in splitting code
fixed time display (modulus of seconds)
-rw-r--r-- utility/config.c 67
-rw-r--r-- utility/config.h 16
-rw-r--r-- utility/logparse.c 169
-rw-r--r-- utility/logparse.h 2
-rw-r--r-- utility/mod_log_sql.conf 10
-rw-r--r-- utility/shell.c 7
6 files changed, 251 insertions, 20 deletions
diff --git a/utility/config.c b/utility/config.c
index 6d3f61e..b1ba4fa 100644
--- a/utility/config.c
+++ b/utility/config.c
@@ -24,6 +24,18 @@ static apr_status_t config_set_string(config_t *cfg, config_opt_t *opt,
24 return APR_SUCCESS; 24 return APR_SUCCESS;
25} 25}
26 26
27static apr_status_t config_set_file(config_t *cfg, config_opt_t *opt,
28 int argc, const char **argv)
29{
30 int offset = (int)(long)opt->data;
31 char **data = (char **)((void *)cfg + offset);
32 if (argc != 2)
33 return APR_EINVAL;
34 apr_filepath_merge(data, NULL, argv[1],
35 APR_FILEPATH_TRUENAME, cfg->pool);
36 return APR_SUCCESS;
37}
38
27static apr_status_t config_set_int(config_t *cfg, config_opt_t *opt, int argc, 39static apr_status_t config_set_int(config_t *cfg, config_opt_t *opt, int argc,
28 const char **argv) 40 const char **argv)
29{ 41{
@@ -70,7 +82,10 @@ static apr_status_t config_set_inputfile(config_t *cfg, config_opt_t *opt,
70 if (argc != 2) 82 if (argc != 2)
71 return APR_EINVAL; 83 return APR_EINVAL;
72 newp = (config_filestat_t *)apr_array_push(cfg->input_files); 84 newp = (config_filestat_t *)apr_array_push(cfg->input_files);
73 newp->fname = apr_pstrdup(cfg->pool, argv[1]); 85 char *temp;
86 apr_filepath_merge(&temp, NULL, argv[1],
87 APR_FILEPATH_TRUENAME, cfg->pool);
88 newp->fname = temp;
74 newp->result = "Not Parsed"; 89 newp->result = "Not Parsed";
75 return APR_SUCCESS; 90 return APR_SUCCESS;
76} 91}
@@ -213,6 +228,12 @@ void config_dump(config_t *cfg)
213 228
214 printf("InputDir: %s\n", cfg->input_dir); 229 printf("InputDir: %s\n", cfg->input_dir);
215 230
231 printf("Split input files: %d\n", cfg->split_enabled);
232 printf("Split output directory: %s\n", cfg->split_dir);
233 printf("Split file count: %d\n", cfg->split_count);
234 printf("Split min lines: %'d\n", cfg->split_minimum);
235 printf("Split max lines: %'d\n", cfg->split_maximum);
236
216 printf("DB Driver: %s\n", cfg->dbdriver); 237 printf("DB Driver: %s\n", cfg->dbdriver);
217 printf("DB Params: %s\n", cfg->dbparams); 238 printf("DB Params: %s\n", cfg->dbparams);
218 239
@@ -299,24 +320,42 @@ static void config_add_option(apr_pool_t *p, const char *const name,
299 320
300void config_init(apr_pool_t *p) 321void config_init(apr_pool_t *p)
301{ 322{
302 config_add_option(p, "ErrorLog", "File to log errors", config_set_string, 323 config_add_option(p, "ErrorLog", "File to log errors", config_set_file,
303 (void *)APR_OFFSETOF(config_t, errorlog)); 324 (void *)APR_OFFSETOF(config_t, errorlog));
304 config_add_option(p, "LogLevel", 325 config_add_option(p, "LogLevel", "Set Log Level (error, warn, debug, quiet)",
305 "Set Log Level (error, warn, debug, quiet)", config_set_loglevel, 326 config_set_loglevel, NULL);
306 NULL);
307 327
308 config_add_option(p, "BadLineFile", "File to log bad log lines", config_set_string, 328 config_add_option(p, "BadLineFile", "File to log bad log lines", config_set_file,
309 (void *)APR_OFFSETOF(config_t, badlinefile)); 329 (void *)APR_OFFSETOF(config_t, badlinefile));
310 config_add_option(p, "BadLineMax", 330 config_add_option(p, "BadLineMax", "Max number of bad lines before aborting",
311 "Max number of bad lines before aborting", config_set_int, 331 config_set_int, (void *)APR_OFFSETOF(config_t, badlinemax));
312 (void *)APR_OFFSETOF(config_t, badlinemax));
313 332
314 333
315 config_add_option(p, "InputDirectory", "Directory to scan for log files", 334 config_add_option(p, "InputDirectory", "Directory to scan for log files",
316 config_set_string, (void *)APR_OFFSETOF(config_t, input_dir)); 335 config_set_file, (void *)APR_OFFSETOF(config_t, input_dir));
317 config_add_option(p, "InputFile", "Parse only this file", 336 config_add_option(p, "InputFile", "Parse only this file",
318 config_set_inputfile, NULL); 337 config_set_inputfile, NULL);
319 338
339 config_add_option(p, "SplitInput",
340 "Split the file into pieces, then process",
341 config_set_flag, (void *)APR_OFFSETOF(config_t, split_enabled));
342 config_add_option(p, "SplitCount",
343 "Split the file into N number of pieces",
344 config_set_int, (void *)APR_OFFSETOF(config_t, split_count));
345 config_add_option(p, "SplitMinLines",
346 "Each split piece will have a minumum of N lines",
347 config_set_int, (void *)APR_OFFSETOF(config_t, split_minimum));
348 config_add_option(p, "SplitMaxLines",
349 "Each split piece will have a maximum of N lines",
350 config_set_int, (void *)APR_OFFSETOF(config_t, split_maximum));
351 config_add_option(p, "SplitDirectory",
352 "Output directory to put intermediate split files",
353 config_set_file, (void *)APR_OFFSETOF(config_t, split_dir));
354
355 config_add_option(p, "ThreadCount",
356 "Numer of threads to use for processing the input files",
357 config_set_int, (void *)APR_OFFSETOF(config_t, thread_count));
358
320 config_add_option(p, "DBDDriver", "DBD Driver to use", 359 config_add_option(p, "DBDDriver", "DBD Driver to use",
321 config_set_string, (void *)APR_OFFSETOF(config_t, dbdriver)); 360 config_set_string, (void *)APR_OFFSETOF(config_t, dbdriver));
322 config_add_option(p, "DBDParams", "DBD Connection Parameters", 361 config_add_option(p, "DBDParams", "DBD Connection Parameters",
@@ -325,8 +364,8 @@ void config_init(apr_pool_t *p)
325 config_set_string, (void *)APR_OFFSETOF(config_t, table)); 364 config_set_string, (void *)APR_OFFSETOF(config_t, table));
326 config_add_option(p, "UseTransactions", "Enable Transactions?", 365 config_add_option(p, "UseTransactions", "Enable Transactions?",
327 config_set_flag, (void *)APR_OFFSETOF(config_t, transactions)); 366 config_set_flag, (void *)APR_OFFSETOF(config_t, transactions));
328 config_add_option(p, "MachineID", "Machine ID to set", config_set_string, 367 config_add_option(p, "MachineID", "Machine ID to set",
329 (void *)APR_OFFSETOF(config_t, machineid)); 368 config_set_string, (void *)APR_OFFSETOF(config_t, machineid));
330 369
331 config_add_option(p, "LogFormatConfig", "Define input log formats", 370 config_add_option(p, "LogFormatConfig", "Define input log formats",
332 config_set_logformat, NULL); 371 config_set_logformat, NULL);
@@ -367,6 +406,10 @@ config_t *config_create(apr_pool_t *p)
367 cfg->loglevel = LOGLEVEL_ERROR; 406 cfg->loglevel = LOGLEVEL_ERROR;
368 cfg->summary = 1; 407 cfg->summary = 1;
369 cfg->transactions = 1; 408 cfg->transactions = 1;
409 cfg->thread_count = 1; // default one thread (aka non-threaded)
410 cfg->split_count = 4;
411 cfg->split_minimum = 10000;
412 cfg->split_maximum = 50000;
370 cfg->input_files = apr_array_make(cfg->pool, 2, sizeof(config_filestat_t)); 413 cfg->input_files = apr_array_make(cfg->pool, 2, sizeof(config_filestat_t));
371 cfg->log_formats = apr_hash_make(cfg->pool); 414 cfg->log_formats = apr_hash_make(cfg->pool);
372 cfg->output_fields = apr_array_make(cfg->pool, 10, 415 cfg->output_fields = apr_array_make(cfg->pool, 10,
diff --git a/utility/config.h b/utility/config.h
index ebedec3..91c6f65 100644
--- a/utility/config.h
+++ b/utility/config.h
@@ -41,6 +41,20 @@ struct config_t {
41 /** list of files to scan */ 41 /** list of files to scan */
42 apr_array_header_t *input_files; 42 apr_array_header_t *input_files;
43 43
44 /** split the input file before processing */
45 int split_enabled;
46 /** the number of files to split each input file into */
47 int split_count;
48 /** the minimum number of lines for each piece */
49 int split_minimum;
50 /** the maximum number of lines for each piece */
51 int split_maximum;
52 /** directory to put ouput split files */
53 const char *split_dir;
54
55 /** the number of threads to run the import in */
56 int thread_count;
57
44 /** db connection configuration */ 58 /** db connection configuration */
45 const char *dbdriver; 59 const char *dbdriver;
46 const char *dbparams; 60 const char *dbparams;
@@ -77,7 +91,7 @@ struct config_t {
77 91
78typedef struct config_filestat_t config_filestat_t; 92typedef struct config_filestat_t config_filestat_t;
79struct config_filestat_t { 93struct config_filestat_t {
80 char *fname; 94 const char *fname;
81 apr_size_t linesparsed; 95 apr_size_t linesparsed;
82 apr_size_t lineskipped; 96 apr_size_t lineskipped;
83 apr_size_t linesbad; 97 apr_size_t linesbad;
diff --git a/utility/logparse.c b/utility/logparse.c
index 534703d..7267682 100644
--- a/utility/logparse.c
+++ b/utility/logparse.c
@@ -200,18 +200,180 @@ void parser_find_logs(config_t *cfg)
200 if (apr_dir_open(&dir, cfg->input_dir, tp)==APR_SUCCESS) { 200 if (apr_dir_open(&dir, cfg->input_dir, tp)==APR_SUCCESS) {
201 while (apr_dir_read(&finfo, APR_FINFO_NAME | APR_FINFO_TYPE, dir) 201 while (apr_dir_read(&finfo, APR_FINFO_NAME | APR_FINFO_TYPE, dir)
202 == APR_SUCCESS) { 202 == APR_SUCCESS) {
203 char *temp;
203 if (finfo.filetype == APR_DIR) 204 if (finfo.filetype == APR_DIR)
204 continue; 205 continue;
205 newp = (config_filestat_t *)apr_array_push(cfg->input_files); 206 newp = (config_filestat_t *)apr_array_push(cfg->input_files);
206 newp->result = "Not Parsed"; 207 newp->result = "Not Parsed";
207 apr_filepath_merge(&(newp->fname), cfg->input_dir, finfo.name, 208 apr_filepath_merge(&temp, cfg->input_dir, finfo.name,
208 APR_FILEPATH_TRUENAME, cfg->pool); 209 APR_FILEPATH_TRUENAME, cfg->pool);
210 newp->fname = temp;
209 } 211 }
210 apr_dir_close(dir); 212 apr_dir_close(dir);
211 } 213 }
212 apr_pool_destroy(tp); 214 apr_pool_destroy(tp);
213} 215}
214 216
217#define BUFFER_SIZE (16 * 1024)
218
219void parser_split_logs(config_t *cfg)
220{
221 apr_pool_t *tp, *tfp;
222 apr_array_header_t *foundfiles;
223 config_filestat_t *filelist;
224 config_filestat_t *newfile;
225 apr_file_t *infile;
226 int f, l;
227 apr_status_t rv;
228 apr_finfo_t finfo;
229 char buff[BUFFER_SIZE];
230 int linecount;
231 int piecesize;
232
233 if (!cfg->split_enabled) return;
234 if (!cfg->split_dir) {
235 logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Missing Split Output directory");
236 return;
237 }
238 apr_pool_create(&tp, cfg->pool);
239 apr_pool_create(&tfp, tp);
240
241 if (APR_SUCCESS != apr_stat(&finfo, cfg->split_dir, APR_FINFO_MIN, tp)) {
242 logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Directory %s does not exist", cfg->split_dir);
243 return;
244 }
245 foundfiles = apr_array_copy(tp, cfg->input_files);
246 apr_array_clear(cfg->input_files);
247
248 filelist = (config_filestat_t *)foundfiles->elts;
249 for (f=0, l=foundfiles->nelts; f < l; f++) {
250 apr_pool_clear(tfp);
251 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Begin Splitting Log File '%s'", filelist[f].fname);
252 rv = apr_file_open(&infile, filelist[f].fname, APR_FOPEN_READ, APR_OS_DEFAULT, tfp);
253
254 if (rv != APR_SUCCESS) {
255 logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Could not open %s", filelist[f].fname);
256 return;
257 }
258 linecount = 0;
259 while (apr_file_eof(infile) == APR_SUCCESS) {
260 apr_size_t read = BUFFER_SIZE;
261 char *p;
262 apr_file_read(infile, buff, &read);
263 p = buff;
264 while ((p = memchr(p, '\n', (buff + read) - p))) {
265 ++p;
266 ++linecount;
267 }
268 }
269 printf("Lines %'d\n",linecount);
270 // now we know how long it is. Lets split up the file
271 piecesize = linecount / cfg->split_count;
272 if (piecesize < cfg->split_minimum)
273 piecesize = cfg->split_minimum;
274 if (piecesize > cfg->split_maximum && cfg->split_maximum > 0)
275 piecesize = cfg->split_maximum;
276 printf("Piece size %'d\n", piecesize);
277 if (piecesize > linecount) {
278 // File is smaller than piece size just add it back in as is
279 newfile = (config_filestat_t *)apr_array_push(cfg->input_files);
280 newfile->result = "Not Parsed";
281 newfile->fname = filelist[f].fname;
282 } else {
283 //split apart the files
284 int cur_line = 0;
285 int file_count = 1;
286 int out_lines = 0;
287 const char *basefile, *file;
288 apr_file_t *outfile;
289 char trail[2048];
290 apr_size_t trail_size = 0;
291 apr_size_t write;
292 apr_off_t off = 0;
293
294 apr_file_seek(infile, APR_SET, &off);
295
296 basefile = apr_pstrdup(tfp, basename(apr_pstrdup(tfp, filelist[f].fname)));
297
298 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++);
299 printf("Out file %s\n", file);
300 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file);
301 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp);
302 if (rv != APR_SUCCESS) {
303 logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Could not open %s (%d)", file, rv);
304 return;
305 }
306 newfile = (config_filestat_t *)apr_array_push(cfg->input_files);
307 newfile->result = "Not Parsed";
308 newfile->fname = apr_pstrdup(cfg->pool, file);
309
310 while (apr_file_eof(infile) == APR_SUCCESS) {
311 apr_size_t read = BUFFER_SIZE;
312 char *p, *pp, *buff_start;
313 apr_file_read(infile, buff, &read);
314 buff_start = p = pp = buff;
315 if (trail_size) {
316 p = memchr(p, '\n', (buff + read) - p);
317 if (p) {
318 //printf("Trail Line: %p, %p, %d\n", pp, p, (p - pp) + trail_size);
319 ++p;
320 pp = p;
321 ++cur_line;
322 ++out_lines;
323 // write out to file
324 apr_file_write(outfile, trail, &trail_size);
325 trail_size = 0;
326 } else {
327 if ((read + trail_size) > 2048) {
328 logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Excessively long line %d in file %s", cur_line, filelist[f].fname);
329 exit(1);
330 } else {
331 memcpy(trail+trail_size, buff, read);
332 trail_size += read;
333 }
334 }
335 }
336 while ((p = memchr(p, '\n', (buff + read) - p))) {
337 //printf("Line: %p, %p, %d\n", pp, p, (p - pp));
338 if (out_lines == piecesize) {
339 // Write out to file
340 write = pp - buff_start;
341 apr_file_write(outfile, buff_start, &write);
342 buff_start = pp;
343 out_lines = 0;
344 // Open new file
345 file = apr_psprintf(tfp, "%s/%s-%d", cfg->split_dir, basefile, file_count++);
346 printf("Out file %s\n", file);
347 logging_log(cfg, LOGLEVEL_NOTICE, "SPLITTER: Creating output file %s", file);
348 rv = apr_file_open(&outfile, file, APR_FOPEN_WRITE | APR_FOPEN_CREATE | APR_FOPEN_TRUNCATE, APR_OS_DEFAULT, tfp);
349 if (rv != APR_SUCCESS) {
350 logging_log(cfg, LOGLEVEL_NOISE, "SPLITTER: Could not open %s (%d)", file, rv);
351 return;
352 }
353 newfile = (config_filestat_t *)apr_array_push(cfg->input_files);
354 newfile->result = "Not Parsed";
355 newfile->fname = apr_pstrdup(cfg->pool, file);
356 }
357 ++p;
358 pp = p;
359 ++cur_line;
360 ++out_lines;
361 }
362 // Write out to file
363 write = pp - buff_start;
364 apr_file_write(outfile, buff_start, &write);
365
366 trail_size = (buff+read) - pp;
367 if (trail_size) {
368 memcpy(trail, pp, trail_size);
369 }
370 }
371 }
372 }
373 apr_pool_destroy(tfp);
374 apr_pool_destroy(tp);
375}
376
215apr_status_t parser_logbadline(config_t *cfg, const char *filename, 377apr_status_t parser_logbadline(config_t *cfg, const char *filename,
216 const char *badline) 378 const char *badline)
217{ 379{
@@ -392,6 +554,7 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
392 apr_file_t *file; 554 apr_file_t *file;
393 apr_status_t rv; 555 apr_status_t rv;
394 char buff[2048]; 556 char buff[2048];
557 char readbuff[BUFFER_SIZE];
395 char **targv; 558 char **targv;
396 int targc; 559 int targc;
397 560
@@ -400,8 +563,8 @@ apr_status_t parser_parsefile(config_t *cfg, config_filestat_t *fstat)
400 563
401 logging_log(cfg, LOGLEVEL_NOTICE, "PARSER: Begin Parsing Log File '%s'", fstat->fname); 564 logging_log(cfg, LOGLEVEL_NOTICE, "PARSER: Begin Parsing Log File '%s'", fstat->fname);
402 565
403 rv = apr_file_open(&file, fstat->fname, APR_FOPEN_READ | APR_BUFFERED, 566 rv = apr_file_open(&file, fstat->fname, APR_FOPEN_READ, APR_OS_DEFAULT, tp);
404 APR_OS_DEFAULT, tp); 567 apr_file_buffer_set(file, readbuff, BUFFER_SIZE);
405 if (rv != APR_SUCCESS) { 568 if (rv != APR_SUCCESS) {
406 logging_log(cfg, LOGLEVEL_NOISE, "PARSER: Could not open %s", fstat->fname); 569 logging_log(cfg, LOGLEVEL_NOISE, "PARSER: Could not open %s", fstat->fname);
407 return rv; 570 return rv;
diff --git a/utility/logparse.h b/utility/logparse.h
index 8f0fc42..cd085b5 100644
--- a/utility/logparse.h
+++ b/utility/logparse.h
@@ -23,6 +23,8 @@ void parser_init(apr_pool_t *p);
23 23
24void parser_find_logs(config_t *cfg); 24void parser_find_logs(config_t *cfg);
25 25
26void parser_split_logs(config_t *cfg);
27
26apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out, 28apr_status_t parser_tokenize_line(const char *arg_str, char ***argv_out,
27 apr_pool_t *token_context); 29 apr_pool_t *token_context);
28 30
diff --git a/utility/mod_log_sql.conf b/utility/mod_log_sql.conf
index 5b2c0f9..92d567c 100644
--- a/utility/mod_log_sql.conf
+++ b/utility/mod_log_sql.conf
@@ -6,10 +6,18 @@ DBDParams "host=localhost;user=root;dbname=apache_log"
6Table access_log 6Table access_log
7MachineID 7of9 7MachineID 7of9
8UseTransactions on 8UseTransactions on
9LogLevel debug 9LogLevel notice
10DryRun off 10DryRun off
11Summary on 11Summary on
12 12
13SplitInput on
14#SplitCount 4
15SplitMinLines 0
16SplitMaxLines 50000
17SplitDirectory ./split_temp
18
19#ThreadCount 1
20
13BadLineFile ./badlines.log 21BadLineFile ./badlines.log
14BadLineMax 10 22BadLineMax 10
15 23
diff --git a/utility/shell.c b/utility/shell.c
index 0e9d646..2f2f43b 100644
--- a/utility/shell.c
+++ b/utility/shell.c
@@ -61,10 +61,10 @@ void print_summary(config_t *cfg) {
61 61
62 fstat = (config_filestat_t *)cfg->input_files->elts; 62 fstat = (config_filestat_t *)cfg->input_files->elts;
63 63
64 printf("Execution Summary\n"); 64 printf("Execution Summary\nParsed %d files\n", cfg->input_files->nelts);
65 for (i=0, m=cfg->input_files->nelts; i<m; i++) { 65 for (i=0, m=cfg->input_files->nelts; i<m; i++) {
66 printf(" File: %s\n" 66 printf(" File: %s\n"
67 " Lines Parsed %d out of %d (Skipped %d, Bad %d)\n" 67 " Lines Parsed %'d out of %'d (Skipped %'d, Bad %'d)\n"
68 " Status: %s\n" 68 " Status: %s\n"
69 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n" 69 " Duration: %02"APR_TIME_T_FMT":%02"APR_TIME_T_FMT".%"APR_TIME_T_FMT" (minutes, seconds, and miliseconds)\n"
70 "\n", 70 "\n",
@@ -73,7 +73,7 @@ void print_summary(config_t *cfg) {
73 fstat[i].linesparsed, fstat[i].lineskipped, fstat[i].linesbad, 73 fstat[i].linesparsed, fstat[i].lineskipped, fstat[i].linesbad,
74 fstat[i].result, 74 fstat[i].result,
75 apr_time_sec(fstat[i].stop - fstat[i].start)/60, 75 apr_time_sec(fstat[i].stop - fstat[i].start)/60,
76 apr_time_sec(fstat[i].stop - fstat[i].start), 76 apr_time_sec(fstat[i].stop - fstat[i].start) % 60,
77 apr_time_msec(fstat[i].stop - fstat[i].start) 77 apr_time_msec(fstat[i].stop - fstat[i].start)
78 ); 78 );
79 } 79 }
@@ -192,6 +192,7 @@ int main(int argc, const char *const argv[])
192 } 192 }
193 } 193 }
194 if (!apr_is_empty_array(cfg->input_files)) { 194 if (!apr_is_empty_array(cfg->input_files)) {
195 parser_split_logs(cfg);
195 config_filestat_t *filelist; 196 config_filestat_t *filelist;
196 int f, l; 197 int f, l;
197 filelist = (config_filestat_t *)cfg->input_files->elts; 198 filelist = (config_filestat_t *)cfg->input_files->elts;