In part one of this tutorial I showed the problem with events in a clustered environment from an architectural point of view. Now let’s get our hands dirty and implement a solution using Apache Camel and MongoDB.
As you know, Apache Camel implements the well-known Enterprise Integration Patterns (EIP). All we have to do is choose one that fits our needs. The problem, in short, is that we potentially have n instances of the same event inside a queue and potentially n consumers waiting for them. But only one of these event instances should be consumed, by exactly one consumer. So we have to make sure that no event is consumed twice. Slide four shows that problem:
So what can Camel do for us? If you search for duplicate messages on the EIP page, you will find the ‘Idempotent Consumer’ pattern. This is exactly what we’re looking for. With three lines of code it filters out all duplicate messages:
    ...
    from("seda:a") // source queue
        .idempotentConsumer(header("myMessageId"),
            MemoryIdempotentRepository.memoryIdempotentRepository(200)) // filter for duplicated messages
        .to("seda:b"); // destination queue
    ...
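To make clear what the filter does conceptually, here is a minimal sketch in plain Java without Camel. The class `SeenIdFilter` and its method `firstTimeSeen` are hypothetical names made up for this illustration, they are not part of the Camel API. The sketch also never forgets an id, while the Camel snippet passes a cache size of 200 to its in-memory repository.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical, Camel-free illustration of the idempotent consumer idea:
 * remember every message id already seen and let each id pass only once.
 */
class SeenIdFilter {

    // thread-safe set of message ids that have already been let through
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    /**
     * @return true if the id is new (the message may pass),
     *         false if it is a duplicate (the message is dropped)
     */
    boolean firstTimeSeen(String messageId) {
        // add() returns false when the id was already present
        return seenIds.add(messageId);
    }
}
```

A consumer would call `firstTimeSeen(id)` with the value of the `myMessageId` header and forward the message only when the call returns true.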
That was easy… But wait! MemoryIdempotentRepository? We’re in a clustered environment. An in-memory repository is not shared across n server instances. It prevents the current instance from consuming duplicate events, but it gives no guarantee that another server instance won’t consume a second event of the same type. We need something with shared persistence. That’s where MongoDB comes into play. We’re using MongoDB as the persistence layer under catify, so there is no question which database to use (here are some thoughts about MongoDB). By the way, it’s well suited for this case, because it’s fast, scalable and reliable.
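The difference between a local and a shared repository can be simulated in a few lines of plain Java. This is only a sketch under simplifying assumptions: `ServerInstance` and `ClusterSketch` are hypothetical names, both "servers" live in one JVM, and a simple set stands in for the repository (in the real setup the shared one would be a MongoDB collection).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for one server instance in the cluster. */
class ServerInstance {

    private final Set<String> repository;

    ServerInstance(Set<String> repository) {
        this.repository = repository;
    }

    /** @return true if this instance would consume the event */
    boolean tryConsume(String eventId) {
        return repository.add(eventId);
    }
}

class ClusterSketch {

    /** Counts how many of two instances consume the same event id. */
    static int consumersOfSameEvent(boolean sharedRepository) {
        Set<String> first = ConcurrentHashMap.newKeySet();
        Set<String> second;
        if (sharedRepository) {
            second = first; // both instances look into the same repository
        } else {
            second = ConcurrentHashMap.newKeySet(); // every instance has its own memory
        }

        ServerInstance a = new ServerInstance(first);
        ServerInstance b = new ServerInstance(second);

        int consumed = 0;
        if (a.tryConsume("evt-42")) consumed++;
        if (b.tryConsume("evt-42")) consumed++;
        return consumed;
    }
}
```

With separate repositories both instances consume the event; with a shared one only the first instance does.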
My first thought was to use a capped collection for this case. A capped collection has a fixed size and keeps its documents in insertion order. Fixed size means you provide a size in bytes and optionally a maximum number of documents. Once the size has been reached, the oldest document is thrown away (first in, first out), which is the perfect feature for our case. We don’t have to think about disk space or a strategy to remove old registry entries. But capped collections come with a big disadvantage for our case: out of the box they have no indexes (in older MongoDB versions not even one on _id). On the one hand this is good, because it makes inserts into a capped collection even faster, but if you want to avoid duplicate db entries it is really bad. So a capped collection would be fine in theory, but we need something with a unique index constraint. So I decided to use a ‘normal’ collection (which is fast enough anyway :).
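The two properties in play here, a fixed size with oldest-first eviction and a uniqueness constraint on the event id, can be illustrated with a small in-memory sketch. It is not MongoDB code: `CappedUniqueRegistry` is a hypothetical class that only mimics the behaviour with a `LinkedHashMap`, and the cap counts entries instead of bytes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical in-memory registry: bounded like a capped collection,
 * and rejecting duplicate ids like a unique index would.
 */
class CappedUniqueRegistry {

    private final Map<String, Long> entries;

    CappedUniqueRegistry(final int maxEntries) {
        // insertion-ordered map that drops its eldest entry once the cap is exceeded
        this.entries = new LinkedHashMap<String, Long>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** @return true if the id was registered, false if it already exists */
    synchronized boolean register(String eventId) {
        if (entries.containsKey(eventId)) {
            return false; // the "unique index" part: no second entry for the same id
        }
        entries.put(eventId, System.currentTimeMillis());
        return true;
    }

    synchronized boolean contains(String eventId) {
        return entries.containsKey(eventId);
    }
}
```

The sketch also shows the trade-off of any bounded registry: once an id has been evicted, a late duplicate of that event would be accepted again.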
The route that uses our MongoDB idempotent repository looks as follows:
    ...
    public class Routes extends RouteBuilder {

        // we inject the instance over spring
        private MongoDbIdempotentRepository mongoDbIdempotentRepository;

        @Override
        public void configure() throws Exception {

            errorHandler(deadLetterChannel("seda:error"));

            /*
             * route filters out duplicated event messages
             * (e.g. produced from multiple server instances
             * in a clustered environment)
             */
            from("seda:in")
                .routeId("mongodb_idempotent_repository") // for testing purposes it's a good idea to give every route an id
                .idempotentConsumer(header("myMessageId"), this.mongoDbIdempotentRepository)
                .to("seda:out");
        }

        /**
         * set the instance of idempotent repository
         * of spring (IoC)
         *
         * @param mongoDbIdempotentRepository
         */
        public void setMongoDbIdempotentRepository(
                MongoDbIdempotentRepository mongoDbIdempotentRepository) {
            this.mongoDbIdempotentRepository = mongoDbIdempotentRepository;
        }
    }
Cool. But where is the MongoDbIdempotentRepository class? I’ll show you how to implement this in the next part of the series :)
Hi, great blog. You let us see that the Camel possibilities are infinite.
I have one question: is your assumption true? Does a cluster mean multiple consumers for an event?
ActiveMQ messages can be distributed to queues with the same name in a cluster
(see “Using Networks of Brokers” in https://access.redhat.com/knowledge/docs/Fuse_MQ_Enterprise/ ) and will as such be consumed loadbalanced, but only once.
But I may have missed something in your use case.
Hi Luc,
I think we’re talking about the same thing. Perhaps “potentially n consumers for an event” would be better wording. If you use ActiveMQ you won’t have that problem, because the broker handles it. But if you try this with, let’s say, a distributed cache like Hazelcast, you have to ensure that an event is consumed only once. This is where Camel can help.
Best regards – Claus