行为树及其实现

第一次听说行为树还是在2011年珠三角技术沙龙上Akara的分享, 这里也有个不错的视频系列. 可惜此后一直未有机会实践. 这两天趁着假期用C写了实现(github repo). 顺便填填博客的坑.

背景

在游戏AI中, 常见的实现有决策树、状态机等, 它们各自存在着不足. 以状态机FSM为例, 它非常难以通用和扩展, 状态转化的复杂度随着每个新增状态将变得越发缭乱. 考虑到可能存在多个并行的状态机, 它们之间的交互更是复杂交错, 难解难分. 于是大神们创造了行为树(BehaviourTree),

行为树

Behavior-Chart

(这张高大上的图片来自这里)

显而易见, 行为树作为一棵树, 必然有中间节点和叶子节点, 它们分别负责选择节点和执行AI逻辑. 在游戏中, 一般是每帧或者核心状态变化时执行行为树的Update(或称Tick)接口, 该接口从树根开始通过中间节点进行判断, 以此搜索到叶子节点.

叶子节点

叶子节点通常执行具体的动作, 一般需要提供以下回调:

  • enter: 进入叶子节点, 一般用于初始化状态, 分配内存等
  • excute/tick: 执行具体的AI逻辑
  • exit: 退出叶子节点, 一般用于清理状态, 回收内存等

其中, excute/tick的执行结果分为完成和执行中两种运行状态. 行为树每次执行时, 可以从树根开始重新搜索, 也可以记忆最后一个返回执行中的子节点, 从该节点开始执行, 两种方式各有利弊.

中间节点

中间节点通常作为选择节点, 负责确认子节点中的执行顺序, 可以泛泛的分为以下几种:

  • 顺序(sequence): 自左往右顺序执行子节点
  • 随机(random): 随机执行一个子节点
  • 优先(priority): 自左往右顺序执行子节点直到其中一个返回成功
黑板

考虑到节点之间或者行为树之间的输入输出, 引入了 黑板(blackboard) 的概念, 分为几种:

  • global: 所有行为树共享
  • per-tree: 一棵行为树的黑板, 该树的所有节点共享
  • per-node: 单个节点的黑板, 主要用于节点

实现

来自育碧的finney用800行C++代码做了实现. 于我而言, 其封装层次过深, 略显臃肿(当然逻辑还是很清晰的), 且OO为个人不太喜欢的继承, 因此花了一天多的时间, 以组合的形式来面向对象,用400行代码完成了一份C的实现. 考虑到其中不少工作是在维护虚函数表, 如果换成C++实现, 相信200行代码足以, XD.

这里简单描述下API:

30 void *node_create(struct node_callbacks *cbs, void *blackboard); 创建一个子节点, cbs为回调集合. blackboard为按需初始化的per-node黑板, 当然你也可以在node的enter回调中设置.
33 void *branch_create(int type, int num, void **nodes); 创建中间节点, type表示执行类型(sequence, priority, random), nodes表示其子节点列表
34 void *behaviourtree_create(void *branch); 创建一棵行为树, branch表示树根的选择节点.
36 void behaviourtree_tick(void *bt, void *object); 执行行为树, object表示行为树的上下文, 一般可以用于为行为树绑定一个对象object.

在子节点node_callbacks的tick回调中, 需要明确告诉行为树自己的执行结果, 方式为回调函数类型的最后一个参数node_operations, 分为完成success, 失败fail, 运行中running.

具体使用可以参考example1和example2.

TODO

从example中可以看到, 行为树的构造是比较形式化的. 大厂一般会有自己一套编辑器可视化的编辑行为树,  生成其结构的描述数据. 这里有一个网页版的GUI, 生成的行为树结构为json格式, 以后需要的话可以编写代码解析其格式来自动构造行为树(大坑, 逃~).

dm-cache源码浅析

最近想学习Linux IO子系统, 找了flashcache代码, 它通过内核提供的Device Mapper机制, 将一块SSD和一块普通磁盘虚拟为一个块设备, 其中SSD作为cache, 数据最终落地到普通磁盘. 这种混合存储的策略, 向上层应用(如mysql)屏蔽了底层的实现, 上层应用看到的只是一个挂载到虚拟块设备上的某种文件系统, 使用常见的文件系统接口即可读写数据, 一方面保持兼容, 一方面获得不错的性能. flashcache的代码只有几千行, 从commit log中可以看到版本迭代比较频繁, 也因此引入了较多我个人不关心的新特性. flashcache源码中作者写到借鉴了dm-cache的代码, 所以查了下资料, 竟是国人出品, sloc不足两千, 一晚上就可以看完, 正合胃口. dm-cache的使用可以参考flashcache文档, 原理见flashcache原理.

内存结构

dm-cache思路非常简单, 它把SSD作为cache, 将数据持久化到普通磁盘. 其中, SSD cache组织方式为set-associative map, 这和CPU cache的组织非常相像, 只是这里的key是cacheblock编号. cacheblock是dm-cache为了方便存取数据引入的单位, 粒度在磁盘block之上. 在通过dmsetup创建dm-cache块设备时时可以指定cacheblock的大小, 默认为8个连续的磁盘block组成一个cacheblock, 即4k字节. 上层的IO请求由Device Mapper框架切割为cacheblock大小(且对齐)的bio, 然后交由dm-cache处理. 也就是说, 不管是对SSD, 还是普通磁盘, dm-cache处理IO的单位都是cacheblock. 它在内存中的metadata为:

117 /* Cache block metadata structure */
118 struct cacheblock {
119     spinlock_t lock;    /* Lock to protect operations on the bio list */
120     sector_t block;     /* Sector number of the cached block */
121     unsigned short state;  /* State of a block */
122     unsigned long counter; /* Logical timestamp of the block’s last access */
123     struct bio_list bios;  /* List of pending bios */
124 };

其中, block字段表示当前cacheblock的起始扇区编号. 既然SSD作为cache, 针对写请求必定会有writeback和writethrough等多种选择. writeback即数据先写到SSD, 然后由后台线程在合适的时间写回磁盘. writethrough指数据同时写入磁盘和SSD. (flashcache在这基础上又增加了writearound的方式, 意思是绕过SSD cache, 数据直接写入磁盘, 在处理读请求时更新到SSD.) 不管是writeback, 还是writethrough, 数据写入磁盘(或者由磁盘读取数据更新至cache)都不可能一蹴而就, 所以每个cacheblock必定会有一个状态(state字段). 另外, cache有淘汰的概念, dm-cache支持FIFO或LRU淘汰, 所以需要为每个cacheblock保存其最后访问时间(counter字段). 最后, 为了互斥同时请求同一个cacheblock, 每个cacheblock还对应一个spinlock. 被互斥的后发请求记录在bios链表中. 在当前cacheblock上的操作完成后, dm-cache将重新提交bios链表上的bio.

接下来看下dm-cache的总控结构体cache_c:

 80 /*
 81  * Cache context
 82  */
 83 struct cache_c {
 84     struct dm_dev *src_dev;        /* Source device */
 85     struct dm_dev *cache_dev;  /* Cache device */
 86     struct dm_kcopyd_client *kcp_client; /* Kcopyd client for writing back data */
 87
 88     struct cacheblock *cache;  /* Hash table for cache blocks */
 89     sector_t size;          /* Cache size */
 90     unsigned int bits;     /* Cache size in bits */
 91     unsigned int assoc;        /* Cache associativity */
 92     unsigned int block_size;   /* Cache block size */
 93     unsigned int block_shift;  /* Cache block size in bits */
 94     unsigned int block_mask;   /* Cache block mask */
 95     unsigned int consecutive_shift;    /* Consecutive blocks size in bits */
 96     unsigned long counter;     /* Logical timestamp of last access */
 97     unsigned int write_policy; /* Cache write policy */
 98     sector_t dirty_blocks;      /* Number of dirty blocks */
 99
100     spinlock_t lock;        /* Lock to protect page allocation/deallocation */
101     struct page_list *pages;   /* Pages for I/O */
102     unsigned int nr_pages;     /* Number of pages */
103     unsigned int nr_free_pages;    /* Number of free pages */
104     wait_queue_head_t destroyq; /* Wait queue for I/O completion */
105     atomic_t nr_jobs;       /* Number of I/O jobs */
106     struct dm_io_client *io_client;   /* Client memory pool*/
107
108     /* Stats */
109     unsigned long reads;       /* Number of reads */
110     unsigned long writes;      /* Number of writes */
111     unsigned long cache_hits;  /* Number of cache hits */
112     unsigned long replace;     /* Number of cache replacements */
113     unsigned long writeback;   /* Number of replaced dirty blocks */
114     unsigned long dirty;       /* Number of submitted dirty blocks */
115 };

其中, src_dev和cache_dev分别为磁盘和SSD在DM框架的抽象. cache字段为连续的cacheblock数组, 元素个数即size字段. 其余字段顾名思义, 不再赘述.

初始化

dm-cache的初始化代码相对简单, DM框架获取dmsetup参数, 传递给cache_ctr(), dm-cache通过该函数构造一个cache_c对象, 保存在dm_target.private中. dm_target结构中另一重要字段为split_io, 这个字段表示DM框架分割bio的粒度, cache_ctr()函数指定其为cacheblock大小.

上层的读写请求在IO内核路径上表示为bio, 针对Device Mapper框架虚拟出来的块设备的bio请求, DM框架通过bio的block编号找到所属的dm_targets(一个bio的请求可能横跨多个dm_target), 逐个回调dm_target.type->map, 该字段为函数指针, 在dm-cache模块加载到内核时, 由该模块的初始化函数dm_cache_init()注册为cache_map(). 也就是说, 读写请求的入口都是cache_map().

请求处理

如上所述, 读写请求的入口都是cache_map(), 其实现如下:

1202 /*
1203  * Decide the mapping and perform necessary cache operations for a bio request.
1204  */
1205 static int cache_map(struct dm_target *ti, struct bio *bio,
1206               union map_info *map_context)
1207 {
1208     struct cache_c *dmc = (struct cache_c *) ti->private;
1209     sector_t request_block, cache_block = 0, offset;
1210     int res;
1211
1212     offset = bio->bi_sector & dmc->block_mask;
1213     request_block = bio->bi_sector – offset;
1214
1220     if (bio_data_dir(bio) == READ) dmc->reads++;
1221     else dmc->writes++;
1222
1223     res = cache_lookup(dmc, request_block, &cache_block);
1224     if (1 == res)  /* Cache hit; server request from cache */
1225         return cache_hit(dmc, bio, cache_block);
1226     else if (0 == res) /* Cache miss; replacement block is found */
1227         return cache_miss(dmc, bio, cache_block);
1228     else if (2 == res) { /* Entire cache set is dirty; initiate a write-back */
1229         write_back(dmc, cache_block, 1);
1230         dmc->writeback++;
1231     }
1232
1233     /* Forward to source device */
1234     bio->bi_bdev = dmc->src_dev->bdev;
1235
1236     return 1;
1237 }

该函数首先从ti->private中获取cache_c *dmc, 这个对象由cache_ctr()中构造. 接着获得bio所请求的起始扇区(即bio->bi_sector)所属的cacheblock的扇区编号, 保存在request_block变量. 接着通过cache_lookup()函数在dmc->cache中查找, key便是request_block. cache_lookup()代码相对简单, 不再细述.

如果cache中查找失败, 则进入cache_miss()逻辑. 其最后一个参数cache_block为cache_lookup()以某种淘汰形式找到的待替换的cacheblock的扇区编号.

1189 /* Handle cache misses */
1190 static int cache_miss(struct cache_c *dmc, struct bio* bio, sector_t cache_block) {
1191     if (bio_data_dir(bio) == READ)
1192         return cache_read_miss(dmc, bio, cache_block);
1193     else
1194         return cache_write_miss(dmc, bio, cache_block);
1195 }

cache_miss()函数判断bio是读是写, 读则调用cache_read_miss(), 否则调用cache_write_miss().

篇幅所限, 接下来我们只看下读请求未命中cache的情况, 这时cache_read_miss()将被调用.

1073 /*
1074  * Handle a read cache miss:
1075  *  Update the metadata; fetch the necessary block from source device;
1076  *  store data to cache device.
1077  */
1078 static int cache_read_miss(struct cache_c *dmc, struct bio* bio,
1079                            sector_t cache_block) {
1080     struct cacheblock *cache = dmc->cache;
1081     unsigned int offset, head, tail;
1082     struct kcached_job *job;
1083     sector_t request_block, left;
1084
1085     offset = (unsigned int)(bio->bi_sector & dmc->block_mask);
1086     request_block = bio->bi_sector – offset;
1087
1095     cache_insert(dmc, request_block, cache_block); /* Update metadata first */
1096
1097     job = new_kcached_job(dmc, bio, request_block, cache_block);
1098
1099     head = to_bytes(offset);
1100
1101     left = (dmc->src_dev->bdev->bd_inode->i_size>>9) – request_block;
1102     if (left < dmc->block_size) {
1103         tail = to_bytes(left) – bio->bi_size – head;
1104         job->src.count = left;
1105         job->dest.count = left;
1106     } else
1107         tail = to_bytes(dmc->block_size) – bio->bi_size – head;
1108
1109     /* Requested block is aligned with a cache block */
1110     if (0 == head && 0 == tail)
1111         job->nr_pages= 0;
1112     else /* Need new pages to store extra data */
1113         job->nr_pages = dm_div_up(head, PAGE_SIZE) + dm_div_up(tail, PAGE_SIZE);
1114     job->rw = READ; /* Fetch data from the source device */
1115
1118     queue_job(job);
1119
1120     return 0;
1121 }

函数首先调用cache_insert()更新cache, 设置该cacheblock.state为RESERVED. 然后调用new_kcached_job()分配一个kcached_job对象. 第1109~1104行是核心代码, 如前文所述, 上层请求的bio已经由DM框架按cacheblock单位切分, 也就是说, cache_map()所处理的每个bio请求的扇区数最大为cacheblock. 如下图所示: 第1099行获得这个bio在cacheblock中的偏移, 保存在head. 第1101行获得第request_block块扇区到磁盘最后一块扇区所跨过的扇区数. 第1103或1107行获得bio请求的数据最后一个字节离磁盘最后一字节(或者下一个cacheblock)的偏移. 第1110行, 如果0 == head且0 == tail, 说明所请求的bio正好覆盖整个cacheblock. 否则, 说明请求的bio只占cacheblock的一部分, 针对这种情况, 需要为该bio未请求的前后两部分分别分配页面. 因为dm-cache请求磁盘的单位为cacheblock大小. 第1114行指定job的读写方向为READ. 最后, 第1118行提交job.

回头看看cache_read_miss()中的1097行分配job所调用的函数new_kcached_job(), 第3个参数request_block表示bio请求在磁盘的起始扇区号, 第4个参数cache_block表示bio请求在SSD的起始扇区号.

1049 static struct kcached_job *new_kcached_job(struct cache_c *dmc, struct bio* bio,
1050                                            sector_t request_block,
1051                                            sector_t cache_block)
1052 {
1053     struct dm_io_region src, dest;
1054     struct kcached_job *job;
1055
1056     src.bdev = dmc->src_dev->bdev;
1057     src.sector = request_block;
1058     src.count = dmc->block_size;
1059     dest.bdev = dmc->cache_dev->bdev;
1060     dest.sector = cache_block << dmc->block_shift;
1061     dest.count = src.count;
1062
1063     job = mempool_alloc(_job_pool, GFP_NOIO);
1064     job->dmc = dmc;
1065     job->bio = bio;
1066     job->src = src;
1067     job->dest = dest;
1068     job->cacheblock = &dmc->cache[cache_block];
1069
1070     return job;
1071 }

接下来看看job结构的定义:

126 /* Structure for a kcached job */
127 struct kcached_job {
128     struct list_head list;
129     struct cache_c *dmc;
130     struct bio *bio;   /* Original bio */
131     struct dm_io_region src;
132     struct dm_io_region dest;
133     struct cacheblock *cacheblock;
134     int rw;
135     /*
136      * When the original bio is not aligned with cache blocks,
137      * we need extra bvecs and pages for padding.
138      */
139     struct bio_vec *bvec;
140     unsigned int nr_pages;
141     struct page_list *pages;
142 };

在dm-cache中, job有3种状态, 它以list字段链入其所属于的链表, 分别为_io_jobs, _pages_jobs和_complete_jobs. 其中, io_jobs表示待执行IO的任务, page_jobs待分配页面的任务, compelet_jobs表示待收尾的任务. kcached_job的bio字段存储DM框架发给cache_map()的bio请求, src和dest分别指向磁盘和SSD的dm_io_region. cacheblock指针指向cache_c.cache数组中以请求的bio所落在的SSD磁盘上的cacheblock编号为下标的偏移. rw字段表示job当前的读写方向.

回到cache_read_miss()函数, 它在第1118行调用queue_job()提交了任务, 代码如下:

736 static void queue_job(struct kcached_job *job)
737 {
738     atomic_inc(&job->dmc->nr_jobs);
739     if (job->nr_pages > 0) /* Request pages */
740         push(&_pages_jobs, job);
741     else /* Go ahead to do I/O */
742         push(&_io_jobs, job);
743     wake();
744 }

可以看到, 如果需要为job分配页面, 则将job链入_pages_jobs链表, 否则, 链入_io_jobs链表. 然后调用wake():

299 static inline void wake(void)
300 {
301     queue_work(_kcached_wq, &_kcached_work);
302 }

wake()函数只是对queue_work()的封装, 它将_kcached_work提交到_kcached_wq. 在dm-cache模块初始化函数dm_cache_init()中, _kcached_work被注册的回调为do_work. 所以, 当_kcached_work被调度时, do_work()将被回调.

729 static void do_work(struct work_struct *ignored)
730 {
731     process_jobs(&_complete_jobs, do_complete);
732     process_jobs(&_pages_jobs, do_pages);
733     process_jobs(&_io_jobs, do_io);
734 }

可见, do_work()依次遍历_complete_jobs, _pages_jobs和_io_jobs链表中的任务, 以任务为参数, 分别回调do_complete, do_pages, do_io. 在这里, 遍历的顺序是有讲究的: 先处理_complete_jobs任务, 是因为此类任务完成后可能释放一些页面回页面内存池; 然后处理_pages_jobs任务, 因为此类任务只有获取页面后才能执行IO操作, 它从页面内存池中获取页面; 最后处理_io_jobs链表任务.

process_jobs()代码如下:

696 /*
697  * Run through a list for as long as possible.  Returns the count
698  * of successful jobs.
699  */
700 static int process_jobs(struct list_head *jobs,
701                         int (*fn) (struct kcached_job *))
702 {
703     struct kcached_job *job;
704     int r, count = 0;
705
706     while ((job = pop(jobs))) {
707         r = fn(job);
708
709         if (r < 0) {
710             /* error this rogue job */
711             DMERR("process_jobs: Job processing error");
712         }
713
714         if (r > 0) {
715             /*
716              * We couldn’t service this job ATM, so
717              * push this job back onto the list.
718              */
719             push(jobs, job);
720             break;
721         }
722
723         count++;
724     }
725
726     return count;
727 }

它依次遍历链表, 调用回调.

回到queue_job(), 前面说过, 因为dm-cache读写SSD及磁盘的粒度为cacheblock大小, 所以如果bio请求未对其cacheblock, 或请求大小不等于cacheblock大小, 则需要为该cacheblock中, bio不关心的前后部分分配页面, 即把job提交到_pages_jobs链表. 否则, 直接提交到_io_jobs链表.

_pages_jobs链表的回调函数do_pages非常简单, 它从页面内存池获取一些页面(页面数为nr_pages), 保存在kcached_job结构的pages字段, 然后将job提交到_io_jobs链表.

针对_io_jobs链表上的任务, do_work()将以do_io回调来处理该任务.

618 static int do_io(struct kcached_job *job)
619 {
620     int r = 0;
621
622     if (job->rw == READ) { /* Read from source device */
623         r = do_fetch(job);
624     } else { /* Write to cache device */
625         r = do_store(job);
626     }
627
628     return r;
629 }

针对读请求, 很明显是进入do_fetch()分支.

400 /*
401  * Fetch data from the source device asynchronously.
402  * For a READ bio, if a cache block is larger than the requested data, then
403  * additional data are prefetched. Larger cache block size enables more
404  * aggressive read prefetching, which is useful for read-mostly usage.
405  * For a WRITE bio, if a cache block is larger than the requested data, the
406  * entire block needs to be fetched, and larger block size incurs more overhead.
407  * In scenaros where writes are frequent, 4KB is a good cache block size.
408  */
409 static int do_fetch(struct kcached_job *job)
410 {
411     int r = 0, i, j;
412     struct bio *bio = job->bio;
413     struct cache_c *dmc = job->dmc;
414     unsigned int offset, head, tail, remaining, nr_vecs, idx = 0;
415     struct bio_vec *bvec;
416     struct page_list *pl;
417     printk("do_fetch");
418     offset = (unsigned int) (bio->bi_sector & dmc->block_mask);
419     head = to_bytes(offset);
420     tail = to_bytes(dmc->block_size) – bio->bi_size – head;
425
426     if (bio_data_dir(bio) == READ) { /* The original request is a READ */
427         if (0 == job->nr_pages) { /* The request is aligned to cache block */
428             r = dm_io_async_bvec(1, &job->src, READ,
429                                  bio->bi_io_vec + bio->bi_idx,
430                                  io_callback, job);
431             return r;
432         }
433
434         nr_vecs = bio->bi_vcnt – bio->bi_idx + job->nr_pages;
435         bvec = kmalloc(nr_vecs * sizeof(*bvec), GFP_NOIO);
436         if (!bvec) {
437             DMERR("do_fetch: No memory");
438             return 1;
439         }
440
441         pl = job->pages;
442         i = 0;
443         while (head) {
444             bvec[i].bv_len = min(head, (unsigned int)PAGE_SIZE);
445             bvec[i].bv_offset = 0;
446             bvec[i].bv_page = pl->page;
447             head -= bvec[i].bv_len;
448             pl = pl->next;
449             i++;
450         }
451
452         remaining = bio->bi_size;
453         j = bio->bi_idx;
454         while (remaining) {
455             bvec[i] = bio->bi_io_vec[j];
456             remaining -= bvec[i].bv_len;
457             i++; j++;
458         }
459
460         while (tail) {
461             bvec[i].bv_len = min(tail, (unsigned int)PAGE_SIZE);
462             bvec[i].bv_offset = 0;
463             bvec[i].bv_page = pl->page;
464             tail -= bvec[i].bv_len;
465             pl = pl->next;
466             i++;
467         }
468
469         job->bvec = bvec;
470         r = dm_io_async_bvec(1, &job->src, READ, job->bvec, io_callback, job);
471         return r;
472     } else { /* The original request is a WRITE */
541     }
542 }

-如果任务没有申请页面, 即bio请求正好cacheblock对齐且请求大小正好为一个cacheblock, 则直接调用dm_io_async_bvec().
-如果任务申请了页面, 即bio请求不是cacheblock对齐, 或者请求大小不是一个cacheblock, 则通过第434~467行代码主动构造一个bio_vec *bvec, 保存在job->bvec中, 然后调用dm_io_async_bvec().

仔细比较上述两种情况调用dm_io_async_bvec()所传递的参数, 不难发现, 只有第4个参数是不一样的. 前者传递的为原来请求的bio的bvec, 后者传递的为主动构造的bvec.

dm_io_async_bvec()函数提交IO, 从磁盘(job->src)中读取数据到第4个参数, 然后回调io_callback().

382 static void io_callback(unsigned long error, void *context)
383 {
384     struct kcached_job *job = (struct kcached_job *) context;
385
386     if (error) {
387         /* TODO */
388         DMERR("io_callback: io error");
389         return;
390     }
391
392     if (job->rw == READ) {
393         job->rw = WRITE;
394         push(&_io_jobs, job);
395     } else
396         push(&_complete_jobs, job);
397     wake();
398 }

读请求的job->rw为READ, 将其修改为WRITE后将job提交到_io_jobs链表. _io_jobs链表元素再次由do_work()以do_io()回调. 此时, 因为job->rw为WRITE, 所以调用的函数变成了do_store().

544 /*
545  * Store data to the cache source device asynchronously.
546  * For a READ bio request, the data fetched from the source device are returned
547  * to kernel and stored in cache at the same time.
548  * For a WRITE bio request, the data are written to the cache and source device
549  * at the same time.
550  */
551 static int do_store(struct kcached_job *job)
552 {
553     int i, j, r = 0;
554     struct bio *bio = job->bio ;
555     struct cache_c *dmc = job->dmc;
556     unsigned int offset, head, tail, remaining, nr_vecs;
557     struct bio_vec *bvec;
558     offset = (unsigned int) (bio->bi_sector & dmc->block_mask);
559     head = to_bytes(offset);
560     tail = to_bytes(dmc->block_size) – bio->bi_size – head;
566
567     if (0 == job->nr_pages) /* Original request is aligned with cache blocks */
568         r = dm_io_async_bvec(1, &job->dest, WRITE, bio->bi_io_vec + bio->bi_idx,
569                              io_callback, job);
570     else {
571         if (bio_data_dir(bio) == WRITE && head > 0 && tail > 0) {
573             nr_vecs = job->nr_pages + bio->bi_vcnt – bio->bi_idx;
574             if (offset && (offset + bio->bi_size < PAGE_SIZE)) nr_vecs++;
576             bvec = kmalloc(nr_vecs * sizeof(*bvec), GFP_KERNEL);
577             if (!bvec) {
578                 DMERR("do_store: No memory");
579                 return 1;
580             }
581
582             i = 0;
583             while (head) {
584                 bvec[i].bv_len = min(head, job->bvec[i].bv_len);
585                 bvec[i].bv_offset = 0;
586                 bvec[i].bv_page = job->bvec[i].bv_page;
587                 head -= bvec[i].bv_len;
588                 i++;
589             }
590             remaining = bio->bi_size;
591             j = bio->bi_idx;
592             while (remaining) {
593                 bvec[i] = bio->bi_io_vec[j];
594                 remaining -= bvec[i].bv_len;
595                 i++; j++;
596             }
597             j = (to_bytes(offset) + bio->bi_size) / PAGE_SIZE;
598             bvec[i].bv_offset = (to_bytes(offset) + bio->bi_size) –
599                                 j * PAGE_SIZE;
600             bvec[i].bv_len = PAGE_SIZE – bvec[i].bv_offset;
601             bvec[i].bv_page = job->bvec[j].bv_page;
602             tail -= bvec[i].bv_len;
603             i++; j++;
604             while (tail) {
605                 bvec[i] = job->bvec[j];
606                 tail -= bvec[i].bv_len;
607                 i++; j++;
608             }
609             kfree(job->bvec);
610             job->bvec = bvec;
611         }
612
613         r = dm_io_async_bvec(1, &job->dest, WRITE, job->bvec, io_callback, job);
614     }
615     return r;
616 }

这段代码和do_fetch()非常相像, 不再细述. 它把do_fetch()中从磁盘读取的数据, 通过dm_io_async_bvec()函数, 写入SSD(job->dest). 然后io_callback()再次被回调. 此时, 因为job->rw为WRITE, io_callback()将任务提交到_complete_jobs链表. 该链表对应的回调函数为do_complete():

673 static int do_complete(struct kcached_job *job)
674 {
675     int r = 0;
676     struct bio *bio = job->bio;
677
680     bio_endio(bio, 0);
681
682     if (job->nr_pages > 0) {
683         kfree(job->bvec);
684         kcached_put_pages(job->dmc, job->pages);
685     }
686
687     flush_bios(job->cacheblock);
688     mempool_free(job, _job_pool);
689
690     if (atomic_dec_and_test(&job->dmc->nr_jobs))
691         wake_up(&job->dmc->destroyq);
692
693     return r;
694 }

do_complete()首先调用bio_endio(), 告诉IO子系统上层, 当前bio已经处理完成. 然后释放页面. 之后调用flush_bios()重新提交在当前bio之后所有发往同个cacheblock的bios, 最后释放job.

至此, 读请求完成. 写请求与读请求大同小异, 不表.

总结陈词

IO处理内核化是一种有效的IO优化方式. 另外, IO路径网络化(iSCSI)也是大势所趋, 如Amazon的EBS及腾讯的CBS(入门参考块存储的世界). 希望以后一窥究竟.

kmemcache源码浅析

简介

kmemcache是memcache的linux内核移植版, 这两天断断续续的看了其网络方面的实现.

简单来说, kmemcache不落窠臼, 摈弃了epoll通知机制. 它借助skb的回调函数, 实现packet级别的调度. 在网路模型上, kmemcache分为一个dispatcher和多个workers(均为workqueue线程). dispatcher服务于TCP和unix domain sockets, 它将新建的连接丢给某个worker. 除此之外, workers还处理UDP请求.

下面详细分析源码.

mc_connector

kmemcache分为umemcached和kmemcache.ko两部分. umemcached为用户态daemon, 主要作用是解析启动参数, 将启动的settings信息传给kmemcache.ko. kmemcache.ko是内核模块,完成除解析启动参数之外的其他所有功能.

umemcached解析启动参数的代码非常简单, 不必多说.

这里简单分析下umemcached和kmemcache.ko通过netlink机制实现数据交互的代码(mc_connector.[hc]).

初始化阶段

kmemcache.ko模块在初始化时创建协议号为NETLINK_MEMCACHE的netlink socket, 注册其回调函数为mc_nl_data().
umemcached创建相同协议的netlink socket.

数据交互阶段

kmemcache.ko向umemcached发起请求的流程:

1. mc_get_unique_val — 分配请求的序列号, 并填写命令号
2. mc_add_callback(xx, xx, 1) — 注册该请求的函数函数
3. mc_send_msg_* — 发送数据. (同步请求)
4. mc_del_callback(xx, 1) — 删除cn_entry
5. mc_put_unique_val — 回收序列号

注意kmemcache.ko在调用mc_send_msg_*()发送数据时, 将等待在cn_entry.comp完成量上.

umemcached响应的流程与一般的网络服务代码无异, 通过epoll监听socket, 请求到达后, umemcached根据命令号调用sendmsg()响应. 这将回调mc_nl_callback(), 该函数在cn_queue.list中查找对应的cn_entry, 然后将cn_entry.work提交到cn_queue.workqueue. 当该work最终被调度时, 将回调mc_nc_work(). mc_nc_work()进一步回调通过mc_add_callback()注册的回调函数. 在这之后, mc_cn_work()调用complete(cn_entry.comp). 此时, 在mc_send_msg_*()函数内部等待cn_entry.comp的kmemcache.ko内核线程将被唤醒.

从上面的描述可以看出, kmemcache.ko与umemcached之间的数据交互是 “请求 – 应答” 式的. 且kmemcache.ko发送请求后将同步等待回复(可以指定等待时间).

相关细节

 61 struct cn_id {
 62     __u32   idx;
 63     __u32   val;
 64 };
 65
 66 struct cn_msg {
 67     struct cn_id id;
 68
 69     __u16   len;
 70     __u8    data[0];
 71 };
 72
 82 typedef void* (cn_callback_fn)(struct cn_msg *, struct netlink_skb_parms *);
 83
 84 struct cn_callback {
 85     struct sk_buff *skb;
 86
 87     cn_callback_fn *f;
 88
 89     void *out;
 90 };
 91
 92 struct cn_entry {
 93 #define ENTRY_NEW   (0x1 << 0)
 94 #define ENTRY_RUNNING   (0x1 << 1)
 95 #define ENTRY_FINISHED  (0x1 << 2)
 96     u32 flags:4;
 97     u32 unused:28;
 98     struct cn_id id;
 99     struct list_head list_entry;
100
101     struct cn_callback callback;
102     struct work_struct work;
103     struct completion comp;
104 };

cn_msg表示umemcached和kmemcache.ko间交互的数据包, cn_id为包头, cn_id.idx可以理解为命令号, 用以标识包体类型, cn_id.val可以理解为序列号, 用于防止窜包, 请求数据包和对应的回复数据包包头相同; 包体为len + data.

19 struct cn_queue {
21     struct workqueue_struct *workqueue;
24     struct list_head list;
25     spinlock_t lock;
26 };

cn_queue保存一个workqueue, 并以list成员维护cn_entry链表. 每个cn_entry结点实际上对应kmemcache.ko模块向umemcached发起的一个请求. kmemcache.ko发起请求前, 会分配一个cn_entry, 然后通过mc_add_callback()将cn_entry插入到cn_queue.list链表尾部. mc_add_callback()将该请求对应的回复数据包的回调函数保存在cn_entry.callback中, 并通过INIT_WORK初始化cn_entry.work, 注册cn_entry.work.func的回调函数为mc_cn_work(或mc_cn_work_del).

当umemcached响应请求, 往netlink socket连接响应数据时, mc_nl_callback()函数将被回调.

268 static void mc_nl_callback(struct sk_buff *_skb)
269 {
270     struct sk_buff *skb;
271     struct nlmsghdr *nlh;
272     struct cn_msg *msg;
273     struct cn_entry *entry;
274     struct cn_queue *queue = cn.queue;
275
276     skb = skb_get(_skb);
280     nlh = nlmsg_hdr(skb);
287
288     msg = NLMSG_DATA(nlh);
289     spin_lock_bh(&queue->lock);
290     list_for_each_entry(entry, &queue->list, list_entry) {
291         if (entry->id.idx == msg->id.idx &&
292             entry->id.val == msg->id.val) {
293             entry->callback.skb = skb;
295             queue_work(queue->workqueue, &entry->work);
306     }
307     spin_unlock_bh(&queue->lock);
315 }

可以看到, mc_nl_callback()根据umemcached回复的数据包包头, 在cn_queue.list链表中查找对应的cn_entry. 查找成功后, 将cn_entry.work提交到cn_queue.workqueue.

上文已经分析过mc_add_callback()注册了cn_entry.work.func为mc_cn_work, 所以当cn_entry.work任务被调度时, mc_cn_work()将被回调. 该函数最终将回调cn_entry.callback.f(). 而我们知道, 该回调函数是在调用mc_add_callback()时通过参数传入的.

例子

举个例子, 当kmemcache.ko需要退出时, 它调用shutdown_cmd(), 通过mc_add_callback()注册了回调函数shutdown_callback(), 然后通过mc_send_msg_timeout()向umemcached发出命令号为CN_IDX_SHUTDOWN的请求. umemcached回复内容后退出. kmemcache.ko收到回复, 表明umemcached马上就要退出, 通过一系列回调, 最终shutdown_callback()被调用. 然后kmemcache.ko调用try_shutdown()尝试将自己卸载. 此后, umemcached从sendmsg()调用中返回, 程序退出.

umemcached向kmemcache.ko传递启动设置信息的流程与退出流程原理相同, 不再赘述.

mc_dispatcher

dispatcher是listen sockets的管理器, 其数据结构为:

21 /* dispatcher master */
22 struct dispatcher_master {
23 #define ACCEPT_NEW  1
24 #define SOCK_CLOSE  2
25     unsigned long flags;
26
27     struct list_head list;     /* tcp/unix socket list */
28     spinlock_t lock;
29
30     struct workqueue_struct *wq;
31 };

其中, list成员为serve_socket组成的链表, 一个serve_socket对象对应的listen socket. 其结构为:

35 /* dispatcher listen socket */
36 struct serve_sock {
37     net_transport_t transport;
38     unsigned long state;       /* conn state */
39     struct socket *sock;       /* listen socket */
40     struct list_head list;     /* link to master’s listen list */
41     struct work_struct work;
42 };

mc_dispatcher.c文件头部定义了一个dispatcher_master结构的全局对象:

33 static struct dispatcher_master dsper;

其初始化函数为dispatcher_init():

701 /**
702  * init dispatcher.
703  * create the shared dispatcher kthread and start listen socket
704  *
705  * Returns 0 on success, error code other wise.
706  */
707 int dispatcher_init(void)
708 {
711     INIT_LIST_HEAD(&dsper.list);
712     spin_lock_init(&dsper.lock);
713
714     dsper.wq = create_singlethread_workqueue("kmcmasterd");
720
721     server_init();
725     set_bit(ACCEPT_NEW, &dsper.flags);
730 }

dispatcher_init()首先初始化dsper.list和dsper.lock, 然后调用create_singlethread_workqueue创建一个名为kmcmasterd的workqueue, 保存在dsper.wq中. 然后调用server_init()创建listen sockets. server_init()函数根据启动参数中是否创建unix domain sockets的标志选择调用server_socket_unix()还是server_inet_init(), 为讨论方便, 这里假设kmemcache启动时未要求创建unix domain sockets, 直接看server_inet_init()的实现:

509 static int server_inet_init(void)
510 {
512     char *path, *data = sock_info->data;
513     int selen = sizeof(sock_entry_t);
514     sock_entry_t *se = (sock_entry_t *)data;
515     struct file *filp = NULL;
532
533     for (; data + selen + se->addrlen <= path;) {
535     server_socket_inet(se, filp);
538         data += selen + se->addrlen;
539         se = (sock_entry_t *)data;
540     }
548 }

可以看到, server_inet_init()为启动时要求的每个端口调用server_socket_inet():

337 static int server_socket_inet(sock_entry_t *se, struct file *filp)
338 {
339     int ret = 0;
340     int flags = 1, level, name;
341     struct serve_sock *ss;
342     struct linger ling = {0, 0};
343
344     ss = __alloc_serve_sock(se->trans);
350
351     ret = sock_create_kern(se->family, se->type, se->protocol, &ss->sock);
358
359     if (!IS_UDP(se->trans)) {
360         ss->sock->sk->sk_allocation = GFP_ATOMIC;
361         set_sock_callbacks(ss->sock, ss);
362     }
363
415     ret = kernel_bind(ss->sock, (struct sockaddr *)se->addr, se->addrlen);
420
421     if (!IS_UDP(se->trans)) {
422         ret = kernel_listen(ss->sock, settings.backlog);
427     }
436
437     if (IS_UDP(se->trans)) {
438         static int last_cpu = –1;
439         int cpu, res = 0;
440
441         if (settings.num_threads_per_udp == 1) {
442             last_cpu = (last_cpu + 1) % num_online_cpus();
443             ret = mc_dispatch_conn_udp(ss->sock, conn_read,
444                            UDP_READ_BUF_SIZE, last_cpu);
445             if (!ret) res++;
446         } else {
447             for_each_online_cpu(cpu) {
448                 ret = mc_dispatch_conn_udp(ss->sock, conn_read,
449                                UDP_READ_BUF_SIZE,
450                                cpu);
451                 if (!ret) res++;
452             }
453         }
454
463     } else {
464         spin_lock(&dsper.lock);
465         list_add_tail(&ss->list, &dsper.list);
466         spin_unlock(&dsper.lock);
467     }
468
480 }

函数server_socket_inet()首先调用__alloc_serve_sock()创建并初始化一个serve_sock对象, 然后调用sock_create_kern()创建一个socket, 然后对该socket进行一系列的setsockopt, bind, listen等初始化操作. 在这之后, server_socket_inet()根据这个socket是否为UDP协议, 分为两类操作:

1. 如果为UDP协议, 调用mc_dispatch_conn_udp(), 后文细说
2. 如果不是UDP协议, 那么将ss链接到dsper.list链表

然后server_socket_inet()函数退出.

接下来看看TCP listen sockets, 其实在第2点之前, server_socket_inet()函数针对非UDP socket, 将通过set_sock_callbacks()注册该socket的几个回调函数:

240 static void set_sock_callbacks(struct socket *sock, struct serve_sock *ss)
241 {
242     struct sock *sk = sock->sk;
245
246     sk->sk_user_data    = ss;
247     sk->sk_data_ready   = mc_disp_data_ready;
252 }

当某个listen socket上有新连接到达时, 将回调sk_user_data_ready, 也就是mc_disp_data_ready()函数:

216 /* data available on socket, or listen socket received a connect */
217 static void mc_disp_data_ready(struct sock *sk, int unused)
218 {
219     struct serve_sock *ss =
220         (struct serve_sock *)sk->sk_user_data;
221
224     if (sk->sk_state == TCP_LISTEN)
225         _queue(ss);
226 }

看下_queue(ss)的实现:

202 static void inline _queue(struct serve_sock *ss)
203 {
209     queue_work(dsper.wq, &ss->work);
210 }

可以看到, _queue()非常简单, 它将ss->work提交到dsper.wq. ss->work的回调函数在server_socker_inet()调用__alloc_serve_sock()创建时设置为mc_listen_work().

在这之后, 当ss->work任务被调度时, mc_listen_work()将被回调:

190 static void mc_listen_work(struct work_struct *work)
191 {
192     struct serve_sock *ss =
193         container_of(work, struct serve_sock, work);
194
195     /* accept many */;
196     for (; !test_bit(SOCK_CLOSE, &dsper.flags);) {
197         if (mc_accept_one(ss))
198             break;
199     }
200 }

mc_listen_work()尽可能的通过mc_accept_one()接收新连接.

141 static int mc_accept_one(struct serve_sock *ss)
142 {
144     struct socket *nsock;
145     struct socket *sock = ss->sock;
146
147     sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
148                    sock->sk->sk_protocol, &nsock);
151
152     nsock->type = sock->type;
153     nsock->ops = sock->ops;
154     sock->ops->accept(sock, nsock, O_NONBLOCK);
157
158     nsock->sk->sk_allocation = GFP_ATOMIC;
159     set_anon_sock_callbacks(nsock);
165
174     mc_dispatch_conn_new(nsock, conn_new_cmd,
175          DATA_BUF_SIZE, ss->transport);
188 }

mc_accept_one()通过sock_create_lite()和sock->ops->accept()得到新连接, 之后调用mc_dispatch_conn_new(). 是不是觉得这个函数有点眼熟呢?

mc_worker

上文提到, 针对UDP socket, dispatcher将调用mc_dispatch_conn_udp(). 而针对accept出来的新连接, 将调用mc_dispatch_conn_new(). 实际上这两个函数的是对__dispatch_conn_new()的简单封装, 这就使得UDP sockets和TCP sockets的处理得到了统一:

682 int mc_dispatch_conn_udp(struct socket *sock, conn_state_t state,
683              int rbuflen, int cpu)
684 {
685     return __dispatch_conn_new(sock, state, rbuflen, udp_transport, cpu);
686 }
687
688 int mc_dispatch_conn_new(struct socket *sock, conn_state_t state,
689              int rbuflen, net_transport_t transport)
690 {
691     int ret;
692
693     ret = __dispatch_conn_new(sock, state, rbuflen, transport, get_cpu());
694     put_cpu();
695
696     return ret;
697 }

接下来一窥__dispatch_conn_new()究竟:

643 /**
644  * Dispatches a new connection to another thread.
645  *
646  * Returns 0 on success, error code other wise
647  */
648 static inline int __dispatch_conn_new(struct socket *sock, conn_state_t state,
649                       int rbuflen, net_transport_t transport, int cpu)
650 {
651     int ret = 0;
652     struct conn_req *rq;
653
654     rq = new_conn_req();
660
661     rq->state = state;
662     rq->transport = transport;
663     rq->sock = sock;
664     rq->rsize = rbuflen;
665     INIT_WORK(&rq->work, mc_conn_new_work);
666
667     ret = queue_work_on(cpu, slaved, &rq->work);
673
674     return 0;
680 }

该函数也是非常简单, 为参数socket *sock创建并初始化一个conn_req *rq对象, 注册rq->work的回调函数为mc_conn_new_work, 然后通过queue_work_on()提交任务到名为slaved的workqueue. slaved是在kmemcache.ko模块在初始化时通过kmemcache_init() -> register_kmemcache_bh() -> kmemcache_bh_init() -> __kmemcache_bh_init() -> worker_init() 调用链初始化的. * (这里所说的调用链未区分调用和回调)

699 /**
700  * create slaved’s workqueue & info storage.
701  *
702  * Returns 0 on success, error code other wise.
703  */
704 int workers_init(void)
705 {
733     slaved = create_workqueue("kmcslaved");
748 }

可以看到, slaved被创建所使用的是create_workqueue(), 简单理解为通过该函数为每个CPU创建了对应的worker线程. 而queue_work_on(cpu, slaved, &rq->work)的第一个参数CPU的含义, 便是指定rq->work任务提交给slaved workqueue的哪个CPU对应的worker线程上.

回头看看mc_dispatch_conn_udp()和mc_dispatch_conn_new()的实现, 不难发现:

— 对UDP socket, rq->work任务所提交的CPU由参数传入. mc_dispatch_conn_udp()由server_socket_inet()调用, 通过server_socket_inet()第437-453行得知, kmemcache.ko模块将根据settings.num_threads_per_udp是否为1, 也就是每个UDP socket是否只使用一个worker线程的配置, 决定将一个UDP socket提交到某个CPU(各UDP sockets以round robin形式选择一个CPU), 还是将该UDP socket提交到所有在线的CPUs.

— 对TCP socket, rq->work任务所提交的CPU恰恰就是mc_dispatch_conn_new()被执行时所在的CPU. 而该函数的调用者mc_listen_work(), 其实是作为一个任务, 由sk_user_data_ready()(即mc_disp_data_ready())调用_queue()提交到dsper.wq的. dsper.wq由create_singlethread_workqueue()创建, 它对应一个线程, 该线程在多个CPU之间调度, 该线程调度在某个CPU上执行, mc_dispatch_conn_new()被将rq->work提交到slaved workqueue的哪个CPU对应的worker线程上.

到这里, 无论是UDP sockets还是accept出来的TCP sockets, 它们都被抽象成一个conn_req *rq, rq->work的回调函数统一为mc_conn_new_work(). 然后rq->work被提交到了slaved的某个CPU worker线程. 而这里所谓的”某个CPU”的选择, 是kmemcache代码实现的(即作者jgli说的”基于packet的线程调度机制“), 它保证了同个请求前后的多次处理始终在同一个CPU上, 一方面提高cache命中率, 另一方面合理利用了多CPU资源. 从这点看, kmemcache有点像RPS, RFS补丁(更多), 当然kmemcache更加强大, 控制能力更强.

接下来, 便是rq->work任务被调度后, mc_conn_new_work()得到回调:

600 static void mc_conn_new_work(struct work_struct *work)
601 {
602     conn *c;
603     struct conn_req *rq =
604         container_of(work, struct conn_req, work);
605
606     c = mc_conn_new(rq);
611     mc_queue_conn(c);
623 }

388 void mc_queue_conn(conn *c)
389 {
395     __queue_conn(c);
396 }

368 static inline void __queue_conn(conn *c)
369 {
380     queue_work(slaved, &c->work);
386 }

在这里, conn_req *rq进一步被抽象为conn *c, 然后由mc_queue_conn()将c->work提交到原来所在的CPU的slaved orkqueue上. 回调函数由mc_conn_new()注册为mc_conn_work().

 70 conn* mc_conn_new(struct conn_req *rq)
 71 {
 74      conn *c = _conn_new();
119     c->sock = rq->sock;
120     c->state = rq->state;
121     c->transport = rq->transport;
122     INIT_WORK(&c->work, mc_conn_work);
123     atomic_set(&c->nref, 1);
124     set_bit(EV_RDWR, &c->event);
125     set_sock_callbacks(c->sock, c);
145 }

625 void mc_conn_work(struct work_struct *work)
626 {
634     mc_worker_machine(c);
637     mc_requeue_conn(c);
641 }

mc_worker_machine()由conn *c当前的状态驱动, 如已读数据不满足解析状态则读入数据, 否则解析数据, 解析成功曾调用相应的处理函数, 有数据可写则写出数据, 等等等等. 因为开始和memcache逻辑息息相关了, 后面的代码我未做深究.

因为workqueue是one shot的, 回调后若仍需后续处理, 自然该重新提交任务, 很明显这就是mc_requeue_conn()的功能. 值得说明的一点是, 在这之前, mc_requeue_conn()将通过sock->ops->poll()主动获取当前socket的读写状态并填入conn *c的event字段:

398 void mc_requeue_conn(conn *c)
399 {
400     int poll;
401
402     if (test_bit(EV_DEAD, &c->event)) {
403         PRINFO("mc_requeue_conn %p ignore EV_DEAD", c);
404         return;
405     }
406
407     poll = c->sock->ops->poll(c->sock->file, c->sock, NULL);
408     if (test_bit(EV_RDWR, &c->event)) {
409         if (poll & CONN_READ) {
410             goto queue_conn;
411         } else {
412             PRINFO("mc_queue_conn %p ignore EV_READ", c);
413         }
414     } else {
415         if (poll & CONN_WRITE) {
416             goto queue_conn;
417         } else {
418             PRINFO("mc_queue_conn %p ignore EV_WRITE", c);
419         }
420     }
421
422     return;
423
424 queue_conn:
425     __queue_conn(c);
426
427 }

注意调用poll()时最后一个参数为NULL, 这说明仅仅要求获取当前socket事件, 而不需要内核为该socket创建wait队列并在socket状态将来改变时回调以唤醒等待进程. (个人感觉将poll()延后到由mc_work_machine()函数调用后更好, 当前的实现是poll()出事件, 然后提交任务, 因而在任务被调度时可能该socket上又有了新事件, 但mc_worker_machine()对此毫不知情.)

总结一下, 不管是UDP请求还是TCP请求, 都通过__dispatch_conn_new()提交任务到slaved. 任务的回调是mc_conn_new_work(). 该函数进一步将请求抽象为conn *c, 并再次向原来的CPU对应的slaved workqueue提交任务, 回调为mc_conn_work(). mc_conn_work()由conn *c的状态驱动, 每次被回调后会判断任务是否已完成, 若未完成, 则重新提交任务.

epoll的不足

在用户态网络服务上, epoll工作的足够好, 当然, 这是比起select而言. 如果尝试在内核态使用epoll, 不难发现它的不足.

1. epoll_wait实质是轮询
2. epoll未反馈socket最后所在的CPU

下面根据源码简单阐述(基于个人理解, 若有不对, 欢迎指正).

1446 static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
1447            int maxevents, long timeout)
1448 {
1471 fetch_events:
1472     spin_lock_irqsave(&ep->lock, flags);
1473
1474     if (!ep_events_available(ep)) {
1482
1483         for (;;) {
1489             set_current_state(TASK_INTERRUPTIBLE);
1490             if (ep_events_available(ep) || timed_out)
1491                 break;
1492             if (signal_pending(current)) {
1493                 res = –EINTR;
1494                 break;
1495             }
1496
1497             spin_unlock_irqrestore(&ep->lock, flags);
1498             if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
1499                 timed_out = 1;
1500
1501             spin_lock_irqsave(&ep->lock, flags);
1502         }
1504
1505         set_current_state(TASK_RUNNING);
1506     }
1523 }

可以看到, ep_poll()在for循环中不断轮询是否有socket可用, 若无socket可用, 则调用schedule_hrtimeout_range()主动让出CPU, 直到超时或有可以sockets.

实际上, epoll多路复用的功能, 是依靠->f_op->poll()注册回调实现的. 以ep_insert()为例:

1145 static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
1146              struct file *tfile, int fd)
1147 {
1152     struct ep_pqueue epq;
1153
1177     /* Initialize the poll table using the queue callback */
1178     epq.epi = epi;
1179     init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
1180     epq.pt._key = event->events;
1181
1182     /*
1183      * Attach the item to the poll hooks and get current event bits.
1184      * We can safely use the file* here because its usage count has
1185      * been increased by the caller of this function. Note that after
1186      * this operation completes, the poll callback can start hitting
1187      * the new item.
1188      */
1189     revents = tfile->f_op->poll(tfile, &epq.pt);
1269 }

ep_insert()通过tfile->f_op->poll()(对socket而言, 为sock_poll(); 更进一步, 对tcp socket来说, 便是tcp_poll())调用poll_wait()将回调函数ep_ptable_queue_proc()注册在wait queue上. 当socket状态改变时, 内核协议栈通过wait_event_*()对wait queue上的回调函数逐个回调. 对ep_ptable_queue_proc()而言, 它将fd封转为epitem添加到目的file的sock等待队列, 回调函数为ep_poll_callback().

当socket收到数据后, 内核协议栈将回调sk_data_ready(默认为sock_def_readable), 最终会调用ep_poll_callback():

896 static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
897 {
956     /* If this file is already in the ready list we exit soon */
957     if (!ep_is_linked(&epi->rdllink)) {
958         list_add_tail(&epi->rdllink, &ep->rdllist);
959         __pm_stay_awake(epi->ws);
960     }
979 }

ep_poll_callback()将socket插入就绪队列. 而epoll_wait()轮询的正是就绪队列是否为空.

从上面的讨论可以看到, epoll_wait本质为轮询, 且其分割了数据逻辑和处理逻辑: socket有事件后, 通过辗转回调插入就绪队列, 最后由epoll_wait收割回用户态进行处理. 另一方面, 用户态无法获取就绪的socket所在的CPU, 处理逻辑如果不在原来的CPU, 则CPU cache命中率势必会受到影响.

高性能, 路漫漫

我曾断断续续的写过一个内核态网络框架knp, 原理与kmemcache几乎相同, 当然在实现上天真很多. (当时太过强调兼容已有的网络框架, 导致不少时间被浪费在重造fifo, msg queue, shm allocator, …之上. 现在回想起来, 后悔不已.)

和kmemcache作者jgli一样, 在读完The Secret To 10 Million Concurrent Connections -The Kernel Is The Problem, Not The Solution后, 感触颇深: 内核确实带来了太多的overhead. 于是我转头了解了netmap项目, 其作者是N年前提出DEVICE_POLLING的Luigi Rizzo. netmap代码不多, 只是有太多我尚未熟悉的领域, 于是浅尝辄止, 悻悻作罢. 以后有空再看看吧.

至于我那个knp框架, 早已沉寂多日了.