[ofa-general] [PATCH 1/1] IB/SDP - Zero copy (bzcopy) send support

Jim Mott jim at mellanox.com
Tue Oct 9 15:45:28 PDT 2007


This patch adds zero-copy (bzcopy) send support to SDP.  Below a 2K
transfer size, plain bcopy is still faster; for larger transfers, zero
copy is a net win on bandwidth.  Latency testing has not been done yet.

                     BCOPY        BZCOPY
   1K  TCP_STREAM  3555 Mb/sec  2250 Mb/sec
   2K  TCP_STREAM  3650 Mb/sec  3785 Mb/sec
   4K  TCP_STREAM  3560 Mb/sec  6220 Mb/sec
   8K  TCP_STREAM  3555 Mb/sec  6190 Mb/sec
  16K  TCP_STREAM  5100 Mb/sec  6155 Mb/sec
   1M  TCP_STREAM  4630 Mb/sec  6210 Mb/sec
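
For illustration only (not part of the patch), below is a minimal
userspace sketch of driving the new per-socket threshold through the
SDP_ZCOPY_THRESH socket option added by this patch.  It assumes the
option is handled at the SOL_TCP level alongside the existing TCP
options in sdp_setsockopt(), and that sdp_socket.h is visible to
userspace for AF_INET_SDP and SDP_ZCOPY_THRESH; adjust to the installed
headers if either differs.

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/tcp.h>        /* SOL_TCP */
#include "sdp_socket.h"         /* AF_INET_SDP, SDP_ZCOPY_THRESH (assumed exported) */

static int set_zcopy_thresh(int fd, int bytes)
{
	/* The kernel rejects values outside
	 * [SDP_MIN_ZCOPY_THRESH, SDP_MAX_ZCOPY_THRESH] with -EINVAL. */
	if (setsockopt(fd, SOL_TCP, SDP_ZCOPY_THRESH, &bytes, sizeof(bytes)) < 0) {
		perror("setsockopt(SDP_ZCOPY_THRESH)");
		return -1;
	}
	return 0;
}

int main(void)
{
	int fd = socket(AF_INET_SDP, SOCK_STREAM, 0);

	if (fd < 0) {
		perror("socket(AF_INET_SDP)");
		return 1;
	}

	/* Sends of 16K and larger on this socket take the bzcopy path. */
	return set_zcopy_thresh(fd, 16384) ? 1 : 0;
}

Smaller sends, or sockets left at the default of 0, fall back to bcopy
unless the sdp_zcopy_thresh module parameter is set to a non-zero value.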

Performance work still remains.  Open issues include settling on the
correct setsockopt defines (reuse the previous SDP values?), code
cleanup, performance tuning, rigorous regression testing, and multi-OS
build and test.  Simple testing to date includes netperf and iperf,
^C recovery, module unload/load, and checking for gross memory leaks
on RHEL4 U4.


Signed-off-by: Jim Mott <jim at mellanox.com>
---

Index: ofa_1_3_dev_kernel/drivers/infiniband/ulp/sdp/sdp.h
===================================================================
--- ofa_1_3_dev_kernel.orig/drivers/infiniband/ulp/sdp/sdp.h	2007-10-08 08:20:57.000000000 -0500
+++ ofa_1_3_dev_kernel/drivers/infiniband/ulp/sdp/sdp.h	2007-10-08 08:31:41.000000000 -0500
@@ -50,6 +50,9 @@ extern int sdp_data_debug_level;
 #define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
 #define SDP_NUM_WC 4
 
+#define SDP_MIN_ZCOPY_THRESH    1024
+#define SDP_MAX_ZCOPY_THRESH 1048576
+
 #define SDP_OP_RECV 0x800000000LL
 
 enum sdp_mid {
@@ -70,6 +73,13 @@ enum {
 	SDP_MIN_BUFS = 2
 };
 
+enum {
+	SDP_ERR_ERROR   = -4,
+	SDP_ERR_FAULT   = -3,
+	SDP_NEW_SEG     = -2,
+	SDP_DO_WAIT_MEM = -1
+};
+
 struct rdma_cm_id;
 struct rdma_cm_event;
 
@@ -148,6 +158,9 @@ struct sdp_sock {
 	int recv_frags;
 	int send_frags;
 
+	/* ZCOPY data */
+	int zcopy_thresh;
+
 	struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
 	struct ib_wc  ibwc[SDP_NUM_WC];
 };
Index: ofa_1_3_dev_kernel/drivers/infiniband/ulp/sdp/sdp_main.c
===================================================================
--- ofa_1_3_dev_kernel.orig/drivers/infiniband/ulp/sdp/sdp_main.c	2007-10-08 08:21:05.000000000 -0500
+++ ofa_1_3_dev_kernel/drivers/infiniband/ulp/sdp/sdp_main.c	2007-10-09 16:52:34.000000000 -0500
@@ -65,6 +65,16 @@ unsigned int csum_partial_copy_from_user
 #include "sdp.h"
 #include <linux/delay.h>
 
+struct bzcopy_state {
+	unsigned char __user  *u_base;
+	int                    u_len;
+	int                    left;
+	int                    page_cnt;
+	int                    cur_page;
+	int                    cur_offset;
+	struct page          **pages;
+};
+
 MODULE_AUTHOR("Michael S. Tsirkin");
 MODULE_DESCRIPTION("InfiniBand SDP module");
 MODULE_LICENSE("Dual BSD/GPL");
@@ -117,6 +127,10 @@ static int send_poll_thresh = 8192;
 module_param_named(send_poll_thresh, send_poll_thresh, int, 0644);
 MODULE_PARM_DESC(send_poll_thresh, "Send message size thresh hold over which to start polling.");
 
+static int sdp_zcopy_thresh = 0;
+module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
+MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=off.");
+
 struct workqueue_struct *sdp_workqueue;
 
 static struct list_head sock_list;
@@ -867,6 +881,12 @@ static int sdp_setsockopt(struct sock *s
 			sdp_push_pending_frames(sk);
 		}
 		break;
+	case SDP_ZCOPY_THRESH:
+		if (val < SDP_MIN_ZCOPY_THRESH || val > SDP_MAX_ZCOPY_THRESH)
+			err = -EINVAL;
+		else
+			ssk->zcopy_thresh = val;
+		break;
 	default:
 		err = -ENOPROTOOPT;
 		break;
@@ -904,6 +924,9 @@ static int sdp_getsockopt(struct sock *s
 	case TCP_CORK:
 		val = !!(ssk->nonagle&TCP_NAGLE_CORK);
 		break;
+	case SDP_ZCOPY_THRESH:
+		val = ssk->zcopy_thresh ? ssk->zcopy_thresh : sdp_zcopy_thresh;
+		break;
 	default:
 		return -ENOPROTOOPT;
 	}
@@ -1051,10 +1074,252 @@ void sdp_push_one(struct sock *sk, unsig
 {
 }
 
-/* Like tcp_sendmsg */
-/* TODO: check locking */
+static struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
+{
+	int i;
+
+	if (bz->pages) {
+		for (i = bz->cur_page; i < bz->page_cnt; i++)
+			put_page(bz->pages[i]);
+
+		kfree(bz->pages);
+	}
+
+	kfree(bz);
+
+	return NULL;
+}
+
+
+static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
+					 unsigned char __user *base,
+					 int len,
+					 int size_goal)
+{
+	struct bzcopy_state *bz;
+	unsigned long addr;
+	unsigned long locked, locked_limit;
+	int done_pages;
+	int thresh;
+
+	thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh;
+	if (thresh == 0 || len < thresh)
+		return NULL;
+
+	if (!can_do_mlock())
+		return NULL;
+
+	bz = kzalloc(sizeof(*bz), GFP_KERNEL);
+	if (!bz)
+		return NULL;
+
+	/*
+	 *   Since we use the TCP segmentation fields of the skb to map user
+	 * pages, we must make sure that everything we send in a single chunk
+	 * fits into the frags array in the skb.
+	 */
+	size_goal = size_goal / PAGE_SIZE + 1;
+	if (size_goal >= MAX_SKB_FRAGS)
+		goto out_1;
+
+	addr = (unsigned long)base;
+
+	bz->u_base     = base;
+	bz->u_len      = len;
+	bz->left       = len;
+	bz->cur_offset = addr & ~PAGE_MASK;
+	bz->page_cnt   = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
+	bz->pages      = kcalloc(bz->page_cnt, sizeof(struct page *), GFP_KERNEL);
+
+	if (!bz->pages)
+		goto out_1;
+
+	down_write(&current->mm->mmap_sem);
+
+	locked       = bz->page_cnt + current->mm->locked_vm;
+	locked_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur >> PAGE_SHIFT;
+
+	if ((locked > locked_limit) && !capable(CAP_IPC_LOCK))
+		goto out_2;
+
+	addr &= PAGE_MASK;
+
+	done_pages = get_user_pages(current, current->mm, addr, bz->page_cnt,
+				    0, 0, bz->pages, NULL);
+	if (unlikely(done_pages != bz->page_cnt)) {
+		bz->page_cnt = done_pages;
+		goto out_2;
+	}
+
+	up_write(&current->mm->mmap_sem);
+
+	return bz;
+
+out_2:
+	up_write(&current->mm->mmap_sem);
+out_1:
+	sdp_bz_cleanup(bz);
+
+	return NULL;
+}
+
+
 #define TCP_PAGE(sk)	(sk->sk_sndmsg_page)
 #define TCP_OFF(sk)	(sk->sk_sndmsg_off)
+static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
+				unsigned char __user *from, int copy)
+{
+	int err;
+	struct sdp_sock *ssk = sdp_sk(sk);
+
+	/* Where to copy to? */
+	if (skb_tailroom(skb) > 0) {
+		/* We have some space in skb head. Superb! */
+		if (copy > skb_tailroom(skb))
+			copy = skb_tailroom(skb);
+		if ((err = skb_add_data(skb, from, copy)) != 0)
+			return SDP_ERR_FAULT;
+	} else {
+		int merge = 0;
+		int i = skb_shinfo(skb)->nr_frags;
+		struct page *page = TCP_PAGE(sk);
+		int off = TCP_OFF(sk);
+
+		if (skb_can_coalesce(skb, i, page, off) &&
+		    off != PAGE_SIZE) {
+			/* We can extend the last page
+			 * fragment. */
+			merge = 1;
+		} else if (i == ssk->send_frags ||
+			   (!i &&
+			   !(sk->sk_route_caps & NETIF_F_SG))) {
+			/* Need to add new fragment and cannot
+			 * do this because interface is non-SG,
+			 * or because all the page slots are
+			 * busy. */
+			sdp_mark_push(ssk, skb);
+			return SDP_NEW_SEG;
+		} else if (page) {
+			if (off == PAGE_SIZE) {
+				put_page(page);
+				TCP_PAGE(sk) = page = NULL;
+				off = 0;
+			}
+		} else
+			off = 0;
+
+		if (copy > PAGE_SIZE - off)
+			copy = PAGE_SIZE - off;
+
+		if (!sk_stream_wmem_schedule(sk, copy))
+			return SDP_DO_WAIT_MEM;
+
+		if (!page) {
+			/* Allocate new cache page. */
+			if (!(page = sk_stream_alloc_page(sk)))
+				return SDP_DO_WAIT_MEM;
+		}
+
+		/* Time to copy data. We are close to
+		 * the end! */
+		err = skb_copy_to_page(sk, from, skb, page,
+				       off, copy);
+		if (err) {
+			/* If this page was new, give it to the
+			 * socket so it does not get leaked.
+			 */
+			if (!TCP_PAGE(sk)) {
+				TCP_PAGE(sk) = page;
+				TCP_OFF(sk) = 0;
+			}
+			return SDP_ERR_ERROR;
+		}
+
+		/* Update the skb. */
+		if (merge) {
+			skb_shinfo(skb)->frags[i - 1].size +=
+							copy;
+		} else {
+			skb_fill_page_desc(skb, i, page, off, copy);
+			if (TCP_PAGE(sk)) {
+				get_page(page);
+			} else if (off + copy < PAGE_SIZE) {
+				get_page(page);
+				TCP_PAGE(sk) = page;
+			}
+		}
+
+		TCP_OFF(sk) = off + copy;
+	}
+
+	return copy;
+}
+
+
+static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
+				 unsigned char __user *from, int copy,
+				 struct bzcopy_state *bz)
+{
+	int this_page, left;
+	struct sdp_sock *ssk = sdp_sk(sk);
+
+	if (skb_shinfo(skb)->nr_frags == ssk->send_frags) {
+		sdp_mark_push(ssk, skb);
+		return SDP_NEW_SEG;
+	}
+
+	left = copy;
+	BUG_ON(left > bz->left);
+
+	while (left) {
+		if (skb_shinfo(skb)->nr_frags == ssk->send_frags) {
+			copy = copy - left;
+			break;
+		}
+
+		this_page = PAGE_SIZE - bz->cur_offset;
+
+		if (left <= this_page)
+			this_page = left;
+
+		if (!sk_stream_wmem_schedule(sk, copy))
+			return SDP_DO_WAIT_MEM;
+
+		skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
+				   bz->pages[bz->cur_page], bz->cur_offset,
+				   this_page);
+
+		BUG_ON(skb_shinfo(skb)->nr_frags >= MAX_SKB_FRAGS);
+
+		bz->cur_offset += this_page;
+		if (bz->cur_offset == PAGE_SIZE) {
+			bz->cur_offset = 0;
+			bz->cur_page++;
+
+			BUG_ON(bz->cur_page > bz->page_cnt);
+		} else {
+			BUG_ON(bz->cur_offset > PAGE_SIZE);
+
+			if (bz->cur_page != bz->page_cnt || left != this_page)
+				get_page(bz->pages[bz->cur_page]);
+		}
+
+		left -= this_page;
+
+		skb->len             += this_page;
+		skb->data_len         = skb->len;
+		skb->truesize        += this_page;
+		sk->sk_wmem_queued   += this_page;
+		sk->sk_forward_alloc -= this_page;
+	}
+
+	bz->left -= copy;
+	return copy;
+}
+
+
+/* Like tcp_sendmsg */
+/* TODO: check locking */
 int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		size_t size)
 {
@@ -1065,6 +1330,7 @@ int sdp_sendmsg(struct kiocb *iocb, stru
 	int mss_now, size_goal;
 	int err, copied;
 	long timeo;
+	struct bzcopy_state *bz = NULL;
 
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
@@ -1098,6 +1364,8 @@ int sdp_sendmsg(struct kiocb *iocb, stru
 
 		iov++;
 
+		bz = sdp_bz_setup(ssk, from, seglen, size_goal);
+
 		while (seglen > 0) {
 			int copy;
 
@@ -1141,84 +1409,17 @@ new_segment:
 				sdp_mark_push(ssk, skb);
 				goto new_segment;
 			}
-			/* Where to copy to? */
-			if (skb_tailroom(skb) > 0) {
-				/* We have some space in skb head. Superb! */
-				if (copy > skb_tailroom(skb))
-					copy = skb_tailroom(skb);
-				if ((err = skb_add_data(skb, from, copy)) != 0)
-					goto do_fault;
-			} else {
-				int merge = 0;
-				int i = skb_shinfo(skb)->nr_frags;
-				struct page *page = TCP_PAGE(sk);
-				int off = TCP_OFF(sk);
-
-				if (skb_can_coalesce(skb, i, page, off) &&
-				    off != PAGE_SIZE) {
-					/* We can extend the last page
-					 * fragment. */
-					merge = 1;
-				} else if (i == ssk->send_frags ||
-					   (!i &&
-					   !(sk->sk_route_caps & NETIF_F_SG))) {
-					/* Need to add new fragment and cannot
-					 * do this because interface is non-SG,
-					 * or because all the page slots are
-					 * busy. */
-					sdp_mark_push(ssk, skb);
-					goto new_segment;
-				} else if (page) {
-					if (off == PAGE_SIZE) {
-						put_page(page);
-						TCP_PAGE(sk) = page = NULL;
-						off = 0;
-					}
-				} else
-					off = 0;
-
-				if (copy > PAGE_SIZE - off)
-					copy = PAGE_SIZE - off;
 
-				if (!sk_stream_wmem_schedule(sk, copy))
+			copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
+				      sdp_bcopy_get(sk, skb, from, copy);
+			if (unlikely(copy < 0)) {
+				if (!++copy)
 					goto wait_for_memory;
-
-				if (!page) {
-					/* Allocate new cache page. */
-					if (!(page = sk_stream_alloc_page(sk)))
-						goto wait_for_memory;
-				}
-
-				/* Time to copy data. We are close to
-				 * the end! */
-				err = skb_copy_to_page(sk, from, skb, page,
-						       off, copy);
-				if (err) {
-					/* If this page was new, give it to the
-					 * socket so it does not get leaked.
-					 */
-					if (!TCP_PAGE(sk)) {
-						TCP_PAGE(sk) = page;
-						TCP_OFF(sk) = 0;
-					}
-					goto do_error;
-				}
-
-				/* Update the skb. */
-				if (merge) {
-					skb_shinfo(skb)->frags[i - 1].size +=
-									copy;
-				} else {
-					skb_fill_page_desc(skb, i, page, off, copy);
-					if (TCP_PAGE(sk)) {
-						get_page(page);
-					} else if (off + copy < PAGE_SIZE) {
-						get_page(page);
-						TCP_PAGE(sk) = page;
-					}
-				}
-
-				TCP_OFF(sk) = off + copy;
+				if (!++copy)
+					goto new_segment;
+				if (!++copy)
+					goto do_fault;
+				goto do_error;
 			}
 
 			if (!copied)
@@ -1259,6 +1460,8 @@ wait_for_memory:
 	}
 
 out:
+	if (bz)
+		bz = sdp_bz_cleanup(bz);
 	if (copied)
 		sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
 	if (size > send_poll_thresh)
@@ -1278,6 +1481,8 @@ do_error:
 	if (copied)
 		goto out;
 out_err:
+	if (bz)
+		bz = sdp_bz_cleanup(bz);
 	err = sk_stream_error(sk, flags, err);
 	release_sock(sk);
 	return err;
Index: ofa_1_3_dev_kernel/drivers/infiniband/ulp/sdp/sdp_socket.h
===================================================================
--- ofa_1_3_dev_kernel.orig/drivers/infiniband/ulp/sdp/sdp_socket.h	2007-09-26 01:30:20.000000000 -0500
+++ ofa_1_3_dev_kernel/drivers/infiniband/ulp/sdp/sdp_socket.h	2007-10-08 08:33:40.000000000 -0500
@@ -8,6 +8,10 @@
 #define PF_INET_SDP AF_INET_SDP
 #endif
 
+#ifndef SDP_ZCOPY_THRESH
+#define SDP_ZCOPY_THRESH 80
+#endif
+
 /* TODO: AF_INET6_SDP ? */
 
 #endif

