High Performance Linux



> Try Tempesta FW, a high performance open source application delivery controller for the Linux/x86-64 platform.

> Or check custom high-performance solutions from Tempesta Technologies, INC.

> Careers: if you love low-level C/C++ hacking and Linux, we'll be happy to hear from you.


Saturday, July 27, 2013

C++ Variadic Templates For Multiple Inheritance

C++ variadic templates take variable number of arguments. C++ also allows to create a template class which inherits from template base class. These two allows us to inherit from variable number of base classes. When it's needed? Let's have a look at simple example which I faced recently.

Suppose you need class Messenger which receives raw messages from a socket, assembles messages of particular types and passes them into appropriate queue. Frequently the message queues are implemented as template like

    template<class T>
    struct Queue {
        // some class body
    };

So you has following queues for each type of message:

    Queue<MsgA> q_a;
    Queue<MsgB> q_b;
    Queue<MsgC> q_c;

The queues have to be members of class Messenger. Probably, it is not a big deal to copy paste 3, as in the example, members. However, the uglinesses raises from necessity to have registering interfaces for each queue (classes who uses the Messenger need to register on particular queue to receive messages from it), serialized push() interfaces, probably the queue accessors and some other methods specific for the queue (accessing the queues directly as to public members isn't a good idea). C++ meta-programming could help to generate the queues automatically with all required interfaces. Let's have see how we can use it for this task.

Messenger provides interfaces to the queues, so it "is-a" QueueHandler. QueueHandler handles queue of particular type as a member and provides interfaces to it. So you should generate set of QueueHander classes for each queue type and inherit Messenger from all of the classes:

    template<class T>
    struct QueueHandler {
        void register(Queue<T> *q) { /* some method body */ }
        void push(T *msg) { /* some other method body */ }
    private:

        Queue<T> q_;
    };


It's also worse to specify explicitly all the base QueueHandler classes for Messenger class. So you can introduce helping class GenericMessenger, which template specialization is Messenger, and use C++ variadic template to write the class independent on particular number of serviced queues:

    template<class... Args>
    struct GenericMessenger : QueueHandler<Args>... {
        // some struct body
    };

    typedef GenericMessenger<MsgA, MsgB, MsgC> Messenger;


Therefore, if you need to support one more type of messages (and their queue of course), then you just need to add the type to Messenger definition and there is no copy paste code!

The only one ugly thing is that you need to specify explicitly base class on accessing particular queue (this is because GenericMessenger has many base classes with the same methods' names, so we need to explicitly call method of particular base):

    Messenger *m = new Messenger();
    

    m->QueueHandler<A>::register(new Queue<MsgA>);
    m->QueueHandler<A>::push(new MsgA);



Thursday, May 23, 2013

Lock-free Multi-producer Multi-consumer Queue on Ring Buffer

My article "Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer" was published by Linux Journal more than 30 days ago, so now I can post it here.

Work queue has always been one of the most hot points in server software.
  Here is how to scale it effectively to multi-core environment.


I. INTRODUCTION

Nowadays high performance server software (e.g. HTTP accelerator) in most cases
runs on multi-core machines. Modern hardware could provide 32, 64 and more CPU
cores. In such highly-concurrent environment lock contention sometimes hurts
overall system performance more than data copying, context switches etc. Thus,
moving most hot data structures from locked to lock-free design can
significantly improve performance of software working in multi-core
environment.

One of the most hot data structure in traditional server software is work
queue, which could have hundreds of thousands push and pop operations per
second from tens of producers and/or consumers.

Work queue is a FIFO data structure which has only two operations: push() and
pop(). It usually limits its size such that pop() waits if there
is no elements in the queue and push() waits if the queue contains maximum
allowed number of elements. It is important that many threads can execute pop()
and push() operations simultaneously on different CPU cores.

One of the possible work queue implementations is ring buffer storing pointers
to the queued elements. It has good performance especially in comparison
with common non-intrusive linked list (which stores copies of values passed by
the user, e.g. std::list).
Significant thing about ring buffer implementation is that it natively limits
its size - you only need to move current position in round-robin fashion. From
other side linked lists require maintaining additional field for total queue
length. With linked list push and pop operations have to modify the queue
length in addition to element links updating, so you need to take more care of
consistency in the queue for lock-free implementation.

Basically different CPU families provides different guarantees for memory
operations ordering and this is critical for lock-free algorithms.
In this article we'll concentrate on x86 as most widespread architecture
rather than write generic (but slower) code.


II. NAIVE SYNCHRONIZED QUEUE

First of all lets define the interface for our queue (I'll use C++11 in the
article):

        template<class T, long Q_SIZE>
        class NaiveQueue {
        public:
            NaiveQueue();
            void push(T *x);
            T *pop();
        };

The queue will store T* pointers and has maximum size of Q_SIZE.

Lets see how the queue would looks in naive locked implementation. To develop
the queue we need an array in which we place our ring buffer. We can define
this as

        T *ptr_array_[Q_SIZE];

Two members of the class, head_ and tail_, will point to head (next position
to push an element) and tail (next item to pop) of the queue and should be
initialized to zero in the class construction. We can simplify our operations
on ring buffer by defining the counters as unsigned long. Unsigned long (which
is 64 bit in length) is large enough to handle ever millions operations per
second for thousands of years. So tail_ and head_ will be defined as:

        unsigned long head_;
        unsigned long tail_;

This way we can access the elements (the same for head_ and tail_) just by

        ptr_array_[tail_++ & Q_MASK]

Where Q_MASK is defined as

        static const unsigned long Q_MASK = Q_SIZE - 1;

To get current position in the array we can calculate a remainder of integer
division of tail_ by Q_SIZE, but rather we define Q_SIZE as a power of 2
(32768 in our case) so we can use bitwise AND between Q_MASK and tail_ which
is bit faster.

Since the operations on the queue must wait if there is no elements or the
queue is full, we need two condition variables:

        std::condition_variable cond_empty_;
        std::condition_variable cond_overflow_;

to wait on some new elements in the queue or some free space respectively.
Surely, we need a mutex to serialize our queue:

        std::mutex mtx_;

This way we can write push() and pop() in the following way:

        void push(T *x)
        {
            std::unique_lock<std::mutex> lock(mtx_);

            cond_overflow_.wait(lock, [&head_, &tail_]() {
                            return tail_ + Q_SIZE > head_;
                    });

            ptr_array_[head_++ & Q_MASK] = x;

            cond_empty_.notify_one();
        }

        T *pop()
        {
            std::unique_lock<std::mutex> lock(mtx_);

            cond_empty_.wait(lock, [&head_, &tail_]() {
                            return tail_ < head_;
                    });

            T *x = ptr_array_[tail_++ & Q_MASK];

            cond_overflow_.notify_one();

            return x;
        }

We perform both the operations under acquired exclusive lock using mtx_. When
the lock acquired we can check current queue state: whether it is empty (and
we can not pop any new element) or full (can not push a new element).
std::condition_variable::wait() moves the current thread to sleep state until
the specified predicate is true. Next we push or pop an element and notify
other thread (by notify_one() call) that we have changed the queue state.
Since we add or delete only one element at a time, then only one thread waiting
for a available elements or free slots in the queue can make progress, so we
notify and wake up only one thread.

The problem with the implementation is that only one thread at single point of
time can modify the queue. Moreover mutexes and condition variables
are expensive - in Linux they are implemented by futex(2) system call.
So each time when a thread needs to wait on a mutex or condition variable,
that leads to call futex(2) which reschedule the thread and moves it to wait
queue.

Now lets run plain test which just push and pop addresses to and from the
queue in 16 producers and 16 consumers (please refer end of the article for
link to full source code). On a box with 16 Xeon cores the test took about 7
minutes:

        # time ./a.out

        real    6m59.219s
        user    6m21.515s
        sys     72m34.177s

And strace with -c and -f options shows that 99.98% of time the program spends
in futex system call.


III. LOCK-FREE MULTI-PRODUCER MULTI-CONSUMER QUEUE

Hopefully you do not have to ask kernel for help with user space threads
synchronization. CPU (at least the most known architectures) provide atomic
memory operations and barriers. With the operations you can atomically

 * read memory operand, modify it and write back
 * read memory operand, compare it with a value and swap with other value

Memory barriers are special assembly instructions also known as fences.
Fences guarantee instructions execution order on local CPU and visibility
order on other CPUs. Lets consider two independent by data instructions, A
and B, separated by fence (let it be mfence which provides guarantee for
ordering of read and write operations):

 A
 mfence
 B

The fence guaranties that:
1. compiler optimizations won't move A after the fence or B before the fence;
2. CPU will execute A and B instructions in-order (event it normally executes
   instructions out-of-order);
3. other CPU cores and processor packages, which work on the same bus, will
   see result of instruction A before result of instruction B.

For our queue we need to synchronize multiple threads access to head_ and
tail_ fields. Actually, when you run head_++ (this is an example of RMW,
Read-Modify-Write, operation since processor must read current head_ value,
increment it locally and write back to memory) on two cores, then both the cores
could simultaneously read current head_ value, increment it and simultaneously
write the new value back, so one increment is lost. For atomic operations
C++11 provides std::atomic template which should replace current GCC sync_
intrinsics in future. Unfortunately, for my compiler (GCC 4.6.3 for x86-64)
std::atomic<> methods still generate extra fences independently on specified
memory order. So I'll use old GCC's intrinsics for atomic operations.

We can atomically read and increment a variable (e.g. our head_) by

        __sync_fetch_and_add(&head_, 1);

This makes CPU to lock the shared memory location on which it's going to do an
operation (increment in our case). In multiprocessor environment processors
communicate to each other to ensure that they all see relevant data. This is
known as cache coherency protocol. By this protocol processor can take
exclusive ownership on a memory location. However these communications are not
for free and we should use such atomic operations carefully and only when
we really need them. Otherwise we can hurt performance significantly.

Meanwhile plain read and write operations on memory locations execute
atomically and do not require any additional actions (like specifying 'lock'
prefix to make the instruction run atomically on x86 architecture).

In our lock-free implementation we're going to abandon mutex mtx_ and
consequently both the condition variable. However we still need to wait if the
queue is full on push and if the queue is empty on pop operations. For push we
would do this by simple loop like we did it for locked queue:

        while (tail_ + Q_SIZE < head_)
            sched_yield();

sched_yield() just lets other thread to run on current processor. This is
native and fastest way to reschedule current thread. However if there is no
other thread which is waiting in scheduler run queue for available CPU, then
current thread will be immediately scheduled back. Thus we'll see always 100%
CPU usage, ever if we have no data to process. To cope with this we can use
usleep(3) with some small value.

Lets see more carefully what's going on in the loop. Firstly we read tail_ value,
next we read value of head_ and after that we make a decision whether to wait
or push an element and move head_ forward. Current thread can schedule at any
place of the check and ever after the check. Lets consider 2 threads scenario:

        Thread 1                  Thread 2

        read tail_                read tail_
        read head_                read head_
        (scheduled)               push an element
        push an element

If we had only one free place in the ring buffer, then we override pointer to
oldest queued element. We can solve the problem by incrementing the shared
head_ before the loop and use temporal local variable (i.e. we reserve a place
to which we're going to insert an element and wait when it is free):

        unsigned long tmp_head =
            __sync_fetch_and_add(&head_, 1);
        while (tail_ + Q_SIZE < tmp_head)
            sched_yield();
        ptr_array_[tmp_head & Q_MASK] = x;

We can write similar code for pop() (just swap head and tail). However the
problem still exists. Two producers can increment head_, check that they have
enough space and reschedule at the same time just before inserting x. A
consumer can wake up instantly (it sees that head_ moved forward to two
positions) and read a value from the queue which was not inserted yet.

Before solving the issue lets see which picture we have in 2 producers (P1 and
P2) and 2 consumers (C1 and C2) case:

                     LT                          LH
        | _ | _ | _ | x | x | x | x | x | x | x | _ | _ | _ |
                      ^   ^                       ^   ^
                      |   |                       |   |
                      C1  C2                      P1  P2

On the picture '_' denotes free slots and 'x' denotes inserted elements. At
the picture C1 and C2 are going to read values and P1 and P2 are going to
write an elements to currently free slots. Let LT be a latest (lowest) tail
value among all the consumers, which is stored in tmp_tail of latest consumer,
C1 on the picture. Consumer C1 currently can work on the queue at LT position
(i.e. it is at the middle of fetching the element). And let LH correspondingly
be lowest value of tmp_head among all the producers. At each given time you can
not push an element to position equal or greater than LT and should not try to
pop an element at position equal or greater than LH. It means that all the
producers should care about current LT value and all consumers about current
LH value. So lets introduce the two helping class members for LH and LT:

        volatile unsigned long last_head_;
        volatile unsigned long last_tail_;

Thus we should check for last_tail_ value instead of tail_ in the loop above.
We need to update the values from multiple threads, but we're going to do this
by plain write operations, without RMW. So the members do not have to be of
atomic type. I just specified the variables as volatile to prevent their values
caching in local processor registers.

Now the question is who and when should update last_head_ and last_tail_
values. We do expect that in most cases we are able to perform push and/or pop
operation on the queue without a wait. Thus we can update the two helping
variables only when we really need them, i.e. inside the waiting loop.
So when a producer realizes that it can not insert a new element because of too
small last_tail_ value it falls into the wait loop and try to update last_tail_
value. To update the value the thread must inspect current tmp_tail of each
consumer. So we need to make the temporal value visible to other threads.
One of the possible solutions is to maintain an array of tmp_tail and tmp_head
values with size equal to number of running threads. We can do this with
following code:

        struct ThrPos {
            volatile unsigned long head, tail;
        };

        ThrPos thr_p_[std::max(n_consumers_, n_producers_)];

where n_consumers_ is the number of consumers and n_producers_ is the number of
producers. We can allocate the array dynamically, but leave it statically sized
for simplicity for now. Many threads read the elements of the array, but only one
thread with plain move instruction (no RMW operation) can update them,
so you also can use regular reads on the variables.

Since thr_p_ values are used to only limit moving of current queue pointers,
then we initialize them to maximum allowed values, i.e. do not limit head_ and
tail_ movings until somebody push or pop into the queue.

We can find the lowest tail values for all the consumers by following loop:

        auto min = tail_;
        for (size_t i = 0; i < n_consumers_; ++i) {
            auto tmp_t = thr_p_[i].tail;

            asm volatile("" ::: "memory"); // compiler barrier

            if (tmp_t < min)
                min = tmp_t;
        l}

The temporal variable tmp_t is required here since you can not atomically
compare whether thr_p_[i].tail is less than min and update min if it is. When
you remember current consumer's tail and compare it with min, the consumer can
move the tail. It can move it only forward, so the check in the while
condition is still correct and you won't overwrite some live queue elements.
But if you wouldn't use tmp_t and write the code like

        if (thr_p_[i].tail < min)
            min = thr_p_[i].tail;

Then the consumer can has lower tail value while you're comparing it with min,
but move it far forward after the comparison is done and just before the
assignment. So you probably find incorrect minimal value.

I added compiler barrier, asm volatile("" ::: "memory") (this is GCC specific
compiler barrier), to be sure that compiler won't move thr_p_[i].tail access
and will access the memory location only once - to load its value to tmp_t.

One important thing about the array is that it must be indexed by current thread
identifier. Since POSIX threads (and consequently C++ threads which uses
them) do not use small monotonically increasing values for threads identifying,
then we need to use our own thread wrapping. I'll use inline thr_pos() method of
the queue to access the array elements:

        ThrPos& thr_pos() const
        {
            return thr_p_[ThrId()];
        }

(you can find example of ThrId() implementation in the source referenced at
the end of the article).

Before writing the final implementation of push() and pop() lets back to
initial application of our queue, work queue. Usually, producers and consumers
do a lot of work between operations with the queue. For instance, it could be
very slow IO operation. So what happens if one consumer fetch an element from
the queue and go to sleep in long IO operation? Its tail value will be stay the
same for long time and all the producers will wait on it ever all the other
consumers fully cleared the queue. This is not desired behavior.

Lets fix this by two steps. First, lets assign to per-thread tail pointer
maximum allowed value just after the fetching the element. So we should write
following at the end of pop() method:

        T *ret = ptr_array_[thr_pos().tail & Q_MASK];
        thr_pos().tail = ULONG_MAX;
        return ret;

Since a producer in push() starts to find minimal allowed value for last_tail_
from current value of global tail_, then it can assign current tail_ value
to last_tail_ only if there is no any active consumers. This is what we wish.

Generally speaking, other processors can see thr_pos().tail update before
local processor reads from ptr_array_, so they can move and overwrite the
position in the array before local processor reads it. This is possible on
processors with relaxed memory operation ordering. However x86 provides
relatively strict memory ordering rules, particularly it guarantees that
1. stores are not reordered with earlier loads
2. and stores are seen in consistent order by other processors.
Thus, loading from ptr_array_ and storing to thr_pos().tail in the code above
will be done on x86 and seen by all processors in exactly this order.
So we don't need any explicit memory barriers here.

The second step which we need to do is correctly set thr_pos().tail at the
beginning of pop(). We assign current thr_pos().tail by

        thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

The problem is that the operation is atomic only for tail_ shift, but not for
thr_pos().tail assignment. So there is a time window in which
thr_pos().tail = ULONG_MAX, and tail_ could be shifted significantly by other
consumers, so push() will set last_tail_ to current, just incremented, tail_.
So when we're are going to pop an element we have to reserve a tail position
less or equal to tail_ value which we'll pop an element from:

        thr_pos().tail = tail_;
        thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

In this code we actually perform following three operations:

        write tail_ to thr_pos().tail
        increment tail_
        write previous value of tail_ to thr_pos().tail

Again, in general case we have no guarantee that other processors will "see"
results of the write operations in the same order. Potentially some other
processor can firstly read incremented tail_ value, try to find new last_tail_
and only after that read new current thread tail value. However,
__sync_fetch_and_add() executes locked instruction which implies implicit full
memory barrier on most architectures (including x86), so neither first nor third
operations can not be moved over the second one. Therefore we also can skip
explicit memory barriers here.

Thus if the queue is almost full then all producers will stop at or before
the position of element which we're popping.

Now we're are ready to write our final implementation of push() and pop()
methods. Here they are:

        void push(T *ptr)
        {
            thr_pos().head = head_;
            thr_pos().head = __sync_fetch_and_add(&head_, 1);

            while (__builtin_expect(thr_pos().head >=
                                    last_tail_ + Q_SIZE, 0))
            {
                ::sched_yield();

                auto min = tail_;
                for (size_t i = 0; i < n_consumers_; ++i) {
                    auto tmp_t = thr_p_[i].tail;

                    asm volatile("" ::: "memory"); // compiler barrier

                    if (tmp_t < min)
                        min = tmp_t;
                }
                last_tail_ = min;
            }

            ptr_array_[thr_pos().head & Q_MASK] = ptr;
            thr_pos().head = ULONG_MAX;
        }

        T *pop()
        {
            thr_pos().tail = tail_;
            thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

            while (__builtin_expect(thr_pos().tail >=
                                    last_head_, 0))
            {
                ::sched_yield();

                auto min = head_;
                for (size_t i = 0; i < n_producers_; ++i) {
                    auto tmp_h = thr_p_[i].head;

                    asm volatile("" ::: "memory"); // compiler barrier

                    if (tmp_h < min)
                        min = tmp_h;
                }
                last_head_ = min;
            }

            T *ret = ptr_array_[thr_pos().tail & Q_MASK];
            thr_pos().tail = ULONG_MAX;
            return ret;
        }

Careful reader can notice that multiple threads can scan current head or tail
values over all the producing or consuming threads. So number of threads can
find different min values and try to write them to last_head_ or last_tail_
simultaneously, so probably you would use CAS operation here. However atomic
CAS is expensive and worst that can happen there is that you assign too small
value to last_head_ or last_tail_. Or ever overwrite new higher value with a
smaller old value, so you'll fall into sched_yield() again. Maybe we fall to
sched_yield() more frequently than if we use synchronized CAS operation,
but in practice the cost of extra atomic operation reduces performance.

Also I used __builtin_expect with zero expect argument to say that we do not
expect that the condition in while statement becomes true too frequently and
compiler should move the inner loop code after the code executed if the
condition is false. This way you can improve instruction cache usage.

Finally lets run the same test as for naive queue:

        # time ./a.out 

        real    1m53.566s
        user    27m55.784s
        sys     2m4.461s

This is 3.7 times faster than our naive queue implementation!


IV. CONCLUSION

Nowadays, high performance computing is typically achieved by two ways:
horizontal scaling (scale-out) by adding new computational nodes and
vertical scaling (scale-up) by adding extra computational resources (like
CPUs or memory) to a single node. Unfortunatelly, linear scaling is possible
only in theory. In practice if you double your computational resources, then
it is likely that you get only 30-60% performance gain. Lock contention is one
of the problems which prevents efficient scale-up by adding extra CPUs.
Lock-free algorigthms makes scale-up more productive and allows to get more
performance in multi-core environments.

The code for naive and lock-free queue implementations with the tests for
correctness is available at:

  https://github.com/krizhanovsky/NatSys-Lab/blob/master/lockfree_rb_q.cc

Alexander Krizhanovsky is the software architect and founder of NatSys-Lab.
Before NatSys-Lab he was working as Senior Software Developer at IBM, Yandex
and Parallels. He specializes in high performance solutions for UNIX
environment.

Special thanks to Johann George from SanDisk for final review of the paper.

Tuesday, April 2, 2013

My Article In Linux Journal

April issue of Linux Journal (issue #228, High Performance Computing) has arrived. My article "Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer" is at page 104 and referred as "How to Scale the Work Queue in a Multicore Environment" on the cover.

The article describes lock-free work queue which is already working in production environments on two of our clients. This queue significantly improves performance on multi-core system. I got 3.7 times better results for synthetic tests in comparison with naive queue implementation on 16-core Xeon machine. One of the projects, where the queue is integrated, got about 35% performance improvement after replacement double-linked queue protected with spin lock by the lock-free queue.

You can find the source of the queue with performance and correctness tests here.


UPD. Also you can read it on-line at Linux Journal site.

Friday, March 29, 2013

What's Wrong With Sockets Performance And How to Fix It

Socket API is a nice think which allows you to easily write network programs. But sockets have fundamental problem from performance point of view - they are asynchronous with network interrupts. And this is regardless whether you're using blocking or nonblocking IO.

Let's consider a multi-threaded application which is working with number of sockets and reads some data from them. Typically, it does following (pseudo code):

    int n = epoll_wait(e->fd, e->event, e->max_events, 1000);
    for (int i = 0; i < n; ++i) {
        unsigned char buf[4096];
        read(e->event[i].data.fd, buf, 4096);
    }

The polled socket could be either blocking or non-blocking socket. Let's forget about the buffer copying for a moment and concentrate on what's happen with arriving packets.


The figure depicts two processes (there is no difference between processes and threads in our discussion) which reads from three sockets. The processes are working on different CPUs. Probably Receive Flow Steering (RFS) is used so packets designated for the first process go to the first CPU and packets for second process are processed by softirq at the second CPU. Each socket has a receive queue where incoming packets are placed before reading process consumes them.

If we look at the code sample carefully then we find two system calls, relatively slow operations. The process also can be rescheduled and/or preempted between the syscalls. So if the process is waked up in epoll_wait() call by a socket event (when the socket gets a packet) then it reads data from the socket with some delay. There are a bold arrow between the second socket's queue and the first process which depicts reading a data from the socket. There are two complications:
  • the process can be preempted by softirq between waking up on epoll_wait() and reading from the socket (however, it's easy to prevent this by binding the process and the NIC interrupts to different cores);
  • during high loaf Linux switches to polling mode and very very quickly grabs bunches of packets, so during the delay between the two syscalls softirq can process a lot of other packets.
The problem is that when the process goes to treat the packet, softirq can read other packet (a lot of packets actually, see a bold arrow from softirq to the queue of the first socket at the figure). With packet lenght from 64 to 1500 bytes for common Ethernet link it's obviously that the packet, which is reading by the process now, could not be in CPU cache any more. The packet is simply  pushed out by other packets from the CPU cache. Thus ever with using zero copy networking user-space applications can not achieve good performance.

In fact Linux firewall works in softirq context. It means that the packet is processed synchronously, immediately when it is received. Moreover, synchronous packets processing is not limited by network level (on which firewalls works) operations. Fortunately Linux assembles TCP stream also in softirq context. Linux kernel also provides few callbacks in struct sock (see include/net/sock.h):

    void (*sk_state_change)(struct sock *sk);
    void (*sk_data_ready)(struct sock *sk, int bytes);
    void (*sk_write_space)(struct sock *sk);
    void (*sk_error_report)(struct sock *sk);
    int  (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb);


For example sk_data_ready() is called when a new data received on the socket. So it is simple to read TCP data synchronously in deferred interrupt context. Writing to the socket is bit more harder, but still possible. Of course, your application must be in-kernel now.

Let's have a look at simple example how to use the hooks for TCP data reading. First of all we need a listening socket (this is kernel sockets, so the the socket API is different);

    struct socket *l_sock;
    sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &l_sock);
    inet_sk(s->sk)->freebind = 1;
    /* addr is some address packed into struct sockaddr_in */
    l_sock->ops->bind(l_sock, (struct sockaddr *)addr,

                      sizeof(addr));

    l_sock->sk->sk_state_change = th_tcp_state_change;


    l_sock->ops->listen(l_sock, 100);


sk_state_chage() is called by Linux TCP code when the socket state is changed. We need a new connection established socket, so we need to handle TCP_ESTABLISHED state change. TCP_ESTABLISHED will be set for child socket of course, but we set the callback to listening socket because the child socket inherits the callback pointers from its parent. th_tcp_state_change() can be defined as:

    void
    th_tcp_state_change(struct sock *sk)
    {
        if (sk->sk_state == TCP_ESTABLISHED)

            sk->sk_data_ready = th_tcp_data_ready;
    }

And here we set other callback, but already for the child socket. th_tcp_data_ready() is called when a new data is available in socket receive queue (sk_receive_queue). So in the function we need to do what standard Linux tcp_recvmsg() does - traverse the queue and pick packets with appropriate sequence numbers from it:

    void
    th_tcp_data_ready(struct sock *sk, int bytes)
    {

        unsigned int processed = 0, off;
        struct sk_buff *skb, *tmp;
        struct tcp_sock *tp = tcp_sk(sk);


        skb_queue_walk_safe(&sk->sk_receive_queue, skb, tmp) {
            off = tp->copied_seq - TCP_SKB_CB(skb)->seq;
            if (tcp_hdr(skb)->syn)
                off--;
            if (off < skb->len) {

                int n = skb_headlen(skb);
                printk(KERN_INFO "Received: %.*s\n",

                       n - off, skb->data + off);
                tp->copied_seq += n - off;

                processed += n - off;
            }

        }  

        /*
         * Send ACK to the client and recalculate

         * the appropriate TCP receive buffer space.
         */

        tcp_cleanup_rbuf(sk, processed);
        tcp_rcv_space_adjust(sk);


        /* Release skb - it's no longer needed. */
        sk_eat_skb(sk, skb, 0);
}

The function should be more complicated to properly handle skb's paged data and fragments, release the skb, more accurately process TCP sequence numbers and so on, but basic idea should be clear.


UPD: You can find source code of the Linux kernel synchronous socket API at  https://github.com/krizhanovsky/sync_socket .

Monday, November 26, 2012

Coding Style

I believe that coding style in a project is important thing. MySQL, PostgreSQL, Linux and FreeBSD kernels, glibc and many other excelent open source projects have their own coding style. The styles are different and that's expected because they were developed by different people with different preferences. The only crucial thing is code consistency. I guess you saw a code where spaces and tabs were mixed for indentations - it is always disgusting to work with such code. So the code of a project can use any style his owner likes which, but it must be consistent.

In our projects we follow Linux kernel coding style (you can find it at linux/Documentation/CodingStyle) with some additions for C++. We use the same coding style for kernel C and application C++ programming, so we have adjusted the original guide with some C++ specific things. Here it is.

Thursday, September 20, 2012

Linux: scaling softirq among many CPU cores

Some years ago I have tested network interrupts affinity - you set ~0 as a CPU mask to balance network interrupts among all your CPU cores and you get all softirq instances running in parallel. Such interrupts distribution among CPU cores sometimes is a bad idea due to CPU caches computational burden and probable packets reordering. In most cases it is not recommended for servers performing some TCP application (e.g. web server). However this ability is crucial for some low level packet applications like firewalls, routers or Anti-DDoS solutions (in last cases most of the packets must be dropped as quick as possible), which do a lot of work in softirq. So for some time I was thinking that there is no problem to share softirq load between CPU cores.

To get softirq sharing between CPU cores you just need to do

    $ for irq in `grep eth0 /proc/interrupts | cut -d: -f1`; do \
        echo ffffff > /proc/irq/$irq/smp_affinity; \
    done

This makes (as I thought) your APIC to distribute interrupts between all your CPUs in round-robin fashion (or probably using some more cleaver technique). And this really was working in my tests.

Recently our client concerned about this ability, so I wrote very simple testing kernel module which just makes more work in softirq:

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/netfilter.h>
#include <linux/netfilter_ipv4.h>

MODULE_LICENSE("GPL");

/**
 * Just eat some local CPU time and accept the packet.
 */
static unsigned int
st_hook(unsigned int hooknum, struct sk_buff *skb,
        const struct net_device *in,
        const struct net_device *out,
        int (*okfn)(struct sk_buff *))
{
    unsigned int i;
    for (i = 0; i <= 1000 * 1000; ++i)
        skb_linearize(skb);

    return NF_ACCEPT;
}

static struct nf_hook_ops st_ip_ops[] __read_mostly = {
    {
        .hook = st_hook,
        .owner = THIS_MODULE,
        .pf = PF_INET,
        .hooknum = NF_INET_PRE_ROUTING,
        .priority = NF_IP_PRI_FIRST,
    },
};

static int __init
st_init(void)
{
    if (nf_register_hooks(st_ip_ops, ARRAY_SIZE(st_ip_ops))) {
        printk(KERN_ERR "%s: can't register nf hook\n",
               __FILE__);
        return 1;
    }
    printk(KERN_ERR "%s: loaded\n", __FILE__);

    return 0;
}

static void
st_exit(void)
{
    nf_unregister_hooks(st_ip_ops, ARRAY_SIZE(st_ip_ops));
    printk(KERN_ERR "%s: unloaded\n", __FILE__);
}

module_init(st_init);
module_exit(st_exit);

I loaded the system with iperf over 1Gbps channel. And I was very confused when see that only one CPU of 24-cores machine was doing whole the work and all other CPUs was doing nothing!

To understand what's going on lets have a look how Linux handles incoming packets and interrupts from network card (e.g. Intel 10 Gigabit PCI Express which is placed at drivers/net/ixgbe). Softirq works in per-cpu kernel threads, ksoftirqd (kernel/softirq.c: ksoftirqd()), i.e. if you have 4-cores machine, then you have 4 ksoftirqd threads (ksoftirqd/0, ksoftirqd/1, ksoftirqd/2 and ksoftirqd/3). ksoftirqd() calls do_softirq(), which by-turn calls __do_softirq(). The last one uses softirq_vec vector to get required hadler for current softirq type (e.g. NET_RX_SOFTIRQ for receiving or NET_TX_SOFTIRQ for sending softirqs correspondingly). The next step is to call virtual function action() for the handler. For NET_RX_SOFTIRQ net_rx_action() (net/core/dev.c) is called here. net_rx_action() reads napi_struct from per-cpu queue softnet_data and calls virtual function poll() - a NAPI callback (ixgbe_poll() in our case) which actually reads packets from the device ring queues. The driver processes interrupts in ixgbe_intr(). This function runs NAPI through call __napi_schedule(), which pushes current napi_struct to per-cpu softnet_data->poll_list, which net_rx_action() reads packets (on the same CPU) from. Thus softirq runs on the same core which received hardware interrupt.

This way theoretically if harware interrupts are going to N cores, then these and only these N cores are doing softirq. So I had a look at /proc/interrupts statistics and saw that only one 0th core is actually receiving interrupts from NIC while I set ~0 mask in smp_affinity for the interupt (actually I had MSI-X card, so I set the mask to all the interrupt vectors for the card).

I started googling for the answers why on earth interupts do not distribute among all the cores. The first topics which I found were nice articles by Alexander Sandler:

Following these articles not all hardware is actually able to spread interrupts between CPU cores. During my tests I was using IBM servers of particular model, but this is not the case of the client - they use very different hardware. This is why I saw one nice picture on my previous tests, but faced quite different behaviour on other hardware.

The good news is that linux 2.6.35 has introduced nice feature -  RPS (Receive Packet Steering). The core of the feature is get_rps_cpu() from dev/net/core.c, which computes a hash from IP source and destination addresses of an incoming packet and determines a which CPU send the packet to based on the hash. netif_receive_skb() or netif_rx() which call the function puts the packet to appropriate per-cpu queue for further processing by softirq. So there are two important consequences:
  1. packets are processed by different CPUs (with processing I mostly mean Netfilter pre-routing hooks);
  2. it is unlikely that packets belonging to the same TCP stream are reordered (packets reordering is a well-known problem for TCP performance, see for example Beyond softnet).
To enable the feature you should specify CPUs mask as following (the adapter from the example is connected via MSI-X and has 8 tx-rx queues, so we need to update masks for all the queues):

    $ for i in `seq 0 7`; do \
        echo fffffff > /sys/class/net/eth0/queues/rx-$i/rps_cpus ; \
    done

After runnign linux-2.6.35 and setting all CPUs to be able to process softirq I got following nice picture in top:

  2238 root      20   0  411m  888  740 S  152  0.0   2:38.94 iperf
    10 root      20   0     0    0    0 R  100  0.0   0:35.44 ksoftirqd/2
    19 root      20   0     0    0    0 R  100  0.0   0:46.48 ksoftirqd/5
    22 root      20   0     0    0    0 R  100  0.0   0:29.10 ksoftirqd/6
    25 root      20   0     0    0    0 R  100  0.0   2:47.36 ksoftirqd/7
    28 root      20   0     0    0    0 R  100  0.0   0:33.73 ksoftirqd/8
    31 root      20   0     0    0    0 R  100  0.0   0:46.63 ksoftirqd/9
    40 root      20   0     0    0    0 R  100  0.0   0:45.33 ksoftirqd/12
    46 root      20   0     0    0    0 R  100  0.0   0:29.10 ksoftirqd/14
    49 root      20   0     0    0    0 R  100  0.0   0:47.35 ksoftirqd/15
    52 root      20   0     0    0    0 R  100  0.0   2:33.74 ksoftirqd/16
    55 root      20   0     0    0    0 R  100  0.0   0:46.92 ksoftirqd/17
    58 root      20   0     0    0    0 R  100  0.0   0:32.07 ksoftirqd/18
    67 root      20   0     0    0    0 R  100  0.0   0:46.63 ksoftirqd/21
    70 root      20   0     0    0    0 R  100  0.0   0:28.95 ksoftirqd/22
    73 root      20   0     0    0    0 R  100  0.0   0:45.03 ksoftirqd/23
     7 root      20   0     0    0    0 R   99  0.0   0:47.97 ksoftirqd/1
    37 root      20   0     0    0    0 R   99  0.0   2:42.29 ksoftirqd/11
    34 root      20   0     0    0    0 R   77  0.0   0:28.78 ksoftirqd/10
    64 root      20   0     0    0    0 R   76  0.0   0:30.34 ksoftirqd/20

So as we see almost all of the cores are doing softirqs.

Friday, May 25, 2012

Software Transactional Memory (STM) in GCC-4.7

GCC-4.7 introduces new amazing feature - Software Transactional Memory (STM). It is still experimental and not yet optimized feature, however we already can have a look how STM works. Currently GCC implements pure Software TM, i.e. without hardware support. Intel announced hardware support for TM (HTM) in Haswell microarchitecture as Transactional Synchronization Extension (TSX), so probably in next year we'll have hybrid TM - software transactional memory with hardware optimizations.

Firstly, to understand what STM is, lets consider following simple program:

    #include <iostream> 
    #include <thread>

    static const auto THR_NUM = 4;
    static const auto ITER_NUM = 1000 * 1000;

    static auto a = 0, b = 0, c = 0;

    static void
    thr_func()
    {
            for (auto i = 0; i < ITER_NUM; ++i) {
                    ++a;
                    b += 2;
                    c = a + b;
            }
    }

    int
    main(int argc, char *argv[])
    {
            std::thread thr[THR_NUM];

            for (auto &t : thr)
                    t = std::thread(thr_func);

            for (auto &t : thr)
                    t.join();

            std::cout << "a=" << a << " b=" << b
                    << " c=" << c << std::endl;

            return 0;
    }

Now try to compile (don't forget -std=c++11 since C++11 is still not default option for g++) and run the program. Probably you'll see that a, b and c contains ugly values which change from run to run, e.g.:

        $ ./a.out
        a=2139058 b=4316262 c=6455320
        $ ./a.out
        a=2077152 b=4463948 c=6541100

Result is expected because 4 threads concurrently updates all the three variables and all the variables are updated in RMW (Read-Modify-Write) manner. Now lets place operations on all the three variables into one transaction (yes, this is very like database transactions), so all the variables will be read and written in atomic manner:

        static void
        thr_func()
        {
                for (auto i = 0; i < ITER_NUM; ++i)
                        __transaction_atomic {
                                ++a;
                                b += 2;
                                c = a + b;
                        }
        }

Lets compile the code with -fgnu-tm to enable STM in GCC and rerun the program. This time you'll see nice numbers, which stay the same regardless of the run try:

        $ ./a.out
        a=4000000 b=8000000 c=12000000
        $ ./a.out
        a=4000000 b=8000000 c=12000000

This is quite simple case and you'll probably prefer to use mutex here. But you can refer to Ulrich Drepper's "Parallel Programming with Transactional Memory" for more complicated example when mutex alternative is not so obvious. It's easy to see that STM would be quite useful for example to implement highly-concurrent self-balancing binary search tree which could need to lock number of nodes for rotation on insertion or deletion (traditionally such data structures are implemented by introducing per-node mutex and are prone to deadlocks).

You may noticed that STM version of the program runs much more slowly. So lets analyze what it's doing so long. For basic investigation lets run the program under strace and print system calls statistics:

        $ strace -f -c ./a.out
        ........
        % time   seconds  usecs/call   calls  errors syscall
        ------ --------- ----------- ------- ------- ---------
         99.39  0.021295          11    1920     390 futex
        .......

So it means that STM in libitm (GCC implements STM as libitm library which you can see in ldd output) is implemented via futex() system call, like common mutex. Before going deeper into libitm internals lets see at the transaction code more carefully and split the code into basic read and write operations. We have 3 memory locations, variables a, b and c, which we perform read and write operations on. First operation is ++a which is actually read a value from memory, update it and write back, so we have two operations here - one read and one write. Next b += 2 - exactly the same: read the value, add 2 and write it back. And the last one, c = a + b, is two reads (a and b) and one write to c. Moreover, all these operation are inside transaction, so we have to start and commit a transaction.

To understand what's going on inside thr_func() lets simplify it as follows:

        static void
        thr_func()
        {
               __transaction_atomic {
                        ++a;
               }
        }

and disassemble it:

        push   %rbp
        mov    %rsp,%rbp
        mov    $0x29,%edi
        mov    $0x0,%eax
        callq  400fd8 <_ITM_beginTransaction@plt>
        mov    $0x6052ec,%edi
        callq  4010b8 <_ITM_RU4@plt>
        add    $0x1,%eax
        mov    %eax,%esi
        mov    $0x6052ec,%edi
        callq  400fe8 <_ITM_WU4@plt>
        callq  400f48 <_ITM_commitTransaction@plt>
        pop    %rbp
        retq

Now we see four calls of _ITM_* functions (as explained in info libitm, GCC follows the Intel's Draft Specification of Transactional LanguageConstructs for C++ (v1.1) in its implementation of transactions, so _ITM_ prefix is just Intel's naming convention) for transaction begin, transaction commit and the pair of read (RU4) and write (WU4) operations.

_ITM_beginTransaction() saves the machine state (for x86 see  libitm/config/x86/sjlj.S) and calls GTM::gtm_thread::begin_transaction() (see libitm/beginend.cc) which initializes the transaction data, checks transaction nesting and performs other preparation steps.

_ITM_commitTransaction() is defined in  libitm/beginend.cc and tries to commit the transaction by calling GTM::gtm_thread::trycommit() and if it fails restarts the transaction. GTM::gtm_thread::trycommit() is the place where all the threads are sleeping in futex() (which we saw in strace output) to write all modified data. So this is most heavy part of transaction.


The most interesting stuff is in read and write operations. 0x6052ec is address of variable a. _ITM_RU4 and _ITM_WU4 are just a sequence of jumps which lead (in this particular case) to ml_wt_dispatch::load() and ml_wt_dispatch::store() correspondingly. First one accepts only the variable address and the second one - the variable address and the stored value. Load() reads a memory region by specified address, but before that it calls ml_wt_dispatch::pre_load() function which verifies that the memory location is not locked or recent and restarts the transaction (these service data is taken from global table indexed by hash function over the address). Store() by-turn calls ml_wt_dispatch::pre_write() which locks the memory location (all service data for the memory location also is taken by the same global table) and updated the release (version) of the memory location before the write (the release version is checked in pre_load() as 'recent').