Friday, 14 September 2007

XML Debatching in File / FTP adapter

With the release of patchset 10.1.3.3 for Oracle SOA Suite 10.1.3, some cool new features have been introduced that I would like to bring to your attention. Besides the long-awaited error-policy framework, we now also get the option to de-batch large XML files when polling them from a file or FTP location. Debatching has been available in the File and FTP adapters for some time now, but was always restricted to native (flat) files, typically with record-based layouts. Large XML files have always been a challenge for BPEL, because internally the XML is loaded into a DOM tree, which takes considerably more memory than the document does in its text form.

What does de-batching do?

With debatching, the adapter polls for incoming files, and when it finds a file and starts reading it, it does not start a single new instance of a BPEL process and deliver the whole file contents to it. Instead, it chops the file up into "batches" of configurable size (typically 1) and starts a new instance for each batch. So for a file of 1000 records (lines) and a batch size of 1, you will end up with 1000 new instances. There is no guarantee that these 1000 instances will be executed in the order of the records in the original file, mind you: they are processed by the pool of engine threads just like any other active instance. For XML, as part of the patch install instructions for the 10.1.3.3 patchset, you install a "stream-based" XML parser based on StAX (the Streaming API for XML), which is well known in the Java community. Instead of reading the whole file into main memory, the StAX parser just "browses" through the XML file like a lexical scanner and does not build large trees of objects in memory.

How it works

To activate the de-batching behaviour of the adapter, you have to add the "PublishSize" attribute to the jca:operation tag in the adapter's wsdl:

<jca:operation
PhysicalDirectory="D:\"
ActivationSpec="oracle.tip.adapter.file.inbound.FileActivationSpec"
DeleteFile="true"
IncludeFiles=".*\.xml"
PublishSize="1"
PollingFrequency="10"
MinimumAge="5"
OpaqueSchema="false"
>

What it will do at runtime is take your XML file, preserve the root tag, and for each child of the root, generate a separate inbound document containing the child with the root tag again wrapped around it:

<dcs320 xmlns="http://www.customer.com/schemas/dcs320/1.0">
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>1</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000410</dcaddressnumber>
</salesline>
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>2</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000411</dcaddressnumber>
</salesline>
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>3</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000412</dcaddressnumber>
</salesline>
</dcs320>

The document above will launch three instances, each of which receives an input document that looks like this:

<dcs320 xmlns="http://www.customer.com/schemas/dcs320/1.0">
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>1</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000410</dcaddressnumber>
</salesline>
</dcs320>

And so on, one for each <salesline> child. The ability to de-batch large XML files gives us the option to process large XML documents (that have a repeating structure) in BPEL, one piece at a time.
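
To make the "and so on" concrete: following the rule described above (root tag preserved, one child per document), the second instance would presumably receive the document below. This is derived from the sample file by hand, not captured from a running server; the indentation is added for readability only.

```xml
<!-- Expected input for the second instance, derived by hand from the sample file -->
<dcs320 xmlns="http://www.customer.com/schemas/dcs320/1.0">
  <salesline>
    <messagetype>DCS320</messagetype>
    <messageid>487235</messageid>
    <messagelinenumber>2</messagelinenumber>
    <datamutationtype>0</datamutationtype>
    <dcaddressnumber>0000411</dcaddressnumber>
  </salesline>
</dcs320>
```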

Process Control Challenges

The main challenge I've always had with debatching and other "parallel processing" patterns is staying in control of error handling and of monitoring progress and completion. Especially for non-transactional protocols like File and FTP, this can be tricky. Another neat feature that has been built into the 10.1.3.3 adapter is a batch notification mechanism. This means that at certain points in its processing, the adapter will post meta-data events about its progress to an event handler, typically another BPEL process. You configure this notification behaviour in the activationAgent section of the bpel.xml of the debatching process. Below is an example of such a bpel.xml:

<?xml version = '1.0' encoding = 'UTF-8'?>
<BPELSuitcase>
<BPELProcess id="XMLDebatcher" src="XMLDebatcher.bpel">
<partnerLinkBindings>
<partnerLinkBinding name="debatch">
<property name="wsdlLocation">debatch.wsdl</property>
</partnerLinkBinding>
</partnerLinkBindings>
<activationAgents>
<activationAgent className="oracle.tip.adapter.fw.agent.jca.JCAActivationAgent" partnerLink="debatch">
<property name="batchNotificationHandler">bpel://default|BatchMgmtProcess</property>
<property name="portType">Read_ptt</property>
</activationAgent>
</activationAgents>
</BPELProcess>
</BPELSuitcase>

Above, the batch notification events are sent to a BPEL process called "BatchMgmtProcess", deployed in the "default" domain on the same BPEL server. The wsdl of this BatchMgmtProcess must define a number of messageTypes and operations. This is what it should look like (never mind the correlation stuff for now):

<definitions
name="BatchManagerInterface"
targetNamespace="http://xmlns.oracle.com/pcbpel/batching"
xmlns="http://schemas.xmlsoap.org/wsdl/"
xmlns:bpws="http://schemas.xmlsoap.org/ws/2003/03/business-process/"
xmlns:tns="http://xmlns.oracle.com/pcbpel/batching"
xmlns:plnk="http://schemas.xmlsoap.org/ws/2003/05/partner-link/"
xmlns:pns1="http://xmlns.oracle.com/BatchMgmtProcess/correlationset"
xmlns:batch="http://xmlns.oracle.com/pcbpel/batching/types"
>
<import namespace="http://xmlns.oracle.com/BatchMgmtProcess/correlationset" location="BatchMgmtProcess_Properties.wsdl"/>
<types>
<schema attributeFormDefault="qualified" elementFormDefault="qualified" targetNamespace="http://xmlns.oracle.com/pcbpel/batching/types"
xmlns:tns="http://xmlns.oracle.com/pcbpel/batching/types" xmlns="http://www.w3.org/2001/XMLSchema">
<element name="batchReadInitiateElement" type="tns:batchReadInitiateType"/>
<complexType name="batchReadInitiateType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
<element name="batchReadCompleteElement" type="tns:batchReadCompleteType"/>
<complexType name="batchReadCompleteType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="batchExpectedSize" type="long"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
<element name="batchProcessCompleteElement" type="tns:batchProcessCompleteType"/>
<complexType name="batchProcessCompleteType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="batchFinalSize" type="long"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
<element name="batchReadFailureElement" type="tns:batchReadFailureType"/>
<complexType name="batchReadFailureType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="batchPartialSize" type="long"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
</schema>
</types>
<message name="batchReadInitiateMessage">
<part name="event" element="batch:batchReadInitiateElement"/>
</message>
<message name="batchReadCompleteMessage">
<part name="event" element="batch:batchReadCompleteElement"/>
</message>
<message name="batchProcessCompleteMessage">
<part name="event" element="batch:batchProcessCompleteElement"/>
</message>
<message name="batchReadFailureMessage">
<part name="event" element="batch:batchReadFailureElement"/>
</message>
<portType name="BatchManagerInterface">
<operation name="onBatchReadStart">
<input message="tns:batchReadInitiateMessage"/>
</operation>
<operation name="onBatchReadComplete">
<input message="tns:batchReadCompleteMessage"/>
</operation>
<operation name="onBatchProcessComplete">
<input message="tns:batchProcessCompleteMessage"/>
</operation>
<operation name="onBatchReadFailure">
<input message="tns:batchReadFailureMessage"/>
</operation>
</portType>
<plnk:partnerLinkType name="BatchManagerInterfacePartnerLinkType">
<plnk:role name="BatchManagerInterfaceRole">
<plnk:portType name="tns:BatchManagerInterface"/>
</plnk:role>
</plnk:partnerLinkType>
<bpws:propertyAlias propertyName="pns1:batchId" messageType="tns:batchReadCompleteMessage" part="event"
query="/batch:batchReadCompleteElement/batch:batchId"/>
<bpws:propertyAlias propertyName="pns1:batchId" messageType="tns:batchReadInitiateMessage" part="event"
query="/batch:batchReadInitiateElement/batch:batchId"/>
<bpws:propertyAlias propertyName="pns1:batchId" messageType="tns:batchReadFailureMessage" part="event"
query="/batch:batchReadFailureElement/batch:batchId"/>
</definitions>
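
To give an idea of what such an event carries, here is a sketch of a batchReadComplete payload that conforms to the schema above. All values are made up for illustration: the real batchId format is generated by the adapter, and it is an assumption on my part that batchExpectedSize holds the number of published messages (3 for the sample file) and that process and domain name the debatching process and its domain.

```xml
<!-- Illustrative instance of batchReadCompleteElement; every value is a placeholder -->
<batchReadCompleteElement xmlns="http://xmlns.oracle.com/pcbpel/batching/types">
  <batchId>EXAMPLE-BATCH-ID</batchId>                       <!-- generated by the adapter at runtime -->
  <batchMetaData>example meta-data</batchMetaData>          <!-- content not specified here -->
  <batchDescription>example description</batchDescription>
  <batchExpectedSize>3</batchExpectedSize>                  <!-- assumed: number of messages published -->
  <process>XMLDebatcher</process>                           <!-- assumed: the debatching process -->
  <domain>default</domain>                                  <!-- assumed: its BPEL domain -->
</batchReadCompleteElement>
```

The element order matters, since the complexType is declared as a sequence, and all children are namespace-qualified because the schema sets elementFormDefault to "qualified".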

The value of the batchId field is generated at runtime by the adapter, and can for example be used to initiate a correlationSet. You can then later on receive (e.g. using a Pick with correlation) a batchReadComplete or batchReadFailure message for the same file in the same monitoring instance. Here’s a fragment of what it could look like:

<correlationSets>
<correlationSet name="CorrelationSet_1" properties="ns3:batchId"/>
</correlationSets>

<sequence name="main">
<pick name="Pick_1" createInstance="yes">
<onMessage portType="ns1:BatchManagerInterface"
operation="onBatchReadStart"
variable="OnMessage_onBatchReadStart_InputVariable"
partnerLink="BatchManagerInterface">
<correlations>
<correlation initiate="yes" set="CorrelationSet_1"/>
</correlations>
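
Further down in the same "main" sequence, the monitoring instance could then wait for the outcome of the read with a second pick. The following is only a sketch under assumptions, not the remainder of my process: the names Pick_2 and the two input variables are hypothetical (the variables would have to be declared with the matching message types), and the empty activities are placeholders for real handling.

```xml
<!-- Sketch: second pick in the "main" sequence, after Pick_1 has completed -->
<pick name="Pick_2">
  <onMessage portType="ns1:BatchManagerInterface"
             operation="onBatchReadComplete"
             variable="OnMessage_onBatchReadComplete_InputVariable"
             partnerLink="BatchManagerInterface">
    <correlations>
      <!-- initiate="no": only accept the event whose batchId matches the one stored by Pick_1 -->
      <correlation initiate="no" set="CorrelationSet_1"/>
    </correlations>
    <!-- placeholder: e.g. store batchExpectedSize for later progress checks -->
    <empty name="HandleReadComplete"/>
  </onMessage>
  <onMessage portType="ns1:BatchManagerInterface"
             operation="onBatchReadFailure"
             variable="OnMessage_onBatchReadFailure_InputVariable"
             partnerLink="BatchManagerInterface">
    <correlations>
      <correlation initiate="no" set="CorrelationSet_1"/>
    </correlations>
    <!-- placeholder: e.g. raise an alert that includes batchPartialSize -->
    <empty name="HandleReadFailure"/>
  </onMessage>
</pick>
```

Note that the wsdl above only defines batchId property aliases for the read-start, read-complete and read-failure messages, so correlating on onBatchProcessComplete in the same way would need an additional propertyAlias.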

Happy Debatching!