
Github Proje: AyazDuru.Samples.Kafka
Apache Kafka Nedir?
Apache Kafka, ilk olarak LinkedIn tarafından geliştirilen ve daha sonra Apache Software Foundation’a bağışlanan, açık kaynak kodlu bir dağıtık stream işleme platformudur. Kafka, büyük veri akışlarını işlemek, saklamak ve dağıtmak için tasarlanmış bir mesajlaşma altyapısıdır.
Kafka’yı bir “veri otobanı” gibi hayal edebiliriz: Farklı uygulamalar, sistemler veya servisler Kafka üzerinden birbirlerine mesajlar gönderir ve alır. Kafka'nın en büyük avantajı, yüksek hacimli veriyi kayıpsız ve hızlı şekilde iletebilmesidir.
Başlıca Özellikleri
- Dağıtık ve Ölçeklenebilir Mimari: Kafka, birden fazla sunucu (broker) üzerinde çalışır. Yük arttıkça yeni broker ekleyerek ölçeklenebilir.
- Yüksek Performans: Milyonlarca mesajı saniyeler içinde işleyebilir. Benchmark’larda rakiplerini genellikle geride bırakır.
- Dayanıklılık ve Veri Güvenliği: Tüm mesajlar disk üzerinde saklanır. Broker’lar arasında replikasyon ile veri kaybı önlenir.
- Gerçek Zamanlı Veri Akışı: Mesajlar gönderildiği anda tüketicilere ulaşır.
- Esnek Entegrasyon: Kafka, Java başta olmak üzere .NET, Python, Go ve daha birçok dil için kütüphanelere sahiptir.
- Kuyruk & Yayın/Abone Modeli: Hem klasik kuyruk (queue) hem de publish-subscribe mantığını destekler.
- Event Sourcing ve Log Tutma: Her olay bir mesajdır, geçmişteki olaylara tekrar erişmek mümkündür.
- Yüksek Erişilebilirlik: Broker ve partition mimarisi ile sistemin bir parçası arızalansa bile hizmet devam eder.
Temel Kavramlar – Kargo Şirketi ile Anlatım
Kafka’nın dünyasına girmek için önce temel kavramları öğrenelim. Her birini kargo şirketi örneğiyle somutlaştıracağım:
Mesajı (olayı) Kafka’ya gönderen uygulama veya servis.
Örnek: Sipariş sistemi “Kargo Yola Çıktı” bilgisini Kafka’ya gönderir.
Mesajı Kafka’dan okuyan uygulama veya servis.
Örnek: Müşteri bilgilendirme servisi, yeni bir kargo durumu geldiğinde SMS gönderir.
Mesajların mantıksal olarak gruplanmasını sağlayan başlıklar.
Örnek: “KargoDurumlari” adında bir topic, tüm kargo hareketlerini içerir.
Topic’ler altındaki alt bölümler. Yük dağıtımı ve paralel okuma için kullanılır.
Örnek: “KargoDurumlari” topic’i 4 partition’a sahip. Her partition ayrı bir broker’da tutuluyor.
Kafka sunucusu. Birden fazla broker bir araya gelerek bir cluster oluşturur.
Örnek: Şirketin veri merkezi içinde 3 Kafka broker’ı var.
Kafka cluster’ının yönetimi ve koordinasyonu için kullanılan yardımcı servis.
Not: Yeni Kafka sürümlerinde Zookeeper’sız mod (KRaft) da yaygınlaşıyor.
Birden fazla consumer’ın aynı topic’i okurken yükü bölüştürdüğü grup.
Örnek: Müşteri bildirim servisi 3 makinede çalışıyor ve aynı grupta. Partition’lar aralarında dağıtılır.
- Sipariş oluşturulunca “KargoDurumlari” topic’ine bir mesaj gönderilir (Producer).
- Birden fazla servis bu topic’i dinleyebilir (Consumer). Biri müşteri bilgilendirme, diğeri lojistik analiz.
- Partition’lar sayesinde yüksek performans ve paralel okuma sağlanır.
- Mesajlar disk üzerinde tutulur ve kaybolmaz.
Kafka Nerelerde Kullanılır?
Kafka, çok çeşitli alanlarda kullanılabilir. Bazı örnekler:
- Mikroservis mimarileri: Servisler arasında olay tabanlı iletişim.
- Gerçek zamanlı veri analizi: Finansal hareketler, anlık istatistikler.
- IoT cihazlarından veri toplama: Sensör verileri, cihaz logları.
- Kargo ve lojistik: Paket hareketleri, teslimat bildirimleri.
- Log toplama ve merkezi izleme: Farklı uygulamalardan log akışı.
- Oyun sunucuları: Oyuncu hareketleri, skor güncellemeleri.
- Veri gölleri ve ETL süreçleri: Büyük veri işleme altyapısı.
Kafka’nın tercih edilme nedenleri şunlardır:
- Gerçek zamanlı veri işleme ihtiyacı: Anında aksiyon almak.
- Yüksek hacimde veri: Milyonlarca mesajı kayıpsız, hızlı iletmek.
- Sistemler arası gevşek bağlılık: Servislerin birbirinden bağımsız çalışabilmesi.
- Dayanıklılık ve güvenilirlik: Mesaj kaybı olmadan iletişim.
- Event sourcing: Geçmişe dönük olayların tekrar oynatılması.
Kafka’nın Avantajları ve Dikkat Edilecekler
Avantajlar:
- Mesaj kaybı olmadan veri akışı.
- Farklı uygulamaların kolayca entegre olması.
- Geçmişe dönük veri analizi ve event replay.
Dikkat Edilecekler:
- Konfigürasyon ve cluster yönetimi karmaşık olabilir.
- Topic ve partition sayısı iyi planlanmalı.
- Mesaj formatı (JSON, Avro, Protobuf) net belirlenmeli.
- Güvenlik ve erişim kontrolü ihmal edilmemeli.
Docker ile Hızlı Kafka Kurulumu
Kafka’yı yerel ortamda hızlıca ayağa kaldırmak için aşağıdaki docker komutunu kullanabilirsiniz.
Kafka:
docker run -d --name kafka -p 9092:9092 -p 9093:9093 -e KAFKA_NODE_ID=1 -e KAFKA_PROCESS_ROLES=broker,controller -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_LOG_DIRS=/var/lib/kafka/data -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -e KAFKA_LOG_RETENTION_HOURS=168 -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 -e CLUSTER_ID=Mk3OEYBSD34fcwNTJENDM2Qk -v kafka_data:/var/lib/kafka/data apache/kafka:latest
AyazDuru.Samples.Kafka.Consumer Projesi Örnek Kodları:
Program.cs
static void Main(string[] args)
{
// Host yapılandırmasını başlatır (uygulama yaşam döngüsünü yönetir)
var builder = Host.CreateDefaultBuilder(args);
// Servisleri yapılandırır ve ConsumerService'i arka plan servisi olarak ekler
builder.ConfigureServices(services =>
{
services.AddHostedService<ConsumerService>();
});
// Host'u oluşturur
var host = builder.Build();
// Uygulamayı başlatır ve arka plan servislerini çalıştırır
host.Run();
}
ConsumerService.cs
// Kafka'dan mesajları arka planda dinleyen servis
public class ConsumerService : BackgroundService
{
// Kafka'dan mesaj almak için kullanılan consumer nesnesi
private readonly IConsumer<Ignore, string> _consumer;
// Servis başlatılırken consumer yapılandırılır
public ConsumerService(IConfiguration configuration)
{
// Kafka consumer yapılandırması
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092", // Kafka sunucu adresi
GroupId = "MessagesConsumerGroup", // Consumer group adı
AutoOffsetReset = AutoOffsetReset.Earliest // En eski mesajdan başlat
};
// Consumer nesnesi oluşturuluyor
_consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
}
// Arka planda sürekli olarak Kafka'dan mesaj dinleyen ana metot
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// "Messages" adlı topic'e abone olunur
_consumer.Subscribe("Messages");
Console.WriteLine("Kafka'dan mesaj bekleniyor...");
// Servis durdurulana kadar sürekli dinleme yapılır
while (!stoppingToken.IsCancellationRequested)
{
// Mesaj işleme metodu çağrılır
ProcessKafkaMessage(stoppingToken);
// 1 dakika beklenir (asenkron bekleme eksik, istenirse await eklenebilir)
Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
// Consumer kapatılır
_consumer.Close();
}
// Kafka'dan gelen mesajı işleyen metot
public void ProcessKafkaMessage(CancellationToken stoppingToken)
{
try
{
// Mesaj alınır
var consumeResult = _consumer.Consume(stoppingToken);
// Mesaj içeriği alınır
var message = consumeResult.Message.Value;
// Mesaj konsola yazdırılır
Console.WriteLine($"Kafka Gelen Mesaj: {message}");
}
catch (Exception ex)
{
// Hata durumunda konsola hata mesajı yazdırılır
Console.WriteLine($"Kafka hata meydana geldi: {ex.Message}");
}
}
}
AyazDuru.Samples.Kafka.Publisher Projesi Örnek Kodları:
Index.cshtml.cs
public class IndexModel : PageModel
{
private readonly ILogger<IndexModel> _logger;
// Kafka'ya mesaj göndermek için kullanılan servis
private readonly ProducerService _producerService;
// Razor Page'den gelen mesajı tutan property
[BindProperty]
public string Message { get; set; }
// Bağımlılıkları (logger ve producerService) enjekte eden constructor
public IndexModel(ILogger<IndexModel> logger, ProducerService producerService)
{
_logger = logger;
_producerService = producerService;
}
public void OnGet()
{
}
// Form gönderildiğinde çalışan ve mesajı Kafka'ya ileten metot
public async Task<IActionResult> OnPostAsync()
{
// Mesajı "Messages" adlı Kafka topic'ine gönderir
await _producerService.PublishAsync("Messages", Message);
return RedirectToPage();
}
}
ProducerService.cs
public class ProducerService
{
// Kafka'ya mesaj göndermek için kullanılan producer nesnesi
private readonly IProducer<Null, string> _producer;
// ProducerService sınıfı başlatılırken Kafka producer'ı yapılandırılır
public ProducerService()
{
// Kafka sunucusunun adresini belirten yapılandırma
var producerconfig = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
// Belirtilen yapılandırma ile producer nesnesi oluşturuluyor
_producer = new ProducerBuilder<Null, string>(producerconfig).Build();
}
// Belirtilen topic'e asenkron olarak mesaj gönderir
public async Task PublishAsync(string topic, string message)
{
// Mesajı Kafka'ya gönderir
await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
}
}
Program.cs
//Producer servisini ekliyoruz.
builder.Services.AddSingleton<ProducerService>();
İki projeyide aynı anda çalıştırdığımız sonuç olarak aşağıdaki gibi konsol uygulamamıza mesajlar gelmeye başlayacaktır.

İyi çalışmalar dilerim.