[openib-general] [PATCH][RFC] nfsordma: initial port of nfsrdma to 2.6 and james'sss kdapl

Tom Duffy tduffy at sun.com
Mon Jun 27 15:25:40 PDT 2005


I have done some initial work to port nfsrdma to 2.6 and to James's
kDAPL.  This builds now inside the kernel.

You will need to follow the kDAPL directions first to put that in your
kernel tree, then slap this patch over top of that.

So you have 2.6.12 + svn drivers/infiniband + kdapl from james's tree +
this patch to get it to build.  Oh you will also need to patch the rpc
header to get it to build.

I think it is time to open up a tree in openib repository.  Tom, is
netapp willing to GPL this code?

Signed-off-by: Tom Duffy <tduffy at sun.com>

Index: drivers/infiniband/ulp/nfsrdma/rdma_kdapl.c
===================================================================
--- drivers/infiniband/ulp/nfsrdma/rdma_kdapl.c	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/rdma_kdapl.c	(revision 0)
@@ -0,0 +1,1353 @@
+/*
+ * Copyright (c) 2003, 2004, Network Appliance, Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *	Redistributions of source code must retain the above copyright
+ *	notice, this list of conditions and the following disclaimer.
+ *
+ *	Redistributions in binary form must reproduce the above
+ *	copyright notice, this list of conditions and the following
+ *	disclaimer in the documentation and/or other materials provided
+ *	with the distribution.
+ *
+ *	Neither the name of the Network Appliance, Inc. nor the names of
+ *	its contributors may be used to endorse or promote products
+ *	derived from this software without specific prior written
+ *	permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rdma_kdapl.c
+ *
+ * Implements an interface to kDAPL that is used by the RPC RDMA
+ * transport. Encapsulates the major functions managing:
+ *  o adapters
+ *  o endpoints
+ *  o connections
+ *  o buffer memory
+ *
+ * This file does not depend on any modified Linux kernel code.
+ * It depends on kDAPL, and some basic Linux facilities.
+ */
+
+#include <linux/slab.h>		/* kmalloc, kfree */
+#include <linux/interrupt.h>	/* tasklet */
+#include <linux/mm.h>		/* num_physpages */
+
+#include "rdma_kdapl.h"
+
+/*
+ * Globals/Macros
+ */
+
+#undef Dprintk
+#if RPCRDMA_DEBUG
+#define Dprintk(cond, x) if (rdma_kdapl_debug >= cond) printk x
+int rdma_kdapl_debug;
+#else
+#define Dprintk(n, x)
+#endif
+
+/*
+ * local types and constants
+ */
+
+/* handle replies in tasklet context, using a single, global list */
+static void rdma_run_tasklet(unsigned long data);
+DECLARE_TASKLET(rdma_tasklet_g, rdma_run_tasklet, 0UL);
+
+static spinlock_t rdma_tk_lock_g = SPIN_LOCK_UNLOCKED;
+static LIST_HEAD(rdma_tasklets_g);
+
+/*
+ * local function prototypes
+ */
+static void rdma_clean_evd(struct dat_evd *, char *);
+static void rdma_async_evd_upcall(void *, const struct dat_event *, boolean_t);
+static void rdma_event_evd_upcall(void *, const struct dat_event *, boolean_t);
+static void rdma_conn_evd_upcall(void *, const struct dat_event *, boolean_t);
+
+static inline void
+rdma_schedule_tasklet(rdma_rep_t *rep)
+{
+	unsigned int lock_flags;
+
+	spin_lock_irqsave(&rdma_tk_lock_g, lock_flags);
+	list_add_tail(&rep->rr_list, &rdma_tasklets_g);
+	spin_unlock_irqrestore(&rdma_tk_lock_g, lock_flags);
+	tasklet_schedule(&rdma_tasklet_g);
+}
+
+
+static inline u32
+rdma_wait_conn(rdma_ep_t *ep, rdma_ia_t *ia, unsigned long to)
+{
+	/* TBD handle timeout */
+	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected == 1);
+	/* TBD handle errors here? */
+	return DAT_SUCCESS;
+}
+
+static inline u32
+rdma_wait_disconn(rdma_ep_t *ep, rdma_ia_t *ia, unsigned long to)
+{
+	/* TBD handle timeout */
+	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 1);
+	/* TBD handle errors here? */
+	return DAT_SUCCESS;
+}
+
+#if RPCRDMA_DEBUG
+static const char *
+ststatus(unsigned int status)
+{
+	static const char * const ststrings[] = {
+		"success",
+		"flushed",
+		"local length error",
+		"local endpoint error",
+		"local protection error",
+		"bad response",
+		"remote access error",
+		"remote responder error",
+		"transport error",
+		"receiver not ready",
+		"partial packet",
+		"rmr operation error"
+
+	};
+	if (status <= 11)
+		return ststrings[status];
+	return "unknown";
+}
+#endif
+
+/*
+ * Exported functions.
+ */
+
+/*
+ * Open and initialize an Interface Adapter.
+ *  o initializes fields of rdma_ia_t, including
+ *    interface and provider attributes and protection zone.
+ */
+rdma_ia_t *
+rdma_ia_init(rdma_ia_t *ia, char *ia_name, int memreg)
+{
+	u32 datstatus;
+	struct dat_upcall_object upcall;
+	DAT_REGION_DESCRIPTION region;
+	DAT_RMR_CONTEXT *rmr_contextp;
+	enum dat_mem_priv_flags mem_priv;
+	u64 pgcount;
+
+	memset(ia, 0, sizeof *ia);
+	/* open IA */
+	datstatus = dat_ia_open(ia_name, 4,
+				&ia->ri_async_evd_handle,
+				&ia->ri_ia_handle);
+	if (datstatus != DAT_SUCCESS) {
+		Dprintk(0, ("rdma_ia_init: dat_ia_open failed on %s, status 0x%x\n",
+			ia_name, datstatus));
+		goto out;
+	}
+
+	/* j.i.c., re-vector the async evd. Not fatal. */
+	upcall.instance_data = ia;
+	upcall.upcall_func = rdma_async_evd_upcall;
+	datstatus = dat_evd_modify_upcall(ia->ri_async_evd_handle,
+					DAT_UPCALL_SINGLE_INSTANCE, &upcall);
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_ia_init: dat_evd_modify_upcall (async) failed: 0x%x\n",
+		datstatus));
+
+	/* get IA attributes */
+	datstatus = dat_ia_query(ia->ri_ia_handle,
+				&ia->ri_async_evd_handle,
+				&ia->ri_ia_attr,
+				&ia->ri_pv_attr);
+	if (datstatus != DAT_SUCCESS) {
+		Dprintk(0, ("rdma_ia_init: dat_ia_query failed: 0x%x\n",
+			datstatus));
+		goto out;
+	}
+
+	/* TBD provider or endpoint attributes to check? */
+
+	/* create protection zone for IA */
+	datstatus = dat_pz_create(ia->ri_ia_handle, &ia->ri_pz_handle);
+	if (datstatus != DAT_SUCCESS) {
+		Dprintk(0, ("rdma_ia_init: dat_pz_create failed: 0x%x\n",
+			datstatus));
+		goto out;
+	}
+
+	/*
+	 * Optionally register an underlying physical identity mapping in
+	 * order to do high performance dat_rmr_bind. This base registration
+	 * is protected from remote access - that is enabled only by binding
+	 * for the specific bytes targeted during each RPC operation, and
+	 * revoked after the corresponding completion similar to a storage
+	 * adapter.
+	 */
+	if (memreg > 1) {
+		/* round up pgcount by 1MB */
+		pgcount = ((u64) num_physpages + 255ULL) & ~255ULL,
+		region.for_va = (void *)0;
+		mem_priv = DAT_MEM_PRIV_LOCAL_READ_FLAG |
+				DAT_MEM_PRIV_LOCAL_WRITE_FLAG;
+		rmr_contextp = NULL;
+#if RPCRDMA_DEBUG
+		if (memreg == 4) {
+			mem_priv = DAT_MEM_PRIV_ALL_FLAG;
+			rmr_contextp = &ia->ri_bind_rmr;
+		}
+#endif
+		datstatus = dat_lmr_kcreate(ia->ri_ia_handle,
+					DAT_MEM_TYPE_PHYSICAL,
+					region,
+					pgcount,
+					ia->ri_pz_handle,
+					mem_priv,
+					DAT_MEM_OPTIMIZE_DONT_CARE,
+					&ia->ri_bind_mem,
+					&ia->ri_bind_iov.lmr_context,
+					rmr_contextp,
+					&ia->ri_bind_iov.segment_length,
+					&ia->ri_bind_iov.virtual_address);
+		if (datstatus != DAT_SUCCESS) {
+			printk("rdma_ia_init: dat_lmr_kcreate for "
+				"fast register failed with 0x%x\n\t"
+				"Will continue with degraded performance\n",
+				datstatus);
+			datstatus = DAT_SUCCESS;
+			memreg = 1;
+		} else {
+			Dprintk(1, ("rdma_ia_init: preregistered %llu physical"
+				" pages for rmr creation (0->0x%llx, %lluMB)\n",
+				ia->ri_bind_iov.segment_length >> PAGE_SHIFT,
+				ia->ri_bind_iov.segment_length - 1,
+				ia->ri_bind_iov.segment_length >> 20));
+		}
+	}
+
+	/* Else will do lmr_kcreate/lmr_free for each chunk */
+	ia->ri_memreg_strategy = memreg;
+out:
+	if (datstatus != DAT_SUCCESS) {
+		if (ia->ri_ia_handle != NULL) {
+			rdma_ia_close(ia);
+		}
+		ia = NULL;
+	}
+	return ia;
+}
+
+/*
+ * Clean up/close an IA.
+ *   o if event handles and PZ have been initialized, free them.
+ *   o close the IA
+ */
+void
+rdma_ia_close(rdma_ia_t *ia)
+{
+	u32 datstatus;
+
+	Dprintk(1, ("rdma_ia_close: entering\n"));
+	if (ia->ri_bind_mem != NULL) {
+		datstatus = dat_lmr_free(ia->ri_bind_mem);
+		Dprintk(0 && datstatus != DAT_SUCCESS,
+			("rdma_ia_close: dat_lmr_free(phys) returned 0x%x\n",
+			datstatus));
+	}
+	if (ia->ri_pz_handle != NULL) {
+		datstatus = dat_pz_free(ia->ri_pz_handle);
+		Dprintk(0 && datstatus != DAT_SUCCESS,
+			("rdma_ia_close: dat_pz_free returned 0x%x\n",
+			datstatus));
+	}
+	if (ia->ri_async_evd_handle != NULL) {
+		rdma_clean_evd(ia->ri_async_evd_handle, "async");
+		/* Do not destroy - provider owns this. */
+	}
+
+	if (ia->ri_ia_handle != NULL) {
+		datstatus = dat_ia_close(ia->ri_ia_handle, DAT_CLOSE_ABRUPT_FLAG);
+		Dprintk(0 && datstatus != DAT_SUCCESS,
+			("rdma_ia_close: dat_ia_close returned 0x%x\n",
+			datstatus));
+	}
+}
+
+/*
+ * Initialize default attributes for an endpoint, in preparation for create.
+ */
+void
+rdma_ep_default_attr(rdma_ep_t *ep, rdma_ia_t *ia,
+			struct rdma_create_data_internal *cdata)
+{
+	memset(&ep->rep_attr, 0, sizeof(ep->rep_attr));
+	ep->rep_attr.max_message_size = max(cdata->inline_rsize, cdata->inline_wsize);
+	ep->rep_attr.max_message_size += MAX_RPCHDR + MAX_RDMAHDR;
+	ep->rep_attr.max_rdma_size = max(cdata->rsize, cdata->wsize);
+	ep->rep_attr.max_recv_dtos = cdata->max_requests;
+	ep->rep_attr.max_request_dtos = cdata->max_requests;
+	if (ia->ri_bind_mem != NULL && ia->ri_memreg_strategy != 4) {
+		/* Add room for rmr_binds+unbinds - overkill! */
+		ep->rep_attr.max_request_dtos++;
+		ep->rep_attr.max_request_dtos *= (2 * RDMA_MAX_SEGS);
+	}
+	if (ep->rep_attr.max_request_dtos > ia->ri_ia_attr.max_dto_per_ep) {
+		ep->rep_attr.max_request_dtos = ia->ri_ia_attr.max_dto_per_ep;
+	}
+	ep->rep_attr.max_recv_iov = 1;
+	ep->rep_attr.max_request_iov = (cdata->padding ? 4 : 2);
+	ep->rep_attr.max_rdma_read_in = cdata->max_requests * (RDMA_MAX_SEGS/2);
+	if (ep->rep_attr.max_rdma_read_in > ia->ri_ia_attr.max_rdma_read_per_ep_in) {
+		ep->rep_attr.max_rdma_read_in = ia->ri_ia_attr.max_rdma_read_per_ep_in;
+	}
+	ep->rep_attr.max_rdma_read_out = 0;	/* always */
+	ep->rep_attr.recv_completion_flags = DAT_COMPLETION_DEFAULT_FLAG;
+	ep->rep_attr.request_completion_flags = DAT_COMPLETION_SUPPRESS_FLAG;
+
+	/* set trigger for requesting send completion */
+	ep->rep_cqinit = ep->rep_attr.max_request_dtos - 1;
+	if (ia->ri_bind_mem != NULL && ia->ri_memreg_strategy != 4)
+		ep->rep_cqinit -= RDMA_MAX_SEGS;
+	if (ep->rep_cqinit <= 2)
+		ep->rep_cqinit = 0;
+	INIT_CQCOUNT(ep);
+	spin_lock_init(&ep->rep_postlock);
+	ep->rep_ia = ia;
+	init_waitqueue_head(&ep->rep_connect_wait);
+}
+
+/*
+ * Create unconnected endpoint.
+ */
+rdma_ep_t *
+rdma_ep_create(rdma_ep_t *ep, rdma_ia_t *ia,
+				struct rdma_create_data_internal *cdata)
+{
+	struct dat_upcall_object upcall;
+	u32 datstatus;
+	struct dat_ep_param params;
+
+	/*
+	 * Create a single evd for receive dto and rmr_bind (only ever
+	 * care about unbind, really). Send completions are suppressed.
+	 * Use single threaded upcalls to maintain ordering. Operation
+	 * is parallel via tasklet.
+	 */
+	upcall.instance_data = NULL;
+	upcall.upcall_func = rdma_event_evd_upcall;
+	datstatus = dat_evd_kcreate(ia->ri_ia_handle,
+				ep->rep_attr.max_recv_dtos +
+					ep->rep_attr.max_request_dtos + 1,
+				DAT_UPCALL_SINGLE_INSTANCE,
+				&upcall,
+				DAT_EVD_DTO_FLAG|
+					DAT_EVD_RMR_BIND_FLAG,
+				&ep->rep_evd_handle);
+	if (datstatus != DAT_SUCCESS) {
+		Dprintk(0, ("rdma_ep_create: dat_evd_kcreate (event) failed: 0x%x\n",
+			datstatus));
+		return NULL;
+	}
+
+	/*
+	 * Create a second evd for connection events. This avoids any
+	 * performance issues with completion evd sharing.
+	 */
+	upcall.instance_data = ep;
+	upcall.upcall_func = rdma_conn_evd_upcall;
+	datstatus = dat_evd_kcreate(ia->ri_ia_handle,
+				4,
+				DAT_UPCALL_SINGLE_INSTANCE,
+				&upcall,
+				DAT_EVD_CONNECTION_FLAG,
+				&ep->rep_conn_handle);
+	if (datstatus != DAT_SUCCESS) {
+		Dprintk(0, ("rdma_ep_create: dat_evd_kcreate (conn) failed: 0x%x\n",
+			datstatus));
+		rdma_ep_destroy(ep, ia);
+		return NULL;
+	}
+
+	datstatus = dat_ep_create(ia->ri_ia_handle,
+				ia->ri_pz_handle,
+				ep->rep_evd_handle,	/* recv */
+				ep->rep_evd_handle,	/* rqst */
+				ep->rep_conn_handle,	/* conn */
+				&ep->rep_attr, &ep->rep_handle);
+
+	if (datstatus != DAT_SUCCESS) {
+		Dprintk(0, ("dat_ep_create failed with 0x%x\n", datstatus));
+		rdma_ep_destroy(ep, ia);
+		return NULL;
+	}
+	/* Get the actual ep attributes */
+	datstatus = dat_ep_query(ep->rep_handle, &params);
+	if (datstatus == DAT_SUCCESS) {
+		ep->rep_attr = params.ep_attr;
+		Dprintk(1, ("rdma_ep_create: max: msg %lld, rdma %lld; "
+			"dtos: send %d recv %d; iovs: send %d recv %d; "
+			"rdma reads: in %d out %d\n",
+			ep->rep_attr.max_message_size,
+			ep->rep_attr.max_rdma_size,
+			ep->rep_attr.max_request_dtos,
+			ep->rep_attr.max_recv_dtos,
+			ep->rep_attr.max_request_iov,
+			ep->rep_attr.max_recv_iov,
+			ep->rep_attr.max_rdma_read_in,
+			ep->rep_attr.max_rdma_read_out));
+
+		if (ep->rep_attr.max_recv_dtos < cdata->max_requests) {
+			Dprintk(0, ("rdma_ep_create: reducing "
+				"max_requests to %d to match available DTOs\n",
+				ep->rep_attr.max_recv_dtos));
+			cdata->max_requests = ep->rep_attr.max_recv_dtos;
+		}
+		/* TBD presume sizes, send DTOs, rdma reads etc ok */
+	} else {
+		Dprintk(0, ("dat_ep_query failed with 0x%x\n", datstatus));
+	}
+	return ep;
+}
+
+/*
+ * rdma_ep_destroy
+ *
+ * Disconnect and destroy endpoint. After this, the only
+ * valid operations on the ep are to free it (if dynamically
+ * allocated) or re-create it.
+ *
+ * Depending on the caller's error handling, the endpoint
+ * could leak if this function fails.
+ */
+int
+rdma_ep_destroy(rdma_ep_t *ep, rdma_ia_t *ia)
+{
+	u32 datstatus;
+
+	Dprintk(1, ("rdma_ep_destroy: entering, connected is %d\n",
+		ep->rep_connected));
+
+	if (ep->rep_handle) {
+		datstatus = rdma_ep_disconnect(ep, ia);
+		Dprintk(1 && datstatus != DAT_SUCCESS,
+			("rdma_ep_destroy: rdma_ep_disconnect returned 0x%x\n",
+			datstatus));
+		(void) dat_ep_reset(ep->rep_handle);
+	}
+
+	ep->rep_func = 0;
+
+	/* padding - should actually be done in rdma_buffer_destroy... */
+	if (ep->rep_padhandle) {
+		rdma_deregister_internal(ep->rep_padhandle);
+		ep->rep_padhandle = NULL;
+	}
+
+	if (ep->rep_handle) {
+		datstatus = dat_ep_free(ep->rep_handle);
+		Dprintk(0 && datstatus != DAT_SUCCESS,
+			("rdma_ep_destroy: dat_ep_free returned 0x%x\n",
+			datstatus));
+	}
+
+	if (ep->rep_conn_handle) {
+		rdma_clean_evd(ep->rep_conn_handle, "connection");
+		datstatus = dat_evd_free(ep->rep_conn_handle);
+		Dprintk(0 && datstatus != DAT_SUCCESS,
+			("rdma_ep_destroy: dat_evd_free (conn) returned 0x%x\n",
+			datstatus));
+	}
+
+	rdma_clean_evd(ep->rep_evd_handle, "completion");
+	datstatus = dat_evd_free(ep->rep_evd_handle);
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_ep_destroy: dat_evd_free returned 0x%x\n", datstatus));
+
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * Connect unconnected endpoint.
+ *
+ * TBD:
+ *   o check for unconnected state; fail if not
+ *   o pass in, or set DAT_CONN_QUAL
+ *   o don't use infinite timeout
+ *   o need better error granularity for various failure events?
+ */
+int
+rdma_ep_connect(rdma_ep_t *ep, rdma_ia_t *ia, int nowait)
+{
+	u32 datstatus;
+	int retry_count = 0;
+
+retry:
+	ep->rep_event_num = 0;
+	ep->rep_connected = -1;
+	datstatus = dat_ep_connect(ep->rep_handle,
+				(struct sockaddr *)&ep->rep_remote_addr,
+				ep->rep_server_port,
+				RDMA_CONNECT_TIMEOUT,
+				0, NULL,	/* no private data */
+				DAT_QOS_BEST_EFFORT,
+				DAT_CONNECT_DEFAULT_FLAG);
+	if (datstatus == DAT_SUCCESS) {
+		/* set during reconnect attempts */
+		if (nowait) {
+			return 0;
+		}
+
+		rdma_wait_conn(ep, ia, DAT_TIMEOUT_MAX);
+		/*
+		 * Check state. A non-peer reject indicates no listener
+		 * (ECONNREFUSED), which may be a transient state. All
+		 * others indicate a transport condition which has already
+		 * undergone a best-effort.
+		 */
+		if (ep->rep_event_num == DAT_CONNECTION_EVENT_NON_PEER_REJECTED
+		    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
+			goto retry;
+		}
+		if (ep->rep_event_num != DAT_CONNECTION_EVENT_ESTABLISHED) {
+			Dprintk(0, ("rdma_ep_connect: failed to connect, event 0x%x\n",
+				ep->rep_event_num));
+			datstatus = DAT_TIMEOUT_EXPIRED;
+		} else {
+			RDMA_ASSERT(ep->rep_connected == 1,
+				"rdma_ep_connect connection not indicated");
+			Dprintk(1, ("rdma_ep_connect: connected\n"));
+		}
+	} else {
+		Dprintk(0, ("dat_ep_connect failed with 0x%x\n", datstatus));
+	}
+	ep->rep_event_num = 0;
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * rdma_ep_disconnect
+ *
+ * This is separate from destroy to facilitate the ability
+ * to reconnect without recreating the endpoint.
+ *
+ * This call is not reentrant, and must not be made in parallel
+ * on the same endpoint.
+ */
+int
+rdma_ep_disconnect(rdma_ep_t *ep, rdma_ia_t *ia)
+{
+	u32 datstatus;
+
+	rdma_clean_evd(ep->rep_evd_handle, "completion");
+	ep->rep_event_num = 0;
+	datstatus = dat_ep_disconnect(ep->rep_handle, DAT_CLOSE_ABRUPT_FLAG);
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_ep_disconnect: dat_ep_disconnect status 0x%x\n",
+		datstatus));
+	if (datstatus == DAT_SUCCESS) {
+		/* returns without wait if not connected */
+		rdma_wait_disconn(ep, ia, DAT_TIMEOUT_MAX);
+		Dprintk(1, ("rdma_ep_disconnect: after wait, %sconnected\n",
+			(ep->rep_connected == 1) ? "still " : "dis"));
+	}
+	ep->rep_connected = 0;
+	/* ignore event number */
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * re-connect failed endpoint
+ *
+ * TBD: implement synchronization on this to avoid
+ *      multiple callers (RPC mechanism may be enough)
+ */
+int
+rdma_ep_reconnect(rdma_ep_t *ep, rdma_ia_t *ia)
+{
+	u32 datstatus;
+	(void) rdma_ep_disconnect(ep, ia);
+	datstatus = dat_ep_reset(ep->rep_handle);
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_ep_reconnect: dat_ep_reset failed 0x%x\n",
+		datstatus));
+	rdma_clean_evd(ep->rep_evd_handle, "completion");
+	datstatus = rdma_ep_connect(ep, ia, 1);
+	Dprintk(1, ("rdma_ep_connect: rdma_ep_connect status 0x%x\n",
+		datstatus));
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * Drain any EVD, prior to teardown.
+ */
+static void
+rdma_clean_evd(struct dat_evd *hdl, char *which)
+{
+	struct dat_event event;
+	int count = 0;
+
+	while (dat_evd_dequeue(hdl, &event) == DAT_SUCCESS) {
+		++count;
+	}
+	Dprintk(0 && count,
+		("rdma_clean_evd: flushed %d %s events (last 0x%x)\n",
+		 count, which, event.event_number));
+}
+
+/*
+ * Initialize buffer memory
+ */
+rdma_buffer_t *
+rdma_buffer_create(rdma_buffer_t *buf, rdma_ep_t *ep,
+			rdma_ia_t *ia, struct rdma_create_data_internal *cdata)
+{
+	u32 datstatus;
+	char *p;
+	int i, len;
+
+	buf->rb_max_requests = cdata->max_requests;
+	spin_lock_init(&buf->rb_lock);
+	atomic_set(&buf->rb_credits, 1);
+
+	/* Need to allocate:
+	 *   1.  arrays for send and recv pointers
+	 *   2.  arrays of rdma_req_t to fill in pointers
+	 *   3.  array of rdma_rep_t for replies
+	 *   4.  padding, if any
+	 *   5.  rmr's, if any
+	 * Send/recv buffers in req/rep need to be registered for LMR
+	 */
+
+	len = buf->rb_max_requests * (sizeof(rdma_req_t *) + sizeof(rdma_rep_t *));
+#if RPCRDMA_DEBUG
+	if (ia->ri_memreg_strategy != 4)
+#endif
+	if (ia->ri_bind_mem) {
+		len += (buf->rb_max_requests * RDMA_MAX_SEGS * sizeof (struct rdma_rmr_entry));
+	}
+	len += cdata->padding;
+
+	/* allocate 1, 4 and 5 in one shot */
+	p = kmalloc(len, GFP_KERNEL);
+	if (p == NULL) {
+		Dprintk(0, ("rdma_buffer_create: req_t/rep_t/pad kmalloc(%d) failed\n", len));
+		goto outfail;
+	}
+	memset(p, 0, len);
+	buf->rb_pool = p;	/* for freeing it later */
+
+	buf->rb_send_bufs = (rdma_req_t **) p;
+	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
+	buf->rb_recv_bufs = (rdma_rep_t **) p;
+	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
+
+	/*
+	 * Register the zeroed pad buffer, if any.
+	 */
+	if (cdata->padding && rdma_register_internal(ia, p, cdata->padding,
+					&ep->rep_padhandle, &ep->rep_pad)) {
+		goto outfail;
+	}
+	p += cdata->padding;
+
+	/*
+	 * Allocate the rmr's for fast chunk registration.
+	 * We "cycle" the rmr's in order to minimize rmr_context reuse,
+	 * and also reduce unbind-to-bind collision.
+	 */
+	INIT_LIST_HEAD(&buf->rb_rmrs);
+#if RPCRDMA_DEBUG
+	if (ia->ri_memreg_strategy != 4)
+#endif
+	if (ia->ri_bind_mem) {
+		struct rdma_rmr_entry *r = (struct rdma_rmr_entry *)p;
+		/* Allocate one extra request's worth, for full cycling */
+		for (i = (buf->rb_max_requests + 1) * RDMA_MAX_SEGS; i; i--) {
+			datstatus = dat_rmr_create(ia->ri_pz_handle,
+							&r->rmr_handle);
+			if (datstatus != DAT_SUCCESS) {
+				Dprintk(0, ("rdma_buffer_create: dat_rmr_create"
+						" failed with 0x%x\n",
+					datstatus));
+				goto outfail;
+			}
+			list_add(&r->rmr_freelist, &buf->rb_rmrs);
+			++r;
+		}
+	}
+
+	/*
+	 * Allocate/init the request/reply buffers. Doing this
+	 * using kmalloc for now -- one for each buf.
+	 */
+	for (i = 0; i < buf->rb_max_requests; i++) {
+		rdma_req_t *req;
+		rdma_rep_t *rep;
+		/* Allocate an extra RPCHDR to satisfy RPC allocate() call */
+		len = cdata->inline_wsize + (2 * MAX_RPCHDR);
+		req = kmalloc(len + sizeof (rdma_req_t), GFP_KERNEL);
+		if (req == NULL) {
+			Dprintk(0, ("rdma_buffer_create: request "
+					"buffer %d kmalloc failed\n", i));
+			goto outfail;
+		}
+		memset(req, 0, sizeof (rdma_req_t));
+		buf->rb_send_bufs[i] = req;
+		buf->rb_send_bufs[i]->rl_buffer = buf;
+
+		if (rdma_register_internal(ia, req->rl_base, len + MAX_RDMAHDR,
+					&buf->rb_send_bufs[i]->rl_handle,
+					&buf->rb_send_bufs[i]->rl_iov)) {
+			goto outfail;
+		}
+		/* don't include rdma header or extra RPC header in space */
+		buf->rb_send_bufs[i]->rl_size = len - MAX_RPCHDR;
+
+		len = cdata->inline_rsize + MAX_RPCHDR;
+		rep = kmalloc(len + sizeof (rdma_rep_t), GFP_KERNEL);
+		if (rep == NULL) {
+			Dprintk(0, ("rdma_buffer_create: reply "
+					"buffer %d kmalloc failed\n", i));
+			goto outfail;
+		}
+		memset(rep, 0, sizeof (rdma_rep_t));
+		buf->rb_recv_bufs[i] = rep;
+		buf->rb_recv_bufs[i]->rr_buffer = buf;
+		init_waitqueue_head(&rep->rr_unbind);
+
+		if (rdma_register_internal(ia, rep->rr_base, len + MAX_RDMAHDR,
+					&buf->rb_recv_bufs[i]->rr_handle,
+					&buf->rb_recv_bufs[i]->rr_iov)) {
+			goto outfail;
+		}
+	}
+	/* done */
+	return buf;
+
+      outfail:
+	if (buf) {
+		rdma_buffer_destroy(buf);
+	}
+	return NULL;
+}
+
+/*
+ * Destroy/cleanup buffer/lmr memory. Need to deal with
+ * partial initialization, so it's callable from failed create.
+ * Must be called before destroying endpoint, as registrations
+ * reference it.
+ *
+ * TBD: fully account for memory that may belong to the H/W.
+ * means working with the EVDs. There should be events
+ * returning memory after the connection was closed (which must
+ * be done BEFORE this call is made).
+ */
+void
+rdma_buffer_destroy(rdma_buffer_t *bufp)
+{
+	u32 datstatus;
+	int i;
+
+	/* clean up in reverse order from create
+	 *   1.  recv lmr memory (lmr free, then kfree)
+	 *   1a. bind rmr memory
+	 *   2.  send lmr memory (lmr free, then kfree)
+	 *   3.  padding (if any) [moved to rdma_ep_destroy]
+	 *   4.  arrays
+	 */
+	Dprintk(1, ("rdma_buffer_destroy: entering\n"));
+	/*  _rdma_dump_buffer(bufp); */
+
+	for (i = 0; i < bufp->rb_max_requests; i++) {
+		if (bufp->rb_recv_bufs && bufp->rb_recv_bufs[i]) {
+			rdma_deregister_internal(bufp->rb_recv_bufs[i]->rr_handle);
+			kfree(bufp->rb_recv_bufs[i]);
+		}
+		if (bufp->rb_send_bufs && bufp->rb_send_bufs[i]) {
+			while (!list_empty(&bufp->rb_rmrs)) {
+				struct rdma_rmr_entry *r;
+				r = list_entry(bufp->rb_rmrs.next,
+					struct rdma_rmr_entry, rmr_freelist);
+				list_del(&r->rmr_freelist);
+				datstatus = dat_rmr_free(r->rmr_handle);
+				Dprintk(0 && datstatus != DAT_SUCCESS,
+					("rdma_buffer_destroy: dat_rmr_free "
+						"failed with 0x%x\n",
+					datstatus));
+			}
+			rdma_deregister_internal(bufp->rb_send_bufs[i]->rl_handle);
+			kfree(bufp->rb_send_bufs[i]);
+		}
+	}
+
+	if (bufp->rb_pool) {
+		kfree(bufp->rb_pool);
+	}
+}
+
+/*
+ * Get a set of request/reply buffers.
+ *
+ * Reply buffer (if needed) is attached to send buffer upon return.
+ * Rule:
+ *    rb_send_index and rb_recv_index MUST always be pointing to the
+ *    *next* available buffer (non-NULL). They are incremented after
+ *    removing buffers, and decremented *before* returning them.
+ */
+rdma_req_t *
+rdma_buffer_get(rdma_buffer_t *buffers)
+{
+	rdma_req_t *req;
+	unsigned int lock_flags;
+
+	spin_lock_irqsave(&buffers->rb_lock, lock_flags);
+	RDMA_ASSERT(buffers->rb_send_index < buffers->rb_max_requests,
+			"rdma_buffer_get over max_requests");
+	req = buffers->rb_send_bufs[buffers->rb_send_index];
+	if (buffers->rb_send_index < buffers->rb_recv_index) {
+		Dprintk(1, ("rdma_buffer_get: %d extra receives outstanding (ok)\n",
+			buffers->rb_recv_index - buffers->rb_send_index));
+		req->rl_reply = NULL;
+	} else {
+		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+	}
+	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+	if (!list_empty(&buffers->rb_rmrs)) {
+		int i = RDMA_MAX_SEGS - 1;
+		do {
+			struct rdma_rmr_entry *r;
+			r = list_entry(buffers->rb_rmrs.next,
+					struct rdma_rmr_entry, rmr_freelist);
+			list_del(&r->rmr_freelist);
+			req->rl_seg_handles[i].rl_rmr = r;
+		} while (--i >= 0);
+	}
+	spin_unlock_irqrestore(&buffers->rb_lock, lock_flags);
+	return req;
+}
+
+/*
+ * Put request/reply buffers back into pool.
+ * Pre-decrement counter/array index.
+ */
+void
+rdma_buffer_put(rdma_req_t *req)
+{
+	rdma_buffer_t *buffers = req->rl_buffer;
+	unsigned int lock_flags;
+
+	RDMA_ASSERT(req->rl_nsegs == 0, "rdma_buffer_put with active RDMA");
+	spin_lock_irqsave(&buffers->rb_lock, lock_flags);
+	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
+	req->rl_niovs = req->rl_nsegs = 0;
+	if (req->rl_reply) {
+		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
+		init_waitqueue_head(&req->rl_reply->rr_unbind);
+		req->rl_reply->rr_func = NULL;
+		req->rl_reply = NULL;
+	}
+	if (req->rl_seg_handles[0].rl_rmr) {
+		/*
+		 * Cycle rmr's back in reverse order, and "spin" them.
+		 * This delays and scrambles reuse as much as possible.
+		 */
+		int i = 1;
+		do {
+			list_add_tail(&req->rl_seg_handles[i].rl_rmr->rmr_freelist,
+					&buffers->rb_rmrs);
+			req->rl_seg_handles[i].rl_rmr = NULL;
+		} while (++i < RDMA_MAX_SEGS);
+		list_add_tail(&req->rl_seg_handles[0].rl_rmr->rmr_freelist,
+					&buffers->rb_rmrs);
+		req->rl_seg_handles[0].rl_rmr = NULL;
+	}
+	spin_unlock_irqrestore(&buffers->rb_lock, lock_flags);
+}
+
+/*
+ * Recover reply buffers from pool.
+ * This happens when recovering from error conditions.
+ * Post-increment counter/array index.
+ */
+void
+rdma_recv_buffer_get(rdma_req_t *req)
+{
+	rdma_buffer_t *buffers = req->rl_buffer;
+	unsigned int lock_flags;
+
+	spin_lock_irqsave(&buffers->rb_lock, lock_flags);
+	if (buffers->rb_recv_index < buffers->rb_max_requests) {
+		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+	}
+	spin_unlock_irqrestore(&buffers->rb_lock, lock_flags);
+}
+
+/*
+ * Put reply buffers back into pool when not attached to
+ * request. This happens in error conditions, and when
+ * aborting unbinds. Pre-decrement counter/array index.
+ */
+void
+rdma_recv_buffer_put(rdma_rep_t *rep)
+{
+	rdma_buffer_t *buffers = rep->rr_buffer;
+	unsigned int lock_flags;
+
+	rep->rr_func = NULL;
+	spin_lock_irqsave(&buffers->rb_lock, lock_flags);
+	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
+	spin_unlock_irqrestore(&buffers->rb_lock, lock_flags);
+}
+
+/*
+ * Wrappers for internal-use kmalloc memory registration, used by buffer code.
+ */
+int
+rdma_register_internal(rdma_ia_t *ia, void *va, int len,
+		       struct dat_lmr **handlep,
+		       struct dat_lmr_triplet *triplet)
+{
+	u32 datstatus;
+	DAT_REGION_DESCRIPTION region;
+
+	if (ia->ri_bind_mem) {
+		*handlep = NULL;
+		triplet->lmr_context = ia->ri_bind_iov.lmr_context;
+		triplet->segment_length = len;
+		triplet->virtual_address = __pa(va);
+		return 0;
+	}
+
+	region.for_va = (void *)va;
+	datstatus = dat_lmr_kcreate(ia->ri_ia_handle,
+				DAT_MEM_TYPE_VIRTUAL,
+				region,
+				(u64) len,
+				ia->ri_pz_handle,
+				DAT_MEM_PRIV_LOCAL_READ_FLAG |
+					DAT_MEM_PRIV_LOCAL_WRITE_FLAG,
+				DAT_MEM_OPTIMIZE_DONT_CARE,
+				handlep,
+				&triplet->lmr_context,
+				NULL,	/* no need for rmr context */
+				&triplet->segment_length,
+				&triplet->virtual_address);
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_register_internal: dat_lmr_kcreate failed with 0x%x\n",
+			datstatus));
+	return (datstatus != DAT_SUCCESS);
+}
+
+int
+rdma_deregister_internal(void *handle)
+{
+	u32 datstatus;
+
+	if (handle == NULL) {
+		datstatus = DAT_SUCCESS;
+	} else {
+		datstatus = dat_lmr_free(handle);
+		Dprintk(0 && datstatus != DAT_SUCCESS,
+			("rdma_deregister_internal: dat_lmr_free failed with 0x%x\n",
+				datstatus));
+	}
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * Wrappers for chunk registration, shared by read/write chunk code.
+ */
+int
+rdma_register_external(rdma_mr_iov *seg,
+			rdma_mr_handle *handlep,
+			DAT_RMR_CONTEXT *rmr_contextp,
+			int writing, rdma_xprt_t *r_xprt)
+{
+	u32 datstatus;
+	rdma_ia_t *ia = r_xprt->rx_ia;
+	enum dat_mem_priv_flags mem_priv = (writing ?
+				DAT_MEM_PRIV_REMOTE_WRITE_FLAG :
+				DAT_MEM_PRIV_REMOTE_READ_FLAG);
+	unsigned int lock_flags;
+
+#if RPCRDMA_DEBUG
+	/* Persistent registration */
+	if (ia->ri_memreg_strategy == 4) {
+		*rmr_contextp = ia->ri_bind_rmr;
+		return 0;
+	}
+#endif
+
+	/* Fast registration using rmr_bind on physical lmr */
+	if (ia->ri_bind_mem != NULL) {
+		DAT_RMR_COOKIE cookie;
+		struct dat_lmr_triplet triplet;
+		cookie.as_ptr = 0;
+		triplet.virtual_address = seg->mr_base;
+		triplet.segment_length = seg->mr_len;
+		triplet.lmr_context = ia->ri_bind_iov.lmr_context;
+		spin_lock_irqsave(&r_xprt->rx_ep.rep_postlock, lock_flags);
+		DECR_CQCOUNT(&r_xprt->rx_ep);
+		datstatus = dat_rmr_bind(handlep->rl_rmr->rmr_handle,
+					 handlep->rl_lmr, &triplet,
+					 mem_priv, r_xprt->rx_ep.rep_handle,
+					 cookie, DAT_COMPLETION_SUPPRESS_FLAG,
+					 rmr_contextp);
+		spin_unlock_irqrestore(&r_xprt->rx_ep.rep_postlock, lock_flags);
+
+	/* Default registration using a new lmr_kcreate each time */
+	} else {
+		DAT_REGION_DESCRIPTION region;
+		DAT_LMR_CONTEXT lmr_context;
+		u64 vaddr;
+		u64 seglen;
+		region.for_va = (void *) (unsigned long) seg->mr_base;
+		datstatus = dat_lmr_kcreate(ia->ri_ia_handle,
+				DAT_MEM_TYPE_VIRTUAL, region, seg->mr_len,
+				ia->ri_pz_handle, mem_priv,
+				DAT_MEM_OPTIMIZE_DONT_CARE,
+				&handlep->rl_lmr, &lmr_context,
+				rmr_contextp, &seglen, &vaddr);
+	}
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_register_external failed dat_%s %llu at 0x%llx status 0x%x\n",
+		(ia->ri_bind_mem != NULL) ? "rmr_bind" : "lmr_kcreate",
+		seg->mr_len, seg->mr_base, datstatus));
+
+	return (datstatus != DAT_SUCCESS);
+}
+
+int
+rdma_deregister_external(rdma_mr_handle *handlep,
+				rdma_xprt_t *r_xprt, void *rep)
+{
+	u32 datstatus;
+	unsigned int lock_flags;
+
+#if RPCRDMA_DEBUG
+	/* Persistent registration */
+	if (r_xprt->rx_ia->ri_memreg_strategy == 4) {
+		if (rep) {
+			rdma_reply_func func = ((rdma_rep_t *) rep)->rr_func;
+			((rdma_rep_t *) rep)->rr_func = NULL;
+			func(rep);
+		}
+		return 0;
+	}
+#endif
+
+	if (r_xprt->rx_ia->ri_bind_mem != NULL) {
+		DAT_RMR_COOKIE cookie;
+		struct dat_lmr_triplet triplet;
+		cookie.as_ptr = rep;
+		triplet.lmr_context = 0;
+		triplet.segment_length = 0;
+		triplet.virtual_address = 0;
+		spin_lock_irqsave(&r_xprt->rx_ep.rep_postlock, lock_flags);
+		if (rep) {
+			INIT_CQCOUNT(&r_xprt->rx_ep);
+			datstatus = dat_rmr_bind(handlep->rl_rmr->rmr_handle,
+						 handlep->rl_lmr, &triplet,
+						 DAT_MEM_PRIV_NONE_FLAG,
+						 r_xprt->rx_ep.rep_handle,
+						 cookie,
+						 DAT_COMPLETION_DEFAULT_FLAG,
+						 NULL);
+			if (datstatus != DAT_SUCCESS)
+				((rdma_rep_t *) rep)->rr_func = NULL;
+		} else {
+			DECR_CQCOUNT(&r_xprt->rx_ep);
+			datstatus = dat_rmr_bind(handlep->rl_rmr->rmr_handle,
+						 handlep->rl_lmr,
+					&triplet, DAT_MEM_PRIV_NONE_FLAG,
+					r_xprt->rx_ep.rep_handle, cookie,
+					DAT_COMPLETION_SUPPRESS_FLAG, NULL);
+		}
+		spin_unlock_irqrestore(&r_xprt->rx_ep.rep_postlock, lock_flags);
+	} else {
+		datstatus = dat_lmr_free(handlep->rl_lmr);
+		handlep->rl_lmr = NULL;
+	}
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_deregister_external failed dat_%s, status 0x%x\n",
+		(r_xprt->rx_ia->ri_bind_mem != NULL) ?
+		"rmr_unbind" : "lmr_free", datstatus));
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * Prepost any receive buffer, then post send.
+ *
+ * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ */
+int
+rdma_ep_post(rdma_ep_t *ep, rdma_req_t *req)
+{
+	DAT_DTO_COOKIE rcookie, scookie;
+	u32 datstatus;
+	rdma_rep_t *rep = req->rl_reply;
+	unsigned int lock_flags;
+
+	/* recv cookie */
+	rcookie.as_ptr = rep;
+	/* no send cookie */
+	scookie.as_ptr = NULL;
+	spin_lock_irqsave(&ep->rep_postlock, lock_flags);
+	if (rep) {
+		DECR_CQCOUNT(ep);
+		datstatus = dat_ep_post_recv(ep->rep_handle,
+					1, &rep->rr_iov,
+					rcookie, DAT_COMPLETION_DEFAULT_FLAG);
+		if (datstatus != DAT_SUCCESS) {
+			goto out;
+		}
+		rep = req->rl_reply = NULL;
+	}
+
+	if (DECR_CQCOUNT(ep) > 0) {
+		datstatus = dat_ep_post_send(ep->rep_handle,
+					req->rl_niovs, req->rl_send_iov,
+					scookie, DAT_COMPLETION_SUPPRESS_FLAG);
+	} else {
+		/* Provider must take a send completion every now and then */
+		INIT_CQCOUNT(ep);
+		datstatus = dat_ep_post_send(ep->rep_handle,
+					req->rl_niovs, req->rl_send_iov,
+					scookie, DAT_COMPLETION_DEFAULT_FLAG);
+	}
+      out:
+	spin_unlock_irqrestore(&ep->rep_postlock, lock_flags);
+
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_ep_post: dat_ep_post_%s returned 0x%x\n",
+			rep ? "recv" : "send", datstatus));
+
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * Re-post a receive buffer. Only used in error cases.
+ */
+int
+rdma_ep_post_recv(rdma_ep_t *ep, rdma_rep_t *rep)
+{
+	DAT_DTO_COOKIE cookie;
+	u32 datstatus;
+	unsigned int lock_flags;
+
+	cookie.as_ptr = rep;
+	spin_lock_irqsave(&ep->rep_postlock, lock_flags);
+	DECR_CQCOUNT(ep);
+	datstatus = dat_ep_post_recv(ep->rep_handle, 1,
+			&rep->rr_iov, cookie, DAT_COMPLETION_DEFAULT_FLAG);
+	spin_unlock_irqrestore(&ep->rep_postlock, lock_flags);
+
+	Dprintk(0 && datstatus != DAT_SUCCESS,
+		("rdma_ep_post_recv: dat_ep_post_recv returned 0x%x\n",
+		datstatus));
+	return (datstatus != DAT_SUCCESS);
+}
+
+/*
+ * internal functions
+ */
+
+/*
+ * rdma_async_evd_upcall
+ *
+ * This upcall "handles" catastrophic errors.
+ */
+static void
+rdma_async_evd_upcall(void *instance_data,
+			const struct dat_event *event, boolean_t bool_arg)
+{
+	static const char * const asyncstrings[] = {
+		"evd overflow",
+		"catastrophic error",
+		"endpoint broken",
+		"timed out",
+		"provider internal error"
+	};
+	unsigned int evnum = event->event_number - DAT_ASYNC_ERROR_EVD_OVERFLOW;
+	printk("rdma_async_evd_upcall: nic %s %s (event 0x%x)\n",
+		((rdma_ia_t *) instance_data)->ri_ia_attr.adapter_name,
+		evnum < 5 ? asyncstrings[evnum] : "unknown error",
+		event->event_number);
+}
+
+/*
+ * rdma_event_evd_upcall
+ *
+ * This upcall handles DTO, (recv, send, bind and unbind) events.
+ * It is reentrant but has been specified using DAT_UPCALL_SINGLE_INSTANCE
+ * in order to maintain ordering of receives to keep server credits.
+ * It must also be prepared to be called from interrupt context,
+ * so it must not block or perform blocking calls.
+ *
+ * It is the responsibility of the scheduled tasklet to return
+ * recv buffers to the pool. NOTE: this affects synchronization of
+ * connection shutdown. That is, the structures required for
+ * the completion of the reply handler must remain intact until
+ * all memory has been reclaimed. There is some work here TBD.
+ *
+ * Note that send events are suppressed and do not result in an upcall.
+ */
+static void
+rdma_event_evd_upcall(void *instance_data,
+			const struct dat_event *event, boolean_t bool_arg)
+{
+	const struct dat_dto_completion_event_data *dto_data;
+	const struct dat_rmr_bind_completion_event_data *rmr_data;
+	rdma_rep_t *rep;	/* in cookie */
+
+	switch (event->event_number) {
+	case DAT_DTO_COMPLETION_EVENT:
+		dto_data = &event->event_data.dto_completion_event_data;
+		rep = (rdma_rep_t *) dto_data->user_cookie.as_ptr;
+		if (rep) {
+			Dprintk(2 || dto_data->status != DAT_DTO_SUCCESS,
+				("rdma_event_evd_upcall: receive DTO_COMPLETION, "
+				"status 0x%x (%s), len %lld, cookie 0x%p\n",
+				dto_data->status, ststatus(dto_data->status),
+				dto_data->transfered_length, rep));
+			if (dto_data->status == DAT_DTO_SUCCESS) {
+				rep->rr_len = dto_data->transfered_length;
+				/* Keep (only) the most recent credits, check
+				 * their validity later */
+				if (rep->rr_len >= sizeof(struct rdma_msg)) {
+					struct rdma_msg *p =
+					    (struct rdma_msg *) rep->rr_base;
+					atomic_set(&rep->rr_buffer->rb_credits,
+						ntohl(p->rdma_credit));
+				}
+			} else {
+				rep->rr_len = ~0U;
+			}
+			rdma_schedule_tasklet(rep);
+		} else {
+			Dprintk(2 || dto_data->status != DAT_DTO_SUCCESS,
+				("rdma_event_evd_upcall: send DTO_COMPLETION, "
+				"status 0x%x (%s), len %lld\n",
+				dto_data->status, ststatus(dto_data->status),
+				dto_data->transfered_length));
+		}
+		break;
+
+	case DAT_RMR_BIND_COMPLETION_EVENT:
+		rmr_data = &event->event_data.rmr_completion_event_data;
+		rep = (rdma_rep_t *) rmr_data->user_cookie.as_ptr;
+		Dprintk(2 || rmr_data->status != DAT_RMR_BIND_SUCCESS,
+			("rdma_event_evd_upcall: %sBIND_COMPLETION, "
+			 "status 0x%x (%s), cookie 0x%p\n",
+			rep ? "UN" : "", rmr_data->status,
+			ststatus(rmr_data->status), rep));
+		if (rep) {
+			rdma_schedule_tasklet(rep);
+		}
+		break;
+
+	default:
+		Dprintk(0, ("rdma_event_evd_upcall: unexpected event 0x%x\n",
+			event->event_number));
+		break;
+	}
+}
+
+/*
+ * Connection event handler.
+ */
+static void
+rdma_conn_evd_upcall(void *instance_data,
+			const struct dat_event *event, boolean_t bool_arg)
+{
+	rdma_ep_t *ep = instance_data;
+	int connstate = 0;
+
+	switch (event->event_number) {
+	case DAT_CONNECTION_EVENT_ESTABLISHED:
+		connstate = 1;
+		/* fall through */
+	case DAT_CONNECTION_EVENT_PEER_REJECTED:
+	case DAT_CONNECTION_EVENT_NON_PEER_REJECTED:
+	case DAT_CONNECTION_EVENT_ACCEPT_COMPLETION_ERROR:
+	case DAT_CONNECTION_EVENT_DISCONNECTED:
+	case DAT_CONNECTION_EVENT_BROKEN:
+	case DAT_CONNECTION_EVENT_TIMED_OUT:
+	case DAT_CONNECTION_EVENT_UNREACHABLE:
+		Dprintk(1, ("rdma_conn_evd_upcall: CONNECTION event 0x%x, ep 0x%p\n",
+			event->event_number, ep));
+		if (ep->rep_connected != connstate) {
+			/* notify waiter, pass changed state */
+			ep->rep_event_num = event->event_number;
+			ep->rep_connected = connstate;
+			Dprintk(1, ("rdma_conn_evd_upcall: %sconnected\n",
+					connstate ? "" : "dis"));
+			ep->rep_func(ep);
+			wake_up_all(&ep->rep_connect_wait);
+		}
+		break;
+
+	default:
+		Dprintk(0, ("rdma_conn_evd_upcall: unexpected event 0x%x\n",
+			event->event_number));
+		break;
+	}
+}
+
+/*
+ * rdma tasklet function -- just turn around and call the func
+ * for all replies on the list
+ */
+static void
+rdma_run_tasklet(unsigned long data)
+{
+	rdma_rep_t *rep;
+	rdma_reply_func func;
+	unsigned int lock_flags;
+
+	data = data;
+	Dprintk(3, ("rdma_run_tasklet: entering\n"));
+	spin_lock_irqsave(&rdma_tk_lock_g, lock_flags);
+	while (!list_empty(&rdma_tasklets_g)) {
+		rep = list_entry(rdma_tasklets_g.next, rdma_rep_t, rr_list);
+		Dprintk(3, ("rdma_run_tasklet: found entry 0x%p\n", rep));
+		list_del(&rep->rr_list);
+		func = rep->rr_func;
+		rep->rr_func = NULL;
+		spin_unlock_irqrestore(&rdma_tk_lock_g, lock_flags);
+
+		if (func) {
+			func(rep);
+		} else {
+			Dprintk(1, ("rdma_run_tasklet: orphaned reply 0x%p\n", rep));
+			rdma_recv_buffer_put(rep);
+		}
+
+		spin_lock_irqsave(&rdma_tk_lock_g, lock_flags);
+	}
+	spin_unlock_irqrestore(&rdma_tk_lock_g, lock_flags);
+	Dprintk(3, ("rdma_run_tasklet: leaving\n"));
+}
Index: drivers/infiniband/ulp/nfsrdma/rdma_transport.c
===================================================================
--- drivers/infiniband/ulp/nfsrdma/rdma_transport.c	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/rdma_transport.c	(revision 0)
@@ -0,0 +1,950 @@
+/*
+ * Copyright (c) 2003, 2004, Network Appliance, Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *	Redistributions of source code must retain the above copyright
+ *	notice, this list of conditions and the following disclaimer.
+ *
+ *	Redistributions in binary form must reproduce the above
+ *	copyright notice, this list of conditions and the following
+ *	disclaimer in the documentation and/or other materials provided
+ *	with the distribution.
+ *
+ *	Neither the name of the Network Appliance, Inc. nor the names of
+ *	its contributors may be used to endorse or promote products
+ *	derived from this software without specific prior written
+ *	permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rdma_transport.c
+ *
+ * This file contains the top-level implementation of an RPC RDMA
+ * transport, implemented using the new transport switch.
+ * Major functions, which are required by the switch include:
+ *   xprt_rdma_create -- create an instance (e.g. during mount)
+ *   xprt_rdma_destroy -- destroy transport instance
+ *   xprt_rdma_close
+ *   xprt_rdma_allocate -- allocate message buffer(s)
+ *   xprt_rdma_sendmsg -- actually do the send. In the case of RDMA,
+ *         the recv is posted first, and completed async.
+ *   xprt_rdma_reconnect
+ *   xprt_rdma_free -- release message buffer(s)
+ *
+ * Naming convention: functions beginning with xprt_ are part of the
+ * transport switch. All others are RPC RDMA internal.
+ */
+
+/*
+ * See "TBD" for to-be-dones
+ */
+
+#include <linux/module.h>
+#include <linux/version.h>
+#include <linux/sunrpc/clnt.h>
+
+#include "rdma_kdapl.h"
+
+/* prototypes and plumbing for switch */
+
+static int xprt_rdma_create(struct rpc_xprt *, struct xprt_create_data *);
+static int xprt_rdma_destroy(struct rpc_xprt *);
+static void xprt_rdma_close(struct rpc_xprt *);
+static void *xprt_rdma_allocate(struct rpc_xprt *, struct rpc_task *,
+				unsigned int);
+static int xprt_rdma_sendmsg(struct rpc_xprt *, struct rpc_rqst *);
+static void xprt_rdma_reconnect(struct rpc_task *);
+static void xprt_rdma_free(struct rpc_xprt *, struct rpc_task *, void *);
+
+/* Transport procedures for rdma */
+static struct rpc_transport xprt_rdma = {
+	"RDMA",
+	RPC_XPRT_RDMA,
+	{
+		xprt_rdma_allocate,
+		xprt_rdma_sendmsg,
+		xprt_rdma_free,
+		xprt_rdma_reconnect,
+		xprt_rdma_create,
+		xprt_rdma_destroy,
+		xprt_rdma_close
+	}
+};
+
+/*
+ * local functions
+ */
+
+/* register rdma transport upon module load */
+static int rdma_register_xprt(void);
+static int rdma_remove_xprt(void);
+
+/* Callbacks */
+/* perform rpc call completion, as tasklet */
+static void rdma_reply_handler(rdma_rep_t *);
+/* signal rdma unbind completion */
+static void rdma_unbind_func(rdma_rep_t *);
+/* called for connection-relevant events */
+static void rdma_conn_func(struct rdma_ep *);
+
+/* do the work of xprt creation */
+static rdma_xprt_t *rdma_create_xprt(char *,
+					struct rdma_create_data_internal *,
+					struct rpc_xprt *);
+
+/* RPC/RDMA parameters */
+static char *Adapter = "null";	/* TBD this could be a list */
+static int MaxRequests = 100;	/* RPC_MAXREQS;	Linux default */
+static int MaxInlineRead = 1024;	/* 0; */
+static int MaxInlineWrite = 1024;	/* 0; */
+static int Padding = 512;	/* set to server's preference if any */
+static int Stream = 1;		/* TCP(1)/UDP(0) emulation, testing */
+
+#undef Dprintk
+#ifdef RPCRDMA_DEBUG
+static int Memreg = 4;		/* memreg strategy higher=faster */
+#define Dprintk(n, x) if (Debug >= n) printk x
+int Debug = 0;
+static int RKDebug = 0;
+extern int rdma_kdapl_debug;
+#else
+static int Memreg = 3;		/* memreg strategy higher=faster */
+#define Dprintk(n, x)
+#endif
+
+static rdma_ia_t default_ia;	/* The default adapter */
+static int registered = 0;
+
+MODULE_LICENSE("BSD");
+MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
+MODULE_AUTHOR("Network Appliance, Inc.");
+
+MODULE_PARM(Adapter, "1-32s");
+MODULE_PARM_DESC(Adapter, "Default RDMA adapter name");
+
+MODULE_PARM(MaxRequests, "1-256i");
+MODULE_PARM_DESC(MaxRequests, "Credits requested of server");
+
+MODULE_PARM(MaxInlineRead, "0-65536i");
+MODULE_PARM_DESC(MaxInlineRead, "Maximum inline (non-RDMA) read size");
+
+MODULE_PARM(MaxInlineWrite, "0-65536i");
+MODULE_PARM_DESC(MaxInlineWrite, "Maximum inline (non-RDMA) write size");
+
+MODULE_PARM(Padding, "0-8192i");
+MODULE_PARM_DESC(Padding, "Inline write padding");
+
+#if !RPCRDMA_DEBUG
+
+MODULE_PARM(Memreg, "0-3i");
+MODULE_PARM_DESC(Memreg, "Memreg none 0, sync/lmr 1, sync/rmr 2, async/rmr 3");
+
+#else
+
+MODULE_PARM(Memreg, "0-4i");
+MODULE_PARM_DESC(Memreg, "Memreg none 0, sync/lmr 1, sync/rmr 2, async/rmr 3, perm 4");
+
+MODULE_PARM(Stream, "0-1i");
+MODULE_PARM_DESC(Stream, "Emulation UDP 0, TCP 1, testing");
+
+MODULE_PARM(Debug, "0-1i");
+MODULE_PARM_DESC(Debug, "NFS/RDMA debug");
+
+MODULE_PARM(RKDebug, "0-3i");
+MODULE_PARM_DESC(RKDebug, "RPC/kDAPL debug");
+#endif
+
+/***********************************************************************
+ * init_module
+ *
+ * Entry point for a Linux module, performs simple initialization
+ ***********************************************************************/
+
+static int __init rdma_init(void)
+{
+#if RPCRDMA_DEBUG
+	rdma_kdapl_debug = RKDebug;
+#endif
+
+	Dprintk(0, ("RPCRDMA Module Init, register RPC RDMA transport\n"));
+
+	Dprintk(0, ("Defaults:\n"));
+	Dprintk(0, ("\tAdapter %s\n\tMaxRequests %d\n",
+		Adapter, MaxRequests));
+	Dprintk(0, ("\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
+		MaxInlineRead, MaxInlineWrite));
+	Dprintk(0, ("\tPadding %d\n\tMemreg %d\n\tStream %d\n", Padding,
+		Memreg, Stream));
+	Dprintk(0, ("\tDebug %d\n\tRKDebug %d\n", Debug, RKDebug));
+
+	/* Open the default adapter */
+	if (!rdma_ia_init(&default_ia, Adapter, Memreg)) {
+		Dprintk(0, ("RPCRDMA module: can't open default adapter \"%s\"\n",
+			Adapter));
+		return -ENOENT;
+	}
+
+	if (rdma_register_xprt()) {
+		rdma_ia_close(&default_ia);
+		return -EEXIST;
+	}
+
+	return 0;
+}
+
+/***********************************************************************
+ * cleanup_module
+ *
+ * Entry point for a Linux module, cleans up the module on exit
+ ***********************************************************************/
+static void __exit rdma_exit(void)
+{
+	rdma_ia_close(&default_ia);
+
+	Dprintk(0, ("RPCRDMA Module Removed, deregister RPC RDMA transport\n"));
+	rdma_remove_xprt();
+}
+
+module_init(rdma_init);
+module_exit(rdma_exit);
+
+/*
+ * Routines to register/remove the transport. To be called when
+ * module is loaded/unloaded
+ */
+static int
+rdma_register_xprt()
+{
+	if (xprt_register(&xprt_rdma))
+		return -EEXIST;
+	registered++;
+	return 0;
+}
+
+static int
+rdma_remove_xprt()
+{
+	if (xprt_unregister(&xprt_rdma))
+		return -ENOENT;
+	registered--;
+	return 0;
+}
+
+static int
+xprt_rdma_create(struct rpc_xprt *xprt, struct xprt_create_data *data)
+{
+	rdma_xprt_t *r_xprt;
+	struct rdma_create_data *datap = &data->u.rdma_data;
+	struct rdma_create_data_internal cdata;
+
+	/* fill in some generic fields to keep RPC happy */
+	xprt->stream = (Stream != 0);
+	xprt->addr.sin_port = 1;	/* avoid portmap calls */
+	xprt->prot = (xprt->stream ? IPPROTO_TCP : IPPROTO_UDP);
+
+	xprt->nocong = 1;		/* we do this ourselves */
+	xprt->cwnd = RPC_CWNDSCALE;	/* Allow one send at first */
+
+#if 0
+	if (datap->timeo) {
+		xprt->timeout = *datap->timeo;
+		xprt->timeout.to_current = datap->timeo->to_initval;
+		xprt->timeout.to_resrvval = datap->timeo->to_maxval << 1;
+	} else
+#endif
+		xprt_set_timeout(&xprt->timeout, 1, 30 * HZ);
+
+	/* Set server address(es) */
+	xprt->addr = datap->srvaddr;	/* IP address */
+	cdata.addr = datap->addr;	/* RDMA address, possibly different */
+
+	/* [See temporary trick in sunrpc.c xprt_create_proto()] */
+	if (cdata.addr.sa_family == 0) {
+		cdata.addr = *(struct sockaddr *)&xprt->addr;
+		memcpy(&cdata.addr.sa_data[2], &xprt->addr.sin_zero[2], 4);
+		/* zero out the 9999 trick, lest clients like NLM come back
+		 * with it. We only support the initial mount doing this. */
+		memset(xprt->addr.sin_zero, 0, sizeof xprt->addr.sin_zero);
+		memset(datap->srvaddr.sin_zero, 0, sizeof xprt->addr.sin_zero);
+	}
+	/* [end trick] */
+
+	if (datap->port)
+		cdata.port = datap->port;
+	else
+		cdata.port = 2049ULL;
+
+	/* Set max requests */
+	if ((cdata.max_requests = datap->max_requests) <= 0) {
+		cdata.max_requests = MaxRequests;
+	}
+
+	/* Set some length limits */
+	if ((cdata.rsize = datap->rsize) <= 0)
+		cdata.rsize = 32768;		/* NFS3_MAXDATA */
+	if ((cdata.wsize = datap->wsize) <= 0)
+		cdata.wsize = 32768;		/* NFS3_MAXDATA */
+
+	if ((cdata.inline_wsize = datap->max_inline_send) <= 0) {
+		cdata.inline_wsize = MaxInlineWrite;
+	}
+	if (cdata.inline_wsize > cdata.wsize) {
+		cdata.inline_wsize = cdata.wsize;
+	}
+
+	if ((cdata.inline_rsize = datap->max_inline_recv) <= 0) {
+		cdata.inline_rsize = MaxInlineRead;
+	}
+	if (cdata.inline_rsize > cdata.rsize) {
+		cdata.inline_rsize = cdata.rsize;
+	}
+
+	if ((cdata.padding = datap->padding) <= 0) {
+		cdata.padding = Padding;
+	}
+
+	/*
+	 * Create and connect new transport.
+	 * TBD provide adapter choice?
+	 */
+	r_xprt = rdma_create_xprt(NULL, &cdata, xprt);
+
+	if (r_xprt == NULL) {
+		return -1;
+	}
+
+	/* rpciod handles stream reconnect */
+	if (xprt->stream)
+		rpciod_up();
+
+	/* Overload unused "sock" as our private pointer */
+	xprt->sock = (struct socket *)r_xprt;
+	xprt->inet = (struct sock *)~0;
+	return 0;
+}
+
+/*
+ * xprt_rdma_destroy
+ *
+ * Destroy the xprt.
+ * Free all memory associated with the object, including its own.
+ * NOTE: none of the *destroy methods free memory for their top-level
+ * objects, even though they may have allocated it (they do free
+ * private memory). It's up to the caller to handle it. In this 
+ * case (RDMA transport), all structure memory is inlined with the
+ * rdma_xprt_t.
+ */
+static int
+xprt_rdma_destroy(struct rpc_xprt *xprt)
+{
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(xprt);
+
+	Dprintk(1, ("xprt_rdma_destroy called\n"));
+
+	xprt_clear_connected(xprt);
+
+	if (r_xprt) {
+		rdma_buffer_destroy(&r_xprt->rx_buf);
+		(void) rdma_ep_destroy(&r_xprt->rx_ep, r_xprt->rx_ia);
+	}
+
+	if (r_xprt->rx_ia != &default_ia) {
+		rdma_ia_close(r_xprt->rx_ia);
+		kfree(r_xprt->rx_ia);
+	}
+
+	kfree(r_xprt);
+	if (xprt->stream)
+		rpciod_down();
+	Dprintk(1, ("xprt_rdma_destroy returning\n"));
+	return 0;
+}
+
+/*
+ * rdma_create_xprt
+ *
+ * Create a transport instance, which includes initialized
+ *  o ia
+ *  o endpoint
+ *  o buffers
+ * Returns a pointer to the new structure.
+ */
+static rdma_xprt_t *
+rdma_create_xprt(char *ia_name,
+		struct rdma_create_data_internal *cdata, struct rpc_xprt *xprt)
+{
+	rdma_xprt_t *new_xprt;
+	rdma_ep_t *new_ep = NULL;
+	rdma_ia_t *new_ia = NULL;
+	rdma_buffer_t *new_buf = NULL;
+	int extra = 0;
+	struct rpc_rqst *rqst;
+
+	list_for_each_entry(rqst, &xprt->free, rq_list)
+		extra++;
+	if (cdata->max_requests > extra) {
+		/* More than provided by RPC: carve out some more slots */
+		extra = (cdata->max_requests - extra) * sizeof(struct rpc_rqst);
+	} else {
+		extra = 0;
+	}
+
+	new_xprt = kmalloc(sizeof(rdma_xprt_t) + extra, GFP_KERNEL);
+	if (!new_xprt) {
+		goto outfail;
+	}
+	memset(new_xprt, 0, sizeof(rdma_xprt_t) + extra);
+
+	/* Add any extra request slot entries to table */
+	rqst = (struct rpc_rqst *) (new_xprt + 1);
+	while ((extra -= sizeof(struct rpc_rqst)) >= 0) {
+		list_add(&rqst->rq_list, &xprt->free);
+		rqst++;
+	}
+
+	if (ia_name == NULL) {
+		new_ia = &default_ia;
+		ia_name = Adapter;
+	} else {
+		new_ia = kmalloc(sizeof(rdma_ia_t), GFP_KERNEL);
+		if (!new_ia) {
+			goto outfail;
+		}
+		if (!rdma_ia_init(new_ia, ia_name, Memreg)) {
+			goto outfail;
+		}
+	}
+	new_xprt->rx_ia = new_ia;
+
+	/*
+	 * initialize and create ep
+	 */
+	new_ep = &new_xprt->rx_ep;
+	new_xprt->rx_data = *cdata;
+	new_ep->rep_remote_addr = cdata->addr;
+	new_ep->rep_server_port = cdata->port; 	/* (64-bit!) */
+	rdma_ep_default_attr(new_ep, new_ia, &new_xprt->rx_data);
+	Dprintk(0, ("rdma_create_xprt: %s %d.%d.%d.%d:%lld\n",
+		ia_name,
+		(unsigned char) new_ep->rep_remote_addr.sa_data[2],
+		(unsigned char) new_ep->rep_remote_addr.sa_data[3],
+		(unsigned char) new_ep->rep_remote_addr.sa_data[4],
+		(unsigned char) new_ep->rep_remote_addr.sa_data[5],
+		new_ep->rep_server_port));
+
+	new_ep = rdma_ep_create(&new_xprt->rx_ep, new_ia, &new_xprt->rx_data);
+	if (!new_ep) {
+		goto outfail;
+	}
+
+	/*
+	 * Allocate pre-registered send and receive buffers for headers and
+	 * any inline data. Also specify any padding which will be provided
+	 * from a preregistered zero buffer.
+	 */
+	new_buf = rdma_buffer_create(&new_xprt->rx_buf, new_ep,
+				     new_ia, &new_xprt->rx_data);
+	if (!new_buf) {
+		goto outfail;
+	}
+
+	/*
+	 * Register a callback for connection events. This is necessary because
+	 * connection loss notification is async. We also catch connection loss
+	 * when reaping receives.
+	 */
+	new_ep->rep_func = rdma_conn_func;
+	new_ep->rep_xprt = xprt;
+
+	if (rdma_ep_connect(new_ep, new_ia, 0)) {
+		goto outfail;
+	}
+	return new_xprt;
+
+      outfail:
+	/* clean up any work done */
+	if (new_buf) {
+		rdma_buffer_destroy(new_buf);
+	}
+	if (new_ep) {
+		rdma_ep_destroy(new_ep, new_ia);
+	}
+	if (new_ia && new_ia != &default_ia) {
+		rdma_ia_close(new_ia);
+		kfree(new_ia);
+	}
+	if (new_xprt) {
+		kfree(new_xprt);
+	}
+	return NULL;
+}
+
+/*
+ * Close a connection, during shutdown or timeout/reconnect
+ */
+static void
+xprt_rdma_close(struct rpc_xprt *xprt)
+{
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(xprt);
+
+	Dprintk(1, ("xprt_rdma_close: closing"));
+	xprt_clear_connected(xprt);
+	rdma_ep_disconnect(&r_xprt->rx_ep, r_xprt->rx_ia);
+}
+
+/*
+ * The RDMA allocate/free functions need the task structure as a place
+ * to hide the rdma_req_t, which is necessary for the actual send/recv
+ * sequence. For this reason, the recv buffers are attached to send
+ * buffers for portions of the RPC. Note that the RPC layer allocates
+ * both send and receive buffers in the same call. We may register
+ * the receive buffer portion when using reply chunks.
+ */
+static void *
+xprt_rdma_allocate(struct rpc_xprt *xprt,
+			struct rpc_task *task, unsigned int size)
+{
+	rdma_req_t *req, *nreq;
+
+	req = rdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+	Dprintk(1, ("xprt_rdma_allocate: size %d, request 0x%p\n", size, req));
+
+	if (size > req->rl_size + MAX_RPCHDR) {
+		Dprintk(1, ("xprt_rdma_allocate: size %d too large for %d: "
+			"prog %d vers %d proc %d\n",
+			size, req->rl_size,
+			task->tk_client->cl_prog, task->tk_client->cl_vers,
+			task->tk_msg.rpc_proc->p_proc));
+		/*
+		 * Outgoing length shortage. Our inline write max must have
+		 * been configured to perform direct i/o.
+		 *
+		 * This is therefore a large metadata operation, and the
+		 * allocate call was made on the maximum possible message,
+		 * e.g. containing long filename(s) or symlink data. In
+		 * fact, while these metadata operations *might* carry
+		 * large outgoing payloads, they rarely *do*. However, we
+		 * have to commit to the request here, so reallocate and
+		 * register it now. The data path will never require this
+		 * reallocation.
+		 * 
+		 * If the allocation or registration fails, the RPC framework
+		 * will (doggedly) retry.
+		 */
+		if (rpcx_to_rdmax(xprt)->rx_ia->ri_memreg_strategy == 0) {
+			/* forced to "pure inline" */
+			Dprintk(0, ("xprt_rdma_allocate: too much data "
+					"(%d) for inline\n", size));
+			return NULL;
+		}
+		nreq = kmalloc(sizeof *req + size, GFP_KERNEL);
+		if (nreq == NULL) {
+			return NULL;
+		}
+		if (rdma_register_internal(rpcx_to_rdmax(xprt)->rx_ia,
+					nreq->rl_base, size + MAX_RDMAHDR,
+					&nreq->rl_handle, &nreq->rl_iov)) {
+			kfree(nreq);
+			return NULL;
+		}
+/* TBD increment a "hard way" statistic (+= size) */
+		nreq->rl_size = size;
+		nreq->rl_niovs = 0;
+		nreq->rl_nsegs = 0;
+		nreq->rl_buffer = (struct rdma_buffer *)req;
+		nreq->rl_reply = req->rl_reply;
+		memcpy(nreq->rl_seg_handles, req->rl_seg_handles,
+						sizeof nreq->rl_seg_handles);
+		/* flag the swap with an unused field */
+		nreq->rl_iov.segment_length = 0;
+		req->rl_reply = NULL;
+		req = nreq;
+	}
+	return req + 1;
+}
+
+/*
+ * This function returns all RDMA resources to the pool.
+ */
+static void
+xprt_rdma_free(struct rpc_xprt *xprt, struct rpc_task *task, void *buf)
+{
+	rdma_req_t *req = (rdma_req_t *)buf - 1;
+	rdma_rep_t *rep = req->rl_reply;
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(xprt);
+
+	Dprintk(1, ("xprt_rdma_free: called with buf 0x%p, %swait on 0x%p\n",
+		buf, (rep && rep->rr_func) ? "" : "no ", rep));
+
+	/*
+	 * Finish the deregistration. When using rmr bind, this was
+	 * begun in rdma_reply_handler(). When using lmr_free, we do
+	 * it here, in thread context. The process is considered
+	 * complete when the rr_func vector becomes NULL - this
+	 * was put in place during rdma_reply_handler() - the wait
+	 * call below will not block if the dereg is "done". If
+	 * interrupted, our framework will clean up.
+	 */
+	while (req->rl_nsegs) {
+		rdma_deregister_external(&req->rl_seg_handles[--req->rl_nsegs],
+					       r_xprt, NULL);
+	}
+
+	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
+		rep->rr_func = NULL;	/* abandon the callback */
+		req->rl_reply = NULL;
+	}
+
+	if (req->rl_iov.segment_length == 0) {	/* see allocate above */
+		rdma_req_t *oreq = (rdma_req_t *)req->rl_buffer;
+		oreq->rl_reply = req->rl_reply;
+		rdma_deregister_internal(req->rl_handle);
+		kfree(req);
+		req = oreq;
+	}
+
+	/* Put back request+reply buffers */
+	rdma_buffer_put(req);
+
+	/* Update most recent server credits and done */
+	if (r_xprt->rx_ep.rep_connected == 1) {
+		int credits;
+		spin_lock(&xprt->xprt_lock);
+		credits = atomic_read(&req->rl_buffer->rb_credits);
+		if (credits > req->rl_buffer->rb_max_requests) {
+			credits = req->rl_buffer->rb_max_requests;
+		}
+		credits *= RPC_CWNDSCALE;
+		/* new credits can drop at most one/reply, but
+		 * can increase by any amount. */
+		if (credits < xprt->cwnd) {
+			xprt->cwnd -= RPC_CWNDSCALE;
+		} else {
+			xprt->cwnd = credits;
+		}
+/* TBD keep a stat and note credit grant/use hiwaters */
+		if (RPCXPRT_CONGESTED(xprt)) {
+			if (xprt->cwnd == 0) {
+				Dprintk(0, ("rdma_reply: server dropped credits to 0!\n"));
+				xprt->cwnd = RPC_CWNDSCALE; /* don't deadlock */
+			}
+			Dprintk(1 && xprt->cong - RPC_CWNDSCALE >= xprt->cwnd,
+				("rdma_reply: still flow controlled (%ld/%ld)\n",
+				 (xprt->cong / RPC_CWNDSCALE) - 1,
+				 xprt->cwnd / RPC_CWNDSCALE));
+		}
+		spin_unlock(&xprt->xprt_lock);
+	}
+}
+
+/*
+ * This function is called when an unbind we are waiting for completes.
+ * Just use rr_func (zeroed by upcall) to signal completion.
+ */
+static void
+rdma_unbind_func(rdma_rep_t *rep)
+{
+	wake_up(&rep->rr_unbind);
+}
+
+/*
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
+ */
+static void
+rdma_conn_func(struct rdma_ep *ep)
+{
+	struct rpc_xprt *xprt = (struct rpc_xprt *) ep->rep_xprt;
+	const char * const conn[] = {
+		"connected",
+		"peer rejected",
+		"peer not listening",
+		"peer failed to accept",
+		"disconnected",
+		"connection broken",
+		"timed out",
+		"unreachable"
+	};
+	unsigned int err = ep->rep_event_num - DAT_CONNECTION_EVENT_ESTABLISHED;                    
+	Dprintk(0 && !xprt->shutdown,
+		("rdma_conn_func: %s: %d.%d.%d.%d:%lld (DAT event 0x%x)\n",
+		(err <= 7) ? conn[err] : "unknown connection error",
+		(unsigned char) ep->rep_remote_addr.sa_data[2],
+		(unsigned char) ep->rep_remote_addr.sa_data[3],
+		(unsigned char) ep->rep_remote_addr.sa_data[4],
+		(unsigned char) ep->rep_remote_addr.sa_data[5],
+		ep->rep_server_port, ep->rep_event_num));
+
+	if (ep->rep_connected) {
+		xprt_set_connected(xprt);
+		/* Only schedule one task, until credits refreshed */
+		rpc_wake_up_next(&xprt->sending);
+	} else {
+		xprt_clear_connected(xprt);
+		xprt->cwnd = RPC_CWNDSCALE;	/* Reset server credits */
+	}
+}
+
+/*
+ * Reconnect. This routine is TBD and under construction.
+ * It is only called by the RPC framework when xprt->stream
+ * is true and xprt_connected() is false.
+ */
+static void
+xprt_rdma_reconnect(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(xprt);
+
+	if (!xprt->shutdown && r_xprt->rx_ep.rep_connected != 1) {
+		Dprintk(0, ("xprt_rdma_reconnect: attempt reconnect\n"));
+		if (rdma_ep_reconnect(&r_xprt->rx_ep, r_xprt->rx_ia) == 0)
+			rpc_delay(task, HZ / (1000000 / RDMA_CONNECT_TIMEOUT));
+		/* count retries? */
+	}
+	if (r_xprt->rx_ep.rep_connected != 1) {
+		rpc_delay(task, 5 * HZ);
+		task->tk_status = -ENOTCONN;
+	}
+}
+
+/*
+ * Sendmsg invokes the meat of RPC RDMA. It must do the following:
+ *  1.  Detect any connection issues, particularly when acting like
+ *	a UDP (connectionless) transport.
+ *  2.  Marshal the RPC request into an RPC RDMA request, which means
+ *	putting a header in front of data, and creating IOVs for kDAPL
+ *	from those in the request.
+ *  3.  In marshaling, detect opportunities for RDMA, and use them.
+ *  4.  Post a recv message to set up asynch completion, then send
+ *	the request (rdma_ep_post).
+ *  5.  Return value is number of bytes which were requested to be sent,
+ *	or error. No partial sends are supported.
+ *
+ * Synchronization: This routine is reentrant.
+ *   The RPC subsystem ensures that a given rpc_task is
+ *   protected. Multiple tasks can run at the same time. Shared structures
+ *   include the endpoint and interface structures, but these are not modified
+ *   during this operation, unless a reconnect is required, which is
+ *   serialized by the RPC subsystem as well
+ *   TBD: make sure that the RPC lock protects this (true for stream?).
+ *   TBD: verify sync of destroy of rdma_xprt_t with this and other users.
+ */
+
+static int
+xprt_rdma_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
+{
+	rdma_req_t *req;
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(xprt);
+
+	/*
+	 * Normally we run with xprt->stream, in which case the RPC
+	 * framework handles reconnect. However, when running in "UDP
+	 * mode", support it ourself. This method is TBD.
+	 */
+	if (r_xprt->rx_ep.rep_connected != 1) {
+		if (r_xprt->rx_ep.rep_connected == 0 && !xprt->stream) {
+			Dprintk(0, ("xprt_rdma_sendmsg: attempt reconnect\n"));
+			rdma_ep_reconnect(&r_xprt->rx_ep, r_xprt->rx_ia);
+		}
+		return -ENOTCONN;
+	}
+
+	/* no retransmits */
+	if (rqst->rq_bytes_sent) {
+		Dprintk(1, ("xprt_rdma_sendmsg: retransmit suppressed!\n"));
+		return 0;
+	}
+	req = rpcr_to_rdmar(rqst);
+
+	if (req->rl_niovs == 0) {		/* first time */
+		/* marshal the send itself */
+		if (rdma_marshal_req(rqst) != 0) {
+			Dprintk(0, ("xprt_rdma_sendmsg: rdma_marshal_req failed\n"));
+			return -EIO;
+		}
+	} else
+		Dprintk(1, ("xprt_rdma_sendmsg: retransmit initiated!\n"));
+
+	if (req->rl_reply == NULL) {		/* e.g. reconnection */
+		rdma_recv_buffer_get(req);
+	}
+	if (req->rl_reply != NULL) {
+		req->rl_reply->rr_func = rdma_reply_handler;
+		/* this need only be done once, but... */
+		req->rl_reply->rr_xprt = xprt;
+	}
+
+	if (rdma_ep_post(&r_xprt->rx_ep, req)) {
+		xprt_rdma_close(xprt);
+		return -ENOTCONN;	/* implies disconnect */
+	}
+	return rqst->rq_slen;
+}
+
+/*
+ * Called as a tasklet to do req/reply match and complete a request
+ * Errors must result in the RPC task either being awakened, or
+ * allowed to timeout, to discover the errors at that time.
+ */
+static void
+rdma_reply_handler(rdma_rep_t *rep)
+{
+	struct rdma_msg_no_chunks *headerp;
+	rdma_req_t *req;
+	struct rpc_rqst *rqst;
+	struct rpc_xprt *xprt = rep->rr_xprt;
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(xprt);
+	unsigned int *iptr;
+	int rdmalen, status;
+
+	/* Check status. If bad, signal disconnect and return rep to pool */
+	if (rep->rr_len == ~0U) {
+		rdma_recv_buffer_put(rep);
+		if (r_xprt->rx_ep.rep_connected == 1) {
+			r_xprt->rx_ep.rep_event_num = ~0;
+			r_xprt->rx_ep.rep_connected = 0;
+			rdma_conn_func(&r_xprt->rx_ep);
+		}
+		return;
+	}
+	if (rep->rr_len < sizeof *headerp) {
+		Dprintk(0, ("rdma_reply_handler: short/invalid reply\n"));
+		goto repost;
+	}
+	headerp = (struct rdma_msg_no_chunks *) rep->rr_base;
+
+	/* Get XID and try for a match. */
+	rqst = xprt_lookup_rqst(xprt, headerp->rdma_xid);
+	if (rqst == NULL) {
+		Dprintk(0, ("rdma_reply_handler: reply 0x%p failed "
+			"to match request 0x%08x len %d\n",
+			rep, headerp->rdma_xid, rep->rr_len));
+	      repost:
+		rep->rr_func = rdma_reply_handler;
+		if (rdma_ep_post_recv(&r_xprt->rx_ep, rep)) {
+			rdma_recv_buffer_put(rep);
+		}
+		return;
+	}
+
+	/* get request object */
+	req = rpcr_to_rdmar(rqst);
+
+	Dprintk(1, ("rdma_reply_handler: reply 0x%p completes request 0x%p"
+			" RPC request 0x%p xid 0x%08x\n",
+			rep, req, rqst, headerp->rdma_xid));
+
+	RDMA_ASSERT(req && !req->rl_reply,
+			"xprt_lookup_rqst returned bad/duplicate reply");
+
+	/* from here on, the reply is no longer an orphan */
+	req->rl_reply = rep;
+	rqst->rq_task->tk_garb_retry = 0;	/* no retry on invalid reply */
+
+	/* check for expected message types */
+	/* The order of some of these tests is important. */
+	switch (headerp->rdma_type) {
+	case __constant_htonl(RDMA_MSG):
+		/* never expect read chunks */
+		/* never expect reply chunks (two ways to check) */
+		/* never expect write chunks without offered RDMA */
+		if (headerp->rdma_nochunks[0] != xdr_zero ||
+		    (headerp->rdma_nochunks[1] == xdr_zero &&
+		     headerp->rdma_nochunks[2] != xdr_zero) ||
+		    (headerp->rdma_nochunks[1] != xdr_zero &&
+		     req->rl_nsegs == 0)) {
+			goto badheader;
+		}
+		if (headerp->rdma_nochunks[1] != xdr_zero) {
+			/* count any expected write chunks in read reply */
+			/* start at write chunk array count */
+			iptr = &headerp->rdma_nochunks[2];
+			rdmalen = rdma_count_chunks(rep, req->rl_nsegs, 1, &iptr);
+			/* check for validity, and no reply chunk after */
+			if (rdmalen < 0 || *iptr++ != xdr_zero) {
+				goto badheader;
+			}
+			rep->rr_len -=
+			    ((unsigned char *)iptr - (unsigned char *)headerp);
+			status = rep->rr_len + rdmalen;
+		} else {
+			/* else ordinary inline */
+			iptr = (unsigned int *) (headerp + 1);
+			rep->rr_len -= sizeof *headerp;
+			status = rep->rr_len;
+		}
+		/* Fix up the rpc results for upper layer */
+		rdma_inline_fixup(rqst, iptr, rep->rr_len);
+		break;
+
+	case __constant_htonl(RDMA_NOMSG):
+		/* never expect read or write chunks, always reply chunks */
+		if (headerp->rdma_nochunks[0] != xdr_zero ||
+		    headerp->rdma_nochunks[1] != xdr_zero ||
+		    headerp->rdma_nochunks[2] != xdr_one ||
+		    req->rl_nsegs == 0) {
+			goto badheader;
+		}
+		iptr = (unsigned int *) (headerp + 1);
+		rdmalen = rdma_count_chunks(rep, req->rl_nsegs, 0, &iptr);
+		if (rdmalen < 0) {
+			goto badheader;
+		}
+		/* Reply chunk buffer already is the reply vector - no fixup. */
+		status = rdmalen;
+		break;
+
+	default:
+	badheader:
+		Dprintk(0, ("rdma_reply_handler: invalid reply header (type %d)\n",
+			ntohl(headerp->rdma_type)));
+		status = -EIO;
+		break;
+	}
+
+	/* If using rmr bind, start the deregister process now. */
+	/* (Note: if lmr_free(), cannot perform it here, in tasklet context) */
+	if (req->rl_nsegs && r_xprt->rx_ia->ri_memreg_strategy > 1) {
+		do {
+			/* Optionally wait (not here) for unbinds to complete */
+			if (--req->rl_nsegs == 0 &&
+			    r_xprt->rx_ia->ri_memreg_strategy == 2) {
+				rep->rr_func = rdma_unbind_func;
+				rdma_deregister_external(&req->rl_seg_handles[0],
+						       r_xprt, rep);
+			} else {
+				rdma_deregister_external(&req->rl_seg_handles[req->rl_nsegs],
+						       r_xprt, NULL);
+			}
+		} while (req->rl_nsegs);
+	}
+
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,4,19)
+	/* complete RPC task. */
+	rqst->rq_task->tk_status = status;
+	rqst->rq_received = 1;
+	rpc_wake_up_task(rqst->rq_task);
+	/* Lookup returns a locked task. */
+	rpc_unlock_task(rqst->rq_task);
+#else
+	xprt_complete_rqst(xprt, rqst, status);
+#endif
+}
Index: drivers/infiniband/ulp/nfsrdma/Kconfig
===================================================================
--- drivers/infiniband/ulp/nfsrdma/Kconfig	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/Kconfig	(revision 0)
@@ -0,0 +1,5 @@
+config NFSRDMA
+	tristate "NFS over RDMA"
+	depends on INFINIBAND && INFINIBAND_IPOIB && DAT
+	---help---
+	NFSoRMDA
Index: drivers/infiniband/ulp/nfsrdma/rdma_kdapl.h
===================================================================
--- drivers/infiniband/ulp/nfsrdma/rdma_kdapl.h	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/rdma_kdapl.h	(revision 0)
@@ -0,0 +1,350 @@
+/*
+ * Copyright (c) 2003, 2004, Network Appliance, Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *	Redistributions of source code must retain the above copyright
+ *	notice, this list of conditions and the following disclaimer.
+ *
+ *	Redistributions in binary form must reproduce the above
+ *	copyright notice, this list of conditions and the following
+ *	disclaimer in the documentation and/or other materials provided
+ *	with the distribution.
+ *
+ *	Neither the name of the Network Appliance, Inc. nor the names of
+ *	its contributors may be used to endorse or promote products
+ *	derived from this software without specific prior written
+ *	permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDMA_KDAPL_H
+#define _RDMA_KDAPL_H
+
+#include <dat.h>		/* kDAPL */
+#include <linux/wait.h>		/* wait_queue_head_t, etc */
+#include <linux/spinlock.h>	/* spinlock_t, etc */
+#include <asm/atomic.h>		/* atomic_t, etc */
+#include "rdma_proto.h"		/* RPC/RDMA protocol */
+
+#if RPCRDMA_DEBUG
+#define RDMA_ASSERT(x, s) if (!(x)) panic (s)
+#else
+#define RDMA_ASSERT(x, s)
+#endif
+
+/*
+ * Constants. Max RPC/NFS header is big enough to account for
+ * additional marshaling buffers passed down by Linux client.
+ *
+ * RDMA header is big enough for a fully-chunked message (read
+ * chunks are the largest). Note only a single chunk type per
+ * message is supported currently.
+ */
+#define MAX_RPCHDR	(512 + 136)	/* RPC/NFS overhead to allocate */
+#define MAX_RDMAHDR	(		/* RPC/RDMA max header */ \
+	sizeof (struct rdma_msg) + (2 * sizeof (uint32_t)) + \
+	(sizeof (struct xdr_read_chunk) * RDMA_MAX_SEGS) + sizeof (uint32_t))
+#define RDMA_INLINE_PAD_THRESH 512	/* payload threshold to pad */
+
+#define RDMA_MAX_SEGS 17		/* max scatter/gather: hdr + 16 pgs */
+
+#define RDMA_CONNECT_TIMEOUT	500000	/* TBD .5 seconds */
+#define RDMA_CONNECT_RETRY_MAX	9
+
+/*
+ * Data structures:
+ *   rdma_ia -- interface adapter, plus pz and event handles
+ *   rdma_ep -- represents an endpoint.
+ *   rdma_buffer -- holds pre-registered buffer memory for inline messages.
+ *   rdma_xprt -- encapsulates the structures above for integration with RPC.
+ */
+
+/*
+ * Interface Adapter -- one per module instance
+ */
+typedef struct rdma_ia {
+	struct dat_ia	*ri_ia_handle;
+	int		ri_memreg_strategy;
+	struct dat_lmr	*ri_bind_mem;
+#if RPCRDMA_DEBUG
+	DAT_RMR_CONTEXT	ri_bind_rmr;
+#endif
+	struct dat_lmr_triplet ri_bind_iov;
+	struct dat_ia_attr ri_ia_attr;
+	struct dat_provider_attr ri_pv_attr;
+	struct dat_pz *ri_pz_handle;
+	struct dat_evd *ri_async_evd_handle;
+} rdma_ia_t;
+
+/*
+ * RDMA Endpoint -- one per transport instance
+ */
+typedef struct rdma_ep {
+	spinlock_t	rep_postlock;
+	int		rep_cqcount;
+	int		rep_cqinit;
+	int		rep_connected;
+	rdma_ia_t	*rep_ia;
+	struct dat_ep 	*rep_handle;
+	struct dat_evd	*rep_evd_handle;
+	struct dat_evd	*rep_conn_handle;
+	struct sockaddr rep_remote_addr;
+	DAT_CONN_QUAL	rep_server_port;
+	struct dat_ep_attr rep_attr;
+	wait_queue_head_t rep_connect_wait;
+	enum dat_event_number rep_event_num;
+	struct dat_lmr_triplet rep_pad;	/* holds zeroed pad */
+	struct dat_lmr *rep_padhandle;	/* holds zeroed pad */
+	void		(*rep_func)(struct rdma_ep *);
+	void		*rep_xprt;	/* for rep_func - rpc xprt */
+} rdma_ep_t;
+
+#define INIT_CQCOUNT(ep) ((ep)->rep_cqcount = (ep)->rep_cqinit)
+#define DECR_CQCOUNT(ep) (--(ep)->rep_cqcount)
+
+/*
+ * rdma_rep_t -- this structure encapsulates state required to recv
+ * and complete a reply, asychronously. It needs several pieces of
+ * state:
+ *   o recv buffer (posted to provider)
+ *   o DAT_LMR_TRIPLET (also donated to provider)
+ *   o status of reply (length, success or not)
+ *   o bookkeeping state to get run by tasklet (list, etc)
+ *
+ * These are allocated during initialization, per-transport instance;
+ * however, the tasklet execution list itself is global, as it should
+ * always be pretty short.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * rdma_buffer_t. N is the max number of outstanding requests.
+ */
+
+struct rdma_rep;
+struct rdma_buffer;
+typedef void (*rdma_reply_func)(struct rdma_rep *);
+
+typedef struct rdma_rep {
+	unsigned int	rr_len;		/* actual received reply length */
+	struct rdma_buffer *rr_buffer;	/* home base for this structure */
+	struct rpc_xprt	*rr_xprt;	/* needed for request/reply matching */
+	rdma_reply_func	rr_func;	/* called by tasklet in softint */
+	struct list_head rr_list;	/* tasklet list */
+	wait_queue_head_t rr_unbind;	/* optional unbind wait */
+	struct dat_lmr_triplet rr_iov;		/* for posting */
+	struct dat_lmr *rr_handle;	/* handle for mem in rr_iov */
+	unsigned char	rr_base[MAX_RDMAHDR];	/* start of actual buffer */
+} rdma_rep_t;
+
+/*
+ * rdma_req_t -- this structure is central to the request/reply sequence.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * rdma_buffer_t. N is the max number of outstanding requests.
+ *
+ * It includes pre-registered buffer memory for send AND recv.
+ * The recv buffer, however, is not owned by this structure, and
+ * is "donated" to the hardware when a recv is posted. When a
+ * reply is handled, the recv buffer used is given back to the
+ * rdma_req_t associated with the request.
+ *
+ * In addition to the basic memory, this structure includes an array
+ * of DAT_LMR_TRIPLET (IOV) for send operations. The reason is
+ * that the kDAPL spec says that the iov's passed to *post_{send,recv}
+ * must not be modified until the DTO completes, because behavior is
+ * provider-dependent.
+ *
+ * NOTES:
+ *   o RDMA_MAX_SEGS is the max number of addressible chunk elements we
+ *     marshal. The number needed varies depending on the iov lists that
+ *     are passed to us, and if physical addressing is used, the layout.
+ */
+
+typedef struct {		/* chunk descriptors */
+	u64 mr_base;
+	u64 mr_len;
+} rdma_mr_iov;
+
+typedef union {			/* chunk memory handles */
+	struct dat_lmr *rl_lmr;
+	struct rdma_rmr_entry {
+		struct list_head rmr_freelist;
+		struct dat_rmr *rmr_handle;
+	} *rl_rmr;
+} rdma_mr_handle;
+
+typedef struct rdma_req {
+	unsigned int	rl_size;	/* actual length of buffer */
+	unsigned int	rl_niovs;	/* 0, 2 or 4 */
+	unsigned int	rl_nsegs;	/* non-zero if chunks */
+	struct rdma_buffer *rl_buffer;	/* home base for this structure */
+	rdma_rep_t	*rl_reply;	/* holder for reply buffer */
+	rdma_mr_handle	rl_seg_handles[RDMA_MAX_SEGS];	/* chunk segments */
+	struct dat_lmr_triplet rl_send_iov[4];	/* for active requests */
+	struct dat_lmr_triplet rl_iov;		/* for posting */
+	struct dat_lmr *rl_handle;	/* handle for mem in rl_iov */
+	unsigned char	rl_base[MAX_RDMAHDR];	/* start of actual buffer */
+} rdma_req_t;
+
+#define rpcr_to_rdmar(r) ((rdma_req_t *)(r)->rq_task->tk_buffer - 1)
+
+/*
+ * rdma_buffer_t -- holds list/queue of registered memory for
+ * requests/replies, and client/server credits.
+ *
+ * One of these is associated with a transport instance
+ */
+typedef struct rdma_buffer {
+	spinlock_t	rb_lock;	/* protects indexes */
+	atomic_t	rb_credits;	/* most recent server credits */
+	int		rb_max_requests;/* client max requests */
+	struct list_head rb_rmrs;	/* optional memory windows */
+	int		rb_send_index;
+	rdma_req_t	**rb_send_bufs;
+	int		rb_recv_index;
+	rdma_rep_t	**rb_recv_bufs;
+	char		*rb_pool;
+} rdma_buffer_t;
+
+/*
+ * Internal structure for transport instance creation. This
+ * exists primarily to keep this file from including xprt.h,
+ * where rdma_create_data is defined.
+ *
+ * TBD: add additional fields/information for creation
+ */
+struct rdma_create_data_internal {
+	struct sockaddr	addr;
+	DAT_CONN_QUAL	port;
+	unsigned int	max_requests;	/* max requests (slots) in flight */
+	unsigned int	rsize;		/* mount rsize - max read hdr+data */
+	unsigned int	wsize;		/* mount wsize - max write hdr+data */
+	unsigned int	inline_rsize;	/* max non-rdma read data payload */
+	unsigned int	inline_wsize;	/* max non-rdma write data payload */
+	unsigned int	padding;	/* non-rdma write header padding */
+};
+
+#define RDMA_INLINE_READ_THRESHOLD(rq) \
+	(rpcx_to_rdmad((rq)->rq_xprt).inline_rsize + MAX_RPCHDR)
+
+#define RDMA_INLINE_WRITE_THRESHOLD(rq)\
+	(rpcx_to_rdmad((rq)->rq_xprt).inline_wsize + MAX_RPCHDR)
+
+#define RDMA_INLINE_PAD_VALUE(rq)\
+	rpcx_to_rdmad((rq)->rq_xprt).padding
+
+/*
+ * Stats for RDMA
+ * TBD: add a /proc interface.
+ */
+typedef struct {
+/* TBD
+	total read
+	total write
+	write header total
+	read header total
+	write chunk count
+	read chunk count
+	reply chunk count
+	fixup data copy count
+	hardway register count
+	failed marshal
+*/
+} rdma_stats_t;
+
+/*
+ * RDMA transport
+ *
+ * This is the single object that bundles the state defined
+ * in the RDMA interface over kDAPL.
+ * The contained structures are embedded, not pointers,
+ * for convenience - except for the ia, which can be shared.
+ * This structure need not be visible externally.
+ *
+ * It is allocated and initialized during mount, and released
+ * during unmount.
+ */
+typedef struct {
+	struct rdma_ia	*rx_ia;
+	struct rdma_ep	rx_ep;
+	struct rdma_buffer rx_buf;
+	struct rdma_create_data_internal rx_data;
+	rdma_stats_t	rx_stats;
+} rdma_xprt_t;
+
+#define rpcx_to_rdmax(x) ((rdma_xprt_t *)(x)->sock)
+#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
+
+/*
+ * Interface Adapter calls
+ * Creation:
+ *  o ia_t is provided by caller (zero'd first)
+ * Destruction/close:
+ *  o will NOT kfree the structure
+ */
+rdma_ia_t *rdma_ia_init(rdma_ia_t *, char *, int);
+void rdma_ia_close(rdma_ia_t *);
+
+/*
+ * Endpoint calls
+ * Creation:
+ *  o ep_t is provided by caller but must have been
+ *    initialized via a call to rdma_ep_default_attr.
+ * Destroy:
+ *  o will NOT kfree the structure
+ */
+rdma_ep_t *rdma_ep_create(rdma_ep_t *, rdma_ia_t *,
+				struct rdma_create_data_internal *);
+void rdma_ep_default_attr(rdma_ep_t *, rdma_ia_t *,
+				struct rdma_create_data_internal *);
+
+int rdma_ep_destroy(rdma_ep_t *, rdma_ia_t *);
+int rdma_ep_connect(rdma_ep_t *, rdma_ia_t *, int);
+int rdma_ep_disconnect(rdma_ep_t *, rdma_ia_t *);
+int rdma_ep_reconnect(rdma_ep_t *, rdma_ia_t *);
+
+int rdma_ep_post(rdma_ep_t *, rdma_req_t *);
+int rdma_ep_post_recv(rdma_ep_t *, rdma_rep_t *);
+
+/*
+ * Buffer calls
+ */
+rdma_buffer_t *rdma_buffer_create(rdma_buffer_t *, rdma_ep_t *, rdma_ia_t *,
+				struct rdma_create_data_internal *);
+void rdma_buffer_destroy(rdma_buffer_t *);
+
+rdma_req_t *rdma_buffer_get(rdma_buffer_t *);
+void rdma_buffer_put(rdma_req_t *);
+void rdma_recv_buffer_get(rdma_req_t *);
+void rdma_recv_buffer_put(rdma_rep_t *);
+
+int rdma_register_internal(rdma_ia_t *, void *, int, struct dat_lmr **,
+			   struct dat_lmr_triplet *);
+int rdma_deregister_internal(void *);
+
+int rdma_register_external(rdma_mr_iov *, rdma_mr_handle *,
+				DAT_RMR_CONTEXT *, int, rdma_xprt_t *);
+int rdma_deregister_external(rdma_mr_handle *, rdma_xprt_t *, void *);
+
+/*
+ * NFS/RDMA calls (kernel version dependent - rdma_marshal.c)
+ */
+struct rpc_rqst;
+int rdma_marshal_req(struct rpc_rqst *);
+int rdma_count_chunks(rdma_rep_t *, int, int, unsigned int **);
+void rdma_inline_fixup(struct rpc_rqst *, void *, int);
+
+#endif				/* _RDMA_KDAPL_H */
Index: drivers/infiniband/ulp/nfsrdma/rdma_marshal.c
===================================================================
--- drivers/infiniband/ulp/nfsrdma/rdma_marshal.c	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/rdma_marshal.c	(revision 0)
@@ -0,0 +1,881 @@
+/*
+ * Copyright (c) 2003, 2004, Network Appliance, Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *	Redistributions of source code must retain the above copyright
+ *	notice, this list of conditions and the following disclaimer.
+ *
+ *	Redistributions in binary form must reproduce the above
+ *	copyright notice, this list of conditions and the following
+ *	disclaimer in the documentation and/or other materials provided
+ *	with the distribution.
+ *
+ *	Neither the name of the Network Appliance, Inc. nor the names of
+ *	its contributors may be used to endorse or promote products
+ *	derived from this software without specific prior written
+ *	permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rdma_marshal.c
+ *
+ * This file contains the guts of the RPC RDMA protocol, and
+ * does marshaling/unmarshaling, etc. It is also where any
+ * changes related to the various Linux kernel generations
+ * live. These are primarily related to memory representation
+ * (iovs, pages, etc).
+ *
+ * Where possible, the differences wrt specific kernels are
+ * discussed.
+ */
+
+#include "rdma_kdapl.h"
+#include <linux/sunrpc/clnt.h>
+
+/*
+ * local functions
+ */
+
+typedef enum { noch = 0, readch, areadch, writech, replych } chunktype;
+static int rdma_inline_pullup(struct rpc_rqst *, int);
+
+
+/*
+ * Macros to hide differences from 2.4.18 and 2.4.20 in RPC/XDR code
+ */
+
+#include <linux/version.h>
+#include <linux/mm.h>
+#include <linux/highmem.h>
+#include <asm/io.h>
+
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,4,19)
+
+#define XDR_TARGET		struct rpc_iov
+#define pos0(r)			(r)->io_vec->iov_len
+
+#define RPC_SEND_VECS(rqst)	(rqst)->rq_snr
+#define RPC_SEND_SEG0(rqst)	(rqst)->rq_snd_buf.io_vec[0].iov_base
+#define RPC_SEND_LEN0(rqst)	(rqst)->rq_snd_buf.io_vec[0].iov_len
+
+static inline int
+RPC_SEND_COPY(struct rpc_rqst *rqst, int n, int len, char *destp)
+{
+	if (len > rqst->rq_snd_buf.io_vec[n].iov_len)
+		len = rqst->rq_snd_buf.io_vec[n].iov_len;
+	memcpy(destp, rqst->rq_snd_buf.io_vec[n].iov_base, len);
+	return len;
+}
+
+#define RPC_RECV_VECS(rqst)	(rqst)->rq_rnr
+#define RPC_RECV_SEG0(rqst)	(rqst)->rq_rcv_buf.io_vec[0].iov_base
+#define RPC_RECV_LEN0(rqst)	(rqst)->rq_rcv_buf.io_vec[0].iov_len
+
+static inline int
+RPC_RECV_COPY(struct rpc_rqst *rqst, int n, int len, char *src)
+{
+	if (len > rqst->rq_rcv_buf.io_vec[n].iov_len)
+		len = rqst->rq_rcv_buf.io_vec[n].iov_len;
+	memcpy(rqst->rq_rcv_buf.io_vec[n].iov_base, src, len);
+	return len;
+}
+
+#else	/* >= 2.4.20 */
+
+#define XDR_TARGET		struct xdr_buf
+#define pos0(x)			(x)->head[0].iov_len
+
+#define RPC_SEND_VECS(rqst)	\
+	(1 + (PAGE_ALIGN((rqst)->rq_snd_buf.page_base +  \
+			 (rqst)->rq_snd_buf.page_len) >> PAGE_SHIFT) + \
+		((rqst)->rq_snd_buf.tail[0].iov_len != 0))
+#define RPC_SEND_SEG0(rqst)	(rqst)->rq_svec[0].iov_base
+#define RPC_SEND_LEN0(rqst)	(rqst)->rq_svec[0].iov_len
+
+static inline int
+RPC_SEND_COPY(struct rpc_rqst *rqst, int n, int len, char *dest)
+{
+	char *src;
+	int lim;
+	if (--n == 0) {
+		lim = PAGE_SIZE - rqst->rq_snd_buf.page_base;
+	} else {
+		lim = rqst->rq_snd_buf.page_base + rqst->rq_snd_buf.page_len;
+		lim = PAGE_ALIGN(lim);
+		if (n < (lim >> PAGE_SHIFT))
+			lim = PAGE_SIZE;
+		else if (n == (lim >> PAGE_SHIFT))
+			lim &= ~PAGE_MASK;
+		else {
+			if (len > rqst->rq_snd_buf.tail[0].iov_len)
+				len = rqst->rq_snd_buf.tail[0].iov_len;
+			memcpy(dest, rqst->rq_snd_buf.tail[0].iov_base, len);
+			return len;
+		}
+	}
+	if (len > lim)
+		len = lim;
+
+	if (PageHighMem(rqst->rq_snd_buf.pages[n])) {
+		src = kmap(rqst->rq_snd_buf.pages[n]);
+		memcpy(dest, src, len);
+		kunmap(rqst->rq_snd_buf.pages[n]);
+	} else {
+		src = page_address(rqst->rq_snd_buf.pages[n]) +
+			 (n ? 0 : rqst->rq_snd_buf.page_base);
+		memcpy(dest, src, len);
+	}
+	return len;
+}
+
+#define RPC_RECV_VECS(rqst)	\
+	(1 + (PAGE_ALIGN((rqst)->rq_rcv_buf.page_base + \
+			 (rqst)->rq_rcv_buf.page_len) >> PAGE_SHIFT) + \
+		((rqst)->rq_rcv_buf.tail[0].iov_len != 0))
+#define RPC_RECV_SEG0(rqst)	(rqst)->rq_rcv_buf.head[0].iov_base
+#define RPC_RECV_LEN0(rqst)	(rqst)->rq_rcv_buf.head[0].iov_len
+
+static inline int
+RPC_RECV_COPY(struct rpc_rqst *rqst, int n, int len, char *src)
+{
+	char *dest;
+	int lim;
+	if (--n == 0) {
+		lim = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
+	} else {
+		lim = rqst->rq_rcv_buf.page_base + rqst->rq_rcv_buf.page_len;
+		lim = PAGE_ALIGN(lim);
+		if (n < (lim >> PAGE_SHIFT))
+			lim = PAGE_SIZE;
+		else if (n == (lim >> PAGE_SHIFT))
+			lim &= ~PAGE_MASK;
+		else {
+			if (len > rqst->rq_rcv_buf.tail[0].iov_len)
+				len = rqst->rq_rcv_buf.tail[0].iov_len;
+			memcpy(rqst->rq_rcv_buf.tail[0].iov_base, src, len);
+			return len;
+		}
+	}
+	if (len > lim)
+		len = lim;
+
+	if (PageHighMem(rqst->rq_rcv_buf.pages[n])) {
+		dest = kmap(rqst->rq_rcv_buf.pages[n]);
+		memcpy(dest, src, len);
+		kunmap(rqst->rq_rcv_buf.pages[n]);
+	} else {
+		dest = page_address(rqst->rq_rcv_buf.pages[n]) +
+			 (n ? 0 : rqst->rq_rcv_buf.page_base);
+		memcpy(dest, src, len);
+	}
+	return len;
+}
+
+#endif
+
+static unsigned int rdma_create_chunks(struct rpc_rqst *, XDR_TARGET *,
+				struct rdma_msg *, chunktype);
+static int rdma_convert_virt(XDR_TARGET *, int, rdma_mr_iov *, int);
+static int rdma_convert_phys(XDR_TARGET *, int, rdma_mr_iov *, int);
+
+#undef Dprintk
+#if RPCRDMA_DEBUG
+#define Dprintk(n, x) if (Debug >= n) printk x
+extern int Debug;
+#else
+#define Dprintk(n, x)
+#endif
+
+/*
+ * Marshal a request: the primary job of this routine is to choose
+ * the transfer modes. See comments below.
+ *
+ * Uses multiple kDAPL IOVs for a request:
+ *  [0] -- RPC RDMA header, which uses memory from the *start* of the
+ *         preregistered buffer that already holds the RPC data in
+ *         its middle.
+ *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
+ *  [2] -- optional padding.
+ *  [3] -- if padded, header only in [1] and data here.
+ */
+
+#if RPCRDMA_DEBUG
+static const char transfertypes[][12] = {
+	"pure inline",	/* no chunks */
+	" read chunk",	/* some argument via rdma read */
+	"*read chunk",	/* entire request via rdma read */
+	"write chunk",	/* some result via rdma write */
+	"reply chunk"	/* entire reply via rdma write */
+};
+#endif
+
+int
+rdma_marshal_req(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	rdma_req_t *req = rpcr_to_rdmar(rqst);
+	unsigned char *base;
+	unsigned int hdrlen, rpclen, padlen, buflen;
+	chunktype rtype, wtype;
+	struct rdma_msg_no_chunks *headerp;
+
+	if (xprt->stream) {	/* trim the stream marker and total len */
+		RPC_SEND_SEG0(rqst) += 4;
+		RPC_SEND_LEN0(rqst) -= 4;
+		rqst->rq_slen -= 4;
+	}
+
+	/*
+	 * rpclen gets amount of data in first buffer, which is the
+	 * pre-registered buffer.
+	 */
+	base = RPC_SEND_SEG0(rqst);
+	rpclen = RPC_SEND_LEN0(rqst);
+	buflen = ((unsigned char *)(req + 1) + req->rl_size) - base;
+
+	/* sanity check -- these should match */
+	RDMA_ASSERT(buflen <= req->rl_size,
+			"rdma_marshal_req cannot match request buffers");
+
+	/* build RDMA header in private area at front */
+	headerp = (struct rdma_msg_no_chunks *) req->rl_base;
+	/* don't htonl XID, it's already done in request */
+	headerp->rdma_xid = rqst->rq_xid;
+	headerp->rdma_vers = xdr_one;
+	headerp->rdma_credit = htonl(req->rl_buffer->rb_max_requests);
+	headerp->rdma_type = __constant_htonl(RDMA_MSG);
+
+	/*
+	 * Chunks needed for results?
+	 *
+	 * o If the expected result is under the inline threshold, all ops
+	 *   return as inline (but see later).
+	 * o Large non-read ops return as a single reply chunk.
+	 * o Large read ops return data as write chunk(s), header as inline.
+	 *
+	 * Note: the NFS code sending down multiple result segments implies
+	 * the op is one of read, readdir[plus] or readlink.
+	 */
+	if (rqst->rq_rcv_buf.len <= RDMA_INLINE_READ_THRESHOLD(rqst)) {
+		wtype = noch;
+	} else if (RPC_RECV_VECS(rqst) == 1) {
+		wtype = replych;
+	} else {
+		/* TBD fix this intimacy with a better test! */
+		#include <linux/nfs3.h>
+		if (rqst->rq_task->tk_client->cl_prog == NFS_PROGRAM &&
+		    rqst->rq_task->tk_client->cl_vers == 3 &&
+		    rqst->rq_task->tk_msg.rpc_proc->p_proc == NFS3PROC_READ) {
+			wtype = writech;
+		} else {
+			wtype = replych;
+		}
+	}
+
+	/*
+	 * Chunks needed for arguments?
+	 *
+	 * o If the total request is under the inline threshold, all ops
+	 *   are sent as inline.
+	 * o Large non-write ops are sent with the entire message as a
+	 *   single read chunk (protocol 0-position special case).
+	 * o Large write ops transmit data as read chunk(s), header as
+	 *   inline.
+	 *
+	 * Note: the NFS code sending down multiple argument segments
+	 * implies the op is a write.
+	 */
+	if (rqst->rq_slen <= RDMA_INLINE_WRITE_THRESHOLD(rqst)) {
+		rtype = noch;
+	} else if (RPC_SEND_VECS(rqst) == 1) {
+		rtype = areadch;
+	} else {
+		rtype = readch;
+	}
+
+	/* The following simplification is not true forever */
+	RDMA_ASSERT(rtype == noch || wtype == noch,
+		"rdma_marshal_req read and write chunks needed");
+
+	if (rpcx_to_rdmax(xprt)->rx_ia->ri_memreg_strategy == 0 &&
+	    (rtype != noch || wtype != noch)) {
+		/* forced to "pure inline"? */
+		Dprintk(0, ("rdma_marshal_req: too much data for inline\n"));
+		return -1;
+	}
+
+	hdrlen = sizeof *headerp;
+	padlen = 0;
+
+	/*
+	 * Pull up any extra send data into the preregistered buffer.
+	 * When padding is in use and applies to the transfer, insert
+	 * it and change the message type.
+	 */
+	if (rtype == noch) {
+
+		padlen = rdma_inline_pullup(rqst, RDMA_INLINE_PAD_VALUE(rqst));
+
+		if (padlen) {
+			struct rdma_msg_padded *pheaderp =
+				(struct rdma_msg_padded *) headerp;
+			pheaderp->rdma_type = __constant_htonl(RDMA_MSGP);
+			pheaderp->rdma_align = htonl(RDMA_INLINE_PAD_VALUE(rqst));
+			pheaderp->rdma_thresh = __constant_htonl(RDMA_INLINE_PAD_THRESH);
+			pheaderp->rdma_nochunks[0] = xdr_zero;
+			pheaderp->rdma_nochunks[1] = xdr_zero;
+			pheaderp->rdma_nochunks[2] = xdr_zero;
+			hdrlen = sizeof *pheaderp;
+			RDMA_ASSERT(wtype == noch,
+					"rdma_marshal_req padded write chunk");
+
+		} else {
+			headerp->rdma_nochunks[0] = xdr_zero;
+			headerp->rdma_nochunks[1] = xdr_zero;
+			headerp->rdma_nochunks[2] = xdr_zero;
+			/* new length after pullup */
+			rpclen = RPC_SEND_LEN0(rqst);
+			/*
+			 * Currently we try to not actually use read inline.
+			 * Reply chunks have the desirable property that
+			 * they land, packed, directly in the target buffers
+			 * without headers, so they require no fixup. The
+			 * additional RDMA Write op sends the same amount
+			 * of data, streams on-the-wire and adds no overhead
+			 * on receive. Therefore, we request a reply chunk
+			 * for non-writes wherever feasible and efficient.
+			 */
+			if (wtype == noch &&
+			    rpcx_to_rdmax(xprt)->rx_ia->ri_memreg_strategy > 1)
+				wtype = replych;
+		}
+	}
+
+	/*
+	 * Marshal chunks. This routine will return the header length
+	 * consumed by marshaling.
+	 */
+	if (rtype != noch) {
+		hdrlen = rdma_create_chunks(rqst, &rqst->rq_snd_buf,
+					(struct rdma_msg *)headerp, rtype);
+		wtype = rtype;	/* simplify Dprintk */
+
+	} else if (wtype != noch) {
+		hdrlen = rdma_create_chunks(rqst, &rqst->rq_rcv_buf,
+					(struct rdma_msg *)headerp, wtype);
+	}
+
+	if (hdrlen == 0)
+		return -1;
+
+	Dprintk(1, ("rdma_marshal_req: %s: "
+			"hdrlen %d rpclen %d padlen %d buflen %d "
+			"headerp 0x%p base 0x%p lmr 0x%x\n",
+			transfertypes[wtype], hdrlen, rpclen, padlen, buflen,
+			headerp, base, req->rl_iov.lmr_context));
+
+	/*
+	 * initialize send_iov's - normally only two: rdma chunk header and
+	 * single preregistered header buffer, but if padding is present,
+	 * then use a preregistered (and zeroed) pad buffer between the RPC
+	 * header and any write data. In all non-rdma cases, any following
+	 * data has been copied into the header buffer.
+	 *
+	 * Be sure to account for any trimmed 4-byte length marker.
+	 */
+	req->rl_send_iov[0].virtual_address = req->rl_iov.virtual_address;
+	req->rl_send_iov[0].segment_length = hdrlen;
+	req->rl_send_iov[0].lmr_context = req->rl_iov.lmr_context;
+
+	req->rl_send_iov[1].virtual_address =
+			req->rl_iov.virtual_address + MAX_RDMAHDR;
+	if (xprt->stream)
+		req->rl_send_iov[1].virtual_address += 4;
+	req->rl_send_iov[1].segment_length = rpclen;
+	req->rl_send_iov[1].lmr_context = req->rl_iov.lmr_context;
+
+	req->rl_niovs = 2;
+
+	if (padlen) {
+		rdma_ep_t *ep = &rpcx_to_rdmax(xprt)->rx_ep;
+
+		req->rl_send_iov[2].virtual_address = ep->rep_pad.virtual_address;
+		req->rl_send_iov[2].segment_length = padlen;
+		req->rl_send_iov[2].lmr_context = ep->rep_pad.lmr_context;
+
+		req->rl_send_iov[3].virtual_address =
+				req->rl_send_iov[1].virtual_address + rpclen;
+		req->rl_send_iov[3].segment_length = rqst->rq_slen - rpclen;
+		req->rl_send_iov[3].lmr_context = req->rl_iov.lmr_context;
+
+		req->rl_niovs = 4;
+	}
+
+	if (xprt->stream) {	/* restore the stream marker and total len */
+		RPC_SEND_SEG0(rqst) -= 4;
+		RPC_SEND_LEN0(rqst) += 4;
+		rqst->rq_slen += 4;
+	}
+
+	return 0;
+}
+
+/*
+ * Create read/write chunk lists, and reply chunks, for RDMA
+ *
+ *   Assume check against THRESHOLD has been done, and chunks are required.
+ *   Assume only encoding one list entry for read|write chunks. The NFSv3
+ *     protocol is simple enough to allow this as it only has a single "bulk
+ *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
+ *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
+ *
+ * Algorithm:
+ *   For each entry in the recv vector:
+ *     o attempt to coalesce with next entry to create fewer ranges
+ *     o when no longer contiguous, create a chunk:
+ *       register memory
+ *       marshal chunk
+ *   Save lmr_handle in embedded array in rdma_req_t
+ *
+ * When used for a single reply chunk (which is a special write
+ * chunk used for the entire reply, rather than just the data), it
+ * is used primarily for READDIR and READLINK which would otherwise
+ * be severely size-limited by a small rdma inline read max. The server
+ * response will come back as an RDMA Write, followed by a message
+ * of type RDMA_NOMSG carrying the xid and length. As a result, reply
+ * chunks do not provide data alignment.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Changes for 2.4.20:
+ *
+ * This function will need to change for 2.4.20, as the extra
+ * data is passed as a page list, and not struct iovec.
+ * (The page lists in 2.4.20 are not kmap'd, which in the RDMA
+ * case is a big advantage, since we don't need them mapped.)
+ */
+
+static unsigned int
+rdma_create_chunks(struct rpc_rqst *rqst, XDR_TARGET *target,
+			struct rdma_msg *headerp, chunktype type)
+{
+	rdma_req_t *req = rpcr_to_rdmar(rqst);
+	rdma_xprt_t *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+	int nsegs = 0;
+	int niovs, pos;
+	rdma_mr_iov segs[RDMA_MAX_SEGS], *seg = segs;
+	DAT_RMR_CONTEXT rmr_context;
+	struct xdr_read_chunk *cur_rchunk = NULL;
+	struct xdr_write_array *warray = NULL;
+	struct xdr_write_chunk *cur_wchunk = NULL;
+	unsigned int *iptr = (unsigned int *)(headerp + 1);
+
+	pos = (type == replych || type == areadch ? 0 : pos0(target));
+	/* Check for coalesce, optionally convert to physical for RMR bind */
+	if (r_xprt->rx_ia->ri_memreg_strategy > 1) {
+		niovs = rdma_convert_phys(target, pos != 0, seg, RDMA_MAX_SEGS);
+	} else {
+		niovs = rdma_convert_virt(target, pos != 0, seg, RDMA_MAX_SEGS);
+	}
+	if (niovs == 0) {
+		return 0;
+	}
+
+	if (type == readch || type == areadch) {
+		/* a read chunk - server will RDMA Read our memory */
+		cur_rchunk = (struct xdr_read_chunk *) iptr;
+	} else {
+		/* a write or reply chunk - server will RDMA Write our memory */
+		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
+		if (type == replych) {
+			*iptr++ = xdr_zero;	/* a NULL write chunk list */
+		}
+		warray = (struct xdr_write_array *) iptr;
+		cur_wchunk = (struct xdr_write_chunk *) (warray + 1);
+	}
+
+	while (niovs--) {
+		/* bind/register the memory, then build chunk. */
+		if (rdma_register_external(seg,
+					&req->rl_seg_handles[nsegs],
+					&rmr_context,
+					cur_wchunk != NULL,
+					r_xprt)) {
+			goto out;
+		}
+		if (cur_rchunk) {	/* read */
+			cur_rchunk->discrim = xdr_one;
+			/* all read chunks have the same "position" */
+			cur_rchunk->position = htonl(pos);
+			cur_rchunk->target.handle = htonl(rmr_context);
+			cur_rchunk->target.length = htonl(seg->mr_len);
+			xdr_encode_hyper((u32 *)&cur_rchunk->target.offset,
+					seg->mr_base);
+			Dprintk(1, ("rdma_create_chunks: read chunk "
+				"elem %lld at 0x%llx:0x%x pos %d (%s)\n",
+				seg->mr_len, seg->mr_base, rmr_context,
+				pos, niovs ? "more" : "last"));
+			cur_rchunk++;
+		} else {		/* write/reply */
+			cur_wchunk->target.handle = htonl(rmr_context);
+			cur_wchunk->target.length = htonl(seg->mr_len);
+			xdr_encode_hyper((u32 *)&cur_wchunk->target.offset,
+					seg->mr_base);
+			Dprintk(1, ("rdma_create_chunks: %s chunk "
+				"elem %lld at 0x%llx:0x%x (%s)\n",
+				(type == replych) ? "reply" : "write",
+				seg->mr_len, seg->mr_base, rmr_context,
+				niovs ? "more" : "last"));
+			cur_wchunk++;
+		}
+		seg++;
+		nsegs++;
+	}
+
+	/* success. all failures return above */
+	req->rl_nsegs = nsegs;
+
+	RDMA_ASSERT(nsegs > 0, "rdma_create_chunks internal error");
+
+	/*
+	 * finish off header. If write, marshal discrim and nchunks.
+	 */
+	if (cur_rchunk) {
+		iptr = (unsigned int *) cur_rchunk;
+		*iptr++ = xdr_zero;	/* finish the read chunk list */
+		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
+		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
+	} else {
+		warray->discrim = xdr_one;
+		warray->nchunks = htonl(nsegs);
+		iptr = (unsigned int *) cur_wchunk;
+		if (type == writech) {
+			*iptr++ = xdr_zero; /* finish the write chunk list */
+			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
+		}
+	}
+
+	/*
+	 * Return header size.
+	 */
+	return (unsigned char *)iptr - (unsigned char *)headerp;
+
+out:
+	while (--nsegs >= 0) {
+		rdma_deregister_external(&req->rl_seg_handles[nsegs], r_xprt, NULL);
+	}
+	return 0;
+}
+
+/*
+ * Copy write data inline.
+ * This function is used for "small" requests. Data which is passed
+ * to RPC via iovecs (or page list) is copied directly into the
+ * pre-registered memory buffer for this request. For small amounts
+ * of data, this is efficient. The cutoff value is tunable.
+ */
+static int
+rdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+{
+	int i, n, curlen;
+	unsigned char *destp;
+
+	destp = RPC_SEND_SEG0(rqst);
+	curlen = RPC_SEND_LEN0(rqst);
+	destp += curlen;
+	/*
+	 * Do optional padding where it makes sense. Alignment of write
+	 * payload can help the server, if our setting is accurate.
+	 */
+	pad -= (curlen + sizeof(struct rdma_msg_padded));
+	if (pad < 0 || rqst->rq_slen - curlen < RDMA_INLINE_PAD_THRESH) {
+		pad = 0;	/* don't pad this request */
+	}
+
+	n = RPC_SEND_VECS(rqst);
+	for (i = 1; i < n; i++) {
+		curlen = RPC_SEND_COPY(rqst, i, 65536, destp);
+		RPC_SEND_LEN0(rqst) += curlen;
+		destp += curlen;
+	}
+	/* header now contains entire send message */
+	return pad;
+}
+
+/*
+ * Coalesce chunks and/or convert a kernel-virtual iov list to physical.
+ *
+ * It's worth it to coalesce adjacent regions because each one
+ * requires its own RDMA op.
+ */
+static int
+rdma_convert_physiov(struct kvec *iov, int niovs,
+			rdma_mr_iov *seg, int nsegs)
+{
+	unsigned char *base = iov->iov_base, *next;
+	unsigned long more;
+	u64 phys;
+	u64 len = iov->iov_len;
+	struct page *pg;
+	int n = 0, off, m;
+
+	for (;;) {
+		++iov;
+		--niovs;
+		more = 0;
+		next = 0;
+		off = (unsigned long)base & ~PAGE_MASK;
+		m = PAGE_SIZE;
+
+		next = (unsigned char *)PAGE_ALIGN((unsigned long)(base + 1));
+		pg = vmalloc_to_page(base);
+		phys = (pg ? page_to_phys(pg) : __pa(base) - off);
+
+		/* check for coalesce */
+		while (m - off < len) {
+			pg = vmalloc_to_page(next);
+			if (phys + m != (pg ? page_to_phys(pg) : __pa(next))) {
+				--iov;	/* back up the truck */
+				++niovs;
+				goto out;
+			}
+			m += PAGE_SIZE;	/* coalesce physical */
+			next += PAGE_SIZE;
+		}
+		/* check for coalesce across iov's too */
+		if (niovs > 0) {
+			/* TBD */
+		}
+	out:
+		phys += off;
+		if (len > m - off) {
+			more = len - (m - off);
+			len = m - off;
+		} else {
+			more = 0;
+		}
+		seg->mr_base = phys;
+		seg->mr_len  = len;
+		++seg;
+		++n;
+		if (niovs == 0)	/* done */
+			return n;
+		if (n >= nsegs)	/* overflow */
+			return 0;
+		if (more) {
+			base = next;
+			len  = more;
+		} else {
+			base = iov->iov_base;
+			len  = iov->iov_len;
+		}
+	}
+	/* notreached */
+}
+
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,4,19)
+
+/*
+ * Coalesce chunks in a kernel-virtual iov list.
+ */
+static int
+rdma_convert_virt(struct rpc_iov *target, int first,
+			rdma_mr_iov *seg, int nsegs)
+{
+	struct iovec *iov = target->io_vec + first;
+	int niovs = target->io_nr - first;
+	unsigned char *base = iov->iov_base;
+	int len = iov->iov_len;
+	int n = 0;
+
+	for (;;) {
+		++iov;
+		--niovs;
+		if (niovs > 0 && iov->iov_base == base + len) {
+			len += iov->iov_len;	/* coalesce */
+			continue;
+		}
+		seg->mr_base = (unsigned long) base;
+		seg->mr_len  = len;
+		++seg;
+		++n;
+		if (niovs == 0)	/* done */
+			return n;
+		if (n >= nsegs)	/* overflow */
+			return 0;
+		base = iov->iov_base;
+		len  = iov->iov_len;
+	}
+	/* notreached */
+}
+
+static int
+rdma_convert_phys(struct rpc_iov *target, int first,
+			rdma_mr_iov *seg, int nsegs)
+{
+	return rdma_convert_physiov(target->io_vec + first,
+					target->io_nr - first, seg, nsegs);
+}
+
+#else /* >= 2.4.20 */
+
+static int
+rdma_convert_virt(struct xdr_buf *xdrbuf, int first, rdma_mr_iov *seg, int nsegs)
+{
+/*
+ * Need to kmap, return vaddrs, and find a place to kunmap.
+ */
+	printk("rdma_convert_virt: Memreg=1 not yet supported on this Linux version\n");
+	return 0;
+}
+
+static int
+rdma_convert_phys(struct xdr_buf *xdrbuf, int first, rdma_mr_iov *seg, int nsegs)
+{
+	int len, n = 0, p;
+
+	if (first == 0) {
+		n = rdma_convert_physiov(xdrbuf->head, 1, seg, nsegs);
+		if (n == 0)
+			return 0;
+	}
+
+	if (xdrbuf->page_len) {
+		/* This code can and should be improved to check for
+		 * coalescing opportunities, like the code above. */
+		if (n == nsegs)
+			return 0;
+		seg[n].mr_base = page_to_phys(xdrbuf->pages[0]) + xdrbuf->page_base;
+		seg[n].mr_len = PAGE_SIZE - xdrbuf->page_base;
+		len = xdrbuf->page_len - seg[n].mr_len;
+		++n;
+		p = 1;
+		while (len > 0) {
+			if (n == nsegs)
+				return 0;
+			seg[n].mr_base = page_to_phys(xdrbuf->pages[p]);
+			if (len > PAGE_SIZE)
+				seg[n].mr_len = PAGE_SIZE;
+			else
+				seg[n].mr_len = len;
+			++n;
+			++p;
+			len -= PAGE_SIZE;
+		}
+	}
+	if (xdrbuf->tail[0].iov_len) {
+		if (n == nsegs)
+			return 0;
+		p = rdma_convert_physiov(xdrbuf->tail, 1, &seg[n], nsegs - n);
+		if (p == 0)
+			return 0;
+		n += p;
+	}
+	return n;
+}
+#endif
+
+/*
+ * Chase down a received write or reply chunklist to get length
+ * RDMA'd by server. See map at rdma_create_chunks()! :-)
+ */
+int
+rdma_count_chunks(rdma_rep_t *rep, int max, int wrchunk, unsigned int **iptrp)
+{
+	unsigned int i, total_len;
+	struct xdr_write_chunk *cur_wchunk;
+
+	i = ntohl(**iptrp);	/* get array count */
+	if (i > max) {
+		return -1;
+	}
+	cur_wchunk = (struct xdr_write_chunk *) (*iptrp + 1);
+	total_len = 0;
+	while (i--) {
+#if RPCRDMA_DEBUG
+		unsigned long long off;
+		xdr_decode_hyper((u32 *)&cur_wchunk->target.offset, &off);
+#endif
+		Dprintk(1, ("rdma_count_chunks: chunk %d at 0x%llx:0x%x\n",
+			ntohl(cur_wchunk->target.length), off,
+			ntohl(cur_wchunk->target.handle)));
+		total_len += ntohl(cur_wchunk->target.length);
+		++cur_wchunk;
+	}
+	/* check and adjust for properly terminated write chunk */
+#if 0 /* XXX wtf? */
+	if (wrchunk && *((unsigned int *) cur_wchunk)++ != xdr_zero) {
+		return -1;
+	}
+#endif
+	if ((unsigned char *) cur_wchunk > rep->rr_base + rep->rr_len) {
+		return -1;
+	}
+	*iptrp = (unsigned int *) cur_wchunk;
+	return total_len;
+}
+
+/*
+ * Scatter inline received data back into provided iov's.
+ *
+ * TBD keep a statistic fixup += len
+ */
+void
+rdma_inline_fixup(struct rpc_rqst *rqst, void *srcp, int len)
+{
+	int i, j, n;
+
+	if (len > 0) {
+		j = RPC_RECV_LEN0(rqst);
+                if (j >= len) {
+                        j = len;
+			RPC_RECV_LEN0(rqst) = j; /* write chunk header fixup */
+		}
+		/* Shift pointer for first receive segment only */
+		RPC_RECV_SEG0(rqst) = srcp;
+                srcp = (char *) srcp + j;
+                len -= j;
+        }
+	if (len > 0) {
+		n = RPC_RECV_VECS(rqst);
+		for (i = 1; i < n; i++) {
+			j = RPC_RECV_COPY(rqst, i, len, srcp);
+			srcp = (char *) srcp + j;
+			if ((len -= j) == 0)
+				break;
+		}
+	}
+#if LINUX_VERSION_CODE > KERNEL_VERSION(2,4,19)
+	/* TBD avoid a warning from call_decode() */
+	rqst->rq_private_buf = rqst->rq_rcv_buf;
+#endif
+
+        RDMA_ASSERT(len == 0, "rdma_inline_fixup too much inline data");
+}
Index: drivers/infiniband/ulp/nfsrdma/rdma_proto.h
===================================================================
--- drivers/infiniband/ulp/nfsrdma/rdma_proto.h	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/rdma_proto.h	(revision 0)
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2003, 2004, Network Appliance, Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *	Redistributions of source code must retain the above copyright
+ *	notice, this list of conditions and the following disclaimer.
+ *
+ *	Redistributions in binary form must reproduce the above
+ *	copyright notice, this list of conditions and the following
+ *	disclaimer in the documentation and/or other materials provided
+ *	with the distribution.
+ *
+ *	Neither the name of the Network Appliance, Inc. nor the names of
+ *	its contributors may be used to endorse or promote products
+ *	derived from this software without specific prior written
+ *	permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDMA_PROTO_H
+#define _RDMA_PROTO_H
+
+struct xdr_rdma_segment {
+	uint32_t handle;	/* Registered memory handle */
+	uint32_t length;	/* Length of the chunk in bytes */
+	uint64_t offset;	/* Chunk virtual address or offset */
+};
+
+/*
+ * read chunk(s), encoded as a linked list.
+ */
+struct xdr_read_chunk {
+	uint32_t discrim;	/* 1 indicates presence */
+	uint32_t position;	/* Position in XDR stream */
+	struct xdr_rdma_segment target;
+};
+
+/*
+ * write chunk(s), encoded as a counted array.
+ */
+struct xdr_write_array {
+	uint32_t discrim;	/* 1 indicates presence */
+	uint32_t nchunks;	/* Array count */
+	/* struct xdr_write_chunk array<> */
+};
+
+/*
+ * actual write chunk, and reply chunk.
+ */
+struct xdr_write_chunk {
+	struct xdr_rdma_segment target;
+};
+
+struct rdma_msg {
+	uint32_t rdma_xid;	/* Mirrors the RPC header xid */
+	uint32_t rdma_vers;	/* Version of this protocol */
+	uint32_t rdma_credit;	/* Buffers requested/granted */
+	uint32_t rdma_type;	/* Type of message (enum rdma_proc) */
+	/* rdma_body rdma_body; */
+};
+
+struct rdma_msg_no_chunks {
+	uint32_t rdma_xid;	/* Mirrors the RPC header xid */
+	uint32_t rdma_vers;	/* Version of this protocol */
+	uint32_t rdma_credit;	/* Buffers requested/granted */
+	uint32_t rdma_type;	/* Type of message (enum rdma_proc) */
+	uint32_t rdma_nochunks[3];	/* 3 empty chunk lists */
+};
+
+struct rdma_msg_padded {
+	uint32_t rdma_xid;	/* Mirrors the RPC header xid */
+	uint32_t rdma_vers;	/* Version of this protocol */
+	uint32_t rdma_credit;	/* Buffers requested/granted */
+	uint32_t rdma_type;	/* Type of message (enum rdma_proc) */
+	uint32_t rdma_align;	/* Padding alignment */
+	uint32_t rdma_thresh;	/* Padding threshold */
+	uint32_t rdma_nochunks[3];	/* 3 empty chunk lists */
+};
+
+enum rdma_proc {
+	RDMA_MSG = 0,		/* An RPC call or reply msg */
+	RDMA_NOMSG = 1,		/* An RPC call or reply msg - separate body */
+	RDMA_MSGP = 2,		/* An RPC call or reply msg with padding */
+	RDMA_DONE = 3		/* Client signals reply completion */
+};
+
+#endif				/* _RDMA_PROTO_H */
Index: drivers/infiniband/ulp/nfsrdma/Makefile
===================================================================
--- drivers/infiniband/ulp/nfsrdma/Makefile	(revision 0)
+++ drivers/infiniband/ulp/nfsrdma/Makefile	(revision 0)
@@ -0,0 +1,5 @@
+EXTRA_CFLAGS += -DRPCRDMA_DEBUG -Idrivers/dat
+
+obj-$(CONFIG_NFSRDMA) = rdma_xprt.o
+
+rdma_xprt-objs := rdma_kdapl.o rdma_transport.o rdma_marshal.o
Index: drivers/infiniband/Kconfig
===================================================================
--- drivers/infiniband/Kconfig	(revision 2723)
+++ drivers/infiniband/Kconfig	(working copy)
@@ -23,4 +23,8 @@ source "drivers/infiniband/ulp/ipoib/Kco
 
 source "drivers/infiniband/ulp/sdp/Kconfig"
 
+source "drivers/infiniband/ulp/dat-provider/Kconfig"
+
+source "drivers/infiniband/ulp/nfsrdma/Kconfig"
+
 endmenu
Index: drivers/infiniband/Makefile
===================================================================
--- drivers/infiniband/Makefile	(revision 2723)
+++ drivers/infiniband/Makefile	(working copy)
@@ -2,3 +2,5 @@ obj-$(CONFIG_INFINIBAND)		+= core/
 obj-$(CONFIG_INFINIBAND_MTHCA)		+= hw/mthca/
 obj-$(CONFIG_INFINIBAND_IPOIB)		+= ulp/ipoib/
 obj-$(CONFIG_INFINIBAND_SDP)		+= ulp/sdp/
+obj-$(CONFIG_INFINIBAND_DAT_PROVIDER)	+= ulp/dat-provider/
+obj-$(CONFIG_INFINIBAND_DAT_PROVIDER)	+= ulp/nfsrdma/




More information about the general mailing list