BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Database-based High Performance Message Exchange Service for Enterprise Applications

Database-based High Performance Message Exchange Service for Enterprise Applications

Database Message Exchange Service allows messages to be stored in database in order to be picked up by a Windows service and delivered to external services and vice versa. A message can be anything – an order, some task, a message for a destination message queue, a payload for calling external webservice and so on. It decouples the client from the external services that are not in the same network or not always available or do not support high frequency poll needed by front-ends.

Introduction

Database Message Exchange Service allows messages to be stored in a database in order to be picked up by a Windows service and delivered to external services and vice versa. A message can be anything – an order, some task, a message for a destination message queue, a payload for calling an external web service and so on. It decouples the client from the external services that are not in the same network or not always available or do not support high frequency polling needed by clients. The client can perform high frequency polls on the database, without causing a hit on the external services on every poll. Since the messages are stored in a database, they can be replayed in case of failure, they can be archived, and they can be processed in some other priority order than first-in-first-out and so on. Moreover, reports can be generated from the database using the data available in the stored messages. Traditional queue services like MSMQ or IBM MQ aren’t always the right choice to be used directly from the client, especially if you have a Web front-end that performs millions of polls on them and you need to sometimes browse and replay messages in some specific order. Additionally, MIS reports from queue are not possible, so a database-based high performance message queuing and delivery service is preferred. The architecture of the service is flexible as it allows any external service to be integrated via a plug-in model and it can launch multiple threads to read and write in parallel to and from external services. It also offers connection throttling to external services by design and prevents external services from being adversely affected by a sudden increase in traffic on the client. Finally, the service can be deployed in multiple servers to support scalability without requiring any code change.

It’s currently used in production at BT, where we use it to queue messages from a high volume website and the messages are exchanged with an IBM MQ.

Architecture

Database Message Exchange Service uses database tables to store incoming and outgoing messages. There’s a windows service or console application that reads messages from the database and sends them to an external service. Similarly, it reads from an external service and stores them in a database so that a client can pick up the message from the database. The database tables simulate a FIFO queue as described in my earlier article on Building a high performance queue in a database. The architecture is like this:

Figure: Database Message Exchange Service architecture

The architecture of the service is kept very simple to make it easy to configure and deploy it for multiple external services. If you have four different external services, you just deploy four clones of the service. Each instance of the service can talk to only one type of external service but it can launch multiple reader/writer threads both for queueing and dequeueing on database and on external service. The Exchange Manager, which does the controller’s job, launches a configured number of threads to dequeue messages from the database and then launches a configured number of threads to connect and read from the external service. External Service can be anything – another webservice, queue, FTP, file system and so on. You just implement the IExternalProvider interface for each type of external service, configure the number of reader and writer threads you want, and you are done. The whole service is provided as a reusable library so that you can choose how you want to run it – as a console app or as a Windows service.

From the client’s point of view, the client needs to construct a message with a CorrelationID and then enqueue it in the Inbox. Then keep polling at some interval for a response to arrive in Outbox.

That’s from a client’s point of view. On the service side, the ExchangeManager launches N threads to keep polling for messages on Inbox. Once a message is found, it sends the message to the external service using the External Service Provider. Here’s how each thread does its job:

(Click on the image to enlarge it)

In the same way, ExchangeManager also launches N threads to keep polling on the external service. Once a message is found, it stores the message in the Outbox table. If a message arrives with the right CorrelationID, then the client can pick it up by calling RetrieveFromOutbox. Here’s how each thread does its job:

(Click on the image to enlarge it)

The following architecture issues are addressed:

  • Ensuring that a message isn’t picked up by multiple services.
  • Ensuring that messages can be queued at a very high frequency without blowing up the external services.
  • Ensuring that client/front-end can perform high frequency polls to check for arrival of a response to their queued messages.
  • Ensuring that messages are picked up from Inbox immediately by launching multiple database polling threads. If one thread is processing one message and the external service is taking a while to send the message, then the other threads can pick up other messages from Inbox and send them.
  • Ensuring that the external service read happens on multiple threads so that the delay in processing a received message and storing it in Outbox is minimized and maximum utilization of external services capability to deliver messages is ensured.
  • On the event of unavailability of the service, the messages are not lost. Messages are picked up and processed at a steady speed.
  • Ensuring that a sudden arrival of a large amount of messages does not cause high CPU on the service nor does it cause additional stress on the external service. A steady speed of reading and writing on both database and external service is maintained through configurable delays.
  • Easy to scale out. The service can be deployed multiple times on the same computer or in different computers.
  • Easy to start and stop.
  • Ability to debug on a production instance by attaching a debugger.

I have seen such issues from message exchange services in various companies I have worked for. So, I have tried my best to solve these once and for all. It’s a service that’s deployed on production and working very well.

Benefits

Why not use the SQL Server Service Broker? Firstly because it ties you to use the Standard or a higher edition of SQL Server, which has a hefty license cost. Using my approach you can use the free SQL Server Express and install a couple of them on a cheap VM and setup a pretty scalable “Service Broker” cluster. Scaling out is cheap - just install another VM, install the free SQL Express, and you are done. You can build a large cluster of message storage and distribution service without paying for any SQL Server license fee at all. Moreover, you can create the service using pure .NET code, unit test them and debug them easily on production without having to attach a debugger to SQL Server Service Broker. Anyone who has worked with CLR stored procedures knows the pain of releasing newer versions. Pure .NET code makes debugging, crash dumping, exception logging, etc. straightforward and cheap. It gives you the ultimate flexibility since you have full control over a .NET app doing the message exchange work reading from a very simple database schema.

Why not use MSMQ then? Firstly, because MSMQ does not allow you to read messages other than in the FIFO order. You might want to read messages using some other field than the queue date-time. MSMQ won’t let you retry messages once they are dequeued from the queue. You will have to queue them again. There’s no notion of changing message’s state. You either take the message out of the queue or you leave it, but you cannot change the message’s state to process it later. You cannot use any other custom filtering criteria to read messages from the queue besides the FIFO criteria.

Implementation

The controller class ExchangeManager manages a pool of threads of IDBQueueProvider and IExternalProvider implementations. IDBQueueProvider is implemented by SqlDBQueueProvider by default, which takes care of reading messages out of Inbox and writing messages to Outbox. IExternalProvider is what you need to implement depending on what is the external service you are trying to consume. It can be a file system, an external webservice, a MSMQ or IBMMQ, or just an FTP server. You need to implement the ReadAndWait and Send methods to support the message exchange. IExternalProvider takes messages as ExternalMessage, which you can inherit from and use your own class.

(Click on the image to enlarge it)

As a consumer, the only class you need to deal with is the ExchangeManager. It does everything for you. Here’s how you can use it form a console app:

static void Main(string[] args)

{

   Console.WriteLine("Starting...");

   var manager = new ExchangeManager(() => new ExternalProviderMQImpl(),

      () => new SqlDBQueueProvider(Config.ConnectionString),

      new Log4NetLogger(),

      Config.ExternalThreadCount,

      Config.DBThreadCount,

      Config.DBPollInterval,

      Config.ExternalReadInterval,

      Config.ExternalReadTimeout,

      Config.StopTimeout);

   manager.Start();

   Console.WriteLine("Started. Press ENTER to close.");

   Console.ReadLine();

   manager.Stop();

}

Once you call Start(), it launches the specified number of DB threads and External threads. The above code shows how to use it in a console app. For a Windows service, in the OnStart event, you just call Start() and in the OnStop event, call Stop().

ExchangeManager uses the SmartThreadPool library to manage the threads. It’s another open source library that allows fine control over threads, especially when you want to start a group of threads and then stop a group of threads.

During Start, ExchangeManager creates a number of threads to listen to the database:

/// <summary> 

/// Launch the threads that will poll DB at certain intervals 

/// and if any message found in Inbox, it will use the External Provider 

/// to send the message to the Queue. 

/// </summary> 

private void StartDBReaderThreads() 

   var me = this; 

   for (int i = 0; i < this.DBPollerThreadCount; i++) 

   { 

      this.ThreadPool.QueueWorkItem(() => 

      { 

         me.Logger.Debug("Starting DB Reader Thread " + i + "..."); 

         var dbQueueProvider = me.DBQueueProviderBuilder(); 

         var externalProvider = me.ExternalProviderBuilder(); 

         try 

         { 

            me.Logger.Debug("Initializing DB Provider..."); 

            dbQueueProvider.Initialize(); 

            me.Logger.Debug("Initializing External Provider..."); 

            externalProvider.Connect(); 

            while (!me.Stopped) 

            { 

               try 

               { 

                  var inboxItems = dbQueueProvider.DequeueInbox(); 

                  if (inboxItems != null) 

                  { 

                     foreach (InboxItem item in inboxItems) 

                     { 

                        me.Logger.Debug("Sending message to External " + item.ItemID + "..."); 

                        me.Logger.LogInboxItem(item); 

                        try 

                        { 

                           externalProvider.Send(new ExternalMessage 

                           { 

                              CorrelationID = item.CorrelationID, 

                              Data = item.TextData 

                           }); 

                        } 

                        catch (ThreadAbortException) 

                        { 

                           throw; 

                        } 

                        catch (Exception x) 

                        { 

                           if (!externalProvider.TryRecoverFromFault(x)) 

                           return; 

                        } 

                        me.Logger.Debug("Sent message to External " + item.CorrelationID); 

                    } 

                 } 

             } 

             catch (ThreadAbortException) 

             { 

                throw; 

             } 

             catch (Exception x) 

             { 

                me.Logger.Error(x); 

                if (!dbQueueProvider.TryRecoverFromFault(x)) 

                return; 

             }

It first creates one instance of the IDBQueueProvider and one instance of IExternalProvider. Then it runs a continuous loop to periodically poll from DB and check if there’s any message in the Inbox table to pick it up. Once it finds a list of messages, it then sends each message through the IExternalProvder.Send() function.

Similarly it creates a number of threads to listen and wait on IExternalProvider. When a message arrives, it converts the ExternalMessage to OutboxItem and then stores in the Outbox folder for the client to pickup.

/// <summary> 

/// Launch the threads for reading from external service. 

/// </summary> 

private void StartExternalReaderThreads() 

   var me = this; 

   for (int i = 0; i < this.ExternalReaderThreadsCount; i++) 

   { 

      me.ThreadPool.QueueWorkItem(() => 

      { 

         me.Logger.Debug("Starting DB Reader Thread " + i + "..."); 

         var dbQueueProvider = me.DBQueueProviderBuilder(); 

         var externalQueueProvider = me.ExternalProviderBuilder(); 

         try 

         { 

            me.Logger.Debug("Initializing DB Provider..."); 

            dbQueueProvider.Initialize(); 

            me.Logger.Debug("Initializing External Provider..."); 

            externalQueueProvider.Connect(); 

            while (!me.Stopped) 

            { 

               var message = default(ExternalMessage); 

               try 

               { 

                  // This is a blocking call, until message arrives or we get 

                  // some exception, execution does not proceed 

                  message = externalQueueProvider.ReadAndWait(this.ExternalReadTimeout); 

               } 

               catch (ThreadAbortException) 

               { 

                  throw; 

               } 

               catch (Exception x) 

               { 

                  if (!externalQueueProvider.TryRecoverFromFault(x)) 

                  return; 

               } 

               if (message != null) 

               { 

                  me.Logger.Debug("Received message: " + message.CorrelationID); 

                  me.Logger.LogMessage(message); 

                  try 

                  { 

                     var outboxItem = new OutboxItem 

                     { 

                        CorrelationID = message.CorrelationID, 

                        TextData = message.Data, 

                        ItemDateTime = DateTime.Now 

                     }; 

                     me.Logger.Debug("Enqueue outbox item: " + outboxItem.CorrelationID); 

                     me.Logger.LogOutboxItem(outboxItem); 

                     dbQueueProvider.EnqueueOutbox(outboxItem); 

                  } 

                  catch (ThreadAbortException) 

                  { 

                     throw; 

                  } 

                  catch (Exception x) 

                  { 

                     me.Logger.Error(x); 

                     if (!dbQueueProvider.TryRecoverFromFault(x)) 

                     return; 

                  } 

                  me.Logger.Debug("Queued message in Outbox: " + message.CorrelationID); 

               } 

               else 

               { 

                  me.Logger.Debug("Got null message from External."); 

               } 

               Thread.Sleep(this.ExternalReadInterval); 

Here it calls IExternalProvider.ReadAndWait() which should make a blocking call to the external service and return a message. It’s fine if it does not return any message. For example, you might be checking a file path regularly for any new file to appear and when there’s no file, return null. If you are reading from MSMQ, then the ReadAndWait() is a blocking read.

Once a message is found, it stores the message in the Outbox table.

Each of the interfaces are documented in detail to tell you exactly what you need to do and what you should not do in the implementation. For example, the IExternalProvider is the one you implement for each and every external service.

public interface IExternalProvider : IDisposable 

   /// <summary> 

   /// Connect to the External provider. For MQ, establish the connection 

   /// to the MQ here. For http, open http connection here. 

   /// </summary> 

   void Connect(); 

 

   /// <summary> 

   /// Disconnect from external provider. Eg close connection to MQ or http. 

   /// </summary> 

   void Disconnect(); 

 

   /// <summary> 

   /// Disconnect and Connect. You can either just call Disconnect and Connect 

   /// or implement some custom code for handling a reconnection scenario. 

   /// Reconnect is called when there's some problem with connection, say connection 

   /// dropped abnormally. 

   /// </summary> 

   void Reconnect(); 

 

   /// <summary> 

   /// A blocking read call to the external service. This function should not 

   /// return unless some data is available. So, for MQ, you call Read() which 

   /// blocks thread until a message arrives. For http service, you can keep 

   /// polling an http endpoint here. 

   /// If you can't implement a blocking call, then if there's nothing to return, 

   /// return null. 

   /// The blocking call must not block more than the given timeout. 

   /// </summary> 

   /// 

   /// <returns>Null if no data. Otherwise an ExternalMessage</returns> 

   ExternalMessage ReadAndWait(TimeSpan timeout); 

 

   /// <summary> 

   /// Send a message to destination. Destination can be a MQ. 

   /// </summary> 

   /// <param name="message">Message to send</param> 

   void Send(ExternalMessage message); 

 

   /// <summary> 

   /// Check if the exception is one of those exception that 

   /// can be recovered from. For example, connection dropped. 

   /// Try Reconnect() within this function. If there's exception 

   /// from Reconnect(), then return false to indicate permanent 

   /// failure. Return true if the connection was recovered. 

   /// </summary> 

   /// <param name="x"></param> 

   /// <returns>True if the exception is recoverable. False if permanent exception.</returns> 

   bool TryRecoverFromFault(Exception x); 

}

Once you implement this interface as per the specification, ExchangeManager will take care of calling it right away, getting messages from external sources, sending messages queued in database and so on.

Installation

First you create a console application or a Windows service. The sample implementation comes with a console application. You can use it as is, all you need to do is write the code inside the ExternalProviderMQImpl class for the specific external service you want to connect to. Then you need to create a database and run the scripts in the DB folder of the resource file (MQtoDBQueue.zip). Then configure the app.Config with the right connection string and that’s it.

<configuration> 

   <connectionStrings> 

      <add name="QueueDBConnectionString" connectionString="Server=.\SQLEXPRESS;Integrated Security=SSPI;Initial Catalog=DBQueue;"/> 

   </connectionStrings> 

   <appSettings> 

      <add key="PollFromDBDuration" value="00:00:00.100"/> 

      <add key="PollFromExternalDuration" value="00:00:00.100"/> 

      <add key="NoOfThreadsOnExternal" value="4"/> 

      <add key="NoOfThreadsOnDB" value="4"/> 

      <add key="ExternalReadTimeout" value="00:00:05"/> 

      <add key="StopTimeout" value="00:00:10"/> 

   </appSettings>

Let’s go through the configuration settings as they are key to handle scalability and throttling:

  • PollFromDBDuration: This is the interval between two poll requests on the Inbox table to check for pending message to pick up. If it’s too high, then there’ll be lag in pickup up messages after they get queued. If it’s too low, then the Inbox table will get too frequent poll calls.
  • NoOfThreadsOnDB: How many parallel threads to run that will keep polling the database to pickup messages from the Inbox. “1” can be fine. But if you have a very high number of messages getting queued in Inbox and the external service is taking is too much time to send them and thus messages are getting piled up in Inbox, you can increase this number and launch more parallel threads to read messages from Inbox and send them.
  • PollFromExternalDuration: The interval between two ReadAndWait() calls on external service. If you have a blocking read call on external service, for ex MSMQ, then it can be very low. But if you are making some external webservice call, then you need adjust it according to the capacity of the external service.
  • NoOfThreadsExternal: How many parallel threads to run that will call ReadAndWait() on external service.
  • ExternalReadTimeout: How long to wait for a single ReadAndWait() call. It needs to be inside the implementation of ReadAndWait().
  • StopTimeout: How long to wait after Shutdown() has been called and threads are not stopping and need to abort them.

Once you have configured these, you are good to go.

Conclusion

Database Message Exchange Service helps you build a high performance scalable intermediate queue or storage for messages that need to be exchanged with external services. It allows high frequency poll from the client/front-end as it happens on the database, not directly on the external service, thus decoupling the front-end from the external services. It allows reliable storage, retry, failover and reporting of messages that aren’t available in traditional queueing solutions. Pure .NET code makes it easy to debug, crash dump, maintain and extend.

Resources: MQtoDBQueue.zip

About the Author

Omar AL Zabir is the Chief Architect of SaaS Platform at BT, UK. He was the co-founder and CTO of a popular Web 2.0 startup - Pageflakes, that got acquired by MySpace founder. Omar is a Microsoft MVP and CodeProject MVP. He writes articles mostly on AJAX and .NET Performance & Scalability. Omar has published “Building a Web 2.0 Portal using ASP.NET 3.5” from O’Reilly. He has several popular Open source projects and blogs at his site http://omaralzabir.com.

Rate this Article

Adoption
Style

BT