[ofa-general] Re: IPOIB CM (NOSRQ)[PATCH V3] patch for review
Michael S. Tsirkin
mst at dev.mellanox.co.il
Tue May 1 23:46:48 PDT 2007
OK, we are making progress (line-wrapping issues aside :). And there seems to
be some whitespace damage, too. Please take care of this.
I think the handle_rx_wc split is going in the right direction,
but let's carry this through the whole datapath.
I went over the patch in a bit more depth, and I have some questions:
> + for (i = 0; i < ipoib_recvq_size; ++i) {
> + if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
...
> + if (ipoib_cm_post_receive(dev, i << 32 | index)) {
1. It seems there are multiple QPs mapped to a single CQ -
and each gets ipoib_recvq_size recv WRs above.
Is that right? How do you prevent CQ overrun then?
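To make the concern concrete, here is a rough sizing sketch. It assumes every receive WR posted on every QP can complete before the CQ is polled; the helper names are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

/* Worst-case number of receive completions outstanding on one CQ that is
 * shared by nqps QPs, each with recvq_size receive WRs posted.
 * Hypothetical helper for illustration only. */
static uint64_t shared_cq_worst_case(uint32_t nqps, uint32_t recvq_size)
{
	return (uint64_t)nqps * recvq_size;
}

/* Returns 1 if a CQ with cq_entries slots could overrun in that worst case. */
static int shared_cq_can_overrun(uint32_t cq_entries, uint32_t nqps,
				 uint32_t recvq_size)
{
	return shared_cq_worst_case(nqps, recvq_size) > cq_entries;
}
```

With a fixed-size CQ, the worst case grows with every new connection, so something has to bound either the number of QPs or the CQ size.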
> + /* Find an empty rx_index_ring[] entry */
> + for (index = 0; index < NOSRQ_INDEX_RING_SIZE; index++)
> + if (priv->cm.rx_index_ring[index] == NULL)
> + break;
> +
> + if ( index == NOSRQ_INDEX_RING_SIZE) {
> + spin_unlock_irq(&priv->lock);
> + printk(KERN_WARNING "NOSRQ supports a max of %d RC "
> + "QPs. That limit has now been reached\n",
> + NOSRQ_INDEX_RING_SIZE);
> + return -EINVAL;
> + }
2. So, when QP limit has been reached, remote side will get
a reject with custom reject reason?
If so, it seems that since the remote does not know
what the reason for reject is, it'll just retry
on the next packet, and again and again. Basically,
connectivity is denied where it previously worked fine
by falling back on datagram mode?
One way to fix this could be to try and use a reject reason
that will tell the remote "I'm busy, switch to datagram mode
for a loooooong time". Using path mtu discovery here might be useful
to actually have it come back and retry after several minutes.
*In theory*, we could get this even with SRQ -
if the *HCA* starts running out of RC QPs - it is just
never happening in practice as current HCAs support #QPs larger
than a maximum IB subnet size.
So I might post a patch to implement this, stay tuned.
> + spin_lock_irqsave(&priv->lock, flags);
> + rx_ptr = priv->cm.rx_index_ring[index];
> + spin_unlock_irqrestore(&priv->lock, flags);
3. You never actually test the rx_ptr that you got.
So why does locking help?
A better way to destroy QPs might be to move it to error state first.
We actually need something like this for CM too - stay tuned for a patch.
I also commented on some style issues below.
> Note 1: I have retained the code to avoid IB_WC_RETRY_EXC_ERR while performing
> interoperability tests As discussed in this mailing list that may be a CM bug or
> have the various HCA address it. Hence I would like to seperate out that issue
> from this patch.
> At a future point when the issue gets resolved I can provide
> another patch to change the retry_count values back to 0 if need be.
The correct way to separate it, in my opinion, is to set retry_count = 0,
and (for now) apply a work-around patch at your site before testing.
We really don't want to paper over this bug, in my opinion.
A general suggestion, before we dive into code: document, first of
all, data structures, then functions.
Rest of code quite often can be made self documenting.
Stuff like if (!srq) /* no SRQ */, and } /* end of loop */
is really not telling us anything useful.
> --- a/linux-2.6.21-rc7/drivers/infiniband/ulp/ipoib/ipoib.h 2007-04-24 18:10:17.000000000 -0700
> +++ b//linux-2.6.21-rc7/drivers/infiniband/ulp/ipoib/ipoib.h 2007-04-25 10:11:34.000000000 -0700
> @@ -99,6 +99,12 @@ enum {
> #define IPOIB_OP_RECV (1ul << 31)
> #ifdef CONFIG_INFINIBAND_IPOIB_CM
> #define IPOIB_CM_OP_SRQ (1ul << 30)
> +#define IPOIB_CM_OP_NOSRQ (1ul << 29)
> +
> +/* These two go hand in hand */
> +#define NOSRQ_INDEX_RING_SIZE 1024
> +#define NOSRQ_INDEX_MASK 0x00000000000003ff
> +
When you need a comment for 2 lines of code is when you
know something's really obscure.
How about
#define NOSRQ_INDEX_MASK (NOSRQ_INDEX_RING_SIZE - 1)
and we can kill the comment?
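A minimal userspace sketch of that idea follows. The helper nosrq_index() is hypothetical; the only real assumption is that the ring size stays a power of two, which the assert checks at run time (in the kernel a compile-time check would be the natural choice).

```c
#include <assert.h>
#include <stdint.h>

#define NOSRQ_INDEX_RING_SIZE 1024
/* Derived from the size, so the two cannot drift apart. Only valid
 * while NOSRQ_INDEX_RING_SIZE is a power of two. */
#define NOSRQ_INDEX_MASK (NOSRQ_INDEX_RING_SIZE - 1)

/* Hypothetical helper: extract the table index from a 64-bit id. */
static uint32_t nosrq_index(uint64_t id)
{
	/* A non-power-of-two size would make the mask wrong. */
	assert((NOSRQ_INDEX_RING_SIZE & (NOSRQ_INDEX_RING_SIZE - 1)) == 0);
	return (uint32_t)(id & NOSRQ_INDEX_MASK);
}
```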
> #else
> #define IPOIB_CM_OP_SRQ (0)
> #endif
> @@ -136,9 +142,11 @@ struct ipoib_cm_data {
> struct ipoib_cm_rx {
> struct ib_cm_id *id;
> struct ib_qp *qp;
> + struct ipoib_cm_rx_buf *rx_ring;
Alignment's broken here.
> struct list_head list;
> struct net_device *dev;
> unsigned long jiffies;
> + u32 index;
index and rx_ring are only valid for non-srq code, right?
I think we need a comment of some kind to tell us this.
> };
>
> struct ipoib_cm_tx {
> @@ -177,6 +185,7 @@ struct ipoib_cm_dev_priv {
> struct ib_wc ibwc[IPOIB_NUM_WC];
> struct ib_sge rx_sge[IPOIB_CM_RX_SG];
> struct ib_recv_wr rx_wr;
> + struct ipoib_cm_rx **rx_index_ring;
> };
>
> /*
Isn't "ring" a bit of a misnomer?
Also - you have multiple QPs mapped to a single CQ - how do you prevent CQ overrun?
> --- a/linux-2.6.21-rc7/drivers/infiniband/ulp/ipoib/ipoib_cm.c 2007-04-24 18:10:17.000000000 -0700
> +++ b//linux-2.6.21-rc7/drivers/infiniband/ulp/ipoib/ipoib_cm.c 2007-04-27 14:03:40.000000000 -0700
> @@ -76,7 +76,7 @@ static void ipoib_cm_dma_unmap_rx(struct
> ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
> }
>
> -static int ipoib_cm_post_receive(struct net_device *dev, int id)
> +static int post_receive_srq(struct net_device *dev, u64 id)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> struct ib_recv_wr *bad_wr;
> @@ -85,13 +85,14 @@ static int ipoib_cm_post_receive(struct
> priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_SRQ;
>
> for (i = 0; i < IPOIB_CM_RX_SG; ++i)
> - priv->cm.rx_sge[i].addr = priv->cm.srq_ring[id].mapping[i];
> + priv->cm.rx_sge[i].addr =
> + priv->cm.srq_ring[id].mapping[i];
The line wasn't too long here, so why wrap it?
And continuation lines need to be shifted *significantly*
to the right.
> ret = ib_post_srq_recv(priv->cm.srq, &priv->cm.rx_wr, &bad_wr);
> if (unlikely(ret)) {
> ipoib_warn(priv, "post srq failed for buf %d (%d)\n", id, ret);
> ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
> - priv->cm.srq_ring[id].mapping);
> + priv->cm.srq_ring[id].mapping);
What's the deal here? This looks like a whitespace-only change.
> dev_kfree_skb_any(priv->cm.srq_ring[id].skb);
> priv->cm.srq_ring[id].skb = NULL;
> }
> @@ -99,12 +100,69 @@ static int ipoib_cm_post_receive(struct
> return ret;
> }
>
> -static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
> +static int post_receive_nosrq(struct net_device *dev, u64 id)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + struct ib_recv_wr *bad_wr;
> + int i, ret;
> + u32 index;
> + u64 wr_id;
> + struct ipoib_cm_rx *rx_ptr;
> + unsigned long flags;
> +
> + index = id & NOSRQ_INDEX_MASK ;
> + wr_id = id >> 32;
So wr_id only ever has its lower 32 bits set - why make it u64 then?
> + /* There is a slender chance of a race between the stale_task
> + * running after a period of inactivity and the receipt of
> + * a packet being processed at about the same instant.
> + * Hence the lock */
I think you can get rid of this, by changing the stale task code:
move QP to error, and wait for WRs posted to complete.
Then there won't be any more completions for this QP.
As it is, I'm not convinced you can't get a completion after
the QP has been removed from the array - so it seems the race hasn't
been solved here?
We actually need something like this for CM too -
stay tuned for a patch.
> + spin_lock_irqsave(&priv->lock, flags);
> + rx_ptr = priv->cm.rx_index_ring[index];
> + spin_unlock_irqrestore(&priv->lock, flags);
> +
> + priv->cm.rx_wr.wr_id = wr_id << 32 | index | IPOIB_CM_OP_NOSRQ;
Isn't this just id, again?
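To spell out the point, here is a small userspace sketch of the id layout the patch appears to use: buffer number in the upper 32 bits, table index in the low bits. The helper names are hypothetical, and 1ull stands in for the patch's 1ul.

```c
#include <assert.h>
#include <stdint.h>

#define NOSRQ_INDEX_RING_SIZE 1024
#define NOSRQ_INDEX_MASK      (NOSRQ_INDEX_RING_SIZE - 1)
#define IPOIB_CM_OP_NOSRQ     (1ull << 29)

/* Pack a per-QP buffer number and a QP table index into one id. */
static uint64_t nosrq_pack(uint32_t buf, uint32_t index)
{
	return ((uint64_t)buf << 32) | (index & NOSRQ_INDEX_MASK);
}

/* The buffer number fits in 32 bits, so u32 is enough to hold it. */
static uint32_t nosrq_buf(uint64_t id)
{
	return (uint32_t)(id >> 32);
}

static uint32_t nosrq_index(uint64_t id)
{
	return (uint32_t)(id & NOSRQ_INDEX_MASK);
}

/* Rebuild the work request id the way post_receive_nosrq() does. */
static uint64_t nosrq_rebuild(uint64_t id)
{
	return ((uint64_t)nosrq_buf(id) << 32) | nosrq_index(id) |
	       IPOIB_CM_OP_NOSRQ;
}
```

As long as id carries nothing but those two fields, splitting it and gluing it back together gives id with the flag bit set, so `id | IPOIB_CM_OP_NOSRQ` would do the same job.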
> + for (i = 0; i < IPOIB_CM_RX_SG; ++i)
> + priv->cm.rx_sge[i].addr = rx_ptr->rx_ring[wr_id].mapping[i];
> +
> + ret = ib_post_recv(rx_ptr->qp, &priv->cm.rx_wr, &bad_wr);
> + if (unlikely(ret)) {
> + ipoib_warn(priv, "post recv failed for buf %d (%d)\n",
> + wr_id, ret);
> + ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
> + rx_ptr->rx_ring[wr_id].mapping);
> + dev_kfree_skb_any(rx_ptr->rx_ring[wr_id].skb);
> + rx_ptr->rx_ring[wr_id].skb = NULL;
> + }
> +
> + return ret;
> +}
> +
> +static int ipoib_cm_post_receive(struct net_device *dev, u64 id)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + int ret;
> +
> + if (priv->cm.srq)
> + ret = post_receive_srq(dev, id);
> + else
> + ret = post_receive_nosrq(dev, id);
> +
> + return ret;
> +}
I think you can split this one now that srq/nonsrq completions are
handled separately.
> +static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, u64 id,
> + int frags,
> u64 mapping[IPOIB_CM_RX_SG])
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> struct sk_buff *skb;
> int i;
> + struct ipoib_cm_rx *rx_ptr;
> + u32 index, wr_id;
> + unsigned long flags;
>
> skb = dev_alloc_skb(IPOIB_CM_HEAD_SIZE + 12);
> if (unlikely(!skb))
> @@ -123,7 +181,7 @@ static struct sk_buff *ipoib_cm_alloc_rx
> return NULL;
> }
>
> - for (i = 0; i < frags; i++) {
> + for (i = 0; i < frags; i++) {
What's the deal here? This looks like a whitespace-only change.
> struct page *page = alloc_page(GFP_ATOMIC);
>
> if (!page)
> @@ -136,7 +194,17 @@ static struct sk_buff *ipoib_cm_alloc_rx
> goto partial_error;
> }
>
> - priv->cm.srq_ring[id].skb = skb;
> + if (priv->cm.srq)
> + priv->cm.srq_ring[id].skb = skb;
> + else {
> + index = id & NOSRQ_INDEX_MASK ;
> + wr_id = id >> 32;
> + spin_lock_irqsave(&priv->lock, flags);
> + rx_ptr = priv->cm.rx_index_ring[index];
> + spin_unlock_irqrestore(&priv->lock, flags);
See above about the locking here. Try to get rid of it - this is datapath.
> +
> + rx_ptr->rx_ring[wr_id].skb = skb;
> + }
> return skb;
>
> partial_error:
A branch on datapath just for 2 lines that are different
is not worth it. Just keep common code in ipoib_cm_alloc_rx,
and move lines that differ to the site of call.
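Roughly what I have in mind, as a userspace sketch: struct buf and struct rx_slot are stand-ins for the skb and the ring entry, and the names are made up for illustration.

```c
#include <assert.h>
#include <stdlib.h>

struct buf { int len; };		/* stand-in for struct sk_buff */
struct rx_slot { struct buf *skb; };	/* stand-in for a ring entry */

/* Common part only: allocate and set up the buffer. No SRQ test here. */
static struct buf *alloc_rx_buf(int len)
{
	struct buf *b = malloc(sizeof *b);

	if (b)
		b->len = len;
	return b;
}

/* The caller knows which ring it owns and stores the result itself. */
static int refill_slot(struct rx_slot *ring, unsigned int id, int len)
{
	struct buf *b = alloc_rx_buf(len);

	if (!b)
		return -1;
	ring[id].skb = b;
	return 0;
}
```

The SRQ path would pass its srq_ring and the non-SRQ path its per-QP rx_ring, so the allocator itself needs neither the branch nor the lock.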
> @@ -157,13 +225,20 @@ static struct ib_qp *ipoib_cm_create_rx_
> struct ib_qp_init_attr attr = {
> .send_cq = priv->cq, /* does not matter, we never send anything */
> .recv_cq = priv->cq,
> - .srq = priv->cm.srq,
> .cap.max_send_wr = 1, /* FIXME: 0 Seems not to work */
> + .cap.max_recv_wr = ipoib_recvq_size + 1,
> .cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
> + .cap.max_recv_sge = IPOIB_CM_RX_SG, /* Is this correct? */
I don't think we should set both attr.srq and max_recv_sge
for a QP connected to SRQ.
> .sq_sig_type = IB_SIGNAL_ALL_WR,
> .qp_type = IB_QPT_RC,
> .qp_context = p,
> };
> +
> + if (priv->cm.srq)
> + attr.srq = priv->cm.srq;
> + else
> + attr.srq = NULL;
Since attr has an initializer, attr.srq is already 0
unless you set it.
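This is plain C behaviour and easy to check in userspace. In the sketch below struct qp_init_attr is a cut-down stand-in for struct ib_qp_init_attr and make_attr() is a hypothetical helper.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for struct ib_qp_init_attr; only the shape matters here. */
struct qp_init_attr {
	void *send_cq;
	void *recv_cq;
	void *srq;
	int   max_recv_wr;
};

/* Members not named in a designated initializer are zero-initialized,
 * so srq is NULL here without an explicit "else attr.srq = NULL". */
static struct qp_init_attr make_attr(void *cq, void *srq_or_null)
{
	struct qp_init_attr attr = {
		.send_cq     = cq,
		.recv_cq     = cq,
		.max_recv_wr = 1,
	};

	if (srq_or_null)
		attr.srq = srq_or_null;
	return attr;
}
```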
> +
> return ib_create_qp(priv->pd, &attr);
> }
>
> @@ -198,6 +273,7 @@ static int ipoib_cm_modify_rx_qp(struct
> ipoib_warn(priv, "failed to modify QP to RTR: %d\n", ret);
> return ret;
> }
> +
Kill this.
> return 0;
> }
>
> @@ -217,12 +293,87 @@ static int ipoib_cm_send_rep(struct net_
> rep.flow_control = 0;
> rep.rnr_retry_count = req->rnr_retry_count;
> rep.target_ack_delay = 20; /* FIXME */
> - rep.srq = 1;
> rep.qp_num = qp->qp_num;
> rep.starting_psn = psn;
> +
> + if (priv->cm.srq)
> + rep.srq = 1;
> + else
> + rep.srq = 0;
> return ib_send_cm_rep(cm_id, &rep);
> }
>
> +int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id, struct ipoib_cm_rx *p, unsigned psn)
This one's too long I think.
> +{
> + struct net_device *dev = cm_id->context;
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + int ret;
> + u32 qp_num, index;
> + u64 i;
> +
> + qp_num = p->qp->qp_num;
> + /* Allocate space for the rx_ring here */
You mostly want to kill such comments - they take up code lines
and don't really tell anything useful.
> + p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring,
> + GFP_KERNEL);
> + if (p->rx_ring == NULL)
> + return -ENOMEM;
> +
> + cm_id->context = p;
> + p->jiffies = jiffies;
> + spin_lock_irq(&priv->lock);
> + list_add(&p->list, &priv->cm.passive_ids);
> +
> + /* Find an empty rx_index_ring[] entry */
And this.
> + for (index = 0; index < NOSRQ_INDEX_RING_SIZE; index++)
> + if (priv->cm.rx_index_ring[index] == NULL)
> + break;
No == NULL tests please.
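For example, the search could look like the sketch below. It is a userspace illustration only; find_free_slot() is a hypothetical name and struct ipoib_cm_rx is left opaque.

```c
#include <assert.h>
#include <stddef.h>

#define NOSRQ_INDEX_RING_SIZE 1024

struct ipoib_cm_rx;	/* opaque here; only pointers are stored */

/* Return the first free slot, or -1 if the table is full.
 * Tests "!ptr" rather than "ptr == NULL". */
static int find_free_slot(struct ipoib_cm_rx **table)
{
	int index;

	for (index = 0; index < NOSRQ_INDEX_RING_SIZE; index++)
		if (!table[index])
			return index;
	return -1;
}
```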
> +
> + if ( index == NOSRQ_INDEX_RING_SIZE) {
> + spin_unlock_irq(&priv->lock);
> + printk(KERN_WARNING "NOSRQ supports a max of %d RC "
> + "QPs. That limit has now been reached\n",
> + NOSRQ_INDEX_RING_SIZE);
> + return -EINVAL;
> + }
So, when QP limit has been reached, connectivity is denied
where it previously worked fine in datagram mode?
This looks like an important regression.
> + /* Store the pointer to retrieve it later using the index */
Kill this too.
> + priv->cm.rx_index_ring[index] = p;
> + spin_unlock_irq(&priv->lock);
> + p->index = index;
> +
> + ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
> + if (ret) {
> + ipoib_warn(priv, "ipoib_cm_modify_rx_qp() failed %d\n", ret);
> + goto err_modify_nosrq;
> + }
> +
> + for (i = 0; i < ipoib_recvq_size; ++i) {
> + if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
> + IPOIB_CM_RX_SG - 1,
> + p->rx_ring[i].mapping)) {
> + ipoib_warn(priv, "failed to allocate receive "
> + "buffer %d\n", i);
> + ipoib_cm_dev_cleanup(dev);
> + /* Free rx_ring previously allocated */
And this.
> + kfree(p->rx_ring);
> + return -ENOMEM;
> + }
> +
> + /* Can we call the nosrq version? */
what's the deal here?
> + if (ipoib_cm_post_receive(dev, i << 32 | index)) {
> + ipoib_warn(priv, "ipoib_ib_post_receive "
> + "failed for buf %d\n", i);
> + ipoib_cm_dev_cleanup(dev);
> + return -EIO;
> + }
> + } /* end for */
And surely this.
> +
> + return 0;
> +
> +err_modify_nosrq:
> + return ret;
> +}
> +
> static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
> {
> struct net_device *dev = cm_id->context;
> @@ -243,10 +394,17 @@ static int ipoib_cm_req_handler(struct i
> goto err_qp;
> }
>
> - psn = random32() & 0xffffff;
> - ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
> - if (ret)
> - goto err_modify;
> + if (priv->cm.srq == NULL) { /* NOSRQ */
No == NULL tests please. Also - what does the comment tell us
that we don't already know?
> + psn = random32() & 0xffffff;
random call could be in common code?
> + if (ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn))
> + goto err_modify;
> + } else { /* SRQ */
What does the comment tell us that we don't already know?
> + p->rx_ring = NULL; /* This is used only by NOSRQ */
> + psn = random32() & 0xffffff;
> + ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
> + if (ret)
> + goto err_modify;
> + }
>
> ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
> if (ret) {
> @@ -254,11 +412,13 @@ static int ipoib_cm_req_handler(struct i
> goto err_rep;
> }
>
> - cm_id->context = p;
> - p->jiffies = jiffies;
> - spin_lock_irq(&priv->lock);
> - list_add(&p->list, &priv->cm.passive_ids);
> - spin_unlock_irq(&priv->lock);
> + if (priv->cm.srq) {
> + cm_id->context = p;
> + p->jiffies = jiffies;
> + spin_lock_irq(&priv->lock);
> + list_add(&p->list, &priv->cm.passive_ids);
> + spin_unlock_irq(&priv->lock);
> + }
> queue_delayed_work(ipoib_workqueue,
> &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
> return 0;
> @@ -339,23 +499,40 @@ static void skb_put_frags(struct sk_buff
> }
> }
>
> -void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
> +static void timer_check(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
> +{
> + unsigned long flags;
> +
> + if (time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
Now that it's a separate function, we can
if (time_before(....))
return;
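A userspace sketch of the early-return form follows. The two macros are stand-ins written to behave like the kernel's wrap-safe jiffies comparisons, RX_UPDATE_TIME is a placeholder value, and the locking and list handling from the patch are left out.

```c
#include <assert.h>

/* Userspace stand-ins for the kernel's wrap-safe jiffies comparisons. */
#define time_after_eq(a, b)  ((long)((a) - (b)) >= 0)
#define time_before(a, b)    ((long)((a) - (b)) < 0)

#define RX_UPDATE_TIME 256UL	/* placeholder for IPOIB_CM_RX_UPDATE_TIME */

struct rx {
	unsigned long jiffies;	/* time of last refresh */
	int refreshed;		/* counts refreshes, for the demo only */
};

/* The common "nothing to do" case leaves at once, and the body loses
 * one level of nesting. */
static void timer_check(struct rx *p, unsigned long now)
{
	if (time_before(now, p->jiffies + RX_UPDATE_TIME))
		return;

	p->jiffies = now;
	p->refreshed++;
}
```

time_before(a, b) is the exact negation of time_after_eq(a, b), so the behaviour is unchanged, including across a jiffies wrap.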
> + spin_lock_irqsave(&priv->lock, flags);
> + p->jiffies = jiffies;
> + /* Move this entry to list head, but do
> + * not re-add it if it has been removed. */
> + if (!list_empty(&p->list))
> + list_move(&p->list, &priv->cm.passive_ids);
> + spin_unlock_irqrestore(&priv->lock, flags);
> + queue_delayed_work(ipoib_workqueue,
> + &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
> + }
> +}
> +static int handle_rx_wc_srq(struct net_device *dev, struct ib_wc *wc)
Why is making this an int a good idea?
You aren't doing anything useful with this down the line.
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> - unsigned int wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
> struct sk_buff *skb, *newskb;
> + u64 mapping[IPOIB_CM_RX_SG], wr_id;
> struct ipoib_cm_rx *p;
> unsigned long flags;
> - u64 mapping[IPOIB_CM_RX_SG];
> - int frags;
> + int frags, ret;
> +
> + wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
I like initializing the variable at the declaration site.
If you want to change the style, maybe make it a separate patch?
> ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
> wr_id, wc->status);
>
> if (unlikely(wr_id >= ipoib_recvq_size)) {
> - ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
> - wr_id, ipoib_recvq_size);
> - return;
> + ipoib_warn(priv, "cm recv completion event with wrid %d "
> + "(> %d)\n", wr_id, ipoib_recvq_size);
the line wasn't too long before, so why wrap it?
> + return 1;
> }
>
> skb = priv->cm.srq_ring[wr_id].skb;
> @@ -365,22 +542,12 @@ void ipoib_cm_handle_rx_wc(struct net_de
> "(status=%d, wrid=%d vend_err %x)\n",
> wc->status, wr_id, wc->vendor_err);
> ++priv->stats.rx_dropped;
> - goto repost;
> + goto repost_srq;
> }
>
> if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
> p = wc->qp->qp_context;
> - if (time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
> - spin_lock_irqsave(&priv->lock, flags);
> - p->jiffies = jiffies;
> - /* Move this entry to list head, but do
> - * not re-add it if it has been removed. */
> - if (!list_empty(&p->list))
> - list_move(&p->list, &priv->cm.passive_ids);
> - spin_unlock_irqrestore(&priv->lock, flags);
> - queue_delayed_work(ipoib_workqueue,
> - &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
> - }
> + timer_check(priv, p);
> }
>
> frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
> @@ -388,22 +555,119 @@ void ipoib_cm_handle_rx_wc(struct net_de
>
> newskb = ipoib_cm_alloc_rx_skb(dev, wr_id, frags, mapping);
> if (unlikely(!newskb)) {
> - /*
> - * If we can't allocate a new RX buffer, dump
> - * this packet and reuse the old buffer.
> - */
> - ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
> + /*
> + * If we can't allocate a new RX buffer, dump
> + * this packet and reuse the old buffer.
> + */
> + ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
> + ++priv->stats.rx_dropped;
> + goto repost_srq;
> + }
> +
> + ipoib_cm_dma_unmap_rx(priv, frags,
> + priv->cm.srq_ring[wr_id].mapping);
> + memcpy(priv->cm.srq_ring[wr_id].mapping, mapping,
> + (frags + 1) * sizeof *mapping);
> + ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
> + wc->byte_len, wc->slid);
> +
> + skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
> +
> + skb->protocol = ((struct ipoib_header *) skb->data)->proto;
> + skb->mac.raw = skb->data;
> + skb_pull(skb, IPOIB_ENCAP_LEN);
> +
> + dev->last_rx = jiffies;
> + ++priv->stats.rx_packets;
> + priv->stats.rx_bytes += skb->len;
> +
> + skb->dev = dev;
> + /* XXX get correct PACKET_ type here */
> + skb->pkt_type = PACKET_HOST;
> +
> + netif_rx_ni(skb);
> +
> +repost_srq:
Labels don't need to be unique cross-function.
So you can call this one repost:
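Labels have function scope in C, which a toy example shows. The two handlers below are hypothetical and only mimic the control flow.

```c
#include <assert.h>

/* Both functions use a label named "repost". Labels are scoped to the
 * enclosing function, so the names do not clash. */
static int handle_srq(int status, int *reposted)
{
	int delivered = 0;

	if (status != 0)
		goto repost;
	delivered = 1;
repost:
	(*reposted)++;
	return delivered;
}

static int handle_nosrq(int status, int *reposted)
{
	int delivered = 0;

	if (status != 0)
		goto repost;
	delivered = 1;
repost:
	(*reposted)++;
	return delivered;
}
```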
> + ret = ipoib_cm_post_receive(dev, wr_id);
> +
> + if (unlikely(ret)) {
> + ipoib_warn(priv, "ipoib_cm_post_receive failed for buf %d\n",
> + wr_id);
> + return 1;
> + }
> +
> + return 0;
> +
> +}
> +
> +static int handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + struct sk_buff *skb, *newskb;
> + u64 mapping[IPOIB_CM_RX_SG], wr_id;
> + u32 index;
> + struct ipoib_cm_rx *p, *rx_ptr;
> + unsigned long flags;
> + int frags, ret;
> +
> +
> + wr_id = wc->wr_id >> 32;
> +
> + ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
> + wr_id, wc->status);
> +
> + if (unlikely(wr_id >= ipoib_recvq_size)) {
> + ipoib_warn(priv, "cm recv completion event with wrid %d "
> + "(> %d)\n", wr_id, ipoib_recvq_size);
> + return 1;
> + }
> +
> + index = (wc->wr_id & ~IPOIB_CM_OP_NOSRQ) & NOSRQ_INDEX_MASK ;
> + spin_lock_irqsave(&priv->lock, flags);
> + rx_ptr = priv->cm.rx_index_ring[index];
> + spin_unlock_irqrestore(&priv->lock, flags);
> +
> + skb = rx_ptr->rx_ring[wr_id].skb;
> +
> + if (unlikely(wc->status != IB_WC_SUCCESS)) {
> + ipoib_dbg(priv, "cm recv error "
> + "(status=%d, wrid=%d vend_err %x)\n",
> + wc->status, wr_id, wc->vendor_err);
> ++priv->stats.rx_dropped;
> - goto repost;
> + goto repost_nosrq;
> }
>
> - ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
> - memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
> + if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
> + /* There are no guarantees that wc->qp is not NULL for HCAs
> + * that do not support SRQ. */
> + p = rx_ptr;
> + timer_check(priv, p);
> + }
> +
> + frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
> + (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
> +
> + newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
> + mapping);
> + if (unlikely(!newskb)) {
> + /*
> + * If we can't allocate a new RX buffer, dump
> + * this packet and reuse the old buffer.
> + */
> + ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
> + ++priv->stats.rx_dropped;
> + goto repost_nosrq;
> + }
> +
> + ipoib_cm_dma_unmap_rx(priv, frags,
> + rx_ptr->rx_ring[wr_id].mapping);
> + memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
> + (frags + 1) * sizeof *mapping);
>
> ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
> wc->byte_len, wc->slid);
>
> - skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
> + skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
>
> skb->protocol = ((struct ipoib_header *) skb->data)->proto;
> skb->mac.raw = skb->data;
> @@ -416,12 +680,34 @@ void ipoib_cm_handle_rx_wc(struct net_de
> skb->dev = dev;
> /* XXX get correct PACKET_ type here */
> skb->pkt_type = PACKET_HOST;
> +
> netif_rx_ni(skb);
>
> -repost:
> - if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
> - ipoib_warn(priv, "ipoib_cm_post_receive failed "
> - "for buf %d\n", wr_id);
> +repost_nosrq:
Labels don't need to be unique cross-function.
So you can call this one repost:
> + ret = ipoib_cm_post_receive(dev, wr_id << 32 | index);
> +
> + if (unlikely(ret)) {
> + ipoib_warn(priv, "ipoib_cm_post_receive failed for buf %d\n",
> + wr_id);
> + return 1;
> + }
> +
> + return 0;
> +}
> +
> +void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + int ret;
> +
> +
> + if (priv->cm.srq)
> + ret = handle_rx_wc_srq(dev, wc);
> + else
> + ret = handle_rx_wc_nosrq(dev, wc);
> +
> + if (unlikely(ret))
> + ipoib_warn(priv, "Error processing rx wc\n");
> }
See below about this.
> static inline int post_send(struct ipoib_dev_priv *priv,
> @@ -606,6 +892,22 @@ int ipoib_cm_dev_open(struct net_device
> return 0;
> }
>
> +static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
> +{
I suggest you lose the _nosrq suffix and just do
if (priv->cm.srq)
return;
at the top of the function.
> + int i;
> +
> + for(i = 0; i < ipoib_recvq_size; ++i)
> + if(p->rx_ring[i].skb) {
> + ipoib_cm_dma_unmap_rx(priv,
> + IPOIB_CM_RX_SG - 1,
> + p->rx_ring[i].mapping);
> + dev_kfree_skb_any(p->rx_ring[i].skb);
> + p->rx_ring[i].skb = NULL;
> + }
> + kfree(p->rx_ring);
> +}
> +
> +
Lose the double empty lines.
> void ipoib_cm_dev_stop(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> @@ -618,6 +920,8 @@ void ipoib_cm_dev_stop(struct net_device
> spin_lock_irq(&priv->lock);
> while (!list_empty(&priv->cm.passive_ids)) {
> p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
> + if (priv->cm.srq == NULL)
> + free_resources_nosrq(priv, p);
No == NULL tests please.
> list_del_init(&p->list);
> spin_unlock_irq(&priv->lock);
> ib_destroy_cm_id(p->id);
> @@ -703,9 +1007,14 @@ static struct ib_qp *ipoib_cm_create_tx_
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> struct ib_qp_init_attr attr = {};
> attr.recv_cq = priv->cq;
> - attr.srq = priv->cm.srq;
> + if (priv->cm.srq)
> + attr.srq = priv->cm.srq;
> + else
> + attr.srq = NULL;
> attr.cap.max_send_wr = ipoib_sendq_size;
> + attr.cap.max_recv_wr = 1; /* Not in MST code */
> attr.cap.max_send_sge = 1;
> + attr.cap.max_recv_sge = 1; /* Not in MST code */
I don't think we should set both attr.srq and max_recv_sge
for a QP connected to SRQ.
> attr.sq_sig_type = IB_SIGNAL_ALL_WR;
> attr.qp_type = IB_QPT_RC;
> attr.send_cq = cq;
> @@ -742,10 +1051,13 @@ static int ipoib_cm_send_req(struct net_
> req.responder_resources = 4;
> req.remote_cm_response_timeout = 20;
> req.local_cm_response_timeout = 20;
> - req.retry_count = 0; /* RFC draft warns against retries */
> - req.rnr_retry_count = 0; /* RFC draft warns against retries */
> + req.retry_count = 6; /* RFC draft warns against retries */
> + req.rnr_retry_count = 6;/* RFC draft warns against retries */
> req.max_cm_retries = 15;
> - req.srq = 1;
> + if (priv->cm.srq)
> + req.srq = 1;
> + else
> + req.srq = 0;
> return ib_send_cm_req(id, &req);
> }
>
> @@ -1089,6 +1401,10 @@ static void ipoib_cm_stale_task(struct w
> p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
> if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
> break;
> + if (priv->cm.srq == NULL) { /* NOSRQ */
No == NULL tests please. Also - what does the comment tell us?
> + free_resources_nosrq(priv, p);
> + priv->cm.rx_index_ring[p->index] = NULL;
> + }
> list_del_init(&p->list);
> spin_unlock_irq(&priv->lock);
> ib_destroy_cm_id(p->id);
> @@ -1143,16 +1459,40 @@ int ipoib_cm_add_mode_attr(struct net_de
> return device_create_file(&dev->dev, &dev_attr_mode);
> }
>
> +static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
> +{
> + struct ib_srq_init_attr srq_init_attr;
> + int ret;
> +
> + srq_init_attr.attr.max_wr = ipoib_recvq_size;
> + srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
> +
> + priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
> + if (IS_ERR(priv->cm.srq)) {
> + ret = PTR_ERR(priv->cm.srq);
> + priv->cm.srq = NULL;
> + return ret;
> + }
> +
> + priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
> + sizeof *priv->cm.srq_ring,
> + GFP_KERNEL);
> + if (!priv->cm.srq_ring) {
> + printk(KERN_WARNING "%s: failed to allocate CM ring "
> + "(%d entries)\n",
> + priv->ca->name, ipoib_recvq_size);
> + ipoib_cm_dev_cleanup(dev);
Since you have separated create_srq from cm_dev_init,
calling ipoib_cm_dev_cleanup from it looks wrong.
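One possible shape, sketched in userspace: the helper undoes only what it set up itself and leaves the wider teardown to its caller. The allocator parameter and alloc_fail() are test scaffolding I made up so the failure path can be exercised; malloc and free stand in for ib_create_srq() and ib_destroy_srq().

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

typedef void *(*alloc_fn)(size_t size);

struct cm {
	void *srq;
	void *srq_ring;
};

/* Test double: an allocator that always fails. */
static void *alloc_fail(size_t size)
{
	(void)size;
	return NULL;
}

/* On failure, release only what this function acquired. */
static int create_srq(struct cm *cm, alloc_fn alloc_ring, size_t entries)
{
	cm->srq = malloc(1);	/* stands in for ib_create_srq() */
	if (!cm->srq)
		return -ENOMEM;

	cm->srq_ring = alloc_ring(entries * sizeof(void *));
	if (!cm->srq_ring) {
		free(cm->srq);	/* stands in for ib_destroy_srq() */
		cm->srq = NULL;
		return -ENOMEM;
	}
	return 0;
}
```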
> + return -ENOMEM;
> + }
> +
> + return 0;
> +}
> +
> int ipoib_cm_dev_init(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> - struct ib_srq_init_attr srq_init_attr = {
> - .attr = {
> - .max_wr = ipoib_recvq_size,
> - .max_sge = IPOIB_CM_RX_SG
> - }
> - };
> - int ret, i;
> + int ret, i, supports_srq;
> + struct ib_device_attr attr;
>
> INIT_LIST_HEAD(&priv->cm.passive_ids);
> INIT_LIST_HEAD(&priv->cm.reap_list);
> @@ -1164,21 +1504,26 @@ int ipoib_cm_dev_init(struct net_device
>
> skb_queue_head_init(&priv->cm.skb_queue);
>
> - priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
> - if (IS_ERR(priv->cm.srq)) {
> - ret = PTR_ERR(priv->cm.srq);
> - priv->cm.srq = NULL;
> + if (ret = ib_query_device(priv->ca, &attr))
> return ret;
I think a cleaner way would be to just test device->create_srq.
> + if (attr.max_srq)
> + supports_srq = 1; /* This device supports SRQ */
> + else {
> + supports_srq = 0;
> }
>
> - priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
> - GFP_KERNEL);
> - if (!priv->cm.srq_ring) {
> - printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
> - priv->ca->name, ipoib_recvq_size);
> - ipoib_cm_dev_cleanup(dev);
> - return -ENOMEM;
> - }
> + if (supports_srq) {
> + if (ret = create_srq(dev, priv))
> + return ret;
> +
> + priv->cm.rx_index_ring = NULL; /* Not needed for SRQ */
> + } else {
> + priv->cm.srq = NULL;
> + priv->cm.srq_ring = NULL;
> + priv->cm.rx_index_ring = kzalloc(NOSRQ_INDEX_RING_SIZE *
> + sizeof *priv->cm.rx_index_ring,
> + GFP_KERNEL);
> + }
>
> for (i = 0; i < IPOIB_CM_RX_SG; ++i)
> priv->cm.rx_sge[i].lkey = priv->mr->lkey;
Do we really need the supports_srq variable? It's only used once ...
> @@ -1190,19 +1535,25 @@ int ipoib_cm_dev_init(struct net_device
> priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
> priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
>
> - for (i = 0; i < ipoib_recvq_size; ++i) {
> - if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
> + /* One can post receive buffers even before the RX QP is created
> + * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
> + * and do that in ipoib_cm_req_handler() */
> +
> + if (priv->cm.srq) {
> + for (i = 0; i < ipoib_recvq_size; ++i) {
> + if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
> priv->cm.srq_ring[i].mapping)) {
> - ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
> - ipoib_cm_dev_cleanup(dev);
> - return -ENOMEM;
> - }
> - if (ipoib_cm_post_receive(dev, i)) {
> - ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
> - ipoib_cm_dev_cleanup(dev);
> - return -EIO;
> + ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
> + ipoib_cm_dev_cleanup(dev);
> + return -ENOMEM;
> + }
> + if (ipoib_cm_post_receive(dev, i)) {
> + ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
> + ipoib_cm_dev_cleanup(dev);
> + return -EIO;
> + }
> }
> - }
> + } /* if SRQ */
>
> priv->dev->dev_addr[0] = IPOIB_FLAGS_RC;
> return 0;
When you start adding /* if SRQ */ comments near the closing bracket,
you know your nesting is too deep.
How about
if (!priv->cm.srq)
goto done;
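In toy form the flattened control flow looks like this. struct cm and its fields are placeholders, and the loop body stands in for the alloc and post of each receive buffer.

```c
#include <assert.h>

#define RECVQ_SIZE 4	/* placeholder for ipoib_recvq_size */

struct cm {
	int has_srq;
	int posted;
	int flags;
};

/* Skip the SRQ-only buffer posting with an early jump instead of
 * wrapping the whole loop in "if (srq) { ... }". */
static int cm_dev_init(struct cm *cm)
{
	int i;

	if (!cm->has_srq)
		goto done;

	for (i = 0; i < RECVQ_SIZE; ++i)
		cm->posted++;	/* stands in for alloc + post of buffer i */

done:
	cm->flags = 1;		/* stands in for setting IPOIB_FLAGS_RC */
	return 0;
}
```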
> --- a/linux-2.6.21-rc7/drivers/infiniband/ulp/ipoib/ipoib_ib.c 2007-04-24 18:10:17.000000000 -0700
> +++ b//linux-2.6.21-rc7/drivers/infiniband/ulp/ipoib/ipoib_ib.c 2007-04-25 10:11:34.000000000 -0700
> @@ -282,7 +282,7 @@ static void ipoib_ib_handle_tx_wc(struct
>
> static void ipoib_ib_handle_wc(struct net_device *dev, struct ib_wc *wc)
> {
> - if (wc->wr_id & IPOIB_CM_OP_SRQ)
> + if ((wc->wr_id & IPOIB_CM_OP_SRQ) || (wc->wr_id & IPOIB_CM_OP_NOSRQ))
> ipoib_cm_handle_rx_wc(dev, wc);
> else if (wc->wr_id & IPOIB_OP_RECV)
> ipoib_ib_handle_rx_wc(dev, wc);
So you have a branch on IPOIB_CM_OP_NOSRQ here, and you have
a branch on priv->cm.srq down the line.
What I suggest instead is to split ipoib_ib_completion into SRQ/non-SRQ
variants, which will completely avoid the extra branch cost at runtime.
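A userspace sketch of the idea: choose the handler once at setup time, so the per-completion path carries no SRQ test. All names below are hypothetical; in the driver the choice would presumably be made where the completion callback is registered, which is an assumption on my part.

```c
#include <assert.h>

struct dev;
struct wc { unsigned long long wr_id; };
typedef void (*completion_fn)(struct dev *dev, const struct wc *wc);

struct dev {
	completion_fn handle_rx;	/* chosen once, at setup time */
	int srq_count;
	int nosrq_count;
};

static void handle_rx_wc_srq(struct dev *dev, const struct wc *wc)
{
	(void)wc;
	dev->srq_count++;
}

static void handle_rx_wc_nosrq(struct dev *dev, const struct wc *wc)
{
	(void)wc;
	dev->nosrq_count++;
}

/* Decide once; the hot path then just calls dev->handle_rx. */
static void dev_setup(struct dev *dev, int has_srq)
{
	dev->handle_rx = has_srq ? handle_rx_wc_srq : handle_rx_wc_nosrq;
	dev->srq_count = 0;
	dev->nosrq_count = 0;
}
```

The handlers return void here since nothing useful is done with a return value down the line.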
--
MST