Archive

Posts Tagged ‘multithreading’

std::thread and std::future in C++11

December 12th, 2014 No comments

This is a quick note to chapter 4 of C++ Concurrency in Action.

1. std::thread

In C++11, It’s quite simple to create a separate thread using std::thread. Following code will simply output “hello world” or “world hello”:

#include <iostream>
#include <thread>
using namespace std;

void foo(const char *s) {
    cout << s << endl;
}

int main() {
    thread t(foo, "hello");
    cout << "world" << endl;
    /* destructor of std::thread calls std::terminate(), so we should call join() manually. */
    t.join();
    return 0;
}

2. std::mutex and std::condition_variable

If you need synchronization between threads, there are std::mutex and std::condition_variable. The semantics are the same with that in pthread library. Here’s a simple producer/consumer demo:

#include <iostream>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;

queue<int> q;
mutex m;
condition_variable c;
const chrono::milliseconds ms(1000);

void producer() {
    static int i = 0;
    while (true) {
        m.lock();
        cout << "pushing " << i << endl;
        q.push(i++);
        c.notify_one();
        m.unlock();
        this_thread::sleep_for(ms);
    }
}

void consumer() {
    while (true) {
        unique_lock<mutex> lk(m);
        c.wait(lk, [](){ return !q.empty(); });
        int i = q.front();
        cout << "popping " << i << endl;
        q.pop();
        lk.unlock();
        this_thread::sleep_for(ms);
    }
}

int main() {
    thread t(consumer);
    thread t2(producer);
    t.join();
    t2.join();
    return 0;
}

3. std::future with std::async()

C++11 also simplifies our work with one-off events with std::future. std::future provides a mechanism to access the result of asynchronous operations. It can be used with std::async(), std::packaged_task and std::promise. Starting with std::async():

#include <iostream>
#include <future>
using namespace std;

void foo(const char *s) {
    cout << s << endl;
}

int bar(int a, int b) {
    return a + b;
}

int main() {
    /* auto will be simpler */
    future<void> f = std::async(foo, "hello");
    future<int> f2 = std::async(launch::async, bar, 1, 2);
    /* f.get() is required if f is deferred by the library */
    //f.get();
    /* std::async() can return a value */
    cout << "1 + 2 = " << f2.get() << endl;
    /* threads created by std::async() are joined automatically */
    return 0;
}

std::async() gives two advantages over the direct usage of std::thread. Threads created by it are automatically joined. And we can now have a return value. std::async() decides whether to run the callback function in a separate thread or just in the current thread. But there’s a chance to specify a control flag(launch::async or launch::deferred) to tell the library, what approach we want it to run the callback.

When testing With gcc-4.8, foo() is not called. But with VC++2013, it does output “hello”.

4. std::future with std::packaged_task

With std::async(), we cannot control when our callback function is invoked. That’s what std::packaged_task is designed to deal with. It’s just a wrapper to callables. We can request an associated std::future from it. And when a std::packaged_task is invoked and finished, the associated future will be ready:

#include <iostream>
#include <future>
using namespace std;

void foo() {
    cout << "in pt.." << endl;
}

int bar(int a, int b) {
    cout << "in pt2.." << endl;
    return a + b;
}

/* associate with tasks */
packaged_task<void()> pt(foo);
packaged_task<int(int,int)> pt2(bar);

void waiter() {
    /* get associated future */
    auto f = pt.get_future();
    /* wait here */
    f.get();
    cout << "after f.get().." << endl;
}

void waiter2() {
    auto f2 = pt2.get_future();
    f2.get();
    cout << "after f2.get().." << endl;
}

int main() {
    auto t = std::async(launch::async, waiter);
    auto t2 = std::async(launch::async, waiter2);
    /* associated futures will be ready when the packaged tasks complete */
    pt();
    pt2(1, 2);
    return 0;
}

In waiter() and waiter2(), future::get() blocks until the associating std::packaged_task completes. You will always get “in pt” before “after f.get()” and “in pt2” before “after f2.get()”. They are synchronized.

5. std::future with std::promise

You may also need to get notified in the middle of a task. std::promise can help you. It works like a lightweight event.

Future and Promise are the two separate sides of an asynchronous operation. std::promise is used by the “producer/writer”, while std::future is used by the “consumer/reader”. The reason it is separated into these two interfaces is to hide the “write/set” functionality from the “consumer/reader”:

#include <iostream>
#include <future>
using namespace std;

promise<bool> p;
promise<int> p2;

void waiter() {
    /* get associated future */
    auto f = p.get_future();
    /* wait here */
    f.get();
    cout << "after f.get().." << endl;
}

void waiter2() {
    auto f2 = p2.get_future();
    try {
        f2.get();
    } catch (...) {
        /* caught exception */
        cout << "caught exception in f2.get().." << endl;
    }
    cout << "after f2.get().." << endl;
}

int main() {
    auto t = std::async(launch::async, waiter);
    auto t2 = std::async(launch::async, waiter2);
    /* associated futures will be ready after a value is set */
    cout << "setting p.." << endl;
    p.set_value(true);
    /* exceptions can also be set */
    cout << "setting p2.." << endl;
    p2.set_exception(std::exception_ptr(nullptr));
    return 0;
}

Again in waiter() and waiter2(), future::get() blocks until a value or an exception is set into the associating std::promise. So “setting p” is always before “f.get()” and “setting p2” is always before “f2.get()”. They are synchronized.

NOTE: std::future seems to be not correctly implemented in VC++2013. So the last two code snippet do not work with it. But you can try the online VC++2015 compiler(still in preview as this writing), it works.

Categories: C/C++ Tags: ,

Different Semantics Between Win32 Events and Condition Variables

November 26th, 2014 No comments

Following the last post, I’m trying to implement a thread pool for practise, which supposed to work under both Windows and Linux platform. But the different semantics between Win32 events and condition variables makes it impossible to code in a unified logic. First, Linux uses mutex and condition variable to keep synchronization. While there is only event under Windows. Then, pthread_cond_signal() does nothing if no thread is currently waiting on the condition:

int main() {
    pthread_mutex_t m;
    pthread_cond_t c;
    pthread_mutex_init(&m, NULL);
    pthread_cond_init(&c, NULL);
    /* signal & wait */
    pthread_cond_signal(&c);
    pthread_mutex_lock(&m); /* hangs here, since signal is missed. */
    pthread_cond_wait(&c, &m);
    pthread_mutex_unlock(&m);
    /* destroy */
    pthread_mutex_destroy(&m);
    pthread_cond_destroy(&c);
    return 0;
}

But under Windows, code below simply pass through:

int main() {
    HANDLE e = CreateEvent(NULL, FALSE, FALSE, NULL);
    SetEvent(e);
    WaitForSingleObject(e, INFINITE); /* won't miss. */
    return 0;
}

And, under Windows Vista and later versions, a new series of synchronization API was introduced to align with the Linux API:

int main() { /* requires windows vista and later. */
    CRITICAL_SECTION cs;
    CONDITION_VARIABLE cv;
    InitializeCriticalSection(&cs);
    InitializeConditionVariable(&cv);
    /* signal & wait */
    WakeConditionVariable(&cv);
    EnterCriticalSection(&cs);
    SleepConditionVariableCS(&cv, &cs, INFINITE); /* hangs here, since signal is missed. */
    LeaveCriticalSection(&cs);
    /* destroy */
    DeleteCriticalSection(&cs);
    return 0;
}
Categories: C/C++ Tags: ,

Spurious Wakeups

November 20th, 2014 No comments

http://vladimir_prus.blogspot.com/2005/07/spurious-wakeups.html

One of the two basic synchronisation primitives in multithreaded programming is called “condition variables”. Here’s a small example:

bool something_happened;
boost::mutex m;
boost::condition_variable c;
void thread1() {
    boost::mutex::scoped_lock(m);
    while (!something_happened) {
        c.wait(m);
    }
}
void thread2() {
    // do lots of work
    boost::mutex::scoped_lock(m);
    something_happened = true;
    c.notify_one();
}

Here, the call to “c.wait()” unlocks the mutex (allowing the other thread to eventually lock it), and suspends the calling thread. When another thread calls ‘notify’, the first thread wakes up, locks the mutex again (implicitly, inside ‘wait’), sees that variable is set to ‘true’ and goes on.

But why do we need the while loop, can’t we write:

    if (!something_happened) {
        c.wait(m);
    }

We can’t. And the killer reason is that ‘wait’ can return without any ‘notify’ call. That’s called spurious wakeup and is explicitly allowed by POSIX. Essentially, return from ‘wait’ only indicates that the shared data might have changed, so that data must be evaluated again.

Okay, so why this is not fixed yet? The first reason is that nobody wants to fix it. Wrapping call to ‘wait’ in a loop is very desired for several other reasons. But those reasons require explanation, while spurious wakeup is a hammer that can be applied to any first year student without fail.

The second reason is that fixing this is supposed to be hard. Most sources I’ve seen say that fixing that would require very large overhead on certain architectures. Strangely, no details were ever given, which made me wonder if avoiding spurious wakeups is simple, but all the threading experts secretly decided to tell everybody it’s hard.

After asking on comp.programming.thread, I at least know the reason for Linux (thanks to Ben Hutchings). Internally, wait is implemented as a call to the ‘futex’ system call. Each blocking system call on Linux returns abruptly when the process receives a signal — because calling signal handler from kernel call is tricky. What if the signal handler calls some other system function? And a new signal arrives? It’s easy to run out of kernel stack for a process. Exactly because each system call can be interrupted, when glibc calls any blocking function, like ‘read’, it does it in a loop, and if ‘read’ returns EINTR, calls ‘read’ again.

Can the same trick be used to conditions? No, because the moment we return from ‘futex’ call, another thread can send us notification. And since we’re not waiting inside ‘futex’, we’ll miss the notification(A third thread can get it, and change the value of predicate. — gonwan). So, we need to return to the caller, and have it reevaluate the predicate. If another thread indeed set it to true, we’ll break out of the loop.

So much for spurious wakeups on Linux. But I’m still very interested to know what the original reasons were.

==============================
Also see the explanation for spurious wakeups on the linux man page: pthread_cond_signal.
Last note: PulseEvent() in windows(manual-reset) = pthread_cond_signal() in linux, while SetEvent() in windows(auto-reset) = pthread_cond_broadcast() in linux, see here and here. And spurious wakeups are also possible on windows when using condition variables.

Categories: C/C++ Tags: ,

Logging in Multithreaded Environment Using Thread-Local Storage

April 12th, 2013 No comments

Generally, A logger is a singleton class. The declaration may look like:

#ifndef _LOGGER_H
#define _LOGGER_H

#include <string>

class Logger
{
private:
    Logger() { }
public:
    static void Init(const std::string &name);
    static Logger *GetInstance();
    void Write(const char *format, ...);
private:
    static std::string ms_name;
    static Logger *ms_this_logger;
};

#endif

The Init function is used to set log name or maybe other configuration information. And We can use the Write function to write logs.

Well, in a multithreaded environment, locks must be added to prevent concurrent issues and keep the output log in order. And sometimes we want to have separate log configurations. How can we implement it without breaking the original interfaces?

One easy way is to maintain a list of all available Logger instances, so that we can find and use a unique Logger in each thread. The approach is somehow like the one used in log4j. But log4j reads configuration files to initialize loggers, while our configuration information is set in runtime.

Another big issue is that we must add a new parameter to the GetInstance function to tell our class which Logger to return. The change breaks interfaces.

By utilizing TLS (thread-local storage), we can easily solve the above issues. Every logger will be thread-local, say every thread has its own logger instance which is stored in its thread context. Here comes the declaration for our new Logger class, boost::thread_specific_ptr from boost library is used to simplify our TLS operations:

#ifndef _LOGGER2_H
#define _LOGGER2_H

#include <string>
#include <boost/thread.hpp>

class Logger
{
private:
    Logger() { }
public:
    static void Init(const std::string &name);
    static Logger *GetInstance();
    void Write(const char *format, ...);
private:
    static boost::thread_specific_ptr<std::string> ms_name;
    static boost::thread_specific_ptr<Logger> ms_this_logger;
};

#endif

Simply use boost::thread_specific_ptr to wrap the original 2 static variables, and they will be in TLS automatically, that’s all. The implementation:

#include "logger2.h"
#include <stdio.h>
#include <stdarg.h>
#include <string.h>

using namespace std;

boost::thread_specific_ptr<string> Logger::ms_name;
boost::thread_specific_ptr<Logger> Logger::ms_this_logger;

void Logger::Init(const string &name)
{
    if (!name.empty()) {
        ms_name.reset(new std::string(name));
    }
}

Logger *Logger::GetInstance()
{
    if (ms_this_logger.get() == NULL) {
        ms_this_logger.reset(new Logger);
    }
    return ms_this_logger.get();
}

void Logger::Write(const char *format, ...)
{
    va_list arglist;
    char buffer[1024];
    va_start(arglist, format);
    memset(buffer, 0, sizeof(buffer));
    vsnprintf(buffer, sizeof(buffer), format, arglist);
    va_end(arglist);
    printf("[%s] %s\n", ms_name.get()->c_str(), buffer);
}

Our test code:

#include <boost/date_time.hpp>
#include <boost/thread.hpp>
/*
 * actually, we do not matter which header file to include,
 * since they have compatible public interface, compatible ABI.
 */
//#include "logger.h"
#include "logger2.h"

using namespace std;

class Thread
{
public:
    Thread(const char *name) : m_name(name) { }
    void operator()()
    {
        /* set logger name in thread */
        Logger::Init(m_name);
        /* call GetInstance() and Write() in other functions with thread-local enabled */
        Logger *logger = Logger::GetInstance();
        for (int i = 0; i < 3; i++) {
            logger->Write("Hello %d", i);
            boost::this_thread::sleep(boost::posix_time::seconds(1));
        }
    }
private:
    string m_name;
};

int main()
{
    boost::thread t1(Thread("name1"));
    boost::thread t2(Thread("name2"));
    t1.join();
    t2.join();
    return 0;
}

Output when using the original Logger may look like:

[name1] Hello 0
[name2] Hello 0
[name2] Hello 1
[name2] Hello 1
[name2] Hello 2
[name2] Hello 2

When using the TLS version, it may look like:

[name1] Hello 0
[name2] Hello 0
[name1] Hello 1
[name2] Hello 1
[name1] Hello 2
[name2] Hello 2

Everything is in order now. You may want to know what OS API boost uses to achieve TLS. I’ll show you the details in boost 1.43:

# windows implementation
boost::thread_specific_ptr::reset()
  --> boost::detail::set_tss_data()
  --> boost::detail::get_or_make_current_thread_data()
  --> boost::detail::get_current_thread_data()
  --> ::TlsGetValue()
# see:
# ${BOOST}/boost/thread/tss.hpp
# ${BOOST}/lib/thread/src/win32/thread.cpp
# *nix implementation
boost::thread_specific_ptr::reset()
  --> boost::detail::set_tss_data()
  --> boost::detail::add_new_tss_node()
  --> boost::detail::get_or_make_current_thread_data()
  --> boost::detail::get_current_thread_data()
  --> ::pthread_getspecific()
# see:
# ${BOOST}/boost/thread/tss.hpp
# ${BOOST}/lib/thread/src/pthread/thread.cpp

The underlying API is TlsGetValue under windows and pthread_getspecific under *nix platforms.

Categories: C/C++ Tags: , ,