[openib-general] Re: [PATCH (draft)] sdp: fix aio/sync completion race

Libor Michalek libor at topspin.com
Thu Jun 30 18:16:36 PDT 2005


On Tue, Jun 28, 2005 at 11:18:57PM +0300, Michael S. Tsirkin wrote:
> Libor, here's a stub at solving an old problem.
> Most of the patch is passing sdp_opt to iocb complete and cancel functions.
> 
> I didn't yet test it, and I won't have the time today,
> but maybe you could tell me whether I'm going in the right
> direction with this.

  I'm wondering if we should prevent a new call from proceeding until
all IOCBs have been completed, like the patch you have, or if we should
force the creation of an IOCB when there are any IOCBs outstanding,
even if the call can be processed synchronously.

  Remember the original problem was an IOCB completion in the work queue
completing after a call to send/recv that was processed synchronously.
Either forcing an IOCB or waiting on the lock should prevent the
ordering issue.

  I think in either case it means passing the sdp_opt to iocb_complete. In
do_iocb_complete you cannot call conn_unlock unconditionally, since
the lock.users variable is only incremented when in_atomic() or irqs_disabled().
Also, I think the conn reference needs to be incremented before
creating the work request, and decremented upon completion, since
nothing prevents it from going away before the thread is scheduled.


-Libor

> ---
> 
> Use "users" in the lock to prevent more iocbs from completing before
> a current aio is done.
> Any synchronous transfer would then wait till socket is unlocked.
> (Once we switch to get_user_pages, we'll be able to do it only for receive
> iocbs).
> 
> Signed-off-by: Michael S. Tsirkin <mst at mellanox.co.il>
> 
> Index: sdp_write.c
> ===================================================================
> --- sdp_write.c	(revision 2726)
> +++ sdp_write.c	(working copy)
> @@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_opt *conn
>  		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
>  		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  
>  		break;
>  	case SDP_DESC_TYPE_NONE:
> Index: sdp_rcvd.c
> ===================================================================
> --- sdp_rcvd.c	(revision 2726)
> +++ sdp_rcvd.c	(working copy)
> @@ -152,7 +152,7 @@ static int sdp_rcvd_send_sm(struct sdp_o
>  
>  			conn->src_sent--;
>  
> -			sdp_iocb_complete(iocb, 0);
> +			sdp_iocb_complete(conn, iocb, 0);
>  		}
>  		/*
>  		 * Cancel complete, clear the state.
> @@ -209,7 +209,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_o
>  
>  	conn->snk_sent--;
>  
> -	sdp_iocb_complete(iocb, 0);
> +	sdp_iocb_complete(conn, iocb, 0);
>  
>  	return 0;
>  }
> @@ -270,7 +270,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_o
>  		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
>  		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  	}
>  	/*
>  	 * If Source Cancel was in process, and there are no more outstanding
> @@ -562,7 +562,7 @@ static int sdp_rcvd_snk_cancel_ack(struc
>  	while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) {
>  
>  		conn->snk_sent--;
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  	}
>  	/*
>  	 * cancellation is complete. Cancel flag is cleared in RECV post.
> @@ -684,7 +684,7 @@ static int sdp_rcvd_snk_avail(struct sdp
>  				sdp_desc_q_put_head(&conn->send_queue,
>  						    (struct sdpc_desc *)iocb);
>  			else
> -				sdp_iocb_complete(iocb, 0);
> +				sdp_iocb_complete(conn, iocb, 0);
>  		}
>  		/*
>  		 * If Source Cancel was in process, it should now 
> Index: sdp_proto.h
> ===================================================================
> --- sdp_proto.h	(revision 2726)
> +++ sdp_proto.h	(working copy)
> @@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc
>  
>  struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
>  
> -void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
> +void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
> +		       u32 mask, ssize_t comp);
>  
>  void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
>  
> @@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb *
>  
>  void sdp_iocb_release(struct sdpc_iocb *iocb);
>  
> -void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status);
> +void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
> +		       ssize_t status);
>  
>  int sdp_iocb_lock(struct sdpc_iocb *iocb);
>  
> Index: sdp_read.c
> ===================================================================
> --- sdp_read.c	(revision 2726)
> +++ sdp_read.c	(working copy)
> @@ -205,7 +205,7 @@ int sdp_event_read(struct sdp_opt *conn,
>  		SDP_CONN_STAT_READ_INC(conn, iocb->post);
>  		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  
>  		break;
>  	case SDP_DESC_TYPE_NONE:
> @@ -226,7 +226,7 @@ int sdp_event_read(struct sdp_opt *conn,
>  		SDP_CONN_STAT_READ_INC(conn, iocb->post);
>  		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
> +		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
>  
>  		break;
>  	default:
> Index: sdp_send.c
> ===================================================================
> --- sdp_send.c	(revision 2726)
> +++ sdp_send.c	(working copy)
> @@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp
>  			SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
>  			SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
>  
> -			sdp_iocb_complete(iocb, 0);
> +			sdp_iocb_complete(conn, iocb, 0);
>  		}
>  
>  		goto done;
> @@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct 
>  			 * needs to be compelted.
>  			 */
>  			if (iocb->post > 0) {
> -				sdp_iocb_complete(iocb, 0);
> +				sdp_iocb_complete(conn, iocb, 0);
>  				result = -EAGAIN;
>  			} else {
>  				sdp_iocb_destroy(iocb);
> Index: sdp_conn.c
> ===================================================================
> --- sdp_conn.c	(revision 2726)
> +++ sdp_conn.c	(working copy)
> @@ -697,7 +697,8 @@ static int sdp_desc_q_cancel_lookup_func
>  	return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE);
>  }
>  
> -static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error)
> +static void sdp_desc_q_cancel_iocb(struct sdp_opt *conn,
> +				   struct sdpc_desc_q *table, ssize_t error)
>  {
>  	struct sdpc_iocb *iocb;
>  
> @@ -707,24 +708,24 @@ static void sdp_desc_q_cancel_iocb(struc
>  			 NULL))) {
>  
>  		sdp_iocb_q_remove(iocb);
> -		sdp_iocb_complete(iocb, error);
> +		sdp_iocb_complete(conn, iocb, error);
>  	}
>  }
>  
>  void sdp_iocb_q_cancel_all_read(struct sdp_opt *conn, ssize_t error)
>  {
> -	sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error);
> -	sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error);
> +	sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error);
> +	sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error);
>  
> -	sdp_desc_q_cancel_iocb(&conn->r_src, error);
> +	sdp_desc_q_cancel_iocb(conn, &conn->r_src, error);
>  }
>  
>  void sdp_iocb_q_cancel_all_write(struct sdp_opt *conn, ssize_t error)
>  {
> -	sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error);
> +	sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error);
>  
> -	sdp_desc_q_cancel_iocb(&conn->send_queue, error);
> -	sdp_desc_q_cancel_iocb(&conn->w_snk, error);
> +	sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error);
> +	sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error);
>  }
>  
>  void sdp_iocb_q_cancel_all(struct sdp_opt *conn, ssize_t error)
> Index: sdp_recv.c
> ===================================================================
> --- sdp_recv.c	(revision 2726)
> +++ sdp_recv.c	(working copy)
> @@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str
>  	/*
>  	 * callback to complete IOCB
>  	 */
> -	sdp_iocb_complete(iocb, 0);
> +	sdp_iocb_complete(conn, iocb, 0);
>  
>  	return (buff->tail - buff->data);
>  }
> @@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st
>  		/*
>  		 * callback to complete IOCB
>  		 */
> -		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
> +		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
>  	}
>  
>  	return (buff->tail - buff->data);
> @@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k
>  				/*
>  				 * callback to complete IOCB, or drop reference
>  				 */
> -				sdp_iocb_complete(iocb, 0);
> +				sdp_iocb_complete(conn, iocb, 0);
>  				result = -EAGAIN;
>  			}
>  			else {
> Index: sdp_conn.h
> ===================================================================
> --- sdp_conn.h	(revision 2726)
> +++ sdp_conn.h	(working copy)
> @@ -471,7 +471,7 @@ static inline void sdp_conn_unlock(struc
>  		sdp_conn_internal_unlock(conn);
>  	}
>  
> -	conn->lock.users = 0;
> +	--conn->lock.users;
>  	wake_up(&conn->lock.waitq);
>  
>  	spin_unlock_irqrestore(&conn->lock.slock, flags);
> Index: sdp_iocb.c
> ===================================================================
> --- sdp_iocb.c	(revision 2726)
> +++ sdp_iocb.c	(working copy)
> @@ -508,6 +508,8 @@ static void do_iocb_complete(void *arg)
>  	 * we ignore the value.
>  	 */
>  	(void)aio_complete(iocb->req, value, 0);
> +
> +	sdp_conn_unlock(iocb->conn);
>  	/*
>  	 * delete IOCB
>  	 */
> @@ -517,11 +519,14 @@ static void do_iocb_complete(void *arg)
>  /*
>   * sdp_iocb_complete - complete an IOCB
>   */
> -void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status)
> +void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
> +		       ssize_t status)
>  {
>  	iocb->status = status;
>  	
>  	if (in_atomic() || irqs_disabled()) {
> +		conn->lock.users++;
> +		iocb->conn = conn;
>  		INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
>  		schedule_work(&iocb->completion);
>  	} else
> @@ -750,7 +755,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc
>  /*
>   * sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue
>   */
> -void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp)
> +void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
> +		       u32 mask, ssize_t comp)
>  {
>  	struct sdpc_iocb *iocb;
>  	struct sdpc_iocb *next;
> @@ -772,7 +778,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_
>  				    iocb->post, iocb->len);
>  
>  			sdp_iocb_q_remove(iocb);
> -			sdp_iocb_complete(iocb, comp);
> +			sdp_iocb_complete(conn, iocb, comp);
>  		}
>  
>  		iocb = next;
> Index: sdp_iocb.h
> ===================================================================
> --- sdp_iocb.h	(revision 2726)
> +++ sdp_iocb.h	(working copy)
> @@ -109,7 +109,8 @@ struct sdpc_iocb {
>  	int           page_count;  /* number of physical pages. */
>  	int           page_offset; /* offset into first page. */
>  
> -	struct work_struct completion; /* task for defered completion. */
> +	struct work_struct completion; /* task for deferred completion. */
> +	struct sdp_opt *conn; /* conn to unlock for deferred completion. */
>  	/*
>  	 * kernel iocb structure
>  	 */
> 
> -- 
> MST



More information about the general mailing list