从内核看IO_Uring的实现(一)

最近研究了一下Linux的高性能异步IO框架io_uring,并尝试引入Node.js中应用起来。所
首页 新闻资讯 行业资讯 从内核看IO_Uring的实现(一)

[[410006]]

前言:最近研究了一下Linux的高性能异步IO框架io_uring,并尝试引入Node.js中应用起来。所以本文打算介绍一下io_uring在内核的实现,因为io_uring实现代码量大,逻辑复杂,所以只能慢慢分析。这一篇介绍io_uring初始化接口io_uring_setup的实现。

复制

static long io_uring_setup(u32 entries, struct io_uring_params __user *params){     struct io_uring_params p;     int i;      if (copy_from_user(&p, params, sizeof(p)))         return -EFAULT;     // 支持的flag     if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |             IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |             IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ))         return -EINVAL;      return  io_uring_create(entries, &p, params); }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

io_uring_setup是对io_uring_create的封装。第一个参数entries指定请求队列的长度,第二个参数params是用于调用方和内核通信的结构体。我们看一下定义。

复制

struct io_uring_params {     // 定义请求队列长度(2的sq_entries次方),调用方定义     __u32 sq_entries;     // 完成队列长度,默认是2 * 请求队列长度     __u32 cq_entries;     // 控制内核行为的标记     __u32 flags;     // poll模式下开启的内核线程绑定的cpu     __u32 sq_thread_cpu;     // poll模式下开启的内核线程空闲时间,之后会挂起。     __u32 sq_thread_idle;     // 内核当前支持的能力,内核设置     __u32 features;     __u32 wq_fd;     __u32 resv[3];     // 记录内核数据的结构体,调用方后续调用mmap需要用到。     struct io_sqring_offsets sq_off;     struct io_cqring_offsets cq_off; };
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

我们接着看io_uring_create。

复制

static int io_uring_create(unsigned entries, struct io_uring_params *p,                struct io_uring_params __user *params){     struct user_struct *user = NULL;     struct io_ring_ctx *ctx;     bool limit_mem;     int ret;      p->sq_entries = roundup_pow_of_two(entries);     // 自定义完成队列长度     if (p->flags & IORING_SETUP_CQSIZE) {         p->cq_entries = roundup_pow_of_two(p->cq_entries);         // 完成队列不能小于请求队列         if (p->cq_entries < p->sq_entries)             return -EINVAL;         // 超过阈值则需要设置IORING_SETUP_CLAMP标记         if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {             if (!(p->flags & IORING_SETUP_CLAMP))                 return -EINVAL;             p->cq_entries = IORING_MAX_CQ_ENTRIES;         }     } else {         // 默认是两倍的请求队列长度         p->cq_entries = 2 * p->sq_entries;     }     // 用户信息     user = get_uid(current_user());     // 分配一个ctx记录上下文,因为调用方只能拿到fd,后续操作fd的时候会拿到关联的上下文     ctx = io_ring_ctx_alloc(p);     ctx->user = user;     // 和poll模式相关的数据结构     ctx->sqo_task = get_task_struct(current);     // 分配一个io_rings     ret = io_allocate_scq_urings(ctx, p);     // 处理poll模式的逻辑     ret = io_sq_offload_start(ctx, p);     // 后面还有很多,一会分析 }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

io_uring_create代码比较多,我们分步分析。首先分配了一个io_ring_ctx结构体,这是核心的数据结构,用于记录io_uring实例的上下文,不过我们暂时不需要了解它具体的定义,因为实在太多,只关注本文相关的字段。

1 分配一个io_rings结构体

接着调用io_allocate_scq_urings分配一个io_rings结构体,这是非常核心的逻辑,我们看一下io_rings的定义。

复制

struct io_rings {     struct io_uring     sq, cq;     u32         sq_ring_mask, cq_ring_mask;     u32         sq_ring_entries, cq_ring_entries;     u32         sq_dropped;     u32         sq_flags;     u32         cq_flags;     u32         cq_overflow;     struct io_uring_cqe cqes[]; };
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

io_rings主要用于记录请求和完成队列的信息。我们继续看io_allocate_scq_urings。

复制

static int io_allocate_scq_urings(struct io_ring_ctx *ctx,                   struct io_uring_params *p){     struct io_rings *rings;     size_t size, sq_array_offset;     // 记录请求和完成队列大小到ctx     ctx->sq_entries = p->sq_entries;     ctx->cq_entries = p->cq_entries;     /*          计算结构体和额外数组的大小,sq_array_offset保存结构体大小,         size保存结构体+额外数组+另一个额外数组的大小     */     size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);     // 分配内存     rings = io_mem_alloc(size);     // ... }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

io_allocate_scq_urings细节比较多,我们分开分析,我们看一下rings_size的逻辑。

复制

static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,                 size_t *sq_offset){     struct io_rings *rings;     size_t off, sq_array_size;     // 计算结构体和格外数组的大小,见io_rings定义     off = struct_size(rings, cqes, cq_entries);     // sq_offset记录结构体大小     if (sq_offset)         *sq_offset = off;     // 计算多个u32元素的数组的大小     sq_array_size = array_size(sizeof(u32), sq_entries);     // 计算结构体大小 + sq_array_size的大小保存到off     if (check_add_overflow(off, sq_array_size, &off))         return SIZE_MAX;     return off; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

struct_size是计算结构体和额外字段大小的宏,我们刚才看到io_rings结构体的定义中,最后一个字段是struct io_uring_cqe  cqes[],看起来是个空数组,其实他的内存是紧跟着结构体后面分配的,结构如下。

下面我们看struct_size是如何计算的。

复制

#define struct_size(p, member, count)                   \     __ab_c_size(count,                      \             sizeof(*(p)->member) + __must_be_array((p)->member),\             sizeof(*(p)))  static inline __must_check size_t __ab_c_size(size_t a, size_t b, size_t c){     size_t bytes;     // 计算a * b保存到bytes     if (check_mul_overflow(a, b, &bytes))         return SIZE_MAX;     // 计算bytes + c保存搭配bytes     if (check_add_overflow(bytes, c, &bytes))         return SIZE_MAX;      return bytes; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

我们看到计算方式就是数组元素大小*元素个数+结构体本身的大小。计算完结构体大小后又通过array_size计算了另一个数组的大小并加起来,所以io_rings的结构体如下所示。

分配了io_rings之后我们继续看接下来的逻辑。

复制

static int io_allocate_scq_urings(struct io_ring_ctx *ctx,                   struct io_uring_params *p){     // ...     // 记录到ctx中     ctx->rings = rings;     // sq_array记录rings结构体中,u32数组的首地址     ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);     // 用于回环处理     rings->sq_ring_mask = p->sq_entries - 1;     rings->cq_ring_mask = p->cq_entries - 1;     // 队列长度     rings->sq_ring_entries = p->sq_entries;     rings->cq_ring_entries = p->cq_entries;     ctx->sq_mask = rings->sq_ring_mask;     ctx->cq_mask = rings->cq_ring_mask;     // 请求队列的数组大小     size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);     // 分配内存并记录到sq_sqes     ctx->sq_sqes = io_mem_alloc(size);     return 0; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

进行了一系列设置后,架构如下。

创建完io_rings结构体后,我们继续回到io_uring_create中。

2 设置io_uring_params

内核申请完系列结构体后,需要通过io_uring_params结构体返回给调用方。

复制

static int io_uring_create(unsigned entries, struct io_uring_params *p,                struct io_uring_params __user *params) {      ret = io_allocate_scq_urings(ctx, p);     // 初始化poll模式相关逻辑,如果开启了的话     ret = io_sq_offload_start(ctx, p);     memset(&p->sq_off, 0, sizeof(p->sq_off));     // 记录字段在结构体的偏移     p->sq_off.head = offsetof(struct io_rings, sq.head);     p->sq_off.tail = offsetof(struct io_rings, sq.tail);     p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);     p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);     p->sq_off.flags = offsetof(struct io_rings, sq_flags);     p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);     p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;      memset(&p->cq_off, 0, sizeof(p->cq_off));     p->cq_off.head = offsetof(struct io_rings, cq.head);     p->cq_off.tail = offsetof(struct io_rings, cq.tail);     p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);     p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);     p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);     p->cq_off.cqes = offsetof(struct io_rings, cqes);     p->cq_off.flags = offsetof(struct io_rings, cq_flags);     // 内核支持的属性     p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |             IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |             IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |             IORING_FEAT_POLL_32BITS;      copy_to_user(params, p, sizeof(*p))     // 获取fd     ret = io_uring_get_fd(ctx);     return ret; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

io_uring_create继续进行了一系列赋值,赋值完后架构如下。

3 获取文件描述符

内核通过io_uring_get_fd获取文件描述符返回给调用方。

复制

static int io_uring_get_fd(struct io_ring_ctx *ctx){     struct file *file;     // 获取一个可用fd     int ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);     // 分配一个file结构体,设置函数集为io_uring_fops,并关联上下文ctx     file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,                     O_RDWR | O_CLOEXEC);      // 关联fd和file结构体     fd_install(ret, file);     return ret; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

io_uring_get_fd申请了一个fd和file,这是遵循vfs的设计,最重要的是把io_uring的函数集挂在到file上,后续通过fd操作的io_uring实例的时候,经过vfs后就会执行对应的函数,另外还需要把ctx和file关联起来,因为后续通过fd操作io_uring时,需要拿到fd对应的io_uring上下文。至此。

io_uring_setup就分析完了,但是还不能使用。io_uring在设计中,为了减少系统调用和用户、内核数据通信的成本,实现了用户、内核共享数据结构的方式,这样用户和内核就可以操作同一份数据结构达到通信目的,而不用通过系统调用,更不需要设计来回复制。为了达到这个目的,用户拿到io_uring实例后,还需要调用mmap获取对应的内存映射。我们通过liburing库的逻辑来分析。

4 从liburing库看io_uring的使用

复制

int io_uring_queue_init_params(unsigned entries, struct io_uring *ring,                    struct io_uring_params *p){     int fd, ret;     // 调用io_uring_setup,拿到fd     fd = __sys_io_uring_setup(entries, p);     if (fd < 0)         return -errno;     // 内存映射     ret = io_uring_queue_mmap(fd, p, ring);     // 保存系统支持的属性     ring->features = p->features;     return 0; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

我们重点看一下io_uring_queue_mmap。

复制

int io_uring_queue_mmap(int fd, struct io_uring_params *p, struct io_uring *ring){     int ret;      memset(ring, 0, sizeof(*ring));     ret = io_uring_mmap(fd, p, &ring->sq, &ring->cq);     // 记录flags和fd     if (!ret) {         ring->flags = p->flags;         ring->ring_fd = fd;     }     return ret; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

继续看io_uring_mmap。

复制

static int io_uring_mmap(int fd, struct io_uring_params *p,              struct io_uring_sq *sq, struct io_uring_cq *cq){     size_t size;     int ret;     // 请求队列需要映射的内存大小,即整个结构体struct io_rings结构体的大小     sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);     // 请求队列和完成队列映射的内存大小一样,等于请求队列的     cq->ring_sz = sq->ring_sz;     // 映射并拿到虚拟地址,大小是sq->ring_sz     sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,             MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);     cq->ring_ptr = sq->ring_ptr;     // 通过首地址和偏移拿到对应字段的地址     sq->khead = sq->ring_ptr + p->sq_off.head;     sq->ktail = sq->ring_ptr + p->sq_off.tail;     sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;     sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;     sq->kflags = sq->ring_ptr + p->sq_off.flags;     sq->kdropped = sq->ring_ptr + p->sq_off.dropped;     sq->array = sq->ring_ptr + p->sq_off.array;     // 映射保存请求队列节点的内存     size = p->sq_entries * sizeof(struct io_uring_sqe);     sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE,                 MAP_SHARED | MAP_POPULATE, fd,                 IORING_OFF_SQES);     // 同上     cq->khead = cq->ring_ptr + p->cq_off.head;     cq->ktail = cq->ring_ptr + p->cq_off.tail;     cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;     cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;     cq->koverflow = cq->ring_ptr + p->cq_off.overflow;     cq->cqes = cq->ring_ptr + p->cq_off.cqes;     if (p->cq_off.flags)         cq->kflags = cq->ring_ptr + p->cq_off.flags;     return 0; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

io_uring_mmap除了保存一些常用的字段信息外,最重要的是做了内存映射。我们看看mmap的最后一个参数分别是IORING_OFF_SQ_RING和IORING_OFF_SQES,接下来我们看看io_uring的mmap钩子的实现。

复制

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma){     size_t sz = vma->vm_end - vma->vm_start;     unsigned long pfn;     void *ptr;      ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);      pfn = virt_to_phys(ptr) >> PAGE_SHIFT;     return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);}static void *io_uring_validate_mmap_request(struct file *file,                         loff_t pgoff, size_t sz){     struct io_ring_ctx *ctx = file->private_data;     loff_t offset = pgoff << PAGE_SHIFT;     struct page *page;     void *ptr;      switch (offset) {     case IORING_OFF_SQ_RING:     case IORING_OFF_CQ_RING:         ptr = ctx->rings;         break;     case IORING_OFF_SQES:         ptr = ctx->sq_sqes;         break;     default:         return ERR_PTR(-EINVAL);     }      page = virt_to_head_page(ptr);     if (sz > page_size(page))         return ERR_PTR(-EINVAL);      return ptr; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

这里设计的内容涉及到了复杂的内存管理,从代码中我们大概知道,返回的地址分别是ctx->rings和ctx->sq_sqes。即我们操作mmap返回的虚拟地址时,映射到内核的数据结构是ctx的字段。这样就完成了数据共享。最后形成的架构图如下。

至此,分析就告一段落,io_uring的实现实在是复杂,需要反复阅读和思考,才能慢慢理解和了解它的原理。

后记:io_uring作为新一代IO框架,未来应该会在各大软件中使用,尤其是对性能有极高要求的服务器,所以是非常值得关注和学习的。

 

14    2021-07-07 23:38:05    内核 IO Linux