[openib-general] Re: [PATCHv2] fix sdp aio race

Libor Michalek limichal at cisco.com
Thu Jul 21 12:43:49 PDT 2005


On Wed, Jul 20, 2005 at 02:27:11AM +0300, Michael S. Tsirkin wrote:
> Second attempt to solve race where AIO completed asynchronously
> is bypassed by other completions.
> Solution: set a bit in connection, and defer any cq polling
> till work queue handles the AIO. We have to pass the connection
> to iocb handlers for this to work. I also got rid of in_atomic.

  I don't think a bit is sufficient, it would need to be a reference
count, since you'll potentially have multiple outstanding completions.
Also, the lock.event is modified, deferred flag cleared, inside the
conn_lock, but it needs to be protected by the conn_lock's irq.

  I'll look at if it's possible to never get into this ordering issue
by always forcing iocb defered completions...

-Libor
 
> Signed-off-by: Michael S. Tsirkin <mst at mellanox.co.il>
> 
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_write.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_write.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_write.c
> @@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_sock *con
>  		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
>  		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  
>  		break;
>  	case SDP_DESC_TYPE_NONE:
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_rcvd.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_rcvd.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_rcvd.c
> @@ -155,7 +155,7 @@ static int sdp_rcvd_send_sm(struct sdp_s
>  
>  			conn->src_sent--;
>  
> -			sdp_iocb_complete(iocb, 0);
> +			sdp_iocb_complete(conn, iocb, 0);
>  		}
>  		/*
>  		 * Cancel complete, clear the state.
> @@ -212,7 +212,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_s
>  
>  	conn->snk_sent--;
>  
> -	sdp_iocb_complete(iocb, 0);
> +	sdp_iocb_complete(conn, iocb, 0);
>  
>  	return 0;
>  }
> @@ -273,7 +273,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_s
>  		SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
>  		SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  	}
>  	/*
>  	 * If Source Cancel was in process, and there are no more outstanding
> @@ -565,7 +565,7 @@ static int sdp_rcvd_snk_cancel_ack(struc
>  	while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) {
>  
>  		conn->snk_sent--;
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  	}
>  	/*
>  	 * cancellation is complete. Cancel flag is cleared in RECV post.
> @@ -687,7 +687,7 @@ static int sdp_rcvd_snk_avail(struct sdp
>  				sdp_desc_q_put_head(&conn->send_queue,
>  						    (struct sdpc_desc *)iocb);
>  			else
> -				sdp_iocb_complete(iocb, 0);
> +				sdp_iocb_complete(conn, iocb, 0);
>  		}
>  		/*
>  		 * If Source Cancel was in process, it should now 
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_proto.h
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_proto.h
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_proto.h
> @@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc
>  
>  struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
>  
> -void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
> +void sdp_iocb_q_cancel(struct sdp_sock *conn, struct sdpc_iocb_q *table,
> +		       u32 mask, ssize_t comp);
>  
>  void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
>  
> @@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb *
>  
>  void sdp_iocb_release(struct sdpc_iocb *iocb);
>  
> -void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status);
> +void sdp_iocb_complete(struct sdp_sock *conn, struct sdpc_iocb *iocb,
> +		       ssize_t status);
>  
>  int sdp_iocb_lock(struct sdpc_iocb *iocb);
>  
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_read.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_read.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_read.c
> @@ -207,7 +207,7 @@ int sdp_event_read(struct sdp_sock *conn
>  		SDP_CONN_STAT_READ_INC(conn, iocb->post);
>  		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(iocb, 0);
> +		sdp_iocb_complete(conn, iocb, 0);
>  
>  		break;
>  	case SDP_DESC_TYPE_NONE:
> @@ -228,7 +228,7 @@ int sdp_event_read(struct sdp_sock *conn
>  		SDP_CONN_STAT_READ_INC(conn, iocb->post);
>  		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
>  
> -		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
> +		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
>  
>  		break;
>  	default:
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_send.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_send.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_send.c
> @@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp
>  			SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
>  			SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
>  
> -			sdp_iocb_complete(iocb, 0);
> +			sdp_iocb_complete(conn, iocb, 0);
>  		}
>  
>  		goto done;
> @@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct 
>  			 * needs to be compelted.
>  			 */
>  			if (iocb->post > 0) {
> -				sdp_iocb_complete(iocb, 0);
> +				sdp_iocb_complete(conn, iocb, 0);
>  				result = -EAGAIN;
>  			} else {
>  				sdp_iocb_destroy(iocb);
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_conn.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.c
> @@ -665,7 +665,8 @@ static int sdp_desc_q_cancel_lookup_func
>  	return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE);
>  }
>  
> -static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error)
> +static void sdp_desc_q_cancel_iocb(struct sdp_sock *conn,
> +				   struct sdpc_desc_q *table, ssize_t error)
>  {
>  	struct sdpc_iocb *iocb;
>  
> @@ -675,24 +676,24 @@ static void sdp_desc_q_cancel_iocb(struc
>  			 NULL))) {
>  
>  		sdp_iocb_q_remove(iocb);
> -		sdp_iocb_complete(iocb, error);
> +		sdp_iocb_complete(conn, iocb, error);
>  	}
>  }
>  
>  void sdp_iocb_q_cancel_all_read(struct sdp_sock *conn, ssize_t error)
>  {
> -	sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error);
> -	sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error);
> +	sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error);
> +	sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error);
>  
> -	sdp_desc_q_cancel_iocb(&conn->r_src, error);
> +	sdp_desc_q_cancel_iocb(conn, &conn->r_src, error);
>  }
>  
>  void sdp_iocb_q_cancel_all_write(struct sdp_sock *conn, ssize_t error)
>  {
> -	sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error);
> +	sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error);
>  
> -	sdp_desc_q_cancel_iocb(&conn->send_queue, error);
> -	sdp_desc_q_cancel_iocb(&conn->w_snk, error);
> +	sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error);
> +	sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error);
>  }
>  
>  void sdp_iocb_q_cancel_all(struct sdp_sock *conn, ssize_t error)
> @@ -927,7 +928,7 @@ int sdp_conn_cq_drain(struct ib_cq *cq, 
>  	 * the function should only be called under the connection locks
>  	 * spinlock to ensure the call is serialized to avoid races.
>  	 */
> -	for (;;) {
> +	while(!(conn->lock.event & SDP_LOCK_F_DEFERRED)) {
>  		/*
>  		 * poll for a completion
>  		 */
> @@ -971,17 +972,16 @@ int sdp_conn_cq_drain(struct ib_cq *cq, 
>   */
>  void sdp_conn_internal_unlock(struct sdp_sock *conn)
>  {
> -	int calls = 0;
>  	/*
>  	 * poll CQs for events.
>  	 */
> -	if (conn->lock.event & SDP_LOCK_F_RECV_CQ)
> -		calls += sdp_conn_cq_drain(conn->recv_cq, conn);
> -
> -	if (conn->lock.event & SDP_LOCK_F_SEND_CQ)
> -		calls += sdp_conn_cq_drain(conn->send_cq, conn);
> -
> -	conn->lock.event = 0;
> +	if ((conn->lock.event & SDP_LOCK_F_RECV_CQ) &&
> +		!sdp_conn_cq_drain(conn->recv_cq, conn))
> +		conn->lock.event &= ~SDP_LOCK_F_RECV_CQ;
> +
> +	if ((conn->lock.event & SDP_LOCK_F_SEND_CQ) &&
> +		!sdp_conn_cq_drain(conn->send_cq, conn))
> +		conn->lock.event &= ~SDP_LOCK_F_SEND_CQ;
>  }
>  
>  /*
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_recv.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_recv.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_recv.c
> @@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str
>  	/*
>  	 * callback to complete IOCB
>  	 */
> -	sdp_iocb_complete(iocb, 0);
> +	sdp_iocb_complete(conn, iocb, 0);
>  
>  	return (buff->tail - buff->data);
>  }
> @@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st
>  		/*
>  		 * callback to complete IOCB
>  		 */
> -		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
> +		sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
>  	}
>  
>  	return (buff->tail - buff->data);
> @@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k
>  				/*
>  				 * callback to complete IOCB, or drop reference
>  				 */
> -				sdp_iocb_complete(iocb, 0);
> +				sdp_iocb_complete(conn, iocb, 0);
>  				result = -EAGAIN;
>  			}
>  			else {
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.h
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_conn.h
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_conn.h
> @@ -163,8 +163,9 @@ struct sdp_conn_lock {
>  	wait_queue_head_t waitq;
>  };
>  
> -#define SDP_LOCK_F_RECV_CQ 0x01 /* recv CQ event is pending */
> -#define SDP_LOCK_F_SEND_CQ 0x02 /* send CQ event is pending */
> +#define SDP_LOCK_F_RECV_CQ  0x01 /* recv CQ event is pending */
> +#define SDP_LOCK_F_SEND_CQ  0x02 /* send CQ event is pending */
> +#define SDP_LOCK_F_DEFERRED 0x04 /* work scheduled, pending completion */
>  /*
>   * SDP Connection structure.
>   */
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_iocb.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.c
> @@ -483,9 +483,8 @@ void sdp_iocb_release(struct sdpc_iocb *
>  /*
>   * do_iocb_complete - complete an IOCB for real in thread context
>   */
> -static void do_iocb_complete(void *arg)
> +static void do_iocb_complete(struct sdpc_iocb *iocb)
>  {
> -	struct sdpc_iocb *iocb = (struct sdpc_iocb *)arg;
>  	long value;
>  	/*
>  	 * release memory
> @@ -508,21 +507,37 @@ static void do_iocb_complete(void *arg)
>  	 * we ignore the value.
>  	 */
>  	(void)aio_complete(iocb->req, value, 0);
> +
>  	/*
>  	 * delete IOCB
>  	 */
>  	sdp_iocb_destroy(iocb);
>  }
>  
> +static void deferred_iocb_complete(void *arg)
> +{
> +	struct sdpc_iocb *iocb = arg;
> +	struct sdp_sock *conn = iocb->conn;
> +	do_iocb_complete(iocb);
> +	if (conn) {
> +		sdp_conn_lock(conn);
> +		conn->lock.event &= SDP_LOCK_F_DEFERRED;
> +		sdp_conn_unlock(conn);
> +	}
> +}
>  /*
>   * sdp_iocb_complete - complete an IOCB
>   */
> -void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status)
> +void sdp_iocb_complete(struct sdp_sock *conn, struct sdpc_iocb *iocb,
> +		       ssize_t status)
>  {
>  	iocb->status = status;
> -	
> -	if (in_atomic() || irqs_disabled()) {
> -		INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
> +
> +	/* This is always called with connection locked,
> +	 * so users == 0 means this was an irq lock. */
> +	if (!conn->lock.users) {
> +		iocb->conn = conn;
> +		INIT_WORK(&iocb->completion, deferred_iocb_complete, iocb);
>  		schedule_work(&iocb->completion);
>  	} else
>  		do_iocb_complete(iocb);
> @@ -750,7 +765,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc
>  /*
>   * sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue
>   */
> -void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp)
> +void sdp_iocb_q_cancel(struct sdp_sock *conn, struct sdpc_iocb_q *table,
> +		       u32 mask, ssize_t comp)
>  {
>  	struct sdpc_iocb *iocb;
>  	struct sdpc_iocb *next;
> @@ -772,7 +788,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_
>  				    iocb->post, iocb->len);
>  
>  			sdp_iocb_q_remove(iocb);
> -			sdp_iocb_complete(iocb, comp);
> +			sdp_iocb_complete(conn, iocb, comp);
>  		}
>  
>  		iocb = next;
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_event.c
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_event.c
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_event.c
> @@ -169,8 +169,8 @@ void sdp_cq_event_handler(struct ib_cq *
>  			/*
>  			 * dispatch CQ completions.
>  			 */
> -			(void)sdp_conn_cq_drain(cq, conn);
> -			event = 0;
> +			if (!sdp_conn_cq_drain(cq, conn))
> +				event = 0;
>  		}
>  	}
>  	/*
> Index: linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.h
> ===================================================================
> --- linux-2.6.11.12.orig/drivers/infiniband/ulp/sdp/sdp_iocb.h
> +++ linux-2.6.11.12/drivers/infiniband/ulp/sdp/sdp_iocb.h
> @@ -109,7 +109,8 @@ struct sdpc_iocb {
>  	int           page_count;  /* number of physical pages. */
>  	int           page_offset; /* offset into first page. */
>  
> -	struct work_struct completion; /* task for defered completion. */
> +	struct work_struct completion; /* task for deferred completion. */
> +	struct sdp_sock *conn; /* conn to unlock for deferred completion. */
>  	/*
>  	 * kernel iocb structure
>  	 */
> 
> -- 
> MST



More information about the general mailing list