NServiceBus: command and event routing in a database with scale out

Hi all!

In my current project we want to keep all the routing configuration in a database, including command and event routing. We also want to use MSMQ+DTC and scale out without the distributor, because we will have endpoints receiving millions of messages per hour where each message is processed very quickly. To check how easy that is, I have implemented a small prototype:

https://github.com/pablocastilla/NServiceBus-RoutingInDataBase

The main part, where the tricks are done, is the DatabaseRouting project. It contains:

– DatabaseRoutingBehaviour: here I inject a new step into the NServiceBus pipeline. In that step the addresses for a message type are looked up and “random balancing” is done between them, just before the message is sent over the wire.

 



public class DatabaseRoutingBehaviour : IBehavior<OutgoingContext>
{
    //one shared generator: a new Random per message can repeat values
    //when many messages are sent within the same clock tick
    static readonly Random rnd = new Random();

    //constructors and initializations
    public void Invoke(OutgoingContext context, Action next)
    {
        //... code removed for explanation ....
        //skipping events and replies
        //looking for endpoints in the database by: source endpoint, source machine, destination endpoint, destination machine
        //...
        if (possibleEndpoints == null || possibleEndpoints.Count() == 0)
        {
            throw new Exception("NO ENDPOINT FOUND FOR " + context.OutgoingLogicalMessage.MessageType.ToString());
        }

        //random balancing between the endpoints (a single candidate is always picked)
        CommandRoutingConfiguration finalEndpoint;
        lock (rnd) //Random is not thread-safe
        {
            finalEndpoint = possibleEndpoints[rnd.Next(0, possibleEndpoints.Count())];
        }

        //change the message address.
        ((NServiceBus.Unicast.SendOptions)(context.DeliveryOptions)).Destination = new Address(finalEndpoint.DestinationEndpoint, finalEndpoint.DestinationMachine);

        next();
    }
}


public class DynamicRoutingStepInPipeline : RegisterStep
{
    public DynamicRoutingStepInPipeline()
        : base("NewStepInPipeline", typeof(DatabaseRoutingBehaviour), "Looks for an endpoint in the database")
    {
 
        InsertBefore(WellKnownStep.DispatchMessageToTransport);
    }
}
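
The “random balancing” part can be isolated from NServiceBus and tried on its own. This is only a minimal sketch: the `RandomBalancer` class and its `Pick` method are hypothetical names that are not part of the prototype, and it assumes the candidates are already loaded into a list.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the prototype: picks one candidate at random.
public static class RandomBalancer
{
    // Random is not thread-safe, so the single shared instance is guarded by a lock.
    private static readonly Random Rnd = new Random();

    public static T Pick<T>(IList<T> candidates)
    {
        if (candidates == null || candidates.Count == 0)
        {
            throw new InvalidOperationException("No candidates to choose from.");
        }

        lock (Rnd)
        {
            // Next(n) returns a value in [0, n), so the index is always valid.
            return candidates[Rnd.Next(candidates.Count)];
        }
    }
}
```

With a single candidate the result is always that candidate, so the behaviour does not need a separate branch for the one-endpoint case.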

 

– DatabaseRoutingSubscriptionStorage: a new subscription storage is implemented. Subscribers are looked up in the database, without auto-subscription. If several subscribers with the same SubscriberEndpoint name are found, one of them is picked at random. A new PersistenceDefinition and a new Feature are also implemented.

 


public class DatabaseRoutingSubscriptionStorage : ISubscriptionStorage
{
    //one shared generator, guarded by a lock because Random is not thread-safe
    static readonly Random rnd = new Random();

    public IEnumerable<NServiceBus.Address> GetSubscriberAddressesForMessage(IEnumerable<NServiceBus.Unicast.Subscriptions.MessageType> messageTypes)
    {
        var addresses = new List<Address>();

        IEventSubscriptionRepository rep = new EventSubscriptionRepository();
        var subs = rep.GetSubscriptionByTypes(messageTypes);

        //group the subscriptions by endpoint name; every group has at least one entry
        var results = from s in subs
                      group s by s.SubscriberEndpoint into g
                      select new { Endpoint = g.Key, Subscriptions = g.ToList() };

        foreach (var r in results)
        {
            //if there are subscribers with the same endpoint a random is done between them.
            //the index is bounded by the size of the group, not by the total number of subscriptions
            int selected;
            lock (rnd)
            {
                selected = rnd.Next(0, r.Subscriptions.Count);
            }

            var chosen = r.Subscriptions[selected];
            addresses.Add(new Address(chosen.SubscriberEndpoint, chosen.SubscriberMachine));
        }

        return addresses;
    }

    ....
}
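
If an even spread matters more than a random one, a round-robin pick is a possible alternative to the random pick used in the storage. The following is only a sketch of that swapped-in technique. `RoundRobinBalancer` is a hypothetical class that does not exist in the prototype, and it assumes the database returns the instances of an endpoint in a stable order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the prototype: rotates through the candidates.
public class RoundRobinBalancer
{
    private int counter = -1;

    public T Pick<T>(IList<T> candidates)
    {
        if (candidates == null || candidates.Count == 0)
        {
            throw new InvalidOperationException("No candidates to choose from.");
        }

        // Interlocked.Increment is atomic, so concurrent callers get distinct ticks.
        // The mask keeps the value non-negative after the counter wraps around.
        int tick = Interlocked.Increment(ref counter) & int.MaxValue;
        return candidates[tick % candidates.Count];
    }
}
```

One balancer instance per endpoint name would be needed, for example kept in a dictionary keyed by SubscriberEndpoint, so that each group of machines rotates independently.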

 

– DatabaseRoutingConfiguration: NServiceBus checks that a destination exists in the configuration before sending a message. To satisfy that check, every endpoint in the database is added to the configuration, but nothing else is done with those mappings afterwards. This is just a dirty trick to make it work, and I don’t like it.


public class DatabaseRoutingConfiguration : IConfigurationSource
{
    public T GetConfiguration<T>() where T : class, new()
    {
        // the part you are overriding
        if (typeof(T) == typeof(UnicastBusConfig))
        {
            var endpointMappings = new MessageEndpointMappingCollection();

            ICommandRoutingConfigurationRepository rep = new CommandRoutingConfigurationRepository();

            var routingInfo = rep.GetRoutingInfo();
            var messagesAlreadyInConfiguration = new HashSet<string>();

            foreach (var ep in routingInfo)
            {
                // only one mapping per message type is registered;
                // HashSet.Add returns false when the type was already added
                if (!messagesAlreadyInConfiguration.Add(ep.MessageType))
                {
                    continue;
                }

                endpointMappings.Add(new MessageEndpointMapping()
                {
                    AssemblyName = ep.MessageAssembly,
                    TypeFullName = ep.MessageType,
                    Endpoint = ep.DestinationEndpoint + "@" + ep.DestinationMachine
                });
            }

            return new UnicastBusConfig
            {
                MessageEndpointMappings = endpointMappings
            } as T;
        }
        // leaving the rest of the configuration as is:
        return ConfigurationManager.GetSection(typeof(T).Name) as T;
    }
}

 

In order to use it we have to configure the bus like this:


    public class EndpointConfig : IConfigureThisEndpoint
    {
        public void Customize(BusConfiguration configuration)
        {
              configuration.UsePersistence<DatabaseRoutingInMemoryPersistence>();
              configuration.DisableFeature<AutoSubscribe>();            
              configuration.Pipeline.Register<DatabaseRouting.DynamicRoutingStepInPipeline>();
              configuration.CustomConfigurationSource(new DatabaseRoutingConfiguration());
        }
    }

 

We have introduced this in our project, with much better code, and so far it works really well. To see it in action, launch the following projects in the solution: CreateUserHandler, CreateUserHandler2, NewUserSaga (1k are launched here) and UserCreatedHandler.

 

Hope this helps someone.
