当前位置: 首页 > 科技观察

Node.jsC++层任务管理

时间:2023-03-17 12:44:02 科技观察

好久没更新了,今天写个笔记。我们都知道Node.js是基于事件循环运行的,本质上是生产者/消费者模型,所以任务管理机制是必不可少的。不过本文不介绍事件循环中的任务管理,而是介绍C++层的任务管理。本文主要介绍SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook这四个API产生的任务。时间关系,随便写,权当笔记。任务管理机制的初始化首先我们看一下Node.js启动过程中任务管理相关的逻辑。uv_check_start(immediate_check_handle(),CheckImmediate)uv_async_init(event_loop(),&task_queues_async_,[](uv_async_t*async){Environment*env=ContainerOf(&Environment::task_queues_async_,async);env->RunAndClearNativeImmediates();})CheckImmediate是在check阶段执行的函数,task_queues_async_用于线程间通信,即子线程向主线程提交任务时,通过task_queues_async_通知主线程,然后主线程执行由注册的回调uv_async_init。上面的代码是消费者的逻辑。后面会详细分析里面的处理流程。提交任务接下来我们一一看一下生产者的逻辑。模板voidEnvironment::SetImmediate(Fn&&cb,CallbackFlags::Flagsflags){autocallback=native_immediates_.CreateCallback(std::move(cb),flags);native_immediates_.Push(std::move(callback));//...}SetImmediate用于同一线程中的代码提交任务。模板voidEnvironment::SetImmediateThreadsafe(Fn&&cb,CallbackFlags::Flagsflags){autocallback=native_immediates_threadsafe_.CreateCallback(std::move(cb),flags);{Mutex::ScopedLock锁(native_immediates_thread_safe_native_immediates_threadsafe_).Push(std::move(callback));如果(task_queues_async_initialized_)uv_async_send(&task_queues_async_);}}SetImmediateThreadsafe用于子线程向主线程提交任务,所以需要加锁。模板voidEnvironment::RequestInterrupt(Fn&&cb){autocallback=native_immediates_interrupts_.CreateCallback(std::move(cb),CallbackFlags::kRefed);{Mutex::ScopedLock锁(native_immediates_threadsafe_mutex_);native_immediates(中断std::move(回调));如果(task_queues_async_initialized_)uv_async_send(&task_queues_async_);}RequestInterruptFromV8();}RequestInterrupt用于子线程向主线程提交代码。他和SetImmediateThreadsafe一个重要的区别就是调用了RequestInterruptFromV8。voidEnvironment::RequestInterruptFromV8(){isolate()->RequestInterrupt([](Isolate*isolate,void*data){std::unique_ptrenv_ptr{static_cast(data)};环境*env=*env_ptr;env->RunAndClearInterrupts();},interrupt_data);}RequestInterrupt可以让提交的代码在JS代码死循环时依然执行。接下来看AddCleanupHook。voidEnvironment::AddCleanupHook(CleanupQueue::Callbackfn,void*arg){cleanup_queue_.Add(fn,arg);}AddCleanupHook用于注册线程退出前的回调。生产者的逻辑比较简单,就是往任务队列中插入一个任务,如果涉及到线程间的任务,就通知主线程。消费者接下来看消费者的逻辑。根据前面的分析,我们可以知道消费者有几个:CheckImmediate、task_queues_async_处理函数、RequestInterrupt注册函数、退出前回调处理函数。先看看CheckImmediate。voidEnvironment::CheckImmediate(uv_check_t*handle){Environment*env=Environment::from_immediate_check_handle(handle);env->RunAndClearNativeImmediates();}voidEnvironment::RunAndClearNativeImmediates(boolonly_refed){RunAndClearInterrupts();autodrain_list=[&](NativeImmediateQueue*queue){while(autohead=queue->Shift()){head->Call(this);}返回假;};while(drain_list(&native_immediates_)){}NativeImmediateQueuethreadsafe_immediates;如果(native_immediates_threadsafe_.size()>0){Mutex::ScopedLock锁(native_immediates_threadsafe_mutex_);threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));}while(drain_list(&threadsafe_immediates)){}}voidEnvironment::RunAndClearInterrupts(){while(native_immediates_interrupts_.size()>0){NativeImmediateQueue队列;{Mutex::ScopedLock锁(native_immediates_threadsafe_mutex_);queue.ConcatMove(std::move(native_immediates_interrupts_));}while(autohead=queue.Shift())head->Call(this);}}CheckImmediate函数处理SetImmediate、SetImmediateThreadsafe和RequestInterrupt产生的任务但是如果主线程阻塞在PollIO阶段时,提交任务时只有子线程会唤醒主线程,具体通过task_queues_async_结构,看处理函数。env->RunAndClearNativeImmediates();可以看到SetImmediate、SetImmediateThreadsafe和RequestInterrupt产生的任务也是在这个时候处理的。最后,让我们看一下退出前处理回调的函数。具体时机在FreeEnvironment函数中的env->RunCleanup()。voidEnvironment::RunCleanup(){RunAndClearNativeImmediates(true);while(!cleanup_queue_.empty()||principal_realm_->HasCleanupHooks()||native_immediates_.size()>0||native_immediates_threadsafe_.size()>0||native_immediates_interrupts_.size()>0){//见CleanupQueue::Draincleanup_queue_.Drain();RunAndClearNativeImmediates(真);}}//cleanup_queue_.Drain();voidCleanupQueue::Drain(){std::vector回调(cleanup_hooks_.begin(),cleanup_hooks_.end());std::sort(callbacks.begin(),callbacks.end(),[](constCleanupHookCallback&a,constCleanupHookCallback&b){returna.insertion_order_counter_>b.insertion_order_counter_;});for(constCleanupHookCallback&cb:callbacks){cb.fn_(cb.arg_);cleanup_hooks_.erase(cb);}}RunCleanup中同时处理了SetImmediate、SetImmediateThreadsafe、RequestInterrupt生成的任务和注册的预退出回调