[ofa-general] RDS flow control

Olaf Kirch olaf.kirch at oracle.com
Mon May 19 01:05:59 PDT 2008


> However, I'm still seeing performance degradation of ~5% with some packet
> sizes. And that is *just* the overhead from exchanging the credit information
> and checking it - at some point we need to take a spinlock, and that seems
> to delay things just enough to make a dent in my throughput graph.

Here's an updated version of the flow control patch - which is now completely
lockless, and uses a single atomic_t to hold both credit counters. This has
given me back close to full performance in my testing (throughput seems to be
down less than 1%, which is almost within the noise range).
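
To give you an idea of the trick (a minimal sketch with made-up names,
not the patch code itself - the real logic is in
rds_ib_send_grab_credits below): both 16-bit counters live in one
atomic_t, send credits in the low half and posted-buffer credits in the
high half, so a sender can snapshot and update both with a single
cmpxchg instead of taking a lock:

	#include <asm/atomic.h>

	/* Sketch only: consume one send credit, locklessly. */
	static int take_one_send_credit(atomic_t *credits)
	{
		int oldval, newval;

		do {
			oldval = atomic_read(credits);
			if ((oldval & 0xffff) == 0)
				return 0;	/* no send credits left - stall */
			newval = oldval - 1;	/* send credits live in the low 16 bits */
		} while (atomic_cmpxchg(credits, oldval, newval) != oldval);

		return 1;
	}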

I'll push it to my git tree a little later today, so folks can test it if
they like.

Olaf
-- 
Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
okir at lst.de |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax
----
From: Olaf Kirch <olaf.kirch at oracle.com>
Subject: RDS: Implement IB flow control

Here it is - flow control for RDS/IB.

This patch is still very much experimental. Here are the essentials:

 -	The approach chosen here uses a credit-based flow control
	mechanism. Every SEND WR (including ACKs) consumes one credit,
	and if the sender runs out of credits, it stalls.

 -	As new receive buffers are posted, credits are transferred to the
	remote node (using yet another RDS header byte for this).

 -	Flow control is negotiated during connection setup. Initial credits
 	are exchanged in the rds_ib_connect_private struct - sending a value
	of zero (which is also the default for older protocol versions)
	means no flow control.

 -	We avoid deadlock (both nodes depleting their credits, and being
 	unable to inform the peer of newly posted buffers) by requiring
	that the last credit can only be used if we're posting new credits
	to the peer (see the sketch below).
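
In code, that last rule is a short check in the credit-grabbing path
(a sketch mirroring rds_ib_send_grab_credits in the patch below):

	avail  = IB_GET_SEND_CREDITS(oldval);
	posted = IB_GET_POST_CREDITS(oldval);

	/* The last send credit is reserved for a credit update; it
	 * may only be spent if we have newly posted receive buffers
	 * (posted != 0) to advertise to the peer. */
	if (avail && !posted)
		avail--;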

The approach implemented here is lock-free; preliminary tests show
the impact on throughput to be less than 1%, and the impact on RTT,
CPU, TX delay and other metrics to be below the noise threshold.

Flow control is configurable via sysctl (the flow_control entry this
patch adds to rds_ib_sysctl_table). It only affects newly created
connections, however, so your best bet is to set it right after
loading the RDS module.

Signed-off-by: Olaf Kirch <olaf.kirch at oracle.com>
---
 net/rds/ib.c        |    1 
 net/rds/ib.h        |   30 ++++++++
 net/rds/ib_cm.c     |   49 ++++++++++++-
 net/rds/ib_recv.c   |   48 +++++++++---
 net/rds/ib_send.c   |  194 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/rds/ib_stats.c  |    3 
 net/rds/ib_sysctl.c |   10 ++
 net/rds/rds.h       |    4 -
 8 files changed, 325 insertions(+), 14 deletions(-)

Index: ofa_kernel-1.3/net/rds/ib.h
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib.h
+++ ofa_kernel-1.3/net/rds/ib.h
@@ -46,6 +46,7 @@ struct rds_ib_connect_private {
 	__be16			dp_protocol_minor_mask; /* bitmask */
 	__be32			dp_reserved1;
 	__be64			dp_ack_seq;
+	__be32			dp_credit;		/* non-zero enables flow ctl */
 };
 
 struct rds_ib_send_work {
@@ -110,15 +111,32 @@ struct rds_ib_connection {
 	struct ib_sge		i_ack_sge;
 	u64			i_ack_dma;
 	unsigned long		i_ack_queued;
+
+	/* Flow control related information
+	 *
+	 * Our algorithm uses a pair of variables that we need to access
+	 * atomically - one for the send credits, and one for the posted
+	 * recv credits we need to transfer to the remote.
+	 * Rather than protect them using a slow spinlock, we put both into
+	 * a single atomic_t and update it using cmpxchg.
+	 */
+	atomic_t		i_credits;
  
 	/* Protocol version specific information */
 	unsigned int		i_hdr_idx;	/* 1 (old) or 0 (3.1 or later) */
+	unsigned int		i_flowctl : 1;	/* enable/disable flow ctl */
 
 	/* Batched completions */
 	unsigned int		i_unsignaled_wrs;
 	long			i_unsignaled_bytes;
 };
 
+/* This assumes that atomic_t is at least 32 bits */
+#define IB_GET_SEND_CREDITS(v)	((v) & 0xffff)
+#define IB_GET_POST_CREDITS(v)	((v) >> 16)
+#define IB_SET_SEND_CREDITS(v)	((v) & 0xffff)
+#define IB_SET_POST_CREDITS(v)	((v) << 16)
+
 struct rds_ib_ipaddr {
 	struct list_head	list;
 	__be32			ipaddr;
@@ -153,14 +171,17 @@ struct rds_ib_statistics {
 	unsigned long	s_ib_tx_cq_call;
 	unsigned long	s_ib_tx_cq_event;
 	unsigned long	s_ib_tx_ring_full;
+	unsigned long	s_ib_tx_throttle;
 	unsigned long	s_ib_tx_sg_mapping_failure;
 	unsigned long	s_ib_tx_stalled;
+	unsigned long	s_ib_tx_credit_updates;
 	unsigned long	s_ib_rx_cq_call;
 	unsigned long	s_ib_rx_cq_event;
 	unsigned long	s_ib_rx_ring_empty;
 	unsigned long	s_ib_rx_refill_from_cq;
 	unsigned long	s_ib_rx_refill_from_thread;
 	unsigned long	s_ib_rx_alloc_limit;
+	unsigned long	s_ib_rx_credit_updates;
 	unsigned long	s_ib_ack_sent;
 	unsigned long	s_ib_ack_send_failure;
 	unsigned long	s_ib_ack_send_delayed;
@@ -244,6 +265,8 @@ void rds_ib_flush_mrs(void);
 int __init rds_ib_recv_init(void);
 void rds_ib_recv_exit(void);
 int rds_ib_recv(struct rds_connection *conn);
+int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
+		       gfp_t page_gfp, int prefill);
 void rds_ib_inc_purge(struct rds_incoming *inc);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
@@ -252,6 +275,7 @@ void rds_ib_recv_cq_comp_handler(struct 
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
+void rds_ib_attempt_ack(struct rds_ib_connection *ic);
 void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
 
@@ -266,12 +290,17 @@ u32 rds_ib_ring_completed(struct rds_ib_
 extern wait_queue_head_t rds_ib_ring_empty_wait;
 
 /* ib_send.c */
+void rds_ib_xmit_complete(struct rds_connection *conn);
 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	        unsigned int hdr_off, unsigned int sg, unsigned int off);
 void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
+void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
+void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
+int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
+			     u32 *adv_credits);
 
 /* ib_stats.c */
 RDS_DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
@@ -287,6 +316,7 @@ extern unsigned long rds_ib_sysctl_max_r
 extern unsigned long rds_ib_sysctl_max_unsig_wrs;
 extern unsigned long rds_ib_sysctl_max_unsig_bytes;
 extern unsigned long rds_ib_sysctl_max_recv_allocation;
+extern unsigned int rds_ib_sysctl_flow_control;
 extern ctl_table rds_ib_sysctl_table[];
 
 /*
Index: ofa_kernel-1.3/net/rds/ib_cm.c
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib_cm.c
+++ ofa_kernel-1.3/net/rds/ib_cm.c
@@ -55,6 +55,22 @@ static void rds_ib_set_protocol(struct r
 }
 
 /*
+ * Set up flow control
+ */
+static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	if (rds_ib_sysctl_flow_control && credits != 0) {
+		/* We're doing flow control */
+		ic->i_flowctl = 1;
+		rds_ib_send_add_credits(conn, credits);
+	} else {
+		ic->i_flowctl = 0;
+	}
+}
+
+/*
  * Connection established.
  * We get here for both outgoing and incoming connection.
  */
@@ -72,12 +88,16 @@ static void rds_ib_connect_complete(stru
 		rds_ib_set_protocol(conn,
 				RDS_PROTOCOL(dp->dp_protocol_major,
 					dp->dp_protocol_minor));
+		rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
 	}
 
-	rdsdebug("RDS/IB: ib conn complete on %u.%u.%u.%u version %u.%u\n",
+	printk(KERN_NOTICE "RDS/IB: connected to %u.%u.%u.%u version %u.%u%s\n",
 			NIPQUAD(conn->c_laddr),
 			RDS_PROTOCOL_MAJOR(conn->c_version),
-			RDS_PROTOCOL_MINOR(conn->c_version));
+			RDS_PROTOCOL_MINOR(conn->c_version),
+			ic->i_flowctl ? ", flow control" : "");
+
+	rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
 
 	/* Tune the RNR timeout. We use a rather low timeout, but
 	 * not the absolute minimum - this should be tunable.
@@ -129,6 +149,24 @@ static void rds_ib_cm_fill_conn_param(st
 		dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
 		dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
 
+		/* Advertise flow control.
+		 *
+		 * Major chicken and egg alert!
+		 * We would like to post receive buffers before we get here (e.g.
+		 * in rds_ib_setup_qp), so that we can give the peer an accurate
+		 * credit value.
+		 * Unfortunately we can't post receive buffers until we've finished
+		 * protocol negotiation, and know in which order header and payload
+		 * are arranged.
+		 *
+		 * What we do instead is give the peer a small initial credit, and
+		 * initialize the number of posted buffers to a negative value.
+		 */
+		if (ic->i_flowctl) {
+			atomic_set(&ic->i_credits, IB_SET_POST_CREDITS(-4));
+			dp->dp_credit = cpu_to_be32(4);
+		}
+
 		conn_param->private_data = dp;
 		conn_param->private_data_len = sizeof(*dp);
 	}
@@ -363,6 +401,7 @@ static int rds_ib_cm_handle_connect(stru
 	ic = conn->c_transport_data;
 
 	rds_ib_set_protocol(conn, version);
+	rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
 
 	/* If the peer gave us the last packet it saw, process this as if
 	 * we had received a regular ACK. */
@@ -428,6 +467,7 @@ out:
 static int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
 {
 	struct rds_connection *conn = cm_id->context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rdma_conn_param conn_param;
 	struct rds_ib_connect_private dp;
 	int ret;
@@ -435,6 +475,7 @@ static int rds_ib_cm_initiate_connect(st
 	/* If the peer doesn't do protocol negotiation, we must
 	 * default to RDSv3.0 */
 	rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
+	ic->i_flowctl = rds_ib_sysctl_flow_control;	/* advertise flow control */
 
 	ret = rds_ib_setup_qp(conn);
 	if (ret) {
@@ -688,6 +729,10 @@ void rds_ib_conn_shutdown(struct rds_con
 #endif
 	ic->i_ack_recv = 0;
 
+	/* Clear flow control state */
+	ic->i_flowctl = 0;
+	atomic_set(&ic->i_credits, 0);
+
 	if (ic->i_ibinc) {
 		rds_inc_put(&ic->i_ibinc->ii_inc);
 		ic->i_ibinc = NULL;
Index: ofa_kernel-1.3/net/rds/ib_recv.c
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib_recv.c
+++ ofa_kernel-1.3/net/rds/ib_recv.c
@@ -220,16 +220,17 @@ out:
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
 int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
-		       gfp_t page_gfp)
+		       gfp_t page_gfp, int prefill)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_recv_work *recv;
 	struct ib_recv_wr *failed_wr;
+	unsigned int posted = 0;
 	int ret = 0;
 	u32 pos;
 
-	while (rds_conn_up(conn) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
-
+	while ((prefill || rds_conn_up(conn))
+			&& rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
 		if (pos >= ic->i_recv_ring.w_nr) {
 			printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
 					pos);
@@ -257,8 +258,14 @@ int rds_ib_recv_refill(struct rds_connec
 			ret = -1;
 			break;
 		}
+
+		posted++;
 	}
 
+	/* We're doing flow control - update the window. */
+	if (ic->i_flowctl && posted)
+		rds_ib_advertise_credits(conn, posted);
+
 	if (ret)
 		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
 	return ret;
@@ -436,7 +443,7 @@ static u64 rds_ib_get_ack(struct rds_ib_
 #endif
 
 
-static void rds_ib_send_ack(struct rds_ib_connection *ic)
+static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
 {
 	struct rds_header *hdr = ic->i_ack;
 	struct ib_send_wr *failed_wr;
@@ -448,6 +455,7 @@ static void rds_ib_send_ack(struct rds_i
 	rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
 	rds_message_populate_header(hdr, 0, 0, 0);
 	hdr->h_ack = cpu_to_be64(seq);
+	hdr->h_credit = adv_credits;
 	rds_message_make_checksum(hdr);
 	ic->i_ack_queued = jiffies;
 
@@ -460,6 +468,8 @@ static void rds_ib_send_ack(struct rds_i
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 
  		rds_ib_stats_inc(s_ib_ack_send_failure);
+		/* Need to finesse this later. */
+		BUG();
 	} else
 		rds_ib_stats_inc(s_ib_ack_sent);
 }
@@ -502,15 +512,27 @@ static void rds_ib_send_ack(struct rds_i
  * When we get here, we're called from the recv queue handler.
  * Check whether we ought to transmit an ACK.
  */
-static void rds_ib_attempt_ack(struct rds_ib_connection *ic)
+void rds_ib_attempt_ack(struct rds_ib_connection *ic)
 {
+	unsigned int adv_credits;
+
 	if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
 		return;
-	if (!test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
-		clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
-		rds_ib_send_ack(ic);
-	} else
+
+	if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
 		rds_ib_stats_inc(s_ib_ack_send_delayed);
+		return;
+	}
+
+	/* Can we get a send credit? */
+	if (!rds_ib_send_grab_credits(ic, 1, &adv_credits)) {
+		rds_ib_stats_inc(s_ib_tx_throttle);
+		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
+		return;
+	}
+
+	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+	rds_ib_send_ack(ic, adv_credits);
 }
 
 /*
@@ -706,6 +728,10 @@ void rds_ib_process_recv(struct rds_conn
 	state->ack_recv = be64_to_cpu(ihdr->h_ack);
 	state->ack_recv_valid = 1;
 
+	/* Process the credit update if there was one */
+	if (ihdr->h_credit)
+		rds_ib_send_add_credits(conn, ihdr->h_credit);
+
 	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
 		/* This is an ACK-only packet. The fact that it gets
 		 * special treatment here is that historically, ACKs
@@ -877,7 +903,7 @@ void rds_ib_recv_cq_comp_handler(struct 
 
 	if (mutex_trylock(&ic->i_recv_mutex)) {
 		if (rds_ib_recv_refill(conn, GFP_ATOMIC,
-					 GFP_ATOMIC | __GFP_HIGHMEM))
+					 GFP_ATOMIC | __GFP_HIGHMEM, 0))
 			ret = -EAGAIN;
 		else
 			rds_ib_stats_inc(s_ib_rx_refill_from_cq);
@@ -901,7 +927,7 @@ int rds_ib_recv(struct rds_connection *c
 	 * we're really low and we want the caller to back off for a bit.
 	 */
 	mutex_lock(&ic->i_recv_mutex);
-	if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER))
+	if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
 		ret = -ENOMEM;
 	else
 		rds_ib_stats_inc(s_ib_rx_refill_from_thread);
Index: ofa_kernel-1.3/net/rds/ib.c
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib.c
+++ ofa_kernel-1.3/net/rds/ib.c
@@ -187,6 +187,7 @@ static void rds_ib_exit(void)
 
 struct rds_transport rds_ib_transport = {
 	.laddr_check		= rds_ib_laddr_check,
+	.xmit_complete		= rds_ib_xmit_complete,
 	.xmit			= rds_ib_xmit,
 	.xmit_cong_map		= NULL,
 	.xmit_rdma		= rds_ib_xmit_rdma,
Index: ofa_kernel-1.3/net/rds/ib_send.c
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib_send.c
+++ ofa_kernel-1.3/net/rds/ib_send.c
@@ -245,6 +245,144 @@ void rds_ib_send_cq_comp_handler(struct 
 	}
 }
 
+/*
+ * This is the main function for allocating credits when sending
+ * messages.
+ *
+ * Conceptually, we have two counters:
+ *  -	send credits: this tells us how many WRs we're allowed
+ *	to submit without overrunning the receiver's queue. For
+ *	each SEND WR we post, we decrement this by one.
+ *
+ *  -	posted credits: this tells us how many WRs we recently
+ *	posted to the receive queue. This value is transferred
+ *	to the peer as a "credit update" in an RDS header field.
+ *	Every time we transmit credits to the peer, we subtract
+ *	the number of transferred credits from this counter.
+ *
+ * It is essential that we avoid situations where both sides have
+ * exhausted their send credits, and are unable to send new credits
+ * to the peer. We achieve this by requiring that we send at least
+ * one credit update to the peer before exhausting our credits.
+ * When new credits arrive, we subtract one credit that is withheld
+ * until we've posted new buffers and are ready to transmit these
+ * credits (see rds_ib_send_add_credits below).
+ *
+ * The RDS send code is essentially single-threaded; rds_send_xmit
+ * grabs c_send_sem to ensure exclusive access to the send ring.
+ * However, the ACK sending code is independent and can race with
+ * message SENDs.
+ *
+ * In the send path, we need to update the counters for send credits
+ * and the counter of posted buffers atomically - when we use the
+ * last available credit, we cannot allow another thread to race us
+ * and grab the posted credits counter.  Hence, we have to use a
+ * spinlock to protect the credit counter, or use atomics.
+ *
+ * Spinlocks shared between the send and the receive path are bad,
+ * because they create unnecessary delays. An early implementation
+ * using a spinlock showed a 5% degradation in throughput at some
+ * loads.
+ *
+ * This implementation avoids spinlocks completely, putting both
+ * counters into a single atomic, and updating that atomic using
+ * atomic_add (in the receive path, when receiving fresh credits),
+ * and using atomic_cmpxchg when updating the two counters.
+ */
+int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
+			     u32 wanted, u32 *adv_credits)
+{
+	unsigned int avail, posted, got = 0, advertise;
+	long oldval, newval;
+
+	*adv_credits = 0;
+	if (!ic->i_flowctl)
+		return wanted;
+
+try_again:
+	advertise = 0;
+	oldval = newval = atomic_read(&ic->i_credits);
+	posted = IB_GET_POST_CREDITS(oldval);
+	avail = IB_GET_SEND_CREDITS(oldval);
+
+	rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
+			wanted, avail, posted);
+
+	/* The last credit must be used to send a credit update. */
+	if (avail && !posted)
+		avail--;
+
+	if (avail < wanted) {
+		struct rds_connection *conn = ic->i_cm_id->context;
+
+		/* Oops, there aren't that many credits left! */
+		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
+		got = avail;
+	} else {
+		/* Sometimes you get what you want, lalala. */
+		got = wanted;
+	}
+	newval -= IB_SET_SEND_CREDITS(got);
+
+	if (got && posted) {
+		advertise = min_t(unsigned int, posted, RDS_MAX_ADV_CREDIT);
+		newval -= IB_SET_POST_CREDITS(advertise);
+	}
+
+	/* Finally bill everything */
+	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
+		goto try_again;
+
+	*adv_credits = advertise;
+	return got;
+}
+
+void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	if (credits == 0)
+		return;
+
+	rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
+			credits,
+			IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
+			test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
+
+	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
+	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
+
+	rds_ib_stats_inc(s_ib_rx_credit_updates);
+}
+
+void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	if (posted == 0)
+		return;
+
+	atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
+
+	/* Decide whether to send an update to the peer now.
+	 * If we would send a credit update for every single buffer we
+	 * post, we would end up with an ACK storm (ACK arrives,
+	 * consumes buffer, we refill the ring, send ACK to remote
+	 * advertising the newly posted buffer... ad inf)
+	 *
+	 * Performance pretty much depends on how often we send
+	 * credit updates - too frequent updates mean lots of ACKs.
+	 * Too infrequent updates, and the peer will run out of
+	 * credits and have to throttle.
+	 * For the time being, 16 seems to be a good compromise.
+	 */
+	if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
+		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+}
+
 static inline void
 rds_ib_xmit_populate_wr(struct rds_ib_connection *ic,
 		struct rds_ib_send_work *send, unsigned int pos,
@@ -307,6 +445,8 @@ int rds_ib_xmit(struct rds_connection *c
 	u32 pos;
 	u32 i;
 	u32 work_alloc;
+	u32 credit_alloc;
+	u32 adv_credits = 0;
 	int send_flags = 0;
 	int sent;
 	int ret;
@@ -314,6 +454,7 @@ int rds_ib_xmit(struct rds_connection *c
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 
+	/* FIXME we may overallocate here */
 	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
 		i = 1;
 	else
@@ -327,8 +468,29 @@ int rds_ib_xmit(struct rds_connection *c
 		goto out;
 	}
 
+	credit_alloc = work_alloc;
+	if (ic->i_flowctl) {
+		credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &adv_credits);
+		if (credit_alloc < work_alloc) {
+			rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
+			work_alloc = credit_alloc;
+		}
+		if (work_alloc == 0) {
+			rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+			rds_ib_stats_inc(s_ib_tx_throttle);
+			ret = -ENOMEM;
+			goto out;
+		}
+	}
+
 	/* map the message the first time we see it */
 	if (ic->i_rm == NULL) {
+		/*
+		printk(KERN_NOTICE "rds_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
+				be16_to_cpu(rm->m_inc.i_hdr.h_dport),
+				rm->m_inc.i_hdr.h_flags,
+				be32_to_cpu(rm->m_inc.i_hdr.h_len));
+		   */
 		if (rm->m_nents) {
 			rm->m_count = ib_dma_map_sg(dev,
 					 rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
@@ -449,6 +611,24 @@ add_header:
 		 * have been set up to point to the right header buffer. */
 		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
 
+		if (0) {
+			struct rds_header *hdr = &ic->i_send_hdrs[pos];
+
+			printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
+				be16_to_cpu(hdr->h_dport),
+				hdr->h_flags,
+				be32_to_cpu(hdr->h_len));
+		}
+		if (adv_credits) {
+			struct rds_header *hdr = &ic->i_send_hdrs[pos];
+
+			/* add credit and redo the header checksum */
+			hdr->h_credit = adv_credits;
+			rds_message_make_checksum(hdr);
+			adv_credits = 0;
+			rds_ib_stats_inc(s_ib_tx_credit_updates);
+		}
+
 		if (prev)
 			prev->s_wr.next = &send->s_wr;
 		prev = send;
@@ -472,6 +652,8 @@ add_header:
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 		work_alloc = i;
 	}
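+	/* If we grabbed more credits than we ended up using, return them. */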
+	if (ic->i_flowctl && i < credit_alloc)
+		rds_ib_send_add_credits(conn, credit_alloc - i);
 
 	/* XXX need to worry about failed_wr and partial sends. */
 	failed_wr = &first->s_wr;
@@ -487,11 +669,14 @@ add_header:
 			ic->i_rm = prev->s_rm;
 			prev->s_rm = NULL;
 		}
+		/* Finesse this later */
+		BUG();
 		goto out;
 	}
 
 	ret = sent;
 out:
+	BUG_ON(adv_credits);
 	return ret;
 }
 
@@ -630,3 +815,12 @@ int rds_ib_xmit_rdma(struct rds_connecti
 out:
 	return ret;
 }
+
+void rds_ib_xmit_complete(struct rds_connection *conn)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	/* We may have a pending ACK or window update we were unable
+	 * to send previously (due to flow control). Try again. */
+	rds_ib_attempt_ack(ic);
+}
Index: ofa_kernel-1.3/net/rds/ib_stats.c
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib_stats.c
+++ ofa_kernel-1.3/net/rds/ib_stats.c
@@ -46,14 +46,17 @@ static char *rds_ib_stat_names[] = {
 	"ib_tx_cq_call",
 	"ib_tx_cq_event",
 	"ib_tx_ring_full",
+	"ib_tx_throttle",
 	"ib_tx_sg_mapping_failure",
 	"ib_tx_stalled",
+	"ib_tx_credit_updates",
 	"ib_rx_cq_call",
 	"ib_rx_cq_event",
 	"ib_rx_ring_empty",
 	"ib_rx_refill_from_cq",
 	"ib_rx_refill_from_thread",
 	"ib_rx_alloc_limit",
+	"ib_rx_credit_updates",
 	"ib_ack_sent",
 	"ib_ack_send_failure",
 	"ib_ack_send_delayed",
Index: ofa_kernel-1.3/net/rds/rds.h
===================================================================
--- ofa_kernel-1.3.orig/net/rds/rds.h
+++ ofa_kernel-1.3/net/rds/rds.h
@@ -170,6 +170,7 @@ struct rds_connection {
 #define RDS_FLAG_CONG_BITMAP	0x01
 #define RDS_FLAG_ACK_REQUIRED	0x02
 #define RDS_FLAG_RETRANSMITTED	0x04
+#define RDS_MAX_ADV_CREDIT	255
 
 /*
  * Maximum space available for extension headers.
@@ -183,7 +184,8 @@ struct rds_header {
 	__be16	h_sport;
 	__be16	h_dport;
 	u8	h_flags;
-	u8	h_padding[5];
+	u8	h_credit;
+	u8	h_padding[4];
 	__sum16	h_csum;
 
 	u8	h_exthdr[RDS_HEADER_EXT_SPACE];
Index: ofa_kernel-1.3/net/rds/ib_sysctl.c
===================================================================
--- ofa_kernel-1.3.orig/net/rds/ib_sysctl.c
+++ ofa_kernel-1.3/net/rds/ib_sysctl.c
@@ -53,6 +53,8 @@ unsigned long rds_ib_sysctl_max_unsig_by
 static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1;
 static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL;
 
+unsigned int rds_ib_sysctl_flow_control = 1;
+
 ctl_table rds_ib_sysctl_table[] = {
 	{
 		.ctl_name       = 1,
@@ -102,6 +104,14 @@ ctl_table rds_ib_sysctl_table[] = {
 		.mode           = 0644,
 		.proc_handler   = &proc_doulongvec_minmax,
 	},
+	{
+		.ctl_name	= 6,
+		.procname	= "flow_control",
+		.data		= &rds_ib_sysctl_flow_control,
+		.maxlen		= sizeof(rds_ib_sysctl_flow_control),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
 	{ .ctl_name = 0}
 };
 


