#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/auxv.h>
#include "syscall.h"
#include "atomic.h"
#include "pthread_impl.h"

/* The following is a threads-based implementation of AIO with minimal
 * dependence on implementation details. Most synchronization is
 * performed with pthread primitives, but atomics and futex operations
 * are used for notification in a couple places where the pthread
 * primitives would be inefficient or impractical.
 *
 * For each fd with outstanding aio operations, an aio_queue structure
 * is maintained. These are reference-counted and destroyed by the last
 * aio worker thread to exit. Accessing any member of the aio_queue
 * structure requires a lock on the aio_queue. Adding and removing aio
 * queues themselves requires a write lock on the global map object,
 * a 4-level table mapping file descriptor numbers to aio queues. A
 * read lock on the map is used to obtain locks on existing queues by
 * excluding destruction of the queue by a different thread while it is
 * being locked.
 *
 * Each aio queue has a list of active threads/operations. Presently there
 * is a one to one relationship between threads and operations. The only
 * members of the aio_thread structure which are accessed by other threads
 * are the linked list pointers, op (which is immutable), running (which
 * is updated atomically), and err (which is synchronized via running),
 * so no locking is necessary. Most of the other members are used
 * for sharing data between the main flow of execution and the
 * cancellation cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is required
 * to be async-signal safe. All aio worker threads run with all signals
 * blocked permanently.
 */

struct aio_thread {
	pthread_t td;
	struct aiocb *cb;
	struct aio_thread *next, *prev;
	struct aio_queue *q;
	volatile int running;
	int err, op;
	ssize_t ret;
};

struct aio_queue {
	int fd, seekable, append, ref, init;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct aio_thread *head;
};

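/* Arguments passed from submit() to the worker thread. The object lives
 * on the submitting thread's stack; the worker posts sem only after it
 * has copied what it needs and taken the queue lock, and submit() waits
 * on sem before returning, so the object cannot go out of scope early. */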
struct aio_args {
	struct aiocb *cb;
	struct aio_queue *q;
	int op;
	sem_t sem;
};

static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
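/* map is the 4-level table described above: the queue for a nonnegative
 * fd lives at map[fd>>24][(fd>>16)&0xff][(fd>>8)&0xff][fd&0xff]. The top
 * level has (-1U/2+1)>>24 slots (128 with 32-bit int); lower levels 256. */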
static struct aio_queue *****map;
static volatile int aio_fd_cnt;
volatile int __aio_fut;

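/* Return the queue for fd, locked, creating it if need is nonzero and
 * none exists yet. Lookup takes only a read lock on maplock; creating
 * missing table levels or a new queue takes the write lock. Returns 0
 * if there is no queue and need is zero, if fd is not an open file
 * descriptor, or if allocation fails. */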
static struct aio_queue *__aio_get_queue(int fd, int need)
{
	if (fd < 0) {
		errno = EBADF;
		return 0;
	}
	int a=fd>>24;
	unsigned char b=fd>>16, c=fd>>8, d=fd;
	struct aio_queue *q = 0;
	pthread_rwlock_rdlock(&maplock);
	if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) {
		pthread_rwlock_unlock(&maplock);
		if (fcntl(fd, F_GETFD) < 0) return 0;
		pthread_rwlock_wrlock(&maplock);
		if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24);
		if (!map) goto out;
		if (!map[a]) map[a] = calloc(sizeof **map, 256);
		if (!map[a]) goto out;
		if (!map[a][b]) map[a][b] = calloc(sizeof ***map, 256);
		if (!map[a][b]) goto out;
		if (!map[a][b][c]) map[a][b][c] = calloc(sizeof ****map, 256);
		if (!map[a][b][c]) goto out;
		if (!(q = map[a][b][c][d])) {
			map[a][b][c][d] = q = calloc(sizeof *****map, 1);
			if (q) {
				q->fd = fd;
				pthread_mutex_init(&q->lock, 0);
				pthread_cond_init(&q->cond, 0);
				a_inc(&aio_fd_cnt);
			}
		}
	}
	if (q) pthread_mutex_lock(&q->lock);
out:
	pthread_rwlock_unlock(&maplock);
	return q;
}

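/* Drop one reference to q, which the caller holds locked. The queue
 * lock is always released; if this was the last reference, the queue is
 * also removed from the map (under a maplock write lock) and freed. */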
static void __aio_unref_queue(struct aio_queue *q)
{
	if (q->ref > 1) {
		q->ref--;
		pthread_mutex_unlock(&q->lock);
		return;
	}

	/* This is potentially the last reference, but a new reference
	 * may arrive since we cannot free the queue object without first
	 * taking the maplock, which requires releasing the queue lock. */
	pthread_mutex_unlock(&q->lock);
	pthread_rwlock_wrlock(&maplock);
	pthread_mutex_lock(&q->lock);
	if (q->ref == 1) {
		int fd=q->fd;
		int a=fd>>24;
		unsigned char b=fd>>16, c=fd>>8, d=fd;
		map[a][b][c][d] = 0;
		a_dec(&aio_fd_cnt);
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
		free(q);
	} else {
		q->ref--;
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
	}
}

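/* Cleanup handler for each worker thread. It runs both on normal
 * completion (via pthread_cleanup_pop(1)) and when the worker is
 * cancelled by aio_cancel/close: it publishes the result and error
 * code, wakes any waiters, unlinks the operation from its queue, and
 * delivers the requested completion notification. */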
static void cleanup(void *ctx)
{
	struct aio_thread *at = ctx;
	struct aio_queue *q = at->q;
	struct aiocb *cb = at->cb;
	struct sigevent sev = cb->aio_sigevent;

	/* There are four potential types of waiters we could need to wake:
	 * 1. Callers of aio_cancel/close.
	 * 2. Callers of aio_suspend with a single aiocb.
	 * 3. Callers of aio_suspend with a list.
	 * 4. AIO worker threads waiting for sequenced operations.
	 * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
	 * considerations. Type 4 is notified later via a cond var. */

	cb->__ret = at->ret;
	if (a_swap(&at->running, 0) < 0)
		__wake(&at->running, -1, 1);
	if (a_swap(&cb->__err, at->err) != EINPROGRESS)
		__wake(&cb->__err, -1, 1);
	if (a_swap(&__aio_fut, 0))
		__wake(&__aio_fut, -1, 1);

	pthread_mutex_lock(&q->lock);

	if (at->next) at->next->prev = at->prev;
	if (at->prev) at->prev->next = at->next;
	else q->head = at->next;

	/* Signal aio worker threads waiting for sequenced operations. */
	pthread_cond_broadcast(&q->cond);

	__aio_unref_queue(q);

	if (sev.sigev_notify == SIGEV_SIGNAL) {
		siginfo_t si = {
			.si_signo = sev.sigev_signo,
			.si_value = sev.sigev_value,
			.si_code = SI_ASYNCIO,
			.si_pid = getpid(),
			.si_uid = getuid()
		};
		__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
	}
	if (sev.sigev_notify == SIGEV_THREAD) {
		a_store(&__pthread_self()->cancel, 0);
		sev.sigev_notify_function(sev.sigev_value);
	}
}

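/* Body of each worker thread: register the operation on its queue, wait
 * (for append-mode writes and fsync/fdatasync operations) until no
 * earlier-queued write remains, perform the I/O, then let the cleanup
 * handler publish the result and send any notification. */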
static void *io_thread_func(void *ctx)
{
	struct aio_thread at, *p;
	struct aio_args *args = ctx;
	struct aiocb *cb = args->cb;
	int fd = cb->aio_fildes;
	int op = args->op;
	void *buf = (void *)cb->aio_buf;
	size_t len = cb->aio_nbytes;
	off_t off = cb->aio_offset;

	struct aio_queue *q = args->q;
	ssize_t ret;

	pthread_mutex_lock(&q->lock);
	sem_post(&args->sem);

	at.op = op;
	at.running = 1;
	at.ret = -1;
	at.err = ECANCELED;
	at.q = q;
	at.td = __pthread_self();
	at.cb = cb;
	at.prev = 0;
	if ((at.next = q->head)) at.next->prev = &at;
	q->head = &at;

	if (!q->init) {
		int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
		q->seekable = seekable;
		q->append = !seekable || (fcntl(fd, F_GETFL) & O_APPEND);
		q->init = 1;
	}

	pthread_cleanup_push(cleanup, &at);

	/* Wait for sequenced operations. */
	if (op!=LIO_READ && (op!=LIO_WRITE || q->append)) {
		for (;;) {
			for (p=at.next; p && p->op!=LIO_WRITE; p=p->next);
			if (!p) break;
			pthread_cond_wait(&q->cond, &q->lock);
		}
	}

	pthread_mutex_unlock(&q->lock);

	switch (op) {
	case LIO_WRITE:
		ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off);
		break;
	case LIO_READ:
		ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off);
		break;
	case O_SYNC:
		ret = fsync(fd);
		break;
	case O_DSYNC:
		ret = fdatasync(fd);
		break;
	}
	at.ret = ret;
	at.err = ret<0 ? errno : 0;

	pthread_cleanup_pop(1);

	return 0;
}

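/* Plain I/O workers run on a minimal stack: MINSIGSTKSZ, or the kernel's
 * AT_MINSIGSTKSZ if that is larger, plus a small margin. SIGEV_THREAD
 * submissions instead use the attributes supplied in the sigevent (see
 * submit below). */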
static size_t io_thread_stack_size = MINSIGSTKSZ+2048;
static pthread_once_t init_stack_size_once;

static void init_stack_size()
{
	unsigned long val = __getauxval(AT_MINSIGSTKSZ);
	if (val > MINSIGSTKSZ) io_thread_stack_size = val + 512;
}

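/* Common submission path for aio_read, aio_write and aio_fsync: take a
 * reference on the fd's queue, spawn a detached worker thread with all
 * signals blocked, and wait on args.sem until the worker holds the queue
 * lock, so a subsequent aio_cancel or close cannot miss the operation. */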
static int submit(struct aiocb *cb, int op)
{
	int ret = 0;
	pthread_attr_t a;
	sigset_t allmask, origmask;
	pthread_t td;
	struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
	struct aio_args args = { .cb = cb, .op = op, .q = q };
	sem_init(&args.sem, 0, 0);

	if (!q) {
		if (errno != EBADF) errno = EAGAIN;
		cb->__ret = -1;
		cb->__err = errno;
		return -1;
	}
	q->ref++;
	pthread_mutex_unlock(&q->lock);

	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		if (cb->aio_sigevent.sigev_notify_attributes)
			a = *cb->aio_sigevent.sigev_notify_attributes;
		else
			pthread_attr_init(&a);
	} else {
		pthread_once(&init_stack_size_once, init_stack_size);
		pthread_attr_init(&a);
		pthread_attr_setstacksize(&a, io_thread_stack_size);
		pthread_attr_setguardsize(&a, 0);
	}
	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);

	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
	cb->__err = EINPROGRESS;
	if (pthread_create(&td, &a, io_thread_func, &args)) {
		pthread_mutex_lock(&q->lock);
		__aio_unref_queue(q);
		cb->__err = errno = EAGAIN;
		cb->__ret = ret = -1;
	}
	pthread_sigmask(SIG_SETMASK, &origmask, 0);

	if (!ret) {
		while (sem_wait(&args.sem));
	}

	return ret;
}

int aio_read(struct aiocb *cb)
{
	return submit(cb, LIO_READ);
}

int aio_write(struct aiocb *cb)
{
	return submit(cb, LIO_WRITE);
}

int aio_fsync(int op, struct aiocb *cb)
{
	if (op != O_SYNC && op != O_DSYNC) {
		errno = EINVAL;
		return -1;
	}
	return submit(cb, op);
}

ssize_t aio_return(struct aiocb *cb)
{
	return cb->__ret;
}

int aio_error(const struct aiocb *cb)
{
	a_barrier();
	return cb->__err & 0x7fffffff;
}

int aio_cancel(int fd, struct aiocb *cb)
{
	sigset_t allmask, origmask;
	int ret = AIO_ALLDONE;
	struct aio_thread *p;
	struct aio_queue *q;

	/* Unspecified behavior case. Report an error. */
	if (cb && fd != cb->aio_fildes) {
		errno = EINVAL;
		return -1;
	}

	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);

	errno = ENOENT;
	if (!(q = __aio_get_queue(fd, 0))) {
		if (errno == EBADF) ret = -1;
		goto done;
	}

	for (p = q->head; p; p = p->next) {
		if (cb && cb != p->cb) continue;
		/* Transition target from running to running-with-waiters */
		if (a_cas(&p->running, 1, -1)) {
			pthread_cancel(p->td);
			__wait(&p->running, 0, -1, 1);
			if (p->err == ECANCELED) ret = AIO_CANCELED;
		}
	}

	pthread_mutex_unlock(&q->lock);
done:
	pthread_sigmask(SIG_SETMASK, &origmask, 0);
	return ret;
}

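/* Called by close() so that outstanding aio operations on the fd are
 * cancelled before the descriptor is closed; close must remain
 * async-signal-safe, which is why (per the comment at the top of this
 * file) all aio locks are taken with signals blocked. The aio_fd_cnt
 * check keeps close cheap in processes that never use aio. */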
int __aio_close(int fd)
{
	a_barrier();
	if (aio_fd_cnt) aio_cancel(fd, 0);
	return fd;
}

weak_alias(aio_cancel, aio_cancel64);
weak_alias(aio_error, aio_error64);
weak_alias(aio_fsync, aio_fsync64);
weak_alias(aio_read, aio_read64);
weak_alias(aio_write, aio_write64);
weak_alias(aio_return, aio_return64);