11.1 Concurrent collection classes
11.1.1 Interface BlockingQueue
11.1.2 Interface ConcurrentMap
11.1.3 Class ConcurrentHashMap
11.2.2 Acquire lock and return immediately
11.2.4 Nonblock-structured locking
11.2.5 Interface ReadWriteLock
11.2.6 Class ReentrantReadWriteLock
11.3.3 Interface ExecutorService
11.3.5 Interface ScheduledExecutorService
11.4 Parallel fork/join framework
Günümüzde uygulamalarımızın aynı anda birçok iş yapabilmesi bir gereklilik haline gelmiştir. Fakat concurrent uygulamalar geliştirmek için Javanın low level kütüphaneleri yeterli değildir. Bu bölümde nasıl concurrent uygulamalar yazacağımızı ve bunları yazarken kullancağımız framework’leri, yaklaşımları göreceğiz. Tabiiki her chapterda olduğu gibi bu konular sınavla sınırlıdır. (Bunlar java.util.concurrent paketi sınıfları, java.util.concurrent.locks paketi sınıfları, ExecutorService ve parallel fork/join framework)
Concurrent collection sınıfları bir collection’a nesne eklerken, diğer taraftan nesne silen bir operasyonu “happens-before” ilişkisi tanımlayarak memory uyumsuzluk sorunlarını giderir. Programcılar java.util sınıfı içindeki collection’lara thread-safe özelliği olan collaction’lar ekliyordu fakat java.util.concurrent paketi içinde java collection sınıflarına birçok ek özellik eklenmiş halleri mevcuttur. Bu sınıflar şunlardır;
Concurrent program yazmak zordur. Hem thread safety ile hem de performans ile baş etmeniz gerekmektedir. (The individual operations of ConcurrentHashMap are safe—that is, multiple threads can put values into the same map object in a safe manner. But these can be misused by the developers if they try to combine multiple safe operations into a single operation.)
BlockingQueue bildiğimiz bir kuyruk,sıra interface’idir. Birden fazla thread’in kullanacağı bir kuyruğa ihtiyacımız olduğundan kullanılır. Bu arayüzü implemente eden sınıflardan biri ArrayBlockingQueue ‘dir. Bu sınıfın constructor’ı initial kapasitesini belirleyen bir metoda sahipdir ve bu değer değiştirilemez! Sıra kapasitesi dolunca yeni eleman eklemeyi bloklar, engeller. Ayrıca boş bir sıradan da eleman çıkarmayı engeller. “producer–consumer” pattern’inde çalışır. Bir thread elemanları oluşturup sıraya ekler, başka bir thread bunları okur.
Birçok client’ın(producer) bir server’a istek yaptığını düşünün. Server(consumer) ise bütün isteklere sırayla cevap vermelidir. Bunu yönetmek için server bir anda karşılayacağı istek sayınının bir limiti olmalıdır. İstekler bir BlokcingQueqe ya konabilir. Böylece limit fazlası istekler engellenir veya istek hiç yoksa server thread’i engellenmeli ve beklemelidir. Bu örneğin örneği; chapter11.s1.LoadTesting sınıfındadır.
queue.put(req); satırı kuyrukda yer varsa ekler, yok ise yer açılana kadar bekler.
queue.take(); ise kuyruğun başındaki nesneyi getirir ve kuyruktan çıkarır. Eğer kuyrukta nesne yok ise nesne bulunana kadar bekler.
ConcurrentMap interface’i java.util.Map sınıfını extend eder. Bu interface key var ise bu key-value çiftini silmek veya değiştirmek için metodlar sağlar. Veya key yok ise eklemeyi sağlayan metodları bulunur. Aşağıda bazı metodları listelenmiştir.
{width=”5.236111111111111in”
height=”2.1180555555555554in”}
Önceki bölümdeki ConcurrentMap interface’inin implemantasyonu da ConcurrentHashMap’dir. HashMap’in concurrent versiyonudur. HashMap senkron değildir ve birden fazla thread ile hashMap değerlerini değiştiriyor isek hashMap’e erişimi senkronize etmemiz gerekir. Fakat bütün HashMap’i lock’lamak birden fazla thread ile çalışır iken ciddi performans sorunlarına yol açar.
Bu problemin çözümü ConcurrentHashMap sınıfıdır. Kendisini sadece bir thread’in kitlemesi yerine ConcurrentHashMap birden fazla thread’in erişmesine izin verir. Kensine sınırsız thread’in okumasına izin verirken sınırlı sayıda thread’in değiştirmesine izin verir. Ayrıca içinde dönerken ConcurrentModificationException hatası fırlatmadığı için dönerken kendisini kitlememize gerek yoktur. Peki iterator’a eriştikten sonra yeni eleman eklenir ise ne olur? İterator sadece HashMap oluşturulduğundaki değerleri verir. Her platformada olmamak ile birlikte, yeni değerleri yansıtabilir…
Drawback = sakınca
Avantajları olduğu gibi bazı sakıncaları da vardır. Mesela elemanlarını edit ederken bütün collaction’ı kitlemediği için size metodu her zaman doğru cevabı vermeyebilir !
ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>();
map.containsKey(key) Bir key ‘in ConcurrentHashMap içinde olup olmadığı boolean döner
map.put(key, value) Varolan bir key değerini değiştirir.
map.replace(key, value) ConcurrentHashMap içinde key varsa atomik olarak değerini değiştirir
Bu metodları kullanan chapter11.s1.UseConcurrentMap sınıfı vardır. Orada thread-safe ile ilgili bir de örnek metin bulunuyor. 3. Metodun önemini anlatıyor.
Aşağıda tekrar java.util.concurrent paketi ve onların java.util paketindeki benzerleri gösterilmiştir.
{width=”4.986111111111111in”
height=”2.673611111111111in”}
To execute synchronized code, a thread must acquire either an implicit or an explicit lock on an object’s monitor. Where no explicit Lockclasses are used, I’ll refer to it as an implicit lock. Lock objects offer multiple advantages over implicit locking of an object’s monitor. Unlike an implicit lock, a thread can use explicit lock objects to wait to acquire a lock until a time duration elapses. Lock objects also support interruptible lock waits, nonblock-structured locks, multiple condition variables, lock polling, and scalability benefits.
Aşağıda Java API nin Lock interface’inin metodları bulunmaktadır.
{width=”5.25in” height=”2.4166666666666665in”}
Method lock() acquires a lock on a Lock object. If the lock isn’t available, it waits until the lock can be acquired. Call method unlock() on a Lock object to release its lock when you no longer need it.
Örnek kod chapter11.s2.LockMethod da var ama bir iş yapmıyor. Sadece lock nesnesi üzerinde lock olana kadar bekliyor o kadar.
Implicit = Üstü kapalı, ima edilen
Explicit = Açık, belirgin
Bir şarkıcıdan imza almak için bekliyorsun. Eğer çok zaman geçer ise beklemekten vazgeçip dönebilirsin. But threads waiting to acquire implicit object locks can’t quit. Once a thread initiates a request to acquire an implicit lock on an object monitor, it can neither stop itself nor can it be asked to do so by any other thread. With explicit locks, you can request a thread to acquire a lock on an object monitor if it’s available and return immediately.
Chapter11.s2.Shipment sınıfında bir kod var. Bu kodda 2 senkron işlem var ve iç içe birbirlerini kitliyorlar. Bu kod JVM’in çalıştırmasına göre dead-lock’a girebilir. Bu olasılığı tamamen ortadan kaldırmak için ise yukarda anlatılan şekilde harici lock kullanırız. Bu sayede deadlock’a girmemesini garanti altına almış oluruz. Düzeltilmiş kod Chapter11.s2.ShipmentMod sınıfında vardır. Bu sınıfta Inventory sınıfının içinde bir harici lock(ReentrantLock) tanımlanıyor. Bu lock i kullanarak ( Lock.tryLock() metoduyla ) her iki nesne üzerinde de lock alabilir isek işlemler yapılıyor, yoksa veri değişikliği yapılmıyor. Alamaz isek kod geri dönüyor, lock almayı beklemiyor. Bu şekilde kod hiçbir zaman deadlock a girmiyor.
Sınavda yanıltmak için acquire(), acquireLock(), release(), ve releaseLock() metodları çıkabilir. Bunların hiçbiri geçerli bir metod değildir. Geçerli olanlar; lock(), unlock(), tryLock(), ve lockInterruptibly()
Doktorda randevunu beklediğini farzet. Bir arama geliyor ve daha önemli bir işin çıkıyor veya şu saate kadar bekleyebilirsin sonra diğer işine gitmen gerekiyor. Bu gibi durumları thread’lerde yapabiliriz. Lock nesnesinin lockInterruptibly() ve tryLock(long time, TimeUnit unit) metodları bu işi yaparlar.
Chapter11.s2.Employee sınıfında bir kod var. İnceler isek lock.lockInterruptibly(); satırı ile lock elde ediliyor fakat dışarıdan bir kesme gelir ise sonlandırılabilme özelliği var. Thread.interrupt() komutu ile lock elde edilen metod kesilebiliyor. Eğer bu komuta kesme gelir ise catch(InterruptedException e) şeklinde yakalanıyor ve Thread.currentThread().interrupt(); komutu ile sonlandırılabiliyor. Burada interrupt komutu çalışması belirsizdir. Thread sonlanmış, bitmiş hiç başlamamış veya yarıda olabilir. kesin olarak sonlandıracak anlamına gelmez.
{width=”4.708333333333333in”
height=”3.9305555555555554in”}
Sadasdas
{width=”4.486111111111111in” height=”1.0625in”}
Adsdasd
{width=”5.270833333333333in”
height=”3.9166666666666665in”}
Chapter 10’da yeni bir thread yaratmak için Thread sınıfını ve Runnable interface’ini öğrendik. Bu yöntemler bir task ile yakın bir ilişki kurarlar ve küçük uygulamalar ile sorun yaratmazlar. Fakat büyük uygulamalar için thread bağımsızlığı ve thread yönetimi için başka bir yol gereklidir.
Executor Framework’ü yukarıda anlatılmaya çalışılan problem için yani bir task oluşturma işi ile task çalıştırma işini birbirinden ayırmamızı sağlar. Bu framework’ü kullanarak Runnable ve Callable interface’lerini kullanarak thread yaratabiliriz. Bu task’ları Executor’a yeni tasklar oluşturmak için iletebiliriz.
Aşağıda bahsedilen sınıf ve arayüzleri görebiliriz. (java.util.concurrent)
{width=”5.020833333333333in”
height=”1.9166666666666667in”}
Thread sınıfını ve Runnable arayüzünü kullanarak task onu çalıştıran thread tarafından çalıştırılmış olur ve yakınen birbirlerine bağlıdırlar. Executor interface’i bir Runnable task’ı nasıl çalıştıracağını bilen sınıfları vardır. Ayrıca task submission ‘u ile task execution’u ayırmamızı sağlar. Bu arayüzünü sole(tek, biricik) metodunu implemente ederek bir taskı çalıştırmak için şunları belirleyebiliriz;
void execute(Runnable)
Chapter11.s3.Hotel sınıfında bir kod var. İnceler isek task oluşturulması Hotel sınıfında, task çalıştırılması ise Order sınıfında yapılarak bu iki iş birbirinden ayrılmış oluyor.
Calling start() on a Thread instance starts a new thread of execution. Calling run() on a Runnable instance executes the thread in the same thread of execution.
Runnable ve Callable interface’lerini karşılaştıracak olur isek, Runnable’ın run metodu dönüş değeri döndürmez ve checked exception fırlatamaz. Bu iki durum Callable’da handle edilmiştir.
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Bir önceki örnekteki Orders sınıfı şu şekilde yazılabilidi.
class Order implements Callable<Void> {
String name;
Order(String name) {this.name = name;}
@Override
public Void call() throws Exception {
System.out.println(name);
if (name.equalsIgnoreCase("berry"))
throw new Exception("Berry unavailable");
return null;
}
}
Biz burada call değeri dönüş değeri işimize yaramadığı için Void ile tanımlayabiliriz. (If you don’t want your Callable to return a value, you can create it using Callable<Void>)
Bu interdace Executor interface’inden extend olmuştur ve kendine gönderilen task’ların yönetiminden ve sonlandırılmasından sorumludur. Şu metodları tanımlar;
Aşağıdaki tabloda ise ScheduledService interface’i medtodları bulunur.
{width=”5.256944444444445in” height=”5.25in”}
{width=”5.270833333333333in”
height=”1.9097222222222223in”}
Bir restorantta tek bir aşçı olduğunu düşünelim. Müşteri arttıkça bir aşçı yetişemeyecek ve birden fazla aşkı tutacaksınız. Bazı durumlarda aşçı sayısını arttırmak da soruna çözüm olmayacak çünkü aşçılar aynı kaynakları kullandıkları için onları beklemek zorunda kalabileceklerdir.(Mesela fırını, buzdolabını,vb gibi) Ayrıca aşçılar iş yapmasa bile restoran kaynaklarını tüketeceklerdir.(Mutfakta fiziksel alan, parasal zaman) Eğer mutfağı sürekli bu aşçılar ile doldurur isek mutfakta yer kalmayabilir. Uygulamalarımızda da “exhaustion of physical resources” uygulamamızın crash olmasına sebebiyet verebilir.
Yukarıda tanımlanan durumu önlemek için uygulamamızda aynı anda koşan thread sayısını sınırlamalıyız. Bu problemin çözümü “Thread pools” dur. Amaç schuler’ı doldurmadan ve performansı azaltmadan işlemciyi optimum düzeyde kullanmaktır.
Executors sınıfı birden fazla öntanımlı thread pool alan statik metodlara sahiptir. Bunlar;
Chapter11.s3.Hotel2 sınıfında Thread pool’a gönderilen Callable nesneleri vardır. Burada thread pool’a submit ile yeni işler ekleniyor. shutDown ile yeni iş eklenmesi kapatılıyor ve halihazırda çalışan thread’lerin bitmesi bekleniyor. shutDownNow ise yeni iş eklemeyi kapatırken halihazırda çalıaşn thread’leri bitiriyor. awaitTermination ise shutdown isteğinden sonra bütün threadlerin bitmesini veya timeout bitinceye kadar kodun ilerlemesini durduruyor.
Bu interface gelecek veya periyodik thread çalıştırmamızı sağlar. Bütün çalışanlara hatırlatıcı email’i göndereceğimizi varsayalım. Chapter11.s3.ReminderMgr sınıfında örnek kod vardır. Bu kod çalıştırılınca ve her 24 saatte bir Runnable sınıfında tanımlanan işi çalıştırır. Aşağıda metodları vardır. Mesela örneğimiz scheduleAtFixedRate metodu ile yazılmıştır.
{width=”5.270833333333333in” height=”3.375in”}
Java 7 ile gelen fork/join framework multi-cpu işlemcilerde halihazırdaki Concurrency sınıflarını genişleterek donanımsal olarak paralel işler yapmamızı sağlar. Büyük işlemleri böler, küçük parçaları işler ve sonucu birleştirir.
Bu framework işlemci yoğunluklu uygulamalar ve parçalanarak paralel olarak birbirinden bağımsız çalışabilen uygulamalar için tasarlanmıştır. Blok görevler, I/O kullanan işlemler veya senkronizasyon gerektiren işlemler için uygun değildir.
Aşağıda fork/join framework’ün “divide-and-conquer” algoritması görülmektedir.
{width=”5.0in” height=”3.423611111111111in”}
Şemadan görüldüğü üzere framework işe başlamadan problemin boyutuna bakar, tanımlanan limitten küçük ise hemen işler, büyük ise problemi parçalara ayırır ve paralel olarak birbirinden bağımsız parçalar olarak sonucu hesaplar ve çıkan sonuçları birleştirerek sonucu elde eder.
ForkJoinPool sınıfı bu framework’ün concrete halidir. Bu sınıf önceki chapter’da işlenen ExecutorService interface’ini implemente eder. ExecutorService’e benzer fakat ForkJoinPool “work-stealing algorithm” kullanır. Bu algoritma = when worker threads run out of tasks, they steal tasks from other worker threads to avoid blocking waiting threads.
Aşağıdaki şemada bu algoritma çalışması gösterilmiştir. Threadler kendi işlerini bir Deque’da tutuyor ve sürekli bu kuyruğu işliyorlar. Eğer kuyruları bitip iş kalmaz ise diğer thread’lerin kuruklarındaki işleri yapıyorlar.
{width=”5.166666666666667in”
height=”3.1180555555555554in”}
ForkJoinPool sınıfı ForkJoinTask türünde olmayan client’lar(management ve monitoring taskları) için giriş noktası sağlar. Problem çözen classımız ForkJoinTask’ın subclass’ı olmalıdır. ForkJoinTask, ForkJoinPool içinde koşan bütün taskların abstract base’idir. ForkJoinTask bir thread’in hafif bir versiyonu gibi düşünebiliriz. Birçok ForkJoinTask, ForkJoinPool içinde çalıştırılır. Aşağıdaki şekilde bu sınıflar ve bu sınıfların 2 alt sınıfı RecursiveTask ve RecursiveAction sınıfları görülebilir. RecursiveAction sonuç döndürmeyen hesaplamaları, RecursiveTask ise sonuç döndüren hesaplamaları yapar.
{width=”4.958333333333333in”
height=”1.6944444444444444in”}
Şimdi bu sınıfların bir örneğini yapalım. Chapter11.s4.CalcSum sınıfında 100 adet elemanı olan bir dizi elemanların toplamı dizi parçalara ayrılarak fork/join framework ile hesaplanıyor. Örneğimizde önce CalcSum sınıfı “extends RecursiveTask<Integer>” sınıfı ile oluşturuluyor. RecursiveTask içinde override etmemiz gereken “compute” sınıfı bulunuyor. ForkJoinPool instance’ı üzerinde “invoke” metodu çağrılınca compute metodu çağrılıyor. Bu metod ayrıca RecursiveTask instance’ından fork metodu çağrılıncada çalıştırılıyor.
( In the preceding code, 1 defines class CalcSum, which extends RecursiveTask