WSO2 Complex Event Processor (CEP) has
a pluggable architecture where users can write and plug 'brokers' to
send events from an event source to CEP engine and also receive
output events from the CEP engine and send them to an event sink.
Currently there are quite a few broker types available with the CEP
supporting JMS, WS-Events, Email etc. If you want to persist events
in a relational database, here's how you can achieve it by writing an
RDBMS broker.
There are two classes you have to
extend in order to implement a custom broker.
1.
org.wso2.carbon.broker.core.BrokerType
2.
org.wso2.carbon.broker.core.BrokerTypeFactory
These classes are available in
org.wso2.carbon.broker.core package which can be added as a maven
dependency as follows:
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.broker.core</artifactId>
<version>4.0.7</version>
</dependency>
Depending on the RDBMS vendor you are
going to support, you'll have to add a dependency to the relevant
JDBC driver as well. Here for this example we will be using the MySQL
driver.
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
BrokerType implementation class
contains the methods that will receive events from the CEP engine. It
also has a way to define the broker specific configuration parameters
(such as database host ip/username) that will be available in the web
console at runtime.
Out of the methods in BrokerType,
you'll have to implement following two methods.
1. getBrokerTypeDto() - this should return the BrokerTypeDto object that
contains the required configuration parameters for the Broker.
When implementing getBrokerTypeDto(),
you should populate the BrokerTypeDto with all the parameters that are going to be adjusted at runtime. Here's an example how you can add the hostname field so
that user is asked to fill it when configuring a new broker.
Property hostName = new
Property(“Hostname”);
hostName.setRequired(true);
hostName.setDisplayName(“Host
name”));
getBrokerTypeDto().addProperty(hostName);
2. publish(String topic,
Object message, BrokerConfiguration brokerConfiguration) - this is
where the output events from the CEP engine will be received and
written to the database.
The parameter 'topic' used here maps to
a database table name and the user can give it when configuring a new
CEP bucket. The 'message' parameter will contain the output and if
you choose the output mapping to be 'Map mapping' (which is again
available at the time when configuring a new CEP bucket), you will
receive a java.util.Map with all the output key/value pairs. The
brokerConfiguration contains all the configuration parameters you
requested in the getBrokerTypeDto() method, including hostname,
username etc. The first thing you have to do here is to check whether
the given table exists and create a new one using the topic name if
it doesn't already exist. You can infer its column names and data
types by examining the Map received. Once it is done, you can simply
create an statement and feed the values to the table.
BrokerTypeFactory, as its name implies,
is responsible for creating/providing BrokerType objects. All you
have to do here is to implement getBrokerType()
method to return an instance of the BrokerType implementation class.
Once you complete writing the code,
package it and put the jar file into repository/components/lib
folder. Also remember to add the JDBC driver to the same directory.
Then put an entry to repository/conf/broker.xml as follows (create a
new broker.xml file if it does not exist):
<brokerTypes xmlns="http://wso2.org/carbon/broker">
<brokerType class="org.wso2.cep.broker.RDBMSBrokerTypeFactory"/>
</brokerTypes>
Now start the server and select
'Configure' in the web console. When you select Broker -> Add, you
will see that the newly deployed broker is listed under the 'Broker
Type' dropdown. You can configure the broker instance from there.
An example implementation can be found
here.