Parsing large Files with Apache Camel

July 9, 2012 claus

For a transformation with Smooks we had to parse the incoming EDI files line by line to create a valid EDI source. For this we wrote a simple route like this:

from("file://src/data/in?delete=true&idempotent=true&readLock=changed&readLockCheckInterval=1500")
	.marshal().string("UTF-8")
	.split(body().tokenize("\n")).streaming()
		.bean(LineParser.class)
		.to("log:LINEPARSER?groupSize=1000")
		.to("file://src/data/out?fileExist=Append")
	.end();
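The LineParser bean itself is not part of this post. Any bean with a single method that takes a line and returns the transformed line will do; a minimal hypothetical stand-in could look like this:

public class LineParser {

	// Camel calls this method for every line emitted by the splitter and
	// routes the return value onwards. The real transformation logic that
	// produces a valid EDI line goes here; trimming is just a placeholder.
	public String parse(String line) {
		return line.trim() + "\n";
	}
}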

This route reads a file, splits it into line chunks, parses each line, and finally appends the modified line to a file in the destination folder. Really simple, and it does what it was built for… But this approach is really slow: every single line causes its own write operation, which is really expensive, as this console output shows:

[Screenshot: console log showing that per-line writes are really slow]

We have files with more than 5 million lines, so at an average of 170 to 200 messages per second we would need nearly 7 hours to parse one file (5,000,000 lines at 200 lines per second is 25,000 seconds, almost 7 hours). We needed a solution for this, and Camel provides one: the Aggregator pattern. Parsing should be faster if we aggregate the lines before writing them to disk. So let's try it with this route:

from("file://src/data/in?delete=true&idempotent=true&readLock=changed&readLockCheckInterval=1500")
	.marshal().string("UTF-8")
	.split(body().tokenize("\n")).streaming()
		.bean(LineParser.class)
		.to("log:LINEPARSER?groupSize=1000")
                .setHeader("foo", constant("foo")) // #1
                .aggregate(header("foo"), 
                                new StringBodyAggregator()).completionSize(750)
                                                .completionTimeout(1500) // #2
		.to("file://src/data/out?fileExist=Append")
	.end();

We added two lines. The first one, marked with '#1', sets a fixed header as the aggregation key (I know this is nonsense, but we made that mistake, so take it as a lesson learned ;). The second one, marked with '#2', is the aggregation itself. There's no hidden magic: the 'StringBodyAggregator' class simply concatenates all incoming bodies as they are:

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class StringBodyAggregator implements AggregationStrategy {

	@Override
	public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

		// the first exchange of a group has no predecessor to merge with
		if (oldExchange == null) {
			return newExchange;
		}

		// append the new line to the body aggregated so far
		String oldBody = oldExchange.getIn().getBody(String.class);
		String newBody = newExchange.getIn().getBody(String.class);
		String body = oldBody + newBody;

		oldExchange.getIn().setBody(body);

		return oldExchange;
	}

}
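To see what the strategy does in isolation, here is a minimal sketch (assuming Camel 2.x, whose DefaultCamelContext and DefaultExchange classes are used here to build test exchanges) that invokes it the way the aggregator would:

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;

public class StringBodyAggregatorDemo {

	public static void main(String[] args) {
		CamelContext context = new DefaultCamelContext();

		// two exchanges, as the splitter would emit them - one per line
		Exchange first = new DefaultExchange(context);
		first.getIn().setBody("line 1\n");
		Exchange second = new DefaultExchange(context);
		second.getIn().setBody("line 2\n");

		StringBodyAggregator strategy = new StringBodyAggregator();
		Exchange result = strategy.aggregate(null, first); // first exchange of a group
		result = strategy.aggregate(result, second);       // merged with the next one

		// prints "line 1" and "line 2" on separate lines
		System.out.print(result.getIn().getBody(String.class));
	}
}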

The result is stunning. The average speed rose from 200 messages (lines) per second to more than 4,000 messages!

[Screenshot: console log of line parsing with aggregation]


This is twenty times faster! A really great improvement… But let's take a look at the result files. The three source test files were initially equal in size, so the result files should be equal in size as well. But they aren't. The reason is quite simple: because of our global aggregation key, the lines have been mixed up across files. File ‘a’ now contains entries from file ‘b’, and file ‘b’ entries from file ‘c’.
The solution for this problem is really simple. Just replace the global aggregation key with the file name that the Camel file component provides out of the box (see #1 in the listing below).

from("file://src/data/in?delete=true&idempotent=true&readLock=changed&readLockCheckInterval=500")
	.marshal().string("UTF-8")
	.split(body().tokenize("\n")).streaming()
		.bean(LineParser.class)
		.to("log:LINEPARSER?groupSize=1000")
		.aggregate(header(Exchange.FILE_NAME_ONLY), // #1
				new StringBodyAggregator()).completionSize(750)
							.completionTimeout(1500)
		.to("file://src/data/out?fileExist=Append")
	.end();

And voilà. The speed is as high as before, and the result is correct, too. We have three equal-sized files inside folder ‘out’.
sample sources

Tags: apache camel, tech docs

7 Comments → “Parsing large Files with Apache Camel”

  1. Christian Müller 1 year ago   Reply

    Hi Claus!

    Nice blog post.
    If you use a StringBuilder for the String concatenation, you can speed this up a bit more:
    public class MyAggregatorStrategy implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                newExchange.getIn().setBody(new StringBuilder(newExchange.getIn().getBody(String.class)));
                return newExchange;
            }

            oldExchange.getIn().getBody(StringBuilder.class).append(newExchange.getIn().getBody(String.class));

            return oldExchange;
        }
    }

    You also have to add a convertBodyTo(String.class) before writing into the file…
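    A sketch of how the end of the route from the post might then look (using the header key from the final listing):

    .aggregate(header(Exchange.FILE_NAME_ONLY),
            new MyAggregatorStrategy()).completionSize(750)
                    .completionTimeout(1500)
        .convertBodyTo(String.class) // turn the aggregated StringBuilder back into a String
        .to("file://src/data/out?fileExist=Append")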

    Hope this helps.

    Best,
    Christian

  2. GV 1 year ago   Reply

    How do you configure this using Spring DSL? Can you please help?

  3. kanthraj 2 months ago   Reply

    I am just starting with Camel and need a complete example that reads a file's contents and displays them on the console.

    Any links to the Camel documentation?

Trackbacks For This Post

  1. Writing large Files line by line with Apache Camel | Vivi's Home - 5 months ago

    [...] via Writing large Files line by line with Apache Camel. [...]

  2. How to aggregate one message into multiple group with camel aggregate? - Apache Solutions - Developers Q & A - 5 months ago

    [...] more information: here is an article written by Claus Ibsen on parsing large files with Camel. This entry was posted in [...]

  3. A streaming Camel Aggregator strategy | alvrod blogs here - 1 month ago

    [...] Parsing large files with Apache Camel – a nice example using the Splitter / Aggregator to process a file and generate an output file, using the Aggregator to improve performance by buffering writes. Consider the comments, too – the first comment proposes to use a StringBuilder instead of concatenating Strings, an important improvement. [...]
