[ofa-general] IPOB CM (NOSRQ) [PATCH V8] patch
Pradeep Satyanarayana
pradeeps at linux.vnet.ibm.com
Thu Jul 19 11:55:39 PDT 2007
Addressed Roland's comments and more (hope this passes muster :)).
The event_handler issue pointed out will be addressed in another patch.
Signed-off-by: Pradeep Satyanarayana <pradeeps at linux.vnet.ibm.com>
---
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib.h 2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib.h 2007-07-19 11:17:39.000000000 -0400
@@ -95,11 +95,15 @@ enum {
IPOIB_MCAST_FLAG_ATTACHED = 3,
};
+#define CM_PACKET_SIZE (ALIGN(IPOIB_CM_MTU, PAGE_SIZE))
#define IPOIB_OP_RECV (1ul << 31)
#ifdef CONFIG_INFINIBAND_IPOIB_CM
-#define IPOIB_CM_OP_SRQ (1ul << 30)
+#define IPOIB_CM_OP_RECV (1ul << 30)
+
+#define NOSRQ_INDEX_TABLE_SIZE 128
+#define NOSRQ_INDEX_MASK (NOSRQ_INDEX_TABLE_SIZE -1)
#else
-#define IPOIB_CM_OP_SRQ (0)
+#define IPOIB_CM_OP_RECV (0)
#endif
/* structs */
@@ -166,11 +170,14 @@ enum ipoib_cm_state {
};
struct ipoib_cm_rx {
- struct ib_cm_id *id;
- struct ib_qp *qp;
- struct list_head list;
- struct net_device *dev;
- unsigned long jiffies;
+ struct ib_cm_id *id;
+ struct ib_qp *qp;
+ struct ipoib_cm_rx_buf *rx_ring; /* Used by NOSRQ only */
+ struct list_head list;
+ struct net_device *dev;
+ unsigned long jiffies;
+ u32 index; /* wr_ids are distinguished by index
+ * to identify the QP -NOSRQ only */
enum ipoib_cm_state state;
};
@@ -215,6 +222,8 @@ struct ipoib_cm_dev_priv {
struct ib_wc ibwc[IPOIB_NUM_WC];
struct ib_sge rx_sge[IPOIB_CM_RX_SG];
struct ib_recv_wr rx_wr;
+ struct ipoib_cm_rx **rx_index_table; /* See ipoib_cm_dev_init()
+ *for usage of this element */
};
/*
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_cm.c 2007-07-10 17:02:33.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_cm.c 2007-07-19 13:55:59.000000000 -0400
@@ -49,6 +49,17 @@ MODULE_PARM_DESC(cm_data_debug_level,
#include "ipoib.h"
+static int max_rc_qp = NOSRQ_INDEX_TABLE_SIZE;
+static int max_recv_buf = 1024; /* Default is 1024 MB */
+
+module_param_named(nosrq_max_rc_qp, max_rc_qp, int, 0644);
+MODULE_PARM_DESC(nosrq_max_rc_qp, "Max number of NOSRQ RC QPs supported");
+
+module_param_named(max_receive_buffer, max_recv_buf, int, 0644);
+MODULE_PARM_DESC(max_receive_buffer, "Max Receive Buffer Size in MB");
+
+static atomic_t current_rc_qp = ATOMIC_INIT(0); /* Active number of RC QPs for NOSRQ */
+
#define IPOIB_CM_IETF_ID 0x1000000000000000ULL
#define IPOIB_CM_RX_UPDATE_TIME (256 * HZ)
@@ -81,20 +92,21 @@ static void ipoib_cm_dma_unmap_rx(struct
ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
}
-static int ipoib_cm_post_receive(struct net_device *dev, int id)
+static int post_receive_srq(struct net_device *dev, u64 id)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct ib_recv_wr *bad_wr;
int i, ret;
- priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_SRQ;
+ priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
for (i = 0; i < IPOIB_CM_RX_SG; ++i)
priv->cm.rx_sge[i].addr = priv->cm.srq_ring[id].mapping[i];
ret = ib_post_srq_recv(priv->cm.srq, &priv->cm.rx_wr, &bad_wr);
if (unlikely(ret)) {
- ipoib_warn(priv, "post srq failed for buf %d (%d)\n", id, ret);
+ ipoib_warn(priv, "post srq failed for buf %lld (%d)\n",
+ (unsigned long long)id, ret);
ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
priv->cm.srq_ring[id].mapping);
dev_kfree_skb_any(priv->cm.srq_ring[id].skb);
@@ -104,12 +116,47 @@ static int ipoib_cm_post_receive(struct
return ret;
}
-static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
+static int post_receive_nosrq(struct net_device *dev, u64 id)
+{
+ struct ipoib_dev_priv *priv = netdev_priv(dev);
+ struct ib_recv_wr *bad_wr;
+ int i, ret;
+ u32 index;
+ u32 wr_id;
+ struct ipoib_cm_rx *rx_ptr;
+
+ index = id & NOSRQ_INDEX_MASK ;
+ wr_id = id >> 32;
+
+ rx_ptr = priv->cm.rx_index_table[index];
+
+ priv->cm.rx_wr.wr_id = id | IPOIB_CM_OP_RECV;
+
+ for (i = 0; i < IPOIB_CM_RX_SG; ++i)
+ priv->cm.rx_sge[i].addr = rx_ptr->rx_ring[wr_id].mapping[i];
+
+ ret = ib_post_recv(rx_ptr->qp, &priv->cm.rx_wr, &bad_wr);
+ if (unlikely(ret)) {
+ ipoib_warn(priv, "post recv failed for buf %d (%d)\n",
+ wr_id, ret);
+ ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
+ rx_ptr->rx_ring[wr_id].mapping);
+ dev_kfree_skb_any(rx_ptr->rx_ring[wr_id].skb);
+ rx_ptr->rx_ring[wr_id].skb = NULL;
+ }
+
+ return ret;
+}
+
+static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, u64 id,
+ int frags,
u64 mapping[IPOIB_CM_RX_SG])
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct sk_buff *skb;
int i;
+ struct ipoib_cm_rx *rx_ptr;
+ u32 index, wr_id;
skb = dev_alloc_skb(IPOIB_CM_HEAD_SIZE + 12);
if (unlikely(!skb))
@@ -141,7 +188,14 @@ static struct sk_buff *ipoib_cm_alloc_rx
goto partial_error;
}
- priv->cm.srq_ring[id].skb = skb;
+ if (priv->cm.srq)
+ priv->cm.srq_ring[id].skb = skb;
+ else {
+ index = id & NOSRQ_INDEX_MASK ;
+ wr_id = id >> 32;
+ rx_ptr = priv->cm.rx_index_table[index];
+ rx_ptr->rx_ring[wr_id].skb = skb;
+ }
return skb;
partial_error:
@@ -198,16 +252,21 @@ static struct ib_qp *ipoib_cm_create_rx_
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct ib_qp_init_attr attr = {
- .event_handler = ipoib_cm_rx_event_handler,
.send_cq = priv->cq, /* For drain WR */
.recv_cq = priv->cq,
.srq = priv->cm.srq,
.cap.max_send_wr = 1, /* For drain WR */
+ .cap.max_recv_wr = ipoib_recvq_size + 1,
.cap.max_send_sge = 1, /* FIXME: 0 Seems not to work */
.sq_sig_type = IB_SIGNAL_ALL_WR,
.qp_type = IB_QPT_RC,
.qp_context = p,
};
+ if (!priv->cm.srq) {
+ attr.cap.max_recv_sge = IPOIB_CM_RX_SG;
+ attr.event_handler = NULL;
+ } else
+ attr.event_handler = ipoib_cm_rx_event_handler;
return ib_create_qp(priv->pd, &attr);
}
@@ -282,12 +341,129 @@ static int ipoib_cm_send_rep(struct net_
rep.flow_control = 0;
rep.rnr_retry_count = req->rnr_retry_count;
rep.target_ack_delay = 20; /* FIXME */
- rep.srq = 1;
rep.qp_num = qp->qp_num;
rep.starting_psn = psn;
+ rep.srq = !!priv->cm.srq;
return ib_send_cm_rep(cm_id, &rep);
}
+static void init_context_and_add_list(struct ib_cm_id *cm_id,
+ struct ipoib_cm_rx *p,
+ struct ipoib_dev_priv *priv)
+{
+ cm_id->context = p;
+ p->jiffies = jiffies;
+ spin_lock_irq(&priv->lock);
+ if (list_empty(&priv->cm.passive_ids))
+ queue_delayed_work(ipoib_workqueue,
+ &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
+ if (priv->cm.srq) {
+ /* Add this entry to passive ids list head, but do not re-add
+ * it if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush
+ * list.
+ */
+ if (p->state == IPOIB_CM_RX_LIVE)
+ list_move(&p->list, &priv->cm.passive_ids);
+ }
+ spin_unlock_irq(&priv->lock);
+}
+
+static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
+ struct ipoib_cm_rx *p, unsigned psn)
+{
+ struct net_device *dev = cm_id->context;
+ struct ipoib_dev_priv *priv = netdev_priv(dev);
+ int ret;
+ u32 qp_num, index;
+ u64 i, recv_mem_used;
+
+ qp_num = p->qp->qp_num;
+
+ /* In the SRQ case there is a common rx buffer called the srq_ring.
+ * However, for the NOSRQ we create an rx_ring for every
+ * struct ipoib_cm_rx.
+ */
+ p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
+ if (!p->rx_ring) {
+ printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
+ qp_num);
+ return -ENOMEM;
+ }
+
+ spin_lock_irq(&priv->lock);
+ list_add(&p->list, &priv->cm.passive_ids);
+ spin_unlock_irq(&priv->lock);
+
+ init_context_and_add_list(cm_id, p, priv);
+ spin_lock_irq(&priv->lock);
+
+ for (index = 0; index < max_rc_qp; index++)
+ if (priv->cm.rx_index_table[index] == NULL)
+ break;
+
+ recv_mem_used = (u64)ipoib_recvq_size *
+ (u64)atomic_inc_return(¤t_rc_qp) * CM_PACKET_SIZE;
+ if ((index == max_rc_qp) ||
+ (recv_mem_used >= max_recv_buf * (1ul << 20))) {
+ spin_unlock_irq(&priv->lock);
+ ipoib_warn(priv, "NOSRQ has reached the configurable limit "
+ "of either %d RC QPs or, max recv buf size of "
+ "0x%x MB\n", max_rc_qp, max_recv_buf);
+
+ /* We send a REJ to the remote side indicating that we
+ * have no more free RC QPs and leave it to the remote side
+ * to take appropriate action. This should leave the
+ * current set of QPs unaffected and any subsequent REQs
+ * will be able to use RC QPs if they are available.
+ */
+ ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
+ ret = -EINVAL;
+ goto err_alloc_and_post;
+ }
+
+ priv->cm.rx_index_table[index] = p;
+ spin_unlock_irq(&priv->lock);
+
+ /* We will subsequently use this stored pointer while freeing
+ * resources in stale task
+ */
+ p->index = index;
+
+ ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+ if (ret) {
+ ipoib_warn(priv, "ipoib_cm_modify_rx_qp() failed %d\n", ret);
+ ipoib_cm_dev_cleanup(dev);
+ goto err_alloc_and_post;
+ }
+
+ for (i = 0; i < ipoib_recvq_size; ++i) {
+ if (!ipoib_cm_alloc_rx_skb(dev, i << 32 | index,
+ IPOIB_CM_RX_SG - 1,
+ p->rx_ring[i].mapping)) {
+ ipoib_warn(priv, "failed to allocate receive "
+ "buffer %d\n", (int)i);
+ ipoib_cm_dev_cleanup(dev);
+ ret = -ENOMEM;
+ goto err_alloc_and_post;
+ }
+
+ if (post_receive_nosrq(dev, i << 32 | index)) {
+ ipoib_warn(priv, "post_receive_nosrq "
+ "failed for buf %lld\n", (unsigned long long)i);
+ ipoib_cm_dev_cleanup(dev);
+ ret = -EIO;
+ goto err_alloc_and_post;
+ }
+ }
+
+ return 0;
+
+err_alloc_and_post:
+ atomic_dec(¤t_rc_qp);
+ kfree(p->rx_ring);
+ return ret;
+}
+
static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
{
struct net_device *dev = cm_id->context;
@@ -302,9 +478,6 @@ static int ipoib_cm_req_handler(struct i
return -ENOMEM;
p->dev = dev;
p->id = cm_id;
- cm_id->context = p;
- p->state = IPOIB_CM_RX_LIVE;
- p->jiffies = jiffies;
INIT_LIST_HEAD(&p->list);
p->qp = ipoib_cm_create_rx_qp(dev, p);
@@ -314,19 +487,21 @@ static int ipoib_cm_req_handler(struct i
}
psn = random32() & 0xffffff;
- ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
- if (ret)
- goto err_modify;
+ if (!priv->cm.srq) {
+ ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn);
+ if (ret)
+ goto err_post_nosrq;
+ } else {
+ p->rx_ring = NULL;
+ ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
+ if (ret)
+ goto err_modify;
+ }
- spin_lock_irq(&priv->lock);
- queue_delayed_work(ipoib_workqueue,
- &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
- /* Add this entry to passive ids list head, but do not re-add it
- * if IB_EVENT_QP_LAST_WQE_REACHED has moved it to flush list. */
- p->jiffies = jiffies;
- if (p->state == IPOIB_CM_RX_LIVE)
- list_move(&p->list, &priv->cm.passive_ids);
- spin_unlock_irq(&priv->lock);
+ if (priv->cm.srq) {
+ p->state = IPOIB_CM_RX_LIVE;
+ init_context_and_add_list(cm_id, p, priv);
+ }
ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
if (ret) {
@@ -336,6 +511,8 @@ static int ipoib_cm_req_handler(struct i
}
return 0;
+err_post_nosrq:
+ list_del_init(&p->list);
err_modify:
ib_destroy_qp(p->qp);
err_qp:
@@ -399,29 +576,60 @@ static void skb_put_frags(struct sk_buff
}
}
-void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+static void timer_check_srq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+ unsigned long flags;
+
+ if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+ spin_lock_irqsave(&priv->lock, flags);
+ p->jiffies = jiffies;
+ /* Move this entry to list head, but do
+ * not re-add it if it has been removed.
+ */
+ if (p->state == IPOIB_CM_RX_LIVE)
+ list_move(&p->list, &priv->cm.passive_ids);
+ spin_unlock_irqrestore(&priv->lock, flags);
+ }
+}
+
+static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+ unsigned long flags;
+
+ if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
+ spin_lock_irqsave(&priv->lock, flags);
+ p->jiffies = jiffies;
+ /* Move this entry to list head, but do
+ * not re-add it if it has been removed. */
+ if (!list_empty(&p->list))
+ list_move(&p->list, &priv->cm.passive_ids);
+ spin_unlock_irqrestore(&priv->lock, flags);
+ }
+}
+
+void handle_rx_wc_srq(struct net_device *dev, struct ib_wc *wc)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
- unsigned int wr_id = wc->wr_id & ~IPOIB_CM_OP_SRQ;
+ u64 wr_id = wc->wr_id & ~IPOIB_CM_OP_RECV;
struct sk_buff *skb, *newskb;
struct ipoib_cm_rx *p;
unsigned long flags;
u64 mapping[IPOIB_CM_RX_SG];
- int frags;
+ int frags, ret;
- ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
- wr_id, wc->status);
+ ipoib_dbg_data(priv, "cm recv completion: id %lld, status: %d\n",
+ (unsigned long long)wr_id, wc->status);
if (unlikely(wr_id >= ipoib_recvq_size)) {
- if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_SRQ)) {
+ if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~IPOIB_CM_OP_RECV)) {
spin_lock_irqsave(&priv->lock, flags);
list_splice_init(&priv->cm.rx_drain_list, &priv->cm.rx_reap_list);
ipoib_cm_start_rx_drain(priv);
queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
spin_unlock_irqrestore(&priv->lock, flags);
} else
- ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
- wr_id, ipoib_recvq_size);
+ ipoib_warn(priv, "cm recv completion event with wrid %lld (> %d)\n",
+ (unsigned long long)wr_id, ipoib_recvq_size);
return;
}
@@ -429,23 +637,15 @@ void ipoib_cm_handle_rx_wc(struct net_de
if (unlikely(wc->status != IB_WC_SUCCESS)) {
ipoib_dbg(priv, "cm recv error "
- "(status=%d, wrid=%d vend_err %x)\n",
- wc->status, wr_id, wc->vendor_err);
+ "(status=%d, wrid=%lld vend_err %x)\n",
+ wc->status, (unsigned long long)wr_id, wc->vendor_err);
++priv->stats.rx_dropped;
- goto repost;
+ goto repost_srq;
}
if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
p = wc->qp->qp_context;
- if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
- spin_lock_irqsave(&priv->lock, flags);
- p->jiffies = jiffies;
- /* Move this entry to list head, but do not re-add it
- * if it has been moved out of list. */
- if (p->state == IPOIB_CM_RX_LIVE)
- list_move(&p->list, &priv->cm.passive_ids);
- spin_unlock_irqrestore(&priv->lock, flags);
- }
+ timer_check_srq(priv, p);
}
frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
@@ -457,13 +657,113 @@ void ipoib_cm_handle_rx_wc(struct net_de
* If we can't allocate a new RX buffer, dump
* this packet and reuse the old buffer.
*/
- ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
+ ipoib_dbg(priv, "failed to allocate receive buffer %lld\n",
+ (unsigned long long)wr_id);
+ ++priv->stats.rx_dropped;
+ goto repost_srq;
+ }
+
+ ipoib_cm_dma_unmap_rx(priv, frags,
+ priv->cm.srq_ring[wr_id].mapping);
+ memcpy(priv->cm.srq_ring[wr_id].mapping, mapping,
+ (frags + 1) * sizeof *mapping);
+ ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
+ wc->byte_len, wc->slid);
+
+ skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);
+
+ skb->protocol = ((struct ipoib_header *) skb->data)->proto;
+ skb_reset_mac_header(skb);
+ skb_pull(skb, IPOIB_ENCAP_LEN);
+
+ dev->last_rx = jiffies;
+ ++priv->stats.rx_packets;
+ priv->stats.rx_bytes += skb->len;
+
+ skb->dev = dev;
+ /* XXX get correct PACKET_ type here */
+ skb->pkt_type = PACKET_HOST;
+ netif_receive_skb(skb);
+
+repost_srq:
+ ret = post_receive_srq(dev, wr_id);
+
+ if (unlikely(ret))
+ ipoib_warn(priv, "post_receive_srq failed for buf %lld\n",
+ (unsigned long long)wr_id);
+
+}
+
+static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
+{
+ struct ipoib_dev_priv *priv = netdev_priv(dev);
+ struct sk_buff *skb, *newskb;
+ u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
+ u32 index;
+ struct ipoib_cm_rx *rx_ptr;
+ int frags, ret;
+
+
+ ipoib_dbg_data(priv, "cm recv completion: id %lld, status: %d\n",
+ (unsigned long long)wr_id, wc->status);
+
+ if (unlikely(wr_id >= ipoib_recvq_size)) {
+ ipoib_warn(priv, "cm recv completion event with wrid %lld (> %d)\n",
+ (unsigned long long)wr_id, ipoib_recvq_size);
+ return;
+ }
+
+ index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
+
+ /* This is the only place where rx_ptr could be a NULL - could
+ * have just received a packet from a connection that has become
+ * stale and so is going away. We will simply drop the packet and
+ * let the hardware (it s IB_QPT_RC) handle the dropped packet.
+ * In the timer_check() function below, p->jiffies is updated and
+ * hence the connection will not be stale after that.
+ */
+ rx_ptr = priv->cm.rx_index_table[index];
+ if (unlikely(!rx_ptr)) {
+ ipoib_warn(priv, "Received packet from a connection "
+ "that is going away. Hardware will handle it.\n");
+ return;
+ }
+
+ skb = rx_ptr->rx_ring[wr_id].skb;
+
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ ipoib_dbg(priv, "cm recv error "
+ "(status=%d, wrid=%lld vend_err %x)\n",
+ wc->status, (unsigned long long)wr_id, wc->vendor_err);
+ ++priv->stats.rx_dropped;
+ goto repost_nosrq;
+ }
+
+ if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
+ /* There are no guarantees that wc->qp is not NULL for HCAs
+ * that do not support SRQ. */
+ timer_check_nosrq(priv, rx_ptr);
+ }
+
+ frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
+ (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
+
+ newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
+ mapping);
+ if (unlikely(!newskb)) {
+ /*
+ * If we can't allocate a new RX buffer, dump
+ * this packet and reuse the old buffer.
+ */
+ ipoib_dbg(priv, "failed to allocate receive buffer %lld\n",
+ (unsigned long long)wr_id);
++priv->stats.rx_dropped;
- goto repost;
+ goto repost_nosrq;
}
- ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
- memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
+ ipoib_cm_dma_unmap_rx(priv, frags, rx_ptr->rx_ring[wr_id].mapping);
+ memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
+ (frags + 1) * sizeof *mapping);
ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
wc->byte_len, wc->slid);
@@ -483,10 +783,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
skb->pkt_type = PACKET_HOST;
netif_receive_skb(skb);
-repost:
- if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
- ipoib_warn(priv, "ipoib_cm_post_receive failed "
- "for buf %d\n", wr_id);
+repost_nosrq:
+ ret = post_receive_nosrq(dev, wr_id << 32 | index);
+
+ if (unlikely(ret))
+ ipoib_warn(priv, "post_receive_nosrq failed for buf %lld\n",
+ (unsigned long long)wr_id);
+}
+
+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
+{
+ struct ipoib_dev_priv *priv = netdev_priv(dev);
+
+ if (priv->cm.srq)
+ handle_rx_wc_srq(dev, wc);
+ else
+ handle_rx_wc_nosrq(dev, wc);
}
static inline int post_send(struct ipoib_dev_priv *priv,
@@ -678,6 +990,42 @@ err_cm:
return ret;
}
+static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
+{
+ int i;
+
+ for (i = 0; i < ipoib_recvq_size; ++i)
+ if (p->rx_ring[i].skb) {
+ ipoib_cm_dma_unmap_rx(priv,
+ IPOIB_CM_RX_SG - 1,
+ p->rx_ring[i].mapping);
+ dev_kfree_skb_any(p->rx_ring[i].skb);
+ p->rx_ring[i].skb = NULL;
+ }
+ kfree(p->rx_ring);
+}
+
+void dev_stop_nosrq(struct ipoib_dev_priv *priv)
+{
+ struct ipoib_cm_rx *p;
+
+ spin_lock_irq(&priv->lock);
+ while (!list_empty(&priv->cm.passive_ids)) {
+ p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
+ free_resources_nosrq(priv, p);
+ list_del(&p->list);
+ spin_unlock_irq(&priv->lock);
+ ib_destroy_cm_id(p->id);
+ ib_destroy_qp(p->qp);
+ atomic_dec(¤t_rc_qp);
+ kfree(p);
+ spin_lock_irq(&priv->lock);
+ }
+ spin_unlock_irq(&priv->lock);
+
+ cancel_delayed_work(&priv->cm.stale_task);
+}
+
void ipoib_cm_dev_stop(struct net_device *dev)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
@@ -692,6 +1040,11 @@ void ipoib_cm_dev_stop(struct net_device
ib_destroy_cm_id(priv->cm.id);
priv->cm.id = NULL;
+ if (!priv->cm.srq) {
+ dev_stop_nosrq(priv);
+ return;
+ }
+
spin_lock_irq(&priv->lock);
while (!list_empty(&priv->cm.passive_ids)) {
p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
@@ -815,7 +1168,9 @@ static struct ib_qp *ipoib_cm_create_tx_
attr.recv_cq = priv->cq;
attr.srq = priv->cm.srq;
attr.cap.max_send_wr = ipoib_sendq_size;
+ attr.cap.max_recv_wr = 1;
attr.cap.max_send_sge = 1;
+ attr.cap.max_recv_sge = 1;
attr.sq_sig_type = IB_SIGNAL_ALL_WR;
attr.qp_type = IB_QPT_RC;
attr.send_cq = cq;
@@ -855,7 +1210,7 @@ static int ipoib_cm_send_req(struct net_
req.retry_count = 0; /* RFC draft warns against retries */
req.rnr_retry_count = 0; /* RFC draft warns against retries */
req.max_cm_retries = 15;
- req.srq = 1;
+ req.srq = !!priv->cm.srq;
return ib_send_cm_req(id, &req);
}
@@ -1200,6 +1555,8 @@ static void ipoib_cm_rx_reap(struct work
list_for_each_entry_safe(p, n, &list, list) {
ib_destroy_cm_id(p->id);
ib_destroy_qp(p->qp);
+ if (!priv->cm.srq)
+ atomic_dec(¤t_rc_qp);
kfree(p);
}
}
@@ -1218,12 +1575,19 @@ static void ipoib_cm_stale_task(struct w
p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
break;
- list_move(&p->list, &priv->cm.rx_error_list);
- p->state = IPOIB_CM_RX_ERROR;
- spin_unlock_irq(&priv->lock);
- ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
- if (ret)
- ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+ if (!priv->cm.srq) {
+ free_resources_nosrq(priv, p);
+ list_del_init(&p->list);
+ priv->cm.rx_index_table[p->index] = NULL;
+ spin_unlock_irq(&priv->lock);
+ } else {
+ list_move(&p->list, &priv->cm.rx_error_list);
+ p->state = IPOIB_CM_RX_ERROR;
+ spin_unlock_irq(&priv->lock);
+ ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
+ if (ret)
+ ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
+ }
spin_lock_irq(&priv->lock);
}
@@ -1277,16 +1641,40 @@ int ipoib_cm_add_mode_attr(struct net_de
return device_create_file(&dev->dev, &dev_attr_mode);
}
+static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
+{
+ struct ib_srq_init_attr srq_init_attr;
+ int ret;
+
+ srq_init_attr.attr.max_wr = ipoib_recvq_size;
+ srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
+
+ priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
+ if (IS_ERR(priv->cm.srq)) {
+ ret = PTR_ERR(priv->cm.srq);
+ priv->cm.srq = NULL;
+ return ret;
+ }
+
+ priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
+ sizeof *priv->cm.srq_ring,
+ GFP_KERNEL);
+ if (!priv->cm.srq_ring) {
+ printk(KERN_WARNING "%s: failed to allocate CM ring "
+ "(%d entries)\n",
+ priv->ca->name, ipoib_recvq_size);
+ ipoib_cm_dev_cleanup(dev);
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
int ipoib_cm_dev_init(struct net_device *dev)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
- struct ib_srq_init_attr srq_init_attr = {
- .attr = {
- .max_wr = ipoib_recvq_size,
- .max_sge = IPOIB_CM_RX_SG
- }
- };
int ret, i;
+ struct ib_device_attr attr;
INIT_LIST_HEAD(&priv->cm.passive_ids);
INIT_LIST_HEAD(&priv->cm.reap_list);
@@ -1303,20 +1691,32 @@ int ipoib_cm_dev_init(struct net_device
skb_queue_head_init(&priv->cm.skb_queue);
- priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
- if (IS_ERR(priv->cm.srq)) {
- ret = PTR_ERR(priv->cm.srq);
- priv->cm.srq = NULL;
+ ret = ib_query_device(priv->ca, &attr);
+ if (ret)
return ret;
- }
- priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
- GFP_KERNEL);
- if (!priv->cm.srq_ring) {
- printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
- priv->ca->name, ipoib_recvq_size);
- ipoib_cm_dev_cleanup(dev);
- return -ENOMEM;
+ if (attr.max_srq) {
+ /* This device supports SRQ */
+ ret = create_srq(dev, priv);
+ if (ret)
+ return ret;
+ priv->cm.rx_index_table = NULL;
+ } else {
+ priv->cm.srq = NULL;
+ priv->cm.srq_ring = NULL;
+
+ /* Every new REQ that arrives creates a struct ipoib_cm_rx.
+ * These structures form a link list starting with the
+ * passive_ids. For quick and easy access we maintain a table
+ * of pointers to struct ipoib_cm_rx called the rx_index_table
+ */
+ priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
+ sizeof *priv->cm.rx_index_table,
+ GFP_KERNEL);
+ if (!priv->cm.rx_index_table) {
+ printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
+ return -ENOMEM;
+ }
}
for (i = 0; i < IPOIB_CM_RX_SG; ++i)
@@ -1329,17 +1729,24 @@ int ipoib_cm_dev_init(struct net_device
priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
- for (i = 0; i < ipoib_recvq_size; ++i) {
- if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
+ /* One can post receive buffers even before the RX QP is created
+ * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
+ * and do that in ipoib_cm_req_handler()
+ */
+
+ if (priv->cm.srq) {
+ for (i = 0; i < ipoib_recvq_size; ++i) {
+ if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
priv->cm.srq_ring[i].mapping)) {
- ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
- ipoib_cm_dev_cleanup(dev);
- return -ENOMEM;
- }
- if (ipoib_cm_post_receive(dev, i)) {
- ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
- ipoib_cm_dev_cleanup(dev);
- return -EIO;
+ ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
+ ipoib_cm_dev_cleanup(dev);
+ return -ENOMEM;
+ }
+ if (post_receive_srq(dev, i)) {
+ ipoib_warn(priv, "post_receive_srq failed for buf %d\n", i);
+ ipoib_cm_dev_cleanup(dev);
+ return -EIO;
+ }
}
}
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_ib.c 2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_ib.c 2007-07-10 18:30:10.000000000 -0400
@@ -299,7 +299,7 @@ int ipoib_poll(struct net_device *dev, i
for (i = 0; i < n; ++i) {
struct ib_wc *wc = priv->ibwc + i;
- if (wc->wr_id & IPOIB_CM_OP_SRQ) {
+ if (wc->wr_id & IPOIB_CM_OP_RECV) {
++done;
--max;
ipoib_cm_handle_rx_wc(dev, wc);
@@ -557,7 +557,7 @@ void ipoib_drain_cq(struct net_device *d
do {
n = ib_poll_cq(priv->cq, IPOIB_NUM_WC, priv->ibwc);
for (i = 0; i < n; ++i) {
- if (priv->ibwc[i].wr_id & IPOIB_CM_OP_SRQ)
+ if (priv->ibwc[i].wr_id & IPOIB_CM_OP_RECV)
ipoib_cm_handle_rx_wc(dev, priv->ibwc + i);
else if (priv->ibwc[i].wr_id & IPOIB_OP_RECV)
ipoib_ib_handle_rx_wc(dev, priv->ibwc + i);
--- a/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_verbs.c 2007-05-30 14:56:25.000000000 -0400
+++ b/linux-2.6.22/drivers/infiniband/ulp/ipoib/ipoib_verbs.c 2007-07-19 02:55:24.000000000 -0400
@@ -175,6 +175,18 @@ int ipoib_transport_dev_init(struct net_
if (!ret)
size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
+#ifdef CONFIG_INFINIBAND_IPOIB_CM
+
+ /* We increase the size of the CQ in the NOSRQ case to prevent CQ
+ * overflow. Every new REQ creates a new RX QP and each QP has an
+ * RX ring associated with it. Therefore we could have
+ * NOSRQ_INDEX_TABLE_SIZE*ipoib_recvq_size + ipoib_sendq_size CQEs
+ * in a CQ.
+ */
+ if (!priv->cm.srq)
+ size += (NOSRQ_INDEX_TABLE_SIZE - 1) * ipoib_recvq_size;
+#endif
+
priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
if (IS_ERR(priv->cq)) {
printk(KERN_WARNING "%s: failed to create CQ\n", ca->name);
More information about the general
mailing list