Andrew Dodd


Updating SimpleCQRS Golang port

Overview

I found that my Golang port of the SimpleCQRS project a) had a few errors that I really should fix, and b) should probably be refactored to be non-blocking on command submission (up for debate, I guess). While fixing these I discovered a new way to look at eventual consistency (and learned to be ok with it).

First, the fix

I decided one day to run Go’s race detector on the code from the original port, only to find that there was a data race! Having a closer look at the code, this is not exactly surprising, as Go’s collections (like maps and slices) are not natively thread-safe, and the code was definitely reading and editing the same structures from multiple goroutines. The simplest place to see the race is in the List read model, which both handles updates and provides GetInventoryItems() access. This might be avoidable if Go had a way to create a read-only view over the .db.list slice.

You can see the details of the changes here in the commit log.

I’m not sure if using a sync.RWMutex is the best way to solve this issue, but given the state and shape of the code it is certainly the quickest. It is probably a better idea to use Go’s more powerful concurrency primitives (like channels and the select statement), or to rely on the concurrency guarantees of an alternative store (e.g. a read database or cache, rather than maps & lists), but both would require a significant restructure of the code and would depart dramatically from the original SimpleCQRS code.
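
To give a flavour of the fix, the guarded read model ends up looking roughly like this (a sketch only: the type and field names here are illustrative rather than lifted from the commit):

type InventoryItemListDto struct {
    Id   string
    Name string
}

type InventoryListView struct {
    mu   sync.RWMutex
    list []InventoryItemListDto
}

// Event handlers take the write lock while they mutate the slice.
func (v *InventoryListView) handleItemCreated(id, name string) {
    v.mu.Lock()
    defer v.mu.Unlock()
    v.list = append(v.list, InventoryItemListDto{Id: id, Name: name})
}

// GetInventoryItems takes the read lock and hands back a copy of the slice,
// so callers never observe a concurrent append.
func (v *InventoryListView) GetInventoryItems() []InventoryItemListDto {
    v.mu.RLock()
    defer v.mu.RUnlock()
    items := make([]InventoryItemListDto, len(v.list))
    copy(items, v.list)
    return items
}

Returning a copy from GetInventoryItems() also side-steps the lack of a read-only view over the slice.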

Second, not blocking on command processing

It is not exactly clear from the original code (commit a426d3..ish) that the command submission actually blocks until it is processed.

To make it a bit clearer, I introduced an artificial delay into the “Rename” command handler, as a mocked form of latency. The changes, in commit 7933ee5, look something like this:

func (r *InventoryCommandHandlers) HandleRenameInventoryItem(m Command) error {
    message := m.(RenameInventoryItem)
    ar, _ := r.repo.GetById(message.InventoryItemId)
    item := ar.(*InventoryItem)
    time.Sleep(10 * time.Second) // Name changes take ages, not sure why
    err := item.ChangeName(message.NewName)
    if err != nil {
        return err
    }
    return r.repo.Save(item, message.OriginalVersion)
}

Modifying the code and then submitting a Rename request makes it abundantly clear that the web request synchronously waits for the command to be processed before returning. Following through the code, it progresses a little bit like this:

HTTP POST Request
 -> changeNameHandler
  -> fakeBus.Dispatch(cmd RenameInventoryItem)
   -> bus.commandHandlers[RenameInventoryItem](cmd)
    -> InventoryCommandHandlers(repo).HandleRenameInventoryItem()
     -> repo.GetById()
     -> item.ChangeName()
     -> repo.Save()
      -> repo.EventStore.SaveEvents(item.AggregateRoot.UncommittedEvents())
       -> eventStore.bus.Publish(events)
        -> for processor in bus.EventProcessors() -> go { processor.Publish(evt) }
           // i.e. only here, at the goroutine, do we become async
   +----+
   |
   v
   HTTP redirect("/")

This implies that the POST / Command submission waits until:

  • the command has been validated & submitted for processing
  • the command processor has been found and executed
  • the repo has loaded the aggregate root, submitted the command, and requested the saving of the events
  • the event store has stored the events and published them to be processed.

Which is an awfully long time for the HTTP request to wait!

What to change?

The idea I had was to first see if I could introduce a seam between the command submission and the processing. This would allow the HTTP request to return on “successful receipt of the command” (i.e. making no guarantees about whether it was applied). Second, in true violation of all of these CQRS/ES / Eventual Consistency discussions, I wanted to see if I could make it possible for the caller to decide whether to wait for their command to be processed or not (i.e. let the command submitter decide to be sync or async). This is a violation in the sense that it allows the user to pin a processing unit (i.e. it violates the architectural goal of high scalability), but it is also an interesting thing to explore…

Use an event loop

Event loops are all the rage right now, right? So why not use one? (They sort of are, but they have actually been the rage for a long time…in most embedded systems, the whole thing is one big event loop!)

This situation actually lends itself quite well to an event loop, as we want the commands to be processed sequentially (to avoid concurrency issues). The simplest way I could think of to do this is to use a Go channel as the stream of commands, and have the FakeBus run a continuous loop to process them. This design will probably not achieve very high throughput, but it will suffice for this demo project. (Aside: a design with mechanical sympathy, like that discussed in the LMAX Disruptor papers/talks, would be better, but I’m not in a position to spend ages making the perfect demo project!)

The commit at 3f26769 implements the whole set of changes, but in essence the Dispatch(cmd Command) function on the Bus changed from:

func (fb *FakeBus) Dispatch(cmd Command) error {
    if handler, ok := fb.commandHandlers[reflect.TypeOf(cmd)]; ok {
        return handler(cmd)
    }
    return errors.New("no handler registered")
}

to:

func (fb *FakeBus) Dispatch(cmd Command) error {
    if _, ok := fb.commandHandlers[reflect.TypeOf(cmd)]; ok {
        fb.commandQueue <- cmd
        return nil
    }
    return errors.New("no handler registered")
}

So from the outside, the interface remains the same, but internally it is very different. The initialisation of the FakeBus creates a channel for received commands and starts a never-ending goroutine to process them (NB: we have no way to stop this goroutine, so technically it is a goroutine leak). It looks a little like this:

func (fb *FakeBus) processCommands() {
    for {
        select {
        case cmd := <-fb.commandQueue:
            // this should never panic, as the cmd should not be
            // accepted if there is no handler
            handler := fb.commandHandlers[reflect.TypeOf(cmd)]
            handler(cmd)
        }
    }
}
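
For completeness, the initialisation that creates the queue and kicks off that loop looks roughly like this (a sketch only: the constructor name and buffer size are my own, the real wiring is in the commit):

func NewFakeBus() *FakeBus {
    fb := &FakeBus{
        commandQueue:    make(chan Command, 16), // buffered, so Dispatch rarely blocks
        commandHandlers: make(map[reflect.Type]CommandHandler),
        eventProcessors: make(map[reflect.Type][]EventProcessor),
    }
    // Start the never-ending processing loop. Nothing ever stops it,
    // which is the goroutine leak mentioned above.
    go fb.processCommands()
    return fb
}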

Not unexpectedly, this allows the “command submission” side of the chain to be decoupled from the “command processing” and “event dispatch” side!

Third, allowing the caller to choose sync or async

Although this is a violation of the original goal of this blog post, it is a cool thing to investigate (and it serves as a way to practice some Go).

The basis for what I am going to do comes from what I learned about select and nil channels in the Advanced Go Concurrency Patterns talk.

The core of the changes revolves around modifying the CommandDispatcher interface to take an optional synchronousResponse channel:

type CommandProcessingError error
type CommandSubmissionError error

type CommandDispatcher interface {
    Dispatch(e                   Command,
             synchronousResponse chan CommandProcessingError) CommandSubmissionError
}

This allows the external calling code to determine if it would like to wait for the command to be “successfully processed” or not (NB: this is different from waiting until “processed by all read models”).

The following snippet (from the CQRSGui/main.go file) shows optionally waiting or not:

bus := getBus(...)

// will NOT wait for the command to be processed, only until it has been
// validated and queued for processing
err = bus.Dispatch(s.RenameInventoryItem{ii.Id, version, name}, nil)

// this WILL WAIT until after the command has been processed
waitForSuccess := make(chan s.CommandProcessingError)
err = bus.Dispatch(s.RenameInventoryItem{ii.Id, version, name}, waitForSuccess)

// the result of the command processing will be returned on the provided channel
err = <-waitForSuccess

The internal implementation does not have to change much to accommodate this behaviour. First, the internal channel of commands to process must capture both the Command and the return channel:

type queuedCommand struct {
    cmd                     Command
    synchronousResponseChan chan CommandProcessingError
}

type FakeBus struct {
    commandQueue    chan queuedCommand
    commandHandlers map[reflect.Type]CommandHandler
    eventProcessors map[reflect.Type][]EventProcessor
}
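
Dispatch then wraps the command and the (possibly nil) response channel into one of these before queueing it; roughly (paraphrasing the shape of the earlier Dispatch, the real implementation is in the commit):

func (fb *FakeBus) Dispatch(cmd Command,
    synchronousResponse chan CommandProcessingError) CommandSubmissionError {
    if _, ok := fb.commandHandlers[reflect.TypeOf(cmd)]; ok {
        fb.commandQueue <- queuedCommand{
            cmd:                     cmd,
            synchronousResponseChan: synchronousResponse,
        }
        return nil
    }
    return errors.New("no handler registered")
}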

Second, the infinite loop code that processes each command must return the result to the channel if it exists:

func (fb *FakeBus) processCommands() {
    for {
        select {
        case cmdReq := <-fb.commandQueue:
            cmd := cmdReq.cmd
            respChan := cmdReq.synchronousResponseChan
            handler, _ := fb.commandHandlers[reflect.TypeOf(cmd)]
            result := handler(cmd)

            // Utilise the select {default} trick to skip this if respChan is nil
            select {
            case respChan <- result:
            default:
            }
        }
    }
}

This code utilises the select { default } trick discussed in the Advanced Go talk: if a response channel was provided and the caller is waiting on it, the result is sent back on it; if no channel was provided, the send is on a nil channel, which can never proceed, so the select falls straight through to the empty default case rather than blocking the processing loop.
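
As a standalone illustration of that nil-channel behaviour (not project code):

var respChan chan error               // nil: a plain send on it would block forever
select {
case respChan <- errors.New("done"):  // never ready while respChan is nil
default:
    // taken immediately, so nothing blocks
}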

Fourth, a neat way to look at and get on board with eventual consistency

The change from using standard application architecture (e.g. an ORM or ActiveRecord-esque layer over data access) to having different read and write models with eventual consistency is a bit of a mind bend. This is doubly true when you start thinking about it in terms of how you would normally build a UI delivered over HTTP. In most applications the method for data storage means that as soon as your mutation is committed it can be read back out (i.e. UPDATE x ... followed by a SELECT x ... will return the data you just wrote to the DB).

I was certainly worried about this architectural pattern from the perspective of how much more complicated it might make the design of the UI. Users generally expect that when they click “save”, the next page shows them the updated data (and they have been well trained these days to expect that the data have been successfully stored in a database somewhere). Sure, you could “fake it” (which is sometimes proposed for UIs built on top of CQRS/ES), but this feels like a Pandora’s box to me, as you never quite know when it is appropriate to fake it and when it isn’t. Additionally, while I’m totally on board with using Eventual Consistency, I would always be a bit worried about how much more work it would be to implement, given that my tooling and users assume an Always Consistent model is being used.

There are a number of options that get mentioned for dealing with eventual consistency, such as an arbitrary “please wait” screen shown for long enough to allow the eventual consistency window to close; or more complicated concepts like registering a “command submitted id” and waiting until that command has been processed. None of these are great solutions, so I wondered whether, for many systems, you might be able to ignore the problem entirely.

In the SimpleCQRS project, a request to mutate some state flows something like:

1. Browser
2.  HTTP POST
3.   - Create a command
4.   - Submit command for processing
5.   - Redirect to "view"
6. Browser redirect
7.  HTTP GET ("view")
8.    - Get aggregate from read model

While playing around with the artificial delay in the Rename handler above, I found that if I made the delay 2ms or less, the latency between step 4 and step 7 in the browser was long enough for the command to have been processed, the event generated, and the read model updated.

Interestingly, this implies that if your consecutive read latency (i.e. how long you expect between a client making a write and then a read of the same data) is longer than your expected eventual consistency window, you can probably afford to ignore the problem entirely. It’s such a cool finding that I’m going to write it again in a different way…if your command processing is faster than consecutive WRITE-READ requests from a single client, you can probably ignore the need to deal with inconsistencies due to eventual consistency in the UI!!!
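
If you want to put a rough number on that window for your own system, one quick-and-dirty option is to submit a command asynchronously and poll the read model until the change shows up. A sketch, reusing the illustrative read model type from earlier (the id/DTO types and the helper itself are hypothetical, not part of the project):

// measureConsistencyWindow submits a rename without waiting, then polls the
// read model until the new name is visible, returning how long that took.
func measureConsistencyWindow(bus CommandDispatcher, view *InventoryListView,
    id string, version int, newName string) time.Duration {
    start := time.Now()
    // fire and forget: nil response channel, so we do not wait for processing
    bus.Dispatch(RenameInventoryItem{id, version, newName}, nil)
    for {
        for _, item := range view.GetInventoryItems() {
            if item.Id == id && item.Name == newName {
                return time.Since(start)
            }
        }
        time.Sleep(time.Millisecond)
    }
}

If the duration returned is comfortably shorter than the time a browser takes to follow the redirect and issue the GET, the UI can be built as if the system were always consistent.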

Additionally, if you are building a more responsive UI that uses “notifications” or “websockets” (i.e. has an RPC interface rather than HTTP verbs only), you can doubly ignore it (especially for small and/or prototype projects), as the event notification can be viewed as just another “view model”.

Conclusion

Well….once again I have a blog post that peters out, probably because I have written it over the course of a month or two. I have no great conclusion, except to say that I hope one day to get an opportunity to work on a system that makes good use of the CQRS/ES pattern.