Unix2019b/Внутренняя реализация потоков
Содержание
«Сырое» создание потока
Мы умеем создавать процессы, используя только системный вызов fork. Спрашивается, можно ли создать вручную поток, не используя библиотеку POSIX Threads. Практической ценности этот навык не несёт.
clone()
Этот системный вызов специфичен для ОС Linux и не входит в стандарт POSIX.
#include <sched.h> int clone(int (*fn)(void *), void *child_stack, int flags, void *arg, ... /* pid_t *ptid, struct user_desc *tls, pid_t *ctid */ );
В результате он создаёт новый runnable entity. Но можно более тонко настроить, что будет скопировано, а что будет общим у дочернего и родительского entity: адресное пространство, таблица дескрипторов, таблица обработчиков сигналов. В зависимости от опций получится или «процесс», или «поток».
В Linux вызов fork() реализуется через clone().
NPTL для создания потоков делает вызов clone() с параметрами
CLONE_VM | CLONE_FILES | CLONE_FS | CLONE_SIGHAND | CLONE_THREAD | CLONE_SETTLS | CLONE_PARENT_SETTID | CLONE_CHILD_CLEARTID | CLONE_SYSVSEM
В прикладном коде почти никогда не нужно вызывать clone() вручную для создания потоков. Лучше использовать pthreads, это переносимо и гораздо удобнее.
Пример
#define _GNU_SOURCE #include <stdio.h> #include <unistd.h> #include <sched.h> #include <stdlib.h> #include <sys/types.h> #include <sys/wait.h> volatile int g_counter = 0; #define STACK_SIZE 4096 int ThreadFunc(void* ptr) { volatile int* shouldTerminate = ptr; while (!*shouldTerminate) { printf("[thread] = %d\n", g_counter); sleep(1); } printf("[thread] Terminating...\n"); return 0; } int main() { printf("My pid: %d\n", getpid()); void* stack = malloc(STACK_SIZE); if (stack == NULL) { perror("Unable to allocate thread stack"); return 1; } int shouldTerminate = 0; int flag = SIGCHLD | CLONE_SIGHAND | CLONE_FS | CLONE_VM | CLONE_FILES; pid_t tid = clone(ThreadFunc, stack + STACK_SIZE, flag, &shouldTerminate); if (tid == -1) { perror("clone()"); return 1; } printf("Done! Thread pid: %d\n", tid); while (1) { if (getchar() == EOF) { shouldTerminate = 1; break; } g_counter++; } printf("Waiting for thread...\n"); int status; if (waitpid(tid, &status, __WALL) == -1) { perror("waitpid()"); } free(stack); printf("Done! Graceful shutdown.\n"); return 0; }
В примере обратите внимание на то, что раз на архитектуре x86 стек растёт вниз, то в clone передаётся указатель на конец массива, используемого под стек, а не указатель на начало.
Атомарные операции
Типичный пример гонки
Допустим, есть глобальная переменная
volatile int count;
и два потока, каждый поток 100 млн раз увеличивает на единицу её значение:
void* ThreadFunc(void* arg) { for (int i = 0; i < N; ++i) { ++count; } return NULL; }
Ожидаем в результате получить 200 млн.
Запускаем, работает быстро, но неправильно:
$ time ./a.out Result = 102494218 real 0m0.315s user 0m0.500s sys 0m0.000s
Посмотрим, что там в ассемблере:
0000000000400c80 <ThreadFunc>: 400c80: ba 00 e1 f5 05 mov edx,0x5f5e100 400c85: 0f 1f 00 nop DWORD PTR [rax] 400c88: 8b 05 92 d4 2d 00 mov eax,DWORD PTR [rip+0x2dd492] # 6de120 <count> 400c8e: 83 c0 01 add eax,0x1 400c91: 83 ea 01 sub edx,0x1 400c94: 89 05 86 d4 2d 00 mov DWORD PTR [rip+0x2dd486],eax # 6de120 <count> 400c9a: 75 ec jne 400c88 <ThreadFunc+0x8>
Понятно: сначала чтение из памяти в регистр, потом инкремент регистра, потом запись в память. Имеет место race condition:
Защитим нашу переменную с помощью мьютекса.
void* ThreadFunc(void* arg) { for (int i = 0; i < N; ++i) { pthread_mutex_lock(&mutex); ++count; pthread_mutex_unlock(&mutex); } return NULL; }
Работает правильно, но очень медленно.
Result = 200000000 real 0m10.913s user 0m12.500s sys 0m8.719s
Обманчивая атомарность на процессорах x86
Казалось бы, в машинном коде отдельная инструкция неделима. При прерывании, при переключении контекста всегда какая-то инструкция или выполнена, или нет. Но на самом деле далеко не все инструкции атомарны. Архитектура предоставляет специальные атомарные инструкции.
Попробуем в нашем примере сделать инкремент одной инструкцией при помощи inline-ассемблера.
asm("incl count");
Однако это не помогает:
Result = 103679802 real 0m0.286s user 0m0.500s sys 0m0.000s
Выясняется, что обычные инструкции read-modify-write (например add или inc) работают неатомарно.
Атомарные инструкции
К атомарным инструкциям на x86 относят:
- чтение/запись байта;
- чтение/запись 2/4/8 байт по адресу, выровненному на 2/4/8;
- невыровненное обращение к закешированной памяти, попадающее в линию кеша;
- read-modify-write-инструкции с префиксом lock (add, sub, dec, inc, bts, btr, btc);
- exchange-инструкции с префиксом lock;
- compare-and-exchange-инструкции с префиксом lock.
Попробуем добавить префикс lock [1]. Он пишется перед инструкцией и имеет в бинарном виде код 0xf0.
asm("lock incl count");
Это приводит к верному результату (существенно быстрее, чем с мьютексом, но медленнее, чем с обычными инструкциями без локов):
Result = 200000000 real 0m2.994s user 0m5.906s sys 0m0.016s
Кроме того, есть такие инструкции.
- xchg dst, src. Производит обмен двух значений.
- xadd dst, src. Эквивалент src = dst, dst = (old)src + dst.
- cmpxchg dst, src. Сравнивает dst и rax, если равны, то записывает src в dst и выставляет флаг ZF, иначе сохраняет dst в rax.
Последняя инструкция лежит в основе высокоуровневой операции CAS (compare and swap) — сравнить значение в памяти с одним из аргументов, и в случае успеха записать второй аргумент в память. На основе этой операции строятся разные неблокирующие алгоритмы.
В стандарте C11
UNIX Portable Atomic Operations
Спрашивается, можно ли пользоваться atomic-операциями из C, не привлекая ассемблерные вставки.
Стало можно:
- в стандарте C11 ввели atomic types;
- в стандарте C++11 ввели std::atomic.
Барьеры памяти
Барьер (англ. memory barrier, memory fence) — которая приказывает компилятору (при генерации инструкций) и центральному процессору (при исполнении инструкций) устанавливать строгую последовательность между обращениями к памяти до и после барьера. Это означает, что все обращения к памяти перед барьером будут гарантированно выполнены до первого обращения к памяти после барьера.
Барьеры памяти необходимы, так как большинство современных процессоров используют оптимизации производительности, которые могут привести к переупорядочиванию инструкций. Также переупорядочивание обращений к памяти может быть вызвано компилятором в процессе оптимизации использования регистров целевого процессора. Такие перестановки обычно не влияют на корректность программы с одним потоком исполнения, но могут вызвать непредсказуемое поведение в многопоточных программах.
Абстрактный пример
Следующая программа исполняется на двух процессорах.
Изначально ячейки памяти x
и f
содержат значение 0
. Программа в процессоре #1 находится в цикле пока f
равен нулю, затем она печатает значение x
. Программа в процессоре #2 записывает значение 42
в x
, а затем сохраняет значение 1
в f
. Псевдокод для двух программных фрагментов:
Процессор #1:
while (f == 0) { } // Здесь необходим барьер print x;
Процессор #2:
x = 42; // Здесь необходим барьер f = 1;
Хотя ожидается, что print
всегда напечатает «42», но если процессор #2 изменит порядок исполнения инструкций и вначале изменит значение f
то print может вывести «0». Аналогично, процессор #1 может прочитать x
перед f
, и print снова выведет неожидаемое значение. Для большинства программ, ни одна из этих ситуаций не приемлема. Барьер памяти для процессора #2 может быть вставлен перед изменением значения f
. Также можно вставить барьер для процессора #1 перед чтением x
В процессорах x86
Архитектура предоставляет следующие инструкции:
- SFENCE (англ. store fence),
- LFENCE (англ. load fence),
- MFENCE (англ. memory fence).
При программировании на С
Барьеры памяти работают только на аппаратном уровне. Компиляторы могут также переупорядочить инструкции как часть оптимизации программы. Меры по предотвращению переупорядочивания необходимы только для данных, которые не защищены примитивами синхронизации.
В языках С и С++, ключевое слово volatile предназначено для исключения оптимизаций компилятора. Используется чаще всего для работы с отображаемым в память вводом-выводом. Однако данное ключевое слово (в отличие от Java) никак не обеспечивает атомарность и защиту от внеочередного исполнения. Доступы к volatile-переменным не могут быть переупорядочены, но volatile не влияет на порядок обращения к другим переменным.
Конструкция вида asm volatile ("" ::: "memory"); для gcc работает как программный барьер, действует только на этапе компиляции и не генерирует никаких инструкций процессору. Она предотвращает перестановку инструкций компилятором.
Встроенная функция __sync_synchronize(), наоборот, вынуждает компилятор вставить барьерную инструкцию для процессора.
При использовании примитивов синхронизации
Многопоточные программы обычно используют примитивы синхронизации, предоставляемые высокоуровневым API, таким как POSIX Threads. Примитивы синхронизации, такие как мьютексы, обычно реализуются с барьерами памяти, необходимыми для обеспечения ожидаемой семантики видимости памяти. Явное использование барьеров памяти обычно не требуется, барьеры уже встроены в библиотеку. Если к переменной обращение идёт внутри критической секции (под мьютексом), делать её atomic или volatile нет никакого смысла.
pthread_mutex_lock(m); count++; pthread_mutex_unlock(m);
Устройство примитивов синхронизации
Мьютексы и прочие высокоуровневые примитивы синхронизации помогают нам писать многопоточный код. Но возникает вопрос, каким образом эти примитивы синхронизации реализованы внутри. Оказывается, на ОС Linux большая часть этих структур из pthread построена на основе так называемых фьютексов.
Мы предполагаем, что в библиотеке нам доступен ряд функций для выполнения атомарных операций, реализация которых использует специальные инструкции процессора в зависимости от архитектуры.
- atomic_dec(var) — уменьшить значение переменной на единицу, вернуть её старое значение.
- atomic_inc(var) — увеличить значение переменной на единицу, вернуть её старое значение.
- cmpxchg(var, old, new) — если текущее значение var равно old, то заменить его на new. В любом случае веруть значение var до операции.
Как правило, эти операции сравнительно быстрые, но гораздо медленнее неатомарных аналогов. На x86 могут занимать порядка 100 тактов.
Spinlock
Спин-блокировка (spinlock) — это примитив, в котором поток, пытающийся завладеть блокировкой, просто ждёт в цикле («крутится»), регулярно проверяя, свободна ли блокировка. Поскольку поток остается активным, но не выполняет полезную работу, использование такой блокировки является так называемым busy waiting.
Создадим целочисленную переменную и введём состояния:
- 0 — unlocked,
- 1 — locked.
Напишем код на псевдокоде, который похож на C++.
class mutex { public: mutex() : val(0) { } void lock() { while (cmpxchg(val, 0, 1) != 0) { // Changing from 0 to 1 means that we have acquired the lock. } } void unlock() { atomic_dec(val); } private: int val; }
- Спинлоки применяются для синхронизации небольших участков кода, когда использование более сложных механизмов неоправданно или невозможно.
- Более интеллектуальная реализация будет использовать обычную, а не атомарную операцию, для опроса в цикле, а атомарную операцию — только для попыток захвата.
- Если спинлок используется в ядре, обычно есть смысл на время запретить прерывания, чтобы время, в течение которого захвачен лок, было минимальным. Если в момент нахождения внутри критической секции придёт прерывание, время нахождения внутри этой секции может стать неопределённо большим.
- Если спинлок используется в пользовательском пространстве, то в момент нахождения в критической секции может произойти переключение контекста или может прийти сигнал. Хоть эти события и редки, они могут создать бесполезную трату ресурсов в ожидающих потоках.
Futex
Futex (сокращение от fast userspace mutex) — способ реализации базовых блокировок, «кирпичик» для построения более сложных примитивов синхронизации (семафоров, мьютексов). Фьютексы появились в ядре Linux в 2003 г.
Фьютекс состоит из двух частей:
- выровненной переменной 32-битного целого типа, которая размещается в userspace,
- очередь ожидания в kernelspace, ассоциированная с этим целым числом.
Основная идея состоит в том, чтобы процессы и потоки могли оперировать с целым числом в пользовательском пространстве, выполняя дорогие системные вызовы в ядро только в случае, когда нужны операции с очередью ожидания (например, разбудить поток или усыпить текущий поток).
Правильно спроектированный примитив синхронизации на базе фьютекса при отсутствии соперничества не делает системных вызовов. Например, чтобы взять лок, мы делаем лишь атомарный инкремент переменной (это быстро). Если при этом детектируется наличие других соревнующихся потоков, то тогда делаем настоящий системный вызов (это медленно). В итоге чаще всего в реальной практике удаётся избегать системных вызовов.
Словом futex называется системный вызов:
#include <linux/futex.h> #include <sys/time.h> int futex(int *uaddr, int futex_op, int val, const struct timespec *timeout, /* or: uint32_t val2 */ int *uaddr2, int val3);
Впервые введены в ядро Linux с версии 2.5.7 (development); выработана стабильная семантика с 2.5.40; включаются в стабильные версии серии 2.6.x.
Операции
Значения целочисленной переменной могут быть любыми (те, которые удобны для данной реализации).
Операций несколько, все они выражаются через указанный системный вызов. Мы для простоты рассмотрим только две. Названия и сигнатуры условны (для большей очевидности).
- futex_wait(addr, val) — приостановить выполнение текущего потока в случае, если переменная в памяти имеет указанное значение. Перед помещением потока в очередь ожидания ядро проверяет значение переменной по адресу addr: если там записано не val, то засыпать не нужно, сразу же вернуть управление.
- futex_wake(addr, count) — разбудить один или несколько потоков в очереди. Макс. количество потоков передаётся в переменной count.
Пример: event
class event { public: event() : val(0) { } void ev_signal() { atomic_inc(val); futex_wake(&val, INT_MAX); } void ev_wait() { futex_wait(&val, val); } private: int val; };
Использовать атомарный инкремент тут необязательно.
Пример: mutex
Введём состояния по значениям переменной-фьютекса:
- 0 — unlocked,
- 1 — locked, no waiters,
- 2 — locked, one or more waiters.
Приведём реализацию мьютекса на псевдокоде.
class mutex { public: mutex() : val(0) { } void lock() { int c; // If the lock was previously unlocked, there's nothing else for us to do. // Otherwise, we'll probably have to wait. if ((c = cmpxchg(val, 0, 1)) != 0) { do { // If the mutex is locked, we signal that we're waiting by setting the // val to 2. A shortcut checks is it's 2 already and avoids the atomic // operation in this case. if (c == 2 || cmpcxhg(val, 1, 2) != 0) { // Here we have to actually sleep, because the mutex is actually // locked. Note that it's not necessary to loop around this syscall; // a spurious wakeup will do no harm since we only exit the do...while // loop when val is indeed 0. futex_wait(&val, 2); } // We're here when either: // (a) the mutex was in fact unlocked (by an intervening thread). // (b) we slept waiting for the val and were awoken. // // So we try to lock the val again. We set the state to 2 because we // can't be certain there's no other thread at this exact point. So we // prefer to err on the safe side. } while ((c = cmpxchg(val, 0, 2)) != 0); } } void unlock() { if (atomic_dec(val) != 1) { val = 0; futex_wake(&val, 1); } } private: int val; }
Может возникнуть соблазн немного упростить код, но при этом очень легко сделать его совсем неправильным. Подробный разбор с объяснением, почему сделано так, а не иначе, можно почитать в статье "Futexes are Tricky".