当前位置: 首页 > 科技观察

一篇带你了解io_uring和Node.js的文章

时间:2023-03-14 20:17:52 科技观察

前言:io_uring是由JensAxboe开发的异步IO框架,在Linux内核5.1引入。这篇文章介绍了什么是异步框架和io_uring的一些基本内容,最后介绍一个关于node.js(Libuv)中io_uring的pr,之前有提到但是还没有合并。1io_uring简介在io_uring之前,Linux并没有成熟的异步IO能力。什么是异步IO?回想一下我们读取资源的过程,我们可以通过阻塞或者非阻塞的方式调用read和readv,或者通过epoll监听文件描述,以字符和事件的方式,在回调中调用read系列函数进行读取。这些API有一个共同点,无论是主动检测还是被动检测资源是否可读,当可读时,进程都需要自己执行读操作。io_uring的强大之处在于不需要进程自己主动进行读操作,而是内核在读完后通知进程。与epoll相比,io_uring更进了一步。类似的能力还有windows的IOCP。2io_uring基本使用2.1初始化io_uring和epoll一样,API不多,但是io_uring比epoll复杂很多。我们首先需要调用io_uring_setup来初始化io_uring并得到一个fd。intring_fd;unsigned*sring_tail,*sring_mask,*sring_array,*cring_head,*cring_tail,*cring_mask;structio_uring_sqe*sqes;structio_uring_cqe*cqes;charbuff[BLOCK_SZ];off_toffset;structio_uring_paramsp;m*sq_ptr,,sizeof(p));//得到io_uring对应的fditring_fd=io_uring_setup(QUEUE_DEPTH,&p);intsring_sz=p.sq_off.array+p.sq_entries*sizeof(unsigned);intcring_sz=p.cq_off.cqes+p。cq_entries*sizeof(structio_uring_cqe);//将ring_fd映射到mmap返回的地址,我们可以通过操作返回地址来操作ring_fd,达到用户和内核共享数据的目的cq_ptr=sq_ptr=mmap(0,sring_sz,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_POPULATE,ring_fd,IORING_OFF_SQ_RING);sqes=mmap(0,p.sq_entries*sizeof(structio_uring_sqe),PROT_READ|PROT_WRITE,MAP_SHARED|MAP_POPULATE,ring_fd,IORING_OFF_SQES);//保存任务队列的地址和完成队列,稍后提交获取任务并完成任务节点,需要使用sring_tail=sq_ptr+p.sq_off.tail;sring_mask=sq_ptr+p.sq_off.ring_mask;sring_array=sq_ptr+p.sq_off.array;cring_head=cq_ptr+p.cq_off.head;cring_tail=cq_ptr+p.cq_off.tail;cring_mask=cq_ptr+p.cq_off.ring_mask;cqes=cq_ptr+p.cq_off.cqes;io_uring不仅实现起来很复杂,使用起来也很复杂,但目前只需要对原理有一个大概的了解即可。上述代码的主要用途如下。1birth之后的io_uring,得到一个io_uring实例对应的fd。2将io_uring对应的fd通过mmap映射到一个内存地址,然后我们就可以通过操作内存地址与内核进行通信了。3保存任务队列和完成队列的地址信息,后面会用到。2.2提交任务我们看到io_uring底层维护了任务队列(sq)和完成队列(completionqueue)两个队列(cq)。相应的节点称为sqe和cqe。当我们需要操作某个资源时,我们可以获取一个seq,填写字段,然后提交给内核。我们来看看sqe的核心字段。structio_uring_sqe{__u8opcode;/*操作类型,如读写*/__s32fd;/*资源对应的fd*/__u64off;/*资源的偏移量(操作起点)*/__u64addr;/*保存数据的第一个内存地址*/__u32len;/*数据长度*/__u64user_data;/*用户自定义字段,通常用于关联请求和响应*/__u8flags;/*Flags*/...};io_uring_sqe的核心字段比较好理解,structure一个请求发出后,插入到内核的请求任务队列中。然后调用io_uring_enter通知内核有任务需要处理。我们可以设置在返回之前调用io_uring_enter时等待多少请求。此外,内核处理轮询模式。此时内核会启动内核线程检测任务是否完成,进程不需要调用io_uring_enter。下面是我们发送读取请求的逻辑。unsignedindex,tail;tail=*sring_tail;//在请求队列中获取空闲位置,是一个环,需要环回index=tail&*sring_mask;structio_uring_sqe*sqe=&sqes[index];//初始化请求structuresqe->opcode=op;//读取fdsqe->fd=fd;//将读取到的数据保存到buff中//响应时关联buff即可找到对应的请求上下文sqe->addr=(unsignedlong)buff;sqe->user_data=(unsignedlonglong)buff;memset(buff,0,sizeof(buff));sqe->len=BLOCK_SZ;sqe->off=offset;//插入请求队列sring_array[index]=index;//updateindextail++;//通知内核有任务需要处理,等待任务完成后返回io_uring_smp_store_release(sring_tail,tail);intret=io_uring_enter(ring_fd,1,1,IORING_ENTER_GETEVENTS);2.3任务完成时当任务完成时,io_uring_enter会返回。但是这里有一个问题。请求任务和响应不对应。内核不保证任务完成的顺序。内核只是告诉我们哪些任务已经完成。我们可以通过user_data关联请求和响应,类似于rpc通信中的seq。user_data字段在request中设置,会在response中返回,让请求者知道response对应的是哪个request。响应对应的结构比较简单。structio_uring_cqe{/*用户自定义字段,通常用于关联请求和响应*/__u64user_data;/*系统调用的返回码,如read*/__s32res;//暂时不用__u32flags;};我们这里假设请求和响应是串行的,所以不需要使用user_data字段来关联请求和响应。从前面的代码可以看出,我们将数据读入了buff变量。我们来看看内核返回后我们的处理逻辑。structio_uring_cqe*cqe;unsignedhead,reaped=0;//获取完成队列的头节点,消费buff中存储的数据head=io_uring_smp_load_acquire(cring_head);cqe=&cqes[head&(*cring_mask)];//更新头部索引head++;io_uring_smp_store_release(cring_head,head);这是io_uring一次读操作的大致流程。我们看到用户层的逻辑还是比较复杂的,作者也想到了,所以封装了Libring库,简化了使用。3Liburing的使用那么我们怎么使用呢,我们来回忆一下epoll的使用。//创建epoll实例intepollfd=epoll_create();//封装fd并订阅事件structepoll_eventevent;event.events=EPOLLIN;event.data.fd=listenFd;//注册到epollpoll_ctl(epollfd,EPOLL_CTL_ADD,listenFd,&event);//等待事件触发intnum=epoll_wait(epollfd,events,MAX_EVENTS,-1);for(i=0;ifd=fd;//初始化uv__handle_init(loop,&backend_data->poll_handle,UV_POLL);backend_data->poll_handle.flags|=UV_HANDLE_INTERNAL;//初始化poll_handle的io观察者,fd为io_uring的fd,回调为uv__io_uring_done。uv__io_init(&backend_data->poll_handle.io_watcher,uv__io_uring_done,ring->ring_fd);loop->flags|=UV_LOOP_USE_IOURING;loop->backend.data=backend_data;我们看到初始化了io_uring,在初始化By的时候初始化了一个ioobservation。让我们看看接下来在哪里使用它。intuv_fs_read(uv_loop_t*loop,uv_fs_t*req,uv_filefile,constuv_buf_tbu??fs[],unsignedintnbufs,int64_toff,uv_fs_cbcb){intrc;INIT(READ);req->file=file;req->nbufs=nbufs;req->bufs=req->bufsml;memcpy(req->bufs,bufs,nbufs*sizeof(*bufs));req->off=off;/*先调用uv__platform_fs_read,如果不支持,再降级到原来的线程池方案staticintuv__fs_retry_with_threadpool(intrc){returnrc==UV_ENOSYS||rc==UV_ENOTSUP||rc==UV_ENOMEM;}*/rc=uv__platform_fs_read(loop,req,file,bufs,nbufs,off,cb);if(!uv__fs_retry_with_threadpool(rc))returnrc;//这里来说明降级方案POST的使用;}uv_fs_read函数是读取文件内容时执行的函数。在提交任务到线程池之前,经过修改,增加了一个前置逻辑uvplatform_fs_read。让我们看看uvplatform_fs_read。intuv__platform_fs_read(uv_loop_t*loop,uv_fs_t*req,uv_os_fd_tfile,constuv_buf_tbu??fs[],unsignedintnbufs,int64_toff,uv_fs_cbcb){returnuv__io_uring_fs_work(IORING_OP_READV,loop,req,file,bufs,nbufs,off,cb);}intuv__io_uring_fs_work(uint8_topcode,uv_loop_t*loop,uv_fs_t*req,uv_os_fd_tfile,constuv_buf_tbu??fs[],unsignedintnbufs,int64_toff,uv_fs_cbcb){structuv__backend_data_io_uring*backend_data;structio_uring_sqe*sqe;intsubmitted;uint32_tincr_val;uv_poll_t*handle;backend_data=loop->backend2->backend2->backend_data=loop->backend2>pending+1;//得到一个请求结构体sqe=io_uring_get_sqe(&backend_data->ring);//初始化请求sqe->opcode=opcode;sqe->fd=file;sqe->off=off;sqe->addr=(uint64_t)req->bufs;sqe->len=nbufs;//管理req上下文,任务完成时会使用sqe->user_data=(uint64_t)req;//提交给内核,非-阻塞调用,返回提交任务数submitted=io_uring_submit(&backend_data->ring);//提交成功if(submitted==1){req->priv.fs_req_engine|=UV__ENGINE_IOURING;//提交时是第一个任务,然后注册io观察者等待可读事件if(backend_data->;pending++==0){handle=&backend_data->poll_handle;uv__io_start(loop,&handle->io_watcher,POLLIN);uv__handle_start(handle);}return0;}returnUV__ERR(errno);}我们看到上面的代码会给出kernel提交一个任务,但不等待内核返回,在提交第一个任务时注册一个可读事件给epoll。下面看一下io_uring的poll接口的实现(epoll的原理可以参考上一篇文章)。static__poll_tio_uring_poll(structfile*file,poll_table*wait){structio_ring_ctx*ctx=file->private_data;__poll_tmask=0;poll_wait(file,&ctx->cq_wait,wait);smp_rmb();//如果提交队列未满,可以这样写if(READ_ONCE(ctx->rings->sq.tail)-ctx->cached_sq_head!=ctx->rings->sq_ring_entries)mask|=EPOLLOUT|EPOLLWRNORM;//如果完成队列不为空,则可以读取if(io_cqring_events(ctx,false))mask|=EPOLLIN|EPOLLRDNORM;returnmask;}staticunsignedio_cqring_events(structio_ring_ctx*ctx,boolnoflush){structio_rings*rings=ctx->rings;smp_rmb();//完整队列是不为空,则可以读取returnctx->cached_cq_ONtail-READ_(rings->cq.head);}所以当io_uring有任务完成,即完成队列不为空时,会在libuv的poll中检测io,回调将被执行。voiduv_io_uring_done(uv_loop_t*loop,uv_io_t*w,unsignedintevents){uv_poll_t*handle;structio_uring*ring;structuv__backend_data_io_uring*backend_data;structio_uring_cqe*cqe;uv_fs_t*req;intfinished1;handle=container_of(w,uv_poll_t,io_watcher);backend_data=loop->backend.data;ring=&backend_data->ring;finished1=0;while(1){//获取完成节点io_uring_peek_cqe(ring,&cqe);//所有任务完成,注销事件if(--backend_data->pending==0)uv_poll_stop(handle);//获取响应对应的请求上下文req=(void*)(uintptr_t)cqe->user_data;if(req->result==0)req->result=cqe->res;io_uring_cq_advance(ring,1);//执行回调req->cb(req);}}至此我们已经看到了这个pr的逻辑,主要是引入io_uring用于文件io,由于兼容性问题在Libuv中使用线程实现了pool,io_uring支持普通文件,自然可以在新版linux上用来替代线程池方案。后记:io_uring既强大又复杂。一切都由内核处理,完成后我们会得到通知。不仅我们不再需要手动执行read,而且还减少了系统调用的开销,尤其是在需要多次读取的时候。看起来很棒,io_uring---Linux上真正的异步IO。但是里面包含的知识远不止于此,有空我会更新的。