线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的运用场景:
设计一个线程池:
代码实现:
Makefile文件:
thraed_pool:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -rf thraed_pool
thread_pool.hpp文件:
#pragma once
#include
#include
#include
#include
// 基于工作队列的线程池
namespace ns_thread_pool
{
const int g_num = 10;
template <class T>
class ThreadPool
{
private:
std::queue<T> _task_queue; // 工作队列
int _num; // 工作队列的容量上限
pthread_mutex_t _mtx; // 保护临界资源的互斥量
pthread_cond_t _cond;
public:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
bool IsEmpty()
{
return _task_queue.empty();
}
public:
ThreadPool(int num &#61; g_num) : _num(num)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
static void *Rountine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp &#61; (ThreadPool<T> *)args;
while (true)
{
tp->Lock();
while (tp->IsEmpty())
{
tp->Wait();
}
T t;
tp->PopTask(&t);
tp->Unlock();
t();
}
}
void ThreadInit()
{
pthread_t tid;
for (int i &#61; 0; i < _num; i&#43;&#43;)
{
pthread_create(&tid, nullptr, Rountine, (void *)this);
}
}
void PushTask(const T &in)
{
Lock();
_task_queue.push(in);
Unlock();
Wakeup();
}
void PopTask(T *out)
{
*out &#61; _task_queue.front();
_task_queue.pop();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
};
}
Task.hpp文件&#xff1a;
#pragma once
#include
#include
#include
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op;
public:
Task() {}
Task(int x, int y, char op):_x(x), _y(y), _op(op) {}
std::string message()
{
std::string msg &#61; std::to_string(_x);
msg &#43;&#61; _op;
msg &#43;&#61; std::to_string(_y);
msg &#43;&#61; "&#61;?";
return msg;
}
int Run()
{
int res &#61; 0;
switch(_op)
{
case &#39;&#43;&#39;:
res &#61; _x &#43; _y;
break;
case &#39;-&#39;:
res &#61; _x - _y;
break;
case &#39;*&#39;:
res &#61; _x * _y;
break;
case &#39;/&#39;:
res &#61; _x / _y;
break;
case &#39;%&#39;:
res &#61; _x % _y;
break;
default:
break;
}
std::cout << "Thread: " << pthread_self() << " Task: " << _x << _op << _y << "&#61;" << res << std::endl;
return res;
}
int operator()()
{
return Run();
}
~Task(){}
};
}
main.cc文件&#xff1a;
#include "thread_pool.hpp"
#include "Task.hpp"
#include
#include
#include
using namespace ns_task;
using namespace ns_thread_pool;
int main()
{
srand((long long)time(nullptr));
ThreadPool<Task>* tp &#61; new ThreadPool<Task>();
tp->ThreadInit();
while(true)
{
Task t(rand()%20&#43;1, rand()%10&#43;1, "&#43;-*/%"[rand()%5]);
tp->PushTask(t);
}
return 0;
}
运行结果&#xff1a;
[cwx&#64;VM-20-16-centos normal_thread_pool]$ make
g&#43;&#43; -o thraed_pool main.cc -std&#61;c&#43;&#43;11 -lpthread
[cwx&#64;VM-20-16-centos normal_thread_pool]$ ./thraed_pool
Thread: 140412644165376 Task: 10-7&#61;3
Thread: 140412644165376 Task: 17&#43;10&#61;27
Thread: 140412669343488 Task: 6-7&#61;-1
Thread: 140412711307008 Task: 2/6&#61;0
Thread: 140412702914304 Task: 10/7&#61;1
......
洗碗的例子解释饿汉方式和懒汉方式&#xff1a;
懒汉方式最核心的思想就是"延时加载"&#xff0c;从而优化服务器的启动速度。
饿汉方式实现单例模式&#xff1a;
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
只要通过 Singleton这个类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.
懒汉方式实现单例模式&#xff1a;
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst &#61;&#61; NULL) {
inst &#61; new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
饿汉方式实现单例模式&#xff08;线程安全版本&#xff09;:
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst &#61;&#61; NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst &#61;&#61; NULL) {
inst &#61; new T();
}
lock.unlock();
}
return inst;
}
};
注意事项:
thread_pool.hpp文件&#xff1a;
#pragma once
#include
#include
#include
#include
// 懒汉模式单例模式多线程池
namespace ns_thread_pool
{
const int g_num &#61; 10;
template <class T>
class ThreadPool
{
private:
std::queue<T> _task_queue; // 工作队列 -- 临界资源
int _num; // 工作队列的容量
pthread_mutex_t _mtx; // 保护工作队列临界资源的锁
pthread_cond_t _cond; // 队列为空&#xff0c;在此环境变量等待
static ThreadPool<T>* inst; // 单例模式的静态对象指针
private:
// 单例模式构造函数必须定义以及必须为私有private
ThreadPool(int num &#61; g_num) : _num(num)
{
// 互斥锁和环境变量初始化
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &tp) &#61; delete;
ThreadPool<T> &operator&#61;(ThreadPool<T> &tp) &#61; delete;
public:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
bool IsEmpty()
{
return _task_queue.empty();
}
public:
static ThreadPool<T> *GetInstance()
{
// 静态互斥锁初始化
static pthread_mutex_t lock &#61; PTHREAD_MUTEX_INITIALIZER;
// 双条件判断可以减少锁的争用&#xff0c;提高效率
if(inst &#61;&#61; nullptr)
{
pthread_mutex_lock(&lock);
if(inst &#61;&#61; nullptr)
{
inst &#61; new ThreadPool<T>();
inst->ThreadInit();
}
pthread_mutex_unlock(&lock);
}
return inst;
}
static void *Rountine(void *args)
{
pthread_detach(pthread_self()); // 线程分离&#xff0c;主线程无需关注该线程
ThreadPool<T> *tp &#61; (ThreadPool<T> *)args;
while (true)
{
tp->Lock();
while (tp->IsEmpty())
{
// 工作队列为空&#xff0c;线程需要等待
tp->Wait();
}
T t;
tp->PopTask(&t);
tp->Unlock();
t();
}
}
void ThreadInit()
{
// 创建_num个线程的多线程池
pthread_t tid;
for (int i &#61; 0; i < _num; i&#43;&#43;)
{
pthread_create(&tid, nullptr, Rountine, (void *)this);
}
}
void PushTask(const T &in)
{
Lock();
_task_queue.push(in);
Unlock();
Wakeup();
}
void PopTask(T *out)
{
*out &#61; _task_queue.front();
_task_queue.pop();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
};
template<class T>
ThreadPool<T>* ThreadPool<T>::inst &#61; nullptr;
}
挂起等待特性的锁 vs 自旋锁
example&#xff1a;小明准备和女朋友小红约会&#xff0c;小明来到小红的宿舍楼下给小红打电话问小红何时下楼&#xff0c;小红表示需要化个妆才能约会需要半个小时的时间&#xff0c;小明听后前往网吧边打游戏边等待小红。如果小红说无需化妆只需要五分钟就可以下楼&#xff0c;小明就不会前往网吧&#xff0c;而是在楼下每一分钟发消息询问小红的下楼情况。
小红如果需要化妆&#xff0c;小明前往网吧的情况等价于挂起等待。
小红无需化妆&#xff0c;小明在楼下不断发消息&#xff0c;检测小红的状态等价于自旋的过程。
决定小明是否前往网吧的因素是小红需要多长时间才能下楼&#xff0c;如果小红需要化妆就前往网吧&#xff0c;等价于线程挂起等待&#xff0c;无需化妆就不断进行检测&#xff0c;等待于自旋的过程&#xff0c;这是因为线程挂起等待是有成本的。
线程如果访问临界资源花费的时长比较短&#xff0c;比较适合自旋锁&#xff1a;
#include
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
线程如果访问临界资源花费的时长比较长&#xff0c;比较适合挂起等待性质的锁&#xff0c;比如悲观锁。
321原则&#xff1a;
三种关系&#xff1a;
两种角色&#xff1a;
一个交易场所&#xff1a;
基本操作&#xff1a;
设置读者优先&#xff1a;
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先&#xff0c;可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先&#xff0c;目前有 BUG&#xff0c;导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先&#xff0c;但写者不能递归加锁
*/
初始化&#xff1a;
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
销毁&#xff1a;
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);