운영체제 8편 - 동기화의 고전적인 문제들

이 글은 학부 운영체제 수업을 듣고 정리한 글입니다.
맥락없이 운영체제에 대한 내용들이 불쑥 등장하니 양해부탁드립니다.

이번 편은 동기화(synchronization)에 이어 동기화의 고전적인 문제들에 대한 내용입니다.

동기화에 대한 고전적인 대표적인 문제들이 몇가지 있는데 이중 2가지를 알아볼 것이다.

  • Bounded-buffer problem
  • Readers and Writers problem

Bounded-buffer problem

이는 간단하게 표현하면 N개의 buffer에 여러 producer와 consumer가 동시에 접근하는 문제이다.
몇가지 용어부터 정리해보자.

  • 생산자(Producer)
    한번에 하나의 아이템을 생성해 buffer에 저장한다. 다만 이미 buffer의 상태가 full이면 대기한다.
  • 소비자(Consumer)
    Buffer에서 하나의 아이템을 가져온다. 다만 buffer가 비어있다면 대기한다.

Bounded-buffer problem

이 문제는 주변에서도 많이 찾아볼 수 있는데 패킷을 예로 들 수 있겠다. 패킷이 계속 들어올때 ethernet을 사용한다고 가정한다면 ethernet driver가 패킷을 하나씩 저장해준다. 이들이 buffer에 올라오면 그다음 부터는 Kernel의 역할이다. Kernel이 buffer를 읽어야한다. 이를 읽을때 어떤 것을 동기화를 해야하는가. 어떤 자료구조를 사용해야 하는가 등이 이문제를 해결하는데 중요한 요점이다.

여기에서는 Producer와 Consumer가 충돌없이 buffer에 접근할 수 있도록 해야하므로, Buffer에 접근하는 부분이 critical section이 되겠다.

여러개의 Producer가 동시에 produce할 수 없고, Producer와 Consumer가 동시에 produce, consume 할 수 없다. 또한 여러개의 Consumer도 동시에 consume할 수 없다.
이를 여러개의 적절한 자료구조와 알고리즘을 통해 해결해보자.

이는 3가지 semaphore를 사용해서 구현할 수 있다. (다른 구현방법도 많다.)
우선 buffer는 간단하게 N의 크기를 가진 배열로 하자. Producer는 buffer가 full이 아니라면 새로운 item을 추가한다. Consumer는 buffer가 empty가 아니라면 item을 소비한다.

문제를 해결하기 위해 도입한 3가지 semaphore와 그들의 역할은 다음과 같다.

  • empty
    버퍼내에 저장할 공간을 의미한다. Producer들의 진입을 관리한다. buffer의 크기가 N이므로 초기값은 N으로 하여 0이 될때까지 Producer가 진입할 수 있다.
  • full
    버퍼내에 소비할 아이템이 있는지를 의미한다. Consumer들의 진입을 관리한다. 초기값은 0으로 하여 비어있을때는 Consumer가 진입하지 못한다.
  • Mutex
    buffer의 변경을 위한 semaphore이다. 초기값은 1로하여 동시에 1개의 Producer or Consumer만 진입할 수 있다.

pseudo 코드를 바로 살펴보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Producer:
do {
// 버퍼에 최소 하나의 공간이 생기기를 기다린다.
wait(empty);
wait(mutex);

[produce an item in buffer]

signal(mutex);
// 버퍼에 아이템이 있음을 알린다.
signal(full);
} while (1);


Consumer:
do {
// 버퍼에 적어도 하나의 아이템이 채워지기를 기다린다.
wait(full);
wait(mutex);

[consume item from buffer]

signal(mutex);
// 버퍼에 하나의 빈 공간이 생겼음을 알린다.
signal(empty);
} while (1);

이를 자바로 구현한 코드도 살펴보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import java.util.concurrent.Semaphore;

public class Buffer {
private String[] buffer;
private int index;

public Buffer(int size) {
this.buffer = new String[size];
}

public void add(String message) {
if (index >= buffer.length) {
throw new RuntimeException("buffer is full");
}

buffer[index++] = message;
}

public String pop() {
if (index == 0) {
throw new RuntimeException("buffer is empty");
}

return buffer[--index];
}
}

public class Producer {
private int counter;
private final Buffer buffer;
private final Semaphore empty;
private final Semaphore mutex;
private final Semaphore full;

public Producer(Buffer buffer,
Semaphore empty,
Semaphore mutex,
Semaphore full) {
this.buffer = buffer;
this.empty = empty;
this.mutex = mutex;
this.full = full;
}

public void produce(String message) {
try {
empty.acquire();
mutex.acquire();

buffer.add(message + "-" + counter);
counter++;

mutex.release();
full.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

public class Consumer {
private final Buffer buffer;
private final Semaphore empty;
private final Semaphore mutex;
private final Semaphore full;

public Consumer(Buffer buffer,
Semaphore empty,
Semaphore mutex,
Semaphore full) {
this.buffer = buffer;
this.empty = empty;
this.mutex = mutex;
this.full = full;
}

public String consume() {
try {
full.acquire();
mutex.acquire();

String message = buffer.pop();
System.out.println("pop " + message);

mutex.release();
empty.release();

return message;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

public class Main {
public static void main(String args[]) {
final int size = 3;
Buffer buffer = new Buffer(size);
Semaphore emptySemaphore = new Semaphore(size);
Semaphore mutexSemaphore = new Semaphore(1);
Semaphore fullSemaphore = new Semaphore(0);
Producer producer1 = new Producer(
buffer, emptySemaphore, mutexSemaphore, fullSemaphore);
Producer producer2 = new Producer(
buffer, emptySemaphore, mutexSemaphore, fullSemaphore);
Consumer consumer1 = new Consumer(
buffer, emptySemaphore, mutexSemaphore, fullSemaphore);

new Thread(() -> IntStream.range(0, 100).forEach(i -> producer1.produce("producerThread1"))).start();
new Thread(() -> IntStream.range(0, 100).forEach(i -> producer2.produce("producerThread2"))).start();
new Thread(() -> {
while (true) {
consumer1.consume();
}
}).start();
}
}

producer1producer2는 각각 100번씩 buffer에 문자열 item을 채워넣는다.
그리고 buffer를 consumer가 계속해서 소비하도록 한다.

Readers And Writers Problem

이는 여러개의 Reader와 Writer가 하나의 공유데이터를 읽거나 쓰기위해 접근하는 문제를 의미한다.

몇가지 용어들을 정리해보자.

  • Reader
    여러 Reader들은 동시에 데이터를 읽을 수 있다.
  • Writer
    Reader 혹은 다른 Writer가 데이터에 접근하고 있으면 write를 할 수 없다.

Readers Writers problem

즉 여러개의 reader는 동시에 critical section에 접근할 수 있지만, writer는 다른 reader나 writer가 접근하지 않을때 write 할 수 있다. Reader도 writer가 현재 critical section에 접근중이면 진입할 수 없다. 앞에서 보았던 Bounded-buffer problem에서는 reader와 writer가 각각 한번만 들어가는 것을 허용했는데 Readers Writers problem에서는 여러 reader들의 접근을 허용한다.
이를 여러개의 적절한 자료구조와 알고리즘을 통해 해결해보자.

이는 2가지 semaphore와 정수형 자료구조를 사용해서 구현할 수 있다.

문제를 해결하기 위해 도입한 자료구조들의 역할은 다음과 같다.

  • int readCount
    버퍼에 현재 접근하고 있는 reader들의 개수를 의미한다. reader 개수가 0이되어야 writer가 접근할 수 있다. 초기값은 0으로 설정한다.
  • Semaphore write
    Writer간의 동기화를 위한 semaphore이다. 초기값은 1로 두고, 1일때에만 writer가 접근할 수 있다.
  • Semaphore mutex
    readCount에 대한 접근과 write semaphore에 대한 접근이 원자적으로 수행되기위한 semaphore이다. 초기값은 1로 설정한다.

pseudo 코드를 바로 살펴보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Writer:
wait(write);
[write]
signal(write);


Reader:
wait(mutex);
readCount++;
// Reader가 최소 1개 진입해있을때 writer의 접근을 막는다.
if (readCount == 1) {
wait(write);
}
signal(mutex);

[read]

wait(mutex);
readCount--;
// 아무런 Reader 도 진입해있지 않으므로 writer의 접근을 허용한다.
if (readCount == 0) {
signal(write);
}
signal(mutex);

이를 자바를 사용하여 구현한 코드도 살펴보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.util.concurrent.Semaphore;

public class Data {
private int number;

public int getNumber() {
return number;
}

public void setNumber(int number) {
this.number = number;
}
}

public class Writer {
private int counter;
private final Semaphore writer;
private final Data data;

public Writer(Semaphore writer, Data data) {
this.writer = writer;
this.data = data;
}

public void write() {
try {
writer.acquire();
data.setNumber(++counter);
System.out.println("Write data " + counter + " in thread " + Thread.currentThread().getName());
writer.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

public class Reader {
private static int readCount;
private final Semaphore writer;
private final Semaphore mutex;
private final Data data;

public Reader(Semaphore writer, Semaphore mutex, Data data) {
this.writer = writer;
this.mutex = mutex;
this.data = data;
}

public void read() {
try {
mutex.acquire();
readCount++;
if (readCount == 1) {
writer.acquire();
}
mutex.release();

int num = data.getNumber();
System.out.println("Read data " + num + " in thread " + Thread.currentThread().getName());

mutex.acquire();
readCount--;
if (readCount == 0) {
writer.release();
}
mutex.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

public class Main {
private static void sleep(long timeMillis) {
try {
Thread.sleep(timeMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public static void main(String args[]) {
Data data = new Data();
Semaphore writer = new Semaphore(1);
Semaphore mutex = new Semaphore(1);

Reader reader1 = new Reader(writer, mutex, data);
Reader reader2 = new Reader(writer, mutex, data);
Writer writer1 = new Writer(writer, data);

Thread readerThread1 = new Thread(() -> IntStream.range(0, 20).forEach(i -> {
reader1.read();
sleep(5);
}));
readerThread1.setName("readerThread-0");
Thread readerThread2 = new Thread(() -> IntStream.range(0 ,20).forEach(i -> {
reader2. read();
sleep(5);
}));
readerThread2.setName("readerThread-1");
Thread writerThread1 = new Thread(() -> IntStream.range(0, 50).forEach(i -> {
writer1.write();
sleep(10);
}));
writerThread1.setName("writerThread-0");
readerThread1.start();
readerThread2.start();
writerThread1.start();
}
}

이에 대한 출력은 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Read data 0 in thread readerThread-1
Read data 0 in thread readerThread-0
Write data 1 in thread writerThread-0
Read data 1 in thread readerThread-1
Read data 1 in thread readerThread-0
Read data 1 in thread readerThread-1
Read data 1 in thread readerThread-0
Write data 2 in thread writerThread-0
Read data 2 in thread readerThread-1
Read data 2 in thread readerThread-0
Read data 2 in thread readerThread-1
Read data 2 in thread readerThread-0
Write data 3 in thread writerThread-0
Read data 3 in thread readerThread-1
...
...

다만 이와같이 구현을 하게되면 Writer가 starve할 수 있는 문제가 있다. 왜냐하면 readCount가 0일때에만 writer가 접근할 수 있으므로 여러 reader들이 계속 접근을 하게되면 writer는 접근할 수 있는 기회를 얻지 못하게 된다.

동기화 알고리즘의 설계

실제 실무에서도 동기화 문제에 부딪혀 이를 해결해야 하는 경우가 빈번하다.
이를 해결하기 위해 다음과 같은 사항을 고려하며 생각해보면 도움이 될 수 있겠다.

먼저 동기화 문제를 동작조건과 함께 서술해보자. 어떤 조건이 있어야하는지? 어떤 접근형태를 가지는지를 먼저 정리해보자.
그 다음에 사용할 semaphore를 설계한다. 몇개를 사용할지? 이 semaphore가 왜 필요할지, 어떤 대상을 protect할지 사고해보고 결정한다. 또한 각 semaphore에 알맞는 초기값을 설계해야한다.
그리고 알고리즘을 설계한 후 검토한다. 다음 사항을 검토해보자.

  • Data consistency가 확보되는지
  • Deadlock이 발생하는지
  • Starvation 가능성이 있는지
  • Concurrency를 얼마나 제공하는지


댓글