当前位置: 首页 > 科技观察

说说用Libuv实现的Watchdog,你学会了吗?

时间:2023-03-22 17:01:31 科技观察

watchdog的概念就是每隔一段时间做一些事情。您可以在线搜索特定概念。本文主要介绍使用Libuv实现的watchdog。后台主要是因为Node.js是单线程的。一旦主线程忙碌或者陷入死循环,整个流程就无法进行。Node.js虽然在JS层实现了子线程模块,但是由于子线程持有单独的V8Isolate和loop,所以有很多限制。比如我们希望在主线程繁忙的时候收集主线程的一些数据(进程/V8内存,CPU使用率等)。这时候就需要使用addon来实现了。虽然addon是用C、C++等语言编写的,但它仍然运行在主线程中。所以我们需要使用底层加上addon提供的原生子线程来实现我们的需求。本文就是基于这种场景的看门狗。首先看整体的类结构。下面我们来看一下各个类的作用和实现。1、由于TaskManager中主线程和子线程的存在,线程之间会相互提交任务,这就涉及到多线程互斥访问的问题。TaskManager是负责管理任务的类,是线程安全的。类TaskManager{public:TaskManager(){uv_mutex_init(&tasks_mutex);};voidadd_task(task_cbcb,void*data){structtask*t=newtask;t->cb=cb;t->数据=数据;uv_mutex_lock(&tasks_mutex);tasks.push_back(t);uv_mutex_unlock(&tasks_mutex);};voidhandle_task(){std::vectortask_list;uv_mutex_lock(&tasks_mutex);task_list.swap(任务);uv_mutex_unlock(&tasks_mutex);std::vector::iteratoriter;for(iter=task_list.begin();task_list.end()!=iter;){structtask*t=(*iter);t->cb(t->数据);iter=task_list.erase(iter);删除吨;}};私有:std::vectortasks;uv_mutex_ttasks_mutex;};实现很简单,主要是利用uv_mutex_t解决互斥问题,提供add_task和handle_task两个方法add_task是附加任务,handle_task是处理任务。handle_task的具体调用时机由TaskManager的持有者决定。2.WatchDogWatchDog是对Libuv定时器的封装,核心数据结构是watch_dog_ctx。typedefvoid(*watch_dog_cb)(void*ctx);structwatch_dog_ctx{void*data;//上下文watch_dog_cb工作;//任务函数intpoll_interval;//计时时间};watch_dog_ctx是一个定时任务的封装。watch_dog_ctx的任务会在子线程中定时执行,解决了主线程繁忙时无法工作的问题。NodeWatchDog::WatchDog::WatchDog(uv_loop_t*loop,structwatch_dog_ctx*ctx){this->ctx=ctx;uv_timer_init(loop,&timer);计时器。数据=这个;is_stop=false;}voidNodeWatchDog::WatchDog::start(){如果(is_stop){返回;}uv_timer_start(&timer,[](uv_timer_t*timer){NodeWatchDog::WatchDog*watch_dog=(NodeWatchDog::WatchDog*)timer->data;structwatch_dog_ctx*ctx=watch_dog->get_ctx();ctx->work(ctx);},ctx->poll_interval,ctx->poll_interval);};voidNodeWatchDog::WatchDog::stop(){is_stop=true;uv_timer_stop(&timer);}3.WatchDogWorkerWatchDogWorker是负载管理子线程序和看门狗的类。首先看一下确定义。类WatchDogWorker{public:WatchDogWorker();~WatchDogWorker(){};无效开始();无效停止();voidadd_watchdog(structwatch_dog_ctx*看门狗);voidadd_task(task_cbcb,void*data);voidhandle_task();uv_loop_t*get_event_loop(){返回&loop;}uv_sem_t*get_thread_sem(){返回&sem;私人:uv_loop_t循环;uv_thread_ttid;uv_sem_tsem;任务管理器task_manager;uv_async_t通知异步;std::vectorwatch_dogs;};1)add_watchdog用于新增看门狗。voidNodeWatchDog::WatchDogWorker::add_watchdog(structwatch_dog_ctx*watchdog_ctx){NodeWatchDog::WatchDog*watchdog=newNodeWatchDog::WatchDog(&loop,看门狗_ctx);watch_dogs.push_back(watchdog);}2)start函数用于创建子线程序和启动watchdog。voidNodeWatchDog::WatchDogWorker::start(){std::vector::iteratoriter;for(iter=watch_dogs.begin();watch_dogs.end()!=iter;iter++){(*iter)->start();}intr=uv_thread_create(&tid,[](void*data){NodeWatchDog::WatchDogWorker*worker=(NodeWatchDog::WatchDogWorker*)data;uv_sem_post((uv_sem_t*)worker->get_thread_sem());uv_loop_t*loop=worker->get_event_loop();uv_run(loop,UV_RUN_DEFAULT);uv_loop_close(loop);},(void*)this);如果(!r){uv_sem_wait(&sem);}uv_sem_destroy(&sem);}启动看门狗时,会在子线程的循环中插入一个定时器。然后创建一个子线程,并在子线程中开始一个新的循环。在这个循环中,watchdog的任务会不断的执行。3)add_task用于在其他线程之外插入另一个线程的任务。下面是任务的定义。typedefvoid(*task_cb)(void*data);structtask{task(){cb=nullptr;数据=空指针;}task_cbcb;无效*数据;};定义很简单,一个工作函数和对应的上下文。然后看add_task。voidNodeWatchDog::WatchDogWorker::add_task(task_cbcb,void*data){task_manager.add_task(cb,data);uv_async_send(¬ify_async);}add_task直接调用task_manager的add_task函数插入任务,因为这样保证了线程安全。然后通过Libuv提供的异步线程间通信机制通知另一个线程有新任务。如果是在Node.js中,则需要通过其他方式通知,这里不再详述。到这里,我们就实现了子线程中任务的定时执行,实现了主线程和子线程相互提交任务的能力。最后,实现一个WatchDogManager来管理多个worker。4.WatchDogManagerclassWatchDogManager{public:WatchDogManager(uv_loop_t*loop);~WatchDogManager(){};无效开始();无效停止();voidadd_task(task_cbcb,void*data);voidhandle_task();voidadd_worker(WatchDogWorker*worker);WatchDogWorker*get_worker(intindex){returnworkers[index];};私有:uv_async_tnotify_async;任务管理器task_manager;std::vector工人;};知道它的功能。先看start方法。voidNodeWatchDog::WatchDogManager::start(){std::vector::iteratoriter;for(iter=workers.begin();workers.end()!=iter;iter++){(*iter)->start();}}start函数是启动多个worker。如前所述,一个worker会创建一个子线程来定时执行任务。其他方法逻辑不多,就不一一介绍了。5.使用接下来,让我们看看如何使用它。#include"src/watch_dog_manager.h"#include"uv.h"#include"stdio.h"使用命名空间NodeWatchDog;intmain(){setbuf(stdout,NULL);uv_loop_t循环;uv_loop_init(&loop);WatchDogWorker*worker=newWatchDogWorker();structwatch_dog_ctx*ctx=newwatch_dog_ctx;ctx->work=[](void*ctx){printf("workertaskexecuteinthreadid=>%ld\n",(long)pthread_self());};ctx->poll_interval=1000;worker->add_watchdog(ctx);工人->开始();uv_idle_t闲置;uv_idle_init(&loop,&idle);uv_idle_start(&idle,[](uv_idle_t*){});printf("主线程id=>%ld\n",(long)pthread_self());uv_run(&loop,UV_RUN_DEFAULT);删除工人;删除ctx;return0;}新建看门狗和worker,并将watchdog插入worker,最后启动worker。与此同时,主线程也进入了自己的循环。执行输出如下。mainthreadid=>4338490880workertaskexecuteinthreadid=>123145480077312workertaskexecuteinthreadid=>123145480077312我们看到printf是在不同的线程中执行的。让我们看看更复杂的场景。#include"src/watch_dog_manager.h"#include"uv.h"#include"stdio.h"usingnamespaceNodeWatchDog;intmain(){setbuf(stdout,NULL);uv_loop_t循环;uv_loop_init(&loop);WatchDogManager*manager=newWatchDogManager(&loop);WatchDogWorker*worker=newWatchDogWorker();structwatch_dog_ctx*ctx=newwatch_dog_ctx;ctx->data=(void*)manager;ctx->work=[](void*ctx){structwatch_dog_ctx*watchdog_ctx=(structwatch_dog_ctx*)ctx;WatchDogManager*manager=(WatchDogManager*)watchdog_ctx->data;//提交任务给主线程序manager->add_task([](void*data){printf("managertaskexecuteinthreadid=>%ld\n",(long)pthread_self());WatchDogManager*manager=(WatchDogManager*)data;//提交任务给某个子线程序manager->get_worker(0)->add_task([](void*data){printf("workertaskexecuteinthreadid=>%ld\n",(长)pthread_self());},空指针);},经理);};ctx->poll_interval=1000;worker->add_watchdog(ctx);经理->添加工人(工人);经理->开始();uv_idle_t闲置;uv_idle_init(&loop,&idle);uv_idle_start(&idle,[](uv_idle_t*){//});printf("主线程id=>%ld\n",(long)pthread_self());uv_run(&loop,UV_RUN_DEFAULT);删除经理;删除工人;删除ctx;return0;}在上面的例子中,当回调在子线程中执行时,通过manager->add_task向主线程提及一个任务,然后当主线程执行任务时,首先输出当前线程id,然后通过manager->get_worker(0)->add_task向子线程提交任务。比如我们要定时采集主线程的CPU数据,可以在子线程中插入看门狗,然后在看门狗回调中向主线程提交任务。当主线程执行这个任务的时候,我们就可以获得主线程的CPU。数据。当然还有更复杂的场景,比如获取CPUProfile时,主线程和子线程会进行多次交互。最后,看一个多工作线程的例子。WatchDogManager*manager=newWatchDogManager(&loop);WatchDogWorker*worker1=newWatchDogWorker();WatchDogWorker*worker2=newWatchDogWorker();structwatch_dog_ctx*ctx=newwatch_dog_ctx;ctx->work=[](void*ctx){printf("管理任务在线程id=>%ld\n中执行",(long)pthread_self());};ctx->poll_interval=1000;worker1->add_watchdog(ctx);worker2->add_watchdog(ctx);经理->add_worker(worker1);经理->add_worker(worker2);经理->开始();上述代码的输出如下。mainthreadid=>4469550592managertaskexecuteinthreadid=>123145483321344managertaskexecuteinthreadid=>123145491722240managertaskexecuteinthreadid=>123145483321344managertaskexecuteinthreadid=>123145491722240我们看到存在多个子线程,并且成功执行了任务。后记:单线程使编码更容易,但也导致了一些局限性。这时候就需要使用额外的子线程来解决单线程的局限性。引入多线程后,就要解决多线程的互斥和通信问题。上面的代码只是介绍了一个大概的思路,还有需要改进的地方。有想法的同学也可以交流。仓库:https://github.com/theanarkh/uv-watchdog