1. Семафоры

Семафоры представляют еще одно средство синхронизации для доступа к ресурсу. В Java семафоры представлены классом Semaphore, который располагается в пакете java.util.concurrent.

Для управления доступом к ресурсу семафор использует счетчик, представляющий количество разрешений. Если значение счетчика больше нуля, то поток получает доступ к ресурсу, при этом счетчик уменьшается на единицу. После окончания работы с ресурсом поток освобождает семафор, и счетчик увеличивается на единицу. Если же счетчик равен нулю, то поток блокируется и ждет, пока не получит разрешение от семафора.

Установить количество разрешений для доступа к ресурсу можно с помощью конструкторов класса Semaphore:

Semaphore(int permits)
Semaphore(int permits, boolean fair)

Параметр permits указывает на количество допустимых разрешений для доступа к ресурсу. Параметр fair во втором конструкторе позволяет установить очередность получения доступа. Если он равен true, то разрешения будут предоставляться ожидающим потокам в том порядке, в каком они запрашивали доступ. Если же он равен false, то разрешения будут предоставляться в неопределенном порядке.

Для получения разрешения у семафора надо вызвать метод acquire(), который имеет две формы:

void acquire() throws InterruptedException
void acquire(int permits) throws InterruptedВxception

Для получения одного разрешения применяется первый вариант, а для получения нескольких разрешений - второй вариант.

После вызова этого метода пока поток не получит разрешение, он блокируется.

После окончания работы с ресурсом полученное ранее разрешение надо освободить с помощью метода release():

void release()
void release(int permits)

Первый вариант метода освобождает одно разрешение, а второй вариант - количество разрешений, указанных в permits.

Используем семафор в простом примере:

import java.util.concurrent.Semaphore;

public class Program {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(1); // 1 разрешение
        CommonResource res = new CommonResource();
        new Thread(new CountThread(res, sem, "CountThread 1")).start();
        new Thread(new CountThread(res, sem, "CountThread 2")).start();
        new Thread(new CountThread(res, sem, "CountThread 3")).start();
    }
}

class CommonResource {
    int x = 0;
}

class CountThread implements Runnable {
    CommonResource res;
    Semaphore sem;
    String name;

    CountThread(CommonResource res, Semaphore sem, String name) {
        this.res = res;
        this.sem = sem;
        this.name = name;
    }

    public void run() {
        try {
            System.out.println(name + " ожидает разрешение");
            sem.acquire();
            res.x = 1;
            for(int i = 1; i < 5; i++) {
                System.out.println(this.name + ": " + res.x);
                res.x++;
                Thread.sleep(100);
            }
        } catch(InterruptedException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(name + " освобождает разрешение");
        sem.release();
    }
}

Итак, здесь есть общий ресурс CommonResource с полем x, которое изменяется каждым потоком. Потоки представлены классом CountThread, который получает семафор и выполняет некоторые действия над ресурсом. В основном классе программы эти потоки запускаются. В итоге мы получим следующий вывод:

CountThread 1 ожидает разрешение
CountThread 2 ожидает разрешение
CountThread 3 ожидает разрешение
CountThread 1: 1
CountThread 1: 2
CountThread 1: 3
CountThread 1: 4
CountThread 1 освобождает разрешение
CountThread 3: 1
CountThread 3: 2
CountThread 3: 3
CountThread 3: 4
CountThread 3 освобождает разрешение
CountThread 2: 1
CountThread 2: 2
CountThread 2: 3
CountThread 2: 4
CountThread 2 освобождает разрешение

Семафоры отлично подходят для решения задач, где надо ограничивать доступ. Например, классическая задача про обедающих философов. Ее суть: есть несколько философов, допустим, пять, но одновременно за столом могут сидеть не более двух. И надо, чтобы все философы пообедали, но при этом не возникло взаимоблокировки философами друг друга в борьбе за тарелку и вилку:

import java.util.concurrent.Semaphore;

public class Program {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(2);
        for(int i = 1; i < 6; i++)
            new Philosopher(sem, i).start();
    }
}

// класс философа
class Philosopher extends Thread {
    Semaphore sem; // семафор. ограничивающий число философов
    // кол-во приемов пищи
    int num = 0;
    // условный номер философа
    int id;
    // в качестве параметров конструктора передаем идентификатор философа и семафор
    Philosopher(Semaphore sem, int id) {
        this.sem = sem;
        this.id = id;
    }

    public void run() {
        try {
            while(num < 3) { // пока количество приемов пищи не достигнет 3
                //Запрашиваем у семафора разрешение на выполнение
                sem.acquire();
                System.out.println ("Философ " + id+" садится за стол");
                // философ ест
                sleep(500);
                num++;

                System.out.println ("Философ " + id+" выходит из-за стола");
                sem.release();

                // философ гуляет
                sleep(500);
            }
        } catch(InterruptedException e) {
            System.out.println ("у философа " + id + " проблемы со здоровьем");
        }
    }
}

В итоге только два философа смогут одновременно находиться за столом, а другие будут ждать:

Философ 1 садится за стол
Философ 3 садится за стол
Философ 3 выходит из-за стола
Философ 1 выходит из-за стола
Философ 2 садится за стол
Философ 4 садится за стол
Философ 2 выходит из-за стола
Философ 4 выходит из-за стола
Философ 5 садится за стол
Философ 1 садится за стол
Философ 1 выходит из-за стола
Философ 5 выходит из-за стола
Философ 3 садится за стол
Философ 2 садится за стол
Философ 3 выходит из-за стола
Философ 4 садится за стол
Философ 2 выходит из-за стола
Философ 5 садится за стол
Философ 4 выходит из-за стола
Философ 5 выходит из-за стола
Философ 1 садится за стол
Философ 3 садится за стол
Философ 1 выходит из-за стола
Философ 2 садится за стол
Философ 3 выходит из-за стола
Философ 5 садится за стол
Философ 2 выходит из-за стола
Философ 4 садится за стол
Философ 5 выходит из-за стола
Философ 4 выходит из-за стола

2. Класс Exchanger

Класс Exchanger предназначен для обмена данными между потоками. Он является типизированным и типизируется типом данных, которыми потоки должны обмениваться.

Обмен данными производится с помощью единственного метода этого класса exchange():

V exchange(V x) throws InterruptedException
V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

Параметр x представляет буфер данных для обмена. Вторая форма метода также определяет параметр timeout - время ожидания и unit - тип временных единиц, применяемых для параметра timeout.

Данный класс очень просто использовать:

import java.util.concurrent.Exchanger;

public class Program {
    public static void main(String[] args) {
        Exchanger<String> ex = new Exchanger<String>();
        new Thread(new PutThread(ex)).start();
        new Thread(new GetThread(ex)).start();
    }
}

class PutThread implements Runnable {
    Exchanger<String> exchanger;
    String message;

    PutThread(Exchanger<String> ex) {
        this.exchanger = ex;
        message = "Hello Java!";
    }

    public void run() {
        try {
            message = exchanger.exchange(message);
            System.out.println("PutThread has received: " + message);
        } catch(InterruptedException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

class GetThread implements Runnable {
    Exchanger<String> exchanger;
    String message;

    GetThread(Exchanger<String> ex) {
        this.exchanger = ex;
        message = "Hello World!";
    }

    public void run() {
        try {
            message = exchanger.exchange(message);
            System.out.println("GetThread has received: " + message);
        } catch(InterruptedException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

В классе PutThread отправляет в буфер сообщение Hello Java!:

message = exchanger.exchange(message);

Причем в ответ метод exchange() возвращает данные, которые отправил в буфер другой поток. То есть происходит обмен данными. Хотя нам необязательно получать данные, мы можем просто их отправить:

exchanger.exchange(message);

Логика класса GetThread аналогична - также отправляется сообщение.

В итоге консоль выведет следующий результат:

PutThread has received: Hello World!
GetThread has received: Hello Java!

3. Класс Phaser

Класс Phaser позволяет синхронизировать потоки, представляющие отдельную фазу или стадию выполнения общего действия. Phaser определяет объект синхронизации, который ждет, пока не завершится определенная фаза. Затем Phaser переходит к следующей стадии или фазе и снова ожидает ее завершения.

Для создания объекта Phaser используется один из конструкторов:

Phaser()
Phaser(int parties)
Phaser(Phaser parent)
Phaser(Phaser parent, int parties)

Параметр parties указывает на количество сторон (грубо говоря, потоков), которые должны выполнять все фазы действия. Первый конструктор создает объект Phaser без каких-либо сторон. Второй конструктор регистрирует передаваемое в конструктор количество сторон. Третий и четвертый конструкторы также устанавливают родительский объект Phaser.

Основные методы класса Phaser:

  • int register() регистрирует сторону, которая выполняет фазы, и возвращает номер текущей фазы - обычно фаза 0

  • int arrive() сообщает, что сторона завершила фазу и возвращает номер текущей фазы

  • int arriveAndAwaitAdvance() аналогичен методу arrive, только при этом заставляет phaser ожидать завершения фазы всеми остальными сторонами

  • int arriveAndDeregister() сообщает о завершении всех фаз стороной и снимает ее с регистрации. Возвращает номер текущей фазы или отрицательное число, если синхронизатор Phaser завершил свою работу

  • int getPhase() возвращает номер текущей фазы

При работае с классом Phaser обычно сначала создается его объект. Далее нам надо зарегистрировать все участвующие стороны. Для регистрации в каждой стороне вызывается метод register(), либо можно обойтись и без этого метода, передав нужное количество сторон в конструктор Phaser.

Затем каждая сторона выполняет некоторый набор действий, составляющих фазу. А синхронизатор Phaser ждет, пока все стороны не завершат выполнение фазы. Чтобы сообщить синхронизатору, что фаза завершена, сторона должна вызвать метод arrive() или arriveAndAwaitAdvance(). После этого синхронизатор переходит к следующей фазе.

Применим Phaser в приложении:

import java.util.concurrent.Phaser;

public class Program {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1);
        new Thread(new PhaseThread(phaser, "PhaseThread 1")).start();
        new Thread(new PhaseThread(phaser, "PhaseThread 2")).start();

        // ждем завершения фазы 0
        int phase = phaser.getPhase();
        phaser.arriveAndAwaitAdvance();
        System.out.println("Фаза " + phase + " завершена");

        // ждем завершения фазы 1
        phase = phaser.getPhase();
        phaser.arriveAndAwaitAdvance();
        System.out.println("Фаза " + phase + " завершена");

        // ждем завершения фазы 2
        phase = phaser.getPhase();
        phaser.arriveAndAwaitAdvance();
        System.out.println("Фаза " + phase + " завершена");

        phaser.arriveAndDeregister();
    }
}

class PhaseThread implements Runnable {
    Phaser phaser;
    String name;

    PhaseThread(Phaser phaser, String name) {
        this.phaser = phaser;
        this.name = name;
        phaser.register();
    }
    public void run() {

        System.out.println(name + " выполняет фазу " + phaser.getPhase());
        phaser.arriveAndAwaitAdvance(); // сообщаем, что первая фаза достигнута

        System.out.println(name + " выполняет фазу " + phaser.getPhase());
        phaser.arriveAndAwaitAdvance(); // сообщаем, что вторая фаза достигнута

        System.out.println(name + " выполняет фазу " + phaser.getPhase());
        phaser.arriveAndDeregister(); // сообщаем о завершении фаз и удаляем с регистрации объекты
    }
}

Итак, здесь у нас фазы выполняются тремя сторонами - главным потоком и двумя потоками PhaseThread. Поэтому при создании объекта Phaser ему передается число 1 - главный поток, а в конструкторе PhaseThread вызывается метод register(). Мы в принципе могли бы не использовать метод register(), но тогда нам надо было бы указать Phaser phaser = new Phaser(3), так как у нас три стороны.

Фаза в каждой стороне представляет минимальный примитивный набор действий: для потоков PhaseThread это вывод сообщения, а для главного потока - подсчет текущей фазы с помощью метода getPhase(). При этом отсчет фаз начинается с нуля. Каждая сторона завершает выполнение фазы вызовом метода phaser.arriveAndAwaitAdvance(). При вызове этого метода пока последняя сторона не завершит выполнение текущей фазы, все остальные стороны блокируются.

После завершения выполнения последней фазы происходит отмена регистрации всех сторон с помощью метода arriveAndDeregister().

В итоге работа программы даст следующий вывод:

PhaseThread 1 выполняет фазу 0
PhaseThread 2 выполняет фазу 0
PhaseThread 1 выполняет фазу 1
PhaseThread 2 выполняет фазу 1
Фаза 0 завершена
Фаза 1 завершена
PhaseThread 1 выполняет фазу 2
PhaseThread 2 выполняет фазу 2
Фаза 2 завершена

В данном случае получается немного путанный вывод. Так, сообщения о выполнении фазы 1 выводится после сообщения об окончании фазы 0. Что связано с многопоточностью - фазы завершились, но в одном потоке еще не выведено сообщение о завершении, тогда как другие потоки уже начали выполнение следующей фазы. В любом случае все это происходит уже после завершения фазы.

Но чтобы было более наглядно, мы можем использовать sleep() в потоках:

public void run() {
        System.out.println(name + " выполняет фазу " + phaser.getPhase());
        phaser.arriveAndAwaitAdvance(); // сообщаем, что первая фаза достигнута
        try {
            Thread.sleep(200);
        } catch(InterruptedException ex) {
            System.out.println(ex.getMessage());
        }

        System.out.println(name + " выполняет фазу " + phaser.getPhase());
        phaser.arriveAndAwaitAdvance(); // сообщаем, что вторая фаза достигнута
        try {
            Thread.sleep(200);
        } catch(InterruptedException ex) {
            System.out.println(ex.getMessage());
        }
        System.out.println(name + " выполняет фазу " + phaser.getPhase());
        phaser.arriveAndDeregister(); // сообщаем о завершении фаз и удаляем с регистрации объекты
    }

И в этом случае вывод будет более привычным, хотя на работу фаз это никак не повлияет.

PhaseThread 1 выполняет фазу 0
PhaseThread 2 выполняет фазу 0
Фаза 0 завершена
PhaseThread 2 выполняет фазу 1
PhaseThread 1 выполняет фазу 1
Фаза 1 завершена
PhaseThread 2 выполняет фазу 2
PhaseThread 1 выполняет фазу 2
Фаза 2 завершена

4. Блокировки. ReentrantLock

Для управления доступом к общему ресурсу в качестве альтернативы оператору synchronized мы можем использовать блокировки. Функциональность блокировок заключена в пакете java.util.concurrent.locks.

Вначале поток пытается получить доступ к общему ресурсу. Если он свободен, то на него накладывает блокировку. После завершения работы блокировка с общего ресурса снимается. Если же ресурс не свободен и на него уже наложена блокировка, то поток ожидает, пока эта блокировка не будет снята.

Классы блокировок реализуют интерфейс Lock, который определяет следующие методы:

  • void lock() ожидает, пока не будет получена блокировка

  • void lockInterruptibly() throws InterruptedException ожидает, пока не будет получена блокировка, если поток не прерван

  • boolean tryLock() пытается получить блокировку, если блокировка получена, то возвращает true. Если блокировка не получена, то возвращает false. В отличие от метода lock() не ожидает получения блокировки, если она недоступна

  • void unlock() снимает блокировку

  • Condition newCondition() возвращает объект Condition, который связан с текущей блокировкой

Организация блокировки в общем случае довольно проста: для получения блокировки вызывается метод lock(), а после окончания работы с общими ресурсами вызывается метод unlock(), который снимает блокировку.

Объект Condition позволяет управлять блокировкой.

Как правило, для работы с блокировками используется класс ReentrantLock из пакета java.util.concurrent.locks. Данный класс реализует интерфейс Lock.

Для примера возьмем код из темы про оператор synchronized и перепишем данный код с использованием заглушки ReentrantLock:

import java.util.concurrent.locks.ReentrantLock;

public class Program {
    public static void main(String[] args) {
        CommonResource commonResource= new CommonResource();
        ReentrantLock locker = new ReentrantLock(); // создаем заглушку
        for (int i = 1; i < 6; i++) {
            Thread t = new Thread(new CountThread(commonResource, locker));
            t.setName("Thread "+ i);
            t.start();
        }
    }
}

class CommonResource {
    int x = 0;
}

class CountThread implements Runnable {
    CommonResource res;
    ReentrantLock locker;

    CountThread(CommonResource res, ReentrantLock lock) {
        this.res = res;
        this.locker = lock;
    }

    public void run() {
        locker.lock(); // устанавливаем блокировку
        try {
            res.x = 1;
            for (int i = 1; i < 5; i++) {
                System.out.printf("%s %d \n", Thread.currentThread().getName(), res.x);
                res.x++;
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            locker.unlock(); // снимаем блокировку
        }
    }
}

Здесь также используется общий ресурс CommonResource, для управления которым создается пять потоков. На входе в критическую секцию устанавливается заглушка:

locker.lock();

После этого только один поток имеет доступ к критической секции, а остальные потоки ожидают снятия блокировки. В блоке finally после всей окончания основной работы потока эта блокировка снимается. Причем делается это обязательно в блоке finally, так как в случае возникновения ошибки все остальные потоки окажутся заблокированными.

В итоге мы получим вывод, аналогичный тому, который был в случае с оператором synchronized:

Thread 4 1
Thread 4 2
Thread 4 3
Thread 4 4
Thread 3 1
Thread 3 2
Thread 3 3
Thread 3 4
Thread 2 1
Thread 2 2
Thread 2 3
Thread 2 4
Thread 1 1
Thread 1 2
Thread 1 3
Thread 1 4
Thread 5 1
Thread 5 2
Thread 5 3
Thread 5 4

5. Interface Condition

Применение условий в блокировках позволяет добиться контроля над управлением доступом к потокам. Условие блокировки представлет собой объект интерфейса Condition из пакета java.util.concurrent.locks.

Применение объектов Condition во многом аналогично использованию методов wait()/notify()/notifyAll() класса Object, которые были рассмотрены в одной из прошлых тем. В частности, мы можем использовать следующие методы интерфейса Condition:

  • await() поток ожидает, пока не будет выполнено некоторое условие и пока другой поток не вызовет методы signal()/signalAll(). Во многом аналогичен методу wait класса Object

  • signal() сигнализирует, что поток, у которого ранее был вызван метод await(), может продолжить работу. Применение аналогично использованию методу notify() класса Object

  • signalAll() сигнализирует всем потокам, у которых ранее был вызван метод await(), что они могут продолжить работу. Аналогичен методу notifyAll() класса Object

Эти методы вызываются из блока кода, который попадает под действие блокировки ReentrantLock. Сначала, используя эту блокировку, нам надо получить объект Condition:

ReentrantLock locker = new ReentrantLock();
Condition condition = locker.newCondition();

Как правило, сначала проверяется условие доступа. Если соблюдается условие, то поток ожидает, пока условие не изменится:

while (условие) {
    condition.await();
}

После выполнения всех действий другим потокам подается сигнал об изменении условия:

condition.signalAll();

Важно в конце вызвать метод signal()/signalAll(), чтобы избежать возможности взаимоблокировки потоков.

Для примера возьмем задачу из темы про методы wait()/notify() и изменим ее, применяя объект Condition.

Итак, у нас есть склад, где могут одновременно быть размещено не более 3 товаров. И производитель должен произвести 5 товаров, а покупатель должен эти товары купить. В то же время покупатель не может купить товар, если на складе нет никаких товаров:

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class Program {
    public static void main(String[] args) {
        Store store=new Store();
        Producer producer = new Producer(store);
        Consumer consumer = new Consumer(store);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

// Класс Магазин, хранящий произведенные товары
class Store {
   private int product = 0;
   ReentrantLock locker;
   Condition condition;

   Store() {
       locker = new ReentrantLock(); // создаем блокировку
       condition = locker.newCondition(); // получаем условие, связанное с блокировкой
   }

   public void get() {
      locker.lock();
      try {
          // пока нет доступных товаров на складе, ожидаем
          while (product < 1) {
              condition.await();
          }

          product--;
          System.out.println("Покупатель купил 1 товар");
          System.out.println("Товаров на складе: " + product);

          // сигнализируем
          condition.signalAll();
      } catch (InterruptedException e) {
          System.out.println(e.getMessage());
      } finally {
          locker.unlock();
      }
   }

   public void put() {
       locker.lock();
       try {
          // пока на складе 3 товара, ждем освобождения места
          while (product >= 3) {
              condition.await();
          }

          product++;
          System.out.println("Производитель добавил 1 товар");
          System.out.println("Товаров на складе: " + product);
          // сигнализируем
          condition.signalAll();
      } catch (InterruptedException e) {
          System.out.println(e.getMessage());
      } finally {
          locker.unlock();
      }
   }
}

// класс Производитель
class Producer implements Runnable {
    Store store;

    Producer(Store store) {
       this.store = store;
    }

    public void run() {
        for (int i = 1; i < 6; i++) {
            store.put();
        }
    }
}

// Класс Потребитель
class Consumer implements Runnable {
    Store store;

    Consumer(Store store) {
       this.store = store;
    }

    public void run() {
        for (int i = 1; i < 6; i++) {
            store.get();
        }
    }
}

В итоге мы получим вывод наподобие следующего:

Производитель добавил 1 товар
Товаров на складе: 1
Производитель добавил 1 товар
Товаров на складе: 2
Производитель добавил 1 товар
Товаров на складе: 3
Покупатель купил 1 товар
Товаров на складе: 2
Покупатель купил 1 товар
Товаров на складе: 1
Покупатель купил 1 товар
Товаров на складе: 0
Производитель добавил 1 товар
Товаров на складе: 1
Производитель добавил 1 товар
Товаров на складе: 2
Покупатель купил 1 товар
Товаров на складе: 1
Покупатель купил 1 товар
Товаров на складе: 0