c++11线程

std::thread

  • thread 的构造函数的第一个参数是函数(对象),后面跟的是这个函数所需的参数。
  • thread 要求在析构之前要么 join(阻塞直到线程退出),要么 detach(放弃对线程的管理),否则程序会异常退出。
  • 只有joinable(已经join的、已经detach的或者空的线程对象都不满足joinable)的thread才可以对其调用 join 成员函数,否则会引发异常
  • c++11还提供了获取线程id,或者系统cpu个数,获取thread native_handle,使得线程休眠等功能

下面的代码执行如下流程:

  1. 传递参数,起两个线程
  2. 两个线程分别休眠100毫秒
  3. 使用互斥量(mutex)锁定cout,然后输出一行信息
  4. 主线程等待这两个线程退出后程序结束
  5. 用lambda匿名函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;
mutex mtx;
void func(const char *name)
{
this_thread::sleep_for(100ms);
lock_guard<mutex> guard(mtx);
cout << "I am thread " << name << '\n';
}
int main()
{
thread t1(func, "A");
thread t2(func, "B");
t1.join();
t2.join();
auto func1 = [](int k) {
for (int i = 0; i < k; ++i) {
cout << i << " ";
}
cout << endl;
};
std::thread tt(func1, 20);
if (tt.joinable()) { // 检查线程可否被join
tt.join();
}
cout << "当前线程ID " << tt.get_id() << endl;
cout << "当前cpu个数 " << std::thread::hardware_concurrency() << endl;
auto handle = tt.native_handle();// handle可用于pthread相关操作
std::this_thread::sleep_for(std::chrono::seconds(1));
}

thread不能在析构时自动join,感觉不是很自然,但是在C++20的jthread到来之前,只能这样用着,附近的笔记参考现代C++实战30讲,用到了自定义的scoped_thread,可以简单认为更智能的std::thread

std::mutex

std::mutex是一种线程同步的手段,用于保存多线程同时操作的共享数据。mutex分为四种:

  • std::mutex:独占的互斥量,不能递归使用,不带超时功能
  • std::recursive_mutex:递归互斥量,可重入,不带超时功能
  • std::timed_mutex:带超时的互斥量,不能递归
  • std::recursive_timed_mutex:带超时的互斥量,可以递归使用

std::mutex不允许拷贝构造,初始是unlock状态

三个函数

  1. lock():三种情况
    1. 如果该mutex没有被锁,则上锁
    2. 如果该mutex被其他线程锁住,则当前线程阻塞,直至其他线程解锁
    3. 如果该mutex被当前线程锁住(递归上锁),则产生死锁
  2. unlock():只允许在已获得锁时调用
  3. try_lock():相当于非阻塞的加锁,三种情况
    1. 如果该mutex没有被锁,则上锁
    2. 如果该mutex被其他线程锁住,则当前线程返回false,直至其他线程解锁
    3. 如果该mutex被当前线程锁住(递归上锁),则产生死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>

using namespace std;
std::timed_mutex timed_mutex_;

int main() {
auto func1 = [](int k) {
timed_mutex_.try_lock_for(std::chrono::milliseconds(200));
for (int i = 0; i < k; ++i) {
cout << i << " ";
}
cout << endl;
timed_mutex_.unlock();
};
std::thread threads[5];
for (int i = 0; i < 5; ++i) {
threads[i] = std::thread(func1, 200);
}
for (auto& th : threads) {
th.join();
}
return 0;
}

unique_lock与lock_guard区别

两者都包含于头文件<mutex>中,C++11

这两个都是类模板,用RAII的思想来处理锁,不用手动mutex.lock()、mutex.unlock()

  • lock_guard只有构造函数,直接构造即可,在整个区域内有效,在块内{}作为局部变量,自动析构
  • unique_lock更加灵活,还提供lock()、try_lock()、unlock()等函数,所以可以在必要时加锁和解锁,不必像lock_guard一样非得在构造和析构时加锁和解锁
  • unique_lock在效率上差一点,内存占用多一点。
  • 条件变量cond_variable的接收参数是unique_lock
1
2
3
4
5
6
std::mutex mtx;
void some_func() {
std::lock_guard<std::mutex> guard(mtx);
// 做需要同步的工作
// 函数结束时,自动释放局部对象lock_guard
}

std::atomic

头文件<atomic>定义了原子量和内存序(C++11起),用atomic_int/atomic_bool/…代替int/bool/…,即可保证这些操作都是原子性的,比mutex对资源加锁解锁要快

编译器提供了一个原子对象的成员函数 is_lock_free,可以检查这个原子对象上的操作是否是无锁的

atomic规定了内存序,这样就有了内存屏障,防止编译器优化

原子操作有三类:

  • 读:在读取的过程中,读取位置的内容不会发生任何变动。
  • 写:在写入的过程中,其他执行线程不会看到部分写入的结果。
  • 读‐修改‐写:读取内存、修改数值、然后写回内存,整个操作的过程中间不会有其他写入操作插入,其他执行线程不会看到部分写入的结果。

可以用于单例模式中双检锁失效的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

struct OriginCounter { // 普通的计数器
int count;
std::mutex mutex_;
void add() {
std::lock_guard<std::mutex> lock(mutex_);
++count;
}

void sub() {
std::lock_guard<std::mutex> lock(mutex_);
--count;
}

int get() {
std::lock_guard<std::mutex> lock(mutex_);
return count;
}
};

struct NewCounter { // 使用原子变量的计数器
std::atomic<int> count;
void add() {
++count;
// count.store(++count);这种方式也可以
}

void sub() {
--count;
// count.store(--count);
}

int get() {
return count.load();
}
};

std::callonce

c++11提供了std::call_once来保证某一函数在多线程环境中只调用一次,它需要配合std::once_flag使用,直接看使用代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::once_flag onceflag;
void CallOnce() {
std::call_once(onceflag, []() {
cout << "call once" << endl;
});
}

int main() {
std::thread threads[5];
for (int i = 0; i < 5; ++i) {
threads[i] = std::thread(CallOnce);
}
for (auto& th : threads) {
th.join();
}
return 0;
}

std::condition_variable与虚假唤醒

当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒

条件变量一般与互斥锁结合,通常与unique_lock类模板结合使用

  • wait:当前线程阻塞直至条件变量被通知或被虚假唤醒,若用版本二,则唤醒必须满足谓词函数(还有指定时长的wait_for和指定截止时间的wait_until)

    1
    2
    3
    4
    5
    // 版本一:因为存在虚假唤醒,所以没有谓词函数的版本一般不用!
    void wait( std::unique_lock<std::mutex>& lock );
    // 版本二:Predicate 谓词函数,可以普通函数或者lambda表达式
    template< class Predicate >
    void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
  • notify_all/notify_one:通知

    1
    2
    3
    4
    // 若任何线程在 *this 上等待,则调用 notify_one 会解阻塞(唤醒)等待线程之一。
    void notify_one() noexcept;
    // 若任何线程在 *this 上等待,则解阻塞(唤醒)全部等待线程。
    void notify_all() noexcept;

虚假唤醒:

在正常情况下,wait类型函数返回时要不是因为被唤醒,要不是因为超时才返回,但是在实际中发现,因处理器的原因,多个线程可能都会被唤醒(即使用的是pthread_cond_signal()或notify_one()),那我们要让虚假唤醒的线程睡回去,所以一般都是使用带有谓词参数的wait函数,

1
cond.wait(lock, [](){return status});

因为这种(xxx, Predicate pred)类型的函数等价于:

1
2
3
4
while (!pred()) //while循环,解决了虚假唤醒的问题
{
wait(lock);
}

“惊群效应”。有人觉得此处既然是被唤醒的,肯定是满足条件了,其实不然。如果是多个线程都在等待这个条件,而同时只能有一个线程进行处理,此时就必须要再次条件判断,以使只有一个线程进入临界区处理。

pthread_cond_signal()也可能唤醒多个线程,而如果你同时只允许一个线程访问的话,就必须要使用while来进行条件判断,以保证临界区内只有一个线程在处理。

为什么条件变量需要和锁配合使用?

因为内部是通过判断及修改某个全局变量来决定线程的阻塞与唤醒,多线程操作同一个变量肯定需要加锁来使得线程安全。同时,一个简单的wait函数调用内部会很复杂的,有可能线程A调用了wait函数但是还没有进入到wait阻塞等待前,另一个线程B在此时却调用了notify函数,此时nofity的信号就丢失啦,如果加了锁,线程B必须等待线程A释放了锁并进入了等待状态后才可以调用notify,继而防止信号丢失。

std::future/std::promise/std::packaged_task/std::async/std::shared_future

  • 头文件<future>就包括了这五者,C++11
std::future 与 std::promise
  • std::future作为异步结果的传输通道,通过get()可以很方便的获取线程函数的返回值,std::promise用来包装一个值,将数据和future绑定起来,而std::packaged_task则用来包装一个调用对象,将函数和future绑定起来,方便异步调用。而std::future是不可以复制的,如果需要复制放到容器中可以使用std::shared_future。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <functional>
#include <future>
#include <iostream>
#include <thread>
using namespace std;
void func(std::future<int>& fut) {
int x = fut.get(); // 阻塞直到源 promise 调用了 set_value
cout << "value: " << x << endl;
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(func, std::ref(fut));
prom.set_value(144);
t.join();
return 0;
}
std::packaged_task
  • std::packaged_task则用来包装一个调用对象,将函数和future绑定起来,方便异步调用。
  • 理解:
    • std::future用于访问异步操作的结果,而std::promise和std::packaged_task在future高一层,它们内部都有一个future,promise包装的是一个值,packaged_task包装的是一个函数
    • packaged_task ≈ promise + function
1
2
3
4
5
6
7
8
9
10
#include <future>
#include <iostream>
#include <thread>
int main() {
std::packaged_task<int(int, int)> task([](int a, int b) { return a + b; });
auto f = task.get_future();
std::thread t(std::move(task), 1, 2);
std::cout << f.get() << std::endl;
if (t.joinable()) t.join();
}
  • 注意一个future上只能调用一个get函数,第二次会导致程序崩溃,所以要想在多线程调用future,得用future.share()方法生成shared_future,当然底层还是只用了一次get函数。
std::async
  • future与async配合使用,可以从异步任务中获取结果,std::async用于创建异步任务,实际上就是创建一个线程执行相应任务,返回的结果会保存在 future 中,不需要像 packaged_task 和 promise 那么麻烦,线程操作应优先使用 async
  • async ≈ thread + packaged_task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 没有future与async的做法,非常冗余,定义了一堆变量
void work(condition_variable& cv, int& result)
{
// 假装我们计算了很久
this_thread::sleep_for(2s); result = 42;
cv.notify_one();
}
int main(){
condition_variable cv;
mutex cv_mut;
int result;
scoped_thread th{work, ref(cv), ref(result)};
cout << "I am waiting now\n"; unique_lock lock{cv_mut};
cv.wait(lock);
cout << "Answer: " << result;
}

// 引入future与async,非常简单
int work() {
// 假装我们计算了很久
this_thread::sleep_for(2s); return 42;
}
int main() {
auto fut = async(launch::async, work); // 调用 async 可以获得一个未来量
cout << "I am waiting now\n";
cout << "Answer: " << fut.get(); // 在未来量上调用 get 成员函数可以获得其结果
}


// 第一个参数是创建策略:
// std::launch::async表示任务执行在另一线程
// std::launch::deferred表示延迟执行任务,调用get或者wait时才会执行,不会创建线程,惰性执行在当前线程。
async(std::launch::async | std::launch::deferred, func, args...);
future与promise可以实现多线程同步

线程1初始化一个promise对象和一个future对象,promise传递给线程2,相当于线程2对线程1的一个承诺;future相当于一个接受一个承诺,用来获取未来线程2传递的值。线程2获取到promise后,需要对这个promise传递有关的数据,之后线程1的future就可以获取数据了。一组promise和future只能使用一次,既不能重复设,也不能重复取。

比如下面的例子,promise和future在这里成对出现,可以看作是一个一次性管道:有人需要兑现承诺,往promise里放东西(set_value);有人就像收期货一样,到时间去future里拿(get)就行了。我们把prom移动给新线程,这样老线程就完全不需要管理它的生命周期了。

1
2
3
4
5
6
7
8
void work(promise<int> prom) {
// 假装我们计算了很久
this_thread::sleep_for(2s); prom.set_value(42);
}
int main() {
promise<int> prom;
auto fut = prom.get_future(); scoped_thread th{work, move(prom)};
cout << "I am waiting now\n"; cout << "Answer: " << fut.get();
std::shared_future
  • 普通的future有个特点,它不能拷贝,只能移动,这就意味着只能有一个线程一个实例可以通过get()拿到对应的结果。
  • 如果想要多个线程多个实例拿到结果,就可以使用shared_future,调用普通 future 的 shared()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> prom;
auto fu = prom.get_future();
auto shared_fu = fu.share();
auto f1 = std::async(std::launch::async, [shared_fu]() { std::cout << shared_fu.get() << std::endl; });
auto f2 = std::async(std::launch::async, [shared_fu]() { std::cout << shared_fu.get() << std::endl; });
prom.set_value(102);
f1.get();
f2.get();
}

应用一:两个线程交替打印

用到了unique_lock来管理mutex,还有条件变量condition_variable来通知另一个线程,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
mutex mtx;
condition_variable cond_var;
bool flag = true;
int maxIter = 10;
void printA(){
while(1){
unique_lock<std::mutex> ulock(mtx);
cond_var.wait(ulock, []{return flag;});
cout << "threadA: " << flag << endl;
flag = false;
cond_var.notify_one();
if (--maxIter <= 0) break;
}
}
void printB(){
while(1){
unique_lock<std::mutex> ulock(mtx);
cond_var.wait(ulock, []{return !flag;});
cout << "threadB: " << flag << endl;
flag = true;
cond_var.notify_one();
if (--maxIter <= 0) break;
}
}
int main()
{
thread t1(printA);
thread t2(printB);
t1.join();
t2.join();
}

应用二:生产者消费者模型

多生产者多消费者,利用互斥锁、条件变量、阻塞队列实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
mutex mtx;
condition_variable produce, consume; // 条件变量是一种同步机制,要和mutex以及lock一起使用
queue<int> q; // shared value by producers and consumers, which is the critical section
int maxSize = 20;
int maxConsume = 100;
int maxProduce = 100;
void consumer() {
while (true) {
unique_lock<mutex> lck(mtx); // RAII
consume.wait(lck, [] {return q.size() != 0; }); // wait(block) consumer until q.size() != 0 is true
cout << "consumer " << this_thread::get_id() << ": ";
q.pop();
cout << q.size() << '\n';
produce.notify_all(); // notify(wake up) producer when q.size() != maxSize is true
if (--maxConsume <= 0) break;
}
}

void producer(int id) {
while (true) {
unique_lock<mutex> lck(mtx);
produce.wait(lck, [] {return q.size() != maxSize; }); // wait(block) producer until q.size() != maxSize is true
cout << "-> producer " << this_thread::get_id() << ": ";
q.push(id);
cout << q.size() << '\n';
consume.notify_all(); // notify(wake up) consumer when q.size() != 0 is true
if (--maxProduce <= 0) break;
}
}
int main()
{
thread consumers[2], producers[2];
for (int i = 0; i < 2; ++i) {
consumers[i] = thread(consumer);
producers[i] = thread(producer, i + 1);
}
// 结束时必须要调用join,不然会异常退出
for (int i = 0; i < 2; ++i) {
producers[i].join();
consumers[i].join();
}
}

应用三:线程池

线程池中线程,在线程池中等待并执行分配的任务,用条件变量实现等待与通知机制

基于C++11实现线程池的工作原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class ThreadPool{
public:
static const int kInitThreadsSize = 3;
enum taskPriorityE { level0, level1, level2, }; // 优先级
typedef std::function<void()> Task;
typedef std::pair<taskPriorityE, Task> TaskPair;

private:
typedef std::vector<std::thread*> Threads;
typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks;

Threads m_threads;
Tasks m_tasks;

std::mutex m_mutex;
Condition m_cond;
bool m_isStarted;

public:
ThreadPool() : m_mutex(), m_cond(m_mutex), m_isStarted(false) {}
~ThreadPool() { if(m_isStarted) stop();}

void start() {
m_isStarted = true;
m_threads.reserve(kInitThreadsSize);
for (int i = 0; i < kInitThreadsSize; ++i) {
m_threads.push_back(new std::thread(std::bind(&threadLoop, this)));
}
}
void stop() {
{
std::unique_lock<std::mutex> lock(m_mutex);
m_isStarted = false;
m_cond.notifyAll();
}
for (auto it = m_threads.begin(); it != m_threads.end(); ++it) {
(*it)->join();
delete *it;
}
m_threads.clear();
}
void addTask(const Task&) {
std::unique_lock<std::mutex> lock(m_mutex);
TaskPair taskPair(level2, task);
m_tasks.push(taskPair);
m_cond.notify();
}

private:
ThreadPool(const ThreadPool&);//禁止复制拷贝.
const ThreadPool& operator=(const ThreadPool&);

struct TaskPriorityCmp {
bool operator() (const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2) {
return p1.first > p2.first; // first的小值优先
}
};

void threadLoop() {
while (m_isStarted) {
Task task = take();
if (task) {
task();
}
}
}
Task take() {
std::unique_lock<std::mutex> lock(m_mutex);
while(m_tasks.empty() && m_isStarted) { // 避免虚假唤醒
m_cond.wait(lock);
}
Task task;
auto size = m_tasks.size();
if (!m_tasks.empty() && m_isStarted) {
task = m_tasks.top().second;
m_tasks.pop();
}
return task;
}
};

参考:c++11新特性之线程相关所有知识点 by 程序喵大人