Linux libaio

Linux aio是Linux下的异步读写模型。
对于普通文件的读写,即使以O_NONBLOCK方式打开文件,读写调用仍然会处于"阻塞"状态:因为普通文件时时刻刻都被视为可读,而把数据从磁盘读到内存所需的等待时间却是惊人的。为了充分利用数据从磁盘复制到内存的这段等待时间,引入了aio(异步I/O)模型。Linux下已有aio封装(glibc的POSIX aio),但它采用线程或信号来通知完成;为了能更多地控制io行为,可以使用更为底层的libaio。

一、基本函数与结构

1. libaio函数

extern int io_setup(int maxevents, io_context_t *ctxp);
extern int io_destroy(io_context_t ctx);
extern int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);
extern int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt);
extern int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

2. 结构

/* Request parameters held in the `poll` member of the union inside struct iocb. */
struct io_iocb_poll {
	PADDED(int events, __pad1);	/* NOTE(review): presumably the event mask given to io_prep_poll() -- confirm against libaio.h */
};	/* result code is the set of result flags or -'ve errno */

/* Request parameters held in the `saddr` member of the union inside struct iocb. */
struct io_iocb_sockaddr {
	struct sockaddr *addr;	/* socket address associated with the request */
	int		len;	/* NOTE(review): presumably the size of *addr in bytes -- confirm against libaio.h */
};	/* result code is the length of the sockaddr, or -'ve errno */

/*
 * Request parameters held in the `c` member of the union inside struct iocb.
 * The example program in this article reads them back as cb->u.c.buf and
 * cb->u.c.offset after preparing the request with io_prep_pread().
 */
struct io_iocb_common {
	PADDEDptr(void	*buf, __pad1);	/* data buffer; the example frees it through cb->u.c.buf */
	PADDEDul(nbytes, __pad2);	/* NOTE(review): presumably the byte count passed to io_prep_pread() -- confirm */
	long long	offset;	/* file offset; the example advances it by the bytes read before resubmitting */
	long long	__pad3;
	unsigned	flags;	/* NOTE(review): meaning not shown here; possibly set by io_set_eventfd() -- confirm */
	unsigned	resfd;	/* NOTE(review): presumably the eventfd passed to io_set_eventfd() -- confirm */
};	/* result code is the amount read or -'ve errno */

/*
 * Request parameters held in the `v` member of the union inside struct iocb.
 * NOTE(review): the fields appear to mirror the iov / iovcnt / offset arguments
 * of io_prep_preadv() and io_prep_pwritev() -- confirm against libaio.h.
 */
struct io_iocb_vector {
	const struct iovec	*vec;	/* array of I/O vectors */
	int			nr;	/* NOTE(review): presumably the number of entries in vec -- confirm */
	long long		offset;	/* NOTE(review): presumably the starting file offset -- confirm */
};	/* result code is the amount read or -'ve errno */

/*
 * One I/O control block: describes a single asynchronous request.
 * Pointers to these are what io_submit() takes, and the completion event
 * hands the same pointer back in io_event.obj.
 */
struct iocb {
	PADDEDptr(void *data, __pad1);	/* Return in the io completion event */
	PADDED(unsigned key, __pad2);	/* For use in identifying io requests */

	short		aio_lio_opcode;	/* NOTE(review): presumably the operation code chosen by the io_prep_*() helpers -- confirm */
	short		aio_reqprio;
	int		aio_fildes;	/* file descriptor the request operates on */

	/* Operation-specific parameters; which member applies depends on the request type. */
	union {
		struct io_iocb_common		c;
		struct io_iocb_vector		v;
		struct io_iocb_poll		poll;
		struct io_iocb_sockaddr	saddr;
	} u;
};

/* Completion record filled in by io_getevents(), one per finished request. */
struct io_event {
	PADDEDptr(void *data, __pad1);	/* NOTE(review): presumably the iocb's data pointer handed back -- confirm */
	PADDEDptr(struct iocb *obj,  __pad2);	/* the iocb that completed */
	PADDEDul(res,  __pad3);	/* unsigned result: bytes transferred, or a negated errno stored as unsigned */
	PADDEDul(res2, __pad4);	/* secondary result; the example treats any non-zero value as end of the request */
};

3. 内联函数

static inline void io_set_callback(struct iocb *iocb, io_callback_t cb);
static inline void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
static inline void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
static inline void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
static inline void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
/* Jeff Moyer says this was implemented in Red Hat AS2.1 and RHEL3.
 * AFAICT, it was never in mainline, and should not be used. --RR */
static inline void io_prep_poll(struct iocb *iocb, int fd, int events);
static inline int io_poll(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd, int events);
static inline void io_prep_fsync(struct iocb *iocb, int fd);
static inline int io_fsync(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd);
static inline void io_prep_fdsync(struct iocb *iocb, int fd);
static inline int io_fdsync(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd);
static inline void io_set_eventfd(struct iocb *iocb, int eventfd);

二、使用方法

1、初始化io_context
2、open文件取得fd
3、根据fd、buffer、offset等信息建立iocb
4、submit iocb到context
5、io_getevents取得events状态
6、回到第3步

三、例子

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include <errno.h>

#include <fcntl.h>
#include <libaio.h>

int main(int argc, char *argv[])
{
	// 每次读入32K字节
	const int buffer_size = 0x8000;

	// 最大事件数 32
	const int nr_events   = 32;
	int rt;

	io_context_t ctx = {0};

	// 初使化 io_context_t
	rt = io_setup(nr_events, &ctx);
	if ( rt != 0 )
		error(1, rt, "io_setup");

	// 依次读取参数作为文件名加入提交到ctx
	int pagesize = sysconf(_SC_PAGESIZE);
	for (int i=1; i<argc; ++i) {
		iocb *cb = (iocb*)malloc(sizeof(iocb));
		void *buffer;
		// 要使用O_DIRECT, 必须要对齐
		posix_memalign(&buffer, pagesize, buffer_size);
		io_prep_pread(cb, open(argv[i], O_RDONLY | O_DIRECT), buffer, buffer_size, 0);
		rt = io_submit(ctx, 1, &cb);
		if (rt < 0)
			error(1, -rt, "io_submit %s", argv[i]);;
	}

	io_event events[nr_events];
	iocb     *cbs[nr_events];

	int remain = argc - 1;
	int n      = 0;

	// 接收数据最小返回的请求数为1,最大为nr_events
	while (remain && (n = io_getevents(ctx, 1, nr_events, events, 0))) {
		int nr_cbs = 0;
		for (int i=0; i<n; ++i) {
			io_event &event = events[i];
			iocb     *cb    = event.obj;
			// event.res为unsigned
			//printf("%d receive %d bytes\n", cb->aio_fildes, event.res);
			if (event.res > buffer_size) {
				printf("%s\n", strerror(-event.res));
			}
			if (event.res != buffer_size || event.res2 != 0) {
				--remain;
				// 释放buffer, fd 与 cb
				free(cb->u.c.buf);
				close(cb->aio_fildes);
				free(cb);
			} else {
				// 更新cb的offset
				cb->u.c.offset += event.res;
				cbs[nr_cbs++] = cb;
			}
		}

		if (nr_cbs) {
			// 继续接收数据
			io_submit(ctx, nr_cbs, cbs);
		}
	}
	return 0;
}
运行
$ truncate foo.txt -s 100K
$ truncate foo2.txt -s 200K
$ g++ -O3 libaio_simple.cc -laio && ./a.out foo.txt foo2.txt
3 received 32768 bytes
4 received 32768 bytes
3 received 32768 bytes
4 received 32768 bytes
3 received 32768 bytes
4 received 32768 bytes
3 received 4096 bytes
3 done.
4 received 32768 bytes
4 received 32768 bytes
4 received 32768 bytes
4 received 8192 bytes
4 done.

四、其它

这里有个问题:因为O_DIRECT会跳过系统缓存,每次都直接从磁盘读取,这对读写性能来讲是个大问题。要自己实现缓存,需要啃一堆东西,而且还不一定写得好。

发表评论

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>