본문 바로가기
Language/Java

[Java] 쓰레드 6 - 쓰레드 동기화(synchronized , Lock, Condition)

by 계범 2022. 3. 14.

쓰레드의 동기화

멀티쓰레드 프로세스의 경우 여러 쓰레드가 자원을 공유해서 작업하기 때문에 서로에게 영향을 줄 수 있다.

이러한 일을 방지하기 위해 한 쓰레드가 진행중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는 것을 '쓰레드 동기화(Synchronization)'라고 한다.

 

synchronized를 이용한 동기화

// 1. 메서드 전체를 임계 영역으로 지정
public synchronized void calcSum(){
	// ...
}

// 2. 특정 영역을 임계 영역으로 지정
synchronized(객체의 참조변수){
	// ...
}
임계영역이란 둘 이상의 프로세스(쓰레드)가 공유 데이터에 접근하여 작업하는 코드 블록.

2022.01.05 - [CS/OS(운영체제)] - 동기화 툴(프로세스 동기화)

 

1) 메서드 앞에 synchronized를 붙여서 메서드 전체가 임계 영역으로 설정된다.

쓰레드는 해당 메서드가 호출된 시점부터 해당 메서드가 포함된 객체의 lock을 얻어 작업을 수행하다가 메서드가 종료되면 lock을 반환한다.

 

2) 메서드 내의 코드 일부를 블럭{}으로 감싸고 블럭 앞에 'synchronized(참조변수)'를 붙이는 것이다.

이때 참조변수는 락을 걸고자하는 객체를 참조하는 것이어야 한다.

이 블럭을 synchronized블럭이라고 부르며, 쓰레드가 이 블럭 영역 안으로 들어가면 객체의 lock을 얻게되고, 블럭을 벗어나면 반납한다.

 

모든 객체는 lock을 하나씩 가지고 있으며, 해당 객체의 lock을 가지고 있는 쓰레드만 임계 영역의 코드를 수행할 수 있다. 다른 쓰레드들은 lock을 얻을 때까지 기다린다.

 

wait()와 notify()

특정 스레드가 객체의 락을 가진 상태로 오랜 시간을 보내지 않게 하기 위한 메서드들이다.

동기화된 임계영역의 코드를 수행하다가 작업을 더 이상 진행할 상황이 아니면, wait()를 호출하여 쓰레드가 락을 반납하고 기다리게 한다. 그럼 다른 쓰레드가 락을 얻어 해당 객체대한 작업을 수행 할 수 있게 되고, 추후에 작업을 진행할 수 있는 상황이 되면 notify()를 호출해서, 작업을 중단했던 쓰레드가 다시 락을 얻어 진행할 수 있게 된다.

 

wait()이 호출되면, 실행 중이던 쓰레드는 해당 객체의 대기실(waiting pool)에서 통지를 기다린다.

notify()가 호출되면, 해당 객체의 대기실에 있던 모든 쓰레드 중에서 임의의 쓰레드만 통지를 받는다.

notifyAll()은 모든 쓰레드에게 통보를 하지만, 그래도 lock을 얻어 나오는 쓰레드는 하나이다.

 

wait()는 notify() 또는 notifyAll()이 호출될 때까지 기다린다. 매개변수가 있으면, 지정된 시간이 지난후에 자동적으로 notify()가 호출되는 것과 같이 행동한다.

 

waiting pool은 객체마다 존재하는 것으로 notifyAll()을 한다고 모든 객체의 waiting pool에 있는 쓰레드를 깨우는 것은 아니다.

 

wait(), notify(), notifyAll()
- Object에 정의되어 있다.
- 동기화 블록(synchronized블록)내에서만 사용할 수 있다.
- 보다 효율적인 동기화를 가능하게 한다.
import java.util.ArrayList;

class Customer implements Runnable {
	private Table table;
	private String food;

	Customer(Table table, String food) {
		this.table = table;  
		this.food  = food;
	}

	public void run() {
		while(true) {
			try { Thread.sleep(100);} catch(InterruptedException e) {}
			String name = Thread.currentThread().getName();
			
			table.remove(food);
			System.out.println(name + " ate a " + food);
		} // while
	}
}

class Cook implements Runnable {
	private Table table;
	
	Cook(Table table) {	this.table = table; }

	public void run() {
		while(true) {
			int idx = (int)(Math.random()*table.dishNum());
			table.add(table.dishNames[idx]);
			try { Thread.sleep(10);} catch(InterruptedException e) {}
		} // while
	}
}

class Table {
	String[] dishNames = { "donut","donut","burger" }; // donut의 확률을 높인다.
	final int MAX_FOOD = 6;
	private ArrayList<String> dishes = new ArrayList<>();

	public synchronized void add(String dish) {
		while(dishes.size() >= MAX_FOOD) {
				String name = Thread.currentThread().getName();
				System.out.println(name+" is waiting.");
				try {
					wait(); // COOK쓰레드를 기다리게 한다.
					Thread.sleep(500);
				} catch(InterruptedException e) {}	
		}
		dishes.add(dish);
		notify();  // 기다리고 있는 CUST를 깨우기 위함.
		System.out.println("Dishes:" + dishes.toString());
	}

	public void remove(String dishName) {

		synchronized(this) {	
			String name = Thread.currentThread().getName();

			while(dishes.size()==0) {
					System.out.println(name+" is waiting.");
					try {
						wait(); // CUST쓰레드를 기다리게 한다.
						Thread.sleep(500);
					} catch(InterruptedException e) {}	
			}

			while(true) {
				for(int i=0; i<dishes.size();i++) {
					if(dishName.equals(dishes.get(i))) {
						dishes.remove(i);
						notify(); // 잠자고 있는 COOK을 깨우기 위함 
						return;
					}
				} // for문의 끝

				try {
					System.out.println(name+" is waiting.");
					wait(); // 원하는 음식이 없는 CUST쓰레드를 기다리게 한다.
					Thread.sleep(500);
				} catch(InterruptedException e) {}	
			} // while(true)
		} // synchronized
	}

	public int dishNum() { return dishNames.length; }
}

class ThreadWaitEx3 {
	public static void main(String[] args) throws Exception {
		Table table = new Table();

		new Thread(new Cook(table), "COOK1").start();
		new Thread(new Customer(table, "donut"),  "CUST1").start();
		new Thread(new Customer(table, "burger"), "CUST2").start();
	
		Thread.sleep(2000);
		System.exit(0);
	}
}

 

위의 예제에서 보면 문제가 하나 있는데,

테이블 객체의 waiting pool에 요리사 쓰레드와 손님 쓰레드가 같이 기다린다는 것이다.

그래서 notify()가 호출되었을 때, 요리사 쓰레드와 손님 쓰레드 중에서 누가 통지 받을지 알 수없다.

 

운이 나쁘면 요리사 쓰레드는 계쏙 통지를 못받고 무한히 기다려야 하는데 이것을 '기아(starvation)현상'이라고 한다.

이 현상을 막으려면, notify() 대신 notifyAll()을 해야한다.

 

notifyAll()로 요리사 쓰레드의 기아현상을 막더라도, 손님 쓰레드까지 통지를 받아서 lock을 얻기위해 둘이 경쟁하게 된다. 이처럼 여러 쓰레드가 lock을 얻기위해 서로 경쟁하는 것을 '경쟁 상태(race condition)'이라고 한다.

 

Lock과 Condition을 이용한 동기화

lock클래스들을 이용해서 동기화하는 방법.

 

synchronized블럭으로 동기화하면 자동적으로 lock이 걸리고 풀려서 편하나, 같은 메서드 내에서만 lock을 걸 수 있다는 제약이 불편하다. 이럴 때 lock 클래스를 사용한다.

 

ReentrantLock : 재진입이 가능한 lock. 가장 일반적인 배타 lock
ReentrantReadWriteLock : 읽기에는 공유적이고, 쓰기에는 배타적인 lock
StampedLock : ReentrantReadWriteLock에 낙관적인 lock의 기능을 추가

 

ReentrantLock

  • 가장 일반적인 lock
  • 특정 조건에서 lock을 풀고 나중에 다시 lock을 얻는 형태
ReentrantLock() : 기본 생성자
ReentrantLock(boolean fair) : fair에 true를 주면 lock이 풀렸을 때 가장 오래 기다린 쓰레드가 lock을 획득.

void lock() : lock 잠금
void unlock(): lock 해지
boolean isLocked() : lock 잠금여부 반환.
boolean tryLock() : 다른 쓰레드에 의해 lock이 걸려 있으면 lock을 얻으려고 기다리지 않는다.
boolean tryLock(long timeout, TimeUnit unit) : 지정된 시간만큼만 기다린다.

수동으로 해야함.

 

 

ReentrantReadWriteLock

  • 읽기를 위한 lock과 쓰기를 위한 lock을 제공
  • 읽기 lock이 걸려있으면, 다른 쓰레드가 읽기 lock을 중복해서 걸고 읽기를 수행할 수 있음
    • 읽는 것은 내용이 변경되지 않으므로 여러 쓰레드가 수행에도 문제가 없음
  • 읽기 lock이 걸려있을 땐, 쓰기 lock 불가. 반대도 마찬가지.

 

StampedLock

  • lock을 걸거나 해지할 때 '스탬프(long타입의 정수값)'을 사용.
  • 위의 lock에서 '낙관적 읽기 lock(optimistic reading lock)'이 추가된 것.
  • 위와 달리 '낙관적 읽기 lock'은 쓰기 lock에 의해 바로 풀림. 풀리면 읽기 lock을 얻어서 다시 읽어야함.
  • 무조건 읽기 lock을 걸지 않고, 쓰기 읽기와 충돌할 때만 쓰기가 끝난 후에 읽기 lock을 거는 것.

 

ReentrantLock과 Condition

앞의 wait() & notify()의 문제점인 waiting pool 내 쓰레드를 구분하지 못한다는 것을 해결한 것이 Condition.

위의 예시에서 손님 쓰레드를 위한 Condition과 요리사 쓰레드를 위한 Condition을 만들어서 각각의 waiting pool에서 따로 기다리도록하여 해결 가능.

 

private ReentrantLock lock = new ReentrantLock();

// lock으로 condition 생성
private Condition forCook = lock.newCondition();
private Condition forCust = lock.newCondition();

그 다음에 wait() & notify() 대신 Condition의 await() & signal()을 사용하면 된다.

 

Object Condition
void wait() void await()
void awaitUninterruptibly()
void wait(long timeout) boolean await(long time, TimeUnit unit)
long      awaitNanos(long nanosTimeout)
boolean awaitUntil(Date deadline)
void notify() void signal()
void notifyAll() void signalAll()

 

최종예제

import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

class Customer implements Runnable {
	private Table table;
	private String food;

	Customer(Table table, String food) {
		this.table = table;  
		this.food  = food;
	}

	public void run() {
		while(true) {
			try { Thread.sleep(100);} catch(InterruptedException e) {}
			String name = Thread.currentThread().getName();
			
			table.remove(food);
			System.out.println(name + " ate a " + food);
		} // while
	}
}

class Cook implements Runnable {
	private Table table;
	
	Cook(Table table) {	this.table = table; }

	public void run() {
		while(true) {
			int idx = (int)(Math.random()*table.dishNum());
			table.add(table.dishNames[idx]);
			try { Thread.sleep(10);} catch(InterruptedException e) {}
		} // while
	}
}

class Table {
	String[] dishNames = { "donut","donut","burger" }; // donut의 확률을 높인다.
	final int MAX_FOOD = 6;
	private ArrayList<String> dishes = new ArrayList<>();

	private ReentrantLock lock = new ReentrantLock();
	private Condition forCook = lock.newCondition();
	private Condition forCust  = lock.newCondition();

	public void add(String dish) {
		lock.lock();

		try {
			while(dishes.size() >= MAX_FOOD) {
					String name = Thread.currentThread().getName();
					System.out.println(name+" is waiting.");
					try {
						forCook.await(); // wait(); COOK쓰레드를 기다리게 한다.
						Thread.sleep(500);
					} catch(InterruptedException e) {}	
			}

			dishes.add(dish);
			forCust.signal(); // notify();  기다리고 있는 CUST를 깨우기 위함.
			System.out.println("Dishes:" + dishes.toString());
		} finally {
			lock.unlock();
		}
	}

	public void remove(String dishName) {
		lock.lock(); //		synchronized(this) {	
		String name = Thread.currentThread().getName();

		try {
			while(dishes.size()==0) {
					System.out.println(name+" is waiting.");
					try {
						forCust.await(); // wait(); CUST쓰레드를 기다리게 한다.
						Thread.sleep(500);
					} catch(InterruptedException e) {}	
			}

			while(true) {
				for(int i=0; i<dishes.size();i++) {
					if(dishName.equals(dishes.get(i))) {
						dishes.remove(i);
						forCook.signal(); // notify();잠자고 있는 COOK을 깨움
						return;
					}
				} // for문의 끝

				try {
					System.out.println(name+" is waiting.");
					forCust.await(); // wait(); // CUST쓰레드를 기다리게 한다.
					Thread.sleep(500);
				} catch(InterruptedException e) {}	
			} // while(true)
			 // } // synchronized
		} finally {
			lock.unlock();
		}
	}

	public int dishNum() { return dishNames.length; }
}

class ThreadWaitEx4 {
	public static void main(String[] args) throws Exception {
		Table table = new Table();

		new Thread(new Cook(table), "COOK1").start();
		new Thread(new Customer(table, "donut"),  "CUST1").start();
		new Thread(new Customer(table, "burger"), "CUST2").start();
	
		Thread.sleep(2000);
		System.exit(0);
	}
}

 

참조

'Java의 정석' 책

댓글