There’s been a lot of buzz around the term “Highly Scalable Systems”, and in (almost) every backend interview you will encounter a question related to it. In the backend we either scale the services (our servers) or the database. In this post we will take data-driven decisions on how to scale the database, focusing on a particular table, say the posts table.
Note - The thresholds at which we change strategies and take new decisions are approximate, and our focus will be on a relational database like MySQL or PostgreSQL.
The Idea
So let’s consider that we are dealing with a single table, posts, that stores the posts created by users and is growing rapidly over time. This growth will force us to take data- and architecture-level decisions that help the database keep up.
We can assume the structure of the posts table looks like this:
CREATE TABLE posts (
    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    slug VARCHAR(255) NOT NULL UNIQUE,
    title VARCHAR(255) NOT NULL,
    content TEXT,
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
This is just an idealised posts table: user_id is the id of the user who created the post, and slug is a unique identifier assigned to the post on creation for frontend routing.
0 to 100k
Up to this point we are fine. We can keep creating new posts and easily fetch data from the table.
100k to 1 Million
For the first million posts we can apply different optimization techniques, add caching and scale vertically to squeeze the most out of a single database instance. The optimizations include:
- Proper indexing of the columns involved in WHERE and ORDER BY clauses, e.g. created_at DESC and (user_id, created_at).
- Query tuning: simplifying complex multi-table JOINs and even eliminating them where possible.
- Caching hot data in something like Redis.
- Using connection pooling.
- Vertical scaling: keep adding CPU(s) and memory to the instance.
-- example of indexes to speed up queries
CREATE INDEX idx_posts_created_at ON posts (created_at DESC);
CREATE INDEX idx_posts_user_created ON posts (user_id, created_at);
Disaster Alert: We will quickly hit the limits of CPU, memory and I/O on a single machine.
1 Million to 100 Million
Optimizations and vertical scaling quickly hit their limits and we have to make hard choices, like changing the architecture. Several issues appear:
- Feed queries hit the same index constantly.
- Disk I/O grows, buffer cache hit rate drops.
- Backups take longer, and complex queries (like shared posts, which can involve a self-join) become extremely slow.
Let’s have a look at the different approaches we can take to keep scaling the database:
Materialized Views
If we have joins, for example to display the author details of each post we have to join with the users table, and with increasing data JOINs become expensive. In such scenarios we can create materialized views and refresh them every few seconds, which lets us query a single precomputed table instead of joining multiple tables. (Materialized views are natively supported in PostgreSQL; in MySQL we would maintain an equivalent summary table ourselves.)
CREATE MATERIALIZED VIEW recent_posts_with_users AS
SELECT p.id, p.slug, p.title, p.content, p.created_at,
       u.id AS author_id, u.username, u.profile_image_url
FROM posts p
JOIN users u ON u.id = p.user_id
WHERE p.created_at > now() - interval '7 days';  -- the "recent" window is an assumption
Now we can avoid the JOIN and use a single query to fetch posts with author details. But materialized views come with an overhead: we need to refresh them at regular intervals using a trigger or a cron job.
REFRESH MATERIALIZED VIEW recent_posts_with_users;
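On PostgreSQL, one way to schedule the refresh is the pg_cron extension (an assumption here; a plain OS cron job calling psql works just as well). Note that REFRESH ... CONCURRENTLY requires a unique index on the view.

-- refresh every minute without blocking readers of the view
SELECT cron.schedule('refresh-recent-posts', '* * * * *',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY recent_posts_with_users');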
Note: In a production application the view would also need like counts, view counts, post score, comments and much more…
Searching
As the data grows, doing search or text matching in SQL becomes a really bad idea. We need a dedicated system that indexes the post contents and makes search, an essential feature for any application, fast. Elasticsearch is a common choice to ease this pain.
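For context, the kind of query that stops scaling is a leading-wildcard text match; it cannot use a normal B-tree index and forces a full table scan (the search term here is just an example):

-- no index can satisfy a leading wildcard, so this scans the whole table
SELECT id, slug, title
FROM posts
WHERE content LIKE '%database scaling%';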
Separating Reads & Writes
For a blog platform the read volume is typically far higher than the write volume, so one prominent idea is to separate reads and writes onto independent instances using a primary-replica (master-slave) architecture. Here’s how it works:
- A user creates a post (a write) -> this is routed to the Primary.
- The Primary asynchronously copies the new post to all the Replicas.
- A user views a post (a read) -> this is routed to one of the Replicas.
Over time, based on the read percentage, we keep increasing the number of replicas behind the primary.
Disaster Alert: Since the primary updates the replicas asynchronously, we will encounter Replication Lag.
Replication lag is the situation where recent writes are not yet reflected on the replicas, a common problem in any multi-replica setup. We can work around it by forcing reads that immediately follow a write (read-your-own-writes) to go to the primary instead of a replica.
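As a quick sanity check, on a PostgreSQL replica we can measure roughly how far behind it is (MySQL exposes similar information via SHOW REPLICA STATUS):

-- run on a replica: approximate replication lag
SELECT now() - pg_last_xact_replay_timestamp() AS replication_lag;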
100 Million to 500 Million
The read problem was solved via replication, but now we face issues with writes. Write pressure becomes the biggest bottleneck, since every insert also has to maintain every index on an ever-growing table.
Table Partitioning
Partitioning splits a single large table into smaller physical parts (separate files) while the database still treats it as one logical table. It is very helpful when we query data by range, and it lets us delete or archive an entire range very quickly.
We can partition the table by created_at, which gives us faster scans (thanks to partition pruning) and easier deletion.
To partition data in MySQL there is an important requirement: every column used in the partitioning expression must be part of every unique key on the table, including the primary key. Since our posts table has only id as the primary key and slug as a unique key, neither of which includes created_at, we need to tweak the table.
Let’s look at the partitioning criteria that can give us the most benefit:
- We can partition by year. This is the most appropriate choice because posts power the homepage feed and most queries are over time ranges.
- Alternatively we can partition by user_id, but that does not help much, since feed queries span many users.
So we will first tweak the posts table to partition it by year.
Disaster Alert: Blindly adding created_at to our PRIMARY KEY on a 100M row production table will cause massive locking, index rebuilds, and downtime unless we do it carefully.
ALTER TABLE posts DROP PRIMARY KEY, ADD PRIMARY KEY (id, created_at);
-- the unique key on slug needs the same treatment: (slug, created_at)
Creating the partitioned table:
CREATE TABLE posts (
    id BIGINT NOT NULL AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    slug VARCHAR(255) NOT NULL,
    title VARCHAR(255) NOT NULL,
    content TEXT,
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at),
    UNIQUE KEY uniq_slug_created (slug, created_at),
    KEY idx_user_created (user_id, created_at)
)
PARTITION BY RANGE (YEAR(created_at)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION p2025 VALUES LESS THAN (2026)
);
In the future we can add further partitions with the following command:
ALTER TABLE posts
ADD PARTITION (PARTITION p2026 VALUES LESS THAN (2027));
There are several types of partitions available like RANGE, LIST, HASH, KEY & COLUMNS. If data is increasing aggressively, instead of yearly partitions, we can switch to monthly or even weekly partitions.
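As a rough sketch, switching the same table to monthly partitions could look like this (partition names and boundaries are just illustrative; repartitioning rebuilds the table, so on a large table it would go through the migration described in the note below):

-- monthly ranges expressed via TO_DAYS so they stay comparable
ALTER TABLE posts PARTITION BY RANGE (TO_DAYS(created_at)) (
    PARTITION p2025_01 VALUES LESS THAN (TO_DAYS('2025-02-01')),
    PARTITION p2025_02 VALUES LESS THAN (TO_DAYS('2025-03-01')),
    PARTITION p2025_03 VALUES LESS THAN (TO_DAYS('2025-04-01'))
);
-- new months are added ahead of time with ALTER TABLE ... ADD PARTITION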
Note: In production we would create a new partitioned table, route writes to it while backfilling old rows in small batches, build indexes concurrently, and then switch read queries over to the partitioned table.
Sharding
Partitioning helped a lot, but if you noticed, all the data still lives on the same machine! With writes increasing rapidly we can hit the limits of CPU, RAM, disk and IOPS very quickly, hence we have to opt for sharding.
Sharding splits the data across different database instances (servers). Each shard has its own schema and tables, but stores only part of the data. It removes the CPU/memory bottleneck of a single server.
Disaster Alert: Now we need a proxy or application-level routing logic to determine which shard contains the required data. (MongoDB has this shard-locating logic built in!)
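One common way to build that routing (among others, such as hashing the shard key) is a small directory table on a metadata database that maps user_id ranges to shards; the table and column names below are purely illustrative:

-- lives on a small metadata/config database, not on the shards themselves
CREATE TABLE shard_map (
    shard_id      INT PRIMARY KEY,
    user_id_start BIGINT NOT NULL,
    user_id_end   BIGINT NOT NULL,
    host          VARCHAR(255) NOT NULL
);

-- the application (or proxy) resolves the shard for a given author
SELECT shard_id, host
FROM shard_map
WHERE 42 BETWEEN user_id_start AND user_id_end;  -- 42 = the post author's user_id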
Note: If multiple tables are involved in joins, cross-shard joins are one of the hardest problems in sharding.
Denormalization
Denormalization in a relational database means intentionally storing redundant or duplicate data to improve performance, simplify queries, or avoid expensive joins—even though it violates strict normal forms. At scale, denormalization is a performance optimization, not a schema design flaw.
We are dealing with posts, but when we show them in the app they always carry user information. Currently we do not have any user data in the posts table, and hence we used a JOIN to fetch the user’s data. Now that we have shards, those JOINs become extremely tricky.
Denormalization can help here: we copy the user fields we need, such as username and profile_image_url, into the posts table.
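A minimal sketch of the schema change plus a one-time backfill (column types are assumptions, and on a very large table the backfill would run in small batches):

ALTER TABLE posts
    ADD COLUMN username VARCHAR(255),
    ADD COLUMN profile_image_url VARCHAR(512);

-- one-time backfill from the users table
-- (if users lives on another shard, this copy happens through the application instead)
UPDATE posts p
JOIN users u ON u.id = p.user_id
SET p.username = u.username,
    p.profile_image_url = u.profile_image_url;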
Disaster Alert: Now we need some way to manage the denormalized data whenever the related data changes.
For example, if a username changes we must be notified so that we can update it in our denormalized table. A good way to propagate such changes is an event-driven approach using Kafka or Pub/Sub!
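Whatever the transport, the consumer of a “username changed” event would ultimately run an update like this on every shard holding that user’s posts (the values are placeholders):

-- executed by the event consumer on each affected shard
UPDATE posts
SET username = 'new_username'
WHERE user_id = 42;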
500 Million to 10 Billion
We now have denormalized shards with an event-driven system to update/delete related data. But as we approach 1B posts, even individual shards can cross the limits of resources like CPU, memory, disk and I/O ops. A few things that help here:
- We can create multiple replicas for each shard.
- The replicas help us segregate reads & writes within each shard.
Archiving
The other helpful tactic is to identify rarely accessed rows (for example, posts older than a few years) and move them to cheaper cold storage.
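With the yearly partitions from earlier, archiving can be as cheap as copying a partition out and then dropping it (partition and table names are assumptions):

-- copy the old year into an archive table (or export it to object storage)
CREATE TABLE posts_archive_2023 AS
SELECT * FROM posts PARTITION (p2023);

-- dropping the partition afterwards is a fast metadata-level operation
ALTER TABLE posts DROP PARTITION p2023;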
After 10 Billion
This is a really huge data volume! To keep the database up and running we have to keep repeating the steps above:
- Create shards dynamically based on granular ranges.
- Create multiple replicas for each shard dynamically.
- Create partitions inside the shards.
- Keep archiving the unused old datasets.
- Implement strict observability, monitor slow queries and storage growth per shard.
- Use consistent hashing for even distribution, or a tool like Vitess for automatic re-sharding.
Consider changing the schema and, if feasible, moving to high-throughput storage systems like Cassandra, ClickHouse, BigQuery, etc.
Conclusion
In the end, scaling a huge table means making trade-offs. We give up some simple features so that we can gain speed and reliability across multiple servers. Hitting billions of rows works when we add smart routing, split our data in the right way, and use the right database tools for each type of work.
I tried my best to cover as much as possible, as clearly as possible, but we still skipped a few important aspects like the impact of migrations, the mechanics of cross-shard queries and detailed caching. I will try to cover these soon.
Stay tuned for more.