Sunday, March 31, 2013

Writing a Relational Database broker for the WSO2 Complex Event Processor


WSO2 Complex Event Processor (CEP) has a pluggable architecture where users can write and plug 'brokers' to send events from an event source to CEP engine and also receive output events from the CEP engine and send them to an event sink. Currently there are quite a few broker types available with the CEP supporting JMS, WS-Events, Email etc. If you want to persist events in a relational database, here's how you can achieve it by writing an RDBMS broker.

There are two classes you have to extend in order to implement a custom broker.

    1. org.wso2.carbon.broker.core.BrokerType
    2. org.wso2.carbon.broker.core.BrokerTypeFactory

These classes are available in org.wso2.carbon.broker.core package which can be added as a maven dependency as follows:

<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.broker.core</artifactId>
    <version>4.0.7</version>
</dependency>


Depending on the RDBMS vendor you are going to support, you'll have to add a dependency to the relevant JDBC driver as well. Here for this example we will be using the MySQL driver.


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>


BrokerType implementation class contains the methods that will receive events from the CEP engine. It also has a way to define the broker specific configuration parameters (such as database host ip/username) that will be available in the web console at runtime.

Out of the methods in BrokerType, you'll have to implement following two methods.

1. getBrokerTypeDto() - this should return the BrokerTypeDto object that contains the required configuration parameters for the Broker.

When implementing getBrokerTypeDto(), you should populate the BrokerTypeDto with all the parameters that are going to be adjusted at runtime. Here's an example how you can add the hostname field so that user is asked to fill it when configuring a new broker.


    Property hostName = new Property(“Hostname”);
    hostName.setRequired(true);
    hostName.setDisplayName(“Host name”));
    getBrokerTypeDto().addProperty(hostName);

2. publish(String topic, Object message, BrokerConfiguration brokerConfiguration) - this is where the output events from the CEP engine will be received and written to the database.

The parameter 'topic' used here maps to a database table name and the user can give it when configuring a new CEP bucket. The 'message' parameter will contain the output and if you choose the output mapping to be 'Map mapping' (which is again available at the time when configuring a new CEP bucket), you will receive a java.util.Map with all the output key/value pairs. The brokerConfiguration contains all the configuration parameters you requested in the getBrokerTypeDto() method, including hostname, username etc. The first thing you have to do here is to check whether the given table exists and create a new one using the topic name if it doesn't already exist. You can infer its column names and data types by examining the Map received. Once it is done, you can simply create an statement and feed the values to the table.

BrokerTypeFactory, as its name implies, is responsible for creating/providing BrokerType objects. All you have to do here is to implement getBrokerType() method to return an instance of the BrokerType implementation class.

Once you complete writing the code, package it and put the jar file into repository/components/lib folder. Also remember to add the JDBC driver to the same directory. Then put an entry to repository/conf/broker.xml as follows (create a new broker.xml file if it does not exist):


    <brokerTypes xmlns="http://wso2.org/carbon/broker">
    <brokerType class="org.wso2.cep.broker.RDBMSBrokerTypeFactory"/>
    </brokerTypes>


Now start the server and select 'Configure' in the web console. When you select Broker -> Add, you will see that the newly deployed broker is listed under the 'Broker Type' dropdown. You can configure the broker instance from there.

An example implementation can be found here.