Sep 18, 2015 by Parker DeBardelaben Data

Powering MailChimp Pro Reporting

This week, we launched MailChimp Pro, a powerful new feature set for quickly growing businesses with lists of 50,000 or more that want to optimize their email marketing. In order to build MailChimp Pro, which is our most powerful set of tools so far, our Ops team would first need a data infrastructure with:

  • Hundreds of servers across 2 datacenters that cost millions of dollars
  • 7 days (or 60TB) of events from our applications in Kafka
  • The ability to process 400,000 events/second, with room to grow much larger

And we'd have to do it with a small team of engineers. In fact, only 2-6 at any given time would be working on what would become MailChimp's data pipeline.

But in this post, we're going to dive specifically into some of the infrastructure we've built to support Pro's Comparative Reports feature. Comparative Reports was a massive undertaking that involved tremendous engineering and design effort on the application side. It also uses a new internal service we built called ChimpQuery, which denormalizes data from MailChimp into a table structure optimized for reporting and segmentation. All subscriber and campaign data for Pro users is synchronized from MailChimp into ChimpQuery within seconds of arriving on our servers, utilizing our brand new data pipeline which was built on top of Apache Kafka. Even more exciting, this infrastructure will power many new features and applications in the future. (Keep your eyes peeled for Pro Segmentation in an upcoming release, for example.) Phew!

But wait, how did we get here?

A long time ago...

ChimpQuery isn't our first attempt to synchronize data from MailChimp into other systems. The Email Genome Project is built on what we might call data pipeline v1.0. Email data is written out to flat files, copied to a centralized staging server and consumed into a Redis cluster with 7 TB of RAM. While this works very well for this use case, there are a few pitfalls:

  • Dual write (file and database) means managing two locations, which makes it easier to fall out of sync.
  • Data coming from separate app servers has no order guarantee.
  • This approach doesn't handle multiple data consumers very easily.

Based on our experience with EGP and a few other systems, we set out to find a better solution that could be used to build larger systems.

Enter Kafka

We began exploring Apache Kafka about a year ago. Kafka offers a distributed log that's optimized for multiple consumers. If you're not familiar with Kafka or why unifying systems around logs is so useful, here's a helpful article. In a nutshell, Kafka offers topics, which could be thought of as directories, and within these topics exist partitions, which are individual log files.

Kafka is written in Scala, but our apps are written in PHP. As there was no current PHP client for Kafka, we had to write our own or add another language to our stack. We're careful about adding new technologies, only doing so when it serves a purpose that supports our business. We chose to use Scala for any code that would directly touch Kafka to avoid relying on a third-party library, and because much of the ecosystem around Kafka and other data-related technology is based on the JVM.

Two of our developers dove in and became comfortable enough with Scala to write the first version of our data pipeline, initially setting out to simply replace the act of copying files with the act of writing data into Kafka. We quickly learned that 1) Kafka would be a good fit for write once/read many, and 2) we'd have to get updates directly from MySQL to guarantee data consistency between MailChimp and our other systems. Shipping events programmatically from our code wasn't sufficient to capture everything.

Better, faster, stronger

We're heavy MySQL users. MailChimp has more than 100 distinct sets of instances and around 100 TB of primary, active data. MySQL has built-in replication, which we use for redundancy, ensuring that multiple copies of each instance are available for failover.

Replication comes in a few forms, and we use statement-based binlogs in our primary instances for a few different reasons. Statement-based replication works well for replicating to another MySQL instance, but if you want to have enough per-record detail for another system to easily get the details on every item and every change, row-based replication is the way to go. As part of building our data pipeline, we set up another layer of MySQL slaves (a new slave for each of those 100+ sets mentioned above) as the first step. This enables us to capture absolutely everything that happens in our applications and commit them to binlogs that contain a record of every row that's touched. This solved both our dual write and ordering problems because:

  • Users will hit random app servers, but each user's data lives on a single MySQL instance.
  • MySQL's binlog order is guaranteed. We preserve this order by writing binlog events into a single Kafka partition per instance.

Shipping binlogs into Kafka is simple enough, but that's only the tip of the iceberg — binlogs by themselves are still very raw and lack data important to the general consumer. We use 3 discrete stages to convert the binlog data to a generally consumable format, with the output of each stage stored in Kafka. This segmentation will allow us the flexibility to reuse and adapt our infrastructure for future projects.

The first stage consumes the binlog events (using this great MySQL binary log connector library) and spreads the data across many partitions to facilitate subsequent parallel processing. A second stage maintains empty replicas of the databases to which the events occurred in order to enrich the raw DML events. These replicas are updated by the DDL events in the stream in such a way as to ensure that each DML event is enriched by a replica table whose state is identical to that of the originating table at the time of the event's occurrence. The final third stage serializes the events into Thrift structs. Each of these stages is written in Scala, with the latter two using the Akka Streams library to regulate the flow of events (prevent buffer overflows, etc.) according to the reactive approach it embodies. We hit all sorts of challenges along the way that aren't reflected here, but we were able to work through them all.

Thrift objects are usable with practically any language, including PHP. One final consumer that's specific to ChimpQuery reads data from Kafka and queues it in Redis for PHP to consume. In the near future, we plan to generalize this Thrift consumer to support sending the same Thrift-serialized data to other systems.

ChimpQuery sits atop sharded PostgreSQL pairs (12 of them, currently) that are specifically optimized for the types of advanced reporting and segmentation required by MailChimp Pro. Ultimately, this provides an API back to our primary code base for our users that have Pro enabled. We use PostgreSQL schemas to isolate user data, preventing table and index data from commingling, and helping to mitigate the impact of large users on performance for smaller users. a galaxy far, far away

These days, we push hundreds of thousands of events per second to ChimpQuery through Kafka. It's the end result of an engineering effort that took more than a year with a small team. We've accomplished a lot, but have a lot more to build and support with our data pipeline as a centerpiece. This is only the first of many new features and products that can leverage this same infrastructure. And we're looking for smart people to help. If this sort of work sounds interesting to you, consider joining our Data Systems team.