[ofa-general] [PATCH RFC] rds: add iwarp support
Jon Mason
jon at opengridcomputing.com
Mon Jul 7 09:55:55 PDT 2008
On Mon, Jul 07, 2008 at 02:40:45PM +0200, Olaf Kirch wrote:
> On Thursday 03 July 2008 23:34:12 Jon Mason wrote:
> > This patch adds support for running RDS over iWARP adapters. It
> Hi Jon,
>
> I took your patch and tried to isolate the iWARP-specific changes
> in bcopy mode, and roll them into a smaller patch that doesn't duplicate
> all the ib*.[hc] files.
>
> I also tried to come to some working solution for RDMA - as you can
> see from the deluge of messages I wrote on this :-), the approach you
> chose has some problems.
>
> Please take a look at the attached patch and let me know whether
> (a) bcopy mode works, and (b) the RDMA approach may work with
> iWARP NICs.
It doesn't seem to work. After looking through the code, the
check_laddr path will fail because it is still looking for the IB ARP
and not the inet ARP needed by iWARP. I hacked around that, but it is
still not working. I'll look through the patch more and see if I can
determine what is still breaking.
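For illustration, here is a minimal sketch of what an iWARP-aware laddr
check could look like (the function name, the NULL event handler, and the
overall flow are assumptions for this sketch, not the actual RDS code):
bind a temporary rdma_cm_id to the address and accept RNICs as well as
IB HCAs.

	#include <linux/err.h>
	#include <linux/in.h>
	#include <linux/string.h>
	#include <rdma/rdma_cm.h>
	#include <rdma/ib_verbs.h>

	/* Hypothetical sketch: rdma_bind_addr() resolves the address via the
	 * normal inet routing/ARP tables, so it works for iWARP as well; the
	 * node_type test then decides which device types to accept. */
	static int laddr_check_sketch(__be32 addr)
	{
		struct rdma_cm_id *cm_id;
		struct sockaddr_in sin;
		int ret;

		cm_id = rdma_create_id(NULL, NULL, RDMA_PS_TCP);
		if (IS_ERR(cm_id))
			return PTR_ERR(cm_id);

		memset(&sin, 0, sizeof(sin));
		sin.sin_family = AF_INET;
		sin.sin_addr.s_addr = addr;

		ret = rdma_bind_addr(cm_id, (struct sockaddr *) &sin);

		/* Accept classic IB HCAs and iWARP RNICs alike; insisting on
		 * RDMA_NODE_IB_CA here would reject iWARP adapters outright. */
		if (ret == 0 &&
		    cm_id->device->node_type != RDMA_NODE_IB_CA &&
		    cm_id->device->node_type != RDMA_NODE_RNIC)
			ret = -EADDRNOTAVAIL;

		rdma_destroy_id(cm_id);
		return ret;
	}

The point of the sketch is only that no IB-specific address resolution is
needed once the bind goes through the regular inet neighbour tables.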
Thanks,
Jon
>
> Olaf
>
> --
> Olaf Kirch | --- o --- Nous sommes du soleil we love when we play
> okir at lst.de | / | \ sol.dhoop.naytheet.ah kin.ir.samse.qurax
> From: Olaf Kirch <olaf.kirch at oracle.com>
> Subject: [PATCH RFC] RDS: Add iWARP Support
>
> This is based on the work posted by Jon Mason. It extracts
> the iWARP-specific changes that are needed to support bcopy
> mode (I hope I caught all of them).
>
> I also did some work on RDMA support. This is a lot harder,
> because the interface and implementation were designed with
> classic MRs in mind. However, I think the approach taken below
> may result in a working solution (it's not working yet - I left
> some blanks and BUG() asserts in there, because I wanted to get this
> patch out as an RFC sooner rather than later).
>
> Also, this is a pretty large patch - it needs to be broken down into
> half a dozen or so smaller functional changes for better review.
>
> Olaf
> ---
> net/rds/ib.c | 30 ++
> net/rds/ib.h | 55 ++++
> net/rds/ib_cm.c | 36 ++-
> net/rds/ib_rdma.c | 610 +++++++++++++++++++++++++++++++++++++++++++++---------
> net/rds/ib_recv.c | 2
> net/rds/ib_send.c | 133 +++++++++++
> net/rds/message.c | 2
> net/rds/rdma.c | 17 -
> net/rds/rdma.h | 7
> net/rds/rds.h | 4
> net/rds/send.c | 7
> 11 files changed, 778 insertions(+), 125 deletions(-)
>
> Index: build-2.6/net/rds/ib.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib.c
> +++ build-2.6/net/rds/ib.c
> @@ -42,6 +42,7 @@
>
> unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
> unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
> +unsigned int fastreg_pool_size = RDS_FMR_POOL_SIZE;
>
> module_param(fmr_pool_size, int, 0444);
> MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
> @@ -85,21 +86,38 @@ void rds_ib_add_one(struct ib_device *de
> rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
> rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size - 1);
> rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
> - rds_ibdev->max_fmrs = dev_attr->max_fmr?
> - min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
> - fmr_pool_size;
> + rds_ibdev->max_fmrs = dev_attr->max_fmr;
>
> rds_ibdev->dev = device;
> rds_ibdev->pd = ib_alloc_pd(device);
> if (IS_ERR(rds_ibdev->pd))
> goto free_dev;
>
> - rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> - IB_ACCESS_LOCAL_WRITE);
> + if (device->node_type != RDMA_NODE_RNIC) {
> + rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> + IB_ACCESS_LOCAL_WRITE);
> + } else {
> + /* Why does it have to have these permissions? */
> + rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> + IB_ACCESS_REMOTE_READ |
> + IB_ACCESS_REMOTE_WRITE |
> + IB_ACCESS_LOCAL_WRITE);
> + }
> if (IS_ERR(rds_ibdev->mr))
> goto err_pd;
>
> - rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> + /* Create the MR pool. We choose different strategies for
> + * MRs depending on the hardware.
> + */
> + if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
> + /* Use fast registrations */
> + rds_ibdev->mr_pool = rds_ib_create_fastreg_pool(rds_ibdev);
> + rds_ibdev->use_fastreg = 1;
> + } else {
> + /* Default: use FMRs. Would be nice if there was
> + * a capability flag to test for. */
> + rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> + }
> if (IS_ERR(rds_ibdev->mr_pool)) {
> rds_ibdev->mr_pool = NULL;
> goto err_mr;
> Index: build-2.6/net/rds/ib.h
> ===================================================================
> --- build-2.6.orig/net/rds/ib.h
> +++ build-2.6/net/rds/ib.h
> @@ -49,9 +49,51 @@ struct rds_ib_connect_private {
> __be32 dp_credit; /* non-zero enables flow ctl */
> };
>
> +struct rds_ib_scatterlist {
> + struct scatterlist * list;
> + unsigned int len;
> + int dma_len;
> +};
> +
> +/* We need to post a LOCAL_INV request unless f_old_rkey
> + * has this value. */
> +#define RDS_IB_INVALID_FASTREG_KEY 0
> +
> +struct rds_ib_fastreg {
> + atomic_t f_refcnt;
> + unsigned int f_posted : 1,
> + f_done : 1;
> +
> + u32 f_old_rkey;
> +
> + u32 f_rkey;
> + unsigned int f_length;
> +
> + struct rds_ib_scatterlist f_sg;
> +
> + struct ib_fast_reg_page_list *f_page_list;
> + unsigned int f_page_list_len;
> + unsigned int f_page_shift;
> +
> +#if 0
> + u32 f_invalidate_rkey;
> + struct ib_send_wr f_wr;
> + wait_queue_head_t f_waitq;
> + struct list_head f_list;
> + unsigned int f_done;
> + int f_status;
> +#endif
> +
> + struct rds_ib_mr *f_mr;
> +};
> +
> struct rds_ib_send_work {
> struct rds_message *s_rm;
> +
> + /* We should really put these into a union: */
> struct rds_rdma_op *s_op;
> + struct rds_ib_fastreg *s_fastreg;
> +
> struct ib_send_wr s_wr;
> struct ib_sge s_sge[RDS_IB_MAX_SGE];
> unsigned long s_queued;
> @@ -86,6 +128,7 @@ struct rds_ib_connection {
> struct rds_header *i_send_hdrs;
> u64 i_send_hdrs_dma;
> struct rds_ib_send_work *i_sends;
> + struct list_head i_fastreg_pending;
>
> /* rx */
> struct mutex i_recv_mutex;
> @@ -123,7 +166,9 @@ struct rds_ib_connection {
> atomic_t i_credits;
>
> /* Protocol version specific information */
> - unsigned int i_flowctl : 1; /* enable/disable flow ctl */
> + unsigned int i_flowctl : 1, /* enable/disable flow ctl */
> + i_iwarp : 1, /* this is actually iWARP not IB */
> + i_fastreg : 1; /* use fastreg */
>
> /* Batched completions */
> unsigned int i_unsignaled_wrs;
> @@ -154,6 +199,7 @@ struct rds_ib_device {
> unsigned int fmr_max_remaps;
> unsigned int max_fmrs;
> int max_sge;
> + unsigned int use_fastreg : 1;
> spinlock_t spinlock;
> };
>
> @@ -236,6 +282,7 @@ extern void rds_ib_remove_one(struct ib_
> extern struct ib_client rds_ib_client;
>
> extern unsigned int fmr_pool_size;
> +extern unsigned int fastreg_pool_size;
> extern unsigned int fmr_message_size;
>
> /* ib_cm.c */
> @@ -254,6 +301,7 @@ void __rds_ib_conn_error(struct rds_conn
> /* ib_rdma.c */
> int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
> struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
> +struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *);
> void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
> void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
> void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
> @@ -261,6 +309,10 @@ void *rds_ib_get_mr(struct scatterlist *
> void rds_ib_sync_mr(void *trans_private, int dir);
> void rds_ib_free_mr(void *trans_private, int invalidate);
> void rds_ib_flush_mrs(void);
> +struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
> +void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
> +void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
> +void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
>
> /* ib_recv.c */
> int __init rds_ib_recv_init(void);
> @@ -298,6 +350,7 @@ void rds_ib_send_cq_comp_handler(struct
> void rds_ib_send_init_ring(struct rds_ib_connection *ic);
> void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
> int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
> +int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
> void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
> void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
> int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
> Index: build-2.6/net/rds/ib_rdma.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_rdma.c
> +++ build-2.6/net/rds/ib_rdma.c
> @@ -45,20 +45,31 @@ extern struct list_head rds_ib_devices;
> struct rds_ib_mr {
> struct rds_ib_device *device;
> struct rds_ib_mr_pool *pool;
> - struct ib_fmr *fmr;
> +
> + spinlock_t lock;
> + union {
> + struct {
> + struct ib_fmr *fmr;
> + } ib;
> + struct {
> + struct ib_fast_reg_page_list *page_list;
> + struct ib_mr *fastreg_mr;
> + u32 rkey;
> + struct rds_ib_fastreg *pending;
> + } iwarp;
> + } u;
> struct list_head list;
> unsigned int remap_count;
>
> - struct scatterlist * sg;
> - unsigned int sg_len;
> - u64 * dma;
> - int sg_dma_len;
> + struct rds_ib_scatterlist sg;
> };
>
> /*
> * Our own little FMR pool
> */
> struct rds_ib_mr_pool {
> + struct rds_ib_device * device;
> +
> struct mutex flush_lock; /* serialize fmr invalidate */
> struct work_struct flush_worker; /* flush worker */
>
> @@ -68,16 +79,57 @@ struct rds_ib_mr_pool {
> struct list_head drop_list; /* MRs that have reached their max_maps limit */
> struct list_head free_list; /* unused MRs */
> struct list_head clean_list; /* unused & unamapped MRs */
> + struct list_head fastreg_list; /* pending fastreg's */
> atomic_t free_pinned; /* memory pinned by free MRs */
> + unsigned long max_message_size; /* in pages */
> unsigned long max_items;
> unsigned long max_items_soft;
> unsigned long max_free_pinned;
> struct ib_fmr_attr fmr_attr;
> +
> + /* Dummy QP used to handle invalidate for fastreg */
> + struct ib_qp *qp;
> +
> + struct rds_ib_mr_pool_ops *op;
> +};
> +
> +struct rds_ib_mr_pool_ops {
> + int (*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
> + int (*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
> + struct scatterlist *sg, unsigned int sg_len);
> + void (*unmap)(struct rds_ib_mr_pool *, struct list_head *);
> + void (*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
> };
>
> static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
> static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
> static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
> +static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr,
> + struct scatterlist *sg, unsigned int nents);
> +static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
> +static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr,
> + struct scatterlist *sg, unsigned int nents);
> +static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
> +static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +
> +static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
> + .init = rds_ib_init_fmr,
> + .map = rds_ib_map_fmr,
> + .unmap = rds_ib_unmap_fmr_list,
> + .destroy = rds_ib_destroy_fmr,
> +};
> +
> +static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
> + .init = rds_ib_init_fastreg,
> + .map = rds_ib_map_fastreg,
> + .unmap = rds_ib_unmap_fastreg_list,
> + .destroy = rds_ib_destroy_fastreg,
> +};
>
> int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
> {
> @@ -124,7 +176,158 @@ struct rds_ib_device* ib_get_device(__be
> return NULL;
> }
>
> -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
> + struct scatterlist *list,
> + unsigned int sg_len, unsigned int sg_dma_len)
> +{
> + sg->list = list;
> + sg->len = sg_len;
> + sg->dma_len = sg_dma_len;
> +}
> +
> +static void rds_ib_rdma_drop_scatterlist(struct rds_ib_device *rds_ibdev,
> + struct rds_ib_scatterlist *sg)
> +{
> + if (sg->dma_len) {
> + ib_dma_unmap_sg(rds_ibdev->dev,
> + sg->list, sg->len,
> + DMA_BIDIRECTIONAL);
> + sg->dma_len = 0;
> + }
> +
> + /* Release the s/g list */
> + if (sg->len) {
> + unsigned int i;
> +
> + for (i = 0; i < sg->len; ++i) {
> + struct page *page = sg_page(&sg->list[i]);
> +
> + /* FIXME we need a way to tell a r/w MR
> + * from a r/o MR */
> + set_page_dirty(page);
> + put_page(page);
> + }
> + kfree(sg->list);
> +
> + sg->list = NULL;
> + sg->len = 0;
> + }
> +}
> +
> +/*
> + * IB FMR handling
> + */
> +static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr)
> +{
> + struct rds_ib_device *rds_ibdev = pool->device;
> + struct ib_fmr *fmr;
> +
> + fmr = ib_alloc_fmr(rds_ibdev->pd,
> + (IB_ACCESS_LOCAL_WRITE |
> + IB_ACCESS_REMOTE_READ |
> + IB_ACCESS_REMOTE_WRITE),
> + &pool->fmr_attr);
> + if (IS_ERR(fmr)) {
> + int err = PTR_ERR(fmr);
> +
> + printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
> + return err;
> + }
> +
> + ibmr->u.ib.fmr = fmr;
> + return 0;
> +}
> +
> +static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
> + struct list_head *unmap_list)
> +{
> + struct rds_ib_mr *ibmr;
> + LIST_HEAD(fmr_list);
> + int ret;
> +
> + /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> + list_for_each_entry(ibmr, unmap_list, list)
> + list_add(&ibmr->u.ib.fmr->list, &fmr_list);
> + ret = ib_unmap_fmr(&fmr_list);
> + if (ret)
> + printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
> +}
> +
> +static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr)
> +{
> + if (ibmr->u.ib.fmr)
> + ib_dealloc_fmr(ibmr->u.ib.fmr);
> + ibmr->u.ib.fmr = NULL;
> +}
> +
> +/*
> + * iWARP fastreg handling
> + */
> +static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr)
> +{
> + struct rds_ib_device *rds_ibdev = pool->device;
> + struct ib_mr *mr;
> +
> + mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
> + if (IS_ERR(mr)) {
> + int err = PTR_ERR(mr);
> +
> + printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
> + return err;
> + }
> +
> + ibmr->u.iwarp.rkey = RDS_IB_INVALID_FASTREG_KEY;
> + ibmr->u.iwarp.fastreg_mr = mr;
> + return 0;
> +}
> +
> +static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
> + struct list_head *unmap_list)
> +{
> + LIST_HEAD(fmr_list);
> +
> + /* Batched invalidation of fastreg MRs.
> + * Why do we do it this way, even though we could pipeline unmap
> + * and remap? The reason is the application semantics - when the
> + * application requests an invalidation of MRs, it expects all
> + * previously released R_Keys to become invalid.
> + *
> + * If we implement MR reuse naively, we risk memory corruption
> + * (this has actually been observed). So the default behavior
> + * requires that a MR goes through an explicit unmap operation before
> + * we can reuse it again.
> + *
> + * We could probably improve on this a little, by allowing immediate
> > + * reuse of a MR on the same socket (eg you could add a small
> > + * cache of unused MRs to struct rds_socket - GET_MR could grab one
> + * of these without requiring an explicit invalidate).
> + */
> +
> + /* Fill in the blanks:
> + Go through the list of dirty MRs, and post LOCAL_INV WRs to the
> + dummy pool->qp. When the completion for the last WR arrives,
> + the CQ handler wakes up the caller.
> + */
> + BUG(); /* not implemented yet. */
> +}
> +
> +static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr)
> +{
> + if (ibmr->u.iwarp.page_list)
> + ib_free_fast_reg_page_list(ibmr->u.iwarp.page_list);
> + if (ibmr->u.iwarp.fastreg_mr)
> + ib_dereg_mr(ibmr->u.iwarp.fastreg_mr);
> + if (ibmr->u.iwarp.pending)
> + rds_ib_fastreg_release(ibmr->u.iwarp.pending);
> +}
> +
> +struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
> + unsigned int message_size, unsigned int pool_size,
> + struct rds_ib_mr_pool_ops *ops)
> {
> struct rds_ib_mr_pool *pool;
>
> @@ -132,25 +335,68 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
> if (!pool)
> return ERR_PTR(-ENOMEM);
>
> + pool->device = rds_ibdev;
> INIT_LIST_HEAD(&pool->free_list);
> INIT_LIST_HEAD(&pool->drop_list);
> INIT_LIST_HEAD(&pool->clean_list);
> + INIT_LIST_HEAD(&pool->fastreg_list);
> mutex_init(&pool->flush_lock);
> spin_lock_init(&pool->list_lock);
> INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
>
> - pool->fmr_attr.max_pages = fmr_message_size;
> - pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> - pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> - pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
> + pool->max_message_size = message_size;
> + pool->max_items = pool_size;
> + pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
>
> /* We never allow more than max_items MRs to be allocated.
> * When we exceed more than max_items_soft, we start freeing
> * items more aggressively.
> * Make sure that max_items > max_items_soft > max_items / 2
> */
> - pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
> - pool->max_items = rds_ibdev->max_fmrs;
> + pool->max_items_soft = pool->max_items * 3 / 4;
> +
> + return pool;
> +}
> +
> +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +{
> + struct rds_ib_mr_pool *pool;
> + unsigned int pool_size = fmr_pool_size;
> +
> + if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
> + pool_size = rds_ibdev->max_fmrs;
> +
> + pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
> + &rds_ib_fmr_pool_ops);
> +
> + if (!IS_ERR(pool)) {
> + pool->fmr_attr.max_pages = pool->max_message_size;
> + pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> + pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> + }
> +
> + return pool;
> +}
> +
> +struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *rds_ibdev)
> +{
> + struct rds_ib_mr_pool *pool;
> + unsigned int pool_size = fmr_pool_size;
> +
> + if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
> + pool_size = rds_ibdev->max_fmrs;
> +
> + pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size,
> + fastreg_pool_size,
> + &rds_ib_fastreg_pool_ops);
> +
> + if (!IS_ERR(pool)) {
> + /* Fill in the blanks:
> + * create a dummy QP to which we can post LOCAL_INV
> + * requests when invalidating MRs
> + */
> + pool->qp = NULL;
> + }
>
> return pool;
> }
> @@ -169,6 +415,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
> rds_ib_flush_mr_pool(pool, 1);
> BUG_ON(atomic_read(&pool->item_count));
> BUG_ON(atomic_read(&pool->free_pinned));
> +
> + if (pool->qp)
> + ib_destroy_qp(pool->qp);
> +
> kfree(pool);
> }
>
> @@ -227,77 +477,82 @@ static struct rds_ib_mr *rds_ib_alloc_fm
> goto out_no_cigar;
> }
>
> - ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
> - (IB_ACCESS_LOCAL_WRITE |
> - IB_ACCESS_REMOTE_READ |
> - IB_ACCESS_REMOTE_WRITE),
> - &pool->fmr_attr);
> - if (IS_ERR(ibmr->fmr)) {
> - err = PTR_ERR(ibmr->fmr);
> - ibmr->fmr = NULL;
> - printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
> + spin_lock_init(&ibmr->lock);
> +
> + err = pool->op->init(pool, ibmr);
> + if (err)
> goto out_no_cigar;
> - }
>
> rds_ib_stats_inc(s_ib_rdma_mr_alloc);
> return ibmr;
>
> out_no_cigar:
> if (ibmr) {
> - if (ibmr->fmr)
> - ib_dealloc_fmr(ibmr->fmr);
> + pool->op->destroy(pool, ibmr);
> kfree(ibmr);
> }
> atomic_dec(&pool->item_count);
> return ERR_PTR(err);
> }
>
> -static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
> - struct scatterlist *sg, unsigned int nents)
> +static int rds_ib_count_dma_pages(struct rds_ib_device *rds_ibdev,
> + struct scatterlist *sg, unsigned int sg_dma_len,
> + unsigned int *lenp)
> {
> struct ib_device *dev = rds_ibdev->dev;
> - struct scatterlist *scat = sg;
> - u64 io_addr = 0;
> - u64 *dma_pages;
> - u32 len;
> - int page_cnt, sg_dma_len;
> - int i, j;
> - int ret;
> -
> - sg_dma_len = ib_dma_map_sg(dev, sg, nents,
> - DMA_BIDIRECTIONAL);
> - if (unlikely(!sg_dma_len)) {
> - printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> - return -EBUSY;
> - }
> -
> - len = 0;
> - page_cnt = 0;
> + unsigned int i, page_cnt = 0, len = 0;
>
> for (i = 0; i < sg_dma_len; ++i) {
> - unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
> - u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
> + unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
> + u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
>
> if (dma_addr & ~rds_ibdev->fmr_page_mask) {
> if (i > 0)
> return -EINVAL;
> - else
> - ++page_cnt;
> + ++page_cnt;
> }
> if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
> if (i < sg_dma_len - 1)
> return -EINVAL;
> - else
> - ++page_cnt;
> + ++page_cnt;
> }
>
> len += dma_len;
> }
>
> page_cnt += len >> rds_ibdev->fmr_page_shift;
> - if (page_cnt > fmr_message_size)
> + if (page_cnt > rds_ibdev->mr_pool->max_message_size)
> return -EINVAL;
>
> + return page_cnt;
> +}
> +
> +static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr,
> + struct scatterlist *sg, unsigned int nents)
> +{
> + struct rds_ib_device *rds_ibdev = pool->device;
> + struct ib_device *dev = rds_ibdev->dev;
> + struct scatterlist *scat = sg;
> + u64 io_addr = 0;
> + u64 *dma_pages;
> + int page_cnt, sg_dma_len;
> + int i, j;
> + int ret;
> +
> + sg_dma_len = ib_dma_map_sg(dev, sg, nents,
> + DMA_BIDIRECTIONAL);
> + if (unlikely(!sg_dma_len)) {
> + printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> + return -EBUSY;
> + }
> +
> + /* FIXME: when returning an error, we need to unmap the SG */
> +
> + page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, NULL);
> + if (page_cnt < 0)
> + return page_cnt;
> +
> dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
> if (!dma_pages)
> return -ENOMEM;
> @@ -312,7 +567,7 @@ static int rds_ib_map_fmr(struct rds_ib_
> (dma_addr & rds_ibdev->fmr_page_mask) + j;
> }
>
> - ret = ib_map_phys_fmr(ibmr->fmr,
> + ret = ib_map_phys_fmr(ibmr->u.ib.fmr,
> dma_pages, page_cnt, io_addr);
> if (ret)
> goto out;
> @@ -321,9 +576,9 @@ static int rds_ib_map_fmr(struct rds_ib_
> * safely tear down the old mapping. */
> rds_ib_teardown_mr(ibmr);
>
> - ibmr->sg = scat;
> - ibmr->sg_len = nents;
> - ibmr->sg_dma_len = sg_dma_len;
> + ibmr->sg.list = scat;
> + ibmr->sg.len = nents;
> + ibmr->sg.dma_len = sg_dma_len;
> ibmr->remap_count++;
>
> rds_ib_stats_inc(s_ib_rdma_mr_used);
> @@ -335,6 +590,192 @@ out:
> return ret;
> }
>
> +static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> + struct rds_ib_mr *ibmr,
> + struct scatterlist *sg, unsigned int sg_len)
> +{
> + struct rds_ib_device *rds_ibdev = pool->device;
> + struct ib_device *dev = rds_ibdev->dev;
> + struct ib_fast_reg_page_list *page_list = NULL;
> + struct rds_ib_fastreg *frr;
> + unsigned int len;
> + int i, j, page_cnt, sg_dma_len = 0;
> + int ret;
> +
> + BUG_ON(ibmr->u.iwarp.pending);
> +
> + page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
> + if (IS_ERR(page_list)) {
> + ret = PTR_ERR(page_list);
> + page_list = NULL;
> +
> + printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", ret);
> + return ret;
> + }
> +
> + sg_dma_len = ib_dma_map_sg(dev, sg, sg_len, DMA_BIDIRECTIONAL);
> + if (unlikely(!sg_dma_len)) {
> + printk(KERN_WARNING "RDS/iWARP: dma_map_sg failed!\n");
> + ret = -EBUSY;
> + goto out;
> + }
> +
> + page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, &len);
> + if (page_cnt < 0) {
> + ret = page_cnt;
> + goto out;
> + }
> +
> + page_cnt = 0;
> + for (i = 0; i < sg_dma_len; ++i) {
> + unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
> + u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
> +
> + for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
> + page_list->page_list[page_cnt++] =
> + (dma_addr & rds_ibdev->fmr_page_mask) + j;
> + }
> +
> + /* Allocate the fastreg request structure */
> + frr = kzalloc(sizeof(*frr), GFP_KERNEL);
> + if (!frr) {
> + ret = -ENOMEM;
> + goto out;
> + }
> +
> + ib_update_fast_reg_key(ibmr->u.iwarp.fastreg_mr, ibmr->remap_count++);
> +
> + /* Build the fastreg WR */
> + frr->f_mr = ibmr;
> + rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len, sg_dma_len);
> + frr->f_length = len;
> + frr->f_rkey = ibmr->u.iwarp.fastreg_mr->rkey;
> + frr->f_page_list = page_list;
> + frr->f_page_list_len = sg_dma_len;
> + frr->f_page_shift = rds_ibdev->fmr_page_shift;
> +
> + frr->f_old_rkey = ibmr->u.iwarp.rkey;
> +
> + /* Attach the fastreg info to the MR */
> + atomic_set(&frr->f_refcnt, 1);
> + ibmr->u.iwarp.pending = frr;
> +
> + rds_ib_stats_inc(s_ib_rdma_mr_used);
> + ret = 0;
> +
> +out:
> + if (ret) {
> + ib_free_fast_reg_page_list(page_list);
> + if (sg_dma_len)
> + ib_dma_unmap_sg(dev, sg, sg_dma_len, DMA_BIDIRECTIONAL);
> + }
> +
> + return ret;
> +}
> +
> +struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
> +{
> + struct rds_ib_mr *ibmr = mr->r_trans_private;
> + struct rds_ib_fastreg *frr;
> + unsigned long flags;
> +
> + spin_lock_irqsave(&ibmr->lock, flags);
> + frr = ibmr->u.iwarp.pending;
> + if (frr) {
> + /* FIXME: we need to mark the frr as "locked"
> + * to prevent FREE_MR from trashing the MR
> + * as long as the fastreg is on the queue */
> + atomic_inc(&frr->f_refcnt);
> + }
> + spin_unlock_irqrestore(&ibmr->lock, flags);
> +
> + return frr;
> +}
> +
> +void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
> +{
> + struct rds_ib_device *rds_ibdev = NULL;
> +
> + if (atomic_dec_and_test(&frr->f_refcnt)) {
> + ib_free_fast_reg_page_list(frr->f_page_list);
> + BUG(); /* FIXME: obtain rds_ibdev */
> + rds_ib_rdma_drop_scatterlist(rds_ibdev, &frr->f_sg);
> + kfree(frr);
> + }
> +}
> +
> +/*
> + * These functions are called back from the send CQ handler
> + * when the LOCAL_INV or FAST_REG_MR WRs complete.
> + */
> +void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
> +{
> + struct rds_ib_mr *ibmr = frr->f_mr;
> +
> + spin_lock(&ibmr->lock);
> + if (ibmr->u.iwarp.pending != frr)
> + goto out_unlock;
> +
> + if (status != IB_WC_SUCCESS) {
> + /* Yikes. Invalidation failed. What can we do but complain? */
> + printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
> + goto out_unlock;
> + }
> +
> + if (frr->f_old_rkey == ibmr->u.iwarp.rkey) {
> + ibmr->u.iwarp.rkey = 0;
> + /* Now we can unpin any memory pinned for this MR. */
> + rds_ib_teardown_mr(ibmr);
> + }
> + frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
> +
> +out_unlock:
> + spin_unlock(&ibmr->lock);
> +
> + /* The WR owned a reference to this frr. Drop it */
> + rds_ib_fastreg_release(frr);
> +}
> +
> +void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
> +{
> + struct rds_ib_mr *ibmr = frr->f_mr;
> +
> + spin_lock(&ibmr->lock);
> +
> + /* Technically, this would be a bug */
> + if (ibmr->u.iwarp.pending != frr)
> + goto out_unlock;
> +
> + if (status != IB_WC_SUCCESS) {
> + /* Yikes. We were unable to register the application's
> + * memory. We have no way of notifying the application.
> + * We could probably tear down the QP and cry uncle, but
> + * the SEND may already have gone out.
> + * The only solace is that the RDMA initiated by the remote
> + * will fail, because the key isn't valid.
> + */
> + if (printk_ratelimit())
> + printk(KERN_NOTICE "RDS/iWARP: Unable to "
> + "perform fast memory registration.\n");
> + goto out_unlock;
> + }
> +
> + ibmr->sg = frr->f_sg;
> + ibmr->u.iwarp.page_list = frr->f_page_list;
> + ibmr->u.iwarp.rkey = frr->f_rkey;
> +
> + /* Detach frr from MR. We still have at least one ref after this */
> + ibmr->u.iwarp.pending = NULL;
> + rds_ib_fastreg_release(frr);
> + frr->f_done = 1;
> +
> +out_unlock:
> + spin_unlock(&ibmr->lock);
> +
> + /* The WR owned a reference to this frr. Drop it */
> + rds_ib_fastreg_release(frr);
> +}
> +
> void rds_ib_sync_mr(void *trans_private, int direction)
> {
> struct rds_ib_mr *ibmr = trans_private;
> @@ -342,49 +783,24 @@ void rds_ib_sync_mr(void *trans_private,
>
> switch (direction) {
> case DMA_FROM_DEVICE:
> - ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
> - ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
> + ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg.list,
> + ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
> break;
> case DMA_TO_DEVICE:
> - ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
> - ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
> + ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg.list,
> + ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
> break;
> }
> }
>
> static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
> {
> - struct rds_ib_device *rds_ibdev = ibmr->device;
> -
> - if (ibmr->sg_dma_len) {
> - ib_dma_unmap_sg(rds_ibdev->dev,
> - ibmr->sg, ibmr->sg_len,
> - DMA_BIDIRECTIONAL);
> - ibmr->sg_dma_len = 0;
> - }
> -
> - /* Release the s/g list */
> - if (ibmr->sg_len) {
> - unsigned int i;
> -
> - for (i = 0; i < ibmr->sg_len; ++i) {
> - struct page *page = sg_page(&ibmr->sg[i]);
> -
> - /* FIXME we need a way to tell a r/w MR
> - * from a r/o MR */
> - set_page_dirty(page);
> - put_page(page);
> - }
> - kfree(ibmr->sg);
> -
> - ibmr->sg = NULL;
> - ibmr->sg_len = 0;
> - }
> + rds_ib_rdma_drop_scatterlist(ibmr->device, &ibmr->sg);
> }
>
> void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
> {
> - unsigned int pinned = ibmr->sg_len;
> + unsigned int pinned = ibmr->sg.len;
>
> __rds_ib_teardown_mr(ibmr);
> if (pinned) {
> @@ -419,7 +835,6 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
> {
> struct rds_ib_mr *ibmr, *next;
> LIST_HEAD(unmap_list);
> - LIST_HEAD(fmr_list);
> unsigned long unpinned = 0;
> unsigned long flags;
> unsigned int nfreed = 0, ncleaned = 0, free_goal;
> @@ -443,21 +858,17 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
> if (list_empty(&unmap_list))
> goto out;
>
> - /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> - list_for_each_entry(ibmr, &unmap_list, list)
> - list_add(&ibmr->fmr->list, &fmr_list);
> - ret = ib_unmap_fmr(&fmr_list);
> - if (ret)
> - printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
> + /* Batched invalidate of dirty MRs: */
> + pool->op->unmap(pool, &unmap_list);
>
> /* Now we can destroy the DMA mapping and unpin any pages */
> list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
> - unpinned += ibmr->sg_len;
> + unpinned += ibmr->sg.len;
> __rds_ib_teardown_mr(ibmr);
> if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
> rds_ib_stats_inc(s_ib_rdma_mr_free);
> list_del(&ibmr->list);
> - ib_dealloc_fmr(ibmr->fmr);
> + pool->op->destroy(pool, ibmr);
> kfree(ibmr);
> nfreed++;
> }
> @@ -491,7 +902,7 @@ void rds_ib_free_mr(void *trans_private,
> struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
> unsigned long flags;
>
> - rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
> + rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg.len);
>
> /* Return it to the pool's free list */
> spin_lock_irqsave(&pool->list_lock, flags);
> @@ -500,7 +911,7 @@ void rds_ib_free_mr(void *trans_private,
> } else {
> list_add(&ibmr->list, &pool->free_list);
> }
> - atomic_add(ibmr->sg_len, &pool->free_pinned);
> + atomic_add(ibmr->sg.len, &pool->free_pinned);
> atomic_inc(&pool->dirty_count);
> spin_unlock_irqrestore(&pool->list_lock, flags);
>
> @@ -536,6 +947,7 @@ void *rds_ib_get_mr(struct scatterlist *
> __be32 ip_addr, u32 *key_ret)
> {
> struct rds_ib_device *rds_ibdev;
> + struct rds_ib_mr_pool *pool;
> struct rds_ib_mr *ibmr = NULL;
> int ret;
>
> @@ -545,7 +957,7 @@ void *rds_ib_get_mr(struct scatterlist *
> goto out;
> }
>
> - if (!rds_ibdev->mr_pool) {
> + if (!(pool = rds_ibdev->mr_pool)) {
> ret = -ENODEV;
> goto out;
> }
> @@ -554,9 +966,9 @@ void *rds_ib_get_mr(struct scatterlist *
> if (IS_ERR(ibmr))
> return ibmr;
>
> - ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
> + ret = pool->op->map(pool, ibmr, sg, nents);
> if (ret == 0)
> - *key_ret = ibmr->fmr->rkey;
> + *key_ret = ibmr->u.ib.fmr->rkey;
> else
> printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
>
> Index: build-2.6/net/rds/rdma.c
> ===================================================================
> --- build-2.6.orig/net/rds/rdma.c
> +++ build-2.6/net/rds/rdma.c
> @@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr
> mr->r_trans->free_mr(trans_private, mr->r_invalidate);
> }
>
> -static void rds_mr_put(struct rds_mr *mr)
> +void __rds_put_mr_final(struct rds_mr *mr)
> {
> - if (!atomic_dec_and_test(&mr->r_refcount))
> - return;
> -
> rds_destroy_mr(mr);
> kfree(mr);
> }
> @@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long u
> }
>
> static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
> - u64 *cookie_ret)
> + u64 *cookie_ret, struct rds_mr **mr_ret)
> {
> struct rds_mr *mr = NULL, *found;
> unsigned int nr_pages;
> @@ -297,6 +294,10 @@ static int __rds_rdma_map(struct rds_soc
>
> rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
>
> + if (mr_ret) {
> + atomic_inc(&mr->r_refcount);
> + *mr_ret = mr;
> + }
> ret = 0;
> out:
> if (pages)
> @@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char
> sizeof(struct rds_get_mr_args)))
> return -EFAULT;
>
> - return __rds_rdma_map(rs, &args, NULL);
> + return __rds_rdma_map(rs, &args, NULL, NULL);
> }
>
> /*
> @@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *
>
> if (mr) {
> mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
> - rds_mr_put(mr);
> + rm->m_rdma_mr = mr;
> }
> return err;
> }
> @@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *r
> || rm->m_rdma_cookie != 0)
> return -EINVAL;
>
> - return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
> + return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
> }
> Index: build-2.6/net/rds/rds.h
> ===================================================================
> --- build-2.6.orig/net/rds/rds.h
> +++ build-2.6/net/rds/rds.h
> @@ -30,6 +30,7 @@
> */
> #define RDS_IB_PORT 18635
> #define RDS_TCP_PORT 18636
> +#define RDS_IWARP_PORT 18637
>
> #ifndef AF_RDS
> #define AF_RDS 28 /* Reliable Datagram Socket */
> @@ -60,6 +61,7 @@
> /* XXX crap, we need to worry about this conflicting too */
> #define SYSCTL_NET_RDS 9912
> #define SYSCTL_NET_RDS_IB 100
> +#define SYSCTL_NET_RDS_IWARP 101
>
> #ifdef DEBUG
> #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
> @@ -282,6 +284,7 @@ struct rds_incoming {
> #define RDS_MSG_RETRANSMITTED 5
> #define RDS_MSG_MAPPED 6
> #define RDS_MSG_PAGEVEC 7
> +#define RDS_MSG_FASTREG_POSTED 8
>
> struct rds_message {
> atomic_t m_refcount;
> @@ -301,6 +304,7 @@ struct rds_message {
> struct rds_sock *m_rs;
> struct rds_rdma_op *m_rdma_op;
> rds_rdma_cookie_t m_rdma_cookie;
> + struct rds_mr *m_rdma_mr;
> unsigned int m_nents;
> unsigned int m_count;
> struct scatterlist m_sg[0];
> Index: build-2.6/net/rds/ib_cm.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_cm.c
> +++ build-2.6/net/rds/ib_cm.c
> @@ -142,16 +142,19 @@ static void rds_ib_cm_fill_conn_param(st
> struct rds_ib_connect_private *dp,
> u32 protocol_version)
> {
> + struct rds_ib_connection *ic = conn->c_transport_data;
> +
> memset(conn_param, 0, sizeof(struct rdma_conn_param));
> /* XXX tune these? */
> conn_param->responder_resources = 1;
> conn_param->initiator_depth = 1;
> - conn_param->retry_count = 7;
> - conn_param->rnr_retry_count = 7;
>
> - if (dp) {
> - struct rds_ib_connection *ic = conn->c_transport_data;
> + if (!ic->i_iwarp) {
> + conn_param->retry_count = 7;
> + conn_param->rnr_retry_count = 7;
> + }
>
> + if (dp) {
> memset(dp, 0, sizeof(*dp));
> dp->dp_saddr = conn->c_laddr;
> dp->dp_daddr = conn->c_faddr;
> @@ -288,7 +291,7 @@ static int rds_ib_setup_qp(struct rds_co
> */
> ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
> if (ret) {
> - rdsdebug("ib_req_notify_cq failed: %d\n", ret);
> + rdsdebug("rdma_create_qp failed: %d\n", ret);
> goto out;
> }
>
> @@ -442,6 +445,12 @@ static int rds_ib_cm_handle_connect(stru
> ic->i_cm_id = cm_id;
> cm_id->context = conn;
>
> + rds_ibdev = ib_get_client_data(cm_id->device, &rds_ib_client);
> +
> + /* Remember whether this is IB or iWARP */
> + ic->i_iwarp = (cm_id->device->node_type == RDMA_NODE_RNIC);
> + ic->i_fastreg = rds_ibdev->use_fastreg;
> +
> /* We got halfway through setting up the ib_connection, if we
> * fail now, we have to take the long route out of this mess. */
> destroy = 0;
> @@ -462,7 +471,6 @@ static int rds_ib_cm_handle_connect(stru
> }
>
> /* update ib_device with this local ipaddr */
> - rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
> ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
>
> return 0;
> @@ -616,6 +624,17 @@ int rds_ib_conn_connect(struct rds_conne
> src.sin_addr.s_addr = (__force u32)conn->c_laddr;
> src.sin_port = (__force u16)htons(0);
>
> + /* First, bind to the local address and device. */
> + ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
> + if (ret) {
> + rdsdebug("rdma_bind_addr(%u.%u.%u.%u) failed: %d\n",
> + NIPQUAD(conn->c_laddr), ret);
> + goto out;
> + }
> +
> + /* Now check the device type and set i_iwarp */
> + ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
> +
> dest.sin_family = AF_INET;
> dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
> dest.sin_port = (__force u16)htons(RDS_IB_PORT);
> @@ -662,8 +681,9 @@ void rds_ib_conn_shutdown(struct rds_con
> " cm: %p err %d\n", ic->i_cm_id, err);
> }
>
> - /* Always move the QP to error state */
> - if (ic->i_cm_id->qp) {
> + /* For IB, we have to move the QP to error state.
> + * This is not needed for iWARP */
> + if (ic->i_cm_id->qp && !ic->i_iwarp) {
> qp_attr.qp_state = IB_QPS_ERR;
> err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
> if (err) {
> Index: build-2.6/net/rds/ib_send.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_send.c
> +++ build-2.6/net/rds/ib_send.c
> @@ -165,6 +165,8 @@ void rds_ib_send_clear_ring(struct rds_i
> rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
> if (send->s_op)
> rds_ib_send_unmap_rdma(ic, send->s_op);
> + if (send->s_fastreg)
> + rds_ib_fastreg_release(send->s_fastreg);
> }
> }
>
> @@ -195,7 +197,7 @@ void rds_ib_send_cq_comp_handler(struct
> while (ib_poll_cq(cq, 1, &wc) > 0 ) {
> rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
> (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
> - be32_to_cpu(wc.imm_data));
> + be32_to_cpu(wc.ex.imm_data));
> rds_ib_stats_inc(s_ib_tx_cq_event);
>
> if (wc.wr_id == RDS_IB_ACK_WR_ID) {
> @@ -223,6 +225,16 @@ void rds_ib_send_cq_comp_handler(struct
> /* Nothing to be done - the SG list will be unmapped
> * when the SEND completes. */
> break;
> + case IB_WR_LOCAL_INV:
> + /* We invalidated an r_key. the caller may want to
> + * learn about this. */
> + if (send->s_fastreg)
> + rds_ib_local_inv_complete(send->s_fastreg, wc.status);
> + break;
> + case IB_WR_FAST_REG_MR:
> + if (send->s_fastreg)
> + rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
> + break;
> default:
> if (printk_ratelimit())
> printk(KERN_NOTICE
> @@ -261,7 +273,7 @@ void rds_ib_send_cq_comp_handler(struct
> * queue_delay_work will not do anything if the work
> * struct is already queued, so we need to cancel it first.
> */
> - cancel_delayed_work(&conn->c_send_w);
> + cancel_delayed_work(&conn->c_send_w); /* FIXME barf */
> queue_delayed_work(rds_wq, &conn->c_send_w, 0);
> }
>
> @@ -490,6 +502,21 @@ int rds_ib_xmit(struct rds_connection *c
> else
> i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
>
> + /* Fastreg support */
> + if (rds_rdma_cookie_key(rm->m_rdma_cookie)
> + && ic->i_fastreg
> + && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
> + ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
> + if (ret)
> + goto out;
> +
> + /* We don't release the fastreg yet - we can only
> + * do that when it has completed. If the connection
> + * goes down, and we re-queue the message, we would
> + * have to retry the registration. */
> + set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
> + }
> +
> work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
> if (work_alloc == 0) {
> set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
> @@ -849,6 +876,108 @@ out:
> return ret;
> }
>
> +static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
> + struct rds_ib_fastreg *frr)
> +{
> + struct rds_ib_connection *ic = conn->c_transport_data;
> + struct rds_ib_send_work *send = NULL;
> + struct rds_ib_send_work *first;
> + struct ib_send_wr *failed_wr;
> + u32 pos;
> + u32 work_alloc = 0;
> + int ret;
> + int num_wrs;
> +
> + /*
> + * Perform 2 WRs for the fast_reg_mr's and chain them together. The
> + * first WR is used to invalidate the old rkey, and the second WR is
> + * used to define the new fast_reg_mr request. Each individual page
> + * in the sg list is added to the fast reg page list and placed
> + * inside the fast_reg_mr WR. The key used is a rolling 8bit
> + * counter, which should guarantee uniqueness.
> + */
> + num_wrs = 0;
> + if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
> + num_wrs++;
> + if (frr->f_page_list)
> + num_wrs++;
> + if (!num_wrs)
> + return 0;
> +
> + work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
> + if (work_alloc != num_wrs) {
> + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
> + rds_ib_stats_inc(s_ib_tx_ring_full);
> + ret = -ENOMEM;
> + goto out;
> + }
> +
> + first = send = &ic->i_sends[pos];
> +
> + if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
> + memset(send, 0, sizeof(*send));
> + send->s_wr.opcode = IB_WR_LOCAL_INV;
> + send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
> + send->s_fastreg = frr;
> + send->s_queued = jiffies;
> +
> + /* Get the next WR */
> + pos = (pos + 1) % ic->i_send_ring.w_nr;
> + send = &ic->i_sends[pos];
> + }
> +
> + if (frr->f_page_list) {
> + memset(send, 0, sizeof(*send));
> + send->s_wr.opcode = IB_WR_FAST_REG_MR;
> + send->s_wr.wr.fast_reg.length = frr->f_length;
> + send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
> + send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
> + send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
> + send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
> + send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
> + IB_ACCESS_REMOTE_READ |
> + IB_ACCESS_REMOTE_WRITE;
> + send->s_fastreg = frr;
> + send->s_queued = jiffies;
> + }
> +
> + atomic_add(num_wrs, &frr->f_refcnt);
> +
> + /* Chain the two WRs together */
> + if (num_wrs == 2)
> + first->s_wr.next = &send->s_wr;
> +
> + failed_wr = &first->s_wr;
> + ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
> +
> + rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
> + first, &first->s_wr, ret, failed_wr);
> + BUG_ON(failed_wr != &first->s_wr);
> + if (ret) {
> + printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
> + "returned %d\n", NIPQUAD(conn->c_faddr), ret);
> + while (num_wrs--)
> + rds_ib_fastreg_release(frr);
> + rds_ib_ring_unalloc(&ic->i_send_ring, 2);
> + return ret;
> + }
> +
> +out:
> + return ret;
> +}
> +
> +int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
> +{
> + struct rds_ib_fastreg *frr;
> +
> + frr = rds_ib_rdma_get_fastreg(mr);
> + if (!frr)
> + return 0;
> + if (IS_ERR(frr))
> + return PTR_ERR(frr);
> + return __rds_ib_xmit_fastreg(conn, frr);
> +}
> +
> void rds_ib_xmit_complete(struct rds_connection *conn)
> {
> struct rds_ib_connection *ic = conn->c_transport_data;
> Index: build-2.6/net/rds/send.c
> ===================================================================
> --- build-2.6.orig/net/rds/send.c
> +++ build-2.6/net/rds/send.c
> @@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
> list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
> set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
> set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
> + /* If we were in the process of performing a fastreg
> + * memory registration when the connection went down,
> + * we have to retry it. */
> + clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
> }
> list_splice_init(&conn->c_retrans, &conn->c_send_queue);
> spin_unlock_irqrestore(&conn->c_lock, flags);
> @@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
> if (cmsg->cmsg_level != SOL_RDS)
> continue;
>
> + /* As a side effect, RDMA_DEST and RDMA_MAP will set
> + * rm->m_rdma_cookie and rm->m_rdma_mr.
> + */
> switch (cmsg->cmsg_type) {
> case RDS_CMSG_RDMA_ARGS:
> ret = rds_cmsg_rdma_args(rs, rm, cmsg);
> Index: build-2.6/net/rds/message.c
> ===================================================================
> --- build-2.6.orig/net/rds/message.c
> +++ build-2.6/net/rds/message.c
> @@ -71,6 +71,8 @@ static void rds_message_purge(struct rds
>
> if (rm->m_rdma_op)
> rds_rdma_free_op(rm->m_rdma_op);
> + if (rm->m_rdma_mr)
> + rds_mr_put(rm->m_rdma_mr);
> }
>
> void rds_message_inc_purge(struct rds_incoming *inc)
> Index: build-2.6/net/rds/rdma.h
> ===================================================================
> --- build-2.6.orig/net/rds/rdma.h
> +++ build-2.6/net/rds/rdma.h
> @@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *r
> void rds_rdma_free_op(struct rds_rdma_op *ro);
> void rds_rdma_send_complete(struct rds_message *rm, int);
>
> +extern void __rds_put_mr_final(struct rds_mr *mr);
> +static inline void rds_mr_put(struct rds_mr *mr)
> +{
> + if (atomic_dec_and_test(&mr->r_refcount))
> + __rds_put_mr_final(mr);
> +}
> +
> #endif
> Index: build-2.6/net/rds/ib_recv.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_recv.c
> +++ build-2.6/net/rds/ib_recv.c
> @@ -796,7 +796,7 @@ void rds_ib_recv_cq_comp_handler(struct
> while (ib_poll_cq(cq, 1, &wc) > 0) {
> rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
> (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
> - be32_to_cpu(wc.imm_data));
> + be32_to_cpu(wc.ex.imm_data));
> rds_ib_stats_inc(s_ib_rx_cq_event);
>
> recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];