[ofa-general] [PATCH RFC] rds: add iwarp support

Jon Mason jon at opengridcomputing.com
Mon Jul 7 09:55:55 PDT 2008


On Mon, Jul 07, 2008 at 02:40:45PM +0200, Olaf Kirch wrote:
> On Thursday 03 July 2008 23:34:12 Jon Mason wrote:
> > This patch adds support for running RDS over iWARP adapters.  It
> Hi Jon,
> 
> I took your patch and tried to isolate the iWARP-specific changes
> in bcopy mode and roll them into a smaller patch that doesn't duplicate
> all the ib*.[hc] files.
> 
> I also tried to come to a working solution for RDMA - as you can
> see from the deluge of messages I wrote on this :-) the approach you
> chose has some problems.
> 
> Please take a look at the attached patch and let me know (a) whether
> bcopy mode works, and (b) whether the RDMA approach can work with
> iWARP NICs.

It doesn't seem to work.  After looking through the code, the
check_laddr test fails because it still looks for an IB ARP entry
rather than the inet ARP entry that iWARP needs.  I hacked around that,
but it is still not working.  I'll dig through the patch some more and
see if I can determine what is still breaking.
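
For reference, here is a rough sketch of the kind of check I have in
mind - purely illustrative, not code from either patch, and the helper
name rds_check_laddr() is made up.  The idea is to validate the local
address by binding a throwaway RDMA CM ID, which goes through the
normal inet routing/neighbour path and so works for RNICs as well as
IB HCAs:

/* Assumes <rdma/rdma_cm.h> and <linux/in.h>. */
static int rds_check_laddr(__be32 addr)
{
	struct rdma_cm_id *cm_id;
	struct sockaddr_in sin;
	int ret;

	/* No event handler needed; we only care about the bind result. */
	cm_id = rdma_create_id(NULL, NULL, RDMA_PS_TCP);
	if (IS_ERR(cm_id))
		return PTR_ERR(cm_id);

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = addr;

	/* rdma_bind_addr() resolves the address through the inet stack,
	 * so it does not depend on an IB ARP entry being present. */
	ret = rdma_bind_addr(cm_id, (struct sockaddr *) &sin);
	if (ret == 0 && !cm_id->device)
		ret = -EADDRNOTAVAIL;

	rdma_destroy_id(cm_id);
	return ret;
}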

Thanks,
Jon

> 
> Olaf
> 
> -- 
> Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
> okir at lst.de |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax

> From: Olaf Kirch <olaf.kirch at oracle.com>
> Subject: [PATCH RFC] RDS: Add iWARP Support
> 
> This is based on the work posted by Jon Mason. It extracts
> the iWARP-specific changes that are needed to support bcopy
> mode (I hope I caught all of them).
> 
> I also did some work on RDMA support. This is a lot harder,
> because the interface and implementation were designed with
> classic MRs in mind. However, I think the approach taken below
> can be made to work (it's not working yet - I left some blanks
> and BUG() asserts in there, because I wanted to get this patch
> out as an RFC sooner rather than later).
> 
> Also, this is a pretty large patch - it needs to be broken down into
> half a dozen or so smaller functional changes for better review.
> 
> Olaf
> ---
>  net/rds/ib.c      |   30 ++
>  net/rds/ib.h      |   55 ++++
>  net/rds/ib_cm.c   |   36 ++-
>  net/rds/ib_rdma.c |  610 +++++++++++++++++++++++++++++++++++++++++++++---------
>  net/rds/ib_recv.c |    2 
>  net/rds/ib_send.c |  133 +++++++++++
>  net/rds/message.c |    2 
>  net/rds/rdma.c    |   17 -
>  net/rds/rdma.h    |    7 
>  net/rds/rds.h     |    4 
>  net/rds/send.c    |    7 
>  11 files changed, 778 insertions(+), 125 deletions(-)
> 
> Index: build-2.6/net/rds/ib.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib.c
> +++ build-2.6/net/rds/ib.c
> @@ -42,6 +42,7 @@
>  
>  unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
>  unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
> +unsigned int fastreg_pool_size = RDS_FMR_POOL_SIZE;
>  
>  module_param(fmr_pool_size, int, 0444);
>  MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
> @@ -85,21 +86,38 @@ void rds_ib_add_one(struct ib_device *de
>  	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
>  	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
>  	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
> -	rds_ibdev->max_fmrs = dev_attr->max_fmr?
> -			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
> -			fmr_pool_size;
> +	rds_ibdev->max_fmrs = dev_attr->max_fmr;
>  
>  	rds_ibdev->dev = device;
>  	rds_ibdev->pd = ib_alloc_pd(device);
>  	if (IS_ERR(rds_ibdev->pd))
>  		goto free_dev;
>  
> -	rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> -				      IB_ACCESS_LOCAL_WRITE);
> +	if (device->node_type != RDMA_NODE_RNIC) {
> +		rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> +					IB_ACCESS_LOCAL_WRITE);
> +	} else {
> +		/* Why does it have to have these permissions? */
> +		rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> +					IB_ACCESS_REMOTE_READ |
> +					IB_ACCESS_REMOTE_WRITE |
> +					IB_ACCESS_LOCAL_WRITE);
> +	}
>  	if (IS_ERR(rds_ibdev->mr))
>  		goto err_pd;
>  
> -	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> +	/* Create the MR pool. We choose different strategies for
> +	 * MRs depending on the hardware.
> +	 */
> +	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
> +		/* Use fast registrations */
> +		rds_ibdev->mr_pool = rds_ib_create_fastreg_pool(rds_ibdev);
> +		rds_ibdev->use_fastreg = 1;
> +	} else {
> +		/* Default: use FMRs. It would be nice if there were
> +		 * a capability flag to test for. */
> +		rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> +	}
>  	if (IS_ERR(rds_ibdev->mr_pool)) {
>  		rds_ibdev->mr_pool = NULL;
>  		goto err_mr;
> Index: build-2.6/net/rds/ib.h
> ===================================================================
> --- build-2.6.orig/net/rds/ib.h
> +++ build-2.6/net/rds/ib.h
> @@ -49,9 +49,51 @@ struct rds_ib_connect_private {
>  	__be32			dp_credit;		/* non-zero enables flow ctl */
>  };
>  
> +struct rds_ib_scatterlist {
> +	struct scatterlist *	list;
> +	unsigned int		len;
> +	int			dma_len;
> +};
> +
> +/* We need to post a LOCAL_INV request unless f_old_rkey
> + * has this value. */
> +#define RDS_IB_INVALID_FASTREG_KEY 0
> +
> +struct rds_ib_fastreg {
> +	atomic_t		f_refcnt;
> +	unsigned int		f_posted : 1,
> +				f_done : 1;
> +
> +	u32			f_old_rkey;
> +
> +	u32			f_rkey;
> +	unsigned int		f_length;
> +
> +	struct rds_ib_scatterlist f_sg;
> +
> +	struct ib_fast_reg_page_list *f_page_list;
> +	unsigned int		f_page_list_len;
> +	unsigned int		f_page_shift;
> +
> +#if 0
> +	u32			f_invalidate_rkey;
> +	struct ib_send_wr	f_wr;
> +	wait_queue_head_t	f_waitq;
> +	struct list_head	f_list;
> +	unsigned int		f_done;
> +	int			f_status;
> +#endif
> +
> +	struct rds_ib_mr	*f_mr;
> +};
> +
>  struct rds_ib_send_work {
>  	struct rds_message	*s_rm;
> +
> +	/* We should really put these into a union: */
>  	struct rds_rdma_op	*s_op;
> +	struct rds_ib_fastreg	*s_fastreg;
> +
>  	struct ib_send_wr	s_wr;
>  	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
>  	unsigned long		s_queued;
> @@ -86,6 +128,7 @@ struct rds_ib_connection {
>  	struct rds_header	*i_send_hdrs;
>  	u64			i_send_hdrs_dma;
>  	struct rds_ib_send_work *i_sends;
> +	struct list_head	i_fastreg_pending;
>  
>  	/* rx */
>  	struct mutex		i_recv_mutex;
> @@ -123,7 +166,9 @@ struct rds_ib_connection {
>  	atomic_t		i_credits;
>  
>    	/* Protocol version specific information */
> -	unsigned int		i_flowctl : 1;	/* enable/disable flow ctl */
> +	unsigned int		i_flowctl : 1,	/* enable/disable flow ctl */
> +				i_iwarp   : 1,	/* this is actually iWARP not IB */
> +				i_fastreg : 1;	/* use fastreg */
>  
>  	/* Batched completions */
>  	unsigned int		i_unsignaled_wrs;
> @@ -154,6 +199,7 @@ struct rds_ib_device {
>  	unsigned int		fmr_max_remaps;
>  	unsigned int		max_fmrs;
>  	int			max_sge;
> +	unsigned int		use_fastreg : 1;
>  	spinlock_t		spinlock;
>  };
>  
> @@ -236,6 +282,7 @@ extern void rds_ib_remove_one(struct ib_
>  extern struct ib_client rds_ib_client;
>  
>  extern unsigned int fmr_pool_size;
> +extern unsigned int fastreg_pool_size;
>  extern unsigned int fmr_message_size;
>  
>  /* ib_cm.c */
> @@ -254,6 +301,7 @@ void __rds_ib_conn_error(struct rds_conn
>  /* ib_rdma.c */
>  int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
>  struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
> +struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *);
>  void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
>  void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
>  void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
> @@ -261,6 +309,10 @@ void *rds_ib_get_mr(struct scatterlist *
>  void rds_ib_sync_mr(void *trans_private, int dir);
>  void rds_ib_free_mr(void *trans_private, int invalidate);
>  void rds_ib_flush_mrs(void);
> +struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
> +void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
> +void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
> +void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
>  
>  /* ib_recv.c */
>  int __init rds_ib_recv_init(void);
> @@ -298,6 +350,7 @@ void rds_ib_send_cq_comp_handler(struct 
>  void rds_ib_send_init_ring(struct rds_ib_connection *ic);
>  void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
>  int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
> +int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
>  void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
>  void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
>  int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
> Index: build-2.6/net/rds/ib_rdma.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_rdma.c
> +++ build-2.6/net/rds/ib_rdma.c
> @@ -45,20 +45,31 @@ extern struct list_head rds_ib_devices;
>  struct rds_ib_mr {
>  	struct rds_ib_device	*device;
>  	struct rds_ib_mr_pool	*pool;
> -	struct ib_fmr		*fmr;
> +
> +	spinlock_t		lock;
> +	union {
> +	    struct {
> +		struct ib_fmr	*fmr;
> +	    } ib;
> +	    struct {
> +		struct ib_fast_reg_page_list *page_list;
> +		struct ib_mr	*fastreg_mr;
> +		u32		rkey;
> +		struct rds_ib_fastreg *pending;
> +	    } iwarp;
> +	} u;
>  	struct list_head	list;
>  	unsigned int		remap_count;
>  
> -	struct scatterlist *	sg;
> -	unsigned int		sg_len;
> -	u64 *			dma;
> -	int			sg_dma_len;
> +	struct rds_ib_scatterlist sg;
>  };
>  
>  /*
>   * Our own little FMR pool
>   */
>  struct rds_ib_mr_pool {
> +	struct rds_ib_device *	device;
> +
>  	struct mutex		flush_lock;		/* serialize fmr invalidate */
>  	struct work_struct	flush_worker;		/* flush worker */
>  
> @@ -68,16 +79,57 @@ struct rds_ib_mr_pool {
>  	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
>  	struct list_head	free_list;		/* unused MRs */
>  	struct list_head	clean_list;		/* unused & unamapped MRs */
> +	struct list_head	fastreg_list;		/* pending fastreg's */
>  	atomic_t		free_pinned;		/* memory pinned by free MRs */
> +	unsigned long		max_message_size;	/* in pages */
>  	unsigned long		max_items;
>  	unsigned long		max_items_soft;
>  	unsigned long		max_free_pinned;
>  	struct ib_fmr_attr	fmr_attr;
> +
> +	/* Dummy QP used to handle invalidate for fastreg */
> +	struct ib_qp		*qp;
> +
> +	struct rds_ib_mr_pool_ops *op;
> +};
> +
> +struct rds_ib_mr_pool_ops {
> +	int			(*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
> +	int			(*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
> +					struct scatterlist *sg, unsigned int sg_len);
> +	void			(*unmap)(struct rds_ib_mr_pool *, struct list_head *);
> +	void			(*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
>  };
>  
>  static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
>  static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
>  static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
> +static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
> +			  struct rds_ib_mr *ibmr,
> +	       		  struct scatterlist *sg, unsigned int nents);
> +static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
> +static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> +			  struct rds_ib_mr *ibmr,
> +	       		  struct scatterlist *sg, unsigned int nents);
> +static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
> +static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
> +
> +static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
> +	.init		= rds_ib_init_fmr,
> +	.map		= rds_ib_map_fmr,
> +	.unmap		= rds_ib_unmap_fmr_list,
> +	.destroy	= rds_ib_destroy_fmr,
> +};
> +
> +static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
> +	.init		= rds_ib_init_fastreg,
> +	.map		= rds_ib_map_fastreg,
> +	.unmap		= rds_ib_unmap_fastreg_list,
> +	.destroy	= rds_ib_destroy_fastreg,
> +};
>  
>  int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
>  {
> @@ -124,7 +176,158 @@ struct rds_ib_device* ib_get_device(__be
>  	return NULL;
>  }
>  
> -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
> +		struct scatterlist *list,
> +		unsigned int sg_len, unsigned int sg_dma_len)
> +{
> +	sg->list = list;
> +	sg->len = sg_len;
> +	sg->dma_len = sg_dma_len;
> +}
> +
> +static void rds_ib_rdma_drop_scatterlist(struct rds_ib_device *rds_ibdev,
> +		struct rds_ib_scatterlist *sg)
> +{
> +	if (sg->dma_len) {
> +		ib_dma_unmap_sg(rds_ibdev->dev,
> +				sg->list, sg->len,
> +				DMA_BIDIRECTIONAL);
> +		sg->dma_len = 0;
> +	}
> +
> +	/* Release the s/g list */
> +	if (sg->len) {
> +		unsigned int i;
> +
> +		for (i = 0; i < sg->len; ++i) {
> +			struct page *page = sg_page(&sg->list[i]);
> +
> +			/* FIXME we need a way to tell a r/w MR
> +			 * from a r/o MR */
> +			set_page_dirty(page);
> +			put_page(page);
> +		}
> +		kfree(sg->list);
> +
> +		sg->list = NULL;
> +		sg->len = 0;
> +	}
> +}
> +
> +/*
> + * IB FMR handling
> + */
> +static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
> +				struct rds_ib_mr *ibmr)
> +{
> +	struct rds_ib_device *rds_ibdev = pool->device;
> +	struct ib_fmr *fmr;
> +
> +	fmr = ib_alloc_fmr(rds_ibdev->pd,
> +			(IB_ACCESS_LOCAL_WRITE |
> +			 IB_ACCESS_REMOTE_READ |
> +			 IB_ACCESS_REMOTE_WRITE),
> +			&pool->fmr_attr);
> +	if (IS_ERR(fmr)) {
> +		int err = PTR_ERR(fmr);
> +
> +		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
> +		return err;
> +	}
> +
> +	ibmr->u.ib.fmr = fmr;
> +	return 0;
> +}
> +
> +static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
> +				struct list_head *unmap_list)
> +{
> +	struct rds_ib_mr *ibmr;
> +	LIST_HEAD(fmr_list);
> +	int ret;
> +
> +	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> +	list_for_each_entry(ibmr, unmap_list, list)
> +		list_add(&ibmr->u.ib.fmr->list, &fmr_list);
> +	ret = ib_unmap_fmr(&fmr_list);
> +	if (ret)
> +		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
> +}
> +
> +static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
> +				struct rds_ib_mr *ibmr)
> +{
> +	if (ibmr->u.ib.fmr)
> +		ib_dealloc_fmr(ibmr->u.ib.fmr);
> +	ibmr->u.ib.fmr = NULL;
> +}
> +
> +/*
> + * iWARP fastreg handling
> + */
> +static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
> +				struct rds_ib_mr *ibmr)
> +{
> +	struct rds_ib_device *rds_ibdev = pool->device;
> +	struct ib_mr *mr;
> +
> +	mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
> +	if (IS_ERR(mr)) {
> +		int err = PTR_ERR(mr);
> +
> +		printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
> +		return err;
> +	}
> +
> +	ibmr->u.iwarp.rkey = RDS_IB_INVALID_FASTREG_KEY;
> +	ibmr->u.iwarp.fastreg_mr = mr;
> +	return 0;
> +}
> +
> +static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
> +				struct list_head *unmap_list)
> +{
> +	LIST_HEAD(fmr_list);
> +
> +	/* Batched invalidation of fastreg MRs.
> +	 * Why do we do it this way, even though we could pipeline unmap
> +	 * and remap? The reason is the application semantics - when the
> +	 * application requests an invalidation of MRs, it expects all
> +	 * previously released R_Keys to become invalid.
> +	 *
> +	 * If we implement MR reuse naively, we risk memory corruption
> +	 * (this has actually been observed). So the default behavior
> +	 * requires that an MR goes through an explicit unmap operation before
> +	 * we can reuse it again.
> +	 *
> +	 * We could probably improve on this a little, by allowing immediate
> +	 * reuse of an MR on the same socket (e.g. you could add a small
> +	 * cache of unused MRs to struct rds_socket - GET_MR could grab one
> +	 * of these without requiring an explicit invalidate).
> +	 */
> +
> +	/* Fill in the blanks:
> +	    Go through the list of dirty MRs, and post LOCAL_INV WRs to the
> +	    dummy pool->qp. When the completion for the last WR arrives,
> +	    the CQ handler wakes up the caller.
> +	  */
> +	BUG(); /* not implemented yet. */
> +}
> +
> +static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
> +		struct rds_ib_mr *ibmr)
> +{
> +	if (ibmr->u.iwarp.page_list)
> +		ib_free_fast_reg_page_list(ibmr->u.iwarp.page_list);
> +	if (ibmr->u.iwarp.fastreg_mr)
> +		ib_dereg_mr(ibmr->u.iwarp.fastreg_mr);
> +	if (ibmr->u.iwarp.pending)
> +		rds_ib_fastreg_release(ibmr->u.iwarp.pending);
> +}
> +
> +struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
> +		unsigned int message_size, unsigned int pool_size,
> +		struct rds_ib_mr_pool_ops *ops)
>  {
>  	struct rds_ib_mr_pool *pool;
>  
> @@ -132,25 +335,68 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
>  	if (!pool)
>  		return ERR_PTR(-ENOMEM);
>  
> +	pool->device = rds_ibdev;
>  	INIT_LIST_HEAD(&pool->free_list);
>  	INIT_LIST_HEAD(&pool->drop_list);
>  	INIT_LIST_HEAD(&pool->clean_list);
> +	INIT_LIST_HEAD(&pool->fastreg_list);
>  	mutex_init(&pool->flush_lock);
>  	spin_lock_init(&pool->list_lock);
>  	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
>  
> -	pool->fmr_attr.max_pages = fmr_message_size;
> -	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> -	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> -	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
> +	pool->max_message_size = message_size;
> +	pool->max_items = pool_size;
> +	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
>  
>  	/* We never allow more than max_items MRs to be allocated.
>  	 * When we exceed more than max_items_soft, we start freeing
>  	 * items more aggressively.
>  	 * Make sure that max_items > max_items_soft > max_items / 2
>  	 */
> -	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
> -	pool->max_items = rds_ibdev->max_fmrs;
> +	pool->max_items_soft = pool->max_items * 3 / 4;
> +
> +	return pool;
> +}
> +
> +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +{
> +	struct rds_ib_mr_pool *pool;
> +	unsigned int pool_size = fmr_pool_size;
> +
> +	if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
> +		pool_size = rds_ibdev->max_fmrs;
> +
> +	pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
> +				&rds_ib_fmr_pool_ops);
> +
> +	if (!IS_ERR(pool)) {
> +		pool->fmr_attr.max_pages = pool->max_message_size;
> +		pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> +		pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> +	}
> +
> +	return pool;
> +}
> +
> +struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *rds_ibdev)
> +{
> +	struct rds_ib_mr_pool *pool;
> +	unsigned int pool_size = fmr_pool_size;
> +
> +	if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
> +		pool_size = rds_ibdev->max_fmrs;
> +
> +	pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size,
> +				fastreg_pool_size,
> +				&rds_ib_fastreg_pool_ops);
> +
> +	if (!IS_ERR(pool)) {
> +		/* Fill in the blanks:
> +		 *  create a dummy QP to which we can post LOCAL_INV
> +		 *  requests when invalidating MRs
> +		 */
> +		pool->qp = NULL;
> +	}
>  
>  	return pool;
>  }
> @@ -169,6 +415,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
>  	rds_ib_flush_mr_pool(pool, 1);
>  	BUG_ON(atomic_read(&pool->item_count));
>  	BUG_ON(atomic_read(&pool->free_pinned));
> +
> +	if (pool->qp)
> +		ib_destroy_qp(pool->qp);
> +
>  	kfree(pool);
>  }
>  
> @@ -227,77 +477,82 @@ static struct rds_ib_mr *rds_ib_alloc_fm
>  		goto out_no_cigar;
>  	}
>  
> -	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
> -			(IB_ACCESS_LOCAL_WRITE |
> -			 IB_ACCESS_REMOTE_READ |
> -			 IB_ACCESS_REMOTE_WRITE),
> -			&pool->fmr_attr);
> -	if (IS_ERR(ibmr->fmr)) {
> -		err = PTR_ERR(ibmr->fmr);
> -		ibmr->fmr = NULL;
> -		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
> +	spin_lock_init(&ibmr->lock);
> +
> +	err = pool->op->init(pool, ibmr);
> +	if (err)
>  		goto out_no_cigar;
> -	}
>  
>  	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
>  	return ibmr;
>  
>  out_no_cigar:
>  	if (ibmr) {
> -		if (ibmr->fmr)
> -			ib_dealloc_fmr(ibmr->fmr);
> +		pool->op->destroy(pool, ibmr);
>  		kfree(ibmr);
>  	}
>  	atomic_dec(&pool->item_count);
>  	return ERR_PTR(err);
>  }
>  
> -static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
> -	       struct scatterlist *sg, unsigned int nents)
> +static int rds_ib_count_dma_pages(struct rds_ib_device *rds_ibdev,
> +			  struct scatterlist *sg, unsigned int sg_dma_len,
> +			  unsigned int *lenp)
>  {
>  	struct ib_device *dev = rds_ibdev->dev;
> -	struct scatterlist *scat = sg;
> -	u64 io_addr = 0;
> -	u64 *dma_pages;
> -	u32 len;
> -	int page_cnt, sg_dma_len;
> -	int i, j;
> -	int ret;
> -
> -	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
> -				 DMA_BIDIRECTIONAL);
> -	if (unlikely(!sg_dma_len)) {
> -	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> -		return -EBUSY;
> -	}
> -
> -	len = 0;
> -	page_cnt = 0;
> +	unsigned int i, page_cnt = 0, len = 0;
>  
>  	for (i = 0; i < sg_dma_len; ++i) {
> -		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
> -		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
> +		unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
> +		u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
>  	
>  		if (dma_addr & ~rds_ibdev->fmr_page_mask) {
>  			if (i > 0)
>  				return -EINVAL;
> -			else
> -				++page_cnt;
> +			++page_cnt;
>  		}
>  		if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
>  			if (i < sg_dma_len - 1)
>  				return -EINVAL;
> -			else
> -				++page_cnt;
> +			++page_cnt;
>  		}
>  
>  		len += dma_len;
>  	}
>  
>  	page_cnt += len >> rds_ibdev->fmr_page_shift;
> -	if (page_cnt > fmr_message_size)
> +	if (page_cnt > rds_ibdev->mr_pool->max_message_size)
>  		return -EINVAL;
>  
> +	return page_cnt;
> +}
> +
> +static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
> +			  struct rds_ib_mr *ibmr,
> +	       		  struct scatterlist *sg, unsigned int nents)
> +{
> +	struct rds_ib_device *rds_ibdev = pool->device;
> +	struct ib_device *dev = rds_ibdev->dev;
> +	struct scatterlist *scat = sg;
> +	u64 io_addr = 0;
> +	u64 *dma_pages;
> +	int page_cnt, sg_dma_len;
> +	int i, j;
> +	int ret;
> +
> +	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
> +				 DMA_BIDIRECTIONAL);
> +	if (unlikely(!sg_dma_len)) {
> +	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> +		return -EBUSY;
> +	}
> +
> +	/* FIXME: when returning an error, we need to unmap the SG */
> +
> +	page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, NULL);
> +	if (page_cnt < 0)
> +		return page_cnt;
> +
>  	dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
>  	if (!dma_pages)
>  		return -ENOMEM;
> @@ -312,7 +567,7 @@ static int rds_ib_map_fmr(struct rds_ib_
>  				(dma_addr & rds_ibdev->fmr_page_mask) + j;
>  	}
>  				
> -	ret = ib_map_phys_fmr(ibmr->fmr,
> +	ret = ib_map_phys_fmr(ibmr->u.ib.fmr,
>  				   dma_pages, page_cnt, io_addr);	
>  	if (ret)
>  		goto out;
> @@ -321,9 +576,9 @@ static int rds_ib_map_fmr(struct rds_ib_
>  	 * safely tear down the old mapping. */
>  	rds_ib_teardown_mr(ibmr);
>  
> -	ibmr->sg = scat;
> -	ibmr->sg_len = nents;
> -	ibmr->sg_dma_len = sg_dma_len;
> +	ibmr->sg.list = scat;
> +	ibmr->sg.len = nents;
> +	ibmr->sg.dma_len = sg_dma_len;
>  	ibmr->remap_count++;
>  
>  	rds_ib_stats_inc(s_ib_rdma_mr_used);
> @@ -335,6 +590,192 @@ out:
>  	return ret;
>  }
>  
> +static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> +			struct rds_ib_mr *ibmr,
> +	       		struct scatterlist *sg, unsigned int sg_len)
> +{
> +	struct rds_ib_device *rds_ibdev = pool->device;
> +	struct ib_device *dev = rds_ibdev->dev;
> +	struct ib_fast_reg_page_list *page_list = NULL;
> +	struct rds_ib_fastreg *frr;
> +	unsigned int len;
> +	int i, j, page_cnt, sg_dma_len = 0;
> +	int ret;
> +
> +	BUG_ON(ibmr->u.iwarp.pending);
> +
> +	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
> +	if (IS_ERR(page_list)) {
> +		ret = PTR_ERR(page_list);
> +		page_list = NULL;
> +
> +		printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", ret);
> +		return ret;
> +	}
> +
> +	sg_dma_len = ib_dma_map_sg(dev, sg, sg_len, DMA_BIDIRECTIONAL);
> +	if (unlikely(!sg_dma_len)) {
> +	        printk(KERN_WARNING "RDS/iWARP: dma_map_sg failed!\n");
> +		ret = -EBUSY;
> +		goto out;
> +	}
> +
> +	page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, &len);
> +	if (page_cnt < 0) {
> +		ret = page_cnt;
> +		goto out;
> +	}
> +
> +	page_cnt = 0;
> +	for (i = 0; i < sg_dma_len; ++i) {
> +		unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
> +		u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
> +	
> +		for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
> +			page_list->page_list[page_cnt++] =
> +				(dma_addr & rds_ibdev->fmr_page_mask) + j;
> +	}
> +
> +	/* Allocate the fastreg request structure */
> +	frr = kzalloc(sizeof(*frr), GFP_KERNEL);
> +	if (!frr) {
> +		ret = -ENOMEM;
> +		goto out;
> +	}
> +
> +	ib_update_fast_reg_key(ibmr->u.iwarp.fastreg_mr, ibmr->remap_count++);
> +
> +	/* Build the fastreg WR */
> +	frr->f_mr = ibmr;
> +	rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len, sg_dma_len);
> +	frr->f_length = len;
> +	frr->f_rkey = ibmr->u.iwarp.fastreg_mr->rkey;
> +	frr->f_page_list = page_list;
> +	frr->f_page_list_len = sg_dma_len;
> +	frr->f_page_shift = rds_ibdev->fmr_page_shift;
> +
> +	frr->f_old_rkey = ibmr->u.iwarp.rkey;
> +
> +	/* Attach the fastreg info to the MR */
> +	atomic_set(&frr->f_refcnt, 1);
> +	ibmr->u.iwarp.pending = frr;
> +
> +	rds_ib_stats_inc(s_ib_rdma_mr_used);
> +	ret = 0;
> +
> +out:
> +	if (ret) {
> +		ib_free_fast_reg_page_list(page_list);
> +		if (sg_dma_len)
> +			ib_dma_unmap_sg(dev, sg, sg_dma_len, DMA_BIDIRECTIONAL);
> +	}
> +
> +	return ret;
> +}
> +
> +struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
> +{
> +	struct rds_ib_mr *ibmr = mr->r_trans_private;
> +	struct rds_ib_fastreg *frr;
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ibmr->lock, flags);
> +	frr = ibmr->u.iwarp.pending;
> +	if (frr) {
> +		/* FIXME: we need to mark the frr as "locked"
> +		 * to prevent FREE_MR from trashing the MR
> +		 * as long as the fastreg is on the queue */
> +		atomic_inc(&frr->f_refcnt);
> +	}
> +	spin_unlock_irqrestore(&ibmr->lock, flags);
> +
> +	return frr;
> +}
> +
> +void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
> +{
> +	struct rds_ib_device *rds_ibdev = NULL;
> +
> +	if (atomic_dec_and_test(&frr->f_refcnt)) {
> +		ib_free_fast_reg_page_list(frr->f_page_list);
> +		BUG(); /* FIXME: obtain rds_ibdev */
> +		rds_ib_rdma_drop_scatterlist(rds_ibdev, &frr->f_sg);
> +		kfree(frr);
> +	}
> +}
> +
> +/*
> + * These functions are called back from the send CQ handler
> + * when the LOCAL_INV or FAST_REG_MR WRs complete.
> + */
> +void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
> +{
> +	struct rds_ib_mr *ibmr = frr->f_mr;
> +
> +	spin_lock(&ibmr->lock);
> +	if (ibmr->u.iwarp.pending != frr)
> +		goto out_unlock;
> +
> +	if (status != IB_WC_SUCCESS) {
> +		/* Yikes. Invalidation failed. What can we do but complain? */
> +		printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
> +		goto out_unlock;
> +	}
> +
> +	if (frr->f_old_rkey == ibmr->u.iwarp.rkey) {
> +		ibmr->u.iwarp.rkey = 0;
> +		/* Now we can unpin any memory pinned for this MR. */
> +		rds_ib_teardown_mr(ibmr);
> +	}
> +	frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
> +
> +out_unlock:
> +	spin_unlock(&ibmr->lock);
> +
> +	/* The WR owned a reference to this frr. Drop it */
> +	rds_ib_fastreg_release(frr);
> +}
> +
> +void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
> +{
> +	struct rds_ib_mr *ibmr = frr->f_mr;
> +
> +	spin_lock(&ibmr->lock);
> +
> +	/* Technically, this would be a bug */
> +	if (ibmr->u.iwarp.pending != frr)
> +		goto out_unlock;
> +
> +	if (status != IB_WC_SUCCESS) {
> +		/* Yikes. We were unable to register the application's
> +		 * memory. We have no way of notifying the application.
> +		 * We could probably tear down the QP and cry uncle, but
> +		 * the SEND may already have gone out.
> +		 * The only solace is that the RDMA initiated by the remote
> +		 * will fail, because the key isn't valid.
> +		 */
> +		if (printk_ratelimit())
> +			printk(KERN_NOTICE "RDS/iWARP: Unable to "
> +					"perform fast memory registration.\n");
> +		goto out_unlock;
> +	}
> +
> +	ibmr->sg = frr->f_sg;
> +	ibmr->u.iwarp.page_list = frr->f_page_list;
> +	ibmr->u.iwarp.rkey = frr->f_rkey;
> +
> +	/* Detach frr from MR. We still have at least one ref after this */
> +	ibmr->u.iwarp.pending = NULL;
> +	frr->f_done = 1;
> +	rds_ib_fastreg_release(frr);
> +
> +out_unlock:
> +	spin_unlock(&ibmr->lock);
> +
> +	/* The WR owned a reference to this frr. Drop it */
> +	rds_ib_fastreg_release(frr);
> +}
> +
>  void rds_ib_sync_mr(void *trans_private, int direction)
>  {
>  	struct rds_ib_mr *ibmr = trans_private;
> @@ -342,49 +783,24 @@ void rds_ib_sync_mr(void *trans_private,
>  
>  	switch (direction) {
>  	case DMA_FROM_DEVICE:
> -		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
> -			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
> +		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg.list,
> +			ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
>  		break;
>  	case DMA_TO_DEVICE:
> -		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
> -			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
> +		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg.list,
> +			ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
>  		break;
>  	}
>  }
>  
>  static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
>  {
> -	struct rds_ib_device *rds_ibdev = ibmr->device;
> -
> -	if (ibmr->sg_dma_len) {
> -		ib_dma_unmap_sg(rds_ibdev->dev,
> -				ibmr->sg, ibmr->sg_len,
> -				DMA_BIDIRECTIONAL);
> -		ibmr->sg_dma_len = 0;
> -	}
> -
> -	/* Release the s/g list */
> -	if (ibmr->sg_len) {
> -		unsigned int i;
> -
> -		for (i = 0; i < ibmr->sg_len; ++i) {
> -			struct page *page = sg_page(&ibmr->sg[i]);
> -
> -			/* FIXME we need a way to tell a r/w MR
> -			 * from a r/o MR */
> -			set_page_dirty(page);
> -			put_page(page);
> -		}
> -		kfree(ibmr->sg);
> -
> -		ibmr->sg = NULL;
> -		ibmr->sg_len = 0;
> -	}
> +	rds_ib_rdma_drop_scatterlist(ibmr->device, &ibmr->sg);
>  }
>  
>  void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
>  {
> -	unsigned int pinned = ibmr->sg_len;
> +	unsigned int pinned = ibmr->sg.len;
>  
>  	__rds_ib_teardown_mr(ibmr);
>  	if (pinned) {
> @@ -419,7 +835,6 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
>  {
>  	struct rds_ib_mr *ibmr, *next;
>  	LIST_HEAD(unmap_list);
> -	LIST_HEAD(fmr_list);
>  	unsigned long unpinned = 0;
>  	unsigned long flags;
>  	unsigned int nfreed = 0, ncleaned = 0, free_goal;
> @@ -443,21 +858,17 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
>  	if (list_empty(&unmap_list))
>  		goto out;
>  
> -	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> -	list_for_each_entry(ibmr, &unmap_list, list)
> -		list_add(&ibmr->fmr->list, &fmr_list);
> -	ret = ib_unmap_fmr(&fmr_list);
> -	if (ret)
> -		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
> +	/* Batched invalidate of dirty MRs: */
> +	pool->op->unmap(pool, &unmap_list);
>  
>  	/* Now we can destroy the DMA mapping and unpin any pages */
>  	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
> -		unpinned += ibmr->sg_len;
> +		unpinned += ibmr->sg.len;
>  		__rds_ib_teardown_mr(ibmr);
>  		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
>  			rds_ib_stats_inc(s_ib_rdma_mr_free);
>  			list_del(&ibmr->list);
> -			ib_dealloc_fmr(ibmr->fmr);
> +			pool->op->destroy(pool, ibmr);
>  			kfree(ibmr);
>  			nfreed++;
>  		}
> @@ -491,7 +902,7 @@ void rds_ib_free_mr(void *trans_private,
>  	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
>  	unsigned long flags;
>  
> -	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
> +	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg.len);
>  
>  	/* Return it to the pool's free list */
>  	spin_lock_irqsave(&pool->list_lock, flags);
> @@ -500,7 +911,7 @@ void rds_ib_free_mr(void *trans_private,
>  	} else {
>  		list_add(&ibmr->list, &pool->free_list);
>  	}
> -	atomic_add(ibmr->sg_len, &pool->free_pinned);
> +	atomic_add(ibmr->sg.len, &pool->free_pinned);
>  	atomic_inc(&pool->dirty_count);
>  	spin_unlock_irqrestore(&pool->list_lock, flags);
>  
> @@ -536,6 +947,7 @@ void *rds_ib_get_mr(struct scatterlist *
>  		    __be32 ip_addr, u32 *key_ret)
>  {
>  	struct rds_ib_device *rds_ibdev;
> +	struct rds_ib_mr_pool *pool;
>  	struct rds_ib_mr *ibmr = NULL;
>  	int ret;
>  
> @@ -545,7 +957,7 @@ void *rds_ib_get_mr(struct scatterlist *
>  		goto out;
>  	}
>  
> -	if (!rds_ibdev->mr_pool) {
> +	if (!(pool = rds_ibdev->mr_pool)) {
>  		ret = -ENODEV;
>  		goto out;
>  	}
> @@ -554,9 +966,9 @@ void *rds_ib_get_mr(struct scatterlist *
>  	if (IS_ERR(ibmr))
>  		return ibmr;
>  
> -	ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
> +	ret = pool->op->map(pool, ibmr, sg, nents);
>  	if (ret == 0)
> -		*key_ret = ibmr->fmr->rkey;
> +		*key_ret = ibmr->u.ib.fmr->rkey;
>  	else
>  		printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
>  
> Index: build-2.6/net/rds/rdma.c
> ===================================================================
> --- build-2.6.orig/net/rds/rdma.c
> +++ build-2.6/net/rds/rdma.c
> @@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr
>  		mr->r_trans->free_mr(trans_private, mr->r_invalidate);
>  }
>  
> -static void rds_mr_put(struct rds_mr *mr)
> +void __rds_put_mr_final(struct rds_mr *mr)
>  {
> -	if (!atomic_dec_and_test(&mr->r_refcount))
> -		return;
> -
>  	rds_destroy_mr(mr);
>  	kfree(mr);
>  }
> @@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long u
>  }
>  
>  static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
> -				u64 *cookie_ret)
> +				u64 *cookie_ret, struct rds_mr **mr_ret)
>  {
>  	struct rds_mr *mr = NULL, *found;
>  	unsigned int nr_pages;
> @@ -297,6 +294,10 @@ static int __rds_rdma_map(struct rds_soc
>  
>  	rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
>  
> +	if (mr_ret) {
> +		atomic_inc(&mr->r_refcount);
> +		*mr_ret = mr;
> +	}
>  	ret = 0;
>  out:
>  	if (pages)
> @@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char
>  			   sizeof(struct rds_get_mr_args)))
>  		return -EFAULT;
>  
> -	return __rds_rdma_map(rs, &args, NULL);
> +	return __rds_rdma_map(rs, &args, NULL, NULL);
>  }
>  
>  /*
> @@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *
>  
>  	if (mr) {
>  		mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
> -		rds_mr_put(mr);
> +		rm->m_rdma_mr = mr;
>  	}
>  	return err;
>  }
> @@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *r
>  	 || rm->m_rdma_cookie != 0)
>  		return -EINVAL;
>  
> -	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
> +	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
>  }
> Index: build-2.6/net/rds/rds.h
> ===================================================================
> --- build-2.6.orig/net/rds/rds.h
> +++ build-2.6/net/rds/rds.h
> @@ -30,6 +30,7 @@
>   */
>  #define RDS_IB_PORT	18635
>  #define RDS_TCP_PORT	18636
> +#define RDS_IWARP_PORT	18637
>  
>  #ifndef AF_RDS
>  #define AF_RDS          28      /* Reliable Datagram Socket     */
> @@ -60,6 +61,7 @@
>  /* XXX crap, we need to worry about this conflicting too */
>  #define SYSCTL_NET_RDS 9912
>  #define SYSCTL_NET_RDS_IB 100
> +#define SYSCTL_NET_RDS_IWARP 101
>  
>  #ifdef DEBUG
>  #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
> @@ -282,6 +284,7 @@ struct rds_incoming {
>  #define RDS_MSG_RETRANSMITTED	5
>  #define RDS_MSG_MAPPED		6
>  #define RDS_MSG_PAGEVEC		7
> +#define RDS_MSG_FASTREG_POSTED	8
>  
>  struct rds_message {
>  	atomic_t		m_refcount;
> @@ -301,6 +304,7 @@ struct rds_message {
>  	struct rds_sock		*m_rs;
>  	struct rds_rdma_op	*m_rdma_op;
>  	rds_rdma_cookie_t	m_rdma_cookie;
> +	struct rds_mr		*m_rdma_mr;
>  	unsigned int		m_nents;
>  	unsigned int		m_count;
>  	struct scatterlist	m_sg[0];
> Index: build-2.6/net/rds/ib_cm.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_cm.c
> +++ build-2.6/net/rds/ib_cm.c
> @@ -142,16 +142,19 @@ static void rds_ib_cm_fill_conn_param(st
>  			struct rds_ib_connect_private *dp,
>  			u32 protocol_version)
>  {
> +	struct rds_ib_connection *ic = conn->c_transport_data;
> +
>  	memset(conn_param, 0, sizeof(struct rdma_conn_param));
>  	/* XXX tune these? */
>  	conn_param->responder_resources = 1;
>  	conn_param->initiator_depth = 1;
> -	conn_param->retry_count = 7;
> -	conn_param->rnr_retry_count = 7;
>  
> -	if (dp) {
> -		struct rds_ib_connection *ic = conn->c_transport_data;
> +	if (!ic->i_iwarp) {
> +		conn_param->retry_count = 7;
> +		conn_param->rnr_retry_count = 7;
> +	}
>  
> +	if (dp) {
>  		memset(dp, 0, sizeof(*dp));
>  		dp->dp_saddr = conn->c_laddr;
>  		dp->dp_daddr = conn->c_faddr;
> @@ -288,7 +291,7 @@ static int rds_ib_setup_qp(struct rds_co
>  	 */
>  	ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
>  	if (ret) {
> -		rdsdebug("ib_req_notify_cq failed: %d\n", ret);
> +		rdsdebug("rdma_create_qp failed: %d\n", ret);
>  		goto out;
>  	}
>  
> @@ -442,6 +445,12 @@ static int rds_ib_cm_handle_connect(stru
>  	ic->i_cm_id = cm_id;
>  	cm_id->context = conn;
>  
> +	rds_ibdev = ib_get_client_data(cm_id->device, &rds_ib_client);
> +
> +	/* Remember whether this is IB or iWARP */
> +	ic->i_iwarp = (cm_id->device->node_type == RDMA_NODE_RNIC);
> +	ic->i_fastreg = rds_ibdev->use_fastreg;
> +
>   	/* We got halfway through setting up the ib_connection, if we
>   	 * fail now, we have to take the long route out of this mess. */
>   	destroy = 0;
> @@ -462,7 +471,6 @@ static int rds_ib_cm_handle_connect(stru
>   	}
>  
>  	/* update ib_device with this local ipaddr */
> -	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
>  	ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
>  
>   	return 0;
> @@ -616,6 +624,17 @@ int rds_ib_conn_connect(struct rds_conne
>  	src.sin_addr.s_addr = (__force u32)conn->c_laddr;
>  	src.sin_port = (__force u16)htons(0);
>  
> +	/* First, bind to the local address and device. */
> +	ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
> +	if (ret) {
> +		rdsdebug("rdma_bind_addr(%u.%u.%u.%u) failed: %d\n",
> +				NIPQUAD(conn->c_laddr), ret);
> +		goto out;
> +	}
> +
> +	/* Now check the device type and set i_iwarp */
> +	ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
> +
>  	dest.sin_family = AF_INET;
>  	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
>  	dest.sin_port = (__force u16)htons(RDS_IB_PORT);
> @@ -662,8 +681,9 @@ void rds_ib_conn_shutdown(struct rds_con
>  				   " cm: %p err %d\n", ic->i_cm_id, err);
>  		}
>  
> -		/* Always move the QP to error state */
> -		if (ic->i_cm_id->qp) {
> +		/* For IB, we have to move the QP to error state.
> +		 * This is not needed for iWARP */
> +		if (ic->i_cm_id->qp && !ic->i_iwarp) {
>  			qp_attr.qp_state = IB_QPS_ERR;
>  			err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
>  			if (err) {
> Index: build-2.6/net/rds/ib_send.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_send.c
> +++ build-2.6/net/rds/ib_send.c
> @@ -165,6 +165,8 @@ void rds_ib_send_clear_ring(struct rds_i
>  			rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
>  		if (send->s_op)
>  			rds_ib_send_unmap_rdma(ic, send->s_op);
> +		if (send->s_fastreg)
> +			rds_ib_fastreg_release(send->s_fastreg);
>  	}
>  }
>  
> @@ -195,7 +197,7 @@ void rds_ib_send_cq_comp_handler(struct 
>  	while (ib_poll_cq(cq, 1, &wc) > 0 ) {
>  		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
>  			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
> -			 be32_to_cpu(wc.imm_data));
> +			 be32_to_cpu(wc.ex.imm_data));
>  		rds_ib_stats_inc(s_ib_tx_cq_event);
>  
>  		if (wc.wr_id == RDS_IB_ACK_WR_ID) {
> @@ -223,6 +225,16 @@ void rds_ib_send_cq_comp_handler(struct 
>  				/* Nothing to be done - the SG list will be unmapped
>  				 * when the SEND completes. */
>  				break;
> +			case IB_WR_LOCAL_INV:
> +				/* We invalidated an r_key. the caller may want to
> +				 * learn about this. */
> +				if (send->s_fastreg)
> +					rds_ib_local_inv_complete(send->s_fastreg, wc.status);
> +				break;
> +			case IB_WR_FAST_REG_MR:
> +				if (send->s_fastreg)
> +					rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
> +				break;
>  			default:
>  				if (printk_ratelimit())
>  					printk(KERN_NOTICE
> @@ -261,7 +273,7 @@ void rds_ib_send_cq_comp_handler(struct 
>  			 * queue_delay_work will not do anything if the work
>  			 * struct is already queued, so we need to cancel it first.
>  			 */
> -			cancel_delayed_work(&conn->c_send_w);
> +			cancel_delayed_work(&conn->c_send_w); /* FIXME barf */
>  			queue_delayed_work(rds_wq, &conn->c_send_w, 0);
>  		}
>  
> @@ -490,6 +502,21 @@ int rds_ib_xmit(struct rds_connection *c
>  	else
>  		i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
>  
> +	/* Fastreg support */
> +	if (rds_rdma_cookie_key(rm->m_rdma_cookie)
> +	 && ic->i_fastreg
> +	 && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
> +		ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
> +		if (ret)
> +			goto out;
> +
> +		/* We don't release the fastreg yet - we can only
> +		 * do that when it has completed. If the connection
> +		 * goes down, and we re-queue the message, we would
> +		 * have to retry the registration. */
> +		set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
> +	}
> +
>  	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
>  	if (work_alloc == 0) {
>  		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
> @@ -849,6 +876,108 @@ out:
>  	return ret;
>  }
>  
> +static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
> +				 struct rds_ib_fastreg *frr)
> +{
> +	struct rds_ib_connection *ic = conn->c_transport_data;
> +	struct rds_ib_send_work *send = NULL;
> +	struct rds_ib_send_work *first;
> +	struct ib_send_wr *failed_wr;
> +	u32 pos;
> +	u32 work_alloc = 0;
> +	int ret;
> +	int num_wrs;
> +
> +	/*
> +	 * Post two WRs for the fast registration and chain them together.
> +	 * The first WR invalidates the old rkey, and the second WR defines
> +	 * the new fast_reg_mr request.  Each individual page in the sg list
> +	 * is added to the fast reg page list and placed inside the
> +	 * fast_reg_mr WR.  The key used is a rolling 8-bit counter, which
> +	 * should guarantee uniqueness.
> +	 */
> +	num_wrs = 0;
> +	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
> +		num_wrs++;
> +	if (frr->f_page_list)
> +		num_wrs++;
> +	if (!num_wrs)
> +		return 0;
> +
> +	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
> +	if (work_alloc != num_wrs) {
> +		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
> +		rds_ib_stats_inc(s_ib_tx_ring_full);
> +		ret = -ENOMEM;
> +		goto out;
> +	}
> +
> +	first = send = &ic->i_sends[pos];
> +
> +	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
> +		memset(send, 0, sizeof(*send));
> +		send->s_wr.opcode = IB_WR_LOCAL_INV;
> +		send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
> +		send->s_fastreg = frr;
> +		send->s_queued = jiffies;
> +
> +		/* Get the next WR */
> +		pos = (pos + 1) % ic->i_send_ring.w_nr;
> +		send = &ic->i_sends[pos];
> +	}
> +
> +	if (frr->f_page_list) {
> +		memset(send, 0, sizeof(*send));
> +		send->s_wr.opcode = IB_WR_FAST_REG_MR;
> +		send->s_wr.wr.fast_reg.length = frr->f_length;
> +		send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
> +		send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
> +		send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
> +		send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
> +		send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
> +					IB_ACCESS_REMOTE_READ |
> +					IB_ACCESS_REMOTE_WRITE;
> +		send->s_fastreg = frr;
> +		send->s_queued = jiffies;
> +	}
> +
> +	atomic_add(num_wrs, &frr->f_refcnt);
> +
> +	/* Chain the two WRs together */
> +	if (num_wrs == 2)
> +		first->s_wr.next = &send->s_wr;
> +
> +	failed_wr = &first->s_wr;
> +	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
> +
> +	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
> +		 first, &first->s_wr, ret, failed_wr);
> +	BUG_ON(failed_wr != &first->s_wr);
> +	if (ret) {
> +		printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
> +		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
> +		while (num_wrs--)
> +			rds_ib_fastreg_release(frr);
> +		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
> +		return ret;
> +	}
> +
> +out:
> +	return ret;
> +}
> +
> +int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
> +{
> +	struct rds_ib_fastreg *frr;
> +
> +	frr = rds_ib_rdma_get_fastreg(mr);
> +	if (!frr)
> +		return 0;
> +	if (IS_ERR(frr))
> +		return PTR_ERR(frr);
> +	return __rds_ib_xmit_fastreg(conn, frr);
> +}
> +
>  void rds_ib_xmit_complete(struct rds_connection *conn)
>  {
>  	struct rds_ib_connection *ic = conn->c_transport_data;
> Index: build-2.6/net/rds/send.c
> ===================================================================
> --- build-2.6.orig/net/rds/send.c
> +++ build-2.6/net/rds/send.c
> @@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
>  	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
>  		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
>  		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
> +		/* If we were in the process of performing a fastreg
> +		 * memory registration when the connection went down,
> +		 * we have to retry it. */
> +		clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
>  	}
>  	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
>  	spin_unlock_irqrestore(&conn->c_lock, flags);
> @@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
>  		if (cmsg->cmsg_level != SOL_RDS)
>  			continue;
>  
> +		/* As a side effect, RDMA_DEST and RDMA_MAP will set
> +		 * rm->m_rdma_cookie and rm->m_rdma_mr.
> +		 */
>  		switch (cmsg->cmsg_type) {
>  		case RDS_CMSG_RDMA_ARGS:
>  			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
> Index: build-2.6/net/rds/message.c
> ===================================================================
> --- build-2.6.orig/net/rds/message.c
> +++ build-2.6/net/rds/message.c
> @@ -71,6 +71,8 @@ static void rds_message_purge(struct rds
>  
>  	if (rm->m_rdma_op)
>  		rds_rdma_free_op(rm->m_rdma_op);
> +	if (rm->m_rdma_mr)
> +		rds_mr_put(rm->m_rdma_mr);
>  }
>  
>  void rds_message_inc_purge(struct rds_incoming *inc)
> Index: build-2.6/net/rds/rdma.h
> ===================================================================
> --- build-2.6.orig/net/rds/rdma.h
> +++ build-2.6/net/rds/rdma.h
> @@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *r
>  void rds_rdma_free_op(struct rds_rdma_op *ro);
>  void rds_rdma_send_complete(struct rds_message *rm, int);
>  
> +extern void __rds_put_mr_final(struct rds_mr *mr);
> +static inline void rds_mr_put(struct rds_mr *mr)
> +{
> +	if (atomic_dec_and_test(&mr->r_refcount))
> +		__rds_put_mr_final(mr);
> +}
> +
>  #endif
> Index: build-2.6/net/rds/ib_recv.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_recv.c
> +++ build-2.6/net/rds/ib_recv.c
> @@ -796,7 +796,7 @@ void rds_ib_recv_cq_comp_handler(struct 
>  	while (ib_poll_cq(cq, 1, &wc) > 0) {
>  		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
>  			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
> -			 be32_to_cpu(wc.imm_data));
> +			 be32_to_cpu(wc.ex.imm_data));
>  		rds_ib_stats_inc(s_ib_rx_cq_event);
>  
>  		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
