Work queue has always been one of the most hot points in server software. Here is how to scale it effectively to multi-core environment. I. INTRODUCTION Nowadays high performance server software (e.g. HTTP accelerator) in most cases runs on multi-core machines. Modern hardware could provide 32, 64 and more CPU cores. In such highly-concurrent environment lock contention sometimes hurts overall system performance more than data copying, context switches etc. Thus, moving most hot data structures from locked to lock-free design can significantly improve performance of software working in multi-core environment. One of the most hot data structure in traditional server software is work queue, which could have hundreds of thousands push and pop operations per second from tens of producers and/or consumers. Work queue is a FIFO data structure which has only two operations: push() and pop(). It usually limits its size such that pop() waits if there is no elements in the queue and push() waits if the queue contains maximum allowed number of elements. It is important that many threads can execute pop() and push() operations simultaneously on different CPU cores. One of the possible work queue implementations is ring buffer storing pointers to the queued elements. It has good performance especially in comparison with common non-intrusive linked list (which stores copies of values passed by the user, e.g. std::list). Significant thing about ring buffer implementation is that it natively limits its size - you only need to move current position in round-robin fashion. From other side linked lists require maintaining additional field for total queue length. With linked list push and pop operations have to modify the queue length in addition to element links updating, so you need to take more care of consistency in the queue for lock-free implementation. Basically different CPU families provides different guarantees for memory operations ordering and this is critical for lock-free algorithms. In this article we'll concentrate on x86 as most widespread architecture rather than write generic (but slower) code. II. NAIVE SYNCHRONIZED QUEUE First of all lets define the interface for our queue (I'll use C++11 in the article): template<class T, long Q_SIZE> class NaiveQueue { public: NaiveQueue(); void push(T *x); T *pop(); }; The queue will store T* pointers and has maximum size of Q_SIZE. Lets see how the queue would looks in naive locked implementation. To develop the queue we need an array in which we place our ring buffer. We can define this as T *ptr_array_[Q_SIZE]; Two members of the class, head_ and tail_, will point to head (next position to push an element) and tail (next item to pop) of the queue and should be initialized to zero in the class construction. We can simplify our operations on ring buffer by defining the counters as unsigned long. Unsigned long (which is 64 bit in length) is large enough to handle ever millions operations per second for thousands of years. So tail_ and head_ will be defined as: unsigned long head_; unsigned long tail_; This way we can access the elements (the same for head_ and tail_) just by ptr_array_[tail_++ & Q_MASK] Where Q_MASK is defined as static const unsigned long Q_MASK = Q_SIZE - 1; To get current position in the array we can calculate a remainder of integer division of tail_ by Q_SIZE, but rather we define Q_SIZE as a power of 2 (32768 in our case) so we can use bitwise AND between Q_MASK and tail_ which is bit faster. Since the operations on the queue must wait if there is no elements or the queue is full, we need two condition variables: std::condition_variable cond_empty_; std::condition_variable cond_overflow_; to wait on some new elements in the queue or some free space respectively. Surely, we need a mutex to serialize our queue: std::mutex mtx_; This way we can write push() and pop() in the following way: void push(T *x) { std::unique_lock<std::mutex> lock(mtx_); cond_overflow_.wait(lock, [&head_, &tail_]() { return tail_ + Q_SIZE > head_; }); ptr_array_[head_++ & Q_MASK] = x; cond_empty_.notify_one(); } T *pop() { std::unique_lock<std::mutex> lock(mtx_); cond_empty_.wait(lock, [&head_, &tail_]() { return tail_ < head_; }); T *x = ptr_array_[tail_++ & Q_MASK]; cond_overflow_.notify_one(); return x; } We perform both the operations under acquired exclusive lock using mtx_. When the lock acquired we can check current queue state: whether it is empty (and we can not pop any new element) or full (can not push a new element). std::condition_variable::wait() moves the current thread to sleep state until the specified predicate is true. Next we push or pop an element and notify other thread (by notify_one() call) that we have changed the queue state. Since we add or delete only one element at a time, then only one thread waiting for a available elements or free slots in the queue can make progress, so we notify and wake up only one thread. The problem with the implementation is that only one thread at single point of time can modify the queue. Moreover mutexes and condition variables are expensive - in Linux they are implemented by futex(2) system call. So each time when a thread needs to wait on a mutex or condition variable, that leads to call futex(2) which reschedule the thread and moves it to wait queue. Now lets run plain test which just push and pop addresses to and from the queue in 16 producers and 16 consumers (please refer end of the article for link to full source code). On a box with 16 Xeon cores the test took about 7 minutes: # time ./a.out real 6m59.219s user 6m21.515s sys 72m34.177s And strace with -c and -f options shows that 99.98% of time the program spends in futex system call. III. LOCK-FREE MULTI-PRODUCER MULTI-CONSUMER QUEUE Hopefully you do not have to ask kernel for help with user space threads synchronization. CPU (at least the most known architectures) provide atomic memory operations and barriers. With the operations you can atomically * read memory operand, modify it and write back * read memory operand, compare it with a value and swap with other value Memory barriers are special assembly instructions also known as fences. Fences guarantee instructions execution order on local CPU and visibility order on other CPUs. Lets consider two independent by data instructions, A and B, separated by fence (let it be mfence which provides guarantee for ordering of read and write operations): A mfence B The fence guaranties that: 1. compiler optimizations won't move A after the fence or B before the fence; 2. CPU will execute A and B instructions in-order (event it normally executes instructions out-of-order); 3. other CPU cores and processor packages, which work on the same bus, will see result of instruction A before result of instruction B. For our queue we need to synchronize multiple threads access to head_ and tail_ fields. Actually, when you run head_++ (this is an example of RMW, Read-Modify-Write, operation since processor must read current head_ value, increment it locally and write back to memory) on two cores, then both the cores could simultaneously read current head_ value, increment it and simultaneously write the new value back, so one increment is lost. For atomic operations C++11 provides std::atomic template which should replace current GCC sync_ intrinsics in future. Unfortunately, for my compiler (GCC 4.6.3 for x86-64) std::atomic<> methods still generate extra fences independently on specified memory order. So I'll use old GCC's intrinsics for atomic operations. We can atomically read and increment a variable (e.g. our head_) by __sync_fetch_and_add(&head_, 1); This makes CPU to lock the shared memory location on which it's going to do an operation (increment in our case). In multiprocessor environment processors communicate to each other to ensure that they all see relevant data. This is known as cache coherency protocol. By this protocol processor can take exclusive ownership on a memory location. However these communications are not for free and we should use such atomic operations carefully and only when we really need them. Otherwise we can hurt performance significantly. Meanwhile plain read and write operations on memory locations execute atomically and do not require any additional actions (like specifying 'lock' prefix to make the instruction run atomically on x86 architecture). In our lock-free implementation we're going to abandon mutex mtx_ and consequently both the condition variable. However we still need to wait if the queue is full on push and if the queue is empty on pop operations. For push we would do this by simple loop like we did it for locked queue: while (tail_ + Q_SIZE < head_) sched_yield(); sched_yield() just lets other thread to run on current processor. This is native and fastest way to reschedule current thread. However if there is no other thread which is waiting in scheduler run queue for available CPU, then current thread will be immediately scheduled back. Thus we'll see always 100% CPU usage, ever if we have no data to process. To cope with this we can use usleep(3) with some small value. Lets see more carefully what's going on in the loop. Firstly we read tail_ value, next we read value of head_ and after that we make a decision whether to wait or push an element and move head_ forward. Current thread can schedule at any place of the check and ever after the check. Lets consider 2 threads scenario: Thread 1 Thread 2 read tail_ read tail_ read head_ read head_ (scheduled) push an element push an element If we had only one free place in the ring buffer, then we override pointer to oldest queued element. We can solve the problem by incrementing the shared head_ before the loop and use temporal local variable (i.e. we reserve a place to which we're going to insert an element and wait when it is free): unsigned long tmp_head = __sync_fetch_and_add(&head_, 1); while (tail_ + Q_SIZE < tmp_head) sched_yield(); ptr_array_[tmp_head & Q_MASK] = x; We can write similar code for pop() (just swap head and tail). However the problem still exists. Two producers can increment head_, check that they have enough space and reschedule at the same time just before inserting x. A consumer can wake up instantly (it sees that head_ moved forward to two positions) and read a value from the queue which was not inserted yet. Before solving the issue lets see which picture we have in 2 producers (P1 and P2) and 2 consumers (C1 and C2) case: LT LH | _ | _ | _ | x | x | x | x | x | x | x | _ | _ | _ | ^ ^ ^ ^ | | | | C1 C2 P1 P2 On the picture '_' denotes free slots and 'x' denotes inserted elements. At the picture C1 and C2 are going to read values and P1 and P2 are going to write an elements to currently free slots. Let LT be a latest (lowest) tail value among all the consumers, which is stored in tmp_tail of latest consumer, C1 on the picture. Consumer C1 currently can work on the queue at LT position (i.e. it is at the middle of fetching the element). And let LH correspondingly be lowest value of tmp_head among all the producers. At each given time you can not push an element to position equal or greater than LT and should not try to pop an element at position equal or greater than LH. It means that all the producers should care about current LT value and all consumers about current LH value. So lets introduce the two helping class members for LH and LT: volatile unsigned long last_head_; volatile unsigned long last_tail_; Thus we should check for last_tail_ value instead of tail_ in the loop above. We need to update the values from multiple threads, but we're going to do this by plain write operations, without RMW. So the members do not have to be of atomic type. I just specified the variables as volatile to prevent their values caching in local processor registers. Now the question is who and when should update last_head_ and last_tail_ values. We do expect that in most cases we are able to perform push and/or pop operation on the queue without a wait. Thus we can update the two helping variables only when we really need them, i.e. inside the waiting loop. So when a producer realizes that it can not insert a new element because of too small last_tail_ value it falls into the wait loop and try to update last_tail_ value. To update the value the thread must inspect current tmp_tail of each consumer. So we need to make the temporal value visible to other threads. One of the possible solutions is to maintain an array of tmp_tail and tmp_head values with size equal to number of running threads. We can do this with following code: struct ThrPos { volatile unsigned long head, tail; }; ThrPos thr_p_[std::max(n_consumers_, n_producers_)]; where n_consumers_ is the number of consumers and n_producers_ is the number of producers. We can allocate the array dynamically, but leave it statically sized for simplicity for now. Many threads read the elements of the array, but only one thread with plain move instruction (no RMW operation) can update them, so you also can use regular reads on the variables. Since thr_p_ values are used to only limit moving of current queue pointers, then we initialize them to maximum allowed values, i.e. do not limit head_ and tail_ movings until somebody push or pop into the queue. We can find the lowest tail values for all the consumers by following loop: auto min = tail_; for (size_t i = 0; i < n_consumers_; ++i) { auto tmp_t = thr_p_[i].tail; asm volatile("" ::: "memory"); // compiler barrier if (tmp_t < min) min = tmp_t; l} The temporal variable tmp_t is required here since you can not atomically compare whether thr_p_[i].tail is less than min and update min if it is. When you remember current consumer's tail and compare it with min, the consumer can move the tail. It can move it only forward, so the check in the while condition is still correct and you won't overwrite some live queue elements. But if you wouldn't use tmp_t and write the code like if (thr_p_[i].tail < min) min = thr_p_[i].tail; Then the consumer can has lower tail value while you're comparing it with min, but move it far forward after the comparison is done and just before the assignment. So you probably find incorrect minimal value. I added compiler barrier, asm volatile("" ::: "memory") (this is GCC specific compiler barrier), to be sure that compiler won't move thr_p_[i].tail access and will access the memory location only once - to load its value to tmp_t. One important thing about the array is that it must be indexed by current thread identifier. Since POSIX threads (and consequently C++ threads which uses them) do not use small monotonically increasing values for threads identifying, then we need to use our own thread wrapping. I'll use inline thr_pos() method of the queue to access the array elements: ThrPos& thr_pos() const { return thr_p_[ThrId()]; } (you can find example of ThrId() implementation in the source referenced at the end of the article). Before writing the final implementation of push() and pop() lets back to initial application of our queue, work queue. Usually, producers and consumers do a lot of work between operations with the queue. For instance, it could be very slow IO operation. So what happens if one consumer fetch an element from the queue and go to sleep in long IO operation? Its tail value will be stay the same for long time and all the producers will wait on it ever all the other consumers fully cleared the queue. This is not desired behavior. Lets fix this by two steps. First, lets assign to per-thread tail pointer maximum allowed value just after the fetching the element. So we should write following at the end of pop() method: T *ret = ptr_array_[thr_pos().tail & Q_MASK]; thr_pos().tail = ULONG_MAX; return ret; Since a producer in push() starts to find minimal allowed value for last_tail_ from current value of global tail_, then it can assign current tail_ value to last_tail_ only if there is no any active consumers. This is what we wish. Generally speaking, other processors can see thr_pos().tail update before local processor reads from ptr_array_, so they can move and overwrite the position in the array before local processor reads it. This is possible on processors with relaxed memory operation ordering. However x86 provides relatively strict memory ordering rules, particularly it guarantees that 1. stores are not reordered with earlier loads 2. and stores are seen in consistent order by other processors. Thus, loading from ptr_array_ and storing to thr_pos().tail in the code above will be done on x86 and seen by all processors in exactly this order. So we don't need any explicit memory barriers here. The second step which we need to do is correctly set thr_pos().tail at the beginning of pop(). We assign current thr_pos().tail by thr_pos().tail = __sync_fetch_and_add(&tail_, 1); The problem is that the operation is atomic only for tail_ shift, but not for thr_pos().tail assignment. So there is a time window in which thr_pos().tail = ULONG_MAX, and tail_ could be shifted significantly by other consumers, so push() will set last_tail_ to current, just incremented, tail_. So when we're are going to pop an element we have to reserve a tail position less or equal to tail_ value which we'll pop an element from: thr_pos().tail = tail_; thr_pos().tail = __sync_fetch_and_add(&tail_, 1); In this code we actually perform following three operations: write tail_ to thr_pos().tail increment tail_ write previous value of tail_ to thr_pos().tail Again, in general case we have no guarantee that other processors will "see" results of the write operations in the same order. Potentially some other processor can firstly read incremented tail_ value, try to find new last_tail_ and only after that read new current thread tail value. However, __sync_fetch_and_add() executes locked instruction which implies implicit full memory barrier on most architectures (including x86), so neither first nor third operations can not be moved over the second one. Therefore we also can skip explicit memory barriers here. Thus if the queue is almost full then all producers will stop at or before the position of element which we're popping. Now we're are ready to write our final implementation of push() and pop() methods. Here they are: void push(T *ptr) { thr_pos().head = head_; thr_pos().head = __sync_fetch_and_add(&head_, 1); while (__builtin_expect(thr_pos().head >= last_tail_ + Q_SIZE, 0)) { ::sched_yield(); auto min = tail_; for (size_t i = 0; i < n_consumers_; ++i) { auto tmp_t = thr_p_[i].tail; asm volatile("" ::: "memory"); // compiler barrier if (tmp_t < min) min = tmp_t; } last_tail_ = min; } ptr_array_[thr_pos().head & Q_MASK] = ptr; thr_pos().head = ULONG_MAX; } T *pop() { thr_pos().tail = tail_; thr_pos().tail = __sync_fetch_and_add(&tail_, 1); while (__builtin_expect(thr_pos().tail >= last_head_, 0)) { ::sched_yield(); auto min = head_; for (size_t i = 0; i < n_producers_; ++i) { auto tmp_h = thr_p_[i].head; asm volatile("" ::: "memory"); // compiler barrier if (tmp_h < min) min = tmp_h; } last_head_ = min; } T *ret = ptr_array_[thr_pos().tail & Q_MASK]; thr_pos().tail = ULONG_MAX; return ret; } Careful reader can notice that multiple threads can scan current head or tail values over all the producing or consuming threads. So number of threads can find different min values and try to write them to last_head_ or last_tail_ simultaneously, so probably you would use CAS operation here. However atomic CAS is expensive and worst that can happen there is that you assign too small value to last_head_ or last_tail_. Or ever overwrite new higher value with a smaller old value, so you'll fall into sched_yield() again. Maybe we fall to sched_yield() more frequently than if we use synchronized CAS operation, but in practice the cost of extra atomic operation reduces performance. Also I used __builtin_expect with zero expect argument to say that we do not expect that the condition in while statement becomes true too frequently and compiler should move the inner loop code after the code executed if the condition is false. This way you can improve instruction cache usage. Finally lets run the same test as for naive queue: # time ./a.out real 1m53.566s user 27m55.784s sys 2m4.461s This is 3.7 times faster than our naive queue implementation! IV. CONCLUSION Nowadays, high performance computing is typically achieved by two ways: horizontal scaling (scale-out) by adding new computational nodes and vertical scaling (scale-up) by adding extra computational resources (like CPUs or memory) to a single node. Unfortunatelly, linear scaling is possible only in theory. In practice if you double your computational resources, then it is likely that you get only 30-60% performance gain. Lock contention is one of the problems which prevents efficient scale-up by adding extra CPUs. Lock-free algorigthms makes scale-up more productive and allows to get more performance in multi-core environments. The code for naive and lock-free queue implementations with the tests for correctness is available at: https://github.com/krizhanovsky/NatSys-Lab/blob/master/lockfree_rb_q.cc Alexander Krizhanovsky is the software architect and founder of NatSys-Lab. Before NatSys-Lab he was working as Senior Software Developer at IBM, Yandex and Parallels. He specializes in high performance solutions for UNIX environment. Special thanks to Johann George from SanDisk for final review of the paper.
High Performance Linux
> Try Tempesta FW, a high performance open source application delivery controller for the Linux/x86-64 platform.
> Or check custom high-performance solutions from Tempesta Technologies, INC.
> Careers: if you love low-level C/C++ hacking and Linux, we'll be happy to hear from you.
Thursday, May 23, 2013
Lock-free Multi-producer Multi-consumer Queue on Ring Buffer
My article "Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer" was published by Linux Journal more than 30 days ago, so now I can post it here.
Subscribe to:
Post Comments (Atom)
Hi, thank you for nice article.
ReplyDeleteThis algorithm is not lock-free but obstruction-free.
In case, all producer is preempted by scheduler just after
ptr_array_[thr_pos().head & Q_MASK] = ptr;
line, all consumer will get infinity loop.
sched_yield() can give progress to consumer from consumer.
By continuing such situation, consumers will get live-lock.
Practically, such situation may not happen, so this algorithm is safe.
Hi,
ReplyDeletethis is good point about the live lock. This terrible situation becomes very probable if we have number of producers and consumers threads greater than number of CPU cores.
However, it should not be live lock actually, since the system scheduler will preempt the consumers the same way, so producers get time to finish push() and move the pointers.
Also this is always good to run one thread per one core to minimize context switches and get better performance of the algorithm. And in this case we won't get the nasty preemption issue.
Hello, thanx for the wonderful article. I have some very fundamental questions regarding the unit test. What exactly is being pushed and pop? Secondly, what is the purpose of X_EMPTY and X_MISSED?
ReplyDeleteThanx in advance!
Omar,
Deleteq_type (originally unsigned char) is inserted to the queue in unit test. X_MISSED is inserted by producers - if we observer the value when all consumers and producers finish, then the array cell was missed by consumers. X_EMPTY is initialization value - if the value is observer at end of the test, then either producers and consumers missed the cell.
I think there is a mistake: unsigned long is 32 bits and not 64
ReplyDeleteBrahim,
Deletethe queue is designed for 64-bit architectures only (in fact you can't compile it on 32-bit system). And unsigned long is exactly 64 bit in size on such architectures.
Hi Alexander
ReplyDeleteI am trying to understand the algorithm. I dont quite understand why you have 2 statements:
thr_pos().tail = tail_;
thr_pos().tail = __sync_fetch_and_add(&tail_, 1);
Wouldnt the last statement achieve the goal of reserving the current tail for the consumer?
Thanks in advance
Hi Jojy,
Deletethis particular piece od code is described in the article: "The problem is that the operation is atomic only for tail_ shift, but not for
thr_pos().tail assignment. So there is a time window in which
thr_pos().tail = ULONG_MAX, and tail_ could be shifted significantly by other
consumers, so push() will set last_tail_ to current, just incremented, tail_.
So when we're are going to pop an element we have to reserve a tail position
less or equal to tail_ value which we'll pop an element from..."
Basically, you can think about thr_pos().tail as a 'stop' number - we cann't move tail_ pointer if some of the threads have lower vavlues for their local tail pointers. While there is a time gap between incrementing tail_ and assigning its value to thr_pos().tail while thr_pos().tail stays ULONG_MAX, pushers can use the ring buffer slot before we pop()'ed the item.
I hope this makes the algorithm more clear....
Thanks Alexander for clarifying the logic.
DeleteHow do you guarantee that the pop() pops a newly added object?
ReplyDeleteHi Alexander. Thanks for this awesome article.
ReplyDeleteI have a doubt. I think that if the producer calls push just once ( and then never calls push again ) then the consumer won't be able to consume that single element.
In the above scenario, thr_pos().head ( and head_ and last_head_ ) would be "1" , thr_pos().tail ( and tail_ ) would also be "1". So the consumer would keep looping here [ while (__builtin_expect(thr_pos().tail >= last_head_, 0)) ] and hence won't be able to consume the only element that was pushed.
Hi Vedant,
Deletewe initialize all the queue pointers to 0, so when we push 1st item, tail pointer remains 0, so a consumer can make progress. You can test it with following patch:
$ git diff
diff --git a/lockfree_rb_q.cc b/lockfree_rb_q.cc
index cf0c328..5f8cb19 100644
--- a/lockfree_rb_q.cc
+++ b/lockfree_rb_q.cc
@@ -346,9 +346,9 @@ private:
* Tests for naive and lock-free queues
* ------------------------------------------------------------------------
*/
-static const auto N = QUEUE_SIZE * 1024;
-static const auto CONSUMERS = 2;
-static const auto PRODUCERS = 2;
+static const auto N = 1; //QUEUE_SIZE * 1024;
+static const auto CONSUMERS = 1;//2;
+static const auto PRODUCERS = 1;//2;
typedef unsigned char q_type;
@@ -381,6 +381,7 @@ struct Producer : public Worker<Q> {
for (auto i = thr_id(); i < N * PRODUCERS; i += PRODUCERS) {
x[i] = X_MISSED;
Worker<Q>::q_->push(x + i);
+ std::cout << "pushed 1 element" << std::endl;
}
}
};
@@ -400,6 +401,7 @@ struct Consumer : public Worker<Q> {
assert(v);
assert(*v == X_MISSED);
*v = (q_type)(thr_id() + 1); // don't write zero
+ std::cout << "poped 1 element" << std::endl;
}
}
};
@@ -467,11 +469,11 @@ main()
LockFreeQueue lf_q(PRODUCERS, CONSUMERS);
run_test>(std::move(lf_q));
- NaiveQueue n_q;
+/* NaiveQueue n_q;
run_test>(std::move(n_q));
BoostQueue b_q;
- run_test>(std::move(b_q));
+ run_test>(std::move(b_q));*/
return 0;
}
[alex@tempesta blog]$ ./a.out
pushed 1 element
poped 1 element
11ms
check X data...
Passed
Thanks a lot ! I was using __sync_add_and_fetch instead of __sync_fetch_and_add which was leading to the above mentioned issue.
Delete