The April issue of Linux Journal (issue #228, High Performance Computing) has arrived. My article "Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer" is on page 104 and is referred to as "How to Scale the Work Queue in a Multicore Environment" on the cover.
The article describes a lock-free work queue which is already running in production at two of our clients. The queue significantly improves performance on multi-core systems: in synthetic tests on a 16-core Xeon machine I got 3.7 times better results than with a naive queue implementation. One of the projects into which the queue is integrated got about a 35% performance improvement after replacing a doubly linked queue protected by a spin lock with the lock-free queue.
You can find the source of the queue with performance and correctness tests here.
UPD. You can also read it online at the Linux Journal site.
High Performance Linux
Tuesday, April 2, 2013
Friday, March 29, 2013
What's Wrong With Sockets Performance And How to Fix It
The socket API is a nice thing which allows you to write network programs easily. But sockets have a fundamental problem from the performance point of view - they are asynchronous with respect to network interrupts. And this holds regardless of whether you're using blocking or nonblocking IO.
Let's consider a multi-threaded application which works with a number of sockets and reads some data from them. Typically, it does the following (pseudo code):
int n = epoll_wait(e->fd, e->event, e->max_events, 1000);
for (int i = 0; i < n; ++i) {
    unsigned char buf[4096];
    read(e->event[i].data.fd, buf, 4096);
}
The polled socket can be either blocking or non-blocking. Let's forget about the buffer copying for a moment and concentrate on what happens with arriving packets.

The figure depicts two processes (there is no difference between processes and threads in our discussion) which read from three sockets. The processes are running on different CPUs. Probably Receive Flow Steering (RFS) is used, so packets destined for the first process go to the first CPU and packets for the second process are processed by softirq on the second CPU. Each socket has a receive queue where incoming packets are placed before the reading process consumes them.
If we look at the code sample carefully, we find two system calls, which are relatively slow operations. The process can also be rescheduled and/or preempted between the syscalls. So if the process is woken up in the epoll_wait() call by a socket event (when the socket gets a packet), it reads data from the socket with some delay. The bold arrow between the second socket's queue and the first process depicts reading data from the socket. There are two complications:
- the process can be preempted by softirq between waking up in epoll_wait() and reading from the socket (however, it's easy to prevent this by binding the process and the NIC interrupts to different cores);
- under high load Linux switches to polling mode and very quickly grabs batches of packets, so during the delay between the two syscalls softirq can process a lot of other packets.
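To make the two-syscall pattern concrete, here is a minimal user-space sketch of it. The helper names wait_and_read() and roundtrip() are hypothetical and exist only for this illustration; the sketch assumes Linux and messages of at most 4096 bytes. It shows that data written to a socket simply sits in the receive queue until the second syscall picks it up.

```cpp
#include <cassert>
#include <string>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: wait until `fd` becomes readable (first syscall),
// then read from it (second syscall). Returns the bytes read, or an
// empty string on timeout or error.
std::string wait_and_read(int fd, int timeout_ms)
{
    assert(fd >= 0);
    std::string data;
    int efd = epoll_create1(0);
    if (efd < 0)
        return data;

    struct epoll_event ev = {};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    struct epoll_event out;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) == 0
        && epoll_wait(efd, &out, 1, timeout_ms) == 1)
    {
        // The process can be preempted right here, while more data
        // keeps arriving in the socket's receive queue.
        char buf[4096];
        ssize_t n = read(out.data.fd, buf, sizeof(buf));
        if (n > 0)
            data.assign(buf, static_cast<size_t>(n));
    }
    close(efd);
    return data;
}

// Hypothetical demo: push `msg` through a local socket pair and read it
// back with the epoll_wait() + read() sequence.
std::string roundtrip(const std::string &msg)
{
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return "";
    std::string got;
    if (write(sv[0], msg.data(), msg.size())
        == static_cast<ssize_t>(msg.size()))
        got = wait_and_read(sv[1], 1000);
    close(sv[0]);
    close(sv[1]);
    return got;
}
```

Calling roundtrip("ping") writes the message before the reader even starts polling, so the reader always sees it with a delay. That delay is exactly what the two complications above are about.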
In fact the Linux firewall works in softirq context. This means that a packet is processed synchronously, immediately when it is received. Moreover, synchronous packet processing is not limited to network-level operations (the level at which firewalls work). Fortunately, Linux also assembles the TCP stream in softirq context. The Linux kernel also provides a few callbacks in struct sock (see include/net/sock.h):
void (*sk_state_change)(struct sock *sk);
void (*sk_data_ready)(struct sock *sk, int bytes);
void (*sk_write_space)(struct sock *sk);
void (*sk_error_report)(struct sock *sk);
int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb);
For example, sk_data_ready() is called when new data is received on the socket. So it is simple to read TCP data synchronously in deferred interrupt context. Writing to the socket is a bit harder, but still possible. Of course, your application must be in-kernel now.
Let's have a look at a simple example of how to use the hooks for reading TCP data. First of all we need a listening socket (these are kernel sockets, so the socket API is different):
struct socket *l_sock;
sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &l_sock);
inet_sk(l_sock->sk)->freebind = 1;
/* addr points to some address packed into struct sockaddr_in */
l_sock->ops->bind(l_sock, (struct sockaddr *)addr,
                  sizeof(*addr));
l_sock->sk->sk_state_change = th_tcp_state_change;
l_sock->ops->listen(l_sock, 100);
sk_state_change() is called by the Linux TCP code when the socket state changes. We need a newly established connection socket, so we need to handle the TCP_ESTABLISHED state change. TCP_ESTABLISHED is set for the child socket of course, but we set the callback on the listening socket because the child socket inherits the callback pointers from its parent. th_tcp_state_change() can be defined as:
void
th_tcp_state_change(struct sock *sk)
{
    if (sk->sk_state == TCP_ESTABLISHED)
        sk->sk_data_ready = th_tcp_data_ready;
}
Here we set another callback, this time on the child socket. th_tcp_data_ready() is called when new data is available in the socket receive queue (sk_receive_queue). So in the function we need to do what the standard Linux tcp_recvmsg() does - traverse the queue and pick packets with appropriate sequence numbers from it:
void
th_tcp_data_ready(struct sock *sk, int bytes)
{
    unsigned int processed = 0, off;
    struct sk_buff *skb, *tmp;
    struct tcp_sock *tp = tcp_sk(sk);

    skb_queue_walk_safe(&sk->sk_receive_queue, skb, tmp) {
        off = tp->copied_seq - TCP_SKB_CB(skb)->seq;
        if (tcp_hdr(skb)->syn)
            off--;
        if (off < skb->len) {
            int n = skb_headlen(skb);
            printk(KERN_INFO "Received: %.*s\n",
                   n - off, skb->data + off);
            tp->copied_seq += n - off;
            processed += n - off;
        }
    }
    /*
     * Send ACK to the client and recalculate
     * the appropriate TCP receive buffer space.
     */
    tcp_cleanup_rbuf(sk, processed);
    tcp_rcv_space_adjust(sk);
    /* Release skb - it's no longer needed. */
    sk_eat_skb(sk, skb, 0);
}
The function would have to be more complicated to properly handle an skb's paged data and fragments, release the skb, process TCP sequence numbers more accurately and so on, but the basic idea should be clear.
UPD: You can find source code of the Linux kernel synchronous socket API at https://github.com/krizhanovsky/sync_socket .
Monday, November 26, 2012
Coding Style
I believe that coding style is an important thing in a project. MySQL, PostgreSQL, the Linux and FreeBSD kernels, glibc and many other excellent open source projects have their own coding styles. The styles are different, and that's expected because they were developed by different people with different preferences. The only crucial thing is code consistency. I guess you have seen code where spaces and tabs were mixed for indentation - it is always disgusting to work with such code. So the code of a project can use any style its owner likes, but it must be consistent.
In our projects we follow the Linux kernel coding style (you can find it at linux/Documentation/CodingStyle) with some additions for C++. We use the same coding style for kernel C and application C++ programming, so we have extended the original guide with some C++ specific things. Here it is.
Thursday, September 20, 2012
Linux: scaling softirq among many CPU cores
Some years ago I tested network interrupt affinity - you set ~0 as the CPU mask to balance network interrupts among all your CPU cores and you get all softirq instances running in parallel. Distributing interrupts among CPU cores this way is sometimes a bad idea because of the extra burden on CPU caches and probable packet reordering. In most cases it is not recommended for servers running some TCP application (e.g. a web server). However, this ability is crucial for some low-level packet applications like firewalls, routers or Anti-DDoS solutions (in the last case most of the packets must be dropped as quickly as possible), which do a lot of work in softirq. So for some time I thought that sharing softirq load between CPU cores was no problem.
To get softirq sharing between CPU cores you just need to do:
$ for irq in `grep eth0 /proc/interrupts | cut -d: -f1`; do \
echo ffffff > /proc/irq/$irq/smp_affinity; \
done
This makes (as I thought) your APIC distribute interrupts between all your CPUs in round-robin fashion (or probably using some more clever technique). And this really worked in my tests.
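The ffffff value above is just a bit mask with one bit per CPU, so 24 cores give 24 set bits. As a small illustration, here is a sketch of how such a mask can be computed; affinity_mask() is a hypothetical helper written for this post, and it deliberately handles only up to 32 CPUs, since larger masks are written in a different, comma-separated format.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: build the hexadecimal mask with the lowest
// `ncpus` bits set, suitable for /proc/irq/<N>/smp_affinity.
// Limited to 32 CPUs to keep the sketch simple.
std::string affinity_mask(unsigned ncpus)
{
    assert(ncpus >= 1 && ncpus <= 32);
    unsigned long mask = (ncpus == 32) ? 0xffffffffUL
                                       : ((1UL << ncpus) - 1);
    static const char digits[] = "0123456789abcdef";
    std::string hex;
    // Emit one hex digit per 4 bits, most significant digit first.
    for (; mask; mask >>= 4)
        hex.insert(hex.begin(), digits[mask & 0xf]);
    return hex;
}
```

For the 24-core machine used here, affinity_mask(24) gives ffffff, which is the value echoed into smp_affinity.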
Recently a client of ours became concerned about this ability, so I wrote a very simple test kernel module which just does more work in softirq:
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/netfilter.h>
#include <linux/netfilter_ipv4.h>

MODULE_LICENSE("GPL");

/**
 * Just eat some local CPU time and accept the packet.
 */
static unsigned int
st_hook(unsigned int hooknum, struct sk_buff *skb,
        const struct net_device *in,
        const struct net_device *out,
        int (*okfn)(struct sk_buff *))
{
    unsigned int i;
    for (i = 0; i <= 1000 * 1000; ++i)
        skb_linearize(skb);
    return NF_ACCEPT;
}

static struct nf_hook_ops st_ip_ops[] __read_mostly = {
    {
        .hook = st_hook,
        .owner = THIS_MODULE,
        .pf = PF_INET,
        .hooknum = NF_INET_PRE_ROUTING,
        .priority = NF_IP_PRI_FIRST,
    },
};

static int __init
st_init(void)
{
    if (nf_register_hooks(st_ip_ops, ARRAY_SIZE(st_ip_ops))) {
        printk(KERN_ERR "%s: can't register nf hook\n",
               __FILE__);
        return 1;
    }
    printk(KERN_ERR "%s: loaded\n", __FILE__);
    return 0;
}

static void
st_exit(void)
{
    nf_unregister_hooks(st_ip_ops, ARRAY_SIZE(st_ip_ops));
    printk(KERN_ERR "%s: unloaded\n", __FILE__);
}

module_init(st_init);
module_exit(st_exit);
I loaded the system with iperf over a 1 Gbps channel. And I was very confused to see that only one CPU of the 24-core machine was doing all the work while all the other CPUs were doing nothing!
To understand what's going on, let's have a look at how Linux handles incoming packets and interrupts from a network card (e.g. Intel 10 Gigabit PCI Express, whose driver is placed at drivers/net/ixgbe). Softirq works in per-CPU kernel threads, ksoftirqd (kernel/softirq.c: ksoftirqd()), i.e. if you have a 4-core machine, then you have 4 ksoftirqd threads (ksoftirqd/0, ksoftirqd/1, ksoftirqd/2 and ksoftirqd/3). ksoftirqd() calls do_softirq(), which in turn calls __do_softirq(). The latter uses the softirq_vec vector to get the required handler for the current softirq type (e.g. NET_RX_SOFTIRQ for receiving or NET_TX_SOFTIRQ for sending softirqs correspondingly). The next step is to call the virtual function action() for the handler. For NET_RX_SOFTIRQ, net_rx_action() (net/core/dev.c) is called here. net_rx_action() reads napi_struct from the per-CPU queue softnet_data and calls the virtual function poll() - a NAPI callback (ixgbe_poll() in our case) which actually reads packets from the device ring queues.
The driver processes interrupts in ixgbe_intr(). This function runs NAPI through a call to __napi_schedule(), which pushes the current napi_struct to the per-CPU softnet_data->poll_list, from which net_rx_action() reads (on the same CPU). Thus softirq runs on the same core which received the hardware interrupt.
So theoretically, if hardware interrupts are going to N cores, then these and only these N cores are doing softirq. So I had a look at the /proc/interrupts statistics and saw that only core 0 was actually receiving interrupts from the NIC, although I had set the ~0 mask in smp_affinity for the interrupt (actually I had an MSI-X card, so I set the mask for all the interrupt vectors of the card).
I started googling for answers as to why on earth interrupts were not distributed among all the cores. The first things I found were nice articles by Alexander Sandler.
According to these articles, not all hardware is actually able to spread interrupts between CPU cores. During my earlier tests I was using IBM servers of a particular model, but this is not the case for the client - they use very different hardware. This is why I saw a nice picture in my previous tests, but faced quite different behaviour on other hardware.
The good news is that Linux 2.6.35 introduced a nice feature - RPS (Receive Packet Steering). The core of the feature is get_rps_cpu() from net/core/dev.c, which computes a hash from the IP source and destination addresses of an incoming packet and, based on the hash, determines which CPU to send the packet to. netif_receive_skb() or netif_rx(), which call the function, put the packet into the appropriate per-CPU queue for further processing by softirq. So there are two important consequences:
- packets are processed by different CPUs (by processing I mostly mean Netfilter pre-routing hooks);
- it is unlikely that packets belonging to the same TCP stream are reordered (packet reordering is a well-known problem for TCP performance, see for example Beyond softnet).
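The reason the second point holds is that the CPU is a pure function of the packet's addresses. Here is a toy sketch of the idea; it is not the kernel's code. flow_hash() is a stand-in mixer I made up for the illustration (the kernel uses its own hash), and pick_cpu() is a hypothetical name for the step which scales the hash to an index in the list of allowed CPUs.

```cpp
#include <cassert>
#include <stdint.h>
#include <vector>

// Toy flow hash over the IPv4 source and destination addresses.
// This is only a stand-in for the hash used by the kernel.
uint32_t flow_hash(uint32_t saddr, uint32_t daddr)
{
    uint32_t h = saddr * 0x9e3779b1u;
    h ^= daddr + 0x7f4a7c15u + (h << 6) + (h >> 2);
    h ^= h >> 16;
    h *= 0x85ebca6bu;
    h ^= h >> 13;
    return h;
}

// Pick a CPU from the list of CPUs allowed to process packets by
// scaling the 32-bit hash to an index in [0, cpus.size()).
int pick_cpu(const std::vector<int> &cpus, uint32_t saddr, uint32_t daddr)
{
    assert(!cpus.empty());
    uint64_t idx = (static_cast<uint64_t>(flow_hash(saddr, daddr))
                    * cpus.size()) >> 32;
    return cpus[idx];
}
```

Since there is no state involved, all packets with the same source and destination addresses land on the same CPU and so keep their order, while different address pairs are spread over the allowed CPUs.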
RPS is enabled per receive queue by writing a CPU mask to the queue's rps_cpus file:
$ for i in `seq 0 7`; do \
    echo fffffff > /sys/class/net/eth0/queues/rx-$i/rps_cpus ; \
  done
After running linux-2.6.35 and allowing all CPUs to process softirq I got the following nice picture in top:
2238 root 20 0 411m 888 740 S 152 0.0 2:38.94 iperf
10 root 20 0 0 0 0 R 100 0.0 0:35.44 ksoftirqd/2
19 root 20 0 0 0 0 R 100 0.0 0:46.48 ksoftirqd/5
22 root 20 0 0 0 0 R 100 0.0 0:29.10 ksoftirqd/6
25 root 20 0 0 0 0 R 100 0.0 2:47.36 ksoftirqd/7
28 root 20 0 0 0 0 R 100 0.0 0:33.73 ksoftirqd/8
31 root 20 0 0 0 0 R 100 0.0 0:46.63 ksoftirqd/9
40 root 20 0 0 0 0 R 100 0.0 0:45.33 ksoftirqd/12
46 root 20 0 0 0 0 R 100 0.0 0:29.10 ksoftirqd/14
49 root 20 0 0 0 0 R 100 0.0 0:47.35 ksoftirqd/15
52 root 20 0 0 0 0 R 100 0.0 2:33.74 ksoftirqd/16
55 root 20 0 0 0 0 R 100 0.0 0:46.92 ksoftirqd/17
58 root 20 0 0 0 0 R 100 0.0 0:32.07 ksoftirqd/18
67 root 20 0 0 0 0 R 100 0.0 0:46.63 ksoftirqd/21
70 root 20 0 0 0 0 R 100 0.0 0:28.95 ksoftirqd/22
73 root 20 0 0 0 0 R 100 0.0 0:45.03 ksoftirqd/23
7 root 20 0 0 0 0 R 99 0.0 0:47.97 ksoftirqd/1
37 root 20 0 0 0 0 R 99 0.0 2:42.29 ksoftirqd/11
34 root 20 0 0 0 0 R 77 0.0 0:28.78 ksoftirqd/10
64 root 20 0 0 0 0 R 76 0.0 0:30.34 ksoftirqd/20
So as we see almost all of the cores are doing softirqs.
Friday, May 25, 2012
Software Transactional Memory (STM) in GCC-4.7
GCC-4.7 introduces an amazing new feature - Software Transactional Memory (STM). The feature is still experimental and not yet optimized, but we can already have a look at how STM works. Currently GCC implements pure software TM, i.e. without hardware support. Intel has announced hardware support for TM (HTM) in the Haswell microarchitecture as Transactional Synchronization Extensions (TSX), so probably next year we'll have hybrid TM - software transactional memory with hardware optimizations.
Firstly, to understand what STM is, let's consider the following simple program:
#include <iostream>
#include <thread>

static const auto THR_NUM = 4;
static const auto ITER_NUM = 1000 * 1000;
static auto a = 0, b = 0, c = 0;

static void
thr_func()
{
    for (auto i = 0; i < ITER_NUM; ++i) {
        ++a;
        b += 2;
        c = a + b;
    }
}

int
main(int argc, char *argv[])
{
    std::thread thr[THR_NUM];
    for (auto &t : thr)
        t = std::thread(thr_func);
    for (auto &t : thr)
        t.join();
    std::cout << "a=" << a << " b=" << b
              << " c=" << c << std::endl;
    return 0;
}
Now try to compile (don't forget -std=c++11 since C++11 is still not the default for g++) and run the program. Probably you'll see that a, b and c contain ugly values which change from run to run, e.g.:
$ ./a.out
a=2139058 b=4316262 c=6455320
$ ./a.out
a=2077152 b=4463948 c=6541100
The result is expected because 4 threads concurrently update all three variables, and all the variables are updated in RMW (Read-Modify-Write) manner. Now let's place the operations on all three variables into one transaction (yes, this is very much like database transactions), so all the variables will be read and written atomically:
static void
thr_func()
{
    for (auto i = 0; i < ITER_NUM; ++i)
        __transaction_atomic {
            ++a;
            b += 2;
            c = a + b;
        }
}
Let's compile the code with -fgnu-tm to enable STM in GCC and rerun the program. This time you'll see nice numbers, which stay the same from run to run:
$ ./a.out
a=4000000 b=8000000 c=12000000
$ ./a.out
a=4000000 b=8000000 c=12000000
This is quite a simple case and you'll probably prefer to use a mutex here. But you can refer to Ulrich Drepper's "Parallel Programming with Transactional Memory" for a more complicated example where the mutex alternative is not so obvious. It's easy to see that STM would be quite useful, for example, for implementing a highly concurrent self-balancing binary search tree, which may need to lock a number of nodes for rotation on insertion or deletion (traditionally such data structures are implemented with a per-node mutex and are prone to deadlocks).
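For comparison, here is a minimal sketch of the mutex alternative mentioned above. It is not part of the original test program: the counters are wrapped in a hypothetical Counters struct and the threads are started by a hypothetical run_with_mutex() function, so that the thread and iteration counts can be passed as parameters.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical wrapper for the three shared variables.
struct Counters {
    int a = 0, b = 0, c = 0;
};

// Run `thr_num` threads, each doing `iter_num` updates of all three
// counters under a single mutex, and return the final values.
Counters run_with_mutex(unsigned thr_num, int iter_num)
{
    assert(thr_num > 0 && iter_num >= 0);
    Counters cnt;
    std::mutex mtx;

    auto thr_func = [&cnt, &mtx, iter_num] {
        for (int i = 0; i < iter_num; ++i) {
            // The lock plays the role of the transaction boundaries:
            // all three updates are seen by other threads as one step.
            std::lock_guard<std::mutex> guard(mtx);
            ++cnt.a;
            cnt.b += 2;
            cnt.c = cnt.a + cnt.b;
        }
    };

    std::vector<std::thread> thr;
    for (unsigned i = 0; i < thr_num; ++i)
        thr.emplace_back(thr_func);
    for (auto &t : thr)
        t.join();
    return cnt;
}
```

With 4 threads and 1000 * 1000 iterations this gives the same a=4000000 b=8000000 c=12000000 as the transactional version. One global lock is fine for three counters, but it is exactly what stops scaling for something like the tree mentioned above.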
You may have noticed that the STM version of the program runs much more slowly. So let's analyze what takes so long. As a basic investigation, let's run the program under strace and print system call statistics:
$ strace -f -c ./a.out
........
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ---------
 99.39    0.021295          11      1920       390 futex
.......
So this means that STM in libitm (GCC implements STM as the libitm library, which you can see in ldd output) is implemented via the futex() system call, like a common mutex. Before going deeper into libitm internals, let's look at the transaction code more carefully and split it into basic read and write operations. We have 3 memory locations, the variables a, b and c, on which we perform read and write operations. The first operation is ++a, which actually reads a value from memory, updates it and writes it back, so we have two operations here - one read and one write. Next is b += 2 - exactly the same: read the value, add 2 and write it back. And the last one, c = a + b, is two reads (a and b) and one write to c. Moreover, all these operations are inside a transaction, so we have to start and commit the transaction.
To understand what's going on inside thr_func(), let's simplify it as follows:
static void
thr_func()
{
    __transaction_atomic {
        ++a;
    }
}
and disassemble it:
push %rbp
mov %rsp,%rbp
mov $0x29,%edi
mov $0x0,%eax
callq 400fd8 <_ITM_beginTransaction@plt>
mov $0x6052ec,%edi
callq 4010b8 <_ITM_RU4@plt>
add $0x1,%eax
mov %eax,%esi
mov $0x6052ec,%edi
callq 400fe8 <_ITM_WU4@plt>
callq 400f48 <_ITM_commitTransaction@plt>
pop %rbp
retq
Now we see four calls of _ITM_* functions (as explained in info libitm, GCC follows the Intel's Draft Specification of Transactional LanguageConstructs for C++ (v1.1) in its implementation of transactions, so _ITM_ prefix is just Intel's naming convention) for transaction begin, transaction commit and the pair of read (RU4) and write (WU4) operations.
_ITM_beginTransaction() saves the machine state (for x86 see libitm/config/x86/sjlj.S) and calls GTM::gtm_thread::begin_transaction() (see libitm/beginend.cc) which initializes the transaction data, checks transaction nesting and performs other preparation steps.
_ITM_commitTransaction() is defined in libitm/beginend.cc. It tries to commit the transaction by calling GTM::gtm_thread::trycommit() and restarts the transaction if the commit fails. GTM::gtm_thread::trycommit() is the place where all the threads sleep in futex() (which we saw in the strace output) while waiting to write all the modified data. So this is the heaviest part of a transaction.
The most interesting stuff is in the read and write operations. 0x6052ec is the address of variable a. _ITM_RU4 and _ITM_WU4 are just sequences of jumps which lead (in this particular case) to ml_wt_dispatch::load() and ml_wt_dispatch::store() respectively. The first one accepts only the variable address, the second one takes the variable address and the value to store. load() reads a memory region at the specified address, but before that it calls ml_wt_dispatch::pre_load(), which verifies that the memory location is neither locked nor too recent and restarts the transaction otherwise (this service data is taken from a global table indexed by a hash function over the address). store() in turn calls ml_wt_dispatch::pre_write(), which locks the memory location (all service data for the memory location is also taken from the same global table) and updates the release (version) of the memory location before the write (this release version is what pre_load() checks as 'recent').
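The idea of a global table of versioned locks hashed by address can be sketched as follows. This is a generic, heavily simplified illustration and not the libitm source: the OrecTable class, the table size, the hash and the word layout are all my assumptions, the new version is published on release rather than in pre_write(), and the sketch does not recognize a lock held by the current transaction.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical versioned-lock table; each slot holds (version << 1) | lock bit.
class OrecTable {
public:
    // Map an address to its slot in the table.
    std::atomic<std::uint64_t>& slot(const void* addr)
    {
        auto bits = reinterpret_cast<std::uintptr_t>(addr);
        return slots_[(bits >> 3) % kSlots];  // assumed hash: drop alignment bits
    }

    // True if a transaction started at time `snapshot` may read the
    // location; false means "restart the transaction".
    bool pre_load(const void* addr, std::uint64_t snapshot)
    {
        std::uint64_t word = slot(addr).load(std::memory_order_acquire);
        if (word & kLocked)
            return false;                // a writer owns the location
        return (word >> 1) <= snapshot;  // a newer version is "too recent"
    }

    // Try to lock the location for writing; false means a conflict.
    bool pre_write(const void* addr, std::uint64_t snapshot)
    {
        std::atomic<std::uint64_t>& s = slot(addr);
        std::uint64_t word = s.load(std::memory_order_acquire);
        if ((word & kLocked) || (word >> 1) > snapshot)
            return false;
        return s.compare_exchange_strong(word, word | kLocked,
                                         std::memory_order_acq_rel);
    }

    // Publish the write: store the new version and drop the lock bit.
    void release(const void* addr, std::uint64_t new_version)
    {
        std::atomic<std::uint64_t>& s = slot(addr);
        assert(s.load(std::memory_order_relaxed) & kLocked);
        s.store(new_version << 1, std::memory_order_release);
    }

private:
    static constexpr std::size_t kSlots = 1024;  // assumed table size
    static constexpr std::uint64_t kLocked = 1;  // lowest bit is the lock flag
    std::atomic<std::uint64_t> slots_[kSlots] = {};
};
```

With this table a reader that started before a write was published gets false from pre_load() and has to restart, while a reader with a fresh snapshot passes the check.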
Friday, March 30, 2012
Dynamic Strong Type System
A dynamic type system means that data types are defined at run time. This is the opposite of a static type system, where data types are defined at compile time. Examples of statically typed programming languages are C/C++ and Java; dynamically typed ones are Python, Perl and Erlang. A strong type system requires explicit type conversions (like static_cast in C++); a weak type system, on the other hand, allows implicit type conversions (e.g. "5" + 9 is a valid Perl expression).
I argue against a dynamic but strong type system: it makes large programs difficult to maintain. Let's write a simple function which just sums two integer variables and prints the result in Erlang, C++ and Perl:
% Erlang
sum(A, B) ->
io:format("~p~n", [A + B]).
// C++
#include <iostream>

void sum(int a, int b)
{
std::cout << a + b << std::endl;
}
# Perl
sub sum
{
my ($a, $b) = @_;
print $a + $b
}
What happens if we decide to change the function argument types, say from integer to string? It will still work in Perl and break in C++ at compile time, since you cannot pass std::string or char * as integer arguments to the function. However, the Erlang example will crash at run time with a bad argument (badarith) error.
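The C++ half of this claim can be checked without breaking the build. The sketch below uses a hypothetical sum_accepts trait (my own helper, not a standard facility) that asks the compiler whether a call to sum() with the given argument types would be well formed:

```cpp
#include <iostream>
#include <string>
#include <type_traits>
#include <utility>

void sum(int a, int b)
{
    std::cout << a + b << std::endl;
}

// Hypothetical trait: ::value is true only if sum(A, B) compiles.
template <typename A, typename B, typename = void>
struct sum_accepts : std::false_type {};

// Chosen only when the call expression is valid (sum() returns void).
template <typename A, typename B>
struct sum_accepts<A, B,
                   decltype(sum(std::declval<A>(), std::declval<B>()))>
    : std::true_type {};

static_assert(sum_accepts<int, int>::value,
              "integers are accepted");
static_assert(!sum_accepts<std::string, std::string>::value,
              "std::string arguments are rejected at compile time");
static_assert(!sum_accepts<const char*, const char*>::value,
              "char * arguments are rejected at compile time");
```

The type error is detected by the compiler before the program ever runs, which is exactly what the Erlang version cannot give you.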
Changing the argument types from integer to string seems very unnatural at first glance. But converting types from atom to list in Erlang is not so unnatural, and such adjustments in the code require careful investigation of the whole data flow, from the level where the types were changed down to the lowest level which uses the data.
So a programming language is more pleasant to use (at least for me) if it has a dynamic and weak type system or a static and strong one, but not a dynamic and strong one.
Saturday, March 24, 2012
Simple Cluster Management Using Erlang Mnesia
The logic of load balancing and failover is not trivial by itself, but there is also one non-obvious problem: synchronization between the cluster nodes. The problem is crucial in a shared-nothing cluster architecture with shared state, i.e. there is no single physical point of failure in the cluster, but all the nodes operate on shared data. Suppose that you have a cluster of working nodes and there are jobs on all of the nodes which you need to reassign depending on the load of each node. Traditionally you'd do this with a director node: this node collects load factors from all the nodes and reschedules the jobs between them. The director itself is a cluster of at least two machines for redundancy. But this works only for large clusters; it does not make much sense to have two directors for a small cluster of two or a few more working nodes running in an active-active scenario. For such small clusters we would like all the nodes to be able to participate in cluster load balancing and failover themselves.
Let's consider a quick example. You have a cluster of two active working nodes, A and B, and both nodes are loaded equally. Let J1 and J2 be jobs (e.g. client connections) on node A, and J3 and J4 be jobs on node B. At some point in time the clients which produce jobs J3 and J4 reduce their activity, while those of J1 and J2 increase theirs (i.e. the load factors produced by the jobs increase). You would expect the nodes to reschedule the jobs/connections between themselves in a manner like J1 and J3 to node A and J2 and J4 to node B. To do this, both nodes have to track the current load on themselves and on the other node and reassign the jobs in accordance with the load ratio. Since the nodes work independently, we cannot guarantee that they won't try to reassign the jobs in different ways at the same time (e.g. node A tries to assign J1 and J3 to itself and J2 and J4 to B, while B is trying to assign J1 and J4 to itself and J2 and J3 to A). If we have a cluster of a few more machines (say 3) and the machines can fail at any time, then there is also a chance that some node crashes during the job reassignment and we lose some jobs.
To synchronize this kind of communication between the nodes we usually use distributed state machine algorithms like Paxos or Virtual Synchrony. The latter is well implemented in Spread Toolkit, which you can use with C/C++ or Perl bindings to build a reliable cluster management system.
However, Erlang provides a distributed database, Mnesia, out of the box, which is useful for solving such issues. In fact, Mnesia has distributed transactions, but it is not a heavy disk-based database. Mnesia has a special kind of table, ram_copies, which is actually just a hash table. So the key point: with Erlang and Mnesia we can atomically and reliably execute distributed operations on hash tables stored in RAM on different cluster nodes.
Now let's have a look at how to solve the job scheduling problem with Erlang (I assume that the reader is familiar with basic Erlang syntax). First of all, let's create two tables with the assigned jobs and the current load:
-record(jobs, {job_id, weight, node_name}).
-record(cluster_view, {node_name, load}).
mnesia:create_table(cluster_view,
[{ram_copies, [node()]},
{attributes, record_info(fields, cluster_view)}
]).
mnesia:create_table(jobs,
[{ram_copies, [node()]},
{attributes, record_info(fields, jobs)},
{index, [node_name]}
]).
(I defined a secondary index on node_name in the jobs table to be able to select all jobs assigned to a node.)
Each node periodically updates its load status in a transaction:
mnesia:transaction(fun() ->
mnesia:write(#cluster_view{node_name = node(),
load = CurrentLoad})
end).
where CurrentLoad is the current load of the node. I don't do any error handling here for brevity, but it should be done in production code.
And load balancing can be done in the following way (this is not the most efficient method, but it is simple enough):
mnesia:transaction(fun() ->
    % QLC query handle to get the list of jobs sorted by the
    % weight field in descending order.
    % Returns a list of tuples of the form {weight, job_id}.
    % (qlc:q/1 needs -include_lib("stdlib/include/qlc.hrl").)
    JLQH = qlc:sort(qlc:q([{J#jobs.weight, J#jobs.job_id}
                           || J <- mnesia:table(jobs)]),
                    [{order, descending}]),
    % Get two lists of jobs assigned to each node.
    % E.g. if it returns {10,[1],8,[2,4,3]}, then
    % job 1 with weight 10 has to be assigned to
    % node A and jobs 2, 4 and 3 with total weight 8
    % to node B.
    qlc:fold(fun({W, Id}, {WA, JA, WB, JB}) ->
                 if WA =< WB ->
                        {WA + W, JA ++ [Id], WB, JB};
                    true ->
                        {WA, JA, WB + W, JB ++ [Id]}
                 end
             end,
             {0, [], 0, []},
             JLQH)
end).
This way only one node at any given time can check the current load of the nodes and redistribute the jobs atomically.
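The balancing step itself is a plain greedy split: take the jobs from the heaviest to the lightest and give each one to the node with the smaller total weight so far. Here is the same step outside of Mnesia as a C++ sketch; the Job and Assignment types and the balance() function are hypothetical names for this illustration, and there is no transaction around it, so it shows only the arithmetic and not the synchronization.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// A job is a (weight, job_id) pair, mirroring the {weight, job_id}
// tuples produced by the QLC query. Hypothetical type for illustration.
using Job = std::pair<int, int>;

// Total weight and job ids for each of the two nodes.
struct Assignment {
    int weight_a = 0;
    std::vector<int> jobs_a;
    int weight_b = 0;
    std::vector<int> jobs_b;
};

Assignment balance(std::vector<Job> jobs)
{
    // Heaviest jobs first; ties are ordered by the larger job id, the
    // same order as sorting the tuples in descending order.
    std::sort(jobs.begin(), jobs.end(), std::greater<Job>());

    Assignment result;
    for (const Job& job : jobs) {
        assert(job.first >= 0);  // weights are assumed to be non-negative
        if (result.weight_a <= result.weight_b) {
            result.weight_a += job.first;
            result.jobs_a.push_back(job.second);
        } else {
            result.weight_b += job.first;
            result.jobs_b.push_back(job.second);
        }
    }
    return result;
}
```

For jobs 1, 2, 3 and 4 with weights 10, 4, 2 and 2 this yields job 1 (weight 10) for node A and jobs 2, 4 and 3 (total weight 8) for node B, the same split as in the comment of the Erlang code.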
Thus, distributed Mnesia transactions are very useful for building a simple cluster management system for small clusters (distributed transactions are very expensive in general, so they are absolutely not suitable for large clusters with intensive management operations), but Mnesia has a number of drawbacks which make it hard to develop flexible cluster operations. One of them is that Mnesia has no proper mechanism to detach a node from the cluster. So if you need to detach a node from the cluster such that it keeps the current snapshot of the cluster data but does not replicate to and from the other cluster nodes, you have to dump all the tables with dump_tables() to a local text file, recreate the local database schema and load the tables back. Another one is that Mnesia's settings do not give you enough flexibility to manage the database schema when a node fails.
P.S. Ulf Wiger has given a very good presentation about Mnesia abilities in a cluster environment.