Apache Kafka is one of the most widely used distributed messaging systems in modern data architectures and a strong foundation for high-throughput applications. This article walks through the technical architecture behind Kafka's performance and shows how to put it to work from Go, with detailed examples and real-world scenarios.
The fundamental elements behind Kafka's speed are:
- Sequential Disk I/O
- Zero-Copy Transfer
- Batch Processing
- Partitioning & Parallelism
- Operating System Page Cache
- Compression
- Acks and Replication Settings
- Producer & Consumer Tuning
- Message Storage & Compression Policies
- Real-time Processing with Kafka Streams
- Zookeeper or KRaft Mode
- Monitoring & Performance Tuning
1. Sequential Disk I/O
Kafka appends messages sequentially to its partition log segments, turning random disk access into cheap sequential writes and minimizing disk access costs.
```go
func writeSequentially(filename string, messages []string) error {
	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer file.Close()

	for _, msg := range messages {
		if _, err := file.WriteString(msg + "\n"); err != nil {
			return err
		}
	}
	return nil
}
```
2. Zero-Copy Transfer
Kafka moves data between disk and network sockets without copying it through user space (via the sendfile system call), avoiding redundant copies and context switches.
```go
// In Go, io.Copy uses the sendfile system call under the hood when the source
// is an *os.File and the destination is a *net.TCPConn, so the file contents
// never pass through user-space buffers.
func sendFileZeroCopy(conn net.Conn, filename string) error {
	file, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer file.Close()

	_, err = io.Copy(conn, file)
	return err
}
```
3. Batch Processing
Kafka amortizes network and disk costs by grouping messages into batches.
```go
func processBatch(messages []Message) {
	batch := make([]string, 0, len(messages))
	for _, msg := range messages {
		batch = append(batch, msg.Value)
	}
	writeSequentially("kafka.log", batch)
}
```
Important settings:
- batch.size: maximum size of a batch in bytes
- linger.ms: how long the producer waits for more messages before sending a batch
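If you produce from Go with Sarama, the closest equivalents live under `Producer.Flush`; the values below are illustrative and should be tuned per workload:

```go
config := sarama.NewConfig()
config.Producer.Flush.Bytes = 64 * 1024                 // flush once ~64 KB are buffered (≈ batch.size)
config.Producer.Flush.Messages = 500                    // ...or once 500 messages are buffered
config.Producer.Flush.Frequency = 10 * time.Millisecond // ...or after 10 ms (≈ linger.ms)
```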
4. Partitioning & Parallelism
Kafka enables parallel consumption by dividing topics into partitions.
```go
func processPartitions(partitions [][]Message) {
	var wg sync.WaitGroup
	for i, partition := range partitions {
		wg.Add(1)
		go func(partitionNum int, msgs []Message) {
			defer wg.Done()
			processBatch(msgs)
		}(i, partition)
	}
	wg.Wait()
}
```
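On the producer side, the partitioner decides where each message lands. A minimal Sarama sketch (topic and key are placeholders): with a hash partitioner, all messages that share a key go to the same partition, so per-key ordering is preserved while partitions are still consumed in parallel.

```go
config := sarama.NewConfig()
// Messages with the same key always map to the same partition.
config.Producer.Partitioner = sarama.NewHashPartitioner

msg := &sarama.ProducerMessage{
	Topic: "orders",
	Key:   sarama.StringEncoder("customer-42"), // partitioning key
	Value: sarama.StringEncoder(`{"total": 99.90}`),
}
```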
5. Operating System Page Cache
Kafka reads and writes through the operating system's page cache, so recently accessed data is usually served from memory rather than from disk.
```go
func readWithCache(filename string) ([]byte, error) {
	return os.ReadFile(filename) // Uses OS-level cache
}
```
6. Compression
Kafka can compress messages using formats like gzip, lz4, snappy, and zstd.
```go
func compressData(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	// Close flushes buffered data; its error must be checked as well.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```
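The function above compresses a payload by hand; in practice the Kafka producer compresses whole record batches via configuration, which usually gives better ratios and throughput. A minimal Sarama sketch (broker address is a placeholder):

```go
config := sarama.NewConfig()
config.Producer.Compression = sarama.CompressionSnappy // or CompressionGZIP, CompressionLZ4, CompressionZSTD
config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()
```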
7. Acks and Replication Settings
Data safety and speed in Kafka are balanced through the acks setting:
- acks=0: no acknowledgment (fastest, highest risk of data loss)
- acks=1: only the partition leader acknowledges
- acks=all: all in-sync replicas acknowledge (most reliable)
Additionally, the minimum number of in-sync replicas can be enforced with min.insync.replicas.
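In Sarama the acks level maps to Producer.RequiredAcks, while min.insync.replicas is a broker- or topic-level setting rather than a client option. A sketch (the topic name in the comment is illustrative):

```go
config := sarama.NewConfig()
// sarama.NoResponse ≈ acks=0, sarama.WaitForLocal ≈ acks=1, sarama.WaitForAll ≈ acks=all
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true

// min.insync.replicas is configured on the broker or per topic, e.g.:
//   kafka-configs --bootstrap-server localhost:9092 --alter \
//     --entity-type topics --entity-name orders --add-config min.insync.replicas=2
```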
8. Producer & Consumer Settings
Producer settings:
- buffer.memory: total memory the producer may use for buffering
- compression.type: compression codec (gzip, lz4, snappy, zstd)
- retries: number of retries on transient send failures
Consumer settings:
- fetch.min.bytes: minimum amount of data the broker returns per fetch
- max.poll.records: maximum number of records returned per poll
- enable.auto.commit: whether offsets are committed automatically
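Not every Java client property has a one-to-one Go counterpart, but Sarama exposes close analogues for most of the settings above; the values below are illustrative:

```go
config := sarama.NewConfig()

// Producer side
config.Producer.Retry.Max = 5                       // ≈ retries
config.Producer.Compression = sarama.CompressionLZ4 // ≈ compression.type
config.Producer.MaxMessageBytes = 1 << 20           // upper bound on a single produce request

// Consumer side
config.Consumer.Fetch.Min = 1 << 14                  // ≈ fetch.min.bytes (16 KB)
config.Consumer.MaxWaitTime = 250 * time.Millisecond // ≈ fetch.max.wait.ms
config.Consumer.Offsets.AutoCommit.Enable = true     // ≈ enable.auto.commit
config.Consumer.Offsets.AutoCommit.Interval = time.Second
```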
9. Message Storage & Compression Policies
Kafka can retain messages based on:
- Time (retention.ms)
- Size (retention.bytes)
- Key-based compaction (cleanup.policy=compact)
These policies affect disk efficiency and access speed.
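These are topic-level configurations, so they can also be set from Go when creating a topic through Sarama's ClusterAdmin; the topic name and values below are illustrative:

```go
admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, sarama.NewConfig())
if err != nil {
	log.Fatal(err)
}
defer admin.Close()

retentionMs := "604800000" // 7 days
cleanupPolicy := "compact"

err = admin.CreateTopic("user-profiles", &sarama.TopicDetail{
	NumPartitions:     6,
	ReplicationFactor: 3,
	ConfigEntries: map[string]*string{
		"retention.ms":   &retentionMs,
		"cleanup.policy": &cleanupPolicy,
	},
}, false)
if err != nil {
	log.Fatal(err)
}
```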
10. Real-time Processing with Kafka Streams
Kafka is not just a message queue but also a data processing engine.
```java
// Example windowing operation - pseudo-code (Kafka Streams Java DSL)
streamsBuilder.stream("topic")
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((a, b) -> a + b);
```
11. KRaft Mode Instead of Zookeeper
Starting with Kafka 2.8, the KRaft (Kafka Raft) architecture can be used instead of Zookeeper. This simplifies cluster management and reduces metadata latency.
12. Monitoring & Performance Tuning
Kafka's performance can be optimized with proper monitoring tools:
- Prometheus + JMX Exporter
- Grafana dashboards
- Kafka Manager / AKHQ
Metrics to monitor:
- Message volume
- Consumer lag
- Disk usage
- Garbage Collection times
- Network and disk I/O
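Consumer lag is usually the first of these worth exporting from your own Go services. A minimal sketch with the Prometheus client library (metric and label names are illustrative, and recordLag is a hypothetical helper you would call from your offset-checking loop):

```go
var consumerLag = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "kafka_consumer_group_lag",
		Help: "Difference between the latest offset and the committed offset",
	},
	[]string{"topic", "partition", "group"},
)

func init() {
	prometheus.MustRegister(consumerLag)
}

func recordLag(topic string, partition int32, group string, newestOffset, committedOffset int64) {
	consumerLag.WithLabelValues(topic, fmt.Sprintf("%d", partition), group).
		Set(float64(newestOffset - committedOffset))
}
```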
13. Go-Specific Kafka Implementations
Sarama vs Confluent-kafka-go Comparison
```go
// Sarama producer example
config := sarama.NewConfig()
config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
	Topic: "test-topic",
	Value: sarama.StringEncoder("test message"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Fatal(err)
}
log.Printf("delivered to partition %d at offset %d", partition, offset)

// confluent-kafka-go producer example
topic := "test-topic"
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
})
if err != nil {
	log.Fatal(err)
}
defer p.Close()

p.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          []byte("test message"),
}, nil)
```
Goroutine-based Consumer Pool
```go
type ConsumerPool struct {
	consumers []*Consumer
	wg        sync.WaitGroup
	ctx       context.Context
	cancel    context.CancelFunc
}

func NewConsumerPool(brokers []string, topic string, numConsumers int) *ConsumerPool {
	ctx, cancel := context.WithCancel(context.Background())
	pool := &ConsumerPool{
		consumers: make([]*Consumer, numConsumers),
		ctx:       ctx,
		cancel:    cancel,
	}
	for i := 0; i < numConsumers; i++ {
		pool.consumers[i] = NewConsumer(brokers, topic)
	}
	return pool
}

func (p *ConsumerPool) Start() {
	for _, consumer := range p.consumers {
		p.wg.Add(1)
		go func(c *Consumer) {
			defer p.wg.Done()
			c.Consume(p.ctx)
		}(consumer)
	}
}

func (p *ConsumerPool) Stop() {
	p.cancel()
	p.wg.Wait()
}
```
14. Load Balancing and Consumer Groups
Consumer Group Rebalancing Strategies
```go
// Consumer group configuration
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second

// Consumer lag monitoring
func monitorConsumerLag(group string) {
	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	for {
		offsets, err := admin.ListConsumerGroupOffsets(group, nil)
		if err != nil {
			log.Printf("Error getting consumer group offsets: %v", err)
		} else {
			// Compare committed offsets with the latest partition offsets to compute lag.
			log.Printf("fetched committed offsets for %d topics", len(offsets.Blocks))
		}
		time.Sleep(1 * time.Minute)
	}
}
```
15. Network and Serialization
Schema Registry Integration
```go
// Avro serialization with a schema (using a Go Avro library such as github.com/hamba/avro)
type User struct {
	Name  string `avro:"name"`
	Email string `avro:"email"`
}

func serializeWithAvro(user User) ([]byte, error) {
	schema, err := avro.Parse(`{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "email", "type": "string"}
		]
	}`)
	if err != nil {
		return nil, err
	}
	return avro.Marshal(schema, user)
}

// Network buffer settings
config := sarama.NewConfig()
config.Net.MaxOpenRequests = 5
config.Net.DialTimeout = 30 * time.Second
config.Net.ReadTimeout = 30 * time.Second
config.Net.WriteTimeout = 30 * time.Second
```
16. Error Handling and Resilience
Circuit Breaker Pattern
```go
type CircuitBreaker struct {
	failures     int
	threshold    int
	resetTimeout time.Duration
	lastFailure  time.Time
	mu           sync.Mutex
}

func (cb *CircuitBreaker) Execute(f func() error) error {
	cb.mu.Lock()
	if cb.failures >= cb.threshold {
		if time.Since(cb.lastFailure) < cb.resetTimeout {
			cb.mu.Unlock()
			return errors.New("circuit breaker open")
		}
		// Reset window has elapsed; allow a trial request.
		cb.failures = 0
	}
	cb.mu.Unlock()

	err := f()
	if err != nil {
		cb.mu.Lock()
		cb.failures++
		cb.lastFailure = time.Now()
		cb.mu.Unlock()
	}
	return err
}

// Dead letter queue implementation
func processWithDLQ(msg *sarama.ConsumerMessage) error {
	err := processMessage(msg)
	if err != nil {
		// Forward the failed message to the DLQ together with error context.
		dlqMsg := &sarama.ProducerMessage{
			Topic: "dlq-topic",
			Value: sarama.ByteEncoder(msg.Value),
			Headers: []sarama.RecordHeader{
				{Key: []byte("original-topic"), Value: []byte(msg.Topic)},
				{Key: []byte("error"), Value: []byte(err.Error())},
			},
		}
		if _, _, dlqErr := producer.SendMessage(dlqMsg); dlqErr != nil {
			log.Printf("failed to publish to DLQ: %v", dlqErr)
		}
	}
	return err
}
```
17. Real-World Usage Examples
Log Aggregation Pattern
```go
type LogAggregator struct {
	producer sarama.SyncProducer
	topic    string
}

func (la *LogAggregator) SendLog(service string, level string, message string) error {
	logEntry := map[string]interface{}{
		"timestamp": time.Now().Unix(),
		"service":   service,
		"level":     level,
		"message":   message,
	}
	jsonData, err := json.Marshal(logEntry)
	if err != nil {
		return err
	}
	msg := &sarama.ProducerMessage{
		Topic: la.topic,
		Value: sarama.ByteEncoder(jsonData),
	}
	_, _, err = la.producer.SendMessage(msg)
	return err
}
```
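A possible way to wire the aggregator up (broker address and topic are placeholders; note that Sarama's SyncProducer requires Return.Successes to be enabled):

```go
config := sarama.NewConfig()
config.Producer.Return.Successes = true // required by SyncProducer

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

aggregator := &LogAggregator{producer: producer, topic: "app-logs"}
if err := aggregator.SendLog("checkout-service", "ERROR", "payment gateway timeout"); err != nil {
	log.Printf("failed to ship log: %v", err)
}
```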
Event Sourcing Implementation
```go
type EventStore struct {
	producer sarama.SyncProducer
	topic    string
}

func (es *EventStore) SaveEvent(aggregateID string, eventType string, data interface{}) error {
	event := map[string]interface{}{
		"aggregate_id": aggregateID,
		"event_type":   eventType,
		"data":         data,
		"timestamp":    time.Now().Unix(),
	}
	jsonData, err := json.Marshal(event)
	if err != nil {
		return err
	}
	msg := &sarama.ProducerMessage{
		Topic: es.topic,
		Key:   sarama.StringEncoder(aggregateID), // same aggregate always lands in the same partition
		Value: sarama.ByteEncoder(jsonData),
	}
	_, _, err = es.producer.SendMessage(msg)
	return err
}
```
18. Benchmarking and Testing
```go
func BenchmarkKafkaPerformance(b *testing.B) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Compression = sarama.CompressionSnappy

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		b.Fatal(err)
	}
	defer producer.Close()

	b.Run("SingleMessage", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			msg := &sarama.ProducerMessage{
				Topic: "benchmark-topic",
				Value: sarama.StringEncoder(fmt.Sprintf("message-%d", i)),
			}
			if _, _, err := producer.SendMessage(msg); err != nil {
				b.Fatal(err)
			}
		}
	})

	b.Run("BatchMessage", func(b *testing.B) {
		batch := make([]*sarama.ProducerMessage, 100)
		for i := 0; i < b.N; i++ {
			for j := 0; j < 100; j++ {
				batch[j] = &sarama.ProducerMessage{
					Topic: "benchmark-topic",
					Value: sarama.StringEncoder(fmt.Sprintf("message-%d-%d", i, j)),
				}
			}
			if err := producer.SendMessages(batch); err != nil {
				b.Fatal(err)
			}
		}
	})
}
```
19. JVM Settings for Kafka Brokers
Garbage Collection Configuration
```bash
# Environment variables for the broker JVM (e.g., exported before kafka-server-start.sh);
# these are not server.properties entries.
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
```
Off-heap Memory Configuration
```bash
# Appended to the same environment variable (cap on direct / off-heap memory)
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:MaxDirectMemorySize=1G"
```
20. Microservice Communication Patterns
Request-Reply Pattern
```go
type RequestReply struct {
	producer sarama.SyncProducer
	consumer sarama.Consumer
	topic    string
}

// SendRequest publishes a request and waits for a message carrying the same key.
// In production a dedicated reply topic and a timeout (e.g. a select on a
// time.After channel) would normally be used; this sketch keeps the original
// single-topic structure for brevity.
func (rr *RequestReply) SendRequest(requestID string, payload []byte) ([]byte, error) {
	// Send request
	msg := &sarama.ProducerMessage{
		Topic: rr.topic,
		Key:   sarama.StringEncoder(requestID),
		Value: sarama.ByteEncoder(payload),
	}
	_, _, err := rr.producer.SendMessage(msg)
	if err != nil {
		return nil, err
	}

	// Wait for response
	partitionConsumer, err := rr.consumer.ConsumePartition(rr.topic, 0, sarama.OffsetNewest)
	if err != nil {
		return nil, err
	}
	defer partitionConsumer.Close()

	for msg := range partitionConsumer.Messages() {
		if string(msg.Key) == requestID {
			return msg.Value, nil
		}
	}
	return nil, errors.New("response channel closed without a reply")
}
```
21. Kafka Security
SSL/TLS Configuration
```go
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
	InsecureSkipVerify: false,
	RootCAs:            rootCAs,
	Certificates:       []tls.Certificate{cert},
}
```
SASL Authentication
```go
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = "kafka-user"
config.Net.SASL.Password = "kafka-password"
```
ACL Management
```bash
# Create an ACL for a topic
kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:app1 \
  --operation Read \
  --topic test-topic
```
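The same rule can be created programmatically through Sarama's ClusterAdmin; the sketch below mirrors the CLI example (CreateACL is Sarama's single-entry admin call):

```go
admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, sarama.NewConfig())
if err != nil {
	log.Fatal(err)
}
defer admin.Close()

err = admin.CreateACL(
	sarama.Resource{
		ResourceType:        sarama.AclResourceTopic,
		ResourceName:        "test-topic",
		ResourcePatternType: sarama.AclPatternLiteral,
	},
	sarama.Acl{
		Principal:      "User:app1",
		Host:           "*",
		Operation:      sarama.AclOperationRead,
		PermissionType: sarama.AclPermissionAllow,
	},
)
if err != nil {
	log.Fatal(err)
}
```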
22. Kafka Connect
Database Connection
```json
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "table.whitelist": "users",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mysql-"
  }
}
```
File System Connection
```json
{
  "name": "file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/path/to/file.txt",
    "topic": "file-topic"
  }
}
```
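Connectors are registered through Kafka Connect's REST API, which listens on port 8083 by default. A sketch that submits the file-source configuration above from Go (the URL and file path are assumptions):

```go
payload := []byte(`{
  "name": "file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/path/to/file.txt",
    "topic": "file-topic"
  }
}`)

resp, err := http.Post("http://localhost:8083/connectors", "application/json", bytes.NewReader(payload))
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()
log.Printf("connector registration returned %s", resp.Status)
```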
23. Kafka Streams Detailed Examples
Stateful Operations
```go
// Kafka Streams itself is a JVM library; the Go code below is illustrative
// pseudo-code in the style of a streams DSL (Go alternatives such as goka
// offer comparable stateful processing).
type UserState struct {
	Count    int64
	LastSeen time.Time
	UserID   string
}

func processUserEvents(stream *kstream.Stream) {
	stream.GroupByKey().
		Aggregate(
			func() UserState { return UserState{} },
			func(key string, value interface{}, state UserState) UserState {
				state.Count++
				state.LastSeen = time.Now()
				return state
			},
			"user-state-store",
		)
}
```
Windowing Operations
```go
// Illustrative pseudo-code (same hypothetical kstream package as above)
func processTimeWindows(stream *kstream.Stream) {
	stream.GroupByKey().
		WindowedBy(kstream.TumblingWindow(5 * time.Minute)).
		Count("windowed-counts")
}
```
24. Disaster Recovery and High Availability
Backup Strategy
```bash
# Topic backup (kafka-backup is a third-party tool, not part of the Apache Kafka distribution)
kafka-backup --bootstrap-server localhost:9092 \
  --topic test-topic \
  --backup-dir /backup/kafka \
  --format json
```
Multi-Datacenter Deployment
```properties
# server.properties
broker.rack=dc1-rack1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
```
25. AKHQ Configuration
```yaml
akhq:
  connections:
    docker-kafka-server:
      properties:
        bootstrap.servers: "localhost:9092"
      topics:
        filter:
          - "test-.*"
```
26. Anti-patterns and Best Practices
Topic Design
```go
// Good topic naming example
const (
	OrderCreatedTopic    = "order-service.order.created"
	OrderProcessedTopic  = "order-service.order.processed"
	PaymentReceivedTopic = "payment-service.payment.received"
)
```
Partition Count Determination
```go
// Rough sizing heuristic: if each partition can sustain ~10 MB/s, the
// partition count follows from the target throughput (in MB/s).
// targetLatency is kept as a reminder that latency goals also bound how
// large partitions and batches can grow.
func calculatePartitionCount(throughputMBps, targetLatency int) int {
	return (throughputMBps / 10) + 1
}
```
For example, a target of 100 MB/s yields 11 partitions with this heuristic.
27. Troubleshooting
Common Error Scenarios
```go
func handleCommonErrors(err error) {
	switch {
	case errors.Is(err, sarama.ErrNotLeaderForPartition):
		log.Println("Partition leader changed, will retry")
	case errors.Is(err, sarama.ErrLeaderNotAvailable):
		log.Println("Leader not available, will retry")
	case errors.Is(err, sarama.ErrRequestTimedOut):
		log.Println("Request timed out")
	}
}
```
28. Monitoring and Alerting
Prometheus Metrics
```go
func exposeMetrics() {
	http.Handle("/metrics", promhttp.Handler())
	go http.ListenAndServe(":8080", nil)
}
```
Grafana Dashboard
```json
{
  "dashboard": {
    "panels": [
      {
        "title": "Kafka Consumer Lag",
        "type": "graph",
        "datasource": "Prometheus",
        "targets": [
          {
            "expr": "kafka_consumer_group_lag"
          }
        ]
      }
    ]
  }
}
```
29. Deployment Strategies
Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:latest
          ports:
            - containerPort: 9092
          volumeMounts:
            - name: data
              mountPath: /var/lib/kafka/data
  # volumeClaimTemplates for the "data" volume are omitted for brevity
```
30. Cloud Services
AWS MSK Configuration
```hcl
resource "aws_msk_cluster" "example" {
  cluster_name           = "example"
  kafka_version          = "2.8.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type    = "kafka.m5.large"
    ebs_storage_size = 1000
    client_subnets   = [aws_subnet.example.id]
    security_groups  = [aws_security_group.example.id]
  }
}
```
31. Testing Strategies
Unit Testing
```go
func TestKafkaProducer(t *testing.T) {
	mockProducer := NewMockProducer()
	producer := NewProducer(mockProducer)

	err := producer.SendMessage("test-topic", "test-message")

	assert.NoError(t, err)
	assert.True(t, mockProducer.MessageSent)
}
```
Chaos Testing
```go
func TestKafkaChaos(t *testing.T) {
	// Stop broker
	stopBroker()

	// Try to send message
	err := producer.SendMessage("test-topic", "test-message")
	assert.Error(t, err)

	// Restart broker
	startBroker()

	// Try to send message again
	err = producer.SendMessage("test-topic", "test-message")
	assert.NoError(t, err)
}
```
32. Advanced Optimizations for Kafka with Go
Memory Management and GC Optimization
```go
type KafkaProducer struct {
	producer sarama.SyncProducer
	buffer   *bytes.Buffer
	pool     *sync.Pool
}

func NewKafkaProducer() *KafkaProducer {
	return &KafkaProducer{
		buffer: bytes.NewBuffer(make([]byte, 0, 1024)),
		pool: &sync.Pool{
			New: func() interface{} {
				return make([]byte, 0, 1024)
			},
		},
	}
}

func (kp *KafkaProducer) SendMessageOptimized(topic string, message []byte) error {
	// Reuse a buffer from the pool to avoid per-message allocations.
	buf := kp.pool.Get().([]byte)
	buf = append(buf[:0], message...)
	// Return the (possibly grown) slice so the larger capacity stays in the pool.
	defer kp.pool.Put(buf)

	msg := &sarama.ProducerMessage{
		Topic: topic,
		// SendMessage is synchronous, so the pooled buffer is safe to reuse afterwards.
		Value: sarama.ByteEncoder(buf),
	}
	_, _, err := kp.producer.SendMessage(msg)
	return err
}
```
High-Performance Consumer Pool
```go
type HighPerformanceConsumer struct {
	consumers []*Consumer
	wg        sync.WaitGroup
	ctx       context.Context
	cancel    context.CancelFunc
	metrics   *ConsumerMetrics
}

type ConsumerMetrics struct {
	messagesProcessed   atomic.Int64
	processingTime      atomic.Int64
	errors              atomic.Int64
	lastProcessedOffset atomic.Int64
}

func NewHighPerformanceConsumer(brokers []string, topic string, numConsumers int) *HighPerformanceConsumer {
	ctx, cancel := context.WithCancel(context.Background())
	return &HighPerformanceConsumer{
		// The individual consumers still need to be created for brokers/topic,
		// e.g. with NewConsumer as in the earlier ConsumerPool example.
		consumers: make([]*Consumer, numConsumers),
		ctx:       ctx,
		cancel:    cancel,
		metrics:   &ConsumerMetrics{},
	}
}

func (c *HighPerformanceConsumer) Start() {
	for i := 0; i < len(c.consumers); i++ {
		c.wg.Add(1)
		go func(consumerID int) {
			defer c.wg.Done()
			c.processMessages(consumerID)
		}(i)
	}
}

func (c *HighPerformanceConsumer) processMessages(consumerID int) {
	consumer := c.consumers[consumerID]
	for {
		select {
		case <-c.ctx.Done():
			return
		default:
			messages, err := consumer.FetchMessages(100)
			if err != nil {
				c.metrics.errors.Add(1)
				continue
			}
			startTime := time.Now()
			for _, msg := range messages {
				if err := c.processMessage(msg); err != nil {
					c.metrics.errors.Add(1)
					continue
				}
				c.metrics.messagesProcessed.Add(1)
				c.metrics.lastProcessedOffset.Store(msg.Offset)
			}
			c.metrics.processingTime.Add(time.Since(startTime).Milliseconds())
		}
	}
}
```
Batch Processing and Retry Mechanism
```go
type BatchProcessor struct {
	batchSize    int
	maxRetries   int
	retryBackoff time.Duration
	processor    MessageProcessor
	metrics      *BatchMetrics
}

type BatchMetrics struct {
	batchCount     atomic.Int64
	successCount   atomic.Int64
	failureCount   atomic.Int64
	retryCount     atomic.Int64
	processingTime atomic.Int64
}

func (bp *BatchProcessor) ProcessBatch(messages []Message) error {
	startTime := time.Now()
	defer func() {
		bp.metrics.processingTime.Add(time.Since(startTime).Milliseconds())
	}()

	batch := make([]Message, 0, bp.batchSize)
	for _, msg := range messages {
		batch = append(batch, msg)
		if len(batch) >= bp.batchSize {
			if err := bp.processWithRetry(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		return bp.processWithRetry(batch)
	}
	return nil
}

func (bp *BatchProcessor) processWithRetry(batch []Message) error {
	var err error
	for i := 0; i < bp.maxRetries; i++ {
		if err = bp.processor.Process(batch); err == nil {
			bp.metrics.successCount.Add(1)
			return nil
		}
		bp.metrics.retryCount.Add(1)
		// Linear backoff between attempts
		time.Sleep(bp.retryBackoff * time.Duration(i+1))
	}
	bp.metrics.failureCount.Add(1)
	return err
}
```
Circuit Breaker and Rate Limiting
```go
type KafkaCircuitBreaker struct {
	failures     atomic.Int64
	threshold    int64
	resetTimeout time.Duration
	lastFailure  atomic.Int64
}

type RateLimiter struct {
	limiter    *rate.Limiter // golang.org/x/time/rate
	burst      int
	windowSize time.Duration
}

func NewKafkaCircuitBreaker(threshold int64, resetTimeout time.Duration) *KafkaCircuitBreaker {
	return &KafkaCircuitBreaker{
		threshold:    threshold,
		resetTimeout: resetTimeout,
	}
}

func (cb *KafkaCircuitBreaker) Execute(f func() error) error {
	if cb.failures.Load() >= cb.threshold {
		if time.Since(time.Unix(0, cb.lastFailure.Load())) < cb.resetTimeout {
			return errors.New("circuit breaker open")
		}
		cb.failures.Store(0)
	}
	err := f()
	if err != nil {
		cb.failures.Add(1)
		cb.lastFailure.Store(time.Now().UnixNano())
	}
	return err
}

func NewRateLimiter(eventsPerSecond float64, burst int, windowSize time.Duration) *RateLimiter {
	return &RateLimiter{
		limiter:    rate.NewLimiter(rate.Limit(eventsPerSecond), burst),
		burst:      burst,
		windowSize: windowSize,
	}
}

func (rl *RateLimiter) Allow() bool {
	return rl.limiter.Allow()
}
```
Performance Monitoring and Metric Collection
```go
type KafkaMetrics struct {
	producerMetrics *ProducerMetrics
	consumerMetrics *ConsumerMetrics
	brokerMetrics   *BrokerMetrics
}

type ProducerMetrics struct {
	messagesSent atomic.Int64
	bytesSent    atomic.Int64
	sendLatency  atomic.Int64
	errors       atomic.Int64
	batchSize    atomic.Int64
}

type BrokerMetrics struct {
	activeConnections atomic.Int64
	requestLatency    atomic.Int64
	bytesIn           atomic.Int64
	bytesOut          atomic.Int64
}

func (km *KafkaMetrics) ExposeMetrics() {
	http.Handle("/metrics", promhttp.Handler())
	go http.ListenAndServe(":8080", nil)
}

func (km *KafkaMetrics) RecordProducerMetrics(msg *sarama.ProducerMessage, err error) {
	if err != nil {
		km.producerMetrics.errors.Add(1)
		return
	}
	km.producerMetrics.messagesSent.Add(1)
	// Encoder.Encode returns the payload and an error; both must be handled.
	if encoded, encErr := msg.Value.Encode(); encErr == nil {
		km.producerMetrics.bytesSent.Add(int64(len(encoded)))
	}
	km.producerMetrics.batchSize.Add(1)
}
```
High-Performance Serialization
```go
type MessageSerializer struct {
	codec  *gob.Encoder
	buffer *bytes.Buffer
	pool   *sync.Pool
}

func NewMessageSerializer() *MessageSerializer {
	return &MessageSerializer{
		buffer: bytes.NewBuffer(make([]byte, 0, 1024)),
		pool: &sync.Pool{
			New: func() interface{} {
				return make([]byte, 0, 1024)
			},
		},
	}
}

func (ms *MessageSerializer) Serialize(msg interface{}) ([]byte, error) {
	// Encode into the reusable internal buffer.
	ms.buffer.Reset()
	ms.codec = gob.NewEncoder(ms.buffer)
	if err := ms.codec.Encode(msg); err != nil {
		return nil, err
	}
	// Copy into a pooled slice so the result stays valid after the next Reset;
	// callers can hand the slice back through the pool once they are done with it.
	out := ms.pool.Get().([]byte)
	out = append(out[:0], ms.buffer.Bytes()...)
	return out, nil
}
```
The optimizations above are the main levers for achieving high performance with Kafka in Go, specifically:
- Memory management and GC optimization
- High-performance consumer pool
- Batch processing and retry mechanism
- Circuit breaker and rate limiting
- Performance monitoring and metric collection
- High-performance serialization
These techniques can provide significant performance improvements, especially in high-volume systems.
Security Checklist
- SSL/TLS Configuration:
  - Certificate management
  - Encryption protocols
  - Secure port configuration
- Authentication:
  - SASL mechanisms
  - User management
  - Password policies
- Authorization:
  - ACL rules
  - Role-based access control
  - Resource-based permissions
Key performance metrics to track:
- Latency: write latency, read latency
- Throughput: messages per second, bytes per second
- Durability: data loss rate, replica synchronization delay
Conclusion
Kafka's high performance comes from architectural decisions such as zero-copy transfer, batch processing, page cache utilization, partitioning, compression, and acknowledgment (acks) tuning. Combined with Go, these building blocks make it possible to develop high-performance, scalable systems.
Resources
- Apache Kafka Documentation
- Kafka Internal Structure - Confluent Blog
- Go Kafka Client (Shopify/sarama)
- Kafka Streams Documentation
- Monitoring Kafka with Prometheus
- Kafka Security Documentation
- Kafka Connect Documentation
- Kafka Cloud Services
- Kafka Testing Guide
Additional Resources
- Go Performance Best Practices
- Kafka Monitoring Tools
- Go Concurrency Patterns
- Kafka Security Best Practices
- Go Memory Management