[ofa-general] [PATCH RFC] rds: add iwarp support

Jon Mason jon at opengridcomputing.com
Thu Jul 3 14:34:12 PDT 2008


This patch adds support for running RDS over iWARP adapters.  It
requires the fast_reg_mr patches and Olaf's RDS patches (for flow
control enablement) to apply cleanly and build.  It also includes the
patch I sent out earlier that gives IB, TCP, and iWARP their own
connection ports.

It is a work in progress, but I would like people to take a look and
let me know what they think.  I was able to run rds-ping and rds-stress
for extended periods without problems (with the exception of the
rds-stress "-D" flag, which enables RDMA).  With fast_reg_mr support I
was able to get RDMA working for short stretches, but there are still
bugs to be worked out.

The bulk of the code is a copy of the RDS IB transport with the changes
necessary to get iWARP working, so there may be stale comments and
redundant code.  I will audit the code to remove the stale comments and
to drop code that is duplicated and unmodified (but feel free to point
out any that you see).

The logic in the RDS connection setup that determines which transport
to use requires a new function in the core IB code to translate a given
IP address to the IB/iWARP device associated with it.  The only place
this can be determined is from the pre-existing structs in cma.c.
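
As a rough illustration (the full version, with debug output, is in
net/rds/iwarp.c below), the new helper lets the transport's laddr_check
boil down to something like:

	static int rds_iwarp_laddr_check(__be32 addr)
	{
		struct ib_device *ibdev = ipaddr_to_ibdev(addr);

		/* only accept addresses that resolve to an iWARP RNIC */
		if (ibdev && ibdev->node_type == RDMA_NODE_RNIC)
			return 0;
		return -EADDRNOTAVAIL;
	}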

Because the RDS iWARP get_mr function needs to know the QP associated
with the MR, more data now needs to be passed around.  Luckily, the QP
can be determined from the rds_sock (which also contains the ip_addr).
Passing that struct avoids adding another parameter to get_mr, but it
did require a change to IB's get_mr function as well as its prototype.
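
For reference, the changed prototype in net/rds/ib.h (mirrored by the
iWARP transport) now reads:

	void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
			    struct rds_sock *rs, u32 *key_ret);

and the implementation simply recovers the address from
rs->rs_bound_addr.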

Olaf, if you would like to pull this into your personal tree, I can
provide you with delta patches until it is accepted into the OFED 1.4
tree (assuming you have no major problems with the code below).

Signed-off-by: Jon Mason <jon at opengridcomputing.com>

diff --git a/drivers/infiniband/core/cma.c b/drivers/infiniband/core/cma.c
index 0751697..0c5028a 100644
--- a/drivers/infiniband/core/cma.c
+++ b/drivers/infiniband/core/cma.c
@@ -36,6 +36,7 @@
 #include <linux/random.h>
 #include <linux/idr.h>
 #include <linux/inetdevice.h>
+#include <linux/if_arp.h>
 
 #include <net/tcp.h>
 
@@ -200,6 +201,43 @@ struct sdp_hah {
 #define CMA_VERSION 0x00
 #define SDP_MAJ_VERSION 0x2
 
+struct ib_device *ipaddr_to_ibdev(__be32 addr)
+{
+	struct rdma_dev_addr dev_addr;
+	struct cma_device *cma_dev;
+	struct net_device *dev;
+	union ib_gid gid;
+
+	dev = ip_dev_find(addr);
+	if (!dev)
+		goto out;
+
+	rdma_copy_addr(&dev_addr, dev, NULL);
+
+	switch (dev->type) {
+	case ARPHRD_INFINIBAND:
+		ib_addr_get_sgid(&dev_addr, &gid);
+		break;
+	case ARPHRD_ETHER:
+		iw_addr_get_sgid(&dev_addr, &gid);
+		break;
+	default:
+		goto out;
+	}
+
+	list_for_each_entry(cma_dev, &dev_list, list) {
+		int ret;
+		u8 port;
+
+		ret = ib_find_cached_gid(cma_dev->device, &gid, &port, NULL);
+		if (!ret)
+			return cma_dev->device;
+	}
+out:
+	return NULL;
+}
+EXPORT_SYMBOL(ipaddr_to_ibdev);
+
 static int cma_comp(struct rdma_id_private *id_priv, enum cma_state comp)
 {
 	unsigned long flags;
diff --git a/net/rds/Kconfig b/net/rds/Kconfig
index 57efde6..08edb04 100644
--- a/net/rds/Kconfig
+++ b/net/rds/Kconfig
@@ -10,6 +10,10 @@ config RDS_IB
 	tristate "  RDS over Infiniband"
 	depends RDS
 
+config RDS_IWARP
+	tristate "  RDS over iWARP"
+	depends RDS
+
 config RDS_TCP
 	tristate "  RDS over TCP"
 	depends RDS
diff --git a/net/rds/Makefile b/net/rds/Makefile
index 6f0d72f..d647eb7 100644
--- a/net/rds/Makefile
+++ b/net/rds/Makefile
@@ -16,6 +16,12 @@ rds-y +=	ib.o ib_cm.o ib_recv.o ib_ring.o ib_send.o ib_stats.o \
 		ib_sysctl.o ib_rdma.o
 endif
 
+# we don't *quite* have modular transports yet
+ifeq ($(CONFIG_RDS_IWARP), m)
+rds-y +=	iwarp.o iwarp_cm.o iwarp_recv.o iwarp_ring.o iwarp_send.o iwarp_stats.o \
+		iwarp_sysctl.o iwarp_rdma.o
+endif
+
 ifeq ($(CONFIG_RDS_DEBUG), y)
 EXTRA_CFLAGS += -DDEBUG
 endif
diff --git a/net/rds/ib.c b/net/rds/ib.c
index cd2dc7c..1b41ef1 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -64,6 +64,9 @@ void rds_ib_add_one(struct ib_device *device)
 	struct rds_ib_device *rds_ibdev;
 	struct ib_device_attr *dev_attr;
 
+	if (device->node_type == RDMA_NODE_RNIC)
+		return;
+
 	dev_attr = kmalloc(sizeof *dev_attr, GFP_KERNEL);
 	if (!dev_attr)
 		return;
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 947977c..f1d3be3 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -256,7 +256,7 @@ int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
 struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    __be32 ip_addr, u32 *key_ret);
+		    struct rds_sock *rs, u32 *key_ret);
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 20c888d..6287fb6 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -609,7 +609,7 @@ int rds_ib_conn_connect(struct rds_connection *conn)
 
 	dest.sin_family = AF_INET;
 	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
-	dest.sin_port = (__force u16)htons(RDS_PORT);
+	dest.sin_port = (__force u16)htons(RDS_IB_PORT);
 
 	ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
 				(struct sockaddr *)&dest,
@@ -794,7 +794,7 @@ int __init rds_ib_listen_init(void)
 
 	sin.sin_family = PF_INET,
 	sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
-	sin.sin_port = (__force u16)htons(RDS_PORT);
+	sin.sin_port = (__force u16)htons(RDS_IB_PORT);
 
 	/*
 	 * XXX I bet this binds the cm_id to a device.  If we want to support
@@ -814,7 +814,7 @@ int __init rds_ib_listen_init(void)
 		goto out;
 	}
 
-	rdsdebug("cm %p listening on port %u\n", cm_id, RDS_PORT);
+	rdsdebug("cm %p listening on port %u\n", cm_id, RDS_IB_PORT);
 
 	rds_ib_listen_id = cm_id;
 	cm_id = NULL;
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 8c6cac3..b427e73 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -523,10 +523,11 @@ void rds_ib_flush_mrs(void)
 }
 
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    __be32 ip_addr, u32 *key_ret)
+		    struct rds_sock *rs, u32 *key_ret)
 {
 	struct rds_ib_device *rds_ibdev;
 	struct rds_ib_mr *ibmr = NULL;
+	__be32 ip_addr = rs->rs_bound_addr;
 	int ret;
 
 	rds_ibdev = ib_get_device(ip_addr);
diff --git a/net/rds/iwarp.c b/net/rds/iwarp.c
new file mode 100644
index 0000000..8491839
--- /dev/null
+++ b/net/rds/iwarp.c
@@ -0,0 +1,247 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+#include <linux/in.h>
+#include <linux/if.h>
+#include <linux/netdevice.h>
+#include <linux/inetdevice.h>
+#include <linux/if_arp.h>
+
+#include "rds.h"
+#include "iwarp.h"
+
+unsigned int fast_mr_pool_size = RDS_FMR_POOL_SIZE;
+unsigned int fast_mr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+
+module_param(fast_mr_pool_size, int, 0444);
+MODULE_PARM_DESC(fast_mr_pool_size, " Max number of fmr per HCA");
+module_param(fast_mr_message_size, int, 0444);
+MODULE_PARM_DESC(fast_mr_message_size, " Max size of a RDMA transfer");
+
+struct list_head rds_iwarp_devices;
+
+void rds_iwarp_add_one(struct ib_device *device);
+void rds_iwarp_remove_one(struct ib_device *device);
+
+struct ib_client rds_iwarp_client = {
+	.name   = "rds_iwarp",
+	.add    = rds_iwarp_add_one,
+	.remove = rds_iwarp_remove_one
+};
+
+void rds_iwarp_add_one(struct ib_device *device)
+{
+	struct rds_iwarp_device *rds_iwarpdev;
+	struct ib_device_attr *dev_attr;
+
+	if (device->node_type != RDMA_NODE_RNIC)
+		return;
+
+	dev_attr = kmalloc(sizeof *dev_attr, GFP_KERNEL);
+	if (!dev_attr)
+		return;
+
+	if (ib_query_device(device, dev_attr)) {
+		rdsdebug("Query device failed for %s\n", device->name);
+		goto free_attr;
+	}
+
+	rds_iwarpdev = kmalloc(sizeof *rds_iwarpdev, GFP_KERNEL);
+	if (!rds_iwarpdev)
+		goto free_attr;
+
+	spin_lock_init(&rds_iwarpdev->spinlock);
+
+	rds_iwarpdev->max_sge = min(dev_attr->max_sge, RDS_IWARP_MAX_SGE);
+
+	rds_iwarpdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap) - 1);
+	rds_iwarpdev->fmr_page_size  = 1 << rds_iwarpdev->fmr_page_shift;
+	rds_iwarpdev->fmr_page_mask  = ~((u64) rds_iwarpdev->fmr_page_size - 1);
+	rds_iwarpdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
+	rds_iwarpdev->max_fmrs = dev_attr->max_fmr?
+			min_t(unsigned int, dev_attr->max_fmr, fast_mr_pool_size) :
+			fast_mr_pool_size;
+
+	rds_iwarpdev->dev = device;
+	rds_iwarpdev->pd = ib_alloc_pd(device);
+	if (IS_ERR(rds_iwarpdev->pd))
+		goto free_dev;
+
+	rds_iwarpdev->mr = ib_get_dma_mr(rds_iwarpdev->pd, IB_ACCESS_REMOTE_READ | 
+					 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE);
+	if (IS_ERR(rds_iwarpdev->mr))
+		goto err_pd;
+
+	rds_iwarpdev->mr_pool = rds_iwarp_create_mr_pool(rds_iwarpdev);
+	if (IS_ERR(rds_iwarpdev->mr_pool)) {
+		rds_iwarpdev->mr_pool = NULL;
+		goto err_mr;
+	}
+
+	INIT_LIST_HEAD(&rds_iwarpdev->ipaddr_list);
+	list_add_tail(&rds_iwarpdev->list, &rds_iwarp_devices);
+
+	ib_set_client_data(device, &rds_iwarp_client, rds_iwarpdev);
+
+	goto free_attr;
+
+err_mr:
+	ib_dereg_mr(rds_iwarpdev->mr);
+err_pd:
+	ib_dealloc_pd(rds_iwarpdev->pd);
+free_dev:
+	kfree(rds_iwarpdev);
+free_attr:
+	kfree(dev_attr);
+}
+
+void rds_iwarp_remove_one(struct ib_device *device)
+{
+	struct rds_iwarp_device *rds_iwarpdev;
+	struct rds_iwarp_ipaddr *i_ipaddr, *next;
+
+	rds_iwarpdev = ib_get_client_data(device, &rds_iwarp_client);
+	if (!rds_iwarpdev)
+		return;
+
+	list_for_each_entry_safe(i_ipaddr, next, &rds_iwarpdev->ipaddr_list, list) {
+		list_del(&i_ipaddr->list);
+		kfree(i_ipaddr);
+	}
+
+	if (rds_iwarpdev->mr_pool)
+		rds_iwarp_destroy_mr_pool(rds_iwarpdev->mr_pool);
+
+	ib_dereg_mr(rds_iwarpdev->mr);
+	ib_dealloc_pd(rds_iwarpdev->pd);
+	
+	list_del(&rds_iwarpdev->list);
+	kfree(rds_iwarpdev);
+}
+
+extern struct ib_device *ipaddr_to_ibdev(__be32 addr);
+
+static int rds_iwarp_laddr_check(__be32 addr)
+{
+	struct ib_device *ibdev;
+	int ret = -EADDRNOTAVAIL;
+
+	ibdev = ipaddr_to_ibdev(addr);
+	if (!ibdev)
+		goto out;
+
+	if (ibdev->node_type == RDMA_NODE_RNIC)
+		ret = 0;
+
+out:
+	rdsdebug("addr %u.%u.%u.%u ret %d\n", NIPQUAD(addr), ret);
+	return ret;
+}
+
+/*
+ * conns should have been freed up by the time we get here..
+ */
+static void rds_iwarp_exit(void)
+{
+	rds_iwarp_listen_stop();
+	rds_trans_unregister(&rds_iwarp_transport);
+	rds_iwarp_recv_exit();
+	rds_iwarp_sysctl_exit();
+	ib_unregister_client(&rds_iwarp_client);
+}
+
+struct rds_transport rds_iwarp_transport = {
+	.laddr_check		= rds_iwarp_laddr_check,
+	.xmit_complete		= rds_iwarp_xmit_complete,
+	.xmit			= rds_iwarp_xmit,
+	.xmit_cong_map		= NULL,
+	.xmit_rdma		= rds_iwarp_xmit_rdma,
+	.recv			= rds_iwarp_recv,
+	.conn_alloc		= rds_iwarp_conn_alloc,
+	.conn_free		= rds_iwarp_conn_free,
+	.conn_connect		= rds_iwarp_conn_connect,
+	.conn_shutdown		= rds_iwarp_conn_shutdown,
+	.inc_copy_to_user	= rds_iwarp_inc_copy_to_user,
+	.inc_purge		= rds_iwarp_inc_purge,
+	.inc_free		= rds_iwarp_inc_free,
+	.listen_stop		= rds_iwarp_listen_stop,
+	.stats_info_copy	= rds_iwarp_stats_info_copy,
+	.exit			= rds_iwarp_exit,
+	.get_mr			= rds_iwarp_get_mr,
+	.sync_mr		= rds_iwarp_sync_mr,
+	.free_mr		= rds_iwarp_free_mr,
+	.flush_mrs		= rds_iwarp_flush_mrs,
+	.t_owner		= THIS_MODULE,
+	.t_name			= "iwarp",
+};
+
+int __init rds_iwarp_init(void)
+{
+	int ret;
+
+	INIT_LIST_HEAD(&rds_iwarp_devices);
+
+	ret = ib_register_client(&rds_iwarp_client);
+	if (ret)
+		goto out;
+
+	ret = rds_iwarp_sysctl_init();
+	if (ret)
+		goto out_iwarpreg;
+
+	ret = rds_iwarp_recv_init();
+	if (ret)
+		goto out_sysctl;
+
+	ret = rds_trans_register(&rds_iwarp_transport);
+	if (ret)
+		goto out_recv;
+
+	ret = rds_iwarp_listen_init();
+	if (ret)
+		goto out_register;
+
+	goto out;
+
+out_register:
+	rds_trans_unregister(&rds_iwarp_transport);
+out_recv:
+	rds_iwarp_recv_exit();
+out_sysctl:
+	rds_iwarp_sysctl_exit();
+out_iwarpreg:
+	ib_unregister_client(&rds_iwarp_client);
+out:
+	return ret;
+}
diff --git a/net/rds/iwarp.h b/net/rds/iwarp.h
new file mode 100644
index 0000000..cc07f2d
--- /dev/null
+++ b/net/rds/iwarp.h
@@ -0,0 +1,339 @@
+#ifndef _RDS_IWARP_H
+#define _RDS_IWARP_H 
+
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "rds.h"
+
+#define RDS_IWARP_RESOLVE_TIMEOUT_MS	5000
+
+/* FIXME - use ib_query_device to determine proper value */
+#define RDS_FMR_SIZE			20 
+#define RDS_FMR_POOL_SIZE		2048
+
+#define RDS_IWARP_MAX_SGE		8
+#define RDS_IWARP_RECV_SGE 		2
+
+/* FIXME - call ib_query_device to determine sane values for this based on the HW */
+#define RDS_IWARP_DEFAULT_RECV_WR	512
+#define RDS_IWARP_DEFAULT_SEND_WR	128
+
+#define RDS_IWARP_SUPPORTED_PROTOCOLS	0x00000003	/* minor versions supported */
+
+/*
+ * iWARP posts RDS_FRAG_SIZE fragments of pages to the receive queues to
+ * try to minimize the amount of memory tied up in both the device and
+ * socket receive queues.
+ */
+/* page offset of the final full frag that fits in the page */
+#define RDS_PAGE_LAST_OFF (((PAGE_SIZE  / RDS_FRAG_SIZE) - 1) * RDS_FRAG_SIZE)
+struct rds_page_frag {
+	struct list_head	f_item;
+	struct page		*f_page;
+	unsigned long		f_offset;
+	dma_addr_t 		f_mapped;
+};
+
+struct rds_iwarp_incoming {
+	struct list_head	ii_frags;
+	struct rds_incoming	ii_inc;
+};
+
+struct rds_iwarp_connect_private {
+	/* Add new fields at the end, and don't permute existing fields. */
+	__be32			dp_saddr;
+	__be32			dp_daddr;
+	u8			dp_protocol_major;
+	u8			dp_protocol_minor;
+	__be16			dp_protocol_minor_mask; /* bitmask */
+	__be32			dp_reserved1;
+	__be64			dp_ack_seq;
+	__be32			dp_credit;		/* non-zero enables flow ctl */
+};
+
+struct rds_iwarp_send_work {
+	struct rds_message	*s_rm;
+	struct rds_rdma_op	*s_op;
+	struct ib_send_wr	s_wr;
+	struct ib_sge		s_sge[RDS_IWARP_MAX_SGE];
+	unsigned long		s_queued;
+};
+
+struct rds_iwarp_recv_work {
+	struct rds_iwarp_incoming 	*r_iwarpinc;
+	struct rds_page_frag	*r_frag;
+	struct ib_recv_wr	r_wr;
+	struct ib_sge		r_sge[2];
+};
+
+struct rds_iwarp_work_ring {
+	u32		w_nr;
+	u32		w_alloc_ptr;
+	u32		w_alloc_ctr;
+	u32		w_free_ptr;
+	atomic_t	w_free_ctr;
+};
+
+struct rds_iwarp_connection {
+	/* alphabet soup, IWARPTA style */
+	struct rdma_cm_id	*i_cm_id;
+	struct ib_pd		*i_pd;
+	struct ib_mr		*i_mr;
+	struct ib_cq		*i_send_cq;
+	struct ib_cq		*i_recv_cq;
+	
+	/* tx */
+	struct rds_iwarp_work_ring	i_send_ring;
+	struct rds_message	*i_rm;
+	struct rds_header	*i_send_hdrs;
+	u64			i_send_hdrs_dma;
+	struct rds_iwarp_send_work *i_sends;
+
+	/* rx */
+	struct mutex		i_recv_mutex;
+	struct rds_iwarp_work_ring	i_recv_ring;
+	struct rds_iwarp_incoming	*i_iwarpinc;
+	u32			i_recv_data_rem;
+	struct rds_header	*i_recv_hdrs;
+	u64			i_recv_hdrs_dma;
+	struct rds_iwarp_recv_work *i_recvs;
+	struct rds_page_frag	i_frag;
+	u64			i_ack_recv;	/* last ACK received */
+
+	/* sending acks */
+	unsigned long		i_ack_flags;
+#ifndef KERNEL_HAS_ATOMIC64
+	spinlock_t		i_ack_lock;
+	u64			i_ack_next;	/* next ACK to send */
+#else
+	atomic64_t		i_ack_next;	/* next ACK to send */
+#endif
+	struct rds_header	*i_ack;
+	struct ib_send_wr	i_ack_wr;
+	struct ib_sge		i_ack_sge;
+	u64			i_ack_dma;
+	unsigned long		i_ack_queued;
+
+	/* Flow control related information
+	 *
+	 * Our algorithm uses a pair of variables that we need to access
+	 * atomically - one for the send credits, and one for the posted
+	 * recv credits we need to transfer to the remote.
+	 * Rather than protect them using a slow spinlock, we put both into
+	 * a single atomic_t and update it using cmpxchg
+	 */
+	atomic_t		i_credits;
+ 
+	/* Protocol version specific information */
+	unsigned int		i_hdr_idx;	/* 1 (old) or 0 (3.1 or later) */
+	unsigned int		i_flowctl : 1;	/* enable/disable flow ctl */
+
+	/* Batched completions */
+	unsigned int		i_unsignaled_wrs;
+	long			i_unsignaled_bytes;
+};
+
+/* This assumes that atomic_t is at least 32 bits */
+#define IWARP_GET_SEND_CREDITS(v)	((v) & 0xffff)
+#define IWARP_GET_POST_CREDITS(v)	((v) >> 16)
+#define IWARP_SET_SEND_CREDITS(v)	((v) & 0xffff)
+#define IWARP_SET_POST_CREDITS(v)	((v) << 16)
+
+struct rds_iwarp_ipaddr {
+	struct list_head	list;
+	__be32			ipaddr;
+};
+
+struct rds_iwarp_device {
+	struct list_head	list;
+	struct list_head	ipaddr_list;
+	struct ib_device	*dev;
+	struct ib_pd		*pd;
+	struct ib_mr		*mr;
+	struct rds_iwarp_mr_pool	*mr_pool;
+	int			fmr_page_shift;
+	int			fmr_page_size;
+	u64			fmr_page_mask;
+	unsigned int		fmr_max_remaps;
+	unsigned int		max_fmrs;
+	int			max_sge;
+	spinlock_t		spinlock;
+};
+
+/* bits for i_ack_flags */
+#define IWARP_ACK_IN_FLIGHT	0
+#define IWARP_ACK_REQUESTED	1
+
+/* Magic WR_ID for ACKs */
+#define RDS_IWARP_ACK_WR_ID	(~(u64) 0)
+
+struct rds_iwarp_statistics {
+	unsigned long	s_iwarp_connect_raced;
+	unsigned long	s_iwarp_listen_closed_stale;
+	unsigned long	s_iwarp_tx_cq_call;
+	unsigned long	s_iwarp_tx_cq_event;
+	unsigned long	s_iwarp_tx_ring_full;
+	unsigned long	s_iwarp_tx_throttle;
+	unsigned long	s_iwarp_tx_sg_mapping_failure;
+	unsigned long	s_iwarp_tx_stalled;
+	unsigned long	s_iwarp_tx_credit_updates;
+	unsigned long	s_iwarp_rx_cq_call;
+	unsigned long	s_iwarp_rx_cq_event;
+	unsigned long	s_iwarp_rx_ring_empty;
+	unsigned long	s_iwarp_rx_refill_from_cq;
+	unsigned long	s_iwarp_rx_refill_from_thread;
+	unsigned long	s_iwarp_rx_alloc_limit;
+	unsigned long	s_iwarp_rx_credit_updates;
+	unsigned long	s_iwarp_ack_sent;
+	unsigned long	s_iwarp_ack_send_failure;
+	unsigned long	s_iwarp_ack_send_delayed;
+	unsigned long	s_iwarp_ack_send_piggybacked;
+	unsigned long	s_iwarp_ack_received;
+	unsigned long	s_iwarp_rdma_mr_alloc;
+	unsigned long	s_iwarp_rdma_mr_free;
+	unsigned long	s_iwarp_rdma_mr_used;
+	unsigned long	s_iwarp_rdma_mr_pool_flush;
+	unsigned long	s_iwarp_rdma_mr_pool_wait;
+};
+
+extern struct workqueue_struct *rds_iwarp_wq;
+
+/*
+ * Fake ib_dma_sync_sg_for_{cpu,device} as long as ib_verbs.h
+ * doesn't define it.
+ */
+static inline void rds_iwarp_dma_sync_sg_for_cpu(struct ib_device *dev,
+		struct scatterlist *sg, unsigned int sg_dma_len, int direction)
+{
+	unsigned int i;
+
+	for (i = 0; i < sg_dma_len; ++i) {
+		ib_dma_sync_single_for_cpu(dev,
+				ib_sg_dma_address(dev, &sg[i]),
+				ib_sg_dma_len(dev, &sg[i]),
+				direction);
+	}
+}
+#define ib_dma_sync_sg_for_cpu	rds_iwarp_dma_sync_sg_for_cpu
+
+static inline void rds_iwarp_dma_sync_sg_for_device(struct ib_device *dev,
+		struct scatterlist *sg, unsigned int sg_dma_len, int direction)
+{
+	unsigned int i;
+
+	for (i = 0; i < sg_dma_len; ++i) {
+		ib_dma_sync_single_for_device(dev,
+				ib_sg_dma_address(dev, &sg[i]),
+				ib_sg_dma_len(dev, &sg[i]),
+				direction);
+	}
+}
+#define ib_dma_sync_sg_for_device	rds_iwarp_dma_sync_sg_for_device
+
+
+/* iwarp.c */
+extern struct rds_transport rds_iwarp_transport;
+extern void rds_iwarp_add_one(struct ib_device *device);
+extern void rds_iwarp_remove_one(struct ib_device *device);
+extern struct ib_client rds_iwarp_client;
+
+extern unsigned int fast_mr_pool_size;
+extern unsigned int fast_mr_message_size;
+
+/* iwarp_cm.c */
+int rds_iwarp_conn_alloc(struct rds_connection *conn, gfp_t gfp);
+void rds_iwarp_conn_free(void *arg);
+int rds_iwarp_conn_connect(struct rds_connection *conn);
+void rds_iwarp_conn_shutdown(struct rds_connection *conn);
+void rds_iwarp_state_change(struct sock *sk);
+int __init rds_iwarp_listen_init(void);
+void rds_iwarp_listen_stop(void);
+void __rds_iwarp_conn_error(struct rds_connection *conn, const char *, ...);
+
+#define rds_iwarp_conn_error(conn, fmt...) \
+	__rds_iwarp_conn_error(conn, KERN_WARNING "RDS/IWARP: " fmt )
+
+/* iwarp_rdma.c */
+struct rds_iwarp_mr_pool *rds_iwarp_create_mr_pool(struct rds_iwarp_device *);
+void rds_iwarp_destroy_mr_pool(struct rds_iwarp_mr_pool *);
+void *rds_iwarp_get_mr(struct scatterlist *sg, unsigned long nents,
+			struct rds_sock *rs, u32 *key_ret);
+void rds_iwarp_sync_mr(void *trans_private, int dir);
+void rds_iwarp_free_mr(void *trans_private, int invalidate);
+void rds_iwarp_flush_mrs(void);
+
+/* iwarp_recv.c */
+int __init rds_iwarp_recv_init(void);
+void rds_iwarp_recv_exit(void);
+int rds_iwarp_recv(struct rds_connection *conn);
+int rds_iwarp_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
+		       gfp_t page_gfp, int prefill);
+void rds_iwarp_inc_purge(struct rds_incoming *inc);
+void rds_iwarp_inc_free(struct rds_incoming *inc);
+int rds_iwarp_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
+			     size_t size);
+void rds_iwarp_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_iwarp_recv_init_ring(struct rds_iwarp_connection *ic);
+void rds_iwarp_recv_clear_ring(struct rds_iwarp_connection *ic);
+void rds_iwarp_recv_init_ack(struct rds_iwarp_connection *ic);
+void rds_iwarp_attempt_ack(struct rds_iwarp_connection *ic);
+void rds_iwarp_ack_send_complete(struct rds_iwarp_connection *ic);
+u64 rds_iwarp_piggyb_ack(struct rds_iwarp_connection *ic);
+
+/* iwarp_ring.c */
+void rds_iwarp_ring_init(struct rds_iwarp_work_ring *ring, u32 nr);
+u32 rds_iwarp_ring_alloc(struct rds_iwarp_work_ring *ring, u32 val, u32 *pos);
+void rds_iwarp_ring_free(struct rds_iwarp_work_ring *ring, u32 val);
+void rds_iwarp_ring_unalloc(struct rds_iwarp_work_ring *ring, u32 val);
+int rds_iwarp_ring_empty(struct rds_iwarp_work_ring *ring);
+u32 rds_iwarp_ring_oldest(struct rds_iwarp_work_ring *ring);
+u32 rds_iwarp_ring_completed(struct rds_iwarp_work_ring *ring, u32 wr_id, u32 oldest);
+extern wait_queue_head_t rds_iwarp_ring_empty_wait;
+
+/* iwarp_send.c */
+void rds_iwarp_xmit_complete(struct rds_connection *conn);
+int rds_iwarp_xmit(struct rds_connection *conn, struct rds_message *rm,
+	        unsigned int hdr_off, unsigned int sg, unsigned int off);
+void rds_iwarp_send_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_iwarp_send_init_ring(struct rds_iwarp_connection *ic);
+void rds_iwarp_send_clear_ring(struct rds_iwarp_connection *ic);
+int rds_iwarp_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
+void rds_iwarp_send_add_credits(struct rds_connection *conn, unsigned int credits);
+void rds_iwarp_advertise_credits(struct rds_connection *conn, unsigned int posted);
+int rds_iwarp_send_grab_credits(struct rds_iwarp_connection *ic, u32 wanted,
+			     u32 *adv_credits);
+
+/* iwarp_stats.c */
+RDS_DECLARE_PER_CPU(struct rds_iwarp_statistics, rds_iwarp_stats);
+#define rds_iwarp_stats_inc(member) rds_stats_inc_which(rds_iwarp_stats, member)
+unsigned int rds_iwarp_stats_info_copy(struct rds_info_iterator *iter,
+				    unsigned int avail);
+
+/* iwarp_sysctl.c */
+int __init rds_iwarp_sysctl_init(void);
+void rds_iwarp_sysctl_exit(void);
+extern unsigned long rds_iwarp_sysctl_max_send_wr;
+extern unsigned long rds_iwarp_sysctl_max_recv_wr;
+extern unsigned long rds_iwarp_sysctl_max_unsig_wrs;
+extern unsigned long rds_iwarp_sysctl_max_unsig_bytes;
+extern unsigned long rds_iwarp_sysctl_max_recv_allocation;
+extern unsigned int rds_iwarp_sysctl_flow_control;
+extern ctl_table rds_iwarp_sysctl_table[];
+
+/*
+ * Helper functions for getting/setting the header and data SGEs in
+ * RDS packets (not RDMA)
+ */
+static inline struct ib_sge *
+rds_iwarp_header_sge(struct rds_iwarp_connection *ic, struct ib_sge *sge)
+{
+	return &sge[ic->i_hdr_idx];
+}
+
+static inline struct ib_sge *
+rds_iwarp_data_sge(struct rds_iwarp_connection *ic, struct ib_sge *sge)
+{
+	return &sge[1 - ic->i_hdr_idx];
+}
+
+#endif
diff --git a/net/rds/iwarp_cm.c b/net/rds/iwarp_cm.c
new file mode 100644
index 0000000..ecd08dc
--- /dev/null
+++ b/net/rds/iwarp_cm.c
@@ -0,0 +1,853 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+#include <linux/in.h>
+
+#include "rds.h"
+#include "iwarp.h"
+
+static struct rdma_cm_id *rds_iwarp_listen_id;
+
+/*
+ * Set the selected protocol version
+ */
+static void rds_iwarp_set_protocol(struct rds_connection *conn, unsigned int version)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	conn->c_version = version;
+
+	if (conn->c_version == RDS_PROTOCOL_3_0) {
+		ic->i_hdr_idx = 1;
+	} else {
+		ic->i_hdr_idx = 0;
+	}
+}
+
+/*
+ * Set up flow control
+ */
+static void rds_iwarp_set_flow_control(struct rds_connection *conn, u32 credits)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	if (rds_iwarp_sysctl_flow_control && credits != 0) {
+		/* We're doing flow control */
+		ic->i_flowctl = 1;
+		rds_iwarp_send_add_credits(conn, credits);
+	} else {
+		ic->i_flowctl = 0;
+	}
+}
+
+static int iwarp_update_ipaddr_for_device(struct rds_iwarp_device *rds_iwarpdev, __be32 ipaddr)
+{
+	struct rds_iwarp_ipaddr *i_ipaddr;
+
+	spin_lock_irq(&rds_iwarpdev->spinlock);
+	list_for_each_entry(i_ipaddr, &rds_iwarpdev->ipaddr_list, list) {
+		if (i_ipaddr->ipaddr == ipaddr) {
+			spin_unlock_irq(&rds_iwarpdev->spinlock);
+			return 0;
+		}
+	}
+	spin_unlock_irq(&rds_iwarpdev->spinlock);
+
+	i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL);
+	if (!i_ipaddr)
+		return -ENOMEM;
+
+	i_ipaddr->ipaddr = ipaddr;
+
+	spin_lock_irq(&rds_iwarpdev->spinlock);
+	list_add_tail(&i_ipaddr->list, &rds_iwarpdev->ipaddr_list);
+	spin_unlock_irq(&rds_iwarpdev->spinlock);
+
+	return 0;
+}
+
+/*
+ * Connection established.
+ * We get here for both outgoing and incoming connection.
+ */
+static void rds_iwarp_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event)
+{
+	const struct rds_iwarp_connect_private *dp = NULL;
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct rds_iwarp_device *rds_iwarpdev;
+	struct ib_qp_attr qp_attr;
+	int ret;
+
+	if (event->param.conn.private_data_len) {
+		dp = event->param.conn.private_data;
+
+		rds_iwarp_set_protocol(conn,
+				RDS_PROTOCOL(dp->dp_protocol_major,
+					dp->dp_protocol_minor));
+		rds_iwarp_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
+	}
+
+	printk(KERN_NOTICE "RDS/IWARP: connected to %u.%u.%u.%u version %u.%u%s\n",
+			NIPQUAD(conn->c_faddr),
+			RDS_PROTOCOL_MAJOR(conn->c_version),
+			RDS_PROTOCOL_MINOR(conn->c_version),
+			ic->i_flowctl? ", flow control" : "");
+
+	rds_iwarp_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
+
+	/* Tune the RNR timeout. We use a rather low timeout, but
+	 * not the absolute minimum - this should be tunable.
+	 *
+	 * We already set the RNR retry count to 7 (which is the
+	 * smallest infinite number :-) above
+	 */
+	qp_attr.qp_state = IB_QPS_RTS;
+	qp_attr.min_rnr_timer = IB_RNR_TIMER_000_32;
+	ret = ib_modify_qp(ic->i_cm_id->qp, &qp_attr,
+				IB_QP_STATE | IB_QP_MIN_RNR_TIMER);
+	if (ret) {
+		printk(KERN_NOTICE "ib_modify_qp(IB_QP_MIN_RNR_TIMER, %u): err=%d\n",
+				qp_attr.min_rnr_timer, -ret);
+	}
+
+	/* update ib_device with this local ipaddr */
+	rds_iwarpdev = ib_get_client_data(ic->i_cm_id->device, &rds_iwarp_client);
+	iwarp_update_ipaddr_for_device(rds_iwarpdev, conn->c_laddr);
+
+	/* If the peer gave us the last packet it saw, process this as if
+	 * we had received a regular ACK. */
+	if (dp && dp->dp_ack_seq)
+		rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
+
+	rds_connect_complete(conn);
+}
+
+static void rds_iwarp_cm_fill_conn_param(struct rds_connection *conn,
+			struct rdma_conn_param *conn_param,
+			struct rds_iwarp_connect_private *dp,
+			u32 protocol_version)
+{
+	memset(conn_param, 0, sizeof(struct rdma_conn_param));
+	/* XXX tune these? */
+	conn_param->responder_resources = 1;
+	conn_param->initiator_depth = 1;
+
+	if (dp) {
+		struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+		memset(dp, 0, sizeof(*dp));
+		dp->dp_saddr = conn->c_laddr;
+		dp->dp_daddr = conn->c_faddr;
+		dp->dp_protocol_major = RDS_PROTOCOL_MAJOR(protocol_version);
+		dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
+		dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IWARP_SUPPORTED_PROTOCOLS);
+		dp->dp_ack_seq = rds_iwarp_piggyb_ack(ic);
+
+		/* Advertise flow control.
+		 *
+		 * Major chicken and egg alert!
+		 * We would like to post receive buffers before we get here (eg.
+		 * in rds_iwarp_setup_qp), so that we can give the peer an accurate
+		 * credit value.
+		 * Unfortunately we can't post receive buffers until we've finished
+		 * protocol negotiation and know in which order the header and data
+		 * are arranged.
+		 *
+		 * What we do here is we give the peer a small initial credit, and
+		 * initialize the number of posted buffers to a negative value.
+		 */
+		if (ic->i_flowctl) {
+			atomic_set(&ic->i_credits, IWARP_SET_POST_CREDITS(-4));
+			dp->dp_credit = cpu_to_be32(4);
+		}
+
+		conn_param->private_data = dp;
+		conn_param->private_data_len = sizeof(*dp);
+	}
+}
+
+static void rds_iwarp_cq_event_handler(struct ib_event *event, void *data)
+{
+	rdsdebug("event %u data %p\n", event->event, data);
+}
+
+static void rds_iwarp_qp_event_handler(struct ib_event *event, void *data)
+{
+	struct rds_connection *conn = data;
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
+
+	switch (event->event) {
+		case IB_EVENT_COMM_EST:
+			rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
+			break;
+		default:
+			printk(KERN_WARNING "RDS/IWARP: unhandled QP event %u "
+			       "on connection to %u.%u.%u.%u\n", event->event,
+			       NIPQUAD(conn->c_faddr));
+			break;
+	}
+}
+
+/*
+ * This needs to be very careful to not leave IS_ERR pointers around for
+ * cleanup to trip over.
+ */
+static int rds_iwarp_setup_qp(struct rds_connection *conn)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct ib_device *dev = ic->i_cm_id->device;
+	struct ib_qp_init_attr attr;
+	struct rds_iwarp_device *rds_iwarpdev;
+	int ret;
+
+	/* rds_iwarp_add_one creates a rds_iwarp_device object per IB device,
+	 * and allocates a protection domain, memory range and FMR pool
+	 * for each.  If that fails for any reason, it will not register
+	 * the rds_iwarpdev at all.
+	 */
+	rds_iwarpdev = ib_get_client_data(dev, &rds_iwarp_client);
+	if (rds_iwarpdev == NULL) {
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "RDS/IWARP: No client_data for device %s\n",
+					dev->name);
+		return -EOPNOTSUPP;
+	}
+
+	/* Protection domain and memory range */
+	ic->i_pd = rds_iwarpdev->pd;
+	ic->i_mr = rds_iwarpdev->mr;
+
+	ic->i_send_cq = ib_create_cq(dev, rds_iwarp_send_cq_comp_handler,
+				     rds_iwarp_cq_event_handler, conn,
+				     ic->i_send_ring.w_nr + 1, 0);
+	if (IS_ERR(ic->i_send_cq)) {
+		ret = PTR_ERR(ic->i_send_cq);
+		ic->i_send_cq = NULL;
+		rdsdebug("ib_create_cq send failed: %d\n", ret);
+		goto out;
+	}
+
+	ic->i_recv_cq = ib_create_cq(dev, rds_iwarp_recv_cq_comp_handler,
+				     rds_iwarp_cq_event_handler, conn,
+				     ic->i_recv_ring.w_nr, 0);
+	if (IS_ERR(ic->i_recv_cq)) {
+		ret = PTR_ERR(ic->i_recv_cq);
+		ic->i_recv_cq = NULL;
+		rdsdebug("ib_create_cq recv failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
+		goto out;
+	}
+
+	/* XXX negotiate max send/recv with remote? */
+	memset(&attr, 0, sizeof(attr));
+	attr.event_handler = rds_iwarp_qp_event_handler;
+	attr.qp_context = conn;
+	/* + 1 to allow for the single ack message */
+	attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
+	attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
+	attr.cap.max_send_sge = rds_iwarpdev->max_sge;
+	attr.cap.max_recv_sge = RDS_IWARP_RECV_SGE;
+	attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+	attr.qp_type = IB_QPT_RC;
+	attr.send_cq = ic->i_send_cq;
+	attr.recv_cq = ic->i_recv_cq;
+
+	/* 
+	 * XXX this can fail if max_*_wr is too large?  Are we supposed
+	 * to back off until we get a value that the hardware can support?
+	 */
+	ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
+	if (ret) {
+		rdsdebug("rdma_create_qp failed: %d\n", ret);
+		goto out;
+	}
+
+	ic->i_send_hdrs = ib_dma_alloc_coherent(dev,
+					   ic->i_send_ring.w_nr *
+					   	sizeof(struct rds_header),
+					   &ic->i_send_hdrs_dma, GFP_KERNEL);
+	if (ic->i_send_hdrs == NULL) {
+		ret = -ENOMEM;
+		rdsdebug("ib_dma_alloc_coherent send failed\n");
+		goto out;
+	}
+
+	ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
+					   ic->i_recv_ring.w_nr *
+					   	sizeof(struct rds_header),
+					   &ic->i_recv_hdrs_dma, GFP_KERNEL);
+	if (ic->i_recv_hdrs == NULL) {
+		ret = -ENOMEM;
+		rdsdebug("ib_dma_alloc_coherent recv failed\n");
+		goto out;
+	}
+
+	ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
+				       &ic->i_ack_dma, GFP_KERNEL);
+	if (ic->i_ack == NULL) {
+		ret = -ENOMEM;
+		rdsdebug("ib_dma_alloc_coherent ack failed\n");
+		goto out;
+	}
+
+	ic->i_sends = kmalloc(ic->i_send_ring.w_nr *
+				sizeof(struct rds_iwarp_send_work), GFP_KERNEL);
+	if (ic->i_sends == NULL) {
+		ret = -ENOMEM;
+		rdsdebug("send allocation failed\n");
+		goto out;
+	}
+	rds_iwarp_send_init_ring(ic);
+
+	ic->i_recvs = kmalloc(ic->i_recv_ring.w_nr *
+				sizeof(struct rds_iwarp_recv_work), GFP_KERNEL);
+	if (ic->i_recvs == NULL) {
+		ret = -ENOMEM;
+		rdsdebug("recv allocation failed\n");
+		goto out;
+	}
+
+	rds_iwarp_recv_init_ring(ic);
+	rds_iwarp_recv_init_ack(ic);
+
+	rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
+		 ic->i_send_cq, ic->i_recv_cq);
+
+out:
+	return ret;
+}
+
+static u32 rds_iwarp_protocol_compatible(const struct rds_iwarp_connect_private *dp)
+{
+	u16 common;
+	u32 version = 0;
+
+	/* rdma_cm private data is odd - when there is any private data in the
+	 * request, we will be given a pretty large buffer without telling us the
+	 * original size. The only way to tell the difference is by looking at
+	 * the contents, which are initialized to zero.
+	 * If the protocol version fields aren't set, this is a connection attempt
+	 * from an older version. This could be 3.0 or 2.0 - we can't tell.
+	 * We really should have changed this for OFED 1.3 :-( */
+	if (dp->dp_protocol_major == 0)
+		return RDS_PROTOCOL_3_0;
+
+	common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IWARP_SUPPORTED_PROTOCOLS;
+	if (dp->dp_protocol_major == 3 && common) {
+		version = RDS_PROTOCOL_3_0;
+		while ((common >>= 1) != 0)
+			version++;
+	} else if (printk_ratelimit()) {
+		printk(KERN_NOTICE "RDS: Connection from %u.%u.%u.%u using "
+			"incompatible protocol version %u.%u\n",
+			NIPQUAD(dp->dp_saddr),
+			dp->dp_protocol_major,
+			dp->dp_protocol_minor);
+	}
+	return version;
+}
+
+static int rds_iwarp_cm_handle_connect(struct rdma_cm_id *cm_id,
+				    struct rdma_cm_event *event)
+{
+	const struct rds_iwarp_connect_private *dp = event->param.conn.private_data;
+	struct rds_iwarp_connect_private dp_rep;
+	struct rds_connection *conn = NULL;
+	struct rds_iwarp_connection *ic = NULL;
+	struct rdma_conn_param conn_param;
+	struct rds_iwarp_device *rds_iwarpdev;
+	u32 version;
+	int err, destroy = 1;
+
+	/* Check whether the remote protocol version matches ours. */
+	version = rds_iwarp_protocol_compatible(dp);
+	if (!version)
+		goto out;
+
+	rdsdebug("saddr %u.%u.%u.%u daddr %u.%u.%u.%u RDSv%u.%u\n",
+		 NIPQUAD(dp->dp_saddr), NIPQUAD(dp->dp_daddr),
+		 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version));
+
+	conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_iwarp_transport,
+			       GFP_KERNEL);
+	if (IS_ERR(conn)) {
+		rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
+		conn = NULL;
+		goto out;
+	}
+	ic = conn->c_transport_data;
+
+	rds_iwarp_set_protocol(conn, version);
+	rds_iwarp_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
+
+	/* If the peer gave us the last packet it saw, process this as if
+	 * we had received a regular ACK. */
+	if (dp->dp_ack_seq)
+		rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
+
+	/*
+	 * The connection request may occur while the
+	 * previous connection still exists, e.g. in case of failover.
+	 * But as connections may be initiated simultaneously
+	 * by both hosts, we have a random backoff mechanism -
+	 * see the comment above rds_queue_reconnect()
+	 */
+	if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
+		if (rds_conn_state(conn) == RDS_CONN_UP) {
+			rds_iwarp_conn_error(conn, "incoming connect while connecting\n");
+			rds_iwarp_stats_inc(s_iwarp_listen_closed_stale);
+		} else
+		if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
+			/* Wait and see - our connect may still be succeeding */
+			rds_iwarp_stats_inc(s_iwarp_connect_raced);
+		}
+		goto out;
+	}
+
+	BUG_ON(cm_id->context);
+	BUG_ON(ic->i_cm_id);
+
+	ic->i_cm_id = cm_id;
+	cm_id->context = conn;
+
+ 	/* We got halfway through setting up the ib_connection, if we
+ 	 * fail now, we have to take the long route out of this mess. */
+ 	destroy = 0;
+
+ 	err = rds_iwarp_setup_qp(conn);
+ 	if (err) {
+		rds_iwarp_conn_error(conn, "rds_iwarp_setup_qp failed (%d)\n", err);
+		goto out;
+	}
+
+	rds_iwarp_cm_fill_conn_param(conn, &conn_param, &dp_rep, version);
+
+	/* rdma_accept() calls rdma_reject() internally if it fails */
+	err = rdma_accept(cm_id, &conn_param);
+	if (err) {
+		rds_iwarp_conn_error(conn, "rdma_accept failed (%d)\n", err);
+ 		goto out;
+ 	}
+
+	/* update ib_device with this local ipaddr */
+	rds_iwarpdev = ib_get_client_data(ic->i_cm_id->device, &rds_iwarp_client);
+	iwarp_update_ipaddr_for_device(rds_iwarpdev, dp->dp_saddr);
+
+ 	return 0;
+
+out:
+	rdma_reject(cm_id, NULL, 0);
+	return destroy;
+}
+
+
+static int rds_iwarp_cm_initiate_connect(struct rdma_cm_id *cm_id)
+{
+	struct rds_connection *conn = cm_id->context;
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct rdma_conn_param conn_param;
+	struct rds_iwarp_connect_private dp;
+	int ret;
+
+	/* If the peer doesn't do protocol negotiation, we must
+	 * default to RDSv3.0 */
+	rds_iwarp_set_protocol(conn, RDS_PROTOCOL_3_0);
+	ic->i_flowctl = rds_iwarp_sysctl_flow_control;	/* advertise flow control */
+
+	ret = rds_iwarp_setup_qp(conn);
+	if (ret) {
+		rds_iwarp_conn_error(conn, "rds_iwarp_setup_qp failed (%d)\n", ret);
+		goto out;
+	}
+
+	rds_iwarp_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION);
+
+	ret = rdma_connect(cm_id, &conn_param);
+	if (ret)
+		rds_iwarp_conn_error(conn, "rdma_connect failed (%d)\n", ret);
+
+out:
+	/* Beware - returning non-zero tells the rdma_cm to destroy
+	 * the cm_id. We should certainly not do it as long as we still
+	 * "own" the cm_id. */
+	if (ret) {
+		struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+		if (ic->i_cm_id == cm_id)
+			ret = 0;
+	}
+	return ret;
+}
+
+static int rds_iwarp_cm_event_handler(struct rdma_cm_id *cm_id,
+				   struct rdma_cm_event *event)
+{
+	/* this can be null in the listening path */
+	struct rds_connection *conn = cm_id->context;
+	int ret = 0;
+
+	rdsdebug("conn %p id %p handling event %u\n", conn, cm_id,
+		 event->event);
+
+	/* Prevent shutdown from tearing down the connection
+	 * while we're executing. */
+	if (conn) {
+		mutex_lock(&conn->c_cm_lock);
+
+		/* If the connection is being shut down, bail out
+		 * right away. We return 0 so cm_id doesn't get
+		 * destroyed prematurely */
+		if (atomic_read(&conn->c_state) == RDS_CONN_DISCONNECTING) {
+			/* Reject incoming connections while we're tearing
+			 * down an existing one. */
+			if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
+				ret = 1;
+			goto out;
+		}
+	}
+
+	switch (event->event) {
+		case RDMA_CM_EVENT_CONNECT_REQUEST:
+			ret = rds_iwarp_cm_handle_connect(cm_id, event);
+			break;
+
+		case RDMA_CM_EVENT_ADDR_RESOLVED:
+			/* XXX do we need to clean up if this fails? */
+			ret = rdma_resolve_route(cm_id,
+						 RDS_IWARP_RESOLVE_TIMEOUT_MS);
+			break;
+
+		case RDMA_CM_EVENT_ROUTE_RESOLVED:
+			/* XXX worry about racing with listen acceptance */
+			ret = rds_iwarp_cm_initiate_connect(cm_id);
+			break;
+
+		case RDMA_CM_EVENT_ESTABLISHED:
+			rds_iwarp_connect_complete(conn, event);
+			break;
+
+		case RDMA_CM_EVENT_ADDR_ERROR:
+		case RDMA_CM_EVENT_ROUTE_ERROR:
+		case RDMA_CM_EVENT_CONNECT_ERROR:
+		case RDMA_CM_EVENT_UNREACHABLE:
+		case RDMA_CM_EVENT_REJECTED:
+		case RDMA_CM_EVENT_DEVICE_REMOVAL:
+			if (conn)
+				rds_conn_drop(conn);
+			break;
+
+		case RDMA_CM_EVENT_DISCONNECTED:
+			rds_conn_drop(conn);
+			break;
+
+		default:
+			/* things like device disconnect? */
+			printk(KERN_ERR "unknown event %u\n", event->event);
+			BUG();
+			break;
+	}
+
+out:
+	if (conn) {
+		struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+		/* If we return non-zero, the rdma_cm will destroy the cm_id,
+		 * so we must not still own it */
+		BUG_ON(ic->i_cm_id == cm_id && ret);
+
+		mutex_unlock(&conn->c_cm_lock);
+	}
+
+	rdsdebug("id %p event %u handling ret %d\n", cm_id, event->event, ret);
+
+	return ret;
+}
+
+int rds_iwarp_conn_connect(struct rds_connection *conn)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct sockaddr_in src, dest;
+	int ret;
+
+	/* XXX I wonder what effect the port space has */
+	ic->i_cm_id = rdma_create_id(rds_iwarp_cm_event_handler, conn,
+				     RDMA_PS_TCP);
+	if (IS_ERR(ic->i_cm_id)) {
+		ret = PTR_ERR(ic->i_cm_id);
+		ic->i_cm_id = NULL;
+		rdsdebug("rdma_create_id() failed: %d\n", ret);
+		goto out;
+	} 
+
+	rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);
+
+	src.sin_family = AF_INET;
+	src.sin_addr.s_addr = (__force u32)conn->c_laddr;
+	src.sin_port = (__force u16)htons(0);
+
+	dest.sin_family = AF_INET;
+	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
+	dest.sin_port = (__force u16)htons(RDS_IWARP_PORT);
+
+	ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
+				(struct sockaddr *)&dest,
+				RDS_IWARP_RESOLVE_TIMEOUT_MS);
+	if (ret) {
+		rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
+			 ret);
+		rdma_destroy_id(ic->i_cm_id);
+		ic->i_cm_id = NULL;
+	}
+
+out:
+	return ret;
+}
+
+/*
+ * This is so careful about only cleaning up resources that were built up
+ * so that it can be called at any point during startup.  In fact it
+ * can be called multiple times for a given connection.
+ */
+void rds_iwarp_conn_shutdown(struct rds_connection *conn)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	rdsdebug("cm %p pd %p cq %p %p qp %p\n", ic->i_cm_id,
+		 ic->i_pd, ic->i_send_cq, ic->i_recv_cq, 
+		 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
+
+	if (ic->i_cm_id) {
+		int err;
+		struct ib_device *dev = ic->i_cm_id->device;
+
+		rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
+		err = rdma_disconnect(ic->i_cm_id);
+		if (err) {
+			/* Actually this may happen quite frequently, when
+			 * an outgoing connect raced with an incoming connect.
+			 */
+			printk(KERN_DEBUG "rds_iwarp_conn_shutdown: failed to disconnect,"
+				   " cm: %p err %d\n", ic->i_cm_id, err);
+		}
+
+		/* NOTE: For iWARP, it is unnecessary to move the QP to error state */
+
+		wait_event(rds_iwarp_ring_empty_wait,
+			rds_iwarp_ring_empty(&ic->i_send_ring) &&
+			rds_iwarp_ring_empty(&ic->i_recv_ring));
+
+		if (ic->i_send_hdrs)
+			ib_dma_free_coherent(dev,
+					   ic->i_send_ring.w_nr *
+					   	sizeof(struct rds_header),
+					   ic->i_send_hdrs,
+					   ic->i_send_hdrs_dma);
+
+		if (ic->i_recv_hdrs)
+			ib_dma_free_coherent(dev,
+					   ic->i_recv_ring.w_nr *
+					   	sizeof(struct rds_header),
+					   ic->i_recv_hdrs,
+					   ic->i_recv_hdrs_dma);
+
+		if (ic->i_ack)
+			ib_dma_free_coherent(dev, sizeof(struct rds_header),
+					     ic->i_ack, ic->i_ack_dma);
+
+		if (ic->i_sends)
+			rds_iwarp_send_clear_ring(ic);
+		if (ic->i_recvs)
+			rds_iwarp_recv_clear_ring(ic);
+
+		if (ic->i_cm_id->qp)
+			rdma_destroy_qp(ic->i_cm_id);
+		if (ic->i_send_cq)
+			ib_destroy_cq(ic->i_send_cq);
+		if (ic->i_recv_cq)
+			ib_destroy_cq(ic->i_recv_cq);
+		rdma_destroy_id(ic->i_cm_id);
+		ic->i_cm_id = NULL;
+		ic->i_pd = NULL;
+		ic->i_mr = NULL;
+		ic->i_send_cq = NULL;
+		ic->i_recv_cq = NULL;
+		ic->i_send_hdrs = NULL;
+		ic->i_recv_hdrs = NULL;
+		ic->i_ack = NULL;
+	}
+
+	/* Clear pending transmit */
+	if (ic->i_rm) {
+		rds_message_put(ic->i_rm);
+		ic->i_rm = NULL;
+	}
+
+	/* Clear the ACK state */
+	clear_bit(IWARP_ACK_IN_FLIGHT, &ic->i_ack_flags);
+#ifdef KERNEL_HAS_ATOMIC64
+	atomic64_set(&ic->i_ack_next, 0);
+#else
+	ic->i_ack_next = 0;
+#endif
+	ic->i_ack_recv = 0;
+
+	/* Clear flow control state */
+	ic->i_flowctl = 0;
+	atomic_set(&ic->i_credits, 0);
+
+	if (ic->i_iwarpinc) {
+		rds_inc_put(&ic->i_iwarpinc->ii_inc);
+		ic->i_iwarpinc = NULL;
+	}
+
+	kfree(ic->i_sends);
+	ic->i_sends = NULL;
+	kfree(ic->i_recvs);
+	ic->i_recvs = NULL;
+}
+
+int rds_iwarp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
+{
+	struct rds_iwarp_connection *ic;
+
+	/* XXX too lazy? */
+	ic = kzalloc(sizeof(struct rds_iwarp_connection), GFP_KERNEL);
+	if (ic == NULL)
+		return -ENOMEM;
+
+	mutex_init(&ic->i_recv_mutex);
+#ifndef KERNEL_HAS_ATOMIC64
+	spin_lock_init(&ic->i_ack_lock);
+#endif
+
+	/* 
+	 * rds_iwarp_conn_shutdown() waits for these to be emptied so they
+	 * must be initialized before it can be called.
+	 */
+	rds_iwarp_ring_init(&ic->i_send_ring, rds_iwarp_sysctl_max_send_wr);
+	rds_iwarp_ring_init(&ic->i_recv_ring, rds_iwarp_sysctl_max_recv_wr);
+
+	conn->c_transport_data = ic;
+
+	rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);
+	return 0;
+}
+
+void rds_iwarp_conn_free(void *arg)
+{
+	struct rds_iwarp_connection *ic = arg;
+	rdsdebug("ic %p\n", ic);
+	kfree(ic);
+}
+
+int __init rds_iwarp_listen_init(void)
+{
+	struct sockaddr_in sin;
+	struct rdma_cm_id *cm_id;
+	int ret;
+
+	cm_id = rdma_create_id(rds_iwarp_cm_event_handler, NULL, RDMA_PS_TCP);
+	if (IS_ERR(cm_id)) {
+		ret = PTR_ERR(cm_id);
+		printk(KERN_ERR "RDS/IWARP: failed to setup listener, "
+		       "rdma_create_id() returned %d\n", ret);
+		goto out;
+	}
+
+	sin.sin_family = PF_INET,
+	sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
+	sin.sin_port = (__force u16)htons(RDS_IWARP_PORT);
+
+	/*
+	 * XXX I bet this binds the cm_id to a device.  If we want to support
+	 * fail-over we'll have to take this into consideration.
+	 */
+	ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
+	if (ret) {
+		printk(KERN_ERR "RDS/IWARP: failed to setup listener, "
+		       "rdma_bind_addr() returned %d\n", ret);
+		goto out;
+	}
+
+	ret = rdma_listen(cm_id, 128);
+	if (ret) {
+		printk(KERN_ERR "RDS/IWARP: failed to setup listener, "
+		       "rdma_listen() returned %d\n", ret);
+		goto out;
+	}
+
+	rdsdebug("cm %p listening on port %u\n", cm_id, RDS_IWARP_PORT);
+
+	rds_iwarp_listen_id = cm_id;
+	cm_id = NULL;
+out:
+	if (cm_id)
+		rdma_destroy_id(cm_id);
+	return ret;
+}
+
+void rds_iwarp_listen_stop(void)
+{
+	if (rds_iwarp_listen_id) {
+		rdsdebug("cm %p\n", rds_iwarp_listen_id);
+		rdma_destroy_id(rds_iwarp_listen_id);
+		rds_iwarp_listen_id = NULL;
+	}
+}
+
+/*
+ * An error occurred on the connection
+ */
+void
+__rds_iwarp_conn_error(struct rds_connection *conn, const char *fmt, ...)
+{
+	va_list ap;
+
+	rds_conn_drop(conn);
+
+	va_start(ap, fmt);
+	vprintk(fmt, ap);
+	va_end(ap);
+}
diff --git a/net/rds/iwarp_rdma.c b/net/rds/iwarp_rdma.c
new file mode 100644
index 0000000..93b7d2b
--- /dev/null
+++ b/net/rds/iwarp_rdma.c
@@ -0,0 +1,572 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *	 Redistribution and use in source and binary forms, with or
+ *	 without modification, are permitted provided that the following
+ *	 conditions are met:
+ *
+ *	  - Redistributions of source code must retain the above
+ *		copyright notice, this list of conditions and the following
+ *		disclaimer.
+ *
+ *	  - Redistributions in binary form must reproduce the above
+ *		copyright notice, this list of conditions and the following
+ *		disclaimer in the documentation and/or other materials
+ *		provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+
+#include "rds.h"
+#include "rdma.h"
+#include "iwarp.h"
+
+
+extern struct list_head rds_iwarp_devices;
+
+/*
+ * This is stored as mr->r_trans_private.
+ */
+struct rds_iwarp_mr {
+	struct rds_iwarp_device	*device;
+	struct rds_iwarp_mr_pool	*pool;
+	struct ib_fast_reg_page_list	*page_list;
+	struct ib_mr 		*fastreg_mr;
+	struct list_head	list;
+	unsigned int		remap_count;
+	struct scatterlist *	sg;
+	unsigned int		sg_len;
+	u64 *			dma;
+	int			sg_dma_len;
+	u8			rkey_cnt;
+	struct ib_send_wr	invalidate_wr;
+	struct ib_send_wr	fastreg_wr;
+};
+
+/*
+ * Our own little FMR pool
+ */
+struct rds_iwarp_mr_pool {
+	struct mutex		flush_lock;		/* serialize fmr invalidate */
+	struct work_struct	flush_worker;		/* flush worker */
+
+	spinlock_t		list_lock;		/* protect variables below */
+	atomic_t		item_count;		/* total # of MRs */
+	atomic_t		dirty_count;		/* # of dirty MRs */
+	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
+	struct list_head	free_list;		/* unused MRs */
+	struct list_head	clean_list;		/* unused & unmapped MRs */
+	atomic_t		free_pinned;		/* memory pinned by free MRs */
+	unsigned long		max_items;
+	unsigned long		max_items_soft;
+	unsigned long		max_free_pinned;
+	struct ib_fmr_attr	fmr_attr;
+};
+
+static int rds_iwarp_flush_mr_pool(struct rds_iwarp_mr_pool *pool, int free_all);
+static void rds_iwarp_teardown_mr(struct rds_iwarp_mr *ibmr);
+static void rds_iwarp_mr_pool_flush_worker(struct work_struct *work);
+
+struct rds_iwarp_device *iwarp_get_device(__be32 ipaddr)
+{
+	struct rds_iwarp_device *rds_iwarpdev;
+	struct rds_iwarp_ipaddr *i_ipaddr;
+
+	list_for_each_entry(rds_iwarpdev, &rds_iwarp_devices, list) {
+		spin_lock_irq(&rds_iwarpdev->spinlock);
+		list_for_each_entry(i_ipaddr, &rds_iwarpdev->ipaddr_list, list) {
+			if (i_ipaddr->ipaddr == ipaddr) {
+				spin_unlock_irq(&rds_iwarpdev->spinlock);
+				return rds_iwarpdev;
+			}
+		}
+		spin_unlock_irq(&rds_iwarpdev->spinlock);
+	}
+
+	return NULL;
+}
+
+struct rds_iwarp_mr_pool *rds_iwarp_create_mr_pool(struct rds_iwarp_device *rds_iwarpdev)
+{
+	struct rds_iwarp_mr_pool *pool;
+
+	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+	if (!pool)
+		return ERR_PTR(-ENOMEM);
+
+	INIT_LIST_HEAD(&pool->free_list);
+	INIT_LIST_HEAD(&pool->drop_list);
+	INIT_LIST_HEAD(&pool->clean_list);
+	mutex_init(&pool->flush_lock);
+	spin_lock_init(&pool->list_lock);
+	INIT_WORK(&pool->flush_worker, rds_iwarp_mr_pool_flush_worker);
+
+	/* We never allow more than max_items MRs to be allocated.
+	 * When we exceed max_items_soft, we start freeing
+	 * items more aggressively.
+	 * Make sure that max_items > max_items_soft > max_items / 2
+	 */
+	pool->max_items_soft = rds_iwarpdev->max_fmrs * 3 / 4;
+	pool->max_items = rds_iwarpdev->max_fmrs;
+
+	return pool;
+}
+
+void rds_iwarp_destroy_mr_pool(struct rds_iwarp_mr_pool *pool)
+{
+	flush_workqueue(rds_wq);
+	rds_iwarp_flush_mr_pool(pool, 1);
+	BUG_ON(atomic_read(&pool->item_count));
+	BUG_ON(atomic_read(&pool->free_pinned));
+	kfree(pool);
+}
+
+static inline struct rds_iwarp_mr *rds_iwarp_reuse_fmr(struct rds_iwarp_mr_pool *pool)
+{
+	struct rds_iwarp_mr *ibmr = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&pool->list_lock, flags);
+	if (!list_empty(&pool->clean_list)) {
+		ibmr = list_entry(pool->clean_list.next, struct rds_iwarp_mr, list);
+		list_del_init(&ibmr->list);
+	}
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+
+	return ibmr;
+}
+
+static struct rds_iwarp_mr *rds_iwarp_alloc_fmr(struct rds_iwarp_device *rds_iwarpdev)
+{
+	struct rds_iwarp_mr_pool *pool = rds_iwarpdev->mr_pool;
+	struct rds_iwarp_mr *ibmr = NULL;
+	int err = 0, iter = 0;
+
+	while (1) {
+		if ((ibmr = rds_iwarp_reuse_fmr(pool)) != NULL)
+			return ibmr;
+
+		/* No clean MRs - now we have the choice of either
+		 * allocating a fresh MR up to the limit imposed by the
+		 * driver, or flush any dirty unused MRs.
+		 * We try to avoid stalling in the send path if possible,
+		 * so we allocate as long as we're allowed to.
+		 *
+		 * We're fussy with enforcing the FMR limit, though. If the driver
+		 * tells us we can't use more than N fmrs, we shouldn't start
+		 * arguing with it */
+		if (atomic_inc_return(&pool->item_count) <= pool->max_items)
+			break;
+
+		atomic_dec(&pool->item_count);
+
+		if (++iter > 2)
+			return ERR_PTR(-EAGAIN);
+
+		/* We do have some empty MRs. Flush them out. */
+		rds_iwarp_stats_inc(s_iwarp_rdma_mr_pool_wait);
+		rds_iwarp_flush_mr_pool(pool, 0);
+	}
+
+	ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
+	if (!ibmr) {
+		err = -ENOMEM;
+		goto out_no_cigar;
+	}
+
+	ibmr->page_list = ib_alloc_fast_reg_page_list(rds_iwarpdev->pd->device, fast_mr_message_size);
+	if (IS_ERR(ibmr->page_list)) {
+		err = PTR_ERR(ibmr->page_list);
+		ibmr->page_list = NULL;
+		printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
+		goto out_no_cigar;
+	}
+
+	ibmr->fastreg_mr = ib_alloc_fast_reg_mr(rds_iwarpdev->pd, ibmr->page_list->max_page_list_len);
+	if (IS_ERR(ibmr->fastreg_mr)) {
+		err = PTR_ERR(ibmr->fastreg_mr);
+		ibmr->fastreg_mr = NULL;
+		printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+		goto out_no_cigar;
+	}
+
+	ibmr->rkey_cnt = 0;
+
+	rds_iwarp_stats_inc(s_iwarp_rdma_mr_alloc);
+	return ibmr;
+
+out_no_cigar:
+	if (ibmr) {
+		if (ibmr->page_list)
+			ib_free_fast_reg_page_list(ibmr->page_list);
+		if (ibmr->fastreg_mr)
+			ib_dereg_mr(ibmr->fastreg_mr);
+		kfree(ibmr);
+	}
+	atomic_dec(&pool->item_count);
+	return ERR_PTR(err);
+}
+
+static int rds_iwarp_map_fmr(struct rds_iwarp_device *rds_iwarpdev, struct rds_iwarp_mr *ibmr,
+		   struct scatterlist *sg, unsigned int nents, struct ib_qp *qp)
+{
+	struct ib_device *dev = rds_iwarpdev->dev;
+	struct scatterlist *scat = sg;
+	u32 len;
+	int page_cnt, sg_dma_len;
+	int i, j;
+	int ret;
+	struct ib_send_wr *bad_wr;
+	struct ib_mr *mr;
+
+	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
+				 DMA_BIDIRECTIONAL);
+	if (unlikely(!sg_dma_len)) {
+		printk(KERN_WARNING "RDS/IWARP: dma_map_sg failed!\n");
+		return -EBUSY;
+	}
+
+	len = 0;
+	page_cnt = 0;
+
+	for (i = 0; i < sg_dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+	
+		if (dma_addr & ~rds_iwarpdev->fmr_page_mask) {
+			if (i > 0)
+				return -EINVAL;
+			else
+				++page_cnt;
+		}
+		if ((dma_addr + dma_len) & ~rds_iwarpdev->fmr_page_mask) {
+			if (i < sg_dma_len - 1)
+				return -EINVAL;
+			else
+				++page_cnt;
+		}
+
+		len += dma_len;
+	}
+
+	page_cnt += len >> rds_iwarpdev->fmr_page_shift;
+	if (page_cnt > fast_mr_message_size)
+		return -EINVAL;
+
+	/*
+	 * Post two chained WRs for the fast_reg MR: the first invalidates
+	 * the old rkey, and the second defines the new fast_reg_mr request.
+	 * Each individual page in the sg list is added to the fast reg page
+	 * list and placed inside the fast_reg_mr WR.  The key used is a
+	 * rolling 8bit counter, which should guarantee uniqueness across
+	 * consecutive registrations.
+	 */
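+	/*
+	 * Sketch of the intended key update, assuming the usual
+	 * ib_update_fast_reg_key() behaviour of replacing only the low
+	 * 8 "key" bits of the R_Key:
+	 *
+	 *	new_rkey = (old_rkey & 0xffffff00) | (rkey_cnt & 0xff);
+	 *
+	 * so back-to-back registrations of the same MR hand out distinct
+	 * rkeys until the 8bit counter wraps.
+	 */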
+	mr = ibmr->fastreg_mr;
+	memset(&ibmr->invalidate_wr, 0, sizeof(ibmr->invalidate_wr));
+	ibmr->invalidate_wr.ex.invalidate_rkey = mr->rkey;
+	ibmr->invalidate_wr.opcode = IB_WR_LOCAL_INV;
+
+	ib_update_fast_reg_key(mr, ibmr->rkey_cnt++);
+	memset(&ibmr->fastreg_wr, 0, sizeof(ibmr->fastreg_wr));
+	ibmr->fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+	ibmr->fastreg_wr.wr.fast_reg.length = len;
+	ibmr->fastreg_wr.wr.fast_reg.rkey = mr->rkey;
+	ibmr->fastreg_wr.wr.fast_reg.page_list = ibmr->page_list;
+	ibmr->fastreg_wr.wr.fast_reg.page_list_len = sg_dma_len; 
+	ibmr->fastreg_wr.wr.fast_reg.page_shift = rds_iwarpdev->fmr_page_shift;
+
+	ibmr->invalidate_wr.next = &ibmr->fastreg_wr;
+
+	ibmr->fastreg_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+						    IB_ACCESS_REMOTE_READ |
+						    IB_ACCESS_REMOTE_WRITE;
+	ibmr->fastreg_wr.wr.fast_reg.iova_start = 0;
+
+	rdsdebug("%d old rkey = 0x%x....new rkey = 0x%x\n",
+		 ibmr->rkey_cnt,
+		 ibmr->invalidate_wr.ex.invalidate_rkey,
+		 ibmr->fastreg_wr.wr.fast_reg.rkey);
+	rdsdebug("page shift = %d length = %d flags 0x%x\n",
+		 ibmr->fastreg_wr.wr.fast_reg.page_shift,
+		 ibmr->fastreg_wr.wr.fast_reg.length,
+		 ibmr->fastreg_wr.wr.fast_reg.access_flags);
+	rdsdebug("page list %#lx page list len %d\n",
+		 ibmr->fastreg_wr.wr.fast_reg.page_list,
+		 ibmr->fastreg_wr.wr.fast_reg.page_list_len);
+	rdsdebug("iova start %lx\n", ibmr->fastreg_wr.wr.fast_reg.iova_start);
+
+	page_cnt = 0;
+	for (i = 0; i < sg_dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+	
+		for (j = 0; j < dma_len; j += rds_iwarpdev->fmr_page_size) {
+			ibmr->page_list->page_list[page_cnt++] =
+				 (dma_addr & rds_iwarpdev->fmr_page_mask) + j;
+			rdsdebug("page list entry %lx\n",
+				 ibmr->page_list->page_list[page_cnt-1]);
+		}
+	}
+
+	if (qp) {
+		ret = ib_post_send(qp, &ibmr->fastreg_wr, &bad_wr);
+		if (ret) {
+			printk(KERN_ERR "post send error %d\n", ret);
+			return -EINVAL;
+		}
+	}
+
+	ibmr->sg = scat;
+	ibmr->sg_len = nents;
+	ibmr->sg_dma_len = sg_dma_len;
+	ibmr->remap_count++;
+
+	rds_iwarp_stats_inc(s_iwarp_rdma_mr_used);
+	ret = 0;
+
+	return ret;
+}
+
+void rds_iwarp_sync_mr(void *trans_private, int direction)
+{
+	struct rds_iwarp_mr *ibmr = trans_private;
+	struct rds_iwarp_device *rds_iwarpdev = ibmr->device;
+
+	switch (direction) {
+	case DMA_FROM_DEVICE:
+		ib_dma_sync_sg_for_cpu(rds_iwarpdev->dev, ibmr->sg,
+			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		break;
+	case DMA_TO_DEVICE:
+		ib_dma_sync_sg_for_device(rds_iwarpdev->dev, ibmr->sg,
+			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		break;
+	}
+}
+
+static void __rds_iwarp_teardown_mr(struct rds_iwarp_mr *ibmr)
+{
+	struct rds_iwarp_device *rds_iwarpdev = ibmr->device;
+
+	if (ibmr->sg_dma_len) {
+		ib_dma_unmap_sg(rds_iwarpdev->dev,
+				ibmr->sg, ibmr->sg_len,
+				DMA_BIDIRECTIONAL);
+		ibmr->sg_dma_len = 0;
+	}
+
+	/* Release the s/g list */
+	if (ibmr->sg_len) {
+		unsigned int i;
+
+		for (i = 0; i < ibmr->sg_len; ++i) {
+			struct page *page = sg_page(&ibmr->sg[i]);
+
+			/* FIXME we need a way to tell a r/w MR
+			 * from a r/o MR */
+			set_page_dirty(page);
+			put_page(page);
+		}
+		kfree(ibmr->sg);
+
+		ibmr->sg = NULL;
+		ibmr->sg_len = 0;
+	}
+}
+
+void rds_iwarp_teardown_mr(struct rds_iwarp_mr *ibmr)
+{
+	unsigned int pinned = ibmr->sg_len;
+
+	__rds_iwarp_teardown_mr(ibmr);
+	if (pinned) {
+		struct rds_iwarp_device *rds_iwarpdev = ibmr->device;
+		struct rds_iwarp_mr_pool *pool = rds_iwarpdev->mr_pool;
+
+		atomic_sub(pinned, &pool->free_pinned);
+	}
+}
+
+static inline unsigned int rds_iwarp_flush_goal(struct rds_iwarp_mr_pool *pool, int free_all)
+{
+	unsigned int item_count;
+
+	item_count = atomic_read(&pool->item_count);
+	if (free_all)
+		return item_count;
+#if 0
+	if (item_count > pool->max_items_soft)
+		return item_count - pool->max_items / 2;
+#endif
+	return 0;
+}
+
+/*
+ * Flush our pool of MRs.
+ * At a minimum, all currently unused MRs are unmapped.
+ * If the number of MRs allocated exceeds the limit, we also try
+ * to free as many MRs as needed to get back to this limit.
+ */
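+/*
+ * List lifecycle, as implemented below: MRs queued on free_list and
+ * drop_list are spliced onto a private unmap_list, torn down, and then
+ * either freed outright (when over the flush goal or past their remap
+ * limit) or moved to clean_list for reuse by rds_iwarp_reuse_fmr().
+ */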
+int rds_iwarp_flush_mr_pool(struct rds_iwarp_mr_pool *pool, int free_all)
+{
+	struct rds_iwarp_mr *ibmr, *next;
+	LIST_HEAD(unmap_list);
+	LIST_HEAD(fmr_list);
+	unsigned long unpinned = 0;
+	unsigned long flags;
+	unsigned int nfreed = 0, ncleaned = 0, free_goal;
+	int ret = 0;
+
+	rds_iwarp_stats_inc(s_iwarp_rdma_mr_pool_flush);
+
+	mutex_lock(&pool->flush_lock);
+
+	spin_lock_irqsave(&pool->list_lock, flags);
+	/* Get the list of all MRs to be dropped. Ordering matters -
+	 * we want to put drop_list ahead of free_list. */
+	list_splice_init(&pool->free_list, &unmap_list);
+	list_splice_init(&pool->drop_list, &unmap_list);
+	if (free_all)
+		list_splice_init(&pool->clean_list, &unmap_list);
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+
+	free_goal = rds_iwarp_flush_goal(pool, free_all);
+
+	if (list_empty(&unmap_list))
+		goto out;
+
+	/* Now we can destroy the DMA mapping and unpin any pages */
+	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
+		unpinned += ibmr->sg_len;
+		__rds_iwarp_teardown_mr(ibmr);
+		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
+			rds_iwarp_stats_inc(s_iwarp_rdma_mr_free);
+			list_del(&ibmr->list);
+			ib_dereg_mr(ibmr->fastreg_mr);
+			kfree(ibmr);
+			nfreed++;
+		}
+		ncleaned++;
+	}
+
+	spin_lock_irqsave(&pool->list_lock, flags);
+	list_splice(&unmap_list, &pool->clean_list);
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+
+	atomic_sub(unpinned, &pool->free_pinned);
+	atomic_sub(ncleaned, &pool->dirty_count);
+	atomic_sub(nfreed, &pool->item_count);
+
+out:
+	mutex_unlock(&pool->flush_lock);
+	return ret;
+}
+
+void rds_iwarp_mr_pool_flush_worker(struct work_struct *work)
+{
+	struct rds_iwarp_mr_pool *pool = container_of(work, struct rds_iwarp_mr_pool, flush_worker);
+
+	rds_iwarp_flush_mr_pool(pool, 0);
+}
+
+void rds_iwarp_free_mr(void *trans_private, int invalidate)
+{
+	struct rds_iwarp_mr *ibmr = trans_private;
+	struct rds_iwarp_device *rds_iwarpdev = ibmr->device;
+	struct rds_iwarp_mr_pool *pool = rds_iwarpdev->mr_pool;
+	unsigned long flags;
+
+	rdsdebug("RDS/IWARP: free_mr nents %u\n", ibmr->sg_len);
+
+	/* Return it to the pool's free list */
+	spin_lock_irqsave(&pool->list_lock, flags);
+	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
+		list_add(&ibmr->list, &pool->drop_list);
+	} else {
+		list_add(&ibmr->list, &pool->free_list);
+	}
+	atomic_add(ibmr->sg_len, &pool->free_pinned);
+	atomic_inc(&pool->dirty_count);
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+
+	/* If we've pinned too many pages, request a flush */
+	if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned
+	 || atomic_read(&pool->dirty_count) >= pool->max_items / 10)
+		queue_work(rds_wq, &pool->flush_worker);
+
+	if (invalidate) {
+		if (likely(!in_interrupt())) {
+			rds_iwarp_flush_mr_pool(pool, 0);
+		} else {
+			/* We get here if the user created a MR marked
+			 * as use_once and invalidate at the same time. */
+			queue_work(rds_wq, &pool->flush_worker);
+		}
+	}
+}
+
+void rds_iwarp_flush_mrs(void)
+{
+	struct rds_iwarp_device *rds_iwarpdev;
+
+	list_for_each_entry(rds_iwarpdev, &rds_iwarp_devices, list) {
+		struct rds_iwarp_mr_pool *pool = rds_iwarpdev->mr_pool;
+
+		if (pool)
+			rds_iwarp_flush_mr_pool(pool, 0);
+	}
+}
+
+void *rds_iwarp_get_mr(struct scatterlist *sg, unsigned long nents,
+			struct rds_sock *rs, u32 *key_ret)
+{
+	struct rds_iwarp_device *rds_iwarpdev;
+	struct rds_iwarp_mr *ibmr = NULL;
+	__be32 ip_addr = rs->rs_bound_addr;
+	int ret;
+
+	rds_iwarpdev = iwarp_get_device(ip_addr);
+	if (!rds_iwarpdev) {
+		ret = -ENODEV;
+		goto out;
+	}
+
+	if (!rds_iwarpdev->mr_pool) {
+		ret = -ENODEV;
+		goto out;
+	}
+
+	ibmr = rds_iwarp_alloc_fmr(rds_iwarpdev);
+	if (IS_ERR(ibmr))
+		return ibmr;
+
+	ret = rds_iwarp_map_fmr(rds_iwarpdev, ibmr, sg, nents,
+				((struct rds_iwarp_connection *)
+				 rs->rs_conn->c_transport_data)->i_cm_id->qp);
+	if (ret == 0)
+		*key_ret = ibmr->fastreg_mr->rkey;
+	else
+		printk(KERN_WARNING "RDS/IWARP: map_fmr failed (errno=%d)\n", ret);
+
+	ibmr->device = rds_iwarpdev;
+
+out:
+	if (ret) {
+		if (ibmr)
+			rds_iwarp_free_mr(ibmr, 0);
+		ibmr = ERR_PTR(ret);
+	}
+	return ibmr;
+}
diff --git a/net/rds/iwarp_rds.h b/net/rds/iwarp_rds.h
new file mode 100644
index 0000000..fe11c92
--- /dev/null
+++ b/net/rds/iwarp_rds.h
@@ -0,0 +1,240 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+
+#ifndef IB_RDS_H
+#define IB_RDS_H
+
+#include <linux/types.h>
+
+/* These sparse annotated types shouldn't be in any user
+ * visible header file. We should clean this up rather
+ * than kludging around them. */
+#ifndef __KERNEL__
+#define __be16	u_int16_t
+#define __be32	u_int32_t
+#define __be64	u_int64_t
+#endif
+
+#define RDS_IB_ABI_VERSION		0x301
+
+/*
+ * setsockopt/getsockopt for SOL_RDS
+ */
+#define RDS_CANCEL_SENT_TO      	1
+#define RDS_GET_MR			2
+#define RDS_FREE_MR			3
+/* deprecated: RDS_BARRIER 4 */
+#define RDS_RECVERR			5
+#define RDS_CONG_MONITOR		6
+
+/*
+ * Control message types for SOL_RDS.
+ *
+ * RDS_CMSG_RDMA_ARGS (sendmsg)
+ *	Request a RDMA transfer to/from the specified
+ *	memory ranges.
+ *	The cmsg_data is a struct rds_rdma_args.
+ * RDS_CMSG_RDMA_DEST (recvmsg, sendmsg)
+ *	Kernel informs application about intended
+ *	source/destination of a RDMA transfer
+ * RDS_CMSG_RDMA_MAP (sendmsg)
+ *	Application asks kernel to map the given
+ *	memory range into a IB MR, and send the
+ *	R_Key along in an RDS extension header.
+ *	The cmsg_data is a struct rds_get_mr_args,
+ *	the same as for the GET_MR setsockopt.
+ * RDS_CMSG_RDMA_STATUS (recvmsg)
+ *	Returns the status of a completed RDMA operation.
+ */
+#define RDS_CMSG_RDMA_ARGS		1
+#define RDS_CMSG_RDMA_DEST		2
+#define RDS_CMSG_RDMA_MAP		3
+#define RDS_CMSG_RDMA_STATUS		4
+#define RDS_CMSG_CONG_UPDATE		5
+
+#define RDS_INFO_COUNTERS		10000
+#define RDS_INFO_CONNECTIONS		10001
+/* 10002 aka RDS_INFO_FLOWS is deprecated */
+#define RDS_INFO_SEND_MESSAGES		10003
+#define RDS_INFO_RETRANS_MESSAGES       10004
+#define RDS_INFO_RECV_MESSAGES          10005
+#define RDS_INFO_SOCKETS                10006
+#define RDS_INFO_TCP_SOCKETS            10007
+
+struct rds_info_counter {
+	u_int8_t	name[32];
+	u_int64_t	value;
+} __attribute__((packed));
+
+#define RDS_INFO_CONNECTION_FLAG_SENDING	0x01
+#define RDS_INFO_CONNECTION_FLAG_CONNECTING	0x02
+#define RDS_INFO_CONNECTION_FLAG_CONNECTED	0x04
+
+struct rds_info_connection {
+	u_int64_t	next_tx_seq;
+	u_int64_t	next_rx_seq;
+	__be32		laddr;
+	__be32		faddr;
+	u_int8_t	transport[15];		/* null term ascii */
+	u_int8_t	flags;
+} __attribute__((packed));
+
+struct rds_info_flow {
+	__be32		laddr;
+	__be32		faddr;
+	u_int32_t	bytes;
+	__be16		lport;
+	__be16		fport;
+} __attribute__((packed));
+
+#define RDS_INFO_MESSAGE_FLAG_ACK               0x01
+#define RDS_INFO_MESSAGE_FLAG_FAST_ACK          0x02
+
+struct rds_info_message {
+	u_int64_t	seq;
+	u_int32_t	len;
+	__be32		laddr;
+	__be32		faddr;
+	__be16		lport;
+	__be16		fport;
+	u_int8_t	flags;
+} __attribute__((packed));
+
+struct rds_info_socket {
+	u_int32_t	sndbuf;
+	__be32		bound_addr;
+	__be32		connected_addr;
+	__be16		bound_port;
+	__be16		connected_port;
+	u_int32_t	rcvbuf;
+} __attribute__((packed));
+
+struct rds_info_tcp_socket {
+	__be32		local_addr;
+	__be16		local_port;
+	__be32		peer_addr;
+	__be16		peer_port;
+	u_int64_t	hdr_rem;
+	u_int64_t	data_rem;
+	u_int32_t	last_sent_nxt;
+	u_int32_t	last_expected_una;
+	u_int32_t	last_seen_una;
+} __attribute__((packed));
+
+/*
+ * Congestion monitoring.
+ * Congestion control in RDS happens at the host connection
+ * level by exchanging a bitmap marking congested ports.
+ * By default, a process sleeping in poll() is always woken
+ * up when the congestion map is updated.
+ * With explicit monitoring, an application can have more
+ * fine-grained control.
+ * The application installs a 64bit mask value in the socket,
+ * where each bit corresponds to a group of ports.
+ * When a congestion update arrives, RDS checks the set of
+ * ports that are now uncongested against the bit mask
+ * installed in the socket, and if they overlap, we queue a
+ * cong_notification on the socket.
+ *
+ * To install the congestion monitor bitmask, use RDS_CONG_MONITOR
+ * with the 64bit mask.
+ * Congestion updates are received via RDS_CMSG_CONG_UPDATE
+ * control messages.
+ *
+ * The correspondence between bits and ports is
+ *	1 << (portnum % 64)
+ */
+#define RDS_CONG_MONITOR_SIZE	64
+#define RDS_CONG_MONITOR_BIT(port)  (((unsigned int) port) % RDS_CONG_MONITOR_SIZE)
+#define RDS_CONG_MONITOR_MASK(port) (1ULL << RDS_CONG_MONITOR_BIT(port))
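+/*
+ * Example (sketch only, not part of the definitions above): an application
+ * interested in a single port would typically do
+ *
+ *	uint64_t mask = RDS_CONG_MONITOR_MASK(port);
+ *	setsockopt(fd, SOL_RDS, RDS_CONG_MONITOR, &mask, sizeof(mask));
+ *
+ * and then watch recvmsg() for RDS_CMSG_CONG_UPDATE control messages,
+ * which arrive when a port in that bit's group becomes uncongested.
+ */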
+
+/*
+ * RDMA related types
+ */
+
+/*
+ * This encapsulates a remote memory location.
+ * In the current implementation, it contains the R_Key
+ * of the remote memory region, and the offset into it
+ * (so that the application does not have to worry about
+ * alignment).
+ */
+typedef u_int64_t	rds_rdma_cookie_t;
+
+struct rds_iovec {
+	u_int64_t	addr;
+	u_int64_t	bytes;
+};
+
+struct rds_get_mr_args {
+	struct rds_iovec vec;
+	u_int64_t	cookie_addr;
+	uint64_t	flags;
+};
+
+struct rds_free_mr_args {
+	rds_rdma_cookie_t cookie;
+	u_int64_t	flags;
+};
+
+struct rds_rdma_args {
+	rds_rdma_cookie_t cookie;
+	struct rds_iovec remote_vec;
+	u_int64_t	local_vec_addr;
+	u_int64_t	nr_local;
+	u_int64_t	flags;
+	u_int64_t	user_token;
+};
+
+struct rds_rdma_notify {
+	u_int64_t	user_token;
+	int32_t		status;
+};
+
+#define RDS_RDMA_SUCCESS	0
+#define RDS_RDMA_REMOTE_ERROR	1
+#define RDS_RDMA_CANCELED	2
+#define RDS_RDMA_DROPPED	3
+#define RDS_RDMA_OTHER_ERROR	4
+
+/*
+ * Common set of flags for all RDMA related structs
+ */
+#define RDS_RDMA_READWRITE	0x0001
+#define RDS_RDMA_FENCE		0x0002	/* use FENCE for immediate send */
+#define RDS_RDMA_INVALIDATE	0x0004	/* invalidate R_Key after freeing MR */
+#define RDS_RDMA_USE_ONCE	0x0008	/* free MR after use */
+#define RDS_RDMA_DONTWAIT	0x0010	/* Don't wait in SET_BARRIER */
+#define RDS_RDMA_NOTIFY_ME	0x0020	/* Notify when operation completes */
+
+#endif /* IB_RDS_H */
diff --git a/net/rds/iwarp_recv.c b/net/rds/iwarp_recv.c
new file mode 100644
index 0000000..778298d
--- /dev/null
+++ b/net/rds/iwarp_recv.c
@@ -0,0 +1,968 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+#include <linux/pci.h>
+#include <linux/dma-mapping.h>
+#include <rdma/rdma_cm.h>
+
+#include "rds.h"
+#include "iwarp.h"
+
+static struct kmem_cache *rds_iwarp_incoming_slab;
+static struct kmem_cache *rds_iwarp_frag_slab;
+static atomic_t	rds_iwarp_allocation = ATOMIC_INIT(0);
+
+static void rds_iwarp_frag_drop_page(struct rds_page_frag *frag)
+{
+	rdsdebug("frag %p page %p\n", frag, frag->f_page);
+	__free_page(frag->f_page);
+	frag->f_page = NULL;
+}
+
+static void rds_iwarp_frag_free(struct rds_page_frag *frag)
+{
+	rdsdebug("frag %p page %p\n", frag, frag->f_page);
+	BUG_ON(frag->f_page != NULL);
+	kmem_cache_free(rds_iwarp_frag_slab, frag);
+}
+
+/*
+ * We map a page at a time.  Its fragments are posted in order.  This
+ * is called in fragment order as the fragments get receive completion events.
+ * Only the last frag in the page performs the unmapping.
+ *
+ * It's OK for ring cleanup to call this in whatever order it likes because
+ * DMA is not in flight and so we can unmap while other ring entries still
+ * hold page references in their frags.
+ */
+static void rds_iwarp_recv_unmap_page(struct rds_iwarp_connection *ic,
+				   struct rds_iwarp_recv_work *recv)
+{
+	struct rds_page_frag *frag = recv->r_frag;
+
+	rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
+	if (frag->f_mapped)
+		ib_dma_unmap_page(ic->i_cm_id->device,
+			       frag->f_mapped,
+			       RDS_FRAG_SIZE, DMA_FROM_DEVICE);
+	frag->f_mapped = 0;
+}
+
+void rds_iwarp_recv_init_ring(struct rds_iwarp_connection *ic)
+{
+	struct rds_iwarp_recv_work *recv;
+	u32 i;
+
+	for(i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
+		struct ib_sge *sge;
+
+		recv->r_iwarpinc = NULL;
+		recv->r_frag = NULL;
+
+		recv->r_wr.next = NULL;
+		recv->r_wr.wr_id = i;
+		recv->r_wr.sg_list = recv->r_sge;
+		recv->r_wr.num_sge = RDS_IWARP_RECV_SGE;
+
+		sge = rds_iwarp_data_sge(ic, recv->r_sge);
+		sge->addr = 0;
+		sge->length = RDS_FRAG_SIZE;
+		sge->lkey = ic->i_mr->lkey;
+
+		sge = rds_iwarp_header_sge(ic, recv->r_sge);
+		sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
+		sge->length = sizeof(struct rds_header);
+		sge->lkey = ic->i_mr->lkey;
+	}
+}
+
+static void rds_iwarp_recv_clear_one(struct rds_iwarp_connection *ic,
+			          struct rds_iwarp_recv_work *recv)
+{
+	if (recv->r_iwarpinc) {
+		rds_inc_put(&recv->r_iwarpinc->ii_inc);
+		recv->r_iwarpinc = NULL;
+	}
+	if (recv->r_frag) {
+		rds_iwarp_recv_unmap_page(ic, recv);
+		if (recv->r_frag->f_page)
+			rds_iwarp_frag_drop_page(recv->r_frag);
+		rds_iwarp_frag_free(recv->r_frag);
+		recv->r_frag = NULL;
+	}
+}
+
+void rds_iwarp_recv_clear_ring(struct rds_iwarp_connection *ic)
+{
+	u32 i;
+
+	for(i = 0; i < ic->i_recv_ring.w_nr; i++)
+		rds_iwarp_recv_clear_one(ic, &ic->i_recvs[i]);
+
+	if (ic->i_frag.f_page)
+		rds_iwarp_frag_drop_page(&ic->i_frag);
+}
+
+static int rds_iwarp_recv_refill_one(struct rds_connection *conn, 
+				  struct rds_iwarp_recv_work *recv,
+				  gfp_t kptr_gfp, gfp_t page_gfp)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	dma_addr_t dma_addr;
+	struct ib_sge *sge;
+	int ret = -ENOMEM;
+
+	if (recv->r_iwarpinc == NULL) {
+		if (atomic_read(&rds_iwarp_allocation) >= rds_iwarp_sysctl_max_recv_allocation) {
+			rds_iwarp_stats_inc(s_iwarp_rx_alloc_limit);
+			goto out;
+		}
+		recv->r_iwarpinc = kmem_cache_alloc(rds_iwarp_incoming_slab,
+						 kptr_gfp);
+		if (recv->r_iwarpinc == NULL)
+			goto out;
+		atomic_inc(&rds_iwarp_allocation);
+		INIT_LIST_HEAD(&recv->r_iwarpinc->ii_frags);
+		rds_inc_init(&recv->r_iwarpinc->ii_inc, conn, conn->c_faddr);
+	}
+
+	if (recv->r_frag == NULL) {
+		recv->r_frag = kmem_cache_alloc(rds_iwarp_frag_slab, kptr_gfp);
+		if (recv->r_frag == NULL)
+			goto out;
+		INIT_LIST_HEAD(&recv->r_frag->f_item);
+		recv->r_frag->f_page = NULL;
+	}
+
+	if (ic->i_frag.f_page == NULL) {
+		ic->i_frag.f_page = alloc_page(page_gfp);
+		if (ic->i_frag.f_page == NULL)
+			goto out;
+		ic->i_frag.f_offset = 0;
+	}
+
+	dma_addr = ib_dma_map_page(ic->i_cm_id->device,
+				  ic->i_frag.f_page,
+				  ic->i_frag.f_offset,
+				  RDS_FRAG_SIZE,
+				  DMA_FROM_DEVICE);
+	if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
+		goto out;
+
+	/*
+	 * Once we get the RDS_PAGE_LAST_OFF frag then rds_iwarp_frag_unmap()
+	 * must be called on this recv.  This happens as completions hit
+	 * in order or on connection shutdown.
+	 */
+	recv->r_frag->f_page = ic->i_frag.f_page;
+	recv->r_frag->f_offset = ic->i_frag.f_offset;
+	recv->r_frag->f_mapped = dma_addr;
+
+	sge = rds_iwarp_data_sge(ic, recv->r_sge);
+	sge->addr = dma_addr;
+	sge->length = RDS_FRAG_SIZE;
+
+	sge = rds_iwarp_header_sge(ic, recv->r_sge);
+	sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
+	sge->length = sizeof(struct rds_header);
+
+	get_page(recv->r_frag->f_page);
+
+	if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
+		ic->i_frag.f_offset += RDS_FRAG_SIZE;
+	} else {
+		put_page(ic->i_frag.f_page);
+		ic->i_frag.f_page = NULL;
+		ic->i_frag.f_offset = 0;
+	}
+
+	ret = 0;
+out:
+	return ret;
+}
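+/*
+ * Illustrative only, assuming the usual 4K RDS_FRAG_SIZE: on a 4K-page
+ * system each page allocated above backs exactly one fragment, while on
+ * larger pages the same page is carved up RDS_FRAG_SIZE bytes at a time
+ * until f_offset passes RDS_PAGE_LAST_OFF, at which point the connection
+ * drops its page reference and allocates a fresh page on the next refill.
+ */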
+
+/*
+ * This tries to allocate and post unused work requests after making sure that
+ * they have all the allocations they need to queue received fragments into
+ * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
+ * pairs don't go unmatched.
+ *
+ * -1 is returned if posting fails due to temporary resource exhaustion.
+ */
+int rds_iwarp_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
+		       gfp_t page_gfp, int prefill)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct rds_iwarp_recv_work *recv;
+	struct ib_recv_wr *failed_wr;
+	unsigned int posted = 0;
+	int ret = 0;
+	u32 pos;
+
+	while ((prefill || rds_conn_up(conn))
+			&& rds_iwarp_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
+		if (pos >= ic->i_recv_ring.w_nr) {
+			printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
+					pos);
+			ret = -EINVAL;
+			break;
+		}
+
+		recv = &ic->i_recvs[pos];
+		ret = rds_iwarp_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
+		if (ret) {
+			ret = -1;
+			break;
+		}
+
+		/* XXX when can this fail? */
+		ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
+		rdsdebug("recv %p ibinc %p page %p addr %lu ret %d\n", recv,
+			 recv->r_iwarpinc, recv->r_frag->f_page,
+			 (long) recv->r_frag->f_mapped, ret);
+		if (ret) {
+			rds_iwarp_conn_error(conn, "recv post on "
+			       "%u.%u.%u.%u returned %d, disconnecting and "
+			       "reconnecting\n", NIPQUAD(conn->c_faddr),
+			       ret);
+			ret = -1;
+			break;
+		}
+
+		posted++;
+	}
+
+	/* We're doing flow control - update the window. */
+	if (ic->i_flowctl && posted)
+		rds_iwarp_advertise_credits(conn, posted);
+
+	if (ret)
+		rds_iwarp_ring_unalloc(&ic->i_recv_ring, 1);
+	return ret;
+}
+
+void rds_iwarp_inc_purge(struct rds_incoming *inc)
+{
+	struct rds_iwarp_incoming *ibinc;
+	struct rds_page_frag *frag;
+	struct rds_page_frag *pos;
+		
+	ibinc = container_of(inc, struct rds_iwarp_incoming, ii_inc);
+	rdsdebug("purging ibinc %p inc %p\n", ibinc, inc);
+
+	list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
+		list_del_init(&frag->f_item);
+		rds_iwarp_frag_drop_page(frag);
+		rds_iwarp_frag_free(frag);
+	}
+}
+
+void rds_iwarp_inc_free(struct rds_incoming *inc)
+{
+	struct rds_iwarp_incoming *ibinc;
+		
+	ibinc = container_of(inc, struct rds_iwarp_incoming, ii_inc);
+
+	rds_iwarp_inc_purge(inc);
+	rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
+	BUG_ON(!list_empty(&ibinc->ii_frags));
+	kmem_cache_free(rds_iwarp_incoming_slab, ibinc);
+	atomic_dec(&rds_iwarp_allocation);
+	BUG_ON(atomic_read(&rds_iwarp_allocation) < 0);
+}
+
+int rds_iwarp_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
+			    size_t size)
+{
+	struct rds_iwarp_incoming *ibinc;
+ 	struct rds_page_frag *frag;
+	struct iovec *iov = first_iov;
+	unsigned long to_copy;
+ 	unsigned long frag_off = 0;
+	unsigned long iov_off = 0;
+	int copied = 0;
+	int ret;
+	u32 len;
+
+	ibinc = container_of(inc, struct rds_iwarp_incoming, ii_inc);
+ 	frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
+	len = be32_to_cpu(inc->i_hdr.h_len);
+
+	while (copied < size && copied < len) {
+ 		if (frag_off == RDS_FRAG_SIZE) {
+ 			frag = list_entry(frag->f_item.next,
+ 					  struct rds_page_frag, f_item);
+ 			frag_off = 0;
+		}
+		while (iov_off == iov->iov_len) {
+			iov_off = 0;
+			iov++;
+		}
+
+ 		to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
+		to_copy = min_t(size_t, to_copy, size - copied);
+		to_copy = min_t(unsigned long, to_copy, len - copied);
+
+		rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
+			 "[%p, %lu] + %lu\n", 
+			 to_copy, iov->iov_base, iov->iov_len, iov_off,
+			 frag->f_page, frag->f_offset, frag_off);
+
+		/* XXX needs + offset for multiple recvs per page */ 
+		ret = rds_page_copy_to_user(frag->f_page,
+					    frag->f_offset + frag_off,
+					    iov->iov_base + iov_off,
+					    to_copy);
+		if (ret) {
+			copied = ret;
+			break;
+		}
+
+		iov_off += to_copy;
+ 		frag_off += to_copy;
+		copied += to_copy;
+	}
+
+	return copied;
+}
+
+/* ic starts out kzalloc()ed */
+void rds_iwarp_recv_init_ack(struct rds_iwarp_connection *ic)
+{
+	struct ib_send_wr *wr = &ic->i_ack_wr;
+	struct ib_sge *sge = &ic->i_ack_sge;
+
+	sge->addr = ic->i_ack_dma;
+	sge->length = sizeof(struct rds_header);
+	sge->lkey = ic->i_mr->lkey;
+
+	wr->sg_list = sge;
+	wr->num_sge = 1;
+	wr->opcode = IB_WR_SEND;
+	wr->wr_id = RDS_IWARP_ACK_WR_ID;
+	wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+}
+
+/*
+ * You'd think that with reliable IB connections you wouldn't need to ack
+ * messages that have been received.  The problem is that IB hardware generates
+ * an ack message before it has DMAed the message into memory.  This creates a
+ * potential message loss if the HCA is disabled for any reason between when it
+ * sends the ack and before the message is DMAed and processed.  This is only a
+ * potential issue if another HCA is available for fail-over.
+ *
+ * When the remote host receives our ack they'll free the sent message from
+ * their send queue.  To decrease the latency of this we always send an ack
+ * immediately after we've received messages.
+ *
+ * For simplicity, we only have one ack in flight at a time.  This puts
+ * pressure on senders to have deep enough send queues to absorb the latency of
+ * a single ack frame being in flight.  This might not be good enough.
+ *
+ * This is implemented by having a long-lived send_wr and sge which point to a
+ * statically allocated ack frame.  This ack wr does not fall under the ring
+ * accounting that the tx and rx wrs do.  The QP attribute specifically makes
+ * room for it beyond the ring size.  Send completion notices its special
+ * wr_id and avoids working with the ring in that case.
+ */
+#ifndef KERNEL_HAS_ATOMIC64
+static void rds_iwarp_set_ack(struct rds_iwarp_connection *ic, u64 seq,
+				int ack_required)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&ic->i_ack_lock, flags);
+	if (ack_required)
+		set_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+	if (seq > ic->i_ack_next)
+		ic->i_ack_next = seq;
+	spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+}
+
+static u64 rds_iwarp_get_ack(struct rds_iwarp_connection *ic)
+{
+	unsigned long flags;
+	u64 seq;
+
+	clear_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+
+	spin_lock_irqsave(&ic->i_ack_lock, flags);
+	seq = ic->i_ack_next;
+	spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+
+	return seq;
+}
+#else
+static void rds_iwarp_set_ack(struct rds_iwarp_connection *ic, u64 seq,
+				int ack_required)
+{
+	atomic64_set(&ic->i_ack_next, seq);
+	if (ack_required) {
+		smp_mb__before_clear_bit();
+		set_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+	}
+}
+
+static u64 rds_iwarp_get_ack(struct rds_iwarp_connection *ic)
+{
+	clear_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+	smp_mb__after_clear_bit();
+
+	return atomic64_read(&ic->i_ack_next);
+}
+#endif
+
+
+static void rds_iwarp_send_ack(struct rds_iwarp_connection *ic, unsigned int adv_credits)
+{
+	struct rds_header *hdr = ic->i_ack;
+	struct ib_send_wr *failed_wr;
+	u64 seq;
+	int ret;
+
+	seq = rds_iwarp_get_ack(ic);
+
+	rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
+	rds_message_populate_header(hdr, 0, 0, 0);
+	hdr->h_ack = cpu_to_be64(seq);
+	hdr->h_credit = adv_credits;
+	rds_message_make_checksum(hdr);
+	ic->i_ack_queued = jiffies;
+
+	ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
+	if (unlikely(ret)) {
+		/* Failed to send. Release the WR, and
+		 * force another ACK.
+		 */
+		clear_bit(IWARP_ACK_IN_FLIGHT, &ic->i_ack_flags);
+		set_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+
+ 		rds_iwarp_stats_inc(s_iwarp_ack_send_failure);
+		/* Need to finesse this later. */
+		BUG();
+	} else
+		rds_iwarp_stats_inc(s_iwarp_ack_sent);
+}
+
+/*
+ * There are 3 ways of getting acknowledgements to the peer:
+ *  1.	We call rds_iwarp_attempt_ack from the recv completion handler
+ *	to send an ACK-only frame.
+ *	However, there can be only one such frame in the send queue
+ *	at any time, so we may have to postpone it.
+ *  2.	When another (data) packet is transmitted while there's
+ *	an ACK in the queue, we piggyback the ACK sequence number
+ *	on the data packet.
+ *  3.	If the ACK WR is done sending, we get called from the
+ *	send queue completion handler, and check whether there's
+ *	another ACK pending (postponed because the WR was on the
+ *	queue). If so, we transmit it.
+ *
+ * We maintain 2 variables:
+ *  -	i_ack_flags, which keeps track of whether the ACK WR
+ *	is currently in the send queue or not (IWARP_ACK_IN_FLIGHT)
+ *  -	i_ack_next, which is the last sequence number we received
+ *
+ * Potentially, send queue and receive queue handlers can run concurrently.
+ * It would be nice to not have to use a spinlock to synchronize things,
+ * but the one problem that rules this out is that 64bit updates are
+ * not atomic on all platforms. Things would be a lot simpler if
+ * we had atomic64 or maybe cmpxchg64 everywhere.
+ *
+ * Reconnecting complicates this picture just slightly. When we
+ * reconnect, we may be seeing duplicate packets. The peer
+ * is retransmitting them, because it hasn't seen an ACK for
+ * them. It is important that we ACK these.
+ *
+ * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
+ * this flag set *MUST* be acknowledged immediately.
+ */
+
+/*
+ * When we get here, we're called from the recv queue handler.
+ * Check whether we ought to transmit an ACK.
+ */
+void rds_iwarp_attempt_ack(struct rds_iwarp_connection *ic)
+{
+	unsigned int adv_credits;
+
+	if (!test_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags))
+		return;
+
+	if (test_and_set_bit(IWARP_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
+		rds_iwarp_stats_inc(s_iwarp_ack_send_delayed);
+		return;
+	}
+
+	/* Can we get a send credit? */
+	if (!rds_iwarp_send_grab_credits(ic, 1, &adv_credits)) {
+		rds_iwarp_stats_inc(s_iwarp_tx_throttle);
+		clear_bit(IWARP_ACK_IN_FLIGHT, &ic->i_ack_flags);
+		return;
+	}
+
+	clear_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+	rds_iwarp_send_ack(ic, adv_credits);
+}
+
+/*
+ * We get here from the send completion handler, when the
+ * adapter tells us the ACK frame was sent.
+ */
+void rds_iwarp_ack_send_complete(struct rds_iwarp_connection *ic)
+{
+	clear_bit(IWARP_ACK_IN_FLIGHT, &ic->i_ack_flags);
+	rds_iwarp_attempt_ack(ic);
+}
+
+/*
+ * This is called by the regular xmit code when it wants to piggyback
+ * an ACK on an outgoing frame.
+ */
+u64 rds_iwarp_piggyb_ack(struct rds_iwarp_connection *ic)
+{
+	if (test_and_clear_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags))
+		rds_iwarp_stats_inc(s_iwarp_ack_send_piggybacked);
+	return rds_iwarp_get_ack(ic);
+}
+
+/*
+ * Work is posted as a RDS_FRAG_SIZE payload and then a header.  This is
+ * done so that we can send fragments without headers and keep the fragments
+ * large and aligned.  The sender doesn't pad their fragments so the header
+ * will spill into the posted regions just after the fragment.
+ *
+ * XXX If we were to flip r_page into userspace or the page cache then we'd
+ * have to zero the header and possibly the rest of the page.
+ */
+static int rds_iwarp_copy_header(struct rds_connection *conn,
+			       struct rds_header *hdr,
+			       struct rds_iwarp_recv_work *recv, u32 start)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	void *dst = hdr;
+	void *addr;
+	u32 len = 0;
+
+	/* get the start of the header from the tail of the fragment */
+	if (start < RDS_FRAG_SIZE) {
+		len = min_t(u32, RDS_FRAG_SIZE - start,
+				  sizeof(struct rds_header));
+		addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
+		memcpy(dst,
+		       addr + recv->r_frag->f_offset + start, 
+		       len);
+		kunmap_atomic(addr, KM_SOFTIRQ0);
+		dst += len;
+	}
+
+	/* and the rest that might have spilled into the posted header space */
+	if (len < sizeof(struct rds_header)) {
+		memcpy(dst,
+		       &ic->i_recv_hdrs[recv - ic->i_recvs],
+		       sizeof(struct rds_header) - len);
+	}
+
+	return 1;
+}
+
+/*
+ * It's kind of lame that we're copying from the posted receive pages into
+ * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
+ * them.  But receiving new congestion bitmaps should be a *rare* event, so
+ * hopefully we won't need to invest that complexity in making it more
+ * efficient.  By copying we can share a simpler core with TCP which has to
+ * copy.
+ */
+static void rds_iwarp_cong_recv(struct rds_connection *conn,
+			      struct rds_iwarp_incoming *ibinc)
+{
+	struct rds_cong_map *map;
+	unsigned int map_off;
+	unsigned int map_page;
+ 	struct rds_page_frag *frag;
+ 	unsigned long frag_off;
+	unsigned long to_copy;
+	unsigned long copied;
+	uint64_t uncongested = 0;
+	void *addr;
+
+	/* catch completely corrupt packets */
+	if (be32_to_cpu(ibinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
+		return;
+
+	map = conn->c_fcong;
+	map_page = 0;
+	map_off = 0;
+
+ 	frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
+	frag_off = 0;
+
+	copied = 0;
+
+	while (copied < RDS_CONG_MAP_BYTES) {
+		uint64_t *src, *dst;
+		unsigned int k;
+
+ 		to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
+		BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
+
+		addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0);
+
+		src = addr + frag_off;
+		dst = (void *)map->m_page_addrs[map_page] + map_off;
+		for (k = 0; k < to_copy; k += 8) {
+			/* Record ports that became uncongested, ie
+			 * bits that changed from 0 to 1. */
+			uncongested |= ~(*src) & *dst;
+			*dst++ = *src++;
+		}
+		kunmap_atomic(addr, KM_SOFTIRQ0);
+
+		copied += to_copy;
+
+		map_off += to_copy;
+		if (map_off == PAGE_SIZE) {
+			map_off = 0;
+			map_page++;
+		}
+
+		frag_off += to_copy;
+ 		if (frag_off == RDS_FRAG_SIZE) {
+ 			frag = list_entry(frag->f_item.next,
+ 					  struct rds_page_frag, f_item);
+ 			frag_off = 0;
+		}
+	}
+
+	/* the congestion map is in little endian order */
+	uncongested = le64_to_cpu(uncongested);
+
+	rds_cong_map_updated(map, uncongested);
+}
+
+/*
+ * Rings are posted with all the allocations they'll need to queue the
+ * incoming message to the receiving socket so this can't fail.
+ * All fragments start with a header, so we can make sure we're not receiving
+ * garbage, and we can tell a small 8 byte fragment from an ACK frame.
+ */
+struct rds_iwarp_ack_state {
+	u64	ack_next;
+	u64	ack_recv;
+	int	ack_required : 1,
+		ack_next_valid : 1,
+		ack_recv_valid : 1;
+};
+static void rds_iwarp_process_recv(struct rds_connection *conn,
+				struct rds_iwarp_recv_work *recv, u32 byte_len,
+				struct rds_iwarp_ack_state *state)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct rds_iwarp_incoming *ibinc = ic->i_iwarpinc;
+	struct rds_header hdr_buf, *ihdr, *hdr;
+
+	/* XXX shut down the connection if port 0,0 are seen? */
+
+	rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
+		 byte_len);
+
+	if (byte_len < sizeof(struct rds_header)) {
+		rds_iwarp_conn_error(conn, "incoming message "
+		       "from %u.%u.%u.%u didn't inclue a "
+		       "header, disconnecting and "
+		       "reconnecting\n",
+		       NIPQUAD(conn->c_faddr));
+		return;
+	}
+	byte_len -= sizeof(struct rds_header);
+
+	if (ic->i_hdr_idx == 0)
+		ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
+	else if (rds_iwarp_copy_header(conn, &hdr_buf, recv, byte_len))
+		ihdr = &hdr_buf;
+	else
+		return;
+
+	/* Validate the checksum. */
+	if (!rds_message_verify_checksum(ihdr)) {
+		rds_iwarp_conn_error(conn, "incoming message "
+		       "from %u.%u.%u.%u has corrupted header - "
+		       "forcing a reconnect\n",
+		       NIPQUAD(conn->c_faddr));
+		rds_stats_inc(s_recv_drop_bad_checksum);
+		return;
+	}
+
+	/* Process the ACK sequence which comes with every packet */
+	state->ack_recv = be64_to_cpu(ihdr->h_ack);
+	state->ack_recv_valid = 1;
+
+	/* Process the credits update if there was one */
+	if (ihdr->h_credit)
+		rds_iwarp_send_add_credits(conn, ihdr->h_credit);
+
+	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
+		/* This is an ACK-only packet. It gets special treatment
+		 * here because, historically, ACKs were rather special
+		 * beasts.
+		 */
+		rds_iwarp_stats_inc(s_iwarp_ack_received);
+
+		/*
+		 * Usually the frags make their way on to incs and are then freed as
+		 * the inc is freed.  We don't go that route, so we have to drop the
+		 * page ref ourselves.  We can't just leave the page on the recv
+		 * because that confuses the dma mapping of pages and each recv's use
+		 * of a partial page.  We can leave the frag, though, it will be
+		 * reused.
+		 *
+		 * FIXME: Fold this into the code path below.
+		 */
+		rds_iwarp_frag_drop_page(recv->r_frag);
+		return;
+	}
+
+	/*
+	 * If we don't already have an inc on the connection then this
+	 * fragment has a header and starts a message.  Copy its header
+	 * into the inc and save the inc so we can hang upcoming fragments
+	 * off its list.
+	 */ 
+	if (ibinc == NULL) {
+		ibinc = recv->r_iwarpinc;
+		recv->r_iwarpinc = NULL;
+		ic->i_iwarpinc = ibinc;
+
+		hdr = &ibinc->ii_inc.i_hdr;
+		memcpy(hdr, ihdr, sizeof(*hdr));
+		ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
+
+		rdsdebug("ic %p ibinc %p rem %u flag 0x%x\n", ic, ibinc,
+			 ic->i_recv_data_rem, hdr->h_flags);
+	} else {
+		hdr = &ibinc->ii_inc.i_hdr;
+		/* We can't just use memcmp here; fragments of a
+		 * single message may carry different ACKs */
+		if (hdr->h_sequence != ihdr->h_sequence
+		 || hdr->h_len != ihdr->h_len
+		 || hdr->h_sport != ihdr->h_sport
+		 || hdr->h_dport != ihdr->h_dport) {
+			rds_iwarp_conn_error(conn,
+				"fragment header mismatch; forcing reconnect\n");
+			return;
+		}
+	}
+
+	list_add_tail(&recv->r_frag->f_item, &ibinc->ii_frags);
+	recv->r_frag = NULL;
+
+	if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
+		ic->i_recv_data_rem -= RDS_FRAG_SIZE;
+	else {
+		ic->i_recv_data_rem = 0;
+		ic->i_iwarpinc = NULL;
+
+		if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
+			rds_iwarp_cong_recv(conn, ibinc);
+		else
+			rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
+					  &ibinc->ii_inc, GFP_ATOMIC,
+					  KM_SOFTIRQ0);
+
+		/* Evaluate the ACK_REQUIRED flag *after* we received
+		 * the complete frame, and after bumping the next_rx
+		 * sequence. */
+		if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
+			rds_stats_inc(s_recv_ack_required);
+			state->ack_required = 1;
+		}
+
+		state->ack_next = be64_to_cpu(hdr->h_sequence);
+		state->ack_next_valid = 1;
+		rds_inc_put(&ibinc->ii_inc);
+	}
+}
+
+/*
+ * Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring.  Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+void rds_iwarp_recv_cq_comp_handler(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct ib_wc wc;
+	struct rds_iwarp_ack_state state = { 0, };
+	struct rds_iwarp_recv_work *recv;
+	int ret = 0;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_iwarp_stats_inc(s_iwarp_rx_cq_call);
+
+	ib_req_notify_cq(cq, IB_CQ_SOLICITED);
+
+	while (ib_poll_cq(cq, 1, &wc) > 0) {
+		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
+			 be32_to_cpu(wc.ex.imm_data));
+		rds_iwarp_stats_inc(s_iwarp_rx_cq_event);
+
+		recv = &ic->i_recvs[rds_iwarp_ring_oldest(&ic->i_recv_ring)];
+#if 0
+		if (recv->r_sge[1].addr == 0)
+			printk("recv CQ ring: alloc ptr=%u/ctr=%u free ptr=%u/ctr=%u\n",
+				ic->i_recv_ring.w_alloc_ptr,
+				ic->i_recv_ring.w_alloc_ctr,
+				ic->i_recv_ring.w_free_ptr,
+				atomic_read(&ic->i_recv_ring.w_free_ctr));
+#endif
+		rds_iwarp_recv_unmap_page(ic, recv);
+
+		if (rds_conn_up(conn)) {
+			/* We expect errors as the qp is drained during shutdown */
+			if (wc.status == IB_WC_SUCCESS) {
+				rds_iwarp_process_recv(conn, recv, wc.byte_len, &state);
+			} else {
+				rds_iwarp_conn_error(conn, "recv completion on "
+				       "%u.%u.%u.%u had status %u, disconnecting and "
+				       "reconnecting\n", NIPQUAD(conn->c_faddr),
+				       wc.status);
+			}
+		}
+
+		rds_iwarp_ring_free(&ic->i_recv_ring, 1);
+	}
+
+	if (state.ack_next_valid)
+		rds_iwarp_set_ack(ic, state.ack_next, state.ack_required);
+	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+		rds_send_drop_acked(conn, state.ack_recv, NULL);
+		ic->i_ack_recv = state.ack_recv;
+	}
+	if (rds_conn_up(conn))
+		rds_iwarp_attempt_ack(ic);
+
+	/* 
+	 * XXX atomic is bad as it drains reserve pools, we should really
+	 * do some non-blocking alloc that doesn't touch the pools but
+	 * will fail.  Then leave it to the thread to get to reclaim
+	 * and alloc.
+	 */
+	
+	/* 
+	 * If we fail to refill we assume it's an allocation failure
+	 * from our use of GFP_ATOMIC and we want the thread to try again
+	 * immediately.  Similarly, if the thread is already trying to
+	 * refill we want it to try again immediately as it may have missed
+	 * the ring entry we just completed before it released the
+	 * i_recv_mutex.
+	 */
+	/* If we ever end up with a really empty receive ring, we're
+	 * in deep trouble, as the sender will definitely see RNR
+	 * timeouts. */
+	if (rds_iwarp_ring_empty(&ic->i_recv_ring))
+		rds_iwarp_stats_inc(s_iwarp_rx_ring_empty);
+
+	if (mutex_trylock(&ic->i_recv_mutex)) {
+		if (rds_iwarp_recv_refill(conn, GFP_ATOMIC,
+					 GFP_ATOMIC | __GFP_HIGHMEM, 0))
+			ret = -EAGAIN;
+		else
+			rds_iwarp_stats_inc(s_iwarp_rx_refill_from_cq);
+		mutex_unlock(&ic->i_recv_mutex);
+	} else 
+		ret = -EAGAIN;
+
+	if (ret)
+		queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+}
+
+int rds_iwarp_recv(struct rds_connection *conn)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	int ret = 0;
+
+	rdsdebug("conn %p\n", conn);
+
+	/*
+	 * If we get a temporary posting failure in this context then
+	 * we're really low and we want the caller to back off for a bit.
+	 */
+	mutex_lock(&ic->i_recv_mutex);
+	if (rds_iwarp_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
+		ret = -ENOMEM;
+	else
+		rds_iwarp_stats_inc(s_iwarp_rx_refill_from_thread);
+	mutex_unlock(&ic->i_recv_mutex);
+
+	return ret;
+}
+
+int __init rds_iwarp_recv_init(void)
+{
+	struct sysinfo si;
+	int ret = -ENOMEM;
+
+	/* Default to roughly a third of all available RAM for recv memory */
+	si_meminfo(&si);
+	rds_iwarp_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
+
+	rds_iwarp_incoming_slab = kmem_cache_create("rds_iwarp_incoming",
+					sizeof(struct rds_iwarp_incoming),
+					0, 0, NULL);
+	if (rds_iwarp_incoming_slab == NULL)
+		goto out;
+
+	rds_iwarp_frag_slab = kmem_cache_create("rds_iwarp_frag",
+					sizeof(struct rds_page_frag),
+					0, 0, NULL);
+	if (rds_iwarp_frag_slab == NULL)
+		kmem_cache_destroy(rds_iwarp_incoming_slab);
+	else
+		ret = 0;
+out:
+	return ret;
+}
+
+void rds_iwarp_recv_exit(void)
+{
+	kmem_cache_destroy(rds_iwarp_incoming_slab);
+	kmem_cache_destroy(rds_iwarp_frag_slab);
+}
diff --git a/net/rds/iwarp_ring.c b/net/rds/iwarp_ring.c
new file mode 100644
index 0000000..64361e9
--- /dev/null
+++ b/net/rds/iwarp_ring.c
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+
+#define DEBUG
+#include "rds.h"
+#include "iwarp.h"
+
+/*
+ * Locking for IB rings.
+ * We assume that allocation is always protected by a mutex
+ * in the caller (this is a valid assumption for the current
+ * implementation).
+ *
+ * Freeing always happens in an interrupt, and hence only
+ * races with allocations, but not with other free()s.
+ *
+ * The interaction between allocation and freeing is that
+ * the alloc code has to determine the number of free entries.
+ * To this end, we maintain two counters; an allocation counter
+ * and a free counter. Both are allowed to run freely, and wrap
+ * around.
+ * The number of used entries is always (alloc_ctr - free_ctr) % NR.
+ *
+ * The current implementation makes free_ctr atomic. When the
+ * caller finds an allocation fails, it should set an "alloc fail"
+ * bit and retry the allocation. The "alloc fail" bit essentially tells
+ * the CQ completion handlers to wake it up after freeing some
+ * more entries.
+ */
+
+/*
+ * This only happens on shutdown.
+ */
+DECLARE_WAIT_QUEUE_HEAD(rds_iwarp_ring_empty_wait);
+
+void rds_iwarp_ring_init(struct rds_iwarp_work_ring *ring, u32 nr)
+{
+	printk(KERN_DEBUG "rds_iwarp_ring_init(%u)\n", nr);
+	memset(ring, 0, sizeof(*ring));
+	ring->w_nr = nr;
+}
+
+static inline u32 __rds_iwarp_ring_used(struct rds_iwarp_work_ring *ring)
+{
+	u32 diff;
+
+	/* This assumes that atomic_t has at least as many bits as u32 */
+	diff = ring->w_alloc_ctr - (u32) atomic_read(&ring->w_free_ctr);
+	BUG_ON(diff > ring->w_nr);
+
+	return diff;
+}
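+/*
+ * Worked example of the wrap-around arithmetic (illustrative only): with
+ * w_nr = 8, w_alloc_ctr = 0x00000002 and w_free_ctr = 0xfffffffe, the u32
+ * subtraction yields 4, i.e. four entries are in use even though both
+ * counters have wrapped.
+ */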
+
+static int __rds_iwarp_ring_empty(struct rds_iwarp_work_ring *ring)
+{
+	return __rds_iwarp_ring_used(ring) == 0;
+}
+
+u32 rds_iwarp_ring_alloc(struct rds_iwarp_work_ring *ring, u32 val, u32 *pos)
+{
+	u32 ret = 0, avail;
+
+	avail = ring->w_nr - __rds_iwarp_ring_used(ring);
+
+	rdsdebug("ring %p val %u next %u free %u\n", ring, val,
+		 ring->w_alloc_ptr, avail);
+
+	if (val && avail) {
+		ret = min(val, avail);
+		*pos = ring->w_alloc_ptr;
+
+		ring->w_alloc_ptr = (ring->w_alloc_ptr + ret) % ring->w_nr;
+		ring->w_alloc_ctr += ret;
+	}
+
+	return ret;
+}
+
+void rds_iwarp_ring_free(struct rds_iwarp_work_ring *ring, u32 val)
+{
+	ring->w_free_ptr = (ring->w_free_ptr + val) % ring->w_nr;
+	atomic_add(val, &ring->w_free_ctr);
+
+	if (__rds_iwarp_ring_empty(ring) &&
+	    waitqueue_active(&rds_iwarp_ring_empty_wait))
+		wake_up(&rds_iwarp_ring_empty_wait);
+}
+
+void rds_iwarp_ring_unalloc(struct rds_iwarp_work_ring *ring, u32 val)
+{
+	ring->w_alloc_ptr = (ring->w_alloc_ptr - val) % ring->w_nr;
+	ring->w_alloc_ctr -= val;
+}
+
+int rds_iwarp_ring_empty(struct rds_iwarp_work_ring *ring)
+{
+	return __rds_iwarp_ring_empty(ring);
+}
+
+/*
+ * returns the oldest alloced ring entry.  This will be the next one
+ * freed.  This can't be called if there are none allocated.
+ */
+u32 rds_iwarp_ring_oldest(struct rds_iwarp_work_ring *ring)
+{
+	return ring->w_free_ptr;
+}
+
+/*
+ * returns the number of completed work requests.
+ */
+
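+/*
+ * Illustrative only: with w_nr = 8, oldest = 6 and wr_id = 1, the wrapped
+ * branch below yields 8 - 6 + 1 + 1 = 4, i.e. entries 6, 7, 0 and 1 have
+ * completed.
+ */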
+u32 rds_iwarp_ring_completed(struct rds_iwarp_work_ring *ring, u32 wr_id, u32 oldest)
+{
+	u32 ret;
+
+	if (oldest <= wr_id)
+		ret = wr_id - oldest + 1;
+	else
+		ret = ring->w_nr - oldest + wr_id + 1;
+
+	rdsdebug("ring %p ret %u wr_id %u oldest %u\n", ring, ret,
+		 wr_id, oldest);
+	return ret;
+}
diff --git a/net/rds/iwarp_send.c b/net/rds/iwarp_send.c
new file mode 100644
index 0000000..434adf8
--- /dev/null
+++ b/net/rds/iwarp_send.c
@@ -0,0 +1,826 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+#include <linux/in.h>
+#include <linux/device.h>
+#include <linux/dmapool.h>
+
+#include "rds.h"
+#include "rdma.h"
+#include "iwarp.h"
+
+void rds_iwarp_send_unmap_rm(struct rds_iwarp_connection *ic,
+		          struct rds_iwarp_send_work *send,
+			  int success)
+{
+	struct rds_message *rm = send->s_rm;
+
+	rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
+
+	ib_dma_unmap_sg(ic->i_cm_id->device,
+		     rm->m_sg, rm->m_nents,
+		     DMA_TO_DEVICE);
+
+	/* raise rdma completion hwm */
+	if (rm->m_rdma_op && success) {
+		struct rds_rdma_op *op = rm->m_rdma_op;
+
+		/* If this was a RDMA READ, make sure the CPU sees all
+		 * the updates. */
+		if (!op->r_write)
+			ib_dma_sync_sg_for_cpu(ic->i_cm_id->device,
+					op->r_sg, op->r_count,
+					DMA_FROM_DEVICE);
+
+		/* If the user asked for a completion notification on this
+		 * message, we can implement three different semantics:
+		 *  1.	Notify when we received the ACK on the RDS message
+		 *	that was queued with the RDMA. This provides reliable
+		 *	notification of RDMA status at the expense of a one-way
+		 *	packet delay.
+		 *  2.	Notify when the IB stack gives us the completion event for
+		 *	the RDMA operation.
+		 *  3.	Notify when the IB stack gives us the completion event for
+		 *	the accompanying RDS messages.
+		 * Here, we implement approach #3. To implement approach #2,
+		 * call rds_rdma_send_complete from the cq_handler. To implement #1,
+		 * don't call rds_rdma_send_complete at all, and fall back to the notify
+		 * handling in the ACK processing code.
+		 */
+		rds_rdma_send_complete(rm);
+
+		if (rm->m_rdma_op->r_write)
+			rds_stats_add(s_send_rdma_bytes, rm->m_rdma_op->r_bytes);
+		else
+			rds_stats_add(s_recv_rdma_bytes, rm->m_rdma_op->r_bytes);
+	}
+
+	/* If anyone waited for this message to get flushed out, wake
+	 * them up now */
+	rds_message_unmapped(rm);
+
+	rds_message_put(rm);
+	send->s_rm = NULL;
+}
+
+void rds_iwarp_send_init_ring(struct rds_iwarp_connection *ic)
+{
+	struct rds_iwarp_send_work *send;
+	u32 i;
+
+	for(i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
+		struct ib_sge *sge;
+
+		send->s_rm = NULL;
+		send->s_op = NULL;
+
+		send->s_wr.wr_id = i;
+		send->s_wr.sg_list = send->s_sge;
+		send->s_wr.num_sge = 1;
+		send->s_wr.opcode = IB_WR_SEND;
+		send->s_wr.send_flags = 0;
+		//send->s_wr.imm_data = 0;
+
+		sge = rds_iwarp_data_sge(ic, send->s_sge);
+		sge->lkey = ic->i_mr->lkey;
+
+		sge = rds_iwarp_header_sge(ic, send->s_sge);
+		sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
+		sge->length = sizeof(struct rds_header);
+		sge->lkey = ic->i_mr->lkey;
+	}
+}
+
+void rds_iwarp_send_clear_ring(struct rds_iwarp_connection *ic)
+{
+	struct rds_iwarp_send_work *send;
+	u32 i;
+
+	for(i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
+		if (send->s_rm)
+			rds_iwarp_send_unmap_rm(ic, send, 0);
+		if (send->s_op)
+			ib_dma_unmap_sg(ic->i_cm_id->device,
+				send->s_op->r_sg, send->s_op->r_nents,
+				send->s_op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+	}
+}
+
+/*
+ * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
+ * operations performed in the send path.  As the sender allocates and
+ * potentially unallocates the next free entry in the ring, it doesn't alter
+ * which entry is the next to be freed, which is all this code is concerned
+ * with.
+ */
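+
+/*
+ * For reference, the completion accounting below leans on ring helpers that
+ * are presumably straight copies of the RDS IB ones.  A rough sketch of what
+ * rds_iwarp_ring_completed() is assumed to compute from a completed wr_id
+ * and the oldest outstanding slot:
+ *
+ *	if (oldest <= wr_id)
+ *		completed = wr_id - oldest + 1;
+ *	else
+ *		completed = ring->w_nr - oldest + wr_id + 1;
+ *
+ * i.e. every descriptor from the oldest outstanding entry up to and
+ * including the one named by wr_id is retired, in order, by the handler.
+ */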
+void rds_iwarp_send_cq_comp_handler(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct ib_wc wc;
+	struct rds_iwarp_send_work *send;
+	u32 completed;
+	u32 oldest;
+	u32 i = 0;
+	int ret;
+
+	rdsdebug("cq %p conn %p\n", cq, conn);
+	rds_iwarp_stats_inc(s_iwarp_tx_cq_call);
+	ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+	}
+
+	while (ib_poll_cq(cq, 1, &wc) > 0) {
+		/*
+		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
+			 be32_to_cpu(wc.imm_data));
+		*/
+		rds_iwarp_stats_inc(s_iwarp_tx_cq_event);
+
+		if (wc.wr_id == RDS_IWARP_ACK_WR_ID) {
+			if (ic->i_ack_queued + HZ/2 < jiffies)
+				rds_iwarp_stats_inc(s_iwarp_tx_stalled);
+			rds_iwarp_ack_send_complete(ic);
+			continue;
+		}
+
+		oldest = rds_iwarp_ring_oldest(&ic->i_send_ring);
+
+		completed = rds_iwarp_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+
+		for (i = 0; i < completed; i++) {
+			send = &ic->i_sends[oldest];
+
+			/* In the error case, wc.opcode sometimes contains garbage */
+			switch (send->s_wr.opcode) {
+			case IB_WR_SEND:
+				if (send->s_rm)
+					rds_iwarp_send_unmap_rm(ic, send, 1);
+				break;
+			case IB_WR_RDMA_WRITE:
+				if (send->s_op)
+					ib_dma_unmap_sg(ic->i_cm_id->device,
+						send->s_op->r_sg, send->s_op->r_nents,
+						DMA_TO_DEVICE);
+				break;
+			case IB_WR_RDMA_READ:
+				if (send->s_op)
+					ib_dma_unmap_sg(ic->i_cm_id->device,
+						send->s_op->r_sg, send->s_op->r_nents,
+						DMA_FROM_DEVICE);
+				break;
+			default:
+				if (printk_ratelimit())
+					printk(KERN_NOTICE
+						"RDS/IWARP: %s: unexpected opcode 0x%x in WR!\n",
+						__func__, send->s_wr.opcode);
+				break;
+			}
+
+			send->s_wr.opcode = 0xdead;
+			send->s_wr.num_sge = 1;
+			if (send->s_queued + HZ/2 < jiffies)
+				rds_iwarp_stats_inc(s_iwarp_tx_stalled);
+
+			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+		}
+
+		if (unlikely(wc.status != IB_WC_SUCCESS && send->s_op && send->s_op->r_notifier)) {
+			switch (wc.status) {
+			default:
+				send->s_op->r_notifier->n_status = RDS_RDMA_OTHER_ERROR;
+				break;
+			case IB_WC_REM_ACCESS_ERR:
+				send->s_op->r_notifier->n_status = RDS_RDMA_REMOTE_ERROR;
+				break;
+			case IB_WC_WR_FLUSH_ERR:
+				/* flushed out; not an error */
+				break;
+			}
+		}
+
+		rds_iwarp_ring_free(&ic->i_send_ring, completed);
+
+		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags)
+		 || test_bit(0, &conn->c_map_queued))
+			queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+		/* We expect errors as the qp is drained during shutdown */
+		if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+			rds_iwarp_conn_error(conn,
+				"send completion on %u.%u.%u.%u "
+				"had status %u, disconnecting and reconnecting\n",
+				NIPQUAD(conn->c_faddr), wc.status);
+		}
+	}
+}
+
+/*
+ * This is the main function for allocating credits when sending
+ * messages.
+ *
+ * Conceptually, we have two counters:
+ *  -	send credits: this tells us how many WRs we're allowed
+ *	to submit without overrunning the receiver's queue. For
+ *	each SEND WR we post, we decrement this by one.
+ *
+ *  -	posted credits: this tells us how many WRs we recently
+ *	posted to the receive queue. This value is transferred
+ *	to the peer as a "credit update" in an RDS header field.
+ *	Every time we transmit credits to the peer, we subtract
+ *	the amount of transferred credits from this counter.
+ *
+ * It is essential that we avoid situations where both sides have
+ * exhausted their send credits, and are unable to send new credits
+ * to the peer. We achieve this by requiring that we send at least
+ * one credit update to the peer before exhausting our credits.
+ * When new credits arrive, we subtract one credit that is withheld
+ * until we've posted new buffers and are ready to transmit these
+ * credits (see rds_iwarp_send_add_credits below).
+ *
+ * The RDS send code is essentially single-threaded; rds_send_xmit
+ * grabs c_send_sem to ensure exclusive access to the send ring.
+ * However, the ACK sending code is independent and can race with
+ * message SENDs.
+ *
+ * In the send path, we need to update the counters for send credits
+ * and the counter of posted buffers atomically - when we use the
+ * last available credit, we cannot allow another thread to race us
+ * and grab the posted credits counter.  Hence, we have to use a
+ * spinlock to protect the credit counter, or use atomics.
+ *
+ * Spinlocks shared between the send and the receive path are bad,
+ * because they create unnecessary delays. An early implementation
+ * using a spinlock showed a 5% degradation in throughput at some
+ * loads.
+ *
+ * This implementation avoids spinlocks completely, putting both
+ * counters into a single atomic: the receive path adds fresh credits
+ * with atomic_add, while the send path updates both counters at once
+ * with atomic_cmpxchg.
+ */
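+
+/*
+ * A worked example, assuming the IWARP_* credit macros mirror the RDS IB
+ * ones (send credits in the low 16 bits of i_credits, posted credits in
+ * the high 16 bits):
+ *
+ *	i_credits = 0x00050003		five posted, three send credits
+ *
+ *	rds_iwarp_send_grab_credits(ic, 4, &adv_credits):
+ *		avail = 3, posted = 5
+ *		posted != 0, so no credit needs to be withheld
+ *		avail < wanted, so got = 3 and RDS_LL_SEND_FULL is set
+ *		advertise = min(5, RDS_MAX_ADV_CREDIT)
+ *		newval = oldval - IWARP_SET_SEND_CREDITS(3)
+ *			        - IWARP_SET_POST_CREDITS(advertise)
+ *
+ * and the atomic_cmpxchg() retries the whole calculation if the receive
+ * path added credits in between.
+ */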
+int rds_iwarp_send_grab_credits(struct rds_iwarp_connection *ic,
+			     u32 wanted, u32 *adv_credits)
+{
+	unsigned int avail, posted, got = 0, advertise;
+	long oldval, newval;
+
+	*adv_credits = 0;
+	if (!ic->i_flowctl)
+		return wanted;
+
+try_again:
+	advertise = 0;
+	oldval = newval = atomic_read(&ic->i_credits);
+	posted = IWARP_GET_POST_CREDITS(oldval);
+	avail = IWARP_GET_SEND_CREDITS(oldval);
+
+	rdsdebug("rds_iwarp_send_grab_credits(%u): credits=%u posted=%u\n",
+			wanted, avail, posted);
+
+	/* The last credit must be used to send a credit update. */
+	if (avail && !posted)
+		avail--;
+
+	if (avail < wanted) {
+		struct rds_connection *conn = ic->i_cm_id->context;
+
+		/* Oops, there aren't that many credits left! */
+		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
+		got = avail;
+	} else {
+		/* Sometimes you get what you want, lalala. */
+		got = wanted;
+	}
+	newval -= IWARP_SET_SEND_CREDITS(got);
+
+	if (got && posted) {
+		advertise = min_t(unsigned int, posted, RDS_MAX_ADV_CREDIT);
+		newval -= IWARP_SET_POST_CREDITS(advertise);
+	}
+
+	/* Finally bill everything */
+	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
+		goto try_again;
+
+	*adv_credits = advertise;
+	return got;
+}
+
+void rds_iwarp_send_add_credits(struct rds_connection *conn, unsigned int credits)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	if (credits == 0)
+		return;
+
+	rdsdebug("rds_iwarp_send_add_credits(%u): current=%u%s\n",
+			credits,
+			IWARP_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
+			test_bit(RDS_LL_SEND_FULL, &conn->c_flags)? ", ll_send_full" : "");
+
+	atomic_add(IWARP_SET_SEND_CREDITS(credits), &ic->i_credits);
+	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+	WARN_ON(IWARP_GET_SEND_CREDITS(credits) >= 16384);
+
+	rds_iwarp_stats_inc(s_iwarp_rx_credit_updates);
+}
+
+void rds_iwarp_advertise_credits(struct rds_connection *conn, unsigned int posted)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	if (posted == 0)
+		return;
+
+	atomic_add(IWARP_SET_POST_CREDITS(posted), &ic->i_credits);
+
+	/* Decide whether to send an update to the peer now.
+	 * If we would send a credit update for every single buffer we
+	 * post, we would end up with an ACK storm (ACK arrives,
+	 * consumes buffer, we refill the ring, send ACK to remote
+	 * advertising the newly posted buffer... ad inf)
+	 *
+	 * Performance pretty much depends on how often we send
+	 * credit updates - too frequent updates mean lots of ACKs.
+	 * Too infrequent updates, and the peer will run out of
+	 * credits and have to throttle.
+	 * For the time being, 16 seems to be a good compromise.
+	 */
+	if (IWARP_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
+		set_bit(IWARP_ACK_REQUESTED, &ic->i_ack_flags);
+}
+
+static inline void
+rds_iwarp_xmit_populate_wr(struct rds_iwarp_connection *ic,
+		struct rds_iwarp_send_work *send, unsigned int pos,
+		unsigned long buffer, unsigned int length,
+		int send_flags)
+{
+	struct ib_sge *sge;
+
+	WARN_ON(pos != send - ic->i_sends);
+
+	send->s_wr.send_flags = send_flags;
+	send->s_wr.opcode = IB_WR_SEND;
+	send->s_wr.num_sge = 2;
+	send->s_wr.next = NULL;
+	send->s_queued = jiffies;
+	send->s_op = NULL;
+
+	if (length != 0) {
+		sge = rds_iwarp_data_sge(ic, send->s_sge);
+		sge->addr = buffer;
+		sge->length = length;
+		sge->lkey = ic->i_mr->lkey;
+
+		sge = rds_iwarp_header_sge(ic, send->s_sge);
+	} else {
+		/* We're sending a packet with no payload. There is only
+		 * one SGE */
+		send->s_wr.num_sge = 1;
+		sge = &send->s_sge[0];
+	}
+
+	sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header));
+	sge->length = sizeof(struct rds_header);
+	sge->lkey = ic->i_mr->lkey;
+}
+
+/*
+ * This can be called multiple times for a given message.  The first time
+ * we see a message we map its scatterlist into the IB device so that
+ * we can provide that mapped address to the IB scatter gather entries
+ * in the IB work requests.  We translate the scatterlist into a series
+ * of work requests that fragment the message.  These work requests complete
+ * in order, so we pass ownership of the message to the completion handler
+ * once we send the final fragment.
+ *
+ * The RDS core uses the c_send_sem to only enter this function once
+ * per connection.  This makes sure that the tx ring alloc/unalloc pairs
+ * don't get out of sync and confuse the ring.
+ */
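+
+/*
+ * As a concrete example (assuming the usual 4KB RDS_FRAG_SIZE), a message
+ * with h_len = 16384 is carved into ceil(16384, RDS_FRAG_SIZE) = 4 work
+ * requests.  Each WR gets a data SGE pointing into the DMA-mapped
+ * scatterlist plus a second SGE pointing at that slot's entry in the
+ * long-lived i_send_hdrs ring, and only the final WR hands s_rm off to
+ * the completion handler.
+ */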
+int rds_iwarp_xmit(struct rds_connection *conn, struct rds_message *rm,
+	        unsigned int hdr_off, unsigned int sg, unsigned int off)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct ib_device *dev = ic->i_cm_id->device;
+	struct rds_iwarp_send_work *send = NULL;
+	struct rds_iwarp_send_work *first;
+	struct rds_iwarp_send_work *prev;
+	struct ib_send_wr *failed_wr;
+	struct scatterlist *scat;
+	u32 pos;
+	u32 i;
+	u32 work_alloc;
+	u32 credit_alloc;
+	u32 adv_credits = 0;
+	int send_flags = 0;
+	int sent;
+	int ret;
+
+	BUG_ON(off % RDS_FRAG_SIZE);
+	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
+
+	/* FIXME we may overallocate here */
+	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
+		i = 1;
+	else
+		i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
+
+	work_alloc = rds_iwarp_ring_alloc(&ic->i_send_ring, i, &pos);
+	if (work_alloc == 0) {
+		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
+		rds_iwarp_stats_inc(s_iwarp_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	credit_alloc = work_alloc;
+	if (ic->i_flowctl) {
+		credit_alloc = rds_iwarp_send_grab_credits(ic, work_alloc, &adv_credits);
+		if (credit_alloc < work_alloc) {
+			rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
+			work_alloc = credit_alloc;
+		}
+		if (work_alloc == 0) {
+			rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc);
+			rds_iwarp_stats_inc(s_iwarp_tx_throttle);
+			ret = -ENOMEM;
+			goto out;
+		}
+	}
+
+	/* map the message the first time we see it */
+	if (ic->i_rm == NULL) {
+		/*
+		printk(KERN_NOTICE "rds_iwarp_xmit prep msg dport=%u flags=0x%x len=%d\n",
+				be16_to_cpu(rm->m_inc.i_hdr.h_dport),
+				rm->m_inc.i_hdr.h_flags,
+				be32_to_cpu(rm->m_inc.i_hdr.h_len));
+		   */
+		if (rm->m_nents) {
+			rm->m_count = ib_dma_map_sg(dev,
+					 rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
+			rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
+			if (rm->m_count == 0) {
+				rds_iwarp_stats_inc(s_iwarp_tx_sg_mapping_failure);
+				ret = -ENOMEM; /* XXX ? */
+				goto out;
+			}
+		} else {
+			rm->m_count = 0;
+		}
+
+		ic->i_unsignaled_wrs = rds_iwarp_sysctl_max_unsig_wrs;
+		ic->i_unsignaled_bytes = rds_iwarp_sysctl_max_unsig_bytes;
+		rds_message_addref(rm);
+		ic->i_rm = rm;
+
+		/* Finalize the header */
+		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
+			rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
+		if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
+			rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
+
+		/* If it has a RDMA op, tell the peer we did it. This is
+		 * used by the peer to release use-once RDMA MRs. */
+		if (rm->m_rdma_op) {
+			struct rds_ext_header_rdma ext_hdr;
+
+			ext_hdr.h_rdma_rkey = cpu_to_be32(rm->m_rdma_op->r_key);
+			rds_message_add_extension(&rm->m_inc.i_hdr,
+					RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
+		}
+		if (rm->m_rdma_cookie) {
+			rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
+					rds_rdma_cookie_key(rm->m_rdma_cookie),
+					rds_rdma_cookie_offset(rm->m_rdma_cookie));
+		}
+
+		/* Note - rds_iwarp_piggyb_ack clears the ACK_REQUIRED bit, so
+		 * we should not do this unless we have a chance of at least
+		 * sticking the header into the send ring. Which is why we
+		 * should call rds_iwarp_ring_alloc first. */
+		rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_iwarp_piggyb_ack(ic));
+		rds_message_make_checksum(&rm->m_inc.i_hdr);
+	} else if (ic->i_rm != rm)
+		BUG();
+
+	send = &ic->i_sends[pos];
+	first = send;
+	prev = NULL;
+	scat = &rm->m_sg[sg];
+	sent = 0;
+	i = 0;
+
+	/* Sometimes you want to put a fence between an RDMA
+	 * READ and the following SEND.
+	 * We could either do this all the time
+	 * or when requested by the user. Right now, we let
+	 * the application choose.
+	 */
+	if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
+		send_flags = IB_SEND_FENCE;
+
+	/*
+	 * We could be copying the header into the unused tail of the page.
+	 * That would need to be changed in the future when those pages might
+	 * be mapped userspace pages or page cache pages.  So instead we always
+	 * use a second sge and our long-lived ring of mapped headers.  We send
+	 * the header after the data so that the data payload can be aligned on
+	 * the receiver.
+	 */
+
+	/* handle a 0-len message */
+	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
+		rds_iwarp_xmit_populate_wr(ic, send, pos, 0, 0, send_flags);
+		goto add_header;
+	}
+
+	/* if there's data reference it with a chain of work reqs */
+	for(; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
+		unsigned int len;
+
+		send = &ic->i_sends[pos];
+
+		len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
+		rds_iwarp_xmit_populate_wr(ic, send, pos,
+				ib_sg_dma_address(dev, scat) + off, len,
+				send_flags);
+
+		/*
+		 * We want to delay signaling completions just enough to get
+		 * the batching benefits but not so much that we create dead
+		 * time on the wire.
+		 */
+		if (ic->i_unsignaled_wrs-- == 0) {
+			ic->i_unsignaled_wrs = rds_iwarp_sysctl_max_unsig_wrs;
+			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+		}
+
+		ic->i_unsignaled_bytes -= len;
+		if (ic->i_unsignaled_bytes <= 0) {
+			ic->i_unsignaled_bytes = rds_iwarp_sysctl_max_unsig_bytes;
+			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+		}
+
+		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
+			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+
+		sent += len;
+		off += len;
+		if (off == ib_sg_dma_len(dev, scat)) {
+			scat++;
+			off = 0;
+		}
+
+add_header:
+		/* Tack on the header after the data. The header SGE should already
+		 * have been set up to point to the right header buffer. */
+		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
+
+		if (0) {
+			struct rds_header *hdr = &ic->i_send_hdrs[pos];
+
+			printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
+				be16_to_cpu(hdr->h_dport),
+				hdr->h_flags,
+				be32_to_cpu(hdr->h_len));
+		}
+		if (adv_credits) {
+			struct rds_header *hdr = &ic->i_send_hdrs[pos];
+
+			/* add credit and redo the header checksum */
+			hdr->h_credit = adv_credits;
+			rds_message_make_checksum(hdr);
+			adv_credits = 0;
+			rds_iwarp_stats_inc(s_iwarp_tx_credit_updates);
+		}
+
+		if (prev)
+			prev->s_wr.next = &send->s_wr;
+		prev = send;
+
+		pos = (pos + 1) % ic->i_send_ring.w_nr;
+	}
+
+	/* Account the RDS header in the number of bytes we sent, but just once.
+	 * The caller has no concept of fragmentation. */
+	if (hdr_off == 0)
+		sent += sizeof(struct rds_header);
+
+	/* if we finished the message then send completion owns it */
+	if (scat == &rm->m_sg[rm->m_count]) {
+		prev->s_rm = ic->i_rm;
+		prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+		ic->i_rm = NULL;
+	}
+
+	if (i < work_alloc) {
+		rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+		work_alloc = i;
+	}
+	if (ic->i_flowctl && i < credit_alloc)
+		rds_iwarp_send_add_credits(conn, credit_alloc - i);
+
+	/* XXX need to worry about failed_wr and partial sends. */
+	failed_wr = &first->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+		 first, &first->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IWARP: ib_post_send to %u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc);
+		if (prev->s_rm) {
+			ic->i_rm = prev->s_rm;
+			prev->s_rm = NULL;
+		}
+		/* Finesse this later */
+		BUG();
+		goto out;
+	}
+
+	ret = sent;
+out:
+	BUG_ON(adv_credits);
+	return ret;
+}
+
+int rds_iwarp_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+	struct rds_iwarp_send_work *send = NULL;
+	struct rds_iwarp_send_work *first;
+	struct rds_iwarp_send_work *prev;
+	struct ib_send_wr *failed_wr;
+	struct rds_iwarp_device *rds_iwarpdev;
+	struct scatterlist *scat;
+	unsigned long len;
+	u64 remote_addr = op->r_remote_addr;
+	u32 pos;
+	u32 work_alloc;
+	u32 i;
+	u32 j;
+	int sent;
+	int ret;
+	int num_sge;
+
+	rds_iwarpdev = ib_get_client_data(ic->i_cm_id->device, &rds_iwarp_client);
+
+	/* map the message the first time we see it */
+	op->r_count = ib_dma_map_sg(ic->i_cm_id->device,
+					op->r_sg, op->r_nents, (op->r_write) ?
+					DMA_TO_DEVICE : DMA_FROM_DEVICE);
+	rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count);
+	if (op->r_count == 0) {
+		rds_iwarp_stats_inc(s_iwarp_tx_sg_mapping_failure);
+		ret = -ENOMEM; /* XXX ? */
+		goto out;
+	}
+
+	/*
+	 * Instead of knowing how to return a partial RDMA read/write, we insist
+	 * that there be enough work requests to send the entire message.
+	 */
+	i = ceil(op->r_count, rds_iwarpdev->max_sge);
+
+	work_alloc = rds_iwarp_ring_alloc(&ic->i_send_ring, i, &pos);
+	if (work_alloc != i) {
+		rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_iwarp_stats_inc(s_iwarp_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	send = &ic->i_sends[pos];
+	first = send;
+	prev = NULL;
+	scat = &op->r_sg[0];
+	sent = 0;
+	num_sge = op->r_count;
+
+	for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
+		send->s_wr.send_flags = 0;
+		send->s_queued = jiffies;
+		/*
+		 * We want to delay signaling completions just enough to get
+		 * the batching benefits but not so much that we create dead time on the wire.
+		 */
+		if (ic->i_unsignaled_wrs-- == 0) {
+			ic->i_unsignaled_wrs = rds_iwarp_sysctl_max_unsig_wrs;
+			send->s_wr.send_flags = IB_SEND_SIGNALED;
+		}
+
+		send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
+		send->s_wr.wr.rdma.remote_addr = remote_addr;
+		send->s_wr.wr.rdma.rkey = op->r_key;
+		send->s_op = NULL;
+
+		if (num_sge > rds_iwarpdev->max_sge) {
+			send->s_wr.num_sge = rds_iwarpdev->max_sge;
+			num_sge -= rds_iwarpdev->max_sge;
+		} else
+			send->s_wr.num_sge = num_sge;
+
+		send->s_wr.next = NULL;
+
+		if (prev)
+			prev->s_wr.next = &send->s_wr;
+
+		for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
+			len = sg_dma_len(scat);
+			send->s_sge[j].addr = sg_dma_address(scat);
+			send->s_sge[j].length = len;
+			send->s_sge[j].lkey = ic->i_mr->lkey;
+
+			sent += len;
+			rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent,
+				 (unsigned long long)remote_addr);
+
+			remote_addr += sg_dma_len(scat);
+			scat++;
+		}
+
+		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
+			&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+
+		prev = send;
+		if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
+			send = ic->i_sends;
+	}
+
+	/* if we finished the message then send completion owns it */
+	if (scat == &op->r_sg[op->r_count]) {
+		prev->s_wr.send_flags = IB_SEND_SIGNALED;
+		prev->s_op = op;
+	}
+
+	if (i < work_alloc) {
+		rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+		work_alloc = i;
+	}
+
+	failed_wr = &first->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+		 first, &first->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IWARP: rdma ib_post_send to %u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		rds_iwarp_ring_unalloc(&ic->i_send_ring, work_alloc);
+		goto out;
+	}
+
+	if (unlikely(failed_wr != &first->s_wr)) {
+		printk(KERN_WARNING "RDS/IWARP: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
+		BUG_ON(failed_wr != &first->s_wr);
+	}
+
+out:
+	return ret;
+}
+
+void rds_iwarp_xmit_complete(struct rds_connection *conn)
+{
+	struct rds_iwarp_connection *ic = conn->c_transport_data;
+
+	/* We may have a pending ACK or window update we were unable
+	 * to send previously (due to flow control). Try again. */
+	rds_iwarp_attempt_ack(ic);
+}
diff --git a/net/rds/iwarp_stats.c b/net/rds/iwarp_stats.c
new file mode 100644
index 0000000..192b328
--- /dev/null
+++ b/net/rds/iwarp_stats.c
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/percpu.h>
+#include <linux/seq_file.h>
+#include <linux/proc_fs.h>
+
+#include "rds.h"
+#include "iwarp.h"
+
+RDS_DEFINE_PER_CPU(struct rds_iwarp_statistics, rds_iwarp_stats) 
+	____cacheline_aligned;
+
+static char *rds_iwarp_stat_names[] = {
+	"iwarp_connect_raced",
+	"iwarp_listen_closed_stale",
+	"iwarp_tx_cq_call",
+	"iwarp_tx_cq_event",
+	"iwarp_tx_ring_full",
+	"iwarp_tx_throttle",
+	"iwarp_tx_sg_mapping_failure",
+	"iwarp_tx_stalled",
+	"iwarp_tx_credit_updates",
+	"iwarp_rx_cq_call",
+	"iwarp_rx_cq_event",
+	"iwarp_rx_ring_empty",
+	"iwarp_rx_refill_from_cq",
+	"iwarp_rx_refill_from_thread",
+	"iwarp_rx_alloc_limit",
+	"iwarp_rx_credit_updates",
+	"iwarp_ack_sent",
+	"iwarp_ack_send_failure",
+	"iwarp_ack_send_delayed",
+	"iwarp_ack_send_piggybacked",
+	"iwarp_ack_received",
+	"iwarp_rdma_mr_alloc",
+	"iwarp_rdma_mr_free",
+	"iwarp_rdma_mr_used",
+	"iwarp_rdma_mr_pool_flush",
+	"iwarp_rdma_mr_pool_wait",
+};
+
+unsigned int rds_iwarp_stats_info_copy(struct rds_info_iterator *iter,
+				    unsigned int avail)
+{
+	struct rds_iwarp_statistics stats = {0, };
+	unsigned long *src;
+	unsigned long *sum;
+	size_t i;
+	int cpu;
+
+	if (avail < ARRAY_SIZE(rds_iwarp_stat_names))
+		goto out;
+
+	for_each_online_cpu(cpu) {
+		src = (unsigned long *)&(rds_per_cpu(rds_iwarp_stats, cpu));
+		sum = (unsigned long *)&stats;
+		for (i = 0; i < sizeof(stats) / sizeof(unsigned long); i++)
+			*(sum++) += *(src++);
+	}
+
+	rds_stats_info_copy(iter, (unsigned long *)&stats, rds_iwarp_stat_names,
+			    ARRAY_SIZE(rds_iwarp_stat_names));
+out:
+	return ARRAY_SIZE(rds_iwarp_stat_names);
+}
diff --git a/net/rds/iwarp_sysctl.c b/net/rds/iwarp_sysctl.c
new file mode 100644
index 0000000..dd9060c
--- /dev/null
+++ b/net/rds/iwarp_sysctl.c
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+#include <linux/kernel.h>
+#include <linux/sysctl.h>
+#include <linux/proc_fs.h>
+
+#include "iwarp.h"
+
+static struct ctl_table_header *rds_iwarp_sysctl_hdr;
+
+unsigned long rds_iwarp_sysctl_max_send_wr = RDS_IWARP_DEFAULT_SEND_WR;
+unsigned long rds_iwarp_sysctl_max_recv_wr = RDS_IWARP_DEFAULT_RECV_WR;
+unsigned long rds_iwarp_sysctl_max_recv_allocation = (128 * 1024 * 1024) / RDS_FRAG_SIZE;
+static unsigned long rds_iwarp_sysctl_max_wr_min = 1;
+/* hardware will fail CQ creation long before this */
+static unsigned long rds_iwarp_sysctl_max_wr_max = (u32)~0;
+
+unsigned long rds_iwarp_sysctl_max_unsig_wrs = 16;
+static unsigned long rds_iwarp_sysctl_max_unsig_wr_min = 1;
+static unsigned long rds_iwarp_sysctl_max_unsig_wr_max = 64;
+
+unsigned long rds_iwarp_sysctl_max_unsig_bytes = (16 << 20);
+static unsigned long rds_iwarp_sysctl_max_unsig_bytes_min = 1;
+static unsigned long rds_iwarp_sysctl_max_unsig_bytes_max = ~0UL;
+
+unsigned int rds_iwarp_sysctl_flow_control = 1;
+
+ctl_table rds_iwarp_sysctl_table[] = {
+	{
+		.ctl_name       = 1,
+		.procname       = "max_send_wr",
+		.data		= &rds_iwarp_sysctl_max_send_wr,
+		.maxlen         = sizeof(unsigned long),
+		.mode           = 0644,
+		.proc_handler   = &proc_doulongvec_minmax,
+		.extra1		= &rds_iwarp_sysctl_max_wr_min,
+		.extra2		= &rds_iwarp_sysctl_max_wr_max,
+	},
+	{
+		.ctl_name       = 2,
+		.procname       = "max_recv_wr",
+		.data		= &rds_iwarp_sysctl_max_recv_wr,
+		.maxlen         = sizeof(unsigned long),
+		.mode           = 0644,
+		.proc_handler   = &proc_doulongvec_minmax,
+		.extra1		= &rds_iwarp_sysctl_max_wr_min,
+		.extra2		= &rds_iwarp_sysctl_max_wr_max,
+	},
+	{
+		.ctl_name       = 3,
+		.procname       = "max_unsignaled_wr",
+		.data		= &rds_iwarp_sysctl_max_unsig_wrs,
+		.maxlen         = sizeof(unsigned long),
+		.mode           = 0644,
+		.proc_handler   = &proc_doulongvec_minmax,
+		.extra1		= &rds_iwarp_sysctl_max_unsig_wr_min,
+		.extra2		= &rds_iwarp_sysctl_max_unsig_wr_max,
+	},
+	{
+		.ctl_name       = 4,
+		.procname       = "max_unsignaled_bytes",
+		.data		= &rds_iwarp_sysctl_max_unsig_bytes,
+		.maxlen         = sizeof(unsigned long),
+		.mode           = 0644,
+		.proc_handler   = &proc_doulongvec_minmax,
+		.extra1		= &rds_iwarp_sysctl_max_unsig_bytes_min,
+		.extra2		= &rds_iwarp_sysctl_max_unsig_bytes_max,
+	},
+	{
+		.ctl_name       = 5,
+		.procname       = "max_recv_allocation",
+		.data		= &rds_iwarp_sysctl_max_recv_allocation,
+		.maxlen         = sizeof(unsigned long),
+		.mode           = 0644,
+		.proc_handler   = &proc_doulongvec_minmax,
+	},
+	{
+		.ctl_name	= 6,
+		.procname	= "flow_control",
+		.data		= &rds_iwarp_sysctl_flow_control,
+		.maxlen		= sizeof(rds_iwarp_sysctl_flow_control),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+	{ .ctl_name = 0}
+};
+
+static ctl_table rds_iwarp_sysctl_rds_table[] = {
+	{
+		.ctl_name	= SYSCTL_NET_RDS_IWARP,
+		.procname	= "iwarp",
+		.data		= NULL,
+		.maxlen		= 0,
+		.mode		= 0555,
+		.child		= rds_iwarp_sysctl_table,
+	},
+	{ .ctl_name = 0}
+};
+
+static ctl_table rds_iwarp_sysctl_net_table[] = {
+	{
+		.ctl_name	= SYSCTL_NET_RDS,
+		.procname	= "rds",
+		.data		= NULL,
+		.maxlen		= 0,
+		.mode		= 0555,
+		.child		= rds_iwarp_sysctl_rds_table
+	},
+	{ .ctl_name = 0}
+};
+
+static ctl_table rds_iwarp_sysctl_root_table[] = {
+	{
+		.ctl_name	= CTL_NET,
+		.procname	= "net",
+		.data		= NULL,
+		.maxlen		= 0,
+		.mode		= 0555,
+		.child		= rds_iwarp_sysctl_net_table
+	},
+	{ .ctl_name = 0 }
+};
+
+void rds_iwarp_sysctl_exit(void)
+{
+	if (rds_iwarp_sysctl_hdr)
+		unregister_sysctl_table(rds_iwarp_sysctl_hdr);
+}
+
+int __init rds_iwarp_sysctl_init(void)
+{
+	rds_iwarp_sysctl_hdr = register_sysctl_table(rds_iwarp_sysctl_root_table, 1);
+	if (rds_iwarp_sysctl_hdr == NULL)
+		return -ENOMEM;
+	return 0;
+}
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 1f1039e..9dbcdf5 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -258,7 +258,7 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 	 * Note that dma_map() implies that pending writes are
 	 * flushed to RAM, so no dma_sync is needed here. */
 	trans_private = rs->rs_transport->get_mr(sg, nents,
-						 rs->rs_bound_addr, 
+						 rs, 
 						 &mr->r_key);
 
 	if (IS_ERR(trans_private)) {
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 03031e2..b49a643 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -26,8 +26,9 @@
  *
  * port 18633 was the version that had ack frames on the wire.
  */
-#define RDS_PORT	18634
-
+#define RDS_TCP_PORT	18634
+#define RDS_IB_PORT	18635
+#define RDS_IWARP_PORT	18636
 
 #ifndef AF_RDS
 #define AF_RDS          28      /* Reliable Datagram Socket     */
@@ -58,6 +59,7 @@
 /* XXX crap, we need to worry about this conflicting too */
 #define SYSCTL_NET_RDS 9912
 #define SYSCTL_NET_RDS_IB 100
+#define SYSCTL_NET_RDS_IWARP 101
 
 #ifdef DEBUG
 #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
@@ -375,7 +377,7 @@ struct rds_transport {
 					unsigned int avail);
 	void (*exit)(void);
 	void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
-			__be32 ip_addr, u32 *key_ret);
+			struct rds_sock *rs, u32 *key_ret);
 	void (*sync_mr)(void *trans_private, int direction);
 	void (*free_mr)(void *trans_private, int invalidate);
 	void (*flush_mrs)(void);
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index 0389a99..298e372 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -96,7 +96,7 @@ int rds_tcp_conn_connect(struct rds_connection *conn)
 
 	dest.sin_family = AF_INET;
 	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
-	dest.sin_port = (__force u16)htons(RDS_PORT);
+	dest.sin_port = (__force u16)htons(RDS_TCP_PORT);
 
 	/* 
 	 * once we call connect() we can start getting callbacks and they
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index caeacbe..50709b7 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -159,7 +159,7 @@ int __init rds_tcp_listen_init(void)
 
 	sin.sin_family = PF_INET,
 	sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
-	sin.sin_port = (__force u16)htons(RDS_PORT);
+	sin.sin_port = (__force u16)htons(RDS_TCP_PORT);
 
 	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
 	if (ret < 0)
diff --git a/net/rds/transport.c b/net/rds/transport.c
index 19a1a57..dd571c5 100644
--- a/net/rds/transport.c
+++ b/net/rds/transport.c
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2006 Oracle.  All rights reserved.
+ * Copyright (c) 2008 Chelsio, Inc.  All rights reserved.
  *
  * This software is available to you under a choice of one of two
  * licenses.  You may choose to be licensed under the terms of the GNU
@@ -163,6 +164,14 @@ int __init rds_trans_init(void)
 			goto out;
 	}
 #endif
+#ifdef CONFIG_RDS_IWARP
+	{
+		extern int __init rds_iwarp_init(void);
+		ret = rds_iwarp_init();
+		if (ret)
+			goto out;
+	}
+#endif
 #ifdef CONFIG_RDS_TCP
 	{
 		extern int __init rds_tcp_init(void);


