[openib-general] [PATCH] [RFC] - example user mode rdma ping/pong program using CMA
Steve Wise
swise at opengridcomputing.com
Wed Feb 8 08:27:22 PST 2006
All,
Attached is a user-mode program, called rping, that uses librdmacm and
libibverbs to implement a ping-pong program over an RC connection. The
program utilizes SEND, RECV, RDMA READ, and WRITE ops, as well as cq
channels to get cq events, and rdma_get_cm_event() to detect CMA events.
It is multi-threaded.
I've built it as an example program in librdmacm/examples and tested it
with mthca. It is useful to test CMA as well as all the major rdma
operations in a transport-neutral way.
If you all find it has utility, please pull it into librdmacm/examples.
Signed-off-by: Steve Wise <swise at opengridcomputing.com>
Index: Makefile.am
===================================================================
--- Makefile.am (revision 5330)
+++ Makefile.am (working copy)
@@ -18,9 +18,11 @@
src_librdmacm_la_SOURCES = src/cma.c
src_librdmacm_la_LDFLAGS = -avoid-version $(rdmacm_version_script)
-bin_PROGRAMS = examples/ucmatose
+bin_PROGRAMS = examples/ucmatose examples/rping
examples_ucmatose_SOURCES = examples/cmatose.c
examples_ucmatose_LDADD = $(top_builddir)/src/librdmacm.la
+examples_rping_SOURCES = examples/rping.c
+examples_rping_LDADD = $(top_builddir)/src/librdmacm.la
librdmacmincludedir = $(includedir)/rdma
Index: examples/rping.c
===================================================================
--- examples/rping.c (revision 0)
+++ examples/rping.c (revision 0)
@@ -0,0 +1,1175 @@
+/*
+ * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
+ * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <getopt.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <byteswap.h>
+#include <semaphore.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+
+#include <rdma/rdma_cma.h>
+
+static int debug = 0;
+#define DEBUG_LOG if (debug) printf
+
+/*
+ * rping "ping/pong" loop:
+ * client sends source rkey/addr/len
+ * server receives source rkey/addr/len
+ * server rdma reads "ping" data from source
+ * server sends "go ahead" on rdma read completion
+ * client sends sink rkey/addr/len
+ * server receives sink rkey/addr/len
+ * server rdma writes "pong" data to sink
+ * server sends "go ahead" on rdma write completion
+ * <repeat loop>
+ */
+
+/*
+ * Connection/ping states.  The CM and CQ threads store a new value in
+ * cbp->state and post cbp->sem; the main client or server thread waits on
+ * the semaphore and then inspects the state.
+ *
+ * Once CONNECTED, each ping moves through RDMA_READ_ADV,
+ * RDMA_READ_COMPLETE (server side only), RDMA_WRITE_ADV and
+ * RDMA_WRITE_COMPLETE.  Values are spelled out because diagnostics print
+ * them as plain integers ("state %d").
+ */
+typedef enum {
+	IDLE			= 1,
+	CONNECT_REQUEST		= 2,
+	CONNECTED		= 3,
+	RDMA_READ_ADV		= 4,
+	RDMA_READ_COMPLETE	= 5,
+	RDMA_WRITE_ADV		= 6,
+	RDMA_WRITE_COMPLETE	= 7,
+	ERROR			= 8
+} state_t;
+
+/*
+ * Size of every buffer allocated by rping_setup_buffers(); also bounds the
+ * -S ping size.  Parenthesized so it expands safely inside expressions
+ * such as "x / RPING_BUFSIZE".
+ */
+#define RPING_BUFSIZE (64 * 1024)
+/* max_send_wr of the QP; the CQ is created with twice this many entries. */
+#define RPING_SQ_DEPTH 16
+
+/*
+ * Per-run control block shared by the main thread, the CM event thread
+ * (cm_thread) and the completion thread (cq_thread).
+ */
+struct rping_cb {
+	int server;			/* 1 = server, 0 = client, -1 until parsed */
+	pthread_t cqthread;		/* runs cq_thread() */
+	struct ibv_comp_channel *channel;
+	struct ibv_cq *cq;
+	struct ibv_pd *pd;
+	struct ibv_qp *qp;
+
+	/* Receive side: a single buffer, re-posted after each completion. */
+	struct ibv_recv_wr rq_wr;
+	struct ibv_sge recv_sgl;
+	char *recv_buf;			/* malloc'd, RPING_BUFSIZE bytes */
+	struct ibv_mr *recv_mr;
+
+	/* SEND path: advertisements (client) and go-aheads (server). */
+	struct ibv_send_wr sq_wr;
+	struct ibv_sge send_sgl;
+	char *send_buf;
+	struct ibv_mr *send_mr;
+
+	/* RDMA READ/WRITE work request and its local buffer. */
+	struct ibv_send_wr rdma_sq_wr;
+	struct ibv_sge rdma_sgl;
+	char *rdma_buf;			/* RDMA sink */
+	struct ibv_mr *rdma_mr;
+
+
+	/* Buffer most recently advertised by the peer. */
+	uint32_t remote_rkey;
+	uint64_t remote_addr;
+	uint32_t remote_len;
+
+	/* Client only: source buffer for the server's RDMA READ. */
+	char *start_buf;
+	struct ibv_mr *start_mr;
+
+	/* Hand-off from the CM/CQ threads to the main thread. */
+	state_t state;
+	sem_t sem;
+
+	/* Command-line settings. */
+	uint16_t port;			/* network byte order */
+	uint32_t addr;			/* network byte order */
+	char *addr_str;			/* -a argument as given */
+	int verbose;			/* -v: print ping data */
+	int count;			/* -C: number of pings, 0 = no limit */
+	int size;			/* -S: ping length, excluding the NUL */
+	int validate;			/* -V: compare echoed data */
+
+	/* Connection manager. */
+	pthread_t cmthread;		/* runs cm_thread() */
+	struct rdma_cm_id *cm_id;	/* client connection, or server listener */
+	struct rdma_cm_id *child_cm_id;	/* server's accepted connection */
+};
+
+
+/*
+ * Handle one connection-manager event for either cbp->cm_id or the
+ * server's per-connection id.  Runs on cm_thread.  Every event the main
+ * thread may be blocked on updates cbp->state and posts cbp->sem, so a
+ * failure (including device removal) wakes it instead of leaving it hung.
+ */
+static void rping_cma_event_handler(struct rdma_cm_id *cma_id,
+				    struct rdma_cm_event *event)
+{
+	struct rping_cb *cbp = cma_id->context;
+	int rc;
+
+	DEBUG_LOG("cma_event type %d cma_id %p (%s)\n",
+		  event->event, cma_id,
+		  (cma_id == cbp->cm_id) ? "parent" : "child");
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_ADDR_RESOLVED:
+		/* Chain into route resolution; main wakes on ROUTE_RESOLVED. */
+		rc = rdma_resolve_route(cma_id, 2000);
+		if (rc) {
+			fprintf(stderr, "rdma_resolve_route error %d\n", rc);
+			cbp->state = ERROR;
+			sem_post(&cbp->sem);
+		}
+		break;
+
+	case RDMA_CM_EVENT_ROUTE_RESOLVED:
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		cbp->state = CONNECT_REQUEST;
+		cbp->child_cm_id = cma_id;
+		DEBUG_LOG("child cma %p\n", cbp->child_cm_id);
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_ESTABLISHED:
+		DEBUG_LOG("ESTABLISHED\n");
+		cbp->state = CONNECTED;
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_ADDR_ERROR:
+	case RDMA_CM_EVENT_ROUTE_ERROR:
+	case RDMA_CM_EVENT_CONNECT_ERROR:
+	case RDMA_CM_EVENT_UNREACHABLE:
+	case RDMA_CM_EVENT_REJECTED:
+		fprintf(stderr, "cma event %d, error %d\n", event->event,
+			event->status);
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_DISCONNECTED:
+		fprintf(stderr, "DISCONNECT EVENT...\n");
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		fprintf(stderr, "cma detected device removal!!!!\n");
+		/*
+		 * The device is gone: wake whatever wait the main thread is
+		 * in so it can tear down instead of blocking forever.
+		 */
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+
+	default:
+		fprintf(stderr, "oof bad type!\n");
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+		break;
+	}
+}
+
+/*
+ * Drain cbp->cq, turning each completion into a state transition for the
+ * main thread and posting cbp->sem where the main thread waits for it.
+ * Runs on cq_thread after the CQ has been re-armed.  A failed completion
+ * or a malformed receive sets ERROR, posts the semaphore and stops
+ * draining; an ibv_poll_cq() error terminates the process.
+ */
+static void rping_cq_event_handler(struct rping_cb *cbp)
+{
+	struct ibv_wc wc;
+	struct ibv_recv_wr *bad_wr;
+	int rc;
+
+	while ((rc = ibv_poll_cq(cbp->cq, 1, &wc)) == 1) {
+		if (wc.status) {
+			fprintf(stderr, "cq completion failed status %d\n",
+				wc.status);
+			cbp->state = ERROR;
+			sem_post(&cbp->sem);
+			return;
+		}
+		switch (wc.opcode) {
+		case IBV_WC_SEND:
+			/* Sends are signaled, but the main thread does not wait on them. */
+			DEBUG_LOG("send completion\n");
+			break;
+
+		case IBV_WC_RDMA_WRITE:
+			DEBUG_LOG("rdma write completion\n");
+			cbp->state = RDMA_WRITE_COMPLETE;
+			sem_post(&cbp->sem);
+			break;
+
+		case IBV_WC_RDMA_READ:
+			cbp->state = RDMA_READ_COMPLETE;
+			DEBUG_LOG("rdma read completion\n");
+			sem_post(&cbp->sem);
+			break;
+
+		case IBV_WC_RECV:
+			DEBUG_LOG("recv completion\n");
+			if (cbp->server) {
+				/*
+				 * Client advertisement: rkey (4 bytes), addr
+				 * (8 bytes), len (4 bytes), in the sender's
+				 * native byte order.
+				 */
+				if (wc.byte_len != 16) {
+					fprintf(stderr,
+						"Received bogus data, size %d\n",
+						wc.byte_len);
+					cbp->state = ERROR;
+					sem_post(&cbp->sem);
+					return;
+				}
+				/*
+				 * addr starts at offset 4, which is not 8-byte
+				 * aligned: copy the fields out instead of
+				 * dereferencing cast pointers.
+				 */
+				memcpy(&cbp->remote_rkey, cbp->recv_buf,
+				       sizeof(cbp->remote_rkey));
+				memcpy(&cbp->remote_addr, cbp->recv_buf + 4,
+				       sizeof(cbp->remote_addr));
+				memcpy(&cbp->remote_len, cbp->recv_buf + 12,
+				       sizeof(cbp->remote_len));
+				DEBUG_LOG("Received rkey %x addr %llx "
+					  "len %d from peer\n",
+					  cbp->remote_rkey,
+					  (unsigned long long) cbp->remote_addr,
+					  cbp->remote_len);
+				/*
+				 * The first advert of a ping is the READ
+				 * source; the second is the WRITE sink.
+				 */
+				if (cbp->state == CONNECTED
+				    || cbp->state == RDMA_WRITE_COMPLETE)
+					cbp->state = RDMA_READ_ADV;
+				else
+					cbp->state = RDMA_WRITE_ADV;
+			} else {
+				/* Server "go ahead": exactly one byte. */
+				if (wc.byte_len != 1) {
+					fprintf(stderr,
+						"Received bogus data, size %d\n",
+						wc.byte_len);
+					cbp->state = ERROR;
+					sem_post(&cbp->sem);
+					return;
+				}
+				if (cbp->state == RDMA_READ_ADV) {
+					cbp->state = RDMA_WRITE_ADV;
+					DEBUG_LOG("set state to WRITE_ADV\n");
+				} else {
+					cbp->state = RDMA_WRITE_COMPLETE;
+					DEBUG_LOG("set state "
+						  "to WRITE_COMPLETE\n");
+				}
+			}
+
+			/* Re-post the single receive buffer for the next message. */
+			rc = ibv_post_recv(cbp->qp, &cbp->rq_wr, &bad_wr);
+			if (rc) {
+				fprintf(stderr, "post recv error %d\n", rc);
+				cbp->state = ERROR;
+			}
+			sem_post(&cbp->sem);
+			break;
+
+		default:
+			DEBUG_LOG("unknown!!!!! completion\n");
+			break;
+		}
+	}
+	/* ibv_poll_cq() returns 0 when the CQ is empty, negative on error. */
+	if (rc) {
+		fprintf(stderr, "poll error %d\n", rc);
+		exit(rc);
+	}
+}
+
+/*
+ * Accept the pending connection request on cbp->child_cm_id, sending
+ * @len bytes of @priv as private data.  responder_resources and
+ * initiator_depth are both 1.  Returns rdma_accept()'s result.
+ */
+static int rping_accept_cr(struct rping_cb *cbp, char *priv, int len)
+{
+	struct rdma_conn_param param;
+
+	DEBUG_LOG("accept_cr!\n");
+
+	memset(&param, 0, sizeof(param));
+	param.initiator_depth = 1;
+	param.responder_resources = 1;
+	param.private_data_len = len;
+	param.private_data = priv;
+
+	return rdma_accept(cbp->child_cm_id, &param);
+}
+
+/*
+ * Start an active connection on cbp->cm_id, sending @len bytes of @priv
+ * as private data (responder_resources and initiator_depth 1, retry_count
+ * 10).  On failure, prints the error, sets ERROR, posts cbp->sem (so a
+ * waiter is released), and returns the rdma_connect() error so the
+ * caller can stop right away.  Returns 0 on success.
+ */
+static int rping_connect(struct rping_cb *cbp, char *priv, int len)
+{
+	struct rdma_conn_param conn_param;
+	int rc;
+
+	memset(&conn_param, 0, sizeof conn_param);
+	conn_param.private_data = priv;
+	conn_param.private_data_len = len;
+	conn_param.responder_resources = 1;
+	conn_param.initiator_depth = 1;
+	conn_param.retry_count = 10;
+
+	rc = rdma_connect(cbp->cm_id, &conn_param);
+	if (rc) {
+		fprintf(stderr, "rdma_connect error %d\n", rc);
+		cbp->state = ERROR;
+		sem_post(&cbp->sem);
+	}
+	return rc;
+}
+
+/*
+ * Release the buffers and memory registrations made by
+ * rping_setup_buffers().  Each MR is deregistered before the memory it
+ * covers is freed.  The RDMA READ source exists only on the client.
+ */
+static void rping_free_buffers(struct rping_cb *cbp)
+{
+	DEBUG_LOG("rping_free_buffers called on cbp %p\n", cbp);
+
+	ibv_dereg_mr(cbp->recv_mr);
+	free(cbp->recv_buf);
+
+	ibv_dereg_mr(cbp->send_mr);
+	free(cbp->send_buf);
+
+	ibv_dereg_mr(cbp->rdma_mr);
+	free(cbp->rdma_buf);
+
+	if (cbp->server)
+		return;
+
+	ibv_dereg_mr(cbp->start_mr);
+	free(cbp->start_buf);
+}
+
+/*
+ * Allocate and register the RPING_BUFSIZE buffers: recv, send, the rdma
+ * buffer and, on the client only, the RDMA READ source (start_buf).  Also
+ * fill in the fixed parts of the work requests that use them and post the
+ * single receive buffer.
+ *
+ * Returns 0 on success or an errno-style value.  On failure, everything
+ * allocated here is released in reverse order and the pointers are
+ * cleared.  recv_buf == NULL tells do_rping() not to call
+ * rping_free_buffers().
+ */
+static int rping_setup_buffers(struct rping_cb *cbp)
+{
+	struct ibv_recv_wr *bad_wr;
+	int rc;
+
+	DEBUG_LOG("rping_setup_buffers called on cbp %p\n", cbp);
+
+	/* Receive buffer: written locally by incoming SENDs. */
+	cbp->recv_buf = malloc(RPING_BUFSIZE);
+	if (cbp->recv_buf == NULL)
+		return ENOMEM;
+	cbp->recv_mr = ibv_reg_mr(cbp->pd, cbp->recv_buf, RPING_BUFSIZE,
+				  IBV_ACCESS_LOCAL_WRITE);
+	if (!cbp->recv_mr) {
+		/* Capture errno now; never report a failure as 0. */
+		rc = errno ? errno : ENOMEM;
+		goto err_free_recv_buf;
+	}
+	cbp->recv_sgl.addr = (uint64_t) (unsigned long) cbp->recv_buf;
+	cbp->recv_sgl.length = RPING_BUFSIZE;
+	cbp->recv_sgl.lkey = cbp->recv_mr->lkey;
+	cbp->rq_wr.wr_id = (uint64_t) (unsigned long) &cbp->rq_wr;
+	cbp->rq_wr.sg_list = &cbp->recv_sgl;
+	cbp->rq_wr.num_sge = 1;
+
+	/* Send buffer: only read by the local HCA. */
+	cbp->send_buf = malloc(RPING_BUFSIZE);
+	if (cbp->send_buf == NULL) {
+		rc = ENOMEM;
+		goto err_dereg_recv;
+	}
+	cbp->send_mr = ibv_reg_mr(cbp->pd, cbp->send_buf, RPING_BUFSIZE, 0);
+	if (!cbp->send_mr) {
+		rc = errno ? errno : ENOMEM;
+		goto err_free_send_buf;
+	}
+	cbp->send_sgl.addr = (uint64_t) (unsigned long) cbp->send_buf;
+	cbp->send_sgl.lkey = cbp->send_mr->lkey;
+	cbp->sq_wr.opcode = IBV_WR_SEND;
+	cbp->sq_wr.wr_id = (uint64_t) (unsigned long) &cbp->sq_wr;
+	cbp->sq_wr.num_sge = 1;
+	cbp->sq_wr.sg_list = &cbp->send_sgl;
+	cbp->sq_wr.send_flags = IBV_SEND_SIGNALED;
+
+	/* RDMA buffer: target/source of RDMA READ and WRITE. */
+	cbp->rdma_buf = malloc(RPING_BUFSIZE);
+	if (cbp->rdma_buf == NULL) {
+		rc = ENOMEM;
+		goto err_dereg_send;
+	}
+	cbp->rdma_mr = ibv_reg_mr(cbp->pd, cbp->rdma_buf, RPING_BUFSIZE,
+				  IBV_ACCESS_LOCAL_WRITE |
+				  IBV_ACCESS_REMOTE_READ |
+				  IBV_ACCESS_REMOTE_WRITE);
+	if (!cbp->rdma_mr) {
+		rc = errno ? errno : ENOMEM;
+		goto err_free_rdma_buf;
+	}
+	cbp->rdma_sq_wr.wr_id = (uint64_t) (unsigned long) &cbp->rdma_sq_wr;
+	cbp->rdma_sq_wr.sg_list = &cbp->rdma_sgl;
+	cbp->rdma_sq_wr.num_sge = 1;
+	cbp->rdma_sgl.addr = (uint64_t) (unsigned long) cbp->rdma_buf;
+	cbp->rdma_sgl.lkey = cbp->rdma_mr->lkey;
+
+	/* Client only: the buffer the server RDMA READs the ping from. */
+	if (!cbp->server) {
+		cbp->start_buf = malloc(RPING_BUFSIZE);
+		if (cbp->start_buf == NULL) {
+			rc = ENOMEM;
+			goto err_dereg_rdma;
+		}
+		cbp->start_mr = ibv_reg_mr(cbp->pd, cbp->start_buf,
+					   RPING_BUFSIZE,
+					   IBV_ACCESS_LOCAL_WRITE |
+					   IBV_ACCESS_REMOTE_READ |
+					   IBV_ACCESS_REMOTE_WRITE);
+		if (!cbp->start_mr) {
+			rc = errno ? errno : ENOMEM;
+			goto err_free_start_buf;
+		}
+	}
+
+	/* ibv_post_recv() returns the error code itself. */
+	rc = ibv_post_recv(cbp->qp, &cbp->rq_wr, &bad_wr);
+	if (rc)
+		goto err_dereg_start;
+
+	DEBUG_LOG("allocated & registered buffers...\n");
+	return 0;
+
+err_dereg_start:
+	if (!cbp->server)
+		ibv_dereg_mr(cbp->start_mr);
+err_free_start_buf:
+	if (!cbp->server)
+		free(cbp->start_buf);
+err_dereg_rdma:
+	ibv_dereg_mr(cbp->rdma_mr);
+err_free_rdma_buf:
+	free(cbp->rdma_buf);
+err_dereg_send:
+	ibv_dereg_mr(cbp->send_mr);
+err_free_send_buf:
+	free(cbp->send_buf);
+err_dereg_recv:
+	ibv_dereg_mr(cbp->recv_mr);
+err_free_recv_buf:
+	free(cbp->recv_buf);
+	cbp->recv_buf = cbp->send_buf = cbp->rdma_buf = cbp->start_buf = NULL;
+	cbp->recv_mr = cbp->send_mr = cbp->rdma_mr = cbp->start_mr = NULL;
+	return rc;
+}
+
+/*
+ * Create the RC QP on the connection's cm_id: child_cm_id on the server,
+ * cm_id on the client.  Both queues share cbp->cq.  It then grants remote
+ * READ/WRITE access on the QP.  Returns 0 or the failing call's error;
+ * cbp->qp is NULL only if creation itself failed.
+ */
+static int rping_create_qp(struct rping_cb *cbp)
+{
+	struct ibv_qp_init_attr init_attr;
+	struct ibv_qp_attr qp_attr;
+	struct rdma_cm_id *id;
+	int rc;
+
+	memset(&init_attr, 0, sizeof(init_attr));
+	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
+	init_attr.cap.max_recv_wr = 2;
+	init_attr.cap.max_recv_sge = 1;
+	init_attr.cap.max_send_sge = 1;
+	init_attr.qp_type = IBV_QPT_RC;
+	init_attr.send_cq = cbp->cq;
+	init_attr.recv_cq = cbp->cq;
+
+	id = cbp->server ? cbp->child_cm_id : cbp->cm_id;
+	rc = rdma_create_qp(id, cbp->pd, &init_attr);
+	if (rc) {
+		cbp->qp = NULL;
+		return rc;
+	}
+	cbp->qp = id->qp;
+
+	/*
+	 * Only qp_access_flags is selected by the mask, but zero the rest of
+	 * the attribute struct rather than pass uninitialized memory.
+	 */
+	memset(&qp_attr, 0, sizeof(qp_attr));
+	qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_READ |
+				  IBV_ACCESS_REMOTE_WRITE;
+	rc = ibv_modify_qp(cbp->qp, &qp_attr, IBV_QP_ACCESS_FLAGS);
+	if (rc)
+		fprintf(stderr, "ibv_modify_qp returned %d\n", rc);
+	return rc;
+}
+
+/*
+ * Connection-manager event loop.  Each event is dispatched to
+ * rping_cma_event_handler() and then acknowledged.  A failure from
+ * rdma_get_cm_event() terminates the process.  Never returns; @arg is
+ * unused.
+ */
+static void *cm_thread(void *arg)
+{
+	struct rdma_cm_event *event;
+	int err;
+
+	for (;;) {
+		err = rdma_get_cm_event(&event);
+		if (err != 0) {
+			fprintf(stderr, "rdma_get_cm_event err %d\n", err);
+			exit(err);
+		}
+		rping_cma_event_handler(event->id, event);
+		rdma_ack_cm_event(event);
+	}
+}
+
+/*
+ * Completion thread for @arg (a struct rping_cb).  It blocks on the
+ * completion channel, re-arms the CQ, drains it via
+ * rping_cq_event_handler(), and then acknowledges the event.  Any failure,
+ * or an event for an unexpected CQ, terminates the process.  Never
+ * returns.
+ */
+static void *cq_thread(void *arg)
+{
+	struct rping_cb *cbp = arg;
+	struct ibv_cq *ev_cq;
+	void *ev_ctx;
+	int err;
+
+	DEBUG_LOG("cq_thread started.\n");
+
+	for (;;) {
+		err = ibv_get_cq_event(cbp->channel, &ev_cq, &ev_ctx);
+		if (err) {
+			fprintf(stderr, "Failed to get cq event!\n");
+			exit(err);
+		}
+		if (ev_cq != cbp->cq) {
+			fprintf(stderr, "Unkown CQ!\n");
+			exit(-1);
+		}
+		/* Re-arm before draining so a completion arriving meanwhile still raises an event. */
+		err = ibv_req_notify_cq(cbp->cq, 0);
+		if (err) {
+			fprintf(stderr, "Failed to set notify!\n");
+			exit(err);
+		}
+		rping_cq_event_handler(cbp);
+		ibv_ack_cq_events(cbp->cq, 1);
+	}
+}
+
+/*
+ * Allocate everything one connection needs on the device behind @verbs:
+ * protection domain, completion channel, CQ (armed for notification), the
+ * CQ thread, the QP and the registered buffers.  Returns 0 or an
+ * errno-style value.  Whatever was created before a failure is left in
+ * cbp for rping_cleanup() to release.
+ */
+static int rping_setup_resources(struct rping_cb *cbp,
+				 struct ibv_context *verbs)
+{
+	int rc;
+
+	cbp->pd = ibv_alloc_pd(verbs);
+	if (!cbp->pd)
+		return errno ? errno : ENOMEM;
+	DEBUG_LOG("created pd %p\n", cbp->pd);
+
+	cbp->channel = ibv_create_comp_channel(verbs);
+	if (!cbp->channel)
+		return errno ? errno : ENOMEM;
+	DEBUG_LOG("created channel %p\n", cbp->channel);
+
+	cbp->cq = ibv_create_cq(verbs, RPING_SQ_DEPTH * 2, cbp,
+				cbp->channel, 0);
+	if (!cbp->cq)
+		return errno ? errno : ENOMEM;
+	DEBUG_LOG("created cq %p\n", cbp->cq);
+
+	/* ibv_req_notify_cq() returns the error code itself, not via errno. */
+	rc = ibv_req_notify_cq(cbp->cq, 0);
+	if (rc) {
+		fprintf(stderr, "Failed to set notify!\n");
+		return rc;
+	}
+
+	rc = pthread_create(&cbp->cqthread, NULL, cq_thread, cbp);
+	if (rc) {
+		fprintf(stderr, "pthread_create error %d\n", rc);
+		return rc;
+	}
+
+	rc = rping_create_qp(cbp);
+	if (rc)
+		return rc;
+	DEBUG_LOG("created qp %p\n", cbp->qp);
+
+	return rping_setup_buffers(cbp);
+}
+
+/* Post the one-byte "go ahead" SEND that lets the client continue. */
+static int rping_post_go_ahead(struct rping_cb *cbp)
+{
+	struct ibv_send_wr *bad_wr;
+
+	cbp->send_sgl.length = 1;
+	return ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+}
+
+/*
+ * Post a 16-byte advertisement: rkey (4 bytes), addr (8 bytes), len (4
+ * bytes) in native byte order.  memcpy is used because addr sits at the
+ * unaligned offset 4.
+ */
+static int rping_post_adv(struct rping_cb *cbp, uint32_t rkey,
+			  uint64_t addr, uint32_t len)
+{
+	struct ibv_send_wr *bad_wr;
+
+	cbp->send_sgl.length = 16;
+	memcpy(cbp->send_buf, &rkey, sizeof(rkey));
+	memcpy(cbp->send_buf + 4, &addr, sizeof(addr));
+	memcpy(cbp->send_buf + 12, &len, sizeof(len));
+	return ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+}
+
+/*
+ * Server side: listen, accept one connection, then loop forever:
+ * RDMA READ the client's ping, send "go ahead", RDMA WRITE the ping back
+ * into the client's sink, send "go ahead".  Returns on any error.
+ */
+static void rping_test_server(struct rping_cb *cbp)
+{
+	struct ibv_send_wr *bad_wr;
+	uint32_t len;
+	int rc;
+
+	rc = rdma_listen(cbp->cm_id, 3);
+	if (rc) {
+		fprintf(stderr, "listen error %d\n", rc);
+		return;
+	}
+
+	/* The CM thread stores the new id in child_cm_id before posting. */
+	rc = sem_wait(&cbp->sem);
+	if (rc || cbp->state == ERROR) {
+		fprintf(stderr,
+			"wait for CONNECT_REQUEST error %d state %d\n",
+			rc, cbp->state);
+		return;
+	}
+
+	if (rping_setup_resources(cbp, cbp->child_cm_id->verbs))
+		return;
+
+	rc = rping_accept_cr(cbp, "server", strlen("server") + 1);
+	if (rc) {
+		fprintf(stderr, "accept error %d\n", rc);
+		return;
+	}
+	rc = sem_wait(&cbp->sem);
+	if (rc || cbp->state == ERROR) {
+		fprintf(stderr, "wait for CONNECTED "
+			"state error %d state %d\n",
+			rc, cbp->state);
+		return;
+	}
+
+	for (;;) {
+		/* Wait for the client's source rkey/addr/len. */
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "wait for RDMA_READ_ADV "
+				"state error %d state %d\n",
+				rc, cbp->state);
+			return;
+		}
+		DEBUG_LOG("server received source adv\n");
+
+		/* remote_len is peer-supplied: never read past rdma_buf. */
+		if (cbp->remote_len == 0 || cbp->remote_len > RPING_BUFSIZE) {
+			fprintf(stderr, "invalid remote len %u\n",
+				cbp->remote_len);
+			return;
+		}
+
+		/* RDMA READ the ping into rdma_buf. */
+		cbp->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
+		cbp->rdma_sq_wr.wr_id =
+			(uint64_t) (unsigned long) &cbp->rdma_sq_wr;
+		cbp->rdma_sq_wr.sg_list = &cbp->rdma_sgl;
+		cbp->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
+		cbp->rdma_sq_wr.wr.rdma.rkey = cbp->remote_rkey;
+		cbp->rdma_sq_wr.wr.rdma.remote_addr = cbp->remote_addr;
+		cbp->rdma_sq_wr.sg_list->length = cbp->remote_len;
+
+		rc = ibv_post_send(cbp->qp, &cbp->rdma_sq_wr, &bad_wr);
+		if (rc) {
+			fprintf(stderr, "post send error %d\n", rc);
+			return;
+		}
+		DEBUG_LOG("server posted rdma read req \n");
+
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "wait for "
+				"RDMA_READ_COMPLETE state error %d "
+				"state %d\n",
+				rc, cbp->state);
+			return;
+		}
+		DEBUG_LOG("server received read complete\n");
+
+		/*
+		 * The client NUL-terminates the last byte it advertises.
+		 * Force that here so strlen()/printf() stay inside rdma_buf
+		 * even if the peer did not terminate it.
+		 */
+		cbp->rdma_buf[cbp->remote_len - 1] = '\0';
+
+		if (cbp->verbose)
+			printf("server ping data: %s\n", cbp->rdma_buf);
+
+		rc = rping_post_go_ahead(cbp);
+		if (rc) {
+			fprintf(stderr, "post send error %d\n", rc);
+			return;
+		}
+		DEBUG_LOG("server posted go ahead\n");
+
+		/* Wait for the client's sink rkey/addr/len. */
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "wait for RDMA_WRITE_ADV "
+				"state error %d state %d\n",
+				rc, cbp->state);
+			return;
+		}
+		DEBUG_LOG("server received sink adv\n");
+
+		/* Echo the ping back, but never beyond the advertised sink. */
+		len = (uint32_t) strlen(cbp->rdma_buf) + 1;
+		if (len > cbp->remote_len) {
+			fprintf(stderr, "echo len %u exceeds sink len %u\n",
+				len, cbp->remote_len);
+			return;
+		}
+		cbp->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
+		cbp->rdma_sq_wr.wr.rdma.rkey = cbp->remote_rkey;
+		cbp->rdma_sq_wr.wr.rdma.remote_addr = cbp->remote_addr;
+		cbp->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
+		cbp->rdma_sq_wr.sg_list->length = len;
+		DEBUG_LOG("rdma write from lkey %x laddr %llx len %u\n",
+			  cbp->rdma_sq_wr.sg_list->lkey,
+			  (unsigned long long) cbp->rdma_sq_wr.sg_list->addr,
+			  cbp->rdma_sq_wr.sg_list->length);
+		rc = ibv_post_send(cbp->qp, &cbp->rdma_sq_wr, &bad_wr);
+		if (rc) {
+			fprintf(stderr, "post send error %d\n", rc);
+			return;
+		}
+
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "waiting for "
+				"RDMA_WRITE_COMPLETE state error %d\n",
+				rc);
+			return;
+		}
+		DEBUG_LOG("server rdma write complete \n");
+
+		/* Tell the client to begin the next ping. */
+		rc = rping_post_go_ahead(cbp);
+		if (rc) {
+			fprintf(stderr, "post send error %d\n", rc);
+			return;
+		}
+		DEBUG_LOG("server posted go ahead\n");
+	}
+}
+
+/*
+ * Client side: connect, then for each ping build the text in start_buf,
+ * advertise it for RDMA READ, wait for the go-ahead, advertise rdma_buf
+ * as the RDMA WRITE sink, wait for the echo, and optionally validate and
+ * print it.  Stops after cbp->count pings (0 = no limit) or on error.
+ */
+static void rping_test_client(struct rping_cb *cbp)
+{
+	int ping = 0;
+	int start = 65;
+	int cc, i, rc;
+	unsigned char c;
+
+	if (rping_setup_resources(cbp, cbp->cm_id->verbs))
+		return;
+
+	rc = rping_connect(cbp, "client", strlen("client") + 1);
+	if (rc) {
+		fprintf(stderr, "connect error %d\n", rc);
+		return;
+	}
+
+	rc = sem_wait(&cbp->sem);
+	if (rc || cbp->state == ERROR) {
+		fprintf(stderr,
+			"wait for CONNECTED error %d state %d\n", rc,
+			cbp->state);
+		return;
+	}
+
+	for (;;) {
+		cbp->state = RDMA_READ_ADV;
+
+		++ping;
+		if (cbp->count && ping > cbp->count)
+			return;
+
+		/* "rdma-ping-N: " followed by a rotating run of 'A'..'z'. */
+		cc = sprintf(cbp->start_buf, "rdma-ping-%d: ", ping);
+		for (i = cc, c = start; i < cbp->size; i++) {
+			cbp->start_buf[i] = c;
+			c++;
+			if (c > 122)
+				c = 65;
+		}
+		start++;
+		if (start > 122)
+			start = 65;
+		cbp->start_buf[cbp->size] = 0;
+
+		/* Advertise the source buffer; the server RDMA READs it. */
+		DEBUG_LOG("Sending Start rkey %x "
+			  "addr %llx len %d for RDMA READ Source\n",
+			  cbp->start_mr->rkey,
+			  (unsigned long long) (unsigned long) cbp->start_buf,
+			  cbp->size + 1);
+		rc = rping_post_adv(cbp, cbp->start_mr->rkey,
+				    (uint64_t) (unsigned long) cbp->start_buf,
+				    cbp->size + 1);
+		if (rc) {
+			fprintf(stderr, "post send error %d\n", rc);
+			return;
+		}
+
+		/* Wait for the server's go-ahead after its RDMA READ. */
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "wait for RDMA_WRITE_ADV "
+				"state error %d state %d\n",
+				rc, cbp->state);
+			return;
+		}
+
+		/* Advertise the sink buffer; the server RDMA WRITEs the echo. */
+		DEBUG_LOG("Sending rkey %x addr %llx "
+			  "len %d for RDMA WRITE Sink\n",
+			  cbp->rdma_mr->rkey,
+			  (unsigned long long) (unsigned long) cbp->rdma_buf,
+			  cbp->size + 1);
+		rc = rping_post_adv(cbp, cbp->rdma_mr->rkey,
+				    (uint64_t) (unsigned long) cbp->rdma_buf,
+				    cbp->size + 1);
+		if (rc) {
+			fprintf(stderr, "post send error %d\n", rc);
+			return;
+		}
+
+		/* Wait for the server to report the RDMA WRITE complete. */
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "wait for "
+				"RDMA_WRITE_COMPLETE state error %d "
+				"state %d\n", rc, cbp->state);
+			return;
+		}
+
+		if (cbp->validate &&
+		    memcmp(cbp->start_buf, cbp->rdma_buf, cbp->size + 1)) {
+			fprintf(stderr, "data mismatch!\n");
+			return;
+		}
+
+		if (cbp->verbose)
+			printf("ping data: %s\n", cbp->rdma_buf);
+	}
+}
+
+/*
+ * Disconnect and release what rping_setup_resources() created.  Each
+ * resource is released only if its pointer in cbp is set, so this is safe
+ * after a partial setup.  On the server, cm_id is only the listener; the
+ * connection to disconnect is child_cm_id.
+ */
+static void rping_cleanup(struct rping_cb *cbp)
+{
+	struct rdma_cm_id *conn_id;
+
+	conn_id = cbp->server ? cbp->child_cm_id : cbp->cm_id;
+	DEBUG_LOG("disconnecting\n");
+	if (conn_id)
+		rdma_disconnect(conn_id);
+
+	if (cbp->child_cm_id) {
+		DEBUG_LOG("destroying child cm_id %p\n", cbp->child_cm_id);
+		rdma_destroy_id(cbp->child_cm_id);
+	}
+	if (cbp->qp) {
+		DEBUG_LOG("destroying qp %p\n", cbp->qp);
+		ibv_destroy_qp(cbp->qp);
+	}
+	if (cbp->cq) {
+		DEBUG_LOG("destroying cq %p\n", cbp->cq);
+		ibv_destroy_cq(cbp->cq);
+	}
+	/* rping_setup_buffers() leaves recv_buf NULL unless it succeeded. */
+	if (cbp->recv_buf) {
+		DEBUG_LOG("freeing bufs/mrs\n");
+		rping_free_buffers(cbp);
+	}
+	if (cbp->pd) {
+		DEBUG_LOG("dealloc pd %p\n", cbp->pd);
+		ibv_dealloc_pd(cbp->pd);
+	}
+	printf("close complete - returning from test \n");
+}
+
+/*
+ * Run the ping/pong test as server or client (per cbp->server), then tear
+ * everything down.  A zero cbp->size defaults to 64.  Errors are reported
+ * on stderr only.
+ */
+static void do_rping(struct rping_cb *cbp)
+{
+	if (cbp->size == 0)
+		cbp->size = 64;
+
+	if (cbp->server)
+		rping_test_server(cbp);
+	else
+		rping_test_client(cbp);
+
+	rping_cleanup(cbp);
+}
+
+/*
+ * Print command-line help.  @name may be a path (e.g. argv[0]); only the
+ * part after the last '/' is shown.  basename(3) is not used because the
+ * headers this file includes declare it only under _GNU_SOURCE (otherwise
+ * <libgen.h> is needed).  An implicitly declared basename() returns int,
+ * which truncates the pointer on LP64 systems.
+ */
+static void usage(char *name)
+{
+	char *prog = strrchr(name, '/');
+
+	prog = prog ? prog + 1 : name;
+	printf("%s -c|s [-vVd] [-S size] [-C count] -a addr -p port\n",
+	       prog);
+	printf("\t-c\t\tclient side\n");
+	printf("\t-s\t\tserver side\n");
+	printf("\t-v\t\tdisplay ping data to stdout\n");
+	printf("\t-V\t\tvalidate ping data\n");
+	printf("\t-d\t\tdebug printfs\n");
+	printf("\t-S size \tping data size\n");
+	printf("\t-C count\tping count times\n");
+	printf("\t-a addr\t\taddress\n");
+	printf("\t-p port\t\tport\n");
+}
+
+/*
+ * Parse the command line and run one rping test.  Steps: create the CM id
+ * and its event thread, bind (server) or resolve the peer address
+ * (client), then hand off to do_rping(), which may block on the calling
+ * thread.  A non-zero return comes from option parsing or CM setup;
+ * errors reported by CM events or inside do_rping() are printed on
+ * stderr.
+ */
+int main(int argc, char *argv[])
+{
+	struct rping_cb *cbp;
+	struct sockaddr_in sin;
+	int op;
+	int rc = 0;
+
+	cbp = malloc(sizeof(*cbp));
+	if (cbp == NULL)
+		return ENOMEM;
+	memset(cbp, 0, sizeof(*cbp));
+	cbp->server = -1;
+	cbp->state = IDLE;
+	if (sem_init(&cbp->sem, 0, 0)) {
+		rc = errno;
+		fprintf(stderr, "sem_init error %d\n", rc);
+		free(cbp);
+		return rc;
+	}
+
+	opterr = 0;
+	while ((op = getopt(argc, argv, "a:p:C:S:t:scvVd")) != -1) {
+		switch (op) {
+		case 'a':
+			cbp->addr_str = optarg;
+			cbp->addr = inet_addr(optarg);
+			DEBUG_LOG("ipaddr (%s)\n", optarg);
+			break;
+		case 'p':
+			cbp->port = htons(atoi(optarg));
+			DEBUG_LOG("port %d\n", (int) atoi(optarg));
+			break;
+		case 's':
+			cbp->server = 1;
+			DEBUG_LOG("server\n");
+			break;
+		case 'c':
+			cbp->server = 0;
+			DEBUG_LOG("client\n");
+			break;
+		case 'S':
+			/* -S includes the terminating NUL; cbp->size excludes it. */
+			cbp->size = atoi(optarg) - 1;
+			if (cbp->size < 1 || cbp->size > RPING_BUFSIZE - 1) {
+				fprintf(stderr, "Invalid size %d "
+					"(valid range is 2 to %d)\n",
+					atoi(optarg), RPING_BUFSIZE);
+				rc = EINVAL;
+			} else {
+				DEBUG_LOG("size %d\n", (int) atoi(optarg));
+			}
+			break;
+		case 'C':
+			cbp->count = atoi(optarg);
+			if (cbp->count < 0) {
+				fprintf(stderr, "Invalid count %d\n",
+					cbp->count);
+				rc = EINVAL;
+			} else {
+				DEBUG_LOG("count %d\n", (int) cbp->count);
+			}
+			break;
+		case 'v':
+			cbp->verbose++;
+			DEBUG_LOG("verbose\n");
+			break;
+		case 'V':
+			cbp->validate++;
+			DEBUG_LOG("validate data\n");
+			break;
+		case 'd':
+			debug++;
+			break;
+		default:
+			usage("rping");
+			rc = EINVAL;
+			break;
+		}
+	}
+	if (rc)
+		goto out;
+
+	if (cbp->server == -1) {
+		fprintf(stderr, "must be either client or server\n");
+		rc = EINVAL;
+		goto out;
+	}
+
+	rc = rdma_create_id(&cbp->cm_id, cbp);
+	if (rc) {
+		/* Prefer errno, but never let a failure turn into 0. */
+		if (errno)
+			rc = errno;
+		cbp->cm_id = NULL;
+		fprintf(stderr, "rdma_create_id error %d\n", rc);
+		goto out;
+	}
+	DEBUG_LOG("created cm_id %p\n", cbp->cm_id);
+
+	rc = pthread_create(&cbp->cmthread, NULL, cm_thread, cbp);
+	if (rc) {
+		fprintf(stderr, "pthread_create error %d\n", rc);
+		goto out;
+	}
+
+	memset(&sin, 0, sizeof(sin));
+	sin.sin_family = AF_INET;
+	sin.sin_addr.s_addr = cbp->addr;
+	sin.sin_port = cbp->port;
+
+	/*
+	 * Server binds to local addr/port to find the device. Client resolves
+	 * the remote addr/port to find the device.
+	 */
+	if (cbp->server) {
+		rc = rdma_bind_addr(cbp->cm_id, (struct sockaddr *) &sin);
+		if (rc) {
+			fprintf(stderr, "rdma_bind_addr error %d\n", rc);
+			goto out;
+		}
+		DEBUG_LOG("rdma_bind_addr worked\n");
+	} else {
+		rc = rdma_resolve_addr(cbp->cm_id, NULL,
+				       (struct sockaddr *) &sin, 2000);
+		if (rc) {
+			fprintf(stderr, "rdma_resolve_addr error %d\n", rc);
+			goto out;
+		}
+
+		/* The CM thread posts once route resolution succeeds or fails. */
+		rc = sem_wait(&cbp->sem);
+		if (rc || cbp->state == ERROR) {
+			fprintf(stderr, "waiting for address resolution "
+				"error %d state %d\n", rc, cbp->state);
+			goto out;
+		}
+		DEBUG_LOG("rdma_resolve_addr worked\n");
+	}
+
+	do_rping(cbp);
+
+out:
+	if (cbp->cm_id) {
+		DEBUG_LOG("destroy cm_id %p\n", cbp->cm_id);
+		rdma_destroy_id(cbp->cm_id);
+	}
+	free(cbp);
+	return rc;
+}
More information about the general
mailing list