John D'Emic's blog about programming, integration, system administration, etc...

Tuesday, September 1, 2009

Mule Endpoint QoS with Esper

Esper is a really slick, open-source CEP engine I've been playing with to monitor traffic on Mule endpoints.  Monitoring endpoints with port checks, JMX and log monitoring gives a lot of insight into the health of individual Mule instances, but offers little insight when external services fail.  An external producer of JMS messages to a queue may fail, a database may have a slow-query situation where rows take longer then expected to return or an SMTP outage may stop messages from being delivered to an IMAP server.  Any of these situations would cause less then an expected amount of messages to be delivered to a JMS, JDBC or IMAP endpoint.

By using a wiretap-router or envelope-interceptor on an inbound-endpoint, data about incoming messages can be sent to a CEP engine to construct an event stream.  A query can then be written that produces an event when less messages are seen then expected on the stream.  

A quick demonstration of this follows.  Here are a couple of Groovy scripts that will be wired up with Spring and used to monitor a CXF inbound-endpoint on a Mule instance with Esper.


import org.mule.api.lifecycle.Callable
import org.mule.api.MuleEventContext

class EventInjector implements Callable {

def esperService

public Object onCall(MuleEventContext context) {
esperService.getEPRuntime().sendEvent(context.getMessage())
}

}



This component will be used to receive messages off the wiretap and inject them into the event stream.  The next component will be used to register listeners on the stream.


import com.espertech.esper.client.UpdateListener
import com.espertech.esper.client.EventBean

import org.mule.module.client.MuleClient

class MuleEventListener implements UpdateListener {

def expression
def payloadExpression
def esperService
def endpoint

def initialize() {
def statement = esperService.getEPAdministrator().createEPL(expression);
statement.addListener(this)
}

public void update(EventBean[] newEvents, EventBean[] oldEvents) {
def client = new MuleClient()
def event = newEvents[0];
client.dispatch(endpoint, event.get(payloadExpression), null)
}

}

This component code takes two Esper expressions.  expression queries the event stream for events.  payloadExpression populates the message payload of the new message.  endpoint is where this message will be published to.  Here is the Spring beans config that wires the two component scripts with Esper.


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:lang="http://www.springframework.org/schema/lang"
xsi:schemaLocation=
"http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/lang
http://www.springframework.org/schema/lang/spring-lang-2.5.xsd
">

<bean id="esperService" scope="singleton"
class="com.espertech.esper.client.EPServiceProviderManager"
factory-method="getDefaultProvider"/>

<lang:groovy id="eventInjector"
script-source="classpath:/EventInjector.groovy">
<lang:property name="esperService" ref="esperService"/>
</lang:groovy>

<lang:groovy id="mininumMessageListener"
script-source="classpath:/MuleEventListener.groovy"
init-method="initialize">
<lang:property name="esperService" ref="esperService"/>
<lang:property name="endpoint" value="jms://topic:alerts"/>
<lang:property name="expression"
value="select count(*) from org.mule.api.MuleMessage.win:time_batch(10, 'FORCE_UPDATE, START_EAGER') having count(*) < 5"/>
<lang:property name="payloadExpression" value="count(*)"/>
</lang:groovy>
</beans>



"mininumMessageListener" will send a JMS message to the "alerts" topic when less then 5 messages appear on the stream in a 10 second window.  The following Mule config pulls all the above together.



<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:test="http://www.mulesource.org/schema/mule/test/2.2"
xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2"
xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
xmlns:cxf="http://www.mulesource.org/schema/mule/cxf/2.2"
xmlns:mule-xml="http://www.mulesource.org/schema/mule/xml/2.2"
xsi:schemaLocation="
http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
http://www.mulesource.org/schema/mule/cxf/2.2 http://www.mulesource.org/schema/mule/cxf/2.2/mule-cxf.xsd
http://www.mulesource.org/schema/mule/test/2.2 http://www.mulesource.org/schema/mule/test/2.2/mule-test.xsd
http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd
http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
http://www.mulesource.org/schema/mule/xml/2.2 http://www.mulesource.org/schema/mule/xml/2.2/mule-xml.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">


<spring:beans>
<spring:import resource="classpath:spring-config.xml"/>
</spring:beans>

<vm:connector name="vmConnector" queueEvents="true"/>

<cxf:connector name="cxfConnector"/>

<model name="main">

<service name="soapService">
<inbound>
<cxf:inbound-endpoint address="http://localhost:9756/people" connector-ref="cxfConnector"
frontend="simple"
serviceClass="org.mule.tck.testmodels.services.PeopleService"/>
<wire-tap-router>
<vm:outbound-endpoint path="cep.in"/>
</wire-tap-router>
</inbound>
<test:web-service-component/>
</service>

<service name="Complex Event Processing Service">
<inbound>
<vm:inbound-endpoint path="cep.in"/>
</inbound>
<component>
<spring-object bean="eventInjector"/>
</component>
</service>

</model>

</mule>



This example is simplistic, but hopefully the usefulness of this sort of approach is obvious.  One particular improvement is to use JMS instead of VM as the target of the wiretap.  In this scenario, "Complex Event Processing Service" could be hosted in a separate Mule instance dedicated for event analysis. This would additionally allow horizontally load-balancing "soapService" instances to contribute to the same event stream.

I'm additionally using the MuleMessage as the event type.  This offers a limited view into the messages.  A more useful implementation would operate on the payload of the messages, via Maps, POJO's or XML.  The online Esper documentation is extremely well-written and offers examples to get that going.  





No comments: