[ofa-general] IPOB CM (NOSRQ) [PATCH V6] patch
Sean Hefty
sean.hefty at intel.com
Tue Jun 12 12:13:37 PDT 2007
>+module_param_named(max_recieve_buffer, max_recv_buf, int, 0644);
>+MODULE_PARM_DESC(max_recieve_buffer, "Max Recieve Buffer Size in MB");
nit: receive misspelled
>+static int allocate_and_post_rbuf_nosrq(struct ib_cm_id *cm_id,
>+ struct ipoib_cm_rx *p, unsigned psn)
>+{
>+ struct net_device *dev = cm_id->context;
>+ struct ipoib_dev_priv *priv = netdev_priv(dev);
>+ int ret;
>+ u32 qp_num, index;
>+ u64 i, recv_mem_used;
>+
>+ qp_num = p->qp->qp_num;
>+
>+ /* In the SRQ case there is a common rx buffer called the srq_ring.
>+ * However, for the NOSRQ we create an rx_ring for every
>+ * struct ipoib_cm_rx.
>+ */
>+ p->rx_ring = kzalloc(ipoib_recvq_size * sizeof *p->rx_ring, GFP_KERNEL);
>+ if (!p->rx_ring) {
>+ printk(KERN_WARNING "Failed to allocate rx_ring for 0x%x\n",
>+ qp_num);
>+ return -ENOMEM;
>+ }
>+
>+ init_context_and_add_list(cm_id, p, priv);
>+ spin_lock_irq(&priv->lock);
>+
>+ for (index = 0; index < max_rc_qp; index++)
>+ if (priv->cm.rx_index_table[index] == NULL)
>+ break;
>+
>+ spin_lock(&nosrq_count.lock);
>+ recv_mem_used = (u64)ipoib_recvq_size * (u64)nosrq_count.current_rc_qp
>+ * CM_PACKET_SIZE; /* packets are 64K */
>+ spin_unlock(&nosrq_count.lock);
Is a spin lock needed here? Could you make current_rc_qp an atomic?
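For example, an untested sketch (assuming nosrq_count.lock protects
nothing besides current_rc_qp):

	static atomic_t current_rc_qp = ATOMIC_INIT(0);

	/* the locked updates become */
	atomic_inc(&current_rc_qp);
	atomic_dec(&current_rc_qp);

	/* and the read above no longer needs a lock */
	recv_mem_used = (u64)ipoib_recvq_size *
			(u64)atomic_read(&current_rc_qp) * CM_PACKET_SIZE;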
>+err_send_rej:
>+err_modify_nosrq:
>+err_alloc_and_post:
Maybe just use a single label?
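Since nothing executes between these labels, the three gotos could all
target one label, e.g.:

	err_nosrq:
		...cleanup as before...

(label name is just a suggestion)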
>@@ -316,9 +488,18 @@ static int ipoib_cm_req_handler(struct i
> }
>
> psn = random32() & 0xffffff;
>- ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>- if (ret)
>- goto err_modify;
>+ if (!priv->cm.srq) {
>+ spin_lock(&nosrq_count.lock);
>+ nosrq_count.current_rc_qp++;
>+ spin_unlock(&nosrq_count.lock);
>+ if (ret = allocate_and_post_rbuf_nosrq(cm_id, p, psn))
Use double parens around assignment: if ((ret = ..))
>+ goto err_post_nosrq;
>+ } else {
>+ p->rx_ring = NULL;
>+ ret = ipoib_cm_modify_rx_qp(dev, cm_id, p->qp, psn);
>+ if (ret)
>+ goto err_modify;
>+ }
>
> ret = ipoib_cm_send_rep(dev, cm_id, p->qp, &event->param.req_rcvd, psn);
> if (ret) {
>@@ -326,18 +507,18 @@ static int ipoib_cm_req_handler(struct i
> goto err_rep;
> }
>
>- cm_id->context = p;
>- p->jiffies = jiffies;
>- p->state = IPOIB_CM_RX_LIVE;
>- spin_lock_irq(&priv->lock);
>- if (list_empty(&priv->cm.passive_ids))
>- queue_delayed_work(ipoib_workqueue,
>- &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
>- list_add(&p->list, &priv->cm.passive_ids);
>- spin_unlock_irq(&priv->lock);
>+ if (priv->cm.srq) {
>+ init_context_and_add_list(cm_id, p, priv);
>+ p->state = IPOIB_CM_RX_LIVE;
The order between setting p->state and adding the item to the list changes here.
I don't know if this matters, but it's now possible for the work queue to
execute before p->state is set.
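If it does matter, setting the state before the entry becomes visible
on the list would preserve the old ordering, e.g.:

	p->state = IPOIB_CM_RX_LIVE;
	init_context_and_add_list(cm_id, p, priv);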
>+ }
> return 0;
>
> err_rep:
>+err_post_nosrq:
>+ list_del_init(&p->list);
Is this correct? Is p->list on any list at this point? In the SRQ case,
init_context_and_add_list() isn't called until after the REP is sent, so
when we get here via err_rep, p->list may not even be initialized.
>+ spin_lock(&nosrq_count.lock);
>+ nosrq_count.current_rc_qp--;
>+ spin_unlock(&nosrq_count.lock);
> err_modify:
> ib_destroy_qp(p->qp);
> err_qp:
>@@ -401,21 +582,51 @@ static void skb_put_frags(struct sk_buff
> }
> }
>
>-void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>+static void timer_check_srq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>+{
>+ unsigned long flags;
>+
>+ if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>+ spin_lock_irqsave(&priv->lock, flags);
>+ p->jiffies = jiffies;
>+ /* Move this entry to list head, but do
>+ * not re-add it if it has been removed. */
nit: There are several places in the patch where the commenting style needs
updating.
>+ if (p->state == IPOIB_CM_RX_LIVE)
>+ list_move(&p->list, &priv->cm.passive_ids);
>+ spin_unlock_irqrestore(&priv->lock, flags);
>+ }
>+}
>+
>+static void timer_check_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>+{
>+ unsigned long flags;
>+
>+ if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
>+ spin_lock_irqsave(&priv->lock, flags);
>+ p->jiffies = jiffies;
>+ /* Move this entry to list head, but do
>+ * not re-add it if it has been removed. */
>+ if (!list_empty(&p->list))
This line is the only difference between this function and the previous
one. Is it possible to always use the state check? (Sketch below, after
the function.)
>+ list_move(&p->list, &priv->cm.passive_ids);
>+ spin_unlock_irqrestore(&priv->lock, flags);
>+ }
>+}
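If the state check can be made to work for the no-SRQ case as well, the
two helpers collapse into one; a sketch, assuming p->state is kept up to
date for no-SRQ connections too:

	static void timer_check(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
	{
		unsigned long flags;

		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
			spin_lock_irqsave(&priv->lock, flags);
			p->jiffies = jiffies;
			/*
			 * Move this entry to the list head, but do not
			 * re-add it if it has been removed.
			 */
			if (p->state == IPOIB_CM_RX_LIVE)
				list_move(&p->list, &priv->cm.passive_ids);
			spin_unlock_irqrestore(&priv->lock, flags);
		}
	}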
>+static void handle_rx_wc_nosrq(struct net_device *dev, struct ib_wc *wc)
>+{
>+ struct ipoib_dev_priv *priv = netdev_priv(dev);
>+ struct sk_buff *skb, *newskb;
>+ u64 mapping[IPOIB_CM_RX_SG], wr_id = wc->wr_id >> 32;
>+ u32 index;
>+ struct ipoib_cm_rx *p, *rx_ptr;
>+ int frags, ret;
>+
>+
>+ ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
>+ wr_id, wc->status);
>+
>+ if (unlikely(wr_id >= ipoib_recvq_size)) {
>+ ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
>+ wr_id, ipoib_recvq_size);
>+ return;
>+ }
>+
>+ index = (wc->wr_id & ~IPOIB_CM_OP_RECV) & NOSRQ_INDEX_MASK ;
>+
>+ /* This is the only place where rx_ptr could be a NULL - could
>+ * have just received a packet from a connection that has become
>+ * stale and so is going away. We will simply drop the packet and
>+ * let the hardware (it s IB_QPT_RC) handle the dropped packet.
>+ * In the timer_check() function below, p->jiffies is updated and
>+ * hence the connection will not be stale after that.
>+ */
>+ rx_ptr = priv->cm.rx_index_table[index];
Is synchronization needed here? The stale task clears rx_index_table
entries under priv->lock, so this lookup could race with that.
>+ if (unlikely(!rx_ptr)) {
>+ ipoib_warn(priv, "Received packet from a connection "
>+ "that is going away. Hardware will handle it.\n");
>+ return;
>+ }
>+
>+ skb = rx_ptr->rx_ring[wr_id].skb;
>+
>+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
>+ ipoib_dbg(priv, "cm recv error "
>+ "(status=%d, wrid=%ld vend_err %x)\n",
>+ wc->status, wr_id, wc->vendor_err);
>+ ++priv->stats.rx_dropped;
>+ goto repost_nosrq;
>+ }
>+
>+ if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK)) {
>+ /* There are no guarantees that wc->qp is not NULL for HCAs
>+ * that do not support SRQ. */
>+ p = rx_ptr;
>+ timer_check_nosrq(priv, p);
This appears to be the only place 'p' is used in this call. I think we can just
remove it.
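i.e., simply:

	if (!likely(wr_id & IPOIB_CM_RX_UPDATE_MASK))
		timer_check_nosrq(priv, rx_ptr);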
>+ }
>+
>+ frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
>+ (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
>+
>+ newskb = ipoib_cm_alloc_rx_skb(dev, wr_id << 32 | index, frags,
>+ mapping);
>+ if (unlikely(!newskb)) {
>+ /*
>+ * If we can't allocate a new RX buffer, dump
>+ * this packet and reuse the old buffer.
>+ */
>+ ipoib_dbg(priv, "failed to allocate receive buffer %ld\n", wr_id);
> ++priv->stats.rx_dropped;
>- goto repost;
>+ goto repost_nosrq;
> }
>
>- ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
>- memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
>+ ipoib_cm_dma_unmap_rx(priv, frags,
>+ rx_ptr->rx_ring[wr_id].mapping);
>+ memcpy(rx_ptr->rx_ring[wr_id].mapping, mapping,
>+ (frags + 1) * sizeof *mapping);
>
> ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
> wc->byte_len, wc->slid);
>@@ -485,10 +788,22 @@ void ipoib_cm_handle_rx_wc(struct net_de
> skb->pkt_type = PACKET_HOST;
> netif_receive_skb(skb);
>
>-repost:
>- if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
>- ipoib_warn(priv, "ipoib_cm_post_receive failed "
>- "for buf %d\n", wr_id);
>+repost_nosrq:
>+ ret = post_receive_nosrq(dev, wr_id << 32 | index);
>+
>+ if (unlikely(ret))
>+ ipoib_warn(priv, "post_receive_nosrq failed for buf %ld\n",
>+ wr_id);
>+}
>+
>+void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
>+{
>+ struct ipoib_dev_priv *priv = netdev_priv(dev);
>+
>+ if (priv->cm.srq)
>+ handle_rx_wc_srq(dev, wc);
>+ else
>+ handle_rx_wc_nosrq(dev, wc);
> }
>
> static inline int post_send(struct ipoib_dev_priv *priv,
>@@ -680,6 +995,44 @@ err_cm:
> return ret;
> }
>
>+static void free_resources_nosrq(struct ipoib_dev_priv *priv, struct ipoib_cm_rx *p)
>+{
>+ int i;
>+
>+ for(i = 0; i < ipoib_recvq_size; ++i)
>+ if(p->rx_ring[i].skb) {
>+ ipoib_cm_dma_unmap_rx(priv,
>+ IPOIB_CM_RX_SG - 1,
>+ p->rx_ring[i].mapping);
>+ dev_kfree_skb_any(p->rx_ring[i].skb);
>+ p->rx_ring[i].skb = NULL;
>+ }
>+ kfree(p->rx_ring);
>+}
>+
>+void dev_stop_nosrq(struct ipoib_dev_priv *priv)
>+{
>+ struct ipoib_cm_rx *p;
>+
>+ spin_lock_irq(&priv->lock);
>+ while (!list_empty(&priv->cm.passive_ids)) {
>+ p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>+ free_resources_nosrq(priv, p);
>+ list_del_init(&p->list);
just list_del should work here, since p is freed a few lines later
>+ spin_unlock_irq(&priv->lock);
>+ ib_destroy_cm_id(p->id);
>+ ib_destroy_qp(p->qp);
>+ spin_lock(&nosrq_count.lock);
>+ nosrq_count.current_rc_qp--;
>+ spin_unlock(&nosrq_count.lock);
>+ kfree(p);
>+ spin_lock_irq(&priv->lock);
>+ }
>+ spin_unlock_irq(&priv->lock);
>+
>+ cancel_delayed_work(&priv->cm.stale_task);
>+}
>+
> void ipoib_cm_dev_stop(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
>@@ -694,6 +1047,11 @@ void ipoib_cm_dev_stop(struct net_device
> ib_destroy_cm_id(priv->cm.id);
> priv->cm.id = NULL;
>
>+ if (!priv->cm.srq) {
>+ dev_stop_nosrq(priv);
>+ return;
>+ }
>+
> spin_lock_irq(&priv->lock);
> while (!list_empty(&priv->cm.passive_ids)) {
> p = list_entry(priv->cm.passive_ids.next, typeof(*p), list);
>@@ -739,6 +1097,7 @@ void ipoib_cm_dev_stop(struct net_device
> kfree(p);
> }
>
>+
> cancel_delayed_work(&priv->cm.stale_task);
> }
>
>@@ -817,7 +1176,9 @@ static struct ib_qp *ipoib_cm_create_tx_
> attr.recv_cq = priv->cq;
> attr.srq = priv->cm.srq;
> attr.cap.max_send_wr = ipoib_sendq_size;
>+ attr.cap.max_recv_wr = 1;
> attr.cap.max_send_sge = 1;
>+ attr.cap.max_recv_sge = 1;
> attr.sq_sig_type = IB_SIGNAL_ALL_WR;
> attr.qp_type = IB_QPT_RC;
> attr.send_cq = cq;
>@@ -857,7 +1218,7 @@ static int ipoib_cm_send_req(struct net_
> req.retry_count = 0; /* RFC draft warns against retries */
> req.rnr_retry_count = 0; /* RFC draft warns against retries */
> req.max_cm_retries = 15;
>- req.srq = 1;
>+ req.srq = !!priv->cm.srq;
> return ib_send_cm_req(id, &req);
> }
>
>@@ -1202,6 +1563,11 @@ static void ipoib_cm_rx_reap(struct work
> list_for_each_entry_safe(p, n, &list, list) {
> ib_destroy_cm_id(p->id);
> ib_destroy_qp(p->qp);
>+ if (!priv->cm.srq) {
>+ spin_lock(&nosrq_count.lock);
>+ nosrq_count.current_rc_qp--;
>+ spin_unlock(&nosrq_count.lock);
>+ }
> kfree(p);
> }
> }
>@@ -1220,12 +1586,19 @@ static void ipoib_cm_stale_task(struct w
> p = list_entry(priv->cm.passive_ids.prev, typeof(*p), list);
> if (time_before_eq(jiffies, p->jiffies + IPOIB_CM_RX_TIMEOUT))
> break;
>- list_move(&p->list, &priv->cm.rx_error_list);
>- p->state = IPOIB_CM_RX_ERROR;
>- spin_unlock_irq(&priv->lock);
>- ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>- if (ret)
>- ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
>+ if (!priv->cm.srq) {
>+ free_resources_nosrq(priv, p);
>+ list_del_init(&p->list);
>+ priv->cm.rx_index_table[p->index] = NULL;
>+ spin_unlock_irq(&priv->lock);
>+ } else {
>+ list_move(&p->list, &priv->cm.rx_error_list);
>+ p->state = IPOIB_CM_RX_ERROR;
>+ spin_unlock_irq(&priv->lock);
>+ ret = ib_modify_qp(p->qp, &ipoib_cm_err_attr, IB_QP_STATE);
>+ if (ret)
>+ ipoib_warn(priv, "unable to move qp to error state: %d\n", ret);
>+ }
> spin_lock_irq(&priv->lock);
> }
>
>@@ -1279,16 +1652,40 @@ int ipoib_cm_add_mode_attr(struct net_de
> return device_create_file(&dev->dev, &dev_attr_mode);
> }
>
>+static int create_srq(struct net_device *dev, struct ipoib_dev_priv *priv)
>+{
>+ struct ib_srq_init_attr srq_init_attr;
>+ int ret;
>+
>+ srq_init_attr.attr.max_wr = ipoib_recvq_size;
>+ srq_init_attr.attr.max_sge = IPOIB_CM_RX_SG;
>+
>+ priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>+ if (IS_ERR(priv->cm.srq)) {
>+ ret = PTR_ERR(priv->cm.srq);
>+ priv->cm.srq = NULL;
>+ return ret;
nit: you can just return PTR_ERR here, and remove the ret stack variable
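e.g., if clearing priv->cm.srq on this error path turns out to be
unnecessary:

	if (IS_ERR(priv->cm.srq))
		return PTR_ERR(priv->cm.srq);

(if the NULL assignment is still needed, a local can't be avoided)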
>+ }
>+
>+ priv->cm.srq_ring = kzalloc(ipoib_recvq_size *
>+ sizeof *priv->cm.srq_ring,
>+ GFP_KERNEL);
>+ if (!priv->cm.srq_ring) {
>+ printk(KERN_WARNING "%s: failed to allocate CM ring "
>+ "(%d entries)\n",
>+ priv->ca->name, ipoib_recvq_size);
>+ ipoib_cm_dev_cleanup(dev);
>+ return -ENOMEM;
>+ }
>+
>+ return 0;
>+}
>+
> int ipoib_cm_dev_init(struct net_device *dev)
> {
> struct ipoib_dev_priv *priv = netdev_priv(dev);
>- struct ib_srq_init_attr srq_init_attr = {
>- .attr = {
>- .max_wr = ipoib_recvq_size,
>- .max_sge = IPOIB_CM_RX_SG
>- }
>- };
> int ret, i;
>+ struct ib_device_attr attr;
>
> INIT_LIST_HEAD(&priv->cm.passive_ids);
> INIT_LIST_HEAD(&priv->cm.reap_list);
>@@ -1305,20 +1702,33 @@ int ipoib_cm_dev_init(struct net_device
>
> skb_queue_head_init(&priv->cm.skb_queue);
>
>- priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
>- if (IS_ERR(priv->cm.srq)) {
>- ret = PTR_ERR(priv->cm.srq);
>- priv->cm.srq = NULL;
>+ if (ret = ib_query_device(priv->ca, &attr))
> return ret;
double parens around assignment - also below
>- }
>
>- priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
>- GFP_KERNEL);
>- if (!priv->cm.srq_ring) {
>- printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
>- priv->ca->name, ipoib_recvq_size);
>- ipoib_cm_dev_cleanup(dev);
>- return -ENOMEM;
>+ if (attr.max_srq) {
>+ /* This device supports SRQ */
>+ if (ret = create_srq(dev, priv))
>+ return ret;
>+ priv->cm.rx_index_table = NULL;
>+ } else {
>+ priv->cm.srq = NULL;
>+ priv->cm.srq_ring = NULL;
>+
>+ /* Every new REQ that arrives creates a struct ipoib_cm_rx.
>+ * These structures form a link list starting with the
>+ * passive_ids. For quick and easy access we maintain a table
>+ * of pointers to struct ipoib_cm_rx called the rx_index_table
>+ */
Why store the structures in a linked list if they're stored in a table?
>+ priv->cm.rx_index_table = kzalloc(NOSRQ_INDEX_TABLE_SIZE *
>+ sizeof *priv->cm.rx_index_table,
>+ GFP_KERNEL);
>+ if (!priv->cm.rx_index_table) {
>+ printk(KERN_WARNING "Failed to allocate NOSRQ_INDEX_TABLE\n");
>+ return -ENOMEM;
>+ }
>+
>+ spin_lock_init(&nosrq_count.lock);
>+ nosrq_count.current_rc_qp = 0;
> }
>
> for (i = 0; i < IPOIB_CM_RX_SG; ++i)
>@@ -1331,17 +1741,23 @@ int ipoib_cm_dev_init(struct net_device
> priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
> priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
>
>- for (i = 0; i < ipoib_recvq_size; ++i) {
>- if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
>+ /* One can post receive buffers even before the RX QP is created
>+ * only in the SRQ case. Therefore for NOSRQ we skip the rest of init
>+ * and do that in ipoib_cm_req_handler() */
This is separate from this patch, but why not wait until we've received
a REQ before posting receives to the SRQ? Would this simplify the code any?
- Sean