Strictly guaranteeing message uniqueness and ordering in Kafka, without duplicating or losing messages during network issues, is a demanding requirement. Meeting it takes coordinated design across three areas: the producer, the consumer, and offset management.
Requirements Analysis
Uniqueness
- Ensure each message is consumed only once, avoiding duplicate consumption
- May need to carry unique identifiers (like UUID) in messages and implement deduplication at the consumer end
Ordering
- Guarantee messages are consumed in the order sent by the producer
- Kafka’s partitioning mechanism naturally supports in-partition ordering, so related messages need to be sent to the same partition
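The key-to-partition mapping can be made concrete: sarama's default hash partitioner takes an FNV-1a hash of the key modulo the partition count, so a fixed key always lands in the same partition. A simplified stand-alone sketch of that mapping (the helper name is illustrative, not part of any API):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey mirrors the approach of sarama's default hash partitioner:
// FNV-1a over the key bytes, interpreted as a signed 32-bit value, modulo the
// partition count (negated if negative). Same key -> same partition.
func partitionForKey(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// All messages with the same key map to one partition,
	// so their relative order is preserved within it.
	for i := 0; i < 3; i++ {
		fmt.Println(partitionForKey("order-key", 6))
	}
}
```

Because the mapping is deterministic, ordering holds as long as the partition count does not change; repartitioning a topic redistributes keys.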
Reliability Under Network Issues
- Network interruptions may cause offset commit failures or messages not being properly processed
- Need to ensure “at-least-once” consumption and achieve “exactly-once” consumption through deduplication
Based on these requirements, we need:
- Ensure ordered messages with unique identifiers at the producer end
- Use manual offset commit with idempotency checks at the consumer end
- Maintain consistency through transactional or log recovery mechanisms in exceptional cases (like network issues)
Solution Design
1. Producer Side
Ensuring order: Send messages that must be consumed in order to the same partition, either by specifying a partition explicitly or by using a consistent key (Kafka chooses the partition from the key's hash).
Ensuring uniqueness: Attach a unique identifier (such as a UUID) to each message and carry it in the message content.
Example producer code:
```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to acknowledge
	config.Producer.Retry.Max = 5                    // Number of retries
	config.Producer.Return.Successes = true          // Required by the SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	topic := "test-topic"
	for i := 0; i < 10; i++ {
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.StringEncoder("order-key"), // Fixed key keeps all messages in one partition
			Value: sarama.StringEncoder(fmt.Sprintf("msg-%d|%s", i, time.Now().String())),
		}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Printf("Failed to send message: %v", err)
		} else {
			fmt.Printf("Message sent to partition %d, offset %d\n", partition, offset)
		}
	}
}
```
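Note that producer retries can themselves write duplicates into the log. Kafka's idempotent producer has the broker deduplicate retried batches; a configuration fragment for sarama (the constraints on acks, in-flight requests, and protocol version are required by sarama when idempotence is enabled; not verified against a broker here):

```go
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // acks=all is required for idempotence
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
config.Producer.Idempotent = true // broker deduplicates retried produce batches
config.Net.MaxOpenRequests = 1    // at most one in-flight request, required by sarama
config.Version = sarama.V0_11_0_0 // idempotence needs protocol version >= 0.11
```

This removes duplicates caused by producer retries, but not duplicates from consumer-side redelivery, which is why the consumer still needs its own idempotency check.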
2. Consumer Side
- Ordering: Consume from a single partition (or ensure the consumer group assigns only one partition to this consumer)
- Uniqueness: Record processed message IDs in memory or persistent storage (such as Redis or a database) for deduplication
- Offset commit: Commit offsets manually, and only after the message has been processed successfully
- Network issue handling: Use transactional commits or logging so state can be recovered after a failure
Example consumer code:
```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"

	"github.com/IBM/sarama"
)

// ProcessedMessages records processed message IDs (simulating persistent storage).
type ProcessedMessages struct {
	sync.Mutex
	ids map[string]bool
}

func NewProcessedMessages() *ProcessedMessages {
	return &ProcessedMessages{ids: make(map[string]bool)}
}

func (p *ProcessedMessages) IsProcessed(id string) bool {
	p.Lock()
	defer p.Unlock()
	return p.ids[id]
}

func (p *ProcessedMessages) MarkProcessed(id string) {
	p.Lock()
	defer p.Unlock()
	p.ids[id] = true
}

type consumerGroupHandler struct {
	processed *ProcessedMessages
}

func (h consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Assume message format is "msg-id|content"
		parts := strings.SplitN(string(msg.Value), "|", 2)
		if len(parts) != 2 {
			log.Printf("Invalid message format: %s", msg.Value)
			continue
		}
		msgID := parts[0]
		// Skip messages that were already processed
		if h.processed.IsProcessed(msgID) {
			log.Printf("Message %s already processed, skipping", msgID)
			session.MarkMessage(msg, "") // Still mark so the offset advances
			continue
		}
		// Process message (add database operations, logging, etc. here)
		fmt.Printf("Processing message: Offset=%d, ID=%s, Value=%s\n", msg.Offset, msgID, parts[1])
		// Mark as processed, then commit the offset
		h.processed.MarkProcessed(msgID)
		session.MarkMessage(msg, "")
		session.Commit() // Manual commit only after successful processing
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false // Disable auto-commit

	client, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	if err != nil {
		log.Fatalf("Failed to create consumer group: %v", err)
	}
	defer client.Close()

	processed := NewProcessedMessages()
	handler := consumerGroupHandler{processed: processed}
	topics := []string{"test-topic"}

	ctx, cancel := context.WithCancel(context.Background())
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	go func() {
		for {
			if err := client.Consume(ctx, topics, handler); err != nil {
				log.Printf("Consumer group error: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	<-signals
	fmt.Println("Interrupt received, shutting down...")
	cancel()
}
```
Key Points Analysis
Uniqueness Guarantee
- Use the ProcessedMessages structure to record processed message IDs (use Redis or a database for persistence in production)
- Check whether the ID exists before processing a message; skip it if it already does
Ordering Guarantee
- Producer uses fixed key (e.g., “order-key”) to ensure messages go to same partition
- Consumer only consumes from single partition (or assigns single partition through consumer group)
Reliability Under Network Issues
- Manual offset commit: Only commit offset after message is successfully processed and recorded
- Idempotency: Even if network interruption causes offset commit failure, duplicate processing is avoided through ID check on restart
- Transactions (optional): For stronger guarantees, use Kafka’s transactional producers and consumers (supported by sarama but requires additional configuration)
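The transactional route in the last bullet can be sketched as a sarama configuration fragment (the transactional ID below is a hypothetical placeholder, and this is not verified against a broker):

```go
config := sarama.NewConfig()
config.Version = sarama.V0_11_0_0 // transactions need protocol >= 0.11
config.Producer.Idempotent = true // prerequisite for transactions
config.Producer.RequiredAcks = sarama.WaitForAll
config.Net.MaxOpenRequests = 1                        // required when Idempotent is true
config.Producer.Transaction.ID = "my-txn-id"          // hypothetical transactional ID
config.Consumer.IsolationLevel = sarama.ReadCommitted // consumers skip aborted messages
```

Sends are then wrapped between the producer's BeginTxn and CommitTxn calls (AbortTxn on failure), and downstream consumers must read with read-committed isolation for aborted writes to stay invisible.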
Exception Scenario Handling
Network Interruption, Offset Not Committed
- Consumer starts from last committed offset after restart
- Processed messages are skipped (due to recorded IDs)
Processing Failure, Offset Not Committed
- Next consumption will retry, ensuring no message loss
Duplicate Consumption
- Avoid duplicate processing through ID checking
Optimization Suggestions
Persistent Storage
Replace ProcessedMessages with Redis or database to ensure state persistence after restart.
Example (using Redis):
```go
import "github.com/go-redis/redis/v8"

// Assumes consumerGroupHandler carries a redis *redis.Client field.
func (h consumerGroupHandler) IsProcessed(ctx context.Context, id string) bool {
	val, err := h.redis.Get(ctx, id).Result()
	return err == nil && val == "1"
}

func (h consumerGroupHandler) MarkProcessed(ctx context.Context, id string) {
	// A separate Get then Set is racy under concurrency;
	// SetNX can merge check and mark into one atomic step.
	h.redis.Set(ctx, id, "1", 0)
}
```
Batch Commit
For better performance, process and commit offsets in batches:
```go
batchSize := 10
count := 0
for msg := range claim.Messages() {
	// Process message...
	session.MarkMessage(msg, "")
	count++
	if count%batchSize == 0 {
		session.Commit()
	}
}
session.Commit() // commit any offsets marked since the last batch boundary
```
Logging
Record operation logs when processing messages for state recovery during exceptions.
Summary
You can meet the requirements through these measures:
- Producer: Fixed key for ordering, unique ID for deduplication
- Consumer: Manual offset commit with idempotency check (ID deduplication)
- Exception handling: Persist processed states to ensure consistency after network issues
This solution achieves effectively exactly-once semantics by combining at-least-once delivery with idempotent processing, preventing message duplication or loss during network issues. For stronger guarantees, you can further introduce Kafka's transaction support or external state management (such as database transactions).