John D'Emic's blog about programming, integration, system administration, etc...

Thursday, September 24, 2009

HornetQ and Mule

The lack of OpenMQ's delivery retry options have led me down the road of evaluating JMS brokers again. I luckily didn't have to look very far. JBoss' recently released HornetQ broker is very impressive. Getting it going with Mule is trivial. Here's the config I'm using, which connects to a local HornetQ instance using Netty (instead of JNDI.)


<spring:bean name="transportConfiguration"
class="org.hornetq.core.config.TransportConfiguration">
<spring:constructor-arg
value="org.hornetq.integration.transports.netty.NettyConnectorFactory"/>
</spring:bean>

<spring:bean name="connectionFactory"
class="org.hornetq.jms.client.HornetQConnectionFactory">
<spring:constructor-arg ref="transportConfiguration"/>
<spring:property name="minLargeMessageSize" value="250000"/>
<spring:property name="cacheLargeMessagesClient" value="false"/>
</spring:bean>

<jms:connector name="jmsConnector"
connectionFactory-ref="connectionFactory"
createMultipleTransactedReceivers="false"
numberOfConcurrentTransactedReceivers="1"
specification="1.1">
</jms:connector>

Tuesday, September 1, 2009

Mule Endpoint QoS with Esper

Esper is a really slick, open-source CEP engine I've been playing with to monitor traffic on Mule endpoints.  Monitoring endpoints with port checks, JMX and log monitoring gives a lot of insight into the health of individual Mule instances, but offers little insight when external services fail.  An external producer of JMS messages to a queue may fail, a database may have a slow-query situation where rows take longer then expected to return or an SMTP outage may stop messages from being delivered to an IMAP server.  Any of these situations would cause less then an expected amount of messages to be delivered to a JMS, JDBC or IMAP endpoint.

By using a wiretap-router or envelope-interceptor on an inbound-endpoint, data about incoming messages can be sent to a CEP engine to construct an event stream.  A query can then be written that produces an event when less messages are seen then expected on the stream.  

A quick demonstration of this follows.  Here are a couple of Groovy scripts that will be wired up with Spring and used to monitor a CXF inbound-endpoint on a Mule instance with Esper.


import org.mule.api.lifecycle.Callable
import org.mule.api.MuleEventContext

class EventInjector implements Callable {

def esperService

public Object onCall(MuleEventContext context) {
esperService.getEPRuntime().sendEvent(context.getMessage())
}

}



This component will be used to receive messages off the wiretap and inject them into the event stream.  The next component will be used to register listeners on the stream.


import com.espertech.esper.client.UpdateListener
import com.espertech.esper.client.EventBean

import org.mule.module.client.MuleClient

class MuleEventListener implements UpdateListener {

def expression
def payloadExpression
def esperService
def endpoint

def initialize() {
def statement = esperService.getEPAdministrator().createEPL(expression);
statement.addListener(this)
}

public void update(EventBean[] newEvents, EventBean[] oldEvents) {
def client = new MuleClient()
def event = newEvents[0];
client.dispatch(endpoint, event.get(payloadExpression), null)
}

}

This component code takes two Esper expressions.  expression queries the event stream for events.  payloadExpression populates the message payload of the new message.  endpoint is where this message will be published to.  Here is the Spring beans config that wires the two component scripts with Esper.


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:lang="http://www.springframework.org/schema/lang"
xsi:schemaLocation=
"http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/lang
http://www.springframework.org/schema/lang/spring-lang-2.5.xsd
">

<bean id="esperService" scope="singleton"
class="com.espertech.esper.client.EPServiceProviderManager"
factory-method="getDefaultProvider"/>

<lang:groovy id="eventInjector"
script-source="classpath:/EventInjector.groovy">
<lang:property name="esperService" ref="esperService"/>
</lang:groovy>

<lang:groovy id="mininumMessageListener"
script-source="classpath:/MuleEventListener.groovy"
init-method="initialize">
<lang:property name="esperService" ref="esperService"/>
<lang:property name="endpoint" value="jms://topic:alerts"/>
<lang:property name="expression"
value="select count(*) from org.mule.api.MuleMessage.win:time_batch(10, 'FORCE_UPDATE, START_EAGER') having count(*) < 5"/>
<lang:property name="payloadExpression" value="count(*)"/>
</lang:groovy>
</beans>



"mininumMessageListener" will send a JMS message to the "alerts" topic when less then 5 messages appear on the stream in a 10 second window.  The following Mule config pulls all the above together.



<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:test="http://www.mulesource.org/schema/mule/test/2.2"
xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2"
xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
xmlns:cxf="http://www.mulesource.org/schema/mule/cxf/2.2"
xmlns:mule-xml="http://www.mulesource.org/schema/mule/xml/2.2"
xsi:schemaLocation="
http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
http://www.mulesource.org/schema/mule/cxf/2.2 http://www.mulesource.org/schema/mule/cxf/2.2/mule-cxf.xsd
http://www.mulesource.org/schema/mule/test/2.2 http://www.mulesource.org/schema/mule/test/2.2/mule-test.xsd
http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd
http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
http://www.mulesource.org/schema/mule/xml/2.2 http://www.mulesource.org/schema/mule/xml/2.2/mule-xml.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">


<spring:beans>
<spring:import resource="classpath:spring-config.xml"/>
</spring:beans>

<vm:connector name="vmConnector" queueEvents="true"/>

<cxf:connector name="cxfConnector"/>

<model name="main">

<service name="soapService">
<inbound>
<cxf:inbound-endpoint address="http://localhost:9756/people" connector-ref="cxfConnector"
frontend="simple"
serviceClass="org.mule.tck.testmodels.services.PeopleService"/>
<wire-tap-router>
<vm:outbound-endpoint path="cep.in"/>
</wire-tap-router>
</inbound>
<test:web-service-component/>
</service>

<service name="Complex Event Processing Service">
<inbound>
<vm:inbound-endpoint path="cep.in"/>
</inbound>
<component>
<spring-object bean="eventInjector"/>
</component>
</service>

</model>

</mule>



This example is simplistic, but hopefully the usefulness of this sort of approach is obvious.  One particular improvement is to use JMS instead of VM as the target of the wiretap.  In this scenario, "Complex Event Processing Service" could be hosted in a separate Mule instance dedicated for event analysis. This would additionally allow horizontally load-balancing "soapService" instances to contribute to the same event stream.

I'm additionally using the MuleMessage as the event type.  This offers a limited view into the messages.  A more useful implementation would operate on the payload of the messages, via Maps, POJO's or XML.  The online Esper documentation is extremely well-written and offers examples to get that going.  





Wednesday, June 17, 2009

SAAJ Transport -> SAAJ Module

With the time I had off work, editing the book and (briefly from) feeding the baby a few weeks ago, I naturally decided to add receiver functionality to the SAAJ transport. In a sleep deprived state  I considered extending the HTTP and JMS transports to use SAAJ to receive and dispatch messages. After chatting with David, however, a cleaner approach seemed to be in order. I started to work on refactoring the code I was using in the SAAJ MessageAdapter, which moved the message payloads back and forth from SOAP, into dedicated transformer implementations: soap-message-to-document transformer and document-to-soap-message-transformer.  

The refactored implementation allows you to use the SAAJ transformers to transform message payloads over arbitrary transports, like VM, XMPP or file, in addition to HTTP and JMS. The transformers are available from the (renamed) SAAJ Module. Cursory documentation and a few examples are available.  The distribution link isn't working yet, but you can download the jars and get access to the pom from here.

Friday, April 10, 2009

Mule SAAJ Transport

I've recently been working on a couple of projects that use complex SOAP API's. One of these API's is specified with a 1.5 megabyte WSDL along with something like 50 megabytes worth of Axis generated stub classes. Since I only needed to use a small subset of the WSDL's methods, I wasn't thrilled with dealing with the WSDL or the Axis stubs directly. As we're using Mule, this would have involved using either a CXF WSDL outbound-endpoint or maybe using the stub classes in a component.

Since I knew what the SOAP payloads look like (the SOAP body content), what I really wanted to do was just build this XML and pass it to an endpoint. It would also be nice if this endpoint dynamically set the SOAP message headers, extracted the SOAP body from the response and applied the response transformers (and perhaps got me a beer.)

I didn't see an obvious way to do with the CXF transport, so I took a stab at implementing such a transport myself. I had used SAAJ in a web-services proxy project I worked on last year and it seemed like a good fit. As such, I present the SAAJ-Transport. You can currently use it to pass arbitrary XML that is used as the SOAP body in messages sent to a SOAP endpoint. The endpoint handles constructing the SOAP message for you, adding the headers and extracting the SOAP body from the response (it won't get you a beer...yet.) Here's an example using a chaining-router to send a SOAP message and send the response to a VM endpoint.


<outbound>
<chaining-router>
<saaj:outbound-endpoint address="${service.url}" synchronous="true">
<transformers>
<transformer ref="templateToRequest"/>
<mulexml:xml-to-dom-transformer returnClass="org.w3c.dom.Document"/>
<saaj:document-to-soap-message-transformer/>
<saaj:mime-header-transformer key="Cookie" value="#[header:SERVICE_SOAP_SESSION_ID]"/>
</transformers>
</saaj:outbound-endpoint>
<vm:outbound-endpoint path="service.dispatcher">
<transformers>
<saaj:soapbody-to-document-transformer/>
<mulexml:dom-to-xml-transformer returnClass="java.lang.String"/>
</transformers>
</vm:outbound-endpoint>
</chaining-router>
</outbound>


The "document-to-soap-message-transformer" takes an org.w3c.dom.Document, transforms it to a SAAJ SOAPMessage and uses SAAJ to invoke the web-service. The mime-header-transformer adds a MIME header to the message (in this case a Cookie). Existing properties on the MuleMessage will be added the the SOAP header. When the response is received, the transport will extract out the SOAPBody and return it as the synchronous response, as well as set any SOAP headers as properties on the MuleMessage. In this case, the response is transformed back to a Document, then to a String, then finally passed out on the VM endpoint.

I'm hoping next week to get full documentation and examples up on the MuleForge page. I'm also planning to work on receiver functionality. This would allow you to receive SOAP messages on an inbound-endpoint and have their bodies extracted, headers set as Mule properties, etc. I'm still working on getting the distribution together. For now you'll need to checkout the source and use "mvn clean package" to build the jar or "mvn clean install" to get them into your local repository.

Tuesday, March 3, 2009

Mule, Smooks and Nagios

I've been working on upgrading our integration infrastructure on and off since the new year.  This began with the OpenMQ migration I previously blogged about and was followed by upgrading our Mule 1.4.3 services to Mule 2.x.  In addition to the technology changes, I wanted to use the upgrade as an excuse to clean-up some messy stuff we had in place. An example of which being the amount of custom transformation we were doing in Java code.

Our integration implementation makes heavy use of the Canonical Data Model pattern.  To shortly sum it up , we accept data in a variety of formats (XML, CSV or proprietary) and map them to an XML schema and/or Java object model.  Beyond the standard transport transformations supplied by Mule, we needed to implement a zoo of custom transformers to move to the canonical format.  I was looking for a way to mitigate this complexity overhead in some sort of framework.

I had read this article on InfoQ about Smooks around when thinking about the above and it seemed like a good fit, especially since there is a Mule module for it.  To make a long story short, we were able to upgrade to Mule 2.x and, using Smooks, not have to implement any model specific Mule transformers.  

Smooks works by streaming data in, transforming it and streaming it out.  "Cartridges" supply various transformation capabilities and exist for common data formats like XML, JSON and CSV.  The streaming model means that the transformations themselves don't require the entire documents to be loaded in memory.  This allows for large documents to be transformed without requiring the associated memory footprint.

The transformations can  be accomplished via XML configuration assuming the data formats being used have an associated cartridge.  This is also the case if the data is in a format you can easily move to a different format.  For instance, we have Nagios 2.x instances that use a semi-colon delimited status.log to write alert data.  A simple Groovy script allowed me to replace the semi-colons with commas.  I was then able to use the CSV cartridge to convert the data to XML.

The above Nagios instances are being upgraded to Nagios 3.x.  In Nagios 3.x, the status.log format is different.  Instead of being semi-colon delimited, it is in a proprietary format that sort of looks like JSON.  Here's an example:


servicestatus {
host_name=liro_url_laces0
service_description=liro_https://acmesoft.com/VI/Pages/General/TestConn.aspx
modified_attributes=0
check_command=check_https!/VI/
check_period=24x7
notification_period=24x7
check_interval=15.000000
retry_interval=2.000000
event_handler=
has_been_checked=1
..
}
There obviously isn't a Smooks cartridge that supports this format.  One solution might be to try to convert the above format to JSON.  This will probably work but likely be error-prone  (and annoying to implement.)  An alternative is to implement an XMLReader to parse the above file and spit out an XML Document.  

Smooks uses implementations of XMLReader to parse arbitrary file formats as XML.  It then operate on the SAX stream or DOM as dictated by a configuration file.  The following illustrates an implementation of the parse method of XMLReader that will parse the status.log format above:



public void parse(InputSource inputSource) throws IOException, SAXException {
if (contentHandler == null) {
throw new IllegalStateException("'contentHandler' not set. Cannot parse Email stream.");
}

String currentBlock = null;

contentHandler.startDocument();
contentHandler.startElement(XMLConstants.NULL_NS_URI, "statusLog", "", EMPTY_ATTRIBS);

for (String line : getString(inputSource).split("\n")) {

if (line.startsWith("#"))
continue;

if (line.contains("servicestatus")) {
String block = StringUtils.deleteWhitespace(line.split("\\{")[0]);
contentHandler.startElement(XMLConstants.NULL_NS_URI, block, "", EMPTY_ATTRIBS);
currentBlock = block;
}

if (currentBlock != null) {
if (line.contains("=")) {
String[] fields = line.split("=", 2);
String fieldName = StringEscapeUtils.escapeXml(StringUtils.deleteWhitespace(fields[0].replace("=", "")));

contentHandler.startElement(XMLConstants.NULL_NS_URI, fieldName, "", EMPTY_ATTRIBS);
if (fields.length > 1) {
String content = StringEscapeUtils.escapeXml(fields[1]);

contentHandler.characters(content.toCharArray(), 0, content.length());
} else {
contentHandler.characters(" ".toCharArray(), 0, 1);
}
contentHandler.endElement(XMLConstants.NULL_NS_URI, fieldName, "");
}

if (line.contains("}")) {
contentHandler.endElement(XMLConstants.NULL_NS_URI, currentBlock, "");
currentBlock = null;
}
}

}

contentHandler.endElement(XMLConstants.NULL_NS_URI, "statusLog", "");
contentHandler.endDocument();
}

We can plug the reader into the Smooks XML config :


<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd"
>

<params>
<param name="stream.filter.type">SAX</param>
<param name="default.serialization.on">false</param>
</params>

<reader class="net.opsource.osb.reader.NagiosReader"/>

<resource-config selector="servicestatus">
<resource>org.milyn.delivery.DomModelCreator</resource>
</resource-config>

<ftl:freemarker applyOnElement="statusLog">
<ftl:template><!--
<ApplicationResponseTimes>
<?TEMPLATE-SPLIT-PI?>
</ApplicationResponseTimes>
-->
</ftl:template>
</ftl:freemarker>

<ftl:freemarker applyOnElement="servicestatus">
<ftl:template>smooks/monitoring/application_response_time/metric.ftl</ftl:template>
</ftl:freemarker>

</smooks-resource-list>



Now we plug it into Mule using the Smooks module and we're ready to go.


<smooks:transformer name="nagiosStatusLineToXML"
configFile="smooks/monitoring/application_response_time/smooks-config.xml"
resultType="STRING"/>


I'm pretty excited about this because I'm no longer writing a dedicated transformer for each domain model I'm mapping data to. I just need to implement XMLReaders when I come across a data format not already supported by a Smooks cartridge.

Thursday, January 15, 2009

OpenMQ, Second Thoughts

 OpenMQ was fairly painless to get going with Mule.  I opted to set it up in our staging environment as a conventional cluster with 2 nodes. In this scenario, clients can maintain a list of brokers to connect to in the event one of them fails.  Loadbalancing between brokers might also be supported, but I haven't gone too far down that rabbit hole yet.  

Nor have I gone down the rabbit hole of HA clusters, which support failover of message data between brokers but require a shared database.  Amongst other things, we're using JMS to distribute monitoring alerts.  To do HA in production in a sensible manner, we'd need to back OpenMQ against our production MySQL cluster.  Since we're sending monitoring data about the same production MySQL cluster over JMS,  if the MySQL cluster failed we'd never hear about it.  We're not (currently) using JMS for any sort of financial or mission-critical data so losing a few messages in the event of a failover isn't too big of a deal for us as long as its a rare occurrence.  

Getting clients to connect to OpenMQ was a little more painful. It manages all of its "objects" (connection factories, queues, topics, etc) in JNDI.  We don't really use any sort of distributed JNDI infrastructure, so I started off using the filesystem JNDI context supplied with OpenMQ. In this scenario, all your OpenMQ objects are stored in a hidden file in a directory.  This works fine in development or testing situations when your clients and broker all have access to the directory.  Its obviously not an option for production, unless you do something painful like make tarballs of the JNDI filesystem directory and scp them or around or export it over NFS.    

According to the documentation, the "right" way seems to be by using an LDAP directory context to store the JNDI data (someone please correct me if I'm wrong about this.)  In this case, you store your OpenMQ objects to LDAP.  Each client then loads the appropriate connection factory, queus, etc from LDAP.  This is nice in the sense that configuration data for the connections (broker lists, etc) are maintained outside of the clients.  Presumably this allows you to add brokers to a cluster,  etc w/o having to restart your JMS clients.  

Despite the bit of complexity, this again was pretty straightforward. I just needed an LDAP directory to store the JNDI data in.  It (briefly) occurred to me to use our Active Directory deployment.  My assumption was, however, that this would involve modifying Active Directory's schema which I've never done before and heard nightmare stories about(it would also involve making changes to the production AD deployment - which is treated akin to a live hand grenade in the company.)

I ultimately opted to use OpenLDAP.  This was painless.  The only thing I had to do was include the supplied java.schema in the the slapd.conf and restart the service.  A short while later I was able to get Mule and JMeter sending messages through it.  The OpenMQ command line stuff worked great while doing some preliminary load testing.  The queue metrics in particular were really nice - it lets you watch queue statistics the same way you'd watch memory statistics with vmstat or disk statistics with iostat.  I am pretty impressed so far...

Wednesday, January 14, 2009

OpenMQ, First Thoughts

For a variety of reasons I've been evaluating message brokers other then ActiveMQ.  I installed OpenMQ after hearing anecdotal evidence it was a good alternative.  Before switching to doing development full-time, I spent a fair amount of time in the trenches admining Solaris and Linux boxes.  As such, I'm a little bit too familiar with Sun's weird installers for things - and OpenMQ was no exception.  I was confronted with the first ncurses GUI I've seen since I stopped using Mutt to read my email - a tarball or RPM would have been sufficient. 

Despite this, the installation went smoothly enough and I had the broker up and going in about 15 minutes.  I had two brokers up in a cluster 45 minutes later.  All through the command line. Which was extremely impressive.  I'm going to spend some time today getting Mule and JMeter going with it and start passing some messages through.  I'll follow up with how that goes...