Sharding & Partitioning
Why Shard?
When a single database becomes too large or slow, sharding splits it into smaller pieces distributed across multiple servers.
Horizontal Partitioning (Sharding)
Horizontal partitioning splits a table by rows. Each partition (shard) contains different rows based on a partition key.
How Sharding Works
Key Concept: Rows are distributed across shards based on the shard key. All rows with the same shard key value go to the same shard.
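A toy sketch of that rule, with an illustrative shard count:

```python
NUM_SHARDS = 4  # illustrative

def shard_for(shard_key: int) -> int:
    # Deterministic: the same shard key always maps to the same shard
    return shard_key % NUM_SHARDS

# Every row for user 42 lands on the same shard
assert shard_for(42) == shard_for(42) == 2
```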
Shard Key Selection
Choosing the right shard key is critical for even distribution and query performance; the toy example after these lists shows why.
Good Shard Keys:
- ✅ user_id: Even distribution, most queries are user-specific
- ✅ country: Geographic distribution, queries often country-specific
- ✅ tenant_id: Multi-tenant applications
Bad Shard Keys:
- ❌ created_at: Skewed (recent data in one shard)
- ❌ status: Uneven distribution (most rows might be “active”)
- ❌ email: Can work but harder to distribute evenly
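As a toy illustration of the difference: here is a burst of signups within the same hour. Hashing the timestamp's hour prefix sends them all to one shard, while hashing the user ID spreads them out (all names and values below are hypothetical):

```python
from collections import Counter
import hashlib

def shard_of(key: str, num_shards: int = 4) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

# 60 signups within the same hour
created_at = [f"2024-01-01T10:{m:02d}" for m in range(60)]
user_ids = [str(1000 + n) for n in range(60)]

print(Counter(shard_of(ts[:13]) for ts in created_at))  # hour prefix: one hot shard
print(Counter(shard_of(uid) for uid in user_ids))       # roughly even spread
```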
Vertical Partitioning
Vertical partitioning splits a table by columns. Different columns are stored in different tables or databases (see the schema sketch after the use cases).
How Vertical Partitioning Works
Use Cases:
- Hot vs Cold data: Frequently accessed columns vs rarely accessed
- Large blobs: Store separately (e.g., images, documents)
- Different access patterns: Some columns read often, others write often
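A minimal schema sketch of the hot/cold split; the table and column names are hypothetical:

```python
# Hot table: small, frequently read columns
USERS_HOT = """
CREATE TABLE users_hot (
    id    BIGINT PRIMARY KEY,
    name  TEXT,
    email TEXT
)
"""

# Cold table: large or rarely read columns, joined on id only when needed
USERS_COLD = """
CREATE TABLE users_cold (
    user_id       BIGINT PRIMARY KEY REFERENCES users_hot(id),
    bio           TEXT,
    profile_photo BYTEA
)
"""
```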
Sharding Strategies
Strategy 1: Range-Based Sharding
Range-based sharding partitions data by value ranges (a routing sketch follows the pros and cons).
Pros:
- ✅ Simple to understand
- ✅ Easy range queries (e.g., “users 1-100”)
Cons:
- ❌ Can cause hotspots (if IDs are sequential)
- ❌ Hard to rebalance
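A minimal range-based router sketch, with hypothetical boundaries and shard handles:

```python
import bisect

class RangeShardRouter:
    def __init__(self, boundaries, shards):
        # boundaries[i] is the first key owned by shards[i + 1]
        self.boundaries = boundaries  # e.g. [1_000_000, 2_000_000]
        self.shards = shards          # len(shards) == len(boundaries) + 1

    def get_shard(self, shard_key: int):
        # Find which range the key falls into
        return self.shards[bisect.bisect_right(self.boundaries, shard_key)]

router = RangeShardRouter([1_000_000, 2_000_000], ["shard0", "shard1", "shard2"])
assert router.get_shard(42) == "shard0"          # users 1 to 1M
assert router.get_shard(1_500_000) == "shard1"   # users 1M to 2M
```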
Strategy 2: Hash-Based Sharding
Hash-based sharding uses a hash function on the shard key to determine the shard.
Pros:
- ✅ Even distribution
- ✅ No hotspots
- ✅ No lookup table needed (shard is computed directly)
Cons:
- ❌ Hard to do range queries (need to query all shards)
- ❌ Hard to rebalance (need to rehash everything)
Strategy 3: Directory-Based Sharding
Directory-based sharding uses a lookup table (directory) to map shard keys to shards (a minimal sketch follows the pros and cons).
Pros:
- ✅ Flexible (can move data easily)
- ✅ Easy to rebalance
- ✅ Can use any shard key
Cons:
- ❌ Single point of failure (directory)
- ❌ Extra lookup overhead
- ❌ Directory can become a bottleneck
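A minimal directory-based router sketch; in production the directory itself would live in a replicated store, and all names here are hypothetical:

```python
class DirectoryShardRouter:
    def __init__(self, shards):
        self.shards = shards  # name -> database connection
        self.directory = {}   # shard_key -> shard name (the lookup table)

    def assign(self, shard_key, shard_name):
        self.directory[shard_key] = shard_name

    def get_shard(self, shard_key):
        # One extra lookup per request: the cost of flexibility
        return self.shards[self.directory[shard_key]]

    def move(self, shard_key, new_shard_name):
        # Rebalancing a key is just a directory update (plus copying the data)
        self.directory[shard_key] = new_shard_name
```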
Challenges of Sharding
Challenge 1: Cross-Shard Queries
Problem: Queries that don't filter on the shard key must be sent to every shard.
Solution: Design the shard key to match query patterns. If queries are country-based, shard by country.
Challenge 2: Rebalancing
When adding or removing shards, data must be rebalanced.
Solution: Use consistent hashing or directory-based sharding for easier rebalancing.
Challenge 3: Cross-Shard Transactions
Problem: ACID transactions that span multiple shards need expensive coordination (e.g., two-phase commit).
Solution: Use the Saga pattern for distributed transactions, or design the schema to avoid cross-shard operations. A minimal Saga sketch follows.
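This is only the core idea of a Saga, assuming each step exposes an action and a compensating undo (all names are hypothetical):

```python
class Saga:
    def __init__(self):
        self.steps = []  # (action, compensation) pairs

    def add_step(self, action, compensation):
        self.steps.append((action, compensation))

    def execute(self):
        completed = []
        try:
            for action, compensation in self.steps:
                action()  # e.g. debit on shard A, credit on shard B
                completed.append(compensation)
        except Exception:
            # Instead of 2PC, undo completed steps in reverse order
            for compensation in reversed(completed):
                compensation()
            raise

# Hypothetical usage: a cross-shard transfer as two compensable steps
# saga = Saga()
# saga.add_step(lambda: debit(shard_a, 100), lambda: credit(shard_a, 100))
# saga.add_step(lambda: credit(shard_b, 100), lambda: debit(shard_b, 100))
# saga.execute()
```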
LLD ↔ HLD Connection
How sharding affects your class design:
Shard-Aware Repository
Section titled “Shard-Aware Repository”1import hashlib2
3class ShardRouter:4 def __init__(self, shards):5 self.shards = shards # List of database connections6 self.num_shards = len(shards)7
8 def get_shard(self, shard_key: int):9 # Hash-based sharding10 shard_index = shard_key % self.num_shards11 return self.shards[shard_index]12
13 def get_shard_by_user_id(self, user_id: int):14 return self.get_shard(user_id)15
16class ShardedUserRepository:17 def __init__(self, shard_router):18 self.router = shard_router19
20 def find_by_id(self, user_id: int):21 # Route to correct shard22 shard = self.router.get_shard_by_user_id(user_id)23 with shard.cursor() as cursor:24 cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))25 return cursor.fetchone()26
27 def save(self, user):28 # Route to correct shard29 shard = self.router.get_shard_by_user_id(user.id)30 with shard.cursor() as cursor:31 cursor.execute(32 "INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",33 (user.id, user.name, user.email)34 )35 shard.commit()36
37 def find_by_country(self, country: str):38 # Cross-shard query - query all shards39 results = []40 for shard in self.router.shards:41 with shard.cursor() as cursor:42 cursor.execute(43 "SELECT * FROM users WHERE country = %s",44 (country,)45 )46 results.extend(cursor.fetchall())47 return results1import java.sql.*;2import java.util.*;3
4public class ShardRouter {5 private final List<Connection> shards;6 private final int numShards;7
8 public ShardRouter(List<Connection> shards) {9 this.shards = shards;10 this.numShards = shards.size();11 }12
13 public Connection getShard(int shardKey) {14 // Hash-based sharding15 int shardIndex = shardKey % numShards;16 return shards.get(shardIndex);17 }18
19 public Connection getShardByUserId(int userId) {20 return getShard(userId);21 }22}23
24public class ShardedUserRepository {25 private final ShardRouter router;26
27 public ShardedUserRepository(ShardRouter router) {28 this.router = router;29 }30
31 public User findById(int userId) throws SQLException {32 // Route to correct shard33 Connection shard = router.getShardByUserId(userId);34 try (PreparedStatement stmt = shard.prepareStatement(35 "SELECT * FROM users WHERE id = ?"36 )) {37 stmt.setInt(1, userId);38 ResultSet rs = stmt.executeQuery();39 if (rs.next()) {40 return mapToUser(rs);41 }42 }43 return null;44 }45
46 public void save(User user) throws SQLException {47 // Route to correct shard48 Connection shard = router.getShardByUserId(user.getId());49 try (PreparedStatement stmt = shard.prepareStatement(50 "INSERT INTO users (id, name, email) VALUES (?, ?, ?)"51 )) {52 stmt.setInt(1, user.getId());53 stmt.setString(2, user.getName());54 stmt.setString(3, user.getEmail());55 stmt.executeUpdate();56 shard.commit();57 }58 }59
60 public List<User> findByCountry(String country) throws SQLException {61 // Cross-shard query - query all shards62 List<User> results = new ArrayList<>();63 for (Connection shard : router.shards) {64 try (PreparedStatement stmt = shard.prepareStatement(65 "SELECT * FROM users WHERE country = ?"66 )) {67 stmt.setString(1, country);68 ResultSet rs = stmt.executeQuery();69 while (rs.next()) {70 results.add(mapToUser(rs));71 }72 }73 }74 return results;75 }76
77 private User mapToUser(ResultSet rs) throws SQLException {78 // Map result set to User object79 return new User(rs.getInt("id"), rs.getString("name"), rs.getString("email"));80 }81}Deep Dive: Advanced Sharding Considerations
Shard Key Selection: The Critical Decision
Choosing the wrong shard key can kill your system. Here’s what senior engineers consider:
Hotspot Problem
Problem: Uneven distribution creates hotspots (one shard overloaded).
Example:
- Bad shard key: `created_at` (timestamp)
- Result: All new data goes to one shard → hotspot
- Impact: That shard becomes the bottleneck while the others sit idle
Solution:
- Good shard key: `user_id` (distributed evenly)
- Better: `hash(user_id)` for even distribution
- Best: Composite key `(tenant_id, user_id)` for multi-tenant apps
Production Example: Instagram
- Shard key: `user_id` (not `photo_id`)
- Why: Most queries are user-specific (“show my photos”)
- Result: User’s data in one shard → fast queries
- Trade-off: Cross-user queries hit all shards (acceptable)
Query Pattern Analysis
Senior engineers analyze query patterns before choosing a shard key:
Rule: Optimize shard key for 80% of queries, accept cross-shard for remaining 20%.
Consistent Hashing: Advanced Sharding Strategy
Consistent hashing solves the rebalancing problem elegantly.
How it works (a minimal ring sketch follows this list):
- Map shards and keys to a hash ring
- Each key maps to the next shard clockwise
- Adding/removing shards only affects adjacent keys
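A minimal hash ring with virtual nodes (a common refinement for smoother distribution); the hash function and vnode count are illustrative choices:

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards, vnodes=100):
        self._ring = []  # sorted list of (hash, shard) points on the ring
        for shard in shards:
            self.add_shard(shard, vnodes)

    def _hash(self, value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def add_shard(self, shard: str, vnodes: int = 100):
        # Each shard owns many points, smoothing the distribution
        for i in range(vnodes):
            bisect.insort(self._ring, (self._hash(f"{shard}#{i}"), shard))

    def remove_shard(self, shard: str):
        self._ring = [(h, s) for h, s in self._ring if s != shard]

    def get_shard(self, key: str) -> str:
        # Walk clockwise: first ring point at or after the key's hash
        idx = bisect.bisect_right(self._ring, (self._hash(key), ""))
        if idx == len(self._ring):
            idx = 0  # wrap around the ring
        return self._ring[idx][1]

ring = ConsistentHashRing(["shard-a", "shard-b", "shard-c"])
ring.add_shard("shard-d")  # only keys adjacent to shard-d's points move
```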
Benefits:
- ✅ Minimal rebalancing: Only ~1/N of keys move when adding a shard
- ✅ No directory needed: Direct hash calculation
- ✅ Handles failures: Failed shard’s keys redistribute automatically
Production Example: DynamoDB
- Uses consistent hashing for partition key
- Partition count: Automatically scales (adds partitions as data grows)
- Rebalancing: Seamless, minimal data movement
- Performance: O(1) lookup, no directory overhead
Shard Rebalancing: Production Challenges
Challenge 1: Zero-Downtime Rebalancing
Problem: Moving data between shards while serving traffic.
Solution: Dual-Write Pattern (sketched below): during migration, write to both the old and new shards, backfill historical data, verify, then cut reads over.
Timeline: Typically 1-7 days depending on data size
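A sketch of the dual-write phases; the shard handles and method names are hypothetical:

```python
class DualWriteMigration:
    """Phases: dual-write -> backfill -> verify -> cut over reads."""

    def __init__(self, old_shard, new_shard):
        self.old_shard = old_shard
        self.new_shard = new_shard

    def write(self, key, value):
        # Phase 1: every new write goes to BOTH shards
        self.old_shard.write(key, value)
        self.new_shard.write(key, value)

    def read(self, key):
        # Reads stay on the old shard until the new one is verified
        return self.old_shard.read(key)

    def backfill(self, keys):
        # Phase 2: copy historical rows in the background
        for key in keys:
            self.new_shard.write(key, self.old_shard.read(key))

    def verify(self, keys) -> bool:
        # Phase 3: check both shards agree before flipping reads
        return all(self.old_shard.read(k) == self.new_shard.read(k) for k in keys)
```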
Challenge 2: Handling Rebalancing Failures
Problem: What if rebalancing fails mid-way?
Solutions:
- Checkpointing: Save progress, resume from checkpoint
- Rollback: Keep old shard until new shard verified
- Monitoring: Alert on rebalancing failures
- Automation: Retry with exponential backoff
Production Pattern:
```python
class ShardRebalancer:
    def rebalance(self, old_shard, new_shard, key_range):
        checkpoint = self.load_checkpoint(key_range)
        for key in key_range:
            if key < checkpoint:
                continue  # Skip already copied

            try:
                # Copy data
                data = old_shard.read(key)
                new_shard.write(key, data)

                # Save checkpoint
                self.save_checkpoint(key)
            except Exception as e:
                # Log and continue (will retry)
                self.log_error(key, e)
                # Can rollback if needed
```
Cross-Shard Query Optimization
Strategy 1: Fan-Out Queries
Pattern: Query all shards in parallel, merge results.
Performance:
- Sequential: 4 shards × 50ms = 200ms total
- Parallel: max(50ms across shards) = 50ms total
- Improvement: 4x faster!
Code Pattern:
```python
import asyncio

async def cross_shard_query(self, query):
    # Query all shards in parallel
    tasks = [shard.query(query) for shard in self.shards]
    results = await asyncio.gather(*tasks)

    # Merge results
    return self.merge_results(results)
```
Limitations:
- Cost: N queries instead of 1
- Latency: Limited by slowest shard
- Consistency: Results may be from different points in time
Strategy 2: Materialized Views
Pattern: Pre-compute cross-shard aggregations (a refresh-job sketch follows the trade-offs).
Example:
- Base data: Sharded by `user_id`
- Materialized view: Aggregated by `country` (updated periodically)
- Query: “Users by country” → query the materialized view (single shard)
Trade-offs:
- ✅ Fast queries: Single-shard lookup
- ❌ Stale data: Updated periodically (eventual consistency)
- ❌ Storage: Extra storage for views
- ❌ Maintenance: Must keep views updated
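A periodic refresh job for the example above might look like this; the view table `users_by_country` and the PostgreSQL-style upsert are assumptions:

```python
def refresh_country_counts(shards, view_db):
    # Aggregate across all shards, then store the result in one place
    counts = {}
    for shard in shards:
        with shard.cursor() as cursor:
            cursor.execute("SELECT country, COUNT(*) FROM users GROUP BY country")
            for country, n in cursor.fetchall():
                counts[country] = counts.get(country, 0) + n

    # Upsert the aggregate into the single-shard view table
    with view_db.cursor() as cursor:
        for country, n in counts.items():
            cursor.execute(
                "INSERT INTO users_by_country (country, user_count) "
                "VALUES (%s, %s) "
                "ON CONFLICT (country) DO UPDATE SET user_count = EXCLUDED.user_count",
                (country, n),
            )
    view_db.commit()
```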
Production Considerations: Real-World Sharding
Consideration 1: Shard Size Limits
Problem: How big should a shard be?
Industry Standards:
- PostgreSQL: ~100GB per shard (performance degrades after)
- MySQL: ~500GB per shard (with proper indexing)
- MongoDB: ~64GB per shard (recommended limit)
- Cassandra: ~1TB per node (but partitions within)
Why Limits Matter:
- Backup time: Larger shards = longer backup windows
- Query performance: Larger shards = slower queries
- Recovery time: Larger shards = longer recovery (MTTR)
Best Practice: Plan for growth. Start sharding before hitting limits.
Consideration 2: Geographic Sharding
Pattern: Shard by geographic region.
Benefits:
- ✅ Latency: Data closer to users
- ✅ Compliance: Data residency requirements
- ✅ Disaster recovery: Regional failures isolated
Example: Multi-Region Setup
- US-East: US users
- EU-West: European users
- Asia-Pacific: Asian users
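A minimal routing sketch for the setup above; the region names and database handles are hypothetical:

```python
REGION_SHARDS = {
    "US": "us-east-db",
    "EU": "eu-west-db",
    "APAC": "asia-pacific-db",
}

def shard_for_user(user_region: str) -> str:
    # Route each user's data to the database in their home region
    return REGION_SHARDS.get(user_region, "us-east-db")  # assumed default region
```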
Challenges:
- Cross-region queries: High latency
- Data synchronization: Complex
- Consistency: Eventual across regions
Consideration 3: Shard Monitoring
What to Monitor:
- Shard size: Alert when approaching limits
- Query distribution: Detect hotspots
- Cross-shard query rate: Optimize if too high
- Rebalancing status: Track progress
- Shard health: Detect failures early
Production Metrics:
```python
class ShardMonitor:
    def collect_metrics(self):
        return {
            'shard_sizes': {s.id: s.size() for s in self.shards},
            'query_distribution': self.get_query_distribution(),
            'cross_shard_rate': self.get_cross_shard_rate(),
            'hotspots': self.detect_hotspots(),
            'rebalancing_status': self.get_rebalancing_status()
        }
```
Performance Benchmarks: Sharding Impact
| Metric | Single DB | 4 Shards | 16 Shards |
|---|---|---|---|
| Write Throughput | 5K ops/sec | 20K ops/sec | 80K ops/sec |
| Read Throughput | 10K ops/sec | 40K ops/sec | 160K ops/sec |
| Query Latency (single-shard) | 10ms | 10ms | 10ms |
| Query Latency (cross-shard) | N/A | 50ms | 200ms |
| Storage Capacity | 500GB | 2TB | 8TB |
Key Insights:
- Throughput scales linearly with shard count
- Single-shard queries: No latency impact
- Cross-shard queries: Latency increases with shard count
- Storage: Scales horizontally
Key Takeaways
- Shard when a single database can no longer handle the load, and choose the shard key around your dominant query patterns.
- Hash-based sharding distributes evenly, range-based supports range queries, and directory-based is the most flexible but adds a lookup.
- Consistent hashing minimizes data movement when shards are added or removed.
- Cross-shard queries and transactions are the main costs: fan out in parallel, pre-compute materialized views, or use Sagas.
What’s Next?
Now that you understand relational databases and scaling, let’s explore NoSQL databases and when to use them:
Next up: NoSQL Databases — Learn about Document, Key-Value, Column-family, and Graph databases.