By using a wiretap-router or envelope-interceptor on an inbound-endpoint, data about incoming messages can be sent to a CEP engine to construct an event stream. A query can then be written that produces an event when fewer messages are seen than expected on the stream.
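To make the idea concrete, here is a minimal plain-Java sketch of the check such a query performs: it counts arrivals in consecutive fixed-length windows and reports every window that saw fewer messages than expected. It does not use Esper or Mule; the class name LowVolumeDetector, the lowCounts method and the sample timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Esper) of a fixed-window "too few messages" check. */
class LowVolumeDetector {

    /**
     * Splits [startMillis, endMillis) into consecutive windows of windowMillis
     * and returns the message count of every complete window whose count is
     * below minExpected. Windows with no messages at all are reported too.
     */
    static List<Integer> lowCounts(long[] arrivalMillis, long startMillis, long endMillis,
                                   long windowMillis, int minExpected) {
        int windows = (int) ((endMillis - startMillis) / windowMillis);
        int[] counts = new int[windows];
        for (long t : arrivalMillis) {
            long offset = t - startMillis;
            // Ignore arrivals outside the complete windows
            if (offset >= 0 && offset < windows * windowMillis) {
                counts[(int) (offset / windowMillis)]++;
            }
        }
        List<Integer> alerts = new ArrayList<>();
        for (int c : counts) {
            if (c < minExpected) {
                alerts.add(c);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        // Six arrivals in the first 10 s, two in the second 10 s, none in the third
        long[] arrivals = {500, 1200, 3000, 4500, 7000, 9900, 11000, 15000};
        System.out.println(lowCounts(arrivals, 0, 30_000, 10_000, 5)); // prints [2, 0]
    }
}
```

A CEP engine does the same bookkeeping continuously against a live stream, so the alert fires as each window closes rather than after the fact.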
A quick demonstration of this follows. Here are a couple of Groovy scripts that will be wired up with Spring and used to monitor a CXF inbound-endpoint on a Mule instance with Esper.
import org.mule.api.lifecycle.Callable
import org.mule.api.MuleEventContext

class EventInjector implements Callable {
    // Esper service provider, injected by Spring
    def esperService

    public Object onCall(MuleEventContext context) {
        // Push the incoming MuleMessage onto the Esper event stream
        esperService.getEPRuntime().sendEvent(context.getMessage())
    }
}
This component will be used to receive messages off the wiretap and inject them into the event stream. The next component will be used to register listeners on the stream.
import com.espertech.esper.client.UpdateListener
import com.espertech.esper.client.EventBean
import org.mule.module.client.MuleClient

class MuleEventListener implements UpdateListener {
    def expression
    def payloadExpression
    def esperService
    def endpoint

    // Called by Spring (init-method) to register this listener on the EPL statement
    def initialize() {
        def statement = esperService.getEPAdministrator().createEPL(expression)
        statement.addListener(this)
    }

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // Esper may pass null or an empty array when there are no new events
        if (!newEvents) return
        def client = new MuleClient()
        def event = newEvents[0]
        client.dispatch(endpoint, event.get(payloadExpression), null)
    }
}
This component takes two Esper expressions: expression queries the event stream for events, and payloadExpression populates the payload of the new message. endpoint is where this message will be published.
Here is the Spring beans config that wires the two component scripts with Esper.
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/lang
           http://www.springframework.org/schema/lang/spring-lang-2.5.xsd">
    <bean id="esperService" scope="singleton"
          class="com.espertech.esper.client.EPServiceProviderManager"
          factory-method="getDefaultProvider"/>
    <lang:groovy id="eventInjector"
                 script-source="classpath:/EventInjector.groovy">
        <lang:property name="esperService" ref="esperService"/>
    </lang:groovy>
    <lang:groovy id="mininumMessageListener"
                 script-source="classpath:/MuleEventListener.groovy"
                 init-method="initialize">
        <lang:property name="esperService" ref="esperService"/>
        <lang:property name="endpoint" value="jms://topic:alerts"/>
        <lang:property name="expression"
                       value="select count(*) from org.mule.api.MuleMessage.win:time_batch(10, 'FORCE_UPDATE, START_EAGER') having count(*) &lt; 5"/>
        <lang:property name="payloadExpression" value="count(*)"/>
    </lang:groovy>
</beans>
"mininumMessageListener" will send a JMS message to the "alerts" topic when fewer than 5 messages appear on the stream in a 10-second window. The following Mule config pulls all the above together.
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:test="http://www.mulesource.org/schema/mule/test/2.2"
      xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2"
      xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
      xmlns:cxf="http://www.mulesource.org/schema/mule/cxf/2.2"
      xmlns:mule-xml="http://www.mulesource.org/schema/mule/xml/2.2"
      xsi:schemaLocation="
          http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
          http://www.mulesource.org/schema/mule/cxf/2.2 http://www.mulesource.org/schema/mule/cxf/2.2/mule-cxf.xsd
          http://www.mulesource.org/schema/mule/test/2.2 http://www.mulesource.org/schema/mule/test/2.2/mule-test.xsd
          http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd
          http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
          http://www.mulesource.org/schema/mule/xml/2.2 http://www.mulesource.org/schema/mule/xml/2.2/mule-xml.xsd
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
    <spring:beans>
        <spring:import resource="classpath:spring-config.xml"/>
    </spring:beans>
    <vm:connector name="vmConnector" queueEvents="true"/>
    <cxf:connector name="cxfConnector"/>
    <model name="main">
        <service name="soapService">
            <inbound>
                <cxf:inbound-endpoint address="http://localhost:9756/people" connector-ref="cxfConnector"
                                      frontend="simple"
                                      serviceClass="org.mule.tck.testmodels.services.PeopleService"/>
                <wire-tap-router>
                    <vm:outbound-endpoint path="cep.in"/>
                </wire-tap-router>
            </inbound>
            <test:web-service-component/>
        </service>
        <service name="Complex Event Processing Service">
            <inbound>
                <vm:inbound-endpoint path="cep.in"/>
            </inbound>
            <component>
                <spring-object bean="eventInjector"/>
            </component>
        </service>
    </model>
</mule>
This example is simplistic, but hopefully the usefulness of this sort of approach is obvious. One particular improvement is to use JMS instead of VM as the target of the wiretap. In this scenario, "Complex Event Processing Service" could be hosted in a separate Mule instance dedicated to event analysis. This would additionally allow horizontally load-balanced "soapService" instances to contribute to the same event stream.
I'm additionally using the MuleMessage as the event type. This offers only a limited view into the messages. A more useful implementation would operate on the payload of the messages, via Maps, POJOs or XML. The online Esper documentation is extremely well-written and offers examples to get that going.
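As a rough sketch of the Map-based option, the snippet below flattens a few payload fields into a java.util.Map of the kind that could be used as an event. The class PayloadEvents, the field names and the sample values are all hypothetical, and no Esper call is made; how the Map is registered and sent (Esper's runtime has a sendEvent overload for Map events, as far as I know) should be checked against the Esper documentation for the version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that turns fields of a message payload into a Map event. */
class PayloadEvents {

    /**
     * Builds a flat Map from the values a query might want to filter or
     * group on. The keys here are illustrative only.
     */
    static Map<String, Object> toEvent(String operation, String lastName, long receivedAtMillis) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("operation", operation);       // e.g. the service method invoked
        event.put("lastName", lastName);         // a field taken from the payload
        event.put("receivedAt", receivedAtMillis);
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> event = toEvent("addPerson", "Smith", System.currentTimeMillis());
        // In the injector component, this Map would be sent to the engine
        // instead of the raw MuleMessage.
        System.out.println(event.keySet());
    }
}
```

With events shaped like this, a query can refer to the keys as properties instead of being limited to what MuleMessage exposes.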