thread
相关
创建线程
- 使用
std::thread
构造函数创建线程,需传递一个可调用对象(函数、Lambda
、函数对象等)
1 2 3 4 5 6 7 8 9 10 11 12 |
#include <iostream> #include <thread> void my_function() { std::cout << "Thread is running..." << std::endl; } int main() { std::thread t(my_function); // 创建并启动线程 // ... 其他操作 return 0; } |
可调用对象
- 普通函数(函数指针)
1 2 3 4 5 6 7 8 9 10 11 |
#include <thread> void myFunction(int param) { // 线程逻辑 } int main() { std::thread t(myFunction, 42); // 传递函数指针和参数 t.join(); return 0; } |
- 函数对象(仿函数,
Functor
)- 定义一个重载了
operator()
的类:
- 定义一个重载了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#include <thread> class MyFunctor { public: void operator()(int param) { // 线程逻辑 } }; int main() { MyFunctor functor; std::thread t(functor, 42); // 传递函数对象和参数 t.join(); return 0; } |
lambda
表达式- 直接在线程构造函数中定义行为:
1 2 3 4 5 6 7 8 9 10 |
#include <thread> int main() { int localVar = 42; std::thread t([&localVar]() { // 线程逻辑(可捕获局部变量) }); t.join(); return 0; } |
- 成员函数(成员函数指针)
- 需要绑定对象实例和成员函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#include <thread> class MyClass { public: void myMethod(int param) { // 线程逻辑 } }; int main() { MyClass obj; std::thread t(&MyClass::myMethod, &obj, 42); // 成员函数指针 + 对象指针 + 参数 t.join(); return 0; } |
std::function
包装的可调用对象
1 2 3 4 5 6 7 8 9 10 11 |
#include <thread> #include <functional> void myFunction(int param) { /* ... */ } int main() { std::function<void(int)> func = myFunction; std::thread t(func, 42); // 通过 std::function 传递 t.join(); return 0; } |
std::bind
生成的绑定对象- 适配参数或绑定对象实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
#include <thread> #include <functional> void myFunction(int a, double b) { /* ... */ } class MyClass { public: void myMethod(int param) { /* ... */ } }; int main() { // 绑定普通函数的参数 auto boundFunc = std::bind(myFunction, 42, 3.14); std::thread t1(boundFunc); t1.join(); // 绑定成员函数和对象实例 MyClass obj; auto boundMethod = std::bind(&MyClass::myMethod, &obj, 100); std::thread t2(boundMethod); t2.join(); return 0; } |
join
- 阻塞当前线程(通常是主线程),直到被调用的线程执行完毕
- 确保线程执行完成后才继续后续操作
1 2 3 4 5 6 |
int main() { std::thread t(my_function); t.join(); // 主线程在此等待 t 完成 std::cout << "Main thread continues." << std::endl; return 0; } |
- 为什么必须调用
join()
或detach()
?- 在
std::thread
对象销毁前,必须调用join()
或detach()
,否则程序会调用std::terminate
终止 - 设计意图:强制开发者明确管理线程的生命周期,避免悬空线程或资源泄漏
- 在
1 2 3 |
std::thread t(worker); // 没有调用 t.join() 或 t.detach() // 程序会崩溃! |
join()
的阻塞行为- 调用
join()
的线程会阻塞,直到目标线程结束 - 适用场景:需要等待子线程完成后再继续主线程逻辑(例如计算结果依赖子线程的输出)
- 调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
void worker(int id) { std::cout << "线程 " << id << " 开始工作..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::thread t1(worker, 1); std::thread t2(worker, 2); t1.join(); // 主线程等待 t1 t2.join(); // 主线程等待 t2 std::cout << "所有线程完成!" << std::endl; return 0; } |
- 注意:确保被
join()
的线程对象仍然有效
1 2 3 4 5 6 7 |
std::thread t(worker); t.join(); // 正确 // t 对象仍有效 std::thread* pt = new std::thread(worker); pt->join(); // 正确 delete pt; // 需手动释放内存 |
- 注意:一个线程对象只能被
join()
一次- 调用后线程对象不再关联任何线程
1 2 3 |
std::thread t(worker); t.join(); // 正确 t.join(); // ❌ 错误!线程已结束,t 不再关联任何线程 |
- 注意:如果线程函数抛出异常,需确保
join()
仍会被调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
void worker() { throw std::runtime_error("子线程出错!"); } int main() { std::thread t(worker); try { // 主线程可能在此抛出异常 t.join(); } catch (const std::exception& e) { std::cerr << "捕获异常: " << e.what() << std::endl; t.join(); // 仍需要清理资源 } return 0; } |
detach
- 将线程与
std::thread
对象分离,使其在后台独立运行- 调用
detach()
后,主线程不再管理子线程的生命周期,子线程独立运行 - 主线程继续执行,不会等待子线程完成
- 子线程结束后,其资源由
C++
运行时库自动回收
- 调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#include <iostream> #include <thread> void worker() { std::cout << "子线程开始工作..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "子线程结束工作!" << std::endl; } int main() { std::thread t(worker); t.detach(); // 主线程与子线程分离 std::cout << "主线程继续执行..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(3)); // 等待足够时间让子线程完成 return 0; } //输出(可能): //主线程继续执行... //子线程开始工作... //子线程结束工作! |
- 为什么必须调用
join()
或detach()
?- 在
std::thread
对象销毁前,必须调用join()
或detach()
,否则程序会调用std::terminate
终止 - 设计意图:强制开发者明确管理线程的生命周期,避免悬空线程或资源泄漏
- 在
detach
使用场景- 后台任务:例如日志记录、心跳检测、网络监听等不需要同步的任务
- 不关心结果:当子线程的执行结果不影响主线程逻辑时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#include <fstream> #include <thread> void logToFile(const std::string& message) { std::ofstream file("log.txt", std::ios::app); if (file.is_open()) { file << message << std::endl; } } int main() { std::thread logger([]() { while (true) { logToFile("心跳检测..."); std::this_thread::sleep_for(std::chrono::seconds(1)); } }); logger.detach(); // 分离日志线程 // 主线程继续其他任务 std::this_thread::sleep_for(std::chrono::seconds(5)); return 0; } |
- 注意:分离后,子线程可能访问已销毁的对象,需确保数据有效性:
1 2 3 4 5 6 7 8 9 10 11 12 |
void worker(const std::string& message) { std::cout << message << std::endl; } int main() { std::string msg = "Hello"; std::thread t(worker, msg); // msg 按值传递,避免悬空引用 t.detach(); // msg 销毁后,子线程仍安全 return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// 错误的 void worker(const std::string& message) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << message << std::endl; // msg 可能已销毁! } int main() { std::thread t; { std::string msg = "Hello"; t = std::thread(worker, std::ref(msg)); // 按引用传递 } // msg 在此销毁 t.detach(); // 子线程访问已销毁的 msg std::this_thread::sleep_for(std::chrono::seconds(2)); return 0; } |
- 注意:一个线程对象只能调用一次
detach()
:
1 2 3 |
std::thread t(worker); t.detach(); // 正确 t.detach(); // ❌ 错误!线程已分离,t 不再关联任何线程 |
- 注意:分离后无法通过
std::thread
对象获取线程状态或结果:
1 2 3 4 5 6 |
std::thread t(worker); t.detach(); if (t.joinable()) { t.join(); // ❌ 错误!线程已分离 } |
joinable
- 作用:
- 检查线程状态:判断线程对象是否关联一个活跃的线程(即线程已启动但未调用
join()
或detach()
) - 安全操作:避免对无效线程调用
join()
或detach()
,防止程序崩溃
- 检查线程状态:判断线程对象是否关联一个活跃的线程(即线程已启动但未调用
- 函数原型:
- 返回
true
:线程对象关联一个活跃线程,可以调用join()
或detach()
- 返回
false
:线程对象未关联线程,或线程已被join()
/detach()
- 返回
1 |
bool joinable() const noexcept; |
- 何时返回
true
(满足以下条件时,joinable()
返回true
:)- 线程已通过构造函数启动(例如
std::thread t(func)
) - 尚未对该线程调用
join()
或detach()
- 线程对象未被移动(例如
std::thread t2 = std::move(t1)
,移动后t1
变为未关联)
- 线程已通过构造函数启动(例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
#include <thread> void worker() {} int main() { std::thread t1; // 默认构造,未关联线程 std::cout << t1.joinable(); // false std::thread t2(worker); // 关联活跃线程 std::cout << t2.joinable(); // true t2.join(); // 调用 join() std::cout << t2.joinable(); // false std::thread t3(worker); t3.detach(); // 调用 detach() std::cout << t3.joinable(); // false return 0; } |
- 注意:
- 调用
join()
或detach()
前,建议检查线程是否可操作
- 调用
1 2 3 |
if (t.joinable()) { t.join(); // 或 t.detach() } |
join
和detach
对比
- 概述
特性 | join() |
detach() |
线程所有权 | 主线程等待子线程完成 | 子线程独立运行,与主线程分离 |
资源释放 | 自动释放资源 | 系统自动释放资源(但需自行管理生命周期) |
典型场景 | 需要等待结果 | 后台任务(如日志、网络心跳) |
- 最佳实践
- 优先使用
join()
在需要同步或处理子线程结果时,优先使用join()
,避免数据竞争和生命周期问题 - 谨慎使用
detach()
仅在以下情况下使用detach()
:
1 子线程完全独立,无需与主线程交互
2 确保子线程不会访问无效数据(如局部变量) - 使用
RAII
管理线程
通过封装类或std::jthread
(C++20
)自动管理线程生命周期
- 优先使用
线程ID
- 通过
std::this_thread::get_id()
获取当前执行线程的 ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#include <iostream> #include <thread> void worker() { std::thread::id this_id = std::this_thread::get_id(); std::cout << "子线程 ID: " << this_id << std::endl; } int main() { std::thread::id main_id = std::this_thread::get_id(); std::cout << "主线程 ID: " << main_id << std::endl; std::thread t(worker); t.join(); return 0; } |
- 线程对象的
ID
- 使用
t.get_id()
获取线程唯一标识
- 使用
1 2 3 |
std::thread t(worker); std::thread::id t_id = t.get_id(); std::cout << "线程对象 t 的 ID: " << t_id << std::endl; |
- 线程
ID
的平台相关性std::thread::id
是C++
标准库的逻辑ID
,与操作系统线程ID
(如Linux
的pthread_t
或Windows
的DWORD
)无关- 若需获取底层线程
ID
,可通过std::thread::native_handle()
,但会牺牲可移植性
1 2 |
std::thread t(worker); pthread_t native_id = t.native_handle(); // Linux 示例 |
- 注意:
- 在
C++
标准库中,通过std::this_thread::get_id()
或std::thread::get_id()
获取的线程ID
(类型为std::thread::id
)并不是操作系统底层的实际线程ID
- 返回平台相关的线程句柄,可进一步转换为原生线程
ID
- 在
1 2 |
std::thread t(worker); pthread_t native_handle = t.native_handle(); // Linux 示例 |
线程移动语意
std::thread
不可复制,但可通过移动转移所有权:
1 2 |
std::thread t1(my_function); std::thread t2 = std::move(t1); // t1 不再拥有线程 |
竞态条件
- 多个线程访问共享数据时需使用同步机制(如
std::mutex
)
1 2 3 4 5 6 7 |
std::mutex mtx; int shared_data = 0; void increment() { std::lock_guard<std::mutex> lock(mtx); shared_data++; } |
- 死锁
- 避免在多个锁之间形成循环等待
- 按固定顺序加锁,或使用
std::lock
同时加锁:
1 2 3 4 5 6 7 8 |
std::mutex mtx1, mtx2; void safe_operation() { std::lock(mtx1, mtx2); // 同时锁定两个互斥量 std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock); std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock); // 操作共享数据 } |
传递参数
- 默认行为
- 参数按值拷贝到线程的独立内存空间,但需注意参数的生存期
- 向线程传递参数时,参数会被拷贝或移动,确保生命周期
1 2 3 4 5 6 |
void print(int x) { std::cout << x; } int main() { int value = 42; std::thread t(print, value); // 值传递,避免悬空引用 t.join(); } |
1 2 3 4 5 6 7 8 |
// 错误 void print(const std::string& s) { /* ... */ } int main() { std::thread t(print, "hello"); // 传递临时字符串的引用,可能导致悬空 t.join(); } |
1 2 3 |
// 正确 std::thread t(print, std::string("hello")); // 显式构造临时对象 |
- 传递引用
- 若需传递引用,使用
std::ref
(但需确保引用的对象生命周期足够长):
- 若需传递引用,使用
1 2 3 4 5 6 7 8 |
void modify(int& x) { x = 42; } int main() { int value = 0; std::thread t(modify, std::ref(value)); // 正确传递引用 t.join(); std::cout << value; // 输出 42 } |
异常安全
- 若线程函数抛出异常且未被捕获,程序会调用
std::terminate()
- 正确做法:在线程函数内部用
try/catch
捕获所有异常
- 正确做法:在线程函数内部用
1 2 3 4 5 6 7 |
void thread_func() { try { // 可能抛出异常的代码 } catch (...) { // 处理异常 } } |
- 如果主线程可能抛出异常,需确保异常前已处理子线程:
1 2 3 4 5 6 7 8 9 10 |
int main() { std::thread t(my_function); try { // 可能抛出异常的代码 } catch (...) { t.join(); // 或 t.detach() throw; } t.join(); } |
线程池
- 线程的创建和销毁成本较高,频繁创建线程可能导致性能下降
- 推荐使用线程池(如第三方库或
C++23
的std::execution
)
- 推荐使用线程池(如第三方库或
- 线程数量不应超过硬件支持的并发线程数(通过
std::thread::hardware_concurrency()
获取)
jthread
相关
- 通过包装类(如
C++20
的std::jthread
)自动管理线程生命周期,避免遗漏- 析构时自动调用
join()
,支持协作式取消(通过request_stop()
)
- 析构时自动调用
1 2 3 4 |
// C++20 示例:std::jthread 自动 join { std::jthread t(my_function); // 析构时自动调用 join() } |
1 2 3 4 5 6 |
std::jthread t([](std::stop_token st) { while (!st.stop_requested()) { // 执行任务 } }); t.request_stop(); // 请求停止 |
std::stop_token
- 用于优雅地终止线程
多线程同步相关
mutex
1 2 3 4 5 6 7 8 |
#include <mutex> std::mutex mtx; void safeIncrement(int& counter) { mtx.lock(); counter++; // 临界区操作 mtx.unlock(); // 必须手动解锁 } |
mutex
-自动锁管理(RAII
)
std::lock_guard
- 自动在作用域内加锁/解锁,不可手动控制
1 2 3 4 |
void safeIncrement(int& counter) { std::lock_guard<std::mutex> lock(mtx); counter++; // 自动加锁/解锁 } |
std::unique_lock
- 更灵活的锁(可延迟加锁、手动控制)
1 2 3 4 5 6 |
void safeIncrement(int& counter) { std::unique_lock<std::mutex> lock(mtx, std::defer_lock); lock.lock(); // 手动加锁 counter++; lock.unlock(); // 可提前解锁 } |
- 其他
mutex
类型
类型 | 特性 |
std::recursive_mutex |
允许同一线程多次加锁(解决递归函数中的锁重入问题) |
std::timed_mutex |
支持超时加锁(try_lock_for , try_lock_until ) |
std::shared_mutex (C++17) |
读写锁:写独占,读共享(lock_shared() 为读锁,lock() 为写锁) |
condition_variable
条件变量
- 用于线程间通知,避免忙等待(
Busy Waiting
) - 基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
#include <condition_variable> std::mutex mtx; std::condition_variable cv; bool data_ready = false; // 等待线程 void consumer() { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, [] { return data_ready; }); // 等待条件满足 // 处理数据... } // 通知线程 void producer() { { std::lock_guard<std::mutex> lock(mtx); data_ready = true; } cv.notify_one(); // 通知一个等待线程 } |
- 注意事项
- 虚假唤醒(
Spurious Wakeup
):wait
需在条件检查循环中使用: - 批量通知:
cv.notify_all()
唤醒所有等待线程
- 虚假唤醒(
1 2 |
cv.wait(lock, [] { return data_ready; }); // 等价于: while (!data_ready) cv.wait(lock); |
原子操作
- 无需锁的轻量级线程安全操作,适用于简单数据类型
std::atomic
类型
1 2 3 4 5 6 |
#include <atomic> std::atomic<int> counter(0); void increment() { counter++; // 原子操作 } |
- 内存顺序
- 指定原子操作的内存同步语义(如
memory_order_relaxed
,memory_order_seq_cst
):
- 指定原子操作的内存同步语义(如
1 2 3 4 5 6 7 8 |
std::atomic<bool> flag(false); void worker() { flag.store(true, std::memory_order_release); } void reader() { while (!flag.load(std::memory_order_acquire)); } |
死锁
-
不死锁条件
- 互斥资源竞争
- 持有并等待
- 不可剥夺
- 循环等待
-
死锁避免方法:
- 固定加锁顺序:所有线程按相同顺序获取锁
- 使用
std::lock
批量加锁:
1 2 3 4 5 6 7 |
std::mutex mtx1, mtx2; void safeOperation() { std::lock(mtx1, mtx2); // 同时加锁,避免死锁 std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock); std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock); // 操作共享资源 } |
线程安全的数据结构设计
- 可以通过封装共享数据
- 将共享数据与互斥量封装在同一个类中
- 通过接口控制访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#include <queue> #include <mutex> #include <condition_variable> template <typename T> class ThreadSafeQueue { public: void push(const T& value) { std::lock_guard<std::mutex> lock(mtx); data_queue.push(value); cv.notify_one(); } bool try_pop(T& value) { std::lock_guard<std::mutex> lock(mtx); if (data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, [this] { return !data_queue.empty(); }); value = data_queue.front(); data_queue.pop(); } private: std::queue<T> data_queue; std::mutex mtx; std::condition_variable cv; }; |
注意事项
- 减少锁的粒度
- 使用细粒度锁(如每个资源一个锁)
- 避免在临界区执行耗时操作(如
I/O
)
- 无锁编程(
Lock-Free
)- 适用场景:高并发简单操作(如计数器)
- 使用原子操作或
std::atomic
实现
- 线程局部存储(
TLS
)- 使用
thread_local
关键字避免共享:
- 使用
1 |
thread_local int local_counter = 0; // 每个线程独立副本 |
多线程同步其他
std::lock_guard
- 特性
- 仅提供作用域内的自动加锁/解锁
- 构造时立即加锁,析构时自动解锁
- 无法转移所有权
- 场景
- 简单的临界区保护(无需延迟加锁或提前解锁)
- 需要轻量级锁管理的场景
- 优点与限制
优点 | 限制 |
代码简洁,无手动管理风险 | 无法手动控制加锁/解锁时机 |
性能开销极小 | 不支持条件变量 |
无法与其他锁配合批量加锁 |
- 示例
1 2 3 4 5 6 7 8 9 |
#include <mutex> #include <iostream> std::mutex mtx; void safePrint(const std::string& msg) { std::lock_guard<std::mutex> lock(mtx); // 构造时加锁 std::cout << msg << std::endl; } // 析构时自动解锁 |
std::unique_lock
- 特性
- 支持手动加锁/解锁、延迟加锁、超时加锁
- 通过移动语义转移锁的所有权
- 可与
std::condition_variable
配合使用
- 场景
- 需要延迟加锁或提前解锁
- 配合条件变量实现线程间通信
- 批量加锁多个互斥量(避免死锁)
- 构造函数模式
模式 | 说明 |
std::defer_lock |
延迟加锁(需手动调用 lock() ) |
std::try_to_lock |
尝试加锁(非阻塞) |
std::adopt_lock |
假定已持有锁(直接接管锁的所有权) |
- 优点与限制
优点 | 限制 |
支持灵活加锁/解锁 | 性能略高于 std::lock_guard |
可配合条件变量使用 | 代码复杂度稍高 |
支持批量加锁(std::lock ) |
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 |
#include <mutex> #include <iostream> std::mutex mtx; void flexiblePrint(const std::string& msg) { std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁 lock.lock(); // 手动加锁 std::cout << msg << std::endl; lock.unlock(); // 手动提前解锁 } // 若未解锁,析构时自动解锁 |
- 关键对比
特性 | std::lock_guard |
std::unique_lock |
加锁时机 | 构造时立即加锁 | 可延迟加锁(通过 std::defer_lock ) |
手动解锁 | 不支持 | 支持 (unlock() ) |
条件变量支持 | 不支持 | 支持 |
性能开销 | 极低(接近原生互斥量) | 稍高(因需维护额外状态) |
移动语义 | 不可移动 | 支持所有权转移 |
适用场景 | 简单临界区保护 | 复杂锁管理或条件变量 |
- 场景一:条件变量(
std::unique_lock
必需
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
std::mutex mtx; std::condition_variable cv; bool data_ready = false; void consumer() { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, [] { return data_ready; }); // 自动解锁并等待通知 // 处理数据... } void producer() { { std::lock_guard<std::mutex> lock(mtx); data_ready = true; } cv.notify_one(); } |
- 场景二:批量加锁避免死锁
1 2 3 4 5 6 7 8 |
std::mutex mtx1, mtx2; void safeOperation() { std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock); std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock); std::lock(lock1, lock2); // 原子化批量加锁 // 操作共享资源 } |
- 场景三:延迟加锁优化性能
1 2 3 4 5 6 7 8 |
void processData(const Data& data) { std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 非临界区操作(无需锁) if (needsLocking(data)) { lock.lock(); // 临界区操作 } } |
- 使用建议
- 在简单场景下优先使用
std::lock_guard
- 在需要灵活性时,接受
std::unique_lock
的微小性能损失
- 在简单场景下优先使用
线程池相关
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
#include <iostream> #include <vector> #include <thread> #include <queue> #include <mutex> #include <condition_variable> #include <functional> #include <future> #include <any> // 线程池管理器类 class ThreadPoolManager { public: explicit ThreadPoolManager() { // 获取硬件支持的并发线程数,取一半(至少1个) size_t thread_count = std::max(1u, std::thread::hardware_concurrency() / 2); pool_ = std::make_unique<ThreadPool>(thread_count); } // 添加任务到队列 template<typename F, typename... Args> auto push_task(F&& f, Args&&... args) -> std::future<decltype(f(args...))> { return pool_->enqueue(std::forward<F>(f), std::forward<Args>(args)...); } private: // 线程池内部实现 class ThreadPool { public: explicit ThreadPool(size_t thread_count) : stop(false) { for (size_t i = 0; i < thread_count; ++i) { workers.emplace_back([this] { worker_loop(); }); } } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { if (worker.joinable()) { worker.join(); } } } template<typename F, typename... Args> auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> { using return_type = decltype(f(args...)); auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) { throw std::runtime_error("线程池已停止,无法添加任务"); } tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; void worker_loop() { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); } } }; std::unique_ptr<ThreadPool> pool_; }; // 示例任务函数 void sample_task(int id) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::cout << "任务 " << id << " 由线程 " << std::this_thread::get_id() << " 执行" << std::endl; } int main() { ThreadPoolManager manager; // 添加10个任务 std::vector<std::future<void>> futures; for (int i = 0; i < 10; ++i) { futures.emplace_back( manager.push_task(sample_task, i) ); } // 等待所有任务完成 for (auto& future : futures) { future.wait(); } std::cout << "所有任务已完成!" << std::endl; return 0; } |
一次性事件
async
- 作用
- 简化异步任务:自动启动线程或任务,返回
std::future
- 简化异步任务:自动启动线程或任务,返回
- 启动策略
std::launch::async
:强制新线程执行std::launch::deferred
:延迟到get()
/wait()
时在当前线程执行
1 2 3 4 5 |
auto fut = std::async(std::launch::async, []{ std::this_thread::sleep_for(std::chrono::seconds(1)); return 3.14; }); double pi = fut.get(); // 等待结果 |
- 场景
- 简单的异步计算(如文件加载、数学运算)
- 需要快速获取结果的轻量级任务
- 不加参数
std::launch::async
,是不是不是新线程?- 默认情况下,
std::async
的启动策略是 组合模式
- 默认情况下,
1 2 3 |
auto fut = std::async(func); // 等价于: auto fut = std::async(std::launch::async | std::launch::deferred, func); |
- 可能的情况一:
- 启动新线程(类似
std::launch::async
) - 大多数现代标准库(如
MSVC
、GCC
、Clang
)默认会选择async
,立即创建新线程执行任务
- 启动新线程(类似
1 2 3 4 5 |
auto fut = std::async([]{ std::cout << "线程ID: " << std::this_thread::get_id() << std::endl; }); fut.wait(); // 输出可能不同于主线程ID(如:140737307965184) |
- 可能的情况二:
- 延迟执行(类似
std::launch::deferred
) - 少数实现可能在某些情况下选择
deferred
(例如资源紧张时)
- 延迟执行(类似
1 2 3 4 |
auto fut = std::async([]{ std::cout << "线程ID: " << std::this_thread::get_id() << std::endl; }); // 不调用 fut.get() 或 fut.wait(),任务不会执行! |
- 若需确保任务始终异步执行,应显式指定
std::launch::async
:
1 |
auto fut = std::async(std::launch::async, func); // 强制新线程执行 |
future
- 作用
- 占位符:表示一个未来可能获取的值(或异常)
- 阻塞等待:通过
get()
方法阻塞当前线程,直到结果就绪 - 一次性读取:只能调用一次
get()
1 2 3 4 |
#include <future> std::future<int> fut = std::async([]{ return 42; }); int result = fut.get(); // 阻塞直到结果就绪 |
wait
- 阻塞当前线程,直到关联的异步任务完成(无论任务是否返回结果)
- 仅等待任务完成,不返回任何值或异常
- 可多次调用
第一次调用的时候,如果任务已经完成,不会阻塞。如果任务还没完成,会阻塞。
第二次调用的时候,如果任务已经完成,不会阻塞。如果任务还没完成,继续阻塞。 - 场景
需要等待异步任务完成后再继续后续逻辑,但不需要任务的结果
检查任务是否已完成(结合wait_for()
或wait_until()
)
get
- 阻塞当前线程,直到关联的异步任务完成,并返回任务的结果(或抛出异常)
- 调用后,
std::future
对象变为无效(valid() == false
) - 不可重复调用
- 场景
需要获取异步任务的返回值或处理任务中抛出的异常
确保任务完成后才使用其结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#include <future> #include <iostream> int main() { auto fut = std::async([] { std::this_thread::sleep_for(std::chrono::seconds(2)); return 42; }); std::cout << "等待任务完成..." << std::endl; fut.wait(); // 阻塞直到任务完成 std::cout << "任务已完成!" << std::endl; return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#include <future> #include <iostream> int main() { auto fut = std::async([] { std::this_thread::sleep_for(std::chrono::seconds(2)); return 42; }); std::cout << "等待并获取结果..." << std::endl; int result = fut.get(); // 阻塞并获取结果 std::cout << "结果:" << result << std::endl; return 0; } |
1 2 3 4 5 6 7 8 9 10 11 |
// 使用 wait_for() 或 wait_until() 实现非阻塞检查: auto fut = std::async(/* 任务 */); // 非阻塞检查任务状态 auto status = fut.wait_for(std::chrono::seconds(0)); if (status == std::future_status::ready) { std::cout << "任务已完成" << std::endl; } else { std::cout << "任务未完成" << std::endl; } |
- 组合使用场景:先
wait()
再get()
1 2 3 4 5 6 7 8 9 |
auto fut = std::async(/* 任务 */); // 等待期间执行其他逻辑 doSomethingElse(); fut.wait(); // 确保任务完成 if (someCondition) { int result = fut.get(); // 安全获取结果 } |
- 组合使用场景:避免多次调用
get()
get()
只能调用一次,多次调用会导致未定义行为:
1 2 3 |
auto fut = std::async([]{ return 42; }); int r1 = fut.get(); // 正确 int r2 = fut.get(); // ❌ 错误!future 已无效 |
std::shared_future
- 若需要多次获取结果,可将
std::future
转换为std::shared_future
:
- 若需要多次获取结果,可将
1 2 3 4 5 |
auto fut = std::async([]{ return 42; }); std::shared_future<int> shared_fut = fut.share(); int r1 = shared_fut.get(); // 正确 int r2 = shared_fut.get(); // 正确(允许多次调用) |
package_task
- 作用
- 包装任务:将可调用对象(函数、
Lambda
)与std::future
绑定 - 灵活调度:可将任务传递给线程池或其他线程执行
- 包装任务:将可调用对象(函数、
- 场景
- 需要手动控制任务调度(如线程池)
- 需要将任务存储到队列中延迟执行
1 2 3 4 5 6 7 8 9 10 11 |
#include <future> #include <thread> std::packaged_task<int()> task([]{ return 7 * 6; }); std::future<int> fut = task.get_future(); // 将任务传递给线程执行 std::thread t(std::move(task)); t.detach(); int result = fut.get(); // 42 |
promise
- 作用
- 显式传值:通过
set_value()
或set_exception()
手动设置结果 - 与
future
关联:通过get_future()
获取关联的std::future
- 显式传值:通过
- 场景
- 需要在线程间手动传递结果(如回调函数)
- 需要将异步结果与复杂逻辑解耦
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#include <future> #include <thread> void worker(std::promise<int> prom) { std::this_thread::sleep_for(std::chrono::seconds(1)); prom.set_value(42); // 手动设置结果 } int main() { std::promise<int> prom; std::future<int> fut = prom.get_future(); std::thread t(worker, std::move(prom)); t.detach(); int result = fut.get(); // 42 } |
协作示例
- 使用线程池执行任务,并通过
std::packaged_task
和std::future
获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
#include <future> #include <queue> #include <thread> #include <mutex> #include <condition_variable> class ThreadPool { public: ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) return; task = std::move(tasks.front()); tasks.pop(); } task(); } }); } } template<typename F> std::future<typename std::result_of<F()>::type> enqueue(F&& f) { using return_type = typename std::result_of<F()>::type; auto task = std::make_shared<std::packaged_task<return_type()>>( std::forward<F>(f) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) throw std::runtime_error("线程池已停止"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; int main() { ThreadPool pool(4); auto future = pool.enqueue([] { std::this_thread::sleep_for(std::chrono::seconds(1)); return 42; }); std::cout << future.get() << std::endl; // 输出 42 return 0; } |
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ Soui八06/20
- ♥ SOUI源码:log4z06/24
- ♥ breakpad记述:Windows07/27
- ♥ 51CTO:C++网络通信引擎架构与实现一09/09
- ♥ C++_函数模板、类模板、特化、模板元编程、SFINAE、概念06/22
- ♥ COM组件_101/31