漫话 C++11 之 condition_variable

condition_variable 是条件变量,通常和 mutex 搭配使用。

condition_variable 对象的某个 wait 函数被调用的时候,它使用 unique_lock 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 condition_variable 对象上调用了 notification 函数来唤醒当前线程。

condition_variable 对象通常使用 unique_lock<mutex> 来等待,如果需要使用另外的 lockable 类型,可以使用 condition_variable_any 类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// condition_variable example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

static std::mutex mtx;
static std::condition_variable cv;
static bool ready = false;

static void print_id (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) {
cv.wait(lck);
}

std::cout << "id=" << id << std::endl;
}

static void go () {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}

int main () {
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_id,i);

std::cout << "10 threads ready to race...\n";
go(); // go!

for (auto& th : threads)
th.join();

return 0;
}
  • 只提供了默认构造函数。
  • wait、wait_for、wait_until 方法
type arg
unconditional void wait (unique_lock& lck);
predicate template void wait (unique_lock& lck, Predicate pred);

其中 pred 是一个可调用对象/方法,

  • 它不接受任何参数
  • 返回值必须可以被转化为 bool

unconditional

在线程被阻塞时,wait 函数会自动调用 lck.unlock() 来释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。

一旦当前线程被唤醒 notifiedwait函数会重新调用 lck.lock(),使得 lck的状态和 wait 函数被调用时相同。

Predicate

只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 predtrue 时才会被解除阻塞。

举个栗子

下面是一个使用 pred 模式的 wait 实现的「单生产者-单消费者」模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// condition_variable::wait (with predicate)
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available() {return cargo!=0;}

void consume (int n) {
for (int i=0; i<n; ++i) {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck,shipment_available);
// consume:
std::cout << cargo << '\n';
// notify
cargo=0;
}
}

int main ()
{
std::thread consumer_thread (consume,10);

// produce 10 items when needed:
for (int i=0; i<10; ++i) {
while (shipment_available())
std::this_thread::yield(); //wait cargo to be consumed
std::unique_lock<std::mutex> lck(mtx);
cargo = i+1;
cv.notify_one();
}

consumer_thread.join();

return 0;
}
彦祖老师 wechat