[ofa-general] [PATCH] sdp: process RX CQ from interrupt

Amir Vadai amirv at mellanox.co.il
Mon Jun 22 01:18:44 PDT 2009


Queue RX packets in interrupt context to reduce unnecessary context switches.
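
With this change the RX completion handler no longer just schedules a workqueue:
sdp_rx_irq() drains the RX CQ while still in interrupt context, queues the completed
skbs on a new per-socket rx_backlog list, reposts receive buffers, re-arms the CQ and
only then schedules sdp_rx_comp_work(), which processes the backlog under lock_sock().
In outline, the new interrupt path looks roughly like this (simplified from
sdp_rx_irq() in sdp_rx.c below; ring locking and error paths omitted):

	static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
	{
		struct sock *sk = cq_context;
		struct sdp_sock *ssk = sdp_sk(sk);

		/* drain completions into ssk->rx_backlog in interrupt context */
		if (sdp_poll_rx_cq(ssk)) {
			_sdp_post_recvs(ssk);	/* repost receive buffers */
			/* defer per-skb and socket-level work to process context */
			queue_work(rx_comp_wq, &ssk->rx_comp_work);
		}

		sdp_arm_rx_cq(sk);	/* request notification for the next completion */
	}

The heavier per-skb work (sdp_process_rx_skb()) then runs from sdp_process_rx_q() in
process context, so the hard-irq path only moves completions off the CQ.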

Signed-off-by: Amir Vadai <amirv at mellanox.co.il>
---
 drivers/infiniband/ulp/sdp/sdp.h       |  140 +++++++----
 drivers/infiniband/ulp/sdp/sdp_bcopy.c |   66 ++----
 drivers/infiniband/ulp/sdp/sdp_cma.c   |  114 ++-------
 drivers/infiniband/ulp/sdp/sdp_main.c  |  124 +++++-----
 drivers/infiniband/ulp/sdp/sdp_proc.c  |   13 +-
 drivers/infiniband/ulp/sdp/sdp_rx.c    |  442 ++++++++++++++++++++++---------
 drivers/infiniband/ulp/sdp/sdp_tx.c    |  160 ++++++++----
 7 files changed, 614 insertions(+), 445 deletions(-)

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 5e78282..57265aa 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -7,11 +7,12 @@
 #include <net/tcp.h> /* For urgent data flags */
 #include <rdma/ib_verbs.h>
 
+#undef SDP_LOCKS_CHECK
 #define SDPSTATS_ON
-#undef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#define SDP_PROFILING
 
 #define _sdp_printk(func, line, level, sk, format, arg...)                \
-	printk(level "%s:%d sdp_sock(%d %d:%d): " format,             \
+	printk(level "%s:%d sdp_sock(%5d %d:%d): " format,             \
 	       func, line, \
 	       current->pid, \
 	       (sk) ? inet_sk(sk)->num : -1,                 \
@@ -22,6 +23,34 @@
 	sdp_printk(KERN_WARNING, sk, format , ## arg)
 
 
+#ifdef SDP_LOCKS_CHECK
+#define WARN_ON_UNLOCKED(sk, l) do {\
+	if (unlikely(!spin_is_locked(l))) { \
+		sdp_warn(sk, "lock " #l " should be locked\n"); \
+		WARN_ON(1); \
+	} \
+} while (0)
+
+#define WARN_ON_LOCKED(sk, l) do {\
+	if (unlikely(spin_is_locked(l))) { \
+		sdp_warn(sk, "lock " #l " should be unlocked\n"); \
+		WARN_ON(1); \
+	} \
+} while (0)
+#else
+#define WARN_ON_UNLOCKED(sk, l)
+#define WARN_ON_LOCKED(sk, l)
+#endif
+
+#define rx_ring_lock(ssk, f) do { \
+	spin_lock_irqsave(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define rx_ring_unlock(ssk, f) do { \
+	spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#ifdef SDP_PROFILING
 struct sk_buff;
 struct sdpprf_log {
 	int 		idx;
@@ -37,7 +66,7 @@ struct sdpprf_log {
 	int 		line;
 };
 
-#define SDPPRF_LOG_SIZE 0x10000 /* must be a power of 2 */
+#define SDPPRF_LOG_SIZE 0x20000 /* must be a power of 2 */
 
 extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
 extern int sdpprf_log_count;
@@ -48,7 +77,6 @@ static inline unsigned long long current_nsec(void)
 	getnstimeofday(&tv);
 	return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
 }
-#if 1
 #define sdp_prf(sk, s, format, arg...) ({ \
 	struct sdpprf_log *l = &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
 	l->idx = sdpprf_log_count - 1; \
@@ -66,16 +94,6 @@ static inline unsigned long long current_nsec(void)
 #define sdp_prf(sk, s, format, arg...)
 #endif
 
-#if 0
-#if 1
-#define sdp_prf_rx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg)
-#define sdp_prf_tx(sk, s, format, arg...)
-#else
-#define sdp_prf_rx(sk, s, format, arg...)
-#define sdp_prf_tx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg)
-#endif
-#endif
-
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG
 extern int sdp_debug_level;
 
@@ -125,7 +143,6 @@ extern int sdp_data_debug_level;
 	} while (0)
 #else
 #define sdp_dbg_data(priv, format, arg...)
-//	do { (void) (priv); } while (0)
 #define SDP_DUMP_PACKET(sk, str, skb, h)
 #endif
 
@@ -300,23 +317,36 @@ struct sdp_buf {
         u64             mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
 };
 
+#define ring_head(ring)   (atomic_read(&(ring).head))
+#define ring_tail(ring)   (atomic_read(&(ring).tail))
+#define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
 
 struct sdp_tx_ring {
 	struct sdp_buf   *buffer;
-	unsigned          head;
-	unsigned          tail;
+	atomic_t          head;
+	atomic_t          tail;
+	struct ib_cq 	 *cq;
 
 	int 		  una_seq;
-	unsigned 	  credits;
+	atomic_t 	  credits;
+#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
 
 	struct timer_list timer;
 	u16 		  poll_cnt;
+};
+
+struct sdp_rx_ring {
+	struct sdp_buf   *buffer;
+	atomic_t          head;
+	atomic_t          tail;
 	struct ib_cq 	 *cq;
+
+	spinlock_t 	 lock;
 };
 
 static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
 {
-	return SDP_TX_SIZE - (tx_ring->head - tx_ring->tail);
+	return SDP_TX_SIZE - ring_posted(*tx_ring);
 }
 
 struct sdp_chrecvbuf {
@@ -329,6 +359,7 @@ struct sdp_sock {
 	struct list_head sock_list;
 	struct list_head accept_queue;
 	struct list_head backlog_queue;
+	struct sk_buff_head rx_backlog;
 	struct sock *parent;
 
 	struct work_struct rx_comp_work;
@@ -362,32 +393,27 @@ struct sdp_sock {
 	int sdp_disconnect;
 	int destruct_in_process;
 
-	
+	struct sdp_rx_ring rx_ring;
+	struct sdp_tx_ring tx_ring;
 
 	/* Data below will be reset on error */
 	struct rdma_cm_id *id;
 	struct ib_device *ib_device;
 
 	/* SDP specific */
-	struct ib_recv_wr rx_wr;
-	unsigned rx_head;
-	unsigned rx_tail;
-	unsigned mseq_ack;
+	atomic_t mseq_ack;
+#define mseq_ack(ssk) (atomic_read(&ssk->mseq_ack))
 	unsigned max_bufs;	/* Initial buffers offered by other side */
 	unsigned min_bufs;	/* Low water mark to wake senders */
 
-	int               remote_credits;
+	atomic_t               remote_credits;
+#define remote_credits(ssk) (atomic_read(&ssk->remote_credits))
 	int 		  poll_cq;
 
 	/* rdma specific */
 	struct ib_qp *qp;
-	struct ib_cq *rx_cq;
 	struct ib_mr *mr;
 
-	struct sdp_buf *rx_ring;
-	struct sdp_tx_ring tx_ring;
-	struct ib_send_wr tx_wr;
-
 	/* SDP slow start */
 	int rcvbuf_scale; 	/* local recv buf scale for each socket */
 	int sent_request_head; 	/* mark the tx_head of the last send resize
@@ -402,8 +428,6 @@ struct sdp_sock {
 
 	/* BZCOPY data */
 	int   zcopy_thresh;
-
-	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
 };
 
 /* Context used for synchronous zero copy bcopy (BZCOY) */
@@ -499,13 +523,6 @@ static inline void sdp_set_error(struct sock *sk, int err)
 	sk->sk_error_report(sk);
 }
 
-static inline void sdp_arm_rx_cq(struct sock *sk)
-{
-	sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n");
-	
-	ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP);
-}
-
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
 void _dump_packet(const char *func, int line, struct sock *sk, char *str,
 		struct sk_buff *skb, const struct sdp_bsdh *h);
@@ -524,28 +541,53 @@ void sdp_remove_sock(struct sdp_sock *ssk);
 void sdp_remove_large_sock(struct sdp_sock *ssk);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 void sdp_start_keepalive_timer(struct sock *sk);
-void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 int sdp_init_sock(struct sock *sk);
 int __init sdp_proc_init(void);
 void sdp_proc_unregister(void);
 
+/* sdp_tx.c */
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_tx_ring_destroy(struct sdp_sock *ssk);
 int sdp_xmit_poll(struct sdp_sock *ssk, int force);
-void sdp_tx_ring_purge(struct sdp_sock *ssk);
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
 void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle);
 #define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle)
-void sdp_process_tx_wc_work(struct work_struct *work);
-void sdp_poll_tx_cq(unsigned long data);
-void _sdp_poll_tx_cq(unsigned long data);
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
 
+/* sdp_rx.c */
+void sdp_rx_ring_init(struct sdp_sock *ssk);
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_rx_ring_destroy(struct sdp_sock *ssk);
+int sdp_process_rx_q(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_rx_ring_purge(struct sdp_sock *ssk);
 void sdp_post_recvs(struct sdp_sock *ssk);
-void sdp_rx_comp_work(struct work_struct *work);
-int sdp_poll_rx_cq(struct sdp_sock *ssk);
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
+
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+	sdp_prf(sk, NULL, "Arming RX cq");
+	sdp_dbg_data(sk, "Arming RX cq\n");
+	
+	ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+/* utilities */
+static inline char *mid2str(int mid)
+{
+#define ENUM2STR(e) [e] = #e
+	static char *mid2str[] = {
+		ENUM2STR(SDP_MID_HELLO),
+		ENUM2STR(SDP_MID_HELLO_ACK),
+		ENUM2STR(SDP_MID_DISCONN),
+		ENUM2STR(SDP_MID_CHRCVBUF),
+		ENUM2STR(SDP_MID_CHRCVBUF_ACK),
+		ENUM2STR(SDP_MID_DATA),
+	};
+
+	if (mid >= ARRAY_SIZE(mid2str))
+		return NULL;
+
+	return mid2str[mid];
+}
 
 static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
 {
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 5d4441a..b98171e 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -45,18 +45,9 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
 {
 	int len = 0;
 	char buf[256];
-#define ENUM2STR(e) [e] = #e
-	static char *mid2str[] = {
-		ENUM2STR(SDP_MID_HELLO),
-		ENUM2STR(SDP_MID_HELLO_ACK),
-		ENUM2STR(SDP_MID_DISCONN),
-		ENUM2STR(SDP_MID_CHRCVBUF),
-		ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-		ENUM2STR(SDP_MID_DATA),
-	};
-	len += snprintf(buf, 255-len, "skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d "
+	len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d "
 		"len: %d mseq: %d mseq_ack: %d",
-		skb, h->mid, mid2str[h->mid], h->flags,
+		str, skb, h->mid, mid2str(h->mid), h->flags,
 		ntohs(h->bufs), ntohl(h->len),ntohl(h->mseq),
 		ntohl(h->mseq_ack));
 
@@ -117,7 +108,7 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
 	return (ssk->nonagle & TCP_NAGLE_OFF) ||
 		skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
 		skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
-		(ssk->tx_ring.tail == ssk->tx_ring.head &&
+		(ring_tail(ssk->tx_ring) == ring_head(ssk->tx_ring) &&
 		 !(ssk->nonagle & TCP_NAGLE_CORK)) ||
 		(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
 }
@@ -125,26 +116,14 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
 int sdp_post_credits(struct sdp_sock *ssk)
 {
 	int post_count = 0;
-	struct sk_buff *skb;
 
 	sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
 			"tx ring slots left: %d send_head: %p\n",
-		ssk->tx_ring.credits, ssk->remote_credits,
+		tx_credits(ssk), remote_credits(ssk),
 		sdp_tx_ring_slots_left(&ssk->tx_ring),
 		ssk->isk.sk.sk_send_head);
 
-	if (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS &&
-	       sdp_tx_ring_slots_left(&ssk->tx_ring) &&
-	       (skb = ssk->isk.sk.sk_send_head) &&
-		sdp_nagle_off(ssk, skb)) {
-		update_send_head(&ssk->isk.sk, skb);
-		__skb_dequeue(&ssk->isk.sk.sk_write_queue);
-		sdp_post_send(ssk, skb, SDP_MID_DATA);
-		post_count++;
-		goto out;
-	}
-
-	if (likely(ssk->tx_ring.credits > 1) &&
+	if (likely(tx_credits(ssk) > 1) &&
 	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring))) {
 		struct sk_buff *skb;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -156,7 +135,6 @@ int sdp_post_credits(struct sdp_sock *ssk)
 		post_count++;
 	}
 
-out:
 	if (post_count)
 		sdp_xmit_poll(ssk, 0);
 	return post_count;
@@ -188,7 +166,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 		gfp_page = GFP_KERNEL;
 
 	sdp_dbg_data(&ssk->isk.sk, "credits: %d tx ring slots left: %d send_head: %p\n",
-		ssk->tx_ring.credits, sdp_tx_ring_slots_left(&ssk->tx_ring),
+		tx_credits(ssk), sdp_tx_ring_slots_left(&ssk->tx_ring),
 		ssk->isk.sk.sk_send_head);
 
 	if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
@@ -197,8 +175,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 	}
 
 	if (ssk->recv_request &&
-	    ssk->rx_tail >= ssk->recv_request_head &&
-	    ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS &&
+	    ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
+	    tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
 	    sdp_tx_ring_slots_left(&ssk->tx_ring)) {
 		struct sdp_chrecvbuf *resp_size;
 		ssk->recv_request = 0;
@@ -214,7 +192,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 		post_count++;
 	}
 
-	if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS &&
+	if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
 	       sdp_tx_ring_slots_left(&ssk->tx_ring) &&
 	       (skb = ssk->isk.sk.sk_send_head) &&
 		sdp_nagle_off(ssk, skb)) {
@@ -222,7 +200,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 		sdp_prf(&ssk->isk.sk, skb, "no credits. called from %s:%d", func, line);
 	}
 
-	while (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS &&
+	while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
 	       sdp_tx_ring_slots_left(&ssk->tx_ring) &&
 	       (skb = ssk->isk.sk.sk_send_head) &&
 		sdp_nagle_off(ssk, skb)) {
@@ -232,9 +210,9 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 		post_count++;
 	}
 
-	if (0 && ssk->tx_ring.credits == SDP_MIN_TX_CREDITS &&
+	if (0 && tx_credits(ssk) == SDP_MIN_TX_CREDITS &&
 	    !ssk->sent_request &&
-	    ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
+	    ring_head(ssk->tx_ring) > ssk->sent_request_head + SDP_RESIZE_WAIT &&
 	    sdp_tx_ring_slots_left(&ssk->tx_ring)) {
 		struct sdp_chrecvbuf *req_size;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -244,19 +222,19 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 		/* FIXME */
 		BUG_ON(!skb);
 		ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
-		ssk->sent_request_head = ssk->tx_ring.head;
+		ssk->sent_request_head = ring_head(ssk->tx_ring);
 		req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
 		req_size->size = htonl(ssk->sent_request);
 		sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
 		post_count++;
 	}
 
-	c = ssk->remote_credits;
+	c = remote_credits(ssk);
 	if (likely(c > SDP_MIN_TX_CREDITS))
 		c *= 2;
 
-	if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
-	    likely(ssk->tx_ring.credits > 1) &&
+	if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+	    likely(tx_credits(ssk) > 1) &&
 	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
 	    likely((1 << ssk->isk.sk.sk_state) &
 		    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
@@ -276,8 +254,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 	 * messages that provide additional credits and also do not contain ULP
 	 * payload. */
 	if (unlikely(ssk->sdp_disconnect) &&
-		!ssk->isk.sk.sk_send_head &&
-		ssk->tx_ring.credits > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) {
+			!ssk->isk.sk.sk_send_head &&
+			tx_credits(ssk) > 1) {
 		ssk->sdp_disconnect = 0;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
 					  sizeof(struct sdp_bsdh),
@@ -287,14 +265,10 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
 		sdp_post_send(ssk, skb, SDP_MID_DISCONN);
 		post_count++;
 	}
+
 	if (post_count) {
 		sdp_xmit_poll(ssk, 0);
 
-		sdp_prf(&ssk->isk.sk, NULL, "finshed polling from post_sends");
+		sdp_prf(&ssk->isk.sk, NULL, "post_sends finished polling [%s:%d].", func, line);
 	}
 }
-
-static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
-{
-	return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
-}
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index e0d0b20..a9dcf77 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -53,10 +53,6 @@ enum {
 	SDP_HAH_SIZE = 180,
 };
 
-static void sdp_cq_event_handler(struct ib_event *event, void *data)
-{
-}
-
 static void sdp_qp_event_handler(struct ib_event *event, void *data)
 {
 }
@@ -73,36 +69,12 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         	.qp_type = IB_QPT_RC,
 	};
 	struct ib_device *device = id->device;
-	struct ib_cq *rx_cq, *tx_cq;
 	struct ib_mr *mr;
 	struct ib_pd *pd;
 	int rc;
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	sdp_sk(sk)->tx_ring.head = 1;
-	sdp_sk(sk)->tx_ring.tail = 1;
-	sdp_sk(sk)->rx_head = 1;
-	sdp_sk(sk)->rx_tail = 1;
-
-	sdp_sk(sk)->tx_ring.buffer = kmalloc(sizeof(*sdp_sk(sk)->tx_ring.buffer) *
-			(SDP_TX_SIZE + 1), GFP_KERNEL);
-	if (!sdp_sk(sk)->tx_ring.buffer) {
-		rc = -ENOMEM;
-		sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n",
-			 sizeof *sdp_sk(sk)->tx_ring.buffer * (SDP_TX_SIZE + 1));
-		goto err_tx;
-	}
-
-	sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE,
-				      GFP_KERNEL);
-	if (!sdp_sk(sk)->rx_ring) {
-		rc = -ENOMEM;
-		sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n",
-			 sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE);
-		goto err_rx;
-	}
-
 	pd = ib_alloc_pd(device);
 	if (IS_ERR(pd)) {
 		rc = PTR_ERR(pd);
@@ -118,43 +90,15 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         }
 
 	sdp_sk(sk)->mr = mr;
-	INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work);
-
-	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
-			  sk, SDP_RX_SIZE, 0);
-
-	if (IS_ERR(rx_cq)) {
-		rc = PTR_ERR(rx_cq);
-		sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc);
-		goto err_rx_cq;
-	}
 
-	rc = ib_modify_cq(rx_cq, 10, 200);
-	if (rc) {
-		sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc);
-		goto err_tx_cq;
-	}
-	sdp_warn(sk, "Initialized CQ moderation\n");
-	sdp_sk(sk)->rx_cq = rx_cq;
-	sdp_arm_rx_cq(sk);
-	qp_init_attr.recv_cq = rx_cq;
-
-	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
-			  sk, SDP_TX_SIZE, 0);
-
-	if (IS_ERR(tx_cq)) {
-		rc = PTR_ERR(tx_cq);
-		sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc);
-		goto err_tx_cq;
-	}
+	if ((rc = sdp_rx_ring_create(sdp_sk(sk), device)))
+		goto err_rx;
 
-	init_timer(&sdp_sk(sk)->tx_ring.timer);
-	sdp_sk(sk)->tx_ring.timer.function = _sdp_poll_tx_cq;
-	sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk);
-	sdp_sk(sk)->tx_ring.poll_cnt = 0;
+	if ((rc = sdp_tx_ring_create(sdp_sk(sk), device)))
+		goto err_tx;
 
-	sdp_sk(sk)->tx_ring.cq = tx_cq;
-        qp_init_attr.send_cq = tx_cq;
+	qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
+	qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
 
 	rc = rdma_create_qp(id, pd, &qp_init_attr);
 	if (rc) {
@@ -170,20 +114,14 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
 	return 0;
 
 err_qp:
-	ib_destroy_cq(tx_cq);
-err_tx_cq:
-	ib_destroy_cq(rx_cq);
-err_rx_cq:
+	sdp_tx_ring_destroy(sdp_sk(sk));
+err_tx:
+	sdp_rx_ring_destroy(sdp_sk(sk));
+err_rx:
 	ib_dereg_mr(sdp_sk(sk)->mr);
 err_mr:
 	ib_dealloc_pd(pd);
 err_pd:
-	kfree(sdp_sk(sk)->rx_ring);
-	sdp_sk(sk)->rx_ring = NULL;
-err_rx:
-	kfree(sdp_sk(sk)->tx_ring.buffer);
-	sdp_sk(sk)->tx_ring.buffer = NULL;
-err_tx:
 	return rc;
 }
 
@@ -225,8 +163,10 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
 
 	sdp_add_sock(sdp_sk(child));
 
-	sdp_sk(child)->max_bufs = sdp_sk(child)->tx_ring.credits = ntohs(h->bsdh.bufs);
-	sdp_sk(child)->min_bufs = sdp_sk(child)->tx_ring.credits / 4;
+	sdp_sk(child)->max_bufs = ntohs(h->bsdh.bufs);
+	atomic_set(&sdp_sk(child)->tx_ring.credits, sdp_sk(child)->max_bufs);
+
+	sdp_sk(child)->min_bufs = tx_credits(sdp_sk(child)) / 4;
 	sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
 		sizeof(struct sdp_bsdh);
 	sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
@@ -236,7 +176,7 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
 	sdp_dbg(child, "%s recv_frags: %d tx credits %d xmit_size_goal %d send trigger %d\n",
 		__func__,
 		sdp_sk(child)->recv_frags,
-		sdp_sk(child)->tx_ring.credits,
+		tx_credits(sdp_sk(child)),
 		sdp_sk(child)->xmit_size_goal,
 		sdp_sk(child)->min_bufs);
 
@@ -272,8 +212,9 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 
 	h = event->param.conn.private_data;
 	SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
-	sdp_sk(sk)->max_bufs = sdp_sk(sk)->tx_ring.credits = ntohs(h->bsdh.bufs);
-	sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4;
+	sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs);
+	atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs);
+	sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4;
 	sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
 		sizeof(struct sdp_bsdh);
 	sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
@@ -282,14 +223,12 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 		sdp_sk(sk)->send_frags * PAGE_SIZE);
 
 	sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n",
-		sdp_sk(sk)->tx_ring.credits,
+		tx_credits(sdp_sk(sk)),
 		sdp_sk(sk)->xmit_size_goal,
 		sdp_sk(sk)->send_frags,
 		sdp_sk(sk)->min_bufs);
 
 	sdp_sk(sk)->poll_cq = 1;
-	sdp_arm_rx_cq(sk);
-	sdp_poll_rx_cq(sdp_sk(sk));
 
 	sk->sk_state_change(sk);
 	sk_wake_async(sk, 0, POLL_OUT);
@@ -349,8 +288,7 @@ static int sdp_disconnected_handler(struct sock *sk)
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	if (ssk->rx_cq)
-		sdp_poll_rx_cq(ssk);
+	sdp_process_rx_q(ssk);
 
 	if (ssk->tx_ring.cq)
 		sdp_xmit_poll(ssk, 1);
@@ -400,7 +338,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT);
 		break;
 	case RDMA_CM_EVENT_ADDR_ERROR:
-		sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
+		sdp_warn(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
 		rc = -ENETUNREACH;
 		break;
 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -408,11 +346,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		rc = sdp_init_qp(sk, id);
 		if (rc)
 			break;
-		sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head -
-			sdp_sk(sk)->rx_tail;
+		atomic_set(&sdp_sk(sk)->remote_credits, ring_posted(sdp_sk(sk)->rx_ring));
 		memset(&hh, 0, sizeof hh);
 		hh.bsdh.mid = SDP_MID_HELLO;
-		hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits);
+		hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
 		hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
 		hh.max_adverts = 1;
 		hh.majv_minv = SDP_MAJV_MINV;
@@ -443,11 +380,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 			break;
 		}
 		child = id->context;
-		sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head -
-			sdp_sk(child)->rx_tail;
+		atomic_set(&sdp_sk(child)->remote_credits, ring_posted(sdp_sk(child)->rx_ring));
 		memset(&hah, 0, sizeof hah);
 		hah.bsdh.mid = SDP_MID_HELLO_ACK;
-		hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits);
+		hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
 		hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
 		hah.majv_minv = SDP_MAJV_MINV;
 		hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index b5322da..c6b17db 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -206,31 +206,24 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 static void sdp_destroy_qp(struct sdp_sock *ssk)
 {
 	struct ib_pd *pd = NULL;
-	struct ib_cq *rx_cq = NULL;
-	struct ib_cq *tx_cq = NULL;
+	unsigned long flags;
+
+
+	sdp_dbg(&ssk->isk.sk, "destroying qp\n");
 
 	del_timer(&ssk->tx_ring.timer);
 
+	rx_ring_lock(ssk, flags);
+
+	sdp_rx_ring_destroy(ssk);
+	sdp_tx_ring_destroy(ssk);
+
 	if (ssk->qp) {
 		pd = ssk->qp->pd;
-		rx_cq = ssk->rx_cq;
-		ssk->rx_cq = NULL;
-		tx_cq = ssk->tx_ring.cq;
-		ssk->tx_ring.cq = NULL;
 		ib_destroy_qp(ssk->qp);
 		ssk->qp = NULL;
-
-		sdp_rx_ring_purge(ssk);
-		sdp_tx_ring_purge(ssk);
-	}
-
-	if (tx_cq) {
-		ib_destroy_cq(tx_cq);
 	}
 
-	if (rx_cq)
-		ib_destroy_cq(rx_cq);
-
 	if (ssk->mr) {
 		ib_dereg_mr(ssk->mr);
 		ssk->mr = NULL;
@@ -241,14 +234,8 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 
 	sdp_remove_large_sock(ssk);
 
-	if (ssk->rx_ring) {
-		kfree(ssk->rx_ring);
-		ssk->rx_ring = NULL;
-	}
-	if (ssk->tx_ring.buffer) {
-		kfree(ssk->tx_ring.buffer);
-		ssk->tx_ring.buffer = NULL;
-	}
+	rx_ring_unlock(ssk, flags);
+
 }
 
 static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
@@ -257,8 +244,8 @@ static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	ssk->keepalive_tx_head = ssk->tx_ring.head;
-	ssk->keepalive_rx_head = ssk->rx_head;
+	ssk->keepalive_tx_head = ring_head(ssk->tx_ring);
+	ssk->keepalive_rx_head = ring_head(ssk->rx_ring);
 
 	sk_reset_timer(sk, &sk->sk_timer, jiffies + len);
 }
@@ -293,8 +280,8 @@ static void sdp_keepalive_timer(unsigned long data)
 	    sk->sk_state == TCP_CLOSE)
 		goto out;
 
-	if (ssk->keepalive_tx_head == ssk->tx_ring.head &&
-	    ssk->keepalive_rx_head == ssk->rx_head)
+	if (ssk->keepalive_tx_head == ring_head(ssk->tx_ring) &&
+	    ssk->keepalive_rx_head == ring_head(ssk->rx_ring))
 		sdp_post_keepalive(ssk);
 
 	sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(ssk));
@@ -338,17 +325,21 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
 	read_lock(&device_removal_lock);
 
-	if (ssk->rx_cq)
-		sdp_poll_rx_cq(ssk);
+	sdp_process_rx_q(ssk);
 
 	if (ssk->tx_ring.cq)
 		sdp_xmit_poll(ssk, 1);
 
-	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
+	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
+		sdp_warn(sk, "setting state to error. shutdown: %d, mem_free: %d\n",
+				!(sk->sk_shutdown & RCV_SHUTDOWN),
+				!sk_stream_memory_free(sk));
 		sdp_set_error(sk, rc);
+	}
 
 	sdp_destroy_qp(ssk);
 
+	sdp_dbg(sk, "memset on sdp_sock\n");
 	memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id));
 
 	sk->sk_state_change(sk);
@@ -488,6 +479,7 @@ static void sdp_close(struct sock *sk, long timeout)
 	lock_sock(sk);
 
 	sdp_dbg(sk, "%s\n", __func__);
+	sdp_prf(sk, NULL, __func__);
 
 	sdp_delete_keepalive_timer(sk);
 
@@ -773,10 +765,9 @@ out:
 	release_sock(sk);
 	if (newsk) {
 		lock_sock(newsk);
-		if (newssk->rx_cq) {
+		if (newssk->rx_ring.cq) {
 			newssk->poll_cq = 1;
 			sdp_arm_rx_cq(&newssk->isk.sk);
-			sdp_poll_rx_cq(newssk);
 		}
 		release_sock(newsk);
 	}
@@ -934,7 +925,11 @@ int sdp_init_sock(struct sock *sk)
 
 	sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
 
-	ssk->rx_ring = NULL;
+	skb_queue_head_init(&ssk->rx_backlog);
+
+	atomic_set(&ssk->mseq_ack, 0);
+
+	sdp_rx_ring_init(ssk);
 	ssk->tx_ring.buffer = NULL;
 	ssk->sdp_disconnect = 0;
 	ssk->destructed_already = 0;
@@ -1142,14 +1137,13 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
 static inline int poll_recv_cq(struct sock *sk)
 {
 	int i;
-	if (sdp_sk(sk)->rx_cq) {
-		for (i = 0; i < recv_poll; ++i)
-			if (!sdp_poll_rx_cq(sdp_sk(sk))) {
-				++recv_poll_hit;
-				return 0;
-			}
-		++recv_poll_miss;
+	for (i = 0; i < recv_poll; ++i) {
+		if (!sdp_process_rx_q(sdp_sk(sk))) {
+			++recv_poll_hit;
+			return 0;
+		}
 	}
+	++recv_poll_miss;
 	return 1;
 }
 
@@ -1551,8 +1545,8 @@ static inline int slots_free(struct sdp_sock *ssk)
 {
 	int min_free;
 
-	min_free = MIN(ssk->tx_ring.credits,
-			SDP_TX_SIZE - (ssk->tx_ring.head - ssk->tx_ring.tail));
+	min_free = MIN(tx_credits(ssk),
+			SDP_TX_SIZE - ring_posted(ssk->tx_ring));
 	if (min_free < SDP_MIN_TX_CREDITS)
 		return 0;
 
@@ -1608,6 +1602,9 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 
 		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 		sk->sk_write_pending++;
+		sdp_prf(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
+				tx_credits(ssk), ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring),
+				bz->busy);
 		sk_wait_event(sk, &current_timeo,
 			sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
 		sk->sk_write_pending--;
@@ -1627,24 +1624,6 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 	return err;
 }
 
-/* like sk_stream_write_space - execpt measures remote credits */
-void sdp_bzcopy_write_space(struct sdp_sock *ssk)
-{
-	struct sock *sk = &ssk->isk.sk;
-	struct socket *sock = sk->sk_socket;
-
-	if (ssk->tx_ring.credits >= ssk->min_bufs &&
-	    ssk->tx_ring.head == ssk->tx_ring.tail &&
-	   sock != NULL) {
-		clear_bit(SOCK_NOSPACE, &sock->flags);
-
-		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-			wake_up_interruptible(sk->sk_sleep);
-		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
-			sock_wake_async(sock, 2, POLL_OUT);
-	}
-}
-
 /* Like tcp_sendmsg */
 /* TODO: check locking */
 static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1735,7 +1714,7 @@ new_segment:
 				if (!skb)
 					goto wait_for_memory;
 
-				sdp_prf(sk, skb, "Created");
+//				sdp_prf(sk, skb, "Created");
 
 				BZCOPY_STATE(skb) = bz;
 
@@ -1751,7 +1730,7 @@ new_segment:
 				skb_entail(sk, ssk, skb);
 				copy = size_goal;
 			} else {
-				sdp_prf(sk, skb, "adding %d bytes", copy);
+//				sdp_prf(sk, skb, "adding %d bytes", copy);
 				sdp_dbg_data(sk, "adding to existing skb: %p"
 					" len = %d, sk_send_head: %p copy: %d\n",
 					skb, skb->len, sk->sk_send_head, copy);
@@ -1770,10 +1749,8 @@ new_segment:
 				goto new_segment;
 			}
 
-//			sdp_prf(sk, skb, "before memcpy %d bytes", copy);
 			copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
 				      sdp_bcopy_get(sk, skb, from, copy);
-//			sdp_prf(sk, skb, "after memcpy. result: %d", copy);
 			if (unlikely(copy < 0)) {
 				if (!++copy)
 					goto wait_for_memory;
@@ -1863,6 +1840,20 @@ out_err:
 	return err;
 }
 
+int dummy_memcpy_toiovec(struct iovec *iov, int len)
+{
+	while (len > 0) {
+		if (iov->iov_len) {
+			int copy = min_t(unsigned int, iov->iov_len, len);
+			len -= copy;
+			iov->iov_len -= copy;
+			iov->iov_base += copy;
+		}
+		iov++;
+	}
+
+	return 0;
+}
 /* Like tcp_recvmsg */
 /* Maybe use skb_recv_datagram here? */
 /* Note this does not seem to handle vectored messages. Relevant? */
@@ -1884,7 +1875,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
 
-	sdp_prf(sk, skb, "Read from user");
+//	sdp_prf(sk, skb, "Read from user");
 
 	err = -ENOTCONN;
 	if (sk->sk_state == TCP_LISTEN)
@@ -2024,6 +2015,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 			err = skb_copy_datagram_iovec(skb, offset,
 						      /* TODO: skip header? */
 						      msg->msg_iov, used);
+//			err = dummy_memcpy_toiovec(msg->msg_iov, used);
 			sdp_prf(sk, skb, "Copied to user %ld bytes. err = %d", used, err);
 			if (err) {
 				sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
index 3778c9a..537b3a6 100644
--- a/drivers/infiniband/ulp/sdp/sdp_proc.c
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -236,15 +236,6 @@ static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n, in
 
 static int sdpstats_seq_show(struct seq_file *seq, void *v)
 {
-#define ENUM2STR(e) [e] = #e
-	static char *mid2str[] = {
-		ENUM2STR(SDP_MID_HELLO),
-		ENUM2STR(SDP_MID_HELLO_ACK),
-		ENUM2STR(SDP_MID_DISCONN),
-		ENUM2STR(SDP_MID_CHRCVBUF),
-		ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-		ENUM2STR(SDP_MID_DATA),
-	};
 	int i;
 
 	seq_printf(seq, "SDP statistics:\n");
@@ -268,9 +259,9 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
 	seq_printf(seq, "memcpy_count       \t\t: %u\n", sdpstats.memcpy_count);
 
 	for (i = 0; i < ARRAY_SIZE(sdpstats.post_send); i++) {
-		if (mid2str[i]) {
+		if (mid2str(i)) {
 			seq_printf(seq, "post_send %-20s\t: %d\n",
-					mid2str[i], sdpstats.post_send[i]);
+					mid2str(i), sdpstats.post_send[i]);
 		}
 	}
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index ba8e0fe..810dcbb 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -39,7 +39,7 @@
 
 static int rcvbuf_scale = 0x10;
 
-int rcvbuf_initial_size = SDP_HEAD_SIZE;
+int rcvbuf_initial_size = 32 * 1024;
 module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
 MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
 
@@ -149,22 +149,28 @@ static void sdp_fin(struct sock *sk)
 	}
 }
 
-
-static void sdp_post_recv(struct sdp_sock *ssk)
+/* lock_sock must be taken before calling this - since rx_ring.head is not
+ * protected (although it is atomic)
+ */
+static int sdp_post_recv(struct sdp_sock *ssk)
 {
 	struct sdp_buf *rx_req;
 	int i, rc, frags;
 	u64 addr;
 	struct ib_device *dev;
-	struct ib_sge *sge;
+	struct ib_recv_wr rx_wr = { 0 };
+	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+	struct ib_sge *sge = ibsge;
 	struct ib_recv_wr *bad_wr;
 	struct sk_buff *skb;
 	struct page *page;
 	skb_frag_t *frag;
 	struct sdp_bsdh *h;
-	int id = ssk->rx_head;
+	int id = ring_head(ssk->rx_ring);
 	gfp_t gfp_page;
+	int ret = 0;
 
+	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
 	/* Now, allocate and repost recv */
 	/* TODO: allocate from cache */
 
@@ -194,10 +200,9 @@ static void sdp_post_recv(struct sdp_sock *ssk)
 		skb->truesize += frag->size;
 	}
 
-        rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
+        rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
 	rx_req->skb = skb;
 	dev = ssk->ib_device;
-	sge = ssk->ibsge;
 	addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
 	BUG_ON(ib_dma_mapping_error(dev, addr));
 
@@ -221,27 +226,32 @@ static void sdp_post_recv(struct sdp_sock *ssk)
 		sge->lkey = ssk->mr->lkey;
 	}
 
-	ssk->rx_wr.next = NULL;
-	ssk->rx_wr.wr_id = id | SDP_OP_RECV;
-	ssk->rx_wr.sg_list = ssk->ibsge;
-	ssk->rx_wr.num_sge = frags + 1;
-	rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
-	sdp_prf(&ssk->isk.sk, skb, "rx skb was posted");
+	rx_wr.next = NULL;
+	rx_wr.wr_id = id | SDP_OP_RECV;
+	rx_wr.sg_list = ibsge;
+	rx_wr.num_sge = frags + 1;
+	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
 	SDPSTATS_COUNTER_INC(post_recv);
-	++ssk->rx_head;
+	atomic_inc(&ssk->rx_ring.head);
 	if (unlikely(rc)) {
-		sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
+		sdp_warn(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
 		sdp_reset(&ssk->isk.sk);
+		ret = -1;
 	}
 
 	atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+	return ret;
 }
 
-void sdp_post_recvs(struct sdp_sock *ssk)
+/* lock_sock must be taken before calling this */
+static void _sdp_post_recvs(struct sdp_sock *ssk)
 {
 	struct sock *sk = &ssk->isk.sk;
 	int scale = ssk->rcvbuf_scale;
 
+	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
 	if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
 		(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
 		return;
@@ -251,12 +261,23 @@ void sdp_post_recvs(struct sdp_sock *ssk)
 	    (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
 		scale = 1;
 
-	while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
-		(ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) *
+	while ((likely(ring_posted(ssk->rx_ring) < SDP_RX_SIZE) &&
+		(ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
 		(SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
 		ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
-	       unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS))
-		sdp_post_recv(ssk);
+	       unlikely(ring_posted(ssk->rx_ring) < SDP_MIN_TX_CREDITS)) {
+		if (sdp_post_recv(ssk))
+			break;
+	}
+}
+
+void sdp_post_recvs(struct sdp_sock *ssk)
+{
+	unsigned long flags;
+
+	rx_ring_lock(ssk, flags);
+	_sdp_post_recvs(ssk);
+	rx_ring_unlock(ssk, flags);
 }
 
 static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -328,9 +349,9 @@ int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
 static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
 {
 	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
-		ssk->recv_request_head = ssk->rx_head + 1;
+		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
 	else
-		ssk->recv_request_head = ssk->rx_tail;
+		ssk->recv_request_head = ring_tail(ssk->rx_ring);
 	ssk->recv_request = 1;
 }
 
@@ -347,23 +368,17 @@ static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *bu
 		ssk->sent_request = 0;
 }
 
-static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed)
+static inline int credit_update_needed(struct sdp_sock *ssk)
 {
 	int c;
 
-	c = ssk->remote_credits;
+	c = remote_credits(ssk);
 	if (likely(c > SDP_MIN_TX_CREDITS))
 		c += c/2;
 
-/*	sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d "
-			"tx ring slots left: %d send_head: %p\n",
-		ssk->tx_ring.credits, ssk->remote_credits,
-		sdp_tx_ring_slots_left(&ssk->tx_ring),
-		ssk->isk.sk.sk_send_head);
-*/
-	return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) &&
-	    likely(ssk->tx_ring.credits > 1) &&
-	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring)));
+	return unlikely(c < ring_posted(ssk->rx_ring)) &&
+	    likely(tx_credits(ssk) > 1) &&
+	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
 }
 
 
@@ -374,14 +389,16 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
 	struct sk_buff *skb;
 	int i, frags;
 
-	if (unlikely(id != ssk->rx_tail)) {
+	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+	if (unlikely(id != ring_tail(ssk->rx_ring))) {
 		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
-			id, ssk->rx_tail);
+			id, ring_tail(ssk->rx_ring));
 		return NULL;
 	}
 
 	dev = ssk->ib_device;
-        rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
+        rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
 	skb = rx_req->skb;
 	ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
 			    DMA_FROM_DEVICE);
@@ -390,67 +407,20 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
 		ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
 				  skb_shinfo(skb)->frags[i].size,
 				  DMA_FROM_DEVICE);
-	++ssk->rx_tail;
-	--ssk->remote_credits;
+	atomic_inc(&ssk->rx_ring.tail);
+	atomic_dec(&ssk->remote_credits);
 	return skb;
 }
 
-static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+/* this must be called while sock_lock is taken */
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 {
 	struct sock *sk = &ssk->isk.sk;
 	int frags;
-	struct sk_buff *skb;
 	struct sdp_bsdh *h;
 	int pagesz, i;
 
-	skb = sdp_recv_completion(ssk, wc->wr_id);
-	if (unlikely(!skb))
-		return -1;
-
-	sdp_prf(sk, skb, "recv completion");	
-
-	atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-
-	if (unlikely(wc->status)) {
-		if (wc->status != IB_WC_WR_FLUSH_ERR) {
-			sdp_dbg(sk, "Recv completion with error. Status %d\n",
-				wc->status);
-			sdp_reset(sk);
-		}
-		__kfree_skb(skb);
-		return 0;
-	}
-
-	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
-			(int)wc->wr_id, wc->byte_len);
-	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-		printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
-				wc->byte_len, sizeof(struct sdp_bsdh));
-		__kfree_skb(skb);
-		return -1;
-	}
-	skb->len = wc->byte_len;
-	if (likely(wc->byte_len > SDP_HEAD_SIZE))
-		skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
-	else
-		skb->data_len = 0;
-	skb->data = skb->head;
-#ifdef NET_SKBUFF_DATA_USES_OFFSET
-	skb->tail = skb_headlen(skb);
-#else
-	skb->tail = skb->head + skb_headlen(skb);
-#endif
 	h = (struct sdp_bsdh *)skb->data;
-	SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
-	skb_reset_transport_header(skb);
-	ssk->mseq_ack = ntohl(h->mseq);
-	if (ssk->mseq_ack != (int)wc->wr_id)
-		printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
-				ssk->mseq_ack, (int)wc->wr_id);
-
-	SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits);
-	ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 +
-		ntohs(h->bufs);
 
 	frags = skb_shinfo(skb)->nr_frags;
 	pagesz = PAGE_ALIGN(skb->data_len);
@@ -513,13 +483,105 @@ static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 	return 0;
 }
 
-int sdp_poll_rx_cq(struct sdp_sock *ssk)
+/* called only from irq */
+static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 {
-	struct ib_cq *cq = ssk->rx_cq;
+	struct sk_buff *skb;
+	struct sdp_bsdh *h;
+	struct sock *sk = &ssk->isk.sk;
+	int credits_before;
+	
+	skb = sdp_recv_completion(ssk, wc->wr_id);
+	if (unlikely(!skb))
+		return NULL;
+
+	atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+	if (unlikely(wc->status)) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR) {
+			sdp_warn(sk, "Recv completion with error. Status %d\n",
+				wc->status);
+			sdp_reset(sk);
+		}
+		__kfree_skb(skb);
+		return NULL;
+	}
+
+	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+			(int)wc->wr_id, wc->byte_len);
+	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+		printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+				wc->byte_len, sizeof(struct sdp_bsdh));
+		__kfree_skb(skb);
+		return NULL;
+	}
+	skb->len = wc->byte_len;
+	if (likely(wc->byte_len > SDP_HEAD_SIZE))
+		skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+	else
+		skb->data_len = 0;
+	skb->data = skb->head;
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+	skb->tail = skb_headlen(skb);
+#else
+	skb->tail = skb->head + skb_headlen(skb);
+#endif
+	h = (struct sdp_bsdh *)skb->data;
+	SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
+	skb_reset_transport_header(skb);
+	atomic_set(&ssk->mseq_ack, ntohl(h->mseq));
+	if (mseq_ack(ssk) != (int)wc->wr_id)
+		printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
+				mseq_ack(ssk), (int)wc->wr_id);
+
+	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+	credits_before = tx_credits(ssk);
+	atomic_set(&ssk->tx_ring.credits, ntohl(h->mseq_ack) - ring_head(ssk->tx_ring) + 1 +
+		ntohs(h->bufs));
+
+	sdp_prf(&ssk->isk.sk, skb, "RX %s bufs=%d c before:%d after:%d "
+		"mseq:%d, ack:%d", mid2str(h->mid), ntohs(h->bufs), credits_before, 
+		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+	return skb;
+}
+
+/* like sk_stream_write_space - except measures remote credits */
+static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	struct socket *sock = sk->sk_socket;
+
+	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+		sdp_prf(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
+				tx_credits(ssk), ssk->min_bufs,
+				ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
+	}
+
+	if (tx_credits(ssk) >= ssk->min_bufs &&
+	    ring_head(ssk->tx_ring) == ring_tail(ssk->tx_ring) &&
+	   sock != NULL) {
+		clear_bit(SOCK_NOSPACE, &sock->flags);
+
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+			wake_up_interruptible(sk->sk_sleep);
+		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+			sock_wake_async(sock, 2, POLL_OUT);
+	}
+}
+
+/* only from interrupt.
+ * drain rx cq into rx_backlog queue */
+static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+{
+	struct ib_cq *cq = ssk->rx_ring.cq;
 	struct ib_wc ibwc[SDP_NUM_WC];
 	int n, i;
-	int ret = -EAGAIN;
 	int wc_processed = 0;
+	struct sk_buff *skb;
+
+	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
 
 	do {
 		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
@@ -527,102 +589,224 @@ int sdp_poll_rx_cq(struct sdp_sock *ssk)
 			struct ib_wc *wc = &ibwc[i];
 
 			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
-			sdp_process_rx_wc(ssk, wc);
+			skb = sdp_process_rx_wc(ssk, wc);
+			if (!skb)
+				continue;
+			skb_queue_tail(&ssk->rx_backlog, skb);
 			wc_processed++;
-
-			if (credit_update_needed(ssk, wc_processed)) {
-				sdp_prf(&ssk->isk.sk, NULL, "credit update. remote_credits: %d, avail now: %d processed: %d",
-						ssk->remote_credits,
-						ssk->rx_head - ssk->rx_tail,
-						wc_processed);
-				sdp_post_recvs(ssk);
-				if (sdp_post_credits(ssk))
-					wc_processed = 0;
-			}
-
-			ret = 0;
 		}
 	} while (n == SDP_NUM_WC);
 
-	if (!ret) {
-		struct sock *sk = &ssk->isk.sk;
+	if (wc_processed)
+		sdp_bzcopy_write_space(ssk);
 
-		sdp_post_recvs(ssk);
+	return wc_processed;
+}
 
-		/* update credits */
-		sdp_post_sends(ssk, 0);
+int sdp_process_rx_q(struct sdp_sock *ssk)
+{
+	struct sk_buff *skb;
+	struct sock *sk = &ssk->isk.sk;
+	unsigned long flags;
 
-		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-			sk_stream_write_space(&ssk->isk.sk);
-	} else {
+	if (!ssk->rx_backlog.next || !ssk->rx_backlog.prev) {
+		sdp_warn(&ssk->isk.sk, "polling a zeroed rx_backlog!!!! %p\n", &ssk->rx_backlog);
+		return 0;
+	}
+
+	if (skb_queue_empty(&ssk->rx_backlog)) {
 		SDPSTATS_COUNTER_INC(rx_poll_miss);
+		return -EAGAIN;
 	}
 
-	return ret;
+	/* update credits */
+	sdp_post_sends(ssk, 0);
+
+	spin_lock_irqsave(&ssk->rx_backlog.lock, flags);
+	while ((skb = __skb_dequeue(&ssk->rx_backlog))) {
+		sdp_process_rx_skb(ssk, skb);
+	}
+	spin_unlock_irqrestore(&ssk->rx_backlog.lock, flags);
+
+	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+		sk_stream_write_space(&ssk->isk.sk);
+
+	return 0;
 }
 
-void sdp_rx_comp_work(struct work_struct *work)
+static void sdp_rx_comp_work(struct work_struct *work)
 {
 	struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
 	struct sock *sk = &ssk->isk.sk;
 	struct ib_cq *rx_cq;
 
 	lock_sock(sk);
-	rx_cq = ssk->rx_cq;
+	rx_cq = ssk->rx_ring.cq;
 	if (unlikely(!rx_cq))
 		goto out;
 
 	if (unlikely(!ssk->poll_cq)) {
 		struct rdma_cm_id *id = ssk->id;
+		sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
 		if (id && id->qp)
 			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
 		goto out;
 	}
 
-	sdp_poll_rx_cq(ssk);
+	sdp_process_rx_q(ssk);
 	sdp_xmit_poll(ssk,  1); /* if has pending tx because run out of tx_credits - xmit it */
 	release_sock(sk);
 	sk_stream_mem_reclaim(sk);
 	lock_sock(sk);
-	rx_cq = ssk->rx_cq;
+	rx_cq = ssk->rx_ring.cq;
 	if (unlikely(!rx_cq))
 		goto out;
 	
-	sdp_arm_rx_cq(sk);
-	sdp_poll_rx_cq(ssk);
+	sdp_process_rx_q(ssk);
 	sdp_xmit_poll(ssk,  1);
+
 out:
 	release_sock(sk);
 }
 
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 {
 	struct sock *sk = cq_context;
 	struct sdp_sock *ssk = sdp_sk(sk);
+	unsigned long flags;
+	int wc_processed = 0;
 
-	WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+	sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
 
-	if (!ssk->rx_cq)
-		sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+	WARN_ON(cq != ssk->rx_ring.cq);
 
 	SDPSTATS_COUNTER_INC(rx_int_count);
 
-	sdp_prf(sk, NULL, "rx completion");
+	sdp_prf(sk, NULL, "rx irq");
+
+	rx_ring_lock(ssk, flags);
+
+	if (unlikely(!ssk->poll_cq))
+		sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+
+	if (!ssk->rx_ring.cq) {
+		sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+		goto out;
+	}
+
+	wc_processed = sdp_poll_rx_cq(ssk);
+	sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
+
+	if (wc_processed) {
+		_sdp_post_recvs(ssk);
+
+		/* Best would be to send credit update from here */
+/*		sdp_post_credits(ssk); */
 
-	/* issue sdp_rx_comp_work() */
-	queue_work(rx_comp_wq, &ssk->rx_comp_work);
+		/* issue sdp_rx_comp_work() */
+		queue_work(rx_comp_wq, &ssk->rx_comp_work);
+	}
+
+	sdp_arm_rx_cq(sk);
+
+out:
+	rx_ring_unlock(ssk, flags);
 }
 
-void sdp_rx_ring_purge(struct sdp_sock *ssk)
+static void sdp_rx_ring_purge(struct sdp_sock *ssk)
 {
-	struct sk_buff *skb;
+	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
 
-	while (ssk->rx_head != ssk->rx_tail) {
+	while (ring_posted(ssk->rx_ring) > 0) {
 		struct sk_buff *skb;
-		skb = sdp_recv_completion(ssk, ssk->rx_tail);
+		skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
 		if (!skb)
 			break;
 		atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
 		__kfree_skb(skb);
 	}
 }
+
+void sdp_rx_ring_init(struct sdp_sock *ssk)
+{
+	ssk->rx_ring.buffer = NULL;
+	spin_lock_init(&ssk->rx_ring.lock);
+}
+
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+	struct ib_cq *rx_cq;
+	int rc = 0;
+	unsigned long flags;
+
+	rx_ring_lock(ssk, flags);
+
+	atomic_set(&ssk->rx_ring.head, 1);
+	atomic_set(&ssk->rx_ring.tail, 1);
+
+	ssk->rx_ring.buffer = kmalloc(sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE,
+				      GFP_KERNEL);
+	if (!ssk->rx_ring.buffer) {
+		rc = -ENOMEM;
+		sdp_warn(&ssk->isk.sk, "Unable to allocate RX Ring size %zd.\n",
+			 sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
+
+		goto out;
+	}
+
+	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
+			  &ssk->isk.sk, SDP_RX_SIZE, 0);
+
+	if (IS_ERR(rx_cq)) {
+		rc = PTR_ERR(rx_cq);
+		sdp_warn(&ssk->isk.sk, "Unable to allocate RX CQ: %d.\n", rc);
+		goto err_cq;
+	}
+
+	rc = ib_modify_cq(rx_cq, 10, 200);
+	if (rc) {
+		sdp_warn(&ssk->isk.sk, "Unable to modify RX CQ: %d.\n", rc);
+		goto err_mod;
+	}
+	sdp_warn(&ssk->isk.sk, "Initialized CQ moderation\n");
+	sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
+
+	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
+
+	sdp_arm_rx_cq(&ssk->isk.sk);
+
+	goto out;
+
+err_mod:
+	ib_destroy_cq(rx_cq);
+err_cq:
+	kfree(ssk->rx_ring.buffer);
+	ssk->rx_ring.buffer = NULL;
+out:
+	rx_ring_unlock(ssk, flags);
+	return rc;
+}
+
+void sdp_rx_ring_destroy(struct sdp_sock *ssk)
+{
+	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+	if (ssk->rx_ring.buffer) {
+		sdp_rx_ring_purge(ssk);
+
+		kfree(ssk->rx_ring.buffer);
+		ssk->rx_ring.buffer = NULL;
+	}
+
+	if (ssk->rx_ring.cq) {
+		ib_destroy_cq(ssk->rx_ring.cq);
+		ssk->rx_ring.cq = NULL;
+	}
+
+	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
+}
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index 5e6a2dc..de9d792 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -55,8 +55,11 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
 		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
 
 	/* Poll the CQ every SDP_TX_POLL_MODER packets */
-	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) {
 		wc_processed = sdp_process_tx_cq(ssk);
+		sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed,
+				ring_posted(ssk->tx_ring));
+	}
 
 	return wc_processed;	
 }
@@ -65,24 +68,16 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 {
 	struct sdp_buf *tx_req;
 	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
-	unsigned mseq = ssk->tx_ring.head;
+	unsigned mseq = ring_head(ssk->tx_ring);
 	int i, rc, frags;
 	u64 addr;
 	struct ib_device *dev;
-	struct ib_sge *sge;
 	struct ib_send_wr *bad_wr;
 
-#define ENUM2STR(e) [e] = #e
-	static char *mid2str[] = {
-		ENUM2STR(SDP_MID_HELLO),
-		ENUM2STR(SDP_MID_HELLO_ACK),
-		ENUM2STR(SDP_MID_DISCONN),
-		ENUM2STR(SDP_MID_CHRCVBUF),
-		ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-		ENUM2STR(SDP_MID_DATA),
-	};
-	sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d",
-			mid2str[mid], ssk->rx_head - ssk->rx_tail);
+	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+	struct ib_sge *sge = ibsge;
+	struct ib_send_wr tx_wr = { 0 };
+
 
 	SDPSTATS_COUNTER_MID_INC(post_send, mid);
 	SDPSTATS_HIST(send_size, skb->len);
@@ -93,16 +88,19 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 	else
 		h->flags = 0;
 
-	h->bufs = htons(ssk->rx_head - ssk->rx_tail);
+	h->bufs = htons(ring_posted(ssk->rx_ring));
 	h->len = htonl(skb->len);
 	h->mseq = htonl(mseq);
-	h->mseq_ack = htonl(ssk->mseq_ack);
+	h->mseq_ack = htonl(mseq_ack(ssk));
+
+	sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%d ack:%d",
+			mid2str(mid), ring_posted(ssk->rx_ring), mseq, ntohl(h->mseq_ack));
 
 	SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
+
 	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
 	tx_req->skb = skb;
 	dev = ssk->ib_device;
-	sge = ssk->ibsge;
 	addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
 				 DMA_TO_DEVICE);
 	tx_req->mapping[0] = addr;
@@ -127,14 +125,14 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 		sge->lkey = ssk->mr->lkey;
 	}
 
-	ssk->tx_wr.next = NULL;
-	ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND;
-	ssk->tx_wr.sg_list = ssk->ibsge;
-	ssk->tx_wr.num_sge = frags + 1;
-	ssk->tx_wr.opcode = IB_WR_SEND;
-	ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
+	tx_wr.next = NULL;
+	tx_wr.wr_id = ring_head(ssk->tx_ring) | SDP_OP_SEND;
+	tx_wr.sg_list = ibsge;
+	tx_wr.num_sge = frags + 1;
+	tx_wr.opcode = IB_WR_SEND;
+	tx_wr.send_flags = IB_SEND_SIGNALED;
 	if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG))
-		ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
+		tx_wr.send_flags |= IB_SEND_SOLICITED;
 	
 	{
 		static unsigned long last_send = 0;
@@ -145,19 +143,15 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 
 		last_send = jiffies;
 	}
-	rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
-	++ssk->tx_ring.head;
-	--ssk->tx_ring.credits;
-	ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
+	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
+	atomic_inc(&ssk->tx_ring.head);
+	atomic_dec(&ssk->tx_ring.credits);
+	atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
 	if (unlikely(rc)) {
 		sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
 		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
 		wake_up(&ssk->wq);
 	}
-
-	if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) {
-		sdp_poll_rx_cq(ssk);
-	}
 }
 
 static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
@@ -168,9 +162,9 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
 	struct bzcopy_state *bz;
 	int i, frags;
 	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
-	if (unlikely(mseq != tx_ring->tail)) {
+	if (unlikely(mseq != ring_tail(*tx_ring))) {
 		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
-			mseq, tx_ring->tail);
+			mseq, ring_tail(*tx_ring));
 		goto out;
 	}
 
@@ -193,7 +187,7 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
 	if (bz)
 		bz->busy--;
 
-	++tx_ring->tail;
+	atomic_inc(&tx_ring->tail);
 
 out:
 	return skb;
@@ -219,7 +213,11 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 		}
 	}
 
-	sdp_prf(&ssk->isk.sk, skb, "tx completion");
+	{
+		struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+		sdp_prf(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+	}
+
 	sk_stream_free_skb(&ssk->isk.sk, skb);
 
 	return 0;
@@ -281,15 +279,14 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
 	return wc_processed;	
 }
 
-void sdp_poll_tx_cq(unsigned long data)
+static void sdp_poll_tx_timeout(unsigned long data)
 {
 	struct sdp_sock *ssk = (struct sdp_sock *)data;
 	struct sock *sk = &ssk->isk.sk;
 	u32 inflight, wc_processed;
 
-
 	sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
-		(u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+		(u32) ring_posted(ssk->tx_ring));
 
 	/* Only process if the socket is not in use */
 	bh_lock_sock(sk);
@@ -303,12 +300,14 @@ void sdp_poll_tx_cq(unsigned long data)
 		goto out;
 
 	wc_processed = sdp_process_tx_cq(ssk);
+	sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed,
+		ring_posted(ssk->tx_ring));
 	if (!wc_processed)
 		SDPSTATS_COUNTER_INC(tx_poll_miss);
 	else
 		SDPSTATS_COUNTER_INC(tx_poll_hit);
 
-	inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+	inflight = (u32) ring_posted(ssk->tx_ring);
 
 	/* If there are still packets in flight and the timer has not already
 	 * been scheduled by the Tx routine then schedule it here to guarantee
@@ -322,17 +321,7 @@ out:
 	bh_unlock_sock(sk);
 }
 
-void _sdp_poll_tx_cq(unsigned long data)
-{
-	struct sdp_sock *ssk = (struct sdp_sock *)data;
-	struct sock *sk = &ssk->isk.sk;
-
-	sdp_prf(sk, NULL, "sdp poll tx timeout expired");
-
-	sdp_poll_tx_cq(data);
-}
-
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
+static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 {
 	struct sock *sk = cq_context;
 	struct sdp_sock *ssk = sdp_sk(sk);
@@ -344,11 +333,9 @@ void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 
 void sdp_tx_ring_purge(struct sdp_sock *ssk)
 {
-	struct sk_buff *skb;
-
-	while (ssk->tx_ring.head != ssk->tx_ring.tail) {
+	while (ring_posted(ssk->tx_ring)) {
 		struct sk_buff *skb;
-		skb = sdp_send_completion(ssk, ssk->tx_ring.tail);
+		skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
 		if (!skb)
 			break;
 		__kfree_skb(skb);
@@ -380,3 +367,66 @@ void sdp_post_keepalive(struct sdp_sock *ssk)
 	sdp_cnt(sdp_keepalive_probes_sent);
 }
 
+static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+	struct ib_cq *tx_cq;
+	int rc = 0;
+
+	atomic_set(&ssk->tx_ring.head, 1);
+	atomic_set(&ssk->tx_ring.tail, 1);
+
+	ssk->tx_ring.buffer = kmalloc(sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE,
+				      GFP_KERNEL);
+	if (!ssk->tx_ring.buffer) {
+		rc = -ENOMEM;
+		sdp_warn(&ssk->isk.sk, "Unable to allocate TX Ring size %zd.\n",
+			 sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
+
+		goto out;
+	}
+
+	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
+			  &ssk->isk.sk, SDP_TX_SIZE, 0);
+
+	if (IS_ERR(tx_cq)) {
+		rc = PTR_ERR(tx_cq);
+		sdp_warn(&ssk->isk.sk, "Unable to allocate TX CQ: %d.\n", rc);
+		goto err_cq;
+	}
+
+	sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
+
+	init_timer(&ssk->tx_ring.timer);
+	ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
+	ssk->tx_ring.timer.data = (unsigned long) ssk;
+	ssk->tx_ring.poll_cnt = 0;
+
+	return 0;
+
+err_cq:
+	kfree(ssk->tx_ring.buffer);
+	ssk->tx_ring.buffer = NULL;
+out:
+	return rc;
+}
+
+void sdp_tx_ring_destroy(struct sdp_sock *ssk)
+{
+	if (ssk->tx_ring.buffer) {
+		sdp_tx_ring_purge(ssk);
+
+		kfree(ssk->tx_ring.buffer);
+		ssk->tx_ring.buffer = NULL;
+	}
+
+	if (ssk->tx_ring.cq) {
+		ib_destroy_cq(ssk->tx_ring.cq);
+		ssk->tx_ring.cq = NULL;
+	}
+
+	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
+}
-- 
1.5.3.7



