Introduction to Custom Pipeline Processing in Microsoft BizTalk Server 2004

Christof Claessens
Applies to:
The BizTalk Server pipeline architecture opens up a wealth of opportunities by allowing you to plug in your own components. Technically, when developing so-called "custom pipeline components", only a few basic principles need to be taken into account. As such, nearly any .NET developer should be able to get started within a few hours. To improve this developer experience even further, this article explains and introduces a couple of basic principles and techniques.

Introduction

Microsoft BizTalk Server 2004 has been completely rearchitected and opens up a whole new world, full of new concepts, principles and opportunities, ready to be discovered by you. Before we get started, I would like to use this opportunity to invite you to join the BizTalk Server community. If you feel like sharing your knowledge, your findings or methodologies, please do so! It will not only encourage further activities in this community and help others; it also gives you an opportunity to validate your knowledge in any area you would like to open for discussion...

Pipeline Basics

MSDN and the BizTalk Server online documentation do a great job explaining the basics. However, let's repeat a few things here, in order to get up to speed quickly.

Stages

Every pipeline is divided into a number of so-called "pipeline stages". These differ somewhat depending on the type of pipeline. Each of them:
For a "receive pipeline", the following stages are executed in the specified order:
For "send pipelines", the stages are ordered like this:
Pipelines are executed as soon as a message is received or transmitted. (Note: This implies, for example, that a receive pipeline accesses data that has never been persisted before!) As mentioned before, the stages in a pipeline are executed in a particular order. Since a stage may contain more than one pipeline component, BizTalk Server has to determine which of them to execute and in which order. It is exactly for this purpose that each stage is assigned an "execution mode". Possible values for the execution mode of a pipeline stage are:
When the execution mode is set to "all", all pipeline components in the stage are executed in the order in which they appear on the design surface. For stages with their execution mode set to "first match", the components will "probe" the message. Probing means that, one by one, each of the components gets a chance to take a look at the message and determine whether or not it can handle it. As soon as a component "matches" the message, the probing ends and the matching component proceeds with processing. In practice, only the disassemble stage of a receive pipeline uses the first match algorithm.

Custom Pipeline Components Development

Introduction to custom pipeline components

For a wealth of different reasons, you might decide to write your own BizTalk Server custom pipeline component. A few ideas:
In order to start developing you first need to determine which kind of component you would like to create:
Although the concepts discussed so far may seem quite complex, the interfaces you need to implement, to get your pipeline component up and running, are in fact not difficult at all. Once you know how one type of component behaves, other types of components will not cause you any trouble either. So, for the purpose of this paper, I will focus on the development of "General Pipeline Components". In order to develop a general pipeline component, you will need to implement the following interfaces:
This one's pretty easy. The IBaseComponent interface contains just three properties, so that the engine is able to retrieve the component's name, description and version.

IComponent

This interface contains the single core method of the pipeline processing framework:

public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)

This method takes the pipeline's context and an inbound message, and is responsible for returning the outbound processed message. The BizTalk Server IBaseMessage interface defines some methods and properties that:
Each message part implements the IBaseMessagePart interface, whose core property is the Data property:

Stream Data {get; set;}

Since Data is typed as Stream, the framework is able to effectively "stream" data through the different components, without needing to hold each message completely in memory. So... how does that work?

Assume two custom pipeline components pcA and pcB, with pcB executed after pcA. The pipeline framework makes sure that pcA's Execute method provides the IBaseMessage that pcB will work on. The IBaseMessage, as discussed before, is a collection of one or more message parts. Although more parts are possible, only one can be marked as the "body part". To keep it simple, assume just one part is present: the body part. Since the body part's data is represented as a Stream, pcB is forced to work on pcA's result stream. This means that while pcB is reading pcA's result stream, pcA is still able to continue generating it. If pcA is made streaming, the data it provides as the result of the Execute call will be generated, modified or inspected while it is requested byte by byte by pcB! This means that pcB (if made streaming), in turn, will only request bytes as they are needed to provide a further downstream component, say pcC, with bytes as it reads pcB's body part's stream.

IComponentUI

The IComponentUI interface contains two members:
The IPersistPropertyBag interface enables a component to store and retrieve its configuration. Storing and retrieving configuration settings is done through what is called a "property bag": an abstraction of the storage for component properties.

Things that require your attention

Try to make your pipeline components streaming, as much as possible! Non-streaming components will not only consume at least as much memory as the size of the message, but will hurt performance as well! At least, this usually is the case...

Another important thing to notice is that receive pipeline components are executed before any message persistence has been done. This means that losing data in your component (for example, as a result of bad exception handling) means the message is lost entirely. Even the adapter will not be able to undo this.

Writing a streaming pipeline component: sample walkthrough

OK... so much for the theory! Let's walk through a working sample and discover what things look like in code! While doing this, I'll make use of some techniques and a framework that greatly facilitate the writing of a fully streaming pipeline component. Since discussing every single line of code would take us too far, for this paper's purpose I will focus on the core aspects of coding pipeline components. For full code reference, the source code will be made publicly available as well.

Sample component's goal

So, first of all: what will our component's goal be? As you probably know, BizTalk Server allows you to configure property promotion in a very intuitive way. However, this has its limits. Things that are by default not possible with regard to property promotion are, among others:
Bad luck if you need something similar? Not if you continue reading :-) Our sample component will target exactly those kinds of scenarios, where you cannot count on the BizTalk features to promote your data!

How does this work?

Our component will allow the developer to set three properties at design time:
The first two properties determine the target property to promote the final result on. The third, StreamingXPathExpression, gives the component an XPath expression that indicates the location, in the incoming XML stream, of the value to promote.

Now... wait a minute. Doesn't XPath require you to load the full stream into the DOM? So, how can we ever make this streaming? Indeed, regular XPath expressions require the data to be available in a document object model. This is for very good reasons: XPath expressions allow for quite arbitrary location steps to be specified. Each of those location steps may choose to jump to sibling, child or even parent XML nodes! To allow this, the entire data has to be in memory anyway. So far, no luck as far as regular XPath is concerned.

However, for quite some time there have been discussions about "streaming XPath" (also called forward-only XPath). Only recently has a first implementation of this XPath subset been made available on MSDN. Streaming XPath allows the use of only a subset of regular XPath. That subset is specifically targeted at being able to walk through XML in a forward-only, streaming way. Dare Obasanjo (Program Manager for the Microsoft Web Data team) and Howard Hao have written an excellent article on this, explaining the SXPath concepts very well. In addition, as mentioned before, a first implementation, called the "XPathReader", was made available as well. For more in-depth information about this new technology, I would like to refer you to these excellent resources.

So, how does this relate to our component? Since regular XPath does not fit our needs, we'll resort to a streaming XPath implementation! This will enable you to:
Way too cool for a sample? Not at all, we'll get this up and running by the end of this paper!

Note: as this is a work in progress, the actual sample implementation, available for download, may differ slightly from the code discussed below.

Coding the public design time properties

Our streaming pipeline component will expose three properties at design time. Implementing them couldn't be more straightforward: we'll define three public properties and we're done! Any public property that does not have its Browsable attribute set to false will be accessible in the design time pipeline component property pane.
private string m_prompropname = "";
private string m_prompropnamespace = "";
private string m_sxpathexpression = "";

public string TargetPromotedPropertyName
{
    get{ return m_prompropname; }
    set{ m_prompropname = value; }
}

public string TargetPromotedPropertyNamespace
{
    get{ return m_prompropnamespace; }
    set{ m_prompropnamespace = value; }
}

public string StreamingXPathExpression
{
    get{ return m_sxpathexpression; }
    set{ m_sxpathexpression = value; }
}

Persistence of design time properties

Now that we have coded the design time properties, we still need some means to persist their values (configured at design time) so that they will be accessible at run time as well. It is exactly for this purpose that the IPersistPropertyBag interface was developed. A property bag is basically an abstraction of a persistent storage mechanism. Using name-value pairs, it allows you to read and write data to it. In code, the 2 core interface methods look like:
//Load is called by the environment when the component should load all
// persisted values:
public void Load(IPropertyBag propertyBag, int errorLog)
{
    m_prompropname = readBagForStringValue(ref propertyBag, m_propbagkey_prompropname);
    m_prompropnamespace = readBagForStringValue(ref propertyBag,
        m_propbagkey_prompropnamespace);
    m_sxpathexpression = readBagForStringValue(ref propertyBag,
        m_propbagkey_sxpathexpression);
}

// Save is called by the environment when the component should save all
// persistent values:
public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
{
    writeBagWithStringValue(ref propertyBag, m_propbagkey_prompropname,
        m_prompropname);
    writeBagWithStringValue(ref propertyBag, m_propbagkey_prompropnamespace,
        m_prompropnamespace);
    writeBagWithStringValue(ref propertyBag, m_propbagkey_sxpathexpression,
        m_sxpathexpression);
}

//Helper methods for accessing the property bag:
private string readBagForStringValue(ref IPropertyBag bag,
    string key, string defaultvalue)
{
    object result;
    try
    {
        bag.Read(key, out result, 0);
        if(result != null) return (string) result;
        else return defaultvalue;
    }
    catch
    {
        //Key not present (or unreadable): fall back to the default value.
        return defaultvalue;
    }
}

private string readBagForStringValue(ref IPropertyBag bag, string key)
{
    return readBagForStringValue(ref bag, key, "");
}

private void writeBagWithStringValue(ref IPropertyBag bag,
    string propertyName, string valueToWrite)
{
    object obj_valueToWrite = valueToWrite;
    bag.Write(propertyName, ref obj_valueToWrite);
}

Implementing the IBaseComponent properties

Every pipeline component should implement IBaseComponent, which basically enables any environment to ask the component for its name, version and description. Since those properties should not be available in the design time property pane, we'll mark them as not browsable:
//Constant pipeline component values:
private const string C_COMPONENTNAME = "Streaming Promotion Pipeline Component";
private const string C_COMPONENTDESC = "BizTalk Server 2004 Pipeline Component" +
    " taking care of promoting data using XPath in a streaming implementation.";
private const string C_COMPONENTVER = "1.0.0.0";

//Implementation of the IBaseComponent interface:
[Browsable(false)]
public string Description
{
    get{ return C_COMPONENTDESC; }
}

[Browsable(false)]
public string Name
{
    get{ return C_COMPONENTNAME; }
}

[Browsable(false)]
public string Version
{
    get{ return C_COMPONENTVER; }
}

A fancy icon on a valid component

The IComponentUI enables us both:
A minimal implementation might look like:
public System.Collections.IEnumerator Validate(object projectSystem)
{
    //No validation errors to report: return an enumerator over an empty collection.
    System.Collections.Specialized.StringCollection validationResult =
        new System.Collections.Specialized.StringCollection();
    //StringCollection.GetEnumerator() returns a StringEnumerator, which does not
    //implement IEnumerator, so go through the IEnumerable interface instead:
    return ((System.Collections.IEnumerable) validationResult).GetEnumerator();
}

[Browsable(false)]
public System.IntPtr Icon
{
    get
    {
        //No icon associated with this pipeline component:
        return IntPtr.Zero;
    }
}

Processing the message data

Now that we've implemented the basic plumbing, let's dive into the core method implementation of the pipeline processing framework: the IComponent interface's Execute method! As described previously, this method is responsible for providing the next component in line with the resulting processed data stream on which it will work. Since we can split most responsibilities into several methods and custom helper classes, we're able to keep the interface's implementation relatively short:
private IBaseMessage m_messageToPromoteOn;
private CloningStreamSplitter m_streamsplitter;

public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
{
    m_streamsplitter = new CloningStreamSplitter(pInMsg.BodyPart.Data);

    //Start new thread, promoting the data found on configured XPath location:
    Thread dataPromotionThread = new Thread(new ThreadStart(StartPromotingData));
    dataPromotionThread.Start();

    m_messageToPromoteOn = cloneIBaseMessage(
        pContext.GetMessageFactory(), pInMsg, m_streamsplitter.PrimaryStream);
    return m_messageToPromoteOn;
}

At this point most of this will probably seem rather unclear. No worries, let's walk through each of these details one by one.

Introducing the CloningStreamSplitter

First of all, we use a class called "CloningStreamSplitter". This class basically:
Why do we need this? Normally, streams within the pipeline processing framework are (as discussed already) non-seekable. This means that once a stream has been read, we cannot "undo" this. Another issue is that a stream containing XML data will most probably be read using the XmlReader class. This class is an excellent tool for parsing XML data, but unfortunately does not offer any handy way of "forwarding" the parsed data that has been read. The only way to do so would be to write each attribute, element, processing instruction, whitespace node and so on out to some buffer mechanism, which in turn could be used to create another stream... This is definitely NOT something you'd like to do, believe me! Even if you succeeded in doing this, you would lose information like non-significant whitespace and CDATA markers. In some scenarios (where signing and encryption are used), this cannot be allowed.

So, our CloningStreamSplitter solves those issues because it is able to produce a literal, byte-exact clone of the original stream. (We'll discuss its implementation a little further on, so read on.) This allows us to work on the cloned stream instead of on the original stream.

m_streamsplitter = new CloningStreamSplitter(pInMsg.BodyPart.Data);

The pInMsg contains one or more message parts, of which one is marked as the body. The Data property on that body part gives us the data stream we can work on. In this case we clone it before performing any actions on any data.

A separate thread dedicated to data processing

As we'll discuss later on, our CloningStreamSplitter's implementation requires that the cloned stream and the original stream are read by two different threads. One of those threads will be the thread the next pipeline component uses to read the original data stream, while the other thread is up to us to choose.
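To make the two-thread requirement concrete, here is a minimal sketch of a lock-step stream splitter. It is not the CloningStreamSplitter from the download: the names LockStepSplitter, CallbackStream, Primary and Clone are hypothetical, the 4096-byte chunk size is arbitrary, and the code uses C# syntax that is newer than the .NET version BizTalk Server 2004 runs on. It only illustrates the idea that one small buffer can serve two readers as long as each reader blocks until the other has caught up.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical helper: a read-only, forward-only stream that delegates Read to a callback.
public sealed class CallbackStream : Stream
{
    private readonly Func<byte[], int, int, int> read;
    public CallbackStream(Func<byte[], int, int, int> read) { this.read = read; }
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => read(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

// Hypothetical sketch: two readers on two threads share one small chunk buffer.
public sealed class LockStepSplitter
{
    private readonly Stream source;
    private readonly object gate = new object();
    private readonly byte[] chunk = new byte[4096];
    private int chunkLength;   // valid bytes currently held in chunk
    private int cloneOffset;   // bytes of chunk already handed to the clone reader
    private bool endOfSource;

    public Stream Primary { get; }  // read by the next pipeline component's thread
    public Stream Clone { get; }    // read by our own worker thread

    public LockStepSplitter(Stream source)
    {
        this.source = source;
        Primary = new CallbackStream(ReadPrimary);
        Clone = new CallbackStream(ReadClone);
    }

    private int ReadPrimary(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        lock (gate)
        {
            // Block until the clone reader has consumed the previous chunk.
            while (cloneOffset < chunkLength) Monitor.Wait(gate);

            int n = source.Read(buffer, offset, Math.Min(count, chunk.Length));
            if (n == 0) endOfSource = true;
            else Buffer.BlockCopy(buffer, offset, chunk, 0, n);
            chunkLength = n;
            cloneOffset = 0;
            Monitor.PulseAll(gate);   // wake the clone reader
            return n;
        }
    }

    private int ReadClone(byte[] buffer, int offset, int count)
    {
        lock (gate)
        {
            // Block until the primary reader has produced data or reached the end.
            while (cloneOffset >= chunkLength && !endOfSource) Monitor.Wait(gate);

            int n = Math.Min(count, chunkLength - cloneOffset);
            if (n <= 0) return 0;
            Buffer.BlockCopy(chunk, cloneOffset, buffer, offset, n);
            cloneOffset += n;
            if (cloneOffset == chunkLength) Monitor.PulseAll(gate);   // wake the primary reader
            return n;
        }
    }
}
```

Because each reader waits for the other, reading both streams from a single thread would deadlock, which is why a second thread is needed. For the same reason, the worker in this sketch has to keep reading Clone until it returns 0, even after it has found what it was looking for; otherwise the reader of Primary would block forever.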
Since the execute method's purpose is to return the result message the next component has to act on, we can't afford to block the current execute method's calling thread... So the only option left is to create a kind of processing/worker thread, responsible for:
//Start new thread, promoting the data found on configured XPath location:
Thread dataPromotionThread = new Thread(new ThreadStart(StartPromotingData));
dataPromotionThread.Start();

The thread will execute the code in the StartPromotingData method, which we'll discuss later on. For now, we're ready to create the result message.

Creating the result message

Messages within the pipeline processing framework always comply with the IBaseMessage interface and follow the factory pattern. The factory required for the creation of such an IBaseMessage instance can be found on the pContext pipeline context parameter, given in the call to Execute. Further, we'll need to:
m_messageToPromoteOn = cloneIBaseMessage(
    pContext.GetMessageFactory(), pInMsg, m_streamsplitter.PrimaryStream);

The cloneIBaseMessage implementation, responsible for preparing the result message, looks like:
//Clones a full IBaseMessage except for the body part:
IBaseMessage cloneIBaseMessage(IBaseMessageFactory msgFactory, IBaseMessage srcMessage)
{
    return cloneIBaseMessage(msgFactory, srcMessage, srcMessage.BodyPart.Data);
}

//Clones a full IBaseMessage except for the body part and assigns the new body stream:
IBaseMessage cloneIBaseMessage(IBaseMessageFactory msgFactory, IBaseMessage srcMessage,
    Stream newBodyDataStream)
{
    IBaseMessage resultMsg = msgFactory.CreateMessage();

    //copy context by reference:
    resultMsg.Context = srcMessage.Context;

    //copy all non-body parts by reference:
    copyAllNonBodyMsgParts(srcMessage, resultMsg);

    //create new body part:
    IBaseMessagePart body = msgFactory.CreateMessagePart();
    body.PartProperties = srcMessage.BodyPart.PartProperties;
    body.Data = newBodyDataStream;

    //add it to the result message:
    resultMsg.AddPart(srcMessage.BodyPartName, body, true);
    return resultMsg;
}

void copyAllNonBodyMsgParts(IBaseMessage src, IBaseMessage dest)
{
    for(int i = 0; i < src.PartCount; i++)
    {
        //fetch part from original message:
        string partname;
        IBaseMessagePart part = src.GetPartByIndex(i, out partname);

        //copy it to destination message:
        if(partname != src.BodyPartName)
            dest.AddPart(partname, part, false);
    }
}

Use of the streaming XPath implementation in data promotion

As previously described, the actual data promotion is performed by a dedicated thread, using the XPathReader implementation:
private void StartPromotingData()
{
    XPathReader xpreader = null;
    try
    {
        //Create streaming XPath reader, based on a clone of the original datastream:
        xpreader = new XPathReader(
            new StreamReader(m_streamsplitter.ClonedStream), m_sxpathexpression);

        //Read until the XPath expression is matched; promote only if a match was found:
        if(xpreader.ReadUntilMatch())
        {
            //Promote value found on configured XPath location:
            m_messageToPromoteOn.Context.Promote(
                m_prompropname,
                m_prompropnamespace,
                xpreader.Value);
        }

        //Even after data found: keep on reading till end of cloned stream:
        while(xpreader.Read()){}
    }
    catch(Exception e)
    {
        throw new Exception(
            "StreamingPipelineComponent failed during promotion of data.", e);
    }
    finally
    {
        if(xpreader != null) xpreader.Close();
    }
}

For more details on streaming XPath and how it can be used within your own scenarios, I'd like to refer to the MSDN articles on this subject. As far as the actual promotion of data goes, this is fairly simple:
//Promote value found on configured XPath location:
m_messageToPromoteOn.Context.Promote(
    m_prompropname,
    m_prompropnamespace,
    xpreader.Value);

Each IBaseMessage has a property named "Context", on which you can promote data. In this case we promote the value found by the XPathReader onto the context property configured at design time.

Cloning a stream for use in pipeline components

I will not discuss in detail the code that is responsible for stream cloning. Instead, I'd rather give an overview of the basic principles and techniques used in there. For actual implementation details, the code is available for download.

The most important constraint to take into account when implementing the CloningStreamSplitter is the fact that we are implementing a streaming pipeline component. This means that, beyond a small buffer, no data whatsoever can be kept in memory. In the end this means that the data for the cloned stream can only be provided when two conditions are met:
These two conditions are only met if both the original stream and the cloned stream are read in sync and in order. Assume two threads (like we discussed, this is a requisite in order for this technique to work):
First the original stream has to be read from. That data is buffered, after which it is handed to the originaldatathread. If the originaldatathread reads from the stream again, the call needs to be blocked until an equal number of bytes has been read from the cloned stream as well. (Remember: the two streams have to be read in sync in order to avoid keeping more data in memory than that buffer.) The cloned stream is actually our own System.IO.Stream implementation. This allows data to be returned, repeatedly, from the small data buffer, which is filled while the original data stream is read. If the cloneddatathread requests data that has not yet been read by the originaldatathread, the call has to be blocked until the requested data is available. This way, by intelligently blocking and synchronizing threads, we avoid keeping the message in memory. At the same time:
The actual sample source code showing this technique is available for download.

Conclusion

This paper aimed to provide some guidance on pipeline processing, with a particularly strong focus on custom pipeline component development. After explaining some basic principles and giving an introduction to the various interfaces involved, it drew attention to some things to keep in mind. Finally, a sample walkthrough illustrated how all this looks in code. BizTalk Server provides you with a very powerful pipeline processing framework. This power, however, comes with some drawbacks: making pipeline components streaming takes some effort. Since a lot can be made generic, a good architecture could prove to be a great help.

About the author