Every day PlanetScale processes more than 10 billion of our customers’ queries. We need to collect, store, and serve telemetry data generated by these queries to power Insights, our built-in query performance tool. This post describes how we built a scalable telemetry pipeline using Apache Kafka and a sharded PlanetScale database.
To power Insights, we pull from three datasets:
- Database-level time series data (e.g., queries per second across the entire database).
- Query pattern-level time series data (e.g., p95 latency for a single query pattern like `SELECT email FROM users where id = %`).
- Data on specific query executions for slow/expensive queries (the “slow query log”).
The database-level data fits well into a time series database like Prometheus, but we ran into several issues when trying to store the query pattern-level data the same way:
- On any given day, tens of millions of unique query patterns are used across PlanetScale, and we anticipate that this number will continue to grow. Most time series databases struggle when dimension cardinality is that high. We evaluated the cost of storing this data in Prometheus and found it expensive enough to be unattractive.
- We need to store additional data for each query pattern aggregation, such as the complete normalized SQL, the tables used, and the last time the query was executed.
- To power Insights search, we need to be able to filter query patterns in sophisticated ways like substring matching against normalized SQL.
Given these requirements, we built a hybrid solution that uses Prometheus for database-level aggregations (where cardinality is suitably low) and a sharded PlanetScale database, backed by MySQL and Vitess, to store query-pattern level statistics and individual slow query events.
The Insights pipeline begins in VTGate. VTGate is a Vitess component that proxies query traffic to the underlying MySQL instances. We’ve added instrumentation to our internal build of Vitess that does the following (in addition to serving metrics that Prometheus scrapes):
- Every 15 seconds, sends an aggregate summary for each query fingerprint to Kafka (for more on how we determine fingerprints, read this blog post about query performance analysis). This interval balances keeping the number of messages manageable against providing a near real-time experience.
- Sends slow query events to a Kafka topic immediately.
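The production instrumentation lives in our internal Vitess build and is written in Go; the Python sketch below illustrates the per-fingerprint aggregation pattern, with hypothetical field names. A background timer would call `flush()` every 15 seconds and publish the snapshot to Kafka:

```python
from collections import defaultdict


def _empty_stats():
    # Illustrative fields; the real summaries carry more detail.
    return {"count": 0, "total_ms": 0.0, "max_ms": 0.0}


class FingerprintAggregator:
    """Accumulates per-fingerprint stats between flushes."""

    def __init__(self):
        self.stats = defaultdict(_empty_stats)

    def record(self, fingerprint, latency_ms):
        s = self.stats[fingerprint]
        s["count"] += 1
        s["total_ms"] += latency_ms
        s["max_ms"] = max(s["max_ms"], latency_ms)

    def flush(self):
        # Snapshot and reset; the snapshot would be serialized to Kafka.
        snapshot = dict(self.stats)
        self.stats = defaultdict(_empty_stats)
        return snapshot
```

Keeping only a fixed-size summary per fingerprint (rather than per-query records) is what keeps the 15-second messages small regardless of query volume.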
A primary design goal for Insights is that the instrumentation should never slow your database down or cause unavailability. We impose several limits at the instrumentation site to ensure this:
- We set a limit for the number of unique query patterns per interval. Since every unique query requires memory to track in VTGate, we need to ensure that we don’t consume an unbounded amount of memory if a database sees an enormous number of unique query patterns very quickly. We monitor VTGates to ensure that even our largest customers aren’t regularly exceeding this threshold.
- We limit the number of recorded slow query log events using a continuously refilled token bucket rate limiter with a generous initial capacity. This allows us to capture bursts of slow queries but limit overall throughput. Typically you don’t need to see hundreds of examples of the same slow query, so this doesn’t detract from the product.
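A continuously refilled token bucket can be sketched as follows (the capacity and refill rate here are illustrative, not our production values):

```python
import time


class TokenBucket:
    """Rate limiter that allows bursts up to `capacity` events."""

    def __init__(self, capacity, refill_per_second):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.tokens = float(capacity)  # start full: a generous initial burst
        self.last_refill = time.monotonic()

    def allow(self):
        # Refill proportionally to elapsed time, capped at capacity.
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.last_refill = now
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_per_second)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # over the limit: skip recording this slow query event
```

The full initial bucket is what lets a sudden burst of slow queries be captured, while the refill rate caps sustained throughput.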
Data collected in VTGate is published to a bounded in-memory buffer and flushed to Kafka asynchronously. Asynchronous publication minimizes per-query overhead and ensures we continue serving queries even during a Kafka outage. We guard against temporary Kafka unavailability by buffering up to 5MB of data, which is sent when Kafka becomes available again.
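A minimal sketch of the bounded buffer, assuming a 5MB cap and a drop-on-overflow policy (VTGate's exact overflow behavior may differ; class and method names are hypothetical):

```python
import threading
from collections import deque


class BoundedKafkaBuffer:
    """Accepts payloads until a byte cap is hit, then drops new ones."""

    def __init__(self, max_bytes=5 * 1024 * 1024):
        self.max_bytes = max_bytes
        self.used = 0
        self.queue = deque()
        self.lock = threading.Lock()

    def publish(self, payload):
        with self.lock:
            if self.used + len(payload) > self.max_bytes:
                return False  # drop rather than block the query path
            self.queue.append(payload)
            self.used += len(payload)
            return True

    def drain(self):
        # Called by the asynchronous flusher when Kafka is reachable.
        with self.lock:
            batch = list(self.queue)
            self.queue.clear()
            self.used = 0
        return batch
```

Dropping telemetry under sustained unavailability is the deliberate trade-off: losing a few minutes of Insights data is acceptable, slowing customer queries is not.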
Data is read from Kafka by our application and written to MySQL. The query pattern data is aggregated by time in the database. We store the query pattern data in both per-hour and per-minute roll-up tables to serve large time periods efficiently and small periods with high granularity. Slow query events are written one-to-one into a MySQL table. For both the aggregate and slow query topics, we track the offset and partition from the underlying Kafka messages in the MySQL tables and use uniqueness constraints to avoid duplicating data if Kafka consumers retry batches following a failure.
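The retry-safety idea can be illustrated in miniature. In the real pipeline the deduplication is a MySQL uniqueness constraint over the Kafka partition and offset columns, which the in-memory `seen` set stands in for here (class and method names are hypothetical):

```python
class IdempotentWriter:
    """Toy model of retry-safe Kafka consumption.

    In MySQL, a duplicate insert from a retried batch simply violates
    the uniqueness constraint instead of duplicating data.
    """

    def __init__(self):
        self.seen = set()  # stands in for a UNIQUE KEY (partition, offset)
        self.rows = []

    def write(self, partition, offset, row):
        if (partition, offset) in self.seen:
            return False  # duplicate delivery after a consumer retry
        self.seen.add((partition, offset))
        self.rows.append(row)
        return True
```

Because Kafka offsets are unique within a partition, the (partition, offset) pair identifies a message exactly once, making redelivered batches harmless.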
Aggregate query data is mapped to Kafka partitions by setting the Kafka key to a deterministic hash of the database identifier and the query fingerprint. Because of this, all messages for a given database/query pattern arrive in the same partition, and we can merge aggregate Kafka messages in memory for each consumer batch to avoid unnecessary database writes. In practice, we've found that in-memory coalescing decreases database writes by about 30% to 40%. Larger batches yield better write coalescing but require more memory in the consumer and increase end-to-end latency. Under normal operation, the average batch size is around 200 messages but can go as high as 1,000 if there is a load spike or we're working through a Kafka backlog. The higher coalesce rate in larger batches helps us quickly burn down message backlogs when they occur.
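A sketch of the keying and coalescing steps, assuming a simple hash-mod partitioner (a stand-in for Kafka's own key-based partitioning) and a hypothetical message shape with `count` and `total_ms` fields:

```python
import hashlib

NUM_PARTITIONS = 32  # hypothetical; set by the topic configuration


def partition_for(database_id, fingerprint):
    """Deterministically map a (database, fingerprint) key to a partition.

    Same key, same partition: all messages for one query pattern
    land together, so a single consumer sees all of them.
    """
    key = f"{database_id}:{fingerprint}".encode()
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


def coalesce(batch):
    """Merge messages sharing a key before writing, to cut database writes."""
    merged = {}
    for msg in batch:
        key = (msg["database_id"], msg["fingerprint"])
        if key in merged:
            merged[key]["count"] += msg["count"]
            merged[key]["total_ms"] += msg["total_ms"]
        else:
            merged[key] = dict(msg)
    return list(merged.values())
```

Coalescing only works because of the partitioning step: if messages for one query pattern were spread across partitions, no single consumer could merge them.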
The Kafka consumers issue about 5k writes per second to the MySQL database, and we need to be ready to scale this out as PlanetScale grows. To ensure that the database doesn’t become a bottleneck, we sharded the Insights database cluster based on the customer database ID. (If you want to learn more about sharding, read our blog post on how database sharding works). Database ID works well as a shard key because we never need to join data across customer databases, and it results in a fairly even distribution of data across shards.
Insights originally shipped with four shards, but we increased this to eight earlier this year to keep up with increased write volume and to build operational experience with resharding. Vitess can reshard an actively used database, but we opted to provision a new, larger PlanetScale database when we needed to increase the number of shards. Since Insights currently stores eight days of data, we provisioned a new set of consumers, let the new branch receive duplicate writes for eight days, and then cut the application over to read from the new database. This method allowed us to test and gain confidence in the new cluster before placing it in the critical path. Based on load tests and resource utilization metrics in production, we've found that our maximum write throughput has so far scaled linearly with the number of shards.
We’ve successfully run the Insights database cluster on fairly small machines (2 vCPUs and 2GB memory). A larger number of smaller shards keeps backups and schema changes fast, gives us the option of quickly scaling up to larger hardware if we encounter an unexpected throughput increase, and gives us breathing room to backfill a new cluster with more shards when necessary.
Time series latency percentiles are critical both at the database level, to monitor overall health, and at the per-query-pattern level, to spot problematic queries. The database-level data is stored in Prometheus, so we can use its built-in quantile estimation tools. Since we're storing the query pattern data in MySQL, though, we had to find a way to store and retrieve percentile data without the help of any built-in functions.
As a brief refresher, a percentile is a summary statistic generated from a set of observations. If the 95th percentile of query latency is 100ms, 95% of the observed queries were faster than 100ms, and 5% were slower. Percentiles are typically more useful than simpler statistics like the mean because they give you a more concrete idea of the actual performance of your system. For example, if the mean response time for a simple lookup query is 100ms, is your query fast enough? It could be that response times cluster tightly around 100ms, in which case you probably need to find a way to improve performance. Or it could be that the vast majority of queries take a few milliseconds but a single query took 30s, in which case there's probably nothing to be improved. If you know the 50th percentile query latency is 100ms, on the other hand, you know that half the time your query executes, it takes more than 100ms, so there's definitely room for improvement.
Calculating percentiles is harder than calculating averages, though. The most straightforward way to determine the nth percentile is to record every observation, sort the values, and return the value n% of the way through the sorted list. This would require collecting and storing raw latencies for every single query, which is impractical at scale. Another approach would be to precompute percentiles at the instrumentation site, but then we run into another problem: we need to combine percentiles to merge data from multiple sources or roll them up into larger time buckets, and averaging percentiles does not yield statistically meaningful results.
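For reference, the straightforward approach looks like this (the nearest-rank method, fine for small datasets but impractical at our volume because it needs every raw observation):

```python
import math


def percentile(observations, p):
    """Exact pth percentile via the nearest-rank method.

    Requires holding and sorting every observation, which is
    exactly what we cannot afford at billions of queries per day.
    """
    ordered = sorted(observations)
    rank = math.ceil(p / 100 * len(ordered))  # 1-based rank
    return ordered[max(rank, 1) - 1]
```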
To efficiently collect and store percentile data that can be combined in a valid way, we decided to use DDSketch. DDSketch is a probabilistic data structure and set of algorithms built to compute error-bounded quantile estimates. DDSketches are fast to compute, bounded in size, and can be merged without losing statistical validity. The core idea is that a set of observations can be approximated by grouping values into buckets with exponentially increasing widths, and then storing a count of occurrences for each bucket. Quantiles can be calculated by storing the buckets in sorted order and finding the bucket that contains the nth percentile value. Sketches can then be merged by summing the bucket counts. The accuracy of a DDSketch is determined by a parameter, α, which controls bucket width and bounds the relative error of quantile estimates. Setting a lower α yields more accurate quantiles at the cost of increased sketch size. We're using α=0.01, which is sufficiently accurate (estimates can be off by at most 1%) and yields suitably small sketches.
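A simplified DDSketch for positive values might look like the sketch below. This captures the core idea only; the full algorithm also handles bucket-count limits and values at or near zero:

```python
import math
from collections import Counter


class DDSketch:
    """Simplified DDSketch: log-spaced buckets, relative error <= alpha.

    Merging two sketches is just summing their bucket counts, which is
    what makes roll-ups across sources and time buckets valid.
    """

    def __init__(self, alpha=0.01):
        self.gamma = (1 + alpha) / (1 - alpha)
        self.buckets = Counter()  # bucket key -> observation count
        self.count = 0

    def add(self, value):
        # Bucket i covers (gamma^(i-1), gamma^i].
        key = math.ceil(math.log(value, self.gamma))
        self.buckets[key] += 1
        self.count += 1

    def merge(self, other):
        self.buckets.update(other.buckets)
        self.count += other.count

    def quantile(self, q):
        # Walk the buckets in sorted order until we pass the target rank.
        rank = q * (self.count - 1)
        seen = 0
        for key in sorted(self.buckets):
            seen += self.buckets[key]
            if seen > rank:
                # Midpoint of the bucket's error bounds.
                return 2 * self.gamma ** key / (self.gamma + 1)
```

Note that the sketch never stores raw observations, only counts per bucket, so its size depends on the range of observed values rather than their number.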
Each VTGate instance records a sketch of the latencies for each query pattern and sends it along with the other aggregates every 15 seconds. The sketches are read from Kafka and written to MySQL in a custom binary format. We've implemented a small library of loadable C++ MySQL functions that read and write this binary format, allowing us to aggregate sketches and compute arbitrary percentiles in MySQL. Performing these operations in the database lets us calculate percentiles without pulling the underlying sketches into our application, and lets us use the full expressive power of SQL to get the data we need. We plan to open source the MySQL function library in the near future.
MySQL is not typically the first data store that comes to mind for time series data: it was not designed for it, it requires schemas, and it provides all manner of durability and transactional guarantees that are critical for application data but not strictly necessary in the time series domain. So why are we storing time series data in MySQL? There are several reasons why this made sense in our case:
- The high cardinality of our primary dimension (query pattern fingerprint) made using Prometheus and many other time-series databases prohibitively expensive.
- Our set of dimensions is well-known and changes infrequently.
- The product requires the ability to filter the dataset in ways that many time series databases do not support.
- We have a natural shard key.
A wide variety of OLAP databases could also serve our needs here, but all of them involve significant operational overhead and a steep learning curve. We were pleased that our problem fit nicely into sharded Vitess and MySQL and we could avoid deploying and maintaining an additional storage system. With Kafka and Vitess sharding, we can scale all of the components of the Insights pipeline as volume increases and we’re well positioned to keep up with PlanetScale’s growth.