线程池
创建线程池方法一
- 调用
async_factory_impl::create
创建线程池- 但是这个函数实际作用是用于创建一个记录器
- 由于需要一个线程池对象参数,所以在创建记录器之前做了个检查,如果还不存在线程池,就先创建线程池,再构造记录器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
template<async_overflow_policy OverflowPolicy = async_overflow_policy::block> struct async_factory_impl { template<typename Sink, typename... SinkArgs> static std::shared_ptr<async_logger> create(std::string logger_name, SinkArgs &&... args) { // create global thread pool if not already exists.. std::lock_guard<std::recursive_mutex> tp_lock(details::s_thread_pool_mutex); if (!details::s_thread_pool) { details::s_thread_pool = std::make_shared<details::thread_pool>(details::default_async_q_size, 1U); } auto sink = std::make_shared<Sink>(std::forward<SinkArgs>(args)...); auto new_logger = std::make_shared<async_logger>(std::move(logger_name), std::move(sink), std::move(details::s_thread_pool), OverflowPolicy); return new_logger; } }; |
- 这里使用递归锁的原因可能是为了使代码的防御性强一些
thread_pool
类的解析看后文
创建线程池方法二
- 调用
init_thread_pool
创建线程池
1 2 3 4 5 6 7 8 9 10 11 12 |
// set global thread pool. inline void init_thread_pool(size_t q_size, size_t thread_count, std::function<void()> on_thread_start) { auto tp = std::make_shared<details::thread_pool>(q_size, thread_count, on_thread_start); details::registry::instance().set_tp(std::move(tp)); } // set global thread pool. inline void init_thread_pool(size_t q_size, size_t thread_count) { init_thread_pool(q_size, thread_count, [] {}); } |
- 而
s_thread_pool
是一个静态的std::shared_ptr
对象
1 2 3 4 5 |
namespace details { static const size_t default_async_q_size = 8192; static std::shared_ptr<thread_pool> s_thread_pool; static std::recursive_mutex s_thread_pool_mutex; } // namespace details |
static
对象只会被初始化一次,并且:- 在
C++11
及之后的版本中,函数内的局部静态对象的初始化是线程安全的
- 在
- 可以看到用上面这个方法创建线程池后,会被转移到了
class SPDLOG_API registry
这个类的实例里面- 而这个类
class SPDLOG_API registry
是一个单例类 - 类
class SPDLOG_API registry
解析见下文
- 而这个类
1 2 3 4 5 |
SPDLOG_INLINE registry ®istry::instance() { static registry s_instance; return s_instance; } |
获取线程池方法三
- 如方法二中所述,用方法二创建的线程池会被保存到
class SPDLOG_API registry
这个类的静态实例里面- 所以可以通过这个类的接口来获取,这个类提供了这样的方法
1 2 3 4 5 |
SPDLOG_INLINE std::shared_ptr<thread_pool> registry::get_tp() { std::lock_guard<std::recursive_mutex> lock(tp_mutex_); return tp_; } |
thread_pool类
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
class SPDLOG_API thread_pool { public: using item_type = async_msg; using q_type = details::mpmc_blocking_queue<item_type>; thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start, std::function<void()> on_thread_stop); thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start); thread_pool(size_t q_max_items, size_t threads_n); // message all threads to terminate gracefully and join them ~thread_pool(); thread_pool(const thread_pool &) = delete; thread_pool &operator=(thread_pool &&) = delete; void post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy); void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy); size_t overrun_counter(); size_t queue_size(); private: q_type q_; std::vector<std::thread> threads_; void post_async_msg_(async_msg &&new_msg, async_overflow_policy overflow_policy); void worker_loop_(); // process next message in the queue // return true if this thread should still be active (while no terminate msg // was received) bool process_next_msg_(); }; |
q_
是一个队列,是在线程池构造的初始化列表里面,通过调用这个队列的构造函数来初始化- 根据上面的代码,可以看到,它一般被初始化为下面的大小
1 |
static const size_t default_async_q_size = 8192; |
- 在构造函数里,检查了要创建的线程的数量,为
0
或过大,就抛一个异常出去 - 然后创建所要求的数量的线程,并保存在了
vector
里面- 可以看到,是通过传入了一个
lambda
来隐式生成了一个线程对象
- 可以看到,是通过传入了一个
1 2 3 4 5 6 7 8 |
for (size_t i = 0; i < threads_n; i++) { threads_.emplace_back([this, on_thread_start, on_thread_stop] { on_thread_start(); this->thread_pool::worker_loop_(); on_thread_stop(); }); } |
process_next_msg_
这个函数相当于是线程池里线程的消息泵- 只有当这个函数返回
false
的时候,该线程结束 - 所以销毁线程的时候,也是利用了这一点,通过传入了类型为
terminate
的消息包,并调用notify_one
来通知一个线程来处理这个消息包 - 在
process_next_msg_
里面,发现消息包的类型为terminate
,于是返回了false
,从而结束了这个线程
- 只有当这个函数返回
1 2 3 4 |
void SPDLOG_INLINE thread_pool::worker_loop_() { while (process_next_msg_()) {} } |
- 消息泵函数里面,则通过队列的
dequeue_for
这个函数来获取一个消息包- 没有拿到消息包,返回
true
,从而再次进入消息泵 - 拿到了消息包,根据消息包的类型来处理
- 消息包里,不仅存了消息包的类型,还存了一个记录器的智能指针对象
- 对于
log
类型的消息,用存着的智能指针对象去记录 - 对于
flush
类型的消息,用存着的智能指针对象去调用fflush
- 对于
terminate
类型的消息,返回false
,让线程结束
- 没有拿到消息包,返回
数据队列
pop_cv_
在检查队列,只要队列没满,就能往里面添加数据,成功添加一个数据,便push_cv_
通知一下
1 2 3 4 5 6 7 8 9 |
void enqueue(T &&item) { { std::unique_lock<std::mutex> lock(queue_mutex_); pop_cv_.wait(lock, [this] { return !this->q_.full(); }); q_.push_back(std::move(item)); } push_cv_.notify_one(); } |
- 立即加入队列的版本
1 2 3 4 5 6 7 8 |
void enqueue_nowait(T &&item) { { std::unique_lock<std::mutex> lock(queue_mutex_); q_.push_back(std::move(item)); } push_cv_.notify_one(); } |
- 取数据,这个函数正是上面线程的消息泵取数据的地方
push_cv_
在等待数据,一段时间内,队列还是为空就返回- 不为空就取出来一个消息,而这个函数的第一个参数是出参,调这个函数的地方,会对这个消息包进行处理,上文有讲
- 同时,每取出一个消息包,就通知一下
pop_cv_
这个条件变量,而它不再阻塞,开始继续检查队列是不是已满
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) { { std::unique_lock<std::mutex> lock(queue_mutex_); if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) { return false; } popped_item = std::move(q_.front()); q_.pop_front(); } pop_cv_.notify_one(); return true; } |
- 查询队列数据
- 这里上锁是为了确保在多线程环境中数据的一致性和线程安全
1 2 3 4 5 6 7 8 9 |
size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); } size_t SPDLOG_INLINE thread_pool::queue_size() { return q_.size(); } |
1 2 3 4 5 6 7 8 9 10 11 |
size_t overrun_counter() { std::unique_lock<std::mutex> lock(queue_mutex_); return q_.overrun_counter(); } size_t size() { std::unique_lock<std::mutex> lock(queue_mutex_); return q_.size(); } |
其他接口
post_log
- 往线程池里投递消息
log
消息包
- 往线程池里投递消息
post_flush
- 往线程池里投递消息
flush
消息包
- 往线程池里投递消息
registry类
记录器
- 会在创建
registry
类的时候创建一个默认记录器,默认记录器没有名字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
SPDLOG_INLINE registry::registry() : formatter_(new pattern_formatter()) { #ifndef SPDLOG_DISABLE_DEFAULT_LOGGER // create default logger (ansicolor_stdout_sink_mt or wincolor_stdout_sink_mt in windows). # ifdef _WIN32 auto color_sink = std::make_shared<sinks::wincolor_stdout_sink_mt>(); # else auto color_sink = std::make_shared<sinks::ansicolor_stdout_sink_mt>(); # endif const char *default_logger_name = ""; default_logger_ = std::make_shared<spdlog::logger>(default_logger_name, std::move(color_sink)); loggers_[default_logger_name] = default_logger_; #endif // SPDLOG_DISABLE_DEFAULT_LOGGER } |
- 可以往
registry
里面注册记录器- 可以看到,注册记录器就是,
registry
用了一个map
把每一个记录器的智能指针对象保存了起来
- 可以看到,注册记录器就是,
1 |
std::unordered_map<std::string, std::shared_ptr<logger>> loggers_; |
1 2 3 4 5 |
SPDLOG_INLINE void registry::register_logger(std::shared_ptr<logger> new_logger) { std::lock_guard<std::mutex> lock(logger_map_mutex_); register_logger_(std::move(new_logger)); } |
1 2 3 4 5 6 |
SPDLOG_INLINE void registry::register_logger_(std::shared_ptr<logger> new_logger) { auto logger_name = new_logger->name(); throw_if_exists_(logger_name); loggers_[logger_name] = std::move(new_logger); } |
- 初始化记录器,对一个新的记录器进行初始化,并注册
1 2 3 |
SPDLOG_INLINE void registry::initialize_logger(std::shared_ptr<logger> new_logger) { // ... } |
- 从
registry
里面用记录器名字获取记录器- 它这些查找都是加锁的
1 2 3 4 5 6 |
SPDLOG_INLINE std::shared_ptr<logger> registry::get(const std::string &logger_name) { std::lock_guard<std::mutex> lock(logger_map_mutex_); auto found = loggers_.find(logger_name); return found == loggers_.end() ? nullptr : found->second; } |
默认记录器
- 它不仅被存到了
registry
里面,没有使用std::move
- 而且还用单独的
default_logger_
来存储默认记录器 - 当然,也有更新默认记录器的方法
1 2 3 4 5 6 7 8 9 10 |
SPDLOG_INLINE std::shared_ptr<logger> registry::default_logger() { std::lock_guard<std::mutex> lock(logger_map_mutex_); return default_logger_; } SPDLOG_INLINE logger *registry::get_default_raw() { return default_logger_.get(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
SPDLOG_INLINE void registry::set_default_logger(std::shared_ptr<logger> new_default_logger) { std::lock_guard<std::mutex> lock(logger_map_mutex_); // remove previous default logger from the map if (default_logger_ != nullptr) { loggers_.erase(default_logger_->name()); } if (new_default_logger != nullptr) { loggers_[new_default_logger->name()] = new_default_logger; } default_logger_ = std::move(new_default_logger); } |
线程池
- 获取
1 2 3 4 5 |
SPDLOG_INLINE std::shared_ptr<thread_pool> registry::get_tp() { std::lock_guard<std::recursive_mutex> lock(tp_mutex_); return tp_; } |
记录器->接收器->格式器
- 设置
- 可以看到,调用这个接口设置一个格式器后,不仅被保存了,而且还被用于更新每一个记录器的格式器
1 2 3 4 5 6 7 8 9 |
SPDLOG_INLINE void registry::set_formatter(std::unique_ptr<formatter> formatter) { std::lock_guard<std::mutex> lock(logger_map_mutex_); formatter_ = std::move(formatter); for (auto &l : loggers_) { l.second->set_formatter(formatter_->clone()); } } |
- 格式器是接收器的组件
- 具象化的接收器,每一个都实现了基类接收器设置格式器的抽象方法
1 2 3 4 5 |
SPDLOG_INLINE void stdout_sink_base<ConsoleMutex>::set_formatter(std::unique_ptr<spdlog::formatter> sink_formatter) { std::lock_guard<mutex_t> lock(mutex_); formatter_ = std::move(sink_formatter); } |
记录器->回溯器
- 每个记录器还有一个回溯器
- 可以看到,回溯器有个环形缓冲区
1 |
details::backtracer tracer_; |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
class SPDLOG_API backtracer { mutable std::mutex mutex_; std::atomic<bool> enabled_{false}; circular_q<log_msg_buffer> messages_; public: backtracer() = default; backtracer(const backtracer &other); backtracer(backtracer &&other) SPDLOG_NOEXCEPT; backtracer &operator=(backtracer other); void enable(size_t size); void disable(); bool enabled() const; void push_back(const log_msg &msg); // pop all items in the q and apply the given fun on each of them. void foreach_pop(std::function<void(const details::log_msg &)> fun); }; |
- 打印消息的时候,会根据记录器的回溯器的标志,来判断是否把消息添加到回溯器中
1 |
bool traceback_enabled = tracer_.enabled(); |
- 打印回溯信息
- 记录器给出了接口,通过回溯器的
foreach_pop
接口获取回溯器里面的每一个消息包,并打印信息
- 记录器给出了接口,通过回溯器的
1 2 3 4 5 6 7 8 9 10 |
SPDLOG_INLINE void logger::dump_backtrace_() { using details::log_msg; if (tracer_.enabled()) { sink_it_(log_msg{name(), level::info, "****************** Backtrace Start ******************"}); tracer_.foreach_pop([this](const log_msg &msg) { this->sink_it_(msg); }); sink_it_(log_msg{name(), level::info, "****************** Backtrace End ********************"}); } } |
类图
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ Effective C++_第一篇01/10
- ♥ 51CTO:Linux C++网络编程四08/19
- ♥ C++20_第二篇03/21
- ♥ STL_heap06/15
- ♥ C++标准模板库编程实战_智能指针11/30
- ♥ C++编程规范101规则、准则与最佳实践 一01/05