Browse Source

new futex-requeue-based pthread_cond_broadcast implementation

this avoids the "stampede effect" where pthread_cond_broadcast would
result in all waiters waking up simultaneously, only to immediately
contend for the mutex and go back to sleep.
Rich Felker 13 năm trước cách đây
mục cha
commit
cba4e1c0a3

+ 6 - 3
src/internal/pthread_impl.h

@@ -64,9 +64,12 @@ struct __timer {
 #define _m_prev __u.__p[3]
 #define _m_next __u.__p[4]
 #define _m_count __u.__i[5]
-#define _c_block __u.__i[0]
-#define _c_clock __u.__i[1]
-#define _c_waiters __u.__i[2]
+#define _c_mutex __u.__p[0]
+#define _c_block __u.__i[2]
+#define _c_waiters __u.__i[3]
+#define _c_clock __u.__i[4]
+#define _c_bcast __u.__i[5]
+#define _c_leavers __u.__i[6]
 #define _rw_lock __u.__i[0]
 #define _rw_waiters __u.__i[1]
 #define _b_inst __u.__p[0]

+ 40 - 1
src/thread/pthread_cond_broadcast.c

@@ -1,8 +1,47 @@
 #include "pthread_impl.h"
 
+static void unlock(pthread_cond_t *c)
+{
+	a_dec(&c->_c_bcast);
+	if (c->_c_leavers) __wake(&c->_c_bcast, -1, 0);
+}
+
 int pthread_cond_broadcast(pthread_cond_t *c)
 {
+	pthread_mutex_t *m;
+	int w;
+
+	if (!c->_c_waiters) return 0;
+	a_inc(&c->_c_bcast);
+	if (!c->_c_waiters) {
+		unlock(c);
+		return 0;
+	}
+
 	a_store(&c->_c_block, 0);
-	if (c->_c_waiters) __wake(&c->_c_block, -1, 0);
+
+	m = c->_c_mutex;
+
+	/* If mutex ptr is not available, simply wake all waiters. */
+	if (m == (void *)-1) {
+		unlock(c);
+		__wake(&c->_c_block, -1, 0);
+		return 0;
+	}
+
+	/* Move waiter count to the mutex */
+	for (;;) {
+		w = c->_c_waiters;
+		a_fetch_add(&m->_m_waiters, w);
+		if (a_cas(&c->_c_waiters, w, 0) == w) break;
+		a_fetch_add(&m->_m_waiters, -w);
+	}
+
+	/* Perform the futex requeue, waking one waiter if and only if
+	 * the calling thread does not hold the mutex. */
+	__syscall(SYS_futex, &c->_c_block, FUTEX_REQUEUE,
+		m->_m_lock!=pthread_self()->tid, INT_MAX, &m->_m_lock);
+
+	unlock(c);
 	return 0;
 }

+ 4 - 1
src/thread/pthread_cond_init.c

@@ -3,6 +3,9 @@
 int pthread_cond_init(pthread_cond_t *c, const pthread_condattr_t *a)
 {
 	memset(c, 0, sizeof *c);
-	if (a) c->_c_clock = *a & 0x7fffffff;
+	if (a) {
+		c->_c_clock = *a & 0x7fffffff;
+		if (*a>>31) c->_c_mutex = (void *)-1;
+	}
 	return 0;
 }

+ 19 - 2
src/thread/pthread_cond_timedwait.c

@@ -5,10 +5,25 @@ struct cm {
 	pthread_mutex_t *m;
 };
 
+static void unwait(pthread_cond_t *c, pthread_mutex_t *m)
+{
+	int w;
+
+	/* Cannot leave waiting status if there are any live broadcasters
+	 * which might be inspecting/using the mutex. */
+	while ((w=c->_c_bcast)) __wait(&c->_c_bcast, &c->_c_leavers, w, 0);
+
+	/* If the waiter count is zero, it must be the case that the
+	 * caller's count has been moved to the mutex due to bcast. */
+	do w = c->_c_waiters;
+	while (w && a_cas(&c->_c_waiters, w, w-1)!=w);
+	if (!w) a_dec(&m->_m_waiters);
+}
+
 static void cleanup(void *p)
 {
 	struct cm *cm = p;
-	a_dec(&cm->c->_c_waiters);
+	unwait(cm->c, cm->m);
 	pthread_mutex_lock(cm->m);
 }
 
@@ -22,6 +37,8 @@ int pthread_cond_timedwait(pthread_cond_t *c, pthread_mutex_t *m, const struct t
 
 	pthread_testcancel();
 
+	if (c->_c_mutex != (void *)-1) c->_c_mutex = m;
+
 	a_inc(&c->_c_waiters);
 	c->_c_block = tid = pthread_self()->tid;
 
@@ -31,7 +48,7 @@ int pthread_cond_timedwait(pthread_cond_t *c, pthread_mutex_t *m, const struct t
 	while (c->_c_block == tid && (!e || e==EINTR));
 	if (e == EINTR) e = 0;
 
-	a_dec(&c->_c_waiters);
+	unwait(c, m);
 
 	if ((r=pthread_mutex_lock(m))) return r;