[ofa-general] [PATCH RFC/untested v0] IPoIB/CM: fix SRQ WR leak

Michael S. Tsirkin mst at dev.mellanox.co.il
Tue May 15 14:04:53 PDT 2007


If the Consumer does not wait for the Affiliated Asynchronous Last WQE Reached
Event, then WQE and Data Segment leakage may occur.
This leakage has been observed with IPoIB/CM: flipping ports on and off will,
with time, leak out all WRs and then all connections will start getting RNR
NACKs. Fix this in the way suggested by spec: create a "drain qp" in error state,
wait for last wqe reached event on a qp and then post send on "drain QP".
Once we observe a completion on the drain QP, it's safe to call ib_destroy_qp.

---

Roland, all. Here's a largish, and untested, patch that fixes a design bug in
the way IPoIB/CM destroyed passive connections.

Unfortunately, doing it by the spec kind of forces us to add a "state"
for passive connections, and split the passive list per connection state.
That's why the patch grew to be so large.

I expect to post a fully tested version by beginning of next week. The issue
addressed is very severe (work-around is to unload the ipoib module once in a
while) and I do think we need this fixed in 2.6.22, so
given how large the patch is, I'd like to ask everyone to review and comment.

NB: this is on top of 2.6.22-rc1.

diff --git a/drivers/infiniband/ulp/ipoib/ipoib.h b/drivers/infiniband/ulp/ipoib/ipoib.h
index 87310ee..e300c75 100644
--- a/drivers/infiniband/ulp/ipoib/ipoib.h
+++ b/drivers/infiniband/ulp/ipoib/ipoib.h
@@ -132,12 +132,39 @@ struct ipoib_cm_data {
 	__be32 mtu;
 };
 
+/* Quoting spec:
+ *
+ * If the Consumer does not wait for the Affiliated Asynchronous Last WQE Reached
+ * Event, then WQE and Data Segment leakage may occur. Therefore, it is good
+ * programming practice to tear down a QP that is associated with an SRQ by using
+ * the following process:
+ *
+ *
+ * Put the QP in the Error State
+ * Wait for the Affiliated Asynchronous Last WQE Reached Event;
+ * either:
+ *       drain the CQ by invoking the Poll CQ verb and either wait for CQ
+ *       to be empty or the number of Poll CQ operations has exceeded
+ *       CQ capacity size;
+ * or
+ *       post another WR that completes on the same CQ and wait for this
+ *       WR to return as a WC;
+ * and then invoke a Destroy QP or Reset QP.
+ */
+
+enum ipoib_cm_state {
+	IPOIB_CM_RX_LIVE,
+	IPOIB_CM_RX_ERROR, /* Ignored by stale task */
+	IPOIB_CM_RX_FLUSH  /* Last WQE Reached event observed */
+};
+
 struct ipoib_cm_rx {
 	struct ib_cm_id     *id;
 	struct ib_qp        *qp;
 	struct list_head     list;
 	struct net_device   *dev;
 	unsigned long        jiffies;
+	enum ipoib_cm_state  state;
 };
 
 struct ipoib_cm_tx {
@@ -165,10 +192,15 @@ struct ipoib_cm_dev_priv {
 	struct ib_srq  	       *srq;
 	struct ipoib_cm_rx_buf *srq_ring;
 	struct ib_cm_id        *id;
-	struct list_head        passive_ids;
+	struct ib_qp           *rx_drain_qp;
+	struct list_head        passive_ids;   /* state: LIVE */
+	struct list_head        rx_error_list; /* state: ERROR */
+	struct list_head        rx_flush_list; /* state: FLUSH, drain not started */
+	struct list_head        rx_drain_list; /* state: FLUSH, drain started */
 	struct work_struct      start_task;
 	struct work_struct      reap_task;
 	struct work_struct      skb_task;
+	struct work_struct      rx_drain_task;
 	struct delayed_work     stale_task;
 	struct sk_buff_head     skb_queue;
 	struct list_head        start_list;
diff --git a/drivers/infiniband/ulp/ipoib/ipoib_cm.c b/drivers/infiniband/ulp/ipoib/ipoib_cm.c
index 785bc85..f6a1405 100644
--- a/drivers/infiniband/ulp/ipoib/ipoib_cm.c
+++ b/drivers/infiniband/ulp/ipoib/ipoib_cm.c
@@ -62,6 +62,17 @@ struct ipoib_cm_id {
 	u32 remote_mtu;
 };
 
+static struct ib_qp_attr ipoib_cm_err_attr __read_mostly = {
+	.qp_state = IB_QPS_ERR
+};
+
+#define IPOIB_CM_RX_DRAIN_WRID 0x7fffffff
+
+static struct ib_send_wr ipoib_cm_rx_drain_wr __read_mostly = {
+	.wr_id = 0xfff /* todo */,
+	.opcode = IB_WR_SEND
+};
+
 static int ipoib_cm_tx_handler(struct ib_cm_id *cm_id,
 			       struct ib_cm_event *event);
 
@@ -150,11 +161,42 @@ partial_error:
 	return NULL;
 }
 
+static void ipoib_cm_rx_drain(struct ipoib_dev_priv* priv)
+{
+	struct ib_send_wr *bad_send_wr;
+
+	if (list_empty(&priv->cm.rx_flush_list) ||
+	    !list_empty(&priv->cm.rx_drain_list))
+		return;
+
+	if (ib_post_send(priv->cm.rx_drain_qp, &ipoib_cm_rx_drain_wr, &bad_send_wr))
+		ipoib_warn(priv, "failed to start rx flush\n");
+
+	list_splice_init(&priv->cm.rx_flush_list, &priv->cm.rx_drain_list);
+}
+
+static void ipoib_cm_rx_event_handler(struct ib_event *event, void *ctx)
+{
+	struct ipoib_cm_rx *p = ctx;
+	struct ipoib_dev_priv *priv = netdev_priv(p->dev);
+	unsigned long flags;
+
+	if (event->event != IB_EVENT_QP_LAST_WQE_REACHED)
+		return;
+
+	spin_lock_irqsave(&priv->lock, flags);
+	list_move(&p->list, &priv->cm.rx_flush_list);
+	p->state = IPOIB_CM_RX_FLUSH;
+	ipoib_cm_rx_drain(priv);
+	spin_unlock_irqrestore(&priv->lock, flags);
+}
+
 static struct ib_qp *ipoib_cm_create_rx_qp(struct net_device *dev,
 					   struct ipoib_cm_rx *p)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
 	struct ib_qp_init_attr attr = {
+		.event_handler = ipoib_cm_rx_event_handler,
 		.send_cq = priv->cq, /* does not matter, we never send anything */
 		.recv_cq = priv->cq,
 		.srq = priv->cm.srq,
@@ -256,6 +298,7 @@ static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
 
 	cm_id->context = p;
 	p->jiffies = jiffies;
+	p->state = IPOIB_CM_RX_LIVE;
 	spin_lock_irq(&priv->lock);
 	list_add(&p->list, &priv->cm.passive_ids);
 	spin_unlock_irq(&priv->lock);
@@ -271,12 +314,12 @@ err_qp:
 	return ret;
 }
 
+
 static int ipoib_cm_rx_handler(struct ib_cm_id *cm_id,
 			       struct ib_cm_event *event)
 {
 	struct ipoib_cm_rx *p;
 	struct ipoib_dev_priv *priv;
-	int ret;
 
 	switch (event->event) {
 	case IB_CM_REQ_RECEIVED:
@@ -288,20 +331,9 @@ static int ipoib_cm_rx_handler(struct ib_cm_id *cm_id,
 	case IB_CM_REJ_RECEIVED:
 		p = cm_id->context;
 		priv = netdev_priv(p->dev);
-		spin_lock_irq(&priv->lock);
-		if (list_empty(&p->list))
-			ret = 0; /* Connection is going away already. */
-		else {
-			list_del_init(&p->list);
-			ret = -ECONNRESET;
-		}
-		spin_unlock_irq(&priv->lock);
-		if (ret) {
-			ib_destroy_qp(p->qp);
-			kfree(p);
-			return ret;
-		}
-		return 0;
+		if (ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE))
+			ipoib_warn(priv, "unable to move qp to error state\n");
+		/* Fall through */
 	default:
 		return 0;
 	}
@@ -353,8 +385,11 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
 		       wr_id, wc->status);
 
 	if (unlikely(wr_id >= ipoib_recvq_size)) {
-		ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
-			   wr_id, ipoib_recvq_size);
+		if (wr_id == IPOIB_CM_RX_DRAIN_WRID)
+			queue_work(ipoib_workqueue, &priv->cm.rx_drain_task);
+		else
+			ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
+				   wr_id, ipoib_recvq_size);
 		return;
 	}
 
@@ -373,9 +408,9 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
 		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
 			spin_lock_irqsave(&priv->lock, flags);
 			p->jiffies = jiffies;
-			/* Move this entry to list head, but do
-			 * not re-add it if it has been removed. */
-			if (!list_empty(&p->list))
+			/* Move this entry to list head, but do not re-add it
+			 * if it has been moved out of list. */
+			if (p->state == IPOIB_CM_RX_LIVE)
 				list_move(&p->list, &priv->cm.passive_ids);
 			spin_unlock_irqrestore(&priv->lock, flags);
 			queue_delayed_work(ipoib_workqueue,
@@ -584,17 +619,40 @@ static void ipoib_cm_tx_completion(struct ib_cq *cq, void *tx_ptr)
 int ipoib_cm_dev_open(struct net_device *dev)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct ib_qp_attr qp_attr;
 	int ret;
 
 	if (!IPOIB_CM_SUPPORTED(dev->dev_addr))
 		return 0;
 
+	priv->cm.rx_drain_qp = ipoib_cm_create_rx_qp(dev, NULL);
+	if (IS_ERR(priv->cm.rx_drain_qp)) {
+		printk(KERN_WARNING "%s: failed to create CM ID\n", priv->ca->name);
+		ret = PTR_ERR(priv->cm.rx_drain_qp);
+		return ret;
+	}
+
+	qp_attr.qp_state = IB_QPS_INIT;
+	qp_attr.port_num = priv->port;
+	qp_attr.qkey = 0;
+	qp_attr.qp_access_flags = 0;
+	ret = ib_modify_qp(priv->cm.rx_drain_qp, &qp_attr,
+			   IB_QP_STATE | IB_QP_ACCESS_FLAGS | IB_QP_PORT | IB_QP_QKEY);
+	if (ret) {
+		ipoib_warn(priv, "failed to modify drain QP to INIT: %d\n", ret);
+		goto err_qp;
+	}
+	qp_attr.qp_state = IB_QPS_ERR;
+	ret = ib_modify_qp(priv->cm.rx_drain_qp, &qp_attr, IB_QP_STATE);
+	if (ret) {
+		ipoib_warn(priv, "failed to modify drain QP to error: %d\n", ret);
+		goto err_qp;
+	}
+
 	priv->cm.id = ib_create_cm_id(priv->ca, ipoib_cm_rx_handler, dev);
 	if (IS_ERR(priv->cm.id)) {
 		printk(KERN_WARNING "%s: failed to create CM ID\n", priv->ca->name);
-		ret = PTR_ERR(priv->cm.id);
-		priv->cm.id = NULL;
-		return ret;
+		goto err_cm;
 	}
 
 	ret = ib_cm_listen(priv->cm.id, cpu_to_be64(IPOIB_CM_IETF_ID | priv->qp->qp_num),
@@ -602,35 +660,76 @@ int ipoib_cm_dev_open(struct net_device *dev)
 	if (ret) {
 		printk(KERN_WARNING "%s: failed to listen on ID 0x%llx\n", priv->ca->name,
 		       IPOIB_CM_IETF_ID | priv->qp->qp_num);
-		ib_destroy_cm_id(priv->cm.id);
-		priv->cm.id = NULL;
-		return ret;
+		goto err_cm;
 	}
+
 	return 0;
+
+err_cm:
+	ib_destroy_cm_id(priv->cm.id);
+	priv->cm.id = NULL;
+err_qp:
+	ib_destroy_qp(priv->cm.rx_drain_qp);
+	return ret;
 }
 
 void ipoib_cm_dev_stop(struct net_device *dev)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	struct ipoib_cm_rx *p;
+	struct ipoib_cm_rx *p, *n;
+	unsigned long begin;
+	LIST_HEAD(list);
+	int ret;
 
 	if (!IPOIB_CM_SUPPORTED(dev->dev_addr) || !priv->cm.id)
 		return;
 
 	ib_destroy_cm_id(priv->cm.id);
 	priv->cm.id = NULL;
+
 	spin_lock_irq(&priv->lock);
 	while (!list_empty(&priv->cm.passive_ids)) {
-		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
-		list_del_init(&p->list);
+		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
+		list_move(&p->list, &priv->cm.rx_error_list);
+		p->state = IPOIB_CM_RX_ERROR;
 		spin_unlock_irq(&priv->lock);
+		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+		if (ret)
+			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+		spin_lock_irq(&priv->lock);
+	}
+
+	/* Wait for all RX to be drained */
+	begin = jiffies;
+
+	while (!list_empty(&priv->cm.rx_error_list) ||
+	       !list_empty(&priv->cm.rx_flush_list) ||
+	       !list_empty(&priv->cm.rx_drain_list)) {
+		if (!time_after(jiffies, begin + 5 * HZ)) {
+			ipoib_warn(priv, "RX drain timing out\n");
+
+			/*
+			 * assume the HW is wedged and just free up everything.
+			 */
+			list_splice_init(&priv->cm.rx_flush_list, &list);
+			list_splice_init(&priv->cm.rx_error_list, &list);
+			list_splice_init(&priv->cm.rx_drain_list, &list);
+			break;
+		}
+		spin_unlock_irq(&priv->lock);
+		msleep(1);
+		spin_lock_irq(&priv->lock);
+	}
+
+	spin_unlock_irq(&priv->lock);
+
+	list_for_each_entry_safe(p, n, &list, list) {
 		ib_destroy_cm_id(p->id);
 		ib_destroy_qp(p->qp);
 		kfree(p);
-		spin_lock_irq(&priv->lock);
 	}
-	spin_unlock_irq(&priv->lock);
 
+	ib_destroy_qp(priv->cm.rx_drain_qp);
 	cancel_delayed_work(&priv->cm.stale_task);
 }
 
@@ -1080,24 +1179,45 @@ void ipoib_cm_skb_too_long(struct net_device* dev, struct sk_buff *skb,
 		queue_work(ipoib_workqueue, &priv->cm.skb_task);
 }
 
+static void ipoib_cm_rx_drain_task(struct work_struct *work)
+{
+	struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
+						   cm.rx_drain_task);
+	struct ipoib_cm_rx *p, *n;
+	LIST_HEAD(list);
+
+	spin_lock_irq(&priv->lock);
+	list_splice_init(&priv->cm.rx_drain_list, &list);
+	ipoib_cm_rx_drain(priv);
+	spin_unlock_irq(&priv->lock);
+
+	list_for_each_entry_safe(p, n, &list, list) {
+		ib_destroy_cm_id(p->id);
+		ib_destroy_qp(p->qp);
+		kfree(p);
+	}
+}
+
 static void ipoib_cm_stale_task(struct work_struct *work)
 {
 	struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
 						   cm.stale_task.work);
 	struct ipoib_cm_rx *p;
+	int ret;
 
 	spin_lock_irq(&priv->lock);
 	while (!list_empty(&priv->cm.passive_ids)) {
-		/* List if sorted by LRU, start from tail,
+		/* List is sorted by LRU, start from tail,
 		 * stop when we see a recently used entry */
 		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
 		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
 			break;
-		list_del_init(&p->list);
+		list_move(&p->list, &priv->cm.rx_error_list);
+		p->state = IPOIB_CM_RX_ERROR;
 		spin_unlock_irq(&priv->lock);
-		ib_destroy_cm_id(p->id);
-		ib_destroy_qp(p->qp);
-		kfree(p);
+		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+		if (ret)
+			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
 		spin_lock_irq(&priv->lock);
 	}
 	spin_unlock_irq(&priv->lock);
@@ -1161,9 +1281,12 @@ int ipoib_cm_dev_init(struct net_device *dev)
 	INIT_LIST_HEAD(&priv->cm.passive_ids);
 	INIT_LIST_HEAD(&priv->cm.reap_list);
 	INIT_LIST_HEAD(&priv->cm.start_list);
+	INIT_LIST_HEAD(&priv->cm.rx_flush_list);
+	INIT_LIST_HEAD(&priv->cm.rx_error_list);
 	INIT_WORK(&priv->cm.start_task, ipoib_cm_tx_start);
 	INIT_WORK(&priv->cm.reap_task, ipoib_cm_tx_reap);
 	INIT_WORK(&priv->cm.skb_task, ipoib_cm_skb_reap);
+	INIT_WORK(&priv->cm.rx_drain_task, ipoib_cm_rx_drain_task);
 	INIT_DELAYED_WORK(&priv->cm.stale_task, ipoib_cm_stale_task);
 
 	skb_queue_head_init(&priv->cm.skb_queue);
diff --git a/drivers/infiniband/ulp/ipoib/ipoib_verbs.c b/drivers/infiniband/ulp/ipoib/ipoib_verbs.c
index 5c3c6a4..af8a6d4 100644
--- a/drivers/infiniband/ulp/ipoib/ipoib_verbs.c
+++ b/drivers/infiniband/ulp/ipoib/ipoib_verbs.c
@@ -185,7 +185,7 @@ int ipoib_transport_dev_init(struct net_device *dev, struct ib_device *ca)
 	size = ipoib_sendq_size + ipoib_recvq_size + 1;
 	ret = ipoib_cm_dev_init(dev);
 	if (!ret)
-		size += ipoib_recvq_size;
+		size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
 
 	priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
 	if (IS_ERR(priv->cq)) {


-- 
MST



More information about the general mailing list