Event Sourcing with Kafka in Go

This post is a transcript of a technical talk given by Formulate at Stockholm Go Conference 2018.


Many real-world engineering problems involve processing information that changes over time. If multiple people, machines, or services need to access the same information, it is critical that it stays consistent across users as it evolves (stays in sync without contradictions). Event sourcing is a powerful technique for ensuring that this is the case. At Formulate we use it throughout our infrastructure, implemented with a tool called Kafka.

Kafka crash course

Kafka is a publish/subscribe message broker. Think of it as a niche database that is optimized for fast writing and for reading back information in the same order it was written. It does not impose any particular structure on the data, every record is just a blob of bytes, and it is not queryable like traditional relational databases such as Oracle or Postgres. The lack of high-level features keeps Kafka close to the metal, making it highly performant.
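
As a concrete starting point, here is a minimal producer sketch using the github.com/segmentio/kafka-go client (one of several Go Kafka clients). The broker address, topic name, and payload are placeholders; the point is just that a record is an opaque blob of bytes appended to a topic.

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go" // one of several Go Kafka clients
)

func main() {
	// A Writer publishes records to a topic. The broker address and
	// topic name here are placeholders for your own cluster.
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "events",
	}
	defer w.Close()

	// Kafka does not care what the bytes mean; the record value is
	// just a blob that producers and consumers agree on.
	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("user-42"), Value: []byte(`{"action":"signed_up"}`)},
	)
	if err != nil {
		log.Fatal(err)
	}
}
```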

A Kafka cluster consists of multiple brokers. Each of these holds an eventually complete copy of the whole dataset, synchronizing as data comes in. If one broker becomes unavailable the remaining brokers cover for it until it's back online. This makes it fault-tolerant.

The data is stored in ever-growing logs called topics. When you consume from a topic you are guaranteed to get the messages back in the exact same order they were written. Topics may be partitioned to allow reading in parallel, which improves processing speed. The ordering guarantee then only holds within each partition, not across partitions, but in many cases that is a sensible tradeoff (demonstrated in the demo at the end).
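
A matching consumer sketch, again using kafka-go with placeholder broker, topic, and group names, shows the ordering guarantee in practice: within a partition, offsets only ever increase.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// A Reader in a consumer group receives the messages of each
	// partition in the order they were written.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "events",
		GroupID: "example-consumer",
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		// Offsets increase monotonically within a partition, which is
		// what gives you the per-partition ordering guarantee.
		fmt.Printf("partition=%d offset=%d value=%s\n", m.Partition, m.Offset, m.Value)
	}
}
```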

Event sourcing

The state of an application is the information it contains at a given point in time. With event sourcing you don't just keep track of the current snapshot of the state, but all the states it has gone through to get there.

This application has two variables A and B. In the most recent state A is 4 and B is 7, but we still keep all historic states it passed through along the way.
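
The idea fits in a few lines of Go. The sketch below invents a tiny event type mirroring the A/B figure above; the current state is simply a fold (left-to-right application) over the event log, and every intermediate state can be recovered by replaying a prefix of it.

```go
package main

import "fmt"

// Event records that something happened to one of the variables.
// The names are made up to mirror the figure above.
type Event struct {
	Variable string // "A" or "B"
	Value    int
}

// State is a snapshot of the application at one point in time.
type State struct{ A, B int }

// apply derives the next state from the previous one and a single event.
func apply(s State, e Event) State {
	switch e.Variable {
	case "A":
		s.A = e.Value
	case "B":
		s.B = e.Value
	}
	return s
}

func main() {
	// The event log is the source of truth; states are derived from it.
	events := []Event{
		{"A", 1}, {"B", 2}, {"A", 3}, {"B", 7}, {"A", 4},
	}

	var s State
	for i, e := range events {
		s = apply(s, e)
		// Every intermediate state is recoverable by replaying a prefix
		// of the log, not just the final A=4, B=7.
		fmt.Printf("after event %d: %+v\n", i+1, s)
	}
}
```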

Essentially it works like accounting: you are interested in how you arrived at the final balance, not just the figure itself (and you are in fact required by law to keep track of it).

With Kafka all writes to the log are persisted on disk before they are available for consumption. This is great for debugging and analytics as it allows you to go back and forth through time, inspecting any historic state you might be interested in. It also allows you to restart a program at any point you like, which is great if you want to update it, rerun a part of it, or if it crashes for external reasons.
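
With kafka-go, replaying history could look something like the sketch below: a reader bound to a single partition (no consumer group) is pointed at the oldest retained offset and reads forward. Broker and topic names are again placeholders.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// A Reader bound to a single partition, without a consumer group,
	// can be pointed at any offset, so a restarted or debugging process
	// can rebuild its state from the very first record.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     "events",
		Partition: 0,
	})
	defer r.Close()

	// Start from the oldest retained record instead of the latest one.
	if err := r.SetOffset(kafka.FirstOffset); err != nil {
		log.Fatal(err)
	}

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		// In a real service each record would be applied to an
		// in-memory or database-backed state; here we just print it.
		fmt.Printf("replaying offset %d: %s\n", m.Offset, m.Value)
	}
}
```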

Systems-level event sourcing

Event sourcing is not limited to individual applications but can be used to organize entire systems. This picture shows a simplified version of Formulate's infrastructure.

Our customers' IT departments serve us raw files with sales data, promotion calendars, product hierarchies, etc. through an integration with their systems. These are put in file storage together with a reference in an ordered, ever-growing, append-only log (named "raw" above). One by one the files are materialized into a Postgres database that acts as our application state. Both our user interface (backend/frontend) and our analysis pipelines can then flexibly query this Postgres database to extract whatever information they need. The analysis pipelines write results to another Kafka topic (named “results” above) that gets materialized back into Postgres, just like the raw topic.
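
A materializer along these lines might look like the following sketch, which reads the raw topic and upserts each record into Postgres. The kafka-go and lib/pq libraries, the connection strings, and the raw_files table are assumptions made for illustration, not our actual schema.

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres driver
	"github.com/segmentio/kafka-go"
)

func main() {
	// Connection string and table are placeholders for this sketch.
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/app?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "raw",
		GroupID: "materializer",
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		// Upsert so that replaying the topic is idempotent: rebuilding
		// the whole database is just a matter of replaying from the start.
		_, err = db.ExecContext(context.Background(),
			`INSERT INTO raw_files (key, payload) VALUES ($1, $2)
			 ON CONFLICT (key) DO UPDATE SET payload = EXCLUDED.payload`,
			string(m.Key), m.Value)
		if err != nil {
			log.Fatal(err)
		}
	}
}
```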

Should any service crash it will automatically restart and continue where it left off. If we want to update and redeploy a service it is safe to take down the current version and put up the updated version without interfering with any of the other services. This setup even allows us to radically redesign the schema of our Postgres database, throw away the old version, and rematerialize all data back into an updated database. Kafka holds the complete truth and Postgres only acts as a view of that. In Kafka the data is stored in a format that is maximally expressive and complete, whereas in Postgres it is simplified into a format that is optimized for fast querying.

For performance reasons, the data generated by users in the app flows in a different direction, more as a backup than as the main driver. It can nonetheless be materialized back if we need it later.

Decoupling by message passing

There are three fundamental ways that programs exchange information with one another:

  • Commands — "Please perform an action for me" or "Please update your state to this new value".

  • Queries — "What is your state?" or "What is the state of the world?"

  • Messages — "This happened".

Event sourcing is done by message passing, which differs from commands and queries in that it is asynchronous. Asynchronous means that the writer of information does not wait for or adapt to the needs of any particular reader. Of course everyone sharing the same topic must agree on the message format, but the writers and readers do not have to care about who anyone else is or what they will do with the information. They are said to be loosely coupled. For example, if we want to add a metrics tracking service to the data ingestion pipeline or the analysis orchestration, we just have it read the same logs and don't have to modify any of the existing services.
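
For instance, a hypothetical metrics service could be added like the sketch below: it reads the same topic under its own consumer group, so it gets an independent read position and no existing producer or consumer needs to change. Topic and group names are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// A new service taps the existing "raw" topic simply by consuming
	// it under its own group ID; each group keeps its own read position,
	// so producers and other consumers are unaffected and unaware of it.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "raw",
		GroupID: "metrics-tracker",
	})
	defer r.Close()

	var count int
	for {
		if _, err := r.ReadMessage(context.Background()); err != nil {
			log.Fatal(err)
		}
		count++
		log.Printf("files ingested so far: %d", count)
	}
}
```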

By contrast, commands and queries are synchronous, meaning that the requester has an open connection to the responder and waits for it to return its response. For the request to be successful, the requester must know a great deal about the responder:

  • Its address

  • Whether it's available at the moment

  • What requests it accepts

  • What it might return

  • How long to wait for a reply

  • What to do if it fails

etc.

In synchronous communication the coupling is much tighter. If either the requester or the responder is modified, the other must be modified too. If there are 20 consumers of a service, they must all be updated if the service updates. The more tightly coupled a system is, the harder it is to modify, and consequently the more costly it is to maintain.

To really hammer it in, the following pictures give a more tactile sense of it:

Tight coupling is like knitting. You can't easily modify the existing glove without breaking it. It's basically impossible to replace the red threads with yellow threads because they are so closely tied together with the rest of the glove. If you have to replace a red thread you'll probably start over from scratch with an entirely new code base.

Loose coupling is like beads. You can easily replace any bead with minimal impact to the overall system. Notice though that although the beads are in fact not coupled to each other at all, they are all tightly coupled to the bottom plate. There has to be at least some coupling for any information exchange to happen, but hopefully the plate rarely changes.

Tight and loose coupling bring different properties to your system (fast and rich vs. reliable and cheap). In most real applications you need a mix of both tightly and loosely coupled parts, like this Lego town. Anyone can see that replacing one house won't affect the others, or the bus in the background. They are loosely coupled. Within each house the bricks are more tightly coupled, but you can still take them apart and add a floor relatively easily.

Going back to Formulate's system, we see that it employs a mix of communication types. The black arrows show commands, i.e. please execute an action or update your state. The red arrows show queries, i.e. please tell me something about the state of the world. The blue arrows show message passing, i.e. just logging that something happened.

Too tight coupling is a common problem, but too loose rarely is. Make sure you know the tools of tight and loose coupling! Otherwise you might find yourself using tools that don't fit the problem and end up with high maintenance costs.

Interactive demo

Runnable example at github.com/formulatehq/event-sourcing-in-go.

The analysis orchestration engine at Formulate illustrates the points above well. It consists of two services, tracker and worker, that communicate using message passing and build up state using event sourcing (only the tracker; the workers are stateless).

The workers pull jobs off a log, fit models, and write results to another log. The job logs are populated by the tracker, which has a schedule and accepts incoming ad hoc analysis requests from our data scientists. Oftentimes a request consists of a large collection of jobs that must be executed in a particular order, so the tracker takes care of the coordination, using event sourcing to manage its state.
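
A stripped-down worker might look like the sketch below: it consumes jobs from one topic under a shared consumer group (so jobs are spread across workers), runs the analysis, and appends the result to another topic. The topic names, group name, and the fitModel stand-in are invented for illustration.

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

// fitModel stands in for the actual analysis; in this sketch it just
// echoes the job payload back as a result.
func fitModel(job []byte) []byte { return job }

func main() {
	// All workers share one consumer group, so the job partitions are
	// spread across them and each job is handled by a single worker.
	jobs := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "jobs",
		GroupID: "workers",
	})
	defer jobs.Close()

	results := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "results",
	}
	defer results.Close()

	for {
		m, err := jobs.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		out := fitModel(m.Value)
		// Append the result to the results log for downstream consumers.
		if err := results.WriteMessages(context.Background(),
			kafka.Message{Key: m.Key, Value: out}); err != nil {
			log.Fatal(err)
		}
	}
}
```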

Since some pipelines take hours to run, event sourcing is a great fit: it lets us stop, start, and work on the tracker however we like without disturbing the workers. When it comes back online it rebuilds its state and continues. If new messages come in while it's unavailable that's OK too, because Kafka keeps track of how far it has read.

Essentially it works like an airport. The tower manages the state, and workers get instructed to check in luggage, load bags, refuel, and are given permission to take off and land. When there's a lot of work at check-in we can easily spin up many check-in workers and scale down once the burst is over.

Do I really need this? — Yes, most likely.

If you deal with information that gets updated you have state that needs to be managed. This can be done in two ways:

  • Store a single mutable copy of the state (mutable snapshot).

  • Store all historic versions or an incremental change log (event sourcing).

A mutable snapshot is easier to implement and requires less memory and disk than event sourcing. It also conveniently updates all references within the same application instance, but not in other instances. If the state is confined to a single instance, there is no concurrent reading and writing, and you don't care about history, then event sourcing is definitely overkill. However, most applications do not fit that description and would be better off with event sourcing.

Recommended talks, pods, and reads

If you'd like to dig deeper into this topic I can highly recommend the following material:
