What is a ring buffer?

링 버퍼는 연속된 메모리를 원형처럼 취급하는 단순하고 보통 크기가 정해진 저장 방식입니다. 시작과 끝 두 개의 인덱스를 가지고 있습니다. array는 인덱싱이 원형이 아니므로 array의 끝에 도달했을 때 다시 index count가 0으로 돌아와야 합니다. 큐의 앞에 데이터가 추가되거나 끝에서 제거된 경우, 현재 item들은 트랙을 따라 원을 그리는 열차 모양을 하게 됩니다.


단순한 single 스레드로 구현한다면 아래처럼 구현이 가능합니다.


void *buffer[16];
int in = 0, out = 0;

void enqueue(void *value) { /* Add one item to the front of the queue*/
  buffer[in] = value;
  in++; /* Advance the index for next time */
  if (in == 16) in = 0; /* Wrap around! */
}

void *dequeue() { /* Remove one item to the end of the queue.*/
  void *result = buffer[out];
  out++;
  if (out == 16) out = 0;
  return result;
}


enqueue와 dequeue 모두 underflow나 overflow를 막아주지 않습니다. 즉, 가득 찬 큐에 넣거나, 빈 큐에서 제거하는 것이 가능합니다. 예를 뒤에서 위의 그림과 같은 Ring buffer에서 1부터 20까지 정수를 넣고 아무것도 제거하지 않는다면 17, 18, 19, 20은 1, 2, 3, 4를 덮어쓸 것입니다. 이러한 문제점을 바로 고치지는 않겠습니다. 대신 멀티스레드를 사용하는 버전을 만들어 보겠습니다. 이 때 queue-ing과 dequeue-ing 스레드는 링 버퍼가 가득 차거나 비었을 때 각각 block되어야 합니다.


What are gotchas of implementation a Ring Buffer?

enqueue와 dequeu를 다음과 같은 압축된 형식으로 사용할 생각이 들지 않았나요?
void enqueue(void *value)
  b[ (in++) % N ] = value;
}

위와 같은 코드는 정삭적으로 동작할 것처럼 보이지만 버그가 존재합니다. in이 충분히 커져서 int의 양수 boundery를 벗어나면 overflow가 발생해 음수가 됩니다.  %연산자는 부호를 그대로 유지합니다. 그러므로 b[-14]와 같은 잘못된 접근을 할 수 있습니다.
또한 아직 overflow와 underflow를 방지하지 못합니다. 이것을 하기 위해 멀티스레드로 구현하는 시도를 해 보겠습니다. 이 때 공간이 생기거나 하나이상의 아이템이 생길 때까지 block 되도록 만듭니다.

Checking a multi-threaded implementation for correctness(Example 1)

void *b[16]
int in = 0, out = 0
p_m_t lock
sem_t s1, s2
void init() {
    sem_init(&s1,0,16)
    sem_init(&s2,0,0)
}

enqueue(void *value){

 sem_wait(&s2)
 p_m_lock(&lock)

 b[ (in++) & (N-1) ] = value

 p_m_unlock(&lock)
 sem_post(&s1)
}

void *dequeue(){
  sem_wait(&s1)
  p_m_lock(&lock)
  void *result = b[(out++) & 15]
  p_m_unlock(&lock)
  sem_post(&s2)

  return result;
}


위 코드는 정확한 구현이 이루어지지 않았습니다. 어디서 문제가 발생할까요? 다음과 같은 부분을 생각해봅시다.

Enqueue/dequeue가 block되나요? / mutual exclusion을 만족합니까? / buffer underflow나 overflow가 발생하지 않을까요? 

위의 코드에서는 좀 더 보기 쉽도록 pthread_mutex를 p_m으로 사용했습니다. sem_wait은 interrupt되지 않는다고 가정합시다.


분석

  • s2의 초기값이 0입니다. 그러므로 버퍼가 비어있더라도 sem_wait을 처음 호출할 때 enqueue가 block됩니다.
  • s1의 초기값이 16입니다. 그러므로 버퍼가 비어있어도 처음 sem_wait을 호출할 때 dequeue가 block되지 않습니다. 즉, underflow가 발생해 dequeue가 invalid data를 반환할 것입니다.
  • 코드가 mutual exclusion을 만족하지 못합니다. 두 스레드가 in이나 out을 동시에 수정할 수 있습니다. 코드는 mutex lock을 사용하는 것처럼 보이지만, lock이 pthread_mutex_init()을 통해 초기화되지 않아 lock이 동작하지 않습니다.(pthread_mutex_lock은 아무런 동작도 하지 않습니다.)

Correct implementation of a ring buffer

아래는 pseudo 코드입니다. 
#include <pthread.h>
#include <semaphore.h>
// N must be 2^i
#define N (16)

void *b[N]
int in = 0, out = 0
p_m_t lock = PTHREAD_MUTEX_INITIALIZER
sem_t countsem, spacesem

void init() {
  sem_init(&countsem, 0, 0)
  sem_init(&spacesem, 0, 16)
}

mutex lock은 global memory에 저장되어 PTHREAD_MUTEX_INITALIZER로 초기화됩니다. 만약 mutex를 위한 공간을 heap에 할당한다면 pthread_mutex_init(ptr, NULL)과 같이 사용합니다.



Enqueue method는 다음과 같습니다.

enqueue(void *value){
 // wait if there is no space left:
 sem_wait( &spacesem )

 p_m_lock(&lock)
 b[ (in++) & (N-1) ] = value
 p_m_unlock(&lock)

 // increment the count of the number of items
 sem_post(&countsem)
}

-lock은 Critical section동안만 가지고 있습니다.(data structure에 접근시)

-완벽한 구현은 POSIX signal로 인해 sem_wait으로부터 early return하는 경우를 방지해야 합니다.


dequeue의 구현은 다음과 같습니다.


void *dequeue(){
  // Wait if there are no items in the buffer
  sem_wait(&countsem)

  p_m_lock(&lock)
  void *result = b[(out++) & (N-1)]
  p_m_unlock(&lock)

  // Increment the count of the number of spaces
  sem_post(&spacesem)

  return result
}

enqueue의 동기화 호출과의 대칭성을 비교해보세요. 두 경우 모두 count가 0이거나 space의 수와 같다면 일단 wait합니다.




생각할거리


pthread_mutex_lock과 sem_post의 호출이 바뀐다면 어떤 일이 발생할까요?

sem_wait과 pthread_mutexlock의 호출 순서가 바뀐다면 어떤 일이 발생할까요>


Posted by 몰랑&봉봉
,