[openib-general] [PATCH applied] sdp zcopy support for send_msg/recv_msg

Michael S. Tsirkin mst at mellanox.co.il
Thu Dec 1 05:47:08 PST 2005


I have added a zcopy option to trunk.
With it enabled I get good bandwidth with
multiple sockets, but typically worse than bcopy bandwidth for
a single socket.

swlab155:~ # ( export SIMPLE_LIBSDP=1 ; export LD_PRELOAD=/usr/local/lib/libsdp.so; iperf -c 11.4.8.156 -P 4 -l 64000 -f M )
------------------------------------------------------------
Client connecting to 11.4.8.156, TCP port 5001
TCP window size: 0.11 MByte (default)
------------------------------------------------------------
[  7] local 11.4.8.155 port 32812 connected with 11.4.8.156 port 5001
[  5] local 11.4.8.155 port 32810 connected with 11.4.8.156 port 5001
[  8] local 11.4.8.155 port 32813 connected with 11.4.8.156 port 5001
[  6] local 11.4.8.155 port 32811 connected with 11.4.8.156 port 5001
[  7]  0.0-10.0 sec  2309 MBytes    231 MBytes/sec
[  5]  0.0-10.0 sec  2309 MBytes    231 MBytes/sec
[  8]  0.0-10.0 sec  2309 MBytes    231 MBytes/sec
[  6]  0.0-10.0 sec  2309 MBytes    231 MBytes/sec
[SUM]  0.0-10.0 sec  9235 MBytes    924 MBytes/sec
swlab155:~ # ( export SIMPLE_LIBSDP=1 ; export LD_PRELOAD=/usr/local/lib/libsdp.so; iperf -c 11.4.8.156 -P 2 -l 64000 -f M )
------------------------------------------------------------
Client connecting to 11.4.8.156, TCP port 5001
TCP window size: 0.11 MByte (default)
------------------------------------------------------------
[  5] local 11.4.8.155 port 32814 connected with 11.4.8.156 port 5001
[  6] local 11.4.8.155 port 32815 connected with 11.4.8.156 port 5001
[  5]  0.0-10.0 sec  4233 MBytes    423 MBytes/sec
[  6]  0.0-10.0 sec  4233 MBytes    423 MBytes/sec
[SUM]  0.0-10.0 sec  8466 MBytes    847 MBytes/sec
swlab155:~ # ( export SIMPLE_LIBSDP=1 ; export LD_PRELOAD=/usr/local/lib/libsdp.so; iperf -c 11.4.8.156 -l 64000 -f M )
------------------------------------------------------------
Client connecting to 11.4.8.156, TCP port 5001
TCP window size: 0.11 MByte (default)
------------------------------------------------------------
[  5] local 11.4.8.155 port 32816 connected with 11.4.8.156 port 5001
[  5]  0.0-10.0 sec  5092 MBytes    509 MBytes/sec


---

Add zero copy support to synchronous socket operations (send_msg/recv_msg).

Signed-off-by: Michael S. Tsirkin <mst at mellanox.co.il>

Index: linux-2.6.14/drivers/infiniband/ulp/sdp/Kconfig
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/Kconfig	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/Kconfig	(working copy)
@@ -8,6 +8,20 @@
           libsdp library from <http://openib.org> to have standard
           sockets applications use SDP.
 
+config INFINIBAND_SDP_SEND_ZCOPY
+	bool "Sockets Direct Protocol Zero Copy Send support"
+	depends on INFINIBAND_SDP
+	default n
+	---help---
+	  This option enables Zero Copy support for send_msg transactions.
+
+config INFINIBAND_SDP_RECV_ZCOPY
+	bool "Sockets Direct Protocol Zero Copy Receive support"
+	depends on INFINIBAND_SDP && INFINIBAND_SDP_SEND_ZCOPY
+	default n
+	---help---
+	  This option enables Zero Copy support for recv_msg transactions.
+
 config INFINIBAND_SDP_DEBUG
 	bool "Sockets Direct Protocol debugging"
 	depends on INFINIBAND_SDP
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_rcvd.c
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_rcvd.c	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_rcvd.c	(working copy)
@@ -439,6 +439,11 @@
 
 		sdp_advt_destroy(advt);
 	}
+
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	/* There are no more src_avail, wake up any waiting thread */
+	sdp_iocb_q_wakeup_complete(&conn->r_pend);
+#endif
 	/*
 	 * If there are active reads, mark the connection as being in
 	 * source cancel. Otherwise
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_sock.h
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_sock.h	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_sock.h	(working copy)
@@ -61,7 +61,9 @@
 #define SDP_ZCOPY_THRSH_SRC  257 /* Threshold for AIO write advertisments */
 #define SDP_ZCOPY_THRSH_SNK  258 /* Threshold for AIO read advertisments */
 #define SDP_ZCOPY_THRSH      256 /* Convenience for read and write */
-
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+#define SDP_ZCOPY_CANCEL_TIMEOUT (HZ * 60) /* Time before abortive close */
+#endif
 /*
  * Default values for SDP specific socket options. (for reference)
  */
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_proto.h
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_proto.h	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_proto.h	(working copy)
@@ -152,7 +152,13 @@
 void sdp_iocb_q_put_tail(struct sdpc_iocb_q *table, struct sdpc_iocb *iocb);
 
 struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
+struct sdpc_iocb *sdp_iocb_q_lookup_req(struct sdpc_iocb_q *table, struct kiocb *req);
+struct sdpc_iocb *sdp_iocb_q_lookup_complete(struct sdpc_iocb_q *table, struct kiocb *req);
 
+struct sdpc_iocb *sdp_iocb_q_wakeup_complete(struct sdpc_iocb_q *table);
+
+void sdp_iocb_q_mark_cancel(struct sdpc_iocb_q *table, struct kiocb *req);
+
 void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
 
 void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
@@ -197,6 +203,8 @@
 						  void *arg),
 				    void *arg);
 
+int sdp_iocb_find_req(struct sdpc_desc *element, void *arg);
+
 int sdp_desc_q_types_size(struct sdpc_desc_q *table,
 			  enum sdp_desc_type type);
 
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_read.c
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_read.c	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_read.c	(working copy)
@@ -93,6 +93,12 @@
 		}
 	}
 
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	/* If there are no more src_avail, wake up any waiting thread */
+	if (!conn->src_recv)
+		sdp_iocb_q_wakeup_complete(&conn->r_pend);
+
+#endif
 done:
 	return 0;
 error:
@@ -222,14 +228,23 @@
 
 		iocb->flags &= ~(SDP_IOCB_F_ACTIVE | SDP_IOCB_F_RDMA_R);
 
-		if (sk_sdp(conn)->sk_rcvlowat > iocb->post)
-			break;
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+		if (!iocb->len || (!conn->src_recv && iocb->post >= iocb->lowat))
+#else
+		if (iocb->post >= iocb->lowat)
+#endif
+		{
+			/*
+			 * complete IOCB
+			 */
+			SDP_CONN_STAT_READ_INC(conn, iocb->post);
+			SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
+			/*
+			 * callback to complete IOCB
+			 */
+			sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+		}
 
-		SDP_CONN_STAT_READ_INC(conn, iocb->post);
-		SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
-
-		sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
-
 		break;
 	default:
 		sdp_warn("Unknown type <%d> at head of READ SRC queue. <%d>",
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_send.c
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_send.c	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_send.c	(working copy)
@@ -122,6 +122,10 @@
 		send_param.send_flags |= IB_SEND_SIGNALED;
 		conn->send_cons = 0;
 	}
+
+	if (buff->bsdh_hdr->mid == SDP_MID_SRC_CANCEL)
+		sdp_dbg_ctrl(conn, "SRC_CANCEL bsdh_hdr->seq_num = %d conn->send_seq=%d\n",
+			     buff->bsdh_hdr->seq_num, conn->send_seq);
 	/*
 	 * post send
 	 */
@@ -1680,8 +1684,8 @@
 static int sdp_inet_write_cancel(struct kiocb *req, struct io_event *ev)
 {
 	struct sock_iocb *si = kiocb_to_siocb(req);
-	struct sdp_sock   *conn;
 	struct sdpc_iocb *iocb;
+	struct sdp_sock *conn;
 	int result = 0;
 
 	sdp_dbg_ctrl(NULL, "Cancel Write IOCB user <%d> key <%d> flag <%08lx>",
@@ -1810,7 +1813,151 @@
 	return result;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+static int sdp_write_src_cancel(struct sdpc_desc *element, void *arg)
+{
+	struct sdpc_iocb *iocb = (struct sdpc_iocb *)element;
+
+	/* mark matches; -ERANGE (no match) presumably keeps the walk going */
+	if (element->type == SDP_DESC_TYPE_IOCB && iocb->req == arg)
+		iocb->flags |= SDP_IOCB_F_CANCEL;
+	return -ERANGE;
+}
+
+static int sdp_req_busy(struct sdp_sock *conn, struct sdpc_iocb_wait *wait)
+{
+	unsigned long flags;
+	int busy;
+
+	sdp_conn_lock(conn);	/* unlock presumably polls the CQs */
+	sdp_conn_unlock(conn);
+
+	spin_lock_irqsave(&wait->lock, flags);
+	busy = wait->outstanding != 0;
+	spin_unlock_irqrestore(&wait->lock, flags);
+
+	return busy ? -EAGAIN : 0;
+}
 /*
+ * sdp_write_cancel - cancel a sync zcopy send; 0 once nothing is pending
+ */
+static int sdp_write_cancel(struct kiocb *req, struct sdp_sock *conn,
+			    struct sdpc_iocb_wait *wait)
+{
+	struct sdpc_iocb *iocb;
+	int result = 0;
+
+	sdp_dbg_ctrl(NULL, "Cancel Write IOCB user <%d> key <%d> flag <%08lx>",
+		     req->ki_users, req->ki_key, req->ki_flags);
+
+	sdp_conn_lock(conn);
+
+	sdp_dbg_ctrl(conn, "Cancel Write IOCB. <%08x:%04x> <%08x:%04x>",
+		     conn->src_addr, conn->src_port,
+		     conn->dst_addr, conn->dst_port);
+	/*
+	 * pull every IOCB of this request off the send queue. Only the
+	 * write queues are searched: this is always a send.
+	 */
+
+	while ((iocb = (struct sdpc_iocb *)
+	       sdp_desc_q_lookup(&conn->send_queue, sdp_iocb_find_req, req))) {
+		iocb->flags |= SDP_IOCB_F_CANCEL;
+
+		/*
+		 * always remove the IOCB.
+		 * If active, then place it into the correct active queue
+		 */
+		sdp_desc_q_remove((struct sdpc_desc *)iocb);
+
+		if (iocb->flags & SDP_IOCB_F_ACTIVE) {
+			if (iocb->flags & SDP_IOCB_F_RDMA_W)
+				sdp_desc_q_put_tail(&conn->w_snk,
+						    (struct sdpc_desc *)iocb);
+			else {
+				SDP_EXPECT((iocb->flags & SDP_IOCB_F_RDMA_R));
+
+				sdp_iocb_q_put_tail(&conn->w_src, iocb);
+			}
+		} else {
+			/*
+			 * empty IOCBs can be deleted, while partials
+			 * need to be completed.
+			 */
+			if (iocb->post > 0) {
+				sdp_iocb_complete(iocb, 0);
+				result = -EAGAIN;
+			} else {
+				struct sdpc_iocb_wait *iwait = iocb->wait;
+				struct kiocb *ireq = iocb->req;
+				unsigned long flags;
+
+				/* destroy deletes the IOCB: use saved copies */
+				sdp_iocb_destroy(iocb);
+				/* drop the completion reference */
+				if (!iwait)
+					aio_put_req(ireq);
+				else {
+					spin_lock_irqsave(&iwait->lock, flags);
+					--iwait->outstanding;
+					/* No need to wake up, since we call
+					   sdp_req_busy directly below */
+					spin_unlock_irqrestore(&iwait->lock,
+							       flags);
+				}
+			}
+		}
+	}
+
+	/*
+	 * check the sink queue, not much to do, since the operation is
+	 * already in flight.
+	 */
+	sdp_desc_q_lookup(&conn->w_snk, sdp_write_src_cancel, req);
+
+	iocb = (struct sdpc_iocb *)sdp_desc_q_lookup(&conn->w_snk,
+						     sdp_iocb_find_req,
+						     req);
+	if (iocb) {
+		sdp_dbg_ctrl(conn, "Sink Queue busy\n");
+		result = -EAGAIN;
+	}
+
+	/*
+	 * check source queue. If we're in the source queue, then a cancel
+	 * needs to be issued.
+	 */
+	sdp_iocb_q_mark_cancel(&conn->w_src, req);
+
+	iocb = sdp_iocb_q_lookup_req(&conn->w_src, req);
+	if (iocb) {
+		sdp_dbg_ctrl(conn, "Sending Src Cancel\n");
+
+		if (!(conn->flags & SDP_CONN_F_SRC_CANCEL_L)) {
+			sdp_desc_q_lookup(&conn->w_snk, sdp_write_src_cancel, req);
+			conn->flags |= SDP_CONN_F_SRC_CANCEL_L;
+			result = sdp_send_ctrl_src_cancel(conn);
+			SDP_EXPECT(result >= 0);
+		}
+
+		result = -EAGAIN;
+	}
+
+	if (!result) {
+		/*
+		 * no IOCB found. Assume the IOCB will be completed.
+		 */
+		sdp_dbg_ctrl(conn, "Cancel IOCB done. <%d:%d:%08lx>",
+			     req->ki_users, req->ki_key, req->ki_flags);
+	}
+
+	sdp_conn_unlock(conn);
+
+	return sdp_req_busy(conn, wait);
+}
+#endif
+
+/*
  * sdp_send_flush_advt - Flush passive sink advertisments
  */
 static int sdp_send_flush_advt(struct sdp_sock *conn)
@@ -1987,7 +2134,7 @@
 	return timeout;
 }
 
-static inline int sdp_queue_iocb(struct kiocb *req, struct sdp_sock *conn,
+static inline int sdp_queue_aio(struct kiocb *req, struct sdp_sock *conn,
 				 struct msghdr *msg, size_t size,
 				 size_t *copied)
 {
@@ -2038,14 +2185,79 @@
 	return -EIOCBQUEUED;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+static inline int sdp_queue_sync(struct kiocb *req, struct sdp_sock *conn,
+				 struct msghdr *msg, size_t size,
+				 size_t *copied,
+				 struct sdpc_iocb_wait *wait)
+{
+	struct sdpc_iocb *iocb;
+	struct iovec *msg_iov;
+	unsigned long flags;
+	size_t len;
+	int result;
+	/*
+	 * queue one IOCB for (a prefix of) the first non-empty iovec
+	 */
+	iocb = sdp_iocb_create();
+	if (!iocb) {
+		sdp_dbg_warn(conn, "Failed to allocate IOCB <%Zu:%ld>",
+			     size, (long)*copied);
+		return -ENOMEM;
+	}
+	/* NOTE(review): unbounded scan; assumes a non-empty iovec remains */
+	for (msg_iov = msg->msg_iov; !msg_iov->iov_len; ++msg_iov);
+
+	/* FMR alignment can add an extra page. */
+	len = min(msg_iov->iov_len, (size_t)SDP_IOCB_SIZE_MAX - 4096);
+	iocb->len  = len;
+	iocb->post = 0;
+	iocb->size = len;
+	iocb->req  = req;
+	iocb->key  = req->ki_key;
+	iocb->addr = (unsigned long)msg_iov->iov_base;
+	iocb->wait = wait;
+
+	result = sdp_iocb_lock(iocb);
+	if (result < 0) {
+		sdp_dbg_warn(conn, "Error <%d> locking IOCB <%Zu:%ld>",
+			     result, size, (long)*copied);
+
+		sdp_iocb_destroy(iocb);
+		return result;
+	}
+
+	SDP_CONN_STAT_WQ_INC(conn, iocb->size);
+
+	result = sdp_send_data_queue(conn, (struct sdpc_desc *)iocb);
+	if (result < 0) {
+		sdp_dbg_warn(conn, "Error <%d> queueing write IOCB", result);
+		sdp_iocb_destroy(iocb);
+		return result;
+	}
+
+	spin_lock_irqsave(&wait->lock, flags);
+	++wait->outstanding;
+	spin_unlock_irqrestore(&wait->lock, flags);
+
+	conn->send_pipe += len;
+	*copied += len; /* copied amount was saved in IOCB. */
+	msg_iov->iov_len -= len;
+	msg_iov->iov_base += len;
+	return 0;
+}
+#endif
 /*
  * sdp_inet_send - send data from user space to the network
  */
 int sdp_inet_send(struct kiocb *req, struct socket *sock, struct msghdr *msg,
 		  size_t size)
 {
-	struct sock      *sk;
-	struct sdp_sock   *conn;
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+	struct sdpc_iocb_wait wait;
+#endif
+	struct sock *sk;
+	struct sdp_sock *conn;
 	int result = 0;
 	size_t copied = 0;
 	int oob, zcopy;
@@ -2074,6 +2286,7 @@
 	if (conn->state == SDP_CONN_ST_LISTEN ||
 	    conn->state == SDP_CONN_ST_CLOSED) {
 		result = -ENOTCONN;
+		sdp_conn_unlock(conn);
 		goto done;
 	}
 	/*
@@ -2082,13 +2295,24 @@
 	 * they are smaller then the zopy threshold, but only if there is
 	 * no buffer write space.
 	 */
-	zcopy = (size >= conn->src_zthresh && !is_sync_kiocb(req));
+	zcopy = (size >= conn->src_zthresh && (!is_sync_kiocb(req)
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+		 || (!(msg->msg_flags & MSG_DONTWAIT) && !oob)
+#endif
+		));
 
 	/*
 	 * clear ASYN space bit, it'll be reset if there is no space.
 	 */
 	if (!zcopy)
 		clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+	else if (is_sync_kiocb(req)) {
+		init_waitqueue_head(&wait.wait);
+		spin_lock_init(&wait.lock);
+		wait.outstanding = 0;
+	}
+#endif
 	/*
 	 * process data first if window is open, next check conditions, then
 	 * wait if there is more work to be done. The absolute window size is
@@ -2143,14 +2367,45 @@
 		 * completion. Wait on sync IO call create IOCB for async
 		 * call.
 		 */
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+		if (is_sync_kiocb(req) && zcopy)
+			result = sdp_queue_sync(req, conn, msg, size, &copied,
+						&wait);
+			/* TODO: limit the # of outstanding reqs */
+			/* TODO: sleep on recoverable errors */
+		else
+#endif
 		if (is_sync_kiocb(req))
 			timeout = sdp_wait_till_space(sk, conn, oob, timeout);
 		else
-			result = sdp_queue_iocb(req, conn, msg, size, &copied);
+			result = sdp_queue_aio(req, conn, msg, size, &copied);
 	}
 
+	sdp_conn_unlock(conn);
+
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+	if (!result && is_sync_kiocb(req) && zcopy) {
+		timeout = wait_event_interruptible_timeout(wait.wait,
+					   !sdp_req_busy(conn, &wait), timeout);
+		if (!timeout)
+			result = -EAGAIN;
+	}
+
+	if (signal_pending(current) && is_sync_kiocb(req) && zcopy) {
+		result = (timeout > 0) ? sock_intr_errno(timeout) : -EAGAIN;
+
+		timeout = wait_event_timeout(wait.wait,
+				!sdp_write_cancel(req, conn, &wait),
+				SDP_ZCOPY_CANCEL_TIMEOUT);
+		if (!timeout) {
+			sdp_warn("sdp_write_cancel timed out. Abort.\n");
+			sdp_conn_lock(conn);
+			sdp_conn_abort(conn);
+			sdp_conn_unlock(conn);
+		}
+	}
+#endif
 done:
-	sdp_conn_unlock(conn);
 	result = ((copied > 0) ? copied : result);
 
 	if (result == -EPIPE && !(msg->msg_flags & MSG_NOSIGNAL))
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_recv.c
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_recv.c	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_recv.c	(working copy)
@@ -327,6 +327,10 @@
 	iocb = sdp_iocb_q_look(&conn->r_pend);
 	if (!iocb)
 		return ENODEV;
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	if (iocb->flags & SDP_IOCB_F_RO)
+		return ENODEV;
+#endif
 	/*
 	 * check zcopy threshold
 	 */
@@ -414,7 +418,11 @@
 		 * loop posting RDMA reads, if there is room.
 		 */
 		if (!sdp_iocb_q_size(&conn->r_pend))
-			while (sdp_advt_q_size(&conn->src_pend) > 0 &&
+			while(
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+			      !sdp_desc_q_size(&conn->r_src) &&
+#endif
+			      sdp_advt_q_size(&conn->src_pend) > 0 &&
 			       conn->recv_max >
 			       sdp_buff_q_size(&conn->recv_pool) &&
 			       conn->rwin_max > conn->byte_strm) {
@@ -706,9 +714,8 @@
 	 *    b) the amount of data moved into the IOCB is greater then the
 	 *       socket recv low water mark.
 	 */
-	if (!iocb->len ||
-	    (!conn->src_recv &&
-	     !(sk_sdp(conn)->sk_rcvlowat > iocb->post))) {
+	if (!iocb->len || (!conn->src_recv && iocb->post >= iocb->lowat))
+	{
 		/*
 		 * complete IOCB
 		 */
@@ -1055,7 +1062,151 @@
 	return result;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+static int sdp_req_busy(struct kiocb *req, struct sdp_sock *conn,
+			struct sdpc_iocb_wait *wait, size_t *copied)
+{
+	struct sdpc_iocb *iocb;
+	unsigned long flags;
+	int result = -EAGAIN;
+	int lowat_reached;
+
+	sdp_conn_lock(conn);
+	/* Unlock polls cqs */
+	sdp_conn_unlock(conn);
+
+	/* reap completed receive IOCBs; the loop exits with wait->lock held */
+	for (;;) {
+		spin_lock_irqsave(&wait->lock, flags);
+		iocb = sdp_iocb_q_get_head(&wait->q);
+		if (!iocb)
+			break;
+		--wait->outstanding;
+
+		if (iocb->post >= iocb->lowat)
+			wait->lowat_reached = 1;
+		spin_unlock_irqrestore(&wait->lock, flags);
+
+		/* *copied counted the whole IOCB; drop its unfilled space */
+		*copied -= iocb->len;
+		sdp_iocb_release(iocb);
+		sdp_iocb_unlock(iocb);
+		sdp_iocb_destroy(iocb);
+	}
+
+	if (!wait->outstanding)
+		result = 0;
+	/* sample after reaping: sdp_iocb_q_wakeup_complete() sets it too */
+	lowat_reached = wait->lowat_reached;
+	spin_unlock_irqrestore(&wait->lock, flags);
+
+	/* Complete outstanding iocbs whose low watermark is already
+	   satisfied */
+	if (lowat_reached && result) {
+		sdp_conn_lock(conn);
+		if (!conn->src_recv)
+			while ((iocb = sdp_iocb_q_lookup_complete(&conn->r_pend,
+								  req))) {
+				sdp_iocb_q_remove(iocb);
+				SDP_CONN_STAT_READ_INC(conn, iocb->post);
+				SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
+				sdp_iocb_complete(iocb, 0);
+			}
+		sdp_conn_unlock(conn);
+	}
+
+	return result;
+}
+
 /*
+ * sdp_read_cancel - cancel a sync zcopy receive; 0 once nothing is pending
+ */
+static int sdp_read_cancel(struct kiocb *req, struct sdp_sock *conn,
+			   struct sdpc_iocb_wait *wait, size_t *copied)
+{
+	struct sdpc_iocb *iocb;
+	sdp_dbg_ctrl(NULL, "Cancel Read IOCBs. user <%d> req <%p> flag <%08lx>",
+		     req->ki_users, req, req->ki_flags);
+
+	sdp_conn_lock(conn);
+
+	sdp_dbg_ctrl(conn, "Cancel Read IOCBs. <%08x:%04x> <%08x:%04x>",
+		     conn->src_addr, conn->src_port,
+		     conn->dst_addr, conn->dst_port);
+	/*
+	 * pull this request's IOCBs off the pending receive queue.
+	 */
+	while ((iocb = sdp_iocb_q_lookup_req(&conn->r_pend, req))) {
+		/*
+		 * always remove the IOCB. Inactive IOCBs (empty or partial)
+		 * are completed and this pass stops there; active ones move
+		 * to the queue matching their RDMA direction.
+		 */
+		sdp_iocb_q_remove(iocb);
+
+		if (!(iocb->flags & SDP_IOCB_F_ACTIVE)) {
+			sdp_iocb_complete(iocb, 0);
+			goto unlock;
+		}
+
+		if (iocb->flags & SDP_IOCB_F_RDMA_W)
+			sdp_iocb_q_put_tail(&conn->r_snk, iocb);
+		else {
+			SDP_EXPECT((iocb->flags & SDP_IOCB_F_RDMA_R));
+
+			sdp_desc_q_put_tail(&conn->r_src,
+					    (struct sdpc_desc *)iocb);
+		}
+	}
+	/*
+	 * check the source queue, not much to do, since the operation is
+	 * already in flight.
+	 */
+	iocb = (struct sdpc_iocb *)sdp_desc_q_lookup(&conn->r_src,
+						     sdp_iocb_find_req, req);
+	if (iocb) {
+		iocb->flags |= SDP_IOCB_F_CANCEL;
+		goto unlock;
+	}
+	/*
+	 * check sink queue. If we're in the sink queue, then a cancel
+	 * needs to be issued.
+	 */
+	iocb = sdp_iocb_q_lookup_req(&conn->r_snk, req);
+	if (iocb) {
+		/*
+		 * Unfortunately SDP only has a coarse-grained cancel, so
+		 * we have to cancel everything.
+		 */
+		if (!(conn->flags & SDP_CONN_F_SNK_CANCEL)) {
+			int result;
+
+			result = sdp_send_ctrl_snk_cancel(conn);
+			SDP_EXPECT(result >= 0);
+
+			conn->flags |= SDP_CONN_F_SNK_CANCEL;
+		}
+
+		iocb->flags |= SDP_IOCB_F_CANCEL;
+
+		goto unlock;
+	}
+	/*
+	 * no IOCB found. The cancel is probably in a race with a completion.
+	 */
+	sdp_dbg_ctrl(NULL, "Cancel read with no IOCB. <%d:%d:%08lx>",
+		     req->ki_users, req->ki_key, req->ki_flags);
+
+	/* in every case, report whether IOCBs are still outstanding */
+unlock:
+	sdp_conn_unlock(conn);
+
+	return sdp_req_busy(req, conn, wait, copied);
+}
+
+#endif
+
+/*
  * sdp_inet_recv - recv data from the network to user space
  */
 int sdp_inet_recv(struct kiocb  *req, struct socket *sock, struct msghdr *msg,
@@ -1065,17 +1216,22 @@
 	struct sdp_sock   *conn;
 	struct sdpc_iocb *iocb;
 	struct sdpc_buff *buff;
-	long   timeout;
+	long   timeout = 0 /*Turn off compiler warning */;
 	size_t length;
 	int result = 0;
 	int expect;
 	int low_water;
-	int copied = 0;
+	size_t copied = 0;
 	int copy;
 	int update;
 	s8 oob = 0;
 	s8 ack = 0;
 	struct sdpc_buff_q peek_queue;
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	int zcopy = 0;
+	struct sdpc_iocb_wait wait;
+	unsigned long f;
+#endif
 
 	sk = sock->sk;
 	conn = sdp_sk(sk);
@@ -1293,6 +1449,80 @@
 		/*
 		 * Either wait or create IOCB for defered completion.
 		 */
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+		if (is_sync_kiocb(req) && !(flags & MSG_PEEK) &&
+			(zcopy || size - copied >= conn->snk_zthresh)
+			/* && (conn->src_recv ||
+				 (low_water - copied >= conn->snk_zthresh)) */ ) {
+			struct iovec *msg_iov;
+			size_t len;
+			/*
+			 * create IOCB with remaining space
+			 */
+			iocb = sdp_iocb_create();
+			if (!iocb) {
+				sdp_dbg_warn(conn,
+					     "Error allocating IOCB <%Zu:%Zd>",
+					     size, copied);
+				result = -ENOMEM;
+				break;
+			}
+
+			for (msg_iov = msg->msg_iov; !msg_iov->iov_len; ++msg_iov);
+
+			/* FMR alignment can add an extra page. */
+			len = min(msg_iov->iov_len, (size_t)SDP_IOCB_SIZE_MAX - 4096);
+			iocb->len  = len;
+			iocb->post = 0;
+			iocb->size = len;
+			iocb->req  = req;
+			iocb->key  = req->ki_key;
+			iocb->addr = (unsigned long)msg_iov->iov_base;
+			if (copied >= low_water)
+				iocb->lowat = 0;
+			else
+				iocb->lowat = min_t(size_t, len, low_water - copied);
+			iocb->wait = &wait;
+
+			iocb->flags |= SDP_IOCB_F_RECV | SDP_IOCB_F_RO;
+
+			req->ki_cancel = sdp_inet_read_cancel;
+
+			result = sdp_iocb_lock(iocb);
+			if (result < 0) {
+				sdp_dbg_warn(conn,
+					     "Error <%d> IOCB lock <%Zu:%Zd>",
+					     result, size, copied);
+
+				sdp_iocb_destroy(iocb);
+				break;
+			}
+
+			SDP_CONN_STAT_RQ_INC(conn, iocb->size);
+
+			if (!zcopy) {
+				init_waitqueue_head(&wait.wait);
+				spin_lock_init(&wait.lock);
+				sdp_iocb_q_init(&wait.q);
+				wait.outstanding = 0;
+				wait.lowat_reached = copied >= low_water;
+				zcopy  = 1;
+			}
+
+			sdp_iocb_q_put_tail(&conn->r_pend, iocb);
+
+			spin_lock_irqsave(&wait.lock, f);
+			++wait.outstanding;
+			spin_unlock_irqrestore(&wait.lock, f);
+
+			/* TODO: set it? */
+			ack = 1;
+			copied += len;
+			msg_iov->iov_len -= len;
+			msg_iov->iov_base += len;
+			break;
+		} else
+#endif
 		if (is_sync_kiocb(req)) {
 			DECLARE_WAITQUEUE(wait, current);
 
@@ -1325,7 +1555,7 @@
 			iocb = sdp_iocb_create();
 			if (!iocb) {
 				sdp_dbg_warn(conn,
-					     "Error allocating IOCB <%Zu:%d>",
+					     "Error allocating IOCB <%Zu:%Zd>",
 					     size, copied);
 				result = -ENOMEM;
 				break;
@@ -1338,7 +1568,7 @@
 			iocb->key  = req->ki_key;
 			iocb->addr = ((unsigned long)msg->msg_iov->iov_base -
 				      copied);
-
+			iocb->lowat = low_water;
 			iocb->flags |= SDP_IOCB_F_RECV;
 
 			req->ki_cancel = sdp_inet_read_cancel;
@@ -1346,7 +1576,7 @@
 			result = sdp_iocb_lock(iocb);
 			if (result < 0) {
 				sdp_dbg_warn(conn,
-					     "Error <%d> IOCB lock <%Zu:%d>",
+					     "Error <%d> IOCB lock <%Zu:%Zd>",
 					     result, size, copied);
 
 				sdp_iocb_destroy(iocb);
@@ -1383,5 +1613,28 @@
 			sdp_buff_q_put_head(&conn->recv_pool, buff);
 
 	sdp_conn_unlock(conn);
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	if (!result && is_sync_kiocb(req) && zcopy) {
+		timeout = wait_event_interruptible_timeout(wait.wait,
+			   !sdp_req_busy(req, conn, &wait, &copied), timeout);
+		if (!timeout)
+			result = -EAGAIN;
+	}
+
+	if (signal_pending(current) && is_sync_kiocb(req) && zcopy) {
+		result = (timeout > 0) ? sock_intr_errno(timeout) : -EAGAIN;
+
+		timeout = wait_event_timeout(wait.wait,
+				!sdp_read_cancel(req, conn, &wait, &copied),
+				SDP_ZCOPY_CANCEL_TIMEOUT);
+		if (!timeout) {
+			sdp_warn("sdp_read_cancel timed out. Abort.\n");
+			sdp_conn_lock(conn);
+			sdp_conn_abort(conn);
+			sdp_conn_unlock(conn);
+		}
+	}
+#endif
+
 	return ((copied > 0) ? copied : result);
 }
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_iocb.c
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_iocb.c	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_iocb.c	(working copy)
@@ -317,12 +317,23 @@
 	sdp_dbg_data(NULL, "IOCB complete. <%d:%d:%08lx> value <%ld>",
 		     iocb->req->ki_users, iocb->req->ki_key,
 		     iocb->req->ki_flags, value);
+
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+	if (iocb->wait) {
+		unsigned long flags;
+		spin_lock_irqsave(&iocb->wait->lock, flags);
+		if (!--iocb->wait->outstanding) {
+			wake_up(&iocb->wait->wait);
+		}
+		spin_unlock_irqrestore(&iocb->wait->lock, flags);
+	} else
+#endif
+		/*
+		* valid result can be 0 or 1 for complete so
+		* we ignore the value.
+		*/
+		(void)aio_complete(iocb->req, value, 0);
 	/*
-	 * valid result can be 0 or 1 for complete so
-	 * we ignore the value.
-	 */
-	(void)aio_complete(iocb->req, value, 0);
-	/*
 	 * delete IOCB
 	 */
 	sdp_iocb_destroy(iocb);
@@ -335,7 +346,19 @@
 {
 	iocb->status = status;
 
-	if (in_atomic() || irqs_disabled()) {
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	if ((iocb->flags & SDP_IOCB_F_RECV) && iocb->wait) {
+		unsigned long flags;
+		spin_lock_irqsave(&iocb->wait->lock, flags);
+		sdp_iocb_q_put_tail(&iocb->wait->q, iocb);
+		/* Possible optimization: only wake
+		   if no more outstanding iocbs or low watermark reached */
+		wake_up(&iocb->wait->wait);
+		spin_unlock_irqrestore(&iocb->wait->lock, flags);
+	} else
+#endif
+	if ((iocb->flags & SDP_IOCB_F_RECV) &&
+	    (in_atomic() || irqs_disabled())) {
 		INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
 		schedule_work(&iocb->completion);
 	} else
@@ -392,6 +415,75 @@
 	return NULL;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+struct sdpc_iocb *sdp_iocb_q_lookup_req(struct sdpc_iocb_q *table, struct kiocb *req)
+{
+	struct sdpc_iocb *iocb = table->head;
+	int left;
+
+	/* first IOCB in @table issued for @req, or NULL */
+	for (left = table->size; left > 0; left--, iocb = iocb->next)
+		if (iocb->req == req)
+			return iocb;
+
+	return NULL;
+}
+
+void sdp_iocb_q_mark_cancel(struct sdpc_iocb_q *table, struct kiocb *req)
+{
+	struct sdpc_iocb *iocb = table->head;
+	int left;
+
+	/* flag every IOCB of @req in @table; nothing is dequeued */
+	for (left = table->size; left > 0; left--, iocb = iocb->next) {
+		if (iocb->req == req)
+			iocb->flags |= SDP_IOCB_F_CANCEL;
+	}
+}
+
+int sdp_iocb_find_req(struct sdpc_desc *element, void *arg)
+{
+	/* lookup callback: 0 selects an IOCB descriptor issued for @arg */
+	if (element->type != SDP_DESC_TYPE_IOCB)
+		return -ERANGE;
+	if (((struct sdpc_iocb *)element)->req != arg)
+		return -ERANGE;
+	return 0;
+}
+#endif
+
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+struct sdpc_iocb *sdp_iocb_q_lookup_complete(struct sdpc_iocb_q *table, struct kiocb *req)
+{
+	struct sdpc_iocb *iocb = table->head;
+	int left;
+
+	/* first IOCB of @req in @table that has met its low watermark */
+	for (left = table->size; left > 0; left--, iocb = iocb->next)
+		if (iocb->req == req && iocb->post >= iocb->lowat)
+			return iocb;
+
+	return NULL;
+}
+struct sdpc_iocb *sdp_iocb_q_wakeup_complete(struct sdpc_iocb_q *table)
+{
+	struct sdpc_iocb *iocb = table->head;
+	unsigned long flags;
+	int left;
+
+	/* wake sync waiters whose IOCB has reached its low watermark */
+	for (left = table->size; left > 0; left--, iocb = iocb->next) {
+		if (!iocb->wait || iocb->post < iocb->lowat)
+			continue;
+		spin_lock_irqsave(&iocb->wait->lock, flags);
+		iocb->wait->lowat_reached = 1;
+		spin_unlock_irqrestore(&iocb->wait->lock, flags);
+		wake_up(&iocb->wait->wait);
+	}
+	return NULL; /* always NULL */
+}
+#endif
+
 /*
  * sdp_iocb_create - create an IOCB object
  */
Index: linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_iocb.h
===================================================================
--- linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_iocb.h	(revision 4198)
+++ linux-2.6.14/drivers/infiniband/ulp/sdp/sdp_iocb.h	(working copy)
@@ -55,6 +55,9 @@
 #define SDP_IOCB_F_LOCKED 0x00000040 /* IOCB is locked in memory */
 #define SDP_IOCB_F_REG    0x00000080 /* IOCB memory is registered */
 #define SDP_IOCB_F_RECV   0x00000100 /* IOCB is for a receive request */
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+#define SDP_IOCB_F_RO     0x00000200 /* Suppress SinkAvail for this IOCB */
+#endif
 #define SDP_IOCB_F_ALL    0xFFFFFFFF /* IOCB all mask */
 /*
  * zcopy constants.
@@ -66,10 +69,12 @@
  */
 #define sdp_iocb_q_size(table) ((table)->size)
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+struct sdpc_iocb_wait;
+#endif
 /*
  * INET read/write IOCBs
  */
-
 /*
  * save a kvec read/write for processing once data shows up.
  */
@@ -80,7 +85,7 @@
 	struct sdpc_iocb_q *table; /* table to which this iocb belongs */
 	void (*release)(struct sdpc_iocb *iocb); /* release the object */
 	/*
-	 * iocb sepcific
+	 * iocb specific
 	 */
 	int      flags;  /* usage flags */
 	/*
@@ -89,6 +94,7 @@
 	u32 key;    /* matches kiocb key for lookups */
 	int len;    /* space left in the user buffer */
 	int post;   /* amount of data requested so far. */
+	int lowat;  /* when to complete this IOCB (receive only). */
 	u64 wrid;   /* work request completing this IOCB */
 	ssize_t status; /* status of completed iocb */
 	/*
@@ -112,6 +118,9 @@
 	int           page_offset; /* offset into first page. */
 
 	struct work_struct completion; /* task for defered completion. */
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+	struct sdpc_iocb_wait *wait;
+#endif
 	/*
 	 * kernel iocb structure
 	 */
@@ -127,4 +136,17 @@
 	int size;               /* current number of IOCBs in table */
 };
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+/* Completion tracking for one synchronous zcopy send/recv call */
+struct sdpc_iocb_wait {
+	spinlock_t lock;	/* guards outstanding, q and lowat_reached */
+	int outstanding;	/* IOCBs queued but not yet completed/reaped */
+	wait_queue_head_t wait;	/* the calling thread sleeps here */
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+	struct sdpc_iocb_q q; /* completed receive iocbs, not yet reaped */
+	int lowat_reached;	/* a receive IOCB met its low watermark */
+#endif
+};
+
+#endif
 #endif /* _SDP_IOCB_H */



-- 
MST



More information about the general mailing list