Apache Kafka, 2011 yılında LinkedIn tarafından geliştirilen ve sonrasında Apache Software Foundation’a bağışlanan açık kaynaklı bir dağıtık olay akışı platformudur. Başlangıçta LinkedIn’in iç ihtiyaçları için geliştirilen Kafka, bugün Netflix, Uber, Twitter, Airbnb gibi büyük teknoloji şirketleri tarafından yaygın olarak kullanılmaktadır.
Kafka’nın popülerliğinin temel nedenleri şunlardır:
- Yüksek Verimlilik: Saniyede milyonlarca mesajı işleyebilme kapasitesi
- Düşük Gecikme: Milisaniye seviyesinde gecikme süreleri
- Dayanıklılık: Veri kaybını önleyen güvenilir depolama mekanizması
- Ölçeklenebilirlik: Yatay olarak kolayca ölçeklenebilme yeteneği
- Dağıtık Mimari: Yüksek erişilebilirlik için çoklu broker desteği
Modern mikroservis mimarilerinin yaygınlaşmasıyla birlikte, servisler arası iletişim için Kafka gibi mesajlaşma sistemleri vazgeçilmez hale gelmiştir. Bu makalede, iki popüler programlama dili olan Golang ve C#‘ın Kafka ekosistemindeki yerini inceleyeceğiz.
Kafka Mimarisi ve Temel Kavramlar
Apache Kafka’yı anlamak için temel bileşenlerini ve kavramlarını bilmek önemlidir:
Temel Bileşenler
- Broker: Kafka cluster’ını oluşturan sunucular
- Topic: Mesajların kategorize edildiği kanallar
- Partition: Topic’lerin paralel işlenebilmesi için bölünmüş parçaları
- Producer: Kafka’ya mesaj gönderen uygulamalar
- Consumer: Kafka’dan mesaj okuyan uygulamalar
- Consumer Group: Bir topic’in partitionlarını paylaşarak okuyan consumer’lar grubu
- Zookeeper/KRaft: Kafka cluster’ının metadata yönetimini sağlayan servis
Kafka’nın Çalışma Prensibi
Kafka, “commit log” adı verilen sıralı, ekleme-odaklı bir veri yapısı kullanır. Bu yapı sayesinde:
- Mesajlar disk üzerinde sıralı olarak saklanır
- Consumer’lar kendi okuma pozisyonlarını (offset) takip eder
- Mesajlar belirli bir süre (retention period) saklanır
- Yüksek throughput için sequential I/O ve zero-copy optimizasyonları kullanılır
Kafka’nın Yaygın Kullanım Senaryoları
Apache Kafka, çeşitli endüstrilerde farklı amaçlar için kullanılmaktadır. Aşağıdaki diyagram, Kafka ekosisteminin genel bir görünümünü ve yaygın kullanım senaryolarını göstermektedir:
Kafka’nın Yaygın Kullanım Senaryoları
Apache Kafka, çeşitli endüstrilerde farklı amaçlar için kullanılmaktadır. Aşağıdaki diyagram, Kafka ekosisteminin genel bir görünümünü ve yaygın kullanım senaryolarını göstermektedir:
Yukarıdaki diyagram, Kafka ekosisteminin dört ana bileşenini göstermektedir:
-
Veri Kaynakları: Veritabanları, API servisleri, IoT cihazları, log sistemleri ve kullanıcı etkileşimleri gibi çeşitli kaynaklardan veriler Kafka’ya aktarılır.
-
Kafka Ekosistemi: Kafka cluster’ı, producer’lar, consumer’lar ve Kafka Connect, Kafka Streams, ksqlDB gibi ek bileşenlerden oluşur. Bu ekosistem, verilerin güvenilir bir şekilde alınması, saklanması, işlenmesi ve dağıtılması için gerekli altyapıyı sağlar.
-
Kullanım Senaryoları: Kafka’nın yaygın kullanım alanları arasında mikroservis iletişimi, olay kaynaklı mimari, gerçek zamanlı analitik, veri entegrasyonu, log toplama, metrik izleme ve stream processing bulunmaktadır.
-
Hedef Sistemler: İşlenen veriler, veri ambarları, dashboard’lar, alarm sistemleri, uygulamalar, makine öğrenmesi modelleri, arama motorları ve önbellekler gibi çeşitli hedef sistemlere aktarılır.
Şimdi bu kullanım senaryolarını daha detaylı inceleyelim:
Mesajlaşma Sistemi
Geleneksel mesaj kuyrukları yerine, yüksek performanslı ve ölçeklenebilir bir mesajlaşma altyapısı olarak kullanılır. Mikroservisler arasındaki asenkron iletişimi sağlar.
Log Toplama
Dağıtık sistemlerden gelen logları merkezi bir noktada toplamak ve işlemek için kullanılır. Bu, sistem izleme ve sorun giderme süreçlerini kolaylaştırır.
Stream Processing
Gerçek zamanlı veri akışlarını işlemek için kullanılır. Örneğin, finansal işlemlerin anlık olarak izlenmesi veya kullanıcı davranışlarının gerçek zamanlı analizi.
Event Sourcing
Sistemdeki tüm değişikliklerin olay olarak kaydedildiği ve mevcut durumun bu olayların işlenmesiyle oluşturulduğu mimari yaklaşımda kullanılır.
Veri Entegrasyonu
Farklı sistemler arasında veri senkronizasyonu ve ETL (Extract, Transform, Load) işlemleri için kullanılır.
Telemetri Verileri
IoT cihazlarından gelen telemetri verilerinin toplanması ve işlenmesi için kullanılır.
Gerçek Zamanlı Analitik
Kullanıcı etkileşimleri, sistem metrikleri gibi verilerin gerçek zamanlı olarak analiz edilmesi için kullanılır.
Yukarıdaki diyagram, Kafka ekosisteminin dört ana bileşenini göstermektedir:
-
Veri Kaynakları: Veritabanları, API servisleri, IoT cihazları, log sistemleri ve kullanıcı etkileşimleri gibi çeşitli kaynaklardan veriler Kafka’ya aktarılır.
-
Kafka Ekosistemi: Kafka cluster’ı, producer’lar, consumer’lar ve Kafka Connect, Kafka Streams, ksqlDB gibi ek bileşenlerden oluşur. Bu ekosistem, verilerin güvenilir bir şekilde alınması, saklanması, işlenmesi ve dağıtılması için gerekli altyapıyı sağlar.
-
Kullanım Senaryoları: Kafka’nın yaygın kullanım alanları arasında mikroservis iletişimi, olay kaynaklı mimari, gerçek zamanlı analitik, veri entegrasyonu, log toplama, metrik izleme ve stream processing bulunmaktadır.
-
Hedef Sistemler: İşlenen veriler, veri ambarları, dashboard’lar, alarm sistemleri, uygulamalar, makine öğrenmesi modelleri, arama motorları ve önbellekler gibi çeşitli hedef sistemlere aktarılır.
Şimdi bu kullanım senaryolarını daha detaylı inceleyelim:
Mesajlaşma Sistemi
Geleneksel mesaj kuyrukları yerine, yüksek performanslı ve ölçeklenebilir bir mesajlaşma altyapısı olarak kullanılır. Mikroservisler arasındaki asenkron iletişimi sağlar.
Log Toplama
Dağıtık sistemlerden gelen logları merkezi bir noktada toplamak ve işlemek için kullanılır. Bu, sistem izleme ve sorun giderme süreçlerini kolaylaştırır.
Stream Processing
Gerçek zamanlı veri akışlarını işlemek için kullanılır. Örneğin, finansal işlemlerin anlık olarak izlenmesi veya kullanıcı davranışlarının gerçek zamanlı analizi.
Event Sourcing
Sistemdeki tüm değişikliklerin olay olarak kaydedildiği ve mevcut durumun bu olayların işlenmesiyle oluşturulduğu mimari yaklaşımda kullanılır.
Veri Entegrasyonu
Farklı sistemler arasında veri senkronizasyonu ve ETL (Extract, Transform, Load) işlemleri için kullanılır.
Telemetri Verileri
IoT cihazlarından gelen telemetri verilerinin toplanması ve işlenmesi için kullanılır.
Gerçek Zamanlı Analitik
Kullanıcı etkileşimleri, sistem metrikleri gibi verilerin gerçek zamanlı olarak analiz edilmesi için kullanılır.
Kafka Ekosistemi: Connect ve Streams
Kafka Connect
Kafka Connect, Kafka ile diğer sistemler arasında veri akışını kolaylaştıran bir çerçevedir. Hazır connector’lar kullanarak veya özel connector’lar geliştirerek, veritabanları, dosya sistemleri, anahtar-değer depoları ve diğer sistemlerle entegrasyon sağlar.
Golang ile Kafka Connect
Golang’da Kafka Connect için resmi bir kütüphane bulunmamaktadır, ancak özel connector’lar geliştirmek mümkündür:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/go-sql-driver/mysql"
)
// MySQL'den veri çeken ve Kafka'ya gönderen basit bir connector örneği
func main() {
// MySQL bağlantısı
db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/database")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Kafka producer yapılandırması
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// Periyodik olarak veri çekme ve gönderme
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// Veritabanından veri çekme
rows, err := db.Query("SELECT id, name, created_at FROM users WHERE updated_at > ?",
time.Now().Add(-1*time.Minute))
if err != nil {
log.Println("Query error:", err)
continue
}
for rows.Next() {
var id int
var name string
var createdAt time.Time
if err := rows.Scan(&id, &name, &createdAt); err != nil {
log.Println("Scan error:", err)
continue
}
// Veriyi JSON formatına dönüştürme
data, err := json.Marshal(map[string]interface{}{
"id": id,
"name": name,
"created_at": createdAt,
})
if err != nil {
log.Println("JSON error:", err)
continue
}
// Kafka'ya gönderme
_, _, err = producer.SendMessage(&sarama.ProducerMessage{
Topic: "db-changes",
Key: sarama.StringEncoder(fmt.Sprintf("%d", id)),
Value: sarama.ByteEncoder(data),
})
if err != nil {
log.Println("Send error:", err)
}
}
rows.Close()
}
}
|
C# ile Kafka Connect
C#‘da Kafka Connect için özel connector’lar geliştirilebilir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
using System;
using System.Data.SqlClient;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace SqlServerKafkaConnector
{
public class SqlServerSourceConnector : BackgroundService
{
private readonly ILogger<SqlServerSourceConnector> _logger;
private readonly string _connectionString;
private readonly string _topic;
private readonly string _bootstrapServers;
private DateTime _lastCheckpoint = DateTime.MinValue;
public SqlServerSourceConnector(
ILogger<SqlServerSourceConnector> logger,
string connectionString,
string topic,
string bootstrapServers)
{
_logger = logger;
_connectionString = connectionString;
_topic = topic;
_bootstrapServers = bootstrapServers;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var config = new ProducerConfig { BootstrapServers = _bootstrapServers };
using var producer = new ProducerBuilder<string, string>(config).Build();
while (!stoppingToken.IsCancellationRequested)
{
try
{
await PollDatabaseChanges(producer);
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error polling database changes");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
}
private async Task PollDatabaseChanges(IProducer<string, string> producer)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
var query = @"SELECT Id, Name, CreatedAt FROM Users WHERE UpdatedAt > @LastCheckpoint";
using var command = new SqlCommand(query, connection);
command.Parameters.AddWithValue("@LastCheckpoint", _lastCheckpoint);
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
var id = reader.GetInt32(0);
var name = reader.GetString(1);
var createdAt = reader.GetDateTime(2);
var data = new
{
Id = id,
Name = name,
CreatedAt = createdAt
};
var json = JsonSerializer.Serialize(data);
await producer.ProduceAsync(_topic, new Message<string, string>
{
Key = id.ToString(),
Value = json
});
_logger.LogInformation("Sent record: {Id}", id);
}
_lastCheckpoint = DateTime.UtcNow;
}
}
}
|
Kafka Streams
Kafka Streams, Kafka üzerinde stream processing uygulamaları geliştirmek için kullanılan bir kütüphanedir. Stateful ve stateless işlemler, windowing, joining ve aggregation gibi yetenekler sunar.
Golang için Stream Processing Alternatifleri
Golang’da resmi bir Kafka Streams kütüphanesi bulunmamaktadır, ancak benzer işlevselliği sağlayan özel çözümler geliştirilebilir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
package main
import (
"context"
"encoding/json"
"log"
"strings"
"sync"
"time"
"github.com/Shopify/sarama"
)
// Basit bir stream processing uygulaması
func main() {
// Consumer yapılandırması
consumerConfig := sarama.NewConfig()
consumerConfig.Consumer.Return.Errors = true
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
// Producer yapılandırması
producerConfig := sarama.NewConfig()
producerConfig.Producer.Return.Successes = true
// Consumer grup oluşturma
consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "stream-processor", consumerConfig)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumer.Close()
// Producer oluşturma
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, producerConfig)
if err != nil {
log.Fatalf("Error creating producer: %v", err)
}
defer producer.Close()
// Stream processor
handler := &StreamProcessor{
producer: producer,
// 5 dakikalık bir pencere için basit bir in-memory state
windowedCounts: make(map[string]int),
windowStart: time.Now(),
windowSize: 5 * time.Minute,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Tüketim döngüsü
topics := []string{"input-topic"}
for {
if err := consumer.Consume(ctx, topics, handler); err != nil {
log.Fatalf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
}
}
// StreamProcessor, sarama.ConsumerGroupHandler arayüzünü uygular
type StreamProcessor struct {
producer sarama.SyncProducer
windowedCounts map[string]int
windowStart time.Time
windowSize time.Duration
mu sync.Mutex
}
func (p *StreamProcessor) Setup(session sarama.ConsumerGroupSession) error {
return nil
}
func (p *StreamProcessor) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}
func (p *StreamProcessor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
// Mesajı işle
var data map[string]interface{}
if err := json.Unmarshal(message.Value, &data); err != nil {
log.Printf("Error unmarshaling message: %v", err)
session.MarkMessage(message, "")
continue
}
// Örnek işlem: Kelime sayma
if text, ok := data["text"].(string); ok {
words := strings.Fields(text)
p.mu.Lock()
// Pencere süresi dolduysa, sonuçları gönder ve pencereyi sıfırla
if time.Since(p.windowStart) > p.windowSize {
for word, count := range p.windowedCounts {
result, _ := json.Marshal(map[string]interface{}{
"word": word,
"count": count,
"windowEnd": time.Now(),
"windowStart": p.windowStart,
})
p.producer.SendMessage(&sarama.ProducerMessage{
Topic: "word-counts",
Key: sarama.StringEncoder(word),
Value: sarama.ByteEncoder(result),
})
}
p.windowedCounts = make(map[string]int)
p.windowStart = time.Now()
}
// Kelimeleri say
for _, word := range words {
word = strings.ToLower(strings.Trim(word, ".,!?\"':;()"))
if word != "" {
p.windowedCounts[word]++
}
}
p.mu.Unlock()
}
session.MarkMessage(message, "")
}
return nil
}
|
C# ile Kafka Streams Benzeri İşlemler
C#‘da Kafka Streams benzeri işlevsellik için özel çözümler geliştirilebilir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace KafkaStreamProcessor
{
public class WordCountProcessor : BackgroundService
{
private readonly ILogger<WordCountProcessor> _logger;
private readonly string _bootstrapServers;
private readonly string _inputTopic;
private readonly string _outputTopic;
private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
private ConcurrentDictionary<string, int> _wordCounts = new ConcurrentDictionary<string, int>();
private DateTime _windowStart = DateTime.UtcNow;
public WordCountProcessor(
ILogger<WordCountProcessor> logger,
string bootstrapServers,
string inputTopic,
string outputTopic)
{
_logger = logger;
_bootstrapServers = bootstrapServers;
_inputTopic = inputTopic;
_outputTopic = outputTopic;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = _bootstrapServers,
GroupId = "word-count-processor",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
var producerConfig = new ProducerConfig
{
BootstrapServers = _bootstrapServers
};
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
consumer.Subscribe(_inputTopic);
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
// Mesajı işle
try
{
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(consumeResult.Message.Value);
if (data.TryGetValue("text", out var textElement) && textElement.ValueKind == JsonValueKind.String)
{
var text = textElement.GetString();
if (!string.IsNullOrEmpty(text))
{
ProcessText(text);
}
}
}
catch (JsonException ex)
{
_logger.LogError(ex, "Error deserializing message");
}
// Pencere süresi dolduysa, sonuçları gönder
if (DateTime.UtcNow - _windowStart > _windowSize)
{
await SendWindowResults(producer);
}
consumer.Commit(consumeResult);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Error consuming message");
}
}
}
finally
{
consumer.Close();
}
}
private void ProcessText(string text)
{
var words = Regex.Split(text, @"\W+")
.Where(w => !string.IsNullOrEmpty(w))
.Select(w => w.ToLowerInvariant());
foreach (var word in words)
{
_wordCounts.AddOrUpdate(word, 1, (_, count) => count + 1);
}
}
private async Task SendWindowResults(IProducer<string, string> producer)
{
var windowEnd = DateTime.UtcNow;
var tasks = new List<Task>();
foreach (var entry in _wordCounts.ToArray())
{
var result = new
{
Word = entry.Key,
Count = entry.Value,
WindowStart = _windowStart,
WindowEnd = windowEnd
};
var json = JsonSerializer.Serialize(result);
tasks.Add(producer.ProduceAsync(_outputTopic, new Message<string, string>
{
Key = entry.Key,
Value = json
}));
_logger.LogInformation("Word count: {Word} = {Count}", entry.Key, entry.Value);
}
await Task.WhenAll(tasks);
_wordCounts.Clear();
_windowStart = DateTime.UtcNow;
}
}
}
|
Kafka Güvenliği ve Yetkilendirme
Kafka’da güvenlik ve yetkilendirme, özellikle kurumsal ortamlarda kritik öneme sahiptir. Her iki dilde de güvenli Kafka bağlantıları nasıl oluşturulacağını inceleyelim:
Golang ile Güvenli Kafka Bağlantıları
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
package main
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"github.com/Shopify/sarama"
)
func main() {
// TLS yapılandırması
tlsConfig, err := createTLSConfig("client.cer.pem", "client.key.pem", "ca.pem")
if err != nil {
log.Fatalf("Error creating TLS config: %v", err)
}
// Kafka yapılandırması
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = "username"
config.Net.SASL.Password = "password"
config.Producer.Return.Successes = true
// Producer oluşturma
producer, err := sarama.NewSyncProducer([]string{"kafka:9093"}, config)
if err != nil {
log.Fatalf("Error creating producer: %v", err)
}
defer producer.Close()
// Mesaj gönderme
msg := &sarama.ProducerMessage{
Topic: "secure-topic",
Value: sarama.StringEncoder("Secure message"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalf("Error sending message: %v", err)
}
log.Printf("Message sent to partition %d at offset %d", partition, offset)
}
func createTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
// CA sertifikasını yükle
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// İstemci sertifikası ve anahtarını yükle
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return nil, err
}
// TLS yapılandırması oluştur
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: false,
}
return tlsConfig, nil
}
|
C# ile Güvenli Kafka Bağlantıları
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
using System;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace SecureKafkaClient
{
class Program
{
static async Task Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "kafka:9093",
// SSL/TLS yapılandırması
SecurityProtocol = SecurityProtocol.SaslSsl,
SslCaLocation = "ca.pem",
SslCertificateLocation = "client.cer.pem",
SslKeyLocation = "client.key.pem",
// SASL yapılandırması
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "username",
SaslPassword = "password"
};
using var producer = new ProducerBuilder<Null, string>(config).Build();
try
{
var result = await producer.ProduceAsync("secure-topic",
new Message<Null, string> { Value = "Secure message" });
Console.WriteLine($"Message sent to {result.TopicPartitionOffset}");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Error sending message: {e.Error.Reason}");
}
}
}
}
|
Kafka Monitoring ve Observability
Kafka uygulamalarının izlenmesi ve gözlemlenmesi, üretim ortamlarında kritik öneme sahiptir. Her iki dilde de monitoring çözümleri nasıl uygulanacağını inceleyelim:
Golang ile Kafka Monitoring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
package main
import (
"log"
"net/http"
"time"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
// Prometheus metrikleri
messagesProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "kafka_consumer_messages_processed_total",
Help: "The total number of processed messages",
})
processingLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "kafka_consumer_processing_latency_seconds",
Help: "Message processing latency in seconds",
Buckets: prometheus.DefBuckets,
})
consumerLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "kafka_consumer_lag",
Help: "The lag of the consumer per partition",
}, []string{"topic", "partition"})
)
func main() {
// Prometheus HTTP sunucusu
go func() {
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(":2112", nil))
}()
// Kafka consumer yapılandırması
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "monitored-consumer", config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumer.Close()
// Consumer handler
handler := &MonitoredConsumerHandler{}
// Tüketim döngüsü
topics := []string{"monitored-topic"}
for {
if err := consumer.Consume(context.Background(), topics, handler); err != nil {
log.Fatalf("Error from consumer: %v", err)
}
}
}
// MonitoredConsumerHandler, sarama.ConsumerGroupHandler arayüzünü uygular
type MonitoredConsumerHandler struct{}
func (h *MonitoredConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *MonitoredConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *MonitoredConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// Tüketici gecikmesini izle
go monitorConsumerLag(session, claim)
for message := range claim.Messages() {
// İşleme başlangıç zamanı
startTime := time.Now()
// Mesajı işle
processMessage(message)
// Metrikleri güncelle
messagesProcessed.Inc()
processingLatency.Observe(time.Since(startTime).Seconds())
// Mesajı işaretleme
session.MarkMessage(message, "")
}
return nil
}
func monitorConsumerLag(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
topic := claim.Topic()
partition := claim.Partition()
for range ticker.C {
// Tüketici gecikmesini hesapla
currentOffset := session.MarkOffset(topic, partition, 0, "")
highWatermark := claim.HighWaterMarkOffset()
lag := highWatermark - currentOffset
// Metriği güncelle
consumerLag.WithLabelValues(topic, fmt.Sprintf("%d", partition)).Set(float64(lag))
}
}
func processMessage(message *sarama.ConsumerMessage) {
// Mesaj işleme mantığı
time.Sleep(100 * time.Millisecond) // Simüle edilmiş işleme süresi
}
|
C# ile Kafka Monitoring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Prometheus;
namespace KafkaMonitoringExample
{
public class MonitoredKafkaConsumer : BackgroundService
{
private readonly ILogger<MonitoredKafkaConsumer> _logger;
private readonly ConsumerConfig _config;
private readonly string _topic;
// Prometheus metrikleri
private static readonly Counter MessagesProcessed = Metrics.CreateCounter(
"kafka_consumer_messages_processed_total",
"The total number of processed messages");
private static readonly Histogram ProcessingLatency = Metrics.CreateHistogram(
"kafka_consumer_processing_latency_seconds",
"Message processing latency in seconds",
new HistogramConfiguration
{
Buckets = Histogram.ExponentialBuckets(0.001, 2, 10)
});
private static readonly Gauge ConsumerLag = Metrics.CreateGauge(
"kafka_consumer_lag",
"The lag of the consumer per partition",
new GaugeConfiguration
{
LabelNames = new[] { "topic", "partition" }
});
public MonitoredKafkaConsumer(
ILogger<MonitoredKafkaConsumer> logger,
ConsumerConfig config,
string topic)
{
_logger = logger;
_config = config;
_topic = topic;
// Prometheus HTTP sunucusu başlat
var server = new MetricServer(port: 9090);
server.Start();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var consumer = new ConsumerBuilder<Ignore, string>(_config).Build();
consumer.Subscribe(_topic);
// Tüketici gecikmesini izleme görevi
_ = Task.Run(() => MonitorConsumerLag(consumer, stoppingToken), stoppingToken);
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
// İşleme başlangıç zamanı
var stopwatch = Stopwatch.StartNew();
// Mesajı işle
await ProcessMessageAsync(consumeResult.Message.Value);
// Metrikleri güncelle
MessagesProcessed.Inc();
ProcessingLatency.Observe(stopwatch.Elapsed.TotalSeconds);
// Mesajı işaretleme
consumer.Commit(consumeResult);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Error consuming message");
}
}
}
finally
{
consumer.Close();
}
}
private async Task ProcessMessageAsync(string message)
{
// Mesaj işleme mantığı
await Task.Delay(100); // Simüle edilmiş işleme süresi
}
private async Task MonitorConsumerLag(IConsumer<Ignore, string> consumer, CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var watermarkOffsets = consumer.GetWatermarkOffsets(
new TopicPartition(_topic, new Partition(0)));
var committedOffsets = consumer.Committed(
new List<TopicPartition> { new TopicPartition(_topic, new Partition(0)) },
TimeSpan.FromSeconds(10));
if (committedOffsets.Count > 0)
{
var lag = watermarkOffsets.High - committedOffsets[0].Offset;
ConsumerLag.WithLabels(_topic, "0").Set(lag.Value);
_logger.LogInformation("Consumer lag: {Lag}", lag);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error monitoring consumer lag");
}
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
}
}
|
Kafka Uygulamalarında Hata Yönetimi ve Dayanıklılık
Kafka uygulamalarında hata yönetimi ve dayanıklılık, üretim ortamlarında kritik öneme sahiptir. Her iki dilde de bu konuları nasıl ele alabileceğimizi inceleyelim:
Golang ile Hata Yönetimi ve Dayanıklılık
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/cenkalti/backoff/v4"
)
func main() {
// Graceful shutdown için sinyal yakalama
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Sinyal yakalama goroutine
go func() {
<-signals
log.Println("Shutdown signal received, closing consumer...")
cancel()
}()
// Kafka consumer'ı başlat
if err := startConsumerWithRetry(ctx); err != nil {
log.Fatalf("Fatal error: %v", err)
}
}
func startConsumerWithRetry(ctx context.Context) error {
// Exponential backoff stratejisi
expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxElapsedTime = 5 * time.Minute
return backoff.RetryNotify(
func() error {
return startConsumer(ctx)
},
expBackoff,
func(err error, duration time.Duration) {
log.Printf("Error: %v, retrying in %v...", err, duration)
},
)
}
func startConsumer(ctx context.Context) error {
// Kafka yapılandırması
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false
config.Consumer.Retry.Backoff = 1 * time.Second
config.Consumer.Retry.Max = 5
// Consumer grup oluşturma
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "resilient-consumer", config)
if err != nil {
return err
}
defer group.Close()
// Consumer hata kanalını izleme
go func() {
for err := range group.Errors() {
log.Printf("Consumer group error: %v", err)
}
}()
// Consumer handler
handler := &ResilientConsumerHandler{
readyC: make(chan bool),
}
// Tüketim döngüsü
topics := []string{"resilient-topic"}
// Handler hazır olana kadar bekle
go func() {
if err := group.Consume(ctx, topics, handler); err != nil {
log.Printf("Error from consumer: %v", err)
}
}()
<-handler.readyC
log.Println("Consumer is ready")
// Bağlam iptal edilene kadar bekle
<-ctx.Done()
log.Println("Context cancelled, closing consumer")
// Consumer'ın kapanması için biraz bekle
time.Sleep(2 * time.Second)
return nil
}
// ResilientConsumerHandler, sarama.ConsumerGroupHandler arayüzünü uygular
type ResilientConsumerHandler struct {
readyC chan bool
}
func (h *ResilientConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
// Consumer hazır olduğunda bildir
close(h.readyC)
return nil
}
func (h *ResilientConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *ResilientConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message, ok := <-claim.Messages():
if !ok {
return nil
}
// Mesajı işle
if err := processMessageWithRetry(message); err != nil {
// Kritik hata durumunda işlemi durdur
if errors.Is(err, ErrCritical) {
return err
}
// Geçici hata durumunda mesajı işaretleme ve devam et
log.Printf("Error processing message, skipping: %v", err)
}
// Mesajı işaretleme
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
// Özel hata türleri
var ErrCritical = errors.New("critical error")
var ErrTemporary = errors.New("temporary error")
func processMessageWithRetry(message *sarama.ConsumerMessage) error {
// Retry stratejisi
expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxElapsedTime = 30 * time.Second
return backoff.Retry(
func() error {
err := processMessage(message)
// Geçici hata durumunda yeniden dene
if errors.Is(err, ErrTemporary) {
return err
}
// Diğer hata türlerinde yeniden deneme
return nil
},
expBackoff,
)
}
func processMessage(message *sarama.ConsumerMessage) error {
// Mesaj işleme mantığı
// ...
return nil
}
|
C# ile Hata Yönetimi ve Dayanıklılık
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
namespace ResilientKafkaConsumer
{
public class ResilientKafkaConsumerService : BackgroundService
{
private readonly ILogger<ResilientKafkaConsumerService> _logger;
private readonly ConsumerConfig _config;
private readonly string _topic;
private readonly AsyncRetryPolicy _retryPolicy;
public ResilientKafkaConsumerService(
ILogger<ResilientKafkaConsumerService> logger,
ConsumerConfig config,
string topic)
{
_logger = logger;
_config = config;
_topic = topic;
// Polly retry politikası
_retryPolicy = Policy
.Handle<Exception>(ex => !(ex is OperationCanceledException) && IsTransientException(ex))
.WaitAndRetryAsync(
5,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, retryCount, context) =>
{
_logger.LogWarning(
exception,
"Error processing message. Retrying in {RetryTimeSpan}s. Attempt {RetryCount}",
timeSpan.TotalSeconds,
retryCount);
}
);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Resilient Kafka consumer service starting");
// Graceful shutdown için CancellationToken kullanımı
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
try
{
await StartConsumerWithRetryAsync(linkedCts.Token);
}
catch (OperationCanceledException)
{
_logger.LogInformation("Consumer service was cancelled");
}
catch (Exception ex)
{
_logger.LogError(ex, "Fatal error in consumer service");
throw;
}
finally
{
_logger.LogInformation("Resilient Kafka consumer service stopping");
}
}
private async Task StartConsumerWithRetryAsync(CancellationToken cancellationToken)
{
// Bağlantı kurma yeniden deneme politikası
var connectionRetryPolicy = Policy
.Handle<Exception>(ex => !(ex is OperationCanceledException))
.WaitAndRetryForeverAsync(
retryAttempt => TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, retryAttempt))),
(exception, timeSpan, context) =>
{
_logger.LogWarning(
exception,
"Error connecting to Kafka. Retrying in {RetryTimeSpan}s",
timeSpan.TotalSeconds);
}
);
await connectionRetryPolicy.ExecuteAsync(async () =>
{
using var consumer = new ConsumerBuilder<Ignore, string>(_config)
.SetErrorHandler((_, error) =>
{
_logger.LogError("Kafka error: {ErrorReason}", error.Reason);
})
.SetPartitionsAssignedHandler((c, partitions) =>
{
_logger.LogInformation("Partitions assigned: {Partitions}", string.Join(", ", partitions));
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
_logger.LogInformation("Partitions revoked: {Partitions}", string.Join(", ", partitions));
})
.Build();
try
{
consumer.Subscribe(_topic);
_logger.LogInformation("Subscribed to topic: {Topic}", _topic);
while (!cancellationToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
// Mesajı işle
await ProcessMessageWithRetryAsync(consumeResult, cancellationToken);
// Başarılı işleme sonrası offset'i commit et
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException ex)
{
_logger.LogWarning(ex, "Error committing offset");
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Error consuming message");
}
}
}
finally
{
// Graceful shutdown
try
{
consumer.Close();
_logger.LogInformation("Consumer closed");
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error closing consumer");
}
}
});
}
private async Task ProcessMessageWithRetryAsync(ConsumeResult<Ignore, string> consumeResult, CancellationToken cancellationToken)
{
await _retryPolicy.ExecuteAsync(async () =>
{
try
{
await ProcessMessageAsync(consumeResult.Message.Value, cancellationToken);
_logger.LogInformation(
"Processed message at {TopicPartitionOffset}",
consumeResult.TopicPartitionOffset);
}
catch (Exception ex) when (IsCriticalException(ex))
{
_logger.LogCritical(ex, "Critical error processing message");
// Kritik hataları yeniden atmak için
throw;
}
catch (Exception ex) when (!IsTransientException(ex))
{
_logger.LogError(ex, "Non-transient error processing message");
// Geçici olmayan hataları yeniden atmak için
throw;
}
});
}
private async Task ProcessMessageAsync(string message, CancellationToken cancellationToken)
{
// Mesaj işleme mantığı
await Task.Delay(100, cancellationToken); // Simüle edilmiş işleme
// Rastgele hata simülasyonu (test amaçlı)
if (new Random().Next(100) < 10)
{
throw new TransientException("Simulated transient error");
}
}
private bool IsTransientException(Exception ex)
{
// Geçici hataları tanımlama
return ex is TransientException ||
ex is KafkaException kafkaEx && IsTransientKafkaError(kafkaEx.Error.Code);
}
private bool IsCriticalException(Exception ex)
{
// Kritik hataları tanımlama
return ex is CriticalException ||
ex is OutOfMemoryException;
}
private bool IsTransientKafkaError(ErrorCode errorCode)
{
// Geçici Kafka hatalarını tanımlama
return errorCode == ErrorCode.NetworkException ||
errorCode == ErrorCode.AllBrokersDown ||
errorCode == ErrorCode.NotCoordinatorForGroup ||
errorCode == ErrorCode.RequestTimedOut;
}
}
// Özel hata türleri
public class TransientException : Exception
{
public TransientException(string message) : base(message) { }
}
public class CriticalException : Exception
{
public CriticalException(string message) : base(message) { }
}
}
|
Kafka ile Mikroservis İletişimi
Kafka, mikroservisler arasında asenkron iletişim için mükemmel bir araçtır. Her iki dilde de mikroservis iletişimi için Kafka’nın nasıl kullanılabileceğini inceleyelim:
Golang ile Mikroservis İletişimi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/gin-gonic/gin"
)
// Olay türleri
const (
OrderCreated = "order.created"
OrderShipped = "order.shipped"
OrderDelivered = "order.delivered"
)
// Olay yapısı
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Payload interface{} `json:"payload"`
}
// Sipariş yapısı
type Order struct {
ID string `json:"id"`
CustomerID string `json:"customer_id"`
Products []Product `json:"products"`
TotalAmount float64 `json:"total_amount"`
Status string `json:"status"`
}
// Ürün yapısı
type Product struct {
ID string `json:"id"`
Name string `json:"name"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
// Kafka producer
var producer sarama.SyncProducer
func main() {
// Kafka yapılandırması
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
// Producer oluşturma
var err error
producer, err = sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
// HTTP sunucusu
router := gin.Default()
router.POST("/orders", createOrder)
router.PUT("/orders/:id/ship", shipOrder)
router.PUT("/orders/:id/deliver", deliverOrder)
// HTTP sunucusunu başlat
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
// Graceful shutdown
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Failed to start server: %s", err)
}
}()
// Sinyal yakalama
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Sunucuyu kapat
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatalf("Server forced to shutdown: %s", err)
}
log.Println("Server exiting")
}
// Sipariş oluşturma handler'ı
func createOrder(c *gin.Context) {
var order Order
if err := c.ShouldBindJSON(&order); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Sipariş oluşturma işlemleri
order.Status = "created"
// Olayı yayınla
event := Event{
ID: generateID(),
Type: OrderCreated,
Timestamp: time.Now(),
Payload: order,
}
if err := publishEvent(OrderCreated, event); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
return
}
c.JSON(http.StatusCreated, order)
}
// Sipariş gönderme handler'ı
func shipOrder(c *gin.Context) {
id := c.Param("id")
// Sipariş bulma ve güncelleme işlemleri (örnek)
order := Order{
ID: id,
Status: "shipped",
}
// Olayı yayınla
event := Event{
ID: generateID(),
Type: OrderShipped,
Timestamp: time.Now(),
Payload: order,
}
if err := publishEvent(OrderShipped, event); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
return
}
c.JSON(http.StatusOK, order)
}
// Sipariş teslim handler'ı
func deliverOrder(c *gin.Context) {
id := c.Param("id")
// Sipariş bulma ve güncelleme işlemleri (örnek)
order := Order{
ID: id,
Status: "delivered",
}
// Olayı yayınla
event := Event{
ID: generateID(),
Type: OrderDelivered,
Timestamp: time.Now(),
Payload: order,
}
if err := publishEvent(OrderDelivered, event); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
return
}
c.JSON(http.StatusOK, order)
}
// Olayı Kafka'ya yayınlama
func publishEvent(eventType string, event Event) error {
payload, err := json.Marshal(event)
if err != nil {
return err
}
// Kafka mesajı oluştur
msg := &sarama.ProducerMessage{
Topic: "orders",
Key: sarama.StringEncoder(event.ID),
Value: sarama.ByteEncoder(payload),
Headers: []sarama.RecordHeader{
{
Key: []byte("event_type"),
Value: []byte(eventType),
},
},
}
// Mesajı gönder
_, _, err = producer.SendMessage(msg)
return err
}
// Benzersiz ID oluşturma
func generateID() string {
return time.Now().Format("20060102150405") + "-" + randomString(6)
}
// Rastgele string oluşturma
func randomString(n int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[time.Now().UnixNano()%int64(len(letterBytes))]
}
return string(b)
}
|
C# ile Mikroservis İletişimi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace OrderService
{
// Olay türleri
public static class EventTypes
{
public const string OrderCreated = "order.created";
public const string OrderShipped = "order.shipped";
public const string OrderDelivered = "order.delivered";
}
// Olay yapısı
public class Event
{
public string Id { get; set; }
public string Type { get; set; }
public DateTime Timestamp { get; set; }
public object Payload { get; set; }
}
// Sipariş yapısı
public class Order
{
public string Id { get; set; }
public string CustomerId { get; set; }
public List<Product> Products { get; set; }
public decimal TotalAmount { get; set; }
public string Status { get; set; }
}
// Ürün yapısı
public class Product
{
public string Id { get; set; }
public string Name { get; set; }
public int Quantity { get; set; }
public decimal Price { get; set; }
}
// Kafka event publisher
public interface IEventPublisher
{
Task PublishEventAsync(string eventType, Event @event);
}
public class KafkaEventPublisher : IEventPublisher
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaEventPublisher> _logger;
public KafkaEventPublisher(ILogger<KafkaEventPublisher> logger, ProducerConfig config)
{
_logger = logger;
_producer = new ProducerBuilder<string, string>(config).Build();
}
public async Task PublishEventAsync(string eventType, Event @event)
{
try
{
var json = JsonSerializer.Serialize(@event);
var message = new Message<string, string>
{
Key = @event.Id,
Value = json,
Headers = new Headers
{
{ "event_type", System.Text.Encoding.UTF8.GetBytes(eventType) }
}
};
var result = await _producer.ProduceAsync("orders", message);
_logger.LogInformation(
"Published event {EventType} to {TopicPartitionOffset}",
eventType, result.TopicPartitionOffset);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error publishing event {EventType}", eventType);
throw;
}
}
}
// Order controller
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IEventPublisher _eventPublisher;
private readonly ILogger<OrdersController> _logger;
public OrdersController(IEventPublisher eventPublisher, ILogger<OrdersController> logger)
{
_eventPublisher = eventPublisher;
_logger = logger;
}
[HttpPost]
public async Task<IActionResult> CreateOrder(Order order)
{
// Sipariş oluşturma işlemleri
order.Id = GenerateId();
order.Status = "created";
// Olayı yayınla
var @event = new Event
{
Id = GenerateId(),
Type = EventTypes.OrderCreated,
Timestamp = DateTime.UtcNow,
Payload = order
};
try
{
await _eventPublisher.PublishEventAsync(EventTypes.OrderCreated, @event);
return CreatedAtAction(nameof(GetOrder), new { id = order.Id }, order);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error creating order");
return StatusCode(StatusCodes.Status500InternalServerError, "Failed to create order");
}
}
[HttpGet("{id}")]
public IActionResult GetOrder(string id)
{
// Gerçek uygulamada veritabanından sipariş bulma
var order = new Order { Id = id };
return Ok(order);
}
[HttpPut("{id}/ship")]
public async Task<IActionResult> ShipOrder(string id)
{
// Gerçek uygulamada veritabanından sipariş bulma ve güncelleme
var order = new Order
{
Id = id,
Status = "shipped"
};
// Olayı yayınla
var @event = new Event
{
Id = GenerateId(),
Type = EventTypes.OrderShipped,
Timestamp = DateTime.UtcNow,
Payload = order
};
try
{
await _eventPublisher.PublishEventAsync(EventTypes.OrderShipped, @event);
return Ok(order);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error shipping order");
return StatusCode(StatusCodes.Status500InternalServerError, "Failed to ship order");
}
}
[HttpPut("{id}/deliver")]
public async Task<IActionResult> DeliverOrder(string id)
{
// Gerçek uygulamada veritabanından sipariş bulma ve güncelleme
var order = new Order
{
Id = id,
Status = "delivered"
};
// Olayı yayınla
var @event = new Event
{
Id = GenerateId(),
Type = EventTypes.OrderDelivered,
Timestamp = DateTime.UtcNow,
Payload = order
};
try
{
await _eventPublisher.PublishEventAsync(EventTypes.OrderDelivered, @event);
return Ok(order);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error delivering order");
return StatusCode(StatusCodes.Status500InternalServerError, "Failed to deliver order");
}
}
private string GenerateId() => Guid.NewGuid().ToString("N");
}
// Startup
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
// Kafka yapılandırması
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All
};
services.AddSingleton(producerConfig);
services.AddSingleton<IEventPublisher, KafkaEventPublisher>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
// Program
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}
|
Kafka ile Olay Kaynaklı Mimari (Event-Sourcing)
Kafka, olay kaynaklı mimari (event-sourcing) için mükemmel bir altyapı sağlar. Her iki dilde de bu mimariyi nasıl uygulayabileceğimizi inceleyelim:
Golang ile Olay Kaynaklı Mimari
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
|
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/Shopify/sarama"
)
// Olay türleri
const (
AccountCreated = "account.created"
MoneyDeposited = "money.deposited"
MoneyWithdrawn = "money.withdrawn"
)
// Olay yapısı
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Payload interface{} `json:"payload"`
}
// Hesap yapısı
type Account struct {
ID string `json:"id"`
Owner string `json:"owner"`
Balance float64 `json:"balance"`
}
// Hesap oluşturma olayı
type AccountCreatedEvent struct {
ID string `json:"id"`
Owner string `json:"owner"`
}
// Para yatırma olayı
type MoneyDepositedEvent struct {
AccountID string `json:"account_id"`
Amount float64 `json:"amount"`
}
// Para çekme olayı
type MoneyWithdrawnEvent struct {
AccountID string `json:"account_id"`
Amount float64 `json:"amount"`
}
// Hesap projeksiyonu
type AccountProjection struct {
accounts map[string]*Account
eventHandler map[string]func(Event) error
mu sync.RWMutex
}
// Yeni hesap projeksiyonu oluşturma
func NewAccountProjection() *AccountProjection {
ap := &AccountProjection{
accounts: make(map[string]*Account),
eventHandler: make(map[string]func(Event) error),
}
// Olay işleyicilerini kaydet
ap.eventHandler[AccountCreated] = ap.handleAccountCreated
ap.eventHandler[MoneyDeposited] = ap.handleMoneyDeposited
ap.eventHandler[MoneyWithdrawn] = ap.handleMoneyWithdrawn
return ap
}
// Hesap oluşturma olayını işleme
func (ap *AccountProjection) handleAccountCreated(event Event) error {
var payload AccountCreatedEvent
if err := convertPayload(event.Payload, &payload); err != nil {
return err
}
ap.mu.Lock()
defer ap.mu.Unlock()
// Hesap zaten varsa hata döndür
if _, exists := ap.accounts[payload.ID]; exists {
return fmt.Errorf("account already exists: %s", payload.ID)
}
// Yeni hesap oluştur
ap.accounts[payload.ID] = &Account{
ID: payload.ID,
Owner: payload.Owner,
Balance: 0,
}
return nil
}
// Para yatırma olayını işleme
func (ap *AccountProjection) handleMoneyDeposited(event Event) error {
var payload MoneyDepositedEvent
if err := convertPayload(event.Payload, &payload); err != nil {
return err
}
ap.mu.Lock()
defer ap.mu.Unlock()
// Hesap yoksa hata döndür
account, exists := ap.accounts[payload.AccountID]
if !exists {
return fmt.Errorf("account not found: %s", payload.AccountID)
}
// Bakiyeyi güncelle
account.Balance += payload.Amount
return nil
}
// Para çekme olayını işleme
func (ap *AccountProjection) handleMoneyWithdrawn(event Event) error {
var payload MoneyWithdrawnEvent
if err := convertPayload(event.Payload, &payload); err != nil {
return err
}
ap.mu.Lock()
defer ap.mu.Unlock()
// Hesap yoksa hata döndür
account, exists := ap.accounts[payload.AccountID]
if !exists {
return fmt.Errorf("account not found: %s", payload.AccountID)
}
// Yetersiz bakiye kontrolü
if account.Balance < payload.Amount {
return fmt.Errorf("insufficient balance: %.2f < %.2f", account.Balance, payload.Amount)
}
// Bakiyeyi güncelle
account.Balance -= payload.Amount
return nil
}
// Olayı işleme
func (ap *AccountProjection) ApplyEvent(event Event) error {
handler, exists := ap.eventHandler[event.Type]
if !exists {
return fmt.Errorf("no handler for event type: %s", event.Type)
}
return handler(event)
}
// Hesap bilgisini getirme
func (ap *AccountProjection) GetAccount(id string) (*Account, error) {
ap.mu.RLock()
defer ap.mu.RUnlock()
account, exists := ap.accounts[id]
if !exists {
return nil, fmt.Errorf("account not found: %s", id)
}
return account, nil
}
// Payload dönüştürme yardımcı fonksiyonu
func convertPayload(src, dst interface{}) error {
data, err := json.Marshal(src)
if err != nil {
return err
}
return json.Unmarshal(data, dst)
}
func main() {
// Kafka consumer yapılandırması
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Consumer oluşturma
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("Error creating consumer: %v", err)
}
defer consumer.Close()
// Olay topic'inden partition consumer oluşturma
partitionConsumer, err := consumer.ConsumePartition("account-events", 0, sarama.OffsetOldest)
if err != nil {
log.Fatalf("Error creating partition consumer: %v", err)
}
defer partitionConsumer.Close()
// Hesap projeksiyonu oluştur
projection := NewAccountProjection()
// Sinyalleri yakalama
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Olayları tüketme
go func() {
for {
select {
case msg := <-partitionConsumer.Messages():
var event Event
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("Error unmarshaling event: %v", err)
continue
}
if err := projection.ApplyEvent(event); err != nil {
log.Printf("Error applying event: %v", err)
continue
}
log.Printf("Applied event: %s", event.Type)
case err := <-partitionConsumer.Errors():
log.Printf("Error consuming message: %v", err)
case <-ctx.Done():
return
}
}
}()
// Örnek olaylar oluştur ve uygula
events := []Event{
{
ID: "evt-1",
Type: AccountCreated,
Timestamp: time.Now(),
Payload: AccountCreatedEvent{
ID: "acc-1",
Owner: "John Doe",
},
},
{
ID: "evt-2",
Type: MoneyDeposited,
Timestamp: time.Now(),
Payload: MoneyDepositedEvent{
AccountID: "acc-1",
Amount: 100.0,
},
},
{
ID: "evt-3",
Type: MoneyWithdrawn,
Timestamp: time.Now(),
Payload: MoneyWithdrawnEvent{
AccountID: "acc-1",
Amount: 30.0,
},
},
}
for _, event := range events {
if err := projection.ApplyEvent(event); err != nil {
log.Printf("Error applying event: %v", err)
continue
}
log.Printf("Applied event: %s", event.Type)
}
// Hesap durumunu kontrol et
account, err := projection.GetAccount("acc-1")
if err != nil {
log.Fatalf("Error getting account: %v", err)
}
log.Printf("Account state: ID=%s, Owner=%s, Balance=%.2f",
account.ID, account.Owner, account.Balance)
// Programı çalışır durumda tut
select {}
}
|
C# ile Olay Kaynaklı Mimari
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
|
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace EventSourcingExample
{
// Olay türleri
public static class EventTypes
{
public const string AccountCreated = "account.created";
public const string MoneyDeposited = "money.deposited";
public const string MoneyWithdrawn = "money.withdrawn";
}
// Olay yapısı
public class Event
{
public string Id { get; set; }
public string Type { get; set; }
public DateTime Timestamp { get; set; }
public JsonElement Payload { get; set; }
}
// Hesap yapısı
public class Account
{
public string Id { get; set; }
public string Owner { get; set; }
public decimal Balance { get; set; }
}
// Hesap oluşturma olayı
public class AccountCreatedEvent
{
public string Id { get; set; }
public string Owner { get; set; }
}
// Para yatırma olayı
public class MoneyDepositedEvent
{
public string AccountId { get; set; }
public decimal Amount { get; set; }
}
// Para çekme olayı
public class MoneyWithdrawnEvent
{
public string AccountId { get; set; }
public decimal Amount { get; set; }
}
// Hesap projeksiyonu
public class AccountProjection
{
private readonly ConcurrentDictionary<string, Account> _accounts = new ConcurrentDictionary<string, Account>();
private readonly Dictionary<string, Func<Event, Task>> _eventHandlers;
private readonly ILogger<AccountProjection> _logger;
public AccountProjection(ILogger<AccountProjection> logger)
{
_logger = logger;
_eventHandlers = new Dictionary<string, Func<Event, Task>>
{
{ EventTypes.AccountCreated, HandleAccountCreatedAsync },
{ EventTypes.MoneyDeposited, HandleMoneyDepositedAsync },
{ EventTypes.MoneyWithdrawn, HandleMoneyWithdrawnAsync }
};
}
// Hesap oluşturma olayını işleme
private async Task HandleAccountCreatedAsync(Event @event)
{
var payload = JsonSerializer.Deserialize<AccountCreatedEvent>(@event.Payload.GetRawText());
if (_accounts.ContainsKey(payload.Id))
{
_logger.LogWarning("Account already exists: {AccountId}", payload.Id);
return;
}
var account = new Account
{
Id = payload.Id,
Owner = payload.Owner,
Balance = 0
};
if (_accounts.TryAdd(payload.Id, account))
{
_logger.LogInformation("Account created: {AccountId}", payload.Id);
}
await Task.CompletedTask;
}
// Para yatırma olayını işleme
private async Task HandleMoneyDepositedAsync(Event @event)
{
var payload = JsonSerializer.Deserialize<MoneyDepositedEvent>(@event.Payload.GetRawText());
if (!_accounts.TryGetValue(payload.AccountId, out var account))
{
_logger.LogWarning("Account not found: {AccountId}", payload.AccountId);
return;
}
account.Balance += payload.Amount;
_logger.LogInformation("Deposited {Amount} to account {AccountId}, new balance: {Balance}",
payload.Amount, payload.AccountId, account.Balance);
await Task.CompletedTask;
}
// Para çekme olayını işleme
private async Task HandleMoneyWithdrawnAsync(Event @event)
{
var payload = JsonSerializer.Deserialize<MoneyWithdrawnEvent>(@event.Payload.GetRawText());
if (!_accounts.TryGetValue(payload.AccountId, out var account))
{
_logger.LogWarning("Account not found: {AccountId}", payload.AccountId);
return;
}
if (account.Balance < payload.Amount)
{
_logger.LogWarning("Insufficient balance: {Balance} < {Amount}", account.Balance, payload.Amount);
return;
}
account.Balance -= payload.Amount;
_logger.LogInformation("Withdrawn {Amount} from account {AccountId}, new balance: {Balance}",
payload.Amount, payload.AccountId, account.Balance);
await Task.CompletedTask;
}
// Olayı işleme
public async Task ApplyEventAsync(Event @event)
{
if (_eventHandlers.TryGetValue(@event.Type, out var handler))
{
await handler(@event);
}
else
{
_logger.LogWarning("No handler for event type: {EventType}", @event.Type);
}
}
// Hesap bilgisini getirme
public Account GetAccount(string id)
{
if (_accounts.TryGetValue(id, out var account))
{
return account;
}
return null;
}
// Tüm hesapları getirme
public IEnumerable<Account> GetAllAccounts()
{
return _accounts.Values;
}
}
// Kafka event consumer service
public class EventConsumerService : BackgroundService
{
private readonly ILogger<EventConsumerService> _logger;
private readonly ConsumerConfig _config;
private readonly AccountProjection _projection;
public EventConsumerService(
ILogger<EventConsumerService> logger,
ConsumerConfig config,
AccountProjection projection)
{
_logger = logger;
_config = config;
_projection = projection;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Event consumer service starting");
using var consumer = new ConsumerBuilder<string, string>(_config)
.SetErrorHandler((_, e) => _logger.LogError("Kafka error: {Reason}", e.Reason))
.Build();
consumer.Subscribe("account-events");
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
_logger.LogInformation("Consumed event from {TopicPartitionOffset}", consumeResult.TopicPartitionOffset);
var @event = JsonSerializer.Deserialize<Event>(consumeResult.Message.Value);
await _projection.ApplyEventAsync(@event);
// Commit offset
consumer.Commit(consumeResult);
}
catch (ConsumeException e)
{
_logger.LogError("Consume error: {Error}", e.Error.Reason);
}
catch (Exception e)
{
_logger.LogError(e, "Error processing event");
}
}
}
finally
{
consumer.Close();
}
}
}
// Event publisher
public interface IEventPublisher
{
Task PublishEventAsync<T>(string type, string id, T payload);
}
public class KafkaEventPublisher : IEventPublisher
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaEventPublisher> _logger;
public KafkaEventPublisher(ProducerConfig config, ILogger<KafkaEventPublisher> logger)
{
_producer = new ProducerBuilder<string, string>(config).Build();
_logger = logger;
}
public async Task PublishEventAsync<T>(string type, string id, T payload)
{
var @event = new Event
{
Id = id,
Type = type,
Timestamp = DateTime.UtcNow,
Payload = JsonDocument.Parse(JsonSerializer.Serialize(payload)).RootElement
};
var json = JsonSerializer.Serialize(@event);
var message = new Message<string, string>
{
Key = id,
Value = json
};
try
{
var result = await _producer.ProduceAsync("account-events", message);
_logger.LogInformation("Published event {EventType} to {TopicPartitionOffset}",
type, result.TopicPartitionOffset);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error publishing event {EventType}", type);
throw;
}
}
}
// Program
public class Program
{
public static async Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
// Kafka yapılandırması
services.AddSingleton(new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "account-projection",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
});
services.AddSingleton(new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All
});
// Servisler
services.AddSingleton<AccountProjection>();
services.AddSingleton<IEventPublisher, KafkaEventPublisher>();
services.AddHostedService<EventConsumerService>();
})
.Build();
// Örnek olaylar oluştur
var publisher = host.Services.GetRequiredService<IEventPublisher>();
var logger = host.Services.GetRequiredService<ILogger<Program>>();
// Hesap oluştur
await publisher.PublishEventAsync(
EventTypes.AccountCreated,
Guid.NewGuid().ToString(),
new AccountCreatedEvent
{
Id = "acc-1",
Owner = "John Doe"
});
// Para yatır
await publisher.PublishEventAsync(
EventTypes.MoneyDeposited,
Guid.NewGuid().ToString(),
new MoneyDepositedEvent
{
AccountId = "acc-1",
Amount = 100
});
// Para çek
await publisher.PublishEventAsync(
EventTypes.MoneyWithdrawn,
Guid.NewGuid().ToString(),
new MoneyWithdrawnEvent
{
AccountId = "acc-1",
Amount = 30
});
// Host'u başlat
await host.RunAsync();
}
}
}
|
Sonuç
Bu makalede, Golang ve C# ile Apache Kafka uygulamaları geliştirmeyi karşılaştırmalı olarak inceledik. Her iki dil de Kafka ile çalışmak için güçlü kütüphaneler ve yetenekler sunmaktadır.
Golang’ın Avantajları
- Düşük bellek kullanımı ve yüksek performans
- Goroutine’ler ile kolay eşzamanlılık yönetimi
- Tek binary dağıtım kolaylığı
- Sarama gibi olgun ve performanslı kütüphaneler
C#‘ın Avantajları
- Zengin .NET ekosistemi ve kurumsal entegrasyon yetenekleri
- Güçlü tip sistemi ve nesne yönelimli programlama özellikleri
- Async/await ile kolay asenkron programlama
- Microsoft ve Confluent’tan kurumsal destek
Her iki dil de farklı kullanım senaryoları için uygun olabilir. Projenizin gereksinimlerine, ekibinizin uzmanlığına ve mevcut altyapınıza göre en uygun dili seçmeniz önemlidir.
Kafka, modern dağıtık sistemlerin omurgasını oluşturan güçlü bir platform olmaya devam etmektedir. Mikroservis mimarileri, olay kaynaklı sistemler ve gerçek zamanlı veri işleme senaryolarında Kafka’nın sunduğu yeteneklerden faydalanmak, ölçeklenebilir ve dayanıklı uygulamalar geliştirmenize yardımcı olacaktır.
Kaynaklar