[ofa-general] [PATCH] sdp: all previous patches in one patch

Amir Vadai amirv at mellanox.co.il
Mon Jun 22 01:18:49 PDT 2009


changes:
* fix coding style issues
* make interrupt moderation adaptive (sketched in the notes below)
* arm the nagle timer when a packet is left unsent, instead of on every sent packet
* fix mismatched HELLO/HELLO_ACK buffer sizes + let recvmsg do posts on data packets too
* make the bzcopy poll timeout jiffies-based instead of an iteration count (sketched in the notes below)
* fix handling of unaligned buffers in bzcopy + remove the poll at the end of send
* TX from one context only; RX with minimal context switches (sketched in the notes below)
* don't arm the nagle timer for every sent packet
* do not nagle BZCopy packets
* don't apply nagle to the first packet
* process the RX CQ from interrupt context
* move RX/TX code to dedicated files: sdp_rx.c and sdp_tx.c
* add /proc/net/sdpprf - performance profiling utilities
* no TX interrupts
* move tx_ring into a dedicated structure + many cosmetic fixes
* interrupt performance fixes
* add /proc/net/sdpstats + packet dump

Signed-off-by: Amir Vadai <amirv at mellanox.co.il>
---
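
Notes for reviewers (kept below the '---' so they stay out of the commit message):

The adaptive interrupt moderation follows the usual rate-based scheme: sample
the RX packet rate periodically and interpolate the CQ moderation time between
rx_usecs_low and rx_usecs_high. The sketch below is illustrative only - it
reuses the struct sdp_moderation fields declared in sdp.h, but the helper name
and the exact interpolation are assumptions; the real logic lives in sdp_rx.c.

/* Illustrative sketch only (assumes "sdp.h" and <rdma/ib_verbs.h>); the
 * helper name and the interpolation details are not taken from the patch. */
static void sdp_auto_moderation_sketch(struct sdp_sock *ssk)
{
	struct sdp_moderation *mod = &ssk->auto_mod;
	unsigned long period = jiffies - mod->last_moder_jiffies;
	unsigned long packets, rate;
	int moder_time;

	if (!mod->adaptive_rx_coal || period < mod->sample_interval)
		return;

	packets = ssk->rx_packets - mod->last_moder_packets;
	rate = packets * HZ / (period + 1);	/* packets per second */

	if (rate < mod->pkt_rate_low || mod->pkt_rate_high <= mod->pkt_rate_low)
		moder_time = mod->rx_usecs_low;
	else if (rate > mod->pkt_rate_high)
		moder_time = mod->rx_usecs_high;
	else	/* interpolate between the low and high operating points */
		moder_time = (rate - mod->pkt_rate_low) *
			(mod->rx_usecs_high - mod->rx_usecs_low) /
			(mod->pkt_rate_high - mod->pkt_rate_low) +
			mod->rx_usecs_low;

	if (moder_time != mod->last_moder_time) {
		mod->last_moder_time = moder_time;
		/* apply the new moderation parameters to the RX CQ */
		ib_modify_cq(ssk->rx_ring.cq, mod->moder_cnt, moder_time);
	}

	mod->last_moder_packets = ssk->rx_packets;
	mod->last_moder_jiffies = jiffies;
}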

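The bzcopy change replaces the old "poll N iterations" bound with a wall-clock
bound in jiffies (SDP_BZCOPY_POLL_TIMEOUT). A minimal sketch of the intended
loop shape, assuming the bz->busy counter of struct bzcopy_state; the function
name here is made up for illustration:

/* Sketch only: wait for outstanding bzcopy sends to complete, bounded by
 * time rather than by an iteration count.  bz->busy is decremented from
 * the TX completion path. */
static int sdp_bzcopy_wait_sketch(struct sdp_sock *ssk, struct bzcopy_state *bz)
{
	unsigned long timeout = jiffies + SDP_BZCOPY_POLL_TIMEOUT;

	while (bz->busy) {
		if (time_after(jiffies, timeout)) {
			SDPSTATS_COUNTER_INC(bzcopy_poll_miss);
			return -ETIME;
		}
		sdp_xmit_poll(ssk, 1);	/* reap TX completions */
		cpu_relax();
	}
	return 0;
}
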
 drivers/infiniband/ulp/sdp/Makefile    |    2 +-
 drivers/infiniband/ulp/sdp/sdp.h       |  455 ++++++++++++++---
 drivers/infiniband/ulp/sdp/sdp_bcopy.c |  893 +++++--------------------------
 drivers/infiniband/ulp/sdp/sdp_cma.c   |  191 +++-----
 drivers/infiniband/ulp/sdp/sdp_main.c  |  897 +++++++++++++++-----------------
 drivers/infiniband/ulp/sdp/sdp_proc.c  |  496 ++++++++++++++++++
 drivers/infiniband/ulp/sdp/sdp_rx.c    |  850 ++++++++++++++++++++++++++++++
 drivers/infiniband/ulp/sdp/sdp_tx.c    |  442 ++++++++++++++++
 8 files changed, 2800 insertions(+), 1426 deletions(-)
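
On "TX from one context only": posting is funneled through the
posts_handler_get()/put() accounting added to sdp.h, so whoever holds the
reference (e.g. recvmsg) does the posts while the RX completion path defers to
it. The function below only illustrates how the macros are meant to be used;
it is not the actual recvmsg code:

/* Illustration of the posts_handler accounting from sdp.h.  While the
 * reference is held, concurrent RX completion handling defers its posts;
 * posts_handler_put() then runs sdp_do_posts() to flush whatever
 * accumulated. */
static void sdp_recvmsg_posts_sketch(struct sock *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);

	posts_handler_get(ssk);

	/* ... copy queued skbs to user space here ... */

	posts_handler_put(ssk);
}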

diff --git a/drivers/infiniband/ulp/sdp/Makefile b/drivers/infiniband/ulp/sdp/Makefile
index c889cce..b14a16a 100644
--- a/drivers/infiniband/ulp/sdp/Makefile
+++ b/drivers/infiniband/ulp/sdp/Makefile
@@ -3,4 +3,4 @@ EXTRA_CFLAGS += -ggdb
 
 obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o
 
-ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o
+ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index f9c2421..53f4b95 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -6,15 +6,91 @@
 #include <net/inet_sock.h>
 #include <net/tcp.h> /* For urgent data flags */
 #include <rdma/ib_verbs.h>
+#include <linux/sched.h>
 
-#define sdp_printk(level, sk, format, arg...)                \
-	printk(level "%s:%d sdp_sock(%d:%d): " format,             \
-	       __func__, __LINE__, \
+#define SDPSTATS_ON
+/* #define SDP_PROFILING */
+
+#define _sdp_printk(func, line, level, sk, format, arg...)                \
+	printk(level "%s:%d sdp_sock(%5d:%d %d:%d): " format,             \
+	       func, line, \
+	       current->pid, smp_processor_id(), \
 	       (sk) ? inet_sk(sk)->num : -1,                 \
 	       (sk) ? ntohs(inet_sk(sk)->dport) : -1, ## arg)
+#define sdp_printk(level, sk, format, arg...)                \
+	_sdp_printk(__func__, __LINE__, level, sk, format, ## arg)
 #define sdp_warn(sk, format, arg...)                         \
 	sdp_printk(KERN_WARNING, sk, format , ## arg)
 
+#define rx_ring_lock(ssk, f) do { \
+	spin_lock_irqsave(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define rx_ring_unlock(ssk, f) do { \
+	spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define SDP_MODPARAM_SINT(var, def_val, msg) \
+	static int var = def_val; \
+	module_param_named(var, var, int, 0644); \
+	MODULE_PARM_DESC(var, msg " [" #def_val "]"); \
+
+#define SDP_MODPARAM_INT(var, def_val, msg) \
+	int var = def_val; \
+	module_param_named(var, var, int, 0644); \
+	MODULE_PARM_DESC(var, msg " [" #def_val "]"); \
+
+#ifdef SDP_PROFILING
+struct sk_buff;
+struct sdpprf_log {
+	int 		idx;
+	int 		pid;
+	int 		cpu;
+	int 		sk_num;
+	int 		sk_dport;
+	struct sk_buff 	*skb;
+	char		msg[256];
+
+	unsigned long long time;
+
+	const char 	*func;
+	int 		line;
+};
+
+#define SDPPRF_LOG_SIZE 0x20000 /* must be a power of 2 */
+
+extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
+extern int sdpprf_log_count;
+
+static inline unsigned long long current_nsec(void)
+{
+	struct timespec tv;
+	getnstimeofday(&tv);
+	return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
+}
+#define sdp_prf1(sk, s, format, arg...) ({ \
+	struct sdpprf_log *l = \
+		&sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
+	l->idx = sdpprf_log_count - 1; \
+	l->pid = current->pid; \
+	l->sk_num = (sk) ? inet_sk(sk)->num : -1;                 \
+	l->sk_dport = (sk) ? ntohs(inet_sk(sk)->dport) : -1; \
+	l->cpu = smp_processor_id(); \
+	l->skb = s; \
+	snprintf(l->msg, sizeof(l->msg) - 1, format, ## arg); \
+	l->time = current_nsec(); \
+	l->func = __func__; \
+	l->line = __LINE__; \
+	1; \
+})
+#define sdp_prf(sk, s, format, arg...)
+/* #define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg) */
+
+#else
+#define sdp_prf1(sk, s, format, arg...)
+#define sdp_prf(sk, s, format, arg...)
+#endif
+
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG
 extern int sdp_debug_level;
 
@@ -37,9 +113,9 @@ extern int sdp_debug_level;
 })
 
 #define sk_common_release(sk) do { \
-		sdp_dbg(sk, "%s:%d - sock_put(" SOCK_REF_BORN ") - refcount = %d " \
-			"from withing sk_common_release\n",\
-			__FUNCTION__, __LINE__, atomic_read(&(sk)->sk_refcnt)); \
+		sdp_dbg(sk, "%s:%d - sock_put(" SOCK_REF_BORN \
+			") - refcount = %d from within sk_common_release\n",\
+			__func__, __LINE__, atomic_read(&(sk)->sk_refcnt));\
 		sk_common_release(sk); \
 } while (0)
 
@@ -50,15 +126,80 @@ extern int sdp_debug_level;
 #endif /* CONFIG_INFINIBAND_SDP_DEBUG */
 
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+
 extern int sdp_data_debug_level;
-#define sdp_dbg_data(sk, format, arg...)                     \
-	do {                                                 \
-		if (sdp_data_debug_level > 0)                \
-		sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+#define sdp_dbg_data(sk, format, arg...)                     		\
+	do {                                                 		\
+		if (sdp_data_debug_level & 0x2)                		\
+			sdp_printk(KERN_DEBUG, sk, format , ## arg); 	\
+	} while (0)
+#define SDP_DUMP_PACKET(sk, str, skb, h)                     		\
+	do {                                                 		\
+		if (sdp_data_debug_level & 0x1)                		\
+			dump_packet(sk, str, skb, h); 			\
 	} while (0)
 #else
-#define sdp_dbg_data(priv, format, arg...)                   \
-	do { (void) (priv); } while (0)
+#define sdp_dbg_data(priv, format, arg...)
+#define SDP_DUMP_PACKET(sk, str, skb, h)
+#endif
+
+#ifdef SDPSTATS_ON
+
+struct sdpstats {
+	u32 post_send[256];
+	u32 sendmsg_bcopy_segment;
+	u32 sendmsg_bzcopy_segment;
+	u32 sendmsg;
+	u32 post_send_credits;
+	u32 sendmsg_nagle_skip;
+	u32 sendmsg_seglen[25];
+	u32 send_size[25];
+	u32 post_recv;
+	u32 rx_int_count;
+	u32 tx_int_count;
+	u32 bzcopy_poll_miss;
+	u32 send_wait_for_mem;
+	u32 send_miss_no_credits;
+	u32 rx_poll_miss;
+	u32 tx_poll_miss;
+	u32 tx_poll_hit;
+	u32 tx_poll_busy;
+	u32 memcpy_count;
+	u32 credits_before_update[64];
+	u32 send_interval[25];
+
+	u32 bz_clean_sum;
+	u32 bz_setup_sum;
+	u32 tx_copy_sum;
+	u32 sendmsg_sum;
+};
+extern struct sdpstats sdpstats;
+
+static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
+{
+	int idx = is_log ? ilog2(val) : val;
+	if (idx > maxidx)
+		idx = maxidx;
+
+	h[idx]++;
+}
+
+#define SDPSTATS_COUNTER_INC(stat) do { sdpstats.stat++; } while (0)
+#define SDPSTATS_COUNTER_ADD(stat, val) do { sdpstats.stat += val; } while (0)
+#define SDPSTATS_COUNTER_MID_INC(stat, mid) do { sdpstats.stat[mid]++; } \
+	while (0)
+#define SDPSTATS_HIST(stat, size) \
+	sdpstats_hist(sdpstats.stat, size, ARRAY_SIZE(sdpstats.stat) - 1, 1)
+
+#define SDPSTATS_HIST_LINEAR(stat, size) \
+	sdpstats_hist(sdpstats.stat, size, ARRAY_SIZE(sdpstats.stat) - 1, 0)
+
+#else
+#define SDPSTATS_COUNTER_INC(stat)
+#define SDPSTATS_COUNTER_ADD(stat, val)
+#define SDPSTATS_COUNTER_MID_INC(stat, mid)
+#define SDPSTATS_HIST_LINEAR(stat, size)
+#define SDPSTATS_HIST(stat, size)
 #endif
 
 #define SOCK_REF_RESET "RESET"
@@ -72,6 +213,12 @@ extern int sdp_data_debug_level;
 #define sock_put(sk, msg)  sock_ref(sk, msg, sock_put)
 #define __sock_put(sk, msg)  sock_ref(sk, msg, __sock_put)
 
+/* Interval between successive polls in the Tx routine when polling is used
+   instead of interrupts (in per-core Tx rings) - should be a power of 2 */
+#define SDP_TX_POLL_MODER	16
+#define SDP_TX_POLL_TIMEOUT	(HZ / 4)
+#define SDP_NAGLE_TIMEOUT (HZ / 10)
+
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
@@ -81,7 +228,8 @@ extern int sdp_data_debug_level;
 #define SDP_TX_SIZE 0x40
 #define SDP_RX_SIZE 0x40
 
-#define SDP_MAX_SEND_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
+#define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
+#define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
 #define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
 #define SDP_NUM_WC 4
 #define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
@@ -92,6 +240,21 @@ extern int sdp_data_debug_level;
 #define SDP_OP_RECV 0x800000000LL
 #define SDP_OP_SEND 0x400000000LL
 
+/* how long (in jiffies) to block sender till tx completion*/
+#define SDP_BZCOPY_POLL_TIMEOUT (HZ / 10)
+
+#define SDP_AUTO_CONF	0xffff
+#define AUTO_MOD_DELAY (HZ / 4)
+
+#define BZCOPY_STATE(skb) (*(struct bzcopy_state **)(skb->cb))
+#ifndef MIN
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#endif
+
+extern struct workqueue_struct *sdp_wq;
+extern struct list_head sock_list;
+extern spinlock_t sock_list_lock;
+
 enum sdp_mid {
 	SDP_MID_HELLO = 0x0,
 	SDP_MID_HELLO_ACK = 0x1,
@@ -107,7 +270,7 @@ enum sdp_flags {
 };
 
 enum {
-	SDP_MIN_BUFS = 2
+	SDP_MIN_TX_CREDITS = 2
 };
 
 enum {
@@ -129,33 +292,132 @@ struct sdp_bsdh {
 	__u32 mseq_ack;
 };
 
+union cma_ip_addr {
+	struct in6_addr ip6;
+	struct {
+		__u32 pad[3];
+		__u32 addr;
+	} ip4;
+};
+
+/* TODO: too much? Can I avoid having the src/dst and port here? */
+struct sdp_hh {
+	struct sdp_bsdh bsdh;
+	u8 majv_minv;
+	u8 ipv_cap;
+	u8 rsvd1;
+	u8 max_adverts;
+	__u32 desremrcvsz;
+	__u32 localrcvsz;
+	__u16 port;
+	__u16 rsvd2;
+	union cma_ip_addr src_addr;
+	union cma_ip_addr dst_addr;
+};
+
+struct sdp_hah {
+	struct sdp_bsdh bsdh;
+	u8 majv_minv;
+	u8 ipv_cap;
+	u8 rsvd1;
+	u8 ext_max_adverts;
+	__u32 actrcvsz;
+};
+
 struct sdp_buf {
         struct sk_buff *skb;
         u64             mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
 };
 
+#define ring_head(ring)   (atomic_read(&(ring).head))
+#define ring_tail(ring)   (atomic_read(&(ring).tail))
+#define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
+
+struct sdp_tx_ring {
+	struct sdp_buf   *buffer;
+	atomic_t          head;
+	atomic_t          tail;
+	struct ib_cq 	 *cq;
+
+	int 		  una_seq;
+	atomic_t 	  credits;
+#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
+
+	struct timer_list timer;
+	u16 		  poll_cnt;
+};
+
+struct sdp_rx_ring {
+	struct sdp_buf   *buffer;
+	atomic_t          head;
+	atomic_t          tail;
+	struct ib_cq 	 *cq;
+
+	spinlock_t 	 lock;
+};
+
+static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
+{
+	return SDP_TX_SIZE - ring_posted(*tx_ring);
+}
+
+struct sdp_chrecvbuf {
+	u32 size;
+};
+
+#define posts_handler(ssk) atomic_read(&ssk->somebody_is_doing_posts)
+#define posts_handler_get(ssk) atomic_inc(&ssk->somebody_is_doing_posts)
+#define posts_handler_put(ssk) do {\
+	atomic_dec(&ssk->somebody_is_doing_posts); \
+	sdp_do_posts(ssk); \
+} while (0)
+
+struct sdp_moderation {
+	unsigned long last_moder_packets;
+	unsigned long last_moder_tx_packets;
+	unsigned long last_moder_bytes;
+	unsigned long last_moder_jiffies;
+	int last_moder_time;
+	u16 rx_usecs;
+	u16 rx_frames;
+	u16 tx_usecs;
+	u32 pkt_rate_low;
+	u16 rx_usecs_low;
+	u32 pkt_rate_high;
+	u16 rx_usecs_high;
+	u16 sample_interval;
+	u16 adaptive_rx_coal;
+	u32 msg_enable;
+
+	int moder_cnt;
+	int moder_time;
+};
+
 struct sdp_sock {
 	/* sk has to be the first member of inet_sock */
 	struct inet_sock isk;
 	struct list_head sock_list;
 	struct list_head accept_queue;
 	struct list_head backlog_queue;
+	struct sk_buff_head rx_ctl_q;
 	struct sock *parent;
 
-	struct work_struct work;
+	struct work_struct rx_comp_work;
 	wait_queue_head_t wq;
 
 	struct delayed_work dreq_wait_work;
 	struct work_struct destroy_work;
 
+	atomic_t somebody_is_doing_posts;
+
 	/* Like tcp_sock */
 	u16 urg_data;
 	u32 urg_seq;
 	u32 copied_seq;
-	u32 rcv_nxt;
+#define rcv_nxt(ssk) atomic_read(&(ssk->rcv_nxt))
+	atomic_t rcv_nxt;
 
 	int write_seq;
-	int snd_una;
 	int pushed_seq;
 	int xmit_size_goal;
 	int nonagle;
@@ -174,32 +436,29 @@ struct sdp_sock {
 	int sdp_disconnect;
 	int destruct_in_process;
 
-	struct sdp_buf *rx_ring;
-	struct sdp_buf   *tx_ring;
+	struct sdp_rx_ring rx_ring;
+	struct sdp_tx_ring tx_ring;
 
-	/* rdma specific */
-	struct ib_qp *qp;
-	struct ib_cq *cq;
-	struct ib_mr *mr;
 	/* Data below will be reset on error */
 	struct rdma_cm_id *id;
 	struct ib_device *ib_device;
 
 	/* SDP specific */
-	struct ib_recv_wr rx_wr;
-	unsigned rx_head;
-	unsigned rx_tail;
-	unsigned mseq_ack;
-	unsigned bufs;
+	atomic_t mseq_ack;
+#define mseq_ack(ssk) (atomic_read(&ssk->mseq_ack))
 	unsigned max_bufs;	/* Initial buffers offered by other side */
 	unsigned min_bufs;	/* Low water mark to wake senders */
 
-	int               remote_credits;
+	unsigned long nagle_last_unacked; /* mseq of latest unacked packet */
+	struct timer_list nagle_timer; /* timeout waiting for ack */
+
+	atomic_t               remote_credits;
+#define remote_credits(ssk) (atomic_read(&ssk->remote_credits))
 	int 		  poll_cq;
 
-	unsigned          tx_head;
-	unsigned          tx_tail;
-	struct ib_send_wr tx_wr;
+	/* rdma specific */
+	struct ib_qp *qp;
+	struct ib_mr *mr;
 
 	/* SDP slow start */
 	int rcvbuf_scale; 	/* local recv buf scale for each socket */
@@ -213,11 +472,14 @@ struct sdp_sock {
 	int recv_frags; 	/* max skb frags in recv packets */
 	int send_frags; 	/* max skb frags in send packets */
 
+	unsigned long tx_packets;
+	unsigned long rx_packets;
+	unsigned long tx_bytes;
+	unsigned long rx_bytes;
+	struct sdp_moderation auto_mod;
+
 	/* BZCOPY data */
 	int   zcopy_thresh;
-
-	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
-	struct ib_wc  ibwc[SDP_NUM_WC];
 };
 
 /* Context used for synchronous zero copy bcopy (BZCOY) */
@@ -236,26 +498,11 @@ struct bzcopy_state {
 extern int rcvbuf_initial_size;
 
 extern struct proto sdp_proto;
-extern struct workqueue_struct *sdp_workqueue;
+extern struct workqueue_struct *rx_comp_wq;
 
 extern atomic_t sdp_current_mem_usage;
 extern spinlock_t sdp_large_sockets_lock;
 
-/* just like TCP fs */
-struct sdp_seq_afinfo {
-	struct module           *owner;
-	char                    *name;
-	sa_family_t             family;
-	int                     (*seq_show) (struct seq_file *m, void *v);
-	struct file_operations  *seq_fops;
-};
-
-struct sdp_iter_state {
-	sa_family_t             family;
-	int                     num;
-	struct seq_operations   seq_ops;
-};
-
 static inline struct sdp_sock *sdp_sk(const struct sock *sk)
 {
 	        return (struct sdp_sock *)sk;
@@ -290,9 +537,10 @@ static inline int _sdp_exch_state(const char *func, int line, struct sock *sk,
 	int old;
 
 	spin_lock_irqsave(&sdp_sk(sk)->lock, flags);
-	
+
 	sdp_dbg(sk, "%s:%d - set state: %s -> %s 0x%x\n", func, line,
-		sdp_state_str(sk->sk_state), sdp_state_str(state), from_states);
+		sdp_state_str(sk->sk_state),
+		sdp_state_str(state), from_states);
 
 	if ((1 << sk->sk_state) & ~from_states) {
 		sdp_warn(sk, "trying to exchange state from unexpected state "
@@ -327,35 +575,93 @@ static inline void sdp_set_error(struct sock *sk, int err)
 	sk->sk_error_report(sk);
 }
 
-extern struct workqueue_struct *sdp_workqueue;
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+void _dump_packet(const char *func, int line, struct sock *sk, char *str,
+		struct sk_buff *skb, const struct sdp_bsdh *h);
+#define dump_packet(sk, str, skb, h) \
+	_dump_packet(__func__, __LINE__, sk, str, skb, h)
+#endif
 
-int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
-void sdp_reset(struct sock *sk);
+/* sdp_main.c */
+void sdp_set_default_moderation(struct sdp_sock *ssk);
+int sdp_init_sock(struct sock *sk);
+void sdp_start_keepalive_timer(struct sock *sk);
+void sdp_remove_sock(struct sdp_sock *ssk);
+void sdp_add_sock(struct sdp_sock *ssk);
+void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb);
+void sdp_dreq_wait_timeout_work(struct work_struct *work);
+void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
+void sdp_destroy_work(struct work_struct *work);
 void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context);
-void sdp_work(struct work_struct *work);
+void sdp_reset(struct sock *sk);
+
+/* sdp_proc.c */
+int __init sdp_proc_init(void);
+void sdp_proc_unregister(void);
+
+/* sdp_cma.c */
+int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
+
+/* sdp_bcopy.c */
 int sdp_post_credits(struct sdp_sock *ssk);
+
+/* sdp_tx.c */
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_tx_ring_destroy(struct sdp_sock *ssk);
+int sdp_xmit_poll(struct sdp_sock *ssk, int force);
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
-void sdp_post_recvs(struct sdp_sock *ssk);
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq);
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
-void sdp_destroy_work(struct work_struct *work);
-void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
-void sdp_dreq_wait_timeout_work(struct work_struct *work);
-struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id);
-struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq);
-void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb);
-void sdp_add_sock(struct sdp_sock *ssk);
-void sdp_remove_sock(struct sdp_sock *ssk);
-void sdp_remove_large_sock(struct sdp_sock *ssk);
+void sdp_nagle_timeout(unsigned long data);
+void sdp_post_keepalive(struct sdp_sock *ssk);
+
+/* sdp_rx.c */
+void sdp_rx_ring_init(struct sdp_sock *ssk);
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_rx_ring_destroy(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_post_keepalive(struct sdp_sock *ssk);
-void sdp_start_keepalive_timer(struct sock *sk);
-void sdp_bzcopy_write_space(struct sdp_sock *ssk);
-int sdp_init_sock(struct sock *sk);
+void sdp_do_posts(struct sdp_sock *ssk);
+void sdp_rx_comp_full(struct sdp_sock *ssk);
+void sdp_remove_large_sock(struct sdp_sock *ssk);
 
-static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+	sdp_prf(sk, NULL, "Arming RX cq");
+	sdp_dbg_data(sk, "Arming RX cq\n");
+
+	ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+static inline void sdp_arm_tx_cq(struct sock *sk)
+{
+	sdp_prf(sk, NULL, "Arming TX cq");
+	sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
+		tx_credits(sdp_sk(sk)), ring_posted(sdp_sk(sk)->tx_ring));
+
+	ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+/* utilities */
+static inline char *mid2str(int mid)
+{
+#define ENUM2STR(e) [e] = #e
+	static char *mid2str[] = {
+		ENUM2STR(SDP_MID_HELLO),
+		ENUM2STR(SDP_MID_HELLO_ACK),
+		ENUM2STR(SDP_MID_DISCONN),
+		ENUM2STR(SDP_MID_CHRCVBUF),
+		ENUM2STR(SDP_MID_CHRCVBUF_ACK),
+		ENUM2STR(SDP_MID_DATA),
+	};
+
+	if (mid >= ARRAY_SIZE(mid2str))
+		return NULL;
+
+	return mid2str[mid];
+}
+
+static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size,
+		gfp_t gfp)
 {
 	struct sk_buff *skb;
 
@@ -380,5 +686,4 @@ static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gf
 	return NULL;
 }
 
-
 #endif
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 475d7c3..e2c4ffd 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -39,459 +39,139 @@
 
 #define SDP_RESIZE_WAIT 16
 
-struct sdp_chrecvbuf {
-	u32 size;
-};
-
-static int rcvbuf_scale = 0x10;
-
-int rcvbuf_initial_size = SDP_HEAD_SIZE;
-module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
-MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
-
-module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644);
-MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor.");
-
-static int top_mem_usage = 0;
-module_param_named(top_mem_usage, top_mem_usage, int, 0644);
-MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
-
-#ifdef CONFIG_PPC
-static int max_large_sockets = 100;
-#else
-static int max_large_sockets = 1000;
-#endif
-module_param_named(max_large_sockets, max_large_sockets, int, 0644);
-MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers).");
-
-#define sdp_cnt(var) do { (var)++; } while (0)
-static unsigned sdp_keepalive_probes_sent = 0;
-
-module_param_named(sdp_keepalive_probes_sent, sdp_keepalive_probes_sent, uint, 0644);
-MODULE_PARM_DESC(sdp_keepalive_probes_sent, "Total number of keepalive probes sent.");
-
-static int curr_large_sockets = 0;
-atomic_t sdp_current_mem_usage;
-spinlock_t sdp_large_sockets_lock;
-
-static int sdp_get_large_socket(struct sdp_sock *ssk)
-{
-	int count, ret;
-
-	if (ssk->recv_request)
-		return 1;
-
-	spin_lock_irq(&sdp_large_sockets_lock);
-	count = curr_large_sockets;
-	ret = curr_large_sockets < max_large_sockets;
-	if (ret)
-		curr_large_sockets++;
-	spin_unlock_irq(&sdp_large_sockets_lock);
-
-	return ret;
-}
-
-void sdp_remove_large_sock(struct sdp_sock *ssk)
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+void _dump_packet(const char *func, int line, struct sock *sk, char *str,
+		struct sk_buff *skb, const struct sdp_bsdh *h)
 {
-	if (ssk->recv_frags) {
-		spin_lock_irq(&sdp_large_sockets_lock);
-		curr_large_sockets--;
-		spin_unlock_irq(&sdp_large_sockets_lock);
-	}
-}
-
-/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
-static void sdp_fin(struct sock *sk)
-{
-	sdp_dbg(sk, "%s\n", __func__);
+	struct sdp_hh *hh;
+	struct sdp_hah *hah;
+	struct sdp_chrecvbuf *req_size;
+	int len = 0;
+	char buf[256];
+	len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x "
+			"bufs: %d len: %d mseq: %d mseq_ack: %d | ",
+			str, skb, h->mid, mid2str(h->mid), h->flags,
+			ntohs(h->bufs), ntohl(h->len), ntohl(h->mseq),
+			ntohl(h->mseq_ack));
 
-	sk->sk_shutdown |= RCV_SHUTDOWN;
-	sock_set_flag(sk, SOCK_DONE);
-
-	switch (sk->sk_state) {
-	case TCP_SYN_RECV:
-	case TCP_ESTABLISHED:
-		sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
-				TCP_CLOSE_WAIT);
+	switch (h->mid) {
+	case SDP_MID_HELLO:
+		hh = (struct sdp_hh *)h;
+		len += snprintf(buf + len, 255-len,
+				"max_adverts: %d  majv_minv: %d "
+				"localrcvsz: %d desremrcvsz: %d |",
+				hh->max_adverts, hh->majv_minv,
+				ntohl(hh->localrcvsz),
+				ntohl(hh->desremrcvsz));
 		break;
-
-	case TCP_FIN_WAIT1:
-		/* Received a reply FIN - start Infiniband tear down */
-		sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n",
-				__func__);
-
-		sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
-
-		sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
-
-		if (sdp_sk(sk)->id) {
-			rdma_disconnect(sdp_sk(sk)->id);
-		} else {
-			sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
-			return;
-		}
+	case SDP_MID_HELLO_ACK:
+		hah = (struct sdp_hah *)h;
+		len += snprintf(buf + len, 255-len, "actrcvsz: %d |",
+				ntohl(hah->actrcvsz));
 		break;
-	case TCP_TIME_WAIT:
-		/* This is a mutual close situation and we've got the DREQ from
-		   the peer before the SDP_MID_DISCONNECT */
+	case SDP_MID_CHRCVBUF:
+	case SDP_MID_CHRCVBUF_ACK:
+		req_size = (struct sdp_chrecvbuf *)(h+1);
+		len += snprintf(buf + len, 255-len, "req_size: %d |",
+				ntohl(req_size->size));
 		break;
-	case TCP_CLOSE:
-		/* FIN arrived after IB teardown started - do nothing */
-		sdp_dbg(sk, "%s: fin in state %s\n",
-				__func__, sdp_state_str(sk->sk_state));
-		return;
+	case SDP_MID_DATA:
+		len += snprintf(buf + len, 255-len, "data_len: %ld |",
+			ntohl(h->len) - sizeof(struct sdp_bsdh));
 	default:
-		sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
-				__func__, sk->sk_state);
 		break;
 	}
-
-
-	sk_mem_reclaim(sk);
-
-	if (!sock_flag(sk, SOCK_DEAD)) {
-		sk->sk_state_change(sk);
-
-		/* Do not send POLL_HUP for half duplex close. */
-		if (sk->sk_shutdown == SHUTDOWN_MASK ||
-		    sk->sk_state == TCP_CLOSE)
-			sk_wake_async(sk, 1, POLL_HUP);
-		else
-			sk_wake_async(sk, 1, POLL_IN);
-	}
-}
-
-void sdp_post_keepalive(struct sdp_sock *ssk)
-{
-	int rc;
-	struct ib_send_wr wr, *bad_wr;
-
-	sdp_dbg(&ssk->isk.sk, "%s\n", __func__);
-
-	memset(&wr, 0, sizeof(wr));
-
-	wr.next    = NULL;
-	wr.wr_id   = 0;
-	wr.sg_list = NULL;
-	wr.num_sge = 0;
-	wr.opcode  = IB_WR_RDMA_WRITE;
-
-	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
-	if (rc) {
-		sdp_dbg(&ssk->isk.sk, "ib_post_keepalive failed with status %d.\n", rc);
-		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-		wake_up(&ssk->wq);
-	}
-
-	sdp_cnt(sdp_keepalive_probes_sent);
+	buf[len] = 0;
+	_sdp_printk(func, line, KERN_WARNING, sk, "%s: %s\n", str, buf);
 }
+#endif
 
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
 {
-	struct sdp_buf *tx_req;
-	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
-	unsigned mseq = ssk->tx_head;
-	int i, rc, frags;
-	u64 addr;
-	struct ib_device *dev;
-	struct ib_sge *sge;
-	struct ib_send_wr *bad_wr;
-
-	h->mid = mid;
-	if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
-		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
-	else
-		h->flags = 0;
-
-	h->bufs = htons(ssk->rx_head - ssk->rx_tail);
-	h->len = htonl(skb->len);
-	h->mseq = htonl(mseq);
-	h->mseq_ack = htonl(ssk->mseq_ack);
-
-	tx_req = &ssk->tx_ring[mseq & (SDP_TX_SIZE - 1)];
-	tx_req->skb = skb;
-	dev = ssk->ib_device;
-	sge = ssk->ibsge;
-	addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
-				 DMA_TO_DEVICE);
-	tx_req->mapping[0] = addr;
-
-	/* TODO: proper error handling */
-	BUG_ON(ib_dma_mapping_error(dev, addr));
-
-	sge->addr = addr;
-	sge->length = skb->len - skb->data_len;
-	sge->lkey = ssk->mr->lkey;
-	frags = skb_shinfo(skb)->nr_frags;
-	for (i = 0; i < frags; ++i) {
-		++sge;
-		addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
-				       skb_shinfo(skb)->frags[i].page_offset,
-				       skb_shinfo(skb)->frags[i].size,
-				       DMA_TO_DEVICE);
-		BUG_ON(ib_dma_mapping_error(dev, addr));
-		tx_req->mapping[i + 1] = addr;
-		sge->addr = addr;
-		sge->length = skb_shinfo(skb)->frags[i].size;
-		sge->lkey = ssk->mr->lkey;
-	}
-
-	ssk->tx_wr.next = NULL;
-	ssk->tx_wr.wr_id = ssk->tx_head | SDP_OP_SEND;
-	ssk->tx_wr.sg_list = ssk->ibsge;
-	ssk->tx_wr.num_sge = frags + 1;
-	ssk->tx_wr.opcode = IB_WR_SEND;
-	ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
-	if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
-		ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
-	rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
-	++ssk->tx_head;
-	--ssk->bufs;
-	ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
-	if (unlikely(rc)) {
-		sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
-		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-		wake_up(&ssk->wq);
+	struct page *page;
+	sk->sk_send_head = skb->next;
+	if (sk->sk_send_head == (struct sk_buff *)&sk->sk_write_queue) {
+		sk->sk_send_head = NULL;
+		page = sk->sk_sndmsg_page;
+		if (page) {
+			put_page(page);
+			sk->sk_sndmsg_page = NULL;
+		}
 	}
 }
 
-struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
+static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
 {
-	struct ib_device *dev;
-	struct sdp_buf *tx_req;
-	struct sk_buff *skb;
-	struct bzcopy_state *bz;
-	int i, frags;
-
-	if (unlikely(mseq != ssk->tx_tail)) {
-		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
-			mseq, ssk->tx_tail);
-		return NULL;
-	}
+	int send_now =
+		BZCOPY_STATE(skb) ||
+		 (ssk->nonagle & TCP_NAGLE_OFF) ||
+		!ssk->nagle_last_unacked ||
+		skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
+		skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
+		(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
 
-	dev = ssk->ib_device;
-        tx_req = &ssk->tx_ring[mseq & (SDP_TX_SIZE - 1)];
-	skb = tx_req->skb;
-	ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len,
-			    DMA_TO_DEVICE);
-	frags = skb_shinfo(skb)->nr_frags;
-	for (i = 0; i < frags; ++i) {
-		ib_dma_unmap_page(dev, tx_req->mapping[i + 1],
-				  skb_shinfo(skb)->frags[i].size,
-				  DMA_TO_DEVICE);
+	if (send_now) {
+		unsigned long mseq = ring_head(ssk->tx_ring);
+		ssk->nagle_last_unacked = mseq;
+	} else {
+		if (!timer_pending(&ssk->nagle_timer)) {
+			mod_timer(&ssk->nagle_timer,
+					jiffies + SDP_NAGLE_TIMEOUT);
+			sdp_dbg_data(&ssk->isk.sk, "Starting nagle timer\n");
+		}
 	}
+	sdp_dbg_data(&ssk->isk.sk, "send_now = %d last_unacked = %ld\n",
+		send_now, ssk->nagle_last_unacked);
 
-	ssk->snd_una += TCP_SKB_CB(skb)->end_seq;
-	++ssk->tx_tail;
-
-	/* TODO: AIO and real zcopy cdoe; add their context support here */
-	bz = *(struct bzcopy_state **)skb->cb;
-	if (bz)
-		bz->busy--;
-
-	return skb;
+	return send_now;
 }
 
-
-static void sdp_post_recv(struct sdp_sock *ssk)
+void sdp_nagle_timeout(unsigned long data)
 {
-	struct sdp_buf *rx_req;
-	int i, rc, frags;
-	u64 addr;
-	struct ib_device *dev;
-	struct ib_sge *sge;
-	struct ib_recv_wr *bad_wr;
-	struct sk_buff *skb;
-	struct page *page;
-	skb_frag_t *frag;
-	struct sdp_bsdh *h;
-	int id = ssk->rx_head;
-	gfp_t gfp_page;
-
-	/* Now, allocate and repost recv */
-	/* TODO: allocate from cache */
-
-	if (unlikely(ssk->isk.sk.sk_allocation)) {
-		skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
-					  ssk->isk.sk.sk_allocation);
-		gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM;
-	} else {
-		skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
-					  GFP_KERNEL);
-		gfp_page = GFP_HIGHUSER;
-	}
-
-	/* FIXME */
-	BUG_ON(!skb);
-	h = (struct sdp_bsdh *)skb->head;
-	for (i = 0; i < ssk->recv_frags; ++i) {
-		page = alloc_pages(gfp_page, 0);
-		BUG_ON(!page);
-		frag = &skb_shinfo(skb)->frags[i];
-		frag->page                = page;
-		frag->page_offset         = 0;
-
-		/* Bugzilla 1311 */
-		if ( sizeof(frag->size) < 4 )
-			frag->size = min(PAGE_SIZE, SDP_MAX_PAYLOAD);
-		else
-			frag->size = PAGE_SIZE;
-
-		++skb_shinfo(skb)->nr_frags;
-		skb->len += frag->size;
-		skb->data_len += frag->size;
-		skb->truesize += frag->size;
-	}
-
-        rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
-	rx_req->skb = skb;
-	dev = ssk->ib_device;
-	sge = ssk->ibsge;
-	addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
-	BUG_ON(ib_dma_mapping_error(dev, addr));
+	struct sdp_sock *ssk = (struct sdp_sock *)data;
+	struct sock *sk = &ssk->isk.sk;
 
-	rx_req->mapping[0] = addr;
+	sdp_dbg_data(sk, "last_unacked = %ld\n", ssk->nagle_last_unacked);
 
-	/* TODO: proper error handling */
-	sge->addr = (u64)addr;
-	sge->length = SDP_HEAD_SIZE;
-	sge->lkey = ssk->mr->lkey;
-	frags = skb_shinfo(skb)->nr_frags;
-	for (i = 0; i < frags; ++i) {
-		++sge;
-		addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
-				       skb_shinfo(skb)->frags[i].page_offset,
-				       skb_shinfo(skb)->frags[i].size,
-				       DMA_FROM_DEVICE);
-		BUG_ON(ib_dma_mapping_error(dev, addr));
-		rx_req->mapping[i + 1] = addr;
-		sge->addr = addr;
-		sge->length = skb_shinfo(skb)->frags[i].size;
-		sge->lkey = ssk->mr->lkey;
-	}
+	if (!ssk->nagle_last_unacked)
+		goto out2;
 
-	ssk->rx_wr.next = NULL;
-	ssk->rx_wr.wr_id = id | SDP_OP_RECV;
-	ssk->rx_wr.sg_list = ssk->ibsge;
-	ssk->rx_wr.num_sge = frags + 1;
-	rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
-	++ssk->rx_head;
-	if (unlikely(rc)) {
-		sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
-		sdp_reset(&ssk->isk.sk);
+	/* Only process if the socket is not in use */
+	bh_lock_sock(sk);
+	if (sock_owned_by_user(sk)) {
+		sdp_dbg_data(sk, "socket is busy - will try later\n");
+		goto out;
 	}
 
-	atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-}
-
-void sdp_post_recvs(struct sdp_sock *ssk)
-{
-	struct sock *sk = &ssk->isk.sk;
-	int scale = ssk->rcvbuf_scale;
-
-	if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
-		(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
+	if (sk->sk_state == TCP_CLOSE) {
+		bh_unlock_sock(sk);
 		return;
 	}
 
-	if (top_mem_usage &&
-	    (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
-		scale = 1;
-
-	while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
-		(ssk->rx_head - ssk->rx_tail - SDP_MIN_BUFS) *
-		(SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
-		ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
-	       unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_BUFS))
-		sdp_post_recv(ssk);
-}
-
-struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
-{
-	struct sdp_buf *rx_req;
-	struct ib_device *dev;
-	struct sk_buff *skb;
-	int i, frags;
-
-	if (unlikely(id != ssk->rx_tail)) {
-		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
-			id, ssk->rx_tail);
-		return NULL;
-	}
+	ssk->nagle_last_unacked = 0;
+	sdp_post_sends(ssk, 0);
 
-	dev = ssk->ib_device;
-        rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
-	skb = rx_req->skb;
-	ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
-			    DMA_FROM_DEVICE);
-	frags = skb_shinfo(skb)->nr_frags;
-	for (i = 0; i < frags; ++i)
-		ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
-				  skb_shinfo(skb)->frags[i].size,
-				  DMA_FROM_DEVICE);
-	++ssk->rx_tail;
-	--ssk->remote_credits;
-	return skb;
+	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+		sk_stream_write_space(&ssk->isk.sk);
+out:
+	bh_unlock_sock(sk);
+out2:
+	if (sk->sk_send_head) /* If there are pending sends - rearm */
+		mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
 }
 
-/* Here because I do not want queue to fail. */
-static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
-						     struct sk_buff *skb)
+int sdp_post_credits(struct sdp_sock *ssk)
 {
-	int skb_len;
-	struct sdp_sock *ssk = sdp_sk(sk);
-	struct sk_buff *tail = NULL;
-
-	/* not needed since sk_rmem_alloc is not currently used
-	 * TODO - remove this?
-	skb_set_owner_r(skb, sk); */
-
-	skb_len = skb->len;
+	int post_count = 0;
 
-	TCP_SKB_CB(skb)->seq = ssk->rcv_nxt;
-	ssk->rcv_nxt += skb_len;
+	sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
+			"tx ring slots left: %d send_head: %p\n",
+		tx_credits(ssk), remote_credits(ssk),
+		sdp_tx_ring_slots_left(&ssk->tx_ring),
+		ssk->isk.sk.sk_send_head);
 
-	if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) &&
-	    unlikely(skb_tailroom(tail) >= skb_len)) {
-		skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len);
-		__kfree_skb(skb);
-		skb = tail;
-	} else
-		skb_queue_tail(&sk->sk_receive_queue, skb);
-
-	if (!sock_flag(sk, SOCK_DEAD))
-		sk->sk_data_ready(sk, skb_len);
-	return skb;
-}
-
-static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
-{
-	struct page *page;
-	sk->sk_send_head = skb->next;
-	if (sk->sk_send_head == (struct sk_buff *)&sk->sk_write_queue) {
-		sk->sk_send_head = NULL;
-		page = sk->sk_sndmsg_page;
-		if (page) {
-			put_page(page);
-			sk->sk_sndmsg_page = NULL;
-		}
-	}
-}
-
-static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
-{
-	return (ssk->nonagle & TCP_NAGLE_OFF) ||
-		skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
-		skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
-		(ssk->tx_tail == ssk->tx_head &&
-		 !(ssk->nonagle & TCP_NAGLE_CORK)) ||
-		(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
-}
-
-int sdp_post_credits(struct sdp_sock *ssk)
-{
-	if (likely(ssk->bufs > 1) &&
-	    likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE)) {
+	if (likely(tx_credits(ssk) > 1) &&
+	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring))) {
 		struct sk_buff *skb;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
 					  sizeof(struct sdp_bsdh),
@@ -499,8 +179,12 @@ int sdp_post_credits(struct sdp_sock *ssk)
 		if (!skb)
 			return -ENOMEM;
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
+		post_count++;
 	}
-	return 0;
+
+	if (post_count)
+		sdp_xmit_poll(ssk, 0);
+	return post_count;
 }
 
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
@@ -509,6 +193,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 	struct sk_buff *skb;
 	int c;
 	gfp_t gfp_page;
+	int post_count = 0;
 
 	if (unlikely(!ssk->id)) {
 		if (ssk->isk.sk.sk_send_head) {
@@ -525,10 +210,15 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 	else
 		gfp_page = GFP_KERNEL;
 
+	if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
+		int wc_processed = sdp_xmit_poll(ssk,  1);
+		sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed);
+	}
+
 	if (ssk->recv_request &&
-	    ssk->rx_tail >= ssk->recv_request_head &&
-	    ssk->bufs >= SDP_MIN_BUFS &&
-	    ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) {
+	    ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
+	    tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
+	    sdp_tx_ring_slots_left(&ssk->tx_ring)) {
 		struct sdp_chrecvbuf *resp_size;
 		ssk->recv_request = 0;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -537,45 +227,37 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 					  gfp_page);
 		/* FIXME */
 		BUG_ON(!skb);
-		resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
+		resp_size = (struct sdp_chrecvbuf *)skb_put(skb,
+				sizeof *resp_size);
 		resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
 		sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+		post_count++;
+	}
+
+	if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
+	       sdp_tx_ring_slots_left(&ssk->tx_ring) &&
+	       ssk->isk.sk.sk_send_head &&
+		sdp_nagle_off(ssk, ssk->isk.sk.sk_send_head)) {
+		SDPSTATS_COUNTER_INC(send_miss_no_credits);
 	}
 
-	while (ssk->bufs > SDP_MIN_BUFS &&
-	       ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE &&
+	while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
+	       sdp_tx_ring_slots_left(&ssk->tx_ring) &&
 	       (skb = ssk->isk.sk.sk_send_head) &&
 		sdp_nagle_off(ssk, skb)) {
 		update_send_head(&ssk->isk.sk, skb);
 		__skb_dequeue(&ssk->isk.sk.sk_write_queue);
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
+		post_count++;
 	}
 
-	if (ssk->bufs == SDP_MIN_BUFS &&
-	    !ssk->sent_request &&
-	    ssk->tx_head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
-	    ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) {
-		struct sdp_chrecvbuf *req_size;
-		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-					  sizeof(struct sdp_bsdh) +
-					  sizeof(*req_size),
-					  gfp_page);
-		/* FIXME */
-		BUG_ON(!skb);
-		ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
-		ssk->sent_request_head = ssk->tx_head;
-		req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
-		req_size->size = htonl(ssk->sent_request);
-		sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
-	}
-
-	c = ssk->remote_credits;
-	if (likely(c > SDP_MIN_BUFS))
+	c = remote_credits(ssk);
+	if (likely(c > SDP_MIN_TX_CREDITS))
 		c *= 2;
 
-	if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
-	    likely(ssk->bufs > 1) &&
-	    likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) &&
+	if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+	    likely(tx_credits(ssk) > 1) &&
+	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
 	    likely((1 << ssk->isk.sk.sk_state) &
 		    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -583,12 +265,19 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 					  GFP_KERNEL);
 		/* FIXME */
 		BUG_ON(!skb);
+		SDPSTATS_COUNTER_INC(post_send_credits);
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
+		post_count++;
 	}
 
+	/* send DisConn if needed
+	 * Do not send DisConn if there is only 1 credit. Compliance with CA4-82
+	 * If one credit is available, an implementation shall only send SDP
+	 * messages that provide additional credits and also do not contain ULP
+	 * payload. */
 	if (unlikely(ssk->sdp_disconnect) &&
-		!ssk->isk.sk.sk_send_head &&
-		ssk->bufs > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) {
+			!ssk->isk.sk.sk_send_head &&
+			tx_credits(ssk) > 1) {
 		ssk->sdp_disconnect = 0;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
 					  sizeof(struct sdp_bsdh),
@@ -596,307 +285,9 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		/* FIXME */
 		BUG_ON(!skb);
 		sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+		post_count++;
 	}
-}
-
-int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
-{
-	ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
-	if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
-		ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
-	ssk->rcvbuf_scale = rcvbuf_scale;
-
-	sdp_post_recvs(ssk);
-
-	return 0;
-}
-
-int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
-{
-	skb_frag_t skb_frag;
-	u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
-	u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
-
-	/* Bugzilla 1311, Kernels using smaller fragments must reject
-	 * re-size requests larger than 32k to prevent being sent
-	 * fragment larger than the receive buffer fragment.
-	 */
-	if ( (sizeof(skb_frag.size) < 4) && (max_size > 0x8000))
-		max_size = 0x8000;
-
-	if (new_size > curr_size && new_size <= max_size &&
-	    sdp_get_large_socket(ssk)) {
-		ssk->rcvbuf_scale = rcvbuf_scale;
-		ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
-		if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
-			ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
-		return 0;
-	} else
-		return -1;
-}
-
-static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
-{
-	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
-		ssk->recv_request_head = ssk->rx_head + 1;
-	else
-		ssk->recv_request_head = ssk->rx_tail;
-	ssk->recv_request = 1;
-}
-
-static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
-{
-	u32 new_size = ntohl(buf->size);
-
-	if (new_size > ssk->xmit_size_goal) {
-		ssk->sent_request = -1;
-		ssk->xmit_size_goal = new_size;
-		ssk->send_frags =
-			PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE;
-	} else
-		ssk->sent_request = 0;
-}
-
-static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
-{
-	struct sock *sk = &ssk->isk.sk;
-	int frags;
-	struct sk_buff *skb;
-	struct sdp_bsdh *h;
-	int pagesz, i;
-
-	skb = sdp_recv_completion(ssk, wc->wr_id);
-	if (unlikely(!skb))
-		return -1;
-
-	atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-
-	if (unlikely(wc->status)) {
-		if (wc->status != IB_WC_WR_FLUSH_ERR) {
-			sdp_dbg(sk, "Recv completion with error. Status %d\n",
-				wc->status);
-			sdp_reset(sk);
-		}
-		__kfree_skb(skb);
-		return 0;
-	}
-
-	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
-			(int)wc->wr_id, wc->byte_len);
-	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-		printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
-				wc->byte_len, sizeof(struct sdp_bsdh));
-		__kfree_skb(skb);
-		return -1;
-	}
-	skb->len = wc->byte_len;
-	if (likely(wc->byte_len > SDP_HEAD_SIZE))
-		skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
-	else
-		skb->data_len = 0;
-	skb->data = skb->head;
-#ifdef NET_SKBUFF_DATA_USES_OFFSET
-	skb->tail = skb_headlen(skb);
-#else
-	skb->tail = skb->head + skb_headlen(skb);
-#endif
-	h = (struct sdp_bsdh *)skb->data;
-	skb_reset_transport_header(skb);
-	ssk->mseq_ack = ntohl(h->mseq);
-	if (ssk->mseq_ack != (int)wc->wr_id)
-		printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
-				ssk->mseq_ack, (int)wc->wr_id);
-	ssk->bufs = ntohl(h->mseq_ack) - ssk->tx_head + 1 +
-		ntohs(h->bufs);
-
-	frags = skb_shinfo(skb)->nr_frags;
-	pagesz = PAGE_ALIGN(skb->data_len);
-	skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
-
-	for (i = skb_shinfo(skb)->nr_frags;
-			i < frags; ++i) {
-		put_page(skb_shinfo(skb)->frags[i].page);
-		skb->truesize -= PAGE_SIZE;
-	}
-
-	if (unlikely(h->flags & SDP_OOB_PEND))
-		sk_send_sigurg(sk);
-
-	skb_pull(skb, sizeof(struct sdp_bsdh));
-
-	switch (h->mid) {
-	case SDP_MID_DATA:
-		if (unlikely(skb->len <= 0)) {
-			__kfree_skb(skb);
-			break;
-		}
-
-		if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
-			/* got data in RCV_SHUTDOWN */
-			if (sk->sk_state == TCP_FIN_WAIT1) {
-				/* go into abortive close */
-				sdp_exch_state(sk, TCPF_FIN_WAIT1,
-					       TCP_TIME_WAIT);
-
-				sk->sk_prot->disconnect(sk, 0);
-			}
-
-			__kfree_skb(skb);
-			break;
-		}
-		skb = sdp_sock_queue_rcv_skb(sk, skb);
-		if (unlikely(h->flags & SDP_OOB_PRES))
-			sdp_urg(ssk, skb);
-		break;
-	case SDP_MID_DISCONN:
-		__kfree_skb(skb);
-		sdp_fin(sk);
-		break;
-	case SDP_MID_CHRCVBUF:
-		sdp_handle_resize_request(ssk,
-			(struct sdp_chrecvbuf *)skb->data);
-		__kfree_skb(skb);
-		break;
-	case SDP_MID_CHRCVBUF_ACK:
-		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
-		__kfree_skb(skb);
-		break;
-	default:
-		/* TODO: Handle other messages */
-		printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
-		__kfree_skb(skb);
-	}
-
-	return 0;
-}
-
-static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
-{
-	struct sk_buff *skb;
-	struct sdp_bsdh *h;
-
-	skb = sdp_send_completion(ssk, wc->wr_id);
-	if (unlikely(!skb))
-		return -1;
-
-	if (unlikely(wc->status)) {
-		if (wc->status != IB_WC_WR_FLUSH_ERR) {
-			struct sock *sk = &ssk->isk.sk;
-			sdp_dbg(sk, "Send completion with error. "
-				"Status %d\n", wc->status);
-			sdp_set_error(sk, -ECONNRESET);
-			wake_up(&ssk->wq);
-
-			queue_work(sdp_workqueue, &ssk->destroy_work);
-		}
-		goto out;
-	}
-
-	h = (struct sdp_bsdh *)skb->data;
-
-	if (likely(h->mid != SDP_MID_DISCONN))
-		goto out;
-
-	if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) {
-		sdp_dbg(&ssk->isk.sk,
-			"%s: sent DISCONNECT from unexpected state %d\n",
-			__func__, ssk->isk.sk.sk_state);
-	}
-
-out:
-	sk_wmem_free_skb(&ssk->isk.sk, skb);
-
-	return 0;
-}
-
-static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
-{
-	if (wc->wr_id & SDP_OP_RECV) {
-		if (sdp_handle_recv_comp(ssk, wc))
-			return;
-	} else if (likely(wc->wr_id & SDP_OP_SEND)) {
-		if (sdp_handle_send_comp(ssk, wc))
-			return;
-	} else {
-		sdp_cnt(sdp_keepalive_probes_sent);
 
-		if (likely(!wc->status))
-			return;
-
-		sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
-		        __func__, wc->status);
-
-		if (wc->status == IB_WC_WR_FLUSH_ERR)
-			return;
-
-		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-		wake_up(&ssk->wq);
-
-		return;
-	}
-}
-
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context)
-{
-	struct sock *sk = cq_context;
-	struct sdp_sock *ssk = sdp_sk(sk);
-	schedule_work(&ssk->work);
-}
-
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
-{
-	int n, i;
-	int ret = -EAGAIN;
-	do {
-		n = ib_poll_cq(cq, SDP_NUM_WC, ssk->ibwc);
-		for (i = 0; i < n; ++i) {
-			sdp_handle_wc(ssk, ssk->ibwc + i);
-			ret = 0;
-		}
-	} while (n == SDP_NUM_WC);
-
-	if (!ret) {
-		struct sock *sk = &ssk->isk.sk;
-
-		sdp_post_recvs(ssk);
-		sdp_post_sends(ssk, 0);
-
-		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-			sk_stream_write_space(&ssk->isk.sk);
-	}
-
-	return ret;
-}
-
-void sdp_work(struct work_struct *work)
-{
-	struct sdp_sock *ssk = container_of(work, struct sdp_sock, work);
-	struct sock *sk = &ssk->isk.sk;
-	struct ib_cq *cq;
-
-	sdp_dbg_data(sk, "%s\n", __func__);
-
-	lock_sock(sk);
-	cq = ssk->cq;
-	if (unlikely(!cq))
-		goto out;
-
-	if (unlikely(!ssk->poll_cq)) {
-		struct rdma_cm_id *id = ssk->id;
-		if (id && id->qp)
-			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
-		goto out;
-	}
-
-	sdp_poll_cq(ssk, cq);
-	release_sock(sk);
-	sk_mem_reclaim(sk);
-	lock_sock(sk);
-	cq = ssk->cq;
-	if (unlikely(!cq))
-		goto out;
-	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-	sdp_poll_cq(ssk, cq);
-out:
-	release_sock(sk);
+	if (post_count)
+		sdp_xmit_poll(ssk, 0);
 }
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index 96d65bd..f7dc7c6 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -46,49 +46,13 @@
 #include "sdp_socket.h"
 #include "sdp.h"
 
-union cma_ip_addr {
-        struct in6_addr ip6;
-        struct {
-                __u32 pad[3];
-                __u32 addr;
-        } ip4;
-};
-
 #define SDP_MAJV_MINV 0x22
 
-/* TODO: too much? Can I avoid having the src/dst and port here? */
-struct sdp_hh {
-	struct sdp_bsdh bsdh;
-	u8 majv_minv;
-	u8 ipv_cap;
-	u8 rsvd1;
-	u8 max_adverts;
-	__u32 desremrcvsz;
-	__u32 localrcvsz;
-	__u16 port;
-	__u16 rsvd2;
-	union cma_ip_addr src_addr;
-	union cma_ip_addr dst_addr;
-};
-
-struct sdp_hah {
-	struct sdp_bsdh bsdh;
-	u8 majv_minv;
-	u8 ipv_cap;
-	u8 rsvd1;
-	u8 ext_max_adverts;
-	__u32 actrcvsz;
-};
-
 enum {
 	SDP_HH_SIZE = 76,
 	SDP_HAH_SIZE = 180,
 };
 
-static void sdp_cq_event_handler(struct ib_event *event, void *data)
-{
-}
-
 static void sdp_qp_event_handler(struct ib_event *event, void *data)
 {
 }
@@ -98,43 +62,19 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
 	struct ib_qp_init_attr qp_init_attr = {
 		.event_handler = sdp_qp_event_handler,
 		.cap.max_send_wr = SDP_TX_SIZE,
-		.cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+		.cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS,
 		.cap.max_recv_wr = SDP_RX_SIZE,
-		.cap.max_recv_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+		.cap.max_recv_sge = SDP_MAX_RECV_SKB_FRAGS + 1,
         	.sq_sig_type = IB_SIGNAL_REQ_WR,
         	.qp_type = IB_QPT_RC,
 	};
 	struct ib_device *device = id->device;
-	struct ib_cq *cq;
 	struct ib_mr *mr;
 	struct ib_pd *pd;
 	int rc;
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	sdp_sk(sk)->tx_head = 1;
-	sdp_sk(sk)->tx_tail = 1;
-	sdp_sk(sk)->rx_head = 1;
-	sdp_sk(sk)->rx_tail = 1;
-
-	sdp_sk(sk)->tx_ring = kmalloc(sizeof *sdp_sk(sk)->tx_ring * SDP_TX_SIZE,
-				      GFP_KERNEL);
-	if (!sdp_sk(sk)->tx_ring) {
-		rc = -ENOMEM;
-		sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n",
-			 sizeof *sdp_sk(sk)->tx_ring * SDP_TX_SIZE);
-		goto err_tx;
-	}
-
-	sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE,
-				      GFP_KERNEL);
-	if (!sdp_sk(sk)->rx_ring) {
-		rc = -ENOMEM;
-		sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n",
-			 sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE);
-		goto err_rx;
-	}
-
 	pd = ib_alloc_pd(device);
 	if (IS_ERR(pd)) {
 		rc = PTR_ERR(pd);
@@ -150,27 +90,23 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         }
 
 	sdp_sk(sk)->mr = mr;
-	INIT_WORK(&sdp_sk(sk)->work, sdp_work);
-
-	cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler,
-			  sk, SDP_TX_SIZE + SDP_RX_SIZE, 0);
 
-	if (IS_ERR(cq)) {
-		rc = PTR_ERR(cq);
-		sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc);
-		goto err_cq;
-	}
+	rc = sdp_rx_ring_create(sdp_sk(sk), device);
+	if (rc)
+		goto err_rx;
 
-	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+	rc = sdp_tx_ring_create(sdp_sk(sk), device);
+	if (rc)
+		goto err_tx;
 
-        qp_init_attr.send_cq = qp_init_attr.recv_cq = cq;
+	qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
+	qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
 
 	rc = rdma_create_qp(id, pd, &qp_init_attr);
 	if (rc) {
 		sdp_warn(sk, "Unable to create QP: %d.\n", rc);
 		goto err_qp;
 	}
-	sdp_sk(sk)->cq = cq;
 	sdp_sk(sk)->qp = id->qp;
 	sdp_sk(sk)->ib_device = device;
 
@@ -180,18 +116,14 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
 	return 0;
 
 err_qp:
-	ib_destroy_cq(cq);
-err_cq:
+	sdp_tx_ring_destroy(sdp_sk(sk));
+err_tx:
+	sdp_rx_ring_destroy(sdp_sk(sk));
+err_rx:
 	ib_dereg_mr(sdp_sk(sk)->mr);
 err_mr:
 	ib_dealloc_pd(pd);
 err_pd:
-	kfree(sdp_sk(sk)->rx_ring);
-	sdp_sk(sk)->rx_ring = NULL;
-err_rx:
-	kfree(sdp_sk(sk)->tx_ring);
-	sdp_sk(sk)->tx_ring = NULL;
-err_tx:
 	return rc;
 }
 
@@ -206,6 +138,7 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
 	sdp_dbg(sk, "%s %p -> %p\n", __func__, sdp_sk(sk)->id, id);
 
 	h = event->param.conn.private_data;
+	SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
 
 	if (!h->max_adverts)
 		return -EINVAL;
@@ -232,24 +165,21 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
 
 	sdp_add_sock(sdp_sk(child));
 
-	sdp_sk(child)->max_bufs = sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
-	sdp_sk(child)->min_bufs = sdp_sk(child)->bufs / 4;
+	sdp_sk(child)->max_bufs = ntohs(h->bsdh.bufs);
+	atomic_set(&sdp_sk(child)->tx_ring.credits, sdp_sk(child)->max_bufs);
+
+	sdp_sk(child)->min_bufs = tx_credits(sdp_sk(child)) / 4;
 	sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
 		sizeof(struct sdp_bsdh);
 	sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
-		PAGE_SIZE;
-	sdp_init_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
-
-	sdp_dbg(child, "%s bufs %d xmit_size_goal %d send trigger %d\n",
-		__func__,
-		sdp_sk(child)->bufs,
-		sdp_sk(child)->xmit_size_goal,
-		sdp_sk(child)->min_bufs);
+		PAGE_SIZE + 1; /* +1 to compensate for unaligned buffers */
+	sdp_init_buffers(sdp_sk(child), rcvbuf_initial_size);
 
 	id->context = child;
 	sdp_sk(child)->id = id;
 
-	list_add_tail(&sdp_sk(child)->backlog_queue, &sdp_sk(sk)->backlog_queue);
+	list_add_tail(&sdp_sk(child)->backlog_queue,
+			&sdp_sk(sk)->backlog_queue);
 	sdp_sk(child)->parent = sk;
 
 	sdp_exch_state(child, TCPF_LISTEN | TCPF_CLOSE, TCP_SYN_RECV);
@@ -269,6 +199,7 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 	sdp_dbg(sk, "%s\n", __func__);
 
 	sdp_exch_state(sk, TCPF_SYN_SENT, TCP_ESTABLISHED);
+	sdp_set_default_moderation(sdp_sk(sk));
 
 	if (sock_flag(sk, SOCK_KEEPOPEN))
 		sdp_start_keepalive_timer(sk);
@@ -277,22 +208,19 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 		return 0;
 
 	h = event->param.conn.private_data;
-	sdp_sk(sk)->max_bufs = sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
-	sdp_sk(sk)->min_bufs = sdp_sk(sk)->bufs / 4;
-	sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
-		sizeof(struct sdp_bsdh);
-	sdp_sk(sk)->send_frags = PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
-		PAGE_SIZE;
-
-	sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send trigger %d\n",
-		__func__,
-		sdp_sk(sk)->bufs,
-		sdp_sk(sk)->xmit_size_goal,
-		sdp_sk(sk)->min_bufs);
+	SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
+	sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs);
+	atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs);
+	sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4;
+	sdp_sk(sk)->xmit_size_goal =
+		ntohl(h->actrcvsz) - sizeof(struct sdp_bsdh);
+	sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
+		PAGE_SIZE, MAX_SKB_FRAGS) + 1;  /* +1 to compensate for   */
+						/* unaligned buffers      */
+	sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal,
+		sdp_sk(sk)->send_frags * PAGE_SIZE);
 
 	sdp_sk(sk)->poll_cq = 1;
-	ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
-	sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq);
 
 	sk->sk_state_change(sk);
 	sk_wake_async(sk, 0, POLL_OUT);
@@ -314,6 +242,8 @@ static int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
 
 	sdp_exch_state(sk, TCPF_SYN_RECV, TCP_ESTABLISHED);
 
+	sdp_set_default_moderation(sdp_sk(sk));
+
 	if (sock_flag(sk, SOCK_KEEPOPEN))
 		sdp_start_keepalive_timer(sk);
 
@@ -325,18 +255,12 @@ static int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
 		sdp_dbg(sk, "parent is going away.\n");
 		goto done;
 	}
-#if 0
-	/* TODO: backlog */
-	if (sk_acceptq_is_full(parent)) {
-		sdp_dbg(parent, "%s ECONNREFUSED: parent accept queue full: %d > %d\n", __func__, parent->sk_ack_backlog, parent->sk_max_ack_backlog);
-		release_sock(parent);
-		return -ECONNREFUSED;
-	}
-#endif
+
 	sk_acceptq_added(parent);
 	sdp_dbg(parent, "%s child connection established\n", __func__);
 	list_del_init(&sdp_sk(sk)->backlog_queue);
-	list_add_tail(&sdp_sk(sk)->accept_queue, &sdp_sk(parent)->accept_queue);
+	list_add_tail(&sdp_sk(sk)->accept_queue,
+			&sdp_sk(parent)->accept_queue);
 
 	parent->sk_state_change(parent);
 	sk_wake_async(parent, 0, POLL_OUT);
@@ -352,13 +276,13 @@ static int sdp_disconnected_handler(struct sock *sk)
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	if (ssk->cq)
-		sdp_poll_cq(ssk, ssk->cq);
+	if (ssk->tx_ring.cq)
+		sdp_xmit_poll(ssk, 1);
 
 	if (sk->sk_state == TCP_SYN_RECV) {
 		sdp_connected_handler(sk, NULL);
 
-		if (ssk->rcv_nxt)
+		if (rcv_nxt(ssk))
 			return 0;
 	}
 
@@ -400,7 +324,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT);
 		break;
 	case RDMA_CM_EVENT_ADDR_ERROR:
-		sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
+		sdp_warn(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
 		rc = -ENETUNREACH;
 		break;
 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -408,17 +332,17 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		rc = sdp_init_qp(sk, id);
 		if (rc)
 			break;
-		sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head -
-			sdp_sk(sk)->rx_tail;
+		atomic_set(&sdp_sk(sk)->remote_credits,
+				ring_posted(sdp_sk(sk)->rx_ring));
 		memset(&hh, 0, sizeof hh);
 		hh.bsdh.mid = SDP_MID_HELLO;
-		hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits);
+		hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
 		hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
 		hh.max_adverts = 1;
 		hh.majv_minv = SDP_MAJV_MINV;
 		sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
 		hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
-						       PAGE_SIZE + SDP_HEAD_SIZE);
+				PAGE_SIZE + sizeof(struct sdp_bsdh));
 		hh.max_adverts = 0x1;
 		inet_sk(sk)->saddr = inet_sk(sk)->rcv_saddr =
 			((struct sockaddr_in *)&id->route.addr.src_addr)->sin_addr.s_addr;
@@ -428,6 +352,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		conn_param.responder_resources = 4 /* TODO */;
 		conn_param.initiator_depth = 4 /* TODO */;
 		conn_param.retry_count = SDP_RETRY_COUNT;
+		SDP_DUMP_PACKET(NULL, "TX", NULL, &hh.bsdh);
 		rc = rdma_connect(id, &conn_param);
 		break;
 	case RDMA_CM_EVENT_ROUTE_ERROR:
@@ -442,22 +367,24 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 			break;
 		}
 		child = id->context;
-		sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head -
-			sdp_sk(child)->rx_tail;
+		atomic_set(&sdp_sk(child)->remote_credits,
+				ring_posted(sdp_sk(child)->rx_ring));
 		memset(&hah, 0, sizeof hah);
 		hah.bsdh.mid = SDP_MID_HELLO_ACK;
-		hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits);
+		hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
 		hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
 		hah.majv_minv = SDP_MAJV_MINV;
 		hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
 					    but just in case */
-		hah.actrcvsz = htonl(sdp_sk(child)->recv_frags * PAGE_SIZE + SDP_HEAD_SIZE);
+		hah.actrcvsz = htonl(sdp_sk(child)->recv_frags * PAGE_SIZE +
+			sizeof(struct sdp_bsdh));
 		memset(&conn_param, 0, sizeof conn_param);
 		conn_param.private_data_len = sizeof hah;
 		conn_param.private_data = &hah;
 		conn_param.responder_resources = 4 /* TODO */;
 		conn_param.initiator_depth = 4 /* TODO */;
 		conn_param.retry_count = SDP_RETRY_COUNT;
+		SDP_DUMP_PACKET(sk, "TX", NULL, &hah.bsdh);
 		rc = rdma_accept(id, &conn_param);
 		if (rc) {
 			sdp_sk(child)->id = NULL;
@@ -475,7 +402,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 			rc = rdma_accept(id, NULL);
 
 		if (!rc)
-			rc = sdp_post_credits(sdp_sk(sk));
+			rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0;
 		break;
 	case RDMA_CM_EVENT_CONNECT_ERROR:
 		sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
@@ -511,8 +438,9 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 
 		if (sk->sk_state != TCP_TIME_WAIT) {
 			if (sk->sk_state == TCP_CLOSE_WAIT) {
-				sdp_dbg(sk, "IB teardown while in TCP_CLOSE_WAIT "
-					    "taking reference to let close() finish the work\n");
+				sdp_dbg(sk, "IB teardown while in "
+					"TCP_CLOSE_WAIT taking reference to "
+					"let close() finish the work\n");
 				sock_hold(sk, SOCK_REF_CM_TW);
 			}
 			sdp_set_error(sk, EPIPE);
@@ -550,7 +478,6 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 	sdp_dbg(sk, "event %d done. status %d\n", event->event, rc);
 
 	if (parent) {
-		sdp_dbg(parent, "deleting child %d done. status %d\n", event->event, rc);
 		lock_sock(parent);
 		if (!sdp_sk(parent)->id) { /* TODO: look at SOCK_DEAD? */
 			sdp_dbg(sk, "parent is going away.\n");
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 7a38c47..8e1a6d7 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -67,7 +67,6 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
 #include <linux/socket.h>
 #include <net/protocol.h>
 #include <net/inet_common.h>
-#include <linux/proc_fs.h>
 #include <rdma/rdma_cm.h>
 #include <rdma/ib_verbs.h>
 /* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
@@ -80,66 +79,42 @@ MODULE_DESCRIPTION("InfiniBand SDP module");
 MODULE_LICENSE("Dual BSD/GPL");
 
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG
-int sdp_debug_level;
-
-module_param_named(debug_level, sdp_debug_level, int, 0644);
-MODULE_PARM_DESC(debug_level, "Enable debug tracing if > 0.");
+SDP_MODPARAM_INT(sdp_debug_level, 0, "Enable debug tracing if > 0.");
 #endif
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG
-int sdp_data_debug_level;
-
-module_param_named(data_debug_level, sdp_data_debug_level, int, 0644);
-MODULE_PARM_DESC(data_debug_level, "Enable data path debug tracing if > 0.");
+SDP_MODPARAM_INT(sdp_data_debug_level, 0,
+		"Enable data path debug tracing if > 0.");
 #endif
 
-static int send_poll_hit;
-
-module_param_named(send_poll_hit, send_poll_hit, int, 0644);
-MODULE_PARM_DESC(send_poll_hit, "How many times send poll helped.");
-
-static int send_poll_miss;
-
-module_param_named(send_poll_miss, send_poll_miss, int, 0644);
-MODULE_PARM_DESC(send_poll_miss, "How many times send poll missed.");
-
-static int recv_poll_hit;
-
-module_param_named(recv_poll_hit, recv_poll_hit, int, 0644);
-MODULE_PARM_DESC(recv_poll_hit, "How many times recv poll helped.");
-
-static int recv_poll_miss;
-
-module_param_named(recv_poll_miss, recv_poll_miss, int, 0644);
-MODULE_PARM_DESC(recv_poll_miss, "How many times recv poll missed.");
-
-static int send_poll = 100;
-
-module_param_named(send_poll, send_poll, int, 0644);
-MODULE_PARM_DESC(send_poll, "How many times to poll send.");
-
-static int recv_poll = 1000;
-
-module_param_named(recv_poll, recv_poll, int, 0644);
-MODULE_PARM_DESC(recv_poll, "How many times to poll recv.");
-
-static int send_poll_thresh = 8192;
-
-module_param_named(send_poll_thresh, send_poll_thresh, int, 0644);
-MODULE_PARM_DESC(send_poll_thresh, "Send message size thresh hold over which to start polling.");
-
-static unsigned int sdp_keepalive_time = SDP_KEEPALIVE_TIME;
-
-module_param_named(sdp_keepalive_time, sdp_keepalive_time, uint, 0644);
-MODULE_PARM_DESC(sdp_keepalive_time, "Default idle time in seconds before keepalive probe sent.");
-
-static int sdp_zcopy_thresh = 65536;
-module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
-MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=0ff.");
-
-struct workqueue_struct *sdp_workqueue;
-
-static struct list_head sock_list;
-static spinlock_t sock_list_lock;
+SDP_MODPARAM_SINT(recv_poll_hit, -1, "How many times recv poll helped.");
+SDP_MODPARAM_SINT(recv_poll_miss, -1, "How many times recv poll missed.");
+SDP_MODPARAM_SINT(recv_poll, 1000, "How many times to poll recv.");
+SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
+	"Default idle time in seconds before keepalive probe sent.");
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 65536, "Zero copy send threshold; 0=off.");
+
+#define SDP_RX_COAL_TIME_HIGH 128
+SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
+	"Target number of bytes to coalesce with interrupt moderation.");
+SDP_MODPARAM_SINT(sdp_rx_coal_time, 0x10, "rx coal time (usec).");
+SDP_MODPARAM_SINT(sdp_rx_rate_low, 80000, "rx_rate low (packets/sec).");
+SDP_MODPARAM_SINT(sdp_rx_coal_time_low, 0, "low moderation usec.");
+SDP_MODPARAM_SINT(sdp_rx_rate_high, 100000, "rx_rate high (packets/sec).");
+SDP_MODPARAM_SINT(sdp_rx_coal_time_high, 128, "high moderation usec.");
+SDP_MODPARAM_SINT(sdp_rx_rate_thresh, (200000 / SDP_RX_COAL_TIME_HIGH),
+	"rx rate thresh ().");
+SDP_MODPARAM_SINT(sdp_sample_interval, (HZ / 4), "sample interval (jiffies).");
+
+SDP_MODPARAM_INT(hw_int_mod_count, -1,
+		"Forced HW interrupt moderation count; -1 for auto (packets).");
+SDP_MODPARAM_INT(hw_int_mod_usec, -1,
+		"Forced HW interrupt moderation time; -1 for auto (usec).");
+
+struct workqueue_struct *sdp_wq;
+struct workqueue_struct *rx_comp_wq;
+
+struct list_head sock_list;
+spinlock_t sock_list_lock;
 
 static DEFINE_RWLOCK(device_removal_lock);
 
@@ -206,52 +181,38 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 static void sdp_destroy_qp(struct sdp_sock *ssk)
 {
 	struct ib_pd *pd = NULL;
-	struct ib_cq *cq = NULL;
+	unsigned long flags;
+
+	sdp_dbg(&ssk->isk.sk, "destroying qp\n");
+	sdp_prf(&ssk->isk.sk, NULL, "destroying qp");
+
+	del_timer(&ssk->tx_ring.timer);
+
+	rx_ring_lock(ssk, flags);
+
+	sdp_rx_ring_destroy(ssk);
+	sdp_tx_ring_destroy(ssk);
 
 	if (ssk->qp) {
 		pd = ssk->qp->pd;
-		cq = ssk->cq;
-		ssk->cq = NULL;
 		ib_destroy_qp(ssk->qp);
-
-		while (ssk->rx_head != ssk->rx_tail) {
-			struct sk_buff *skb;
-			skb = sdp_recv_completion(ssk, ssk->rx_tail);
-			if (!skb)
-				break;
-			atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-			__kfree_skb(skb);
-		}
-		while (ssk->tx_head != ssk->tx_tail) {
-			struct sk_buff *skb;
-			skb = sdp_send_completion(ssk, ssk->tx_tail);
-			if (!skb)
-				break;
-			__kfree_skb(skb);
-		}
+		ssk->qp = NULL;
 	}
 
-	if (cq)
-		ib_destroy_cq(cq);
-
-	if (ssk->mr)
+	if (ssk->mr) {
 		ib_dereg_mr(ssk->mr);
+		ssk->mr = NULL;
+	}
 
 	if (pd)
 		ib_dealloc_pd(pd);
 
 	sdp_remove_large_sock(ssk);
 
-	if (ssk->rx_ring) {
-		kfree(ssk->rx_ring);
-		ssk->rx_ring = NULL;
-	}
-	if (ssk->tx_ring) {
-		kfree(ssk->tx_ring);
-		ssk->tx_ring = NULL;
-	}
-}
+	rx_ring_unlock(ssk, flags);
 
+}
 
 static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
 {
@@ -259,8 +220,8 @@ static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	ssk->keepalive_tx_head = ssk->tx_head;
-	ssk->keepalive_rx_head = ssk->rx_head;
+	ssk->keepalive_tx_head = ring_head(ssk->tx_ring);
+	ssk->keepalive_rx_head = ring_head(ssk->rx_ring);
 
 	sk_reset_timer(sk, &sk->sk_timer, jiffies + len);
 }
@@ -295,8 +256,8 @@ static void sdp_keepalive_timer(unsigned long data)
 	    sk->sk_state == TCP_CLOSE)
 		goto out;
 
-	if (ssk->keepalive_tx_head == ssk->tx_head &&
-	    ssk->keepalive_rx_head == ssk->rx_head)
+	if (ssk->keepalive_tx_head == ring_head(ssk->tx_ring) &&
+	    ssk->keepalive_rx_head == ring_head(ssk->rx_ring))
 		sdp_post_keepalive(ssk);
 
 	sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(ssk));
@@ -306,7 +267,7 @@ out:
 	sock_put(sk, SOCK_REF_BORN);
 }
 
-static void sdp_init_timer(struct sock *sk)
+static void sdp_init_keepalive_timer(struct sock *sk)
 {
 	init_timer(&sk->sk_timer);
 
@@ -332,6 +293,150 @@ void sdp_start_keepalive_timer(struct sock *sk)
 	sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(sdp_sk(sk)));
 }
 
+void sdp_set_default_moderation(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	struct sdp_moderation *mod = &ssk->auto_mod;
+	int rx_buf_size;
+
+	if (hw_int_mod_count > -1 || hw_int_mod_usec > -1) {
+		int err;
+
+		mod->adaptive_rx_coal = 0;
+
+		if (hw_int_mod_count > 0 && hw_int_mod_usec > 0) {
+			err = ib_modify_cq(ssk->rx_ring.cq, hw_int_mod_count,
+					hw_int_mod_usec);
+			if (err)
+				sdp_warn(sk,
+					"Failed modifying moderation for cq\n");
+			else
+				sdp_dbg(sk,
+					"Using fixed interrupt moderation\n");
+		}
+		return;
+	}
+
+	mod->adaptive_rx_coal = 1;
+	sdp_dbg(sk, "Using adaptive interrupt moderation\n");
+
+	/* If we haven't received a specific coalescing setting
+	 * (module param), we set the moderation parameters as follows:
+	 * - moder_cnt is set to the number of MTU-sized packets needed to
+	 *   satisfy our coalescing target.
+	 * - moder_time is set to a fixed value.
+	 */
+	rx_buf_size = (ssk->recv_frags * PAGE_SIZE) + sizeof(struct sdp_bsdh);
+	mod->moder_cnt = sdp_rx_coal_target / rx_buf_size + 1;
+	mod->moder_time = sdp_rx_coal_time;
+	sdp_dbg(sk, "Default coalesing params for buf size:%d - "
+			     "moder_cnt:%d moder_time:%d\n",
+		 rx_buf_size, mod->moder_cnt, mod->moder_time);
+
+	/* Reset auto-moderation params */
+	mod->pkt_rate_low = sdp_rx_rate_low;
+	mod->rx_usecs_low = sdp_rx_coal_time_low;
+	mod->pkt_rate_high = sdp_rx_rate_high;
+	mod->rx_usecs_high = sdp_rx_coal_time_high;
+	mod->sample_interval = sdp_sample_interval;
+
+	mod->last_moder_time = SDP_AUTO_CONF;
+	mod->last_moder_jiffies = 0;
+	mod->last_moder_packets = 0;
+	mod->last_moder_tx_packets = 0;
+	mod->last_moder_bytes = 0;
+}
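
As a worked example of the defaults above, assuming 16 receive fragments and a
16-byte BSDH header (illustrative numbers only):

    #include <stdio.h>

    int main(void)
    {
            int rx_buf_size = 16 * 4096 + 16;          /* assumed recv buffer size */
            int moder_cnt = 0x50000 / rx_buf_size + 1; /* sdp_rx_coal_target default */
            int moder_time = 0x10;                     /* sdp_rx_coal_time default */

            printf("moder_cnt=%d moder_time=%d\n", moder_cnt, moder_time);
            return 0;
    }

i.e. roughly five full receive buffers are coalesced per interrupt unless the
time limit fires first.
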
+
+/* If tx and rx packet rates are not balanced, assume that
+ * traffic is mainly BW bound and apply maximum moderation.
+ * Otherwise, moderate according to packet rate */
+static inline int calc_moder_time(int rate, struct sdp_moderation *mod,
+		int tx_pkt_diff, int rx_pkt_diff)
+{
+	if (2 * tx_pkt_diff > 3 * rx_pkt_diff ||
+			2 * rx_pkt_diff > 3 * tx_pkt_diff)
+		return mod->rx_usecs_high;
+
+	if (rate < mod->pkt_rate_low)
+		return mod->rx_usecs_low;
+
+	if (rate > mod->pkt_rate_high)
+		return mod->rx_usecs_high;
+
+	return (rate - mod->pkt_rate_low) *
+		(mod->rx_usecs_high - mod->rx_usecs_low) /
+		(mod->pkt_rate_high - mod->pkt_rate_low) +
+		mod->rx_usecs_low;
+}
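
A quick standalone check of the interpolation (the tx/rx imbalance shortcut
above is omitted; the bounds are the module defaults):

    #include <stdio.h>

    /* same linear interpolation as calc_moder_time(), lifted out for testing */
    static int interp_moder_time(long rate, long rate_low, long rate_high,
                                 int usecs_low, int usecs_high)
    {
            if (rate < rate_low)
                    return usecs_low;
            if (rate > rate_high)
                    return usecs_high;
            return (rate - rate_low) * (usecs_high - usecs_low) /
                    (rate_high - rate_low) + usecs_low;
    }

    int main(void)
    {
            /* defaults: rate_low/high = 80000/100000, coal_time_low/high = 0/128 */
            printf("%d usec\n", interp_moder_time(90000, 80000, 100000, 0, 128));
            return 0;
    }

A rate halfway between the two thresholds lands at 64 usec, halfway between the
two coalescing times.
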
+
+static void sdp_auto_moderation(struct sdp_sock *ssk)
+{
+	struct sdp_moderation *mod = &ssk->auto_mod;
+
+	unsigned long period = jiffies - mod->last_moder_jiffies;
+	unsigned long packets;
+	unsigned long rate;
+	unsigned long avg_pkt_size;
+	unsigned long tx_pkt_diff;
+	unsigned long rx_pkt_diff;
+	int moder_time;
+	int err;
+
+	if (!mod->adaptive_rx_coal)
+		return;
+
+	if (period < mod->sample_interval)
+		return;
+
+	if (!mod->last_moder_jiffies || !period)
+		goto out;
+
+	tx_pkt_diff = ((unsigned long) (ssk->tx_packets -
+					mod->last_moder_tx_packets));
+	rx_pkt_diff = ((unsigned long) (ssk->rx_packets -
+					mod->last_moder_packets));
+	packets = max(tx_pkt_diff, rx_pkt_diff);
+	rate = packets * HZ / period;
+	avg_pkt_size = packets ? ((unsigned long) (ssk->rx_bytes -
+				 mod->last_moder_bytes)) / packets : 0;
+
+	/* Apply auto-moderation only when the packet rate is high enough
+	 * for it to matter */
+	if (rate > sdp_rx_rate_thresh) {
+		moder_time = calc_moder_time(rate, mod, tx_pkt_diff,
+				rx_pkt_diff);
+	} else {
+		/* When packet rate is low, use default moderation rather
+		 * than 0 to prevent interrupt storms if traffic suddenly
+		 * increases */
+		moder_time = mod->moder_time;
+	}
+
+	sdp_dbg_data(&ssk->isk.sk, "tx rate:%lu rx_rate:%lu\n",
+			tx_pkt_diff * HZ / period, rx_pkt_diff * HZ / period);
+
+	sdp_dbg_data(&ssk->isk.sk, "Rx moder_time changed from:%d to %d "
+			"period:%lu [jiff] packets:%lu avg_pkt_size:%lu "
+			"rate:%lu [p/s])\n",
+			mod->last_moder_time, moder_time, period, packets,
+			avg_pkt_size, rate);
+
+	if (moder_time != mod->last_moder_time) {
+		mod->last_moder_time = moder_time;
+		err = ib_modify_cq(ssk->rx_ring.cq, mod->moder_cnt, moder_time);
+		if (err) {
+			sdp_dbg_data(&ssk->isk.sk,
+					"Failed modifying moderation for cq");
+					"Failed modifying moderation for cq\n");
+	}
+
+out:
+	mod->last_moder_packets = ssk->rx_packets;
+	mod->last_moder_tx_packets = ssk->tx_packets;
+	mod->last_moder_bytes = ssk->rx_bytes;
+	mod->last_moder_jiffies = jiffies;
+}
+
 void sdp_reset_sk(struct sock *sk, int rc)
 {
 	struct sdp_sock *ssk = sdp_sk(sk);
@@ -340,11 +445,15 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
 	read_lock(&device_removal_lock);
 
-	if (ssk->cq)
-		sdp_poll_cq(ssk, ssk->cq);
+	if (ssk->tx_ring.cq)
+		sdp_xmit_poll(ssk, 1);
 
-	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
+	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
+		sdp_warn(sk, "setting state to error\n");
 		sdp_set_error(sk, rc);
+	}
+
+	sdp_destroy_qp(ssk);
 
 	memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id));
 
@@ -352,7 +461,7 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
 	/* Don't destroy socket before destroy work does its job */
 	sock_hold(sk, SOCK_REF_RESET);
-	queue_work(sdp_workqueue, &ssk->destroy_work);
+	queue_work(sdp_wq, &ssk->destroy_work);
 
 	read_unlock(&device_removal_lock);
 }
@@ -430,7 +539,7 @@ static void sdp_destruct(struct sock *sk)
 	ssk->destructed_already = 1;
 
 	sdp_remove_sock(ssk);
-	
+
 	sdp_close_sk(sk);
 
 	if (ssk->parent)
@@ -485,6 +594,7 @@ static void sdp_close(struct sock *sk, long timeout)
 	lock_sock(sk);
 
 	sdp_dbg(sk, "%s\n", __func__);
+	sdp_prf(sk, NULL, __func__);
 
 	sdp_delete_keepalive_timer(sk);
 
@@ -510,7 +620,7 @@ static void sdp_close(struct sock *sk, long timeout)
 	 *  descriptor close, not protocol-sourced closes, because the
 	 *  reader process may not have drained the data yet!
 	 */
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
 		data_was_unread = 1;
 		__kfree_skb(skb);
 	}
@@ -701,13 +811,9 @@ static int sdp_wait_for_connect(struct sock *sk, long timeo)
 					  TASK_INTERRUPTIBLE);
 		release_sock(sk);
 		if (list_empty(&ssk->accept_queue)) {
-			sdp_dbg(sk, "%s schedule_timeout\n", __func__);
 			timeo = schedule_timeout(timeo);
-			sdp_dbg(sk, "%s schedule_timeout done\n", __func__);
 		}
-		sdp_dbg(sk, "%s lock_sock\n", __func__);
 		lock_sock(sk);
-		sdp_dbg(sk, "%s lock_sock done\n", __func__);
 		err = 0;
 		if (!list_empty(&ssk->accept_queue))
 			break;
@@ -730,7 +836,7 @@ static int sdp_wait_for_connect(struct sock *sk, long timeo)
 /* Like inet_csk_accept */
 static struct sock *sdp_accept(struct sock *sk, int flags, int *err)
 {
-	struct sdp_sock *newssk, *ssk;
+	struct sdp_sock *newssk = NULL, *ssk;
 	struct sock *newsk;
 	int error;
 
@@ -761,7 +867,8 @@ static struct sock *sdp_accept(struct sock *sk, int flags, int *err)
 			goto out_err;
 	}
 
-	newssk = list_entry(ssk->accept_queue.next, struct sdp_sock, accept_queue);
+	newssk = list_entry(ssk->accept_queue.next, struct sdp_sock,
+			accept_queue);
 	list_del_init(&newssk->accept_queue);
 	newssk->parent = NULL;
 	sk_acceptq_removed(sk);
@@ -770,11 +877,9 @@ out:
 	release_sock(sk);
 	if (newsk) {
 		lock_sock(newsk);
-		if (newssk->cq) {
-			sdp_dbg(newsk, "%s: ib_req_notify_cq\n", __func__);
+		if (newssk->rx_ring.cq) {
 			newssk->poll_cq = 1;
-			ib_req_notify_cq(newssk->cq, IB_CQ_NEXT_COMP);
-			sdp_poll_cq(newssk, newssk->cq);
+			sdp_arm_rx_cq(&newssk->isk.sk);
 		}
 		release_sock(newsk);
 	}
@@ -807,8 +912,8 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 		else if (sock_flag(sk, SOCK_URGINLINE) ||
 			 !ssk->urg_data ||
 			 before(ssk->urg_seq, ssk->copied_seq) ||
-			 !before(ssk->urg_seq, ssk->rcv_nxt)) {
-			answ = ssk->rcv_nxt - ssk->copied_seq;
+			 !before(ssk->urg_seq, rcv_nxt(ssk))) {
+			answ = rcv_nxt(ssk) - ssk->copied_seq;
 
 			/* Subtract 1, if FIN is in queue. */
 			if (answ && !skb_queue_empty(&sk->sk_receive_queue))
@@ -829,7 +934,7 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 		if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV))
 			answ = 0;
 		else
-			answ = ssk->write_seq - ssk->snd_una;
+			answ = ssk->write_seq - ssk->tx_ring.una_seq;
 		break;
 	default:
 		return -ENOIOCTLCMD;
@@ -837,14 +942,14 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 	/* TODO: Need to handle:
 	   case SIOCOUTQ:
 	 */
-	return put_user(answ, (int __user *)arg); 
+	return put_user(answ, (int __user *)arg);
 }
 
 static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo)
 {
 	sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n");
 
-	queue_delayed_work(sdp_workqueue, &ssk->dreq_wait_work, timeo);
+	queue_delayed_work(sdp_wq, &ssk->dreq_wait_work, timeo);
 	ssk->dreq_wait_timeout = 1;
 }
 
@@ -866,7 +971,8 @@ void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk)
 
 void sdp_destroy_work(struct work_struct *work)
 {
-	struct sdp_sock *ssk = container_of(work, struct sdp_sock, destroy_work);
+	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
+			destroy_work);
 	struct sock *sk = &ssk->isk.sk;
 	sdp_dbg(sk, "%s: refcnt %d\n", __func__, atomic_read(&sk->sk_refcnt));
 
@@ -908,12 +1014,10 @@ void sdp_dreq_wait_timeout_work(struct work_struct *work)
 
 	release_sock(sk);
 
-	if (sdp_sk(sk)->id) {
+	if (sdp_sk(sk)->id)
 		rdma_disconnect(sdp_sk(sk)->id);
-	} else {
-		sdp_warn(sk, "NOT SENDING DREQ - no need to wait for timewait exit\n");
+	else
 		sock_put(sk, SOCK_REF_CM_TW);
-	}
 
 out:
 	sock_put(sk, SOCK_REF_DREQ_TO);
@@ -932,13 +1036,19 @@ int sdp_init_sock(struct sock *sk)
 
 	sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
 
-	ssk->rx_ring = NULL;
-	ssk->tx_ring = NULL;
+	skb_queue_head_init(&ssk->rx_ctl_q);
+
+	atomic_set(&ssk->mseq_ack, 0);
+
+	sdp_rx_ring_init(ssk);
+	ssk->tx_ring.buffer = NULL;
 	ssk->sdp_disconnect = 0;
 	ssk->destructed_already = 0;
 	ssk->destruct_in_process = 0;
 	spin_lock_init(&ssk->lock);
 
+	atomic_set(&ssk->somebody_is_doing_posts, 0);
+
 	return 0;
 }
 
@@ -976,7 +1086,7 @@ static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
 {
 	TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
 	ssk->pushed_seq = ssk->write_seq;
-	sdp_post_sends(ssk, 0);
+	sdp_do_posts(ssk);
 }
 
 static inline void sdp_push_pending_frames(struct sock *sk)
@@ -984,7 +1094,6 @@ static inline void sdp_push_pending_frames(struct sock *sk)
 	struct sk_buff *skb = sk->sk_send_head;
 	if (skb) {
 		sdp_mark_push(sdp_sk(sk), skb);
-		sdp_post_sends(sdp_sk(sk), 0);
 	}
 }
 
@@ -1072,8 +1181,11 @@ static int sdp_setsockopt(struct sock *sk, int level, int optname,
 			ssk->keepalive_time = val * HZ;
 
 			if (sock_flag(sk, SOCK_KEEPOPEN) &&
-			    !((1 << sk->sk_state) & (TCPF_CLOSE | TCPF_LISTEN)))
-				sdp_reset_keepalive_timer(sk, ssk->keepalive_time);
+			    !((1 << sk->sk_state) &
+				    (TCPF_CLOSE | TCPF_LISTEN))) {
+				sdp_reset_keepalive_timer(sk,
+				ssk->keepalive_time);
+			}
 		}
 		break;
 	case SDP_ZCOPY_THRESH:
@@ -1140,30 +1252,16 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
 static inline int poll_recv_cq(struct sock *sk)
 {
 	int i;
-	if (sdp_sk(sk)->cq) {
-		for (i = 0; i < recv_poll; ++i)
-			if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
-				++recv_poll_hit;
-				return 0;
-			}
-		++recv_poll_miss;
+	for (i = 0; i < recv_poll; ++i) {
+		if (!skb_queue_empty(&sk->sk_receive_queue)) {
+			++recv_poll_hit;
+			return 0;
+		}
 	}
+	++recv_poll_miss;
 	return 1;
 }
 
-static inline void poll_send_cq(struct sock *sk)
-{
-	int i;
-	if (sdp_sk(sk)->cq) {
-		for (i = 0; i < send_poll; ++i)
-			if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
-				++send_poll_hit;
-				return;
-			}
-		++send_poll_miss;
-	}
-}
-
 /* Like tcp_recv_urg */
 /*
  *	Handle reading urgent data. BSD has very simple semantics for
@@ -1218,30 +1316,8 @@ static int sdp_recv_urg(struct sock *sk, long timeo,
 	return -EAGAIN;
 }
 
-static void sdp_rcv_space_adjust(struct sock *sk)
-{
-	sdp_post_recvs(sdp_sk(sk));
-	sdp_post_sends(sdp_sk(sk), 0);
-}
-
-static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
-{
-	/* TODO */
-	return PAGE_SIZE;
-}
-
-static int forced_push(struct sdp_sock *sk)
-{
-	/* TODO */
-	return 0;
-}
-
-static inline int select_size(struct sock *sk, struct sdp_sock *ssk)
-{
-	return 0;
-}
-
-static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags)
+static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk,
+		int flags)
 {
 	if (unlikely(flags & MSG_OOB)) {
 		struct sk_buff *skb = sk->sk_write_queue.prev;
@@ -1249,12 +1325,11 @@ static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags
 	}
 }
 
-static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags,
-			    int mss_now, int nonagle)
+static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags)
 {
 	if (sk->sk_send_head)
 		sdp_mark_urg(sk, ssk, flags);
-	sdp_post_sends(ssk, nonagle);
+	sdp_do_posts(sdp_sk(sk));
 }
 
 static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
@@ -1270,24 +1345,21 @@ static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
                 ssk->nonagle &= ~TCP_NAGLE_PUSH;
 }
 
-static void sdp_push_one(struct sock *sk, unsigned int mss_now)
-{
-}
-
 static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
 {
-	int i, max_retry;
+	int i;
 	struct sdp_sock *ssk = (struct sdp_sock *)bz->ssk;
 
 	/* Wait for in-flight sends; should be quick */
 	if (bz->busy) {
 		struct sock *sk = &ssk->isk.sk;
+		unsigned long timeout = jiffies + SDP_BZCOPY_POLL_TIMEOUT;
 
-		for (max_retry = 0; max_retry < 10000; max_retry++) {
-			poll_send_cq(sk);
-
+		while (time_before(jiffies, timeout)) {
+			sdp_xmit_poll(sdp_sk(sk), 1);
 			if (!bz->busy)
 				break;
+			SDPSTATS_COUNTER_INC(bzcopy_poll_miss);
 		}
 
 		if (bz->busy)
@@ -1319,11 +1391,14 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 	int thresh;
 	mm_segment_t cur_fs;
 
-	cur_fs = get_fs();
-
 	thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh;
-	if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK))
+	if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK)) {
+		SDPSTATS_COUNTER_INC(sendmsg_bcopy_segment);
 		return NULL;
+	}
+	SDPSTATS_COUNTER_INC(sendmsg_bzcopy_segment);
+
+	cur_fs = get_fs();
 
 	/*
 	 *   Since we use the TCP segmentation fields of the skb to map user
@@ -1347,7 +1422,8 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 	bz->busy       = 0;
 	bz->ssk        = ssk;
 	bz->page_cnt   = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
-	bz->pages      = kcalloc(bz->page_cnt, sizeof(struct page *), GFP_KERNEL);
+	bz->pages      = kcalloc(bz->page_cnt, sizeof(struct page *),
+			GFP_KERNEL);
 
 	if (!bz->pages) {
 		kfree(bz);
@@ -1386,7 +1462,6 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 	return bz;
 }
 
-
 #define TCP_PAGE(sk)	(sk->sk_sndmsg_page)
 #define TCP_OFF(sk)	(sk->sk_sndmsg_off)
 static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
@@ -1413,12 +1488,9 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
 			/* We can extend the last page
 			 * fragment. */
 			merge = 1;
-		} else if (i == ssk->send_frags ||
-			   (!i &&
-			   !(sk->sk_route_caps & NETIF_F_SG))) {
+		} else if (i == ssk->send_frags) {
 			/* Need to add new fragment and cannot
-			 * do this because interface is non-SG,
-			 * or because all the page slots are
+			 * do this because all the page slots are
 			 * busy. */
 			sdp_mark_push(ssk, skb);
 			return SDP_NEW_SEG;
@@ -1445,6 +1517,7 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
 
 		/* Time to copy data. We are close to
 		 * the end! */
+		SDPSTATS_COUNTER_ADD(memcpy_count, copy);
 		err = skb_copy_to_page(sk, from, skb, page,
 				       off, copy);
 		if (err) {
@@ -1478,7 +1551,6 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
 	return copy;
 }
 
-
 static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
 				 unsigned char __user *from, int copy,
 				 struct bzcopy_state *bz)
@@ -1539,23 +1611,27 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
 	return copy;
 }
 
-static inline int slots_free(struct sdp_sock *ssk)
+/* Return the min of:
+ * - tx credits
+ * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS)
+ */
+static inline int tx_slots_free(struct sdp_sock *ssk)
 {
 	int min_free;
 
-	min_free = SDP_TX_SIZE - (ssk->tx_head - ssk->tx_tail);
-	if (ssk->bufs < min_free)
-		min_free = ssk->bufs;
-	min_free -= (min_free < SDP_MIN_BUFS) ? min_free : SDP_MIN_BUFS;
+	min_free = MIN(tx_credits(ssk),
+			SDP_TX_SIZE - ring_posted(ssk->tx_ring));
+	if (min_free < SDP_MIN_TX_CREDITS)
+		return 0;
 
-	return min_free;
+	return min_free - SDP_MIN_TX_CREDITS;
 };
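
For example (SDP_TX_SIZE and SDP_MIN_TX_CREDITS values below are assumed for
illustration only):

    #include <stdio.h>

    #define SDP_TX_SIZE_EX        256  /* assumed ring size */
    #define SDP_MIN_TX_CREDITS_EX 4    /* assumed reserved credits */
    #define MIN_EX(a, b) ((a) < (b) ? (a) : (b))

    /* mirrors tx_slots_free(): min(credits, free ring slots) minus the reserve */
    static int tx_slots_free_ex(int tx_credits, int ring_posted)
    {
            int min_free = MIN_EX(tx_credits, SDP_TX_SIZE_EX - ring_posted);

            if (min_free < SDP_MIN_TX_CREDITS_EX)
                    return 0;
            return min_free - SDP_MIN_TX_CREDITS_EX;
    }

    int main(void)
    {
            printf("%d\n", tx_slots_free_ex(10, 250)); /* ring nearly full -> 2 */
            printf("%d\n", tx_slots_free_ex(3, 100));  /* too few credits  -> 0 */
            return 0;
    }
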
 
 /* like sk_stream_memory_free - except measures remote credits */
 static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
 					 struct bzcopy_state *bz)
 {
-	return slots_free(ssk) > bz->busy;
+	return tx_slots_free(ssk) > bz->busy;
 }
 
 /* like sk_stream_wait_memory - except waits on remote credits */
@@ -1595,14 +1671,24 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 
 		clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
+		posts_handler_put(ssk);
+
 		if (sdp_bzcopy_slots_avail(ssk, bz))
 			break;
 
 		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 		sk->sk_write_pending++;
+
+		if (tx_credits(ssk) > SDP_MIN_TX_CREDITS)
+			sdp_arm_tx_cq(sk);
+
 		sk_wait_event(sk, &current_timeo,
 			sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
 		sk->sk_write_pending--;
+		sdp_prf1(sk, NULL, "finished wait for mem");
+
+		posts_handler_get(ssk);
+		sdp_do_posts(ssk);
 
 		if (vm_wait) {
 			vm_wait -= current_timeo;
@@ -1619,25 +1705,6 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 	return err;
 }
 
-/* like sk_stream_write_space - execpt measures remote credits */
-void sdp_bzcopy_write_space(struct sdp_sock *ssk)
-{
-	struct sock *sk = &ssk->isk.sk;
-	struct socket *sock = sk->sk_socket;
-
-	if (ssk->bufs >= ssk->min_bufs &&
-	    ssk->tx_head == ssk->tx_tail &&
-	   sock != NULL) {
-		clear_bit(SOCK_NOSPACE, &sock->flags);
-
-		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-			wake_up_interruptible(sk->sk_sleep);
-		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
-			sock_wake_async(sock, 2, POLL_OUT);
-	}
-}
-
-
 /* Like tcp_sendmsg */
 /* TODO: check locking */
 static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1647,14 +1714,20 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	struct sdp_sock *ssk = sdp_sk(sk);
 	struct sk_buff *skb;
 	int iovlen, flags;
-	int mss_now, size_goal;
+	int size_goal;
 	int err, copied;
 	long timeo;
 	struct bzcopy_state *bz = NULL;
-
+	unsigned long long a, b, c;
+	unsigned long long start, end;
+	SDPSTATS_COUNTER_INC(sendmsg);
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
 
+	rdtscll(start);
+
+	posts_handler_get(ssk);
+
 	flags = msg->msg_flags;
 	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
 
@@ -1666,7 +1739,6 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	/* This should be in poll */
 	clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
-	mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
 	size_goal = ssk->xmit_size_goal;
 
 	/* Ok commence sending. */
@@ -1688,9 +1760,16 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		if (size_goal > SDP_MAX_PAYLOAD)
 			size_goal = SDP_MAX_PAYLOAD;
 
+		SDPSTATS_HIST(sendmsg_seglen, seglen);
+
+		rdtscll(a);
 		if (bz)
 			sdp_bz_cleanup(bz);
+		rdtscll(b);
 		bz = sdp_bz_setup(ssk, from, seglen, size_goal);
+		rdtscll(c);
+		SDPSTATS_COUNTER_ADD(bz_clean_sum, b - a);
+		SDPSTATS_COUNTER_ADD(bz_setup_sum, c - b);
 		if (IS_ERR(bz)) {
 			bz = NULL;
 			err = PTR_ERR(bz);
@@ -1704,8 +1783,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
 			if (!sk->sk_send_head ||
 			    (copy = size_goal - skb->len) <= 0 ||
-			    bz != *(struct bzcopy_state **)skb->cb) {
-
+			    bz != BZCOPY_STATE(skb)) {
 new_segment:
 				/*
 				 * Allocate a new segment
@@ -1722,12 +1800,12 @@ new_segment:
 						goto wait_for_sndbuf;
 				}
 
-				skb = sdp_stream_alloc_skb(sk, select_size(sk, ssk),
-							   sk->sk_allocation);
+				skb = sdp_stream_alloc_skb(sk, 0,
+						sk->sk_allocation);
 				if (!skb)
 					goto wait_for_memory;
 
-				*((struct bzcopy_state **)skb->cb) = bz;
+				BZCOPY_STATE(skb) = bz;
 
 				/*
 				 * Check whether we can use HW checksum.
@@ -1739,6 +1817,11 @@ new_segment:
 
 				skb_entail(sk, ssk, skb);
 				copy = size_goal;
+			} else {
+				sdp_dbg_data(sk, "adding to existing skb: %p"
+					" len = %d, sk_send_head: %p "
+					"copy: %d\n",
+					skb, skb->len, sk->sk_send_head, copy);
 			}
 
 			/* Try to append data to the end of skb. */
@@ -1753,8 +1836,12 @@ new_segment:
 				goto new_segment;
 			}
 
+			rdtscll(a);
 			copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
 				      sdp_bcopy_get(sk, skb, from, copy);
+			rdtscll(b);
+			if (copy > 0)
+				SDPSTATS_COUNTER_ADD(tx_copy_sum, b - a);
 			if (unlikely(copy < 0)) {
 				if (!++copy)
 					goto wait_for_memory;
@@ -1777,48 +1864,60 @@ new_segment:
 			if ((seglen -= copy) == 0 && iovlen == 0)
 				goto out;
 
-			if (skb->len < mss_now || (flags & MSG_OOB))
+			if (skb->len < PAGE_SIZE || (flags & MSG_OOB))
 				continue;
-
-			if (forced_push(ssk)) {
-				sdp_mark_push(ssk, skb);
-				/* TODO: and push pending frames mss_now */
-				/* sdp_push_pending(sk, ssk, mss_now, TCP_NAGLE_PUSH); */
-			} else if (skb == sk->sk_send_head)
-				sdp_push_one(sk, mss_now);
 			continue;
 
 wait_for_sndbuf:
 			set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 wait_for_memory:
+			sdp_prf(sk, skb, "wait for mem");
+			SDPSTATS_COUNTER_INC(send_wait_for_mem);
 			if (copied)
-				sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
+				sdp_push(sk, ssk, flags & ~MSG_MORE);
+
+			sdp_xmit_poll(ssk, 1);
+
+			if (bz) {
+				err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
+			} else {
+				posts_handler_put(ssk);
+
+				sdp_arm_tx_cq(sk);
+
+				err = sk_stream_wait_memory(sk, &timeo);
+
+				posts_handler_get(ssk);
+				sdp_do_posts(ssk);
+			}
 
-			err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
-				     sk_stream_wait_memory(sk, &timeo);
 			if (err)
 				goto do_error;
 
-			mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
 			size_goal = ssk->xmit_size_goal;
 		}
 	}
 
 out:
 	if (copied) {
-		sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
+		sdp_push(sk, ssk, flags);
 
 		if (bz)
 			bz = sdp_bz_cleanup(bz);
-		else
-			if (size > send_poll_thresh)
-				poll_send_cq(sk);
 	}
 
+	posts_handler_put(ssk);
+
+	sdp_auto_moderation(ssk);
+
+	rdtscll(end);
+	SDPSTATS_COUNTER_ADD(sendmsg_sum, end - start);
 	release_sock(sk);
 	return copied;
 
 do_fault:
+	sdp_prf(sk, skb, "prepare fault");
+
 	if (!skb->len) {
 		if (sk->sk_send_head == skb)
 			sk->sk_send_head = NULL;
@@ -1833,6 +1932,9 @@ out_err:
 	if (bz)
 		bz = sdp_bz_cleanup(bz);
 	err = sk_stream_error(sk, flags, err);
+
+	posts_handler_put(ssk);
+
 	release_sock(sk);
 	return err;
 }
@@ -1841,7 +1943,7 @@ out_err:
 /* Maybe use skb_recv_datagram here? */
 /* Note this does not seem to handle vectored messages. Relevant? */
 static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
-		       size_t len, int noblock, int flags, 
+		       size_t len, int noblock, int flags,
 		       int *addr_len)
 {
 	struct sk_buff *skb = NULL;
@@ -1858,6 +1960,10 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
 
+	posts_handler_get(ssk);
+
+	sdp_prf(sk, skb, "Read from user");
+
 	err = -ENOTCONN;
 	if (sk->sk_state == TCP_LISTEN)
 		goto out;
@@ -1878,12 +1984,14 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	do {
 		u32 offset;
 
-		/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
+		/* Are we at urgent data? Stop if we have read anything or have
+		 * SIGURG pending. */
 		if (ssk->urg_data && ssk->urg_seq == *seq) {
 			if (copied)
 				break;
 			if (signal_pending(current)) {
-				copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
+				copied = timeo ? sock_intr_errno(timeo) :
+					-EAGAIN;
 				break;
 			}
 		}
@@ -1897,8 +2005,9 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 				goto found_fin_ok;
 
 			if (before(*seq, TCP_SKB_CB(skb)->seq)) {
-				printk(KERN_INFO "recvmsg bug: copied %X "
-				       "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+				sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
+					*seq, TCP_SKB_CB(skb)->seq);
+				sdp_reset(sk);
 				break;
 			}
 
@@ -1962,15 +2071,24 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 			release_sock(sk);
 			lock_sock(sk);
 		} else if (rc) {
-			sdp_dbg_data(sk, "%s: sk_wait_data %ld\n", __func__, timeo);
+			sdp_dbg_data(sk, "sk_wait_data %ld\n", timeo);
+
+			posts_handler_put(ssk);
+
+			/* socket lock is released inside sk_wait_data */
 			sk_wait_data(sk, &timeo);
+
+			posts_handler_get(ssk);
+			sdp_prf(sk, NULL, "got data");
+
+			sdp_do_posts(ssk);
 		}
 		continue;
 
 	found_ok_skb:
-		sdp_dbg_data(sk, "%s: found_ok_skb len %d\n", __func__, skb->len);
-		sdp_dbg_data(sk, "%s: len %Zd offset %d\n", __func__, len, offset);
-		sdp_dbg_data(sk, "%s: copied %d target %d\n", __func__, copied, target);
+		sdp_dbg_data(sk, "found_ok_skb len %d\n", skb->len);
+		sdp_dbg_data(sk, "len %Zd offset %d\n", len, offset);
+		sdp_dbg_data(sk, "copied %d target %d\n", copied, target);
 		used = skb->len - offset;
 		if (len < used)
 			used = len;
@@ -2011,9 +2129,9 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		len -= used;
 		*seq += used;
 
-		sdp_dbg_data(sk, "%s: done copied %d target %d\n", __func__, copied, target);
+		sdp_dbg_data(sk, "done copied %d target %d\n", copied, target);
 
-		sdp_rcv_space_adjust(sk);
+		sdp_do_posts(sdp_sk(sk));
 skip_copy:
 		if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
 			ssk->urg_data = 0;
@@ -2021,22 +2139,32 @@ skip_copy:
 			continue;
 		offset = 0;
 
-		if (!(flags & MSG_PEEK))
-			sk_eat_skb(sk, skb, 0);
-
+		if (!(flags & MSG_PEEK)) {
+			struct sdp_bsdh *h;
+			h = (struct sdp_bsdh *)skb_transport_header(skb);
+			sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
+				ntohl(h->mseq), ntohl(h->mseq_ack));
+			skb_unlink(skb, &sk->sk_receive_queue);
+			__kfree_skb(skb);
+		}
 		continue;
 found_fin_ok:
 		++*seq;
-		if (!(flags & MSG_PEEK))
-			sk_eat_skb(sk, skb, 0);
-
+		if (!(flags & MSG_PEEK)) {
+			skb_unlink(skb, &sk->sk_receive_queue);
+			__kfree_skb(skb);
+		}
 		break;
-	} while (len > 0);
 
-	release_sock(sk);
-	return copied;
+	} while (len > 0);
 
+	err = copied;
 out:
+
+	posts_handler_put(ssk);
+
+	sdp_auto_moderation(ssk);
+
 	release_sock(sk);
 	return err;
 
@@ -2128,8 +2256,8 @@ static unsigned int sdp_poll(struct file *file, struct socket *socket,
        /*
         * Adjust for memory in later kernels
         */
-       if (!sk_stream_memory_free(sk) || !slots_free(ssk))
-               mask &= ~(POLLOUT | POLLWRNORM | POLLWRBAND);
+	if (!sk_stream_memory_free(sk) || !tx_slots_free(ssk))
+		mask &= ~(POLLOUT | POLLWRNORM | POLLWRBAND);
 
 	/* TODO: Slightly ugly: it would be nicer if there was function
 	 * like datagram_poll that didn't include poll_wait,
@@ -2221,7 +2349,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 	struct sock *sk;
 	int rc;
 
-	sdp_dbg(NULL, "%s: type %d protocol %d\n", __func__, sock->type, protocol);
+	sdp_dbg(NULL, "type %d protocol %d\n", sock->type, protocol);
 
 	if (net != &init_net)
 		return -EAFNOSUPPORT;
@@ -2256,7 +2384,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 
 	sk->sk_destruct = sdp_destruct;
 
-	sdp_init_timer(sk);
+	sdp_init_keepalive_timer(sk);
 
 	sock->ops = &sdp_proto_ops;
 	sock->state = SS_UNCONNECTED;
@@ -2266,192 +2394,6 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 	return 0;
 }
 
-#ifdef CONFIG_PROC_FS
-
-static void *sdp_get_idx(struct seq_file *seq, loff_t pos)
-{
-	int i = 0;
-	struct sdp_sock *ssk;
-
-	if (!list_empty(&sock_list))
-		list_for_each_entry(ssk, &sock_list, sock_list) {
-			if (i == pos)
-				return ssk;
-			i++;
-		}
-
-	return NULL;
-}
-
-static void *sdp_seq_start(struct seq_file *seq, loff_t *pos)
-{
-	void *start = NULL;
-	struct sdp_iter_state* st = seq->private;
-
-	st->num = 0;
-
-	if (!*pos)
-		return SEQ_START_TOKEN;
-
-	spin_lock_irq(&sock_list_lock);
-	start = sdp_get_idx(seq, *pos - 1);
-	if (start)
-		sock_hold((struct sock *)start, SOCK_REF_SEQ);
-	spin_unlock_irq(&sock_list_lock);
-
-	return start;
-}
-
-static void *sdp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
-{
-	struct sdp_iter_state* st = seq->private;
-	void *next = NULL;
-
-	spin_lock_irq(&sock_list_lock);
-	if (v == SEQ_START_TOKEN)
-		next = sdp_get_idx(seq, 0);
-	else
-		next = sdp_get_idx(seq, *pos);
-	if (next)
-		sock_hold((struct sock *)next, SOCK_REF_SEQ);
-	spin_unlock_irq(&sock_list_lock);
-
-	*pos += 1;
-	st->num++;
-
-	return next;
-}
-
-static void sdp_seq_stop(struct seq_file *seq, void *v)
-{
-}
-
-#define TMPSZ 150
-
-static int sdp_seq_show(struct seq_file *seq, void *v)
-{
-	struct sdp_iter_state* st;
-	struct sock *sk = v;
-	char tmpbuf[TMPSZ + 1];
-	unsigned int dest;
-	unsigned int src;
-	int uid;
-	unsigned long inode;
-	__u16 destp;
-	__u16 srcp;
-	__u32 rx_queue, tx_queue;
-
-	if (v == SEQ_START_TOKEN) {
-		seq_printf(seq, "%-*s\n", TMPSZ - 1,
-				"  sl  local_address rem_address        uid inode"
-				"   rx_queue tx_queue state");
-		goto out;
-	}
-
-	st = seq->private;
-
-	dest = inet_sk(sk)->daddr;
-	src = inet_sk(sk)->rcv_saddr;
-	destp = ntohs(inet_sk(sk)->dport);
-	srcp = ntohs(inet_sk(sk)->sport);
-	uid = sock_i_uid(sk);
-	inode = sock_i_ino(sk);
-	rx_queue = sdp_sk(sk)->rcv_nxt - sdp_sk(sk)->copied_seq;
-	tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->snd_una;
-
-	sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu	%08X:%08X %X",
-		st->num, src, srcp, dest, destp, uid, inode,
-		rx_queue, tx_queue, sk->sk_state);
-
-	seq_printf(seq, "%-*s\n", TMPSZ - 1, tmpbuf);
-
-	sock_put(sk, SOCK_REF_SEQ);
-out:
-	return 0;
-}
-
-static int sdp_seq_open(struct inode *inode, struct file *file)
-{
-	struct sdp_seq_afinfo *afinfo = PDE(inode)->data;
-	struct seq_file *seq;
-	struct sdp_iter_state *s;
-	int rc;
-
-	if (unlikely(afinfo == NULL))
-		return -EINVAL;
-
-	s = kzalloc(sizeof(*s), GFP_KERNEL);
-	if (!s)
-		return -ENOMEM;
-	s->family               = afinfo->family;
-	s->seq_ops.start        = sdp_seq_start;
-	s->seq_ops.next         = sdp_seq_next;
-	s->seq_ops.show         = afinfo->seq_show;
-	s->seq_ops.stop         = sdp_seq_stop;
-
-	rc = seq_open(file, &s->seq_ops);
-	if (rc)
-		goto out_kfree;
-	seq          = file->private_data;
-	seq->private = s;
-out:
-	return rc;
-out_kfree:
-	kfree(s);
-	goto out;
-}
-
-
-static struct file_operations sdp_seq_fops;
-static struct sdp_seq_afinfo sdp_seq_afinfo = {
-	.owner          = THIS_MODULE,
-	.name           = "sdp",
-	.family         = AF_INET_SDP,
-	.seq_show       = sdp_seq_show,
-	.seq_fops       = &sdp_seq_fops,
-};
-
-
-static int __init sdp_proc_init(void)
-{
-	int rc = 0;
-	struct proc_dir_entry *p;
-
-	sdp_seq_afinfo.seq_fops->owner         = sdp_seq_afinfo.owner;
-	sdp_seq_afinfo.seq_fops->open          = sdp_seq_open;
-	sdp_seq_afinfo.seq_fops->read          = seq_read;
-	sdp_seq_afinfo.seq_fops->llseek        = seq_lseek;
-	sdp_seq_afinfo.seq_fops->release       = seq_release_private;
-
-	p = proc_net_fops_create(&init_net, sdp_seq_afinfo.name, S_IRUGO,
-				 sdp_seq_afinfo.seq_fops);
-	if (p)
-		p->data = &sdp_seq_afinfo;
-	else
-		rc = -ENOMEM;
-
-	return rc;
-}
-
-static void sdp_proc_unregister(void)
-{
-	proc_net_remove(&init_net, sdp_seq_afinfo.name);
-	memset(sdp_seq_afinfo.seq_fops, 0, sizeof(*sdp_seq_afinfo.seq_fops));
-}
-
-#else /* CONFIG_PROC_FS */
-
-static int __init sdp_proc_init(void)
-{
-	return 0;
-}
-
-static void sdp_proc_unregister(void)
-{
-
-}
-#endif /* CONFIG_PROC_FS */
-
 static void sdp_add_device(struct ib_device *device)
 {
 }
@@ -2499,12 +2441,12 @@ kill_socks:
 
 			sk->sk_shutdown |= RCV_SHUTDOWN;
 			sdp_reset(sk);
-			if ((1 << sk->sk_state) & 
+			if ((1 << sk->sk_state) &
 				(TCPF_FIN_WAIT1 | TCPF_CLOSE_WAIT |
 				 TCPF_LAST_ACK | TCPF_TIME_WAIT)) {
 				sock_put(sk, SOCK_REF_CM_TW);
 			}
-			
+
 			schedule();
 
 			spin_lock_irq(&sock_list_lock);
@@ -2532,39 +2474,44 @@ static struct ib_client sdp_client = {
 
 static int __init sdp_init(void)
 {
-	int rc;
+	int rc = -ENOMEM;
 
 	INIT_LIST_HEAD(&sock_list);
 	spin_lock_init(&sock_list_lock);
 	spin_lock_init(&sdp_large_sockets_lock);
 
 	sockets_allocated = kmalloc(sizeof(*sockets_allocated), GFP_KERNEL);
+	if (!sockets_allocated)
+		goto no_mem_sockets_allocated;
+
 	orphan_count = kmalloc(sizeof(*orphan_count), GFP_KERNEL);
+	if (!orphan_count)
+		goto no_mem_orphan_count;
+
 	percpu_counter_init(sockets_allocated, 0);
 	percpu_counter_init(orphan_count, 0);
 
 	sdp_proto.sockets_allocated = sockets_allocated;
 	sdp_proto.orphan_count = orphan_count;
 
+	rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+	if (!rx_comp_wq)
+		goto no_mem_rx_wq;
 
-	sdp_workqueue = create_singlethread_workqueue("sdp");
-	if (!sdp_workqueue) {
-		return -ENOMEM;
-	}
+	sdp_wq = create_singlethread_workqueue("sdp_wq");
+	if (!sdp_wq)
+		goto no_mem_sdp_wq;
 
 	rc = proto_register(&sdp_proto, 1);
 	if (rc) {
-		printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc);
-		destroy_workqueue(sdp_workqueue);
-		return rc;
+		printk(KERN_WARNING "proto_register failed: %d\n", rc);
+		goto error_proto_reg;
 	}
 
 	rc = sock_register(&sdp_net_proto);
 	if (rc) {
-		printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc);
-		proto_unregister(&sdp_proto);
-		destroy_workqueue(sdp_workqueue);
-		return rc;
+		printk(KERN_WARNING "sock_register failed: %d\n", rc);
+		goto error_sock_reg;
 	}
 
 	sdp_proc_init();
@@ -2574,6 +2521,19 @@ static int __init sdp_init(void)
 	ib_register_client(&sdp_client);
 
 	return 0;
+
+error_sock_reg:
+	proto_unregister(&sdp_proto);
+error_proto_reg:
+	destroy_workqueue(sdp_wq);
+no_mem_sdp_wq:
+	destroy_workqueue(rx_comp_wq);
+no_mem_rx_wq:
+	kfree(orphan_count);
+no_mem_orphan_count:
+	kfree(sockets_allocated);
+no_mem_sockets_allocated:
+	return rc;
 }
 
 static void __exit sdp_exit(void)
@@ -2584,7 +2544,10 @@ static void __exit sdp_exit(void)
 	if (percpu_counter_read_positive(orphan_count))
 		printk(KERN_WARNING "%s: orphan_count %lld\n", __func__,
 		       percpu_counter_read_positive(orphan_count));
-	destroy_workqueue(sdp_workqueue);
+
+	destroy_workqueue(rx_comp_wq);
+	destroy_workqueue(sdp_wq);
+
 	flush_scheduled_work();
 
 	BUG_ON(!list_empty(&sock_list));
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
new file mode 100644
index 0000000..924e2ee
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -0,0 +1,496 @@
+/*
+ * Copyright (c) 2008 Mellanox Technologies Ltd.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <linux/proc_fs.h>
+#include "sdp_socket.h"
+#include "sdp.h"
+
+#ifdef CONFIG_PROC_FS
+
+#define PROC_SDP_STATS "sdpstats"
+#define PROC_SDP_PERF "sdpprf"
+
+/* just like TCP fs */
+struct sdp_seq_afinfo {
+	struct module           *owner;
+	char                    *name;
+	sa_family_t             family;
+	int                     (*seq_show) (struct seq_file *m, void *v);
+	struct file_operations  *seq_fops;
+};
+
+struct sdp_iter_state {
+	sa_family_t             family;
+	int                     num;
+	struct seq_operations   seq_ops;
+};
+
+static void *sdp_get_idx(struct seq_file *seq, loff_t pos)
+{
+	int i = 0;
+	struct sdp_sock *ssk;
+
+	if (!list_empty(&sock_list))
+		list_for_each_entry(ssk, &sock_list, sock_list) {
+			if (i == pos)
+				return ssk;
+			i++;
+		}
+
+	return NULL;
+}
+
+static void *sdp_seq_start(struct seq_file *seq, loff_t *pos)
+{
+	void *start = NULL;
+	struct sdp_iter_state *st = seq->private;
+
+	st->num = 0;
+
+	if (!*pos)
+		return SEQ_START_TOKEN;
+
+	spin_lock_irq(&sock_list_lock);
+	start = sdp_get_idx(seq, *pos - 1);
+	if (start)
+		sock_hold((struct sock *)start, SOCK_REF_SEQ);
+	spin_unlock_irq(&sock_list_lock);
+
+	return start;
+}
+
+static void *sdp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
+{
+	struct sdp_iter_state *st = seq->private;
+	void *next = NULL;
+
+	spin_lock_irq(&sock_list_lock);
+	if (v == SEQ_START_TOKEN)
+		next = sdp_get_idx(seq, 0);
+	else
+		next = sdp_get_idx(seq, *pos);
+	if (next)
+		sock_hold((struct sock *)next, SOCK_REF_SEQ);
+	spin_unlock_irq(&sock_list_lock);
+
+	*pos += 1;
+	st->num++;
+
+	return next;
+}
+
+static void sdp_seq_stop(struct seq_file *seq, void *v)
+{
+}
+
+#define TMPSZ 150
+
+static int sdp_seq_show(struct seq_file *seq, void *v)
+{
+	struct sdp_iter_state *st;
+	struct sock *sk = v;
+	char tmpbuf[TMPSZ + 1];
+	unsigned int dest;
+	unsigned int src;
+	int uid;
+	unsigned long inode;
+	__u16 destp;
+	__u16 srcp;
+	__u32 rx_queue, tx_queue;
+
+	if (v == SEQ_START_TOKEN) {
+		seq_printf(seq, "%-*s\n", TMPSZ - 1,
+				"  sl  local_address rem_address        "
+				"uid inode   rx_queue tx_queue state");
+		goto out;
+	}
+
+	st = seq->private;
+
+	dest = inet_sk(sk)->daddr;
+	src = inet_sk(sk)->rcv_saddr;
+	destp = ntohs(inet_sk(sk)->dport);
+	srcp = ntohs(inet_sk(sk)->sport);
+	uid = sock_i_uid(sk);
+	inode = sock_i_ino(sk);
+	rx_queue = rcv_nxt(sdp_sk(sk)) - sdp_sk(sk)->copied_seq;
+	tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->tx_ring.una_seq;
+
+	sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu	%08X:%08X %X",
+		st->num, src, srcp, dest, destp, uid, inode,
+		rx_queue, tx_queue, sk->sk_state);
+
+	seq_printf(seq, "%-*s\n", TMPSZ - 1, tmpbuf);
+
+	sock_put(sk, SOCK_REF_SEQ);
+out:
+	return 0;
+}
+
+static int sdp_seq_open(struct inode *inode, struct file *file)
+{
+	struct sdp_seq_afinfo *afinfo = PDE(inode)->data;
+	struct seq_file *seq;
+	struct sdp_iter_state *s;
+	int rc;
+
+	if (unlikely(afinfo == NULL))
+		return -EINVAL;
+
+	s = kzalloc(sizeof(*s), GFP_KERNEL);
+	if (!s)
+		return -ENOMEM;
+	s->family               = afinfo->family;
+	s->seq_ops.start        = sdp_seq_start;
+	s->seq_ops.next         = sdp_seq_next;
+	s->seq_ops.show         = afinfo->seq_show;
+	s->seq_ops.stop         = sdp_seq_stop;
+
+	rc = seq_open(file, &s->seq_ops);
+	if (rc)
+		goto out_kfree;
+	seq          = file->private_data;
+	seq->private = s;
+out:
+	return rc;
+out_kfree:
+	kfree(s);
+	goto out;
+}
+
+
+static struct file_operations sdp_seq_fops;
+static struct sdp_seq_afinfo sdp_seq_afinfo = {
+	.owner          = THIS_MODULE,
+	.name           = "sdp",
+	.family         = AF_INET_SDP,
+	.seq_show       = sdp_seq_show,
+	.seq_fops       = &sdp_seq_fops,
+};
+
+#ifdef SDPSTATS_ON
+struct sdpstats sdpstats = { { 0 } };
+
+static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n,
+		int is_log)
+{
+	int i;
+	u32 max = 0;
+
+	seq_printf(seq, "%s:\n", str);
+
+	for (i = 0; i < n; i++) {
+		if (h[i] > max)
+			max = h[i];
+	}
+
+	if (max == 0) {
+		seq_printf(seq, " - all values are 0\n");
+		return;
+	}
+
+	for (i = 0; i < n; i++) {
+		char s[51];
+		int j = 50 * h[i] / max;
+		int val = is_log ? (i == n-1 ? 0 : 1<<i) : i;
+		memset(s, '*', j);
+		s[j] = '\0';
+
+		seq_printf(seq, "%10d | %-50s - %d\n", val, s, h[i]);
+	}
+}
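
The 1<<i row labels suggest the histogram counters are bucketed by power-of-two
magnitude; a hedged userspace sketch of such bucketing (not the actual
SDPSTATS_HIST macro):

    #include <stdio.h>

    /* hypothetical power-of-two bucketing consistent with the 1<<i row labels */
    static unsigned int hist_bucket(unsigned int val, unsigned int nbuckets)
    {
            unsigned int idx = 0;

            while (val > 1 && idx < nbuckets - 1) {
                    val >>= 1;
                    idx++;
            }
            return idx; /* bucket i collects values of roughly 1<<i */
    }

    int main(void)
    {
            printf("bucket(1500)=%u bucket(65536)=%u\n",
                   hist_bucket(1500, 16), hist_bucket(65536, 16));
            return 0;
    }
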
+
+static int sdpstats_seq_show(struct seq_file *seq, void *v)
+{
+	int i;
+
+	seq_printf(seq, "SDP statistics:\n");
+
+	sdpstats_seq_hist(seq, "sendmsg_seglen", sdpstats.sendmsg_seglen,
+		ARRAY_SIZE(sdpstats.sendmsg_seglen), 1);
+
+	sdpstats_seq_hist(seq, "send_size", sdpstats.send_size,
+		ARRAY_SIZE(sdpstats.send_size), 1);
+
+	sdpstats_seq_hist(seq, "credits_before_update",
+		sdpstats.credits_before_update,
+		ARRAY_SIZE(sdpstats.credits_before_update), 0);
+
+	seq_printf(seq, "sdp_sendmsg() calls\t\t: %d\n", sdpstats.sendmsg);
+	seq_printf(seq, "bcopy segments     \t\t: %d\n",
+		sdpstats.sendmsg_bcopy_segment);
+	seq_printf(seq, "bzcopy segments    \t\t: %d\n",
+		sdpstats.sendmsg_bzcopy_segment);
+	seq_printf(seq, "post_send_credits  \t\t: %d\n",
+		sdpstats.post_send_credits);
+	seq_printf(seq, "memcpy_count       \t\t: %u\n",
+		sdpstats.memcpy_count);
+
+	for (i = 0; i < ARRAY_SIZE(sdpstats.post_send); i++) {
+		if (mid2str(i)) {
+			seq_printf(seq, "post_send %-20s\t: %d\n",
+					mid2str(i), sdpstats.post_send[i]);
+		}
+	}
+
+	seq_printf(seq, "\n");
+	seq_printf(seq, "post_recv         \t\t: %d\n", sdpstats.post_recv);
+	seq_printf(seq, "BZCopy poll miss  \t\t: %d\n",
+		sdpstats.bzcopy_poll_miss);
+	seq_printf(seq, "send_wait_for_mem \t\t: %d\n",
+		sdpstats.send_wait_for_mem);
+	seq_printf(seq, "send_miss_no_credits\t\t: %d\n",
+		sdpstats.send_miss_no_credits);
+
+	seq_printf(seq, "rx_poll_miss      \t\t: %d\n", sdpstats.rx_poll_miss);
+	seq_printf(seq, "tx_poll_miss      \t\t: %d\n", sdpstats.tx_poll_miss);
+	seq_printf(seq, "tx_poll_busy      \t\t: %d\n", sdpstats.tx_poll_busy);
+	seq_printf(seq, "tx_poll_hit       \t\t: %d\n", sdpstats.tx_poll_hit);
+
+	seq_printf(seq, "CQ stats:\n");
+	seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count);
+	seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count);
+
+	seq_printf(seq, "bz_clean       \t\t: %d\n", sdpstats.sendmsg ?
+		sdpstats.bz_clean_sum / sdpstats.sendmsg : 0);
+	seq_printf(seq, "bz_setup       \t\t: %d\n", sdpstats.sendmsg ?
+		sdpstats.bz_setup_sum / sdpstats.sendmsg : 0);
+	seq_printf(seq, "tx_copy        \t\t: %d\n", sdpstats.sendmsg ?
+		sdpstats.tx_copy_sum / sdpstats.sendmsg : 0);
+	seq_printf(seq, "sendmsg        \t\t: %d\n", sdpstats.sendmsg ?
+		sdpstats.sendmsg_sum / sdpstats.sendmsg : 0);
+	return 0;
+}
+
+static ssize_t sdpstats_write(struct file *file, const char __user *buf,
+			    size_t count, loff_t *offs)
+{
+	memset(&sdpstats, 0, sizeof(sdpstats));
+	printk(KERN_INFO "Cleared sdp statistics\n");
+
+	return count;
+}
+
+static int sdpstats_seq_open(struct inode *inode, struct file *file)
+{
+	return single_open(file, sdpstats_seq_show, NULL);
+}
+
+static struct file_operations sdpstats_fops = {
+	.owner		= THIS_MODULE,
+	.open		= sdpstats_seq_open,
+	.read		= seq_read,
+	.write		= sdpstats_write,
+	.llseek		= seq_lseek,
+	.release	= single_release,
+};
+
+#endif
+
+#ifdef SDP_PROFILING
+struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
+int sdpprf_log_count;
+
+static unsigned long long start_t;
+
+static int sdpprf_show(struct seq_file *m, void *v)
+{
+	struct sdpprf_log *l = v;
+	unsigned long nsec_rem, t;
+
+	if (!sdpprf_log_count) {
+		seq_printf(m, "No performance logs\n");
+		goto out;
+	}
+
+	t = l->time - start_t;
+	nsec_rem = do_div(t, 1000000000);
+
+	seq_printf(m, "%-6d: [%5lu.%06lu] %-50s - [%d{%d} %d:%d] "
+			"skb: %p %s:%d\n",
+			l->idx, (unsigned long)t, nsec_rem/1000,
+			l->msg, l->pid, l->cpu, l->sk_num, l->sk_dport,
+			l->skb, l->func, l->line);
+out:
+	return 0;
+}
+
+static void *sdpprf_start(struct seq_file *p, loff_t *pos)
+{
+	int idx = *pos;
+
+	if (!*pos) {
+		if (!sdpprf_log_count)
+			return SEQ_START_TOKEN;
+	}
+
+	if (*pos >= MIN(sdpprf_log_count, SDPPRF_LOG_SIZE - 1))
+		return NULL;
+
+	if (sdpprf_log_count >= SDPPRF_LOG_SIZE - 1) {
+		int off = sdpprf_log_count & (SDPPRF_LOG_SIZE - 1);
+		idx = (idx + off) & (SDPPRF_LOG_SIZE - 1);
+	}
+
+	if (!start_t)
+		start_t = sdpprf_log[idx].time;
+	return &sdpprf_log[idx];
+}
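
The masking above implies SDPPRF_LOG_SIZE is a power of two; a small sketch of
the wraparound arithmetic (log size assumed for illustration):

    #include <stdio.h>

    #define LOG_SIZE_EX 1024 /* assumed power-of-two log size */

    /* index of the pos-th oldest entry, given how many entries were written */
    static int log_slot(int log_count, int pos)
    {
            int idx = pos;

            if (log_count >= LOG_SIZE_EX - 1) {
                    int off = log_count & (LOG_SIZE_EX - 1);
                    idx = (idx + off) & (LOG_SIZE_EX - 1);
            }
            return idx;
    }

    int main(void)
    {
            printf("%d\n", log_slot(100, 0));  /* not wrapped yet: oldest is slot 0 */
            printf("%d\n", log_slot(1500, 0)); /* wrapped: oldest entry has moved */
            return 0;
    }
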
+
+static void *sdpprf_next(struct seq_file *p, void *v, loff_t *pos)
+{
+	struct sdpprf_log *l = v;
+
+	if (++*pos >= MIN(sdpprf_log_count, SDPPRF_LOG_SIZE - 1))
+		return NULL;
+
+	++l;
+	if (l - &sdpprf_log[0] >= SDPPRF_LOG_SIZE - 1)
+		return &sdpprf_log[0];
+
+	return l;
+}
+
+static void sdpprf_stop(struct seq_file *p, void *v)
+{
+}
+
+
+const struct seq_operations sdpprf_ops = {
+	.start = sdpprf_start,
+	.stop = sdpprf_stop,
+	.next = sdpprf_next,
+	.show = sdpprf_show,
+};
+
+static int sdpprf_open(struct inode *inode, struct file *file)
+{
+	int res;
+
+	res = seq_open(file, &sdpprf_ops);
+
+	return res;
+}
+
+static ssize_t sdpprf_write(struct file *file, const char __user *buf,
+			    size_t count, loff_t *offs)
+{
+	sdpprf_log_count = 0;
+	printk(KERN_INFO "Cleared sdpprf statistics\n");
+
+	return count;
+}
+
+static const struct file_operations sdpprf_fops = {
+	.open           = sdpprf_open,
+	.read           = seq_read,
+	.llseek         = seq_lseek,
+	.release        = seq_release,
+	.write		= sdpprf_write,
+};
+#endif /* SDP_PROFILING */
+
+int __init sdp_proc_init(void)
+{
+	struct proc_dir_entry *p = NULL;
+	struct proc_dir_entry *sdpstats = NULL;
+	struct proc_dir_entry *sdpprf = NULL;
+
+	sdp_seq_afinfo.seq_fops->owner         = sdp_seq_afinfo.owner;
+	sdp_seq_afinfo.seq_fops->open          = sdp_seq_open;
+	sdp_seq_afinfo.seq_fops->read          = seq_read;
+	sdp_seq_afinfo.seq_fops->llseek        = seq_lseek;
+	sdp_seq_afinfo.seq_fops->release       = seq_release_private;
+
+	p = proc_net_fops_create(&init_net, sdp_seq_afinfo.name, S_IRUGO,
+				 sdp_seq_afinfo.seq_fops);
+	if (p)
+		p->data = &sdp_seq_afinfo;
+	else
+		goto no_mem;
+
+#ifdef SDPSTATS_ON
+
+	sdpstats = proc_net_fops_create(&init_net, PROC_SDP_STATS,
+			S_IRUGO | S_IWUGO, &sdpstats_fops);
+	if (!sdpstats)
+		goto no_mem;
+
+#endif
+
+#ifdef SDP_PROFILING
+	sdpprf = proc_net_fops_create(&init_net, PROC_SDP_PERF,
+			S_IRUGO | S_IWUGO, &sdpprf_fops);
+	if (!sdpprf)
+		goto no_mem;
+#endif
+
+	return 0;
+no_mem:
+	if (sdpprf)
+		proc_net_remove(&init_net, PROC_SDP_PERF);
+
+	if (sdpstats)
+		proc_net_remove(&init_net, PROC_SDP_STATS);
+
+	if (p)
+		proc_net_remove(&init_net, sdp_seq_afinfo.name);
+
+	return -ENOMEM;
+}
+
+void sdp_proc_unregister(void)
+{
+	proc_net_remove(&init_net, sdp_seq_afinfo.name);
+	memset(sdp_seq_afinfo.seq_fops, 0, sizeof(*sdp_seq_afinfo.seq_fops));
+
+#ifdef SDPSTATS_ON
+	proc_net_remove(&init_net, PROC_SDP_STATS);
+#endif
+#ifdef SDP_PROFILING
+	proc_net_remove(&init_net, PROC_SDP_PERF);
+#endif
+}
+
+#else /* CONFIG_PROC_FS */
+
+int __init sdp_proc_init(void)
+{
+	return 0;
+}
+
+void sdp_proc_unregister(void)
+{
+
+}
+#endif /* CONFIG_PROC_FS */
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
new file mode 100644
index 0000000..ed5c2ea
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -0,0 +1,850 @@
+/*
+ * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
+		"Receive buffer initial size in bytes.");
+SDP_MODPARAM_SINT(rcvbuf_scale, 0x10,
+		"Receive buffer size scale factor.");
+SDP_MODPARAM_SINT(top_mem_usage, 0,
+		"Top system wide sdp memory usage for recv (in MB).");
+
+#ifdef CONFIG_PPC
+SDP_MODPARAM_SINT(max_large_sockets, 100,
+		"Max number of large sockets (32k buffers).");
+#else
+SDP_MODPARAM_SINT(max_large_sockets, 1000,
+		"Max number of large sockets (32k buffers).");
+#endif
+
+static int curr_large_sockets;
+atomic_t sdp_current_mem_usage;
+spinlock_t sdp_large_sockets_lock;
+
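+/* Accounting for "large" sockets, i.e. those using the full 32k receive
+ * buffers; a buffer resize is granted only while curr_large_sockets is
+ * below max_large_sockets. */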
+static int sdp_get_large_socket(struct sdp_sock *ssk)
+{
+	int count, ret;
+
+	if (ssk->recv_request)
+		return 1;
+
+	spin_lock_irq(&sdp_large_sockets_lock);
+	count = curr_large_sockets;
+	ret = curr_large_sockets < max_large_sockets;
+	if (ret)
+		curr_large_sockets++;
+	spin_unlock_irq(&sdp_large_sockets_lock);
+
+	return ret;
+}
+
+void sdp_remove_large_sock(struct sdp_sock *ssk)
+{
+	if (ssk->recv_frags) {
+		spin_lock_irq(&sdp_large_sockets_lock);
+		curr_large_sockets--;
+		spin_unlock_irq(&sdp_large_sockets_lock);
+	}
+}
+
+/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
+static void sdp_fin(struct sock *sk)
+{
+	sdp_dbg(sk, "%s\n", __func__);
+
+	sk->sk_shutdown |= RCV_SHUTDOWN;
+	sock_set_flag(sk, SOCK_DONE);
+
+	switch (sk->sk_state) {
+	case TCP_SYN_RECV:
+	case TCP_ESTABLISHED:
+		sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
+				TCP_CLOSE_WAIT);
+		break;
+
+	case TCP_FIN_WAIT1:
+		/* Received a reply FIN - start Infiniband tear down */
+		sdp_dbg(sk, "%s: Starting InfiniBand teardown, sending DREQ\n",
+				__func__);
+
+		sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
+
+		sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
+
+		if (sdp_sk(sk)->id) {
+			rdma_disconnect(sdp_sk(sk)->id);
+		} else {
+			sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
+			return;
+		}
+		break;
+	case TCP_TIME_WAIT:
+		/* This is a mutual close situation and we've got the DREQ from
+		   the peer before the SDP_MID_DISCONNECT */
+		break;
+	case TCP_CLOSE:
+		/* FIN arrived after IB teardown started - do nothing */
+		sdp_dbg(sk, "%s: fin in state %s\n",
+				__func__, sdp_state_str(sk->sk_state));
+		return;
+	default:
+		sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
+				__func__, sk->sk_state);
+		break;
+	}
+
+	sk_mem_reclaim(sk);
+
+	if (!sock_flag(sk, SOCK_DEAD)) {
+		sk->sk_state_change(sk);
+
+		/* Do not send POLL_HUP for half duplex close. */
+		if (sk->sk_shutdown == SHUTDOWN_MASK ||
+		    sk->sk_state == TCP_CLOSE)
+			sk_wake_async(sk, 1, POLL_HUP);
+		else
+			sk_wake_async(sk, 1, POLL_IN);
+	}
+}
+
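+/* Allocate a new receive skb (BSDH header in the linear part, recv_frags
+ * highmem pages as fragments), DMA-map it and post a single receive WR
+ * on the QP. */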
+static int sdp_post_recv(struct sdp_sock *ssk)
+{
+	struct sdp_buf *rx_req;
+	int i, rc, frags;
+	u64 addr;
+	struct ib_device *dev;
+	struct ib_recv_wr rx_wr = { 0 };
+	struct ib_sge ibsge[SDP_MAX_RECV_SKB_FRAGS + 1];
+	struct ib_sge *sge = ibsge;
+	struct ib_recv_wr *bad_wr;
+	struct sk_buff *skb;
+	struct page *page;
+	skb_frag_t *frag;
+	struct sdp_bsdh *h;
+	int id = ring_head(ssk->rx_ring);
+	gfp_t gfp_page;
+	int ret = 0;
+
+	/* Now, allocate and repost recv */
+	/* TODO: allocate from cache */
+
+	if (unlikely(ssk->isk.sk.sk_allocation)) {
+		skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
+					  ssk->isk.sk.sk_allocation);
+		gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM;
+	} else {
+		skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
+					  GFP_KERNEL);
+		gfp_page = GFP_HIGHUSER;
+	}
+
+	sdp_prf(&ssk->isk.sk, skb, "Posting skb");
+	/* FIXME */
+	BUG_ON(!skb);
+	h = (struct sdp_bsdh *)skb->head;
+	for (i = 0; i < ssk->recv_frags; ++i) {
+		page = alloc_pages(gfp_page, 0);
+		BUG_ON(!page);
+		frag = &skb_shinfo(skb)->frags[i];
+		frag->page                = page;
+		frag->page_offset         = 0;
+		frag->size                = min(PAGE_SIZE, SDP_MAX_PAYLOAD);
+		++skb_shinfo(skb)->nr_frags;
+		skb->len += frag->size;
+		skb->data_len += frag->size;
+		skb->truesize += frag->size;
+	}
+
+	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
+	rx_req->skb = skb;
+	dev = ssk->ib_device;
+	addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
+	BUG_ON(ib_dma_mapping_error(dev, addr));
+
+	rx_req->mapping[0] = addr;
+
+	/* TODO: proper error handling */
+	sge->addr = (u64)addr;
+	sge->length = SDP_HEAD_SIZE;
+	sge->lkey = ssk->mr->lkey;
+	frags = skb_shinfo(skb)->nr_frags;
+	for (i = 0; i < frags; ++i) {
+		++sge;
+		addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
+				       skb_shinfo(skb)->frags[i].page_offset,
+				       skb_shinfo(skb)->frags[i].size,
+				       DMA_FROM_DEVICE);
+		BUG_ON(ib_dma_mapping_error(dev, addr));
+		rx_req->mapping[i + 1] = addr;
+		sge->addr = addr;
+		sge->length = skb_shinfo(skb)->frags[i].size;
+		sge->lkey = ssk->mr->lkey;
+	}
+
+	rx_wr.next = NULL;
+	rx_wr.wr_id = id | SDP_OP_RECV;
+	rx_wr.sg_list = ibsge;
+	rx_wr.num_sge = frags + 1;
+	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
+	atomic_inc(&ssk->rx_ring.head);
+	if (unlikely(rc)) {
+		sdp_warn(&ssk->isk.sk, "ib_post_recv failed. status %d\n", rc);
+
+		lock_sock(&ssk->isk.sk);
+		sdp_reset(&ssk->isk.sk);
+		release_sock(&ssk->isk.sk);
+
+		ret = -1;
+	}
+
+	SDPSTATS_COUNTER_INC(post_recv);
+	atomic_add(ssk->recv_frags, &sdp_current_mem_usage);
+
+	return ret;
+}
+
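+/* Decide whether another receive buffer should be posted: stop once
+ * SDP_RX_SIZE buffers are posted, or when the bytes sitting in posted
+ * buffers plus the bytes received but not yet copied to user space
+ * exceed sk_rcvbuf * scale (scale drops to 1 when the global
+ * top_mem_usage limit is exceeded). */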
+static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int scale = ssk->rcvbuf_scale;
+	int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+	unsigned long max_bytes;
+
+	if (top_mem_usage && (top_mem_usage * 0x100000) <
+			atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) {
+		scale = 1;
+	}
+
+	max_bytes = sk->sk_rcvbuf * scale;
+
+	if  (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE))
+		return 0;
+
+	if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) {
+		unsigned long bytes_in_process =
+			(ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
+			buffer_size;
+		bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq;
+
+		if (bytes_in_process >= max_bytes) {
+			sdp_prf(sk, NULL,
+				"bytes_in_process:%lu >= max_bytes:%lu",
+				bytes_in_process, max_bytes);
+			return 0;
+		}
+	}
+
+	return 1;
+}
+
+static inline void sdp_post_recvs(struct sdp_sock *ssk)
+{
+again:
+	while (sdp_post_recvs_needed(ssk)) {
+		if (sdp_post_recv(ssk))
+			goto out;
+	}
+
+	sk_stream_mem_reclaim(&ssk->isk.sk);
+
+	if (sdp_post_recvs_needed(ssk))
+		goto again;
+out:
+	sk_stream_mem_reclaim(&ssk->isk.sk);
+}
+
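+/* Queue a received data skb on sk_receive_queue, advance rcv_nxt by its
+ * length and wake up any reader. */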
+static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
+						     struct sk_buff *skb)
+{
+	int skb_len;
+	struct sdp_sock *ssk = sdp_sk(sk);
+
+	/* not needed since sk_rmem_alloc is not currently used
+	 * TODO - remove this?
+	skb_set_owner_r(skb, sk); */
+
+	skb_len = skb->len;
+
+	TCP_SKB_CB(skb)->seq = rcv_nxt(ssk);
+	atomic_add(skb_len, &ssk->rcv_nxt);
+
+	skb_queue_tail(&sk->sk_receive_queue, skb);
+
+	if (!sock_flag(sk, SOCK_DEAD))
+		sk->sk_data_ready(sk, skb_len);
+	return skb;
+}
+
+int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+	ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
+	if (ssk->recv_frags > SDP_MAX_RECV_SKB_FRAGS)
+		ssk->recv_frags = SDP_MAX_RECV_SKB_FRAGS;
+	ssk->rcvbuf_scale = rcvbuf_scale;
+
+	sdp_post_recvs(ssk);
+
+	return 0;
+}
+
+int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+	u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+#if defined(__ia64__)
+	/* for huge PAGE_SIZE systems, aka IA64, limit buffers size
+	   [re-]negotiation to a known+working size that will not
+	   trigger a HW error/rc to be interpreted as a IB_WC_LOC_LEN_ERR */
+	u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE) <=
+		32784 ?
+		(SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE) : 32784;
+#else
+	u32 max_size = SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE;
+#endif
+
+	if (new_size > curr_size && new_size <= max_size &&
+	    sdp_get_large_socket(ssk)) {
+		ssk->rcvbuf_scale = rcvbuf_scale;
+		ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) /
+			PAGE_SIZE;
+		if (ssk->recv_frags > SDP_MAX_RECV_SKB_FRAGS)
+			ssk->recv_frags = SDP_MAX_RECV_SKB_FRAGS;
+		return 0;
+	} else
+		return -1;
+}
+
+static void sdp_handle_resize_request(struct sdp_sock *ssk,
+		struct sdp_chrecvbuf *buf)
+{
+	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
+		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
+	else
+		ssk->recv_request_head = ring_tail(ssk->rx_ring);
+	ssk->recv_request = 1;
+}
+
+static void sdp_handle_resize_ack(struct sdp_sock *ssk,
+		struct sdp_chrecvbuf *buf)
+{
+	u32 new_size = ntohl(buf->size);
+
+	if (new_size > ssk->xmit_size_goal) {
+		ssk->sent_request = -1;
+		ssk->xmit_size_goal = new_size;
+		ssk->send_frags =
+			PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE + 1;
+	} else
+		ssk->sent_request = 0;
+}
+
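+/* A gratuitous credit update is needed when the peer's view of our
+ * posted receives (remote_credits plus 50% slack) lags behind what is
+ * actually posted, and we have both a tx credit and a tx ring slot to
+ * send the update with. */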
+static inline int credit_update_needed(struct sdp_sock *ssk)
+{
+	int c;
+
+	c = remote_credits(ssk);
+	if (likely(c > SDP_MIN_TX_CREDITS))
+		c += c/2;
+
+	return unlikely(c < ring_posted(ssk->rx_ring)) &&
+	    likely(tx_credits(ssk) > 1) &&
+	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
+}
+
+static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
+{
+	struct sdp_buf *rx_req;
+	struct ib_device *dev;
+	struct sk_buff *skb;
+	int i, frags;
+
+	if (unlikely(id != ring_tail(ssk->rx_ring))) {
+		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
+			id, ring_tail(ssk->rx_ring));
+		return NULL;
+	}
+
+	dev = ssk->ib_device;
+	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
+	skb = rx_req->skb;
+	ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
+			    DMA_FROM_DEVICE);
+	frags = skb_shinfo(skb)->nr_frags;
+	for (i = 0; i < frags; ++i)
+		ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
+				  skb_shinfo(skb)->frags[i].size,
+				  DMA_FROM_DEVICE);
+	atomic_inc(&ssk->rx_ring.tail);
+	atomic_dec(&ssk->remote_credits);
+	return skb;
+}
+
+/* socket lock should be taken before calling this */
+static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
+	struct sock *sk = &ssk->isk.sk;
+
+	switch (h->mid) {
+	case SDP_MID_DATA:
+		WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN));
+
+		sdp_warn(sk, "DATA after socket rcv was shutdown\n");
+
+		/* got data in RCV_SHUTDOWN */
+		if (sk->sk_state == TCP_FIN_WAIT1) {
+			sdp_warn(sk, "RX data when state = FIN_WAIT1\n");
+			/* go into abortive close */
+			sdp_exch_state(sk, TCPF_FIN_WAIT1,
+					TCP_TIME_WAIT);
+
+			sk->sk_prot->disconnect(sk, 0);
+		}
+		__kfree_skb(skb);
+
+		break;
+	case SDP_MID_DISCONN:
+		sdp_dbg_data(sk, "Handling RX disconnect\n");
+		sdp_prf(sk, NULL, "Handling RX disconnect");
+		sdp_fin(sk);
+		sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg");
+		/* Enqueue fin skb to release sleeping recvmsg */
+		sdp_sock_queue_rcv_skb(sk, skb);
+		break;
+	case SDP_MID_CHRCVBUF:
+		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
+		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)h);
+		__kfree_skb(skb);
+		break;
+	case SDP_MID_CHRCVBUF_ACK:
+		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
+		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)h);
+		__kfree_skb(skb);
+		break;
+	default:
+		/* TODO: Handle other messages */
+		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
+		__kfree_skb(skb);
+	}
+
+	return 0;
+}
+
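+/* First-stage rx processing, called from the interrupt path: update tx
+ * credits from the BSDH, trim unused page fragments, then either queue
+ * data skbs straight to sk_receive_queue or defer control skbs to
+ * rx_ctl_q so they are handled under the socket lock. */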
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int frags;
+	struct sdp_bsdh *h;
+	int pagesz, i;
+	unsigned long mseq_ack;
+	int credits_before;
+
+	h = (struct sdp_bsdh *)skb_transport_header(skb);
+
+	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+	mseq_ack = ntohl(h->mseq_ack);
+	credits_before = tx_credits(ssk);
+	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
+			1 + ntohs(h->bufs));
+	if (mseq_ack >= ssk->nagle_last_unacked)
+		ssk->nagle_last_unacked = 0;
+
+	sdp_prf1(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+		mid2str(h->mid), ntohs(h->bufs), credits_before,
+		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+	frags = skb_shinfo(skb)->nr_frags;
+	pagesz = PAGE_ALIGN(skb->data_len);
+	skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
+
+	for (i = skb_shinfo(skb)->nr_frags; i < frags; ++i) {
+		put_page(skb_shinfo(skb)->frags[i].page);
+		skb->truesize -= PAGE_SIZE;
+	}
+
+/*	if (unlikely(h->flags & SDP_OOB_PEND))
+		sk_send_sigurg(sk);*/
+
+	skb_pull(skb, sizeof(struct sdp_bsdh));
+
+	if (h->mid != SDP_MID_DATA ||
+			unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
+		sdp_prf(sk, NULL, "Control skb - queuing to control queue");
+		skb_queue_tail(&ssk->rx_ctl_q, skb);
+		return 0;
+	}
+
+	if (unlikely(skb->len <= 0)) {
+		__kfree_skb(skb);
+		return 0;
+	}
+
+	sdp_prf(sk, NULL, "queueing a %s skb",
+			(h->mid == SDP_MID_DATA ? "data" : "disconnect"));
+	skb = sdp_sock_queue_rcv_skb(sk, skb);
+
+/*	if (unlikely(h->flags & SDP_OOB_PRES))
+		sdp_urg(ssk, skb);*/
+
+	return 0;
+}
+
+/* called only from irq */
+static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
+		struct ib_wc *wc)
+{
+	struct sk_buff *skb;
+	struct sdp_bsdh *h;
+	struct sock *sk = &ssk->isk.sk;
+	int mseq;
+
+	skb = sdp_recv_completion(ssk, wc->wr_id);
+	if (unlikely(!skb))
+		return NULL;
+
+	atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
+
+	if (unlikely(wc->status)) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR) {
+			sdp_warn(sk, "Recv completion with error. Status %d\n",
+				wc->status);
+			sdp_reset(sk);
+		}
+		__kfree_skb(skb);
+		return NULL;
+	}
+
+	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+			(int)wc->wr_id, wc->byte_len);
+	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
+				wc->byte_len, sizeof(struct sdp_bsdh));
+		__kfree_skb(skb);
+		return NULL;
+	}
+	skb->len = wc->byte_len;
+	if (likely(wc->byte_len > SDP_HEAD_SIZE))
+		skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+	else
+		skb->data_len = 0;
+	skb->data = skb->head;
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+	skb->tail = skb_headlen(skb);
+#else
+	skb->tail = skb->head + skb_headlen(skb);
+#endif
+	h = (struct sdp_bsdh *)skb->data;
+	SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
+	skb_reset_transport_header(skb);
+
+	ssk->rx_packets++;
+	ssk->rx_bytes += skb->len;
+
+	mseq = ntohl(h->mseq);
+	atomic_set(&ssk->mseq_ack, mseq);
+	if (mseq != (int)wc->wr_id)
+		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
+				mseq, (int)wc->wr_id);
+
+	return skb;
+}
+
+/* like sk_stream_write_space - except it measures remote credits */
+static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	struct socket *sock = sk->sk_socket;
+
+	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+		sdp_prf1(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. "
+			"tx_head: %d, tx_tail: %d",
+			tx_credits(ssk), ssk->min_bufs,
+			ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
+	}
+
+	if (tx_credits(ssk) >= ssk->min_bufs && sock != NULL) {
+		clear_bit(SOCK_NOSPACE, &sock->flags);
+		sdp_prf1(sk, NULL, "Waking up sleepers");
+
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+			wake_up_interruptible(sk->sk_sleep);
+		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+			sock_wake_async(sock, 2, POLL_OUT);
+	}
+}
+
+/* only from interrupt. */
+static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+{
+	struct ib_cq *cq = ssk->rx_ring.cq;
+	struct ib_wc ibwc[SDP_NUM_WC];
+	int n, i;
+	int wc_processed = 0;
+	struct sk_buff *skb;
+
+	do {
+		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
+		for (i = 0; i < n; ++i) {
+			struct ib_wc *wc = &ibwc[i];
+
+			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
+			skb = sdp_process_rx_wc(ssk, wc);
+			if (!skb)
+				continue;
+
+			sdp_process_rx_skb(ssk, skb);
+			wc_processed++;
+		}
+	} while (n == SDP_NUM_WC);
+
+	if (wc_processed)
+		sdp_bzcopy_write_space(ssk);
+
+	return wc_processed;
+}
+
+void sdp_rx_comp_work(struct work_struct *work)
+{
+	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
+			rx_comp_work);
+	struct sock *sk = &ssk->isk.sk;
+
+	sdp_prf(sk, NULL, "%s", __func__);
+
+	if (unlikely(!ssk->qp)) {
+		sdp_prf(sk, NULL, "qp was destroyed");
+		return;
+	}
+	if (unlikely(!ssk->rx_ring.cq)) {
+		sdp_prf(sk, NULL, "rx_ring.cq is NULL");
+		return;
+	}
+
+	if (unlikely(!ssk->poll_cq)) {
+		struct rdma_cm_id *id = ssk->id;
+		if (id && id->qp)
+			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
+		return;
+	}
+
+	lock_sock(sk);
+
+	sdp_do_posts(ssk);
+
+	release_sock(sk);
+}
+
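+/* Do the deferred socket work under the socket lock: process queued
+ * control skbs, replenish the rx ring, reap tx completions and send any
+ * pending data or credit update. */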
+void sdp_do_posts(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int xmit_poll_force;
+	struct sk_buff *skb;
+
+	while ((skb = skb_dequeue(&ssk->rx_ctl_q)))
+		sdp_process_rx_ctl_skb(ssk, skb);
+
+	if (sk->sk_state == TCP_TIME_WAIT)
+		return;
+
+	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
+		return;
+
+	sdp_post_recvs(ssk);
+
+	if (ring_posted(ssk->tx_ring))
+		sdp_xmit_poll(ssk, 1);
+
+	sdp_post_sends(ssk, 0);
+
+	sk_stream_mem_reclaim(sk);
+
+	xmit_poll_force = sk->sk_write_pending &&
+		(tx_credits(ssk) > SDP_MIN_TX_CREDITS);
+
+	if (credit_update_needed(ssk) || xmit_poll_force) {
+		/* xmit any tx left pending for lack of tx credits */
+		sdp_prf(sk, NULL, "Processing to free pending sends");
+		sdp_xmit_poll(ssk,  xmit_poll_force);
+		sdp_prf(sk, NULL, "Sending credit update");
+		sdp_post_sends(ssk, 0);
+	}
+
+}
+
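+/* RX completion handler: poll the rx CQ from interrupt context and, if
+ * no other context is around to do the posts handling, queue
+ * rx_comp_work to finish the socket-level processing. */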
+static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+{
+	struct sock *sk = cq_context;
+	struct sdp_sock *ssk = sdp_sk(sk);
+	unsigned long flags;
+	int wc_processed = 0;
+	int credits_before;
+
+	sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
+
+	if (cq != ssk->rx_ring.cq) {
+		sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
+		return;
+	}
+
+	SDPSTATS_COUNTER_INC(rx_int_count);
+
+	sdp_prf(sk, NULL, "rx irq");
+
+	rx_ring_lock(ssk, flags);
+
+	credits_before = tx_credits(ssk);
+
+	if (!ssk->rx_ring.cq) {
+		sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+		goto out;
+	}
+
+	wc_processed = sdp_poll_rx_cq(ssk);
+	sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
+
+	if (wc_processed) {
+		sdp_prf(&ssk->isk.sk, NULL, "credits:  %d -> %d",
+				credits_before, tx_credits(ssk));
+
+		if (posts_handler(ssk) || (sk->sk_socket &&
+			test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags))) {
+
+			sdp_prf(&ssk->isk.sk, NULL,
+				"Somebody is doing the post work for me. %d",
+				posts_handler(ssk));
+
+		} else {
+			sdp_prf(&ssk->isk.sk, NULL, "Queuing work. ctl_q: %d",
+					!skb_queue_empty(&ssk->rx_ctl_q));
+			queue_work(rx_comp_wq, &ssk->rx_comp_work);
+		}
+	}
+	sdp_arm_rx_cq(sk);
+
+out:
+	rx_ring_unlock(ssk, flags);
+}
+
+static void sdp_rx_ring_purge(struct sdp_sock *ssk)
+{
+	while (ring_posted(ssk->rx_ring) > 0) {
+		struct sk_buff *skb;
+		skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
+		if (!skb)
+			break;
+		atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
+		__kfree_skb(skb);
+	}
+}
+
+void sdp_rx_ring_init(struct sdp_sock *ssk)
+{
+	ssk->rx_ring.buffer = NULL;
+	spin_lock_init(&ssk->rx_ring.lock);
+}
+
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+	struct ib_cq *rx_cq;
+	int rc = 0;
+	unsigned long flags;
+
+	rx_ring_lock(ssk, flags);
+
+	atomic_set(&ssk->rx_ring.head, 1);
+	atomic_set(&ssk->rx_ring.tail, 1);
+
+	ssk->rx_ring.buffer = kmalloc(
+			sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
+	if (!ssk->rx_ring.buffer) {
+		rc = -ENOMEM;
+		sdp_warn(&ssk->isk.sk,
+			"Unable to allocate RX Ring size %zd.\n",
+			 sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
+
+		goto out;
+	}
+
+	/* TODO: use vector=IB_CQ_VECTOR_LEAST_ATTACHED when implemented
+	 * in ofed-1.5 */
+	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
+			  &ssk->isk.sk, SDP_RX_SIZE, 0);
+
+	if (IS_ERR(rx_cq)) {
+		rc = PTR_ERR(rx_cq);
+		sdp_warn(&ssk->isk.sk, "Unable to allocate RX CQ: %d.\n", rc);
+		goto err_cq;
+	}
+
+	sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
+
+	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
+
+	sdp_arm_rx_cq(&ssk->isk.sk);
+
+	goto out;
+
+err_cq:
+	kfree(ssk->rx_ring.buffer);
+	ssk->rx_ring.buffer = NULL;
+out:
+	rx_ring_unlock(ssk, flags);
+	return rc;
+}
+
+void sdp_rx_ring_destroy(struct sdp_sock *ssk)
+{
+	if (ssk->rx_ring.buffer) {
+		sdp_rx_ring_purge(ssk);
+
+		kfree(ssk->rx_ring.buffer);
+		ssk->rx_ring.buffer = NULL;
+	}
+
+	if (ssk->rx_ring.cq) {
+		ib_destroy_cq(ssk->rx_ring.cq);
+		ssk->rx_ring.cq = NULL;
+	}
+
+	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
+}
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
new file mode 100644
index 0000000..d53e838
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -0,0 +1,442 @@
+/*
+ * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+#define sdp_cnt(var) do { (var)++; } while (0)
+
+SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
+		"Total number of keepalive probes sent.");
+
+static int sdp_process_tx_cq(struct sdp_sock *ssk);
+
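+/* Poll the tx CQ from the caller's context: only every SDP_TX_POLL_MODER
+ * calls unless forced, and (re)arm the tx poll timer as a fallback. */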
+int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+{
+	int wc_processed = 0;
+
+	sdp_prf(&ssk->isk.sk, NULL, "called from %s:%d", __func__, __LINE__);
+
+	/* If we don't have a pending timer, set one up to catch our recent
+	   post in case the interface becomes idle */
+	if (!timer_pending(&ssk->tx_ring.timer))
+		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+
+	/* Poll the CQ every SDP_TX_POLL_MODER packets */
+	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+		wc_processed = sdp_process_tx_cq(ssk);
+
+	return wc_processed;
+}
+
+static unsigned long last_send;
+
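+/* Build the BSDH in the skb headroom (piggybacking rx credits in 'bufs'
+ * and the current mseq_ack), DMA-map the linear part and all fragments
+ * and post a signaled send WR. */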
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+{
+	struct sdp_buf *tx_req;
+	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+	unsigned long mseq = ring_head(ssk->tx_ring);
+	int i, rc, frags;
+	u64 addr;
+	struct ib_device *dev;
+	struct ib_send_wr *bad_wr;
+	int delta;
+
+	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+	struct ib_sge *sge = ibsge;
+	struct ib_send_wr tx_wr = { 0 };
+
+	SDPSTATS_COUNTER_MID_INC(post_send, mid);
+	SDPSTATS_HIST(send_size, skb->len);
+
+	ssk->tx_packets++;
+	ssk->tx_bytes += skb->len;
+
+	h->mid = mid;
+	if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
+	else
+		h->flags = 0;
+
+	h->bufs = htons(ring_posted(ssk->rx_ring));
+	h->len = htonl(skb->len);
+	h->mseq = htonl(mseq);
+	h->mseq_ack = htonl(mseq_ack(ssk));
+
+	sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
+			mid2str(mid), ring_posted(ssk->rx_ring), mseq,
+			ntohl(h->mseq_ack));
+
+	SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
+
+	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
+	tx_req->skb = skb;
+	dev = ssk->ib_device;
+	addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
+				 DMA_TO_DEVICE);
+	tx_req->mapping[0] = addr;
+
+	/* TODO: proper error handling */
+	BUG_ON(ib_dma_mapping_error(dev, addr));
+
+	sge->addr = addr;
+	sge->length = skb->len - skb->data_len;
+	sge->lkey = ssk->mr->lkey;
+	frags = skb_shinfo(skb)->nr_frags;
+	for (i = 0; i < frags; ++i) {
+		++sge;
+		addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
+				       skb_shinfo(skb)->frags[i].page_offset,
+				       skb_shinfo(skb)->frags[i].size,
+				       DMA_TO_DEVICE);
+		BUG_ON(ib_dma_mapping_error(dev, addr));
+		tx_req->mapping[i + 1] = addr;
+		sge->addr = addr;
+		sge->length = skb_shinfo(skb)->frags[i].size;
+		sge->lkey = ssk->mr->lkey;
+	}
+
+	tx_wr.next = NULL;
+	tx_wr.wr_id = ring_head(ssk->tx_ring) | SDP_OP_SEND;
+	tx_wr.sg_list = ibsge;
+	tx_wr.num_sge = frags + 1;
+	tx_wr.opcode = IB_WR_SEND;
+	tx_wr.send_flags = IB_SEND_SIGNALED;
+	if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+		tx_wr.send_flags |= IB_SEND_SOLICITED;
+
+	delta = jiffies - last_send;
+	if (likely(last_send))
+		SDPSTATS_HIST(send_interval, delta);
+	last_send = jiffies;
+
+	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
+	atomic_inc(&ssk->tx_ring.head);
+	atomic_dec(&ssk->tx_ring.credits);
+	atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
+	if (unlikely(rc)) {
+		sdp_dbg(&ssk->isk.sk,
+				"ib_post_send failed with status %d.\n", rc);
+		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+		wake_up(&ssk->wq);
+	}
+}
+
+static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
+{
+	struct ib_device *dev;
+	struct sdp_buf *tx_req;
+	struct sk_buff *skb = NULL;
+	struct bzcopy_state *bz;
+	int i, frags;
+	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
+	if (unlikely(mseq != ring_tail(*tx_ring))) {
+		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
+			mseq, ring_tail(*tx_ring));
+		goto out;
+	}
+
+	dev = ssk->ib_device;
+	tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
+	skb = tx_req->skb;
+	ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len,
+			    DMA_TO_DEVICE);
+	frags = skb_shinfo(skb)->nr_frags;
+	for (i = 0; i < frags; ++i) {
+		ib_dma_unmap_page(dev, tx_req->mapping[i + 1],
+				  skb_shinfo(skb)->frags[i].size,
+				  DMA_TO_DEVICE);
+	}
+
+	tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq;
+
+	/* TODO: AIO and real zcopy code; add their context support here */
+	bz = BZCOPY_STATE(skb);
+	if (bz)
+		bz->busy--;
+
+	atomic_inc(&tx_ring->tail);
+
+out:
+	return skb;
+}
+
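+/* Handle a single send completion: reclaim the skb and, on a real
+ * (non-flush) error, mark the socket with ECONNRESET and schedule
+ * teardown. */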
+static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+	struct sk_buff *skb = NULL;
+
+	skb = sdp_send_completion(ssk, wc->wr_id);
+	if (unlikely(!skb))
+		return -1;
+
+	if (unlikely(wc->status)) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR) {
+			struct sock *sk = &ssk->isk.sk;
+			sdp_prf(sk, skb, "Send completion with error. "
+				"Status %d", wc->status);
+			sdp_warn(sk, "Send completion with error. "
+				"Status %d\n", wc->status);
+			sdp_set_error(sk, -ECONNRESET);
+			wake_up(&ssk->wq);
+
+			queue_work(sdp_wq, &ssk->destroy_work);
+		}
+	}
+
+#ifdef SDP_PROFILING
+{
+	struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+	sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+}
+#endif
+	sk_wmem_free_skb(&ssk->isk.sk, skb);
+
+	return 0;
+}
+
+static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+	if (likely(wc->wr_id & SDP_OP_SEND)) {
+		sdp_handle_send_comp(ssk, wc);
+		return;
+	}
+
+	/* Keepalive probe sent cleanup */
+	sdp_cnt(sdp_keepalive_probes_sent);
+
+	if (likely(!wc->status))
+		return;
+
+	sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
+			__func__, wc->status);
+
+	if (wc->status == IB_WC_WR_FLUSH_ERR)
+		return;
+
+	sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+	wake_up(&ssk->wq);
+}
+
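+/* Reap all available tx completions, free their skbs and wake up
+ * writers if send space became available. */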
+static int sdp_process_tx_cq(struct sdp_sock *ssk)
+{
+	struct ib_wc ibwc[SDP_NUM_WC];
+	int n, i;
+	int wc_processed = 0;
+
+	if (!ssk->tx_ring.cq) {
+		sdp_warn(&ssk->isk.sk, "tx irq on destroyed tx_cq\n");
+		return 0;
+	}
+
+	do {
+		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
+		for (i = 0; i < n; ++i) {
+			sdp_process_tx_wc(ssk, ibwc + i);
+			wc_processed++;
+		}
+	} while (n == SDP_NUM_WC);
+
+	sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
+
+	if (wc_processed) {
+		struct sock *sk = &ssk->isk.sk;
+		sdp_post_sends(ssk, 0);
+
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+			sk_stream_write_space(&ssk->isk.sk);
+	}
+
+	return wc_processed;
+}
+
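+/* TX completions are not processed in the interrupt itself; this timer
+ * polls the tx CQ instead, backing off while the socket is owned by
+ * user context and re-arming while packets are still in flight. */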
+static void sdp_poll_tx_timeout(unsigned long data)
+{
+	struct sdp_sock *ssk = (struct sdp_sock *)data;
+	struct sock *sk = &ssk->isk.sk;
+	u32 inflight, wc_processed;
+
+	sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+		(u32) ring_posted(ssk->tx_ring));
+
+	sdp_prf(&ssk->isk.sk, NULL, "%s. inflight=%d", __func__,
+		(u32) ring_posted(ssk->tx_ring));
+
+	/* Only process if the socket is not in use */
+	bh_lock_sock(sk);
+	if (sock_owned_by_user(sk)) {
+		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+		sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+		SDPSTATS_COUNTER_INC(tx_poll_busy);
+		goto out;
+	}
+
+	if (unlikely(sk->sk_state == TCP_CLOSE))
+		goto out;
+
+	wc_processed = sdp_process_tx_cq(ssk);
+	if (!wc_processed)
+		SDPSTATS_COUNTER_INC(tx_poll_miss);
+	else
+		SDPSTATS_COUNTER_INC(tx_poll_hit);
+
+	inflight = (u32) ring_posted(ssk->tx_ring);
+
+	/* If there are still packets in flight and the timer has not already
+	 * been scheduled by the Tx routine then schedule it here to guarantee
+	 * completion processing of these packets */
+	if (inflight) { /* TODO: make sure socket is not closed */
+		sdp_dbg_data(sk, "arming timer for more polling\n");
+		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+	}
+
+out:
+	bh_unlock_sock(sk);
+}
+
+static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
+{
+	struct sock *sk = cq_context;
+	struct sdp_sock *ssk = sdp_sk(sk);
+
+	sdp_prf1(sk, NULL, "Got tx comp interrupt");
+
+	mod_timer(&ssk->tx_ring.timer, jiffies);
+}
+
+void sdp_tx_ring_purge(struct sdp_sock *ssk)
+{
+	while (ring_posted(ssk->tx_ring)) {
+		struct sk_buff *skb;
+		skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
+		if (!skb)
+			break;
+		__kfree_skb(skb);
+	}
+}
+
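+/* Post a zero-length RDMA write as a keepalive probe. */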
+void sdp_post_keepalive(struct sdp_sock *ssk)
+{
+	int rc;
+	struct ib_send_wr wr, *bad_wr;
+
+	sdp_dbg(&ssk->isk.sk, "%s\n", __func__);
+
+	memset(&wr, 0, sizeof(wr));
+
+	wr.next    = NULL;
+	wr.wr_id   = 0;
+	wr.sg_list = NULL;
+	wr.num_sge = 0;
+	wr.opcode  = IB_WR_RDMA_WRITE;
+
+	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
+	if (rc) {
+		sdp_dbg(&ssk->isk.sk,
+			"ib_post_keepalive failed with status %d.\n", rc);
+		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+		wake_up(&ssk->wq);
+	}
+
+	sdp_cnt(sdp_keepalive_probes_sent);
+}
+
+static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+	struct ib_cq *tx_cq;
+	int rc = 0;
+
+	atomic_set(&ssk->tx_ring.head, 1);
+	atomic_set(&ssk->tx_ring.tail, 1);
+
+	ssk->tx_ring.buffer = kmalloc(
+			sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE, GFP_KERNEL);
+	if (!ssk->tx_ring.buffer) {
+		rc = -ENOMEM;
+		sdp_warn(&ssk->isk.sk, "Can't allocate TX Ring size %zd.\n",
+			 sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
+
+		goto out;
+	}
+
+	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
+			  &ssk->isk.sk, SDP_TX_SIZE, 0);
+
+	if (IS_ERR(tx_cq)) {
+		rc = PTR_ERR(tx_cq);
+		sdp_warn(&ssk->isk.sk, "Unable to allocate TX CQ: %d.\n", rc);
+		goto err_cq;
+	}
+
+	sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
+
+	init_timer(&ssk->tx_ring.timer);
+	ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
+	ssk->tx_ring.timer.data = (unsigned long) ssk;
+	ssk->tx_ring.poll_cnt = 0;
+
+	init_timer(&ssk->nagle_timer);
+	ssk->nagle_timer.function = sdp_nagle_timeout;
+	ssk->nagle_timer.data = (unsigned long) ssk;
+
+	return 0;
+
+err_cq:
+	kfree(ssk->tx_ring.buffer);
+	ssk->tx_ring.buffer = NULL;
+out:
+	return rc;
+}
+
+void sdp_tx_ring_destroy(struct sdp_sock *ssk)
+{
+	del_timer(&ssk->nagle_timer);
+
+	if (ssk->tx_ring.buffer) {
+		sdp_tx_ring_purge(ssk);
+
+		kfree(ssk->tx_ring.buffer);
+		ssk->tx_ring.buffer = NULL;
+	}
+
+	if (ssk->tx_ring.cq) {
+		ib_destroy_cq(ssk->tx_ring.cq);
+		ssk->tx_ring.cq = NULL;
+	}
+
+	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
+}


