High Performance Linux





Tuesday, January 7, 2014

Haswell AVX2 for Simple Integer Bitwise Operations

It's known that the vector extension instructions of modern microprocessors are very useful for complicated SIMD (Single Instruction Multiple Data) computations, e.g. matrix multiplication, floating point calculations for graphics and video applications etc. However, what about simple bitwise operations on plain integers? Do vector extensions provide any notable benefit in comparison with loop unrolling and other common optimizations at the C level?

Let's study a simple example. We have 16 32-bit bitmaps and need to check them all for a particular bit, i.e. just say whether the bit is set in any of the bitmaps or not. The bitmaps are placed in contiguous memory. Without loss of generality I'll consider the following sets of bitmaps, which have or don't have the 11th bit set (2048 = 1 << 11):

    0     2048  4096  48    5     11    8192  56    304   16384 3     204   208   60    901   208
    0     304   2048  48    5     11    8192  56    4096  16384 3     204   208   60    901   208
    0     304   4096  48    5     2048  8192  56    11    16384 3     204   208   60    901   208
    0     304   4096  48    5     11    2048  56    1024  16384 3     204   208   60    901   208
    0     304   4096  48    5     11    8192  56    2048  16384 3     204   208   60    901   208
    0     304   4096  48    5     11    8192  56    3     16384 2048  204   208   60    901   208
    0     304   4096  48    5     11    8192  56    60    16384 3     204   208   2048  901   208
    0     304   4096  48    5     11    8192  56    208   16384 3     204   208   60    901   2048
    0     304   4096  48    2000  11    8192  5     56    16384 3     204   2040  60    901   2047
    2048  2048  2048  2048  2048  2048  2048  2048  2048  2048  2048  2048  2048  2048  2048  2048


The first eight arrays each have the bit set in a different bitmap, the 9th array doesn't have a bitmap with the bit set, and finally the 10th array has the bit set in all the bitmaps.

Now we can write a simple C-function which checks all the bitmaps for the specified bit and returns true or false (the full example can be found at GitHub):

    inline bool
    cycle_lookup_naive(unsigned int bm, volatile unsigned int *w)
    {
        for (int i = 0; i < 16; ++i)
            if (bm & w[i])
                return true;
        return false;
    }


This function accepts the desired bitmask as the first argument and the array of bitmaps as the second argument. Since the function is inlined and operates on a static constant array, I passed the second argument as a volatile memory area to prevent the compiler from optimizing out the loop in which the function is called 10M times.
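A minimal sketch of how such a timing loop could look, assuming std::chrono for the clock. The helper name time_lookup_ms() and its parameters are hypothetical and are not the macros from the actual test program:

```cpp
#include <cassert>
#include <chrono>

// Same naive lookup as above: true if any of the 16 bitmaps shares a bit with bm.
inline bool
cycle_lookup_naive(unsigned int bm, volatile unsigned int *w)
{
    for (int i = 0; i < 16; ++i)
        if (bm & w[i])
            return true;
    return false;
}

// Hypothetical helper: calls the lookup `iters` times, stores the number of
// positive answers in *hits and returns the elapsed wall-clock time in ms.
long long
time_lookup_ms(unsigned int bm, volatile unsigned int *w, long iters, long *hits)
{
    assert(iters >= 0 && hits != nullptr);

    auto start = std::chrono::steady_clock::now();
    long found = 0;
    for (long i = 0; i < iters; ++i)
        found += cycle_lookup_naive(bm, w);
    auto stop = std::chrono::steady_clock::now();

    *hits = found;
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               stop - start).count();
}
```

Accumulating the results in `found` and returning them through `hits` gives the compiler one more reason to keep the calls, in addition to the volatile argument.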

I want all the optimizations available in G++ 4.7 for my Intel Core i7-4650U (Haswell), so I compile the program with -O3 -march=core-avx-i -mtune=core-avx-i -mavx2. GCC has an auto-vectorization feature which is turned on at the -O3 optimization level. The function, run 10M times, shows the following results for the 10 arrays:

    19ms 18ms 24ms 31ms 35ms 42ms 48ms 54ms 53ms 10ms

There is an obvious but still interesting property of this implementation: it runs faster on arrays which have the desired bitmap close to the beginning. The second thing is that the run for the 9th array, which doesn't have a bitmap with the bit set at all, is a bit faster than for the array which has the bit set in the last bitmap. This can be explained by branch prediction: the processor predicts the branch in which there is no match, so it stalls a little when it finds a match in the last bitmap. There is an interesting discussion of branch prediction on Stack Overflow: Why is processing a sorted array faster than an unsorted array?.

The first optimization of the function which we can do is simply to iterate over and match the bitmaps as 64-bit values instead of 32-bit ones:

    inline bool
    cycle_lookup(unsigned int bm, volatile unsigned int *w)
    {
        unsigned long *_w = (unsigned long *)w;
        unsigned long _bm = ((unsigned long)bm << 32) | bm;

        for (int i = 0; i < 8; ++i)
            if (_bm & _w[i])
                return true;
        return false;
    }


And this gives us real speedup:

    13ms 18ms 18ms 21ms 24ms 27ms 29ms 30ms 30ms 14ms

The reason for the slight slowdown for the last array, where we find a bitmap on the first try, is that we perform an additional computation for the 64-bit bitmap _bm. We also see that the function execution time depends on the input data, and this is not always desired. Since we just need to answer whether any of the bitmaps contains the desired bit, we can OR all the bitmaps and check the result against bm. This way we also eliminate the branch misprediction problem (thanks to Melzzzzz for the proposed optimization which moves the AND out of the loop):

    bool
    cycle_lookup_opt(unsigned int bm, volatile unsigned int *w)
    {
        unsigned long r = 0;
        unsigned long *_w = (unsigned long *)w;
        unsigned long _bm = ((unsigned long)bm << 32) | bm;

        for (int i = 0; i < 8; ++i)
            r |= _w[i];
        return r & _bm;
    }


And this gives us

    23ms 24ms 23ms 24ms 24ms 24ms 24ms 24ms 24ms 24ms

This is 24ms on average, which is worse than 22.4ms for the previous case. However, this holds only for this exact test set and can differ on real data (what if in most cases you find the bit only in the last bitmap?).
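Before comparing timings further, it is worth checking that the early-exit and the OR-then-AND variants really answer the same question. The following is a small sketch with hypothetical function names; unlike the code above it uses memcpy() for the 64-bit loads, which is an assumption of this sketch made to stay clear of pointer aliasing issues:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Early-exit variant: stop at the first bitmap which has the bit set.
bool
lookup_early_exit(uint32_t bm, const uint32_t *w)
{
    for (int i = 0; i < 16; ++i)
        if (bm & w[i])
            return true;
    return false;
}

// Branch-free variant: OR eight 64-bit chunks first, AND only once at the end.
bool
lookup_or_then_and(uint32_t bm, const uint32_t *w)
{
    uint64_t mask = ((uint64_t)bm << 32) | bm;
    uint64_t r = 0;
    for (int i = 0; i < 8; ++i) {
        uint64_t chunk;
        std::memcpy(&chunk, w + 2 * i, sizeof(chunk)); // aliasing-safe load
        r |= chunk;
    }
    return (r & mask) != 0;
}

// Hypothetical self-check: both variants must give the same answer.
bool
variants_agree(uint32_t bm, const uint32_t *w)
{
    assert(w != nullptr);
    return lookup_early_exit(bm, w) == lookup_or_then_and(bm, w);
}
```

The two variants agree because (a & m) | (b & m) equals (a | b) & m for any a, b and m, so the AND can be applied once after all the ORs.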

Now it's time to look under the hood of the compiler. In all three cases the loop has constant bounds known at compile time, so it's reasonable to expect that the compiler will vectorize the loop (recall that we compile the program with -O3, which enables loop vectorization). However, it isn't so. Here is the assembly code for cycle_lookup_opt:

    movl    %edi, %edi
    movq    %rdi, %rdx
    salq    $32, %rdx
    orq     %rdi, %rdx
    movq    %rdx, %rcx
    movq    %rdx, %rax
    andq    8(%rsi), %rax
    andq    16(%rsi), %rcx
    orq     %rax, %rcx
    movq    %rdx, %rax
    andq    (%rsi), %rax
    orq     %rcx, %rax
    movq    %rdx, %rcx
    andq    24(%rsi), %rcx
    orq     %rcx, %rax
    movq    %rdx, %rcx
    andq    32(%rsi), %rcx
    orq     %rcx, %rax
    movq    %rdx, %rcx
    andq    40(%rsi), %rcx
    orq     %rcx, %rax
    movq    %rdx, %rcx
    andq    56(%rsi), %rdx
    andq    48(%rsi), %rcx
    orq     %rcx, %rax
    orq     %rax, %rdx
    setne   %al
    ret


So the compiler simply unrolls the loop and also performs some grouping of operations. It decides not to use the CPU vector extensions even though -mavx2 was passed as an optimization option.

AVX2, which appeared in the Haswell architecture, allows 256-bit operations, so we can scan our bitmaps in only two steps, 32 bytes at a time. It looks promising that the CPU will check 8 bitmaps at once; however, there is a drawback: it's costly to load/store the YMM registers which are used by AVX.

First of all we need to convert our 32-bit bitmask to 256-bit value:

    __m256i m = _mm256_set_epi32(bm, bm, bm, bm, bm, bm, bm, bm);

Next, we load our 16 bitmaps to two 256-bit values:

    __m256i v0 = _mm256_set_epi32(w[0], w[1], w[2], w[3],
                                  w[4], w[5], w[6], w[7]);
    __m256i v1 = _mm256_set_epi32(w[8], w[9], w[10], w[11],
                                  w[12], w[13], w[14], w[15]);


Now we can AND both v0 and v1 with m (using _mm256_and_si256(); call the results a0 and a1), which leaves 1 only in the 11th bit positions, so we can safely OR the values to get only one 256-bit value:

    __m256i o = _mm256_or_si256(a0, a1);

Unfortunately, we can't just evaluate o and return true or false, instead we have to unpack it:

    union {
        __m128i _128[2];
        int     _32[8];
    } mr;

    mr._128[0] = _mm256_extracti128_si256(o, 0);
    mr._128[1] = _mm256_extracti128_si256(o, 1);


and only after that evaluate the result with 8 ORs:

    #define OR8(a)  (a[0] | a[1] | a[2] | a[3] \
                     | a[4] | a[5] | a[6] | a[7])
    return !!OR8(mr._32);

This function (avx2_lookup() in the source code) gives us following timings:

    160ms 161ms 160ms 160ms 161ms 160ms 160ms 161ms 160ms 160ms

Thus our naive vectorization has nasty performance. To improve the performance of the function we should reduce the load/store overhead. Our optimization consists of 3 steps which reduce the initial two 256-bit values to two 64-bit ones. Also, we can use _mm256_set_epi64x(), which loads 256-bit YMM registers faster than _mm256_set_epi32(). The resulting optimized function looks as below:

    bool
    avx2_lookup_opt1(unsigned int bm, volatile unsigned int *w)
    {
        union {
            __m128i _128;
            long    _64[2];
        } mr;
        __m128i m = _mm_set1_epi32(bm);
        __m256i v0 = _mm256_set_epi64x(*(long *)w,
                                       *(long *)(w + 2),
                                       *(long *)(w + 4),
                                       *(long *)(w + 6));
        __m256i v1 = _mm256_set_epi64x(*(long *)(w + 8),
                                       *(long *)(w + 10),
                                       *(long *)(w + 12),
                                       *(long *)(w + 14));
        __m256i o0 = _mm256_or_si256(v0, v1);
        __m128i h0 = _mm256_extracti128_si256(o0, 0);
        __m128i h1 = _mm256_extracti128_si256(o0, 1);
        __m128i o1 = _mm_or_si128(h0, h1);
        mr._128 = _mm_and_si128(o1, m);
        return mr._64[0] || mr._64[1];
    }


Note that we perform the first OR on v0 and v1, reducing them to one 256-bit value o0; next we extract its first and second halves into the 128-bit h0 and h1 respectively. We do the next reduction by OR-ing h0 and h1, getting o1. Only here do we perform our AND operation. And finally, we read the two halves of o1 as two 64-bit longs and return the result of the last OR operation. These optimizations give us

    71ms 70ms 70ms 69ms 70ms 70ms 71ms 70ms 69ms 71ms

Much better, but still worse than even our non-optimized plain C loop. Fortunately, we have the VPTEST AVX instruction, which performs an AND operation on two 256-bit operands and sets the ZF flag if all bits of the result are zero. Using the instruction (the appropriate compiler intrinsic can be found in Intel® 64 and IA-32 Architectures Software Developer’s Manual Volume 2 (2A, 2B & 2C): Instruction Set Reference, A-Z) we can rewrite the function in the following way:

    bool
    avx2_lookup_opt2(unsigned int bm, volatile unsigned int *w)
    {
 
        __m256i m = _mm256_set1_epi32(bm);
        __m256i v0 = _mm256_set_epi64x(*(long *)w,
                                       *(long *)(w + 2),
                                       *(long *)(w + 4),
                                       *(long *)(w + 6));
        __m256i v1 = _mm256_set_epi64x(*(long *)(w + 8),
                                       *(long *)(w + 10),
                                       *(long *)(w + 12),
                                       *(long *)(w + 14));
        __m256i o = _mm256_or_si256(v0, v1);
        return !_mm256_testz_si256(o, m);
    }


which has very short assembly code and gives us amazing results

    20ms 20ms 20ms 20ms 20ms 21ms 20ms 20ms 20ms 20ms

These results are not only faster than the plain C loop on average, but also show very stable execution time.

Thus, even though GCC doesn't vectorize this loop with known bounds for simple bitwise operations on integers, manual vectorization of the loop with the new AVX2 instruction set gives about 17% performance benefit on average.


 UPD.

Unfortunately, the measurements were wrong. The compiler unrolls the loops in the test() and test_all() macros and moves the called function's initialization code (more precisely, the loading of the wc array) out of the loop. So calling avx2_lookup_opt2() can be depicted in pseudo code in the following way:

    load all the arrays to YMM0-YMM14 registers and make OR
    for [0,10M]:
        load bm to YMM14 register
        YMM14 AND YMM0-YMM14


So the heaviest part of avx2_lookup_opt2(), the loading of YMM registers, was running outside of the loop.

To avoid unnecessary optimization we should trick the compiler:

    volatile unsigned int * volatile wc;
    static unsigned int __wc[10][16] __attribute__((aligned(64)));


    wc = (volatile unsigned int * volatile)__wc;

Note the double volatile specifier in the wc declaration, which says that not only the pointer itself but also the pointed-to memory is volatile. Surely, we should also adjust the code which uses wc. And now we can rerun our tests:

First naive loop:

    18ms 20ms 31ms 36ms 45ms 50ms 62ms 83ms 83ms 12ms

Naive loop with 64-bit steps:

    18ms 18ms 19ms 24ms 27ms 32ms 38ms 42ms 41ms 18ms

Loop with OR instead of conditional return:

    43ms 43ms 43ms 43ms 44ms 42ms 43ms 42ms 43ms 43ms

First naive AVX2 implementation:

    160ms 159ms 160ms 161ms 160ms 161ms 159ms 160ms 159ms 161ms

Optimized AVX2 version:

    72ms 72ms 72ms 73ms 72ms 73ms 72ms 73ms 73ms 73ms

AVX2 version with VPTEST instruction:

    65ms 63ms 64ms 64ms 64ms 63ms 64ms 64ms 65ms 63ms

Thus the execution time of all the tests increased and my "cool" AVX2 implementation is actually much slower than the naive C implementation.

Thanks to Melzzzzz, who proposed a very short implementation of the function in bare assembly in the comp.lang.asm.x86 group. So I had a look at the code generated by G++ for avx2_lookup_opt2() and it looks messy:

    # Messy loading of the array pieces from memory to YMM2 and YMM0 registers.
    vmovq   8(%rsi), %xmm1
    vpinsrq $1, (%rsi), %xmm1, %xmm0
    vmovq   24(%rsi), %xmm1
    vpinsrq $1, 16(%rsi), %xmm1, %xmm2
    vmovq   40(%rsi), %xmm1
    vinserti128     $0x1, %xmm0, %ymm2, %ymm2
    vpinsrq $1, 32(%rsi), %xmm1, %xmm0
    vmovq   56(%rsi), %xmm1
    vpinsrq $1, 48(%rsi), %xmm1, %xmm3
    vinserti128     $0x1, %xmm0, %ymm3, %ymm0


    # __m256i o = _mm256_or_si256(v0, v1)
    vpor    %ymm0, %ymm2, %ymm0


    #  __m256i m = _mm256_set1_epi32(bm)
    vmovd   %edi, %xmm2
    vpshufd $0, %xmm2, %xmm1
    vinserti128     $1, %xmm1, %ymm1, %ymm1
    

    # ! _mm256_testz_si256(o, m)
    vptest  %ymm1, %ymm0
    setne   %al
 

    # Zero upper halves of all YMM registers for interoperability
    # with legacy SSE code.
    vzeroupper

    ret


Instead of the messy loading to YMM0 and YMM2 registers, Melzzzzz proposed to load only one YMM register using VMOVUPS instruction and perform OR on the register and memory.

In fact we don't need to use explicit loading into YMM0 and YMM2 - compiler can do this for us if we rewrite the function in this way (knowing that we can load 256-bit memory operands into the registers):

    __m256i m = _mm256_set1_epi32(bm);
    __m256i o = _mm256_or_si256(*(__m256i *)w, *(__m256i *)(w + 8));
    return !_mm256_testz_si256(o, m);


Also, we don't need to be compatible with legacy SSE code, so we can use the -mno-vzeroupper compiler option to avoid emitting the vzeroupper instruction. As a result we get very short assembly code, which is very close to Melzzzzz's proposal:

    # Move 256-bit at once from aligned memory to YMM0
    # (Also proposed by Joker-eph in the comments below). 
    vmovdqa (%rsi), %ymm0
 

    vmovd   %edi, %xmm2
    vpshufd $0, %xmm2, %xmm1
    vinserti128     $1, %xmm1, %ymm1, %ymm1


    # Use memory operand in OR
    vpor    32(%rsi), %ymm0, %ymm0

    vptest  %ymm1, %ymm0
    setne   %al
    ret


And now it runs with following times:

    21ms 21ms 21ms 20ms 21ms 21ms 20ms 21ms 21ms 21ms

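Given that our fastest C implementation (naive loop with 64-bit steps) runs for about 27.7ms on average after the benchmark fixes (the mean of the ten timings listed above), against 20.8ms for the AVX2 version, we get about 25% shorter execution time on average and 2 times shorter (42ms vs. 21ms) in the worst case!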
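For completeness, here is a self-contained sketch of the final function which can be built without -mavx2. It is a variation under stated assumptions, not the code from the test program: it relies on the GCC/Clang extensions __attribute__((target("avx2"))) and __builtin_cpu_supports(), uses unaligned loads (_mm256_loadu_si256()) instead of dereferencing an aligned __m256i pointer, and falls back to a scalar loop on CPUs without AVX2. The function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#if defined(__x86_64__) || defined(__i386__)
#include <immintrin.h>
#define HAVE_X86 1
#endif

// Portable reference: OR all 16 bitmaps, then test against the mask.
static bool
lookup_scalar(uint32_t bm, const uint32_t *w)
{
    uint32_t r = 0;
    for (int i = 0; i < 16; ++i)
        r |= w[i];
    return (r & bm) != 0;
}

#ifdef HAVE_X86
// Compiled for AVX2 regardless of the command line options (GCC/Clang).
__attribute__((target("avx2")))
static bool
lookup_avx2(uint32_t bm, const uint32_t *w)
{
    __m256i m  = _mm256_set1_epi32((int)bm);
    __m256i v0 = _mm256_loadu_si256((const __m256i *)w);       // w[0..7]
    __m256i v1 = _mm256_loadu_si256((const __m256i *)(w + 8)); // w[8..15]
    __m256i o  = _mm256_or_si256(v0, v1);
    return !_mm256_testz_si256(o, m); // VPTEST: ZF is set if (o & m) == 0
}
#endif

// Picks the AVX2 version only if the running CPU supports it.
bool
lookup(uint32_t bm, const uint32_t *w)
{
    assert(w != nullptr);
#ifdef HAVE_X86
    if (__builtin_cpu_supports("avx2"))
        return lookup_avx2(bm, w);
#endif
    return lookup_scalar(bm, w);
}
```

The run-time check costs a branch on every call, so for a real benchmark one would rather select the implementation once, outside of the measured loop.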

Friday, November 29, 2013

Calling Closed Kernel Functions in Linux Kernel Modules

The Linux kernel exports some of its functions with EXPORT_SYMBOL and Co. Such functions can be used in loadable kernel modules. However, other functions, e.g. ip_rcv() or tcp_v4_rcv(), are closed. If you need some of these functions, then you can write a trivial kernel patch which just exports them. We do this in our Synchronous Sockets.

However, there is a simpler method. The Linux kernel has a nice kallsyms interface, which provides you with the addresses of kernel symbols. So firstly, you can just grep the required symbol:

    $ grep '\<ip_rcv\>' /proc/kallsyms
    ffffffff8143590a T ip_rcv

You can do this from a shell script and pass the address somehow to your module which needs to call the function.
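If the lookup is done by a small user-space program instead of grep, parsing one /proc/kallsyms line could look roughly like this. The helper kallsyms_line_addr() is a hypothetical name, and the sketch assumes the "address type name" line layout shown above, with an optional tab-separated "[module]" suffix for module symbols:

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical helper: parses one /proc/kallsyms line, e.g.
// "ffffffff8143590a T ip_rcv", and returns the address if the symbol name
// equals `wanted`, or 0 otherwise.
unsigned long long
kallsyms_line_addr(const std::string &line, const std::string &wanted)
{
    assert(!wanted.empty());

    char *end = nullptr;
    unsigned long long addr = std::strtoull(line.c_str(), &end, 16);
    if (end == line.c_str() || *end != ' ')
        return 0;                   // no hexadecimal address at the start
    if (end[1] == '\0' || end[2] != ' ')
        return 0;                   // expect "<type char> <name>" next

    std::string name(end + 3);
    // Cut off a trailing "\t[module]" suffix if there is one.
    std::string::size_type sep = name.find_first_of("\t ");
    if (sep != std::string::npos)
        name.erase(sep);

    return name == wanted ? addr : 0;
}
```

Note that 0 doubles as "not found" here, which also covers systems where unprivileged reads of /proc/kallsyms show zeroed addresses.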

Fortunately, the Linux kernel exports an interface to kallsyms, so GPL-licensed modules can use it to find the desired symbols.

Recently, we've written a simple Linux kernel module which makes the Nginx HTTP server work in Deep Packet Inspection (DPI) mode: you can attach a machine with Nginx to a SPAN port of your router, and Nginx thinks that it gets traffic from real clients and operates with them in the usual way. To do this we had to generate custom TCP ACK, FIN and RST segments and pass them directly to the Linux TCP code. We did this with a tcp_v4_rcv() call. So let's see how to call the closed function from a loadable kernel module:

    static int (*tcp_v4_rcv_ptr)(struct sk_buff *);

    static void *
    get_tcp_v4_rcv_ptr(void)
    {
        unsigned long tcp_v4_rcv_addr = 0;

        int get_tcp_v4_rcv(void *data, const char *namebuf,
                           struct module *owner, unsigned long addr)
        {
            if (strcmp(namebuf, "tcp_v4_rcv"))
                return 0;
            *(unsigned long *)data = addr;
            return 1;
        }

        kallsyms_on_each_symbol(get_tcp_v4_rcv, &tcp_v4_rcv_addr);

        return (void *)tcp_v4_rcv_addr;
    }


    tcp_v4_rcv_ptr = get_tcp_v4_rcv_ptr();

    /* Call tcp_v4_rcv() and pass the packet directly to TCP code. */
    tcp_v4_rcv_ptr(aw->skb);

Monday, November 11, 2013

Studying Intel TSX Performance

Lock-free algorithms based on atomic operations work perfectly for updates of small data (typically 8 or 16 bytes on modern x86-64 hardware). If you need to update more data, then you have to spin in a checking loop to verify whether a particular update is consistent with other concurrent updates.
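For the small-data case, a lock-free update is just a compare-and-swap retry loop. The sketch below uses std::atomic from C++11; the function name and the single 8-byte balance are assumptions made for illustration only:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical example: lock-free update of a single 8-byte balance.
// Returns the new value of the balance.
long
atomic_add_checked(std::atomic<long> &balance, long delta)
{
    // The whole point is to avoid locks, so check that none is used inside.
    assert(balance.is_lock_free());

    long old_val = balance.load(std::memory_order_relaxed);
    // On failure compare_exchange_weak() stores the current value of the
    // balance into old_val, so we simply retry with the fresh value.
    while (!balance.compare_exchange_weak(old_val, old_val + delta))
        ;
    return old_val + delta;
}
```

This works because the whole state fits into one machine word; once an update spans several independent words, a single CAS can no longer cover it.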

Suppose you have N source bank accounts and N destination bank accounts, and you need to transfer money from the source accounts to the destination accounts at once. This is the classic example of a database transaction (usually database books use N = 1). For simplicity we can describe each account by one integer number, so if N = 1, then we can handle the transaction using a double CAS (Compare And Swap, cmpxchg16b instruction on x86-64) operation. However, if N is much larger, then it's time to think about Transactional Memory. One year ago I wrote about software transactional memory in GCC, but it's quite slow. So now it's time to look at Intel TSX.

The Test Case

Our target is to atomically execute following function:

    void
    trx_func(unsigned long trx_sz)
    {
        for (unsigned i = 0; i < trx_sz; ++i) {
            debit[i] += 1;
            credit[i] += -1;
        }
    }


(we move only one dollar in our example). Intel TSX operates by CPU cache lines (64 bytes for Haswell), so we need to ensure that each transaction reads and modifies only its own cache lines and doesn't affect cache lines of other transactions. So debit and credit could be defined as:

    struct CacheLine {
        long c[L1DSZ / sizeof(long)];
        CacheLine() : c{0} {}

        void
        operator+=(int x)
        {
            c[0] += x;
        }
    } __attribute__((aligned(L1DSZ)));

    CacheLine debit[TRX_BUF_SZ_MAX]
        __attribute__((aligned(L1DSZ)));
    CacheLine credit[TRX_BUF_SZ_MAX]
        __attribute__((aligned(L1DSZ)));

L1DSZ is size of cache line (getconf LEVEL1_DCACHE_LINESIZE). TRX_BUF_SZ_MAX is just some relatively big value, in my case it's 8192, we won't refer to it any more.
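The layout requirement can be checked at compile time. The following sketch assumes 64-byte cache lines and uses the standard C++11 alignas instead of the GCC attribute; PaddedCounter and different_lines() are hypothetical names, not part of the test program:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte cache lines, as on Haswell.
constexpr std::size_t kLineSize = 64;

// One counter padded and aligned to occupy exactly one cache line.
struct alignas(kLineSize) PaddedCounter {
    long c[kLineSize / sizeof(long)];
};

static_assert(sizeof(PaddedCounter) == kLineSize, "exactly one cache line");
static_assert(alignof(PaddedCounter) == kLineSize, "starts on a line boundary");

// True if the two addresses belong to different cache lines.
inline bool
different_lines(const void *a, const void *b)
{
    assert(a != nullptr && b != nullptr);
    return reinterpret_cast<std::uintptr_t>(a) / kLineSize
           != reinterpret_cast<std::uintptr_t>(b) / kLineSize;
}
```

With such a type, neighbouring array elements never share a cache line, which is what keeps transactions working on different elements from conflicting with each other.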

To understand TSX performance we need some code which can be compared with TSX transactions. So let's write simple spin lock synchronization:

    void
    execute_spinlock_trx(unsigned long trx_sz)
    {
        pthread_spin_lock(&spin_l);

        trx_func(trx_sz);

        pthread_spin_unlock(&spin_l);
    }


Certainly, the code must be run on many threads on multi core system. I won't show the threading code, you can find it at GitHub (compilation notes are in the header comment of the source code file).

Now let's have a look how we can use Intel TSX to execute trx_func() atomically:

    void
    execute_short_trx(unsigned long trx_sz)
    {
        while (1) {
            unsigned status = _xbegin();

            if (status == _XBEGIN_STARTED) {
                // we're in transactional context

                // Hacky check whether spinlock is locked.
                // See glibc/nptl/sysdeps/x86_64/pthread_spin_unlock.S
                if ((int)spin_l != 1)
                    _xabort(_ABORT_LOCK_BUSY);

                trx_func(trx_sz);

                _xend();

                return;
            }

            if (!(status & _XABORT_RETRY)
                && !(status & _XABORT_CONFLICT)
                && !((status & _XABORT_EXPLICIT)
                     && _XABORT_CODE(status) != _ABORT_LOCK_BUSY))
                break;

            _mm_pause();
        }

        // fallback to spinlock.
        execute_spinlock_trx(trx_sz);
    }


_xbegin(), _xend() and _xabort() functions as well as  _ABORT_LOCK_BUSY and _XABORT_* defines are stolen from glibc-2.18 code (nptl/sysdeps/unix/sysv/linux/x86/elision-lock.c, see also Lock Elision in the GNU C Library).

The function was also mostly written using __lll_lock_elision() from glibc-2.18 as an example. The function does the following. Firstly, it starts a TSX RTM (Restricted Transactional Memory) transaction using _xbegin(). If the transaction starts normally, then status has the value _XBEGIN_STARTED and we go into the corresponding if branch. The code in the branch ends with a return statement, so we exit the function if the transaction is committed normally (using the _xend() call). If the transaction aborts for any reason, then all the changes made in the branch are rolled back. Moreover, on rollback status takes a different value and we jump to just after _xbegin() and test status again. Thus, the code after the if corresponds to an aborted transaction.

The function has a fallback path to a spin lock. This is a common practice in TSX programming. Andi Kleen wrote a nice article about this. Firstly, we check that the spin lock is unlocked. This is done in transactional context, so TSX adds spin_l to its read set; if some other CPU tries to acquire the lock, then it updates spin_l and the current transaction aborts. If the lock is acquired, then somebody is modifying the protected data under the spin lock, so we need to abort the transaction. Next, there are two possibilities: try to execute the transaction again or, like the other CPU, fall back to the spin lock.

Just falling back to the spin lock if it's already acquired by another CPU gave very poor performance. Imagine that there are 2 CPUs. The first one tries to run a transaction, but it aborts for some reason (aborts are very common for TSX, as we'll see a bit later) and falls back to the spin lock, acquires it and starts to update data. The second CPU also tries to execute a transaction and sees that the lock is held by the first CPU, so it also falls back to the spin lock. The spin lock is busy, so the second CPU goes into a busy loop on it. When the first CPU finishes its updates, it releases the lock and the lock is immediately acquired by the waiting CPU. Now the first CPU tries to run a transaction again and finds that the lock is acquired by the other CPU, so it also falls back to the spin lock... This scenario shows that naive fallback can lead to a situation where only the spin lock is used to synchronize data and transactional memory doesn't work at all.

Glibc's __lll_lock_elision() uses an adaptive locking algorithm which tries to balance between transaction restarts and fallbacks. We're interested in TSX properties, so our algorithm tries hard to execute the transaction.

On transaction abort the processor sets flags which indicate the reason for the abort. If _XABORT_RETRY is set, then the processor suggests that it makes sense to restart the transaction. If we abort the transaction explicitly, then _XABORT_EXPLICIT is set. And _XABORT_CONFLICT indicates that there is a data conflict with another transaction. In these three cases we restart the current transaction. However, a transaction can also be aborted due to limited system resources (_XABORT_CAPACITY) or by an explicit abort that is not caused by a busy lock. So we check the abort code and fall back to the spin lock in all other cases.
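The retry policy from the paragraph above can be written down as a pure function of the abort status, which makes it easy to test without TSX hardware. This is a sketch of the policy as described in the text, not a copy of execute_short_trx(). The constants mirror the RTM status bits but are redefined under hypothetical k* names, and the lock-busy code 0xff is assumed to be the value used by the glibc elision code:

```cpp
#include <cassert>

// RTM abort status bits, redefined here so the sketch builds without TSX
// support (compiler and glibc headers spell them _XABORT_*).
constexpr unsigned kAbortExplicit = 1u << 0;
constexpr unsigned kAbortRetry    = 1u << 1;
constexpr unsigned kAbortConflict = 1u << 2;
constexpr unsigned kAbortCapacity = 1u << 3;
// Assumption: "lock busy" code passed to the explicit abort.
constexpr unsigned kLockBusyCode  = 0xff;

// The 8-bit argument of an explicit abort is reported in bits 31:24.
constexpr unsigned
abort_code(unsigned status)
{
    return (status >> 24) & 0xff;
}

// Restart on RETRY, on CONFLICT, or on an explicit abort caused by a busy
// lock; in all other cases the caller falls back to the spin lock.
constexpr bool
should_retry(unsigned status)
{
    return (status & kAbortRetry)
           || (status & kAbortConflict)
           || ((status & kAbortExplicit)
               && abort_code(status) == kLockBusyCode);
}

static_assert(should_retry(kAbortRetry | kAbortConflict), "data conflict");
static_assert(!should_retry(kAbortCapacity), "capacity abort: fall back");
```

Keeping the decision separate from the transaction body also makes it simple to experiment with other policies, e.g. a bounded number of restarts.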

Test Results

For performance measurements I used an Intel Core i7-4650U (dual core 1.70GHz with hyperthreading). The processor has a 32KB 8-way L1 data cache. The system was running Linux 3.12.0-rc6 with patches by Andi Kleen (git://git.kernel.org/pub/scm/linux/kernel/git/ak/linux-misc.git hsw/pmuX). The X server and networking were down during the tests and no other activity was performed on the machine.

It seems (see the abort tests below and Intel documentation: "Intel 64 and IA-32 Architectures Software Developer’s Manual Volume 1: Basic Architecture" and "Intel 64 and IA-32 Architectures Optimization Reference Manual") that TSX transactions abort if the data doesn't fit into the L1 data cache, so all the tests use a very small data set which fits into the L1 cache. Since there are no memory operations or other CPU waiting points, this is a case where HyperThreading should be switched off for better performance. My computer doesn't have such a BIOS option, so I just use 2 threads bound to physical cores (CPUs 0 and 1):

    $ grep 'processor\|core id' /proc/cpuinfo
    processor       : 0
    core id         : 0
    processor       : 1
    core id         : 1
    processor       : 2
    core id         : 0
    processor       : 3
    core id         : 1


All the tests below were run for 10M iterations (i.e. the iter variable is equal to 10000000).

Aborts on Single-threaded workload

Single-threaded workload shows how TSX transactions work without contention on shared data between CPUs. This testing workload is produced by following lines in main():

    for (int trx_sz = 32; trx_sz <= 1024; trx_sz += 4)
        run_test(1, trx_sz, 1, 0, iter, Sync::TSX);

Figure 1: Dependency of aborts on transaction size (1 CPU)

The dependency of the number of aborts on transaction size (in cache lines) is depicted in Figure 1 (both axes are logarithmically scaled). The number of aborts (precisely, transaction aborts with a clear _XABORT_RETRY bit in status) reaches 100% (10M) at around 256 cache lines. I count aborts with a local integer counter inside the transaction abort handling code (please see execute_short_trx() in the source code for details). TSX provides an abort code for each aborted transaction, so we can easily gather statistics on which type of aborts dominates in this workload. Just compile the program with -DABORT_COUNT and run the test case for trx_sz = 256:

    explicit abrt: 0
    retry abrt: 18
    conflict abrt: 18
    capacity abrt: 9969559


Let's check the results with Intel PCM tool (output is reduced for brevity):

    # ./pcm-tsx.x a.out -e RTM_RETIRED.ABORTED_MISC1 -e RTM_RETIRED.ABORTED_MISC2 -e RTM_RETIRED.ABORTED_MISC3 -e RTM_RETIRED.ABORTED_MISC4

    Time elapsed: 10453 ms 

    Event0: RTM_RETIRED.ABORTED_MISC1 Number of times an RTM execution aborted due to various memory events (raw 0x8c9)
    Event1: RTM_RETIRED.ABORTED_MISC2 Number of times an RTM execution aborted due to uncommon conditions (raw 0x10c9) 
    Event2: RTM_RETIRED.ABORTED_MISC3 Number of times an RTM execution aborted due to HLE-unfriendly instructions (raw 0x20c9) 
    Event3: RTM_RETIRED.ABORTED_MISC4 Number of times an RTM execution aborted due to incompatible memory type (raw 0x40c9)

    Core | Event0  | Event1  | Event2  | Event3
       0   9966 K       0         0         0
       1      0         0         0         0
       2      0         0         0         0
       3      0         0         0         0
    --------------------------------------------------
       *   9966 K       0         0         0


Figure 2: Dependency of retries on transaction size
So most of the aborts are caused by a capacity problem. 256 * 64 = 16384, which is half of the L1 data cache. The cache has 8-way associativity; however, it's still unlikely that the transaction working set produces so many address collisions that we can't utilize the cache fully. It is also unlikely that other program data occupies the remaining half of the cache. So it seems that the transaction size limit is even lower than the L1 data cache size.

Let's also plot graphs for the number of retries and the whole test execution time depending on transaction buffer size. The results are shown in Figure 2 and Figure 3 respectively.

Figure 3: Dependency of execution time on transaction size
 The time plot also shows significant fluctuation around transaction size 256 cache lines. At transaction size 244 it jumps from 10180ms to 12292ms after which execution time smoothly decreases to 9094ms for transaction size 264 and grows again.

UPD 1: as slotty noticed in the comment below, each iteration of the loop in trx_func() actually modifies 2 cache lines, one for the debit and one for the credit update. The figure was drawn for the transaction size rather than the actual number of cache lines modified by each transaction. So TSX transactions are actually limited by the full L1d cache size.
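The corrected arithmetic is easy to verify at compile time. The constants below are the values quoted in this article (64-byte lines, 32KB L1 data cache); the names are hypothetical:

```cpp
#include <cassert>
#include <cstddef>

constexpr std::size_t kLineSize = 64;        // bytes per cache line
constexpr std::size_t kL1dSize  = 32 * 1024; // 32KB L1 data cache

// Bytes touched by one transaction: every loop iteration updates one debit
// line and one credit line.
constexpr std::size_t
trx_footprint(std::size_t trx_sz)
{
    return trx_sz * 2 * kLineSize;
}

static_assert(256 * kLineSize == kL1dSize / 2, "one line per iteration");
static_assert(trx_footprint(256) == kL1dSize, "both lines counted");
```

So at trx_sz = 256, where the aborts reach 100%, the write set is exactly as large as the whole L1 data cache.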

TSX vs Spin Lock: Transaction Time

To run this test case we need to modify our trx_func() in following way:

    void
    trx_func(int thr_id, unsigned long trx_sz, int trx_count)
    {
        for (int c = 0; c < trx_count; c++)
            for (unsigned i = 0; i < trx_sz; ++i) {
                unsigned shift = thr_id * trx_sz + i;
                debit[shift] += 1;
                credit[shift] += -1;
            }
    }

Thus, we just execute the same data updates multiple times, so the transaction working set stays the same while the transaction time increases. In addition to the surrounding loop, I also added the thread ID (0 or 1) to the calculation of the offset of the updated item. This change makes the two CPUs always operate on different cache lines, so there is no data contention. The following lines of source code are responsible for the test:

    for (int trx_count = 1; trx_count <= 201; trx_count += 10)
        run_test(2, 2, trx_count, 0, iter, Sync::TSX);
    for (int trx_count = 1; trx_count <= 201; trx_count += 10)
        run_test(2, 2, trx_count, 0, iter, Sync::SpinLock);

Results for the tests are depicted in Figure 4. For short transactions (trx_count < 50) TSX shows better execution time, but at trx_count = 51 the spin lock overtakes it.

Figure 4: TSX vs Spin Lock: Transaction Time
These results show that TSX performs 3 times better (401ms vs 1329ms for trx_count = 1) on small transactions. This is interesting, but how can we use these results in practice? I.e. when should we use TSX and when a spin lock? In this thread Andi Kleen suggests "For contended conventional locks we usually recommend at least 200ns". This is also "just a number", and real benchmarks must be done for the particular workload which is being considered for TSX.

However, in our case we don't have data contention, i.e. both CPUs can work in parallel. Obviously, a spin lock which must be acquired to change any data makes the code single-threaded (only one CPU can update the data at any given time). I expected that TSX would show much better results for the test due to more parallelism, but it isn't so...

To understand the issue let's compare aborts statistics for trx_count = 1 and trx_count = 60. For trx_count = 1 our statistics shows:

    explicit abrt: 28
    retry abrt: 567
    conflict abrt: 589
    capacity abrt: 8

for CPU 0 and

    explicit abrt: 67
    retry abrt: 441
    conflict abrt: 506
    capacity abrt: 3


for CPU 1. Meantime, pcm-tsx reports:

    Core | Event0  | Event1  | Event2  | Event3
       0    596         0        28         0     
       1    508         0        67         0     


Thus we can see that Event 2, with the cryptic description "Number of times an RTM execution aborted due to HLE-unfriendly instructions", exactly matches our explicit aborts. Intel TSX has a set of instructions which lead to transaction aborts. It seems that these aborts are reported as explicit (this is why we need to check the abort code in execute_short_trx()). However, it's unclear why we didn't see these aborts in the single threaded workload, and the Intel documentation with the list of the instructions doesn't answer the question. Values for Event 0, "Number of times an RTM execution aborted due to various memory events", are very close to conflict aborts... The corresponding values for trx_count = 60 are:

     explicit abrt: 8524329
     retry abrt: 8538461
     conflict abrt: 8538484
     capacity abrt: 61


for CPU 0 and

    explicit abrt: 8524788
    retry abrt: 8554159
    conflict abrt: 8554179
    capacity abrt: 187


for CPU 1. pcm-tsx says:

    Core | Event0  | Event1  | Event2  | Event3
       0   8538 K       0      8524 K       0     
       1   8554 K       0      8524 K       0     


So the reason for low performance with many iterations inside the transaction is a very high abort rate. Why do we see so many conflict aborts on uncontended data updates? Actually we do have contended data: our spin lock for the fallback. If we comment out the fallback code (the spin lock check inside the transaction and acquiring the lock at the end of the function), then we'll see a much better picture for trx_count = 60:

    explicit abrt: 0
    retry abrt: 425
    conflict abrt: 425
    capacity abrt: 204


    explicit abrt: 0
    retry abrt: 1886
    conflict abrt: 1886
    capacity abrt: 139


    Core | Event0  | Event1  | Event2  | Event3
       0    629         0         0         0     
       1   2025         0         0         0     


So it seems that the spin lock fallback produces two types of aborts at the same time. If we comment out only _xabort(_ABORT_LOCK_BUSY), then we'll see a very similar picture - zero Event 2. So Event 2 is exactly our explicit aborts. Intel documentation notes that transactions can abort due to various reasons - it looks like we have these various reasons as Event 0 and conflict & retry aborts.
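
The abort status returned by a failed transaction is a set of independent flag bits, so one abort can be counted under several causes at once, which may explain why the retry and conflict counters above are nearly equal. The following is only a minimal sketch of how such per-cause counters could be tallied, not the benchmark's actual code: AbortStats and count_abort() are hypothetical names, and the bit masks are redefined locally with the same values as _XABORT_EXPLICIT, _XABORT_RETRY, _XABORT_CONFLICT, _XABORT_CAPACITY and _XABORT_CODE() from <immintrin.h>, so that the snippet builds without RTM support.

```cpp
#include <cassert>
#include <cstdint>

// Local copies of the RTM abort status bits (same values as the
// _XABORT_* macros in <immintrin.h>); redefined here only so that
// the sketch compiles without -mrtm.
constexpr std::uint32_t ABORT_EXPLICIT = 1u << 0;
constexpr std::uint32_t ABORT_RETRY    = 1u << 1;
constexpr std::uint32_t ABORT_CONFLICT = 1u << 2;
constexpr std::uint32_t ABORT_CAPACITY = 1u << 3;

// The code passed to _xabort() is stored in bits 31:24 of the status.
constexpr std::uint32_t abort_code(std::uint32_t status)
{
    return (status >> 24) & 0xff;
}

// Hypothetical per-thread statistics, named after the printed counters.
struct AbortStats {
    unsigned long explicit_abrt = 0;
    unsigned long retry_abrt = 0;
    unsigned long conflict_abrt = 0;
    unsigned long capacity_abrt = 0;
};

// Accounts one aborted transaction. The bits are independent flags,
// so a single abort may increment more than one counter.
inline void count_abort(AbortStats &s, std::uint32_t status)
{
    if (status & ABORT_EXPLICIT)
        ++s.explicit_abrt;
    if (status & ABORT_RETRY)
        ++s.retry_abrt;
    if (status & ABORT_CONFLICT)
        ++s.conflict_abrt;
    if (status & ABORT_CAPACITY)
        ++s.capacity_abrt;
}
```

With counters like these, the sum of all four values can exceed the number of aborted transactions, so they should be compared with the pcm-tsx events one by one rather than summed.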

TSX vs Spin Lock: Transaction Size

Now let's do the same as in the previous test, but vary the transaction work set instead of the running time. The source code lines for the test are:

    for (int trx_sz = 1; trx_sz <= 256; trx_sz <<= 1)
        run_test(2, trx_sz, 1, 0, iter, Sync::TSX);
    for (int trx_sz = 1; trx_sz <= 256; trx_sz <<= 1)
        run_test(2, trx_sz, 1, 0, iter, Sync::SpinLock);


The test results are depicted in Figure 5 (note that both axes are logarithmically scaled). As in the previous test we see a very similar picture - TSX outperforms spin locks only for small data sets and already loses at 32 cache lines.
Figure 5: TSX vs Spin Lock: Transaction Size

64 cache lines is the point at which TSX gets too many aborts (6.4M in comparison with only 7K for 32 cache lines). In a discussion on the Intel forum Andi Kleen suggested two things to optimize TSX performance:
  • "wait for the lock to become free again before retrying";
  • and "additional randomized backoff can also improve it in some cases".
So I made a couple of adjustments in the execute_short_trx() function. Firstly, the function initially retried a transaction as soon as it found the lock acquired, but it checked the lock in transactional context. So I added the following loop to the abort handling part of the function:

    if ((status & _XABORT_EXPLICIT)
         && _XABORT_CODE(status) != _ABORT_LOCK_BUSY)
    {
        while ((int)spin_l != 1)
            _mm_pause();
        continue;
    }


Figure 6: TSX aborts on dual core workload
(the full adjusted code is available on GitHub). So we spin in a busy loop, waiting for the spin lock to be released, before we restart the transaction. Results are shown in Figure 5 by the blue curve - it shows a much better time for the point of 64 cache lines (2314ms vs. 3412ms). Some of the other points are somewhat better and some of them are somewhat worse.

To implement random fallbacks I used a local abort counter abrt for the function (how many aborts happened during this run) and a small array abrt_fallback of 64 constant items for the counter values. In the test each thread does 20M iterations and I've seen maximum abort counts also very close to 20M, so transactions have 1 abort on average. Thus I used very small values in the abrt_fallback array, from 0 to 0x39. To get randomness I intermixed the values. The following code does the "random" fallbacks:

    if (++abrt == abrt_fallback[af]) {
        af = (af + 1) % (sizeof(abrt_fallback)
                         / sizeof(*abrt_fallback));
        break;
    }


where af is a global thread-local index into the array.
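
To see the effect of the threshold array in isolation, here is a small simulation that needs no TSX hardware. It is a sketch under assumptions: the eight threshold values are made up (the post uses 64 intermixed values), every transaction attempt is assumed to abort, and aborts_before_fallback() is a hypothetical helper that only counts how many aborts are tolerated before the code gives up and falls back to the lock.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical thresholds, kept >= 1 here: with a threshold of 0 the
// pre-incremented counter would match only after wrapping around.
static const unsigned abrt_fallback[] = { 3, 1, 7, 2, 5, 1, 4, 6 };

// Per-thread index into abrt_fallback, as in the post.
static thread_local std::size_t af = 0;

// Simulates one call in which every transaction attempt aborts and
// returns the number of aborts seen before falling back to the lock.
inline unsigned aborts_before_fallback()
{
    unsigned abrt = 0;

    for (;;) {
        // ... a transaction attempt that aborts would be here ...
        if (++abrt == abrt_fallback[af]) {
            af = (af + 1) % (sizeof(abrt_fallback)
                             / sizeof(*abrt_fallback));
            break;  // give up and take the spin lock
        }
    }
    return abrt;
}
```

Consecutive calls on the same thread walk through the array, so each call tolerates a different number of aborts. Two threads that start aborting each other therefore tend to give up at different moments, which is the point of the intermixed values.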

Figure 6 shows how the number of TSX aborts (for the basic and the optimized versions) rises in a dual CPU environment (the figure is logarithmically scaled on both axes). Random fallbacks provide a lower abort rate in most cases, however, as Figure 5 shows, they don't have the best execution time. So sometimes it's better to have a higher abort rate in exchange for avoiding spin lock fallbacks (note that acquiring the spin lock means that the transaction on the other CPU aborts and is likely to try to acquire the lock as well).

TSX vs Spin Lock: Data Overlapping

If we had only the workload we have been observing so far, then we could simply use fine grained spin locks to protect the data of each thread. Moreover, we could even update the data concurrently on different CPUs without any locks at all, because using the thread identifier thr_id we update different memory locations on different CPUs.

So now it's time to see a more realistic example with arbitrary data overlapping. This is where transactional memory can't be easily replaced by fine grained locks.

Again, we need to modify our trx_func() so that it accepts an additional parameter overlap and computes shift in the following way:

    shift = thr_id * trx_sz + i - overlap * thr_id;

So now the overlap parameter specifies how many data cells are shared between the CPUs. And the testing code is

    for (int overlap = 0; overlap <= 32; overlap++)
        run_test(2, 32, 1, overlap, iter, Sync::TSX);
    for (int overlap = 0; overlap <= 32; overlap++)
        run_test(2, 32, 1, overlap, iter, Sync::SpinLock);


Figure 7: TSX vs Spin Lock: Data Overlapping
The test was performed for a transaction size of 32 cache lines with overlapping from 0 to all 32 cache lines.
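
The effect of the overlap parameter on the shift formula can be checked with a few lines of code. This is a sketch, assuming that the index i runs from 0 to trx_sz - 1 inside trx_func() (the loop itself is not shown here); shared_cells() is a hypothetical helper that counts the cells touched by both threads.

```cpp
#include <cassert>
#include <set>

// The shift formula from the text: the index of the i-th cell
// updated by thread thr_id.
inline int shift(int thr_id, int trx_sz, int i, int overlap)
{
    return thr_id * trx_sz + i - overlap * thr_id;
}

// Hypothetical helper: the number of cells which are updated by both
// thread 0 and thread 1 for the given transaction size and overlap.
inline int shared_cells(int trx_sz, int overlap)
{
    std::set<int> cpu0;
    for (int i = 0; i < trx_sz; ++i)
        cpu0.insert(shift(0, trx_sz, i, overlap));

    int shared = 0;
    for (int i = 0; i < trx_sz; ++i)
        shared += static_cast<int>(
            cpu0.count(shift(1, trx_sz, i, overlap)));
    return shared;
}
```

Thread 0 always works on cells [0, trx_sz), while thread 1 works on [trx_sz - overlap, 2 * trx_sz - overlap), so exactly overlap cells are shared: none for overlap = 0 and all 32 for overlap = 32.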

Results are depicted in Figure 7. The average execution time is 2811ms for TSX and 2631ms for the spin lock.

It's expected for the spin lock that the running time doesn't vary significantly with changing data overlapping - we have only one lock, so it makes no difference whether both CPUs modify the same data cells or completely different sets of cells. However, I expected transactional memory to be sensitive to data overlapping, but it isn't so. We've already seen above that even non-overlapping transactions still produce a lot of conflict aborts. The same holds for this test - the abort rate for zero and for all overlapping cells is the same, 14%.

UPD 2: Since we use a spin lock as a fallback for TSX, the spin lock can be the conflicting cache line which doesn't allow TSX to scale on non-overlapping tests. So I ran the same test for TSX overlapping transactions with the spin lock fallback code commented out. Unfortunately, it didn't change the curve for TSX in Figure 7.

Sunday, August 11, 2013

Lock-free Condition Wait for Lock-free Multi-producer Multi-consumer Queue

The lock-free multi-producer multi-consumer queue on ring buffer described in my previous post has the following properties:
  1. in pop() the queue calls sched_yield() which leads to 100% CPU consumption;
  2. consumers wait for a particular position in the queue, i.e. if you put an item into the queue while all consumers are sleeping, then one and only one consumer can eat it;
  3. the queue has a fixed number of consumers and producers;
  4. say we have 4 consumers and there are no available elements in the queue, then all consumers will wait on 4 consecutive positions (x, x + 1, x + 2, x + 3).
The first one is undesired due to too high power consumption and misleading system statistics. To cope with this we can use usleep(3), but it looks like a dirty and relatively expensive solution. It would be better to make the pop()'er wait until a push()'er adds an item to the queue. In the naive queue implementation I used a condition variable to do this. Also, the spinning behaviour of the queue leads to unnecessary cache bouncing and degrades performance.

In this post I'm going to show an efficient way to do condition wait. The original article about the lock-free queue used C++11 for the code implementation, however in this article I'll mostly be talking about the Linux kernel because the algorithm was developed for a kernel implementation of the queue. I'll explain all kernel specific things, so no special skills are required from the reader.

If you need to make a consuming thread go to sleep when there are no items in the queue, then you would probably write code like the following (this is C-like pseudo-code):

    // Consumer
    while (thr_pos().tail >= last_head_) {
        wait_event_interruptible(wq,
                                 thr_pos().tail < last_head_);

        // Update the last_head_.
        // .......
    }

    // Producer
    // Push element and wake up a consumer.
    // ......

    thr_pos().head = ULONG_MAX;
    wake_up_interruptible_all(wq);

I left the pieces of code corresponding to the queue logic as they are in the original queue implementation, but of course we have to rewrite the queue in plain C if we need to run it in kernel space.

wait_event_interruptible() and wake_up_interruptible_all() are the Linux kernel analogs of pthread_cond_wait(3p) and pthread_cond_broadcast(3p). Both accept a pointer to the wait queue on which the consumers are sleeping. wait_event_interruptible(), which is actually a C macro, also takes the condition which the consumers wait for (i.e. it waits until the condition is true). wake_up_interruptible_all() wakes up all consuming threads, the same way as pthread_cond_broadcast() does. We can't use the more efficient wake_up_interruptible(), which wakes up only one consumer, due to the second property of our queue - we must be sure that exactly the consumer waiting on the position into which we just inserted an item is woken up, but the standard interface doesn't allow us to specify which thread must be woken up. So we don't know which thread to wake up and we have to wake up all the sleeping threads.

The body of the while loop in the consumer code is the slow path, but we want things to be fast in our lock-free implementation. The situation in the queue can change quickly, so a consumer which has just found the queue empty can find an item at the next check, and we should balance how quickly a consumer can observe the queue state against how many unnecessary cache bounces it produces. Therefore I add some spinning before going to sleep:

    // Consumer
    unsigned int loop_cnt = 0;
    while (thr_pos().tail >= last_head_) {
        if (++loop_cnt < 1000) {
            schedule();
        } else {
            wait_event_interruptible(wq,
                                     thr_pos().tail
                                      < last_head_);
            loop_cnt = 0;
        }

        // Update the last_head_.
        // .......
    }

In practice the constant for loop spinning (1000 in the code above) should be chosen based on the results of performance tests. Thus, we can minimize the cost of condition wait for consumers. Unfortunately, we can't do the same for producers - we can't reliably know whether there are sleeping consumers or not (if you just put a check and call wake_up() after it, then a consumer can go to sleep just after the check said "there are no sleepers"). So we must always call the waking up function.

Now let's have a brief look at the wake_up_interruptible_all() and wait_event_interruptible() implementations (linux-3.11-rc3, I've thrown out some logic for code brevity):

    #define wake_up_interruptible_all(x) \
        __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

    void __wake_up(wait_queue_head_t *q, unsigned int mode,
                   int nr_exclusive, void *key)
    {
        unsigned long flags;

        spin_lock_irqsave(&q->lock, flags);
        __wake_up_common(q, mode, nr_exclusive, 0, key);
        spin_unlock_irqrestore(&q->lock, flags);
    }

    static void __wake_up_common(wait_queue_head_t *q,
                                 unsigned int mode,
                                 int nr_exclusive,
                                 int wake_flags, void *key)
    {
        wait_queue_t *curr, *next;

        list_for_each_entry_safe(curr, next, &q->task_list,
                                 task_list)
        {
            unsigned flags = curr->flags;

            if (curr->func(curr, mode, wake_flags, key)
                && (flags & WQ_FLAG_EXCLUSIVE)
                && !--nr_exclusive)
                        break;
        }
    }



    #define wait_event_interruptible(wq, condition)          \
    ({                                                       \
        int __ret = 0;                                       \
        if (!(condition))                                    \
            __wait_event_interruptible(wq, condition, __ret); \
       __ret;                                                \
    })

    #define __wait_event_interruptible(wq, condition, ret)   \
    do {                                                     \
        DEFINE_WAIT(__wait);                                 \
        for (;;) {                                           \
            prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \
            /* .... */                                       \
        }                                                    \
        finish_wait(&wq, &__wait);                           \
    } while (0)


    void
    prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait,
                    int state)
    {
        // .....
        spin_lock_irqsave(&q->lock, flags);
        // .....
        spin_unlock_irqrestore(&q->lock, flags);
    }

    

Here we see the following two nasty things:
  • wait_event_interruptible() and wake_up_interruptible_all() acquire the same spin lock;
  • wake_up_interruptible_all() walks over a list of tasks, and the items of the list are likely in sparse memory regions.
Thus, going to sleep and waking up a task are not fast operations. Moreover, as we saw, we have to wake up all the consuming tasks instead of just one, and this is also bad for performance, especially on many cores. So we need to implement conditional wait for the queue with the following properties:
  1. concurrent going to sleep and waking up (i.e. lock-free);
  2. wake up only the consumer which waits for the item which we just inserted into the queue.
This is the place where the 3rd and 4th properties of our queue come into play. Firstly, we allocate and initialize an array of task descriptors (struct task_struct in the Linux kernel, or it could be pthread_t in user space) whose size is the number of consumers:

   
    struct task_struct *w_tasks[CONSUMERS] ____cacheline_aligned;

    memset(w_tasks, 0, sizeof(w_tasks));

We'll use the array to make consumers go to sleep concurrently. The question is how to safely get an index in the array for a particular consuming task. We need to know exactly which task we have to wake up when we insert an item into the queue, so the answer is simple - just take the remainder of dividing the current position in the queue by the number of consumers (CONSUMERS). Due to property 4 of our queue, it seems that with such array indexing all consumers safely get their positions in the array without conflicts, but we'll see a bit later that this is not true and we need additional steps to resolve the conflicts. However, at this point we can easily write the waking up code (please read it also as pseudo-code - this is a mix of the previous C++ lock-free queue implementation and the Linux kernel C implementation of the same queue):

    void
    wake_up(unsigned long position)
    {
        unsigned long pos = position % CONSUMERS;
        wait_queue_t wait = { .private = w_tasks[pos] };

        if (!wait.private)
                return;

        /*
         * Asynchronously wake up the task.
         * See linux/kernel/sched_features.h.
         */
        default_wake_function(&wait, TASK_INTERRUPTIBLE,
                              0, NULL);
    }

Here default_wake_function() wakes up the task passed to it as a field of the wait_queue_t structure - this is standard Linux kernel API. One important thing - there is nothing bad in trying to wake up an already running task, so we can leave this without locking.

Things get harder when a task goes to sleep. The following problems are possible if many consumers go to sleep and many producers wake them up concurrently:
  1. a consumer misses its wake-up signal due to
    1. a race with a producer on insertion into the awaited position (a consumer inserts its task descriptor into the array after a producer tried to wake up the corresponding consumer);
    2. a race with another consumer which overwrites the pointer in the array;
  2. waking up the wrong consumer;
  3. false wake up.
The 3rd issue, false wake up, is easy to overcome - just recheck the condition in a loop around the conditional wait. The other two are harder to fix. Imagine that we have 4 consumers (C0, C1, C2, C3 and CONSUMERS = 4) waiting on four positions correspondingly (N0, N1, N2 and N3) and two producers (P0 and P1). The producers concurrently insert two items at N0 and N1 correspondingly and go to wake up consumers C0 and C1. If producer P0 is slower than P1 (e.g. it was preempted), then P1 can wake up C1, C1 eats the item and is ready to consume the next one. The position of the next item is N4 = N0 + 4, which gives us exactly the same index in w_tasks as N0, so C1 overwrites the entry of C0 and P0 will wake up C1 instead of C0. Even nastier, C0 might never be woken up at all. Therefore, we can acquire a w_tasks item only if it is surely free, and CMPXCHG helps us with this.
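
The collision between positions N0 and N0 + 4 and its resolution by compare-and-swap can be reproduced in user space. The sketch below is not the kernel code: it assumes C++11 std::atomic in place of the kernel's cmpxchg(), uses a hypothetical Waiter structure as a stand-in for struct task_struct, and claim_slot() and release_slot() are names invented for this illustration.

```cpp
#include <atomic>
#include <cassert>

constexpr unsigned long CONSUMERS = 4;

// Hypothetical stand-in for struct task_struct.
struct Waiter {
    int id;
};

// One slot per consumer; static storage makes all slots null.
static std::atomic<Waiter *> w_tasks[CONSUMERS];

// Tries to publish `self` as the waiter for queue position `position`.
// Like the kernel's cmpxchg(), it reports the previous slot value:
// nullptr means the slot was free and now belongs to `self`,
// otherwise the returned waiter still owns the slot.
inline Waiter *claim_slot(unsigned long position, Waiter *self)
{
    Waiter *expected = nullptr;

    if (w_tasks[position % CONSUMERS]
            .compare_exchange_strong(expected, self))
        return nullptr;
    return expected;
}

// Frees the slot after the owner has finished waiting.
inline void release_slot(unsigned long position)
{
    w_tasks[position % CONSUMERS].store(nullptr);
}
```

If C0 has claimed the slot for position 0, then an attempt by C1 to claim position 4 fails and returns C0 instead of silently overwriting it, so C1 knows that it has to retry later and which task it may try to wake up in the meantime.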

There is also another race scenario which we need to prevent. A producer and a consumer go into push() and pop() operations simultaneously:
  1. the consumer checks that there are no items in the queue and goes to wait;
  2. the producer pushes the item and tries to wake up the waiting task, but finds the corresponding position in w_tasks to be NULL and doesn't do anything;
  3. the consumer sleeps waiting for the item, probably forever.
This is a classical scenario which is perfectly handled with a double check by any conditional wait code (either the POSIX threads library or the Linux kernel).

So let's write our fast lock-free conditional wait code:

    #define cond_wait(position, condition)                   \
    do {                                                     \
        unsigned long p = (position) % CONSUMERS;            \
        struct task_struct *curr_waiter;                     \
        curr_waiter = cmpxchg(&w_tasks[p], NULL, current);   \
        if (unlikely(curr_waiter)) {                         \
            wait_queue_t wait = { .private = curr_waiter };  \
            default_wake_function(&wait, TASK_INTERRUPTIBLE, \
                                  0, NULL);                  \
            schedule();                                      \
            if (condition)                                   \
                break;                                       \
            continue;                                        \
        }                                                    \
        set_current_state(TASK_INTERRUPTIBLE);               \
        if (!(signal_pending(current) || (condition)))       \
            schedule();                                      \
        w_tasks[p] = NULL;                                   \
        set_current_state(TASK_RUNNING);                     \
        break;                                               \
    } while (1)


Here current is a pointer to the current task in the Linux kernel (it is used like a global variable, although it is actually a macro). The current task goes to the sleeping state by setting its state to TASK_INTERRUPTIBLE and rescheduling (by the schedule() call). When the task is woken up, it continues its work flow from the schedule() call and sets its state to running, so it will get a time slice again on the next rescheduling.

Our conditional wait spins in a loop while the position in w_tasks is non-NULL (i.e. it is acquired by some other waiting thread), so there is no conflict between consumers. Hopefully, the case when two tasks compete for the same position in the wait array is rare, so I use the unlikely annotation (which corresponds to GCC's __builtin_expect(X, 0) extension in user space).

If a task waiting on position P faces w_tasks[P % CONSUMERS] != NULL, then it is likely that the position is acquired by a task waiting on position Q, such that Q + CONSUMERS <= P. Since we have only CONSUMERS consumers, it means that position P in the queue already has an item (due to property 4). We're in a slow path anyway, so there is no problem in trying to wake up the waiting task to make its wake up happen earlier. There is also a chance that Q > P, but it is less likely and there is still no problem in a false wake up. Somebody can push an item to the queue while we spin waiting for the position in w_tasks to become free, so we must check the condition at each iteration.

Finally, we perform the classical double check of the condition to avoid infinite waiting and set the w_tasks position to NULL at the end of waiting.

This is a fast condition wait, and moreover, due to reduced cache bouncing, it makes the lock-free queue even faster than its spinning version. The kernel module which uses the lock-free queue with this condition wait algorithm has shown about 10% performance improvement in comparison with the queue without condition wait. Unfortunately, I don't have independent test code in which I can measure the performance gain for the queue itself without additional application logic.

Saturday, July 27, 2013

C++ Variadic Templates For Multiple Inheritance

C++ variadic templates take a variable number of arguments. C++ also allows creating a template class which inherits from a template base class. Together these two features allow us to inherit from a variable number of base classes. When is this needed? Let's have a look at a simple example which I faced recently.

Suppose you need a class Messenger which receives raw messages from a socket, assembles messages of particular types and passes them into the appropriate queue. Frequently the message queues are implemented as a template like

    template<class T>
    struct Queue {
        // some class body
    };

So you have the following queues, one for each type of message:

    Queue<MsgA> q_a;
    Queue<MsgB> q_b;
    Queue<MsgC> q_c;

The queues have to be members of class Messenger. Probably, it is not a big deal to copy paste 3 members, as in the example. However, the ugliness arises from the necessity to have registering interfaces for each queue (classes which use the Messenger need to register on a particular queue to receive messages from it), serialized push() interfaces, probably the queue accessors and some other methods specific to the queue (accessing the queues directly as public members isn't a good idea). C++ meta-programming can help to generate the queues automatically with all required interfaces. Let's see how we can use it for this task.

Messenger provides interfaces to the queues, so it "is-a" QueueHandler. QueueHandler holds a queue of a particular type as a member and provides interfaces to it. So you should generate a set of QueueHandler classes, one for each queue type, and inherit Messenger from all of the classes:

    template<class T>
    struct QueueHandler {
        // `register` is a reserved C++ keyword, hence the longer name.
        void register_queue(Queue<T> *q) { /* some method body */ }
        void push(T *msg) { /* some other method body */ }
    private:
        Queue<T> q_;
    };


It's also undesirable to explicitly specify all the base QueueHandler classes for the Messenger class. So you can introduce a helper class GenericMessenger, a specialization of which is Messenger, and use a C++ variadic template to write the class independently of the particular number of serviced queues:

    template<class... Args>
    struct GenericMessenger : QueueHandler<Args>... {
        // some struct body
    };

    typedef GenericMessenger<MsgA, MsgB, MsgC> Messenger;


Therefore, if you need to support one more type of messages (and their queue of course), then you just need to add the type to the Messenger definition and there is no copy paste code!
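
For reference, here is a compilable sketch of the whole pattern. It is an illustration under assumptions rather than the original code: Queue is a toy wrapper around std::deque, the message types are empty structures, and queued() and queue_count() are hypothetical members added only to make the result observable.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Empty message types, only for illustration.
struct MsgA {};
struct MsgB {};
struct MsgC {};

// Toy queue; a real implementation would be thread safe.
template<class T>
struct Queue {
    void push(T *msg) { items_.push_back(msg); }
    std::size_t size() const { return items_.size(); }
private:
    std::deque<T *> items_;
};

// Owns the queue of one message type and provides its interfaces.
template<class T>
struct QueueHandler {
    void push(T *msg) { q_.push(msg); }
    std::size_t queued() const { return q_.size(); }
private:
    Queue<T> q_;
};

// The pack expansion QueueHandler<Args>... produces one base class
// per message type.
template<class... Args>
struct GenericMessenger : QueueHandler<Args>... {
    // Number of serviced queues, known at compile time.
    static constexpr std::size_t queue_count()
    {
        return sizeof...(Args);
    }
};

typedef GenericMessenger<MsgA, MsgB, MsgC> Messenger;
```

Supporting a fourth message type in this sketch changes only the typedef line; the new QueueHandler base and its queue are generated by the pack expansion.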

The only ugly thing is that you need to explicitly specify the base class when accessing a particular queue (this is because GenericMessenger has many base classes with the same method names, so we need to explicitly call the method of a particular base):

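    Messenger *m = new Messenger();

    // register_queue() stands for the registering method of
    // QueueHandler (`register` itself is a reserved C++ keyword).
    m->QueueHandler<MsgA>::register_queue(new Queue<MsgA>);
    m->QueueHandler<MsgA>::push(new MsgA);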