[openib-general] [PATCH (draft)] sdp: fix aio/sync completion race

Michael S. Tsirkin mst at mellanox.co.il
Tue Jun 28 13:18:57 PDT 2005


Libor, here's a stab at solving an old problem.
Most of the patch is just passing sdp_opt to the iocb complete and cancel functions.

I haven't tested it yet, and I won't have the time today,
but maybe you could tell me whether I'm going in the right
direction with this.

---

Take a "users" reference on the lock to prevent further iocbs from
completing while a deferred aio completion is still in progress.
Any synchronous transfer will then wait until the socket is unlocked.
(Once we switch to get_user_pages, we'll be able to do this only for
receive iocbs.)

Signed-off-by: Michael S. Tsirkin <mst at mellanox.co.il>

Index: sdp_write.c
===================================================================
--- sdp_write.c	(revision 2726)
+++ sdp_write.c	(working copy)
@@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_opt *conn
 		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
 		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 
 		break;
 	case SDP_DESC_TYPE_NONE:
Index: sdp_rcvd.c
===================================================================
--- sdp_rcvd.c	(revision 2726)
+++ sdp_rcvd.c	(working copy)
@@ -152,7 +152,7 @@ static int sdp_rcvd_send_sm(struct sdp_o
 
 			conn->src_sent--;
 
-			sdp_iocb_complete(iocb, 0);
+			sdp_iocb_complete(conn, iocb, 0);
 		}
 		/*
 		 * Cancel complete, clear the state.
@@ -209,7 +209,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_o
 
 	conn->snk_sent--;
 
-	sdp_iocb_complete(iocb, 0);
+	sdp_iocb_complete(conn, iocb, 0);
 
 	return 0;
 }
@@ -270,7 +270,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_o
 		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
 		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 	}
 	/*
 	 * If Source Cancel was in process, and there are no more outstanding
@@ -562,7 +562,7 @@ static int sdp_rcvd_snk_cancel_ack(struc
 	while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) {
 
 		conn->snk_sent--;
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 	}
 	/*
 	 * cancellation is complete. Cancel flag is cleared in RECV post.
@@ -684,7 +684,7 @@ static int sdp_rcvd_snk_avail(struct sdp
 				sdp_desc_q_put_head(&conn->send_queue,
 						    (struct sdpc_desc *)iocb);
 			else
-				sdp_iocb_complete(iocb, 0);
+				sdp_iocb_complete(conn, iocb, 0);
 		}
 		/*
 		 * If Source Cancel was in process, it should now 
Index: sdp_proto.h
===================================================================
--- sdp_proto.h	(revision 2726)
+++ sdp_proto.h	(working copy)
@@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc
 
 struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
 
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
+void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
+		       u32 mask, ssize_t comp);
 
 void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
 
@@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb *
 
 void sdp_iocb_release(struct sdpc_iocb *iocb);
 
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status);
+void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
+		       ssize_t status);
 
 int sdp_iocb_lock(struct sdpc_iocb *iocb);
 
Index: sdp_read.c
===================================================================
--- sdp_read.c	(revision 2726)
+++ sdp_read.c	(working copy)
@@ -205,7 +205,7 @@ int sdp_event_read(struct sdp_opt *conn,
 		SDP_CONN_STAT_READ_INC(conn, iocb->post);
 		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(iocb, 0);
+		sdp_iocb_complete(conn, iocb, 0);
 
 		break;
 	case SDP_DESC_TYPE_NONE:
@@ -226,7 +226,7 @@ int sdp_event_read(struct sdp_opt *conn,
 		SDP_CONN_STAT_READ_INC(conn, iocb->post);
 		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
 
-		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
 
 		break;
 	default:
Index: sdp_send.c
===================================================================
--- sdp_send.c	(revision 2726)
+++ sdp_send.c	(working copy)
@@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp
 			SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
 			SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-			sdp_iocb_complete(iocb, 0);
+			sdp_iocb_complete(conn, iocb, 0);
 		}
 
 		goto done;
@@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct 
 			 * needs to be compelted.
 			 */
 			if (iocb->post > 0) {
-				sdp_iocb_complete(iocb, 0);
+				sdp_iocb_complete(conn, iocb, 0);
 				result = -EAGAIN;
 			} else {
 				sdp_iocb_destroy(iocb);
Index: sdp_conn.c
===================================================================
--- sdp_conn.c	(revision 2726)
+++ sdp_conn.c	(working copy)
@@ -697,7 +697,8 @@ static int sdp_desc_q_cancel_lookup_func
 	return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE);
 }
 
-static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error)
+static void sdp_desc_q_cancel_iocb(struct sdp_opt *conn,
+				   struct sdpc_desc_q *table, ssize_t error)
 {
 	struct sdpc_iocb *iocb;
 
@@ -707,24 +708,24 @@ static void sdp_desc_q_cancel_iocb(struc
 			 NULL))) {
 
 		sdp_iocb_q_remove(iocb);
-		sdp_iocb_complete(iocb, error);
+		sdp_iocb_complete(conn, iocb, error);
 	}
 }
 
 void sdp_iocb_q_cancel_all_read(struct sdp_opt *conn, ssize_t error)
 {
-	sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error);
-	sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error);
+	sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error);
+	sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error);
 
-	sdp_desc_q_cancel_iocb(&conn->r_src, error);
+	sdp_desc_q_cancel_iocb(conn, &conn->r_src, error);
 }
 
 void sdp_iocb_q_cancel_all_write(struct sdp_opt *conn, ssize_t error)
 {
-	sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error);
+	sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error);
 
-	sdp_desc_q_cancel_iocb(&conn->send_queue, error);
-	sdp_desc_q_cancel_iocb(&conn->w_snk, error);
+	sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error);
+	sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error);
 }
 
 void sdp_iocb_q_cancel_all(struct sdp_opt *conn, ssize_t error)
Index: sdp_recv.c
===================================================================
--- sdp_recv.c	(revision 2726)
+++ sdp_recv.c	(working copy)
@@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str
 	/*
 	 * callback to complete IOCB
 	 */
-	sdp_iocb_complete(iocb, 0);
+	sdp_iocb_complete(conn, iocb, 0);
 
 	return (buff->tail - buff->data);
 }
@@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st
 		/*
 		 * callback to complete IOCB
 		 */
-		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
 	}
 
 	return (buff->tail - buff->data);
@@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k
 				/*
 				 * callback to complete IOCB, or drop reference
 				 */
-				sdp_iocb_complete(iocb, 0);
+				sdp_iocb_complete(conn, iocb, 0);
 				result = -EAGAIN;
 			}
 			else {
Index: sdp_conn.h
===================================================================
--- sdp_conn.h	(revision 2726)
+++ sdp_conn.h	(working copy)
@@ -471,7 +471,7 @@ static inline void sdp_conn_unlock(struc
 		sdp_conn_internal_unlock(conn);
 	}
 
-	conn->lock.users = 0;
+	--conn->lock.users;
 	wake_up(&conn->lock.waitq);
 
 	spin_unlock_irqrestore(&conn->lock.slock, flags);
Index: sdp_iocb.c
===================================================================
--- sdp_iocb.c	(revision 2726)
+++ sdp_iocb.c	(working copy)
@@ -508,6 +508,8 @@ static void do_iocb_complete(void *arg)
 	 * we ignore the value.
 	 */
 	(void)aio_complete(iocb->req, value, 0);
+
+	sdp_conn_unlock(iocb->conn);
 	/*
 	 * delete IOCB
 	 */
@@ -517,11 +519,14 @@ static void do_iocb_complete(void *arg)
 /*
  * sdp_iocb_complete - complete an IOCB
  */
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status)
+void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
+		       ssize_t status)
 {
 	iocb->status = status;
 	
 	if (in_atomic() || irqs_disabled()) {
+		conn->lock.users++;
+		iocb->conn = conn;
 		INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
 		schedule_work(&iocb->completion);
 	} else
@@ -750,7 +755,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc
 /*
  * sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue
  */
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp)
+void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
+		       u32 mask, ssize_t comp)
 {
 	struct sdpc_iocb *iocb;
 	struct sdpc_iocb *next;
@@ -772,7 +778,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_
 				    iocb->post, iocb->len);
 
 			sdp_iocb_q_remove(iocb);
-			sdp_iocb_complete(iocb, comp);
+			sdp_iocb_complete(conn, iocb, comp);
 		}
 
 		iocb = next;
Index: sdp_iocb.h
===================================================================
--- sdp_iocb.h	(revision 2726)
+++ sdp_iocb.h	(working copy)
@@ -109,7 +109,8 @@ struct sdpc_iocb {
 	int           page_count;  /* number of physical pages. */
 	int           page_offset; /* offset into first page. */
 
-	struct work_struct completion; /* task for defered completion. */
+	struct work_struct completion; /* task for deferred completion. */
+	struct sdp_opt *conn; /* conn to unlock for deferred completion. */
 	/*
 	 * kernel iocb structure
 	 */

-- 
MST



More information about the general mailing list