[ofa-general] [PATCH RFC/untested v0.5] IPoIB/CM: fix SRQ WR leak

Michael S. Tsirkin mst at dev.mellanox.co.il
Wed May 16 03:14:57 PDT 2007


SRQ WR leakage has been observed with IPoIB/CM: e.g. flipping ports on and off
will, with time, leak out all WRs and then all connections will start getting RNR
NACKs. Fix this in the way suggested by spec: move QP to error, wait for last
wqe reached event and then post send on "drain QP" connected to the same CQ.
Once we observe a completion on the drain QP, it's safe to call ib_destroy_qp.

Signed-off-by: Michael S. Tsirkin <mst at dev.mellanox.co.il>

---

Changes from v0: fixed drain WR ID, comment on the algorithm used,
cleaned up the patch description.

Roland, all. This is a largish, and still untested, patch that fixes a design bug in
the way IPoIB/CM destroyed QPs connected to SRQ.

Unfortunately, doing it by the spec kind of forces us to add a "state"
for passive connections, and split the connection list per connection state.
That's why the patch grew to be so large.

The issue addressed is very severe (the only work-around is to unload the ipoib module
once in a while), so given how large the patch is, I'd like to ask everyone to review
and comment.

NB: this is on top of 2.6.22-rc1.

 ipoib.h       |   38 ++++++++++
 ipoib_cm.c    |  208 ++++++++++++++++++++++++++++++++++++++++++++++++----------
 ipoib_verbs.c |    2
 3 files changed, 212 insertions(+), 36 deletions(-)

diff --git a/drivers/infiniband/ulp/ipoib/ipoib.h b/drivers/infiniband/ulp/ipoib/ipoib.h
index 87310ee..087bbfc 100644
--- a/drivers/infiniband/ulp/ipoib/ipoib.h
+++ b/drivers/infiniband/ulp/ipoib/ipoib.h
@@ -132,12 +132,43 @@ struct ipoib_cm_data {
 	__be32 mtu;
 };
 
+/*
+ * Quoting 10.3.1 Queue Pair and EE Context States:
+ *
+ * Note, for QPs that are associated with an SRQ, the Consumer should take the
+ * QP through the Error State before invoking a Destroy QP or a Modify QP to the
+ * Reset State.  The Consumer may invoke the Destroy QP without first performing
+ * a Modify QP to the Error State and waiting for the Affiliated Asynchronous
+ * Last WQE Reached Event. However, if the Consumer does not wait for the
+ * Affiliated Asynchronous Last WQE Reached Event, then WQE and Data Segment
+ * leakage may occur. Therefore, it is good programming practice to tear down a
+ * QP that is associated with an SRQ by using the following process:
+ *
+ * - Put the QP in the Error State
+ * - Wait for the Affiliated Asynchronous Last WQE Reached Event;
+ * - either:
+ *       drain the CQ by invoking the Poll CQ verb and either wait for CQ
+ *       to be empty or the number of Poll CQ operations has exceeded
+ *       CQ capacity size;
+ * - or
+ *       post another WR that completes on the same CQ and wait for this
+ *       WR to return as a WC; (NB: this is the option that we use)
+ * and then invoke a Destroy QP or Reset QP.
+ */
+
+enum ipoib_cm_state {
+	IPOIB_CM_RX_LIVE,
+	IPOIB_CM_RX_ERROR, /* Ignored by stale task */
+	IPOIB_CM_RX_FLUSH  /* Last WQE Reached event observed */
+};
+
 struct ipoib_cm_rx {
 	struct ib_cm_id     *id;
 	struct ib_qp        *qp;
 	struct list_head     list;
 	struct net_device   *dev;
 	unsigned long        jiffies;
+	enum ipoib_cm_state  state;
 };
 
 struct ipoib_cm_tx {
@@ -165,10 +196,15 @@ struct ipoib_cm_dev_priv {
 	struct ib_srq  	       *srq;
 	struct ipoib_cm_rx_buf *srq_ring;
 	struct ib_cm_id        *id;
-	struct list_head        passive_ids;
+	struct ib_qp           *rx_drain_qp;   /* generates WR described in 10.3.1 */
+	struct list_head        passive_ids;   /* state: LIVE */
+	struct list_head        rx_error_list; /* state: ERROR */
+	struct list_head        rx_flush_list; /* state: FLUSH, drain not started */
+	struct list_head        rx_drain_list; /* state: FLUSH, drain started */
 	struct work_struct      start_task;
 	struct work_struct      reap_task;
 	struct work_struct      skb_task;
+	struct work_struct      rx_drain_task;
 	struct delayed_work     stale_task;
 	struct sk_buff_head     skb_queue;
 	struct list_head        start_list;
diff --git a/drivers/infiniband/ulp/ipoib/ipoib_cm.c b/drivers/infiniband/ulp/ipoib/ipoib_cm.c
index 785bc85..d4e4cf3 100644
--- a/drivers/infiniband/ulp/ipoib/ipoib_cm.c
+++ b/drivers/infiniband/ulp/ipoib/ipoib_cm.c
@@ -62,6 +62,17 @@ struct ipoib_cm_id {
 	u32 remote_mtu;
 };
 
+static struct ib_qp_attr ipoib_cm_err_attr __read_mostly = {
+	.qp_state = IB_QPS_ERR
+};
+
+#define IPOIB_CM_RX_DRAIN_WRID 0x7fffffff
+
+static struct ib_send_wr ipoib_cm_rx_drain_wr __read_mostly = {
+	.wr_id = IPOIB_CM_RX_DRAIN_WRID,
+	.opcode = IB_WR_SEND
+};
+
 static int ipoib_cm_tx_handler(struct ib_cm_id *cm_id,
 			       struct ib_cm_event *event);
 
@@ -150,11 +161,44 @@ partial_error:
 	return NULL;
 }
 
+static void ipoib_cm_start_rx_drain(struct ipoib_dev_priv* priv)
+{
+	struct ib_send_wr *bad_send_wr;
+
+	/* rx_drain_qp send queue depth is 1, so
+	 * make sure we have at most 1 outstanding WR. */
+	if (list_empty(&priv->cm.rx_flush_list) ||
+	    !list_empty(&priv->cm.rx_drain_list))
+		return;
+
+	if (ib_post_send(priv->cm.rx_drain_qp, &ipoib_cm_rx_drain_wr, &bad_send_wr))
+		ipoib_warn(priv, "failed to post rx_drain wr\n");
+
+	list_splice_init(&priv->cm.rx_flush_list, &priv->cm.rx_drain_list);
+}
+
+static void ipoib_cm_rx_event_handler(struct ib_event *event, void *ctx)
+{
+	struct ipoib_cm_rx *p = ctx;
+	struct ipoib_dev_priv *priv = netdev_priv(p->dev);
+	unsigned long flags;
+
+	if (event->event != IB_EVENT_QP_LAST_WQE_REACHED)
+		return;
+
+	spin_lock_irqsave(&priv->lock, flags);
+	list_move(&p->list, &priv->cm.rx_flush_list);
+	p->state = IPOIB_CM_RX_FLUSH;
+	ipoib_cm_start_rx_drain(priv);
+	spin_unlock_irqrestore(&priv->lock, flags);
+}
+
 static struct ib_qp *ipoib_cm_create_rx_qp(struct net_device *dev,
 					   struct ipoib_cm_rx *p)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
 	struct ib_qp_init_attr attr = {
+		.event_handler = ipoib_cm_rx_event_handler,
 		.send_cq = priv->cq, /* does not matter, we never send anything */
 		.recv_cq = priv->cq,
 		.srq = priv->cm.srq,
@@ -256,6 +300,7 @@ static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
 
 	cm_id->context = p;
 	p->jiffies = jiffies;
+	p->state = IPOIB_CM_RX_LIVE;
 	spin_lock_irq(&priv->lock);
 	list_add(&p->list, &priv->cm.passive_ids);
 	spin_unlock_irq(&priv->lock);
@@ -276,7 +321,6 @@ static int ipoib_cm_rx_handler(struct ib_cm_id *cm_id,
 {
 	struct ipoib_cm_rx *p;
 	struct ipoib_dev_priv *priv;
-	int ret;
 
 	switch (event->event) {
 	case IB_CM_REQ_RECEIVED:
@@ -288,20 +332,9 @@ static int ipoib_cm_rx_handler(struct ib_cm_id *cm_id,
 	case IB_CM_REJ_RECEIVED:
 		p = cm_id->context;
 		priv = netdev_priv(p->dev);
-		spin_lock_irq(&priv->lock);
-		if (list_empty(&p->list))
-			ret = 0; /* Connection is going away already. */
-		else {
-			list_del_init(&p->list);
-			ret = -ECONNRESET;
-		}
-		spin_unlock_irq(&priv->lock);
-		if (ret) {
-			ib_destroy_qp(p->qp);
-			kfree(p);
-			return ret;
-		}
-		return 0;
+		if (ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE))
+			ipoib_warn(priv, "unable to move qp to error state\n");
+		/* Fall through */
 	default:
 		return 0;
 	}
@@ -353,8 +386,11 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
 		       wr_id, wc->status);
 
 	if (unlikely(wr_id >= ipoib_recvq_size)) {
-		ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
-			   wr_id, ipoib_recvq_size);
+		if (wr_id == IPOIB_CM_RX_DRAIN_WRID)
+			queue_work(ipoib_workqueue, &priv->cm.rx_drain_task);
+		else
+			ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
+				   wr_id, ipoib_recvq_size);
 		return;
 	}
 
@@ -373,9 +409,9 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
 		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
 			spin_lock_irqsave(&priv->lock, flags);
 			p->jiffies = jiffies;
-			/* Move this entry to list head, but do
-			 * not re-add it if it has been removed. */
-			if (!list_empty(&p->list))
+			/* Move this entry to list head, but do not re-add it
+			 * if it has been moved out of list. */
+			if (p->state == IPOIB_CM_RX_LIVE)
 				list_move(&p->list, &priv->cm.passive_ids);
 			spin_unlock_irqrestore(&priv->lock, flags);
 			queue_delayed_work(ipoib_workqueue,
@@ -584,17 +620,54 @@ static void ipoib_cm_tx_completion(struct ib_cq *cq, void *tx_ptr)
 int ipoib_cm_dev_open(struct net_device *dev)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct ib_qp_attr qp_attr;
+	struct ib_qp_init_attr qp_init_attr = {
+		.send_cq = priv->cq,
+		.recv_cq = priv->cq, /* does not matter, we never get anything */
+		.srq = priv->cm.srq, /* does not matter, we never get anything */
+		.cap.max_send_wr = 1,
+		.cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
+		.sq_sig_type = IB_SIGNAL_ALL_WR,
+		.qp_type = IB_QPT_UC,
+	};
 	int ret;
 
 	if (!IPOIB_CM_SUPPORTED(dev->dev_addr))
 		return 0;
 
+	priv->cm.rx_drain_qp = ib_create_qp(priv->pd, &qp_init_attr);
+	if (IS_ERR(priv->cm.rx_drain_qp)) {
+		printk(KERN_WARNING "%s: failed to create CM ID\n", priv->ca->name);
+		ret = PTR_ERR(priv->cm.rx_drain_qp);
+		return ret;
+	}
+
+	qp_attr.qp_state = IB_QPS_INIT;
+	qp_attr.port_num = priv->port;
+	qp_attr.qkey = 0;
+	qp_attr.qp_access_flags = 0;
+	ret = ib_modify_qp(priv->cm.rx_drain_qp, &qp_attr,
+			   IB_QP_STATE | IB_QP_ACCESS_FLAGS | IB_QP_PORT | IB_QP_QKEY);
+	if (ret) {
+		ipoib_warn(priv, "failed to modify drain QP to INIT: %d\n", ret);
+		goto err_qp;
+	}
+
+	/* We put the QP in error state directly: this way, hardware
+	 * will immediately generate WC for each WR we post, without
+	 * sending anything on the wire. */
+	qp_attr.qp_state = IB_QPS_ERR;
+	ret = ib_modify_qp(priv->cm.rx_drain_qp, &qp_attr, IB_QP_STATE);
+	if (ret) {
+		ipoib_warn(priv, "failed to modify drain QP to error: %d\n", ret);
+		goto err_qp;
+	}
+
 	priv->cm.id = ib_create_cm_id(priv->ca, ipoib_cm_rx_handler, dev);
 	if (IS_ERR(priv->cm.id)) {
 		printk(KERN_WARNING "%s: failed to create CM ID\n", priv->ca->name);
 		ret = PTR_ERR(priv->cm.id);
-		priv->cm.id = NULL;
-		return ret;
+		goto err_cm;
 	}
 
 	ret = ib_cm_listen(priv->cm.id, cpu_to_be64(IPOIB_CM_IETF_ID | priv->qp->qp_num),
@@ -602,35 +675,77 @@ int ipoib_cm_dev_open(struct net_device *dev)
 	if (ret) {
 		printk(KERN_WARNING "%s: failed to listen on ID 0x%llx\n", priv->ca->name,
 		       IPOIB_CM_IETF_ID | priv->qp->qp_num);
-		ib_destroy_cm_id(priv->cm.id);
-		priv->cm.id = NULL;
-		return ret;
+		goto err_listen;
 	}
+
 	return 0;
+
+err_listen:
+	ib_destroy_cm_id(priv->cm.id);
+err_cm:
+	priv->cm.id = NULL;
+err_qp:
+	ib_destroy_qp(priv->cm.rx_drain_qp);
+	return ret;
 }
 
 void ipoib_cm_dev_stop(struct net_device *dev)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	struct ipoib_cm_rx *p;
+	struct ipoib_cm_rx *p, *n;
+	unsigned long begin;
+	LIST_HEAD(list);
+	int ret;
 
 	if (!IPOIB_CM_SUPPORTED(dev->dev_addr) || !priv->cm.id)
 		return;
 
 	ib_destroy_cm_id(priv->cm.id);
 	priv->cm.id = NULL;
+
 	spin_lock_irq(&priv->lock);
 	while (!list_empty(&priv->cm.passive_ids)) {
 		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
-		list_del_init(&p->list);
+		list_move(&p->list, &priv->cm.rx_error_list);
+		p->state = IPOIB_CM_RX_ERROR;
 		spin_unlock_irq(&priv->lock);
+		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+		if (ret)
+			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+		spin_lock_irq(&priv->lock);
+	}
+
+	/* Wait for all RX to be drained */
+	begin = jiffies;
+
+	while (!list_empty(&priv->cm.rx_error_list) ||
+	       !list_empty(&priv->cm.rx_flush_list) ||
+	       !list_empty(&priv->cm.rx_drain_list)) {
+		if (!time_after(jiffies, begin + 5 * HZ)) {
+			ipoib_warn(priv, "RX drain timing out\n");
+
+			/*
+			 * assume the HW is wedged and just free up everything.
+			 */
+			list_splice_init(&priv->cm.rx_flush_list, &list);
+			list_splice_init(&priv->cm.rx_error_list, &list);
+			list_splice_init(&priv->cm.rx_drain_list, &list);
+			break;
+		}
+		spin_unlock_irq(&priv->lock);
+		msleep(1);
+		spin_lock_irq(&priv->lock);
+	}
+
+	spin_unlock_irq(&priv->lock);
+
+	list_for_each_entry_safe(p, n, &list, list) {
 		ib_destroy_cm_id(p->id);
 		ib_destroy_qp(p->qp);
 		kfree(p);
-		spin_lock_irq(&priv->lock);
 	}
-	spin_unlock_irq(&priv->lock);
 
+	ib_destroy_qp(priv->cm.rx_drain_qp);
 	cancel_delayed_work(&priv->cm.stale_task);
 }
 
@@ -1080,24 +1195,45 @@ void ipoib_cm_skb_too_long(struct net_device* dev, struct sk_buff *skb,
 		queue_work(ipoib_workqueue, &priv->cm.skb_task);
 }
 
+static void ipoib_cm_rx_drain_task(struct work_struct *work)
+{
+	struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
+						   cm.rx_drain_task);
+	struct ipoib_cm_rx *p, *n;
+	LIST_HEAD(list);
+
+	spin_lock_irq(&priv->lock);
+	list_splice_init(&priv->cm.rx_drain_list, &list);
+	ipoib_cm_start_rx_drain(priv);
+	spin_unlock_irq(&priv->lock);
+
+	list_for_each_entry_safe(p, n, &list, list) {
+		ib_destroy_cm_id(p->id);
+		ib_destroy_qp(p->qp);
+		kfree(p);
+	}
+}
+
 static void ipoib_cm_stale_task(struct work_struct *work)
 {
 	struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
 						   cm.stale_task.work);
 	struct ipoib_cm_rx *p;
+	int ret;
 
 	spin_lock_irq(&priv->lock);
 	while (!list_empty(&priv->cm.passive_ids)) {
-		/* List if sorted by LRU, start from tail,
+		/* List is sorted by LRU, start from tail,
 		 * stop when we see a recently used entry */
 		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
 		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
 			break;
-		list_del_init(&p->list);
+		list_move(&p->list, &priv->cm.rx_error_list);
+		p->state = IPOIB_CM_RX_ERROR;
 		spin_unlock_irq(&priv->lock);
-		ib_destroy_cm_id(p->id);
-		ib_destroy_qp(p->qp);
-		kfree(p);
+		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+		if (ret)
+			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
 		spin_lock_irq(&priv->lock);
 	}
 	spin_unlock_irq(&priv->lock);
@@ -1161,9 +1297,13 @@ int ipoib_cm_dev_init(struct net_device *dev)
 	INIT_LIST_HEAD(&priv->cm.passive_ids);
 	INIT_LIST_HEAD(&priv->cm.reap_list);
 	INIT_LIST_HEAD(&priv->cm.start_list);
+	INIT_LIST_HEAD(&priv->cm.rx_error_list);
+	INIT_LIST_HEAD(&priv->cm.rx_flush_list);
+	INIT_LIST_HEAD(&priv->cm.rx_drain_list);
 	INIT_WORK(&priv->cm.start_task, ipoib_cm_tx_start);
 	INIT_WORK(&priv->cm.reap_task, ipoib_cm_tx_reap);
 	INIT_WORK(&priv->cm.skb_task, ipoib_cm_skb_reap);
+	INIT_WORK(&priv->cm.rx_drain_task, ipoib_cm_rx_drain_task);
 	INIT_DELAYED_WORK(&priv->cm.stale_task, ipoib_cm_stale_task);
 
 	skb_queue_head_init(&priv->cm.skb_queue);
diff --git a/drivers/infiniband/ulp/ipoib/ipoib_verbs.c b/drivers/infiniband/ulp/ipoib/ipoib_verbs.c
index 5c3c6a4..af8a6d4 100644
--- a/drivers/infiniband/ulp/ipoib/ipoib_verbs.c
+++ b/drivers/infiniband/ulp/ipoib/ipoib_verbs.c
@@ -185,7 +185,7 @@ int ipoib_transport_dev_init(struct net_device *dev, struct ib_device *ca)
 	size = ipoib_sendq_size + ipoib_recvq_size + 1;
 	ret = ipoib_cm_dev_init(dev);
 	if (!ret)
-		size += ipoib_recvq_size;
+		size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
 
 	priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
 	if (IS_ERR(priv->cq)) {
-- 
MST



More information about the general mailing list