[ofa-general] [RFC PATCH] rds: enable rdma on iWARP
Jon Mason
jon at opengridcomputing.com
Mon Jul 28 08:29:20 PDT 2008
I am able to get rds-rdma over iWARP to mostly work. It still has a timing bug
that corrupts the send WR ring, but rds-stress with rdma enabled will run
successfully for a few seconds.
The bulk of this patch removes the pre-existing posting of the invalidate
logic and adds it immediately before the fastreg send posting. The previous
logic assumed that posting an invalidate to a dummy qp would successfully
invalidate the entry. Unfortunately, the invalidate must be posted on the same
qp as the fastreg, and the pre-existing logic has no way to get at the qp the
fastreg is posted on. A sketch of the resulting posting sequence follows the
sign-off.
The rest of the patch cleans up miscellaneous errors.
This patch is based off the future-20080715 branch.
Signed-off-by: Jon Mason <jon at opengridcomputing.com>
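For reference, the posting sequence this patch establishes in
rds_ib_xmit_fastreg() looks roughly like this (a minimal sketch, not the
literal patch code: ring allocation, debug output, and send-work bookkeeping
are omitted, and inv_wr/reg_wr are illustrative names):

    /* Both WRs go to the connection's own QP, so the LOCAL_INV of the
     * old rkey executes before the FAST_REG_MR that reuses the MR.
     */
    struct ib_send_wr inv_wr, reg_wr, *failed_wr;
    int ret;

    memset(&inv_wr, 0, sizeof(inv_wr));
    inv_wr.opcode = IB_WR_LOCAL_INV;
    inv_wr.ex.invalidate_rkey = mapping->m_prev_rkey; /* key the HCA still holds */
    inv_wr.send_flags = IB_SEND_SIGNALED;

    memset(&reg_wr, 0, sizeof(reg_wr));
    rds_ib_rdma_build_fastreg(&reg_wr, mapping);      /* opcode + wr.fast_reg.* */
    reg_wr.send_flags = IB_SEND_SIGNALED;

    ret = ib_post_send(ic->i_cm_id->qp, &inv_wr, &failed_wr);
    if (ret == 0)
        ret = ib_post_send(ic->i_cm_id->qp, &reg_wr, &failed_wr);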
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 9e17075..6c2656f 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -74,6 +74,7 @@ struct rds_ib_mapping {
struct list_head m_list;
struct rds_ib_mr * m_mr;
uint32_t m_rkey;
+ uint32_t m_prev_rkey;
struct rds_ib_scatterlist m_sg;
};
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 9969504..70adee6 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -671,6 +671,7 @@ out:
int rds_ib_conn_connect(struct rds_connection *conn)
{
struct rds_ib_connection *ic = conn->c_transport_data;
+ struct rds_ib_device *rds_ibdev;
struct sockaddr_in src, dest;
int ret;
@@ -698,8 +699,11 @@ int rds_ib_conn_connect(struct rds_connection *conn)
goto out;
}
+ rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
+
/* Now check the device type and set i_iwarp */
ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
+ ic->i_fastreg = rds_ibdev->use_fastreg;
dest.sin_family = AF_INET;
dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index ee473ca..47b4e1d 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -95,13 +95,6 @@ struct rds_ib_mr_pool {
unsigned long max_free_pinned;
struct ib_fmr_attr fmr_attr;
- /* Dummy QP used to handle invalidate for fastreg */
- struct {
- struct ib_qp *qp;
- struct rds_ib_inv_wr *send_wrs;
- struct rds_ib_work_ring send_ring, recv_ring;
- } fastreg;
-
struct rds_ib_mr_pool_ops *op;
};
@@ -139,7 +132,6 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
static int rds_ib_local_invalidate(struct rds_ib_mr_pool *pool,
struct rds_ib_mapping *mapping);
-static void rds_ib_inval_cq_handler(struct ib_cq *cq, void *context);
static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
.init = rds_ib_init_fmr,
@@ -393,23 +385,8 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
pool_size,
&rds_ib_fastreg_pool_ops);
- if (!IS_ERR(pool)) {
- /* Fill in the blanks:
- * create a dummy QP to which we can post LOCAL_INV
- * requests when invalidating MRs
- */
- rds_ib_ring_init(&pool->fastreg.send_ring, 64);
- rds_ib_ring_init(&pool->fastreg.recv_ring, 64);
- pool->fastreg.qp = rds_ib_create_qp(rds_ibdev,
- &pool->fastreg.send_ring,
- rds_ib_inval_cq_handler,
- &pool->fastreg.recv_ring,
- NULL,
- pool);
-
- if (IS_ERR(pool->fastreg.qp))
- BUG(); /* FIXME handle gracefully */
- /* FIXME allocate pool->fasteg.send_wrs */
+ if (IS_ERR(pool)) {
+ printk("__rds_ib_create_mr_pool error\n");
}
}
@@ -430,10 +407,6 @@ void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
rds_ib_flush_mr_pool(pool, 1);
BUG_ON(atomic_read(&pool->item_count));
BUG_ON(atomic_read(&pool->free_pinned));
-
- if (pool->fastreg.qp)
- ib_destroy_qp(pool->fastreg.qp);
-
kfree(pool);
}
@@ -697,7 +670,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
ret = pool->op->map(pool, ibmr, sg, nents);
if (ret == 0)
- *key_ret = ibmr->u.fmr->rkey;
+ *key_ret = rds_ibdev->dev->node_type == RDMA_NODE_RNIC ? ibmr->fr_mr->rkey : ibmr->u.fmr->rkey;
else
printk(KERN_WARNING "RDS/IB: failed to map mr (errno=%d)\n", ret);
@@ -908,10 +881,13 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
+ ibmr->fr_page_shift = rds_ibdev->fmr_page_shift; /* XXX really? */
+
dma_pages = rds_ib_map_scatterlist(rds_ibdev,
&mapping->m_sg,
ibmr->fr_page_shift);
if (IS_ERR(dma_pages)) {
+ printk("rds_ib_map_scatterlist failed \n");
ret = PTR_ERR(dma_pages);
dma_pages = NULL;
goto out;
@@ -923,11 +899,11 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
}
ibmr->fr_page_list_len = mapping->m_sg.dma_len;
- ibmr->fr_page_shift = rds_ibdev->fmr_page_shift; /* XXX really? */
for (i = 0; i < mapping->m_sg.dma_npages; ++i)
ibmr->fr_page_list->page_list[i] = dma_pages[i];
+ mapping->m_prev_rkey = ibmr->fr_mr->rkey;
ib_update_fast_reg_key(ibmr->fr_mr, ibmr->remap_count++);
mapping->m_rkey = ibmr->fr_mr->rkey;
@@ -969,7 +945,7 @@ static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
rds_ib_local_inv_complete(pool, &ibmr->mapping, IB_WC_SUCCESS);
spin_lock_irqsave(&pool->list_lock, flags);
- list_add(&mapping->m_list, &pool->clean_list);
+ list_add(&ibmr->mapping.m_list, &pool->clean_list);
spin_unlock_irqrestore(&pool->list_lock, flags);
return;
@@ -1053,7 +1029,8 @@ struct rds_ib_mapping *rds_ib_rdma_get_mapping(struct rds_mr *mr)
/* Okay, we should register the mapping now.
* Set map_seq so the flush worker knows whether a
* mapping is newer */
- ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
+ if (ibmr->pool)
+ ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
return mapping;
case RDS_IB_MAP_MAPPING:
@@ -1077,33 +1054,19 @@ struct rds_ib_mapping *rds_ib_rdma_get_mapping(struct rds_mr *mr)
*/
int rds_ib_local_invalidate(struct rds_ib_mr_pool *pool, struct rds_ib_mapping *mapping)
{
- struct rds_ib_inv_wr *inval;
- struct ib_send_wr *failed_wr;
unsigned long flags;
- u32 pos;
- int ret;
-
- if (!rds_ib_ring_alloc(&pool->fastreg.send_ring, 1, &pos))
- return 0;
- inval = &pool->fastreg.send_wrs[pos];
-
- memset(inval, 0, sizeof(*inval));
- inval->i_wr.wr_id = pos;
- inval->i_wr.opcode = IB_WR_LOCAL_INV;
- inval->i_wr.ex.invalidate_rkey = mapping->m_rkey;
- inval->i_mapping = mapping;
+ /*FIXME - potential problem. We are invalidating the fastreg prior to mapping
+ * it, but it should be done on the qp being used. Unfortunately, we cannot
+ * get to there from here. So, lie to the state machine for now, as the
+ * mapping will be invalidated eventually.
+ */
spin_lock_irqsave(&mapping->m_lock, flags);
- ret = ib_post_send(pool->fastreg.qp, &inval->i_wr, &failed_wr);
- if (ret == 0) {
- mapping->m_state = RDS_IB_MAP_UNMAPPING;
- } else {
- rds_ib_ring_unalloc(&pool->fastreg.send_ring, 1);
- }
+ mapping->m_state = RDS_IB_MAP_UNMAPPING;
spin_unlock_irqrestore(&mapping->m_lock, flags);
- return ret == 0;
+ return 1;
}
void rds_ib_local_inv_complete(struct rds_ib_mr_pool *pool,
@@ -1126,23 +1089,6 @@ void rds_ib_local_inv_complete(struct rds_ib_mr_pool *pool,
}
}
-static void rds_ib_inval_cq_handler(struct ib_cq *cq, void *context)
-{
- struct rds_ib_mr_pool *pool = context;
- struct ib_wc wc;
-
- ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
- while (ib_poll_cq(cq, 1, &wc) > 0) {
- struct rds_ib_inv_wr *wr;
-
- wr = &pool->fastreg.send_wrs[wc.wr_id];
- rds_ib_local_inv_complete(pool, wr->i_mapping, wc.status);
- }
-
- if (waitqueue_active(&pool->flush_waitq))
- wake_up(&pool->flush_waitq);
-}
-
void rds_ib_rdma_build_fastreg(struct ib_send_wr *wr, struct rds_ib_mapping *mapping)
{
struct rds_ib_mr *ibmr = mapping->m_mr;
@@ -1156,6 +1102,7 @@ void rds_ib_rdma_build_fastreg(struct ib_send_wr *wr, struct rds_ib_mapping *map
wr->wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
IB_ACCESS_REMOTE_READ |
IB_ACCESS_REMOTE_WRITE;
+ wr->wr.fast_reg.iova_start = 0;
}
void rds_ib_fast_reg_complete(struct rds_ib_mapping *mapping, int status)
@@ -1206,9 +1153,6 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
while (!list_empty(unmap_list)) {
unsigned long flags;
- wait_event(pool->flush_waitq,
- rds_ib_ring_empty(&pool->fastreg.send_ring));
-
spin_lock_irqsave(&pool->list_lock, flags);
list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
switch (mapping->m_state) {
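A note on the m_prev_rkey bookkeeping added above: ib_update_fast_reg_key()
only rewrites the low 8 "key" bits of the MR's lkey/rkey, which is why
remap_count can serve as a rolling 8-bit counter. On this branch the helper in
include/rdma/ib_verbs.h is essentially:

    static inline void ib_update_fast_reg_key(struct ib_mr *mr, u8 newkey)
    {
        /* keep the 24-bit MR index, swap in the new 8-bit key */
        mr->lkey = (mr->lkey & 0xffffff00) | newkey;
        mr->rkey = (mr->rkey & 0xffffff00) | newkey;
    }

So m_prev_rkey records the key the HCA still has registered, and that is the
rkey the subsequent IB_WR_LOCAL_INV must name; the new m_rkey only becomes
valid once the fastreg WR completes.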
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 9f72556..c2fb039 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -102,7 +102,6 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
sge = rds_ib_header_sge(ic, recv->r_sge);
sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
sge->length = sizeof(struct rds_header);
- sge->lkey = ic->i_mr->lkey;
}
}
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4878d3b..870c697 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -137,6 +137,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
send->s_op = NULL;
send->s_mapping = NULL;
+ send->s_wr.next = NULL;
send->s_wr.wr_id = i;
send->s_wr.sg_list = send->s_sge;
send->s_wr.num_sge = 1;
@@ -221,6 +222,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
if (send->s_rm)
rds_ib_send_unmap_rm(ic, send, wc.status);
break;
+ case IB_WR_LOCAL_INV:
case IB_WR_RDMA_WRITE:
case IB_WR_RDMA_READ:
/* Nothing to be done - the SG list will be unmapped
@@ -233,8 +235,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
default:
if (printk_ratelimit())
printk(KERN_NOTICE
- "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
- __FUNCTION__, send->s_wr.opcode);
+ "RDS/IB: %s: unexpected opcode 0x%x in WR! %d\n",
+ __FUNCTION__, send->s_wr.opcode, wc.opcode);
break;
}
@@ -253,7 +255,6 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
if ((rm = rds_send_get_message(conn, send->s_op)) != NULL)
rds_ib_send_rdma_complete(rm, wc.status);
}
-
oldest = (oldest + 1) % ic->i_send_ring.w_nr;
}
@@ -866,7 +867,7 @@ int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
{
struct rds_ib_mapping *mapping;
struct rds_ib_connection *ic = conn->c_transport_data;
- struct rds_ib_send_work *send = NULL;
+ struct rds_ib_send_work *send1 = NULL, *send2 = NULL;
struct ib_send_wr *failed_wr;
u32 pos;
u32 work_alloc = 0;
@@ -885,31 +886,50 @@ int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
* inside the fast_reg_mr WR. The key used is a rolling 8bit
* counter, which should guarantee uniqueness.
*/
- work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
- if (work_alloc == 0) {
+ work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 2, &pos);
+ if (work_alloc < 2) {
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_stats_inc(s_ib_tx_ring_full);
ret = -ENOMEM;
goto out;
}
+ send1 = &ic->i_sends[pos];
+ send2 = &ic->i_sends[pos+1];
- send = &ic->i_sends[pos];
+ send1->s_wr.opcode = IB_WR_LOCAL_INV;
+ send1->s_wr.ex.invalidate_rkey = mapping->m_prev_rkey;
+ send1->s_wr.send_flags = IB_SEND_SIGNALED;
+ send1->s_queued = jiffies;
- memset(send, 0, sizeof(*send));
- rds_ib_rdma_build_fastreg(&send->s_wr, mapping);
- send->s_mapping = mapping;
- send->s_queued = jiffies;
+ rds_ib_rdma_build_fastreg(&send2->s_wr, mapping);
+ send2->s_wr.send_flags = IB_SEND_SIGNALED;
+ send2->s_mapping = mapping;
+ send2->s_queued = jiffies;
+
+
+ failed_wr = &send1->s_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &send1->s_wr, &failed_wr);
+
+ rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
+ send1, &send1->s_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &send1->s_wr);
+ if (ret) {
+ printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
+ "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+ rds_ib_ring_unalloc(&ic->i_send_ring, 2);
+ return ret;
+ }
- failed_wr = &send->s_wr;
- ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+ failed_wr = &send2->s_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &send2->s_wr, &failed_wr);
rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
- send, &send->s_wr, ret, failed_wr);
- BUG_ON(failed_wr != &send->s_wr);
+ send2, &send2->s_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &send2->s_wr);
if (ret) {
printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
"returned %d\n", NIPQUAD(conn->c_faddr), ret);
- rds_ib_ring_unalloc(&ic->i_send_ring, 1);
+ rds_ib_ring_unalloc(&ic->i_send_ring, 2);
return ret;
}
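Since both WRs target the same QP, one possible simplification (untested, just
a sketch) would be to chain them through s_wr.next and hand them to a single
ib_post_send() call. The QP executes them in order either way, but a single
post avoids the window where the LOCAL_INV has been posted and the FAST_REG_MR
has not, which may be relevant to the send-ring corruption mentioned above:

    /* chain LOCAL_INV -> FAST_REG_MR and post both in one call */
    send1->s_wr.next = &send2->s_wr;
    failed_wr = &send1->s_wr;
    ret = ib_post_send(ic->i_cm_id->qp, &send1->s_wr, &failed_wr);
    if (ret) {
        /* failed_wr points at whichever WR the provider rejected */
        rds_ib_ring_unalloc(&ic->i_send_ring, 2);
        return ret;
    }

Either way, clearing s_wr.next in rds_ib_send_init_ring(), as the patch does,
is prudent: a stale next pointer would splice unrelated WRs into a post list.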