[ofa-general] [RFC PATCH] rds: enable rdma on iWARP

Jon Mason jon at opengridcomputing.com
Mon Jul 28 08:29:20 PDT 2008


I am able to get rds-rdma over iWARP to mostly work.  It still has a timing bug
that is causing the send wr ring to get corrupted, but it will run successfully
for a few seconds for rds-stress with rdma enabled.

The bulk of this patch is removing the pre-existing posting of the invalidate
logic and adding it prior to the fastreg send posting.  The previous logic
assumed that posting an invalidate to a dummy qp would successfully invalidate
the entry.  Unfortunately, the invalidate must be posted on the same qp as the
fastreg and the pre-existing logic does not have a way to get the qp the fastreg
is posted on.

The rest of the patch is cleaning up miscellaneous errors.

This patch is based off the future-20080715 branch.

Signed-off-by: Jon Mason <jon at opengridcomputing.com>

diff --git a/net/rds/ib.h b/net/rds/ib.h
index 9e17075..6c2656f 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -74,6 +74,7 @@ struct rds_ib_mapping {
 	struct list_head	m_list;
 	struct rds_ib_mr *	m_mr;
 	uint32_t		m_rkey;
+	uint32_t		m_prev_rkey;
 	struct rds_ib_scatterlist m_sg;
 };
 
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 9969504..70adee6 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -671,6 +671,7 @@ out:
 int rds_ib_conn_connect(struct rds_connection *conn)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_device *rds_ibdev;
 	struct sockaddr_in src, dest;
 	int ret;
 
@@ -698,8 +699,11 @@ int rds_ib_conn_connect(struct rds_connection *conn)
 		goto out;
 	}
 
+	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
+
 	/* Now check the device type and set i_iwarp */
 	ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
+	ic->i_fastreg = rds_ibdev->use_fastreg;
 
 	dest.sin_family = AF_INET;
 	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index ee473ca..47b4e1d 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -95,13 +95,6 @@ struct rds_ib_mr_pool {
 	unsigned long		max_free_pinned;
 	struct ib_fmr_attr	fmr_attr;
 
-	/* Dummy QP used to handle invalidate for fastreg */
-	struct {
-	    struct ib_qp	*qp;
-	    struct rds_ib_inv_wr *send_wrs;
-	    struct rds_ib_work_ring send_ring, recv_ring;
-	} fastreg;
-
 	struct rds_ib_mr_pool_ops *op;
 };
 
@@ -139,7 +132,6 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
 static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
 static int rds_ib_local_invalidate(struct rds_ib_mr_pool *pool,
 		struct rds_ib_mapping *mapping);
-static void rds_ib_inval_cq_handler(struct ib_cq *cq, void *context);
 
 static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
 	.init		= rds_ib_init_fmr,
@@ -393,23 +385,8 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 					pool_size,
 					&rds_ib_fastreg_pool_ops);
 
-		if (!IS_ERR(pool)) {
-			/* Fill in the blanks:
-			 *  create a dummy QP to which we can post LOCAL_INV
-			 *  requests when invalidating MRs
-			 */
-			rds_ib_ring_init(&pool->fastreg.send_ring, 64);
-			rds_ib_ring_init(&pool->fastreg.recv_ring, 64);
-			pool->fastreg.qp = rds_ib_create_qp(rds_ibdev,
-					&pool->fastreg.send_ring,
-					rds_ib_inval_cq_handler,
-					&pool->fastreg.recv_ring,
-					NULL,
-					pool);
-
-			if (IS_ERR(pool->fastreg.qp))
-				BUG(); /* FIXME handle gracefully */
-			/* FIXME allocate pool->fasteg.send_wrs */
+		if (IS_ERR(pool)) {
+			printk("__rds_ib_create_mr_pool error\n");
 		}
 	}
 
@@ -430,10 +407,6 @@ void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
 	rds_ib_flush_mr_pool(pool, 1);
 	BUG_ON(atomic_read(&pool->item_count));
 	BUG_ON(atomic_read(&pool->free_pinned));
-
-	if (pool->fastreg.qp)
-		ib_destroy_qp(pool->fastreg.qp);
-
 	kfree(pool);
 }
 
@@ -697,7 +670,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 
 	ret = pool->op->map(pool, ibmr, sg, nents);
 	if (ret == 0)
-		*key_ret = ibmr->u.fmr->rkey;
+		*key_ret = rds_ibdev->dev->node_type == RDMA_NODE_RNIC ? ibmr->fr_mr->rkey : ibmr->u.fmr->rkey;
 	else
 		printk(KERN_WARNING "RDS/IB: failed to map mr (errno=%d)\n", ret);
 
@@ -908,10 +881,13 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
 
 	rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
 
+	ibmr->fr_page_shift = rds_ibdev->fmr_page_shift;	/* XXX really? */
+
 	dma_pages = rds_ib_map_scatterlist(rds_ibdev,
 				&mapping->m_sg,
 				ibmr->fr_page_shift);
 	if (IS_ERR(dma_pages)) {
+		printk("rds_ib_map_scatterlist failed \n");
 		ret = PTR_ERR(dma_pages);
 		dma_pages = NULL;
 		goto out;
@@ -923,11 +899,11 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
 	}
 
 	ibmr->fr_page_list_len = mapping->m_sg.dma_len;
-	ibmr->fr_page_shift = rds_ibdev->fmr_page_shift;	/* XXX really? */
 
 	for (i = 0; i < mapping->m_sg.dma_npages; ++i)
 		ibmr->fr_page_list->page_list[i] = dma_pages[i];
 
+	mapping->m_prev_rkey = ibmr->fr_mr->rkey;
 	ib_update_fast_reg_key(ibmr->fr_mr, ibmr->remap_count++);
 
 	mapping->m_rkey = ibmr->fr_mr->rkey;
@@ -969,7 +945,7 @@ static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
 		rds_ib_local_inv_complete(pool, &ibmr->mapping, IB_WC_SUCCESS);
 
 		spin_lock_irqsave(&pool->list_lock, flags);
-		list_add(&mapping->m_list, &pool->clean_list);
+		list_add(&ibmr->mapping.m_list, &pool->clean_list);
 		spin_unlock_irqrestore(&pool->list_lock, flags);
 		return;
 
@@ -1053,7 +1029,8 @@ struct rds_ib_mapping *rds_ib_rdma_get_mapping(struct rds_mr *mr)
 		/* Okay, we should register the mapping now.
 		 * Set map_seq so the flush worker knows whether a
 		 * mapping is newer */
-		ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
+		if (ibmr->pool)
+			ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
 		return mapping;
 
 	case RDS_IB_MAP_MAPPING:
@@ -1077,33 +1054,19 @@ struct rds_ib_mapping *rds_ib_rdma_get_mapping(struct rds_mr *mr)
  */
 int rds_ib_local_invalidate(struct rds_ib_mr_pool *pool, struct rds_ib_mapping *mapping)
 {
-	struct rds_ib_inv_wr *inval;
-	struct ib_send_wr *failed_wr;
 	unsigned long flags;
-	u32 pos;
-	int ret;
-
-	if (!rds_ib_ring_alloc(&pool->fastreg.send_ring, 1, &pos))
-		return 0;
 
-	inval = &pool->fastreg.send_wrs[pos];
-
-	memset(inval, 0, sizeof(*inval));
-	inval->i_wr.wr_id = pos;
-	inval->i_wr.opcode = IB_WR_LOCAL_INV;
-	inval->i_wr.ex.invalidate_rkey = mapping->m_rkey;
-	inval->i_mapping = mapping;
+	/*FIXME - potential problem.  We are invalidating the fastreg prior to mapping
+	 * it, but it should be done on the qp being used.  Unfortunately, we cannot
+	 * get to there from here.  So, lie to the state machine for now, as the
+	 * mapping will be invalidated eventually.
+	 */
 
 	spin_lock_irqsave(&mapping->m_lock, flags);
-	ret = ib_post_send(pool->fastreg.qp, &inval->i_wr, &failed_wr);
-	if (ret == 0) {
-		mapping->m_state = RDS_IB_MAP_UNMAPPING;
-	} else {
-		rds_ib_ring_unalloc(&pool->fastreg.send_ring, 1);
-	}
+	mapping->m_state = RDS_IB_MAP_UNMAPPING;
 	spin_unlock_irqrestore(&mapping->m_lock, flags);
 
-	return ret == 0;
+	return 1;
 }
 
 void rds_ib_local_inv_complete(struct rds_ib_mr_pool *pool,
@@ -1126,23 +1089,6 @@ void rds_ib_local_inv_complete(struct rds_ib_mr_pool *pool,
 	}
 }
 
-static void rds_ib_inval_cq_handler(struct ib_cq *cq, void *context)
-{
-	struct rds_ib_mr_pool *pool = context;
-	struct ib_wc wc;
-
-	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-	while (ib_poll_cq(cq, 1, &wc) > 0) {
-		struct rds_ib_inv_wr *wr;
-
-		wr = &pool->fastreg.send_wrs[wc.wr_id];
-		rds_ib_local_inv_complete(pool, wr->i_mapping, wc.status);
-	}
-
-	if (waitqueue_active(&pool->flush_waitq))
-		wake_up(&pool->flush_waitq);
-}
-
 void rds_ib_rdma_build_fastreg(struct ib_send_wr *wr, struct rds_ib_mapping *mapping)
 {
 	struct rds_ib_mr *ibmr = mapping->m_mr;
@@ -1156,6 +1102,7 @@ void rds_ib_rdma_build_fastreg(struct ib_send_wr *wr, struct rds_ib_mapping *map
 	wr->wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
 				IB_ACCESS_REMOTE_READ |
 				IB_ACCESS_REMOTE_WRITE;
+	wr->wr.fast_reg.iova_start = 0;
 }
 
 void rds_ib_fast_reg_complete(struct rds_ib_mapping *mapping, int status)
@@ -1206,9 +1153,6 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
 	while (!list_empty(unmap_list)) {
 		unsigned long flags;
 
-		wait_event(pool->flush_waitq,
-				rds_ib_ring_empty(&pool->fastreg.send_ring));
-
 		spin_lock_irqsave(&pool->list_lock, flags);
 		list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
 			switch (mapping->m_state) {
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 9f72556..c2fb039 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -102,7 +102,6 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
 		sge = rds_ib_header_sge(ic, recv->r_sge);
 		sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
 		sge->length = sizeof(struct rds_header);
-		sge->lkey = ic->i_mr->lkey;
 	}
 }
 
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4878d3b..870c697 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -137,6 +137,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 		send->s_op = NULL;
 		send->s_mapping = NULL;
 
+		send->s_wr.next = NULL;
 		send->s_wr.wr_id = i;
 		send->s_wr.sg_list = send->s_sge;
 		send->s_wr.num_sge = 1;
@@ -221,6 +222,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 				if (send->s_rm)
 					rds_ib_send_unmap_rm(ic, send, wc.status);
 				break;
+			case IB_WR_LOCAL_INV:
 			case IB_WR_RDMA_WRITE:
 			case IB_WR_RDMA_READ:
 				/* Nothing to be done - the SG list will be unmapped
@@ -233,8 +235,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 			default:
 				if (printk_ratelimit())
 					printk(KERN_NOTICE
-						"RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-						__FUNCTION__, send->s_wr.opcode);
+						"RDS/IB: %s: unexpected opcode 0x%x in WR! %d\n",
+						__FUNCTION__, send->s_wr.opcode, wc.opcode);
 				break;
 			}
 
@@ -253,7 +255,6 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 				if ((rm = rds_send_get_message(conn, send->s_op)) != NULL)
 					rds_ib_send_rdma_complete(rm, wc.status);
 			}
-
 			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
 		}
 
@@ -866,7 +867,7 @@ int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
 {
 	struct rds_ib_mapping *mapping;
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rds_ib_send_work *send = NULL;
+	struct rds_ib_send_work *send1 = NULL, *send2 = NULL;
 	struct ib_send_wr *failed_wr;
 	u32 pos;
 	u32 work_alloc = 0;
@@ -885,31 +886,50 @@ int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
 	 * inside the fast_reg_mr WR.  The key used is a rolling 8bit
 	 * counter, which should guarantee uniqueness.
 	 */
-	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
-	if (work_alloc == 0) {
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 2, &pos);
+	if (work_alloc < 2) {
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 		rds_ib_stats_inc(s_ib_tx_ring_full);
 		ret = -ENOMEM;
 		goto out;
 	}
+	send1 = &ic->i_sends[pos];
+	send2 = &ic->i_sends[pos+1];
 
-	send = &ic->i_sends[pos];
+	send1->s_wr.opcode = IB_WR_LOCAL_INV;
+	send1->s_wr.ex.invalidate_rkey = mapping->m_prev_rkey;
+	send1->s_wr.send_flags = IB_SEND_SIGNALED;
+	send1->s_queued = jiffies;
 
-	memset(send, 0, sizeof(*send));
-	rds_ib_rdma_build_fastreg(&send->s_wr, mapping);
-	send->s_mapping = mapping;
-	send->s_queued = jiffies;
+	rds_ib_rdma_build_fastreg(&send2->s_wr, mapping);
+	send2->s_wr.send_flags = IB_SEND_SIGNALED;
+	send2->s_mapping = mapping;
+	send2->s_queued = jiffies;
+
+
+	failed_wr = &send1->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &send1->s_wr, &failed_wr);
+
+	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
+		 send1, &send1->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &send1->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		rds_ib_ring_unalloc(&ic->i_send_ring, 2);
+		return ret;
+	}
 
-	failed_wr = &send->s_wr;
-	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+	failed_wr = &send2->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &send2->s_wr, &failed_wr);
 
 	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
-		 send, &send->s_wr, ret, failed_wr);
-	BUG_ON(failed_wr != &send->s_wr);
+		 send2, &send2->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &send2->s_wr);
 	if (ret) {
 		printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
 		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
-		rds_ib_ring_unalloc(&ic->i_send_ring, 1);
+		rds_ib_ring_unalloc(&ic->i_send_ring, 2);
 		return ret;
 	}
 



More information about the general mailing list