İçerikler

Golang ve C# ile Apache Kafka Uygulamaları: Karşılaştırmalı Bir İnceleme

Apache Kafka, 2011 yılında LinkedIn tarafından geliştirilen ve sonrasında Apache Software Foundation’a bağışlanan açık kaynaklı bir dağıtık olay akışı platformudur. Başlangıçta LinkedIn’in iç ihtiyaçları için geliştirilen Kafka, bugün Netflix, Uber, Twitter, Airbnb gibi büyük teknoloji şirketleri tarafından yaygın olarak kullanılmaktadır.

Kafka’nın popülerliğinin temel nedenleri şunlardır:

  • Yüksek Verimlilik: Saniyede milyonlarca mesajı işleyebilme kapasitesi
  • Düşük Gecikme: Milisaniye seviyesinde gecikme süreleri
  • Dayanıklılık: Veri kaybını önleyen güvenilir depolama mekanizması
  • Ölçeklenebilirlik: Yatay olarak kolayca ölçeklenebilme yeteneği
  • Dağıtık Mimari: Yüksek erişilebilirlik için çoklu broker desteği

Modern mikroservis mimarilerinin yaygınlaşmasıyla birlikte, servisler arası iletişim için Kafka gibi mesajlaşma sistemleri vazgeçilmez hale gelmiştir. Bu makalede, iki popüler programlama dili olan Golang ve C#‘ın Kafka ekosistemindeki yerini inceleyeceğiz.

Kafka Mimarisi ve Temel Kavramlar

Apache Kafka’yı anlamak için temel bileşenlerini ve kavramlarını bilmek önemlidir:

Temel Bileşenler

  • Broker: Kafka cluster’ını oluşturan sunucular
  • Topic: Mesajların kategorize edildiği kanallar
  • Partition: Topic’lerin paralel işlenebilmesi için bölünmüş parçaları
  • Producer: Kafka’ya mesaj gönderen uygulamalar
  • Consumer: Kafka’dan mesaj okuyan uygulamalar
  • Consumer Group: Bir topic’in partitionlarını paylaşarak okuyan consumer’lar grubu
  • Zookeeper/KRaft: Kafka cluster’ının metadata yönetimini sağlayan servis

Kafka’nın Çalışma Prensibi

Kafka, “commit log” adı verilen sıralı, ekleme-odaklı bir veri yapısı kullanır. Bu yapı sayesinde:

  1. Mesajlar disk üzerinde sıralı olarak saklanır
  2. Consumer’lar kendi okuma pozisyonlarını (offset) takip eder
  3. Mesajlar belirli bir süre (retention period) saklanır
  4. Yüksek throughput için sequential I/O ve zero-copy optimizasyonları kullanılır

Kafka’nın Yaygın Kullanım Senaryoları

Apache Kafka, çeşitli endüstrilerde farklı amaçlar için kullanılmaktadır. Aşağıdaki diyagram, Kafka ekosisteminin genel bir görünümünü ve yaygın kullanım senaryolarını göstermektedir:

Kafka’nın Yaygın Kullanım Senaryoları

Apache Kafka, çeşitli endüstrilerde farklı amaçlar için kullanılmaktadır. Aşağıdaki diyagram, Kafka ekosisteminin genel bir görünümünü ve yaygın kullanım senaryolarını göstermektedir:

Yukarıdaki diyagram, Kafka ekosisteminin dört ana bileşenini göstermektedir:

  1. Veri Kaynakları: Veritabanları, API servisleri, IoT cihazları, log sistemleri ve kullanıcı etkileşimleri gibi çeşitli kaynaklardan veriler Kafka’ya aktarılır.

  2. Kafka Ekosistemi: Kafka cluster’ı, producer’lar, consumer’lar ve Kafka Connect, Kafka Streams, ksqlDB gibi ek bileşenlerden oluşur. Bu ekosistem, verilerin güvenilir bir şekilde alınması, saklanması, işlenmesi ve dağıtılması için gerekli altyapıyı sağlar.

  3. Kullanım Senaryoları: Kafka’nın yaygın kullanım alanları arasında mikroservis iletişimi, olay kaynaklı mimari, gerçek zamanlı analitik, veri entegrasyonu, log toplama, metrik izleme ve stream processing bulunmaktadır.

  4. Hedef Sistemler: İşlenen veriler, veri ambarları, dashboard’lar, alarm sistemleri, uygulamalar, makine öğrenmesi modelleri, arama motorları ve önbellekler gibi çeşitli hedef sistemlere aktarılır.

Şimdi bu kullanım senaryolarını daha detaylı inceleyelim:

Mesajlaşma Sistemi

Geleneksel mesaj kuyrukları yerine, yüksek performanslı ve ölçeklenebilir bir mesajlaşma altyapısı olarak kullanılır. Mikroservisler arasındaki asenkron iletişimi sağlar.

Log Toplama

Dağıtık sistemlerden gelen logları merkezi bir noktada toplamak ve işlemek için kullanılır. Bu, sistem izleme ve sorun giderme süreçlerini kolaylaştırır.

Stream Processing

Gerçek zamanlı veri akışlarını işlemek için kullanılır. Örneğin, finansal işlemlerin anlık olarak izlenmesi veya kullanıcı davranışlarının gerçek zamanlı analizi.

Event Sourcing

Sistemdeki tüm değişikliklerin olay olarak kaydedildiği ve mevcut durumun bu olayların işlenmesiyle oluşturulduğu mimari yaklaşımda kullanılır.

Veri Entegrasyonu

Farklı sistemler arasında veri senkronizasyonu ve ETL (Extract, Transform, Load) işlemleri için kullanılır.

Telemetri Verileri

IoT cihazlarından gelen telemetri verilerinin toplanması ve işlenmesi için kullanılır.

Gerçek Zamanlı Analitik

Kullanıcı etkileşimleri, sistem metrikleri gibi verilerin gerçek zamanlı olarak analiz edilmesi için kullanılır.

Yukarıdaki diyagram, Kafka ekosisteminin dört ana bileşenini göstermektedir:

  1. Veri Kaynakları: Veritabanları, API servisleri, IoT cihazları, log sistemleri ve kullanıcı etkileşimleri gibi çeşitli kaynaklardan veriler Kafka’ya aktarılır.

  2. Kafka Ekosistemi: Kafka cluster’ı, producer’lar, consumer’lar ve Kafka Connect, Kafka Streams, ksqlDB gibi ek bileşenlerden oluşur. Bu ekosistem, verilerin güvenilir bir şekilde alınması, saklanması, işlenmesi ve dağıtılması için gerekli altyapıyı sağlar.

  3. Kullanım Senaryoları: Kafka’nın yaygın kullanım alanları arasında mikroservis iletişimi, olay kaynaklı mimari, gerçek zamanlı analitik, veri entegrasyonu, log toplama, metrik izleme ve stream processing bulunmaktadır.

  4. Hedef Sistemler: İşlenen veriler, veri ambarları, dashboard’lar, alarm sistemleri, uygulamalar, makine öğrenmesi modelleri, arama motorları ve önbellekler gibi çeşitli hedef sistemlere aktarılır.

Şimdi bu kullanım senaryolarını daha detaylı inceleyelim:

Mesajlaşma Sistemi

Geleneksel mesaj kuyrukları yerine, yüksek performanslı ve ölçeklenebilir bir mesajlaşma altyapısı olarak kullanılır. Mikroservisler arasındaki asenkron iletişimi sağlar.

Log Toplama

Dağıtık sistemlerden gelen logları merkezi bir noktada toplamak ve işlemek için kullanılır. Bu, sistem izleme ve sorun giderme süreçlerini kolaylaştırır.

Stream Processing

Gerçek zamanlı veri akışlarını işlemek için kullanılır. Örneğin, finansal işlemlerin anlık olarak izlenmesi veya kullanıcı davranışlarının gerçek zamanlı analizi.

Event Sourcing

Sistemdeki tüm değişikliklerin olay olarak kaydedildiği ve mevcut durumun bu olayların işlenmesiyle oluşturulduğu mimari yaklaşımda kullanılır.

Veri Entegrasyonu

Farklı sistemler arasında veri senkronizasyonu ve ETL (Extract, Transform, Load) işlemleri için kullanılır.

Telemetri Verileri

IoT cihazlarından gelen telemetri verilerinin toplanması ve işlenmesi için kullanılır.

Gerçek Zamanlı Analitik

Kullanıcı etkileşimleri, sistem metrikleri gibi verilerin gerçek zamanlı olarak analiz edilmesi için kullanılır.

Kafka Ekosistemi: Connect ve Streams

Kafka Connect

Kafka Connect, Kafka ile diğer sistemler arasında veri akışını kolaylaştıran bir çerçevedir. Hazır connector’lar kullanarak veya özel connector’lar geliştirerek, veritabanları, dosya sistemleri, anahtar-değer depoları ve diğer sistemlerle entegrasyon sağlar.

Golang ile Kafka Connect

Golang’da Kafka Connect için resmi bir kütüphane bulunmamaktadır, ancak özel connector’lar geliştirmek mümkündür:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
	"github.com/go-sql-driver/mysql"
)

// MySQL'den veri çeken ve Kafka'ya gönderen basit bir connector örneği
func main() {
	// MySQL bağlantısı
	db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/database")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Kafka producer yapılandırması
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Periyodik olarak veri çekme ve gönderme
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		// Veritabanından veri çekme
		rows, err := db.Query("SELECT id, name, created_at FROM users WHERE updated_at > ?", 
			time.Now().Add(-1*time.Minute))
		if err != nil {
			log.Println("Query error:", err)
			continue
		}

		for rows.Next() {
			var id int
			var name string
			var createdAt time.Time
			
			if err := rows.Scan(&id, &name, &createdAt); err != nil {
				log.Println("Scan error:", err)
				continue
			}
			
			// Veriyi JSON formatına dönüştürme
			data, err := json.Marshal(map[string]interface{}{
				"id": id,
				"name": name,
				"created_at": createdAt,
			})
			if err != nil {
				log.Println("JSON error:", err)
				continue
			}
			
			// Kafka'ya gönderme
			_, _, err = producer.SendMessage(&sarama.ProducerMessage{
				Topic: "db-changes",
				Key:   sarama.StringEncoder(fmt.Sprintf("%d", id)),
				Value: sarama.ByteEncoder(data),
			})
			if err != nil {
				log.Println("Send error:", err)
			}
		}
		rows.Close()
	}
}

C# ile Kafka Connect

C#‘da Kafka Connect için özel connector’lar geliştirilebilir:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
using System;
using System.Data.SqlClient;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace SqlServerKafkaConnector
{
    public class SqlServerSourceConnector : BackgroundService
    {
        private readonly ILogger<SqlServerSourceConnector> _logger;
        private readonly string _connectionString;
        private readonly string _topic;
        private readonly string _bootstrapServers;
        private DateTime _lastCheckpoint = DateTime.MinValue;

        public SqlServerSourceConnector(
            ILogger<SqlServerSourceConnector> logger,
            string connectionString,
            string topic,
            string bootstrapServers)
        {
            _logger = logger;
            _connectionString = connectionString;
            _topic = topic;
            _bootstrapServers = bootstrapServers;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var config = new ProducerConfig { BootstrapServers = _bootstrapServers };
            
            using var producer = new ProducerBuilder<string, string>(config).Build();
            
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await PollDatabaseChanges(producer);
                    await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error polling database changes");
                    await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
                }
            }
        }

        private async Task PollDatabaseChanges(IProducer<string, string> producer)
        {
            using var connection = new SqlConnection(_connectionString);
            await connection.OpenAsync();
            
            var query = @"SELECT Id, Name, CreatedAt FROM Users WHERE UpdatedAt > @LastCheckpoint";
            
            using var command = new SqlCommand(query, connection);
            command.Parameters.AddWithValue("@LastCheckpoint", _lastCheckpoint);
            
            using var reader = await command.ExecuteReaderAsync();
            
            while (await reader.ReadAsync())
            {
                var id = reader.GetInt32(0);
                var name = reader.GetString(1);
                var createdAt = reader.GetDateTime(2);
                
                var data = new
                {
                    Id = id,
                    Name = name,
                    CreatedAt = createdAt
                };
                
                var json = JsonSerializer.Serialize(data);
                
                await producer.ProduceAsync(_topic, new Message<string, string>
                {
                    Key = id.ToString(),
                    Value = json
                });
                
                _logger.LogInformation("Sent record: {Id}", id);
            }
            
            _lastCheckpoint = DateTime.UtcNow;
        }
    }
}

Kafka Streams

Kafka Streams, Kafka üzerinde stream processing uygulamaları geliştirmek için kullanılan bir kütüphanedir. Stateful ve stateless işlemler, windowing, joining ve aggregation gibi yetenekler sunar.

Golang için Stream Processing Alternatifleri

Golang’da resmi bir Kafka Streams kütüphanesi bulunmamaktadır, ancak benzer işlevselliği sağlayan özel çözümler geliştirilebilir:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main

import (
	"context"
	"encoding/json"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// Basit bir stream processing uygulaması
func main() {
	// Consumer yapılandırması
	consumerConfig := sarama.NewConfig()
	consumerConfig.Consumer.Return.Errors = true
	consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Producer yapılandırması
	producerConfig := sarama.NewConfig()
	producerConfig.Producer.Return.Successes = true

	// Consumer grup oluşturma
	consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "stream-processor", consumerConfig)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumer.Close()

	// Producer oluşturma
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, producerConfig)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer producer.Close()

	// Stream processor
	handler := &StreamProcessor{
		producer: producer,
		// 5 dakikalık bir pencere için basit bir in-memory state
		windowedCounts: make(map[string]int),
		windowStart:    time.Now(),
		windowSize:     5 * time.Minute,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Tüketim döngüsü
	topics := []string{"input-topic"}
	for {
		if err := consumer.Consume(ctx, topics, handler); err != nil {
			log.Fatalf("Error from consumer: %v", err)
		}
		
		if ctx.Err() != nil {
			return
		}
	}
}

// StreamProcessor, sarama.ConsumerGroupHandler arayüzünü uygular
type StreamProcessor struct {
	producer       sarama.SyncProducer
	windowedCounts map[string]int
	windowStart    time.Time
	windowSize     time.Duration
	mu             sync.Mutex
}

func (p *StreamProcessor) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (p *StreamProcessor) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (p *StreamProcessor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// Mesajı işle
		var data map[string]interface{}
		if err := json.Unmarshal(message.Value, &data); err != nil {
			log.Printf("Error unmarshaling message: %v", err)
			session.MarkMessage(message, "")
			continue
		}
		
		// Örnek işlem: Kelime sayma
		if text, ok := data["text"].(string); ok {
			words := strings.Fields(text)
			
			p.mu.Lock()
			// Pencere süresi dolduysa, sonuçları gönder ve pencereyi sıfırla
			if time.Since(p.windowStart) > p.windowSize {
				for word, count := range p.windowedCounts {
					result, _ := json.Marshal(map[string]interface{}{
						"word":       word,
						"count":      count,
						"windowEnd":  time.Now(),
						"windowStart": p.windowStart,
					})
					
					p.producer.SendMessage(&sarama.ProducerMessage{
						Topic: "word-counts",
						Key:   sarama.StringEncoder(word),
						Value: sarama.ByteEncoder(result),
					})
				}
				
				p.windowedCounts = make(map[string]int)
				p.windowStart = time.Now()
			}
			
			// Kelimeleri say
			for _, word := range words {
				word = strings.ToLower(strings.Trim(word, ".,!?\"':;()"))
				if word != "" {
					p.windowedCounts[word]++
				}
			}
			p.mu.Unlock()
		}
		
		session.MarkMessage(message, "")
	}
	return nil
}

C# ile Kafka Streams Benzeri İşlemler

C#‘da Kafka Streams benzeri işlevsellik için özel çözümler geliştirilebilir:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace KafkaStreamProcessor
{
    public class WordCountProcessor : BackgroundService
    {
        private readonly ILogger<WordCountProcessor> _logger;
        private readonly string _bootstrapServers;
        private readonly string _inputTopic;
        private readonly string _outputTopic;
        private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
        
        private ConcurrentDictionary<string, int> _wordCounts = new ConcurrentDictionary<string, int>();
        private DateTime _windowStart = DateTime.UtcNow;

        public WordCountProcessor(
            ILogger<WordCountProcessor> logger,
            string bootstrapServers,
            string inputTopic,
            string outputTopic)
        {
            _logger = logger;
            _bootstrapServers = bootstrapServers;
            _inputTopic = inputTopic;
            _outputTopic = outputTopic;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _bootstrapServers,
                GroupId = "word-count-processor",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };

            var producerConfig = new ProducerConfig
            {
                BootstrapServers = _bootstrapServers
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
            using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
            
            consumer.Subscribe(_inputTopic);
            
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        
                        // Mesajı işle
                        try
                        {
                            var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(consumeResult.Message.Value);
                            
                            if (data.TryGetValue("text", out var textElement) && textElement.ValueKind == JsonValueKind.String)
                            {
                                var text = textElement.GetString();
                                if (!string.IsNullOrEmpty(text))
                                {
                                    ProcessText(text);
                                }
                            }
                        }
                        catch (JsonException ex)
                        {
                            _logger.LogError(ex, "Error deserializing message");
                        }
                        
                        // Pencere süresi dolduysa, sonuçları gönder
                        if (DateTime.UtcNow - _windowStart > _windowSize)
                        {
                            await SendWindowResults(producer);
                        }
                        
                        consumer.Commit(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, "Error consuming message");
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
        }

        private void ProcessText(string text)
        {
            var words = Regex.Split(text, @"\W+")
                .Where(w => !string.IsNullOrEmpty(w))
                .Select(w => w.ToLowerInvariant());
            
            foreach (var word in words)
            {
                _wordCounts.AddOrUpdate(word, 1, (_, count) => count + 1);
            }
        }

        private async Task SendWindowResults(IProducer<string, string> producer)
        {
            var windowEnd = DateTime.UtcNow;
            var tasks = new List<Task>();
            
            foreach (var entry in _wordCounts.ToArray())
            {
                var result = new
                {
                    Word = entry.Key,
                    Count = entry.Value,
                    WindowStart = _windowStart,
                    WindowEnd = windowEnd
                };
                
                var json = JsonSerializer.Serialize(result);
                
                tasks.Add(producer.ProduceAsync(_outputTopic, new Message<string, string>
                {
                    Key = entry.Key,
                    Value = json
                }));
                
                _logger.LogInformation("Word count: {Word} = {Count}", entry.Key, entry.Value);
            }
            
            await Task.WhenAll(tasks);
            
            _wordCounts.Clear();
            _windowStart = DateTime.UtcNow;
        }
    }
}

Kafka Güvenliği ve Yetkilendirme

Kafka’da güvenlik ve yetkilendirme, özellikle kurumsal ortamlarda kritik öneme sahiptir. Her iki dilde de güvenli Kafka bağlantıları nasıl oluşturulacağını inceleyelim:

Golang ile Güvenli Kafka Bağlantıları

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// TLS yapılandırması
	tlsConfig, err := createTLSConfig("client.cer.pem", "client.key.pem", "ca.pem")
	if err != nil {
		log.Fatalf("Error creating TLS config: %v", err)
	}

	// Kafka yapılandırması
	config := sarama.NewConfig()
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = tlsConfig
	config.Net.SASL.Enable = true
	config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	config.Net.SASL.User = "username"
	config.Net.SASL.Password = "password"
	config.Producer.Return.Successes = true

	// Producer oluşturma
	producer, err := sarama.NewSyncProducer([]string{"kafka:9093"}, config)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer producer.Close()

	// Mesaj gönderme
	msg := &sarama.ProducerMessage{
		Topic: "secure-topic",
		Value: sarama.StringEncoder("Secure message"),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatalf("Error sending message: %v", err)
	}

	log.Printf("Message sent to partition %d at offset %d", partition, offset)
}

func createTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	// CA sertifikasını yükle
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return nil, err
	}

	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(caCert)

	// İstemci sertifikası ve anahtarını yükle
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return nil, err
	}

	// TLS yapılandırması oluştur
	tlsConfig := &tls.Config{
		Certificates:       []tls.Certificate{cert},
		RootCAs:            caCertPool,
		InsecureSkipVerify: false,
	}

	return tlsConfig, nil
}

C# ile Güvenli Kafka Bağlantıları

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
using System;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace SecureKafkaClient
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "kafka:9093",
                
                // SSL/TLS yapılandırması
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = "ca.pem",
                SslCertificateLocation = "client.cer.pem",
                SslKeyLocation = "client.key.pem",
                
                // SASL yapılandırması
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "username",
                SaslPassword = "password"
            };

            using var producer = new ProducerBuilder<Null, string>(config).Build();
            
            try
            {
                var result = await producer.ProduceAsync("secure-topic", 
                    new Message<Null, string> { Value = "Secure message" });
                
                Console.WriteLine($"Message sent to {result.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Error sending message: {e.Error.Reason}");
            }
        }
    }
}

Kafka Monitoring ve Observability

Kafka uygulamalarının izlenmesi ve gözlemlenmesi, üretim ortamlarında kritik öneme sahiptir. Her iki dilde de monitoring çözümleri nasıl uygulanacağını inceleyelim:

Golang ile Kafka Monitoring

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main

import (
	"log"
	"net/http"
	"time"

	"github.com/Shopify/sarama"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
	// Prometheus metrikleri
	messagesProcessed = promauto.NewCounter(prometheus.CounterOpts{
		Name: "kafka_consumer_messages_processed_total",
		Help: "The total number of processed messages",
	})
	
	processingLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "kafka_consumer_processing_latency_seconds",
		Help:    "Message processing latency in seconds",
		Buckets: prometheus.DefBuckets,
	})
	
	consumerLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "kafka_consumer_lag",
		Help: "The lag of the consumer per partition",
	}, []string{"topic", "partition"})
)

func main() {
	// Prometheus HTTP sunucusu
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		log.Fatal(http.ListenAndServe(":2112", nil))
	}()

	// Kafka consumer yapılandırması
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "monitored-consumer", config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumer.Close()

	// Consumer handler
	handler := &MonitoredConsumerHandler{}
	
	// Tüketim döngüsü
	topics := []string{"monitored-topic"}
	for {
		if err := consumer.Consume(context.Background(), topics, handler); err != nil {
			log.Fatalf("Error from consumer: %v", err)
		}
	}
}

// MonitoredConsumerHandler, sarama.ConsumerGroupHandler arayüzünü uygular
type MonitoredConsumerHandler struct{}

func (h *MonitoredConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *MonitoredConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *MonitoredConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Tüketici gecikmesini izle
	go monitorConsumerLag(session, claim)
	
	for message := range claim.Messages() {
		// İşleme başlangıç zamanı
		startTime := time.Now()
		
		// Mesajı işle
		processMessage(message)
		
		// Metrikleri güncelle
		messagesProcessed.Inc()
		processingLatency.Observe(time.Since(startTime).Seconds())
		
		// Mesajı işaretleme
		session.MarkMessage(message, "")
	}
	return nil
}

func monitorConsumerLag(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	
	topic := claim.Topic()
	partition := claim.Partition()
	
	for range ticker.C {
		// Tüketici gecikmesini hesapla
		currentOffset := session.MarkOffset(topic, partition, 0, "")
		highWatermark := claim.HighWaterMarkOffset()
		lag := highWatermark - currentOffset
		
		// Metriği güncelle
		consumerLag.WithLabelValues(topic, fmt.Sprintf("%d", partition)).Set(float64(lag))
	}
}

func processMessage(message *sarama.ConsumerMessage) {
	// Mesaj işleme mantığı
	time.Sleep(100 * time.Millisecond) // Simüle edilmiş işleme süresi
}

C# ile Kafka Monitoring

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Prometheus;

namespace KafkaMonitoringExample
{
    public class MonitoredKafkaConsumer : BackgroundService
    {
        private readonly ILogger<MonitoredKafkaConsumer> _logger;
        private readonly ConsumerConfig _config;
        private readonly string _topic;
        
        // Prometheus metrikleri
        private static readonly Counter MessagesProcessed = Metrics.CreateCounter(
            "kafka_consumer_messages_processed_total", 
            "The total number of processed messages");
        
        private static readonly Histogram ProcessingLatency = Metrics.CreateHistogram(
            "kafka_consumer_processing_latency_seconds",
            "Message processing latency in seconds",
            new HistogramConfiguration
            {
                Buckets = Histogram.ExponentialBuckets(0.001, 2, 10)
            });
        
        private static readonly Gauge ConsumerLag = Metrics.CreateGauge(
            "kafka_consumer_lag",
            "The lag of the consumer per partition",
            new GaugeConfiguration
            {
                LabelNames = new[] { "topic", "partition" }
            });

        public MonitoredKafkaConsumer(
            ILogger<MonitoredKafkaConsumer> logger,
            ConsumerConfig config,
            string topic)
        {
            _logger = logger;
            _config = config;
            _topic = topic;
            
            // Prometheus HTTP sunucusu başlat
            var server = new MetricServer(port: 9090);
            server.Start();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using var consumer = new ConsumerBuilder<Ignore, string>(_config).Build();
            consumer.Subscribe(_topic);
            
            // Tüketici gecikmesini izleme görevi
            _ = Task.Run(() => MonitorConsumerLag(consumer, stoppingToken), stoppingToken);
            
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        
                        // İşleme başlangıç zamanı
                        var stopwatch = Stopwatch.StartNew();
                        
                        // Mesajı işle
                        await ProcessMessageAsync(consumeResult.Message.Value);
                        
                        // Metrikleri güncelle
                        MessagesProcessed.Inc();
                        ProcessingLatency.Observe(stopwatch.Elapsed.TotalSeconds);
                        
                        // Mesajı işaretleme
                        consumer.Commit(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, "Error consuming message");
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
        }

        private async Task ProcessMessageAsync(string message)
        {
            // Mesaj işleme mantığı
            await Task.Delay(100); // Simüle edilmiş işleme süresi
        }
        
        private async Task MonitorConsumerLag(IConsumer<Ignore, string> consumer, CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var watermarkOffsets = consumer.GetWatermarkOffsets(
                        new TopicPartition(_topic, new Partition(0)));
                    
                    var committedOffsets = consumer.Committed(
                        new List<TopicPartition> { new TopicPartition(_topic, new Partition(0)) },
                        TimeSpan.FromSeconds(10));
                    
                    if (committedOffsets.Count > 0)
                    {
                        var lag = watermarkOffsets.High - committedOffsets[0].Offset;
                        ConsumerLag.WithLabels(_topic, "0").Set(lag.Value);
                        _logger.LogInformation("Consumer lag: {Lag}", lag);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error monitoring consumer lag");
                }
                
                await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
            }
        }
    }
}

Kafka Uygulamalarında Hata Yönetimi ve Dayanıklılık

Kafka uygulamalarında hata yönetimi ve dayanıklılık, üretim ortamlarında kritik öneme sahiptir. Her iki dilde de bu konuları nasıl ele alabileceğimizi inceleyelim:

Golang ile Hata Yönetimi ve Dayanıklılık

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Graceful shutdown için sinyal yakalama
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Sinyal yakalama goroutine
	go func() {
		<-signals
		log.Println("Shutdown signal received, closing consumer...")
		cancel()
	}()

	// Kafka consumer'ı başlat
	if err := startConsumerWithRetry(ctx); err != nil {
		log.Fatalf("Fatal error: %v", err)
	}
}

func startConsumerWithRetry(ctx context.Context) error {
	// Exponential backoff stratejisi
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxElapsedTime = 5 * time.Minute

	return backoff.RetryNotify(
		func() error {
			return startConsumer(ctx)
		},
		expBackoff,
		func(err error, duration time.Duration) {
			log.Printf("Error: %v, retrying in %v...", err, duration)
		},
	)
}

func startConsumer(ctx context.Context) error {
	// Kafka yapılandırması
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false
	config.Consumer.Retry.Backoff = 1 * time.Second
	config.Consumer.Retry.Max = 5

	// Consumer grup oluşturma
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "resilient-consumer", config)
	if err != nil {
		return err
	}
	defer group.Close()

	// Consumer hata kanalını izleme
	go func() {
		for err := range group.Errors() {
			log.Printf("Consumer group error: %v", err)
		}
	}()

	// Consumer handler
	handler := &ResilientConsumerHandler{
		readyC: make(chan bool),
	}

	// Tüketim döngüsü
	topics := []string{"resilient-topic"}
	
	// Handler hazır olana kadar bekle
	go func() {
		if err := group.Consume(ctx, topics, handler); err != nil {
			log.Printf("Error from consumer: %v", err)
		}
	}()

	<-handler.readyC
	log.Println("Consumer is ready")

	// Bağlam iptal edilene kadar bekle
	<-ctx.Done()
	log.Println("Context cancelled, closing consumer")
	
	// Consumer'ın kapanması için biraz bekle
	time.Sleep(2 * time.Second)
	return nil
}

// ResilientConsumerHandler, sarama.ConsumerGroupHandler arayüzünü uygular
type ResilientConsumerHandler struct {
	readyC chan bool
}

func (h *ResilientConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Consumer hazır olduğunda bildir
	close(h.readyC)
	return nil
}

func (h *ResilientConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *ResilientConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			
			// Mesajı işle
			if err := processMessageWithRetry(message); err != nil {
				// Kritik hata durumunda işlemi durdur
				if errors.Is(err, ErrCritical) {
					return err
				}
				
				// Geçici hata durumunda mesajı işaretleme ve devam et
				log.Printf("Error processing message, skipping: %v", err)
			}
			
			// Mesajı işaretleme
			session.MarkMessage(message, "")
			
		case <-session.Context().Done():
			return nil
		}
	}
}

// Özel hata türleri
var ErrCritical = errors.New("critical error")
var ErrTemporary = errors.New("temporary error")

func processMessageWithRetry(message *sarama.ConsumerMessage) error {
	// Retry stratejisi
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxElapsedTime = 30 * time.Second

	return backoff.Retry(
		func() error {
			err := processMessage(message)
			
			// Geçici hata durumunda yeniden dene
			if errors.Is(err, ErrTemporary) {
				return err
			}
			
			// Diğer hata türlerinde yeniden deneme
			return nil
		},
		expBackoff,
	)
}

func processMessage(message *sarama.ConsumerMessage) error {
	// Mesaj işleme mantığı
	// ...
	return nil
}

C# ile Hata Yönetimi ve Dayanıklılık

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;

namespace ResilientKafkaConsumer
{
    public class ResilientKafkaConsumerService : BackgroundService
    {
        private readonly ILogger<ResilientKafkaConsumerService> _logger;
        private readonly ConsumerConfig _config;
        private readonly string _topic;
        private readonly AsyncRetryPolicy _retryPolicy;

        public ResilientKafkaConsumerService(
            ILogger<ResilientKafkaConsumerService> logger,
            ConsumerConfig config,
            string topic)
        {
            _logger = logger;
            _config = config;
            _topic = topic;
            
            // Polly retry politikası
            _retryPolicy = Policy
                .Handle<Exception>(ex => !(ex is OperationCanceledException) && IsTransientException(ex))
                .WaitAndRetryAsync(
                    5,
                    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    (exception, timeSpan, retryCount, context) =>
                    {
                        _logger.LogWarning(
                            exception, 
                            "Error processing message. Retrying in {RetryTimeSpan}s. Attempt {RetryCount}",
                            timeSpan.TotalSeconds,
                            retryCount);
                    }
                );
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Resilient Kafka consumer service starting");

            // Graceful shutdown için CancellationToken kullanımı
            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
            
            try
            {
                await StartConsumerWithRetryAsync(linkedCts.Token);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Consumer service was cancelled");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Fatal error in consumer service");
                throw;
            }
            finally
            {
                _logger.LogInformation("Resilient Kafka consumer service stopping");
            }
        }

        private async Task StartConsumerWithRetryAsync(CancellationToken cancellationToken)
        {
            // Bağlantı kurma yeniden deneme politikası
            var connectionRetryPolicy = Policy
                .Handle<Exception>(ex => !(ex is OperationCanceledException))
                .WaitAndRetryForeverAsync(
                    retryAttempt => TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, retryAttempt))),
                    (exception, timeSpan, context) =>
                    {
                        _logger.LogWarning(
                            exception,
                            "Error connecting to Kafka. Retrying in {RetryTimeSpan}s",
                            timeSpan.TotalSeconds);
                    }
                );

            await connectionRetryPolicy.ExecuteAsync(async () =>
            {
                using var consumer = new ConsumerBuilder<Ignore, string>(_config)
                    .SetErrorHandler((_, error) =>
                    {
                        _logger.LogError("Kafka error: {ErrorReason}", error.Reason);
                    })
                    .SetPartitionsAssignedHandler((c, partitions) =>
                    {
                        _logger.LogInformation("Partitions assigned: {Partitions}", string.Join(", ", partitions));
                    })
                    .SetPartitionsRevokedHandler((c, partitions) =>
                    {
                        _logger.LogInformation("Partitions revoked: {Partitions}", string.Join(", ", partitions));
                    })
                    .Build();

                try
                {
                    consumer.Subscribe(_topic);
                    _logger.LogInformation("Subscribed to topic: {Topic}", _topic);

                    while (!cancellationToken.IsCancellationRequested)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            
                            // Mesajı işle
                            await ProcessMessageWithRetryAsync(consumeResult, cancellationToken);
                            
                            // Başarılı işleme sonrası offset'i commit et
                            try
                            {
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException ex)
                            {
                                _logger.LogWarning(ex, "Error committing offset");
                            }
                        }
                        catch (ConsumeException ex)
                        {
                            _logger.LogError(ex, "Error consuming message");
                        }
                    }
                }
                finally
                {
                    // Graceful shutdown
                    try
                    {
                        consumer.Close();
                        _logger.LogInformation("Consumer closed");
                    }
                    catch (Exception ex)
                    {
                        _logger.LogWarning(ex, "Error closing consumer");
                    }
                }
            });
        }

        private async Task ProcessMessageWithRetryAsync(ConsumeResult<Ignore, string> consumeResult, CancellationToken cancellationToken)
        {
            await _retryPolicy.ExecuteAsync(async () =>
            {
                try
                {
                    await ProcessMessageAsync(consumeResult.Message.Value, cancellationToken);
                    _logger.LogInformation(
                        "Processed message at {TopicPartitionOffset}",
                        consumeResult.TopicPartitionOffset);
                }
                catch (Exception ex) when (IsCriticalException(ex))
                {
                    _logger.LogCritical(ex, "Critical error processing message");
                    // Kritik hataları yeniden atmak için
                    throw;
                }
                catch (Exception ex) when (!IsTransientException(ex))
                {
                    _logger.LogError(ex, "Non-transient error processing message");
                    // Geçici olmayan hataları yeniden atmak için
                    throw;
                }
            });
        }

        private async Task ProcessMessageAsync(string message, CancellationToken cancellationToken)
        {
            // Mesaj işleme mantığı
            await Task.Delay(100, cancellationToken); // Simüle edilmiş işleme
            
            // Rastgele hata simülasyonu (test amaçlı)
            if (new Random().Next(100) < 10)
            {
                throw new TransientException("Simulated transient error");
            }
        }

        private bool IsTransientException(Exception ex)
        {
            // Geçici hataları tanımlama
            return ex is TransientException ||
                   ex is KafkaException kafkaEx && IsTransientKafkaError(kafkaEx.Error.Code);
        }

        private bool IsCriticalException(Exception ex)
        {
            // Kritik hataları tanımlama
            return ex is CriticalException ||
                   ex is OutOfMemoryException;
        }

        private bool IsTransientKafkaError(ErrorCode errorCode)
        {
            // Geçici Kafka hatalarını tanımlama
            return errorCode == ErrorCode.NetworkException ||
                   errorCode == ErrorCode.AllBrokersDown ||
                   errorCode == ErrorCode.NotCoordinatorForGroup ||
                   errorCode == ErrorCode.RequestTimedOut;
        }
    }

    // Özel hata türleri
    public class TransientException : Exception
    {
        public TransientException(string message) : base(message) { }
    }

    public class CriticalException : Exception
    {
        public CriticalException(string message) : base(message) { }
    }
}

Kafka ile Mikroservis İletişimi

Kafka, mikroservisler arasında asenkron iletişim için mükemmel bir araçtır. Her iki dilde de mikroservis iletişimi için Kafka’nın nasıl kullanılabileceğini inceleyelim:

Golang ile Mikroservis İletişimi

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/gin-gonic/gin"
)

// Olay türleri
const (
	OrderCreated  = "order.created"
	OrderShipped  = "order.shipped"
	OrderDelivered = "order.delivered"
)

// Olay yapısı
type Event struct {
	ID        string      `json:"id"`
	Type      string      `json:"type"`
	Timestamp time.Time   `json:"timestamp"`
	Payload   interface{} `json:"payload"`
}

// Sipariş yapısı
type Order struct {
	ID          string    `json:"id"`
	CustomerID  string    `json:"customer_id"`
	Products    []Product `json:"products"`
	TotalAmount float64   `json:"total_amount"`
	Status      string    `json:"status"`
}

// Ürün yapısı
type Product struct {
	ID       string  `json:"id"`
	Name     string  `json:"name"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

// Kafka producer
var producer sarama.SyncProducer

func main() {
	// Kafka yapılandırması
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true

	// Producer oluşturma
	var err error
	producer, err = sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %s", err)
	}
	defer producer.Close()

	// HTTP sunucusu
	router := gin.Default()
	router.POST("/orders", createOrder)
	router.PUT("/orders/:id/ship", shipOrder)
	router.PUT("/orders/:id/deliver", deliverOrder)

	// HTTP sunucusunu başlat
	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	// Graceful shutdown
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Failed to start server: %s", err)
		}
	}()

	// Sinyal yakalama
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")

	// Sunucuyu kapat
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatalf("Server forced to shutdown: %s", err)
	}

	log.Println("Server exiting")
}

// Sipariş oluşturma handler'ı
func createOrder(c *gin.Context) {
	var order Order
	if err := c.ShouldBindJSON(&order); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Sipariş oluşturma işlemleri
	order.Status = "created"

	// Olayı yayınla
	event := Event{
		ID:        generateID(),
		Type:      OrderCreated,
		Timestamp: time.Now(),
		Payload:   order,
	}

	if err := publishEvent(OrderCreated, event); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
		return
	}

	c.JSON(http.StatusCreated, order)
}

// Sipariş gönderme handler'ı
func shipOrder(c *gin.Context) {
	id := c.Param("id")
	
	// Sipariş bulma ve güncelleme işlemleri (örnek)
	order := Order{
		ID:     id,
		Status: "shipped",
	}

	// Olayı yayınla
	event := Event{
		ID:        generateID(),
		Type:      OrderShipped,
		Timestamp: time.Now(),
		Payload:   order,
	}

	if err := publishEvent(OrderShipped, event); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
		return
	}

	c.JSON(http.StatusOK, order)
}

// Sipariş teslim handler'ı
func deliverOrder(c *gin.Context) {
	id := c.Param("id")
	
	// Sipariş bulma ve güncelleme işlemleri (örnek)
	order := Order{
		ID:     id,
		Status: "delivered",
	}

	// Olayı yayınla
	event := Event{
		ID:        generateID(),
		Type:      OrderDelivered,
		Timestamp: time.Now(),
		Payload:   order,
	}

	if err := publishEvent(OrderDelivered, event); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
		return
	}

	c.JSON(http.StatusOK, order)
}

// Olayı Kafka'ya yayınlama
func publishEvent(eventType string, event Event) error {
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// Kafka mesajı oluştur
	msg := &sarama.ProducerMessage{
		Topic: "orders",
		Key:   sarama.StringEncoder(event.ID),
		Value: sarama.ByteEncoder(payload),
		Headers: []sarama.RecordHeader{
			{
				Key:   []byte("event_type"),
				Value: []byte(eventType),
			},
		},
	}

	// Mesajı gönder
	_, _, err = producer.SendMessage(msg)
	return err
}

// Benzersiz ID oluşturma
func generateID() string {
	return time.Now().Format("20060102150405") + "-" + randomString(6)
}

// Rastgele string oluşturma
func randomString(n int) string {
	const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[time.Now().UnixNano()%int64(len(letterBytes))]
	}
	return string(b)
}

C# ile Mikroservis İletişimi

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace OrderService
{
    // Olay türleri
    public static class EventTypes
    {
        public const string OrderCreated = "order.created";
        public const string OrderShipped = "order.shipped";
        public const string OrderDelivered = "order.delivered";
    }

    // Olay yapısı
    public class Event
    {
        public string Id { get; set; }
        public string Type { get; set; }
        public DateTime Timestamp { get; set; }
        public object Payload { get; set; }
    }

    // Sipariş yapısı
    public class Order
    {
        public string Id { get; set; }
        public string CustomerId { get; set; }
        public List<Product> Products { get; set; }
        public decimal TotalAmount { get; set; }
        public string Status { get; set; }
    }

    // Ürün yapısı
    public class Product
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public int Quantity { get; set; }
        public decimal Price { get; set; }
    }

    // Kafka event publisher
    public interface IEventPublisher
    {
        Task PublishEventAsync(string eventType, Event @event);
    }

    public class KafkaEventPublisher : IEventPublisher
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaEventPublisher> _logger;

        public KafkaEventPublisher(ILogger<KafkaEventPublisher> logger, ProducerConfig config)
        {
            _logger = logger;
            _producer = new ProducerBuilder<string, string>(config).Build();
        }

        public async Task PublishEventAsync(string eventType, Event @event)
        {
            try
            {
                var json = JsonSerializer.Serialize(@event);
                
                var message = new Message<string, string>
                {
                    Key = @event.Id,
                    Value = json,
                    Headers = new Headers
                    {
                        { "event_type", System.Text.Encoding.UTF8.GetBytes(eventType) }
                    }
                };

                var result = await _producer.ProduceAsync("orders", message);
                _logger.LogInformation(
                    "Published event {EventType} to {TopicPartitionOffset}",
                    eventType, result.TopicPartitionOffset);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing event {EventType}", eventType);
                throw;
            }
        }
    }

    // Order controller
    [ApiController]
    [Route("api/[controller]")]
    public class OrdersController : ControllerBase
    {
        private readonly IEventPublisher _eventPublisher;
        private readonly ILogger<OrdersController> _logger;

        public OrdersController(IEventPublisher eventPublisher, ILogger<OrdersController> logger)
        {
            _eventPublisher = eventPublisher;
            _logger = logger;
        }

        [HttpPost]
        public async Task<IActionResult> CreateOrder(Order order)
        {
            // Sipariş oluşturma işlemleri
            order.Id = GenerateId();
            order.Status = "created";

            // Olayı yayınla
            var @event = new Event
            {
                Id = GenerateId(),
                Type = EventTypes.OrderCreated,
                Timestamp = DateTime.UtcNow,
                Payload = order
            };

            try
            {
                await _eventPublisher.PublishEventAsync(EventTypes.OrderCreated, @event);
                return CreatedAtAction(nameof(GetOrder), new { id = order.Id }, order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error creating order");
                return StatusCode(StatusCodes.Status500InternalServerError, "Failed to create order");
            }
        }

        [HttpGet("{id}")]
        public IActionResult GetOrder(string id)
        {
            // Gerçek uygulamada veritabanından sipariş bulma
            var order = new Order { Id = id };
            return Ok(order);
        }

        [HttpPut("{id}/ship")]
        public async Task<IActionResult> ShipOrder(string id)
        {
            // Gerçek uygulamada veritabanından sipariş bulma ve güncelleme
            var order = new Order
            {
                Id = id,
                Status = "shipped"
            };

            // Olayı yayınla
            var @event = new Event
            {
                Id = GenerateId(),
                Type = EventTypes.OrderShipped,
                Timestamp = DateTime.UtcNow,
                Payload = order
            };

            try
            {
                await _eventPublisher.PublishEventAsync(EventTypes.OrderShipped, @event);
                return Ok(order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error shipping order");
                return StatusCode(StatusCodes.Status500InternalServerError, "Failed to ship order");
            }
        }

        [HttpPut("{id}/deliver")]
        public async Task<IActionResult> DeliverOrder(string id)
        {
            // Gerçek uygulamada veritabanından sipariş bulma ve güncelleme
            var order = new Order
            {
                Id = id,
                Status = "delivered"
            };

            // Olayı yayınla
            var @event = new Event
            {
                Id = GenerateId(),
                Type = EventTypes.OrderDelivered,
                Timestamp = DateTime.UtcNow,
                Payload = order
            };

            try
            {
                await _eventPublisher.PublishEventAsync(EventTypes.OrderDelivered, @event);
                return Ok(order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error delivering order");
                return StatusCode(StatusCodes.Status500InternalServerError, "Failed to deliver order");
            }
        }

        private string GenerateId() => Guid.NewGuid().ToString("N");
    }

    // Startup
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            
            // Kafka yapılandırması
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                Acks = Acks.All
            };
            
            services.AddSingleton(producerConfig);
            services.AddSingleton<IEventPublisher, KafkaEventPublisher>();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }

    // Program
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }
}

Kafka ile Olay Kaynaklı Mimari (Event-Sourcing)

Kafka, olay kaynaklı mimari (event-sourcing) için mükemmel bir altyapı sağlar. Her iki dilde de bu mimariyi nasıl uygulayabileceğimizi inceleyelim:

Golang ile Olay Kaynaklı Mimari

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// Olay türleri
const (
	AccountCreated = "account.created"
	MoneyDeposited = "money.deposited"
	MoneyWithdrawn = "money.withdrawn"
)

// Olay yapısı
type Event struct {
	ID        string      `json:"id"`
	Type      string      `json:"type"`
	Timestamp time.Time   `json:"timestamp"`
	Payload   interface{} `json:"payload"`
}

// Hesap yapısı
type Account struct {
	ID      string  `json:"id"`
	Owner   string  `json:"owner"`
	Balance float64 `json:"balance"`
}

// Hesap oluşturma olayı
type AccountCreatedEvent struct {
	ID    string `json:"id"`
	Owner string `json:"owner"`
}

// Para yatırma olayı
type MoneyDepositedEvent struct {
	AccountID string  `json:"account_id"`
	Amount    float64 `json:"amount"`
}

// Para çekme olayı
type MoneyWithdrawnEvent struct {
	AccountID string  `json:"account_id"`
	Amount    float64 `json:"amount"`
}

// Hesap projeksiyonu
type AccountProjection struct {
	accounts     map[string]*Account
	eventHandler map[string]func(Event) error
	mu           sync.RWMutex
}

// Yeni hesap projeksiyonu oluşturma
func NewAccountProjection() *AccountProjection {
	ap := &AccountProjection{
		accounts:     make(map[string]*Account),
		eventHandler: make(map[string]func(Event) error),
	}

	// Olay işleyicilerini kaydet
	ap.eventHandler[AccountCreated] = ap.handleAccountCreated
	ap.eventHandler[MoneyDeposited] = ap.handleMoneyDeposited
	ap.eventHandler[MoneyWithdrawn] = ap.handleMoneyWithdrawn

	return ap
}

// Hesap oluşturma olayını işleme
func (ap *AccountProjection) handleAccountCreated(event Event) error {
	var payload AccountCreatedEvent
	if err := convertPayload(event.Payload, &payload); err != nil {
		return err
	}

	ap.mu.Lock()
	defer ap.mu.Unlock()

	// Hesap zaten varsa hata döndür
	if _, exists := ap.accounts[payload.ID]; exists {
		return fmt.Errorf("account already exists: %s", payload.ID)
	}

	// Yeni hesap oluştur
	ap.accounts[payload.ID] = &Account{
		ID:      payload.ID,
		Owner:   payload.Owner,
		Balance: 0,
	}

	return nil
}

// Para yatırma olayını işleme
func (ap *AccountProjection) handleMoneyDeposited(event Event) error {
	var payload MoneyDepositedEvent
	if err := convertPayload(event.Payload, &payload); err != nil {
		return err
	}

	ap.mu.Lock()
	defer ap.mu.Unlock()

	// Hesap yoksa hata döndür
	account, exists := ap.accounts[payload.AccountID]
	if !exists {
		return fmt.Errorf("account not found: %s", payload.AccountID)
	}

	// Bakiyeyi güncelle
	account.Balance += payload.Amount

	return nil
}

// Para çekme olayını işleme
func (ap *AccountProjection) handleMoneyWithdrawn(event Event) error {
	var payload MoneyWithdrawnEvent
	if err := convertPayload(event.Payload, &payload); err != nil {
		return err
	}

	ap.mu.Lock()
	defer ap.mu.Unlock()

	// Hesap yoksa hata döndür
	account, exists := ap.accounts[payload.AccountID]
	if !exists {
		return fmt.Errorf("account not found: %s", payload.AccountID)
	}

	// Yetersiz bakiye kontrolü
	if account.Balance < payload.Amount {
		return fmt.Errorf("insufficient balance: %.2f < %.2f", account.Balance, payload.Amount)
	}

	// Bakiyeyi güncelle
	account.Balance -= payload.Amount

	return nil
}

// Olayı işleme
func (ap *AccountProjection) ApplyEvent(event Event) error {
	handler, exists := ap.eventHandler[event.Type]
	if !exists {
		return fmt.Errorf("no handler for event type: %s", event.Type)
	}

	return handler(event)
}

// Hesap bilgisini getirme
func (ap *AccountProjection) GetAccount(id string) (*Account, error) {
	ap.mu.RLock()
	defer ap.mu.RUnlock()

	account, exists := ap.accounts[id]
	if !exists {
		return nil, fmt.Errorf("account not found: %s", id)
	}

	return account, nil
}

// Payload dönüştürme yardımcı fonksiyonu
func convertPayload(src, dst interface{}) error {
	data, err := json.Marshal(src)
	if err != nil {
		return err
	}

	return json.Unmarshal(data, dst)
}

func main() {
	// Kafka consumer yapılandırması
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Consumer oluşturma
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %v", err)
	}
	defer consumer.Close()

	// Olay topic'inden partition consumer oluşturma
	partitionConsumer, err := consumer.ConsumePartition("account-events", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("Error creating partition consumer: %v", err)
	}
	defer partitionConsumer.Close()

	// Hesap projeksiyonu oluştur
	projection := NewAccountProjection()

	// Sinyalleri yakalama
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Olayları tüketme
	go func() {
		for {
			select {
			case msg := <-partitionConsumer.Messages():
				var event Event
				if err := json.Unmarshal(msg.Value, &event); err != nil {
					log.Printf("Error unmarshaling event: %v", err)
					continue
				}

				if err := projection.ApplyEvent(event); err != nil {
					log.Printf("Error applying event: %v", err)
					continue
				}

				log.Printf("Applied event: %s", event.Type)

			case err := <-partitionConsumer.Errors():
				log.Printf("Error consuming message: %v", err)

			case <-ctx.Done():
				return
			}
		}
	}()

	// Örnek olaylar oluştur ve uygula
	events := []Event{
		{
			ID:        "evt-1",
			Type:      AccountCreated,
			Timestamp: time.Now(),
			Payload: AccountCreatedEvent{
				ID:    "acc-1",
				Owner: "John Doe",
			},
		},
		{
			ID:        "evt-2",
			Type:      MoneyDeposited,
			Timestamp: time.Now(),
			Payload: MoneyDepositedEvent{
				AccountID: "acc-1",
				Amount:    100.0,
			},
		},
		{
			ID:        "evt-3",
			Type:      MoneyWithdrawn,
			Timestamp: time.Now(),
			Payload: MoneyWithdrawnEvent{
				AccountID: "acc-1",
				Amount:    30.0,
			},
		},
	}

	for _, event := range events {
		if err := projection.ApplyEvent(event); err != nil {
			log.Printf("Error applying event: %v", err)
			continue
		}
		log.Printf("Applied event: %s", event.Type)
	}

	// Hesap durumunu kontrol et
	account, err := projection.GetAccount("acc-1")
	if err != nil {
		log.Fatalf("Error getting account: %v", err)
	}

	log.Printf("Account state: ID=%s, Owner=%s, Balance=%.2f",
		account.ID, account.Owner, account.Balance)

	// Programı çalışır durumda tut
	select {}
}

C# ile Olay Kaynaklı Mimari

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace EventSourcingExample
{
      // Olay türleri
    public static class EventTypes
    {
        public const string AccountCreated = "account.created";
        public const string MoneyDeposited = "money.deposited";
        public const string MoneyWithdrawn = "money.withdrawn";
    }

    // Olay yapısı
    public class Event
    {
        public string Id { get; set; }
        public string Type { get; set; }
        public DateTime Timestamp { get; set; }
        public JsonElement Payload { get; set; }
    }

    // Hesap yapısı
    public class Account
    {
        public string Id { get; set; }
        public string Owner { get; set; }
        public decimal Balance { get; set; }
    }

    // Hesap oluşturma olayı
    public class AccountCreatedEvent
    {
        public string Id { get; set; }
        public string Owner { get; set; }
    }

    // Para yatırma olayı
    public class MoneyDepositedEvent
    {
        public string AccountId { get; set; }
        public decimal Amount { get; set; }
    }

    // Para çekme olayı
    public class MoneyWithdrawnEvent
    {
        public string AccountId { get; set; }
        public decimal Amount { get; set; }
    }

    // Hesap projeksiyonu
    public class AccountProjection
    {
        private readonly ConcurrentDictionary<string, Account> _accounts = new ConcurrentDictionary<string, Account>();
        private readonly Dictionary<string, Func<Event, Task>> _eventHandlers;
        private readonly ILogger<AccountProjection> _logger;

        public AccountProjection(ILogger<AccountProjection> logger)
        {
            _logger = logger;
            _eventHandlers = new Dictionary<string, Func<Event, Task>>
            {
                { EventTypes.AccountCreated, HandleAccountCreatedAsync },
                { EventTypes.MoneyDeposited, HandleMoneyDepositedAsync },
                { EventTypes.MoneyWithdrawn, HandleMoneyWithdrawnAsync }
            };
        }

        // Hesap oluşturma olayını işleme
        private async Task HandleAccountCreatedAsync(Event @event)
        {
            var payload = JsonSerializer.Deserialize<AccountCreatedEvent>(@event.Payload.GetRawText());
            
            if (_accounts.ContainsKey(payload.Id))
            {
                _logger.LogWarning("Account already exists: {AccountId}", payload.Id);
                return;
            }

            var account = new Account
            {
                Id = payload.Id,
                Owner = payload.Owner,
                Balance = 0
            };

            if (_accounts.TryAdd(payload.Id, account))
            {
                _logger.LogInformation("Account created: {AccountId}", payload.Id);
            }
            
            await Task.CompletedTask;
        }

        // Para yatırma olayını işleme
        private async Task HandleMoneyDepositedAsync(Event @event)
        {
            var payload = JsonSerializer.Deserialize<MoneyDepositedEvent>(@event.Payload.GetRawText());
            
            if (!_accounts.TryGetValue(payload.AccountId, out var account))
            {
                _logger.LogWarning("Account not found: {AccountId}", payload.AccountId);
                return;
            }

            account.Balance += payload.Amount;
            _logger.LogInformation("Deposited {Amount} to account {AccountId}, new balance: {Balance}", 
                payload.Amount, payload.AccountId, account.Balance);
            
            await Task.CompletedTask;
        }

        // Para çekme olayını işleme
        private async Task HandleMoneyWithdrawnAsync(Event @event)
        {
            var payload = JsonSerializer.Deserialize<MoneyWithdrawnEvent>(@event.Payload.GetRawText());
            
            if (!_accounts.TryGetValue(payload.AccountId, out var account))
            {
                _logger.LogWarning("Account not found: {AccountId}", payload.AccountId);
                return;
            }

            if (account.Balance < payload.Amount)
            {
                _logger.LogWarning("Insufficient balance: {Balance} < {Amount}", account.Balance, payload.Amount);
                return;
            }

            account.Balance -= payload.Amount;
            _logger.LogInformation("Withdrawn {Amount} from account {AccountId}, new balance: {Balance}", 
                payload.Amount, payload.AccountId, account.Balance);
            
            await Task.CompletedTask;
        }

        // Olayı işleme
        public async Task ApplyEventAsync(Event @event)
        {
            if (_eventHandlers.TryGetValue(@event.Type, out var handler))
            {
                await handler(@event);
            }
            else
            {
                _logger.LogWarning("No handler for event type: {EventType}", @event.Type);
            }
        }

        // Hesap bilgisini getirme
        public Account GetAccount(string id)
        {
            if (_accounts.TryGetValue(id, out var account))
            {
                return account;
            }
            
            return null;
        }

        // Tüm hesapları getirme
        public IEnumerable<Account> GetAllAccounts()
        {
            return _accounts.Values;
        }
    }

    // Kafka event consumer service
    public class EventConsumerService : BackgroundService
    {
        private readonly ILogger<EventConsumerService> _logger;
        private readonly ConsumerConfig _config;
        private readonly AccountProjection _projection;

        public EventConsumerService(
            ILogger<EventConsumerService> logger,
            ConsumerConfig config,
            AccountProjection projection)
        {
            _logger = logger;
            _config = config;
            _projection = projection;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Event consumer service starting");

            using var consumer = new ConsumerBuilder<string, string>(_config)
                .SetErrorHandler((_, e) => _logger.LogError("Kafka error: {Reason}", e.Reason))
                .Build();

            consumer.Subscribe("account-events");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        
                        _logger.LogInformation("Consumed event from {TopicPartitionOffset}", consumeResult.TopicPartitionOffset);
                        
                        var @event = JsonSerializer.Deserialize<Event>(consumeResult.Message.Value);
                        await _projection.ApplyEventAsync(@event);
                        
                        // Commit offset
                        consumer.Commit(consumeResult);
                    }
                    catch (ConsumeException e)
                    {
                        _logger.LogError("Consume error: {Error}", e.Error.Reason);
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "Error processing event");
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
        }
    }

    // Event publisher
    public interface IEventPublisher
    {
        Task PublishEventAsync<T>(string type, string id, T payload);
    }

    public class KafkaEventPublisher : IEventPublisher
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaEventPublisher> _logger;

        public KafkaEventPublisher(ProducerConfig config, ILogger<KafkaEventPublisher> logger)
        {
            _producer = new ProducerBuilder<string, string>(config).Build();
            _logger = logger;
        }

        public async Task PublishEventAsync<T>(string type, string id, T payload)
        {
            var @event = new Event
            {
                Id = id,
                Type = type,
                Timestamp = DateTime.UtcNow,
                Payload = JsonDocument.Parse(JsonSerializer.Serialize(payload)).RootElement
            };

            var json = JsonSerializer.Serialize(@event);
            
            var message = new Message<string, string>
            {
                Key = id,
                Value = json
            };

            try
            {
                var result = await _producer.ProduceAsync("account-events", message);
                _logger.LogInformation("Published event {EventType} to {TopicPartitionOffset}", 
                    type, result.TopicPartitionOffset);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing event {EventType}", type);
                throw;
            }
        }
    }

    // Program
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    // Kafka yapılandırması
                    services.AddSingleton(new ConsumerConfig
                    {
                        BootstrapServers = "localhost:9092",
                        GroupId = "account-projection",
                        AutoOffsetReset = AutoOffsetReset.Earliest,
                        EnableAutoCommit = false
                    });

                    services.AddSingleton(new ProducerConfig
                    {
                        BootstrapServers = "localhost:9092",
                        Acks = Acks.All
                    });

                    // Servisler
                    services.AddSingleton<AccountProjection>();
                    services.AddSingleton<IEventPublisher, KafkaEventPublisher>();
                    services.AddHostedService<EventConsumerService>();
                })
                .Build();

            // Örnek olaylar oluştur
            var publisher = host.Services.GetRequiredService<IEventPublisher>();
            var logger = host.Services.GetRequiredService<ILogger<Program>>();

            // Hesap oluştur
            await publisher.PublishEventAsync(
                EventTypes.AccountCreated,
                Guid.NewGuid().ToString(),
                new AccountCreatedEvent
                {
                    Id = "acc-1",
                    Owner = "John Doe"
                });

            // Para yatır
            await publisher.PublishEventAsync(
                EventTypes.MoneyDeposited,
                Guid.NewGuid().ToString(),
                new MoneyDepositedEvent
                {
                    AccountId = "acc-1",
                    Amount = 100
                });

            // Para çek
            await publisher.PublishEventAsync(
                EventTypes.MoneyWithdrawn,
                Guid.NewGuid().ToString(),
                new MoneyWithdrawnEvent
                {
                    AccountId = "acc-1",
                    Amount = 30
                });

            // Host'u başlat
            await host.RunAsync();
        }
    }
}

Sonuç

Bu makalede, Golang ve C# ile Apache Kafka uygulamaları geliştirmeyi karşılaştırmalı olarak inceledik. Her iki dil de Kafka ile çalışmak için güçlü kütüphaneler ve yetenekler sunmaktadır.

Golang’ın Avantajları

  • Düşük bellek kullanımı ve yüksek performans
  • Goroutine’ler ile kolay eşzamanlılık yönetimi
  • Tek binary dağıtım kolaylığı
  • Sarama gibi olgun ve performanslı kütüphaneler

C#‘ın Avantajları

  • Zengin .NET ekosistemi ve kurumsal entegrasyon yetenekleri
  • Güçlü tip sistemi ve nesne yönelimli programlama özellikleri
  • Async/await ile kolay asenkron programlama
  • Microsoft ve Confluent’tan kurumsal destek

Her iki dil de farklı kullanım senaryoları için uygun olabilir. Projenizin gereksinimlerine, ekibinizin uzmanlığına ve mevcut altyapınıza göre en uygun dili seçmeniz önemlidir.

Kafka, modern dağıtık sistemlerin omurgasını oluşturan güçlü bir platform olmaya devam etmektedir. Mikroservis mimarileri, olay kaynaklı sistemler ve gerçek zamanlı veri işleme senaryolarında Kafka’nın sunduğu yeteneklerden faydalanmak, ölçeklenebilir ve dayanıklı uygulamalar geliştirmenize yardımcı olacaktır.

Kaynaklar