[ofa-general] IPoIB CM (NOSRQ) [PATCH V6] patch

Pradeep Satyanarayana pradeeps at linux.vnet.ibm.com
Tue Jun 12 17:14:00 PDT 2007


Sean, thanks for looking through this. My responses are below.

Pradeep

Sean Hefty wrote:
>> +module_param_named(max_recieve_buffer, max_recv_buf, int, 0644);
>> +MODULE_PARM_DESC(max_recieve_buffer, "Max Recieve Buffer Size in MB");
> 
> nit: receive misspelled

You are correct.

> 
>> +static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
>> +				        struct ipoib_cm_rx *p, unsigned psn)
>> +{
>> +	struct net_device *dev = cm_id->context;
>> +	struct ipoib_dev_priv *priv = netdev_priv(dev);
>> +	int ret;
>> +	u32 qp_num, index;
>> +	u64 i, recv_mem_used;
>> +
>> +	qp_num = p->qp->qp_num;
>> +
>> +	/* In the SRQ case there is a common rx buffer called the srq_ring.
>> +	 * However, for the NOSRQ we create an rx_ring for every
>> +	 * struct ipoib_cm_rx.
>> +	 */
>> +	p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
>> +	if (!p->rx_ring) {
>> +		printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
>> +		       qp_num);
>> +		return -ENOMEM;
>> +	}
>> +
>> +	init_context_and_add_list(cm_id, p, priv);
>> +	spin_lock_irq(&priv->lock);
>> +
>> +	for (index = 0; index < max_rc_qp; index++)
>> +		if (priv->cm.rx_index_table[index] == NULL)
>> +			break;
>> +
>> +	spin_lock(&nosrq_count.lock);
>> +	recv_mem_used = (u64)ipoib_recvq_size * (u64)nosrq_count.current_rc_qp
>> +			* CM_PACKET_SIZE; /* packets are 64K */
>> +	spin_unlock(&nosrq_count.lock);
> 
> Is a spin lock needed here?  Could you make current_rc_qp an atomic?

This function is called only when a REQ is received. Otherwise,
current_rc_qp is used only in the error case or when the connection
is being torn down, so I don't think it makes a significant
difference which one is used.
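
For reference, if we did convert it, a minimal sketch using an
atomic_t (untested, and assuming the counter has no other users that
need the nosrq_count lock) would be:

	static atomic_t current_rc_qp = ATOMIC_INIT(0);

	/* REQ received: account for the new RC QP */
	atomic_inc(&current_rc_qp);

	/* atomic_read() needs no lock for a lone counter */
	recv_mem_used = (u64)ipoib_recvq_size *
			(u64)atomic_read(&current_rc_qp) * CM_PACKET_SIZE;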

> 
>> +err_send_rej:
>> +err_modify_nosrq:
>> +err_alloc_and_post:
> 
> Maybe just use a single label?

Yes, that is doable.

> 
>> @@ -316,9 +488,18 @@ static int ipoib_cm_req_handler(struct i
>>  	}
>>
>>  	psn = random32() & 0xffffff;
>> -	ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>> -	if (ret)
>> -		goto err_modify;
>> +	if (!priv->cm.srq) {
>> +		spin_lock(&nosrq_count.lock);
>> +		nosrq_count.current_rc_qp++;
>> +		spin_unlock(&nosrq_count.lock);
>> +		if (ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn))
> 
> Use double parens around assignment: if ((ret = ..))

Okay.
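
Will change it to:

	if ((ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn)))
		goto err_post_nosrq;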

> 
>> +			goto err_post_nosrq;
>> +	} else {
>> +		p->rx_ring = NULL;
>> +		ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>> +		if (ret)
>> +			goto err_modify;
>> +	}
>>
>>  	ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
>>  	if (ret) {
>> @@ -326,18 +507,18 @@ static int ipoib_cm_req_handler(struct i
>>  		goto err_rep;
>>  	}
>>
>> -	cm_id->context = p;
>> -	p->jiffies = jiffies;
>> -	p->state = IPOIB_CM_RX_LIVE;
>> -	spin_lock_irq(&priv->lock);
>> -	if (list_empty(&priv->cm.passive_ids))
>> -		queue_delayed_work(ipoib_workqueue,
>> -				   &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
>> -	list_add(&p->list, &priv->cm.passive_ids);
>> -	spin_unlock_irq(&priv->lock);
>> +	if (priv->cm.srq) {
>> +		init_context_and_add_list(cm_id, p, priv);
>> +		p->state = IPOIB_CM_RX_LIVE;
> 
> The order between setting p->state and adding the item to the list changes here.
> I don't know if this matters, but it's now possible for the work queue to
> execute before p->state is set.

You are correct. I need to set p->state first and then call
init_context_and_add_list().
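
That is:

	p->state = IPOIB_CM_RX_LIVE;
	init_context_and_add_list(cm_id, p, priv);
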
> 
>> +	}
>>  	return 0;
>>
>>  err_rep:
>> +err_post_nosrq:
>> +	list_del_init(&p->list);
> 
> Is this correct?  Is p->list on any list at this point?
> 
>> +	spin_lock(&nosrq_count.lock);
>> +	nosrq_count.current_rc_qp--;
>> +	spin_unlock(&nosrq_count.lock);
>>  err_modify:
>>  	ib_destroy_qp(p->qp);
>>  err_qp:
>> @@ -401,21 +582,51 @@ static void skb_put_frags(struct sk_buff
>>  	}
>>  }
>>
>> -void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>> +static void timer_check_srq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>> +{
>> +	unsigned long flags;
>> +
>> +	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>> +		spin_lock_irqsave(&priv->lock, flags);
>> +		p->jiffies = jiffies;
>> +		/* Move this entry to list head, but do
>> +		 * not re-add it if it has been removed. */
> 
> nit: There are several places in the patch where the commenting style needs
> updating.

Move the closing "*/" to the next line?
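
Something like this?

	/*
	 * Move this entry to list head, but do
	 * not re-add it if it has been removed.
	 */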

> 
>> +		if (p->state == IPOIB_CM_RX_LIVE)
>> +			list_move(&p->list, &priv->cm.passive_ids);
>> +		spin_unlock_irqrestore(&priv->lock, flags);
>> +	}
>> +}
>> +
>> +static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>> +{
>> +	unsigned long flags;
>> +
>> +	if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>> +		spin_lock_irqsave(&priv->lock, flags);
>> +		p->jiffies = jiffies;
>> +		/* Move this entry to list head, but do
>> +		 * not re-add it if it has been removed. */
>> +		if (!list_empty(&p->list))
> 
> This line is the only difference between this function and the previous one.  Is
> it possible to always use the state check?

The state check is only used in the SRQ case.
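
If a single helper were still preferred, it could branch on
priv->cm.srq instead -- an untested sketch:

	static void timer_check(struct ipoib_dev_priv *priv,
				struct ipoib_cm_rx *p)
	{
		unsigned long flags;

		if (p && time_after_eq(jiffies,
				       p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
			spin_lock_irqsave(&priv->lock, flags);
			p->jiffies = jiffies;
			/*
			 * Move this entry to list head, but do
			 * not re-add it if it has been removed.
			 */
			if (priv->cm.srq ? p->state == IPOIB_CM_RX_LIVE :
			    !list_empty(&p->list))
				list_move(&p->list, &priv->cm.passive_ids);
			spin_unlock_irqrestore(&priv->lock, flags);
		}
	}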

> 
>> +			list_move(&p->list, &priv->cm.passive_ids);
>> +		spin_unlock_irqrestore(&priv->lock, flags);
>> +	}
>> +}
> 
> 
>> +static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
>> +{
>> +	struct ipoib_dev_priv *priv = netdev_priv(dev);
>> +	struct sk_buff *skb, *newskb;
>> +	u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
>> +	u32 index;
>> +	struct ipoib_cm_rx *p, *rx_ptr;
>> +	int frags, ret;
>> +
>> +
>> +	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
>> +		       wr_id, wc->status);
>> +
>> +	if (unlikely(wr_id >= ipoib_recvq_size)) {
>> +		ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
>> +				   wr_id, ipoib_recvq_size);
>> +		return;
>> +	}
>> +
>> +	index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
>> +
>> +	/* This is the only place where rx_ptr could be a NULL - could
>> +	 * have just received a packet from a connection that has become
>> +	 * stale and so is going away. We will simply drop the packet and
>> +	 * let the hardware (it's IB_QPT_RC) handle the dropped packet.
>> +	 * In the timer_check() function below, p->jiffies is updated and
>> +	 * hence the connection will not be stale after that.
>> +	 */
>> +	rx_ptr = priv->cm.rx_index_table[index];
> 
> Is synchronization needed here?

No locking is required here.

> 
>> +	if (unlikely(!rx_ptr)) {
>> +		ipoib_warn(priv, "Received packet from a connection "
>> +		           "that is going away. Hardware will handle it.\n");
>> +		return;
>> +	}
>> +
>> +	skb = rx_ptr->rx_ring[wr_id].skb;
>> +
>> +	if (unlikely(wc->status != IB_WC_SUCCESS)) {
>> +		ipoib_dbg(priv, "cm recv error "
>> +			   "(status=%d, wrid=%ld vend_err %x)\n",
>> +			   wc->status, wr_id, wc->vendor_err);
>> +		++priv->stats.rx_dropped;
>> +		goto repost_nosrq;
>> +	}
>> +
>> +	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
>> +		/* There are no guarantees that wc->qp is not NULL for HCAs
>> +	 	* that do not support SRQ. */
>> +		p = rx_ptr;
>> +		timer_check_nosrq(priv, p);
> 
> This appears to be the only place 'p' is used in this call.  I think we can just
> remove it.

Correct.
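
That is, drop 'p' and call:

	timer_check_nosrq(priv, rx_ptr);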

> 
>> +	}
>> +
>> +	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
>> +					      (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
>> +
>> +	newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
>> +				       mapping);
>> +	if (unlikely(!newskb)) {
>> +		/*
>> +		 * If we can't allocate a new RX buffer, dump
>> +		 * this packet and reuse the old buffer.
>> +		 */
>> +		ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
>>  		++priv->stats.rx_dropped;
>> -		goto repost;
>> +		goto repost_nosrq;
>>  	}
>>
>> -	ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
>> -	memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
>> +	ipoib_cm_dma_unmap_rx(priv, frags,
>> +	                      rx_ptr->rx_ring[wr_id].mapping);
>> +	memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
>> +	       (frags + 1) * sizeof *mapping);
>>
>>  	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
>>  		       wc->byte_len, wc->slid);
>> @@ -485,10 +788,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
>>  	skb->pkt_type = PACKET_HOST;
>>  	netif_receive_skb(skb);
>>
>> -repost:
>> -	if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
>> -		ipoib_warn(priv, "ipoib_cm_post_receive failed "
>> -			   "for buf %d\n", wr_id);
>> +repost_nosrq:
>> +	ret = post_receive_nosrq(dev, wr_id << 32 | index);
>> +
>> +	if (unlikely(ret))
>> +		ipoib_warn(priv, "post_receive_nosrq failed for buf %ld\n",
>> +		           wr_id);
>> +}
>> +
>> +void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>> +{
>> +	struct ipoib_dev_priv *priv = netdev_priv(dev);
>> +
>> +	if (priv->cm.srq)
>> +		handle_rx_wc_srq(dev, wc);
>> +	else
>> +		handle_rx_wc_nosrq(dev, wc);
>>  }
>>
>>  static inline int post_send(struct ipoib_dev_priv *priv,
>> @@ -680,6 +995,44 @@ err_cm:
>>  	return ret;
>>  }
>>
>> +static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>> +{
>> +	int i;
>> +
>> +	for(i = 0; i < ipoib_recvq_size; ++i)
>> +		if(p->rx_ring[i].skb) {
>> +			ipoib_cm_dma_unmap_rx(priv,
>> +				         IPOIB_CM_RX_SG - 1,
>> +					 p->rx_ring[i].mapping);
>> +			dev_kfree_skb_any(p->rx_ring[i].skb);
>> +			p->rx_ring[i].skb = NULL;
>> +		}
>> +	kfree(p->rx_ring);
>> +}
>> +
>> +void dev_stop_nosrq(struct ipoib_dev_priv *priv)
>> +{
>> +	struct ipoib_cm_rx *p;
>> +
>> +	spin_lock_irq(&priv->lock);
>> +	while (!list_empty(&priv->cm.passive_ids)) {
>> +		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>> +		free_resources_nosrq(priv, p);
>> +		list_del_init(&p->list);
> 
> just list_del should work here
> 
>> +		spin_unlock_irq(&priv->lock);
>> +		ib_destroy_cm_id(p->id);
>> +		ib_destroy_qp(p->qp);
>> +		spin_lock(&nosrq_count.lock);
>> +		nosrq_count.current_rc_qp--;
>> +		spin_unlock(&nosrq_count.lock);
>> +		kfree(p);
>> +		spin_lock_irq(&priv->lock);
>> +	}
>> +	spin_unlock_irq(&priv->lock);
>> +
>> +	cancel_delayed_work(&priv->cm.stale_task);
>> +}
>> +
>>  void ipoib_cm_dev_stop(struct net_device *dev)
>>  {
>>  	struct ipoib_dev_priv *priv = netdev_priv(dev);
>> @@ -694,6 +1047,11 @@ void ipoib_cm_dev_stop(struct net_device
>>  	ib_destroy_cm_id(priv->cm.id);
>>  	priv->cm.id = NULL;
>>
>> +	if (!priv->cm.srq) {
>> +		dev_stop_nosrq(priv);
>> +		return;
>> +	}
>> +
>>  	spin_lock_irq(&priv->lock);
>>  	while (!list_empty(&priv->cm.passive_ids)) {
>>  		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>> @@ -739,6 +1097,7 @@ void ipoib_cm_dev_stop(struct net_device
>>  		kfree(p);
>>  	}
>>
>> +
>>  	cancel_delayed_work(&priv->cm.stale_task);
>>  }
>>
>> @@ -817,7 +1176,9 @@ static struct ib_qp *ipoib_cm_create_tx_
>>  	attr.recv_cq = priv->cq;
>>  	attr.srq = priv->cm.srq;
>>  	attr.cap.max_send_wr = ipoib_sendq_size;
>> +	attr.cap.max_recv_wr = 1;
>>  	attr.cap.max_send_sge = 1;
>> +	attr.cap.max_recv_sge = 1;
>>  	attr.sq_sig_type = IB_SIGNAL_ALL_WR;
>>  	attr.qp_type = IB_QPT_RC;
>>  	attr.send_cq = cq;
>> @@ -857,7 +1218,7 @@ static int ipoib_cm_send_req(struct net_
>>  	req.retry_count 	      = 0; /* RFC draft warns against retries */
>>  	req.rnr_retry_count 	      = 0; /* RFC draft warns against retries */
>>  	req.max_cm_retries 	      = 15;
>> -	req.srq 	              = 1;
>> +	req.srq			      = !!priv->cm.srq;
>>  	return ib_send_cm_req(id, &req);
>>  }
>>
>> @@ -1202,6 +1563,11 @@ static void ipoib_cm_rx_reap(struct work
>>  	list_for_each_entry_safe(p, n, &list, list) {
>>  		ib_destroy_cm_id(p->id);
>>  		ib_destroy_qp(p->qp);
>> +		if (!priv->cm.srq) {
>> +			spin_lock(&nosrq_count.lock);
>> +			nosrq_count.current_rc_qp--;
>> +			spin_unlock(&nosrq_count.lock);
>> +		}
>>  		kfree(p);
>>  	}
>>  }
>> @@ -1220,12 +1586,19 @@ static void ipoib_cm_stale_task(struct w
>>  		p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
>>  		if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
>>  			break;
>> -		list_move(&p->list, &priv->cm.rx_error_list);
>> -		p->state = IPOIB_CM_RX_ERROR;
>> -		spin_unlock_irq(&priv->lock);
>> -		ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>> -		if (ret)
>> -			ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
>> +		if (!priv->cm.srq) {
>> +			free_resources_nosrq(priv, p);
>> +			list_del_init(&p->list);
>> +			priv->cm.rx_index_table[p->index] = NULL;
>> +			spin_unlock_irq(&priv->lock);
>> +		} else {
>> +			list_move(&p->list, &priv->cm.rx_error_list);
>> +			p->state = IPOIB_CM_RX_ERROR;
>> +			spin_unlock_irq(&priv->lock);
>> +			ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>> +			if (ret)
>> +				ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
>> +		}
>>  		spin_lock_irq(&priv->lock);
>>  	}
>>
>> @@ -1279,16 +1652,40 @@ int ipoib_cm_add_mode_attr(struct net_de
>>  	return device_create_file(&dev->dev, &dev_attr_mode);
>>  }
>>
>> +static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
>> +{
>> +	struct ib_srq_init_attr srq_init_attr;
>> +	int ret;
>> +
>> +	srq_init_attr.attr.max_wr = ipoib_recvq_size;
>> +	srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
>> +
>> +	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>> +	if (IS_ERR(priv->cm.srq)) {
>> +		ret = PTR_ERR(priv->cm.srq);
>> +		priv->cm.srq = NULL;
>> +		return ret;
> 
> nit: you can just return PTR_ERR here, and remove the ret stack variable

Okay.
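
Keeping the NULL assignment means a local is still needed for the
error pointer, so the sketch (untested) would be:

	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
	if (IS_ERR(priv->cm.srq)) {
		struct ib_srq *srq = priv->cm.srq;

		priv->cm.srq = NULL;
		return PTR_ERR(srq);
	}
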
> 
>> +	}
>> +
>> +	priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
>> +		                    sizeof *priv->cm.srq_ring,
>> +			            GFP_KERNEL);
>> +	if (!priv->cm.srq_ring) {
>> +		printk(KERN_WARNING "%s: failed to allocate CM ring "
>> +		       "(%d entries)\n",
>> +	       	       priv->ca->name, ipoib_recvq_size);
>> +		ipoib_cm_dev_cleanup(dev);
>> +		return -ENOMEM;
>> +	}
>> +
>> +	return 0;
>> +}
>> +
>>  int ipoib_cm_dev_init(struct net_device *dev)
>>  {
>>  	struct ipoib_dev_priv *priv = netdev_priv(dev);
>> -	struct ib_srq_init_attr srq_init_attr = {
>> -		.attr = {
>> -			.max_wr  = ipoib_recvq_size,
>> -			.max_sge = IPOIB_CM_RX_SG
>> -		}
>> -	};
>>  	int ret, i;
>> +	struct ib_device_attr attr;
>>
>>  	INIT_LIST_HEAD(&priv->cm.passive_ids);
>>  	INIT_LIST_HEAD(&priv->cm.reap_list);
>> @@ -1305,20 +1702,33 @@ int ipoib_cm_dev_init(struct net_device
>>
>>  	skb_queue_head_init(&priv->cm.skb_queue);
>>
>> -	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>> -	if (IS_ERR(priv->cm.srq)) {
>> -		ret = PTR_ERR(priv->cm.srq);
>> -		priv->cm.srq = NULL;
>> +	if (ret = ib_query_device(priv->ca, &attr))
>>  		return ret;
> 
> double parens around assignment - also below

Okay.
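
Will change both, i.e.:

	if ((ret = ib_query_device(priv->ca, &attr)))
		return ret;
	...
	if ((ret = create_srq(dev, priv)))
		return ret;
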
> 
>> -	}
>>
>> -	priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
>> -				    GFP_KERNEL);
>> -	if (!priv->cm.srq_ring) {
>> -		printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
>> -		       priv->ca->name, ipoib_recvq_size);
>> -		ipoib_cm_dev_cleanup(dev);
>> -		return -ENOMEM;
>> +	if (attr.max_srq) {
>> +		/* This device supports SRQ */
>> +		if (ret = create_srq(dev, priv))
>> +			return ret;
>> +		priv->cm.rx_index_table = NULL;
>> +	} else {
>> +		priv->cm.srq = NULL;
>> +		priv->cm.srq_ring = NULL;
>> +
>> +		/* Every new REQ that arrives creates a struct ipoib_cm_rx.
>> +		 * These structures form a link list starting with the
>> +		 * passive_ids. For quick and easy access we maintain a table
>> +		 * of pointers to struct ipoib_cm_rx called the rx_index_table
>> +		 */
> 
> Why store the structures in a linked list if they're stored in a table?

This linked list is common to both SRQ and NOSRQ. Only the NOSRQ code
uses the table.
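
That is, the RX completion path does an O(1) lookup via the table,
while the stale task walks the list shared by both modes:

	/* completion path (NOSRQ only) */
	rx_ptr = priv->cm.rx_index_table[index];

	/* stale task (SRQ and NOSRQ) */
	p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
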
> 
>> +		priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
>> +					 sizeof *priv->cm.rx_index_table,
>> +					 GFP_KERNEL);
>> +		if (!priv->cm.rx_index_table) {
>> +			printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
>> +			return -ENOMEM;
>> +		}
>> +
>> +		spin_lock_init(&nosrq_count.lock);
>> +		nosrq_count.current_rc_qp = 0;
>>  	}
>>
>>  	for (i = 0; i < IPOIB_CM_RX_SG; ++i)
>> @@ -1331,17 +1741,23 @@ int ipoib_cm_dev_init(struct net_device
>>  	priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
>>  	priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
>>
>> -	for (i = 0; i < ipoib_recvq_size; ++i) {
>> -		if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
>> +	/* One can post receive buffers even before the RX QP is created
>> +	 * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
>> +	 * and do that in ipoib_cm_req_handler() */
> 
> This is separate from this patch, but why not wait to post receives to a SRQ
> only after we've received a REQ?  Would this simplify the code any?

Good point. We could look into that in the future.
> 
> - Sean
> 
