23.10.2021

EDA, Kafka, Ksql

EDA - Event Driven Architecture

A software architecture pattern promoting the production, detection, consumption of, and reaction to events. İletişimi data ile değil, eventler ile yapıldığı model diyebiliriz. Şu kısımlardan oluşur.

EDA faydaları

EDA Dezavantajları

Event Storming

Kafka

Kafka özellikleri (Zero copy concept)

Distributed streaming platform

Kafka binary veri ile çalışır. Serialization veya deserialization yapmaz. Bütün veri binary olarak networkden gelir ve binary olarak disk e yazılır. (TCP)

Kafka architecture

2 ana yapıdan oluşur, broker ve zookeper.

Kurulum

Records

Kafka ya gelen ve giden mesajlara “Record” denir. 3 kısımdan oluşur. Key, value, timestamp. Key ve value herhangi birşey olabilir, timestamp ise bir sayıdır, timestamp formatında. Mesajın üretildiği zamanı belirtir.

Topics

Mesajların gruplandırılmasıdır. Broker içinde tutulurlar. Hangi topic’in hangi brokerda olduğu bilgisi ise zookeeper da tutulur. 2 tür topic vardır, delete ve compact.

Kafka yükü birden fazla broker’a dağıtır. Bunu partition lar ile yapar. Oluşturulan bir topic farklı partitionlara bölünür ve her partition da farklı brokerlarda saklanır. Burada her partition farklı veri saklarlar. Peki yedeklilik nası sağlanır? Aslında arka planda her brokerın diğer partition’ı diğer broker’ın partition ına otomatik olarak yedeklenir. Böylece bir broker down olunca, diğer brokerda zaten down olan brokerın yedek mesajları bulunmaktadır.

Producers

Kafka ya veri göndermek için gerekli bütün işi yapan sınıflardır. Bir nesneyi alırlar, key ve value değerlerini binary formata çevirirler ve network den gönderirler.

Topic yaratma

Mesaj yollamak

Producer yaratmak için şu değerler kafidir

Consumers

Consumer lar pull mantığı ile çalışır, kafka mesajları push etmek, consumerlar çeker. Böylece yavaş consumerlar kafkayı yavaşlatmazlar. Consumer başlayınca, Belirli bir periyotta kafkaya benim için mesaj varmı diye sürekli sorar. Mesaj gelince yine binary formattadır. Bu veriyi deserialize eder.

Mesaj almak

Consumer yaratmak için şu değerler kafidir

Communication with AVRO and message registry

Bir nesneyi serialize ederek network ile başka sunucuya, veya diske yazabilir, bu veriyi daha sonra deserialize edebiliriz. Burada serialize ettikten sonra binary olarak veya plaintext olarak gönderebiliriz. Binary hızlıdır fakat debug etmesi zordur, okunamaz. Diğer bir husus ise serialization formatın veri yapısını doğrulayıp doğrulamadığıdır. Bu Schema lar vasıyasıyla validate edilebilir. Mesela protobuff, XML, JSON vs..

AVRO data serialization sistemidir. Veriyi binary olarak iletir ve şema validasyonu yapar. Binary olduğu için JSON ve XML e göre daha hızlıdır. Şema ile beraber serDe yapılabilir. Tipler diğer notumda var.

Schema Registry

Producer ve consumer lar için şema dağıtımı uygulamasıdır. Şemalarını bir kafka topic içine yazar. Uygulama şu şekilde çalışır. Producer kafkaya nesne göndermek için SR ye bu sınıfın şemasını bana ver, der. SR şemayı, ona özel bir şema ID ile gönderir. Producer bu şema ile mesajı serialize edip şema ID ile beraber kafkaya yollar. Kafkayı dinleyen consumer mesajı alır ve deserialize etmek için önce SR ye bana şu şema ID li şemayı ver der. SR ilgili şemayı consumer a yollar ve consumer da ilgili şema ile veriyi deserialize edebilir. Görüldüğü üzere servisler birbiri ile decoupled!

Demo

SR yi confluent github dan indirebilir(https://github.com/confluentinc/schema-registry) ve mvn package ile derleyebiliriz. Ardından default ayar dosyası ile başlattığımızda 8081. porttan dinlemeye başlar. Yukarıdaki producer ayarlarından farklı olarak

Consumer da ise yukardaki değişikliklere ek olarak “specific.avro.reader” true yapılır ve tipler arası otomatik dönüşümün yapılması sağlanır.

Kafka Streams

Stream veri biliminde data parçalarının tek tek işlenmesidir. Mesela bir fraud uygulaması yapabiliriz. UI gelen ödemeleri sürekli payments topic’ine yazar. Fraud stream uygulamamız ise buraya gelen mesajları sürekli dinler ve bazı kuralları geçer ise bunu validated-payments isimli bir topic’e yazabilir. (Dispatch) Onu da başka bir processor dinleyebilir.

Stream uygulaması aslında bir topic consumer ve producer dan ibarettir. Streams bize bu yapıyı hazır sunar. Böylece stream uygulamasının business kısmına odaklanabiliriz. Stream kısmı ise mesajı çeşitli aşamalardan geçirerek işler. Bu aşamalara “topology” denir. “Topolgy is acyclic(zincir biçiminde olan) graph of sources, processors, and sinks.

Duality of Streams

Streams uygulamaları sadece stream kullanmak için yapılmaz. Veriyi veritabanında da saklamamız gerekir. Çünkü stream girişi olan ve stream çıkışı olan bir processor, diğer eventlerin birbirleri arasında bir bağlantı yoktur. Hepsini birbirinden bağımsız işler. Bu stream topic eventleri genellikle belirli süre sonra silinir. (Delete topics lerde saklanırlar.) Fakat banka hesabı gibi birşeyde hesap hareketlerinin hepsi banka hesabını oluşturur ve her işlemin diğer işlemlerden haberdar olması gerekir.

Çözüm için tablolar geliştirilmiştir. Bir streamde gerçekleşen son olaylar, tablo olarak saklanabilir. (Compaction topics lerde saklanırlar.)

Stream türünde bir akış, tabloya çevrilebilir veya tablo bir stream’e çevrilebilir. Stream’i tabloya çevirmek için aggregate(), reduce(), count() gibi metodlar kullanılır. Tabloyu stream’e çevirmek için ise sadece tablo kayıtlarında for ile gezeriz, daha kolayı toStream() metodunu kullanırız.

Processors

Stateless Operations

Java 8 streams API çok benziyor.

Stateful Operations

KSQL

Ksql, kafka streams API’nin üstüne kurulmuş, stream uygulamaları yazmamızı sağlayan bir API dir. Confluent tarafından yazılmış ve open source dur. Amacı streams uygulamaları yazmamızı kolaylaştırmaktır.

Çalışma mantığı

KSQL server kurarız, içinde streams uygulamalarımız çalışacaktır. Bu server’ın kafka cluster’ımıza bağlanmasını sağlarız. Çünkü cluster da topicleri produce ve consume edecektir. Peki bu serverda stream uygulamaları nasıl oluşur? Kullanıcı KSQL yazarak aslında streams uygulamaları yazmış olur. Kullanıcı bu KSQL cümleleri development ortamında “ksql-cli” üzerinde yazıp kafka server’a iletir. Bu iletim REST ile yapılır.

Yazılan her KSQL sorgusu bir stream uygulaması oluşmasına sebebiyet verir. Sorgu sonucu serverda bir topoloji oluşur ve otomatik olarak çalışmaya başlar.

Kullanma sebebimiz ise şunlar olabilir. Öncelikle çok basit bir select ile stream uygulaması oluşturabiliriz. Topic lerdeki verileri okumamızı çok kolaylaştırır. Ayrıca datayı manupile etmemizi de çok kolaylaştırır.

Windowing

Belirli zaman aralığında gelen mesajları sayar. Mesajları key değerine göre gruplayarak sayar. 2 çeşit yapılır.

KSQL Statements

CREATE STREAM AS SELECT, 2 iş yapar

KSQL kurulum

KQL ayarları

KSQL CLI çalıştır

CLI komutları

CREATE STREAM ksql_payments WITH ( KAFKA_TOPIC=’payments’, VALUE_FORMAT=’AVRO’);

Yukarıdaki komutla yeni bir stream yaratılır. payments topicini izler ve payments topici value formatı AVRO dur.

Aşağıda ise son 10 dakikada kullanıcıların gerçekleştirdiği transaction sayılarını görmek istedik. Bu işi stream ile yapamayız o yüzden table oluşturuyoruz.

CREATE TABLE warnings AS SELECT userId, COUNT() // value formatı burada belirlenir FROM ksql_payments WINDOW HOPPING(SIZE 10 MINUTES, ADVANCE BY 1 MINUTE) GROUP BY userId // count kullandığımız için gruplamamız lazım HAVING COUNT() > 5;

Kafka Connect

Kafka Connect, bir kafka cluster dan dış sistemlere hiçbir kod yazmadan topiclere gelen verileri dışarı yazmaya yarar. Destekledikleri;

Kafka connect bunu yaparken her mesajı ilgili servise tek tek yollamaz! Önce kendi internal memory sinde tutar ve belirli aralıklarla dış servise yazar. Amaç gereksiz overhead i önlemek, transaction sayısını azaltmak.

Kafka connect birden fazla oluştururlarak scale edilebilir. Development için standalone kullanılır, prod için distributed olanı kullanılır.

Connectors,

Kafka verilerini persist etmek için oluşturulan hazır API lerdir.

DB den kafkaya iletmek için, JDBC source connector kullanılır. Kafkadan AWS e iletmek için, AWS S3 Sink connector kullanılır.

Kafka REST

Kafka iletişimini binary yapar demiştik. Bu iletişim OSI layer 4 de, TCP protokolünde olur. Bu sebepten şifreli değildir veriler. Şifrelemek istersek layer 6 ya çıkmamız ve SSL kullanmamız gerekir. Dezantajı ise hız azalır, overhead artar.

Kafka iletişim mimarisi

Producer bir mesajı TCP olarak ACK yapmadan direk olarak kafkaya gönderir. Kafka ise sadece mesajı aldım diye ACK yollar. Bu iletişim aynı zamanda asenkron gerçekleşir.

Eğer uygumamamıza kafka-clients ekleyemezsek, kafkanın önüne REST proxy ekleriz. Bu proxy kafkadan mesaj produce eder veya consume eder. Legacy uygulamamız ise GET ve POST metodları ile topicler ile iletişim kurmamızı sağlar.

Confluent aynı şekilde github dan “kafka-rest” projemizi kullanmamıza izin verir. (https://github.com/confluentinc/kafka-rest)