CQRS with event sourcing using NServiceBus, Event Store, Elastic Search, AngularJS and ASP.NET MVC (part 3)

CQRS+event sourcing posts

Introduction

Command + event sourcing

Read model: Elastic Search and event store projections

UI

The Q

So we have a lot of streams with a lot of business events inside them (Bank-Client-1: ClientCreated, MoneyDeposited, MoneyWithdrawn…). These events are not very “queryable”: all we do is append more events to business entities. So… how do we read? The overall idea is to project all or part of the events into another database, or even into several, and query them there. This is very clean: we save the entities as the events that happen in the system, and later we can use several technologies to query them. This way each tool in the system does what it is good at, one technology for writing and others for querying, instead of a single one that does a so-so job at everything.

In the example we will use two technologies to read:

– EventStore projections: with these you can process the events to store some data or create new events. We have two in the example:

– TotalMoneyInTheBank: this projection starts with a state whose TotalMoney property is set to 0. For every MoneyDeposited event it adds the quantity to TotalMoney; for every MoneyWithdrawn event it subtracts it. This is how we combine every client stream to know how much money is in the bank.

– PossiblyStolenCards: this projection is more complex. In each client stream it looks for money withdrawn from an ATM. If a client has made multiple withdrawals in less than 2 minutes, we consider that the card could be stolen. In that case we emit another event, which goes to the PossiblyStolenCardClients stream. So we read events and can create other events, which can be read in turn.

– ElasticSearch: here we create denormalized documents with the information we want to query. For instance, we keep one document per client and update it with each event we process.

    public class ClientInformation
    {
        public string ID { get; set; }

        public string Name { get; set; }

        [ElasticProperty(Type = FieldType.Double)]
        public double Balance { get; set; }

        [ElasticProperty(Type = FieldType.Date)]
        public DateTime LastMovement { get; set; }

        public bool PossiblyStolen { get; set; }
    }
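To make the two projections described above more concrete, here is a minimal sketch of their logic as plain C# folds over a list of events. Keep in mind that real EventStore projections are written in JavaScript and run inside the server, so this only illustrates the idea. The event classes, the FromATM flag and the ProjectionSketch name are assumptions made for the example, and the events are assumed to arrive in timestamp order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shapes; the real events in the sample may carry other fields.
public abstract class BankEvent
{
    public string ClientID { get; set; }
    public double Quantity { get; set; }
    public DateTime TimeStamp { get; set; }
}

public class MoneyDeposited : BankEvent { }

public class MoneyWithdrawn : BankEvent
{
    // Assumption: the event records whether the money was taken from an ATM.
    public bool FromATM { get; set; }
}

public static class ProjectionSketch
{
    // TotalMoneyInTheBank: start at 0, add deposits, subtract withdrawals.
    public static double TotalMoneyInTheBank(IEnumerable<BankEvent> events)
    {
        double total = 0;
        foreach (var e in events)
        {
            if (e is MoneyDeposited) total += e.Quantity;
            else if (e is MoneyWithdrawn) total -= e.Quantity;
        }
        return total;
    }

    // PossiblyStolenCards: flag a client when two consecutive ATM withdrawals
    // are less than `window` apart. Returns the flagged client IDs, each once.
    public static List<string> PossiblyStolenCardClients(
        IEnumerable<BankEvent> events, TimeSpan window)
    {
        var lastAtmWithdrawal = new Dictionary<string, DateTime>();
        var flagged = new List<string>();

        foreach (var w in events.OfType<MoneyWithdrawn>().Where(w => w.FromATM))
        {
            DateTime previous;
            if (lastAtmWithdrawal.TryGetValue(w.ClientID, out previous)
                && w.TimeStamp - previous < window
                && !flagged.Contains(w.ClientID))
            {
                flagged.Add(w.ClientID);
            }
            lastAtmWithdrawal[w.ClientID] = w.TimeStamp;
        }
        return flagged;
    }
}
```

Calling PossiblyStolenCardClients with TimeSpan.FromMinutes(2) reproduces the 2 minute rule; in the real projection each flagged client would become an emitted event instead of an entry in a list.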

 

How do we get the info from EventStore and send it to ElasticSearch?

In the code there is a component called ElasticSearchSynchronizer. This service subscribes to every event that is inserted in the EventStore, so it is notified every time a new event is saved. For instance, it is notified of every money deposit; from that event it updates the client information and creates a new AmountDepositedInTheBank document in ElasticSearch. This document is very similar to the event in EventStore, but since it lives in ElasticSearch we can query it easily.


    private void ConnectToEventstore()
    {
        // Start from the beginning so every stored event is processed.
        latestPosition = Position.Start;

        // Subscribe to every event in the EventStore; HandleEvent receives each one.
        var subs = connection.SubscribeToAllFrom(latestPosition, true, HandleEvent);

        // Subscribe to the stream of fraud alerts.
        connection.SubscribeToStreamFrom("PossiblyStolenCardClients", 0, true,
            (sub, e) =>
            {
                var jsonString = Encoding.UTF8.GetString(e.Event.Data);
                JObject o = JObject.Parse(jsonString);
                var clientID = (string)o["ClientID"];

                // Publish the alert on the bus.
                Bus.Publish(new ClientPossiblyStolen() { ClientID = clientID });

                // Flag the client document, creating it if it does not exist yet.
                var ci = indexer.Get<ClientInformation>(clientID);
                if (ci == null)
                    ci = new ClientInformation() { ID = clientID };

                ci.PossiblyStolen = true;
                indexer.Index(ci);
            });

        Console.WriteLine("Indexing service started");
    }

    private void Handle(MoneyDeposited evt)
    {
        var ci = indexer.Get<ClientInformation>(evt.ClientID);

        // Guard against a missing client document, as in the fraud alert handler.
        if (ci == null)
            ci = new ClientInformation() { ID = evt.ClientID };

        ci.Balance += evt.Quantity;
        ci.LastMovement = evt.TimeStamp;
        indexer.Index(ci);

        // Index the deposit itself, once per transaction.
        var ad = indexer.Get<AmountDepositedInTheBank>(evt.TransactionId.ToString());
        if (ad == null)
            indexer.Index(new AmountDepositedInTheBank
            {
                Quantity = evt.Quantity,
                TimeStamp = evt.TimeStamp,
                ID = evt.ClientID,
                TransactionId = evt.TransactionId
            });
    }

 

As you may have noticed, we also subscribe to the PossiblyStolenCardClients stream. In this case we update the client information and also publish an event on the bus. The UI processes this event and shows an alert message to the user.
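The HandleEvent method that receives every event is not shown in this post, so here is a minimal sketch of how such a dispatcher could route events to typed handlers. It is not the code from the repository: SynchronizerSketch is a hypothetical name, a dictionary stands in for ElasticSearch, and the events are assumed to be already deserialized. Because the subscription starts from the beginning of the log, the same event can arrive again after a restart, so the sketch checks the transaction id before touching the balance.

```csharp
using System;
using System.Collections.Generic;

public class ClientInformation
{
    public string ID { get; set; }
    public double Balance { get; set; }
    public DateTime LastMovement { get; set; }
}

public class MoneyDeposited
{
    public string ClientID { get; set; }
    public double Quantity { get; set; }
    public DateTime TimeStamp { get; set; }
    public Guid TransactionId { get; set; }
}

// Hypothetical stand-in for the synchronizer; a dictionary replaces ElasticSearch.
public class SynchronizerSketch
{
    public Dictionary<string, ClientInformation> Clients { get; } =
        new Dictionary<string, ClientInformation>();

    private readonly HashSet<Guid> processedTransactions = new HashSet<Guid>();
    private readonly Dictionary<Type, Action<object>> handlers =
        new Dictionary<Type, Action<object>>();

    public SynchronizerSketch()
    {
        // One entry per event type we care about.
        handlers[typeof(MoneyDeposited)] = e => Handle((MoneyDeposited)e);
    }

    // Routes an event to its handler. Returns false for unknown event types.
    public bool HandleEvent(object evt)
    {
        Action<object> handler;
        if (evt == null || !handlers.TryGetValue(evt.GetType(), out handler))
            return false;

        handler(evt);
        return true;
    }

    private void Handle(MoneyDeposited evt)
    {
        // HashSet.Add returns false when the id is already present,
        // which means this event is a replay and must not be applied twice.
        if (!processedTransactions.Add(evt.TransactionId))
            return;

        ClientInformation ci;
        if (!Clients.TryGetValue(evt.ClientID, out ci))
        {
            ci = new ClientInformation { ID = evt.ClientID };
            Clients[evt.ClientID] = ci;
        }

        ci.Balance += evt.Quantity;
        ci.LastMovement = evt.TimeStamp;
    }
}
```

Passing the same MoneyDeposited instance to HandleEvent twice changes the balance only once, which is the property we want when events can be replayed.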

 

How do we query in ElasticSearch?

Easy! A repository is created for querying the information using the ElasticSearch driver.

 


    public class ClientInformationRepository : ElasticSearchReadModel.Repositories.IClientInformationRepository
    {
        private const string INDEX = "metermanager";

        public List<ClientInformation> GetClientsBy(string name, bool? onlyPossiblyStolen)
        {
            var settings = new ConnectionSettings(new Uri("http://localhost:9200"));
            settings.SetDefaultIndex(INDEX);

            var esClient = new ElasticClient(settings);

            // text is always in lowercase in ES
            if (!string.IsNullOrEmpty(name))
                name = name.ToLowerInvariant();

            var result = esClient.Search<ClientInformation>(sd => sd
                .Query(q => q.Strict(false).Wildcard(t => t.Name, name))
                .Filter(f => f.Strict(false).Term(t => t.PossiblyStolen, onlyPossiblyStolen)));

            return result.Documents.ToList();
        }
    }
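If you want to reason about what this query returns without a running ElasticSearch, the following sketch applies roughly the same rules to an in-memory list with LINQ. It rests on two assumptions: that Strict(false) lets the driver drop a clause whose argument is null or empty, and that names are indexed as lowercase words. ClientQuerySketch and WildcardToRegex are hypothetical helpers, and splitting names on spaces is only a rough approximation of the ElasticSearch analyzer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public class ClientInformation
{
    public string ID { get; set; }
    public string Name { get; set; }
    public bool PossiblyStolen { get; set; }
}

public static class ClientQuerySketch
{
    // Translates a wildcard pattern (* = any run of characters, ? = one character)
    // into an anchored regular expression.
    public static Regex WildcardToRegex(string pattern)
    {
        var escaped = Regex.Escape(pattern).Replace("\\*", ".*").Replace("\\?", ".");
        return new Regex("^" + escaped + "$", RegexOptions.Singleline);
    }

    public static List<ClientInformation> GetClientsBy(
        IEnumerable<ClientInformation> clients, string name, bool? onlyPossiblyStolen)
    {
        var query = clients;

        // Name clause: skipped when no name is given.
        if (!string.IsNullOrEmpty(name))
        {
            var regex = WildcardToRegex(name.ToLowerInvariant());
            query = query.Where(c => c.Name != null
                && c.Name.ToLowerInvariant()
                    .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
                    .Any(word => regex.IsMatch(word)));
        }

        // Filter clause: skipped when the flag is null.
        if (onlyPossiblyStolen.HasValue)
            query = query.Where(c => c.PossiblyStolen == onlyPossiblyStolen.Value);

        return query.ToList();
    }
}
```

For example, searching for "jo*" with a null flag matches any client with a word starting with "jo" in the name, while passing true as the flag narrows the result to the possibly stolen ones.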

You can get the code here:

https://github.com/pablocastilla/CQRS-NServiceBus-EventStore-ElasticSearch

 

Next post: the UI.
