[ofa-general] IPoIB CM (NOSRQ) [PATCH V7] patch

Pradeep Satyanarayana pradeeps at linux.vnet.ibm.com
Tue Jul 17 15:39:27 PDT 2007


Here is the seventh version (V7) of the IPOIB_CM_NOSRQ patch.

Changes from V6:
1. Minor changes incorporating Sean Hefty's comments (replaced the
spin lock with an atomic counter, plus additional cleanups).

This patch has been tested on ppc64 machines with a linux-2.6.22 kernel
derived from Roland's for-2.6.23 git tree.

Signed-off-by: Pradeep Satyanarayana <pradeeps at linux.vnet.ibm.com>
---

--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib.h	2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib.h	2007-07-10 18:30:10.000000000 -0400
@@ -95,11 +95,16 @@ enum {
  	IPOIB_MCAST_FLAG_ATTACHED = 3,
  };

+#define CM_PACKET_SIZE (1ul << 16)
  #define	IPOIB_OP_RECV   (1ul << 31)
  #ifdef CONFIG_INFINIBAND_IPOIB_CM
-#define	IPOIB_CM_OP_SRQ (1ul << 30)
+#define	IPOIB_CM_OP_RECV (1ul << 30)
+
+#define NOSRQ_INDEX_TABLE_SIZE 128
+#define NOSRQ_INDEX_MASK      (NOSRQ_INDEX_TABLE_SIZE -1)
+
  #else
-#define	IPOIB_CM_OP_SRQ (0)
+#define	IPOIB_CM_OP_RECV (0)
  #endif

  /* structs */
@@ -166,11 +171,14 @@ enum ipoib_cm_state {
  };

  struct ipoib_cm_rx {
-	struct ib_cm_id     *id;
-	struct ib_qp        *qp;
-	struct list_head     list;
-	struct net_device   *dev;
-	unsigned long        jiffies;
+	struct ib_cm_id     	*id;
+	struct ib_qp        	*qp;
+	struct ipoib_cm_rx_buf  *rx_ring; /* Used by NOSRQ only */
+	struct list_head     	 list;
+	struct net_device   	*dev;
+	unsigned long        	 jiffies;
+	u32                      index; /* wr_ids are distinguished by index
+					 * to identify the QP -NOSRQ only */
  	enum ipoib_cm_state  state;
  };

@@ -215,6 +223,8 @@ struct ipoib_cm_dev_priv {
  	struct ib_wc            ibwc[IPOIB_NUM_WC];
  	struct ib_sge           rx_sge[IPOIB_CM_RX_SG];
  	struct ib_recv_wr       rx_wr;
+	struct ipoib_cm_rx	**rx_index_table; /* See ipoib_cm_dev_init()
+						   *for usage of this element */
  };

  /*
@@ -564,10 +574,9 @@ static inline void ipoib_cm_skb_too_long
  	dev_kfree_skb_any(skb);
  }

-static inline void ipoib_cm_handle_rx_wc(struct net_device *dev, struct 
ib_wc *wc)
+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
  {
  }
-
  #endif

  #ifdef CONFIG_INFINIBAND_IPOIB_DEBUG
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_cm.c	2007-07-10 17:02:33.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_cm.c	2007-07-17 17:45:16.000000000 -0400
@@ -49,6 +49,17 @@ MODULE_PARM_DESC(cm_data_debug_level,

  #include "ipoib.h"

+int max_rc_qp = NOSRQ_INDEX_TABLE_SIZE;
+int max_recv_buf = 1024; /* Default is 1024 MB */
+
+module_param_named(nosrq_max_rc_qp, max_rc_qp, int, 0644);
+MODULE_PARM_DESC(nosrq_max_rc_qp, "Max number of NOSRQ RC QPs supported");
+
+module_param_named(max_receive_buffer, max_recv_buf, int, 0644);
+MODULE_PARM_DESC(max_receive_buffer, "Max Receive Buffer Size in MB");
+
+atomic_t current_rc_qp; /* Active number of RC QPs for NOSRQ */
+
  #define IPOIB_CM_IETF_ID 0x1000000000000000ULL

  #define IPOIB_CM_RX_UPDATE_TIME (256 * HZ)
@@ -81,20 +92,20 @@ static void ipoib_cm_dma_unmap_rx(struct
  		ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, 
DMA_FROM_DEVICE);
  }

-static int ipoib_cm_post_receive(struct net_device *dev, int id)
+static int post_receive_srq(struct net_device *dev, u64 id)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
  	struct ib_recv_wr *bad_wr;
  	int i, ret;

-	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_SRQ;
+	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;

  	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
  		priv->cm.rx_sge[i].addr = priv->cm.srq_ring[id].mapping[i];

  	ret = ib_post_srq_recv(priv->cm.srq, &priv->cm.rx_wr, &bad_wr);
  	if (unlikely(ret)) {
-		ipoib_warn(priv, "post srq failed for buf %d (%d)\n", id, ret);
+		ipoib_warn(priv, "post srq failed for buf %ld (%d)\n", id, ret);
  		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
  				      priv->cm.srq_ring[id].mapping);
  		dev_kfree_skb_any(priv->cm.srq_ring[id].skb);
@@ -104,12 +115,47 @@ static int ipoib_cm_post_receive(struct
  	return ret;
  }

-static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, 
int id, int frags,
+static int post_receive_nosrq(struct net_device *dev, u64 id)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct ib_recv_wr *bad_wr;
+	int i, ret;
+	u32 index;
+	u32 wr_id;
+	struct ipoib_cm_rx *rx_ptr;
+
+	index = id  & NOSRQ_INDEX_MASK ;
+	wr_id = id >> 32;
+
+	rx_ptr = priv->cm.rx_index_table[index];
+
+	priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
+
+	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
+		priv->cm.rx_sge[i].addr = rx_ptr->rx_ring[wr_id].mapping[i];
+
+	ret = ib_post_recv(rx_ptr->qp, &priv->cm.rx_wr, &bad_wr);
+	if (unlikely(ret)) {
+		ipoib_warn(priv, "post recv failed for buf %d (%d)\n",
+		           wr_id, ret);
+		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
+		                      rx_ptr->rx_ring[wr_id].mapping);
+		dev_kfree_skb_any(rx_ptr->rx_ring[wr_id].skb);
+		rx_ptr->rx_ring[wr_id].skb = NULL;
+	}
+
+	return ret;
+}
+
+static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, 
u64 id,
+					     int frags,
  					     u64 mapping[IPOIB_CM_RX_SG])
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
  	struct sk_buff *skb;
  	int i;
+	struct ipoib_cm_rx *rx_ptr;
+	u32 index, wr_id;

  	skb = dev_alloc_skb(IPOIB_CM_HEAD_SIZE + 12);
  	if (unlikely(!skb))
@@ -141,7 +187,14 @@ static struct sk_buff *ipoib_cm_alloc_rx
  			goto partial_error;
  	}

-	priv->cm.srq_ring[id].skb = skb;
+	if (priv->cm.srq)
+		priv->cm.srq_ring[id].skb = skb;
+	else {
+		index = id  & NOSRQ_INDEX_MASK ;
+		wr_id = id >> 32;
+		rx_ptr = priv->cm.rx_index_table[index];
+		rx_ptr->rx_ring[wr_id].skb = skb;
+	}
  	return skb;

  partial_error:
@@ -198,16 +251,21 @@ static struct ib_qp *ipoib_cm_create_rx_
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
  	struct ib_qp_init_attr attr = {
-		.event_handler = ipoib_cm_rx_event_handler,
  		.send_cq = priv->cq, /* For drain WR */
  		.recv_cq = priv->cq,
  		.srq = priv->cm.srq,
  		.cap.max_send_wr = 1, /* For drain WR */
+		.cap.max_recv_wr = ipoib_recvq_size + 1,
  		.cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
  		.sq_sig_type = IB_SIGNAL_ALL_WR,
  		.qp_type = IB_QPT_RC,
  		.qp_context = p,
  	};
+	if (!priv->cm.srq) {
+		attr.cap.max_recv_sge = IPOIB_CM_RX_SG;	
+		attr.event_handler = NULL;
+	} else
+		attr.event_handler = ipoib_cm_rx_event_handler;
  	return ib_create_qp(priv->pd, &attr);
  }

@@ -282,12 +340,129 @@ static int ipoib_cm_send_rep(struct net_
  	rep.flow_control = 0;
  	rep.rnr_retry_count = req->rnr_retry_count;
  	rep.target_ack_delay = 20; /* FIXME */
-	rep.srq = 1;
  	rep.qp_num = qp->qp_num;
  	rep.starting_psn = psn;
+	rep.srq	= !!priv->cm.srq;
  	return ib_send_cm_rep(cm_id, &rep);
  }

+static void init_context_and_add_list(struct ib_cm_id *cm_id,
+				    struct ipoib_cm_rx *p,
+				    struct ipoib_dev_priv *priv)
+{
+	cm_id->context = p;
+	p->jiffies = jiffies;
+	spin_lock_irq(&priv->lock);
+	if (list_empty(&priv->cm.passive_ids))
+		queue_delayed_work(ipoib_workqueue,
+				   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
+	if (priv->cm.srq) {
+		/* Add this entry to passive ids list head, but do not re-add
+		 * it if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush
+		 * list.
+		 */
+		if (p->state == IPOIB_CM_RX_LIVE)
+			list_move(&p->list, &priv->cm.passive_ids);
+	}
+	spin_unlock_irq(&priv->lock);
+}
+
+static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
+				        struct ipoib_cm_rx *p, unsigned psn)
+{
+	struct net_device *dev = cm_id->context;
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	int ret;
+	u32 qp_num, index;
+	u64 i, recv_mem_used;
+
+	qp_num = p->qp->qp_num;
+
+	/* In the SRQ case there is a common rx buffer called the srq_ring.
+	 * However, for the NOSRQ we create an rx_ring for every
+	 * struct ipoib_cm_rx.
+	 */
+	p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
+	if (!p->rx_ring) {
+		printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
+		       qp_num);
+		return -ENOMEM;
+	}
+
+	spin_lock_irq(&priv->lock);
+	list_add(&p->list, &priv->cm.passive_ids);
+	spin_unlock_irq(&priv->lock);
+
+	init_context_and_add_list(cm_id, p, priv);
+	spin_lock_irq(&priv->lock);
+		
+	for (index = 0; index < max_rc_qp; index++)
+		if (priv->cm.rx_index_table[index] == NULL)
+			break;
+
+	recv_mem_used = (u64)ipoib_recvq_size *
+		        (u64)atomic_inc_return(&current_rc_qp)
+		         * CM_PACKET_SIZE; /* packets are 64K */
+	if ((index == max_rc_qp) ||
+	( recv_mem_used >= max_recv_buf * (1ul << 20))) {
+		spin_unlock_irq(&priv->lock);
+		ipoib_warn(priv, "NOSRQ has reached the configurable limit "
+		           "of either %d RC QPs or, max recv buf size of "
+			   "0x%x MB\n", max_rc_qp, max_recv_buf);
+
+		/* We send a REJ to the remote side indicating that we
+		 * have no more free RC QPs and leave it to the remote side
+		 * to take appropriate action. This should leave the
+		 * current set of QPs unaffected and any subsequent REQs
+		 * will be able to use RC QPs if they are available.
+		 */
+		ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
+		ret = -EINVAL;
+		goto err_alloc_and_post;
+	}
+
+	priv->cm.rx_index_table[index] = p;
+	spin_unlock_irq(&priv->lock);
+
+	/* We will subsequently use this stored pointer while freeing
+	 * resources in stale task
+	 */
+	p->index = index;
+
+	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+	if (ret) {
+		ipoib_warn(priv, "ipoib_cm_modify_rx_qp() failed %d\n", ret);
+		ipoib_cm_dev_cleanup(dev);
+		goto err_alloc_and_post;
+	}
+
+	for (i = 0; i < ipoib_recvq_size; ++i) {
+		if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
+					   IPOIB_CM_RX_SG - 1,
+					   p->rx_ring[i].mapping)) {
+			ipoib_warn(priv, "failed to allocate receive "
+			           "buffer %ld\n", i);
+			ipoib_cm_dev_cleanup(dev);
+			ret = -ENOMEM;
+			goto err_alloc_and_post;
+		}
+
+		if (post_receive_nosrq(dev, i << 32 | index)) {
+			ipoib_warn(priv, "post_receive_nosrq "
+			           "failed for  buf %ld\n", i);
+			ipoib_cm_dev_cleanup(dev);
+			ret = -EIO;
+			goto err_alloc_and_post;
+		}
+	}
+
+	return 0;
+
+err_alloc_and_post:
+	kfree(p->rx_ring);
+	return ret;
+}
+
  static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct 
ib_cm_event *event)
  {
  	struct net_device *dev = cm_id->context;
@@ -298,13 +473,13 @@ static int ipoib_cm_req_handler(struct i

  	ipoib_dbg(priv, "REQ arrived\n");
  	p = kzalloc(sizeof *p, GFP_KERNEL);
-	if (!p)
+	if (!p) {
+		printk(KERN_WARNING "Failed to allocate RX control block when "
+		       "REQ arrived\n");
  		return -ENOMEM;
+	}
  	p->dev = dev;
  	p->id = cm_id;
-	cm_id->context = p;
-	p->state = IPOIB_CM_RX_LIVE;
-	p->jiffies = jiffies;
  	INIT_LIST_HEAD(&p->list);

  	p->qp = ipoib_cm_create_rx_qp(dev, p);
@@ -314,19 +489,20 @@ static int ipoib_cm_req_handler(struct i
  	}

  	psn = random32() & 0xffffff;
-	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
-	if (ret)
-		goto err_modify;
+	if (!priv->cm.srq) {
+		if ((ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn)))
+			goto err_post_nosrq;
+	} else {
+		p->rx_ring = NULL;
+		ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+		if (ret)
+			goto err_modify;
+	}

-	spin_lock_irq(&priv->lock);
-	queue_delayed_work(ipoib_workqueue,
-			   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
-	/* Add this entry to passive ids list head, but do not re-add it
-	 * if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush list. */
-	p->jiffies = jiffies;
-	if (p->state == IPOIB_CM_RX_LIVE)
-		list_move(&p->list, &priv->cm.passive_ids);
-	spin_unlock_irq(&priv->lock);
+	if (priv->cm.srq) {
+		p->state = IPOIB_CM_RX_LIVE;
+		init_context_and_add_list(cm_id, p, priv);
+	}

  	ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
  	if (ret) {
@@ -336,6 +512,9 @@ static int ipoib_cm_req_handler(struct i
  	}
  	return 0;

+err_post_nosrq:
+	list_del_init(&p->list);
+	atomic_dec(&current_rc_qp);
  err_modify:
  	ib_destroy_qp(p->qp);
  err_qp:
@@ -399,29 +578,60 @@ static void skb_put_frags(struct sk_buff
  	}
  }

-void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+static void timer_check_srq(struct ipoib_dev_priv *priv, struct 
ipoib_cm_rx *p)
+{
+	unsigned long flags;
+
+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+		spin_lock_irqsave(&priv->lock, flags);
+		p->jiffies = jiffies;
+		/* Move this entry to list head, but do
+		 * not re-add it if it has been removed.
+		 */
+		if (p->state == IPOIB_CM_RX_LIVE)
+			list_move(&p->list, &priv->cm.passive_ids);
+		spin_unlock_irqrestore(&priv->lock, flags);
+	}
+}
+
+static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct 
ipoib_cm_rx *p)
+{
+	unsigned long flags;
+
+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+		spin_lock_irqsave(&priv->lock, flags);
+		p->jiffies = jiffies;
+		/* Move this entry to list head, but do
+		 * not re-add it if it has been removed. */
+		if (!list_empty(&p->list))	
+			list_move(&p->list, &priv->cm.passive_ids);
+		spin_unlock_irqrestore(&priv->lock, flags);
+	}
+}
+
+void handle_rx_wc_srq(struct net_device *dev, struct ib_wc *wc)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	unsigned int wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
+	u64 wr_id = wc->wr_id & ~IPOIB_CM_OP_RECV;
  	struct sk_buff *skb, *newskb;
  	struct ipoib_cm_rx *p;
  	unsigned long flags;
  	u64 mapping[IPOIB_CM_RX_SG];
-	int frags;
+	int frags, ret;

  	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
  		       wr_id, wc->status);

  	if (unlikely(wr_id >= ipoib_recvq_size)) {
-		if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_SRQ)) {
+		if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_RECV)) {
  			spin_lock_irqsave(&priv->lock, flags);
  			list_splice_init(&priv->cm.rx_drain_list, &priv->cm.rx_reap_list);
  			ipoib_cm_start_rx_drain(priv);
  			queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
  			spin_unlock_irqrestore(&priv->lock, flags);
  		} else
-			ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
-				   wr_id, ipoib_recvq_size);
+			ipoib_warn(priv, "cm recv completion event with wrid 0x%llx (> 0x%x)\n",
+				   (unsigned long long)wr_id, ipoib_recvq_size);
  		return;
  	}

@@ -429,23 +639,15 @@ void ipoib_cm_handle_rx_wc(struct net_de

  	if (unlikely(wc->status != IB_WC_SUCCESS)) {
  		ipoib_dbg(priv, "cm recv error "
-			   "(status=%d, wrid=%d vend_err %x)\n",
-			   wc->status, wr_id, wc->vendor_err);
+			   "(status=%d, wrid=0x%llx vend_err %x)\n",
+			   wc->status, (unsigned long long)wr_id, wc->vendor_err);
  		++priv->stats.rx_dropped;
-		goto repost;
+		goto repost_srq;
  	}

  	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
  		p = wc->qp->qp_context;
-		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
-			spin_lock_irqsave(&priv->lock, flags);
-			p->jiffies = jiffies;
-			/* Move this entry to list head, but do not re-add it
-			 * if it has been moved out of list. */
-			if (p->state == IPOIB_CM_RX_LIVE)
-				list_move(&p->list, &priv->cm.passive_ids);
-			spin_unlock_irqrestore(&priv->lock, flags);
-		}
+		timer_check_srq(priv, p);
  	}

  	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
@@ -457,13 +659,112 @@ void ipoib_cm_handle_rx_wc(struct net_de
  		 * If we can't allocate a new RX buffer, dump
  		 * this packet and reuse the old buffer.
  		 */
-		ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
+		ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
+                ++priv->stats.rx_dropped;
+                goto repost_srq;
+        }
+
+	ipoib_cm_dma_unmap_rx(priv, frags,
+	                      priv->cm.srq_ring[wr_id].mapping);
+	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping,
+	       (frags + 1) * sizeof *mapping);
+	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
+		       wc->byte_len, wc->slid);
+
+	skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
+
+	skb->protocol = ((struct ipoib_header *) skb->data)->proto;
+	skb_reset_mac_header(skb);	
+	skb_pull(skb, IPOIB_ENCAP_LEN);
+
+	dev->last_rx = jiffies;
+	++priv->stats.rx_packets;
+	priv->stats.rx_bytes += skb->len;
+
+	skb->dev = dev;
+	/* XXX get correct PACKET_ type here */
+	skb->pkt_type = PACKET_HOST;
+	netif_receive_skb(skb);
+
+repost_srq:
+	ret = post_receive_srq(dev, wr_id);
+
+	if (unlikely(ret))
+		ipoib_warn(priv, "post_receive_srq failed for buf %ld\n",
+		           wr_id);
+
+}
+
+static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+	struct sk_buff *skb, *newskb;
+	u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
+	u32 index;
+	struct ipoib_cm_rx *rx_ptr;
+	int frags, ret;
+
+
+	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
+		       wr_id, wc->status);
+
+	if (unlikely(wr_id >= ipoib_recvq_size)) {
+		ipoib_warn(priv, "cm recv completion event with wrid 0x%llx (> %d)\n",
+				   (unsigned long long)wr_id, ipoib_recvq_size);
+		return;
+	}
+
+	index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
+
+	/* This is the only place where rx_ptr could be a NULL - could
+	 * have just received a packet from a connection that has become
+	 * stale and so is going away. We will simply drop the packet and
+	 * let the hardware (it's IB_QPT_RC) handle the dropped packet.
+	 * In the timer_check() function below, p->jiffies is updated and
+	 * hence the connection will not be stale after that.
+	 */
+	rx_ptr = priv->cm.rx_index_table[index];
+	if (unlikely(!rx_ptr)) {
+		ipoib_warn(priv, "Received packet from a connection "
+		           "that is going away. Hardware will handle it.\n");
+		return;
+	}
+
+	skb = rx_ptr->rx_ring[wr_id].skb;
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		ipoib_dbg(priv, "cm recv error "
+			   "(status=%d, wrid=%ld vend_err %x)\n",
+			   wc->status, wr_id, wc->vendor_err);
+		++priv->stats.rx_dropped;
+		goto repost_nosrq;
+	}
+
+	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
+		/* There are no guarantees that wc->qp is not NULL for HCAs
+	 	* that do not support SRQ. */
+		timer_check_nosrq(priv, rx_ptr);
+	}
+
+	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
+					      (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
+
+	newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
+				       mapping);
+	if (unlikely(!newskb)) {
+		/*
+		 * If we can't allocate a new RX buffer, dump
+		 * this packet and reuse the old buffer.
+		 */
+		ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
  		++priv->stats.rx_dropped;
-		goto repost;
+		goto repost_nosrq;
  	}

-	ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
-	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof 
*mapping);
+	ipoib_cm_dma_unmap_rx(priv, frags,
+	                      rx_ptr->rx_ring[wr_id].mapping);
+	memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
+	       (frags + 1) * sizeof *mapping);

  	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
  		       wc->byte_len, wc->slid);
@@ -483,10 +784,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
  	skb->pkt_type = PACKET_HOST;
  	netif_receive_skb(skb);

-repost:
-	if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
-		ipoib_warn(priv, "ipoib_cm_post_receive failed "
-			   "for buf %d\n", wr_id);
+repost_nosrq:
+	ret = post_receive_nosrq(dev, wr_id << 32 | index);
+
+	if (unlikely(ret))
+		ipoib_warn(priv, "post_receive_nosrq failed for buf %ld\n",
+		           wr_id);
+}
+
+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+{
+	struct ipoib_dev_priv *priv = netdev_priv(dev);
+
+	if (priv->cm.srq)
+		handle_rx_wc_srq(dev, wc);
+	else
+		handle_rx_wc_nosrq(dev, wc);
  }

  static inline int post_send(struct ipoib_dev_priv *priv,
@@ -678,6 +991,42 @@ err_cm:
  	return ret;
  }

+static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct 
ipoib_cm_rx *p)
+{
+	int i;
+
+	for(i = 0; i < ipoib_recvq_size; ++i)
+		if(p->rx_ring[i].skb) {
+			ipoib_cm_dma_unmap_rx(priv,
+				         IPOIB_CM_RX_SG - 1,
+					 p->rx_ring[i].mapping);
+			dev_kfree_skb_any(p->rx_ring[i].skb);
+			p->rx_ring[i].skb = NULL;
+		}
+	kfree(p->rx_ring);
+}
+
+void dev_stop_nosrq(struct ipoib_dev_priv *priv)
+{
+	struct ipoib_cm_rx *p;
+
+	spin_lock_irq(&priv->lock);
+	while (!list_empty(&priv->cm.passive_ids)) {
+		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
+		free_resources_nosrq(priv, p);
+		list_del(&p->list);
+		spin_unlock_irq(&priv->lock);
+		ib_destroy_cm_id(p->id);
+		ib_destroy_qp(p->qp);
+		atomic_dec(&current_rc_qp);
+		kfree(p);
+		spin_lock_irq(&priv->lock);
+	}
+	spin_unlock_irq(&priv->lock);
+
+	cancel_delayed_work(&priv->cm.stale_task);
+}
+
  void ipoib_cm_dev_stop(struct net_device *dev)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
@@ -692,6 +1041,11 @@ void ipoib_cm_dev_stop(struct net_device
  	ib_destroy_cm_id(priv->cm.id);
  	priv->cm.id = NULL;

+	if (!priv->cm.srq) {
+		dev_stop_nosrq(priv);
+		return;
+	}
+
  	spin_lock_irq(&priv->lock);
  	while (!list_empty(&priv->cm.passive_ids)) {
  		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
@@ -737,6 +1091,7 @@ void ipoib_cm_dev_stop(struct net_device
  		kfree(p);
  	}

+
  	cancel_delayed_work(&priv->cm.stale_task);
  }

@@ -815,7 +1170,9 @@ static struct ib_qp *ipoib_cm_create_tx_
  	attr.recv_cq = priv->cq;
  	attr.srq = priv->cm.srq;
  	attr.cap.max_send_wr = ipoib_sendq_size;
+	attr.cap.max_recv_wr = 1;
  	attr.cap.max_send_sge = 1;
+	attr.cap.max_recv_sge = 1;
  	attr.sq_sig_type = IB_SIGNAL_ALL_WR;
  	attr.qp_type = IB_QPT_RC;
  	attr.send_cq = cq;
@@ -855,7 +1212,7 @@ static int ipoib_cm_send_req(struct net_
  	req.retry_count 	      = 0; /* RFC draft warns against retries */
  	req.rnr_retry_count 	      = 0; /* RFC draft warns against retries */
  	req.max_cm_retries 	      = 15;
-	req.srq 	              = 1;
+	req.srq			      = !!priv->cm.srq;
  	return ib_send_cm_req(id, &req);
  }

@@ -1200,6 +1557,9 @@ static void ipoib_cm_rx_reap(struct work
  	list_for_each_entry_safe(p, n, &list, list) {
  		ib_destroy_cm_id(p->id);
  		ib_destroy_qp(p->qp);
+		if (!priv->cm.srq) {	
+			atomic_dec(&current_rc_qp);
+		}
  		kfree(p);
  	}
  }
@@ -1218,12 +1578,19 @@ static void ipoib_cm_stale_task(struct w
  		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
  		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
  			break;
-		list_move(&p->list, &priv->cm.rx_error_list);
-		p->state = IPOIB_CM_RX_ERROR;
-		spin_unlock_irq(&priv->lock);
-		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
-		if (ret)
-			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+		if (!priv->cm.srq) {
+			free_resources_nosrq(priv, p);
+			list_del_init(&p->list);
+			priv->cm.rx_index_table[p->index] = NULL;
+			spin_unlock_irq(&priv->lock);
+		} else {
+			list_move(&p->list, &priv->cm.rx_error_list);
+			p->state = IPOIB_CM_RX_ERROR;
+			spin_unlock_irq(&priv->lock);
+			ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+			if (ret)
+				ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+		}
  		spin_lock_irq(&priv->lock);
  	}

@@ -1277,16 +1644,40 @@ int ipoib_cm_add_mode_attr(struct net_de
  	return device_create_file(&dev->dev, &dev_attr_mode);
  }

+static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
+{
+	struct ib_srq_init_attr srq_init_attr;
+	int ret;
+
+	srq_init_attr.attr.max_wr = ipoib_recvq_size;
+	srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
+
+	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
+	if (IS_ERR(priv->cm.srq)) {
+		ret = PTR_ERR(priv->cm.srq);
+		priv->cm.srq = NULL;
+		return ret;
+	}
+
+	priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
+		                    sizeof *priv->cm.srq_ring,
+			            GFP_KERNEL);
+	if (!priv->cm.srq_ring) {
+		printk(KERN_WARNING "%s: failed to allocate CM ring "
+		       "(%d entries)\n",
+	       	       priv->ca->name, ipoib_recvq_size);
+		ipoib_cm_dev_cleanup(dev);
+		return -ENOMEM;
+	}
+
+	return 0;
+}
+
  int ipoib_cm_dev_init(struct net_device *dev)
  {
  	struct ipoib_dev_priv *priv = netdev_priv(dev);
-	struct ib_srq_init_attr srq_init_attr = {
-		.attr = {
-			.max_wr  = ipoib_recvq_size,
-			.max_sge = IPOIB_CM_RX_SG
-		}
-	};
  	int ret, i;
+	struct ib_device_attr attr;

  	INIT_LIST_HEAD(&priv->cm.passive_ids);
  	INIT_LIST_HEAD(&priv->cm.reap_list);
@@ -1303,20 +1694,32 @@ int ipoib_cm_dev_init(struct net_device

  	skb_queue_head_init(&priv->cm.skb_queue);

-	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
-	if (IS_ERR(priv->cm.srq)) {
-		ret = PTR_ERR(priv->cm.srq);
-		priv->cm.srq = NULL;
+	if ((ret = ib_query_device(priv->ca, &attr)))
  		return ret;
-	}

-	priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
-				    GFP_KERNEL);
-	if (!priv->cm.srq_ring) {
-		printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
-		       priv->ca->name, ipoib_recvq_size);
-		ipoib_cm_dev_cleanup(dev);
-		return -ENOMEM;
+	if (attr.max_srq) {
+		/* This device supports SRQ */
+		if ((ret = create_srq(dev, priv)))
+			return ret;
+		priv->cm.rx_index_table = NULL;
+	} else {
+		priv->cm.srq = NULL;
+		priv->cm.srq_ring = NULL;
+
+		/* Every new REQ that arrives creates a struct ipoib_cm_rx.
+		 * These structures form a link list starting with the
+		 * passive_ids. For quick and easy access we maintain a table
+		 * of pointers to struct ipoib_cm_rx called the rx_index_table
+		 */
+		priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
+					 sizeof *priv->cm.rx_index_table,
+					 GFP_KERNEL);
+		if (!priv->cm.rx_index_table) {
+			printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
+			return -ENOMEM;
+		}
+
+		atomic_set(&current_rc_qp, 0);
  	}

  	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
@@ -1329,17 +1732,24 @@ int ipoib_cm_dev_init(struct net_device
  	priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
  	priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;

-	for (i = 0; i < ipoib_recvq_size; ++i) {
-		if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
+	/* One can post receive buffers even before the RX QP is created
+	 * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
+	 * and do that in ipoib_cm_req_handler()
+	 */
+
+	if (priv->cm.srq) {
+		for (i = 0; i < ipoib_recvq_size; ++i) {
+			if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
  					   priv->cm.srq_ring[i].mapping)) {
-			ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
-			ipoib_cm_dev_cleanup(dev);
-			return -ENOMEM;
-		}
-		if (ipoib_cm_post_receive(dev, i)) {
-			ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
-			ipoib_cm_dev_cleanup(dev);
-			return -EIO;
+				ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
+				ipoib_cm_dev_cleanup(dev);
+				return -ENOMEM;
+			}
+			if (post_receive_srq(dev, i)) {
+				ipoib_warn(priv, "post_receive_srq failed for buf %d\n", i);
+				ipoib_cm_dev_cleanup(dev);
+				return -EIO;
+			}
  		}
  	}

--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_ib.c	2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_ib.c	2007-07-10 18:30:10.000000000 -0400
@@ -299,7 +299,7 @@ int ipoib_poll(struct net_device *dev, i
  		for (i = 0; i < n; ++i) {
  			struct ib_wc *wc = priv->ibwc + i;

-			if (wc->wr_id & IPOIB_CM_OP_SRQ) {
+			if (wc->wr_id & IPOIB_CM_OP_RECV) {
  				++done;
  				--max;
  				ipoib_cm_handle_rx_wc(dev, wc);
@@ -557,7 +557,7 @@ void ipoib_drain_cq(struct net_device *d
  	do {
  		n = ib_poll_cq(priv->cq, IPOIB_NUM_WC, priv->ibwc);
  		for (i = 0; i < n; ++i) {
-			if (priv->ibwc[i].wr_id & IPOIB_CM_OP_SRQ)
+			if (priv->ibwc[i].wr_id & IPOIB_CM_OP_RECV)
  				ipoib_cm_handle_rx_wc(dev, priv->ibwc + i);
  			else if (priv->ibwc[i].wr_id & IPOIB_OP_RECV)
  				ipoib_ib_handle_rx_wc(dev, priv->ibwc + i);
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_verbs.c	2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_verbs.c	2007-07-10 18:30:10.000000000 -0400
@@ -175,6 +175,15 @@ int ipoib_transport_dev_init(struct net_
  	if (!ret)
  		size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;

+ 	/* We increase the size of the CQ in the NOSRQ case to prevent CQ
+ 	 * overflow. Every new REQ creates a new RX QP and each QP has an
+ 	 * RX ring associated with it. Therefore we could have
+ 	 * NOSRQ_INDEX_TABLE_SIZE*ipoib_recvq_size + ipoib_sendq_size CQEs
+ 	 * in a CQ.
+ 	 */
+ 	if(!priv->cm.srq)
+ 		size += (NOSRQ_INDEX_TABLE_SIZE -1)* ipoib_recvq_size;
+
  	priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, 
size, 0);
  	if (IS_ERR(priv->cq)) {
  		printk(KERN_WARNING "%s: failed to create CQ\n", ca->name);




More information about the general mailing list