[ofa-general] IPoIB CM (NOSRQ) [PATCH V6] patch
Pradeep Satyanarayana
pradeeps at linux.vnet.ibm.com
Tue Jun 12 17:14:00 PDT 2007
Sean, thanks for looking through this. My responses are below.
Pradeep
Sean Hefty wrote:
>> +module_param_named(max_recieve_buffer, max_recv_buf, int, 0644);
>> +MODULE_PARM_DESC(max_recieve_buffer, "Max Recieve Buffer Size in MB");
>
> nit: receive misspelled
You are correct.
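Something like this for V7 (just the spelling fix):

	module_param_named(max_receive_buffer, max_recv_buf, int, 0644);
	MODULE_PARM_DESC(max_receive_buffer, "Max Receive Buffer Size in MB");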
>
>> +static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
>> + struct ipoib_cm_rx *p, unsigned psn)
>> +{
>> + struct net_device *dev = cm_id->context;
>> + struct ipoib_dev_priv *priv = netdev_priv(dev);
>> + int ret;
>> + u32 qp_num, index;
>> + u64 i, recv_mem_used;
>> +
>> + qp_num = p->qp->qp_num;
>> +
>> + /* In the SRQ case there is a common rx buffer called the srq_ring.
>> + * However, for the NOSRQ we create an rx_ring for every
>> + * struct ipoib_cm_rx.
>> + */
>> + p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
>> + if (!p->rx_ring) {
>> + printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
>> + qp_num);
>> + return -ENOMEM;
>> + }
>> +
>> + init_context_and_add_list(cm_id, p, priv);
>> + spin_lock_irq(&priv->lock);
>> +
>> + for (index = 0; index < max_rc_qp; index++)
>> + if (priv->cm.rx_index_table[index] == NULL)
>> + break;
>> +
>> + spin_lock(&nosrq_count.lock);
>> + recv_mem_used = (u64)ipoib_recvq_size * (u64)nosrq_count.current_rc_qp
>> + * CM_PACKET_SIZE; /* packets are 64K */
>> + spin_unlock(&nosrq_count.lock);
>
> Is a spin lock needed here? Could you make current_rc_qp an atomic?
This function is called only when a REQ is received. Otherwise
current_rc_qp is only used in the error case, or when the connection
is being torn down. Hence I don't think it makes a significant
difference which one is used.
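For reference, if we did move to an atomic, a rough (untested) sketch with
current_rc_qp as an atomic_t instead of the lock-protected counter would be:

	static atomic_t current_rc_qp = ATOMIC_INIT(0);

	/* in the REQ handler: */
	atomic_inc(&current_rc_qp);

	/* in allocate_and_post_rbuf_nosrq(): lock-free read */
	recv_mem_used = (u64)ipoib_recvq_size *
			(u64)atomic_read(&current_rc_qp) * CM_PACKET_SIZE;

	/* in the error path / connection teardown: */
	atomic_dec(&current_rc_qp);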
>
>> +err_send_rej:
>> +err_modify_nosrq:
>> +err_alloc_and_post:
>
> Maybe just use a single label?
Yes, that is doable.
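Something along these lines for V7 (sketch only; the shared cleanup body is
whatever the three labels already fall through to):

	if (ret)
		goto err_nosrq;
	...
err_nosrq:	/* single label replacing err_send_rej/err_modify_nosrq/err_alloc_and_post */
	kfree(p->rx_ring);	/* assumption: the common cleanup frees the ring allocated above */
	return ret;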
>
>> @@ -316,9 +488,18 @@ static int ipoib_cm_req_handler(struct i
>> }
>>
>> psn = random32() & 0xffffff;
>> - ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>> - if (ret)
>> - goto err_modify;
>> + if (!priv->cm.srq) {
>> + spin_lock(&nosrq_count.lock);
>> + nosrq_count.current_rc_qp++;
>> + spin_unlock(&nosrq_count.lock);
>> + if (ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn))
>
> Use double parens around assignment: if ((ret = ..))
okay
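i.e.:

	if ((ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn)))
		goto err_post_nosrq;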
>
>> + goto err_post_nosrq;
>> + } else {
>> + p->rx_ring = NULL;
>> + ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>> + if (ret)
>> + goto err_modify;
>> + }
>>
>> ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
>> if (ret) {
>> @@ -326,18 +507,18 @@ static int ipoib_cm_req_handler(struct i
>> goto err_rep;
>> }
>>
>> - cm_id->context = p;
>> - p->jiffies = jiffies;
>> - p->state = IPOIB_CM_RX_LIVE;
>> - spin_lock_irq(&priv->lock);
>> - if (list_empty(&priv->cm.passive_ids))
>> - queue_delayed_work(ipoib_workqueue,
>> - &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
>> - list_add(&p->list, &priv->cm.passive_ids);
>> - spin_unlock_irq(&priv->lock);
>> + if (priv->cm.srq) {
>> + init_context_and_add_list(cm_id, p, priv);
>> + p->state = IPOIB_CM_RX_LIVE;
>
> The order between setting p->state and adding the item to the list changes here.
> I don't know if this matters, but it's now possible for the work queue to
> execute before p->state is set.
You are correct. I need to set p->state first and then call
init_context_and_add_list().
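So the SRQ branch in V7 will read roughly:

	if (priv->cm.srq) {
		p->state = IPOIB_CM_RX_LIVE;	/* set before the entry becomes visible on the list */
		init_context_and_add_list(cm_id, p, priv);
	}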
>
>> + }
>> return 0;
>>
>> err_rep:
>> +err_post_nosrq:
>> + list_del_init(&p->list);
>
> Is this correct? Is p->list on any list at this point?
>
>> + spin_lock(&nosrq_count.lock);
>> + nosrq_count.current_rc_qp--;
>> + spin_unlock(&nosrq_count.lock);
>> err_modify:
>> ib_destroy_qp(p->qp);
>> err_qp:
>> @@ -401,21 +582,51 @@ static void skb_put_frags(struct sk_buff
>> }
>> }
>>
>> -void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>> +static void timer_check_srq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>> +{
>> + unsigned long flags;
>> +
>> + if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>> + spin_lock_irqsave(&priv->lock, flags);
>> + p->jiffies = jiffies;
>> + /* Move this entry to list head, but do
>> + * not re-add it if it has been removed. */
>
> nit: There are several places in the patch where the commenting style needs
> updating.
Move the closing "*/" to the next line?
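i.e. something like this, with the opener and closer on their own lines:

	/*
	 * Move this entry to list head, but do
	 * not re-add it if it has been removed.
	 */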
>
>> + if (p->state == IPOIB_CM_RX_LIVE)
>> + list_move(&p->list, &priv->cm.passive_ids);
>> + spin_unlock_irqrestore(&priv->lock, flags);
>> + }
>> +}
>> +
>> +static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>> +{
>> + unsigned long flags;
>> +
>> + if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>> + spin_lock_irqsave(&priv->lock, flags);
>> + p->jiffies = jiffies;
>> + /* Move this entry to list head, but do
>> + * not re-add it if it has been removed. */
>> + if (!list_empty(&p->list))
>
> This line is the only difference between this function and the previous one. Is
> it possible to always use the state check?
The state check is only used in the SRQ case; the NOSRQ path does not
maintain p->state, so it checks list_empty() instead.
>
>> + list_move(&p->list, &priv->cm.passive_ids);
>> + spin_unlock_irqrestore(&priv->lock, flags);
>> + }
>> +}
>
>
>> +static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
>> +{
>> + struct ipoib_dev_priv *priv = netdev_priv(dev);
>> + struct sk_buff *skb, *newskb;
>> + u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
>> + u32 index;
>> + struct ipoib_cm_rx *p, *rx_ptr;
>> + int frags, ret;
>> +
>> +
>> + ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
>> + wr_id, wc->status);
>> +
>> + if (unlikely(wr_id >= ipoib_recvq_size)) {
>> + ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
>> + wr_id, ipoib_recvq_size);
>> + return;
>> + }
>> +
>> + index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
>> +
>> + /* This is the only place where rx_ptr could be a NULL - could
>> + * have just received a packet from a connection that has become
>> + * stale and so is going away. We will simply drop the packet and
>> + * let the hardware (it is IB_QPT_RC) handle the dropped packet.
>> + * In the timer_check() function below, p->jiffies is updated and
>> + * hence the connection will not be stale after that.
>> + */
>> + rx_ptr = priv->cm.rx_index_table[index];
>
> Is synchronization needed here?
No locking required
>
>> + if (unlikely(!rx_ptr)) {
>> + ipoib_warn(priv, "Received packet from a connection "
>> + "that is going away. Hardware will handle it.\n");
>> + return;
>> + }
>> +
>> + skb = rx_ptr->rx_ring[wr_id].skb;
>> +
>> + if (unlikely(wc->status != IB_WC_SUCCESS)) {
>> + ipoib_dbg(priv, "cm recv error "
>> + "(status=%d, wrid=%ld vend_err %x)\n",
>> + wc->status, wr_id, wc->vendor_err);
>> + ++priv->stats.rx_dropped;
>> + goto repost_nosrq;
>> + }
>> +
>> + if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
>> + /* There are no guarantees that wc->qp is not NULL for HCAs
>> + * that do not support SRQ. */
>> + p = rx_ptr;
>> + timer_check_nosrq(priv, p);
>
> This appears to be the only place 'p' is used in this call. I think we can just
> remove it.
Correct, it can be removed.
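So the block just becomes:

	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK))
		timer_check_nosrq(priv, rx_ptr);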
>
>> + }
>> +
>> + frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
>> + (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
>> +
>> + newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
>> + mapping);
>> + if (unlikely(!newskb)) {
>> + /*
>> + * If we can't allocate a new RX buffer, dump
>> + * this packet and reuse the old buffer.
>> + */
>> + ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
>> ++priv->stats.rx_dropped;
>> - goto repost;
>> + goto repost_nosrq;
>> }
>>
>> - ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
>> - memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
>> + ipoib_cm_dma_unmap_rx(priv, frags,
>> + rx_ptr->rx_ring[wr_id].mapping);
>> + memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
>> + (frags + 1) * sizeof *mapping);
>>
>> ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
>> wc->byte_len, wc->slid);
>> @@ -485,10 +788,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
>> skb->pkt_type = PACKET_HOST;
>> netif_receive_skb(skb);
>>
>> -repost:
>> - if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
>> - ipoib_warn(priv, "ipoib_cm_post_receive failed "
>> - "for buf %d\n", wr_id);
>> +repost_nosrq:
>> + ret = post_receive_nosrq(dev, wr_id << 32 | index);
>> +
>> + if (unlikely(ret))
>> + ipoib_warn(priv, "post_receive_nosrq failed for buf %ld\n",
>> + wr_id);
>> +}
>> +
>> +void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>> +{
>> + struct ipoib_dev_priv *priv = netdev_priv(dev);
>> +
>> + if (priv->cm.srq)
>> + handle_rx_wc_srq(dev, wc);
>> + else
>> + handle_rx_wc_nosrq(dev, wc);
>> }
>>
>> static inline int post_send(struct ipoib_dev_priv *priv,
>> @@ -680,6 +995,44 @@ err_cm:
>> return ret;
>> }
>>
>> +static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>> +{
>> + int i;
>> +
>> + for(i = 0; i < ipoib_recvq_size; ++i)
>> + if(p->rx_ring[i].skb) {
>> + ipoib_cm_dma_unmap_rx(priv,
>> + IPOIB_CM_RX_SG - 1,
>> + p->rx_ring[i].mapping);
>> + dev_kfree_skb_any(p->rx_ring[i].skb);
>> + p->rx_ring[i].skb = NULL;
>> + }
>> + kfree(p->rx_ring);
>> +}
>> +
>> +void dev_stop_nosrq(struct ipoib_dev_priv *priv)
>> +{
>> + struct ipoib_cm_rx *p;
>> +
>> + spin_lock_irq(&priv->lock);
>> + while (!list_empty(&priv->cm.passive_ids)) {
>> + p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>> + free_resources_nosrq(priv, p);
>> + list_del_init(&p->list);
>
> just list_del should work here
>
>> + spin_unlock_irq(&priv->lock);
>> + ib_destroy_cm_id(p->id);
>> + ib_destroy_qp(p->qp);
>> + spin_lock(&nosrq_count.lock);
>> + nosrq_count.current_rc_qp--;
>> + spin_unlock(&nosrq_count.lock);
>> + kfree(p);
>> + spin_lock_irq(&priv->lock);
>> + }
>> + spin_unlock_irq(&priv->lock);
>> +
>> + cancel_delayed_work(&priv->cm.stale_task);
>> +}
>> +
>> void ipoib_cm_dev_stop(struct net_device *dev)
>> {
>> struct ipoib_dev_priv *priv = netdev_priv(dev);
>> @@ -694,6 +1047,11 @@ void ipoib_cm_dev_stop(struct net_device
>> ib_destroy_cm_id(priv->cm.id);
>> priv->cm.id = NULL;
>>
>> + if (!priv->cm.srq) {
>> + dev_stop_nosrq(priv);
>> + return;
>> + }
>> +
>> spin_lock_irq(&priv->lock);
>> while (!list_empty(&priv->cm.passive_ids)) {
>> p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>> @@ -739,6 +1097,7 @@ void ipoib_cm_dev_stop(struct net_device
>> kfree(p);
>> }
>>
>> +
>> cancel_delayed_work(&priv->cm.stale_task);
>> }
>>
>> @@ -817,7 +1176,9 @@ static struct ib_qp *ipoib_cm_create_tx_
>> attr.recv_cq = priv->cq;
>> attr.srq = priv->cm.srq;
>> attr.cap.max_send_wr = ipoib_sendq_size;
>> + attr.cap.max_recv_wr = 1;
>> attr.cap.max_send_sge = 1;
>> + attr.cap.max_recv_sge = 1;
>> attr.sq_sig_type = IB_SIGNAL_ALL_WR;
>> attr.qp_type = IB_QPT_RC;
>> attr.send_cq = cq;
>> @@ -857,7 +1218,7 @@ static int ipoib_cm_send_req(struct net_
>> req.retry_count = 0; /* RFC draft warns against retries */
>> req.rnr_retry_count = 0; /* RFC draft warns against retries */
>> req.max_cm_retries = 15;
>> - req.srq = 1;
>> + req.srq = !!priv->cm.srq;
>> return ib_send_cm_req(id, &req);
>> }
>>
>> @@ -1202,6 +1563,11 @@ static void ipoib_cm_rx_reap(struct work
>> list_for_each_entry_safe(p, n, &list, list) {
>> ib_destroy_cm_id(p->id);
>> ib_destroy_qp(p->qp);
>> + if (!priv->cm.srq) {
>> + spin_lock(&nosrq_count.lock);
>> + nosrq_count.current_rc_qp--;
>> + spin_unlock(&nosrq_count.lock);
>> + }
>> kfree(p);
>> }
>> }
>> @@ -1220,12 +1586,19 @@ static void ipoib_cm_stale_task(struct w
>> p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
>> if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
>> break;
>> - list_move(&p->list, &priv->cm.rx_error_list);
>> - p->state = IPOIB_CM_RX_ERROR;
>> - spin_unlock_irq(&priv->lock);
>> - ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>> - if (ret)
>> - ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
>> + if (!priv->cm.srq) {
>> + free_resources_nosrq(priv, p);
>> + list_del_init(&p->list);
>> + priv->cm.rx_index_table[p->index] = NULL;
>> + spin_unlock_irq(&priv->lock);
>> + } else {
>> + list_move(&p->list, &priv->cm.rx_error_list);
>> + p->state = IPOIB_CM_RX_ERROR;
>> + spin_unlock_irq(&priv->lock);
>> + ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>> + if (ret)
>> + ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
>> + }
>> spin_lock_irq(&priv->lock);
>> }
>>
>> @@ -1279,16 +1652,40 @@ int ipoib_cm_add_mode_attr(struct net_de
>> return device_create_file(&dev->dev, &dev_attr_mode);
>> }
>>
>> +static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
>> +{
>> + struct ib_srq_init_attr srq_init_attr;
>> + int ret;
>> +
>> + srq_init_attr.attr.max_wr = ipoib_recvq_size;
>> + srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
>> +
>> + priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>> + if (IS_ERR(priv->cm.srq)) {
>> + ret = PTR_ERR(priv->cm.srq);
>> + priv->cm.srq = NULL;
>> + return ret;
>
> nit: you can just return PTR_ERR here, and remove the ret stack variable
okay
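A sketch of the rework; note PTR_ERR() is read before the pointer is cleared:

	priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
	if (IS_ERR(priv->cm.srq)) {
		int err = PTR_ERR(priv->cm.srq);

		priv->cm.srq = NULL;
		return err;
	}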
>
>> + }
>> +
>> + priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
>> + sizeof *priv->cm.srq_ring,
>> + GFP_KERNEL);
>> + if (!priv->cm.srq_ring) {
>> + printk(KERN_WARNING "%s: failed to allocate CM ring "
>> + "(%d entries)\n",
>> + priv->ca->name, ipoib_recvq_size);
>> + ipoib_cm_dev_cleanup(dev);
>> + return -ENOMEM;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> int ipoib_cm_dev_init(struct net_device *dev)
>> {
>> struct ipoib_dev_priv *priv = netdev_priv(dev);
>> - struct ib_srq_init_attr srq_init_attr = {
>> - .attr = {
>> - .max_wr = ipoib_recvq_size,
>> - .max_sge = IPOIB_CM_RX_SG
>> - }
>> - };
>> int ret, i;
>> + struct ib_device_attr attr;
>>
>> INIT_LIST_HEAD(&priv->cm.passive_ids);
>> INIT_LIST_HEAD(&priv->cm.reap_list);
>> @@ -1305,20 +1702,33 @@ int ipoib_cm_dev_init(struct net_device
>>
>> skb_queue_head_init(&priv->cm.skb_queue);
>>
>> - priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>> - if (IS_ERR(priv->cm.srq)) {
>> - ret = PTR_ERR(priv->cm.srq);
>> - priv->cm.srq = NULL;
>> + if (ret = ib_query_device(priv->ca, &attr))
>> return ret;
>
> double parens around assignment - also below
okay
>
>> - }
>>
>> - priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
>> - GFP_KERNEL);
>> - if (!priv->cm.srq_ring) {
>> - printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
>> - priv->ca->name, ipoib_recvq_size);
>> - ipoib_cm_dev_cleanup(dev);
>> - return -ENOMEM;
>> + if (attr.max_srq) {
>> + /* This device supports SRQ */
>> + if (ret = create_srq(dev, priv))
>> + return ret;
>> + priv->cm.rx_index_table = NULL;
>> + } else {
>> + priv->cm.srq = NULL;
>> + priv->cm.srq_ring = NULL;
>> +
>> + /* Every new REQ that arrives creates a struct ipoib_cm_rx.
>> + * These structures form a link list starting with the
>> + * passive_ids. For quick and easy access we maintain a table
>> + * of pointers to struct ipoib_cm_rx called the rx_index_table
>> + */
>
> Why store the structures in a linked list if they're stored in a table?
This linked list is common to both SRQ and NOSRQ. Only the NOSRQ code
uses the table.
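i.e. the table serves the completion path and the list serves the stale scan:

	/* RX completion path (NOSRQ only): constant-time lookup by index */
	rx_ptr = priv->cm.rx_index_table[index];

	/* stale-connection scan, common to SRQ and NOSRQ: walk passive_ids */
	p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);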
>
>> + priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
>> + sizeof *priv->cm.rx_index_table,
>> + GFP_KERNEL);
>> + if (!priv->cm.rx_index_table) {
>> + printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
>> + return -ENOMEM;
>> + }
>> +
>> + spin_lock_init(&nosrq_count.lock);
>> + nosrq_count.current_rc_qp = 0;
>> }
>>
>> for (i = 0; i < IPOIB_CM_RX_SG; ++i)
>> @@ -1331,17 +1741,23 @@ int ipoib_cm_dev_init(struct net_device
>> priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
>> priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
>>
>> - for (i = 0; i < ipoib_recvq_size; ++i) {
>> - if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
>> + /* One can post receive buffers even before the RX QP is created
>> + * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
>> + * and do that in ipoib_cm_req_handler() */
>
> This is separate from this patch, but why not wait to post receives to a SRQ
> only after we've received a REQ? Would this simplify the code any?
Good point. We could think of that in the future.
>
> - Sean
>