Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications.
Topic = A named category/stream of messages, like a table or folder. Producers write to topics; consumers read from them.
Partition = An ordered, append-only sequence of messages within a topic. Partitions exist so multiple consumers can process a topic in parallel; ordering is guaranteed only within a single partition. Partitioning strategy: messages with the same key always land on the same partition, while keyless messages are spread across partitions.
Consumer Group = A set of consumers that share the work of reading a topic. Key rules: each partition is assigned to exactly one consumer in the group, and one consumer may own several partitions. Example: a topic with 3 partitions read by a group of 3 consumers gives each consumer exactly one partition.
Offset = The position of a message within a partition. Offset management is what lets a consumer resume: consumers commit offsets, and after a restart they continue from the last committed position.
Broker = A single Kafka server. Cluster = Multiple brokers working together. Replication: each partition is copied to several brokers (one leader, the rest followers), so data survives broker failures.
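These concepts map directly onto topic creation. A minimal sketch using kafka-python's admin client (the broker address and topic name match the examples below; replication_factor=2 assumes a cluster with at least two brokers):

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# 3 partitions => up to 3 consumers in one group can read in parallel;
# replication_factor=2 => each partition has a leader and one follower
admin.create_topics(new_topics=[
    NewTopic(name='user-events', num_partitions=3, replication_factor=2)
])
admin.close()
```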
A producer that publishes JSON events, first in Python (kafka-python):

```python
from kafka import KafkaProducer
import json

class KafkaEventProducer:
    """Kafka producer for events"""

    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            # Idempotent producer (exactly-once delivery to the broker)
            enable_idempotence=True,
            acks='all',  # Wait for all in-sync replicas
            retries=3,
            max_in_flight_requests_per_connection=1
        )

    def send_event(self, topic: str, event: dict, key: str = None):
        """Send event to topic and block until acknowledged"""
        future = self.producer.send(topic, key=key, value=event)

        # Wait for acknowledgment
        try:
            record_metadata = future.get(timeout=10)
            print(f"Event sent to {record_metadata.topic} "
                  f"partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
            return record_metadata
        except Exception as e:
            print(f"Error sending event: {e}")
            raise

    def send_with_callback(self, topic: str, event: dict, key: str = None):
        """Send event asynchronously with success/error callbacks"""
        def on_send_success(record_metadata):
            print(f"Event sent: {record_metadata.topic}/"
                  f"{record_metadata.partition}/"
                  f"{record_metadata.offset}")

        def on_send_error(exception):
            print(f"Error sending event: {exception}")

        self.producer.send(
            topic,
            key=key,
            value=event
        ).add_callback(on_send_success).add_errback(on_send_error)

    def flush(self):
        """Flush pending messages"""
        self.producer.flush()

    def close(self):
        """Close producer"""
        self.producer.close()

# Usage
producer = KafkaEventProducer(['localhost:9092'])

# Send event
producer.send_event('user-events', {
    'event_type': 'user.created',
    'user_id': 123,
    'email': 'john@example.com',
    'timestamp': '2024-01-01T10:00:00Z'
}, key='123')  # Key ensures events for the same user go to the same partition

producer.flush()
producer.close()
```

The same producer in Java:

```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
public class KafkaEventProducer {
    private final KafkaProducer<String, String> producer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public KafkaEventProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Idempotent producer (exactly-once delivery to the broker)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        this.producer = new KafkaProducer<>(props);
    }

    public void sendEvent(String topic, Map<String, Object> event, String key) {
        try {
            String value = objectMapper.writeValueAsString(event);

            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            // Send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error sending event: " + exception.getMessage());
                } else {
                    System.out.println("Event sent to " + metadata.topic() +
                            "/" + metadata.partition() +
                            "/" + metadata.offset());
                }
            });
        } catch (Exception e) {
            System.err.println("Error serializing event: " + e.getMessage());
        }
    }

    public void flush() {
        producer.flush();
    }

    public void close() {
        producer.close();
    }
}

// Usage
KafkaEventProducer producer = new KafkaEventProducer("localhost:9092");

Map<String, Object> event = new HashMap<>();
event.put("event_type", "user.created");
event.put("user_id", 123);
event.put("email", "john@example.com");
event.put("timestamp", "2024-01-01T10:00:00Z");

producer.sendEvent("user-events", event, "123");
producer.flush();
producer.close();
```

A consumer that polls batches and commits offsets manually, first in Python:

```python
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from kafka.structs import OffsetAndMetadata
import json
from typing import Callable
class KafkaEventConsumer:
    """Kafka consumer for events"""

    def __init__(self, bootstrap_servers: list, group_id: str):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            enable_auto_commit=False,      # Manual offset commit
            auto_offset_reset='earliest',  # Start from beginning if no offset
            max_poll_records=100           # Batch size
        )

    def subscribe(self, topics: list):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
        print(f"Subscribed to topics: {topics}")

    def consume(self, handler: Callable):
        """Consume messages, committing after each successful handle"""
        try:
            while True:
                # Poll for messages (batch)
                message_batch = self.consumer.poll(timeout_ms=1000)

                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        try:
                            # Process message
                            handler(message.value, message.key, message.offset)

                            # Commit after processing. Note: commit() commits
                            # the consumer's current position (end of the polled
                            # batch); for per-message granularity see
                            # consume_with_manual_commit below.
                            self.consumer.commit()
                        except Exception as e:
                            print(f"Error processing message: {e}")
                            # Don't commit - will retry
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()

    def consume_with_manual_commit(self, handler: Callable):
        """Consume with per-partition offset commits"""
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)

                offsets_to_commit = {}

                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        try:
                            # Process message
                            handler(message.value, message.key, message.offset)

                            # Track offset for commit (commit the NEXT offset to read)
                            offsets_to_commit[topic_partition] = \
                                OffsetAndMetadata(message.offset + 1, None)
                        except Exception as e:
                            print(f"Error processing message: {e}")
                            # Don't commit failed messages
                            break

                # Commit all processed offsets
                if offsets_to_commit:
                    self.consumer.commit(offsets_to_commit)
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()

# Usage
def handle_event(event, key, offset):
    print(f"Processing event: {event['event_type']} "
          f"key: {key} offset: {offset}")
    # Process event...

consumer = KafkaEventConsumer(
    ['localhost:9092'],
    group_id='event-processors'
)
consumer.subscribe(['user-events', 'order-events'])
consumer.consume(handle_event)
```

The same consumer in Java:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.*;
public class KafkaEventConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public KafkaEventConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        this.consumer = new KafkaConsumer<>(props);
    }

    public void subscribe(List<String> topics) {
        consumer.subscribe(topics);
        System.out.println("Subscribed to topics: " + topics);
    }

    public void consume(java.util.function.Consumer<Map<String, Object>> handler) {
        try {
            while (true) {
                // Poll for messages (batch)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Deserialize event
                        Map<String, Object> event = objectMapper.readValue(
                                record.value(),
                                Map.class
                        );

                        // Process event
                        handler.accept(event);

                        // Commit offset after processing
                        consumer.commitSync();
                    } catch (Exception e) {
                        System.err.println("Error processing message: " + e.getMessage());
                        // Don't commit - will retry
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}

// Usage
KafkaEventConsumer consumer = new KafkaEventConsumer(
        "localhost:9092",
        "event-processors"
);

consumer.subscribe(Arrays.asList("user-events", "order-events"));

consumer.consume(event -> {
    System.out.println("Processing event: " + event.get("event_type"));
    // Process event...
});
```

Exactly-Once Semantics
Ensures each message is processed exactly once. Achieving it combines three pieces:
Idempotent Producer: the broker de-duplicates retried sends (enable.idempotence=true), so a retry cannot create a duplicate record.
Transactional Producer: writes to multiple partitions are committed atomically under a transactional_id; consumers using read_committed see only committed data.
Idempotent Consumer: processing is safe to repeat, e.g. by de-duplicating on (topic, partition, offset) or a business key, because redelivery can still happen.
The pattern in Python (the transactional calls assume a kafka-python build with transaction support):

```python
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
from typing import Set

class ExactlyOnceProcessor:
    """Exactly-once message processing"""

    def __init__(self, bootstrap_servers: list, group_id: str):
        # Idempotent, transactional producer
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            enable_idempotence=True,
            acks='all',
            transactional_id='exactly-once-producer'
        )
        # Fence off any previous producer with the same transactional_id
        self.producer.init_transactions()

        # Consumer with manual commit
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=False,
            isolation_level='read_committed'  # Only read committed messages
        )

        # Track processed offsets (consumer-side idempotency)
        self.processed_offsets: Set[tuple] = set()

    def process_exactly_once(self, topic: str, handler):
        """Process messages exactly once"""
        self.consumer.subscribe([topic])

        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)

                for topic_partition, messages in message_batch.items():
                    # Begin transaction
                    self.producer.begin_transaction()

                    try:
                        for message in messages:
                            # Check if already processed (idempotency)
                            offset_key = (topic_partition.topic,
                                          topic_partition.partition,
                                          message.offset)

                            if offset_key in self.processed_offsets:
                                print(f"Skipping duplicate: {offset_key}")
                                continue

                            # Process message
                            result = handler(message.value)

                            # Send result to output topic (in transaction)
                            if result:
                                self.producer.send(
                                    'output-topic',
                                    value=json.dumps(result).encode('utf-8'))

                            # Mark as processed
                            self.processed_offsets.add(offset_key)

                        # Commit transaction (atomic)
                        self.producer.commit_transaction()

                        # Commit consumer offset. For true end-to-end
                        # exactly-once, offsets should be committed inside the
                        # transaction (sendOffsetsToTransaction in the Java
                        # client); committing here leaves a small reprocessing
                        # window, which the processed_offsets set covers.
                        self.consumer.commit()
                    except Exception as e:
                        # Abort transaction on error
                        self.producer.abort_transaction()
                        print(f"Error processing: {e}")
        finally:
            self.producer.close()
            self.consumer.close()
```

And in Java:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class ExactlyOnceProcessor {
    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;
    private final Set<String> processedOffsets = new HashSet<>();

    public ExactlyOnceProcessor(String bootstrapServers, String groupId) {
        // Idempotent, transactional producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-producer");
        this.producer = new KafkaProducer<>(producerProps);
        this.producer.initTransactions();

        // Consumer with manual commit
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    public void processExactlyOnce(String topic, java.util.function.Function<String, String> handler) {
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                // Begin transaction
                producer.beginTransaction();

                try {
                    for (ConsumerRecord<String, String> record : records) {
                        // Check if already processed (consumer-side idempotency)
                        String offsetKey = record.topic() + "-" +
                                record.partition() + "-" +
                                record.offset();

                        if (processedOffsets.contains(offsetKey)) {
                            System.out.println("Skipping duplicate: " + offsetKey);
                            continue;
                        }

                        // Process message
                        String result = handler.apply(record.value());

                        // Send result (in transaction)
                        if (result != null) {
                            producer.send(new ProducerRecord<>("output-topic", result));
                        }

                        // Mark as processed
                        processedOffsets.add(offsetKey);
                    }

                    // Commit transaction (atomic)
                    producer.commitTransaction();

                    // Commit consumer offset. For true end-to-end exactly-once,
                    // commit offsets inside the transaction instead via
                    // producer.sendOffsetsToTransaction(...).
                    consumer.commitSync();
                } catch (Exception e) {
                    // Abort transaction on error
                    producer.abortTransaction();
                    System.err.println("Error processing: " + e.getMessage());
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
```

Key Takeaways

Partitions Enable Parallelism
Partitions allow multiple consumers to process a topic in parallel. Ordering is guaranteed only within a partition.
Consumer Groups Scale
Consumer groups distribute partitions across consumers; add consumers (up to the partition count) to scale throughput.
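Scaling out is just starting another process with the same group_id; Kafka rebalances partitions across the group automatically. A minimal sketch using the topic and group names from the examples above:

```python
from kafka import KafkaConsumer

# Run this script twice: each instance joins the same group and
# receives a disjoint subset of the topic's partitions.
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='event-processors'  # same group => partitions are shared
)
for message in consumer:
    print(f"partition={message.partition} offset={message.offset}")
```

Consumers beyond the partition count sit idle, so partition count caps a group's parallelism.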
Offsets Track Progress
Offsets track each consumer's position. Committing offsets lets a consumer resume where it left off after a restart, which is critical for reliability.
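A quick way to see this in kafka-python (a sketch, assuming the group and topic from the earlier examples):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Connect with the same group_id to inspect that group's progress
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='event-processors',
    enable_auto_commit=False
)

tp = TopicPartition('user-events', 0)
# The last committed offset is where this group resumes after a restart
print(f"Committed offset for partition 0: {consumer.committed(tp)}")
consumer.close()
```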
Exactly-Once is Complex
Exactly-once requires an idempotent producer, transactions, and an idempotent consumer. Reach for it only when duplicates are unacceptable.