CQRS with MongoDB+MassTransit+ASP.NET MVC+AngularJS

Hi all!

It is always funny to be able to start a project from scratch, that way we can try new concepts and technologies. As our projects are very event oriented and the performance is critical I want to create a CQRS (maybe without event source) and put in its core a pub/sub (in this case it will be MassTransit) . I won’t explain these concepts here, as I won’t be able to do it better than Fowler, Udi Dahan , and more … , I just want to give a good little example of how to combine the technologies in a small project that could be a good starting point to work with and later transfer this knowledge to the real big project.

Requirements:

The general architecture of the example is the following

GeneralArchitecture

The backend would be a group of services linked to the bus, in this case we have only one (ReadManagementService), and the “query updater”, the process that keeps the collection that will be queried up to date. The front end is the UI that connects directly to the queries database and to the Bus.

The main goal with this architecture is to achieve more loose coupling and increase the performance (UI will be able to read from a database specially denormalized to gain speed):

StepsDiagram

Lets breakdown the projects in the solution:

  • Messages: they are the messages that goes through the bus, the projects that connect to the bus share this dll in order to have a “common contract”. We have to types
    • Commands: a request for executing an action.
    • Events: something that has happened in the system.
  • QuartzServer: this service receives the messages that has to be sent in the future, for instance tomorrow, and relaunch them at a specific time. This is important for timeouts in sagas and other stuff.  An example is:

Bus.Instance.ScheduleMessage(30.Seconds().FromUtcNow(), new LaunchEngineMessage() { CorrelationId = guid });

  • QueriesUpdater: it is connected to the bus and updates the read database through the events.
  • ReadManagementService: manages the read business. It listens to commands and create events to communicate with others.
  • UIAngular (ASP.NET MVC+AngularJS)
    • Controllers: asp.net controllers to create rest services and manage the view navigation.
    • Scripts/application
      • application: inits the angularjs.
      • /controllers: each angular managed view part has a controller that manages its behaviour.
      • /services: they are “singletons” that are shared across the controllers. We have two: the one that calls the reads REST service and the one that receives calls from the server through SignalR.
    • Views: the initial html and the templates for the angularjs client.

Lets see the code involved in the last diagram:

  1. Angular (javascript) request a command: in the createRead angular controller there is a method, saveNewRead, which invokes the REST service in the server to launch a command (1). The server return the guid with the command id (3), the controller waits for its commandid event to finish (7).
    
    uiAngularControllers.controller('CreateReadController', ['$scope', '$rootScope', '$http', 'ReadsService', function ($scope, $rootScope,$http, ReadsService) {
    
        $rootScope.title = 'Create Read';
    
        $scope.newRead = {
            SerialNumber: "sn66",
            ReadTimeStamp: "2014-04-10",
            Value: "33"
        };
    
        $scope.serviceRequestProgress={
            progress: 0,
            text: "not launched"
        };
    
        $scope.currentRequest = null;
    
        //this function is invoked to launch a command
        $scope.saveNewRead = function () {
    
            $scope.serviceRequestProgress = {
                progress: 33,
                text: "calling server"
            };
    
            var newRead = ReadsService.save($scope.newRead,function(putResponseHeaders){
    
                $scope.currentRequest = putResponseHeaders.TransactionID;
    
                $scope.serviceRequestProgress = {
                    progress: 66,
                    text: "server calling succeed"
                };
            });
    
        };
    
        var unbind = $rootScope.$on('readInsertionFinished', function (event, args) {
    
            if (args.message == $scope.currentRequest)
            {
                $scope.serviceRequestProgress = {
                    progress: 100,
                    text: "read successfully inserted"
                }
            }
    
            $scope.reads = ReadsService.query();
        });
    
        $scope.formats = ['dd-MMMM-yyyy', 'yyyy/MM/dd', 'shortDate'];
        $scope.format = $scope.formats[0];
    
    }]);
    
  2. The UI (the rest service) creates a command and sends it to the bus (2). This is done in ReadsController.cs. It is veeeeeeeeery easy, just publishing in the bus.
    public CommandRequestInfoDto Post([FromBody] DataReadDto read)
            {
                var guid = Guid.NewGuid();
    
                Bus.Instance.Publish(new InsertReadCommand
                                        {
                                            SerialNumber=read.SerialNumber,
                                            ReadTimeStamp = read.ReadTimeStamp,
                                            Value=read.Value,
                                            CorrelationId=guid
                                        });
    
                return new CommandRequestInfoDto(){TransactionID=guid.ToString()};
            }
    
  3. Service receives the command from the bus (4). In the ReadManagementService project, in program.cs, we can find all the configuration needed to make MassTransit work, it is very straight forward, Bus get initialized and it subscribes every consumer through StructureMap IoC.
     //Search for every consumer and add to the container
                var container = new Container(cfg =>
                {
                    cfg.Scan(x =>
                    {
                        x.TheCallingAssembly();
                        x.AddAllTypesOf(typeof(IConsumer));
                    });
    
                });
    
                Bus.Initialize(sbc =>
                {
    
                    sbc.UseRabbitMq();
    
                    sbc.ReceiveFrom("rabbitmq://localhost/mt_readsmanagementservice_receive_queue");
    
                    sbc.Subscribe(subs =>
                    {
                        subs.Saga<ReadsCollectorSaga>(new InMemorySagaRepository<ReadsCollectorSaga>()).Permanent();
    
                    });
    
                    sbc.Subscribe(x => x.LoadFrom(container));
    
                });
    
                container.Inject(Bus.Instance);
    

    Also a message consumer has the following code, there you can see how it publishes an event back to the bus at the end of the method(5).

        public class ReadHasEnteredConsumer : Consumes<InsertReadCommand>.All
        {
            public void Consume(InsertReadCommand msg)
            {
                Console.WriteLine("New read inserted");
    
                DataReadsRepository rep = new DataReadsRepository();
    
                rep.AddDataRead(new DataRead()
                                {
                                    ReadTimeStamp=msg.ReadTimeStamp,
                                    SerialNumber=msg.SerialNumber,
                                    Value=msg.Value,
                                    CommandId=msg.CorrelationId
                                });
    
                Bus.Instance.Publish(new ReadsCollectionChangedMessage() { CorrelationId = msg.CorrelationId });
            }
        }
    
  4. The UI receives the event thanks that it is connected to the bus in the Global.asax (6) and sends it to the browsers through signalR.
         protected void Application_Start()
            {
                AreaRegistration.RegisterAllAreas();
                FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
                RouteConfig.RegisterRoutes(RouteTable.Routes);
                BundleConfig.RegisterBundles(BundleTable.Bundles);
                GlobalConfiguration.Configure(WebApiConfig.Register);
    
                Bus.Initialize(sbc =>
                {
                    sbc.ReceiveFrom("rabbitmq://localhost/mt_receive_queue_ui");
    
                    sbc.UseRabbitMq();
    
                    //subscribes to this message to communicate the changes to the clients
                    sbc.Subscribe(subs =>
                    {
                        subs.Handler<ReadsCollectionChangedMessage>((ctx, msg) =>
                        {
                            //calls to readInsertionFinished in every client
                            var hub = GlobalHost.ConnectionManager.GetHubContext<NonPersistentHub>();
                            hub.Clients.All.readInsertionFinished(msg.CorrelationId.ToString());
    
                        });
                    });
                });
            }
    
  5. The service in SignalRHubService.js receives the call from the server and spread and event through the AngularJS controllers (7).
         uiAngularServices.service('SignalRHubService', function ($, $rootScope) {
        var proxy = null;
    
        var initialize = function () {
            //Getting the connection object
            connection = $.hubConnection();
    
            //Creating proxy
            this.proxy = connection.createHubProxy('nonPersistentHub');
    
            //Starting connection
            connection.start();
    
            //Publishing an event when server pushes a readInsertionFinished message
            this.proxy.on('readInsertionFinished', function (msg) {
                $rootScope.$emit("readInsertionFinished", { message: msg });
            });
        };
    
        return {
            initialize: initialize
    
        };
    });
    
    

Well, this is all for now. I just want to give a simple example and an explanation of it. In next posts I will try to cover more advanced details like transactionality, working in a cluster, testing and so on…

Here is the code.

Thanks to the Frank team :).

One comment

  1. I cannot download your example code. Please upload again or send it to thoaingo07@gmail.com. Many thanks

Leave a comment