watchdog 的概念大概是定时去做一些事情,具体的概念可以网上搜索,本文主要是介绍一下使用 Libuv 实现的 watchdog,背景主要是因为 Node.js 是单线程的,一旦主线程繁忙或者陷入死循环,那么整个进程都无法工作了。
虽然 Node.js 在 JS 层实现了子线程模块,但是因为子线程持有单独的 V8 Isolate 和 loop,所以存在很多限制。比如我们希望在主线程繁忙时还能收集到主线程的一些数据(进程 / V8 内存、CPU 使用情况等)。这时候就需要使用 addon 去实现,addon 虽然是 C、C++ 等语言写的,但是依然跑在主线程里。所以我们需要使用底层提供的原生子线程加上 addon 实现我们的需求。本文正是基于这种场景的 watchdog。
首先看一下整体的类结构。
下面看一下每个类的作用和实现。
因为存在主线程和子线程,线程间会互相向对方提交任务,这就涉及到多线程互斥访问的问题。TaskManager 是负责管理任务的类,是线程安全的。
复制
class TaskManager {public:TaskManager() {uv_mutex_init(&tasks_mutex);};void add_task(task_cb cb, void * data) {struct task* t = new task;t->cb = cb;t->data = data;uv_mutex_lock(&tasks_mutex);tasks.push_back(t);uv_mutex_unlock(&tasks_mutex);};void handle_task() {std::vector<struct task*> task_list;uv_mutex_lock(&tasks_mutex);task_list.swap(tasks);uv_mutex_unlock(&tasks_mutex);std::vector<struct task*>::iterator iter;for(iter = task_list.begin(); task_list.end() != iter;) {struct task* t = (*iter);t->cb(t->data);iter = task_list.erase(iter);delete t;}};private:std::vector<struct task*> tasks;uv_mutex_t tasks_mutex;};
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
实现很简单,主要是使用 uv_mutex_t 解决互斥问题,提供 add_task 和 handle_task 两个方法。add_task 是追加任务,handle_task 是处理任务。handle_task 具体的调用时机由 TaskManager 的持有者决定。
WatchDog 是对 Libuv 定时器的封装,核心数据结构是 watch_dog_ctx。
复制
typedef void (*watch_dog_cb)(void* ctx);struct watch_dog_ctx{ void * data; // 上下文 watch_dog_cb work; // 任务函数 int poll_interval; // 定时时间};
1.
2.
3.
4.
5.
6.
7.
watch_dog_ctx 是对一个定时任务的封装。watch_dog_ctx 的任务会在子线程中定时被执行,这就解决了主线程陷入繁忙时无法工作的问题。
复制
NodeWatchDog::WatchDog::WatchDog(uv_loop_t* loop, struct watch_dog_ctx* ctx) {this->ctx = ctx;uv_timer_init(loop, &timer);timer.data = this;is_stop = false;}void NodeWatchDog::WatchDog::start() {if (is_stop) {return;}uv_timer_start(&timer,[](uv_timer_t* timer) { NodeWatchDog::WatchDog* watch_dog = (NodeWatchDog::WatchDog*)timer->data;struct watch_dog_ctx* ctx = watch_dog->get_ctx();ctx->work(ctx);}, ctx->poll_interval, ctx->poll_interval);};void NodeWatchDog::WatchDog::stop() {is_stop = true;uv_timer_stop(&timer);}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
WatchDogWorker 是负责管理子线程和 watchdog 的类。首先看一下定义。
复制
class WatchDogWorker {public:WatchDogWorker();~WatchDogWorker() {};void start();void stop();void add_watchdog(struct watch_dog_ctx* watchdog);void add_task(task_cb cb, void * data);void handle_task();uv_loop_t* get_event_loop() {return &loop;}uv_sem_t * get_thread_sem() {return &sem;}private:uv_loop_t loop;uv_thread_t tid;uv_sem_t sem;TaskManager task_manager;uv_async_t notify_async;std::vector<WatchDog*> watch_dogs;};
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
1)add_watchdog 用于新增 watchdog。
复制
void NodeWatchDog::WatchDogWorker::add_watchdog(struct watch_dog_ctx* watchdog_ctx) { NodeWatchDog::WatchDog *watchdog = new NodeWatchDog::WatchDog(&loop, watchdog_ctx); watch_dogs.push_back(watchdog);}
1.
2.
3.
4.
2)start 函数用于创建子线程和启动 watchdog。
复制
void NodeWatchDog::WatchDogWorker::start() {std::vector<NodeWatchDog::WatchDog*>::iterator iter;for(iter = watch_dogs.begin(); watch_dogs.end() != iter; iter++) {(*iter)->start();}int r = uv_thread_create(&tid, [](void *data) {NodeWatchDog::WatchDogWorker * worker = (NodeWatchDog::WatchDogWorker *)data;uv_sem_post((uv_sem_t*)worker->get_thread_sem());uv_loop_t * loop = worker->get_event_loop();uv_run(loop, UV_RUN_DEFAULT);uv_loop_close(loop);}, (void *)this);if (!r) {uv_sem_wait(&sem);} uv_sem_destroy(&sem);}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
启动 watchdog 时会往子线程的 loop 里插入一个定时器。然后创建子线程,在子线程里开启一个新的 loop。在这个 loop 里就会不断执行 watchdog 的任务。
3)add_task 用于其他线程外另一个线程插入一个任务。下面是任务的定义。
复制
typedef void (*task_cb)(void *data);struct task{task(){cb = nullptr;data = nullptr;}task_cb cb;void *data;};
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
定义很简单,一个工作函数和对应的上下文。接着看 add_task。
复制
void NodeWatchDog::WatchDogWorker::add_task(task_cb cb, void * data) {task_manager.add_task(cb, data);uv_async_send(¬ify_async);}
1.
2.
3.
4.
add_task 直接调用 task_manager 的 add_task 函数插入任务,因为它保证了线程安全。接着通过 Libuv 提供的 async 线程间通信机制通知另一个线程有新任务。如果是在 Node.js 里的话,还需要通过另一种方式进行通知,这里就不具体展开。
此处,我们就实现了在子线程里定时执行任务,并实现主线程和子线程互相提交任务的能力。最终再实现一个 WatchDogManager 用于管理多个 worker。
复制
class WatchDogManager{public:WatchDogManager(uv_loop_t *loop);~WatchDogManager() {};void start();void stop();void add_task(task_cb cb, void *data);void handle_task();void add_worker(WatchDogWorker* worker);WatchDogWorker* get_worker(int index) { return workers[index]; };private:uv_async_t notify_async;TaskManager task_manager;std::vector<WatchDogWorker*> workers;};
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
通过 WatchDogManager 的类定义,我们大概就知道它的功能。首先看一下 start 方法。
复制
void NodeWatchDog::WatchDogManager::start(){ std::vector<NodeWatchDog::WatchDogWorker*>::iterator iter; for(iter = workers.begin(); workers.end() != iter; iter++) { (*iter)->start(); }}
1.
2.
3.
4.
5.
6.
7.
start 函数就是启动多个 worker,刚才已经介绍过,worker 会创建一个子线程定时执行任务。其他方法就没有太多逻辑,就不一一介绍。
接下来看看使用方式。
复制
#include "src/watch_dog_manager.h"#include "uv.h"#include "stdio.h"using namespace NodeWatchDog;int main() {setbuf(stdout, NULL);uv_loop_t loop;uv_loop_init(&loop);WatchDogWorker *worker = new WatchDogWorker();struct watch_dog_ctx* ctx = new watch_dog_ctx;ctx->work = [](void *ctx) {printf("worker task execute in thread id => %ld\n", (long)pthread_self());};ctx->poll_interval = 1000;worker->add_watchdog(ctx);worker->start();uv_idle_t idle;uv_idle_init(&loop, &idle);uv_idle_start(&idle, [](uv_idle_t *) {});printf("main thread id => %ld\n", (long)pthread_self());uv_run(&loop, UV_RUN_DEFAULT);delete worker;delete ctx;return 0;}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
新建一个 watchdog 和 worker,并把 watchdog 插入到 worker 中,最后启动 worker。同时主线程也进入自己的 loop。执行输出如下。
复制
main thread id => 4338490880worker task execute in thread id => 123145480077312worker task execute in thread id => 123145480077312
1.
2.
3.
我们看到,printf 分别执行在不同的线程中。接下来再看一下复杂的场景。
复制
#include "src/watch_dog_manager.h"#include "uv.h"#include "stdio.h"using namespace NodeWatchDog;int main() {setbuf(stdout, NULL);uv_loop_t loop;uv_loop_init(&loop);WatchDogManager *manager = new WatchDogManager(&loop);WatchDogWorker *worker = new WatchDogWorker();struct watch_dog_ctx* ctx = new watch_dog_ctx;ctx->data = (void *)manager;ctx->work = [](void *ctx) {struct watch_dog_ctx* watchdog_ctx = (struct watch_dog_ctx*)ctx;WatchDogManager* manager = (WatchDogManager*)watchdog_ctx->data;// 提交任务给主线程 manager->add_task([](void *data) {printf("manager task execute in thread id => %ld\n", (long)pthread_self());WatchDogManager* manager = (WatchDogManager*)data;// 提交任务给某个子线程 manager->get_worker(0)->add_task([](void *data) {printf("worker task execute in thread id => %ld\n", (long)pthread_self());}, nullptr);}, manager);};ctx->poll_interval = 1000;worker->add_watchdog(ctx);manager->add_worker(worker);manager->start();uv_idle_t idle;uv_idle_init(&loop, &idle);uv_idle_start(&idle, [](uv_idle_t *) {//});printf("main thread id => %ld\n", (long)pthread_self());uv_run(&loop, UV_RUN_DEFAULT);delete manager;delete worker;delete ctx;return 0;}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
上面的例子中,当在子线程中执行回调时,通过 manager->add_task 往主线程提及一个任务。然后在主线程执行这个任务时,首先输出当前线程 id,再通过 manager->get_worker(0)->add_task 给子线程提交一个任务。比如我们希望定时收集主线程的 CPU 数据,那么就可以给子线程插入一个 watchdog,然后在 watchdog 回调里给主线程提交一个任务,当主线程执行这个任务的时候,我们就可以拿到主线程的 CPU 数据。当然还有更复杂的场景,比如获取 CPU Profile 时,主线程和子线程会进行多次交互。最后再看一个多个 worker 线程的例子。
复制
WatchDogManager *manager = new WatchDogManager(&loop); WatchDogWorker *worker1 = new WatchDogWorker(); WatchDogWorker *worker2 = new WatchDogWorker(); struct watch_dog_ctx* ctx = new watch_dog_ctx; ctx->work = [](void *ctx) { printf("manager task execute in thread id => %ld\n", (long)pthread_self()); }; ctx->poll_interval = 1000; worker1->add_watchdog(ctx); worker2->add_watchdog(ctx); manager->add_worker(worker1); manager->add_worker(worker2); manager->start();
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
以上代码输出如下。
复制
main thread id => 4469550592manager task execute in thread id => 123145483321344manager task execute in thread id => 123145491722240manager task execute in thread id => 123145483321344manager task execute in thread id => 123145491722240
1.
2.
3.
4.
5.
我们看到存在多个子线程,并且成功执行了任务。
后记:单线程使得编码变得简单,但是也导致了一些限制,这时候我们就需要使用额外的子线程来解决单线程存在的限制。引入多线程后,就需要解决多线程互斥和通信问题。以上代码只是介绍了一个大致的思路,还有地方需要完善,如果有想法的同学也可以交流。
仓库:https://github.com/theanarkh/uv-watchdog