Wednesday 18 November 2009

Cloning Messages in BizTalk Server when CanSeek is False

Introduction

When writing custom pipeline components for BizTalk Server 2006, there are times when you want to access a copy of a message rather than the actual message being processed. Typically this is because the original data stream containing the message is not seekable, meaning that by reading the stream you prevent the next component within the pipeline from doing so.

The BizTalk Server 2006 documentation reports that the Data property of the Microsoft.BizTalk.Message.Interop.IBaseMessagePart interface provides such a facility. According to the documentation:

"In custom pipeline components, Data clones an inbound data stream, whereas GetOriginalDataStream returns the original inbound stream."

The testing I've done, however, shows that this is only the case if the original data stream is itself seekable. It seems likely that this is because a stream must be read in order to be cloned, and if it is not seekable it could never be returned to its original position following the clone. So BizTalk Server cannot honour the request to clone the stream if the stream is not seekable.
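
The underlying problem can be reproduced outside BizTalk Server. The following is a minimal sketch, not BizTalk code: ForwardOnlyStream is a hypothetical wrapper invented for this demonstration, which reports CanSeek as false and refuses to seek. Once the first reader has consumed it, a second reader finds nothing left to read.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper for this demo only: wraps a stream and reports it as non-seekable.
internal sealed class ForwardOnlyStream : Stream
{
    private readonly Stream inner;

    public ForwardOnlyStream(Stream inner) { this.inner = inner; }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    // reading is simply passed through to the wrapped stream
    public override int Read(byte[] buffer, int offset, int count)
    {
        return inner.Read(buffer, offset, count);
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

internal static class ForwardOnlyDemo
{
    private static void Main()
    {
        Stream stream = new ForwardOnlyStream(new MemoryStream(Encoding.UTF8.GetBytes("<msg/>")));

        // the first reader (think: our pipeline component) consumes the stream
        string first = new StreamReader(stream).ReadToEnd();

        // the second reader (think: the next component) finds nothing left,
        // and there is no way to rewind because the stream cannot seek
        string second = new StreamReader(stream).ReadToEnd();

        Console.WriteLine("first read:  '{0}'", first);
        Console.WriteLine("second read: '{0}'", second);
    }
}
```

The second read comes back empty, which is exactly what a downstream pipeline component would see if we read a non-seekable message stream directly.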

This article describes a utility method, GetSafeDataStream, which retrieves a stream which can be safely read without adversely affecting the original data stream.

Caveat: The information in this article applies to BizTalk Server 2006. Your mileage with other versions may vary.

Implementing GetSafeDataStream

Our GetSafeDataStream method will take a Microsoft.BizTalk.Message.Interop.IBaseMessagePart parameter and return a System.IO.Stream. We need the IBaseMessagePart so we have access to both the Data property and the GetOriginalDataStream method. Both of these members return a Stream. We can thus provide a stub implementation as follows:

internal static System.IO.Stream GetSafeDataStream(Microsoft.BizTalk.Message.Interop.IBaseMessagePart messagePart)
{
  return null;
}

We now need to provide an actual implementation for the method. The first thing we need to do is establish whether or not the original data stream is seekable, because if it is we have nothing more to do.

  // obtain a reference to the original message stream
  System.IO.Stream originalDataStream = messagePart.GetOriginalDataStream();

  // if the original data stream is seekable, then return it as the caller can simply reset
  // the position once it's been read
  if (originalDataStream.CanSeek == true)
  {
    return originalDataStream;
  }

If the original stream is not seekable, then we need to ask BizTalk Server to provide a clone of it, knowing that it might not actually provide one. When asking for the clone, we also have to allow for the fact that an exception might be thrown. For example, ICSharpCode.SharpZipLib.Zip.ZipInputStream throws a System.NotSupportedException when the Data property is accessed. This implies both that a ZipInputStream is not seekable, and that it doesn't behave as BizTalk Server expects when an attempt is made to perform a seek operation. It could be that CanSeek returns true but an attempt to set Position throws the exception, or that simply calling CanSeek throws the exception. As I say, this is by implication only; I haven't checked.

One other 'gotcha' is that we have to access the Data property after having checked whether the original stream was seekable because if the Data property does actually return us a clone, it will sometimes (depending upon the type of stream we're dealing with) leave the original stream pointing at its end rather than seeking back to the beginning once the clone has been made. Nice.

  // obtain a reference to what the BizTalk Server 2006 documentation claims will be a clone of
  // the original message stream; note that we do this /after/ having checked whether the original
  // stream was seekable because if the Data property does actually return us a clone, it will
  // sometimes (depending upon the type of stream we're dealing with) leave the original stream
  // pointing at its end rather than seeking back to the beginning once the clone has been made
  System.IO.Stream theoreticalClone;
  try
  {
    theoreticalClone = messagePart.Data;
  }
  catch (System.NotSupportedException)
  {
    // testing has shown that some streams (e.g. ICSharpCode.SharpZipLib.Zip.ZipInputStream)
    // throw a System.NotSupportedException when an attempt is made to clone them
    theoreticalClone = null;
  }

Having asked BizTalk Server for what it claims to be a clone of the original data stream, we need to establish whether it actually is one. We do this by comparing the 'clone' with the original data stream: if they reference different objects then we can assume that the 'clone' is genuine.

  // if the theoretical clone is /actually/ a clone, then return it because the caller can do
  // whatever it wants with a clone
  if (theoreticalClone != null && !object.ReferenceEquals(theoreticalClone, originalDataStream))
  {
    return theoreticalClone;
  }

If we've got this far we know that:

  • the original data stream is not seekable, and
  • BizTalk Server is unable to provide us with a clone of the original data stream.

But we still need to obtain a stream which our caller can safely use without compromising the original data stream. At this point we turn to the SeekableReadOnlyStream and VirtualStream classes provided within the SDK samples. SeekableReadOnlyStream, in concert with VirtualStream, implements a seekable read-only stream which uses buffering if the underlying stream is not seekable. A memory buffer is used initially, although this overflows to disk if a given threshold is reached.

  // otherwise, we need to replace the stream with one which is seekable
  else
  {
    System.IO.Stream seekableReadOnlyStream = new SeekableReadOnlyStream(originalDataStream);
    messagePart.Data = seekableReadOnlyStream;
    return seekableReadOnlyStream;
  }

The behaviour of SeekableReadOnlyStream is to provide a wrapper around the original stream: when a Read operation is performed on the SeekableReadOnlyStream it performs a Read operation on the wrapped stream, places the data in a buffer, and returns the data from that buffer. Should the caller need to perform a seek operation, the SeekableReadOnlyStream merely re-positions the pointer to its internal buffer and begins reading from there. (You'll find SeekableReadOnlyStream.cs and VirtualStream.cs in C:\Program Files\Microsoft BizTalk Server 2006\SDK\Samples\Pipelines\ArbitraryXPathPropertyHandler, amongst other places.)
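
To make the read-through-a-buffer idea concrete, here is a deliberately simplified sketch of such a wrapper. It is not the SDK's SeekableReadOnlyStream: the class name BufferingReadOnlyStream is hypothetical, it buffers in memory only (there is no overflow to disk as VirtualStream provides), and it is intended purely to illustrate the technique.

```csharp
using System;
using System.IO;
using System.Text;

// Simplified, memory-only illustration; NOT the SDK's SeekableReadOnlyStream.
internal sealed class BufferingReadOnlyStream : Stream
{
    private readonly Stream source;                            // wrapped, possibly non-seekable
    private readonly MemoryStream buffer = new MemoryStream(); // everything read so far
    private bool sourceExhausted;

    public BufferingReadOnlyStream(Stream source)
    {
        if (source == null) throw new ArgumentNullException("source");
        this.source = source;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return true; } }
    public override bool CanWrite { get { return false; } }

    public override long Position
    {
        get { return buffer.Position; }
        set { Seek(value, SeekOrigin.Begin); }
    }

    // the length is only known once the wrapped stream has been read to its end
    public override long Length
    {
        get { FillTo(long.MaxValue); return buffer.Length; }
    }

    public override int Read(byte[] target, int offset, int count)
    {
        // make sure the buffer covers the requested range, then serve it from there
        FillTo(buffer.Position + count);
        return buffer.Read(target, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long target;
        switch (origin)
        {
            case SeekOrigin.Begin: target = offset; break;
            case SeekOrigin.Current: target = buffer.Position + offset; break;
            default: target = Length + offset; break;
        }
        if (target < 0) throw new IOException("Cannot seek before the start of the stream.");

        // seeking never touches the wrapped stream's position; it only moves within the buffer
        FillTo(target);
        buffer.Position = Math.Min(target, buffer.Length);
        return buffer.Position;
    }

    // pull bytes from the wrapped stream into the buffer until it holds 'required' bytes
    private void FillTo(long required)
    {
        long resume = buffer.Position;
        buffer.Position = buffer.Length; // append at the end of the buffer
        byte[] chunk = new byte[4096];
        while (!sourceExhausted && buffer.Length < required)
        {
            int read = source.Read(chunk, 0, chunk.Length);
            if (read == 0) { sourceExhausted = true; }
            else { buffer.Write(chunk, 0, read); }
        }
        buffer.Position = resume;
    }

    public override void Flush() { }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] b, int o, int c) { throw new NotSupportedException(); }
}

internal static class BufferingDemo
{
    private static void Main()
    {
        Stream wrapped = new BufferingReadOnlyStream(new MemoryStream(Encoding.UTF8.GetBytes("<msg/>")));
        string first = new StreamReader(wrapped).ReadToEnd();
        wrapped.Position = 0L; // rewind within the buffer
        string second = new StreamReader(wrapped).ReadToEnd();
        Console.WriteLine(first == second); // both reads see the same content
    }
}
```

Because the wrapper only ever reads forward from the stream it wraps, it works whether or not that stream is seekable. In a real pipeline component the SDK classes are the better choice, since an in-memory buffer like this one would grow without limit for large messages.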

Calling GetSafeDataStream

Callers of GetSafeDataStream should not need to worry about what type of stream has been returned to them. Callers must, however, replace the data stream associated with the IBaseMessagePart with that returned by GetSafeDataStream, to ensure that any following pipeline components read through the SeekableReadOnlyStream rather than attempting to read the (non-seekable) original stream. Callers must therefore assign the returned stream to the Data property. Remember that GetSafeDataStream might simply have returned a reference to the original data stream, in which case the assignment has no effect.

Finally, although it seems unlikely, BizTalk Server could theoretically return a clone of the original data stream which is not seekable. We could test for this (and not reset the stream's position) but doing so would mean the data within the stream would be silently 'swallowed' by our pipeline component, a situation which might be difficult to diagnose. It therefore seems better to avoid testing for this (unlikely) situation, so that the problem is clearly highlighted by the exception thrown when the caller sets Position.

A typical call to GetSafeDataStream would therefore be as follows:

public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(Microsoft.BizTalk.Component.Interop.IPipelineContext pContext, Microsoft.BizTalk.Message.Interop.IBaseMessage pInMsg)
{
  // wrap the message stream in a stream which is seekable (if the original stream isn't seekable
  // already)
  System.IO.Stream safeDataStream = GetSafeDataStream(pInMsg.BodyPart);

  // TODO: Use safeDataStream in some way

  // move the stream pointer back to the start; could theoretically throw an exception if BizTalk
  // Server were to return a non-seekable clone of the original stream within GetSafeDataStream
  safeDataStream.Position = 0L;

  // assign the safe data stream to the BodyPart (in case it was a SeekableReadOnlyStream)
  pInMsg.BodyPart.Data = safeDataStream;

  return pInMsg;
}

Summary

We have implemented a simple utility method, GetSafeDataStream, which provides access to a stream which a pipeline component can safely read without worrying about the effect that doing so might have on the original data stream. We also saw a simple example of how GetSafeDataStream might be called.
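
For reference, the fragments shown throughout the article assemble into a single method along the following lines. Treat this as a sketch rather than a drop-in file: the containing class name StreamHelper is hypothetical, the code needs a reference to the BizTalk pipeline interop assembly, and it assumes SeekableReadOnlyStream.cs and VirtualStream.cs from the SDK samples have been added to the project.

```csharp
using System;
using System.IO;
using Microsoft.BizTalk.Message.Interop;
// also add a using directive for whichever namespace your copy of the SDK sample
// declares SeekableReadOnlyStream in

internal static class StreamHelper // hypothetical class name
{
    internal static Stream GetSafeDataStream(IBaseMessagePart messagePart)
    {
        // a seekable original can be returned as-is; the caller resets Position after reading
        Stream originalDataStream = messagePart.GetOriginalDataStream();
        if (originalDataStream.CanSeek)
        {
            return originalDataStream;
        }

        // ask for a clone only after the CanSeek check, allowing for streams that
        // throw when an attempt is made to clone them
        Stream theoreticalClone;
        try
        {
            theoreticalClone = messagePart.Data;
        }
        catch (NotSupportedException)
        {
            theoreticalClone = null;
        }

        // a genuine clone is a different object from the original stream
        if (theoreticalClone != null && !object.ReferenceEquals(theoreticalClone, originalDataStream))
        {
            return theoreticalClone;
        }

        // otherwise wrap the original in a buffering, seekable stream and attach it to the part
        Stream seekableReadOnlyStream = new SeekableReadOnlyStream(originalDataStream);
        messagePart.Data = seekableReadOnlyStream;
        return seekableReadOnlyStream;
    }
}
```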

1 comment:

  1. Good stuff mate! Came in very useful in regard to a custom resolver I was writing that is being called from an ESB itinerary.
