Thread Safe Stack

What is an atomic operation?

Wiki 내용을 보면,
시스템의 나머지 부분이 즉시 발생하는 것처럼 보이는 경우 atomic 또는 uninterruptible operation입니다.
lock이 없다면, 단순한 CPU instruction만이 atomic operation입니다(바이트를 메모리로부터 읽어오기 등). Single CPU system에서는 interrupt를 일시적으로 비활성화하지만(이렇게 함으로써 연속된 operation이 interrupt되지 않습니다), 사실은 이런 atomicity는 mutex lock같은 synchronization primitives를 이용해 얻을 수 있습니다.
라고 되어있습니다.

i++같이 increment해주는 것은 atomic하지 않습니다. 사실 이 과정은 3 가지로 나뉘기 때문입니다.

bit pattern을 메모리에서 CPU로 복사하기 ; CPU register를 사용해 계산하기; 계산한 bit pattern을 메모리에 다시 복사하기.

이러한 increment step 중에 다른 스레드나 프로세스가 이전 값을 읽거나 increment가 끝난 후에 덮어쓰기가 일어날 수도 있습니다.


How do I use mutex lock to make my data-structure thread-safe?

이 코스는 introduction입니다. 높은 효율의 thread-safe 자료구조를 쓰는 것은 하나의 책이 통채로 필요합니다. 다음은 thread-safe가 아닌 단순한 자료구조(stack)입니다.

// A simple fixed-sized stack (version 1)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];

void push(double v) { 
    values[count++] = v; 
}

double pop() {
    return values[--count];
}

int is_empty() {
    return count == 0;
}



스택 Version 1은 두 스레드가 동시에 push나 pop을 호출한다면 결과나 stack이 정확하지 않을 수 있어 thread-safe가 아닙니다. 예를 들어 두 스레드가 pop을 동시에 호출한다면 두 스레드는 같은 값을 읽어 둘 다 원래 count value를 읽을 것입니다.

이것을 thread-safe 자료구조로 바꾸려면 코드의 CS를 알아내야 합니다. (CS-critical section: 한 번에 한 스레드만 진입해야 하는 코드 부분) 위의 예에서면, push, pop, is_empty 함수가 같은 변수에 접근하고, 스택의 CS 전부입니다.


push(또는 pop)이 동작하는 동안, 자료구조는 불안정안 상태입니다.(예를 들면 count는 아직 쓰여지지 않아 여전히 원래 값을 가지고 있을 수도 있습니다) 이러한 method를 mutex로 둘러싸 한 번에 한 스레드에서 스택을 update(또는 read)할 수 있게 보장합니다.


다음은 위를 적용해 만든 정확한 코드 후보입니다. 정확할까요?아니면 틀릴까요?


// An attempt at a thread-safe stack (version 2)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];

pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;

void push(double v) { 
    pthread_mutex_lock(&m1);
    values[count++] = v;
    pthread_mutex_unlock(&m1);
}

double pop() {
    pthread_mutex_lock(&m2);
    double v = values[--count];
    pthread_mutex_unlock(&m2);

    return v;
}

int is_empty() {
    pthread_mutex_lock(&m1);
    return count == 0;
    pthread_mutex_unlock(&m1);
}



위 코드를 version 2라고 하면, 적어도 하나의 에러를 가지고 있습니다.  에러를 찾는 시간을 가지고 그 결과를 생각해봅시다.


만약 세개의 스레드가 push를 동시에 호출하면 m1 lock은 한 스레드만이 스택을 사용할 수 있게 보장합니다.(나머지 두 스레드는 첫 스레드가 완료되어 unlock하기 전까지 wait합니다. 그러고 나면 다음 스레드가 CS에 진입하고 그 스레드가 종료된 후에 세 번째 스레드가 진입하는 것을 허가받습니다.)


pop이 동시에 호출되는 것도 위와 비슷하게 적용되지만, version 2에서는 push와 pop이 서로 다른 mutex lock을 사용해 posh와 pop이 동시에 호출되는 경우를 방지하지 못합니다.


이것을 수정하는 것은 매우 단순합니다. push와 pop function에서 하나의 mutex lock을 사용하는 것입니다.


두 번째 에러로는 is_empty가 비교 후 return하고 나면 unlock이 되지 않는다는 것입니다. 하지만 이 에러는 바로 보이지 않을수도 있습니다. 예를 들면 하나의 스레드가 is_empty를 호출하고 다른 스레드가 push를 호출합니다. 이 스레드는 멈추게 될 것입니다. 디버거를 사용해 스레드가 push 안에 lock method에 막혀있다는 것을 발견할수 있습니다. 왜냐하면 이전에 is_empty에서 걸린 lock이 영원히 풀리지 않기 때문입니다. 그러므로 한 스레드에서 발생된 문제는 더 뒤에 임의의 다른 스레드에서 문제를 발생시킬 수 있습니다.


더 발전된 version 3는 아래와같습니다.


// An attempt at a thread-safe stack (version 3)
int count;
double values[count];
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

void push(double v) { 
  pthread_mutex_lock(&m); 
  values[count++] = v;
  pthread_mutex_unlock(&m);
}
double pop() {
  pthread_mutex_lock(&m);
  double v = values[--count];
  pthread_mutex_unlock(&m);
  return v;
}
int is_empty() {
  pthread_mutex_lock(&m);
  int result= count == 0;
  pthread_mutex_unlock(&m);
  return result;
}


Version 3는 thread-safe(모든 critical section에 대해서 mutual exclusion을 보장)입니다. 하지만 다음 2가지를 조심해야 합니다. 

  • is_empty는 thread-safe지만 결과는 이미 유효기간이 지났을 수도 있습니다. 예를 들어 thread가 result를 return하는 순간에 스택이 이미 비지 않았을 수도 있기 때문입니다.
  • underflow(빈 스택에서 pop)이나 overflow(가득 찬 스택에 push)에 대한 보호가 되어있지 않습니다.

뒤의 문제는 counting semaphore를 이용하면 고쳐질 수 있습니다.

이 구현은 하나의 스택을 가정합니다. 더 일반적은 목적의 버전은 mutex를 메모리 구조의 일부로 포함해 mutex를 초기화하기 위해 pthread_mutex_init을 사용할 수도 있습니다. 예를 들면

// Support for multiple stacks (each one has a mutex)
typedef struct stack {
  int count;
  pthread_mutex_t m; 
  double *values;
} stack_t;

stack_t* stack_create(int capacity) {
  stack_t *result = malloc(sizeof(stack_t));
  result->count = 0;
  result->values = malloc(sizeof(double) * capacity);
  pthread_mutex_init(&result->m, NULL);
  return result;
}
void stack_destroy(stack_t *s) {
  free(s->values);
  pthread_mutex_destroy(&s->m);
  free(s);
}
// Warning no underflow or overflow checks!

void push(stack_t *s, double v) { 
  pthread_mutex_lock(&s->m); 
  s->values[(s->count)++] = v; 
  pthread_mutex_unlock(&s->m); }

double pop(stack_t *s) { 
  pthread_mutex_lock(&s->m); 
  double v = s->values[--(s->count)]; 
  pthread_mutex_unlock(&s->m); 
  return v;
}

int is_empty(stack_t *s) { 
  pthread_mutex_lock(&s->m); 
  int result = s->count == 0; 
  pthread_mutex_unlock(&s->m);
  return result;
}


이 자료구조의 사용 예는 다음과 같습니다.


int main() {
    stack_t *s1 = stack_create(10 /* Max capacity*/);
    stack_t *s2 = stack_create(10);
    push(s1, 3.141);
    push(s2, pop(s1));
    stack_destroy(s2);
    stack_destroy(s1);
}










Posted by 몰랑&봉봉
,