Introduction to Custom Pipeline Processing in Microsoft BizTalk Server 2004

Christof Claessens

Applies to:
  • Visual Studio .NET 2003
  • BizTalk Server 2004
Summary:

The BizTalk Server pipeline architecture opens up a wealth of opportunities by allowing you to plug in your own components. Technically, developing so-called "custom pipeline components" requires you to take only a few basic principles into account. As such, nearly any .NET developer should be able to get one working within a few hours. To further improve this developer experience, this article explains and introduces a couple of basic principles and techniques.


Introduction

Microsoft BizTalk Server 2004 has been completely rearchitected and opens up a whole new world, full of new concepts, principles and opportunities, ready to be discovered by you. Before we get started, I would like to take this opportunity to invite you to join the BizTalk Server community. If you feel like sharing your knowledge, your findings or methodologies, please do so! It will not only encourage further activity in this community and help others; it also gives you an opportunity to validate your knowledge in any area you would like to open for discussion...


Pipeline Basics

MSDN and the BizTalk Server online documentation do a great job of explaining the basics. However, let's repeat a few things here, in order to get up to speed quickly.

Stages

Every pipeline is divided into a number of so-called "pipeline stages". These differ somewhat depending on the type of pipeline. Each of them:

  • is designed to contain a particular type of component
  • can possibly contain one or more pipeline components
  • is executed in a predefined order

For a "receive pipeline", the following stages are executed in the specified order:

  • Decoding stage: This stage typically contains components that "decode" message data. For example, components that remove encryption, components that verify against certificates, ...
  • Disassemble stage: This stage typically contains components that split up message data. Before data can be split, its format has to be recognized, so these two features are typically combined:
    • matching the message with the correct schema
    • depending on the type of schema (basic or envelope): splitting up the message in different messages
  • Validate stage: This stage typically contains components that validate the data in a message. Although theoretically validation may happen against any type of given structure description, this will almost always be an XML Schema definition. (Even flat files are described using annotated schemas...)
  • ResolveParty stage: This stage typically contains components that attempt to determine which partner a message came from.

For "send pipelines", the stages are ordered like this:

  • Pre-assemble stage: As its name implies, this stage may contain components that perform actions that need to happen before the assemble stage.
  • Assemble stage: The components in the assemble stage typically serialize the message in the desired output format. (Think for example about components that serialize the internal XML data to a flat file structured output message.)
  • Encode stage: This stage is used mainly for encoding, signing and encryption purposes.

Stage Execution

Pipelines are executed as soon as a message is received or transmitted. (Note: This implies, for example, that a receive pipeline accesses data that has never been persisted previously!)

As mentioned before, each of the stages in a pipeline is executed in a particular order. Since a stage may contain more than one pipeline component, BizTalk Server has to determine which of them to execute and in which order. It is exactly for this purpose that each stage is assigned an "Execution mode".

Possible values for the execution mode of a pipeline stage are:

  • All
  • First match

When the execution mode is set to "all", all pipeline components in the stage are executed in the order in which they appear on the design surface. For stages with their execution mode set to "first match", the components "probe" the message. Probing means that, one by one, each component gets a chance to look at the message and determine whether or not it can handle it. As soon as a component "matches" the message, the probing ends and the matching component proceeds with processing.

In practice, only the disassemble stage of a receive pipeline uses the first match algorithm.
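
The difference between the two execution modes can be sketched in a few lines of C#. This is only an illustration of the control flow: IStageComponent, PrefixComponent and StageRunner are hypothetical names invented for this sketch and are not BizTalk Server interfaces, and throwing when no component matches is an assumption of the sketch rather than documented engine behavior.

```csharp
using System;
using System.Collections;

// Hypothetical stand-in for a pipeline component; NOT a BizTalk interface.
public interface IStageComponent
{
    string Name { get; }
    bool Probe(string message);      // "can I handle this message?"
    string Execute(string message);  // process the message and return the result
}

// Toy component that recognizes messages starting with a given prefix
// and appends its own name to every message it processes.
public class PrefixComponent : IStageComponent
{
    private readonly string name;
    private readonly string accepts;

    public PrefixComponent(string name, string accepts)
    {
        this.name = name;
        this.accepts = accepts;
    }

    public string Name { get { return name; } }

    public bool Probe(string message)
    {
        return message.StartsWith(accepts, StringComparison.Ordinal);
    }

    public string Execute(string message)
    {
        return message + "|" + name;
    }
}

public static class StageRunner
{
    // "All": every component runs, in the order of the list.
    public static string RunAll(IList components, string message)
    {
        foreach (IStageComponent c in components)
        {
            message = c.Execute(message);
        }
        return message;
    }

    // "First match": components probe in order; only the first one that
    // recognizes the message gets to process it.
    public static string RunFirstMatch(IList components, string message)
    {
        foreach (IStageComponent c in components)
        {
            if (c.Probe(message)) return c.Execute(message);
        }
        // Assumption of this sketch: no match is treated as an error.
        throw new InvalidOperationException("No component recognized the message.");
    }
}
```

With a stage holding an "xml" component (accepting "<") followed by a "flat" component (accepting "H"), RunAll passes a message through both, while RunFirstMatch hands "HDR" to the "flat" component only.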


Custom Pipeline Components Development

Introduction to custom pipeline components

For a wealth of different reasons, you might decide on writing your own BizTalk Server custom pipeline component. A few ideas:

  • Validations that cannot be expressed in XML schema
  • Decryption algorithms not supported by BizTalk Server out of the box
  • Checks on signature formats that BizTalk Server yet does not recognize
  • Conversions that might not be possible using the BizTalk Server mapper
  • Manually implement a disassemble algorithm to split up your messages
  • Provision of additional context to a message as it comes in, in order to support advanced routing and correlation scenarios
  • ...

In order to start developing you first need to determine which kind of component you would like to create:

  • General pipeline component:
    • Input: one message
    • Output: one message
    • Can be placed in any stage except for the assembling or disassembling stage
  • Assembling pipeline component:
    • Input: possibly takes more than one message (algorithms not yet implemented in BizTalk Server 2004)
    • Output: one message
    • Can only reside in the assembling stage
  • Disassembling pipeline component
    • Input: takes one message
    • Output: returns one or more messages
    • Can only reside in the disassembling stage
  • "Probing pipeline component": Even though this type of component is discussed in the documentation, I will purposely not discuss it here. Probing components are only useful in stages with their execution mode set to First Match. So it does not make sense to discuss them separately, since the disassembling component is, for the moment, the only probing component in BizTalk Server.

Development Basics

Although the concepts discussed so far may seem quite complex, the interfaces you need to implement, to get your pipeline component up and running, are in fact not difficult at all. Once you know how one type of component behaves, other types of components will not cause you any trouble either. So, for the purpose of this paper, I will focus on the development of "General Pipeline Components".

In order to develop a general pipeline component, you will need to implement the following interfaces:

  • IBaseComponent
  • IComponent
  • IComponentUI
  • IPersistPropertyBag

IBaseComponent

This one's pretty easy. The IBaseComponent interface contains just three properties, through which the engine retrieves the component's name, description and version.

IComponent

This interface contains the single core method of the pipeline processing framework:

public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)

This method takes the pipeline's context and an inbound message, and is responsible for returning the processed outbound message. The BizTalk Server IBaseMessage interface defines methods and properties that:

  • Give read/write access to the message's data parts
  • Give read/write access to the message's context

Each message part implements the IBaseMessagePart interface, whose core property is the Data property:

Stream Data {get; set;} 

Since the Data is typed as Stream, the framework is actually able to effectively "stream" data through the different components, without needing to take each message completely in memory.

So... how does that work?

Assume two custom pipeline components pcA and pcB, with pcB executed after pcA. The pipeline framework makes sure that pcA's Execute method provides the IBaseMessage that pcB will work on. The IBaseMessage, as discussed before, is a collection of one or more message parts. Although more parts are possible, only one can be marked as the "body part". To keep it simple, assume just one part is present: the body part. Since the body part's data is represented as a Stream, pcB is forced to work on pcA's result stream. This means that while pcB is reading pcA's result stream, pcA is still able to continue generating it. If pcA was made streaming, the data it provides as the result of the Execute call is generated, modified or inspected while pcB requests it, byte by byte! In turn, pcB (if made streaming) only requests bytes as they are needed to serve whoever reads its own body part stream, for example a third component pcC.
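
To make the "pull" model concrete, here is a minimal sketch of a streaming transformation written against System.IO.Stream only. ReplaceByteStream is a hypothetical class made up for this illustration; it is not part of BizTalk Server and says nothing about how the built-in components are implemented. It transforms data only at the moment a downstream reader asks for it.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of BizTalk Server: a read-only, non-seekable
// stream that replaces one byte value with another while data is pulled through.
public class ReplaceByteStream : Stream
{
    private readonly Stream inner;
    private readonly byte oldValue;
    private readonly byte newValue;

    // Number of bytes this wrapper has pulled from its inner stream so far.
    public int BytesPulled;

    public ReplaceByteStream(Stream inner, byte oldValue, byte newValue)
    {
        this.inner = inner;
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Pull at most what the caller asked for, then transform it in place.
        int read = inner.Read(buffer, offset, count);
        for (int i = offset; i < offset + read; i++)
        {
            if (buffer[i] == oldValue) buffer[i] = newValue;
        }
        BytesPulled += read;
        return read;
    }

    // The rest is the boilerplate of a forward-only, read-only stream.
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}
```

If a MemoryStream is wrapped in one ReplaceByteStream (playing the role of pcA) and that one in a second ReplaceByteStream (pcB), reading three bytes from pcB pulls exactly three bytes through pcA. Nothing further is read from the source until the next reader asks for it.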

IComponentUI

The IComponentUI interface contains two methods:

  • One that enables you to validate the component's configuration.
  • One that provides the design-time environment with an icon for the component. If a null handle is returned, the default icon will be used.

IPersistPropertyBag

This interface enables a component to store and retrieve its configuration. Storing and retrieving config settings is done through what is called a "propertyBag" - an abstraction of the storage for component properties.

Things that require your attention

Try to make your pipeline components streaming as much as possible! A non-streaming component not only has to hold the entire message in memory, it will hurt performance as well. At least, this usually is the case...

Another important thing to notice is that receive pipeline components are executed before any message persistence has been done. This means that losing data in your component (for example, as a result of bad exception handling) means losing the message altogether. Even the adapter will not be able to undo this.


Writing a streaming pipeline component: sample walkthrough

OK... so much for the theory! Let's walk through a working sample and discover what things look like in code! While doing this, I'll make use of some techniques and a framework that greatly facilitate the writing of a fully streaming pipeline component.

Since discussing every single line of code would take us too far for the purpose of this paper, I will focus on the core aspects of coding pipeline components. For full reference, the source code will be made publicly available as well.

Sample component's goal

So, first of all: what will our component's goal be? As you probably know, BizTalk Server allows you to configure property promotion in a very intuitive way. However, this has its limits. Things that are by default not possible with regard to property promotion are, among others:

  • Promoting a repeating field (for example: imagine you only need the first occurrence)
  • Promoting a field whose location depends on the document's structure or content (for example: the first name of the first "author" node whose last name was set to "claessens")

Bad luck if you need something similar? Not if you continue reading :-) Our sample component will target exactly those kinds of scenarios, where you cannot count on the BizTalk features to promote your data!

How does this work?

Our component will allow the developer to set three properties at design time:

  • TargetPromotedPropertyName
  • TargetPromotedPropertyNamespace
  • StreamingXPathExpression

The first two properties determine the target property on which the final result is promoted. The third, StreamingXPathExpression, takes an XPath expression that tells the component where, in the incoming XML stream, the value to promote is located.

Now... wait a minute. Doesn't XPath require you to load the full stream into the DOM? So, how can we ever make this streaming?

Indeed, regular XPath expressions require the data to be available in a document object model. There are very good reasons for this: XPath expressions allow quite arbitrary location steps to be specified, and each of those location steps may jump to sibling, child or even parent XML nodes! To allow this, the entire document has to be in memory. So, as far as regular XPath is concerned, no luck. However, "streaming XPath" (also called forward-only XPath) has been under discussion for quite some time.

Only recently has a first implementation of this XPath subset been made available on MSDN. Streaming XPath allows the use of only a subset of regular XPath. That subset is especially targeted at being able to walk through XML in a forward-only, streaming way. Dare Obasanjo (Program Manager for the Microsoft Web Data team) and Howard Hao have written an excellent article on this, explaining the SXPath concepts very well. In addition, as mentioned before, a first implementation, called the "XPathReader", was made available as well. For more in-depth information about this new technology, I would like to refer you to these excellent resources.
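
To get a feel for what "forward only" means, the sketch below finds the first occurrence of a repeating element with nothing but the standard XmlReader. It deliberately does not use the XPathReader: ForwardOnlyLookup and FirstElementValue are hypothetical names for this illustration, and the lookup by local name is a much cruder selection than a streaming XPath expression. It also assumes the target element contains only text.

```csharp
using System;
using System.IO;
using System.Xml;

public static class ForwardOnlyLookup
{
    // Returns the text of the first element with the given local name,
    // or null when the input contains no such element.
    // Assumption of this sketch: the element holds text only (no child elements).
    public static string FirstElementValue(TextReader input, string localName)
    {
        using (XmlReader reader = XmlReader.Create(input))
        {
            // Read() only ever moves forward; nodes already passed are gone.
            while (reader.Read())
            {
                if (reader.NodeType == XmlNodeType.Element &&
                    reader.LocalName == localName)
                {
                    // Stop at the first match; the rest of the document is
                    // never parsed and never loaded into memory.
                    return reader.ReadElementContentAsString();
                }
            }
        }
        return null;
    }
}
```

Because the reader never goes back, a selection such as "the parent of the node I just passed" cannot be expressed this way. That is exactly the kind of location step a streaming XPath subset has to exclude.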

So, how does this relate to our component? Since regular XPath did not fit our needs, we'll resort to a streaming XPath implementation! This will enable you to:

  • promote properties
  • located by an XPath expression
  • in a streaming manner
  • without any major performance impact
  • with an absolutely minimal memory footprint

Way too cool for a sample? Not at all, we'll get this up and running by the end of this paper!

Note: as this is a work in progress, the actual sample implementation, available for download, may differ slightly from the code discussed below.

Coding the public design time properties

Our streaming pipeline component will expose three properties at design time. Implementing them couldn't be more straightforward: we'll define three public properties and we're done! Any public property that does not have its Browsable attribute set to false is accessible in the design-time property pane of the pipeline component.

private string m_prompropname = ""; 
private string m_prompropnamespace = ""; 
private string m_sxpathexpression = ""; 
 
public string TargetPromotedPropertyName 
{ 
    get{ return m_prompropname; } 
    set{ m_prompropname = value; } 
} 
 
public string TargetPromotedPropertyNamespace 
{ 
    get{ return m_prompropnamespace; } 
    set{ m_prompropnamespace = value; } 
} 
 
public string StreamingXPathExpression 
{ 
    get{ return m_sxpathexpression; } 
    set{ m_sxpathexpression = value; } 
}

Persistence of design time properties

Now that we have coded the design time properties, we still need some means to persist their values (configured at design time) so that they will be accessible at run time as well. It is exactly for this purpose, that the IPersistPropertyBag interface was developed. A property bag is basically an abstraction of a persistent storage mechanism. Using name-value pairs, it allows you to read and write data to it. In code, the 2 core interface methods look like:

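//Keys under which the values are stored in the property bag. Their declarations
//are not shown in this paper's listing; the key strings below are assumed:
private const string m_propbagkey_prompropname = "TargetPromotedPropertyName";
private const string m_propbagkey_prompropnamespace = "TargetPromotedPropertyNamespace";
private const string m_propbagkey_sxpathexpression = "StreamingXPathExpression";

//Load is called by the environment when the component should load all
// persisted values:
public void Load(IPropertyBag propertyBag, int errorLog)
{
    m_prompropname = readBagForStringValue(ref propertyBag, m_propbagkey_prompropname);
    m_prompropnamespace = readBagForStringValue(ref propertyBag,
      m_propbagkey_prompropnamespace);
    m_sxpathexpression = readBagForStringValue(ref propertyBag,
      m_propbagkey_sxpathexpression);
}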
 
// Save is called by the environment when the component should save all  
// persistent values: 
public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties) 
{ 
    writeBagWithStringValue(ref propertyBag, m_propbagkey_prompropname, 
      m_prompropname); 
    writeBagWithStringValue(ref propertyBag, m_propbagkey_prompropnamespace, 
      m_prompropnamespace); 
    writeBagWithStringValue(ref propertyBag, m_propbagkey_sxpathexpression, 
      m_sxpathexpression); 
} 
 
//Helper methods for accessing the property bag:
private string readBagForStringValue(ref IPropertyBag bag,
  string key, string defaultvalue)
{
    object result;

    try
    {
        bag.Read(key, out result, 0);
        if(result != null) return (string) result;
        else return defaultvalue;
    }
    catch
    {
        //Read can fail, for example when no value was ever saved under this key;
        //fall back to the default value in that case:
        return defaultvalue;
    }
}
 
private string readBagForStringValue(ref IPropertyBag bag, string key) 
{ 
    return readBagForStringValue(ref bag, key, ""); 
} 
private void writeBagWithStringValue(ref IPropertyBag bag, string propertyName, string valueToWrite) 
{ 
    object obj_valueToWrite = valueToWrite; 
    bag.Write(propertyName, ref obj_valueToWrite); 
}

Implementing the IBaseComponent properties

Every pipeline component should implement IBaseComponent, which basically enables any environment to ask the component for its name, version and description. Since those properties should not be available in the design-time property pane, we'll mark them as not browsable:

//Constant pipeline component values: 
private const string C_COMPONENTNAME = "Streaming Promotion Pipeline Component"; 
private const string C_COMPONENTDESC = "BizTalk Server 2004 Pipeline Component" + 
  " taking care of promoting data using XPath in a streaming implementation."; 
private const string C_COMPONENTVER = "1.0.0.0"; 
 
//Implementation of the IBaseComponent interface: 
[Browsable(false)] 
public string Description 
{ 
    get{ return C_COMPONENTDESC; } 
} 
 
[Browsable(false)] 
public string Name 
{ 
    get{ return C_COMPONENTNAME; } 
} 
 
[Browsable(false)] 
public string Version 
{ 
    get{ return C_COMPONENTVER; } 
}

A fancy icon on a valid component

The IComponentUI enables us both:

  • to provide a custom icon for our pipeline component
  • to validate the property/configuration data, configured at design time

A minimal implementation might look like:

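public System.Collections.IEnumerator Validate(object projectSystem)
{
    System.Collections.Specialized.StringCollection validationResult =
      new System.Collections.Specialized.StringCollection();

    //StringCollection.GetEnumerator() returns a StringEnumerator, which does not
    //implement IEnumerator, so go through the IEnumerable interface instead:
    return ((System.Collections.IEnumerable) validationResult).GetEnumerator();
}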
 
[Browsable(false)] 
public System.IntPtr Icon 
{ 
    get 
    { 
       //No icon associated with this pipeline component: 
       return IntPtr.Zero; 
    } 
}
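
The minimal Validate implementation above never reports anything. As a sketch of what an actual check could look like for our three design-time properties, the helper below collects one error string per missing value. ConfigValidation is a hypothetical helper class for this illustration, and the assumption that the design-time environment expects plain error strings from the enumerator should be verified against the documentation.

```csharp
using System;
using System.Collections;

// Hypothetical helper for this paper's sample component; a component's
// Validate method could delegate to it, passing in its configured values.
public static class ConfigValidation
{
    // Returns an enumerator over error messages (strings).
    // The enumerator is empty when all three values are set.
    public static IEnumerator Validate(
        string propertyName, string propertyNamespace, string xpathExpression)
    {
        ArrayList errors = new ArrayList();

        if (IsBlank(propertyName))
            errors.Add("TargetPromotedPropertyName must be set.");
        if (IsBlank(propertyNamespace))
            errors.Add("TargetPromotedPropertyNamespace must be set.");
        if (IsBlank(xpathExpression))
            errors.Add("StreamingXPathExpression must be set.");

        return errors.GetEnumerator();
    }

    private static bool IsBlank(string value)
    {
        return value == null || value.Trim().Length == 0;
    }
}
```

Checking whether the expression is valid streaming XPath would be a natural next step, but that depends on the XPathReader implementation and is left out here.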

Processing the message data

Now that we've implemented the basic plumbing, let's dive into the core method implementation of the pipeline processing framework: the IComponent interface's Execute method!

As described previously, this method is responsible for providing the next component in line with the resulting processed datastream, on which that component will work. Since we can split most responsibilities into several methods and custom helper classes, we're able to keep the interface's implementation relatively short:

private IBaseMessage m_messageToPromoteOn; 
private CloningStreamSplitter m_streamsplitter; 
 
public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg) 
{ 
    m_streamsplitter = new CloningStreamSplitter(pInMsg.BodyPart.Data); 
 
    //Start new thread, promoting the data found on configured XPath location: 
    Thread dataPromotionThread = new Thread(new ThreadStart(StartPromotingData)); 
    dataPromotionThread.Start(); 
 
    m_messageToPromoteOn = cloneIBaseMessage( 
      pContext.GetMessageFactory(), pInMsg, m_streamsplitter.PrimaryStream); 
 
    return m_messageToPromoteOn; 
}

At this point most of this will probably seem rather unclear. No worries, let's walk through each of these details one by one.

Introducing the CloningStreamSplitter

First of all, we use a class called "CloningStreamSplitter". This class basically:

  • takes a stream in its constructor
  • after construction, provides two properties:
    • the original stream
    • a clone of the original stream

Why do we need this? Normally, streams within the pipeline processing framework are non-seekable. This means that once a stream has been read, we cannot "undo" this. Another issue is that a stream containing XML data will most probably be read using the XmlReader class. This class is an excellent tool for parsing XML data, but unfortunately it does not offer any handy way of "forwarding" the data it has already parsed. The only way to do so would be to write each attribute, element, processing instruction, whitespace node and so on out to some buffer mechanism, which in turn could be used to create another stream... This is definitely NOT something you'd like to do, believe me! Even if you succeeded, you would lose information like non-significant whitespace and CDATA markers. In some scenarios (where signing and encryption are used), this cannot be allowed.

So, our CloningStreamSplitter solves those issues because it will be able to produce a literal byte-exact clone of the original stream. (We'll discuss its implementation a little further, so read on.) This allows us to work on the cloned stream, instead of on the original stream.

m_streamsplitter = new CloningStreamSplitter(pInMsg.BodyPart.Data);

The pInMsg contains one or more message parts, of which one is marked as the body. The Data property of that body part gives us the data stream we can work on. In this case we clone it before performing any actions on the data.

A separate thread dedicated to data processing

As we'll discuss later on, our CloningStreamSplitter's implementation requires that the cloned stream and the original stream be read by two different threads. One of those is the thread with which the next pipeline component reads the original data stream; the other thread is ours to choose.

Since the execute method's purpose is to return the result message the next component has to act on, we can't afford to block the current execute method's calling thread... So the only option left is to create a kind of processing/worker thread, responsible for:

  • reading the cloned stream
  • parsing it using the XmlReader
  • looking for the value at the configured streaming XPath location, using the XPathReader implementation we discussed before
  • promoting the data found:
    • on the message context
    • within the configured context namespace and property

//Start new thread, promoting the data found on configured XPath location: 
 
Thread dataPromotionThread = new Thread(new ThreadStart(StartPromotingData)); 
dataPromotionThread.Start();

The thread will execute the code in the StartPromotingData method, which we'll discuss later on. As for now, we're ready to create the result message.

Creating the result message

Messages within the pipeline processing framework always comply with the IBaseMessage interface and follow the factory pattern. The factory required for the creation of such an IBaseMessage instance can be found on the pContext pipeline context parameter, given in the call to Execute.

Further, we'll need to:

  • copy each of the non-body message parts of pInMsg, onto that instance
  • copy the context that was already present on pInMsg, onto that instance
  • add a new body message part, to that instance, that, as its data property, will provide the resulting data stream (in our case, just the original one, since we're not modifying any data)

m_messageToPromoteOn = cloneIBaseMessage( 
  pContext.GetMessageFactory(), pInMsg, m_streamsplitter.PrimaryStream);

The cloneIBaseMessage implementation, responsible for preparing the result message, looks like:

//Clones a full IBaseMessage except for the body part: 
IBaseMessage cloneIBaseMessage(IBaseMessageFactory msgFactory, IBaseMessage srcMessage) 
{ 
    return cloneIBaseMessage(msgFactory, srcMessage, srcMessage.BodyPart.Data); 
} 
 
//Clones a full IBaseMessage except for the body part and assigns the new body stream: 
IBaseMessage cloneIBaseMessage(IBaseMessageFactory msgFactory, IBaseMessage srcMessage,
  Stream newBodyDataStream)
{
    IBaseMessage resultMsg = msgFactory.CreateMessage(); 
 
    //copy context by reference: 
    resultMsg.Context = srcMessage.Context; 
 
    //copy all non-body parts by reference: 
    copyAllNonBodyMsgParts(srcMessage, resultMsg); 
 
    //create new body part: 
    IBaseMessagePart body = msgFactory.CreateMessagePart(); 
    body.PartProperties = srcMessage.BodyPart.PartProperties; 
    body.Data = newBodyDataStream; 
 
    //add it to the result message: 
    resultMsg.AddPart(srcMessage.BodyPartName, body, true); 
 
    return resultMsg; 
} 
 
void copyAllNonBodyMsgParts(IBaseMessage src, IBaseMessage dest) 
{ 
    for(int i = 0; i < src.PartCount; i++) 
    { 
      //fetch part from original message: 
      string partname; 
      IBaseMessagePart part = src.GetPartByIndex(i, out partname); 
 
      //copy it to destination message: 
      if(partname != src.BodyPartName) 
        dest.AddPart(partname, part, false); 
    } 
}

Use of the streaming XPath implementation in data promotion

As previously described, the actual implementation of the data promotion will happen by a dedicated thread, using the XPathReader implementation:

private void StartPromotingData()
{
    XPathReader xpreader = null;

    try
    {
        //Create streaming XPath reader, based on a clone of the original datastream:
        xpreader = new XPathReader(
          new StreamReader(m_streamsplitter.ClonedStream), m_sxpathexpression);

        //Read until the XPath expression is matched; promote only if a match was found:
        if(xpreader.ReadUntilMatch())
        {
            //Promote value found on configured XPath location:
            m_messageToPromoteOn.Context.Promote(
              m_prompropname,
              m_prompropnamespace,
              xpreader.Value);
        }

        //Even after data found: keep on reading till end of cloned stream:
        while(xpreader.Read()){}
    }
    catch(Exception e)
    {
        throw new Exception(
          "StreamingPipelineComponent failed during promotion of data.", e);
    }
    finally
    {
        if(xpreader != null) xpreader.Close();
    }
}

For more details on streaming XPath and how it can be used within your own scenarios, I'd like to refer you to the MSDN articles on this subject. The actual promotion of data is fairly simple:

//Promote value found on configured XPath location: 
m_messageToPromoteOn.Context.Promote( 
    m_prompropname, 
    m_prompropnamespace, 
    xpreader.Value);

Each IBaseMessage has a property named "Context", on which you can promote data. In this case we promote the value found by the XPathReader onto the context property configured at design time.

Cloning a stream for use in pipeline components

I will not discuss in detail the code that is responsible for stream cloning. Instead, I'd rather like to give an overview of the basic principles and techniques used in there. For actual implementation details, the code is available for download.

The most important constraint to take into account when implementing the CloningStreamSplitter is the fact that we are implementing a streaming pipeline component. This means that, apart from a small working buffer, no message data can be kept in memory. In the end this means that the data for the cloned stream can only be provided when two conditions are met:

  • it has been requested by the consuming thread (we cannot buffer the whole datastream in advance, so we'll need to provide the data on the fly, as it is asked for)
  • the original datastream has already been read (if not, the data cannot be cloned since it is not known yet – remember the stream is not seek-able, going back is not possible)

These two conditions are only met if both the original stream and the cloned stream are read in sync and in order.

Assume two threads (like we discussed, this is a requisite in order for this technique to work):

  • the "originaldatathread", reading and consuming the original data stream (or at least: our implementation's proxy to the original data stream)
  • the "cloneddatathread", reading and consuming the cloned data stream (which will be our own stream implementation)

First, the original stream has to be read from. That data is buffered and then handed to the originaldatathread. If the originaldatathread reads from the stream again, the call needs to be blocked until the same number of bytes has been read from the cloned stream as well. (Remember: the two streams have to be read in sync in order to prevent keeping any data in memory.)

The cloned stream will actually be our own System.IO.Stream implementation. This allows data to be returned, repeatedly, from the small data buffer, which is filled while the original data stream is read from. If the cloneddatathread requests data that has not yet been read by the originaldatathread, the call is blocked until the requested data is available.

This way, by intelligently blocking and synchronizing threads, we avoid keeping data in memory. At the same time:

  • our component remains fully streaming
  • the original data is not touched at all and remains byte by byte exactly the same as the inbound stream
  • the implementation model for the data processing is tremendously simplified and allows the use of components similar to the:
    • XmlReader
    • XPathReader

The actual sample source code showing this technique is available for download.
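
For readers who want to see the blocking idea in code without opening the download, here is a minimal sketch. It is not the CloningStreamSplitter from the sample: LockStepSplitter and its members are hypothetical names, the 4096-byte chunk size is arbitrary, and error handling and disposal are left out. It only shows how two readers on two threads can share one small buffer with Monitor.Wait and Monitor.PulseAll.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical sketch; NOT the CloningStreamSplitter from the download.
public class LockStepSplitter
{
    private readonly Stream source;
    private readonly byte[] chunk = new byte[4096];
    private readonly object gate = new object();
    private int count;     // number of bytes in the current chunk
    private int clonePos;  // bytes of the current chunk already given to the clone reader
    private bool eof;      // set once the source is exhausted

    public readonly Stream Primary;  // proxy to the original data
    public readonly Stream Clone;    // byte-exact copy, fed from the shared chunk

    public LockStepSplitter(Stream source)
    {
        this.source = source;
        Primary = new End(this, false);
        Clone = new End(this, true);
    }

    private int ReadPrimary(byte[] dst, int offset, int n)
    {
        if (n == 0) return 0;
        lock (gate)
        {
            // Block until the clone reader has consumed the previous chunk.
            while (clonePos < count) Monitor.Wait(gate);
            if (eof) return 0;
            count = source.Read(chunk, 0, Math.Min(n, chunk.Length));
            clonePos = 0;
            if (count == 0) eof = true;
            Buffer.BlockCopy(chunk, 0, dst, offset, count);
            Monitor.PulseAll(gate);  // wake the clone reader
            return count;
        }
    }

    private int ReadClone(byte[] dst, int offset, int n)
    {
        lock (gate)
        {
            // Block until the primary reader has fetched data (or hit the end).
            while (clonePos >= count && !eof) Monitor.Wait(gate);
            if (clonePos >= count) return 0;  // end of stream
            int k = Math.Min(n, count - clonePos);
            Buffer.BlockCopy(chunk, clonePos, dst, offset, k);
            clonePos += k;
            Monitor.PulseAll(gate);  // wake the primary reader
            return k;
        }
    }

    // Read-only, non-seekable stream that delegates to the splitter.
    private sealed class End : Stream
    {
        private readonly LockStepSplitter owner;
        private readonly bool isClone;
        public End(LockStepSplitter owner, bool isClone) { this.owner = owner; this.isClone = isClone; }
        public override int Read(byte[] b, int o, int c)
        {
            return isClone ? owner.ReadClone(b, o, c) : owner.ReadPrimary(b, o, c);
        }
        public override bool CanRead { get { return true; } }
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return false; } }
        public override long Length { get { throw new NotSupportedException(); } }
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
        public override void Flush() { }
        public override long Seek(long o, SeekOrigin s) { throw new NotSupportedException(); }
        public override void SetLength(long v) { throw new NotSupportedException(); }
        public override void Write(byte[] b, int o, int c) { throw new NotSupportedException(); }
    }
}
```

Note the consequence of this design: both streams must be read to the end, each on its own thread. If either reader stops early, the other one blocks forever, which is why the promotion thread in the walkthrough keeps reading the cloned stream even after it has found its value.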

Conclusion

This paper aimed to provide some guidance on pipeline processing, with a strong focus on custom pipeline component development. After explaining some basic principles and introducing the various interfaces involved, it drew attention to a few things to keep in mind. Finally, a sample walkthrough illustrated how all this looks in code.

BizTalk Server provides you with a very powerful pipeline processing framework. This power, however, comes with some drawbacks: to make pipeline components streaming, some effort will be needed. Since a lot can be made generic, good architecture could prove to be a great help.


About the author

Christof Claessens was "born into .NET" with the first .NET Framework beta as subject of his thesis. He finds XML and related technologies too attractive to be left unexplored. So his spare time and weekends are mainly consumed by his passion for technology with a focus on EAI (Enterprise Application Integration) and SOA (Service Oriented Architectures) in particular. As Christof participated in the BizTalk Server 2004 "Early Adopter Program", he now actively contributes to the BizTalk Server newsgroups as well as in the Belgian BizTalk User group. Christof also does presentations on Belgian TechNet events where he shares his passion for BizTalk Server. You can contact him at: bts2004communityfeedback(at) hotmail.com.