Modern veri mimarilerinin en güçlü ve verimli dağıtık mesajlaşma sistemlerinden biri olan Apache Kafka, yüksek performanslı uygulamalar geliştirmek için ideal bir çözüm sunmaktadır. Bu kapsamlı makalede, Kafka’nın olağanüstü performansının arkasındaki teknik mimariyi inceleyecek ve Go programlama dili ile etkili uygulamasını detaylı örnekler ve gerçek dünya senaryoları üzerinden göstereceğiz.
Kafka’nın temel mimarisini aşağıdaki diyagram ile gösterelim:
Kafka’nın hızını sağlayan temel unsurlar:
- Sıralı Disk I/O
- Sıfır-Kopya Transferi
- Toplu İşleme
- Bölümleme ve Paralellik
- İşletim Sistemi Sayfa Önbelleği
- Sıkıştırma
- Acks ve Replikasyon Ayarları
- Producer & Consumer Ayarları
- Mesaj Depolama ve Sıkıştırma Politikaları
- Kafka Streams ile Gerçek Zamanlı İşleme
- Zookeeper veya KRaft Modu
- İzleme ve Performans Ayarı
1. Sıralı Disk I/O
Kafka, mesajları diske sıralı olarak yazarak disk erişim maliyetlerini minimize eder.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func writeSequentially(filename string, messages []string) error {
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
for _, msg := range messages {
if _, err := file.WriteString(msg + "\n"); err != nil {
return err
}
}
return nil
}
|
2. Sıfır-Kopya Transferi
Kafka, sıfır-kopya ile kullanıcı alanını bypass ederek diske ve sokete daha hızlı veri transferi sağlar.
1
2
3
4
5
6
7
8
9
10
|
func sendFileZeroCopy(conn net.Conn, filename string) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
_, err = io.Copy(conn, file)
return err
}
|
3. Toplu İşleme
Kafka, mesajları gruplandırarak yazma maliyetlerini azaltır.
1
2
3
4
5
6
7
|
func processBatch(messages []Message) {
batch := make([]string, 0, len(messages))
for _, msg := range messages {
batch = append(batch, msg.Value)
}
writeSequentially("kafka.log", batch)
}
|
Önemli ayarlar:
batch.size
: Toplu mesaj boyutu
linger.ms
: Gönderme gecikmesi (daha fazla mesaj beklemek için)
4. Bölümleme ve Paralellik
Kafka, konuları bölümlere ayırarak paralel tüketimi sağlar.
1
2
3
4
5
6
7
8
9
10
11
|
func processPartitions(partitions [][]Message) {
var wg sync.WaitGroup
for i, partition := range partitions {
wg.Add(1)
go func(partitionNum int, msgs []Message) {
defer wg.Done()
processBatch(msgs)
}(i, partition)
}
wg.Wait()
}
|
5. İşletim Sistemi Sayfa Önbelleği
Kafka, disk erişimi için işletim sisteminin sayfa önbelleğini kullanır.
1
2
3
|
func readWithCache(filename string) ([]byte, error) {
return os.ReadFile(filename) // OS seviyesinde önbellek kullanır
}
|
6. Sıkıştırma
Kafka, gzip, lz4, snappy ve zstd gibi formatları kullanarak mesajları sıkıştırabilir.
1
2
3
4
5
6
7
8
9
10
|
func compressData(data []byte) ([]byte, error) {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
_, err := zw.Write(data)
if err != nil {
return nil, err
}
zw.Close()
return buf.Bytes(), nil
}
|
7. Acks ve Replikasyon Ayarları
Kafka’da veri güvenliği ve hızı, acks ayarları ile yönetilir:
acks=0
: Onay yok (en hızlı, veri kaybı riski yüksek)
acks=1
: Sadece leader onayı
acks=all
: Tüm replikaların onayı (en güvenilir)
Ayrıca, minimum replikasyon seviyesi min.insync.replicas
ile ayarlanabilir.
8. Producer & Consumer Ayarları
Producer Ayarları:
buffer.memory
: Bellekteki tampon boyutu
compression.type
: Sıkıştırma tipi
retries
: Yeniden deneme sayısı
Consumer Ayarları:
fetch.min.bytes
: Broker’dan minimum veri eşiği
max.poll.records
: Poll başına maksimum mesaj sayısı
enable.auto.commit
: Otomatik offset commit
9. Mesaj Depolama ve Sıkıştırma Politikaları
Kafka mesajları şu şekilde saklayabilir:
- Zaman (
retention.ms
)
- Boyut (
retention.bytes
)
- Anahtar bazlı sıkıştırma (
cleanup.policy=compact
)
Bu, disk verimliliğini ve erişim hızını etkiler.
10. Kafka Streams ile Gerçek Zamanlı İşleme
Kafka sadece bir mesaj kuyruğu değil, aynı zamanda bir veri işleme motorudur.
1
2
3
4
|
// Örnek pencereleme işlemi - sahte kod
streamsBuilder.Stream("topic").
WindowedBy(TimeWindows.of(Duration.ofSeconds(10))).
Reduce((a, b) -> a + b)
|
11. Zookeeper Yerine KRaft Modu
Kafka 2.8+‘dan itibaren, Zookeeper olmadan KRaft (Kafka Raft) mimarisi kullanılabilir. Bu, yönetimi basitleştirir ve metadata gecikmesini azaltır.
Kafka’nın performansı uygun izleme araçları ile optimize edilebilir:
- Prometheus + JMX Exporter
- Grafana dashboardları
- Kafka Manager / AKHQ
İzlenmesi gereken metrikler:
- Mesaj hacmi
- Consumer gecikmesi
- Disk kullanımı
- Garbage Collection süreleri
- Network ve disk I/O
13. Go-Spesifik Kafka Uygulamaları
Sarama vs Confluent-kafka-go Karşılaştırması
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// Sarama Producer Örneği
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("test message"),
}
partition, offset, err := producer.SendMessage(msg)
// Confluent-kafka-go Producer Örneği
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
})
if err != nil {
log.Fatal(err)
}
defer p.Close()
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("test message"),
}, nil)
|
Goroutine Tabanlı Consumer Havuzu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
type ConsumerPool struct {
consumers []*Consumer
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewConsumerPool(brokers []string, topic string, numConsumers int) *ConsumerPool {
ctx, cancel := context.WithCancel(context.Background())
pool := &ConsumerPool{
consumers: make([]*Consumer, numConsumers),
ctx: ctx,
cancel: cancel,
}
for i := 0; i < numConsumers; i++ {
pool.consumers[i] = NewConsumer(brokers, topic)
}
return pool
}
func (p *ConsumerPool) Start() {
for _, consumer := range p.consumers {
p.wg.Add(1)
go func(c *Consumer) {
defer p.wg.Done()
c.Consume(p.ctx)
}(consumer)
}
}
func (p *ConsumerPool) Stop() {
p.cancel()
p.wg.Wait()
}
|
14. Yük Dengeleme ve Consumer Grupları
Consumer Group Yeniden Dengeleme Stratejileri
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
// Consumer Group Yapılandırması
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
// Consumer Gecikme İzleme
func monitorConsumerLag(group string) {
admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatal(err)
}
defer admin.Close()
for {
lags, err := admin.ListConsumerGroupOffsets(group, nil)
if err != nil {
log.Printf("Consumer gecikmesi alınırken hata: %v", err)
continue
}
// Gecikme bilgilerini işle
time.Sleep(1 * time.Minute)
}
}
|
15. Network ve Serileştirme
Schema Registry Entegrasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// Schema Registry ile Avro Serileştirme
type User struct {
Name string `avro:"name"`
Email string `avro:"email"`
}
func serializeWithAvro(user User) ([]byte, error) {
schema, err := avro.Parse(`{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}`)
if err != nil {
return nil, err
}
return avro.Marshal(schema, user)
}
// Network Buffer Ayarı
config := sarama.NewConfig()
config.Net.MaxOpenRequests = 5
config.Net.DialTimeout = 30 * time.Second
config.Net.ReadTimeout = 30 * time.Second
config.Net.WriteTimeout = 30 * time.Second
|
16. Hata Yönetimi ve Dayanıklılık
Circuit Breaker Deseni
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
type CircuitBreaker struct {
failures int
threshold int
resetTimeout time.Duration
lastFailure time.Time
mu sync.Mutex
}
func (cb *CircuitBreaker) Execute(f func() error) error {
cb.mu.Lock()
if cb.failures >= cb.threshold {
if time.Since(cb.lastFailure) < cb.resetTimeout {
cb.mu.Unlock()
return errors.New("circuit breaker açık")
}
cb.failures = 0
}
cb.mu.Unlock()
err := f()
if err != nil {
cb.mu.Lock()
cb.failures++
cb.lastFailure = time.Now()
cb.mu.Unlock()
}
return err
}
// Dead Letter Queue Uygulaması
func processWithDLQ(msg *sarama.ConsumerMessage) error {
err := processMessage(msg)
if err != nil {
// DLQ'ya gönder
dlqMsg := &sarama.ProducerMessage{
Topic: "dlq-topic",
Value: sarama.ByteEncoder(msg.Value),
Headers: []sarama.RecordHeader{
{
Key: []byte("original-topic"),
Value: []byte(msg.Topic),
},
{
Key: []byte("error"),
Value: []byte(err.Error()),
},
},
}
producer.SendMessage(dlqMsg)
}
return err
}
|
17. Gerçek Dünya Kullanım Örnekleri
Log Toplama Deseni
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
type LogAggregator struct {
producer sarama.SyncProducer
topic string
}
func (la *LogAggregator) SendLog(service string, level string, message string) error {
logEntry := map[string]interface{}{
"timestamp": time.Now().Unix(),
"service": service,
"level": level,
"message": message,
}
jsonData, err := json.Marshal(logEntry)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: la.topic,
Value: sarama.ByteEncoder(jsonData),
}
_, _, err = la.producer.SendMessage(msg)
return err
}
|
Event Sourcing Uygulaması
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
type EventStore struct {
producer sarama.SyncProducer
topic string
}
func (es *EventStore) SaveEvent(aggregateID string, eventType string, data interface{}) error {
event := map[string]interface{}{
"aggregate_id": aggregateID,
"event_type": eventType,
"data": data,
"timestamp": time.Now().Unix(),
}
jsonData, err := json.Marshal(event)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: es.topic,
Key: sarama.StringEncoder(aggregateID),
Value: sarama.ByteEncoder(jsonData),
}
_, _, err = es.producer.SendMessage(msg)
return err
}
|
18. Benchmarking ve Test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func benchmarkKafkaPerformance(b *testing.B) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Compression = sarama.CompressionSnappy
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
b.Fatal(err)
}
defer producer.Close()
b.Run("TekMesaj", func(b *testing.B) {
for i := 0; i < b.N; i++ {
msg := &sarama.ProducerMessage{
Topic: "benchmark-topic",
Value: sarama.StringEncoder(fmt.Sprintf("message-%d", i)),
}
producer.SendMessage(msg)
}
})
b.Run("TopluMesaj", func(b *testing.B) {
batch := make([]*sarama.ProducerMessage, 100)
for i := 0; i < b.N; i++ {
for j := 0; j < 100; j++ {
batch[j] = &sarama.ProducerMessage{
Topic: "benchmark-topic",
Value: sarama.StringEncoder(fmt.Sprintf("message-%d-%d", i, j)),
}
}
producer.SendMessages(batch)
}
})
}
|
19. Kafka Broker’ları için JVM Ayarı
Garbage Collection Yapılandırması
1
2
3
4
|
# server.properties
# G1GC Yapılandırması
KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
|
Off-heap Bellek Yapılandırması
1
2
3
|
# server.properties
# Direct Memory Yapılandırması
KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:MaxDirectMemorySize=1G"
|
20. Mikroservis İletişim Desenleri
İstek-Yanıt Deseni
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
type RequestReply struct {
producer sarama.SyncProducer
consumer sarama.Consumer
topic string
}
func (rr *RequestReply) SendRequest(requestID string, payload []byte) ([]byte, error) {
// İstek gönder
msg := &sarama.ProducerMessage{
Topic: rr.topic,
Key: sarama.StringEncoder(requestID),
Value: sarama.ByteEncoder(payload),
}
_, _, err := rr.producer.SendMessage(msg)
if err != nil {
return nil, err
}
// Yanıt bekle
partitionConsumer, err := rr.consumer.ConsumePartition(rr.topic, 0, sarama.OffsetNewest)
if err != nil {
return nil, err
}
defer partitionConsumer.Close()
for msg := range partitionConsumer.Messages() {
if string(msg.Key) == requestID {
return msg.Value, nil
}
}
return nil, errors.New("yanıt için zaman aşımı")
}
|
21. Kafka Güvenliği
SSL/TLS Yapılandırması
1
2
3
4
5
6
7
|
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: false,
RootCAs: rootCAs,
Certificates: []tls.Certificate{cert},
}
|
SASL Kimlik Doğrulama
1
2
3
4
5
|
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = "kafka-user"
config.Net.SASL.Password = "kafka-password"
|
ACL Yönetimi
1
2
3
4
5
6
|
# Topic için ACL oluştur
kafka-acls --bootstrap-server localhost:9092 \
--add \
--allow-principal User:app1 \
--operation Read \
--topic test-topic
|
22. Kafka Connect
Veritabanı Bağlantısı
1
2
3
4
5
6
7
8
9
10
11
|
{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"table.whitelist": "users",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql-"
}
}
|
Dosya Sistemi Bağlantısı
1
2
3
4
5
6
7
8
|
{
"name": "file-source",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/path/to/file.txt",
"topic": "file-topic"
}
}
|
23. Kafka Streams Detaylı Örnekler
Durumlu İşlemler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
type UserState struct {
Count int64
LastSeen time.Time
UserID string
}
func processUserEvents(stream *kstream.Stream) {
stream.GroupByKey().
Aggregate(
func() UserState { return UserState{} },
func(key string, value interface{}, state UserState) UserState {
state.Count++
state.LastSeen = time.Now()
return state
},
"user-state-store",
)
}
|
Pencereleme İşlemleri
1
2
3
4
5
|
func processTimeWindows(stream *kstream.Stream) {
stream.GroupByKey().
WindowedBy(kstream.TumblingWindow(5 * time.Minute)).
Count("windowed-counts")
}
|
24. Felaket Kurtarma ve Yüksek Erişilebilirlik
Yedekleme Stratejisi
1
2
3
4
5
|
# Topic yedekleme
kafka-backup --bootstrap-server localhost:9092 \
--topic test-topic \
--backup-dir /backup/kafka \
--format json
|
Çoklu Veri Merkezi Dağıtımı
1
2
3
|
# server.properties
broker.rack=dc1-rack1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
|
25. Kafka UI Araçları
AKHQ Yapılandırması
1
2
3
4
5
6
7
8
|
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "localhost:9092"
topics:
filter:
- "test-.*"
|
26. Anti-patterns ve En İyi Uygulamalar
Topic Tasarımı
1
2
3
4
5
6
|
// İyi topic isimlendirme örneği
const (
OrderCreatedTopic = "order-service.order.created"
OrderProcessedTopic = "order-service.order.processed"
PaymentReceivedTopic = "payment-service.payment.received"
)
|
Partition Sayısı Belirleme
1
2
3
4
|
func calculatePartitionCount(throughput, targetLatency int) int {
// Her partition saniyede ~10MB işleyebilir
return (throughput / 10) + 1
}
|
27. Sorun Giderme
Yaygın Hata Senaryoları
1
2
3
4
5
6
7
8
9
10
|
func handleCommonErrors(err error) {
switch {
case errors.Is(err, sarama.ErrNotLeaderForPartition):
log.Println("Partition lideri değişti, yeniden denenecek")
case errors.Is(err, sarama.ErrLeaderNotAvailable):
log.Println("Lider mevcut değil, yeniden denenecek")
case errors.Is(err, sarama.ErrRequestTimedOut):
log.Println("İstek zaman aşımına uğradı")
}
}
|
28. İzleme ve Uyarı
Prometheus Metrikleri
1
2
3
4
|
func exposeMetrics() {
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":8080", nil)
}
|
Grafana Dashboard
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
{
"dashboard": {
"panels": [
{
"title": "Kafka Consumer Gecikmesi",
"type": "graph",
"datasource": "Prometheus",
"targets": [
{
"expr": "kafka_consumer_group_lag"
}
]
}
]
}
}
|
29. Dağıtım Stratejileri
Kubernetes Dağıtımı
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: kafka
replicas: 3
template:
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:latest
ports:
- containerPort: 9092
volumeMounts:
- name: data
mountPath: /var/lib/kafka/data
|
30. Bulut Servisleri
AWS MSK Yapılandırması
1
2
3
4
5
6
7
8
9
10
11
12
|
resource "aws_msk_cluster" "example" {
cluster_name = "example"
kafka_version = "2.8.1"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.m5.large"
ebs_storage_size = 1000
client_subnets = [aws_subnet.example.id]
security_groups = [aws_security_group.example.id]
}
}
|
31. Test Stratejileri
Birim Testi
1
2
3
4
5
6
7
8
|
func TestKafkaProducer(t *testing.T) {
mockProducer := NewMockProducer()
producer := NewProducer(mockProducer)
err := producer.SendMessage("test-topic", "test-message")
assert.NoError(t, err)
assert.True(t, mockProducer.MessageSent)
}
|
Kaos Testi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func TestKafkaChaos(t *testing.T) {
// Broker'ı durdur
stopBroker()
// Mesaj göndermeyi dene
err := producer.SendMessage("test-topic", "test-message")
assert.Error(t, err)
// Broker'ı yeniden başlat
startBroker()
// Mesaj göndermeyi tekrar dene
err = producer.SendMessage("test-topic", "test-message")
assert.NoError(t, err)
}
|
32. Go ile Kafka Kullanımında İleri Seviye Optimizasyonlar
Bellek Yönetimi ve GC Optimizasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
type KafkaProducer struct {
producer sarama.SyncProducer
buffer *bytes.Buffer
pool *sync.Pool
}
func NewKafkaProducer() *KafkaProducer {
return &KafkaProducer{
buffer: bytes.NewBuffer(make([]byte, 0, 1024)),
pool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 1024)
},
},
}
}
func (kp *KafkaProducer) SendMessageOptimized(topic string, message []byte) error {
// Buffer'ı pool'dan al
buf := kp.pool.Get().([]byte)
defer kp.pool.Put(buf)
// Buffer'ı temizle ve mesajı ekle
buf = buf[:0]
buf = append(buf, message...)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(buf),
}
_, _, err := kp.producer.SendMessage(msg)
return err
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
type HighPerformanceConsumer struct {
consumers []*Consumer
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
metrics *ConsumerMetrics
}
type ConsumerMetrics struct {
messagesProcessed atomic.Int64
processingTime atomic.Int64
errors atomic.Int64
lastProcessedOffset atomic.Int64
}
func NewHighPerformanceConsumer(brokers []string, topic string, numConsumers int) *HighPerformanceConsumer {
ctx, cancel := context.WithCancel(context.Background())
return &HighPerformanceConsumer{
consumers: make([]*Consumer, numConsumers),
ctx: ctx,
cancel: cancel,
metrics: &ConsumerMetrics{},
}
}
func (c *HighPerformanceConsumer) Start() {
for i := 0; i < len(c.consumers); i++ {
c.wg.Add(1)
go func(consumerID int) {
defer c.wg.Done()
c.processMessages(consumerID)
}(i)
}
}
func (c *HighPerformanceConsumer) processMessages(consumerID int) {
consumer := c.consumers[consumerID]
for {
select {
case <-c.ctx.Done():
return
default:
messages, err := consumer.FetchMessages(100)
if err != nil {
c.metrics.errors.Add(1)
continue
}
startTime := time.Now()
for _, msg := range messages {
if err := c.processMessage(msg); err != nil {
c.metrics.errors.Add(1)
continue
}
c.metrics.messagesProcessed.Add(1)
c.metrics.lastProcessedOffset.Store(msg.Offset)
}
c.metrics.processingTime.Add(time.Since(startTime).Milliseconds())
}
}
}
|
Batch İşleme ve Retry Mekanizması
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
type BatchProcessor struct {
batchSize int
maxRetries int
retryBackoff time.Duration
processor MessageProcessor
metrics *BatchMetrics
}
type BatchMetrics struct {
batchCount atomic.Int64
successCount atomic.Int64
failureCount atomic.Int64
retryCount atomic.Int64
processingTime atomic.Int64
}
func (bp *BatchProcessor) ProcessBatch(messages []Message) error {
startTime := time.Now()
defer func() {
bp.metrics.processingTime.Add(time.Since(startTime).Milliseconds())
}()
batch := make([]Message, 0, bp.batchSize)
for _, msg := range messages {
batch = append(batch, msg)
if len(batch) >= bp.batchSize {
if err := bp.processWithRetry(batch); err != nil {
return err
}
batch = batch[:0]
}
}
if len(batch) > 0 {
return bp.processWithRetry(batch)
}
return nil
}
func (bp *BatchProcessor) processWithRetry(batch []Message) error {
var err error
for i := 0; i < bp.maxRetries; i++ {
if err = bp.processor.Process(batch); err == nil {
bp.metrics.successCount.Add(1)
return nil
}
bp.metrics.retryCount.Add(1)
time.Sleep(bp.retryBackoff * time.Duration(i+1))
}
bp.metrics.failureCount.Add(1)
return err
}
|
Circuit Breaker ve Rate Limiting
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
type KafkaCircuitBreaker struct {
failures atomic.Int64
threshold int64
resetTimeout time.Duration
lastFailure atomic.Int64
mu sync.RWMutex
}
type RateLimiter struct {
rate rate.Limiter
burst int
windowSize time.Duration
}
func NewKafkaCircuitBreaker(threshold int64, resetTimeout time.Duration) *KafkaCircuitBreaker {
return &KafkaCircuitBreaker{
threshold: threshold,
resetTimeout: resetTimeout,
}
}
func (cb *KafkaCircuitBreaker) Execute(f func() error) error {
if cb.failures.Load() >= cb.threshold {
if time.Since(time.Unix(0, cb.lastFailure.Load())) < cb.resetTimeout {
return errors.New("circuit breaker açık")
}
cb.failures.Store(0)
}
err := f()
if err != nil {
cb.failures.Add(1)
cb.lastFailure.Store(time.Now().UnixNano())
}
return err
}
func NewRateLimiter(rate float64, burst int, windowSize time.Duration) *RateLimiter {
return &RateLimiter{
rate: *rate.NewLimiter(rate, burst),
burst: burst,
windowSize: windowSize,
}
}
func (rl *RateLimiter) Allow() bool {
return rl.rate.Allow()
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
type KafkaMetrics struct {
producerMetrics *ProducerMetrics
consumerMetrics *ConsumerMetrics
brokerMetrics *BrokerMetrics
}
type ProducerMetrics struct {
messagesSent atomic.Int64
bytesSent atomic.Int64
sendLatency atomic.Int64
errors atomic.Int64
batchSize atomic.Int64
}
type BrokerMetrics struct {
activeConnections atomic.Int64
requestLatency atomic.Int64
bytesIn atomic.Int64
bytesOut atomic.Int64
}
func (km *KafkaMetrics) ExposeMetrics() {
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":8080", nil)
}
func (km *KafkaMetrics) RecordProducerMetrics(msg *sarama.ProducerMessage, err error) {
if err != nil {
km.producerMetrics.errors.Add(1)
return
}
km.producerMetrics.messagesSent.Add(1)
km.producerMetrics.bytesSent.Add(int64(len(msg.Value.Encode())))
km.producerMetrics.batchSize.Add(1)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
type MessageSerializer struct {
codec *gob.Encoder
buffer *bytes.Buffer
pool *sync.Pool
}
func NewMessageSerializer() *MessageSerializer {
return &MessageSerializer{
buffer: bytes.NewBuffer(make([]byte, 0, 1024)),
pool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 1024)
},
},
}
}
func (ms *MessageSerializer) Serialize(msg interface{}) ([]byte, error) {
buf := ms.pool.Get().([]byte)
defer ms.pool.Put(buf)
buf = buf[:0]
ms.buffer.Reset()
ms.codec = gob.NewEncoder(ms.buffer)
if err := ms.codec.Encode(msg); err != nil {
return nil, err
}
return ms.buffer.Bytes(), nil
}
|
Bu optimizasyonlar, Go ile Kafka kullanımında yüksek performans elde etmek için önemli tekniklerdir. Özellikle:
- Bellek yönetimi ve GC optimizasyonu
- Yüksek performanslı consumer havuzu
- Batch işleme ve retry mekanizması
- Circuit breaker ve rate limiting
- Performans izleme ve metrik toplama
- Yüksek performanslı serileştirme
Bu teknikler, özellikle yüksek hacimli sistemlerde önemli performans artışları sağlayabilir.
🔍 Güvenlik Kontrol Listesi
-
SSL/TLS Yapılandırması:
- Sertifika yönetimi
- Şifreleme protokolleri
- Güvenli port yapılandırması
-
Kimlik Doğrulama:
- SASL mekanizmaları
- Kullanıcı yönetimi
- Şifre politikaları
-
Yetkilendirme:
- ACL kuralları
- Rol tabanlı erişim kontrolü
- Kaynak tabanlı izinler
Gecikme:
- Yazma Gecikmesi
- Okuma Gecikmesi
Verim:
- Saniyede mesaj
- Saniyede byte
Dayanıklılık:
- Veri kaybı oranı
- Replika senkronizasyon gecikmesi
✅ Sonuç
Kafka’nın yüksek performansı, sıfır-kopya, toplu işleme, OS önbelleği kullanımı, bölümleme, sıkıştırma ve acks gibi mimari kararlarla sağlanır. Go ile birleştirildiğinde, bu yapılar yüksek performanslı ve ölçeklenebilir sistemler geliştirmeyi mümkün kılar.
📚 Kaynaklar
- Apache Kafka Dokümantasyonu
- Kafka İç Yapısı - Confluent Blog
- Go Kafka İstemcisi (Shopify/sarama)
- Kafka Streams Dokümantasyonu
- Prometheus ile Kafka İzleme
- Kafka Güvenlik Dokümantasyonu
- Kafka Connect Dokümantasyonu
- Kafka Bulut Servisleri
- Kafka Test Rehberi
📚 Ek Kaynaklar
- Go Performans En İyi Uygulamaları
- Kafka İzleme Araçları
- Go Eşzamanlılık Desenleri
- Kafka Güvenlik En İyi Uygulamaları
- Go Bellek Yönetimi