[ofa-general] IPOIB CM (NOSRQ)[PATCH V4] patch for review

Pradeep Satyanarayana pradeeps at linux.vnet.ibm.com
Wed May 9 08:32:43 PDT 2007


Here is a fourth version of the IPOIB_CM_NOSRQ patch for review. This 
patch will benefit adapters that do not support shared receive queues.

This patch incorporates the following review comments from v3:
1. Style cleanups suggested by Roland Dreier and Michael Tsirkin
2. Fixed a couple of leaks in the error path (thanks to Roland Dreier 
for pointing them out).
3. Eliminated spin lock in data path (as suggested by Michael Tsirkin)
4. Changes to avoid CQ overflow (issue pointed out by Michael Tsirkin; 
see the CQ sizing sketch after this list)
5. Send REJ when no RC QPs remain (credit Michael Tsirkin for the idea)
6. Reset retry_count to 0 in ipoib_cm_send_req()
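
For item 4, the CQ sizing in the NOSRQ case works out as follows: every
new REQ creates its own RC QP with a private rx_ring of ipoib_recvq_size
WRs, and all of them complete into the single CQ. A rough sketch of the
sizing done in ipoib_transport_dev_init() (the example numbers assume the
default module parameters):

	/* up to NOSRQ_INDEX_TABLE_SIZE RX rings plus the send ring */
	size = ipoib_sendq_size + NOSRQ_INDEX_TABLE_SIZE * ipoib_recvq_size;
	/* e.g. with ipoib_recvq_size = 128: 1024 * 128 = 131072 recv CQEs */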

This patch has been tested with linux-2.6.21-rc7 (derived from Roland's 
for-2.6.22 git tree as of 05/07/2007) with Topspin and IBM HCAs on ppc64 
machines. I have run netperf between two IBM HCAs as well as between an 
IBM and a Topspin HCA.

Note 1: For interoperability, retry_count in ipoib_cm_send_req() may need 
to be changed to a non-zero value (3 has worked for me). This is a 
temporary workaround until the HCA and/or CM bug is fixed so that the 
HCA local ACK delay is properly taken into account.
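
Concretely, the workaround is a one-line change in ipoib_cm_send_req()
(a sketch only; 3 is simply the value that has worked for me, not a
recommended setting):

	req.retry_count = 3;	/* temporary interop workaround; 0 in the patch below */

Once the local ACK delay handling is fixed, this should go back to 0 as
submitted below.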

Note 2: I ran into problems trying to build Roland's git tree (on ppc64) 
that I downloaded on 05/07/2007. Hence I used just the infiniband/ 
subdirectory and had to change the code to use skb->mac.raw = skb->data 
instead of skb_reset_mac_header(skb), since I did not want to submit an 
untested patch. This can be fixed with a subsequent patch once I get the 
tree to build.
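
For reference, the two calls are equivalent where skb->mac.raw still
exists: skb_reset_mac_header() simply points the MAC header at skb->data.
The affected lines in the receive path look like this (a sketch of what
appears in the patch below):

	skb->protocol = ((struct ipoib_header *) skb->data)->proto;
	skb->mac.raw = skb->data;	/* used here; 2.6.21-rc7 base */
	/* skb_reset_mac_header(skb);	   equivalent on the for-2.6.22 tree */
	skb_pull(skb, IPOIB_ENCAP_LEN);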

Signed-off-by: Pradeep Satyanarayana
---

--- a/drivers/infiniband/ulp/ipoib/ipoib.h	2007-05-07 16:05:32.000000000 -0700
+++ b/drivers/infiniband/ulp/ipoib/ipoib.h	2007-05-07 17:42:14.000000000 -0700
@@ -97,9 +97,13 @@ enum {

  #define	IPOIB_OP_RECV   (1ul << 31)
  #ifdef CONFIG_INFINIBAND_IPOIB_CM
-#define	IPOIB_CM_OP_SRQ (1ul << 30)
+#define	IPOIB_CM_OP_RECV (1ul << 30)
+
+#define NOSRQ_INDEX_TABLE_SIZE 1024
+#define NOSRQ_INDEX_MASK      (NOSRQ_INDEX_TABLE_SIZE - 1)
+
  #else
-#define	IPOIB_CM_OP_SRQ (0)
+#define	IPOIB_CM_OP_RECV (0)
  #endif

  /* structs */
@@ -133,11 +137,14 @@ struct ipoib_cm_data {
  };

  struct ipoib_cm_rx {
-	struct ib_cm_id     *id;
-	struct ib_qp        *qp;
-	struct list_head     list;
-	struct net_device   *dev;
-	unsigned long        jiffies;
+	struct ib_cm_id     	*id;
+	struct ib_qp        	*qp;
+	struct ipoib_cm_rx_buf 	*rx_ring; /* Used by NOSRQ only */
+	struct list_head     	 list;
+	struct net_device   	*dev;
+	unsigned long        	 jiffies;
+	u32			 index; /* wr_ids are distinguished by index
+					 * to identify the QP -NOSRQ only */
  };

  struct ipoib_cm_tx {
@@ -176,6 +183,8 @@ struct ipoib_cm_dev_priv {
  	struct ib_wc            ibwc[IPOIB_NUM_WC];
  	struct ib_sge           rx_sge[IPOIB_CM_RX_SG];
  	struct ib_recv_wr       rx_wr;
+	struct ipoib_cm_rx	**rx_index_table; /* See ipoib_cm_dev_init()
+						   * for usage of this element */
  };

  /*
@@ -521,10 +530,9 @@ static inline void ipoib_cm_skb_too_long
  	dev_kfree_skb_any(skb);
  }

-static inline void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
  {
  }
-
  #endif

  #ifdef CONFIG_INFINIBAND_IPOIB_DEBUG
--- a/drivers/infiniband/ulp/ipoib/ipoib_cm.c	2007-05-07 22:19:52.000000000 -0700
+++ b/drivers/infiniband/ulp/ipoib/ipoib_cm.c	2007-05-08 18:07:15.000000000 -0700
@@ -76,20 +76,20 @@ static void ipoib_cm_dma_unmap_rx(struct
  		ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
  }

-static int ipoib_cm_post_receive(struct net_device *dev, int id)
+static int post_receive_srq(struct net_device *dev, u64 id)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
  	struct ib_recv_wr *bad_wr;
  	int i, ret;

-	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_SRQ;
+	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;

  	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
  		priv->cm.rx_sge[i].addr = priv->cm.srq_ring[id].mapping[i];

  	ret = ib_post_srq_recv(priv->cm.srq, &priv->cm.rx_wr, &bad_wr);
  	if (unlikely(ret)) {
-		ipoib_warn(priv, "post srq failed for buf %d (%d)\n", id, ret);
+		ipoib_warn(priv, "post srq failed for buf %ld (%d)\n", id, ret);
  		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
  				      priv->cm.srq_ring[id].mapping);
  		dev_kfree_skb_any(priv->cm.srq_ring[id].skb);
@@ -99,12 +99,60 @@ static int ipoib_cm_post_receive(struct
  	return ret;
  }

-static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
+static int post_receive_nosrq(struct net_device *dev, u64 id)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct ib_recv_wr *bad_wr;
+	int i, ret;
+	u32 index;
+	u32 wr_id;
+	struct ipoib_cm_rx *rx_ptr;
+
+	index = id & NOSRQ_INDEX_MASK;
+	wr_id = id >> 32;
+
+	rx_ptr = priv->cm.rx_index_table[index];
+
+	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
+
+	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
+		priv->cm.rx_sge[i].addr = rx_ptr->rx_ring[wr_id].mapping[i];
+
+	ret = ib_post_recv(rx_ptr->qp, &priv->cm.rx_wr, &bad_wr);
+	if (unlikely(ret)) {
+		ipoib_warn(priv, "post recv failed for buf %d (%d)\n",
+		           wr_id, ret);
+		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
+		                      rx_ptr->rx_ring[wr_id].mapping);
+		dev_kfree_skb_any(rx_ptr->rx_ring[wr_id].skb);
+		rx_ptr->rx_ring[wr_id].skb = NULL;
+	}
+
+	return ret;
+}
+
+static int ipoib_cm_post_receive(struct net_device *dev, u64 id)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	int ret;
+
+	if (priv->cm.srq)
+		ret = post_receive_srq(dev, id);
+	else
+		ret = post_receive_nosrq(dev, id);
+
+	return ret;
+}
+
+static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, u64 id,
+					     int frags,
  					     u64 mapping[IPOIB_CM_RX_SG])
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
  	struct sk_buff *skb;
  	int i;
+	struct ipoib_cm_rx *rx_ptr;
+	u32 index, wr_id;

  	skb = dev_alloc_skb(IPOIB_CM_HEAD_SIZE + 12);
  	if (unlikely(!skb))
@@ -136,7 +184,14 @@ static struct sk_buff *ipoib_cm_alloc_rx
  			goto partial_error;
  	}

-	priv->cm.srq_ring[id].skb = skb;
+	if (priv->cm.srq)
+		priv->cm.srq_ring[id].skb = skb;
+	else {
+		index = id & NOSRQ_INDEX_MASK;
+		wr_id = id >> 32;
+		rx_ptr = priv->cm.rx_index_table[index];
+		rx_ptr->rx_ring[wr_id].skb = skb;
+	}
  	return skb;

  partial_error:
@@ -159,11 +214,14 @@ static struct ib_qp *ipoib_cm_create_rx_
  		.recv_cq = priv->cq,
  		.srq = priv->cm.srq,
  		.cap.max_send_wr = 1, /* FIXME: 0 Seems not to work */
+		.cap.max_recv_wr = ipoib_recvq_size + 1,
  		.cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
  		.sq_sig_type = IB_SIGNAL_ALL_WR,
  		.qp_type = IB_QPT_RC,
  		.qp_context = p,
  	};
+	if (!priv->cm.srq)
+		attr.cap.max_recv_sge = IPOIB_CM_RX_SG;
  	return ib_create_qp(priv->pd, &attr);
  }

@@ -217,12 +275,103 @@ static int ipoib_cm_send_rep(struct net_
  	rep.flow_control = 0;
  	rep.rnr_retry_count = req->rnr_retry_count;
  	rep.target_ack_delay = 20; /* FIXME */
-	rep.srq = 1;
  	rep.qp_num = qp->qp_num;
  	rep.starting_psn = psn;
+	rep.srq	= !!priv->cm.srq;
  	return ib_send_cm_rep(cm_id, &rep);
  }

+static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
+				        struct ipoib_cm_rx *p, unsigned psn)
+{
+	struct net_device *dev = cm_id->context;
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	int ret;
+	u32 qp_num, index;
+	u64 i;
+
+	qp_num = p->qp->qp_num;
+
+	/* In the SRQ case there is a common rx buffer called the srq_ring.
+	 * However, for the NOSRQ we create an rx_ring for every
+	 * struct ipoib_cm_rx.
+	 */
+	p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
+	if (!p->rx_ring) {
+		printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
+		       qp_num);
+		return -ENOMEM;
+	}
+
+	cm_id->context = p;
+	p->jiffies = jiffies;
+	spin_lock_irq(&priv->lock);
+	list_add(&p->list, &priv->cm.passive_ids);
+
+	for (index = 0; index < NOSRQ_INDEX_TABLE_SIZE; index++)
+		if (priv->cm.rx_index_table[index] == NULL)
+			break;
+
+	if (index == NOSRQ_INDEX_TABLE_SIZE) {
+		spin_unlock_irq(&priv->lock);
+		ipoib_warn(priv, "NOSRQ supports a max of %d RC "
+		       "QPs. That limit has now been reached\n",
+		       NOSRQ_INDEX_TABLE_SIZE);
+
+		/* We send a REJ to the remote side indicating that we
+		 * have no more free RC QPs and leave it to the remote side
+		 * to take appropriate action. This should leave the
+		 * current set of QPs unaffected and any subsequent REQs
+		 * will be able to use RC QPs if they are available.
+		 */
+		ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
+		ret = -EINVAL;
+		goto err_send_rej;
+	}
+
+	priv->cm.rx_index_table[index] = p;
+	spin_unlock_irq(&priv->lock);
+
+	/* We will subsequently use this stored pointer while freeing
+	 * resources in stale task */
+	p->index = index;
+
+	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+	if (ret) {
+		ipoib_warn(priv, "ipoib_cm_modify_rx_qp() failed %d\n", ret);
+		ipoib_cm_dev_cleanup(dev);
+		goto err_modify_nosrq;
+	}
+
+	for (i = 0; i < ipoib_recvq_size; ++i) {
+		if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
+					   IPOIB_CM_RX_SG - 1,
+					   p->rx_ring[i].mapping)) {
+			ipoib_warn(priv, "failed to allocate receive "
+			           "buffer %ld\n", i);
+			ipoib_cm_dev_cleanup(dev);
+			ret = -ENOMEM;
+			goto err_alloc_and_post;
+		}
+
+		if (ipoib_cm_post_receive(dev, i << 32 | index)) {
+			ipoib_warn(priv, "ipoib_ib_post_receive "
+			           "failed for  buf %ld\n", i);
+			ipoib_cm_dev_cleanup(dev);
+			ret = -EIO;
+			goto err_alloc_and_post;
+		}
+	}
+
+	return 0;
+
+err_send_rej:
+err_modify_nosrq:
+err_alloc_and_post:
+	kfree(p->rx_ring);
+	return ret;
+}
+
  static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
  {
  	struct net_device *dev = cm_id->context;
@@ -233,8 +382,11 @@ static int ipoib_cm_req_handler(struct i

  	ipoib_dbg(priv, "REQ arrived\n");
  	p = kzalloc(sizeof *p, GFP_KERNEL);
-	if (!p)
+	if (!p) {
+		printk(KERN_WARNING "Failed to allocate RX control block when "
+		       "REQ arrived\n");
  		return -ENOMEM;
+	}
  	p->dev = dev;
  	p->id = cm_id;
  	p->qp = ipoib_cm_create_rx_qp(dev, p);
@@ -244,9 +396,15 @@ static int ipoib_cm_req_handler(struct i
  	}

  	psn = random32() & 0xffffff;
-	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
-	if (ret)
-		goto err_modify;
+	if (!priv->cm.srq) {
+		if ((ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn)))
+			goto err_post_nosrq;
+	} else {
+		p->rx_ring = NULL;
+		ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+		if (ret)
+			goto err_modify;
+	}

  	ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
  	if (ret) {
@@ -254,16 +412,19 @@ static int ipoib_cm_req_handler(struct i
  		goto err_rep;
  	}

-	cm_id->context = p;
-	p->jiffies = jiffies;
-	spin_lock_irq(&priv->lock);
-	list_add(&p->list, &priv->cm.passive_ids);
-	spin_unlock_irq(&priv->lock);
+	if (priv->cm.srq) {
+		cm_id->context = p;
+		p->jiffies = jiffies;
+		spin_lock_irq(&priv->lock);
+		list_add(&p->list, &priv->cm.passive_ids);
+		spin_unlock_irq(&priv->lock);
+	}
  	queue_delayed_work(ipoib_workqueue,
  			   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
  	return 0;

  err_rep:
+err_post_nosrq:
  err_modify:
  	ib_destroy_qp(p->qp);
  err_qp:
@@ -339,48 +500,53 @@ static void skb_put_frags(struct sk_buff
  	}
  }

-void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+static void timer_check(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+	unsigned long flags;
+
+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+		spin_lock_irqsave(&priv->lock, flags);
+		p->jiffies = jiffies;
+		/* Move this entry to list head, but do
+		 * not re-add it if it has been removed. */
+		if (!list_empty(&p->list))
+			list_move(&p->list, &priv->cm.passive_ids);
+		spin_unlock_irqrestore(&priv->lock, flags);
+		queue_delayed_work(ipoib_workqueue,
+				   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
+	}
+}
+
+static void handle_rx_wc_srq(struct net_device *dev, struct ib_wc *wc)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	unsigned int wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
  	struct sk_buff *skb, *newskb;
+	u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id & ~IPOIB_CM_OP_RECV;
  	struct ipoib_cm_rx *p;
-	unsigned long flags;
-	u64 mapping[IPOIB_CM_RX_SG];
-	int frags;
+	int frags, ret;

  	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
  		       wr_id, wc->status);

  	if (unlikely(wr_id >= ipoib_recvq_size)) {
-		ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
-			   wr_id, ipoib_recvq_size);
-		return;
+		ipoib_warn(priv, "cm recv completion event with wrid %ld "
+		           "(> %d)\n", wr_id, ipoib_recvq_size);
+		return;
  	}

  	skb  = priv->cm.srq_ring[wr_id].skb;

  	if (unlikely(wc->status != IB_WC_SUCCESS)) {
  		ipoib_dbg(priv, "cm recv error "
-			   "(status=%d, wrid=%d vend_err %x)\n",
+			   "(status=%d, wrid=%ld vend_err %x)\n",
  			   wc->status, wr_id, wc->vendor_err);
  		++priv->stats.rx_dropped;
-		goto repost;
+		goto repost_srq;
  	}

  	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
  		p = wc->qp->qp_context;
-		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
-			spin_lock_irqsave(&priv->lock, flags);
-			p->jiffies = jiffies;
-			/* Move this entry to list head, but do
-			 * not re-add it if it has been removed. */
-			if (!list_empty(&p->list))
-				list_move(&p->list, &priv->cm.passive_ids);
-			spin_unlock_irqrestore(&priv->lock, flags);
-			queue_delayed_work(ipoib_workqueue,
-					   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
-		}
+		timer_check(priv, p);
  	}

  	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
@@ -392,13 +558,113 @@ void ipoib_cm_handle_rx_wc(struct net_de
  		 * If we can't allocate a new RX buffer, dump
  		 * this packet and reuse the old buffer.
  		 */
-		ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
+		ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
+		++priv->stats.rx_dropped;
+		goto repost_srq;
+	}
+
+	ipoib_cm_dma_unmap_rx(priv, frags,
+	                      priv->cm.srq_ring[wr_id].mapping);
+	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping,
+	       (frags + 1) * sizeof *mapping);
+	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
+		       wc->byte_len, wc->slid);
+
+	skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
+
+	skb->protocol = ((struct ipoib_header *) skb->data)->proto;
+	skb->mac.raw = skb->data;
+	skb_pull(skb, IPOIB_ENCAP_LEN);
+
+	dev->last_rx = jiffies;
+	++priv->stats.rx_packets;
+	priv->stats.rx_bytes += skb->len;
+
+	skb->dev = dev;
+	/* XXX get correct PACKET_ type here */
+	skb->pkt_type = PACKET_HOST;
+	netif_rx_ni(skb);
+
+repost_srq:
+	ret = ipoib_cm_post_receive(dev, wr_id);
+
+	if (unlikely(ret))
+		ipoib_warn(priv, "ipoib_cm_post_receive failed for buf %ld\n",
+		           wr_id);
+
+}
+
+static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct sk_buff *skb, *newskb;
+	u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
+	u32 index;
+	struct ipoib_cm_rx *p, *rx_ptr;
+	int frags, ret;
+
+
+	ipoib_dbg_data(priv, "cm recv completion: id %ld, status: %d\n",
+		       wr_id, wc->status);
+
+	if (unlikely(wr_id >= ipoib_recvq_size)) {
+		ipoib_warn(priv, "cm recv completion event with wrid %ld "
+		           "(> %d)\n", wr_id, ipoib_recvq_size);
+		return;
+	}
+
+	index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK;
+
+	/* This is the only place where rx_ptr could be a NULL - could
+	 * have just received a packet from a connection that has become
+	 * stale and so is going away. We will simply drop the packet and
+	 * let the hardware (it is IB_QPT_RC) handle the dropped packet.
+	 * In the timer_check() function below, p->jiffies is updated and
+	 * hence the connection will not be stale after that.
+	 */
+	rx_ptr = priv->cm.rx_index_table[index];
+	if (unlikely(!rx_ptr)) {
+		ipoib_warn(priv, "Received packet from a connection "
+		           "that is going away. Hardware will handle it.\n");
+		return;
+	}
+
+	skb = rx_ptr->rx_ring[wr_id].skb;
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		ipoib_dbg(priv, "cm recv error "
+			   "(status=%d, wrid=%ld vend_err %x)\n",
+			   wc->status, wr_id, wc->vendor_err);
+		++priv->stats.rx_dropped;
+		goto repost_nosrq;
+	}
+
+	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
+		/* There are no guarantees that wc->qp is not NULL for HCAs
+		 * that do not support SRQ. */
+		p = rx_ptr;
+		timer_check(priv, p);
+	}
+
+	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
+					      (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
+
+	newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
+				       mapping);
+	if (unlikely(!newskb)) {
+		/*
+		 * If we can't allocate a new RX buffer, dump
+		 * this packet and reuse the old buffer.
+		 */
+		ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
  		++priv->stats.rx_dropped;
-		goto repost;
+		goto repost_nosrq;
  	}

-	ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
-	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
+	ipoib_cm_dma_unmap_rx(priv, frags,
+	                      rx_ptr->rx_ring[wr_id].mapping);
+	memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
+	       (frags + 1) * sizeof *mapping);

  	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
  		       wc->byte_len, wc->slid);
@@ -418,10 +684,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
  	skb->pkt_type = PACKET_HOST;
  	netif_receive_skb(skb);

-repost:
-	if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
-		ipoib_warn(priv, "ipoib_cm_post_receive failed "
-			   "for buf %d\n", wr_id);
+repost_nosrq:
+	ret = ipoib_cm_post_receive(dev, wr_id << 32 | index);
+
+	if (unlikely(ret))
+		ipoib_warn(priv, "ipoib_cm_post_receive failed for buf %ld\n",
+		           wr_id);
+}
+
+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+
+	if (priv->cm.srq)
+		handle_rx_wc_srq(dev, wc);
+	else
+		handle_rx_wc_nosrq(dev, wc);
  }

  static inline int post_send(struct ipoib_dev_priv *priv,
@@ -609,6 +887,22 @@ int ipoib_cm_dev_open(struct net_device
  	return 0;
  }

+static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+	int i;
+
+	for (i = 0; i < ipoib_recvq_size; ++i)
+		if (p->rx_ring[i].skb) {
+			ipoib_cm_dma_unmap_rx(priv,
+				         IPOIB_CM_RX_SG - 1,
+					 p->rx_ring[i].mapping);
+			dev_kfree_skb_any(p->rx_ring[i].skb);
+			p->rx_ring[i].skb = NULL;
+		}
+	kfree(p->rx_ring);
+}
+
+
  void ipoib_cm_dev_stop(struct net_device *dev)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
@@ -622,6 +916,8 @@ void ipoib_cm_dev_stop(struct net_device
  	spin_lock_irq(&priv->lock);
  	while (!list_empty(&priv->cm.passive_ids)) {
  		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
+		if (!priv->cm.srq)
+			free_resources_nosrq(priv, p);
  		list_del_init(&p->list);
  		spin_unlock_irq(&priv->lock);
  		ib_destroy_cm_id(p->id);
@@ -709,7 +1005,9 @@ static struct ib_qp *ipoib_cm_create_tx_
  	attr.recv_cq = priv->cq;
  	attr.srq = priv->cm.srq;
  	attr.cap.max_send_wr = ipoib_sendq_size;
+	attr.cap.max_recv_wr = 1;
  	attr.cap.max_send_sge = 1;
+	attr.cap.max_recv_sge = 1;
  	attr.sq_sig_type = IB_SIGNAL_ALL_WR;
  	attr.qp_type = IB_QPT_RC;
  	attr.send_cq = cq;
@@ -749,7 +1047,7 @@ static int ipoib_cm_send_req(struct net_
  	req.retry_count 	      = 0; /* RFC draft warns against retries */
  	req.rnr_retry_count 	      = 0; /* RFC draft warns against retries */
  	req.max_cm_retries 	      = 15;
-	req.srq 	              = 1;
+	req.srq			      = !!priv->cm.srq;
  	return ib_send_cm_req(id, &req);
  }

@@ -1085,6 +1383,7 @@ static void ipoib_cm_stale_task(struct w
  	struct ipoib_dev_priv *priv = container_of(work, struct ipoib_dev_priv,
  						   cm.stale_task.work);
  	struct ipoib_cm_rx *p;
+	struct ib_qp_attr qp_attr;

  	spin_lock_irq(&priv->lock);
  	while (!list_empty(&priv->cm.passive_ids)) {
@@ -1093,6 +1392,12 @@ static void ipoib_cm_stale_task(struct w
  		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
  		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
  			break;
+		if (!priv->cm.srq) {
+			free_resources_nosrq(priv, p);
+			priv->cm.rx_index_table[p->index] = NULL;
+			qp_attr.qp_state = IB_QPS_ERR;
+			ib_modify_qp(p->qp, &qp_attr, IB_QP_STATE);
+		}
  		list_del_init(&p->list);
  		spin_unlock_irq(&priv->lock);
  		ib_destroy_cm_id(p->id);
@@ -1147,16 +1452,40 @@ int ipoib_cm_add_mode_attr(struct net_de
  	return device_create_file(&dev->dev, &dev_attr_mode);
  }

+static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
+{
+	struct ib_srq_init_attr srq_init_attr = { 0 };
+	int ret;
+
+	srq_init_attr.attr.max_wr = ipoib_recvq_size;
+	srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
+
+	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
+	if (IS_ERR(priv->cm.srq)) {
+		ret = PTR_ERR(priv->cm.srq);
+		priv->cm.srq = NULL;
+		return ret;
+	}
+
+	priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
+		                    sizeof *priv->cm.srq_ring,
+			            GFP_KERNEL);
+	if (!priv->cm.srq_ring) {
+		printk(KERN_WARNING "%s: failed to allocate CM ring "
+		       "(%d entries)\n",
+	       	       priv->ca->name, ipoib_recvq_size);
+		ipoib_cm_dev_cleanup(dev);
+		return -ENOMEM;
+	}
+
+	return 0;
+}
+
  int ipoib_cm_dev_init(struct net_device *dev)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	struct ib_srq_init_attr srq_init_attr = {
-		.attr = {
-			.max_wr  = ipoib_recvq_size,
-			.max_sge = IPOIB_CM_RX_SG
-		}
-	};
  	int ret, i;
+	struct ib_device_attr attr;

  	INIT_LIST_HEAD(&priv->cm.passive_ids);
  	INIT_LIST_HEAD(&priv->cm.reap_list);
@@ -1168,20 +1497,30 @@ int ipoib_cm_dev_init(struct net_device

  	skb_queue_head_init(&priv->cm.skb_queue);

-	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
-	if (IS_ERR(priv->cm.srq)) {
-		ret = PTR_ERR(priv->cm.srq);
-		priv->cm.srq = NULL;
+	if ((ret = ib_query_device(priv->ca, &attr)))
  		return ret;
-	}

-	priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
-				    GFP_KERNEL);
-	if (!priv->cm.srq_ring) {
-		printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
-		       priv->ca->name, ipoib_recvq_size);
-		ipoib_cm_dev_cleanup(dev);
-		return -ENOMEM;
+	if (attr.max_srq) {
+		/* This device supports SRQ */
+		if ((ret = create_srq(dev, priv)))
+			return ret;
+		priv->cm.rx_index_table = NULL;
+	} else {
+		priv->cm.srq = NULL;
+		priv->cm.srq_ring = NULL;
+
+		/* Every new REQ that arrives creates a struct ipoib_cm_rx.
+		 * These structures form a linked list starting with the
+		 * passive_ids. For quick and easy access we maintain a table
+		 * of pointers to struct ipoib_cm_rx called the rx_index_table
+		 */
+		priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
+					 sizeof *priv->cm.rx_index_table,
+					 GFP_KERNEL);
+		if (!priv->cm.rx_index_table) {
+			printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
+			return -ENOMEM;
+		}
  	}

  	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
@@ -1194,17 +1533,23 @@ int ipoib_cm_dev_init(struct net_device
  	priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
  	priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;

-	for (i = 0; i < ipoib_recvq_size; ++i) {
-		if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
+	/* One can post receive buffers even before the RX QP is created
+	 * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
+	 * and do that in ipoib_cm_req_handler() */
+
+	if (priv->cm.srq) {
+		for (i = 0; i < ipoib_recvq_size; ++i) {
+			if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
  					   priv->cm.srq_ring[i].mapping)) {
-			ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
-			ipoib_cm_dev_cleanup(dev);
-			return -ENOMEM;
-		}
-		if (ipoib_cm_post_receive(dev, i)) {
-			ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
-			ipoib_cm_dev_cleanup(dev);
-			return -EIO;
+				ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
+				ipoib_cm_dev_cleanup(dev);
+				return -ENOMEM;
+			}
+			if (ipoib_cm_post_receive(dev, i)) {
+				ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
+				ipoib_cm_dev_cleanup(dev);
+				return -EIO;
+			}
  		}
  	}

--- a/drivers/infiniband/ulp/ipoib/ipoib_ib.c	2007-05-07 22:31:33.000000000 -0700
+++ b/drivers/infiniband/ulp/ipoib/ipoib_ib.c	2007-05-07 17:29:52.000000000 -0700
@@ -299,7 +299,7 @@ int ipoib_poll(struct net_device *dev, i
  		for (i = 0; i < n; ++i) {
  			struct ib_wc *wc = priv->ibwc + i;

-			if (wc->wr_id & IPOIB_CM_OP_SRQ) {
+			if (wc->wr_id & IPOIB_CM_OP_RECV) {
  				++done;
  				--max;
  				ipoib_cm_handle_rx_wc(dev, wc);
@@ -607,7 +607,7 @@ int ipoib_ib_dev_stop(struct net_device
  		do {
  			n = ib_poll_cq(priv->cq, IPOIB_NUM_WC, priv->ibwc);
  			for (i = 0; i < n; ++i) {
-				if (priv->ibwc[i].wr_id & IPOIB_CM_OP_SRQ)
+				if (priv->ibwc[i].wr_id & IPOIB_CM_OP_RECV)
  					ipoib_cm_handle_rx_wc(dev, priv->ibwc + i);
  				else if (priv->ibwc[i].wr_id & IPOIB_OP_RECV)
  					ipoib_ib_handle_rx_wc(dev, priv->ibwc + i);
--- a/drivers/infiniband/ulp/ipoib/ipoib_verbs.c	2007-05-07 16:05:32.000000000 -0700
+++ b/drivers/infiniband/ulp/ipoib/ipoib_verbs.c	2007-05-07 17:13:28.000000000 -0700
@@ -187,6 +187,15 @@ int ipoib_transport_dev_init(struct net_
  	if (!ret)
  		size += ipoib_recvq_size;

+	/* We increase the size of the CQ in the NOSRQ case to prevent CQ
+	 * overflow. Every new REQ creates a new RX QP and each QP has an
+	 * RX ring associated with it. Therefore we could have
+	 * NOSRQ_INDEX_TABLE_SIZE * ipoib_recvq_size + ipoib_sendq_size CQEs
+	 * in a CQ.
+	 */
+	if (!priv->cm.srq)
+		size += (NOSRQ_INDEX_TABLE_SIZE - 1) * ipoib_recvq_size;
+
  	priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
  	if (IS_ERR(priv->cq)) {
  		printk(KERN_WARNING "%s: failed to create CQ\n", ca->name);



