c++11线程 std::thread
thread 的构造函数的第一个参数是函数(对象),后面跟的是这个函数所需的参数。
thread 要求在析构之前要么 join(阻塞直到线程退出),要么 detach(放弃对线程的管理),否则程序会异常退出。
只有joinable(已经join的、已经detach的或者空的线程对象都不满足joinable)的thread才可以对其调用 join 成员函数,否则会引发异常
c++11还提供了获取线程id,或者系统cpu个数,获取thread native_handle,使得线程休眠等功能
下面的代码执行如下流程:
传递参数,起两个线程
两个线程分别休眠100毫秒
使用互斥量(mutex)锁定cout,然后输出一行信息
主线程等待这两个线程退出后程序结束
用lambda匿名函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <chrono> #include <iostream> #include <mutex> #include <thread> using namespace std;mutex mtx; void func (const char *name) { this_thread::sleep_for (100 ms); lock_guard<mutex> guard (mtx) ; cout << "I am thread " << name << '\n' ; } int main () { thread t1 (func, "A" ) ; thread t2 (func, "B" ) ; t1.join (); t2.join (); auto func1 = [](int k) { for (int i = 0 ; i < k; ++i) { cout << i << " " ; } cout << endl; }; std::thread tt (func1, 20 ) ; if (tt.joinable ()) { tt.join (); } cout << "当前线程ID " << tt.get_id () << endl; cout << "当前cpu个数 " << std::thread::hardware_concurrency () << endl; auto handle = tt.native_handle (); std::this_thread::sleep_for (std::chrono::seconds (1 )); }
thread不能在析构时自动join,感觉不是很自然,但是在C++20的jthread到来之前,只能这样用着,附近的笔记参考现代C++实战30讲,用到了自定义的scoped_thread,可以简单认为更智能的std::thread
std::mutex std::mutex是一种线程同步的手段,用于保存多线程同时操作的共享数据。mutex分为四种:
std::mutex:独占的互斥量,不能递归使用,不带超时功能
std::recursive_mutex:递归互斥量,可重入,不带超时功能
std::timed_mutex:带超时的互斥量,不能递归
std::recursive_timed_mutex:带超时的互斥量,可以递归使用
std::mutex不允许拷贝构造,初始是unlock状态
三个函数
lock():三种情况
如果该mutex没有被锁,则上锁
如果该mutex被其他线程锁住,则当前线程阻塞 ,直至其他线程解锁
如果该mutex被当前线程锁住(递归上锁),则产生死锁
unlock():只允许在已获得锁时调用
try_lock():相当于非阻塞 的加锁,三种情况
如果该mutex没有被锁,则上锁
如果该mutex被其他线程锁住,则当前线程返回false ,直至其他线程解锁
如果该mutex被当前线程锁住(递归上锁),则产生死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <mutex> #include <thread> #include <chrono> using namespace std;std::timed_mutex timed_mutex_; int main () { auto func1 = [](int k) { timed_mutex_.try_lock_for (std::chrono::milliseconds (200 )); for (int i = 0 ; i < k; ++i) { cout << i << " " ; } cout << endl; timed_mutex_.unlock (); }; std::thread threads[5 ]; for (int i = 0 ; i < 5 ; ++i) { threads[i] = std::thread (func1, 200 ); } for (auto & th : threads) { th.join (); } return 0 ; }
unique_lock与lock_guard区别 两者都包含于头文件<mutex>
中,C++11
这两个都是类模板,用RAII的思想来处理锁,不用手动mutex.lock()、mutex.unlock()
lock_guard只有构造函数,直接构造即可,在整个区域内有效,在块内{}
作为局部变量,自动析构
unique_lock更加灵活,还提供lock()、try_lock()、unlock()等函数,所以可以在必要时加锁和解锁,不必像lock_guard一样非得在构造和析构时加锁和解锁
unique_lock在效率上差一点,内存占用多一点。
条件变量cond_variable的接收参数是unique_lock
1 2 3 4 5 6 std::mutex mtx; void some_func () { std::lock_guard<std::mutex> guard (mtx) ; }
std::atomic 头文件<atomic>
定义了原子量和内存序(C++11起),用atomic_int/atomic_bool/…代替int/bool/…,即可保证这些操作都是原子性的,比mutex对资源加锁解锁要快
编译器提供了一个原子对象的成员函数 is_lock_free,可以检查这个原子对象上的操作是否是无锁的
atomic规定了内存序,这样就有了内存屏障,防止编译器优化
原子操作有三类:
读:在读取的过程中,读取位置的内容不会发生任何变动。
写:在写入的过程中,其他执行线程不会看到部分写入的结果。
读‐修改‐写:读取内存、修改数值、然后写回内存,整个操作的过程中间不会有其他写入操作插入,其他执行线程不会看到部分写入的结果。
可以用于单例模式中双检锁失效的问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 struct OriginCounter { int count; std::mutex mutex_; void add () { std::lock_guard<std::mutex> lock (mutex_) ; ++count; } void sub () { std::lock_guard<std::mutex> lock (mutex_) ; --count; } int get () { std::lock_guard<std::mutex> lock (mutex_) ; return count; } }; struct NewCounter { std::atomic<int > count; void add () { ++count; } void sub () { --count; } int get () { return count.load (); } };
std::callonce c++11提供了std::call_once来保证某一函数在多线程环境中只调用一次,它需要配合std::once_flag使用,直接看使用代码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 std::once_flag onceflag; void CallOnce () { std::call_once (onceflag, []() { cout << "call once" << endl; }); } int main () { std::thread threads[5 ]; for (int i = 0 ; i < 5 ; ++i) { threads[i] = std::thread (CallOnce); } for (auto & th : threads) { th.join (); } return 0 ; }
std::condition_variable与虚假唤醒 当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒
条件变量一般与互斥锁结合,通常与unique_lock类模板结合使用
虚假唤醒:
在正常情况下,wait类型函数返回时要不是因为被唤醒,要不是因为超时才返回,但是在实际中发现,因处理器的原因,多个线程可能都会被唤醒(即使用的是pthread_cond_signal()或notify_one()),那我们要让虚假唤醒的线程睡回去,所以一般都是使用带有谓词参数的wait函数,
1 cond.wait (lock, [](){return status});
因为这种(xxx, Predicate pred)类型的函数等价于:
1 2 3 4 while (!pred ()) { wait (lock); }
“惊群效应”。有人觉得此处既然是被唤醒的,肯定是满足条件了,其实不然。如果是多个线程都在等待这个条件,而同时只能有一个线程进行处理,此时就必须要再次条件判断,以使只有一个线程进入临界区处理。
pthread_cond_signal()也可能唤醒多个线程,而如果你同时只允许一个线程访问的话,就必须要使用while来进行条件判断,以保证临界区内只有一个线程在处理。
为什么条件变量需要和锁配合使用?
因为内部是通过判断及修改某个全局变量来决定线程的阻塞与唤醒,多线程操作同一个变量肯定需要加锁来使得线程安全。同时,一个简单的wait函数调用内部会很复杂的,有可能线程A调用了wait函数但是还没有进入到wait阻塞等待前,另一个线程B在此时却调用了notify函数,此时nofity的信号就丢失啦,如果加了锁,线程B必须等待线程A释放了锁并进入了等待状态后才可以调用notify,继而防止信号丢失。
std::future/std::promise/std::packaged_task/std::async/std::shared_future
std::future 与 std::promise
std::future作为异步结果的传输通道,通过get()可以很方便的获取线程函数的返回值,std::promise用来包装一个值,将数据和future绑定起来,而std::packaged_task则用来包装一个调用对象,将函数和future绑定起来,方便异步调用。而std::future是不可以复制的,如果需要复制放到容器中可以使用std::shared_future。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <functional> #include <future> #include <iostream> #include <thread> using namespace std;void func (std::future<int >& fut) { int x = fut.get (); cout << "value: " << x << endl; } int main () { std::promise<int > prom; std::future<int > fut = prom.get_future (); std::thread t (func, std::ref(fut)) ; prom.set_value (144 ); t.join (); return 0 ; }
std::packaged_task
std::packaged_task则用来包装一个调用对象,将函数和future绑定起来,方便异步调用。
理解:
std::future用于访问异步操作的结果,而std::promise和std::packaged_task在future高一层,它们内部都有一个future,promise包装的是一个值,packaged_task包装的是一个函数
packaged_task ≈ promise + function
1 2 3 4 5 6 7 8 9 10 #include <future> #include <iostream> #include <thread> int main () { std::packaged_task<int (int , int ) > task ([](int a, int b) { return a + b; }) ; auto f = task.get_future (); std::thread t (std::move(task), 1 , 2 ) ; std::cout << f.get () << std::endl; if (t.joinable ()) t.join (); }
注意一个future上只能调用一个get函数,第二次会导致程序崩溃 ,所以要想在多线程调用future,得用future.share()方法生成shared_future,当然底层还是只用了一次get函数。
std::async
future与async配合使用,可以从异步任务 中获取结果,std::async用于创建异步任务,实际上就是创建一个线程执行相应任务,返回的结果会保存在 future 中,不需要像 packaged_task 和 promise 那么麻烦,线程操作应优先使用 async
async ≈ thread + packaged_task
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void work (condition_variable& cv, int & result) { this_thread::sleep_for (2 s); result = 42 ; cv.notify_one (); } int main () { condition_variable cv; mutex cv_mut; int result; scoped_thread th{work, ref (cv), ref (result)}; cout << "I am waiting now\n" ; unique_lock lock{cv_mut}; cv.wait (lock); cout << "Answer: " << result; } int work () { this_thread::sleep_for (2 s); return 42 ; } int main () { auto fut = async (launch::async, work); cout << "I am waiting now\n" ; cout << "Answer: " << fut.get (); } async (std::launch::async | std::launch::deferred, func, args...);
future与promise可以实现多线程同步 线程1初始化一个promise对象和一个future对象,promise传递给线程2,相当于线程2对线程1的一个承诺 ;future相当于一个接受一个承诺 ,用来获取未来线程2传递的值。线程2获取到promise后,需要对这个promise传递有关的数据,之后线程1的future就可以获取数据了。一组promise和future只能使用一次,既不能重复设,也不能重复取。
比如下面的例子,promise和future在这里成对出现,可以看作是一个一次性管道 :有人需要兑现承诺 ,往promise里放东西(set_value);有人就像收期货一样,到时间去future里拿(get)就行了。我们把prom移动给新线程,这样老线程就完全不需要管理它的生命周期了。
1 2 3 4 5 6 7 8 void work (promise<int > prom) { this_thread::sleep_for (2 s); prom.set_value (42 ); } int main () { promise<int > prom; auto fut = prom.get_future (); scoped_thread th{work, move (prom)}; cout << "I am waiting now\n" ; cout << "Answer: " << fut.get ();
std::shared_future
普通的future有个特点,它不能拷贝,只能移动,这就意味着只能有一个线程一个实例可以通过get()拿到对应的结果。
如果想要多个线程多个实例拿到结果,就可以使用shared_future,调用普通 future 的 shared()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 #include <future> #include <iostream> #include <thread> int main () { std::promise<int > prom; auto fu = prom.get_future (); auto shared_fu = fu.share (); auto f1 = std::async (std::launch::async, [shared_fu]() { std::cout << shared_fu.get () << std::endl; }); auto f2 = std::async (std::launch::async, [shared_fu]() { std::cout << shared_fu.get () << std::endl; }); prom.set_value (102 ); f1.get (); f2.get (); }
应用一:两个线程交替打印 用到了unique_lock来管理mutex,还有条件变量condition_variable来通知另一个线程,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> using namespace std; mutex mtx; condition_variable cond_var; bool flag = true ;int maxIter = 10 ;void printA () { while (1 ){ unique_lock<std::mutex> ulock (mtx) ; cond_var.wait (ulock, []{return flag;}); cout << "threadA: " << flag << endl; flag = false ; cond_var.notify_one (); if (--maxIter <= 0 ) break ; } } void printB () { while (1 ){ unique_lock<std::mutex> ulock (mtx) ; cond_var.wait (ulock, []{return !flag;}); cout << "threadB: " << flag << endl; flag = true ; cond_var.notify_one (); if (--maxIter <= 0 ) break ; } } int main () { thread t1 (printA) ; thread t2 (printB) ; t1.join (); t2.join (); }
应用二:生产者消费者模型 多生产者多消费者,利用互斥锁、条件变量、阻塞队列实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #include <iostream> #include <queue> #include <thread> #include <mutex> #include <condition_variable> using namespace std;mutex mtx; condition_variable produce, consume; queue<int > q; int maxSize = 20 ;int maxConsume = 100 ;int maxProduce = 100 ;void consumer () { while (true ) { unique_lock<mutex> lck (mtx) ; consume.wait (lck, [] {return q.size () != 0 ; }); cout << "consumer " << this_thread::get_id () << ": " ; q.pop (); cout << q.size () << '\n' ; produce.notify_all (); if (--maxConsume <= 0 ) break ; } } void producer (int id) { while (true ) { unique_lock<mutex> lck (mtx) ; produce.wait (lck, [] {return q.size () != maxSize; }); cout << "-> producer " << this_thread::get_id () << ": " ; q.push (id); cout << q.size () << '\n' ; consume.notify_all (); if (--maxProduce <= 0 ) break ; } } int main () { thread consumers[2 ], producers[2 ]; for (int i = 0 ; i < 2 ; ++i) { consumers[i] = thread (consumer); producers[i] = thread (producer, i + 1 ); } for (int i = 0 ; i < 2 ; ++i) { producers[i].join (); consumers[i].join (); } }
应用三:线程池 线程池中线程,在线程池中等待并执行分配的任务,用条件变量实现等待与通知机制
基于C++11实现线程池的工作原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 class ThreadPool {public : static const int kInitThreadsSize = 3 ; enum taskPriorityE { level0, level1, level2, }; typedef std::function<void ()> Task; typedef std::pair<taskPriorityE, Task> TaskPair; private : typedef std::vector<std::thread*> Threads; typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks; Threads m_threads; Tasks m_tasks; std::mutex m_mutex; Condition m_cond; bool m_isStarted; public : ThreadPool () : m_mutex (), m_cond (m_mutex), m_isStarted (false ) {} ~ThreadPool () { if (m_isStarted) stop ();} void start () { m_isStarted = true ; m_threads.reserve (kInitThreadsSize); for (int i = 0 ; i < kInitThreadsSize; ++i) { m_threads.push_back (new std::thread (std::bind (&threadLoop, this ))); } } void stop () { { std::unique_lock<std::mutex> lock (m_mutex) ; m_isStarted = false ; m_cond.notifyAll (); } for (auto it = m_threads.begin (); it != m_threads.end (); ++it) { (*it)->join (); delete *it; } m_threads.clear (); } void addTask (const Task&) { std::unique_lock<std::mutex> lock (m_mutex) ; TaskPair taskPair (level2, task) ; m_tasks.push (taskPair); m_cond.notify (); } private : ThreadPool (const ThreadPool&); const ThreadPool& operator =(const ThreadPool&); struct TaskPriorityCmp { bool operator () (const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2) { return p1.first > p2.first; } }; void threadLoop () { while (m_isStarted) { Task task = take (); if (task) { task (); } } } Task take () { std::unique_lock<std::mutex> lock (m_mutex) ; while (m_tasks.empty () && m_isStarted) { m_cond.wait (lock); } Task task; auto size = m_tasks.size (); if (!m_tasks.empty () && m_isStarted) { task = m_tasks.top ().second; m_tasks.pop (); } return task; } };
参考:c++11新特性之线程相关所有知识点 by 程序喵大人