Publisher / Subscriber with Windows Azure Service Bus and WCF

Hi all! Let’s talk about a Pub/Sub.

Motivation

A classical SOA architecture, that mainly uses Workflow Foundation for orchestrations and WCF for services, needs somekind of business event processing.

soawithoutevents

These events usually aren’t calls that follows a normalized business process flow. The events are generated by services which don’t need to know whichever listen to them. So, for implementing this behaviour we need a Publisher/Subscriber. The event generators publish messages and the listeners have to subscribe to them.

As we want to be able to deploy the software in Azure or local using Windows Azure Service Bus and its topics seems a good choice.

soawithevents

Installing Windows Azure Service Bus

The installation of the WASB is straight forward… for a little moment. As the installation is not the point of this post here you have useful links that saved me time and frustration:

http://joe.blog.freemansoft.com/2013/02/installing-microsoft-message-bus-for.html

http://abhishekrlal.com/2011/03/22/clientsdkconfig/

This is a nice UI for managing it:

http://code.msdn.microsoft.com/windowsazure/Service-Bus-Explorer-f2abca5a

Transactions with Windows Azure Service Bus

The main issue talking about transactions with this bus, and I suppose with every bus in the cloud, is that we don’t have the possibility of creating distributed transactions. For this reason we are not totally robust between our process and the reading/writing from its queues. So at the end we have two independent transactions, the one that takes the message from the topic queue and the one that process the message and writes in local DDBBs…

What could happen?

1) We could read from the Azure topic queue

2) We execute our business process in another transaction

3) We commit the business process transaction

4) We commit the read from azure

If the execution fails between 1 and 3, 3 not included, the message and the process is all rollbacked and retried.

If the execution fails between 3 and 4, 4 not included, the business process has been executed but the message reading is not committed. So we are going to receive the message again. We have to consider this case.

If the execution reach point 4 all is ok :).

So we need to take into account the possibility of receiving already processed messages. This is the way we are going to do it:

  • 1) TX reading from the topic subscription queue.
    • 2) TX for processing the event in WCF, different from the topic subscription queue one.
    • 3) Checks if the message hash has been already processed.
      • 3a) if not the invoker processes it and stores the message hash as a processed message .
      • 3b) else… do nothing
    • 4) Commits the business transaction
  • 5) Commits the Azure transaction

The general solution schema

The idea I look for is to have a very simple library that would contain all the needed WCF stuff. So we could have this projects in our solution:

  • ConcreteProjectEvents: event DTOs for our projects
  • PubSubLibrary:
    • The service interface (IEventNotification)
    • A client for that interface
    • A ServiceHostFactory that configures the Azure subscriptions and the needed behaviours.
    • A behaviour for detecting duplicated calls.
  • Clients and services

Let’s now see the important parts of the code:

This is the general interface that every subscriber has to implement.


    [ServiceContract()]
    public interface IEventNotification
    {
        [OperationContract(IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)] //needed for manually committing the azure topic transaction.
        void OnEventOccurred(T value);

    }

The previous steps, the ones from 2 to 5, are executed in a IOperationInvoker, here is the invoke method (you will see two transactions, tran and topicReadingTran):


        ///
/// If it has been invoked it skips the message, else it executes it.
        ///
        public object Invoke(object instance, object[] inputs, out object[] outputs)
        {
            var incomingProperties = OperationContext.Current.IncomingMessageProperties;
            var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

            //this transaction wraps the operation execution and the messaging processing.
            using (var tran = new TransactionScope(TransactionScopeOption.RequiresNew, new TransactionOptions() { IsolationLevel = IsolationLevel.ReadCommitted }))
            {

                ReceiveContext receiveContext;
                if (ReceiveContext.TryGet(incomingProperties, out receiveContext))
                {
                    var rep = new AzureServiceBusAuxRepository();
                    var hash = HashUtil.CalculateMD5Hash(inputs[0]);

                    if (rep.IsMessageProcessed(hash, subscription))
                    {
                        //message was already processed. The operation won't be invoked.
                        outputs = new object[0];

                    }
                    else
                    {
                        //invokes the operation
                        this.invoker.Invoke(instance, inputs, out outputs);

                        //insert in processed messages database
                        rep.InsertMessageProcessed(hash, subscription);
                    }

                    //commits the method+messaging insertion transaction.
                    tran.Complete();

                    //commits the reception in the service bus
                    using (var topicReadingTran = new TransactionScope(TransactionScopeOption.Suppress))
                    {
                        receiveContext.Complete(TimeSpan.FromSeconds(10));
                        topicReadingTran.Complete();
                    }

                    return null;
                }
                else
                {
                    throw new InvalidOperationException("...");
                }

            }

        }

All of this is supported by a database table with three fields: MessageHash, Subscription and ProcessedTime. When a message is received the OperationInvoker checks if it has been processed, when it is processed the hash of the message and the subscription are stored. If the same message comes again it won’t be processed.


    ///
/// checks if the messages have been already processed
    ///
    class AzureServiceBusAuxRepository
    {
        ///
/// is the message already processed for that subscription?
        ///
        ///message hash
        ///subscription
        ///
        public bool IsMessageProcessed(string hash, string subscription)
        {
            using (SqlConnection connection = new SqlConnection(Settings.Default.DETECTDUPLICATEBEHAVIORCONNECTIONSTRING))
            {
                connection.Open();

                SqlCommand command = new SqlCommand("select count(*) from ProcessedMessages where MessageHash = @p0 and Subscription = @p1",connection);

                command.Parameters.Add(new SqlParameter { ParameterName = "p0", Value = hash });
                command.Parameters.Add(new SqlParameter { ParameterName = "p1", Value = subscription });

                return (int)command.ExecuteScalar()>0;
            }

        }

        ///
/// Inserts a new processed message for a given subscription.
        ///
        ///
        ///
        public void InsertMessageProcessed(string hash, string subscription)
        {
            using (SqlConnection connection = new SqlConnection(Settings.Default.DETECTDUPLICATEBEHAVIORCONNECTIONSTRING))
            {
                connection.Open();

                SqlCommand command = new SqlCommand("INSERT INTO ProcessedMessages(MessageHash,Subscription,ProcessedTime) VALUES (@p0,@p1,@p2)", connection);

                command.Parameters.Add(new SqlParameter { ParameterName = "p0", Value = hash });
                command.Parameters.Add(new SqlParameter { ParameterName = "p1", Value = subscription });
                command.Parameters.Add(new SqlParameter { ParameterName = "p2", Value = DateTime.UtcNow });

                command.ExecuteNonQuery();
            }

        }
    }

The service configuration involves the following:

A ServiceHostFactory that configures the topic subscription, the oauth if needed and the duplicate behaviour detection. Here is the code:


  public class ReliableServiceBusServiceHost : ServiceHost
    {

        public ReliableServiceBusServiceHost(Type t, Uri[] baseAddresses) :
                base( t, baseAddresses ) {}

        protected override void OnOpening()
        {
            base.OnOpening();

            //takes the servicebus endpoints
            var serviceBusEndPoints = this.Description.Endpoints.Where(e => e.Binding.Name == "NetMessagingBinding").ToList();

            CreateSubscriptions(serviceBusEndPoints);

            if (Settings.Default.OAUTHENABLED)
            {
                AddOAuthToEndpoint(serviceBusEndPoints);
            }

            if (Settings.Default.DETECTDUPLICATEBEHAVIOR)
            {
                AddDetectDuplicateBehavior(serviceBusEndPoints);
            }

        }

        ///
/// Adds OAuth configuration
        ///
        ///
        public static void AddOAuthToEndpoint(List epList)
        {
            // Machine name
            var networkCredential = new NetworkCredential(Settings.Default.OAUTHWINDOWSUSERNAME, Settings.Default.OAUTHWINDOWSPASSWORD, Settings.Default.OAUTHWINDOWSDOMAIN);

            // The TransportClientEndpointBehavior specifies the Service Bus credentials for a particular endpoint
            var stsUris = new List { new Uri(Settings.Default.OAUTHURL) };
            var transportClientEndpointBehavior = new TransportClientEndpointBehavior
            {
                TokenProvider = TokenProvider.CreateOAuthTokenProvider(stsUris, networkCredential)
            };

            epList.ForEach(ep => ep.Behaviors.Add(transportClientEndpointBehavior));
        }

        ///
/// Adds duplicate messages detection to the endpoint
        ///
        ///
        public static void AddDetectDuplicateBehavior(List epList)
        {

            epList.ForEach(ep => ep.Contract.Operations.ToList().ForEach(o => o.Behaviors.Add(new DetectDuplicateMessagesFromServiceBusBehaviour(ep.ListenUri.Segments.Last()))));
        }

        public static void CreateSubscriptions(List epList)
        {
            //create subscription if it doesn't exists
            var nsManager = NamespaceManager.Create();

            foreach (var ep in epList)
	        {
                //get the topic name ...\TOPIC\Subscriptions\SubcriptionName
                var topicName = ep.ListenUri.Segments.Reverse().ToArray()[2].Trim('/');
                var subscriptionName = ep.ListenUri.Segments.Last();

                if (!nsManager.TopicExists(topicName))
                    nsManager.CreateTopic(topicName);

                var topicInfo = nsManager.GetTopic(topicName);

                if (!nsManager.SubscriptionExists(topicInfo.Path, subscriptionName))
                    nsManager.CreateSubscription(topicInfo.Path, subscriptionName);

        	}
        }
    }

The service host factory is configured in the .svc file:


<%@ ServiceHost Factory="PubSubLibrary.WCFConfiguration.ReliableServiceBusServiceHostFactory"
Language="C#" Service="WCFSubscriber.WCFServiceSubscriber"
CodeBehind="WCFServiceSubscriber.svc.cs" %>

And the important parts in the .config of the service:


 <services>
    <service name="WCFSubscriber.WCFServiceSubscriber">
        <endpoint name="WCFServiceSubscriber" listenUri="sb://machinename/ServiceBusDefaultNamespace/TEMPERATUREWCF/subscriptions/WCFSub2" address="sb://machinename/ServiceBusDefaultNamespace/TEMPERATUREWCF" binding="netMessagingBinding" contract="PubSubLibrary.IEventNotification`1[[PubSubLibrary.Event, ConcreteProjectEvents, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]" />
    </service>
 </services>

  <appSettings>
    <!-- Service Bus specific app setings for messaging connections -->
    <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://machinename/ServiceBusDefaultNamespace;StsEndpoint=https://techlab.sgs.telvent.com:9355/ServiceBusDefaultNamespace/$STS/OAuth/;RuntimePort=9354;ManagementPort=9355;WindowsUsername=user;WindowsDomain=domain;WindowsPassword=password" />
  </appSettings>


With this we have finished all the service configuration. The client code is much easier, it is just one method, no service reference is needed:


public class WCFServiceBusClient 
    {
        public static void OnEventOccurred<TEvent>(TEvent value, string caller, string endpointName) 
            {
                // Send a list of dummy messages for WCF.
                var channelFactory = new ChannelFactory<IEventNotification<TEvent>>(endpointName);

                try
                {
                    //add oauth authentication
                    if (Settings.Default.OAUTHENABLED)
                        ReliableServiceBusServiceHost.AddOAuthToEndpoint(new List<ServiceEndpoint>() { channelFactory.Endpoint });

                    // Create a channel.
                    var channel = channelFactory.CreateChannel();

                    using (new OperationContextScope((IContextChannel)channel))
                    {
                        // Attach metadata for detecting if it is repeated.
                        var bmp = new BrokeredMessageProperty();
                        bmp.MessageId = HashUtil.CalculateMD5Hash(value);

                        OperationContext.Current.OutgoingMessageProperties.Add(
                            BrokeredMessageProperty.Name, bmp);

                        channel.OnEventOccurred(value);
                    }
                }
                finally
                {
                    channelFactory.Close();
                }
                
            }
    }

The call from the client is one line:

     WCFServiceBusClient.OnEventOccurred((Event)new TemperatureChangedEvent(27, now), "test", "WCFServiceBusTemperaturePUBSUBEndPoint");

with a very simple .config:


    <client>
      <endpoint address="sb://techlab.sgs.telvent.com/ServiceBusDefaultNamespace/TEMPERATUREWCF"
        binding="netMessagingBinding" 
        contract="PubSubLibrary.IEventNotification`1[[PubSubLibrary.Event, ConcreteProjectEvents, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]" name="WCFServiceBusTemperaturePUBSUBEndPoint" />
    </client>




Final considerations

  • Because of the lack of distributed transactions you can have duplicate messages sending to the topic subscription queue, but you can avoid it enabling the duplicate messaging detection in the queue, that would avoid two equal messages in the topic queue, but not processing a message more than once.
  • The WCF service has to be in autostart mode for collecting messages from the topic.
  • The code is a prototipe, of course it has to be modified before inserting it in a real project.
  • You will find OAuth authentication in the code, I had to use it because I wasn’t in the same domain than the Service Bus.

Here is the code.

Hope this helps.

Thanks to the Frank team :).

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: