typedef std::function<void(void)> MyFun; typedef enum XTaskPriority { XTaskPriority_High, XTaskPriority_Default, XTaskPriority_Low, XTaskPriority_Background, }XTaskPriority; class XThreadPool; class XTaskQueue { std::mutex mutex; std::queue<MyFun> queue; public: XThreadPool *runInPool; static std::shared_ptr<XTaskQueue> getMainQueue() { //TODO::maybe mutithread cause problem static std::shared_ptr<XTaskQueue> mainQueue(new XTaskQueue()); return mainQueue; } static std::shared_ptr<XTaskQueue> getGlobleQueue(XTaskPriority priority) { static std::shared_ptr<XTaskQueue> backgroundQueue(new XTaskQueue()); static std::shared_ptr<XTaskQueue> lowQueue(new XTaskQueue()); static std::shared_ptr<XTaskQueue> defaultQueue(new XTaskQueue()); static std::shared_ptr<XTaskQueue> highQueue(new XTaskQueue()); switch (priority) { case XTaskPriority_Background: return backgroundQueue; case XTaskPriority_Low: return lowQueue; case XTaskPriority_High: return highQueue; case XTaskPriority_Default: default: return defaultQueue; } } bool pop(MyFun &fun) { std::unique_lock<std::mutex> lk(mutex); if(queue.size() > 0) { fun = std::move(queue.back()); queue.pop(); return true; } return false; } void push(const MyFun &fun) { mutex.lock(); queue.push(fun); mutex.unlock(); } }; class XThreadPool { private: std::atomic_int_fast8_t threadNum; int_fast8_t maxNum; std::atomic_int taskNum; std::mutex mutex; std::condition_variable cv; void runLoop() { ++threadNum; do { std::unique_lock<std::mutex> lk(mutex); auto iter = taskQueue.begin(); auto end = taskQueue.end(); MyFun fun; bool hasFun = false; while (iter != end) { if((*iter)->pop(fun)) { hasFun = true; break; } ++iter; } if (hasFun) { lk.unlock(); fun(); } else { cv.wait(lk); } } while (1); --threadNum; } public: std::vector<std::shared_ptr<XTaskQueue>> taskQueue; void onQueueChanged() { mutex.lock(); cv.notify_one(); mutex.unlock(); } //初始化两个全局Pool static void initGlobelPool() { static XThreadPool mainThread; mainThread.taskQueue.push_back(XTaskQueue::getMainQueue()); XTaskQueue::getMainQueue()->runInPool = &mainThread; mainThread.startNewThread(); static XThreadPool backgroundThreadsPool; backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_High)); backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Default)); backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Low)); backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Background)); XTaskQueue::getGlobleQueue(XTaskPriority_Background)->runInPool = &backgroundThreadsPool; XTaskQueue::getGlobleQueue(XTaskPriority_Low)->runInPool = &backgroundThreadsPool; XTaskQueue::getGlobleQueue(XTaskPriority_Default)->runInPool = &backgroundThreadsPool; XTaskQueue::getGlobleQueue(XTaskPriority_High)->runInPool = &backgroundThreadsPool; for (int i = 0; i < 4; ++i) { backgroundThreadsPool.startNewThread(); } } void startNewThread() { std::thread thread(std::bind(&XThreadPool::runLoop, this)); thread.detach(); } XThreadPool() { taskNum = 0; } virtual ~XThreadPool() { } }; struct XTask { public: std::function<void(void)> fun; std::shared_ptr<XTaskQueue> queue; std::chrono::time_point<std::chrono::system_clock> time; bool operator <(const XTask& rh) { return time < rh.time; } }; class XDispatchManager { private: std::mutex mutex; std::condition_variable cv; std::vector<XTask> mQueue; void runLoop() { do { std::unique_lock<std::mutex> lk(mutex); if (mQueue.empty()) { cv.wait(lk); } else { cv.wait_until(lk, mQueue.begin()->time); auto now = std::chrono::system_clock::now(); if (now >= mQueue.begin()->time) { auto taskQueue = mQueue.begin()->queue; taskQueue->push(mQueue.begin()->fun); if (taskQueue->runInPool) { taskQueue->runInPool->onQueueChanged(); } mQueue.erase(mQueue.begin()); } } } while (1); } public: static XDispatchManager* getSharedInstance() { static XDispatchManager *manager = new XDispatchManager(); return manager; } XDispatchManager() { XThreadPool *pool = new XThreadPool(); std::thread thread(std::bind(&XDispatchManager::runLoop, this)); thread.detach(); } inline void dispatchAsnyc(std::shared_ptr<XTaskQueue> taskQueue, const MyFun &fun) { taskQueue->push(fun); if(taskQueue->runInPool) { taskQueue->runInPool->onQueueChanged(); } } void dispatchAfter(std::shared_ptr<XTaskQueue> taskQueue, const MyFun &fun, long delayMS) { XTask task; task.fun = fun; task.queue = taskQueue; task.time = std::chrono::system_clock::now() + std::chrono::milliseconds(delayMS); mutex.lock(); mQueue.insert(std::upper_bound(mQueue.begin(), mQueue.end(), task, [](const XTask &lh, XTask &rh)->bool { return lh.time < rh.time; }), task); mutex.unlock(); cv.notify_one(); } };
Can you be more specific about the content of your article? After reading it, I still have some doubts. Hope you can help me.