前言:最近研究了一下Linux的高性能异步IO框架io_uring,并尝试引入Node.js中应用起来。所以本文打算介绍一下io_uring在内核的实现,因为io_uring实现代码量大,逻辑复杂,所以只能慢慢分析。这一篇介绍io_uring初始化接口io_uring_setup的实现。
复制
static long io_uring_setup(u32 entries, struct io_uring_params __user *params){ struct io_uring_params p; int i; if (copy_from_user(&p, params, sizeof(p))) return -EFAULT; // 支持的flag if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE | IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ)) return -EINVAL; return io_uring_create(entries, &p, params); }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
io_uring_setup是对io_uring_create的封装。第一个参数entries指定请求队列的长度,第二个参数params是用于调用方和内核通信的结构体。我们看一下定义。
复制
struct io_uring_params { // 定义请求队列长度(2的sq_entries次方),调用方定义 __u32 sq_entries; // 完成队列长度,默认是2 * 请求队列长度 __u32 cq_entries; // 控制内核行为的标记 __u32 flags; // poll模式下开启的内核线程绑定的cpu __u32 sq_thread_cpu; // poll模式下开启的内核线程空闲时间,之后会挂起。 __u32 sq_thread_idle; // 内核当前支持的能力,内核设置 __u32 features; __u32 wq_fd; __u32 resv[3]; // 记录内核数据的结构体,调用方后续调用mmap需要用到。 struct io_sqring_offsets sq_off; struct io_cqring_offsets cq_off; };
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
我们接着看io_uring_create。
复制
static int io_uring_create(unsigned entries, struct io_uring_params *p, struct io_uring_params __user *params){ struct user_struct *user = NULL; struct io_ring_ctx *ctx; bool limit_mem; int ret; p->sq_entries = roundup_pow_of_two(entries); // 自定义完成队列长度 if (p->flags & IORING_SETUP_CQSIZE) { p->cq_entries = roundup_pow_of_two(p->cq_entries); // 完成队列不能小于请求队列 if (p->cq_entries < p->sq_entries) return -EINVAL; // 超过阈值则需要设置IORING_SETUP_CLAMP标记 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) { if (!(p->flags & IORING_SETUP_CLAMP)) return -EINVAL; p->cq_entries = IORING_MAX_CQ_ENTRIES; } } else { // 默认是两倍的请求队列长度 p->cq_entries = 2 * p->sq_entries; } // 用户信息 user = get_uid(current_user()); // 分配一个ctx记录上下文,因为调用方只能拿到fd,后续操作fd的时候会拿到关联的上下文 ctx = io_ring_ctx_alloc(p); ctx->user = user; // 和poll模式相关的数据结构 ctx->sqo_task = get_task_struct(current); // 分配一个io_rings ret = io_allocate_scq_urings(ctx, p); // 处理poll模式的逻辑 ret = io_sq_offload_start(ctx, p); // 后面还有很多,一会分析 }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
io_uring_create代码比较多,我们分步分析。首先分配了一个io_ring_ctx结构体,这是核心的数据结构,用于记录io_uring实例的上下文,不过我们暂时不需要了解它具体的定义,因为实在太多,只关注本文相关的字段。
接着调用io_allocate_scq_urings分配一个io_rings结构体,这是非常核心的逻辑,我们看一下io_rings的定义。
复制
struct io_rings { struct io_uring sq, cq; u32 sq_ring_mask, cq_ring_mask; u32 sq_ring_entries, cq_ring_entries; u32 sq_dropped; u32 sq_flags; u32 cq_flags; u32 cq_overflow; struct io_uring_cqe cqes[]; };
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
io_rings主要用于记录请求和完成队列的信息。我们继续看io_allocate_scq_urings。
复制
static int io_allocate_scq_urings(struct io_ring_ctx *ctx, struct io_uring_params *p){ struct io_rings *rings; size_t size, sq_array_offset; // 记录请求和完成队列大小到ctx ctx->sq_entries = p->sq_entries; ctx->cq_entries = p->cq_entries; /* 计算结构体和额外数组的大小,sq_array_offset保存结构体大小, size保存结构体+额外数组+另一个额外数组的大小 */ size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); // 分配内存 rings = io_mem_alloc(size); // ... }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
io_allocate_scq_urings细节比较多,我们分开分析,我们看一下rings_size的逻辑。
复制
static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries, size_t *sq_offset){ struct io_rings *rings; size_t off, sq_array_size; // 计算结构体和格外数组的大小,见io_rings定义 off = struct_size(rings, cqes, cq_entries); // sq_offset记录结构体大小 if (sq_offset) *sq_offset = off; // 计算多个u32元素的数组的大小 sq_array_size = array_size(sizeof(u32), sq_entries); // 计算结构体大小 + sq_array_size的大小保存到off if (check_add_overflow(off, sq_array_size, &off)) return SIZE_MAX; return off; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
struct_size是计算结构体和额外字段大小的宏,我们刚才看到io_rings结构体的定义中,最后一个字段是struct io_uring_cqe cqes[],看起来是个空数组,其实他的内存是紧跟着结构体后面分配的,结构如下。
下面我们看struct_size是如何计算的。
复制
#define struct_size(p, member, count) \ __ab_c_size(count, \ sizeof(*(p)->member) + __must_be_array((p)->member),\ sizeof(*(p))) static inline __must_check size_t __ab_c_size(size_t a, size_t b, size_t c){ size_t bytes; // 计算a * b保存到bytes if (check_mul_overflow(a, b, &bytes)) return SIZE_MAX; // 计算bytes + c保存搭配bytes if (check_add_overflow(bytes, c, &bytes)) return SIZE_MAX; return bytes; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
我们看到计算方式就是数组元素大小*元素个数+结构体本身的大小。计算完结构体大小后又通过array_size计算了另一个数组的大小并加起来,所以io_rings的结构体如下所示。
分配了io_rings之后我们继续看接下来的逻辑。
复制
static int io_allocate_scq_urings(struct io_ring_ctx *ctx, struct io_uring_params *p){ // ... // 记录到ctx中 ctx->rings = rings; // sq_array记录rings结构体中,u32数组的首地址 ctx->sq_array = (u32 *)((char *)rings + sq_array_offset); // 用于回环处理 rings->sq_ring_mask = p->sq_entries - 1; rings->cq_ring_mask = p->cq_entries - 1; // 队列长度 rings->sq_ring_entries = p->sq_entries; rings->cq_ring_entries = p->cq_entries; ctx->sq_mask = rings->sq_ring_mask; ctx->cq_mask = rings->cq_ring_mask; // 请求队列的数组大小 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); // 分配内存并记录到sq_sqes ctx->sq_sqes = io_mem_alloc(size); return 0; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
进行了一系列设置后,架构如下。
创建完io_rings结构体后,我们继续回到io_uring_create中。
内核申请完系列结构体后,需要通过io_uring_params结构体返回给调用方。
复制
static int io_uring_create(unsigned entries, struct io_uring_params *p, struct io_uring_params __user *params) { ret = io_allocate_scq_urings(ctx, p); // 初始化poll模式相关逻辑,如果开启了的话 ret = io_sq_offload_start(ctx, p); memset(&p->sq_off, 0, sizeof(p->sq_off)); // 记录字段在结构体的偏移 p->sq_off.head = offsetof(struct io_rings, sq.head); p->sq_off.tail = offsetof(struct io_rings, sq.tail); p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask); p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries); p->sq_off.flags = offsetof(struct io_rings, sq_flags); p->sq_off.dropped = offsetof(struct io_rings, sq_dropped); p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings; memset(&p->cq_off, 0, sizeof(p->cq_off)); p->cq_off.head = offsetof(struct io_rings, cq.head); p->cq_off.tail = offsetof(struct io_rings, cq.tail); p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask); p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries); p->cq_off.overflow = offsetof(struct io_rings, cq_overflow); p->cq_off.cqes = offsetof(struct io_rings, cqes); p->cq_off.flags = offsetof(struct io_rings, cq_flags); // 内核支持的属性 p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP | IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS | IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL | IORING_FEAT_POLL_32BITS; copy_to_user(params, p, sizeof(*p)) // 获取fd ret = io_uring_get_fd(ctx); return ret; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
io_uring_create继续进行了一系列赋值,赋值完后架构如下。
内核通过io_uring_get_fd获取文件描述符返回给调用方。
复制
static int io_uring_get_fd(struct io_ring_ctx *ctx){ struct file *file; // 获取一个可用fd int ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC); // 分配一个file结构体,设置函数集为io_uring_fops,并关联上下文ctx file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx, O_RDWR | O_CLOEXEC); // 关联fd和file结构体 fd_install(ret, file); return ret; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
io_uring_get_fd申请了一个fd和file,这是遵循vfs的设计,最重要的是把io_uring的函数集挂在到file上,后续通过fd操作的io_uring实例的时候,经过vfs后就会执行对应的函数,另外还需要把ctx和file关联起来,因为后续通过fd操作io_uring时,需要拿到fd对应的io_uring上下文。至此。
io_uring_setup就分析完了,但是还不能使用。io_uring在设计中,为了减少系统调用和用户、内核数据通信的成本,实现了用户、内核共享数据结构的方式,这样用户和内核就可以操作同一份数据结构达到通信目的,而不用通过系统调用,更不需要设计来回复制。为了达到这个目的,用户拿到io_uring实例后,还需要调用mmap获取对应的内存映射。我们通过liburing库的逻辑来分析。
复制
int io_uring_queue_init_params(unsigned entries, struct io_uring *ring, struct io_uring_params *p){ int fd, ret; // 调用io_uring_setup,拿到fd fd = __sys_io_uring_setup(entries, p); if (fd < 0) return -errno; // 内存映射 ret = io_uring_queue_mmap(fd, p, ring); // 保存系统支持的属性 ring->features = p->features; return 0; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
我们重点看一下io_uring_queue_mmap。
复制
int io_uring_queue_mmap(int fd, struct io_uring_params *p, struct io_uring *ring){ int ret; memset(ring, 0, sizeof(*ring)); ret = io_uring_mmap(fd, p, &ring->sq, &ring->cq); // 记录flags和fd if (!ret) { ring->flags = p->flags; ring->ring_fd = fd; } return ret; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
继续看io_uring_mmap。
复制
static int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq, struct io_uring_cq *cq){ size_t size; int ret; // 请求队列需要映射的内存大小,即整个结构体struct io_rings结构体的大小 sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned); // 请求队列和完成队列映射的内存大小一样,等于请求队列的 cq->ring_sz = sq->ring_sz; // 映射并拿到虚拟地址,大小是sq->ring_sz sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); cq->ring_ptr = sq->ring_ptr; // 通过首地址和偏移拿到对应字段的地址 sq->khead = sq->ring_ptr + p->sq_off.head; sq->ktail = sq->ring_ptr + p->sq_off.tail; sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask; sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries; sq->kflags = sq->ring_ptr + p->sq_off.flags; sq->kdropped = sq->ring_ptr + p->sq_off.dropped; sq->array = sq->ring_ptr + p->sq_off.array; // 映射保存请求队列节点的内存 size = p->sq_entries * sizeof(struct io_uring_sqe); sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES); // 同上 cq->khead = cq->ring_ptr + p->cq_off.head; cq->ktail = cq->ring_ptr + p->cq_off.tail; cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask; cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries; cq->koverflow = cq->ring_ptr + p->cq_off.overflow; cq->cqes = cq->ring_ptr + p->cq_off.cqes; if (p->cq_off.flags) cq->kflags = cq->ring_ptr + p->cq_off.flags; return 0; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
io_uring_mmap除了保存一些常用的字段信息外,最重要的是做了内存映射。我们看看mmap的最后一个参数分别是IORING_OFF_SQ_RING和IORING_OFF_SQES,接下来我们看看io_uring的mmap钩子的实现。
复制
static int io_uring_mmap(struct file *file, struct vm_area_struct *vma){ size_t sz = vma->vm_end - vma->vm_start; unsigned long pfn; void *ptr; ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz); pfn = virt_to_phys(ptr) >> PAGE_SHIFT; return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);}static void *io_uring_validate_mmap_request(struct file *file, loff_t pgoff, size_t sz){ struct io_ring_ctx *ctx = file->private_data; loff_t offset = pgoff << PAGE_SHIFT; struct page *page; void *ptr; switch (offset) { case IORING_OFF_SQ_RING: case IORING_OFF_CQ_RING: ptr = ctx->rings; break; case IORING_OFF_SQES: ptr = ctx->sq_sqes; break; default: return ERR_PTR(-EINVAL); } page = virt_to_head_page(ptr); if (sz > page_size(page)) return ERR_PTR(-EINVAL); return ptr; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
这里设计的内容涉及到了复杂的内存管理,从代码中我们大概知道,返回的地址分别是ctx->rings和ctx->sq_sqes。即我们操作mmap返回的虚拟地址时,映射到内核的数据结构是ctx的字段。这样就完成了数据共享。最后形成的架构图如下。
至此,分析就告一段落,io_uring的实现实在是复杂,需要反复阅读和思考,才能慢慢理解和了解它的原理。
后记:io_uring作为新一代IO框架,未来应该会在各大软件中使用,尤其是对性能有极高要求的服务器,所以是非常值得关注和学习的。