Saturday, October 24, 2015

WSO2 ESB: Polling a database for changes

WSO2 ESB has DBLookup mediator to read data from a database. But in some cases, you may need ESB to keep polling a database and proceed with some operations only if there's a change in it. Although this is not supported out-of-the-box, we can easily write a simple class mediator to implement this requirement.

One problem that arises when doing this is that there's no way to keep a value in-memory across multiple messages, since the properties used in ESB artifacts are local to the message context. To overcome this, we can consider two options:
  1. Writing a class mediator that keeps the last read value in an instance variable and performs any comparisons required to identify changes in the database.

  2. Simply store the last read value to another table/field and compare it with the current field (this requires multiple DBLookup/Report calls).

In this blog post, we'll explore how we can implement the first option.

So the basic flow will be:
  • A scheduled task keeps calling a sequence
  • In the called sequence, we have a DBLookup mediator that fetches a field from a database table (includes a column that indicates a change, such as a timestamp)
  • The change indicator field is extracted and added to a property
  • Class mediator is called, it compares the property's current value with the previously stored value and sets the result (changed or not) to another property
  • With a filter on the resultant property, we can identify a change and proceed with necessary operations

A sample configuration is as follows.

The scheduled task that keeps calling a sequence:

<?xml version="1.0" encoding="UTF-8"?>
<task xmlns="http://ws.apache.org/ns/synapse"
name="SampleInjectToSequenceTask"
class="org.apache.synapse.startup.tasks.MessageInjector"
group="synapse.simple.quartz">
<trigger count="100" interval="10"/>
<property xmlns:task="http://www.wso2.org/products/wso2commons/tasks" name="message">
<dummyTriggerMessage/>
</property>
<property xmlns:task="http://www.wso2.org/products/wso2commons/tasks"
name="injectTo"
value="sequence"/>
<property xmlns:task="http://www.wso2.org/products/wso2commons/tasks"
name="sequenceName"
value="db_update_detection_sequence"/>
</task>

The sequence that does the DB lookup and identifies if there's a change:

<?xml version="1.0" encoding="UTF-8"?>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="db_update_detection_sequence">
<log level="custom">
<property name="db lookup/change detection sequence" value="true"/>
</log>
<dblookup>
<connection>
<pool>
<password/>
<user>root</user>
<url>jdbc:mysql://localhost:3306/mydb</url>
<driver>com.mysql.jdbc.Driver</driver>
</pool>
</connection>
<statement>
<sql>select * from mytable ;</sql>
<result name="current_indicator_value" column="updatedTime"/>
</statement>
</dblookup>
<class name="org.wso2.sample.PropertyChangeDetector"/>
<filter source="get-property('indicator_changed')" regex="true">
<!-- logic goes here -->
<log level="custom">
<property name="new change detected:" value="change detected"/>
</log>
</filter>
</sequence>

Sample code for the class mediator used in the above sequence can be found here.

Wednesday, October 21, 2015

WSO2 ESB: Transferring data from files to a database

In some integration scenarios you may want to use ESB to poll a directory for files and store the content to a database.

Following steps can be used to read CSV files from a directory, extract their data and then store them in a database.

We use a VFS proxy to poll the directory and look for new files. The parameters transport.vfs.FileURI and transport.vfs.FileNamePatter are used to indicate where/what types of files to look for (any regular expression can be used here).
Once a file is read, we need to convert it to a format that is supported by the DB Report mediator of the ESB. In this case we convert the csv data to xml using the Smooks mediator.

Once the conversion is done, we can easily use the converted xml data inside the DB Report mediator to enter them to a DB with XPath expressions.

In addition, we can specify some actions such as where the file to be placed (or deleted) after storing in the DB (transport.vfs.ActionAfterProcess),  what actions to be taken if a failure occurs (transport.vfs.ActionAfterFailure) etc

The complete proxy configuration is given below. Please note that you have to enable vfs transport in axis2.xml in order to get this working.

<proxy name="VFStoDBProxy" startOnLoad="true" trace="disable" transports="vfs">
<description/>
<target>
<inSequence>
<log level="full"/>
<smooks config-key="smooks-csv">
<input type="text"/>
<output type="xml"/>
</smooks>
<log level="full"/>
<dbreport>
<connection>
<pool>
<password/>
<user>root</user>
<url>jdbc:mysql://localhost:3306/mydb</url>
<driver>com.mysql.jdbc.Driver</driver>
</pool>
</connection>
<statement>
<sql><![CDATA[insert into file_content_table (name, email, telephone, address) values (?,?,?,?)]]></sql>
<parameter expression="//name" type="VARCHAR"/>
<parameter expression="//telephoneNo" type="VARCHAR"/>
<parameter expression="//emailAddress" type="VARCHAR"/>
<parameter expression="//address" type="VARCHAR"/>
</statement>
</dbreport>
</inSequence>
<outSequence/>
</target>
<parameter name="transport.vfs.ActionAfterProcess">DELETE</parameter>
<parameter name="transport.PollInterval">10</parameter>
<parameter name="transport.vfs.FileURI">file://C:/vfs/in</parameter>
<parameter name="transport.vfs.MoveAfterFailure">file://C:/vfs/fail</parameter>
<parameter name="transport.vfs.FileNamePattern">.*\.csv</parameter>
<parameter name="transport.vfs.ContentType">text/plain</parameter>
<parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>
view raw proxyconfig.xml hosted with ❤ by GitHub
In addition to the above proxy, you have to add the smooks-config xml file to the location specified in the proxy configuration. The smooks configuration for this sample is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.2.xsd">
<resource-config selector="org.xml.sax.driver">
<resource>org.milyn.csv.CSVReader</resource>
<param name="fields">name,telephoneNo,emailAddress,address</param>
<param name="rootElementName">contacts</param>
<param name="recordElementName">contactRecord</param>
</resource-config>
</smooks-resource-list>
You also need to add a local entry to the esb configuration to point to the smooks config file. (the above proxy configuration points to this local entry key for smooks configuration)

<localEntry key="smooks-csv" src="file:repository/samples/resources/smooks/smooksconfig.xml"/>