C++仿GCD多线程用法

typedef std::function<void(void)> MyFun;

typedef enum XTaskPriority {
    XTaskPriority_High,
    XTaskPriority_Default,
    XTaskPriority_Low,
    XTaskPriority_Background,
}XTaskPriority;

class XThreadPool;
class XTaskQueue {
    std::mutex mutex;
    std::queue<MyFun> queue;
public:
    XThreadPool *runInPool;
    static std::shared_ptr<XTaskQueue> getMainQueue() {
        //TODO::maybe mutithread cause problem
        static std::shared_ptr<XTaskQueue> mainQueue(new XTaskQueue());
        return mainQueue;
    }
    static std::shared_ptr<XTaskQueue> getGlobleQueue(XTaskPriority priority) {
        static std::shared_ptr<XTaskQueue> backgroundQueue(new XTaskQueue());
        static std::shared_ptr<XTaskQueue> lowQueue(new XTaskQueue());
        static std::shared_ptr<XTaskQueue> defaultQueue(new XTaskQueue());
        static std::shared_ptr<XTaskQueue> highQueue(new XTaskQueue());
        switch (priority) {
            case XTaskPriority_Background:
                return backgroundQueue;
            case XTaskPriority_Low:
                return lowQueue;
            case XTaskPriority_High:
                return highQueue;
            case XTaskPriority_Default:
            default:
                return defaultQueue;
        }
    }
     bool pop(MyFun &fun) {
        std::unique_lock<std::mutex> lk(mutex);
        if(queue.size() > 0) {
            fun = std::move(queue.back());
            queue.pop();
            return true;
        }
        return false;
    }
    void push(const MyFun &fun) {
        mutex.lock();
        queue.push(fun);
        mutex.unlock();
    }
};


class XThreadPool {
private:
    std::atomic_int_fast8_t threadNum;
    int_fast8_t maxNum;
    std::atomic_int taskNum;
    std::mutex mutex;
    std::condition_variable cv;
    void runLoop() {
        ++threadNum;
        do {
            std::unique_lock<std::mutex> lk(mutex);
            auto iter = taskQueue.begin();
            auto end = taskQueue.end();
            MyFun fun;
            bool hasFun = false;
            while (iter != end) {
                if((*iter)->pop(fun)) {
                    hasFun = true;
                    break;
                }
                ++iter;
            }
            if (hasFun) {
                lk.unlock();
                fun();
            } else {
                cv.wait(lk);
            }
        } while (1);
        --threadNum;
    }
public:
    std::vector<std::shared_ptr<XTaskQueue>> taskQueue;
    void onQueueChanged() {
        mutex.lock();
        cv.notify_one();
        mutex.unlock();
    }
    //初始化两个全局Pool
    static void initGlobelPool() {
        static XThreadPool mainThread;
        mainThread.taskQueue.push_back(XTaskQueue::getMainQueue());
        XTaskQueue::getMainQueue()->runInPool = &mainThread;
        mainThread.startNewThread();
        static XThreadPool backgroundThreadsPool;
        backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_High));
        backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Default));
        backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Low));
        backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Background));
        XTaskQueue::getGlobleQueue(XTaskPriority_Background)->runInPool = &backgroundThreadsPool;
        XTaskQueue::getGlobleQueue(XTaskPriority_Low)->runInPool = &backgroundThreadsPool;
        XTaskQueue::getGlobleQueue(XTaskPriority_Default)->runInPool = &backgroundThreadsPool;
        XTaskQueue::getGlobleQueue(XTaskPriority_High)->runInPool = &backgroundThreadsPool;
        for (int i = 0; i < 4; ++i) {
            backgroundThreadsPool.startNewThread();
        }
    }
    void startNewThread() {
        std::thread thread(std::bind(&XThreadPool::runLoop, this));
        thread.detach();
    }
    XThreadPool() {
        taskNum = 0;
    }
    virtual ~XThreadPool() {
    }
};

struct XTask {
public:
    std::function<void(void)> fun;
    std::shared_ptr<XTaskQueue> queue;
    std::chrono::time_point<std::chrono::system_clock> time;
    bool operator <(const XTask& rh) {
        return time < rh.time;
    }
};

class XDispatchManager {
private:
    std::mutex mutex;
    std::condition_variable cv;
    std::vector<XTask> mQueue;
    void runLoop() {
        do {
            std::unique_lock<std::mutex> lk(mutex);
            if (mQueue.empty()) {
                cv.wait(lk);
            } else {
                cv.wait_until(lk, mQueue.begin()->time);
                auto now = std::chrono::system_clock::now();
                if (now >= mQueue.begin()->time) {
                    auto taskQueue =  mQueue.begin()->queue;
                    taskQueue->push(mQueue.begin()->fun);
                    if (taskQueue->runInPool) {
                        taskQueue->runInPool->onQueueChanged();
                    }
                    mQueue.erase(mQueue.begin());
                }
            }
        } while (1);
    }
public:
    static XDispatchManager* getSharedInstance() {
        static XDispatchManager *manager = new XDispatchManager();
        return manager;
    }
    XDispatchManager() {
        XThreadPool *pool = new XThreadPool();
        std::thread thread(std::bind(&XDispatchManager::runLoop, this));
        thread.detach();
    }
    inline void dispatchAsnyc(std::shared_ptr<XTaskQueue> taskQueue, const MyFun &fun) {
        taskQueue->push(fun);
        if(taskQueue->runInPool) {
            taskQueue->runInPool->onQueueChanged();
        }
    }
    void dispatchAfter(std::shared_ptr<XTaskQueue> taskQueue, const MyFun &fun, long delayMS) {
        XTask task;
        task.fun = fun;
        task.queue = taskQueue;
        task.time = std::chrono::system_clock::now() + std::chrono::milliseconds(delayMS);
        mutex.lock();
        mQueue.insert(std::upper_bound(mQueue.begin(), mQueue.end(), task, [](const XTask &lh, XTask &rh)->bool {
            return lh.time < rh.time;
        }), task);
        mutex.unlock();
        cv.notify_one();
    }
};

 

《C++仿GCD多线程用法》有1个想法

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注