Browse Source

implement POSIX asynchronous io

some features are not yet supported, and only minimal testing has been
performed. should be considered experimental at this point.
Rich Felker 13 years ago
parent
commit
b4de6f93ae
8 changed files with 398 additions and 0 deletions
  1. 60 0
      include/aio.h
  2. 16 0
      src/aio/aio_cancel.c
  3. 6 0
      src/aio/aio_error.c
  4. 9 0
      src/aio/aio_fsync.c
  5. 104 0
      src/aio/aio_readwrite.c
  6. 6 0
      src/aio/aio_return.c
  7. 57 0
      src/aio/aio_suspend.c
  8. 140 0
      src/aio/lio_listio.c

+ 60 - 0
include/aio.h

@@ -0,0 +1,60 @@
+#ifndef _AIO_H
+#define _AIO_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#if defined(_POSIX_SOURCE) || defined(_POSIX_C_SOURCE) \
+ || defined(_XOPEN_SOURCE) || defined(_GNU_SOURCE)
+
+#include <signal.h>
+#include <time.h>
+
+#define __NEED_ssize_t
+#define __NEED_off_t
+
+#include <bits/alltypes.h>
+
+struct aiocb {
+	int aio_filedes, aio_lio_opcode, aio_reqprio;
+	volatile void *aio_buf;
+	size_t aio_nbytes;
+	struct sigevent aio_sigevent;
+	void *__td;
+	int __lock[2];
+	int __err;
+	ssize_t __ret;
+	off_t aio_offset;
+	void *__next, *__prev;
+	char __dummy4[32-2*sizeof(void *)];
+};
+
+#define AIO_CANCELED 0
+#define AIO_NOTCANCELED 1
+#define AIO_ALLDONE 2
+
+#define LIO_READ 0
+#define LIO_WRITE 1
+#define LIO_NOP 2
+
+#define LIO_WAIT 0
+#define LIO_NOWAIT 1
+
+int aio_read(struct aiocb *);
+int aio_write(struct aiocb *);
+int aio_error(struct aiocb *);
+ssize_t aio_return(struct aiocb *);
+int aio_cancel(int, struct aiocb *);
+int aio_suspend(struct aiocb *const [], int, const struct timespec *);
+int aio_fsync(int, struct aiocb *);
+
+int lio_listio(int, struct aiocb *const [], int, struct sigevent *);
+
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

+ 16 - 0
src/aio/aio_cancel.c

@@ -0,0 +1,16 @@
+#include <aio.h>
+#include <pthread.h>
+#include <errno.h>
+
+int aio_cancel(int fd, struct aiocb *cb)
+{
+	if (!cb) {
+		/* FIXME: for correctness, we should return AIO_ALLDONE
+		 * if there are no outstanding aio operations on this
+		 * file descriptor, but that would require making aio
+		 * much slower, and seems to have little advantage since
+		 * we don't support cancellation anyway. */
+		return AIO_NOTCANCELED;
+	}
+	return cb->__err==EINPROGRESS ? AIO_NOTCANCELED : AIO_ALLDONE;
+}

+ 6 - 0
src/aio/aio_error.c

@@ -0,0 +1,6 @@
+#include <aio.h>
+
+int aio_error(struct aiocb *cb)
+{
+	return cb->__err;
+}

+ 9 - 0
src/aio/aio_fsync.c

@@ -0,0 +1,9 @@
+#include <aio.h>
+#include <errno.h>
+
+int aio_fsync(int op, struct aiocb *cb)
+{
+	/* FIXME: unsupported */
+	errno = EINVAL;
+	return -1;
+}

+ 104 - 0
src/aio/aio_readwrite.c

@@ -0,0 +1,104 @@
+#include <aio.h>
+#include <fcntl.h>
+#include "pthread_impl.h"
+
+static void dummy(void)
+{
+}
+
+weak_alias(dummy, __aio_wake);
+
+static void notify_signal(struct sigevent *sev)
+{
+	siginfo_t si = {
+		.si_signo = sev->sigev_signo,
+		.si_value = sev->sigev_value,
+		.si_code = SI_ASYNCIO,
+		.si_pid = __pthread_self()->pid,
+		.si_uid = getuid()
+	};
+	__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
+}
+
+static void *io_thread(void *p)
+{
+	struct aiocb *cb = p;
+	int fd = cb->aio_filedes;
+	void *buf = (void *)cb->aio_buf;
+	size_t len = cb->aio_nbytes;
+	off_t off = cb->aio_offset;
+	int op = cb->aio_lio_opcode;
+	struct sigevent sev = cb->aio_sigevent;
+	ssize_t ret;
+
+	if (op == LIO_WRITE) {
+		if (  (fcntl(fd, F_GETFL) & O_APPEND)
+		    ||((ret = pwrite(fd, buf, len, off))<0 && errno==ESPIPE) )
+			ret = write(fd, buf, len);
+	} else if (op == LIO_READ) {
+		if ( (ret = pread(fd, buf, len, off))<0 && errno==ESPIPE )
+			ret = read(fd, buf, len);
+	} else {
+		ret = 0;
+	}
+	cb->__ret = ret;
+
+	if (ret < 0) a_store(&cb->__err, errno);
+	else a_store(&cb->__err, 0);
+
+	__aio_wake();
+
+	switch (cb->aio_sigevent.sigev_notify) {
+	case SIGEV_SIGNAL:
+		notify_signal(&sev);
+		break;
+	case SIGEV_THREAD:
+		sev.sigev_notify_function(sev.sigev_value);
+		break;
+	}
+
+	return 0;
+}
+
+static int new_req(struct aiocb *cb)
+{
+	int ret = 0;
+	pthread_attr_t a;
+	sigset_t set;
+	pthread_t td;
+
+	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
+		if (cb->aio_sigevent.sigev_notify_attributes)
+			a = *cb->aio_sigevent.sigev_notify_attributes;
+		else
+			pthread_attr_init(&a);
+	} else {
+		pthread_attr_init(&a);
+		pthread_attr_setstacksize(&a, PAGE_SIZE);
+		pthread_attr_setguardsize(&a, 0);
+	}
+	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
+	sigfillset(&set);
+	pthread_sigmask(SIG_BLOCK, &set, &set);
+	cb->__err = EINPROGRESS;
+	if (pthread_create(&td, &a, io_thread, cb)) {
+		errno = EAGAIN;
+		ret = -1;
+	}
+	pthread_sigmask(SIG_SETMASK, &set, 0);
+	cb->__td = td;
+
+	return ret;
+}
+
+ssize_t aio_read(struct aiocb *cb)
+{
+	cb->aio_lio_opcode = LIO_READ;
+	return new_req(cb);
+}
+
+ssize_t aio_write(struct aiocb *cb)
+{
+	cb->aio_lio_opcode = LIO_WRITE;
+	return new_req(cb);
+}

+ 6 - 0
src/aio/aio_return.c

@@ -0,0 +1,6 @@
+#include <aio.h>
+
+ssize_t aio_return(struct aiocb *cb)
+{
+	return cb->__ret;
+}

+ 57 - 0
src/aio/aio_suspend.c

@@ -0,0 +1,57 @@
+#include <aio.h>
+#include <errno.h>
+#include "pthread_impl.h"
+
+/* Due to the requirement that aio_suspend be async-signal-safe, we cannot
+ * use any locks, wait queues, etc. that would make it more efficient. The
+ * only obviously-correct algorithm is to generate a wakeup every time any
+ * aio operation finishes and have aio_suspend re-evaluate the completion
+ * status of each aiocb it was waiting on. */
+
+static volatile int seq;
+
+void __aio_wake(void)
+{
+	a_inc(&seq);
+	__wake(&seq, -1, 1);
+}
+
+int aio_suspend(struct aiocb *const cbs[], int cnt, const struct timespec *ts)
+{
+	int i, last, first=1, ret=0;
+	struct timespec at;
+
+	if (cnt<0) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	for (;;) {
+		last = seq;
+
+		for (i=0; i<cnt; i++) {
+			if (cbs[i] && cbs[i]->__err != EINPROGRESS)
+				return 0;
+		}
+
+		if (first && ts) {
+			clock_gettime(CLOCK_MONOTONIC, &at);
+			at.tv_sec += ts->tv_sec;
+			if ((at.tv_nsec += ts->tv_nsec) >= 1000000000) {
+				at.tv_nsec -= 1000000000;
+				at.tv_sec++;
+			}
+			first = 0;
+		}
+
+		ret = __timedwait(&seq, last, CLOCK_MONOTONIC,
+			ts ? &at : 0, 0, 0, 1);
+
+		if (ret == ETIMEDOUT) ret = EAGAIN;
+
+		if (ret) {
+			errno = ret;
+			return -1;
+		}
+	}
+}

+ 140 - 0
src/aio/lio_listio.c

@@ -0,0 +1,140 @@
+#include <aio.h>
+#include <errno.h>
+#include "pthread_impl.h"
+
+struct lio_state {
+	struct sigevent *sev;
+	int cnt;
+	struct aiocb *cbs[];
+};
+
+static int lio_wait(struct lio_state *st)
+{
+	int i, err, got_err;
+	int cnt = st->cnt;
+	struct aiocb **cbs = st->cbs;
+
+	for (;;) {
+		for (i=0; i<cnt; i++) {
+			if (!cbs[i]) continue;
+			err = aio_error(cbs[i]);
+			if (err==EINPROGRESS)
+				break;
+			if (err) got_err=1;
+			cbs[i] = 0;
+		}
+		if (i==cnt) {
+			if (got_err) {
+				errno = EIO;
+				return -1;
+			}
+			return 0;
+		}
+		if (aio_suspend(cbs, cnt, 0))
+			return -1;
+	}
+}
+
+static void notify_signal(struct sigevent *sev)
+{
+	siginfo_t si = {
+		.si_signo = sev->sigev_signo,
+		.si_value = sev->sigev_value,
+		.si_code = SI_ASYNCIO,
+		.si_pid = __pthread_self()->pid,
+		.si_uid = getuid()
+	};
+	__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
+}
+
+static void *wait_thread(void *p)
+{
+	struct lio_state *st = p;
+	struct sigevent *sev = st->sev;
+	lio_wait(st);
+	free(st);
+	switch (sev->sigev_notify) {
+	case SIGEV_SIGNAL:
+		notify_signal(sev);
+		break;
+	case SIGEV_THREAD:
+		sev->sigev_notify_function(sev->sigev_value);
+		break;
+	}
+	return 0;
+}
+
+int lio_listio(int mode, struct aiocb *const cbs[], int cnt, struct sigevent *sev)
+{
+	int i, ret;
+	struct lio_state *st=0;
+
+	if (cnt < 0) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (mode == LIO_WAIT || (sev && sev->sigev_notify != SIGEV_NONE)) {
+		if (!(st = malloc(sizeof *st + cnt*sizeof *cbs))) {
+			errno = EAGAIN;
+			return -1;
+		}
+		st->cnt = cnt;
+		st->sev = sev;
+		memcpy(st->cbs, cbs, cnt*sizeof *cbs);
+	}
+
+	for (i=0; i<cnt; i++) {
+		if (!cbs[i]) continue;
+		switch (cbs[i]->aio_lio_opcode) {
+		case LIO_READ:
+			ret = aio_read(cbs[i]);
+			break;
+		case LIO_WRITE:
+			ret = aio_write(cbs[i]);
+			break;
+		default:
+			continue;
+		}
+		if (ret) {
+			free(st);
+			errno = EAGAIN;
+			return -1;
+		}
+	}
+
+	if (mode == LIO_WAIT) {
+		ret = lio_wait(st);
+		free(st);
+		return 0;
+	}
+
+	if (st) {
+		pthread_attr_t a;
+		sigset_t set;
+		pthread_t td;
+
+		if (sev->sigev_notify == SIGEV_THREAD) {
+			if (sev->sigev_notify_attributes)
+				a = *sev->sigev_notify_attributes;
+			else
+				pthread_attr_init(&a);
+		} else {
+			pthread_attr_init(&a);
+			pthread_attr_setstacksize(&a, PAGE_SIZE);
+			pthread_attr_setguardsize(&a, 0);
+		}
+		pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
+		sigfillset(&set);
+		pthread_sigmask(SIG_BLOCK, &set, &set);
+		if (pthread_create(&td, &a, wait_thread, st)) {
+			free(st);
+			errno = EAGAIN;
+			return -1;
+		}
+		pthread_sigmask(SIG_SETMASK, &set, 0);
+	}
+
+	return 0;
+}
+