TÍTOL X. C ATÀLEG DE CONSTRUCCIONS EN SÒL NO URBANITZABLE
Capítol 2. Relació de construccions en sòl no urbanitzable
Let’s move on to the response flow. This implementation must be able to consume the response messages of the Amazon and Barnes & Noble bookstores, log these mes- sages, aggregate them into one message, calculate the cheapest offering, and eventu- ally return this offering to the book customer. This is quite a lot of functionality, so let’s begin the walkthrough of the response flow (figure 4.9).
The response flow in figure 4.9 consists of an impressive group of ServiceMix func- tionalities. When we compare the response flow with the request flow in figure 4.7, the main difference is the aggregator and the number of pipelines. Let’s start with the order of functionality that’s processed when the response flow is executed. The response flow is triggered by two response messages arriving from the Amazon book- store at the amazon.output JMS queue and from the Barnes & Noble bookstore at the barnes.output JMS queue.
The incoming message from the Barnes & Noble bookstore is first transformed with an XSLT component. This is the same functionality that we saw in the request flow implementation. So again, we use a pipeline to transform the in-only message exchange of the JMS endpoint to an in-out message exchange needed by the Saxon
XSLT transformer. (We don’t discuss the details here again.)
IMPLEMENTINGTHEAGGREGATOR
You may be wondering why the message exchange with the AggregateMessageLogger is in-out and not in-only, as in the request message flow. Why is a pipeline component necessary to transform the in-only message exchange initiated by the JMS consumers to an in-out message exchange to invoke the logger component? In most cases, a log- ger component requires just an in-only as it consumes a message and doesn’t produce
Listing 4.19 Saxon XSLT configuration
B
C
a response message. Well, AggregateMessageLogger does a bit more than just log the message contents.
Because we use a standard ServiceMix aggregator to group the two incoming response messages from Amazon and Barnes & Noble, we need to set some additional message properties. This standard aggregator is part of the splitter/aggregator func- tionality that ServiceMix provides. So messages are first divided into multiple smaller messages and eventually the aggregator groups the smaller messages into a large mes- sage again. Because we didn’t have to use splitter functionality, we must set some message properties that otherwise the splitter would have done. (We could’ve also cre- ated a new aggregator implementation, but that approach would make this example unnecessarily complex.)
The AggregatorMessageLogger implementation uses the functionality of the servicemix-bean service engine. You saw some examples of such an implementa- tion in section 4.2, but we’re using different functionality this time (see listing 4.20).
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
Listing 4.20 AggregatorMessageLogger implementation
Figure 4.9 The response flow implementation of the book price case study. The incoming book quotes from Amazon and Barnes & Noble are logged and aggregated. Then the lowest price is calculated and sent back to the customer.
if (exchange.getStatus() != ExchangeStatus.ACTIVE) { logger.info("state is not active so ignoring call"); return;
}
Element payloadElem = null; try {
Source contentSource = getInMessage(exchange).getContent(); payloadElem = sourceTransformer.toDOMElement(
contentSource); } catch(Exception e) {
logger.error("error while reading payload", e); throw new MessagingException(e);
}
BookQuote bookQuote = QuoteMessageHelper. unmarshallQuote(payloadElem);
logger.info("received quote " + bookQuote.getIsbn() + ", " + bookQuote.getPrice() + ", " + bookQuote.getCompanyName()); NormalizedMessage outMessage = exchange.createMessage(); try { outMessage.setContent(new StringSource( QuoteMessageHelper.marshallQuote(bookQuote))); } catch(Exception e) {
logger.error("error while setting content in out message", e); throw new MessagingException(e);
}
outMessage.setProperty( "org.apache.servicemix.eip.splitter.corrid", bookQuote.getIsbn()); Integer aggregatorIndex = null;
if("Amazon".equalsIgnoreCase(bookQuote.getCompanyName())) { aggregatorIndex = new Integer(0);
} else if("BarnesAndNoble".equalsIgnoreCase( bookQuote.getCompanyName())) {
aggregatorIndex = new Integer(1); } outMessage.setProperty( "org.apache.servicemix.eip.splitter.index", aggregatorIndex); outMessage.setProperty( "org.apache.servicemix.eip.splitter.count", new Integer(2)); exchange.setMessage(outMessage, "out"); channel.send(exchange); }
Only the onMessageExchange method implementation is shown in listing 4.20. The logger, transformer, and channel instantiation were already shown in listings 4.8 and 4.11 in section 4.2. In this method implementation, the contents of the incoming mes- sage are first transformed to an XML representation as a Document Object Model (DOM) object
B
. This makes it easier to process the contents of the message for the rest of the method implementation. Then the DOM object instance is transformed toConverts message to DOM object
B
Transforms to BookQuoteC
Creates response messageD
Sets correlation identifierE
Sets message indexF
Sets group sizeG
a BookQuote JavaBean with an isbn, a price, and a companyName attribute
C
. Now we can easily log the message contents with Log4j to the console.To be able to set some message properties needed for the ServiceMix aggregator implementation, we created a response message
D
. First the contents of the incoming message are just copied to the outgoing message. We don’t want to change this part of the message. What needs to be added are some message properties, including a corre- lation identifier for the group of messages that need to be aggregated into one grouped messageE
. As with the Mule implementation, we use the ISBN for the corre- lation identifier. For real-world implementations, you’d certainly choose a better and more unique correlation identifier here.Another property that needs to be set for the aggregator is the message index. This is needed by the aggregator to determine whether every message of the group has been received. For this example, we just give the Amazon message a message index of 0 and the Barnes & Noble message a message index of 1. The index is then set with the index attribute
F
. The last property that needs to be set is the number of mes- sages that have to be aggregated into one message. This is done by setting the count attributeG
.CONFIGURINGTHESERVICEUNITS
We have yet to discuss configuring the pipeline that accepts messages from the Amazon response queue and the XSLT transformation, invokes the message logger, and routes the result to the aggregator component. This configuration is shown in listing 4.21. <eip:pipeline service="esb:quoteLoggingPipeline" endpoint="quoteLoggingPipelineEndpoint"> <eip:transformer> <eip:exchange-target service="esb:aggregateLoggerComponent"/> </eip:transformer> <eip:target> <eip:exchange-target service="esb:quoteAggregator"/> </eip:target> </eip:pipeline>
As shown with the eip:transformer element configuration
B
, the pipeline first invokes the AggregatorLoggerComponent from listing 4.9. With the eip:target con- figurationC
, you can set the target service for the response message received from the AggregatorLoggerComponent. The pipeline component is used in many exam- ples in this book as it is essential in invoking in-out components from an in-only mes- sage exchange.The next component, the split-aggregator, is invoked in the response flow when the pipeline in listing 4.21 receives a response from the special logger component. Because we enhanced the logging component with the creation of three aggregator- related properties (the correlation identifier, message index, and group size), we can
Listing 4.21 Pipeline configuration for the message logger
B
C
use the standard aggregator provided by ServiceMix. We only need to configure this aggregator (see listing 4.22).
<eip:split-aggregator service="esb:quoteAggregator" endpoint="quoteAggregatorEndpoint"> <eip:target> <eip:exchange-target service="esb:cheapestPricePipeline"/> </eip:target> </eip:split-aggregator>
The service name of the aggregator configuration
B
corresponds to the target desti- nation configuration of the pipeline in listing 4.21. This links the pipeline component and the aggregator component to each other. There’s no need to configure addi- tional properties; we can just pass the result of the aggregation to the next component in the response flow, which is another pipelineC
.The cheapestPricePipeline invokes the CheapestPriceCalculator bean compo- nent and passes the response message to the last point in the response flow, which is the JMS endpoint that produces a JMS message to the booksearch.output queue. The pipeline configuration is similar to the configuration in listing 4.21 and therefore isn’t included here. The configuration of all three pipelines and the aggregator are com- bined into one xbean.xml configuration for the servicemix-eip service engine. We discussed the logic inside the CheapestPriceCalculator in the Mule example, but because we have to deal with a message exchange for ServiceMix, we explore the details in listing 4.23.
public class CheapestPriceCalculator extends ComponentSupport implements MessageExchangeListener {
private static final Logger logger =
Logger.getLogger(CheapestPriceCalculator.class); private SourceTransformer sourceTransformer = new SourceTransformer();
@Resource
private DeliveryChannel channel;
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
if (exchange.getStatus() != ExchangeStatus.ACTIVE) { logger.info("state is not active so ignoring call"); return;
}
Element payloadElem = null; Source contentSource = null;
try {
contentSource = getInMessage(exchange).getContent(); payloadElem = sourceTransformer.toDOMElement( contentSource); } catch(Exception e) {
Listing 4.22 ServiceMix aggregator configuration
Listing 4.23 Implementation of the CheapestPriceCalculator bean
B
C
Gets aggregated message as DOM
B
logger.error("error while reading payload", e); }
BookQuote cheapestQuote = null;
NodeList aggregateList = payloadElem.getChildNodes(); for(int i = 0; i < aggregateList.getLength(); i++) { Node aggregateNode = aggregateList.item(i); Node childNode = aggregateNode.getFirstChild();
BookQuote bookQuote = QuoteMessageHelper.unmarshallQuote( childNode);
if(cheapestQuote == null || bookQuote.getPrice().floatValue() < cheapestQuote.getPrice().floatValue()) {
cheapestQuote = bookQuote; }
}
NormalizedMessage outMessage = exchange.createMessage(); try {
outMessage.setContent(new StringSource( QuoteMessageHelper.marshallQuote( cheapestQuote))); } catch(Exception e) {
logger.error("error setting content on cheapest " + "bookstore response message", e);
}
exchange.setMessage(outMessage, "out"); channel.send(exchange);
} }
Because the aggregated message produced by the AggregatorMessageLogger in list- ing 4.20 is sent across the ServiceMix container as a normalized message, we have to work with XML message content that contains both the price response from the Ama- zon as well as the Barnes and Noble bookstores. Therefore, we first transform the input message to a DOM structure
B
.The root element of the input message contains two child elements that contain the price responses. So the next step is to loop through these two price responses to be able to determine which is the lowest
C
. When the price of the response message is the lowest, we store the unmarshaled BookQuote instance into a local variableD
.Now that we’ve determined the lowest price, the response message can be con- structed and the message content must be filled. Therefore, we must marshal the low- est book quote message into an XML message
E
. Because we marshal the BookQuote instance to an XML String representation, we use the StringSource message type. The response message is then sent to the JMS output queue.DEPLOYINGANDTESTINGTHE SERVICEMIXCHEAPESTPRICECALCULATOR
Let’s proceed to the test phase of our solution. The test resembles the one we used in the Mule example. We also use two message listeners, AmazonBookStore and Barnes- BookStore, for testing our ServiceMix solution. To trigger the implemented message flow, we can also use a JUnit test case, MessageFlowTest, or we can use the Swing test client, which produces a message that includes a simple ISBN to the booksearch. input queue. Loops through price responses
C
Stores cheapest priceD
Returns cheapest price messageE
With all the code in place, we just have to deploy the solution to the running ServiceMix container. You can start the ServiceMix container with the default target in the Ant build file ch4-examples.xml. When the container has started, the deploy- messageflow target can be used to build and deploy the lowest price service assembly. To start the Amazon and Barnes and Noble JMS listeners, you have to run the messageflow-spring target.
Let’s execute the JUnit test class esb.chapter4.messageflow.MessageFlowTest, available in the test directory of the book’s source code. Alternatively, you can use the Swing test client to trigger the message flow. You should see logging statements in the console in your execution environment as well as in the ServiceMix container console. In the console of the JUnit test or in the Swing test client, you should eventually see a “received lowest quote” message with some contents displayed. This means that the message flow has executed successfully.
Take your time to examine the full implementation of this message flow in the source code. The implementation consists of many different parts, so it’s a good idea to play with the source distribution for some time in order to fully grasp the details. Once you feel confident about all the parts of this message flow, you’ll have made a giant step toward becoming a ServiceMix developer.