/// Type-erased unit of work that is stored in task queues and run by pool workers.
using MyFun = std::function<void()>;

/// Selects one of the four global queues returned by XTaskQueue::getGlobleQueue().
enum XTaskPriority {
    XTaskPriority_High,
    XTaskPriority_Default,
    XTaskPriority_Low,
    XTaskPriority_Background,
};

class XThreadPool;

/// Mutex-protected FIFO of tasks.
///
/// push() and pop() may be called concurrently from several threads.
/// `runInPool` is a plain pointer that is NOT protected by the queue mutex.
class XTaskQueue {
    std::mutex mutex;
    std::queue<MyFun> queue;

public:
    /// Pool that should be notified after a push; nullptr while the queue is
    /// not attached to any pool. Non-owning.
    XThreadPool *runInPool = nullptr;

    /// Returns the process-wide "main" queue (always the same instance).
    static std::shared_ptr<XTaskQueue> getMainQueue() {
        // Function-local statics are initialised exactly once, thread-safely (C++11).
        static std::shared_ptr<XTaskQueue> mainQueue = std::make_shared<XTaskQueue>();
        return mainQueue;
    }

    /// Returns the process-wide queue for `priority`.
    /// Any value without a dedicated case yields the default-priority queue.
    static std::shared_ptr<XTaskQueue> getGlobleQueue(XTaskPriority priority) {
        static std::shared_ptr<XTaskQueue> backgroundQueue = std::make_shared<XTaskQueue>();
        static std::shared_ptr<XTaskQueue> lowQueue = std::make_shared<XTaskQueue>();
        static std::shared_ptr<XTaskQueue> defaultQueue = std::make_shared<XTaskQueue>();
        static std::shared_ptr<XTaskQueue> highQueue = std::make_shared<XTaskQueue>();
        switch (priority) {
            case XTaskPriority_Background:
                return backgroundQueue;
            case XTaskPriority_Low:
                return lowQueue;
            case XTaskPriority_High:
                return highQueue;
            case XTaskPriority_Default:
            default:
                return defaultQueue;
        }
    }

    /// Removes the oldest task and moves it into `fun`.
    /// @return true if a task was stored in `fun`; false if the queue was empty
    ///         (in which case `fun` is left untouched).
    bool pop(MyFun &fun) {
        std::lock_guard<std::mutex> lk(mutex);
        if (queue.empty()) {
            return false;
        }
        // std::queue::pop() removes the FRONT element, so the front is what must
        // be handed out; reading back() returned the newest task and lost the oldest.
        fun = std::move(queue.front());
        queue.pop();
        return true;
    }

    /// Appends a copy of `fun` to the end of the queue.
    /// Does not notify `runInPool`; the caller is responsible for that.
    void push(const MyFun &fun) {
        // RAII lock: the mutex is released even if copying `fun` throws.
        std::lock_guard<std::mutex> lk(mutex);
        queue.push(fun);
    }
};
class XThreadPool {
private:
std::atomic_int_fast8_t threadNum;
int_fast8_t maxNum;
std::atomic_int taskNum;
std::mutex mutex;
std::condition_variable cv;
void runLoop() {
++threadNum;
do {
std::unique_lock<std::mutex> lk(mutex);
auto iter = taskQueue.begin();
auto end = taskQueue.end();
MyFun fun;
bool hasFun = false;
while (iter != end) {
if((*iter)->pop(fun)) {
hasFun = true;
break;
}
++iter;
}
if (hasFun) {
lk.unlock();
fun();
} else {
cv.wait(lk);
}
} while (1);
--threadNum;
}
public:
std::vector<std::shared_ptr<XTaskQueue>> taskQueue;
void onQueueChanged() {
mutex.lock();
cv.notify_one();
mutex.unlock();
}
//初始化两个全局Pool
static void initGlobelPool() {
static XThreadPool mainThread;
mainThread.taskQueue.push_back(XTaskQueue::getMainQueue());
XTaskQueue::getMainQueue()->runInPool = &mainThread;
mainThread.startNewThread();
static XThreadPool backgroundThreadsPool;
backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_High));
backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Default));
backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Low));
backgroundThreadsPool.taskQueue.push_back(XTaskQueue::getGlobleQueue(XTaskPriority_Background));
XTaskQueue::getGlobleQueue(XTaskPriority_Background)->runInPool = &backgroundThreadsPool;
XTaskQueue::getGlobleQueue(XTaskPriority_Low)->runInPool = &backgroundThreadsPool;
XTaskQueue::getGlobleQueue(XTaskPriority_Default)->runInPool = &backgroundThreadsPool;
XTaskQueue::getGlobleQueue(XTaskPriority_High)->runInPool = &backgroundThreadsPool;
for (int i = 0; i < 4; ++i) {
backgroundThreadsPool.startNewThread();
}
}
void startNewThread() {
std::thread thread(std::bind(&XThreadPool::runLoop, this));
thread.detach();
}
XThreadPool() {
taskNum = 0;
}
virtual ~XThreadPool() {
}
};
// XTask only stores a shared_ptr to the queue, so a declaration is sufficient
// here; this is a harmless redeclaration when XTaskQueue is already defined.
class XTaskQueue;

/// A task together with the queue it should be pushed to and the time at which
/// that should happen.
struct XTask {
    std::function<void(void)> fun;                            ///< work to run
    std::shared_ptr<XTaskQueue> queue;                        ///< destination queue
    std::chrono::time_point<std::chrono::system_clock> time;  ///< due time

    /// Orders tasks by due time (earlier first). Declared const so that const
    /// tasks and standard algorithms using const references can compare them.
    bool operator<(const XTask &rh) const {
        return time < rh.time;
    }
};
class XDispatchManager {
private:
std::mutex mutex;
std::condition_variable cv;
std::vector<XTask> mQueue;
void runLoop() {
do {
std::unique_lock<std::mutex> lk(mutex);
if (mQueue.empty()) {
cv.wait(lk);
} else {
cv.wait_until(lk, mQueue.begin()->time);
auto now = std::chrono::system_clock::now();
if (now >= mQueue.begin()->time) {
auto taskQueue = mQueue.begin()->queue;
taskQueue->push(mQueue.begin()->fun);
if (taskQueue->runInPool) {
taskQueue->runInPool->onQueueChanged();
}
mQueue.erase(mQueue.begin());
}
}
} while (1);
}
public:
static XDispatchManager* getSharedInstance() {
static XDispatchManager *manager = new XDispatchManager();
return manager;
}
XDispatchManager() {
XThreadPool *pool = new XThreadPool();
std::thread thread(std::bind(&XDispatchManager::runLoop, this));
thread.detach();
}
inline void dispatchAsnyc(std::shared_ptr<XTaskQueue> taskQueue, const MyFun &fun) {
taskQueue->push(fun);
if(taskQueue->runInPool) {
taskQueue->runInPool->onQueueChanged();
}
}
void dispatchAfter(std::shared_ptr<XTaskQueue> taskQueue, const MyFun &fun, long delayMS) {
XTask task;
task.fun = fun;
task.queue = taskQueue;
task.time = std::chrono::system_clock::now() + std::chrono::milliseconds(delayMS);
mutex.lock();
mQueue.insert(std::upper_bound(mQueue.begin(), mQueue.end(), task, [](const XTask &lh, XTask &rh)->bool {
return lh.time < rh.time;
}), task);
mutex.unlock();
cv.notify_one();
}
};