[ofa-general] IPoIB CM (NOSRQ) [PATCH 1] review
Sean Hefty
mshefty at ichips.intel.com
Mon Sep 17 12:11:36 PDT 2007
Copied from the web archive; I didn't have this in my inbox anymore.
I should also mention that I'm not up on all of the IPoIB RFCs, so if
some of my comments don't apply to an RFC, just ignore them. :)
Most of these comments are about code organization, to make the with-
and without-SRQ code paths cleaner.
> --- a/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib.h 2007-08-20 17:39:25.000000000 -0400
> +++ b/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib.h 2007-08-20 17:49:14.000000000 -0400
> @@ -95,11 +95,14 @@ enum {
> IPOIB_MCAST_FLAG_ATTACHED = 3,
> };
>
> +#define CM_PACKET_SIZE (ALIGN(IPOIB_CM_MTU, PAGE_SIZE))
> #define IPOIB_OP_RECV (1ul << 31)
Inserting a blank line here...
> #ifdef CONFIG_INFINIBAND_IPOIB_CM
> -#define IPOIB_CM_OP_SRQ (1ul << 30)
> +#define IPOIB_CM_OP_RECV (1ul << 30)
> +
...but not here, would make it easier to follow the ifdefs.
> +#define NOSRQ_INDEX_TABLE_SIZE 128
I'm not fond of this name. It's really just the default number of
supported connected QPs. The value is only used in one place - to set
max_rc_qp, which is a module parameter anyway. We can just remove this
definition.
> #else
> -#define IPOIB_CM_OP_SRQ (0)
> +#define IPOIB_CM_OP_RECV (0)
> #endif
>
> /* structs */
> @@ -166,11 +169,14 @@ enum ipoib_cm_state {
> };
>
> struct ipoib_cm_rx {
> - struct ib_cm_id *id;
> - struct ib_qp *qp;
> - struct list_head list;
> - struct net_device *dev;
> - unsigned long jiffies;
> + struct ib_cm_id *id;
> + struct ib_qp *qp;
> + struct ipoib_cm_rx_buf *rx_ring; /* Used by NOSRQ only */
Nit: can we use 'no SRQ' or 'without SRQ', rather than 'NOSRQ' as a
single string? Or, alternatively, only call out when SRQ is in use?
> + struct list_head list;
> + struct net_device *dev;
> + unsigned long jiffies;
> + u32 index; /* wr_ids are distinguished by index
> + * to identify the QP -NOSRQ only */
> enum ipoib_cm_state state;
> };
>
> @@ -215,6 +221,8 @@ struct ipoib_cm_dev_priv {
> struct ib_wc ibwc[IPOIB_NUM_WC];
> struct ib_sge rx_sge[IPOIB_CM_RX_SG];
> struct ib_recv_wr rx_wr;
> + struct ipoib_cm_rx **rx_index_table; /* See ipoib_cm_dev_init()
> + *for usage of this element */
Just call this rx_table. We're not storing indices...
Also, linking the entries would avoid the linear search through the
table. E.g.
struct ipoib_cm_rx_entry {
	int next;			/* -1 = end of list */
	struct ipoib_cm_rx *rx;
};

struct ipoib_cm_dev_priv {
	...
	int free_entry;			/* -1 = none free */
	struct ipoib_cm_rx_entry *rx_table;
	...
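A rough sketch of how that free list could be used (untested; names
follow the sketch above, and the 'reject' label just stands in for the
existing REJ path):

	/* at init: chain every entry onto the free list */
	for (i = 0; i < max_rc_qp; i++)
		priv->cm.rx_table[i].next = (i + 1 < max_rc_qp) ? i + 1 : -1;
	priv->cm.free_entry = 0;

	/* in the REQ handler, under priv->lock: grab a free slot in O(1) */
	index = priv->cm.free_entry;
	if (index == -1)
		goto reject;			/* table is full */
	priv->cm.free_entry = priv->cm.rx_table[index].next;
	priv->cm.rx_table[index].rx = p;

	/* on connection teardown, under priv->lock: return the slot */
	priv->cm.rx_table[p->index].rx = NULL;
	priv->cm.rx_table[p->index].next = priv->cm.free_entry;
	priv->cm.free_entry = p->index;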
> };
>
> /*
> @@ -438,6 +446,7 @@ void ipoib_drain_cq(struct net_device *d
> /* We don't support UC connections at the moment */
> #define IPOIB_CM_SUPPORTED(ha) (ha[0] & (IPOIB_FLAGS_RC))
>
> +extern int max_rc_qp;
> static inline int ipoib_cm_admin_enabled(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> --- a/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_cm.c 2007-08-20 17:39:25.000000000 -0400
> +++ b/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_cm.c 2007-08-20 17:51:46.000000000 -0400
> @@ -49,6 +49,18 @@ MODULE_PARM_DESC(cm_data_debug_level,
>
> #include "ipoib.h"
>
> +int max_rc_qp = NOSRQ_INDEX_TABLE_SIZE;
> +static int max_recv_buf = 1024; /* Default is 1024 MB */
> +
> +module_param_named(nosrq_max_rc_qp, max_rc_qp, int, 0444);
> +MODULE_PARM_DESC(nosrq_max_rc_qp, "Max number of NOSRQ RC QPs supported; must be a power of 2");
Would multiple values be better here? Something like: max_conn_qp,
qp_type, and use_srq.
We're getting into a lot of possible options: UD, UD with SRQ (?), UC,
RC, RC with SRQ, UC with SRQ and spec changes... I'm guessing that each
one is useful under different configurations (fabric size, application
load, etc.). It would be nice if the framework moved in the direction of
supporting any of these. E.g. use 'conn' in place of 'rc'.
Also, why is max_rc_qp restricted to a power of 2? We can just let the
lower (30?) bits of a wr_id match the ipoib_cm_rx index. max_rc_qp just
needs to be less than 2^30, which is required anyway.
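I.e. something like this (rough sketch; the exact bit split and the
'slot' name are just examples for what the patch calls the per-ring
wr_id):

	/* lower bits: rx table index; bit 30 stays IPOIB_CM_OP_RECV;
	 * upper 32 bits: slot within the per-QP rx_ring
	 */
	#define IPOIB_CM_RX_INDEX_MASK	((1ul << 30) - 1)

	wr_id = ((u64) slot << 32) | IPOIB_CM_OP_RECV | index;

	index = wc->wr_id & IPOIB_CM_RX_INDEX_MASK;
	slot  = wc->wr_id >> 32;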
> +module_param_named(max_receive_buffer, max_recv_buf, int, 0644);
> +MODULE_PARM_DESC(max_receive_buffer, "Max Receive Buffer Size in MB");
Do we really need a new parameter here? What controls does the user
have access to? If they can set the max number of QPs, the size of
each QP, and the size of each message, then I think we should eliminate
this. (And if they can't set each of these, then maybe we should look
at adding those parameters rather than an all-encompassing max-memory
style value.)
Btw, the naming and description are a little misleading. This is a
limit on all allocated receive buffers for all connected QPs. The name
and description make it sound like a limitation for a single buffer.
> +static atomic_t current_rc_qp = ATOMIC_INIT(0); /* Active number of RC QPs for NOSRQ */
Taking some of the changes above, we can drop this variable. Even
without those changes, there's no point in setting max_rc_qp higher
than the number of QPs the user could create given the max_recv_buf
limitation. In other words, I would rather see current_rc_qp and
max_recv_buf go away, but even if max_recv_buf were kept, the
recv_mem_used check in allocate_and_post_rbuf_nosrq() should instead be
used to limit the value of max_rc_qp. (Hope this makes sense.)
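For example, something along these lines at module init (just a sketch)
would fold the memory cap into max_rc_qp up front:

	/* derive the QP cap from the memory cap once, instead of
	 * tracking current_rc_qp at runtime
	 */
	u64 qp_limit = ((u64) max_recv_buf << 20) /
		       ((u64) ipoib_recvq_size * CM_PACKET_SIZE);

	if (max_rc_qp > qp_limit)
		max_rc_qp = qp_limit;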
> +
> +#define NOSRQ_INDEX_MASK (max_rc_qp -1)
This goes away by just reserving the lower bits of the wr_id for the
rx_table index.
> #define IPOIB_CM_IETF_ID 0x1000000000000000ULL
>
> #define IPOIB_CM_RX_UPDATE_TIME (256 * HZ)
> @@ -81,20 +93,21 @@ static void ipoib_cm_dma_unmap_rx(struct
> ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
> }
>
> -static int ipoib_cm_post_receive(struct net_device *dev, int id)
> +static int post_receive_srq(struct net_device *dev, u64 id)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> struct ib_recv_wr *bad_wr;
> int i, ret;
>
> - priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_SRQ;
> + priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
>
> for (i = 0; i < IPOIB_CM_RX_SG; ++i)
> priv->cm.rx_sge[i].addr = priv->cm.srq_ring[id].mapping[i];
>
> ret = ib_post_srq_recv(priv->cm.srq, &priv->cm.rx_wr, &bad_wr);
> if (unlikely(ret)) {
> - ipoib_warn(priv, "post srq failed for buf %d (%d)\n", id, ret);
> + ipoib_warn(priv, "post srq failed for buf %lld (%d)\n",
> + (unsigned long long)id, ret);
> ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
> priv->cm.srq_ring[id].mapping);
> dev_kfree_skb_any(priv->cm.srq_ring[id].skb);
I see that the code was already this way, but it's not clear why unmap
and free_skb are called within this function. The mapping and skb
allocation are not done here. I would rather see the function that does
the mapping and allocation do the cleanup, rather than assuming that
it's done in a called routine. Optionally, move the mapping and
allocation into this routine.
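E.g., in ipoib_cm_dev_init() (which did the allocation and mapping),
the error path could look something like this sketch instead, just to
illustrate keeping the unmap/free next to the code that created the
buffer:

	if (post_receive_srq(dev, i)) {
		ipoib_warn(priv, "post_receive_srq failed for buf %d\n", i);
		ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
				      priv->cm.srq_ring[i].mapping);
		dev_kfree_skb_any(priv->cm.srq_ring[i].skb);
		priv->cm.srq_ring[i].skb = NULL;
		return -EIO;
	}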
> -static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
> +static int post_receive_nosrq(struct net_device *dev, u64 id)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + struct ib_recv_wr *bad_wr;
> + int i, ret;
> + u32 index;
> + u32 wr_id;
> + struct ipoib_cm_rx *rx_ptr;
> +
> + index = id & NOSRQ_INDEX_MASK;
> + wr_id = id >> 32;
> +
> + rx_ptr = priv->cm.rx_index_table[index];
> +
> + priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
> +
> + for (i = 0; i < IPOIB_CM_RX_SG; ++i)
> + priv->cm.rx_sge[i].addr = rx_ptr->rx_ring[wr_id].mapping[i];
> +
> + ret = ib_post_recv(rx_ptr->qp, &priv->cm.rx_wr, &bad_wr);
> + if (unlikely(ret)) {
> + ipoib_warn(priv, "post recv failed for buf %d (%d)\n",
> + wr_id, ret);
> + ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
> + rx_ptr->rx_ring[wr_id].mapping);
> + dev_kfree_skb_any(rx_ptr->rx_ring[wr_id].skb);
> + rx_ptr->rx_ring[wr_id].skb = NULL;
> + }
same cleanup issue as above
> +
> + return ret;
> +}
> +
> +static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, u64 id,
> + int frags,
> u64 mapping[IPOIB_CM_RX_SG])
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> struct sk_buff *skb;
> int i;
> + struct ipoib_cm_rx *rx_ptr;
> + u32 index, wr_id;
>
> skb = dev_alloc_skb(IPOIB_CM_HEAD_SIZE + 12);
> if (unlikely(!skb))
> @@ -141,7 +189,14 @@ static struct sk_buff *ipoib_cm_alloc_rx
> goto partial_error;
> }
>
> - priv->cm.srq_ring[id].skb = skb;
> + if (priv->cm.srq)
> + priv->cm.srq_ring[id].skb = skb;
> + else {
> + index = id & NOSRQ_INDEX_MASK;
> + wr_id = id >> 32;
> + rx_ptr = priv->cm.rx_index_table[index];
> + rx_ptr->rx_ring[wr_id].skb = skb;
> + }
> return skb;
>
> partial_error:
> @@ -203,11 +258,14 @@ static struct ib_qp *ipoib_cm_create_rx_
> .recv_cq = priv->cq,
> .srq = priv->cm.srq,
> .cap.max_send_wr = 1, /* For drain WR */
> + .cap.max_recv_wr = ipoib_recvq_size + 1,
> .cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
> .sq_sig_type = IB_SIGNAL_ALL_WR,
> .qp_type = IB_QPT_RC,
> .qp_context = p,
> };
> + if (!priv->cm.srq)
> + attr.cap.max_recv_sge = IPOIB_CM_RX_SG;
We can just set max_recv_sge here without the check. It is ignored if
the QP is associated with an srq.
> return ib_create_qp(priv->pd, &attr);
> }
>
> @@ -281,12 +339,130 @@ static int ipoib_cm_send_rep(struct net_
> rep.private_data_len = sizeof data;
> rep.flow_control = 0;
> rep.rnr_retry_count = req->rnr_retry_count;
> - rep.srq = 1;
> rep.qp_num = qp->qp_num;
> rep.starting_psn = psn;
> + rep.srq = !!priv->cm.srq;
> return ib_send_cm_rep(cm_id, &rep);
> }
>
> +static void init_context_and_add_list(struct ib_cm_id *cm_id,
I'm really not sure what this function is trying to do.
There's got to be a better name for this function, or a better way to
organize the code. This looks more like just a blob of code, rather
than code performing a well-defined task. See additional comment below
(6-7 comments down) about locking as well.
> + struct ipoib_cm_rx *p,
Some naming consistency for struct ipoib_cm_rx would be nice. 'rx_ptr'
or just 'rx' are fine. In a lot of places the variable is just called
'p', which is really bad IMO. Some of this already exists, so a patch
that renames 'p' to something useful, applied either before or after
this patch, would be nice.
> + struct ipoib_dev_priv *priv)
> +{
> + cm_id->context = p;
> + p->jiffies = jiffies;
> + spin_lock_irq(&priv->lock);
> + if (list_empty(&priv->cm.passive_ids))
> + queue_delayed_work(ipoib_workqueue,
> + &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
> + if (priv->cm.srq) {
> + /* Add this entry to passive ids list head, but do not re-add
> + * it if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush
> + * list.
> + */
> + if (p->state == IPOIB_CM_RX_LIVE)
> + list_move(&p->list, &priv->cm.passive_ids);
Should there be a state change here? It just seems cleaner to me if the
state indicated which list the rx is on. I don't like that the state
only seems to be used consistently in the srq case. I would think it
could apply to all cases.
> + }
> + spin_unlock_irq(&priv->lock);
> +}
> +
> +static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
> + struct ipoib_cm_rx *p, unsigned psn)
Function name is a little long. Maybe there should be multiple
functions here. (Use of 'and' in the function name points to multiple
functions that are grouped together. Maybe we should add a function
naming rule: if the function name contains 'and', create separate
functions...)
> +{
> + struct net_device *dev = cm_id->context;
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + int ret;
> + u32 qp_num, index;
> + u64 i, recv_mem_used;
> +
> + qp_num = p->qp->qp_num;
qp_num is only used in one place in this function, and only for a debug
print.
> +
> + /* In the SRQ case there is a common rx buffer called the srq_ring.
> + * However, for the NOSRQ case we create an rx_ring for every
> + * struct ipoib_cm_rx.
> + */
> + p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
> + if (!p->rx_ring) {
> + printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
> + qp_num);
> + return -ENOMEM;
> + }
> +
> + spin_lock_irq(&priv->lock);
> + list_add(&p->list, &priv->cm.passive_ids);
> + spin_unlock_irq(&priv->lock);
> +
> + init_context_and_add_list(cm_id, p, priv);
stale_task thread could be executing on 'p' at this point. Is that
acceptable? (I'm pretty sure I pointed this out before, but I don't
remember what the response was.)
We just added 'p' to the passive_ids list here, but
init_context_and_add_list() also adds it to the list, though only in
the srq case. It would be cleaner to always add it to the list inside
init_context_and_add_list(), or to always do it outside that function.
> + spin_lock_irq(&priv->lock);
Including the call above, we end up acquiring this lock 3 times in a
row, setting 2 variables between the first and second time, and doing
nothing between the second and third time.
> +
> + for (index = 0; index < max_rc_qp; index++)
> + if (priv->cm.rx_index_table[index] == NULL)
> + break;
See previous comment about avoiding a linear search.
> +
> + recv_mem_used = (u64)ipoib_recvq_size *
> + (u64)atomic_inc_return(&current_rc_qp) * CM_PACKET_SIZE;
> + if ((index == max_rc_qp) ||
> + (recv_mem_used >= max_recv_buf * (1ul << 20))) {
I would prefer a single check against max_rc_qp. (Fold the memory
constraint into limiting the value of max_rc_qp.) Otherwise, we can
end up allocating a larger rx_index_table array than is actually
usable.
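I.e., once the memory constraint is folded into max_rc_qp (see the
module-init sketch above), this could collapse to something like:

	if (index == max_rc_qp) {
		spin_unlock_irq(&priv->lock);
		ipoib_warn(priv, "NOSRQ has reached the limit of %d RC QPs\n",
			   max_rc_qp);
		ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
		ret = -EINVAL;
		goto err_alloc_and_post;
	}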
> + spin_unlock_irq(&priv->lock);
> + ipoib_warn(priv, "NOSRQ has reached the configurable limit "
> + "of either %d RC QPs or, max recv buf size of "
> + "0x%x MB\n", max_rc_qp, max_recv_buf);
> +
> + /* We send a REJ to the remote side indicating that we
> + * have no more free RC QPs and leave it to the remote side
> + * to take appropriate action. This should leave the
> + * current set of QPs unaffected and any subsequent REQs
> + * will be able to use RC QPs if they are available.
> + */
> + ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
> + ret = -EINVAL;
> + goto err_alloc_and_post;
> + }
> +
> + priv->cm.rx_index_table[index] = p;
> + spin_unlock_irq(&priv->lock);
> +
> + /* We will subsequently use this stored pointer while freeing
> + * resources in stale task
> + */
> + p->index = index;
Is it dangerous to have this not set before releasing the lock? (It
doesn't look like it, but I wanted to check.) Could anything be
iterating over the table expecting p->index to be set?
> +
> + ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
> + if (ret) {
> + ipoib_warn(priv, "ipoib_cm_modify_rx_qp() failed %d\n", ret);
> + ipoib_cm_dev_cleanup(dev);
> + goto err_alloc_and_post;
> + }
> +
> + for (i = 0; i < ipoib_recvq_size; ++i) {
> + if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
> + IPOIB_CM_RX_SG - 1,
> + p->rx_ring[i].mapping)) {
> + ipoib_warn(priv, "failed to allocate receive "
> + "buffer %d\n", (int)i);
> + ipoib_cm_dev_cleanup(dev);
> + ret = -ENOMEM;
> + goto err_alloc_and_post;
> + }
> +
> + if (post_receive_nosrq(dev, i << 32 | index)) {
> + ipoib_warn(priv, "post_receive_nosrq "
> + "failed for buf %lld\n", (unsigned long long)i);
> + ipoib_cm_dev_cleanup(dev);
> + ret = -EIO;
Why not just do the following?

	ret = post_receive_nosrq(dev, i << 32 | index);
	if (ret)
		...
> + goto err_alloc_and_post;
> + }
> + }
> +
> + return 0;
> +
> +err_alloc_and_post:
> + atomic_dec(&current_rc_qp);
> + kfree(p->rx_ring);
> + list_del_init(&p->list);
We need a lock here.
Is priv->cm.rx_index_table[index] cleaned up anywhere?
> + return ret;
> +}
> +
> static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
> {
> struct net_device *dev = cm_id->context;
> @@ -301,9 +477,6 @@ static int ipoib_cm_req_handler(struct i
> return -ENOMEM;
> p->dev = dev;
> p->id = cm_id;
> - cm_id->context = p;
> - p->state = IPOIB_CM_RX_LIVE;
> - p->jiffies = jiffies;
> INIT_LIST_HEAD(&p->list);
>
> p->qp = ipoib_cm_create_rx_qp(dev, p);
> @@ -313,19 +486,21 @@ static int ipoib_cm_req_handler(struct i
> }
>
> psn = random32() & 0xffffff;
> - ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
> - if (ret)
> - goto err_modify;
> + if (!priv->cm.srq) {
> + ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn);
> + if (ret)
> + goto err_modify;
> + } else {
> + p->rx_ring = NULL;
> + ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
> + if (ret)
> + goto err_modify;
> + }
>
> - spin_lock_irq(&priv->lock);
> - queue_delayed_work(ipoib_workqueue,
> - &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
> - /* Add this entry to passive ids list head, but do not re-add it
> - * if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush list. */
> - p->jiffies = jiffies;
> - if (p->state == IPOIB_CM_RX_LIVE)
> - list_move(&p->list, &priv->cm.passive_ids);
> - spin_unlock_irq(&priv->lock);
> + if (priv->cm.srq) {
> + p->state = IPOIB_CM_RX_LIVE;
This if can be merged with the previous if statement above, which
performs a similar check.
Does it matter that the state is set outside of any locks?
> + init_context_and_add_list(cm_id, p, priv);
> + }
>
> ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
> if (ret) {
> @@ -398,29 +573,60 @@ static void skb_put_frags(struct sk_buff
> }
> }
>
> -void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
> +static void timer_check_srq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
> +{
> + unsigned long flags;
> +
> + if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
> + spin_lock_irqsave(&priv->lock, flags);
> + p->jiffies = jiffies;
> + /* Move this entry to list head, but do
> + * not re-add it if it has been removed.
> + */
> + if (p->state == IPOIB_CM_RX_LIVE)
> + list_move(&p->list, &priv->cm.passive_ids);
> + spin_unlock_irqrestore(&priv->lock, flags);
> + }
> +}
> +
> +static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
> +{
> + unsigned long flags;
> +
> + if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
> + spin_lock_irqsave(&priv->lock, flags);
> + p->jiffies = jiffies;
> + /* Move this entry to list head, but do
> + * not re-add it if it has been removed. */
> + if (!list_empty(&p->list))
> + list_move(&p->list, &priv->cm.passive_ids);
> + spin_unlock_irqrestore(&priv->lock, flags);
> + }
> +}
Letting with and without srq use the same state lets us combine these
routines. It seems cleaner to act in the no srq case based on an
explicit state of the ipoib_cm_rx, rather than the state of a list item,
given that the state tracking is already there.
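I.e., if the no srq path also kept p->state up to date, one routine
(sketch, based on the two versions above) would cover both cases:

	static void timer_check(struct ipoib_dev_priv *priv,
				struct ipoib_cm_rx *p)
	{
		unsigned long flags;

		if (!p || !time_after_eq(jiffies,
					 p->jiffies + IPOIB_CM_RX_UPDATE_TIME))
			return;

		spin_lock_irqsave(&priv->lock, flags);
		p->jiffies = jiffies;
		/* Move this entry to the list head, but do not re-add it
		 * if it has been removed. */
		if (p->state == IPOIB_CM_RX_LIVE)
			list_move(&p->list, &priv->cm.passive_ids);
		spin_unlock_irqrestore(&priv->lock, flags);
	}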
> +
> +void handle_rx_wc_srq(struct net_device *dev, struct ib_wc *wc)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> - unsigned int wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
> + u64 wr_id = wc->wr_id & ~IPOIB_CM_OP_RECV;
> struct sk_buff *skb, *newskb;
> struct ipoib_cm_rx *p;
> unsigned long flags;
> u64 mapping[IPOIB_CM_RX_SG];
> - int frags;
> + int frags, ret;
>
> - ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
> - wr_id, wc->status);
> + ipoib_dbg_data(priv, "cm recv completion: id %lld, status: %d\n",
> + (unsigned long long)wr_id, wc->status);
>
> if (unlikely(wr_id >= ipoib_recvq_size)) {
Why would this ever occur?
> - if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_SRQ)) {
> + if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_RECV)) {
> spin_lock_irqsave(&priv->lock, flags);
> list_splice_init(&priv->cm.rx_drain_list, &priv->cm.rx_reap_list);
> ipoib_cm_start_rx_drain(priv);
> queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
> spin_unlock_irqrestore(&priv->lock, flags);
> } else
> - ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
> - wr_id, ipoib_recvq_size);
> + ipoib_warn(priv, "cm recv completion event with wrid %lld (> %d)\n",
> + (unsigned long long)wr_id, ipoib_recvq_size);
> return;
> }
>
> @@ -428,23 +634,15 @@ void ipoib_cm_handle_rx_wc(struct net_de
>
> if (unlikely(wc->status != IB_WC_SUCCESS)) {
> ipoib_dbg(priv, "cm recv error "
> - "(status=%d, wrid=%d vend_err %x)\n",
> - wc->status, wr_id, wc->vendor_err);
> + "(status=%d, wrid=%lld vend_err %x)\n",
> + wc->status, (unsigned long long)wr_id, wc->vendor_err);
> ++priv->stats.rx_dropped;
> - goto repost;
> + goto repost_srq;
> }
>
> if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
> p = wc->qp->qp_context;
> - if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
> - spin_lock_irqsave(&priv->lock, flags);
> - p->jiffies = jiffies;
> - /* Move this entry to list head, but do not re-add it
> - * if it has been moved out of list. */
> - if (p->state == IPOIB_CM_RX_LIVE)
> - list_move(&p->list, &priv->cm.passive_ids);
> - spin_unlock_irqrestore(&priv->lock, flags);
> - }
> + timer_check_srq(priv, p);
This looks like noise at the moment. (See previous comment about
timer_check_srq.)
> }
>
> frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
> @@ -456,13 +654,112 @@ void ipoib_cm_handle_rx_wc(struct net_de
> * If we can't allocate a new RX buffer, dump
> * this packet and reuse the old buffer.
> */
> - ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
> + ipoib_dbg(priv, "failed to allocate receive buffer %lld\n",
> + (unsigned long long)wr_id);
> ++priv->stats.rx_dropped;
> - goto repost;
> + goto repost_srq;
> }
>
> - ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
> - memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
> + ipoib_cm_dma_unmap_rx(priv, frags,
> + priv->cm.srq_ring[wr_id].mapping);
> + memcpy(priv->cm.srq_ring[wr_id].mapping, mapping,
> + (frags + 1) * sizeof *mapping);
> + ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
> + wc->byte_len, wc->slid);
> +
> + skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
> +
> + skb->protocol = ((struct ipoib_header *) skb->data)->proto;
> + skb_reset_mac_header(skb);
> + skb_pull(skb, IPOIB_ENCAP_LEN);
> +
> + dev->last_rx = jiffies;
> + ++priv->stats.rx_packets;
> + priv->stats.rx_bytes += skb->len;
> +
> + skb->dev = dev;
> + /* XXX get correct PACKET_ type here */
> + skb->pkt_type = PACKET_HOST;
> + netif_receive_skb(skb);
> +
> +repost_srq:
> + ret = post_receive_srq(dev, wr_id);
> +
> + if (unlikely(ret))
> + ipoib_warn(priv, "post_receive_srq failed for buf %lld\n",
> + (unsigned long long)wr_id);
> +
> +}
Some of the changes to this routine look like noise.
There's a lot of code at the end of this routine (shows as 'new' code in
the diff) that's duplicated in handle_rx_wc_nosrq. Can we pull out the
common code into a function or merge these two routines?
One possibility is to store a function pointer with the ipoib_cm_rx
that's invoked for posting receive buffers. (Even ib_post_recv and
ib_post_srq_recv are similar, if you want to carry this concept further
to allow better code sharing.)
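Something like this sketch, for example (the post_receive field name is
made up; it could equally live in the ipoib_cm_rx as suggested):

	struct ipoib_cm_dev_priv {
		...
		int (*post_receive)(struct net_device *dev, u64 id);
		...
	};

	/* in ipoib_cm_dev_init() */
	priv->cm.post_receive = priv->cm.srq ? post_receive_srq :
					       post_receive_nosrq;

	/* in the common completion path */
	if (unlikely(priv->cm.post_receive(dev, id)))
		ipoib_warn(priv, "post receive failed for buf %lld\n",
			   (unsigned long long) id);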
> +
> +static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> + struct sk_buff *skb, *newskb;
> + u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
> + u32 index;
> + struct ipoib_cm_rx *rx_ptr;
> + int frags, ret;
> +
> +
extra white space
> + ipoib_dbg_data(priv, "cm recv completion: id %lld, status: %d\n",
> + (unsigned long long)wr_id, wc->status);
> +
> + if (unlikely(wr_id >= ipoib_recvq_size)) {
Why would this ever occur?
> + ipoib_warn(priv, "cm recv completion event with wrid %lld (> %d)\n",
> + (unsigned long long)wr_id, ipoib_recvq_size);
> + return;
> + }
> +
> + index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK;
> +
> + /* This is the only place where rx_ptr could be a NULL - could
> + * have just received a packet from a connection that has become
> + * stale and so is going away. We will simply drop the packet and
> + * let the hardware (it s IB_QPT_RC) handle the dropped packet.
I don't understand this comment. How can the hardware handle a packet
dropped by software?
If the completion can be for a connection that has gone away, what's to
prevent a new connection from grabbing the same slot in the
rx_index_table? If this occurs, then the completion will reference the
wrong connection.
> + * In the timer_check() function below, p->jiffies is updated and
> + * hence the connection will not be stale after that.
> + */
> + rx_ptr = priv->cm.rx_index_table[index];
> + if (unlikely(!rx_ptr)) {
> + ipoib_warn(priv, "Received packet from a connection "
> + "that is going away. Hardware will handle it.\n");
> + return;
> + }
> +
> + skb = rx_ptr->rx_ring[wr_id].skb;
> +
> + if (unlikely(wc->status != IB_WC_SUCCESS)) {
> + ipoib_dbg(priv, "cm recv error "
> + "(status=%d, wrid=%lld vend_err %x)\n",
> + wc->status, (unsigned long long)wr_id, wc->vendor_err);
> + ++priv->stats.rx_dropped;
> + goto repost_nosrq;
> + }
> +
> + if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK))
> + /* There are no guarantees that wc->qp is not NULL for HCAs
> + * that do not support SRQ. */
This comment seems kind of random here... First, it would help to word
it without 'no', 'not', 'NULL', 'not', to help with deciphering. Second,
I don't see how it relates to the surrounding code.
> + timer_check_nosrq(priv, rx_ptr);
> +
> + frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
> + (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
> +
> + newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
> + mapping);
> + if (unlikely(!newskb)) {
> + /*
> + * If we can't allocate a new RX buffer, dump
> + * this packet and reuse the old buffer.
> + */
> + ipoib_dbg(priv, "failed to allocate receive buffer %lld\n",
> + (unsigned long long)wr_id);
> + ++priv->stats.rx_dropped;
> + goto repost_nosrq;
> + }
> +
> + ipoib_cm_dma_unmap_rx(priv, frags, rx_ptr->rx_ring[wr_id].mapping);
> + memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
> + (frags + 1) * sizeof *mapping);
>
> ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
> wc->byte_len, wc->slid);
> @@ -482,10 +779,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
> skb->pkt_type = PACKET_HOST;
> netif_receive_skb(skb);
>
> -repost:
> - if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
> - ipoib_warn(priv, "ipoib_cm_post_receive failed "
> - "for buf %d\n", wr_id);
> +repost_nosrq:
> + ret = post_receive_nosrq(dev, wr_id << 32 | index);
> +
> + if (unlikely(ret))
> + ipoib_warn(priv, "post_receive_nosrq failed for buf %lld\n",
> + (unsigned long long)wr_id);
> +}
> +
> +void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
> +{
> + struct ipoib_dev_priv *priv = netdev_priv(dev);
> +
> + if (priv->cm.srq)
> + handle_rx_wc_srq(dev, wc);
> + else
> + handle_rx_wc_nosrq(dev, wc);
> }
We're taking a branch here to two functions that contain a fair amount
of identical code. My personal preference is to have a single
handle_rx_wc call with 2-3 if (srq) checks, if needed, rather than the
current duplication.
There are so many if (unlikely...) / if (likely... checks in the rx_wc
handlers that I have a hard time believing an additional 1-2 if (srq)
checks would hurt performance enough to justify the code duplication.
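Roughly, a merged handler could look like this (skeleton only, pieced
together from the two versions above):

	void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
	{
		...
		if (priv->cm.srq) {
			wr_id = wc->wr_id & ~IPOIB_CM_OP_RECV;
			rx_ring = priv->cm.srq_ring;
		} else {
			index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK;
			wr_id = wc->wr_id >> 32;
			rx_ptr = priv->cm.rx_index_table[index];
			rx_ring = rx_ptr->rx_ring;
		}

		/* ... common skb/completion handling against rx_ring[wr_id] ... */

	repost:
		if (priv->cm.srq)
			ret = post_receive_srq(dev, wr_id);
		else
			ret = post_receive_nosrq(dev, wr_id << 32 | index);
		...
	}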
>
> static inline int post_send(struct ipoib_dev_priv *priv,
> @@ -677,6 +986,43 @@ err_cm:
> return ret;
> }
>
> +static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
> +{
> + int i;
> +
> + for (i = 0; i < ipoib_recvq_size; ++i)
> + if (p->rx_ring[i].skb) {
> + ipoib_cm_dma_unmap_rx(priv,
> + IPOIB_CM_RX_SG - 1,
> + p->rx_ring[i].mapping);
> + dev_kfree_skb_any(p->rx_ring[i].skb);
> + p->rx_ring[i].skb = NULL;
We're freeing rx_ring, so setting skb to NULL seems unnecessary.
> + }
> + kfree(p->rx_ring);
> +}
> +
> +void dev_stop_nosrq(struct ipoib_dev_priv *priv)
> +{
> + struct ipoib_cm_rx *p;
> +
> + spin_lock_irq(&priv->lock);
> + while (!list_empty(&priv->cm.passive_ids)) {
> + p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
> + free_resources_nosrq(priv, p);
Does this call need to be made under the lock? Or can we just remove
it from the list, release the lock, then clean up?
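E.g. (sketch, just reordering the existing code so the cleanup happens
outside the lock):

	spin_lock_irq(&priv->lock);
	while (!list_empty(&priv->cm.passive_ids)) {
		p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
		list_del(&p->list);
		spin_unlock_irq(&priv->lock);

		free_resources_nosrq(priv, p);
		ib_destroy_cm_id(p->id);
		ib_destroy_qp(p->qp);
		atomic_dec(&current_rc_qp);
		kfree(p);

		spin_lock_irq(&priv->lock);
	}
	spin_unlock_irq(&priv->lock);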
> + list_del(&p->list);
> + spin_unlock_irq(&priv->lock);
> + ib_destroy_cm_id(p->id);
> + ib_destroy_qp(p->qp);
> + atomic_dec(&current_rc_qp);
> + kfree(p);
> + spin_lock_irq(&priv->lock);
> + }
> + spin_unlock_irq(&priv->lock);
> +
> + cancel_delayed_work(&priv->cm.stale_task);
> + kfree(priv->cm.rx_index_table);
> +}
> +
> void ipoib_cm_dev_stop(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> @@ -691,6 +1037,11 @@ void ipoib_cm_dev_stop(struct net_device
> ib_destroy_cm_id(priv->cm.id);
> priv->cm.id = NULL;
>
> + if (!priv->cm.srq) {
> + dev_stop_nosrq(priv);
> + return;
> + }
> +
Maybe it would be better to create two dev_stop calls, versus the
current code flow where srq is done entirely within dev_stop, but no srq
jumps to a different routine. (Is there any way to make this cleanup
code a little more similar?)
> spin_lock_irq(&priv->lock);
> while (!list_empty(&priv->cm.passive_ids)) {
> p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
> @@ -814,7 +1165,9 @@ static struct ib_qp *ipoib_cm_create_tx_
> attr.recv_cq = priv->cq;
> attr.srq = priv->cm.srq;
> attr.cap.max_send_wr = ipoib_sendq_size;
> + attr.cap.max_recv_wr = 0;
> attr.cap.max_send_sge = 1;
> + attr.cap.max_recv_sge = 0;
> attr.sq_sig_type = IB_SIGNAL_ALL_WR;
> attr.qp_type = IB_QPT_RC;
> attr.send_cq = cq;
> @@ -854,7 +1207,7 @@ static int ipoib_cm_send_req(struct net_
> req.retry_count = 0; /* RFC draft warns against retries */
> req.rnr_retry_count = 0; /* RFC draft warns against retries */
> req.max_cm_retries = 15;
> - req.srq = 1;
> + req.srq = !!priv->cm.srq;
> return ib_send_cm_req(id, &req);
> }
>
> @@ -1198,6 +1551,8 @@ static void ipoib_cm_rx_reap(struct work
> list_for_each_entry_safe(p, n, &list, list) {
> ib_destroy_cm_id(p->id);
> ib_destroy_qp(p->qp);
> + if (!priv->cm.srq)
> + atomic_dec(&current_rc_qp);
I think QP limitations should apply independently of whether an SRQ is
used. See my comments at the top of the mail about separating the
limitations.
> kfree(p);
> }
> }
> @@ -1216,12 +1571,19 @@ static void ipoib_cm_stale_task(struct w
> p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
> if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
> break;
> - list_move(&p->list, &priv->cm.rx_error_list);
> - p->state = IPOIB_CM_RX_ERROR;
> - spin_unlock_irq(&priv->lock);
> - ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
> - if (ret)
> - ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
> + if (!priv->cm.srq) {
> + free_resources_nosrq(priv, p);
> + list_del_init(&p->list);
> + priv->cm.rx_index_table[p->index] = NULL;
> + spin_unlock_irq(&priv->lock);
> + } else {
> + list_move(&p->list, &priv->cm.rx_error_list);
> + p->state = IPOIB_CM_RX_ERROR;
> + spin_unlock_irq(&priv->lock);
> + ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
> + if (ret)
> + ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
> + }
I don't understand the differences between srq and no srq. Both have a
list of QPs? Why does one track state, while the other just removes
itself from a list? Why not just have both transition into the error
state?
> spin_lock_irq(&priv->lock);
> }
>
> @@ -1275,16 +1637,40 @@ int ipoib_cm_add_mode_attr(struct net_de
> return device_create_file(&dev->dev, &dev_attr_mode);
> }
>
> +static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
> +{
> + struct ib_srq_init_attr srq_init_attr;
> + int ret;
> +
> + srq_init_attr.attr.max_wr = ipoib_recvq_size;
> + srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
> +
> + priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
> + if (IS_ERR(priv->cm.srq)) {
> + ret = PTR_ERR(priv->cm.srq);
> + priv->cm.srq = NULL;
> + return ret;
> + }
Can a failure here result in trying to use no SRQ mode?
> +
> + priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
> + sizeof *priv->cm.srq_ring,
> + GFP_KERNEL);
> + if (!priv->cm.srq_ring) {
> + printk(KERN_WARNING "%s: failed to allocate CM ring "
> + "(%d entries)\n",
> + priv->ca->name, ipoib_recvq_size);
> + ipoib_cm_dev_cleanup(dev);
I think we should limit the cleanup of this function to only what it
creates. Only destroy the srq here if we can't allocate srq_ring.
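I.e., on the srq_ring allocation failure, just undo the SRQ created a
few lines up (sketch):

	if (!priv->cm.srq_ring) {
		printk(KERN_WARNING "%s: failed to allocate CM ring "
		       "(%d entries)\n", priv->ca->name, ipoib_recvq_size);
		ib_destroy_srq(priv->cm.srq);
		priv->cm.srq = NULL;
		return -ENOMEM;
	}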
> + return -ENOMEM;
> + }
> +
> + return 0;
> +}
> +
> int ipoib_cm_dev_init(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
> - struct ib_srq_init_attr srq_init_attr = {
> - .attr = {
> - .max_wr = ipoib_recvq_size,
> - .max_sge = IPOIB_CM_RX_SG
> - }
> - };
> int ret, i;
> + struct ib_device_attr attr;
>
> INIT_LIST_HEAD(&priv->cm.passive_ids);
> INIT_LIST_HEAD(&priv->cm.reap_list);
> @@ -1301,20 +1687,32 @@ int ipoib_cm_dev_init(struct net_device
>
> skb_queue_head_init(&priv->cm.skb_queue);
>
> - priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
> - if (IS_ERR(priv->cm.srq)) {
> - ret = PTR_ERR(priv->cm.srq);
> - priv->cm.srq = NULL;
> + ret = ib_query_device(priv->ca, &attr);
> + if (ret)
> return ret;
> - }
>
> - priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
> - GFP_KERNEL);
> - if (!priv->cm.srq_ring) {
> - printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
> - priv->ca->name, ipoib_recvq_size);
> - ipoib_cm_dev_cleanup(dev);
> - return -ENOMEM;
> + if (attr.max_srq) {
> + /* This device supports SRQ */
> + ret = create_srq(dev, priv);
> + if (ret)
> + return ret;
> + priv->cm.rx_index_table = NULL;
> + } else {
> + priv->cm.srq = NULL;
> + priv->cm.srq_ring = NULL;
> +
> + /* Every new REQ that arrives creates a struct ipoib_cm_rx.
> + * These structures form a link list starting with the
> + * passive_ids. For quick and easy access we maintain a table
> + * of pointers to struct ipoib_cm_rx called the rx_index_table
> + */
> + priv->cm.rx_index_table = kcalloc(max_rc_qp,
> + sizeof *priv->cm.rx_index_table,
> + GFP_KERNEL);
> + if (!priv->cm.rx_index_table) {
> + printk(KERN_WARNING "Failed to allocate rx_index_table\n");
> + return -ENOMEM;
> + }
> }
>
> for (i = 0; i < IPOIB_CM_RX_SG; ++i)
> @@ -1327,17 +1725,24 @@ int ipoib_cm_dev_init(struct net_device
> priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
> priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
>
> - for (i = 0; i < ipoib_recvq_size; ++i) {
> - if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
> + /* One can post receive buffers even before the RX QP is created
> + * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
> + * and do that in ipoib_cm_req_handler()
> + */
> +
> + if (priv->cm.srq) {
> + for (i = 0; i < ipoib_recvq_size; ++i) {
> + if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
> priv->cm.srq_ring[i].mapping)) {
> - ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
> - ipoib_cm_dev_cleanup(dev);
> - return -ENOMEM;
> - }
> - if (ipoib_cm_post_receive(dev, i)) {
> - ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
> - ipoib_cm_dev_cleanup(dev);
> - return -EIO;
> + ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
> + ipoib_cm_dev_cleanup(dev);
> + return -ENOMEM;
> + }
> + if (post_receive_srq(dev, i)) {
> + ipoib_warn(priv, "post_receive_srq failed for buf %d\n", i);
> + ipoib_cm_dev_cleanup(dev);
> + return -EIO;
> + }
Why not just wait until the req_handler to post receives in both cases?
There's no need to consume the resources until a connection has been made.
> }
> }
>
> --- a/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_ib.c 2007-08-20 17:39:25.000000000 -0400
> +++ b/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_ib.c 2007-08-14 19:53:16.000000000 -0400
> @@ -300,7 +300,7 @@ int ipoib_poll(struct net_device *dev, i
> for (i = 0; i < n; ++i) {
> struct ib_wc *wc = priv->ibwc + i;
>
> - if (wc->wr_id & IPOIB_CM_OP_SRQ) {
> + if (wc->wr_id & IPOIB_CM_OP_RECV) {
> ++done;
> --max;
> ipoib_cm_handle_rx_wc(dev, wc);
> @@ -558,7 +558,7 @@ void ipoib_drain_cq(struct net_device *d
> do {
> n = ib_poll_cq(priv->cq, IPOIB_NUM_WC, priv->ibwc);
> for (i = 0; i < n; ++i) {
> - if (priv->ibwc[i].wr_id & IPOIB_CM_OP_SRQ)
> + if (priv->ibwc[i].wr_id & IPOIB_CM_OP_RECV)
> ipoib_cm_handle_rx_wc(dev, priv->ibwc + i);
> else if (priv->ibwc[i].wr_id & IPOIB_OP_RECV)
> ipoib_ib_handle_rx_wc(dev, priv->ibwc + i);
> --- a/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_verbs.c 2007-08-20 17:39:25.000000000 -0400
> +++ b/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_verbs.c 2007-08-14 19:53:16.000000000 -0400
> @@ -175,6 +175,18 @@ int ipoib_transport_dev_init(struct net_
> if (!ret)
> size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
>
> +#ifdef CONFIG_INFINIBAND_IPOIB_CM
> +
> + /* We increase the size of the CQ in the NOSRQ case to prevent CQ
> + * overflow. Every new REQ creates a new RX QP and each QP has an
> + * RX ring associated with it. Therefore we could have
> + * max_rc_qp*ipoib_recvq_size + ipoib_sendq_size CQEs
> + * in a CQ.
> + */
> + if (!priv->cm.srq)
> + size += (max_rc_qp - 1) * ipoib_recvq_size;
> +#endif
> +
> priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
> if (IS_ERR(priv->cq)) {
> printk(KERN_WARNING "%s: failed to create CQ\n", ca->name);
> --- a/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_main.c 2007-08-20 17:39:25.000000000 -0400
> +++ b/linux-2.6.23-rc1/drivers/infiniband/ulp/ipoib/ipoib_main.c 2007-08-20 17:42:13.000000000 -0400
> @@ -1227,6 +1227,7 @@ static int __init ipoib_init_module(void
> ipoib_sendq_size = roundup_pow_of_two(ipoib_sendq_size);
> ipoib_sendq_size = min(ipoib_sendq_size, IPOIB_MAX_QUEUE_SIZE);
> ipoib_sendq_size = max(ipoib_sendq_size, IPOIB_MIN_QUEUE_SIZE);
> + max_rc_qp = roundup_pow_of_two(max_rc_qp);
>
> ret = ipoib_register_debugfs();
> if (ret)
- Sean