对于文件的读写,即使以O_NONBLOCK方式来打开一个文件,也会处于"阻塞"状态,因为文件时时刻刻处于可读状态,而从磁盘到内存所等待的时间是惊人的。为了充分利用把数据从磁盘复制到内存的这段时间,引入了aio模型。linux下有aio封装,但是该aio采用线程或信号来通知完成;为了能更多地控制io行为,可以使用更为底层的libaio。
一、基本函数与结构
1. libaio函数
/* Core libaio entry points, one declaration per line. */

/* Create an AIO context; the handle is written through ctxp. */
extern int io_setup(int maxevents, io_context_t *ctxp);

/* Tear down a context obtained from io_setup(). */
extern int io_destroy(io_context_t ctx);

/* Queue the nr control blocks listed in ios[] on ctx. */
extern int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);

/* Try to cancel one outstanding request; evt is an output slot. */
extern int io_cancel(io_context_t ctx, struct iocb *iocb,
                     struct io_event *evt);

/* Collect between min_nr and nr completion events into events[]. */
extern int io_getevents(io_context_t ctx_id, long min_nr, long nr,
                        struct io_event *events, struct timespec *timeout);
2. 结构
/*
 * Arguments for a poll request; reached through the `poll` member of the
 * union inside struct iocb.
 * NOTE(review): PADDED is a macro defined outside this excerpt; presumably it
 * pads the field to a fixed width. Confirm against its definition.
 */
struct io_iocb_poll {
PADDED(int events, __pad1); /* event mask to wait for; __pad1 is filler */
}; /* result code is the set of result flags or -'ve errno */
/*
 * Arguments for a socket-address request; reached through the `saddr` member
 * of the union inside struct iocb.
 */
struct io_iocb_sockaddr {
struct sockaddr *addr; /* socket address storage for the request */
int len; /* NOTE(review): presumably the size of *addr in bytes; confirm */
}; /* result code is the length of the sockaddr, or -'ve errno */
/*
 * Arguments shared by the plain read/write requests; reached through the `c`
 * member of the union inside struct iocb. The example program frees
 * u.c.buf and advances u.c.offset on requests built with io_prep_pread().
 * NOTE(review): PADDEDptr/PADDEDul are macros defined outside this excerpt;
 * presumably they pad pointer / unsigned long fields to a fixed width.
 */
struct io_iocb_common {
PADDEDptr(void *buf, __pad1); /* data buffer for the transfer */
PADDEDul(nbytes, __pad2); /* NOTE(review): presumably the requested byte count; confirm */
long long offset; /* file offset at which the transfer starts */
long long __pad3; /* filler, not used by the example */
unsigned flags; /* NOTE(review): meaning not shown in this excerpt */
unsigned resfd; /* NOTE(review): presumably the descriptor set by io_set_eventfd(); confirm */
}; /* result code is the amount read or -'ve errno */
/*
 * Arguments for a vectored request; reached through the `v` member of the
 * union inside struct iocb.
 */
struct io_iocb_vector {
const struct iovec *vec; /* array of I/O vectors */
int nr; /* NOTE(review): presumably the number of entries in vec; confirm */
long long offset; /* file offset for the transfer */
}; /* result code is the amount read or -'ve errno */
/*
 * One asynchronous I/O request (control block). The example program fills it
 * with io_prep_pread(), hands it to io_submit(), and gets the same pointer
 * back in io_event.obj when the request completes.
 * NOTE(review): PADDEDptr/PADDED are macros defined outside this excerpt.
 */
struct iocb {
PADDEDptr(void *data, __pad1); /* Return in the io completion event */
PADDED(unsigned key, __pad2); /* For use in identifying io requests */
short aio_lio_opcode; /* NOTE(review): presumably selects which union member below applies; confirm */
short aio_reqprio; /* NOTE(review): presumably a request priority; not used by the example */
int aio_fildes; /* file descriptor the request operates on (the example closes it when done) */
/* Per-request arguments; the example only touches u.c. */
union {
struct io_iocb_common c;
struct io_iocb_vector v;
struct io_iocb_poll poll;
struct io_iocb_sockaddr saddr;
} u;
};
/*
 * One completion record, filled in by io_getevents().
 * NOTE(review): PADDEDptr/PADDEDul are macros defined outside this excerpt.
 */
struct io_event {
PADDEDptr(void *data, __pad1); /* NOTE(review): presumably the iocb's `data` value handed back; confirm */
PADDEDptr(struct iocb *obj, __pad2); /* the control block this event belongs to */
PADDEDul(res, __pad3); /* result code: the example treats it as a byte count, or a negated errno stored as unsigned */
PADDEDul(res2, __pad4); /* secondary result; the example treats any non-zero value as "stop reading" */
};
3. 内联函数
static inline void io_set_callback(struct iocb *iocb, io_callback_t cb); static inline void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); static inline void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); static inline void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset); static inline void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset); /* Jeff Moyer says this was implemented in Red Hat AS2.1 and RHEL3. * AFAICT, it was never in mainline, and should not be used. --RR */ static inline void io_prep_poll(struct iocb *iocb, int fd, int events); static inline int io_poll(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd, int events); static inline void io_prep_fsync(struct iocb *iocb, int fd); static inline int io_fsync(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd); static inline void io_prep_fdsync(struct iocb *iocb, int fd); static inline int io_fdsync(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd); static inline void io_set_eventfd(struct iocb *iocb, int eventfd);
二、使用方法
1、初始化io_context
2、open文件取得fd
3、根据fd、buffer、offset等信息建立iocb
4、submit iocb到context
5、io_getevents取得events状态
6、回到第3步,直到所有文件读完
三、例子
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <fcntl.h>
#include <libaio.h>
int main(int argc, char *argv[])
{
// 每次读入32K字节
const int buffer_size = 0x8000;
// 最大事件数 32
const int nr_events = 32;
int rt;
io_context_t ctx = {0};
// 初使化 io_context_t
rt = io_setup(nr_events, &ctx);
if ( rt != 0 )
error(1, rt, "io_setup");
// 依次读取参数作为文件名加入提交到ctx
int pagesize = sysconf(_SC_PAGESIZE);
for (int i=1; i<argc; ++i) {
iocb *cb = (iocb*)malloc(sizeof(iocb));
void *buffer;
// 要使用O_DIRECT, 必须要对齐
posix_memalign(&buffer, pagesize, buffer_size);
io_prep_pread(cb, open(argv[i], O_RDONLY | O_DIRECT), buffer, buffer_size, 0);
rt = io_submit(ctx, 1, &cb);
if (rt < 0)
error(1, -rt, "io_submit %s", argv[i]);;
}
io_event events[nr_events];
iocb *cbs[nr_events];
int remain = argc - 1;
int n = 0;
// 接收数据最小返回的请求数为1,最大为nr_events
while (remain && (n = io_getevents(ctx, 1, nr_events, events, 0))) {
int nr_cbs = 0;
for (int i=0; i<n; ++i) {
io_event &event = events[i];
iocb *cb = event.obj;
// event.res为unsigned
//printf("%d receive %d bytes\n", cb->aio_fildes, event.res);
if (event.res > buffer_size) {
printf("%s\n", strerror(-event.res));
}
if (event.res != buffer_size || event.res2 != 0) {
--remain;
// 释放buffer, fd 与 cb
free(cb->u.c.buf);
close(cb->aio_fildes);
free(cb);
} else {
// 更新cb的offset
cb->u.c.offset += event.res;
cbs[nr_cbs++] = cb;
}
}
if (nr_cbs) {
// 继续接收数据
io_submit(ctx, nr_cbs, cbs);
}
}
return 0;
}$ truncate foo.txt -s 100K $ truncate foo2.txt -s 200K $ g++ -O3 libaio_simple.cc -laio && ./a.out foo.txt foo2.txt 3 received 32768 bytes 4 received 32768 bytes 3 received 32768 bytes 4 received 32768 bytes 3 received 32768 bytes 4 received 32768 bytes 3 received 4096 bytes 3 done. 4 received 32768 bytes 4 received 32768 bytes 4 received 32768 bytes 4 received 8192 bytes 4 done.