一次性事件
- C++标准库模型将这种一次性事件称为期望
future
- 当一个线程需要等待一个特定的一次性事件时,在某种程度上来说它需要知道这个事件在未来的表现形式。之后,这个线程会周期性的等待或检查,事件是否出发
- 在检查期间也会执行其他任务,直到对应的任务触发,而后等待期望的状态会变为就绪
ready
- 在检查期间也会执行其他任务,直到对应的任务触发,而后等待期望的状态会变为就绪
future
- 一个期望可能是数据相关的,也可能不是
- 当事件发生时,这个期望就不能被重置
- 在C++标准库中,有两种期望:
- 唯一期望:
std::future<>
,只能与一个指定事件相关联 - 共享期望:
std::shared_future<>
,能关联多个事件
- 唯一期望:
std::future
的实例
async
- 在不需要立刻得到结果的时候,可以使用
std::async
来启动一个异步任务。 std::async
返回一个std::future
对象,而不是返回一个线程对象,future
对象最终将持有函数的返回值。- 当我们需要这个值时,只要在
future
上调用get()
,线程就会阻塞直到future
就绪,然后返回该值。- 换句话而言,如果任务正常进行中,调用
the_answer.get()
将会阻塞住。如果任务已经运行结束了,调用the_answer.get()
将不会阻塞,并且将得到任务的结果
- 换句话而言,如果任务正常进行中,调用
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//使用future获取异步任务的返回值 #include <future> #include <iostream> int find_the_answer_to_ltuae(); void do_other_stuff(); int main() { std::future<int> the_answer = std::async(find_the_answer_to_ltuae); do_other_stuff(); std::cout << "the_answer is" << the_answer.get() << std::endl; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
//使用std::async来将参数传递给函数 #include <string> #include <future> struct X { void foo(int,const std::string&); std::string bar(const std::string&); }; X x; //foo(42,"hello") auto f1 = std::async(&X::foo,&x,42,"hello"); //调用tempx.bar("goodbye") auto f2 = std::async(&X::bar,x,"goodbye"); struct Y { double operator() (double); }; Y y; //tempy(3.141) auto f3 = std::async(Y(),3.141); auto f4 = std::async(std::ref(y),2.1341); X baz(X&); // 调用bz(x) std::async(baz, std::ref(x)); class move_only { public: move_only(); move_only(move_only&&); move_only(move_only const&) = delete; move_only& operator=(move_only&&); move_only& operator=(move_only const&) = delete; void operator()(); }; auto f5 = std::async(move_only()); |
- 在默认情况下,期望是否进行等待取决于
std::async
是否启动一个线程,或是否有任务正在进行同步 - 在大多数情况上一条而言,但是,我们还可以在函数调用之前,向
std::async
传递一个std::launch
类型的额外参数,用这个参数来表明函数调用被延迟到wait或get函数调用时才执行std::launch::async
表明函数必须在其所在的独立线程上执行std::launch::deferred|std::launch::async
表明实现可以选择这两种方式的一种,最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// 在新线程上执行 auto f6 = std::async(std::launch::async, Y(), 1.2); // 在wait()或get()调用时执行 auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 实现选择执行方式 auto f8 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x)); auto f9 = std::async(baz, std::ref(x)); // 调用延迟函数 f7.wait(); |
std::launch::async
策略- 当使用这个策略时,任务会在一个独立的线程中立即启动。这确保了任务会异步地运行
- 返回的std::future对象用于检索在新线程中执行的任务的结果
- 一般情况下,这意味着任务得到了真正的并发执行
std::launch::deferred
策略- 使用此策略时,函数的执行会被延迟。
- 函数只有在你首次尝试获取
std::future
的结果时(例如,调用get()
或wait()
)才会执行 - 函数会在调用
get()
或wait()
的同一线程中执行,而不是一个新的线程 - 更像是一种“延迟计算”或“懒惰计算”策略,而不是真正的异步执行
- 组合上面两个策略
- 如果你调用
std::async
而不明确指定启动策略,它将默认为std::launch::async | std::launch::deferred
。 - 这意味着库可以选择其中的任何一个策略来执行。这可能根据库的实现和当前系统的状态来决定
- 如果你调用
- 组合介绍
1 2 3 4 5 6 7 8 |
// fut1 将在一个新的线程中执行 auto fut1 = std::async(std::launch::async, some_function); // fut2 将在调用了get或wait的当前线程中执行 auto fut2 = std::async(std::launch::deferred, some_function); // fut3 将可能在新的线程中执行,也可能在调用了get或wait的当前线程中执行 auto fut3 = std::async(some_function); |
packaged_task
和future
std::packaged_task<>
将一个future
绑定到一个函数或可调用对象上。- 当
std::packaged_task<>
对象被调用时,它就调用相关联的函数或可调用对象,并且让future
就绪,将返回值作为关联数据储存。 std::package_task<>
类模板的模板参数为函数签名。当我们构造一个实例的时候,必须传入一个函数或可调用对象,它可以接受指定的参数并返回指定的返回类型- 指定函数签名的返回类型可以用来标识,从
get_future()
返回的std::future<>
的类型,不过函数签名的参数列表,可以用来指定“打包任务”的函数调用操作符。
1 2 3 4 5 6 7 8 9 |
template<> class package_task<std::string(std::vector<char>*,int)> { public: template<typename Callable> explicit packaged_task(Callable&& f); std::future<std::string> get_future(); void operator() (std::vector<char>*,int); }; |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
//在线程之间传递任务 #include <deque> #include <mutex> #include <future> #include <thread> #include <utility> using namespace std; mutex m; deque<packaged_task<void()>> tasks; bool gui_shutdown_message_received(); void get_and_process_gui_message(); void gui_thread()//1 { while(!gui_shutdown_message_received())//2 { get_and_process_gui_message();//3 packaged_task<void()> task; { lock_guard<mutex> lk(m); if(tasks.empty())//4 continue; task = move(tasks.front());//5 tasks.pop_front(); } task();//6 } } thread gui_bg_thread(gui_thread); template<typename Func> future<void> post_task_for_gui_thread(Func f) { packaged_task<void()> task(f);//7 future<void> res = task.get_future();//8 lock_guard<mutex> lk(m); tasks.push_back(move(task));//9 return res;//10 } |
- 线程循环收消息直到收到GUI停止的消息
- 反复轮询处理GUI消息
- 判断任务队列中的消息,没有消息再次循环,否则从队列中提取任务
- 解除队列中的锁并允许任务
- 在队列上利用提供的函数创建一个新的任务包,通过调用get_future成员函数从任务中获取future
- 将任务置于列表之上
- 重复等待一次性事件
- 检查
future
是否准备好了结果,如果没有再去做其他事情,等其他事情处理完毕了,再重新检查 - 如果此时已经就绪了,就处理,如果还是没有继续,继续循环处理其他事情
- 检查
1 2 3 4 5 6 7 8 9 10 11 12 13 |
std::packaged_task<int(int, int)> task([](int a, int b) { return a + b; }); std::future<int> fut = task.get_future(); // Get future task(2, 3); // Do something... auto status = fut.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { int result = fut.get(); // 处理结果 } else { // 结果还没处理好,去做其他事情 } |
std::promise
std::promise
提供设置值的方式,它可以在这之后通过相关联的std::future<T>
对象进行读取。- 一对
std::promise
和std::future
为这一设施提供了一个可能的机制:等到中的线程可以阻塞future
,同时提供数据的线程可以使用配对中的promise
项,来设置相关的值并使future
就绪。 - 可以通过
get_future()
成员函数来获取一个给定的std::promise
相关的std::future
对象,就像是与std::packaged_task
相关。- 当promise的值已经设置完毕,对应期望的状态变为就绪,并且可用于检索已存储的值。
- 当在设置值之前销毁
std::promise
,将会存储一个异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
//使用promise在单个线程处理多个连接 #include <future> void process_connections(connection_set& connections) { while(!done(connection)) { for(connection_iterator connection = connections.begin(), end = connections.end(); ++connection) { if(connection->has_incoming_data()) { data_packet data = connection->incoming(); promise<payload_type>& p = connection->get_promise(data.id); p.set_value(data.payload); } if(connection->has_outgoing_data()) { outgoing_packet data = connection->top_of_outgoing_queue(); connect->send(data.payload); data.promise.set_value(true); } } } } |
promise
换句话讲,就是在创建线程的时候,把他作为参数std::move
进去,这样子线程可以设置数据或异常给主线程- 需要在
std::move
之前保存promise
对应的future
- 并且需要调用
future
的get
方法等待数据 - 不过调用了
get
方法后,主线程会被阻塞,直到子线程里面调用了promise
的set_value
或set_exception
- 需要在
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
void foo(std::promise<int> intPromise) { // Do some work... intPromise.set_value(42); } int main() { std::promise<int> prom; std::future<int> fut = prom.get_future(); std::thread t(foo, std::move(prom)); int value = fut.get(); std::cout << "Got the value from thread: " << value << std::endl; t.join(); return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
void foo(std::promise<int> intPromise) { try { // ... some code that might throw throw std::runtime_error("An error occurred!"); } catch (...) { intPromise.set_exception(std::current_exception()); } } int main() { std::promise<int> prom; std::future<int> fut = prom.get_future(); std::thread t(foo, std::move(prom)); try { int value = fut.get(); } catch (const std::runtime_error &e) { std::cout << "Caught exception: " << e.what() << std::endl; } t.join(); return 0; } |
为future
保存异常
- 传参负数,产生异常
1 2 3 4 5 6 |
double square_root(double x) { if (x < 0) { throw std::out_of_range("x < 0"); } return sqrt(x); } |
- 如果调用square_root函数的是别的线程
1 2 3 4 5 |
double y = square_root(-1); // 将上面的调用改为异步调用 std::future<double> f = std::async(square_root, -1); double res = f.get(); |
- 如上述,函数作为
std::async
的一部分时,当在调用抛出一个异常,这个异常就会存储到期望的结果数据中,之后期望的状态被置为就绪,之后调用get函数时就会抛出这个存储的异常。- 将函数打包入
std::packaged_task
任务包中后,这个任务被调用时,同样会发生,当打包函数抛出一个异常,这个异常会被存储到期望的结果中,准备在调用get时再次抛出
- 将函数打包入
- 使用异常填充promise
1 2 3 4 5 6 7 8 9 |
extern std::promise<double> some_promise; try { some_promise.set_value(calculate_value()); } catch(...) { some_promise.set_exception(std::current_exception()); } |
多个线程的等待
- 虽然
std::future
可以处理所有在线程间数据转移的必要同步,但是调用某一特殊std::future
对象的成员函数,就会让这个线程的数据和其他线程的数据不同步- 当多个线程没有额外同步的情况下,访问一个独立的
std::future
对象时,就会有数据竞争和未定义的行为,这是因为:std::future
模型独享同步结果的所有权,并且通过调用get函数,一次性的获取数据,这就让并发访问变得没有意义了(只有一个线程可以获取结果值,因为在第一次调用get之后,就没有值可以获取了)
- 当多个线程没有额外同步的情况下,访问一个独立的
std::shared_future
可以解决让多个线程等待同一个事件。- 在每一个
std::shared_future
的独立对象 上成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时,避免数据竞争,必须使用锁来对访问进行保护 - 优先使用的方法:
为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的std::shared_future
对象获取结果,那么多个线程访问共享不同结果就是安全的。
- 在每一个
1 2 3 4 5 6 |
std::promise<int> p; std::future<int> f(p.get_future()); assert(f.valid()); // 1 "期望" f 是合法的 std::shared_future<int> sf(std::move(f)); assert(!f.valid()); // 2 "期望" f 现在是不合法的 assert(sf.valid()); // 3 sf 现在是合法的 |
1 2 |
std::promise<std::string> p; std::shared_future<std::string> sf(p.get_future()); // 1 隐式转移所有权 |
1 2 3 |
std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p; auto sf=p.get_future().share(); |
限定等待时间
时延超时方式
- 指定需要一段时间,如30毫秒
绝对超时方式
- 指定一个时刻
有时间限制的等待
condition_variable
有两个重载的wait_for
成员函数和两个重载的wait_until
成员函数
时钟
- 当前时间
std::chrono::system_clock::now()
- 非稳定的,因为时钟可调,也就是这种是完全自动适应本地账户的调节
std::chrono::steady_clock
- 稳定的
std::chrono::hig_resolution_clock
- 最小节拍周期,因而最高的精度的时钟
时延
std::chrono::duration<>
函数模板能够对时延进行处理- 第一个参数是类型表示
- 第二个参数是所用秒数
1 2 3 4 5 6 7 8 |
// 几分钟,short寸 60秒1分钟 std::chrono::duration<short, std::ratio<60, 1>> // 毫秒,double寸 1秒1000毫秒 std::chrono::duration<double, std::ratio<1, 1000>> // 预定义 std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms); |
1 2 3 4 5 6 |
// 等待一个期望的状态变为就绪已经35毫秒 std::future<int> f = std::sync(some_task); if (f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready) { do_something_with(f.get()); } |
时间点
1 2 3 4 5 6 |
auto start=std::chrono::high_resolution_clock::now(); do_something(); auto stop=std::chrono::high_resolution_clock::now(); std::cout<<”do_something() took “ <<std::chrono::duration<double,std::chrono::seconds>(stop-start).count() <<” seconds”<<std::endl; |
实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
#include <condition_variable> #include <mutex> #include <chrono> std::condition_variable cv; bool done; std::mutex m; bool wait_loop() { auto const timeout = std::chrono::steady_clocl::now() + std::chrono::milliseconds(500); std::unique_lock<std::mutex> lk; while(!done) { if (cv.wait_until(lk, timeout) == std::cv_status::timeout) { break; } } return done; } |
可接受超时的函数
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ 深入理解C++11:C++11新特性解析与应用 一12/21
- ♥ C++20_第二篇03/21
- ♥ Bkwin一12/01
- ♥ 线程和协程10/31
- ♥ macOs 解析mach-o05/11
- ♥ 51CTO:Linux C++网络编程一08/13