What is a ring buffer?
단순한 single 스레드로 구현한다면 아래처럼 구현이 가능합니다.
void *buffer[16]; int in = 0, out = 0; void enqueue(void *value) { /* Add one item to the front of the queue*/ buffer[in] = value; in++; /* Advance the index for next time */ if (in == 16) in = 0; /* Wrap around! */ } void *dequeue() { /* Remove one item to the end of the queue.*/ void *result = buffer[out]; out++; if (out == 16) out = 0; return result; }
enqueue와 dequeue 모두 underflow나 overflow를 막아주지 않습니다. 즉, 가득 찬 큐에 넣거나, 빈 큐에서 제거하는 것이 가능합니다. 예를 뒤에서 위의 그림과 같은 Ring buffer에서 1부터 20까지 정수를 넣고 아무것도 제거하지 않는다면 17, 18, 19, 20은 1, 2, 3, 4를 덮어쓸 것입니다. 이러한 문제점을 바로 고치지는 않겠습니다. 대신 멀티스레드를 사용하는 버전을 만들어 보겠습니다. 이 때 queue-ing과 dequeue-ing 스레드는 링 버퍼가 가득 차거나 비었을 때 각각 block되어야 합니다.
What are gotchas of implementation a Ring Buffer?
void enqueue(void *value) b[ (in++) % N ] = value; }
Checking a multi-threaded implementation for correctness(Example 1)
void *b[16] int in = 0, out = 0 p_m_t lock sem_t s1, s2 void init() { sem_init(&s1,0,16) sem_init(&s2,0,0) } enqueue(void *value){ sem_wait(&s2) p_m_lock(&lock) b[ (in++) & (N-1) ] = value p_m_unlock(&lock) sem_post(&s1) } void *dequeue(){ sem_wait(&s1) p_m_lock(&lock) void *result = b[(out++) & 15] p_m_unlock(&lock) sem_post(&s2) return result; }
위 코드는 정확한 구현이 이루어지지 않았습니다. 어디서 문제가 발생할까요? 다음과 같은 부분을 생각해봅시다.
Enqueue/dequeue가 block되나요? / mutual exclusion을 만족합니까? / buffer underflow나 overflow가 발생하지 않을까요?
위의 코드에서는 좀 더 보기 쉽도록 pthread_mutex를 p_m으로 사용했습니다. sem_wait은 interrupt되지 않는다고 가정합시다.
분석
- s2의 초기값이 0입니다. 그러므로 버퍼가 비어있더라도 sem_wait을 처음 호출할 때 enqueue가 block됩니다.
- s1의 초기값이 16입니다. 그러므로 버퍼가 비어있어도 처음 sem_wait을 호출할 때 dequeue가 block되지 않습니다. 즉, underflow가 발생해 dequeue가 invalid data를 반환할 것입니다.
- 코드가 mutual exclusion을 만족하지 못합니다. 두 스레드가 in이나 out을 동시에 수정할 수 있습니다. 코드는 mutex lock을 사용하는 것처럼 보이지만, lock이 pthread_mutex_init()을 통해 초기화되지 않아 lock이 동작하지 않습니다.(pthread_mutex_lock은 아무런 동작도 하지 않습니다.)
Correct implementation of a ring buffer
#include <pthread.h> #include <semaphore.h> // N must be 2^i #define N (16) void *b[N] int in = 0, out = 0 p_m_t lock = PTHREAD_MUTEX_INITIALIZER sem_t countsem, spacesem void init() { sem_init(&countsem, 0, 0) sem_init(&spacesem, 0, 16) }
mutex lock은 global memory에 저장되어 PTHREAD_MUTEX_INITALIZER로 초기화됩니다. 만약 mutex를 위한 공간을 heap에 할당한다면 pthread_mutex_init(ptr, NULL)과 같이 사용합니다.
Enqueue method는 다음과 같습니다.
enqueue(void *value){ // wait if there is no space left: sem_wait( &spacesem ) p_m_lock(&lock) b[ (in++) & (N-1) ] = value p_m_unlock(&lock) // increment the count of the number of items sem_post(&countsem) }
-lock은 Critical section동안만 가지고 있습니다.(data structure에 접근시)
-완벽한 구현은 POSIX signal로 인해 sem_wait으로부터 early return하는 경우를 방지해야 합니다.
dequeue의 구현은 다음과 같습니다.
void *dequeue(){ // Wait if there are no items in the buffer sem_wait(&countsem) p_m_lock(&lock) void *result = b[(out++) & (N-1)] p_m_unlock(&lock) // Increment the count of the number of spaces sem_post(&spacesem) return result }
enqueue의 동기화 호출과의 대칭성을 비교해보세요. 두 경우 모두 count가 0이거나 space의 수와 같다면 일단 wait합니다.
생각할거리
sem_wait과 pthread_mutexlock의 호출 순서가 바뀐다면 어떤 일이 발생할까요>
'Angrave System Programming > Synchronization' 카테고리의 다른 글
Synchronization: Synchronization Across Processes (0) | 2019.02.07 |
---|---|
Synchronization: The Reader Writer Problem (0) | 2019.01.17 |
Synchronization: Implementing a barrier (0) | 2019.01.17 |
Synchronization: Condition Variables(2) (0) | 2019.01.16 |
Synchronization: Condition Variables(1) (1) | 2019.01.16 |