[ofa-general] IPOB CM (NOSRQ) [PATCH V8] patch

Pradeep Satyanarayana pradeeps at linux.vnet.ibm.com
Thu Jul 19 11:55:39 PDT 2007


Addressed Roland's comments and more (hope this passes muster :)). 
The event_handler issue pointed out will be addressed in another patch.


Signed-off-by: Pradeep Satyanarayana <pradeeps at linux.vnet.ibm.com>
---

--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib.h	2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib.h	2007-07-19 11:17:39.000000000 -0400
@@ -95,11 +95,15 @@ enum {
 	IPOIB_MCAST_FLAG_ATTACHED = 3,
 };
 
+#define CM_PACKET_SIZE (ALIGN(IPOIB_CM_MTU, PAGE_SIZE))
 #define	IPOIB_OP_RECV   (1ul << 31)
 #ifdef CONFIG_INFINIBAND_IPOIB_CM
-#define	IPOIB_CM_OP_SRQ (1ul << 30)
+#define	IPOIB_CM_OP_RECV (1ul << 30)
+
+#define NOSRQ_INDEX_TABLE_SIZE 128
+#define NOSRQ_INDEX_MASK      (NOSRQ_INDEX_TABLE_SIZE -1)
 #else
-#define	IPOIB_CM_OP_SRQ (0)
+#define	IPOIB_CM_OP_RECV (0)
 #endif
 
 /* structs */
@@ -166,11 +170,14 @@ enum ipoib_cm_state {
 };
 
 struct ipoib_cm_rx {
-	struct ib_cm_id     *id;
-	struct ib_qp        *qp;
-	struct list_head     list;
-	struct net_device   *dev;
-	unsigned long        jiffies;
+	struct ib_cm_id     	*id;
+	struct ib_qp        	*qp;
+	struct ipoib_cm_rx_buf  *rx_ring; /* Used by NOSRQ only */
+	struct list_head     	 list;
+	struct net_device   	*dev;
+	unsigned long        	 jiffies;
+	u32                      index; /* wr_ids are distinguished by index
+					 * to identify the QP -NOSRQ only */
 	enum ipoib_cm_state  state;
 };
 
@@ -215,6 +222,8 @@ struct ipoib_cm_dev_priv {
 	struct ib_wc            ibwc[IPOIB_NUM_WC];
 	struct ib_sge           rx_sge[IPOIB_CM_RX_SG];
 	struct ib_recv_wr       rx_wr;
+	struct ipoib_cm_rx	**rx_index_table; /* See ipoib_cm_dev_init()
+						   *for usage of this element */
 };
 
 /*
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_cm.c	2007-07-10 17:02:33.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_cm.c	2007-07-19 13:55:59.000000000 -0400
@@ -49,6 +49,17 @@ MODULE_PARM_DESC(cm_data_debug_level,
 
 #include "ipoib.h"
 
+static int max_rc_qp = NOSRQ_INDEX_TABLE_SIZE;
+static int max_recv_buf = 1024; /* Default is 1024 MB */
+
+module_param_named(nosrq_max_rc_qp, max_rc_qp, int, 0644);
+MODULE_PARM_DESC(nosrq_max_rc_qp, "Max number of NOSRQ RC QPs supported");
+
+module_param_named(max_receive_buffer, max_recv_buf, int, 0644);
+MODULE_PARM_DESC(max_receive_buffer, "Max Receive Buffer Size in MB");
+
+static atomic_t current_rc_qp = ATOMIC_INIT(0); /* Active number of RC QPs for NOSRQ */
+
 #define IPOIB_CM_IETF_ID 0x1000000000000000ULL
 
 #define IPOIB_CM_RX_UPDATE_TIME (256 * HZ)
@@ -81,20 +92,21 @@ static void ipoib_cm_dma_unmap_rx(struct
 		ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
 }
 
-static int ipoib_cm_post_receive(struct net_device *dev, int id)
+static int post_receive_srq(struct net_device *dev, u64 id)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
 	struct ib_recv_wr *bad_wr;
 	int i, ret;
 
-	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_SRQ;
+	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
 
 	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
 		priv->cm.rx_sge[i].addr = priv->cm.srq_ring[id].mapping[i];
 
 	ret = ib_post_srq_recv(priv->cm.srq, &priv->cm.rx_wr, &bad_wr);
 	if (unlikely(ret)) {
-		ipoib_warn(priv, "post srq failed for buf %d (%d)\n", id, ret);
+		ipoib_warn(priv, "post srq failed for buf %lld (%d)\n",
+			   (unsigned long long)id, ret);
 		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
 				      priv->cm.srq_ring[id].mapping);
 		dev_kfree_skb_any(priv->cm.srq_ring[id].skb);
@@ -104,12 +116,47 @@ static int ipoib_cm_post_receive(struct 
 	return ret;
 }
 
-static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
+static int post_receive_nosrq(struct net_device *dev, u64 id)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct ib_recv_wr *bad_wr;
+	int i, ret;
+	u32 index;
+	u32 wr_id;
+	struct ipoib_cm_rx *rx_ptr;
+
+	index = id  & NOSRQ_INDEX_MASK ;
+	wr_id = id >> 32;
+
+	rx_ptr = priv->cm.rx_index_table[index];
+
+	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
+
+	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
+		priv->cm.rx_sge[i].addr = rx_ptr->rx_ring[wr_id].mapping[i];
+
+	ret = ib_post_recv(rx_ptr->qp, &priv->cm.rx_wr, &bad_wr);
+	if (unlikely(ret)) {
+		ipoib_warn(priv, "post recv failed for buf %d (%d)\n",
+			   wr_id, ret);
+		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
+				      rx_ptr->rx_ring[wr_id].mapping);
+		dev_kfree_skb_any(rx_ptr->rx_ring[wr_id].skb);
+		rx_ptr->rx_ring[wr_id].skb = NULL;
+	}
+
+	return ret;
+}
+
+static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, u64 id,
+					     int frags,
 					     u64 mapping[IPOIB_CM_RX_SG])
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
 	struct sk_buff *skb;
 	int i;
+	struct ipoib_cm_rx *rx_ptr;
+	u32 index, wr_id;
 
 	skb = dev_alloc_skb(IPOIB_CM_HEAD_SIZE + 12);
 	if (unlikely(!skb))
@@ -141,7 +188,14 @@ static struct sk_buff *ipoib_cm_alloc_rx
 			goto partial_error;
 	}
 
-	priv->cm.srq_ring[id].skb = skb;
+	if (priv->cm.srq)
+		priv->cm.srq_ring[id].skb = skb;
+	else {
+		index = id  & NOSRQ_INDEX_MASK ;
+		wr_id = id >> 32;
+		rx_ptr = priv->cm.rx_index_table[index];
+		rx_ptr->rx_ring[wr_id].skb = skb;
+	}
 	return skb;
 
 partial_error:
@@ -198,16 +252,21 @@ static struct ib_qp *ipoib_cm_create_rx_
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
 	struct ib_qp_init_attr attr = {
-		.event_handler = ipoib_cm_rx_event_handler,
 		.send_cq = priv->cq, /* For drain WR */
 		.recv_cq = priv->cq,
 		.srq = priv->cm.srq,
 		.cap.max_send_wr = 1, /* For drain WR */
+		.cap.max_recv_wr = ipoib_recvq_size + 1,
 		.cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
 		.sq_sig_type = IB_SIGNAL_ALL_WR,
 		.qp_type = IB_QPT_RC,
 		.qp_context = p,
 	};
+	if (!priv->cm.srq) {
+		attr.cap.max_recv_sge = IPOIB_CM_RX_SG;
+		attr.event_handler = NULL;
+	} else
+		attr.event_handler = ipoib_cm_rx_event_handler;
 	return ib_create_qp(priv->pd, &attr);
 }
 
@@ -282,12 +341,129 @@ static int ipoib_cm_send_rep(struct net_
 	rep.flow_control = 0;
 	rep.rnr_retry_count = req->rnr_retry_count;
 	rep.target_ack_delay = 20; /* FIXME */
-	rep.srq = 1;
 	rep.qp_num = qp->qp_num;
 	rep.starting_psn = psn;
+	rep.srq	= !!priv->cm.srq;
 	return ib_send_cm_rep(cm_id, &rep);
 }
 
+static void init_context_and_add_list(struct ib_cm_id *cm_id,
+				    struct ipoib_cm_rx *p,
+				    struct ipoib_dev_priv *priv)
+{
+	cm_id->context = p;
+	p->jiffies = jiffies;
+	spin_lock_irq(&priv->lock);
+	if (list_empty(&priv->cm.passive_ids))
+		queue_delayed_work(ipoib_workqueue,
+				   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
+	if (priv->cm.srq) {
+		/* Add this entry to passive ids list head, but do not re-add
+		 * it if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush
+		 * list.
+		 */
+		if (p->state == IPOIB_CM_RX_LIVE)
+			list_move(&p->list, &priv->cm.passive_ids);
+	}
+	spin_unlock_irq(&priv->lock);
+}
+
+static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
+					struct ipoib_cm_rx *p, unsigned psn)
+{
+	struct net_device *dev = cm_id->context;
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	int ret;
+	u32 qp_num, index;
+	u64 i, recv_mem_used;
+
+	qp_num = p->qp->qp_num;
+
+	/* In the SRQ case there is a common rx buffer called the srq_ring.
+	 * However, for the NOSRQ we create an rx_ring for every
+	 * struct ipoib_cm_rx.
+	 */
+	p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
+	if (!p->rx_ring) {
+		printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
+		       qp_num);
+		return -ENOMEM;
+	}
+
+	spin_lock_irq(&priv->lock);
+	list_add(&p->list, &priv->cm.passive_ids);
+	spin_unlock_irq(&priv->lock);
+
+	init_context_and_add_list(cm_id, p, priv);
+	spin_lock_irq(&priv->lock);
+
+	for (index = 0; index < max_rc_qp; index++)
+		if (priv->cm.rx_index_table[index] == NULL)
+			break;
+
+	recv_mem_used = (u64)ipoib_recvq_size *
+			(u64)atomic_inc_return(&current_rc_qp) * CM_PACKET_SIZE;
+	if ((index == max_rc_qp) ||
+	    (recv_mem_used >= max_recv_buf * (1ul << 20))) {
+		spin_unlock_irq(&priv->lock);
+		ipoib_warn(priv, "NOSRQ has reached the configurable limit "
+			   "of either %d RC QPs or, max recv buf size of "
+			   "0x%x MB\n", max_rc_qp, max_recv_buf);
+
+		/* We send a REJ to the remote side indicating that we
+		 * have no more free RC QPs and leave it to the remote side
+		 * to take appropriate action. This should leave the
+		 * current set of QPs unaffected and any subsequent REQs
+		 * will be able to use RC QPs if they are available.
+		 */
+		ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
+		ret = -EINVAL;
+		goto err_alloc_and_post;
+	}
+
+	priv->cm.rx_index_table[index] = p;
+	spin_unlock_irq(&priv->lock);
+
+	/* We will subsequently use this stored pointer while freeing
+	 * resources in stale task
+	 */
+	p->index = index;
+
+	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+	if (ret) {
+		ipoib_warn(priv, "ipoib_cm_modify_rx_qp() failed %d\n", ret);
+		ipoib_cm_dev_cleanup(dev);
+		goto err_alloc_and_post;
+	}
+
+	for (i = 0; i < ipoib_recvq_size; ++i) {
+		if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
+					   IPOIB_CM_RX_SG - 1,
+					   p->rx_ring[i].mapping)) {
+			ipoib_warn(priv, "failed to allocate receive "
+				   "buffer %d\n", (int)i);
+			ipoib_cm_dev_cleanup(dev);
+			ret = -ENOMEM;
+			goto err_alloc_and_post;
+		}
+
+		if (post_receive_nosrq(dev, i << 32 | index)) {
+			ipoib_warn(priv, "post_receive_nosrq "
+				   "failed for  buf %lld\n", (unsigned long long)i);
+			ipoib_cm_dev_cleanup(dev);
+			ret = -EIO;
+			goto err_alloc_and_post;
+		}
+	}
+
+	return 0;
+
+err_alloc_and_post:
+	atomic_dec(&current_rc_qp);
+	kfree(p->rx_ring);
+	return ret;
+}
+
 static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
 {
 	struct net_device *dev = cm_id->context;
@@ -302,9 +478,6 @@ static int ipoib_cm_req_handler(struct i
 		return -ENOMEM;
 	p->dev = dev;
 	p->id = cm_id;
-	cm_id->context = p;
-	p->state = IPOIB_CM_RX_LIVE;
-	p->jiffies = jiffies;
 	INIT_LIST_HEAD(&p->list);
 
 	p->qp = ipoib_cm_create_rx_qp(dev, p);
@@ -314,19 +487,21 @@ static int ipoib_cm_req_handler(struct i
 	}
 
 	psn = random32() & 0xffffff;
-	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
-	if (ret)
-		goto err_modify;
+	if (!priv->cm.srq) {
+		ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn);
+		if (ret)
+			goto err_post_nosrq;
+	} else {
+		p->rx_ring = NULL;
+		ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+		if (ret)
+			goto err_modify;
+	}
 
-	spin_lock_irq(&priv->lock);
-	queue_delayed_work(ipoib_workqueue,
-			   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
-	/* Add this entry to passive ids list head, but do not re-add it
-	 * if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush list. */
-	p->jiffies = jiffies;
-	if (p->state == IPOIB_CM_RX_LIVE)
-		list_move(&p->list, &priv->cm.passive_ids);
-	spin_unlock_irq(&priv->lock);
+	if (priv->cm.srq) {
+		p->state = IPOIB_CM_RX_LIVE;
+		init_context_and_add_list(cm_id, p, priv);
+	}
 
 	ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
 	if (ret) {
@@ -336,6 +511,8 @@ static int ipoib_cm_req_handler(struct i
 	}
 	return 0;
 
+err_post_nosrq:
+	list_del_init(&p->list);
 err_modify:
 	ib_destroy_qp(p->qp);
 err_qp:
@@ -399,29 +576,60 @@ static void skb_put_frags(struct sk_buff
 	}
 }
 
-void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+static void timer_check_srq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+	unsigned long flags;
+
+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+		spin_lock_irqsave(&priv->lock, flags);
+		p->jiffies = jiffies;
+		/* Move this entry to list head, but do
+		 * not re-add it if it has been removed.
+		 */
+		if (p->state == IPOIB_CM_RX_LIVE)
+			list_move(&p->list, &priv->cm.passive_ids);
+		spin_unlock_irqrestore(&priv->lock, flags);
+	}
+}
+
+static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+	unsigned long flags;
+
+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+		spin_lock_irqsave(&priv->lock, flags);
+		p->jiffies = jiffies;
+		/* Move this entry to list head, but do
+		 * not re-add it if it has been removed. */
+		if (!list_empty(&p->list))
+			list_move(&p->list, &priv->cm.passive_ids);
+		spin_unlock_irqrestore(&priv->lock, flags);
+	}
+}
+
+void handle_rx_wc_srq(struct net_device *dev, struct ib_wc *wc)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	unsigned int wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
+	u64 wr_id = wc->wr_id & ~IPOIB_CM_OP_RECV;
 	struct sk_buff *skb, *newskb;
 	struct ipoib_cm_rx *p;
 	unsigned long flags;
 	u64 mapping[IPOIB_CM_RX_SG];
-	int frags;
+	int frags, ret;
 
-	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
-		       wr_id, wc->status);
+	ipoib_dbg_data(priv, "cm recv completion: id %lld, status: %d\n",
+		       (unsigned long long)wr_id, wc->status);
 
 	if (unlikely(wr_id >= ipoib_recvq_size)) {
-		if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_SRQ)) {
+		if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_RECV)) {
 			spin_lock_irqsave(&priv->lock, flags);
 			list_splice_init(&priv->cm.rx_drain_list, &priv->cm.rx_reap_list);
 			ipoib_cm_start_rx_drain(priv);
 			queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
 			spin_unlock_irqrestore(&priv->lock, flags);
 		} else
-			ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
-				   wr_id, ipoib_recvq_size);
+			ipoib_warn(priv, "cm recv completion event with wrid %lld (> %d)\n",
+				   (unsigned long long)wr_id, ipoib_recvq_size);
 		return;
 	}
 
@@ -429,23 +637,15 @@ void ipoib_cm_handle_rx_wc(struct net_de
 
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		ipoib_dbg(priv, "cm recv error "
-			   "(status=%d, wrid=%d vend_err %x)\n",
-			   wc->status, wr_id, wc->vendor_err);
+			   "(status=%d, wrid=%lld vend_err %x)\n",
+			   wc->status, (unsigned long long)wr_id, wc->vendor_err);
 		++priv->stats.rx_dropped;
-		goto repost;
+		goto repost_srq;
 	}
 
 	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
 		p = wc->qp->qp_context;
-		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
-			spin_lock_irqsave(&priv->lock, flags);
-			p->jiffies = jiffies;
-			/* Move this entry to list head, but do not re-add it
-			 * if it has been moved out of list. */
-			if (p->state == IPOIB_CM_RX_LIVE)
-				list_move(&p->list, &priv->cm.passive_ids);
-			spin_unlock_irqrestore(&priv->lock, flags);
-		}
+		timer_check_srq(priv, p);
 	}
 
 	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
@@ -457,13 +657,113 @@ void ipoib_cm_handle_rx_wc(struct net_de
 		 * If we can't allocate a new RX buffer, dump
 		 * this packet and reuse the old buffer.
 		 */
-		ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
+		ipoib_dbg(priv, "failed to allocate receive buffer %lld\n",
+			  (unsigned long long)wr_id);
+		++priv->stats.rx_dropped;
+		goto repost_srq;
+	}
+
+	ipoib_cm_dma_unmap_rx(priv, frags,
+			      priv->cm.srq_ring[wr_id].mapping);
+	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping,
+	       (frags + 1) * sizeof *mapping);
+	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
+		       wc->byte_len, wc->slid);
+
+	skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
+
+	skb->protocol = ((struct ipoib_header *) skb->data)->proto;
+	skb_reset_mac_header(skb);
+	skb_pull(skb, IPOIB_ENCAP_LEN);
+
+	dev->last_rx = jiffies;
+	++priv->stats.rx_packets;
+	priv->stats.rx_bytes += skb->len;
+
+	skb->dev = dev;
+	/* XXX get correct PACKET_ type here */
+	skb->pkt_type = PACKET_HOST;
+	netif_receive_skb(skb);
+
+repost_srq:
+	ret = post_receive_srq(dev, wr_id);
+
+	if (unlikely(ret))
+		ipoib_warn(priv, "post_receive_srq failed for buf %lld\n",
+			   (unsigned long long)wr_id);
+
+}
+
+static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct sk_buff *skb, *newskb;
+	u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
+	u32 index;
+	struct ipoib_cm_rx *rx_ptr;
+	int frags, ret;
+
+
+	ipoib_dbg_data(priv, "cm recv completion: id %lld, status: %d\n",
+		       (unsigned long long)wr_id, wc->status);
+
+	if (unlikely(wr_id >= ipoib_recvq_size)) {
+		ipoib_warn(priv, "cm recv completion event with wrid %lld (> %d)\n",
+				   (unsigned long long)wr_id, ipoib_recvq_size);
+		return;
+	}
+
+	index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
+
+	/* This is the only place where rx_ptr could be a NULL - could
+	 * have just received a packet from a connection that has become
+	 * stale and so is going away. We will simply drop the packet and
+	 * let the hardware (it s IB_QPT_RC) handle the dropped packet.
+	 * In the timer_check() function below, p->jiffies is updated and
+	 * hence the connection will not be stale after that.
+	 */
+	rx_ptr = priv->cm.rx_index_table[index];
+	if (unlikely(!rx_ptr)) {
+		ipoib_warn(priv, "Received packet from a connection "
+			   "that is going away. Hardware will handle it.\n");
+		return;
+	}
+
+	skb = rx_ptr->rx_ring[wr_id].skb;
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		ipoib_dbg(priv, "cm recv error "
+			   "(status=%d, wrid=%lld vend_err %x)\n",
+			   wc->status, (unsigned long long)wr_id, wc->vendor_err);
+		++priv->stats.rx_dropped;
+		goto repost_nosrq;
+	}
+
+	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
+		/* There are no guarantees that wc->qp is not NULL for HCAs
+		 * that do not support SRQ. */
+		timer_check_nosrq(priv, rx_ptr);
+	}
+
+	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
+					      (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
+
+	newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
+				       mapping);
+	if (unlikely(!newskb)) {
+		/*
+		 * If we can't allocate a new RX buffer, dump
+		 * this packet and reuse the old buffer.
+		 */
+		ipoib_dbg(priv, "failed to allocate receive buffer %lld\n",
+			  (unsigned long long)wr_id);
 		++priv->stats.rx_dropped;
-		goto repost;
+		goto repost_nosrq;
 	}
 
-	ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
-	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
+	ipoib_cm_dma_unmap_rx(priv, frags, rx_ptr->rx_ring[wr_id].mapping);
+	memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
+	       (frags + 1) * sizeof *mapping);
 
 	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
 		       wc->byte_len, wc->slid);
@@ -483,10 +783,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
 	skb->pkt_type = PACKET_HOST;
 	netif_receive_skb(skb);
 
-repost:
-	if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
-		ipoib_warn(priv, "ipoib_cm_post_receive failed "
-			   "for buf %d\n", wr_id);
+repost_nosrq:
+	ret = post_receive_nosrq(dev, wr_id << 32 | index);
+
+	if (unlikely(ret))
+		ipoib_warn(priv, "post_receive_nosrq failed for buf %lld\n",
+			   (unsigned long long)wr_id);
+}
+
+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+
+	if (priv->cm.srq)
+		handle_rx_wc_srq(dev, wc);
+	else
+		handle_rx_wc_nosrq(dev, wc);
 }
 
 static inline int post_send(struct ipoib_dev_priv *priv,
@@ -678,6 +990,42 @@ err_cm:
 	return ret;
 }
 
+static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+	int i;
+
+	for (i = 0; i < ipoib_recvq_size; ++i)
+		if (p->rx_ring[i].skb) {
+			ipoib_cm_dma_unmap_rx(priv,
+					 IPOIB_CM_RX_SG - 1,
+					 p->rx_ring[i].mapping);
+			dev_kfree_skb_any(p->rx_ring[i].skb);
+			p->rx_ring[i].skb = NULL;
+		}
+	kfree(p->rx_ring);
+}
+
+void dev_stop_nosrq(struct ipoib_dev_priv *priv)
+{
+	struct ipoib_cm_rx *p;
+
+	spin_lock_irq(&priv->lock);
+	while (!list_empty(&priv->cm.passive_ids)) {
+		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
+		free_resources_nosrq(priv, p);
+		list_del(&p->list);
+		spin_unlock_irq(&priv->lock);
+		ib_destroy_cm_id(p->id);
+		ib_destroy_qp(p->qp);
+		atomic_dec(&current_rc_qp);
+		kfree(p);
+		spin_lock_irq(&priv->lock);
+	}
+	spin_unlock_irq(&priv->lock);
+
+	cancel_delayed_work(&priv->cm.stale_task);
+}
+
 void ipoib_cm_dev_stop(struct net_device *dev)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
@@ -692,6 +1040,11 @@ void ipoib_cm_dev_stop(struct net_device
 	ib_destroy_cm_id(priv->cm.id);
 	priv->cm.id = NULL;
 
+	if (!priv->cm.srq) {
+		dev_stop_nosrq(priv);
+		return;
+	}
+
 	spin_lock_irq(&priv->lock);
 	while (!list_empty(&priv->cm.passive_ids)) {
 		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
@@ -815,7 +1168,9 @@ static struct ib_qp *ipoib_cm_create_tx_
 	attr.recv_cq = priv->cq;
 	attr.srq = priv->cm.srq;
 	attr.cap.max_send_wr = ipoib_sendq_size;
+	attr.cap.max_recv_wr = 1;
 	attr.cap.max_send_sge = 1;
+	attr.cap.max_recv_sge = 1;
 	attr.sq_sig_type = IB_SIGNAL_ALL_WR;
 	attr.qp_type = IB_QPT_RC;
 	attr.send_cq = cq;
@@ -855,7 +1210,7 @@ static int ipoib_cm_send_req(struct net_
 	req.retry_count 	      = 0; /* RFC draft warns against retries */
 	req.rnr_retry_count 	      = 0; /* RFC draft warns against retries */
 	req.max_cm_retries 	      = 15;
-	req.srq 	              = 1;
+	req.srq			      = !!priv->cm.srq;
 	return ib_send_cm_req(id, &req);
 }
 
@@ -1200,6 +1555,8 @@ static void ipoib_cm_rx_reap(struct work
 	list_for_each_entry_safe(p, n, &list, list) {
 		ib_destroy_cm_id(p->id);
 		ib_destroy_qp(p->qp);
+		if (!priv->cm.srq)
+			atomic_dec(&current_rc_qp);
 		kfree(p);
 	}
 }
@@ -1218,12 +1575,19 @@ static void ipoib_cm_stale_task(struct w
 		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
 		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
 			break;
-		list_move(&p->list, &priv->cm.rx_error_list);
-		p->state = IPOIB_CM_RX_ERROR;
-		spin_unlock_irq(&priv->lock);
-		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
-		if (ret)
-			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+		if (!priv->cm.srq) {
+			free_resources_nosrq(priv, p);
+			list_del_init(&p->list);
+			priv->cm.rx_index_table[p->index] = NULL;
+			spin_unlock_irq(&priv->lock);
+		} else {
+			list_move(&p->list, &priv->cm.rx_error_list);
+			p->state = IPOIB_CM_RX_ERROR;
+			spin_unlock_irq(&priv->lock);
+			ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+			if (ret)
+				ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+		}
 		spin_lock_irq(&priv->lock);
 	}
 
@@ -1277,16 +1641,40 @@ int ipoib_cm_add_mode_attr(struct net_de
 	return device_create_file(&dev->dev, &dev_attr_mode);
 }
 
+static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
+{
+	struct ib_srq_init_attr srq_init_attr;
+	int ret;
+
+	srq_init_attr.attr.max_wr = ipoib_recvq_size;
+	srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
+
+	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
+	if (IS_ERR(priv->cm.srq)) {
+		ret = PTR_ERR(priv->cm.srq);
+		priv->cm.srq = NULL;
+		return ret;
+	}
+
+	priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
+				    sizeof *priv->cm.srq_ring,
+				    GFP_KERNEL);
+	if (!priv->cm.srq_ring) {
+		printk(KERN_WARNING "%s: failed to allocate CM ring "
+		       "(%d entries)\n",
+			priv->ca->name, ipoib_recvq_size);
+		ipoib_cm_dev_cleanup(dev);
+		return -ENOMEM;
+	}
+
+	return 0;
+}
+
 int ipoib_cm_dev_init(struct net_device *dev)
 {
 	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	struct ib_srq_init_attr srq_init_attr = {
-		.attr = {
-			.max_wr  = ipoib_recvq_size,
-			.max_sge = IPOIB_CM_RX_SG
-		}
-	};
 	int ret, i;
+	struct ib_device_attr attr;
 
 	INIT_LIST_HEAD(&priv->cm.passive_ids);
 	INIT_LIST_HEAD(&priv->cm.reap_list);
@@ -1303,20 +1691,32 @@ int ipoib_cm_dev_init(struct net_device 
 
 	skb_queue_head_init(&priv->cm.skb_queue);
 
-	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
-	if (IS_ERR(priv->cm.srq)) {
-		ret = PTR_ERR(priv->cm.srq);
-		priv->cm.srq = NULL;
+	ret = ib_query_device(priv->ca, &attr);
+	if (ret)
 		return ret;
-	}
 
-	priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
-				    GFP_KERNEL);
-	if (!priv->cm.srq_ring) {
-		printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
-		       priv->ca->name, ipoib_recvq_size);
-		ipoib_cm_dev_cleanup(dev);
-		return -ENOMEM;
+	if (attr.max_srq) {
+		/* This device supports SRQ */
+		ret = create_srq(dev, priv);
+		if (ret)
+			return ret;
+		priv->cm.rx_index_table = NULL;
+	} else {
+		priv->cm.srq = NULL;
+		priv->cm.srq_ring = NULL;
+
+		/* Every new REQ that arrives creates a struct ipoib_cm_rx.
+		 * These structures form a link list starting with the
+		 * passive_ids. For quick and easy access we maintain a table
+		 * of pointers to struct ipoib_cm_rx called the rx_index_table
+		 */
+		priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
+					 sizeof *priv->cm.rx_index_table,
+					 GFP_KERNEL);
+		if (!priv->cm.rx_index_table) {
+			printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
+			return -ENOMEM;
+		}
 	}
 
 	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
@@ -1329,17 +1729,24 @@ int ipoib_cm_dev_init(struct net_device 
 	priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
 	priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
 
-	for (i = 0; i < ipoib_recvq_size; ++i) {
-		if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
+	/* One can post receive buffers even before the RX QP is created
+	 * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
+	 * and do that in ipoib_cm_req_handler()
+	 */
+
+	if (priv->cm.srq) {
+		for (i = 0; i < ipoib_recvq_size; ++i) {
+			if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
 					   priv->cm.srq_ring[i].mapping)) {
-			ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
-			ipoib_cm_dev_cleanup(dev);
-			return -ENOMEM;
-		}
-		if (ipoib_cm_post_receive(dev, i)) {
-			ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
-			ipoib_cm_dev_cleanup(dev);
-			return -EIO;
+				ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
+				ipoib_cm_dev_cleanup(dev);
+				return -ENOMEM;
+			}
+			if (post_receive_srq(dev, i)) {
+				ipoib_warn(priv, "post_receive_srq failed for buf %d\n", i);
+				ipoib_cm_dev_cleanup(dev);
+				return -EIO;
+			}
 		}
 	}
 
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_ib.c	2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_ib.c	2007-07-10 18:30:10.000000000 -0400
@@ -299,7 +299,7 @@ int ipoib_poll(struct net_device *dev, i
 		for (i = 0; i < n; ++i) {
 			struct ib_wc *wc = priv->ibwc + i;
 
-			if (wc->wr_id & IPOIB_CM_OP_SRQ) {
+			if (wc->wr_id & IPOIB_CM_OP_RECV) {
 				++done;
 				--max;
 				ipoib_cm_handle_rx_wc(dev, wc);
@@ -557,7 +557,7 @@ void ipoib_drain_cq(struct net_device *d
 	do {
 		n = ib_poll_cq(priv->cq, IPOIB_NUM_WC, priv->ibwc);
 		for (i = 0; i < n; ++i) {
-			if (priv->ibwc[i].wr_id & IPOIB_CM_OP_SRQ)
+			if (priv->ibwc[i].wr_id & IPOIB_CM_OP_RECV)
 				ipoib_cm_handle_rx_wc(dev, priv->ibwc + i);
 			else if (priv->ibwc[i].wr_id & IPOIB_OP_RECV)
 				ipoib_ib_handle_rx_wc(dev, priv->ibwc + i);
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_verbs.c	2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_verbs.c	2007-07-19 02:55:24.000000000 -0400
@@ -175,6 +175,18 @@ int ipoib_transport_dev_init(struct net_
 	if (!ret)
 		size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
 
+#ifdef CONFIG_INFINIBAND_IPOIB_CM
+
+	/* We increase the size of the CQ in the NOSRQ case to prevent CQ
+	 * overflow. Every new REQ creates a new RX QP and each QP has an
+	 * RX ring associated with it. Therefore we could have
+	 * NOSRQ_INDEX_TABLE_SIZE*ipoib_recvq_size + ipoib_sendq_size CQEs
+	 * in a CQ.
+	 */
+	if (!priv->cm.srq)
+		size += (NOSRQ_INDEX_TABLE_SIZE - 1) * ipoib_recvq_size;
+#endif
+
 	priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
 	if (IS_ERR(priv->cq)) {
 		printk(KERN_WARNING "%s: failed to create CQ\n", ca->name);





More information about the general mailing list