Event Sourcing with Kafka in Go
This post is a transcript of a technical talk given by Formulate at Stockholm Go Conference 2018.
Many real-world engineering problems involve processing information that changes over time. If multiple people, machines, or services need to access the same information, it is critical that it stays consistent across users as it evolves (stays in sync without contradictions). Event sourcing is a powerful technique for ensuring that is the case. At Formulate we use it throughout our infrastructure, implemented with a tool called Kafka.
Kafka crash course
Kafka is a publish/subscribe message broker. Think of it as a niche database that is optimized for writing fast and reading back information in the same order it was written. It does not impose any particular structure on the data: every record is just a blob of bytes. Nor is it queryable like traditional relational databases such as Oracle or Postgres. The lack of high-level features brings Kafka close to the metal, making it highly performant.
A Kafka cluster consists of multiple brokers. Each of these holds an eventually complete copy of the whole dataset, synchronizing as data comes in. If one broker becomes unavailable, the remaining brokers cover for it until it is back online. This makes it fault-tolerant.
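To make the log model concrete, here is a minimal in-memory sketch of what a Kafka topic is conceptually: an ordered, append-only sequence of byte records, read back in write order from any offset. The names (`Log`, `Append`, `Read`) are illustrative only and are not part of any real Kafka client API.

```go
package main

import "fmt"

// Log is a toy model of a Kafka topic: an ordered, append-only
// sequence of byte records.
type Log struct {
	records [][]byte
}

// Append writes a record at the end of the log and returns its offset.
func (l *Log) Append(record []byte) int {
	l.records = append(l.records, record)
	return len(l.records) - 1
}

// Read returns all records from the given offset onward, in the
// exact order they were written.
func (l *Log) Read(offset int) [][]byte {
	return l.records[offset:]
}

func main() {
	var log Log
	log.Append([]byte("first"))
	log.Append([]byte("second"))
	for i, r := range log.Read(0) {
		fmt.Printf("%d: %s\n", i, r)
	}
}
```

A real broker adds persistence, replication across brokers, and partitioning on top of this, but the core abstraction is exactly this ordered log.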
The state of an application is the information it contains at a given point in time. With event sourcing you don't just keep track of the current snapshot of the state, but all the states it has gone through to get there.
Essentially it works like accounting. There you are very much interested in how you arrived at a final balance, not just the figure (and you're in fact required by law to keep track of it).
With Kafka, all writes to the log are persisted on disk before they are available for consumption. This is great for debugging and analytics, as it allows you to go back and forth through time, inspecting any historic state you might be interested in. It also allows you to restart a program at any point you like, which is great if you want to update it, rerun a part of it, or if it crashes for external reasons.
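Sticking with the accounting analogy, a small sketch of what "inspecting any historic state" means in practice: the current state is just a fold over the events, so replaying a prefix of the ledger reconstructs the balance at any point in time. `Event` and `Replay` are illustrative names for this post, not a library API.

```go
package main

import "fmt"

// Event is a ledger entry: positive for a deposit, negative for a
// withdrawal.
type Event struct {
	Amount int
}

// Replay folds the first n events into a balance. Choosing n lets us
// inspect the state at any point in history.
func Replay(events []Event, n int) int {
	balance := 0
	for _, e := range events[:n] {
		balance += e.Amount
	}
	return balance
}

func main() {
	ledger := []Event{{100}, {-30}, {50}}
	fmt.Println(Replay(ledger, len(ledger))) // the final balance
	fmt.Println(Replay(ledger, 2))           // the state after two events
}
```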
Systems-level event sourcing
Event sourcing is not limited to individual applications but can be used to organize entire systems. This picture shows a simplified version of Formulate's infrastructure.
Our customers' IT departments serve us raw files with sales data, promotion calendars, product hierarchies, etc. via an integration to their systems. These are put in file storage together with a reference in an ordered, ever-growing, append-only log (named "raw" above). One by one the files are materialized into a Postgres database that acts as our application state. Both our user interface (backend/frontend) and our analysis pipelines can then flexibly query this Postgres database to extract whatever information they need. The analysis pipelines write results to another Kafka topic (named "results" above) that gets materialized back into Postgres, just like the raw topic.
Should any service crash, it will automatically restart and continue where it left off. If we want to update and redeploy a service, it is safe to take down the current version and put the updated version back without interfering with any of the other services. This setup even allows us to radically redesign the schema of our Postgres database: throw away the old version and rematerialize all data into an updated database. Kafka holds the complete truth and Postgres only acts as a view of it. In Kafka the data is stored in a format that is maximally expressive and complete, whereas in Postgres it is simplified into a format that is optimized for fast querying.
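The "database as a view" idea can be sketched in a few lines. Here the log is the source of truth and the map stands in for the Postgres view; because the view is derived, it can be thrown away and rebuilt from scratch at any time. `SetEvent` and `Materialize` are hypothetical names for illustration.

```go
package main

import "fmt"

// SetEvent records that a key was set to a value. The log of these
// events is the source of truth.
type SetEvent struct {
	Key, Value string
}

// Materialize rebuilds a queryable view from scratch by replaying
// the whole log, applying events in order.
func Materialize(log []SetEvent) map[string]string {
	view := make(map[string]string)
	for _, e := range log {
		view[e.Key] = e.Value // later events overwrite earlier ones
	}
	return view
}

func main() {
	log := []SetEvent{{"price", "10"}, {"price", "12"}, {"name", "widget"}}

	view := Materialize(log)
	fmt.Println(view["price"], view["name"])

	// Schema change? Discard the view and rematerialize from the log.
	rebuilt := Materialize(log)
	fmt.Println(len(rebuilt))
}
```

Note that the view only keeps the latest value per key, while the log keeps every change; that is precisely the expressive-versus-query-optimized trade-off described above.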
For performance reasons, the data generated by users in the app flows in a different direction, more as a backup than a main driver. It can nonetheless be materialized back if we need it later.
Decoupling by message passing
There are three fundamental ways that programs exchange information with one another:
Commands — "Please perform an action for me" or "Please update your state to this new value".
Queries — "What is your state?" or "What is the state of the world?"
Messages — "This happened".
Event sourcing is done by message passing, which differs from commands and queries in that it is asynchronous. Asynchronous means that the writer of information does not wait for, or adapt to the needs of, any particular reader. Of course, everyone sharing the same topic must agree on the message format, but writers and readers do not have to care about who anyone else is or what they will do with the information. They are said to be loosely coupled. For example, if we want to add a metrics-tracking service to the data ingestion pipeline or the analysis orchestration, we just have it read the same logs and don't have to modify any of the existing services.
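A minimal sketch of that loose coupling: a writer publishes to a shared topic, and each reader keeps its own offset and consumes independently. Adding a new reader, say a metrics service, requires no change whatsoever to the writer. The `Topic` type and its methods are illustrative, not a real client library.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is a shared, append-only list of messages. Writers append;
// readers poll from their own offsets, independently of each other.
type Topic struct {
	mu   sync.Mutex
	msgs []string
}

// Publish appends a message; the writer knows nothing about readers.
func (t *Topic) Publish(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.msgs = append(t.msgs, msg)
}

// Poll returns the messages past the caller's offset along with the
// new offset to resume from.
func (t *Topic) Poll(offset int) ([]string, int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.msgs[offset:], len(t.msgs)
}

func main() {
	var topic Topic
	topic.Publish("file ingested")
	topic.Publish("analysis done")

	// Two independent readers, each with its own position in the log.
	appMsgs, _ := topic.Poll(0)
	lateMetricsMsgs, _ := topic.Poll(1) // a reader added later
	fmt.Println(len(appMsgs), len(lateMetricsMsgs))
}
```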
By contrast, commands and queries are synchronous, meaning that the requester has an open connection to the responder and waits for it to return its response. For the request to be successful, the requester must know a great deal about the responder:
If it's available at the moment
What requests it accepts
What it might return
How long to wait for a reply
What to do if it fails
In synchronous communication the coupling is much tighter. If either the requester or the responder is modified, the other must often be modified too. If there are 20 consumers of a service, they must all be updated when the service changes. The more tightly coupled a system is, the harder it is to modify, and consequently the more costly it is to maintain.
To really hammer this home, the following pictures give a more tactile sense of it:
Runnable example at github.com/formulatehq/event-sourcing-in-go.
The analysis orchestration engine at Formulate illustrates the points above well. It consists of two services, tracker and worker, that communicate using message passing and build up state using event sourcing (the tracker only; the worker is stateless).
Since some pipelines take hours to run, event sourcing is a great fit: it lets us stop, start, and work on the tracker however we like without disturbing the workers. When the tracker comes back online it rebuilds its state and continues. If new messages arrive while it is unavailable, that is fine too, because Kafka keeps track of how far it has read.
Essentially it works like an airport. The tower manages the state and instructs workers to check in luggage, load bags, and fill up fuel, and it gives permission to take off and land. When there is a lot of work at check-in we can easily spin up many check-in workers and scale down once the burst is over.
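Because the workers are stateless, scaling them is just a matter of starting more of them against the same queue of instructions. A minimal sketch with goroutines, where `process` is a hypothetical helper and the worker count is a plain parameter:

```go
package main

import (
	"fmt"
	"sync"
)

// process fans tasks out to the given number of stateless workers and
// returns how many tasks were handled in total.
func process(tasks []string, workers int) int {
	queue := make(chan string, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	close(queue)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range queue { // each worker pulls whatever is next
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return handled
}

func main() {
	tasks := []string{"check in luggage", "load bags", "fill up fuel"}
	// During a burst, simply raise the worker count.
	fmt.Println(process(tasks, 2))
}
```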
Do I really need this? — Yes, most likely.
If you deal with information that gets updated you have state that needs to be managed. This can be done in two ways:
Store a single mutable copy of the state (mutable snapshot).
Store all historic versions or an incremental change log (event sourcing).
A mutable snapshot is easier to implement and requires less memory and disk than event sourcing. It also conveniently updates all references within the same application instance, but not in other instances. If the state is confined to a single instance, there is no concurrent reading and writing, and you don't care about history, then event sourcing is definitely overkill. However, most applications do not fit that description and would be better off with event sourcing.
Recommended talks, pods, and reads
If you'd like to dig deeper into this topic, I can highly recommend the following material:
Simple Made Easy, Rich Hickey (2011)
The Language of the System — Rich Hickey (2013)
Turning the Database Inside Out with Apache Samza — Martin Kleppmann (2015)
Software Engineering Daily / Kafka — Podcast with Jeff Meyerson
Kafka in a Nutshell — Kevin Sookocheff (2015)
Event sourcing — Martin Fowler (2005)