[ofa-general] [PATCH 1/1] SDP - Fix bug where zcopy bcopy returns before data copied

Jim Mott jim at mellanox.com
Sun Nov 11 07:56:06 PST 2007


Mellanox regression testing for data correctness started failing
after the recent addition of bzcopy.  The failures occurred because
sdp_sendmsg returned before all in-flight RC transfers had completed,
which allowed user space to modify buffers that had not yet been
sent.

A big oops.

This fixes that bug.  Small-frame bandwidth is even worse now, but
small-frame latency is lower, which is good.  The default transfer
size that triggers bzcopy has been increased to the bandwidth
crossover point found in MLX4-MLX4 tests.  More work will be
required to find the best value for the release.
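
In outline, the fix ties each bzcopy send to a per-call busy counter:
posting an RC send against the caller's pinned pages increments it,
reaping the matching send completion decrements it, and sdp_sendmsg
polls the send CQ until the counter drains before returning.  A
minimal sketch of the idea (illustrative code only, not the driver
source):

	/* Sketch of the synchronization; names are illustrative. */
	struct bz_sketch {
		int busy;	/* RC sends posted but not yet completed */
	};

	static void bz_sketch_post(struct bz_sketch *bz)     { bz->busy++; }
	static void bz_sketch_complete(struct bz_sketch *bz) { bz->busy--; }

	/* sendmsg() may return only once every posted send has
	 * completed; until then the HCA may still read user pages. */
	static void bz_sketch_drain(struct bz_sketch *bz)
	{
		while (bz->busy)
			;	/* reap send CQEs here, as the real code
				 * does via poll_send_cq() */
	}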

Signed-off-by: Jim Mott <jim at mellanox.com>
---

Index: ofed_1_3/drivers/infiniband/ulp/sdp/sdp.h
===================================================================
--- ofed_1_3.orig/drivers/infiniband/ulp/sdp/sdp.h	2007-11-11 07:33:36.000000000 -0800
+++ ofed_1_3/drivers/infiniband/ulp/sdp/sdp.h	2007-11-11 07:36:47.000000000 -0800
@@ -148,6 +148,8 @@ struct sdp_sock {
 	unsigned rx_tail;
 	unsigned mseq_ack;
 	unsigned bufs;
+	unsigned max_bufs;	/* Initial buffers offered by other side */
+	unsigned min_bufs;	/* Low water mark to wake senders */
 
 	int               remote_credits;
 
@@ -168,13 +170,28 @@ struct sdp_sock {
 	int recv_frags; 	/* max skb frags in recv packets */
 	int send_frags; 	/* max skb frags in send packets */
 
-	/* ZCOPY data */
-	int zcopy_thresh;
+	/* BZCOPY data */
+	int   zcopy_thresh;
+	void *zcopy_context;
 
 	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
 	struct ib_wc  ibwc[SDP_NUM_WC];
 };
 
+/* Context used for synchronous zero copy bcopy (BZCOPY) */
+struct bzcopy_state {
+	unsigned char __user  *u_base;
+	int                    u_len;
+	int                    left;
+	int                    page_cnt;
+	int                    cur_page;
+	int                    cur_offset;
+	int                    busy;
+	struct sdp_sock      *ssk;
+	struct page         **pages;
+};
+
+
 extern struct proto sdp_proto;
 extern struct workqueue_struct *sdp_workqueue;
 
@@ -246,5 +263,6 @@ void sdp_remove_large_sock(struct sdp_so
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 void sdp_start_keepalive_timer(struct sock *sk);
+void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 
 #endif
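
For orientation, the lifetime of a bzcopy_state as wired up by the
rest of this patch (a paraphrase of the flow below; error paths
elided):

	/*
	 * sdp_sendmsg()
	 *   bz = sdp_bz_setup()       - pin user pages, point
	 *                               ssk->zcopy_context at bz
	 *   sdp_bzcopy_get()          - attach pinned pages to an skb,
	 *                               bz->busy++ per posted send
	 *   sdp_send_completion()     - bz->busy-- per reaped data CQE
	 *   poll until bz->busy == 0  - the HCA is done with the pages
	 *   sdp_bz_cleanup()          - release pages, clear
	 *                               ssk->zcopy_context
	 */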
Index: ofed_1_3/drivers/infiniband/ulp/sdp/sdp_bcopy.c
===================================================================
--- ofed_1_3.orig/drivers/infiniband/ulp/sdp/sdp_bcopy.c	2007-11-11 07:33:36.000000000 -0800
+++ ofed_1_3/drivers/infiniband/ulp/sdp/sdp_bcopy.c	2007-11-11 07:36:47.000000000 -0800
@@ -240,6 +240,19 @@ struct sk_buff *sdp_send_completion(stru
 
 	ssk->snd_una += TCP_SKB_CB(skb)->end_seq;
 	++ssk->tx_tail;
+
+	/* TODO: AIO and real zcopy code; add their context support here */
+	if (ssk->zcopy_context && skb->data_len) {
+		struct bzcopy_state *bz;
+		struct sdp_bsdh *h;
+
+		h = (struct sdp_bsdh *)skb->data;
+		if (h->mid == SDP_MID_DATA) {
+			bz = (struct bzcopy_state *)ssk->zcopy_context;
+			bz->busy--;
+		}
+	}
+
 	return skb;
 }
 
@@ -668,8 +681,6 @@ static void sdp_handle_wc(struct sdp_soc
 				wake_up(&ssk->wq);
 			}
 		}
-
-		sk_stream_write_space(&ssk->isk.sk);
 	} else {
 		sdp_cnt(sdp_keepalive_probes_sent);
 
@@ -688,11 +699,6 @@ static void sdp_handle_wc(struct sdp_soc
 		return;
 	}
 
-	if (likely(!wc->status)) {
-		sdp_post_recvs(ssk);
-		sdp_post_sends(ssk, 0);
-	}
-
 	if (ssk->time_wait && !ssk->isk.sk.sk_send_head &&
 	    ssk->tx_head == ssk->tx_tail) {
 		sdp_dbg(&ssk->isk.sk, "%s: destroy in time wait state\n",
@@ -719,6 +725,21 @@ int sdp_poll_cq(struct sdp_sock *ssk, st
 			ret = 0;
 		}
 	} while (n == SDP_NUM_WC);
+
+	if (!ret) {
+		struct sock *sk = &ssk->isk.sk;
+
+		sdp_post_recvs(ssk);
+		sdp_post_sends(ssk, 0);
+
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+			if (ssk->zcopy_context)
+				sdp_bzcopy_write_space(ssk);
+			else
+				sk_stream_write_space(&ssk->isk.sk);
+		}
+	}
+
 	return ret;
 }
 
Index: ofed_1_3/drivers/infiniband/ulp/sdp/sdp_cma.c
===================================================================
--- ofed_1_3.orig/drivers/infiniband/ulp/sdp/sdp_cma.c	2007-11-11 07:33:36.000000000 -0800
+++ ofed_1_3/drivers/infiniband/ulp/sdp/sdp_cma.c	2007-11-11 07:36:47.000000000 -0800
@@ -234,16 +234,19 @@ int sdp_connect_handler(struct sock *sk,
 		return rc;
 	}
 
-	sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
+	sdp_sk(child)->max_bufs = sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
+	sdp_sk(child)->min_bufs = sdp_sk(child)->bufs / 4;
 	sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
 		sizeof(struct sdp_bsdh);
 	sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
 		PAGE_SIZE;
 	sdp_resize_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
 
-	sdp_dbg(child, "%s bufs %d xmit_size_goal %d\n", __func__,
+	sdp_dbg(child, "%s bufs %d xmit_size_goal %d send trigger %d\n",
+		__func__,
 		sdp_sk(child)->bufs,
-		sdp_sk(child)->xmit_size_goal);
+		sdp_sk(child)->xmit_size_goal,
+		sdp_sk(child)->min_bufs);
 
 	id->context = child;
 	sdp_sk(child)->id = id;
@@ -276,15 +279,18 @@ static int sdp_response_handler(struct s
 		return 0;
 
 	h = event->param.conn.private_data;
-	sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+	sdp_sk(sk)->max_bufs = sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+	sdp_sk(sk)->min_bufs = sdp_sk(sk)->bufs / 4;
 	sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
 		sizeof(struct sdp_bsdh);
 	sdp_sk(sk)->send_frags = PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
 		PAGE_SIZE;
 
-	sdp_dbg(sk, "%s bufs %d xmit_size_goal %d\n", __func__,
+	sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send trigger %d\n",
+		__func__,
 		sdp_sk(sk)->bufs,
-		sdp_sk(sk)->xmit_size_goal);
+		sdp_sk(sk)->xmit_size_goal,
+		sdp_sk(sk)->min_bufs);
 
 	ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
 
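
To make the watermark concrete, with illustrative numbers (not taken
from the patch): if the peer's hello message advertises 64 receive
buffers, then

	max_bufs = 64;		/* initial credits granted by the peer */
	min_bufs = 64 / 4;	/* = 16, the "send trigger" printed above */

so a sender that blocked waiting for credits is woken only once a
quarter of the peer's buffers are again available, rather than on
every returned credit, which avoids wakeup thrashing under load.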
Index: ofed_1_3/drivers/infiniband/ulp/sdp/sdp_main.c
===================================================================
--- ofed_1_3.orig/drivers/infiniband/ulp/sdp/sdp_main.c	2007-11-11 07:33:36.000000000 -0800
+++ ofed_1_3/drivers/infiniband/ulp/sdp/sdp_main.c	2007-11-11 07:36:47.000000000 -0800
@@ -74,16 +74,6 @@ unsigned int csum_partial_copy_from_user
 #include "sdp.h"
 #include <linux/delay.h>
 
-struct bzcopy_state {
-	unsigned char __user  *u_base;
-	int                    u_len;
-	int		       left;
-	int                    page_cnt;
-        int                    cur_page;
-        int                    cur_offset;
-	struct page          **pages;
-};
-
 MODULE_AUTHOR("Michael S. Tsirkin");
 MODULE_DESCRIPTION("InfiniBand SDP module");
 MODULE_LICENSE("Dual BSD/GPL");
@@ -141,7 +131,7 @@ static unsigned int sdp_keepalive_time =
 module_param_named(sdp_keepalive_time, sdp_keepalive_time, uint, 0644);
 MODULE_PARM_DESC(sdp_keepalive_time, "Default idle time in seconds before keepalive probe sent.");
 
-static int sdp_zcopy_thresh = 2048;
+static int sdp_zcopy_thresh = 8192;
 module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
 MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=0ff.");
 
@@ -1213,9 +1203,12 @@ void sdp_push_one(struct sock *sk, unsig
 {
 }
 
-static struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
+static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
 {
 	int i;
+	struct sdp_sock *ssk = (struct sdp_sock *)bz->ssk;
+
+	ssk->zcopy_context = NULL;
 
 	if (bz->pages) {
 		for (i = bz->cur_page; i < bz->page_cnt; i++)
@@ -1266,6 +1259,8 @@ static struct bzcopy_state *sdp_bz_setup
 	bz->u_len      = len;
 	bz->left       = len;
 	bz->cur_offset = addr & ~PAGE_MASK;
+	bz->busy       = 0;
+	bz->ssk        = ssk;
 	bz->page_cnt   = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
 	bz->pages      = kcalloc(bz->page_cnt, sizeof(struct page *),
GFP_KERNEL);
 
@@ -1287,6 +1282,7 @@ static struct bzcopy_state *sdp_bz_setup
 	}
 
 	up_write(&current->mm->mmap_sem);
+	ssk->zcopy_context = bz;
 
 	return bz;
 
@@ -1398,6 +1394,7 @@ static inline int sdp_bzcopy_get(struct 
 	int this_page, left;
 	struct sdp_sock *ssk = sdp_sk(sk);
 
+	/* Push the first chunk to page-align all following chunks - TODO: review */
 	if (skb_shinfo(skb)->nr_frags == ssk->send_frags) {
 		sdp_mark_push(ssk, skb);
 		return SDP_NEW_SEG;
@@ -1449,9 +1446,110 @@ static inline int sdp_bzcopy_get(struct 
 	}
 
 	bz->left -= copy;
+	bz->busy++;
 	return copy;
 }
 
+static inline int slots_free(struct sdp_sock *ssk)
+{
+	int min_free;
+
+	min_free = SDP_TX_SIZE - (ssk->tx_head - ssk->tx_tail);
+	if (ssk->bufs < min_free)
+		min_free = ssk->bufs;
+	min_free -= (min_free < SDP_MIN_BUFS) ? min_free : SDP_MIN_BUFS;
+
+	return min_free;
+}
+
+/* like sk_stream_memory_free - except measures remote credits */
+static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk)
+{
+	struct bzcopy_state *bz = (struct bzcopy_state *)ssk->zcopy_context;
+
+	BUG_ON(!bz);
+	return slots_free(ssk) > bz->busy;
+}
+
+/* like sk_stream_wait_memory - except waits on remote credits */
+static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p)
+{
+	struct sock *sk = &ssk->isk.sk;
+	struct bzcopy_state *bz = (struct bzcopy_state *)ssk->zcopy_context;
+	int err = 0;
+	long vm_wait = 0;
+	long current_timeo = *timeo_p;
+	DEFINE_WAIT(wait);
+
+	BUG_ON(!bz);
+
+	if (sdp_bzcopy_slots_avail(ssk))
+		current_timeo = vm_wait = (net_random() % (HZ / 5)) + 2;
+
+	while (1) {
+		set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+		if (unlikely(sk->sk_err | (sk->sk_shutdown & SEND_SHUTDOWN))) {
+			err = -EPIPE;
+			break;
+		}
+
+		if (unlikely(!*timeo_p)) {
+			err = -EAGAIN;
+			break;
+		}
+
+		if (unlikely(signal_pending(current))) {
+			err = sock_intr_errno(*timeo_p);
+			break;
+		}
+
+		clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+		if (sdp_bzcopy_slots_avail(ssk))
+			break;
+
+		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+		sk->sk_write_pending++;
+		sk_wait_event(sk, &current_timeo,
+			sdp_bzcopy_slots_avail(ssk) && vm_wait);
+		sk->sk_write_pending--;
+
+		if (vm_wait) {
+			vm_wait -= current_timeo;
+			current_timeo = *timeo_p;
+			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
+			    (current_timeo -= vm_wait) < 0)
+				current_timeo = 0;
+			vm_wait = 0;
+		}
+		*timeo_p = current_timeo;
+	}
+
+	finish_wait(sk->sk_sleep, &wait);
+	return err;
+}
+
+/* like sk_stream_write_space - except measures remote credits */
+void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	struct socket *sock = sk->sk_socket;
+
+	if (ssk->bufs >= ssk->min_bufs &&
+	    ssk->tx_head == ssk->tx_tail &&
+	    sock != NULL) {
+		clear_bit(SOCK_NOSPACE, &sock->flags);
+
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+			wake_up_interruptible(sk->sk_sleep);
+		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+			sock_wake_async(sock, 2, POLL_OUT);
+	}
+}
+
 
 /* Like tcp_sendmsg */
 /* TODO: check locking */
@@ -1510,11 +1608,20 @@ int sdp_sendmsg(struct kiocb *iocb, stru
 			    (copy = size_goal - skb->len) <= 0) {
 
 new_segment:
-				/* Allocate new segment. If the interface is SG,
-				 * allocate skb fitting to single page.
+				/*
+				 * Allocate a new segment.
+				 *   For bcopy, we stop sending once we have
+				 * SO_SNDBUF bytes in flight.  For bzcopy,
+				 * we stop sending once we run out of remote
+				 * receive credits.
 				 */
-				if (!sk_stream_memory_free(sk))
-					goto wait_for_sndbuf;
+				if (bz) {
+					if (!sdp_bzcopy_slots_avail(ssk))
+						goto wait_for_sndbuf;
+				} else {
+					if (!sk_stream_memory_free(sk))
+						goto wait_for_sndbuf;
+				}
 
 				skb = sk_stream_alloc_pskb(sk, select_size(sk, ssk),
 							   0, sk->sk_allocation);
@@ -1586,7 +1693,9 @@ wait_for_memory:
 			if (copied)
 				sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
 
-			if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
+			err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo) :
+				     sk_stream_wait_memory(sk, &timeo);
+			if (err)
 				goto do_error;
 
 			mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
@@ -1595,12 +1704,30 @@ wait_for_memory:
 	}
 
 out:
-	if (bz)
-		bz = sdp_bz_cleanup(bz);
-	if (copied)
+	if (copied) {
 		sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
-	if (size > send_poll_thresh)
-		poll_send_cq(sk);
+		if (bz) {
+			int max_retry;
+
+			/* Wait for in-flight sends; should be quick */
+			for (max_retry = 0; max_retry < 10000; max_retry++) {
+				if (!bz->busy)
+					break;
+
+				poll_send_cq(sk);
+			}
+
+			if (bz->busy)
+				sdp_warn(sk,
+					 "Could not reap %d in-flight sends\n",
+					 bz->busy);
+
+			bz = sdp_bz_cleanup(bz);
+		} else
+			if (size > send_poll_thresh)
+				poll_send_cq(sk);
+	}
+
 	release_sock(sk);
 	return copied;



