前言:最近研究了io_uring这个Linux下的高性能异步IO框架,并尝试将其引入到Node.js中进行应用。所以本文打算介绍内核中io_uring的实现,因为io_uring的实现代码量大,逻辑复杂,只能慢慢分析。本文介绍io_uring初始化接口io_uring_setup的实现。staticlongio_uring_setup(u32entries,structio_uring_params__user*params){structio_uring_paramsp;inti;if(copy_from_user(&p,params,sizeof(p)))return-EFAULT;//支持的flagif(p.flags&~(IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL|IORING_SETUP_SQING_SQINGIORING_SETUP_CLAMP|IORING_SETUP_ATTACH_WQ))return-EINVAL;returnio_uring_create(entries,&p,params);}io_uring_setup是对io_uring_create的封装。第一个参数entries指定请求队列的长度,第二个参数params是用于调用者和内核之间通信的结构体。让我们看看定义。structio_uring_params{//定义请求队列长度(2的sq_entries次方),调用者定义__u32sq_entries;//完成队列长度,默认为2*请求队列长度__u32cq_entries;//控制内核行为的标志__u32flags;//Thecpu__u32sq_thread_cpuboundtothekernelthreadenabledinpollmode;//启用poll模式的内核线程的空闲时间之后会被挂起。__u32sq_thread_idle;//内核当前支持的能力,内核设置__u32features;__u32wq_fd;__u32resv[3];//记录内核数据的结构,调用者在后续调用mmap时需要用到。structio_sqring_offsetssq_off;structio_cqring_offsetscq_off;};让我们看看io_uring_create。staticintio_uring_create(unsignedentries,structio_uring_params*p,structio_uring_params__user*params){structuser_struct*user=NULL;structio_ring_ctx*ctx;boollimit_mem;intret;p->sq_entries=roundup_pow_of_two(entries);){p->cq_entries=roundup_pow_of_two(p->cq_entries);//完成队列不能小于请求队列if(p->cq_entriessq_entries)return-EINVAL;//如果超过阈值,需要设置IORING_SETUP_CLAMPflagif(p->cq_entries>IORING_MAX_CQ_ENTRIES){if(!(p->flags&IORING_SETUP_CLAMP))return-EINVAL;p->cq_entries=IORING_MAX_CQ_ENTRIES;}}else{//默认为两倍requestqueuelengthp->cq_entries=2*p->sq_entries;}//用户信息user=get_uid(current_user());//分配一个ctx记录context,因为调用者只能获取fd,关联的context会ctx=io_ring_ctx_alloc(p);ctx->user=user;//轮询方式相关的数据结构ctx->sqo_task=get_task_struct(current);//分配一个io_ringsret=io_allocate_scq_urings(ctx,p);//处理poll方式的逻辑ret=io_sq_offload_start(ctx,p);//后面还有很多,后面再分析}io_uring_create里面的代码很多,我们分析ste一步一步。首先分配了一个io_ring_ctx结构体,它是核心数据结构体,用于记录io_uring实例的上下文,但是我们暂时不需要了解它的具体定义,因为太多了,而我们本文只关注相关领域。1分配一个io_rings结构,然后调用io_allocate_scq_urings分配一个io_rings结构。这是非常核心的逻辑。我们看一下io_rings的定义。structio_rings{structio_uringsq,cq;u32sq_ring_mask,cq_ring_mask;u32sq_ring_entries,cq_ring_entries;u32sq_dropped;u32sq_flags;u32cq_flags;u32cq_overflow;主记录信息完成队列和structio_uring_cqecqes[];};我们继续看io_allocate_scq_urings。staticintio_allocate_scq_urings(structio_ring_ctx*ctx,structio_uring_params*p){structio_rings*rings;size_tsize,sq_array_offset;//记录请求和完成队列大小到ctxctx->sq_entries=p->sq_entries;ctx->cq_entries=p->cq_entries;计算结构体和额外数组的大小,sq_array_offset保存结构体的大小,size保存结构体+额外数组+另一个额外数组的大小*/size=rings_size(p->sq_entries,p->cq_entries,&sq_array_offset);//Allocatingmemoryrings=io_mem_alloc(size);//...}io_allocate_scq_urings有很多细节,我们分开来分析,看看rings_size的逻辑。staticunsignedlongrings_size(unsignedsq_entries,unsignedcq_entries,size_t*sq_offset){structio_rings*rings;size_toff,sq_array_size;//计算结构体和额外数组的大小,见io_rings定义off=struct_size(rings,cqes,cq_entries);set/record/sq_offsetsizeif(sq_offset)*sq_offset=off;//计算多个u32元素组成的数组的大小sq_array_size=array_size(sizeof(u32),sq_entries);//计算结构体的大小+sq_array_size保存到offif(check_add_overflow(off,sq_array_size,&off))returnSIZE_MAX;returnoff;}struct_size是一个宏,用于计算结构和附加字段的大小。我们刚才看到在io_rings结构体的定义中,最后一个字段是structio_uring_cqecqes[],看起来是一个空数组。其实它的内存是紧跟在结构体之后分配的,结构体如下。让我们看看struct_size是如何计算的。#definestruct_size(p,member,count)\__ab_c_size(count,\sizeof(*(p)->member)+__must_be_array((p)->member),\sizeof(*(p)))staticinline__must_checksize_t__ab_c_size(size_ta,size_tb,size_tc){size_tbytes;//计算a*b并保存到bytesif(check_mul_overflow(a,b,&bytes))returnSIZE_MAX;//计算bytes+c并保存到bytesif(check_add_overflow(bytes,c,&bytes))returnSIZE_MAX;返回字节;}我们看到计算方式是数组元素的大小*元素个数+结构本身的大小。计算完结构体的大小后,通过array_size计算另外一个数组的大小并相加,所以io_rings的结构体如下。分配好io_rings后,我们继续看下逻辑。staticintio_allocate_scq_urings(structio_ring_ctx*ctx,structio_uring_params*p){//...//在ctx->rings=rings中记录到ctx;//在rings结构中记录sq_array,u32数组的首地址ctx->sq_array=(u32*)((char*)rings+sq_array_offset);//用于环回处理rings->sq_ring_mask=p->sq_entries-1;rings->cq_ring_mask=p->cq_entries-1;//队列长度rings->sq_ring_entries=p->sq_entries;rings->cq_ring_entries=p->cq_entries;ctx->sq_mask=rings->sq_ring_mask;ctx->cq_mask=rings->cq_ring_mask;//请求队列的数组大小size=array_size(sizeof(structio_uring_sqe),p->sq_entries);//分配内存,记录到sq_sqesctx->sq_sqes=io_mem_alloc(size);return0;}经过一系列的设置,结构如下。创建完io_rings结构后,我们继续回到io_uring_create。2设置io_uring_params内核申请完一系列结构后,需要通过io_uring_params结构返回给调用者。staticintio_uring_create(unsignedentries,structio_uring_params*p,structio_uring_params__user*params){ret=io_allocate_scq_urings(ctx,p);0,sizeof(p->sq_off));//记录信阶段的结构移位p->sq_off.head=offsetof(structio_rings,sq.head);p->sq_off.tail=offsetof(structio_rings,sq.tail);p->sq_off.ring_mask=offsetof(structio_rings,sq_ring_mask);p->sq_off.ring_entries=offsetof(structio_rings,sq_ring_entries);p->sq_off.flags=offsetof(structio_rings,sq_flags);p->sq_off。dropped=offsetof(structio_rings,sq_dropped);p->sq_off.array=(char*)ctx->sq_array-(char*)ctx->rings;memset(&p->cq_off,0,sizeof(p->cq_off));p->cq_off.head=offsetof(structio_rings,cq.head);p->cq_off.tail=offsetof(structio_rings,cq.tail);p->cq_off.ring_mask=offsetof(structio_rings,cq_ring_mask);p->cq_off.ring_entries=offsetof(structio_rings,cq_ring_entries);p->cq_off.overflow=offsetof(structio_rings,cq_overflow);p->cq_off.cqes=offsetof(structio_rings,cqes);p->cq_off.flags=offsetof(structio_rings,cq_flags);//内核支持的属性p->features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|ioring_feat_submit_stable|ioring_feat_rw_cur_pos|ioring_feat_cur_personality|ioring_feat_fact_fast_poll|ioring_feat_feat_poll_32bits;copy_to_to_user(params,param,sizeof(sizeof(*p))文件描述符内核通过io_uring_get_fd获取文件描述符返回给调用者。staticintio_uring_get_fd(structio_ring_ctx*ctx){structfile*file;//获取一个可用的fdintret=get_unused_fd_flags(O_RDWR|O_CLOEXEC);//分配一个文件结构,设置函数集为io_uring_fops,并关联上下文ctxfile=anon_inode_getfile("[io_uring]",&io_uring_fops,ctx,O_RDWR|O_CLOEXEC);//关联fd和file结构fd_install(ret,file);returnret;}io_uring_get_fd申请了一个fd和file,遵循vfs的设计,最重要的是把io_uring函数集挂在文件上。当通过fd操作io_uring实例时,会在经过vfs后执行相应的函数。另外,需要将ctx与file关联起来,因为通过fd操作io_uring时,需要获取fd对应的io_uring上下文。迄今为止。io_uring_setup解析完毕,但还不能使用。在io_uring的设计中,为了降低系统调用和用户与内核数据通信的开销,用户和内核共享数据结构,使得用户和内核可以操作同一个数据结构来达到通信目的,而无需使用系统调用。无需来回复制设计。为了达到这个目的,用户在拿到io_uring实例之后,还需要调用mmap来获取对应的内存映射。我们通过liburing库的逻辑来分析。4参见liburing库中io_uring的使用intio_uring_queue_init_params(unsignedentries,structio_uring*ring,structio_uring_params*p){intfd,ret;//调用io_uring_setup,getfdfd=__sys_io_uring_setup(entries,p);if(fd<0)return-errno;//内存映射ret=io_uring_queue_mmap(fd,p,ring);//保存系统支持的属性ring->features=p->features;return0;}下面重点介绍io_uring_queue_mmap。intio_uring_queue_mmap(intfd,structio_uring_params*p,structio_uring*ring){intret;memset(ring,0,sizeof(*ring));ret=io_uring_mmap(fd,p,&ring->sq,&ring->cq);//记录flags和fdif(!ret){ring->flags=p->flags;ring->ring_fd=fd;}returnret;}继续看io_uring_mmap。staticintio_uring_mmap(intfd,structio_uring_params*p,structio_uring_sq*sq,structio_uring_cq*cq){size_tsize;intret;//请求队列需要映射的内存大小,即整个结构体的大小structio_ringssq->ring_sz=p->sq_off。array+p->sq_entries*sizeof(unsigned);//请求队列的内存大小与完成的队列映射相同,等于请求队列的cq->ring_sz=sq->ring_sz;//映射得到虚拟地址,大小为sq->ring_szsq->ring_ptr=mmap(0,sq->ring_sz,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_POPULATE,fd,IORING_OFF_SQ_RING);cq->ring_ptr=sq->ring_ptr;//通过首地址和偏移量得到对应字段的地址sq->khead=sq->ring_ptr+p->sq_off.head;sq->ktail=sq->ring_ptr+p->sq_off.tail;sq->kring_mask=sq->ring_ptr+p->sq_off.ring_mask;sq->kring_entries=sq->ring_ptr+p->sq_off.ring_entries;sq->kflags=sq->ring_ptr+p->sq_off.flags;sq->kdropped=sq->ring_ptr+p->sq_off.dropped;sq->array=sq->ring_ptr+p->sq_off.array;//映射内存size=p->sq_entries*sizeof(structio_uring_sqe);sq->sqes=mmap(0,大小,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_POPULATE,fd,IORING_OFF_SQES);//同上=cq->ring_ptr+p->cq_off.ring_mask;cq->kring_entries=cq->ring_ptr+p->cq_off.ring_entries;cq->koverflow=cq->ring_ptr+p->cq_off.overflow;cq->cqes=cq->ring_ptr+p->cq_off.cqes;if(p->cq_off.flags)cq->kflags=cq->ring_ptr+p->cq_off.flags;return0;}io_uring_mmap保存了一些常用的字段信息,最重要的是做内存映射。再来看mmap的最后一个参数,分别是IORING_OFF_SQ_RING和IORING_OFF_SQES。接下来我们看看io_uring的mmaphook的实现。staticintio_uring_mmap(structfile*file,structvm_area_struct*vma){size_tsz=vma->vm_end-vma->vm_start;unsignedlongpfn;void*ptr;ptr=io_uring_validate_mmap_request(file,vma->vm_pgoff,sz);pfn=virt_to_phys(ptr)>PAGE_SHIFT;返回remap_pfn_range(vma,vma->vm_start,pfn,sz,vma->vm_page_prot);if(sz>page_size(page))returnERR_PTR(-EINVAL);returnptr;}这里设计的内容涉及复杂的内存管理。从代码中我们大概知道返回的地址是ctx->rings和ctx->sq_sqes。即我们在操作mmap返回的虚拟地址时,映射到内核的数据结构就是ctx这个字段。这样就完成了数据共享。最终的结构图如下。至此,分析告一段落。io_uring的实现真的很复杂,需要反复阅读和思考才能逐渐理解和理解它的原理。后记:io_uring作为新一代的IO框架,未来各大软件中应该都会用到,尤其是对性能要求极高的服务器,非常值得关注和学习。