[ewg] [PATCH librdmacm] rping - persistent server support

Steve Wise swise at opengridcomputing.com
Fri Sep 7 11:12:29 PDT 2007


Sean,

This might be a good patch for ofed-1.3.  It adds a -P option to rping
to allow a server to be persistent and accept many incoming rping client
connections...

Steve.

----------------------------

Persistent rping server.

From: Steve Wise <swise at opengridcomputing.com>

Support an rping server mode where the server handles many incoming
connections by creating a thread to process each new rping session.

Signed-off-by: Steve Wise <swise at opengridcomputing.com>
---

 examples/rping.c |  130 +++++++++++++++++++++++++++++++++++++++++++++++++-----
 man/rping.1      |    6 ++
 2 files changed, 122 insertions(+), 14 deletions(-)

diff --git a/examples/rping.c b/examples/rping.c
index 5098ebc..983ce1c 100644
--- a/examples/rping.c
+++ b/examples/rping.c
@@ -112,6 +112,7 @@ #define RPING_MIN_BUFSIZE       sizeof(s
 struct rping_cb {
 	int server;			/* 0 iff client */
 	pthread_t cqthread;
+	pthread_t persistent_server_thread;
 	struct ibv_comp_channel *channel;
 	struct ibv_cq *cq;
 	struct ibv_pd *pd;
@@ -591,24 +592,26 @@ static void *cq_thread(void *arg)
 	DEBUG_LOG("cq_thread started.\n");
 
 	while (1) {	
+		pthread_testcancel();
+
 		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
 		if (ret) {
 			fprintf(stderr, "Failed to get cq event!\n");
-			exit(ret);
+			pthread_exit(NULL);
 		}
 		if (ev_cq != cb->cq) {
 			fprintf(stderr, "Unkown CQ!\n");
-			exit(-1);
+			pthread_exit(NULL);
 		}
 		ret = ibv_req_notify_cq(cb->cq, 0);
 		if (ret) {
 			fprintf(stderr, "Failed to set notify!\n");
-			exit(ret);
+			pthread_exit(NULL);
 		}
 		ret = rping_cq_event_handler(cb);
 		ibv_ack_cq_events(cb->cq, 1);
 		if (ret)
-			exit(ret);
+			pthread_exit(NULL);
 	}
 }
 
@@ -748,13 +751,99 @@ static int rping_bind_server(struct rpin
 		return ret;
 	}
 
-	sem_wait(&cb->sem);
-	if (cb->state != CONNECT_REQUEST) {
-		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
-			cb->state);
-		return -1;
+	return 0;
+}
+
+static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
+{
+	struct rping_cb *cb = malloc(sizeof *cb);
+	if (!cb)
+		return NULL;
+	*cb = *listening_cb;
+	cb->child_cm_id->context = cb;
+	return cb;
+}
+
+static void free_cb(struct rping_cb *cb)
+{
+	free(cb);
+}
+
+static void *rping_persistent_server_thread(void *arg)
+{
+	struct rping_cb *cb = arg;
+	struct ibv_recv_wr *bad_wr;
+	int ret;
+
+	ret = rping_setup_qp(cb, cb->child_cm_id);
+	if (ret) {
+		fprintf(stderr, "setup_qp failed: %d\n", ret);
+		goto err0;
+	}
+
+	ret = rping_setup_buffers(cb);
+	if (ret) {
+		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
+		goto err1;
+	}
+
+	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
+	if (ret) {
+		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
+		goto err2;
 	}
 
+	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
+
+	ret = rping_accept(cb);
+	if (ret) {
+		fprintf(stderr, "connect error %d\n", ret);
+		goto err3;
+	}
+
+	rping_test_server(cb);
+	rdma_disconnect(cb->child_cm_id);
+	rping_free_buffers(cb);
+	rping_free_qp(cb);
+	pthread_cancel(cb->cqthread);
+	pthread_join(cb->cqthread, NULL);
+	rdma_destroy_id(cb->child_cm_id);
+	free_cb(cb);
+	return NULL;
+err3:
+	pthread_cancel(cb->cqthread);
+	pthread_join(cb->cqthread, NULL);
+err2:
+	rping_free_buffers(cb);
+err1:
+	rping_free_qp(cb);
+err0:
+	free_cb(cb);
+	return NULL;
+}
+
+static int rping_run_persistent_server(struct rping_cb *listening_cb)
+{
+	int ret;
+	struct rping_cb *cb;
+
+	ret = rping_bind_server(listening_cb);
+	if (ret)
+		return ret;
+
+	while (1) {
+		sem_wait(&listening_cb->sem);
+		if (listening_cb->state != CONNECT_REQUEST) {
+			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
+				listening_cb->state);
+			return -1;
+		}
+
+		cb = clone_cb(listening_cb);
+		if (!cb)
+			return -1;
+		pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
+	}
 	return 0;
 }
 
@@ -767,6 +856,13 @@ static int rping_run_server(struct rping
 	if (ret)
 		return ret;
 
+	sem_wait(&cb->sem);
+	if (cb->state != CONNECT_REQUEST) {
+		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
+			cb->state);
+		return -1;
+	}
+
 	ret = rping_setup_qp(cb, cb->child_cm_id);
 	if (ret) {
 		fprintf(stderr, "setup_qp failed: %d\n", ret);
@@ -987,6 +1083,7 @@ static void usage(char *name)
 	printf("\t-C count\tping count times\n");
 	printf("\t-a addr\t\taddress\n");
 	printf("\t-p port\t\tport\n");
+	printf("\t-P\t\tpersistent server mode allowing multiple connections\n");
 }
 
 int main(int argc, char *argv[])
@@ -994,6 +1091,7 @@ int main(int argc, char *argv[])
 	struct rping_cb *cb;
 	int op;
 	int ret = 0;
+	int persistent_server = 0;
 
 	cb = malloc(sizeof(*cb));
 	if (!cb)
@@ -1007,13 +1105,16 @@ int main(int argc, char *argv[])
 	sem_init(&cb->sem, 0, 0);
 
 	opterr = 0;
-	while ((op=getopt(argc, argv, "a:p:C:S:t:scvVd")) != -1) {
+	while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
 		switch (op) {
 		case 'a':
 			cb->addr_str = optarg;
 			cb->addr = inet_addr(optarg);
 			DEBUG_LOG("ipaddr (%s)\n", optarg);
 			break;
+		case 'P':
+			persistent_server = 1;
+			break;
 		case 'p':
 			cb->port = htons(atoi(optarg));
 			DEBUG_LOG("port %d\n", (int) atoi(optarg));
@@ -1089,9 +1190,12 @@ int main(int argc, char *argv[])
 
 	pthread_create(&cb->cmthread, NULL, cm_thread, cb);
 
-	if (cb->server)
-		ret = rping_run_server(cb);
-	else
+	if (cb->server) {
+		if (persistent_server)
+			ret = rping_run_persistent_server(cb);
+		else
+			ret = rping_run_server(cb);
+	} else
 		ret = rping_run_client(cb);
 
 	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
diff --git a/man/rping.1 b/man/rping.1
index 153436a..a2b7b6b 100644
--- a/man/rping.1
+++ b/man/rping.1
@@ -4,7 +4,7 @@ rping \- RDMA CM connection and RDMA pin
 .SH SYNOPSIS
 .sp
 .nf
-\fIrping\fR -s [-v] [-V] [-d] [-a address] [-p port]
+\fIrping\fR -s [-v] [-V] [-d] [-P] [-a address] [-p port]
 		[-C message_count] [-S message_size]
 \fIrping\fR -c [-v] [-V] [-d] -a address [-p port]
 		[-C message_count] [-S message_size]
@@ -42,6 +42,10 @@ The number of messages to transfer over 
 .TP
 \-S message_size
 The size of each message transferred, in bytes.  (default 100)
+.TP
+\-P
+Run the server in persistent mode.  This allows multiple rping clients
+to connect to a single server instance. The server will run until killed.
 .SH "NOTES"
 Because this test maps RDMA resources to userspace, users must ensure
 that they have available system resources and permissions.  See the





More information about the ewg mailing list