.NET ile Apache Kafka Kullanımı (Confluent.Kafka Kod Örnekleriyle)

Github ProjeAyazDuru.Samples.Kafka

Apache Kafka Nedir?

Apache Kafka, ilk olarak LinkedIn tarafından geliştirilen ve daha sonra Apache Software Foundation’a bağışlanan, açık kaynak kodlu bir dağıtık stream işleme platformudur. Kafka, büyük veri akışlarını işlemek, saklamak ve dağıtmak için tasarlanmış bir mesajlaşma altyapısıdır.

Kafka’yı bir “veri otobanı” gibi hayal edebiliriz: Farklı uygulamalar, sistemler veya servisler Kafka üzerinden birbirlerine mesajlar gönderir ve alır. Kafka'nın en büyük avantajı, yüksek hacimli veriyi kayıpsız ve hızlı şekilde iletebilmesidir.

Başlıca Özellikleri

  • Dağıtık ve Ölçeklenebilir Mimari: Kafka, birden fazla sunucu (broker) üzerinde çalışır. Yük arttıkça yeni broker ekleyerek ölçeklenebilir.
  • Yüksek Performans: Milyonlarca mesajı saniyeler içinde işleyebilir. Benchmark’larda rakiplerini genellikle geride bırakır.
  • Dayanıklılık ve Veri Güvenliği: Tüm mesajlar disk üzerinde saklanır. Broker’lar arasında replikasyon ile veri kaybı önlenir.
  • Gerçek Zamanlı Veri Akışı: Mesajlar gönderildiği anda tüketicilere ulaşır.
  • Esnek Entegrasyon: Kafka, Java başta olmak üzere .NET, Python, Go ve daha birçok dil için kütüphanelere sahiptir.
  • Kuyruk & Yayın/Abone Modeli: Hem klasik kuyruk (queue) hem de publish-subscribe mantığını destekler.
  • Event Sourcing ve Log Tutma: Her olay bir mesajdır, geçmişteki olaylara tekrar erişmek mümkündür.
  • Yüksek Erişilebilirlik: Broker ve partition mimarisi ile sistemin bir parçası arızalansa bile hizmet devam eder.

Temel Kavramlar – Kargo Şirketi ile Anlatım

Kafka’nın dünyasına girmek için önce temel kavramları öğrenelim. Her birini kargo şirketi örneğiyle somutlaştıracağım:

1. Producer (Üretici)

Mesajı (olayı) Kafka’ya gönderen uygulama veya servis.

Örnek: Sipariş sistemi “Kargo Yola Çıktı” bilgisini Kafka’ya gönderir.

2. Consumer (Tüketici)

Mesajı Kafka’dan okuyan uygulama veya servis.

Örnek: Müşteri bilgilendirme servisi, yeni bir kargo durumu geldiğinde SMS gönderir.

3. Topic

Mesajların mantıksal olarak gruplanmasını sağlayan başlıklar.

Örnek: “KargoDurumlari” adında bir topic, tüm kargo hareketlerini içerir.

4. Partition

Topic’ler altındaki alt bölümler. Yük dağıtımı ve paralel okuma için kullanılır.

Örnek: “KargoDurumlari” topic’i 4 partition’a sahip. Her partition ayrı bir broker’da tutuluyor.

5. Broker

Kafka sunucusu. Birden fazla broker bir araya gelerek bir cluster oluşturur.

Örnek: Şirketin veri merkezi içinde 3 Kafka broker’ı var.

6. Zookeeper

Kafka cluster’ının yönetimi ve koordinasyonu için kullanılan yardımcı servis.

Not: Yeni Kafka sürümlerinde Zookeeper’sız mod (KRaft) da yaygınlaşıyor.

7. Consumer Group

Birden fazla consumer’ın aynı topic’i okurken yükü bölüştürdüğü grup.

Örnek: Müşteri bildirim servisi 3 makinede çalışıyor ve aynı grupta. Partition’lar aralarında dağıtılır.

Örneğin:

  1. Sipariş oluşturulunca “KargoDurumlari” topic’ine bir mesaj gönderilir (Producer).
  2. Birden fazla servis bu topic’i dinleyebilir (Consumer). Biri müşteri bilgilendirme, diğeri lojistik analiz.
  3. Partition’lar sayesinde yüksek performans ve paralel okuma sağlanır.
  4. Mesajlar disk üzerinde tutulur ve kaybolmaz.

Kafka Nerelerde Kullanılır?

Kafka, çok çeşitli alanlarda kullanılabilir. Bazı örnekler:

  • Mikroservis mimarileri: Servisler arasında olay tabanlı iletişim.
  • Gerçek zamanlı veri analizi: Finansal hareketler, anlık istatistikler.
  • IoT cihazlarından veri toplama: Sensör verileri, cihaz logları.
  • Kargo ve lojistik: Paket hareketleri, teslimat bildirimleri.
  • Log toplama ve merkezi izleme: Farklı uygulamalardan log akışı.
  • Oyun sunucuları: Oyuncu hareketleri, skor güncellemeleri.
  • Veri gölleri ve ETL süreçleri: Büyük veri işleme altyapısı.

Kafka Neden Kullanılır?

Kafka’nın tercih edilme nedenleri şunlardır:

  • Gerçek zamanlı veri işleme ihtiyacı: Anında aksiyon almak.
  • Yüksek hacimde veri: Milyonlarca mesajı kayıpsız, hızlı iletmek.
  • Sistemler arası gevşek bağlılık: Servislerin birbirinden bağımsız çalışabilmesi.
  • Dayanıklılık ve güvenilirlik: Mesaj kaybı olmadan iletişim.
  • Event sourcing: Geçmişe dönük olayların tekrar oynatılması.

Kafka’nın Avantajları ve Dikkat Edilecekler

Avantajlar:

  • Mesaj kaybı olmadan veri akışı.
  • Farklı uygulamaların kolayca entegre olması.
  • Geçmişe dönük veri analizi ve event replay.

Dikkat Edilecekler:

  • Konfigürasyon ve cluster yönetimi karmaşık olabilir.
  • Topic ve partition sayısı iyi planlanmalı.
  • Mesaj formatı (JSON, Avro, Protobuf) net belirlenmeli.
  • Güvenlik ve erişim kontrolü ihmal edilmemeli.

Docker ile Hızlı Kafka Kurulumu

Kafka’yı yerel ortamda hızlıca ayağa kaldırmak için aşağıdaki docker komutunu kullanabilirsiniz.

Kafka:

docker run -d --name kafka -p 9092:9092 -p 9093:9093 -e KAFKA_NODE_ID=1 -e KAFKA_PROCESS_ROLES=broker,controller -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_LOG_DIRS=/var/lib/kafka/data -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -e KAFKA_LOG_RETENTION_HOURS=168 -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 -e CLUSTER_ID=Mk3OEYBSD34fcwNTJENDM2Qk -v kafka_data:/var/lib/kafka/data apache/kafka:latest

 

AyazDuru.Samples.Kafka.Consumer Projesi Örnek Kodları:

Program.cs

static void Main(string[] args)
{
    // Host yapılandırmasını başlatır (uygulama yaşam döngüsünü yönetir)
    var builder = Host.CreateDefaultBuilder(args);

    // Servisleri yapılandırır ve ConsumerService'i arka plan servisi olarak ekler
    builder.ConfigureServices(services =>
    {
        services.AddHostedService<ConsumerService>();
    });

    // Host'u oluşturur
    var host = builder.Build();

    // Uygulamayı başlatır ve arka plan servislerini çalıştırır
    host.Run();
}

ConsumerService.cs

// Kafka'dan mesajları arka planda dinleyen servis
public class ConsumerService : BackgroundService
{
    // Kafka'dan mesaj almak için kullanılan consumer nesnesi
    private readonly IConsumer<Ignore, string> _consumer;

    // Servis başlatılırken consumer yapılandırılır
    public ConsumerService(IConfiguration configuration)
    {
        // Kafka consumer yapılandırması
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // Kafka sunucu adresi
            GroupId = "MessagesConsumerGroup",   // Consumer group adı
            AutoOffsetReset = AutoOffsetReset.Earliest // En eski mesajdan başlat
        };

        // Consumer nesnesi oluşturuluyor
        _consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
    }

    // Arka planda sürekli olarak Kafka'dan mesaj dinleyen ana metot
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // "Messages" adlı topic'e abone olunur
        _consumer.Subscribe("Messages");
        Console.WriteLine("Kafka'dan mesaj bekleniyor...");

        // Servis durdurulana kadar sürekli dinleme yapılır
        while (!stoppingToken.IsCancellationRequested)
        {
            // Mesaj işleme metodu çağrılır
            ProcessKafkaMessage(stoppingToken);

            // 1 dakika beklenir (asenkron bekleme eksik, istenirse await eklenebilir)
            Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
        }

        // Consumer kapatılır
        _consumer.Close();
    }

    // Kafka'dan gelen mesajı işleyen metot
    public void ProcessKafkaMessage(CancellationToken stoppingToken)
    {
        try
        {
            // Mesaj alınır
            var consumeResult = _consumer.Consume(stoppingToken);

            // Mesaj içeriği alınır
            var message = consumeResult.Message.Value;

            // Mesaj konsola yazdırılır
            Console.WriteLine($"Kafka Gelen Mesaj: {message}");
        }
        catch (Exception ex)
        {
            // Hata durumunda konsola hata mesajı yazdırılır
            Console.WriteLine($"Kafka hata meydana geldi: {ex.Message}");
        }
    }
}

AyazDuru.Samples.Kafka.Publisher Projesi Örnek Kodları:

Index.cshtml.cs

 public class IndexModel : PageModel
 {
     private readonly ILogger<IndexModel> _logger;

     // Kafka'ya mesaj göndermek için kullanılan servis
     private readonly ProducerService _producerService;

     // Razor Page'den gelen mesajı tutan property
     [BindProperty]
     public string Message { get; set; }
    
     // Bağımlılıkları (logger ve producerService) enjekte eden constructor
     public IndexModel(ILogger<IndexModel> logger, ProducerService producerService)
     {
         _logger = logger;
         _producerService = producerService;
     }

     public void OnGet()
     {

     }

     // Form gönderildiğinde çalışan ve mesajı Kafka'ya ileten metot
     public async Task<IActionResult> OnPostAsync()
     {
         // Mesajı "Messages" adlı Kafka topic'ine gönderir
         await _producerService.PublishAsync("Messages", Message);
         return RedirectToPage();
     }
 }

ProducerService.cs

  public class ProducerService    
  {
      // Kafka'ya mesaj göndermek için kullanılan producer nesnesi
      private readonly IProducer<Null, string> _producer;

      // ProducerService sınıfı başlatılırken Kafka producer'ı yapılandırılır
      public ProducerService()
      {
          // Kafka sunucusunun adresini belirten yapılandırma
          var producerconfig = new ProducerConfig
          {
              BootstrapServers = "localhost:9092"
          };

          // Belirtilen yapılandırma ile producer nesnesi oluşturuluyor
          _producer = new ProducerBuilder<Null, string>(producerconfig).Build();
      }

      // Belirtilen topic'e asenkron olarak mesaj gönderir
      public async Task PublishAsync(string topic, string message)
      {           
          // Mesajı Kafka'ya gönderir
          await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
      }
  }

Program.cs

  //Producer servisini ekliyoruz.
  builder.Services.AddSingleton<ProducerService>();

İki projeyide aynı anda çalıştırdığımız sonuç olarak aşağıdaki gibi konsol uygulamamıza mesajlar gelmeye başlayacaktır.

İyi çalışmalar dilerim.

Yorumlar kapalı