Apache Kafka Applications with Golang and C#: A Comparative Analysis

Apache Kafka is an open-source distributed event streaming platform originally developed at LinkedIn, open-sourced in 2011, and later donated to the Apache Software Foundation. Initially built for LinkedIn’s internal needs, Kafka is now widely used by major technology companies such as Netflix, Uber, Twitter, and Airbnb.

The main reasons for Kafka’s popularity are:

  • High Throughput: Ability to process millions of messages per second
  • Low Latency: Millisecond-level delay times
  • Durability: Reliable storage mechanism preventing data loss
  • Scalability: Ability to scale horizontally with ease
  • Distributed Architecture: Multi-broker support for high availability

With the proliferation of modern microservice architectures, messaging systems like Kafka have become essential for inter-service communication. In this article, we’ll examine the place of two popular programming languages, Golang and C#, in the Kafka ecosystem.

Kafka Architecture and Basic Concepts

It’s important to understand the basic components and concepts of Apache Kafka:

Basic Components

  • Broker: Servers that make up the Kafka cluster
  • Topic: Channels where messages are categorized
  • Partition: Divided parts of topics that can be processed in parallel
  • Producer: Applications that send messages to Kafka
  • Consumer: Applications that read messages from Kafka
  • Consumer Group: A group of consumers that read by sharing the partitions of a topic
  • ZooKeeper/KRaft: Service that provides metadata management for the Kafka cluster (KRaft replaces ZooKeeper in newer Kafka versions)
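
To make these components concrete, the following sketch creates a topic with three partitions through the cluster admin API of the sarama library (the same Go client used in the examples below). The broker address, topic name, partition count, and replication factor are illustrative placeholders:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Admin requests require a reasonably recent protocol version
	config.Version = sarama.V2_1_0_0

	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating cluster admin: %v", err)
	}
	defer admin.Close()

	// A topic is split into partitions; each partition is replicated across brokers
	err = admin.CreateTopic("orders", &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 2,
	}, false)
	if err != nil {
		log.Fatalf("Error creating topic: %v", err)
	}

	log.Println("Topic created")
}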

How Kafka Works

Kafka uses a sequential, append-only data structure called a “commit log”. Thanks to this structure:

  1. Messages are stored sequentially on disk
  2. Consumers track their own reading positions (offset)
  3. Messages are stored for a certain period (retention period)
  4. Sequential I/O and zero-copy optimizations are used for high throughput
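
To illustrate the offset model (a sketch only, with an illustrative broker address and topic name), the snippet below reads a single partition from the oldest retained offset and prints each message’s position in the log:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalf("Error creating consumer: %v", err)
	}
	defer consumer.Close()

	// Start from the oldest offset still retained on the broker
	pc, err := consumer.ConsumePartition("orders", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("Error consuming partition: %v", err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// The offset is the message's position in the partition's append-only log;
		// a consumer resumes simply by remembering the next offset to read.
		log.Printf("partition=%d offset=%d value=%s", msg.Partition, msg.Offset, string(msg.Value))
	}
}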

Common Use Cases for Kafka

Apache Kafka is used for different purposes across various industries. The following diagram shows a general overview of the Kafka ecosystem and common use cases:

The diagram above shows the four main components of the Kafka ecosystem:

  1. Data Sources: Data from various sources such as databases, API services, IoT devices, log systems, and user interactions is transferred to Kafka.

  2. Kafka Ecosystem: The Kafka ecosystem consists of the Kafka cluster, producers, consumers, and additional components like Kafka Connect, Kafka Streams, and ksqlDB. This ecosystem provides the necessary infrastructure for reliably receiving, storing, processing, and distributing data.

  3. Use Cases: Common use cases for Kafka include microservice communication, event-driven architecture, real-time analytics, data integration, log collection, metric monitoring, and stream processing.

  4. Target Systems: Processed data is transferred to various target systems such as data warehouses, dashboards, alert systems, applications, machine learning models, search engines, and caches.

Now let’s examine these use cases in more detail:

Messaging System

Used as a high-performance and scalable messaging infrastructure instead of traditional message queues. Provides asynchronous communication between microservices.

Log Collection

Used to collect and process logs from distributed systems at a central point. This facilitates system monitoring and troubleshooting processes.

Stream Processing

Used to process real-time data streams. For example, real-time monitoring of financial transactions or real-time analysis of user behaviors.

Event Sourcing

Used in the architectural approach where all changes in the system are recorded as events and the current state is created by processing these events.
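
As a rough sketch of this idea (the topic name and event shape are illustrative assumptions, not a fixed schema), the example below replays all order events from the beginning of a partition and folds them into an in-memory view of the current state:

package main

import (
	"encoding/json"
	"log"

	"github.com/Shopify/sarama"
)

// OrderEvent is a hypothetical event shape used only for this sketch
type OrderEvent struct {
	OrderID string `json:"order_id"`
	Type    string `json:"type"` // e.g. "order.created", "order.shipped"
}

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalf("Error creating consumer: %v", err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("order-events", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("Error consuming partition: %v", err)
	}
	defer pc.Close()

	// Replay events from the start of the log and fold them into the current state
	state := make(map[string]string) // order ID -> latest status
	for msg := range pc.Messages() {
		var event OrderEvent
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			continue
		}
		state[event.OrderID] = event.Type

		// Stop once the replay reaches the current end of the log
		if msg.Offset >= pc.HighWaterMarkOffset()-1 {
			break
		}
	}

	log.Printf("Rebuilt current state for %d orders", len(state))
}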

Data Integration

Used for data synchronization and ETL (Extract, Transform, Load) operations between different systems.

Telemetry Data

Used for collecting and processing telemetry data from IoT devices.

Real-Time Analytics

Used for real-time analysis of data such as user interactions and system metrics.

Kafka Ecosystem: Connect and Streams

Kafka Connect

Kafka Connect is a framework that facilitates data flow between Kafka and other systems. It provides integration with databases, file systems, key-value stores, and other systems using ready-made connectors or by developing custom connectors.
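
In practice, Kafka Connect is usually driven by configuration rather than client code: a connector definition is posted to the Connect REST API (port 8083 by default) and the Connect workers handle the data movement. The sketch below submits an example JDBC source connector configuration from Go; the connector class and connection settings are illustrative and depend on which connector plugins are installed in your Connect cluster:

package main

import (
	"bytes"
	"log"
	"net/http"
)

func main() {
	// Hypothetical connector configuration for a JDBC source reading the "users" table
	connectorConfig := []byte(`{
		"name": "users-source",
		"config": {
			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
			"connection.url": "jdbc:mysql://localhost:3306/database",
			"table.whitelist": "users",
			"mode": "timestamp",
			"timestamp.column.name": "updated_at",
			"topic.prefix": "db-"
		}
	}`)

	resp, err := http.Post("http://localhost:8083/connectors", "application/json",
		bytes.NewReader(connectorConfig))
	if err != nil {
		log.Fatalf("Error creating connector: %v", err)
	}
	defer resp.Body.Close()

	log.Printf("Connect API responded with status %s", resp.Status)
}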

Kafka Connect with Golang

There is no official library for Kafka Connect in Golang, but it is possible to develop custom connectors:

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
	_ "github.com/go-sql-driver/mysql" // registers the MySQL driver for database/sql
)

// A simple connector example that pulls data from MySQL and sends it to Kafka
func main() {
	// MySQL connection
	db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/database")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Kafka producer configuration
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Periodically fetch and send data
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		// Fetch data from database
		rows, err := db.Query("SELECT id, name, created_at FROM users WHERE updated_at > ?", 
			time.Now().Add(-1*time.Minute))
		if err != nil {
			log.Println("Query error:", err)
			continue
		}

		for rows.Next() {
			var id int
			var name string
			var createdAt time.Time
			
			if err := rows.Scan(&id, &name, &createdAt); err != nil {
				log.Println("Scan error:", err)
				continue
			}
			
			// Convert data to JSON format
			data, err := json.Marshal(map[string]interface{}{
				"id": id,
				"name": name,
				"created_at": createdAt,
			})
			if err != nil {
				log.Println("JSON error:", err)
				continue
			}
			
			// Send to Kafka
			_, _, err = producer.SendMessage(&sarama.ProducerMessage{
				Topic: "db-changes",
				Key:   sarama.StringEncoder(fmt.Sprintf("%d", id)),
				Value: sarama.ByteEncoder(data),
			})
			if err != nil {
				log.Println("Send error:", err)
			}
		}
		rows.Close()
	}
}

Kafka Connect with C#

Custom connectors can be developed in C#:

using System;
using System.Data.SqlClient;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace SqlServerKafkaConnector
{
    public class SqlServerSourceConnector : BackgroundService
    {
        private readonly ILogger<SqlServerSourceConnector> _logger;
        private readonly string _connectionString;
        private readonly string _topic;
        private readonly string _bootstrapServers;
        private DateTime _lastCheckpoint = DateTime.MinValue;

        public SqlServerSourceConnector(
            ILogger<SqlServerSourceConnector> logger,
            string connectionString,
            string topic,
            string bootstrapServers)
        {
            _logger = logger;
            _connectionString = connectionString;
            _topic = topic;
            _bootstrapServers = bootstrapServers;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var config = new ProducerConfig { BootstrapServers = _bootstrapServers };
            
            using var producer = new ProducerBuilder<string, string>(config).Build();
            
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await PollDatabaseChanges(producer);
                    await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error polling database changes");
                    await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
                }
            }
        }

        private async Task PollDatabaseChanges(IProducer<string, string> producer)
        {
            using var connection = new SqlConnection(_connectionString);
            await connection.OpenAsync();
            
            var query = @"SELECT Id, Name, CreatedAt FROM Users WHERE UpdatedAt > @LastCheckpoint";
            
            using var command = new SqlCommand(query, connection);
            command.Parameters.AddWithValue("@LastCheckpoint", _lastCheckpoint);
            
            using var reader = await command.ExecuteReaderAsync();
            
            while (await reader.ReadAsync())
            {
                var id = reader.GetInt32(0);
                var name = reader.GetString(1);
                var createdAt = reader.GetDateTime(2);
                
                var data = new
                {
                    Id = id,
                    Name = name,
                    CreatedAt = createdAt
                };
                
                var json = JsonSerializer.Serialize(data);
                
                await producer.ProduceAsync(_topic, new Message<string, string>
                {
                    Key = id.ToString(),
                    Value = json
                });
                
                _logger.LogInformation("Sent record: {Id}", id);
            }
            
            _lastCheckpoint = DateTime.UtcNow;
        }
    }
}

Kafka Streams

Kafka Streams is a library used to develop stream processing applications on Kafka. It offers capabilities such as stateful and stateless operations, windowing, joining, and aggregation.

Stream Processing Alternatives for Golang

There is no official Kafka Streams library for Golang, but custom solutions providing similar functionality can be developed:

package main

import (
	"context"
	"encoding/json"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// A simple stream processing application
func main() {
	// Consumer configuration
	consumerConfig := sarama.NewConfig()
	consumerConfig.Consumer.Return.Errors = true
	consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Producer configuration
	producerConfig := sarama.NewConfig()
	producerConfig.Producer.Return.Successes = true

	// Create consumer group
	consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "stream-processor", consumerConfig)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumer.Close()

	// Create producer
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, producerConfig)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer producer.Close()

	// Stream processor
	handler := &StreamProcessor{
		producer: producer,
		// Simple in-memory state for a 5-minute window
		windowedCounts: make(map[string]int),
		windowStart:    time.Now(),
		windowSize:     5 * time.Minute,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consumption loop
	topics := []string{"input-topic"}
	for {
		if err := consumer.Consume(ctx, topics, handler); err != nil {
			log.Fatalf("Error from consumer: %v", err)
		}
		
		if ctx.Err() != nil {
			return
		}
	}
}

// StreamProcessor implements the sarama.ConsumerGroupHandler interface
type StreamProcessor struct {
	producer       sarama.SyncProducer
	windowedCounts map[string]int
	windowStart    time.Time
	windowSize     time.Duration
	mu             sync.Mutex
}

func (p *StreamProcessor) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (p *StreamProcessor) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (p *StreamProcessor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// Process message
		var data map[string]interface{}
		if err := json.Unmarshal(message.Value, &data); err != nil {
			log.Printf("Error unmarshaling message: %v", err)
			session.MarkMessage(message, "")
			continue
		}
		
		// Example operation: Word counting
		if text, ok := data["text"].(string); ok {
			words := strings.Fields(text)
			
			p.mu.Lock()
			// If window duration is over, send results and reset window
			if time.Since(p.windowStart) > p.windowSize {
				for word, count := range p.windowedCounts {
					result, _ := json.Marshal(map[string]interface{}{
						"word":       word,
						"count":      count,
						"windowEnd":  time.Now(),
						"windowStart": p.windowStart,
					})
					
					p.producer.SendMessage(&sarama.ProducerMessage{
						Topic: "word-counts",
						Key:   sarama.StringEncoder(word),
						Value: sarama.ByteEncoder(result),
					})
				}
				
				p.windowedCounts = make(map[string]int)
				p.windowStart = time.Now()
			}
			
			// Count words
			for _, word := range words {
				word = strings.ToLower(strings.Trim(word, ".,!?\"':;()"))
				if word != "" {
					p.windowedCounts[word]++
				}
			}
			p.mu.Unlock()
		}
		
		session.MarkMessage(message, "")
	}
	return nil
}

Kafka Streams-like Operations with C#

Custom solutions providing functionality similar to Kafka Streams can be developed in C#:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace KafkaStreamProcessor
{
    public class WordCountProcessor : BackgroundService
    {
        private readonly ILogger<WordCountProcessor> _logger;
        private readonly string _bootstrapServers;
        private readonly string _inputTopic;
        private readonly string _outputTopic;
        private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
        
        private ConcurrentDictionary<string, int> _wordCounts = new ConcurrentDictionary<string, int>();
        private DateTime _windowStart = DateTime.UtcNow;

        public WordCountProcessor(
            ILogger<WordCountProcessor> logger,
            string bootstrapServers,
            string inputTopic,
            string outputTopic)
        {
            _logger = logger;
            _bootstrapServers = bootstrapServers;
            _inputTopic = inputTopic;
            _outputTopic = outputTopic;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _bootstrapServers,
                GroupId = "word-count-processor",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };

            var producerConfig = new ProducerConfig
            {
                BootstrapServers = _bootstrapServers
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
            using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
            
            consumer.Subscribe(_inputTopic);
            
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        
                        // Process message
                        try
                        {
                            var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(consumeResult.Message.Value);
                            
                            if (data.TryGetValue("text", out var textElement) && textElement.ValueKind == JsonValueKind.String)
                            {
                                var text = textElement.GetString();
                                if (!string.IsNullOrEmpty(text))
                                {
                                    ProcessText(text);
                                }
                            }
                        }
                        catch (JsonException ex)
                        {
                            _logger.LogError(ex, "Error deserializing message");
                        }
                        
                        // If window duration is over, send results
                        if (DateTime.UtcNow - _windowStart > _windowSize)
                        {
                            await SendWindowResults(producer);
                        }
                        
                        consumer.Commit(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, "Error consuming message");
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
        }

        private void ProcessText(string text)
        {
            var words = Regex.Split(text, @"\W+")
                .Where(w => !string.IsNullOrEmpty(w))
                .Select(w => w.ToLowerInvariant());
            
            foreach (var word in words)
            {
                _wordCounts.AddOrUpdate(word, 1, (_, count) => count + 1);
            }
        }

        private async Task SendWindowResults(IProducer<string, string> producer)
        {
            var windowEnd = DateTime.UtcNow;
            var tasks = new List<Task>();
            
            foreach (var entry in _wordCounts.ToArray())
            {
                var result = new
                {
                    Word = entry.Key,
                    Count = entry.Value,
                    WindowStart = _windowStart,
                    WindowEnd = windowEnd
                };
                
                var json = JsonSerializer.Serialize(result);
                
                tasks.Add(producer.ProduceAsync(_outputTopic, new Message<string, string>
                {
                    Key = entry.Key,
                    Value = json
                }));
                
                _logger.LogInformation("Word count: {Word} = {Count}", entry.Key, entry.Value);
            }
            
            await Task.WhenAll(tasks);
            
            _wordCounts.Clear();
            _windowStart = DateTime.UtcNow;
        }
    }
}

Kafka Security and Authorization

Security and authorization in Kafka are critical, especially in enterprise environments. Let’s examine how to create secure Kafka connections in both languages:

Secure Kafka Connections with Golang

package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// TLS configuration
	tlsConfig, err := createTLSConfig("client.cer.pem", "client.key.pem", "ca.pem")
	if err != nil {
		log.Fatalf("Error creating TLS config: %v", err)
	}

	// Kafka configuration
	config := sarama.NewConfig()
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = tlsConfig
	config.Net.SASL.Enable = true
	config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	config.Net.SASL.User = "username"
	config.Net.SASL.Password = "password"
	config.Producer.Return.Successes = true

	// Create producer
	producer, err := sarama.NewSyncProducer([]string{"kafka:9093"}, config)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer producer.Close()

	// Send message
	msg := &sarama.ProducerMessage{
		Topic: "secure-topic",
		Value: sarama.StringEncoder("Secure message"),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatalf("Error sending message: %v", err)
	}

	log.Printf("Message sent to partition %d at offset %d", partition, offset)
}

func createTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	// Load CA certificate
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return nil, err
	}

	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(caCert)

	// Load client certificate and key
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return nil, err
	}

	// Create TLS configuration
	tlsConfig := &tls.Config{
		Certificates:       []tls.Certificate{cert},
		RootCAs:            caCertPool,
		InsecureSkipVerify: false,
	}

	return tlsConfig, nil
}

Secure Kafka Connections with C#

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace SecureKafkaClient
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "kafka:9093",
                
                // SSL/TLS configuration
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = "ca.pem",
                SslCertificateLocation = "client.cer.pem",
                SslKeyLocation = "client.key.pem",
                
                // SASL configuration
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "username",
                SaslPassword = "password"
            };

            using var producer = new ProducerBuilder<Null, string>(config).Build();
            
            try
            {
                var result = await producer.ProduceAsync("secure-topic", 
                    new Message<Null, string> { Value = "Secure message" });
                
                Console.WriteLine($"Message sent to {result.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Error sending message: {e.Error.Reason}");
            }
        }
    }
}

Kafka Monitoring and Observability

Monitoring and observing Kafka applications is critical in production environments. Let’s examine how to implement monitoring solutions in both languages:

Kafka Monitoring with Golang

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
	// Prometheus metrics
	messagesProcessed = promauto.NewCounter(prometheus.CounterOpts{
		Name: "kafka_consumer_messages_processed_total",
		Help: "The total number of processed messages",
	})
	
	processingLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "kafka_consumer_processing_latency_seconds",
		Help:    "Message processing latency in seconds",
		Buckets: prometheus.DefBuckets,
	})
	
	consumerLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "kafka_consumer_lag",
		Help: "The lag of the consumer per partition",
	}, []string{"topic", "partition"})
)

func main() {
	// Prometheus HTTP server
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		log.Fatal(http.ListenAndServe(":2112", nil))
	}()

	// Kafka consumer configuration
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "monitored-consumer", config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumer.Close()

	// Consumer handler
	handler := &MonitoredConsumerHandler{}
	
	// Consumption loop
	topics := []string{"monitored-topic"}
	for {
		if err := consumer.Consume(context.Background(), topics, handler); err != nil {
			log.Fatalf("Error from consumer: %v", err)
		}
	}
}

// MonitoredConsumerHandler implements the sarama.ConsumerGroupHandler interface
type MonitoredConsumerHandler struct{}

func (h *MonitoredConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *MonitoredConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *MonitoredConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Track the next offset to be consumed so the lag monitor can read it
	lastOffset := claim.InitialOffset()

	// Monitor consumer lag until this claim ends
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go monitorConsumerLag(ctx, claim, &lastOffset)

	for message := range claim.Messages() {
		// Processing start time
		startTime := time.Now()

		// Process message
		processMessage(message)

		// Update metrics and the consumed position
		messagesProcessed.Inc()
		processingLatency.Observe(time.Since(startTime).Seconds())
		atomic.StoreInt64(&lastOffset, message.Offset+1)

		// Mark message
		session.MarkMessage(message, "")
	}
	return nil
}

func monitorConsumerLag(ctx context.Context, claim sarama.ConsumerGroupClaim, lastOffset *int64) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	topic := claim.Topic()
	partition := claim.Partition()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			next := atomic.LoadInt64(lastOffset)
			if next < 0 {
				// No message consumed yet and no resolved starting offset
				continue
			}
			// Lag = broker's high watermark minus the next offset we will consume
			lag := claim.HighWaterMarkOffset() - next
			consumerLag.WithLabelValues(topic, fmt.Sprintf("%d", partition)).Set(float64(lag))
		}
	}
}

func processMessage(message *sarama.ConsumerMessage) {
	// Message processing logic
	time.Sleep(100 * time.Millisecond) // Simulated processing time
}

Kafka Monitoring with C#

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Prometheus;

namespace KafkaMonitoringExample
{
    public class MonitoredKafkaConsumer : BackgroundService
    {
        private readonly ILogger<MonitoredKafkaConsumer> _logger;
        private readonly ConsumerConfig _config;
        private readonly string _topic;
        
        // Prometheus metrics
        private static readonly Counter MessagesProcessed = Metrics.CreateCounter(
            "kafka_consumer_messages_processed_total", 
            "The total number of processed messages");
        
        private static readonly Histogram ProcessingLatency = Metrics.CreateHistogram(
            "kafka_consumer_processing_latency_seconds",
            "Message processing latency in seconds",
            new HistogramConfiguration
            {
                Buckets = Histogram.ExponentialBuckets(0.001, 2, 10)
            });
        
        private static readonly Gauge ConsumerLag = Metrics.CreateGauge(
            "kafka_consumer_lag",
            "The lag of the consumer per partition",
            new GaugeConfiguration
            {
                LabelNames = new[] { "topic", "partition" }
            });

        public MonitoredKafkaConsumer(
            ILogger<MonitoredKafkaConsumer> logger,
            ConsumerConfig config,
            string topic)
        {
            _logger = logger;
            _config = config;
            _topic = topic;
            
            // Start Prometheus HTTP server
            var server = new MetricServer(port: 9090);
            server.Start();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using var consumer = new ConsumerBuilder<Ignore, string>(_config).Build();
            consumer.Subscribe(_topic);
            
            // Consumer lag monitoring task
            _ = Task.Run(() => MonitorConsumerLag(consumer, stoppingToken), stoppingToken);
            
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        
                        // Processing start time
                        var stopwatch = Stopwatch.StartNew();
                        
                        // Process message
                        await ProcessMessageAsync(consumeResult.Message.Value);
                        
                        // Update metrics
                        MessagesProcessed.Inc();
                        ProcessingLatency.Observe(stopwatch.Elapsed.TotalSeconds);
                        
                        // Mark message
                        consumer.Commit(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, "Error consuming message");
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
        }

        private async Task ProcessMessageAsync(string message)
        {
            // Message processing logic
            await Task.Delay(100); // Simulated processing time
        }
        
        private async Task MonitorConsumerLag(IConsumer<Ignore, string> consumer, CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Query the broker for the partition's high watermark instead of relying
                    // on locally cached values
                    var watermarkOffsets = consumer.QueryWatermarkOffsets(
                        new TopicPartition(_topic, new Partition(0)),
                        TimeSpan.FromSeconds(10));
                    
                    var committedOffsets = consumer.Committed(
                        new List<TopicPartition> { new TopicPartition(_topic, new Partition(0)) },
                        TimeSpan.FromSeconds(10));
                    
                    if (committedOffsets.Count > 0 && committedOffsets[0].Offset.Value >= 0)
                    {
                        var lag = watermarkOffsets.High.Value - committedOffsets[0].Offset.Value;
                        ConsumerLag.WithLabels(_topic, "0").Set(lag);
                        _logger.LogInformation("Consumer lag: {Lag}", lag);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error monitoring consumer lag");
                }
                
                await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
            }
        }
    }
}

Error Management and Resilience in Kafka Applications

Error management and resilience are critical in Kafka applications in production environments. Let’s examine how we can address these issues in both languages:

Error Management and Resilience with Golang

package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Signal capture for graceful shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Signal capture goroutine
	go func() {
		<-signals
		log.Println("Shutdown signal received, closing consumer...")
		cancel()
	}()

	// Start Kafka consumer
	if err := startConsumerWithRetry(ctx); err != nil {
		log.Fatalf("Fatal error: %v", err)
	}
}

func startConsumerWithRetry(ctx context.Context) error {
	// Exponential backoff strategy
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxElapsedTime = 5 * time.Minute

	return backoff.RetryNotify(
		func() error {
			return startConsumer(ctx)
		},
		expBackoff,
		func(err error, duration time.Duration) {
			log.Printf("Error: %v, retrying in %v...", err, duration)
		},
	)
}

func startConsumer(ctx context.Context) error {
	// Kafka configuration
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false
	config.Consumer.Retry.Backoff = 1 * time.Second

	// Create consumer group
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "resilient-consumer", config)
	if err != nil {
		return err
	}
	defer group.Close()

	// Monitor consumer error channel
	go func() {
		for err := range group.Errors() {
			log.Printf("Consumer group error: %v", err)
		}
	}()

	// Consumer handler
	handler := &ResilientConsumerHandler{
		readyC: make(chan bool),
	}

	// Consumption loop
	topics := []string{"resilient-topic"}
	
	// Wait until handler is ready
	go func() {
		if err := group.Consume(ctx, topics, handler); err != nil {
			log.Printf("Error from consumer: %v", err)
		}
	}()

	<-handler.readyC
	log.Println("Consumer is ready")

	// Wait until context is cancelled
	<-ctx.Done()
	log.Println("Context cancelled, closing consumer")
	
	// Wait a bit for consumer to close
	time.Sleep(2 * time.Second)
	return nil
}

// ResilientConsumerHandler implements the sarama.ConsumerGroupHandler interface
type ResilientConsumerHandler struct {
	readyC chan bool
}

func (h *ResilientConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Notify when consumer is ready
	close(h.readyC)
	return nil
}

func (h *ResilientConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *ResilientConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			
			// Process message
			if err := processMessageWithRetry(message); err != nil {
				// Stop process in case of critical error
				if errors.Is(err, ErrCritical) {
					return err
				}
				
				// Skip message and continue in case of temporary error
				log.Printf("Error processing message, skipping: %v", err)
			}
			
			// Mark message
			session.MarkMessage(message, "")
			
		case <-session.Context().Done():
			return nil
		}
	}
}

// Custom error types
var ErrCritical = errors.New("critical error")
var ErrTemporary = errors.New("temporary error")

func processMessageWithRetry(message *sarama.ConsumerMessage) error {
	// Retry strategy
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxElapsedTime = 30 * time.Second

	return backoff.Retry(
		func() error {
			err := processMessage(message)
			if err == nil {
				return nil
			}

			// Retry in case of temporary error
			if errors.Is(err, ErrTemporary) {
				return err
			}

			// Don't retry other error types, but surface them to the caller
			return backoff.Permanent(err)
		},
		expBackoff,
	)
}

func processMessage(message *sarama.ConsumerMessage) error {
	// Message processing logic
	// ...
	return nil
}

Error Management and Resilience with C#

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;

namespace ResilientKafkaConsumer
{
    public class ResilientKafkaConsumerService : BackgroundService
    {
        private readonly ILogger<ResilientKafkaConsumerService> _logger;
        private readonly ConsumerConfig _config;
        private readonly string _topic;
        private readonly AsyncRetryPolicy _retryPolicy;

        public ResilientKafkaConsumerService(
            ILogger<ResilientKafkaConsumerService> logger,
            ConsumerConfig config,
            string topic)
        {
            _logger = logger;
            _config = config;
            _topic = topic;
            
            // Polly retry policy
            _retryPolicy = Policy
                .Handle<Exception>(ex => !(ex is OperationCanceledException) && IsTransientException(ex))
                .WaitAndRetryAsync(
                    5,
                    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    (exception, timeSpan, retryCount, context) =>
                    {
                        _logger.LogWarning(
                            exception, 
                            "Error processing message. Retrying in {RetryTimeSpan}s. Attempt {RetryCount}",
                            timeSpan.TotalSeconds,
                            retryCount);
                    }
                );
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Resilient Kafka consumer service starting");

            // CancellationToken for graceful shutdown
            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
            
            try
            {
                await StartConsumerWithRetryAsync(linkedCts.Token);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Consumer service was cancelled");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Fatal error in consumer service");
                throw;
            }
            finally
            {
                _logger.LogInformation("Resilient Kafka consumer service stopping");
            }
        }

        private async Task StartConsumerWithRetryAsync(CancellationToken cancellationToken)
        {
            // Connection retry policy
            var connectionRetryPolicy = Policy
                .Handle<Exception>(ex => !(ex is OperationCanceledException))
                .WaitAndRetryForeverAsync(
                    retryAttempt => TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, retryAttempt))),
                    (exception, timeSpan, context) =>
                    {
                        _logger.LogWarning(
                            exception,
                            "Error connecting to Kafka. Retrying in {RetryTimeSpan}s",
                            timeSpan.TotalSeconds);
                    }
                );

            await connectionRetryPolicy.ExecuteAsync(async () =>
            {
                using var consumer = new ConsumerBuilder<Ignore, string>(_config)
                    .SetErrorHandler((_, error) =>
                    {
                        _logger.LogError("Kafka error: {ErrorReason}", error.Reason);
                    })
                    .SetPartitionsAssignedHandler((c, partitions) =>
                    {
                        _logger.LogInformation("Partitions assigned: {Partitions}", string.Join(", ", partitions));
                    })
                    .SetPartitionsRevokedHandler((c, partitions) =>
                    {
                        _logger.LogInformation("Partitions revoked: {Partitions}", string.Join(", ", partitions));
                    })
                    .Build();

                try
                {
                    consumer.Subscribe(_topic);
                    _logger.LogInformation("Subscribed to topic: {Topic}", _topic);

                    while (!cancellationToken.IsCancellationRequested)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            
                            // Process message
                            await ProcessMessageWithRetryAsync(consumeResult, cancellationToken);
                            
                            // Commit offset after successful processing
                            try
                            {
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException ex)
                            {
                                _logger.LogWarning(ex, "Error committing offset");
                            }
                        }
                        catch (ConsumeException ex)
                        {
                            _logger.LogError(ex, "Error consuming message");
                        }
                    }
                }
                finally
                {
                    // Graceful shutdown
                    try
                    {
                        consumer.Close();
                        _logger.LogInformation("Consumer closed");
                    }
                    catch (Exception ex)
                    {
                        _logger.LogWarning(ex, "Error closing consumer");
                    }
                }
            });
        }

        private async Task ProcessMessageWithRetryAsync(ConsumeResult<Ignore, string> consumeResult, CancellationToken cancellationToken)
        {
            await _retryPolicy.ExecuteAsync(async () =>
            {
                try
                {
                    await ProcessMessageAsync(consumeResult.Message.Value, cancellationToken);
                    _logger.LogInformation(
                        "Processed message at {TopicPartitionOffset}",
                        consumeResult.TopicPartitionOffset);
                }
                catch (Exception ex) when (IsCriticalException(ex))
                {
                    _logger.LogCritical(ex, "Critical error processing message");
                    // Rethrow critical errors
                    throw;
                }
                catch (Exception ex) when (!IsTransientException(ex))
                {
                    _logger.LogError(ex, "Non-transient error processing message");
                    // Rethrow non-transient errors
                    throw;
                }
            });
        }

        private async Task ProcessMessageAsync(string message, CancellationToken cancellationToken)
        {
            // Message processing logic
            await Task.Delay(100, cancellationToken); // Simulated processing
            
            // Random error simulation (for testing)
            if (new Random().Next(100) < 10)
            {
                throw new TransientException("Simulated transient error");
            }
        }

        private bool IsTransientException(Exception ex)
        {
            // Define transient errors
            return ex is TransientException ||
                   ex is KafkaException kafkaEx && IsTransientKafkaError(kafkaEx.Error.Code);
        }

        private bool IsCriticalException(Exception ex)
        {
            // Define critical errors
            return ex is CriticalException ||
                   ex is OutOfMemoryException;
        }

        private bool IsTransientKafkaError(ErrorCode errorCode)
        {
            // Define transient Kafka errors
            return errorCode == ErrorCode.NetworkException ||
                   errorCode == ErrorCode.Local_AllBrokersDown ||
                   errorCode == ErrorCode.NotCoordinatorForGroup ||
                   errorCode == ErrorCode.RequestTimedOut;
        }
    }

    // Custom error types
    public class TransientException : Exception
    {
        public TransientException(string message) : base(message) { }
    }

    public class CriticalException : Exception
    {
        public CriticalException(string message) : base(message) { }
    }
}

Microservice Communication with Kafka

Kafka is an excellent tool for asynchronous communication between microservices. Let’s examine how Kafka can be used for microservice communication in both languages:

Microservice Communication with Golang

package main

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/gin-gonic/gin"
)

// Event types
const (
	OrderCreated   = "order.created"
	OrderShipped   = "order.shipped"
	OrderDelivered = "order.delivered"
)

// Event structure
type Event struct {
	ID        string      `json:"id"`
	Type      string      `json:"type"`
	Timestamp time.Time   `json:"timestamp"`
	Payload   interface{} `json:"payload"`
}

// Order structure
type Order struct {
	ID          string    `json:"id"`
	CustomerID  string    `json:"customer_id"`
	Products    []Product `json:"products"`
	TotalAmount float64   `json:"total_amount"`
	Status      string    `json:"status"`
}

// Product structure
type Product struct {
	ID       string  `json:"id"`
	Name     string  `json:"name"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

// Kafka producer
var producer sarama.SyncProducer

func main() {
	// Kafka configuration
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true

	// Create producer
	var err error
	producer, err = sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %s", err)
	}
	defer producer.Close()

	// HTTP server
	router := gin.Default()
	router.POST("/orders", createOrder)
	router.PUT("/orders/:id/ship", shipOrder)
	router.PUT("/orders/:id/deliver", deliverOrder)

	// Start HTTP server
	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	// Graceful shutdown
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Failed to start server: %s", err)
		}
	}()

	// Signal capture
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")

	// Close server
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatalf("Server forced to shutdown: %s", err)
	}

	log.Println("Server exiting")
}

// Order creation handler
func createOrder(c *gin.Context) {
	var order Order
	if err := c.ShouldBindJSON(&order); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Order creation operations
	order.Status = "created"

	// Publish event
	event := Event{
		ID:        generateID(),
		Type:      OrderCreated,
		Timestamp: time.Now(),
		Payload:   order,
	}

	if err := publishEvent(OrderCreated, event); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
		return
	}

	c.JSON(http.StatusCreated, order)
}

// Order shipping handler
func shipOrder(c *gin.Context) {
	id := c.Param("id")
	
	// Find and update order (example)
	order := Order{
		ID:     id,
		Status: "shipped",
	}

	// Publish event
	event := Event{
		ID:        generateID(),
		Type:      OrderShipped,
		Timestamp: time.Now(),
		Payload:   order,
	}

	if err := publishEvent(OrderShipped, event); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
		return
	}

	c.JSON(http.StatusOK, order)
}

// Order delivery handler
func deliverOrder(c *gin.Context) {
	id := c.Param("id")
	
	// Find and update order (example)
	order := Order{
		ID:     id,
		Status: "delivered",
	}

	// Publish event
	event := Event{
		ID:        generateID(),
		Type:      OrderDelivered,
		Timestamp: time.Now(),
		Payload:   order,
	}

	if err := publishEvent(OrderDelivered, event); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish event"})
		return
	}

	c.JSON(http.StatusOK, order)
}

// Publish event to Kafka
func publishEvent(eventType string, event Event) error {
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// Create Kafka message
	msg := &sarama.ProducerMessage{
		Topic: "orders",
		Key:   sarama.StringEncoder(event.ID),
		Value: sarama.ByteEncoder(payload),
		Headers: []sarama.RecordHeader{
			{
				Key:   []byte("event_type"),
				Value: []byte(eventType),
			},
		},
	}

	// Send message
	_, _, err = producer.SendMessage(msg)
	return err
}

// Generate unique ID
func generateID() string {
	return time.Now().Format("20060102150405") + "-" + randomString(6)
}

// Generate random string
// Note: on Go versions before 1.20, seed math/rand explicitly for non-repeating output
func randomString(n int) string {
	const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	return string(b)
}

Microservice Communication with C#

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace OrderService
{
    // Event types
    public static class EventTypes
    {
        public const string OrderCreated = "order.created";
        public const string OrderShipped = "order.shipped";
        public const string OrderDelivered = "order.delivered";
    }

    // Event structure
    public class Event
    {
        public string Id { get; set; }
        public string Type { get; set; }
        public DateTime Timestamp { get; set; }
        public object Payload { get; set; }
    }

    // Order structure
    public class Order
    {
        public string Id { get; set; }
        public string CustomerId { get; set; }
        public List<Product> Products { get; set; }
        public decimal TotalAmount { get; set; }
        public string Status { get; set; }
    }

    // Product structure
    public class Product
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public int Quantity { get; set; }
        public decimal Price { get; set; }
    }

    // Kafka event publisher
    public interface IEventPublisher
    {
        Task PublishEventAsync(string eventType, Event @event);
    }

    public class KafkaEventPublisher : IEventPublisher
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaEventPublisher> _logger;

        public KafkaEventPublisher(ILogger<KafkaEventPublisher> logger, ProducerConfig config)
        {
            _logger = logger;
            _producer = new ProducerBuilder<string, string>(config).Build();
        }

        public async Task PublishEventAsync(string eventType, Event @event)
        {
            try
            {
                var json = JsonSerializer.Serialize(@event);
                
                var message = new Message<string, string>
                {
                    Key = @event.Id,
                    Value = json,
                    Headers = new Headers
                    {
                        { "event_type", System.Text.Encoding.UTF8.GetBytes(eventType) }
                    }
                };

                var result = await _producer.ProduceAsync("orders", message);
                _logger.LogInformation(
                    "Published event {EventType} to {TopicPartitionOffset}",
                    eventType, result.TopicPartitionOffset);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing event {EventType}", eventType);
                throw;
            }
        }
    }

    // Order controller
    [ApiController]
    [Route("api/[controller]")]
    public class OrdersController : ControllerBase
    {
        private readonly IEventPublisher _eventPublisher;
        private readonly ILogger<OrdersController> _logger;

        public OrdersController(IEventPublisher eventPublisher, ILogger<OrdersController> logger)
        {
            _eventPublisher = eventPublisher;
            _logger = logger;
        }

        [HttpPost]
        public async Task<IActionResult> CreateOrder(Order order)
        {
            // Order creation operations
            order.Id = GenerateId();
            order.Status = "created";

            // Publish event
            var @event = new Event
            {
                Id = GenerateId(),
                Type = EventTypes.OrderCreated,
                Timestamp = DateTime.UtcNow,
                Payload = order
            };

            try
            {
                await _eventPublisher.PublishEventAsync(EventTypes.OrderCreated, @event);
                return CreatedAtAction(nameof(GetOrder), new { id = order.Id }, order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error creating order");
                return StatusCode(StatusCodes.Status500InternalServerError, "Failed to create order");
            }
        }

        [HttpGet("{id}")]
        public IActionResult GetOrder(string id)
        {
            // In a real application, find order from database
            var order = new Order { Id = id };
            return Ok(order);
        }

        [HttpPut("{id}/ship")]
        public async Task<IActionResult> ShipOrder(string id)
        {
            // In a real application, find and update order from database
            var order = new Order
            {
                Id = id,
                Status = "shipped"
            };

            // Publish event
            var @event = new Event
            {
                Id = GenerateId(),
                Type = EventTypes.OrderShipped,
                Timestamp = DateTime.UtcNow,
                Payload = order
            };

            try
            {
                await _eventPublisher.PublishEventAsync(EventTypes.OrderShipped, @event);
                return Ok(order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error shipping order");
                return StatusCode(StatusCodes.Status500InternalServerError, "Failed to ship order");
            }
        }

        [HttpPut("{id}/deliver")]
        public async Task<IActionResult> DeliverOrder(string id)
        {
            // In a real application, find and update order from database
            var order = new Order
            {
                Id = id,
                Status = "delivered"
            };

            // Publish event
            var @event = new Event
            {
                Id = GenerateId(),
                Type = EventTypes.OrderDelivered,
                Timestamp = DateTime.UtcNow,
                Payload = order
            };

            try
            {
                await _eventPublisher.PublishEventAsync(EventTypes.OrderDelivered, @event);
                return Ok(order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error delivering order");
                return StatusCode(StatusCodes.Status500InternalServerError, "Failed to deliver order");
            }
        }

        private string GenerateId() => Guid.NewGuid().ToString("N");
    }

    // Startup
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            
            // Kafka configuration
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                Acks = Acks.All
            };
            
            services.AddSingleton(producerConfig);
            services.AddSingleton<IEventPublisher, KafkaEventPublisher>();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }

    // Program
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }
}
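One detail worth noting about the publisher above: the Confluent.Kafka producer buffers messages internally, so it should be flushed and disposed when the application shuts down. Below is a minimal sketch of one way to do this, assuming the same class as in the example (shown here under the hypothetical name KafkaEventPublisherWithDispose, with the publish method unchanged and omitted); it simply adds IDisposable so the DI container releases the producer when the host stops.

using System;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

namespace OrderService
{
    // Sketch: the same publisher extended with IDisposable so the producer is
    // flushed and released on shutdown. PublishEventAsync is unchanged from the
    // example above and omitted here. The class name is hypothetical.
    public class KafkaEventPublisherWithDispose : IDisposable
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaEventPublisherWithDispose> _logger;

        public KafkaEventPublisherWithDispose(ILogger<KafkaEventPublisherWithDispose> logger, ProducerConfig config)
        {
            _logger = logger;
            _producer = new ProducerBuilder<string, string>(config).Build();
        }

        public void Dispose()
        {
            // Wait briefly for in-flight messages, then release the client.
            _producer.Flush(TimeSpan.FromSeconds(10));
            _producer.Dispose();
            _logger.LogInformation("Kafka producer disposed");
        }
    }
}

Registering such a publisher as a singleton is enough for the generic host to call Dispose on shutdown, so no extra wiring is needed beyond the AddSingleton call already shown in Startup.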

Event-Driven Architecture with Kafka

Kafka provides an excellent infrastructure for event-driven architecture. Let’s examine how we can implement this architecture in both languages:

Event-Driven Architecture with Golang

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// Event types
const (
	AccountCreated = "account.created"
	MoneyDeposited = "money.deposited"
	MoneyWithdrawn = "money.withdrawn"
)

// Event structure
type Event struct {
	ID        string      `json:"id"`
	Type      string      `json:"type"`
	Timestamp time.Time   `json:"timestamp"`
	Payload   interface{} `json:"payload"`
}

// Account structure
type Account struct {
	ID      string  `json:"id"`
	Owner   string  `json:"owner"`
	Balance float64 `json:"balance"`
}

// Account creation event
type AccountCreatedEvent struct {
	ID    string `json:"id"`
	Owner string `json:"owner"`
}

// Money deposit event
type MoneyDepositedEvent struct {
	AccountID string  `json:"account_id"`
	Amount    float64 `json:"amount"`
}

// Money withdrawal event
type MoneyWithdrawnEvent struct {
	AccountID string  `json:"account_id"`
	Amount    float64 `json:"amount"`
}

// Account projection
type AccountProjection struct {
	accounts     map[string]*Account
	eventHandler map[string]func(Event) error
	mu           sync.RWMutex
}

// Create new account projection
func NewAccountProjection() *AccountProjection {
	ap := &AccountProjection{
		accounts:     make(map[string]*Account),
		eventHandler: make(map[string]func(Event) error),
	}

	// Register event handlers
	ap.eventHandler[AccountCreated] = ap.handleAccountCreated
	ap.eventHandler[MoneyDeposited] = ap.handleMoneyDeposited
	ap.eventHandler[MoneyWithdrawn] = ap.handleMoneyWithdrawn

	return ap
}

// Handle account creation event
func (ap *AccountProjection) handleAccountCreated(event Event) error {
	var payload AccountCreatedEvent
	if err := convertPayload(event.Payload, &payload); err != nil {
		return err
	}

	ap.mu.Lock()
	defer ap.mu.Unlock()

	// Return error if account already exists
	if _, exists := ap.accounts[payload.ID]; exists {
		return fmt.Errorf("account already exists: %s", payload.ID)
	}

	// Create new account
	ap.accounts[payload.ID] = &Account{
		ID:      payload.ID,
		Owner:   payload.Owner,
		Balance: 0,
	}

	return nil
}

// Handle money deposit event
func (ap *AccountProjection) handleMoneyDeposited(event Event) error {
	var payload MoneyDepositedEvent
	if err := convertPayload(event.Payload, &payload); err != nil {
		return err
	}

	ap.mu.Lock()
	defer ap.mu.Unlock()

	// Return error if account doesn't exist
	account, exists := ap.accounts[payload.AccountID]
	if !exists {
		return fmt.Errorf("account not found: %s", payload.AccountID)
	}

	// Update balance
	account.Balance += payload.Amount

	return nil
}

// Handle money withdrawal event
func (ap *AccountProjection) handleMoneyWithdrawn(event Event) error {
	var payload MoneyWithdrawnEvent
	if err := convertPayload(event.Payload, &payload); err != nil {
		return err
	}

	ap.mu.Lock()
	defer ap.mu.Unlock()

	// Return error if account doesn't exist
	account, exists := ap.accounts[payload.AccountID]
	if !exists {
		return fmt.Errorf("account not found: %s", payload.AccountID)
	}

	// Check for insufficient balance
	if account.Balance < payload.Amount {
		return fmt.Errorf("insufficient balance: %.2f < %.2f", account.Balance, payload.Amount)
	}

	// Update balance
	account.Balance -= payload.Amount

	return nil
}

// Process event
func (ap *AccountProjection) ApplyEvent(event Event) error {
	handler, exists := ap.eventHandler[event.Type]
	if !exists {
		return fmt.Errorf("no handler for event type: %s", event.Type)
	}

	return handler(event)
}

// Get account information
func (ap *AccountProjection) GetAccount(id string) (*Account, error) {
	ap.mu.RLock()
	defer ap.mu.RUnlock()

	account, exists := ap.accounts[id]
	if !exists {
		return nil, fmt.Errorf("account not found: %s", id)
	}

	return account, nil
}

// Payload conversion helper function
func convertPayload(src, dst interface{}) error {
	data, err := json.Marshal(src)
	if err != nil {
		return err
	}

	return json.Unmarshal(data, dst)
}

func main() {
	// Kafka consumer configuration
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Create consumer
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %v", err)
	}
	defer consumer.Close()

	// Create partition consumer from event topic
	partitionConsumer, err := consumer.ConsumePartition("account-events", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("Error creating partition consumer: %v", err)
	}
	defer partitionConsumer.Close()

	// Create account projection
	projection := NewAccountProjection()

	// Cancelable context used to stop the consumer goroutine on shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consume events
	go func() {
		for {
			select {
			case msg := <-partitionConsumer.Messages():
				var event Event
				if err := json.Unmarshal(msg.Value, &event); err != nil {
					log.Printf("Error unmarshaling event: %v", err)
					continue
				}

				if err := projection.ApplyEvent(event); err != nil {
					log.Printf("Error applying event: %v", err)
					continue
				}

				log.Printf("Applied event: %s", event.Type)

			case err := <-partitionConsumer.Errors():
				log.Printf("Error consuming message: %v", err)

			case <-ctx.Done():
				return
			}
		}
	}()

	// Create and apply sample events
	events := []Event{
		{
			ID:        "evt-1",
			Type:      AccountCreated,
			Timestamp: time.Now(),
			Payload: AccountCreatedEvent{
				ID:    "acc-1",
				Owner: "John Doe",
			},
		},
		{
			ID:        "evt-2",
			Type:      MoneyDeposited,
			Timestamp: time.Now(),
			Payload: MoneyDepositedEvent{
				AccountID: "acc-1",
				Amount:    100.0,
			},
		},
		{
			ID:        "evt-3",
			Type:      MoneyWithdrawn,
			Timestamp: time.Now(),
			Payload: MoneyWithdrawnEvent{
				AccountID: "acc-1",
				Amount:    30.0,
			},
		},
	}

	for _, event := range events {
		if err := projection.ApplyEvent(event); err != nil {
			log.Printf("Error applying event: %v", err)
			continue
		}
		log.Printf("Applied event: %s", event.Type)
	}

	// Check account state
	account, err := projection.GetAccount("acc-1")
	if err != nil {
		log.Fatalf("Error getting account: %v", err)
	}

	log.Printf("Account state: ID=%s, Owner=%s, Balance=%.2f",
		account.ID, account.Owner, account.Balance)

	// Keep program running
	select {}
}
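In the example above, the sample events are applied to the projection directly for demonstration; in a real deployment they would be published to the "account-events" topic and reach the projection through the consumer goroutine. The following is a minimal sketch of how that could look with a Sarama sync producer, assuming the Event type and broker address from the example (the helper name publishSampleEvents is hypothetical):

package main

import (
	"encoding/json"
	"log"

	"github.com/Shopify/sarama"
)

// publishSampleEvents is a sketch: it sends the given events to the
// "account-events" topic so that the consumer goroutine above can apply them.
func publishSampleEvents(brokers []string, events []Event) error {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true // required by the sync producer

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return err
	}
	defer producer.Close()

	for _, event := range events {
		data, err := json.Marshal(event)
		if err != nil {
			return err
		}

		// The key determines the target partition; a stable key such as the
		// account ID would keep all events of one account in order.
		partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: "account-events",
			Key:   sarama.StringEncoder(event.ID),
			Value: sarama.ByteEncoder(data),
		})
		if err != nil {
			return err
		}
		log.Printf("Sent %s to partition %d at offset %d", event.Type, partition, offset)
	}

	return nil
}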

Event-Driven Architecture with C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace EventSourcingExample
{
    // Event types
    public static class EventTypes
    {
        public const string AccountCreated = "account.created";
        public const string MoneyDeposited = "money.deposited";
        public const string MoneyWithdrawn = "money.withdrawn";
    }

    // Event structure
    public class Event
    {
        public string Id { get; set; }
        public string Type { get; set; }
        public DateTime Timestamp { get; set; }
        public JsonElement Payload { get; set; }
    }

    // Account structure
    public class Account
    {
        public string Id { get; set; }
        public string Owner { get; set; }
        public decimal Balance { get; set; }
    }

    // Account creation event
    public class AccountCreatedEvent
    {
        public string Id { get; set; }
        public string Owner { get; set; }
    }

    // Money deposit event
    public class MoneyDepositedEvent
    {
        public string AccountId { get; set; }
        public decimal Amount { get; set; }
    }

    // Money withdrawal event
    public class MoneyWithdrawnEvent
    {
        public string AccountId { get; set; }
        public decimal Amount { get; set; }
    }

    // Account projection
    public class AccountProjection
    {
        private readonly ConcurrentDictionary<string, Account> _accounts = new ConcurrentDictionary<string, Account>();
        private readonly Dictionary<string, Func<Event, Task>> _eventHandlers;
        private readonly ILogger<AccountProjection> _logger;

        public AccountProjection(ILogger<AccountProjection> logger)
        {
            _logger = logger;
            _eventHandlers = new Dictionary<string, Func<Event, Task>>
            {
                { EventTypes.AccountCreated, HandleAccountCreatedAsync },
                { EventTypes.MoneyDeposited, HandleMoneyDepositedAsync },
                { EventTypes.MoneyWithdrawn, HandleMoneyWithdrawnAsync }
            };
        }

        // Handle account creation event
        private async Task HandleAccountCreatedAsync(Event @event)
        {
            var payload = JsonSerializer.Deserialize<AccountCreatedEvent>(@event.Payload.GetRawText());
            
            if (_accounts.ContainsKey(payload.Id))
            {
                _logger.LogWarning("Account already exists: {AccountId}", payload.Id);
                return;
            }

            var account = new Account
            {
                Id = payload.Id,
                Owner = payload.Owner,
                Balance = 0
            };

            if (_accounts.TryAdd(payload.Id, account))
            {
                _logger.LogInformation("Account created: {AccountId}", payload.Id);
            }
            
            await Task.CompletedTask;
        }

        // Handle money deposit event
        private async Task HandleMoneyDepositedAsync(Event @event)
        {
            var payload = JsonSerializer.Deserialize<MoneyDepositedEvent>(@event.Payload.GetRawText());
            
            if (!_accounts.TryGetValue(payload.AccountId, out var account))
            {
                _logger.LogWarning("Account not found: {AccountId}", payload.AccountId);
                return;
            }

            account.Balance += payload.Amount;
            _logger.LogInformation("Deposited {Amount} to account {AccountId}, new balance: {Balance}", 
                payload.Amount, payload.AccountId, account.Balance);
            
            await Task.CompletedTask;
        }

        // Handle money withdrawal event
        private async Task HandleMoneyWithdrawnAsync(Event @event)
        {
            var payload = JsonSerializer.Deserialize<MoneyWithdrawnEvent>(@event.Payload.GetRawText());
            
            if (!_accounts.TryGetValue(payload.AccountId, out var account))
            {
                _logger.LogWarning("Account not found: {AccountId}", payload.AccountId);
                return;
            }

            if (account.Balance < payload.Amount)
            {
                _logger.LogWarning("Insufficient balance: {Balance} < {Amount}", account.Balance, payload.Amount);
                return;
            }

            account.Balance -= payload.Amount;
            _logger.LogInformation("Withdrawn {Amount} from account {AccountId}, new balance: {Balance}", 
                payload.Amount, payload.AccountId, account.Balance);
            
            await Task.CompletedTask;
        }

        // Process event
        public async Task ApplyEventAsync(Event @event)
        {
            if (_eventHandlers.TryGetValue(@event.Type, out var handler))
            {
                await handler(@event);
            }
            else
            {
                _logger.LogWarning("No handler for event type: {EventType}", @event.Type);
            }
        }

        // Get account information
        public Account GetAccount(string id)
        {
            if (_accounts.TryGetValue(id, out var account))
            {
                return account;
            }
            
            return null;
        }

        // Get all accounts
        public IEnumerable<Account> GetAllAccounts()
        {
            return _accounts.Values;
        }
    }

    // Kafka event consumer service
    public class EventConsumerService : BackgroundService
    {
        private readonly ILogger<EventConsumerService> _logger;
        private readonly ConsumerConfig _config;
        private readonly AccountProjection _projection;

        public EventConsumerService(
            ILogger<EventConsumerService> logger,
            ConsumerConfig config,
            AccountProjection projection)
        {
            _logger = logger;
            _config = config;
            _projection = projection;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Event consumer service starting");

            using var consumer = new ConsumerBuilder<string, string>(_config)
                .SetErrorHandler((_, e) => _logger.LogError("Kafka error: {Reason}", e.Reason))
                .Build();

            consumer.Subscribe("account-events");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        
                        _logger.LogInformation("Consumed event from {TopicPartitionOffset}", consumeResult.TopicPartitionOffset);
                        
                        var @event = JsonSerializer.Deserialize<Event>(consumeResult.Message.Value);
                        await _projection.ApplyEventAsync(@event);
                        
                        // Commit offset
                        consumer.Commit(consumeResult);
                    }
                    catch (ConsumeException e)
                    {
                        _logger.LogError("Consume error: {Error}", e.Error.Reason);
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "Error processing event");
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
        }
    }

    // Event publisher
    public interface IEventPublisher
    {
        Task PublishEventAsync<T>(string type, string id, T payload);
    }

    public class KafkaEventPublisher : IEventPublisher
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaEventPublisher> _logger;

        public KafkaEventPublisher(ProducerConfig config, ILogger<KafkaEventPublisher> logger)
        {
            _producer = new ProducerBuilder<string, string>(config).Build();
            _logger = logger;
        }

        public async Task PublishEventAsync<T>(string type, string id, T payload)
        {
            var @event = new Event
            {
                Id = id,
                Type = type,
                Timestamp = DateTime.UtcNow,
                Payload = JsonDocument.Parse(JsonSerializer.Serialize(payload)).RootElement
            };

            var json = JsonSerializer.Serialize(@event);
            
            var message = new Message<string, string>
            {
                Key = id,
                Value = json
            };

            try
            {
                var result = await _producer.ProduceAsync("account-events", message);
                _logger.LogInformation("Published event {EventType} to {TopicPartitionOffset}", 
                    type, result.TopicPartitionOffset);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing event {EventType}", type);
                throw;
            }
        }
    }

    // Program
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    // Kafka configuration
                    services.AddSingleton(new ConsumerConfig
                    {
                        BootstrapServers = "localhost:9092",
                        GroupId = "account-projection",
                        AutoOffsetReset = AutoOffsetReset.Earliest,
                        EnableAutoCommit = false
                    });

                    services.AddSingleton(new ProducerConfig
                    {
                        BootstrapServers = "localhost:9092",
                        Acks = Acks.All
                    });

                    // Services
                    services.AddSingleton<AccountProjection>();
                    services.AddSingleton<IEventPublisher, KafkaEventPublisher>();
                    services.AddHostedService<EventConsumerService>();
                })
                .Build();

            // Create sample events
            var publisher = host.Services.GetRequiredService<IEventPublisher>();
            var logger = host.Services.GetRequiredService<ILogger<Program>>();

            // Create account
            await publisher.PublishEventAsync(
                EventTypes.AccountCreated,
                Guid.NewGuid().ToString(),
                new AccountCreatedEvent
                {
                    Id = "acc-1",
                    Owner = "John Doe"
                });

            // Deposit money
            await publisher.PublishEventAsync(
                EventTypes.MoneyDeposited,
                Guid.NewGuid().ToString(),
                new MoneyDepositedEvent
                {
                    AccountId = "acc-1",
                    Amount = 100
                });

            // Withdraw money
            await publisher.PublishEventAsync(
                EventTypes.MoneyWithdrawn,
                Guid.NewGuid().ToString(),
                new MoneyWithdrawnEvent
                {
                    AccountId = "acc-1",
                    Amount = 30
                });

            // Start host
            await host.RunAsync();
        }
    }
}
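One design point in the example above: the Kafka message key is the event ID (a random GUID), so events for the same account may land on different partitions and be consumed out of order. A common alternative is to key messages by the account ID, which keeps all events of one account in a single partition and therefore in order. The sketch below illustrates the idea with a hypothetical helper class (AccountEventPublisher, reusing the Event type from the example); it is not part of the original code.

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace EventSourcingExample
{
    // Sketch: publishing account events keyed by the account ID, so every
    // event for one account goes to the same partition. The class name is
    // hypothetical and not part of the example above.
    public class AccountEventPublisher
    {
        private readonly IProducer<string, string> _producer;

        public AccountEventPublisher(ProducerConfig config)
        {
            _producer = new ProducerBuilder<string, string>(config).Build();
        }

        public async Task PublishAsync<T>(string type, string accountId, T payload)
        {
            var @event = new Event
            {
                Id = Guid.NewGuid().ToString("N"),   // unique event ID
                Type = type,
                Timestamp = DateTime.UtcNow,
                Payload = JsonDocument.Parse(JsonSerializer.Serialize(payload)).RootElement
            };

            var message = new Message<string, string>
            {
                Key = accountId,                     // partition key: the account ID
                Value = JsonSerializer.Serialize(@event)
            };

            await _producer.ProduceAsync("account-events", message);
        }
    }
}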

Conclusion

In this article, we have taken a comparative look at developing Apache Kafka applications with Golang and C#. Both languages offer strong libraries and capabilities for working with Kafka.

Advantages of Golang

  • Low memory usage and high performance
  • Easy concurrency management with goroutines
  • Single binary distribution convenience
  • Mature and high-performance libraries like Sarama

Advantages of C#

  • Rich .NET ecosystem and enterprise integration capabilities
  • Strong type system and object-oriented programming features
  • Easy asynchronous programming with async/await
  • Enterprise support from Microsoft and Confluent

Both languages may be suitable for different use cases. It’s important to choose the most appropriate language based on your project’s requirements, your team’s expertise, and your existing infrastructure.

Kafka continues to be a powerful platform forming the backbone of modern distributed systems. Leveraging Kafka’s capabilities in microservice architectures, event-driven systems, and real-time data processing scenarios will help you develop scalable and resilient applications.

Resources