```cpp
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <list>
#include <atomic>
#include <memory>
#include <functional>
template <typename T>
class SynQueue {
public:
SynQueue(size_t maxSize) : maxSize_(maxSize), needStop_(false) {}
void put(const T& x) { add(x); }
void put(T&& x) { add(std::forward<T>(x)); }
void take(std::list<T>& list)
{
// 条件不满足时,条件变量会释放mutex并将线程至于waiting状态,等待其他线程notify_one或notify_all将其唤醒
std::unique_lock<std::mutex> lock(mutex_);
notEmpty_.wait(lock, [this] { return needStop_ || notEmpty(); });
if (needStop_)
return;
list = std::move(queue_);
notFull_.notify_one();
}
void take(T& t)
{
std::unique_lock<std::mutex> lock(mutex_);
notEmpty_.wait(lock, [this] { return needStop_ || notEmpty(); });
if (needStop_)
return;
t = queue_.front();
queue_.pop_front();
notFull_.notify_one();
}
void stop()
{
{
std::lock_guard<std::mutex> lock(mutex_);
needStop_ = true;
}
notFull_.notify_all();
notEmpty_.notify_all();
}
private:
size_t size() const
{
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
bool notFull() const
{
bool full = queue_.size() >= maxSize_;
if (full)
std::cout << "queue is full, need waiting..." << std::endl;
return !full;
}
bool notEmpty() const
{
bool empty = queue_.empty();
if (empty)
std::cout << "queue is empty, need waiting... thread id="
<< std::this_thread::get_id() << std::endl;
return !empty;
}
template <typename F>
void add(F&& x)
{
std::unique_lock<std::mutex> lock(mutex_);
notFull_.wait(lock, [this] { return needStop_ || notFull(); });
if (needStop_)
return;
queue_.push_back(std::forward<F>(x));
notEmpty_.notify_one();
}
private:
std::list<T> queue_; // 缓冲区
std::mutex mutex_; // 互斥量和条件变量结合起来使用
std::condition_variable notEmpty_; // 不为空的条件变量
std::condition_variable notFull_; // 没有满的条件变量
size_t maxSize_;
bool needStop_;
};
class ThreadPool {
public:
using Task = std::function<void()>;
ThreadPool(int numThreads = std::thread::hardware_concurrency())
: queue_(maxTaskCount)
{
start(numThreads);
}
~ThreadPool() { stop(); }
void addTask(Task&& task)
{
queue_.put(std::forward<Task>(task));
}
void addTask(const Task& task)
{
queue_.put(task);
}
void stop()
{
std::call_once(onceFlag_, [this] { stopThreadGroup(); });
}
private:
void start(int numThreads)
{
running_ = true;
for (int i = 0; i < numThreads; ++i)
{
threadGroup_.push_back(
std::make_shared<std::thread>(&ThreadPool::runInThread, this));
}
}
void runInThread()
{
while (running_)
{
Task task;
queue_.take(task);
if (!running_)
return;
task();
}
}
void stopThreadGroup()
{
queue_.stop();
running_ = false;
for (auto& thr : threadGroup_)
{
if (thr)
thr->join();
}
threadGroup_.clear();
}
private:
enum { maxTaskCount = 100 };
std::list<std::shared_ptr<std::thread>> threadGroup_;
SynQueue<Task> queue_;
std::atomic_bool running_;
std::once_flag onceFlag_;
};
void testThreadPool()
{
ThreadPool pool(3);
std::thread thread1([&pool] {
for (int i = 0; i < 10; ++i)
{
auto threadId = std::this_thread::get_id();
pool.addTask([threadId] {
std::cout << "threadId=" << threadId << std::endl;
});
}
});
std::thread thread2([&pool] {
for (int i = 0; i < 100; ++i)
{
auto threadId = std::this_thread::get_id();
pool.addTask([threadId] {
std::cout << "threadId=" << threadId << std::endl;
});
}
});
std::this_thread::sleep_for(std::chrono::seconds(2));
getchar();
pool.stop();
thread1.join();
thread2.join();
}
int main(int argc, char** argv)
{
testThreadPool();
return 0;
}
```
- 空白目录
- 算法
- 排序
- 冒泡排序
- 选择排序
- 插入排序
- 归并排序
- 快速排序
- 计数排序
- 桶排序
- 基数排序
- 希尔排序
- 堆排序
- 二分查找
- 最小堆
- 最小索引堆
- 平衡二叉树(AVL tree)
- bitmap位图
- 布隆过滤器
- hashmap
- topK
- 跳表
- LRU Cache
- kmp
- 最小堆和堆排序
- 最短路径
- C++
- 运行时类型判断RTTI
- C++反射
- 手动实现智能指针
- 序列化实现
- rpc实现
- std::forward
- 函数指针的妙用
- C/C++
- std::function
- 同步队列
- 线程池实现
- std::promise
- 深入理解虚函数
- extern "C" 关键字讲解
- 大端小端的区别
- 简历
- 简历1
- redis
- 数据结构和对象
- sds
- list
- zskiplist
- 腾讯云redis面试题总结
- redis集群部署
- LeetCode
- 目标
- go基础
- 算法快速入门
- 数据结构篇
- 二叉树
- 链表
- 栈和队列
- 二进制
- 基础算法篇
- 二分搜索
- 排序算法
- 动态规划
- 算法思维
- 递归思维
- 滑动窗口思想
- 二叉搜索树
- 回溯法
- 其他
- 剑指offer
- 笔记
- git代理加速
- Linux
- vim大法
- vscode远程不能跳转
- cmake
- 设计模式
- 单例模式
- 简单工厂模式
- 外观模式
- 适配器模式
- 工厂方法模式
- 抽象工厂模式
- 生成器模式
- 原型模式
- 中介者模式
- 观察者模式
- 访问者模式
- 命令模式
- 网络编程
- epoll reactor模式
- linux timerfd系列函数总结
- IO
- mapreduce
- 反射器
- leo通信库
- Mutex
- Condition
- thread
- raft
- 协程
- hook
- 定时器
- 别人的面试经验
- 面试题
- vector崩溃问题
- JAVA
- Linux java环境配置
- ucore
- lab1
- FreeNOS
- leveldb
- 刷题笔记
- 回文串
- 前缀树
- 字符串查找
- 查找两个字符串a,b中的最长公共子串
- 动态规划
- golang
- 顺序循环打印实现
- 数据结构
- rpc运用
- python
- 单例
- 深拷贝浅拷贝
- 链表
- python基础题
- mysql
- 事务
- Linux
- 共享内存
- 刷题记录
- 贪心算法
- 动态规划
- 面试
- 腾讯C++面试
- 微众面试JD
- 迅雷网络面试
- 学习网址
- rabbitMq
