[ofa-general] IPOB CM (NOSRQ) [PATCH V6] patch

Sean Hefty sean.hefty at intel.com
Tue Jun 12 12:13:37 PDT 2007


>+module_param_named(max_recieve_buffer, max_recv_buf, int, 0644);
>+MODULE_PARM_DESC(max_recieve_buffer, "Max Recieve Buffer Size in MB");

nit: receive misspelled

>+static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
>+				        struct ipoib_cm_rx *p, unsigned psn)
>+{
>+	struct net_device *dev = cm_id->context;
>+	struct ipoib_dev_priv *priv = netdev_priv(dev);
>+	int ret;
>+	u32 qp_num, index;
>+	u64 i, recv_mem_used;
>+
>+	qp_num = p->qp->qp_num;
>+
>+	/* In the SRQ case there is a common rx buffer called the srq_ring.
>+	 * However, for the NOSRQ we create an rx_ring for every
>+	 * struct ipoib_cm_rx.
>+	 */
>+	p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
>+	if (!p->rx_ring) {
>+		printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
>+		       qp_num);
>+		return -ENOMEM;
>+	}
>+
>+	init_context_and_add_list(cm_id, p, priv);
>+	spin_lock_irq(&priv->lock);
>+
>+	for (index = 0; index < max_rc_qp; index++)
>+		if (priv->cm.rx_index_table[index] == NULL)
>+			break;
>+
>+	spin_lock(&nosrq_count.lock);
>+	recv_mem_used = (u64)ipoib_recvq_size * (u64)nosrq_count.current_rc_qp
>+			* CM_PACKET_SIZE; /* packets are 64K */
>+	spin_unlock(&nosrq_count.lock);

Is a spin lock needed here?  Could you make current_rc_qp an atomic?

>+err_send_rej:
>+err_modify_nosrq:
>+err_alloc_and_post:

Maybe just use a single label?

>@@ -316,9 +488,18 @@ static int ipoib_cm_req_handler(struct i
>  	}
>
>  	psn = random32() & 0xffffff;
>-	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>-	if (ret)
>-		goto err_modify;
>+	if (!priv->cm.srq) {
>+		spin_lock(&nosrq_count.lock);
>+		nosrq_count.current_rc_qp++;
>+		spin_unlock(&nosrq_count.lock);
>+		if (ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn))

Use double parens around assignment: if ((ret = ..))

>+			goto err_post_nosrq;
>+	} else {
>+		p->rx_ring = NULL;
>+		ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>+		if (ret)
>+			goto err_modify;
>+	}
>
>  	ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
>  	if (ret) {
>@@ -326,18 +507,18 @@ static int ipoib_cm_req_handler(struct i
>  		goto err_rep;
>  	}
>
>-	cm_id->context = p;
>-	p->jiffies = jiffies;
>-	p->state = IPOIB_CM_RX_LIVE;
>-	spin_lock_irq(&priv->lock);
>-	if (list_empty(&priv->cm.passive_ids))
>-		queue_delayed_work(ipoib_workqueue,
>-				   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
>-	list_add(&p->list, &priv->cm.passive_ids);
>-	spin_unlock_irq(&priv->lock);
>+	if (priv->cm.srq) {
>+		init_context_and_add_list(cm_id, p, priv);
>+		p->state = IPOIB_CM_RX_LIVE;

The order between setting p->state and adding the item to the list changes here.
I don't know if this matters, but it's now possible for the work queue to
execute before p->state is set.

>+	}
>  	return 0;
>
>  err_rep:
>+err_post_nosrq:
>+	list_del_init(&p->list);

Is this correct?  Is p->list on any list at this point?

>+	spin_lock(&nosrq_count.lock);
>+	nosrq_count.current_rc_qp--;
>+	spin_unlock(&nosrq_count.lock);
>  err_modify:
>  	ib_destroy_qp(p->qp);
>  err_qp:
>@@ -401,21 +582,51 @@ static void skb_put_frags(struct sk_buff
>  	}
>  }
>
>-void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>+static void timer_check_srq(struct ipoib_dev_priv *priv, struct
>ipoib_cm_rx *p)
>+{
>+	unsigned long flags;
>+
>+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>+		spin_lock_irqsave(&priv->lock, flags);
>+		p->jiffies = jiffies;
>+		/* Move this entry to list head, but do
>+		 * not re-add it if it has been removed. */

nit: There are several places in the patch where the commenting style needs
updating.

>+		if (p->state == IPOIB_CM_RX_LIVE)
>+			list_move(&p->list, &priv->cm.passive_ids);
>+		spin_unlock_irqrestore(&priv->lock, flags);
>+	}
>+}
>+
>+static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct
>ipoib_cm_rx *p)
>+{
>+	unsigned long flags;
>+
>+	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>+		spin_lock_irqsave(&priv->lock, flags);
>+		p->jiffies = jiffies;
>+		/* Move this entry to list head, but do
>+		 * not re-add it if it has been removed. */
>+		if (!list_empty(&p->list))

This line is the only difference between this function and the previous one.  Is
it possible to always use the state check?

>+			list_move(&p->list, &priv->cm.passive_ids);
>+		spin_unlock_irqrestore(&priv->lock, flags);
>+	}
>+}


>+static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
>+{
>+	struct ipoib_dev_priv *priv = netdev_priv(dev);
>+	struct sk_buff *skb, *newskb;
>+	u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
>+	u32 index;
>+	struct ipoib_cm_rx *p, *rx_ptr;
>+	int frags, ret;
>+
>+
>+	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
>+		       wr_id, wc->status);
>+
>+	if (unlikely(wr_id >= ipoib_recvq_size)) {
>+		ipoib_warn(priv, "cm recv completion event with wrid %d (>
%d)\n",
>+				   wr_id, ipoib_recvq_size);
>+		return;
>+	}
>+
>+	index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
>+
>+	/* This is the only place where rx_ptr could be a NULL - could
>+	 * have just received a packet from a connection that has become
>+	 * stale and so is going away. We will simply drop the packet and
>+	 * let the hardware (it s IB_QPT_RC) handle the dropped packet.
>+	 * In the timer_check() function below, p->jiffies is updated and
>+	 * hence the connection will not be stale after that.
>+	 */
>+	rx_ptr = priv->cm.rx_index_table[index];

Is synchronization needed here?

>+	if (unlikely(!rx_ptr)) {
>+		ipoib_warn(priv, "Received packet from a connection "
>+		           "that is going away. Hardware will handle it.\n");
>+		return;
>+	}
>+
>+	skb = rx_ptr->rx_ring[wr_id].skb;
>+
>+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
>+		ipoib_dbg(priv, "cm recv error "
>+			   "(status=%d, wrid=%ld vend_err %x)\n",
>+			   wc->status, wr_id, wc->vendor_err);
>+		++priv->stats.rx_dropped;
>+		goto repost_nosrq;
>+	}
>+
>+	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
>+		/* There are no guarantees that wc->qp is not NULL for HCAs
>+	 	* that do not support SRQ. */
>+		p = rx_ptr;
>+		timer_check_nosrq(priv, p);

This appears to be the only place 'p' is used in this call.  I think we can just
remove it.

>+	}
>+
>+	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
>+					      (unsigned)IPOIB_CM_HEAD_SIZE)) /
PAGE_SIZE;
>+
>+	newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
>+				       mapping);
>+	if (unlikely(!newskb)) {
>+		/*
>+		 * If we can't allocate a new RX buffer, dump
>+		 * this packet and reuse the old buffer.
>+		 */
>+		ipoib_dbg(priv, "failed to allocate receive buffer %ld\n",
wr_id);
>  		++priv->stats.rx_dropped;
>-		goto repost;
>+		goto repost_nosrq;
>  	}
>
>-	ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
>-	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof
>*mapping);
>+	ipoib_cm_dma_unmap_rx(priv, frags,
>+	                      rx_ptr->rx_ring[wr_id].mapping);
>+	memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
>+	       (frags + 1) * sizeof *mapping);
>
>  	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
>  		       wc->byte_len, wc->slid);
>@@ -485,10 +788,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
>  	skb->pkt_type = PACKET_HOST;
>  	netif_receive_skb(skb);
>
>-repost:
>-	if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
>-		ipoib_warn(priv, "ipoib_cm_post_receive failed "
>-			   "for buf %d\n", wr_id);
>+repost_nosrq:
>+	ret = post_receive_nosrq(dev, wr_id << 32 | index);
>+
>+	if (unlikely(ret))
>+		ipoib_warn(priv, "post_receive_nosrq failed for buf %ld\n",
>+		           wr_id);
>+}
>+
>+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>+{
>+	struct ipoib_dev_priv *priv = netdev_priv(dev);
>+
>+	if (priv->cm.srq)
>+		handle_rx_wc_srq(dev, wc);
>+	else
>+		handle_rx_wc_nosrq(dev, wc);
>  }
>
>  static inline int post_send(struct ipoib_dev_priv *priv,
>@@ -680,6 +995,44 @@ err_cm:
>  	return ret;
>  }
>
>+static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct
>ipoib_cm_rx *p)
>+{
>+	int i;
>+
>+	for(i = 0; i < ipoib_recvq_size; ++i)
>+		if(p->rx_ring[i].skb) {
>+			ipoib_cm_dma_unmap_rx(priv,
>+				         IPOIB_CM_RX_SG - 1,
>+					 p->rx_ring[i].mapping);
>+			dev_kfree_skb_any(p->rx_ring[i].skb);
>+			p->rx_ring[i].skb = NULL;
>+		}
>+	kfree(p->rx_ring);
>+}
>+
>+void dev_stop_nosrq(struct ipoib_dev_priv *priv)
>+{
>+	struct ipoib_cm_rx *p;
>+
>+	spin_lock_irq(&priv->lock);
>+	while (!list_empty(&priv->cm.passive_ids)) {
>+		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>+		free_resources_nosrq(priv, p);
>+		list_del_init(&p->list);

just list_del should work here

>+		spin_unlock_irq(&priv->lock);
>+		ib_destroy_cm_id(p->id);
>+		ib_destroy_qp(p->qp);
>+		spin_lock(&nosrq_count.lock);
>+		nosrq_count.current_rc_qp--;
>+		spin_unlock(&nosrq_count.lock);
>+		kfree(p);
>+		spin_lock_irq(&priv->lock);
>+	}
>+	spin_unlock_irq(&priv->lock);
>+
>+	cancel_delayed_work(&priv->cm.stale_task);
>+}
>+
>  void ipoib_cm_dev_stop(struct net_device *dev)
>  {
>  	struct ipoib_dev_priv *priv = netdev_priv(dev);
>@@ -694,6 +1047,11 @@ void ipoib_cm_dev_stop(struct net_device
>  	ib_destroy_cm_id(priv->cm.id);
>  	priv->cm.id = NULL;
>
>+	if (!priv->cm.srq) {
>+		dev_stop_nosrq(priv);
>+		return;
>+	}
>+
>  	spin_lock_irq(&priv->lock);
>  	while (!list_empty(&priv->cm.passive_ids)) {
>  		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>@@ -739,6 +1097,7 @@ void ipoib_cm_dev_stop(struct net_device
>  		kfree(p);
>  	}
>
>+
>  	cancel_delayed_work(&priv->cm.stale_task);
>  }
>
>@@ -817,7 +1176,9 @@ static struct ib_qp *ipoib_cm_create_tx_
>  	attr.recv_cq = priv->cq;
>  	attr.srq = priv->cm.srq;
>  	attr.cap.max_send_wr = ipoib_sendq_size;
>+	attr.cap.max_recv_wr = 1;
>  	attr.cap.max_send_sge = 1;
>+	attr.cap.max_recv_sge = 1;
>  	attr.sq_sig_type = IB_SIGNAL_ALL_WR;
>  	attr.qp_type = IB_QPT_RC;
>  	attr.send_cq = cq;
>@@ -857,7 +1218,7 @@ static int ipoib_cm_send_req(struct net_
>  	req.retry_count 	      = 0; /* RFC draft warns against retries */
>  	req.rnr_retry_count 	      = 0; /* RFC draft warns against retries */
>  	req.max_cm_retries 	      = 15;
>-	req.srq 	              = 1;
>+	req.srq			      = !!priv->cm.srq;
>  	return ib_send_cm_req(id, &req);
>  }
>
>@@ -1202,6 +1563,11 @@ static void ipoib_cm_rx_reap(struct work
>  	list_for_each_entry_safe(p, n, &list, list) {
>  		ib_destroy_cm_id(p->id);
>  		ib_destroy_qp(p->qp);
>+		if (!priv->cm.srq) {
>+			spin_lock(&nosrq_count.lock);
>+			nosrq_count.current_rc_qp--;
>+			spin_unlock(&nosrq_count.lock);
>+		}
>  		kfree(p);
>  	}
>  }
>@@ -1220,12 +1586,19 @@ static void ipoib_cm_stale_task(struct w
>  		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
>  		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
>  			break;
>-		list_move(&p->list, &priv->cm.rx_error_list);
>-		p->state = IPOIB_CM_RX_ERROR;
>-		spin_unlock_irq(&priv->lock);
>-		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>-		if (ret)
>-			ipoib_warn(priv, "unable to move qp to error state:
%d\n",
>ret);
>+		if (!priv->cm.srq) {
>+			free_resources_nosrq(priv, p);
>+			list_del_init(&p->list);
>+			priv->cm.rx_index_table[p->index] = NULL;
>+			spin_unlock_irq(&priv->lock);
>+		} else {
>+			list_move(&p->list, &priv->cm.rx_error_list);
>+			p->state = IPOIB_CM_RX_ERROR;
>+			spin_unlock_irq(&priv->lock);
>+			ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr,
IB_QP_STATE);
>+			if (ret)
>+				ipoib_warn(priv, "unable to move qp to error
state:
>%d\n", ret);
>+		}
>  		spin_lock_irq(&priv->lock);
>  	}
>
>@@ -1279,16 +1652,40 @@ int ipoib_cm_add_mode_attr(struct net_de
>  	return device_create_file(&dev->dev, &dev_attr_mode);
>  }
>
>+static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
>+{
>+	struct ib_srq_init_attr srq_init_attr;
>+	int ret;
>+
>+	srq_init_attr.attr.max_wr = ipoib_recvq_size;
>+	srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
>+
>+	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>+	if (IS_ERR(priv->cm.srq)) {
>+		ret = PTR_ERR(priv->cm.srq);
>+		priv->cm.srq = NULL;
>+		return ret;

nit: you can just return PTR_ERR here, and remove the ret stack variable

>+	}
>+
>+	priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
>+		                    sizeof *priv->cm.srq_ring,
>+			            GFP_KERNEL);
>+	if (!priv->cm.srq_ring) {
>+		printk(KERN_WARNING "%s: failed to allocate CM ring "
>+		       "(%d entries)\n",
>+	       	       priv->ca->name, ipoib_recvq_size);
>+		ipoib_cm_dev_cleanup(dev);
>+		return -ENOMEM;
>+	}
>+
>+	return 0;
>+}
>+
>  int ipoib_cm_dev_init(struct net_device *dev)
>  {
>  	struct ipoib_dev_priv *priv = netdev_priv(dev);
>-	struct ib_srq_init_attr srq_init_attr = {
>-		.attr = {
>-			.max_wr  = ipoib_recvq_size,
>-			.max_sge = IPOIB_CM_RX_SG
>-		}
>-	};
>  	int ret, i;
>+	struct ib_device_attr attr;
>
>  	INIT_LIST_HEAD(&priv->cm.passive_ids);
>  	INIT_LIST_HEAD(&priv->cm.reap_list);
>@@ -1305,20 +1702,33 @@ int ipoib_cm_dev_init(struct net_device
>
>  	skb_queue_head_init(&priv->cm.skb_queue);
>
>-	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>-	if (IS_ERR(priv->cm.srq)) {
>-		ret = PTR_ERR(priv->cm.srq);
>-		priv->cm.srq = NULL;
>+	if (ret = ib_query_device(priv->ca, &attr))
>  		return ret;

double parens around assignment - also below

>-	}
>
>-	priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof
*priv->cm.srq_ring,
>-				    GFP_KERNEL);
>-	if (!priv->cm.srq_ring) {
>-		printk(KERN_WARNING "%s: failed to allocate CM ring (%d
>entries)\n",
>-		       priv->ca->name, ipoib_recvq_size);
>-		ipoib_cm_dev_cleanup(dev);
>-		return -ENOMEM;
>+	if (attr.max_srq) {
>+		/* This device supports SRQ */
>+		if (ret = create_srq(dev, priv))
>+			return ret;
>+		priv->cm.rx_index_table = NULL;
>+	} else {
>+		priv->cm.srq = NULL;
>+		priv->cm.srq_ring = NULL;
>+
>+		/* Every new REQ that arrives creates a struct ipoib_cm_rx.
>+		 * These structures form a link list starting with the
>+		 * passive_ids. For quick and easy access we maintain a table
>+		 * of pointers to struct ipoib_cm_rx called the rx_index_table
>+		 */

Why store the structures in a linked list if they're stored in a table?

>+		priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
>+					 sizeof *priv->cm.rx_index_table,
>+					 GFP_KERNEL);
>+		if (!priv->cm.rx_index_table) {
>+			printk(KERN_WARNING "Failed to allocate
>NOSRQ_INDEX_TABLE\n");
>+			return -ENOMEM;
>+		}
>+
>+		spin_lock_init(&nosrq_count.lock);
>+		nosrq_count.current_rc_qp = 0;
>  	}
>
>  	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
>@@ -1331,17 +1741,23 @@ int ipoib_cm_dev_init(struct net_device
>  	priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
>  	priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
>
>-	for (i = 0; i < ipoib_recvq_size; ++i) {
>-		if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
>+	/* One can post receive buffers even before the RX QP is created
>+	 * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
>+	 * and do that in ipoib_cm_req_handler() */

This is separate from this patch, but why not wait to post receives to a SRQ
only after we've received a REQ?  Would this simplify the code any?

- Sean



More information about the general mailing list