[openib-general] [PATCH] [RFC] - example user mode rdma ping/pong program using CMA

Steve Wise swise at opengridcomputing.com
Wed Feb 8 08:27:22 PST 2006


All,

Attached is a user-mode program, called rping, that uses librdmacm and
libibverbs to implement a ping-pong program over an RC connection.  The
program utilizes SEND, RECV, RDMA READ, and WRITE ops, as well as cq
channels to get cq events, and rdma_get_cm_event() to detect CMA events.
It is multi-threaded.  

I've built it as an example program in librdmacm/examples and tested it
with mthca.  It is useful to test CMA as well as all the major rdma
operations in a transport-neutral way.

If you all find it has utility, please pull it into librdmacm/examples.


Signed-off-by: Steve Wise <swise at opengridcomputing.com>



Index: Makefile.am
===================================================================
--- Makefile.am	(revision 5330)
+++ Makefile.am	(working copy)
@@ -18,9 +18,11 @@
 src_librdmacm_la_SOURCES = src/cma.c
 src_librdmacm_la_LDFLAGS = -avoid-version $(rdmacm_version_script)
 
-bin_PROGRAMS = examples/ucmatose
+bin_PROGRAMS = examples/ucmatose examples/rping
 examples_ucmatose_SOURCES = examples/cmatose.c
 examples_ucmatose_LDADD = $(top_builddir)/src/librdmacm.la
+examples_rping_SOURCES = examples/rping.c
+examples_rping_LDADD = $(top_builddir)/src/librdmacm.la
 
 librdmacmincludedir = $(includedir)/rdma
 
Index: examples/rping.c
===================================================================
--- examples/rping.c	(revision 0)
+++ examples/rping.c	(revision 0)
@@ -0,0 +1,1175 @@
+/*
+ * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
+ * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <getopt.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <byteswap.h>
+#include <semaphore.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+
+#include <rdma/rdma_cma.h>
+
+static int debug = 0;
+#define DEBUG_LOG if (debug) printf
+
+/*
+ * rping "ping/pong" loop:
+ * 	client sends source rkey/addr/len
+ *	server receives source rkey/addr/len
+ *	server rdma reads "ping" data from source
+ * 	server sends "go ahead" on rdma read completion
+ *	client sends sink rkey/addr/len
+ * 	server receives sink rkey/addr/len
+ * 	server rdma writes "pong" data to sink
+ * 	server sends "go ahead" on rdma write completion
+ * 	<repeat loop>
+ */
+
+/*
+ * These states are used to signal events between the event handlers
+ * (CM and CQ threads) and the main client or server thread.
+ *
+ * Once CONNECTED, the server cycles through RDMA_READ_ADV,
+ * RDMA_READ_COMPLETE, RDMA_WRITE_ADV and RDMA_WRITE_COMPLETE for each
+ * ping; the client cycles through RDMA_READ_ADV, RDMA_WRITE_ADV and
+ * RDMA_WRITE_COMPLETE.  ERROR is set by either handler on failure.
+ */
+typedef enum {
+	IDLE = 1,
+	CONNECT_REQUEST,
+	CONNECTED,
+	RDMA_READ_ADV,
+	RDMA_READ_COMPLETE,
+	RDMA_WRITE_ADV,
+	RDMA_WRITE_COMPLETE,
+	ERROR
+} state_t;
+
+/*
+ * Size of each registered buffer (recv, send, rdma and start), and so the
+ * upper bound on ping data.  Parenthesized so the macro expands safely
+ * inside larger expressions such as "x / RPING_BUFSIZE".
+ */
+#define RPING_BUFSIZE (64 * 1024)
+
+/* Send queue depth; the CQ is created with twice this many entries. */
+#define RPING_SQ_DEPTH 16
+
+/*
+ * Per-test control block.  It is shared by the main thread, the CM event
+ * thread and the CQ event thread.
+ */
+struct rping_cb {
+	/* Role: non-zero for the server, 0 for the client. */
+	int server;
+
+	/* Verbs resources: CQ thread, completion channel, CQ, PD and QP. */
+	pthread_t cqthread;
+	struct ibv_comp_channel *channel;
+	struct ibv_cq *cq;
+	struct ibv_pd *pd;
+	struct ibv_qp *qp;
+
+	/* Receive path: one WR with one SGE over a malloc'd, registered buffer. */
+	struct ibv_recv_wr rq_wr;
+	struct ibv_sge recv_sgl;
+	char *recv_buf;
+	struct ibv_mr *recv_mr;
+
+	/* Send path: one SEND work request with a single registered buffer. */
+	struct ibv_send_wr sq_wr;
+	struct ibv_sge send_sgl;
+	char *send_buf;
+	struct ibv_mr *send_mr;
+
+	/* RDMA READ/WRITE work request; rdma_buf is the local data buffer. */
+	struct ibv_send_wr rdma_sq_wr;
+	struct ibv_sge rdma_sgl;
+	char *rdma_buf;
+	struct ibv_mr *rdma_mr;
+
+	/* Buffer advertisement most recently received from the peer. */
+	uint32_t remote_rkey;
+	uint64_t remote_addr;
+	uint32_t remote_len;
+
+	/* Client only: source buffer that the server RDMA READs. */
+	char *start_buf;
+	struct ibv_mr *start_mr;
+
+	/* Handshake between the event handlers and the main thread. */
+	state_t state;
+	sem_t sem;
+
+	/* Command-line parameters. */
+	uint16_t port;		/* port, network byte order */
+	uint32_t addr;		/* IPv4 address, network byte order */
+	char *addr_str;		/* address as given with -a */
+	int verbose;		/* print ping data */
+	int count;		/* number of pings, 0 = unlimited */
+	int size;		/* ping data size */
+	int validate;		/* compare echoed data with what was sent */
+
+	/* Connection manager state. */
+	pthread_t cmthread;
+	struct rdma_cm_id *cm_id;	/* client: connection; server: listener */
+	struct rdma_cm_id *child_cm_id;	/* server: the accepted connection */
+};
+
+
+/*
+ * Handle one connection-manager event for cma_id.
+ *
+ * Runs on the CM event thread.  Progress and failures are reported to
+ * the main thread by updating cbp->state and posting cbp->sem.  Any event
+ * that ends the test must post the semaphore, otherwise a main thread
+ * blocked in sem_wait() never wakes up.
+ */
+static void rping_cma_event_handler(struct rdma_cm_id *cma_id,
+				    struct rdma_cm_event *event)
+{
+	int rc = 0;
+	struct rping_cb *cbp = (struct rping_cb *) cma_id->context;
+
+	DEBUG_LOG("cma_event type %d cma_id %p (%s)\n",
+		  event->event, cma_id,
+		  (cma_id == cbp->cm_id) ? "parent" : "child");
+	switch (event->event) {
+
+	case RDMA_CM_EVENT_ADDR_RESOLVED:
+		/* Address resolved; continue with route resolution. */
+		rc = rdma_resolve_route(cma_id, 2000);
+		if (rc) {
+			fprintf(stderr, "rdma_resolve_route error %d\n", rc);
+			cbp->state = ERROR;
+			sem_post(&cbp->sem);
+		}
+		break;
+
+	case RDMA_CM_EVENT_ROUTE_RESOLVED:
+		/* Client: wakes main() once address + route are resolved. */
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		/* Server: remember the new connection's id for do_rping(). */
+		cbp->state = CONNECT_REQUEST;
+		cbp->child_cm_id = cma_id;
+		DEBUG_LOG("child cma %p\n", cbp->child_cm_id);
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_ESTABLISHED:
+		DEBUG_LOG("ESTABLISHED\n");
+		cbp->state = CONNECTED;
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_ADDR_ERROR:
+	case RDMA_CM_EVENT_ROUTE_ERROR:
+	case RDMA_CM_EVENT_CONNECT_ERROR:
+	case RDMA_CM_EVENT_UNREACHABLE:
+	case RDMA_CM_EVENT_REJECTED:
+		fprintf(stderr, "cma event %d, error %d\n", event->event,
+		       event->status);
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_DISCONNECTED:
+		fprintf(stderr, "DISCONNECT EVENT...\n");
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		/*
+		 * The device is gone: fail the test rather than leave the
+		 * main thread blocked in sem_wait() forever.
+		 */
+		fprintf(stderr, "cma detected device removal!!!!\n");
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	default:
+		fprintf(stderr, "oof bad type!\n");
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	}
+	return;
+}
+
+/*
+ * Drain cbp->cq and turn each completion into a state change for the
+ * main thread: update cbp->state, then post cbp->sem.
+ *
+ * Receive completions carry one of two messages.  From the client it is a
+ * 16-byte buffer advertisement (rkey at offset 0, addr at offset 4, len at
+ * offset 12, host byte order).  From the server it is a 1-byte "go ahead".
+ * The receive buffer is posted again after every receive.  A failed
+ * completion or malformed message sets ERROR; a failing ibv_poll_cq()
+ * terminates the process.
+ */
+static void rping_cq_event_handler(struct rping_cb *cbp)
+{
+	struct ibv_wc wc;
+	struct ibv_recv_wr *bad_wr;
+	int rc;
+
+	while ((rc = ibv_poll_cq(cbp->cq, 1, &wc)) == 1) {
+		if (wc.status) {
+			fprintf(stderr, "cq completion failed status %d\n",
+			       wc.status);
+			cbp->state = ERROR;
+			sem_post(&cbp->sem);
+			return;
+		}
+		switch (wc.opcode) {
+		case IBV_WC_SEND:
+			/* SEND completions change no state. */
+			DEBUG_LOG("send completion\n");
+			break;
+
+		case IBV_WC_RDMA_WRITE:
+			DEBUG_LOG("rdma write completion\n");
+			cbp->state = RDMA_WRITE_COMPLETE;
+			sem_post(&cbp->sem);
+			break;
+
+		case IBV_WC_RDMA_READ:
+			cbp->state = RDMA_READ_COMPLETE;
+			DEBUG_LOG("rdma read completion\n");
+			sem_post(&cbp->sem);
+			break;
+
+		case IBV_WC_RECV:
+			DEBUG_LOG("recv completion\n");
+			if (cbp->server) {
+				if (wc.byte_len != 16) {
+					fprintf(stderr,
+					       "Received bogus data, size %u\n",
+					       wc.byte_len);
+					cbp->state = ERROR;
+					sem_post(&cbp->sem);
+					return;
+				}
+				/*
+				 * addr sits at offset 4, which is not 8-byte
+				 * aligned.  Copy the fields out with memcpy
+				 * rather than dereferencing cast pointers
+				 * (undefined behaviour; faults on
+				 * strict-alignment CPUs).
+				 */
+				memcpy(&cbp->remote_rkey, &cbp->recv_buf[0],
+				       sizeof(cbp->remote_rkey));
+				memcpy(&cbp->remote_addr, &cbp->recv_buf[4],
+				       sizeof(cbp->remote_addr));
+				memcpy(&cbp->remote_len, &cbp->recv_buf[12],
+				       sizeof(cbp->remote_len));
+				DEBUG_LOG("Received rkey %x addr %llx "
+					  "len %u from peer\n",
+					  cbp->remote_rkey,
+					  (unsigned long long) cbp->remote_addr,
+					  cbp->remote_len);
+				/*
+				 * The server uses remote_len as the RDMA READ
+				 * length into its RPING_BUFSIZE-byte rdma_buf,
+				 * so refuse advertisements that cannot fit.
+				 */
+				if (cbp->remote_len > RPING_BUFSIZE) {
+					fprintf(stderr,
+					       "Received bogus length %u\n",
+					       cbp->remote_len);
+					cbp->state = ERROR;
+					sem_post(&cbp->sem);
+					return;
+				}
+				/*
+				 * The first advertisement of a ping (right after
+				 * connecting, or after the previous write
+				 * completed) is the read source; the next one
+				 * is the write sink.
+				 */
+				if (cbp->state == CONNECTED
+				    || cbp->state == RDMA_WRITE_COMPLETE)
+					cbp->state = RDMA_READ_ADV;
+				else
+					cbp->state = RDMA_WRITE_ADV;
+			} else {
+				if (wc.byte_len != 1) {
+					fprintf(stderr,
+					       "Received bogus data, size %u\n",
+					       wc.byte_len);
+					cbp->state = ERROR;
+					sem_post(&cbp->sem);
+					return;
+				}
+				if (cbp->state == RDMA_READ_ADV) {
+					cbp->state = RDMA_WRITE_ADV;
+					DEBUG_LOG("set state to WRITE_ADV\n");
+				} else {
+					cbp->state = RDMA_WRITE_COMPLETE;
+					DEBUG_LOG("set state "
+						  "to WRITE_COMPLETE\n");
+				}
+			}
+
+			/*
+			 * post recv buf again
+			 */
+			rc = ibv_post_recv(cbp->qp, &cbp->rq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post recv error %d\n", rc);
+				cbp->state = ERROR;
+			}
+			sem_post(&cbp->sem);
+			break;
+
+		default:
+			DEBUG_LOG("unknown!!!!! completion\n");
+			break;
+		}
+	}
+	/* rc == 0: CQ drained.  rc < 0: polling itself failed. */
+	if (rc) {
+		fprintf(stderr, "poll error %d\n", rc);
+		exit(rc);
+	}
+}
+
+/*
+ * Accept the pending connection request on cbp->child_cm_id.  priv/len
+ * are sent as private data; responder_resources and initiator_depth are
+ * both 1 and every other connection parameter is zero.  Returns the
+ * result of rdma_accept().
+ */
+static int rping_accept_cr(struct rping_cb *cbp, char *priv, int len)
+{
+	struct rdma_conn_param params = {
+		.private_data = priv,
+		.private_data_len = len,
+		.responder_resources = 1,
+		.initiator_depth = 1,
+	};
+
+	DEBUG_LOG("accept_cr!\n");
+	return rdma_accept(cbp->child_cm_id, &params);
+}
+
+/*
+ * Start an active connection on cbp->cm_id, sending priv/len as private
+ * data.  Completion arrives asynchronously through the CM event handler
+ * (ESTABLISHED, or an error event that sets ERROR).
+ *
+ * Returns 0 if the connect was issued.  Otherwise it returns the non-zero
+ * rdma_connect() result and sets cbp->state to ERROR.  The failure is
+ * returned directly, not signalled through cbp->sem, so the caller's
+ * error check actually fires and no extra semaphore count is left behind.
+ */
+static int rping_connect(struct rping_cb *cbp, char *priv, int len)
+{
+	int rc;
+	struct rdma_conn_param conn_param;
+
+	memset(&conn_param, 0, sizeof conn_param);
+	conn_param.private_data = priv;
+	conn_param.private_data_len = len;
+	conn_param.responder_resources = 1;
+	conn_param.initiator_depth = 1;
+	conn_param.retry_count = 10;
+	rc = rdma_connect(cbp->cm_id, &conn_param);
+	if (rc) {
+		fprintf(stderr, "rdma_connect error %d\n", rc);
+		cbp->state = ERROR;
+	}
+	return rc;
+}
+
+/*
+ * Undo rping_setup_buffers(): deregister each memory region, then free
+ * its backing buffer.  The start buffer exists only on the client.
+ */
+static void rping_free_buffers(struct rping_cb *cbp)
+{
+	DEBUG_LOG("rping_free_buffers called on cbp %p\n", cbp);
+
+	ibv_dereg_mr(cbp->recv_mr);
+	free(cbp->recv_buf);
+
+	ibv_dereg_mr(cbp->send_mr);
+	free(cbp->send_buf);
+
+	ibv_dereg_mr(cbp->rdma_mr);
+	free(cbp->rdma_buf);
+
+	if (cbp->server)
+		return;
+
+	ibv_dereg_mr(cbp->start_mr);
+	free(cbp->start_buf);
+}
+
+/*
+ * Allocate and register the recv, send and rdma buffers, plus the start
+ * buffer on the client.  Pre-build the work requests that use them and
+ * post the initial receive.
+ *
+ * Requires cbp->pd and cbp->qp.  Returns 0 on success or an errno-style
+ * value on failure.  On failure every resource acquired here is released
+ * and cbp->recv_buf is NULL; do_rping()'s cleanup tests recv_buf before
+ * calling rping_free_buffers().
+ */
+static int rping_setup_buffers(struct rping_cb *cbp)
+{
+	struct ibv_recv_wr *bad_wr;
+	int rc;
+
+	DEBUG_LOG("rping_setup_buffers called on cbp %p\n", cbp);
+
+	cbp->recv_buf = malloc(RPING_BUFSIZE);
+	if (cbp->recv_buf == NULL)
+		return ENOMEM;
+	cbp->recv_mr = ibv_reg_mr(cbp->pd, cbp->recv_buf, RPING_BUFSIZE,
+				  IBV_ACCESS_LOCAL_WRITE);
+	if (!cbp->recv_mr) {
+		/* Never report success for a failed registration. */
+		rc = errno ? errno : ENOMEM;
+		goto free_recv_buf;
+	}
+
+	/* The receive SGE and WR never change after this point. */
+	cbp->recv_sgl.addr = (uint64_t) (unsigned long) cbp->recv_buf;
+	cbp->recv_sgl.length = RPING_BUFSIZE;
+	cbp->recv_sgl.lkey = cbp->recv_mr->lkey;
+	cbp->rq_wr.wr_id = (uint64_t) (unsigned long) &cbp->rq_wr;
+	cbp->rq_wr.sg_list = &cbp->recv_sgl;
+	cbp->rq_wr.num_sge = 1;
+
+	cbp->send_buf = malloc(RPING_BUFSIZE);
+	if (cbp->send_buf == NULL) {
+		rc = ENOMEM;
+		goto dereg_recv_mr;
+	}
+	/* Used only as the source of SENDs (sq_wr): no access flags. */
+	cbp->send_mr = ibv_reg_mr(cbp->pd, cbp->send_buf, RPING_BUFSIZE, 0);
+	if (!cbp->send_mr) {
+		rc = errno ? errno : ENOMEM;
+		goto free_send_buf;
+	}
+
+	/*
+	 * The send SGE address/lkey and the send WR never change;
+	 * send_sgl.length is set per message by do_rping().
+	 */
+	cbp->send_sgl.addr = (uint64_t) (unsigned long) cbp->send_buf;
+	cbp->send_sgl.lkey = cbp->send_mr->lkey;
+	cbp->sq_wr.opcode = IBV_WR_SEND;
+	cbp->sq_wr.wr_id = (uint64_t) (unsigned long) &cbp->sq_wr;
+	cbp->sq_wr.num_sge = 1;
+	cbp->sq_wr.sg_list = &cbp->send_sgl;
+	cbp->sq_wr.send_flags = IBV_SEND_SIGNALED;
+
+	cbp->rdma_buf = malloc(RPING_BUFSIZE);
+	if (cbp->rdma_buf == NULL) {
+		rc = ENOMEM;
+		goto dereg_send_mr;
+	}
+	cbp->rdma_mr = ibv_reg_mr(cbp->pd, cbp->rdma_buf, RPING_BUFSIZE,
+				  IBV_ACCESS_LOCAL_WRITE |
+				  IBV_ACCESS_REMOTE_READ |
+				  IBV_ACCESS_REMOTE_WRITE);
+	if (!cbp->rdma_mr) {
+		rc = errno ? errno : ENOMEM;
+		goto free_rdma_buf;
+	}
+
+	/* Fixed parts of the RDMA WR; opcode/length/remote set per op. */
+	cbp->rdma_sq_wr.wr_id = (uint64_t) (unsigned long) &cbp->rdma_sq_wr;
+	cbp->rdma_sq_wr.sg_list = &cbp->rdma_sgl;
+	cbp->rdma_sq_wr.num_sge = 1;
+	cbp->rdma_sgl.addr = (uint64_t) (unsigned long) cbp->rdma_buf;
+	cbp->rdma_sgl.lkey = cbp->rdma_mr->lkey;
+
+	if (!cbp->server) {
+		/* Client only: the buffer the server RDMA READs from. */
+		cbp->start_buf = malloc(RPING_BUFSIZE);
+		if (cbp->start_buf == NULL) {
+			rc = ENOMEM;
+			goto dereg_rdma_mr;
+		}
+		cbp->start_mr = ibv_reg_mr(cbp->pd, cbp->start_buf,
+					   RPING_BUFSIZE,
+					   IBV_ACCESS_LOCAL_WRITE |
+					   IBV_ACCESS_REMOTE_READ |
+					   IBV_ACCESS_REMOTE_WRITE);
+		if (!cbp->start_mr) {
+			rc = errno ? errno : ENOMEM;
+			goto free_start_buf;
+		}
+	}
+
+	rc = ibv_post_recv(cbp->qp, &cbp->rq_wr, &bad_wr);
+	if (rc)
+		goto dereg_start_mr;
+
+	DEBUG_LOG("allocated & registered buffers...\n");
+	return 0;
+
+	/*
+	 * Unwind in reverse order of acquisition.  Each MR is deregistered
+	 * before the memory it covers is freed.
+	 */
+dereg_start_mr:
+	if (!cbp->server)
+		ibv_dereg_mr(cbp->start_mr);
+free_start_buf:
+	if (!cbp->server) {
+		free(cbp->start_buf);
+		cbp->start_buf = NULL;
+	}
+dereg_rdma_mr:
+	ibv_dereg_mr(cbp->rdma_mr);
+free_rdma_buf:
+	free(cbp->rdma_buf);
+	cbp->rdma_buf = NULL;
+dereg_send_mr:
+	ibv_dereg_mr(cbp->send_mr);
+free_send_buf:
+	free(cbp->send_buf);
+	cbp->send_buf = NULL;
+dereg_recv_mr:
+	ibv_dereg_mr(cbp->recv_mr);
+free_recv_buf:
+	free(cbp->recv_buf);
+	cbp->recv_buf = NULL;
+	return rc;
+}
+
+/*
+ * Create the RC queue pair on the connection's cm_id (child_cm_id on the
+ * server, cm_id on the client).  The QP uses cbp->pd, and cbp->cq for both
+ * send and receive completions.  Remote READ/WRITE access is then enabled
+ * on it.
+ *
+ * Returns 0 on success, otherwise the failing call's result.  cbp->qp is
+ * NULL only when the QP could not be created.  If only the access-flag
+ * change fails, cbp->qp stays set so do_rping()'s cleanup destroys it.
+ */
+static int rping_create_qp(struct rping_cb *cbp)
+{
+	struct ibv_qp_init_attr init_attr;
+	struct ibv_qp_attr qp_attr;
+	struct rdma_cm_id *id = cbp->server ? cbp->child_cm_id : cbp->cm_id;
+	int rc;
+
+	memset(&init_attr, 0, sizeof(init_attr));
+	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
+	init_attr.cap.max_recv_wr = 2;
+	init_attr.cap.max_recv_sge = 1;
+	init_attr.cap.max_send_sge = 1;
+	init_attr.qp_type = IBV_QPT_RC;
+	init_attr.send_cq = cbp->cq;
+	init_attr.recv_cq = cbp->cq;
+
+	rc = rdma_create_qp(id, cbp->pd, &init_attr);
+	if (rc) {
+		cbp->qp = NULL;
+		return rc;
+	}
+	cbp->qp = id->qp;
+
+	/*
+	 * Set REMOTE access rights on the QP.  Only qp_access_flags is
+	 * selected by the mask, but zero the whole struct so the library
+	 * is never handed indeterminate values.
+	 */
+	memset(&qp_attr, 0, sizeof(qp_attr));
+	qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_READ |
+				  IBV_ACCESS_REMOTE_WRITE;
+	rc = ibv_modify_qp(cbp->qp, &qp_attr, IBV_QP_ACCESS_FLAGS);
+	if (rc)
+		fprintf(stderr, "ibv_modify_qp returned %d\n", rc);
+	return rc;
+}
+
+/*
+ * CM event thread.  It blocks in rdma_get_cm_event(), passes each event
+ * to rping_cma_event_handler() and then acknowledges it.  It never
+ * returns; a failure to fetch an event terminates the process.  arg is
+ * not used.
+ */
+static void *cm_thread(void *arg)
+{
+	struct rdma_cm_event *ev;
+	int err;
+
+	for (;;) {
+		err = rdma_get_cm_event(&ev);
+		if (err != 0) {
+			fprintf(stderr, "rdma_get_cm_event err %d\n", err);
+			exit(err);
+		}
+		rping_cma_event_handler(ev->id, ev);
+		rdma_ack_cm_event(ev);
+	}
+}
+
+/*
+ * CQ event thread; arg is the struct rping_cb.  For each completion event
+ * on cbp->channel it re-arms notification, drains the CQ through
+ * rping_cq_event_handler() and acknowledges the event.  It never returns.
+ * Any failure, or an event for a CQ other than cbp->cq, exits the process.
+ */
+static void *cq_thread(void *arg)
+{
+	struct rping_cb *cbp = arg;
+	struct ibv_cq *ev_cq;
+	void *ev_ctx;
+	int err;
+
+	DEBUG_LOG("cq_thread started.\n");
+
+	for (;;) {
+		err = ibv_get_cq_event(cbp->channel, &ev_cq, &ev_ctx);
+		if (err) {
+			fprintf(stderr, "Failed to get cq event!\n");
+			exit(err);
+		}
+		if (ev_cq != cbp->cq) {
+			fprintf(stderr, "Unkown CQ!\n");
+			exit(-1);
+		}
+		/* Re-arm before draining, then poll everything available. */
+		err = ibv_req_notify_cq(cbp->cq, 0);
+		if (err) {
+			fprintf(stderr, "Failed to set notify!\n");
+			exit(err);
+		}
+		rping_cq_event_handler(cbp);
+		ibv_ack_cq_events(cbp->cq, 1);
+	}
+}
+
+/*
+ * Encode a 16-byte buffer advertisement into buf: rkey at offset 0, addr
+ * at offset 4 and len at offset 12, in host byte order.  This is the
+ * layout the server's receive handler decodes.  memcpy is used because a
+ * uint64_t at offset 4 is misaligned, and storing through a cast pointer
+ * there is undefined behaviour (and faults on strict-alignment CPUs).
+ */
+static void rping_format_adv(char *buf, uint32_t rkey, uint64_t addr,
+			     uint32_t len)
+{
+	memcpy(buf, &rkey, sizeof(rkey));
+	memcpy(buf + 4, &addr, sizeof(addr));
+	memcpy(buf + 12, &len, sizeof(len));
+}
+
+/*
+ * Run the ping/pong test described at the top of this file.
+ *
+ * On entry cbp->cm_id must already be bound (server) or have its address
+ * and route resolved (client), and the CM event thread must be running.
+ * This function creates the PD, completion channel, CQ, CQ thread, QP and
+ * buffers.  It then runs the server or client loop until an error occurs
+ * or, on the client, cbp->count pings have completed.  Each step waits on
+ * cbp->sem and checks cbp->state, which the CM/CQ event handlers update.
+ *
+ * On exit the connection is disconnected and the QP, child id (server),
+ * CQ, buffers and PD are released.  cbp->cm_id is left for the caller.
+ */
+static void do_rping(struct rping_cb *cbp)
+{
+	int ping = 0;
+	int start;
+	int cc;
+	unsigned char c;
+	int rc;
+
+	/* No -S given: use 64 bytes of ping data. */
+	if (cbp->size == 0)
+		cbp->size = 64;
+
+	if (cbp->server) {
+
+		/*
+		 * Create listening endpoint.
+		 */
+		rc = rdma_listen(cbp->cm_id, 3);
+		if (rc) {
+			fprintf(stderr, "listen error %d\n", rc);
+			goto out;
+		}
+
+		/*
+		 * Wait for a connection request.
+		 */
+		rc = sem_wait(&cbp->sem);
+		if (rc || (cbp->state == ERROR)) {
+			fprintf(stderr,
+			       "wait for CONNECT_REQUEST error %d state %d\n",
+			       rc, cbp->state);
+			goto out;
+		}
+
+		cbp->pd = ibv_alloc_pd(cbp->child_cm_id->verbs);
+		if (!(cbp->pd)) {
+			rc = errno;
+			fprintf(stderr, "ibv_alloc_pd error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("created pd %p\n", cbp->pd);
+
+		cbp->channel = ibv_create_comp_channel(cbp->child_cm_id->verbs);
+		if (!cbp->channel) {
+			rc = errno;
+			fprintf(stderr, "ibv_create_comp_channel error %d\n",
+				rc);
+			goto out;
+		}
+		DEBUG_LOG("created channel %p\n", cbp->channel);
+
+		cbp->cq = ibv_create_cq(cbp->child_cm_id->verbs,
+					RPING_SQ_DEPTH * 2, cbp,
+					cbp->channel, 0);
+		if (!(cbp->cq)) {
+			rc = errno;
+			fprintf(stderr, "ibv_create_cq error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("created cq %p\n", cbp->cq);
+
+		rc = ibv_req_notify_cq(cbp->cq, 0);
+		if (rc) {
+			fprintf(stderr, "Failed to set notify!\n");
+			goto out;
+		}
+
+		rc = pthread_create(&cbp->cqthread, NULL, cq_thread, cbp);
+		if (rc) {
+			fprintf(stderr, "pthread_create error %d\n", rc);
+			goto out;
+		}
+
+		rc = rping_create_qp(cbp);
+		if (rc) {
+			fprintf(stderr, "create qp error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("created qp %p\n", cbp->qp);
+
+		/*
+		 * Setup registered buffers.
+		 */
+		rc = rping_setup_buffers(cbp);
+		if (rc) {
+			fprintf(stderr, "setup buffers error %d\n", rc);
+			goto out;
+		}
+
+		/*
+		 * Accept the connection request.
+		 */
+		rc = rping_accept_cr(cbp, "server", strlen("server") + 1);
+		if (rc) {
+			fprintf(stderr, "accept error %d\n", rc);
+			goto out;
+		}
+		rc = sem_wait(&cbp->sem);
+		if (rc || (cbp->state == ERROR)) {
+			fprintf(stderr, "wait for CONNECTED "
+			       "state error %d state %d\n",
+			       rc, cbp->state);
+			goto out;
+		}
+
+		/*
+		 * Server side ping loop
+		 */
+		while (1) {
+			struct ibv_send_wr *bad_wr;
+			uint32_t write_len;
+
+			/*
+			 * Wait for client's Start STAG/TO/Len
+			 */
+			rc = sem_wait(&cbp->sem);
+			if (rc || (cbp->state == ERROR)) {
+				fprintf(stderr, "wait for RDMA_READ_ADV "
+				       "state error %d state %d\n",
+				       rc, cbp->state);
+				goto out;
+			}
+
+			DEBUG_LOG("server received sink adv\n");
+
+			/*
+			 * Issue RDMA Read.
+			 */
+			cbp->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
+			cbp->rdma_sq_wr.wr_id =
+			    (uint64_t) (unsigned long) &cbp->rdma_sq_wr;
+			cbp->rdma_sq_wr.sg_list = &cbp->rdma_sgl;
+			cbp->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
+			cbp->rdma_sq_wr.wr.rdma.rkey = cbp->remote_rkey;
+			cbp->rdma_sq_wr.wr.rdma.remote_addr = cbp->remote_addr;
+			cbp->rdma_sq_wr.sg_list->length = cbp->remote_len;
+
+			rc = ibv_post_send(cbp->qp, &cbp->rdma_sq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post send error %d\n", rc);
+				goto out;
+			}
+			DEBUG_LOG("server posted rdma read req \n");
+
+			/*
+			 * Wait for read completion
+			 */
+			rc = sem_wait(&cbp->sem);
+			if (rc || (cbp->state == ERROR)) {
+				fprintf(stderr, "wait for "
+				       "RDMA_READ_COMPLETE state error %d "
+				       "state %d\n",
+				       rc, cbp->state);
+				goto out;
+			}
+			DEBUG_LOG("server received read complete\n");
+
+			/*
+			 * Force NUL termination so the string handling below
+			 * cannot run past rdma_buf if the peer sent
+			 * unterminated data.  A well-formed ping is at most
+			 * RPING_BUFSIZE bytes including its NUL, so it is
+			 * unaffected.
+			 */
+			cbp->rdma_buf[RPING_BUFSIZE - 1] = '\0';
+
+			/*
+			 * Display data in recv buf
+			 */
+			if (cbp->verbose) {
+				printf("server ping data: %s\n",
+				       cbp->rdma_buf);
+			}
+
+			/*
+			 * Tell client to continue
+			 */
+			cbp->send_sgl.length = 1;
+			rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post send error %d\n", rc);
+				goto out;
+			}
+			DEBUG_LOG("server posted go ahead\n");
+
+			/*
+			 * Wait for client's RDMA STAG/TO/Len
+			 */
+			rc = sem_wait(&cbp->sem);
+			if (rc || (cbp->state == ERROR)) {
+				fprintf(stderr, "wait for RDMA_WRITE_ADV "
+				       "state error %d state %d\n",
+				       rc, cbp->state);
+				goto out;
+			}
+			DEBUG_LOG("server received sink adv\n");
+
+			/*
+			 * RDMA Write echo data: the string plus its NUL, but
+			 * never more than the sink the client advertised.
+			 */
+			write_len = strlen(cbp->rdma_buf) + 1;
+			if (write_len > cbp->remote_len)
+				write_len = cbp->remote_len;
+			cbp->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
+			cbp->rdma_sq_wr.wr.rdma.rkey = cbp->remote_rkey;
+			cbp->rdma_sq_wr.wr.rdma.remote_addr = cbp->remote_addr;
+			cbp->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
+			cbp->rdma_sq_wr.sg_list->length = write_len;
+			DEBUG_LOG("rdma write from lkey %x laddr %llx len %u\n",
+				  cbp->rdma_sq_wr.sg_list->lkey,
+				  (unsigned long long)
+				  cbp->rdma_sq_wr.sg_list->addr,
+				  cbp->rdma_sq_wr.sg_list->length);
+			rc = ibv_post_send(cbp->qp, &cbp->rdma_sq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post send error %d\n", rc);
+				goto out;
+			}
+
+			/*
+			 * Wait for completion
+			 */
+			rc = sem_wait(&cbp->sem);
+			if (rc || (cbp->state == ERROR)) {
+				fprintf(stderr, "waiting for "
+				       "RDMA_WRITE_COMPLETE state error %d\n",
+				       rc);
+				goto out;
+			}
+			DEBUG_LOG("server rdma write complete \n");
+
+			/*
+			 * Tell client to begin again
+			 */
+			cbp->send_sgl.length = 1;
+			rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post send error %d\n", rc);
+				goto out;
+			}
+			DEBUG_LOG("server posted go ahead\n");
+		}
+	} else {
+		cbp->pd = ibv_alloc_pd(cbp->cm_id->verbs);
+		if (!(cbp->pd)) {
+			rc = errno;
+			fprintf(stderr, "ibv_alloc_pd error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("created pd %p\n", cbp->pd);
+
+		cbp->channel = ibv_create_comp_channel(cbp->cm_id->verbs);
+		if (!cbp->channel) {
+			rc = errno;
+			fprintf(stderr, "ibv_create_comp_channel error %d\n",
+				rc);
+			goto out;
+		}
+		DEBUG_LOG("created channel %p\n", cbp->channel);
+
+		cbp->cq = ibv_create_cq(cbp->cm_id->verbs,
+					RPING_SQ_DEPTH * 2, cbp, cbp->channel,
+					0);
+		if (!(cbp->cq)) {
+			rc = errno;
+			fprintf(stderr, "ibv_create_cq error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("created cq %p\n", cbp->cq);
+
+		rc = ibv_req_notify_cq(cbp->cq, 0);
+		if (rc) {
+			fprintf(stderr, "Failed to set notify!\n");
+			goto out;
+		}
+
+		rc = pthread_create(&cbp->cqthread, NULL, cq_thread, cbp);
+		if (rc) {
+			fprintf(stderr, "pthread_create error %d\n", rc);
+			goto out;
+		}
+
+		rc = rping_create_qp(cbp);
+		if (rc) {
+			fprintf(stderr, "create qp error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("created qp %p\n", cbp->qp);
+
+		/*
+		 * Setup registered buffers.
+		 */
+		rc = rping_setup_buffers(cbp);
+		if (rc) {
+			fprintf(stderr, "setup buffers error %d\n", rc);
+			goto out;
+		}
+
+		/*
+		 * Connect to server.
+		 */
+		rc = rping_connect(cbp, "client", strlen("client") + 1);
+		if (rc) {
+			fprintf(stderr, "connect error %d\n", rc);
+			goto out;
+		}
+
+		rc = sem_wait(&cbp->sem);
+		if (rc || (cbp->state == ERROR)) {
+			fprintf(stderr,
+			       "wait for CONNECTED error %d state %d\n", rc,
+			       cbp->state);
+			goto out;
+		}
+
+		/*
+		 * Client side ping loop.
+		 */
+		start = 65;
+		while (1) {
+			int i;
+			struct ibv_send_wr *bad_wr;
+
+			cbp->state = RDMA_READ_ADV;
+
+			++ping;
+			if (cbp->count && (ping > cbp->count)) {
+				goto out;
+			}
+
+			/*
+			 * Put some ascii text in the buffer: a "rdma-ping-N: "
+			 * prefix followed by a rotating run of 'A'..'z'.
+			 */
+			cc = sprintf(cbp->start_buf, "rdma-ping-%d: ", ping);
+			for (i = cc, c = start; i < cbp->size; i++) {
+				cbp->start_buf[i] = c;
+				c++;
+				if (c > 122)
+					c = 65;
+			}
+			start++;
+			if (start > 122)
+				start = 65;
+			cbp->start_buf[cbp->size] = 0;
+
+			/*
+			 * Send our start buffer rkey/addr/len...
+			 * The server will use this to RDMA READ the ping.
+			 */
+			DEBUG_LOG("Sending Start rkey %x "
+				  "addr %llx len %d for RDMA READ Source\n",
+				  cbp->start_mr->rkey,
+				  (unsigned long long) (unsigned long)
+				  cbp->start_buf,
+				  cbp->size + 1);
+			cbp->send_sgl.length = 16;
+			rping_format_adv(cbp->send_buf, cbp->start_mr->rkey,
+					 (uint64_t) (unsigned long) cbp->start_buf,
+					 cbp->size + 1);
+			rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post send error %d\n", rc);
+				goto out;
+			}
+
+			/*
+			 * Wait for server to ACK
+			 */
+			rc = sem_wait(&cbp->sem);
+			if (rc || (cbp->state == ERROR)) {
+				fprintf(stderr, "wait for RDMA_WRITE_ADV "
+				       "state error %d state %d\n",
+				       rc, cbp->state);
+				goto out;
+			}
+
+			/*
+			 * Send our rdma buffer rkey/addr/len for receiving
+			 * the ping echo from the server via RDMA_WRITE...
+			 */
+			DEBUG_LOG("Sending rkey %x addr %llx "
+				  "len %d for RDMA WRITE Sink\n",
+				  cbp->rdma_mr->rkey,
+				  (unsigned long long) (unsigned long)
+				  cbp->rdma_buf,
+				  cbp->size + 1);
+			cbp->send_sgl.length = 16;
+			rping_format_adv(cbp->send_buf, cbp->rdma_mr->rkey,
+					 (uint64_t) (unsigned long) cbp->rdma_buf,
+					 cbp->size + 1);
+			rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post send error %d\n", rc);
+				goto out;
+			}
+
+			/*
+			 * Wait for the server to say the RDMA Write is
+			 * complete.
+			 */
+			rc = sem_wait(&cbp->sem);
+			if (rc || (cbp->state == ERROR)) {
+				fprintf(stderr, "wait for "
+				       "RDMA_WRITE_COMPLETE state error %d "
+				       "state %d\n", rc, cbp->state);
+				goto out;
+			}
+
+			if (cbp->validate) {
+
+				/*
+				 * Validate data
+				 */
+				if (memcmp (cbp->start_buf, cbp->rdma_buf,
+				     cbp->size + 1)) {
+					fprintf(stderr, "data mismatch!\n");
+					goto out;
+				}
+			}
+
+			/*
+			 * Display ping data.
+			 */
+			if (cbp->verbose) {
+				printf("ping data: %s\n", cbp->rdma_buf);
+			}
+		}
+	}
+out:
+	DEBUG_LOG("disconnecting\n");
+	/*
+	 * On the server cm_id is only the listener; the connection lives
+	 * on child_cm_id, so that is the id to disconnect.
+	 */
+	if (cbp->server) {
+		if (cbp->child_cm_id)
+			rdma_disconnect(cbp->child_cm_id);
+	} else {
+		rdma_disconnect(cbp->cm_id);
+	}
+
+	/*
+	 * cleanup.  The QP was created on the connection's cm_id, so it is
+	 * destroyed before that id is.
+	 */
+	if (cbp->qp) {
+		DEBUG_LOG("destroying qp %p\n", cbp->qp);
+		ibv_destroy_qp(cbp->qp);
+	}
+	if (cbp->child_cm_id) {
+		DEBUG_LOG("destroying child cm_id %p\n", cbp->child_cm_id);
+		rdma_destroy_id(cbp->child_cm_id);
+		cbp->child_cm_id = NULL;
+	}
+	/*
+	 * NOTE(review): the CQ thread is never stopped, so it may still be
+	 * inside rping_cq_event_handler() polling cbp->cq when the CQ is
+	 * destroyed below.  The completion channel is not destroyed either.
+	 * A clean shutdown would need cq_thread's cooperation.
+	 */
+	if (cbp->cq) {
+		DEBUG_LOG("destroying cq %p\n", cbp->cq);
+		ibv_destroy_cq(cbp->cq);
+	}
+	if (cbp->recv_buf) {
+		DEBUG_LOG("freeing bufs/mrs\n");
+		rping_free_buffers(cbp);
+	}
+	if (cbp->pd) {
+		DEBUG_LOG("dealloc pd %p\n", cbp->pd);
+		ibv_dealloc_pd(cbp->pd);
+	}
+	printf("close complete - returning from test \n");
+	return;
+}
+
+/*
+ * Print command-line help.  name is normally a program path; only the
+ * part after the last '/' is shown.
+ */
+static void usage(char *name)
+{
+	/*
+	 * basename() is not declared by any header this file includes (it
+	 * needs <libgen.h> or _GNU_SOURCE).  Calling it therefore went through
+	 * an implicit int-returning declaration, which truncates the pointer
+	 * on LP64 systems.  The POSIX basename() may also modify its argument,
+	 * and callers may pass a string literal.  strrchr() avoids both.
+	 */
+	const char *prog = strrchr(name, '/');
+
+	prog = prog ? prog + 1 : name;
+	printf("%s -c|s [-vVd] [-S size] [-C count] -a addr -p port\n",
+	       prog);
+	printf("\t-c\t\tclient side\n");
+	printf("\t-s\t\tserver side\n");
+	printf("\t-v\t\tdisplay ping data to stdout\n");
+	printf("\t-V\t\tvalidate ping data\n");
+	printf("\t-d\t\tdebug printfs\n");
+	printf("\t-S size \tping data size\n");
+	printf("\t-C count\tping count times\n");
+	printf("\t-a addr\t\taddress\n");
+	printf("\t-p port\t\tport\n");
+}
+
+/*
+ * Parse the command line, create the CM id and CM event thread, bind
+ * (server) or resolve (client) the address, then run the test on the
+ * calling thread via do_rping().
+ *
+ * Returns 0, or a non-zero errno-style value for option, setup or address
+ * errors.  Failures inside do_rping() are reported on stderr but are not
+ * reflected in the exit status.
+ */
+int main(int argc, char *argv[])
+{
+	struct rping_cb *cbp;
+	int op;
+	int rc = 0;
+	struct sockaddr_in sin;
+
+	cbp = malloc(sizeof(*cbp));
+	if (cbp == NULL) {
+		return ENOMEM;
+	}
+	memset(cbp, 0, sizeof(*cbp));
+	cbp->server = -1;
+	cbp->state = IDLE;
+	sem_init(&cbp->sem, 0, 0);
+
+	opterr = 0;
+	while ((op = getopt(argc, argv, "a:p:C:S:scvVd")) != -1) {
+		switch (op) {
+		case 'a':
+			cbp->addr_str = optarg;
+			cbp->addr = inet_addr(optarg);
+			/* inet_addr() reports a parse failure as INADDR_NONE. */
+			if (cbp->addr == INADDR_NONE) {
+				fprintf(stderr, "Invalid address %s\n",
+				       optarg);
+				rc = EINVAL;
+			} else
+				DEBUG_LOG("ipaddr (%s)\n", optarg);
+			break;
+		case 'p':
+			cbp->port = htons(atoi(optarg));
+			DEBUG_LOG("port %d\n", (int) atoi(optarg));
+			break;
+		case 's':
+			cbp->server = 1;
+			DEBUG_LOG("server\n");
+			break;
+		case 'c':
+			cbp->server = 0;
+			DEBUG_LOG("client\n");
+			break;
+		case 'S':
+			/* cbp->size excludes the trailing NUL. */
+			cbp->size = atoi(optarg) - 1;
+			if ((cbp->size < 1)
+			    || (cbp->size > (RPING_BUFSIZE - 1))) {
+				fprintf(stderr, "Invalid size %d "
+				       "(valid range is 2 to %d)\n",
+				       atoi(optarg), RPING_BUFSIZE);
+				rc = EINVAL;
+			} else
+				DEBUG_LOG("size %d\n",
+					  (int) atoi(optarg));
+			break;
+		case 'C':
+			cbp->count = atoi(optarg);
+			if (cbp->count < 0) {
+				fprintf(stderr, "Invalid count %d\n",
+				       cbp->count);
+				rc = EINVAL;
+			} else
+				DEBUG_LOG("count %d\n",
+					  (int) cbp->count);
+			break;
+		case 'v':
+			cbp->verbose++;
+			DEBUG_LOG("verbose\n");
+			break;
+		case 'V':
+			cbp->validate++;
+			DEBUG_LOG("validate data\n");
+			break;
+		case 'd':
+			debug++;
+			break;
+		default:
+			usage("rping");
+			rc = EINVAL;
+			break;
+		}
+	}
+	if (rc)
+		goto out;
+
+	if (cbp->server == -1) {
+		fprintf(stderr, "must be either client or server\n");
+		rc = EINVAL;
+		goto out;
+	}
+
+	rc = rdma_create_id(&cbp->cm_id, cbp);
+	if (rc) {
+		rc = errno;
+		cbp->cm_id = NULL;
+		fprintf(stderr, "rdma_create_id error %d\n", rc);
+		goto out;
+	}
+	DEBUG_LOG("created cm_id %p\n", cbp->cm_id);
+
+	rc = pthread_create(&cbp->cmthread, NULL, cm_thread, cbp);
+	if (rc) {
+		fprintf(stderr, "pthread_create error %d\n", rc);
+		goto out;
+	}
+
+	/*
+	 * Server binds to local addr/port to find the device.  Client resolves
+	 * the remote addr/port to find the device.
+	 */
+	memset(&sin, 0, sizeof(sin));
+	sin.sin_family = AF_INET;
+	sin.sin_addr.s_addr = cbp->addr;
+	sin.sin_port = cbp->port;
+	if (cbp->server) {
+		rc = rdma_bind_addr(cbp->cm_id, (struct sockaddr *) &sin);
+		if (rc) {
+			fprintf(stderr, "rdma_bind_addr error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("rdma_bind_addr worked\n");
+	} else {
+		rc = rdma_resolve_addr(cbp->cm_id, NULL,
+				       (struct sockaddr *) &sin, 2000);
+		if (rc) {
+			fprintf(stderr, "rdma_resolve_addr error %d\n", rc);
+			goto out;
+		}
+
+		/* Posted by the CM handler on ROUTE_RESOLVED or on error. */
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "waiting for address resolution "
+			       "error %d state %d\n", rc, cbp->state);
+			goto out;
+		}
+		DEBUG_LOG("rdma_resolve_addr worked\n");
+	}
+
+	do_rping(cbp);
+
+out:
+	/*
+	 * NOTE(review): if it was started, the CM event thread is still
+	 * blocked in rdma_get_cm_event() here, and its handler dereferences
+	 * cbp.  cbp is freed while that thread lives; this relies on the
+	 * process exiting immediately after main() returns.
+	 */
+	if (cbp->cm_id) {
+		DEBUG_LOG("destroy cm_id %p\n", cbp->cm_id);
+		rdma_destroy_id(cbp->cm_id);
+	}
+	free(cbp);
+	return rc;
+}




More information about the general mailing list