[openib-general] [PATCHv2] fix sdp aio race

Michael S. Tsirkin mst at mellanox.co.il
Tue Jul 19 16:27:11 PDT 2005


Second attempt to solve the race where an AIO that is completed
asynchronously is bypassed by other completions.
Solution: set a bit in the connection, and defer any CQ polling
until the work queue has handled the AIO.
We have to pass the connection to the iocb handlers for this to work.
I also got rid of the in_atomic() check.

Libor, please comment.

Signed-off-by: Michael S. Tsirkin <mst at mellanox.co.il>

Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_write.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_write.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_write.c
@@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_sock *con
 		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
 		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 
 		break;
 	case SDP_DESC_TYPE_NONE:
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_rcvd.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_rcvd.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_rcvd.c
@@ -155,7 +155,7 @@ static int sdp_rcvd_send_sm(struct sdp_s
 
 			conn->src_sent--;
 
-			sdp_iocb_complete(iocb, 0);
+			sdp_iocb_complete(conn, iocb, 0);
 		}
 		/*
 		 * Cancel complete, clear the state.
@@ -212,7 +212,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_s
 
 	conn->snk_sent--;
 
-	sdp_iocb_complete(iocb, 0);
+	sdp_iocb_complete(conn, iocb, 0);
 
 	return 0;
 }
@@ -273,7 +273,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_s
 		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
 		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 	}
 	/*
 	 * If Source Cancel was in process, and there are no more outstanding
@@ -565,7 +565,7 @@ static int sdp_rcvd_snk_cancel_ack(struc
 	while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) {
 
 		conn->snk_sent--;
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 	}
 	/*
 	 * cancellation is complete. Cancel flag is cleared in RECV post.
@@ -687,7 +687,7 @@ static int sdp_rcvd_snk_avail(struct sdp
 				sdp_desc_q_put_head(&conn->send_queue,
 						    (struct sdpc_desc *)iocb);
 			else
-				sdp_iocb_complete(iocb, 0);
+				sdp_iocb_complete(conn, iocb, 0);
 		}
 		/*
 		 * If Source Cancel was in process, it should now 
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_proto.h
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_proto.h
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_proto.h
@@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc
 
 struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
 
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
+void sdp_iocb_q_cancel(struct sdp_sock *conn, struct sdpc_iocb_q *table,
+		       u32 mask, ssize_t comp);
 
 void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
 
@@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb *
 
 void sdp_iocb_release(struct sdpc_iocb *iocb);
 
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status);
+void sdp_iocb_complete(struct sdp_sock *conn, struct sdpc_iocb *iocb,
+		       ssize_t status);
 
 int sdp_iocb_lock(struct sdpc_iocb *iocb);
 
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_read.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_read.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_read.c
@@ -207,7 +207,7 @@ int sdp_event_read(struct sdp_sock *conn
 		SDP_CONN_STAT_READ_INC(conn, iocb->post);
 		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 
 		break;
 	case SDP_DESC_TYPE_NONE:
@@ -228,7 +228,7 @@ int sdp_event_read(struct sdp_sock *conn
 		SDP_CONN_STAT_READ_INC(conn, iocb->post);
 		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
 
 		break;
 	default:
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_send.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_send.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_send.c
@@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp
 			SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
 			SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-			sdp_iocb_complete(iocb, 0);
+			sdp_iocb_complete(conn, iocb, 0);
 		}
 
 		goto done;
@@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct 
 			 * needs to be compelted.
 			 */
 			if (iocb->post > 0) {
-				sdp_iocb_complete(iocb, 0);
+				sdp_iocb_complete(conn, iocb, 0);
 				result = -EAGAIN;
 			} else {
 				sdp_iocb_destroy(iocb);
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_conn.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.c
@@ -665,7 +665,8 @@ static int sdp_desc_q_cancel_lookup_func
 	return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE);
 }
 
-static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error)
+static void sdp_desc_q_cancel_iocb(struct sdp_sock *conn,
+				   struct sdpc_desc_q *table, ssize_t error)
 {
 	struct sdpc_iocb *iocb;
 
@@ -675,24 +676,24 @@ static void sdp_desc_q_cancel_iocb(struc
 			 NULL))) {
 
 		sdp_iocb_q_remove(iocb);
-		sdp_iocb_complete(iocb, error);
+		sdp_iocb_complete(conn, iocb, error);
 	}
 }
 
 void sdp_iocb_q_cancel_all_read(struct sdp_sock *conn, ssize_t error)
 {
-	sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error);
-	sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error);
+	sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error);
+	sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error);
 
-	sdp_desc_q_cancel_iocb(&conn->r_src, error);
+	sdp_desc_q_cancel_iocb(conn, &conn->r_src, error);
 }
 
 void sdp_iocb_q_cancel_all_write(struct sdp_sock *conn, ssize_t error)
 {
-	sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error);
+	sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error);
 
-	sdp_desc_q_cancel_iocb(&conn->send_queue, error);
-	sdp_desc_q_cancel_iocb(&conn->w_snk, error);
+	sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error);
+	sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error);
 }
 
 void sdp_iocb_q_cancel_all(struct sdp_sock *conn, ssize_t error)
@@ -927,7 +928,7 @@ int sdp_conn_cq_drain(struct ib_cq *cq, 
 	 * the function should only be called under the connection locks
 	 * spinlock to ensure the call is serialized to avoid races.
 	 */
-	for (;;) {
+	while(!(conn->lock.event & SDP_LOCK_F_DEFERRED)) {
 		/*
 		 * poll for a completion
 		 */
@@ -971,17 +972,16 @@ int sdp_conn_cq_drain(struct ib_cq *cq, 
  */
 void sdp_conn_internal_unlock(struct sdp_sock *conn)
 {
-	int calls = 0;
 	/*
 	 * poll CQs for events.
 	 */
-	if (conn->lock.event & SDP_LOCK_F_RECV_CQ)
-		calls += sdp_conn_cq_drain(conn->recv_cq, conn);
-
-	if (conn->lock.event & SDP_LOCK_F_SEND_CQ)
-		calls += sdp_conn_cq_drain(conn->send_cq, conn);
-
-	conn->lock.event = 0;
+	if ((conn->lock.event & SDP_LOCK_F_RECV_CQ) &&
+		!sdp_conn_cq_drain(conn->recv_cq, conn))
+		conn->lock.event &= ~SDP_LOCK_F_RECV_CQ;
+
+	if ((conn->lock.event & SDP_LOCK_F_SEND_CQ) &&
+		!sdp_conn_cq_drain(conn->send_cq, conn))
+		conn->lock.event &= ~SDP_LOCK_F_SEND_CQ;
 }
 
 /*
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_recv.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_recv.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_recv.c
@@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str
 	/*
 	 * callback to complete IOCB
 	 */
-	sdp_iocb_complete(iocb, 0);
+	sdp_iocb_complete(conn, iocb, 0);
 
 	return (buff->tail - buff->data);
 }
@@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st
 		/*
 		 * callback to complete IOCB
 		 */
-		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
 	}
 
 	return (buff->tail - buff->data);
@@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k
 				/*
 				 * callback to complete IOCB, or drop reference
 				 */
-				sdp_iocb_complete(iocb, 0);
+				sdp_iocb_complete(conn, iocb, 0);
 				result = -EAGAIN;
 			}
 			else {
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.h
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_conn.h
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.h
@@ -163,8 +163,9 @@ struct sdp_conn_lock {
 	wait_queue_head_t waitq;
 };
 
-#define SDP_LOCK_F_RECV_CQ 0x01 /* recv CQ event is pending */
-#define SDP_LOCK_F_SEND_CQ 0x02 /* send CQ event is pending */
+#define SDP_LOCK_F_RECV_CQ  0x01 /* recv CQ event is pending */
+#define SDP_LOCK_F_SEND_CQ  0x02 /* send CQ event is pending */
+#define SDP_LOCK_F_DEFERRED 0x04 /* work scheduled, pending completion */
 /*
  * SDP Connection structure.
  */
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_iocb.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.c
@@ -483,9 +483,8 @@ void sdp_iocb_release(struct sdpc_iocb *
 /*
  * do_iocb_complete - complete an IOCB for real in thread context
  */
-static void do_iocb_complete(void *arg)
+static void do_iocb_complete(struct sdpc_iocb *iocb)
 {
-	struct sdpc_iocb *iocb = (struct sdpc_iocb *)arg;
 	long value;
 	/*
 	 * release memory
@@ -508,21 +507,37 @@ static void do_iocb_complete(void *arg)
 	 * we ignore the value.
 	 */
 	(void)aio_complete(iocb->req, value, 0);
+
 	/*
 	 * delete IOCB
 	 */
 	sdp_iocb_destroy(iocb);
 }
 
+static void deferred_iocb_complete(void *arg)
+{
+	struct sdpc_iocb *iocb = arg;
+	struct sdp_sock *conn = iocb->conn; /* iocb is destroyed below */
+	do_iocb_complete(iocb);
+	if (conn) {
+		sdp_conn_lock(conn);
+		conn->lock.event &= ~SDP_LOCK_F_DEFERRED; /* only this bit */
+		sdp_conn_unlock(conn);
+	}
+}
 /*
  * sdp_iocb_complete - complete an IOCB
  */
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status)
+void sdp_iocb_complete(struct sdp_sock *conn, struct sdpc_iocb *iocb,
+		       ssize_t status)
 {
 	iocb->status = status;
-	
-	if (in_atomic() || irqs_disabled()) {
-		INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
+
+	/* This is always called with connection locked,
+	 * so users == 0 means this was an irq lock. */
+	if (!conn->lock.users) {
+		iocb->conn = conn;
+		INIT_WORK(&iocb->completion, deferred_iocb_complete, iocb);
 		schedule_work(&iocb->completion);
 	} else
 		do_iocb_complete(iocb);
@@ -750,7 +765,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc
 /*
  * sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue
  */
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp)
+void sdp_iocb_q_cancel(struct sdp_sock *conn, struct sdpc_iocb_q *table,
+		       u32 mask, ssize_t comp)
 {
 	struct sdpc_iocb *iocb;
 	struct sdpc_iocb *next;
@@ -772,7 +788,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_
 				    iocb->post, iocb->len);
 
 			sdp_iocb_q_remove(iocb);
-			sdp_iocb_complete(iocb, comp);
+			sdp_iocb_complete(conn, iocb, comp);
 		}
 
 		iocb = next;
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_event.c
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_event.c
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_event.c
@@ -169,8 +169,8 @@ void sdp_cq_event_handler(struct ib_cq *
 			/*
 			 * dispatch CQ completions.
 			 */
-			(void)sdp_conn_cq_drain(cq, conn);
-			event = 0;
+			if (!sdp_conn_cq_drain(cq, conn))
+				event = 0;
 		}
 	}
 	/*
Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.h
===================================================================
--- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_iocb.h
+++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.h
@@ -109,7 +109,8 @@ struct sdpc_iocb {
 	int           page_count;  /* number of physical pages. */
 	int           page_offset; /* offset into first page. */
 
-	struct work_struct completion; /* task for defered completion. */
+	struct work_struct completion; /* task for deferred completion. */
+	struct sdp_sock *conn; /* conn to unlock for deferred completion. */
 	/*
 	 * kernel iocb structure
 	 */

-- 
MST



More information about the general mailing list