|
@@ -41,13 +41,6 @@
|
|
|
* blocked permanently.
|
|
|
*/
|
|
|
|
|
|
-struct aio_args {
|
|
|
- struct aiocb *cb;
|
|
|
- int op;
|
|
|
- int err;
|
|
|
- sem_t sem;
|
|
|
-};
|
|
|
-
|
|
|
struct aio_thread {
|
|
|
pthread_t td;
|
|
|
struct aiocb *cb;
|
|
@@ -65,6 +58,13 @@ struct aio_queue {
|
|
|
struct aio_thread *head;
|
|
|
};
|
|
|
|
|
|
+struct aio_args {
|
|
|
+ struct aiocb *cb;
|
|
|
+ struct aio_queue *q;
|
|
|
+ int op;
|
|
|
+ sem_t sem;
|
|
|
+};
|
|
|
+
|
|
|
static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
|
|
|
static struct aio_queue *****map;
|
|
|
static volatile int aio_fd_cnt;
|
|
@@ -196,12 +196,11 @@ static void *io_thread_func(void *ctx)
|
|
|
size_t len = cb->aio_nbytes;
|
|
|
off_t off = cb->aio_offset;
|
|
|
|
|
|
- struct aio_queue *q = __aio_get_queue(fd, 1);
|
|
|
+ struct aio_queue *q = args->q;
|
|
|
ssize_t ret;
|
|
|
|
|
|
- args->err = q ? 0 : EAGAIN;
|
|
|
+ pthread_mutex_lock(&q->lock);
|
|
|
sem_post(&args->sem);
|
|
|
- if (!q) return 0;
|
|
|
|
|
|
at.op = op;
|
|
|
at.running = 1;
|
|
@@ -213,7 +212,6 @@ static void *io_thread_func(void *ctx)
|
|
|
at.prev = 0;
|
|
|
if ((at.next = q->head)) at.next->prev = &at;
|
|
|
q->head = &at;
|
|
|
- q->ref++;
|
|
|
|
|
|
if (!q->init) {
|
|
|
int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
|
|
@@ -272,9 +270,18 @@ static int submit(struct aiocb *cb, int op)
|
|
|
pthread_attr_t a;
|
|
|
sigset_t allmask, origmask;
|
|
|
pthread_t td;
|
|
|
- struct aio_args args = { .cb = cb, .op = op };
|
|
|
+ struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
|
|
|
+ struct aio_args args = { .cb = cb, .op = op, .q = q };
|
|
|
sem_init(&args.sem, 0, 0);
|
|
|
|
|
|
+ if (!q) {
|
|
|
+ if (cb->aio_fildes < 0) errno = EBADF;
|
|
|
+ else errno = EAGAIN;
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ q->ref++;
|
|
|
+ pthread_mutex_unlock(&q->lock);
|
|
|
+
|
|
|
if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
|
|
|
if (cb->aio_sigevent.sigev_notify_attributes)
|
|
|
a = *cb->aio_sigevent.sigev_notify_attributes;
|
|
@@ -291,6 +298,8 @@ static int submit(struct aiocb *cb, int op)
|
|
|
pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
|
|
|
cb->__err = EINPROGRESS;
|
|
|
if (pthread_create(&td, &a, io_thread_func, &args)) {
|
|
|
+ pthread_mutex_lock(&q->lock);
|
|
|
+ __aio_unref_queue(q);
|
|
|
errno = EAGAIN;
|
|
|
ret = -1;
|
|
|
}
|
|
@@ -298,10 +307,6 @@ static int submit(struct aiocb *cb, int op)
|
|
|
|
|
|
if (!ret) {
|
|
|
while (sem_wait(&args.sem));
|
|
|
- if (args.err) {
|
|
|
- errno = args.err;
|
|
|
- ret = -1;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
return ret;
|