How do I wait for N threads to reach a certain point before continuing onto the next step?

두 stage로 동작하는 멀티 스레드 계산을 수행한다고 생각해봅시다. 이 때 첫번째 stage가 완료될 때까지 두 번째 stage로 진행하고 싶지 않다면 어떻게 해야할까요?

이 때 barrier라고 불리는 synchronization method를 사용합니다. 스레드가 배리어에 도달하면, 그 barrier에서 모든 스레드가 도착할 때까지 기다립니다. 모두 배리어에 도달한다면 그 후에 진행이 시작됩니다.

이것은 친구들과 하이킹을 하는 것과 비슷합니다. 서로는 각 언덕에서 기다리기로 약속합니다. 만약 당신이 먼저 첫 언덕에 가장 먼저 도착했다고 생각해봅시다. 그 정상에서 친구들을 기다려야 합니다. 한명 한명씩 정상에 도달하고, 그들 중 아무도 먼저 나아가지 않습니다. 모두 언덕에 도달하게 되면 다같이 나아갑니다.

pthread는 이러한 일을 수행하기 위해 pthread_barrier_wait()이란 함수를 가지고 있습니다. pthread_barrier_t 변수를 선언해주고 pthread_barrier_init()을 통해 초기화시켜 줍니다. pthread_barrier_init()은 인자로 배리어에 참가하는 스레드의 수를 받습니다.

https://github.com/angrave/SystemProgramming/wiki/Sample-program-using-pthread-barriers


위는 이러한 사용법의 예입니다.


이제 배리어을 사용해 모른 스레드들이 큰 계산에서 sync할수 있도록 구현해 봅시다.


double data[256][8192]

1 Threads do first calculation (use and change values in data)

2 Barrier! Wait for all threads to finish first calculation before continuing

3 Threads do second calculation (use and change values in data)


스레드 함수는 4개의 주요 파트로 이루어져 있습니다.


void *calc(void *arg) {
    /* Do my part of the first calculation */
    /* Am I the last thread to finish? If so wake up all the other threads! */
    /* Otherwise wait until the other threads has finished part one */
    /* Do my part of the second calculation */
}



main 스레드에서는 16개의 스레드를 생산해 16개로 계산을 나누어 줍니다. 각각의 스레드는 특정한 값을 받게 되고, 스스로의 블록을 가지고 작업합니다. void* type은 작은 integer를 저장할 수 있기 때문에 i값을 void pointer로 캐스팅해 넘겨줄 것입니다.


#define N (16)
double data[256][8192];
int main() {
    pthread_t ids[N];
    for (int i = 0; i < N; i++)  
        pthread_create(&ids[i], NULL, calc, (void *) i);


이 포인터 값을 실제 메모리처럼 역참조하지 않습니다.  바로 그 값은 정수로 cast합니다.


void *calc(void *ptr) {
// Thread 0 will work on rows 0..15, thread 1 on rows 16..31
    int x, y, start = N * (int) ptr;
    int end = start + N; 
    for (x = start; x < end; x++) for (y = 0; y < 8192; y++) { /* do calc #1 */ }


첫번째 계산 stage가 끝나면, 제일 늦게 도착한 스레드가 아니라면 다른 스레드들을 기다려야 합니다. 그러므로 checkpoint라고 알려진 배리어에 도착한 스레드의 수를 추적해야 합니다.

// Global: 
int remain = N;


// After calc #1 code:
remain--; // We finished
if (remain == 0) { /*I'm last!  -  Time for everyone to wake up! */ }
else {
    while (remain != 0) { /* spin spin spin*/ }
}



하지만 위의 이 코드는 race condition을 가지고 있고 busy loop를 가지고 있습니다. condition variable을 사용해 broadcast/signal 함수를 이용하면 다른 sleeping 스레드들을 깨울 수 있습니다.


condition variable은 집과 같습니다. 스레드들은 condition variable에 가서 잠이 듭니다.(pthread_cond_wait) 그 중에 한 명만 깨울 수도 있고,(pthread_cond_signal), 모두를 깨울 수도 있습니다(pthread_cond_broadcast). 현재 waiting중인 스레드가 없다면 아무런 일도 일어나지 않습니다.


condition variable version은 종종 busy loop의 부정확한 해결책(아래에 소개)과 유사합니다.

첫 번째로 mutex와 condition variable을 추가하고 main에서 초기화해줍니다.


//global variables
pthread_mutex_t m;
pthread_cond_t cv;

int main() {
    pthread_mutex_init(&m, NULL);
    pthread_cond_init(&cv, NULL);


remain을 한 번에 한 스레드만이 수정할 수 있도록 mutex를 사용합니다. 마지막으로 도착하는 스레드는 잠들어 있는 다른 모든 스레드를 깨웁니다. 즉, pthread_cond_signal이 아닌 pthread_cond_broadcast를 사용합니다.


pthread_mutex_lock(&m);
remain--; 
if (remain == 0) { pthread_cond_broadcast(&cv); }
else {
    while (remain != 0) { pthread_cond_wait(&cv, &m); }
}
pthread_mutex_unlock(&m);


스레드가 pthread_cond_wait에 진입하면, mutex를 release하고 sleep합니다. 나중에 이 스레드는 깨워지게 됩니다.  일단 스레드가 깨워지게 되면 반환되기 전에 mutex를 lock할수 있도록 기다려야 합니다. sleeping thread가 먼저 일어나도, while loop안에서 condition을 확인해 원한다면 다시 wait합니다.


위의 barrier는 다시 사용할 수 없습니다. 만약 어떤 계산 loop를 넣는다면 deadlck이나 race condition이 될 수 있을 가능성이 높아집나다. 얼마나 위의 barrier를 재사용 가능하게 만들수 있을지 생각해 봅시다. 어떻게 여러개의 스레드가 loop안에서 barrier_wait을 호출하여 같은 iteration안에 있는지를 확인할 수 있는 방법을 생각해 봅시다.


Posted by 몰랑&봉봉
,