CQRS with event sourcing using NServiceBus, Event Store, Elastic Search, AngularJS and ASP.NET MVC (part 2)

CQRS+event sourcing posts

Introduction

Command + event sourcing

Read model: Elastic Search and event store projections

UI

 

The C

As you may know, CQRS stands for Command Query Responsibility Segregation. It basically means that you implement two separate systems, one for handling commands (C) and another for handling queries (Q), and they are kept in sync by “something” in the middle. Here we will also cover a “special” way to store the command data: event sourcing with Event Store. Storing events means that you store what has happened in the system as a series of events, not as the current state in a structured form. For instance, you don’t have a table called Clients (the relational way), and you don’t have a document holding the whole aggregate (the NoSQL way); you just store the ClientCreated event in the events stream of client X. So in the end you will have a lot of streams, maybe one per client, with all of a client’s events inside its stream. In the image you can see the stream “Bank-Client-38794” (38794 is the client id) with two events: ClientCreated and MoneyDeposited. As you can see, events are just serialized JSON.

 

ClientStream
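To make the "one stream per client" idea concrete, here is a minimal in-memory sketch. It is not Event Store and not the code of this project: StreamNames, RecordedEvent and StreamLog are hypothetical names, the "Bank" prefix is taken from the stream name in the image, and the JSON payloads are hand-written illustrations.

```csharp
using System;
using System.Collections.Generic;

public class Client { }

public static class StreamNames
{
    // Assumed format, as in the image: Category-AggregateType-Id.
    public static string For(string category, Type aggregateType, string id)
    {
        return category + "-" + aggregateType.Name + "-" + id;
    }
}

// Hypothetical stored form of one event: its type name plus a JSON payload.
public class RecordedEvent
{
    public string EventType { get; set; }
    public string Json { get; set; }
}

// Toy stand-in for an event store: stream name -> ordered list of events.
public class StreamLog
{
    private readonly Dictionary<string, List<RecordedEvent>> streams =
        new Dictionary<string, List<RecordedEvent>>();

    public void Append(string streamName, string eventType, string json)
    {
        List<RecordedEvent> stream;
        if (!streams.TryGetValue(streamName, out stream))
        {
            stream = new List<RecordedEvent>();
            streams[streamName] = stream;
        }
        stream.Add(new RecordedEvent { EventType = eventType, Json = json });
    }

    public IReadOnlyList<RecordedEvent> Read(string streamName)
    {
        List<RecordedEvent> stream;
        return streams.TryGetValue(streamName, out stream) ? stream : new List<RecordedEvent>();
    }
}

public static class StreamLogDemo
{
    public static void Main()
    {
        var log = new StreamLog();
        var name = StreamNames.For("Bank", typeof(Client), "38794");

        // Illustrative payloads only; the real events carry more fields.
        log.Append(name, "ClientCreated", "{\"ID\":\"38794\",\"Name\":\"Some client\"}");
        log.Append(name, "MoneyDeposited", "{\"Quantity\":100}");

        foreach (var e in log.Read(name))
            Console.WriteLine(name + " | " + e.EventType + " | " + e.Json);
    }
}
```

Nothing is ever updated in place: the current state of client 38794 is whatever you get by reading its stream from the first event to the last.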

 

Architecture

All the CQRS + ES stuff is meant for building veeeeeeery highly scalable systems. For that reason I think it is better to put a messaging bus in the middle for balancing and “buffering” the load. That way, a lot of endpoints that can receive messages are exposed to the upper layer (the UI or an integration bus). So let’s see how a command works:

 

NServiceBus endpoint

I have implemented a few services that handle NServiceBus commands: they receive messages, store events (these events live at the model/event sourcing level) and maybe propagate more events to the bus. Here you can see the handler for the CreateClient bus command.


    public class CreateClientHandler : IHandleMessages<CreateClientCommand>
    {
        /// <summary>
        /// Handles the CreateClientCommand: raises the client-created event and makes the first deposit.
        /// </summary>
        /// <param name="message">The command received from the bus.</param>
        public void Handle(CreateClientCommand message)
        {
            // Resolve the repository from the IoC container.
            var domainRepository = ObjectFactory.GetInstance<IDomainRepository>();

            // Both calls only raise events on the aggregate; nothing is persisted yet.
            var client = Client.CreateClient(message.ClientID, message.Name);
            client.Deposit(message.InitialDeposit, DateTime.UtcNow, message.TransactionId);

            // Persist the uncommitted events; "true" marks this as the first save of a new stream.
            domainRepository.Save<Client>(client, true);
        }
    }

It is simple. The first difference is that the event sourcing approach is more DDD style: instead of a command that fills a class with properties and writes it to a database, the handler calls behavior on a domain object. It is more OO :).

Creating and storing objects in the model

Here is the important part of the Client aggregate when writing events:
– When a client is created, a ClientCreated event is raised. When an event is raised it is applied to the instance (see the Apply functions) and it is also stored in a list of uncommitted events (in the AggregateBase class).
– The same goes for Deposit: this function performs the validations (none in this case :)) and calls RaiseEvent.


        // Creating a client does not set any state directly: it raises an event.
        public Client(string id, string name) : this()
        {
            RaiseEvent(new ClientCreated(id, name));
        }

        public static Client CreateClient(string id, string name)
        {
            return new Client(id, name);
        }

        // Validations would go here, before the event is raised.
        public void Deposit(double quantity, DateTime timeStamp, Guid transactionId, bool fromATM = false)
        {
            RaiseEvent(new MoneyDeposited(quantity, timeStamp, ID, transactionId, fromATM));
        }

        public void Withdraw(double quantity, DateTime timeStamp, Guid transactionId, bool fromATM = false)
        {
            // Normalize the amount so a negative quantity is not subtracted twice.
            quantity = Math.Abs(quantity);

            RaiseEvent(new MoneyWithdrawn(quantity, timeStamp, ID, transactionId, fromATM));
        }

        // The Apply methods are the only place where state changes. They run both
        // when an event is raised and when the aggregate is rebuilt from its stream.
        private void Apply(ClientCreated obj)
        {
            this.ID = obj.ID;
            Name = obj.Name;
            balance = 0;
        }

        private void Apply(MoneyDeposited obj)
        {
            balance += obj.Quantity;
        }

        private void Apply(MoneyWithdrawn obj)
        {
            balance -= obj.Quantity;
        }
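The AggregateBase class is not shown in this post, so here is a rough sketch of what "apply the event and keep it in a list of uncommitted events" can look like. Everything in it is an assumption made for illustration: AggregateSketch, RegisterApplier and Account are hypothetical names, and the version starting at -1 is a guess, not necessarily what the repository does.

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent { }

public class MoneyDeposited : IDomainEvent { public double Quantity { get; set; } }
public class MoneyWithdrawn : IDomainEvent { public double Quantity { get; set; } }

// Hypothetical base class, not the AggregateBase of the repository.
public abstract class AggregateSketch
{
    private readonly List<IDomainEvent> uncommittedEvents = new List<IDomainEvent>();
    private readonly Dictionary<Type, Action<IDomainEvent>> appliers =
        new Dictionary<Type, Action<IDomainEvent>>();

    // Assumption: -1 means "no events applied yet".
    public int Version { get; private set; } = -1;

    // Each aggregate registers one state-changing function per event type.
    protected void RegisterApplier<TEvent>(Action<TEvent> apply) where TEvent : IDomainEvent
    {
        appliers[typeof(TEvent)] = e => apply((TEvent)e);
    }

    // New behavior: change the state AND remember the event so it can be saved.
    protected void RaiseEvent(IDomainEvent @event)
    {
        ApplyEvent(@event);
        uncommittedEvents.Add(@event);
    }

    // Replay from the store: change the state only.
    public void ApplyEvent(IDomainEvent @event)
    {
        appliers[@event.GetType()](@event);
        Version++;
    }

    public IReadOnlyList<IDomainEvent> UncommittedEvents() { return uncommittedEvents; }
}

public class Account : AggregateSketch
{
    public double Balance { get; private set; }

    public Account()
    {
        RegisterApplier<MoneyDeposited>(e => Balance += e.Quantity);
        RegisterApplier<MoneyWithdrawn>(e => Balance -= e.Quantity);
    }

    public void Deposit(double quantity) { RaiseEvent(new MoneyDeposited { Quantity = quantity }); }
    public void Withdraw(double quantity) { RaiseEvent(new MoneyWithdrawn { Quantity = Math.Abs(quantity) }); }
}

public static class AggregateDemo
{
    public static void Main()
    {
        var account = new Account();
        account.Deposit(100);
        account.Withdraw(30);
        Console.WriteLine(account.Balance);                   // 70
        Console.WriteLine(account.UncommittedEvents().Count); // 2
    }
}
```

The point of splitting RaiseEvent from ApplyEvent is that the same state-changing code runs in both directions: when a command produces a new event and when old events are read back from the stream.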

OK, so we have a newly created client with a list of uncommitted events already applied. What happens when we call Save? The following method shows how the uncommitted events of an aggregate are stored:
– It takes the uncommitted events.
– It calculates the original and the expected version of the aggregate. The version at the end is the number of events in the stream.
– It creates the stream name, which in this case has the format Bank-Client-ClientId.
– It calls Event Store to append the events.


        public override IEnumerable<IDomainEvent> Save<TAggregate>(TAggregate aggregate, bool isInitial = false)
        {
            var events = aggregate.UncommitedEvents().ToList();

            // Version the stream had before these uncommitted events were raised.
            var originalVersion = aggregate.Version - events.Count;

            // Optimistic concurrency: the append is rejected if the stream is not at this version.
            var expectedVersion = originalVersion == -1 ? ExpectedVersion.NoStream : originalVersion;

            // A brand-new aggregate must not have a stream yet.
            if (isInitial)
                expectedVersion = ExpectedVersion.NoStream;

            var eventData = events.Select(CreateEventData);
            var streamName = AggregateToStreamName(aggregate.GetType(), aggregate.AggregateId);

            // Block until the append completes, so a failed write (for example a version
            // conflict) surfaces here instead of being silently lost.
            connection.AppendToStreamAsync(streamName, expectedVersion, eventData).Wait();
            return events;
        }
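The expected version passed to the append is what gives you optimistic concurrency: the write only succeeds if the stream is still at the version the writer loaded. Below is a small in-memory sketch of that check. It is not the Event Store client: InMemoryEventStore is a hypothetical class, and it assumes zero-based event numbers with -1 meaning "no stream", which is the value the Save method above compares against. The real client reports a conflict with its own exception type; here a plain InvalidOperationException stands in for it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store; it only mimics the expected-version check.
public class InMemoryEventStore
{
    public const int NoStream = -1; // assumption: same meaning as ExpectedVersion.NoStream

    private readonly Dictionary<string, List<string>> streams =
        new Dictionary<string, List<string>>();

    // Zero-based number of the last event in the stream, or NoStream if it does not exist.
    public int CurrentVersion(string streamName)
    {
        List<string> events;
        return streams.TryGetValue(streamName, out events) ? events.Count - 1 : NoStream;
    }

    public void Append(string streamName, int expectedVersion, IEnumerable<string> newEvents)
    {
        var current = CurrentVersion(streamName);
        if (current != expectedVersion)
        {
            throw new InvalidOperationException(
                "Expected version " + expectedVersion + " but the stream is at " + current);
        }

        List<string> events;
        if (!streams.TryGetValue(streamName, out events))
        {
            events = new List<string>();
            streams[streamName] = events;
        }
        events.AddRange(newEvents);
    }
}

public static class ConcurrencyDemo
{
    public static void Main()
    {
        var store = new InMemoryEventStore();
        const string stream = "Bank-Client-38794";

        // New client: the stream must not exist yet.
        store.Append(stream, InMemoryEventStore.NoStream, new[] { "ClientCreated", "MoneyDeposited" });

        // Two handlers load the client at version 1 and each tries to save one event.
        store.Append(stream, 1, new[] { "MoneyDeposited" });     // succeeds, stream is now at 2
        try
        {
            store.Append(stream, 1, new[] { "MoneyWithdrawn" }); // stale version, rejected
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

In a bus-based system the usual reaction to such a conflict is to let the message be retried, so the handler reloads the aggregate at its new version and tries again.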

And that’s all for the event-saving part. Of course there is more complicated stuff regarding Event Store and NServiceBus, but that’s for more detailed posts.

Recreating objects from EventStore

OK, now we have a client. This client could go to the bank and deposit more money.


    public class DepositMoneyHandler : IHandleMessages<DepositMoneyCommand>
    {
        public void Handle(DepositMoneyCommand message)
        {
            var domainRepository = ObjectFactory.GetInstance<IDomainRepository>();

            // Rebuild the client from the events in its stream.
            var client = domainRepository.GetById<Client>(message.ClientID);

            client.Deposit(message.Quantity, DateTime.UtcNow, message.TransactionId, message.FromATM);

            // Only the new MoneyDeposited event is appended to the stream.
            domainRepository.Save<Client>(client);
        }
    }

The client object has to be instantiated and filled with the latest information. How is it done? Let’s see how GetById works:
– It creates the stream name: Bank-Client-ClientId.
– It reads all the events of that stream.
– It calls the BuildAggregate function.


        public override TResult GetById<TResult>(string id)
        {
            var streamName = AggregateToStreamName(typeof(TResult), id);

            // Reads the whole stream in a single slice; a long stream would need paging.
            var eventsSlice = connection.ReadStreamEventsForwardAsync(streamName, 0, int.MaxValue, false);
            if (eventsSlice.Result.Status == SliceReadStatus.StreamNotFound)
            {
                throw new AggregateNotFoundException("Could not find aggregate of type " + typeof(TResult) + " and id " + id);
            }

            // The CLR type of each event is stored in its metadata, so the JSON can be
            // deserialized back into the right event class.
            var deserializedEvents = eventsSlice.Result.Events.Select(e =>
            {
                var metadata = SerializationUtils.DeserializeObject<Dictionary<string, string>>(e.OriginalEvent.Metadata);
                var eventData = SerializationUtils.DeserializeObject(e.OriginalEvent.Data, metadata[EventClrTypeHeader]);
                return eventData as IDomainEvent;
            });
            return BuildAggregate<TResult>(deserializedEvents);
        }

BuildAggregate applies every event to the object and returns it when finished. With that we have the current version of the object.

        protected TResult BuildAggregate<TResult>(IEnumerable<IDomainEvent> events) where TResult : IAggregate, new()
        {
            var result = new TResult();
            foreach (var @event in events)
            {
                result.ApplyEvent(@event);
            }
            return result;
        }
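The whole round trip (store the JSON together with the CLR type name, read it back, deserialize by type and replay) can be sketched in a self-contained way. This is not the repository's SerializationUtils: StoredEvent and ReplaySketch are hypothetical names, the events are reduced to a single Quantity property, and System.Text.Json (so a recent .NET) is assumed instead of whatever serializer the project uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public interface IDomainEvent { }

public class MoneyDeposited : IDomainEvent { public double Quantity { get; set; } }
public class MoneyWithdrawn : IDomainEvent { public double Quantity { get; set; } }

// Hypothetical stored form: JSON data plus the CLR type name kept as metadata.
public class StoredEvent
{
    public string ClrType { get; set; }
    public string Json { get; set; }
}

public static class ReplaySketch
{
    public static StoredEvent ToStored(IDomainEvent e)
    {
        return new StoredEvent
        {
            ClrType = e.GetType().AssemblyQualifiedName,
            Json = JsonSerializer.Serialize(e, e.GetType())
        };
    }

    // The type name from the metadata tells the serializer which class to build.
    public static IDomainEvent FromStored(StoredEvent stored)
    {
        var type = Type.GetType(stored.ClrType, throwOnError: true);
        return (IDomainEvent)JsonSerializer.Deserialize(stored.Json, type);
    }

    // Replaying is a fold over the stream, in order, starting from an empty state.
    public static double ReplayBalance(IEnumerable<StoredEvent> stream)
    {
        double balance = 0;
        foreach (var e in stream.Select(FromStored))
        {
            switch (e)
            {
                case MoneyDeposited deposited: balance += deposited.Quantity; break;
                case MoneyWithdrawn withdrawn: balance -= withdrawn.Quantity; break;
            }
        }
        return balance;
    }
}

public static class ReplayDemo
{
    public static void Main()
    {
        var stream = new List<StoredEvent>
        {
            ReplaySketch.ToStored(new MoneyDeposited { Quantity = 100 }),
            ReplaySketch.ToStored(new MoneyDeposited { Quantity = 50 }),
            ReplaySketch.ToStored(new MoneyWithdrawn { Quantity = 30 })
        };

        Console.WriteLine(ReplaySketch.ReplayBalance(stream)); // 120
    }
}
```

Because the state is always derived by replaying from the first event, the order of the events in the stream matters and must be preserved when reading.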

 

With all of this we have the Client recreated and we can keep working with it.

 

Here you can get the code:

https://github.com/pablocastilla/CQRS-NServiceBus-EventStore-ElasticSearch

 

Next post: the read model.

 

 

 

NOTE: the aggregate and repository code has been taken and adapted from this great blog post:

http://blog.tomasjansson.com/cqrs-the-simple-way-with-eventstore-and-elasticsearch-integrating-elasticsearch/

 

 

 

 


2 comments

  1. Hi Pablo,
    I’m still studying your solution example. It’s very good.
    Based on your experience, do you usually create a CrossCutting library like the one in the example for each project? Or you use a more generic one like CommonDomain (NEventStore)?
    Thanks

    1. Hi!

      Sorry but I don’t really understand the question. I use the crosscutting for common utils code, not logic, so it can be shared between projects.

      The domain is logic oriented and specific for each project
