等待事件或其他条件
- 如果一个线程正等待着第二个线程完成一项任务,它有几个选择。
- 可以一直检测共享数据(由互斥量保护)中的标识,并且让第二个线程在完成任务时设置该标识。
浪费资源 - 使用
std::this_thread::sleep_for()
,让等待的线程在检查之间休眠一会儿
得到正确的休眠时间比较难 - 使用C++标准库提供的工具来等待事件本身。
- 可以一直检测共享数据(由互斥量保护)中的标识,并且让第二个线程在完成任务时设置该标识。
- 等待由另一个线程触发一个事件的最基本机制是条件变量
第二种选择:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
bool flag; std::mutex m; void wait_for_flag() { std::unique_lock<std::mutex> lk(m); while(!flag) { lk.unlock();//1 std::this_thread::sleep_for(std::chrono::milliseconds(100));//2 lk.lock();//3 } } |
- 解锁互斥量
- 解锁之后让函数休眠一段时间,这段时间之后再次锁定
- 在解锁的这段时间里,其他线程有机会获取它并设置标识
第三个选择
- 通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。
- 从概念上来说,条件变量与某些事件或其他条件相关,并且一个或多个线程可以等待该条件被满足。
- 当某个线程已经确定条件得到满足,它就可以通知一个或多个正在条件变量上进行等待的线程,以便唤醒它们并让它们继续处理。
等待条件达成
condition_variable
- 仅限于和
std::mutex
一起工作
condition_variable_any
- 可以与符合类似互斥量的最低标准的任何东西一起工作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
//condition_variable等待数据 #include <condition_variable> #include <mutex> #include <queue> mutex mut; queue<data_chunk> data_queue;//1 condition_variable data_cond; void data_preparation_thread() { while(more_data_to_prepare()) { const data_chunk data = prepare_data(); lock_guard<mutex> lk(mut); data_queue.push(data);//2 data_cond.notify_one();//3 } } void data_processing_thread() { while(true) { unique_lock<mutex> lk(mut);//4 data_cond.wait(lk,[]{return !data_queue.empty();});//5 data_chunk data = data_queue.front(); data_queue.pop(); lk.unlock();//6 process(data); if(is_last_chunk(data)) break; } } |
- 首先创建一个在两个线程间传递数据的队列
- 当数据准备就绪时,使用
lock_guard
去锁定保护队列的互斥量,并将数据压入队列 - 然后在
condition_variable
的实例上调用notify_one
函数,以通知等待中的线程(如果有的话) - 在等待中的线程中,以
unique_lock
锁定互斥量- 等待中的线程必须在等待期间解锁互斥量,并在这之后重新将其锁定,
lock_guard
没有这样的灵活性
- 等待中的线程必须在等待期间解锁互斥量,并在这之后重新将其锁定,
- 然后在
condition_variable
上调用wait
,传入锁对象以及表示正在等待的条件lambda
函数(检查到队列不为空时,即有数据处理时) wait
接下来检查判断条件,整个lambda
表达式返回的结果参数为true
时,返回。如果不满足条件,也就是队列中没有数据处理,lambda
返回false
,wait
将会解锁该互斥量,并将该线程置于等待或者阻塞状态。unique_lock
不仅适用于对wait
的调用,还可用于有待处理但未处理的数据
使用条件变量建立一个线程安全队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
//队列接口 template <class T,class Container = std::deque<T>> class queue { public: explicit queue(const Container&); explicit queue(Container&& = Container()); template<class Alloc> explicit queue(const Alloc&); template<class Alloc> queue(const Container&,const Alloc&); template<class Alloc> queue(Container&&,const Alloc&); template<class Alloc> queue(queue&&,const Alloc&); void swap(queue& q); bool empty() const; size_type size() const; T& front(); const T& front() const; T& back(); const T& back() const; void push(const T& x); void push(T&& x); void pop(); template<class... Args> void emplace(Args&&... args); }; |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
//threadsafe_queue #include <queue> #include <memory> #include <mutex> #include <condition_variable> template<typename T> class threadsafe_queue { public: threadsafe_queue() {} threadsafe_queue(const threadsafe_queue& other) { std::lock_guard<mutex> lk(mut); data_queue = other.data_queue; } threadsafe_queue& operator=(const threadsafe_queue&) = delete; void push(T new_value) { lock_guard<mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } bool try_pop(T& value)//1 { lock_guard<mutex> lk(mut); if(data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop()//2 { lock_guard<mutex> lk(mut); if(data_queue.empty()) return shared_ptr<T>(); shared_ptr<T> res(make_shared<T>(data_queue.front())); data_queue.pop(); return res; } void wait_and_pop(T& value) { unique_lock<mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); value = data_queue.front(); data_queue.pop(); } shard_ptr<T> wait_and_pop() { unique_lock<mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); shared_ptr<T> res(make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { lock_guard<mutex> lk(mut); return data_queue.empty(); } public: threadsafe_queue<data_chunk> data_queue;//11 void data_preparation_thread() { while(more_data_to_prepare()) { const data_chunk data=prepare_data(); data_queue.push(data);//12 } } void data_processing_thread() { while(true) { data_chunk data; data_queue.wait_and_pop(data);//13 process(data); if(is_last_chunk(data)); break; } } private: mutex mut; queue<T> data_queue; condition_variable data_cond; }; |
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ Soui二05/18
- ♥ Boost 程序库完全开发指南:工具与字符串08/22
- ♥ COM组件_207/22
- ♥ 51CTO:Linux C++网络编程五08/20
- ♥ C++编程规范101规则、准则与最佳实践 二01/07
- ♥ Soui七06/02