[ofw] [PATCH] dapl: fix ring buffer synchronization

Sean Hefty sean.hefty at intel.com
Thu Mar 4 16:18:06 PST 2010


The dapl ring buffer implementation is not thread safe.  Replace
the use of atomic variables with actual locking to ensure that
there are no races when items are inserted and/or removed at the same time.

Without proper synchronization, the EVD can report invalid events or
the same event multiple times.

Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
 .../ulp/dapl2/dapl/common/dapl_ring_buffer_util.c  |  187 +++++++++-----------
 .../ulp/dapl2/dapl/common/dapl_ring_buffer_util.h  |    7 -
 trunk/ulp/dapl2/dapl/include/dapl.h                |    7 -
 3 files changed, 88 insertions(+), 113 deletions(-)

diff --git a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c
index 54517a9..d1ee269 100644
--- a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c
+++ b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c
@@ -41,8 +41,7 @@
  * dapls_rbuf_alloc
  *
  * Given a DAPL_RING_BUFFER, initialize it and provide memory for
- * the ringbuf itself. A passed in size will be adjusted to the next
- * largest power of two number to simplify management.
+ * the ringbuf itself.
  *
  * Input:
  *	rbuf		pointer to DAPL_RING_BUFFER
@@ -58,38 +57,27 @@
  */
 DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
 {
-	unsigned int rsize;	/* real size */
-
 	/* The circular buffer must be allocated one too large.
 	 * This eliminates any need for a distinct counter, as
 	 * having the two pointers equal always means "empty" -- never "full"
 	 */
 	size++;
 
-	/* Put size on a power of 2 boundary */
-	rsize = 1;
-	while ((DAT_COUNT) rsize < size) {
-		rsize <<= 1;
-	}
-
-	rbuf->base = (void *)dapl_os_alloc(rsize * sizeof(void *));
-	if (rbuf->base != NULL) {
-		rbuf->lim = rsize - 1;
-		dapl_os_atomic_set(&rbuf->head, 0);
-		dapl_os_atomic_set(&rbuf->tail, 0);
-	} else {
+	rbuf->base = (void *)dapl_os_alloc(size * sizeof(void *));
+	if (rbuf->base == NULL)
 		return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY;
-	}
 
+	dapl_os_lock_init(&rbuf->lock);
+	rbuf->size = size;
+	rbuf->head = 0;
+	rbuf->tail = 0;
 	return DAT_SUCCESS;
 }
 
 /*
  * dapls_rbuf_realloc
  *
- * Resizes a DAPL_RING_BUFFER. This function is not thread safe;
- * adding or removing elements from a ring buffer while resizing 
- * will have indeterminate results.
+ * Resizes a DAPL_RING_BUFFER.
  *
  * Input:
  *	rbuf		pointer to DAPL_RING_BUFFER
@@ -106,41 +94,35 @@ DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
  */
 DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
 {
-	DAPL_RING_BUFFER new_rbuf;
-	void *entry;
-	DAT_RETURN dat_status;
-
-	dat_status = DAT_SUCCESS;
+	void **base;
 
 	/* decreasing the size or retaining the old size is not allowed */
-	if (size <= rbuf->lim + 1) {
-		dat_status = DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
-		goto bail;
-	}
+	if (size <= rbuf->size + 1)
+		return DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
 
-	/*
-	 * !This is NOT ATOMIC!
-	 * Simple algorithm: Allocate a new ring buffer, take everything
-	 * out of the old one and put it in the new one, and release the 
-	 * old base buffer.
-	 */
-	dat_status = dapls_rbuf_alloc(&new_rbuf, size);
-	if (dat_status != DAT_SUCCESS) {
-		goto bail;
-	}
+	base = (void *) dapl_os_alloc(size * sizeof(void *));
+	if (base == NULL)
+		return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY;
 
-	while ((entry = dapls_rbuf_remove(rbuf)) != NULL) {
-		/* We know entries will fit so ignore the return code */
-		(void)dapls_rbuf_add(&new_rbuf, entry);
+	dapl_os_lock(&rbuf->lock);
+	if (rbuf->head > rbuf->tail) {
+		memcpy(&base[rbuf->tail], &rbuf->base[rbuf->tail],
+			(rbuf->head - rbuf->tail) * sizeof(void *));
+	} else if (rbuf->head < rbuf->tail) {
+		memcpy(&base[0], &rbuf->base[rbuf->tail],
+			(rbuf->size - rbuf->tail) * sizeof(void *));
+		memcpy(&base[rbuf->size - rbuf->tail], &rbuf->base[0],
+			rbuf->head * sizeof(void *));
+		rbuf->head = rbuf->size - rbuf->tail + rbuf->head;
+		rbuf->tail = 0;
 	}
 
-	/* release the old base buffer */
-	dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *));
+	dapl_os_free(rbuf->base, rbuf->size * sizeof(void *));
+	rbuf->base = base;
+	rbuf->size = size;
+	dapl_os_unlock(&rbuf->lock);
 
-	*rbuf = new_rbuf;
-
-      bail:
-	return dat_status;
+	return DAT_SUCCESS;
 }
 
 /*
@@ -160,15 +142,21 @@ DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
  */
 void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)
 {
-	if ((NULL == rbuf) || (NULL == rbuf->base)) {
-		return;
-	}
+	dapl_os_lock_destroy(&rbuf->lock);
+	dapl_os_free(rbuf->base, rbuf->size * sizeof(void *));
+}
 
-	dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *));
-	rbuf->base = NULL;
-	rbuf->lim = 0;
+static DAT_COUNT dapli_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
+{
+	if (rbuf->head >= rbuf->tail)
+		return rbuf->head - rbuf->tail;
+	else
+		return rbuf->size - rbuf->tail + rbuf->head;
+}
 
-	return;
+static int dapli_rbuf_empty(IN DAPL_RING_BUFFER *rbuf)
+{
+	return rbuf->head == rbuf->tail;
 }
 
 /*
@@ -190,22 +178,20 @@ void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)
  */
 DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry)
 {
-	int pos;
-	int val;
-
-	while (((dapl_os_atomic_read(&rbuf->head) + 1) & rbuf->lim) !=
-	       (dapl_os_atomic_read(&rbuf->tail) & rbuf->lim)) {
-		pos = dapl_os_atomic_read(&rbuf->head);
-		val = dapl_os_atomic_assign(&rbuf->head, pos, pos + 1);
-		if (val == pos) {
-			pos = (pos + 1) & rbuf->lim;	/* verify in range */
-			rbuf->base[pos] = entry;
-			return DAT_SUCCESS;
-		}
+	DAT_RETURN ret;
+
+	dapl_os_lock(&rbuf->lock);
+	if (dapli_rbuf_count(rbuf) < rbuf->size - 1) {
+		rbuf->base[rbuf->head++] = entry;
+		if (rbuf->head == rbuf->size)
+			rbuf->head = 0;
+		ret = DAT_SUCCESS;
+	} else {
+		ret = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
 	}
+	dapl_os_unlock(&rbuf->lock);
 
-	return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
-
+	return ret;
 }
 
 /*
@@ -226,21 +212,19 @@ DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry)
  */
 void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)
 {
-	int pos;
-	int val;
-
-	while (dapl_os_atomic_read(&rbuf->head) !=
-	       dapl_os_atomic_read(&rbuf->tail)) {
-		pos = dapl_os_atomic_read(&rbuf->tail);
-		val = dapl_os_atomic_assign(&rbuf->tail, pos, pos + 1);
-		if (val == pos) {
-			pos = (pos + 1) & rbuf->lim;	/* verify in range */
+	void *entry;
 
-			return (rbuf->base[pos]);
-		}
+	dapl_os_lock(&rbuf->lock);
+	if (!dapli_rbuf_empty(rbuf)) {
+		entry = rbuf->base[rbuf->tail++];
+		if (rbuf->tail == rbuf->size)
+			rbuf->tail = 0;
+	} else {
+		entry = NULL;
 	}
+	dapl_os_unlock(&rbuf->lock);
 
-	return NULL;
+	return entry;
 
 }
 
@@ -263,18 +247,10 @@ void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)
 DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
 {
 	DAT_COUNT count;
-	int head;
-	int tail;
-
-	head = dapl_os_atomic_read(&rbuf->head) & rbuf->lim;
-	tail = dapl_os_atomic_read(&rbuf->tail) & rbuf->lim;
-	if (head > tail) {
-		count = head - tail;
-	} else {
-		/* add 1 to lim as it is a mask, number of entries - 1 */
-		count = (rbuf->lim + 1 - tail + head) & rbuf->lim;
-	}
 
+	dapl_os_lock(&rbuf->lock);
+	count = dapli_rbuf_count(rbuf);
+	dapl_os_unlock(&rbuf->lock);
 	return count;
 }
 
@@ -299,19 +275,20 @@ DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
  */
 void dapls_rbuf_adjust(IN DAPL_RING_BUFFER * rbuf, IN intptr_t offset)
 {
-	int pos;
+	int i;
 
-	pos = dapl_os_atomic_read(&rbuf->head);
-	while (pos != dapl_os_atomic_read(&rbuf->tail)) {
-		rbuf->base[pos] = (void *)((char *)rbuf->base[pos] + offset);
-		pos = (pos + 1) & rbuf->lim;	/* verify in range */
-	}
+	dapl_os_lock(&rbuf->lock);
+	for (i = 0; i < rbuf->size; i++)
+		rbuf->base[i] = (void *) ((char *)rbuf->base[i] + offset);
+	dapl_os_unlock(&rbuf->lock);
 }
 
-/*
- * Local variables:
- *  c-indent-level: 4
- *  c-basic-offset: 4
- *  tab-width: 8
- * End:
- */
+int dapls_rbuf_empty(IN DAPL_RING_BUFFER * rbuf)
+{
+	int empty;
+
+	dapl_os_lock(&rbuf->lock);
+	empty = dapli_rbuf_empty(rbuf);
+	dapl_os_unlock(&rbuf->lock);
+	return empty;
+}
diff --git a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h
index 46c82c9..1eb782d 100644
--- a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h
+++ b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h
@@ -68,11 +68,8 @@ void dapls_rbuf_adjust (
 	IN  DAPL_RING_BUFFER		*rbuf,
 	IN  intptr_t			offset);
 
-
-/*
- * Simple functions
- */
-#define dapls_rbuf_empty(rbuf)	(rbuf->head == rbuf->tail)
+int dapls_rbuf_empty(
+	IN   DAPL_RING_BUFFER		*rbuf);
 
 
 #endif /* _DAPL_RING_BUFFER_H_ */
diff --git a/trunk/ulp/dapl2/dapl/include/dapl.h b/trunk/ulp/dapl2/dapl/include/dapl.h
index 4439ec5..f7b885b 100644
--- a/trunk/ulp/dapl2/dapl/include/dapl.h
+++ b/trunk/ulp/dapl2/dapl/include/dapl.h
@@ -237,9 +237,10 @@ struct dapl_llist_entry
 struct dapl_ring_buffer
 {
     void		**base;		/* base of element array */
-    DAT_COUNT		lim;		/* mask, number of entries - 1 */
-    DAPL_ATOMIC		head;		/* head pointer index */
-    DAPL_ATOMIC		tail;		/* tail pointer index */
+    DAT_COUNT		size;
+    DAT_COUNT		head;
+    DAT_COUNT		tail;
+    DAPL_OS_LOCK	lock;
 };
 
 struct dapl_cookie_buffer





More information about the ofw mailing list