聊聊使用 Libuv 实现的 Watchdog,你学会了吗?

本文主要是介绍一下使用 Libuv 实现的 watchdog,背景主要是因为 Node.js 是单线
首页 新闻资讯 行业资讯 聊聊使用 Libuv 实现的 Watchdog,你学会了吗?

watchdog 的概念大概是定时去做一些事情,具体的概念可以网上搜索,本文主要是介绍一下使用 Libuv 实现的 watchdog,背景主要是因为 Node.js 是单线程的,一旦主线程繁忙或者陷入死循环,那么整个进程都无法工作了。

虽然 Node.js 在 JS 层实现了子线程模块,但是因为子线程持有单独的 V8 Isolate 和 loop,所以存在很多限制。比如我们希望在主线程繁忙时还能收集到主线程的一些数据(进程 / V8 内存、CPU 使用情况等)。这时候就需要使用 addon 去实现,addon 虽然是 C、C++ 等语言写的,但是依然跑在主线程里。所以我们需要使用底层提供的原生子线程加上 addon 实现我们的需求。本文正是基于这种场景的 watchdog。

首先看一下整体的类结构。

52d3542686a6d36389e834a1f360a18438f334.png

下面看一下每个类的作用和实现。

1.TaskManager

因为存在主线程和子线程,线程间会互相向对方提交任务,这就涉及到多线程互斥访问的问题。TaskManager 是负责管理任务的类,是线程安全的。

复制

class TaskManager {public:TaskManager() {uv_mutex_init(&tasks_mutex);};void add_task(task_cb cb, void * data) {struct task* t = new task;t->cb = cb;t->data = data;uv_mutex_lock(&tasks_mutex);tasks.push_back(t);uv_mutex_unlock(&tasks_mutex);};void handle_task() {std::vector<struct task*> task_list;uv_mutex_lock(&tasks_mutex);task_list.swap(tasks);uv_mutex_unlock(&tasks_mutex);std::vector<struct task*>::iterator iter;for(iter = task_list.begin(); task_list.end() != iter;)   {struct task* t = (*iter);t->cb(t->data);iter = task_list.erase(iter);delete t;}};private:std::vector<struct task*> tasks;uv_mutex_t tasks_mutex;};
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

实现很简单,主要是使用 uv_mutex_t 解决互斥问题,提供 add_task 和 handle_task 两个方法。add_task 是追加任务,handle_task 是处理任务。handle_task 具体的调用时机由 TaskManager 的持有者决定。

2.WatchDog

WatchDog 是对 Libuv 定时器的封装,核心数据结构是 watch_dog_ctx。

复制

typedef void (*watch_dog_cb)(void* ctx);struct watch_dog_ctx{  
   
   void * data; // 上下文
   watch_dog_cb work; // 任务函数   int poll_interval; // 定时时间};
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

watch_dog_ctx 是对一个定时任务的封装。watch_dog_ctx 的任务会在子线程中定时被执行,这就解决了主线程陷入繁忙时无法工作的问题。

复制

NodeWatchDog::WatchDog::WatchDog(uv_loop_t* loop, struct watch_dog_ctx* ctx) {this->ctx = ctx;uv_timer_init(loop, &timer);timer.data = this;is_stop = false;}void NodeWatchDog::WatchDog::start() {if (is_stop) {return;}uv_timer_start(&timer,[](uv_timer_t* timer) { 
            NodeWatchDog::WatchDog* watch_dog = (NodeWatchDog::WatchDog*)timer->data;struct watch_dog_ctx* ctx = watch_dog->get_ctx();ctx->work(ctx);}, 
        ctx->poll_interval, 
        ctx->poll_interval);};void NodeWatchDog::WatchDog::stop() {is_stop = true;uv_timer_stop(&timer);}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

3.WatchDogWorker

WatchDogWorker 是负责管理子线程和 watchdog 的类。首先看一下定义。

复制

class WatchDogWorker {public:WatchDogWorker();~WatchDogWorker() {};void start();void stop();void add_watchdog(struct watch_dog_ctx* watchdog);void add_task(task_cb cb, void * data);void handle_task();uv_loop_t* get_event_loop() {return &loop;}uv_sem_t * get_thread_sem() {return &sem;}private:uv_loop_t loop;uv_thread_t tid;uv_sem_t sem;TaskManager task_manager;uv_async_t notify_async;std::vector<WatchDog*> watch_dogs;};
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

1)add_watchdog 用于新增 watchdog。

复制

void NodeWatchDog::WatchDogWorker::add_watchdog(struct watch_dog_ctx* watchdog_ctx) {
   NodeWatchDog::WatchDog *watchdog = new NodeWatchDog::WatchDog(&loop, watchdog_ctx);
   watch_dogs.push_back(watchdog);}
  • 1.

  • 2.

  • 3.

  • 4.

2)start 函数用于创建子线程和启动 watchdog。

复制

void NodeWatchDog::WatchDogWorker::start() {std::vector<NodeWatchDog::WatchDog*>::iterator iter;for(iter = watch_dogs.begin(); watch_dogs.end() != iter; iter++)   {(*iter)->start();}int r = uv_thread_create(&tid, [](void *data) {NodeWatchDog::WatchDogWorker * worker = (NodeWatchDog::WatchDogWorker *)data;uv_sem_post((uv_sem_t*)worker->get_thread_sem());uv_loop_t * loop = worker->get_event_loop();uv_run(loop, UV_RUN_DEFAULT);uv_loop_close(loop);}, (void *)this);if (!r) {uv_sem_wait(&sem);} uv_sem_destroy(&sem);}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

启动 watchdog 时会往子线程的 loop 里插入一个定时器。然后创建子线程,在子线程里开启一个新的 loop。在这个 loop 里就会不断执行 watchdog 的任务。

3)add_task 用于其他线程外另一个线程插入一个任务。下面是任务的定义。

复制

typedef void (*task_cb)(void *data);struct task{task(){cb = nullptr;data = nullptr;}task_cb cb;void *data;};
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

定义很简单,一个工作函数和对应的上下文。接着看 add_task。

复制

void NodeWatchDog::WatchDogWorker::add_task(task_cb cb, void * data) {task_manager.add_task(cb, data);uv_async_send(&notify_async);}
  • 1.

  • 2.

  • 3.

  • 4.

add_task 直接调用 task_manager 的 add_task 函数插入任务,因为它保证了线程安全。接着通过 Libuv 提供的 async 线程间通信机制通知另一个线程有新任务。如果是在 Node.js 里的话,还需要通过另一种方式进行通知,这里就不具体展开。

此处,我们就实现了在子线程里定时执行任务,并实现主线程和子线程互相提交任务的能力。最终再实现一个 WatchDogManager 用于管理多个 worker。

4.WatchDogManager

复制

class WatchDogManager{public:WatchDogManager(uv_loop_t *loop);~WatchDogManager() {};void start();void stop();void add_task(task_cb cb, void *data);void handle_task();void add_worker(WatchDogWorker* worker);WatchDogWorker* get_worker(int index) { return workers[index]; };private:uv_async_t notify_async;TaskManager task_manager;std::vector<WatchDogWorker*> workers;};
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

通过 WatchDogManager 的类定义,我们大概就知道它的功能。首先看一下 start 方法。

复制

void NodeWatchDog::WatchDogManager::start(){
  std::vector<NodeWatchDog::WatchDogWorker*>::iterator iter;
  for(iter = workers.begin(); workers.end() != iter; iter++)   
  {  (*iter)->start();
  }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

start 函数就是启动多个 worker,刚才已经介绍过,worker 会创建一个子线程定时执行任务。其他方法就没有太多逻辑,就不一一介绍。

5.使用

接下来看看使用方式。

复制

#include "src/watch_dog_manager.h"#include "uv.h"#include "stdio.h"using namespace NodeWatchDog;int main() {setbuf(stdout, NULL);uv_loop_t loop;uv_loop_init(&loop);WatchDogWorker *worker = new WatchDogWorker();struct watch_dog_ctx* ctx = new watch_dog_ctx;ctx->work = [](void *ctx) {printf("worker task execute in thread id => %ld\n", (long)pthread_self());};ctx->poll_interval = 1000;worker->add_watchdog(ctx);worker->start();uv_idle_t idle;uv_idle_init(&loop, &idle);uv_idle_start(&idle, [](uv_idle_t *) {});printf("main thread id => %ld\n", (long)pthread_self());uv_run(&loop, UV_RUN_DEFAULT);delete worker;delete ctx;return 0;}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

新建一个 watchdog 和 worker,并把 watchdog 插入到 worker 中,最后启动 worker。同时主线程也进入自己的 loop。执行输出如下。

复制

main thread id => 4338490880worker task execute in thread id => 123145480077312worker task execute in thread id => 123145480077312
  • 1.

  • 2.

  • 3.

我们看到,printf 分别执行在不同的线程中。接下来再看一下复杂的场景。

复制

#include "src/watch_dog_manager.h"#include "uv.h"#include "stdio.h"using namespace NodeWatchDog;int main() {setbuf(stdout, NULL);uv_loop_t loop;uv_loop_init(&loop);WatchDogManager *manager = new WatchDogManager(&loop);WatchDogWorker *worker = new WatchDogWorker();struct watch_dog_ctx* ctx = new watch_dog_ctx;ctx->data = (void *)manager;ctx->work = [](void *ctx) {struct watch_dog_ctx* watchdog_ctx = (struct watch_dog_ctx*)ctx;WatchDogManager* manager = (WatchDogManager*)watchdog_ctx->data;// 提交任务给主线程
        manager->add_task([](void *data) {printf("manager task execute in thread id => %ld\n", (long)pthread_self());WatchDogManager* manager = (WatchDogManager*)data;// 提交任务给某个子线程
            manager->get_worker(0)->add_task([](void *data) {printf("worker task execute in thread id => %ld\n", (long)pthread_self());}, nullptr);}, manager);};ctx->poll_interval = 1000;worker->add_watchdog(ctx);manager->add_worker(worker);manager->start();uv_idle_t idle;uv_idle_init(&loop, &idle);uv_idle_start(&idle, [](uv_idle_t *) {//});printf("main thread id => %ld\n", (long)pthread_self());uv_run(&loop, UV_RUN_DEFAULT);delete manager;delete worker;delete ctx;return 0;}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

上面的例子中,当在子线程中执行回调时,通过 manager->add_task 往主线程提及一个任务。然后在主线程执行这个任务时,首先输出当前线程 id,再通过 manager->get_worker(0)->add_task 给子线程提交一个任务。比如我们希望定时收集主线程的 CPU 数据,那么就可以给子线程插入一个 watchdog,然后在 watchdog 回调里给主线程提交一个任务,当主线程执行这个任务的时候,我们就可以拿到主线程的 CPU 数据。当然还有更复杂的场景,比如获取 CPU Profile 时,主线程和子线程会进行多次交互。最后再看一个多个 worker 线程的例子。

复制

 WatchDogManager *manager = new WatchDogManager(&loop);
 WatchDogWorker *worker1 = new WatchDogWorker();
 WatchDogWorker *worker2 = new WatchDogWorker();
 struct watch_dog_ctx* ctx = new watch_dog_ctx;
 ctx->work = [](void *ctx) { printf("manager task execute in thread id => %ld\n", (long)pthread_self());
 };
 ctx->poll_interval = 1000;
 worker1->add_watchdog(ctx);
 worker2->add_watchdog(ctx);
 manager->add_worker(worker1);
 manager->add_worker(worker2);
 manager->start();
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

以上代码输出如下。

复制

main thread id => 4469550592manager task execute in thread id => 123145483321344manager task execute in thread id => 123145491722240manager task execute in thread id => 123145483321344manager task execute in thread id => 123145491722240
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

我们看到存在多个子线程,并且成功执行了任务。

后记:单线程使得编码变得简单,但是也导致了一些限制,这时候我们就需要使用额外的子线程来解决单线程存在的限制。引入多线程后,就需要解决多线程互斥和通信问题。以上代码只是介绍了一个大致的思路,还有地方需要完善,如果有想法的同学也可以交流。

仓库:https://github.com/theanarkh/uv-watchdog

14    2022-03-05 23:29:18    Libuv watchdog Node.js