A streaming Camel Aggregator strategy

Having established that Camel is a nice tool to have in your belt, and as promised in my previous post, let’s move forward with a detailed view of the almighty Aggregator pattern and walk through the creation of a custom streaming-based aggregation strategy.

My goodness editor, Minsc, is very happy with this post, as it is all about promoting a useful library.

Why the Camel Aggregator is so useful

Abstractly speaking, the Aggregator is a special kind of sink that can group “related” incoming messages until the group is “complete”. Upon completion the Aggregator outputs a single, “aggregated” message. So why is this useful? This abstract behaviour maps to several concrete use cases. For example, the Camel implementation of several EIPs uses Aggregators:

  • Recipient List uses an aggregation strategy to group the responses from a list of recipients into a single response to the source. The Scatter-Gather pattern is an extension using the same basic concept. The Multicast pattern is similar too.
  • Whenever you use a Splitter to process pieces of a message individually, there will be an Aggregator to combine the outputs of the sub-processed messages into a unified outgoing message. The Composed Message Processor takes a similar approach.

To illustrate with a somewhat silly example, suppose that you have a route that receives words (i.e. each input message consists of a single word), and you want to output whole sentences. Furthermore let’s assume that you can recognize a “sentence” when you receive a message ending in a period. You could use the Aggregator for this:

[code language="xml"]
<aggregate strategyRef="aggregatorStrategy">
  <correlationExpression>
    <constant>true</constant>
  </correlationExpression>

  <completionPredicate>
    <simple>${body} regex '^.*\.$'</simple>
  </completionPredicate>

  <to uri="mock:aggregated"/>
</aggregate>
[/code]

Let’s go over this tiny example to understand how the different bits realize the Aggregator concepts. Recall that

the Aggregator groups related incoming messages

You define whether messages are correlated or not via the “correlationExpression” child of the Aggregator declaration. The example above says that all messages are correlated, but there is considerable flexibility here: you can correlate on parts of the exchange such as a file name, an id header, or whatever suits your case.
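For instance, a minimal sketch of a correlation expression that groups messages coming from the same file, by correlating on the standard CamelFileName header instead of the constant used above:

[code language="xml"]
<correlationExpression>
  <simple>${header.CamelFileName}</simple>
</correlationExpression>
[/code]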

until the group is complete

You define when you are finished aggregating by declaring a “completionPredicate”. In this case I’m completing when I receive a message that ends in a period. Again there is a lot of flexibility here and you will regularly find yourself using appropriate headers to dynamically determine whether the aggregation is complete[1. There is also specific support to integrate with other patterns, such as the Batch Consumer support.]. Other completion options include:

  • size, declared as the completionSize attribute of the aggregate element: the aggregate will complete after receiving size messages.
  • timeout, declared as the completionTimeout or as the completionInterval attribute of the aggregate element: the aggregate will complete after being inactive for completionTimeout millis or every completionInterval millis.
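To illustrate, here’s a sketch (not tied to the word/sentence example) of an aggregate declared to complete every 100 messages, or after one second of inactivity, whichever comes first:

[code language="xml"]
<aggregate strategyRef="aggregatorStrategy"
           completionSize="100"
           completionTimeout="1000">
  <correlationExpression>
    <constant>true</constant>
  </correlationExpression>
  <to uri="mock:aggregated"/>
</aggregate>
[/code]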

upon completion the Aggregator outputs the aggregated message

Whenever the aggregator completes, a message will be produced and sent to the “to” sink defined inside of it (to uri=”mock:aggregated” in the example above).

Some additional notes:

  • The aggregate may “complete” multiple times depending on the closeCorrelationKeyOnCompletion option which is disabled by default
  • The aggregate can behave quite differently depending on its strategy (more on this follows), which may require additional parameters
  • The aggregate itself is a sink, which means that you can put it as the destination of any route (i.e. anywhere you’d put a <to … /> element)

Lastly, most important, and tying in with the real purpose of this post: the aggregation strategy, which

  • implements the desired aggregation behaviour
  • is a class that implements AggregationStrategy or the more specific / featureful TimeoutAwareAggregationStrategy or CompletionAwareAggregationStrategy
  • must be instantiated elsewhere and provided to the Aggregate via the strategyRef attribute of the aggregate element
  • can be one of the strategies provided by Camel or be a class that you wrote as part of your project
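To get a feel for the interface before we dive into the streaming version, here’s a minimal sketch of a hand-rolled strategy (a hypothetical class, assuming the Camel 2.x AggregationStrategy API) that concatenates message bodies with a space, which would suit the word/sentence example above:

[code language="scala"]
import org.apache.camel.Exchange
import org.apache.camel.processor.aggregate.AggregationStrategy

class ConcatenatingStrategy extends AggregationStrategy {
  override def aggregate(oldExchange: Exchange, newExchange: Exchange): Exchange = {
    if (oldExchange == null) {
      newExchange // first message of the group: nothing to merge yet
    } else {
      // append the new word to the sentence built so far
      val merged = oldExchange.getIn.getBody(classOf[String]) + " " +
        newExchange.getIn.getBody(classOf[String])
      oldExchange.getIn.setBody(merged)
      oldExchange
    }
  }
}
[/code]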

For example this is how you’d create a GroupedExchangeAggregationStrategy, which is part of camel-core and simply stores all the aggregated exchanges in a list (the “id” of this bean is the name passed to the aggregate as the value of strategyRef):

[code language="xml"]
<bean
  id="aggregatorStrategy"
  class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy"
/>
[/code]

More aggregation strategies

I used my google-fu to find more aggregation strategies being used[2. Or at least blogged about.]. Usage examples always help to understand what’s going on.

Using the Camel Aggregator correctly – a fairly complete post about the Aggregator.

Parsing large files with Apache Camel – a nice example using the Splitter / Aggregator to process a file and generate an output file, using the Aggregator to improve performance by buffering writes. Consider the comments, too – the first comment proposes using a StringBuilder instead of concatenating Strings, an important improvement[3. The comment is by Christian Müller, a Camel committer, which goes to show how helpful the Camel devs are.].

Loading data from log files into a database using Splitter / Aggregator – another complete example putting these fine components to real world usage.

There’s also a lot of detail within the Red Hat Fuse documentation, including details on the threading model.

Creating large files

Take a look at the Parsing large files with Apache Camel post and the URI he’s using to produce the file via the File component:

[code language="text"]file://src/data/out?fileExist=Append[/code]

Notice how Claus is using the fileExist=Append option? That’s because his aggregate is completing multiple times and writing a large “block” of lines to the output file every time it completes. Indeed this would seem to be the only way to continuously produce a file “by blocks”. However, doing it like this means that the file is “produced” many times, which introduces a couple of problems.

Signaling completion

All these fake completions prevent the usage of the important tempFileName / doneFileName options. These options exist for a reason: though file based integrations are exceedingly common, it is sadly difficult, in practice, for processes to reliably integrate via shared files[4. The File component has many options clearly directed at alleviating this pain for which I’m deeply grateful to the Camel project.].
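For reference, here’s a sketch of what those options look like on a File producer URI (the option names are real; the values are merely illustrative):

[code language="text"]
file://src/data/out?tempFileName=inprogress-${file:name}&doneFileName=${file:name}.done
[/code]

With tempFileName the file is written under a temporary name and renamed when complete; with doneFileName a marker file is created once writing finishes, so that consuming processes can reliably detect completion.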

Error management

We took an input file, split it to pieces, processed each piece independently, then put it back together. Sound familiar? So what if something unexpected happens during all that processing? Like the software equivalent of a deflector shield coming up during transmission… something that might cause message loss along the way, for example. How are we to react, or indeed even detect, such a case?

fileExist=Append is not enough

So let’s say we want to use this approach to process large files, but we don’t want to lose the ability to signal other processes when we’re actually done creating the file, or to detect errors. What to do, then?

Introducing the StreamingAggregatorStrategy

Here’s an idea: establish a side channel to stream the content from the source to the sink, while making the Aggregate complete only once per file, so that completion can be properly signaled to other applications, including completion status (success or error). This is how it works:

  1. When a new aggregate begins, create a PipedWriter / PipedReader pair. Send the PipedReader to the file endpoint (which creates the output file and keeps it open) and store a reference to the PipedWriter in the Exchange, in the same fashion as the previous examples store the aggregate object in the Exchange.
  2. When new Exchanges arrive to the aggregate, write their content to the writer and discard them.
  3. When the aggregate completes, close the pipes, verify the content of the aggregate and output a signal with the result.

The net effect of this design is that content is continuously streamed to the output file via the pipes, which reduces the memory footprint of the application and renders the fileExist=Append option unnecessary. Instead, because the Aggregate will complete only once per file, we can use all the Camel file producing goodness, as desired.

The whole thing works because the Camel file producer understands readers, as you can see around here[5. One of the beauties of Open Source: lets you find out all the little details.].

The code

The full implementation is at GitHub. It’s not much (less than 70 lines of Scala), so let’s go over it a few lines at a time.

[code language="scala"]
class StreamingAggregator extends CompletionAwareAggregationStrategy {
  @BeanProperty var endpointUri: String = _
  @EndpointInject var producer: ProducerTemplate = _
[/code]

CompletionAwareAggregationStrategy is our interface of choice to implement – it provides a callback on completion, necessary for point (3) above. Spring will hook up the @magic, where the endpointUri property will be the destination of the component. It could possibly point to any Camel producer component that understands messages with a java.io.Reader body (if any), but I’m thinking primarily about files.
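For the record, wiring could look something like this (a sketch: the bean id, package and output URI are placeholders; endpointUri is the property defined above):

[code language="xml"]
<bean id="streamingAggregatorStrategy" class="com.example.StreamingAggregator">
  <property name="endpointUri" value="file://src/data/out"/>
</bean>
[/code]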

[scala]

override def aggregate(oldExchangeNullable: Exchange, newExchange: Exchange): Exchange = {
  val oldExchangeOpt = Option(oldExchangeNullable) // pesky nulls
  oldExchangeOpt match {
    case None => // first message
      val writer = new PipedWriter()
      val reader = new PipedReader(writer, 100000)

      writer.write(newExchange.getIn.getBody.toString)

      val createFileExchange = newExchange.copy()
      createFileExchange.getIn.setBody(reader)
      producer.asyncSend(endpointUri, createFileExchange) // start producing file

      newExchange.getIn.setBody(writer) // pass the writer around
      newExchange
[/scala]

The aggregate() method is what Camel calls when Exchanges arrive at the aggregator (like words in our first example). It works foldishly: the “new” Exchange is always the newcomer, while the “old” one is whatever the method returned the last time; it’s null on the first call, so you “aggregate” content by returning an Exchange that will be handed back to you on the next call, and so on.

The code above shows what to do to begin outputting content (as outlined in the first step): send a new Exchange, with the reader in its body, to the defined endpoint. It’s important to use asyncSend() here: a synchronous send would block the aggregator thread waiting for the file producer to finish, while the producer itself would be blocked waiting for pipe content that only the aggregator can write, a classic deadlock. The writer is returned, so that it will be available when new Exchanges arrive.

[scala]
    case Some(oldExchange) => // subsequent messages: stream the new content through the pipe
      val message = oldExchange.getIn
      val writer = message.getBody.asInstanceOf[PipedWriter]
      val newBody = newExchange.getIn.getBody
      writer.append(newBody.toString)
      newExchange.getIn.setBody(writer)
      newExchange
  }
}
[/scala]

When new Exchanges arrive, write their content, keep passing the writer around.

[scala]
override def onCompletion(completed: Exchange) {
  val resultHeaderName = "AggregationResult"
  val completedBy = completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY)
  val input = completed.getIn
  val writer = input.getBody.asInstanceOf[PipedWriter]
  writer.flush()
  writer.close() // closing the pipe signals the file producer that we're done
  val expectedCount = input.getHeader("CamelSplitIndex").asInstanceOf[Int] + 1
  val count = completed.getProperty("CamelAggregatedSize", classOf[Long])
  input.setBody(s"Completed aggregating $count messages by $completedBy from ${completed.getFromEndpoint}")
  completedBy match {
    // Complete stream
    case "predicate" if expectedCount == count =>
      input.setHeader(resultHeaderName, "SUCCESS")
    // Something went wrong (missing messages?)
    case other: String =>
      input.setHeader(resultHeaderName, "MISSING_CONTENT")
    case other: Any =>
      input.setHeader(resultHeaderName, "UNEXPECTED")
  }
}
[/scala]

The onCompletion() callback lets us react when the aggregate completes. As you can see in the code, the completed Exchange has a property pointing out the completion reason, which I’m using in this case to infer whether the route finished properly or not. The reasoning here is that if we expect to aggregate X messages but actually processed Y < X, then something went wrong along the way. The CamelSplitIndex header is populated by the Splitter and the CamelAggregatedSize property by the Aggregator itself, but you could provide the expected count yourself if you were using this strategy in a route without a Splitter.

In any case, and as described earlier in point (3), upon completion we finish the streams (which will signal the File producer that the file creation is complete) and return an Exchange containing a description of the net result of the Aggregate. The added AggregationResult header might be useful further along the route to react differently depending on whether the Aggregate completed successfully or not.
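For instance, a sketch of how a route might branch on that header after the aggregate (the endpoints here are placeholders):

[code language="xml"]
<choice>
  <when>
    <simple>${header.AggregationResult} == 'SUCCESS'</simple>
    <to uri="mock:success"/>
  </when>
  <otherwise>
    <to uri="mock:failure"/>
  </otherwise>
</choice>
[/code]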

A warning: the Camel documentation clearly states that the onCompletion method must not throw any Exceptions, ever, which the code does not ensure. Do not use this code for production purposes as-is.

But Camel already has a “streaming” component

Yes it does. How much does the code presented here overlap with functionality already present in that component? I’m not sure; I haven’t worked with it. Go ahead and burn me in the comments.

Recap

This post describes how to achieve a goal of dubious value by subverting the contract of a plugin and creating a dependency on undocumented behaviour. Not bad!

On the bright side, we’ve also covered in detail the creation of a new aggregation strategy and, hopefully, also shown how you can use the Aggregator to great effect. Just remember that the code is not solid enough to be added “as is” to your project, as it lacks error checking, input validation, etc.

 
