AI写作智能体 自主规划任务,支持联网查询和网页读取,多模态高效创作各类分析报告、商业计划、营销方案、教学内容等。 广告
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

// Bounded blocking queue: producers block while full, consumers block while
// empty. stop() wakes every waiter so blocked threads can exit cleanly.
template <typename T>
class SynQueue {
public:
    explicit SynQueue(size_t maxSize) : maxSize_(maxSize), needStop_(false) {}

    void put(const T& x) { add(x); }
    void put(T&& x) { add(std::forward<T>(x)); }

    // Drain the whole queue into `list` in one shot.
    // Returns without touching `list` if the queue was stopped.
    void take(std::list<T>& list) {
        // wait() releases the mutex and parks the thread until another
        // thread calls notify_one/notify_all and the predicate holds.
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return needStop_ || notEmpty(); });
        if (needStop_) return;
        list = std::move(queue_);
        notFull_.notify_one();
    }

    // Pop a single element into `t`.
    // NOTE: returns without assigning `t` when the queue was stopped —
    // callers must not assume `t` was filled in.
    void take(T& t) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return needStop_ || notEmpty(); });
        if (needStop_) return;
        t = queue_.front();
        queue_.pop_front();
        notFull_.notify_one();
    }

    // Unblock all producers and consumers; subsequent put/take are no-ops.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            needStop_ = true;
        }
        notFull_.notify_all();
        notEmpty_.notify_all();
    }

private:
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

    // Predicates below are only ever called with mutex_ held (from the
    // condition-variable wait), so reading queue_ here is safe.
    bool notFull() const {
        bool full = queue_.size() >= maxSize_;
        if (full)
            std::cout << "queue is full, need waiting..." << std::endl;
        return !full;
    }

    bool notEmpty() const {
        bool empty = queue_.empty();
        if (empty)
            std::cout << "queue is empty, need waiting... thread id="
                      << std::this_thread::get_id() << std::endl;
        return !empty;
    }

    template <typename F>
    void add(F&& x) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return needStop_ || notFull(); });
        if (needStop_) return;  // dropped on purpose: queue is shutting down
        queue_.push_back(std::forward<F>(x));
        notEmpty_.notify_one();
    }

private:
    std::list<T> queue_;                 // buffered elements
    mutable std::mutex mutex_;           // guards queue_ and needStop_; mutable so size() can stay const
    std::condition_variable notEmpty_;   // signalled after a push
    std::condition_variable notFull_;    // signalled after a pop
    size_t maxSize_;                     // capacity bound enforced by notFull()
    bool needStop_;                      // set once by stop(); read under mutex_
};

// Fixed-size worker pool draining Tasks from a shared SynQueue.
class ThreadPool {
public:
    using Task = std::function<void()>;

    explicit ThreadPool(int numThreads = std::thread::hardware_concurrency())
        : queue_(maxTaskCount) {
        start(numThreads);
    }

    ~ThreadPool() { stop(); }

    void addTask(Task&& task) { queue_.put(std::forward<Task>(task)); }
    void addTask(const Task& task) { queue_.put(task); }

    // Idempotent shutdown: call_once guarantees the thread group is
    // stopped and joined exactly once even if stop() races the destructor.
    void stop() {
        std::call_once(onceFlag_, [this] { stopThreadGroup(); });
    }

private:
    void start(int numThreads) {
        running_ = true;
        for (int i = 0; i < numThreads; ++i) {
            threadGroup_.push_back(
                std::make_shared<std::thread>(&ThreadPool::runInThread, this));
        }
    }

    void runInThread() {
        while (running_) {
            Task task;
            queue_.take(task);
            // take() may return WITHOUT filling `task` when the queue is
            // stopped; invoking an empty std::function would throw
            // std::bad_function_call and terminate the process. Guard it.
            if (!running_) return;
            if (task) task();
        }
    }

    void stopThreadGroup() {
        // Flip running_ BEFORE waking the workers so that a thread woken
        // by queue_.stop() observes the shutdown flag and exits.
        running_ = false;
        queue_.stop();
        for (auto& thr : threadGroup_) {
            if (thr) thr->join();
        }
        threadGroup_.clear();
    }

private:
    enum { maxTaskCount = 100 };  // capacity handed to the task queue

    std::list<std::shared_ptr<std::thread>> threadGroup_;
    SynQueue<Task> queue_;
    std::atomic_bool running_{false};  // initialized: read before start() must be defined
    std::once_flag onceFlag_;
};

// Demo: two producer threads enqueue tasks that print the producer's id.
void testThreadPool() {
    ThreadPool pool(3);

    std::thread thread1([&pool] {
        for (int i = 0; i < 10; ++i) {
            auto threadId = std::this_thread::get_id();
            pool.addTask([threadId] {
                std::cout << "threadId=" << threadId << std::endl;
            });
        }
    });

    std::thread thread2([&pool] {
        for (int i = 0; i < 100; ++i) {
            auto threadId = std::this_thread::get_id();
            pool.addTask([threadId] {
                std::cout << "threadId=" << threadId << std::endl;
            });
        }
    });

    // Give the workers time to drain the queue, then shut down.
    // (A getchar() that blocked forever in non-interactive runs was removed.)
    std::this_thread::sleep_for(std::chrono::seconds(2));
    pool.stop();
    thread1.join();
    thread2.join();
}

int main(int argc, char** argv) {
    testThreadPool();
    return 0;
}