[ofa-general] [PATCH] sdp: no tx interrupts

Amir Vadai amirv at mellanox.co.il
Mon Jun 22 01:18:43 PDT 2009


Poll the TX CQ from a timer instead of relying on completion interrupts.
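
The send path polls the TX CQ only once every SDP_TX_POLL_MODER (16) posted
sends, and a per-socket timer (SDP_TX_POLL_TIMEOUT = HZ/4) catches completions
when the ring goes idle. Below is a minimal user-space sketch of that
moderation policy only; the timer arming and CQ poll are stand-in stubs, not
the verbs/kernel APIs used in the patch itself.

/* Sketch of the TX polling policy: drain the TX CQ every
 * SDP_TX_POLL_MODER posts (or when forced), and arm a fallback
 * timer so an idle ring still gets its completions reaped.
 */
#include <stdio.h>

#define SDP_TX_POLL_MODER 16            /* must be a power of 2 */

struct tx_ring { unsigned poll_cnt; int timer_pending; };

static int poll_tx_cq(struct tx_ring *ring)
{
	/* stand-in for ib_poll_cq() draining send completions */
	return 1;
}

static int xmit_poll(struct tx_ring *ring, int force)
{
	if (!ring->timer_pending)
		ring->timer_pending = 1;   /* arm the HZ/4 fallback timer */

	/* Poll the CQ only once every SDP_TX_POLL_MODER posted sends */
	if (force || (++ring->poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
		return poll_tx_cq(ring);

	return 0;
}

int main(void)
{
	struct tx_ring ring = { 0, 0 };
	int i, polls = 0;

	for (i = 0; i < 64; i++)
		polls += xmit_poll(&ring, 0);

	/* 64 posted sends -> 4 CQ polls, one per 16 posts */
	printf("CQ polled %d times for 64 posts\n", polls);
	return 0;
}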

Signed-off-by: Amir Vadai <amirv at mellanox.co.il>
---
 drivers/infiniband/ulp/sdp/sdp.h       |   34 ++++--
 drivers/infiniband/ulp/sdp/sdp_bcopy.c |  222 ++++++++++++++++++++++++-------
 drivers/infiniband/ulp/sdp/sdp_cma.c   |   72 +++++++----
 drivers/infiniband/ulp/sdp/sdp_main.c  |   83 ++++++++-----
 drivers/infiniband/ulp/sdp/sdp_proc.c  |    4 +-
 5 files changed, 297 insertions(+), 118 deletions(-)

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index cbef0eb..8128065 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -83,12 +83,14 @@ struct sdpstats {
 	u32 sendmsg_seglen[25];
 	u32 send_size[25];
 	u32 post_recv;
-	u32 int_count;
+	u32 rx_int_count;
+	u32 tx_int_count;
 	u32 bzcopy_poll_miss;
 	u32 send_wait_for_mem;
 	u32 send_miss_no_credits;
 	u32 rx_poll_miss;
 	u32 tx_poll_miss;
+	u32 tx_poll_hit;
 	u32 memcpy_count;
 	u32 credits_before_update[64];
 	u32 send_interval[25];
@@ -132,6 +134,11 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define sock_put(sk, msg)  sock_ref(sk, msg, sock_put)
 #define __sock_put(sk, msg)  sock_ref(sk, msg, __sock_put)
 
+/* Interval between successive polls in the Tx routine when polling is used
+   instead of interrupts (in per-core Tx rings) - must be a power of 2 */
+#define SDP_TX_POLL_MODER	16
+#define SDP_TX_POLL_TIMEOUT	(HZ / 4)
+
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
@@ -242,7 +249,10 @@ struct sdp_tx_ring {
 
 	int 		  una_seq;
 	unsigned 	  credits;
+
+	struct timer_list timer;
 	u16 		  poll_cnt;
+	struct ib_cq 	 *cq;
 };
 
 static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
@@ -258,7 +268,7 @@ struct sdp_sock {
 	struct list_head backlog_queue;
 	struct sock *parent;
 
-	struct work_struct work;
+	struct work_struct rx_comp_work;
 	wait_queue_head_t wq;
 
 	struct delayed_work dreq_wait_work;
@@ -308,7 +318,7 @@ struct sdp_sock {
 
 	/* rdma specific */
 	struct ib_qp *qp;
-	struct ib_cq *cq;
+	struct ib_cq *rx_cq;
 	struct ib_mr *mr;
 
 	struct sdp_buf *rx_ring;
@@ -349,7 +359,7 @@ struct bzcopy_state {
 extern int rcvbuf_initial_size;
 
 extern struct proto sdp_proto;
-extern struct workqueue_struct *comp_wq;
+extern struct workqueue_struct *rx_comp_wq;
 
 extern atomic_t sdp_current_mem_usage;
 extern spinlock_t sdp_large_sockets_lock;
@@ -440,11 +450,11 @@ static inline void sdp_set_error(struct sock *sk, int err)
 	sk->sk_error_report(sk);
 }
 
-static inline void sdp_arm_cq(struct sock *sk)
+static inline void sdp_arm_rx_cq(struct sock *sk)
 {
-	sdp_dbg_data(sk, "ib_req_notify_cq on cq\n");
+	sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n");
 	
-	ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
+	ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP);
 }
 
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
@@ -453,12 +463,15 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s
 int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 void sdp_reset(struct sock *sk);
 void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context);
-void sdp_work(struct work_struct *work);
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
+void sdp_poll_tx_cq(unsigned long data);
+void sdp_rx_comp_work(struct work_struct *work);
+void sdp_process_tx_wc_work(struct work_struct *work);
 int sdp_post_credits(struct sdp_sock *ssk);
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
 void sdp_post_recvs(struct sdp_sock *ssk);
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq);
+int sdp_poll_rx_cq(struct sdp_sock *ssk);
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
 void sdp_destroy_work(struct work_struct *work);
 void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
@@ -477,6 +490,7 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 int sdp_init_sock(struct sock *sk);
 int __init sdp_proc_init(void);
 void sdp_proc_unregister(void);
+int sdp_xmit_poll(struct sdp_sock *ssk, int force);
 
 static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
 {
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 9f7f4a0..29c9761 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -244,6 +244,24 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s
 }
 #endif
 
+static int sdp_process_tx_cq(struct sdp_sock *ssk);
+
+int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+{
+	int wc_processed = 0;
+
+	/* If we don't have a pending timer, set one up to catch our recent
+	   post in case the interface becomes idle */
+	if (!timer_pending(&ssk->tx_ring.timer))
+		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+
+	/* Poll the CQ every SDP_TX_POLL_MODER packets */
+	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+		wc_processed = sdp_process_tx_cq(ssk);
+
+	return wc_processed;
+}
+
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 {
 	struct sdp_buf *tx_req;
@@ -573,6 +591,7 @@ int sdp_post_credits(struct sdp_sock *ssk)
 		if (!skb)
 			return -ENOMEM;
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
+		sdp_xmit_poll(ssk, 0);
 	}
 	return 0;
 }
@@ -583,6 +602,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 	struct sk_buff *skb;
 	int c;
 	gfp_t gfp_page;
+	int post_count = 0;
 
 	if (unlikely(!ssk->id)) {
 		if (ssk->isk.sk.sk_send_head) {
@@ -602,7 +622,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 	if (ssk->recv_request &&
 	    ssk->rx_tail >= ssk->recv_request_head &&
 	    ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS &&
-	    ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) {
+	    sdp_tx_ring_slots_left(&ssk->tx_ring)) {
 		struct sdp_chrecvbuf *resp_size;
 		ssk->recv_request = 0;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -614,6 +634,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
 		resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
 		sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+		post_count++;
 	}
 
 	if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS &&
@@ -634,12 +655,13 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		update_send_head(&ssk->isk.sk, skb);
 		__skb_dequeue(&ssk->isk.sk.sk_write_queue);
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
+		post_count++;
 	}
 
 	if (ssk->tx_ring.credits == SDP_MIN_TX_CREDITS &&
 	    !ssk->sent_request &&
 	    ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
-	    ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) {
+	    sdp_tx_ring_slots_left(&ssk->tx_ring)) {
 		struct sdp_chrecvbuf *req_size;
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
 					  sizeof(struct sdp_bsdh) +
@@ -652,6 +674,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
 		req_size->size = htonl(ssk->sent_request);
 		sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
+		post_count++;
 	}
 
 	c = ssk->remote_credits;
@@ -660,7 +683,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 
 	if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
 	    likely(ssk->tx_ring.credits > 1) &&
-	    likely(ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) &&
+	    likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
 	    likely((1 << ssk->isk.sk.sk_state) &
 		    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
 		skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -670,6 +693,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		BUG_ON(!skb);
 		SDPSTATS_COUNTER_INC(post_send_credits);
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
+		post_count++;
 	}
 
 	/* send DisConn if needed
@@ -687,7 +711,10 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		/* FIXME */
 		BUG_ON(!skb);
 		sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+		post_count++;
 	}
+	if (post_count)
+		sdp_xmit_poll(ssk, 0);
 }
 
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
@@ -867,7 +894,6 @@ static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 {
 	struct sk_buff *skb = NULL;
-	struct sdp_bsdh *h;
 
 	skb = sdp_send_completion(ssk, wc->wr_id);
 	if (unlikely(!skb))
@@ -881,72 +907,162 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 			sdp_set_error(sk, -ECONNRESET);
 			wake_up(&ssk->wq);
 
-			queue_work(comp_wq, &ssk->destroy_work);
+			queue_work(rx_comp_wq, &ssk->destroy_work);
 		}
-		goto out;
 	}
 
-	h = (struct sdp_bsdh *)skb->data;
+	sk_stream_free_skb(&ssk->isk.sk, skb);
 
-	if (likely(h->mid != SDP_MID_DISCONN))
-		goto out;
+	return 0;
+}
+
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+{
+	struct sock *sk = cq_context;
+	struct sdp_sock *ssk = sdp_sk(sk);
+
+	WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+
+	if (!ssk->rx_cq)
+		sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+	SDPSTATS_COUNTER_INC(rx_int_count);
+
+	queue_work(rx_comp_wq, &ssk->rx_comp_work);
+}
 
-	if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) {
-		sdp_dbg(&ssk->isk.sk,
-			"%s: sent DISCONNECT from unexpected state %d\n",
-			__func__, ssk->isk.sk.sk_state);
+static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+	if (likely(wc->wr_id & SDP_OP_SEND)) {
+		sdp_handle_send_comp(ssk, wc);
+		return;
 	}
 
-out:
 	sk_wmem_free_skb(&ssk->isk.sk, skb);
 
-	return 0;
+	/* Keepalive probe sent cleanup */
+	sdp_cnt(sdp_keepalive_probes_sent);
+
+	if (likely(!wc->status))
+		return;
+
+	sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
+			__func__, wc->status);
+
+	if (wc->status == IB_WC_WR_FLUSH_ERR)
+		return;
+
+	sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+	wake_up(&ssk->wq);
 }
 
-static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+static int sdp_process_tx_cq(struct sdp_sock *ssk)
 {
-	if (wc->wr_id & SDP_OP_RECV) {
-		if (sdp_handle_recv_comp(ssk, wc))
-			return;
-	} else if (likely(wc->wr_id & SDP_OP_SEND)) {
-		if (sdp_handle_send_comp(ssk, wc))
-			return;
-	} else {
-		sdp_cnt(sdp_keepalive_probes_sent);
+	struct ib_wc ibwc[SDP_NUM_WC];
+	int n, i;
+	int wc_processed = 0;
 
-		if (likely(!wc->status))
-			return;
+	if (!ssk->tx_ring.cq) {
+		sdp_warn(&ssk->isk.sk, "WARNING: tx cq polled after being destroyed\n");
+		return 0;
+	}
+
+	do {
+		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
+		for (i = 0; i < n; ++i) {
+			sdp_process_tx_wc(ssk, ibwc + i);
+			wc_processed++;
+		}
+	} while (n == SDP_NUM_WC);
 
-		sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
-		        __func__, wc->status);
+	sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
 
-		if (wc->status == IB_WC_WR_FLUSH_ERR)
-			return;
+	if (wc_processed) {
+		struct sock *sk = &ssk->isk.sk;
+		sdp_post_sends(ssk, 0);
 
-		sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-		wake_up(&ssk->wq);
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+			sk_stream_write_space(&ssk->isk.sk);
 
-		return;
 	}
+
+	return wc_processed;
 }
 
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context)
+void sdp_poll_tx_cq(unsigned long data)
+{
+	struct sdp_sock *ssk = (struct sdp_sock *)data;
+	struct sock *sk = &ssk->isk.sk;
+	u32 inflight, wc_processed;
+
+	sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+		(u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+
+	/* Only process if the socket is not in use */
+	bh_lock_sock(sk);
+	if (sock_owned_by_user(sk)) {
+		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+		sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+		goto out;
+	}
+
+	if (sk->sk_state == TCP_CLOSE)
+		goto out;
+
+	wc_processed = sdp_process_tx_cq(ssk);
+	if (!wc_processed)
+		SDPSTATS_COUNTER_INC(tx_poll_miss);
+	else
+		SDPSTATS_COUNTER_INC(tx_poll_hit);
+
+	inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+
+	/* If there are still packets in flight and the timer has not already
+	 * been scheduled by the Tx routine then schedule it here to guarantee
+	 * completion processing of these packets */
+	if (inflight) { /* TODO: make sure socket is not closed */
+		sdp_dbg_data(sk, "arming timer for more polling\n");
+		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+	}
+
+out:
+	bh_unlock_sock(sk);
+}
+
+
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 {
 	struct sock *sk = cq_context;
 	struct sdp_sock *ssk = sdp_sk(sk);
-	schedule_work(&ssk->work);
-	SDPSTATS_COUNTER_INC(int_count);
+
+	sdp_warn(sk, "Got tx comp interrupt\n");
+
+	mod_timer(&ssk->tx_ring.timer, jiffies + 1);
 }
 
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
+
+int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
+	struct ib_cq *cq = ssk->rx_cq;
 	struct ib_wc ibwc[SDP_NUM_WC];
 	int n, i;
 	int ret = -EAGAIN;
+	int updated_credits = 0;
+
 	do {
 		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
 		for (i = 0; i < n; ++i) {
-			sdp_handle_wc(ssk, &ibwc[i]);
+			struct ib_wc *wc = &ibwc[i];
+
+			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
+			sdp_handle_recv_comp(ssk, wc);
+
+			if (!updated_credits) {
+				sdp_post_recvs(ssk);
+				sdp_post_sends(ssk, 0);
+				updated_credits = 1;
+			}
+
 			ret = 0;
 		}
 	} while (n == SDP_NUM_WC);
@@ -968,17 +1084,20 @@ int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
 	return ret;
 }
 
-void sdp_work(struct work_struct *work)
+static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
 {
-	struct sdp_sock *ssk = container_of(work, struct sdp_sock, work);
-	struct sock *sk = &ssk->isk.sk;
-	struct ib_cq *cq;
+	return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
+}
 
-	sdp_dbg_data(sk, "%s\n", __func__);
+void sdp_rx_comp_work(struct work_struct *work)
+{
+	struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
+	struct sock *sk = &ssk->isk.sk;
+	struct ib_cq *rx_cq;
 
 	lock_sock(sk);
-	cq = ssk->cq;
-	if (unlikely(!cq))
+	rx_cq = ssk->rx_cq;
+	if (unlikely(!rx_cq))
 		goto out;
 
 	if (unlikely(!ssk->poll_cq)) {
@@ -988,15 +1107,18 @@ void sdp_work(struct work_struct *work)
 		goto out;
 	}
 
-	sdp_poll_cq(ssk, cq);
+	sdp_poll_rx_cq(ssk);
+	sdp_xmit_poll(ssk, 1); /* transmit anything left pending because we ran out of tx credits */
 	release_sock(sk);
 	sk_mem_reclaim(sk);
 	lock_sock(sk);
-	cq = ssk->cq;
-	if (unlikely(!cq))
+	rx_cq = ssk->rx_cq;
+	if (unlikely(!rx_cq))
 		goto out;
-	sdp_arm_cq(sk);
-	sdp_poll_cq(ssk, cq);
+
+	sdp_arm_rx_cq(sk);
+	sdp_poll_rx_cq(ssk);
+	sdp_xmit_poll(ssk, 1);
 out:
 	release_sock(sk);
 }
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index 1e1ff9d..7c34637 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -73,7 +73,7 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         	.qp_type = IB_QPT_RC,
 	};
 	struct ib_device *device = id->device;
-	struct ib_cq *cq;
+	struct ib_cq *rx_cq, *tx_cq;
 	struct ib_mr *mr;
 	struct ib_pd *pd;
 	int rc;
@@ -118,32 +118,49 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         }
 
 	sdp_sk(sk)->mr = mr;
-	INIT_WORK(&sdp_sk(sk)->work, sdp_work);
+	INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work);
 
-	cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler,
-			  sk, SDP_TX_SIZE + SDP_RX_SIZE, 0);
+	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
+			  sk, SDP_RX_SIZE, 0);
 
-	if (IS_ERR(cq)) {
-		rc = PTR_ERR(cq);
-		sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc);
-		goto err_cq;
+	if (IS_ERR(rx_cq)) {
+		rc = PTR_ERR(rx_cq);
+		sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc);
+		goto err_rx_cq;
 	}
 
-	rc = ib_modify_cq(cq, 10, 200);
+	rc = ib_modify_cq(rx_cq, 10, 200);
 	if (rc) {
 		sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc);
-		goto err_qp;
+		goto err_tx_cq;
 	}
 	sdp_warn(sk, "Initialized CQ moderation\n");
+	sdp_sk(sk)->rx_cq = rx_cq;
+	sdp_arm_rx_cq(sk);
+	qp_init_attr.recv_cq = rx_cq;
+
+	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
+			  sk, SDP_TX_SIZE, 0);
+
+	if (IS_ERR(tx_cq)) {
+		rc = PTR_ERR(tx_cq);
+		sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc);
+		goto err_tx_cq;
+	}
 
-        qp_init_attr.send_cq = qp_init_attr.recv_cq = cq;
+	init_timer(&sdp_sk(sk)->tx_ring.timer);
+	sdp_sk(sk)->tx_ring.timer.function = sdp_poll_tx_cq;
+	sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk);
+	sdp_sk(sk)->tx_ring.poll_cnt = 0;
+
+	sdp_sk(sk)->tx_ring.cq = tx_cq;
+        qp_init_attr.send_cq = tx_cq;
 
 	rc = rdma_create_qp(id, pd, &qp_init_attr);
 	if (rc) {
 		sdp_warn(sk, "Unable to create QP: %d.\n", rc);
 		goto err_qp;
 	}
-	sdp_sk(sk)->cq = cq;
 	sdp_sk(sk)->qp = id->qp;
 	sdp_sk(sk)->ib_device = device;
 
@@ -153,8 +170,10 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
 	return 0;
 
 err_qp:
-	ib_destroy_cq(cq);
-err_cq:
+	ib_destroy_cq(tx_cq);
+err_tx_cq:
+	ib_destroy_cq(rx_cq);
+err_rx_cq:
 	ib_dereg_mr(sdp_sk(sk)->mr);
 err_mr:
 	ib_dealloc_pd(pd);
@@ -162,7 +181,6 @@ err_pd:
 	kfree(sdp_sk(sk)->rx_ring);
 	sdp_sk(sk)->rx_ring = NULL;
 err_rx:
-	WARN_ON(sdp_sk(sk)->tx_ring.head != sdp_sk(sk)->tx_ring.tail);
 	kfree(sdp_sk(sk)->tx_ring.buffer);
 	sdp_sk(sk)->tx_ring.buffer = NULL;
 err_tx:
@@ -258,21 +276,20 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 	sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4;
 	sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
 		sizeof(struct sdp_bsdh);
- 	sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
- 		PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
- 	sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal, 
- 		sdp_sk(sk)->send_frags * PAGE_SIZE);
+	sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
+		PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
+	sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal, 
+		sdp_sk(sk)->send_frags * PAGE_SIZE);
 
-	sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send_frags: %d send trigger %d\n",
-		__func__,
+	sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n",
 		sdp_sk(sk)->tx_ring.credits,
 		sdp_sk(sk)->xmit_size_goal,
- 		sdp_sk(sk)->send_frags,
+		sdp_sk(sk)->send_frags,
 		sdp_sk(sk)->min_bufs);
 
 	sdp_sk(sk)->poll_cq = 1;
-	sdp_arm_cq(sk);
-	sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq);
+	sdp_arm_rx_cq(sk);
+	sdp_poll_rx_cq(sdp_sk(sk));
 
 	sk->sk_state_change(sk);
 	sk_wake_async(sk, 0, POLL_OUT);
@@ -332,8 +349,11 @@ static int sdp_disconnected_handler(struct sock *sk)
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	if (ssk->cq)
-		sdp_poll_cq(ssk, ssk->cq);
+	if (ssk->rx_cq)
+		sdp_poll_rx_cq(ssk);
+
+	if (ssk->tx_ring.cq)
+		sdp_xmit_poll(ssk, 1);
 
 	if (sk->sk_state == TCP_SYN_RECV) {
 		sdp_connected_handler(sk, NULL);
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 6b7494f..454abeb 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -135,7 +135,8 @@ static int sdp_zcopy_thresh = 65536;
 module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
 MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=0ff.");
 
-struct workqueue_struct *comp_wq;
+struct workqueue_struct *sdp_wq;
+struct workqueue_struct *rx_comp_wq;
 
 struct list_head sock_list;
 spinlock_t sock_list_lock;
@@ -205,12 +206,17 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 static void sdp_destroy_qp(struct sdp_sock *ssk)
 {
 	struct ib_pd *pd = NULL;
-	struct ib_cq *cq = NULL;
+	struct ib_cq *rx_cq = NULL;
+	struct ib_cq *tx_cq = NULL;
+
+	del_timer(&ssk->tx_ring.timer);
 
 	if (ssk->qp) {
 		pd = ssk->qp->pd;
-		cq = ssk->cq;
-		ssk->cq = NULL;
+		rx_cq = ssk->rx_cq;
+		ssk->rx_cq = NULL;
+		tx_cq = ssk->tx_ring.cq;
+		ssk->tx_ring.cq = NULL;
 		ib_destroy_qp(ssk->qp);
 		ssk->qp = NULL;
 
@@ -231,8 +237,12 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 		}
 	}
 
-	if (cq)
-		ib_destroy_cq(cq);
+	if (tx_cq) {
+		ib_destroy_cq(tx_cq);
+	}
+
+	if (rx_cq)
+		ib_destroy_cq(rx_cq);
 
 	if (ssk->mr) {
 		ib_dereg_mr(ssk->mr);
@@ -341,8 +351,11 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
 	read_lock(&device_removal_lock);
 
-	if (ssk->cq)
-		sdp_poll_cq(ssk, ssk->cq);
+	if (ssk->rx_cq)
+		sdp_poll_rx_cq(ssk);
+
+	if (ssk->tx_ring.cq)
+		sdp_xmit_poll(ssk, 1);
 
 	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
 		sdp_set_error(sk, rc);
@@ -355,7 +368,7 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
 	/* Don't destroy socket before destroy work does its job */
 	sock_hold(sk, SOCK_REF_RESET);
-	queue_work(comp_wq, &ssk->destroy_work);
+	queue_work(sdp_wq, &ssk->destroy_work);
 
 	read_unlock(&device_removal_lock);
 }
@@ -773,11 +786,10 @@ out:
 	release_sock(sk);
 	if (newsk) {
 		lock_sock(newsk);
-		if (newssk->cq) {
-			sdp_dbg(newsk, "%s: ib_req_notify_cq\n", __func__);
+		if (newssk->rx_cq) {
 			newssk->poll_cq = 1;
-			sdp_arm_cq(&newssk->isk.sk);
-			sdp_poll_cq(newssk, newssk->cq);
+			sdp_arm_rx_cq(&newssk->isk.sk);
+			sdp_poll_rx_cq(newssk);
 		}
 		release_sock(newsk);
 	}
@@ -847,7 +859,7 @@ static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo)
 {
 	sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n");
 
-	queue_delayed_work(comp_wq, &ssk->dreq_wait_work, timeo);
+	queue_delayed_work(sdp_wq, &ssk->dreq_wait_work, timeo);
 	ssk->dreq_wait_timeout = 1;
 }
 
@@ -1143,9 +1155,9 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
 static inline int poll_recv_cq(struct sock *sk)
 {
 	int i;
-	if (sdp_sk(sk)->cq) {
+	if (sdp_sk(sk)->rx_cq) {
 		for (i = 0; i < recv_poll; ++i)
-			if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
+			if (!sdp_poll_rx_cq(sdp_sk(sk))) {
 				++recv_poll_hit;
 				return 0;
 			}
@@ -1157,9 +1169,9 @@ static inline int poll_recv_cq(struct sock *sk)
 static inline void poll_send_cq(struct sock *sk)
 {
 	int i;
-	if (sdp_sk(sk)->cq) {
+	if (sdp_sk(sk)->tx_ring.cq) {
 		for (i = 0; i < send_poll; ++i)
-			if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
+			if (sdp_xmit_poll(sdp_sk(sk), 1)) {
 				++send_poll_hit;
 				return;
 			}
@@ -1421,12 +1433,9 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
 			/* We can extend the last page
 			 * fragment. */
 			merge = 1;
-		} else if (i == ssk->send_frags ||
-			   (!i &&
-			   !(sk->sk_route_caps & NETIF_F_SG))) {
+		} else if (i == ssk->send_frags) {
 			/* Need to add new fragment and cannot
-			 * do this because interface is non-SG,
-			 * or because all the page slots are
+			 * do this because all the page slots are
 			 * busy. */
 			sdp_mark_push(ssk, skb);
 			return SDP_NEW_SEG;
@@ -1649,7 +1658,6 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk)
 	}
 }
 
-
 /* Like tcp_sendmsg */
 /* TODO: check locking */
 static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1812,7 +1820,9 @@ wait_for_sndbuf:
 wait_for_memory:
 			SDPSTATS_COUNTER_INC(send_wait_for_mem);
 			if (copied)
-				sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
+				sdp_push(sk, ssk, flags & ~MSG_MORE, PAGE_SIZE, TCP_NAGLE_PUSH);
+
+			sdp_xmit_poll(ssk, 1);
 
 			err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
 				     sk_stream_wait_memory(sk, &timeo);
@@ -2386,17 +2396,25 @@ static int __init sdp_init(void)
 	sdp_proto.sockets_allocated = sockets_allocated;
 	sdp_proto.orphan_count = orphan_count;
 
-	comp_wq = create_singlethread_workqueue("comp_wq");
-	if (!comp_wq)
+	rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+	if (!rx_comp_wq)
 		goto no_mem_rx_wq;
 
+	sdp_wq = create_singlethread_workqueue("sdp_wq");
+	if (!sdp_wq)
+		goto no_mem_sdp_wq;
+
 	rc = proto_register(&sdp_proto, 1);
-	if (rc)
+	if (rc) {
+		printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc);
 		goto error_proto_reg;
+	}
 
 	rc = sock_register(&sdp_net_proto);
-	if (rc)
+	if (rc) {
+		printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc);
 		goto error_sock_reg;
+	}
 
 	sdp_proc_init();
 
@@ -2409,7 +2427,9 @@ static int __init sdp_init(void)
 error_sock_reg:
 	proto_unregister(&sdp_proto);
 error_proto_reg:
-	destroy_workqueue(comp_wq);
+	destroy_workqueue(sdp_wq);
+no_mem_sdp_wq:
+	destroy_workqueue(rx_comp_wq);
 no_mem_rx_wq:
 	kfree(orphan_count);
 no_mem_orphan_count:
@@ -2427,7 +2447,8 @@ static void __exit sdp_exit(void)
 		printk(KERN_WARNING "%s: orphan_count %lld\n", __func__,
 		       percpu_counter_read_positive(orphan_count));
 
-	destroy_workqueue(comp_wq);
+	destroy_workqueue(rx_comp_wq);
+	destroy_workqueue(sdp_wq);
 
 	flush_scheduled_work();
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
index e759864..0971a49 100644
--- a/drivers/infiniband/ulp/sdp/sdp_proc.c
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -260,9 +260,11 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
 
 	seq_printf(seq, "rx_poll_miss      \t\t: %d\n", sdpstats.rx_poll_miss);
 	seq_printf(seq, "tx_poll_miss      \t\t: %d\n", sdpstats.tx_poll_miss);
+	seq_printf(seq, "tx_poll_hit       \t\t: %d\n", sdpstats.tx_poll_hit);
 
 	seq_printf(seq, "CQ stats:\n");
-	seq_printf(seq, "- interrupts\t\t: %d\n", sdpstats.int_count);
+	seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count);
+	seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count);
 	return 0;
 }
 
-- 
1.5.3.7