İçerikler

Apache Kafka ve Go ile Yüksek Performanslı Mesajlaşma Sistemleri: Mimari, Optimizasyon ve Pratik Çözümler

İçerikler

Apache Kafka ve Go ile Yüksek Performanslı Mesajlaşma Sistemleri: Mimari, Optimizasyon ve Pratik Çözümler

Modern veri mimarilerinin en güçlü ve verimli dağıtık mesajlaşma sistemlerinden biri olan Apache Kafka, yüksek performanslı uygulamalar geliştirmek için ideal bir çözüm sunmaktadır. Bu kapsamlı makalede, Kafka’nın olağanüstü performansının arkasındaki teknik mimariyi inceleyecek ve Go programlama dili ile etkili uygulamasını detaylı örnekler ve gerçek dünya senaryoları üzerinden göstereceğiz.


🏗 Kafka’nın Performans Mimarisi

Kafka’nın temel mimarisini aşağıdaki diyagram ile gösterelim:

Kafka’nın hızını sağlayan temel unsurlar:

  • Sıralı Disk I/O
  • Sıfır-Kopya Transferi
  • Toplu İşleme
  • Bölümleme ve Paralellik
  • İşletim Sistemi Sayfa Önbelleği
  • Sıkıştırma
  • Acks ve Replikasyon Ayarları
  • Producer & Consumer Ayarları
  • Mesaj Depolama ve Sıkıştırma Politikaları
  • Kafka Streams ile Gerçek Zamanlı İşleme
  • Zookeeper veya KRaft Modu
  • İzleme ve Performans Ayarı

1. Sıralı Disk I/O

Kafka, mesajları diske sıralı olarak yazarak disk erişim maliyetlerini minimize eder.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func writeSequentially(filename string, messages []string) error {
    file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return err
    }
    defer file.Close()

    for _, msg := range messages {
        if _, err := file.WriteString(msg + "\n"); err != nil {
            return err
        }
    }
    return nil
}

2. Sıfır-Kopya Transferi

Kafka, sıfır-kopya ile kullanıcı alanını bypass ederek diske ve sokete daha hızlı veri transferi sağlar.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func sendFileZeroCopy(conn net.Conn, filename string) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    _, err = io.Copy(conn, file)
    return err
}

3. Toplu İşleme

Kafka, mesajları gruplandırarak yazma maliyetlerini azaltır.

1
2
3
4
5
6
7
func processBatch(messages []Message) {
    batch := make([]string, 0, len(messages))
    for _, msg := range messages {
        batch = append(batch, msg.Value)
    }
    writeSequentially("kafka.log", batch)
}

Önemli ayarlar:

  • batch.size: Toplu mesaj boyutu
  • linger.ms: Gönderme gecikmesi (daha fazla mesaj beklemek için)

4. Bölümleme ve Paralellik

Kafka, konuları bölümlere ayırarak paralel tüketimi sağlar.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func processPartitions(partitions [][]Message) {
    var wg sync.WaitGroup
    for i, partition := range partitions {
        wg.Add(1)
        go func(partitionNum int, msgs []Message) {
            defer wg.Done()
            processBatch(msgs)
        }(i, partition)
    }
    wg.Wait()
}

5. İşletim Sistemi Sayfa Önbelleği

Kafka, disk erişimi için işletim sisteminin sayfa önbelleğini kullanır.

1
2
3
func readWithCache(filename string) ([]byte, error) {
    return os.ReadFile(filename) // OS seviyesinde önbellek kullanır
}

6. Sıkıştırma

Kafka, gzip, lz4, snappy ve zstd gibi formatları kullanarak mesajları sıkıştırabilir.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func compressData(data []byte) ([]byte, error) {
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    _, err := zw.Write(data)
    if err != nil {
        return nil, err
    }
    zw.Close()
    return buf.Bytes(), nil
}

7. Acks ve Replikasyon Ayarları

Kafka’da veri güvenliği ve hızı, acks ayarları ile yönetilir:

  • acks=0: Onay yok (en hızlı, veri kaybı riski yüksek)
  • acks=1: Sadece leader onayı
  • acks=all: Tüm replikaların onayı (en güvenilir)

Ayrıca, minimum replikasyon seviyesi min.insync.replicas ile ayarlanabilir.

8. Producer & Consumer Ayarları

Producer Ayarları:

  • buffer.memory: Bellekteki tampon boyutu
  • compression.type: Sıkıştırma tipi
  • retries: Yeniden deneme sayısı

Consumer Ayarları:

  • fetch.min.bytes: Broker’dan minimum veri eşiği
  • max.poll.records: Poll başına maksimum mesaj sayısı
  • enable.auto.commit: Otomatik offset commit

9. Mesaj Depolama ve Sıkıştırma Politikaları

Kafka mesajları şu şekilde saklayabilir:

  • Zaman (retention.ms)
  • Boyut (retention.bytes)
  • Anahtar bazlı sıkıştırma (cleanup.policy=compact)

Bu, disk verimliliğini ve erişim hızını etkiler.

10. Kafka Streams ile Gerçek Zamanlı İşleme

Kafka sadece bir mesaj kuyruğu değil, aynı zamanda bir veri işleme motorudur.

1
2
3
4
// Örnek pencereleme işlemi - sahte kod
streamsBuilder.Stream("topic").
    WindowedBy(TimeWindows.of(Duration.ofSeconds(10))).
    Reduce((a, b) -> a + b)

11. Zookeeper Yerine KRaft Modu

Kafka 2.8+‘dan itibaren, Zookeeper olmadan KRaft (Kafka Raft) mimarisi kullanılabilir. Bu, yönetimi basitleştirir ve metadata gecikmesini azaltır.

12. İzleme ve Performans Ayarı

Kafka’nın performansı uygun izleme araçları ile optimize edilebilir:

  • Prometheus + JMX Exporter
  • Grafana dashboardları
  • Kafka Manager / AKHQ

İzlenmesi gereken metrikler:

  • Mesaj hacmi
  • Consumer gecikmesi
  • Disk kullanımı
  • Garbage Collection süreleri
  • Network ve disk I/O

13. Go-Spesifik Kafka Uygulamaları

Sarama vs Confluent-kafka-go Karşılaştırması

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Sarama Producer Örneği
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
    Topic: "test-topic",
    Value: sarama.StringEncoder("test message"),
}
partition, offset, err := producer.SendMessage(msg)

// Confluent-kafka-go Producer Örneği
p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
})
if err != nil {
    log.Fatal(err)
}
defer p.Close()

p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("test message"),
}, nil)

Goroutine Tabanlı Consumer Havuzu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type ConsumerPool struct {
    consumers []*Consumer
    wg        sync.WaitGroup
    ctx       context.Context
    cancel    context.CancelFunc
}

func NewConsumerPool(brokers []string, topic string, numConsumers int) *ConsumerPool {
    ctx, cancel := context.WithCancel(context.Background())
    pool := &ConsumerPool{
        consumers: make([]*Consumer, numConsumers),
        ctx:       ctx,
        cancel:    cancel,
    }
    
    for i := 0; i < numConsumers; i++ {
        pool.consumers[i] = NewConsumer(brokers, topic)
    }
    return pool
}

func (p *ConsumerPool) Start() {
    for _, consumer := range p.consumers {
        p.wg.Add(1)
        go func(c *Consumer) {
            defer p.wg.Done()
            c.Consume(p.ctx)
        }(consumer)
    }
}

func (p *ConsumerPool) Stop() {
    p.cancel()
    p.wg.Wait()
}

14. Yük Dengeleme ve Consumer Grupları

Consumer Group Yeniden Dengeleme Stratejileri

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Consumer Group Yapılandırması
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second

// Consumer Gecikme İzleme
func monitorConsumerLag(group string) {
    admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer admin.Close()

    for {
        lags, err := admin.ListConsumerGroupOffsets(group, nil)
        if err != nil {
            log.Printf("Consumer gecikmesi alınırken hata: %v", err)
            continue
        }
        // Gecikme bilgilerini işle
        time.Sleep(1 * time.Minute)
    }
}

15. Network ve Serileştirme

Schema Registry Entegrasyonu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Schema Registry ile Avro Serileştirme
type User struct {
    Name  string `avro:"name"`
    Email string `avro:"email"`
}

func serializeWithAvro(user User) ([]byte, error) {
    schema, err := avro.Parse(`{
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "email", "type": "string"}
        ]
    }`)
    if err != nil {
        return nil, err
    }

    return avro.Marshal(schema, user)
}

// Network Buffer Ayarı
config := sarama.NewConfig()
config.Net.MaxOpenRequests = 5
config.Net.DialTimeout = 30 * time.Second
config.Net.ReadTimeout = 30 * time.Second
config.Net.WriteTimeout = 30 * time.Second

16. Hata Yönetimi ve Dayanıklılık

Circuit Breaker Deseni

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
type CircuitBreaker struct {
    failures     int
    threshold    int
    resetTimeout time.Duration
    lastFailure  time.Time
    mu           sync.Mutex
}

func (cb *CircuitBreaker) Execute(f func() error) error {
    cb.mu.Lock()
    if cb.failures >= cb.threshold {
        if time.Since(cb.lastFailure) < cb.resetTimeout {
            cb.mu.Unlock()
            return errors.New("circuit breaker açık")
        }
        cb.failures = 0
    }
    cb.mu.Unlock()

    err := f()
    if err != nil {
        cb.mu.Lock()
        cb.failures++
        cb.lastFailure = time.Now()
        cb.mu.Unlock()
    }
    return err
}

// Dead Letter Queue Uygulaması
func processWithDLQ(msg *sarama.ConsumerMessage) error {
    err := processMessage(msg)
    if err != nil {
        // DLQ'ya gönder
        dlqMsg := &sarama.ProducerMessage{
            Topic: "dlq-topic",
            Value: sarama.ByteEncoder(msg.Value),
            Headers: []sarama.RecordHeader{
                {
                    Key:   []byte("original-topic"),
                    Value: []byte(msg.Topic),
                },
                {
                    Key:   []byte("error"),
                    Value: []byte(err.Error()),
                },
            },
        }
        producer.SendMessage(dlqMsg)
    }
    return err
}

17. Gerçek Dünya Kullanım Örnekleri

Log Toplama Deseni

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type LogAggregator struct {
    producer sarama.SyncProducer
    topic    string
}

func (la *LogAggregator) SendLog(service string, level string, message string) error {
    logEntry := map[string]interface{}{
        "timestamp": time.Now().Unix(),
        "service":   service,
        "level":     level,
        "message":   message,
    }

    jsonData, err := json.Marshal(logEntry)
    if err != nil {
        return err
    }

    msg := &sarama.ProducerMessage{
        Topic: la.topic,
        Value: sarama.ByteEncoder(jsonData),
    }

    _, _, err = la.producer.SendMessage(msg)
    return err
}

Event Sourcing Uygulaması

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type EventStore struct {
    producer sarama.SyncProducer
    topic    string
}

func (es *EventStore) SaveEvent(aggregateID string, eventType string, data interface{}) error {
    event := map[string]interface{}{
        "aggregate_id": aggregateID,
        "event_type":   eventType,
        "data":        data,
        "timestamp":   time.Now().Unix(),
    }

    jsonData, err := json.Marshal(event)
    if err != nil {
        return err
    }

    msg := &sarama.ProducerMessage{
        Topic: es.topic,
        Key:   sarama.StringEncoder(aggregateID),
        Value: sarama.ByteEncoder(jsonData),
    }

    _, _, err = es.producer.SendMessage(msg)
    return err
}

18. Benchmarking ve Test

Performans Test Kurulumu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func benchmarkKafkaPerformance(b *testing.B) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Compression = sarama.CompressionSnappy
    
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        b.Fatal(err)
    }
    defer producer.Close()

    b.Run("TekMesaj", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            msg := &sarama.ProducerMessage{
                Topic: "benchmark-topic",
                Value: sarama.StringEncoder(fmt.Sprintf("message-%d", i)),
            }
            producer.SendMessage(msg)
        }
    })

    b.Run("TopluMesaj", func(b *testing.B) {
        batch := make([]*sarama.ProducerMessage, 100)
        for i := 0; i < b.N; i++ {
            for j := 0; j < 100; j++ {
                batch[j] = &sarama.ProducerMessage{
                    Topic: "benchmark-topic",
                    Value: sarama.StringEncoder(fmt.Sprintf("message-%d-%d", i, j)),
                }
            }
            producer.SendMessages(batch)
        }
    })
}

19. Kafka Broker’ları için JVM Ayarı

Garbage Collection Yapılandırması

1
2
3
4
# server.properties
# G1GC Yapılandırması
KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

Off-heap Bellek Yapılandırması

1
2
3
# server.properties
# Direct Memory Yapılandırması
KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:MaxDirectMemorySize=1G"

20. Mikroservis İletişim Desenleri

İstek-Yanıt Deseni

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type RequestReply struct {
    producer sarama.SyncProducer
    consumer sarama.Consumer
    topic    string
}

func (rr *RequestReply) SendRequest(requestID string, payload []byte) ([]byte, error) {
    // İstek gönder
    msg := &sarama.ProducerMessage{
        Topic: rr.topic,
        Key:   sarama.StringEncoder(requestID),
        Value: sarama.ByteEncoder(payload),
    }
    _, _, err := rr.producer.SendMessage(msg)
    if err != nil {
        return nil, err
    }

    // Yanıt bekle
    partitionConsumer, err := rr.consumer.ConsumePartition(rr.topic, 0, sarama.OffsetNewest)
    if err != nil {
        return nil, err
    }
    defer partitionConsumer.Close()

    for msg := range partitionConsumer.Messages() {
        if string(msg.Key) == requestID {
            return msg.Value, nil
        }
    }
    return nil, errors.New("yanıt için zaman aşımı")
}

21. Kafka Güvenliği

SSL/TLS Yapılandırması

1
2
3
4
5
6
7
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
    InsecureSkipVerify: false,
    RootCAs:            rootCAs,
    Certificates:       []tls.Certificate{cert},
}

SASL Kimlik Doğrulama

1
2
3
4
5
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = "kafka-user"
config.Net.SASL.Password = "kafka-password"

ACL Yönetimi

1
2
3
4
5
6
# Topic için ACL oluştur
kafka-acls --bootstrap-server localhost:9092 \
    --add \
    --allow-principal User:app1 \
    --operation Read \
    --topic test-topic

22. Kafka Connect

Veritabanı Bağlantısı

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "table.whitelist": "users",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "mysql-"
    }
}

Dosya Sistemi Bağlantısı

1
2
3
4
5
6
7
8
{
    "name": "file-source",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "/path/to/file.txt",
        "topic": "file-topic"
    }
}

23. Kafka Streams Detaylı Örnekler

Durumlu İşlemler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type UserState struct {
    Count     int64
    LastSeen  time.Time
    UserID    string
}

func processUserEvents(stream *kstream.Stream) {
    stream.GroupByKey().
        Aggregate(
            func() UserState { return UserState{} },
            func(key string, value interface{}, state UserState) UserState {
                state.Count++
                state.LastSeen = time.Now()
                return state
            },
            "user-state-store",
        )
}

Pencereleme İşlemleri

1
2
3
4
5
func processTimeWindows(stream *kstream.Stream) {
    stream.GroupByKey().
        WindowedBy(kstream.TumblingWindow(5 * time.Minute)).
        Count("windowed-counts")
}

24. Felaket Kurtarma ve Yüksek Erişilebilirlik

Yedekleme Stratejisi

1
2
3
4
5
# Topic yedekleme
kafka-backup --bootstrap-server localhost:9092 \
    --topic test-topic \
    --backup-dir /backup/kafka \
    --format json

Çoklu Veri Merkezi Dağıtımı

1
2
3
# server.properties
broker.rack=dc1-rack1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

25. Kafka UI Araçları

AKHQ Yapılandırması

1
2
3
4
5
6
7
8
akhq:
  connections:
    docker-kafka-server:
      properties:
        bootstrap.servers: "localhost:9092"
      topics:
        filter:
          - "test-.*"

26. Anti-patterns ve En İyi Uygulamalar

Topic Tasarımı

1
2
3
4
5
6
// İyi topic isimlendirme örneği
const (
    OrderCreatedTopic    = "order-service.order.created"
    OrderProcessedTopic  = "order-service.order.processed"
    PaymentReceivedTopic = "payment-service.payment.received"
)

Partition Sayısı Belirleme

1
2
3
4
func calculatePartitionCount(throughput, targetLatency int) int {
    // Her partition saniyede ~10MB işleyebilir
    return (throughput / 10) + 1
}

27. Sorun Giderme

Yaygın Hata Senaryoları

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func handleCommonErrors(err error) {
    switch {
    case errors.Is(err, sarama.ErrNotLeaderForPartition):
        log.Println("Partition lideri değişti, yeniden denenecek")
    case errors.Is(err, sarama.ErrLeaderNotAvailable):
        log.Println("Lider mevcut değil, yeniden denenecek")
    case errors.Is(err, sarama.ErrRequestTimedOut):
        log.Println("İstek zaman aşımına uğradı")
    }
}

28. İzleme ve Uyarı

Prometheus Metrikleri

1
2
3
4
func exposeMetrics() {
    http.Handle("/metrics", promhttp.Handler())
    go http.ListenAndServe(":8080", nil)
}

Grafana Dashboard

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
{
    "dashboard": {
        "panels": [
            {
                "title": "Kafka Consumer Gecikmesi",
                "type": "graph",
                "datasource": "Prometheus",
                "targets": [
                    {
                        "expr": "kafka_consumer_group_lag"
                    }
                ]
            }
        ]
    }
}

29. Dağıtım Stratejileri

Kubernetes Dağıtımı

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  template:
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:latest
        ports:
        - containerPort: 9092
        volumeMounts:
        - name: data
          mountPath: /var/lib/kafka/data

30. Bulut Servisleri

AWS MSK Yapılandırması

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
resource "aws_msk_cluster" "example" {
  cluster_name           = "example"
  kafka_version         = "2.8.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = "kafka.m5.large"
    ebs_storage_size = 1000
    client_subnets  = [aws_subnet.example.id]
    security_groups = [aws_security_group.example.id]
  }
}

31. Test Stratejileri

Birim Testi

1
2
3
4
5
6
7
8
func TestKafkaProducer(t *testing.T) {
    mockProducer := NewMockProducer()
    producer := NewProducer(mockProducer)
    
    err := producer.SendMessage("test-topic", "test-message")
    assert.NoError(t, err)
    assert.True(t, mockProducer.MessageSent)
}

Kaos Testi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func TestKafkaChaos(t *testing.T) {
    // Broker'ı durdur
    stopBroker()
    
    // Mesaj göndermeyi dene
    err := producer.SendMessage("test-topic", "test-message")
    assert.Error(t, err)
    
    // Broker'ı yeniden başlat
    startBroker()
    
    // Mesaj göndermeyi tekrar dene
    err = producer.SendMessage("test-topic", "test-message")
    assert.NoError(t, err)
}

32. Go ile Kafka Kullanımında İleri Seviye Optimizasyonlar

Bellek Yönetimi ve GC Optimizasyonu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type KafkaProducer struct {
    producer sarama.SyncProducer
    buffer   *bytes.Buffer
    pool     *sync.Pool
}

func NewKafkaProducer() *KafkaProducer {
    return &KafkaProducer{
        buffer: bytes.NewBuffer(make([]byte, 0, 1024)),
        pool: &sync.Pool{
            New: func() interface{} {
                return make([]byte, 0, 1024)
            },
        },
    }
}

func (kp *KafkaProducer) SendMessageOptimized(topic string, message []byte) error {
    // Buffer'ı pool'dan al
    buf := kp.pool.Get().([]byte)
    defer kp.pool.Put(buf)
    
    // Buffer'ı temizle ve mesajı ekle
    buf = buf[:0]
    buf = append(buf, message...)
    
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(buf),
    }
    
    _, _, err := kp.producer.SendMessage(msg)
    return err
}

Yüksek Performanslı Consumer Havuzu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type HighPerformanceConsumer struct {
    consumers []*Consumer
    wg        sync.WaitGroup
    ctx       context.Context
    cancel    context.CancelFunc
    metrics   *ConsumerMetrics
}

type ConsumerMetrics struct {
    messagesProcessed    atomic.Int64
    processingTime       atomic.Int64
    errors              atomic.Int64
    lastProcessedOffset atomic.Int64
}

func NewHighPerformanceConsumer(brokers []string, topic string, numConsumers int) *HighPerformanceConsumer {
    ctx, cancel := context.WithCancel(context.Background())
    return &HighPerformanceConsumer{
        consumers: make([]*Consumer, numConsumers),
        ctx:       ctx,
        cancel:    cancel,
        metrics:   &ConsumerMetrics{},
    }
}

func (c *HighPerformanceConsumer) Start() {
    for i := 0; i < len(c.consumers); i++ {
        c.wg.Add(1)
        go func(consumerID int) {
            defer c.wg.Done()
            c.processMessages(consumerID)
        }(i)
    }
}

func (c *HighPerformanceConsumer) processMessages(consumerID int) {
    consumer := c.consumers[consumerID]
    for {
        select {
        case <-c.ctx.Done():
            return
        default:
            messages, err := consumer.FetchMessages(100)
            if err != nil {
                c.metrics.errors.Add(1)
                continue
            }
            
            startTime := time.Now()
            for _, msg := range messages {
                if err := c.processMessage(msg); err != nil {
                    c.metrics.errors.Add(1)
                    continue
                }
                c.metrics.messagesProcessed.Add(1)
                c.metrics.lastProcessedOffset.Store(msg.Offset)
            }
            c.metrics.processingTime.Add(time.Since(startTime).Milliseconds())
        }
    }
}

Batch İşleme ve Retry Mekanizması

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
type BatchProcessor struct {
    batchSize    int
    maxRetries   int
    retryBackoff time.Duration
    processor    MessageProcessor
    metrics      *BatchMetrics
}

type BatchMetrics struct {
    batchCount     atomic.Int64
    successCount   atomic.Int64
    failureCount   atomic.Int64
    retryCount     atomic.Int64
    processingTime atomic.Int64
}

func (bp *BatchProcessor) ProcessBatch(messages []Message) error {
    startTime := time.Now()
    defer func() {
        bp.metrics.processingTime.Add(time.Since(startTime).Milliseconds())
    }()

    batch := make([]Message, 0, bp.batchSize)
    for _, msg := range messages {
        batch = append(batch, msg)
        if len(batch) >= bp.batchSize {
            if err := bp.processWithRetry(batch); err != nil {
                return err
            }
            batch = batch[:0]
        }
    }

    if len(batch) > 0 {
        return bp.processWithRetry(batch)
    }
    return nil
}

func (bp *BatchProcessor) processWithRetry(batch []Message) error {
    var err error
    for i := 0; i < bp.maxRetries; i++ {
        if err = bp.processor.Process(batch); err == nil {
            bp.metrics.successCount.Add(1)
            return nil
        }
        bp.metrics.retryCount.Add(1)
        time.Sleep(bp.retryBackoff * time.Duration(i+1))
    }
    bp.metrics.failureCount.Add(1)
    return err
}

Circuit Breaker ve Rate Limiting

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type KafkaCircuitBreaker struct {
    failures     atomic.Int64
    threshold    int64
    resetTimeout time.Duration
    lastFailure  atomic.Int64
    mu           sync.RWMutex
}

type RateLimiter struct {
    rate       rate.Limiter
    burst      int
    windowSize time.Duration
}

func NewKafkaCircuitBreaker(threshold int64, resetTimeout time.Duration) *KafkaCircuitBreaker {
    return &KafkaCircuitBreaker{
        threshold:    threshold,
        resetTimeout: resetTimeout,
    }
}

func (cb *KafkaCircuitBreaker) Execute(f func() error) error {
    if cb.failures.Load() >= cb.threshold {
        if time.Since(time.Unix(0, cb.lastFailure.Load())) < cb.resetTimeout {
            return errors.New("circuit breaker açık")
        }
        cb.failures.Store(0)
    }

    err := f()
    if err != nil {
        cb.failures.Add(1)
        cb.lastFailure.Store(time.Now().UnixNano())
    }
    return err
}

func NewRateLimiter(rate float64, burst int, windowSize time.Duration) *RateLimiter {
    return &RateLimiter{
        rate:       *rate.NewLimiter(rate, burst),
        burst:      burst,
        windowSize: windowSize,
    }
}

func (rl *RateLimiter) Allow() bool {
    return rl.rate.Allow()
}

Performans İzleme ve Metrik Toplama

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type KafkaMetrics struct {
    producerMetrics *ProducerMetrics
    consumerMetrics *ConsumerMetrics
    brokerMetrics   *BrokerMetrics
}

type ProducerMetrics struct {
    messagesSent     atomic.Int64
    bytesSent        atomic.Int64
    sendLatency      atomic.Int64
    errors           atomic.Int64
    batchSize        atomic.Int64
}

type BrokerMetrics struct {
    activeConnections atomic.Int64
    requestLatency    atomic.Int64
    bytesIn          atomic.Int64
    bytesOut         atomic.Int64
}

func (km *KafkaMetrics) ExposeMetrics() {
    http.Handle("/metrics", promhttp.Handler())
    go http.ListenAndServe(":8080", nil)
}

func (km *KafkaMetrics) RecordProducerMetrics(msg *sarama.ProducerMessage, err error) {
    if err != nil {
        km.producerMetrics.errors.Add(1)
        return
    }
    
    km.producerMetrics.messagesSent.Add(1)
    km.producerMetrics.bytesSent.Add(int64(len(msg.Value.Encode())))
    km.producerMetrics.batchSize.Add(1)
}

Yüksek Performanslı Serileştirme

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type MessageSerializer struct {
    codec    *gob.Encoder
    buffer   *bytes.Buffer
    pool     *sync.Pool
}

func NewMessageSerializer() *MessageSerializer {
    return &MessageSerializer{
        buffer: bytes.NewBuffer(make([]byte, 0, 1024)),
        pool: &sync.Pool{
            New: func() interface{} {
                return make([]byte, 0, 1024)
            },
        },
    }
}

func (ms *MessageSerializer) Serialize(msg interface{}) ([]byte, error) {
    buf := ms.pool.Get().([]byte)
    defer ms.pool.Put(buf)
    
    buf = buf[:0]
    ms.buffer.Reset()
    ms.codec = gob.NewEncoder(ms.buffer)
    
    if err := ms.codec.Encode(msg); err != nil {
        return nil, err
    }
    
    return ms.buffer.Bytes(), nil
}

Bu optimizasyonlar, Go ile Kafka kullanımında yüksek performans elde etmek için önemli tekniklerdir. Özellikle:

  1. Bellek yönetimi ve GC optimizasyonu
  2. Yüksek performanslı consumer havuzu
  3. Batch işleme ve retry mekanizması
  4. Circuit breaker ve rate limiting
  5. Performans izleme ve metrik toplama
  6. Yüksek performanslı serileştirme

Bu teknikler, özellikle yüksek hacimli sistemlerde önemli performans artışları sağlayabilir.

🔍 Güvenlik Kontrol Listesi

  1. SSL/TLS Yapılandırması:

    • Sertifika yönetimi
    • Şifreleme protokolleri
    • Güvenli port yapılandırması
  2. Kimlik Doğrulama:

    • SASL mekanizmaları
    • Kullanıcı yönetimi
    • Şifre politikaları
  3. Yetkilendirme:

    • ACL kuralları
    • Rol tabanlı erişim kontrolü
    • Kaynak tabanlı izinler

📊 Kafka Performans Metrikleri

Gecikme:

  • Yazma Gecikmesi
  • Okuma Gecikmesi

Verim:

  • Saniyede mesaj
  • Saniyede byte

Dayanıklılık:

  • Veri kaybı oranı
  • Replika senkronizasyon gecikmesi

✅ Sonuç

Kafka’nın yüksek performansı, sıfır-kopya, toplu işleme, OS önbelleği kullanımı, bölümleme, sıkıştırma ve acks gibi mimari kararlarla sağlanır. Go ile birleştirildiğinde, bu yapılar yüksek performanslı ve ölçeklenebilir sistemler geliştirmeyi mümkün kılar.

📚 Kaynaklar

  1. Apache Kafka Dokümantasyonu
  2. Kafka İç Yapısı - Confluent Blog
  3. Go Kafka İstemcisi (Shopify/sarama)
  4. Kafka Streams Dokümantasyonu
  5. Prometheus ile Kafka İzleme
  6. Kafka Güvenlik Dokümantasyonu
  7. Kafka Connect Dokümantasyonu
  8. Kafka Bulut Servisleri
  9. Kafka Test Rehberi

📚 Ek Kaynaklar

  1. Go Performans En İyi Uygulamaları
  2. Kafka İzleme Araçları
  3. Go Eşzamanlılık Desenleri
  4. Kafka Güvenlik En İyi Uygulamaları
  5. Go Bellek Yönetimi